diff --git a/Cargo.lock b/Cargo.lock index 7fb808d4..ddb1f411 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -247,6 +247,26 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "baton" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07d1e996140e7f9b33421c076a4ec5de8eb9b81e8b6810a3eadf67b7e7a8d85c" +dependencies = [ + "baton-derive", + "tokio", +] + +[[package]] +name = "baton-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0a78a981083cc2a1436ec52b4e9ecbf8536a2e6e6470a3a8257376971a6d8fb" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "bindgen" version = "0.69.5" @@ -1487,6 +1507,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "moq-async" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", + "tracing", + "wasm-bindgen-futures", +] + [[package]] name = "moq-clock" version = "0.5.19" @@ -1527,6 +1557,7 @@ dependencies = [ "derive_more", "hex", "lazy_static", + "moq-async", "moq-native", "moq-transfork", "mp4-atom", @@ -1538,6 +1569,7 @@ dependencies = [ "tokio", "tracing", "url", + "web-time", ] [[package]] @@ -1587,6 +1619,7 @@ version = "0.8.2" dependencies = [ "bytes", "futures", + "moq-async", "num_enum", "thiserror 2.0.11", "tokio", @@ -1599,10 +1632,12 @@ dependencies = [ name = "moq-web" version = "0.3.11" dependencies = [ + "baton", "console_error_panic_hook", "gloo-net", "hex", "js-sys", + "moq-async", "moq-karp", "thiserror 2.0.11", "tokio", @@ -1612,14 +1647,15 @@ dependencies = [ "wasm-bindgen-futures", "wasm-tracing", "web-codecs", + "web-streams", "web-sys", ] [[package]] name = "mp4-atom" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "952d37eb0f265505e2b4888d1a58bf3ddea0047f623c06f656d03bcc3184caab" +checksum = "4e6ca41187fd83535ec10045e7bd4e6ee508b5eb0488c3bce66bca9182d26a96" dependencies = [ "bytes", "num", @@ -3007,9 +3043,9 @@ dependencies = [ [[package]] name = "web-codecs" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96ac804a22cc9f7df65cf7048b23b5ef2dc74ade47b783ea2026088939820c6a" +checksum = "294dfa949f7d28cba1b2b59c581e46f4e87f45fec52d3ea021e2cbd1acc8d983" dependencies = [ "bytes", "derive_more", @@ -3021,6 +3057,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "web-streams" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64b009237ac7dbc6e1cc55bb5f0e511f6895b086899cb42ad3645d524c9aea1a" +dependencies = [ + "thiserror 1.0.69", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.77" diff --git a/Cargo.toml b/Cargo.toml index 845a6c73..441680dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "moq-karp", "moq-gst", "moq-web", + "moq-async", ] default-members = [ "moq-transfork", @@ -15,6 +16,7 @@ default-members = [ "moq-native", "moq-karp", "moq-web", + "moq-async", # "moq-gst", # Requires gstreamer is installed; annoying ] resolver = "2" diff --git a/moq-async/Cargo.toml b/moq-async/Cargo.toml new file mode 100644 index 00000000..14b6c9e9 --- /dev/null +++ b/moq-async/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "moq-async" +description = "Media over QUIC - Async helpers and utilities" +authors = ["Luke Curley"] +repository = "https://github.com/kixelated/moq-rs" +license = "MIT OR Apache-2.0" + +version = "0.1.0" +edition = "2021" + +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] + +[dependencies] +tracing = "0.1" +tokio = { version = "1.41", features = ["rt"] } +futures = "0.3" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen-futures = "0.4" diff --git a/moq-async/src/close.rs b/moq-async/src/close.rs new file mode 100644 index 00000000..4129854f --- /dev/null +++ b/moq-async/src/close.rs @@ -0,0 +1,21 @@ +use std::error::Error; + +pub trait Close { + fn close(&mut self, err: E); +} + +pub trait OrClose, V, E: Error + Clone> { + fn or_close(self, stream: &mut S) -> Result; +} + +impl, V, E: Error + Clone> OrClose for Result { + fn or_close(self, stream: &mut S) -> Result { + match self { + Ok(v) => Ok(v), + Err(err) => { + stream.close(err.clone()); + Err(err) + } + } + } +} diff --git a/moq-transfork/src/util/futures.rs b/moq-async/src/futures.rs similarity index 60% rename from moq-transfork/src/util/futures.rs rename to moq-async/src/futures.rs index a9c59f1d..ef7c3bef 100644 --- a/moq-transfork/src/util/futures.rs +++ b/moq-async/src/futures.rs @@ -9,6 +9,13 @@ pub trait FuturesExt: Future { { Transpose { future: self } } + + fn cloned(self) -> Cloned + where + Self: Sized, + { + Cloned { future: self } + } } impl FuturesExt for F {} @@ -35,3 +42,25 @@ where } } } + +pub struct Cloned { + future: F, +} + +impl Future for Cloned +where + F: Future, + T: Clone, +{ + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Frankly I have no idea if this is correct; I hate Pin + let future = unsafe { self.map_unchecked_mut(|s| &mut s.future) }; + + match future.poll(cx) { + Poll::Ready(val) => Poll::Ready(val.clone()), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/moq-transfork/src/util/mod.rs b/moq-async/src/lib.rs similarity index 100% rename from moq-transfork/src/util/mod.rs rename to moq-async/src/lib.rs diff --git a/moq-transfork/src/util/lock.rs b/moq-async/src/lock.rs similarity index 100% rename from moq-transfork/src/util/lock.rs rename to moq-async/src/lock.rs diff --git a/moq-transfork/src/util/spawn.rs b/moq-async/src/spawn.rs similarity index 100% rename from moq-transfork/src/util/spawn.rs rename to moq-async/src/spawn.rs diff --git a/moq-gst/src/sink/imp.rs b/moq-gst/src/sink/imp.rs index 064250eb..6fc4e732 100644 --- a/moq-gst/src/sink/imp.rs +++ b/moq-gst/src/sink/imp.rs @@ -171,6 +171,7 @@ impl MoqSink { .path_segments() .expect("missing path") .collect::(); + let broadcast = moq_karp::BroadcastProducer::new(session, path).unwrap(); let media = moq_karp::cmaf::Import::new(broadcast); diff --git a/moq-karp/Cargo.toml b/moq-karp/Cargo.toml index e1117ab8..9777a34d 100644 --- a/moq-karp/Cargo.toml +++ b/moq-karp/Cargo.toml @@ -15,12 +15,13 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] moq-transfork = { path = "../moq-transfork", version = "0.8" } +moq-async = { path = "../moq-async", version = "0.1" } url = "2" bytes = "1.9" hex = "0.4" -mp4-atom = { version = "0.3", features = ["tokio", "bytes"] } +mp4-atom = { version = "0.4", features = ["tokio", "bytes"] } serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -33,6 +34,8 @@ regex = "1" tokio = { version = "1.43", features = ["macros"] } +web-time = "1" + # CLI only dependencies moq-native = { path = "../moq-native", version = "0.6", optional = true } clap = { version = "4", features = ["derive"], optional = true } diff --git a/moq-karp/src/audio/mod.rs b/moq-karp/src/audio/mod.rs index 95d5cf4b..49384869 100644 --- a/moq-karp/src/audio/mod.rs +++ b/moq-karp/src/audio/mod.rs @@ -20,9 +20,8 @@ pub struct Audio { #[serde_as(as = "DisplayFromStr")] pub codec: AudioCodec, - pub sample_rate: u16, - pub channel_count: u16, + pub sample_rate: u32, + pub channel_count: u32, - #[serde(skip_serializing_if = "Option::is_none")] - pub bitrate: Option, + pub bitrate: Option, } diff --git a/moq-karp/src/broadcast.rs b/moq-karp/src/broadcast.rs index d7d942de..798503b1 100644 --- a/moq-karp/src/broadcast.rs +++ b/moq-karp/src/broadcast.rs @@ -1,32 +1,36 @@ use crate::{Audio, Catalog, Error, Result, Track, TrackConsumer, TrackProducer, Video}; +use moq_async::{spawn, Lock}; use moq_transfork::{Announced, AnnouncedConsumer, Path, Session}; use derive_more::Debug; -#[derive(Debug)] +#[derive(Debug, Clone)] #[debug("{:?}", path)] pub struct BroadcastProducer { pub session: Session, pub path: Path, - id: u128, + id: u64, + catalog: Lock, +} - catalog: Catalog, - catalog_producer: moq_transfork::TrackProducer, // need to hold the track to keep it open +struct CatalogProducer { + current: Catalog, + track: moq_transfork::TrackProducer, } impl BroadcastProducer { pub fn new(mut session: Session, path: Path) -> Result { // Generate a "unique" ID for this broadcast session. // If we crash, then the viewers will automatically reconnect to the new ID. - let id = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) + let id = web_time::SystemTime::now() + .duration_since(web_time::SystemTime::UNIX_EPOCH) .unwrap() - .as_millis(); + .as_millis() as u64; let full = path.clone().push(id); - let track = moq_transfork::Track { + let catalog = moq_transfork::Track { path: full, priority: -1, group_order: moq_transfork::GroupOrder::Desc, @@ -34,18 +38,23 @@ impl BroadcastProducer { } .produce(); - session.publish(track.1)?; + // Publish the catalog, even though it's currently empty. + session.publish(catalog.1)?; + + let catalog = Lock::new(CatalogProducer { + current: Catalog::default(), + track: catalog.0, + }); Ok(Self { session, path, id, - catalog: Catalog::default(), - catalog_producer: track.0, + catalog, }) } - pub fn video(&mut self, info: Video) -> Result { + pub fn publish_video(&mut self, info: Video) -> Result { let path = self.path.clone().push(self.id).push(&info.track.name); let (producer, consumer) = moq_transfork::Track { @@ -58,13 +67,28 @@ impl BroadcastProducer { .produce(); self.session.publish(consumer)?; - self.catalog.video.push(info); - self.update()?; - Ok(TrackProducer::new(producer)) + let mut catalog = self.catalog.lock(); + catalog.current.video.push(info.clone()); + catalog.update()?; + + let producer = TrackProducer::new(producer); + let consumer = producer.subscribe(); + + // Start a task that will remove the catalog on drop. + let catalog = self.catalog.clone(); + spawn(async move { + consumer.closed().await.ok(); + + let mut catalog = catalog.lock(); + catalog.current.video.retain(|v| v.track != info.track); + catalog.update().unwrap(); + }); + + Ok(producer) } - pub fn audio(&mut self, info: Audio) -> Result { + pub fn publish_audio(&mut self, info: Audio) -> Result { let path = self.path.clone().push(self.id).push(&info.track.name); let (producer, consumer) = moq_transfork::Track { @@ -77,20 +101,33 @@ impl BroadcastProducer { .produce(); self.session.publish(consumer)?; - self.catalog.audio.push(info); - self.update()?; - Ok(TrackProducer::new(producer)) - } + let mut catalog = self.catalog.lock(); + catalog.current.audio.push(info.clone()); + catalog.update()?; + + let producer = TrackProducer::new(producer); + let consumer = producer.subscribe(); + + // Start a task that will remove the catalog on drop. + let catalog = self.catalog.clone(); + spawn(async move { + consumer.closed().await.ok(); + + let mut catalog = catalog.lock(); + catalog.current.audio.retain(|v| v.track != info.track); + catalog.update().unwrap(); + }); - pub fn catalog(&self) -> &Catalog { - &self.catalog + Ok(producer) } +} +impl CatalogProducer { fn update(&mut self) -> Result<()> { - let frame = self.catalog.to_string()?; + let frame = self.current.to_string()?; - let mut group = self.catalog_producer.append_group(); + let mut group = self.track.append_group(); group.write_frame(frame); Ok(()) @@ -110,6 +147,9 @@ pub struct BroadcastConsumer { // The ID of the current broadcast current: Option, + // True if we should None because the broadcast has ended. + ended: bool, + catalog_track: Option, catalog_group: Option, } @@ -123,17 +163,21 @@ impl BroadcastConsumer { path, announced, current: None, + ended: false, catalog_track: None, catalog_group: None, } } - /// Returns the latest catalog, or None if the broadcast has ended. - // TODO Make a new interface instead of returning the catalog directly. - // Otherwise, consumers won't realize that the underlying tracks are completely different. - // ex. "video" on the old session !== "video" on the new session + /// Returns the latest catalog, or None if the channel is offline. pub async fn catalog(&mut self) -> Result> { loop { + if self.ended { + // Avoid returning None again. + self.ended = false; + return Ok(None); + } + tokio::select! { biased; // Wait for new announcements. @@ -144,7 +188,9 @@ impl BroadcastConsumer { Announced::Ended(suffix) => self.unload(suffix), Announced::Live => { // Return None if we're caught up to live with no broadcast. - if self.current.is_none() { return Ok(None) } + if self.current.is_none() { + return Ok(None) + } }, } }, @@ -157,7 +203,7 @@ impl BroadcastConsumer { self.catalog_group.take(); // We don't support deltas yet return Ok(Some(catalog)); }, - else => return Ok(None), + else => return Err(self.session.closed().await.into()), } } } @@ -202,6 +248,7 @@ impl BroadcastConsumer { self.current = None; self.catalog_track = None; self.catalog_group = None; + self.ended = true; } } diff --git a/moq-karp/src/cmaf/import.rs b/moq-karp/src/cmaf/import.rs index bec54f27..188ae176 100644 --- a/moq-karp/src/cmaf/import.rs +++ b/moq-karp/src/cmaf/import.rs @@ -4,9 +4,7 @@ use std::collections::HashMap; use tokio::io::{AsyncRead, AsyncReadExt}; use super::{Error, Result}; -use crate::{ - Audio, BroadcastProducer, Catalog, Dimensions, Frame, Timestamp, Track, TrackProducer, Video, AAC, H264, VP9, -}; +use crate::{Audio, BroadcastProducer, Dimensions, Frame, Timestamp, Track, TrackProducer, Video, AAC, H264, VP9}; /// Converts fMP4 -> Karp pub struct Import { @@ -85,11 +83,11 @@ impl Import { let track = match handler.as_ref() { b"vide" => { let track = Self::init_video(trak)?; - self.broadcast.video(track)? + self.broadcast.publish_video(track)? } b"soun" => { let track = Self::init_audio(trak)?; - self.broadcast.audio(track)? + self.broadcast.publish_audio(track)? } b"sbtl" => return Err(Error::UnsupportedTrack("subtitle")), _ => return Err(Error::UnsupportedTrack("unknown")), @@ -116,8 +114,8 @@ impl Import { Video { track: Track { name, priority: 2 }, resolution: Dimensions { - width: avc1.width, - height: avc1.height, + width: avc1.width as _, + height: avc1.height as _, }, codec: H264 { profile: avcc.avc_profile_indication, @@ -125,7 +123,7 @@ impl Import { level: avcc.avc_level_indication, } .into(), - description: description.freeze(), + description: Some(description.freeze()), bitrate: None, } } else if let Some(hev1) = &stsd.hev1 { @@ -169,8 +167,8 @@ impl Import { .into(), description: Default::default(), resolution: Dimensions { - width: vp09.width, - height: vp09.height, + width: vp09.width as _, + height: vp09.height as _, }, bitrate: None, } @@ -205,9 +203,9 @@ impl Import { profile: desc.dec_specific.profile, } .into(), - sample_rate: mp4a.samplerate.integer(), - channel_count: mp4a.channelcount, - bitrate: Some(std::cmp::max(desc.avg_bitrate, desc.max_bitrate)), + sample_rate: mp4a.samplerate.integer() as _, + channel_count: mp4a.channelcount as _, + bitrate: Some(std::cmp::max(desc.avg_bitrate, desc.max_bitrate) as _), } } else { return Err(Error::UnsupportedCodec("unknown")); @@ -400,8 +398,4 @@ impl Import { Ok(()) } - - pub fn catalog(&self) -> &Catalog { - self.broadcast.catalog() - } } diff --git a/moq-karp/src/error.rs b/moq-karp/src/error.rs index 47597461..c9965970 100644 --- a/moq-karp/src/error.rs +++ b/moq-karp/src/error.rs @@ -1,4 +1,6 @@ -#[derive(Debug, thiserror::Error)] +use std::sync::Arc; + +#[derive(Debug, thiserror::Error, Clone)] pub enum Error { #[error("transfork error: {0}")] Transfork(#[from] moq_transfork::Error), @@ -7,7 +9,7 @@ pub enum Error { Decode(#[from] moq_transfork::coding::DecodeError), #[error("json error: {0}")] - Json(#[from] serde_json::Error), + Json(Arc), #[error("duplicate track")] DuplicateTrack, @@ -35,3 +37,10 @@ pub enum Error { } pub type Result = std::result::Result; + +// Wrap in an Arc so it is Clone +impl From for Error { + fn from(err: serde_json::Error) -> Self { + Error::Json(Arc::new(err)) + } +} diff --git a/moq-karp/src/frame.rs b/moq-karp/src/frame.rs index a718a09c..bef1f61e 100644 --- a/moq-karp/src/frame.rs +++ b/moq-karp/src/frame.rs @@ -2,9 +2,7 @@ use std::fmt; use moq_transfork::coding::*; -use derive_more::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Rem, RemAssign, Sub, SubAssign, Sum}; - -use derive_more::Debug; +use derive_more::{Add, AddAssign, Debug, Div, DivAssign, Mul, MulAssign, Rem, RemAssign, Sub, SubAssign, Sum}; #[derive(Clone, Debug)] pub struct Frame { @@ -15,6 +13,7 @@ pub struct Frame { pub payload: Bytes, } +// TODO combine this with web_codecs::Timestamp #[derive( Clone, Copy, diff --git a/moq-karp/src/main.rs b/moq-karp/src/main.rs index b7545529..91a1a283 100644 --- a/moq-karp/src/main.rs +++ b/moq-karp/src/main.rs @@ -77,7 +77,7 @@ async fn publish(config: Config, url: String) -> anyhow::Result<()> { let mut import = cmaf::Import::new(broadcast); import.init_from(&mut input).await.context("failed to initialize")?; - tracing::info!(catalog = ?import.catalog(), "publishing"); + tracing::info!("publishing"); tokio::select! { res = import.read_from(&mut input) => Ok(res?), diff --git a/moq-karp/src/timestamp.rs b/moq-karp/src/timestamp.rs deleted file mode 100644 index 8b137891..00000000 --- a/moq-karp/src/timestamp.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/moq-karp/src/track.rs b/moq-karp/src/track.rs index 9bf24e81..088ecc28 100644 --- a/moq-karp/src/track.rs +++ b/moq-karp/src/track.rs @@ -46,6 +46,10 @@ impl TrackProducer { self.group.replace(group); } + + pub fn subscribe(&self) -> TrackConsumer { + TrackConsumer::new(self.track.subscribe()) + } } #[derive(Debug)] @@ -113,4 +117,8 @@ impl TrackConsumer { Ok(frame) } + + pub async fn closed(&self) -> Result<(), Error> { + self.track.closed().await.map_err(Into::into) + } } diff --git a/moq-karp/src/video/dimensions.rs b/moq-karp/src/video/dimensions.rs index 0a72867f..4afb3c96 100644 --- a/moq-karp/src/video/dimensions.rs +++ b/moq-karp/src/video/dimensions.rs @@ -2,6 +2,6 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct Dimensions { - pub width: u16, - pub height: u16, + pub width: u32, + pub height: u32, } diff --git a/moq-karp/src/video/mod.rs b/moq-karp/src/video/mod.rs index 67ea456a..0778bd47 100644 --- a/moq-karp/src/video/mod.rs +++ b/moq-karp/src/video/mod.rs @@ -32,13 +32,13 @@ pub struct Video { // Some codecs unfortunately aren't self-describing // One of the best examples is H264, which needs the sps/pps out of band to function. - #[serde(default, skip_serializing_if = "Bytes::is_empty")] - #[serde_as(as = "Hex")] - pub description: Bytes, + #[serde(default)] + #[serde_as(as = "Option")] + pub description: Option, // The encoded width/height of the media pub resolution: Dimensions, #[serde(default)] - pub bitrate: Option, + pub bitrate: Option, } diff --git a/moq-native/src/quic.rs b/moq-native/src/quic.rs index 08cca3d2..6a5fa636 100644 --- a/moq-native/src/quic.rs +++ b/moq-native/src/quic.rs @@ -51,8 +51,8 @@ impl Endpoint { // Enable BBR congestion control // TODO validate the implementation let mut transport = quinn::TransportConfig::default(); - transport.max_idle_timeout(Some(time::Duration::from_secs(20).try_into().unwrap())); - transport.keep_alive_interval(Some(time::Duration::from_secs(6))); // TODO make this smarter + transport.max_idle_timeout(Some(time::Duration::from_secs(9).try_into().unwrap())); + transport.keep_alive_interval(Some(time::Duration::from_secs(4))); // TODO make this smarter transport.congestion_controller_factory(Arc::new(quinn::congestion::BbrConfig::default())); transport.mtu_discovery_config(None); // Disable MTU discovery let transport = Arc::new(transport); diff --git a/moq-relay/src/cluster.rs b/moq-relay/src/cluster.rs index 0f185c8e..ce8b213a 100644 --- a/moq-relay/src/cluster.rs +++ b/moq-relay/src/cluster.rs @@ -138,7 +138,6 @@ impl Cluster { // Extract the hostname from the first part of the path. let host = path.first().context("missing node")?.to_string(); if Some(&host) == node.as_ref() { - tracing::warn!(?host, "skipping self"); // Skip ourselves. continue; } diff --git a/moq-transfork/Cargo.toml b/moq-transfork/Cargo.toml index 8a98e3bc..8fab311b 100644 --- a/moq-transfork/Cargo.toml +++ b/moq-transfork/Cargo.toml @@ -28,5 +28,7 @@ futures = "0.3" num_enum = "0.7" +moq-async = { path = "../moq-async" } + [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/moq-transfork/src/lib.rs b/moq-transfork/src/lib.rs index eb4e41d1..5ed2d2b0 100644 --- a/moq-transfork/src/lib.rs +++ b/moq-transfork/src/lib.rs @@ -38,8 +38,6 @@ mod session; pub mod coding; pub mod message; -pub(crate) mod util; - pub use error::*; pub use model::*; pub use session::*; diff --git a/moq-transfork/src/session/mod.rs b/moq-transfork/src/session/mod.rs index 89ee8d85..83a06dd5 100644 --- a/moq-transfork/src/session/mod.rs +++ b/moq-transfork/src/session/mod.rs @@ -1,8 +1,6 @@ -use crate::{ - message, - util::{spawn, Close, OrClose}, - AnnouncedConsumer, Error, Path, RouterConsumer, Track, TrackConsumer, -}; +use crate::{message, AnnouncedConsumer, Error, Path, RouterConsumer, Track, TrackConsumer}; + +use moq_async::{spawn, Close, OrClose}; mod publisher; mod reader; diff --git a/moq-transfork/src/session/publisher.rs b/moq-transfork/src/session/publisher.rs index cdba9c24..24975134 100644 --- a/moq-transfork/src/session/publisher.rs +++ b/moq-transfork/src/session/publisher.rs @@ -5,10 +5,11 @@ use futures::{stream::FuturesUnordered, StreamExt}; use crate::{ message, model::{GroupConsumer, Track, TrackConsumer}, - util::{spawn, FuturesExt, Lock, OrClose}, Announced, AnnouncedConsumer, AnnouncedProducer, Error, Path, RouterConsumer, }; +use moq_async::{spawn, FuturesExt, Lock, OrClose}; + use super::{Stream, Writer}; #[derive(Clone)] @@ -119,7 +120,7 @@ impl Publisher { self.serve_subscribe(stream, subscribe).await } - #[tracing::instrument("subscribed", skip_all, err, fields(track = ?subscribe.path, id = subscribe.id))] + #[tracing::instrument("publishing", skip_all, err, fields(track = ?subscribe.path, id = subscribe.id))] async fn serve_subscribe(&mut self, stream: &mut Stream, subscribe: message::Subscribe) -> Result<(), Error> { let track = Track { path: subscribe.path, @@ -137,7 +138,7 @@ impl Publisher { track_priority: track.priority, }; - tracing::info!(?info); + tracing::info!(?info, "active"); stream.writer.encode(&info).await?; diff --git a/moq-transfork/src/session/reader.rs b/moq-transfork/src/session/reader.rs index d85a2da0..919e6b28 100644 --- a/moq-transfork/src/session/reader.rs +++ b/moq-transfork/src/session/reader.rs @@ -86,7 +86,7 @@ impl Reader { */ } -impl Close for Reader { +impl Close for Reader { fn close(&mut self, err: Error) { self.stream.stop(err.to_code()); } diff --git a/moq-transfork/src/session/stream.rs b/moq-transfork/src/session/stream.rs index 8c8273ea..f06a5d4c 100644 --- a/moq-transfork/src/session/stream.rs +++ b/moq-transfork/src/session/stream.rs @@ -27,7 +27,7 @@ impl Stream { } } -impl Close for Stream { +impl Close for Stream { fn close(&mut self, err: Error) { self.writer.close(err.clone()); self.reader.close(err); diff --git a/moq-transfork/src/session/subscriber.rs b/moq-transfork/src/session/subscriber.rs index 3020e6c2..409ca772 100644 --- a/moq-transfork/src/session/subscriber.rs +++ b/moq-transfork/src/session/subscriber.rs @@ -6,10 +6,11 @@ use std::{ use crate::{ message, model::{Track, TrackConsumer}, - util::{spawn, Lock, OrClose}, AnnouncedProducer, Error, Path, TrackProducer, }; +use moq_async::{spawn, Lock, OrClose}; + use super::{AnnouncedConsumer, Reader, Stream}; #[derive(Clone)] @@ -158,9 +159,9 @@ impl Subscriber { stream.writer.encode(&request).await?; // TODO use the response to correctly populate the track info - let _response: message::Info = stream.reader.decode().await?; + let info: message::Info = stream.reader.decode().await?; - tracing::info!("subscribed"); + tracing::info!(?info, "active"); loop { tokio::select! { diff --git a/moq-transfork/src/session/writer.rs b/moq-transfork/src/session/writer.rs index a7a6fd0f..7c05b953 100644 --- a/moq-transfork/src/session/writer.rs +++ b/moq-transfork/src/session/writer.rs @@ -1,8 +1,8 @@ use std::fmt; -use crate::util::Close; -use crate::Error; -use crate::{coding::*, message}; +use crate::{coding::*, message, Error}; + +use moq_async::Close; pub(super) struct Writer { stream: web_transport::SendStream, @@ -43,7 +43,7 @@ impl Writer { } } -impl Close for Writer { +impl Close for Writer { fn close(&mut self, err: Error) { self.stream.reset(err.to_code()); } diff --git a/moq-transfork/src/util/close.rs b/moq-transfork/src/util/close.rs deleted file mode 100644 index f5429b41..00000000 --- a/moq-transfork/src/util/close.rs +++ /dev/null @@ -1,21 +0,0 @@ -use crate::Error; - -pub trait Close { - fn close(&mut self, err: Error); -} - -pub trait OrClose { - fn or_close(self, stream: &mut S) -> Result; -} - -impl OrClose for Result { - fn or_close(self, stream: &mut S) -> Result { - match self { - Ok(v) => Ok(v), - Err(err) => { - stream.close(err.clone()); - Err(err) - } - } - } -} diff --git a/moq-web/Cargo.toml b/moq-web/Cargo.toml index 2edde945..e8aa2b1d 100644 --- a/moq-web/Cargo.toml +++ b/moq-web/Cargo.toml @@ -20,32 +20,35 @@ wasm-tracing = "0.2" gloo-net = "0.6" moq-karp = { version = "0.11", path = "../moq-karp", default-features = false } -web-codecs = "0.3.2" +moq-async = { version = "0.1", path = "../moq-async" } +web-streams = "0.1.1" +web-codecs = "0.3.3" tokio = { version = "1", features = ["sync"] } js-sys = "0.3.77" url = "2" thiserror = "2" hex = "0.4" +baton = "0.2" [dependencies.web-sys] version = "0.3.77" features = [ + # DOM "Window", "Document", "HtmlElement", "Node", "Text", + "HtmlVideoElement", - # WebCodecs - "VideoDecoder", - "VideoDecoderInit", - "VideoDecoderConfig", - "VideoFrame", - "VideoColorSpaceInit", - "EncodedVideoChunk", - "EncodedVideoChunkInit", - "EncodedVideoChunkType", + # Custom elements + "HtmlSlotElement", + "AssignedNodesOptions", + "CustomEvent", + "CustomEventInit", + "Event", + "EventTarget", # Canvas stuff "CanvasRenderingContext2d", @@ -54,4 +57,12 @@ features = [ "OffscreenCanvas", "DedicatedWorkerGlobalScope", "OffscreenCanvasRenderingContext2d", + + # Capture + "MediaStream", + "MediaStreamTrack", + "MediaTrackSettings", + "MediaStreamTrackProcessor", + "MediaStreamTrackProcessorInit", + "ReadableStreamDefaultReader", ] diff --git a/moq-web/src/bridge.ts b/moq-web/src/bridge.ts deleted file mode 100644 index b443abbb..00000000 --- a/moq-web/src/bridge.ts +++ /dev/null @@ -1,20 +0,0 @@ -import * as Comlink from "comlink"; -import type { Api } from "./worker"; - -// Create a new worker instance -// We wait until the worker is fully initialized before we return the proxy. -function init(): Promise> { - return new Promise((resolve) => { - const worker = new Worker(new URL("./worker", import.meta.url), { type: "module" }); - worker.addEventListener( - "message", - (event) => { - const proxy: Comlink.Remote = Comlink.wrap(worker); - resolve(proxy); - }, - { once: true }, - ); - }); -} - -export { init }; diff --git a/moq-web/src/decoder.rs b/moq-web/src/decoder.rs deleted file mode 100644 index 3bab9c35..00000000 --- a/moq-web/src/decoder.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::{Result, Run}; - -pub struct Decoder { - track: moq_karp::TrackConsumer, - decoder: web_codecs::VideoDecoder, -} - -impl Decoder { - pub fn new(track: moq_karp::TrackConsumer, decoder: web_codecs::VideoDecoder) -> Self { - Self { track, decoder } - } -} - -impl Run for Decoder { - async fn run(&mut self) -> Result<()> { - while let Some(frame) = self.track.read().await? { - let frame = web_codecs::EncodedFrame { - payload: frame.payload, - timestamp: frame.timestamp.as_micros() as _, - keyframe: frame.keyframe, - }; - self.decoder.decode(frame)?; - } - - Ok(()) - } -} diff --git a/moq-web/src/demo/index.html b/moq-web/src/demo/index.html index 7d5d0bd6..1fdee368 100644 --- a/moq-web/src/demo/index.html +++ b/moq-web/src/demo/index.html @@ -5,10 +5,20 @@ MoQ Demo + - - + + + + - + \ No newline at end of file diff --git a/moq-web/src/demo/index.ts b/moq-web/src/demo/index.ts index 64138ab6..57550c10 100644 --- a/moq-web/src/demo/index.ts +++ b/moq-web/src/demo/index.ts @@ -1 +1,3 @@ -export { MoqVideoElement } from ".."; +export { MoqPublishElement } from "../publish/element"; +export { MoqWatchElement } from "../watch/element"; +export { MoqVideoElement } from "../video"; diff --git a/moq-web/src/element/index.ts b/moq-web/src/element/index.ts deleted file mode 100644 index 0e56991b..00000000 --- a/moq-web/src/element/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -import MoqVideoElement from "./video"; -export { MoqVideoElement }; diff --git a/moq-web/src/error.rs b/moq-web/src/error.rs index dcf86247..303065b1 100644 --- a/moq-web/src/error.rs +++ b/moq-web/src/error.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use moq_karp::moq_transfork; use moq_transfork::web_transport; use wasm_bindgen::JsValue; -#[derive(Debug, thiserror::Error)] +#[derive(Clone, Debug, thiserror::Error)] pub enum Error { #[error("transfork error: {0}")] Transfork(#[from] moq_transfork::Error), @@ -13,6 +15,9 @@ pub enum Error { #[error("webcodecs error: {0}")] WebCodecs(#[from] web_codecs::Error), + #[error("streams error: {0}")] + Streams(#[from] web_streams::Error), + #[error("karp error: {0}")] Karp(#[from] moq_karp::Error), @@ -25,8 +30,17 @@ pub enum Error { #[error("offline")] Offline, + #[error("unsupported")] + Unsupported, + + #[error("closed")] + Closed, + + #[error("capture failed")] + InitFailed, + #[error("http error: {0}")] - Http(#[from] gloo_net::Error), + Http(Arc), } pub type Result = std::result::Result; @@ -36,3 +50,9 @@ impl From for JsValue { JsValue::from_str(&format!("{}", err)) } } + +impl From for Error { + fn from(err: gloo_net::Error) -> Self { + Error::Http(Arc::new(err)) + } +} diff --git a/moq-web/src/index.ts b/moq-web/src/index.ts index b86238b4..d5b9fae2 100644 --- a/moq-web/src/index.ts +++ b/moq-web/src/index.ts @@ -1,9 +1,2 @@ // Export the library -export { Watch } from "./watch"; - -// NOTE: You can also use the custom elements -// They are in a separate path since they require side-effects to function fully. -// import "@kixelated/moq/video" - -// We still export them here for convenience -export { MoqVideoElement } from "./element"; +export * from "../../pkg"; diff --git a/moq-web/src/lib.rs b/moq-web/src/lib.rs index c518b9dc..a2a76837 100644 --- a/moq-web/src/lib.rs +++ b/moq-web/src/lib.rs @@ -1,18 +1,14 @@ -mod decoder; mod error; -//mod publish; -mod renderer; -mod run; +mod publish; mod session; mod watch; pub use error::*; -//pub use publish::*; -pub use watch::*; +pub use publish::*; -use decoder::*; -use renderer::*; -use run::*; +pub(crate) use session::*; + +pub use watch::Watch; use wasm_bindgen::prelude::*; diff --git a/moq-web/src/publish/audio.rs b/moq-web/src/publish/audio.rs new file mode 100644 index 00000000..55d0aff5 --- /dev/null +++ b/moq-web/src/publish/audio.rs @@ -0,0 +1 @@ +pub struct Audio {} diff --git a/moq-web/src/publish/backend.rs b/moq-web/src/publish/backend.rs new file mode 100644 index 00000000..c62dfb16 --- /dev/null +++ b/moq-web/src/publish/backend.rs @@ -0,0 +1,106 @@ +use moq_karp::{moq_transfork::Path, BroadcastProducer, TrackProducer}; +use wasm_bindgen::JsCast; + +use super::{ControlsRecv, PublishState, StatusSend, Video}; +use crate::{Connect, Error, Result}; + +pub struct Backend { + controls: ControlsRecv, + status: StatusSend, + + connect: Option, + path: Path, + broadcast: Option, + + video: Option