From 6108b3d2cfdbe7c49ca32a5fa7887e1544ff92f2 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Thu, 12 Dec 2024 11:07:01 +0100 Subject: [PATCH] feat(rust): simplify `node create` execution --- .../ockam_api/src/cli_state/cli_state.rs | 33 +- .../ockam/ockam_api/src/nodes/models/node.rs | 12 +- .../nodes/service/background_node_client.rs | 2 +- .../ockam_command/src/authority/create.rs | 6 +- .../ockam_command/src/command_global_opts.rs | 2 +- .../ockam/ockam_command/src/node/create.rs | 348 ++++++++++-------- .../src/node/create/background.rs | 67 +--- .../ockam_command/src/node/create/config.rs | 182 +++++---- .../src/node/create/foreground.rs | 34 +- .../rust/ockam/ockam_command/src/node/show.rs | 32 +- .../rust/ockam/ockam_command/src/node/util.rs | 133 ++++--- .../src/run/parser/resource/node.rs | 1 - .../src/run/parser/resource/project_enroll.rs | 4 +- .../src/run/parser/resource/traits.rs | 73 +--- .../ockam_command/src/util/foreground_args.rs | 1 + .../rust/ockam/ockam_command/src/util/mod.rs | 3 + .../ockam_command/tests/bats/local/nodes.bats | 20 + .../tests/bats/orchestrator_enroll/nodes.bats | 20 + .../database/database_configuration.rs | 2 + 19 files changed, 513 insertions(+), 462 deletions(-) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs index f221068a20d..a5311c8438b 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs @@ -4,7 +4,7 @@ use tokio::sync::broadcast::{channel, Receiver, Sender}; use ockam::SqlxDatabase; use ockam_core::env::get_env_with_default; -use ockam_node::database::DatabaseConfiguration; +use ockam_node::database::{DatabaseConfiguration, OCKAM_IN_MEMORY}; use ockam_node::Executor; use crate::cli_state::error::Result; @@ -71,6 +71,13 @@ impl CliState { Self::make_database_configuration(&self.dir) } + pub fn is_using_in_memory_database(&self) -> Result { + match self.database_configuration()? { + DatabaseConfiguration::SqliteInMemory { .. } => Ok(true), + _ => Ok(false), + } + } + pub fn is_database_path(&self, path: &Path) -> bool { let database_configuration = self.database_configuration().ok(); match database_configuration { @@ -248,9 +255,15 @@ impl CliState { pub(super) fn make_database_configuration(root_path: &Path) -> Result { match DatabaseConfiguration::postgres()? { Some(configuration) => Ok(configuration), - None => Ok(DatabaseConfiguration::sqlite( - root_path.join("database.sqlite3").as_path(), - )), + None => { + if get_env_with_default::(OCKAM_IN_MEMORY, false)? { + Ok(DatabaseConfiguration::sqlite_in_memory()) + } else { + Ok(DatabaseConfiguration::sqlite( + root_path.join("database.sqlite3").as_path(), + )) + } + } } } @@ -260,9 +273,15 @@ impl CliState { ) -> Result { match DatabaseConfiguration::postgres()? { Some(configuration) => Ok(configuration), - None => Ok(DatabaseConfiguration::sqlite( - root_path.join("application_database.sqlite3").as_path(), - )), + None => { + if get_env_with_default::(OCKAM_IN_MEMORY, false)? { + Ok(DatabaseConfiguration::sqlite_in_memory()) + } else { + Ok(DatabaseConfiguration::sqlite( + root_path.join("application_database.sqlite3").as_path(), + )) + } + } } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/node.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/node.rs index 22d03e20247..4b844c996fc 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/node.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/node.rs @@ -23,15 +23,19 @@ use std::fmt::{Display, Formatter}; pub struct NodeStatus { #[n(1)] pub name: String, #[n(2)] pub identifier: Identifier, - #[n(3)] pub status: NodeProcessStatus, + #[n(3)] pub process_status: NodeProcessStatus, } impl NodeStatus { - pub fn new(name: impl Into, identifier: Identifier, status: NodeProcessStatus) -> Self { + pub fn new( + name: impl Into, + identifier: Identifier, + process_status: NodeProcessStatus, + ) -> Self { Self { name: name.into(), identifier, - status, + process_status, } } } @@ -41,7 +45,7 @@ impl From<&NodeInfo> for NodeStatus { Self { name: node.name(), identifier: node.identifier(), - status: node.status(), + process_status: node.status(), } } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/background_node_client.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/background_node_client.rs index 3c37dc8e94c..14ed3cefddb 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/background_node_client.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/background_node_client.rs @@ -3,8 +3,8 @@ use std::time::Duration; use miette::{miette, IntoDiagnostic}; use minicbor::{Decode, Encode}; -use ockam::identity::get_default_timeout; +use ockam::identity::get_default_timeout; use ockam::tcp::{TcpConnection, TcpConnectionOptions, TcpTransport}; use ockam_core::api::{Reply, Request}; use ockam_core::Route; diff --git a/implementations/rust/ockam/ockam_command/src/authority/create.rs b/implementations/rust/ockam/ockam_command/src/authority/create.rs index bc729cab171..834edabc321 100644 --- a/implementations/rust/ockam/ockam_command/src/authority/create.rs +++ b/implementations/rust/ockam/ockam_command/src/authority/create.rs @@ -6,6 +6,7 @@ use colorful::Colorful; use miette::{miette, IntoDiagnostic, WrapErr}; use serde::{Deserialize, Serialize}; use tokio::fs::read_to_string; +use tokio::process::Child; use tokio_retry::strategy::FixedInterval; use tokio_retry::Retry; use tracing::{debug, error, info}; @@ -140,7 +141,7 @@ impl CreateCommand { pub(crate) async fn spawn_background_node( &self, opts: &CommandGlobalOpts, - ) -> miette::Result<()> { + ) -> miette::Result { if !self.skip_is_running_check { self.guard_node_is_not_already_running(opts).await?; } @@ -282,7 +283,8 @@ impl CreateCommand { /// Given a Context start a node in a new OS process async fn create_background_node(&self, opts: CommandGlobalOpts) -> miette::Result<()> { // Spawn node in another, new process - self.spawn_background_node(&opts).await + self.spawn_background_node(&opts).await?; + Ok(()) } /// Start an authority node: diff --git a/implementations/rust/ockam/ockam_command/src/command_global_opts.rs b/implementations/rust/ockam/ockam_command/src/command_global_opts.rs index 935adb3b539..3e58f85f208 100644 --- a/implementations/rust/ockam/ockam_command/src/command_global_opts.rs +++ b/implementations/rust/ockam/ockam_command/src/command_global_opts.rs @@ -31,7 +31,7 @@ pub struct CommandGlobalOpts { pub state: CliState, pub terminal: Terminal>, pub rt: Arc, - tracing_guard: Option>, + pub tracing_guard: Option>, } impl CommandGlobalOpts { diff --git a/implementations/rust/ockam/ockam_command/src/node/create.rs b/implementations/rust/ockam/ockam_command/src/node/create.rs index 381a7b90636..89349ae5e76 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create.rs @@ -16,7 +16,7 @@ use ockam_api::cli_state::random_name; use ockam_api::colors::{color_error, color_primary}; use ockam_api::nodes::models::transport::Port; use ockam_api::terminal::notification::NotificationHandler; -use ockam_api::{fmt_log, fmt_ok, CliState}; +use ockam_api::{fmt_log, fmt_ok}; use ockam_core::{opentelemetry_context_parser, OpenTelemetryContext}; use ockam_node::Context; use opentelemetry::trace::TraceContextExt; @@ -175,9 +175,9 @@ impl Command for CreateCommand { #[instrument(skip_all)] fn run(mut self, opts: CommandGlobalOpts) -> miette::Result<()> { - self.parse_args(&opts)?; if self.should_run_config() { async_cmd(&self.name(), opts.clone(), |ctx| async move { + self.parse_args(&opts).await?; self.run_config(&ctx, opts).await }) } else if self.foreground_args.foreground { @@ -189,29 +189,26 @@ impl Command for CreateCommand { local_cmd(embedded_node_that_is_not_stopped( opts.rt.clone(), |ctx| async move { - self.name = self.get_default_node_name(&opts.state).await; + self.parse_args(&opts).await?; self.foreground_mode(&ctx, opts).await }, )) } else { async_cmd(&self.name(), opts.clone(), |ctx| async move { - self.name = self.get_default_node_name(&opts.state).await; + self.parse_args(&opts).await?; self.background_mode(&ctx, opts).await }) } } async fn async_run(mut self, ctx: &Context, opts: CommandGlobalOpts) -> Result<()> { - self.parse_args(&opts)?; + self.parse_args(&opts).await?; if self.should_run_config() { self.run_config(ctx, opts).await? + } else if self.foreground_args.foreground { + self.foreground_mode(ctx, opts).await? } else { - self.name = self.get_default_node_name(&opts.state).await; - if self.foreground_args.foreground { - self.foreground_mode(ctx, opts).await? - } else { - self.background_mode(ctx, opts).await? - } + self.background_mode(ctx, opts).await? } Ok(()) } @@ -230,20 +227,14 @@ impl CreateCommand { return false; } - let name_arg_is_a_config = self.name_arg_is_a_config(); - - let no_config_args = !name_arg_is_a_config + if !self.name_arg_is_a_config() && self.config_args.configuration.is_none() - && self.config_args.enrollment_ticket.is_none(); - if no_config_args { + && self.config_args.enrollment_ticket.is_none() + { return false; } - let name_arg_is_default_node_name_or_config = - self.name.eq(DEFAULT_NODE_NAME) || name_arg_is_a_config; - name_arg_is_default_node_name_or_config - || self.config_args.configuration.is_some() - || self.config_args.enrollment_ticket.is_some() + true } /// Return true if the `name` argument is a URL, a file path, or an inline config @@ -260,7 +251,12 @@ impl CreateCommand { !self.name_arg_is_a_config() } - fn parse_args(&mut self, opts: &CommandGlobalOpts) -> miette::Result<()> { + async fn parse_args(&mut self, opts: &CommandGlobalOpts) -> miette::Result<()> { + // return error if trying to create an in-memory node in background mode + if !self.foreground_args.foreground && opts.state.is_using_in_memory_database()? { + return Err(miette!("Only foreground nodes can be created in-memory",)); + } + // return error if there are duplicated variables let mut variables = std::collections::HashMap::new(); for (key, value) in self.config_args.variables.iter() { @@ -274,7 +270,7 @@ impl CreateCommand { variables.insert(key.clone(), value.clone()); } - // return error if the name arg is not a valid node name, and is not a config + // return error if the name arg is not a config and is not a valid node name let re = Regex::new(r"[^\w_-]").into_diagnostic()?; if self.name_arg_is_a_node_name() && re.is_match(&self.name) { return Err(miette!( @@ -284,6 +280,55 @@ impl CreateCommand { )); } + // return error if the name arg is a config and the config arg is also set + if self.name_arg_is_a_config() && self.config_args.configuration.is_some() { + return Err(miette!( + "Cannot set both {} and {}", + color_primary("NAME_OR_CONFIGURATION"), + color_primary("--configuration"), + )); + } + + // if the name arg is a config, move it to the configuration field and replace + // the node name with its default value + if self.name_arg_is_a_config() { + self.config_args.configuration = Some(self.get_node_config_contents().await?); + self.name = DEFAULT_NODE_NAME.to_string(); + } + + self.name = { + let mut name = if let Ok(node_config) = self.parse_node_config().await { + node_config.node.name().unwrap_or(self.name.clone()) + } else { + self.name.clone() + }; + if name == DEFAULT_NODE_NAME { + name = random_name(); + if let Ok(default_node) = opts.state.get_default_node().await { + if !default_node.is_running() { + // The default node was stopped, so we can reuse the name + name = default_node.name(); + } + } + } + name + }; + + if !self.skip_is_running_check + && opts + .state + .get_node(&self.name) + .await + .ok() + .map(|n| n.is_running()) + .unwrap_or(false) + { + return Err(miette!( + "Node {} is already running", + color_primary(&self.name) + )); + } + if self.http_server { print_warning_for_deprecated_flag_no_effect(opts, "http-server")?; } @@ -318,34 +363,20 @@ impl CreateCommand { ) .into_diagnostic()?; } - writeln!( - buf, - "\n{}", - fmt_log!( - "To see more details on this Node, run: {}", - color_primary(format!("ockam node show {}", node_name)) - ) - ) - .into_diagnostic()?; - Ok(buf) - } - async fn get_default_node_name(&self, state: &CliState) -> String { - let mut name = if self.name_arg_is_a_config() { - DEFAULT_NODE_NAME.to_string() - } else { - self.name.clone() - }; - if name == DEFAULT_NODE_NAME { - name = random_name(); - if let Ok(default_node) = state.get_default_node().await { - if !default_node.is_running() { - // The default node was stopped, so we can reuse the name - name = default_node.name(); - } - } + if self.foreground_args.child_process { + writeln!( + buf, + "\n{}", + fmt_log!( + "To see more details on this Node, run: {}", + color_primary(format!("ockam node show {}", node_name)) + ) + ) + .into_diagnostic()?; } - name + + Ok(buf) } async fn get_or_create_identity( @@ -385,9 +416,13 @@ fn parse_launch_config(config_or_path: &str) -> Result { #[cfg(test)] mod tests { - use crate::run::parser::resource::utils::parse_cmd_from_args; - use super::*; + use crate::run::parser::resource::utils::parse_cmd_from_args; + use crate::GlobalArgs; + use ockam_api::output::OutputFormat; + use ockam_api::terminal::Terminal; + use ockam_api::CliState; + use std::sync::Arc; #[test] fn command_can_be_parsed_from_name() { @@ -447,7 +482,7 @@ mod tests { let cmd = CreateCommand::default(); assert!(!cmd.should_run_config()); - // True if the name is the default node name and the configuration is set + // False the name is the default node name, and the configuration is set let cmd = CreateCommand { config_args: ConfigArgs { configuration: Some(config_path.clone()), @@ -457,7 +492,7 @@ mod tests { }; assert!(cmd.should_run_config()); - // True if the name is the default node name and the enrollment ticket is set + // True the name is the default node name, and the enrollment ticket is set let cmd = CreateCommand { config_args: ConfigArgs { enrollment_ticket: Some("ticket".to_string()), @@ -467,7 +502,7 @@ mod tests { }; assert!(cmd.should_run_config()); - // True if the name is not the default node name and the enrollment ticket is set + // True the name is not the default node name and the enrollment ticket is set let cmd = CreateCommand { name: "node".to_string(), config_args: ConfigArgs { @@ -478,7 +513,7 @@ mod tests { }; assert!(cmd.should_run_config()); - // True if the name is not the default node name and the inline config is set + // True the name is not the default node name and the inline config is set let cmd = CreateCommand { name: "node".to_string(), config_args: ConfigArgs { @@ -489,21 +524,21 @@ mod tests { }; assert!(cmd.should_run_config()); - // True if the name is a file path + // True if foreground and the name is a file path let cmd = CreateCommand { name: config_path.clone(), ..CreateCommand::default() }; assert!(cmd.should_run_config()); - // True if the name is a URL + // True and the name is a URL let cmd = CreateCommand { name: "http://localhost:8080".to_string(), ..CreateCommand::default() }; assert!(cmd.should_run_config()); - // False if the name is a node name and no config is set + // False the name is a node name, and no config is set let cmd = CreateCommand { name: "node".to_string(), ..CreateCommand::default() @@ -511,102 +546,121 @@ mod tests { assert!(!cmd.should_run_config()); } - #[tokio::test] - async fn get_default_node_name_no_previous_state() { - let state = CliState::test().await.unwrap(); - let cmd = CreateCommand::default(); - let name = cmd.get_default_node_name(&state).await; - assert_ne!(name, DEFAULT_NODE_NAME); - - let cmd = CreateCommand { - name: r#"{tcp-outlet: {to: "5500"}}"#.to_string(), - ..Default::default() - }; - let name = cmd.get_default_node_name(&state).await; - assert_ne!(name, DEFAULT_NODE_NAME); - assert_ne!(name, cmd.name); - - let cmd = CreateCommand { - config_args: ConfigArgs { - configuration: Some(r#"{tcp-outlet: {to: "5500"}}"#.to_string()), + #[test] + fn get_default_node_name_no_previous_state() { + let rt = Arc::new(tokio::runtime::Runtime::new().unwrap()); + let rt_moved = rt.clone(); + rt.block_on(async { + let opts = CommandGlobalOpts { + state: CliState::test().await.unwrap(), + terminal: Terminal::new(false, false, false, true, false, OutputFormat::Plain), + rt: rt_moved, + global_args: GlobalArgs::default(), + tracing_guard: None, + }; + let mut cmd = CreateCommand::default(); + cmd.parse_args(&opts).await.unwrap(); + assert_ne!(cmd.name, DEFAULT_NODE_NAME); + + let mut cmd = CreateCommand { + name: r#"{tcp-outlet: {to: "5500"}}"#.to_string(), ..Default::default() - }, - ..Default::default() - }; - let name = cmd.get_default_node_name(&state).await; - assert_ne!(name, DEFAULT_NODE_NAME); - assert_ne!(name, cmd.name); - - let cmd = CreateCommand { - name: "n1".to_string(), - ..Default::default() - }; - let name = cmd.get_default_node_name(&state).await; - assert_eq!(name, cmd.name); + }; + cmd.parse_args(&opts).await.unwrap(); + assert_ne!(cmd.name, DEFAULT_NODE_NAME); + + let mut cmd = CreateCommand { + config_args: ConfigArgs { + configuration: Some(r#"{tcp-outlet: {to: "5500"}}"#.to_string()), + ..Default::default() + }, + ..Default::default() + }; + cmd.parse_args(&opts).await.unwrap(); + assert_ne!(cmd.name, DEFAULT_NODE_NAME); - let cmd = CreateCommand { - name: "n1".to_string(), - config_args: ConfigArgs { - configuration: Some(r#"{tcp-outlet: {to: "5500"}}"#.to_string()), + let mut cmd = CreateCommand { + name: "n1".to_string(), ..Default::default() - }, - ..Default::default() - }; - let name = cmd.get_default_node_name(&state).await; - assert_eq!(name, cmd.name); + }; + cmd.parse_args(&opts).await.unwrap(); + assert_eq!(cmd.name, "n1"); + + let mut cmd = CreateCommand { + name: "n1".to_string(), + config_args: ConfigArgs { + configuration: Some(r#"{tcp-outlet: {to: "5500"}}"#.to_string()), + ..Default::default() + }, + ..Default::default() + }; + cmd.parse_args(&opts).await.unwrap(); + assert_eq!(cmd.name, "n1"); + }); } - #[tokio::test] - async fn get_default_node_name_with_previous_state() { - let state = CliState::test().await.unwrap(); - let default_node_name = "n1"; - state.create_node(default_node_name).await.unwrap(); - - let cmd = CreateCommand::default(); - let name = cmd.get_default_node_name(&state).await; - assert_ne!(name, default_node_name); - - // There is a default node stored in the state, but it's stopped. - // All the later calls should return the default node name. - state.stop_node(default_node_name).await.unwrap(); - let cmd = CreateCommand::default(); - let name = cmd.get_default_node_name(&state).await; - assert_eq!(name, default_node_name); - - let cmd = CreateCommand { - name: r#"{tcp-outlet: {to: "5500"}}"#.to_string(), - ..Default::default() - }; - let name = cmd.get_default_node_name(&state).await; - assert_eq!(name, default_node_name); - - let cmd = CreateCommand { - config_args: ConfigArgs { - configuration: Some(r#"{tcp-outlet: {to: "5500"}}"#.to_string()), + #[test] + fn get_default_node_name_with_previous_state() { + let rt = Arc::new(tokio::runtime::Runtime::new().unwrap()); + let rt_moved = rt.clone(); + rt.block_on(async { + let opts = CommandGlobalOpts { + state: CliState::test().await.unwrap(), + terminal: Terminal::new(false, false, false, true, false, OutputFormat::Plain), + rt: rt_moved, + global_args: GlobalArgs::default(), + tracing_guard: None, + }; + + let default_node_name = "n1"; + opts.state.create_node(default_node_name).await.unwrap(); + + let mut cmd = CreateCommand::default(); + cmd.parse_args(&opts).await.unwrap(); + assert_ne!(cmd.name, default_node_name); + + // There is a default node stored in the state, but it's stopped. + // All the later calls should return the default node name. + opts.state.stop_node(default_node_name).await.unwrap(); + let mut cmd = CreateCommand::default(); + cmd.parse_args(&opts).await.unwrap(); + assert_eq!(cmd.name, default_node_name); + + let mut cmd = CreateCommand { + name: r#"{tcp-outlet: {to: "5500"}}"#.to_string(), ..Default::default() - }, - ..Default::default() - }; - let name = cmd.get_default_node_name(&state).await; - assert_eq!(name, default_node_name); - - // Unless we explicitly set a name - let cmd = CreateCommand { - name: "n2".to_string(), - ..Default::default() - }; - let name = cmd.get_default_node_name(&state).await; - assert_eq!(name, cmd.name); + }; + cmd.parse_args(&opts).await.unwrap(); + assert_eq!(cmd.name, default_node_name); + + let mut cmd = CreateCommand { + config_args: ConfigArgs { + configuration: Some(r#"{tcp-outlet: {to: "5500"}}"#.to_string()), + ..Default::default() + }, + ..Default::default() + }; + cmd.parse_args(&opts).await.unwrap(); + assert_eq!(cmd.name, default_node_name); - let cmd = CreateCommand { - name: "n2".to_string(), - config_args: ConfigArgs { - configuration: Some(r#"{tcp-outlet: {to: "5500"}}"#.to_string()), + // Unless we explicitly set a name + let mut cmd = CreateCommand { + name: "n2".to_string(), ..Default::default() - }, - ..Default::default() - }; - let name = cmd.get_default_node_name(&state).await; - assert_eq!(name, cmd.name); + }; + cmd.parse_args(&opts).await.unwrap(); + assert_eq!(cmd.name, "n2"); + + let mut cmd = CreateCommand { + name: "n2".to_string(), + config_args: ConfigArgs { + configuration: Some(r#"{tcp-outlet: {to: "5500"}}"#.to_string()), + ..Default::default() + }, + ..Default::default() + }; + cmd.parse_args(&opts).await.unwrap(); + assert_eq!(cmd.name, "n2"); + }); } } diff --git a/implementations/rust/ockam/ockam_command/src/node/create/background.rs b/implementations/rust/ockam/ockam_command/src/node/create/background.rs index 4397e729301..a9c073610a7 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/background.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/background.rs @@ -1,16 +1,12 @@ -use colorful::Colorful; -use miette::{miette, IntoDiagnostic}; +use miette::miette; use tracing::{debug, instrument}; use ockam::Context; use ockam_api::cli_state::journeys::{JourneyEvent, NODE_NAME}; -use ockam_api::colors::color_primary; -use ockam_api::fmt_warn; use ockam_api::logs::CurrentSpan; -use ockam_api::nodes::BackgroundNodeClient; use ockam_core::OpenTelemetryContext; -use crate::node::show::get_node_resources; +use crate::node::show::wait_until_node_is_up; use crate::node::util::spawn_node; use crate::node::CreateCommand; use crate::CommandGlobalOpts; @@ -27,68 +23,33 @@ impl CreateCommand { debug!(%node_name, "creating node in background mode"); CurrentSpan::set_attribute(NODE_NAME, node_name.as_str()); - // Early checks if self.foreground_args.child_process { return Err(miette!( "Cannot create a background node from another background node" )); } - if opts - .state - .get_node(&node_name) - .await - .ok() - .map(|n| n.is_running()) - .unwrap_or(false) - { - return Err(miette!( - "Node {} has been already created", - color_primary(&node_name) - )); - } - self.get_or_create_identity(&opts, &self.identity).await?; - // Create node and wait for it to be up - let cmd_with_trace_context = CreateCommand { + // Create node and wait for it to be up and configured + let cmd = CreateCommand { opentelemetry_context: self .opentelemetry_context .clone() .or(Some(OpenTelemetryContext::current())), ..self.clone() }; - cmd_with_trace_context.spawn_background_node(&opts).await?; - let mut node = BackgroundNodeClient::create_to_node(ctx, &opts.state, &node_name).await?; - let node_resources = get_node_resources(ctx, &opts.state, &mut node, true).await?; - opts.state - .add_journey_event( - JourneyEvent::NodeCreated, - [(NODE_NAME, node_name.clone())].into(), - ) - .await?; - // Output - if !node_resources.status.is_running() { - opts.terminal.write_line(fmt_warn!( - "Node was {} created but is not reachable", - color_primary(&node_name) - ))?; - } + // Run foreground node in a separate process + // Output is handled in the foreground execution + let handle = spawn_node(&opts, cmd).await?; - opts.terminal - .clone() - .stdout() - .plain(self.plain_output(&opts, &node_name).await?) - .machine(&node_name) - .json(serde_json::to_string(&node_resources).into_diagnostic()?) - .write_line()?; + tokio::select! { + _ = handle.wait_with_output() => { std::process::exit(1) } + _ = wait_until_node_is_up(ctx, &opts.state, node_name.clone()) => {} + } + opts.state + .add_journey_event(JourneyEvent::NodeCreated, [(NODE_NAME, node_name)].into()) + .await?; Ok(()) } - - pub(crate) async fn spawn_background_node( - self, - opts: &CommandGlobalOpts, - ) -> miette::Result<()> { - spawn_node(opts, self).await - } } diff --git a/implementations/rust/ockam/ockam_command/src/node/create/config.rs b/implementations/rust/ockam/ockam_command/src/node/create/config.rs index 123b536e898..a42dfec746f 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/config.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/config.rs @@ -7,15 +7,12 @@ use crate::value_parsers::{parse_config_or_path_or_url, parse_key_val}; use crate::CommandGlobalOpts; use clap::Args; use miette::{miette, IntoDiagnostic}; -use nix::sys::signal; use ockam_api::cli_state::journeys::APPLICATION_EVENT_COMMAND_CONFIGURATION_FILE; use ockam_api::nodes::BackgroundNodeClient; -use ockam_api::CliState; -use ockam_core::{AsyncTryClone, OpenTelemetryContext}; +use ockam_core::OpenTelemetryContext; use ockam_node::Context; use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use tracing::{debug, instrument, Span}; +use tracing::{debug, instrument, trace, Span}; pub const ENROLLMENT_TICKET: &str = "ENROLLMENT_TICKET"; @@ -47,9 +44,10 @@ impl CreateCommand { /// Run the creation of a node using a node configuration #[instrument(skip_all)] pub async fn run_config(self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> { - debug!("Running node create with a node config"); - let mut node_config = self.get_node_config().await?; - node_config.merge(&self, &opts.state).await?; + debug!("running node create with a node config"); + let mut node_config = self.parse_node_config().await?; + node_config.merge(&self).await?; + trace!(?node_config, "merged node config with command args"); let node_name = node_config.node.name().ok_or(miette!( "Node name should be set to the command's default value" ))?; @@ -62,7 +60,7 @@ impl CreateCommand { .await } else { node_config - .run(ctx, &opts, &node_name, &identity_name) + .run_background(ctx, &opts, &node_name, &identity_name) .await }; if res.is_err() { @@ -71,27 +69,31 @@ impl CreateCommand { res } - /// Try to read the `name` argument as either: - /// - a URL to a configuration file - /// - a local path to a configuration file - /// - an inline configuration - /// or read the `configuration` argument - #[instrument(skip_all, fields(app.event.command.configuration_file))] - pub async fn get_node_config(&self) -> miette::Result { - let contents = match self.config_args.configuration.clone() { - Some(contents) => contents, + pub(super) async fn get_node_config_contents(&self) -> miette::Result { + match self.config_args.configuration.clone() { + Some(contents) => Ok(contents), None => match parse_config_or_path_or_url::(&self.name).await { - Ok(contents) => contents, + Ok(contents) => Ok(contents), Err(err) => { // If just the enrollment ticket is passed, create a minimal configuration if let Some(ticket) = &self.config_args.enrollment_ticket { - format!("ticket: {}", ticket) + Ok(format!("ticket: {}", ticket)) } else { - return Err(err); + Err(err) } } }, - }; + } + } + + /// Try to read the `name` argument as either: + /// - a URL to a configuration file + /// - a local path to a configuration file + /// - an inline configuration + /// or read the `configuration` argument + #[instrument(skip_all, fields(app.event.command.configuration_file))] + pub(super) async fn parse_node_config(&self) -> miette::Result { + let contents = self.get_node_config_contents().await?; // Set environment variables from the cli command args // This needs to be done before parsing the configuration for (key, value) in &self.config_args.variables { @@ -144,7 +146,7 @@ impl NodeConfig { /// Merge the arguments of the node defined in the config with the arguments from the /// "create" command, giving precedence to the command args. - async fn merge(&mut self, cmd: &CreateCommand, state: &CliState) -> miette::Result<()> { + async fn merge(&mut self, cmd: &CreateCommand) -> miette::Result<()> { // Set environment variables from the cli command again // to override the duplicate entries from the config file. for (key, value) in &cmd.config_args.variables { @@ -156,7 +158,7 @@ impl NodeConfig { // Set default values to the config, if not present if self.node.name.is_none() { - self.node.name = Some(cmd.get_default_node_name(state).await.into()); + self.node.name = Some(cmd.name.clone().into()); } if self.node.opentelemetry_context.is_none() { self.node.opentelemetry_context = Some( @@ -222,96 +224,82 @@ impl NodeConfig { Ok(()) } - pub async fn run( + async fn run_foreground( self, ctx: &Context, opts: &CommandGlobalOpts, node_name: &String, identity_name: &String, - ) -> miette::Result<()> { - debug!("Running node config"); - for section in self.parse_commands(node_name, identity_name)? { - section.run(ctx, opts).await? - } - Ok(()) - } - - pub async fn run_foreground( - self, - ctx: &Context, - opts: &CommandGlobalOpts, - node_name: &String, - identity_name: &str, ) -> miette::Result<()> { debug!("Running node config in foreground mode"); // First, run the `project enroll` commands to prepare the identity and project data - if self.project_enroll.ticket.is_some() { - if !self - .project_enroll - .run_in_new_process( - &opts.global_args, - vec![("identity".into(), identity_name.into())] - .into_iter() - .collect(), - )? - .wait() - .await - .into_diagnostic()? - .success() - { - return Err(miette!("Project enroll failed")); - } + if let Some(command) = self + .project_enroll + .into_parsed_commands(Some(identity_name))? + .into_iter() + .next() + { + command.run(ctx, opts).await?; // Newline before the `node create` command opts.terminal.write_line("")?; } - // Next, run the 'node create' command - let node_process = self - .node - .run_in_new_process(&opts.global_args, BTreeMap::default())?; + // Next, run the 'node create' command in a separate tokio task, + // where the foreground node will run until stopped + let node_handle = { + let node_command = self + .node + .into_parsed_commands()? + .into_iter() + .next() + .ok_or(miette!("A node command should be defined"))?; + let opts = opts.clone(); + std::thread::spawn(move || crate::Command::run(node_command, opts)) + }; // Wait for the node to be up let is_up = { - let ctx = ctx.async_try_clone().await.into_diagnostic()?; let mut node = - BackgroundNodeClient::create_to_node(&ctx, &opts.state, node_name).await?; - is_node_up(&ctx, &mut node, true).await? + BackgroundNodeClient::create_to_node(ctx, &opts.state, node_name).await?; + is_node_up(ctx, &mut node, true).await? }; if !is_up { return Err(miette!("Node failed to start")); } - let node = opts.state.get_node(node_name).await?; - let node_pid = node.pid().ok_or(miette!("Node has no PID set"))?; - ctrlc::set_handler(move || { - // Send a SIGTERM signal to the node process to trigger - // the foreground's ctrlc handler - let _ = signal::kill( - nix::unistd::Pid::from_raw(node_pid as i32), - signal::Signal::SIGTERM, - ); - }) - .expect("Error setting exit signal handler"); // Run the other sections let node_name = Some(node_name); let other_sections: Vec = vec![ self.policies.into_parsed_commands()?.into(), self.relays.into_parsed_commands(node_name)?.into(), - self.tcp_inlets.into_parsed_commands(node_name)?.into(), self.tcp_outlets.into_parsed_commands(node_name)?.into(), - self.influxdb_inlets.into_parsed_commands(node_name)?.into(), + self.tcp_inlets.into_parsed_commands(node_name)?.into(), self.influxdb_outlets .into_parsed_commands(node_name)? .into(), - self.kafka_inlet.into_parsed_commands(node_name)?.into(), + self.influxdb_inlets.into_parsed_commands(node_name)?.into(), self.kafka_outlet.into_parsed_commands(node_name)?.into(), + self.kafka_inlet.into_parsed_commands(node_name)?.into(), ]; - for cmds in other_sections { - cmds.run(ctx, opts).await?; + for section in other_sections { + section.run(ctx, opts).await?; } - // Block on the node process until it exits - node_process.wait_with_output().await.into_diagnostic()?; + // Block on the node until it exits + let _ = node_handle.join(); + Ok(()) + } + + async fn run_background( + self, + ctx: &Context, + opts: &CommandGlobalOpts, + node_name: &String, + identity_name: &String, + ) -> miette::Result<()> { + for section in self.parse_commands(node_name, identity_name)? { + section.run(ctx, opts).await? + } Ok(()) } @@ -330,14 +318,14 @@ impl NodeConfig { self.node.into_parsed_commands()?.into(), self.policies.into_parsed_commands()?.into(), self.relays.into_parsed_commands(node_name)?.into(), - self.tcp_inlets.into_parsed_commands(node_name)?.into(), self.tcp_outlets.into_parsed_commands(node_name)?.into(), - self.influxdb_inlets.into_parsed_commands(node_name)?.into(), + self.tcp_inlets.into_parsed_commands(node_name)?.into(), self.influxdb_outlets .into_parsed_commands(node_name)? .into(), - self.kafka_inlet.into_parsed_commands(node_name)?.into(), + self.influxdb_inlets.into_parsed_commands(node_name)?.into(), self.kafka_outlet.into_parsed_commands(node_name)?.into(), + self.kafka_inlet.into_parsed_commands(node_name)?.into(), ]) } } @@ -346,7 +334,6 @@ impl NodeConfig { mod tests { use super::*; use ockam_api::cli_state::ExportedEnrollmentTicket; - use ockam_api::CliState; #[tokio::test] async fn get_node_config_from_path() { @@ -357,7 +344,7 @@ mod tests { name: dummy_file.path().to_str().unwrap().to_string(), ..Default::default() }; - let res = cmd.get_node_config().await.unwrap(); + let res = cmd.parse_node_config().await.unwrap(); assert_eq!(res.node.name, Some("n1".into())); } @@ -376,7 +363,7 @@ mod tests { name: config_url, ..Default::default() }; - let res = cmd.get_node_config().await.unwrap(); + let res = cmd.parse_node_config().await.unwrap(); assert_eq!(res.node.name, Some("n1".into())); } @@ -390,7 +377,7 @@ mod tests { }, ..Default::default() }; - let res = cmd.get_node_config().await.unwrap(); + let res = cmd.parse_node_config().await.unwrap(); assert_eq!(res.node.name, Some("n1".into())); } @@ -405,7 +392,7 @@ mod tests { }, ..Default::default() }; - let res = cmd.get_node_config().await.unwrap(); + let res = cmd.parse_node_config().await.unwrap(); assert_eq!(res.project_enroll.ticket, Some(ticket_encoded)); } @@ -429,8 +416,6 @@ mod tests { #[tokio::test] async fn node_name_is_handled_correctly() { - let state = CliState::test().await.unwrap(); - // The command doesn't define a node name, the config file does let tmp_directory = tempfile::tempdir().unwrap(); let tmp_file = tmp_directory.path().join("config.json"); @@ -439,8 +424,8 @@ mod tests { name: tmp_file.to_str().unwrap().to_string(), ..Default::default() }; - let mut config = cmd.get_node_config().await.unwrap(); - config.merge(&cmd, &state).await.unwrap(); + let mut config = cmd.parse_node_config().await.unwrap(); + config.merge(&cmd).await.unwrap(); assert_eq!(config.node.name, Some("n1".into())); // Same with inline config @@ -451,8 +436,8 @@ mod tests { }, ..Default::default() }; - let mut config = cmd.get_node_config().await.unwrap(); - config.merge(&cmd, &state).await.unwrap(); + let mut config = cmd.parse_node_config().await.unwrap(); + config.merge(&cmd).await.unwrap(); assert_eq!(config.node.name, Some("n1".into())); // If the command defines a node name, it should override the inline config @@ -464,14 +449,13 @@ mod tests { }, ..Default::default() }; - let mut config = cmd.get_node_config().await.unwrap(); - config.merge(&cmd, &state).await.unwrap(); + let mut config = cmd.parse_node_config().await.unwrap(); + config.merge(&cmd).await.unwrap(); assert_eq!(config.node.name, Some("n2".into())); } #[tokio::test] async fn merge_config_with_cli() { - let state = CliState::test().await.unwrap(); let cli_enrollment_ticket = ExportedEnrollmentTicket::new_test(); let cli_enrollment_ticket_encoded = cli_enrollment_ticket.to_string(); @@ -487,7 +471,7 @@ mod tests { // No node config, cli args should be used let contents = String::new(); let mut config = NodeConfig::parse(contents).unwrap(); - config.merge(&cli_args, &state).await.unwrap(); + config.merge(&cli_args).await.unwrap(); let node = config.node.into_parsed_commands().unwrap().pop().unwrap(); assert_eq!(node.tcp_listener_address, "127.0.0.1:1234"); assert_eq!( @@ -507,7 +491,7 @@ mod tests { "# .to_string(); let mut config = NodeConfig::parse(contents).unwrap(); - config.merge(&cli_args, &state).await.unwrap(); + config.merge(&cli_args).await.unwrap(); let node = config.node.into_parsed_commands().unwrap().pop().unwrap(); assert_eq!(node.name, "n1"); assert_eq!(node.tcp_listener_address, cli_args.tcp_listener_address); diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index 54c7abcb061..1151d88f5b1 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -12,7 +12,6 @@ use crate::CommandGlobalOpts; use ockam::tcp::{TcpListenerOptions, TcpTransport}; use ockam::udp::{UdpBindArguments, UdpBindOptions, UdpTransport}; use ockam::{Address, Context}; -use ockam_api::colors::color_primary; use ockam_api::fmt_log; use ockam_api::nodes::service::NodeManagerTransport; use ockam_api::nodes::{ @@ -33,21 +32,6 @@ impl CreateCommand { let node_name = self.name.clone(); debug!("creating node in foreground mode"); - if !self.skip_is_running_check - && opts - .state - .get_node(&node_name) - .await - .ok() - .map(|n| n.is_running()) - .unwrap_or(false) - { - return Err(miette!( - "Node {} is already running", - color_primary(&node_name) - )); - } - let trust_options = opts .state .retrieve_trust_options( @@ -115,7 +99,8 @@ impl CreateCommand { .into_diagnostic()?; debug!("in-memory node created"); - let node_manager_worker = NodeManagerWorker::new(Arc::new(node_man)); + let node_manager = Arc::new(node_man); + let node_manager_worker = NodeManagerWorker::new(node_manager.clone()); ctx.flow_controls() .add_consumer(NODEMANAGER_ADDR, tcp_listener.flow_control_id()); ctx.start_worker(NODEMANAGER_ADDR, node_manager_worker) @@ -137,13 +122,14 @@ impl CreateCommand { return Err(miette!("Failed to start services")); } - if !self.foreground_args.child_process { - opts.terminal - .clone() - .stdout() - .plain(self.plain_output(&opts, &node_name).await?) - .write_line()?; - } + let node_resources = node_manager.get_node_resources().await?; + opts.terminal + .clone() + .stdout() + .plain(self.plain_output(&opts, &node_name).await?) + .machine(&node_name) + .json_obj(&node_resources)? + .write_line()?; wait_for_exit_signal( &self.foreground_args, diff --git a/implementations/rust/ockam/ockam_command/src/node/show.rs b/implementations/rust/ockam/ockam_command/src/node/show.rs index 5ccbc034bee..518d2400454 100644 --- a/implementations/rust/ockam/ockam_command/src/node/show.rs +++ b/implementations/rust/ockam/ockam_command/src/node/show.rs @@ -8,7 +8,7 @@ use miette::IntoDiagnostic; use ockam_api::CliState; use tokio_retry::strategy::FixedInterval; -use tracing::{info, trace, warn}; +use tracing::{debug, info, trace, warn}; use ockam_api::nodes::models::node::{NodeResources, NodeStatus}; use ockam_api::nodes::BackgroundNodeClient; @@ -26,10 +26,10 @@ const PREVIEW_TAG: &str = include_str!("../static/preview_tag.txt"); const AFTER_LONG_HELP: &str = include_str!("./static/show/after_long_help.txt"); const IS_NODE_ACCESSIBLE_TIME_BETWEEN_CHECKS_MS: u64 = 25; -const IS_NODE_ACCESSIBLE_TIMEOUT: Duration = Duration::from_secs(10); +const IS_NODE_ACCESSIBLE_TIMEOUT: Duration = Duration::from_secs(5); const IS_NODE_READY_TIME_BETWEEN_CHECKS_MS: u64 = 25; -const IS_NODE_READY_TIMEOUT: Duration = Duration::from_secs(20); +const IS_NODE_READY_TIMEOUT: Duration = Duration::from_secs(10); /// Show the details of a node #[derive(Clone, Debug, Args)] @@ -141,8 +141,8 @@ pub async fn get_node_resources( } } -// Wait for a node to be up. We wait until the IS_NODE_ACCESSIBLE_TIMEOUT is passed and return `false` -// if the node is not up after that time. +/// Wait for a node to be up. We wait until the IS_NODE_ACCESSIBLE_TIMEOUT is passed and return `false` +/// if the node is not up after that time. pub async fn wait_until_node_is_up( ctx: &Context, cli_state: &CliState, @@ -164,6 +164,7 @@ pub async fn is_node_up( node: &mut BackgroundNodeClient, wait_until_ready: bool, ) -> Result { + debug!("waiting for node to be up"); let node_name = node.node_name(); if !is_node_accessible(ctx, node, wait_until_ready).await? { warn!(%node_name, "the node was not accessible in time"); @@ -186,9 +187,15 @@ async fn is_node_accessible( let retries = FixedInterval::from_millis(IS_NODE_ACCESSIBLE_TIME_BETWEEN_CHECKS_MS); let mut total_time = Duration::from_secs(0); for timeout_duration in retries { - if total_time >= IS_NODE_ACCESSIBLE_TIMEOUT || !wait_until_ready && !total_time.is_zero() { + // Max time exceeded + if total_time >= IS_NODE_ACCESSIBLE_TIMEOUT { return Ok(false); }; + // We don't wait and didn't succeed in the first try + if !wait_until_ready && !total_time.is_zero() { + return Ok(false); + } + // Check if node is accessible if node.is_accessible(ctx).await.is_ok() { info!(%node_name, "node is accessible"); return Ok(true); @@ -211,16 +218,20 @@ async fn is_node_ready( let now = std::time::Instant::now(); let mut total_time = Duration::from_secs(0); for timeout_duration in retries { - if total_time >= IS_NODE_READY_TIMEOUT || !wait_until_ready && !total_time.is_zero() { + // Max time exceeded + if total_time >= IS_NODE_READY_TIMEOUT { return Ok(false); }; - // Test if node is ready - // If the node is down, we expect it won't reply and the timeout will trigger the next loop + // We don't wait and didn't succeed in the first try + if !wait_until_ready && !total_time.is_zero() { + return Ok(false); + } + // Check if node is ready let result = node .ask_with_timeout::<(), NodeStatus>(ctx, api::query_status(), Duration::from_secs(1)) .await; if let Ok(node_status) = result { - if node_status.status.is_running() { + if node_status.process_status.is_running() { let elapsed = now.elapsed(); info!(%node_name, ?elapsed, "node is ready {:?}", node_status); return Ok(true); @@ -230,6 +241,7 @@ async fn is_node_ready( } else { trace!(%node_name, "node is initializing"); } + tokio::time::sleep(timeout_duration).await; total_time = total_time.add(timeout_duration) } Ok(false) diff --git a/implementations/rust/ockam/ockam_command/src/node/util.rs b/implementations/rust/ockam/ockam_command/src/node/util.rs index e1637a384ab..0c0b08554e4 100644 --- a/implementations/rust/ockam/ockam_command/src/node/util.rs +++ b/implementations/rust/ockam/ockam_command/src/node/util.rs @@ -1,19 +1,18 @@ +use crate::node::show::wait_until_node_is_up; +use crate::node::CreateCommand; +use crate::run::parser::resource::utils::subprocess_stdio; +use crate::shared_args::TrustOpts; +use crate::{Command as CommandTrait, CommandGlobalOpts}; use miette::Context as _; use miette::IntoDiagnostic; use ockam_core::env::get_env_with_default; use ockam_node::Context; use rand::random; use std::env::current_exe; -use std::os::unix::prelude::CommandExt; -use std::process::{Command, Stdio}; +use std::process::Stdio; +use tokio::process::{Child, Command}; use tracing::info; -use crate::node::show::wait_until_node_is_up; -use crate::node::CreateCommand; -use crate::run::parser::resource::utils::subprocess_stdio; -use crate::shared_args::TrustOpts; -use crate::{Command as CommandTrait, CommandGlobalOpts}; - pub struct NodeManagerDefaults { pub node_name: String, pub tcp_listener_address: String, @@ -36,45 +35,46 @@ pub async fn initialize_default_node( ctx: &Context, opts: &CommandGlobalOpts, ) -> miette::Result<()> { - if opts.state.get_default_node().await.is_err() { + if opts.state.is_using_in_memory_database()? { + return Ok(()); + } else if opts.state.get_default_node().await.is_err() { let cmd = CreateCommand::default(); cmd.async_run(ctx, opts.clone()).await?; opts.terminal.write_line("")?; + } else { + let node = opts.state.get_default_node().await?; + if !node.is_running() { + wait_until_node_is_up(ctx, &opts.state, node.name()).await?; + } } - let node_name = opts.state.get_default_node().await?; - wait_until_node_is_up(ctx, &opts.state, node_name.name()).await?; Ok(()) } /// Construct the argument list and re-execute the ockam /// CLI in foreground mode to start the newly created node #[allow(clippy::too_many_arguments)] -pub async fn spawn_node(opts: &CommandGlobalOpts, cmd: CreateCommand) -> miette::Result<()> { +pub async fn spawn_node(opts: &CommandGlobalOpts, cmd: CreateCommand) -> miette::Result { info!( "preparing to spawn a new node with name {} in the background", &cmd.name ); let CreateCommand { - skip_is_running_check, name, - identity: identity_name, + config_args, + foreground_args, + skip_is_running_check, tcp_listener_address, udp_listener_address, + http_server, no_status_endpoint, status_endpoint_port, udp, launch_configuration, + identity, trust_opts, opentelemetry_context, - .. } = cmd; - let TrustOpts { - project_name, - authority_identity, - authority_route, - credential_scope, - } = trust_opts; let mut args = vec![ match opts.global_args.verbose { @@ -91,22 +91,60 @@ pub async fn spawn_node(opts: &CommandGlobalOpts, cmd: CreateCommand) -> miette: "--child-process".to_string(), ]; - if let Some(credential_scope) = credential_scope { - args.push("--credential-scope".to_string()); - args.push(credential_scope) + // global args + if !opts.terminal.is_tty() { + args.push("--no-color".to_string()); + } + if opts.terminal.is_quiet() { + args.push("--quiet".to_string()); + } + if opts.global_args.no_input { + args.push("--no-input".to_string()); + } + if let Some(output_format) = opts.global_args.output_format.as_ref() { + args.push("--output".to_string()); + args.push(output_format.to_string()); + } + + // config args + if config_args.started_from_configuration { + args.push("--started-from-configuration".to_string()); + } + if let Some(enrollment_ticket) = config_args.enrollment_ticket { + args.push("--enrollment-ticket".to_string()); + args.push(enrollment_ticket); + } + if let Some(configuration) = config_args.configuration { + args.push("--configuration".to_string()); + args.push(configuration); + } + for (key, value) in config_args.variables { + args.push("--variable".to_string()); + args.push(format!("{}={}", key, value)); + } + + if foreground_args.exit_on_eof { + args.push("--exit-on-eof".to_string()); } if skip_is_running_check { args.push("--skip-is-running-check".to_string()); } - if !opts.terminal.is_tty() { - args.push("--no-color".to_string()); + // health check args + if http_server { + args.push("--http-server".to_string()); + } + if no_status_endpoint { + args.push("--no-status-endpoint".to_string()); + } + if let Some(status_endpoint_port) = status_endpoint_port { + args.push("--status-endpoint-port".to_string()); + args.push(status_endpoint_port.to_string()); } - if let Some(identity_name) = identity_name { - args.push("--identity".to_string()); - args.push(identity_name); + if udp { + args.push("--udp".to_string()); } if let Some(config) = launch_configuration { @@ -114,17 +152,25 @@ pub async fn spawn_node(opts: &CommandGlobalOpts, cmd: CreateCommand) -> miette: args.push(serde_json::to_string(&config).unwrap()); } - if let Some(project_name) = project_name { + if let Some(identity_name) = identity { + args.push("--identity".to_string()); + args.push(identity_name); + } + + // trust opts + if let Some(credential_scope) = trust_opts.credential_scope { + args.push("--credential-scope".to_string()); + args.push(credential_scope) + } + if let Some(project_name) = trust_opts.project_name { args.push("--project".to_string()); args.push(project_name); } - - if let Some(authority_identity) = authority_identity { + if let Some(authority_identity) = trust_opts.authority_identity { args.push("--authority-identity".to_string()); args.push(authority_identity.export_as_string().into_diagnostic()?); } - - if let Some(authority_route) = authority_route { + if let Some(authority_route) = trust_opts.authority_route { args.push("--authority-route".to_string()); args.push(authority_route.to_string()); } @@ -134,26 +180,13 @@ pub async fn spawn_node(opts: &CommandGlobalOpts, cmd: CreateCommand) -> miette: args.push(opentelemetry_context.to_string()); } - if no_status_endpoint { - args.push("--no-status-endpoint".to_string()); - } - - if let Some(status_endpoint_port) = status_endpoint_port { - args.push("--status-endpoint-port".to_string()); - args.push(status_endpoint_port.to_string()); - } - - if udp { - args.push("--udp".to_string()); - } - args.push(name.to_owned()); run_ockam(args, opts.global_args.quiet).await } /// Run the ockam command line with specific arguments -pub async fn run_ockam(args: Vec, quiet: bool) -> miette::Result<()> { +pub async fn run_ockam(args: Vec, quiet: bool) -> miette::Result { info!("spawning a new process"); // On systems with non-obvious path setups (or during @@ -179,8 +212,6 @@ pub async fn run_ockam(args: Vec, quiet: bool) -> miette::Result<()> { }) .spawn() .into_diagnostic() - .context("failed to spawn node")?; + .context("failed to spawn node") } - - Ok(()) } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs index 0440579d850..e213774b36b 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs @@ -85,7 +85,6 @@ impl Resource for Node { if let Some(opentelemetry_context) = self.opentelemetry_context { args.insert("opentelemetry-context".into(), opentelemetry_context); } - if let Some(udp) = self.udp { args.insert("udp".into(), udp); } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/project_enroll.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/project_enroll.rs index 4ea5bf62959..db18827ee27 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/project_enroll.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/project_enroll.rs @@ -17,8 +17,8 @@ impl Resource for ProjectEnroll { const COMMAND_NAME: &'static str = EnrollCommand::NAME; fn args(self) -> Vec { - if let Some(path_or_contents) = self.ticket { - vec![path_or_contents] + if let Some(ticket) = self.ticket { + vec![ticket] } else { vec![] } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs index b6ec42b352b..d4ad70f2400 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs @@ -1,15 +1,7 @@ -use std::collections::BTreeMap; -use std::process::Stdio; - -use crate::run::parser::building_blocks::{as_command_args, ArgKey, ArgValue}; -use crate::run::parser::resource::utils::{binary_path, subprocess_stdio}; -use crate::{Command, CommandGlobalOpts, GlobalArgs}; +use crate::{Command, CommandGlobalOpts}; use async_trait::async_trait; -use miette::{IntoDiagnostic, Result}; -use ockam_api::colors::color_primary; -use ockam_core::AsyncTryClone; +use miette::Result; use ockam_node::Context; -use tokio::process::{Child, Command as ProcessCommand}; use tracing::debug; /// This trait defines the methods that a resource must implement before it's parsed into a Command. @@ -21,50 +13,6 @@ pub trait Resource: Sized + Send + Sync + 'static { fn args(self) -> Vec { vec![] } - - fn run_in_new_process( - self, - global_args: &GlobalArgs, - extra_args: BTreeMap, - ) -> Result { - let mut args = self.args(); - args.append(as_command_args(extra_args).as_mut()); - if global_args.quiet { - args.push("--quiet".to_string()); - } - if global_args.no_color { - args.push("--no-color".to_string()); - } - if global_args.no_input { - args.push("--no-input".to_string()); - } - if global_args.verbose > 0 { - args.push(format!("-{}", "v".repeat(global_args.verbose as usize))); - } - if let Some(o) = &global_args.output_format { - args.push("--output".to_string()); - args.push(o.to_string()); - } - let args = Self::COMMAND_NAME - .split(' ') - .chain(args.iter().map(|s| s.as_str())); - let handle = unsafe { - ProcessCommand::new(binary_path()) - .args(args) - .stdout(subprocess_stdio(global_args.quiet)) - .stderr(subprocess_stdio(global_args.quiet)) - .stdin(Stdio::null()) - // This unsafe block will only panic if the closure panics, which shouldn't happen - .pre_exec(|| { - // Detach the process from the parent - nix::unistd::setsid().map_err(std::io::Error::from)?; - Ok(()) - }) - .spawn() - .into_diagnostic()? - }; - Ok(handle) - } } /// This trait represents a Clap command which can be validated and executed @@ -92,7 +40,7 @@ where } async fn run(&self, ctx: &Context, opts: &CommandGlobalOpts) -> Result<()> { - debug!("Running command {}\n{:?}", color_primary(self.name()), self); + debug!("running command {} {:?}", self.name(), self); Ok(self.clone().async_run_with_retry(ctx, opts.clone()).await?) } } @@ -119,12 +67,17 @@ impl ParsedCommands { /// Validate and run each command pub async fn run(self, ctx: &Context, opts: &CommandGlobalOpts) -> Result<()> { - for cmd in self.commands.into_iter() { + let len = self.commands.len(); + if len > 0 { + opts.terminal.write_line("")?; + } + for (idx, cmd) in self.commands.into_iter().enumerate() { if cmd.is_valid(ctx, opts).await? { - let ctx = ctx.async_try_clone().await.into_diagnostic()?; - cmd.run(&ctx, opts).await?; - // Newline between commands - opts.terminal.write_line("")?; + cmd.run(ctx, opts).await?; + if idx < len - 1 { + // Newline between commands + opts.terminal.write_line("")?; + } } } Ok(()) diff --git a/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs b/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs index 01552c1d55f..3ffe1182fd9 100644 --- a/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs +++ b/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs @@ -71,6 +71,7 @@ pub async fn wait_for_exit_signal( debug!("waiting for exit signal"); if !args.child_process { + opts.terminal.write_line("")?; opts.terminal.write_line(fmt_log!("{}", msg))?; } diff --git a/implementations/rust/ockam/ockam_command/src/util/mod.rs b/implementations/rust/ockam/ockam_command/src/util/mod.rs index 195615a72c2..0de8550cfe7 100644 --- a/implementations/rust/ockam/ockam_command/src/util/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/util/mod.rs @@ -188,6 +188,9 @@ pub async fn clean_nodes_multiaddr( pub fn port_is_free_guard(address: &SocketAddr) -> Result<()> { let port = address.port(); + if port == 0 { + return Ok(()); + } let ip = address.ip(); if TcpListener::bind((ip, port)).is_err() { Err(miette!( diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/nodes.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/nodes.bats index 2ee6e250ecc..88fbfdddad1 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/local/nodes.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/nodes.bats @@ -224,3 +224,23 @@ name: n1 EOF run_success "$OCKAM" node create "$OCKAM_HOME/node.yaml" } + +@test "node - create in-memory foreground node" { + # create a node in-memory + OCKAM_IN_MEMORY=true "$OCKAM" node create n1 -f -vv >$OCKAM_HOME/node.logs & + pid=$! + sleep 2 + + # check logs + run_success cat $OCKAM_HOME/node.logs + assert_output --partial "Created a new Node named n1" + + # no database or files should be created + run_failure ls -l "$OCKAM_HOME/nodes" + run_failure ls -l "$OCKAM_HOME/database.sqlite3" + run_success $OCKAM node show n1 + assert_output "[]" + + # stop the node + run_success kill -9 $pid +} diff --git a/implementations/rust/ockam/ockam_command/tests/bats/orchestrator_enroll/nodes.bats b/implementations/rust/ockam/ockam_command/tests/bats/orchestrator_enroll/nodes.bats index d1031dd31fb..0cb4bd57a61 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/orchestrator_enroll/nodes.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/orchestrator_enroll/nodes.bats @@ -91,6 +91,26 @@ EOF run_success curl -sfI --retry-connrefused --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$CLIENT_PORT" } +@test "nodes - in-memory, create with config, single machine, unnamed portal" { + ADMIN_HOME_DIR="$OCKAM_HOME" + ticket_path="$ADMIN_HOME_DIR/enrollment.ticket" + export RELAY_NAME=$(random_str) + $OCKAM project ticket --usage-count 5 --relay $RELAY_NAME >"$ticket_path" + + setup_home_dir + export NODE_PORT=$(random_port) + export CLIENT_PORT=$(random_port) + OCKAM_IN_MEMORY=true "$OCKAM" node create "$BATS_TEST_DIRNAME/fixtures/node-create.1.unnamed-portal.config.yaml" \ + --variable SERVICE_PORT="$PYTHON_SERVER_PORT" --enrollment-ticket $ticket_path -f & + pid=$! + sleep 5 + + # portal is working: inlet -> relay -> outlet -> python server + run_success curl -sfI --retry-connrefused --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$CLIENT_PORT" + + kill -9 $pid +} + @test "nodes - create with config, single machine, named portal" { export RELAY_NAME=$(random_str) export CLIENT_PORT=$(random_port) diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/database_configuration.rs b/implementations/rust/ockam/ockam_node/src/storage/database/database_configuration.rs index 1aea9be5fbb..511cd46074e 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/database_configuration.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/database_configuration.rs @@ -5,6 +5,8 @@ use ockam_core::{Error, Result}; use std::fs::create_dir_all; use std::path::{Path, PathBuf}; +/// Use an in-memory SQLite database +pub const OCKAM_IN_MEMORY: &str = "OCKAM_IN_MEMORY"; /// Database host environment variable pub const OCKAM_POSTGRES_HOST: &str = "OCKAM_POSTGRES_HOST"; /// Database port environment variable