Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use native Rust support for async traits in LogExporter::export() method (11% improvement) #2374

Merged
merged 34 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d454536
initial commit
lalitb Dec 1, 2024
3ed482d
futher changes..
lalitb Dec 1, 2024
a2aa648
changes..
lalitb Dec 2, 2024
8ba1173
initial change
lalitb Dec 2, 2024
37823e7
stdout exporter
lalitb Dec 2, 2024
3b4cd57
update stress
lalitb Dec 2, 2024
858cd87
Merge branch 'main' into log-async-trait-impl
lalitb Dec 3, 2024
e6472bf
Merge branch 'main' into log-async-trait-impl
lalitb Dec 6, 2024
8cacf52
fix otlp
lalitb Dec 6, 2024
dfac978
Merge branch 'main' into log-async-trait-impl
lalitb Dec 13, 2024
f752d50
add comment
lalitb Dec 13, 2024
6ac8aa0
Merge branch 'main' into log-async-trait-impl
cijothomas Dec 13, 2024
4924e3b
Merge branch 'main' into log-async-trait-impl
TommyCpp Dec 13, 2024
8db475c
review comment
lalitb Dec 15, 2024
4a6e36f
Merge branch 'main' into log-async-trait-impl
lalitb Dec 15, 2024
3af54c3
Merge branch 'main' into log-async-trait-impl
lalitb Dec 15, 2024
8c34380
Merge branch 'main' into log-async-trait-impl
lalitb Dec 16, 2024
87c6c9d
Merge branch 'main' into log-async-trait-impl
cijothomas Dec 16, 2024
be7ecec
Merge branch 'main' into log-async-trait-impl
lalitb Dec 17, 2024
5c8c644
Merge branch 'main' into log-async-trait-impl
lalitb Dec 19, 2024
d1f3cbb
resolve conflicts
lalitb Dec 19, 2024
fa11cd7
initial commit
lalitb Dec 19, 2024
67f006a
further conflicts
lalitb Dec 19, 2024
85f653c
remove unwantd comments
lalitb Dec 19, 2024
33b7572
Merge branch 'main' into log-async-trait-impl
cijothomas Dec 19, 2024
0d5a17c
merge conflict
lalitb Dec 19, 2024
e8333f5
Merge branch 'main' into log-async-trait-impl
lalitb Dec 20, 2024
4e4a189
Remove async_trait import and attribute
lalitb Dec 20, 2024
7627224
remove unused crate
lalitb Dec 20, 2024
09f7cb8
Merge branch 'main' into log-async-trait-impl
lalitb Dec 20, 2024
ddb15bf
update export to take batch by value
lalitb Dec 20, 2024
5e025d2
Merge branch 'log-async-trait-impl' of github.com:lalitb/opentelemetr…
lalitb Dec 20, 2024
158e922
remove temporary batch variable
lalitb Dec 20, 2024
36ea18a
keep diff minimal
lalitb Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion opentelemetry-appender-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ opentelemetry-stdout = { path = "../opentelemetry-stdout", features = ["logs"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs", "testing"] }
tracing-subscriber = { workspace = true, features = ["registry", "std", "env-filter"] }
tracing-log = "0.2"
async-trait = { workspace = true }
criterion = { workspace = true }
tokio = { workspace = true, features = ["full"]}

Expand Down
22 changes: 12 additions & 10 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
| ot_layer_enabled | 196 ns |
*/

use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::InstrumentationScope;
use opentelemetry_appender_tracing::layer as tracing_layer;
Expand All @@ -32,10 +31,13 @@ struct NoopExporter {
enabled: bool,
}

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
LogResult::Ok(())
#[allow(clippy::manual_async_fn)]
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async { LogResult::Ok(()) }
}

fn event_enabled(&self, _: opentelemetry::logs::Severity, _: &str, _: &str) -> bool {
Expand All @@ -44,17 +46,17 @@ impl LogExporter for NoopExporter {
}

#[derive(Debug)]
struct NoopProcessor {
exporter: Box<dyn LogExporter>,
struct NoopProcessor<E: LogExporter> {
exporter: E,
}

impl NoopProcessor {
fn new(exporter: Box<dyn LogExporter>) -> Self {
impl<E: LogExporter> NoopProcessor<E> {
fn new(exporter: E) -> Self {
Self { exporter }
}
}

impl LogProcessor for NoopProcessor {
impl<E: LogExporter> LogProcessor for NoopProcessor<E> {
fn emit(&self, _: &mut LogRecord, _: &InstrumentationScope) {
// no-op
}
Expand Down Expand Up @@ -124,7 +126,7 @@ fn benchmark_no_subscriber(c: &mut Criterion) {

fn benchmark_with_ot_layer(c: &mut Criterion, enabled: bool, bench_name: &str) {
let exporter = NoopExporter { enabled };
let processor = NoopProcessor::new(Box::new(exporter));
let processor = NoopProcessor::new(exporter);
let provider = LoggerProvider::builder()
.with_resource(
Resource::builder_empty()
Expand Down
18 changes: 11 additions & 7 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ const fn severity_of_level(level: &Level) -> Severity {
#[cfg(test)]
mod tests {
use crate::layer;
use async_trait::async_trait;
use opentelemetry::logs::Severity;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer};
Expand Down Expand Up @@ -245,13 +244,18 @@ mod tests {
#[derive(Clone, Debug, Default)]
struct ReentrantLogExporter;

#[async_trait]
impl LogExporter for ReentrantLogExporter {
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
#[allow(clippy::manual_async_fn)]
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
}
}
}

Expand Down
80 changes: 42 additions & 38 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,56 @@
use std::sync::Arc;

use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogError, LogResult};

use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = { self.build_logs_export_body(batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = { self.build_logs_export_body(&batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}

let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client.send(request).await?;

if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
response.status().as_u16(),
request_uri,
response.body()
);
return Err(LogError::Other(error.into()));
}

Ok(())
}

let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client.send(request).await?;

if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
response.status().as_u16(),
request_uri,
response.body()
);
return Err(LogError::Other(error.into()));
}

Ok(())
}

fn shutdown(&mut self) {
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::time::Duration;
mod metrics;

#[cfg(feature = "logs")]
mod logs;
pub(crate) mod logs;

#[cfg(feature = "trace")]
mod trace;
Expand Down Expand Up @@ -239,7 +239,7 @@ impl HttpExporterBuilder {
OTEL_EXPORTER_OTLP_LOGS_HEADERS,
)?;

Ok(crate::LogExporter::new(client))
Ok(crate::LogExporter::from_http(client))
}

/// Create a metrics exporter with the current configuration
Expand All @@ -265,7 +265,7 @@ impl HttpExporterBuilder {
}

#[derive(Debug)]
struct OtlpHttpClient {
pub(crate) struct OtlpHttpClient {
client: Mutex<Option<Arc<dyn HttpClient>>>,
collector_endpoint: Uri,
headers: HashMap<HeaderName, HeaderValue>,
Expand Down Expand Up @@ -317,7 +317,7 @@ impl OtlpHttpClient {
#[cfg(feature = "logs")]
fn build_logs_export_body(
&self,
logs: LogBatch<'_>,
logs: &LogBatch<'_>,
) -> opentelemetry_sdk::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
Expand Down
59 changes: 31 additions & 28 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use core::fmt;
use opentelemetry::otel_debug;
use opentelemetry_proto::tonic::collector::logs::v1::{
Expand Down Expand Up @@ -56,37 +55,41 @@ impl TonicLogsClient {
}
}

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(LogError::Other("exporter is already shut down".into())),
};
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(LogError::Other("exporter is already shut down".into())),
};

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(&batch, &self.resource);

otel_debug!(name: "TonicsLogsClient.CallingExport");
otel_debug!(name: "TonicsLogsClient.CallingExport");

client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;

Ok(())
client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;
Ok(())
}
}

fn shutdown(&mut self) {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
};

#[cfg(feature = "logs")]
mod logs;
pub(crate) mod logs;

#[cfg(feature = "metrics")]
mod metrics;
Expand Down Expand Up @@ -273,7 +273,7 @@ impl TonicExporterBuilder {

let client = TonicLogsClient::new(channel, interceptor, compression);

Ok(crate::logs::LogExporter::new(client))
Ok(crate::logs::LogExporter::from_tonic(client))
}

/// Build a new tonic metrics exporter
Expand Down
47 changes: 38 additions & 9 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//!
//! Defines a [LogExporter] to send logs via the OpenTelemetry Protocol (OTLP)

use async_trait::async_trait;
use opentelemetry::otel_debug;
use std::fmt::Debug;

Expand Down Expand Up @@ -108,7 +107,15 @@ impl HasHttpConfig for LogExporterBuilder<HttpExporterBuilderSet> {
/// OTLP exporter that sends log data
#[derive(Debug)]
pub struct LogExporter {
client: Box<dyn opentelemetry_sdk::export::logs::LogExporter>,
client: SupportedTransportClient,
}

#[derive(Debug)]
enum SupportedTransportClient {
#[cfg(feature = "grpc-tonic")]
Tonic(crate::exporter::tonic::logs::TonicLogsClient),
#[cfg(any(feature = "http-proto", feature = "http-json"))]
Http(crate::exporter::http::OtlpHttpClient),
}

impl LogExporter {
Expand All @@ -117,21 +124,43 @@ impl LogExporter {
LogExporterBuilder::default()
}

/// Create a new log exporter
pub fn new(client: impl opentelemetry_sdk::export::logs::LogExporter + 'static) -> Self {
#[cfg(any(feature = "http-proto", feature = "http-json"))]
pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self {
LogExporter {
client: Box::new(client),
client: SupportedTransportClient::Http(client),
}
}

#[cfg(feature = "grpc-tonic")]
pub(crate) fn from_tonic(client: crate::exporter::tonic::logs::TonicLogsClient) -> Self {
LogExporter {
client: SupportedTransportClient::Tonic(client),
}
}
}

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
self.client.export(batch).await
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
match &self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.export(batch).await,
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.export(batch).await,
}
}
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.client.set_resource(resource);
match &mut self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.set_resource(resource),
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.set_resource(resource),
}
}
}
Loading
Loading