Skip to content

Commit

Permalink
Use MidRid in many more places
Browse files Browse the repository at this point in the history
  • Loading branch information
algesten committed Dec 28, 2024
1 parent d5b6d70 commit 96b7c97
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 169 deletions.
15 changes: 11 additions & 4 deletions src/change/direct.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::channel::ChannelId;
use crate::crypto::Fingerprint;
use crate::media::{Media, MediaKind};
use crate::rtp_::MidRid;
use crate::rtp_::{Mid, Rid, Ssrc};
use crate::sctp::ChannelConfig;
use crate::streams::{StreamRx, StreamTx, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP};
Expand Down Expand Up @@ -187,10 +188,12 @@ impl<'a> DirectApi<'a> {
// By default we do not suppress nacks, this has to be called explicitly by the user of direct API.
let suppress_nack = false;

let midrid = MidRid(mid, rid);

self.rtc
.session
.streams
.expect_stream_rx(ssrc, rtx, mid, rid, suppress_nack)
.expect_stream_rx(ssrc, rtx, midrid, suppress_nack)
}

/// Remove the receive stream for the given SSRC.
Expand All @@ -211,7 +214,8 @@ impl<'a> DirectApi<'a> {

/// Obtain a recv stream by looking it up via mid/rid.
pub fn stream_rx_by_mid(&mut self, mid: Mid, rid: Option<Rid>) -> Option<&mut StreamRx> {
self.rtc.session.streams.stream_rx_by_mid_rid(mid, rid)
let midrid = MidRid(mid, rid);
self.rtc.session.streams.stream_rx_by_midrid(midrid)
}

/// Declare the intention to send data using the given SSRC.
Expand All @@ -234,11 +238,13 @@ impl<'a> DirectApi<'a> {

let is_audio = media.kind().is_audio();

let midrid = MidRid(mid, rid);

let stream = self
.rtc
.session
.streams
.declare_stream_tx(ssrc, rtx, mid, rid);
.declare_stream_tx(ssrc, rtx, midrid);

let size = if is_audio {
self.rtc.session.send_buffer_audio
Expand Down Expand Up @@ -267,6 +273,7 @@ impl<'a> DirectApi<'a> {

/// Obtain a send stream by looking it up via mid/rid.
pub fn stream_tx_by_mid(&mut self, mid: Mid, rid: Option<Rid>) -> Option<&mut StreamTx> {
self.rtc.session.streams.stream_tx_by_mid_rid(mid, rid)
let midrid = MidRid(mid, rid);
self.rtc.session.streams.stream_tx_by_midrid(midrid)
}
}
21 changes: 10 additions & 11 deletions src/change/sdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::format::PayloadParams;
use crate::io::Id;
use crate::media::Media;
use crate::packet::MediaKind;
use crate::rtp_::MidRid;
use crate::rtp_::Rid;
use crate::rtp_::{Direction, Extension, ExtensionMap, Mid, Pt, Ssrc};
use crate::sctp::ChannelConfig;
Expand Down Expand Up @@ -839,11 +840,10 @@ fn ensure_stream_tx(session: &mut Session) {
.any(|p| p.resend().is_some());

for rid in rids {
let midrid = MidRid(media.mid(), rid);

// If we already have the stream, we don't make any new one.
let has_stream = session
.streams
.stream_tx_by_mid_rid(media.mid(), rid)
.is_some();
let has_stream = session.streams.stream_tx_by_midrid(midrid).is_some();

if has_stream {
continue;
Expand All @@ -857,9 +857,7 @@ fn ensure_stream_tx(session: &mut Session) {
(ssrc, None)
};

let stream = session
.streams
.declare_stream_tx(ssrc, rtx, media.mid(), rid);
let stream = session.streams.declare_stream_tx(ssrc, rtx, midrid);

// Configure cache size
let size = if media.kind().is_audio() {
Expand Down Expand Up @@ -899,9 +897,9 @@ fn add_pending_changes(session: &mut Session, pending: Changes) {

for (ssrc, rtx) in add_media.ssrcs {
// TODO: When we allow sending RID, we need to add that here.
let stream = session
.streams
.declare_stream_tx(ssrc, rtx, add_media.mid, None);
let midrid = MidRid(add_media.mid, None);

let stream = session.streams.declare_stream_tx(ssrc, rtx, midrid);

let size = if media.kind().is_audio() {
session.send_buffer_audio
Expand Down Expand Up @@ -1127,8 +1125,9 @@ fn update_media(
.map(|r| r.ssrc);

// If remote communicated a main a=ssrc, but no RTX, we will not send nacks.
let midrid = MidRid(media.mid(), None);
let suppress_nack = repair_ssrc.is_none();
streams.expect_stream_rx(i.ssrc, repair_ssrc, media.mid(), None, suppress_nack);
streams.expect_stream_rx(i.ssrc, repair_ssrc, midrid, suppress_nack);
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/media/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::format::CodecConfig;
use crate::io::{Id, DATAGRAM_MTU};
use crate::packet::{DepacketizingBuffer, Payloader, RtpMeta};
use crate::rtp_::ExtensionMap;
use crate::rtp_::MidRid;
use crate::rtp_::SRTP_BLOCK_SIZE;
use crate::rtp_::SRTP_OVERHEAD;
use crate::RtcError;
Expand Down Expand Up @@ -390,7 +391,9 @@ impl Media {

let is_audio = self.kind.is_audio();

let stream = streams.stream_tx_by_mid_rid(self.mid, *rid);
let midrid = MidRid(self.mid, *rid);

let stream = streams.stream_tx_by_midrid(midrid);

let Some(stream) = stream else {
return Err(RtcError::NoSenderSource);
Expand Down
5 changes: 4 additions & 1 deletion src/media/writer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Instant;

use crate::format::PayloadParams;
use crate::rtp_::MidRid;
use crate::rtp_::VideoOrientation;
use crate::session::Session;
use crate::RtcError;
Expand Down Expand Up @@ -204,10 +205,12 @@ impl<'a> Writer<'a> {
return Err(RtcError::NotReceivingDirection);
}

let midrid = MidRid(self.mid, rid);

let stream = self
.session
.streams
.stream_rx_by_mid_rid(self.mid, rid)
.stream_rx_by_midrid(midrid)
.ok_or_else(|| RtcError::NoReceiverSource(rid))?;

stream.request_keyframe(kind);
Expand Down
76 changes: 34 additions & 42 deletions src/packet/pacer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::time::{Duration, Instant};

use crate::rtp_::MidRid;
use crate::rtp_::{Bitrate, DataSize, Mid, Rid};
use crate::rtp_::{Bitrate, DataSize};
use crate::util::already_happened;
use crate::util::not_happening;
use crate::util::Soonest;
Expand Down Expand Up @@ -169,8 +169,7 @@ impl Default for QueueSnapshot {
/// The pacer manages packets across several upstream queues.
#[derive(Debug, Clone, Copy)]
pub struct QueueState {
pub mid: Mid,
pub rid: Option<Rid>,
pub midrid: MidRid,
pub unpaced: bool,
pub use_for_padding: bool,
pub snapshot: QueueSnapshot,
Expand All @@ -179,10 +178,8 @@ pub struct QueueState {
/// A request to generate a specific amount of padding.
#[derive(Debug, Clone, Copy)]
pub struct PaddingRequest {
/// The Mid that should generate and queue the padding.
pub mid: Mid,
/// The Rid that might be used for this stream.
pub rid: Option<Rid>,
/// The Mid/Rid that should generate and queue the padding.
pub midrid: MidRid,
/// The amount of padding in bytes to generate.
pub padding: usize,
}
Expand Down Expand Up @@ -229,10 +226,9 @@ impl Pacer for NullPacer {
.iter()
.filter(|q| q.snapshot.packet_count > 0);
// Pick a queue using round robin, prioritize the least recently sent on queue.
let to_send_on =
non_empty_queues.min_by_key(|q| self.last_sends.get(&MidRid::new(q.mid, q.rid)));
let to_send_on = non_empty_queues.min_by_key(|q| self.last_sends.get(&q.midrid));

let result = to_send_on.map(|q| MidRid::new(q.mid, q.rid));
let result = to_send_on.map(|q| q.midrid);

if result.is_some() {
self.need_immediate_timeout = true;
Expand Down Expand Up @@ -310,7 +306,7 @@ impl Pacer for LeakyBucketPacer {
self.maybe_update_adjusted_bitrate(now);

if let Some(request) = self.maybe_create_padding_request() {
self.next_poll_queue = Some(MidRid::new(request.mid, request.rid));
self.next_poll_queue = Some(request.midrid);
return Some(request);
}

Expand All @@ -337,7 +333,7 @@ impl Pacer for LeakyBucketPacer {
self.next_poll_queue = None;
self.next_poll_time = Some(next_poll_time);
} else {
self.next_poll_queue = queue.map(|q| MidRid::new(q.mid, q.rid));
self.next_poll_queue = queue.map(|q| q.midrid);
self.next_poll_time = Some(next_poll_time);
}

Expand Down Expand Up @@ -565,8 +561,7 @@ impl LeakyBucketPacer {
let padding = (self.padding_bitrate * PADDING_BURST_INTERVAL).as_bytes_usize();

Some(PaddingRequest {
mid: queue.mid,
rid: queue.rid,
midrid: queue.midrid,
padding,
})
}
Expand Down Expand Up @@ -602,7 +597,7 @@ impl QueueSnapshot {
mod test {
use std::time::{Duration, Instant};

use crate::rtp_::{DataSize, RtpHeader};
use crate::rtp_::{DataSize, Mid, RtpHeader};

use super::*;

Expand Down Expand Up @@ -993,8 +988,7 @@ mod test {
let now = Instant::now();

let mut state = QueueState {
mid: Mid::from("001"),
rid: None,
midrid: MidRid(Mid::from("001"), None),
unpaced: false,
use_for_padding: true,
snapshot: QueueSnapshot {
Expand All @@ -1009,8 +1003,7 @@ mod test {
};

let other = QueueState {
mid: Mid::from("002"),
rid: None,
midrid: MidRid(Mid::from("002"), None),
unpaced: false,
use_for_padding: false,
snapshot: QueueSnapshot {
Expand All @@ -1026,7 +1019,7 @@ mod test {

state.snapshot.merge(&other.snapshot);

assert_eq!(state.mid, Mid::from("001"));
assert_eq!(state.midrid.mid(), Mid::from("001"));
assert_eq!(state.snapshot.size, 40_usize);
assert_eq!(state.snapshot.packet_count, 1337);
assert_eq!(state.snapshot.total_queue_time_origin, duration_ms(1337));
Expand Down Expand Up @@ -1182,19 +1175,16 @@ mod test {
}

let timeout = {
if let Some(mid_rid) = pacer.poll_queue() {
let packet = queue.next_packet().unwrap_or_else(|| {
panic!(
"Should have a packet for mid {} rid {:?}",
mid_rid.mid, mid_rid.rid
)
});
queue.register_send(mid_rid, base + elapsed);
if let Some(midrid) = pacer.poll_queue() {
let packet = queue
.next_packet()
.unwrap_or_else(|| panic!("Should have a packet for {:?}", midrid));
queue.register_send(midrid, base + elapsed);
queue.update_average_queue_time(base + elapsed);
pacer.register_send(
base + elapsed,
DataSize::bytes(packet.payload_len as u64),
mid_rid,
midrid,
);
if packet.kind == PacketKind::Padding {
padding_sent += packet.payload_len.into();
Expand Down Expand Up @@ -1324,15 +1314,18 @@ mod test {
.into_iter()
}

pub(super) fn register_send(&mut self, mid_rid: MidRid, now: Instant) {
if self.video_queue.mid_rid == mid_rid {
pub(super) fn register_send(&mut self, midrid: MidRid, now: Instant) {
if self.video_queue.midrid == midrid {
self.video_queue.last_emitted = Some(now);
} else if self.audio_queue.mid_rid == mid_rid {
} else if self.audio_queue.midrid == midrid {
self.audio_queue.last_emitted = Some(now);
} else if self.padding_queue.mid_rid == mid_rid {
} else if self.padding_queue.midrid == midrid {
self.padding_queue.last_emitted = Some(now);
} else {
panic!("Attempted to register send on unknown queue with id {mid_rid:?}");
panic!(
"Attempted to register send on unknown queue with id {:?}",
midrid
);
}
}

Expand Down Expand Up @@ -1367,17 +1360,17 @@ mod test {
fn default() -> Self {
Self {
audio_queue: Inner::new(
MidRid::new(Mid::from("001"), None),
MidRid(Mid::from("001"), None),
true,
QueuePriority::Media,
),
video_queue: Inner::new(
MidRid::new(Mid::from("002"), None),
MidRid(Mid::from("002"), None),
false,
QueuePriority::Media,
),
padding_queue: Inner::new(
MidRid::new(Mid::from("003"), None),
MidRid(Mid::from("003"), None),
false,
QueuePriority::Padding,
),
Expand All @@ -1392,7 +1385,7 @@ mod test {
}

struct Inner {
mid_rid: MidRid,
midrid: MidRid,
last_emitted: Option<Instant>,
queue: VecDeque<QueuedPacket>,
packet_count: u32,
Expand All @@ -1403,9 +1396,9 @@ mod test {
}

impl Inner {
fn new(mid_rid: MidRid, is_audio: bool, priority: QueuePriority) -> Self {
fn new(midrid: MidRid, is_audio: bool, priority: QueuePriority) -> Self {
Self {
mid_rid,
midrid,
last_emitted: None,
queue: VecDeque::default(),
packet_count: 0,
Expand Down Expand Up @@ -1453,8 +1446,7 @@ mod test {

fn queue_state(&self, now: Instant) -> QueueState {
QueueState {
mid: self.mid_rid.mid,
rid: self.mid_rid.rid,
midrid: self.midrid,
unpaced: self.is_audio,
use_for_padding: !self.is_audio && self.last_emitted.is_some(),
snapshot: QueueSnapshot {
Expand Down
2 changes: 1 addition & 1 deletion src/sdp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use thiserror::Error;
mod data;
pub(crate) use data::{FormatParam, Sdp, Session, SessionAttribute, Setup};
pub(crate) use data::{MediaAttribute, MediaLine, MediaType, Msid, Proto};
pub(crate) use data::{Simulcast, SimulcastGroups, RestrictionId};
pub(crate) use data::{RestrictionId, Simulcast, SimulcastGroups};
pub(crate) use parser::parse_candidate;

#[cfg(test)]
Expand Down
Loading

0 comments on commit 96b7c97

Please sign in to comment.