Skip to content

Commit

Permalink
graph: track raw entity number stats in BlockStateMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
MoonBoi9001 committed Jan 22, 2025
1 parent bd0ab9a commit 1addbb0
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions graph/src/components/metrics/block_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct BlockStateMetrics {
pub op_counter: HashMap<CounterKey, u64>,
pub read_bytes_counter: HashMap<CounterKey, u64>,
pub write_bytes_counter: HashMap<CounterKey, u64>,
pub entity_count_changes: HashMap<CounterKey, u64>,
}

#[derive(Hash, PartialEq, Eq, Debug, Clone)]
Expand All @@ -44,6 +45,7 @@ impl BlockStateMetrics {
write_bytes_counter: HashMap::new(),
gas_counter: HashMap::new(),
op_counter: HashMap::new(),
entity_count_changes: HashMap::new(),
}
}

Expand All @@ -63,6 +65,10 @@ impl BlockStateMetrics {
for (key, value) in other.op_counter {
*self.op_counter.entry(key).or_insert(0) += value;
}

for (key, value) in other.entity_count_changes {
*self.entity_count_changes.entry(key).or_insert(0) = value;
}
}

fn serialize_to_csv<T: Serialize, U: Serialize, I: IntoIterator<Item = T>>(
Expand Down Expand Up @@ -97,6 +103,25 @@ impl BlockStateMetrics {
)
}

pub fn counter_to_csv_i32(
data: &HashMap<CounterKey, i32>,
column_names: Vec<&str>,
) -> Result<String> {
Self::serialize_to_csv(
data.iter().map(|(key, value)| match key {
CounterKey::Entity(typename, id) => {
vec![
typename.typename().to_string(),
id.to_string(),
value.to_string(),
]
}
CounterKey::String(key) => vec![key.to_string(), value.to_string()],
}),
column_names,
)
}

async fn write_csv_to_store(bucket: &str, path: &str, data: String) -> Result<()> {
let data_bytes = data.into_bytes();

Expand Down Expand Up @@ -158,6 +183,18 @@ impl BlockStateMetrics {
}
}

pub fn track_entity_count_change(&mut self, entity_type: &EntityType, change: i32) {
if ENV_VARS.enable_dips_metrics {
let key = CounterKey::Entity(entity_type.clone(), Id::new("total".to_string()));
let counter = self.entity_count_changes.entry(key).or_insert(0);
if change < 0 {
*counter = counter.saturating_sub((-change) as u64);
} else {
*counter = counter.saturating_add(change as u64);
}
}
}

pub fn flush_metrics_to_store(
&self,
logger: &Logger,
Expand All @@ -180,6 +217,7 @@ impl BlockStateMetrics {
let op_counter = self.op_counter.clone();
let read_bytes_counter = self.read_bytes_counter.clone();
let write_bytes_counter = self.write_bytes_counter.clone();
let entity_count_changes = self.entity_count_changes.clone();

// Spawn the async task
crate::spawn(async move {
Expand All @@ -203,6 +241,11 @@ impl BlockStateMetrics {
Self::counter_to_csv(&write_bytes_counter, vec!["entity", "id", "bytes"])
.unwrap(),
),
(
"entities",
Self::counter_to_csv(&entity_count_changes, vec!["entity", "id", "count"])
.unwrap(),
),
];

// Convert each metrics upload into a future
Expand Down

0 comments on commit 1addbb0

Please sign in to comment.