From f54246261cb6315706ef1f4513d366f9e0fcfcda Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:07 +0200 Subject: [PATCH 01/14] add stats storage to gateways --- gateway/Cargo.toml | 1 + gateway/src/config/persistence/paths.rs | 10 +- gateway/src/config/template.rs | 3 + gateway/src/error.rs | 7 + gateway/src/node/helpers.rs | 9 + gateway/src/node/mod.rs | 33 +- nym-node/src/config/old_configs/mod.rs | 2 + .../src/config/old_configs/old_config_v3.rs | 92 +- .../src/config/old_configs/old_config_v4.rs | 1422 +++++++++++++++++ nym-node/src/config/persistence.rs | 9 + nym-node/src/config/upgrade_helpers.rs | 3 +- nym-node/src/node/mod.rs | 15 + 12 files changed, 1536 insertions(+), 70 deletions(-) create mode 100644 nym-node/src/config/old_configs/old_config_v4.rs diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index f910f13b6e..ccfabc8c96 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -69,6 +69,7 @@ nym-credentials-interface = { path = "../common/credentials-interface" } nym-credential-verification = { path = "../common/credential-verification" } nym-crypto = { path = "../common/crypto" } nym-gateway-storage = { path = "../common/gateway-storage" } +nym-gateway-stats-storage = { path = "../common/gateway-stats-storage" } nym-gateway-requests = { path = "../common/gateway-requests" } nym-mixnet-client = { path = "../common/client-libs/mixnet-client" } nym-mixnode-common = { path = "../common/mixnode-common" } diff --git a/gateway/src/config/persistence/paths.rs b/gateway/src/config/persistence/paths.rs index c416bce4b2..45a989615a 100644 --- a/gateway/src/config/persistence/paths.rs +++ b/gateway/src/config/persistence/paths.rs @@ -12,6 +12,7 @@ pub const DEFAULT_PRIVATE_SPHINX_KEY_FILENAME: &str = "private_sphinx.pem"; pub const DEFAULT_PUBLIC_SPHINX_KEY_FILENAME: &str = "public_sphinx.pem"; pub const DEFAULT_CLIENTS_STORAGE_FILENAME: &str = "db.sqlite"; +pub const DEFAULT_STATS_STORAGE_FILENAME: &str = "stats.sqlite"; pub const DEFAULT_NETWORK_REQUESTER_CONFIG_FILENAME: &str = "network_requester_config.toml"; pub const DEFAULT_NETWORK_REQUESTER_DATA_DIR: &str = "network-requester-data"; @@ -39,6 +40,10 @@ pub struct GatewayPaths { #[serde(alias = "persistent_storage")] pub clients_storage: PathBuf, + /// Path to sqlite database containing all persistent stats data. + #[serde(alias = "persistent_stats_storage")] + pub stats_storage: PathBuf, + /// Path to the configuration of the embedded network requester. #[serde(deserialize_with = "de_maybe_stringified")] pub network_requester_config: Option, @@ -54,7 +59,9 @@ impl GatewayPaths { pub fn new_default>(id: P) -> Self { GatewayPaths { keys: KeysPaths::new_default(id.as_ref()), - clients_storage: default_data_directory(id).join(DEFAULT_CLIENTS_STORAGE_FILENAME), + clients_storage: default_data_directory(id.as_ref()) + .join(DEFAULT_CLIENTS_STORAGE_FILENAME), + stats_storage: default_data_directory(id).join(DEFAULT_STATS_STORAGE_FILENAME), // node_description: default_config_filepath(id).join(DEFAULT_DESCRIPTION_FILENAME), network_requester_config: None, ip_packet_router_config: None, @@ -70,6 +77,7 @@ impl GatewayPaths { public_sphinx_key_file: Default::default(), }, clients_storage: Default::default(), + stats_storage: Default::default(), network_requester_config: None, ip_packet_router_config: None, } diff --git a/gateway/src/config/template.rs b/gateway/src/config/template.rs index 2f1a34d822..3c9f526270 100644 --- a/gateway/src/config/template.rs +++ b/gateway/src/config/template.rs @@ -98,6 +98,9 @@ keys.public_sphinx_key_file = '{{ storage_paths.keys.public_sphinx_key_file }}' # derived shared keys and available client bandwidths. clients_storage = '{{ storage_paths.clients_storage }}' +#Path to sqlite database containing all persistent stats data. +stats_storage = '{{ storage_paths.stats_storage }}' + # Path to the configuration of the embedded network requester. network_requester_config = '{{ storage_paths.network_requester_config }}' diff --git a/gateway/src/error.rs b/gateway/src/error.rs index dc37c4c710..ca2c9fd7bd 100644 --- a/gateway/src/error.rs +++ b/gateway/src/error.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-3.0-only use nym_authenticator::error::AuthenticatorError; +use nym_gateway_stats_storage::error::StatsStorageError; use nym_gateway_storage::error::StorageError; use nym_ip_packet_router::error::IpPacketRouterError; use nym_network_requester::error::{ClientCoreError, NetworkRequesterError}; @@ -115,6 +116,12 @@ pub enum GatewayError { source: StorageError, }, + #[error("stats storage failure: {source}")] + StatsStorageError { + #[from] + source: StatsStorageError, + }, + #[error("Path to network requester configuration file hasn't been specified. Perhaps try to run `setup-network-requester`?")] UnspecifiedNetworkRequesterConfig, diff --git a/gateway/src/node/helpers.rs b/gateway/src/node/helpers.rs index 854254e215..80f504dae1 100644 --- a/gateway/src/node/helpers.rs +++ b/gateway/src/node/helpers.rs @@ -5,6 +5,7 @@ use crate::config::Config; use crate::error::GatewayError; use nym_crypto::asymmetric::encryption; +use nym_gateway_stats_storage::PersistentStatsStorage; use nym_gateway_storage::PersistentStorage; use nym_pemstore::traits::PemStorableKeyPair; use nym_pemstore::KeyPairPath; @@ -74,6 +75,14 @@ pub(crate) async fn initialise_main_storage( Ok(PersistentStorage::init(path, retrieval_limit).await?) } +pub(crate) async fn initialise_stats_storage( + config: &Config, +) -> Result { + let path = &config.storage_paths.stats_storage; + + Ok(PersistentStatsStorage::init(path).await?) +} + pub fn load_keypair( paths: KeyPairPath, name: impl Into, diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index c095f1fa86..1948661ed9 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -12,7 +12,9 @@ use crate::http::HttpApiBuilder; use crate::node::client_handling::active_clients::ActiveClientsStore; use crate::node::client_handling::embedded_clients::{LocalEmbeddedClientHandle, MessageRouter}; use crate::node::client_handling::websocket; -use crate::node::helpers::{initialise_main_storage, load_network_requester_config}; +use crate::node::helpers::{ + initialise_main_storage, initialise_stats_storage, load_network_requester_config, +}; use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler; use futures::channel::{mpsc, oneshot}; use nym_credential_verification::ecash::{ @@ -41,6 +43,7 @@ pub(crate) mod helpers; pub(crate) mod mixnet_handling; pub(crate) mod statistics; +pub use nym_gateway_stats_storage::PersistentStatsStorage; pub use nym_gateway_storage::{PersistentStorage, Storage}; // TODO: should this struct live here? @@ -96,6 +99,8 @@ pub async fn create_gateway( let storage = initialise_main_storage(&config).await?; + let stats_storage = initialise_stats_storage(&config).await?; + let nr_opts = network_requester_config.map(|config| LocalNetworkRequesterOpts { config: config.clone(), custom_mixnet_path: custom_mixnet.clone(), @@ -106,7 +111,7 @@ pub async fn create_gateway( custom_mixnet_path: custom_mixnet.clone(), }); - Gateway::new(config, nr_opts, ip_opts, storage) + Gateway::new(config, nr_opts, ip_opts, storage, stats_storage) } #[derive(Debug, Clone)] @@ -147,7 +152,9 @@ pub struct Gateway { /// x25519 keypair used for Diffie-Hellman. Currently only used for sphinx key derivation. sphinx_keypair: Arc, - storage: St, + client_storage: St, + + stats_storage: PersistentStatsStorage, wireguard_data: Option, @@ -163,10 +170,12 @@ impl Gateway { config: Config, network_requester_opts: Option, ip_packet_router_opts: Option, - storage: St, + client_storage: St, + stats_storage: PersistentStatsStorage, ) -> Result { Ok(Gateway { - storage, + client_storage, + stats_storage, identity_keypair: Arc::new(load_identity_keys(&config)?), sphinx_keypair: Arc::new(helpers::load_sphinx_keys(&config)?), config, @@ -179,7 +188,7 @@ impl Gateway { task_client: None, }) } - + #[allow(clippy::too_many_arguments)] pub fn new_loaded( config: Config, network_requester_opts: Option, @@ -187,7 +196,8 @@ impl Gateway { authenticator_opts: Option, identity_keypair: Arc, sphinx_keypair: Arc, - storage: St, + client_storage: St, + stats_storage: PersistentStatsStorage, ) -> Self { Gateway { config, @@ -196,7 +206,8 @@ impl Gateway { authenticator_opts, identity_keypair, sphinx_keypair, - storage, + client_storage, + stats_storage, wireguard_data: None, session_stats: None, run_http_server: true, @@ -240,7 +251,7 @@ impl Gateway { let connection_handler = ConnectionHandler::new( packet_processor, - self.storage.clone(), + self.client_storage.clone(), ack_sender, active_clients_store, ); @@ -377,7 +388,7 @@ impl Gateway { let shared_state = websocket::CommonHandlerState { ecash_verifier, - storage: self.storage.clone(), + storage: self.client_storage.clone(), local_identity: Arc::clone(&self.identity_keypair), only_coconut_credentials: self.config.gateway.only_coconut_credentials, bandwidth_cfg: (&self.config).into(), @@ -654,7 +665,7 @@ impl Gateway { nyxd_client, self.identity_keypair.public_key().to_bytes(), shutdown.fork("EcashVerifier"), - self.storage.clone(), + self.client_storage.clone(), ) .await?, ); diff --git a/nym-node/src/config/old_configs/mod.rs b/nym-node/src/config/old_configs/mod.rs index 295cf46ed3..3b1f589858 100644 --- a/nym-node/src/config/old_configs/mod.rs +++ b/nym-node/src/config/old_configs/mod.rs @@ -4,7 +4,9 @@ mod old_config_v1; mod old_config_v2; mod old_config_v3; +mod old_config_v4; pub use old_config_v1::try_upgrade_config_v1; pub use old_config_v2::try_upgrade_config_v2; pub use old_config_v3::try_upgrade_config_v3; +pub use old_config_v4::try_upgrade_config_v4; diff --git a/nym-node/src/config/old_configs/old_config_v3.rs b/nym-node/src/config/old_configs/old_config_v3.rs index f13cfdf4a0..01ce8a9e49 100644 --- a/nym-node/src/config/old_configs/old_config_v3.rs +++ b/nym-node/src/config/old_configs/old_config_v3.rs @@ -4,13 +4,7 @@ #![allow(dead_code)] use crate::{config::*, error::KeyIOFailure}; -use entry_gateway::Debug as EntryGatewayConfigDebug; -use exit_gateway::{IpPacketRouter, IpPacketRouterDebug, NetworkRequester, NetworkRequesterDebug}; -use mixnode::{Verloc, VerlocDebug}; -use nym_client_core_config_types::{ - disk_persistence::{ClientKeysPaths, CommonClientPaths}, - DebugConfig as ClientDebugConfig, -}; +use nym_client_core_config_types::DebugConfig as ClientDebugConfig; use nym_config::serde_helpers::de_maybe_port; use nym_crypto::asymmetric::{ed25519, x25519}; use nym_network_requester::{ @@ -19,6 +13,7 @@ use nym_network_requester::{ }; use nym_pemstore::{store_key, store_keypair}; use nym_sphinx_acknowledgements::AckKey; +use old_configs::old_config_v4::*; use persistence::*; use rand::rngs::OsRng; use serde::{Deserialize, Serialize}; @@ -90,12 +85,12 @@ pub enum NodeModeV3 { ExitGateway, } -impl From for NodeMode { +impl From for NodeModeV4 { fn from(config: NodeModeV3) -> Self { match config { - NodeModeV3::Mixnode => NodeMode::Mixnode, - NodeModeV3::EntryGateway => NodeMode::EntryGateway, - NodeModeV3::ExitGateway => NodeMode::ExitGateway, + NodeModeV3::Mixnode => NodeModeV4::Mixnode, + NodeModeV3::EntryGateway => NodeModeV4::EntryGateway, + NodeModeV3::ExitGateway => NodeModeV4::ExitGateway, } } } @@ -601,23 +596,6 @@ impl AuthenticatorPathsV3 { } } - pub fn to_common_client_paths(&self) -> CommonClientPaths { - CommonClientPaths { - keys: ClientKeysPaths { - private_identity_key_file: self.private_ed25519_identity_key_file.clone(), - public_identity_key_file: self.public_ed25519_identity_key_file.clone(), - private_encryption_key_file: self.private_x25519_diffie_hellman_key_file.clone(), - public_encryption_key_file: self.public_x25519_diffie_hellman_key_file.clone(), - ack_key_file: self.ack_key_file.clone(), - }, - gateway_registrations: self.gateway_registrations.clone(), - - // not needed for embedded providers - credentials_database: Default::default(), - reply_surb_database: self.reply_surb_database.clone(), - } - } - pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath { nym_pemstore::KeyPairPath::new( &self.private_ed25519_identity_key_file, @@ -963,7 +941,7 @@ pub async fn initialise( pub async fn try_upgrade_config_v3>( path: P, prev_config: Option, -) -> Result { +) -> Result { tracing::debug!("Updating from 1.1.4"); let old_cfg = if let Some(prev_config) = prev_config { prev_config @@ -981,21 +959,21 @@ pub async fn try_upgrade_config_v3>( .ok_or(NymNodeError::DataDirDerivationFailure)?, ); - let cfg = Config { + let cfg = ConfigV4 { save_path: old_cfg.save_path, id: old_cfg.id, mode: old_cfg.mode.into(), - host: Host { + host: HostV4 { public_ips: old_cfg.host.public_ips, hostname: old_cfg.host.hostname, location: old_cfg.host.location, }, - mixnet: Mixnet { + mixnet: MixnetV4 { bind_address: old_cfg.mixnet.bind_address, announce_port: None, nym_api_urls: old_cfg.mixnet.nym_api_urls, nyxd_urls: old_cfg.mixnet.nyxd_urls, - debug: MixnetDebug { + debug: MixnetDebugV4 { packet_forwarding_initial_backoff: old_cfg .mixnet .debug @@ -1009,8 +987,8 @@ pub async fn try_upgrade_config_v3>( unsafe_disable_noise: old_cfg.mixnet.debug.unsafe_disable_noise, }, }, - storage_paths: NymNodePaths { - keys: KeysPaths { + storage_paths: NymNodePathsV4 { + keys: KeysPathsV4 { private_ed25519_identity_key_file: old_cfg .storage_paths .keys @@ -1038,7 +1016,7 @@ pub async fn try_upgrade_config_v3>( }, description: old_cfg.storage_paths.description, }, - http: Http { + http: HttpV4 { bind_address: old_cfg.http.bind_address, landing_page_assets_path: old_cfg.http.landing_page_assets_path, access_token: old_cfg.http.access_token, @@ -1046,13 +1024,13 @@ pub async fn try_upgrade_config_v3>( expose_system_hardware: old_cfg.http.expose_system_hardware, expose_crypto_hardware: old_cfg.http.expose_crypto_hardware, }, - wireguard: Wireguard { + wireguard: WireguardV4 { enabled: old_cfg.wireguard.enabled, bind_address: old_cfg.wireguard.bind_address, private_ip: old_cfg.wireguard.private_ip, announced_port: old_cfg.wireguard.announced_port, private_network_prefix: old_cfg.wireguard.private_network_prefix, - storage_paths: WireguardPaths { + storage_paths: WireguardPathsV4 { private_diffie_hellman_key_file: old_cfg .wireguard .storage_paths @@ -1063,12 +1041,12 @@ pub async fn try_upgrade_config_v3>( .public_diffie_hellman_key_file, }, }, - mixnode: MixnodeConfig { - storage_paths: MixnodePaths {}, - verloc: Verloc { + mixnode: MixnodeConfigV4 { + storage_paths: MixnodePathsV4 {}, + verloc: VerlocV4 { bind_address: old_cfg.mixnode.verloc.bind_address, announce_port: None, - debug: VerlocDebug { + debug: VerlocDebugV4 { packets_per_node: old_cfg.mixnode.verloc.debug.packets_per_node, connection_timeout: old_cfg.mixnode.verloc.debug.connection_timeout, packet_timeout: old_cfg.mixnode.verloc.debug.packet_timeout, @@ -1078,16 +1056,16 @@ pub async fn try_upgrade_config_v3>( retry_timeout: old_cfg.mixnode.verloc.debug.retry_timeout, }, }, - debug: mixnode::Debug { + debug: DebugV4 { node_stats_logging_delay: old_cfg.mixnode.debug.node_stats_logging_delay, node_stats_updating_delay: old_cfg.mixnode.debug.node_stats_updating_delay, }, }, - entry_gateway: EntryGatewayConfig { - storage_paths: EntryGatewayPaths { + entry_gateway: EntryGatewayConfigV4 { + storage_paths: EntryGatewayPathsV4 { clients_storage: old_cfg.entry_gateway.storage_paths.clients_storage, cosmos_mnemonic: old_cfg.entry_gateway.storage_paths.cosmos_mnemonic, - authenticator: AuthenticatorPaths { + authenticator: AuthenticatorPathsV4 { private_ed25519_identity_key_file: old_cfg .entry_gateway .storage_paths @@ -1129,16 +1107,16 @@ pub async fn try_upgrade_config_v3>( bind_address: old_cfg.entry_gateway.bind_address, announce_ws_port: old_cfg.entry_gateway.announce_ws_port, announce_wss_port: old_cfg.entry_gateway.announce_wss_port, - debug: EntryGatewayConfigDebug { + debug: EntryGatewayConfigDebugV4 { message_retrieval_limit: old_cfg.entry_gateway.debug.message_retrieval_limit, // \/ ADDED zk_nym_tickets: Default::default(), }, }, - exit_gateway: ExitGatewayConfig { - storage_paths: ExitGatewayPaths { + exit_gateway: ExitGatewayConfigV4 { + storage_paths: ExitGatewayPathsV4 { clients_storage: exit_gateway_paths.clients_storage, - network_requester: NetworkRequesterPaths { + network_requester: NetworkRequesterPathsV4 { private_ed25519_identity_key_file: old_cfg .exit_gateway .storage_paths @@ -1175,7 +1153,7 @@ pub async fn try_upgrade_config_v3>( .network_requester .gateway_registrations, }, - ip_packet_router: IpPacketRouterPaths { + ip_packet_router: IpPacketRouterPathsV4 { private_ed25519_identity_key_file: old_cfg .exit_gateway .storage_paths @@ -1212,7 +1190,7 @@ pub async fn try_upgrade_config_v3>( .ip_packet_router .gateway_registrations, }, - authenticator: AuthenticatorPaths { + authenticator: AuthenticatorPathsV4 { private_ed25519_identity_key_file: old_cfg .exit_gateway .storage_paths @@ -1252,8 +1230,8 @@ pub async fn try_upgrade_config_v3>( }, open_proxy: old_cfg.exit_gateway.open_proxy, upstream_exit_policy_url: old_cfg.exit_gateway.upstream_exit_policy_url, - network_requester: NetworkRequester { - debug: NetworkRequesterDebug { + network_requester: NetworkRequesterV4 { + debug: NetworkRequesterDebugV4 { enabled: old_cfg.exit_gateway.network_requester.debug.enabled, disable_poisson_rate: old_cfg .exit_gateway @@ -1263,8 +1241,8 @@ pub async fn try_upgrade_config_v3>( client_debug: old_cfg.exit_gateway.network_requester.debug.client_debug, }, }, - ip_packet_router: IpPacketRouter { - debug: IpPacketRouterDebug { + ip_packet_router: IpPacketRouterV4 { + debug: IpPacketRouterDebugV4 { enabled: old_cfg.exit_gateway.ip_packet_router.debug.enabled, disable_poisson_rate: old_cfg .exit_gateway @@ -1277,7 +1255,7 @@ pub async fn try_upgrade_config_v3>( debug: Default::default(), }, authenticator: Default::default(), - logging: LoggingSettings {}, + logging: LoggingSettingsV4 {}, }; Ok(cfg) diff --git a/nym-node/src/config/old_configs/old_config_v4.rs b/nym-node/src/config/old_configs/old_config_v4.rs new file mode 100644 index 0000000000..9bad844258 --- /dev/null +++ b/nym-node/src/config/old_configs/old_config_v4.rs @@ -0,0 +1,1422 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +#![allow(dead_code)] + +use crate::{config::*, error::KeyIOFailure}; +use entry_gateway::{Debug as EntryGatewayConfigDebug, ZkNymTicketHandlerDebug}; +use exit_gateway::{ + Debug as ExitGatewayConfigDebug, IpPacketRouter, IpPacketRouterDebug, NetworkRequester, + NetworkRequesterDebug, +}; +use mixnode::{Verloc, VerlocDebug}; +use nym_client_core_config_types::{ + disk_persistence::{ClientKeysPaths, CommonClientPaths}, + DebugConfig as ClientDebugConfig, +}; +use nym_config::{defaults::TICKETBOOK_VALIDITY_DAYS, serde_helpers::de_maybe_port}; +use nym_crypto::asymmetric::{ed25519, x25519}; +use nym_network_requester::{ + set_active_gateway, setup_fs_gateways_storage, store_gateway_details, CustomGatewayDetails, + GatewayDetails, +}; +use nym_pemstore::{store_key, store_keypair}; +use nym_sphinx_acknowledgements::AckKey; +use persistence::*; +use rand::rngs::OsRng; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct WireguardPathsV4 { + pub private_diffie_hellman_key_file: PathBuf, + pub public_diffie_hellman_key_file: PathBuf, +} + +impl WireguardPathsV4 { + pub fn new>(data_dir: P) -> Self { + let data_dir = data_dir.as_ref(); + WireguardPathsV4 { + private_diffie_hellman_key_file: data_dir + .join(persistence::DEFAULT_X25519_WG_DH_KEY_FILENAME), + public_diffie_hellman_key_file: data_dir + .join(persistence::DEFAULT_X25519_WG_PUBLIC_DH_KEY_FILENAME), + } + } + + pub fn x25519_wireguard_storage_paths(&self) -> nym_pemstore::KeyPairPath { + nym_pemstore::KeyPairPath::new( + &self.private_diffie_hellman_key_file, + &self.public_diffie_hellman_key_file, + ) + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct WireguardV4 { + /// Specifies whether the wireguard service is enabled on this node. + pub enabled: bool, + + /// Socket address this node will use for binding its wireguard interface. + /// default: `0.0.0.0:51822` + pub bind_address: SocketAddr, + + /// Ip address of the private wireguard network. + /// default: `10.1.0.0` + pub private_ip: IpAddr, + + /// Port announced to external clients wishing to connect to the wireguard interface. + /// Useful in the instances where the node is behind a proxy. + pub announced_port: u16, + + /// The prefix denoting the maximum number of the clients that can be connected via Wireguard. + /// The maximum value for IPv4 is 32 and for IPv6 is 128 + pub private_network_prefix: u8, + + /// Paths for wireguard keys, client registries, etc. + pub storage_paths: WireguardPathsV4, +} + +// a temporary solution until all "types" are run at the same time +#[derive(Debug, Default, Serialize, Deserialize, ValueEnum, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum NodeModeV4 { + #[default] + #[clap(alias = "mix")] + Mixnode, + + #[clap(alias = "entry", alias = "gateway")] + EntryGateway, + + #[clap(alias = "exit")] + ExitGateway, +} + +impl From for NodeMode { + fn from(config: NodeModeV4) -> Self { + match config { + NodeModeV4::Mixnode => NodeMode::Mixnode, + NodeModeV4::EntryGateway => NodeMode::EntryGateway, + NodeModeV4::ExitGateway => NodeMode::ExitGateway, + } + } +} + +// TODO: this is very much a WIP. we need proper ssl certificate support here +#[derive(Debug, Clone, Default, Deserialize, PartialEq, Serialize)] +#[serde(default)] +#[serde(deny_unknown_fields)] +pub struct HostV4 { + /// Ip address(es) of this host, such as 1.1.1.1 that external clients will use for connections. + /// If no values are provided, when this node gets included in the network, + /// its ip addresses will be populated by whatever value is resolved by associated nym-api. + pub public_ips: Vec, + + /// Optional hostname of this node, for example nymtech.net. + // TODO: this is temporary. to be replaced by pulling the data directly from the certs. + #[serde(deserialize_with = "de_maybe_stringified")] + pub hostname: Option, + + /// Optional ISO 3166 alpha-2 two-letter country code of the node's **physical** location + #[serde(deserialize_with = "de_maybe_stringified")] + pub location: Option, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[serde(default)] +#[serde(deny_unknown_fields)] +pub struct MixnetDebugV4 { + /// Initial value of an exponential backoff to reconnect to dropped TCP connection when + /// forwarding sphinx packets. + #[serde(with = "humantime_serde")] + pub packet_forwarding_initial_backoff: Duration, + + /// Maximum value of an exponential backoff to reconnect to dropped TCP connection when + /// forwarding sphinx packets. + #[serde(with = "humantime_serde")] + pub packet_forwarding_maximum_backoff: Duration, + + /// Timeout for establishing initial connection when trying to forward a sphinx packet. + #[serde(with = "humantime_serde")] + pub initial_connection_timeout: Duration, + + /// Maximum number of packets that can be stored waiting to get sent to a particular connection. + pub maximum_connection_buffer_size: usize, + + /// Specifies whether this node should **NOT** use noise protocol in the connections (currently not implemented) + pub unsafe_disable_noise: bool, +} + +impl MixnetDebugV4 { + const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_millis(10_000); + const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000); + const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500); + const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000; +} + +impl Default for MixnetDebugV4 { + fn default() -> Self { + MixnetDebugV4 { + packet_forwarding_initial_backoff: Self::DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF, + packet_forwarding_maximum_backoff: Self::DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF, + initial_connection_timeout: Self::DEFAULT_INITIAL_CONNECTION_TIMEOUT, + maximum_connection_buffer_size: Self::DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE, + // to be changed by @SW once the implementation is there + unsafe_disable_noise: true, + } + } +} + +impl Default for MixnetV4 { + fn default() -> Self { + // SAFETY: + // our hardcoded values should always be valid + #[allow(clippy::expect_used)] + // is if there's anything set in the environment, otherwise fallback to mainnet + let nym_api_urls = if let Ok(env_value) = env::var(var_names::NYM_API) { + parse_urls(&env_value) + } else { + vec![mainnet::NYM_API.parse().expect("Invalid default API URL")] + }; + + #[allow(clippy::expect_used)] + let nyxd_urls = if let Ok(env_value) = env::var(var_names::NYXD) { + parse_urls(&env_value) + } else { + vec![mainnet::NYXD_URL.parse().expect("Invalid default nyxd URL")] + }; + + MixnetV4 { + bind_address: SocketAddr::new(inaddr_any(), DEFAULT_MIXNET_PORT), + announce_port: None, + nym_api_urls, + nyxd_urls, + debug: Default::default(), + } + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[serde(default)] +#[serde(deny_unknown_fields)] +pub struct MixnetV4 { + /// Address this node will bind to for listening for mixnet packets + /// default: `0.0.0.0:1789` + pub bind_address: SocketAddr, + + /// If applicable, custom port announced in the self-described API that other clients and nodes + /// will use. + /// Useful when the node is behind a proxy. + #[serde(deserialize_with = "de_maybe_port")] + pub announce_port: Option, + + /// Addresses to nym APIs from which the node gets the view of the network. + pub nym_api_urls: Vec, + + /// Addresses to nyxd which the node uses to interact with the nyx chain. + pub nyxd_urls: Vec, + + #[serde(default)] + pub debug: MixnetDebugV4, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct KeysPathsV4 { + /// Path to file containing ed25519 identity private key. + pub private_ed25519_identity_key_file: PathBuf, + + /// Path to file containing ed25519 identity public key. + pub public_ed25519_identity_key_file: PathBuf, + + /// Path to file containing x25519 sphinx private key. + pub private_x25519_sphinx_key_file: PathBuf, + + /// Path to file containing x25519 sphinx public key. + pub public_x25519_sphinx_key_file: PathBuf, + + /// Path to file containing x25519 noise private key. + pub private_x25519_noise_key_file: PathBuf, + + /// Path to file containing x25519 noise public key. + pub public_x25519_noise_key_file: PathBuf, +} + +impl KeysPathsV4 { + pub fn new>(data_dir: P) -> Self { + let data_dir = data_dir.as_ref(); + + KeysPathsV4 { + private_ed25519_identity_key_file: data_dir + .join(DEFAULT_ED25519_PRIVATE_IDENTITY_KEY_FILENAME), + public_ed25519_identity_key_file: data_dir + .join(DEFAULT_ED25519_PUBLIC_IDENTITY_KEY_FILENAME), + private_x25519_sphinx_key_file: data_dir + .join(DEFAULT_X25519_PRIVATE_SPHINX_KEY_FILENAME), + public_x25519_sphinx_key_file: data_dir.join(DEFAULT_X25519_PUBLIC_SPHINX_KEY_FILENAME), + private_x25519_noise_key_file: data_dir.join(DEFAULT_X25519_PRIVATE_NOISE_KEY_FILENAME), + public_x25519_noise_key_file: data_dir.join(DEFAULT_X25519_PUBLIC_NOISE_KEY_FILENAME), + } + } + + pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath { + nym_pemstore::KeyPairPath::new( + &self.private_ed25519_identity_key_file, + &self.public_ed25519_identity_key_file, + ) + } + + pub fn x25519_sphinx_storage_paths(&self) -> nym_pemstore::KeyPairPath { + nym_pemstore::KeyPairPath::new( + &self.private_x25519_sphinx_key_file, + &self.public_x25519_sphinx_key_file, + ) + } + + pub fn x25519_noise_storage_paths(&self) -> nym_pemstore::KeyPairPath { + nym_pemstore::KeyPairPath::new( + &self.private_x25519_noise_key_file, + &self.public_x25519_noise_key_file, + ) + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct NymNodePathsV4 { + pub keys: KeysPathsV4, + + /// Path to a file containing basic node description: human-readable name, website, details, etc. + pub description: PathBuf, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[serde(default)] +#[serde(deny_unknown_fields)] +pub struct HttpV4 { + /// Socket address this node will use for binding its http API. + /// default: `0.0.0.0:8080` + pub bind_address: SocketAddr, + + /// Path to assets directory of custom landing page of this node. + #[serde(deserialize_with = "de_maybe_stringified")] + pub landing_page_assets_path: Option, + + /// An optional bearer token for accessing certain http endpoints. + /// Currently only used for obtaining mixnode's stats. + #[serde(default)] + pub access_token: Option, + + /// Specify whether basic system information should be exposed. + /// default: true + pub expose_system_info: bool, + + /// Specify whether basic system hardware information should be exposed. + /// This option is superseded by `expose_system_info` + /// default: true + pub expose_system_hardware: bool, + + /// Specify whether detailed system crypto hardware information should be exposed. + /// This option is superseded by `expose_system_hardware` + /// default: true + pub expose_crypto_hardware: bool, +} + +impl Default for HttpV4 { + fn default() -> Self { + HttpV4 { + bind_address: SocketAddr::new(inaddr_any(), DEFAULT_HTTP_PORT), + landing_page_assets_path: None, + access_token: None, + expose_system_info: true, + expose_system_hardware: true, + expose_crypto_hardware: true, + } + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct MixnodePathsV4 {} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct DebugV4 { + /// Delay between each subsequent node statistics being logged to the console + #[serde(with = "humantime_serde")] + pub node_stats_logging_delay: Duration, + + /// Delay between each subsequent node statistics being updated + #[serde(with = "humantime_serde")] + pub node_stats_updating_delay: Duration, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct VerlocDebugV4 { + /// Specifies number of echo packets sent to each node during a measurement run. + pub packets_per_node: usize, + + /// Specifies maximum amount of time to wait for the connection to get established. + #[serde(with = "humantime_serde")] + pub connection_timeout: Duration, + + /// Specifies maximum amount of time to wait for the reply packet to arrive before abandoning the test. + #[serde(with = "humantime_serde")] + pub packet_timeout: Duration, + + /// Specifies delay between subsequent test packets being sent (after receiving a reply). + #[serde(with = "humantime_serde")] + pub delay_between_packets: Duration, + + /// Specifies number of nodes being tested at once. + pub tested_nodes_batch_size: usize, + + /// Specifies delay between subsequent test runs. + #[serde(with = "humantime_serde")] + pub testing_interval: Duration, + + /// Specifies delay between attempting to run the measurement again if the previous run failed + /// due to being unable to get the list of nodes. + #[serde(with = "humantime_serde")] + pub retry_timeout: Duration, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct VerlocV4 { + /// Socket address this node will use for binding its verloc API. + /// default: `0.0.0.0:1790` + pub bind_address: SocketAddr, + + #[serde(deserialize_with = "de_maybe_port")] + pub announce_port: Option, + + #[serde(default)] + pub debug: VerlocDebugV4, +} + +impl VerlocDebugV4 { + const DEFAULT_PACKETS_PER_NODE: usize = 100; + const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(5000); + const DEFAULT_PACKET_TIMEOUT: Duration = Duration::from_millis(1500); + const DEFAULT_DELAY_BETWEEN_PACKETS: Duration = Duration::from_millis(50); + const DEFAULT_BATCH_SIZE: usize = 50; + const DEFAULT_TESTING_INTERVAL: Duration = Duration::from_secs(60 * 60 * 12); + const DEFAULT_RETRY_TIMEOUT: Duration = Duration::from_secs(60 * 30); +} + +impl Default for VerlocDebugV4 { + fn default() -> Self { + VerlocDebugV4 { + packets_per_node: Self::DEFAULT_PACKETS_PER_NODE, + connection_timeout: Self::DEFAULT_CONNECTION_TIMEOUT, + packet_timeout: Self::DEFAULT_PACKET_TIMEOUT, + delay_between_packets: Self::DEFAULT_DELAY_BETWEEN_PACKETS, + tested_nodes_batch_size: Self::DEFAULT_BATCH_SIZE, + testing_interval: Self::DEFAULT_TESTING_INTERVAL, + retry_timeout: Self::DEFAULT_RETRY_TIMEOUT, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct MixnodeConfigV4 { + pub storage_paths: MixnodePathsV4, + + pub verloc: VerlocV4, + + #[serde(default)] + pub debug: DebugV4, +} + +impl DebugV4 { + const DEFAULT_NODE_STATS_LOGGING_DELAY: Duration = Duration::from_millis(60_000); + const DEFAULT_NODE_STATS_UPDATING_DELAY: Duration = Duration::from_millis(30_000); +} + +impl Default for DebugV4 { + fn default() -> Self { + DebugV4 { + node_stats_logging_delay: Self::DEFAULT_NODE_STATS_LOGGING_DELAY, + node_stats_updating_delay: Self::DEFAULT_NODE_STATS_UPDATING_DELAY, + } + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct EntryGatewayPathsV4 { + /// Path to sqlite database containing all persistent data: messages for offline clients, + /// derived shared keys and available client bandwidths. + pub clients_storage: PathBuf, + + /// Path to file containing cosmos account mnemonic used for zk-nym redemption. + pub cosmos_mnemonic: PathBuf, + + pub authenticator: AuthenticatorPathsV4, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct ZkNymTicketHandlerDebugV4 { + /// Specifies the multiplier for revoking a malformed/double-spent ticket + /// (if it has to go all the way to the nym-api for verification) + /// e.g. if one ticket grants 100Mb and `revocation_bandwidth_penalty` is set to 1.5, + /// the client will lose 150Mb + pub revocation_bandwidth_penalty: f32, + + /// Specifies the interval for attempting to resolve any failed, pending operations, + /// such as ticket verification or redemption. + #[serde(with = "humantime_serde")] + pub pending_poller: Duration, + + pub minimum_api_quorum: f32, + + /// Specifies the minimum number of tickets this gateway will attempt to redeem. + pub minimum_redemption_tickets: usize, + + /// Specifies the maximum time between two subsequent tickets redemptions. + /// That's required as nym-apis will purge all ticket information for tickets older than maximum validity. + #[serde(with = "humantime_serde")] + pub maximum_time_between_redemption: Duration, +} + +impl ZkNymTicketHandlerDebugV4 { + pub const DEFAULT_REVOCATION_BANDWIDTH_PENALTY: f32 = 10.0; + pub const DEFAULT_PENDING_POLLER: Duration = Duration::from_secs(300); + pub const DEFAULT_MINIMUM_API_QUORUM: f32 = 0.8; + pub const DEFAULT_MINIMUM_REDEMPTION_TICKETS: usize = 100; + + // use min(4/5 of max validity, validity - 1), but making sure it's no greater than 1 day + // ASSUMPTION: our validity period is AT LEAST 2 days + // + // this could have been a constant, but it's more readable as a function + pub const fn default_maximum_time_between_redemption() -> Duration { + let desired_secs = TICKETBOOK_VALIDITY_DAYS * (86400 * 4) / 5; + let desired_secs_alt = (TICKETBOOK_VALIDITY_DAYS - 1) * 86400; + + // can't use `min` in const context + let target_secs = if desired_secs < desired_secs_alt { + desired_secs + } else { + desired_secs_alt + }; + + assert!( + target_secs > 86400, + "the maximum time between redemption can't be lower than 1 day!" + ); + Duration::from_secs(target_secs as u64) + } +} + +impl Default for ZkNymTicketHandlerDebugV4 { + fn default() -> Self { + ZkNymTicketHandlerDebugV4 { + revocation_bandwidth_penalty: Self::DEFAULT_REVOCATION_BANDWIDTH_PENALTY, + pending_poller: Self::DEFAULT_PENDING_POLLER, + minimum_api_quorum: Self::DEFAULT_MINIMUM_API_QUORUM, + minimum_redemption_tickets: Self::DEFAULT_MINIMUM_REDEMPTION_TICKETS, + maximum_time_between_redemption: Self::default_maximum_time_between_redemption(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct EntryGatewayConfigDebugV4 { + /// Number of messages from offline client that can be pulled at once (i.e. with a single SQL query) from the storage. + pub message_retrieval_limit: i64, + pub zk_nym_tickets: ZkNymTicketHandlerDebugV4, +} + +impl EntryGatewayConfigDebugV4 { + const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: i64 = 100; +} + +impl Default for EntryGatewayConfigDebugV4 { + fn default() -> Self { + EntryGatewayConfigDebugV4 { + message_retrieval_limit: Self::DEFAULT_MESSAGE_RETRIEVAL_LIMIT, + zk_nym_tickets: Default::default(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct EntryGatewayConfigV4 { + pub storage_paths: EntryGatewayPathsV4, + + /// Indicates whether this gateway is accepting only coconut credentials for accessing the mixnet + /// or if it also accepts non-paying clients + pub enforce_zk_nyms: bool, + + /// Socket address this node will use for binding its client websocket API. + /// default: `0.0.0.0:9000` + pub bind_address: SocketAddr, + + /// Custom announced port for listening for websocket client traffic. + /// If unspecified, the value from the `bind_address` will be used instead + /// default: None + #[serde(deserialize_with = "de_maybe_port")] + pub announce_ws_port: Option, + + /// If applicable, announced port for listening for secure websocket client traffic. + /// (default: None) + #[serde(deserialize_with = "de_maybe_port")] + pub announce_wss_port: Option, + + #[serde(default)] + pub debug: EntryGatewayConfigDebugV4, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct NetworkRequesterPathsV4 { + /// Path to file containing network requester ed25519 identity private key. + pub private_ed25519_identity_key_file: PathBuf, + + /// Path to file containing network requester ed25519 identity public key. + pub public_ed25519_identity_key_file: PathBuf, + + /// Path to file containing network requester x25519 diffie hellman private key. + pub private_x25519_diffie_hellman_key_file: PathBuf, + + /// Path to file containing network requester x25519 diffie hellman public key. + pub public_x25519_diffie_hellman_key_file: PathBuf, + + /// Path to file containing key used for encrypting and decrypting the content of an + /// acknowledgement so that nobody besides the client knows which packet it refers to. + pub ack_key_file: PathBuf, + + /// Path to the persistent store for received reply surbs, unused encryption keys and used sender tags. + pub reply_surb_database: PathBuf, + + /// Normally this is a path to the file containing information about gateways used by this client, + /// i.e. details such as their public keys, owner addresses or the network information. + /// but in this case it just has the basic information of "we're using custom gateway". + /// Due to how clients are started up, this file has to exist. + pub gateway_registrations: PathBuf, + // it's possible we might have to add credential storage here for return tickets +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct IpPacketRouterPathsV4 { + /// Path to file containing ip packet router ed25519 identity private key. + pub private_ed25519_identity_key_file: PathBuf, + + /// Path to file containing ip packet router ed25519 identity public key. + pub public_ed25519_identity_key_file: PathBuf, + + /// Path to file containing ip packet router x25519 diffie hellman private key. + pub private_x25519_diffie_hellman_key_file: PathBuf, + + /// Path to file containing ip packet router x25519 diffie hellman public key. + pub public_x25519_diffie_hellman_key_file: PathBuf, + + /// Path to file containing key used for encrypting and decrypting the content of an + /// acknowledgement so that nobody besides the client knows which packet it refers to. + pub ack_key_file: PathBuf, + + /// Path to the persistent store for received reply surbs, unused encryption keys and used sender tags. + pub reply_surb_database: PathBuf, + + /// Normally this is a path to the file containing information about gateways used by this client, + /// i.e. details such as their public keys, owner addresses or the network information. + /// but in this case it just has the basic information of "we're using custom gateway". + /// Due to how clients are started up, this file has to exist. + pub gateway_registrations: PathBuf, + // it's possible we might have to add credential storage here for return tickets +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct AuthenticatorPathsV4 { + /// Path to file containing authenticator ed25519 identity private key. + pub private_ed25519_identity_key_file: PathBuf, + + /// Path to file containing authenticator ed25519 identity public key. + pub public_ed25519_identity_key_file: PathBuf, + + /// Path to file containing authenticator x25519 diffie hellman private key. + pub private_x25519_diffie_hellman_key_file: PathBuf, + + /// Path to file containing authenticator x25519 diffie hellman public key. + pub public_x25519_diffie_hellman_key_file: PathBuf, + + /// Path to file containing key used for encrypting and decrypting the content of an + /// acknowledgement so that nobody besides the client knows which packet it refers to. + pub ack_key_file: PathBuf, + + /// Path to the persistent store for received reply surbs, unused encryption keys and used sender tags. + pub reply_surb_database: PathBuf, + + /// Normally this is a path to the file containing information about gateways used by this client, + /// i.e. details such as their public keys, owner addresses or the network information. + /// but in this case it just has the basic information of "we're using custom gateway". + /// Due to how clients are started up, this file has to exist. + pub gateway_registrations: PathBuf, + // it's possible we might have to add credential storage here for return tickets +} + +impl AuthenticatorPathsV4 { + pub fn new>(data_dir: P) -> Self { + let data_dir = data_dir.as_ref(); + AuthenticatorPathsV4 { + private_ed25519_identity_key_file: data_dir + .join(DEFAULT_ED25519_AUTH_PRIVATE_IDENTITY_KEY_FILENAME), + public_ed25519_identity_key_file: data_dir + .join(DEFAULT_ED25519_AUTH_PUBLIC_IDENTITY_KEY_FILENAME), + private_x25519_diffie_hellman_key_file: data_dir + .join(DEFAULT_X25519_AUTH_PRIVATE_DH_KEY_FILENAME), + public_x25519_diffie_hellman_key_file: data_dir + .join(DEFAULT_X25519_AUTH_PUBLIC_DH_KEY_FILENAME), + ack_key_file: data_dir.join(DEFAULT_AUTH_ACK_KEY_FILENAME), + reply_surb_database: data_dir.join(DEFAULT_AUTH_REPLY_SURB_DB_FILENAME), + gateway_registrations: data_dir.join(DEFAULT_AUTH_GATEWAYS_DB_FILENAME), + } + } + + pub fn to_common_client_paths(&self) -> CommonClientPaths { + CommonClientPaths { + keys: ClientKeysPaths { + private_identity_key_file: self.private_ed25519_identity_key_file.clone(), + public_identity_key_file: self.public_ed25519_identity_key_file.clone(), + private_encryption_key_file: self.private_x25519_diffie_hellman_key_file.clone(), + public_encryption_key_file: self.public_x25519_diffie_hellman_key_file.clone(), + ack_key_file: self.ack_key_file.clone(), + }, + gateway_registrations: self.gateway_registrations.clone(), + + // not needed for embedded providers + credentials_database: Default::default(), + reply_surb_database: self.reply_surb_database.clone(), + } + } + + pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath { + nym_pemstore::KeyPairPath::new( + &self.private_ed25519_identity_key_file, + &self.public_ed25519_identity_key_file, + ) + } + + pub fn x25519_diffie_hellman_storage_paths(&self) -> nym_pemstore::KeyPairPath { + nym_pemstore::KeyPairPath::new( + &self.private_x25519_diffie_hellman_key_file, + &self.public_x25519_diffie_hellman_key_file, + ) + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct ExitGatewayPathsV4 { + pub clients_storage: PathBuf, + + pub network_requester: NetworkRequesterPathsV4, + + pub ip_packet_router: IpPacketRouterPathsV4, + + pub authenticator: AuthenticatorPathsV4, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +pub struct AuthenticatorV4 { + #[serde(default)] + pub debug: AuthenticatorDebugV4, +} + +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +#[serde(default)] +pub struct AuthenticatorDebugV4 { + /// Specifies whether authenticator service is enabled in this process. + /// This is only here for debugging purposes as exit gateway should always run + /// the authenticator. + pub enabled: bool, + + /// Disable Poisson sending rate. + /// This is equivalent to setting client_debug.traffic.disable_main_poisson_packet_distribution = true + /// (or is it (?)) + pub disable_poisson_rate: bool, + + /// Shared detailed client configuration options + #[serde(flatten)] + pub client_debug: ClientDebugConfig, +} + +impl Default for AuthenticatorDebugV4 { + fn default() -> Self { + AuthenticatorDebugV4 { + enabled: true, + disable_poisson_rate: true, + client_debug: Default::default(), + } + } +} + +#[allow(clippy::derivable_impls)] +impl Default for AuthenticatorV4 { + fn default() -> Self { + AuthenticatorV4 { + debug: Default::default(), + } + } +} + +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +#[serde(default)] +pub struct IpPacketRouterDebugV4 { + /// Specifies whether ip packet routing service is enabled in this process. + /// This is only here for debugging purposes as exit gateway should always run **both** + /// network requester and an ip packet router. + pub enabled: bool, + + /// Disable Poisson sending rate. + /// This is equivalent to setting client_debug.traffic.disable_main_poisson_packet_distribution = true + /// (or is it (?)) + pub disable_poisson_rate: bool, + + /// Shared detailed client configuration options + #[serde(flatten)] + pub client_debug: ClientDebugConfig, +} + +impl Default for IpPacketRouterDebugV4 { + fn default() -> Self { + IpPacketRouterDebugV4 { + enabled: true, + disable_poisson_rate: true, + client_debug: Default::default(), + } + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +pub struct IpPacketRouterV4 { + #[serde(default)] + pub debug: IpPacketRouterDebugV4, +} + +#[allow(clippy::derivable_impls)] +impl Default for IpPacketRouterV4 { + fn default() -> Self { + IpPacketRouterV4 { + debug: Default::default(), + } + } +} + +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +pub struct NetworkRequesterDebugV4 { + /// Specifies whether network requester service is enabled in this process. + /// This is only here for debugging purposes as exit gateway should always run **both** + /// network requester and an ip packet router. + pub enabled: bool, + + /// Disable Poisson sending rate. + /// This is equivalent to setting client_debug.traffic.disable_main_poisson_packet_distribution = true + /// (or is it (?)) + pub disable_poisson_rate: bool, + + /// Shared detailed client configuration options + #[serde(flatten)] + pub client_debug: ClientDebugConfig, +} + +impl Default for NetworkRequesterDebugV4 { + fn default() -> Self { + NetworkRequesterDebugV4 { + enabled: true, + disable_poisson_rate: true, + client_debug: Default::default(), + } + } +} + +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +pub struct NetworkRequesterV4 { + #[serde(default)] + pub debug: NetworkRequesterDebugV4, +} + +#[allow(clippy::derivable_impls)] +impl Default for NetworkRequesterV4 { + fn default() -> Self { + NetworkRequesterV4 { + debug: Default::default(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ExitGatewayDebugV4 { + /// Number of messages from offline client that can be pulled at once (i.e. with a single SQL query) from the storage. + pub message_retrieval_limit: i64, +} + +impl ExitGatewayDebugV4 { + const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: i64 = 100; +} + +impl Default for ExitGatewayDebugV4 { + fn default() -> Self { + ExitGatewayDebugV4 { + message_retrieval_limit: Self::DEFAULT_MESSAGE_RETRIEVAL_LIMIT, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ExitGatewayConfigV4 { + pub storage_paths: ExitGatewayPathsV4, + + /// specifies whether this exit node should run in 'open-proxy' mode + /// and thus would attempt to resolve **ANY** request it receives. + pub open_proxy: bool, + + /// Specifies the url for an upstream source of the exit policy used by this node. + pub upstream_exit_policy_url: Url, + + pub network_requester: NetworkRequesterV4, + + pub ip_packet_router: IpPacketRouterV4, + + #[serde(default)] + pub debug: ExitGatewayDebugV4, +} + +#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct LoggingSettingsV4 { + // well, we need to implement something here at some point... +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ConfigV4 { + // additional metadata holding on-disk location of this config file + #[serde(skip)] + pub(crate) save_path: Option, + + /// Human-readable ID of this particular node. + pub id: String, + + /// Current mode of this nym-node. + /// Expect this field to be changed in the future to allow running the node in multiple modes (i.e. mixnode + gateway) + pub mode: NodeModeV4, + + pub host: HostV4, + + pub mixnet: MixnetV4, + + /// Storage paths to persistent nym-node data, such as its long term keys. + pub storage_paths: NymNodePathsV4, + + #[serde(default)] + pub http: HttpV4, + + pub wireguard: WireguardV4, + + pub mixnode: MixnodeConfigV4, + + pub entry_gateway: EntryGatewayConfigV4, + + pub exit_gateway: ExitGatewayConfigV4, + + pub authenticator: AuthenticatorV4, + + #[serde(default)] + pub logging: LoggingSettingsV4, +} + +impl NymConfigTemplate for ConfigV4 { + fn template(&self) -> &'static str { + CONFIG_TEMPLATE + } +} + +impl ConfigV4 { + pub fn save(&self) -> Result<(), NymNodeError> { + let save_location = self.save_location(); + debug!( + "attempting to save config file to '{}'", + save_location.display() + ); + save_formatted_config_to_file(self, &save_location).map_err(|source| { + NymNodeError::ConfigSaveFailure { + id: self.id.clone(), + path: save_location, + source, + } + }) + } + + pub fn save_location(&self) -> PathBuf { + self.save_path + .clone() + .unwrap_or(self.default_save_location()) + } + + pub fn default_save_location(&self) -> PathBuf { + default_config_filepath(&self.id) + } + + pub fn default_data_directory>(config_path: P) -> Result { + let config_path = config_path.as_ref(); + + // we got a proper path to the .toml file + let Some(config_dir) = config_path.parent() else { + error!( + "'{}' does not have a parent directory. Have you pointed to the fs root?", + config_path.display() + ); + return Err(NymNodeError::DataDirDerivationFailure); + }; + + let Some(config_dir_name) = config_dir.file_name() else { + error!( + "could not obtain parent directory name of '{}'. Have you used relative paths?", + config_path.display() + ); + return Err(NymNodeError::DataDirDerivationFailure); + }; + + if config_dir_name != DEFAULT_CONFIG_DIR { + error!( + "the parent directory of '{}' ({}) is not {DEFAULT_CONFIG_DIR}. currently this is not supported", + config_path.display(), config_dir_name.to_str().unwrap_or("UNKNOWN") + ); + return Err(NymNodeError::DataDirDerivationFailure); + } + + let Some(node_dir) = config_dir.parent() else { + error!( + "'{}' does not have a parent directory. Have you pointed to the fs root?", + config_dir.display() + ); + return Err(NymNodeError::DataDirDerivationFailure); + }; + + Ok(node_dir.join(DEFAULT_DATA_DIR)) + } + + // simple wrapper that reads config file and assigns path location + fn read_from_path>(path: P) -> Result { + let path = path.as_ref(); + let mut loaded: ConfigV4 = + read_config_from_toml_file(path).map_err(|source| NymNodeError::ConfigLoadFailure { + path: path.to_path_buf(), + source, + })?; + loaded.save_path = Some(path.to_path_buf()); + debug!("loaded config file from {}", path.display()); + Ok(loaded) + } + + pub fn read_from_toml_file>(path: P) -> Result { + Self::read_from_path(path) + } +} + +pub async fn initialise( + paths: &AuthenticatorPaths, + public_key: nym_crypto::asymmetric::identity::PublicKey, +) -> Result<(), NymNodeError> { + let mut rng = OsRng; + let ed25519_keys = ed25519::KeyPair::new(&mut rng); + let x25519_keys = x25519::KeyPair::new(&mut rng); + let aes128ctr_key = AckKey::new(&mut rng); + let gateway_details = GatewayDetails::Custom(CustomGatewayDetails::new(public_key)).into(); + + store_keypair(&ed25519_keys, &paths.ed25519_identity_storage_paths()).map_err(|e| { + KeyIOFailure::KeyPairStoreFailure { + keys: "ed25519-identity".to_string(), + paths: paths.ed25519_identity_storage_paths(), + err: e, + } + })?; + store_keypair(&x25519_keys, &paths.x25519_diffie_hellman_storage_paths()).map_err(|e| { + KeyIOFailure::KeyPairStoreFailure { + keys: "x25519-dh".to_string(), + paths: paths.x25519_diffie_hellman_storage_paths(), + err: e, + } + })?; + store_key(&aes128ctr_key, &paths.ack_key_file).map_err(|e| KeyIOFailure::KeyStoreFailure { + key: "ack".to_string(), + path: paths.ack_key_file.clone(), + err: e, + })?; + + // insert all required information into the gateways store + // (I hate that we have to do it, but that's currently the simplest thing to do) + let storage = setup_fs_gateways_storage(&paths.gateway_registrations).await?; + store_gateway_details(&storage, &gateway_details).await?; + set_active_gateway(&storage, &gateway_details.gateway_id().to_base58_string()).await?; + + Ok(()) +} + +pub async fn try_upgrade_config_v4>( + path: P, + prev_config: Option, +) -> Result { + tracing::debug!("Updating from 1.1.4"); + let old_cfg = if let Some(prev_config) = prev_config { + prev_config + } else { + ConfigV4::read_from_path(&path)? + }; + + let exit_gateway_paths = ExitGatewayPaths::new( + old_cfg + .exit_gateway + .storage_paths + .clients_storage + .parent() + .ok_or(NymNodeError::DataDirDerivationFailure)?, + ); + + let entry_gateway_paths = EntryGatewayPaths::new( + old_cfg + .entry_gateway + .storage_paths + .clients_storage + .parent() + .ok_or(NymNodeError::DataDirDerivationFailure)?, + ); + + let cfg = Config { + save_path: old_cfg.save_path, + id: old_cfg.id, + mode: old_cfg.mode.into(), + host: Host { + public_ips: old_cfg.host.public_ips, + hostname: old_cfg.host.hostname, + location: old_cfg.host.location, + }, + mixnet: Mixnet { + bind_address: old_cfg.mixnet.bind_address, + announce_port: old_cfg.mixnet.announce_port, + nym_api_urls: old_cfg.mixnet.nym_api_urls, + nyxd_urls: old_cfg.mixnet.nyxd_urls, + debug: MixnetDebug { + packet_forwarding_initial_backoff: old_cfg + .mixnet + .debug + .packet_forwarding_initial_backoff, + packet_forwarding_maximum_backoff: old_cfg + .mixnet + .debug + .packet_forwarding_maximum_backoff, + initial_connection_timeout: old_cfg.mixnet.debug.initial_connection_timeout, + maximum_connection_buffer_size: old_cfg.mixnet.debug.maximum_connection_buffer_size, + unsafe_disable_noise: old_cfg.mixnet.debug.unsafe_disable_noise, + }, + }, + storage_paths: NymNodePaths { + keys: KeysPaths { + private_ed25519_identity_key_file: old_cfg + .storage_paths + .keys + .private_ed25519_identity_key_file, + public_ed25519_identity_key_file: old_cfg + .storage_paths + .keys + .public_ed25519_identity_key_file, + private_x25519_sphinx_key_file: old_cfg + .storage_paths + .keys + .private_x25519_sphinx_key_file, + public_x25519_sphinx_key_file: old_cfg + .storage_paths + .keys + .public_x25519_sphinx_key_file, + private_x25519_noise_key_file: old_cfg + .storage_paths + .keys + .private_x25519_noise_key_file, + public_x25519_noise_key_file: old_cfg + .storage_paths + .keys + .public_x25519_noise_key_file, + }, + description: old_cfg.storage_paths.description, + }, + http: Http { + bind_address: old_cfg.http.bind_address, + landing_page_assets_path: old_cfg.http.landing_page_assets_path, + access_token: old_cfg.http.access_token, + expose_system_info: old_cfg.http.expose_system_info, + expose_system_hardware: old_cfg.http.expose_system_hardware, + expose_crypto_hardware: old_cfg.http.expose_crypto_hardware, + }, + wireguard: Wireguard { + enabled: old_cfg.wireguard.enabled, + bind_address: old_cfg.wireguard.bind_address, + private_ip: old_cfg.wireguard.private_ip, + announced_port: old_cfg.wireguard.announced_port, + private_network_prefix: old_cfg.wireguard.private_network_prefix, + storage_paths: WireguardPaths { + private_diffie_hellman_key_file: old_cfg + .wireguard + .storage_paths + .private_diffie_hellman_key_file, + public_diffie_hellman_key_file: old_cfg + .wireguard + .storage_paths + .public_diffie_hellman_key_file, + }, + }, + mixnode: MixnodeConfig { + storage_paths: MixnodePaths {}, + verloc: Verloc { + bind_address: old_cfg.mixnode.verloc.bind_address, + announce_port: old_cfg.mixnode.verloc.announce_port, + debug: VerlocDebug { + packets_per_node: old_cfg.mixnode.verloc.debug.packets_per_node, + connection_timeout: old_cfg.mixnode.verloc.debug.connection_timeout, + packet_timeout: old_cfg.mixnode.verloc.debug.packet_timeout, + delay_between_packets: old_cfg.mixnode.verloc.debug.delay_between_packets, + tested_nodes_batch_size: old_cfg.mixnode.verloc.debug.tested_nodes_batch_size, + testing_interval: old_cfg.mixnode.verloc.debug.testing_interval, + retry_timeout: old_cfg.mixnode.verloc.debug.retry_timeout, + }, + }, + debug: mixnode::Debug { + node_stats_logging_delay: old_cfg.mixnode.debug.node_stats_logging_delay, + node_stats_updating_delay: old_cfg.mixnode.debug.node_stats_updating_delay, + }, + }, + entry_gateway: EntryGatewayConfig { + storage_paths: EntryGatewayPaths { + clients_storage: old_cfg.entry_gateway.storage_paths.clients_storage, + stats_storage: entry_gateway_paths.stats_storage, + cosmos_mnemonic: old_cfg.entry_gateway.storage_paths.cosmos_mnemonic, + authenticator: AuthenticatorPaths { + private_ed25519_identity_key_file: old_cfg + .entry_gateway + .storage_paths + .authenticator + .private_ed25519_identity_key_file, + public_ed25519_identity_key_file: old_cfg + .entry_gateway + .storage_paths + .authenticator + .public_ed25519_identity_key_file, + private_x25519_diffie_hellman_key_file: old_cfg + .entry_gateway + .storage_paths + .authenticator + .private_x25519_diffie_hellman_key_file, + public_x25519_diffie_hellman_key_file: old_cfg + .entry_gateway + .storage_paths + .authenticator + .public_x25519_diffie_hellman_key_file, + ack_key_file: old_cfg + .entry_gateway + .storage_paths + .authenticator + .ack_key_file, + reply_surb_database: old_cfg + .entry_gateway + .storage_paths + .authenticator + .reply_surb_database, + gateway_registrations: old_cfg + .entry_gateway + .storage_paths + .authenticator + .gateway_registrations, + }, + }, + enforce_zk_nyms: old_cfg.entry_gateway.enforce_zk_nyms, + bind_address: old_cfg.entry_gateway.bind_address, + announce_ws_port: old_cfg.entry_gateway.announce_ws_port, + announce_wss_port: old_cfg.entry_gateway.announce_wss_port, + debug: EntryGatewayConfigDebug { + message_retrieval_limit: old_cfg.entry_gateway.debug.message_retrieval_limit, + zk_nym_tickets: ZkNymTicketHandlerDebug { + revocation_bandwidth_penalty: old_cfg + .entry_gateway + .debug + .zk_nym_tickets + .revocation_bandwidth_penalty, + pending_poller: old_cfg.entry_gateway.debug.zk_nym_tickets.pending_poller, + minimum_api_quorum: old_cfg + .entry_gateway + .debug + .zk_nym_tickets + .minimum_api_quorum, + minimum_redemption_tickets: old_cfg + .entry_gateway + .debug + .zk_nym_tickets + .minimum_redemption_tickets, + maximum_time_between_redemption: old_cfg + .entry_gateway + .debug + .zk_nym_tickets + .maximum_time_between_redemption, + }, + }, + }, + exit_gateway: ExitGatewayConfig { + storage_paths: ExitGatewayPaths { + clients_storage: old_cfg.exit_gateway.storage_paths.clients_storage, + stats_storage: exit_gateway_paths.stats_storage, + network_requester: NetworkRequesterPaths { + private_ed25519_identity_key_file: old_cfg + .exit_gateway + .storage_paths + .network_requester + .private_ed25519_identity_key_file, + public_ed25519_identity_key_file: old_cfg + .exit_gateway + .storage_paths + .network_requester + .public_ed25519_identity_key_file, + private_x25519_diffie_hellman_key_file: old_cfg + .exit_gateway + .storage_paths + .network_requester + .private_x25519_diffie_hellman_key_file, + public_x25519_diffie_hellman_key_file: old_cfg + .exit_gateway + .storage_paths + .network_requester + .public_x25519_diffie_hellman_key_file, + ack_key_file: old_cfg + .exit_gateway + .storage_paths + .network_requester + .ack_key_file, + reply_surb_database: old_cfg + .exit_gateway + .storage_paths + .network_requester + .reply_surb_database, + gateway_registrations: old_cfg + .exit_gateway + .storage_paths + .network_requester + .gateway_registrations, + }, + ip_packet_router: IpPacketRouterPaths { + private_ed25519_identity_key_file: old_cfg + .exit_gateway + .storage_paths + .ip_packet_router + .private_ed25519_identity_key_file, + public_ed25519_identity_key_file: old_cfg + .exit_gateway + .storage_paths + .ip_packet_router + .public_ed25519_identity_key_file, + private_x25519_diffie_hellman_key_file: old_cfg + .exit_gateway + .storage_paths + .ip_packet_router + .private_x25519_diffie_hellman_key_file, + public_x25519_diffie_hellman_key_file: old_cfg + .exit_gateway + .storage_paths + .ip_packet_router + .public_x25519_diffie_hellman_key_file, + ack_key_file: old_cfg + .exit_gateway + .storage_paths + .ip_packet_router + .ack_key_file, + reply_surb_database: old_cfg + .exit_gateway + .storage_paths + .ip_packet_router + .reply_surb_database, + gateway_registrations: old_cfg + .exit_gateway + .storage_paths + .ip_packet_router + .gateway_registrations, + }, + authenticator: AuthenticatorPaths { + private_ed25519_identity_key_file: old_cfg + .exit_gateway + .storage_paths + .authenticator + .private_ed25519_identity_key_file, + public_ed25519_identity_key_file: old_cfg + .exit_gateway + .storage_paths + .authenticator + .public_ed25519_identity_key_file, + private_x25519_diffie_hellman_key_file: old_cfg + .exit_gateway + .storage_paths + .authenticator + .private_x25519_diffie_hellman_key_file, + public_x25519_diffie_hellman_key_file: old_cfg + .exit_gateway + .storage_paths + .authenticator + .public_x25519_diffie_hellman_key_file, + ack_key_file: old_cfg + .exit_gateway + .storage_paths + .authenticator + .ack_key_file, + reply_surb_database: old_cfg + .exit_gateway + .storage_paths + .authenticator + .reply_surb_database, + gateway_registrations: old_cfg + .exit_gateway + .storage_paths + .authenticator + .gateway_registrations, + }, + }, + open_proxy: old_cfg.exit_gateway.open_proxy, + upstream_exit_policy_url: old_cfg.exit_gateway.upstream_exit_policy_url, + network_requester: NetworkRequester { + debug: NetworkRequesterDebug { + enabled: old_cfg.exit_gateway.network_requester.debug.enabled, + disable_poisson_rate: old_cfg + .exit_gateway + .network_requester + .debug + .disable_poisson_rate, + client_debug: old_cfg.exit_gateway.network_requester.debug.client_debug, + }, + }, + ip_packet_router: IpPacketRouter { + debug: IpPacketRouterDebug { + enabled: old_cfg.exit_gateway.ip_packet_router.debug.enabled, + disable_poisson_rate: old_cfg + .exit_gateway + .ip_packet_router + .debug + .disable_poisson_rate, + client_debug: old_cfg.exit_gateway.ip_packet_router.debug.client_debug, + }, + }, + debug: ExitGatewayConfigDebug { + message_retrieval_limit: old_cfg.exit_gateway.debug.message_retrieval_limit, + }, + }, + authenticator: Default::default(), + logging: LoggingSettings {}, + }; + + Ok(cfg) +} diff --git a/nym-node/src/config/persistence.rs b/nym-node/src/config/persistence.rs index 3ae8595fe8..4106156aff 100644 --- a/nym-node/src/config/persistence.rs +++ b/nym-node/src/config/persistence.rs @@ -24,6 +24,7 @@ pub const DEFAULT_NYMNODE_DESCRIPTION_FILENAME: &str = "description.toml"; // Entry Gateway: pub const DEFAULT_CLIENTS_STORAGE_FILENAME: &str = "clients.sqlite"; +pub const DEFAULT_STATS_STORAGE_FILENAME: &str = "stats.sqlite"; pub const DEFAULT_MNEMONIC_FILENAME: &str = "cosmos_mnemonic"; // Exit Gateway: @@ -147,6 +148,9 @@ pub struct EntryGatewayPaths { /// derived shared keys, available client bandwidths and wireguard peers. pub clients_storage: PathBuf, + /// Path to sqlite database containing all persistent stats data. + pub stats_storage: PathBuf, + /// Path to file containing cosmos account mnemonic used for zk-nym redemption. pub cosmos_mnemonic: PathBuf, @@ -157,6 +161,7 @@ impl EntryGatewayPaths { pub fn new>(data_dir: P) -> Self { EntryGatewayPaths { clients_storage: data_dir.as_ref().join(DEFAULT_CLIENTS_STORAGE_FILENAME), + stats_storage: data_dir.as_ref().join(DEFAULT_STATS_STORAGE_FILENAME), cosmos_mnemonic: data_dir.as_ref().join(DEFAULT_MNEMONIC_FILENAME), authenticator: AuthenticatorPaths::new(data_dir), } @@ -207,6 +212,9 @@ pub struct ExitGatewayPaths { /// derived shared keys, available client bandwidths and wireguard peers. pub clients_storage: PathBuf, + /// Path to sqlite database containing all persistent stats data. + pub stats_storage: PathBuf, + pub network_requester: NetworkRequesterPaths, pub ip_packet_router: IpPacketRouterPaths, @@ -459,6 +467,7 @@ impl ExitGatewayPaths { let data_dir = data_dir.as_ref(); ExitGatewayPaths { clients_storage: data_dir.join(DEFAULT_CLIENTS_STORAGE_FILENAME), + stats_storage: data_dir.join(DEFAULT_STATS_STORAGE_FILENAME), network_requester: NetworkRequesterPaths::new(data_dir), ip_packet_router: IpPacketRouterPaths::new(data_dir), authenticator: AuthenticatorPaths::new(data_dir), diff --git a/nym-node/src/config/upgrade_helpers.rs b/nym-node/src/config/upgrade_helpers.rs index 4645b1a285..29352ec525 100644 --- a/nym-node/src/config/upgrade_helpers.rs +++ b/nym-node/src/config/upgrade_helpers.rs @@ -10,7 +10,8 @@ use std::path::Path; async fn try_upgrade_config(path: &Path) -> Result<(), NymNodeError> { let cfg = try_upgrade_config_v1(path, None).await.ok(); let cfg = try_upgrade_config_v2(path, cfg).await.ok(); - match try_upgrade_config_v3(path, cfg).await { + let cfg = try_upgrade_config_v3(path, cfg).await.ok(); + match try_upgrade_config_v4(path, cfg).await { Ok(cfg) => cfg.save(), Err(e) => { tracing::error!("Failed to finish upgrade - {e}"); diff --git a/nym-node/src/node/mod.rs b/nym-node/src/node/mod.rs index 5a7bc0c419..4b455cf6a5 100644 --- a/nym-node/src/node/mod.rs +++ b/nym-node/src/node/mod.rs @@ -67,6 +67,7 @@ impl MixnodeData { pub struct EntryGatewayData { mnemonic: Zeroizing, client_storage: nym_gateway::node::PersistentStorage, + stats_storage: nym_gateway::node::PersistentStatsStorage, sessions_stats: SharedSessionStats, } @@ -94,6 +95,11 @@ impl EntryGatewayData { ) .await .map_err(nym_gateway::GatewayError::from)?, + stats_storage: nym_gateway::node::PersistentStatsStorage::init( + &config.storage_paths.stats_storage, + ) + .await + .map_err(nym_gateway::GatewayError::from)?, sessions_stats: SharedSessionStats::new(), }) } @@ -114,6 +120,7 @@ pub struct ExitGatewayData { auth_x25519: x25519::PublicKey, client_storage: nym_gateway::node::PersistentStorage, + stats_storage: nym_gateway::node::PersistentStatsStorage, } impl ExitGatewayData { @@ -262,6 +269,11 @@ impl ExitGatewayData { .await .map_err(nym_gateway::GatewayError::from)?; + let stats_storage = + nym_gateway::node::PersistentStatsStorage::init(&config.storage_paths.stats_storage) + .await + .map_err(nym_gateway::GatewayError::from)?; + Ok(ExitGatewayData { nr_ed25519, nr_x25519, @@ -270,6 +282,7 @@ impl ExitGatewayData { auth_ed25519, auth_x25519, client_storage, + stats_storage, }) } } @@ -580,6 +593,7 @@ impl NymNode { self.ed25519_identity_keys.clone(), self.x25519_sphinx_keys.clone(), self.entry_gateway.client_storage.clone(), + self.entry_gateway.stats_storage.clone(), ); entry_gateway.disable_http_server(); entry_gateway.set_task_client(task_client); @@ -610,6 +624,7 @@ impl NymNode { self.ed25519_identity_keys.clone(), self.x25519_sphinx_keys.clone(), self.exit_gateway.client_storage.clone(), + self.exit_gateway.stats_storage.clone(), ); exit_gateway.disable_http_server(); exit_gateway.set_task_client(task_client); From 712455914c5aeaca926959de8834e923668530c4 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:08 +0200 Subject: [PATCH 02/14] config fix --- nym-node/src/config/old_configs/old_config_v4.rs | 2 +- nym-node/src/config/template.rs | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/nym-node/src/config/old_configs/old_config_v4.rs b/nym-node/src/config/old_configs/old_config_v4.rs index 9bad844258..95fd2714ec 100644 --- a/nym-node/src/config/old_configs/old_config_v4.rs +++ b/nym-node/src/config/old_configs/old_config_v4.rs @@ -1069,7 +1069,7 @@ pub async fn try_upgrade_config_v4>( path: P, prev_config: Option, ) -> Result { - tracing::debug!("Updating from 1.1.4"); + tracing::debug!("Updating from 1.1.5"); let old_cfg = if let Some(prev_config) = prev_config { prev_config } else { diff --git a/nym-node/src/config/template.rs b/nym-node/src/config/template.rs index 83588a0639..cd41116e44 100644 --- a/nym-node/src/config/template.rs +++ b/nym-node/src/config/template.rs @@ -185,6 +185,9 @@ announce_wss_port = {{#if entry_gateway.announce_wss_port }} {{ entry_gateway.an # derived shared keys, available client bandwidths and wireguard peers. clients_storage = '{{ entry_gateway.storage_paths.clients_storage }}' +#Path to sqlite database containing all persistent stats data. +stats_storage = '{{ entry_gateway.storage_paths.stats_storage }}' + # Path to file containing cosmos account mnemonic used for zk-nym redemption. cosmos_mnemonic = '{{ entry_gateway.storage_paths.cosmos_mnemonic }}' @@ -237,6 +240,10 @@ upstream_exit_policy_url = '{{ exit_gateway.upstream_exit_policy_url }}' # derived shared keys, available client bandwidths and wireguard peers. clients_storage = '{{ exit_gateway.storage_paths.clients_storage }}' +#Path to sqlite database containing all persistent stats data. +stats_storage = '{{ exit_gateway.storage_paths.stats_storage }}' + + [exit_gateway.storage_paths.network_requester] # Path to file containing network requester ed25519 identity private key. private_ed25519_identity_key_file = '{{ exit_gateway.storage_paths.network_requester.private_ed25519_identity_key_file }}' From 1d3e4e9301a9488af010afb82ece76517052d82b Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:08 +0200 Subject: [PATCH 03/14] add stats storage model and logic --- Cargo.lock | 15 ++ Cargo.toml | 1 + common/gateway-stats-storage/Cargo.toml | 36 ++++ common/gateway-stats-storage/build.rs | 28 +++ .../20241017120000_create_initial_tables.sql | 26 +++ common/gateway-stats-storage/src/error.rs | 13 ++ common/gateway-stats-storage/src/lib.rs | 168 ++++++++++++++++++ common/gateway-stats-storage/src/models.rs | 109 ++++++++++++ common/gateway-stats-storage/src/sessions.rs | 144 +++++++++++++++ 9 files changed, 540 insertions(+) create mode 100644 common/gateway-stats-storage/Cargo.toml create mode 100644 common/gateway-stats-storage/build.rs create mode 100644 common/gateway-stats-storage/migrations/20241017120000_create_initial_tables.sql create mode 100644 common/gateway-stats-storage/src/error.rs create mode 100644 common/gateway-stats-storage/src/lib.rs create mode 100644 common/gateway-stats-storage/src/models.rs create mode 100644 common/gateway-stats-storage/src/sessions.rs diff --git a/Cargo.lock b/Cargo.lock index a653c26556..185aa20abe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5258,6 +5258,7 @@ dependencies = [ "nym-credentials-interface", "nym-crypto", "nym-gateway-requests", + "nym-gateway-stats-storage", "nym-gateway-storage", "nym-ip-packet-router", "nym-mixnet-client", @@ -5353,6 +5354,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "nym-gateway-stats-storage" +version = "0.1.0" +dependencies = [ + "log", + "nym-credentials-interface", + "nym-sphinx", + "sqlx", + "thiserror", + "time", + "tokio", + "tracing", +] + [[package]] name = "nym-gateway-storage" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index f64f8ddbb3..7ba40009d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ members = [ "common/exit-policy", "common/gateway-requests", "common/gateway-storage", + "common/gateway-stats-storage", "common/http-api-client", "common/http-api-common", "common/inclusion-probability", diff --git a/common/gateway-stats-storage/Cargo.toml b/common/gateway-stats-storage/Cargo.toml new file mode 100644 index 0000000000..c122339ce3 --- /dev/null +++ b/common/gateway-stats-storage/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "nym-gateway-stats-storage" +version = "0.1.0" +authors.workspace = true +repository.workspace = true +homepage.workspace = true +documentation.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +#bincode = { workspace = true } +log = { workspace = true } +sqlx = { workspace = true, features = [ + "runtime-tokio-rustls", + "sqlite", + "macros", + "migrate", + "time", +] } +time = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +nym-sphinx = { path = "../nymsphinx" } +nym-credentials-interface = { path = "../credentials-interface" } + + +[build-dependencies] +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +sqlx = { workspace = true, features = [ + "runtime-tokio-rustls", + "sqlite", + "macros", + "migrate", +] } diff --git a/common/gateway-stats-storage/build.rs b/common/gateway-stats-storage/build.rs new file mode 100644 index 0000000000..b254ca3e37 --- /dev/null +++ b/common/gateway-stats-storage/build.rs @@ -0,0 +1,28 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use sqlx::{Connection, SqliteConnection}; +use std::env; + +#[tokio::main] +async fn main() { + let out_dir = env::var("OUT_DIR").unwrap(); + let database_path = format!("{}/gateway-stats-example.sqlite", out_dir); + + let mut conn = SqliteConnection::connect(&format!("sqlite://{}?mode=rwc", database_path)) + .await + .expect("Failed to create SQLx database connection"); + + sqlx::migrate!("./migrations") + .run(&mut conn) + .await + .expect("Failed to perform SQLx migrations"); + + #[cfg(target_family = "unix")] + println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path); + + #[cfg(target_family = "windows")] + // for some strange reason we need to add a leading `/` to the windows path even though it's + // not a valid windows path... but hey, it works... + println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path); +} diff --git a/common/gateway-stats-storage/migrations/20241017120000_create_initial_tables.sql b/common/gateway-stats-storage/migrations/20241017120000_create_initial_tables.sql new file mode 100644 index 0000000000..6b435f3591 --- /dev/null +++ b/common/gateway-stats-storage/migrations/20241017120000_create_initial_tables.sql @@ -0,0 +1,26 @@ +/* + * Copyright 2021 - Nym Technologies SA + * SPDX-License-Identifier: Apache-2.0 + */ + +CREATE TABLE sessions_active +( + client_address TEXT NOT NULL PRIMARY KEY UNIQUE, + start_time TIMESTAMP WITHOUT TIME ZONE NOT NULL, + typ TEXT NOT NULL +); + +CREATE TABLE sessions_finished +( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + day DATE NOT NULL, + duration_ms INTEGER NOT NULL, + typ TEXT NOT NULL +); + +CREATE TABLE sessions_unique_users +( + day DATE NOT NULL, + client_address TEXT NOT NULL, + PRIMARY KEY (day, client_address) +); \ No newline at end of file diff --git a/common/gateway-stats-storage/src/error.rs b/common/gateway-stats-storage/src/error.rs new file mode 100644 index 0000000000..b51ae6defe --- /dev/null +++ b/common/gateway-stats-storage/src/error.rs @@ -0,0 +1,13 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum StatsStorageError { + #[error("Database experienced an internal error: {0}")] + InternalDatabaseError(#[from] sqlx::Error), + + #[error("Failed to perform database migration: {0}")] + MigrationError(#[from] sqlx::migrate::MigrateError), +} diff --git a/common/gateway-stats-storage/src/lib.rs b/common/gateway-stats-storage/src/lib.rs new file mode 100644 index 0000000000..43b54d911e --- /dev/null +++ b/common/gateway-stats-storage/src/lib.rs @@ -0,0 +1,168 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use error::StatsStorageError; +use models::{ActiveSession, FinishedSession, SessionType, StoredFinishedSession}; +use nym_sphinx::DestinationAddressBytes; +use sessions::SessionManager; +use sqlx::ConnectOptions; +use std::path::Path; +use time::Date; +use tracing::{debug, error}; + +pub mod error; +pub mod models; +mod sessions; + +// note that clone here is fine as upon cloning the same underlying pool will be used +#[derive(Clone)] +pub struct PersistentStatsStorage { + session_manager: SessionManager, +} + +impl PersistentStatsStorage { + /// Initialises `PersistentStatsStorage` using the provided path. + /// + /// # Arguments + /// + /// * `database_path`: path to the database. + pub async fn init + Send>(database_path: P) -> Result { + debug!( + "Attempting to connect to database {:?}", + database_path.as_ref().as_os_str() + ); + + // TODO: we can inject here more stuff based on our gateway global config + // struct. Maybe different pool size or timeout intervals? + let mut opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(database_path) + .create_if_missing(true); + + // TODO: do we want auto_vacuum ? + + opts.disable_statement_logging(); + + let connection_pool = match sqlx::SqlitePool::connect_with(opts).await { + Ok(db) => db, + Err(err) => { + error!("Failed to connect to SQLx database: {err}"); + return Err(err.into()); + } + }; + + if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await { + error!("Failed to perform migration on the SQLx database: {err}"); + return Err(err.into()); + } + + // the cloning here are cheap as connection pool is stored behind an Arc + Ok(PersistentStatsStorage { + session_manager: sessions::SessionManager::new(connection_pool), + }) + } + + //Sessions fn + pub async fn insert_finished_sessions( + &self, + date: Date, + session: FinishedSession, + ) -> Result<(), StatsStorageError> { + Ok(self + .session_manager + .insert_finished_sessions( + date, + session.duration.whole_milliseconds() as i64, + session.typ.to_string().into(), + ) + .await?) + } + + pub async fn get_finished_sessions( + &self, + date: Date, + ) -> Result, StatsStorageError> { + Ok(self.session_manager.get_finished_sessions(date).await?) + } + + pub async fn delete_finished_sessions( + &self, + before_date: Date, + ) -> Result<(), StatsStorageError> { + Ok(self + .session_manager + .delete_finished_sessions(before_date) + .await?) + } + + pub async fn insert_unique_users( + &self, + date: Date, + client_address: DestinationAddressBytes, + ) -> Result<(), StatsStorageError> { + Ok(self + .session_manager + .insert_unique_users(date, client_address.as_base58_string()) + .await?) + } + + pub async fn get_unique_users(&self, date: Date) -> Result { + Ok(self.session_manager.get_unique_users(date).await?) + } + + pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> { + Ok(self + .session_manager + .delete_unique_users(before_date) + .await?) + } + + pub async fn insert_active_sessions( + &self, + client_address: DestinationAddressBytes, + session: ActiveSession, + ) -> Result<(), StatsStorageError> { + Ok(self + .session_manager + .insert_active_sessions( + client_address.as_base58_string(), + session.start, + session.typ.to_string().into(), + ) + .await?) + } + + pub async fn update_active_session_type( + &self, + client_address: DestinationAddressBytes, + session_type: SessionType, + ) -> Result<(), StatsStorageError> { + Ok(self + .session_manager + .update_active_session_type( + client_address.as_base58_string(), + session_type.to_string().into(), + ) + .await?) + } + + pub async fn get_active_session( + &self, + client_address: DestinationAddressBytes, + ) -> Result, StatsStorageError> { + Ok(self + .session_manager + .get_active_session(client_address.as_base58_string()) + .await? + .map(Into::into)) + } + + pub async fn delete_active_sessions( + &self, + client_address: DestinationAddressBytes, + ) -> Result<(), StatsStorageError> { + Ok(self + .session_manager + .delete_active_sessions(client_address.as_base58_string()) + .await?) + } +} diff --git a/common/gateway-stats-storage/src/models.rs b/common/gateway-stats-storage/src/models.rs new file mode 100644 index 0000000000..6f53cf429a --- /dev/null +++ b/common/gateway-stats-storage/src/models.rs @@ -0,0 +1,109 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use nym_credentials_interface::TicketType; +use sqlx::prelude::FromRow; +use time::{Duration, OffsetDateTime}; + +#[derive(FromRow)] +pub struct StoredFinishedSession { + duration_ms: i64, + typ: String, +} + +impl StoredFinishedSession { + pub fn serialize(&self) -> (u64, String) { + ( + self.duration_ms as u64, //we are sure that it fits in a u64, see `fn end_at` + self.typ.clone(), + ) + } +} + +pub struct FinishedSession { + pub duration: Duration, + pub typ: SessionType, +} + +#[derive(PartialEq)] +pub enum SessionType { + Vpn, + Mixnet, + Unknown, +} + +impl SessionType { + pub fn to_string(&self) -> &str { + match self { + Self::Vpn => "vpn", + Self::Mixnet => "mixnet", + Self::Unknown => "unknown", + } + } + + pub fn from_string(s: &str) -> Self { + match s { + "vpn" => Self::Vpn, + "mixnet" => Self::Mixnet, + _ => Self::Unknown, + } + } +} + +impl From for SessionType { + fn from(value: TicketType) -> Self { + match value { + TicketType::V1MixnetEntry => Self::Mixnet, + TicketType::V1MixnetExit => Self::Mixnet, + TicketType::V1WireguardEntry => Self::Vpn, + TicketType::V1WireguardExit => Self::Vpn, + } + } +} + +#[derive(FromRow)] +pub(crate) struct StoredActiveSession { + start_time: OffsetDateTime, + typ: String, +} + +pub struct ActiveSession { + pub start: OffsetDateTime, + pub typ: SessionType, +} + +impl ActiveSession { + pub fn new(start_time: OffsetDateTime) -> Self { + ActiveSession { + start: start_time, + typ: SessionType::Unknown, + } + } + + pub fn set_type(&mut self, ticket_type: TicketType) { + self.typ = ticket_type.into(); + } + + pub fn end_at(self, stop_time: OffsetDateTime) -> Option { + let session_duration = stop_time - self.start; + //ensure duration is positive to fit in a u64 + //u64::max milliseconds is 500k millenia so no overflow issue + if session_duration > Duration::ZERO { + Some(FinishedSession { + duration: session_duration, + typ: self.typ, + }) + } else { + None + } + } +} + +impl From for ActiveSession { + fn from(value: StoredActiveSession) -> Self { + ActiveSession { + start: value.start_time, + typ: SessionType::from_string(&value.typ), + } + } +} diff --git a/common/gateway-stats-storage/src/sessions.rs b/common/gateway-stats-storage/src/sessions.rs new file mode 100644 index 0000000000..15a3911626 --- /dev/null +++ b/common/gateway-stats-storage/src/sessions.rs @@ -0,0 +1,144 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use time::{Date, OffsetDateTime}; + +use crate::models::{StoredActiveSession, StoredFinishedSession}; + +pub(crate) type Result = std::result::Result; + +#[derive(Clone)] +pub(crate) struct SessionManager { + connection_pool: sqlx::SqlitePool, +} + +impl SessionManager { + /// Creates new instance of the `SessionsManager` with the provided sqlite connection pool. + /// + /// # Arguments + /// + /// * `connection_pool`: database connection pool to use. + pub(crate) fn new(connection_pool: sqlx::SqlitePool) -> Self { + SessionManager { connection_pool } + } + + pub(crate) async fn insert_finished_sessions( + &self, + date: Date, + duration_ms: i64, + typ: String, + ) -> Result<()> { + sqlx::query!( + "INSERT INTO sessions_finished (day, duration_ms, typ) VALUES (?, ?, ?)", + date, + duration_ms, + typ + ) + .execute(&self.connection_pool) + .await?; + Ok(()) + } + + pub(crate) async fn get_finished_sessions( + &self, + date: Date, + ) -> Result> { + sqlx::query_as("SELECT duration_ms, typ FROM sessions_finished WHERE day = ?") + .bind(date) + .fetch_all(&self.connection_pool) + .await + } + + pub(crate) async fn delete_finished_sessions(&self, before_date: Date) -> Result<()> { + sqlx::query!("DELETE FROM sessions_finished WHERE day <= ? ", before_date) + .execute(&self.connection_pool) + .await?; + Ok(()) + } + + pub(crate) async fn insert_unique_users( + &self, + date: Date, + client_address_b58: String, + ) -> Result<()> { + sqlx::query!( + "INSERT OR IGNORE INTO sessions_unique_users (day, client_address) VALUES (?, ?)", + date, + client_address_b58, + ) + .execute(&self.connection_pool) + .await?; + Ok(()) + } + + pub(crate) async fn get_unique_users(&self, date: Date) -> Result { + Ok(sqlx::query!( + "SELECT COUNT(*) as count FROM sessions_unique_users WHERE day = ?", + date + ) + .fetch_one(&self.connection_pool) + .await? + .count) + } + + pub(crate) async fn delete_unique_users(&self, before_date: Date) -> Result<()> { + sqlx::query!("DELETE FROM sessions_finished WHERE day <= ? ", before_date) + .execute(&self.connection_pool) + .await?; + Ok(()) + } + + pub(crate) async fn insert_active_sessions( + &self, + client_address_b58: String, + start_time: OffsetDateTime, + typ: String, + ) -> Result<()> { + sqlx::query!( + "INSERT INTO sessions_active (client_address, start_time, typ) VALUES (?, ?, ?)", + client_address_b58, + start_time, + typ + ) + .execute(&self.connection_pool) + .await?; + Ok(()) + } + + pub(crate) async fn update_active_session_type( + &self, + client_address_b58: String, + typ: String, + ) -> Result<()> { + sqlx::query!( + "UPDATE sessions_active SET typ = ? WHERE client_address = ?", + typ, + client_address_b58, + ) + .execute(&self.connection_pool) + .await?; + Ok(()) + } + + pub(crate) async fn get_active_session( + &self, + client_address_b58: String, + ) -> Result> { + let session = + sqlx::query_as("SELECT start_time, typ FROM sessions_active WHERE client_address = ?") + .bind(client_address_b58) + .fetch_optional(&self.connection_pool) + .await?; + Ok(session) + } + + pub(crate) async fn delete_active_sessions(&self, client_address_b58: String) -> Result<()> { + sqlx::query!( + "DELETE FROM sessions_active WHERE client_address = ?", + client_address_b58 + ) + .execute(&self.connection_pool) + .await?; + Ok(()) + } +} From 15b1a699a0dc1abac752d731663c6ebfff9f17ec Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:08 +0200 Subject: [PATCH 04/14] adapt stats collection to new storage --- common/gateway-stats-storage/src/lib.rs | 10 +- common/gateway-stats-storage/src/sessions.rs | 21 ++- gateway/src/node/mod.rs | 2 +- gateway/src/node/statistics/mod.rs | 18 +- gateway/src/node/statistics/sessions.rs | 187 +++++++------------ 5 files changed, 108 insertions(+), 130 deletions(-) diff --git a/common/gateway-stats-storage/src/lib.rs b/common/gateway-stats-storage/src/lib.rs index 43b54d911e..28f6a45d70 100644 --- a/common/gateway-stats-storage/src/lib.rs +++ b/common/gateway-stats-storage/src/lib.rs @@ -94,14 +94,14 @@ impl PersistentStatsStorage { .await?) } - pub async fn insert_unique_users( + pub async fn insert_unique_user( &self, date: Date, - client_address: DestinationAddressBytes, + client_address_bs58: String, ) -> Result<(), StatsStorageError> { Ok(self .session_manager - .insert_unique_users(date, client_address.as_base58_string()) + .insert_unique_user(date, client_address_bs58) .await?) } @@ -156,6 +156,10 @@ impl PersistentStatsStorage { .map(Into::into)) } + pub async fn get_active_users(&self) -> Result, StatsStorageError> { + Ok(self.session_manager.get_active_users().await?) + } + pub async fn delete_active_sessions( &self, client_address: DestinationAddressBytes, diff --git a/common/gateway-stats-storage/src/sessions.rs b/common/gateway-stats-storage/src/sessions.rs index 15a3911626..811ac328dd 100644 --- a/common/gateway-stats-storage/src/sessions.rs +++ b/common/gateway-stats-storage/src/sessions.rs @@ -56,7 +56,7 @@ impl SessionManager { Ok(()) } - pub(crate) async fn insert_unique_users( + pub(crate) async fn insert_unique_user( &self, date: Date, client_address_b58: String, @@ -124,12 +124,19 @@ impl SessionManager { &self, client_address_b58: String, ) -> Result> { - let session = - sqlx::query_as("SELECT start_time, typ FROM sessions_active WHERE client_address = ?") - .bind(client_address_b58) - .fetch_optional(&self.connection_pool) - .await?; - Ok(session) + sqlx::query_as("SELECT start_time, typ FROM sessions_active WHERE client_address = ?") + .bind(client_address_b58) + .fetch_optional(&self.connection_pool) + .await + } + + pub(crate) async fn get_active_users(&self) -> Result> { + Ok(sqlx::query!("SELECT client_address from sessions_active") + .fetch_all(&self.connection_pool) + .await? + .into_iter() + .map(|record| record.client_address) + .collect()) } pub(crate) async fn delete_active_sessions(&self, client_address_b58: String) -> Result<()> { diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index 1948661ed9..f708c95ee4 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -426,7 +426,7 @@ impl Gateway { info!("Starting gateway stats collector..."); let (mut stats_collector, stats_event_sender) = - GatewayStatisticsCollector::new(shared_session_stats); + GatewayStatisticsCollector::new(shared_session_stats, self.stats_storage.clone()); tokio::spawn(async move { stats_collector.run(shutdown).await }); stats_event_sender } diff --git a/gateway/src/node/statistics/mod.rs b/gateway/src/node/statistics/mod.rs index 53ea277db5..cd31e76639 100644 --- a/gateway/src/node/statistics/mod.rs +++ b/gateway/src/node/statistics/mod.rs @@ -2,13 +2,14 @@ // SPDX-License-Identifier: GPL-3.0-only use futures::{channel::mpsc, StreamExt}; +use nym_gateway_stats_storage::PersistentStatsStorage; use nym_node_http_api::state::metrics::SharedSessionStats; use nym_statistics_common::events::{StatsEvent, StatsEventReceiver, StatsEventSender}; use nym_task::TaskClient; use sessions::SessionStatsHandler; use std::time::Duration; use time::OffsetDateTime; -use tracing::trace; +use tracing::{error, trace, warn}; pub mod sessions; @@ -23,22 +24,28 @@ pub(crate) struct GatewayStatisticsCollector { impl GatewayStatisticsCollector { pub fn new( shared_session_stats: SharedSessionStats, + stats_storage: PersistentStatsStorage, ) -> (GatewayStatisticsCollector, StatsEventSender) { let (stats_event_tx, stats_event_rx) = mpsc::unbounded(); + + let session_stats = SessionStatsHandler::new(shared_session_stats, stats_storage); let collector = GatewayStatisticsCollector { stats_event_rx, - session_stats: SessionStatsHandler::new(shared_session_stats), + session_stats, }; (collector, stats_event_tx) } async fn update_shared_state(&mut self, update_time: OffsetDateTime) { - self.session_stats.update_shared_state(update_time).await; + if let Err(e) = self.session_stats.update_shared_state(update_time).await { + error!("Failed to update session stats - {e}") + } //here goes additionnal stats handler update } pub async fn run(&mut self, mut shutdown: TaskClient) { let mut update_interval = tokio::time::interval(STATISTICS_UPDATE_TIMER_INTERVAL); + //SW TODO : cleanup handlers on start, in case of ungraceful shutdown while !shutdown.is_shutdown() { tokio::select! { biased; @@ -53,7 +60,10 @@ impl GatewayStatisticsCollector { Some(stat_event) = self.stats_event_rx.next() => { //dispatching event to proper handler match stat_event { - StatsEvent::SessionStatsEvent(event) => self.session_stats.handle_event(event), + StatsEvent::SessionStatsEvent(event) => { + if let Err(e) = self.session_stats.handle_event(event).await{ + warn!("Session event handling error - {e}"); + }}, } }, diff --git a/gateway/src/node/statistics/sessions.rs b/gateway/src/node/statistics/sessions.rs index 2853f75ad5..5d6dbb28ae 100644 --- a/gateway/src/node/statistics/sessions.rs +++ b/gateway/src/node/statistics/sessions.rs @@ -2,176 +2,133 @@ // SPDX-License-Identifier: GPL-3.0-only use nym_credentials_interface::TicketType; +use nym_gateway_stats_storage::PersistentStatsStorage; +use nym_gateway_stats_storage::{error::StatsStorageError, models::ActiveSession}; use nym_node_http_api::state::metrics::SharedSessionStats; use nym_sphinx::DestinationAddressBytes; -use std::collections::{HashMap, HashSet}; use time::{Date, Duration, OffsetDateTime}; use nym_statistics_common::events::SessionEvent; -const FINISHED_SESSIONS_CAP: usize = 1_000_000; //to be on the safe side of memory blowups until persistent storage - -#[derive(PartialEq)] -enum SessionType { - Vpn, - Mixnet, - Unknown, -} - -impl SessionType { - fn to_string(&self) -> &str { - match self { - Self::Vpn => "vpn", - Self::Mixnet => "mixnet", - Self::Unknown => "unknown", - } - } -} - -impl From for SessionType { - fn from(value: TicketType) -> Self { - match value { - TicketType::V1MixnetEntry => Self::Mixnet, - TicketType::V1MixnetExit => Self::Mixnet, - TicketType::V1WireguardEntry => Self::Vpn, - TicketType::V1WireguardExit => Self::Vpn, - } - } -} - -struct FinishedSession { - duration: Duration, - typ: SessionType, -} - -impl FinishedSession { - fn serialize(&self) -> (u64, String) { - ( - self.duration.whole_milliseconds() as u64, //we are sure that it fits in a u64, see `fn end_at` - self.typ.to_string().into(), - ) - } -} - -struct ActiveSession { - start: OffsetDateTime, - typ: SessionType, -} - -impl ActiveSession { - fn new(start_time: OffsetDateTime) -> Self { - ActiveSession { - start: start_time, - typ: SessionType::Unknown, - } - } - - fn set_type(&mut self, ticket_type: TicketType) { - self.typ = ticket_type.into(); - } - - fn end_at(self, stop_time: OffsetDateTime) -> Option { - let session_duration = stop_time - self.start; - //ensure duration is positive to fit in a u64 - //u64::max milliseconds is 500k millenia so no overflow issue - if session_duration > Duration::ZERO { - Some(FinishedSession { - duration: session_duration, - typ: self.typ, - }) - } else { - None - } - } -} - pub(crate) struct SessionStatsHandler { + storage: PersistentStatsStorage, last_update_day: Date, shared_session_stats: SharedSessionStats, - active_sessions: HashMap, - unique_users: HashSet, sessions_started: u32, - finished_sessions: Vec, } impl SessionStatsHandler { - pub fn new(shared_session_stats: SharedSessionStats) -> Self { + pub fn new(shared_session_stats: SharedSessionStats, storage: PersistentStatsStorage) -> Self { SessionStatsHandler { + storage, last_update_day: OffsetDateTime::now_utc().date(), shared_session_stats, - active_sessions: Default::default(), - unique_users: Default::default(), sessions_started: 0, - finished_sessions: Default::default(), } } - pub(crate) fn handle_event(&mut self, event: SessionEvent) { + pub(crate) async fn handle_event( + &mut self, + event: SessionEvent, + ) -> Result<(), StatsStorageError> { match event { SessionEvent::SessionStart { start_time, client } => { - self.handle_session_start(start_time, client); + self.handle_session_start(start_time, client).await } + SessionEvent::SessionStop { stop_time, client } => { - self.handle_session_stop(stop_time, client); + self.handle_session_stop(stop_time, client).await } + SessionEvent::EcashTicket { ticket_type, client, - } => self.handle_ecash_ticket(ticket_type, client), + } => self.handle_ecash_ticket(ticket_type, client).await, } } - fn handle_session_start( + async fn handle_session_start( &mut self, start_time: OffsetDateTime, client: DestinationAddressBytes, - ) { + ) -> Result<(), StatsStorageError> { self.sessions_started += 1; - self.unique_users.insert(client); - self.active_sessions - .insert(client, ActiveSession::new(start_time)); + self.storage + .insert_unique_user(self.last_update_day, client.as_base58_string()) + .await?; + self.storage + .insert_active_sessions(client, ActiveSession::new(start_time)) + .await?; + Ok(()) } - fn handle_session_stop(&mut self, stop_time: OffsetDateTime, client: DestinationAddressBytes) { - if let Some(session) = self.active_sessions.remove(&client) { + + async fn handle_session_stop( + &mut self, + stop_time: OffsetDateTime, + client: DestinationAddressBytes, + ) -> Result<(), StatsStorageError> { + if let Some(session) = self.storage.get_active_session(client).await? { if let Some(finished_session) = session.end_at(stop_time) { - if self.finished_sessions.len() < FINISHED_SESSIONS_CAP { - self.finished_sessions.push(finished_session); - } + self.storage + .insert_finished_sessions(self.last_update_day, finished_session) + .await?; } } + Ok(()) } - fn handle_ecash_ticket(&mut self, ticket_type: TicketType, client: DestinationAddressBytes) { - if let Some(active_session) = self.active_sessions.get_mut(&client) { - if active_session.typ == SessionType::Unknown { - active_session.set_type(ticket_type); - } - } + async fn handle_ecash_ticket( + &mut self, + ticket_type: TicketType, + client: DestinationAddressBytes, + ) -> Result<(), StatsStorageError> { + self.storage + .update_active_session_type(client, ticket_type.into()) + .await?; + Ok(()) } //update shared state once a day has passed, with data from the previous day - pub(crate) async fn update_shared_state(&mut self, update_time: OffsetDateTime) { + pub(crate) async fn update_shared_state( + &mut self, + update_time: OffsetDateTime, + ) -> Result<(), StatsStorageError> { let update_date = update_time.date(); if update_date != self.last_update_day { + let finished_sessions = self + .storage + .get_finished_sessions(self.last_update_day) + .await?; + let user_count = self.storage.get_unique_users(self.last_update_day).await?; { let mut shared_state = self.shared_session_stats.write().await; shared_state.update_time = self.last_update_day; - shared_state.unique_active_users = self.unique_users.len() as u32; + shared_state.unique_active_users = user_count as u32; shared_state.session_started = self.sessions_started; - shared_state.sessions = self - .finished_sessions - .iter() - .map(|s| s.serialize()) - .collect(); + shared_state.sessions = finished_sessions.iter().map(|s| s.serialize()).collect(); } - self.reset_stats(update_date); + self.reset_stats(update_date).await?; } + Ok(()) } - fn reset_stats(&mut self, reset_day: Date) { + async fn reset_stats(&mut self, reset_day: Date) -> Result<(), StatsStorageError> { self.last_update_day = reset_day; - self.unique_users = self.active_sessions.keys().copied().collect(); - self.finished_sessions = Default::default(); + + //active users reset + let new_active_users = self.storage.get_active_users().await?; + self.storage + .delete_unique_users(reset_day - Duration::DAY) + .await?; + for user in new_active_users { + self.storage.insert_unique_user(reset_day, user).await?; + } + + //finished session reset + self.storage + .delete_finished_sessions(reset_day - Duration::DAY) + .await?; self.sessions_started = 0; + Ok(()) } } From c4e44ecec00a9131835c8a94036e1707014924fe Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:08 +0200 Subject: [PATCH 05/14] stats cleanup on start --- common/gateway-stats-storage/src/lib.rs | 16 ++++--- common/gateway-stats-storage/src/sessions.rs | 13 ++++-- gateway/src/node/statistics/mod.rs | 17 ++++++-- gateway/src/node/statistics/sessions.rs | 45 +++++++++++++------- 4 files changed, 63 insertions(+), 28 deletions(-) diff --git a/common/gateway-stats-storage/src/lib.rs b/common/gateway-stats-storage/src/lib.rs index 28f6a45d70..6018bde44d 100644 --- a/common/gateway-stats-storage/src/lib.rs +++ b/common/gateway-stats-storage/src/lib.rs @@ -62,14 +62,14 @@ impl PersistentStatsStorage { } //Sessions fn - pub async fn insert_finished_sessions( + pub async fn insert_finished_session( &self, date: Date, session: FinishedSession, ) -> Result<(), StatsStorageError> { Ok(self .session_manager - .insert_finished_sessions( + .insert_finished_session( date, session.duration.whole_milliseconds() as i64, session.typ.to_string().into(), @@ -116,14 +116,14 @@ impl PersistentStatsStorage { .await?) } - pub async fn insert_active_sessions( + pub async fn insert_active_session( &self, client_address: DestinationAddressBytes, session: ActiveSession, ) -> Result<(), StatsStorageError> { Ok(self .session_manager - .insert_active_sessions( + .insert_active_session( client_address.as_base58_string(), session.start, session.typ.to_string().into(), @@ -160,13 +160,17 @@ impl PersistentStatsStorage { Ok(self.session_manager.get_active_users().await?) } - pub async fn delete_active_sessions( + pub async fn delete_active_session( &self, client_address: DestinationAddressBytes, ) -> Result<(), StatsStorageError> { Ok(self .session_manager - .delete_active_sessions(client_address.as_base58_string()) + .delete_active_session(client_address.as_base58_string()) .await?) } + + pub async fn cleanup_active_sessions(&self) -> Result<(), StatsStorageError> { + Ok(self.session_manager.cleanup_active_sessions().await?) + } } diff --git a/common/gateway-stats-storage/src/sessions.rs b/common/gateway-stats-storage/src/sessions.rs index 811ac328dd..cdb71f57fb 100644 --- a/common/gateway-stats-storage/src/sessions.rs +++ b/common/gateway-stats-storage/src/sessions.rs @@ -22,7 +22,7 @@ impl SessionManager { SessionManager { connection_pool } } - pub(crate) async fn insert_finished_sessions( + pub(crate) async fn insert_finished_session( &self, date: Date, duration_ms: i64, @@ -88,7 +88,7 @@ impl SessionManager { Ok(()) } - pub(crate) async fn insert_active_sessions( + pub(crate) async fn insert_active_session( &self, client_address_b58: String, start_time: OffsetDateTime, @@ -139,7 +139,7 @@ impl SessionManager { .collect()) } - pub(crate) async fn delete_active_sessions(&self, client_address_b58: String) -> Result<()> { + pub(crate) async fn delete_active_session(&self, client_address_b58: String) -> Result<()> { sqlx::query!( "DELETE FROM sessions_active WHERE client_address = ?", client_address_b58 @@ -148,4 +148,11 @@ impl SessionManager { .await?; Ok(()) } + + pub(crate) async fn cleanup_active_sessions(&self) -> Result<()> { + sqlx::query!("DELETE FROM sessions_active") + .execute(&self.connection_pool) + .await?; + Ok(()) + } } diff --git a/gateway/src/node/statistics/mod.rs b/gateway/src/node/statistics/mod.rs index cd31e76639..6cc943cba7 100644 --- a/gateway/src/node/statistics/mod.rs +++ b/gateway/src/node/statistics/mod.rs @@ -37,15 +37,26 @@ impl GatewayStatisticsCollector { } async fn update_shared_state(&mut self, update_time: OffsetDateTime) { - if let Err(e) = self.session_stats.update_shared_state(update_time).await { - error!("Failed to update session stats - {e}") + if let Err(e) = self + .session_stats + .maybe_update_shared_state(update_time) + .await + { + error!("Failed to update session stats - {e}"); } //here goes additionnal stats handler update } + async fn on_start(&mut self) { + if let Err(e) = self.session_stats.on_start().await { + error!("Failed to cleanup session stats handler - {e}"); + } + //here goes additionnal stats handler start cleanup + } + pub async fn run(&mut self, mut shutdown: TaskClient) { + self.on_start().await; let mut update_interval = tokio::time::interval(STATISTICS_UPDATE_TIMER_INTERVAL); - //SW TODO : cleanup handlers on start, in case of ungraceful shutdown while !shutdown.is_shutdown() { tokio::select! { biased; diff --git a/gateway/src/node/statistics/sessions.rs b/gateway/src/node/statistics/sessions.rs index 5d6dbb28ae..b5179daf27 100644 --- a/gateway/src/node/statistics/sessions.rs +++ b/gateway/src/node/statistics/sessions.rs @@ -57,7 +57,7 @@ impl SessionStatsHandler { .insert_unique_user(self.last_update_day, client.as_base58_string()) .await?; self.storage - .insert_active_sessions(client, ActiveSession::new(start_time)) + .insert_active_session(client, ActiveSession::new(start_time)) .await?; Ok(()) } @@ -70,7 +70,7 @@ impl SessionStatsHandler { if let Some(session) = self.storage.get_active_session(client).await? { if let Some(finished_session) = session.end_at(stop_time) { self.storage - .insert_finished_sessions(self.last_update_day, finished_session) + .insert_finished_session(self.last_update_day, finished_session) .await?; } } @@ -88,26 +88,39 @@ impl SessionStatsHandler { Ok(()) } + pub(crate) async fn on_start(&mut self) -> Result<(), StatsStorageError> { + let yesterday = OffsetDateTime::now_utc().date() - Duration::DAY; + //publish yesterday's data if any + self.publish_stats(yesterday).await?; + //cleanup active sessions + self.storage.cleanup_active_sessions().await?; + //reset stats + self.reset_stats(yesterday).await?; + Ok(()) + } + //update shared state once a day has passed, with data from the previous day - pub(crate) async fn update_shared_state( + async fn publish_stats(&mut self, stats_date: Date) -> Result<(), StatsStorageError> { + let finished_sessions = self.storage.get_finished_sessions(stats_date).await?; + let user_count = self.storage.get_unique_users(stats_date).await?; + { + let mut shared_state = self.shared_session_stats.write().await; + shared_state.update_time = stats_date; + shared_state.unique_active_users = user_count as u32; + shared_state.session_started = self.sessions_started; + shared_state.sessions = finished_sessions.iter().map(|s| s.serialize()).collect(); + } + + Ok(()) + } + pub(crate) async fn maybe_update_shared_state( &mut self, update_time: OffsetDateTime, ) -> Result<(), StatsStorageError> { let update_date = update_time.date(); if update_date != self.last_update_day { - let finished_sessions = self - .storage - .get_finished_sessions(self.last_update_day) - .await?; - let user_count = self.storage.get_unique_users(self.last_update_day).await?; - { - let mut shared_state = self.shared_session_stats.write().await; - shared_state.update_time = self.last_update_day; - shared_state.unique_active_users = user_count as u32; - shared_state.session_started = self.sessions_started; - shared_state.sessions = finished_sessions.iter().map(|s| s.serialize()).collect(); - } - self.reset_stats(update_date).await?; + self.publish_stats(self.last_update_day).await?; + self.reset_stats(self.last_update_day).await?; } Ok(()) } From fe29954837b1d1f904ae75ea53a659f1978cd48d Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:08 +0200 Subject: [PATCH 06/14] change to linux only code --- gateway/src/node/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index f708c95ee4..738d6f3083 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -286,7 +286,7 @@ impl Gateway { forwarding_channel, router_tx, ); - let all_peers = self.storage.get_all_wireguard_peers().await?; + let all_peers = self.client_storage.get_all_wireguard_peers().await?; let used_private_network_ips = all_peers .iter() .cloned() @@ -341,7 +341,7 @@ impl Gateway { .start_with_shutdown(router_shutdown); let wg_api = nym_wireguard::start_wireguard( - self.storage.clone(), + self.client_storage.clone(), all_peers, shutdown, wireguard_data, From 2d759bcae0cfbfbab7ca17370cdce22290f323bf Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:09 +0200 Subject: [PATCH 07/14] tweaks --- gateway/src/node/statistics/sessions.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/gateway/src/node/statistics/sessions.rs b/gateway/src/node/statistics/sessions.rs index b5179daf27..135d6e4463 100644 --- a/gateway/src/node/statistics/sessions.rs +++ b/gateway/src/node/statistics/sessions.rs @@ -12,7 +12,7 @@ use nym_statistics_common::events::SessionEvent; pub(crate) struct SessionStatsHandler { storage: PersistentStatsStorage, - last_update_day: Date, + current_day: Date, shared_session_stats: SharedSessionStats, sessions_started: u32, @@ -22,7 +22,7 @@ impl SessionStatsHandler { pub fn new(shared_session_stats: SharedSessionStats, storage: PersistentStatsStorage) -> Self { SessionStatsHandler { storage, - last_update_day: OffsetDateTime::now_utc().date(), + current_day: OffsetDateTime::now_utc().date(), shared_session_stats, sessions_started: 0, } @@ -54,7 +54,7 @@ impl SessionStatsHandler { ) -> Result<(), StatsStorageError> { self.sessions_started += 1; self.storage - .insert_unique_user(self.last_update_day, client.as_base58_string()) + .insert_unique_user(self.current_day, client.as_base58_string()) .await?; self.storage .insert_active_session(client, ActiveSession::new(start_time)) @@ -70,8 +70,9 @@ impl SessionStatsHandler { if let Some(session) = self.storage.get_active_session(client).await? { if let Some(finished_session) = session.end_at(stop_time) { self.storage - .insert_finished_session(self.last_update_day, finished_session) + .insert_finished_session(self.current_day, finished_session) .await?; + self.storage.delete_active_session(client).await?; } } Ok(()) @@ -118,16 +119,15 @@ impl SessionStatsHandler { update_time: OffsetDateTime, ) -> Result<(), StatsStorageError> { let update_date = update_time.date(); - if update_date != self.last_update_day { - self.publish_stats(self.last_update_day).await?; - self.reset_stats(self.last_update_day).await?; + if update_date != self.current_day { + self.publish_stats(self.current_day).await?; + self.reset_stats(self.current_day).await?; + self.current_day = update_date; } Ok(()) } async fn reset_stats(&mut self, reset_day: Date) -> Result<(), StatsStorageError> { - self.last_update_day = reset_day; - //active users reset let new_active_users = self.storage.get_active_users().await?; self.storage From 8e42877aca38b3675959cfc7b7e5721746c91e94 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:09 +0200 Subject: [PATCH 08/14] modified stats cleanup + change session started --- common/gateway-stats-storage/src/lib.rs | 10 +++++++ common/gateway-stats-storage/src/sessions.rs | 10 +++++++ gateway/src/node/statistics/sessions.rs | 31 ++++++++++---------- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/common/gateway-stats-storage/src/lib.rs b/common/gateway-stats-storage/src/lib.rs index 6018bde44d..478cc87797 100644 --- a/common/gateway-stats-storage/src/lib.rs +++ b/common/gateway-stats-storage/src/lib.rs @@ -156,6 +156,16 @@ impl PersistentStatsStorage { .map(Into::into)) } + pub async fn get_started_sessions_count( + &self, + start_date: Date, + ) -> Result { + Ok(self + .session_manager + .get_started_sessions_count(start_date) + .await?) + } + pub async fn get_active_users(&self) -> Result, StatsStorageError> { Ok(self.session_manager.get_active_users().await?) } diff --git a/common/gateway-stats-storage/src/sessions.rs b/common/gateway-stats-storage/src/sessions.rs index cdb71f57fb..1bd157d33a 100644 --- a/common/gateway-stats-storage/src/sessions.rs +++ b/common/gateway-stats-storage/src/sessions.rs @@ -130,6 +130,16 @@ impl SessionManager { .await } + pub(crate) async fn get_started_sessions_count(&self, start_date: Date) -> Result { + Ok(sqlx::query!( + "SELECT COUNT(*) as count FROM sessions_active WHERE date(start_time) = ?", + start_date + ) + .fetch_one(&self.connection_pool) + .await? + .count) + } + pub(crate) async fn get_active_users(&self) -> Result> { Ok(sqlx::query!("SELECT client_address from sessions_active") .fetch_all(&self.connection_pool) diff --git a/gateway/src/node/statistics/sessions.rs b/gateway/src/node/statistics/sessions.rs index 135d6e4463..5deb69a18b 100644 --- a/gateway/src/node/statistics/sessions.rs +++ b/gateway/src/node/statistics/sessions.rs @@ -15,7 +15,6 @@ pub(crate) struct SessionStatsHandler { current_day: Date, shared_session_stats: SharedSessionStats, - sessions_started: u32, } impl SessionStatsHandler { @@ -24,7 +23,6 @@ impl SessionStatsHandler { storage, current_day: OffsetDateTime::now_utc().date(), shared_session_stats, - sessions_started: 0, } } @@ -52,7 +50,6 @@ impl SessionStatsHandler { start_time: OffsetDateTime, client: DestinationAddressBytes, ) -> Result<(), StatsStorageError> { - self.sessions_started += 1; self.storage .insert_unique_user(self.current_day, client.as_base58_string()) .await?; @@ -94,9 +91,10 @@ impl SessionStatsHandler { //publish yesterday's data if any self.publish_stats(yesterday).await?; //cleanup active sessions - self.storage.cleanup_active_sessions().await?; - //reset stats - self.reset_stats(yesterday).await?; + self.storage.cleanup_active_sessions().await?; //store them with duration 0 + + //delete old entries + self.delete_old_stats(yesterday - Duration::DAY).await?; Ok(()) } @@ -104,11 +102,12 @@ impl SessionStatsHandler { async fn publish_stats(&mut self, stats_date: Date) -> Result<(), StatsStorageError> { let finished_sessions = self.storage.get_finished_sessions(stats_date).await?; let user_count = self.storage.get_unique_users(stats_date).await?; + let session_started = self.storage.get_started_sessions_count(stats_date).await? as u32; { let mut shared_state = self.shared_session_stats.write().await; shared_state.update_time = stats_date; shared_state.unique_active_users = user_count as u32; - shared_state.session_started = self.sessions_started; + shared_state.session_started = session_started; shared_state.sessions = finished_sessions.iter().map(|s| s.serialize()).collect(); } @@ -121,7 +120,9 @@ impl SessionStatsHandler { let update_date = update_time.date(); if update_date != self.current_day { self.publish_stats(self.current_day).await?; - self.reset_stats(self.current_day).await?; + self.delete_old_stats(self.current_day - Duration::DAY) + .await?; + self.reset_stats(update_date).await?; self.current_day = update_date; } Ok(()) @@ -130,18 +131,16 @@ impl SessionStatsHandler { async fn reset_stats(&mut self, reset_day: Date) -> Result<(), StatsStorageError> { //active users reset let new_active_users = self.storage.get_active_users().await?; - self.storage - .delete_unique_users(reset_day - Duration::DAY) - .await?; for user in new_active_users { self.storage.insert_unique_user(reset_day, user).await?; } - //finished session reset - self.storage - .delete_finished_sessions(reset_day - Duration::DAY) - .await?; - self.sessions_started = 0; + Ok(()) + } + + async fn delete_old_stats(&mut self, delete_before: Date) -> Result<(), StatsStorageError> { + self.storage.delete_finished_sessions(delete_before).await?; + self.storage.delete_unique_users(delete_before).await?; Ok(()) } } From ba3002631dafb14db714bc573e1feebf4aba526f Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:09 +0200 Subject: [PATCH 09/14] change wrong table name --- common/gateway-stats-storage/src/sessions.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/common/gateway-stats-storage/src/sessions.rs b/common/gateway-stats-storage/src/sessions.rs index 1bd157d33a..9928e55581 100644 --- a/common/gateway-stats-storage/src/sessions.rs +++ b/common/gateway-stats-storage/src/sessions.rs @@ -82,9 +82,12 @@ impl SessionManager { } pub(crate) async fn delete_unique_users(&self, before_date: Date) -> Result<()> { - sqlx::query!("DELETE FROM sessions_finished WHERE day <= ? ", before_date) - .execute(&self.connection_pool) - .await?; + sqlx::query!( + "DELETE FROM sessions_unique_users WHERE day <= ? ", + before_date + ) + .execute(&self.connection_pool) + .await?; Ok(()) } From 1a4a22a84e2f3cf31601968f77f93d7bee33b350 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 15:55:09 +0200 Subject: [PATCH 10/14] store crashed session as 0 duration --- common/gateway-stats-storage/src/lib.rs | 14 ++++++++++++-- common/gateway-stats-storage/src/sessions.rs | 8 +++++++- gateway/src/node/statistics/sessions.rs | 17 +++++++++++++++-- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/common/gateway-stats-storage/src/lib.rs b/common/gateway-stats-storage/src/lib.rs index 478cc87797..e0e8916f9c 100644 --- a/common/gateway-stats-storage/src/lib.rs +++ b/common/gateway-stats-storage/src/lib.rs @@ -105,8 +105,8 @@ impl PersistentStatsStorage { .await?) } - pub async fn get_unique_users(&self, date: Date) -> Result { - Ok(self.session_manager.get_unique_users(date).await?) + pub async fn get_unique_users_count(&self, date: Date) -> Result { + Ok(self.session_manager.get_unique_users_count(date).await?) } pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> { @@ -156,6 +156,16 @@ impl PersistentStatsStorage { .map(Into::into)) } + pub async fn get_all_active_sessions(&self) -> Result, StatsStorageError> { + Ok(self + .session_manager + .get_all_active_sessions() + .await? + .into_iter() + .map(Into::into) + .collect()) + } + pub async fn get_started_sessions_count( &self, start_date: Date, diff --git a/common/gateway-stats-storage/src/sessions.rs b/common/gateway-stats-storage/src/sessions.rs index 9928e55581..660bc684e3 100644 --- a/common/gateway-stats-storage/src/sessions.rs +++ b/common/gateway-stats-storage/src/sessions.rs @@ -71,7 +71,7 @@ impl SessionManager { Ok(()) } - pub(crate) async fn get_unique_users(&self, date: Date) -> Result { + pub(crate) async fn get_unique_users_count(&self, date: Date) -> Result { Ok(sqlx::query!( "SELECT COUNT(*) as count FROM sessions_unique_users WHERE day = ?", date @@ -133,6 +133,12 @@ impl SessionManager { .await } + pub(crate) async fn get_all_active_sessions(&self) -> Result> { + sqlx::query_as("SELECT start_time, typ FROM sessions_active") + .fetch_all(&self.connection_pool) + .await + } + pub(crate) async fn get_started_sessions_count(&self, start_date: Date) -> Result { Ok(sqlx::query!( "SELECT COUNT(*) as count FROM sessions_active WHERE date(start_time) = ?", diff --git a/gateway/src/node/statistics/sessions.rs b/gateway/src/node/statistics/sessions.rs index 5deb69a18b..202047e7fc 100644 --- a/gateway/src/node/statistics/sessions.rs +++ b/gateway/src/node/statistics/sessions.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-3.0-only use nym_credentials_interface::TicketType; +use nym_gateway_stats_storage::models::FinishedSession; use nym_gateway_stats_storage::PersistentStatsStorage; use nym_gateway_stats_storage::{error::StatsStorageError, models::ActiveSession}; use nym_node_http_api::state::metrics::SharedSessionStats; @@ -90,8 +91,20 @@ impl SessionStatsHandler { let yesterday = OffsetDateTime::now_utc().date() - Duration::DAY; //publish yesterday's data if any self.publish_stats(yesterday).await?; + //store "active" sessions as duration 0 + for active_session in self.storage.get_all_active_sessions().await? { + self.storage + .insert_finished_session( + self.current_day, + FinishedSession { + duration: Duration::ZERO, + typ: active_session.typ, + }, + ) + .await? + } //cleanup active sessions - self.storage.cleanup_active_sessions().await?; //store them with duration 0 + self.storage.cleanup_active_sessions().await?; //delete old entries self.delete_old_stats(yesterday - Duration::DAY).await?; @@ -101,7 +114,7 @@ impl SessionStatsHandler { //update shared state once a day has passed, with data from the previous day async fn publish_stats(&mut self, stats_date: Date) -> Result<(), StatsStorageError> { let finished_sessions = self.storage.get_finished_sessions(stats_date).await?; - let user_count = self.storage.get_unique_users(stats_date).await?; + let user_count = self.storage.get_unique_users_count(stats_date).await?; let session_started = self.storage.get_started_sessions_count(stats_date).await? as u32; { let mut shared_state = self.shared_session_stats.write().await; From cf7a2a9b6226156b6f64d02cb188a0700f6389dd Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 23 Oct 2024 16:00:44 +0200 Subject: [PATCH 11/14] adapt for sqlx 0.7 --- common/gateway-stats-storage/src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/common/gateway-stats-storage/src/lib.rs b/common/gateway-stats-storage/src/lib.rs index e0e8916f9c..51fd84f2d9 100644 --- a/common/gateway-stats-storage/src/lib.rs +++ b/common/gateway-stats-storage/src/lib.rs @@ -34,14 +34,13 @@ impl PersistentStatsStorage { // TODO: we can inject here more stuff based on our gateway global config // struct. Maybe different pool size or timeout intervals? - let mut opts = sqlx::sqlite::SqliteConnectOptions::new() + let opts = sqlx::sqlite::SqliteConnectOptions::new() .filename(database_path) - .create_if_missing(true); + .create_if_missing(true) + .disable_statement_logging(); // TODO: do we want auto_vacuum ? - opts.disable_statement_logging(); - let connection_pool = match sqlx::SqlitePool::connect_with(opts).await { Ok(db) => db, Err(err) => { From 7a32981cab398a4ccf2ac190733c688b719a6548 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Thu, 24 Oct 2024 10:05:41 +0200 Subject: [PATCH 12/14] remove unused dependencies --- Cargo.lock | 1 - common/gateway-stats-storage/Cargo.toml | 2 -- 2 files changed, 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 185aa20abe..7843a41661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5358,7 +5358,6 @@ dependencies = [ name = "nym-gateway-stats-storage" version = "0.1.0" dependencies = [ - "log", "nym-credentials-interface", "nym-sphinx", "sqlx", diff --git a/common/gateway-stats-storage/Cargo.toml b/common/gateway-stats-storage/Cargo.toml index c122339ce3..d439b34a16 100644 --- a/common/gateway-stats-storage/Cargo.toml +++ b/common/gateway-stats-storage/Cargo.toml @@ -9,8 +9,6 @@ edition.workspace = true license.workspace = true [dependencies] -#bincode = { workspace = true } -log = { workspace = true } sqlx = { workspace = true, features = [ "runtime-tokio-rustls", "sqlite", From c3c865691765f493acc06c91de783d314f081aa3 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Thu, 24 Oct 2024 10:15:47 +0200 Subject: [PATCH 13/14] revert changes from gateway config, as it is broken anyway --- gateway/src/config/template.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/gateway/src/config/template.rs b/gateway/src/config/template.rs index 3c9f526270..2f1a34d822 100644 --- a/gateway/src/config/template.rs +++ b/gateway/src/config/template.rs @@ -98,9 +98,6 @@ keys.public_sphinx_key_file = '{{ storage_paths.keys.public_sphinx_key_file }}' # derived shared keys and available client bandwidths. clients_storage = '{{ storage_paths.clients_storage }}' -#Path to sqlite database containing all persistent stats data. -stats_storage = '{{ storage_paths.stats_storage }}' - # Path to the configuration of the embedded network requester. network_requester_config = '{{ storage_paths.network_requester_config }}' From 6486609040c2845c7785a029fad7c83f07ebf03a Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Mon, 28 Oct 2024 08:38:35 +0100 Subject: [PATCH 14/14] copyright and misc stuff --- common/gateway-stats-storage/build.rs | 2 +- .../migrations/20241017120000_create_initial_tables.sql | 4 ++-- gateway/src/config/persistence/paths.rs | 1 - nym-node/src/config/template.rs | 4 ++-- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/common/gateway-stats-storage/build.rs b/common/gateway-stats-storage/build.rs index b254ca3e37..166349c67a 100644 --- a/common/gateway-stats-storage/build.rs +++ b/common/gateway-stats-storage/build.rs @@ -1,4 +1,4 @@ -// Copyright 2023 - Nym Technologies SA +// Copyright 2024 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only use sqlx::{Connection, SqliteConnection}; diff --git a/common/gateway-stats-storage/migrations/20241017120000_create_initial_tables.sql b/common/gateway-stats-storage/migrations/20241017120000_create_initial_tables.sql index 6b435f3591..aff233dff9 100644 --- a/common/gateway-stats-storage/migrations/20241017120000_create_initial_tables.sql +++ b/common/gateway-stats-storage/migrations/20241017120000_create_initial_tables.sql @@ -1,6 +1,6 @@ /* - * Copyright 2021 - Nym Technologies SA - * SPDX-License-Identifier: Apache-2.0 + * Copyright 2024 - Nym Technologies SA + * SPDX-License-Identifier: GPL-3.0-only */ CREATE TABLE sessions_active diff --git a/gateway/src/config/persistence/paths.rs b/gateway/src/config/persistence/paths.rs index 45a989615a..4062d41bd1 100644 --- a/gateway/src/config/persistence/paths.rs +++ b/gateway/src/config/persistence/paths.rs @@ -41,7 +41,6 @@ pub struct GatewayPaths { pub clients_storage: PathBuf, /// Path to sqlite database containing all persistent stats data. - #[serde(alias = "persistent_stats_storage")] pub stats_storage: PathBuf, /// Path to the configuration of the embedded network requester. diff --git a/nym-node/src/config/template.rs b/nym-node/src/config/template.rs index cd41116e44..a43b942caf 100644 --- a/nym-node/src/config/template.rs +++ b/nym-node/src/config/template.rs @@ -185,7 +185,7 @@ announce_wss_port = {{#if entry_gateway.announce_wss_port }} {{ entry_gateway.an # derived shared keys, available client bandwidths and wireguard peers. clients_storage = '{{ entry_gateway.storage_paths.clients_storage }}' -#Path to sqlite database containing all persistent stats data. +# Path to sqlite database containing all persistent stats data. stats_storage = '{{ entry_gateway.storage_paths.stats_storage }}' # Path to file containing cosmos account mnemonic used for zk-nym redemption. @@ -240,7 +240,7 @@ upstream_exit_policy_url = '{{ exit_gateway.upstream_exit_policy_url }}' # derived shared keys, available client bandwidths and wireguard peers. clients_storage = '{{ exit_gateway.storage_paths.clients_storage }}' -#Path to sqlite database containing all persistent stats data. +# Path to sqlite database containing all persistent stats data. stats_storage = '{{ exit_gateway.storage_paths.stats_storage }}'