Skip to content

Commit

Permalink
feat: session replacer sends notifications on session lost/replaced
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianbenavides committed Jan 6, 2025
1 parent 5c8afa0 commit 15c8e51
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ pub struct CliState {
database: SqlxDatabase,
application_database: SqlxDatabase,
exporting_enabled: ExportingEnabled,
/// Broadcast channel to be notified of major events during a process supported by the
/// CliState API
/// Broadcast channel to be notified of major events during a process supported by the CliState API
notifications: Sender<Notification>,
}

Expand Down
26 changes: 24 additions & 2 deletions implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, Weak};
use std::time::Duration;

use colorful::Colorful;
use miette::IntoDiagnostic;

use ockam::identity::models::CredentialAndPurposeKey;
Expand All @@ -14,6 +15,8 @@ use ockam_multiaddr::MultiAddr;
use ockam_node::compat::asynchronous::Mutex;
use ockam_node::Context;

use super::{NodeManager, NodeManagerWorker};
use crate::colors::color_primary;
use crate::nodes::connection::Connection;
use crate::nodes::models::relay::{CreateRelay, RelayInfo, ReturnTiming};
use crate::nodes::models::secure_channel::{
Expand All @@ -25,8 +28,7 @@ use crate::nodes::service::secure_channel::SecureChannelType;
use crate::nodes::BackgroundNodeClient;
use crate::session::replacer::{ReplacerOutcome, ReplacerOutputKind, SessionReplacer};
use crate::session::session::Session;

use super::{NodeManager, NodeManagerWorker};
use crate::{fmt_info, fmt_ok, fmt_warn};

impl NodeManagerWorker {
pub async fn create_relay(
Expand Down Expand Up @@ -374,6 +376,26 @@ impl SessionReplacer for RelaySessionReplacer {
}
}
}

async fn on_session_down(&self) {
if let Some(node_manager) = self.node_manager.upgrade() {
node_manager.cli_state.notify_message(
fmt_warn!(
"The Node lost the connection to the Relay at {}\n",
color_primary(&self.addr)
) + &fmt_info!("Attempting to reconnect...\n"),
);
}
}

async fn on_session_replaced(&self) {
if let Some(node_manager) = self.node_manager.upgrade() {
node_manager.cli_state.notify_message(fmt_ok!(
"The Node has restored the connection to the Relay at {}\n",
color_primary(&self.addr)
));
}
}
}

#[async_trait]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::nodes::service::certificate_provider::ProjectCertificateProvider;
use ockam_transport_tcp::new_certificate_provider_cache;
use std::sync::{Arc, Weak};
use std::time::Duration;

use colorful::Colorful;
use tokio::time::timeout;

use crate::DefaultAddress;
use ockam::identity::{Identifier, SecureChannel};
use ockam::tcp::TcpInletOptions;
use ockam::udp::{UdpPuncture, UdpPunctureNegotiation, UdpTransport};
Expand All @@ -18,14 +17,17 @@ use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use ockam_transport_tcp::TcpInlet;

use crate::colors::color_primary;
use crate::error::ApiError;
use crate::nodes::connection::Connection;
use crate::nodes::service::certificate_provider::ProjectCertificateProvider;
use crate::nodes::service::SecureChannelType;
use crate::nodes::NodeManager;
use crate::session::replacer::{
AdditionalSessionReplacer, CurrentInletStatus, ReplacerOutcome, ReplacerOutputKind,
SessionReplacer, MAX_RECOVERY_TIME,
};
use crate::{fmt_info, fmt_ok, fmt_warn, DefaultAddress};

pub(super) struct InletSessionReplacer {
pub(super) node_manager: Weak<NodeManager>,
Expand Down Expand Up @@ -296,6 +298,28 @@ impl SessionReplacer for InletSessionReplacer {
self.close_inlet().await;
self.close_connection(&node_manager).await;
}

async fn on_session_down(&self) {
if let Some(node_manager) = self.node_manager.upgrade() {
node_manager.cli_state.notify_message(
fmt_warn!(
"The TCP Inlet at {} lost the connection to the TCP Outlet at {}\n",
color_primary(&self.listen_addr),
color_primary(&self.outlet_addr)
) + &fmt_info!("Attempting to reconnect...\n"),
);
}
}

async fn on_session_replaced(&self) {
if let Some(node_manager) = self.node_manager.upgrade() {
node_manager.cli_state.notify_message(fmt_ok!(
"The TCP Inlet at {} has restored the connection to the TCP Outlet at {}\n",
color_primary(&self.listen_addr),
color_primary(&self.outlet_addr)
));
}
}
}

#[async_trait]
Expand Down
4 changes: 4 additions & 0 deletions implementations/rust/ockam/ockam_api/src/session/replacer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub trait SessionReplacer: Send + Sync + 'static {
async fn create(&mut self) -> Result<ReplacerOutcome>;

async fn close(&mut self);

async fn on_session_down(&self);

async fn on_session_replaced(&self);
}

#[async_trait]
Expand Down
15 changes: 11 additions & 4 deletions implementations/rust/ockam/ockam_api/src/session/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,15 +490,21 @@ impl Session {

sleep(ping_interval).await;
}
// The session is down, or we reached the maximum number of failures
_ => {
// We reached the maximum number of failures
let mut replacer = shared_state.replacer.lock().await;

if first_creation && !initial_connect_was_called {
debug!(key = %key, "session is down. starting");
first_creation = false;
} else {
warn!(key = %key, "session unresponsive. replacing");
}

if !first_creation && pings.len() > 0 {
replacer.on_session_down().await;
}

shared_state.status.set_down();
*shared_state.last_outcome.lock().unwrap() = None;
shared_state
Expand All @@ -507,11 +513,12 @@ impl Session {
pings.clear();
drop(pings);

let res = shared_state.replacer.lock().await.create().await;

match res {
match replacer.create().await {
Ok(replacer_outcome) => {
info!(key = %key, ping_route = %replacer_outcome.ping_route, "replacement is up");
if !first_creation {
replacer.on_session_replaced().await;
}

shared_state.status.set_up(replacer_outcome.ping_route);
shared_state
Expand Down
16 changes: 8 additions & 8 deletions implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ macro_rules! fmt_log {
($input:expr $(, $args:expr)* $(,)?) => {
format!("{}{}",
$crate::terminal::PADDING,
format!($input, $($args),+))
format!($input, $($args),*))
};
}

Expand All @@ -58,7 +58,7 @@ macro_rules! fmt_ok {
"✔"
.color($crate::colors::OckamColor::FmtOKBackground.color())
.bold(),
format!($input, $($args),+))
format!($input, $($args),*))
};
}

Expand All @@ -78,7 +78,7 @@ macro_rules! fmt_para {
"│"
.color($crate::colors::OckamColor::FmtINFOBackground.color())
.bold(),
format!($input, $($args),+))
format!($input, $($args),*))
};
}

Expand All @@ -98,7 +98,7 @@ macro_rules! fmt_list {
"│"
.color($crate::colors::OckamColor::FmtLISTBackground.color())
.bold(),
format!($input, $($args),+))
format!($input, $($args),*))
};
}

Expand All @@ -114,7 +114,7 @@ macro_rules! fmt_heading {
($input:expr $(, $args:expr)* $(,)?) => {
format!("\n{}{}\n{}{}",
$crate::terminal::PADDING,
format!($input, $($args),+),
format!($input, $($args),*),
$crate::terminal::PADDING,
"─".repeat($crate::terminal::get_separator_width()).dim().light_gray())
};
Expand Down Expand Up @@ -150,7 +150,7 @@ macro_rules! fmt_info {
">"
.color($crate::colors::OckamColor::FmtINFOBackground.color())
.bold(),
format!($input, $($args),+))
format!($input, $($args),*))
};
}

Expand All @@ -170,7 +170,7 @@ macro_rules! fmt_warn {
"!"
.color($crate::colors::OckamColor::FmtWARNBackground.color())
.bold(),
format!($input, $($args),+))
format!($input, $($args),*))
};
}

Expand All @@ -190,6 +190,6 @@ macro_rules! fmt_err {
"✗"
.color($crate::colors::OckamColor::FmtERRORBackground.color())
.bold(),
format!($input, $($args),+))
format!($input, $($args),*))
};
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use crate::terminal::{Terminal, TerminalWriter};
use crate::{fmt_log, CliState};
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering::{Acquire, Release};
use indicatif::ProgressBar;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
use tokio::select;

use tokio::sync::broadcast::Receiver;
use tokio::time::sleep;

const REPORTING_CHANNEL_POLL_DELAY: Duration = Duration::from_millis(100);
const REPORTING_CHANNEL_POLL_DELAY: Duration = Duration::from_millis(20);

#[derive(Debug, Clone, PartialEq)]
pub enum Notification {
Expand Down Expand Up @@ -46,13 +48,12 @@ impl Notification {
}

pub struct NotificationHandle {
stop: Arc<Mutex<bool>>,
stop: Arc<AtomicBool>,
}

impl Drop for NotificationHandle {
fn drop(&mut self) {
let mut stop = self.stop.lock().unwrap();
*stop = true;
self.stop.store(true, Release);
}
}

Expand All @@ -67,14 +68,14 @@ pub struct NotificationHandler<T: TerminalWriter + Debug + Send + 'static> {
/// User terminal
terminal: Terminal<T>,
/// Flag to determine if the progress display should stop
stop: Arc<Mutex<bool>>,
stop: Arc<AtomicBool>,
}

impl<T: TerminalWriter + Debug + Send + 'static> NotificationHandler<T> {
/// Create a new NotificationsProgress without progress bar.
/// The notifications are printed as they arrive and stay on screen
pub fn start(cli_state: &CliState, terminal: Terminal<T>) -> NotificationHandle {
let stop = Arc::new(Mutex::new(false));
let stop = Arc::new(AtomicBool::new(false));
let _self = NotificationHandler {
rx: cli_state.subscribe_to_notifications(),
terminal: terminal.clone(),
Expand All @@ -90,7 +91,7 @@ impl<T: TerminalWriter + Debug + Send + 'static> NotificationHandler<T> {
loop {
select! {
_ = sleep(REPORTING_CHANNEL_POLL_DELAY) => {
if *self.stop.lock().unwrap() {
if self.stop.load(Acquire) {
// Drain the channel
while let Ok(notification) = self.rx.try_recv() {
self.handle_notification(notification).await;
Expand Down
8 changes: 8 additions & 0 deletions implementations/rust/ockam/ockam_api/tests/common/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ impl SessionReplacer for MockReplacer {
async fn close(&mut self) {
self.close_impl()
}

async fn on_session_down(&self) {
info!("MockReplacer {} on_session_down called", self.name);
}

async fn on_session_replaced(&self) {
info!("MockReplacer {} on_session_replaced called", self.name);
}
}

#[async_trait]
Expand Down
6 changes: 3 additions & 3 deletions implementations/rust/ockam/ockam_command/src/relay/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ impl Command for CreateCommand {
.write_line()?;
} else {
let plain = {
let from = color_primary(&at);
let to = color_primary(format!("/node/{}", &node.node_name()));
fmt_warn!("A relay was created at {to} but failed to connect to {from}\n")
let at = color_primary(&at);
let node = color_primary(format!("/node/{}", &node.node_name()));
fmt_warn!("A relay was created at {node} but failed to connect to {at}\n")
+ &fmt_info!("It will retry to connect automatically")
};
opts.terminal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,11 @@ impl ParsedCommands {
if len > 0 {
opts.terminal.write_line("")?;
}
for (idx, cmd) in self.commands.into_iter().enumerate() {
for cmd in self.commands.into_iter() {
if cmd.is_valid(ctx, opts).await? {
cmd.run(ctx, opts).await?;
if idx < len - 1 {
// Newline between commands
opts.terminal.write_line("")?;
}
// Newline after each command
opts.terminal.write_line("")?;
}
}
Ok(())
Expand Down
Loading

0 comments on commit 15c8e51

Please sign in to comment.