Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(query): refactor truncate metrics #12634

Merged
merged 6 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 193 additions & 4 deletions src/common/metrics/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem::ManuallyDrop;
use std::sync::Arc;
use std::sync::Once;

Expand All @@ -21,12 +22,24 @@ use metrics::decrement_gauge;
use metrics::gauge;
use metrics::histogram;
use metrics::increment_gauge;
use metrics::Counter;
use metrics::CounterFn;
use metrics::Gauge;
use metrics::GaugeFn;
use metrics::Histogram;
use metrics::HistogramFn;
use metrics::Key;
use metrics::KeyName;
use metrics::Recorder;
use metrics::SharedString;
use metrics::Unit;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_exporter_prometheus::PrometheusHandle;
use metrics_exporter_prometheus::PrometheusRecorder;
use once_cell::sync::Lazy;
use parking_lot::RwLock;

static PROMETHEUS_HANDLE: Lazy<Arc<RwLock<Option<PrometheusHandle>>>> =
static PROMETHEUS_HANDLE: Lazy<Arc<RwLock<Option<ClearableRecorder>>>> =
Lazy::new(|| Arc::new(RwLock::new(None)));

pub const LABEL_KEY_TENANT: &str = "tenant";
Expand Down Expand Up @@ -107,9 +120,9 @@ pub fn init_default_metrics_recorder() {

/// Init prometheus recorder.
fn init_prometheus_recorder() {
let recorder = PrometheusBuilder::new().build_recorder();
let recorder = ClearableRecorder::create();
let mut h = PROMETHEUS_HANDLE.as_ref().write();
*h = Some(recorder.handle());
*h = Some(recorder.clone());
unsafe {
metrics::clear_recorder();
}
Expand All @@ -120,5 +133,181 @@ fn init_prometheus_recorder() {
}

pub fn try_handle() -> Option<PrometheusHandle> {
PROMETHEUS_HANDLE.as_ref().read().clone()
let read_guard = PROMETHEUS_HANDLE.as_ref().read();
read_guard.as_ref().map(ClearableRecorder::handle)
}

pub fn try_get_record() -> Option<ClearableRecorder> {
let read_guard = PROMETHEUS_HANDLE.as_ref().read();
read_guard.as_ref().cloned()
}

struct CounterFnWrap<Holder> {
pub counter: ManuallyDrop<Counter>,
pub holder: ManuallyDrop<Arc<Holder>>,
}

impl<Holder> Drop for CounterFnWrap<Holder> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.counter);
ManuallyDrop::drop(&mut self.holder);
}
}
}

impl<Holder> CounterFn for CounterFnWrap<Holder> {
fn increment(&self, value: u64) {
self.counter.increment(value)
}

fn absolute(&self, value: u64) {
self.counter.absolute(value)
}
}

struct GaugeFnWrap<Holder> {
pub gauge: ManuallyDrop<Gauge>,
pub holder: ManuallyDrop<Arc<Holder>>,
}

impl<Holder> Drop for GaugeFnWrap<Holder> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.gauge);
ManuallyDrop::drop(&mut self.holder);
}
}
}

impl<Holder> GaugeFn for GaugeFnWrap<Holder> {
fn increment(&self, value: f64) {
self.gauge.increment(value)
}

fn decrement(&self, value: f64) {
self.gauge.decrement(value)
}

fn set(&self, value: f64) {
self.gauge.set(value)
}
}

struct HistogramFnWrap<Holder> {
pub histogram: std::mem::ManuallyDrop<Histogram>,
pub holder: ManuallyDrop<Arc<Holder>>,
}

impl<Holder> Drop for HistogramFnWrap<Holder> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.histogram);
ManuallyDrop::drop(&mut self.holder);
}
}
}

impl<Holder> HistogramFn for HistogramFnWrap<Holder> {
fn record(&self, value: f64) {
self.histogram.record(value)
}
}

// It will be ensured that the recorder will be destroyed after all counters, gauge, histogram are destroyed
struct ArcRecorder<T: Recorder> {
pub inner: Arc<T>,
}

impl<T: Recorder + Send + Sync + 'static> Recorder for ArcRecorder<T> {
#[inline]
fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.describe_counter(key, unit, description)
}

#[inline]
fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.describe_gauge(key, unit, description)
}

#[inline]
fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.describe_histogram(key, unit, description)
}

fn register_counter(&self, key: &Key) -> Counter {
Counter::from_arc(Arc::new(CounterFnWrap {
counter: ManuallyDrop::new(self.inner.register_counter(key)),
holder: ManuallyDrop::new(self.inner.clone()),
}))
}

fn register_gauge(&self, key: &Key) -> Gauge {
Gauge::from_arc(Arc::new(GaugeFnWrap {
gauge: ManuallyDrop::new(self.inner.register_gauge(key)),
holder: ManuallyDrop::new(self.inner.clone()),
}))
}

fn register_histogram(&self, key: &Key) -> Histogram {
Histogram::from_arc(Arc::new(HistogramFnWrap {
histogram: ManuallyDrop::new(self.inner.register_histogram(key)),
holder: ManuallyDrop::new(self.inner.clone()),
}))
}
}

// TODO: use atomic refactor rwlock
#[derive(Clone)]
pub struct ClearableRecorder {
inner: Arc<RwLock<ArcRecorder<PrometheusRecorder>>>,
}

impl ClearableRecorder {
pub fn create() -> ClearableRecorder {
let recorder = PrometheusBuilder::new().build_recorder();
ClearableRecorder {
inner: Arc::new(RwLock::new(ArcRecorder {
inner: Arc::new(recorder),
})),
}
}

pub fn clear(&self) {
let mut inner = self.inner.write();
let recorder = PrometheusBuilder::new().build_recorder();
*inner = ArcRecorder {
inner: Arc::new(recorder),
};
}

pub fn handle(&self) -> PrometheusHandle {
self.inner.read().inner.handle()
}
}

impl Recorder for ClearableRecorder {
fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.read().describe_counter(key, unit, description)
}

fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.read().describe_gauge(key, unit, description)
}

fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
self.inner.read().describe_histogram(key, unit, description)
}

fn register_counter(&self, key: &Key) -> Counter {
self.inner.read().register_counter(key)
}

fn register_gauge(&self, key: &Key) -> Gauge {
self.inner.read().register_gauge(key)
}

fn register_histogram(&self, key: &Key) -> Histogram {
self.inner.read().register_histogram(key)
}
}
15 changes: 5 additions & 10 deletions src/common/metrics/src/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,14 @@
// limitations under the License.

use common_exception::Result;
use metrics::gauge;
use metrics_exporter_prometheus::PrometheusHandle;

use crate::dump_metric_samples;
use crate::MetricValue;
use crate::recorder::try_get_record;

/// Reset gauge metrics to 0.
pub fn reset_metrics(handle: PrometheusHandle) -> Result<()> {
let samples = dump_metric_samples(handle)?;
for sample in samples {
if let MetricValue::Gauge(_) = sample.value {
gauge!(sample.name, 0_f64);
}
pub fn reset_metrics() -> Result<()> {
if let Some(recorder) = try_get_record() {
recorder.clear();
}

Ok(())
}
6 changes: 1 addition & 5 deletions src/query/storages/system/src/metrics_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ impl SyncSystemTable for MetricsTable {
}

fn truncate(&self, _ctx: Arc<dyn TableContext>) -> Result<()> {
let prometheus_handle = common_metrics::try_handle().ok_or_else(|| {
ErrorCode::InitPrometheusFailure("Prometheus recorder is not initialized yet.")
})?;

reset_metrics(prometheus_handle)?;
reset_metrics()?;
Ok(())
}
}
Expand Down
53 changes: 0 additions & 53 deletions tests/sqllogictests/suites/base/03_common/03_0025_delete_from
Original file line number Diff line number Diff line change
Expand Up @@ -196,31 +196,15 @@ insert into t values(2);
statement ok
insert into t values(3);

# clear metrics
statement ok
truncate table system.metrics;

statement ok
delete from t where c = 2;

# expects 2 blocks pruned: the blocks of value 1 and 3
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums';
----
2.0

query I
select * from t order by c;
----
1
3

# expects 1 whole block deletion: the block of value 2
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums';
----
1.0

# case: 3 blocks

statement ok
Expand All @@ -235,25 +219,9 @@ insert into t values(3), (5);
statement ok
insert into t values(6), (7);

# clear metrics
statement ok
truncate table system.metrics;

statement ok
delete from t where c > 0 and c < 4;

# expects 1 blocks pruned: the block of value {6..7}
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums';
----
1.0

# expects 1 whole block deletion: the block of value {1..3}
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums';
----
1.0

query I
select * from t order by c;
----
Expand Down Expand Up @@ -287,24 +255,9 @@ insert into t values(4),(5),(6);
statement ok
insert into t values(7),(8),(9);

#clear metrics
statement ok
truncate table system.metrics;

statement ok
delete from t where c > 3 and c < 8;

# expects 1 block pruned and
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_nums';
----
1.0

query I
select value from system.metrics where metric = 'fuse_deletion_segment_range_pruned_whole_segment_nums';
----
1.0

query I
select * from t order by c;
----
Expand All @@ -314,12 +267,6 @@ select * from t order by c;
8
9

# expects 1 whole block deletion: the block of value 2
query I
select value from system.metrics where metric = 'fuse_deletion_block_range_pruned_whole_block_nums';
----
1.0

statement ok
drop table t all

Expand Down
Loading