Skip to content

Commit

Permalink
pass by value for packet
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohan Totting committed Nov 22, 2023
1 parent fbad034 commit 28f600b
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 38 deletions.
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,9 @@ func NewClient(s *SFU, id string, name string, peerConnectionConfig webrtc.Confi
simulcast.AddRemoteTrack(remoteTrack, client.statsGetter, onStatsUpdated)
}

// // only process track when the highest quality is available
// // only process track when the lowest quality is available
// simulcast.mu.Lock()
// isHighAvailable := simulcast.remoteTrackHigh != nil
// isLowAvailable := simulcast.remoteTrackLow != nil
// simulcast.mu.Unlock()

if !track.IsProcessed() {
Expand Down
6 changes: 3 additions & 3 deletions clienttrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type iClientTrack interface {
push(rtp *rtp.Packet, quality QualityLevel)
push(rtp rtp.Packet, quality QualityLevel)
ID() string
Kind() webrtc.RTPCodecType
LocalTrack() *webrtc.TrackLocalStaticRTP
Expand Down Expand Up @@ -50,7 +50,7 @@ func (t *clientTrack) Kind() webrtc.RTPCodecType {
return t.remoteTrack.track.Kind()
}

func (t *clientTrack) push(rtp *rtp.Packet, quality QualityLevel) {
func (t *clientTrack) push(rtp rtp.Packet, quality QualityLevel) {
if t.client.peerConnection.PC().ConnectionState() != webrtc.PeerConnectionStateConnected {
return
}
Expand All @@ -59,7 +59,7 @@ func (t *clientTrack) push(rtp *rtp.Packet, quality QualityLevel) {
// do something here with audio level
}

if err := t.localTrack.WriteRTP(rtp); err != nil {
if err := t.localTrack.WriteRTP(&rtp); err != nil {
glog.Error("clienttrack: error on write rtp", err)
}
}
Expand Down
5 changes: 5 additions & 0 deletions docs/extension.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Developing an extension package
An extension can add more features to the SFU without need to modify the SFU code.

## How it works
The extension can be develop by utilizing the events from SFU components. Each component has its own events.
6 changes: 3 additions & 3 deletions remotetrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type remoteTrack struct {
cancel context.CancelFunc
mu sync.Mutex
track IRemoteTrack
onRead func(*rtp.Packet)
onRead func(rtp.Packet)
onPLI func() error
bitrate *atomic.Uint32
previousBytesReceived *atomic.Uint64
Expand All @@ -30,7 +30,7 @@ type remoteTrack struct {
onStatsUpdated func(*stats.Stats)
}

func newRemoteTrack(ctx context.Context, track IRemoteTrack, pliInterval time.Duration, onPLI func() error, statsGetter stats.Getter, onStatsUpdated func(*stats.Stats), onRead func(*rtp.Packet)) *remoteTrack {
func newRemoteTrack(ctx context.Context, track IRemoteTrack, pliInterval time.Duration, onPLI func() error, statsGetter stats.Getter, onStatsUpdated func(*stats.Stats), onRead func(rtp.Packet)) *remoteTrack {
localctx, cancel := context.WithCancel(ctx)
rt := &remoteTrack{
context: localctx,
Expand Down Expand Up @@ -88,7 +88,7 @@ func (t *remoteTrack) readRTP() {
return
}

t.onRead(rtp)
t.onRead(*rtp)

go t.updateStats()
}
Expand Down
8 changes: 4 additions & 4 deletions scalableclienttrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ func (t *scaleableClientTrack) Client() *Client {
return t.client
}

func (t *scaleableClientTrack) writeRTP(p *rtp.Packet) {
func (t *scaleableClientTrack) writeRTP(p rtp.Packet) {
t.lastTimestamp = p.Timestamp
t.sequenceNumber = p.SequenceNumber

if err := t.localTrack.WriteRTP(p); err != nil {
if err := t.localTrack.WriteRTP(&p); err != nil {
glog.Error("track: error on write rtp", err)
}
}
Expand All @@ -136,7 +136,7 @@ func (t *scaleableClientTrack) isKeyframe(vp9 *codecs.VP9Packet) bool {

// this where the temporal and spatial layers are will be decided to be sent to the client or not
// compare it with the claimed quality to decide if the packet should be sent or not
func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {
func (t *scaleableClientTrack) push(p rtp.Packet, _ QualityLevel) {
var qualityPreset IQualityPreset

vp9Packet := &codecs.VP9Packet{}
Expand Down Expand Up @@ -211,7 +211,7 @@ func (t *scaleableClientTrack) push(p *rtp.Packet, _ QualityLevel) {
t.send(p)
}

func (t *scaleableClientTrack) send(p *rtp.Packet) {
func (t *scaleableClientTrack) send(p rtp.Packet) {
p.SequenceNumber = p.SequenceNumber - t.dropCounter
t.writeRTP(p)
}
Expand Down
16 changes: 9 additions & 7 deletions simulcastclienttrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,32 @@ func (t *simulcastClientTrack) Client() *Client {
return t.client
}

func (t *simulcastClientTrack) isFirstKeyframePacket(p *rtp.Packet) bool {
func (t *simulcastClientTrack) isFirstKeyframePacket(p rtp.Packet) bool {
isKeyframe := IsKeyframe(t.mimeType, p)

return isKeyframe && t.lastTimestamp.Load() != p.Timestamp
}

func (t *simulcastClientTrack) send(p *rtp.Packet, quality QualityLevel, lastQuality QualityLevel) {
func (t *simulcastClientTrack) send(p rtp.Packet, quality QualityLevel, lastQuality QualityLevel) {
t.lastTimestamp.Store(p.Timestamp)

if lastQuality != quality {
t.lastQuality.Store(uint32(quality))
}

t.rewritePacket(p, quality)
p = t.rewritePacket(p, quality)

t.writeRTP(p)

}

func (t *simulcastClientTrack) writeRTP(p *rtp.Packet) {
if err := t.localTrack.WriteRTP(p); err != nil {
func (t *simulcastClientTrack) writeRTP(p rtp.Packet) {
if err := t.localTrack.WriteRTP(&p); err != nil {
glog.Error("track: error on write rtp", err)
}
}

func (t *simulcastClientTrack) push(p *rtp.Packet, quality QualityLevel) {
func (t *simulcastClientTrack) push(p rtp.Packet, quality QualityLevel) {
var trackQuality QualityLevel

lastQuality := t.LastQuality()
Expand Down Expand Up @@ -245,7 +245,7 @@ func (t *simulcastClientTrack) IsScaleable() bool {
return false
}

func (t *simulcastClientTrack) rewritePacket(p *rtp.Packet, quality QualityLevel) {
func (t *simulcastClientTrack) rewritePacket(p rtp.Packet, quality QualityLevel) rtp.Packet {
t.remoteTrack.mu.Lock()
defer t.remoteTrack.mu.Unlock()
// make sure the timestamp and sequence number is consistent from the previous packet even it is not the same track
Expand All @@ -265,6 +265,8 @@ func (t *simulcastClientTrack) rewritePacket(p *rtp.Packet, quality QualityLevel

t.sequenceNumber.Add(uint32(sequenceDelta))
p.SequenceNumber = uint16(t.sequenceNumber.Load())

return p
}

func (t *simulcastClientTrack) RequestPLI() {
Expand Down
33 changes: 16 additions & 17 deletions track.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newTrack(ctx context.Context, clientID string, trackRemote IRemoteTrack, pl
onEndedCallbacks: make([]func(), 0),
}

onRead := func(p *rtp.Packet) {
onRead := func(p rtp.Packet) {
// do
tracks := t.base.clientTracks.GetTracks()
for _, track := range tracks {
Expand Down Expand Up @@ -278,13 +278,12 @@ func (t *Track) OnRead(callback func(rtp.Packet, QualityLevel)) {
t.onReadCallbacks = append(t.onReadCallbacks, callback)
}

func (t *Track) onRead(p *rtp.Packet, quality QualityLevel) {
func (t *Track) onRead(p rtp.Packet, quality QualityLevel) {
// t.mu.Lock()
// defer t.mu.Unlock()

for _, callback := range t.onReadCallbacks {
pClone := *p
callback(pClone, quality)
callback(p, quality)
}
}

Expand Down Expand Up @@ -427,7 +426,7 @@ func (t *SimulcastTrack) AddRemoteTrack(track IRemoteTrack, stats stats.Getter,

quality := RIDToQuality(track.RID())

onRead := func(p *rtp.Packet) {
onRead := func(p rtp.Packet) {
// set the base timestamp for the track if it is not set yet
if t.baseTS == 0 {
t.baseTS = p.Timestamp
Expand Down Expand Up @@ -466,43 +465,48 @@ func (t *SimulcastTrack) AddRemoteTrack(track IRemoteTrack, stats stats.Getter,
t.onRead(p, quality)
}

t.mu.Lock()

remoteTrack = newRemoteTrack(t.context, track, t.pliInterval, t.onPLI, stats, onStatsUpdated, onRead)

switch quality {
case QualityHigh:
t.mu.Lock()
t.remoteTrackHigh = remoteTrack
t.mu.Unlock()

remoteTrack.OnEnded(func() {
t.mu.Lock()
defer t.mu.Unlock()
t.remoteTrackHigh = nil
t.mu.Unlock()

if t.remoteTrackHigh == nil && t.remoteTrackMid == nil && t.remoteTrackLow == nil {
t.onEnded()
}
})

case QualityMid:
t.mu.Lock()
t.remoteTrackMid = remoteTrack
t.mu.Unlock()

remoteTrack.OnEnded(func() {
t.mu.Lock()
defer t.mu.Unlock()
t.remoteTrackMid = nil
t.mu.Unlock()

if t.remoteTrackHigh == nil && t.remoteTrackMid == nil && t.remoteTrackLow == nil {
t.onEnded()
}
})

case QualityLow:
t.mu.Lock()
t.remoteTrackLow = remoteTrack
t.mu.Unlock()

remoteTrack.OnEnded(func() {
t.mu.Lock()
defer t.mu.Unlock()
t.remoteTrackLow = nil
t.mu.Unlock()

if t.remoteTrackHigh == nil && t.remoteTrackMid == nil && t.remoteTrackLow == nil {
t.onEnded()
Expand All @@ -518,8 +522,6 @@ func (t *SimulcastTrack) AddRemoteTrack(track IRemoteTrack, stats stats.Getter,
t.onTrackComplete()
}

t.mu.Unlock()

t.onRemoteTrackAddedCallbacks(remoteTrack)

return remoteTrack
Expand Down Expand Up @@ -751,12 +753,9 @@ func (t *SimulcastTrack) OnRead(callback func(rtp.Packet, QualityLevel)) {
t.onReadCallbacks = append(t.onReadCallbacks, callback)
}

func (t *SimulcastTrack) onRead(p *rtp.Packet, quality QualityLevel) {
t.mu.Lock()
defer t.mu.Unlock()

func (t *SimulcastTrack) onRead(p rtp.Packet, quality QualityLevel) {
for _, callback := range t.onReadCallbacks {
callback(*p, quality)
callback(p, quality)
}
}

Expand Down
4 changes: 2 additions & 2 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func RegisterSimulcastHeaderExtensions(m *webrtc.MediaEngine, codecType webrtc.R
}
}

func IsKeyframe(codec string, packet *rtp.Packet) bool {
func IsKeyframe(codec string, packet rtp.Packet) bool {
isIt1, isIt2 := Keyframe(codec, packet)
return isIt1 && isIt2
}
Expand All @@ -119,7 +119,7 @@ func IsKeyframe(codec string, packet *rtp.Packet) bool {
// It returns (true, true) if that is the case, (false, true) if that is
// definitely not the case, and (false, false) if the information cannot
// be determined.
func Keyframe(codec string, packet *rtp.Packet) (bool, bool) {
func Keyframe(codec string, packet rtp.Packet) (bool, bool) {
if strings.EqualFold(codec, "video/vp8") {
var vp8 codecs.VP8Packet
_, err := vp8.Unmarshal(packet.Payload)
Expand Down

0 comments on commit 28f600b

Please sign in to comment.