Skip to content

Commit

Permalink
improve packet manager allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohan Totting committed Jul 31, 2024
1 parent 2c8179c commit bec71a9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
34 changes: 22 additions & 12 deletions pkg/rtppool/packetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,27 @@ import (

var (
errPacketReleased = errors.New("packet has been released")
errFailedToCastPacketPool = errors.New("failed to cast packet pool")
errFailedToCastHeaderPool = errors.New("failed to cast header pool")
errFailedToCastPayloadPool = errors.New("failed to cast payload pool")
)

const maxPayloadLen = 1460

type PacketManager struct {
PacketPool *sync.Pool
HeaderPool *sync.Pool
PayloadPool *sync.Pool
}

func newPacketManager() *PacketManager {
return &PacketManager{
PacketPool: &sync.Pool{
New: func() interface{} {
return &RetainablePacket{}
},
},

HeaderPool: &sync.Pool{
New: func() interface{} {
return &rtp.Header{}
Expand All @@ -45,17 +53,20 @@ func (m *PacketManager) NewPacket(header *rtp.Header, payload []byte) (*Retainab
return nil, io.ErrShortBuffer
}

p := &RetainablePacket{
onRelease: m.releasePacket,
// new packets have retain count of 1
count: 1,
addedTime: time.Now(),
var ok bool

p, ok := m.PacketPool.Get().(*RetainablePacket)
if !ok {
return nil, errFailedToCastPacketPool
}

p.onRelease = m.releasePacket
p.count = 1
p.addedTime = time.Now()

p.mu.Lock()
defer p.mu.Unlock()

var ok bool
p.header, ok = m.HeaderPool.Get().(*rtp.Header)
if !ok {
return nil, errFailedToCastHeaderPool
Expand All @@ -76,15 +87,17 @@ func (m *PacketManager) NewPacket(header *rtp.Header, payload []byte) (*Retainab
return p, nil
}

func (m *PacketManager) releasePacket(header *rtp.Header, payload *[]byte) {
func (m *PacketManager) releasePacket(header *rtp.Header, payload *[]byte, p *RetainablePacket) {
m.HeaderPool.Put(header)
if payload != nil {
m.PayloadPool.Put(payload)
}

m.PacketPool.Put(p)
}

type RetainablePacket struct {
onRelease func(*rtp.Header, *[]byte)
onRelease func(*rtp.Header, *[]byte, *RetainablePacket)
mu sync.RWMutex
count int

Expand Down Expand Up @@ -133,9 +146,6 @@ func (p *RetainablePacket) Release() {

if p.count == 0 {
// release back to pool
p.onRelease(p.header, p.buffer)
p.header = nil
p.buffer = nil
p.payload = nil
p.onRelease(p.header, p.buffer, p)
}
}
11 changes: 11 additions & 0 deletions pkg/rtppool/rtppool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ var testPacket = &rtp.Packet{
Header: rtp.Header{},
Payload: make([]byte, 1400),
}
var header = &rtp.Header{}
var payload = make([]byte, 1400)

func BenchmarkSlicePool(b *testing.B) {

Expand All @@ -22,3 +24,12 @@ func BenchmarkSlicePool(b *testing.B) {
ResetPacketPoolAllocation(p)
}
}

func BenchmarkPacketManager(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
p, _ := rtpPacketPool.PacketManager.NewPacket(header, payload)

p.Release()
}
}

0 comments on commit bec71a9

Please sign in to comment.