Skip to content

Commit

Permalink
chore(query): refactor truncate metrics (databendlabs#12634)
Browse files Browse the repository at this point in the history
* chore(query): refactor truncate metrics

* chore(query): refactor truncate metrics
  • Loading branch information
zhang2014 authored and andylokandy committed Nov 27, 2023
1 parent f6578f3 commit 070f94f
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 72 deletions.
197 changes: 193 additions & 4 deletions src/common/metrics/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem::ManuallyDrop;
use std::sync::Arc;
use std::sync::Once;

Expand All @@ -21,12 +22,24 @@ use metrics::decrement_gauge;
use metrics::gauge;
use metrics::histogram;
use metrics::increment_gauge;
use metrics::Counter;
use metrics::CounterFn;
use metrics::Gauge;
use metrics::GaugeFn;
use metrics::Histogram;
use metrics::HistogramFn;
use metrics::Key;
use metrics::KeyName;
use metrics::Recorder;
use metrics::SharedString;
use metrics::Unit;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_exporter_prometheus::PrometheusHandle;
use metrics_exporter_prometheus::PrometheusRecorder;
use once_cell::sync::Lazy;
use parking_lot::RwLock;

static PROMETHEUS_HANDLE: Lazy<Arc<RwLock<Option<PrometheusHandle>>>> =
static PROMETHEUS_HANDLE: Lazy<Arc<RwLock<Option<ClearableRecorder>>>> =
Lazy::new(|| Arc::new(RwLock::new(None)));

pub const LABEL_KEY_TENANT: &str = "tenant";
Expand Down Expand Up @@ -107,9 +120,9 @@ pub fn init_default_metrics_recorder() {

/// Init prometheus recorder.
fn init_prometheus_recorder() {
let recorder = PrometheusBuilder::new().build_recorder();
let recorder = ClearableRecorder::create();
let mut h = PROMETHEUS_HANDLE.as_ref().write();
*h = Some(recorder.handle());
*h = Some(recorder.clone());
unsafe {
metrics::clear_recorder();
}
Expand All @@ -120,5 +133,181 @@ fn init_prometheus_recorder() {
}

pub fn try_handle() -> Option<PrometheusHandle> {
PROMETHEUS_HANDLE.as_ref().read().clone()
let read_guard = PROMETHEUS_HANDLE.as_ref().read();
read_guard.as_ref().map(ClearableRecorder::handle)
}

pub fn try_get_record() -> Option<ClearableRecorder> {
let read_guard = PROMETHEUS_HANDLE.as_ref().read();
read_guard.as_ref().cloned()
}

struct CounterFnWrap<Holder> {
pub counter: ManuallyDrop<Counter>,
pub holder: ManuallyDrop<Arc<Holder>>,
}

impl<Holder> Drop for CounterFnWrap<Holder> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.counter);
ManuallyDrop::drop(&mut self.holder);
}
}
}

impl<Holder> CounterFn for CounterFnWrap<Holder> {
fn increment(&self, value: u64) {
self.counter.increment(value)
}

fn absolute(&self, value: u64) {
self.counter.absolute(value)
}
}

struct GaugeFnWrap<Holder> {
pub gauge: ManuallyDrop<Gauge>,
pub holder: ManuallyDrop<Arc<Holder>>,
}

impl<Holder> Drop for GaugeFnWrap<Holder> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.gauge);
ManuallyDrop::drop(&mut self.holder);
}
}
}

impl<Holder> GaugeFn for GaugeFnWrap<Holder> {
fn increment(&self, value: f64) {
self.gauge.increment(value)
}

fn decrement(&self, value: f64) {
self.gauge.decrement(value)
}

fn set(&self, value: f64) {
self.gauge.set(value)
}
}

struct HistogramFnWrap<Holder> {
pub histogram: std::mem::ManuallyDrop<Histogram>,
pub holder: ManuallyDrop<Arc<Holder>>,
}

impl<Holder> Drop for HistogramFnWrap<Holder> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.histogram);
ManuallyDrop::drop(&mut self.holder);
}
}
}

impl<Holder> HistogramFn for HistogramFnWrap<Holder> {
fn record(&self, value: f64) {
self.histogram.record(value)
}
}

// It will be ensured that the recorder will be destroyed after all counters, gauge, histogram are destroyed
struct ArcRecorder<T: Recorder> {
pub inner: Arc<T>,
}

impl<T: Recorder + Send + Sync + 'static> Recorder for ArcRecorder<T> {
#[inline]
fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.describe_counter(key, unit, description)
}

#[inline]
fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.describe_gauge(key, unit, description)
}

#[inline]
fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.describe_histogram(key, unit, description)
}

fn register_counter(&self, key: &Key) -> Counter {
Counter::from_arc(Arc::new(CounterFnWrap {
counter: ManuallyDrop::new(self.inner.register_counter(key)),
holder: ManuallyDrop::new(self.inner.clone()),
}))
}

fn register_gauge(&self, key: &Key) -> Gauge {
Gauge::from_arc(Arc::new(GaugeFnWrap {
gauge: ManuallyDrop::new(self.inner.register_gauge(key)),
holder: ManuallyDrop::new(self.inner.clone()),
}))
}

fn register_histogram(&self, key: &Key) -> Histogram {
Histogram::from_arc(Arc::new(HistogramFnWrap {
histogram: ManuallyDrop::new(self.inner.register_histogram(key)),
holder: ManuallyDrop::new(self.inner.clone()),
}))
}
}

// TODO: use atomic refactor rwlock
#[derive(Clone)]
pub struct ClearableRecorder {
inner: Arc<RwLock<ArcRecorder<PrometheusRecorder>>>,
}

impl ClearableRecorder {
pub fn create() -> ClearableRecorder {
let recorder = PrometheusBuilder::new().build_recorder();
ClearableRecorder {
inner: Arc::new(RwLock::new(ArcRecorder {
inner: Arc::new(recorder),
})),
}
}

pub fn clear(&self) {
let mut inner = self.inner.write();
let recorder = PrometheusBuilder::new().build_recorder();
*inner = ArcRecorder {
inner: Arc::new(recorder),
};
}

pub fn handle(&self) -> PrometheusHandle {
self.inner.read().inner.handle()
}
}

impl Recorder for ClearableRecorder {
fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.read().describe_counter(key, unit, description)
}

fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.read().describe_gauge(key, unit, description)
}

fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.read().describe_histogram(key, unit, description)
}

fn register_counter(&self, key: &Key) -> Counter {
self.inner.read().register_counter(key)
}

fn register_gauge(&self, key: &Key) -> Gauge {
self.inner.read().register_gauge(key)
}

fn register_histogram(&self, key: &Key) -> Histogram {
self.inner.read().register_histogram(key)
}
}
15 changes: 5 additions & 10 deletions src/common/metrics/src/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,14 @@
// limitations under the License.

use common_exception::Result;
use metrics::gauge;
use metrics_exporter_prometheus::PrometheusHandle;

use crate::dump_metric_samples;
use crate::MetricValue;
use crate::recorder::try_get_record;

/// Reset gauge metrics to 0.
pub fn reset_metrics(handle: PrometheusHandle) -> Result<()> {
let samples = dump_metric_samples(handle)?;
for sample in samples {
if let MetricValue::Gauge(_) = sample.value {
gauge!(sample.name, 0_f64);
}
pub fn reset_metrics() -> Result<()> {
if let Some(recorder) = try_get_record() {
recorder.clear();
}

Ok(())
}
6 changes: 1 addition & 5 deletions src/query/storages/system/src/metrics_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ impl SyncSystemTable for MetricsTable {
}

fn truncate(&self, _ctx: Arc<dyn TableContext>) -> Result<()> {
let prometheus_handle = common_metrics::try_handle().ok_or_else(|| {
ErrorCode::InitPrometheusFailure("Prometheus recorder is not initialized yet.")
})?;

reset_metrics(prometheus_handle)?;
reset_metrics()?;
Ok(())
}
}
Expand Down
53 changes: 0 additions & 53 deletions tests/sqllogictests/suites/base/03_common/03_0025_delete_from
Original file line number Diff line number Diff line change
Expand Up @@ -196,31 +196,15 @@ insert into t values(2);
statement ok
insert into t values(3);

# clear metrics
statement ok
truncate table system.metrics;

statement ok
delete from t where c = 2;

# expects 2 blocks pruned: the blocks of value 1 and 3
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums';
----
2.0

query I
select * from t order by c;
----
1
3

# expects 1 whole block deletion: the block of value 2
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums';
----
1.0

# case: 3 blocks

statement ok
Expand All @@ -235,25 +219,9 @@ insert into t values(3), (5);
statement ok
insert into t values(6), (7);

# clear metrics
statement ok
truncate table system.metrics;

statement ok
delete from t where c > 0 and c < 4;

# expects 1 blocks pruned: the block of value {6..7}
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums';
----
1.0

# expects 1 whole block deletion: the block of value {1..3}
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums';
----
1.0

query I
select * from t order by c;
----
Expand Down Expand Up @@ -287,24 +255,9 @@ insert into t values(4),(5),(6);
statement ok
insert into t values(7),(8),(9);

#clear metrics
statement ok
truncate table system.metrics;

statement ok
delete from t where c > 3 and c < 8;

# expects 1 block pruned and
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums';
----
1.0

query I
select value from system.metrics where metric = 'fuse_deletion_segment_range_pruned_whole_segment_nums';
----
1.0

query I
select * from t order by c;
----
Expand All @@ -314,12 +267,6 @@ select * from t order by c;
8
9

# expects 1 whole block deletion: the block of value 2
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums';
----
1.0

statement ok
drop table t all

Expand Down

0 comments on commit 070f94f

Please sign in to comment.