Skip to content

Commit

Permalink
integrate stats scaffolding changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jmwample committed Nov 5, 2024
1 parent 9708485 commit 6f53b3a
Show file tree
Hide file tree
Showing 22 changed files with 758 additions and 397 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/client-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ features = ["wasm-bindgen"]

[dev-dependencies]
tempfile = { workspace = true }
pretty_env_logger = { workspace = true}

[features]
default = []
Expand Down
60 changes: 31 additions & 29 deletions common/client-core/src/client/base_client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright 2022-2023 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use super::packet_statistics_control::PacketStatisticsReporter;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics::StatisticsControl;
use super::statistics_control::StatisticsControl;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
Expand All @@ -13,7 +12,6 @@ use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use crate::client::packet_statistics_control::PacketStatisticsControl;
use crate::client::real_messages_control;
use crate::client::real_messages_control::RealMessagesController;
use crate::client::received_buffer::{
Expand Down Expand Up @@ -50,7 +48,7 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsReporter;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::{TaskClient, TaskHandle};
use nym_topology::provider_trait::TopologyProvider;
Expand All @@ -61,6 +59,7 @@ use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use url::Url;

#[cfg(all(
Expand Down Expand Up @@ -283,7 +282,7 @@ where
self_address: Recipient,
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
shutdown: TaskClient,
) {
info!("Starting loop cover traffic stream...");
Expand Down Expand Up @@ -316,7 +315,7 @@ where
client_connection_rx: ConnectionCommandReceiver,
shutdown: TaskClient,
packet_type: PacketType,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) {
info!("Starting real traffic stream...");

Expand Down Expand Up @@ -345,7 +344,7 @@ where
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
shutdown: TaskClient,
packet_statistics_control: PacketStatisticsReporter,
metrics_reporter: ClientStatsSender,
) {
info!("Starting received messages buffer controller...");
let controller: ReceivedMessagesBufferController<SphinxMessageReceiver> =
Expand All @@ -355,7 +354,7 @@ where
mixnet_receiver,
reply_key_storage,
reply_controller_sender,
packet_statistics_control,
metrics_reporter,
);
controller.start_with_shutdown(shutdown)
}
Expand Down Expand Up @@ -596,11 +595,21 @@ where
Ok(())
}

fn start_packet_statistics_control(shutdown: TaskClient) -> PacketStatisticsReporter {
fn start_statistics_control(
stats_reporting_addres: Option<Recipient>,
input_sender: Sender<InputMessage>,
shutdown: TaskClient,
) -> ClientStatsSender {
info!("Starting packet statistics control...");
let (packet_statistics_control, packet_stats_reporter) = PacketStatisticsControl::new();
packet_statistics_control.start_with_shutdown(shutdown);
packet_stats_reporter
match stats_reporting_addres {
Some(reporting_address) => {
let (stats_control, stats_reporter) =
StatisticsControl::new(reporting_address, input_sender.clone());
stats_control.start_with_shutdown(shutdown.fork("statistics_control"));
stats_reporter
}
None => ClientStatsSender::sink(),
}
}

fn start_mix_traffic_controller(
Expand Down Expand Up @@ -730,6 +739,12 @@ where
self.user_agent.clone(),
);

let stats_reporter = Self::start_statistics_control(
self.stats_reporting_address,
input_sender.clone(),
shutdown.fork("statistics_control"),
);

// needs to be started as the first thing to block if required waiting for the gateway
Self::start_topology_refresher(
topology_provider,
Expand All @@ -741,9 +756,6 @@ where
)
.await?;

let packet_stats_reporter =
Self::start_packet_statistics_control(shutdown.fork("packet_statistics_control"));

let gateway_packet_router = PacketRouter::new(
ack_sender,
mixnet_messages_sender,
Expand Down Expand Up @@ -775,7 +787,7 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
shutdown.fork("received_messages_buffer"),
packet_stats_reporter.clone(),
stats_reporter.clone(),
);

// The message_sender is the transmitter for any component generating sphinx packets
Expand Down Expand Up @@ -814,7 +826,7 @@ where
client_connection_rx,
shutdown.fork("real_traffic_controller"),
self.config.debug.traffic.packet_type,
packet_stats_reporter.clone(),
stats_reporter.clone(),
);

if !self
Expand All @@ -829,21 +841,11 @@ where
self_address,
shared_topology_accessor.clone(),
message_sender,
packet_stats_reporter,
stats_reporter.clone(),
shutdown.fork("cover_traffic_stream"),
);
}

let stats_reporter = match self.stats_reporting_address {
Some(reporting_address) => {
let (stats_control, stats_reporter) =
StatisticsControl::new(reporting_address, input_sender.clone());
stats_control.start_with_shutdown(shutdown.fork("statistics_control"));
Some(stats_reporter)
}
None => None,
};

debug!("Core client startup finished!");
debug!("The address of this client is: {self_address}");

Expand Down Expand Up @@ -879,7 +881,7 @@ pub struct BaseClient {
pub client_input: ClientInputStatus,
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub stats_reporter: Option<ClientStatsReporter>,
pub stats_reporter: ClientStatsSender,

pub task_handle: TaskHandle,
}
12 changes: 6 additions & 6 deletions common/client-core/src/client/cover_traffic_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};
use crate::client::topology_control::TopologyAccessor;
use crate::{config, spawn_future};
use futures::task::{Context, Poll};
Expand All @@ -13,6 +12,7 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -63,7 +63,7 @@ where

packet_type: PacketType,

stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
}

impl<R> Stream for LoopCoverTrafficStream<R>
Expand Down Expand Up @@ -109,7 +109,7 @@ impl LoopCoverTrafficStream<OsRng> {
topology_access: TopologyAccessor,
traffic_config: config::Traffic,
cover_config: config::CoverTraffic,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) -> Self {
let rng = OsRng;

Expand Down Expand Up @@ -198,9 +198,9 @@ impl LoopCoverTrafficStream<OsRng> {
}
}
} else {
self.stats_tx.report(PacketStatisticsEvent::CoverPacketSent(
cover_traffic_packet_size.size(),
));
self.stats_tx.report(
PacketStatisticsEvent::CoverPacketSent(cover_traffic_packet_size.size()).into(),
);
}

// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
Expand Down
3 changes: 1 addition & 2 deletions common/client-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
pub mod mix_traffic;
pub(crate) mod packet_statistics_control;
pub mod real_messages_control;
pub mod received_buffer;
pub mod replies;
pub mod statistics;
pub mod statistics_control;
pub mod topology_control;
pub(crate) mod transmission_buffer;
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2021 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};

use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};

use futures::StreamExt;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
Expand All @@ -19,15 +19,15 @@ pub(super) struct AcknowledgementListener {
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
}

impl AcknowledgementListener {
pub(super) fn new(
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) -> Self {
AcknowledgementListener {
ack_key,
Expand All @@ -40,7 +40,7 @@ impl AcknowledgementListener {
async fn on_ack(&mut self, ack_content: Vec<u8>) {
trace!("Received an ack");
self.stats_tx
.report(PacketStatisticsEvent::AckReceived(ack_content.len()));
.report(PacketStatisticsEvent::AckReceived(ack_content.len()).into());

let frag_id = match recover_identifier(&self.ack_key, &ack_content)
.map(FragmentIdentifier::try_from_bytes)
Expand All @@ -57,13 +57,13 @@ impl AcknowledgementListener {
if frag_id == COVER_FRAG_ID {
trace!("Received an ack for a cover message - no need to do anything");
self.stats_tx
.report(PacketStatisticsEvent::CoverAckReceived(ack_content.len()));
.report(PacketStatisticsEvent::CoverAckReceived(ack_content.len()).into());
return;
}

trace!("Received {} from the mix network", frag_id);
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()));
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
self.action_sender
.unbounded_send(Action::new_remove(frag_id))
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use self::{
sent_notification_listener::SentNotificationListener,
};
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::packet_statistics_control::PacketStatisticsReporter;
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller::ReplyControllerSender;
use crate::spawn_future;
Expand All @@ -24,6 +23,7 @@ use nym_sphinx::{
chunking::fragment::{Fragment, FragmentIdentifier},
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use rand::{CryptoRng, Rng};
use std::{
sync::{Arc, Weak},
Expand Down Expand Up @@ -209,7 +209,7 @@ where
connectors: AcknowledgementControllerConnectors,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) -> Self {
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();

Expand Down
4 changes: 2 additions & 2 deletions common/client-core/src/client/real_messages_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::client::replies::reply_controller;
use crate::config;
pub(crate) use acknowledgement_control::{AckActionSender, Action};

use super::packet_statistics_control::PacketStatisticsReporter;
use nym_statistics_common::clients::ClientStatsSender;

pub(crate) mod acknowledgement_control;
pub(crate) mod message_handler;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl RealMessagesController<OsRng> {
reply_controller_receiver: ReplyControllerReceiver,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) -> Self {
let rng = OsRng;

Expand Down
Loading

0 comments on commit 6f53b3a

Please sign in to comment.