Skip to content

Commit

Permalink
fix late packet handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohan Totting committed Feb 29, 2024
1 parent a081da0 commit 0bd8ab7
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 108 deletions.
15 changes: 15 additions & 0 deletions clienttracksvc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,18 @@ func sendPackets(ctx context.Context, track *webrtc.TrackLocalStaticRTP) {

}
}

func TestNormalizeSequence(t *testing.T) {
seqs := []uint16{65531, 65532, 65533, 65534, 65535, 1, 2, 3, 4, 5}
dropped := uint16(3000)
expecteds := []uint16{62531, 62532, 62533, 62534, 62535, 62536, 62537, 62538, 62539, 62540}

require.Equal(t, len(seqs), len(expecteds), "length of seqs and expecteds should be the same")

for i, seq := range seqs {
expected := expecteds[i]
normalizeSeq := normalizeSequenceNumber(seq, dropped)
require.Equal(t, expected, normalizeSeq, "expected %d got %d", expected, normalizeSeq)
}

}
17 changes: 14 additions & 3 deletions packetcaches.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sfu
import (
"container/list"
"sync"

"github.com/golang/glog"
)

// buffer ring for cached packets
Expand Down Expand Up @@ -41,6 +43,7 @@ func (p *packetCaches) Push(sequence uint16, timestamp uint32, dropCounter uint1
})

if p.caches.Len() > p.size {
glog.Info("packetCaches: dropping packet", p.caches.Front().Value.(cachedPacket).sequence)
p.caches.Remove(p.caches.Front())
}
}
Expand All @@ -54,7 +57,7 @@ Loop:
packet := e.Value.(cachedPacket)
if packet.sequence == sequence {
return packet, true
} else if packet.sequence > sequence {
} else if packet.sequence < sequence {
break Loop
}
}
Expand All @@ -66,10 +69,18 @@ func (p *packetCaches) GetPacketOrBefore(sequence uint16) (cachedPacket, bool) {
p.mu.RLock()
defer p.mu.RUnlock()

for e := p.caches.Back(); e != nil; e = e.Prev() {
var e *list.Element

for e = p.caches.Back(); e != nil; e = e.Prev() {
packet := e.Value.(cachedPacket)
if packet.sequence == sequence || packet.sequence > sequence {

if packet.sequence == sequence || packet.sequence < sequence {
return packet, true
} else if packet.sequence > sequence && e.Next() != nil {
packetBefore := e.Next().Value.(cachedPacket)
if packet.sequence > packetBefore.sequence && packet.sequence-packetBefore.sequence > 30000 {
return packet, true
}
}
}

Expand Down
203 changes: 98 additions & 105 deletions packetcaches_test.go
Original file line number Diff line number Diff line change
@@ -1,107 +1,100 @@
package sfu

// import (
// "slices"
// "testing"
// )

// func TestPush(t *testing.T) {
// p := newPacketCaches(1000)
// p.push(1, 2, 3)
// if p.tail != 1 {
// t.Error("tail is not 1")
// }
// if p.head != 0 {
// t.Error("head is not 0")
// }
// if p.caches[1].sequence != 1 {
// t.Error("sequence is not 1")
// }
// if p.caches[1].timestamp != 2 {
// t.Error("timestamp is not 2")
// }
// if p.caches[1].dropCounter != 3 {
// t.Error("dropCounter is not 3")
// }
// }

// func TestOverwrite(t *testing.T) {
// p := newPacketCaches(1000)
// for i := 0; i < 1100; i++ {
// p.push(uint16(i), uint32(i), uint16(i))
// }

// }

// func TestGet(t *testing.T) {
// p := newPacketCaches(1000)
// for i := 0; i < 1100; i++ {
// p.push(uint16(i), uint32(i), uint16(i))
// }

// pkt, ok := p.getPacket(1099)
// if !ok {
// t.Error("packet not found")
// }

// if pkt.sequence != 1099 {
// t.Error("sequence is not 1099, got ", pkt.sequence)
// }

// if pkt.timestamp != 1099 {
// t.Error("timestamp is not 1099, got ", pkt.timestamp)
// }

// if pkt.dropCounter != 1099 {
// t.Error("dropCounter is not 1099, got ", pkt.dropCounter)
// }

// pkt, ok = p.getPacket(1299)
// if ok {
// t.Error("packet found, expected not found")
// }

// pkt, ok = p.getPacket(99)
// if ok {
// t.Error("packet found, expected not found")
// }
// }

// func TestGetPacketOrBefore(t *testing.T) {
// // drop 8 packets
// dropPackets := []uint16{111, 222, 333, 444, 555, 666, 777, 888}
// p := newPacketCaches(1000)
// for i := 0; i < 1100; i++ {
// if !slices.Contains(dropPackets, uint16(i)) {
// p.push(uint16(i), uint32(i), uint16(i))
// }
// }

// if p.tail != 92 {
// t.Error("tail is not 92, got ", p.tail)
// }

// pkt, ok := p.getPacket(999)
// if !ok {
// t.Error("packet not found, expected found")
// }

// if pkt.sequence != 999 {
// t.Error("sequence is not 999, got ", pkt.sequence)
// }

// pkt, ok = p.getPacket(777)
// if ok {
// t.Error("packet found, expected not found")
// }

// pkt, ok = p.getPacketOrBefore(777)
// if !ok {
// t.Error("packet not found, expected found")
// }

// if pkt.sequence != 776 {
// t.Error("sequence is not 776, got ", pkt.sequence)
// }

// }
import (
"slices"
"testing"
)

func TestPush(t *testing.T) {
p := newPacketCaches(1000)
p.Push(1, 0, 0, 0, 0)
p.Push(2, 0, 0, 0, 0)
p.Push(3, 0, 0, 0, 0)

if p.caches.Front().Value.(cachedPacket).sequence != 1 {
t.Error("front is not 1")
}
if p.caches.Back().Value.(cachedPacket).sequence != 3 {
t.Error("back is not 3")
}

}

func TestOverwrite(t *testing.T) {
p := newPacketCaches(1000)
for i := 0; i < 1100; i++ {
p.Push(uint16(i), uint32(i), uint16(i), 0, 0)
}

}

func TestGet(t *testing.T) {
p := newPacketCaches(1000)
for i := 0; i < 1100; i++ {
p.Push(uint16(i), uint32(i), uint16(i), uint8(0), uint8(0))
}

pkt, ok := p.GetPacket(1099)
if !ok {
t.Error("packet not found")
}

if pkt.sequence != 1099 {
t.Error("sequence is not 1099, got ", pkt.sequence)
}

if pkt.timestamp != 1099 {
t.Error("timestamp is not 1099, got ", pkt.timestamp)
}

if pkt.dropCounter != 1099 {
t.Error("dropCounter is not 1099, got ", pkt.dropCounter)
}

pkt, ok = p.GetPacket(1299)
if ok {
t.Error("packet found, expected not found")
}

pkt, ok = p.GetPacket(99)
if ok {
t.Error("packet found, expected not found")
}
}

func TestGetPacketOrBefore(t *testing.T) {
// drop 8 packets
dropPackets := []uint16{65111, 65222, 65333, 65444, 1, 11, 222, 333, 444}
p := newPacketCaches(1000)

for i := 65035; i < 65536; i++ {
if !slices.Contains(dropPackets, uint16(i)) {
p.Push(uint16(i), uint32(i), uint16(i), uint8(0), uint8(0))
}
}

for i := 1; i < 500; i++ {
if !slices.Contains(dropPackets, uint16(i)) {
p.Push(uint16(i), uint32(i), uint16(i), uint8(0), uint8(0))
}
}

pkt, ok := p.GetPacketOrBefore(444)
if !ok {
t.Error("packet not found, expected found")
}

if pkt.sequence != 443 {
t.Error("sequence is not 444, got ", pkt.sequence)
}

pkt, ok = p.GetPacketOrBefore(1)
if !ok {
t.Error("packet not found, expected found")
}

if pkt.sequence != 65535 {
t.Error("sequence is not 65535, got ", pkt.sequence)
}

}

0 comments on commit 0bd8ab7

Please sign in to comment.