Skip to content

Commit

Permalink
fix idle timeout and some improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohan Totting committed Mar 28, 2024
1 parent a88d204 commit fa39f90
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 43 deletions.
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,10 @@ func (c *Client) stop() error {
c.mu.Lock()
defer c.mu.Unlock()

if c.peerConnection.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
return nil
}

err := c.peerConnection.Close()
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions clienttracksimulcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (t *simulcastClientTrack) push(p *rtp.Packet, quality QualityLevel) {

targetQuality := t.getQuality()

if targetQuality == QualityNone {
return
}

if !t.client.bitrateController.exists(t.ID()) {
// do nothing if the bitrate claim is not exist
return
Expand Down Expand Up @@ -285,6 +289,10 @@ func (t *simulcastClientTrack) getQuality() QualityLevel {

claim := t.client.bitrateController.GetClaim(t.ID())

if claim == nil {
return QualityNone
}

quality := min(claim.quality, t.MaxQuality(), Uint32ToQualityLevel(t.client.quality.Load()))

if quality != QualityNone && !track.isTrackActive(quality) {
Expand Down
47 changes: 18 additions & 29 deletions clienttracksvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,16 @@ func DefaultQualityPresets() QualityPresets {

type scaleableClientTrack struct {
*clientTrack
lastQuality QualityLevel
maxQuality QualityLevel
tid uint8
sid uint8
lastTimestamp uint32
lastSequence uint16
baseSequence uint16
qualityPresets QualityPresets
hasInterPicture bool
init bool
packetCaches *packetCaches
lastQuality QualityLevel
maxQuality QualityLevel
tid uint8
sid uint8
lastTimestamp uint32
lastSequence uint16
baseSequence uint16
qualityPresets QualityPresets
init bool
packetCaches *packetCaches
}

func newScaleableClientTrack(
Expand Down Expand Up @@ -103,14 +102,7 @@ func (t *scaleableClientTrack) isKeyframe(vp9 *codecs.VP9Packet) bool {
func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {
var qualityPreset IQualityPreset

t.mu.RLock()
isLatePacket := IsRTPPacketLate(p.SequenceNumber, t.lastSequence)
t.mu.RUnlock()

if !t.init {
t.init = true
t.baseSequence = p.SequenceNumber
}
var isLatePacket bool

vp9Packet := &codecs.VP9Packet{}
if _, err := vp9Packet.Unmarshal(p.Payload); err != nil {
Expand All @@ -137,18 +129,15 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {
targetSID := qualityPreset.GetSID()
targetTID := qualityPreset.GetTID()

if !t.hasInterPicture {
if !vp9Packet.P {
t.baseSequence++
t.RequestPLI()
glog.Info("scalabletrack: packet ", p.SequenceNumber, " client ", t.client.ID(), " is dropped because of no intra frame")
return

}

t.hasInterPicture = true
if !t.init {
t.init = true
t.baseSequence = p.SequenceNumber
t.sid = targetSID
t.tid = targetTID
} else {
t.mu.RLock()
isLatePacket = IsRTPPacketLate(p.SequenceNumber, t.lastSequence)
t.mu.RUnlock()
}

currentBaseSequence := t.baseSequence
Expand Down
44 changes: 30 additions & 14 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"sync"

"github.com/golang/glog"
"github.com/pion/webrtc/v3"
)

Expand Down Expand Up @@ -107,31 +108,25 @@ func (m *Manager) NewRoom(id, name, roomType string, opts RoomOptions) (*Room, e
}
})

var emptyRoomTimeout context.Context
idle := true

var emptyRoomCancel context.CancelFunc

_, emptyRoomCancel = startRoomTimeout(m, room)

room.OnClientLeft(func(client *Client) {
if room.SFU().clients.Length() == 0 {
emptyRoomTimeout, emptyRoomCancel = context.WithTimeout(m.context, room.options.EmptyRoomTimeout)
go func() {
<-emptyRoomTimeout.Done()
if emptyRoomTimeout.Err() == context.DeadlineExceeded {
m.mutex.Lock()
defer m.mutex.Unlock()
room.Close()
delete(m.rooms, room.id)
emptyRoomCancel = nil
emptyRoomTimeout = nil
}
}()
idle = true
_, emptyRoomCancel = startRoomTimeout(m, room)
}
})

room.OnClientJoined(func(client *Client) {
if emptyRoomTimeout != nil && emptyRoomCancel != nil {
if idle {
emptyRoomCancel()
}

idle = false
})

m.rooms[room.id] = room
Expand Down Expand Up @@ -218,3 +213,24 @@ func (m *Manager) Close() {
func (m *Manager) Context() context.Context {
return m.context
}

func startRoomTimeout(m *Manager, room *Room) (context.Context, context.CancelFunc) {
var cancel context.CancelFunc

var ctx context.Context

ctx, cancel = context.WithTimeout(m.context, room.options.EmptyRoomTimeout)

go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
m.mutex.Lock()
defer m.mutex.Unlock()
room.Close()
delete(m.rooms, room.id)
glog.Info("room ", room.id, " is closed because it's empty and idle for ", room.options.EmptyRoomTimeout)
}
}()

return ctx, cancel
}

0 comments on commit fa39f90

Please sign in to comment.