Skip to content

Commit

Permalink
feat(rust): converted socket addresses to hostnames in command
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed Jul 8, 2024
1 parent d77cfff commit bf4b56d
Show file tree
Hide file tree
Showing 30 changed files with 266 additions and 313 deletions.
24 changes: 1 addition & 23 deletions examples/rust/mitm_node/src/tcp_interceptor/transport/common.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,7 @@
use ockam_core::compat::net::{SocketAddr, ToSocketAddrs};
use ockam_core::compat::net::SocketAddr;
use ockam_core::Result;
use ockam_transport_core::TransportError;

/// Resolve the given peer to a [`SocketAddr`](std::net::SocketAddr)
pub(super) fn resolve_peer(peer: String) -> Result<SocketAddr> {
// Try to parse as SocketAddr
if let Ok(p) = parse_socket_addr(&peer) {
return Ok(p);
}

// Try to resolve hostname
if let Ok(mut iter) = peer.to_socket_addrs() {
// Prefer ip4
if let Some(p) = iter.find(|x| x.is_ipv4()) {
return Ok(p);
}
if let Some(p) = iter.find(|x| x.is_ipv6()) {
return Ok(p);
}
}

// Nothing worked, return an error
Err(TransportError::InvalidAddress(peer))?
}

pub(super) fn parse_socket_addr(s: &str) -> Result<SocketAddr> {
Ok(s.parse().map_err(|_| TransportError::InvalidAddress(s.to_string()))?)
}
4 changes: 3 additions & 1 deletion implementations/rust/ockam/ockam/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ pub mod transport {
#[cfg(feature = "std")]
pub use ockam_transport_core::resolve_peer;

pub use ockam_transport_core::{parse_socket_addr, HostnamePort, Transport};
pub use ockam_transport_core::{
parse_socket_addr, HostnamePort, StaticHostnamePort, Transport,
};
}

// ---
Expand Down
42 changes: 22 additions & 20 deletions implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use core::str::FromStr;
use minicbor::Decoder;
use ockam_core::compat::net::IpAddr;

use crate::kafka::kafka_outlet_address;
use crate::nodes::models::portal::{CreateInlet, InletStatus};
use crate::nodes::NODEMANAGER_ADDR;
use crate::port_range::PortRange;
use ockam::compat::tokio::sync::Mutex;
use ockam_abac::PolicyExpression;
use ockam_core::api::{Request, ResponseHeader, Status};
Expand All @@ -14,11 +17,7 @@ use ockam_core::{route, Error};
use ockam_core::{Result, Route};
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;

use crate::kafka::kafka_outlet_address;
use crate::nodes::models::portal::{CreateInlet, InletStatus};
use crate::nodes::NODEMANAGER_ADDR;
use crate::port_range::PortRange;
use ockam_transport_core::HostnamePort;

type BrokerId = i32;

Expand All @@ -33,10 +32,10 @@ pub(crate) struct KafkaInletController {

#[derive(Debug)]
struct KafkaInletMapInner {
broker_map: HashMap<BrokerId, SocketAddr>,
broker_map: HashMap<BrokerId, HostnamePort>,
port_range: PortRange,
current_port: u16,
bind_ip: IpAddr,
bind_hostname: String,
outlet_node_multiaddr: MultiAddr,
local_interceptor_route: Route,
remote_interceptor_route: Route,
Expand All @@ -47,7 +46,7 @@ impl KafkaInletController {
outlet_node_multiaddr: MultiAddr,
local_interceptor_route: Route,
remote_interceptor_route: Route,
bind_ip: IpAddr,
bind_hostname: String,
port_range: PortRange,
policy_expression: Option<PolicyExpression>,
) -> KafkaInletController {
Expand All @@ -57,7 +56,7 @@ impl KafkaInletController {
broker_map: HashMap::new(),
current_port: port_range.start(),
port_range,
bind_ip,
bind_hostname,
local_interceptor_route,
remote_interceptor_route,
})),
Expand All @@ -66,9 +65,9 @@ impl KafkaInletController {
}

#[cfg(test)]
pub(crate) async fn retrieve_inlet(&self, broker_id: BrokerId) -> Option<SocketAddr> {
pub(crate) async fn retrieve_inlet(&self, broker_id: BrokerId) -> Option<HostnamePort> {
let inner = self.inner.lock().await;
inner.broker_map.get(&broker_id).copied()
inner.broker_map.get(&broker_id).cloned()
}

/// Asserts the presence of an inlet for a broker.
Expand All @@ -78,10 +77,10 @@ impl KafkaInletController {
&self,
context: &Context,
broker_id: BrokerId,
) -> Result<SocketAddr> {
) -> Result<HostnamePort> {
let mut inner = self.inner.lock().await;
if let Some(address) = inner.broker_map.get(&broker_id) {
Ok(*address)
Ok(address.clone())
} else {
if inner.current_port > inner.port_range.end() {
// we don't have any port left for the broker!
Expand All @@ -92,10 +91,11 @@ impl KafkaInletController {
));
}

let socket_address = SocketAddr::new(inner.bind_ip, inner.current_port);
let inlet_bind_address =
HostnamePort::new(inner.bind_hostname.clone(), inner.current_port);
Self::request_inlet_creation(
context,
socket_address,
inlet_bind_address.clone(),
inner.outlet_node_multiaddr.clone(),
inner.local_interceptor_route.clone(),
route![
Expand All @@ -107,22 +107,24 @@ impl KafkaInletController {
.await?;

inner.current_port += 1;
inner.broker_map.insert(broker_id, socket_address);
inner
.broker_map
.insert(broker_id, inlet_bind_address.clone());

Ok(socket_address)
Ok(inlet_bind_address)
}
}

async fn request_inlet_creation(
context: &Context,
socket_address: SocketAddr,
inlet_bind_address: HostnamePort,
to: MultiAddr,
prefix: Route,
suffix: Route,
policy_expression: Option<PolicyExpression>,
) -> Result<SocketAddr> {
let mut payload = CreateInlet::to_node(
socket_address.to_string(),
inlet_bind_address,
to,
format!("kafka-inlet-{}", random_string()),
prefix,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ mod test {
MultiAddr::default(),
route![],
route![],
[255, 255, 255, 255].into(),
"255.255.255.255".to_string(),
PortRange::new(0, 0).unwrap(),
None,
);
Expand Down Expand Up @@ -905,7 +905,7 @@ mod test {
MultiAddr::default(),
route![],
route![],
[127, 0, 0, 1].into(),
"127.0.0.1".to_string(),
PortRange::new(0, 0).unwrap(),
None,
);
Expand Down Expand Up @@ -1023,7 +1023,7 @@ mod test {
assert_eq!(0, broker.port);

let address = inlet_map.retrieve_inlet(1).await.expect("inlet not found");
assert_eq!("127.0.0.1".to_string(), address.ip().to_string());
assert_eq!("127.0.0.1".to_string(), address.hostname());
assert_eq!(0, address.port());
} else {
panic!("invalid message type")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::io::{Error, ErrorKind};
use std::net::SocketAddr;

use bytes::{Bytes, BytesMut};
use kafka_protocol::messages::fetch_response::FetchResponse;
Expand Down Expand Up @@ -149,7 +148,7 @@ impl InletInterceptorImpl {
trace!("metadata response before: {:?}", &response);

for (broker_id, info) in response.brokers.iter_mut() {
let inlet_address: SocketAddr = inlet_map
let inlet_address = inlet_map
.assert_inlet_for_broker(context, broker_id.0)
.await
.map_err(InterceptError::Ockam)?;
Expand All @@ -160,8 +159,7 @@ impl InletInterceptorImpl {
broker_id.0
);

let ip_address = inlet_address.ip().to_string();
info.host = StrBytes::from_string(ip_address);
info.host = StrBytes::from_string(inlet_address.hostname());
info.port = inlet_address.port() as i32;
}
trace!("metadata response after: {:?}", &response);
Expand All @@ -185,28 +183,26 @@ impl InletInterceptorImpl {
let mut response: FindCoordinatorResponse =
decode_body(buffer, request_info.request_api_version)?;

// similarly to metadata, we want to expressed the coordinator using
// similarly to metadata, we want to express the coordinator using
// local sidecar ip address
// the format changed to array since version 4
if request_info.request_api_version >= 4 {
for coordinator in response.coordinators.iter_mut() {
let inlet_address: SocketAddr = inlet_map
let inlet_address = inlet_map
.assert_inlet_for_broker(context, coordinator.node_id.0)
.await
.map_err(InterceptError::Ockam)?;

let ip_address = inlet_address.ip().to_string();
coordinator.host = StrBytes::from_string(ip_address);
coordinator.host = StrBytes::from_string(inlet_address.hostname());
coordinator.port = inlet_address.port() as i32;
}
} else {
let inlet_address: SocketAddr = inlet_map
let inlet_address = inlet_map
.assert_inlet_for_broker(context, response.node_id.0)
.await
.map_err(InterceptError::Ockam)?;

let ip_address = inlet_address.ip().to_string();
response.host = StrBytes::from_string(ip_address);
response.host = StrBytes::from_string(inlet_address.hostname());
response.port = inlet_address.port() as i32;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod test {
MultiAddr::default(),
route![],
route![],
[127, 0, 0, 1].into(),
"127.0.0.1".to_string(),
PortRange::new(0, 0).unwrap(),
None,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{route_to_multiaddr, try_address_to_multiaddr};
#[cbor(map)]
pub struct CreateInlet {
/// The address the portal should listen at.
#[n(1)] pub(crate) listen_addr: String,
#[n(1)] pub(crate) listen_addr: HostnamePort,
/// The peer address.
/// This can either be the address of an already
/// created outlet, or a forwarding mechanism via ockam cloud.
Expand Down Expand Up @@ -63,7 +63,7 @@ pub struct CreateInlet {
impl CreateInlet {
#[allow(clippy::too_many_arguments)]
pub fn via_project(
listen: String,
listen: HostnamePort,
to: MultiAddr,
alias: String,
prefix_route: Route,
Expand All @@ -90,7 +90,7 @@ impl CreateInlet {

#[allow(clippy::too_many_arguments)]
pub fn to_node(
listen: String,
listen: HostnamePort,
to: MultiAddr,
alias: String,
prefix_route: Route,
Expand Down Expand Up @@ -128,7 +128,7 @@ impl CreateInlet {
self.secure_channel_identifier = Some(identifier);
}

pub fn listen_addr(&self) -> String {
pub fn listen_addr(&self) -> HostnamePort {
self.listen_addr.clone()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::output::Output;
use crate::terminal::fmt;
use minicbor::{CborLen, Decode, Encode};
use ockam_abac::PolicyExpression;
use ockam_core::compat::net::SocketAddr;
use ockam_core::Address;
use ockam_multiaddr::MultiAddr;
use ockam_transport_core::HostnamePort;
use serde::Serialize;
use std::fmt::Display;
use std::fmt::Write;
Expand Down Expand Up @@ -57,14 +57,14 @@ impl DeleteServiceRequest {
#[rustfmt::skip]
#[cbor(map)]
pub struct StartKafkaOutletRequest {
#[n(1)] bootstrap_server_addr: String,
#[n(1)] bootstrap_server_addr: HostnamePort,
#[n(2)] tls: bool,
#[n(3)] policy_expression: Option<PolicyExpression>,
}

impl StartKafkaOutletRequest {
pub fn new(
bootstrap_server_addr: String,
bootstrap_server_addr: HostnamePort,
tls: bool,
policy_expression: Option<PolicyExpression>,
) -> Self {
Expand All @@ -75,7 +75,7 @@ impl StartKafkaOutletRequest {
}
}

pub fn bootstrap_server_addr(&self) -> String {
pub fn bootstrap_server_addr(&self) -> HostnamePort {
self.bootstrap_server_addr.clone()
}

Expand All @@ -92,7 +92,7 @@ impl StartKafkaOutletRequest {
#[rustfmt::skip]
#[cbor(map)]
pub struct StartKafkaInletRequest {
#[n(1)] bind_address: SocketAddr,
#[n(1)] bind_address: HostnamePort,
#[n(2)] brokers_port_range: (u16, u16),
#[n(3)] kafka_outlet_route: MultiAddr,
#[n(4)] encrypt_content: bool,
Expand All @@ -106,7 +106,7 @@ pub struct StartKafkaInletRequest {
impl StartKafkaInletRequest {
#[allow(clippy::too_many_arguments)]
pub fn new(
bind_address: SocketAddr,
bind_address: HostnamePort,
brokers_port_range: impl Into<(u16, u16)>,
kafka_outlet_route: MultiAddr,
encrypt_content: bool,
Expand All @@ -129,8 +129,8 @@ impl StartKafkaInletRequest {
}
}

pub fn bind_address(&self) -> SocketAddr {
self.bind_address
pub fn bind_address(&self) -> HostnamePort {
self.bind_address.clone()
}
pub fn brokers_port_range(&self) -> (u16, u16) {
self.brokers_port_range
Expand Down
Loading

0 comments on commit bf4b56d

Please sign in to comment.