diff --git a/CHANGELOG.md b/CHANGELOG.md index 262babb8..b2c2bf55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,7 @@ * Ensure lexical ordering of SDP-formatted candidates follows priority #557 * Limit TWCC iteration with packet status count #606 * Dedupe acked packets from `TwccSendRegister::apply_report()` #601, #605 - * Align BWE ArrivalGroup calculation with libwebrtc implementation #608 + * Align BWE ArrivalGroup calculation with libwebrtc implementation #608, #615 # 0.6.2 diff --git a/src/packet/bwe/arrival_group.rs b/src/packet/bwe/arrival_group.rs index b07227cd..06d17415 100644 --- a/src/packet/bwe/arrival_group.rs +++ b/src/packet/bwe/arrival_group.rs @@ -1,10 +1,12 @@ -use std::cmp::Ordering; use std::mem; use std::time::{Duration, Instant}; use crate::rtp_::SeqNo; -use super::AckedPacket; +use super::{ + time::{TimeDelta, Timestamp}, + AckedPacket, +}; const BURST_TIME_INTERVAL: Duration = Duration::from_millis(5); const SEND_TIME_GROUP_LENGTH: Duration = Duration::from_millis(5); @@ -62,32 +64,15 @@ impl ArrivalGroup { return Belongs::Skipped; }; - let inter_arrival_time = { - let last_remote_recv_time = self.remote_recv_time(); - - if packet.remote_recv_time >= last_remote_recv_time { - (packet.remote_recv_time - last_remote_recv_time).as_secs_f64() - } else { - (last_remote_recv_time - packet.remote_recv_time).as_secs_f64() * -1.0 - } - }; - - let last_send_delta = { - let last_send_time = self.local_send_time(); - - match packet.local_send_time.cmp(&last_send_time) { - Ordering::Equal => { - return Belongs::Yes; - } - Ordering::Greater => (packet.local_send_time - last_send_time).as_secs_f64(), - Ordering::Less => (last_send_time - packet.local_send_time).as_secs_f64() * -1.0, - } - }; - - let inter_group_delay_delta = inter_arrival_time - last_send_delta; + let send_time_delta = Timestamp::from(packet.local_send_time) - self.local_send_time(); + if send_time_delta == TimeDelta::ZERO { + return Belongs::Yes; + } + let arrival_time_delta = Timestamp::from(packet.remote_recv_time) - self.remote_recv_time(); - if inter_group_delay_delta < 0.0 - && inter_arrival_time <= BURST_TIME_INTERVAL.as_secs_f64() + let propagation_delta = arrival_time_delta - send_time_delta; + if propagation_delta < TimeDelta::ZERO + && arrival_time_delta <= BURST_TIME_INTERVAL && packet.remote_recv_time - first_remote_recv_time < MAX_BURST_DURATION { Belongs::Yes @@ -98,32 +83,14 @@ impl ArrivalGroup { } } - /// Calculate the inter group delay delta between self and a subsequent group. - pub(super) fn inter_group_delay_delta(&self, other: &Self) -> Option { - let first_seq_no = self.first.map(|(s, _, _)| s)?; - let last_seq_no = self.last_seq_no?; - - let arrival_delta = self.arrival_delta(other)?.as_secs_f64() * 1000.0; - let departure_delta = self.departure_delta(other)?.as_secs_f64() * 1000.0; - - assert!(arrival_delta >= 0.0); - - let result = arrival_delta - departure_delta; - trace!("Delay delta for group({first_seq_no}..={last_seq_no}. {result:?} = {arrival_delta:?} - {departure_delta:?}"); - - Some(result) - } - - pub(super) fn departure_delta(&self, other: &Self) -> Option { - other - .local_send_time() - .checked_duration_since(self.local_send_time()) + /// Calculate the send time delta between self and a subsequent group. + fn departure_delta(&self, other: &Self) -> TimeDelta { + Timestamp::from(other.local_send_time()) - self.local_send_time() } - fn arrival_delta(&self, other: &Self) -> Option { - other - .remote_recv_time() - .checked_duration_since(self.remote_recv_time()) + /// Calculate the remote receive time delta between self and a subsequent group. + fn arrival_delta(&self, other: &Self) -> TimeDelta { + Timestamp::from(other.remote_recv_time()) - self.remote_recv_time() } /// The local send time i.e. departure time, for the group. @@ -183,7 +150,7 @@ impl ArrivalGroupAccumulator { } // Variation between previous group and current. - let delay_delta = self.inter_group_delay_delta(); + let arrival_delta = self.arrival_delta(); let send_delta = self.send_delta(); let last_remote_recv_time = self.current_group.remote_recv_time(); @@ -194,32 +161,32 @@ impl ArrivalGroupAccumulator { Some(InterGroupDelayDelta { send_delta: send_delta?, - delay_delta: delay_delta?, + arrival_delta: arrival_delta?, last_remote_recv_time, }) } - fn inter_group_delay_delta(&self) -> Option { + fn arrival_delta(&self) -> Option { self.previous_group .as_ref() - .and_then(|prev| prev.inter_group_delay_delta(&self.current_group)) + .map(|prev| prev.arrival_delta(&self.current_group)) } - fn send_delta(&self) -> Option { + fn send_delta(&self) -> Option { self.previous_group .as_ref() - .and_then(|prev| prev.departure_delta(&self.current_group)) + .map(|prev| prev.departure_delta(&self.current_group)) } } /// The calculate delay delta between two groups of packets. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) struct InterGroupDelayDelta { /// The delta between the send times of the two groups i.e. delta between the last packet sent /// in each group. - pub(super) send_delta: Duration, - /// The delay delta between the two groups. - pub(super) delay_delta: f64, + pub(super) send_delta: TimeDelta, + /// The delta between the remote arrival times of the two groups. + pub(super) arrival_delta: TimeDelta, /// The reported receive time for the last packet in the first arrival group. pub(super) last_remote_recv_time: Instant, } @@ -230,7 +197,7 @@ mod test { use crate::rtp_::DataSize; - use super::{AckedPacket, ArrivalGroup, Belongs}; + use super::{AckedPacket, ArrivalGroup, ArrivalGroupAccumulator, Belongs, TimeDelta}; #[test] fn test_arrival_group_all_packets_belong_to_empty_group() { @@ -441,6 +408,34 @@ mod test { assert_eq!(group.size, 3, "Expected group to contain 4 packets"); } + #[test] + fn group_reorder() { + let data = vec![ + ((Duration::from_millis(0), Duration::from_millis(0)), None), + ((Duration::from_millis(60), Duration::from_millis(5)), None), + ((Duration::from_millis(40), Duration::from_millis(10)), None), + ( + (Duration::from_millis(70), Duration::from_millis(20)), + Some((TimeDelta::from_millis(-20), TimeDelta::from_millis(5))), + ), + ]; + + let now = Instant::now(); + let mut aga = ArrivalGroupAccumulator::default(); + + for ((local_send_time, remote_recv_time), deltas) in data { + let group_delta = aga.accumulate_packet(&AckedPacket { + seq_no: Default::default(), + size: Default::default(), + local_send_time: now + local_send_time, + remote_recv_time: now + remote_recv_time, + local_recv_time: Instant::now(), // does not matter + }); + + assert_eq!(group_delta.map(|d| (d.send_delta, d.arrival_delta)), deltas); + } + } + fn duration_us(us: u64) -> Duration { Duration::from_micros(us) } diff --git a/src/packet/bwe/delay_controller.rs b/src/packet/bwe/delay_controller.rs index 354e901b..055a0c89 100644 --- a/src/packet/bwe/delay_controller.rs +++ b/src/packet/bwe/delay_controller.rs @@ -66,7 +66,7 @@ impl DelayController { .arrival_group_accumulator .accumulate_packet(acked_packet) { - crate::packet::bwe::macros::log_delay_variation!(delay_variation.delay_delta); + crate::packet::bwe::macros::log_delay_variation!(delay_variation.arrival_delta); // Got a new delay variation, add it to the trendline self.trendline_estimator diff --git a/src/packet/bwe/loss_controller.rs b/src/packet/bwe/loss_controller.rs index 4f907bfd..8e7fb62f 100644 --- a/src/packet/bwe/loss_controller.rs +++ b/src/packet/bwe/loss_controller.rs @@ -8,7 +8,7 @@ use crate::packet::bwe::macros::log_loss_bw_limit_in_window; use crate::rtp_::TwccSendRecord; use crate::{Bitrate, DataSize}; -use super::super_instant::SuperInstant; +use super::time::{TimeDelta, Timestamp}; /// Loss controller based loosely on libWebRTC's `LossBasedBweV2`(commit `14e2779a6ccdc67038ed2069a5732dd41617c6f0`) /// We don't implement ALR, link capacity estimates or probing(although we use constant padding @@ -38,7 +38,7 @@ pub struct LossController { partial_observation: PartialObservation, /// The last packet sent in the most recent observation. - last_send_time_most_recent_observation: SuperInstant, + last_send_time_most_recent_observation: Timestamp, // Observation window /// Forever growing counter of observations. Observation::id derives from this. @@ -53,11 +53,11 @@ pub struct LossController { /// Precomputed instantaneous upper bound on bandwidth estimate. cached_instant_upper_bound: Option, /// Last time we reduced the estimate. - last_time_estimate_reduced: SuperInstant, + last_time_estimate_reduced: Timestamp, /// When we started recovering after being loss limited last time. /// While in this window the bandwidth estimate is bounded by `bandwidth_limit_in_current_window`. - recovering_after_loss_timestamp: SuperInstant, + recovering_after_loss_timestamp: Timestamp, /// Upper bound on estimate while in recovery window. bandwidth_limit_in_current_window: Bitrate, @@ -105,7 +105,7 @@ impl LossController { let mut controller = LossController { state: LossControllerState::DelayBased, partial_observation: PartialObservation::new(), - last_send_time_most_recent_observation: SuperInstant::DistantFuture, + last_send_time_most_recent_observation: Timestamp::DistantFuture, observations: vec![Observation::DUMMY; config.observation_window_size] .into_boxed_slice(), num_observations: 0, @@ -113,8 +113,8 @@ impl LossController { instant_upper_bound_temporal_weights: vec![0_f64; config.observation_window_size] .into_boxed_slice(), cached_instant_upper_bound: None, - last_time_estimate_reduced: SuperInstant::DistantPast, - recovering_after_loss_timestamp: SuperInstant::DistantPast, + last_time_estimate_reduced: Timestamp::DistantPast, + recovering_after_loss_timestamp: Timestamp::DistantPast, bandwidth_limit_in_current_window: Bitrate::MAX, current_estimate: ChannelParameters::new(config.initial_inherent_loss_estimate), @@ -212,7 +212,7 @@ impl LossController { // `delayed_increase_window` ago, and // 2. The best candidate is greater than bandwidth_limit_in_current_window. - if self.recovering_after_loss_timestamp.is_finite() + if self.recovering_after_loss_timestamp.is_exact() && self.recovering_after_loss_timestamp + self.config.delayed_increase_window > self.last_send_time_most_recent_observation && best_candidate.loss_limited_bandwidth > self.bandwidth_limit_in_current_window @@ -258,7 +258,7 @@ impl LossController { const CONF_MAX_INCREASE_FACTOR: f64 = 1.3; if self.is_bandwidth_limited_due_to_loss() - && (self.recovering_after_loss_timestamp.is_not_finite() + && (!self.recovering_after_loss_timestamp.is_exact() || self.recovering_after_loss_timestamp + self.config.delayed_increase_window < self.last_send_time_most_recent_observation) { @@ -315,19 +315,15 @@ impl LossController { return false; }; - let last_send_time = summary.last_send_time; + let last_send_time = Timestamp::from(summary.last_send_time); self.partial_observation.update(summary); - if !self.last_send_time_most_recent_observation.is_finite() { - self.last_send_time_most_recent_observation = last_send_time.into(); + if !self.last_send_time_most_recent_observation.is_exact() { + self.last_send_time_most_recent_observation = last_send_time; } - let observation_duration = last_send_time - - self - .last_send_time_most_recent_observation - .as_instant() - .expect("instant is not finite"); + let observation_duration = last_send_time - self.last_send_time_most_recent_observation; if observation_duration <= Duration::ZERO { return false; @@ -338,7 +334,7 @@ impl LossController { return false; } - self.last_send_time_most_recent_observation = last_send_time.into(); + self.last_send_time_most_recent_observation = last_send_time; let observation = { let id = self.num_observations; @@ -585,24 +581,24 @@ impl LossController { return upper_bound; } - if self.config.rampup_acceleration_max_factor > Duration::ZERO { - if let (Some(most_recent), Some(reduced)) = ( - self.last_send_time_most_recent_observation.as_instant(), - self.last_time_estimate_reduced.as_instant(), - ) { - let delta = most_recent - reduced; - let time_since_bw_reduced = self - .config - .rampup_acceleration_maxout_time - .min(delta.max(Duration::ZERO)) - .as_secs_f64(); - - let rampup_acceleration = self.config.rampup_acceleration_max_factor.as_secs_f64() - * time_since_bw_reduced - / self.config.rampup_acceleration_maxout_time.as_secs_f64(); - - upper_bound = upper_bound + (self.acknowledged_bitrate * rampup_acceleration); - } + if self.config.rampup_acceleration_max_factor > Duration::ZERO + && self.last_send_time_most_recent_observation.is_exact() + && self.last_time_estimate_reduced.is_exact() + { + let delta = (self.last_send_time_most_recent_observation + - self.last_time_estimate_reduced) + .max(TimeDelta::ZERO); + let time_since_bw_reduced = self + .config + .rampup_acceleration_maxout_time + .as_secs_f64() + .min(delta.as_secs_f64()); + + let rampup_acceleration = self.config.rampup_acceleration_max_factor.as_secs_f64() + * time_since_bw_reduced + / self.config.rampup_acceleration_maxout_time.as_secs_f64(); + + upper_bound = upper_bound + (self.acknowledged_bitrate * rampup_acceleration); } upper_bound diff --git a/src/packet/bwe/mod.rs b/src/packet/bwe/mod.rs index 2c7916f2..eac46e02 100644 --- a/src/packet/bwe/mod.rs +++ b/src/packet/bwe/mod.rs @@ -16,7 +16,7 @@ mod delay_controller; mod loss_controller; pub(crate) mod macros; mod rate_control; -mod super_instant; +mod time; mod trendline_estimator; use acked_bitrate_estimator::AckedBitrateEstimator; diff --git a/src/packet/bwe/super_instant.rs b/src/packet/bwe/super_instant.rs deleted file mode 100644 index 74764c02..00000000 --- a/src/packet/bwe/super_instant.rs +++ /dev/null @@ -1,155 +0,0 @@ -use std::{ - ops::{Add, AddAssign, Sub, SubAssign}, - time::{Duration, Instant}, -}; - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] - -/// This is created to ease the modeling of values that represent a moment -/// that is far in the future or in the past, yet allowing PartialEq, Ord, Add, -/// "transparently". -// -// Before doing this, a workaround like Optional was considered, -// but it would not clarify whether the absence of a value would be considered -// as a distant past or distant future and it would have to be handled on a -// case-by-case basis. -pub(crate) enum SuperInstant { - DistantPast, - Value { instant: Instant }, - DistantFuture, -} - -impl SuperInstant { - #[cfg(test)] - pub fn now() -> Self { - Self::Value { - instant: Instant::now(), - } - } - - pub fn is_finite(&self) -> bool { - matches!(self, Self::Value { .. }) - } - - pub fn is_not_finite(&self) -> bool { - !self.is_finite() - } - - pub fn as_instant(&self) -> Option { - match self { - Self::DistantPast | Self::DistantFuture => None, - Self::Value { instant } => Some(*instant), - } - } -} - -impl PartialOrd for SuperInstant { - fn partial_cmp(&self, other: &Self) -> Option { - match (self, other) { - (Self::DistantPast, Self::DistantPast) => Some(std::cmp::Ordering::Equal), - (Self::DistantPast, _) => Some(std::cmp::Ordering::Less), - (_, Self::DistantPast) => Some(std::cmp::Ordering::Greater), - (Self::DistantFuture, Self::DistantFuture) => Some(std::cmp::Ordering::Equal), - (Self::DistantFuture, _) => Some(std::cmp::Ordering::Greater), - (_, Self::DistantFuture) => Some(std::cmp::Ordering::Less), - (Self::Value { instant: a }, Self::Value { instant: b }) => a.partial_cmp(b), - } - } -} - -impl Sub for SuperInstant { - type Output = Self; - - fn sub(self, rhs: Duration) -> Self::Output { - match self { - Self::DistantPast => Self::DistantPast, - Self::DistantFuture => Self::DistantFuture, - Self::Value { instant } => Self::Value { - instant: instant - rhs, - }, - } - } -} - -impl SubAssign for SuperInstant { - fn sub_assign(&mut self, rhs: Duration) { - match self { - Self::DistantPast => {} - Self::DistantFuture => {} - Self::Value { instant } => *instant -= rhs, - } - } -} - -impl AddAssign for SuperInstant { - fn add_assign(&mut self, rhs: Duration) { - match self { - Self::DistantPast => {} - Self::DistantFuture => {} - Self::Value { instant } => *instant += rhs, - } - } -} - -impl Add for SuperInstant { - type Output = Self; - - fn add(self, rhs: Duration) -> Self::Output { - match self { - Self::DistantPast => Self::DistantPast, - Self::DistantFuture => Self::DistantFuture, - Self::Value { instant } => Self::Value { - instant: instant + rhs, - }, - } - } -} - -impl From for SuperInstant { - fn from(instant: Instant) -> Self { - Self::Value { instant } - } -} - -#[cfg(test)] -mod test { - use std::time::Duration; - - use crate::packet::bwe::super_instant::SuperInstant; - - #[test] - fn test() { - let mut past = SuperInstant::DistantPast; - let mut future = SuperInstant::DistantFuture; - let now = SuperInstant::now(); - assert!(past < now); - assert!(now > past); - - assert!(now < future); - assert!(future > now); - - assert!(past < future); - assert!(future > past); - - assert!(now == now); - assert!(past == past); - assert!(future == future); - - assert!(now != past); - assert!(now != future); - assert!(future != past); - - assert!(past - Duration::from_secs(1) == past); - past -= Duration::from_secs(1); - assert!(past == past); - past += Duration::from_secs(1); - assert!(past == past); - - assert!(future + Duration::from_secs(1) == future); - assert!(future - Duration::from_secs(1) == future); - future -= Duration::from_secs(1); - assert!(future == future); - future += Duration::from_secs(1); - assert!(future == future); - } -} diff --git a/src/packet/bwe/time.rs b/src/packet/bwe/time.rs new file mode 100644 index 00000000..18b7d4fe --- /dev/null +++ b/src/packet/bwe/time.rs @@ -0,0 +1,748 @@ +use std::cmp::Ordering; +use std::ops::{Add, AddAssign, Div, Neg as _, Sub, SubAssign}; +use std::time::{Duration, Instant}; + +use crate::bwe::Bitrate; +use crate::rtp_::DataSize; + +/// Wrapper for [`Instant`] that provides additional time points in the past or future. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum Timestamp { + /// A time in the past that already happened. + DistantPast, + + /// An exact instant. + Exact(Instant), + + /// A time in the future that will never happen. + DistantFuture, +} + +/// Wrapper for [`Duration`] that can be negative and provides a duration to a +/// distant future or past. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum TimeDelta { + /// Time delta to some event in distant past that already happened. + NegativeInfinity, + + /// An exact negative duration. + Negative(Duration), + + /// An exact positive duration. + Positive(Duration), + + /// Time delta to some event in distant future that will never happen. + PositiveInfinity, +} + +impl TimeDelta { + pub(super) const ZERO: Self = Self::Positive(Duration::ZERO); + + /// Returns the number of seconds contained by this [`TimeDelta`] as `f64`. + pub fn as_secs_f64(&self) -> f64 { + match self { + Self::NegativeInfinity => f64::NEG_INFINITY, + Self::Negative(d) => d.as_secs_f64().neg(), + Self::Positive(d) => d.as_secs_f64(), + Self::PositiveInfinity => f64::INFINITY, + } + } +} + +#[cfg(test)] +impl TimeDelta { + /// Creates a [`TimeDelta`] from seconds. + pub const fn from_secs(secs: i64) -> TimeDelta { + if secs >= 0 { + Self::Positive(Duration::from_secs(secs as u64)) + } else { + Self::Negative(Duration::from_secs(-secs as u64)) + } + } + + /// Creates a [`TimeDelta`] from milliseconds. + pub const fn from_millis(millis: i64) -> Self { + if millis >= 0 { + Self::Positive(Duration::from_millis(millis as u64)) + } else { + Self::Negative(Duration::from_millis(-millis as u64)) + } + } +} + +impl Timestamp { + /// Indicates whether this [`Timestamp`] is [`Timestamp::Exact`]. + pub const fn is_exact(&self) -> bool { + matches!(self, Self::Exact(_)) + } +} + +impl Add for Timestamp { + type Output = Self; + + fn add(self, rhs: TimeDelta) -> Self::Output { + match (self, rhs) { + (Self::DistantFuture, _) | (_, TimeDelta::PositiveInfinity) => Self::DistantFuture, + (Self::DistantPast, _) | (_, TimeDelta::NegativeInfinity) => Self::DistantPast, + (Self::Exact(i), TimeDelta::Negative(d)) => Self::Exact(i - d), + (Self::Exact(i), TimeDelta::Positive(d)) => Self::Exact(i + d), + } + } +} + +impl Sub for Timestamp { + type Output = Self; + + fn sub(self, rhs: TimeDelta) -> Self::Output { + match (self, rhs) { + (Self::DistantFuture, _) | (_, TimeDelta::NegativeInfinity) => Self::DistantFuture, + (Self::DistantPast, _) | (_, TimeDelta::PositiveInfinity) => Self::DistantPast, + (Self::Exact(i), TimeDelta::Negative(d)) => Self::Exact(i + d), + (Self::Exact(i), TimeDelta::Positive(d)) => Self::Exact(i - d), + } + } +} + +impl Sub for Timestamp { + type Output = TimeDelta; + + fn sub(self, rhs: Self) -> Self::Output { + match (self, rhs) { + (Self::DistantFuture, _) | (_, Self::DistantPast) => TimeDelta::PositiveInfinity, + (Self::DistantPast, _) | (_, Self::DistantFuture) => TimeDelta::NegativeInfinity, + (Self::Exact(this), Self::Exact(that)) => match this.cmp(&that) { + Ordering::Less => TimeDelta::Negative(that - this), + Ordering::Equal => TimeDelta::ZERO, + Ordering::Greater => TimeDelta::Positive(this - that), + }, + } + } +} + +impl Add for Timestamp { + type Output = Self; + + fn add(self, rhs: Duration) -> Self::Output { + self + TimeDelta::from(rhs) + } +} + +impl Sub for Timestamp { + type Output = Self; + + fn sub(self, rhs: Duration) -> Self::Output { + self - TimeDelta::from(rhs) + } +} + +impl Sub for Timestamp { + type Output = TimeDelta; + + fn sub(self, rhs: Instant) -> Self::Output { + self.sub(Self::from(rhs)) + } +} + +impl SubAssign for Timestamp { + fn sub_assign(&mut self, rhs: TimeDelta) { + *self = *self - rhs; + } +} + +impl AddAssign for Timestamp { + fn add_assign(&mut self, rhs: TimeDelta) { + *self = *self + rhs; + } +} + +impl SubAssign for Timestamp { + fn sub_assign(&mut self, rhs: Duration) { + *self = *self - rhs; + } +} + +impl AddAssign for Timestamp { + fn add_assign(&mut self, rhs: Duration) { + *self = *self + rhs; + } +} + +impl PartialOrd for Timestamp { + fn partial_cmp(&self, other: &Self) -> Option { + Some(Self::cmp(self, other)) + } +} + +impl Ord for Timestamp { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Self::DistantPast, Self::DistantPast) => Ordering::Equal, + (Self::DistantPast, _) => Ordering::Less, + (_, Self::DistantPast) => Ordering::Greater, + (Self::DistantFuture, Self::DistantFuture) => Ordering::Equal, + (Self::DistantFuture, _) => Ordering::Greater, + (_, Self::DistantFuture) => Ordering::Less, + (Self::Exact(v1), Self::Exact(v2)) => v1.cmp(v2), + } + } +} + +impl From for Timestamp { + fn from(value: Instant) -> Self { + Self::Exact(value) + } +} + +impl Add for TimeDelta { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + match (self, rhs) { + (Self::PositiveInfinity, _) | (_, Self::PositiveInfinity) => Self::PositiveInfinity, + (Self::NegativeInfinity, _) | (_, Self::NegativeInfinity) => Self::NegativeInfinity, + (Self::Negative(this), Self::Negative(that)) => Self::Negative(this + that), + (Self::Positive(this), Self::Positive(that)) => Self::Positive(this + that), + (Self::Positive(this), Self::Negative(that)) => match this.cmp(&that) { + Ordering::Less => Self::Negative(that - this), + Ordering::Equal => Self::ZERO, + Ordering::Greater => Self::Positive(this - that), + }, + (Self::Negative(this), Self::Positive(that)) => match this.cmp(&that) { + Ordering::Less => Self::Positive(that - this), + Ordering::Equal => Self::ZERO, + Ordering::Greater => Self::Negative(this - that), + }, + } + } +} + +impl Sub for TimeDelta { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + match (self, rhs) { + (Self::PositiveInfinity, _) | (_, Self::NegativeInfinity) => Self::PositiveInfinity, + (Self::NegativeInfinity, _) | (_, Self::PositiveInfinity) => Self::NegativeInfinity, + (Self::Positive(this), Self::Negative(that)) => Self::Positive(this + that), + (Self::Negative(this), Self::Positive(that)) => Self::Negative(this + that), + (Self::Positive(this), Self::Positive(that)) => match this.cmp(&that) { + Ordering::Less => Self::Negative(that - this), + Ordering::Equal => Self::ZERO, + Ordering::Greater => Self::Positive(this - that), + }, + (Self::Negative(this), Self::Negative(that)) => match this.cmp(&that) { + Ordering::Less => Self::Positive(that - this), + Ordering::Equal => Self::ZERO, + Ordering::Greater => Self::Negative(this - that), + }, + } + } +} + +impl PartialOrd for TimeDelta { + fn partial_cmp(&self, other: &Self) -> Option { + Some(Self::cmp(self, other)) + } +} + +impl Ord for TimeDelta { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Self::NegativeInfinity, Self::NegativeInfinity) => Ordering::Equal, + (Self::NegativeInfinity, _) => Ordering::Less, + (_, Self::NegativeInfinity) => Ordering::Greater, + (Self::PositiveInfinity, Self::PositiveInfinity) => Ordering::Equal, + (Self::PositiveInfinity, _) => Ordering::Greater, + (_, Self::PositiveInfinity) => Ordering::Less, + (Self::Negative(_), Self::Positive(_)) => Ordering::Less, + (Self::Positive(_), Self::Negative(_)) => Ordering::Greater, + (Self::Positive(this), Self::Positive(that)) => this.cmp(that), + (Self::Negative(this), Self::Negative(that)) => that.cmp(this), + } + } +} + +impl PartialEq for TimeDelta { + fn eq(&self, other: &Duration) -> bool { + *self == Self::from(*other) + } +} + +impl PartialOrd for TimeDelta { + fn partial_cmp(&self, other: &Duration) -> Option { + Some(Self::cmp(self, &Self::from(*other))) + } +} + +impl SubAssign for TimeDelta { + fn sub_assign(&mut self, rhs: Self) { + *self = *self - rhs; + } +} + +impl AddAssign for TimeDelta { + fn add_assign(&mut self, rhs: Self) { + *self = *self + rhs; + } +} + +impl Div for TimeDelta { + type Output = Self; + + #[inline] + fn div(self, rhs: u32) -> Self { + match self { + Self::NegativeInfinity | Self::PositiveInfinity => self, + Self::Negative(duration) => Self::Negative(duration / rhs), + Self::Positive(duration) => Self::Positive(duration / rhs), + } + } +} + +impl From for TimeDelta { + fn from(value: Duration) -> Self { + Self::Positive(value) + } +} + +impl Div for DataSize { + type Output = Bitrate; + + fn div(self, rhs: TimeDelta) -> Self::Output { + let bytes = self.as_bytes_f64(); + let s = rhs.as_secs_f64(); + + if s == 0.0 { + return Bitrate::ZERO; + } + + let bps = (bytes * 8.0) / s; + + bps.into() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn instant_add_duration() { + let now = Instant::now(); + + assert_eq!( + Timestamp::Exact(now) + TimeDelta::from_secs(5), + Timestamp::from(now + Duration::from_secs(5)) + ); + assert_eq!( + Timestamp::Exact(now) + TimeDelta::from_secs(-5), + Timestamp::from(now - Duration::from_secs(5)) + ); + assert_eq!( + Timestamp::Exact(now) + TimeDelta::NegativeInfinity, + Timestamp::DistantPast + ); + assert_eq!( + Timestamp::Exact(now) + TimeDelta::PositiveInfinity, + Timestamp::DistantFuture + ); + + assert_eq!( + Timestamp::DistantPast + TimeDelta::from_secs(5), + Timestamp::DistantPast + ); + assert_eq!( + Timestamp::DistantPast + TimeDelta::from_secs(-5), + Timestamp::DistantPast + ); + assert_eq!( + Timestamp::DistantPast + TimeDelta::NegativeInfinity, + Timestamp::DistantPast + ); + assert_eq!( + Timestamp::DistantPast + TimeDelta::PositiveInfinity, + Timestamp::DistantFuture + ); + + assert_eq!( + Timestamp::DistantFuture + TimeDelta::from_secs(5), + Timestamp::DistantFuture + ); + assert_eq!( + Timestamp::DistantFuture + TimeDelta::from_secs(-5), + Timestamp::DistantFuture + ); + assert_eq!( + Timestamp::DistantFuture + TimeDelta::NegativeInfinity, + Timestamp::DistantFuture + ); + assert_eq!( + Timestamp::DistantFuture + TimeDelta::PositiveInfinity, + Timestamp::DistantFuture + ); + } + + #[test] + fn instant_sub_duration() { + let now = Instant::now(); + + assert_eq!( + Timestamp::Exact(now) - TimeDelta::from_secs(5), + Timestamp::from(now - Duration::from_secs(5)) + ); + assert_eq!( + Timestamp::Exact(now) - TimeDelta::from_secs(-5), + Timestamp::from(now + Duration::from_secs(5)) + ); + assert_eq!( + Timestamp::Exact(now) - TimeDelta::NegativeInfinity, + Timestamp::DistantFuture + ); + assert_eq!( + Timestamp::Exact(now) - TimeDelta::PositiveInfinity, + Timestamp::DistantPast + ); + + assert_eq!( + Timestamp::DistantPast - TimeDelta::from_secs(5), + Timestamp::DistantPast + ); + assert_eq!( + Timestamp::DistantPast - TimeDelta::from_secs(-5), + Timestamp::DistantPast + ); + assert_eq!( + Timestamp::DistantPast - TimeDelta::NegativeInfinity, + Timestamp::DistantFuture + ); + assert_eq!( + Timestamp::DistantPast - TimeDelta::PositiveInfinity, + Timestamp::DistantPast + ); + + assert_eq!( + Timestamp::DistantFuture - TimeDelta::from_secs(5), + Timestamp::DistantFuture + ); + assert_eq!( + Timestamp::DistantFuture - TimeDelta::from_secs(-5), + Timestamp::DistantFuture + ); + assert_eq!( + Timestamp::DistantFuture - TimeDelta::NegativeInfinity, + Timestamp::DistantFuture + ); + assert_eq!( + Timestamp::DistantFuture - TimeDelta::PositiveInfinity, + Timestamp::DistantFuture + ); + } + + #[test] + fn instant_sub_instant() { + let now = Instant::now(); + + assert_eq!( + Timestamp::Exact(now) - Timestamp::Exact(now), + TimeDelta::ZERO + ); + assert_eq!( + Timestamp::Exact(now) - Timestamp::Exact(now - Duration::from_secs(5)), + TimeDelta::from_secs(5) + ); + assert_eq!( + Timestamp::Exact(now) - Timestamp::Exact(now + Duration::from_secs(5)), + TimeDelta::from_secs(-5) + ); + assert_eq!( + Timestamp::Exact(now) - Timestamp::DistantPast, + TimeDelta::PositiveInfinity + ); + assert_eq!( + Timestamp::Exact(now) - Timestamp::DistantFuture, + TimeDelta::NegativeInfinity + ); + + assert_eq!( + Timestamp::DistantPast - Timestamp::Exact(now), + TimeDelta::NegativeInfinity + ); + assert_eq!( + Timestamp::DistantPast - Timestamp::Exact(now - Duration::from_secs(5)), + TimeDelta::NegativeInfinity + ); + assert_eq!( + Timestamp::DistantPast - Timestamp::Exact(now + Duration::from_secs(5)), + TimeDelta::NegativeInfinity + ); + assert_eq!( + Timestamp::DistantPast - Timestamp::DistantPast, + TimeDelta::PositiveInfinity + ); + assert_eq!( + Timestamp::DistantPast - Timestamp::DistantFuture, + TimeDelta::NegativeInfinity + ); + + assert_eq!( + Timestamp::DistantFuture - Timestamp::Exact(now), + TimeDelta::PositiveInfinity + ); + assert_eq!( + Timestamp::DistantFuture - Timestamp::Exact(now - Duration::from_secs(5)), + TimeDelta::PositiveInfinity + ); + assert_eq!( + Timestamp::DistantFuture - Timestamp::Exact(now + Duration::from_secs(5)), + TimeDelta::PositiveInfinity + ); + assert_eq!( + Timestamp::DistantFuture - Timestamp::DistantPast, + TimeDelta::PositiveInfinity + ); + assert_eq!( + Timestamp::DistantFuture - Timestamp::DistantFuture, + TimeDelta::PositiveInfinity + ); + } + + #[test] + fn instant_ord() { + let now = Timestamp::Exact(Instant::now()); + let now_minus_1 = now - TimeDelta::from_secs(1); + let now_plus_1 = now + TimeDelta::from_secs(1); + + assert!(Timestamp::DistantFuture > now_plus_1); + assert!(Timestamp::DistantFuture > now_minus_1); + assert!(Timestamp::DistantFuture > Timestamp::DistantPast); + + assert!(now_plus_1 > now_minus_1); + assert!(now_plus_1 > Timestamp::DistantPast); + + assert!(now_minus_1 > Timestamp::DistantPast); + } + + #[test] + fn duration_ord() { + assert!(TimeDelta::PositiveInfinity > TimeDelta::from_secs(-2)); + assert!(TimeDelta::PositiveInfinity > TimeDelta::from_secs(2)); + assert!(TimeDelta::PositiveInfinity > TimeDelta::NegativeInfinity); + + assert!(TimeDelta::from_secs(2) > TimeDelta::from_secs(1)); + assert!(TimeDelta::from_secs(2) > TimeDelta::from_secs(-1)); + assert!(TimeDelta::from_secs(2) > TimeDelta::from_secs(-2)); + assert!(TimeDelta::from_secs(2) > TimeDelta::NegativeInfinity); + + assert!(TimeDelta::from_secs(1) > TimeDelta::from_secs(-1)); + assert!(TimeDelta::from_secs(1) > TimeDelta::from_secs(-2)); + assert!(TimeDelta::from_secs(1) > TimeDelta::NegativeInfinity); + + assert!(TimeDelta::from_secs(-1) > TimeDelta::from_secs(-2)); + assert!(TimeDelta::from_secs(-1) > TimeDelta::NegativeInfinity); + + assert!(TimeDelta::from_secs(-2) > TimeDelta::NegativeInfinity); + + assert_eq!(TimeDelta::from_secs(1), Duration::from_secs(1)); + assert!(TimeDelta::from_secs(2) > Duration::from_secs(1)); + assert!(TimeDelta::from_secs(1) < Duration::from_secs(2)); + assert!(TimeDelta::from_secs(-1) < Duration::ZERO); + assert!(TimeDelta::from_secs(-1) < Duration::from_secs(1)); + assert!(TimeDelta::PositiveInfinity > Duration::from_secs(2)); + assert!(TimeDelta::NegativeInfinity < Duration::from_secs(1)); + assert!(TimeDelta::NegativeInfinity < Duration::ZERO); + } + + #[test] + fn duration_add() { + assert_eq!( + TimeDelta::PositiveInfinity + TimeDelta::PositiveInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::PositiveInfinity + TimeDelta::NegativeInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::PositiveInfinity + TimeDelta::from_secs(-2), + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::PositiveInfinity + TimeDelta::from_secs(2), + TimeDelta::PositiveInfinity + ); + + assert_eq!( + TimeDelta::NegativeInfinity + TimeDelta::PositiveInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::NegativeInfinity + TimeDelta::NegativeInfinity, + TimeDelta::NegativeInfinity + ); + assert_eq!( + TimeDelta::NegativeInfinity + TimeDelta::from_secs(-2), + TimeDelta::NegativeInfinity + ); + assert_eq!( + TimeDelta::NegativeInfinity + TimeDelta::from_secs(2), + TimeDelta::NegativeInfinity + ); + + assert_eq!( + TimeDelta::from_secs(1) + TimeDelta::PositiveInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::from_secs(1) + TimeDelta::NegativeInfinity, + TimeDelta::NegativeInfinity + ); + assert_eq!( + TimeDelta::from_secs(1) + TimeDelta::from_secs(-1), + TimeDelta::ZERO + ); + assert_eq!( + TimeDelta::from_secs(1) + TimeDelta::from_secs(-2), + TimeDelta::from_secs(-1) + ); + assert_eq!( + TimeDelta::from_secs(1) + TimeDelta::from_secs(2), + TimeDelta::from_secs(3) + ); + + assert_eq!( + TimeDelta::from_secs(-1) + TimeDelta::PositiveInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::from_secs(-1) + TimeDelta::NegativeInfinity, + TimeDelta::NegativeInfinity + ); + assert_eq!( + TimeDelta::from_secs(-1) + TimeDelta::from_secs(1), + TimeDelta::ZERO + ); + assert_eq!( + TimeDelta::from_secs(-1) + TimeDelta::from_secs(-2), + TimeDelta::from_secs(-3) + ); + assert_eq!( + TimeDelta::from_secs(-1) + TimeDelta::from_secs(2), + TimeDelta::from_secs(1) + ); + } + + #[test] + fn duration_sub() { + assert_eq!( + TimeDelta::PositiveInfinity - TimeDelta::PositiveInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::PositiveInfinity - TimeDelta::NegativeInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::PositiveInfinity - TimeDelta::from_secs(-2), + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::PositiveInfinity - TimeDelta::from_secs(2), + TimeDelta::PositiveInfinity + ); + + assert_eq!( + TimeDelta::NegativeInfinity - TimeDelta::NegativeInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::NegativeInfinity - TimeDelta::PositiveInfinity, + TimeDelta::NegativeInfinity + ); + assert_eq!( + TimeDelta::NegativeInfinity - TimeDelta::from_secs(-2), + TimeDelta::NegativeInfinity + ); + assert_eq!( + TimeDelta::NegativeInfinity - TimeDelta::from_secs(2), + TimeDelta::NegativeInfinity + ); + + assert_eq!( + TimeDelta::from_secs(1) - TimeDelta::NegativeInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::from_secs(1) - TimeDelta::PositiveInfinity, + TimeDelta::NegativeInfinity + ); + assert_eq!( + TimeDelta::from_secs(1) - TimeDelta::from_secs(1), + TimeDelta::ZERO + ); + assert_eq!( + TimeDelta::from_secs(1) - TimeDelta::from_secs(-1), + TimeDelta::from_secs(2) + ); + assert_eq!( + TimeDelta::from_secs(1) - TimeDelta::from_secs(2), + TimeDelta::from_secs(-1) + ); + + assert_eq!( + TimeDelta::from_secs(-1) - TimeDelta::NegativeInfinity, + TimeDelta::PositiveInfinity + ); + assert_eq!( + TimeDelta::from_secs(-1) - TimeDelta::PositiveInfinity, + TimeDelta::NegativeInfinity + ); + assert_eq!( + TimeDelta::from_secs(-1) - TimeDelta::from_secs(-1), + TimeDelta::ZERO + ); + assert_eq!( + TimeDelta::from_secs(-1) - TimeDelta::from_secs(1), + TimeDelta::from_secs(-2) + ); + assert_eq!( + TimeDelta::from_secs(-1) - TimeDelta::from_secs(2), + TimeDelta::from_secs(-3) + ); + } + + #[test] + fn super_instant_test() { + let mut past = Timestamp::DistantPast; + let mut future = Timestamp::DistantFuture; + let now = Timestamp::Exact(Instant::now()); + assert!(past < now); + assert!(now > past); + + assert!(now < future); + assert!(future > now); + + assert!(past < future); + assert!(future > past); + + assert!(now == now); + assert!(past == past); + assert!(future == future); + + assert!(now != past); + assert!(now != future); + assert!(future != past); + + assert!(past - Duration::from_secs(1) == past); + past -= Duration::from_secs(1); + assert!(past == past); + past += Duration::from_secs(1); + assert!(past == past); + + assert!(future + Duration::from_secs(1) == future); + assert!(future - Duration::from_secs(1) == future); + future -= Duration::from_secs(1); + assert!(future == future); + future += Duration::from_secs(1); + assert!(future == future); + } +} diff --git a/src/packet/bwe/trendline_estimator.rs b/src/packet/bwe/trendline_estimator.rs index e061db11..74b1a60a 100644 --- a/src/packet/bwe/trendline_estimator.rs +++ b/src/packet/bwe/trendline_estimator.rs @@ -2,7 +2,10 @@ use std::collections::VecDeque; use std::ops::RangeInclusive; use std::time::{Duration, Instant}; -use super::{BandwidthUsage, InterGroupDelayDelta}; +use super::{ + time::{TimeDelta, Timestamp}, + BandwidthUsage, InterGroupDelayDelta, +}; const SMOOTHING_COEF: f64 = 0.9; const OVER_USE_THRESHOLD_DEFAULT_MS: f64 = 12.5; @@ -20,7 +23,7 @@ pub(super) struct TrendlineEstimator { window_size: usize, /// The first instant we saw, used as zero point. - zero_time: Option, + zero_time: Option, /// The history of observed delay variations. history: VecDeque, @@ -102,17 +105,20 @@ impl TrendlineEstimator { } fn do_add_to_history(&mut self, variation: InterGroupDelayDelta, now: Instant) { - let zero_time = *self - .zero_time - .get_or_insert(variation.last_remote_recv_time); + let last_remote_recv_time = Timestamp::from(variation.last_remote_recv_time); + + let zero_time = *self.zero_time.get_or_insert(last_remote_recv_time); + + let delay_delta = variation.arrival_delta.as_secs_f64() * 1000.0 + - variation.send_delta.as_secs_f64() * 1000.0; self.num_delay_variations += 1; self.num_delay_variations = self.num_delay_variations.min(*DELAY_COUNT_RANGE.end()); - self.accumulated_delay += variation.delay_delta; + self.accumulated_delay += delay_delta; self.smoothed_delay = self.smoothed_delay * SMOOTHING_COEF + (1.0 - SMOOTHING_COEF) * self.accumulated_delay; - let remote_recv_time = variation.last_remote_recv_time - zero_time; + let remote_recv_time = last_remote_recv_time - zero_time; let timing = Timing { at: now, remote_recv_time_ms: remote_recv_time.as_secs_f64() * 1000.0, @@ -286,7 +292,7 @@ struct Timing { struct Overuse { count: usize, - time_overusing: Duration, + time_overusing: TimeDelta, } #[cfg(test)] @@ -304,14 +310,14 @@ mod test { let mut estimator = TrendlineEstimator::new(20); estimator.add_delay_observation( - delay_variation(0.0, duration_ms(1), remote_recv_time_base), + delay_variation(duration_ms(1), duration_ms(1), remote_recv_time_base), now, ); for _ in 0..25 { estimator.add_delay_observation( delay_variation( - 10.0, + duration_ms(11), duration_ms(1), remote_recv_time_base + duration_ms(350), ), @@ -332,7 +338,7 @@ mod test { for i in 0..5 { estimator.add_delay_observation( delay_variation( - 0.0, + duration_ms(1), duration_ms(1), remote_recv_time_base + Duration::from_micros(5_000 * g + i * 40), ), @@ -346,7 +352,7 @@ mod test { estimator.add_delay_observation( delay_variation( - 12.0, + duration_ms(17), duration_ms(5), remote_recv_time_base + Duration::from_micros(25_000), ), @@ -360,7 +366,7 @@ mod test { estimator.add_delay_observation( delay_variation( - 13.0, + duration_ms(18), duration_ms(5), remote_recv_time_base + Duration::from_micros(25_140), ), @@ -374,7 +380,7 @@ mod test { estimator.add_delay_observation( delay_variation( - 14.0, + duration_ms(22), duration_ms(8), remote_recv_time_base + Duration::from_micros(25_250), ), @@ -392,13 +398,13 @@ mod test { } fn delay_variation( - delay: f64, + recv_delta: Duration, send_delta: Duration, last_remote_recv_time: Instant, ) -> InterGroupDelayDelta { InterGroupDelayDelta { - send_delta, - delay_delta: delay, + send_delta: send_delta.into(), + arrival_delta: recv_delta.into(), last_remote_recv_time, } }