diff --git a/statime/src/port/error.rs b/statime/src/port/error.rs index b06fda31d..bc4363fff 100644 --- a/statime/src/port/error.rs +++ b/statime/src/port/error.rs @@ -7,8 +7,6 @@ pub type Result = core::result::Result; pub enum PortError { #[cfg_attr(feature = "std", error("failed to retrieve local clock"))] ClockBusy, - #[cfg_attr(feature = "std", error("failed to retrieve filter"))] - FilterBusy, #[cfg_attr(feature = "std", error("something went wrong on the network"))] Network, #[cfg_attr(feature = "std", error("wire format error: {0}"))] diff --git a/statime/src/port/mod.rs b/statime/src/port/mod.rs index fcae56654..59301f256 100644 --- a/statime/src/port/mod.rs +++ b/statime/src/port/mod.rs @@ -43,6 +43,7 @@ pub struct Port { // Corresponds with PortDS port_state and enabled port_state: PortState, bmca: Bmca, + packet_buffer: [u8; MAX_DATA_LEN], lifecycle: L, } @@ -96,6 +97,8 @@ pub enum PortAction<'a> { }, } +/// Guarantees to end user: Any set of actions will only ever contain a single +/// time critical send pub struct PortActionIterator<'a> { internal: > as IntoIterator>::IntoIter, } @@ -123,7 +126,17 @@ impl<'a, C: Clock, F: Filter> Port> { context: TimestampContext, timestamp: Time, ) -> PortActionIterator<'_> { - PortActionIterator::from(self.port_state.handle_timestamp(context, timestamp)) + let actions = + PortActionIterator::from(self.port_state.handle_timestamp(context, timestamp)); + + handle_time_measurement( + &mut self.port_state, + &self.lifecycle.state.filter, + &self.lifecycle.state.local_clock, + &self.lifecycle.state.time_properties_ds, + ); + + actions } // Handle the announce timer going of @@ -148,13 +161,11 @@ impl<'a, C: Clock, F: Filter> Port> { } // Handle a message over the timecritical channel - pub fn handle_timecritical_receive<'b>( + pub fn handle_timecritical_receive( &mut self, data: &[u8], timestamp: Time, - // Temporary parameters until refactoring of central state management - buffer: &'b mut [u8], - ) -> PortActionIterator<'b> { + ) -> PortActionIterator { let message = match Message::deserialize(data) { Ok(message) => message, Err(error) => { @@ -176,18 +187,21 @@ impl<'a, C: Clock, F: Filter> Port> { self.config.min_delay_req_interval(), self.config.port_identity, &self.lifecycle.state.default_ds, - buffer, + &mut self.packet_buffer, )); - if let Err(error) = self.temp_handle_time_measurement() { - log::error!("Could not handle time measurement: {:?}", error); - } + handle_time_measurement( + &mut self.port_state, + &self.lifecycle.state.filter, + &self.lifecycle.state.local_clock, + &self.lifecycle.state.time_properties_ds, + ); actions } // Handle a general ptp message - pub fn handle_general_receive<'b>(&mut self, data: &[u8]) -> PortActionIterator<'b> { + pub fn handle_general_receive(&mut self, data: &[u8]) -> PortActionIterator { let message = match Message::deserialize(data) { Ok(message) => message, Err(error) => { @@ -223,9 +237,12 @@ impl<'a, C: Clock, F: Filter> Port> { } }; - if let Err(error) = self.temp_handle_time_measurement() { - log::error!("Could not handle time measurement: {:?}", error); - } + handle_time_measurement( + &mut self.port_state, + &self.lifecycle.state.filter, + &self.lifecycle.state.local_clock, + &self.lifecycle.state.time_properties_ds, + ); PortActionIterator::from(action) } @@ -237,6 +254,7 @@ impl<'a, C: Clock, F: Filter> Port> { port_state: self.port_state, config: self.config, bmca: self.bmca, + packet_buffer: [0; MAX_DATA_LEN], lifecycle: InBmca { pending_action: None, local_best: None, @@ -259,6 +277,7 @@ impl<'a, C, F> Port> { port_state: self.port_state, config: self.config, bmca: self.bmca, + packet_buffer: [0; MAX_DATA_LEN], lifecycle: Running { state_refcell: self.lifecycle.state_refcell, state: self.lifecycle.state_refcell.borrow(), @@ -466,6 +485,7 @@ impl

Port> { config, port_state: PortState::Listening, bmca, + packet_buffer: [0; MAX_DATA_LEN], lifecycle: Startup { network_port }, } } @@ -479,6 +499,7 @@ impl

Port> { config: self.config, port_state: self.port_state, bmca: self.bmca, + packet_buffer: [0; MAX_DATA_LEN], lifecycle: Running { state_refcell, state: state_refcell.borrow(), @@ -490,35 +511,6 @@ impl

Port> { } impl<'a, C: Clock, F: Filter> Port> { - fn temp_handle_time_measurement(&mut self) -> Result<()> { - // If the received message allowed the (slave) state to calculate its offset - // from the master, update the local clock - if let Some(measurement) = self.port_state.extract_measurement() { - let (offset, freq_corr) = self - .lifecycle - .state - .filter - .try_borrow_mut() - .map(|mut borrow| borrow.absorb(measurement)) - .map_err(|_| PortError::FilterBusy)?; - - let mut local_clock = self - .lifecycle - .state - .local_clock - .try_borrow_mut() - .map_err(|_| PortError::ClockBusy)?; - - if let Err(error) = - local_clock.adjust(offset, freq_corr, &self.lifecycle.state.time_properties_ds) - { - log::error!("failed to adjust clock: {:?}", error); - } - } - - Ok(()) - } - #[allow(clippy::too_many_arguments)] pub(crate) async fn run_port( &mut self, @@ -528,101 +520,85 @@ impl<'a, C: Clock, F: Filter> Port> { announce_timeout: &mut Pin<&mut Ticker FT>>, mut stop: Signal<'_>, ) { + let mut pending_timestamp = None; loop { log::trace!("Loop iter port {}", self.config.port_identity.port_number); - let timeouts = select::select3( - announce_receipt_timeout.next(), - sync_timeout.next(), - announce_timeout.next(), - ); - let packet = network_port.recv(); - match select::select3(timeouts, packet, stop.wait_for()).await { - Either3::First(timeout) => match timeout { - Either3::First(_) => { - log::trace!( - "Port {} force master timeout", - self.config.port_identity.port_number - ); - // No announces received for a long time, become master - match self.port_state { - PortState::Master(_) => (), - _ => self.set_forced_port_state(PortState::Master(MasterState::new())), + let actions = if let Some((context, timestamp)) = pending_timestamp.take() { + self.handle_send_timestamp(context, timestamp) + } else { + let timeouts = select::select3( + announce_receipt_timeout.next(), + sync_timeout.next(), + announce_timeout.next(), + ); + let packet = network_port.recv(); + match select::select3(timeouts, packet, stop.wait_for()).await { + Either3::First(timeout) => match timeout { + Either3::First(_) => { + log::trace!( + "Port {} force master timeout", + self.config.port_identity.port_number + ); + // No announces received for a long time, become master + match self.port_state { + PortState::Master(_) => (), + _ => self + .set_forced_port_state(PortState::Master(MasterState::new())), + } + PortActionIterator::from(None) } - } - Either3::Second(_) => { + Either3::Second(_) => { + log::trace!( + "Port {} sync timeout", + self.config.port_identity.port_number + ); + // Send sync message + if let Err(error) = self.send_sync(network_port).await { + log::error!("{:?}", error); + } + PortActionIterator::from(None) + } + Either3::Third(_) => { + log::trace!( + "Port {} announce timeout", + self.config.port_identity.port_number + ); + // Send announce message + if let Err(error) = self.send_announce(network_port).await { + log::error!("{:?}", error); + } + PortActionIterator::from(None) + } + }, + Either3::Second(Ok(packet)) => { log::trace!( - "Port {} sync timeout", - self.config.port_identity.port_number + "Port {} message received: {:?}", + self.config.port_identity.port_number, + packet ); - // Send sync message - if let Err(error) = self.send_sync(network_port).await { - log::error!("{:?}", error); + match packet.timestamp { + Some(timestamp) => { + self.handle_timecritical_receive(&packet.data, timestamp) + } + None => self.handle_general_receive(&packet.data), } } + Either3::Second(Err(error)) => { + log::error!("failed to parse packet {:?}", error); + PortActionIterator::from(None) + } Either3::Third(_) => { log::trace!( - "Port {} announce timeout", + "Port {} bmca trigger", self.config.port_identity.port_number ); - // Send announce message - if let Err(error) = self.send_announce(network_port).await { - log::error!("{:?}", error); - } + break; } - }, - Either3::Second(Ok(packet)) => { - log::trace!( - "Port {} message received: {:?}", - self.config.port_identity.port_number, - packet - ); - let mut buffer = [0u8; MAX_DATA_LEN]; - let actions = match packet.timestamp { - Some(timestamp) => { - self.handle_timecritical_receive(&packet.data, timestamp, &mut buffer) - } - None => self.handle_general_receive(&packet.data), - }; - self.temp_handle_actions(actions, network_port, announce_receipt_timeout) - .await; } - Either3::Second(Err(error)) => log::error!("failed to parse packet {:?}", error), - Either3::Third(_) => { - log::trace!( - "Port {} bmca trigger", - self.config.port_identity.port_number - ); - break; - } - } - } - } + }; - async fn temp_handle_actions( - &mut self, - actions: PortActionIterator<'_>, - network_port: &mut P, - announce_receipt_timeout: &mut Pin<&mut Ticker FT>>, - ) { - for action in actions { - match action { - PortAction::SendTimeCritical { context, data } => { - match network_port.send_time_critical(data).await { - Ok(Some(timestamp)) => { - let mut followup = self.handle_send_timestamp(context, timestamp); - assert!(followup.next().is_none()); - } - Ok(None) => { - log::error!("Missing timestamp for packet"); - } - Err(error) => { - log::error!("Could not send message: {:?}", error) - } - } - } - PortAction::ResetAnnounceReceiptTimer { .. } => announce_receipt_timeout.reset(), - _ => panic!("Unexpected action here"), - } + pending_timestamp = + temp_handle_actions(actions, network_port, announce_receipt_timeout).await; } } @@ -653,3 +629,64 @@ impl<'a, C: Clock, F: Filter> Port> { .await } } + +// Separate from the object to deal with lifetime issues. +fn handle_time_measurement( + port_state: &mut PortState, + filter: &RefCell, + clock: &RefCell, + time_properties_ds: &TimePropertiesDS, +) { + // If the received message allowed the (slave) state to calculate its offset + // from the master, update the local clock + let mut filter = match filter.try_borrow_mut() { + Ok(filter) => filter, + Err(_) => { + log::error!("Statime bug: filter busy"); + return; + } + }; + let mut clock = match clock.try_borrow_mut() { + Ok(clock) => clock, + Err(_) => { + log::error!("Statime bug: filter busy"); + return; + } + }; + + if let Some(measurement) = port_state.extract_measurement() { + let (offset, freq_corr) = filter.absorb(measurement); + + if let Err(error) = clock.adjust(offset, freq_corr, time_properties_ds) { + log::error!("failed to adjust clock: {:?}", error); + } + } +} + +async fn temp_handle_actions( + actions: PortActionIterator<'_>, + network_port: &mut P, + announce_receipt_timeout: &mut Pin<&mut Ticker FT>>, +) -> Option<(TimestampContext, Time)> { + let mut pending_timestamp = None; + for action in actions { + match action { + PortAction::SendTimeCritical { context, data } => { + match network_port.send_time_critical(data).await { + Ok(Some(timestamp)) => { + pending_timestamp = Some((context, timestamp)); + } + Ok(None) => { + log::error!("Missing timestamp for packet"); + } + Err(error) => { + log::error!("Could not send message: {:?}", error) + } + } + } + PortAction::ResetAnnounceReceiptTimer { .. } => announce_receipt_timeout.reset(), + _ => panic!("Unexpected action here"), + } + } + pending_timestamp +} diff --git a/statime/src/port/state/mod.rs b/statime/src/port/state/mod.rs index b88ceb4ba..e08735f55 100644 --- a/statime/src/port/state/mod.rs +++ b/statime/src/port/state/mod.rs @@ -36,7 +36,7 @@ impl PortState { &mut self, context: TimestampContext, timestamp: Time, - ) -> Option> { + ) -> Option> { match self { PortState::Slave(slave) => slave.handle_timestamp(context, timestamp), PortState::Master(_) | PortState::Listening | PortState::Passive => None, diff --git a/statime/src/port/state/slave.rs b/statime/src/port/state/slave.rs index 625ae4ae4..700afa5b5 100644 --- a/statime/src/port/state/slave.rs +++ b/statime/src/port/state/slave.rs @@ -69,7 +69,7 @@ impl SlaveState { &mut self, context: TimestampContext, timestamp: Time, - ) -> Option> { + ) -> Option> { match context.inner { crate::port::TimestampContextInner::DelayReq { id } => { self.handle_delay_timestamp(id, timestamp) @@ -81,7 +81,7 @@ impl SlaveState { &mut self, timestamp_id: u16, timestamp: Time, - ) -> Option> { + ) -> Option> { match self.delay_state { DelayState::Measuring { id,