From d45453696d1e3a1430d64c46416860342d9fcbf0 Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 1 Dec 2024 02:26:13 +0000 Subject: [PATCH 01/19] initial commit --- opentelemetry-sdk/src/export/logs/mod.rs | 7 +- opentelemetry-sdk/src/logs/log_processor.rs | 147 +++++++++----------- 2 files changed, 67 insertions(+), 87 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index a2b0c98729..88d63c7860 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -2,7 +2,6 @@ use crate::logs::LogRecord; use crate::logs::{LogError, LogResult}; use crate::Resource; -use async_trait::async_trait; #[cfg(feature = "spec_unstable_logs_enabled")] use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; @@ -63,7 +62,6 @@ impl LogBatch<'_> { } /// `LogExporter` defines the interface that log exporters should implement. -#[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of log records and their associated instrumentation scopes. /// @@ -82,7 +80,10 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>; + fn export<'a>( + &mut self, + batch: LogBatch<'a>, + ) -> impl std::future::Future> + Send; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index df79ce3f28..7d32e01df8 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -13,7 +13,7 @@ use futures_util::{ use opentelemetry::logs::Severity; use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::AtomicBool; use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, @@ -77,13 +77,13 @@ pub trait LogProcessor: Send + Sync + Debug { /// debugging and testing. For scenarios requiring higher /// performance/throughput, consider using [BatchLogProcessor]. #[derive(Debug)] -pub struct SimpleLogProcessor { - exporter: Mutex>, +pub struct SimpleLogProcessor { + exporter: Mutex, is_shutdown: AtomicBool, } -impl SimpleLogProcessor { - pub(crate) fn new(exporter: Box) -> Self { +impl SimpleLogProcessor { + pub(crate) fn new(exporter: T) -> Self { SimpleLogProcessor { exporter: Mutex::new(exporter), is_shutdown: AtomicBool::new(false), @@ -91,7 +91,7 @@ impl SimpleLogProcessor { } } -impl LogProcessor for SimpleLogProcessor { +impl LogProcessor for SimpleLogProcessor { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { @@ -154,12 +154,6 @@ impl LogProcessor for SimpleLogProcessor { /// them at a pre-configured interval. pub struct BatchLogProcessor { message_sender: R::Sender, - - // Track dropped logs - we'll log this at shutdown - dropped_logs_count: AtomicUsize, - - // Track the maximum queue size that was configured for this processor - max_queue_size: usize, } impl Debug for BatchLogProcessor { @@ -178,13 +172,11 @@ impl LogProcessor for BatchLogProcessor { ))); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. - if result.is_err() { - // Increment dropped logs count. The first time we have to drop a log, - // emit a warning. - if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 { - otel_warn!(name: "BatchLogProcessor.LogDroppingStarted", - message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped."); - } + if let Err(err) = result { + otel_error!( + name: "BatchLogProcessor.Export.Error", + error = format!("{}", err) + ); } } @@ -200,17 +192,6 @@ impl LogProcessor for BatchLogProcessor { } fn shutdown(&self) -> LogResult<()> { - let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); - let max_queue_size = self.max_queue_size; - if dropped_logs > 0 { - otel_warn!( - name: "BatchLogProcessor.LogsDropped", - dropped_logs_count = dropped_logs, - max_queue_size = max_queue_size, - message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." - ); - } - let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessage::Shutdown(res_sender)) @@ -230,11 +211,14 @@ impl LogProcessor for BatchLogProcessor { } impl BatchLogProcessor { - pub(crate) fn new(mut exporter: Box, config: BatchConfig, runtime: R) -> Self { + pub(crate) fn new(mut exporter: E, config: BatchConfig, runtime: R) -> Self + where + E: LogExporter + Send + Sync + 'static, + { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); + //let exporter = Arc::new(Mutex::new(exporter)); let inner_runtime = runtime.clone(); - let max_queue_size = config.max_queue_size; // Spawn worker process via user-defined spawn function. runtime.spawn(Box::pin(async move { @@ -256,7 +240,8 @@ impl BatchLogProcessor { if logs.len() == config.max_export_batch_size { let result = export_with_timeout( config.max_export_timeout, - exporter.as_mut(), + &mut exporter, + // Arc::clone(&exporter ), &timeout_runtime, logs.split_off(0), ) @@ -274,7 +259,8 @@ impl BatchLogProcessor { BatchMessage::Flush(res_channel) => { let result = export_with_timeout( config.max_export_timeout, - exporter.as_mut(), + //Arc::clone(&exporter ), + &mut exporter, &timeout_runtime, logs.split_off(0), ) @@ -293,7 +279,8 @@ impl BatchLogProcessor { BatchMessage::Shutdown(ch) => { let result = export_with_timeout( config.max_export_timeout, - exporter.as_mut(), + //Arc::clone(&exporter ), + &mut exporter, &timeout_runtime, logs.split_off(0), ) @@ -316,13 +303,8 @@ impl BatchLogProcessor { } } })); - // Return batch processor with link to worker - BatchLogProcessor { - message_sender, - dropped_logs_count: AtomicUsize::new(0), - max_queue_size, - } + BatchLogProcessor { message_sender } } /// Create a new batch processor builder @@ -338,8 +320,9 @@ impl BatchLogProcessor { } } -async fn export_with_timeout( +async fn export_with_timeout( time_out: Duration, + //exporter: Arc>, exporter: &mut E, runtime: &R, batch: Vec<(LogRecord, InstrumentationScope)>, @@ -528,7 +511,7 @@ where /// Build a batch processor pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime) + BatchLogProcessor::new(self.exporter, self.config, self.runtime) } } @@ -586,8 +569,11 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> { - Ok(()) + fn export( + &mut self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async { Ok(()) } } fn shutdown(&mut self) {} @@ -764,7 +750,7 @@ mod tests { let exporter = MockLogExporter { resource: Arc::new(Mutex::new(None)), }; - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let _ = LoggerProvider::builder() .with_log_processor(processor) .with_resource(Resource::new(vec![ @@ -783,11 +769,8 @@ mod tests { let exporter = MockLogExporter { resource: Arc::new(Mutex::new(None)), }; - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::Tokio, - ); + let processor = + BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); let provider = LoggerProvider::builder() .with_log_processor(processor) .with_resource(Resource::new(vec![ @@ -810,11 +793,8 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default() .keep_records_on_shutdown() .build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::Tokio, - ); + let processor = + BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); let mut record = LogRecord::default(); let instrumentation = InstrumentationScope::default(); @@ -832,7 +812,7 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default() .keep_records_on_shutdown() .build(); - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let mut record: LogRecord = Default::default(); let instrumentation: InstrumentationScope = Default::default(); @@ -855,11 +835,8 @@ mod tests { #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::Tokio, - ); + let processor = + BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); // // deadloack happens in shutdown with tokio current_thread runtime @@ -871,7 +848,7 @@ mod tests { async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::TokioCurrentThread, ); @@ -882,11 +859,8 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::Tokio, - ); + let processor = + BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); processor.shutdown().unwrap(); } @@ -895,7 +869,7 @@ mod tests { async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::TokioCurrentThread, ); @@ -1013,7 +987,7 @@ mod tests { #[test] fn test_simple_processor_sync_exporter_without_runtime() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let mut record: LogRecord = Default::default(); let instrumentation: InstrumentationScope = Default::default(); @@ -1026,7 +1000,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_simple_processor_sync_exporter_with_runtime() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let mut record: LogRecord = Default::default(); let instrumentation: InstrumentationScope = Default::default(); @@ -1039,7 +1013,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = Arc::new(SimpleLogProcessor::new(Box::new(exporter.clone()))); + let processor = Arc::new(SimpleLogProcessor::new(exporter.clone())); let mut handles = vec![]; for _ in 0..10 { @@ -1062,7 +1036,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn test_simple_processor_sync_exporter_with_current_thread_runtime() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let mut record: LogRecord = Default::default(); let instrumentation: InstrumentationScope = Default::default(); @@ -1091,16 +1065,21 @@ mod tests { } } - #[async_trait::async_trait] impl LogExporter for LogExporterThatRequiresTokio { - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { + fn export( + &mut self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration - tokio::time::sleep(Duration::from_millis(50)).await; + let export_count = Arc::clone(&self.export_count); + async move { + tokio::time::sleep(Duration::from_millis(50)).await; - for _ in batch.iter() { - self.export_count.fetch_add(1, Ordering::Acquire); + for _ in batch.iter() { + export_count.fetch_add(1, Ordering::Acquire); + } + Ok(()) } - Ok(()) } } @@ -1109,7 +1088,7 @@ mod tests { // Use `catch_unwind` to catch the panic caused by missing Tokio runtime let result = std::panic::catch_unwind(|| { let exporter = LogExporterThatRequiresTokio::new(); - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let mut record: LogRecord = Default::default(); let instrumentation: InstrumentationScope = Default::default(); @@ -1158,7 +1137,7 @@ mod tests { // tasks nor the exporter can proceed. async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() { let exporter = LogExporterThatRequiresTokio::new(); - let processor = Arc::new(SimpleLogProcessor::new(Box::new(exporter.clone()))); + let processor = Arc::new(SimpleLogProcessor::new(exporter.clone())); let concurrent_emit = 4; // number of worker threads @@ -1189,7 +1168,7 @@ mod tests { // tasks occupy the runtime. async fn test_simple_processor_async_exporter_with_runtime() { let exporter = LogExporterThatRequiresTokio::new(); - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let mut record: LogRecord = Default::default(); let instrumentation: InstrumentationScope = Default::default(); @@ -1208,7 +1187,7 @@ mod tests { async fn test_simple_processor_async_exporter_with_multi_thread_runtime() { let exporter = LogExporterThatRequiresTokio::new(); - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let mut record: LogRecord = Default::default(); let instrumentation: InstrumentationScope = Default::default(); @@ -1228,7 +1207,7 @@ mod tests { async fn test_simple_processor_async_exporter_with_current_thread_runtime() { let exporter = LogExporterThatRequiresTokio::new(); - let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let processor = SimpleLogProcessor::new(exporter.clone()); let mut record: LogRecord = Default::default(); let instrumentation: InstrumentationScope = Default::default(); From 3ed482d8f30c3517160d6c8a094e126f83b90c4b Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 1 Dec 2024 06:15:19 +0000 Subject: [PATCH 02/19] futher changes.. --- opentelemetry-sdk/Cargo.toml | 2 +- opentelemetry-sdk/src/logs/log_emitter.rs | 2 +- opentelemetry-sdk/src/logs/log_processor.rs | 7 +-- opentelemetry-sdk/src/logs/mod.rs | 2 +- .../src/testing/logs/in_memory_exporter.rs | 25 +++++----- opentelemetry-stdout/src/logs/exporter.rs | 46 +++++++++++-------- 6 files changed, 47 insertions(+), 37 deletions(-) diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index f140f9702b..9d1259001c 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -44,7 +44,7 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] } default = ["trace", "metrics", "logs", "internal-logs"] trace = ["opentelemetry/trace", "rand", "percent-encoding"] jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"] -logs = ["opentelemetry/logs", "async-trait", "serde_json"] +logs = ["opentelemetry/logs", "serde_json"] spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"] metrics = ["opentelemetry/metrics", "glob", "async-trait"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index efa8c42641..bb4b196bcf 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -188,7 +188,7 @@ impl Builder { /// The `LogExporter` that this provider should use. pub fn with_simple_exporter(self, exporter: T) -> Self { let mut processors = self.processors; - processors.push(Box::new(SimpleLogProcessor::new(Box::new(exporter)))); + processors.push(Box::new(SimpleLogProcessor::new(exporter))); Builder { processors, ..self } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 7d32e01df8..35aa1010a6 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -552,7 +552,6 @@ mod tests { testing::logs::InMemoryLogExporter, Resource, }; - use async_trait::async_trait; use opentelemetry::logs::AnyValue; use opentelemetry::logs::LogRecord as _; use opentelemetry::logs::{Logger, LoggerProvider as _}; @@ -567,12 +566,11 @@ mod tests { resource: Arc>>, } - #[async_trait] impl LogExporter for MockLogExporter { fn export( &mut self, _batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send { + ) -> impl std::future::Future> + Send + '_ { async { Ok(()) } } @@ -1071,12 +1069,11 @@ mod tests { batch: LogBatch<'_>, ) -> impl std::future::Future> + Send { // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration - let export_count = Arc::clone(&self.export_count); async move { tokio::time::sleep(Duration::from_millis(50)).await; for _ in batch.iter() { - export_count.fetch_add(1, Ordering::Acquire); + self.export_count.fetch_add(1, Ordering::Acquire); } Ok(()) } diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index d1212bbdeb..1d91af3ee4 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -49,7 +49,7 @@ mod tests { let exporter: InMemoryLogExporter = InMemoryLogExporter::default(); let logger_provider = LoggerProvider::builder() .with_resource(resource.clone()) - .with_log_processor(SimpleLogProcessor::new(Box::new(exporter.clone()))) + .with_log_processor(SimpleLogProcessor::new(exporter.clone())) .build(); // Act diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index daafd47fc0..ce3cf5c37b 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -2,7 +2,6 @@ use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; use crate::logs::{LogError, LogResult}; use crate::Resource; -use async_trait::async_trait; use opentelemetry::InstrumentationScope; use std::borrow::Cow; use std::sync::{Arc, Mutex}; @@ -182,18 +181,22 @@ impl InMemoryLogExporter { } } -#[async_trait] impl LogExporter for InMemoryLogExporter { - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { - let mut logs_guard = self.logs.lock().map_err(LogError::from)?; - for (log_record, instrumentation) in batch.iter() { - let owned_log = OwnedLogData { - record: (*log_record).clone(), - instrumentation: (*instrumentation).clone(), - }; - logs_guard.push(owned_log); + fn export( + &mut self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async move { + let mut logs_guard = self.logs.lock().map_err(LogError::from)?; + for (log_record, instrumentation) in batch.iter() { + let owned_log = OwnedLogData { + record: (*log_record).clone(), + instrumentation: (*instrumentation).clone(), + }; + logs_guard.push(owned_log); + } + Ok(()) } - Ok(()) } fn shutdown(&mut self) { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index c5edb880bf..9dd2ab335f 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -5,6 +5,7 @@ use opentelemetry_sdk::export::logs::LogBatch; use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::Resource; use std::sync::atomic; +use std::sync::{Arc, Mutex}; /// An OpenTelemetry exporter that writes Logs to stdout on export. pub struct LogExporter { @@ -29,30 +30,39 @@ impl fmt::Debug for LogExporter { } } -#[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { - if self.is_shutdown.load(atomic::Ordering::SeqCst) { - return Err("exporter is shut down".into()); - } else { - println!("Logs"); - if self.resource_emitted { - print_logs(batch); + fn export( + &mut self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + let is_shutdown = self.is_shutdown.load(atomic::Ordering::SeqCst); + let resource_emitted_arc = Arc::new(Mutex::new(self.resource_emitted)); + let resource_emitted_arc_clone = Arc::clone(&resource_emitted_arc); + let resource = self.resource.clone(); + async move { + if is_shutdown { + Err("exporter is shut down".into()) } else { - self.resource_emitted = true; - println!("Resource"); - if let Some(schema_url) = self.resource.schema_url() { - println!("\t Resource SchemaUrl: {:?}", schema_url); + println!("Logs"); + let mut resource_emitted_guard = resource_emitted_arc_clone.lock().unwrap(); + if *resource_emitted_guard { + print_logs(batch); + } else { + println!("Resource"); + if let Some(schema_url) = resource.schema_url() { + println!("\t Resource SchemaUrl: {:?}", schema_url); + } + resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + + print_logs(batch); + *resource_emitted_guard = true; } - self.resource.iter().for_each(|(k, v)| { - println!("\t -> {}={:?}", k, v); - }); - print_logs(batch); + Ok(()) } - - Ok(()) } } From a2aa648c5ec7191b17236f7566d9ecfcab6ec456 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 2 Dec 2024 07:07:22 +0000 Subject: [PATCH 03/19] changes.. --- opentelemetry-sdk/src/export/logs/mod.rs | 7 +++-- opentelemetry-sdk/src/logs/log_processor.rs | 23 +++++++-------- .../src/testing/logs/in_memory_exporter.rs | 8 +++--- opentelemetry-stdout/src/logs/exporter.rs | 28 ++++++++----------- 4 files changed, 31 insertions(+), 35 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 88d63c7860..707cb524e4 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -81,9 +81,10 @@ pub trait LogExporter: Send + Sync + Debug { /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// fn export<'a>( - &mut self, - batch: LogBatch<'a>, - ) -> impl std::future::Future> + Send; + &'a mut self, + batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a; + /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 35aa1010a6..a0d930e8f1 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -108,7 +108,8 @@ impl LogProcessor for SimpleLogProcessor { .map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into())) .and_then(|mut exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; - futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) + let log_batch = LogBatch::new(log_tuple); + futures_executor::block_on(exporter.export(&log_batch)) }); // Handle errors with specific static names match result { @@ -217,7 +218,6 @@ impl BatchLogProcessor { { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); - //let exporter = Arc::new(Mutex::new(exporter)); let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. @@ -340,7 +340,8 @@ where .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); - let export = exporter.export(LogBatch::new(log_vec.as_slice())); + let log_batch = LogBatch::new(log_vec.as_slice()); + let export = exporter.export(&log_batch); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -567,10 +568,10 @@ mod tests { } impl LogExporter for MockLogExporter { - fn export( - &mut self, - _batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send + '_ { + fn export<'a>( + &'a mut self, + _batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { async { Ok(()) } } @@ -1064,10 +1065,10 @@ mod tests { } impl LogExporter for LogExporterThatRequiresTokio { - fn export( - &mut self, - batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send { + fn export<'a>( + &'a mut self, + batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration async move { tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index ce3cf5c37b..53500d6908 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -182,10 +182,10 @@ impl InMemoryLogExporter { } impl LogExporter for InMemoryLogExporter { - fn export( - &mut self, - batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send { + fn export<'a>( + &'a mut self, + batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { async move { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for (log_record, instrumentation) in batch.iter() { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 9dd2ab335f..2c70fd51a3 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -32,33 +32,27 @@ impl fmt::Debug for LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - fn export( - &mut self, - batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send { - let is_shutdown = self.is_shutdown.load(atomic::Ordering::SeqCst); - let resource_emitted_arc = Arc::new(Mutex::new(self.resource_emitted)); - let resource_emitted_arc_clone = Arc::clone(&resource_emitted_arc); - let resource = self.resource.clone(); + fn export<'a>( + &'a mut self, + batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { async move { - if is_shutdown { - Err("exporter is shut down".into()) + if self.is_shutdown.load(atomic::Ordering::SeqCst) { + return Err("exporter is shut down".into()); } else { println!("Logs"); - let mut resource_emitted_guard = resource_emitted_arc_clone.lock().unwrap(); - if *resource_emitted_guard { + if self.resource_emitted { print_logs(batch); } else { + self.resource_emitted = true; println!("Resource"); - if let Some(schema_url) = resource.schema_url() { + if let Some(schema_url) = self.resource.schema_url() { println!("\t Resource SchemaUrl: {:?}", schema_url); } - resource.iter().for_each(|(k, v)| { + self.resource.iter().for_each(|(k, v)| { println!("\t -> {}={:?}", k, v); }); - print_logs(batch); - *resource_emitted_guard = true; } Ok(()) @@ -75,7 +69,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } -fn print_logs(batch: LogBatch<'_>) { +fn print_logs(batch: &LogBatch<'_>) { for (i, log) in batch.iter().enumerate() { println!("Log #{}", i); let (record, _library) = log; From 8ba1173697662339199aef84425bf99048c14895 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 2 Dec 2024 18:11:27 +0000 Subject: [PATCH 04/19] initial change --- opentelemetry-appender-tracing/src/layer.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 3b168807fa..0352643d52 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -247,11 +247,16 @@ mod tests { #[async_trait] impl LogExporter for ReentrantLogExporter { - async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> { - // This will cause a deadlock as the export itself creates a log - // while still within the lock of the SimpleLogProcessor. - warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io"); - Ok(()) + fn export<'a>( + &'a mut self, + _batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { + async { + // This will cause a deadlock as the export itself creates a log + // while still within the lock of the SimpleLogProcessor. + warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io"); + Ok(()) + } } } From 37823e7370982d552ce69c62d670aec675e191fb Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 2 Dec 2024 19:17:21 +0000 Subject: [PATCH 05/19] stdout exporter --- opentelemetry-stdout/src/logs/exporter.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 2c70fd51a3..840157852e 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,11 +1,9 @@ -use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::fmt; use opentelemetry_sdk::export::logs::LogBatch; use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::Resource; use std::sync::atomic; -use std::sync::{Arc, Mutex}; /// An OpenTelemetry exporter that writes Logs to stdout on export. pub struct LogExporter { From 3b4cd571184272b909e19e6eaefa0bff77c73d9b Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 2 Dec 2024 19:43:56 +0000 Subject: [PATCH 06/19] update stress --- stress/Cargo.toml | 1 + stress/src/logs.rs | 59 ++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 6d3dd4dd1f..49af173bd7 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -52,6 +52,7 @@ tracing-subscriber = { workspace = true, features = ["registry", "std"] } num-format = "0.4.4" sysinfo = { version = "0.32", optional = true } libc = "=0.2.164" # https://github.com/GuillaumeGomez/sysinfo/issues/1392 +futures-executor = { workspace = true } [features] stats = ["sysinfo"] diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 1ede37b1ee..82a1c90a61 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -11,36 +11,75 @@ use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer; -use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider}; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::logs::{LogProcessor, LogResult, LoggerProvider}; +use std::{ + os::unix::process, + sync::{Arc, Mutex}, +}; use tracing::error; use tracing_subscriber::prelude::*; mod throughput; +#[derive(Clone, Debug, Default)] +pub struct SimpleExporter; + +impl LogExporter for SimpleExporter { + fn export<'a>( + &'a mut self, + _batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { + async { Ok(()) } + } +} + #[derive(Debug)] -pub struct NoOpLogProcessor; +pub struct NoOpLogProcessor { + exporter: Arc>, +} + +impl NoOpLogProcessor { + pub fn new(exporter: SimpleExporter) -> Self { + Self { + exporter: Arc::new(Mutex::new(exporter)), + } + } +} impl LogProcessor for NoOpLogProcessor { - fn emit( - &self, - _record: &mut opentelemetry_sdk::logs::LogRecord, - _scope: &InstrumentationScope, - ) { + fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) { + let log_tuple = &[(record as &opentelemetry_sdk::logs::LogRecord, scope)]; + let log_batch = LogBatch::new(log_tuple); + + // Access the exporter using the Mutex + if let Ok(mut exporter) = self.exporter.lock() { + match futures_executor::block_on(exporter.export(&log_batch)) { + Err(err) => { + eprintln!("Error: Export failed in NoOpLogProcessor. Error: {:?}", err); + } + _ => {} + } + } else { + eprintln!("Error: Failed to acquire lock on exporter."); + } } - fn force_flush(&self) -> opentelemetry_sdk::logs::LogResult<()> { + fn force_flush(&self) -> LogResult<()> { Ok(()) } - fn shutdown(&self) -> opentelemetry_sdk::logs::LogResult<()> { + fn shutdown(&self) -> LogResult<()> { Ok(()) } } fn main() { + let exporter = SimpleExporter::default(); + let processor = NoOpLogProcessor::new(exporter); // LoggerProvider with a no-op processor. let provider: LoggerProvider = LoggerProvider::builder() - .with_log_processor(NoOpLogProcessor {}) + .with_log_processor(processor) .build(); // Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing. From 8cacf52fa3bef8ee05c6dd0d4c8846d59f13000d Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 6 Dec 2024 23:48:05 +0000 Subject: [PATCH 07/19] fix otlp --- .../benches/logs.rs | 20 +++-- opentelemetry-appender-tracing/src/layer.rs | 1 + opentelemetry-otlp/src/exporter/http/logs.rs | 76 ++++++++++--------- opentelemetry-otlp/src/exporter/http/mod.rs | 8 +- opentelemetry-otlp/src/exporter/tonic/logs.rs | 54 +++++++------ opentelemetry-otlp/src/exporter/tonic/mod.rs | 4 +- opentelemetry-otlp/src/logs.rs | 46 +++++++++-- opentelemetry-proto/src/transform/logs.rs | 6 +- opentelemetry-sdk/src/logs/log_processor.rs | 2 + .../src/testing/logs/in_memory_exporter.rs | 1 + opentelemetry-stdout/src/logs/exporter.rs | 33 ++++---- stress/src/logs.rs | 22 +----- 12 files changed, 155 insertions(+), 118 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 3a3887cc67..59ed99f7b5 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -34,8 +34,12 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export(&self, _: LogBatch<'_>) -> LogResult<()> { - LogResult::Ok(()) + #[allow(clippy::manual_async_fn)] + fn export<'a>( + &'a self, + _batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { + async { LogResult::Ok(()) } } fn event_enabled(&self, _: opentelemetry::logs::Severity, _: &str, _: &str) -> bool { @@ -44,17 +48,17 @@ impl LogExporter for NoopExporter { } #[derive(Debug)] -struct NoopProcessor { - exporter: Box, +struct NoopProcessor { + exporter: E, } -impl NoopProcessor { - fn new(exporter: Box) -> Self { +impl NoopProcessor { + fn new(exporter: E) -> Self { Self { exporter } } } -impl LogProcessor for NoopProcessor { +impl LogProcessor for NoopProcessor { fn emit(&self, _: &mut LogRecord, _: &InstrumentationScope) { // no-op } @@ -124,7 +128,7 @@ fn benchmark_no_subscriber(c: &mut Criterion) { fn benchmark_with_ot_layer(c: &mut Criterion, enabled: bool, bench_name: &str) { let exporter = NoopExporter { enabled }; - let processor = NoopProcessor::new(Box::new(exporter)); + let processor = NoopProcessor::new(exporter); let provider = LoggerProvider::builder() .with_resource(Resource::new(vec![KeyValue::new( "service.name", diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 796c249bf9..c6255dfb85 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -247,6 +247,7 @@ mod tests { #[async_trait] impl LogExporter for ReentrantLogExporter { + #[allow(clippy::manual_async_fn)] fn export<'a>( &'a self, _batch: &'a LogBatch<'a>, diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 24bb667129..05f04300d4 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -9,42 +9,48 @@ use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - let client = self - .client - .lock() - .map_err(|e| LogError::Other(e.to_string().into())) - .and_then(|g| match &*g { - Some(client) => Ok(Arc::clone(client)), - _ => Err(LogError::Other("exporter is already shut down".into())), - })?; - - let (body, content_type) = { self.build_logs_export_body(batch)? }; - let mut request = http::Request::builder() - .method(Method::POST) - .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) - .body(body) - .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; - - for (k, v) in &self.headers { - request.headers_mut().insert(k.clone(), v.clone()); + #[allow(clippy::manual_async_fn)] + fn export<'a>( + &'a self, + batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { + async move { + let client = self + .client + .lock() + .map_err(|e| LogError::Other(e.to_string().into())) + .and_then(|g| match &*g { + Some(client) => Ok(Arc::clone(client)), + _ => Err(LogError::Other("exporter is already shut down".into())), + })?; + + let (body, content_type) = { self.build_logs_export_body(batch)? }; + let mut request = http::Request::builder() + .method(Method::POST) + .uri(&self.collector_endpoint) + .header(CONTENT_TYPE, content_type) + .body(body) + .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; + + for (k, v) in &self.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } + + let request_uri = request.uri().to_string(); + let response = client.send(request).await?; + + if !response.status().is_success() { + let error = format!( + "OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}", + response.status().as_u16(), + request_uri, + response.body() + ); + return Err(LogError::Other(error.into())); + } + + Ok(()) } - - let request_uri = request.uri().to_string(); - let response = client.send(request).await?; - - if !response.status().is_success() { - let error = format!( - "OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}", - response.status().as_u16(), - request_uri, - response.body() - ); - return Err(LogError::Other(error.into())); - } - - Ok(()) } fn shutdown(&mut self) { diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index da6c280a61..3fd8152e7b 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -28,7 +28,7 @@ use std::time::Duration; mod metrics; #[cfg(feature = "logs")] -mod logs; +pub(crate) mod logs; #[cfg(feature = "trace")] mod trace; @@ -236,7 +236,7 @@ impl HttpExporterBuilder { OTEL_EXPORTER_OTLP_LOGS_HEADERS, )?; - Ok(crate::LogExporter::new(client)) + Ok(crate::LogExporter::from_http(client)) } /// Create a metrics exporter with the current configuration @@ -262,7 +262,7 @@ impl HttpExporterBuilder { } #[derive(Debug)] -struct OtlpHttpClient { +pub(crate) struct OtlpHttpClient { client: Mutex>>, collector_endpoint: Uri, headers: HashMap, @@ -314,7 +314,7 @@ impl OtlpHttpClient { #[cfg(feature = "logs")] fn build_logs_export_body( &self, - logs: LogBatch<'_>, + logs: &LogBatch<'_>, ) -> opentelemetry_sdk::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index f0b52abf1b..70ec0ee0d9 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -55,33 +55,39 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - let (mut client, metadata, extensions) = match &self.inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .lock() - .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here - .call(Request::new(())) - .map_err(|e| LogError::Other(Box::new(e)))? - .into_parts(); - (inner.client.clone(), m, e) - } - None => return Err(LogError::Other("exporter is already shut down".into())), - }; + #[allow(clippy::manual_async_fn)] + fn export<'a>( + &'a self, + batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { + async move { + let (mut client, metadata, extensions) = match &self.inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .lock() + .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here + .call(Request::new(())) + .map_err(|e| LogError::Other(Box::new(e)))? + .into_parts(); + (inner.client.clone(), m, e) + } + None => return Err(LogError::Other("exporter is already shut down".into())), + }; - let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); - client - .export(Request::from_parts( - metadata, - extensions, - ExportLogsServiceRequest { resource_logs }, - )) - .await - .map_err(crate::Error::from)?; + client + .export(Request::from_parts( + metadata, + extensions, + ExportLogsServiceRequest { resource_logs }, + )) + .await + .map_err(crate::Error::from)?; - Ok(()) + Ok(()) + } } fn shutdown(&mut self) { diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index f041eeee2d..70ecc5ddc8 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -19,7 +19,7 @@ use crate::{ }; #[cfg(feature = "logs")] -mod logs; +pub(crate) mod logs; #[cfg(feature = "metrics")] mod metrics; @@ -266,7 +266,7 @@ impl TonicExporterBuilder { let client = TonicLogsClient::new(channel, interceptor, compression); - Ok(crate::logs::LogExporter::new(client)) + Ok(crate::logs::LogExporter::from_tonic(client)) } /// Build a new tonic metrics exporter diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 8ae10afb06..72c2ea5ebc 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -105,7 +105,16 @@ impl HasHttpConfig for LogExporterBuilder { /// OTLP exporter that sends log data #[derive(Debug)] pub struct LogExporter { - client: Box, + //client: Box, + client: LogExporterInner, +} + +#[derive(Debug)] +enum LogExporterInner { + #[cfg(feature = "grpc-tonic")] + Tonic(crate::exporter::tonic::logs::TonicLogsClient), + #[cfg(any(feature = "http-proto", feature = "http-json"))] + Http(crate::exporter::http::OtlpHttpClient), } impl LogExporter { @@ -114,21 +123,44 @@ impl LogExporter { LogExporterBuilder::default() } - /// Create a new log exporter - pub fn new(client: impl opentelemetry_sdk::export::logs::LogExporter + 'static) -> Self { + #[cfg(any(feature = "http-proto", feature = "http-json"))] + pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self { LogExporter { - client: Box::new(client), + client: LogExporterInner::Http(client), + } + } + + #[cfg(feature = "grpc-tonic")] + pub(crate) fn from_tonic(client: crate::exporter::tonic::logs::TonicLogsClient) -> Self { + LogExporter { + client: LogExporterInner::Tonic(client), } } } #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - self.client.export(batch).await + #[allow(clippy::manual_async_fn)] + fn export<'a>( + &'a self, + batch: &'a LogBatch<'a>, + ) -> impl std::future::Future> + Send + 'a { + async move { + match &self.client { + #[cfg(feature = "grpc-tonic")] + LogExporterInner::Tonic(client) => client.export(batch).await, + #[cfg(any(feature = "http-proto", feature = "http-json"))] + LogExporterInner::Http(client) => client.export(batch).await, + } + } } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - self.client.set_resource(resource); + match &mut self.client { + #[cfg(feature = "grpc-tonic")] + LogExporterInner::Tonic(client) => client.set_resource(resource), + #[cfg(any(feature = "http-proto", feature = "http-json"))] + LogExporterInner::Http(client) => client.set_resource(resource), + } } } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index a85b2ed7a7..23b8689cd0 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -178,7 +178,7 @@ pub mod tonic { } pub fn group_logs_by_resource_and_scope( - logs: LogBatch<'_>, + logs: &LogBatch<'_>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -261,7 +261,7 @@ mod tests { let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -281,7 +281,7 @@ mod tests { let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6e012c512f..43f3422001 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -568,6 +568,7 @@ mod tests { } impl LogExporter for MockLogExporter { + #[allow(clippy::manual_async_fn)] fn export<'a>( &'a self, _batch: &'a LogBatch<'a>, @@ -1065,6 +1066,7 @@ mod tests { } impl LogExporter for LogExporterThatRequiresTokio { + #[allow(clippy::manual_async_fn)] fn export<'a>( &'a self, batch: &'a LogBatch<'a>, diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index cc7d291de4..3f846a955e 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -181,6 +181,7 @@ impl InMemoryLogExporter { } impl LogExporter for InMemoryLogExporter { + #[allow(clippy::manual_async_fn)] fn export<'a>( &'a self, batch: &'a LogBatch<'a>, diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 481a834241..9427b00d70 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -31,30 +31,31 @@ impl fmt::Debug for LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout + #[allow(clippy::manual_async_fn)] fn export<'a>( &'a self, batch: &'a LogBatch<'a>, ) -> impl std::future::Future> + Send + 'a { async move { if self.is_shutdown.load(atomic::Ordering::SeqCst) { - return Err("exporter is shut down".into()); + Err("exporter is shut down".into()) } else { println!("Logs"); - if self - .resource_emitted - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - print_logs(batch); - } else { - println!("Resource"); - if let Some(schema_url) = self.resource.schema_url() { - println!("\t Resource SchemaUrl: {:?}", schema_url); - } - self.resource.iter().for_each(|(k, v)| { - println!("\t -> {}={:?}", k, v); - }); - print_logs(batch); + if self + .resource_emitted + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + print_logs(batch); + } else { + println!("Resource"); + if let Some(schema_url) = self.resource.schema_url() { + println!("\t Resource SchemaUrl: {:?}", schema_url); + } + self.resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + print_logs(batch); } Ok(()) diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 4066f1684f..c9e91f418c 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -13,33 +13,18 @@ use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, LoggerProvider}; -use std::{ - os::unix::process, - sync::{Arc, Mutex}, -}; use tracing::error; use tracing_subscriber::prelude::*; mod throughput; -use async_trait::async_trait; #[derive(Debug, Clone)] struct MockLogExporter; -#[async_trait] impl LogExporter for MockLogExporter { - async fn export(&self, _: LogBatch<'_>) -> LogResult<()> { - LogResult::Ok(()) - } -} - -#[derive(Clone, Debug, Default)] -pub struct SimpleExporter; - -impl LogExporter for SimpleExporter { fn export<'a>( - &'a mut self, + &'a self, _batch: &'a LogBatch<'a>, ) -> impl std::future::Future> + Send + 'a { async { Ok(()) } @@ -54,7 +39,8 @@ pub struct MockLogProcessor { impl LogProcessor for MockLogProcessor { fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) { let log_tuple = &[(record as &LogRecord, scope)]; - let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); + let log_batch = LogBatch::new(log_tuple); + let _ = futures_executor::block_on(self.exporter.export(&log_batch)); } fn force_flush(&self) -> LogResult<()> { @@ -67,8 +53,6 @@ impl LogProcessor for MockLogProcessor { } fn main() { - let exporter = SimpleExporter::default(); - let processor = NoOpLogProcessor::new(exporter); // LoggerProvider with a no-op processor. let provider: LoggerProvider = LoggerProvider::builder() .with_log_processor(MockLogProcessor { From f752d5055be3cf7ca5ee45dd3579969953369f92 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 12 Dec 2024 23:18:35 -0800 Subject: [PATCH 08/19] add comment --- opentelemetry-otlp/src/exporter/tonic/logs.rs | 18 +++++++++--------- opentelemetry-sdk/src/export/logs/mod.rs | 3 +++ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 2c634db7f9..df4e61184d 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -80,16 +80,16 @@ impl LogExporter for TonicLogsClient { let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); - otel_debug!(name: "TonicsLogsClient.CallingExport"); + otel_debug!(name: "TonicsLogsClient.CallingExport"); - client - .export(Request::from_parts( - metadata, - extensions, - ExportLogsServiceRequest { resource_logs }, - )) - .await - .map_err(crate::Error::from)?; + client + .export(Request::from_parts( + metadata, + extensions, + ExportLogsServiceRequest { resource_logs }, + )) + .await + .map_err(crate::Error::from)?; Ok(()) } } diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 9c7e3d7db8..5ddcff29a5 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -79,6 +79,9 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// + /// Note: + /// The `Send` bound ensures the future can be safely moved across threads, which is crucial for multi-threaded async runtimes like Tokio. + /// Explicit lifetimes (`'a`) synchronize the lifetimes of `self`, `batch`, and the returned future. fn export<'a>( &'a self, batch: &'a LogBatch<'a>, From 8db475c2aab6bffbc2c54d5ce346d5473059741d Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 15 Dec 2024 06:24:47 -0800 Subject: [PATCH 09/19] review comment --- .../benches/logs.rs | 2 -- opentelemetry-otlp/src/exporter/http/logs.rs | 2 -- opentelemetry-otlp/src/exporter/tonic/logs.rs | 2 -- opentelemetry-otlp/src/logs.rs | 19 ++++++++----------- 4 files changed, 8 insertions(+), 17 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 63cc29d0e7..679315ef6b 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -13,7 +13,6 @@ | ot_layer_enabled | 196 ns | */ -use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer as tracing_layer; @@ -32,7 +31,6 @@ struct NoopExporter { enabled: bool, } -#[async_trait] impl LogExporter for NoopExporter { #[allow(clippy::manual_async_fn)] fn export<'a>( diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 05f04300d4..4a84b2932c 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -1,13 +1,11 @@ use std::sync::Arc; -use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogError, LogResult}; use super::OtlpHttpClient; -#[async_trait] impl LogExporter for OtlpHttpClient { #[allow(clippy::manual_async_fn)] fn export<'a>( diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index df4e61184d..23198cfa7d 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use core::fmt; use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ @@ -56,7 +55,6 @@ impl TonicLogsClient { } } -#[async_trait] impl LogExporter for TonicLogsClient { #[allow(clippy::manual_async_fn)] fn export<'a>( diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index fc220e01e4..c765095664 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -2,7 +2,6 @@ //! //! Defines a [LogExporter] to send logs via the OpenTelemetry Protocol (OTLP) -use async_trait::async_trait; use opentelemetry::otel_debug; use std::fmt::Debug; @@ -108,12 +107,11 @@ impl HasHttpConfig for LogExporterBuilder { /// OTLP exporter that sends log data #[derive(Debug)] pub struct LogExporter { - //client: Box, - client: LogExporterInner, + client: SupportedTransportClient, } #[derive(Debug)] -enum LogExporterInner { +enum SupportedTransportClient { #[cfg(feature = "grpc-tonic")] Tonic(crate::exporter::tonic::logs::TonicLogsClient), #[cfg(any(feature = "http-proto", feature = "http-json"))] @@ -129,19 +127,18 @@ impl LogExporter { #[cfg(any(feature = "http-proto", feature = "http-json"))] pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self { LogExporter { - client: LogExporterInner::Http(client), + client: SupportedTransportClient::Http(client), } } #[cfg(feature = "grpc-tonic")] pub(crate) fn from_tonic(client: crate::exporter::tonic::logs::TonicLogsClient) -> Self { LogExporter { - client: LogExporterInner::Tonic(client), + client: SupportedTransportClient::Tonic(client), } } } -#[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { #[allow(clippy::manual_async_fn)] fn export<'a>( @@ -151,9 +148,9 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { async move { match &self.client { #[cfg(feature = "grpc-tonic")] - LogExporterInner::Tonic(client) => client.export(batch).await, + SupportedTransportClient::Tonic(client) => client.export(batch).await, #[cfg(any(feature = "http-proto", feature = "http-json"))] - LogExporterInner::Http(client) => client.export(batch).await, + SupportedTransportClient::Http(client) => client.export(batch).await, } } } @@ -161,9 +158,9 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { match &mut self.client { #[cfg(feature = "grpc-tonic")] - LogExporterInner::Tonic(client) => client.set_resource(resource), + SupportedTransportClient::Tonic(client) => client.set_resource(resource), #[cfg(any(feature = "http-proto", feature = "http-json"))] - LogExporterInner::Http(client) => client.set_resource(resource), + SupportedTransportClient::Http(client) => client.set_resource(resource), } } } From d1f3cbb4e78b48dcd7f257b0e35972c56bf81512 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 18 Dec 2024 21:52:49 -0800 Subject: [PATCH 10/19] resolve conflicts --- opentelemetry-sdk/src/logs/log_processor.rs | 27 ++++++++++++--------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 3ae4d245c9..ec237102ed 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -9,7 +9,8 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender}; use opentelemetry::logs::Severity; use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, @@ -318,7 +319,10 @@ impl LogProcessor for BatchLogProcessor { } impl BatchLogProcessor { - pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { + pub(crate) fn new(mut exporter: E, config: BatchConfig) -> Self + where + E: LogExporter + Send + Sync + 'static, + { let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size); let max_queue_size = config.max_queue_size; @@ -346,7 +350,7 @@ impl BatchLogProcessor { { let _ = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -355,7 +359,7 @@ impl BatchLogProcessor { Ok(BatchMessage::ForceFlush(sender)) => { let result = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -364,7 +368,7 @@ impl BatchLogProcessor { Ok(BatchMessage::Shutdown(sender)) => { let result = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -381,7 +385,7 @@ impl BatchLogProcessor { Err(RecvTimeoutError::Timeout) => { let _ = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -442,7 +446,8 @@ where .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); - let export = exporter.export(LogBatch::new(log_vec.as_slice())); + let log_batch = LogBatch::new(log_vec.as_slice()); + let export = exporter.export(&log_batch); let export_result = futures_executor::block_on(export); match export_result { @@ -477,7 +482,7 @@ where /// Build a batch processor pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(Box::new(self.exporter), self.config) + BatchLogProcessor::new(self.exporter, self.config) } } @@ -569,7 +574,7 @@ impl LogProcessor for BatchLogProcessorWithAsyncRuntime { #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] impl BatchLogProcessorWithAsyncRuntime { - pub(crate) fn new(mut exporter: E, config: BatchConfig, runtime: R) -> Self + pub(crate) fn new(mut exporter: E, config: BatchConfig, runtime: R) -> Self where E: LogExporter + Send + Sync + 'static, { @@ -1125,7 +1130,7 @@ mod tests { let exporter = MockLogExporter { resource: Arc::new(Mutex::new(None)), }; - let processor = BatchLogProcessor::new(exporter.clone(),, BatchConfig::default()); + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); let provider = LoggerProvider::builder() .with_log_processor(processor) .with_resource( @@ -1195,7 +1200,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); processor.shutdown().unwrap(); } From fa11cd7476923b1cc97eba0c06ada1cff2a8cde1 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 18 Dec 2024 22:16:45 -0800 Subject: [PATCH 11/19] initial commit --- opentelemetry-sdk/src/logs/log_processor.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index ec237102ed..6c5996483f 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -554,6 +554,16 @@ impl LogProcessor for BatchLogProcessorWithAsyncRuntime { } fn shutdown(&self) -> LogResult<()> { + let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; + if dropped_logs > 0 { + otel_warn!( + name: "BatchLogProcessor.LogsDropped", + dropped_logs_count = dropped_logs, + max_queue_size = max_queue_size, + message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); + } let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessageWithAsyncRuntime::Shutdown(res_sender)) @@ -669,7 +679,7 @@ impl BatchLogProcessorWithAsyncRuntime { BatchLogProcessorWithAsyncRuntime { message_sender, dropped_logs_count: AtomicUsize::new(0), - max_queue_size, + max_queue_size: config.max_queue_size, } } From 67f006ae59a19af351b39b5acf9bba0e29846e33 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 18 Dec 2024 22:22:22 -0800 Subject: [PATCH 12/19] further conflicts --- opentelemetry-sdk/src/logs/log_processor.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6c5996483f..53631d16b1 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -534,11 +534,13 @@ impl LogProcessor for BatchLogProcessorWithAsyncRuntime { ))); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. - if let Err(err) = result { - otel_error!( - name: "BatchLogProcessor.Export.Error", - error = format!("{}", err) - ); + if result.is_err() { + // Increment dropped logs count. The first time we have to drop a log, + // emit a warning. + if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchLogProcessor.LogDroppingStarted", + message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped."); + } } } From 85f653c4e3861ec73ee74872f49a5fc26de5120d Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 18 Dec 2024 22:29:03 -0800 Subject: [PATCH 13/19] remove unwantd comments --- opentelemetry-sdk/src/logs/log_processor.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 53631d16b1..532d408997 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -633,7 +633,6 @@ impl BatchLogProcessorWithAsyncRuntime { BatchMessageWithAsyncRuntime::Flush(res_channel) => { let result = export_with_timeout( config.max_export_timeout, - //Arc::clone(&exporter ), &mut exporter, &timeout_runtime, logs.split_off(0), @@ -653,7 +652,6 @@ impl BatchLogProcessorWithAsyncRuntime { BatchMessageWithAsyncRuntime::Shutdown(ch) => { let result = export_with_timeout( config.max_export_timeout, - //Arc::clone(&exporter ), &mut exporter, &timeout_runtime, logs.split_off(0), @@ -701,7 +699,6 @@ impl BatchLogProcessorWithAsyncRuntime { #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] async fn export_with_timeout( time_out: Duration, - //exporter: Arc>, exporter: &mut E, runtime: &R, batch: Vec<(LogRecord, InstrumentationScope)>, From 0d5a17cca8d30e9205f848e331b2974267f88707 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 19 Dec 2024 09:39:38 -0800 Subject: [PATCH 14/19] merge conflict --- opentelemetry-sdk/src/logs/log_processor.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 3f3eded24b..e9c595112b 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1664,7 +1664,7 @@ mod tests { resource: Arc::new(Mutex::new(None)), }; let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1692,7 +1692,7 @@ mod tests { .keep_records_on_shutdown() .build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1714,7 +1714,7 @@ mod tests { async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1731,7 +1731,7 @@ mod tests { { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::TokioCurrentThread, ); @@ -1744,7 +1744,7 @@ mod tests { async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1757,7 +1757,7 @@ mod tests { async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::TokioCurrentThread, ); From 4e4a18979e536bab193de5eb27048e823c1dc032 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 20 Dec 2024 08:44:49 +0530 Subject: [PATCH 15/19] Remove async_trait import and attribute --- opentelemetry-appender-tracing/src/layer.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 76cd0d7151..48fffdf971 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -209,7 +209,6 @@ const fn severity_of_level(level: &Level) -> Severity { #[cfg(test)] mod tests { use crate::layer; - use async_trait::async_trait; use opentelemetry::logs::Severity; use opentelemetry::trace::TracerProvider as _; use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer}; @@ -245,7 +244,6 @@ mod tests { #[derive(Clone, Debug, Default)] struct ReentrantLogExporter; - #[async_trait] impl LogExporter for ReentrantLogExporter { #[allow(clippy::manual_async_fn)] fn export<'a>( From 7627224f205d4c6cd2225659171b2676bcfc20b5 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 19 Dec 2024 19:27:45 -0800 Subject: [PATCH 16/19] remove unused crate --- opentelemetry-appender-tracing/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry-appender-tracing/Cargo.toml b/opentelemetry-appender-tracing/Cargo.toml index e3c682b77d..9e831eb38f 100644 --- a/opentelemetry-appender-tracing/Cargo.toml +++ b/opentelemetry-appender-tracing/Cargo.toml @@ -24,7 +24,6 @@ opentelemetry-stdout = { path = "../opentelemetry-stdout", features = ["logs"] } opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs", "testing"] } tracing-subscriber = { workspace = true, features = ["registry", "std", "env-filter"] } tracing-log = "0.2" -async-trait = { workspace = true } criterion = { workspace = true } tokio = { workspace = true, features = ["full"]} From ddb15bfa7ceb6924ecdfc7c70c0a9932ee84fa02 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 20 Dec 2024 07:29:17 -0800 Subject: [PATCH 17/19] update export to take batch by value --- .../benches/logs.rs | 8 +++---- opentelemetry-appender-tracing/src/layer.rs | 8 +++---- opentelemetry-otlp/src/exporter/http/logs.rs | 10 ++++----- opentelemetry-otlp/src/exporter/tonic/logs.rs | 10 ++++----- opentelemetry-otlp/src/logs.rs | 8 +++---- opentelemetry-sdk/src/export/logs/mod.rs | 11 ++++------ opentelemetry-sdk/src/logs/log_processor.rs | 22 +++++++++---------- .../src/testing/logs/in_memory_exporter.rs | 8 +++---- opentelemetry-stdout/src/logs/exporter.rs | 12 +++++----- stress/src/logs.rs | 10 ++++----- 10 files changed, 52 insertions(+), 55 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 679315ef6b..e5fb98273c 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -33,10 +33,10 @@ struct NoopExporter { impl LogExporter for NoopExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - _batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async { LogResult::Ok(()) } } diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 48fffdf971..af752c5f8e 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -246,10 +246,10 @@ mod tests { impl LogExporter for ReentrantLogExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - _batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async { // This will cause a deadlock as the export itself creates a log // while still within the lock of the SimpleLogProcessor. diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index f6300d30ef..859055ba46 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -9,10 +9,10 @@ use super::OtlpHttpClient; impl LogExporter for OtlpHttpClient { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { let client = self .client @@ -23,7 +23,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = { self.build_logs_export_body(batch)? }; + let (body, content_type) = { self.build_logs_export_body(&batch)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 23198cfa7d..3f4049cfed 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -57,10 +57,10 @@ impl TonicLogsClient { impl LogExporter for TonicLogsClient { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { let (mut client, metadata, extensions) = match &self.inner { Some(inner) => { @@ -76,7 +76,7 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; - let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(&batch, &self.resource); otel_debug!(name: "TonicsLogsClient.CallingExport"); diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index c765095664..c199aa5a96 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -141,10 +141,10 @@ impl LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { match &self.client { #[cfg(feature = "grpc-tonic")] diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 5ddcff29a5..902adb54dd 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -79,13 +79,10 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// - /// Note: - /// The `Send` bound ensures the future can be safely moved across threads, which is crucial for multi-threaded async runtimes like Tokio. - /// Explicit lifetimes (`'a`) synchronize the lifetimes of `self`, `batch`, and the returned future. - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a; + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send; /// Shuts down the exporter. fn shutdown(&mut self) {} diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index e9c595112b..de3bf03387 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -118,7 +118,7 @@ impl LogProcessor for SimpleLogProcessor { .and_then(|exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; let log_batch = LogBatch::new(log_tuple); - futures_executor::block_on(exporter.export(&log_batch)) + futures_executor::block_on(exporter.export(log_batch)) }); // Handle errors with specific static names match result { @@ -447,7 +447,7 @@ where .map(|log_data| (&log_data.0, &log_data.1)) .collect(); let log_batch = LogBatch::new(log_vec.as_slice()); - let export = exporter.export(&log_batch); + let export = exporter.export(log_batch); let export_result = futures_executor::block_on(export); match export_result { @@ -717,7 +717,7 @@ where .map(|log_data| (&log_data.0, &log_data.1)) .collect(); let log_batch = LogBatch::new(log_vec.as_slice()); - let export = exporter.export(&log_batch); + let export = exporter.export(log_batch); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -937,10 +937,10 @@ mod tests { impl LogExporter for MockLogExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - _batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async { Ok(()) } } @@ -1443,10 +1443,10 @@ mod tests { impl LogExporter for LogExporterThatRequiresTokio { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration async move { tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 3cccfeaf92..dff6d93c7e 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -182,10 +182,10 @@ impl InMemoryLogExporter { impl LogExporter for InMemoryLogExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for (log_record, instrumentation) in batch.iter() { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index f99636e356..56b6818e6c 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -32,10 +32,10 @@ impl fmt::Debug for LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { if self.is_shutdown.load(atomic::Ordering::SeqCst) { Err("exporter is shut down".into()) @@ -46,7 +46,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { - print_logs(batch); + print_logs(&batch); } else { println!("Resource"); if let Some(schema_url) = self.resource.schema_url() { @@ -55,7 +55,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { self.resource.iter().for_each(|(k, v)| { println!("\t -> {}={:?}", k, v); }); - print_logs(batch); + print_logs(&batch); } Ok(()) diff --git a/stress/src/logs.rs b/stress/src/logs.rs index c9e91f418c..bcec774b30 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -23,10 +23,10 @@ mod throughput; struct MockLogExporter; impl LogExporter for MockLogExporter { - fn export<'a>( - &'a self, - _batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async { Ok(()) } } } @@ -40,7 +40,7 @@ impl LogProcessor for MockLogProcessor { fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) { let log_tuple = &[(record as &LogRecord, scope)]; let log_batch = LogBatch::new(log_tuple); - let _ = futures_executor::block_on(self.exporter.export(&log_batch)); + let _ = futures_executor::block_on(self.exporter.export(log_batch)); } fn force_flush(&self) -> LogResult<()> { From 158e922dc2c5a28dc952f861e2ab9c6bdec3ea71 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 20 Dec 2024 07:38:53 -0800 Subject: [PATCH 18/19] remove temporary batch variable --- opentelemetry-sdk/src/logs/log_processor.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index de3bf03387..cb5b224da5 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -117,8 +117,7 @@ impl LogProcessor for SimpleLogProcessor { .map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into())) .and_then(|exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; - let log_batch = LogBatch::new(log_tuple); - futures_executor::block_on(exporter.export(log_batch)) + futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); // Handle errors with specific static names match result { @@ -446,8 +445,7 @@ where .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); - let log_batch = LogBatch::new(log_vec.as_slice()); - let export = exporter.export(log_batch); + let export = exporter.export(LogBatch::new(log_vec.as_slice())); let export_result = futures_executor::block_on(export); match export_result { @@ -716,8 +714,7 @@ where .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); - let log_batch = LogBatch::new(log_vec.as_slice()); - let export = exporter.export(log_batch); + let export = exporter.export(LogBatch::new(log_vec.as_slice())); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); From 36ea18aee77d8a78d80f4ddfed45aca0ded0c28d Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 20 Dec 2024 07:47:48 -0800 Subject: [PATCH 19/19] keep diff minimal --- opentelemetry-otlp/src/exporter/http/logs.rs | 2 +- opentelemetry-otlp/src/exporter/http/mod.rs | 2 +- opentelemetry-otlp/src/exporter/tonic/logs.rs | 2 +- opentelemetry-proto/src/transform/logs.rs | 6 +++--- opentelemetry-sdk/src/logs/log_processor.rs | 4 +--- opentelemetry-stdout/src/logs/exporter.rs | 6 +++--- stress/src/logs.rs | 3 +-- 7 files changed, 11 insertions(+), 14 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 859055ba46..9d00602eed 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -23,7 +23,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = { self.build_logs_export_body(&batch)? }; + let (body, content_type) = { self.build_logs_export_body(batch)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index c146623915..4d1af8c880 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -317,7 +317,7 @@ impl OtlpHttpClient { #[cfg(feature = "logs")] fn build_logs_export_body( &self, - logs: &LogBatch<'_>, + logs: LogBatch<'_>, ) -> opentelemetry_sdk::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 3f4049cfed..053331b428 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -76,7 +76,7 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; - let resource_logs = group_logs_by_resource_and_scope(&batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); otel_debug!(name: "TonicsLogsClient.CallingExport"); diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index f7eedd76e2..785bdfd97f 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -182,7 +182,7 @@ pub mod tonic { } pub fn group_logs_by_resource_and_scope( - logs: &LogBatch<'_>, + logs: LogBatch<'_>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -265,7 +265,7 @@ mod tests { let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -285,7 +285,7 @@ mod tests { let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index cb5b224da5..3fb0ee5f42 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -9,8 +9,7 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender}; use opentelemetry::logs::Severity; use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; -use std::sync::atomic::Ordering; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, @@ -613,7 +612,6 @@ impl BatchLogProcessorWithAsyncRuntime { let result = export_with_timeout( config.max_export_timeout, &mut exporter, - // Arc::clone(&exporter ), &timeout_runtime, logs.split_off(0), ) diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index cbc651b348..6313474dd1 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -46,7 +46,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { - print_logs(&batch); + print_logs(batch); } else { println!("Resource"); if let Some(schema_url) = self.resource.schema_url() { @@ -55,7 +55,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { self.resource.iter().for_each(|(k, v)| { println!("\t -> {}={:?}", k, v); }); - print_logs(&batch); + print_logs(batch); } Ok(()) @@ -72,7 +72,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } -fn print_logs(batch: &LogBatch<'_>) { +fn print_logs(batch: LogBatch<'_>) { for (i, log) in batch.iter().enumerate() { println!("Log #{}", i); let (record, _library) = log; diff --git a/stress/src/logs.rs b/stress/src/logs.rs index bcec774b30..2242d48eea 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -39,8 +39,7 @@ pub struct MockLogProcessor { impl LogProcessor for MockLogProcessor { fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) { let log_tuple = &[(record as &LogRecord, scope)]; - let log_batch = LogBatch::new(log_tuple); - let _ = futures_executor::block_on(self.exporter.export(log_batch)); + let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); } fn force_flush(&self) -> LogResult<()> {