From e5d35497bd1f134b3d24be4084e983a305acdaf5 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Tue, 7 Jan 2025 09:46:13 +0200 Subject: [PATCH] Init aggregation data utils --- .../src/metrics/internal/aggregate.rs | 61 ++++++++++++- .../metrics/internal/exponential_histogram.rs | 73 ++++++++-------- .../src/metrics/internal/histogram.rs | 70 +++++++-------- .../src/metrics/internal/last_value.rs | 75 +++++++--------- .../src/metrics/internal/precomputed_sum.rs | 86 ++++++++----------- opentelemetry-sdk/src/metrics/internal/sum.rs | 85 ++++++++---------- stress/src/logs.rs | 7 +- 7 files changed, 227 insertions(+), 230 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index fc9d5975c3..525d3d4917 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -1,7 +1,7 @@ use std::{ marker, mem::replace, - ops::DerefMut, + ops::{Deref, DerefMut}, sync::{Arc, Mutex}, time::SystemTime, }; @@ -121,6 +121,65 @@ impl AttributeSetFilter { } } +pub(crate) trait InitAggregationData { + type Aggr; + fn create_new(&self, time: AggregateTime) -> Self::Aggr; + fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime); +} + +pub(crate) enum AggregationData<'a, Aggr> { + Existing(&'a mut Aggr), + New(Aggr), +} + +impl<'a, Aggr> AggregationData<'a, Aggr> +where + Aggr: Aggregation, +{ + pub(crate) fn init( + init: &impl InitAggregationData, + existing: Option<&'a mut dyn Aggregation>, + time: AggregateTime, + ) -> Self { + match existing.and_then(|aggr| aggr.as_mut().downcast_mut::()) { + Some(existing) => { + init.reset_existing(existing, time); + AggregationData::Existing(existing) + } + None => AggregationData::New(init.create_new(time)), + } + } + + pub(crate) fn into_new_boxed(self) -> Option> { + match self { + AggregationData::Existing(_) => None, + AggregationData::New(aggregation) => { + Some(Box::new(aggregation) as Box) + } + } + } +} + +impl Deref for AggregationData<'_, Aggr> { + type Target = Aggr; + + fn deref(&self) -> &Self::Target { + match self { + AggregationData::Existing(existing) => existing, + AggregationData::New(new) => new, + } + } +} + +impl DerefMut for AggregationData<'_, Aggr> { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + AggregationData::Existing(existing) => existing, + AggregationData::New(new) => new, + } + } +} + /// Builds aggregate functions pub(crate) struct AggregateBuilder { /// The temporality used for the returned aggregate functions. diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index 170f4a068d..273c185cff 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -9,7 +9,10 @@ use crate::metrics::{ }; use super::{ - aggregate::{AggregateTimeInitiator, AttributeSetFilter}, + aggregate::{ + AggregateTime, AggregateTimeInitiator, AggregationData, AttributeSetFilter, + InitAggregationData, + }, Aggregator, ComputeAggregation, Measure, Number, ValueMap, }; @@ -384,26 +387,10 @@ impl ExpoHistogram { } fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { - let time = self.init_time.delta(); - - let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if h.is_none() { - Some(data::ExponentialHistogram { - data_points: vec![], - start_time: time.start, - time: time.current, - temporality: Temporality::Delta, - }) - } else { - None - }; - let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); - h.temporality = Temporality::Delta; - h.start_time = time.start; - h.time = time.current; + let mut s_data = AggregationData::init(self, dest, self.init_time.delta()); self.value_map - .collect_and_reset(&mut h.data_points, |attributes, attr| { + .collect_and_reset(&mut s_data.data_points, |attributes, attr| { let b = attr.into_inner().unwrap_or_else(|err| err.into_inner()); data::ExponentialHistogramDataPoint { attributes, @@ -434,33 +421,17 @@ impl ExpoHistogram { } }); - (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) + (s_data.data_points.len(), s_data.into_new_boxed()) } fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = self.init_time.cumulative(); - - let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if h.is_none() { - Some(data::ExponentialHistogram { - data_points: vec![], - start_time: time.start, - time: time.current, - temporality: Temporality::Cumulative, - }) - } else { - None - }; - let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); - h.temporality = Temporality::Cumulative; - h.start_time = time.start; - h.time = time.current; + let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative()); self.value_map - .collect_readonly(&mut h.data_points, |attributes, attr| { + .collect_readonly(&mut s_data.data_points, |attributes, attr| { let b = attr.lock().unwrap_or_else(|err| err.into_inner()); data::ExponentialHistogramDataPoint { attributes, @@ -491,7 +462,7 @@ impl ExpoHistogram { } }); - (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) + (s_data.data_points.len(), s_data.into_new_boxed()) } } @@ -524,6 +495,30 @@ where } } } + +impl InitAggregationData for ExpoHistogram +where + T: Number, +{ + type Aggr = data::ExponentialHistogram; + + fn create_new(&self, time: AggregateTime) -> Self::Aggr { + data::ExponentialHistogram { + data_points: vec![], + start_time: time.start, + time: time.current, + temporality: self.temporality, + } + } + + fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) { + existing.data_points.clear(); + existing.start_time = time.start; + existing.time = time.current; + existing.temporality = self.temporality; + } +} + #[cfg(test)] mod tests { use std::{ops::Neg, time::SystemTime}; diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 988f8cf359..30e936701a 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -7,8 +7,10 @@ use crate::metrics::data::{self, Aggregation}; use crate::metrics::Temporality; use opentelemetry::KeyValue; -use super::aggregate::AggregateTimeInitiator; use super::aggregate::AttributeSetFilter; +use super::aggregate::{ + AggregateTime, AggregateTimeInitiator, AggregationData, InitAggregationData, +}; use super::ComputeAggregation; use super::Measure; use super::ValueMap; @@ -108,26 +110,10 @@ impl Histogram { } fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { - let time = self.init_time.delta(); - - let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if h.is_none() { - Some(data::Histogram { - data_points: vec![], - start_time: time.start, - time: time.current, - temporality: Temporality::Delta, - }) - } else { - None - }; - let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); - h.temporality = Temporality::Delta; - h.start_time = time.start; - h.time = time.current; + let mut s_data = AggregationData::init(self, dest, self.init_time.delta()); self.value_map - .collect_and_reset(&mut h.data_points, |attributes, aggr| { + .collect_and_reset(&mut s_data.data_points, |attributes, aggr| { let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner()); HistogramDataPoint { attributes, @@ -153,32 +139,17 @@ impl Histogram { } }); - (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) + (s_data.data_points.len(), s_data.into_new_boxed()) } fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = self.init_time.cumulative(); - let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if h.is_none() { - Some(data::Histogram { - data_points: vec![], - start_time: time.start, - time: time.current, - temporality: Temporality::Cumulative, - }) - } else { - None - }; - let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); - h.temporality = Temporality::Cumulative; - h.start_time = time.start; - h.time = time.current; + let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative()); self.value_map - .collect_readonly(&mut h.data_points, |attributes, aggr| { + .collect_readonly(&mut s_data.data_points, |attributes, aggr| { let b = aggr.lock().unwrap_or_else(|err| err.into_inner()); HistogramDataPoint { attributes, @@ -204,7 +175,7 @@ impl Histogram { } }); - (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) + (s_data.data_points.len(), s_data.into_new_boxed()) } } @@ -239,6 +210,29 @@ where } } +impl InitAggregationData for Histogram +where + T: Number, +{ + type Aggr = data::Histogram; + + fn create_new(&self, time: AggregateTime) -> Self::Aggr { + data::Histogram { + data_points: vec![], + start_time: time.start, + time: time.current, + temporality: self.temporality, + } + } + + fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) { + existing.data_points.clear(); + existing.start_time = time.start; + existing.time = time.current; + existing.temporality = self.temporality; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index cc2176b897..da6a3fa228 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -5,7 +5,10 @@ use crate::metrics::{ use opentelemetry::KeyValue; use super::{ - aggregate::{AggregateTimeInitiator, AttributeSetFilter}, + aggregate::{ + AggregateTime, AggregateTimeInitiator, AggregationData, AttributeSetFilter, + InitAggregationData, + }, Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap, }; @@ -59,25 +62,8 @@ impl LastValue { } } - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { - let time = self.init_time.delta(); - - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if s_data.is_none() { - Some(data::Gauge { - data_points: vec![], - start_time: Some(time.start), - time: time.current, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = Some(time.start); - s_data.time = time.current; + fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + let mut s_data = AggregationData::init(self, dest, self.init_time.delta()); self.value_map .collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint { @@ -86,31 +72,14 @@ impl LastValue { exemplars: vec![], }); - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) + (s_data.data_points.len(), s_data.into_new_boxed()) } - pub(crate) fn cumulative( + fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = self.init_time.cumulative(); - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if s_data.is_none() { - Some(data::Gauge { - data_points: vec![], - start_time: Some(time.start), - time: time.current, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - - s_data.start_time = Some(time.start); - s_data.time = time.current; + let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative()); self.value_map .collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint { @@ -119,10 +88,7 @@ impl LastValue { exemplars: vec![], }); - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) + (s_data.data_points.len(), s_data.into_new_boxed()) } } @@ -148,3 +114,24 @@ where } } } + +impl InitAggregationData for LastValue +where + T: Number, +{ + type Aggr = data::Gauge; + + fn create_new(&self, time: AggregateTime) -> Self::Aggr { + data::Gauge { + data_points: vec![], + start_time: Some(time.start), + time: time.current, + } + } + + fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) { + existing.data_points.clear(); + existing.start_time = Some(time.start); + existing.time = time.current; + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index b2f478e078..c1a5700f77 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -3,7 +3,9 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, SumDataPoint}; use crate::metrics::Temporality; -use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter}; +use super::aggregate::{ + AggregateTime, AggregateTimeInitiator, AggregationData, AttributeSetFilter, InitAggregationData, +}; use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; use super::{ComputeAggregation, Measure}; use std::{collections::HashMap, sync::Mutex}; @@ -34,29 +36,8 @@ impl PrecomputedSum { } } - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { - let time = self.init_time.delta(); - - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if s_data.is_none() { - Some(data::Sum { - data_points: vec![], - start_time: time.start, - time: time.current, - temporality: Temporality::Delta, - is_monotonic: self.monotonic, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = time.start; - s_data.time = time.current; - s_data.temporality = Temporality::Delta; - s_data.is_monotonic = self.monotonic; + fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + let mut s_data = AggregationData::init(self, dest, self.init_time.delta()); let mut reported = match self.reported.lock() { Ok(r) => r, @@ -79,35 +60,14 @@ impl PrecomputedSum { *reported = new_reported; drop(reported); // drop before values guard is dropped - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) + (s_data.data_points.len(), s_data.into_new_boxed()) } - pub(crate) fn cumulative( + fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = self.init_time.cumulative(); - - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if s_data.is_none() { - Some(data::Sum { - data_points: vec![], - start_time: time.start, - time: time.current, - temporality: Temporality::Cumulative, - is_monotonic: self.monotonic, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = time.start; - s_data.time = time.current; - s_data.temporality = Temporality::Cumulative; - s_data.is_monotonic = self.monotonic; + let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative()); self.value_map .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint { @@ -116,10 +76,7 @@ impl PrecomputedSum { exemplars: vec![], }); - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) + (s_data.data_points.len(), s_data.into_new_boxed()) } } @@ -145,3 +102,28 @@ where } } } + +impl InitAggregationData for PrecomputedSum +where + T: Number, +{ + type Aggr = data::Sum; + + fn create_new(&self, time: AggregateTime) -> Self::Aggr { + data::Sum { + data_points: vec![], + start_time: time.start, + time: time.current, + temporality: self.temporality, + is_monotonic: self.monotonic, + } + } + + fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) { + existing.data_points.clear(); + existing.start_time = time.start; + existing.time = time.current; + existing.temporality = self.temporality; + existing.is_monotonic = self.monotonic; + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 24b656dc1f..191b4d7822 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -2,7 +2,9 @@ use crate::metrics::data::{self, Aggregation, SumDataPoint}; use crate::metrics::Temporality; use opentelemetry::KeyValue; -use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter}; +use super::aggregate::{ + AggregateTime, AggregateTimeInitiator, AggregationData, AttributeSetFilter, InitAggregationData, +}; use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number}; use super::{AtomicallyUpdate, ValueMap}; @@ -66,28 +68,8 @@ impl Sum { } } - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { - let time = self.init_time.delta(); - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if s_data.is_none() { - Some(data::Sum { - data_points: vec![], - start_time: time.start, - time: time.current, - temporality: Temporality::Delta, - is_monotonic: self.monotonic, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = time.start; - s_data.time = time.current; - s_data.temporality = Temporality::Delta; - s_data.is_monotonic = self.monotonic; + fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + let mut s_data = AggregationData::init(self, dest, self.init_time.delta()); self.value_map .collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint { @@ -96,35 +78,14 @@ impl Sum { exemplars: vec![], }); - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) + (s_data.data_points.len(), s_data.into_new_boxed()) } - pub(crate) fn cumulative( + fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = self.init_time.cumulative(); - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if s_data.is_none() { - Some(data::Sum { - data_points: vec![], - start_time: time.start, - time: time.current, - temporality: Temporality::Cumulative, - is_monotonic: self.monotonic, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - - s_data.start_time = time.start; - s_data.time = time.current; - s_data.temporality = Temporality::Cumulative; - s_data.is_monotonic = self.monotonic; + let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative()); self.value_map .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint { @@ -133,10 +94,7 @@ impl Sum { exemplars: vec![], }); - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) + (s_data.data_points.len(), s_data.into_new_boxed()) } } @@ -162,3 +120,28 @@ where } } } + +impl InitAggregationData for Sum +where + T: Number, +{ + type Aggr = data::Sum; + + fn create_new(&self, time: AggregateTime) -> Self::Aggr { + data::Sum { + data_points: vec![], + start_time: time.start, + time: time.current, + temporality: self.temporality, + is_monotonic: self.monotonic, + } + } + + fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) { + existing.data_points.clear(); + existing.start_time = time.start; + existing.time = time.current; + existing.temporality = self.temporality; + existing.is_monotonic = self.monotonic; + } +} diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 2242d48eea..ac9cad018b 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -23,11 +23,8 @@ mod throughput; struct MockLogExporter; impl LogExporter for MockLogExporter { - fn export( - &self, - _batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send { - async { Ok(()) } + async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> { + Ok(()) } }