Skip to content

Commit

Permalink
chore(rust): removed NodeManager dependency when instantiating a `M…
Browse files Browse the repository at this point in the history
…ultiAddr`
  • Loading branch information
davide-baldo committed Dec 5, 2024
1 parent a98145f commit 8f77e11
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 97 deletions.
47 changes: 41 additions & 6 deletions implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) use plain_tcp::PlainTcpInstantiator;
pub(crate) use project::ProjectInstantiator;
pub(crate) use secure::SecureChannelInstantiator;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

#[derive(Clone)]
pub struct Connection {
Expand Down Expand Up @@ -181,13 +182,49 @@ pub trait Instantiator: Send + Sync + 'static {
/// The returned [`Changes`] will be used to update the builder state.
async fn instantiate(
&self,
ctx: &Context,
node_manager: &NodeManager,
context: &Context,
transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
) -> Result<Changes, ockam_core::Error>;
}

/// Aggregates multiple [`Instantiator`]s, having a single object containing
/// all runtime dependencies.
pub struct ConnectionInstantiator {
instantiator: Vec<Arc<dyn Instantiator>>,
}

impl ConnectionInstantiator {
pub fn new() -> Self {
ConnectionInstantiator {
instantiator: vec![],
}
}

pub fn add(mut self, instantiator: impl Instantiator) -> Self {
self.instantiator.push(Arc::new(instantiator));
self
}

/// Resolve project ID (if any), create secure channel (if needed) and create a tcp connection
/// Returns [`Connection`]
pub async fn connect(&self, ctx: &Context, addr: &MultiAddr) -> Result<Connection> {
debug!("connecting to {}", &addr);

let mut connection_builder = ConnectionBuilder::new(addr.clone());
for instantiator in self.instantiator.clone() {
connection_builder = connection_builder
.instantiate(ctx, instantiator.as_ref())
.await?;
}
let connection = connection_builder.build();
connection.add_default_consumers(ctx);

debug!("connected to {connection:?}");
Ok(connection)
}
}

impl ConnectionBuilder {
pub fn new(multi_addr: MultiAddr) -> Self {
ConnectionBuilder {
Expand All @@ -214,11 +251,10 @@ impl ConnectionBuilder {
/// Used to instantiate a connection from a [`MultiAddr`]
/// when called multiple times the instantiator order matters and it's up to the
/// user make sure higher protocol abstraction are called before lower level ones
pub async fn instantiate(
pub async fn instantiate<T: Instantiator + ?Sized>(
mut self,
ctx: &Context,
node_manager: &NodeManager,
instantiator: impl Instantiator,
instantiator: &T,
) -> Result<Self, ockam_core::Error> {
//executing a regex-like search, shifting the starting point one by one
//not efficient by any mean, but it shouldn't be an issue
Expand All @@ -240,7 +276,6 @@ impl ConnectionBuilder {
let mut changes = instantiator
.instantiate(
ctx,
node_manager,
self.transport_route.clone(),
self.extract(start, instantiator.matches().len()),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ use crate::error::ApiError;
use crate::nodes::connection::{Changes, ConnectionBuilder, Instantiator};
use crate::{multiaddr_to_route, route_to_multiaddr};

use crate::nodes::NodeManager;
use ockam_core::{async_trait, Error, Route};
use ockam_core::{async_trait, Route};
use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Tcp};
use ockam_multiaddr::{Match, MultiAddr, Protocol};
use ockam_node::Context;
use ockam_transport_tcp::TcpTransport;

/// Creates the tcp connection.
pub(crate) struct PlainTcpInstantiator {}
pub(crate) struct PlainTcpInstantiator {
tcp_transport: TcpTransport,
}

impl PlainTcpInstantiator {
pub(crate) fn new() -> Self {
Self {}
pub(crate) fn new(tcp_transport: TcpTransport) -> Self {
Self { tcp_transport }
}
}

Expand All @@ -29,14 +31,13 @@ impl Instantiator for PlainTcpInstantiator {

async fn instantiate(
&self,
_ctx: &Context,
node_manager: &NodeManager,
_context: &Context,
_transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
) -> Result<Changes, Error> {
) -> Result<Changes, ockam_core::Error> {
let (before, tcp_piece, after) = extracted;

let mut tcp = multiaddr_to_route(&tcp_piece, &node_manager.tcp_transport)
let mut tcp = multiaddr_to_route(&tcp_piece, &self.tcp_transport)
.await
.ok_or_else(|| {
ApiError::core(format!(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,40 @@
use crate::error::ApiError;
use crate::nodes::connection::{Changes, Instantiator};
use crate::nodes::NodeManager;
use crate::{multiaddr_to_route, try_address_to_multiaddr};
use crate::{multiaddr_to_route, try_address_to_multiaddr, CliState};
use std::sync::Arc;

use ockam_core::{async_trait, Error, Route};
use ockam_core::{async_trait, Route};
use ockam_multiaddr::proto::Project;
use ockam_multiaddr::{Match, MultiAddr, Protocol};
use ockam_node::Context;

use crate::nodes::service::SecureChannelType;
use ockam::identity::Identifier;
use ockam::identity::{Identifier, SecureChannelOptions, SecureChannels};
use ockam_transport_tcp::TcpTransport;
use std::time::Duration;

/// Creates a secure connection to the project using provided credential
pub(crate) struct ProjectInstantiator {
identifier: Identifier,
timeout: Option<Duration>,
cli_state: CliState,
secure_channels: Arc<SecureChannels>,
tcp_transport: TcpTransport,
}

impl ProjectInstantiator {
pub fn new(identifier: Identifier, timeout: Option<Duration>) -> Self {
pub fn new(
identifier: Identifier,
timeout: Option<Duration>,
cli_state: CliState,
secure_channels: Arc<SecureChannels>,
tcp_transport: TcpTransport,
) -> Self {
Self {
identifier,
timeout,
cli_state,
secure_channels,
tcp_transport,
}
}
}
Expand All @@ -35,11 +47,10 @@ impl Instantiator for ProjectInstantiator {

async fn instantiate(
&self,
ctx: &Context,
node_manager: &NodeManager,
context: &Context,
_transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
) -> Result<Changes, Error> {
) -> Result<Changes, ockam_core::Error> {
let (_before, project_piece, after) = extracted;

let project_protocol_value = project_piece
Expand All @@ -50,11 +61,25 @@ impl Instantiator for ProjectInstantiator {
.cast::<Project>()
.ok_or_else(|| ApiError::core("invalid project protocol in multiaddr"))?;

let (project_multiaddr, project_identifier) =
node_manager.resolve_project(&project).await?;
let (project_multiaddr, project_identifier) = self
.cli_state
.projects()
.get_project_by_name(&project)
.await
.map(|project| {
(
project.project_multiaddr().cloned(),
project
.project_identifier()
.ok_or_else(|| ApiError::core("project identifier is missing")),
)
})?;

let project_identifier = project_identifier?;
let project_multiaddr = project_multiaddr?;

debug!(addr = %project_multiaddr, "creating secure channel");
let tcp = multiaddr_to_route(&project_multiaddr, &node_manager.tcp_transport)
let tcp = multiaddr_to_route(&project_multiaddr, &self.tcp_transport)
.await
.ok_or_else(|| {
ApiError::core(format!(
Expand All @@ -63,27 +88,29 @@ impl Instantiator for ProjectInstantiator {
})?;

debug!("create a secure channel to the project {project_identifier}");
let sc = node_manager
.create_secure_channel_internal(
ctx,
tcp.route,
&self.identifier.clone(),
Some(vec![project_identifier]),
None,
self.timeout,
SecureChannelType::KeyExchangeAndMessages,
)

let options = SecureChannelOptions::new().with_authority(project_identifier);
let options = if let Some(timeout) = self.timeout {
options.with_timeout(timeout)
} else {
options
};

let secure_channel = self
.secure_channels
.create_secure_channel(context, &self.identifier.clone(), tcp.route, options)
.await?;

// when creating a secure channel we want the route to pass through that
// when creating a secure channel, we want the route to pass through that
// ignoring previous steps, since they will be implicit
let mut current_multiaddr = try_address_to_multiaddr(sc.encryptor_address()).unwrap();
let mut current_multiaddr =
try_address_to_multiaddr(secure_channel.encryptor_address()).unwrap();
current_multiaddr.try_extend(after.iter())?;

Ok(Changes {
flow_control_id: Some(sc.flow_control_id().clone()),
flow_control_id: Some(secure_channel.flow_control_id().clone()),
current_multiaddr,
secure_channel_encryptors: vec![sc.encryptor_address().clone()],
secure_channel_encryptors: vec![secure_channel.encryptor_address().clone()],
tcp_connection: tcp.tcp_connection,
})
}
Expand Down
64 changes: 43 additions & 21 deletions implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::sync::Arc;
use std::time::Duration;

use crate::nodes::connection::{Changes, Instantiator};
use crate::nodes::NodeManager;
use crate::{local_multiaddr_to_route, try_address_to_multiaddr};

use crate::nodes::service::SecureChannelType;
use ockam::identity::Identifier;
use ockam_core::{async_trait, route, AsyncTryClone, Error, Route};
use ockam::identity::{
Identifier, SecureChannelOptions, SecureChannels, TrustEveryonePolicy,
TrustMultiIdentifiersPolicy,
};
use ockam_core::{async_trait, route, Route};
use ockam_multiaddr::proto::Secure;
use ockam_multiaddr::{Match, MultiAddr, Protocol};
use ockam_node::Context;
Expand All @@ -16,18 +18,24 @@ pub(crate) struct SecureChannelInstantiator {
identifier: Identifier,
authorized_identities: Option<Vec<Identifier>>,
timeout: Option<Duration>,
secure_channels: Arc<SecureChannels>,
authority: Option<Identifier>,
}

impl SecureChannelInstantiator {
pub(crate) fn new(
identifier: &Identifier,
timeout: Option<Duration>,
authorized_identities: Option<Vec<Identifier>>,
authority: Option<Identifier>,
secure_channels: Arc<SecureChannels>,
) -> Self {
Self {
identifier: identifier.clone(),
authorized_identities,
authority,
timeout,
secure_channels,
}
}
}
Expand All @@ -40,39 +48,53 @@ impl Instantiator for SecureChannelInstantiator {

async fn instantiate(
&self,
ctx: &Context,
node_manager: &NodeManager,
context: &Context,
transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
) -> Result<Changes, Error> {
) -> Result<Changes, ockam_core::Error> {
let (_before, secure_piece, after) = extracted;
debug!(%secure_piece, %transport_route, "creating secure channel");
let route = local_multiaddr_to_route(&secure_piece)?;

let sc_ctx = ctx.async_try_clone().await?;
let sc = node_manager
.create_secure_channel_internal(
&sc_ctx,
//the transport route is needed to reach the secure channel listener
//since it can be in another node
route![transport_route, route],
let options = SecureChannelOptions::new();

let options = match self.authorized_identities.clone() {
Some(ids) => options.with_trust_policy(TrustMultiIdentifiersPolicy::new(ids)),
None => options.with_trust_policy(TrustEveryonePolicy),
};

let options = if let Some(authority) = self.authority.clone() {
options.with_authority(authority)
} else {
options
};

let options = if let Some(timeout) = self.timeout {
options.with_timeout(timeout)
} else {
options
};

let secure_channel = self
.secure_channels
.create_secure_channel(
context,
&self.identifier,
self.authorized_identities.clone(),
None,
self.timeout,
SecureChannelType::KeyExchangeAndMessages,
route![transport_route, route],
options,
)
.await?;

// when creating a secure channel we want the route to pass through that
// ignoring previous steps, since they will be implicit
let mut current_multiaddr = try_address_to_multiaddr(sc.encryptor_address()).unwrap();
let mut current_multiaddr =
try_address_to_multiaddr(secure_channel.encryptor_address()).unwrap();
current_multiaddr.try_extend(after.iter())?;

Ok(Changes {
current_multiaddr,
flow_control_id: Some(sc.flow_control_id().clone()),
secure_channel_encryptors: vec![sc.encryptor_address().clone()],
flow_control_id: Some(secure_channel.flow_control_id().clone()),
secure_channel_encryptors: vec![secure_channel.encryptor_address().clone()],
tcp_connection: None,
})
}
Expand Down
Loading

0 comments on commit 8f77e11

Please sign in to comment.