diff --git a/client.go b/client.go index 721dfa5..e2609f9 100644 --- a/client.go +++ b/client.go @@ -385,9 +385,9 @@ func NewClient(s *SFU, id string, name string, peerConnectionConfig webrtc.Confi simulcast.AddRemoteTrack(remoteTrack, client.statsGetter, onStatsUpdated) } - // // only process track when the highest quality is available + // // only process track when the lowest quality is available // simulcast.mu.Lock() - // isHighAvailable := simulcast.remoteTrackHigh != nil + // isLowAvailable := simulcast.remoteTrackLow != nil // simulcast.mu.Unlock() if !track.IsProcessed() { diff --git a/clienttrack.go b/clienttrack.go index 07abb75..5e7744f 100644 --- a/clienttrack.go +++ b/clienttrack.go @@ -10,7 +10,7 @@ import ( ) type iClientTrack interface { - push(rtp *rtp.Packet, quality QualityLevel) + push(rtp rtp.Packet, quality QualityLevel) ID() string Kind() webrtc.RTPCodecType LocalTrack() *webrtc.TrackLocalStaticRTP @@ -50,7 +50,7 @@ func (t *clientTrack) Kind() webrtc.RTPCodecType { return t.remoteTrack.track.Kind() } -func (t *clientTrack) push(rtp *rtp.Packet, quality QualityLevel) { +func (t *clientTrack) push(rtp rtp.Packet, quality QualityLevel) { if t.client.peerConnection.PC().ConnectionState() != webrtc.PeerConnectionStateConnected { return } @@ -59,7 +59,7 @@ func (t *clientTrack) push(rtp *rtp.Packet, quality QualityLevel) { // do something here with audio level } - if err := t.localTrack.WriteRTP(rtp); err != nil { + if err := t.localTrack.WriteRTP(&rtp); err != nil { glog.Error("clienttrack: error on write rtp", err) } } diff --git a/docs/extension.md b/docs/extension.md new file mode 100644 index 0000000..f27c6cc --- /dev/null +++ b/docs/extension.md @@ -0,0 +1,5 @@ +# Developing an extension package +An extension can add more features to the SFU without need to modify the SFU code. + +## How it works +The extension can be develop by utilizing the events from SFU components. Each component has its own events. \ No newline at end of file diff --git a/remotetrack.go b/remotetrack.go index 54f6d7e..846d3c2 100644 --- a/remotetrack.go +++ b/remotetrack.go @@ -18,7 +18,7 @@ type remoteTrack struct { cancel context.CancelFunc mu sync.Mutex track IRemoteTrack - onRead func(*rtp.Packet) + onRead func(rtp.Packet) onPLI func() error bitrate *atomic.Uint32 previousBytesReceived *atomic.Uint64 @@ -30,7 +30,7 @@ type remoteTrack struct { onStatsUpdated func(*stats.Stats) } -func newRemoteTrack(ctx context.Context, track IRemoteTrack, pliInterval time.Duration, onPLI func() error, statsGetter stats.Getter, onStatsUpdated func(*stats.Stats), onRead func(*rtp.Packet)) *remoteTrack { +func newRemoteTrack(ctx context.Context, track IRemoteTrack, pliInterval time.Duration, onPLI func() error, statsGetter stats.Getter, onStatsUpdated func(*stats.Stats), onRead func(rtp.Packet)) *remoteTrack { localctx, cancel := context.WithCancel(ctx) rt := &remoteTrack{ context: localctx, @@ -88,7 +88,7 @@ func (t *remoteTrack) readRTP() { return } - t.onRead(rtp) + t.onRead(*rtp) go t.updateStats() } diff --git a/scalableclienttrack.go b/scalableclienttrack.go index 6a958b3..d754da8 100644 --- a/scalableclienttrack.go +++ b/scalableclienttrack.go @@ -106,11 +106,11 @@ func (t *scaleableClientTrack) Client() *Client { return t.client } -func (t *scaleableClientTrack) writeRTP(p *rtp.Packet) { +func (t *scaleableClientTrack) writeRTP(p rtp.Packet) { t.lastTimestamp = p.Timestamp t.sequenceNumber = p.SequenceNumber - if err := t.localTrack.WriteRTP(p); err != nil { + if err := t.localTrack.WriteRTP(&p); err != nil { glog.Error("track: error on write rtp", err) } } @@ -136,7 +136,7 @@ func (t *scaleableClientTrack) isKeyframe(vp9 *codecs.VP9Packet) bool { // this where the temporal and spatial layers are will be decided to be sent to the client or not // compare it with the claimed quality to decide if the packet should be sent or not -func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) { +func (t *scaleableClientTrack) push(p rtp.Packet, _ QualityLevel) { var qualityPreset IQualityPreset vp9Packet := &codecs.VP9Packet{} @@ -211,7 +211,7 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) { t.send(p) } -func (t *scaleableClientTrack) send(p *rtp.Packet) { +func (t *scaleableClientTrack) send(p rtp.Packet) { p.SequenceNumber = p.SequenceNumber - t.dropCounter t.writeRTP(p) } diff --git a/simulcastclienttrack.go b/simulcastclienttrack.go index 84e4cbd..ac4ed94 100644 --- a/simulcastclienttrack.go +++ b/simulcastclienttrack.go @@ -34,32 +34,32 @@ func (t *simulcastClientTrack) Client() *Client { return t.client } -func (t *simulcastClientTrack) isFirstKeyframePacket(p *rtp.Packet) bool { +func (t *simulcastClientTrack) isFirstKeyframePacket(p rtp.Packet) bool { isKeyframe := IsKeyframe(t.mimeType, p) return isKeyframe && t.lastTimestamp.Load() != p.Timestamp } -func (t *simulcastClientTrack) send(p *rtp.Packet, quality QualityLevel, lastQuality QualityLevel) { +func (t *simulcastClientTrack) send(p rtp.Packet, quality QualityLevel, lastQuality QualityLevel) { t.lastTimestamp.Store(p.Timestamp) if lastQuality != quality { t.lastQuality.Store(uint32(quality)) } - t.rewritePacket(p, quality) + p = t.rewritePacket(p, quality) t.writeRTP(p) } -func (t *simulcastClientTrack) writeRTP(p *rtp.Packet) { - if err := t.localTrack.WriteRTP(p); err != nil { +func (t *simulcastClientTrack) writeRTP(p rtp.Packet) { + if err := t.localTrack.WriteRTP(&p); err != nil { glog.Error("track: error on write rtp", err) } } -func (t *simulcastClientTrack) push(p *rtp.Packet, quality QualityLevel) { +func (t *simulcastClientTrack) push(p rtp.Packet, quality QualityLevel) { var trackQuality QualityLevel lastQuality := t.LastQuality() @@ -245,7 +245,7 @@ func (t *simulcastClientTrack) IsScaleable() bool { return false } -func (t *simulcastClientTrack) rewritePacket(p *rtp.Packet, quality QualityLevel) { +func (t *simulcastClientTrack) rewritePacket(p rtp.Packet, quality QualityLevel) rtp.Packet { t.remoteTrack.mu.Lock() defer t.remoteTrack.mu.Unlock() // make sure the timestamp and sequence number is consistent from the previous packet even it is not the same track @@ -265,6 +265,8 @@ func (t *simulcastClientTrack) rewritePacket(p *rtp.Packet, quality QualityLevel t.sequenceNumber.Add(uint32(sequenceDelta)) p.SequenceNumber = uint16(t.sequenceNumber.Load()) + + return p } func (t *simulcastClientTrack) RequestPLI() { diff --git a/track.go b/track.go index 4dd002c..044ca30 100644 --- a/track.go +++ b/track.go @@ -90,7 +90,7 @@ func newTrack(ctx context.Context, clientID string, trackRemote IRemoteTrack, pl onEndedCallbacks: make([]func(), 0), } - onRead := func(p *rtp.Packet) { + onRead := func(p rtp.Packet) { // do tracks := t.base.clientTracks.GetTracks() for _, track := range tracks { @@ -278,13 +278,12 @@ func (t *Track) OnRead(callback func(rtp.Packet, QualityLevel)) { t.onReadCallbacks = append(t.onReadCallbacks, callback) } -func (t *Track) onRead(p *rtp.Packet, quality QualityLevel) { +func (t *Track) onRead(p rtp.Packet, quality QualityLevel) { // t.mu.Lock() // defer t.mu.Unlock() for _, callback := range t.onReadCallbacks { - pClone := *p - callback(pClone, quality) + callback(p, quality) } } @@ -427,7 +426,7 @@ func (t *SimulcastTrack) AddRemoteTrack(track IRemoteTrack, stats stats.Getter, quality := RIDToQuality(track.RID()) - onRead := func(p *rtp.Packet) { + onRead := func(p rtp.Packet) { // set the base timestamp for the track if it is not set yet if t.baseTS == 0 { t.baseTS = p.Timestamp @@ -466,18 +465,18 @@ func (t *SimulcastTrack) AddRemoteTrack(track IRemoteTrack, stats stats.Getter, t.onRead(p, quality) } - t.mu.Lock() - remoteTrack = newRemoteTrack(t.context, track, t.pliInterval, t.onPLI, stats, onStatsUpdated, onRead) switch quality { case QualityHigh: + t.mu.Lock() t.remoteTrackHigh = remoteTrack + t.mu.Unlock() remoteTrack.OnEnded(func() { t.mu.Lock() - defer t.mu.Unlock() t.remoteTrackHigh = nil + t.mu.Unlock() if t.remoteTrackHigh == nil && t.remoteTrackMid == nil && t.remoteTrackLow == nil { t.onEnded() @@ -485,12 +484,14 @@ func (t *SimulcastTrack) AddRemoteTrack(track IRemoteTrack, stats stats.Getter, }) case QualityMid: + t.mu.Lock() t.remoteTrackMid = remoteTrack + t.mu.Unlock() remoteTrack.OnEnded(func() { t.mu.Lock() - defer t.mu.Unlock() t.remoteTrackMid = nil + t.mu.Unlock() if t.remoteTrackHigh == nil && t.remoteTrackMid == nil && t.remoteTrackLow == nil { t.onEnded() @@ -498,11 +499,14 @@ func (t *SimulcastTrack) AddRemoteTrack(track IRemoteTrack, stats stats.Getter, }) case QualityLow: + t.mu.Lock() t.remoteTrackLow = remoteTrack + t.mu.Unlock() + remoteTrack.OnEnded(func() { t.mu.Lock() - defer t.mu.Unlock() t.remoteTrackLow = nil + t.mu.Unlock() if t.remoteTrackHigh == nil && t.remoteTrackMid == nil && t.remoteTrackLow == nil { t.onEnded() @@ -518,8 +522,6 @@ func (t *SimulcastTrack) AddRemoteTrack(track IRemoteTrack, stats stats.Getter, t.onTrackComplete() } - t.mu.Unlock() - t.onRemoteTrackAddedCallbacks(remoteTrack) return remoteTrack @@ -751,12 +753,9 @@ func (t *SimulcastTrack) OnRead(callback func(rtp.Packet, QualityLevel)) { t.onReadCallbacks = append(t.onReadCallbacks, callback) } -func (t *SimulcastTrack) onRead(p *rtp.Packet, quality QualityLevel) { - t.mu.Lock() - defer t.mu.Unlock() - +func (t *SimulcastTrack) onRead(p rtp.Packet, quality QualityLevel) { for _, callback := range t.onReadCallbacks { - callback(*p, quality) + callback(p, quality) } } diff --git a/util.go b/util.go index d54b41d..6be3e20 100644 --- a/util.go +++ b/util.go @@ -107,7 +107,7 @@ func RegisterSimulcastHeaderExtensions(m *webrtc.MediaEngine, codecType webrtc.R } } -func IsKeyframe(codec string, packet *rtp.Packet) bool { +func IsKeyframe(codec string, packet rtp.Packet) bool { isIt1, isIt2 := Keyframe(codec, packet) return isIt1 && isIt2 } @@ -119,7 +119,7 @@ func IsKeyframe(codec string, packet *rtp.Packet) bool { // It returns (true, true) if that is the case, (false, true) if that is // definitely not the case, and (false, false) if the information cannot // be determined. -func Keyframe(codec string, packet *rtp.Packet) (bool, bool) { +func Keyframe(codec string, packet rtp.Packet) (bool, bool) { if strings.EqualFold(codec, "video/vp8") { var vp8 codecs.VP8Packet _, err := vp8.Unmarshal(packet.Payload)