Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ainghazal committed Feb 2, 2024
1 parent 815cf6e commit d32996b
Showing 1 changed file with 55 additions and 51 deletions.
106 changes: 55 additions & 51 deletions internal/reliabletransport/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,64 +59,68 @@ func (ws *workersState) moveDownWorker() {
// nearestDeadlineTo(now) ensures that we do not receive a time before now, and
// that increments the passed moment by an epsilon if all deadlines are expired,
// so it should be safe to reset the ticker with that timeout.
now := time.Now()
timeout := inflightSequence(sender.inFlight).nearestDeadlineTo(now)

ticker.Reset(timeout.Sub(now))
scheduledNow := inflightSequence(sender.inFlight).readyToSend(now)

if len(scheduledNow) > 0 {
// we flush everything that is ready to be sent.
for _, p := range scheduledNow {
p.ScheduleForRetransmission(now)

// append any pending ACKs
p.packet.ACKs = sender.NextPacketIDsToACK()

// log the packet
p.packet.Log(ws.logger, model.DirectionOutgoing)

select {
case ws.dataOrControlToMuxer <- p.packet:
case <-ws.workersManager.ShouldShutdown():
return
}
}
} else {
if !sender.hasPendingACKs() {
continue
}

// special case, we want to send the clientHello as soon as possible -----------------------------
// (TODO: coordinate this with hardReset)

/*
// TODO is this doing the right thing?
if sender.pendingACKsToSend.Len() == 1 && *sender.pendingACKsToSend.first() == model.PacketID(0) {
continue
}
*/

ws.logger.Debugf("Creating ACK: %d pending to ack", sender.pendingACKsToSend.Len())

ACK, err := ws.sessionManager.NewACKForPacketIDs(sender.NextPacketIDsToACK())
if err != nil {
ws.logger.Warnf("%s: cannot create ack: %v", workerName, err.Error())
}
ACK.Log(ws.logger, model.DirectionOutgoing)
select {
case ws.dataOrControlToMuxer <- ACK:
case <-ws.workersManager.ShouldShutdown():
return
}
}
ws.blockOnTryingToSend(sender, ticker)

case <-ws.workersManager.ShouldShutdown():
return
}
}
}

func (ws *workersState) blockOnTryingToSend(sender *reliableSender, ticker *time.Ticker) {
now := time.Now()
timeout := inflightSequence(sender.inFlight).nearestDeadlineTo(now)

ticker.Reset(timeout.Sub(now))
scheduledNow := inflightSequence(sender.inFlight).readyToSend(now)

if len(scheduledNow) > 0 {
// we flush everything that is ready to be sent.
for _, p := range scheduledNow {
p.ScheduleForRetransmission(now)

// append any pending ACKs
p.packet.ACKs = sender.NextPacketIDsToACK()

// log the packet
p.packet.Log(ws.logger, model.DirectionOutgoing)

select {
case ws.dataOrControlToMuxer <- p.packet:
case <-ws.workersManager.ShouldShutdown():
return
}
}
return
}
if !sender.hasPendingACKs() {
return
}

// special case, we want to send the clientHello as soon as possible -----------------------------
// (TODO: coordinate this with hardReset)

/*
// TODO is this doing the right thing?
if sender.pendingACKsToSend.Len() == 1 && *sender.pendingACKsToSend.first() == model.PacketID(0) {
continue
}
*/

ws.logger.Debugf("Creating ACK: %d pending to ack", sender.pendingACKsToSend.Len())

ACK, err := ws.sessionManager.NewACKForPacketIDs(sender.NextPacketIDsToACK())
if err != nil {
ws.logger.Warnf("moveDownWorker: tryToSend: cannot create ack: %v", err.Error())
}
ACK.Log(ws.logger, model.DirectionOutgoing)
select {
case ws.dataOrControlToMuxer <- ACK:
case <-ws.workersManager.ShouldShutdown():
return
}
}

// reliableSender keeps state about the in flight packet queue, and implements outgoingPacketHandler.
// Please use the constructor `newReliableSender()`
type reliableSender struct {
Expand Down

0 comments on commit d32996b

Please sign in to comment.