From 55e2aad4bb0dd3fd1b9b2a1d50f325876ae08525 Mon Sep 17 00:00:00 2001 From: Yohan Totting Date: Thu, 23 Nov 2023 15:39:16 +0700 Subject: [PATCH] add red support (#11) Co-authored-by: Yohan Totting --- client.go | 10 ++ clienttrackred.go | 151 ++++++++++++++++++ ...tclienttrack.go => clienttracksimulcast.go | 0 scalableclienttrack.go => clienttracksvc.go | 0 codec.go | 4 + examples/http-websocket/index.html | 43 ++++- examples/http-websocket/main.go | 1 - room.go | 2 +- sfu.go | 2 +- track.go | 47 +++++- 10 files changed, 251 insertions(+), 9 deletions(-) create mode 100644 clienttrackred.go rename simulcastclienttrack.go => clienttracksimulcast.go (100%) rename scalableclienttrack.go => clienttracksvc.go (100%) diff --git a/client.go b/client.go index 7b61b5a..b97da1c 100644 --- a/client.go +++ b/client.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "regexp" "sync" "sync/atomic" "time" @@ -119,6 +120,7 @@ type Client struct { publishedTracks *trackList pendingRemoteRenegotiation *atomic.Bool queue *queue + receiveRED bool state *atomic.Value sfu *SFU onConnectionStateChangedCallbacks []func(webrtc.PeerConnectionState) @@ -518,6 +520,14 @@ func (c *Client) negotiateQueuOp(offer webrtc.SessionDescription) (*webrtc.Sessi currentTransceiverCount := len(c.peerConnection.PC().GetTransceivers()) + if !c.receiveRED { + match, err := regexp.MatchString(`a=rtpmap:63`, offer.SDP) + if err != nil { + glog.Error("client: error on check RED support in SDP ", err) + } else { + c.receiveRED = match + } + } // Set the remote SessionDescription err := c.peerConnection.PC().SetRemoteDescription(offer) if err != nil { diff --git a/clienttrackred.go b/clienttrackred.go new file mode 100644 index 0000000..b10d9f5 --- /dev/null +++ b/clienttrackred.go @@ -0,0 +1,151 @@ +package sfu + +import ( + "context" + "encoding/binary" + "errors" + "sync" + + "github.com/golang/glog" + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" +) + +var ( + ErrIncompleteRedHeader = errors.New("util: incomplete RED header") + ErrIncompleteRedBlock = errors.New("util: incomplete RED block") +) + +type clientTrackRed struct { + id string + context context.Context + cancel context.CancelFunc + mu sync.RWMutex + client *Client + kind webrtc.RTPCodecType + mimeType string + localTrack *webrtc.TrackLocalStaticRTP + remoteTrack *remoteTrack + isReceiveRed bool +} + +func (t *clientTrackRed) ID() string { + return t.id +} + +func (t *clientTrackRed) Context() context.Context { + return t.context +} + +func (t *clientTrackRed) Client() *Client { + return t.client +} + +func (t *clientTrackRed) Kind() webrtc.RTPCodecType { + return t.remoteTrack.track.Kind() +} + +func (t *clientTrackRed) push(rtp rtp.Packet, quality QualityLevel) { + if t.client.peerConnection.PC().ConnectionState() != webrtc.PeerConnectionStateConnected { + return + } + + if !t.isReceiveRed { + rtp = t.getPrimaryEncoding(rtp) + } + + if err := t.localTrack.WriteRTP(&rtp); err != nil { + glog.Error("clienttrack: error on write rtp", err) + } +} + +func (t *clientTrackRed) LocalTrack() *webrtc.TrackLocalStaticRTP { + return t.localTrack +} + +func (t *clientTrackRed) IsScreen() bool { + return false +} + +func (t *clientTrackRed) SetSourceType(_ TrackType) { + // do nothing +} + +func (t *clientTrackRed) IsSimulcast() bool { + return false +} + +func (t *clientTrackRed) IsScaleable() bool { + return false +} + +func (t *clientTrackRed) RequestPLI() { + if err := t.remoteTrack.sendPLI(); err != nil { + glog.Error("clienttrack: error on send pli", err) + } +} + +func (t *clientTrackRed) SetMaxQuality(_ QualityLevel) { + // do nothing +} + +func (t *clientTrackRed) MaxQuality() QualityLevel { + return QualityHigh +} + +func (t *clientTrackRed) getPrimaryEncoding(rtp rtp.Packet) rtp.Packet { + payload, err := extractPrimaryEncodingForRED(rtp.Payload) + if err != nil { + glog.Error("clienttrack: error on extract primary encoding for red", err) + return rtp + } + + rtp.Payload = payload + // set to opus payload type + rtp.PayloadType = 111 + return rtp +} + +// // Credit to Livekit +// // https://github.com/livekit/livekit/blob/56dd39968408f0973374e5b336a28606a1da79d2/pkg/sfu/redprimaryreceiver.go#L267 +func extractPrimaryEncodingForRED(payload []byte) ([]byte, error) { + + /* RED payload https://datatracker.ietf.org/doc/html/rfc2198#section-3 + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |F| block PT | timestamp offset | block length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + F: 1 bit First bit in header indicates whether another header block + follows. If 1 further header blocks follow, if 0 this is the + last header block. + */ + + var blockLength int + for { + if len(payload) < 1 { + // illegal data, need at least one byte for primary encoding + return nil, ErrIncompleteRedHeader + } + + if payload[0]&0x80 == 0 { + // last block is primary encoding data + payload = payload[1:] + break + } else { + if len(payload) < 4 { + // illegal data + return nil, ErrIncompleteRedHeader + } + + blockLength += int(binary.BigEndian.Uint16(payload[2:]) & 0x03FF) + payload = payload[4:] + } + } + + if len(payload) < blockLength { + return nil, ErrIncompleteRedBlock + } + + return payload[blockLength:], nil +} diff --git a/simulcastclienttrack.go b/clienttracksimulcast.go similarity index 100% rename from simulcastclienttrack.go rename to clienttracksimulcast.go diff --git a/scalableclienttrack.go b/clienttracksvc.go similarity index 100% rename from scalableclienttrack.go rename to clienttracksvc.go diff --git a/codec.go b/codec.go index 08bce01..d60cdcb 100644 --- a/codec.go +++ b/codec.go @@ -102,6 +102,10 @@ var ( } audioCodecs = []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: webrtc.RTPCodecCapability{"audio/red", 48000, 2, "111/111", nil}, + PayloadType: 63, + }, { RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil}, PayloadType: 111, diff --git a/examples/http-websocket/index.html b/examples/http-websocket/index.html index 89d910e..37ab4a5 100644 --- a/examples/http-websocket/index.html +++ b/examples/http-websocket/index.html @@ -127,6 +127,8 @@ let videoObserver = null + let red = true + peerConnection.ondatachannel = function(e) { console.log("ondatachannel"); if (e.channel.label == "internal") { @@ -152,6 +154,11 @@ if (urlParams.has('debug')){ debug = true } + + if (urlParams.has('disablered')){ + red = false + } + ws = new WebSocket(`ws://${window.location.host}/ws?${debug?'debug=1':''}`); const promise = new Promise((resolve, reject) => { ws.onopen = function() { @@ -297,6 +304,7 @@ e.receiver.playoutDelayHint = 0.1; e.streams.forEach((stream) => { console.log("ontrack"); + console.log(e.track.getCapabilities()); let container = document.getElementById("container-"+stream.id); if (!container) { @@ -352,14 +360,39 @@ // peerConnection.addTrack(stream.getVideoTracks()[0], stream); - peerConnection.addTransceiver(stream.getAudioTracks()[0], { + const audioTcvr= peerConnection.addTransceiver(stream.getAudioTracks()[0], { direction: 'sendonly', streams: [stream], sendEncodings: [{priority: 'high'}], }) + + + if(audioTcvr.setCodecPreferences != undefined){ + const audioCodecs = RTCRtpReceiver.getCapabilities('audio').codecs; + + let audioCodecsPref = []; + if (red){ + for(let i = 0; i < audioCodecs.length; i++){ + // audio/red 48000 111/111 + if(audioCodecs[i].mimeType == "audio/red"){ + audioCodecsPref.push(audioCodecs[i]); + } + } + } + + for(let i = 0; i < audioCodecs.length; i++){ + if(audioCodecs[i].mimeType == "audio/opus"){ + audioCodecsPref.push(audioCodecs[i]); + } + } + + audioTcvr.setCodecPreferences(audioCodecsPref); + } + + if (codec ==='vp9'){ - const tcvr = peerConnection.addTransceiver(stream.getVideoTracks()[0], { + const videoTcvr = peerConnection.addTransceiver(stream.getVideoTracks()[0], { direction: 'sendonly', streams: [stream], sendEncodings: [ @@ -371,7 +404,7 @@ ] }) - let codecs = RTCRtpReceiver.getCapabilities('video').codecs; + const codecs = RTCRtpReceiver.getCapabilities('video').codecs; let vp9_codecs = []; // iterate over supported codecs and pull out the codecs we want for(let i = 0; i < codecs.length; i++){ @@ -388,8 +421,8 @@ } // currently not all browsers support setCodecPreferences - if(tcvr.setCodecPreferences != undefined){ - tcvr.setCodecPreferences(vp9_codecs); + if(videoTcvr.setCodecPreferences != undefined){ + videoTcvr.setCodecPreferences(vp9_codecs); } } else { peerConnection.addTransceiver(stream.getVideoTracks()[0], { diff --git a/examples/http-websocket/main.go b/examples/http-websocket/main.go index 2711179..48b9ba7 100644 --- a/examples/http-websocket/main.go +++ b/examples/http-websocket/main.go @@ -90,7 +90,6 @@ func main() { roomsOpts := sfu.DefaultRoomOptions() roomsOpts.Bitrates.InitialBandwidth = 1_000_000 roomsOpts.PLIInterval = 3 * time.Second - roomsOpts.Codecs = []string{webrtc.MimeTypeVP9, webrtc.MimeTypeH264, webrtc.MimeTypeOpus} defaultRoom, _ := roomManager.NewRoom(roomID, roomName, sfu.RoomTypeLocal, roomsOpts) fakeClientCount := 0 diff --git a/room.go b/room.go index b793e91..6d15e80 100644 --- a/room.go +++ b/room.go @@ -88,7 +88,7 @@ func DefaultRoomOptions() RoomOptions { return RoomOptions{ Bitrates: DefaultBitrates(), QualityPreset: DefaultQualityPreset(), - Codecs: []string{webrtc.MimeTypeVP9, webrtc.MimeTypeH264, webrtc.MimeTypeOpus}, + Codecs: []string{webrtc.MimeTypeVP9, webrtc.MimeTypeH264, "audio/red", webrtc.MimeTypeOpus}, ClientTimeout: 10 * time.Minute, PLIInterval: 5 * time.Second, } diff --git a/sfu.go b/sfu.go index eaf5434..e839469 100644 --- a/sfu.go +++ b/sfu.go @@ -26,7 +26,7 @@ type BitratesConfig struct { func DefaultBitrates() BitratesConfig { return BitratesConfig{ - AudioRed: 48_000 * 3, + AudioRed: 48_000 * 1.5, Audio: 48_000, Video: 1_200_000, VideoHigh: 1_200_000, diff --git a/track.go b/track.go index 21eb3bd..e4d605d 100644 --- a/track.go +++ b/track.go @@ -58,6 +58,7 @@ type ITrack interface { TotalTracks() int Context() context.Context Relay(func(webrtc.SSRC, rtp.Packet)) + PayloadType() webrtc.PayloadType } type Track struct { @@ -127,6 +128,18 @@ func (t *Track) createLocalTrack() *webrtc.TrackLocalStaticRTP { return track } +func (t *Track) createOpusLocalTrack() *webrtc.TrackLocalStaticRTP { + c := t.remoteTrack.track.Codec().RTPCodecCapability + c.MimeType = webrtc.MimeTypeOpus + c.SDPFmtpLine = "minptime=10;useinbandfec=1" + track, newTrackErr := webrtc.NewTrackLocalStaticRTP(c, t.base.id, t.base.streamid) + if newTrackErr != nil { + panic(newTrackErr) + } + + return track +} + func (t *Track) ID() string { return t.base.id } @@ -205,12 +218,35 @@ func (t *Track) subscribe(c *Client) iClientTrack { client: c, localTrack: t.createLocalTrack(), remoteTrack: t, - isScreen: false, + isScreen: t.IsScreen(), onTrackEndedCallbacks: make([]func(), 0), qualityPreset: c.SFU().QualityPreset(), maxQuality: QualityHigh, lastQuality: QualityHigh, } + } else if t.Kind() == webrtc.RTPCodecTypeAudio && t.PayloadType() == 63 { + glog.Info("track: red enabled", c.receiveRED) + + mimeType := t.remoteTrack.track.Codec().MimeType + localTrack := t.createLocalTrack() + + if !c.receiveRED { + mimeType = webrtc.MimeTypeOpus + localTrack = t.createOpusLocalTrack() + } + + ct = &clientTrackRed{ + id: t.base.id, + context: ctx, + cancel: cancel, + mu: sync.RWMutex{}, + client: c, + kind: t.base.kind, + mimeType: mimeType, + localTrack: localTrack, + remoteTrack: t.remoteTrack, + isReceiveRed: c.receiveRED, + } } else { isScreen := &atomic.Bool{} isScreen.Store(t.IsScreen()) @@ -227,6 +263,7 @@ func (t *Track) subscribe(c *Client) iClientTrack { remoteTrack: t.remoteTrack, isScreen: isScreen, } + } if t.Kind() == webrtc.RTPCodecTypeAudio && c.IsVADEnabled() { @@ -281,6 +318,10 @@ func (t *Track) Relay(f func(webrtc.SSRC, rtp.Packet)) { }) } +func (t *Track) PayloadType() webrtc.PayloadType { + return t.base.codec.PayloadType +} + type SimulcastTrack struct { context context.Context mu sync.Mutex @@ -776,6 +817,10 @@ func (t *SimulcastTrack) Relay(f func(webrtc.SSRC, rtp.Packet)) { }) } +func (t *SimulcastTrack) PayloadType() webrtc.PayloadType { + return t.base.codec.PayloadType +} + type SubscribeTrackRequest struct { ClientID string `json:"client_id"` StreamID string `json:"stream_id"`