Skip to content

Commit

Permalink
add packet mapping and clean some codes
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohan Totting committed Apr 13, 2024
1 parent fb2511b commit 7fa02ef
Show file tree
Hide file tree
Showing 10 changed files with 585 additions and 160 deletions.
85 changes: 4 additions & 81 deletions bitratecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ type bitrateClaim struct {
bitrate uint32
quality QualityLevel
simulcast bool
delayCounter int
lastIncreaseTime time.Time
lastDecreaseTime time.Time
}

func (c *bitrateClaim) Quality() QualityLevel {
Expand Down Expand Up @@ -130,59 +128,6 @@ func (bc *bitrateController) setQuality(clientTrackID string, quality QualityLev
}
}

func (bc *bitrateController) setSimulcastClaim(clientTrackID string, simulcast bool) {
bc.mu.Lock()
defer bc.mu.Unlock()

if claim, ok := bc.claims[clientTrackID]; ok {
claim.simulcast = simulcast
bc.claims[clientTrackID] = claim
}
}

// this handle some simulcast failed to send mid and low track, only high track available
// by default we just send the high track that is only available
func (bc *bitrateController) checkAllTrackActive(claim *bitrateClaim) (bool, QualityLevel) {
trackCount := 0
quality := QualityNone
track, ok := claim.track.(*simulcastClientTrack)

if ok {
if track.remoteTrack.remoteTrackHigh != nil {
trackCount++
quality = QualityHigh
}

if track.remoteTrack.remoteTrackMid != nil {
trackCount++
quality = QualityMid
}

if track.remoteTrack.remoteTrackLow != nil {
trackCount++
quality = QualityLow
}

if trackCount == 1 {
qualityLvl := Uint32ToQualityLevel(uint32(quality))
if claim.quality != qualityLvl {
bc.setQuality(claim.track.ID(), qualityLvl)
}

// this will force the current track identified as non simulcast track
if claim.simulcast {
bc.setSimulcastClaim(claim.track.ID(), false)
}

return true, qualityLvl
}

return true, claim.quality
}

return false, claim.quality
}

func (bc *bitrateController) addAudioClaims(clientTracks []iClientTrack) (leftTracks []iClientTrack, err error) {
errors := make([]error, 0)

Expand Down Expand Up @@ -299,11 +244,15 @@ func (bc *bitrateController) addClaim(clientTrack iClientTrack, quality QualityL
go func() {
ctx, cancel := context.WithCancel(clientTrack.Context())
defer cancel()

<-ctx.Done()

bc.removeClaim(clientTrack.ID())

if bc.client.IsDebugEnabled() {
glog.Info("clienttrack: track ", clientTrack.ID(), " claim removed")
}

clientTrack.Client().stats.removeSenderStats(clientTrack.ID())
}()

Expand Down Expand Up @@ -333,32 +282,6 @@ func (bc *bitrateController) exists(id string) bool {
return false
}

func (bc *bitrateController) isScreenNeedIncrease(highestQuality QualityLevel) bool {
bc.mu.RLock()
defer bc.mu.RUnlock()

for _, claim := range bc.claims {
if claim.track.IsScreen() && claim.quality <= highestQuality {
return true
}
}

return false
}

func (bc *bitrateController) isThereNonScreenCanDecrease(lowestQuality QualityLevel) bool {
bc.mu.RLock()
defer bc.mu.RUnlock()

for _, claim := range bc.claims {
if !claim.track.IsScreen() && (claim.track.IsScaleable() || claim.track.IsSimulcast()) && claim.quality > lowestQuality {
return true
}
}

return false
}

func (bc *bitrateController) totalSentBitrates() uint32 {
total := uint32(0)

Expand Down
71 changes: 34 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,22 @@ type ClientOptions struct {
Type string
EnableVoiceDetection bool
EnablePlayoutDelay bool
// Configure the minimum playout delay that will be used by the room
// Configure the minimum playout delay that will be used by the client
// Recommendation:
// 0 ms: Certain gaming scenarios (likely without audio) where we will want to play the frame as soon as possible. Also, for remote desktop without audio where rendering a frame asap makes sense
// 100/150/200 ms: These could be the max target latency for interactive streaming use cases depending on the actual application (gaming, remoting with audio, interactive scenarios)
// 400 ms: Application that want to ensure a network glitch has very little chance of causing a freeze can start with a minimum delay target that is high enough to deal with network issues. Video streaming is one example.
MinPlayoutDelay uint16
// Configure the minimum playout delay that will be used by the room
// Configure the minimum playout delay that will be used by the client
// Recommendation:
// 0 ms: Certain gaming scenarios (likely without audio) where we will want to play the frame as soon as possible. Also, for remote desktop without audio where rendering a frame asap makes sense
// 100/150/200 ms: These could be the max target latency for interactive streaming use cases depending on the actual application (gaming, remoting with audio, interactive scenarios)
// 400 ms: Application that want to ensure a network glitch has very little chance of causing a freeze can start with a minimum delay target that is high enough to deal with network issues. Video streaming is one example.
MaxPlayoutDelay uint16
JitterBufferMinWait time.Duration
JitterBufferMaxWait time.Duration
// disable this will turn off the adaptive stream and jitter buffer. It just forward all received packets
EnableAdaptiveStream bool
// On unstable network, the packets can be arrived unordered which may affected the nack and packet loss counts, set this to true to allow the SFU to handle reordered packet
ReorderPackets bool
}

type internalDataMessage struct {
Expand Down Expand Up @@ -180,13 +180,12 @@ func DefaultClientOptions() ClientOptions {
return ClientOptions{
IdleTimeout: 5 * time.Minute,
Type: ClientTypePeer,
EnableVoiceDetection: false,
EnableVoiceDetection: true,
EnablePlayoutDelay: true,
MinPlayoutDelay: 100,
MaxPlayoutDelay: 200,
JitterBufferMinWait: 20 * time.Millisecond,
JitterBufferMaxWait: 150 * time.Millisecond,
EnableAdaptiveStream: true,
}
}

Expand Down Expand Up @@ -501,7 +500,7 @@ func NewClient(s *SFU, id string, name string, peerConnectionConfig webrtc.Confi

if err != nil {
// if track not found, add it
track = newSimulcastTrack(client.context, client, remoteTrack, opts.JitterBufferMinWait, opts.JitterBufferMaxWait, s.pliInterval, onPLI, client.statsGetter, onStatsUpdated)
track = newSimulcastTrack(client.context, client.options.ReorderPackets, client, remoteTrack, opts.JitterBufferMinWait, opts.JitterBufferMaxWait, s.pliInterval, onPLI, client.statsGetter, onStatsUpdated)
if err := client.tracks.Add(track); err != nil {
glog.Error("client: error add track ", err)
}
Expand All @@ -513,10 +512,6 @@ func NewClient(s *SFU, id string, name string, peerConnectionConfig webrtc.Confi
client.stats.removeReceiverStats(remoteTrack.ID())
}()

if simulcast, ok = track.(*SimulcastTrack); !ok {
glog.Error("client: error track is not simulcast track")
}

} else if simulcast, ok = track.(*SimulcastTrack); ok {
simulcast.AddRemoteTrack(simulcast.context, remoteTrack, opts.JitterBufferMinWait, opts.JitterBufferMaxWait, client.statsGetter, onStatsUpdated, onPLI)
}
Expand Down Expand Up @@ -819,7 +814,8 @@ func (c *Client) renegotiateQueuOp() {
err = c.peerConnection.PC().SetLocalDescription(offer)
if err != nil {
glog.Error("sfu: error set local description on renegotiation ", err)
c.stop()
_ = c.stop()

return
}

Expand All @@ -828,19 +824,22 @@ func (c *Client) renegotiateQueuOp() {
if err != nil {
//TODO: when this happen, we need to close the client and ask the remote client to reconnect
glog.Error("sfu: error on renegotiation ", err)
c.stop()
_ = c.stop()

return
}

if answer.Type != webrtc.SDPTypeAnswer {
glog.Error("sfu: error on renegotiation, the answer is not an answer type")
c.stop()
_ = c.stop()

return
}

err = c.peerConnection.PC().SetRemoteDescription(answer)
if err != nil {
c.stop()
_ = c.stop()

return
}
}
Expand Down Expand Up @@ -918,9 +917,6 @@ func (c *Client) setClientTrack(t ITrack) iClientTrack {
}()

sender := senderTcv.Sender()
if sender == nil {
return
}

if c.peerConnection == nil || c.peerConnection.PC() == nil || sender == nil || c.peerConnection.PC().ConnectionState() == webrtc.PeerConnectionStateClosed {
return
Expand Down Expand Up @@ -1285,32 +1281,33 @@ func (c *Client) SubscribeTracks(req []SubscribeTrackRequest) error {
continue
}

if client, err := c.sfu.clients.GetClient(r.ClientID); err == nil {
for _, track := range client.tracks.GetTracks() {
if track.ID() == r.TrackID {
if clientTrack := c.setClientTrack(track); clientTrack != nil {
clientTracks = append(clientTracks, clientTrack)
}
client, err := c.sfu.clients.GetClient(r.ClientID)
if err != nil {
return err
}

glog.Info("client: subscribe track ", r.TrackID, " from ", r.ClientID, " to ", c.ID())
for _, track := range client.tracks.GetTracks() {
if track.ID() == r.TrackID {
if clientTrack := c.setClientTrack(track); clientTrack != nil {
clientTracks = append(clientTracks, clientTrack)
}

trackFound = true
glog.Info("client: subscribe track ", r.TrackID, " from ", r.ClientID, " to ", c.ID())

}
}
trackFound = true

// look on relay tracks
for _, track := range c.SFU().relayTracks {
if track.ID() == r.TrackID {
if clientTrack := c.setClientTrack(track); clientTrack != nil {
clientTracks = append(clientTracks, clientTrack)
}
}
}

trackFound = true
// look on relay tracks
for _, track := range c.SFU().relayTracks {
if track.ID() == r.TrackID {
if clientTrack := c.setClientTrack(track); clientTrack != nil {
clientTracks = append(clientTracks, clientTrack)
}

trackFound = true
}
} else if err != nil {
return err
}

if !trackFound {
Expand Down
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestClientDataChannel(t *testing.T) {
t.Fatal("timeout waiting for data channel")
case state := <-connChan:
if state == webrtc.PeerConnectionStateConnected {
pc.CreateDataChannel("test", nil)
_, _ = pc.CreateDataChannel("test", nil)

negotiate(pc, client)
}
Expand Down
34 changes: 15 additions & 19 deletions clienttracksvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sfu

import (
"github.com/golang/glog"
"github.com/inlivedev/sfu/pkg/packetmap"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
)
Expand Down Expand Up @@ -55,10 +56,10 @@ type scaleableClientTrack struct {
sid uint8
lastTimestamp uint32
lastSequence uint16
baseSequence uint16
qualityPresets QualityPresets
init bool
packetCaches *packetCaches
packetmap *packetmap.Map
}

func newScaleableClientTrack(
Expand All @@ -74,7 +75,7 @@ func newScaleableClientTrack(
lastQuality: QualityHigh,
tid: qualityPresets.High.TID,
sid: qualityPresets.High.SID,
packetCaches: newPacketCaches(),
packetmap: &packetmap.Map{},
}

return sct
Expand Down Expand Up @@ -109,7 +110,6 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {

vp9Packet := &codecs.VP9Packet{}
if _, err := vp9Packet.Unmarshal(p.Payload); err != nil {
t.baseSequence++
return
}

Expand All @@ -123,7 +123,6 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {

quality := t.getQuality()
if quality == QualityNone {
t.baseSequence++
glog.Info("scalabletrack: packet ", p.SequenceNumber, " is dropped because of quality none")
return
}
Expand All @@ -142,7 +141,6 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {

if !t.init {
t.init = true
t.baseSequence = p.SequenceNumber
t.sid = targetSID
t.tid = targetTID
} else {
Expand All @@ -168,12 +166,6 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {
}
}

if currentTID < vp9Packet.TID {
// glog.Info("scalabletrack: packet ", p.SequenceNumber, " is dropped because of currentTID ", currentTID, " < vp9Packet.TID", vp9Packet.TID)
t.baseSequence++
return
}

// check if possible to scale up spatial layer

if currentSID < targetSID && !isLatePacket {
Expand All @@ -193,12 +185,21 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {
t.setLastQuality(quality)
}

if currentSID < vp9Packet.SID {
t.baseSequence++
// glog.Info("scalabletrack: packet ", p.SequenceNumber, " is dropped because of currentSID ", currentSID, " < vp9Packet.SID", vp9Packet.SID)
if currentTID < vp9Packet.TID || currentSID < vp9Packet.SID {
// glog.Info("scalabletrack: packet ", p.SequenceNumber, " is dropped because of currentTID ", currentTID, " < vp9Packet.TID", vp9Packet.TID)
ok := t.packetmap.Drop(p.SequenceNumber, vp9Packet.PictureID)
if ok {
return
}
}

ok, newseqno, _ := t.packetmap.Map(p.SequenceNumber, vp9Packet.PictureID)
if !ok {
return
}

p.SequenceNumber = newseqno

// mark packet as a last spatial layer packet
if vp9Packet.E && currentSID == vp9Packet.SID && targetSID <= currentSID {
p.Marker = true
Expand All @@ -208,11 +209,6 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {
}

func (t *scaleableClientTrack) send(p *rtp.Packet, pSID, tSID, decidedSID, decidedTID uint8) {

t.packetCaches.Add(p.SequenceNumber, t.baseSequence, p.Timestamp, pSID, tSID, decidedSID, decidedTID)

p.Header.SequenceNumber = p.Header.SequenceNumber - t.baseSequence

t.lastTimestamp = p.Timestamp

if err := t.localTrack.WriteRTP(p); err != nil {
Expand Down
Loading

0 comments on commit 7fa02ef

Please sign in to comment.