Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure RTX ratio cap #570

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Fix bug using unreliable channels by default #548
* New add_channel_with_config() for configured data channels #548
* Fix RTX stops working after packet loss spike #566
* Configure RTX ratio cap via `StreamTx::set_rtx_cache` #570

# 0.6.1
* Force openssl to be >=0.10.66 #545
Expand Down
4 changes: 2 additions & 2 deletions src/change/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::crypto::Fingerprint;
use crate::media::{Media, MediaKind};
use crate::rtp_::{Mid, Rid, Ssrc};
use crate::sctp::ChannelConfig;
use crate::streams::{StreamRx, StreamTx, DEFAULT_RTX_CACHE_DURATION};
use crate::streams::{StreamRx, StreamTx, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP};
use crate::IceCreds;
use crate::Rtc;
use crate::RtcError;
Expand Down Expand Up @@ -246,7 +246,7 @@ impl<'a> DirectApi<'a> {
self.rtc.session.send_buffer_video
};

stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION);
stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP);

stream
}
Expand Down
7 changes: 3 additions & 4 deletions src/change/sdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use crate::RtcError;
use crate::{Candidate, IceCreds};

pub use crate::sdp::{SdpAnswer, SdpOffer};
use crate::streams::Streams;
use crate::streams::DEFAULT_RTX_CACHE_DURATION;
use crate::streams::{Streams, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP};

/// Changes to the Rtc via SDP Offer/Answer dance.
pub struct SdpApi<'a> {
Expand Down Expand Up @@ -863,7 +862,7 @@ fn ensure_stream_tx(session: &mut Session) {
session.send_buffer_video
};

stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION);
stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP);
}
}
}
Expand Down Expand Up @@ -899,7 +898,7 @@ fn add_pending_changes(session: &mut Session, pending: Changes) {
session.send_buffer_video
};

stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION);
stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) mod rtx_cache_buf;
mod send;
mod send_queue;

pub(crate) use send::DEFAULT_RTX_CACHE_DURATION;
pub(crate) use send::{DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP};

// Time between regular receiver reports.
// https://www.rfc-editor.org/rfc/rfc8829#section-5.1.2
Expand Down
106 changes: 89 additions & 17 deletions src/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const MIN_SPURIOUS_PADDING_SIZE: usize = 50;

pub const DEFAULT_RTX_CACHE_DURATION: Duration = Duration::from_secs(3);

pub const DEFAULT_RTX_RATIO_CAP: Option<f32> = Some(0.15f32);

/// Outgoing encoded stream.
///
/// A stream is a primary SSRC + optional RTX SSRC.
Expand Down Expand Up @@ -102,6 +104,9 @@ pub struct StreamTx {
/// sending spurious resends as padding.
rtx_cache: RtxCache,

/// Determines retransmitted bytes ratio value to clear queued resends.
rtx_ratio_cap: Option<f32>,

/// Last time we produced a SR.
last_sender_report: Instant,

Expand All @@ -123,7 +128,7 @@ pub struct StreamTx {
}

/// Holder of stats.
#[derive(Debug, Default)]
#[derive(Debug)]
pub(crate) struct StreamTxStats {
/// count of bytes sent, including retransmissions
/// <https://www.w3.org/TR/webrtc-stats/#dom-rtcsentrtpstreamstats-bytessent>
Expand All @@ -146,8 +151,30 @@ pub(crate) struct StreamTxStats {
rtt: Option<f32>,
/// losses collecter from RR (known packets, lost ratio)
losses: Vec<(u64, f32)>,
bytes_transmitted: ValueHistory<u64>,
bytes_retransmitted: ValueHistory<u64>,

/// `None` if `rtx_ratio_cap` is `None`.
bytes_transmitted: Option<ValueHistory<u64>>,

/// `None` if `rtx_ratio_cap` is `None`.
bytes_retransmitted: Option<ValueHistory<u64>>,
}

impl Default for StreamTxStats {
fn default() -> Self {
Self {
bytes: 0,
bytes_resent: 0,
packets: 0,
packets_resent: 0,
firs: 0,
plis: 0,
nacks: 0,
rtt: None,
losses: Vec::default(),
bytes_transmitted: Some(Default::default()),
bytes_retransmitted: Some(Default::default()),
}
}
}

impl StreamTx {
Expand All @@ -172,6 +199,7 @@ impl StreamTx {
padding: 0,
blank_packet: RtpPacket::blank(),
rtx_cache: RtxCache::new(2000, DEFAULT_RTX_CACHE_DURATION),
rtx_ratio_cap: DEFAULT_RTX_RATIO_CAP,
last_sender_report: already_happened(),
pending_request_keyframe: None,
pending_request_remb: None,
Expand Down Expand Up @@ -209,10 +237,31 @@ impl StreamTx {
///
/// This determines how old incoming NACKs we can reply to.
///
/// The default is 1024 packets over 3 seconds.
pub fn set_rtx_cache(&mut self, max_packets: usize, max_age: Duration) {
/// `rtx_ratio_cap` determines when to clear queued resends because of too many resends,
/// i.e. if `tx_sum / (rtx_sum + tx_sum) > rtx_ratio_cap`. `None` disables this functionality
/// so all queued resends will be sent.
///
/// The default is 1024 packets over 3 seconds and RTX cache drop ratio of 0.15.
pub fn set_rtx_cache(
&mut self,
max_packets: usize,
max_age: Duration,
rtx_ratio_cap: Option<f32>,
) {
// Dump old cache to avoid having to deal with resizing logic inside the cache impl.
self.rtx_cache = RtxCache::new(max_packets, max_age);
if rtx_ratio_cap.is_some() {
self.stats
.bytes_transmitted
.get_or_insert_with(ValueHistory::default);
self.stats
.bytes_retransmitted
.get_or_insert_with(ValueHistory::default);
} else {
self.stats.bytes_transmitted = None;
self.stats.bytes_retransmitted = None;
}
self.rtx_ratio_cap = rtx_ratio_cap;
}

/// Set whether this stream is unpaced or not.
Expand Down Expand Up @@ -523,31 +572,50 @@ impl StreamTx {
}

fn rtx_ratio_downsampled(&mut self, now: Instant) -> f32 {
assert!(
self.stats.bytes_transmitted.is_some(),
"rtx_ratio_cap must be enabled"
);
assert!(
self.stats.bytes_retransmitted.is_some(),
"rtx_ratio_cap must be enabled"
);

let (value, ts) = self.rtx_ratio;
if now - ts < Duration::from_millis(50) {
// not worth re-evaluating, return the old value
return value;
}

// bytes stats refer to the last second by default
self.stats.bytes_transmitted.purge_old(now);
self.stats.bytes_retransmitted.purge_old(now);

let bytes_transmitted = self.stats.bytes_transmitted.sum();
let bytes_retransmitted = self.stats.bytes_retransmitted.sum();
self.stats
.bytes_transmitted
.as_mut()
.unwrap()
.purge_old(now);
self.stats
.bytes_retransmitted
.as_mut()
.unwrap()
.purge_old(now);

let bytes_transmitted = self.stats.bytes_transmitted.as_mut().unwrap().sum();
let bytes_retransmitted = self.stats.bytes_retransmitted.as_mut().unwrap().sum();
let ratio = bytes_retransmitted as f32 / (bytes_retransmitted + bytes_transmitted) as f32;
let ratio = if ratio.is_finite() { ratio } else { 0_f32 };
self.rtx_ratio = (ratio, now);
ratio
}

fn poll_packet_resend(&mut self, now: Instant) -> Option<NextPacket<'_>> {
let ratio = self.rtx_ratio_downsampled(now);
if let Some(ratio_cap) = self.rtx_ratio_cap {
let ratio = self.rtx_ratio_downsampled(now);

// If we hit the cap, stop doing resends by clearing those we have queued.
if ratio > 0.15_f32 {
self.resends.clear();
return None;
// If we hit the cap, stop doing resends by clearing those we have queued.
if ratio > ratio_cap {
self.resends.clear();
return None;
}
}

let seq_no = loop {
Expand All @@ -573,7 +641,9 @@ impl StreamTx {

let len = pkt.payload.len() as u64;
self.stats.update_packet_counts(len, true);
self.stats.bytes_retransmitted.push(now, len);
if let Some(h) = &mut self.stats.bytes_retransmitted {
h.push(now, len);
}

let seq_no = self.seq_no_rtx.inc();

Expand All @@ -596,7 +666,9 @@ impl StreamTx {

let len = pkt.payload.len() as u64;
self.stats.update_packet_counts(len, false);
self.stats.bytes_transmitted.push(now, len);
if let Some(h) = &mut self.stats.bytes_transmitted {
h.push(now, len)
}

let seq_no = pkt.seq_no;

Expand Down
2 changes: 1 addition & 1 deletion tests/rtx-cache-0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn rtx_cache_0() -> Result<(), RtcError> {
.declare_stream_tx(ssrc_tx, None, mid, Some(rid))
//
// disable RTX cache by setting 0
.set_rtx_cache(0, Duration::ZERO);
.set_rtx_cache(0, Duration::ZERO, Some(0.15));

r.direct_api()
.declare_media(mid, MediaKind::Audio)
Expand Down
Loading