Skip to content

Commit

Permalink
Fix RTX stops working after packet loss spike
Browse files Browse the repository at this point in the history
  • Loading branch information
alexlapa authored Sep 26, 2024
1 parent 35d9bba commit ed9b9eb
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Ensure compatibility with some 32-bit targets #533
* Fix bug using unreliable channels by default #548
* New add_channel_with_config() for configured data channels #548
* Fix RTX stops working after packet loss spike #566

# 0.6.1
* Force openssl to be >=0.10.66 #545
Expand Down
3 changes: 3 additions & 0 deletions src/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ impl StreamTx {
}

// bytes stats refer to the last second by default
self.stats.bytes_transmitted.purge_old(now);
self.stats.bytes_retransmitted.purge_old(now);

let bytes_transmitted = self.stats.bytes_transmitted.sum();
let bytes_retransmitted = self.stats.bytes_retransmitted.sum();
let ratio = bytes_retransmitted as f32 / (bytes_retransmitted + bytes_transmitted) as f32;
Expand Down
47 changes: 38 additions & 9 deletions src/util/value_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,26 @@ where
pub fn push(&mut self, t: Instant, v: T) {
self.value += v;
self.history.push_back((t, v));
self.drain(t);
}

/// Returns the sum of all values in the history up to max_time
/// This is more efficient than sum_since() as it does not need to iterate over the history
/// Returns the sum of all values in the history up to max_time. Might
/// return stale value unless [`ValueHistory::purge_old`] is called before.
pub fn sum(&self) -> T {
self.value
}

fn drain(&mut self, t: Instant) -> Option<()> {
while t.duration_since(self.history.front()?.0) > self.max_time {
/// Recalculates sum purging values older than `now - max_time`.
pub fn purge_old(&mut self, now: Instant) {
while {
let Some(front_t) = self.history.front().map(|v| v.0) else {
return;
};
now.duration_since(front_t) > self.max_time
} {
if let Some((_, v)) = self.history.pop_front() {
self.value -= v;
}
}

Some(())
}
}

Expand All @@ -63,7 +66,7 @@ mod test {
use super::ValueHistory;

#[test]
fn test() {
fn with_value_test() {
let now = Instant::now();

let mut h = ValueHistory {
Expand All @@ -72,11 +75,37 @@ mod test {
..Default::default()
};

assert_eq!(h.sum(), 11);
h.purge_old(now);
assert_eq!(h.sum(), 11);
h.push(now - Duration::from_millis(1500), 22);
h.push(now - Duration::from_millis(500), 22);
assert_eq!(h.sum(), 55);
assert_eq!(h.sum(), 11 + 22 + 22);
h.purge_old(now);
assert_eq!(h.sum(), 11 + 22);
h.push(now, 0);
assert_eq!(h.sum(), 11 + 22);
}

#[test]
fn test() {
let now = Instant::now();
let mut h = ValueHistory::default();

assert_eq!(h.sum(), 0);
h.push(now - Duration::from_millis(1500), 22);
assert_eq!(h.sum(), 22);
h.purge_old(now);
assert_eq!(h.sum(), 0);
h.push(now - Duration::from_millis(700), 22);
h.push(now - Duration::from_millis(500), 33);
assert_eq!(h.sum(), 22 + 33);
h.purge_old(now);
assert_eq!(h.sum(), 22 + 33);

h.purge_old(now + Duration::from_millis(400));
assert_eq!(h.sum(), 33);
h.purge_old(now + Duration::from_millis(600));
assert_eq!(h.sum(), 0);
}
}

0 comments on commit ed9b9eb

Please sign in to comment.