Skip to content

Commit

Permalink
congestion: implemented dynamic packet pacing based on network condit…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
cooldogedev committed Oct 11, 2024
1 parent bb265e9 commit 9658cea
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 61 deletions.
20 changes: 10 additions & 10 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newConnection(conn *internal.Conn, connectionID protocol.ConnectionID, pare
ctx: ctx,
cancelFunc: cancelFunc,
cc: congestion.NewCubic(),
pacer: congestion.NewPacer(internal.DefaultRTT, 30),
pacer: congestion.NewPacer(),
ack: newAckQueue(),
receiveQueue: newReceiveQueue(),
retransmission: newRetransmissionQueue(),
Expand Down Expand Up @@ -187,11 +187,11 @@ func (c *connection) receive(sequenceID uint32, frames []frame.Frame) (err error
func (c *connection) handle(fr frame.Frame) (err error) {
switch fr := fr.(type) {
case *frame.Acknowledgement:
var ackBytes float64
var ackBytes uint64
for i, r := range fr.Ranges {
for j := r[0]; j <= r[1]; j++ {
if entry := c.retransmission.remove(j); entry != nil {
ackBytes += float64(len(entry.payload))
ackBytes += uint64(len(entry.payload))
if i == len(fr.Ranges)-1 && j == r[1] {
c.rtt.Add(time.Duration(time.Since(entry.timestamp).Nanoseconds()-fr.Delay) * time.Nanosecond)
}
Expand Down Expand Up @@ -260,18 +260,18 @@ func (c *connection) acknowledge() (err error) {
}

func (c *connection) transmit() (err error) {
rtt := c.rtt.RTT()
sequenceID, pk := c.sendQueue.shift()
cwnd := c.cc.Cwnd()
inFlight := c.cc.InFlight()
c.pacer.SetInterval(time.Duration((float64(c.rtt.RTT()) / cwnd) * (cwnd - inFlight)))
if d := c.pacer.Consume(len(pk)); d > 0 {
time.Sleep(d)
for !c.cc.CanSend(uint64(len(pk))) {
time.Sleep(rtt)
}

for !c.cc.OnSend(float64(len(pk))) {
time.Sleep(c.rtt.RTT())
if d := c.pacer.Delay(rtt, uint64(len(pk)), c.cc.Cwnd()); d > 0 {
time.Sleep(d)
}

c.cc.OnSend(uint64(len(pk)))
c.pacer.OnSend(uint64(len(pk)))
c.retransmission.add(sequenceID, pk)
if _, err := c.conn.Write(pk); err != nil {
return err
Expand Down
46 changes: 21 additions & 25 deletions internal/congestion/cubic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ const (
minWindow = protocol.MaxPacketSize * 2
maxWindow = protocol.MaxPacketSize * 10000

cubicC = 0.7
cubicBeta = 0.4
cubicC = 0.7
)

type Cubic struct {
inFlight float64
cwnd float64
wMax float64
ssthresh float64
inFlight uint64
cwnd uint64
wMax uint64
ssthresh uint64
k float64
epochStart time.Time
mu sync.RWMutex
Expand All @@ -33,21 +33,23 @@ func NewCubic() *Cubic {
return &Cubic{
cwnd: initialWindow,
wMax: initialWindow,
ssthresh: math.Inf(1),
ssthresh: math.MaxUint64,
}
}

func (c *Cubic) OnSend(bytes float64) bool {
func (c *Cubic) CanSend(bytes uint64) bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cwnd-c.inFlight >= bytes
}

func (c *Cubic) OnSend(bytes uint64) {
c.mu.Lock()
defer c.mu.Unlock()
if c.cwnd-c.inFlight >= bytes {
c.inFlight += bytes
return true
}
return false
c.inFlight += bytes
c.mu.Unlock()
}

func (c *Cubic) OnAck(bytes float64) {
func (c *Cubic) OnAck(bytes uint64) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -63,11 +65,11 @@ func (c *Cubic) OnAck(bytes float64) {

if c.epochStart.IsZero() {
c.epochStart = time.Now()
c.k = math.Cbrt(c.wMax * (1.0 - cubicBeta) / cubicC)
c.k = math.Cbrt(float64(c.wMax) * (1.0 - cubicBeta) / cubicC)
}

elapsed := time.Since(c.epochStart).Seconds()
cwnd := cubicC*math.Pow(elapsed-c.k, 3) + c.wMax
cwnd := uint64(cubicC*math.Pow(elapsed-c.k, 3) + float64(c.wMax))
if cwnd > c.cwnd {
c.cwnd = min(cwnd, maxWindow)
}
Expand All @@ -77,25 +79,19 @@ func (c *Cubic) OnLoss() {
c.mu.Lock()
c.inFlight = 0
c.wMax = c.cwnd
c.cwnd = max(c.cwnd*cubicBeta, minWindow)
c.cwnd = max(uint64(float64(c.cwnd)*cubicBeta), minWindow)
c.ssthresh = c.cwnd
c.epochStart = time.Time{}
c.k = math.Cbrt(c.wMax * (1.0 - cubicBeta) / cubicC)
c.k = math.Cbrt(float64(c.wMax) * (1.0 - cubicBeta) / cubicC)
c.mu.Unlock()
}

func (c *Cubic) Cwnd() float64 {
func (c *Cubic) Cwnd() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cwnd
}

func (c *Cubic) InFlight() float64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.inFlight
}

func (c *Cubic) shouldIncreaseWindow() bool {
if c.inFlight >= c.cwnd {
return true
Expand Down
78 changes: 52 additions & 26 deletions internal/congestion/pacer.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,69 @@
package congestion

import "time"
import (
"time"

const bytesPerToken = 512
"github.com/cooldogedev/spectral/internal/protocol"
)

const (
minPacingDelay = time.Millisecond * 2
minBurstSize = 2
maxBurstSize = 10
)

type Pacer struct {
capacity int
tokens int
interval time.Duration
lastRefill time.Time
capacity uint64
tokens uint64
window uint64
rate float64
rtt time.Duration
prev time.Time
}

func NewPacer(interval time.Duration, capacity int) *Pacer {
return &Pacer{
capacity: capacity,
tokens: capacity,
interval: interval,
lastRefill: time.Now(),
}
func NewPacer() *Pacer {
return &Pacer{prev: time.Now()}
}

func (p *Pacer) Consume(bytes int) time.Duration {
now := time.Now()
elapsed := now.Sub(p.lastRefill)
if elapsed >= p.interval {
p.tokens = min(p.tokens+int(elapsed/p.interval), p.capacity)
p.lastRefill = now
func (p *Pacer) Delay(rtt time.Duration, bytes uint64, window uint64) time.Duration {
if window != p.window || rtt != p.rtt {
p.capacity = optimalCapacity(rtt, window)
p.tokens = min(p.tokens, p.capacity)
p.window = window
p.rate = float64(window) / max(rtt.Seconds(), 1)
p.rtt = rtt
}

tokensNeeded := (bytes + bytesPerToken - 1) / bytesPerToken
if p.tokens >= tokensNeeded {
p.tokens -= tokensNeeded
if p.tokens >= bytes {
return 0
}
return time.Duration(tokensNeeded-p.tokens) * p.interval

now := time.Now()
newTokens := p.rate * now.Sub(p.prev).Seconds()
p.tokens = min(p.tokens+uint64(newTokens), p.capacity)
p.prev = now
if p.tokens >= bytes {
return 0
}
delay := time.Duration(float64(bytes-p.tokens) / p.rate * float64(time.Second))
return max(delay, minPacingDelay)
}

func (p *Pacer) OnSend(bytes uint64) {
p.tokens = max(p.tokens-bytes, 0)
}

func optimalCapacity(rtt time.Duration, window uint64) uint64 {
rttNs := max(rtt.Nanoseconds(), 1)
capacity := (window * uint64(minPacingDelay.Nanoseconds())) / uint64(rttNs)
return clamp(capacity, minBurstSize*protocol.MaxPacketSize, maxBurstSize*protocol.MaxPacketSize)
}

func (p *Pacer) SetInterval(interval time.Duration) {
if interval > 0 {
p.interval = interval
func clamp(value, min, max uint64) uint64 {
if value < min {
return min
} else if value > max {
return max
}
return value
}

0 comments on commit 9658cea

Please sign in to comment.