Skip to content

Commit

Permalink
feat(rust): merge Processor into Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Jan 12, 2025
1 parent 80e114f commit 3778d95
Show file tree
Hide file tree
Showing 99 changed files with 444 additions and 1,231 deletions.
1 change: 0 additions & 1 deletion examples/rust/file_transfer/examples/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ struct FileReception {

#[ockam::worker]
impl Worker for FileReception {
type Context = Context;
type Message = FileData;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Self::Message>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/examples/bob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ struct Echoer;
// echoes it back on its return route.
#[ockam::worker]
impl Worker for Echoer {
type Context = Context;
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/src/echoer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub struct Echoer;

#[ockam::worker]
impl Worker for Echoer {
type Context = Context;
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/src/hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub struct Hop;

#[ockam::worker]
impl Worker for Hop {
type Context = Context;
type Message = Any;

/// This handle function takes any incoming message and forwards
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub struct Logger;

#[ockam::worker]
impl Worker for Logger {
type Context = Context;
type Message = Any;

/// This handle function takes any incoming message and print its content as a UTF-8 string
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ impl Relay {

#[ockam::worker]
impl Worker for Relay {
type Context = Context;
type Message = Any;

/// This handle function takes any incoming message and forwards
Expand Down
4 changes: 2 additions & 2 deletions examples/rust/get_started/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ fn run_05_secure_channel_over_two_transport_hops() -> Result<(), Error> {
// Launch responder, wait for it to start up
let resp =
CmdBuilder::new("cargo run --locked --example 05-secure-channel-over-two-transport-hops-responder").spawn()?;
resp.match_stdout("Initializing ockam processor")?;
resp.match_stdout("Initializing ockam worker")?;

// Launch middle, wait for it to start up
let mid =
CmdBuilder::new("cargo run --locked --example 05-secure-channel-over-two-transport-hops-middle").spawn()?;
mid.match_stdout("Initializing ockam processor")?;
mid.match_stdout("Initializing ockam worker")?;

// Run initiator to completion
let (exitcode, stdout) =
Expand Down
14 changes: 7 additions & 7 deletions examples/rust/mitm_node/src/tcp_interceptor/workers/listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::tcp_interceptor::{Role, TcpMitmProcessor, TcpMitmRegistry, TcpMitmTransport};
use ockam_core::{async_trait, compat::net::SocketAddr};
use ockam_core::{Address, Processor, Result};
use ockam_node::Context;
use ockam_core::{Address, Result};
use ockam_node::{Context, Worker};
use ockam_transport_core::TransportError;
use tokio::net::{TcpListener, TcpStream};
use tracing::debug;
Expand Down Expand Up @@ -34,29 +34,29 @@ impl TcpMitmListenProcessor {
target_addr,
};

ctx.start_processor(address.clone(), processor)?;
ctx.start_worker(address.clone(), processor)?;

Ok((saddr, address))
}
}

#[async_trait]
impl Processor for TcpMitmListenProcessor {
type Context = Context;
impl Worker for TcpMitmListenProcessor {
type Message = ();

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
self.registry.add_listener(ctx.primary_address());

Ok(())
}

async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
async fn shutdown(&mut self, ctx: &mut Context) -> Result<()> {
self.registry.remove_listener(ctx.primary_address());

Ok(())
}

async fn process(&mut self, ctx: &mut Self::Context) -> Result<bool> {
async fn process(&mut self, ctx: &mut Context) -> Result<bool> {
debug!("Waiting for incoming TCP connection...");

let (stream, _peer) = self.inner.accept().await.map_err(TransportError::from)?;
Expand Down
12 changes: 6 additions & 6 deletions examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::tcp_interceptor::{Role, TcpMitmRegistry};
use ockam_core::compat::sync::Arc;
use ockam_core::Result;
use ockam_core::{async_trait, Address, AllowAll};
use ockam_core::{Processor, Result};
use ockam_node::compat::asynchronous::Mutex;
use ockam_node::Context;
use ockam_node::{Context, Worker};
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf};
Expand Down Expand Up @@ -47,15 +47,15 @@ impl TcpMitmProcessor {

let receiver = Self::new(address_of_other_processor, role, read_half, write_half, registry);

ctx.start_processor_with_access_control(address, receiver, AllowAll, AllowAll)?;
ctx.start_worker_with_access_control(address, receiver, AllowAll, AllowAll)?;

Ok(())
}
}

#[async_trait]
impl Processor for TcpMitmProcessor {
type Context = Context;
impl Worker for TcpMitmProcessor {
type Message = ();

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
self.registry
Expand All @@ -66,7 +66,7 @@ impl Processor for TcpMitmProcessor {
Ok(())
}

async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
async fn shutdown(&mut self, ctx: &mut Context) -> Result<()> {
self.registry.remove_processor(ctx.primary_address());

debug!("Shutdown {}", ctx.primary_address());
Expand Down
1 change: 0 additions & 1 deletion examples/rust/no_std/src/echoer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub struct Echoer;

#[ockam::worker]
impl Worker for Echoer {
type Context = Context;
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion examples/rust/no_std/src/hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub struct Hop;

#[ockam::worker]
impl Worker for Hop {
type Context = Context;
type Message = Any;

/// This handle function takes any incoming message and forwards
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/tcp_inlet_and_outlet/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn run_01_inlet_outlet_one_process() -> Result<(), Error> {
"cargo run --locked --example 01-inlet-outlet 127.0.0.1:{port} ockam.io:80"
))
.spawn()?;
runner.match_stdout(r"(?i)Starting new processor")?;
runner.match_stdout(r"(?i)Starting new worker")?;

// Run curl and check for a successful run
let (exitcode, stdout) =
Expand Down
4 changes: 2 additions & 2 deletions implementations/rust/ockam/ockam/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub use ockam_core::processor;
pub use ockam_core::worker;
pub use ockam_core::{
allow, deny, errcode, route, Address, Any, Encoded, Error, LocalMessage, Mailbox, Mailboxes,
Message, Processor, ProtocolId, Result, Route, Routed, TransportMessage, TryClone, Worker,
Message, ProtocolId, Result, Route, Routed, TransportMessage, TryClone,
};
pub use ockam_identity as identity;
// ---
Expand All @@ -78,7 +78,7 @@ pub use ockam_macros::{node, test};
pub use ockam_node::database::*;
pub use ockam_node::{
debugger, Context, DelayedEvent, Executor, MessageReceiveOptions, MessageSendReceiveOptions,
NodeBuilder, WorkerBuilder,
NodeBuilder, Worker, WorkerBuilder,
};
#[cfg(feature = "ockam_transport_tcp")]
/// TCP transport
Expand Down
40 changes: 5 additions & 35 deletions implementations/rust/ockam/ockam/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use ockam_core::compat::string::String;
use ockam_core::compat::sync::Arc;
use ockam_core::flow_control::FlowControls;
use ockam_core::{
Address, IncomingAccessControl, Message, OutgoingAccessControl, Processor, Result, Route,
Routed, TryClone, Worker,
Address, IncomingAccessControl, Message, OutgoingAccessControl, Result, Route, Routed, TryClone,
};
use ockam_identity::{
CredentialRepository, IdentitiesAttributes, IdentitiesVerification,
IdentityAttributesRepository, PurposeKeys, Vault,
};
use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions};
use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions, Worker};
use ockam_vault::storage::SecretsRepository;
use ockam_vault::SigningSecretKeyHandle;

Expand Down Expand Up @@ -161,51 +160,22 @@ impl Node {
}

/// Start a new worker instance at the given address. Default Access Control is AllowAll
pub fn start_worker<W>(&self, address: impl Into<Address>, worker: W) -> Result<()>
where
W: Worker<Context = Context>,
{
pub fn start_worker<W: Worker>(&self, address: impl Into<Address>, worker: W) -> Result<()> {
self.context.start_worker(address, worker)
}

/// Start a new worker instance at the given address with given Access Controls
pub fn start_worker_with_access_control<W>(
pub fn start_worker_with_access_control<W: Worker>(
&self,
address: impl Into<Address>,
worker: W,
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl,
) -> Result<()>
where
W: Worker<Context = Context>,
{
) -> Result<()> {
self.context
.start_worker_with_access_control(address, worker, incoming, outgoing)
}

/// Start a new processor instance at the given address. Default Access Control is DenyAll
pub fn start_processor<P>(&self, address: impl Into<Address>, processor: P) -> Result<()>
where
P: Processor<Context = Context>,
{
self.context.start_processor(address, processor)
}

/// Start a new processor instance at the given address with given Access Controls
pub fn start_processor_with_access_control<P>(
&self,
address: impl Into<Address>,
processor: P,
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl,
) -> Result<()>
where
P: Processor<Context = Context>,
{
self.context
.start_processor_with_access_control(address, processor, incoming, outgoing)
}

/// Signal to the local runtime to shut down
pub async fn shutdown(&mut self) -> Result<()> {
self.context.shutdown_node().await
Expand Down
9 changes: 4 additions & 5 deletions implementations/rust/ockam/ockam/src/relay_service/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use ockam_core::compat::sync::Arc;
use ockam_core::compat::{boxed::Box, vec::Vec};
use ockam_core::{
route, Address, AllowAll, AllowOnwardAddress, Any, IncomingAccessControl, LocalMessage,
OutgoingAccessControl, Result, Route, Routed, Worker,
OutgoingAccessControl, Result, Route, Routed,
};
use ockam_node::WorkerBuilder;
use ockam_node::{Worker, WorkerBuilder};
use tracing::info;

pub(super) struct Relay {
Expand Down Expand Up @@ -52,10 +52,9 @@ impl Relay {

#[crate::worker]
impl Worker for Relay {
type Context = Context;
type Message = Any;

async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
let payload = self
.payload
.take()
Expand All @@ -77,7 +76,7 @@ impl Worker for Relay {

async fn handle_message(
&mut self,
ctx: &mut Self::Context,
ctx: &mut Context,
msg: Routed<Self::Message>,
) -> Result<()> {
let mut local_message = msg.into_local_message();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use alloc::string::String;
use ockam_core::compat::boxed::Box;
use ockam_core::compat::sync::Arc;
use ockam_core::{
Address, DenyAll, Encodable, Mailbox, Mailboxes, Result, Routed, SecureChannelLocalInfo, Worker,
Address, DenyAll, Encodable, Mailbox, Mailboxes, Result, Routed, SecureChannelLocalInfo,
};
use ockam_node::WorkerBuilder;
use ockam_node::{Worker, WorkerBuilder};

/// Alias worker to register remote workers under local names.
///
Expand Down Expand Up @@ -62,12 +62,11 @@ impl RelayService {

#[crate::worker]
impl Worker for RelayService {
type Context = Context;
type Message = String;

async fn handle_message(
&mut self,
ctx: &mut Self::Context,
ctx: &mut Context,
message: Routed<Self::Message>,
) -> Result<()> {
let secure_channel_local_info =
Expand Down
6 changes: 3 additions & 3 deletions implementations/rust/ockam/ockam/src/remote/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use ockam_core::compat::{
boxed::Box,
string::{String, ToString},
};
use ockam_core::{Any, Decodable, Result, Routed, Worker};
use ockam_core::{Any, Decodable, Result, Routed};
use ockam_node::Worker;
use tracing::{debug, info};

#[crate::worker]
impl Worker for RemoteRelay {
type Context = Context;
type Message = Any;

async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
debug!(registration_route = %self.registration_route, "RemoteRelay initializing...");

ctx.send_from_address(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use ockam_core::api::{Method, RequestHeader, Response};
use ockam_core::compat::boxed::Box;
use ockam_core::compat::sync::Arc;
use ockam_core::compat::vec::Vec;
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
use ockam_node::Context;
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
use ockam_node::{Context, Worker};

/// This struct runs as a Worker to issue credentials based on a request/response protocol
pub struct CredentialIssuerWorker {
Expand Down Expand Up @@ -48,7 +48,6 @@ impl CredentialIssuerWorker {

#[ockam_core::worker]
impl Worker for CredentialIssuerWorker {
type Context = Context;
type Message = Vec<u8>;

async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use tracing::trace;
use ockam::identity::{Identifier, IdentitiesAttributes};
use ockam_core::api::{Method, RequestHeader, Response};
use ockam_core::compat::sync::Arc;
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
use ockam_node::Context;
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
use ockam_node::{Context, Worker};

use crate::authenticator::direct::types::AddMember;
use crate::authenticator::direct::DirectAuthenticator;
Expand Down Expand Up @@ -37,7 +37,6 @@ impl DirectAuthenticatorWorker {
#[ockam_core::worker]
impl Worker for DirectAuthenticatorWorker {
type Message = Vec<u8>;
type Context = Context;

async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
let secure_channel_info = match SecureChannelLocalInfo::find_info(m.local_message()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use minicbor::Decoder;
use ockam::identity::Identifier;
use ockam_core::api::{Method, RequestHeader, Response};
use ockam_core::compat::sync::Arc;
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
use ockam_node::Context;
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
use ockam_node::{Context, Worker};
use tracing::trace;

pub struct EnrollmentTokenAcceptorWorker {
Expand All @@ -27,7 +27,6 @@ impl EnrollmentTokenAcceptorWorker {

#[ockam_core::worker]
impl Worker for EnrollmentTokenAcceptorWorker {
type Context = Context;
type Message = Vec<u8>;

async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
Expand Down
Loading

0 comments on commit 3778d95

Please sign in to comment.