From 15c8e51d28660bd67c9af17035ca7dec72a10872 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Thu, 2 Jan 2025 11:47:45 +0100 Subject: [PATCH] feat: session replacer sends notifications on session lost/replaced --- .../ockam_api/src/cli_state/cli_state.rs | 3 +- .../ockam_api/src/nodes/service/relay.rs | 26 +++++++++++++++-- .../service/tcp_inlets/session_replacer.rs | 28 +++++++++++++++++-- .../ockam/ockam_api/src/session/replacer.rs | 4 +++ .../ockam/ockam_api/src/session/session.rs | 15 +++++++--- .../ockam/ockam_api/src/ui/terminal/fmt.rs | 16 +++++------ .../ockam_api/src/ui/terminal/notification.rs | 17 +++++------ .../ockam/ockam_api/tests/common/session.rs | 8 ++++++ .../ockam/ockam_command/src/relay/create.rs | 6 ++-- .../src/run/parser/resource/traits.rs | 8 ++---- .../ockam_command/src/tcp/inlet/create.rs | 28 +++++++++---------- .../ockam/ockam_node/src/router/shutdown.rs | 2 +- 12 files changed, 111 insertions(+), 50 deletions(-) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs index 3301aaaa9b9..18e04a50169 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs @@ -44,8 +44,7 @@ pub struct CliState { database: SqlxDatabase, application_database: SqlxDatabase, exporting_enabled: ExportingEnabled, - /// Broadcast channel to be notified of major events during a process supported by the - /// CliState API + /// Broadcast channel to be notified of major events during a process supported by the CliState API notifications: Sender, } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs index f51b9fcbbc0..4037ed8dce7 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs @@ -1,6 +1,7 @@ use std::sync::{Arc, Weak}; use std::time::Duration; +use colorful::Colorful; use miette::IntoDiagnostic; use ockam::identity::models::CredentialAndPurposeKey; @@ -14,6 +15,8 @@ use ockam_multiaddr::MultiAddr; use ockam_node::compat::asynchronous::Mutex; use ockam_node::Context; +use super::{NodeManager, NodeManagerWorker}; +use crate::colors::color_primary; use crate::nodes::connection::Connection; use crate::nodes::models::relay::{CreateRelay, RelayInfo, ReturnTiming}; use crate::nodes::models::secure_channel::{ @@ -25,8 +28,7 @@ use crate::nodes::service::secure_channel::SecureChannelType; use crate::nodes::BackgroundNodeClient; use crate::session::replacer::{ReplacerOutcome, ReplacerOutputKind, SessionReplacer}; use crate::session::session::Session; - -use super::{NodeManager, NodeManagerWorker}; +use crate::{fmt_info, fmt_ok, fmt_warn}; impl NodeManagerWorker { pub async fn create_relay( @@ -374,6 +376,26 @@ impl SessionReplacer for RelaySessionReplacer { } } } + + async fn on_session_down(&self) { + if let Some(node_manager) = self.node_manager.upgrade() { + node_manager.cli_state.notify_message( + fmt_warn!( + "The Node lost the connection to the Relay at {}\n", + color_primary(&self.addr) + ) + &fmt_info!("Attempting to reconnect...\n"), + ); + } + } + + async fn on_session_replaced(&self) { + if let Some(node_manager) = self.node_manager.upgrade() { + node_manager.cli_state.notify_message(fmt_ok!( + "The Node has restored the connection to the Relay at {}\n", + color_primary(&self.addr) + )); + } + } } #[async_trait] diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs index a3e87076109..294c9286b83 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs @@ -1,11 +1,10 @@ -use crate::nodes::service::certificate_provider::ProjectCertificateProvider; use ockam_transport_tcp::new_certificate_provider_cache; use std::sync::{Arc, Weak}; use std::time::Duration; +use colorful::Colorful; use tokio::time::timeout; -use crate::DefaultAddress; use ockam::identity::{Identifier, SecureChannel}; use ockam::tcp::TcpInletOptions; use ockam::udp::{UdpPuncture, UdpPunctureNegotiation, UdpTransport}; @@ -18,14 +17,17 @@ use ockam_multiaddr::MultiAddr; use ockam_node::Context; use ockam_transport_tcp::TcpInlet; +use crate::colors::color_primary; use crate::error::ApiError; use crate::nodes::connection::Connection; +use crate::nodes::service::certificate_provider::ProjectCertificateProvider; use crate::nodes::service::SecureChannelType; use crate::nodes::NodeManager; use crate::session::replacer::{ AdditionalSessionReplacer, CurrentInletStatus, ReplacerOutcome, ReplacerOutputKind, SessionReplacer, MAX_RECOVERY_TIME, }; +use crate::{fmt_info, fmt_ok, fmt_warn, DefaultAddress}; pub(super) struct InletSessionReplacer { pub(super) node_manager: Weak, @@ -296,6 +298,28 @@ impl SessionReplacer for InletSessionReplacer { self.close_inlet().await; self.close_connection(&node_manager).await; } + + async fn on_session_down(&self) { + if let Some(node_manager) = self.node_manager.upgrade() { + node_manager.cli_state.notify_message( + fmt_warn!( + "The TCP Inlet at {} lost the connection to the TCP Outlet at {}\n", + color_primary(&self.listen_addr), + color_primary(&self.outlet_addr) + ) + &fmt_info!("Attempting to reconnect...\n"), + ); + } + } + + async fn on_session_replaced(&self) { + if let Some(node_manager) = self.node_manager.upgrade() { + node_manager.cli_state.notify_message(fmt_ok!( + "The TCP Inlet at {} has restored the connection to the TCP Outlet at {}\n", + color_primary(&self.listen_addr), + color_primary(&self.outlet_addr) + )); + } + } } #[async_trait] diff --git a/implementations/rust/ockam/ockam_api/src/session/replacer.rs b/implementations/rust/ockam/ockam_api/src/session/replacer.rs index 9ce755f4b99..55fdccf643e 100644 --- a/implementations/rust/ockam/ockam_api/src/session/replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/session/replacer.rs @@ -14,6 +14,10 @@ pub trait SessionReplacer: Send + Sync + 'static { async fn create(&mut self) -> Result; async fn close(&mut self); + + async fn on_session_down(&self); + + async fn on_session_replaced(&self); } #[async_trait] diff --git a/implementations/rust/ockam/ockam_api/src/session/session.rs b/implementations/rust/ockam/ockam_api/src/session/session.rs index 0beebe7f158..9e8425a67a3 100644 --- a/implementations/rust/ockam/ockam_api/src/session/session.rs +++ b/implementations/rust/ockam/ockam_api/src/session/session.rs @@ -490,8 +490,10 @@ impl Session { sleep(ping_interval).await; } + // The session is down, or we reached the maximum number of failures _ => { - // We reached the maximum number of failures + let mut replacer = shared_state.replacer.lock().await; + if first_creation && !initial_connect_was_called { debug!(key = %key, "session is down. starting"); first_creation = false; @@ -499,6 +501,10 @@ impl Session { warn!(key = %key, "session unresponsive. replacing"); } + if !first_creation && pings.len() > 0 { + replacer.on_session_down().await; + } + shared_state.status.set_down(); *shared_state.last_outcome.lock().unwrap() = None; shared_state @@ -507,11 +513,12 @@ impl Session { pings.clear(); drop(pings); - let res = shared_state.replacer.lock().await.create().await; - - match res { + match replacer.create().await { Ok(replacer_outcome) => { info!(key = %key, ping_route = %replacer_outcome.ping_route, "replacement is up"); + if !first_creation { + replacer.on_session_replaced().await; + } shared_state.status.set_up(replacer_outcome.ping_route); shared_state diff --git a/implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs b/implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs index 855e5515235..4760661f52b 100644 --- a/implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs +++ b/implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs @@ -38,7 +38,7 @@ macro_rules! fmt_log { ($input:expr $(, $args:expr)* $(,)?) => { format!("{}{}", $crate::terminal::PADDING, - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -58,7 +58,7 @@ macro_rules! fmt_ok { "✔" .color($crate::colors::OckamColor::FmtOKBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -78,7 +78,7 @@ macro_rules! fmt_para { "│" .color($crate::colors::OckamColor::FmtINFOBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -98,7 +98,7 @@ macro_rules! fmt_list { "│" .color($crate::colors::OckamColor::FmtLISTBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -114,7 +114,7 @@ macro_rules! fmt_heading { ($input:expr $(, $args:expr)* $(,)?) => { format!("\n{}{}\n{}{}", $crate::terminal::PADDING, - format!($input, $($args),+), + format!($input, $($args),*), $crate::terminal::PADDING, "─".repeat($crate::terminal::get_separator_width()).dim().light_gray()) }; @@ -150,7 +150,7 @@ macro_rules! fmt_info { ">" .color($crate::colors::OckamColor::FmtINFOBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -170,7 +170,7 @@ macro_rules! fmt_warn { "!" .color($crate::colors::OckamColor::FmtWARNBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -190,6 +190,6 @@ macro_rules! fmt_err { "✗" .color($crate::colors::OckamColor::FmtERRORBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } diff --git a/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs b/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs index 0524986b588..7ab4c2d014b 100644 --- a/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs +++ b/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs @@ -1,15 +1,17 @@ use crate::terminal::{Terminal, TerminalWriter}; use crate::{fmt_log, CliState}; +use core::sync::atomic::AtomicBool; +use core::sync::atomic::Ordering::{Acquire, Release}; use indicatif::ProgressBar; use std::fmt::Debug; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use tokio::select; use tokio::sync::broadcast::Receiver; use tokio::time::sleep; -const REPORTING_CHANNEL_POLL_DELAY: Duration = Duration::from_millis(100); +const REPORTING_CHANNEL_POLL_DELAY: Duration = Duration::from_millis(20); #[derive(Debug, Clone, PartialEq)] pub enum Notification { @@ -46,13 +48,12 @@ impl Notification { } pub struct NotificationHandle { - stop: Arc>, + stop: Arc, } impl Drop for NotificationHandle { fn drop(&mut self) { - let mut stop = self.stop.lock().unwrap(); - *stop = true; + self.stop.store(true, Release); } } @@ -67,14 +68,14 @@ pub struct NotificationHandler { /// User terminal terminal: Terminal, /// Flag to determine if the progress display should stop - stop: Arc>, + stop: Arc, } impl NotificationHandler { /// Create a new NotificationsProgress without progress bar. /// The notifications are printed as they arrive and stay on screen pub fn start(cli_state: &CliState, terminal: Terminal) -> NotificationHandle { - let stop = Arc::new(Mutex::new(false)); + let stop = Arc::new(AtomicBool::new(false)); let _self = NotificationHandler { rx: cli_state.subscribe_to_notifications(), terminal: terminal.clone(), @@ -90,7 +91,7 @@ impl NotificationHandler { loop { select! { _ = sleep(REPORTING_CHANNEL_POLL_DELAY) => { - if *self.stop.lock().unwrap() { + if self.stop.load(Acquire) { // Drain the channel while let Ok(notification) = self.rx.try_recv() { self.handle_notification(notification).await; diff --git a/implementations/rust/ockam/ockam_api/tests/common/session.rs b/implementations/rust/ockam/ockam_api/tests/common/session.rs index 86e2723d5be..8a25724d91e 100644 --- a/implementations/rust/ockam/ockam_api/tests/common/session.rs +++ b/implementations/rust/ockam/ockam_api/tests/common/session.rs @@ -171,6 +171,14 @@ impl SessionReplacer for MockReplacer { async fn close(&mut self) { self.close_impl() } + + async fn on_session_down(&self) { + info!("MockReplacer {} on_session_down called", self.name); + } + + async fn on_session_replaced(&self) { + info!("MockReplacer {} on_session_replaced called", self.name); + } } #[async_trait] diff --git a/implementations/rust/ockam/ockam_command/src/relay/create.rs b/implementations/rust/ockam/ockam_command/src/relay/create.rs index a27711c4774..c70db02840f 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/create.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/create.rs @@ -166,9 +166,9 @@ impl Command for CreateCommand { .write_line()?; } else { let plain = { - let from = color_primary(&at); - let to = color_primary(format!("/node/{}", &node.node_name())); - fmt_warn!("A relay was created at {to} but failed to connect to {from}\n") + let at = color_primary(&at); + let node = color_primary(format!("/node/{}", &node.node_name())); + fmt_warn!("A relay was created at {node} but failed to connect to {at}\n") + &fmt_info!("It will retry to connect automatically") }; opts.terminal diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs index d4ad70f2400..564d6e12acc 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs @@ -71,13 +71,11 @@ impl ParsedCommands { if len > 0 { opts.terminal.write_line("")?; } - for (idx, cmd) in self.commands.into_iter().enumerate() { + for cmd in self.commands.into_iter() { if cmd.is_valid(ctx, opts).await? { cmd.run(ctx, opts).await?; - if idx < len - 1 { - // Newline between commands - opts.terminal.write_line("")?; - } + // Newline after each command + opts.terminal.write_line("")?; } } Ok(()) diff --git a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs index f8ca201b158..e99047467a3 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs @@ -233,39 +233,37 @@ impl Command for CreateCommand { .await?; let created_message = format!( - "Created a new TCP Inlet in the Node {} bound to {}\n", + "Created a new TCP Inlet in the Node {} bound to {}", color_primary(&node_name), color_primary(cmd.from.to_string()), ); - let created_message = fmt_ok!("{}", created_message); - let mut plain = if cmd.no_connection_wait { - created_message - + &fmt_log!( + fmt_ok!("{created_message}\n") + + &fmt_info!( "It will automatically connect to the TCP Outlet at {} as soon as it is available\n", color_primary(&cmd.to) ) } else if inlet_status.status == ConnectionStatus::Up { - created_message + fmt_ok!("{created_message}\n") + &fmt_log!( "sending traffic to the TCP Outlet at {}\n", color_primary(&cmd.to) ) } else { - let mut msg = fmt_warn!("A TCP Inlet was created in the Node {} bound to {} but failed to connect to the TCP Outlet at {}\n", - color_primary(&node_name), - color_primary(cmd.from.to_string()), - color_primary(&cmd.to)); - - msg += &fmt_info!("It will retry to connect automatically\n"); - - msg + fmt_warn!("{created_message}\n") + + &fmt_log!( + "but it failed to connect to the TCP Outlet at {}\n", + color_primary(&cmd.to) + ) + + &fmt_info!( + "It will automatically connect to the TCP Outlet as soon as it is available\n", + ) }; if privileged { plain += &fmt_info!( - "This Inlet is operating in {} mode\n", + "This TCP Inlet is operating in {} mode\n", color_primary_alt("privileged".to_string()) ); } diff --git a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs index 599cd259388..cfff5da9721 100644 --- a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs +++ b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs @@ -107,7 +107,7 @@ pub(super) async fn graceful( warn!(%timeout, "shutdown timeout reached; aborting node!"); // This works only because the state of the router is `Stopping` if sender.send(NodeMessage::AbortNode).await.is_err() { - warn!("failed to send node abort signal to router"); + warn!("couldn't send node abort signal to router"); } }); }