From e91efce4b18cb55e4663f65da4b36d3a718513a6 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Sun, 15 Dec 2024 15:41:34 +0800 Subject: [PATCH] chore: faster hash feature Signed-off-by: iGxnon --- .cargo/config.toml | 4 -- Cargo.toml | 4 +- README.md | 3 +- benches/micro.rs | 5 +- examples/tracing.rs | 2 +- src/codec/decoder/ordered.rs | 6 +-- src/guard.rs | 24 ++++++--- src/lib.rs | 6 +++ src/link.rs | 8 +-- src/packet/connected/ack.rs | 6 ++- src/packet/connected/frame_set.rs | 20 ++++--- src/packet/connected/mod.rs | 3 +- src/packet/mod.rs | 3 +- src/packet/unconnected.rs | 3 +- src/server/handler/offline.rs | 7 ++- src/server/incoming/tokio.rs | 5 +- src/tests.rs | 75 +++++++++++++++++++++++++- src/utils/hash.rs | 87 +++++++++++++++++++++++++++++++ src/utils/mod.rs | 2 + src/utils/reactor.rs | 39 ++++++-------- src/utils/tests.rs | 2 +- 21 files changed, 242 insertions(+), 72 deletions(-) create mode 100644 src/utils/hash.rs diff --git a/.cargo/config.toml b/.cargo/config.toml index 02b3a80..73ac2e5 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,6 +1,2 @@ [target.'cfg(all())'] rustflags = ["--cfg", "tokio_unstable"] - -[alias] -x = "run --package xtask --" -xtask = "run --package xtask --" diff --git a/Cargo.toml b/Cargo.toml index 58abe44..f8d1c07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ lru = "0.12" parking_lot = "0.12" pin-project-lite = "0.2" rand = "0.8" +rustc-hash = { version = "2.1.0", optional = true } serde = { version = "1", features = ["derive"], optional = true } thiserror = "1" tokio = { version = "1", features = ["net", "rt"], optional = true } @@ -39,12 +40,13 @@ tokio = { version = "1", features = ["full"] } default = ["tokio-rt", "serde"] tokio-rt = ["dep:tokio"] serde = ["dep:serde"] +rustc-hash = ["dep:rustc-hash"] # enable faster hash if the network is trusted micro-bench = [] # for benchmark, do not enable it in normal use [[bench]] name = "micro" harness = false -required-features = ["micro-bench"] +required-features = ["micro-bench", "rustc-hash"] [profile.bench] opt-level = 3 diff --git a/README.md b/README.md index de7d22a..5c1a4d9 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Yet another project rewritten in Rust. - Support `Unreliable`, `Reliable` and `ReliableOrdered` packets. - Support multiple order channels. - Support `ACK`/`NACK` mechanism. +- Support message priority for unordered reliability. - Full tracing: - You can track a packet's span during deduplication, fragmentation, ... @@ -84,6 +85,6 @@ let config = client::Config::new() ... let (_, writer) = socket.connect_to(, config).await?; tokio::pin!(writer); -writer.send(Message::new(Reliability::Reliable, 0, Bytes::from_static(b"Hello, Anyone there?"))) +writer.send(Message::new(Bytes::from_static(b"Hello, Anyone there?"))) .await?; ``` diff --git a/benches/micro.rs b/benches/micro.rs index ef8b8b9..4cd2f10 100644 --- a/benches/micro.rs +++ b/benches/micro.rs @@ -20,9 +20,8 @@ pub fn codec_benchmark(c: &mut Criterion) { cnt: usize, throughput: impl Fn(&BenchOpts) -> Throughput, ) { - let datagrams = repeat(Bytes::from_static(datagram)).take(cnt); let opts = micro_bench::codec::BenchOpts { - datagrams: black_box(datagrams.collect()), + datagrams: repeat(Bytes::from_static(datagram)).take(cnt).collect(), seed: 114514, dup_ratio: 0., shuffle_ratio: 0., @@ -33,7 +32,7 @@ pub fn codec_benchmark(c: &mut Criterion) { format!("decode_cnt-{cnt}_size-{}", datagram.len()), |bencher| { bencher.to_async(FuturesExecutor).iter_batched( - || opts.gen_inputs(), + || black_box(opts.gen_inputs()), micro_bench::codec::run_bench, BatchSize::SmallInput, ); diff --git a/examples/tracing.rs b/examples/tracing.rs index a04ef8f..149220e 100644 --- a/examples/tracing.rs +++ b/examples/tracing.rs @@ -103,7 +103,7 @@ fn display(spans: Vec) { .map(|span| (span.span_id, span.clone())) .collect(); let adjacency_lists: HashMap>> = spans.iter().fold( - std::collections::HashMap::new(), + HashMap::new(), |mut map, SpanRecord { trace_id, diff --git a/src/codec/decoder/ordered.rs b/src/codec/decoder/ordered.rs index 80d4d97..fff514c 100644 --- a/src/codec/decoder/ordered.rs +++ b/src/codec/decoder/ordered.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::pin::Pin; use std::task::{ready, Context, Poll}; @@ -10,8 +9,7 @@ use pin_project_lite::pin_project; use crate::errors::CodecError; use crate::packet::connected::{self, Frame, FrameSet}; use crate::utils::u24; - -const INITIAL_ORDERING_MAP_CAP: usize = 64; +use crate::HashMap; struct Ordering { map: HashMap>, @@ -21,7 +19,7 @@ struct Ordering { impl Default for Ordering { fn default() -> Self { Self { - map: HashMap::with_capacity(INITIAL_ORDERING_MAP_CAP), + map: HashMap::default(), read: 0.into(), } } diff --git a/src/guard.rs b/src/guard.rs index c9ff700..710e601 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -1,4 +1,4 @@ -use std::collections::{BinaryHeap, HashMap, VecDeque}; +use std::collections::{BinaryHeap, VecDeque}; use std::net::SocketAddr; use std::pin::Pin; use std::task::{ready, Context, Poll}; @@ -14,16 +14,24 @@ use crate::link::SharedLink; use crate::opts::FlushStrategy; use crate::packet::connected::{self, AckOrNack, Frame, FrameSet, Frames, FramesRef, Record}; use crate::packet::{Packet, FRAME_SET_HEADER_SIZE}; -use crate::utils::{u24, ConnId, Reactor}; -use crate::{Peer, Priority, Role}; +use crate::utils::{combine_hashes, u24, Reactor}; +use crate::{HashMap, Peer, Priority, Role}; // A frame with penalty -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] struct PenaltyFrame { penalty: u8, frame: Frame, } +impl PartialEq for PenaltyFrame { + fn eq(&self, other: &Self) -> bool { + self.penalty == other.penalty + } +} + +impl Eq for PenaltyFrame {} + impl PartialOrd for PenaltyFrame { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) @@ -431,7 +439,7 @@ struct ResendMap { impl ResendMap { fn new(role: Role, peer: Peer, estimator: Box) -> Self { Self { - map: HashMap::new(), + map: HashMap::default(), role, peer, last_record_expired_at: Instant::now(), @@ -593,15 +601,15 @@ impl ResendMap { } else { return Poll::Ready(()); } - let c_id = ConnId::new(self.role.guid(), self.peer.guid); + let key = combine_hashes(self.role.guid(), self.peer.guid); trace!( - "[{}] wait on {c_id:?} for resend seq_num {} to {} within {:?}", + "[{}] wait on timer {key} for resend seq_num {} to {} within {:?}", self.role, seq_num, self.peer, expired_at - now ); - Reactor::get().insert_timer(c_id, expired_at, cx.waker()); + Reactor::get().insert_timer(key, expired_at, cx.waker()); Poll::Pending } } diff --git a/src/lib.rs b/src/lib.rs index 154f3c9..495bed4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,12 @@ pub mod micro_bench { } } +#[cfg(feature = "rustc-hash")] +pub type HashMap = std::collections::HashMap; + +#[cfg(not(feature = "rustc-hash"))] +pub type HashMap = std::collections::HashMap; + /// Unit tests #[cfg(test)] mod tests; diff --git a/src/link.rs b/src/link.rs index b9e4358..7e0f52d 100644 --- a/src/link.rs +++ b/src/link.rs @@ -11,7 +11,7 @@ use log::{debug, warn}; use crate::packet::connected::{self, AckOrNack, FrameBody, FrameSet, FramesMut}; use crate::packet::unconnected; -use crate::utils::{u24, ConnId, Reactor}; +use crate::utils::{combine_hashes, u24, Reactor}; use crate::{Peer, Role}; /// Shared link between stream and sink @@ -92,15 +92,15 @@ impl TransferLink { } // wake up after receiving an ack if self.should_waking() { - let c_id = ConnId::new(self.role.guid(), self.peer.guid); + let key = combine_hashes(self.role.guid(), self.peer.guid); let mut cnt = 0; - for waker in Reactor::get().cancel_all_timers(c_id) { + for waker in Reactor::get().cancel_all_timers(key) { // safe to panic waker.wake(); cnt += 1; } debug!( - "[{}] wake up {cnt} wakers after receives ack on connection: {c_id:?}", + "[{}] wake up {cnt} wakers after receives ack on timer {key}", self.role ); } diff --git a/src/packet/connected/ack.rs b/src/packet/connected/ack.rs index 4c64dfc..53d66af 100644 --- a/src/packet/connected/ack.rs +++ b/src/packet/connected/ack.rs @@ -4,7 +4,8 @@ use crate::errors::CodecError; use crate::packet::read_buf; use crate::utils::{u24, BufExt, BufMutExt}; -#[derive(PartialEq, Clone)] +#[derive(Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) struct AckOrNack { pub(crate) records: Vec, } @@ -123,7 +124,8 @@ impl AckOrNack { const RECORD_RANGE: u8 = 0; const RECORD_SINGLE: u8 = 1; -#[derive(PartialEq, Clone)] +#[derive(Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) enum Record { Range(u24, u24), Single(u24), diff --git a/src/packet/connected/frame_set.rs b/src/packet/connected/frame_set.rs index d51b0d4..0afc525 100644 --- a/src/packet/connected/frame_set.rs +++ b/src/packet/connected/frame_set.rs @@ -19,7 +19,8 @@ pub(crate) type FramesMut = Vec>; pub(crate) type FrameMut = Frame; -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) struct FrameSet { pub(crate) seq_num: u24, pub(crate) set: S, @@ -51,7 +52,8 @@ impl<'a> FrameSet> { } } -#[derive(PartialEq, Eq, Clone)] +#[derive(Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) struct Frame { pub(crate) flags: Flags, pub(crate) reliable_frame_index: Option, @@ -185,12 +187,13 @@ impl Frame { /// Top 3 bits are reliability type, fourth bit is 1 when the frame is fragmented and part of a /// compound. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) struct Flags { raw: u8, pub(crate) reliability: Reliability, pub(crate) parted: bool, - needs_bas: bool, + // needs_bas: bool, } impl Flags { @@ -204,7 +207,6 @@ impl Flags { raw, reliability, parted, - needs_bas: true, } } @@ -225,12 +227,12 @@ impl Flags { raw, reliability: unsafe { std::mem::transmute::(r) }, parted: raw & PARTED_FLAG != 0, - needs_bas: raw & NEEDS_B_AND_AS_FLAG != 0, } } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) struct Fragment { pub(crate) parted_size: u32, pub(crate) parted_id: u16, @@ -253,7 +255,8 @@ impl Fragment { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) struct Ordered { pub(crate) frame_index: u24, pub(crate) channel: u8, @@ -277,6 +280,7 @@ impl Ordered { const MAX_SYSTEM_ADDRESSES_ENDPOINTS: usize = 20; #[derive(Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) enum FrameBody { ConnectedPing { client_timestamp: i64, diff --git a/src/packet/connected/mod.rs b/src/packet/connected/mod.rs index a8ce144..512c5ab 100644 --- a/src/packet/connected/mod.rs +++ b/src/packet/connected/mod.rs @@ -12,7 +12,8 @@ pub(crate) use frame_set::*; use super::{ACK_FLAG, CONTINUOUS_SEND_FLAG, NACK_FLAG, NEEDS_B_AND_AS_FLAG, VALID_FLAG}; // Packet when RakNet has established a connection -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) enum Packet { FrameSet(FrameSet), Ack(AckOrNack), diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 9ba8478..5dfe494 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -140,7 +140,8 @@ impl PackType { } /// Raknet packet -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) enum Packet { Unconnected(unconnected::Packet), Connected(connected::Packet), diff --git a/src/packet/unconnected.rs b/src/packet/unconnected.rs index e4f6111..770455a 100644 --- a/src/packet/unconnected.rs +++ b/src/packet/unconnected.rs @@ -6,7 +6,8 @@ use crate::errors::CodecError; use crate::packet::{read_buf, MagicRead, MagicWrite, PackType, SocketAddrRead, SocketAddrWrite}; /// Request sent before establishing a connection -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) enum Packet { UnconnectedPing { send_timestamp: i64, diff --git a/src/server/handler/offline.rs b/src/server/handler/offline.rs index 7b5892c..c242210 100644 --- a/src/server/handler/offline.rs +++ b/src/server/handler/offline.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::io; use std::net::SocketAddr; use std::num::NonZeroUsize; @@ -14,7 +13,7 @@ use pin_project_lite::pin_project; use crate::packet::connected::{self, FramesMut}; use crate::packet::{unconnected, Packet}; -use crate::{Peer, Role}; +use crate::{HashMap, Peer, Role}; #[derive(Debug, Clone)] pub(crate) struct Config { @@ -59,7 +58,7 @@ pin_project! { // and it will be popped out during the OpenConnectionRequest2 // or when the connection is disconnected. pending: lru::LruCache, - // A `HashMap` that caches connections + // A hashmap that caches connections // in the OpenConnectionRequest2 stage and is cleaned up on disconnection. // The `connected` map is used to check if a `Peer` has completed the connection // from the socket. @@ -85,7 +84,7 @@ where guid: config.sever_guid, }, config, - connected: HashMap::new(), + connected: HashMap::default(), state: OfflineState::Listening, read_span: None, } diff --git a/src/server/incoming/tokio.rs b/src/server/incoming/tokio.rs index 959ea67..58f99e4 100644 --- a/src/server/incoming/tokio.rs +++ b/src/server/incoming/tokio.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::io; use std::net::SocketAddr; use std::pin::Pin; @@ -24,7 +23,7 @@ use crate::server::handler::offline::OfflineHandler; use crate::server::handler::online::HandleOnline; use crate::state::{CloseOnDrop, IncomingStateManage, OutgoingStateManage}; use crate::utils::{Logged, TraceStreamExt}; -use crate::Message; +use crate::{HashMap, Message}; pin_project! { struct Incoming { @@ -55,7 +54,7 @@ impl MakeIncoming for TokioUdpSocket { ), socket, config, - router: HashMap::new(), + router: HashMap::default(), close_events: Arc::new(ConcurrentQueue::unbounded()), } } diff --git a/src/tests.rs b/src/tests.rs index 1f95511..4767c4e 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,5 +1,6 @@ use std::future::poll_fn; use std::iter::repeat; +use std::sync::{Arc, Mutex}; use std::task::ContextBuilder; use std::time::Duration; @@ -12,7 +13,7 @@ use crate::client::{self, ConnectTo}; use crate::opts::FlushStrategy; use crate::server::{self, MakeIncoming}; use crate::utils::tests::test_trace_log_setup; -use crate::Message; +use crate::{Message, Priority, Reliability}; impl From for Message { fn from(data: Bytes) -> Self { @@ -257,3 +258,75 @@ async fn test_flush_strategy_works() { Bytes::from_iter(repeat(0xfe).take(256)) ); } + +#[tokio::test(unhandled_panic = "shutdown_runtime")] +async fn test_message_priority_works() { + let _guard = test_trace_log_setup(); + + let recv = Arc::new(Mutex::new(Vec::new())); + let recv_p = recv.clone(); + let server = async move { + let mut incoming = UdpSocket::bind("0.0.0.0:19135") + .await + .unwrap() + .make_incoming(make_server_conf()); + loop { + let recv_c = recv_p.clone(); + let (reader, sender) = incoming.next().await.unwrap(); + tokio::spawn(async move { + tokio::pin!(reader); + tokio::pin!(sender); + let mut ticker = tokio::time::interval(Duration::from_millis(5)); + loop { + tokio::select! { + Some(data) = reader.next() => { + recv_c.lock().unwrap().push(data); + } + _ = ticker.tick() => { + sender.flush().await.unwrap(); + } + }; + } + }); + } + }; + + tokio::spawn(server); + + let client = async move { + let (_, dst) = UdpSocket::bind("0.0.0.0:0") + .await + .unwrap() + .connect_to("127.0.0.1:19135", make_client_conf()) + .await + .unwrap(); + + tokio::pin!(dst); + + dst.feed( + Message::new(Bytes::from_iter(repeat(0xfe).take(256))) + .reliability(Reliability::Reliable), + ) + .await + .unwrap(); + dst.feed( + Message::new(Bytes::from_iter(repeat(0xfe).take(512))) + .priority(Priority::High(0)) + .reliability(Reliability::Reliable), + ) + .await + .unwrap(); + + dst.flush().await.unwrap(); + + while recv.lock().unwrap().len() < 2 { + tokio::time::sleep(Duration::from_millis(10)).await; + } + + assert_eq!(recv.lock().unwrap().len(), 2); + assert_eq!(recv.lock().unwrap()[0].len(), 512); + assert_eq!(recv.lock().unwrap()[1].len(), 256); + }; + + tokio::spawn(client).await.unwrap(); +} diff --git a/src/utils/hash.rs b/src/utils/hash.rs new file mode 100644 index 0000000..cf1b0ff --- /dev/null +++ b/src/utils/hash.rs @@ -0,0 +1,87 @@ +use std::hash::{BuildHasher, Hasher}; + +// Only support integer key with maximum 64 bits width. +#[derive(Debug, Clone, Copy, Default)] +pub(crate) struct NoHashBuilder; + +impl BuildHasher for NoHashBuilder { + type Hasher = NoHashHasher; + + fn build_hasher(&self) -> Self::Hasher { + NoHashHasher(0) + } +} + +#[derive(Debug, Clone, Copy)] +pub(crate) struct NoHashHasher(u64); + +impl Hasher for NoHashHasher { + fn finish(&self) -> u64 { + self.0 + } + + fn write_u8(&mut self, i: u8) { + self.0 = u64::from(i); + } + + fn write_u16(&mut self, i: u16) { + self.0 = u64::from(i); + } + + fn write_u32(&mut self, i: u32) { + self.0 = u64::from(i); + } + + fn write_u64(&mut self, i: u64) { + self.0 = i; + } + + fn write_usize(&mut self, i: usize) { + self.0 = i as u64; + } + + fn write_i8(&mut self, i: i8) { + self.0 = i as u64; + } + + fn write_i16(&mut self, i: i16) { + self.0 = i as u64; + } + + fn write_i32(&mut self, i: i32) { + self.0 = i as u64; + } + + fn write_i64(&mut self, i: i64) { + self.0 = i as u64; + } + + fn write_isize(&mut self, i: isize) { + self.0 = i as u64; + } + + fn write_u128(&mut self, _: u128) { + unimplemented!("unsupported") + } + + fn write_i128(&mut self, _: i128) { + unimplemented!("unsupported") + } + + fn write(&mut self, _: &[u8]) { + unimplemented!("unsupported") + } +} + +// From Google's city hash. +#[inline(always)] +pub(crate) fn combine_hashes(upper: u64, lower: u64) -> u64 { + const MUL: u64 = 0x9ddfea08eb382d69; + + let mut a = (lower ^ upper).wrapping_mul(MUL); + a ^= a >> 47; + let mut b = (upper ^ a).wrapping_mul(MUL); + b ^= b >> 47; + b = b.wrapping_mul(MUL); + b +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index cd909a0..4e2d866 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,11 +1,13 @@ mod bit_queue; mod fastrace; +mod hash; mod log; mod reactor; mod seq_num; pub(crate) use self::bit_queue::*; pub(crate) use self::fastrace::*; +pub(crate) use self::hash::*; pub(crate) use self::log::*; pub(crate) use self::reactor::*; pub(crate) use self::seq_num::*; diff --git a/src/utils/reactor.rs b/src/utils/reactor.rs index fafec3f..18ca815 100644 --- a/src/utils/reactor.rs +++ b/src/utils/reactor.rs @@ -1,28 +1,21 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; use std::sync::OnceLock; use std::task::Waker; use std::time::{Duration, Instant}; use std::{mem, panic, thread}; +use super::NoHashBuilder; +use crate::HashMap; + /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting /// the timer. type Timers = BTreeMap<(Instant, usize), Waker>; -/// A distinct identifier for a connection. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub(crate) struct ConnId(u64, u64); - -impl ConnId { - pub(crate) fn new(from_guid: u64, to_guid: u64) -> Self { - ConnId(from_guid, to_guid) - } -} - /// A reactor that manages timers. pub(crate) struct Reactor { - /// Map of registered timers, distinguished by their connection id. - conn_timers: parking_lot::Mutex>, + /// Map of registered timers, distinguished by their unique timer key. + conn_timers: parking_lot::Mutex>, /// A condvar used to wake up the reactor when timers changed. cond: parking_lot::Condvar, } @@ -46,25 +39,23 @@ impl Reactor { .expect("cannot spawn timer-reactor thread"); Reactor { - conn_timers: parking_lot::Mutex::new(HashMap::new()), + conn_timers: parking_lot::Mutex::new(HashMap::default()), cond: parking_lot::Condvar::new(), } }) } - /// Insert a timer with the given `guid` and `when` to fire. - pub(crate) fn insert_timer(&self, c_id: ConnId, when: Instant, waker: &Waker) { + pub(crate) fn insert_timer(&self, key: u64, when: Instant, waker: &Waker) { let mut timers = self.conn_timers.lock(); - let timers = timers.entry(c_id).or_default(); + let timers = timers.entry(key).or_default(); timers.insert((when, timers.len()), waker.clone()); self.cond.notify_one(); } - /// Cancel all timers with the given `guid`. - pub(crate) fn cancel_all_timers(&self, c_id: ConnId) -> impl Iterator { + pub(crate) fn cancel_all_timers(&self, key: u64) -> impl Iterator { let mut timers = self.conn_timers.lock(); let res = timers - .remove(&c_id) + .remove(&key) .into_iter() .flat_map(BTreeMap::into_values); self.cond.notify_one(); @@ -121,16 +112,16 @@ mod test { let when = Instant::now() + dur; { let (waker, test) = TestWaker::pair(); - reactor.insert_timer(ConnId(1, 1), when, &waker); - assert_eq!(reactor.cancel_all_timers(ConnId(1, 1)).count(), 1); + reactor.insert_timer(1, when, &waker); + assert_eq!(reactor.cancel_all_timers(1).count(), 1); assert!(!test.woken.load(std::sync::atomic::Ordering::Relaxed)); } { let (waker, test) = TestWaker::pair(); - reactor.insert_timer(ConnId(2, 2), when, &waker); + reactor.insert_timer(2, when, &waker); std::thread::sleep(dur + Duration::from_millis(10)); - assert_eq!(reactor.cancel_all_timers(ConnId(2, 2)).count(), 0); + assert_eq!(reactor.cancel_all_timers(2).count(), 0); assert!(test.woken.load(std::sync::atomic::Ordering::Relaxed)); } } diff --git a/src/utils/tests.rs b/src/utils/tests.rs index 5350afb..92f9860 100644 --- a/src/utils/tests.rs +++ b/src/utils/tests.rs @@ -21,7 +21,7 @@ impl Drop for TestTraceLogGuard { .map(|span| (span.span_id, span.clone())) .collect(); let adjacency_lists: HashMap>> = spans.iter().fold( - std::collections::HashMap::new(), + HashMap::new(), |mut map, SpanRecord { trace_id,