Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support negative time deltas in BWE #615

Merged
merged 6 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* Ensure lexical ordering of SDP-formatted candidates follows priority #557
* Limit TWCC iteration with packet status count #606
* Dedupe acked packets from `TwccSendRegister::apply_report()` #601, #605
* Align BWE ArrivalGroup calculation with libwebrtc implementation #608
* Align BWE ArrivalGroup calculation with libwebrtc implementation #608, #615

# 0.6.2

Expand Down
119 changes: 57 additions & 62 deletions src/packet/bwe/arrival_group.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::cmp::Ordering;
use std::mem;
use std::time::{Duration, Instant};

use crate::rtp_::SeqNo;

use super::AckedPacket;
use super::{
time::{TimeDelta, Timestamp},
AckedPacket,
};

const BURST_TIME_INTERVAL: Duration = Duration::from_millis(5);
const SEND_TIME_GROUP_LENGTH: Duration = Duration::from_millis(5);
Expand Down Expand Up @@ -62,32 +64,15 @@ impl ArrivalGroup {
return Belongs::Skipped;
};

let inter_arrival_time = {
let last_remote_recv_time = self.remote_recv_time();

if packet.remote_recv_time >= last_remote_recv_time {
(packet.remote_recv_time - last_remote_recv_time).as_secs_f64()
} else {
(last_remote_recv_time - packet.remote_recv_time).as_secs_f64() * -1.0
}
};

let last_send_delta = {
let last_send_time = self.local_send_time();

match packet.local_send_time.cmp(&last_send_time) {
Ordering::Equal => {
return Belongs::Yes;
}
Ordering::Greater => (packet.local_send_time - last_send_time).as_secs_f64(),
Ordering::Less => (last_send_time - packet.local_send_time).as_secs_f64() * -1.0,
}
};

let inter_group_delay_delta = inter_arrival_time - last_send_delta;
let send_time_delta = Timestamp::from(packet.local_send_time) - self.local_send_time();
if send_time_delta == TimeDelta::ZERO {
return Belongs::Yes;
}
let arrival_time_delta = Timestamp::from(packet.remote_recv_time) - self.remote_recv_time();

if inter_group_delay_delta < 0.0
&& inter_arrival_time <= BURST_TIME_INTERVAL.as_secs_f64()
let propagation_delta = arrival_time_delta - send_time_delta;
if propagation_delta < TimeDelta::ZERO
&& arrival_time_delta <= BURST_TIME_INTERVAL
&& packet.remote_recv_time - first_remote_recv_time < MAX_BURST_DURATION
{
Belongs::Yes
Expand All @@ -98,32 +83,14 @@ impl ArrivalGroup {
}
}

/// Calculate the inter group delay delta between self and a subsequent group.
pub(super) fn inter_group_delay_delta(&self, other: &Self) -> Option<f64> {
let first_seq_no = self.first.map(|(s, _, _)| s)?;
let last_seq_no = self.last_seq_no?;

let arrival_delta = self.arrival_delta(other)?.as_secs_f64() * 1000.0;
let departure_delta = self.departure_delta(other)?.as_secs_f64() * 1000.0;

assert!(arrival_delta >= 0.0);

let result = arrival_delta - departure_delta;
trace!("Delay delta for group({first_seq_no}..={last_seq_no}. {result:?} = {arrival_delta:?} - {departure_delta:?}");

Some(result)
}

pub(super) fn departure_delta(&self, other: &Self) -> Option<Duration> {
other
.local_send_time()
.checked_duration_since(self.local_send_time())
/// Calculate the send time delta between self and a subsequent group.
fn departure_delta(&self, other: &Self) -> TimeDelta {
Timestamp::from(other.local_send_time()) - self.local_send_time()
}

fn arrival_delta(&self, other: &Self) -> Option<Duration> {
other
.remote_recv_time()
.checked_duration_since(self.remote_recv_time())
/// Calculate the remote receive time delta between self and a subsequent group.
fn arrival_delta(&self, other: &Self) -> TimeDelta {
Timestamp::from(other.remote_recv_time()) - self.remote_recv_time()
}

/// The local send time i.e. departure time, for the group.
Expand Down Expand Up @@ -183,7 +150,7 @@ impl ArrivalGroupAccumulator {
}

// Variation between previous group and current.
let delay_delta = self.inter_group_delay_delta();
let arrival_delta = self.arrival_delta();
let send_delta = self.send_delta();
let last_remote_recv_time = self.current_group.remote_recv_time();

Expand All @@ -194,32 +161,32 @@ impl ArrivalGroupAccumulator {

Some(InterGroupDelayDelta {
send_delta: send_delta?,
delay_delta: delay_delta?,
arrival_delta: arrival_delta?,
last_remote_recv_time,
})
}

fn inter_group_delay_delta(&self) -> Option<f64> {
fn arrival_delta(&self) -> Option<TimeDelta> {
self.previous_group
.as_ref()
.and_then(|prev| prev.inter_group_delay_delta(&self.current_group))
.map(|prev| prev.arrival_delta(&self.current_group))
}

fn send_delta(&self) -> Option<Duration> {
fn send_delta(&self) -> Option<TimeDelta> {
self.previous_group
.as_ref()
.and_then(|prev| prev.departure_delta(&self.current_group))
.map(|prev| prev.departure_delta(&self.current_group))
}
}

/// The calculate delay delta between two groups of packets.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct InterGroupDelayDelta {
/// The delta between the send times of the two groups i.e. delta between the last packet sent
/// in each group.
pub(super) send_delta: Duration,
/// The delay delta between the two groups.
pub(super) delay_delta: f64,
pub(super) send_delta: TimeDelta,
/// The delta between the remote arrival times of the two groups.
pub(super) arrival_delta: TimeDelta,
/// The reported receive time for the last packet in the first arrival group.
pub(super) last_remote_recv_time: Instant,
}
Expand All @@ -230,7 +197,7 @@ mod test {

use crate::rtp_::DataSize;

use super::{AckedPacket, ArrivalGroup, Belongs};
use super::{AckedPacket, ArrivalGroup, ArrivalGroupAccumulator, Belongs, TimeDelta};

#[test]
fn test_arrival_group_all_packets_belong_to_empty_group() {
Expand Down Expand Up @@ -441,6 +408,34 @@ mod test {
assert_eq!(group.size, 3, "Expected group to contain 4 packets");
}

#[test]
fn group_reorder() {
let data = vec![
((Duration::from_millis(0), Duration::from_millis(0)), None),
((Duration::from_millis(60), Duration::from_millis(5)), None),
((Duration::from_millis(40), Duration::from_millis(10)), None),
(
(Duration::from_millis(70), Duration::from_millis(20)),
Some((TimeDelta::from_millis(-20), TimeDelta::from_millis(5))),
),
];

let now = Instant::now();
let mut aga = ArrivalGroupAccumulator::default();

for ((local_send_time, remote_recv_time), deltas) in data {
let group_delta = aga.accumulate_packet(&AckedPacket {
seq_no: Default::default(),
size: Default::default(),
local_send_time: now + local_send_time,
remote_recv_time: now + remote_recv_time,
local_recv_time: Instant::now(), // does not matter
});

assert_eq!(group_delta.map(|d| (d.send_delta, d.arrival_delta)), deltas);
}
}

fn duration_us(us: u64) -> Duration {
Duration::from_micros(us)
}
Expand Down
2 changes: 1 addition & 1 deletion src/packet/bwe/delay_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl DelayController {
.arrival_group_accumulator
.accumulate_packet(acked_packet)
{
crate::packet::bwe::macros::log_delay_variation!(delay_variation.delay_delta);
crate::packet::bwe::macros::log_delay_variation!(delay_variation.arrival_delta);

// Got a new delay variation, add it to the trendline
self.trendline_estimator
Expand Down
68 changes: 32 additions & 36 deletions src/packet/bwe/loss_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::packet::bwe::macros::log_loss_bw_limit_in_window;
use crate::rtp_::TwccSendRecord;
use crate::{Bitrate, DataSize};

use super::super_instant::SuperInstant;
use super::time::{TimeDelta, Timestamp};

/// Loss controller based loosely on libWebRTC's `LossBasedBweV2`(commit `14e2779a6ccdc67038ed2069a5732dd41617c6f0`)
/// We don't implement ALR, link capacity estimates or probing(although we use constant padding
Expand Down Expand Up @@ -38,7 +38,7 @@ pub struct LossController {
partial_observation: PartialObservation,

/// The last packet sent in the most recent observation.
last_send_time_most_recent_observation: SuperInstant,
last_send_time_most_recent_observation: Timestamp,

// Observation window
/// Forever growing counter of observations. Observation::id derives from this.
Expand All @@ -53,11 +53,11 @@ pub struct LossController {
/// Precomputed instantaneous upper bound on bandwidth estimate.
cached_instant_upper_bound: Option<Bitrate>,
/// Last time we reduced the estimate.
last_time_estimate_reduced: SuperInstant,
last_time_estimate_reduced: Timestamp,

/// When we started recovering after being loss limited last time.
/// While in this window the bandwidth estimate is bounded by `bandwidth_limit_in_current_window`.
recovering_after_loss_timestamp: SuperInstant,
recovering_after_loss_timestamp: Timestamp,
/// Upper bound on estimate while in recovery window.
bandwidth_limit_in_current_window: Bitrate,

Expand Down Expand Up @@ -105,16 +105,16 @@ impl LossController {
let mut controller = LossController {
state: LossControllerState::DelayBased,
partial_observation: PartialObservation::new(),
last_send_time_most_recent_observation: SuperInstant::DistantFuture,
last_send_time_most_recent_observation: Timestamp::DistantFuture,
observations: vec![Observation::DUMMY; config.observation_window_size]
.into_boxed_slice(),
num_observations: 0,
temporal_weights: vec![0_f64; config.observation_window_size].into_boxed_slice(),
instant_upper_bound_temporal_weights: vec![0_f64; config.observation_window_size]
.into_boxed_slice(),
cached_instant_upper_bound: None,
last_time_estimate_reduced: SuperInstant::DistantPast,
recovering_after_loss_timestamp: SuperInstant::DistantPast,
last_time_estimate_reduced: Timestamp::DistantPast,
recovering_after_loss_timestamp: Timestamp::DistantPast,
bandwidth_limit_in_current_window: Bitrate::MAX,

current_estimate: ChannelParameters::new(config.initial_inherent_loss_estimate),
Expand Down Expand Up @@ -212,7 +212,7 @@ impl LossController {
// `delayed_increase_window` ago, and
// 2. The best candidate is greater than bandwidth_limit_in_current_window.

if self.recovering_after_loss_timestamp.is_finite()
if self.recovering_after_loss_timestamp.is_exact()
&& self.recovering_after_loss_timestamp + self.config.delayed_increase_window
> self.last_send_time_most_recent_observation
&& best_candidate.loss_limited_bandwidth > self.bandwidth_limit_in_current_window
Expand Down Expand Up @@ -258,7 +258,7 @@ impl LossController {
const CONF_MAX_INCREASE_FACTOR: f64 = 1.3;

if self.is_bandwidth_limited_due_to_loss()
&& (self.recovering_after_loss_timestamp.is_not_finite()
&& (!self.recovering_after_loss_timestamp.is_exact()
|| self.recovering_after_loss_timestamp + self.config.delayed_increase_window
< self.last_send_time_most_recent_observation)
{
Expand Down Expand Up @@ -315,19 +315,15 @@ impl LossController {
return false;
};

let last_send_time = summary.last_send_time;
let last_send_time = Timestamp::from(summary.last_send_time);

self.partial_observation.update(summary);

if !self.last_send_time_most_recent_observation.is_finite() {
self.last_send_time_most_recent_observation = last_send_time.into();
if !self.last_send_time_most_recent_observation.is_exact() {
self.last_send_time_most_recent_observation = last_send_time;
}

let observation_duration = last_send_time
- self
.last_send_time_most_recent_observation
.as_instant()
.expect("instant is not finite");
let observation_duration = last_send_time - self.last_send_time_most_recent_observation;

if observation_duration <= Duration::ZERO {
return false;
Expand All @@ -338,7 +334,7 @@ impl LossController {
return false;
}

self.last_send_time_most_recent_observation = last_send_time.into();
self.last_send_time_most_recent_observation = last_send_time;

let observation = {
let id = self.num_observations;
Expand Down Expand Up @@ -585,24 +581,24 @@ impl LossController {
return upper_bound;
}

if self.config.rampup_acceleration_max_factor > Duration::ZERO {
if let (Some(most_recent), Some(reduced)) = (
self.last_send_time_most_recent_observation.as_instant(),
self.last_time_estimate_reduced.as_instant(),
) {
let delta = most_recent - reduced;
let time_since_bw_reduced = self
.config
.rampup_acceleration_maxout_time
.min(delta.max(Duration::ZERO))
.as_secs_f64();

let rampup_acceleration = self.config.rampup_acceleration_max_factor.as_secs_f64()
* time_since_bw_reduced
/ self.config.rampup_acceleration_maxout_time.as_secs_f64();

upper_bound = upper_bound + (self.acknowledged_bitrate * rampup_acceleration);
}
if self.config.rampup_acceleration_max_factor > Duration::ZERO
&& self.last_send_time_most_recent_observation.is_exact()
&& self.last_time_estimate_reduced.is_exact()
{
let delta = (self.last_send_time_most_recent_observation
- self.last_time_estimate_reduced)
.max(TimeDelta::ZERO);
let time_since_bw_reduced = self
.config
.rampup_acceleration_maxout_time
.as_secs_f64()
.min(delta.as_secs_f64());

let rampup_acceleration = self.config.rampup_acceleration_max_factor.as_secs_f64()
* time_since_bw_reduced
/ self.config.rampup_acceleration_maxout_time.as_secs_f64();

upper_bound = upper_bound + (self.acknowledged_bitrate * rampup_acceleration);
}

upper_bound
Expand Down
2 changes: 1 addition & 1 deletion src/packet/bwe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod delay_controller;
mod loss_controller;
pub(crate) mod macros;
mod rate_control;
mod super_instant;
mod time;
mod trendline_estimator;

use acked_bitrate_estimator::AckedBitrateEstimator;
Expand Down
Loading
Loading