Skip to content

Commit

Permalink
fix packet mapping on svc
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohan Totting committed Apr 18, 2024
1 parent 3f681e1 commit 1670926
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 47 deletions.
3 changes: 3 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func DefaultClientOptions() ClientOptions {
MaxPlayoutDelay: 200,
JitterBufferMinWait: 20 * time.Millisecond,
JitterBufferMaxWait: 150 * time.Millisecond,
ReorderPackets: false,
}
}

Expand Down Expand Up @@ -507,6 +508,8 @@ func NewClient(s *SFU, id string, name string, peerConnectionConfig webrtc.Confi
defer cancel()
<-ctx.Done()
simulcastTrack := track.(*SimulcastTrack)
simulcastTrack.mu.Lock()
defer simulcastTrack.mu.Unlock()
if simulcastTrack.remoteTrackHigh != nil {
client.stats.removeReceiverStats(simulcastTrack.remoteTrackHigh.track.ID() + simulcastTrack.remoteTrackHigh.track.RID())
}
Expand Down
3 changes: 1 addition & 2 deletions clienttrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ func (t *clientTrack) ReceiveBitrate() uint32 {
return 0
}

bitrate, err := t.client.stats.GetReceiverBitrate(t.ID(), t.remoteTrack.track.RID())
bitrate, err := t.client.stats.GetReceiverBitrate(t.remoteTrack.track.ID(), t.remoteTrack.track.RID())
if err != nil {
glog.Error("clienttrack: error on get receiver", err)
return 0
}

Expand Down
27 changes: 13 additions & 14 deletions clienttracksvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,15 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {

vp9Packet := &codecs.VP9Packet{}
if _, err := vp9Packet.Unmarshal(p.Payload); err != nil {
return
}
_ = t.packetmap.Drop(p.SequenceNumber, vp9Packet.PictureID)

// check if svc packet
if vp9Packet.SID > 0 || vp9Packet.TID > 0 {
if !t.remoteTrack.IsAdaptive() {
t.remoteTrack.SetSVC(true)
glog.Info("scalabletrack: remote track is adaptive")
}
return
}

quality := t.getQuality()
if quality == QualityNone {
_ = t.packetmap.Drop(p.SequenceNumber, vp9Packet.PictureID)

glog.Info("scalabletrack: packet ", p.SequenceNumber, " is dropped because of quality none")
return
}
Expand Down Expand Up @@ -182,6 +178,8 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {

if vp9Packet.E && t.tid == targetTID && t.sid == targetSID {
t.setLastQuality(quality)
} else {
t.RequestPLI()
}

if currentTID < vp9Packet.TID || currentSID < vp9Packet.SID {
Expand All @@ -190,6 +188,12 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {
if ok {
return
}

}

// mark packet as a last spatial layer packet
if vp9Packet.E && currentSID == vp9Packet.SID && targetSID <= currentSID {
p.Marker = true
}

ok, newseqno, _ := t.packetmap.Map(p.SequenceNumber, vp9Packet.PictureID)
Expand All @@ -199,11 +203,6 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {

p.SequenceNumber = newseqno

// mark packet as a last spatial layer packet
if vp9Packet.E && currentSID == vp9Packet.SID && targetSID <= currentSID {
p.Marker = true
}

t.send(p)
}

Expand Down Expand Up @@ -254,7 +253,7 @@ func (t *scaleableClientTrack) IsScaleable() bool {
}

func (t *scaleableClientTrack) RequestPLI() {
t.remoteTrack.sendPLI()
go t.remoteTrack.sendPLI()
}

func (t *scaleableClientTrack) getQuality() QualityLevel {
Expand Down
1 change: 1 addition & 0 deletions examples/http-websocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func clientHandler(isDebug bool, conn *websocket.Conn, messageChan chan Request,
// you can also get the client by using r.GetClient(clientID)
opts := sfu.DefaultClientOptions()
opts.EnableVoiceDetection = true
opts.ReorderPackets = false
client, err := r.AddClient(clientID, clientID, opts)
if err != nil {
log.Panic(err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/pacer/leakybucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (p *LeakyBucketPacer) Close() error {
p.qLock.Lock()
defer p.qLock.Unlock()
for ssrc, queue := range p.queues {
queue.mu.RLock()
for e := queue.Front(); e != nil; e = e.Next() {
i, ok := e.Value.(*item)
if !ok {
Expand All @@ -284,6 +285,7 @@ func (p *LeakyBucketPacer) Close() error {

i.packet.Release()
}
queue.mu.RUnlock()

queue.Init()
delete(p.queues, ssrc)
Expand Down
38 changes: 7 additions & 31 deletions remotetrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type remoteTrack struct {
cancel context.CancelFunc
mu sync.RWMutex
track IRemoteTrack
isSVC bool

onRead func(*rtp.Packet)
onPLI func()
Expand Down Expand Up @@ -59,7 +58,7 @@ func newRemoteTrack(ctx context.Context, useBuffer bool, track IRemoteTrack, min
monitor: networkmonitor.Default(),
}

if useBuffer {
if useBuffer && track.Kind() == webrtc.RTPCodecTypeVideo {
rt.packetBuffers = newPacketBuffers(localctx, minWait, maxWait, true)
}

Expand All @@ -71,7 +70,7 @@ func newRemoteTrack(ctx context.Context, useBuffer bool, track IRemoteTrack, min

go rt.readRTP()

if useBuffer && rt.IsAdaptive() {
if useBuffer && track.Kind() == webrtc.RTPCodecTypeVideo {
go rt.loop()
}

Expand All @@ -86,21 +85,6 @@ func (t *remoteTrack) Context() context.Context {
return t.context
}

func (t *remoteTrack) IsAdaptive() bool {
return t.Track().RID() != "" || t.isSVC
}

func (t *remoteTrack) SetSVC(isAdaptive bool) {
t.mu.Lock()
defer t.mu.Unlock()

t.isSVC = isAdaptive

if t.packetBuffers != nil && isAdaptive {
go t.loop()
}
}

func (t *remoteTrack) readRTP() {
defer t.cancel()

Expand Down Expand Up @@ -144,13 +128,11 @@ func (t *remoteTrack) readRTP() {
go t.updateStats()
}

if t.Buffered() && t.Track().Kind() == webrtc.RTPCodecTypeVideo && t.IsAdaptive() {
// // adaptive video needs to be reordered
if t.Buffered() && t.Track().Kind() == webrtc.RTPCodecTypeVideo {
retainablePacket := rtppool.NewPacket(&p.Header, p.Payload)
_ = t.packetBuffers.Add(retainablePacket)

} else {
// audio and non adaptive video doesn't need to be reordered
t.onRead(p)
}

Expand Down Expand Up @@ -205,11 +187,6 @@ func (t *remoteTrack) loop() {
case <-ctx.Done():
return
default:
if !t.IsAdaptive() {
t.Flush()
return
}

t.packetBuffers.WaitAvailablePacket()
t.mu.RLock()
for orderedPkt := t.packetBuffers.Pop(); orderedPkt != nil; orderedPkt = t.packetBuffers.Pop() {
Expand Down Expand Up @@ -282,21 +259,20 @@ func (t *remoteTrack) Track() IRemoteTrack {
}

func (t *remoteTrack) sendPLI() {
t.mu.Lock()
defer t.mu.Unlock()

// return if there is a pending PLI request
maxGapSeconds := 250 * time.Millisecond
t.mu.RLock()
requestGap := time.Since(t.lastPLIRequestTime)
t.mu.RUnlock()

if requestGap < maxGapSeconds {
return // ignore PLI request
}

t.mu.Lock()
t.lastPLIRequestTime = time.Now()
t.mu.Unlock()

t.onPLI()
go t.onPLI()
}

func (t *remoteTrack) enableIntervalPLI(interval time.Duration) {
Expand Down

0 comments on commit 1670926

Please sign in to comment.