Skip to content

Commit

Permalink
fix process nack
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 31, 2024
1 parent 92382d7 commit 727e703
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ where
);
if !frames.is_empty() {
trace!(
"[{}] send frames to {}, seq_num: {}, reliable: {}, first data byte: 0x{:02x}, data size: {}B, actual size: {}B",
"[{}] send frames to {}, seq_num: {}, reliable: {}, first data byte: 0x{:02x}, data size: {}, actual size: {}",
this.role,
this.peer,
*this.seq_num_write_index,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#![feature(context_ext)]
#![feature(local_waker)]
#![feature(option_get_or_insert_default)]
#![feature(binary_heap_drain_sorted)]

/// Protocol codec
mod codec;
Expand Down
40 changes: 19 additions & 21 deletions src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,6 @@ pub(crate) struct TransferLink {
peer: Peer,
}

/// Pop priority queue while holding the lock
struct BatchRecv<'a, T> {
guard: parking_lot::MutexGuard<'a, BinaryHeap<Reverse<T>>>,
}

impl<'a, T> BatchRecv<'a, T> {
fn new(guard: parking_lot::MutexGuard<'a, BinaryHeap<Reverse<T>>>) -> Self {
Self { guard }
}
}

impl<'a, T: Ord> Iterator for BatchRecv<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.guard.pop().map(|v| v.0)
}
}

impl TransferLink {
pub(crate) fn new_arc(role: Role, peer: Peer) -> SharedLink {
// avoiding ack flood, the overwhelming ack will be dropped and new ack will be displaced
Expand Down Expand Up @@ -153,11 +134,28 @@ impl TransferLink {
}

pub(crate) fn process_outgoing_ack(&self, mtu: u16) -> Option<AckOrNack> {
AckOrNack::extend_from(BatchRecv::new(self.outgoing_ack.lock()), mtu)
AckOrNack::extend_from(self.outgoing_ack.lock().drain_sorted().map(|v| v.0), mtu)
}

pub(crate) fn process_outgoing_nack(&self, mtu: u16) -> Option<AckOrNack> {
AckOrNack::extend_from(self.outgoing_nack.lock().iter().copied(), mtu)
struct BatchRecv<'a> {
guard: parking_lot::MutexGuard<'a, BTreeSet<u24>>,
}

impl<'a> Iterator for BatchRecv<'a> {
type Item = u24;

fn next(&mut self) -> Option<Self::Item> {
self.guard.pop_first()
}
}

AckOrNack::extend_from(
BatchRecv {
guard: self.outgoing_nack.lock(),
},
mtu,
)
}

pub(crate) fn process_unconnected(&self) -> impl Iterator<Item = unconnected::Packet> + '_ {
Expand Down

0 comments on commit 727e703

Please sign in to comment.