diff --git a/opentelemetry-appender-tracing/Cargo.toml b/opentelemetry-appender-tracing/Cargo.toml index e3c682b77d..9e831eb38f 100644 --- a/opentelemetry-appender-tracing/Cargo.toml +++ b/opentelemetry-appender-tracing/Cargo.toml @@ -24,7 +24,6 @@ opentelemetry-stdout = { path = "../opentelemetry-stdout", features = ["logs"] } opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs", "testing"] } tracing-subscriber = { workspace = true, features = ["registry", "std", "env-filter"] } tracing-log = "0.2" -async-trait = { workspace = true } criterion = { workspace = true } tokio = { workspace = true, features = ["full"]} diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 1c60bd82d7..e5fb98273c 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -13,7 +13,6 @@ | ot_layer_enabled | 196 ns | */ -use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer as tracing_layer; @@ -32,10 +31,13 @@ struct NoopExporter { enabled: bool, } -#[async_trait] impl LogExporter for NoopExporter { - async fn export(&self, _: LogBatch<'_>) -> LogResult<()> { - LogResult::Ok(()) + #[allow(clippy::manual_async_fn)] + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async { LogResult::Ok(()) } } fn event_enabled(&self, _: opentelemetry::logs::Severity, _: &str, _: &str) -> bool { @@ -44,17 +46,17 @@ impl LogExporter for NoopExporter { } #[derive(Debug)] -struct NoopProcessor { - exporter: Box, +struct NoopProcessor { + exporter: E, } -impl NoopProcessor { - fn new(exporter: Box) -> Self { +impl NoopProcessor { + fn new(exporter: E) -> Self { Self { exporter } } } -impl LogProcessor for NoopProcessor { +impl LogProcessor for NoopProcessor { fn emit(&self, _: &mut LogRecord, _: &InstrumentationScope) { // no-op } @@ -124,7 +126,7 @@ fn benchmark_no_subscriber(c: &mut Criterion) { fn benchmark_with_ot_layer(c: &mut Criterion, enabled: bool, bench_name: &str) { let exporter = NoopExporter { enabled }; - let processor = NoopProcessor::new(Box::new(exporter)); + let processor = NoopProcessor::new(exporter); let provider = LoggerProvider::builder() .with_resource( Resource::builder_empty() diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index a8354822e1..af752c5f8e 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -209,7 +209,6 @@ const fn severity_of_level(level: &Level) -> Severity { #[cfg(test)] mod tests { use crate::layer; - use async_trait::async_trait; use opentelemetry::logs::Severity; use opentelemetry::trace::TracerProvider as _; use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer}; @@ -245,13 +244,18 @@ mod tests { #[derive(Clone, Debug, Default)] struct ReentrantLogExporter; - #[async_trait] impl LogExporter for ReentrantLogExporter { - async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> { - // This will cause a deadlock as the export itself creates a log - // while still within the lock of the SimpleLogProcessor. - warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io"); - Ok(()) + #[allow(clippy::manual_async_fn)] + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async { + // This will cause a deadlock as the export itself creates a log + // while still within the lock of the SimpleLogProcessor. + warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io"); + Ok(()) + } } } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 905fb638d0..9d00602eed 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; @@ -8,45 +7,50 @@ use opentelemetry_sdk::logs::{LogError, LogResult}; use super::OtlpHttpClient; -#[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - let client = self - .client - .lock() - .map_err(|e| LogError::Other(e.to_string().into())) - .and_then(|g| match &*g { - Some(client) => Ok(Arc::clone(client)), - _ => Err(LogError::Other("exporter is already shut down".into())), - })?; - - let (body, content_type) = { self.build_logs_export_body(batch)? }; - let mut request = http::Request::builder() - .method(Method::POST) - .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) - .body(body) - .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; - - for (k, v) in &self.headers { - request.headers_mut().insert(k.clone(), v.clone()); + #[allow(clippy::manual_async_fn)] + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async move { + let client = self + .client + .lock() + .map_err(|e| LogError::Other(e.to_string().into())) + .and_then(|g| match &*g { + Some(client) => Ok(Arc::clone(client)), + _ => Err(LogError::Other("exporter is already shut down".into())), + })?; + + let (body, content_type) = { self.build_logs_export_body(batch)? }; + let mut request = http::Request::builder() + .method(Method::POST) + .uri(&self.collector_endpoint) + .header(CONTENT_TYPE, content_type) + .body(body) + .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; + + for (k, v) in &self.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } + + let request_uri = request.uri().to_string(); + otel_debug!(name: "HttpLogsClient.CallingExport"); + let response = client.send(request).await?; + + if !response.status().is_success() { + let error = format!( + "OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}", + response.status().as_u16(), + request_uri, + response.body() + ); + return Err(LogError::Other(error.into())); + } + + Ok(()) } - - let request_uri = request.uri().to_string(); - otel_debug!(name: "HttpLogsClient.CallingExport"); - let response = client.send(request).await?; - - if !response.status().is_success() { - let error = format!( - "OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}", - response.status().as_u16(), - request_uri, - response.body() - ); - return Err(LogError::Other(error.into())); - } - - Ok(()) } fn shutdown(&mut self) { diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index e718960686..4d1af8c880 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -28,7 +28,7 @@ use std::time::Duration; mod metrics; #[cfg(feature = "logs")] -mod logs; +pub(crate) mod logs; #[cfg(feature = "trace")] mod trace; @@ -239,7 +239,7 @@ impl HttpExporterBuilder { OTEL_EXPORTER_OTLP_LOGS_HEADERS, )?; - Ok(crate::LogExporter::new(client)) + Ok(crate::LogExporter::from_http(client)) } /// Create a metrics exporter with the current configuration @@ -265,7 +265,7 @@ impl HttpExporterBuilder { } #[derive(Debug)] -struct OtlpHttpClient { +pub(crate) struct OtlpHttpClient { client: Mutex>>, collector_endpoint: Uri, headers: HashMap, diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 2737c2bd99..053331b428 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use core::fmt; use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ @@ -56,37 +55,41 @@ impl TonicLogsClient { } } -#[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - let (mut client, metadata, extensions) = match &self.inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .lock() - .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here - .call(Request::new(())) - .map_err(|e| LogError::Other(Box::new(e)))? - .into_parts(); - (inner.client.clone(), m, e) - } - None => return Err(LogError::Other("exporter is already shut down".into())), - }; + #[allow(clippy::manual_async_fn)] + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async move { + let (mut client, metadata, extensions) = match &self.inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .lock() + .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here + .call(Request::new(())) + .map_err(|e| LogError::Other(Box::new(e)))? + .into_parts(); + (inner.client.clone(), m, e) + } + None => return Err(LogError::Other("exporter is already shut down".into())), + }; - let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); - otel_debug!(name: "TonicsLogsClient.CallingExport"); + otel_debug!(name: "TonicsLogsClient.CallingExport"); - client - .export(Request::from_parts( - metadata, - extensions, - ExportLogsServiceRequest { resource_logs }, - )) - .await - .map_err(crate::Error::from)?; - - Ok(()) + client + .export(Request::from_parts( + metadata, + extensions, + ExportLogsServiceRequest { resource_logs }, + )) + .await + .map_err(crate::Error::from)?; + Ok(()) + } } fn shutdown(&mut self) { diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 99d2e8fa15..9e2b54c631 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -20,7 +20,7 @@ use crate::{ }; #[cfg(feature = "logs")] -mod logs; +pub(crate) mod logs; #[cfg(feature = "metrics")] mod metrics; @@ -273,7 +273,7 @@ impl TonicExporterBuilder { let client = TonicLogsClient::new(channel, interceptor, compression); - Ok(crate::logs::LogExporter::new(client)) + Ok(crate::logs::LogExporter::from_tonic(client)) } /// Build a new tonic metrics exporter diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index bb643cb095..c199aa5a96 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -2,7 +2,6 @@ //! //! Defines a [LogExporter] to send logs via the OpenTelemetry Protocol (OTLP) -use async_trait::async_trait; use opentelemetry::otel_debug; use std::fmt::Debug; @@ -108,7 +107,15 @@ impl HasHttpConfig for LogExporterBuilder { /// OTLP exporter that sends log data #[derive(Debug)] pub struct LogExporter { - client: Box, + client: SupportedTransportClient, +} + +#[derive(Debug)] +enum SupportedTransportClient { + #[cfg(feature = "grpc-tonic")] + Tonic(crate::exporter::tonic::logs::TonicLogsClient), + #[cfg(any(feature = "http-proto", feature = "http-json"))] + Http(crate::exporter::http::OtlpHttpClient), } impl LogExporter { @@ -117,21 +124,43 @@ impl LogExporter { LogExporterBuilder::default() } - /// Create a new log exporter - pub fn new(client: impl opentelemetry_sdk::export::logs::LogExporter + 'static) -> Self { + #[cfg(any(feature = "http-proto", feature = "http-json"))] + pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self { LogExporter { - client: Box::new(client), + client: SupportedTransportClient::Http(client), + } + } + + #[cfg(feature = "grpc-tonic")] + pub(crate) fn from_tonic(client: crate::exporter::tonic::logs::TonicLogsClient) -> Self { + LogExporter { + client: SupportedTransportClient::Tonic(client), } } } -#[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - self.client.export(batch).await + #[allow(clippy::manual_async_fn)] + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async move { + match &self.client { + #[cfg(feature = "grpc-tonic")] + SupportedTransportClient::Tonic(client) => client.export(batch).await, + #[cfg(any(feature = "http-proto", feature = "http-json"))] + SupportedTransportClient::Http(client) => client.export(batch).await, + } + } } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - self.client.set_resource(resource); + match &mut self.client { + #[cfg(feature = "grpc-tonic")] + SupportedTransportClient::Tonic(client) => client.set_resource(resource), + #[cfg(any(feature = "http-proto", feature = "http-json"))] + SupportedTransportClient::Http(client) => client.set_resource(resource), + } } } diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index ac33f369dc..1310189f89 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -45,7 +45,7 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] } default = ["trace", "metrics", "logs", "internal-logs"] trace = ["opentelemetry/trace", "rand", "percent-encoding"] jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"] -logs = ["opentelemetry/logs", "async-trait", "serde_json"] +logs = ["opentelemetry/logs", "serde_json"] spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"] metrics = ["opentelemetry/metrics", "glob", "async-trait"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 5e2168a7ce..902adb54dd 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -2,7 +2,6 @@ use crate::logs::LogRecord; use crate::logs::{LogError, LogResult}; use crate::Resource; -use async_trait::async_trait; #[cfg(feature = "spec_unstable_logs_enabled")] use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; @@ -62,7 +61,6 @@ impl LogBatch<'_> { } /// `LogExporter` defines the interface that log exporters should implement. -#[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of log records and their associated instrumentation scopes. /// @@ -81,7 +79,11 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()>; + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send; + /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 47d410c381..3fb0ee5f42 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -317,7 +317,10 @@ impl LogProcessor for BatchLogProcessor { } impl BatchLogProcessor { - pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { + pub(crate) fn new(mut exporter: E, config: BatchConfig) -> Self + where + E: LogExporter + Send + Sync + 'static, + { let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size); let max_queue_size = config.max_queue_size; @@ -345,7 +348,7 @@ impl BatchLogProcessor { { let _ = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -354,7 +357,7 @@ impl BatchLogProcessor { Ok(BatchMessage::ForceFlush(sender)) => { let result = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -363,7 +366,7 @@ impl BatchLogProcessor { Ok(BatchMessage::Shutdown(sender)) => { let result = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -380,7 +383,7 @@ impl BatchLogProcessor { Err(RecvTimeoutError::Timeout) => { let _ = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -476,7 +479,7 @@ where /// Build a batch processor pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(Box::new(self.exporter), self.config) + BatchLogProcessor::new(self.exporter, self.config) } } @@ -560,7 +563,6 @@ impl LogProcessor for BatchLogProcessorWithAsyncRuntime { message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." ); } - let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessageWithAsyncRuntime::Shutdown(res_sender)) @@ -581,11 +583,13 @@ impl LogProcessor for BatchLogProcessorWithAsyncRuntime { #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] impl BatchLogProcessorWithAsyncRuntime { - pub(crate) fn new(mut exporter: Box, config: BatchConfig, runtime: R) -> Self { + pub(crate) fn new(mut exporter: E, config: BatchConfig, runtime: R) -> Self + where + E: LogExporter + Send + Sync + 'static, + { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); let inner_runtime = runtime.clone(); - let max_queue_size = config.max_queue_size; // Spawn worker process via user-defined spawn function. runtime.spawn(Box::pin(async move { @@ -607,7 +611,7 @@ impl BatchLogProcessorWithAsyncRuntime { if logs.len() == config.max_export_batch_size { let result = export_with_timeout( config.max_export_timeout, - exporter.as_mut(), + &mut exporter, &timeout_runtime, logs.split_off(0), ) @@ -625,7 +629,7 @@ impl BatchLogProcessorWithAsyncRuntime { BatchMessageWithAsyncRuntime::Flush(res_channel) => { let result = export_with_timeout( config.max_export_timeout, - exporter.as_mut(), + &mut exporter, &timeout_runtime, logs.split_off(0), ) @@ -644,7 +648,7 @@ impl BatchLogProcessorWithAsyncRuntime { BatchMessageWithAsyncRuntime::Shutdown(ch) => { let result = export_with_timeout( config.max_export_timeout, - exporter.as_mut(), + &mut exporter, &timeout_runtime, logs.split_off(0), ) @@ -667,12 +671,11 @@ impl BatchLogProcessorWithAsyncRuntime { } } })); - // Return batch processor with link to worker BatchLogProcessorWithAsyncRuntime { message_sender, dropped_logs_count: AtomicUsize::new(0), - max_queue_size, + max_queue_size: config.max_queue_size, } } @@ -690,7 +693,7 @@ impl BatchLogProcessorWithAsyncRuntime { } #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] -async fn export_with_timeout( +async fn export_with_timeout( time_out: Duration, exporter: &mut E, runtime: &R, @@ -883,7 +886,7 @@ where /// Build a batch processor pub fn build(self) -> BatchLogProcessorWithAsyncRuntime { - BatchLogProcessorWithAsyncRuntime::new(Box::new(self.exporter), self.config, self.runtime) + BatchLogProcessorWithAsyncRuntime::new(self.exporter, self.config, self.runtime) } } @@ -908,7 +911,6 @@ mod tests { testing::logs::InMemoryLogExporter, Resource, }; - use async_trait::async_trait; use opentelemetry::logs::AnyValue; use opentelemetry::logs::LogRecord as _; use opentelemetry::logs::{Logger, LoggerProvider as _}; @@ -928,10 +930,13 @@ mod tests { resource: Arc>>, } - #[async_trait] impl LogExporter for MockLogExporter { - async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> { - Ok(()) + #[allow(clippy::manual_async_fn)] + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async { Ok(()) } } fn shutdown(&mut self) {} @@ -1129,7 +1134,7 @@ mod tests { let exporter = MockLogExporter { resource: Arc::new(Mutex::new(None)), }; - let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); let provider = LoggerProvider::builder() .with_log_processor(processor) .with_resource( @@ -1159,7 +1164,7 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default() .keep_records_on_shutdown() .build(); - let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); let mut record = LogRecord::default(); let instrumentation = InstrumentationScope::default(); @@ -1199,7 +1204,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); processor.shutdown().unwrap(); } @@ -1211,7 +1216,7 @@ mod tests { ) { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1225,24 +1230,21 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); - + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); processor.shutdown().unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); - + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); processor.shutdown().unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); - + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); processor.shutdown().unwrap(); } @@ -1434,16 +1436,21 @@ mod tests { } } - #[async_trait::async_trait] impl LogExporter for LogExporterThatRequiresTokio { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { + #[allow(clippy::manual_async_fn)] + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration - tokio::time::sleep(Duration::from_millis(50)).await; + async move { + tokio::time::sleep(Duration::from_millis(50)).await; - for _ in batch.iter() { - self.export_count.fetch_add(1, Ordering::Acquire); + for _ in batch.iter() { + self.export_count.fetch_add(1, Ordering::Acquire); + } + Ok(()) } - Ok(()) } } @@ -1652,7 +1659,7 @@ mod tests { resource: Arc::new(Mutex::new(None)), }; let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1680,7 +1687,7 @@ mod tests { .keep_records_on_shutdown() .build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1702,7 +1709,7 @@ mod tests { async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1719,7 +1726,7 @@ mod tests { { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::TokioCurrentThread, ); @@ -1732,7 +1739,7 @@ mod tests { async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::Tokio, ); @@ -1745,7 +1752,7 @@ mod tests { async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( - Box::new(exporter.clone()), + exporter.clone(), BatchConfig::default(), runtime::TokioCurrentThread, ); diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index cbc16bdfa3..dff6d93c7e 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -2,7 +2,6 @@ use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; use crate::logs::{LogError, LogResult}; use crate::Resource; -use async_trait::async_trait; use opentelemetry::InstrumentationScope; use std::borrow::Cow; use std::sync::{Arc, Mutex}; @@ -181,18 +180,23 @@ impl InMemoryLogExporter { } } -#[async_trait] impl LogExporter for InMemoryLogExporter { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - let mut logs_guard = self.logs.lock().map_err(LogError::from)?; - for (log_record, instrumentation) in batch.iter() { - let owned_log = OwnedLogData { - record: (*log_record).clone(), - instrumentation: (*instrumentation).clone(), - }; - logs_guard.push(owned_log); + #[allow(clippy::manual_async_fn)] + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async move { + let mut logs_guard = self.logs.lock().map_err(LogError::from)?; + for (log_record, instrumentation) in batch.iter() { + let owned_log = OwnedLogData { + record: (*log_record).clone(), + instrumentation: (*instrumentation).clone(), + }; + logs_guard.push(owned_log); + } + Ok(()) } - Ok(()) } fn shutdown(&mut self) { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index f881f3adb2..6313474dd1 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::fmt; use opentelemetry_sdk::export::logs::LogBatch; @@ -30,33 +29,37 @@ impl fmt::Debug for LogExporter { } } -#[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - if self.is_shutdown.load(atomic::Ordering::SeqCst) { - return Err("exporter is shut down".into()); - } else { - println!("Logs"); - if self - .resource_emitted - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - print_logs(batch); + #[allow(clippy::manual_async_fn)] + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async move { + if self.is_shutdown.load(atomic::Ordering::SeqCst) { + Err("exporter is shut down".into()) } else { - println!("Resource"); - if let Some(schema_url) = self.resource.schema_url() { - println!("\t Resource SchemaUrl: {:?}", schema_url); + println!("Logs"); + if self + .resource_emitted + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + print_logs(batch); + } else { + println!("Resource"); + if let Some(schema_url) = self.resource.schema_url() { + println!("\t Resource SchemaUrl: {:?}", schema_url); + } + self.resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + print_logs(batch); } - self.resource.iter().for_each(|(k, v)| { - println!("\t -> {}={:?}", k, v); - }); - print_logs(batch); + Ok(()) } - - Ok(()) } } diff --git a/stress/Cargo.toml b/stress/Cargo.toml index b4b86ba330..f86769da5a 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -51,7 +51,6 @@ tracing = { workspace = true, features = ["std"]} tracing-subscriber = { workspace = true, features = ["registry", "std"] } num-format = "0.4.4" sysinfo = { version = "0.32", optional = true } -async-trait = "0.1.51" futures-executor = { workspace = true } [features] diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 87d5e2c6ed..2242d48eea 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -13,19 +13,21 @@ use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, LoggerProvider}; + use tracing::error; use tracing_subscriber::prelude::*; mod throughput; -use async_trait::async_trait; #[derive(Debug, Clone)] struct MockLogExporter; -#[async_trait] impl LogExporter for MockLogExporter { - async fn export(&self, _: LogBatch<'_>) -> LogResult<()> { - LogResult::Ok(()) + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { + async { Ok(()) } } } @@ -40,11 +42,11 @@ impl LogProcessor for MockLogProcessor { let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); } - fn force_flush(&self) -> opentelemetry_sdk::logs::LogResult<()> { + fn force_flush(&self) -> LogResult<()> { Ok(()) } - fn shutdown(&self) -> opentelemetry_sdk::logs::LogResult<()> { + fn shutdown(&self) -> LogResult<()> { Ok(()) } }