From 32938bea443a57b7e93ef677730f369005d70103 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 11 Oct 2024 12:33:57 -0700 Subject: [PATCH 1/9] initial commit --- opentelemetry-sdk/src/trace/provider.rs | 88 ++++++++++++++++++++++--- 1 file changed, 78 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 9550ce11d2..dd5105db02 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -36,8 +36,8 @@ static NOOP_TRACER_PROVIDER: Lazy = Lazy::new(|| TracerProvider span_limits: SpanLimits::default(), resource: Cow::Owned(Resource::empty()), }, + is_shutdown: AtomicBool::new(true), }), - is_shutdown: Arc::new(AtomicBool::new(true)), }); /// TracerProvider inner type @@ -45,13 +45,20 @@ static NOOP_TRACER_PROVIDER: Lazy = Lazy::new(|| TracerProvider pub(crate) struct TracerProviderInner { processors: Vec>, config: crate::trace::Config, + is_shutdown: AtomicBool, } impl Drop for TracerProviderInner { fn drop(&mut self) { - for processor in &mut self.processors { - if let Err(err) = processor.shutdown() { - global::handle_error(err); + if self + .is_shutdown + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + for processor in &self.processors { + if let Err(err) = processor.shutdown() { + global::handle_error(err); + } } } } @@ -65,7 +72,6 @@ impl Drop for TracerProviderInner { #[derive(Clone, Debug)] pub struct TracerProvider { inner: Arc, - is_shutdown: Arc, } impl Default for TracerProvider { @@ -79,7 +85,6 @@ impl TracerProvider { pub(crate) fn new(inner: TracerProviderInner) -> Self { TracerProvider { inner: Arc::new(inner), - is_shutdown: Arc::new(AtomicBool::new(false)), } } @@ -101,7 +106,7 @@ impl TracerProvider { /// true if the provider has been shutdown /// Don't start span or export spans when provider is shutdown pub(crate) fn is_shutdown(&self) -> bool { - self.is_shutdown.load(Ordering::Relaxed) + self.inner.is_shutdown.load(Ordering::Relaxed) } /// Force flush all remaining spans in span processors and return results. @@ -153,6 +158,7 @@ impl TracerProvider { /// Note that shut down doesn't means the TracerProvider has dropped pub fn shutdown(&self) -> TraceResult<()> { if self + .inner .is_shutdown .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_ok() @@ -215,7 +221,7 @@ impl opentelemetry::trace::TracerProvider for TracerProvider { } fn library_tracer(&self, library: Arc) -> Self::Tracer { - if self.is_shutdown.load(Ordering::Relaxed) { + if self.inner.is_shutdown.load(Ordering::Relaxed) { return Tracer::new(library, NOOP_TRACER_PROVIDER.clone()); } Tracer::new(library, self.clone()) @@ -292,7 +298,12 @@ impl Builder { p.set_resource(config.resource.as_ref()); } - TracerProvider::new(TracerProviderInner { processors, config }) + let is_shutdown = AtomicBool::new(false); + TracerProvider::new(TracerProviderInner { + processors, + config, + is_shutdown, + }) } } @@ -391,6 +402,7 @@ mod tests { Box::from(TestSpanProcessor::new(false)), ], config: Default::default(), + is_shutdown: AtomicBool::new(false), }); let results = tracer_provider.force_flush(); @@ -534,6 +546,7 @@ mod tests { let tracer_provider = super::TracerProvider::new(TracerProviderInner { processors: vec![Box::from(processor)], config: Default::default(), + is_shutdown: AtomicBool::new(false), }); let test_tracer_1 = tracer_provider.tracer("test1"); @@ -554,14 +567,69 @@ mod tests { // after shutdown we should get noop tracer let noop_tracer = tracer_provider.tracer("noop"); + // noop tracer cannot start anything let _ = noop_tracer.start("test"); assert!(assert_handle.started_span_count(2)); // noop tracer's tracer provider should be shutdown - assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst)); + assert!(noop_tracer.provider().is_shutdown()); // existing tracer becomes noops after shutdown let _ = test_tracer_1.start("test"); assert!(assert_handle.started_span_count(2)); } + + #[test] + fn test_tracer_provider_inner_drop_shutdown() { + // Test 1: Already shutdown case + { + let processor = TestSpanProcessor::new(true); + let assert_handle = processor.assert_info(); + let provider = super::TracerProvider::new(TracerProviderInner { + processors: vec![Box::from(processor)], + config: Default::default(), + is_shutdown: AtomicBool::new(false), + }); + + // Create multiple providers sharing same inner + let provider2 = provider.clone(); + let provider3 = provider.clone(); + + // Shutdown explicitly first + assert!(provider.shutdown().is_ok()); + + // Drop all providers - should not trigger another shutdown in TracerProviderInner::drop + drop(provider); + drop(provider2); + drop(provider3); + + // Verify shutdown was called exactly once + assert!(assert_handle.0.is_shutdown.load(Ordering::SeqCst)); + } + + // Test 2: Not shutdown case + { + let processor = TestSpanProcessor::new(true); + let assert_handle = processor.assert_info(); + let provider = super::TracerProvider::new(TracerProviderInner { + processors: vec![Box::from(processor)], + config: Default::default(), + is_shutdown: AtomicBool::new(false), + }); + + // Create multiple providers sharing same inner + let provider2 = provider.clone(); + let provider3 = provider.clone(); + + // Drop providers without explicit shutdown + drop(provider); + drop(provider2); + + // Last drop should trigger shutdown in TracerProviderInner::drop + drop(provider3); + + // Verify shutdown was called exactly once + assert!(assert_handle.0.is_shutdown.load(Ordering::SeqCst)); + } + } } From c08c05530f4816f0852c21807e73eb807f71c15f Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 11 Oct 2024 15:36:43 -0700 Subject: [PATCH 2/9] user reusable shutdown --- opentelemetry-sdk/src/trace/provider.rs | 63 +++++++++++++------------ 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index dd5105db02..d8877cc56e 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -48,27 +48,52 @@ pub(crate) struct TracerProviderInner { is_shutdown: AtomicBool, } -impl Drop for TracerProviderInner { - fn drop(&mut self) { +impl TracerProviderInner { + /// Crate-private shutdown method to be called both from explicit shutdown + /// and from Drop when the last reference is released. + pub(crate) fn shutdown(&self) -> TraceResult<()> { if self .is_shutdown .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_ok() { + // propagate the shutdown signal to processors + // it's up to the processor to properly block new spans after shutdown + let mut errs = vec![]; for processor in &self.processors { if let Err(err) = processor.shutdown() { - global::handle_error(err); + errs.push(err); } } + + if errs.is_empty() { + Ok(()) + } else { + Err(TraceError::Other(format!("{errs:?}").into())) + } + } else { + Err(TraceError::Other( + "tracer provider already shut down".into(), + )) + } + } +} + +impl Drop for TracerProviderInner { + fn drop(&mut self) { + if let Err(err) = self.shutdown() { + global::handle_error(err); } } } /// Creator and registry of named [`Tracer`] instances. /// -/// `TracerProvider` is lightweight container holding pointers to `SpanProcessor` and other components. -/// Cloning and dropping them will not stop the span processing. To stop span processing, users -/// must either call `shutdown` method explicitly, or drop every clone of `TracerProvider`. +/// `TracerProvider` is a lightweight container holding pointers to `SpanProcessor` and other components. +/// Cloning a `TracerProvider` instance will not stop span processing. To stop span processing, users +/// must either call the `shutdown` method explicitly or allow the last reference to the `TracerProvider` +/// to be dropped. When the last reference is dropped, the shutdown process will be automatically triggered +/// to ensure proper cleanup. #[derive(Clone, Debug)] pub struct TracerProvider { inner: Arc, @@ -157,31 +182,7 @@ impl TracerProvider { /// /// Note that shut down doesn't means the TracerProvider has dropped pub fn shutdown(&self) -> TraceResult<()> { - if self - .inner - .is_shutdown - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - { - // propagate the shutdown signal to processors - // it's up to the processor to properly block new spans after shutdown - let mut errs = vec![]; - for processor in &self.inner.processors { - if let Err(err) = processor.shutdown() { - errs.push(err); - } - } - - if errs.is_empty() { - Ok(()) - } else { - Err(TraceError::Other(format!("{errs:?}").into())) - } - } else { - Err(TraceError::Other( - "tracer provider already shut down".into(), - )) - } + self.inner.shutdown() } } From f270dcdd101bb135fbece7d89951c1d1764205f0 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 14 Oct 2024 10:01:18 -0700 Subject: [PATCH 3/9] restructure TracerProviderInner::Drop, TracerProvider::Shutdown, and TracerProviderInner::Shutdown --- opentelemetry-sdk/src/trace/provider.rs | 142 ++++++++++++++++++------ opentelemetry/src/trace/mod.rs | 4 + 2 files changed, 109 insertions(+), 37 deletions(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index d8877cc56e..c2428557cc 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -1,13 +1,73 @@ -//! # Trace Provider SDK -//! -//! ## Tracer Creation -//! -//! New [`Tracer`] instances are always created through a [`TracerProvider`]. -//! -//! All configuration objects and extension points (span processors, -//! propagators) are provided by the [`TracerProvider`]. [`Tracer`] instances do -//! not duplicate this data to avoid that different [`Tracer`] instances -//! of the [`TracerProvider`] have different versions of these data. +/// # Trace Provider SDK +/// +/// The `TracerProvider` handles the creation and management of [`Tracer`] instances and coordinates +/// span processing. It serves as the central configuration point for tracing, ensuring consistency +/// across all [`Tracer`] instances it creates. +/// +/// ## Tracer Creation +/// +/// New [`Tracer`] instances are always created through a `TracerProvider`. These `Tracer`s share +/// a common configuration, which includes the [`Resource`], span processors, sampling strategies, +/// and span limits. This avoids the need for each `Tracer` to maintain its own version of these +/// configurations, ensuring uniform behavior across all instances. +/// +/// ## Cloning and Shutdown +/// +/// The `TracerProvider` is designed to be lightweight and clonable. Cloning a `TracerProvider` +/// creates a new reference to the same provider, not a new instance. Dropping the last reference +/// to the `TracerProvider` will automatically trigger its shutdown. During shutdown, the provider +/// will flush all remaining spans for **batch processors**, ensuring they are exported to the configured +/// exporters. However, **simple processors** do not require a flush, as they export spans immediately +/// when they end. Users can also manually trigger shutdown using the [`shutdown`](TracerProvider::shutdown) +/// method, which will ensure the same behavior (flushing for batch processors, but no additional action +/// for simple processors). +/// +/// Once shut down, the `TracerProvider` transitions into a disabled state. In this state, further +/// operations on its associated `Tracer` instances will result in no-ops, ensuring that no spans +/// are processed or exported after shutdown. +/// +/// ## Span Processing and Force Flush +/// +/// The `TracerProvider` manages the lifecycle of span processors, which are responsible for +/// collecting, processing, and exporting spans. To ensure all spans are processed before shutdown, +/// users can call the [`force_flush`](TracerProvider::force_flush) method at any time to trigger +/// an immediate flush of all pending spans for **batch processors** to the exporters. Note that +/// calling [`force_flush`](TracerProvider::force_flush) is optional before shutdown, as `shutdown` +/// will automatically trigger a flush for batch processors, but not for simple processors. +/// +/// # Examples +/// +/// ``` +/// use opentelemetry::global; +/// use opentelemetry_sdk::trace::TracerProvider; +/// +/// fn init_tracing() -> TracerProvider { +/// let provider = TracerProvider::default(); +/// +/// // Set the provider to be used globally +/// let _ = global::set_tracer_provider(provider.clone()); +/// +/// provider +/// } +/// +/// fn main() { +/// let provider = init_tracing(); +/// +/// // create spans... +/// +/// // Flush all spans before shutdown (optional for batch processors) +/// for result in provider.force_flush() { +/// if let Err(err) = result { +/// // Handle flush error... +/// } +/// } +/// +/// // Dropping the provider ensures remaining spans are flushed for batch processors +/// // and shuts down the global tracer provider. +/// drop(provider); +/// global::shutdown_tracer_provider(); +/// } +/// ``` use crate::runtime::RuntimeChannel; use crate::trace::{ BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer, @@ -16,7 +76,7 @@ use crate::{export::trace::SpanExporter, trace::SpanProcessor}; use crate::{InstrumentationLibrary, Resource}; use once_cell::sync::{Lazy, OnceCell}; use opentelemetry::trace::TraceError; -use opentelemetry::{global, trace::TraceResult}; +use opentelemetry::{otel_debug, trace::TraceResult}; use std::borrow::Cow; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -51,38 +111,31 @@ pub(crate) struct TracerProviderInner { impl TracerProviderInner { /// Crate-private shutdown method to be called both from explicit shutdown /// and from Drop when the last reference is released. - pub(crate) fn shutdown(&self) -> TraceResult<()> { - if self - .is_shutdown - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - { - // propagate the shutdown signal to processors - // it's up to the processor to properly block new spans after shutdown - let mut errs = vec![]; - for processor in &self.processors { - if let Err(err) = processor.shutdown() { - errs.push(err); - } - } - - if errs.is_empty() { - Ok(()) - } else { - Err(TraceError::Other(format!("{errs:?}").into())) + pub(crate) fn shutdown(&self) -> Vec { + let mut errs = vec![]; + for processor in &self.processors { + if let Err(err) = processor.shutdown() { + // Log at debug level because: + // - The error is also returned to the user for handling (if applicable) + // - Or the error occurs during `TracerProviderInner::Drop` as part of telemetry shutdown, + // which is non-actionable by the user + otel_debug!(name: "TracerProvider.Drop.ShutdownError", + error = format!("{err}")); + errs.push(err); } - } else { - Err(TraceError::Other( - "tracer provider already shut down".into(), - )) } + errs } } impl Drop for TracerProviderInner { fn drop(&mut self) { - if let Err(err) = self.shutdown() { - global::handle_error(err); + if !self.is_shutdown.load(Ordering::Relaxed) { + let _ = self.shutdown(); // errors are handled within shutdown + } else { + otel_debug!( + name: "TracerProviderProvider.Drop.AlreadyShutdown" + ); } } } @@ -182,7 +235,22 @@ impl TracerProvider { /// /// Note that shut down doesn't means the TracerProvider has dropped pub fn shutdown(&self) -> TraceResult<()> { - self.inner.shutdown() + if self + .inner + .is_shutdown + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + // propagate the shutdown signal to processors + let errs = self.inner.shutdown(); + if errs.is_empty() { + Ok(()) + } else { + Err(TraceError::Other(format!("{errs:?}").into())) + } + } else { + Err(TraceError::AlreadyShutdown("TracerProvider".to_string())) + } } } diff --git a/opentelemetry/src/trace/mod.rs b/opentelemetry/src/trace/mod.rs index 5e3edc3519..cfc0a96cde 100644 --- a/opentelemetry/src/trace/mod.rs +++ b/opentelemetry/src/trace/mod.rs @@ -200,6 +200,10 @@ pub enum TraceError { #[error("Exporting timed out after {} seconds", .0.as_secs())] ExportTimedOut(time::Duration), + /// already shutdown error + #[error("{0} already shutdown")] + AlreadyShutdown(String), + /// Other errors propagated from trace SDK that weren't covered above #[error(transparent)] Other(#[from] Box), From 1a3dd673e6b022b62bebaf9e489789b8c89a03b5 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 15 Oct 2024 18:33:13 -0700 Subject: [PATCH 4/9] Update opentelemetry-sdk/src/trace/provider.rs Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> --- opentelemetry-sdk/src/trace/provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index c2428557cc..6594f771e4 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -134,7 +134,7 @@ impl Drop for TracerProviderInner { let _ = self.shutdown(); // errors are handled within shutdown } else { otel_debug!( - name: "TracerProviderProvider.Drop.AlreadyShutdown" + name: "TracerProvider.Drop.AlreadyShutdown" ); } } From 36012dc764b0e236856dcd920e8d459a70406099 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 15 Oct 2024 18:34:45 -0700 Subject: [PATCH 5/9] Update opentelemetry-sdk/src/trace/provider.rs Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> --- opentelemetry-sdk/src/trace/provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 6594f771e4..c497cf0022 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -143,7 +143,7 @@ impl Drop for TracerProviderInner { /// Creator and registry of named [`Tracer`] instances. /// /// `TracerProvider` is a lightweight container holding pointers to `SpanProcessor` and other components. -/// Cloning a `TracerProvider` instance will not stop span processing. To stop span processing, users +/// Cloning a `TracerProvider` instance and dropping it will not stop span processing. To stop span processing, users /// must either call the `shutdown` method explicitly or allow the last reference to the `TracerProvider` /// to be dropped. When the last reference is dropped, the shutdown process will be automatically triggered /// to ensure proper cleanup. From 0d7d1abddc8d926f01b98439a113aa43cea356ea Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 17 Oct 2024 11:28:26 -0700 Subject: [PATCH 6/9] fix unit test --- opentelemetry-sdk/src/trace/provider.rs | 174 ++++++++++++++++-------- opentelemetry/src/trace/mod.rs | 4 +- 2 files changed, 120 insertions(+), 58 deletions(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index c497cf0022..5518ac3407 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -16,11 +16,9 @@ /// The `TracerProvider` is designed to be lightweight and clonable. Cloning a `TracerProvider` /// creates a new reference to the same provider, not a new instance. Dropping the last reference /// to the `TracerProvider` will automatically trigger its shutdown. During shutdown, the provider -/// will flush all remaining spans for **batch processors**, ensuring they are exported to the configured -/// exporters. However, **simple processors** do not require a flush, as they export spans immediately -/// when they end. Users can also manually trigger shutdown using the [`shutdown`](TracerProvider::shutdown) -/// method, which will ensure the same behavior (flushing for batch processors, but no additional action -/// for simple processors). +/// will flush all remaining spans, ensuring they are exported to the configured exporters. +/// Users can also manually trigger shutdown using the [`shutdown`](TracerProvider::shutdown) +/// method, which will ensure the same behavior. /// /// Once shut down, the `TracerProvider` transitions into a disabled state. In this state, further /// operations on its associated `Tracer` instances will result in no-ops, ensuring that no spans @@ -31,15 +29,14 @@ /// The `TracerProvider` manages the lifecycle of span processors, which are responsible for /// collecting, processing, and exporting spans. To ensure all spans are processed before shutdown, /// users can call the [`force_flush`](TracerProvider::force_flush) method at any time to trigger -/// an immediate flush of all pending spans for **batch processors** to the exporters. Note that -/// calling [`force_flush`](TracerProvider::force_flush) is optional before shutdown, as `shutdown` -/// will automatically trigger a flush for batch processors, but not for simple processors. +/// an immediate flush of all pending spans to the exporters. /// /// # Examples /// /// ``` /// use opentelemetry::global; /// use opentelemetry_sdk::trace::TracerProvider; +/// use opentelemetry::trace::Tracer; /// /// fn init_tracing() -> TracerProvider { /// let provider = TracerProvider::default(); @@ -53,19 +50,16 @@ /// fn main() { /// let provider = init_tracing(); /// -/// // create spans... +/// // create tracer.. +/// let tracer = global::tracer("example/client"); /// -/// // Flush all spans before shutdown (optional for batch processors) -/// for result in provider.force_flush() { -/// if let Err(err) = result { -/// // Handle flush error... -/// } -/// } +/// // create span... +/// let span = tracer +/// .span_builder("test_span") +/// .start(&tracer); /// -/// // Dropping the provider ensures remaining spans are flushed for batch processors -/// // and shuts down the global tracer provider. -/// drop(provider); -/// global::shutdown_tracer_provider(); +/// // Explicitly shut down the provider +/// provider.shutdown(); /// } /// ``` use crate::runtime::RuntimeChannel; @@ -249,7 +243,7 @@ impl TracerProvider { Err(TraceError::Other(format!("{errs:?}").into())) } } else { - Err(TraceError::AlreadyShutdown("TracerProvider".to_string())) + Err(TraceError::AlreadyShutdown) } } } @@ -391,6 +385,7 @@ mod tests { use std::env; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; + use std::sync::Mutex; // fields below is wrapped with Arc so we can assert it #[derive(Default, Debug)] @@ -648,57 +643,124 @@ mod tests { assert!(assert_handle.started_span_count(2)); } + #[derive(Debug)] + struct CountingShutdownProcessor { + shutdown_count: Arc>, + flush_called: Arc>, + } + + impl CountingShutdownProcessor { + fn new(shutdown_count: Arc>, flush_called: Arc>) -> Self { + CountingShutdownProcessor { + shutdown_count, + flush_called, + } + } + } + + impl SpanProcessor for CountingShutdownProcessor { + fn on_start(&self, _span: &mut Span, _cx: &Context) { + // No operation needed for this processor + } + + fn on_end(&self, _span: SpanData) { + // No operation needed for this processor + } + + fn force_flush(&self) -> TraceResult<()> { + *self.flush_called.lock().unwrap() = true; + Ok(()) + } + + fn shutdown(&self) -> TraceResult<()> { + let mut count = self.shutdown_count.lock().unwrap(); + *count += 1; + Ok(()) + } + } + #[test] - fn test_tracer_provider_inner_drop_shutdown() { - // Test 1: Already shutdown case + fn drop_test_with_multiple_providers() { + let shutdown_called = Arc::new(Mutex::new(0)); + let flush_called = Arc::new(Mutex::new(false)); + { - let processor = TestSpanProcessor::new(true); - let assert_handle = processor.assert_info(); - let provider = super::TracerProvider::new(TracerProviderInner { - processors: vec![Box::from(processor)], - config: Default::default(), + // Create a shared TracerProviderInner and use it across multiple providers + let shared_inner = Arc::new(TracerProviderInner { + processors: vec![Box::new(CountingShutdownProcessor::new( + shutdown_called.clone(), + flush_called.clone(), + ))], + config: Config::default(), is_shutdown: AtomicBool::new(false), }); - // Create multiple providers sharing same inner - let provider2 = provider.clone(); - let provider3 = provider.clone(); + { + let tracer_provider1 = super::TracerProvider { + inner: shared_inner.clone(), + }; + let tracer_provider2 = super::TracerProvider { + inner: shared_inner.clone(), + }; - // Shutdown explicitly first - assert!(provider.shutdown().is_ok()); + let tracer1 = tracer_provider1.tracer("test-tracer1"); + let tracer2 = tracer_provider2.tracer("test-tracer2"); - // Drop all providers - should not trigger another shutdown in TracerProviderInner::drop - drop(provider); - drop(provider2); - drop(provider3); + let _span1 = tracer1.start("span1"); + let _span2 = tracer2.start("span2"); - // Verify shutdown was called exactly once - assert!(assert_handle.0.is_shutdown.load(Ordering::SeqCst)); + // TracerProviderInner should not be dropped yet, since both providers and `shared_inner` + // are still holding a reference. + } + // At this point, both `tracer_provider1` and `tracer_provider2` are dropped, + // but `shared_inner` still holds a reference, so `TracerProviderInner` is NOT dropped yet. + assert_eq!(*shutdown_called.lock().unwrap(), 0); } + // Verify shutdown was called during the drop of the shared TracerProviderInner + assert_eq!(*shutdown_called.lock().unwrap(), 1); + // Verify flush was not called during drop + assert!(!*flush_called.lock().unwrap()); + } - // Test 2: Not shutdown case + #[test] + fn drop_after_shutdown_test_with_multiple_providers() { + let shutdown_called = Arc::new(Mutex::new(0)); // Count the number of times shutdown is called + let flush_called = Arc::new(Mutex::new(false)); + + // Create a shared TracerProviderInner and use it across multiple providers + let shared_inner = Arc::new(TracerProviderInner { + processors: vec![Box::new(CountingShutdownProcessor::new( + shutdown_called.clone(), + flush_called.clone(), + ))], + config: Config::default(), + is_shutdown: AtomicBool::new(false), + }); + + // Create a scope to test behavior when providers are dropped { - let processor = TestSpanProcessor::new(true); - let assert_handle = processor.assert_info(); - let provider = super::TracerProvider::new(TracerProviderInner { - processors: vec![Box::from(processor)], - config: Default::default(), - is_shutdown: AtomicBool::new(false), - }); + let tracer_provider1 = super::TracerProvider { + inner: shared_inner.clone(), + }; + let tracer_provider2 = super::TracerProvider { + inner: shared_inner.clone(), + }; - // Create multiple providers sharing same inner - let provider2 = provider.clone(); - let provider3 = provider.clone(); + // Explicitly shut down the tracer provider + let shutdown_result = tracer_provider1.shutdown(); + assert!(shutdown_result.is_ok()); - // Drop providers without explicit shutdown - drop(provider); - drop(provider2); + // Verify that shutdown was called exactly once + assert_eq!(*shutdown_called.lock().unwrap(), 1); - // Last drop should trigger shutdown in TracerProviderInner::drop - drop(provider3); + // TracerProvider2 should observe the shutdown state but not trigger another shutdown + let shutdown_result2 = tracer_provider2.shutdown(); + assert!(shutdown_result2.is_err()); - // Verify shutdown was called exactly once - assert!(assert_handle.0.is_shutdown.load(Ordering::SeqCst)); + // Both tracer providers will be dropped at the end of this scope } + + // Verify that shutdown was only called once, even after drop + assert_eq!(*shutdown_called.lock().unwrap(), 1); } } diff --git a/opentelemetry/src/trace/mod.rs b/opentelemetry/src/trace/mod.rs index cfc0a96cde..5dfb47244e 100644 --- a/opentelemetry/src/trace/mod.rs +++ b/opentelemetry/src/trace/mod.rs @@ -201,8 +201,8 @@ pub enum TraceError { ExportTimedOut(time::Duration), /// already shutdown error - #[error("{0} already shutdown")] - AlreadyShutdown(String), + #[error("TracerProvider already shutdown")] + AlreadyShutdown, /// Other errors propagated from trace SDK that weren't covered above #[error(transparent)] From 2279f3a741129108e79ecaef7bfe630f1862b8ad Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 17 Oct 2024 14:55:27 -0700 Subject: [PATCH 7/9] update doc --- opentelemetry-sdk/src/trace/provider.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 5518ac3407..3985846296 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -16,7 +16,7 @@ /// The `TracerProvider` is designed to be lightweight and clonable. Cloning a `TracerProvider` /// creates a new reference to the same provider, not a new instance. Dropping the last reference /// to the `TracerProvider` will automatically trigger its shutdown. During shutdown, the provider -/// will flush all remaining spans, ensuring they are exported to the configured exporters. +/// will flush all remaining spans, ensuring they are passed to the configured exporters. /// Users can also manually trigger shutdown using the [`shutdown`](TracerProvider::shutdown) /// method, which will ensure the same behavior. /// @@ -27,9 +27,9 @@ /// ## Span Processing and Force Flush /// /// The `TracerProvider` manages the lifecycle of span processors, which are responsible for -/// collecting, processing, and exporting spans. To ensure all spans are processed before shutdown, -/// users can call the [`force_flush`](TracerProvider::force_flush) method at any time to trigger -/// an immediate flush of all pending spans to the exporters. +/// collecting, processing, and exporting spans. The [`force_flush`](TracerProvider::force_flush) method +/// invoked at any time will trigger an immediate flush of all pending spans (if any) to the exporters. +/// This will block the user thread till all the spans are passed to exporters. /// /// # Examples /// From 21fa84e9a530c7136b1ed5785fc3b07ed9cfac43 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 17 Oct 2024 15:12:09 -0700 Subject: [PATCH 8/9] update to atomics --- opentelemetry-sdk/src/trace/provider.rs | 34 ++++++++++++------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 3985846296..2e34a76160 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -385,7 +385,6 @@ mod tests { use std::env; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; - use std::sync::Mutex; // fields below is wrapped with Arc so we can assert it #[derive(Default, Debug)] @@ -645,12 +644,12 @@ mod tests { #[derive(Debug)] struct CountingShutdownProcessor { - shutdown_count: Arc>, - flush_called: Arc>, + shutdown_count: Arc, + flush_called: Arc, } impl CountingShutdownProcessor { - fn new(shutdown_count: Arc>, flush_called: Arc>) -> Self { + fn new(shutdown_count: Arc, flush_called: Arc) -> Self { CountingShutdownProcessor { shutdown_count, flush_called, @@ -668,27 +667,26 @@ mod tests { } fn force_flush(&self) -> TraceResult<()> { - *self.flush_called.lock().unwrap() = true; + self.flush_called.store(true, Ordering::SeqCst); Ok(()) } fn shutdown(&self) -> TraceResult<()> { - let mut count = self.shutdown_count.lock().unwrap(); - *count += 1; + self.shutdown_count.fetch_add(1, Ordering::SeqCst); Ok(()) } } #[test] fn drop_test_with_multiple_providers() { - let shutdown_called = Arc::new(Mutex::new(0)); - let flush_called = Arc::new(Mutex::new(false)); + let shutdown_count = Arc::new(AtomicU32::new(0)); + let flush_called = Arc::new(AtomicBool::new(false)); { // Create a shared TracerProviderInner and use it across multiple providers let shared_inner = Arc::new(TracerProviderInner { processors: vec![Box::new(CountingShutdownProcessor::new( - shutdown_called.clone(), + shutdown_count.clone(), flush_called.clone(), ))], config: Config::default(), @@ -714,23 +712,23 @@ mod tests { } // At this point, both `tracer_provider1` and `tracer_provider2` are dropped, // but `shared_inner` still holds a reference, so `TracerProviderInner` is NOT dropped yet. - assert_eq!(*shutdown_called.lock().unwrap(), 0); + assert_eq!(shutdown_count.load(Ordering::SeqCst), 0); } // Verify shutdown was called during the drop of the shared TracerProviderInner - assert_eq!(*shutdown_called.lock().unwrap(), 1); + assert_eq!(shutdown_count.load(Ordering::SeqCst), 1); // Verify flush was not called during drop - assert!(!*flush_called.lock().unwrap()); + assert!(!flush_called.load(Ordering::SeqCst)); } #[test] fn drop_after_shutdown_test_with_multiple_providers() { - let shutdown_called = Arc::new(Mutex::new(0)); // Count the number of times shutdown is called - let flush_called = Arc::new(Mutex::new(false)); + let shutdown_count = Arc::new(AtomicU32::new(0)); + let flush_called = Arc::new(AtomicBool::new(false)); // Create a shared TracerProviderInner and use it across multiple providers let shared_inner = Arc::new(TracerProviderInner { processors: vec![Box::new(CountingShutdownProcessor::new( - shutdown_called.clone(), + shutdown_count.clone(), flush_called.clone(), ))], config: Config::default(), @@ -751,7 +749,7 @@ mod tests { assert!(shutdown_result.is_ok()); // Verify that shutdown was called exactly once - assert_eq!(*shutdown_called.lock().unwrap(), 1); + assert_eq!(shutdown_count.load(Ordering::SeqCst), 1); // TracerProvider2 should observe the shutdown state but not trigger another shutdown let shutdown_result2 = tracer_provider2.shutdown(); @@ -761,6 +759,6 @@ mod tests { } // Verify that shutdown was only called once, even after drop - assert_eq!(*shutdown_called.lock().unwrap(), 1); + assert_eq!(shutdown_count.load(Ordering::SeqCst), 1); } } From 6ea4a63654e8147385c88ac0a0b76a4f88a279fc Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 18 Oct 2024 12:03:29 -0700 Subject: [PATCH 9/9] Update opentelemetry-sdk/src/trace/provider.rs Co-authored-by: Cijo Thomas --- opentelemetry-sdk/src/trace/provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 2e34a76160..eef35aaa9b 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -16,7 +16,7 @@ /// The `TracerProvider` is designed to be lightweight and clonable. Cloning a `TracerProvider` /// creates a new reference to the same provider, not a new instance. Dropping the last reference /// to the `TracerProvider` will automatically trigger its shutdown. During shutdown, the provider -/// will flush all remaining spans, ensuring they are passed to the configured exporters. +/// will flush all remaining spans, ensuring they are passed to the configured processors. /// Users can also manually trigger shutdown using the [`shutdown`](TracerProvider::shutdown) /// method, which will ensure the same behavior. ///