Skip to content

Commit

Permalink
Refactor negotiation process and improve video
Browse files Browse the repository at this point in the history
quality
  • Loading branch information
Yohan Totting committed Nov 9, 2023
1 parent b8b4184 commit 689c467
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 29 deletions.
37 changes: 28 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,16 +511,14 @@ func (c *Client) OnRenegotiation(callback func(context.Context, webrtc.SessionDe
c.onRenegotiation = callback
}

// The renegotiation can be in race condition when a client is renegotiating and new track is added to the client because another client is publishing to the room.
// We can block the renegotiation until the current renegotiation is finish, but it will block the negotiation process for a while.
// TODO:
// delay negotiation using timeout and make sure only one negotiation is running when a client left and joined again
func (c *Client) renegotiateQueuOp() {
if c.onRenegotiation == nil {
glog.Error("client: onRenegotiation is not set, can't do renegotiation")
return
}

c.negotiationNeeded.Store(true)

if c.isInRemoteNegotiation.Load() {
glog.Info("sfu: renegotiation is delayed because the remote client is doing negotiation ", c.ID)

Expand All @@ -536,7 +534,13 @@ func (c *Client) renegotiateQueuOp() {
// mark negotiation is in progress to make sure no concurrent negotiation
c.isInRenegotiation.Store(true)

for c.negotiationNeeded.Load() {
go func() {
defer c.isInRenegotiation.Store(false)
timout, cancel := context.WithTimeout(c.context, 100*time.Millisecond)
defer cancel()

<-timout.Done()

// mark negotiation is not needed after this done, so it will out of the loop
c.negotiationNeeded.Store(false)

Expand Down Expand Up @@ -576,9 +580,9 @@ func (c *Client) renegotiateQueuOp() {
return
}
}
}

c.isInRenegotiation.Store(false)
}()

}

func (c *Client) allowRemoteRenegotiation() {
Expand Down Expand Up @@ -627,6 +631,15 @@ func (c *Client) setClientTrack(t ITrack) iClientTrack {
return nil
}

t.Client().OnLeft(func() {
if err := c.peerConnection.PC().RemoveTrack(transc.Sender()); err != nil {
glog.Error("client: error remove track ", err)
return
}

c.renegotiate()
})

t.OnEnded(func() {
if err := c.peerConnection.PC().RemoveTrack(transc.Sender()); err != nil {
glog.Error("client: error remove track ", err)
Expand Down Expand Up @@ -731,6 +744,8 @@ func (c *Client) afterClosed() {

c.cancel()

c.onLeft()

c.sfu.onAfterClientStopped(c)
}

Expand All @@ -748,9 +763,7 @@ func (c *Client) stop() error {

func (c *Client) AddICECandidate(candidate webrtc.ICECandidateInit) error {
if c.peerConnection.PC().RemoteDescription() == nil {
// c.mu.Lock()
c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, candidate)
// c.mu.Unlock()
} else {
if err := c.peerConnection.PC().AddICECandidate(candidate); err != nil {
glog.Error("client: error add ice candidate ", err)
Expand Down Expand Up @@ -812,6 +825,12 @@ func (c *Client) OnLeft(callback func()) {
c.onLeftCallbacks = append(c.onLeftCallbacks, callback)
}

func (c *Client) onLeft() {
for _, callback := range c.onLeftCallbacks {
callback()
}
}

func (c *Client) OnTrackRemoved(callback func(sourceType string, track *webrtc.TrackLocalStaticRTP)) {
c.onTrackRemovedCallbacks = append(c.onTrackRemovedCallbacks, callback)
}
Expand Down
9 changes: 8 additions & 1 deletion examples/http-websocket/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@
streams: [stream],
sendEncodings: [
{
maxBitrate: 1200*1000,
maxBitrate: 1000*1000,
scalabilityMode: 'L3T2'
},

Expand All @@ -380,6 +380,13 @@
}
}

// push the rest of the codecs
for(let i = 0; i < codecs.length; i++){
if(codecs[i].mimeType != "video/VP9"){
vp9_codecs.push(codecs[i]);
}
}

// currently not all browsers support setCodecPreferences
if(tcvr.setCodecPreferences != undefined){
tcvr.setCodecPreferences(vp9_codecs);
Expand Down
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@ require (
github.com/pion/interceptor v0.1.18
github.com/pion/rtcp v1.2.10
github.com/pion/webrtc/v3 v3.2.19
github.com/speps/go-hashids v2.0.0+incompatible
github.com/stretchr/testify v1.8.4
)

require (
github.com/golang/glog v1.1.2
github.com/jaevor/go-nanoid v1.3.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/text v0.12.0
)

require (
github.com/jaevor/go-nanoid v1.3.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
require gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect

require (
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ github.com/pion/webrtc/v3 v3.2.19/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRN
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/speps/go-hashids v2.0.0+incompatible h1:kSfxGfESueJKTx0mpER9Y/1XHl+FVQjtCqRyYcviFbw=
github.com/speps/go-hashids v2.0.0+incompatible/go.mod h1:P7hqPzMdnZOfyIk+xrlG1QaSMw+gCBdHKsBDnhpaZvc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
7 changes: 3 additions & 4 deletions remotetrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,13 @@ func (t *remoteTrack) sendPLI() error {
t.mu.Lock()
defer t.mu.Unlock()

maxGapSeconds := 1 * time.Second
maxGapSeconds := 250 * time.Millisecond
requestGap := time.Since(t.lastPLIRequestTime)

if time.Since(t.lastPLIRequestTime) < maxGapSeconds {
if requestGap < maxGapSeconds {
return nil
}

// glog.Info("sending PLI for track: ", t.track.ID(), " last PLI was requested ", time.Since(t.lastPLIRequestTime).Seconds(), " seconds ago")

t.lastPLIRequestTime = time.Now()

return t.client.peerConnection.PC().WriteRTCP([]rtcp.Packet{
Expand Down
8 changes: 0 additions & 8 deletions scalableclienttrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ func (t *scaleabletClientTrack) push(p *rtp.Packet, _ QualityLevel) {
if vp9Packet.B && t.sid != qualityPreset.GetSID() {
if vp9Packet.SID == qualityPreset.GetSID() && !vp9Packet.P {
t.sid = qualityPreset.GetSID()
} else {
t.RequestPLI()
}
}

Expand All @@ -188,15 +186,9 @@ func (t *scaleabletClientTrack) push(p *rtp.Packet, _ QualityLevel) {

if vp9Packet.E && t.sid == vp9Packet.SID {
p.Marker = true
// if t.client.isDebug {
// glog.Info("scalabletrack: end of frame mark as final frame, sid: ", vp9Packet.SID)
// }
}

if vp9Packet.TID == 0 && vp9Packet.SID == 0 {
// if p.Marker && t.client.isDebug {
// glog.Info("scalabletrack: marker is set, sid: ", vp9Packet.SID)
// }
t.send(p)
return
}
Expand Down

0 comments on commit 689c467

Please sign in to comment.