Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simulcast sending support #603

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,13 @@ impl Client {
if let TrackOutState::ToOpen = track.state {
if let Some(track_in) = track.track_in.upgrade() {
let stream_id = track_in.origin.to_string();
let mid =
change.add_media(track_in.kind, Direction::SendOnly, Some(stream_id), None);
let mid = change.add_media(
track_in.kind,
Direction::SendOnly,
Some(stream_id),
None,
None,
);
track.state = TrackOutState::Negotiating(mid);
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/change/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,19 @@ impl<'a> DirectApi<'a> {
mid: Mid,
rid: Option<Rid>,
) -> &mut StreamTx {
let Some(media) = self.rtc.session.media_by_mid(mid) else {
let Some(media) = self.rtc.session.media_by_mid_mut(mid) else {
panic!("No media declared for mid: {}", mid);
};

let is_audio = media.kind().is_audio();

let midrid = MidRid(mid, rid);

// If there is a RID tx, declare it so we an use it in Writer API
if let Some(rid) = rid {
media.add_to_rid_tx(rid);
}

let stream = self
.rtc
.session
Expand Down
205 changes: 147 additions & 58 deletions src/change/sdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::crypto::Fingerprint;
use crate::format::CodecConfig;
use crate::format::PayloadParams;
use crate::io::Id;
use crate::media::Media;
use crate::media::{Media, Simulcast};
use crate::packet::MediaKind;
use crate::rtp_::MidRid;
use crate::rtp_::Rid;
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<'a> SdpApi<'a> {
/// let mut rtc = Rtc::new();
///
/// let mut changes = rtc.sdp_api();
/// let mid = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None);
/// let mid = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None);
/// let (offer, pending) = changes.apply().unwrap();
///
/// // send offer to remote peer, receive answer back
Expand Down Expand Up @@ -201,7 +201,7 @@ impl<'a> SdpApi<'a> {
/// let mut changes = rtc.sdp_api();
/// assert!(!changes.has_changes());
///
/// let mid = changes.add_media(MediaKind::Audio, Direction::SendRecv, None, None);
/// let mid = changes.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None);
/// assert!(changes.has_changes());
/// # }
/// ```
Expand All @@ -227,7 +227,7 @@ impl<'a> SdpApi<'a> {
///
/// let mut changes = rtc.sdp_api();
///
/// let mid = changes.add_media(MediaKind::Audio, Direction::SendRecv, None, None);
/// let mid = changes.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None);
/// # }
/// ```
pub fn add_media(
Expand All @@ -236,6 +236,7 @@ impl<'a> SdpApi<'a> {
dir: Direction,
stream_id: Option<String>,
track_id: Option<String>,
simulcast: Option<crate::media::Simulcast>,
algesten marked this conversation as resolved.
Show resolved Hide resolved
) -> Mid {
let mid = self.rtc.new_mid();

Expand Down Expand Up @@ -266,8 +267,15 @@ impl<'a> SdpApi<'a> {
Id::<20>::random().to_string()
};

let rtx = kind.is_video().then(|| self.rtc.session.streams.new_ssrc());
let ssrcs = vec![(self.rtc.session.streams.new_ssrc(), rtx)];
let mut ssrcs = Vec::new();

// Main SSRC, not counting RTX.
let main_ssrc_count = simulcast.as_ref().map(|s| s.send.len()).unwrap_or(1);

for _ in 0..main_ssrc_count {
let rtx = kind.is_video().then(|| self.rtc.session.streams.new_ssrc());
ssrcs.push((self.rtc.session.streams.new_ssrc(), rtx));
}

// TODO: let user configure stream/track name.
let msid = Msid {
Expand All @@ -282,6 +290,7 @@ impl<'a> SdpApi<'a> {
kind,
dir,
ssrcs,
simulcast,

// Added later
pts: vec![],
Expand Down Expand Up @@ -459,11 +468,11 @@ impl<'a> SdpApi<'a> {
/// # use str0m::Rtc;
/// let mut rtc = Rtc::new();
/// let mut changes = rtc.sdp_api();
/// changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None);
/// changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None);
/// let (_offer, pending) = changes.apply().unwrap();
///
/// let mut changes = rtc.sdp_api();
/// changes.add_media(MediaKind::Video, Direction::SendOnly, None, None);
/// changes.add_media(MediaKind::Video, Direction::SendOnly, None, None, None);
/// changes.merge(pending);
///
/// // This `SdpOffer` will have changes from the first `SdpPendingChanges`
Expand All @@ -489,7 +498,7 @@ impl<'a> SdpApi<'a> {
/// let mut rtc = Rtc::new();
///
/// let mut changes = rtc.sdp_api();
/// let mid = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None);
/// let mid = changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None);
/// let (offer, pending) = changes.apply().unwrap();
///
/// // send offer to remote peer, receive answer back
Expand Down Expand Up @@ -561,6 +570,7 @@ pub(crate) struct AddMedia {
pub kind: MediaKind,
pub dir: Direction,
pub ssrcs: Vec<(Ssrc, Option<Ssrc>)>,
pub simulcast: Option<Simulcast>,

// pts and index are filled in when creating the SDP OFFER.
// The default PT order is set by the Session (BUNDLE).
Expand Down Expand Up @@ -894,9 +904,12 @@ fn add_pending_changes(session: &mut Session, pending: Changes) {
media.set_cname(add_media.cname);
media.set_msid(add_media.msid);

for (ssrc, rtx) in add_media.ssrcs {
// TODO: When we allow sending RID, we need to add that here.
let midrid = MidRid(add_media.mid, None);
// If there are RIDs, the SSRC order matches that of the rid order.
let rids = add_media.simulcast.map(|x| x.send).unwrap_or(vec![]);

for (i, (ssrc, rtx)) in add_media.ssrcs.into_iter().enumerate() {
let maybe_rid = rids.get(i).cloned();
let midrid = MidRid(add_media.mid, maybe_rid);

let stream = session.streams.declare_stream_tx(ssrc, rtx, midrid);

Expand Down Expand Up @@ -1068,8 +1081,14 @@ fn update_media(
media.set_direction(new_dir);
}

for rid in m.rids().iter() {
media.expect_rid(*rid);
if new_dir.is_sending() {
// The other side has declared how it EXPECTING to receive. We must only send
// the RIDs declared in the answer.
media.set_rid_tx(m.rids().into());
}
if new_dir.is_receiving() {
// The other side has declared what it proposes to send. We are accepting it.
media.set_rid_rx(m.rids().into());
}

// Narrowing/ordering of of PT
Expand Down Expand Up @@ -1102,44 +1121,47 @@ fn update_media(
}
media.set_remote_extmap(remote_extmap);

if new_dir.is_receiving() {
// SSRC changes
// This will always be for ReceiverSource since any incoming a=ssrc line will be
// about the remote side's SSRC.
let infos = m.ssrc_info();
let main = infos.iter().filter(|i| i.repairs.is_none());

if m.simulcast().is_none() {
// Only use pre-communicated SSRC if we are running without simulcast.
// We found a bug in FF where the order of the simulcast lines does not
// correspond to the order of the simulcast declarations. In this case
// it's better to fall back on mid/rid dynamic mapping.

for i in main {
// TODO: If the remote is communicating _BOTH_ rid and a=ssrc this will fail.
info!("Adding pre-communicated SSRC: {:?}", i);
let repair_ssrc = infos
.iter()
.find(|r| r.repairs == Some(i.ssrc))
.map(|r| r.ssrc);

// If remote communicated a main a=ssrc, but no RTX, we will not send nacks.
let midrid = MidRid(media.mid(), None);
let suppress_nack = repair_ssrc.is_none();
streams.expect_stream_rx(i.ssrc, repair_ssrc, midrid, suppress_nack);
}
}
// SSRC changes
// This will always be for ReceiverSource since any incoming a=ssrc line will be
// about the remote side's SSRC.
if !new_dir.is_receiving() {
return;
}

// Simulcast configuration
if let Some(s) = m.simulcast() {
if s.is_munged {
warn!("Not supporting simulcast via munging SDP");
} else if media.simulcast().is_none() {
// Invert before setting, since it has a recv and send config.
media.set_simulcast(s.invert());
}
// Simulcast configuration
if let Some(s) = m.simulcast() {
if s.is_munged {
warn!("Not supporting simulcast via munging SDP");
} else if media.simulcast().is_none() {
// Invert before setting, since it has a recv and send config.
media.set_simulcast(s.invert());
}
}

// Only use pre-communicated SSRC if we are running without simulcast.
// We found a bug in FF where the order of the simulcast lines does not
// correspond to the order of the simulcast declarations. In this case
// it's better to fall back on mid/rid dynamic mapping.
if m.simulcast().is_some() {
return;
}

let infos = m.ssrc_info();
let main = infos.iter().filter(|i| i.repairs.is_none());

for i in main {
// TODO: If the remote is communicating _BOTH_ rid and a=ssrc this will fail.
info!("Adding pre-communicated SSRC: {:?}", i);
let repair_ssrc = infos
.iter()
.find(|r| r.repairs == Some(i.ssrc))
.map(|r| r.ssrc);

// If remote communicated a main a=ssrc, but no RTX, we will not send nacks.
let midrid = MidRid(media.mid(), None);
let suppress_nack = repair_ssrc.is_none();
streams.expect_stream_rx(i.ssrc, repair_ssrc, midrid, suppress_nack);
}
}

trait AsSdpMediaLine {
Expand Down Expand Up @@ -1291,18 +1313,13 @@ impl AsSdpMediaLine for Media {
}
}

let count = ssrcs_tx.len();
#[allow(clippy::comparison_chain)]
if count == 1 {
let (ssrc, ssrc_rtx) = &ssrcs_tx[0];
for (ssrc, ssrc_rtx) in ssrcs_tx {
if let Some(ssrc_rtx) = ssrc_rtx {
attrs.push(MediaAttribute::SsrcGroup {
semantics: "FID".to_string(),
ssrcs: vec![*ssrc, *ssrc_rtx],
});
}
} else {
// TODO: handle simulcast
}

MediaLine {
Expand Down Expand Up @@ -1549,7 +1566,10 @@ impl Change {

#[cfg(test)]
mod test {
use sdp::RestrictionId;

use crate::format::Codec;
use crate::media::Simulcast;
use crate::sdp::RtpMap;

use super::*;
Expand Down Expand Up @@ -1595,11 +1615,11 @@ mod test {

let mut rtc = Rtc::new();
let mut changes = rtc.sdp_api();
changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None);
changes.add_media(MediaKind::Audio, Direction::SendOnly, None, None, None);
let (offer, pending) = changes.apply().unwrap();

let mut changes = rtc.sdp_api();
changes.add_media(MediaKind::Video, Direction::SendOnly, None, None);
changes.add_media(MediaKind::Video, Direction::SendOnly, None, None, None);
changes.merge(pending);
let (new_offer, _) = changes.apply().unwrap();

Expand All @@ -1624,7 +1644,7 @@ mod test {
.build();

let mut change1 = rtc1.sdp_api();
change1.add_media(MediaKind::Video, Direction::SendOnly, None, None);
change1.add_media(MediaKind::Video, Direction::SendOnly, None, None, None);
let (offer1, _) = change1.apply().unwrap();

let answer = rtc2.sdp_api().accept_offer(offer1).unwrap();
Expand Down Expand Up @@ -1652,4 +1672,73 @@ mod test {
"VP9 was not offered, so it should not be present in the answer"
);
}

#[test]
fn simulcast_ssrc_allocation() {
crate::init_crypto_default();

let mut rtc1 = Rtc::new();

let mut change = rtc1.sdp_api();
change.add_media(
MediaKind::Video,
Direction::SendOnly,
None,
None,
Some(Simulcast {
send: vec!["m".into(), "h".into(), "l".into()],
recv: vec![],
}),
);

let Change::AddMedia(am) = &change.changes[0] else {
panic!("Not AddMedia?!");
};

// these should be organized in order: m, h, l
let pending_ssrcs = am.ssrcs.clone();
assert_eq!(pending_ssrcs.len(), 3);

for p in &pending_ssrcs {
assert!(p.1.is_some()); // all should have rtx
}

let (offer, _) = change.apply().unwrap();
let sdp = offer.into_inner();
let line = &sdp.media_lines[0];

assert_eq!(
line.simulcast().unwrap().send,
SimulcastGroups(vec![
RestrictionId("m".into(), true),
RestrictionId("h".into(), true),
RestrictionId("l".into(), true),
])
);

// Each SSRC, both regular and RTX get their own a=ssrc line.
assert_eq!(line.ssrc_info().len(), pending_ssrcs.len() * 2);

let fids: Vec<_> = line
.attrs
.iter()
.filter_map(|a| {
if let MediaAttribute::SsrcGroup { semantics, ssrcs } = a {
// We don't have any other semantics right now.
assert_eq!(semantics, "FID");
assert_eq!(ssrcs.len(), 2);
Some((ssrcs[0], ssrcs[1]))
} else {
None
}
})
.collect();

assert_eq!(fids.len(), pending_ssrcs.len());

for (a, b) in fids.iter().zip(pending_ssrcs.iter()) {
assert_eq!(a.0, b.0);
assert_eq!(Some(a.1), b.1);
}
}
}
Loading
Loading