From 9a9ac59b46527ac4bb69e826f14648ce9d6c77dd Mon Sep 17 00:00:00 2001 From: Oleksandr Deundiak Date: Mon, 30 Dec 2024 22:30:29 +0100 Subject: [PATCH] chore(rust): add low latency example --- Cargo.lock | 11 ++ Cargo.toml | 1 + .../rust/low_latency_portal/.rustfmt.toml | 1 + examples/rust/low_latency_portal/Cargo.toml | 15 +++ examples/rust/low_latency_portal/README.md | 1 + .../examples/config-generator.rs | 90 +++++++++++++++ .../low_latency_portal/examples/inlet-node.rs | 105 ++++++++++++++++++ .../examples/outlet-node.rs | 94 ++++++++++++++++ .../low_latency_portal/examples/relay-node.rs | 66 +++++++++++ .../rust/low_latency_portal/src/configs.rs | 43 +++++++ examples/rust/low_latency_portal/src/lib.rs | 3 + .../src/secure_channel/options.rs | 1 + 12 files changed, 431 insertions(+) create mode 100644 examples/rust/low_latency_portal/.rustfmt.toml create mode 100644 examples/rust/low_latency_portal/Cargo.toml create mode 100644 examples/rust/low_latency_portal/README.md create mode 100644 examples/rust/low_latency_portal/examples/config-generator.rs create mode 100644 examples/rust/low_latency_portal/examples/inlet-node.rs create mode 100644 examples/rust/low_latency_portal/examples/outlet-node.rs create mode 100644 examples/rust/low_latency_portal/examples/relay-node.rs create mode 100644 examples/rust/low_latency_portal/src/configs.rs create mode 100644 examples/rust/low_latency_portal/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 80d8ca87c62..2e01782417b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3964,6 +3964,17 @@ dependencies = [ "value-bag", ] +[[package]] +name = "low_latency_portal" +version = "0.1.0" +dependencies = [ + "hex", + "log", + "ockam", + "serde", + "serde_json", +] + [[package]] name = "lz4" version = "1.28.0" diff --git a/Cargo.toml b/Cargo.toml index fb034b5285c..21553c568ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "examples/rust/file_transfer", "examples/rust/get_started", "examples/rust/tcp_inlet_and_outlet", + "examples/rust/low_latency_portal", "examples/rust/mitm_node", "implementations/rust/ockam/*", "tools/docs/example_blocks", diff --git a/examples/rust/low_latency_portal/.rustfmt.toml b/examples/rust/low_latency_portal/.rustfmt.toml new file mode 100644 index 00000000000..75306517965 --- /dev/null +++ b/examples/rust/low_latency_portal/.rustfmt.toml @@ -0,0 +1 @@ +max_width = 120 diff --git a/examples/rust/low_latency_portal/Cargo.toml b/examples/rust/low_latency_portal/Cargo.toml new file mode 100644 index 00000000000..e69475838ac --- /dev/null +++ b/examples/rust/low_latency_portal/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "low_latency_portal" +version = "0.1.0" +authors = ["Ockam Developers"] +edition = "2021" +license = "Apache-2.0" +publish = false +rust-version = "1.70.0" + +[dependencies] +ockam = { path = "../../../implementations/rust/ockam/ockam", features = ["aws-lc"] } +serde = { version = "1.0.215", features = ["derive"] } +serde_json = "1.0" +hex = "0.4.3" +log = "0.4.22" diff --git a/examples/rust/low_latency_portal/README.md b/examples/rust/low_latency_portal/README.md new file mode 100644 index 00000000000..9162e4bfedd --- /dev/null +++ b/examples/rust/low_latency_portal/README.md @@ -0,0 +1 @@ +# Low latency portal diff --git a/examples/rust/low_latency_portal/examples/config-generator.rs b/examples/rust/low_latency_portal/examples/config-generator.rs new file mode 100644 index 00000000000..8d43f4f43a8 --- /dev/null +++ b/examples/rust/low_latency_portal/examples/config-generator.rs @@ -0,0 +1,90 @@ +use low_latency_portal::{InletConfig, OutletConfig, RelayConfig}; +use ockam::compat::rand::{thread_rng, RngCore}; +use ockam::identity::{Identities, IdentityBuilder, Vault}; +use ockam::transport::HostnamePort; +use ockam::vault::{ + EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigning, EDDSA_CURVE25519_SECRET_KEY_LENGTH, +}; +use ockam::{Context, Result}; + +#[ockam::node] +async fn main(ctx: Context) -> Result<()> { + let identity_vault = SoftwareVaultForSigning::create().await?; + + let mut inlet_key_binary = [0u8; EDDSA_CURVE25519_SECRET_KEY_LENGTH]; + let mut outlet_key_binary = [0u8; EDDSA_CURVE25519_SECRET_KEY_LENGTH]; + let mut relay_key_binary = [0u8; EDDSA_CURVE25519_SECRET_KEY_LENGTH]; + + { + let mut rng = thread_rng(); + + rng.fill_bytes(&mut inlet_key_binary); + rng.fill_bytes(&mut outlet_key_binary); + rng.fill_bytes(&mut relay_key_binary); + } + + let inlet_key = identity_vault + .import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new( + inlet_key_binary, + ))) + .await?; + let outlet_key = identity_vault + .import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new( + outlet_key_binary, + ))) + .await?; + let relay_key = identity_vault + .import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new( + relay_key_binary, + ))) + .await?; + + let mut vault = Vault::create().await?; + vault.identity_vault = identity_vault; + + let identities = Identities::builder().await?.with_vault(vault).build(); + let inlet_identifier = IdentityBuilder::new(identities.identities_creation()) + .with_existing_key(inlet_key) + .build() + .await?; + let outlet_identifier = IdentityBuilder::new(identities.identities_creation()) + .with_existing_key(outlet_key) + .build() + .await?; + let relay_identifier = IdentityBuilder::new(identities.identities_creation()) + .with_existing_key(relay_key) + .build() + .await?; + + let inlet_config = InletConfig { + inlet_change_history: hex::encode(identities.export_identity(&inlet_identifier).await?), + inlet_identity_key: hex::encode(inlet_key_binary), + inlet_address: HostnamePort::new("0.0.0.0", 4000), + relay_address: HostnamePort::new("127.0.0.1", 4001), + outlet_identifier: outlet_identifier.to_string(), + outlet_relay_name: "outlet_relay".to_string(), + }; + + let outlet_config = OutletConfig { + outlet_change_history: hex::encode(identities.export_identity(&outlet_identifier).await?), + outlet_identity_key: hex::encode(outlet_key_binary), + outlet_relay_name: "outlet_relay".to_string(), + outlet_peer_address: HostnamePort::new("127.0.0.1", 5000), + relay_identifier: relay_identifier.to_string(), + relay_address: HostnamePort::new("127.0.0.1", 4001), + inlet_identifiers: vec![inlet_identifier.to_string()], + }; + + let relay_config = RelayConfig { + relay_change_history: hex::encode(identities.export_identity(&relay_identifier).await?), + relay_identity_key: hex::encode(relay_key_binary), + outlet_identifier: outlet_identifier.to_string(), + relay_listener_address: HostnamePort::new("0.0.0.0", 4001), + }; + + std::fs::write("inlet.config.json", serde_json::to_vec(&inlet_config).unwrap()).unwrap(); + std::fs::write("outlet.config.json", serde_json::to_vec(&outlet_config).unwrap()).unwrap(); + std::fs::write("relay.config.json", serde_json::to_vec(&relay_config).unwrap()).unwrap(); + + ctx.stop().await +} diff --git a/examples/rust/low_latency_portal/examples/inlet-node.rs b/examples/rust/low_latency_portal/examples/inlet-node.rs new file mode 100644 index 00000000000..7bf535eaa05 --- /dev/null +++ b/examples/rust/low_latency_portal/examples/inlet-node.rs @@ -0,0 +1,105 @@ +use log::info; +use low_latency_portal::{parse, InletConfig}; +use ockam::identity::models::ChangeHistory; +use ockam::identity::{Identifier, SecureChannelOptions, SecureChannels, TrustIdentifierPolicy, Vault}; +use ockam::tcp::{TcpConnectionOptions, TcpInletOptions, TcpTransport}; +use ockam::vault::{EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigning}; +use ockam::{route, Context, Result}; +use std::str::FromStr; + +#[ockam::node] +async fn main(ctx: Context) -> Result<()> { + let args: Vec = std::env::args().collect(); + + info!("A"); + let config = if let Some(config) = args.get(1) { + config.clone() + } else { + let config = std::fs::read("inlet.config.json").unwrap(); + String::from_utf8(config).unwrap() + }; + + info!("B"); + let config: InletConfig = parse(&config)?; + info!("C"); + + let outlet_identifier = Identifier::from_str(&config.outlet_identifier)?; + let inlet_identity_key = hex::decode(config.inlet_identity_key).unwrap(); + let inlet_change_history = ChangeHistory::import_from_string(&config.inlet_change_history)?; + + info!("D"); + + let tcp = TcpTransport::create(&ctx).await?; + + info!("E"); + + let identity_vault = SoftwareVaultForSigning::create().await?; // FIXME: 16ms + + info!("F"); + + let relay_identity_key = EdDSACurve25519SecretKey::new(inlet_identity_key.try_into().unwrap()); + + info!("G"); + let relay_identity_key = SigningSecret::EdDSACurve25519(relay_identity_key); + + info!("H"); + + identity_vault.import_key(relay_identity_key).await?; + + info!("J"); + + let mut vault = Vault::create().await?; // FIXME: 25ms + vault.identity_vault = identity_vault; + + info!("K"); + + let secure_channels = SecureChannels::builder().await?.with_vault(vault).build(); // FIXME: 16 ms + + info!("L"); + + let inlet_identifier = secure_channels + .identities() + .identities_verification() + .import_from_change_history(None, inlet_change_history) + .await?; + + info!("M"); + + let tcp_connection_to_relay = tcp + .connect(config.relay_address.to_string(), TcpConnectionOptions::new()) + .await?; + + info!("N"); + + let secure_channel_to_outlet = secure_channels + .create_secure_channel( + &ctx, + &inlet_identifier, + route![ + tcp_connection_to_relay, + format!("forward_to_{}", config.outlet_relay_name), + "api" + ], + SecureChannelOptions::new().with_trust_policy(TrustIdentifierPolicy::new(outlet_identifier)), + ) + .await?; + + info!("O"); + + tcp.create_inlet( + config.inlet_address.to_string(), + route![secure_channel_to_outlet, "outlet"], + TcpInletOptions::new(), + ) + .await?; + + info!("P"); + + info!("Initialized successfully"); + + ctx.stop().await?; + + info!("Q"); + + Ok(()) +} diff --git a/examples/rust/low_latency_portal/examples/outlet-node.rs b/examples/rust/low_latency_portal/examples/outlet-node.rs new file mode 100644 index 00000000000..cfe770c6bd4 --- /dev/null +++ b/examples/rust/low_latency_portal/examples/outlet-node.rs @@ -0,0 +1,94 @@ +use log::info; +use low_latency_portal::{parse, OutletConfig}; +use ockam::identity::models::ChangeHistory; +use ockam::identity::{ + Identifier, SecureChannelListenerOptions, SecureChannelOptions, SecureChannels, TrustIdentifierPolicy, + TrustMultiIdentifiersPolicy, Vault, +}; +use ockam::remote::{RemoteRelay, RemoteRelayOptions}; +use ockam::tcp::{TcpConnectionOptions, TcpOutletOptions, TcpTransport}; +use ockam::vault::{EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigning}; +use ockam::{route, Context, Result}; +use std::str::FromStr; + +#[ockam::node] +async fn main(ctx: Context) -> Result<()> { + let args: Vec = std::env::args().collect(); + + let config = if let Some(config) = args.get(1) { + config.clone() + } else { + let config = std::fs::read("outlet.config.json").unwrap(); + String::from_utf8(config).unwrap() + }; + + let config: OutletConfig = parse(&config)?; + + let relay_identifier = Identifier::from_str(&config.relay_identifier)?; + let outlet_identity_key = hex::decode(config.outlet_identity_key).unwrap(); + let inlet_identifiers = config + .inlet_identifiers + .iter() + .map(|i| Identifier::from_str(i).unwrap()) + .collect(); + let outlet_change_history = ChangeHistory::import_from_string(&config.outlet_change_history)?; + + let tcp = TcpTransport::create(&ctx).await?; + + let identity_vault = SoftwareVaultForSigning::create().await?; + let outlet_identity_key = EdDSACurve25519SecretKey::new(outlet_identity_key.try_into().unwrap()); + let outlet_identity_key = SigningSecret::EdDSACurve25519(outlet_identity_key); + identity_vault.import_key(outlet_identity_key).await?; + + let mut vault = Vault::create().await?; + vault.identity_vault = identity_vault; + + let secure_channels = SecureChannels::builder().await?.with_vault(vault).build(); + + let outlet_identifier = secure_channels + .identities() + .identities_verification() + .import_from_change_history(None, outlet_change_history) + .await?; + + let tcp_connection_options = TcpConnectionOptions::new(); + let secure_channel_options = + SecureChannelOptions::new().with_trust_policy(TrustIdentifierPolicy::new(relay_identifier)); + let secure_channel_listener_options = SecureChannelListenerOptions::new() + .as_consumer(&secure_channel_options.producer_flow_control_id()) + .with_trust_policy(TrustMultiIdentifiersPolicy::new(inlet_identifiers)); + let tcp_outlet_options = + TcpOutletOptions::new().as_consumer(&secure_channel_listener_options.spawner_flow_control_id()); + + tcp.create_outlet("outlet", config.outlet_peer_address, tcp_outlet_options) + .await?; + + secure_channels + .create_secure_channel_listener(&ctx, &outlet_identifier, "api", secure_channel_listener_options) + .await?; + + let tcp_connection_to_relay = tcp + .connect(config.relay_address.to_string(), tcp_connection_options) + .await?; + let secure_channel_to_relay = secure_channels + .create_secure_channel( + &ctx, + &outlet_identifier, + route![tcp_connection_to_relay, "api"], + secure_channel_options, + ) + .await?; + + let relay_options = RemoteRelayOptions::new(); + RemoteRelay::create_static( + &ctx, + route![secure_channel_to_relay], + config.outlet_relay_name, + relay_options, + ) + .await?; + + info!("Initialized successfully"); + + Ok(()) +} diff --git a/examples/rust/low_latency_portal/examples/relay-node.rs b/examples/rust/low_latency_portal/examples/relay-node.rs new file mode 100644 index 00000000000..a4815329840 --- /dev/null +++ b/examples/rust/low_latency_portal/examples/relay-node.rs @@ -0,0 +1,66 @@ +use log::info; +use low_latency_portal::{parse, RelayConfig}; +use ockam::identity::models::ChangeHistory; +use ockam::identity::{Identifier, SecureChannelListenerOptions, SecureChannels, TrustIdentifierPolicy, Vault}; +use ockam::tcp::{TcpListenerOptions, TcpTransport}; +use ockam::vault::{EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigning}; +use ockam::{Context, RelayService, RelayServiceOptions, Result}; +use std::str::FromStr; + +#[ockam::node] +async fn main(ctx: Context) -> Result<()> { + let args: Vec = std::env::args().collect(); + + let config = if let Some(config) = args.get(1) { + config.clone() + } else { + let config = std::fs::read("relay.config.json").unwrap(); + String::from_utf8(config).unwrap() + }; + + let config: RelayConfig = parse(&config)?; + + let outlet_identifier = Identifier::from_str(&config.outlet_identifier)?; + let relay_identity_key = hex::decode(config.relay_identity_key).unwrap(); + let relay_change_history = ChangeHistory::import_from_string(&config.relay_change_history)?; + + let tcp = TcpTransport::create(&ctx).await?; + + let identity_vault = SoftwareVaultForSigning::create().await?; + let relay_identity_key = EdDSACurve25519SecretKey::new(relay_identity_key.try_into().unwrap()); + let relay_identity_key = SigningSecret::EdDSACurve25519(relay_identity_key); + identity_vault.import_key(relay_identity_key).await?; + + let mut vault = Vault::create().await?; + vault.identity_vault = identity_vault; + + let secure_channels = SecureChannels::builder().await?.with_vault(vault).build(); + + let relay_identifier = secure_channels + .identities() + .identities_verification() + .import_from_change_history(None, relay_change_history) + .await?; + + let tcp_listener_options = TcpListenerOptions::new(); + let secure_channel_listener_options = SecureChannelListenerOptions::new() + .as_consumer(&tcp_listener_options.spawner_flow_control_id()) + .with_trust_policy(TrustIdentifierPolicy::new(outlet_identifier)); + let relay_service_options = RelayServiceOptions::new() + .service_as_consumer(&secure_channel_listener_options.spawner_flow_control_id()) + .relay_as_consumer(&tcp_listener_options.spawner_flow_control_id()) + .prefix("forward_to_"); + + RelayService::create(&ctx, "static_forwarding_service", relay_service_options).await?; + + secure_channels + .create_secure_channel_listener(&ctx, &relay_identifier, "api", secure_channel_listener_options) + .await?; + + tcp.listen(config.relay_listener_address.to_string(), tcp_listener_options) + .await?; + + info!("Initialized successfully"); + + Ok(()) +} diff --git a/examples/rust/low_latency_portal/src/configs.rs b/examples/rust/low_latency_portal/src/configs.rs new file mode 100644 index 00000000000..fbcfa2a41a2 --- /dev/null +++ b/examples/rust/low_latency_portal/src/configs.rs @@ -0,0 +1,43 @@ +use ockam::errcode::{Kind, Origin}; +use ockam::transport::HostnamePort; +use ockam::{Error, Result}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct InletConfig { + pub inlet_change_history: String, + pub inlet_identity_key: String, + pub inlet_address: HostnamePort, + pub relay_address: HostnamePort, + pub outlet_identifier: String, + pub outlet_relay_name: String, +} + +#[derive(Serialize, Deserialize)] +pub struct OutletConfig { + pub outlet_change_history: String, + pub outlet_identity_key: String, + pub outlet_relay_name: String, + pub outlet_peer_address: HostnamePort, + pub relay_identifier: String, + pub relay_address: HostnamePort, + pub inlet_identifiers: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct RelayConfig { + pub relay_change_history: String, + pub relay_identity_key: String, + pub outlet_identifier: String, + pub relay_listener_address: HostnamePort, +} + +pub fn parse<'a, T: Deserialize<'a>>(config: &'a str) -> Result { + serde_json::from_str(config).map_err(|err| { + Error::new( + Origin::Application, + Kind::Misuse, + format!("Invalid config json: {}", err), + ) + }) +} diff --git a/examples/rust/low_latency_portal/src/lib.rs b/examples/rust/low_latency_portal/src/lib.rs new file mode 100644 index 00000000000..f93e0db7574 --- /dev/null +++ b/examples/rust/low_latency_portal/src/lib.rs @@ -0,0 +1,3 @@ +mod configs; + +pub use configs::*; diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs index 9166ee4e11d..b2f408b0051 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs @@ -99,6 +99,7 @@ impl SecureChannelOptions { /// Freshly generated [`FlowControlId`] pub fn producer_flow_control_id(&self) -> FlowControlId { + // TODO: Return a reference instead self.flow_control_id.clone() }