diff --git a/src/common/metrics/src/recorder.rs b/src/common/metrics/src/recorder.rs index 1279dd7c6114..8addaf0da91e 100644 --- a/src/common/metrics/src/recorder.rs +++ b/src/common/metrics/src/recorder.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::mem::ManuallyDrop; use std::sync::Arc; use std::sync::Once; @@ -21,12 +22,24 @@ use metrics::decrement_gauge; use metrics::gauge; use metrics::histogram; use metrics::increment_gauge; +use metrics::Counter; +use metrics::CounterFn; +use metrics::Gauge; +use metrics::GaugeFn; +use metrics::Histogram; +use metrics::HistogramFn; +use metrics::Key; +use metrics::KeyName; +use metrics::Recorder; +use metrics::SharedString; +use metrics::Unit; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusHandle; +use metrics_exporter_prometheus::PrometheusRecorder; use once_cell::sync::Lazy; use parking_lot::RwLock; -static PROMETHEUS_HANDLE: Lazy>>> = +static PROMETHEUS_HANDLE: Lazy>>> = Lazy::new(|| Arc::new(RwLock::new(None))); pub const LABEL_KEY_TENANT: &str = "tenant"; @@ -107,9 +120,9 @@ pub fn init_default_metrics_recorder() { /// Init prometheus recorder. fn init_prometheus_recorder() { - let recorder = PrometheusBuilder::new().build_recorder(); + let recorder = ClearableRecorder::create(); let mut h = PROMETHEUS_HANDLE.as_ref().write(); - *h = Some(recorder.handle()); + *h = Some(recorder.clone()); unsafe { metrics::clear_recorder(); } @@ -120,5 +133,181 @@ fn init_prometheus_recorder() { } pub fn try_handle() -> Option { - PROMETHEUS_HANDLE.as_ref().read().clone() + let read_guard = PROMETHEUS_HANDLE.as_ref().read(); + read_guard.as_ref().map(ClearableRecorder::handle) +} + +pub fn try_get_record() -> Option { + let read_guard = PROMETHEUS_HANDLE.as_ref().read(); + read_guard.as_ref().cloned() +} + +struct CounterFnWrap { + pub counter: ManuallyDrop, + pub holder: ManuallyDrop>, +} + +impl Drop for CounterFnWrap { + fn drop(&mut self) { + unsafe { + ManuallyDrop::drop(&mut self.counter); + ManuallyDrop::drop(&mut self.holder); + } + } +} + +impl CounterFn for CounterFnWrap { + fn increment(&self, value: u64) { + self.counter.increment(value) + } + + fn absolute(&self, value: u64) { + self.counter.absolute(value) + } +} + +struct GaugeFnWrap { + pub gauge: ManuallyDrop, + pub holder: ManuallyDrop>, +} + +impl Drop for GaugeFnWrap { + fn drop(&mut self) { + unsafe { + ManuallyDrop::drop(&mut self.gauge); + ManuallyDrop::drop(&mut self.holder); + } + } +} + +impl GaugeFn for GaugeFnWrap { + fn increment(&self, value: f64) { + self.gauge.increment(value) + } + + fn decrement(&self, value: f64) { + self.gauge.decrement(value) + } + + fn set(&self, value: f64) { + self.gauge.set(value) + } +} + +struct HistogramFnWrap { + pub histogram: std::mem::ManuallyDrop, + pub holder: ManuallyDrop>, +} + +impl Drop for HistogramFnWrap { + fn drop(&mut self) { + unsafe { + ManuallyDrop::drop(&mut self.histogram); + ManuallyDrop::drop(&mut self.holder); + } + } +} + +impl HistogramFn for HistogramFnWrap { + fn record(&self, value: f64) { + self.histogram.record(value) + } +} + +// It will be ensured that the recorder will be destroyed after all counters, gauge, histogram are destroyed +struct ArcRecorder { + pub inner: Arc, +} + +impl Recorder for ArcRecorder { + #[inline] + fn describe_counter(&self, key: KeyName, unit: Option, description: SharedString) { + self.inner.describe_counter(key, unit, description) + } + + #[inline] + fn describe_gauge(&self, key: KeyName, unit: Option, description: SharedString) { + self.inner.describe_gauge(key, unit, description) + } + + #[inline] + fn describe_histogram(&self, key: KeyName, unit: Option, description: SharedString) { + self.inner.describe_histogram(key, unit, description) + } + + fn register_counter(&self, key: &Key) -> Counter { + Counter::from_arc(Arc::new(CounterFnWrap { + counter: ManuallyDrop::new(self.inner.register_counter(key)), + holder: ManuallyDrop::new(self.inner.clone()), + })) + } + + fn register_gauge(&self, key: &Key) -> Gauge { + Gauge::from_arc(Arc::new(GaugeFnWrap { + gauge: ManuallyDrop::new(self.inner.register_gauge(key)), + holder: ManuallyDrop::new(self.inner.clone()), + })) + } + + fn register_histogram(&self, key: &Key) -> Histogram { + Histogram::from_arc(Arc::new(HistogramFnWrap { + histogram: ManuallyDrop::new(self.inner.register_histogram(key)), + holder: ManuallyDrop::new(self.inner.clone()), + })) + } +} + +// TODO: use atomic refactor rwlock +#[derive(Clone)] +pub struct ClearableRecorder { + inner: Arc>>, +} + +impl ClearableRecorder { + pub fn create() -> ClearableRecorder { + let recorder = PrometheusBuilder::new().build_recorder(); + ClearableRecorder { + inner: Arc::new(RwLock::new(ArcRecorder { + inner: Arc::new(recorder), + })), + } + } + + pub fn clear(&self) { + let mut inner = self.inner.write(); + let recorder = PrometheusBuilder::new().build_recorder(); + *inner = ArcRecorder { + inner: Arc::new(recorder), + }; + } + + pub fn handle(&self) -> PrometheusHandle { + self.inner.read().inner.handle() + } +} + +impl Recorder for ClearableRecorder { + fn describe_counter(&self, key: KeyName, unit: Option, description: SharedString) { + self.inner.read().describe_counter(key, unit, description) + } + + fn describe_gauge(&self, key: KeyName, unit: Option, description: SharedString) { + self.inner.read().describe_gauge(key, unit, description) + } + + fn describe_histogram(&self, key: KeyName, unit: Option, description: SharedString) { + self.inner.read().describe_histogram(key, unit, description) + } + + fn register_counter(&self, key: &Key) -> Counter { + self.inner.read().register_counter(key) + } + + fn register_gauge(&self, key: &Key) -> Gauge { + self.inner.read().register_gauge(key) + } + + fn register_histogram(&self, key: &Key) -> Histogram { + self.inner.read().register_histogram(key) + } } diff --git a/src/common/metrics/src/reset.rs b/src/common/metrics/src/reset.rs index a19ebd26ef4a..a81804819852 100644 --- a/src/common/metrics/src/reset.rs +++ b/src/common/metrics/src/reset.rs @@ -13,19 +13,14 @@ // limitations under the License. use common_exception::Result; -use metrics::gauge; -use metrics_exporter_prometheus::PrometheusHandle; -use crate::dump_metric_samples; -use crate::MetricValue; +use crate::recorder::try_get_record; /// Reset gauge metrics to 0. -pub fn reset_metrics(handle: PrometheusHandle) -> Result<()> { - let samples = dump_metric_samples(handle)?; - for sample in samples { - if let MetricValue::Gauge(_) = sample.value { - gauge!(sample.name, 0_f64); - } +pub fn reset_metrics() -> Result<()> { + if let Some(recorder) = try_get_record() { + recorder.clear(); } + Ok(()) } diff --git a/src/query/storages/system/src/metrics_table.rs b/src/query/storages/system/src/metrics_table.rs index ab398f8f83fa..a70750ca88c7 100644 --- a/src/query/storages/system/src/metrics_table.rs +++ b/src/query/storages/system/src/metrics_table.rs @@ -83,11 +83,7 @@ impl SyncSystemTable for MetricsTable { } fn truncate(&self, _ctx: Arc) -> Result<()> { - let prometheus_handle = common_metrics::try_handle().ok_or_else(|| { - ErrorCode::InitPrometheusFailure("Prometheus recorder is not initialized yet.") - })?; - - reset_metrics(prometheus_handle)?; + reset_metrics()?; Ok(()) } } diff --git a/tests/sqllogictests/suites/base/03_common/03_0025_delete_from b/tests/sqllogictests/suites/base/03_common/03_0025_delete_from index 567f84d7ca06..cd87a75e77e8 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0025_delete_from +++ b/tests/sqllogictests/suites/base/03_common/03_0025_delete_from @@ -196,31 +196,15 @@ insert into t values(2); statement ok insert into t values(3); -# clear metrics -statement ok -truncate table system.metrics; - statement ok delete from t where c = 2; -# expects 2 blocks pruned: the blocks of value 1 and 3 -query I -select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums'; ----- -2.0 - query I select * from t order by c; ---- 1 3 -# expects 1 whole block deletion: the block of value 2 -query I -select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums'; ----- -1.0 - # case: 3 blocks statement ok @@ -235,25 +219,9 @@ insert into t values(3), (5); statement ok insert into t values(6), (7); -# clear metrics -statement ok -truncate table system.metrics; - statement ok delete from t where c > 0 and c < 4; -# expects 1 blocks pruned: the block of value {6..7} -query I -select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums'; ----- -1.0 - -# expects 1 whole block deletion: the block of value {1..3} -query I -select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums'; ----- -1.0 - query I select * from t order by c; ---- @@ -287,24 +255,9 @@ insert into t values(4),(5),(6); statement ok insert into t values(7),(8),(9); -#clear metrics -statement ok -truncate table system.metrics; - statement ok delete from t where c > 3 and c < 8; -# expects 1 block pruned and -query I -select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums'; ----- -1.0 - -query I -select value from system.metrics where metric = 'fuse_deletion_segment_range_pruned_whole_segment_nums'; ----- -1.0 - query I select * from t order by c; ---- @@ -314,12 +267,6 @@ select * from t order by c; 8 9 -# expects 1 whole block deletion: the block of value 2 -query I -select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums'; ----- -1.0 - statement ok drop table t all