diff --git a/examples/chat.rs b/examples/chat.rs index e4a57990..75559542 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -495,8 +495,13 @@ impl Client { if let TrackOutState::ToOpen = track.state { if let Some(track_in) = track.track_in.upgrade() { let stream_id = track_in.origin.to_string(); - let mid = - change.add_media(track_in.kind, Direction::SendOnly, Some(stream_id), None); + let mid = change.add_media( + track_in.kind, + Direction::SendOnly, + Some(stream_id), + None, + None, + ); track.state = TrackOutState::Negotiating(mid); } } diff --git a/src/change/direct.rs b/src/change/direct.rs index 4cbd2504..85c81493 100644 --- a/src/change/direct.rs +++ b/src/change/direct.rs @@ -232,7 +232,7 @@ impl<'a> DirectApi<'a> { mid: Mid, rid: Option, ) -> &mut StreamTx { - let Some(media) = self.rtc.session.media_by_mid(mid) else { + let Some(media) = self.rtc.session.media_by_mid_mut(mid) else { panic!("No media declared for mid: {}", mid); }; @@ -240,6 +240,11 @@ impl<'a> DirectApi<'a> { let midrid = MidRid(mid, rid); + // If there is a RID tx, declare it so we an use it in Writer API + if let Some(rid) = rid { + media.add_to_rid_tx(rid); + } + let stream = self .rtc .session diff --git a/src/change/sdp.rs b/src/change/sdp.rs index 5dd73567..d73b30f8 100644 --- a/src/change/sdp.rs +++ b/src/change/sdp.rs @@ -8,7 +8,7 @@ use crate::crypto::Fingerprint; use crate::format::CodecConfig; use crate::format::PayloadParams; use crate::io::Id; -use crate::media::Media; +use crate::media::{Media, Simulcast}; use crate::packet::MediaKind; use crate::rtp_::MidRid; use crate::rtp_::Rid; @@ -128,7 +128,7 @@ impl<'a> SdpApi<'a> { /// let mut rtc = Rtc::new(); /// /// let mut changes = rtc.sdp_api(); - /// let mid = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None); + /// let mid = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None); /// let (offer, pending) = changes.apply().unwrap(); /// /// // send offer to remote peer, receive answer back @@ -201,7 +201,7 @@ impl<'a> SdpApi<'a> { /// let mut changes = rtc.sdp_api(); /// assert!(!changes.has_changes()); /// - /// let mid = changes.add_media(MediaKind::Audio, Direction::SendRecv, None, None); + /// let mid = changes.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None); /// assert!(changes.has_changes()); /// # } /// ``` @@ -227,7 +227,7 @@ impl<'a> SdpApi<'a> { /// /// let mut changes = rtc.sdp_api(); /// - /// let mid = changes.add_media(MediaKind::Audio, Direction::SendRecv, None, None); + /// let mid = changes.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None); /// # } /// ``` pub fn add_media( @@ -236,6 +236,7 @@ impl<'a> SdpApi<'a> { dir: Direction, stream_id: Option, track_id: Option, + simulcast: Option, ) -> Mid { let mid = self.rtc.new_mid(); @@ -266,8 +267,15 @@ impl<'a> SdpApi<'a> { Id::<20>::random().to_string() }; - let rtx = kind.is_video().then(|| self.rtc.session.streams.new_ssrc()); - let ssrcs = vec![(self.rtc.session.streams.new_ssrc(), rtx)]; + let mut ssrcs = Vec::new(); + + // Main SSRC, not counting RTX. + let main_ssrc_count = simulcast.as_ref().map(|s| s.send.len()).unwrap_or(1); + + for _ in 0..main_ssrc_count { + let rtx = kind.is_video().then(|| self.rtc.session.streams.new_ssrc()); + ssrcs.push((self.rtc.session.streams.new_ssrc(), rtx)); + } // TODO: let user configure stream/track name. let msid = Msid { @@ -282,6 +290,7 @@ impl<'a> SdpApi<'a> { kind, dir, ssrcs, + simulcast, // Added later pts: vec![], @@ -459,11 +468,11 @@ impl<'a> SdpApi<'a> { /// # use str0m::Rtc; /// let mut rtc = Rtc::new(); /// let mut changes = rtc.sdp_api(); - /// changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None); + /// changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None); /// let (_offer, pending) = changes.apply().unwrap(); /// /// let mut changes = rtc.sdp_api(); - /// changes.add_media(MediaKind::Video, Direction::SendOnly, None, None); + /// changes.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); /// changes.merge(pending); /// /// // This `SdpOffer` will have changes from the first `SdpPendingChanges` @@ -489,7 +498,7 @@ impl<'a> SdpApi<'a> { /// let mut rtc = Rtc::new(); /// /// let mut changes = rtc.sdp_api(); -/// let mid = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None); +/// let mid = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None); /// let (offer, pending) = changes.apply().unwrap(); /// /// // send offer to remote peer, receive answer back @@ -561,6 +570,7 @@ pub(crate) struct AddMedia { pub kind: MediaKind, pub dir: Direction, pub ssrcs: Vec<(Ssrc, Option)>, + pub simulcast: Option, // pts and index are filled in when creating the SDP OFFER. // The default PT order is set by the Session (BUNDLE). @@ -894,9 +904,12 @@ fn add_pending_changes(session: &mut Session, pending: Changes) { media.set_cname(add_media.cname); media.set_msid(add_media.msid); - for (ssrc, rtx) in add_media.ssrcs { - // TODO: When we allow sending RID, we need to add that here. - let midrid = MidRid(add_media.mid, None); + // If there are RIDs, the SSRC order matches that of the rid order. + let rids = add_media.simulcast.map(|x| x.send).unwrap_or(vec![]); + + for (i, (ssrc, rtx)) in add_media.ssrcs.into_iter().enumerate() { + let maybe_rid = rids.get(i).cloned(); + let midrid = MidRid(add_media.mid, maybe_rid); let stream = session.streams.declare_stream_tx(ssrc, rtx, midrid); @@ -1068,8 +1081,14 @@ fn update_media( media.set_direction(new_dir); } - for rid in m.rids().iter() { - media.expect_rid(*rid); + if new_dir.is_sending() { + // The other side has declared how it EXPECTING to receive. We must only send + // the RIDs declared in the answer. + media.set_rid_tx(m.rids().into()); + } + if new_dir.is_receiving() { + // The other side has declared what it proposes to send. We are accepting it. + media.set_rid_rx(m.rids().into()); } // Narrowing/ordering of of PT @@ -1102,44 +1121,47 @@ fn update_media( } media.set_remote_extmap(remote_extmap); - if new_dir.is_receiving() { - // SSRC changes - // This will always be for ReceiverSource since any incoming a=ssrc line will be - // about the remote side's SSRC. - let infos = m.ssrc_info(); - let main = infos.iter().filter(|i| i.repairs.is_none()); - - if m.simulcast().is_none() { - // Only use pre-communicated SSRC if we are running without simulcast. - // We found a bug in FF where the order of the simulcast lines does not - // correspond to the order of the simulcast declarations. In this case - // it's better to fall back on mid/rid dynamic mapping. - - for i in main { - // TODO: If the remote is communicating _BOTH_ rid and a=ssrc this will fail. - info!("Adding pre-communicated SSRC: {:?}", i); - let repair_ssrc = infos - .iter() - .find(|r| r.repairs == Some(i.ssrc)) - .map(|r| r.ssrc); - - // If remote communicated a main a=ssrc, but no RTX, we will not send nacks. - let midrid = MidRid(media.mid(), None); - let suppress_nack = repair_ssrc.is_none(); - streams.expect_stream_rx(i.ssrc, repair_ssrc, midrid, suppress_nack); - } - } + // SSRC changes + // This will always be for ReceiverSource since any incoming a=ssrc line will be + // about the remote side's SSRC. + if !new_dir.is_receiving() { + return; + } - // Simulcast configuration - if let Some(s) = m.simulcast() { - if s.is_munged { - warn!("Not supporting simulcast via munging SDP"); - } else if media.simulcast().is_none() { - // Invert before setting, since it has a recv and send config. - media.set_simulcast(s.invert()); - } + // Simulcast configuration + if let Some(s) = m.simulcast() { + if s.is_munged { + warn!("Not supporting simulcast via munging SDP"); + } else if media.simulcast().is_none() { + // Invert before setting, since it has a recv and send config. + media.set_simulcast(s.invert()); } } + + // Only use pre-communicated SSRC if we are running without simulcast. + // We found a bug in FF where the order of the simulcast lines does not + // correspond to the order of the simulcast declarations. In this case + // it's better to fall back on mid/rid dynamic mapping. + if m.simulcast().is_some() { + return; + } + + let infos = m.ssrc_info(); + let main = infos.iter().filter(|i| i.repairs.is_none()); + + for i in main { + // TODO: If the remote is communicating _BOTH_ rid and a=ssrc this will fail. + info!("Adding pre-communicated SSRC: {:?}", i); + let repair_ssrc = infos + .iter() + .find(|r| r.repairs == Some(i.ssrc)) + .map(|r| r.ssrc); + + // If remote communicated a main a=ssrc, but no RTX, we will not send nacks. + let midrid = MidRid(media.mid(), None); + let suppress_nack = repair_ssrc.is_none(); + streams.expect_stream_rx(i.ssrc, repair_ssrc, midrid, suppress_nack); + } } trait AsSdpMediaLine { @@ -1291,18 +1313,13 @@ impl AsSdpMediaLine for Media { } } - let count = ssrcs_tx.len(); - #[allow(clippy::comparison_chain)] - if count == 1 { - let (ssrc, ssrc_rtx) = &ssrcs_tx[0]; + for (ssrc, ssrc_rtx) in ssrcs_tx { if let Some(ssrc_rtx) = ssrc_rtx { attrs.push(MediaAttribute::SsrcGroup { semantics: "FID".to_string(), ssrcs: vec![*ssrc, *ssrc_rtx], }); } - } else { - // TODO: handle simulcast } MediaLine { @@ -1549,7 +1566,10 @@ impl Change { #[cfg(test)] mod test { + use sdp::RestrictionId; + use crate::format::Codec; + use crate::media::Simulcast; use crate::sdp::RtpMap; use super::*; @@ -1595,11 +1615,11 @@ mod test { let mut rtc = Rtc::new(); let mut changes = rtc.sdp_api(); - changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None); + changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None); let (offer, pending) = changes.apply().unwrap(); let mut changes = rtc.sdp_api(); - changes.add_media(MediaKind::Video, Direction::SendOnly, None, None); + changes.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); changes.merge(pending); let (new_offer, _) = changes.apply().unwrap(); @@ -1624,7 +1644,7 @@ mod test { .build(); let mut change1 = rtc1.sdp_api(); - change1.add_media(MediaKind::Video, Direction::SendOnly, None, None); + change1.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); let (offer1, _) = change1.apply().unwrap(); let answer = rtc2.sdp_api().accept_offer(offer1).unwrap(); @@ -1652,4 +1672,73 @@ mod test { "VP9 was not offered, so it should not be present in the answer" ); } + + #[test] + fn simulcast_ssrc_allocation() { + crate::init_crypto_default(); + + let mut rtc1 = Rtc::new(); + + let mut change = rtc1.sdp_api(); + change.add_media( + MediaKind::Video, + Direction::SendOnly, + None, + None, + Some(Simulcast { + send: vec!["m".into(), "h".into(), "l".into()], + recv: vec![], + }), + ); + + let Change::AddMedia(am) = &change.changes[0] else { + panic!("Not AddMedia?!"); + }; + + // these should be organized in order: m, h, l + let pending_ssrcs = am.ssrcs.clone(); + assert_eq!(pending_ssrcs.len(), 3); + + for p in &pending_ssrcs { + assert!(p.1.is_some()); // all should have rtx + } + + let (offer, _) = change.apply().unwrap(); + let sdp = offer.into_inner(); + let line = &sdp.media_lines[0]; + + assert_eq!( + line.simulcast().unwrap().send, + SimulcastGroups(vec![ + RestrictionId("m".into(), true), + RestrictionId("h".into(), true), + RestrictionId("l".into(), true), + ]) + ); + + // Each SSRC, both regular and RTX get their own a=ssrc line. + assert_eq!(line.ssrc_info().len(), pending_ssrcs.len() * 2); + + let fids: Vec<_> = line + .attrs + .iter() + .filter_map(|a| { + if let MediaAttribute::SsrcGroup { semantics, ssrcs } = a { + // We don't have any other semantics right now. + assert_eq!(semantics, "FID"); + assert_eq!(ssrcs.len(), 2); + Some((ssrcs[0], ssrcs[1])) + } else { + None + } + }) + .collect(); + + assert_eq!(fids.len(), pending_ssrcs.len()); + + for (a, b) in fids.iter().zip(pending_ssrcs.iter()) { + assert_eq!(a.0, b.0); + assert_eq!(Some(a.1), b.1); + } + } } diff --git a/src/lib.rs b/src/lib.rs index 95df894d..5be04480 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,7 +96,7 @@ //! let mut change = rtc.sdp_api(); //! //! // Do some change. A valid OFFER needs at least one "m-line" (media). -//! let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None); +//! let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None); //! //! // Get the offer. //! let (offer, pending) = change.apply().unwrap(); @@ -1281,8 +1281,8 @@ impl Rtc { /// let mut rtc = Rtc::new(); /// /// let mut changes = rtc.sdp_api(); - /// let mid_audio = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None); - /// let mid_video = changes.add_media(MediaKind::Video, Direction::SendOnly, None, None); + /// let mid_audio = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None); + /// let mid_video = changes.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); /// /// let (offer, pending) = changes.apply().unwrap(); /// let json = serde_json::to_vec(&offer).unwrap(); @@ -1335,6 +1335,8 @@ impl Rtc { panic!("In rtp_mode use direct_api().stream_tx().write_rtp()"); } + // This does not catch potential RIDs required to send simulcast, but + // it's a good start. An error might arise later on RID mismatch. self.session.media_by_mid_mut(mid)?; Some(Writer::new(&mut self.session, mid)) diff --git a/src/media/event.rs b/src/media/event.rs index 2878b377..4057daf2 100644 --- a/src/media/event.rs +++ b/src/media/event.rs @@ -59,7 +59,7 @@ pub struct MediaChanged { /// The [full spec][1] covers many cases that are not used by simple simulcast. /// /// [1]: https://datatracker.ietf.org/doc/html/draft-ietf-mmusic-sdp-simulcast-14 -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct Simulcast { /// The RID used for sending simulcast. pub send: Vec, @@ -67,6 +67,26 @@ pub struct Simulcast { pub recv: Vec, } +impl Simulcast { + pub(crate) fn into_sdp(self) -> SdpSimulcast { + SdpSimulcast { + send: crate::sdp::SimulcastGroups( + self.send + .into_iter() + .map(|r| crate::sdp::RestrictionId::new_active(r.to_string())) + .collect(), + ), + recv: crate::sdp::SimulcastGroups( + self.recv + .into_iter() + .map(|r| crate::sdp::RestrictionId::new_active(r.to_string())) + .collect(), + ), + is_munged: false, + } + } +} + /// Video or audio data from the remote peer. /// /// This is obtained via [`Event::MediaData`][crate::Event::MediaData]. diff --git a/src/media/mod.rs b/src/media/mod.rs index 5a9271fa..5d4cc2cb 100644 --- a/src/media/mod.rs +++ b/src/media/mod.rs @@ -49,6 +49,11 @@ pub struct Media { /// RTP level. rids_rx: Rids, + /// Rid that we can send using the [`Writer`]. + /// + /// RTP level. + rids_tx: Rids, + // ========================================= SDP level ========================================= // /// The index of this media line in the Session::media Vec. @@ -120,8 +125,10 @@ pub struct Media { } #[derive(Debug)] -/// Config value for [`Media::rids_rx()`] +/// Config value for [`Media::rids_rx()`] and [`Media::rids_tx()`] pub enum Rids { + /// No rid is allowed. + None, /// Any Rid is allowed. /// /// This is the default value for direct API. @@ -133,8 +140,9 @@ pub enum Rids { } impl Rids { - pub(crate) fn expects(&self, rid: Rid) -> bool { + pub(crate) fn contains(&self, rid: Rid) -> bool { match self { + Rids::None => false, Rids::Any => true, Rids::Specific(v) => v.contains(&rid), } @@ -143,6 +151,22 @@ impl Rids { pub(crate) fn is_specific(&self) -> bool { matches!(self, Rids::Specific(_)) } + + fn add(&mut self, rid: Rid) { + match self { + Rids::None | Rids::Any => { + *self = Rids::Specific(vec![rid]); + } + Rids::Specific(vec) if !vec.contains(&rid) => vec.push(rid), + Rids::Specific(_) => {} + } + } +} + +impl> From for Rids { + fn from(value: I) -> Self { + Rids::Specific(value.as_ref().to_vec()) + } } #[derive(Debug)] @@ -180,14 +204,8 @@ impl Media { /// a mid/rid combination in the RTP header extensions. /// /// RTP level. - pub fn expect_rid(&mut self, rid: Rid) { - match &mut self.rids_rx { - rids @ Rids::Any => { - *rids = Rids::Specific(vec![rid]); - } - Rids::Specific(v) if !v.contains(&rid) => v.push(rid), - _ => {} - } + pub fn expect_rid_rx(&mut self, rid: Rid) { + self.rids_rx.add(rid); } /// Rids we are expecting to see on incoming RTP packets that map to this mid. @@ -200,6 +218,16 @@ impl Media { &self.rids_rx } + /// Rids we are can send via the [`Writer`]. + /// + /// By default this is set to [`Rids::None`], which changes to [`Rids::Specific`] via SDP negotiation + /// that configures Simulcast where specific rids are expected. + /// + /// RTP level. + pub fn rids_tx(&self) -> &Rids { + &self.rids_tx + } + pub(crate) fn index(&self) -> usize { self.index } @@ -467,6 +495,18 @@ impl Media { // Simply remove the depayloader, it will be re-created on the next RTP packet. self.depayloaders.remove(&(payload_type, rid)); } + + pub(crate) fn set_rid_rx(&mut self, rids: Rids) { + self.rids_rx = rids; + } + + pub(crate) fn set_rid_tx(&mut self, rids: Rids) { + self.rids_tx = rids; + } + + pub(crate) fn add_to_rid_tx(&mut self, rid: Rid) { + self.rids_tx.add(rid) + } } impl Default for Media { @@ -484,6 +524,7 @@ impl Default for Media { dir: Direction::SendRecv, simulcast: None, rids_rx: Rids::Any, + rids_tx: Rids::None, payloaders: HashMap::new(), depayloaders: HashMap::new(), to_payload: VecDeque::default(), @@ -528,6 +569,7 @@ impl Media { remote_pts: a.pts, remote_exts: a.exts, remote_created: false, + simulcast: a.simulcast.map(|s| s.into_sdp()), ..Default::default() } } diff --git a/src/media/writer.rs b/src/media/writer.rs index 2097776a..376bc6a5 100644 --- a/src/media/writer.rs +++ b/src/media/writer.rs @@ -129,7 +129,7 @@ impl<'a> Writer<'a> { } if let Some(rid) = self.rid { - if !media.rids_rx().expects(rid) && media.rids_rx().is_specific() { + if !media.rids_tx().contains(rid) { return Err(RtcError::UnknownRid(rid)); } } diff --git a/src/packet/pacer.rs b/src/packet/pacer.rs index 9c83e121..e0d8ca5e 100644 --- a/src/packet/pacer.rs +++ b/src/packet/pacer.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; -use crate::rtp_::{Bitrate, DataSize, Mid}; +use crate::rtp_::{Bitrate, DataSize, MidRid}; use crate::util::already_happened; use crate::util::not_happening; use crate::util::Soonest; @@ -49,14 +49,14 @@ impl Pacer for PacerImpl { } } - fn poll_queue(&mut self) -> Option { + fn poll_queue(&mut self) -> Option { match self { PacerImpl::Null(v) => v.poll_queue(), PacerImpl::LeakyBucket(v) => v.poll_queue(), } } - fn register_send(&mut self, now: Instant, packet_size: DataSize, from: Mid) { + fn register_send(&mut self, now: Instant, packet_size: DataSize, from: MidRid) { match self { PacerImpl::Null(v) => v.register_send(now, packet_size, from), PacerImpl::LeakyBucket(v) => v.register_send(now, packet_size, from), @@ -68,7 +68,7 @@ impl Pacer for PacerImpl { /// /// The pacer is responsible for ensuring correct pacing of packets onto the network at a given /// bitrate. -pub trait Pacer { +pub(crate) trait Pacer { /// Set the pacing bitrate. The pacing rate can be exceeded if required to drain excessively /// long packet queues. fn set_pacing_rate(&mut self, pacing_bitrate: Bitrate); @@ -87,12 +87,12 @@ pub trait Pacer { ) -> Option; /// Determines which mid to poll, if any. - fn poll_queue(&mut self) -> Option; + fn poll_queue(&mut self) -> Option; /// Register a packet having been sent. /// /// **MUST** be called each time [`Pacer::poll_queue`] produces a mid. - fn register_send(&mut self, now: Instant, packet_size: DataSize, from: Mid); + fn register_send(&mut self, now: Instant, packet_size: DataSize, from: MidRid); } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -167,8 +167,8 @@ impl Default for QueueSnapshot { /// The state of a single upstream queue. /// The pacer manages packets across several upstream queues. #[derive(Debug, Clone, Copy)] -pub struct QueueState { - pub mid: Mid, +pub(crate) struct QueueState { + pub midrid: MidRid, pub unpaced: bool, pub use_for_padding: bool, pub snapshot: QueueSnapshot, @@ -176,9 +176,9 @@ pub struct QueueState { /// A request to generate a specific amount of padding. #[derive(Debug, Clone, Copy)] -pub struct PaddingRequest { +pub(crate) struct PaddingRequest { /// The Mid that should generate and queue the padding. - pub mid: Mid, + pub midrid: MidRid, /// The amount of padding in bytes to generate. pub padding: usize, } @@ -186,7 +186,7 @@ pub struct PaddingRequest { /// A null pacer that doesn't pace. #[derive(Debug, Default)] pub struct NullPacer { - last_sends: HashMap, + last_sends: HashMap, queue_states: Vec, need_immediate_timeout: bool, } @@ -219,15 +219,15 @@ impl Pacer for NullPacer { None } - fn poll_queue(&mut self) -> Option { + fn poll_queue(&mut self) -> Option { let non_empty_queues = self .queue_states .iter() .filter(|q| q.snapshot.packet_count > 0); // Pick a queue using round robin, prioritize the least recently sent on queue. - let to_send_on = non_empty_queues.min_by_key(|q| self.last_sends.get(&q.mid)); + let to_send_on = non_empty_queues.min_by_key(|q| self.last_sends.get(&q.midrid)); - let result = to_send_on.map(|q| q.mid); + let result = to_send_on.map(|q| q.midrid); if result.is_some() { self.need_immediate_timeout = true; @@ -236,7 +236,7 @@ impl Pacer for NullPacer { result } - fn register_send(&mut self, now: Instant, _packet_size: DataSize, from: Mid) { + fn register_send(&mut self, now: Instant, _packet_size: DataSize, from: MidRid) { let e = self.last_sends.entry(from).or_insert(now); *e = now; } @@ -265,7 +265,7 @@ pub struct LeakyBucketPacer { /// The queue states given by last handle_timeout. queue_states: Vec, /// The next return value for `poll_queue`` - next_poll_queue: Option, + next_poll_queue: Option, } impl Pacer for LeakyBucketPacer { @@ -305,7 +305,7 @@ impl Pacer for LeakyBucketPacer { self.maybe_update_adjusted_bitrate(now); if let Some(request) = self.maybe_create_padding_request() { - self.next_poll_queue = Some(request.mid); + self.next_poll_queue = Some(request.midrid); return Some(request); } @@ -332,21 +332,21 @@ impl Pacer for LeakyBucketPacer { self.next_poll_queue = None; self.next_poll_time = Some(next_poll_time); } else { - self.next_poll_queue = queue.map(|q| q.mid); + self.next_poll_queue = queue.map(|q| q.midrid); self.next_poll_time = Some(next_poll_time); } None } - fn poll_queue(&mut self) -> Option { + fn poll_queue(&mut self) -> Option { let next = self.next_poll_queue.take()?; self.request_immediate_timeout(); Some(next) } - fn register_send(&mut self, now: Instant, packet_size: DataSize, _from: Mid) { + fn register_send(&mut self, now: Instant, packet_size: DataSize, _from: MidRid) { self.last_emitted = Some(now); self.media_debt += packet_size; @@ -560,7 +560,7 @@ impl LeakyBucketPacer { let padding = (self.padding_bitrate * PADDING_BURST_INTERVAL).as_bytes_usize(); Some(PaddingRequest { - mid: queue.mid, + midrid: queue.midrid, padding, }) } @@ -987,7 +987,7 @@ mod test { let now = Instant::now(); let mut state = QueueState { - mid: Mid::from("001"), + midrid: MidRid(Mid::from("001"), None), unpaced: false, use_for_padding: true, snapshot: QueueSnapshot { @@ -1002,7 +1002,7 @@ mod test { }; let other = QueueState { - mid: Mid::from("002"), + midrid: MidRid(Mid::from("002"), None), unpaced: false, use_for_padding: false, snapshot: QueueSnapshot { @@ -1018,7 +1018,7 @@ mod test { state.snapshot.merge(&other.snapshot); - assert_eq!(state.mid, Mid::from("001")); + assert_eq!(state.midrid.mid(), Mid::from("001")); assert_eq!(state.snapshot.size, 40_usize); assert_eq!(state.snapshot.packet_count, 1337); assert_eq!(state.snapshot.total_queue_time_origin, duration_ms(1337)); @@ -1174,16 +1174,16 @@ mod test { } let timeout = { - if let Some(mid) = pacer.poll_queue() { + if let Some(midrid) = pacer.poll_queue() { let packet = queue .next_packet() - .unwrap_or_else(|| panic!("Should have a packet for mid {mid}")); - queue.register_send(mid, base + elapsed); + .unwrap_or_else(|| panic!("Should have a packet for {:?}", midrid)); + queue.register_send(midrid, base + elapsed); queue.update_average_queue_time(base + elapsed); pacer.register_send( base + elapsed, DataSize::bytes(packet.payload_len as u64), - mid, + midrid, ); if packet.kind == PacketKind::Padding { padding_sent += packet.payload_len.into(); @@ -1313,15 +1313,18 @@ mod test { .into_iter() } - pub(super) fn register_send(&mut self, mid: Mid, now: Instant) { - if self.video_queue.mid == mid { + pub(super) fn register_send(&mut self, midrid: MidRid, now: Instant) { + if self.video_queue.midrid == midrid { self.video_queue.last_emitted = Some(now); - } else if self.audio_queue.mid == mid { + } else if self.audio_queue.midrid == midrid { self.audio_queue.last_emitted = Some(now); - } else if self.padding_queue.mid == mid { + } else if self.padding_queue.midrid == midrid { self.padding_queue.last_emitted = Some(now); } else { - panic!("Attempted to register send on unknown queue with id {mid:?}"); + panic!( + "Attempted to register send on unknown queue with id {:?}", + midrid + ); } } @@ -1355,9 +1358,21 @@ mod test { impl Default for Queue { fn default() -> Self { Self { - audio_queue: Inner::new(Mid::from("001"), true, QueuePriority::Media), - video_queue: Inner::new(Mid::from("002"), false, QueuePriority::Media), - padding_queue: Inner::new(Mid::from("003"), false, QueuePriority::Padding), + audio_queue: Inner::new( + MidRid(Mid::from("001"), None), + true, + QueuePriority::Media, + ), + video_queue: Inner::new( + MidRid(Mid::from("002"), None), + false, + QueuePriority::Media, + ), + padding_queue: Inner::new( + MidRid(Mid::from("003"), None), + false, + QueuePriority::Padding, + ), } } } @@ -1369,7 +1384,7 @@ mod test { } struct Inner { - mid: Mid, + midrid: MidRid, last_emitted: Option, queue: VecDeque, packet_count: u32, @@ -1380,9 +1395,9 @@ mod test { } impl Inner { - fn new(mid: Mid, is_audio: bool, priority: QueuePriority) -> Self { + fn new(midrid: MidRid, is_audio: bool, priority: QueuePriority) -> Self { Self { - mid, + midrid, last_emitted: None, queue: VecDeque::default(), packet_count: 0, @@ -1430,7 +1445,7 @@ mod test { fn queue_state(&self, now: Instant) -> QueueState { QueueState { - mid: self.mid, + midrid: self.midrid, unpaced: self.is_audio, use_for_padding: !self.is_audio && self.last_emitted.is_some(), snapshot: QueueSnapshot { diff --git a/src/rtp/vla.rs b/src/rtp/vla.rs index 112f5e4a..65768beb 100644 --- a/src/rtp/vla.rs +++ b/src/rtp/vla.rs @@ -211,10 +211,151 @@ impl VideoLayersAllocation { pub struct Serializer; impl ExtensionSerializer for Serializer { - fn write_to(&self, _buf: &mut [u8], ev: &ExtensionValues) -> usize { - if ev.user_values.get::().is_some() { - // Writing the VLA header extension is currently not supported. - todo!(); + // +-+-+-+-+-+-+-+-+ + // |RID| NS| sl_bm | + // +-+-+-+-+-+-+-+-+ + // Spatial layer bitmask |sl0_bm |sl1_bm | + // up to 2 bytes |---------------| + // when sl_bm == 0 |sl2_bm |sl3_bm | + // +-+-+-+-+-+-+-+-+ + // Number of temporal layers |#tl|#tl|#tl|#tl| + // per spatial layer | | | | | + // +-+-+-+-+-+-+-+-+ + // Target bitrate in kpbs | | + // per temporal layer : ... : + // leb128 encoded | | + // +-+-+-+-+-+-+-+-+ + // Resolution and framerate | | + // 5 bytes per spatial layer + width-1 for + + // (optional) | rid=0, sid=0 | + // +---------------+ + // | | + // + height-1 for + + // | rid=0, sid=0 | + // +---------------+ + // | max framerate | + // +-+-+-+-+-+-+-+-+ + // : ... : + // +-+-+-+-+-+-+-+-+ + + fn write_to(&self, buf: &mut [u8], ev: &ExtensionValues) -> usize { + if let Some(vla) = ev.user_values.get::() { + let mut index = 0; + + buf[index] = 0; + + if vla.current_simulcast_stream_index == 0 && vla.simulcast_streams.is_empty() { + return index + 1; + } + + // RID: RTP stream index this allocation is sent on, numbered from 0. 2 bits. + buf[index] |= (vla.current_simulcast_stream_index & 0b11) << 6; + // NS: Number of RTP streams minus one. 2 bits, thus allowing up-to 4 RTP streams. + buf[index] |= ((vla.simulcast_streams.len() - 1) as u8 & 0b11) << 4; + + // sl_bm: BitMask of the active Spatial Layers when same for all RTP streams or 0 otherwise. + // 4 bits, thus allows up to 4 spatial layers per RTP streams. + let total_spatial_layers = vla.simulcast_streams.len(); + let spatial_layers = vla.simulcast_streams.iter().enumerate().fold( + [0u8; 4], + |mut spatial_layers, (stream_index, stream)| { + let sl_bm = stream.spatial_layers.iter().enumerate().fold( + 0u8, + |is_active, (layer_id, l)| { + is_active | if l.temporal_layers.is_empty() { 0 } else { 1 } << layer_id + }, + ); + + spatial_layers[stream_index] = sl_bm; + spatial_layers + }, + ); + + let shared_spatial_layer_bitmask = spatial_layers[..total_spatial_layers] + .iter() + .all(|i| *i == spatial_layers[0]); + + if shared_spatial_layer_bitmask { + buf[index] |= spatial_layers[0] & 0b1111; + } else { + // slX_bm: BitMask of the active Spatial Layers for RTP stream with index=X. + // When NS < 2, takes one byte, otherwise uses two bytes. Zero-padded to byte alignment. + for (stream_index, sl_bm) in + spatial_layers[..total_spatial_layers].iter().enumerate() + { + let shift = if stream_index % 2 == 0 { 4 } else { 0 }; + if shift == 4 { + index += 1; + buf[index] = 0; + } + buf[index + (stream_index / 2)] |= (sl_bm & 0b1111) << shift; + } + + // When writing 1 or 3 entries, skip the remaining nibble to be byte aligned + if total_spatial_layers % 2 != 0 { + buf[index] |= 0b1111; + } + } + + index += 1; + + // #tl: 2-bit value of number of temporal layers-1, thus allowing up-to 4 temporal layers. + // Values are stored in ascending order of spatial id. Zero-padded to byte alignment. + let mut tl_index = 0; + let mut wrote_temporal_layer_count = false; + for s in &vla.simulcast_streams { + for spatial in &s.spatial_layers { + if !spatial.temporal_layers.is_empty() { + wrote_temporal_layer_count = true; + let temporal_layer_count_minus_one = + (spatial.temporal_layers.len() - 1) as u8; + if tl_index % 4 == 0 { + tl_index = 0; + buf[index] = 0; + } + + buf[index] |= temporal_layer_count_minus_one << (8 - ((tl_index + 1) * 2)); + tl_index += 1; + } + } + } + + if wrote_temporal_layer_count { + // Ensure byte aligned + if tl_index % 4 != 0 { + index += 1; + } + } else { + buf[index] = 0; + index += 1; + return index; + } + + for s in &vla.simulcast_streams { + for spatial in &s.spatial_layers { + for temporal in &spatial.temporal_layers { + index += encode_leb_u63(temporal.cumulative_kbps, &mut buf[index..]); + } + } + } + + for s in &vla.simulcast_streams { + for spatial in &s.spatial_layers { + if let Some(r) = &spatial.resolution_and_framerate { + let width = (r.width - 1).to_be_bytes(); + let height = (r.height - 1).to_be_bytes(); + let framerate = r.framerate; + buf[index..index + 2].copy_from_slice(&width[..]); + index += 2; + buf[index..index + 2].copy_from_slice(&height[..]); + index += 2; + buf[index] = framerate; + index += 1; + } + } + } + + return index; } 0 } @@ -236,8 +377,7 @@ impl ExtensionSerializer for Serializer { } fn requires_two_byte_form(&self, _ev: &ExtensionValues) -> bool { - // Writing isn't implemented yet - false + true } } @@ -261,6 +401,24 @@ fn parse_leb_u63(bytes: &[u8]) -> (u64, &[u8]) { (0, bytes) } +/// Encodes leb128 +pub fn encode_leb_u63(mut value: u64, buf: &mut [u8]) -> usize { + let mut index = 0; + loop { + if value < 0x80 { + buf[index] = value as u8; + index += 1; + break; + } else { + buf[index] = ((value & 0x7f) | 0x80) as u8; + value >>= 7; + index += 1; + } + } + + index +} + // If successful, the size of the left will be mid, // and the size of the right while be buf.len()-mid. #[allow(dead_code)] @@ -328,6 +486,25 @@ fn read_bits(bits: u8, range: std::ops::Range) -> u8 { mod test { use super::*; + fn serialize(vla: Option<&VideoLayersAllocation>) -> Vec { + let Some(vla) = vla else { return Vec::new() }; + let mut buf: [u8; 100] = [0u8; 100]; + let mut ext_values: ExtensionValues = Default::default(); + ext_values.user_values.set(vla.clone()); + + let actual_size = Serializer {}.write_to(&mut buf, &ext_values); + + buf[..actual_size].to_vec() + } + + fn assert_ser_deser(bytes: &[u8], vla: Option) { + let vla_deserialized = VideoLayersAllocation::parse(bytes); + let vla_serialized = serialize(vla.as_ref()); + + assert_eq!(vla, vla_deserialized); + assert_eq!(bytes, vla_serialized); + } + #[test] fn test_read_bits() { assert_eq!(read_bits(0b1100_0000, 0..2), 0b0000_0011); @@ -396,20 +573,28 @@ mod test { #[test] fn test_parse_vla_empty_buffer() { - assert_eq!(VideoLayersAllocation::parse(&[]), None); + assert_ser_deser(&[], None); } #[test] fn test_parse_vla_empty() { - assert_eq!( - VideoLayersAllocation::parse(&[0b0000_0000]), + assert_ser_deser( + &[0b0000_0000], Some(VideoLayersAllocation { current_simulcast_stream_index: 0, simulcast_streams: vec![], - }) + }), ); } + #[test] + fn test_res() { + let vla = VideoLayersAllocation::parse(&[ + 17, 111, 7, 0, 15, 92, 4, 255, 2, 207, 30, 7, 127, 4, 55, 30, + ]); + assert!(vla.is_some()) + } + #[test] fn test_parse_vla_missing_spatial_layer_bitmasks() { assert_eq!(VideoLayersAllocation::parse(&[0b0110_0000]), None); @@ -417,18 +602,18 @@ mod test { #[test] fn test_parse_vla_1_simulcast_stream_with_no_active_layers() { - assert_eq!( - VideoLayersAllocation::parse(&[ + assert_ser_deser( + &[ 0b0100_0000, // 1 bitmask 0b0000_0000, - ]), + ], Some(VideoLayersAllocation { current_simulcast_stream_index: 1, simulcast_streams: vec![SimulcastStreamAllocation { spatial_layers: vec![], }], - }) + }), ); } @@ -452,16 +637,16 @@ mod test { }, SimulcastStreamAllocation { spatial_layers: vec![], - } + }, ], - }) + }), ); } #[test] fn test_parse_vla_3_simulcast_streams_with_1_active_spatial_layers_and_2_temporal_layers() { - assert_eq!( - VideoLayersAllocation::parse(&[ + assert_ser_deser( + &[ 0b0110_0001, // 3 temporal layer counts (minus 1), 2 bits each 0b0101_0100, @@ -472,7 +657,7 @@ mod test { 0b0000_1000, 0b0001_0000, 0b0010_0000, - ]), + ], Some(VideoLayersAllocation { current_simulcast_stream_index: 1, simulcast_streams: vec![ @@ -480,7 +665,7 @@ mod test { spatial_layers: vec![SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { cumulative_kbps: 1 }, - TemporalLayerAllocation { cumulative_kbps: 2 } + TemporalLayerAllocation { cumulative_kbps: 2 }, ], resolution_and_framerate: None, }], @@ -489,7 +674,7 @@ mod test { spatial_layers: vec![SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { cumulative_kbps: 4 }, - TemporalLayerAllocation { cumulative_kbps: 8 } + TemporalLayerAllocation { cumulative_kbps: 8 }, ], resolution_and_framerate: None, }], @@ -498,25 +683,25 @@ mod test { spatial_layers: vec![SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 16 + cumulative_kbps: 16, }, TemporalLayerAllocation { - cumulative_kbps: 32 - } + cumulative_kbps: 32, + }, ], resolution_and_framerate: None, }], - } + }, ], - }) + }), ); } #[test] fn test_parse_vla_3_simulcast_streams_with_1_active_spatial_layers_and_2_temporal_layers_with_resolutions( ) { - assert_eq!( - VideoLayersAllocation::parse(&[ + assert_ser_deser( + &[ 0b0110_0001, // 3 temporal layer counts (minus 1), 2 bits each 0b0101_0100, @@ -546,7 +731,7 @@ mod test { 2, 207, 60, - ]), + ], Some(VideoLayersAllocation { current_simulcast_stream_index: 1, simulcast_streams: vec![ @@ -554,11 +739,11 @@ mod test { spatial_layers: vec![SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 100 + cumulative_kbps: 100, }, TemporalLayerAllocation { - cumulative_kbps: 101 - } + cumulative_kbps: 101, + }, ], resolution_and_framerate: Some(ResolutionAndFramerate { width: 320, @@ -571,11 +756,11 @@ mod test { spatial_layers: vec![SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 110 + cumulative_kbps: 110, }, TemporalLayerAllocation { - cumulative_kbps: 111 - } + cumulative_kbps: 111, + }, ], resolution_and_framerate: Some(ResolutionAndFramerate { width: 640, @@ -588,11 +773,11 @@ mod test { spatial_layers: vec![SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 120 + cumulative_kbps: 120, }, TemporalLayerAllocation { - cumulative_kbps: 121 - } + cumulative_kbps: 121, + }, ], resolution_and_framerate: Some(ResolutionAndFramerate { width: 1280, @@ -600,16 +785,16 @@ mod test { framerate: 60, }), }], - } + }, ], - }) + }), ); } #[test] fn test_parse_vla_3_simulcast_streams_with_differing_active_spatial_layers_with_resolutions() { - assert_eq!( - VideoLayersAllocation::parse(&[ + assert_ser_deser( + &[ 0b0010_0000, // 3 active spatial layer bitmasks, 4 bits each; only the base layer is active 0b0001_0000, @@ -626,7 +811,7 @@ mod test { 0, 179, 15, - ]), + ], Some(VideoLayersAllocation { current_simulcast_stream_index: 0, simulcast_streams: vec![ @@ -634,11 +819,11 @@ mod test { spatial_layers: vec![SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 100 + cumulative_kbps: 100, }, TemporalLayerAllocation { - cumulative_kbps: 101 - } + cumulative_kbps: 101, + }, ], resolution_and_framerate: Some(ResolutionAndFramerate { width: 320, @@ -652,16 +837,16 @@ mod test { }, SimulcastStreamAllocation { spatial_layers: vec![], - } + }, ], - }) + }), ); } #[test] fn test_parse_vla_1_simulcast_streams_with_3_spatial_layers() { - assert_eq!( - VideoLayersAllocation::parse(&[ + assert_ser_deser( + &[ 0b0000_0111, // 3 temporal layer counts (minus 1), 2 bits each 0b0101_0100, @@ -672,7 +857,7 @@ mod test { 111, 120, 121, - ]), + ], Some(VideoLayersAllocation { current_simulcast_stream_index: 0, simulcast_streams: vec![SimulcastStreamAllocation { @@ -680,46 +865,46 @@ mod test { SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 100 + cumulative_kbps: 100, }, TemporalLayerAllocation { - cumulative_kbps: 101 - } + cumulative_kbps: 101, + }, ], resolution_and_framerate: None, }, SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 110 + cumulative_kbps: 110, }, TemporalLayerAllocation { - cumulative_kbps: 111 - } + cumulative_kbps: 111, + }, ], resolution_and_framerate: None, }, SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 120 + cumulative_kbps: 120, }, TemporalLayerAllocation { - cumulative_kbps: 121 - } + cumulative_kbps: 121, + }, ], resolution_and_framerate: None, - } + }, ], - },], - }) + }], + }), ); } #[test] fn test_parse_vla_1_simulcast_streams_with_4_spatial_layers_1_inactive() { - assert_eq!( - VideoLayersAllocation::parse(&[ + assert_ser_deser( + &[ 0b0000_1011, // 3 temporal layer counts (minus 1), 2 bits each 0b0101_0100, @@ -730,7 +915,7 @@ mod test { 111, 120, 121, - ]), + ], Some(VideoLayersAllocation { current_simulcast_stream_index: 0, simulcast_streams: vec![SimulcastStreamAllocation { @@ -738,22 +923,22 @@ mod test { SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 100 + cumulative_kbps: 100, }, TemporalLayerAllocation { - cumulative_kbps: 101 - } + cumulative_kbps: 101, + }, ], resolution_and_framerate: None, }, SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 110 + cumulative_kbps: 110, }, TemporalLayerAllocation { - cumulative_kbps: 111 - } + cumulative_kbps: 111, + }, ], resolution_and_framerate: None, }, @@ -764,17 +949,17 @@ mod test { SpatialLayerAllocation { temporal_layers: vec![ TemporalLayerAllocation { - cumulative_kbps: 120 + cumulative_kbps: 120, }, TemporalLayerAllocation { - cumulative_kbps: 121 - } + cumulative_kbps: 121, + }, ], resolution_and_framerate: None, - } + }, ], - },], - }) + }], + }), ); } } diff --git a/src/sdp/mod.rs b/src/sdp/mod.rs index 6d3e3857..918c3655 100644 --- a/src/sdp/mod.rs +++ b/src/sdp/mod.rs @@ -8,7 +8,7 @@ use thiserror::Error; mod data; pub(crate) use data::{FormatParam, Sdp, Session, SessionAttribute, Setup}; pub(crate) use data::{MediaAttribute, MediaLine, MediaType, Msid, Proto}; -pub(crate) use data::{Simulcast, SimulcastGroups}; +pub(crate) use data::{RestrictionId, Simulcast, SimulcastGroups}; pub(crate) use parser::parse_candidate; #[cfg(test)] @@ -42,6 +42,11 @@ impl SdpOffer { pub fn to_sdp_string(&self) -> String { self.0.to_string() } + + #[cfg(test)] + pub(crate) fn into_inner(self) -> Sdp { + self.0 + } } #[derive(Debug, PartialEq, Eq)] diff --git a/src/session.rs b/src/session.rs index 21435d6a..136c4944 100644 --- a/src/session.rs +++ b/src/session.rs @@ -249,11 +249,9 @@ impl Session { return; }; - let midrid = MidRid(padding_request.mid, None); - let stream = self .streams - .stream_tx_by_midrid(midrid) + .stream_tx_by_midrid(padding_request.midrid) .expect("pacer to use an existing stream"); stream.generate_padding(padding_request.padding); @@ -696,17 +694,17 @@ impl Session { let srtp_tx = self.srtp_tx.as_mut()?; // Figure out which, if any, queue to poll - let mid = self.pacer.poll_queue()?; + let midrid = self.pacer.poll_queue()?; let media = self .medias .iter() - .find(|m| m.mid() == mid) + .find(|m| m.mid() == midrid.mid()) .expect("index is media"); let buf = &mut self.poll_packet_buf; let twcc_seq = self.twcc; - let stream = self.streams.stream_tx_by_midrid(MidRid(mid, None))?; + let stream = self.streams.stream_tx_by_midrid(midrid)?; let params = &self.codec_config; let exts = media.remote_extmap(); @@ -734,7 +732,7 @@ impl Session { crate::log_stat!("PACKET_SENT", header.ssrc, payload_size, kind); } - self.pacer.register_send(now, payload_size.into(), mid); + self.pacer.register_send(now, payload_size.into(), midrid); if let Some(raw_packets) = &mut self.raw_packets { raw_packets.push_back(Box::new(RawPacket::RtpTx(header.clone(), buf.clone()))); diff --git a/src/streams/mod.rs b/src/streams/mod.rs index 622d32c0..a264e6c0 100644 --- a/src/streams/mod.rs +++ b/src/streams/mod.rs @@ -193,7 +193,7 @@ impl Streams { .expect("map_dynamic_by_rid to be called with Rid"); // Check if the mid/rid combo is not expected - if !media.rids_rx().expects(rid) { + if !media.rids_rx().contains(rid) { trace!("Mid does not expect rid: {} {}", midrid.mid(), rid); return; } diff --git a/src/streams/send.rs b/src/streams/send.rs index adf249ff..48cd6af1 100644 --- a/src/streams/send.rs +++ b/src/streams/send.rs @@ -123,6 +123,11 @@ pub struct StreamTx { // The _main_ PT to use for padding. This is main PT, since the poll_packet() loop // figures out the param.resend() RTX PT using main. pt_for_padding: Option, + + /// Whether a receiver report has been received for this SSRC, thus acknowledging + /// that the receiver has bound the Mid/Rid tuple to the SSRC and no longer + /// needs to be sent on every packet + remote_acked_ssrc: bool, } /// Holder of stats. @@ -203,6 +208,7 @@ impl StreamTx { stats: StreamTxStats::default(), rtx_ratio: (0.0, already_happened()), pt_for_padding: None, + remote_acked_ssrc: false, } } @@ -365,6 +371,7 @@ impl StreamTx { let mid = self.midrid.mid(); let rid = self.midrid.rid(); let ssrc_rtx = self.rtx; + let remote_acked_ssrc = self.remote_acked_ssrc; let (next, is_padding) = if let Some(next) = self.poll_packet_resend(now) { (next, false) @@ -382,8 +389,23 @@ impl StreamTx { // TODO: Can we remove this? let header_ref = &mut next.pkt.header; + // + // BUNDLE requires that the receiver "bind" the received SSRC to the values + // in the MID and/or (R)RID header extensions if present. Therefore, the + // sender can reduce overhead by omitting these header extensions once it + // knows that the receiver has "bound" the SSRC. + // + // The algorithm here is fairly simple: Always attach a MID and/or RID (if + // configured) to the outgoing packets until an RTCP receiver report comes + // back for this SSRC. That feedback indicates the receiver must have + // received a packet with the SSRC and header extension(s), so the sender + // then stops attaching the MID and RID. + // This is true also for RTX. - header_ref.ext_vals.mid = Some(mid); + if !remote_acked_ssrc { + header_ref.ext_vals.mid = Some(mid); + header_ref.ext_vals.rid = rid; + } let pt_main = header_ref.payload_type; @@ -431,7 +453,6 @@ impl StreamTx { next.pkt.time = time; // Modify the original (and also cached) header value. - header_ref.ext_vals.rid = rid; header_ref.ext_vals.rid_repair = None; header_ref.clone() @@ -759,7 +780,11 @@ impl StreamTx { pub(crate) fn handle_rtcp(&mut self, now: Instant, fb: RtcpFb) { use RtcpFb::*; match fb { - ReceptionReport(r) => self.stats.update_with_rr(now, r), + ReceptionReport(r) => { + // Receiver has bound MidRid to SSRC + self.remote_acked_ssrc = true; + self.stats.update_with_rr(now, r) + } Nack(_, list) => { self.stats.increase_nacks(); let entries = list.into_iter(); @@ -923,7 +948,7 @@ impl StreamTx { } QueueState { - mid: self.midrid.mid(), + midrid: self.midrid, unpaced, use_for_padding, snapshot, diff --git a/tests/bidirectional.rs b/tests/bidirectional.rs index 776469d5..f7d9ab52 100644 --- a/tests/bidirectional.rs +++ b/tests/bidirectional.rs @@ -23,7 +23,7 @@ pub fn bidirectional_same_m_line() -> Result<(), RtcError> { r.add_local_candidate(host2); let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None); + let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None); let (offer, pending) = change.apply().unwrap(); let answer = r.rtc.sdp_api().accept_offer(offer)?; diff --git a/tests/contiguous.rs b/tests/contiguous.rs index 5a92b151..ff113084 100644 --- a/tests/contiguous.rs +++ b/tests/contiguous.rs @@ -158,7 +158,7 @@ impl Server { // The change is on the L (sending side) with Direction::SendRecv. let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Video, Direction::SendOnly, None, None); + let mid = change.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); let (offer, pending) = change.apply().unwrap(); let answer = r.rtc.sdp_api().accept_offer(offer)?; diff --git a/tests/keyframes.rs b/tests/keyframes.rs index b1a50f77..3c521911 100644 --- a/tests/keyframes.rs +++ b/tests/keyframes.rs @@ -25,7 +25,7 @@ pub fn test_vp8_keyframes_detection() -> Result<(), RtcError> { // The change is on the L (sending side) with Direction::SendRecv. let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Video, Direction::SendOnly, None, None); + let mid = change.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); let (offer, pending) = change.apply().unwrap(); let answer = r.rtc.sdp_api().accept_offer(offer)?; @@ -117,7 +117,7 @@ pub fn test_vp9_keyframes_detection() -> Result<(), RtcError> { // The change is on the L (sending side) with Direction::SendRecv. let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Video, Direction::SendOnly, None, None); + let mid = change.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); let (offer, pending) = change.apply().unwrap(); let answer = r.rtc.sdp_api().accept_offer(offer)?; @@ -210,7 +210,7 @@ pub fn test_h264_keyframes_detection() -> Result<(), RtcError> { // The change is on the L (sending side) with Direction::SendRecv. let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Video, Direction::SendOnly, None, None); + let mid = change.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); let (offer, pending) = change.apply().unwrap(); let answer = r.rtc.sdp_api().accept_offer(offer)?; diff --git a/tests/remb.rs b/tests/remb.rs index eb5e284a..ed78688a 100644 --- a/tests/remb.rs +++ b/tests/remb.rs @@ -26,7 +26,7 @@ pub fn remb() -> Result<(), RtcError> { r.add_local_candidate(host2); let mid = negotiate(&mut l, &mut r, |change| { - change.add_media(MediaKind::Video, Direction::SendOnly, None, None) + change.add_media(MediaKind::Video, Direction::SendOnly, None, None, None) }); loop { diff --git a/tests/rtp-direct-mid-rid.rs b/tests/rtp-direct-mid-rid.rs index 8ce9d882..d7fb563e 100644 --- a/tests/rtp-direct-mid-rid.rs +++ b/tests/rtp-direct-mid-rid.rs @@ -29,7 +29,7 @@ pub fn rtp_direct_mid_rid() -> Result<(), RtcError> { r.direct_api() .declare_media(mid, MediaKind::Audio) - .expect_rid(rid); + .expect_rid_rx(rid); let max = l.last.max(r.last); l.last = max; diff --git a/tests/rtx-cache-0.rs b/tests/rtx-cache-0.rs index bdd57d30..6b7a2cd7 100644 --- a/tests/rtx-cache-0.rs +++ b/tests/rtx-cache-0.rs @@ -32,7 +32,7 @@ pub fn rtx_cache_0() -> Result<(), RtcError> { r.direct_api() .declare_media(mid, MediaKind::Audio) - .expect_rid(rid); + .expect_rid_rx(rid); let max = l.last.max(r.last); l.last = max; diff --git a/tests/sdp-negotiation.rs b/tests/sdp-negotiation.rs index 52c9af9b..3b49a9fe 100644 --- a/tests/sdp-negotiation.rs +++ b/tests/sdp-negotiation.rs @@ -172,7 +172,7 @@ pub fn answer_different_pt_to_offer() { assert_eq!(&[vp8(96)], &**r.codec_config()); let mut change = r.sdp_api(); - change.add_media(MediaKind::Video, Direction::SendOnly, None, None); + change.add_media(MediaKind::Video, Direction::SendOnly, None, None, None); let (offer, _pending) = change.apply().unwrap(); let sdp = offer.to_sdp_string(); @@ -332,7 +332,7 @@ fn non_media_creator_cannot_change_inactive_to_recvonly() { ); negotiate(&mut l, &mut r, |change| { - change.add_media(MediaKind::Video, Direction::Inactive, None, None); + change.add_media(MediaKind::Video, Direction::Inactive, None, None, None); }); let mid = r._mids()[0]; let m_r = r.media(mid).unwrap(); @@ -367,7 +367,7 @@ fn media_creator_can_change_inactive_to_recvonly() { ); negotiate(&mut l, &mut r, |change| { - change.add_media(MediaKind::Video, Direction::Inactive, None, None); + change.add_media(MediaKind::Video, Direction::Inactive, None, None, None); }); let mid = r._mids()[0]; let m_r = r.media(mid).unwrap(); @@ -400,7 +400,7 @@ fn with_params( .unwrap_or(MediaKind::Audio); negotiate(&mut l, &mut r, |change| { - change.add_media(kind, Direction::SendRecv, None, None); + change.add_media(kind, Direction::SendRecv, None, None, None); }); (l, r) @@ -411,7 +411,7 @@ fn with_exts(exts_l: ExtensionMap, exts_r: ExtensionMap) -> (TestRtc, TestRtc) { let mut r = build_exts(info_span!("R"), exts_r); negotiate(&mut l, &mut r, |change| { - change.add_media(MediaKind::Video, Direction::SendRecv, None, None); + change.add_media(MediaKind::Video, Direction::SendRecv, None, None, None); }); (l, r) diff --git a/tests/stats.rs b/tests/stats.rs index d440fa54..90c35561 100644 --- a/tests/stats.rs +++ b/tests/stats.rs @@ -27,7 +27,7 @@ pub fn stats() -> Result<(), RtcError> { r.add_local_candidate(host2); let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None); + let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None); let (offer, pending) = change.apply().unwrap(); let answer = r.rtc.sdp_api().accept_offer(offer)?; diff --git a/tests/twcc.rs b/tests/twcc.rs index 68ccd21d..5e4c07f9 100644 --- a/tests/twcc.rs +++ b/tests/twcc.rs @@ -27,7 +27,7 @@ pub fn twcc() -> Result<(), RtcError> { r.add_local_candidate(host2); let mid = negotiate(&mut l, &mut r, |change| { - change.add_media(MediaKind::Video, Direction::SendOnly, None, None) + change.add_media(MediaKind::Video, Direction::SendOnly, None, None, None) }); loop { diff --git a/tests/unidirectional-r-create-media.rs b/tests/unidirectional-r-create-media.rs index a52cd3eb..1b60a3e8 100644 --- a/tests/unidirectional-r-create-media.rs +++ b/tests/unidirectional-r-create-media.rs @@ -25,7 +25,7 @@ pub fn unidirectional_r_create_media() -> Result<(), RtcError> { // The change is on the R (not sending side) with Direction::RecvOnly. let mut change = r.sdp_api(); - let mid = change.add_media(MediaKind::Audio, Direction::RecvOnly, None, None); + let mid = change.add_media(MediaKind::Audio, Direction::RecvOnly, None, None, None); let (offer, pending) = change.apply().unwrap(); // str0m always produces a=ssrc lines, also for RecvOnly (since direction can change). diff --git a/tests/unidirectional.rs b/tests/unidirectional.rs index ddd723d6..806a861e 100644 --- a/tests/unidirectional.rs +++ b/tests/unidirectional.rs @@ -24,7 +24,7 @@ pub fn unidirectional() -> Result<(), RtcError> { // The change is on the L (sending side) with Direction::SendRecv. let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None); + let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None); let (offer, pending) = change.apply().unwrap(); let answer = r.rtc.sdp_api().accept_offer(offer)?; diff --git a/tests/user-rtp-header-extension.rs b/tests/user-rtp-header-extension.rs index d860017e..e9af72a6 100644 --- a/tests/user-rtp-header-extension.rs +++ b/tests/user-rtp-header-extension.rs @@ -89,7 +89,7 @@ pub fn user_rtp_header_extension() -> Result<(), RtcError> { // The change is on the L (sending side) with Direction::SendRecv. let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None); + let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None); let (offer, pending) = change.apply().unwrap(); let offer_str = offer.to_sdp_string(); let offer_parsed = @@ -232,7 +232,7 @@ pub fn user_rtp_header_extension_two_byte_form() -> Result<(), RtcError> { // The change is on the L (sending side) with Direction::SendRecv. let mut change = l.sdp_api(); - let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None); + let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None); let (offer, pending) = change.apply().unwrap(); let answer = r.rtc.sdp_api().accept_offer(offer)?;