Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): rewrite ockam_node #8739

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions examples/rust/file_transfer/examples/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Worker for FileReception {
Ok(n) => {
self.written_size += n;
if self.written_size == self.size {
ctx.stop().await?;
ctx.shutdown_node().await?;
}
}
Err(e) => {
Expand All @@ -71,7 +71,7 @@ impl Worker for FileReception {
);
}
}
FileData::Quit => ctx.stop().await?,
FileData::Quit => ctx.shutdown_node().await?,
}

Ok(())
Expand All @@ -81,7 +81,7 @@ impl Worker for FileReception {
#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an Identity to represent Receiver.
let receiver = node.create_identity().await?;
Expand All @@ -90,13 +90,14 @@ async fn main(ctx: Context) -> Result<()> {
let secure_channel_listener_options =
SecureChannelListenerOptions::new().as_consumer(&tcp_options.flow_control_id());

node.flow_controls()
.add_consumer("receiver", &secure_channel_listener_options.spawner_flow_control_id());
node.flow_controls().add_consumer(
&"receiver".into(),
&secure_channel_listener_options.spawner_flow_control_id(),
);

// Create a secure channel listener for Receiver that will wait for requests to
// initiate an Authenticated Key Exchange.
node.create_secure_channel_listener(&receiver, "listener", secure_channel_listener_options)
.await?;
node.create_secure_channel_listener(&receiver, "listener", secure_channel_listener_options)?;

// The computer that is running this program is likely within a private network and
// not accessible over the internet.
Expand All @@ -116,8 +117,8 @@ async fn main(ctx: Context) -> Result<()> {
println!("{}", relay.remote_address());

// Start a worker, of type FileReception, at address "receiver".
node.start_worker("receiver", FileReception::default()).await?;
node.start_worker("receiver", FileReception::default())?;

// We won't call ctx.stop() here, this program will quit when the file will be entirely received
// We won't call ctx.shutdown_node() here, this program will quit when the file will be entirely received
Ok(())
}
4 changes: 2 additions & 2 deletions examples/rust/file_transfer/examples/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main(ctx: Context) -> Result<()> {
let opt = Sender::parse();

let node = node(ctx).await?;
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an Identity to represent Sender.
let sender = node.create_identity().await?;
Expand Down Expand Up @@ -94,6 +94,6 @@ async fn main(ctx: Context) -> Result<()> {
}
}

// We won't call ctx.stop() here, this program will run until you stop it with Ctrl-C
// We won't call ctx.shutdown_node() here, this program will run until you stop it with Ctrl-C
Ok(())
}
2 changes: 1 addition & 1 deletion examples/rust/get_started/examples/01-node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Stop the node as soon as it starts.
node.stop().await
node.shutdown().await
}
4 changes: 2 additions & 2 deletions examples/rust/get_started/examples/02-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start a worker, of type Echoer, at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Send a message to the worker at address "echoer".
node.send("echoer", "Hello Ockam!".to_string()).await?;
Expand All @@ -19,5 +19,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
10 changes: 5 additions & 5 deletions examples/rust/get_started/examples/03-routing-many-hops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start an Echoer worker at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Start 3 hop workers at addresses "h1", "h2" and "h3".
node.start_worker("h1", Hop).await?;
node.start_worker("h2", Hop).await?;
node.start_worker("h3", Hop).await?;
node.start_worker("h1", Hop)?;
node.start_worker("h2", Hop)?;
node.start_worker("h3", Hop)?;

// Send a message to the echoer worker via the "h1", "h2", and "h3" workers
let r = route!["h1", "h2", "h3", "echoer"];
Expand All @@ -25,5 +25,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
6 changes: 3 additions & 3 deletions examples/rust/get_started/examples/03-routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start a worker, of type Echoer, at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Start a worker, of type Hop, at address "h1"
node.start_worker("h1", Hop).await?;
node.start_worker("h1", Hop)?;

// Send a message to the worker at address "echoer",
// via the worker at address "h1"
Expand All @@ -23,5 +23,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Initialize the TCP Transport.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to a different node.
let connection_to_responder = tcp.connect("localhost:4000", TcpConnectionOptions::new()).await?;
Expand All @@ -22,5 +22,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:4000", TcpListenerOptions::new()).await?;

// Allow access to the Echoer via TCP connections from the TCP listener
node.flow_controls().add_consumer("echoer", listener.flow_control_id());
node.flow_controls()
.add_consumer(&"echoer".into(), listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to the middle node.
let connection_to_middle_node = tcp.connect("localhost:3000", TcpConnectionOptions::new()).await?;
Expand All @@ -21,5 +21,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,21 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to the responder node.
let connection_to_responder = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?;

// Create and start a Relay worker
node.start_worker("forward_to_responder", Relay::new(connection_to_responder))
.await?;
node.start_worker("forward_to_responder", Relay::new(connection_to_responder))?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?;

// Allow access to the Relay via TCP connections from the TCP listener
node.flow_controls()
.add_consumer("forward_to_responder", listener.flow_control_id());
.add_consumer(&"forward_to_responder".into(), listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:4000", TcpListenerOptions::new()).await?;

// Allow access to the Echoer via TCP connections from the TCP listener
node.flow_controls().add_consumer("echoer", listener.flow_control_id());
node.flow_controls()
.add_consumer(&"echoer".into(), listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ async fn main(ctx: Context) -> Result<()> {
.await?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

node.flow_controls().add_consumer("echoer", bind.flow_control_id());
node.flow_controls()
.add_consumer(&"echoer".into(), bind.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ async fn main(ctx: Context) -> Result<()> {
uds.listen("/tmp/ockam-example-echoer").await?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main(ctx: Context) -> Result<()> {
let alice = node.create_identity().await?;

// Create a TCP connection to the middle node.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;
let connection_to_middle_node = tcp.connect("localhost:3000", TcpConnectionOptions::new()).await?;

// Connect to a secure channel listener and perform a handshake.
Expand All @@ -31,5 +31,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to Bob.
let connection_to_bob = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?;

// Start a Relay to forward messages to Bob using the TCP connection.
node.start_worker("forward_to_bob", Relay::new(route![connection_to_bob]))
.await?;
node.start_worker("forward_to_bob", Relay::new(route![connection_to_bob]))?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?;

node.flow_controls()
.add_consumer("forward_to_bob", listener.flow_control_id());
.add_consumer(&"forward_to_bob".into(), listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

let bob = node.create_identity().await?;

Expand All @@ -23,18 +23,16 @@ async fn main(ctx: Context) -> Result<()> {

// Create a secure channel listener for Bob that will wait for requests to
// initiate an Authenticated Key Exchange.
let secure_channel_listener = node
.create_secure_channel_listener(
&bob,
"bob_listener",
SecureChannelListenerOptions::new().as_consumer(listener.flow_control_id()),
)
.await?;
let secure_channel_listener = node.create_secure_channel_listener(
&bob,
"bob_listener",
SecureChannelListenerOptions::new().as_consumer(listener.flow_control_id()),
)?;

// Allow access to the Echoer via Secure Channels
node.flow_controls()
.add_consumer("echoer", secure_channel_listener.flow_control_id());
.add_consumer(&"echoer".into(), secure_channel_listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Loading
Loading