Skip to content

Commit

Permalink
[Product Data] First step in gateway usage data collection (#4963)
Browse files Browse the repository at this point in the history
* add stats model

* add stats collection

* add stats route

* propagate stuff and run stuff

* cargo stuff

* sqlx unused what?

* add sessions started stat

* session durations in miliseconds

* apply Jon's comments

* [Product Data] Second step in gateway usage data collection  (#4964)

* turn stats collection into event based

* move events into a common crate for future use elsewhere

* apply Jon's comments
  • Loading branch information
simonwicky authored Oct 15, 2024
1 parent 1fc7e07 commit 435f236
Show file tree
Hide file tree
Showing 17 changed files with 394 additions and 13 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ members = [
"common/socks5-client-core",
"common/socks5/proxy-helpers",
"common/socks5/requests",
"common/statistics",
"common/store-cipher",
"common/task",
"common/topology",
Expand Down
16 changes: 16 additions & 0 deletions common/statistics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2024 - Nym Technologies SA <[email protected]>
# SPDX-License-Identifier: Apache-2.0

[package]
name = "nym-statistics-common"
version = "0.1.0"
edition.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = { workspace = true }
time = { workspace = true }

nym-sphinx = { path = "../nymsphinx" }
39 changes: 39 additions & 0 deletions common/statistics/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use futures::channel::mpsc;
use nym_sphinx::DestinationAddressBytes;
use time::OffsetDateTime;

pub type StatsEventSender = mpsc::UnboundedSender<StatsEvent>;
pub type StatsEventReceiver = mpsc::UnboundedReceiver<StatsEvent>;
pub enum StatsEvent {
SessionStatsEvent(SessionEvent),
}

impl StatsEvent {
pub fn new_session_start(client: DestinationAddressBytes) -> StatsEvent {
StatsEvent::SessionStatsEvent(SessionEvent::SessionStart {
start_time: OffsetDateTime::now_utc(),
client,
})
}

pub fn new_session_stop(client: DestinationAddressBytes) -> StatsEvent {
StatsEvent::SessionStatsEvent(SessionEvent::SessionStop {
stop_time: OffsetDateTime::now_utc(),
client,
})
}
}

pub enum SessionEvent {
SessionStart {
start_time: OffsetDateTime,
client: DestinationAddressBytes,
},
SessionStop {
stop_time: OffsetDateTime,
client: DestinationAddressBytes,
},
}
4 changes: 4 additions & 0 deletions common/statistics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright 2024 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

pub mod events;
9 changes: 2 additions & 7 deletions gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,9 @@ rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
si-scale = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
"time",
] }
subtle-encoding = { workspace = true, features = ["bech32-preview"] }
thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true, features = [
"rt-multi-thread",
"net",
Expand Down Expand Up @@ -83,6 +77,7 @@ nym-network-requester = { path = "../service-providers/network-requester" }
nym-node-http-api = { path = "../nym-node/nym-node-http-api" }
nym-pemstore = { path = "../common/pemstore" }
nym-sphinx = { path = "../common/nymsphinx" }
nym-statistics-common = { path = "../common/statistics" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
Expand Down
18 changes: 17 additions & 1 deletion gateway/src/node/client_handling/active_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use super::websocket::message_receiver::{IsActiveRequestSender, MixMessageSender
use crate::node::client_handling::embedded_clients::LocalEmbeddedClientHandle;
use dashmap::DashMap;
use nym_sphinx::DestinationAddressBytes;
use nym_statistics_common::events;
use nym_statistics_common::events::StatsEventSender;
use std::sync::Arc;
use tracing::warn;

Expand Down Expand Up @@ -35,6 +37,7 @@ impl ActiveClient {
#[derive(Clone)]
pub(crate) struct ActiveClientsStore {
inner: Arc<DashMap<DestinationAddressBytes, ActiveClient>>,
stats_event_sender: StatsEventSender,
}

#[derive(Clone)]
Expand All @@ -48,9 +51,10 @@ pub(crate) struct ClientIncomingChannels {

impl ActiveClientsStore {
/// Creates new instance of `ActiveClientsStore` to store in-memory handles to all currently connected clients.
pub(crate) fn new() -> Self {
pub(crate) fn new(stats_event_sender: StatsEventSender) -> Self {
ActiveClientsStore {
inner: Arc::new(DashMap::new()),
stats_event_sender,
}
}

Expand Down Expand Up @@ -126,6 +130,12 @@ impl ActiveClientsStore {
/// * `client`: address of the client for which to remove the handle.
pub(crate) fn disconnect(&self, client: DestinationAddressBytes) {
self.inner.remove(&client);
if let Err(e) = self
.stats_event_sender
.unbounded_send(events::StatsEvent::new_session_stop(client))
{
warn!("Failed to send session stop event to collector : {e}")
};
}

/// Insert new client handle into the store.
Expand All @@ -147,6 +157,12 @@ impl ActiveClientsStore {
if self.inner.insert(client, entry).is_some() {
panic!("inserted a duplicate remote client")
}
if let Err(e) = self
.stats_event_sender
.unbounded_send(events::StatsEvent::new_session_start(client))
{
warn!("Failed to send session start event to collector : {e}")
};
}

/// Inserts a handle to the embedded client
Expand Down
32 changes: 31 additions & 1 deletion gateway/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
use nym_network_defaults::NymNetworkDetails;
use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilter};
use nym_node_http_api::state::metrics::SharedSessionStats;
use nym_statistics_common::events;
use nym_task::{TaskClient, TaskHandle, TaskManager};
use nym_types::gateway::GatewayNodeDetailsResponse;
use nym_validator_client::nyxd::{Coin, CosmWasmClient};
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient};
use rand::seq::SliceRandom;
use rand::thread_rng;
use statistics::GatewayStatisticsCollector;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -36,6 +39,7 @@ use tracing::*;
pub(crate) mod client_handling;
pub(crate) mod helpers;
pub(crate) mod mixnet_handling;
pub(crate) mod statistics;

pub use nym_gateway_storage::{PersistentStorage, Storage};

Expand Down Expand Up @@ -147,6 +151,8 @@ pub struct Gateway<St = PersistentStorage> {

wireguard_data: Option<nym_wireguard::WireguardData>,

session_stats: Option<SharedSessionStats>,

run_http_server: bool,
task_client: Option<TaskClient>,
}
Expand All @@ -168,6 +174,7 @@ impl<St> Gateway<St> {
ip_packet_router_opts,
authenticator_opts: None,
wireguard_data: None,
session_stats: None,
run_http_server: true,
task_client: None,
})
Expand All @@ -191,6 +198,7 @@ impl<St> Gateway<St> {
sphinx_keypair,
storage,
wireguard_data: None,
session_stats: None,
run_http_server: true,
task_client: None,
}
Expand All @@ -204,6 +212,10 @@ impl<St> Gateway<St> {
self.task_client = Some(task_client)
}

pub fn set_session_stats(&mut self, session_stats: SharedSessionStats) {
self.session_stats = Some(session_stats);
}

pub fn set_wireguard_data(&mut self, wireguard_data: nym_wireguard::WireguardData) {
self.wireguard_data = Some(wireguard_data)
}
Expand Down Expand Up @@ -393,6 +405,19 @@ impl<St> Gateway<St> {
packet_sender
}

fn start_stats_collector(
&self,
shared_session_stats: SharedSessionStats,
shutdown: TaskClient,
) -> events::StatsEventSender {
info!("Starting gateway stats collector...");

let (mut stats_collector, stats_event_sender) =
GatewayStatisticsCollector::new(shared_session_stats);
tokio::spawn(async move { stats_collector.run(shutdown).await });
stats_event_sender
}

// TODO: rethink the logic in this function...
async fn start_network_requester(
&self,
Expand Down Expand Up @@ -599,6 +624,11 @@ impl<St> Gateway<St> {
return Err(GatewayError::InsufficientNodeBalance { account, balance });
}
}
let shared_session_stats = self.session_stats.take().unwrap_or_default();
let stats_event_sender = self.start_stats_collector(
shared_session_stats,
shutdown.fork("statistics::GatewayStatisticsCollector"),
);

let handler_config = CredentialHandlerConfig {
revocation_bandwidth_penalty: self
Expand Down Expand Up @@ -629,7 +659,7 @@ impl<St> Gateway<St> {

let mix_forwarding_channel = self.start_packet_forwarder(shutdown.fork("PacketForwarder"));

let active_clients_store = ActiveClientsStore::new();
let active_clients_store = ActiveClientsStore::new(stats_event_sender.clone());
self.start_mix_socket_listener(
mix_forwarding_channel.clone(),
active_clients_store.clone(),
Expand Down
63 changes: 63 additions & 0 deletions gateway/src/node/statistics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2022 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use futures::{channel::mpsc, StreamExt};
use nym_node_http_api::state::metrics::SharedSessionStats;
use nym_statistics_common::events::{StatsEvent, StatsEventReceiver, StatsEventSender};
use nym_task::TaskClient;
use sessions::SessionStatsHandler;
use std::time::Duration;
use time::OffsetDateTime;
use tracing::trace;

pub mod sessions;

const STATISTICS_UPDATE_TIMER_INTERVAL: Duration = Duration::from_secs(3600); //update timer, no need to check everytime

pub(crate) struct GatewayStatisticsCollector {
stats_event_rx: StatsEventReceiver,
session_stats: SessionStatsHandler,
//here goes additionnal stats handler
}

impl GatewayStatisticsCollector {
pub fn new(
shared_session_stats: SharedSessionStats,
) -> (GatewayStatisticsCollector, StatsEventSender) {
let (stats_event_tx, stats_event_rx) = mpsc::unbounded();
let collector = GatewayStatisticsCollector {
stats_event_rx,
session_stats: SessionStatsHandler::new(shared_session_stats),
};
(collector, stats_event_tx)
}

async fn update_shared_state(&mut self, update_time: OffsetDateTime) {
self.session_stats.update_shared_state(update_time).await;
//here goes additionnal stats handler update
}

pub async fn run(&mut self, mut shutdown: TaskClient) {
let mut update_interval = tokio::time::interval(STATISTICS_UPDATE_TIMER_INTERVAL);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
trace!("StatisticsCollector: Received shutdown");
},
_ = update_interval.tick() => {
let now = OffsetDateTime::now_utc();
self.update_shared_state(now).await;
},

Some(stat_event) = self.stats_event_rx.next() => {
//dispatching event to proper handler
match stat_event {
StatsEvent::SessionStatsEvent(event) => self.session_stats.handle_event(event),
}
},

}
}
}
}
Loading

0 comments on commit 435f236

Please sign in to comment.