Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(p2p_service): add metrics for number of blocks requested over p2p req/res protocol #2135

Merged
merged 5 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- [2135](https://github.com/FuelLabs/fuel-core/pull/2135): Added metrics logging for number of blocks served over the p2p req/res protocol.


## [Version 0.35.0]

### Added
Expand Down
26 changes: 24 additions & 2 deletions crates/metrics/src/p2p_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use crate::global_registry;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::{
counter::Counter,
gauge::Gauge,
};
use std::sync::OnceLock;

pub struct P2PMetrics {
pub unique_peers: Counter,
pub blocks_requested: Gauge,
}

impl P2PMetrics {
fn new() -> Self {
let unique_peers = Counter::default();
let blocks_requested = Gauge::default();

let metrics = P2PMetrics { unique_peers };
let metrics = P2PMetrics {
unique_peers,
blocks_requested,
};

let mut registry = global_registry().registry.lock();
registry.register(
Expand All @@ -19,6 +27,12 @@ impl P2PMetrics {
metrics.unique_peers.clone(),
);

registry.register(
"Blocks_Requested",
"A Gauge which keeps track of how many blocks were requested and served over the p2p req/res protocol",
metrics.blocks_requested.clone()
);

metrics
}
}
Expand All @@ -28,3 +42,11 @@ static P2P_METRICS: OnceLock<P2PMetrics> = OnceLock::new();
pub fn p2p_metrics() -> &'static P2PMetrics {
P2P_METRICS.get_or_init(P2PMetrics::new)
}

pub fn increment_unique_peers() {
p2p_metrics().unique_peers.inc();
}

pub fn set_blocks_requested(count: usize) {
p2p_metrics().blocks_requested.set(count as i64);
}
15 changes: 11 additions & 4 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
},
TryPeerId,
};
use fuel_core_metrics::p2p_metrics::p2p_metrics;
use fuel_core_metrics::p2p_metrics::increment_unique_peers;
use fuel_core_types::{
fuel_types::BlockHeight,
services::p2p::peer_reputation::AppScore,
Expand Down Expand Up @@ -271,6 +271,15 @@ impl FuelP2PService {
}
}

pub fn update_metrics<T>(&self, update_fn: T)
where
T: FnOnce(),
{
if self.metrics {
update_fn();
}
}

#[cfg(feature = "test-helpers")]
pub fn multiaddrs(&self) -> Vec<Multiaddr> {
let local_peer = self.local_peer_id;
Expand Down Expand Up @@ -644,9 +653,7 @@ impl FuelP2PService {
fn handle_identify_event(&mut self, event: identify::Event) -> Option<FuelP2PEvent> {
match event {
identify::Event::Received { peer_id, info } => {
if self.metrics {
p2p_metrics().unique_peers.inc();
}
self.update_metrics(increment_unique_peers);

let mut addresses = info.listen_addrs;
let agent_version = info.agent_version;
Expand Down
31 changes: 30 additions & 1 deletion crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
},
};
use anyhow::anyhow;
use fuel_core_metrics::p2p_metrics::set_blocks_requested;
use fuel_core_services::{
stream::BoxStream,
RunnableService,
Expand Down Expand Up @@ -196,9 +197,20 @@ pub trait TaskP2PService: Send {
) -> anyhow::Result<()>;

fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>;

fn update_metrics<T>(&self, update_fn: T)
netrome marked this conversation as resolved.
Show resolved Hide resolved
where
T: FnOnce();
}

impl TaskP2PService for FuelP2PService {
fn update_metrics<T>(&self, update_fn: T)
where
T: FnOnce(),
{
FuelP2PService::update_metrics(self, update_fn)
}

fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> {
self.peer_manager().get_all_peers().collect()
}
Expand Down Expand Up @@ -427,6 +439,13 @@ where
V: AtomicView + 'static,
V::LatestView: P2pDb,
{
fn update_metrics<T>(&self, update_fn: T)
where
T: FnOnce(),
{
self.p2p_service.update_metrics(update_fn)
}

fn process_request(
&mut self,
request_message: RequestMessage,
Expand Down Expand Up @@ -464,8 +483,11 @@ where
// If there are other types of data we send over p2p req/res protocol, then this needs
// to be generalized
let max_len = self.max_headers_per_request;
let range_len = range.len();

self.update_metrics(|| set_blocks_requested(range_len));

if range.len() > max_len {
if range_len > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
Expand Down Expand Up @@ -1031,6 +1053,13 @@ pub mod tests {
}

impl TaskP2PService for FakeP2PService {
fn update_metrics<T>(&self, _: T)
where
T: FnOnce(),
{
unimplemented!()
}

fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> {
self.peer_info.iter().map(|tup| (&tup.0, &tup.1)).collect()
}
Expand Down
Loading