diff --git a/client.go b/client.go index d80974c..9f66554 100644 --- a/client.go +++ b/client.go @@ -1021,6 +1021,10 @@ func (c *Client) stop() error { c.mu.Lock() defer c.mu.Unlock() + if c.peerConnection.pc.ConnectionState() == webrtc.PeerConnectionStateClosed { + return nil + } + err := c.peerConnection.Close() if err != nil { return err diff --git a/clienttracksimulcast.go b/clienttracksimulcast.go index 54ff9c5..2b5b4b7 100644 --- a/clienttracksimulcast.go +++ b/clienttracksimulcast.go @@ -113,6 +113,10 @@ func (t *simulcastClientTrack) push(p *rtp.Packet, quality QualityLevel) { targetQuality := t.getQuality() + if targetQuality == QualityNone { + return + } + if !t.client.bitrateController.exists(t.ID()) { // do nothing if the bitrate claim is not exist return @@ -285,6 +289,10 @@ func (t *simulcastClientTrack) getQuality() QualityLevel { claim := t.client.bitrateController.GetClaim(t.ID()) + if claim == nil { + return QualityNone + } + quality := min(claim.quality, t.MaxQuality(), Uint32ToQualityLevel(t.client.quality.Load())) if quality != QualityNone && !track.isTrackActive(quality) { diff --git a/clienttracksvc.go b/clienttracksvc.go index d8d881f..ea16ad9 100644 --- a/clienttracksvc.go +++ b/clienttracksvc.go @@ -49,17 +49,16 @@ func DefaultQualityPresets() QualityPresets { type scaleableClientTrack struct { *clientTrack - lastQuality QualityLevel - maxQuality QualityLevel - tid uint8 - sid uint8 - lastTimestamp uint32 - lastSequence uint16 - baseSequence uint16 - qualityPresets QualityPresets - hasInterPicture bool - init bool - packetCaches *packetCaches + lastQuality QualityLevel + maxQuality QualityLevel + tid uint8 + sid uint8 + lastTimestamp uint32 + lastSequence uint16 + baseSequence uint16 + qualityPresets QualityPresets + init bool + packetCaches *packetCaches } func newScaleableClientTrack( @@ -103,14 +102,7 @@ func (t *scaleableClientTrack) isKeyframe(vp9 *codecs.VP9Packet) bool { func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) { var qualityPreset IQualityPreset - t.mu.RLock() - isLatePacket := IsRTPPacketLate(p.SequenceNumber, t.lastSequence) - t.mu.RUnlock() - - if !t.init { - t.init = true - t.baseSequence = p.SequenceNumber - } + var isLatePacket bool vp9Packet := &codecs.VP9Packet{} if _, err := vp9Packet.Unmarshal(p.Payload); err != nil { @@ -137,18 +129,15 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) { targetSID := qualityPreset.GetSID() targetTID := qualityPreset.GetTID() - if !t.hasInterPicture { - if !vp9Packet.P { - t.baseSequence++ - t.RequestPLI() - glog.Info("scalabletrack: packet ", p.SequenceNumber, " client ", t.client.ID(), " is dropped because of no intra frame") - return - - } - - t.hasInterPicture = true + if !t.init { + t.init = true + t.baseSequence = p.SequenceNumber t.sid = targetSID t.tid = targetTID + } else { + t.mu.RLock() + isLatePacket = IsRTPPacketLate(p.SequenceNumber, t.lastSequence) + t.mu.RUnlock() } currentBaseSequence := t.baseSequence diff --git a/manager.go b/manager.go index b2db5ae..9c44e4d 100644 --- a/manager.go +++ b/manager.go @@ -5,6 +5,7 @@ import ( "errors" "sync" + "github.com/golang/glog" "github.com/pion/webrtc/v3" ) @@ -107,31 +108,25 @@ func (m *Manager) NewRoom(id, name, roomType string, opts RoomOptions) (*Room, e } }) - var emptyRoomTimeout context.Context + idle := true var emptyRoomCancel context.CancelFunc + _, emptyRoomCancel = startRoomTimeout(m, room) + room.OnClientLeft(func(client *Client) { if room.SFU().clients.Length() == 0 { - emptyRoomTimeout, emptyRoomCancel = context.WithTimeout(m.context, room.options.EmptyRoomTimeout) - go func() { - <-emptyRoomTimeout.Done() - if emptyRoomTimeout.Err() == context.DeadlineExceeded { - m.mutex.Lock() - defer m.mutex.Unlock() - room.Close() - delete(m.rooms, room.id) - emptyRoomCancel = nil - emptyRoomTimeout = nil - } - }() + idle = true + _, emptyRoomCancel = startRoomTimeout(m, room) } }) room.OnClientJoined(func(client *Client) { - if emptyRoomTimeout != nil && emptyRoomCancel != nil { + if idle { emptyRoomCancel() } + + idle = false }) m.rooms[room.id] = room @@ -218,3 +213,24 @@ func (m *Manager) Close() { func (m *Manager) Context() context.Context { return m.context } + +func startRoomTimeout(m *Manager, room *Room) (context.Context, context.CancelFunc) { + var cancel context.CancelFunc + + var ctx context.Context + + ctx, cancel = context.WithTimeout(m.context, room.options.EmptyRoomTimeout) + + go func() { + <-ctx.Done() + if ctx.Err() == context.DeadlineExceeded { + m.mutex.Lock() + defer m.mutex.Unlock() + room.Close() + delete(m.rooms, room.id) + glog.Info("room ", room.id, " is closed because it's empty and idle for ", room.options.EmptyRoomTimeout) + } + }() + + return ctx, cancel +}