Skip to content

Commit

Permalink
Merge branch 'leon/qs_metrics_for_debug' into 'master'
Browse files Browse the repository at this point in the history
Increase Observability for QueryStats

This MR adds a number of metrics to the QueryStats feature that will hopefully provide insight into how the data flows through the different components.

See merge request dfinity-lab/public/ic!19266
  • Loading branch information
Sawchord committed May 15, 2024
2 parents 8775228 + 44e5ca1 commit 5ba1412
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 56 deletions.
1 change: 1 addition & 0 deletions rs/query_stats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl QueryStatsCollector {
.or_default()
.saturating_accumulate(stats);

self.metrics.query_stats_collector.add(stats);
self.metrics
.query_stats_collector_num_canister_ids
.set(state.len() as i64);
Expand Down
100 changes: 70 additions & 30 deletions rs/query_stats/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,71 @@
use ic_metrics::{buckets::decimal_buckets, MetricsRegistry};
use ic_types::batch::QueryStats;
use prometheus::{HistogramVec, IntCounter, IntGauge};

pub(crate) const CRITICAL_ERROR_AGGREGATION_FAILURE: &str = "query_stats_aggregator_failure";

/// A set of the statistics reported by this feature
///
/// Occasionally, we want to export the metrics which contain the statistics reported
/// by various components. This struct is a helper to make this reporting more concise
#[derive(Clone, Debug)]
pub(crate) struct QueryStatsMetricsSet {
    /// Running sum of query calls folded into this set (gauge named `query_stats_<name>_num_calls`).
    num_calls: IntGauge,
    /// Running sum of executed instructions (gauge named `query_stats_<name>_num_instructions`).
    num_instructions: IntGauge,
    /// Running sum of ingress payload bytes (gauge named `query_stats_<name>_request_bytes`).
    request_bytes: IntGauge,
    /// Running sum of egress payload bytes (gauge named `query_stats_<name>_response_bytes`).
    response_bytes: IntGauge,
}

impl QueryStatsMetricsSet {
    /// Registers the four statistics gauges with `metrics_registry`.
    ///
    /// `name` is spliced into each metric name, yielding gauges of the form
    /// `query_stats_<name>_{num_calls,num_instructions,request_bytes,response_bytes}`.
    pub fn new(metrics_registry: &MetricsRegistry, name: &str) -> Self {
        // Small helper so each gauge registration stays on one line.
        let register = |suffix: &str, help: &str| {
            metrics_registry.int_gauge(format!("query_stats_{name}_{suffix}"), help.to_string())
        };

        Self {
            num_calls: register("num_calls", "Sum of calls"),
            num_instructions: register("num_instructions", "Sum of instructions"),
            request_bytes: register("request_bytes", "Sum of request bytes"),
            response_bytes: register("response_bytes", "Sum of response bytes"),
        }
    }

    /// Accumulates `stats` into the exported gauges.
    ///
    /// Each counter in `stats` is added onto the corresponding gauge, so the
    /// gauges track running totals across repeated calls.
    pub fn add(&self, stats: &QueryStats) {
        self.num_calls.add(stats.num_calls as i64);
        self.num_instructions.add(stats.num_instructions as i64);
        self.request_bytes.add(stats.ingress_payload_size as i64);
        self.response_bytes.add(stats.egress_payload_size as i64);
    }
}

/// Metrics for query stats collector
///
/// The collector is responsible for locally collecting statistics for
/// each query executed. It is not part of the replicated state machine.
pub(crate) struct CollectorMetrics {
/// The statistics as currently reported by the collector
pub query_stats_collector: QueryStatsMetricsSet,
/// The number of canister IDs registered in the collector for the current epoch.
pub(crate) query_stats_collector_num_canister_ids: IntGauge,
pub query_stats_collector_num_canister_ids: IntGauge,
/// The epoch for which query calls are locally collected at the moment.
pub(crate) query_stats_collector_current_epoch: IntGauge,
pub query_stats_collector_current_epoch: IntGauge,
}

impl CollectorMetrics {
pub(crate) fn new(metrics_registry: &MetricsRegistry) -> Self {
Self {
query_stats_collector: QueryStatsMetricsSet::new(metrics_registry, "collector"),
query_stats_collector_num_canister_ids: metrics_registry.int_gauge(
"query_stats_collector_num_canister_ids",
"Current number of canister ids in the query stats collector",
Expand All @@ -37,6 +86,8 @@ impl CollectorMetrics {
pub(crate) struct QueryStatsPayloadBuilderMetrics {
/// Records the time it took to perform an operation
pub(crate) query_stats_payload_builder_duration: HistogramVec,
/// Records the statistics received from the collector
pub(crate) query_stats_payload_builder_current: QueryStatsMetricsSet,
/// The current epoch as seen by the payload builder.
///
/// Should be slightly behind the current epoch of [`CollectorMetrics`]
Expand All @@ -57,6 +108,10 @@ impl QueryStatsPayloadBuilderMetrics {
decimal_buckets(-4, 0),
&["operation"],
),
query_stats_payload_builder_current: QueryStatsMetricsSet::new(
metrics_registry,
"payload_builder_current",
),
query_stats_payload_builder_current_epoch: metrics_registry.int_gauge(
"query_stats_payload_builder_current_epoch",
"The current epoch as seen by the payload builder",
Expand All @@ -73,29 +128,29 @@ impl QueryStatsPayloadBuilderMetrics {
///
/// The query stats aggregator runs as part of the replicated state machine.
/// It deterministically aggregates query stats received from consensus blocks.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct QueryStatsAggregatorMetrics {
/// Sum of stats received from the payload builder
pub(crate) query_stats_received: QueryStatsMetricsSet,
/// The epoch for which we currently aggregate query stats.
/// This is lower than the epoch for which we collect stats, as there is
/// a delay for propagating local query stats via consensus blocks.
pub query_stats_aggregator_current_epoch: IntGauge,
pub(crate) query_stats_aggregator_current_epoch: IntGauge,
/// The number of records stored in the unaggregateed state
pub query_stats_aggregator_num_records: IntGauge,
/// Sum of delivered call statistics
pub query_stats_delivered_num_calls: IntGauge,
/// Sum of delivered instruction statistics
pub query_stats_delivered_num_instructions: IntGauge,
/// Sum of delivered request bytes
pub query_stats_delivered_request_bytes: IntGauge,
/// Sum of delivered response bytes
pub query_stats_delivered_response_bytes: IntGauge,
pub(crate) query_stats_aggregator_num_records: IntGauge,
/// Sum of statistics delivered to the canisters
pub(crate) query_stats_delivered: QueryStatsMetricsSet,
/// Critical error occuring in aggregator
pub query_stats_critical_error_aggregator_failure: IntCounter,
pub(crate) query_stats_critical_error_aggregator_failure: IntCounter,
}

impl QueryStatsAggregatorMetrics {
pub fn new(metrics_registry: &MetricsRegistry) -> Self {
Self {
query_stats_received: QueryStatsMetricsSet::new(
metrics_registry,
"aggregator_received",
),
query_stats_aggregator_current_epoch: metrics_registry.int_gauge(
"query_stats_aggregator_current_epoch",
"Current epoch of the query stats aggregator",
Expand All @@ -104,22 +159,7 @@ impl QueryStatsAggregatorMetrics {
"query_stats_aggregator_num_records",
"The number of records stored in the unaggregateed state",
),
query_stats_delivered_num_calls: metrics_registry.int_gauge(
"query_stats_delivered_num_calls",
"Sum of delivered call statistics",
),
query_stats_delivered_num_instructions: metrics_registry.int_gauge(
"query_stats_delivered_num_instructions",
"Sum of delivered instruction statistics",
),
query_stats_delivered_request_bytes: metrics_registry.int_gauge(
"query_stats_delivered_request_bytes",
"Sum of delivered request bytes",
),
query_stats_delivered_response_bytes: metrics_registry.int_gauge(
"query_stats_delivered_response_bytes",
"Sum of delivered response bytes",
),
query_stats_delivered: QueryStatsMetricsSet::new(metrics_registry, "delivered"),
query_stats_critical_error_aggregator_failure: metrics_registry
.error_counter(CRITICAL_ERROR_AGGREGATION_FAILURE),
}
Expand Down
21 changes: 17 additions & 4 deletions rs/query_stats/src/payload_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use ic_logger::{error, warn, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_replicated_state::ReplicatedState;
use ic_types::{
batch::{LocalQueryStats, QueryStatsPayload, ValidationContext},
batch::{LocalQueryStats, QueryStats, QueryStatsPayload, ValidationContext},
epoch_from_height, CanisterId, Height, NodeId, NumBytes, QueryStatsEpoch,
};
use std::{
Expand Down Expand Up @@ -80,11 +80,24 @@ impl BatchPayloadBuilder for QueryStatsPayloadBuilderImpl {
.start_timer();

match self.receiver.try_recv() {
Ok(new_epoch) => {
let Ok(mut epoch) = self.current_stats.write() else {
Ok(new_stats) => {
let Ok(mut current_stats) = self.current_stats.write() else {
return vec![];
};
*epoch = Some(new_epoch);
*current_stats = Some(new_stats);

// Update the metrics about the received metrics
if let Some(current_stats) = current_stats.as_ref() {
let mut report = QueryStats::default();
current_stats
.stats
.iter()
.for_each(|next_stats| report.saturating_accumulate(&next_stats.stats));

self.metrics
.query_stats_payload_builder_current
.add(&report);
};
}
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => {
Expand Down
32 changes: 10 additions & 22 deletions rs/query_stats/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ fn process_payload(
query_stats: &QueryStatsPayload,
state: &mut ReplicatedState,
logger: &ReplicaLogger,
metrics: &QueryStatsAggregatorMetrics,
) -> bool {
let state = &mut state.epoch_query_stats;

Expand Down Expand Up @@ -199,6 +200,7 @@ fn process_payload(
None => node.entry(query_stats.epoch).or_default(),
};

let mut received_stats = QueryStats::default();
for message in &query_stats.stats {
let previous_value = stats.insert(
message.canister_id,
Expand All @@ -209,6 +211,8 @@ fn process_payload(
egress_payload_size: message.stats.egress_payload_size,
},
);
received_stats.saturating_accumulate(&message.stats);

if previous_value.is_some() {
error!(
logger,
Expand All @@ -219,6 +223,7 @@ fn process_payload(
);
}
}
metrics.query_stats_received.add(&received_stats);

true
}
Expand Down Expand Up @@ -331,16 +336,10 @@ fn try_aggregate_one_epoch(
query_stats_to_be_applied.push((canister_id, aggregated_stats));
}

let mut delivered_num_calls = 0;
let mut delivered_num_instructions = 0;
let mut delivered_request_bytes = 0;
let mut delivered_response_bytes = 0;
let mut delivered_query_stats = QueryStats::default();

for (canister_id, aggregated_stats) in query_stats_to_be_applied {
delivered_num_calls += aggregated_stats.num_calls;
delivered_num_instructions += aggregated_stats.num_instructions;
delivered_request_bytes += aggregated_stats.ingress_payload_size;
delivered_response_bytes += aggregated_stats.egress_payload_size;
delivered_query_stats.saturating_accumulate(&aggregated_stats);

apply_query_stats_to_canister(
&aggregated_stats,
Expand All @@ -351,18 +350,7 @@ fn try_aggregate_one_epoch(
);
}

metrics
.query_stats_delivered_num_calls
.add(delivered_num_calls as i64);
metrics
.query_stats_delivered_num_instructions
.add(delivered_num_instructions as i64);
metrics
.query_stats_delivered_request_bytes
.add(delivered_request_bytes as i64);
metrics
.query_stats_delivered_response_bytes
.add(delivered_response_bytes as i64);
metrics.query_stats_delivered.add(&delivered_query_stats);

true
}
Expand Down Expand Up @@ -400,7 +388,7 @@ fn update_metrics(state: &ReplicatedState, metrics: &QueryStatsAggregatorMetrics
let num_records: usize = state
.stats
.values()
.map(|records| records.values().len())
.map(|epochs| epochs.values().map(|record| record.len()).sum::<usize>())
.sum();
metrics
.query_stats_aggregator_num_records
Expand All @@ -416,7 +404,7 @@ pub fn deliver_query_stats(
logger: &ReplicaLogger,
metrics: &QueryStatsAggregatorMetrics,
) {
if process_payload(query_stats, state, logger) {
if process_payload(query_stats, state, logger, metrics) {
// While in theory is is guaranteed that `try_aggregate_one_epoch` will eventually return
// `false`, the code is relatively complex and we don't want to rely on correct implementation
// only.
Expand Down

0 comments on commit 5ba1412

Please sign in to comment.