Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rust): add low latency example #8725

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"examples/rust/file_transfer",
"examples/rust/get_started",
"examples/rust/tcp_inlet_and_outlet",
"examples/rust/low_latency_portal",
"examples/rust/mitm_node",
"implementations/rust/ockam/*",
"tools/docs/example_blocks",
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/get_started/examples/bob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl Worker for Echoer {
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
println!("\n[✓] Address: {}, Received: {:?}", ctx.address(), msg);
println!("\n[✓] Address: {}, Received: {:?}", ctx.primary_address(), msg);

// Echo the message body back on its return_route.
ctx.send(msg.return_route().clone(), msg.into_body()?).await
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/get_started/src/echoer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ impl Worker for Echoer {
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
println!("Address: {}, Received: {:?}", ctx.address(), msg);
println!("Address: {}, Received: {:?}", ctx.primary_address(), msg);

// Echo the message body back on its return_route.
ctx.send(msg.return_route().clone(), msg.into_body()?).await
Expand Down
4 changes: 2 additions & 2 deletions examples/rust/get_started/src/hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ impl Worker for Hop {
/// This handle function takes any incoming message and forwards
/// it to the next hop in it's onward route
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
println!("Address: {}, Received: {:?}", ctx.address(), msg);
println!("Address: {}, Received: {:?}", ctx.primary_address(), msg);

// Send the message to the next worker on its onward_route
ctx.forward(msg.into_local_message().step_forward(&ctx.address())?)
ctx.forward(msg.into_local_message().step_forward(ctx.primary_address())?)
.await
}
}
10 changes: 7 additions & 3 deletions examples/rust/get_started/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ impl Worker for Logger {
let payload = local_msg.payload_ref();

if let Ok(str) = String::from_utf8(payload.to_vec()) {
println!("Address: {}, Received string: {}", ctx.address(), str);
println!("Address: {}, Received string: {}", ctx.primary_address(), str);
} else {
println!("Address: {}, Received binary: {}", ctx.address(), hex::encode(payload));
println!(
"Address: {}, Received binary: {}",
ctx.primary_address(),
hex::encode(payload)
);
}

ctx.forward(local_msg.step_forward(&ctx.address())?).await
ctx.forward(local_msg.step_forward(ctx.primary_address())?).await
}
}
2 changes: 1 addition & 1 deletion examples/rust/get_started/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Worker for Relay {
/// This handle function takes any incoming message and forwards
/// it to the next hop in it's onward route
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
println!("Address: {}, Received: {:?}", ctx.address(), msg);
println!("Address: {}, Received: {:?}", ctx.primary_address(), msg);

let next_on_route = self.route.next()?.clone();

Expand Down
1 change: 1 addition & 0 deletions examples/rust/low_latency_portal/.rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
max_width = 120
16 changes: 16 additions & 0 deletions examples/rust/low_latency_portal/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "low_latency_portal"
version = "0.1.0"
authors = ["Ockam Developers"]
edition = "2021"
license = "Apache-2.0"
publish = false
rust-version = "1.70.0"

[dependencies]
async-trait = "0.1"
hex = "0.4.3"
log = "0.4.22"
ockam = { path = "../../../implementations/rust/ockam/ockam", features = ["aws-lc"] }
serde = { version = "1.0.215", features = ["derive"] }
serde_json = "1.0"
1 change: 1 addition & 0 deletions examples/rust/low_latency_portal/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Low latency portal
90 changes: 90 additions & 0 deletions examples/rust/low_latency_portal/examples/config-generator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use low_latency_portal::{InletConfig, OutletConfig, RelayConfig};
use ockam::compat::rand::{thread_rng, RngCore};
use ockam::identity::{Identities, IdentityBuilder, Vault};
use ockam::transport::HostnamePort;
use ockam::vault::{
EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigning, EDDSA_CURVE25519_SECRET_KEY_LENGTH,
};
use ockam::{Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
let identity_vault = SoftwareVaultForSigning::create().await?;

let mut inlet_key_binary = [0u8; EDDSA_CURVE25519_SECRET_KEY_LENGTH];
let mut outlet_key_binary = [0u8; EDDSA_CURVE25519_SECRET_KEY_LENGTH];
let mut relay_key_binary = [0u8; EDDSA_CURVE25519_SECRET_KEY_LENGTH];

{
let mut rng = thread_rng();

rng.fill_bytes(&mut inlet_key_binary);
rng.fill_bytes(&mut outlet_key_binary);
rng.fill_bytes(&mut relay_key_binary);
}

let inlet_key = identity_vault
.import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new(
inlet_key_binary,
)))
.await?;
let outlet_key = identity_vault
.import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new(
outlet_key_binary,
)))
.await?;
let relay_key = identity_vault
.import_key(SigningSecret::EdDSACurve25519(EdDSACurve25519SecretKey::new(
relay_key_binary,
)))
.await?;

let mut vault = Vault::create().await?;
vault.identity_vault = identity_vault;

let identities = Identities::builder().await?.with_vault(vault).build();
let inlet_identifier = IdentityBuilder::new(identities.identities_creation())
.with_existing_key(inlet_key)
.build()
.await?;
let outlet_identifier = IdentityBuilder::new(identities.identities_creation())
.with_existing_key(outlet_key)
.build()
.await?;
let relay_identifier = IdentityBuilder::new(identities.identities_creation())
.with_existing_key(relay_key)
.build()
.await?;

let inlet_config = InletConfig {
inlet_change_history: hex::encode(identities.export_identity(&inlet_identifier).await?),
inlet_identity_key: hex::encode(inlet_key_binary),
inlet_address: HostnamePort::new("0.0.0.0", 4000),
relay_address: HostnamePort::new("127.0.0.1", 4001),
outlet_identifier: outlet_identifier.to_string(),
outlet_relay_name: "outlet_relay".to_string(),
};

let outlet_config = OutletConfig {
outlet_change_history: hex::encode(identities.export_identity(&outlet_identifier).await?),
outlet_identity_key: hex::encode(outlet_key_binary),
outlet_relay_name: "outlet_relay".to_string(),
outlet_peer_address: HostnamePort::new("127.0.0.1", 5000),
relay_identifier: relay_identifier.to_string(),
relay_address: HostnamePort::new("127.0.0.1", 4001),
inlet_identifiers: vec![inlet_identifier.to_string()],
};

let relay_config = RelayConfig {
relay_change_history: hex::encode(identities.export_identity(&relay_identifier).await?),
relay_identity_key: hex::encode(relay_key_binary),
outlet_identifier: outlet_identifier.to_string(),
relay_listener_address: HostnamePort::new("0.0.0.0", 4001),
};

std::fs::write("inlet.config.json", serde_json::to_vec(&inlet_config).unwrap()).unwrap();
std::fs::write("outlet.config.json", serde_json::to_vec(&outlet_config).unwrap()).unwrap();
std::fs::write("relay.config.json", serde_json::to_vec(&relay_config).unwrap()).unwrap();

ctx.stop().await
}
96 changes: 96 additions & 0 deletions examples/rust/low_latency_portal/examples/inlet-node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use log::info;
use low_latency_portal::{parse, HashMapRepository, InletConfig};
use ockam::compat::str::FromStr;
use ockam::compat::sync::Arc;
use ockam::identity::models::ChangeHistory;
use ockam::identity::{
Identifier, Identities, SecureChannelOptions, SecureChannelRegistry, SecureChannels, TrustIdentifierPolicy, Vault,
};
use ockam::tcp::{TcpConnectionOptions, TcpInletOptions, TcpTransport};
use ockam::vault::{
EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSecureChannels, SoftwareVaultForSigning,
SoftwareVaultForVerifyingSignatures,
};
use ockam::{route, Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
let args: Vec<String> = std::env::args().collect();

let config = if let Some(config) = args.get(1) {
config.clone()
} else {
let config = std::fs::read("inlet.config.json").unwrap();
String::from_utf8(config).unwrap()
};

let config: InletConfig = parse(&config)?;

let outlet_identifier = Identifier::from_str(&config.outlet_identifier)?;
let inlet_identity_key = hex::decode(config.inlet_identity_key).unwrap();
let inlet_change_history = ChangeHistory::import_from_string(&config.inlet_change_history)?;

let hash_map_storage = Arc::new(HashMapRepository::default());

let tcp = TcpTransport::create(&ctx).await?;

let identity_vault = Arc::new(SoftwareVaultForSigning::new(hash_map_storage.clone()));
let secure_channel_vault = Arc::new(SoftwareVaultForSecureChannels::new(hash_map_storage.clone()));
let credential_vault = Arc::new(SoftwareVaultForSigning::new(hash_map_storage.clone()));
let verifying_vault = Arc::new(SoftwareVaultForVerifyingSignatures::new());

let relay_identity_key = EdDSACurve25519SecretKey::new(inlet_identity_key.try_into().unwrap());

let relay_identity_key = SigningSecret::EdDSACurve25519(relay_identity_key);

identity_vault.import_key(relay_identity_key).await?;

let vault = Vault::new(identity_vault, secure_channel_vault, credential_vault, verifying_vault);

let identities = Identities::new(
vault,
hash_map_storage.clone(),
hash_map_storage.clone(),
hash_map_storage.clone(),
hash_map_storage.clone(),
);
let secure_channels = SecureChannels::new(
Arc::new(identities),
SecureChannelRegistry::new(),
hash_map_storage.clone(),
);

let inlet_identifier = secure_channels
.identities()
.identities_verification()
.import_from_change_history(None, inlet_change_history)
.await?;

let tcp_connection_to_relay = tcp
.connect(config.relay_address.to_string(), TcpConnectionOptions::new())
.await?;

let secure_channel_to_outlet = secure_channels
.create_secure_channel(
&ctx,
&inlet_identifier,
route![
tcp_connection_to_relay,
format!("forward_to_{}", config.outlet_relay_name),
"api"
],
SecureChannelOptions::new().with_trust_policy(TrustIdentifierPolicy::new(outlet_identifier)),
)
.await?;

tcp.create_inlet(
config.inlet_address.to_string(),
route![secure_channel_to_outlet, "outlet"],
TcpInletOptions::new(),
)
.await?;

info!("Initialized successfully");

Ok(())
}
Loading
Loading