From da42d53bb008aacf30b26796789f151aae4c42b0 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 20 Dec 2024 10:10:12 -0800 Subject: [PATCH 1/3] initial commit --- opentelemetry-sdk/src/logs/log_emitter.rs | 8 +++ opentelemetry-sdk/src/logs/log_processor.rs | 63 +++------------------ 2 files changed, 17 insertions(+), 54 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 479ca36dd2..28e18f78c6 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -268,6 +268,14 @@ impl opentelemetry::logs::Logger for Logger { /// Emit a `LogRecord`. fn emit(&self, mut record: Self::LogRecord) { + if self.provider.inner.is_shutdown.load(Ordering::Relaxed) { + // Optionally, log a debug message indicating logs are being discarded due to shutdown. + otel_debug!( + name: "Logger.Emit.Discarded", + message = "Log discarded because the LoggerProvider is shut down." + ); + return; + } let provider = &self.provider; let processors = provider.log_processors(); diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 3fb0ee5f42..fcbae1b29b 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -9,7 +9,7 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender}; use opentelemetry::logs::Severity; use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, @@ -87,29 +87,18 @@ pub trait LogProcessor: Send + Sync + Debug { #[derive(Debug)] pub struct SimpleLogProcessor { exporter: Mutex, - is_shutdown: AtomicBool, } impl SimpleLogProcessor { pub(crate) fn new(exporter: T) -> Self { SimpleLogProcessor { exporter: Mutex::new(exporter), - is_shutdown: AtomicBool::new(false), } } } impl LogProcessor for SimpleLogProcessor { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { - // noop after shutdown - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - // this is a warning, as the user is trying to log after the processor has been shutdown - otel_warn!( - name: "SimpleLogProcessor.Emit.ProcessorShutdown", - ); - return; - } - let result = self .exporter .lock() @@ -141,8 +130,6 @@ impl LogProcessor for SimpleLogProcessor { } fn shutdown(&self) -> LogResult<()> { - self.is_shutdown - .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); Ok(()) @@ -182,7 +169,6 @@ pub struct BatchLogProcessor { handle: Mutex>>, forceflush_timeout: Duration, shutdown_timeout: Duration, - is_shutdown: AtomicBool, // Track dropped logs - we'll log this at shutdown dropped_logs_count: AtomicUsize, @@ -201,15 +187,6 @@ impl Debug for BatchLogProcessor { impl LogProcessor for BatchLogProcessor { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { - // noop after shutdown - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - otel_warn!( - name: "BatchLogProcessor.Emit.ProcessorShutdown", - message = "BatchLogProcessor has been shutdown. No further logs will be emitted." - ); - return; - } - let result = self .message_sender .try_send(BatchMessage::ExportLog(Box::new(( @@ -229,11 +206,6 @@ impl LogProcessor for BatchLogProcessor { } fn force_flush(&self) -> LogResult<()> { - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - return LogResult::Err(LogError::Other( - "BatchLogProcessor is already shutdown".into(), - )); - } let (sender, receiver) = mpsc::sync_channel(1); self.message_sender .try_send(BatchMessage::ForceFlush(sender)) @@ -251,20 +223,6 @@ impl LogProcessor for BatchLogProcessor { } fn shutdown(&self) -> LogResult<()> { - // test and set is_shutdown flag if it is not set - if self - .is_shutdown - .swap(true, std::sync::atomic::Ordering::Relaxed) - { - otel_warn!( - name: "BatchLogProcessor.Shutdown.ProcessorShutdown", - message = "BatchLogProcessor has been shutdown. No further logs will be emitted." - ); - return LogResult::Err(LogError::AlreadyShutdown( - "BatchLogProcessor is already shutdown".into(), - )); - } - let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_logs > 0 { @@ -406,7 +364,6 @@ impl BatchLogProcessor { handle: Mutex::new(Some(handle)), forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable - is_shutdown: AtomicBool::new(false), dropped_logs_count: AtomicUsize::new(0), max_queue_size, } @@ -1183,20 +1140,18 @@ mod tests { .keep_records_on_shutdown() .build(); let processor = SimpleLogProcessor::new(exporter.clone()); + let provider = LoggerProvider::builder() + .with_log_processor(processor) + .build(); + let logger = provider.logger("test-simple-logger"); - let mut record: LogRecord = Default::default(); - let instrumentation: InstrumentationScope = Default::default(); - - processor.emit(&mut record, &instrumentation); + let record: LogRecord = Default::default(); - processor.shutdown().unwrap(); + logger.emit(record.clone()); - let is_shutdown = processor - .is_shutdown - .load(std::sync::atomic::Ordering::Relaxed); - assert!(is_shutdown); + provider.shutdown().unwrap(); - processor.emit(&mut record, &instrumentation); + logger.emit(record); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) } From cabb4da1418041c8c60c7420f00cb0910c7805b5 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 20 Dec 2024 10:13:37 -0800 Subject: [PATCH 2/3] fix error name --- opentelemetry-sdk/src/logs/log_emitter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 28e18f78c6..77d661a473 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -271,7 +271,7 @@ impl opentelemetry::logs::Logger for Logger { if self.provider.inner.is_shutdown.load(Ordering::Relaxed) { // Optionally, log a debug message indicating logs are being discarded due to shutdown. otel_debug!( - name: "Logger.Emit.Discarded", + name: "Logger.Emit.ProviderShutdown", message = "Log discarded because the LoggerProvider is shut down." ); return; From 54543bc872874cfe0d3330b95cb61c322712dc29 Mon Sep 17 00:00:00 2001 From: Lalit Date: Sat, 21 Dec 2024 08:09:17 +0000 Subject: [PATCH 3/3] add benchmark --- opentelemetry-sdk/benches/log.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index 99083fbdae..17f43dc6b7 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -126,10 +126,36 @@ fn logging_comparable_to_appender(c: &mut Criterion) { }); } +fn logger_emit(c: &mut Criterion) { + // Provider is created once, outside of the benchmark + let provider = LoggerProvider::builder() + .with_log_processor(NoopProcessor {}) + .build(); + + let logger = provider.logger("benchmark_emit"); + + // Create the log record once + let mut log_record = logger.create_log_record(); + log_record.set_body("simple log".into()); + + // Convert log_record into a raw pointer + let log_record_ptr: *mut _ = &mut log_record; + + c.bench_function("logger_emit", |b| { + b.iter(|| { + unsafe { + // Dereference the raw pointer to pass it to emit + logger.emit(std::ptr::read(log_record_ptr)); + } + }); + }); +} + fn criterion_benchmark(c: &mut Criterion) { logger_creation(c); log_provider_creation(c); logging_comparable_to_appender(c); + logger_emit(c); log_benchmark_group(c, "simple-log", |logger| { let mut log_record = logger.create_log_record(); log_record.set_body("simple log".into());