From fbf23060a5e0a476ba20e9af15f1b1f5369b1918 Mon Sep 17 00:00:00 2001 From: alexlapa Date: Thu, 26 Sep 2024 12:38:52 +0300 Subject: [PATCH 1/3] wip --- src/change/direct.rs | 10 ++++-- src/change/sdp.rs | 15 ++++++--- src/streams/mod.rs | 2 +- src/streams/send.rs | 80 +++++++++++++++++++++++++++++++++++--------- tests/rtx-cache-0.rs | 2 +- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/src/change/direct.rs b/src/change/direct.rs index 17f9c5a5..a0cd1c18 100644 --- a/src/change/direct.rs +++ b/src/change/direct.rs @@ -3,7 +3,9 @@ use crate::crypto::Fingerprint; use crate::media::{Media, MediaKind}; use crate::rtp_::{Mid, Rid, Ssrc}; use crate::sctp::ChannelConfig; -use crate::streams::{StreamRx, StreamTx, DEFAULT_RTX_CACHE_DURATION}; +use crate::streams::{ + StreamRx, StreamTx, DEFAULT_RTX_CACHE_DROP_RATIO, DEFAULT_RTX_CACHE_DURATION, +}; use crate::IceCreds; use crate::Rtc; use crate::RtcError; @@ -246,7 +248,11 @@ impl<'a> DirectApi<'a> { self.rtc.session.send_buffer_video }; - stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION); + stream.set_rtx_cache( + size, + DEFAULT_RTX_CACHE_DURATION, + DEFAULT_RTX_CACHE_DROP_RATIO, + ); stream } diff --git a/src/change/sdp.rs b/src/change/sdp.rs index 4c115794..20e9a819 100644 --- a/src/change/sdp.rs +++ b/src/change/sdp.rs @@ -22,8 +22,7 @@ use crate::RtcError; use crate::{Candidate, IceCreds}; pub use crate::sdp::{SdpAnswer, SdpOffer}; -use crate::streams::Streams; -use crate::streams::DEFAULT_RTX_CACHE_DURATION; +use crate::streams::{Streams, DEFAULT_RTX_CACHE_DROP_RATIO, DEFAULT_RTX_CACHE_DURATION}; /// Changes to the Rtc via SDP Offer/Answer dance. pub struct SdpApi<'a> { @@ -863,7 +862,11 @@ fn ensure_stream_tx(session: &mut Session) { session.send_buffer_video }; - stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION); + stream.set_rtx_cache( + size, + DEFAULT_RTX_CACHE_DURATION, + DEFAULT_RTX_CACHE_DROP_RATIO, + ); } } } @@ -899,7 +902,11 @@ fn add_pending_changes(session: &mut Session, pending: Changes) { session.send_buffer_video }; - stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION); + stream.set_rtx_cache( + size, + DEFAULT_RTX_CACHE_DURATION, + DEFAULT_RTX_CACHE_DROP_RATIO, + ); } } } diff --git a/src/streams/mod.rs b/src/streams/mod.rs index c653ca9f..b2069e43 100644 --- a/src/streams/mod.rs +++ b/src/streams/mod.rs @@ -24,7 +24,7 @@ pub(crate) mod rtx_cache_buf; mod send; mod send_queue; -pub(crate) use send::DEFAULT_RTX_CACHE_DURATION; +pub(crate) use send::{DEFAULT_RTX_CACHE_DROP_RATIO, DEFAULT_RTX_CACHE_DURATION}; // Time between regular receiver reports. // https://www.rfc-editor.org/rfc/rfc8829#section-5.1.2 diff --git a/src/streams/send.rs b/src/streams/send.rs index c6c891b1..dc19c76f 100644 --- a/src/streams/send.rs +++ b/src/streams/send.rs @@ -38,6 +38,8 @@ const MIN_SPURIOUS_PADDING_SIZE: usize = 50; pub const DEFAULT_RTX_CACHE_DURATION: Duration = Duration::from_secs(3); +pub const DEFAULT_RTX_CACHE_DROP_RATIO: Option = Some(0.15f32); + /// Outgoing encoded stream. /// /// A stream is a primary SSRC + optional RTX SSRC. @@ -102,6 +104,8 @@ pub struct StreamTx { /// sending spurious resends as padding. rtx_cache: RtxCache, + rtx_cache_drop_ratio: Option, + /// Last time we produced a SR. last_sender_report: Instant, @@ -123,7 +127,7 @@ pub struct StreamTx { } /// Holder of stats. -#[derive(Debug, Default)] +#[derive(Debug)] pub(crate) struct StreamTxStats { /// count of bytes sent, including retransmissions /// @@ -146,8 +150,26 @@ pub(crate) struct StreamTxStats { rtt: Option, /// losses collecter from RR (known packets, lost ratio) losses: Vec<(u64, f32)>, - bytes_transmitted: ValueHistory, - bytes_retransmitted: ValueHistory, + bytes_transmitted: Option>, + bytes_retransmitted: Option>, +} + +impl Default for StreamTxStats { + fn default() -> Self { + Self { + bytes: 0, + bytes_resent: 0, + packets: 0, + packets_resent: 0, + firs: 0, + plis: 0, + nacks: 0, + rtt: None, + losses: Vec::default(), + bytes_transmitted: Some(Default::default()), + bytes_retransmitted: Some(Default::default()), + } + } } impl StreamTx { @@ -172,6 +194,7 @@ impl StreamTx { padding: 0, blank_packet: RtpPacket::blank(), rtx_cache: RtxCache::new(2000, DEFAULT_RTX_CACHE_DURATION), + rtx_cache_drop_ratio: DEFAULT_RTX_CACHE_DROP_RATIO, last_sender_report: already_happened(), pending_request_keyframe: None, pending_request_remb: None, @@ -210,9 +233,22 @@ impl StreamTx { /// This determines how old incoming NACKs we can reply to. /// /// The default is 1024 packets over 3 seconds. - pub fn set_rtx_cache(&mut self, max_packets: usize, max_age: Duration) { + pub fn set_rtx_cache( + &mut self, + max_packets: usize, + max_age: Duration, + rtx_cache_drop_ratio: Option, + ) { // Dump old cache to avoid having to deal with resizing logic inside the cache impl. self.rtx_cache = RtxCache::new(max_packets, max_age); + if rtx_cache_drop_ratio.is_some() { + self.stats.bytes_transmitted = Some(Default::default()); + self.stats.bytes_retransmitted = Some(Default::default()); + } else { + self.stats.bytes_transmitted = None; + self.stats.bytes_retransmitted = None; + } + self.rtx_cache_drop_ratio = rtx_cache_drop_ratio; } /// Set whether this stream is unpaced or not. @@ -523,6 +559,15 @@ impl StreamTx { } fn rtx_ratio_downsampled(&mut self, now: Instant) -> f32 { + assert!( + self.stats.bytes_transmitted.is_some(), + "must only be called if rtx_cache_drop_ratio is enabled" + ); + assert!( + self.stats.bytes_retransmitted.is_some(), + "must only be called if rtx_cache_drop_ratio is enabled" + ); + let (value, ts) = self.rtx_ratio; if now - ts < Duration::from_millis(50) { // not worth re-evaluating, return the old value @@ -530,11 +575,8 @@ impl StreamTx { } // bytes stats refer to the last second by default - self.stats.bytes_transmitted.purge_old(now); - self.stats.bytes_retransmitted.purge_old(now); - - let bytes_transmitted = self.stats.bytes_transmitted.sum(); - let bytes_retransmitted = self.stats.bytes_retransmitted.sum(); + let bytes_transmitted = self.stats.bytes_transmitted.as_mut().unwrap().sum(); + let bytes_retransmitted = self.stats.bytes_retransmitted.as_mut().unwrap().sum(); let ratio = bytes_retransmitted as f32 / (bytes_retransmitted + bytes_transmitted) as f32; let ratio = if ratio.is_finite() { ratio } else { 0_f32 }; self.rtx_ratio = (ratio, now); @@ -542,12 +584,14 @@ impl StreamTx { } fn poll_packet_resend(&mut self, now: Instant) -> Option> { - let ratio = self.rtx_ratio_downsampled(now); + if let Some(drop_ratio) = self.rtx_cache_drop_ratio { + let ratio = self.rtx_ratio_downsampled(now); - // If we hit the cap, stop doing resends by clearing those we have queued. - if ratio > 0.15_f32 { - self.resends.clear(); - return None; + // If we hit the cap, stop doing resends by clearing those we have queued. + if ratio > drop_ratio { + self.resends.clear(); + return None; + } } let seq_no = loop { @@ -573,7 +617,9 @@ impl StreamTx { let len = pkt.payload.len() as u64; self.stats.update_packet_counts(len, true); - self.stats.bytes_retransmitted.push(now, len); + if let Some(h) = &mut self.stats.bytes_retransmitted { + h.push(now, len); + } let seq_no = self.seq_no_rtx.inc(); @@ -596,7 +642,9 @@ impl StreamTx { let len = pkt.payload.len() as u64; self.stats.update_packet_counts(len, false); - self.stats.bytes_transmitted.push(now, len); + if let Some(h) = &mut self.stats.bytes_transmitted { + h.push(now, len) + } let seq_no = pkt.seq_no; diff --git a/tests/rtx-cache-0.rs b/tests/rtx-cache-0.rs index 2e857773..762e5831 100644 --- a/tests/rtx-cache-0.rs +++ b/tests/rtx-cache-0.rs @@ -27,7 +27,7 @@ pub fn rtx_cache_0() -> Result<(), RtcError> { .declare_stream_tx(ssrc_tx, None, mid, Some(rid)) // // disable RTX cache by setting 0 - .set_rtx_cache(0, Duration::ZERO); + .set_rtx_cache(0, Duration::ZERO, Some(0.15)); r.direct_api() .declare_media(mid, MediaKind::Audio) From c80f1a030150097df129679cfd1769f5d81f981b Mon Sep 17 00:00:00 2001 From: alexlapa Date: Mon, 7 Oct 2024 11:02:14 +0300 Subject: [PATCH 2/3] some docs and changelog --- CHANGELOG.md | 1 + src/change/direct.rs | 10 ++-------- src/change/sdp.rs | 14 +++----------- src/streams/mod.rs | 2 +- src/streams/send.rs | 46 +++++++++++++++++++++++++++++++------------- 5 files changed, 40 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 545a4e2c..d1a2d36f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * Fix bug using unreliable channels by default #548 * New add_channel_with_config() for configured data channels #548 * Fix RTX stops working after packet loss spike #566 + * Configure RTX ratio cap via `StreamTx::set_rtx_cache` #570 # 0.6.1 * Force openssl to be >=0.10.66 #545 diff --git a/src/change/direct.rs b/src/change/direct.rs index a0cd1c18..67655b00 100644 --- a/src/change/direct.rs +++ b/src/change/direct.rs @@ -3,9 +3,7 @@ use crate::crypto::Fingerprint; use crate::media::{Media, MediaKind}; use crate::rtp_::{Mid, Rid, Ssrc}; use crate::sctp::ChannelConfig; -use crate::streams::{ - StreamRx, StreamTx, DEFAULT_RTX_CACHE_DROP_RATIO, DEFAULT_RTX_CACHE_DURATION, -}; +use crate::streams::{StreamRx, StreamTx, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP}; use crate::IceCreds; use crate::Rtc; use crate::RtcError; @@ -248,11 +246,7 @@ impl<'a> DirectApi<'a> { self.rtc.session.send_buffer_video }; - stream.set_rtx_cache( - size, - DEFAULT_RTX_CACHE_DURATION, - DEFAULT_RTX_CACHE_DROP_RATIO, - ); + stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP); stream } diff --git a/src/change/sdp.rs b/src/change/sdp.rs index 20e9a819..802e4b5a 100644 --- a/src/change/sdp.rs +++ b/src/change/sdp.rs @@ -22,7 +22,7 @@ use crate::RtcError; use crate::{Candidate, IceCreds}; pub use crate::sdp::{SdpAnswer, SdpOffer}; -use crate::streams::{Streams, DEFAULT_RTX_CACHE_DROP_RATIO, DEFAULT_RTX_CACHE_DURATION}; +use crate::streams::{Streams, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP}; /// Changes to the Rtc via SDP Offer/Answer dance. pub struct SdpApi<'a> { @@ -862,11 +862,7 @@ fn ensure_stream_tx(session: &mut Session) { session.send_buffer_video }; - stream.set_rtx_cache( - size, - DEFAULT_RTX_CACHE_DURATION, - DEFAULT_RTX_CACHE_DROP_RATIO, - ); + stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP); } } } @@ -902,11 +898,7 @@ fn add_pending_changes(session: &mut Session, pending: Changes) { session.send_buffer_video }; - stream.set_rtx_cache( - size, - DEFAULT_RTX_CACHE_DURATION, - DEFAULT_RTX_CACHE_DROP_RATIO, - ); + stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP); } } } diff --git a/src/streams/mod.rs b/src/streams/mod.rs index b2069e43..c9e4d3fe 100644 --- a/src/streams/mod.rs +++ b/src/streams/mod.rs @@ -24,7 +24,7 @@ pub(crate) mod rtx_cache_buf; mod send; mod send_queue; -pub(crate) use send::{DEFAULT_RTX_CACHE_DROP_RATIO, DEFAULT_RTX_CACHE_DURATION}; +pub(crate) use send::{DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP}; // Time between regular receiver reports. // https://www.rfc-editor.org/rfc/rfc8829#section-5.1.2 diff --git a/src/streams/send.rs b/src/streams/send.rs index dc19c76f..3e47ee0c 100644 --- a/src/streams/send.rs +++ b/src/streams/send.rs @@ -38,7 +38,7 @@ const MIN_SPURIOUS_PADDING_SIZE: usize = 50; pub const DEFAULT_RTX_CACHE_DURATION: Duration = Duration::from_secs(3); -pub const DEFAULT_RTX_CACHE_DROP_RATIO: Option = Some(0.15f32); +pub const DEFAULT_RTX_RATIO_CAP: Option = Some(0.15f32); /// Outgoing encoded stream. /// @@ -104,7 +104,8 @@ pub struct StreamTx { /// sending spurious resends as padding. rtx_cache: RtxCache, - rtx_cache_drop_ratio: Option, + /// Determines retransmitted bytes ratio value to clear queued resends. + rtx_ratio_cap: Option, /// Last time we produced a SR. last_sender_report: Instant, @@ -150,7 +151,11 @@ pub(crate) struct StreamTxStats { rtt: Option, /// losses collecter from RR (known packets, lost ratio) losses: Vec<(u64, f32)>, + + /// `None` if `rtx_ratio_cap` is `None`. bytes_transmitted: Option>, + + /// `None` if `rtx_ratio_cap` is `None`. bytes_retransmitted: Option>, } @@ -194,7 +199,7 @@ impl StreamTx { padding: 0, blank_packet: RtpPacket::blank(), rtx_cache: RtxCache::new(2000, DEFAULT_RTX_CACHE_DURATION), - rtx_cache_drop_ratio: DEFAULT_RTX_CACHE_DROP_RATIO, + rtx_ratio_cap: DEFAULT_RTX_RATIO_CAP, last_sender_report: already_happened(), pending_request_keyframe: None, pending_request_remb: None, @@ -232,23 +237,27 @@ impl StreamTx { /// /// This determines how old incoming NACKs we can reply to. /// - /// The default is 1024 packets over 3 seconds. + /// `rtx_ratio_cap` determines when to clear queued resends because of too many resends, + /// i.e. if `tx_sum / (rtx_sum + tx_sum) > rtx_ratio_cap`. `None` disables this functionality + /// so all queued resends will be sent. + /// + /// The default is 1024 packets over 3 seconds and RTX cache drop ratio of 0.15. pub fn set_rtx_cache( &mut self, max_packets: usize, max_age: Duration, - rtx_cache_drop_ratio: Option, + rtx_ratio_cap: Option, ) { // Dump old cache to avoid having to deal with resizing logic inside the cache impl. self.rtx_cache = RtxCache::new(max_packets, max_age); - if rtx_cache_drop_ratio.is_some() { - self.stats.bytes_transmitted = Some(Default::default()); - self.stats.bytes_retransmitted = Some(Default::default()); + if rtx_ratio_cap.is_some() { + self.stats.bytes_transmitted = Some(ValueHistory::default()); + self.stats.bytes_retransmitted = Some(ValueHistory::default()); } else { self.stats.bytes_transmitted = None; self.stats.bytes_retransmitted = None; } - self.rtx_cache_drop_ratio = rtx_cache_drop_ratio; + self.rtx_ratio_cap = rtx_ratio_cap; } /// Set whether this stream is unpaced or not. @@ -561,11 +570,11 @@ impl StreamTx { fn rtx_ratio_downsampled(&mut self, now: Instant) -> f32 { assert!( self.stats.bytes_transmitted.is_some(), - "must only be called if rtx_cache_drop_ratio is enabled" + "rtx_ratio_cap must be enabled" ); assert!( self.stats.bytes_retransmitted.is_some(), - "must only be called if rtx_cache_drop_ratio is enabled" + "rtx_ratio_cap must be enabled" ); let (value, ts) = self.rtx_ratio; @@ -575,6 +584,17 @@ impl StreamTx { } // bytes stats refer to the last second by default + self.stats + .bytes_transmitted + .as_mut() + .unwrap() + .purge_old(now); + self.stats + .bytes_retransmitted + .as_mut() + .unwrap() + .purge_old(now); + let bytes_transmitted = self.stats.bytes_transmitted.as_mut().unwrap().sum(); let bytes_retransmitted = self.stats.bytes_retransmitted.as_mut().unwrap().sum(); let ratio = bytes_retransmitted as f32 / (bytes_retransmitted + bytes_transmitted) as f32; @@ -584,11 +604,11 @@ impl StreamTx { } fn poll_packet_resend(&mut self, now: Instant) -> Option> { - if let Some(drop_ratio) = self.rtx_cache_drop_ratio { + if let Some(ratio_cap) = self.rtx_ratio_cap { let ratio = self.rtx_ratio_downsampled(now); // If we hit the cap, stop doing resends by clearing those we have queued. - if ratio > drop_ratio { + if ratio > ratio_cap { self.resends.clear(); return None; } From ee66d1947b775bcc7d4e8227f0570867549147db Mon Sep 17 00:00:00 2001 From: alexlapa Date: Tue, 8 Oct 2024 15:32:15 +0300 Subject: [PATCH 3/3] fix review notes --- src/streams/send.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/streams/send.rs b/src/streams/send.rs index 3e47ee0c..c22bd814 100644 --- a/src/streams/send.rs +++ b/src/streams/send.rs @@ -251,8 +251,12 @@ impl StreamTx { // Dump old cache to avoid having to deal with resizing logic inside the cache impl. self.rtx_cache = RtxCache::new(max_packets, max_age); if rtx_ratio_cap.is_some() { - self.stats.bytes_transmitted = Some(ValueHistory::default()); - self.stats.bytes_retransmitted = Some(ValueHistory::default()); + self.stats + .bytes_transmitted + .get_or_insert_with(ValueHistory::default); + self.stats + .bytes_retransmitted + .get_or_insert_with(ValueHistory::default); } else { self.stats.bytes_transmitted = None; self.stats.bytes_retransmitted = None;