From 02502d1f6ce4fd10fb469d84eb1d212f7f8d5c79 Mon Sep 17 00:00:00 2001 From: Oleksandr Deundiak Date: Wed, 11 Dec 2024 13:31:02 +0100 Subject: [PATCH] feat(rust): introduce env variables to adjust transport performance --- .../ockam/ockam_api/src/influxdb/portal.rs | 6 +- .../src/kafka/tests/integration_test.rs | 3 +- .../src/kafka/tests/interceptor_test.rs | 11 ++- .../src/nodes/service/kafka_services.rs | 6 +- .../src/environment/static/env_info.txt | 8 +- .../rust/ockam/ockam_core/src/env/env.rs | 36 +++++---- .../ockam/ockam_core/src/env/from_string.rs | 7 ++ .../rust/ockam/ockam_transport_tcp/src/lib.rs | 2 +- .../src/portal/inlet_listener.rs | 1 + .../src/portal/interceptor.rs | 20 ++++- .../ockam_transport_tcp/src/portal/options.rs | 24 +++++- .../src/portal/outlet_listener.rs | 1 + .../src/portal/portal_message.rs | 4 +- .../src/portal/portal_receiver.rs | 8 +- .../src/portal/portal_worker.rs | 8 ++ .../ockam_transport_tcp/tests/interceptor.rs | 5 +- .../rust/ockam/ockam_transport_udp/src/lib.rs | 2 + .../src/messages/transport_message.rs | 27 +++---- .../ockam/ockam_transport_udp/src/options.rs | 15 +++- .../ockam_transport_udp/src/size_options.rs | 74 +++++++++++++++++++ .../ockam_transport_udp/src/transport/bind.rs | 16 +++- .../src/workers/pending_messages/mod.rs | 30 +++++--- .../peer_pending_routing_message_storage.rs | 22 +++--- .../pending_routing_message_storage.rs | 25 +++++-- .../transport_messages_iterator.rs | 37 +++++++--- .../src/workers/receiver.rs | 22 ++++-- .../ockam_transport_udp/src/workers/sender.rs | 4 + 27 files changed, 318 insertions(+), 106 deletions(-) create mode 100644 implementations/rust/ockam/ockam_transport_udp/src/size_options.rs diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs b/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs index 7308ed4ebe2..6057c3bfc85 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs @@ -19,7 +19,9 @@ use ockam_core::route; use ockam_multiaddr::proto::Service; use ockam_multiaddr::MultiAddr; use ockam_transport_core::HostnamePort; -use ockam_transport_tcp::{PortalInletInterceptor, PortalOutletInterceptor}; +use ockam_transport_tcp::{ + read_portal_payload_length, PortalInletInterceptor, PortalOutletInterceptor, +}; use std::sync::Arc; use std::time::Duration; @@ -234,6 +236,7 @@ impl NodeManagerWorker { http_interceptor_factory, Arc::new(policy_access_control.create_outgoing(ctx).await?), Arc::new(policy_access_control.create_incoming()), + read_portal_payload_length(), ) .await?; @@ -282,6 +285,7 @@ impl NodeManagerWorker { http_interceptor_factory, Arc::new(policy_access_control.create_incoming()), Arc::new(policy_access_control.create_outgoing(ctx).await?), + read_portal_payload_length(), ) .await?; Ok(interceptor_address) diff --git a/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs index 016fc7f062d..8df8e390761 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs @@ -40,7 +40,7 @@ use ockam_multiaddr::proto::Service; use ockam_multiaddr::MultiAddr; use ockam_node::compat::tokio; use ockam_transport_core::HostnamePort; -use ockam_transport_tcp::PortalInletInterceptor; +use ockam_transport_tcp::{read_portal_payload_length, PortalInletInterceptor}; // TODO: upgrade to 13 by adding a metadata request to map uuid<=>topic_name const TEST_KAFKA_API_VERSION: i16 = 12; @@ -119,6 +119,7 @@ async fn create_kafka_service( )), Arc::new(AllowAll), Arc::new(AllowAll), + read_portal_payload_length(), ) .await?; diff --git a/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs index bea6e701729..cce3da3006d 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs @@ -22,7 +22,7 @@ use ockam_core::{route, Address, AllowAll, NeutralMessage, Routed, Worker}; use ockam_multiaddr::MultiAddr; use ockam_node::database::SqlxDatabase; use ockam_node::Context; -use ockam_transport_tcp::{PortalInterceptorWorker, PortalMessage, MAX_PAYLOAD_SIZE}; +use ockam_transport_tcp::{read_portal_payload_length, PortalInterceptorWorker, PortalMessage}; use crate::kafka::inlet_controller::KafkaInletController; use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl; @@ -169,7 +169,7 @@ async fn kafka_portal_worker__bigger_than_limit_kafka_message__error( ); let huge_payload = request_buffer.as_ref(); - for chunk in huge_payload.chunks(MAX_PAYLOAD_SIZE) { + for chunk in huge_payload.chunks(read_portal_payload_length()) { let _error = context .send( route![portal_inlet_address.clone(), context.address()], @@ -229,7 +229,10 @@ async fn kafka_portal_worker__almost_over_limit_than_limit_kafka_message__two_ka // let's duplicate the message huge_outgoing_request.extend(huge_outgoing_request.clone()); - for chunk in huge_outgoing_request.as_ref().chunks(MAX_PAYLOAD_SIZE) { + for chunk in huge_outgoing_request + .as_ref() + .chunks(read_portal_payload_length()) + { context .send( route![portal_inlet_address.clone(), "tcp_payload_receiver"], @@ -327,6 +330,7 @@ async fn setup_only_worker(context: &mut Context, handle: &NodeManagerHandle) -> )), TEST_MAX_KAFKA_MESSAGE_SIZE, )), + read_portal_payload_length(), ) .await .unwrap() @@ -425,6 +429,7 @@ async fn kafka_portal_worker__metadata_exchange__response_changed( )), MAX_KAFKA_MESSAGE_SIZE, )), + read_portal_payload_length(), ) .await?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs index 05ed74d73d0..4663ca465e0 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs @@ -26,7 +26,9 @@ use ockam_core::flow_control::FlowControls; use ockam_core::route; use ockam_multiaddr::proto::Project; use ockam_multiaddr::MultiAddr; -use ockam_transport_tcp::{PortalInletInterceptor, PortalOutletInterceptor}; +use ockam_transport_tcp::{ + read_portal_payload_length, PortalInletInterceptor, PortalOutletInterceptor, +}; use std::sync::Arc; impl NodeManagerWorker { @@ -241,6 +243,7 @@ impl InMemoryNode { )), Arc::new(policy_access_control.create_incoming()), Arc::new(policy_access_control.create_outgoing(context).await?), + read_portal_payload_length(), ) .await?; @@ -297,6 +300,7 @@ impl InMemoryNode { )), Arc::new(policy_access_control.create_outgoing(context).await?), Arc::new(policy_access_control.create_incoming()), + read_portal_payload_length(), ) .await?; diff --git a/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt b/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt index aece9f207d9..6b37e4f74ac 100644 --- a/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt +++ b/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt @@ -46,11 +46,14 @@ Tracing - OCKAM_BACKGROUND_LOG_EXPORT_CUTOFF: Cutoff time for sending log records batches to an OpenTelemetry baclground node, without waiting for a response. Default value: `3s`. - OCKAM_BACKGROUND_SPAN_EXPORT_CUTOFF: Cutoff time for sending span batches to an OpenTelemetry background inlet, without waiting for a response. Default value: `3s`. -UDP Puncture +UDP - OCKAM_RENDEZVOUS_SERVER: set this variable to the hostname and port of the Rendezvous service +- OCKAM_UDP_PENDING_MESSAGES_PER_PEER: maximum number of messages per UDP peer that are cached to be assembled if their parts arrive out of order. Default value: 5 +- OCKAM_UDP_MAX_ON_THE_WIRE_PACKET_SIZE: maximum size of a UDP packet on the wire. Default value: 508 -TCP Portals +TCP - OCKAM_PRIVILEGED: if variable is set, all TCP Inlets/Outlets will use eBPF (overrides `--privileged` argument for `ockam tcp-inlet create` and `ockam tcp-outlet create`). +- OCKAM_TCP_PORTAL_PAYLOAD_LENGTH: size of the buffer into which TCP Portal reads the TCP stream. Default value: `128 * 1024` Devs Usage - OCKAM: a `string` that defines the path to the ockam binary to use. @@ -65,3 +68,4 @@ Devs Usage Internal (to enable some special behavior in the logic) - OCKAM_HELP_RENDER_MARKDOWN: a `boolean` to control the markdown rendering of the commands documentation. + diff --git a/implementations/rust/ockam/ockam_core/src/env/env.rs b/implementations/rust/ockam/ockam_core/src/env/env.rs index cdd8bb200f3..485ab79ab25 100644 --- a/implementations/rust/ockam/ockam_core/src/env/env.rs +++ b/implementations/rust/ockam/ockam_core/src/env/env.rs @@ -6,34 +6,38 @@ use std::env::VarError; /// Get environmental value `var_name`. If value is not found returns Ok(None) pub fn get_env(var_name: &str) -> Result> { - get_env_impl::>(var_name, None) -} - -/// Return true if `var_name` is set and has a valid value -pub fn is_set(var_name: &str) -> Result { - get_env_impl::>(var_name, None).map(|v| v.is_some()) -} - -/// Get environmental value `var_name`. If value is not found returns `default_value` -pub fn get_env_with_default(var_name: &str, default_value: T) -> Result { - get_env_impl::(var_name, default_value) -} - -fn get_env_impl(var_name: &str, default_value: T) -> Result { match env::var(var_name) { Ok(val) => { match T::from_string(&val) { - Ok(v) => Ok(v), + Ok(v) => Ok(Some(v)), Err(e) => Err(error(format!("The environment variable `{var_name}` cannot be decoded. The value `{val}` is invalid: {e:?}"))), } }, Err(e) => match e { - VarError::NotPresent => Ok(default_value), + VarError::NotPresent => Ok(None), VarError::NotUnicode(_) => Err(error(format!("The environment variable `{var_name}` cannot be decoded because it is not some valid Unicode"))), }, } } +/// Return true if `var_name` is set and has a valid value +pub fn is_set(var_name: &str) -> Result { + Ok(get_env::(var_name)?.is_some()) +} + +/// Get environmental value `var_name`. If value is not found returns `default_value` +pub fn get_env_with_default(var_name: &str, default_value: T) -> Result { + Ok(get_env::(var_name)?.unwrap_or(default_value)) +} + +/// Get environmental value `var_name`. If value is not found returns `default_value` +pub fn get_env_with_default_ignore_error(var_name: &str, default_value: T) -> T { + get_env::(var_name) + .ok() + .flatten() + .unwrap_or(default_value) +} + pub(crate) fn error(msg: String) -> Error { Error::new(Origin::Core, Kind::Invalid, msg) } diff --git a/implementations/rust/ockam/ockam_core/src/env/from_string.rs b/implementations/rust/ockam/ockam_core/src/env/from_string.rs index 2c7f2bc0a22..d7f1041361a 100644 --- a/implementations/rust/ockam/ockam_core/src/env/from_string.rs +++ b/implementations/rust/ockam/ockam_core/src/env/from_string.rs @@ -1,6 +1,7 @@ use crate::env::error; use crate::errcode::{Kind, Origin}; use crate::{Error, Result}; +use core::str::FromStr; use once_cell::sync::OnceCell; use regex::Regex; use std::path::PathBuf; @@ -46,6 +47,12 @@ impl FromString for char { } } +impl FromString for usize { + fn from_string(s: &str) -> Result { + usize::from_str(s).map_err(|err| error(format!("usize parsing error: {err}"))) + } +} + impl FromString for String { fn from_string(s: &str) -> Result { Ok(s.to_owned()) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs b/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs index 43dffb09877..dc5893bc184 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs @@ -31,7 +31,7 @@ pub use options::{TcpConnectionOptions, TcpListenerOptions}; pub use portal::{ new_certificate_provider_cache, Direction, PortalInletInterceptor, PortalInterceptor, PortalInterceptorFactory, PortalInterceptorWorker, PortalInternalMessage, PortalMessage, - PortalOutletInterceptor, TlsCertificate, TlsCertificateProvider, MAX_PAYLOAD_SIZE, + PortalOutletInterceptor, TlsCertificate, TlsCertificateProvider, }; pub use protocol_version::*; pub use registry::*; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs index 07e2941b9e6..7d6aa6e87cb 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs @@ -226,6 +226,7 @@ impl Processor for TcpInletListenProcessor { addresses, self.options.incoming_access_control.clone(), self.options.outgoing_access_control.clone(), + self.options.portal_payload_length, ) .await?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs index cb44928531b..22d55ffc49b 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs @@ -1,4 +1,4 @@ -use crate::{PortalMessage, MAX_PAYLOAD_SIZE}; +use crate::PortalMessage; use ockam_core::flow_control::{FlowControlId, FlowControlOutgoingAccessControl, FlowControls}; use ockam_core::{ async_trait, route, Address, AllowOnwardAddress, AllowSourceAddress, Any, @@ -46,6 +46,7 @@ pub struct PortalOutletInterceptor { outgoing_access_control: Arc, incoming_access_control: Arc, spawner_flow_control_id: Option, + portal_payload_length: usize, } impl PortalOutletInterceptor { @@ -66,12 +67,14 @@ impl PortalOutletInterceptor { interceptor_factory: Arc, outgoing_access_control: Arc, incoming_access_control: Arc, + portal_payload_length: usize, ) -> ockam_core::Result<()> { let worker = Self { spawner_flow_control_id, interceptor_factory, outgoing_access_control, incoming_access_control: incoming_access_control.clone(), + portal_payload_length, }; WorkerBuilder::new(worker) @@ -110,6 +113,7 @@ impl Worker for PortalOutletInterceptor { self.incoming_access_control.clone(), self.outgoing_access_control.clone(), self.interceptor_factory.create(), + self.portal_payload_length, ) .await?; @@ -145,6 +149,7 @@ pub struct PortalInletInterceptor { interceptor_factory: Arc, request_outgoing_access_control: Arc, response_incoming_access_control: Arc, + portal_payload_length: usize, } impl PortalInletInterceptor { @@ -164,11 +169,13 @@ impl PortalInletInterceptor { interceptor_factory: Arc, response_incoming_access_control: Arc, request_outgoing_access_control: Arc, + portal_payload_length: usize, ) -> ockam_core::Result<()> { let worker = Self { interceptor_factory, request_outgoing_access_control, response_incoming_access_control, + portal_payload_length, }; context.start_worker(listener_address, worker).await @@ -213,6 +220,7 @@ impl Worker for PortalInletInterceptor { self.request_outgoing_access_control.clone(), self.response_incoming_access_control.clone(), self.interceptor_factory.create(), + self.portal_payload_length, ) .await?; @@ -238,6 +246,7 @@ pub struct PortalInterceptorWorker { disconnect_received: Arc, interceptor: Arc, direction: Direction, + portal_payload_length: usize, } #[async_trait] @@ -368,6 +377,7 @@ impl PortalInterceptorWorker { outgoing_access_control: Arc, incoming_access_control: Arc, interceptor: Arc, + portal_payload_length: usize, ) -> ockam_core::Result
{ let from_inlet_worker_address = Address::random_tagged("InterceptorPortalWorker.from_inlet_to_outlet"); @@ -386,6 +396,7 @@ impl PortalInterceptorWorker { disconnect_received: disconnect_received.clone(), fixed_onward_route: Some(inlet_instance), interceptor: interceptor.clone(), + portal_payload_length, }; WorkerBuilder::new(from_outlet_worker) @@ -400,6 +411,7 @@ impl PortalInterceptorWorker { disconnect_received: disconnect_received.clone(), fixed_onward_route: None, interceptor: interceptor.clone(), + portal_payload_length, }; WorkerBuilder::new(from_inlet_worker) @@ -430,6 +442,7 @@ impl PortalInterceptorWorker { /// - `spawner_flow_control_id` to account for future created outlets, /// - `incoming_access_control` is the access control for the incoming messages. /// - `outgoing_access_control` is the access control for the outgoing messages. + #[allow(clippy::too_many_arguments)] async fn create_outlet_interceptor( context: &mut Context, outlet_route: Route, @@ -438,6 +451,7 @@ impl PortalInterceptorWorker { incoming_access_control: Arc, outgoing_access_control: Arc, interceptor: Arc, + portal_payload_length: usize, ) -> ockam_core::Result
{ let from_inlet_worker_address = Address::random_tagged("InterceptorPortalWorker.from_inlet_to_outlet"); @@ -451,6 +465,7 @@ impl PortalInterceptorWorker { disconnect_received: disconnect_received.clone(), fixed_onward_route: Some(outlet_route), interceptor: interceptor.clone(), + portal_payload_length, }; let from_outlet_worker = Self { other_worker_address: from_inlet_worker_address.clone(), @@ -458,6 +473,7 @@ impl PortalInterceptorWorker { disconnect_received: disconnect_received.clone(), fixed_onward_route: None, interceptor: interceptor.clone(), + portal_payload_length, }; let flow_controls = context.flow_controls(); @@ -569,7 +585,7 @@ impl PortalInterceptorWorker { onward_route = provided_onward_route.clone().modify().pop_front().into(); }; - for chunk in buffer.chunks(MAX_PAYLOAD_SIZE) { + for chunk in buffer.chunks(self.portal_payload_length) { let message = LocalMessage::new() .with_onward_route(onward_route.clone()) .with_return_route(return_route.clone()) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs index 47aa7cc5397..1130ac1b53a 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs @@ -1,16 +1,23 @@ use crate::portal::addresses::Addresses; use crate::TlsCertificateProvider; use ockam_core::compat::sync::Arc; +use ockam_core::env::get_env_with_default_ignore_error; use ockam_core::flow_control::{FlowControlId, FlowControls}; use ockam_core::{Address, AllowAll, IncomingAccessControl, OutgoingAccessControl}; -/// Trust Options for an Inlet +/// Maximum allowed size for a payload for TCP Portal +pub fn read_portal_payload_length() -> usize { + get_env_with_default_ignore_error("OCKAM_TCP_PORTAL_PAYLOAD_LENGTH", 128 * 1024) +} + +/// Options for an Inlet #[derive(Clone, Debug)] pub struct TcpInletOptions { pub(crate) incoming_access_control: Arc, pub(crate) outgoing_access_control: Arc, pub(crate) is_paused: bool, pub(crate) tls_certificate_provider: Option>, + pub(crate) portal_payload_length: usize, } impl TcpInletOptions { @@ -21,6 +28,7 @@ impl TcpInletOptions { outgoing_access_control: Arc::new(AllowAll), is_paused: false, tls_certificate_provider: None, + portal_payload_length: read_portal_payload_length(), } } @@ -105,13 +113,14 @@ impl Default for TcpInletOptions { } } -/// Trust Options for an Outlet +/// Options for an Outlet #[derive(Clone, Debug)] pub struct TcpOutletOptions { pub(crate) consumer: Vec, pub(crate) incoming_access_control: Arc, pub(crate) outgoing_access_control: Arc, pub(crate) tls: bool, + pub(crate) portal_payload_length: usize, } impl TcpOutletOptions { @@ -122,6 +131,7 @@ impl TcpOutletOptions { incoming_access_control: Arc::new(AllowAll), outgoing_access_control: Arc::new(AllowAll), tls: false, + portal_payload_length: read_portal_payload_length(), } } @@ -208,3 +218,13 @@ impl Default for TcpOutletOptions { Self::new() } } + +#[allow(non_snake_case)] +#[test] +fn tcp_portal_options_portal_length__env_var_set__pulls_correct_value() { + let length: usize = rand::random(); + std::env::set_var("OCKAM_TCP_PORTAL_PAYLOAD_LENGTH", length.to_string()); + + assert_eq!(TcpInletOptions::default().portal_payload_length, length); + assert_eq!(TcpOutletOptions::default().portal_payload_length, length); +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs index 52d2666dabe..06f0428edb6 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs @@ -104,6 +104,7 @@ impl Worker for TcpOutletListenWorker { addresses.clone(), self.options.incoming_access_control.clone(), self.options.outgoing_access_control.clone(), + self.options.portal_payload_length, ) .await?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs index 603d8844d4d..f6d869f664f 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs @@ -2,6 +2,7 @@ use ockam_core::bare::{read_slice, write_slice}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Encodable, Encoded, Message, NeutralMessage}; use serde::{Deserialize, Serialize}; +use std::convert::TryInto; /// A command message type for a Portal #[derive(Debug, PartialEq, Eq)] @@ -107,9 +108,6 @@ pub enum PortalInternalMessage { Disconnect, } -/// Maximum allowed size for a payload -pub const MAX_PAYLOAD_SIZE: usize = 128 * 1024; - #[cfg(test)] mod test { use crate::PortalMessage; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs index 6210a023038..cd542770a37 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs @@ -1,5 +1,4 @@ use crate::portal::addresses::Addresses; -use crate::portal::portal_message::MAX_PAYLOAD_SIZE; use crate::{PortalInternalMessage, PortalMessage, TcpRegistry}; use ockam_core::compat::vec::Vec; use ockam_core::{ @@ -25,6 +24,7 @@ pub(crate) struct TcpPortalRecvProcessor { addresses: Addresses, onward_route: Route, payload_packet_counter: u16, + portal_payload_length: usize, } impl TcpPortalRecvProcessor { @@ -34,14 +34,16 @@ impl TcpPortalRecvProcessor { read_half: R, addresses: Addresses, onward_route: Route, + portal_payload_length: usize, ) -> Self { Self { registry, - buf: Vec::with_capacity(MAX_PAYLOAD_SIZE), + buf: Vec::with_capacity(portal_payload_length), read_half, addresses, onward_route, payload_packet_counter: 0, + portal_payload_length, } } } @@ -113,7 +115,7 @@ impl Processor for TcpPortalRecvPr } // Loop just in case buf was extended (should not happen though) - for chunk in self.buf.chunks(MAX_PAYLOAD_SIZE) { + for chunk in self.buf.chunks(self.portal_payload_length) { let msg = LocalMessage::new() .with_tracing_context(tracing_context.clone()) .with_onward_route(self.onward_route.clone()) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs index ef4c9f5049a..15519b8d47b 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs @@ -52,6 +52,7 @@ pub(crate) struct TcpPortalWorker { last_received_packet_counter: u16, outgoing_access_control: Arc, is_tls: bool, + portal_payload_length: usize, } pub(crate) enum ReadHalfMaybeTls { @@ -78,6 +79,7 @@ impl TcpPortalWorker { addresses: Addresses, incoming_access_control: Arc, outgoing_access_control: Arc, // To propagate to the receiver + portal_payload_length: usize, ) -> Result<()> { Self::start( ctx, @@ -90,6 +92,7 @@ impl TcpPortalWorker { addresses, incoming_access_control, outgoing_access_control, + portal_payload_length, ) .await } @@ -107,6 +110,7 @@ impl TcpPortalWorker { addresses: Addresses, incoming_access_control: Arc, outgoing_access_control: Arc, + portal_payload_length: usize, ) -> Result<()> { Self::start( ctx, @@ -119,6 +123,7 @@ impl TcpPortalWorker { addresses, incoming_access_control, outgoing_access_control, + portal_payload_length, ) .await } @@ -137,6 +142,7 @@ impl TcpPortalWorker { addresses: Addresses, incoming_access_control: Arc, outgoing_access_control: Arc, + portal_payload_length: usize, ) -> Result<()> { let portal_type = if streams.is_some() { PortalType::Inlet @@ -173,6 +179,7 @@ impl TcpPortalWorker { last_received_packet_counter: u16::MAX, is_tls, outgoing_access_control: outgoing_access_control.clone(), + portal_payload_length, }; let internal_mailbox = Mailbox::new( @@ -234,6 +241,7 @@ impl TcpPortalWorker { rx, self.addresses.clone(), onward_route, + self.portal_payload_length, ); let remote = Mailbox::new( diff --git a/implementations/rust/ockam/ockam_transport_tcp/tests/interceptor.rs b/implementations/rust/ockam/ockam_transport_tcp/tests/interceptor.rs index 83abdddf974..d0acc75a88d 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/tests/interceptor.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/tests/interceptor.rs @@ -1,8 +1,8 @@ use ockam_core::{async_trait, route, AllowAll}; use ockam_node::Context; use ockam_transport_tcp::{ - Direction, PortalInletInterceptor, PortalInterceptor, PortalInterceptorFactory, - TcpInletOptions, TcpOutletOptions, TcpTransport, + read_portal_payload_length, Direction, PortalInletInterceptor, PortalInterceptor, + PortalInterceptorFactory, TcpInletOptions, TcpOutletOptions, TcpTransport, }; use rand::random; use std::sync::{Arc, Mutex}; @@ -70,6 +70,7 @@ async fn setup( }), Arc::new(AllowAll), Arc::new(AllowAll), + read_portal_payload_length(), ) .await .unwrap(); diff --git a/implementations/rust/ockam/ockam_transport_udp/src/lib.rs b/implementations/rust/ockam/ockam_transport_udp/src/lib.rs index 33f0913c996..982bffcd243 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/lib.rs @@ -21,12 +21,14 @@ mod error; mod messages; mod options; mod puncture; +mod size_options; mod transport; mod workers; pub use error::*; pub use options::UdpBindOptions; pub use puncture::*; +pub use size_options::*; pub use transport::{UdpBind, UdpBindArguments, UdpTransport, UdpTransportExtension}; pub(crate) const CLUSTER_NAME: &str = "_internals.transport.udp"; diff --git a/implementations/rust/ockam/ockam_transport_udp/src/messages/transport_message.rs b/implementations/rust/ockam/ockam_transport_udp/src/messages/transport_message.rs index 5c7b4c2cd93..ae27569f50d 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/messages/transport_message.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/messages/transport_message.rs @@ -5,16 +5,6 @@ use ockam_core::CowBytes; /// Current protocol version. pub const CURRENT_VERSION: Version = Version(1); -/// According to IETF RFC 1122 [https://datatracker.ietf.org/doc/html/rfc1122] IP packets of size -/// up to 576 bytes should be supported, which means we can have at least 508 bytes for our -/// payload while using UDP. This should give us high probability of packets not being dropped -/// somewhere on the way. -pub const MAX_ON_THE_WIRE_SIZE: usize = 508; - -/// Maximum payload size which will allow message not to exceed [`MAX_ON_THE_WIRE_SIZE`] -/// after encoding. -pub const MAX_PAYLOAD_SIZE: usize = 493; - /// Protocol version. #[derive(Debug, Copy, Clone, Eq, PartialEq, Encode, Decode, CborLen)] #[cbor(transparent)] @@ -55,37 +45,40 @@ impl<'a> UdpTransportMessage<'a> { #[cfg(test)] mod tests { - use crate::messages::{ - RoutingNumber, UdpTransportMessage, Version, MAX_ON_THE_WIRE_SIZE, MAX_PAYLOAD_SIZE, - }; + use crate::messages::{RoutingNumber, UdpTransportMessage, Version}; + use crate::UdpSizeOptions; #[test] fn test_max_size_current_protocol() { + let size_options = UdpSizeOptions::default(); + let msg = UdpTransportMessage::new( Version(u8::MAX), RoutingNumber(u16::MAX), u16::MAX, u16::MAX, - vec![0u8; MAX_PAYLOAD_SIZE], + vec![0u8; size_options.max_payload_size_per_packet], ); let len = ockam_core::cbor_encode_preallocate(msg).unwrap().len(); - assert!(len <= MAX_ON_THE_WIRE_SIZE); + assert!(len <= size_options.max_on_the_wire_packet_size); } #[test] fn test_max_size_max_protocol() { + let size_options = UdpSizeOptions::default(); + let msg = UdpTransportMessage::new( Version(u8::MAX), RoutingNumber(u16::MAX), u16::MAX, u16::MAX, - vec![0u8; MAX_PAYLOAD_SIZE], + vec![0u8; size_options.max_payload_size_per_packet], ); let len = ockam_core::cbor_encode_preallocate(msg).unwrap().len(); - assert_eq!(len, MAX_ON_THE_WIRE_SIZE); + assert_eq!(len, size_options.max_on_the_wire_packet_size); } } diff --git a/implementations/rust/ockam/ockam_transport_udp/src/options.rs b/implementations/rust/ockam/ockam_transport_udp/src/options.rs index 50d641246fc..ab61938c684 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/options.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/options.rs @@ -1,13 +1,15 @@ use crate::workers::Addresses; +use crate::UdpSizeOptions; use ockam_core::compat::sync::Arc; use ockam_core::flow_control::{FlowControlId, FlowControlOutgoingAccessControl, FlowControls}; use ockam_core::OutgoingAccessControl; -/// Trust Options for a UDP connection +/// Options for a UDP connection #[derive(Debug)] pub struct UdpBindOptions { pub(super) consumer: Vec, pub(crate) flow_control_id: FlowControlId, + pub(crate) size_options: UdpSizeOptions, } impl UdpBindOptions { @@ -17,6 +19,7 @@ impl UdpBindOptions { Self { consumer: vec![], flow_control_id: FlowControls::generate_flow_control_id(), + size_options: UdpSizeOptions::read_from_env(), } } @@ -33,6 +36,12 @@ impl UdpBindOptions { } } +impl Default for UdpBindOptions { + fn default() -> Self { + Self::new() + } +} + impl UdpBindOptions { pub(crate) fn setup_flow_control(&self, flow_controls: &FlowControls, addresses: &Addresses) { flow_controls.add_producer( @@ -48,12 +57,12 @@ impl UdpBindOptions { } pub(crate) fn create_receiver_outgoing_access_control( - self, + &self, flow_controls: &FlowControls, ) -> Arc { Arc::new(FlowControlOutgoingAccessControl::new( flow_controls, - self.flow_control_id, + self.flow_control_id.clone(), None, )) } diff --git a/implementations/rust/ockam/ockam_transport_udp/src/size_options.rs b/implementations/rust/ockam/ockam_transport_udp/src/size_options.rs new file mode 100644 index 00000000000..3979229c54e --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_udp/src/size_options.rs @@ -0,0 +1,74 @@ +use ockam_core::env::get_env; + +/// Various vector sizes that affect performance and robustness of UDP transport +#[derive(Clone, Copy, Debug)] +pub struct UdpSizeOptions { + /// Number of pending messages per peer + pub pending_messages_per_peer: u16, + /// Max size of the UDP packet on the wire + pub max_on_the_wire_packet_size: usize, + /// Max payload size for a UDP packet + /// A packet with payload of this size should increase to max_on_the_wire_size after encoding + pub max_payload_size_per_packet: usize, +} + +impl Default for UdpSizeOptions { + fn default() -> Self { + // According to IETF RFC 1122 [https://datatracker.ietf.org/doc/html/rfc1122] IP packets + // of size up to 576 bytes should be supported. This should give us high probability of packets + // not being dropped somewhere on the way. + let default_on_the_wire_size = 508usize; + + Self { + pending_messages_per_peer: 5, + max_on_the_wire_packet_size: default_on_the_wire_size, + max_payload_size_per_packet: Self::calculate_max_payload_size_per_packet( + default_on_the_wire_size, + ), + } + } +} + +impl UdpSizeOptions { + fn calculate_max_payload_size_per_packet(max_on_the_wire_size: usize) -> usize { + // Encoding overhead for [`UdpTransportMessage`] + let encoding_overhead = 15usize; + max_on_the_wire_size - encoding_overhead + } + + /// Read values from environment with fallback to default values + pub fn read_from_env() -> Self { + let mut s = Self::default(); + + if let Some(p) = get_env("OCKAM_UDP_PENDING_MESSAGES_PER_PEER") + .ok() + .flatten() + { + s.pending_messages_per_peer = p; + } + + if let Some(p) = get_env("OCKAM_UDP_MAX_ON_THE_WIRE_PACKET_SIZE") + .ok() + .flatten() + { + s.max_on_the_wire_packet_size = p; + s.max_payload_size_per_packet = Self::calculate_max_payload_size_per_packet(p); + } + + s + } +} + +#[allow(non_snake_case)] +#[test] +fn udp_size_options_read_from_env__env_var_set__pulls_correct_value() { + let m: u16 = rand::random(); + std::env::set_var("OCKAM_UDP_PENDING_MESSAGES_PER_PEER", m.to_string()); + std::env::set_var("OCKAM_UDP_MAX_ON_THE_WIRE_PACKET_SIZE", 800.to_string()); + + let size_options = UdpSizeOptions::read_from_env(); + + assert_eq!(size_options.pending_messages_per_peer, m); + assert_eq!(size_options.max_on_the_wire_packet_size, 800); + assert_eq!(size_options.max_payload_size_per_packet, 785); +} diff --git a/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs b/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs index 74cb2bd14f7..f19f7d8e2b3 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs @@ -114,7 +114,12 @@ impl UdpTransport { let receiver_outgoing_access_control = options.create_receiver_outgoing_access_control(self.ctx.flow_controls()); - let sender = UdpSenderWorker::new(addresses.clone(), socket_write, arguments.peer_address); + let sender = UdpSenderWorker::new( + addresses.clone(), + socket_write, + arguments.peer_address, + options.size_options.max_payload_size_per_packet, + ); WorkerBuilder::new(sender) .with_address(addresses.sender_address().clone()) .with_incoming_access_control(AllowAll) @@ -122,8 +127,13 @@ impl UdpTransport { .start(&self.ctx) .await?; - let receiver = - UdpReceiverProcessor::new(addresses.clone(), socket_read, arguments.peer_address); + let receiver = UdpReceiverProcessor::new( + addresses.clone(), + socket_read, + arguments.peer_address, + options.size_options.pending_messages_per_peer, + options.size_options.max_on_the_wire_packet_size, + ); ProcessorBuilder::new(receiver) .with_address(addresses.receiver_address().clone()) .with_incoming_access_control(DenyAll) diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/mod.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/mod.rs index 23a6b944340..94964357fdf 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/mod.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/mod.rs @@ -11,11 +11,9 @@ pub(crate) use transport_messages_iterator::*; #[allow(non_snake_case)] #[cfg(test)] mod tests { - use crate::messages::{ - RoutingNumber, UdpRoutingMessage, UdpTransportMessage, MAX_PAYLOAD_SIZE, - }; + use crate::messages::{RoutingNumber, UdpRoutingMessage, UdpTransportMessage}; use crate::workers::pending_messages::{PendingMessage, TransportMessagesIterator}; - use crate::MAX_MESSAGE_SIZE; + use crate::{UdpSizeOptions, MAX_MESSAGE_SIZE}; use ockam_core::compat::rand::RngCore; use ockam_core::{route, Result}; use rand::prelude::SliceRandom; @@ -32,7 +30,11 @@ mod tests { let routing_number = RoutingNumber::default(); - let mut iterator = TransportMessagesIterator::new(routing_number, &message)?; + let mut iterator = TransportMessagesIterator::new( + routing_number, + &message, + UdpSizeOptions::default().max_payload_size_per_packet, + )?; let next = iterator.next().transpose()?.unwrap(); let packet: UdpTransportMessage = minicbor::decode(&next)?; @@ -49,7 +51,8 @@ mod tests { #[test] fn medium_message__reassemble__should_succeed() -> Result<()> { - let mut payload = vec![0; MAX_PAYLOAD_SIZE]; + let max_payload_size_per_packet = UdpSizeOptions::default().max_payload_size_per_packet; + let mut payload = vec![0; max_payload_size_per_packet]; thread_rng().fill_bytes(&mut payload); let message = @@ -59,7 +62,8 @@ mod tests { let mut pending_message = PendingMessage::new(vec![]); - let mut iterator = TransportMessagesIterator::new(routing_number, &message)?; + let mut iterator = + TransportMessagesIterator::new(routing_number, &message, max_payload_size_per_packet)?; let next = iterator.next().transpose()?.unwrap(); let packet: UdpTransportMessage = minicbor::decode(&next)?; @@ -90,7 +94,11 @@ mod tests { let mut pending_message = PendingMessage::new(vec![]); - let mut iterator = TransportMessagesIterator::new(routing_number, &message)?; + let mut iterator = TransportMessagesIterator::new( + routing_number, + &message, + UdpSizeOptions::default().max_payload_size_per_packet, + )?; while let Some(next) = iterator.next().transpose()? { let packet: UdpTransportMessage = minicbor::decode(&next)?; @@ -130,7 +138,11 @@ mod tests { let mut pending_message = PendingMessage::new(vec![]); - let mut iterator = TransportMessagesIterator::new(routing_number, &message)?; + let mut iterator = TransportMessagesIterator::new( + routing_number, + &message, + UdpSizeOptions::default().max_payload_size_per_packet, + )?; let mut packets = vec![]; diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/peer_pending_routing_message_storage.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/peer_pending_routing_message_storage.rs index 3c493558498..b627c6d6c4d 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/peer_pending_routing_message_storage.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/peer_pending_routing_message_storage.rs @@ -5,9 +5,6 @@ use ockam_core::compat::collections::VecDeque; use ockam_core::Result; use tracing::{error, trace}; -const MAX_PENDING_MESSAGES_U16: u16 = 5; -const MAX_PENDING_MESSAGES_USIZE: usize = MAX_PENDING_MESSAGES_U16 as usize; - /// Pending routing messages for a certain peer /// This storage will cache packets (until they fit into the cache) and assemble them into /// full Ockam Routing message when all parts are receiver. Then the Ockam routing message is @@ -17,17 +14,20 @@ pub(crate) struct PeerPendingRoutingMessageStorage { buffer_queue: VecDeque>, // Oldest routing number we can accept oldest_routing_number: RoutingNumber, + // Max number of messages we cache + max_pending_messages: u16, // Messages with following routing numbers: - // [self.oldest_routing_number, ..., self.oldest_routing_number + MAX_PENDING_MESSAGES_USIZE - 1] - pending_messages: [PendingMessageState; MAX_PENDING_MESSAGES_USIZE], + // [self.oldest_routing_number, ..., self.oldest_routing_number + max_pending_messages - 1] + pending_messages: Vec, } impl PeerPendingRoutingMessageStorage { // Create given the first received message - pub(crate) fn new(routing_number: RoutingNumber) -> Self { + pub(crate) fn new(routing_number: RoutingNumber, max_pending_messages: u16) -> Self { Self { buffer_queue: Default::default(), oldest_routing_number: routing_number, + max_pending_messages, pending_messages: Default::default(), } } @@ -58,15 +58,15 @@ impl PeerPendingRoutingMessageStorage { let diff = transport_message.routing_number - self.oldest_routing_number; // Move self.pending_messages if needed and update the diff - let diff = if diff >= MAX_PENDING_MESSAGES_U16 { + let diff = if diff >= self.max_pending_messages { // We received a much newer message, we need to drop one or few older messages so that // this message fits into our self.pending_messages // Length of the shift we need to perform on our self.pending_messages array - let shift = diff - MAX_PENDING_MESSAGES_U16 + 1; + let shift = diff - self.max_pending_messages + 1; // Drop the messages that don't fit anymore - let number_of_messages_to_drop = min(shift, MAX_PENDING_MESSAGES_U16) as usize; + let number_of_messages_to_drop = min(shift, self.max_pending_messages) as usize; for i in 0..number_of_messages_to_drop { match self.pending_messages[i].take() { @@ -93,8 +93,8 @@ impl PeerPendingRoutingMessageStorage { } // If we didn't drop all the messages, move the rest to the left - if shift < MAX_PENDING_MESSAGES_U16 { - let number_of_messages_to_shift = (MAX_PENDING_MESSAGES_U16 - shift) as usize; + if shift < self.max_pending_messages { + let number_of_messages_to_shift = (self.max_pending_messages - shift) as usize; for i in 0..number_of_messages_to_shift { self.pending_messages[i] = self.pending_messages[i + shift as usize].take(); } diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/pending_routing_message_storage.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/pending_routing_message_storage.rs index eef352be3e0..ec67970811b 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/pending_routing_message_storage.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/pending_routing_message_storage.rs @@ -6,12 +6,19 @@ use std::net::SocketAddr; /// Pending routing messages that we haven't yet assembled for all peers /// TODO: Clearing everything for a socket after long inactivity would be nice -#[derive(Default)] -pub(crate) struct PendingRoutingMessageStorage( - HashMap, -); +pub(crate) struct PendingRoutingMessageStorage { + storage: HashMap, + max_pending_messages_per_peer: u16, +} impl PendingRoutingMessageStorage { + pub(crate) fn new(max_pending_messages_per_peer: u16) -> Self { + Self { + storage: Default::default(), + max_pending_messages_per_peer, + } + } + pub(crate) fn add_transport_message_and_try_assemble( &mut self, peer: SocketAddr, @@ -19,10 +26,12 @@ impl PendingRoutingMessageStorage { ) -> Result>> { let routing_number = transport_message.routing_number; - let peer_pending_messages = self - .0 - .entry(peer) - .or_insert_with(|| PeerPendingRoutingMessageStorage::new(routing_number)); + let peer_pending_messages = self.storage.entry(peer).or_insert_with(|| { + PeerPendingRoutingMessageStorage::new( + routing_number, + self.max_pending_messages_per_peer, + ) + }); peer_pending_messages.add_transport_message_and_try_assemble(transport_message) } diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/transport_messages_iterator.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/transport_messages_iterator.rs index db5ceed7543..d290d46e4d7 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/transport_messages_iterator.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/pending_messages/transport_messages_iterator.rs @@ -1,6 +1,4 @@ -use crate::messages::{ - RoutingNumber, UdpRoutingMessage, UdpTransportMessage, CURRENT_VERSION, MAX_PAYLOAD_SIZE, -}; +use crate::messages::{RoutingNumber, UdpRoutingMessage, UdpTransportMessage, CURRENT_VERSION}; use crate::MAX_MESSAGE_SIZE; use ockam_core::Result; use ockam_transport_core::TransportError; @@ -11,12 +9,14 @@ pub(crate) struct TransportMessagesIterator { offset: u16, total: u16, data: Vec, + max_payload_size_per_packet: usize, } impl TransportMessagesIterator { pub(crate) fn new( current_routing_number: RoutingNumber, routing_message: &UdpRoutingMessage, + max_payload_size_per_packet: usize, ) -> Result { let routing_message = ockam_core::cbor_encode_preallocate(routing_message)?; @@ -24,7 +24,7 @@ impl TransportMessagesIterator { return Err(TransportError::MessageLengthExceeded)?; } - let total = routing_message.len() / MAX_PAYLOAD_SIZE + 1; + let total = routing_message.len() / max_payload_size_per_packet + 1; let total: u16 = total .try_into() @@ -35,6 +35,7 @@ impl TransportMessagesIterator { offset: 0, total, data: routing_message, + max_payload_size_per_packet, }) } } @@ -47,11 +48,11 @@ impl Iterator for TransportMessagesIterator { return None; } - let data_offset_begin = (self.offset as usize) * MAX_PAYLOAD_SIZE; + let data_offset_begin = (self.offset as usize) * self.max_payload_size_per_packet; let data_offset_end = if self.offset + 1 == self.total { self.data.len() } else { - data_offset_begin + MAX_PAYLOAD_SIZE + data_offset_begin + self.max_payload_size_per_packet }; let part = UdpTransportMessage::new( @@ -80,9 +81,9 @@ impl Iterator for TransportMessagesIterator { #[cfg(test)] mod tests { - use crate::messages::{RoutingNumber, UdpRoutingMessage, MAX_PAYLOAD_SIZE}; + use crate::messages::{RoutingNumber, UdpRoutingMessage}; use crate::workers::pending_messages::TransportMessagesIterator; - use crate::MAX_MESSAGE_SIZE; + use crate::{UdpSizeOptions, MAX_MESSAGE_SIZE}; use ockam_core::{route, Result}; #[allow(non_snake_case)] @@ -97,7 +98,11 @@ mod tests { let routing_number = RoutingNumber::default(); - let iterator = TransportMessagesIterator::new(routing_number, &message)?; + let iterator = TransportMessagesIterator::new( + routing_number, + &message, + UdpSizeOptions::default().max_payload_size_per_packet, + )?; assert_eq!(iterator.current_routing_number, routing_number); assert_eq!(iterator.total, 1); @@ -118,7 +123,12 @@ mod tests { let routing_number = RoutingNumber::default(); - assert!(TransportMessagesIterator::new(routing_number, &message).is_err()); + assert!(TransportMessagesIterator::new( + routing_number, + &message, + UdpSizeOptions::default().max_payload_size_per_packet, + ) + .is_err()); Ok(()) } @@ -126,16 +136,19 @@ mod tests { #[allow(non_snake_case)] #[test] fn large_message__create_iterator__should_split_correctly() -> Result<()> { + let max_payload_size_per_packet = UdpSizeOptions::default().max_payload_size_per_packet; + let message = UdpRoutingMessage::new( route!["onward"], route!["return"], - vec![0; MAX_PAYLOAD_SIZE * 2].into(), + vec![0; max_payload_size_per_packet * 2].into(), None, ); let routing_number = RoutingNumber::default(); - let mut iterator = TransportMessagesIterator::new(routing_number, &message)?; + let mut iterator = + TransportMessagesIterator::new(routing_number, &message, max_payload_size_per_packet)?; assert_eq!(iterator.current_routing_number, routing_number); assert_eq!(iterator.total, 3); diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs index 0353a736ba0..be2d9181584 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs @@ -1,5 +1,5 @@ use super::{Addresses, UdpSocketRead}; -use crate::messages::{UdpTransportMessage, MAX_ON_THE_WIRE_SIZE}; +use crate::messages::UdpTransportMessage; use crate::workers::pending_messages::PendingRoutingMessageStorage; use crate::UDP; use ockam_core::errcode::{Kind, Origin}; @@ -20,6 +20,7 @@ pub(crate) struct UdpReceiverProcessor { addresses: Addresses, /// The read half of the underlying UDP socket. socket_read: UdpSocketRead, + buffer: Vec, /// Will be Some if we communicate with one specific peer. peer: Option, /// Pending routing messages that we haven't yet assembled fully @@ -27,12 +28,21 @@ pub(crate) struct UdpReceiverProcessor { } impl UdpReceiverProcessor { - pub fn new(addresses: Addresses, socket_read: UdpSocketRead, peer: Option) -> Self { + pub fn new( + addresses: Addresses, + socket_read: UdpSocketRead, + peer: Option, + max_pending_messages_per_peer: u16, + max_on_the_wire_packet_size: usize, + ) -> Self { Self { addresses, socket_read, + buffer: vec![0; max_on_the_wire_packet_size], peer, - pending_routing_messages: Default::default(), + pending_routing_messages: PendingRoutingMessageStorage::new( + max_pending_messages_per_peer, + ), } } } @@ -48,10 +58,10 @@ impl Processor for UdpReceiverProcessor { async fn process(&mut self, ctx: &mut Self::Context) -> Result { trace!("Waiting for incoming UDP datagram..."); - let mut buf = [0u8; MAX_ON_THE_WIRE_SIZE]; + self.buffer.clear(); let (len, addr) = self .socket_read - .recv_from(&mut buf) + .recv_from(&mut self.buffer) .await .map_err(|e| Error::new(Origin::Transport, Kind::Io, e))?; @@ -66,7 +76,7 @@ impl Processor for UdpReceiverProcessor { } } - let transport_message: UdpTransportMessage = minicbor::decode(&buf[..len])?; + let transport_message: UdpTransportMessage = minicbor::decode(&self.buffer[..len])?; // Let's save newly received message and see if we can assemble a Routing Message let routing_message = match self diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs index 3c2bb8dfe60..f3469111c2c 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs @@ -22,6 +22,7 @@ pub(crate) struct UdpSenderWorker { peer: Option, /// Current number of the packet current_routing_number: RoutingNumber, + max_payload_size_per_packet: usize, } impl UdpSenderWorker { @@ -30,12 +31,14 @@ impl UdpSenderWorker { addresses: Addresses, socket_write: UdpSocketWrite, peer: Option, + max_payload_size_per_packet: usize, ) -> Self { Self { addresses, socket_write, peer, current_routing_number: RoutingNumber::default(), + max_payload_size_per_packet, } } } @@ -91,6 +94,7 @@ impl Worker for UdpSenderWorker { let messages = TransportMessagesIterator::new( self.current_routing_number, &UdpRoutingMessage::from(msg), + self.max_payload_size_per_packet, )?; self.current_routing_number.increment();