Skip to content

Commit

Permalink
Support negative time deltas in BWE
Browse files Browse the repository at this point in the history
  • Loading branch information
alexlapa authored Jan 24, 2025
1 parent 2b1bfee commit d433ab4
Show file tree
Hide file tree
Showing 8 changed files with 863 additions and 273 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* Ensure lexical ordering of SDP-formatted candidates follows priority #557
* Limit TWCC iteration with packet status count #606
* Dedupe acked packets from `TwccSendRegister::apply_report()` #601, #605
* Align BWE ArrivalGroup calculation with libwebrtc implementation #608
* Align BWE ArrivalGroup calculation with libwebrtc implementation #608, #615

# 0.6.2

Expand Down
119 changes: 57 additions & 62 deletions src/packet/bwe/arrival_group.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::cmp::Ordering;
use std::mem;
use std::time::{Duration, Instant};

use crate::rtp_::SeqNo;

use super::AckedPacket;
use super::{
time::{TimeDelta, Timestamp},
AckedPacket,
};

const BURST_TIME_INTERVAL: Duration = Duration::from_millis(5);
const SEND_TIME_GROUP_LENGTH: Duration = Duration::from_millis(5);
Expand Down Expand Up @@ -62,32 +64,15 @@ impl ArrivalGroup {
return Belongs::Skipped;
};

let inter_arrival_time = {
let last_remote_recv_time = self.remote_recv_time();

if packet.remote_recv_time >= last_remote_recv_time {
(packet.remote_recv_time - last_remote_recv_time).as_secs_f64()
} else {
(last_remote_recv_time - packet.remote_recv_time).as_secs_f64() * -1.0
}
};

let last_send_delta = {
let last_send_time = self.local_send_time();

match packet.local_send_time.cmp(&last_send_time) {
Ordering::Equal => {
return Belongs::Yes;
}
Ordering::Greater => (packet.local_send_time - last_send_time).as_secs_f64(),
Ordering::Less => (last_send_time - packet.local_send_time).as_secs_f64() * -1.0,
}
};

let inter_group_delay_delta = inter_arrival_time - last_send_delta;
let send_time_delta = Timestamp::from(packet.local_send_time) - self.local_send_time();
if send_time_delta == TimeDelta::ZERO {
return Belongs::Yes;
}
let arrival_time_delta = Timestamp::from(packet.remote_recv_time) - self.remote_recv_time();

if inter_group_delay_delta < 0.0
&& inter_arrival_time <= BURST_TIME_INTERVAL.as_secs_f64()
let propagation_delta = arrival_time_delta - send_time_delta;
if propagation_delta < TimeDelta::ZERO
&& arrival_time_delta <= BURST_TIME_INTERVAL
&& packet.remote_recv_time - first_remote_recv_time < MAX_BURST_DURATION
{
Belongs::Yes
Expand All @@ -98,32 +83,14 @@ impl ArrivalGroup {
}
}

/// Calculate the inter group delay delta between self and a subsequent group.
pub(super) fn inter_group_delay_delta(&self, other: &Self) -> Option<f64> {
let first_seq_no = self.first.map(|(s, _, _)| s)?;
let last_seq_no = self.last_seq_no?;

let arrival_delta = self.arrival_delta(other)?.as_secs_f64() * 1000.0;
let departure_delta = self.departure_delta(other)?.as_secs_f64() * 1000.0;

assert!(arrival_delta >= 0.0);

let result = arrival_delta - departure_delta;
trace!("Delay delta for group({first_seq_no}..={last_seq_no}. {result:?} = {arrival_delta:?} - {departure_delta:?}");

Some(result)
}

pub(super) fn departure_delta(&self, other: &Self) -> Option<Duration> {
other
.local_send_time()
.checked_duration_since(self.local_send_time())
/// Calculate the send time delta between self and a subsequent group.
fn departure_delta(&self, other: &Self) -> TimeDelta {
Timestamp::from(other.local_send_time()) - self.local_send_time()
}

fn arrival_delta(&self, other: &Self) -> Option<Duration> {
other
.remote_recv_time()
.checked_duration_since(self.remote_recv_time())
/// Calculate the remote receive time delta between self and a subsequent group.
fn arrival_delta(&self, other: &Self) -> TimeDelta {
Timestamp::from(other.remote_recv_time()) - self.remote_recv_time()
}

/// The local send time i.e. departure time, for the group.
Expand Down Expand Up @@ -183,7 +150,7 @@ impl ArrivalGroupAccumulator {
}

// Variation between previous group and current.
let delay_delta = self.inter_group_delay_delta();
let arrival_delta = self.arrival_delta();
let send_delta = self.send_delta();
let last_remote_recv_time = self.current_group.remote_recv_time();

Expand All @@ -194,32 +161,32 @@ impl ArrivalGroupAccumulator {

Some(InterGroupDelayDelta {
send_delta: send_delta?,
delay_delta: delay_delta?,
arrival_delta: arrival_delta?,
last_remote_recv_time,
})
}

fn inter_group_delay_delta(&self) -> Option<f64> {
fn arrival_delta(&self) -> Option<TimeDelta> {
self.previous_group
.as_ref()
.and_then(|prev| prev.inter_group_delay_delta(&self.current_group))
.map(|prev| prev.arrival_delta(&self.current_group))
}

fn send_delta(&self) -> Option<Duration> {
fn send_delta(&self) -> Option<TimeDelta> {
self.previous_group
.as_ref()
.and_then(|prev| prev.departure_delta(&self.current_group))
.map(|prev| prev.departure_delta(&self.current_group))
}
}

/// The calculate delay delta between two groups of packets.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct InterGroupDelayDelta {
/// The delta between the send times of the two groups i.e. delta between the last packet sent
/// in each group.
pub(super) send_delta: Duration,
/// The delay delta between the two groups.
pub(super) delay_delta: f64,
pub(super) send_delta: TimeDelta,
/// The delta between the remote arrival times of the two groups.
pub(super) arrival_delta: TimeDelta,
/// The reported receive time for the last packet in the first arrival group.
pub(super) last_remote_recv_time: Instant,
}
Expand All @@ -230,7 +197,7 @@ mod test {

use crate::rtp_::DataSize;

use super::{AckedPacket, ArrivalGroup, Belongs};
use super::{AckedPacket, ArrivalGroup, ArrivalGroupAccumulator, Belongs, TimeDelta};

#[test]
fn test_arrival_group_all_packets_belong_to_empty_group() {
Expand Down Expand Up @@ -441,6 +408,34 @@ mod test {
assert_eq!(group.size, 3, "Expected group to contain 4 packets");
}

#[test]
fn group_reorder() {
let data = vec![
((Duration::from_millis(0), Duration::from_millis(0)), None),
((Duration::from_millis(60), Duration::from_millis(5)), None),
((Duration::from_millis(40), Duration::from_millis(10)), None),
(
(Duration::from_millis(70), Duration::from_millis(20)),
Some((TimeDelta::from_millis(-20), TimeDelta::from_millis(5))),
),
];

let now = Instant::now();
let mut aga = ArrivalGroupAccumulator::default();

for ((local_send_time, remote_recv_time), deltas) in data {
let group_delta = aga.accumulate_packet(&AckedPacket {
seq_no: Default::default(),
size: Default::default(),
local_send_time: now + local_send_time,
remote_recv_time: now + remote_recv_time,
local_recv_time: Instant::now(), // does not matter
});

assert_eq!(group_delta.map(|d| (d.send_delta, d.arrival_delta)), deltas);
}
}

fn duration_us(us: u64) -> Duration {
Duration::from_micros(us)
}
Expand Down
2 changes: 1 addition & 1 deletion src/packet/bwe/delay_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl DelayController {
.arrival_group_accumulator
.accumulate_packet(acked_packet)
{
crate::packet::bwe::macros::log_delay_variation!(delay_variation.delay_delta);
crate::packet::bwe::macros::log_delay_variation!(delay_variation.arrival_delta);

// Got a new delay variation, add it to the trendline
self.trendline_estimator
Expand Down
68 changes: 32 additions & 36 deletions src/packet/bwe/loss_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::packet::bwe::macros::log_loss_bw_limit_in_window;
use crate::rtp_::TwccSendRecord;
use crate::{Bitrate, DataSize};

use super::super_instant::SuperInstant;
use super::time::{TimeDelta, Timestamp};

/// Loss controller based loosely on libWebRTC's `LossBasedBweV2`(commit `14e2779a6ccdc67038ed2069a5732dd41617c6f0`)
/// We don't implement ALR, link capacity estimates or probing(although we use constant padding
Expand Down Expand Up @@ -38,7 +38,7 @@ pub struct LossController {
partial_observation: PartialObservation,

/// The last packet sent in the most recent observation.
last_send_time_most_recent_observation: SuperInstant,
last_send_time_most_recent_observation: Timestamp,

// Observation window
/// Forever growing counter of observations. Observation::id derives from this.
Expand All @@ -53,11 +53,11 @@ pub struct LossController {
/// Precomputed instantaneous upper bound on bandwidth estimate.
cached_instant_upper_bound: Option<Bitrate>,
/// Last time we reduced the estimate.
last_time_estimate_reduced: SuperInstant,
last_time_estimate_reduced: Timestamp,

/// When we started recovering after being loss limited last time.
/// While in this window the bandwidth estimate is bounded by `bandwidth_limit_in_current_window`.
recovering_after_loss_timestamp: SuperInstant,
recovering_after_loss_timestamp: Timestamp,
/// Upper bound on estimate while in recovery window.
bandwidth_limit_in_current_window: Bitrate,

Expand Down Expand Up @@ -105,16 +105,16 @@ impl LossController {
let mut controller = LossController {
state: LossControllerState::DelayBased,
partial_observation: PartialObservation::new(),
last_send_time_most_recent_observation: SuperInstant::DistantFuture,
last_send_time_most_recent_observation: Timestamp::DistantFuture,
observations: vec![Observation::DUMMY; config.observation_window_size]
.into_boxed_slice(),
num_observations: 0,
temporal_weights: vec![0_f64; config.observation_window_size].into_boxed_slice(),
instant_upper_bound_temporal_weights: vec![0_f64; config.observation_window_size]
.into_boxed_slice(),
cached_instant_upper_bound: None,
last_time_estimate_reduced: SuperInstant::DistantPast,
recovering_after_loss_timestamp: SuperInstant::DistantPast,
last_time_estimate_reduced: Timestamp::DistantPast,
recovering_after_loss_timestamp: Timestamp::DistantPast,
bandwidth_limit_in_current_window: Bitrate::MAX,

current_estimate: ChannelParameters::new(config.initial_inherent_loss_estimate),
Expand Down Expand Up @@ -212,7 +212,7 @@ impl LossController {
// `delayed_increase_window` ago, and
// 2. The best candidate is greater than bandwidth_limit_in_current_window.

if self.recovering_after_loss_timestamp.is_finite()
if self.recovering_after_loss_timestamp.is_exact()
&& self.recovering_after_loss_timestamp + self.config.delayed_increase_window
> self.last_send_time_most_recent_observation
&& best_candidate.loss_limited_bandwidth > self.bandwidth_limit_in_current_window
Expand Down Expand Up @@ -258,7 +258,7 @@ impl LossController {
const CONF_MAX_INCREASE_FACTOR: f64 = 1.3;

if self.is_bandwidth_limited_due_to_loss()
&& (self.recovering_after_loss_timestamp.is_not_finite()
&& (!self.recovering_after_loss_timestamp.is_exact()
|| self.recovering_after_loss_timestamp + self.config.delayed_increase_window
< self.last_send_time_most_recent_observation)
{
Expand Down Expand Up @@ -315,19 +315,15 @@ impl LossController {
return false;
};

let last_send_time = summary.last_send_time;
let last_send_time = Timestamp::from(summary.last_send_time);

self.partial_observation.update(summary);

if !self.last_send_time_most_recent_observation.is_finite() {
self.last_send_time_most_recent_observation = last_send_time.into();
if !self.last_send_time_most_recent_observation.is_exact() {
self.last_send_time_most_recent_observation = last_send_time;
}

let observation_duration = last_send_time
- self
.last_send_time_most_recent_observation
.as_instant()
.expect("instant is not finite");
let observation_duration = last_send_time - self.last_send_time_most_recent_observation;

if observation_duration <= Duration::ZERO {
return false;
Expand All @@ -338,7 +334,7 @@ impl LossController {
return false;
}

self.last_send_time_most_recent_observation = last_send_time.into();
self.last_send_time_most_recent_observation = last_send_time;

let observation = {
let id = self.num_observations;
Expand Down Expand Up @@ -585,24 +581,24 @@ impl LossController {
return upper_bound;
}

if self.config.rampup_acceleration_max_factor > Duration::ZERO {
if let (Some(most_recent), Some(reduced)) = (
self.last_send_time_most_recent_observation.as_instant(),
self.last_time_estimate_reduced.as_instant(),
) {
let delta = most_recent - reduced;
let time_since_bw_reduced = self
.config
.rampup_acceleration_maxout_time
.min(delta.max(Duration::ZERO))
.as_secs_f64();

let rampup_acceleration = self.config.rampup_acceleration_max_factor.as_secs_f64()
* time_since_bw_reduced
/ self.config.rampup_acceleration_maxout_time.as_secs_f64();

upper_bound = upper_bound + (self.acknowledged_bitrate * rampup_acceleration);
}
if self.config.rampup_acceleration_max_factor > Duration::ZERO
&& self.last_send_time_most_recent_observation.is_exact()
&& self.last_time_estimate_reduced.is_exact()
{
let delta = (self.last_send_time_most_recent_observation
- self.last_time_estimate_reduced)
.max(TimeDelta::ZERO);
let time_since_bw_reduced = self
.config
.rampup_acceleration_maxout_time
.as_secs_f64()
.min(delta.as_secs_f64());

let rampup_acceleration = self.config.rampup_acceleration_max_factor.as_secs_f64()
* time_since_bw_reduced
/ self.config.rampup_acceleration_maxout_time.as_secs_f64();

upper_bound = upper_bound + (self.acknowledged_bitrate * rampup_acceleration);
}

upper_bound
Expand Down
2 changes: 1 addition & 1 deletion src/packet/bwe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod delay_controller;
mod loss_controller;
pub(crate) mod macros;
mod rate_control;
mod super_instant;
mod time;
mod trendline_estimator;

use acked_bitrate_estimator::AckedBitrateEstimator;
Expand Down
Loading

0 comments on commit d433ab4

Please sign in to comment.