diff --git a/.vscode/settings.json b/.vscode/settings.json
index f89435241..31a66246a 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,4 +1,5 @@
{
+ "rust-analyzer.cargo.features": "all",
"todo-tree.filtering.excludeGlobs": [
"Cargo.lock",
"**/target"
diff --git a/README.md b/README.md
index 5253331d1..55a5b07ae 100644
--- a/README.md
+++ b/README.md
@@ -2,9 +2,10 @@
Stardust is a flexible networking crate built for Bevy, with a focus on extensibility and parallelism.
-![License](https://img.shields.io/badge/license-MIT_or_Apache_2.0-green)
-[![Bevy version](https://img.shields.io/badge/bevy-0.13-blue?color=blue)](https://bevyengine.org/)
-[![Crates.io](https://img.shields.io/crates/v/bevy_stardust)](https://crates.io/crates/bevy_stardust)
+[![License](https://img.shields.io/badge/license-MIT_or_Apache_2.0-green)](#license)
+[![Bevy version](https://img.shields.io/badge/bevy-0.14-blue?color=blue)](https://bevyengine.org/)
+[![crates.io](https://img.shields.io/crates/v/bevy_stardust)](https://crates.io/crates/bevy_stardust)
+[![docs.rs](https://img.shields.io/docsrs/bevy_stardust)](https://docs.rs/bevy_stardust/latest/bevy_stardust/)
## Why Stardust?
### ECS-integrated
@@ -24,17 +25,10 @@ You can use any transport layer you want. Use UDP, TCP, QUIC, HTTP, some homebre
You can use any replication or extra features you want. If you prefer a specific crate for replication, it's really easy to integrate it into Stardust, as long as it has some kind of API for taking in and outputting bytes.
-## Planned extensions
-The following features are planned to be created as additional crates, as part of the overall project.
-
-- Replication plugin
-- UDP, QUIC, and WebTransport plugins
-- Real time voice plugin
-
## Usage
| Bevy | Stardust |
| ---- | -------- |
-| 0.13 | 0.5 |
+| 0.14 | 0.6 |
| 0.12 | 0.2 |
| 0.11 | 0.1 |
@@ -45,18 +39,13 @@ The following features are planned to be created as additional crates, as part o
**A simple example project:**
```rust
-// This example assumes that you don't have the reflect feature flag.
-// If you do, make sure your channel types implement TypePath.
-// Additionally, spawning NetworkPeer entities is handled by transport layer plugins.
-// For the purpose of this example, we'll assume they magically appeared somehow.
-
use std::any::TypeId;
-use bevy_ecs::prelude::*;
-use bevy_app::{prelude::*, ScheduleRunnerPlugin, MainSchedulePlugin};
+use bevy::{prelude::*, app::{ScheduleRunnerPlugin, MainSchedulePlugin}};
use bevy_stardust::prelude::*;
// Channels are accessed with types in the type system.
-// Simply put, you just need to create simple types like this.
+// Any type that implements Any is usable in Stardust.
+// Simply put, you just need to create a field-less struct like this.
// You can use Rust's privacy system to control channel access.
struct MyChannel;
@@ -73,16 +62,9 @@ fn main() {
// Once you do this, it becomes visible in the ChannelRegistry.
// The ChannelRegistry is effectively a giant table of every registered channel.
app.add_channel::(ChannelConfiguration {
- // 'Reliable' messages will be detected if lost.
- reliable: ReliabilityGuarantee::Reliable,
-
- // 'Ordered' messages will be received in the same order they're sent.
- ordered: OrderingGuarantee::Ordered,
-
- // 'Fragmentable' messages will be broken up for transmission if need be.
- // This is actually just a flag to say that the messages *might* need to be fragmented.
- // Whether or not things are fragmented is up to the transport layer.
- fragmented: true,
+ // Controls the reliability and ordering of messages.
+ // Read the documentation for MessageConsistency for a full explanation.
+ consistency: MessageConsistency::ReliableOrdered,
// Higher priority messages will be sent before others.
priority: 0,
@@ -99,27 +81,31 @@ fn main() {
app.add_systems(Update, (send_words_system, read_words_system));
}
-// Messages use the Bytes type.
-// This is cheaply clonable and you can send the same message to multiple peers.
-// For this example, we create one from the bytes of a static str.
-const MESSAGE: Bytes = Bytes::from_static("Hello, world!".as_bytes());
+// Messages use the Message type, which is a wrapper around the Bytes type.
+// This is cheaply clonable and you can send the same message to multiple peers without copying.
+// Here, we simply use the from_static_str method, which is very cheap.
+const MESSAGE: Message = Message::from_static_str("Hello, world!");
// Queueing messages just requires component access.
// This means you can use query filters to achieve better parallelism.
fn send_words_system(
- registry: ChannelRegistry,
- mut query: Query<(Entity, &mut NetworkMessages), With>
+ channels: Channels,
+ mut query: Query<(Entity, &mut PeerMessages), With>
) {
// The ChannelId must be retrieved from the registry.
// These are more friendly to store since they're just numbers.
// You can cache them if you want, as long as they aren't used in different Worlds.
- let channel = registry.channel_id(TypeId::of::()).unwrap();
+ let channel = channels.id(TypeId::of::()).unwrap();
// You can also iterate in parallel, if you have a lot of things.
for (entity, mut outgoing) in query.iter_mut() {
// Bytes objects are cheaply clonable, reference counted storages.
// You can send them to as many peers as you want once created.
- outgoing.push(channel, MESSAGE);
+ outgoing.push_one(ChannelMessage {
+ channel,
+ message: MESSAGE,
+ });
+
println!("Sent a message to {entity:?}");
}
}
@@ -128,27 +114,38 @@ fn send_words_system(
// The reading queue is a different component from the sending queue.
// This means you can read and send bytes in parallel, or in different systems.
fn read_words_system(
- registry: ChannelRegistry,
- query: Query<(Entity, &NetworkMessages), With>
+ channels: Channels,
+ query: Query<(Entity, &PeerMessages), With>
) {
- let channel = registry.channel_id(TypeId::of::()).unwrap();
+ let channel = channels.id(TypeId::of::()).unwrap();
for (entity, incoming) in query.iter() {
- let messages = incoming.channel_queue(channel);
- for message in messages.iter() {
+ for message in incoming.iter_channel(channel) {
// Stardust only outputs bytes, so you need to convert to the desired type.
- // Also, in real products, don't unwrap, write checks. Never trust user data.
- let string = std::str::from_utf8(&*message).unwrap();
+ // We unwrap here for the sake of an example. In real code, you should
+ // program defensively, and handle error cases appropriately.
+ let string = message.as_str().unwrap();
println!("Received a message from {entity:?}: {string:?}");
}
}
}
```
-Available feature flags:
-- `reflect`: Adds `Reflect` to the `Channel` supertrait
-- `hashing`: Allows hashing Stardust-related data
+## Related crates
+### Existing
+The following crates are parts of the project that are out of scope for the `bevy_stardust` crate, and are distributed separately, such as transport layers.
+
+| Crate | Description |
+|------------------------|-----------------------------|
+| `bevy_stardust_extras` | A collection of misc. tools |
+
+### Planned
+The following crates are planned to be implemented as part of the overall project, but aren't done yet. They're also too significant or too different to end up in `bevy_stardust` or `bevy_stardust_extras`.
-**Please note:** The `hashing` feature flag is dependent on `gxhash`, which will not compile on targets without AES intrinsics. It's made available for local testing, but will break in production. See the [tracking issue](https://github.com/Veritius/bevy_stardust/issues/31) for more.
+| Crate | Description |
+|---------------------------|--------------------------|
+| `bevy_stardust_quic` | QUIC transport layer |
+| `bevy_stardust_voip` | Voice chat plugin |
+| `bevy_stardust_replicate` | State replication plugin |
## License
bevy_stardust is free and open source software. It's licensed under:
diff --git a/extras/Cargo.toml b/extras/Cargo.toml
new file mode 100644
index 000000000..8addce91a
--- /dev/null
+++ b/extras/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name="bevy_stardust_extras"
+version="0.1.0"
+edition="2021"
+authors=["Veritius "]
+license="MIT OR Apache-2.0"
+description="Miscellaneous utilities for bevy_stardust"
+repository="https://github.com/veritius/bevy_stardust/"
+keywords=["bevy", "gamedev", "networking"]
+
+[dependencies.bevy]
+version = "0.14"
+default-features = false
+
+[dependencies.bevy_stardust]
+version = "0.6"
+path = "../stardust"
+
+[dependencies.octs]
+version = "0.4.0"
+optional = true
+
+[features]
+octs = ["dep:octs"]
\ No newline at end of file
diff --git a/extras/LICENSE-APACHE b/extras/LICENSE-APACHE
new file mode 120000
index 000000000..965b606f3
--- /dev/null
+++ b/extras/LICENSE-APACHE
@@ -0,0 +1 @@
+../LICENSE-APACHE
\ No newline at end of file
diff --git a/extras/LICENSE-MIT b/extras/LICENSE-MIT
new file mode 120000
index 000000000..76219eb72
--- /dev/null
+++ b/extras/LICENSE-MIT
@@ -0,0 +1 @@
+../LICENSE-MIT
\ No newline at end of file
diff --git a/extras/README.md b/extras/README.md
new file mode 100644
index 000000000..1fc27490b
--- /dev/null
+++ b/extras/README.md
@@ -0,0 +1,18 @@
+# bevy_stardust_extras
+Miscellaneous functionality that doesn't belong in `bevy_stardust`, but aren't significant enough to have its own crate. Includes various tools for testing and writing examples, as well as some tricks for encoding.
+
+| Bevy version | Stardust version | Crate version |
+|--------------|------------------|---------------|
+| `0.14.0` | `0.6.0` | `0.1.0` |
+
+## Feature flags
+- `octs` - Adds implementations for traits from the `octs` crate.
+
+## License
+bevy_stardust_extras is free and open source software. It's licensed under:
+* MIT License ([LICENSE-MIT](LICENSE-MIT) or [http://opensource.org/licenses/MIT](http://opensource.org/licenses/MIT))
+* Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or [http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0))
+
+at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
\ No newline at end of file
diff --git a/extras/src/lib.rs b/extras/src/lib.rs
new file mode 100644
index 000000000..1483dddd5
--- /dev/null
+++ b/extras/src/lib.rs
@@ -0,0 +1,5 @@
+#![doc = include_str!("../README.md")]
+#![warn(missing_docs)]
+
+pub mod link;
+pub mod numbers;
\ No newline at end of file
diff --git a/extras/src/link.rs b/extras/src/link.rs
new file mode 100644
index 000000000..5ad543db8
--- /dev/null
+++ b/extras/src/link.rs
@@ -0,0 +1,121 @@
+//! A simple transport layer using inter-thread communications, intended for use in tests and examples.
+//!
+//! Usage is simple, just add [`LinkTransportPlugin`] to all involved apps.
+//! Then, use [`pair`] to create two [`Link`] components that communicate with eachother.
+//! These 'links' don't do any kind of handshake. Once added to an entity, they communicate immediately.
+
+use std::sync::{mpsc::{channel, Receiver, Sender, TryRecvError}, Mutex};
+use bevy::prelude::*;
+use bevy_stardust::prelude::*;
+
+/// Adds a simple transport plugin for apps part of the same process.
+/// See the [top level documentation](self) for more information.
+pub struct LinkTransportPlugin;
+
+impl Plugin for LinkTransportPlugin {
+ fn build(&self, app: &mut App) {
+ app.add_systems(PreUpdate, (recv_link_data, remove_disconnected)
+ .chain().in_set(NetworkRecv::Receive));
+
+ app.add_systems(PostUpdate, (send_link_data, remove_disconnected)
+ .chain().in_set(NetworkSend::Transmit));
+ }
+}
+
+/// A connection to another `Link`, made with [`pair`].
+///
+/// A `Link` will only communicate with its counterpart.
+#[derive(Component)]
+pub struct Link(SideInner);
+
+/// Creates two connected [`Link`] objects.
+pub fn pair() -> (Link, Link) {
+ let (left_tx, left_rx) = channel();
+ let (right_tx, right_rx) = channel();
+
+ let left = Link(SideInner {
+ sender: left_tx,
+ receiver: Mutex::new(right_rx),
+ disconnected: false,
+ });
+
+ let right = Link(SideInner {
+ sender: right_tx,
+ receiver: Mutex::new(left_rx),
+ disconnected: false,
+ });
+
+ return (left, right);
+}
+
+struct SideInner {
+ sender: Sender,
+ // Makes the struct Sync, so it can be in a Component.
+ // Use Exclusive when it's stabilised.
+ receiver: Mutex>,
+ disconnected: bool,
+}
+
+fn recv_link_data(
+ mut query: Query<(&mut Link, &mut PeerMessages), With>,
+) {
+ query.par_iter_mut().for_each(|(mut link, mut queue)| {
+ let receiver = link.0.receiver.get_mut().unwrap();
+ loop {
+ match receiver.try_recv() {
+ Ok(message) => {
+ queue.push_one(message);
+ },
+
+ Err(TryRecvError::Empty) => { break },
+
+ Err(TryRecvError::Disconnected) => {
+ link.0.disconnected = true;
+ break;
+ },
+ }
+ }
+ });
+}
+
+fn send_link_data(
+ mut query: Query<(&mut Link, &PeerMessages), With>,
+) {
+ query.par_iter_mut().for_each(|(mut link, queue)| {
+ let sender = &link.0.sender;
+ 'outer: for (channel, queue) in queue {
+ for payload in queue {
+ match sender.send(ChannelMessage { channel, message: payload }) {
+ Ok(_) => {},
+ Err(_) => {
+ link.0.disconnected = true;
+ break 'outer;
+ },
+ }
+ }
+ }
+ });
+}
+
+fn remove_disconnected(
+ mut commands: Commands,
+ mut query: Query<(Entity, &Link, Option<&mut PeerLifestage>)>,
+ mut events: EventWriter,
+) {
+ for (entity, link, stage) in query.iter_mut() {
+ if link.0.disconnected {
+ debug!("Link on entity {entity:?} disconnected");
+ commands.entity(entity).remove:: ();
+
+ events.send(PeerDisconnectedEvent {
+ peer: entity,
+ reason: DisconnectReason::Unspecified,
+ comment: None,
+ });
+
+ if let Some(mut stage) = stage {
+ *stage = PeerLifestage::Closed;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/src/numbers/mod.rs b/extras/src/numbers/mod.rs
new file mode 100644
index 000000000..fb4487026
--- /dev/null
+++ b/extras/src/numbers/mod.rs
@@ -0,0 +1,7 @@
+//! Types for working with numbers, such as efficient encoding and easier logic.
+
+mod sequence;
+mod varint;
+
+pub use sequence::Sequence;
+pub use varint::VarInt;
\ No newline at end of file
diff --git a/extras/src/numbers/sequence.rs b/extras/src/numbers/sequence.rs
new file mode 100644
index 000000000..5d5486bf3
--- /dev/null
+++ b/extras/src/numbers/sequence.rs
@@ -0,0 +1,245 @@
+use std::{cmp::Ordering, ops::{Add, AddAssign, Sub, SubAssign}};
+
+/// A sequence value that always wraps.
+///
+/// When you are sending a sequence of items, you may want to identify them with a unique number.
+/// However, if you reach the limit of representable values for a type like `u32`, you cannot send further items.
+/// This is what `Sequence` solves. Mutation always wraps around, and comparison takes wrapping into account.
+/// However, this type is only suitable for values that **only increment** and will only increment a certain
+/// amount in a certain span of time. If you can receive more than 1/3 the range of values of your `Sequence`
+/// at once, you should use a `T` that can represent more values.
+///
+/// The `Ord` implementation takes into account the wrapping difference between the two values.
+/// A **very high** sequence number is considered **lesser** than a **very low** sequence number.
+/// Since we know the value wraps, we can assume that, for a `Sequence`, `0` was sent *after* `255`,
+/// since we only [`increment`](Self::increment) the sequence value a certain amount. For example, the
+/// difference between `4` and `9` is `5`, but the difference between `254` and `1` is `3`, again
+/// assuming you're using a `Sequence`.
+///
+/// The `SeqValue` trait is intentionally hidden, as its internals are not important.
+/// `T` can be any one of `u8`, `u16`, `u32`, `u64`, or `u128`.
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+#[repr(transparent)]
+pub struct Sequence(T);
+
+impl From for Sequence {
+ #[inline]
+ fn from(value: T) -> Self {
+ Self(value)
+ }
+}
+
+impl Sequence {
+ /// Returns the inner integer value.
+ #[inline]
+ pub fn inner(&self) -> T {
+ self.0
+ }
+
+ /// Increment the value by `1`. Wraps at numerical bounds.
+ pub fn increment(&mut self) {
+ *self = *self + T::ONE;
+ }
+
+ /// Returns the difference between two sequence values.
+ pub fn diff(&self, other: &Self) -> T {
+ let a = self.0;
+ let b = other.0;
+
+ let diff = a.abs_diff(b);
+ if diff < T::MID { return diff }
+
+ if a > b {
+ return b.wrapping_sub(a);
+ } else {
+ return a.wrapping_sub(b);
+ }
+ }
+}
+
+impl PartialOrd for Sequence {
+ fn partial_cmp(&self, other: &Self) -> Option {
+ self.0.partial_cmp(&other.0)
+ }
+}
+
+impl Ord for Sequence {
+ fn cmp(&self, other: &Self) -> Ordering {
+ // An adaptation of Glenn Fiedler's wrapping sequence identifier algorithm, modified to output an Ordering
+ // https://www.gafferongames.com/post/reliability_ordering_and_congestion_avoidance_over_udp/
+ if self == other { return Ordering::Equal }
+ let a = self.0; let b = other.0;
+ let r = ((a>b)&&(a-b<=T::MID))||((aT::MID));
+ match r {
+ true => Ordering::Greater,
+ false => Ordering::Less,
+ }
+ }
+}
+
+impl PartialEq for Sequence {
+ fn eq(&self, other: &T) -> bool {
+ self.0.eq(other)
+ }
+}
+
+impl Add for Sequence {
+ type Output = Sequence;
+
+ #[inline]
+ fn add(self, rhs: T) -> Self::Output {
+ Sequence(self.0.wrapping_add(rhs))
+ }
+}
+
+impl AddAssign for Sequence {
+ fn add_assign(&mut self, rhs: T) {
+ *self = *self + rhs;
+ }
+}
+
+impl Sub for Sequence {
+ type Output = Sequence;
+
+ #[inline]
+ fn sub(self, rhs: T) -> Self::Output {
+ Sequence(self.0.wrapping_sub(rhs))
+ }
+}
+
+impl SubAssign for Sequence {
+ fn sub_assign(&mut self, rhs: T) {
+ *self = *self - rhs;
+ }
+}
+
+/// A number that can be used in a [`Sequence`] value.
+#[doc(hidden)]
+pub trait SeqValue
+where
+ Self: sealed::Sealed,
+ Self: Sized + Clone + Copy + Default + Ord,
+ Self: Add + Sub,
+{
+ const ONE: Self;
+ const MIN: Self;
+ const MID: Self;
+ const MAX: Self;
+
+ fn abs_diff(self, other: Self) -> Self;
+ fn wrapping_sub(self, other: Self) -> Self;
+ fn wrapping_add(self, other: Self) -> Self;
+}
+
+macro_rules! impl_seqvalue {
+ ($type:ty, $val:expr) => {
+ impl SeqValue for $type {
+ const ONE: $type = 1;
+ const MIN: $type = <$type>::MIN;
+ const MID: $type = $val;
+ const MAX: $type = <$type>::MAX;
+
+ #[inline]
+ fn abs_diff(self, other: Self) -> Self {
+ self.abs_diff(other)
+ }
+
+ #[inline]
+ fn wrapping_sub(self, other: Self) -> Self {
+ self.wrapping_sub(other)
+ }
+
+ #[inline]
+ fn wrapping_add(self, other: Self) -> Self {
+ self.wrapping_add(other)
+ }
+ }
+ };
+}
+
+impl_seqvalue!(u8, 2u8.pow(7));
+impl_seqvalue!(u16, 2u16.pow(15));
+impl_seqvalue!(u32, 2u32.pow(31));
+impl_seqvalue!(u64, 2u64.pow(63));
+impl_seqvalue!(u128, 2u128.pow(127));
+
+mod sealed {
+ pub trait Sealed {}
+ impl Sealed for u8 {}
+ impl Sealed for u16 {}
+ impl Sealed for u32 {}
+ impl Sealed for u64 {}
+ impl Sealed for u128 {}
+}
+
+#[test]
+fn sequence_id_difference_test() {
+ const MIDPOINT: Sequence:: = Sequence(u16::MID);
+
+ #[inline]
+ fn seq(v: u16) -> Sequence {
+ Sequence::from(v)
+ }
+
+ assert_eq!(seq(0).diff(&seq(0)), 0);
+ assert_eq!(seq(0).diff(&seq(1)), 1);
+ assert_eq!(seq(3).diff(&seq(7)), 4);
+ assert_eq!(seq(1).diff(&seq(0)), 1);
+ assert_eq!(seq(7).diff(&seq(3)), 4);
+ assert_eq!(seq(u16::MAX).diff(&seq(u16::MIN)), 1);
+ assert_eq!(seq(u16::MAX).sub(3).diff(&seq(u16::MIN).add(3)), 7);
+ assert_eq!(seq(u16::MIN).diff(&seq(u16::MAX)), 1);
+ assert_eq!(seq(u16::MIN).add(3).diff(&seq(u16::MAX).sub(3)), 7);
+ assert_eq!(MIDPOINT.diff(&MIDPOINT), 0);
+ assert_eq!(MIDPOINT.sub(1).diff(&MIDPOINT), 1);
+ assert_eq!(MIDPOINT.add(1).diff(&MIDPOINT), 1);
+}
+
+#[test]
+fn sequence_id_ordering_test() {
+ const MIDPOINT: Sequence:: = Sequence(u16::MID);
+
+ #[inline]
+ fn seq(v: u16) -> Sequence {
+ Sequence::from(v)
+ }
+
+ assert_eq!(seq(4).cmp(&seq(4)), Ordering::Equal);
+ assert_eq!(seq(15).cmp(&seq(9)), Ordering::Greater);
+ assert_eq!(seq(9).cmp(&seq(15)), Ordering::Less);
+ assert_eq!(seq(65534).cmp(&seq(66)), Ordering::Less);
+ assert_eq!(seq(u16::MAX).cmp(&seq(u16::MIN)), Ordering::Less);
+ assert_eq!(seq(66).cmp(&seq(65534)), Ordering::Greater);
+ assert_eq!(seq(u16::MIN).cmp(&seq(u16::MAX)), Ordering::Greater);
+ assert_eq!(MIDPOINT.cmp(&MIDPOINT), Ordering::Equal);
+ assert_eq!(MIDPOINT.sub(1).cmp(&MIDPOINT), Ordering::Less);
+ assert_eq!(MIDPOINT.add(1).cmp(&MIDPOINT), Ordering::Greater);
+}
+
+#[cfg(feature="octs")]
+mod octs {
+ use octs::{Encode, FixedEncodeLen, Decode};
+ use super::{Sequence, SeqValue};
+
+ impl Encode for Sequence {
+ type Error = T::Error;
+
+ #[inline]
+ fn encode(&self, mut dst: impl octs::Write) -> Result<(), octs::BufTooShortOr> {
+ self.0.encode(&mut dst)
+ }
+ }
+
+ impl FixedEncodeLen for Sequence {
+ const ENCODE_LEN: usize = T::ENCODE_LEN;
+ }
+
+ impl Decode for Sequence {
+ type Error = T::Error;
+
+ #[inline]
+ fn decode(mut src: impl octs::Read) -> Result> {
+ T::decode(&mut src).map(|v| Self(v))
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/src/numbers/varint.rs b/extras/src/numbers/varint.rs
new file mode 100644
index 000000000..82747250f
--- /dev/null
+++ b/extras/src/numbers/varint.rs
@@ -0,0 +1,254 @@
+use std::fmt::{Debug, Display};
+use bevy_stardust::messages::bytes::{Buf, BufMut};
+use bevy_stardust::prelude::*;
+
+/// A variable length integer that can store values up to `(2^62)-1`.
+///
+/// Based on [RFC 9000 Section 16](https://www.rfc-editor.org/rfc/rfc9000.html#name-variable-length-integer-enc) (Variable-Length Integer Encoding).
+#[derive(Clone, Copy, PartialEq, Eq)]
+pub struct VarInt(u64);
+
+impl VarInt {
+ /// The maximum representable value of a `VarInt`.
+ pub const MAX: u64 = 2u64.pow(62) - 1;
+
+ /// Creates a `VarInt` from a `u32`.
+ /// As this function cannot fail, it is usable in const contexts.
+ #[inline]
+ pub const fn from_u32(value: u32) -> Self {
+ Self(value as u64)
+ }
+
+ /// Decodes a `VarInt` from a [`Buf`].
+ pub fn read(b: &mut B) -> Result {
+ const MASK: u8 = 0b0000_0011;
+
+ // Check there's anything left in the buffer
+ if b.remaining() < 1 { return Err(()) }
+
+ let mut bytes = [0u8; 8];
+ let first = b.get_u8();
+ bytes[0] = first & !MASK;
+
+ match first & MASK {
+ 0b00 => {},
+
+ 0b01 => {
+ if b.remaining() < 1 { return Err(()) }
+ bytes[1] = b.get_u8();
+ },
+
+ 0b10 => {
+ if b.remaining() < 3 { return Err(()); }
+ b.copy_to_slice(&mut bytes[1..4]);
+ },
+
+ 0b11 => {
+ if b.remaining() < 7 { return Err(()); }
+ b.copy_to_slice(&mut bytes[1..8]);
+ },
+
+ _ => unreachable!(),
+ }
+
+ // The result has to be bitshifted by 2
+ // due to the length header
+ return Ok(Self(u64::from_le_bytes(bytes) >> 2));
+ }
+
+ /// Encodes a `VarInt` to a [`BufMut`].
+ pub fn write(&self, b: &mut B) -> Result<(), ()> {
+ let mut bytes = (self.0 << 2).to_le_bytes();
+ let len = self.len();
+ if len as usize > b.remaining_mut() { return Err(()); }
+
+ match len {
+ 1 => {
+ b.put_u8(bytes[0]);
+ },
+
+ 2 => {
+ bytes[0] |= 0b01;
+ b.put(&bytes[..2]);
+ },
+
+ 4 => {
+ bytes[0] |= 0b10;
+ b.put(&bytes[..4]);
+ },
+
+ 8 => {
+ bytes[0] |= 0b11;
+ b.put(&bytes[..8]);
+ },
+
+ _ => unreachable!(),
+ }
+
+ return Ok(());
+ }
+
+ /// Returns the amount of bytes that would be written if `write` were used.
+ pub fn len(&self) -> u8 {
+ // SAFETY: A VarInt that would return an Err cannot be created, so this case cannot occur.
+ unsafe { Self::len_u64(self.0).unwrap_unchecked() }
+ }
+
+ /// Estimate the encoded size of a `VarInt` with this value.
+ pub fn len_u32(value: u32) -> u8 {
+ let x = value;
+ if x <= 63 { return 1; }
+ if x <= 16383 { return 2; }
+ if x <= 1073741823 { return 4; }
+ return 8;
+ }
+
+ /// Estimate the encoded size of a `VarInt` with this value.
+ ///
+ /// Since `u64` can represent values a `VarInt` can't, this function can fail.
+ pub fn len_u64(value: u64) -> Result {
+ if value <= 63 { return Ok(1); }
+ if value <= 16383 { return Ok(2); }
+ if value <= 1073741823 { return Ok(4); }
+ if value <= 4611686018427387903 { return Ok(8); }
+ return Err(());
+ }
+}
+
+impl From for VarInt {
+ #[inline]
+ fn from(value: u32) -> Self {
+ Self(value as u64)
+ }
+}
+
+impl TryFrom for VarInt {
+ type Error = ();
+
+ fn try_from(value: u64) -> Result {
+ if value > Self::MAX { return Err(()); }
+ return Ok(Self(value));
+ }
+}
+
+impl TryFrom for VarInt {
+ type Error = ();
+
+ fn try_from(value: usize) -> Result {
+ #[cfg(target_pointer_width="32")]
+ return Ok(Self(value as u64));
+
+ #[cfg(target_pointer_width="64")]
+ (value as u64).try_into()
+ }
+}
+
+impl TryFrom for u32 {
+ type Error = ();
+
+ fn try_from(value: VarInt) -> Result {
+ if value.0 > u32::MAX as u64 { return Err(()); }
+ return Ok(value.0 as u32);
+ }
+}
+
+impl From for u64 {
+ #[inline]
+ fn from(value: VarInt) -> Self {
+ value.0
+ }
+}
+
+impl From for ChannelId {
+ #[inline]
+ fn from(value: VarInt) -> Self {
+ value.into()
+ }
+}
+
+impl From for VarInt {
+ #[inline]
+ fn from(value: ChannelId) -> Self {
+ VarInt::from_u32(value.into())
+ }
+}
+
+impl Debug for VarInt {
+ #[inline]
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ Debug::fmt(&self.0, f)
+ }
+}
+
+impl Display for VarInt {
+ #[inline]
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ Display::fmt(&self.0, f)
+ }
+}
+
+#[test]
+fn varint_encoding() {
+ use std::io::Cursor;
+
+ static TEST_SET: &[u64] = &[
+ 0, 1, 2, 4, 8, 16, 32, 63, 64, 65, 66,
+ 8000, 10000, 16000, 16383, 16384, 16385,
+ 107374000, 1073741823, 1073741824, 1073741825,
+ 4611686017999999999, 4611686018000000000, 4611686018000000001,
+ 4611686018427387901, 4611686018427387902, 4611686018427387903,
+ ];
+
+ fn serial_test(value: u64, bytes: &mut Vec) {
+ let value = VarInt::try_from(value)
+ .expect("Value passed to serial_test was not representable in a varint");
+
+ // Serialise
+ value.write(bytes).unwrap();
+ assert_eq!(bytes.len(), value.len() as usize);
+
+ // Deserialise
+ let mut cursor = Cursor::new(&bytes[..]);
+ let new = VarInt::read(&mut cursor).unwrap();
+ assert_eq!(value, new);
+ }
+
+ let mut bytes: Vec = Vec::with_capacity(8);
+ for value in TEST_SET {
+ serial_test(*value, &mut bytes);
+ bytes.clear();
+ }
+}
+
+#[cfg(feature="octs")]
+mod octs {
+ use super::VarInt;
+ use octs::{Encode, EncodeLen, FixedEncodeLenHint, Decode};
+
+ impl Encode for VarInt {
+ type Error = ();
+
+ fn encode(&self, mut dst: impl octs::Write) -> Result<(), octs::BufTooShortOr> {
+ self.write(&mut dst).map_err(|_| octs::BufTooShortOr::TooShort)
+ }
+ }
+
+ impl EncodeLen for VarInt {
+ fn encode_len(&self) -> usize {
+ self.len() as usize
+ }
+ }
+
+ impl FixedEncodeLenHint for VarInt {
+ const MAX_ENCODE_LEN: usize = 8;
+ const MIN_ENCODE_LEN: usize = 1;
+ }
+
+ impl Decode for VarInt {
+ type Error = ();
+
+ fn decode(mut src: impl octs::Read) -> Result> {
+ VarInt::read(&mut src).map_err(|_| octs::BufTooShortOr::TooShort)
+ }
+ }
+}
\ No newline at end of file
diff --git a/stardust/Cargo.toml b/stardust/Cargo.toml
index 0acc4c30d..ea59ff0cb 100644
--- a/stardust/Cargo.toml
+++ b/stardust/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name="bevy_stardust"
-version="0.5.1"
+version="0.6.0"
edition="2021"
authors=["Veritius "]
license="MIT OR Apache-2.0"
@@ -8,13 +8,19 @@ description="A networking crate for the Bevy game engine."
repository="https://github.com/veritius/bevy_stardust/"
keywords=["bevy", "gamedev", "networking"]
-[dependencies]
-bevy = { version = "0.13", default-features = false }
-bytes = "1.5.0"
-smallvec = "1.11.1"
-gxhash = { version = "=3.0.0", optional = true } # 3.1 is broken, remove this bound when it's fixed
+[dependencies.bevy]
+version = "0.14"
+default-features = false
-[features]
-default = ["reflect"]
-reflect = []
-hashing = ["reflect", "dep:gxhash"]
\ No newline at end of file
+[dependencies.bytes]
+version = "1.5.0"
+
+[dependencies.smallvec]
+version = "1.11.1"
+
+[dev-dependencies]
+fastrand = "2"
+
+[dev-dependencies.bevy_stardust_extras]
+version = "0.1.0"
+path = "../extras"
\ No newline at end of file
diff --git a/stardust/src/channels/config.rs b/stardust/src/channels/config.rs
index 27b84d5d4..38612f5fa 100644
--- a/stardust/src/channels/config.rs
+++ b/stardust/src/channels/config.rs
@@ -1,81 +1,109 @@
-//! Channel configuration.
-//!
-//! All settings are not definitive, but hints to transport layers as how to treat channels.
-
-#[cfg(feature="hashing")]
-use {std::hash::Hasher, crate::hashing::StableHash};
+use bevy::reflect::Reflect;
/// Configuration for a channel.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Hash, Reflect)]
+#[reflect(Debug, Hash)]
pub struct ChannelConfiguration {
- /// Whether messages should be resent if they're missed.
- pub reliable: ReliabilityGuarantee,
-
- /// Whether messages should be read in the order they were sent.
- pub ordered: OrderingGuarantee,
-
- /// If messages on this channel may need to be broken up to be transmitted.
- /// If disabled, messages over the MTU will be discarded or panic, depending on the transport layer.
- pub fragmented: bool,
+ /// Guarantees that the transport layer must make
+ /// for messages sent on this channel. See the
+ /// documentation of [`MessageConsistency`].
+ pub consistency: MessageConsistency,
/// The priority of messages on this channel.
- /// Transport values will send messages on channels with higher `priority` values first.
- /// Channel priority is not hashed when the `hashing` feature is enabled.
+ /// Transport values will try to send messages on
+ /// channels with higher `priority` values first.
pub priority: u32,
}
-#[cfg(feature="hashing")]
-impl StableHash for &ChannelConfiguration {
- fn hash(&self, state: &mut H) {
- self.reliable.hash(state);
- self.ordered.hash(state);
- self.fragmented.hash(state);
- }
-}
+/// Reliability and ordering guarantees.
+/// This is enforced by the transport layer handling the client.
+///
+/// # Why?
+/// ## Reliability
+///
+///
+/// The Internet makes no guarantees about your message being received.
+/// This is a challenge if your application is expecting something, and it's lost.
+/// Reliability guarantees that individual messages on this channel are received
+/// eventually, through whatever means are available to the transport layer.
+/// This almost always incurs some overhead, and may be undesirable for
+/// certain kinds of transmission, especially for real-time data.
+///
+/// ## Ordering
+/// The Internet makes no guarantees about the order packets are received in.
+/// This means that if you're trying to send chunks of an image, you may
+/// receive packets in the wrong order to the one they were sent in, and end
+/// up with a very muddled up image.
+///
+/// By enabling ordering for a channel, transport layers will ensure
+/// that messages in the channel will be received in a specified order,
+/// relative to the order they were sent in. Messages are only ordered
+/// against other messages in the same channel.
+///
+/// Sequencing is related to ordering, but discards older messages when
+/// an out-of-order transmission occurs. If the messages `[1,2,3,4,5]` is
+/// received in order, the application sees `[1,2,3,4,5]`. However, if the
+/// messages are received in the order `[1,3,2,5,4]`, the application will
+/// only see the messages `[1,3,5]`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Reflect)]
+#[reflect(Debug, PartialEq, Hash)]
+#[non_exhaustive]
+pub enum MessageConsistency {
+ /// Messages lost in transport will not be resent.
+ /// They are added to the queue in the order they're received,
+ /// which may be different to the order they were sent in.
+ ///
+ /// Useful for messages that can occasionally be lost,
+ /// and aren't needed to be read in a specific order,
+ /// such as spawning particle effects.
+ UnreliableUnordered,
+
+ /// Messages lost in transport will not be resent.
+ /// If messages are not received in order, only the most
+ /// recent messages will be stored, discarding old messages.
+ ///
+ /// Useful for messages that are used to update something
+ /// each tick, where only the most recent values matter,
+ /// such as player position in a shooter.
+ UnreliableSequenced,
-/// The reliability guarantee of a channel.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum ReliabilityGuarantee {
- /// Messages are not guaranteed to arrive.
- Unreliable,
+ /// Messages lost in transport will be resent.
+ /// They are added to the queue in the order they're received,
+ /// which may be different to the order they were sent in.
+ ///
+ /// Useful for messages that must be received,
+ /// but don't have any ordering requirements,
+ /// such as inventory updates in a survival game.
+ ReliableUnordered,
- /// Lost messages will be detected and resent.
- Reliable,
+ /// Messages lost in transport will be resent.
+ /// They are added to the queue in the order they were sent,
+ /// which may introduce a delay in the case of a resend.
+ ///
+ /// Useful for messages that must be received,
+ /// and must be received in a certain order,
+ /// such as chat messages in a multiplayer game.
+ ReliableOrdered,
}
-#[cfg(feature="hashing")]
-impl StableHash for ReliabilityGuarantee {
- fn hash(&self, state: &mut H) {
+impl MessageConsistency {
+ /// Returns `true` if messages in this channel must be sent reliably.
+ pub fn is_reliable(&self) -> bool {
match self {
- ReliabilityGuarantee::Unreliable => state.write_u8(0),
- ReliabilityGuarantee::Reliable => state.write_u8(1),
+ MessageConsistency::UnreliableUnordered => false,
+ MessageConsistency::UnreliableSequenced => false,
+ MessageConsistency::ReliableUnordered => true,
+ MessageConsistency::ReliableOrdered => true,
}
}
-}
-
-/// The ordering guarantee of a channel.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum OrderingGuarantee {
- /// Messages will be available in the order they are received.
- /// This is not necessarily the order they were sent. If that matters, use a different variant.
- Unordered,
-
- /// Messages that are older than the most recent value will be discarded.
- /// Therefore, messages will be available in order, but out of order messages are lost.
- Sequenced,
-
- /// Messages will be available in the exact order they were sent.
- /// If [reliability](ReliabilityGuarantee::Reliable) is used, this can 'block' messages temporarily due to data loss.
- Ordered,
-}
-#[cfg(feature="hashing")]
-impl StableHash for OrderingGuarantee {
- fn hash(&self, state: &mut H) {
+ /// Returns `true` if messages in this channel have any ordering constraints applied.
+ pub fn is_ordered(&self) -> bool {
match self {
- OrderingGuarantee::Unordered => state.write_u8(0),
- OrderingGuarantee::Sequenced => state.write_u8(1),
- OrderingGuarantee::Ordered => state.write_u8(2),
+ MessageConsistency::UnreliableUnordered => false,
+ MessageConsistency::UnreliableSequenced => true,
+ MessageConsistency::ReliableUnordered => false,
+ MessageConsistency::ReliableOrdered => true,
}
}
}
\ No newline at end of file
diff --git a/stardust/src/channels/extension.rs b/stardust/src/channels/extension.rs
index 3e29b25ff..c626b665f 100644
--- a/stardust/src/channels/extension.rs
+++ b/stardust/src/channels/extension.rs
@@ -1,8 +1,9 @@
//! Adds `add_channel` to the `App`.
use bevy::app::App;
-use crate::channels::config::ChannelConfiguration;
-use super::{id::Channel, SetupChannelRegistry};
+use super::config::ChannelConfiguration;
+use super::ChannelId;
+use super::{id::Channel, ChannelRegistryBuilder};
mod sealed {
pub trait Sealed {}
@@ -12,28 +13,21 @@ mod sealed {
/// Adds channel-related functions to the `App`.
pub trait ChannelSetupAppExt: sealed::Sealed {
/// Registers a channel with type `C` and the config and components given.
- fn add_channel(&mut self, config: ChannelConfiguration);
+ /// Returns the sequential `ChannelId` now associated with the channel.
+ fn add_channel(&mut self, config: ChannelConfiguration) -> ChannelId;
}
impl ChannelSetupAppExt for App {
fn add_channel(
&mut self,
config: ChannelConfiguration,
- ) {
- // Register channel type in reflect data
- #[cfg(feature="reflect")]
- self.register_type::();
-
- // Change hash value
- #[cfg(feature="hashing")] {
- use crate::hashing::HashingAppExt;
- self.net_hash_value("channel");
- self.net_hash_value(C::type_path());
- self.net_hash_value(&config);
- }
+ ) -> ChannelId {
+ // Get the registry
+ let mut registry = self.world_mut()
+ .get_resource_mut::()
+ .expect("Cannot add channels after plugin cleanup");
// Add to registry
- let mut registry = self.world.resource_mut::();
- registry.0.register_channel::(config);
+ return registry.0.register_channel::(config);
}
}
\ No newline at end of file
diff --git a/stardust/src/channels/id.rs b/stardust/src/channels/id.rs
index 8b70cf141..65299bbe0 100644
--- a/stardust/src/channels/id.rs
+++ b/stardust/src/channels/id.rs
@@ -1,92 +1,41 @@
-//! Types that can be used to interface with Stardust's message reading and writing APIs.
-//!
-//! Note: In the following examples, `#[derive(Reflect)]` is only needed with the `reflect` feature flag.
-//!
-//! ```ignore
-//! // Defining a channel type is simple
-//! #[derive(Reflect)]
-//! pub struct MyChannel;
-//!
-//! // You can make channels private
-//! #[derive(Reflect)]
-//! struct MyPrivateChannel;
-//!
-//! // You can make channels with generic type bounds too
-//! #[derive(Reflect)]
-//! struct MyGenericChannel(PhantomData);
-//! ```
-//!
-//! In Stardust, `Channel` trait objects are just used for their type data.
-//! The type itself isn't actually stored. That means you can do things like this.
-//!
-//! ```ignore
-//! #[derive(Reflect, Event)]
-//! pub struct MovementEvent(pub Vec3);
-//!
-//! fn main() {
-//! let mut app = App::new();
-//!
-//! app.add_plugins((DefaultPlugins, StardustPlugin));
-//!
-//! app.add_event::();
-//! app.add_channel::(ChannelConfiguration {
-//! reliable: ReliabilityGuarantee::Unreliable,
-//! ordered: OrderingGuarantee::Unordered,
-//! fragmented: false,
-//! priority: 0,
-//! });
-//! }
-//! ```
-
-use std::{marker::PhantomData, ops::Deref};
+use std::any::Any;
use bevy::prelude::*;
-use super::ChannelRegistryInner;
-
-/// Marker trait for channels. See the [module level documentation](self) for more information.
-#[cfg(not(feature="reflect"))]
-pub trait Channel: Send + Sync + 'static {}
-
-#[cfg(not(feature="reflect"))]
-impl Channel for T {}
-
-#[cfg(feature="reflect")]
-use bevy::reflect::*;
+use super::ChannelRegistry;
-/// Marker trait for channels. See the [module level documentation](self) for more information.
-#[cfg(feature="reflect")]
-pub trait Channel: Reflect + TypePath + GetTypeRegistration + Send + Sync + 'static {}
+/// Types that can be used to identify channels within the type system.
+/// Once registered to the `App`, this type has a [`ChannelId`] assigned to it.
+pub trait Channel: Any {}
-#[cfg(feature="reflect")]
-impl Channel for T {}
-
-/// Typed marker component for filtering channel entities.
-#[derive(Component)]
-pub(super) struct ChannelMarker(pub PhantomData);
-
-impl Default for ChannelMarker {
- fn default() -> Self {
- Self(Default::default())
- }
-}
+impl Channel for T {}
-/// A sequential channel identifier that can be used to access data without type information.
+/// A unique identifier for a channel, generated during application setup.
///
-/// Channel identifiers are generated by the `ChannelRegistry` and are unique to the `World` they originated from.
-/// Attempting to use a `ChannelId` in another `World` will probably panic, or give you unintended results.
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
-#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))]
+/// A `ChannelId` is used to identify a channel without type information,
+/// such as in a transport layer or associative arrays where `TypeId`
+/// would be excessive. Channel registration also ensures that the same
+/// `ChannelId` refers to the same channel, regardless of compilation.
+/// This only holds true if the [ordering constraints](super) are obeyed.
+///
+/// Note that channel identifiers are only unique to the
+/// `World` belonging to the `App` they were registered to.
+/// Using them in a different `World` or `App` may panic,
+/// or have additional consequences because of transport
+/// layers, such as causing undefined behavior. Make sure
+/// you read the documentation!
+#[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Reflect)]
#[repr(transparent)]
pub struct ChannelId(u32);
-impl From for ChannelId {
- fn from(value: u32) -> Self {
- Self(value)
+impl std::fmt::Debug for ChannelId {
+ #[inline(always)]
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ self.0.fmt(f)
}
}
-impl From<[u8;4]> for ChannelId {
- fn from(value: [u8;4]) -> Self {
- Self(u32::from_be_bytes(value))
+impl From for ChannelId {
+ fn from(value: u32) -> Self {
+ Self(value)
}
}
@@ -102,6 +51,12 @@ impl From for usize {
}
}
+impl From<[u8;4]> for ChannelId {
+ fn from(value: [u8;4]) -> Self {
+ Self(u32::from_be_bytes(value))
+ }
+}
+
impl From for [u8;4] {
fn from(value: ChannelId) -> Self {
value.0.to_be_bytes()
@@ -109,36 +64,21 @@ impl From for [u8;4] {
}
/// Types that can be used to access channel data in a channel registry.
-pub trait ToChannelId: sealed::Sealed {
- /// Convert the type to a `ChannelId`
- fn to_channel_id(&self, registry: impl Deref) -> Option;
+pub trait ToChannelId {
+ /// Convert the type to a `ChannelId`.
+ fn to_channel_id(&self, registry: impl AsRef) -> Option;
}
impl ToChannelId for ChannelId {
#[inline]
- fn to_channel_id(&self, _: impl Deref) -> Option {
+ fn to_channel_id(&self, _: impl AsRef) -> Option {
Some(self.clone())
}
}
impl ToChannelId for std::any::TypeId {
- fn to_channel_id(&self, registry: impl Deref) -> Option {
- registry.channel_type_ids.get(&self).cloned()
- }
-}
-
-#[cfg(feature="reflect")]
-impl ToChannelId for &dyn bevy::reflect::Reflect {
- fn to_channel_id(&self, registry: impl Deref) -> Option {
- self.type_id().to_channel_id(registry)
+ #[inline]
+ fn to_channel_id(&self, registry: impl AsRef) -> Option {
+ registry.as_ref().channel_type_ids.get(&self).cloned()
}
-}
-
-mod sealed {
- pub trait Sealed {}
- impl Sealed for super::ChannelId {}
- impl Sealed for std::any::TypeId {}
-
- #[cfg(feature="reflect")]
- impl Sealed for &dyn bevy::reflect::Reflect {}
}
\ No newline at end of file
diff --git a/stardust/src/channels/mod.rs b/stardust/src/channels/mod.rs
index 4066bdf14..09204d8b4 100644
--- a/stardust/src/channels/mod.rs
+++ b/stardust/src/channels/mod.rs
@@ -1,46 +1,173 @@
-//! Channel definitions and message storage.
+//! Message organisation systems.
//!
-//! You can add a channel when setting up the `App`.
-//! ```ignore
-//! #[derive(Reflect)] // Only necessary with the reflect feature
-//! struct MyChannel;
+//! # What are channels
+//! Channels are an abstraction provided by Stardust to make writing netcode easier.
+//! Instead of having a static number like `12345` to identify message types,
+//! Stardust automatically generates these numbers, which have the dual benefit of
+//! being very efficient to transmit, and easy to work with for a developer.
+//! Most of the time, you as the developer won't directly work with channel identifiers.
+//! Instead, you use the type system, just like you would to use Bevy systemparams.
+//!
+//! A major benefit of automatically generating channel identifiers is that
+//! it's incredibly easy to add new message types. You don't need a massive
+//! document of every channel ID to make sure that system A doesn't read a
+//! message intended for system B. This is especially useful when using plugins,
+//! which now just work, with no extra effort on your part.
+//!
+//! Channels also obey Rust's visibility system. Since you primarily access
+//! channels with their associated type, if that type is not accessible,
+//! that channel cannot be accessed, letting you compartmentalise code better.
+//! This aligns very well with the compartmentalisation that ECS is designed for.
+//!
+//! Note that you *can* technically access a channel without a type, using its ID,
+//! but this is very unreliable and considered bad practice. Visibility cannot be
+//! perfectly enforced, as transport layers must have access to all channels to
+//! be able to do their job.
+//!
+//! # Adding channels
+//! Channels are accessed using the type system. You can use any type,
+//! as long as it implements [`Channel`]. Since `Channel` is automatically
+//! implemented for any type that implements `Any`, you can simply define
+//! a new struct with no fields, or reuse another type.
+//!
+//! ```no_run
+//! pub struct MyChannel;
+//! ```
+//!
+//! Channels must also have a [`ChannelConfiguration`].
+//! The configuration of a channel is used to tell transport layers how to treat
+//! [messages] sent over that channel, like if messages should be ordered.
+//! It's highly recommended to read the documentation of `ChannelConfiguration`
+//! to understand what each field does, and its implications for your code.
+//!
+//! ```no_run
+//! # use bevy_stardust::prelude::*;
+//! # fn _p() {
+//! let config = ChannelConfiguration {
+//! consistency: MessageConsistency::ReliableOrdered,
+//! priority: 128,
+//! };
+//! # }
+//! ```
+//!
+//! Channels must be added to the `App` before being used. This is done
+//! by adding it to the channel registry. To do this, just use the
+//! [`add_channel`][add_channel] on the `App`. This is implemented by
+//! the [`ChannelSetupAppExt`] trait, which is automatically brought into
+//! scope with `use bevy_stardust::prelude::*;`
+//!
+//! [`add_channel`][add_channel] takes a generic, `C`, which you should
+//! set as the type of the channel you are trying to add. In this case, our
+//! channel is named `MyChannel`, so we would do `add_channel`.
+//! This function also takes the configuration of the channel. This is where
+//! you put in the `ChannelConfiguration` you defined.
+//!
+//! ```no_run
+//! # use bevy::prelude::*;
+//! # use bevy_stardust::prelude::*;
+//! #
+//! pub struct MyChannel;
//!
//! fn main() {
+//! // Normal Bevy app setup
//! let mut app = App::new();
+//! app.add_plugins(DefaultPlugins);
//!
-//! app.add_plugins((DefaultPlugins, StardustPlugin));
+//! // StardustPlugin must be added
+//! app.add_plugins(StardustPlugin);
//!
//! app.add_channel::(ChannelConfiguration {
-//! reliable: ReliabilityGuarantee::Unreliable,
-//! ordered: OrderingGuarantee::Unordered,
-//! fragmented: false,
-//! string_size: 0..=16,
+//! consistency: MessageConsistency::ReliableOrdered,
+//! priority: 128,
+//! });
+//! }
+//! ```
+//!
+//! Note that channels must be added *after* [`StardustPlugin`] is added,
+//! and *before* `StardustPlugin` [cleans up][Plugin::cleanup]. Channel
+//! insertion order also matters: you must make sure all calls to
+//! [`add_channel`][add_channel] are in a deterministic order.
+//! This includes channels registered by plugins.
+//! This is an unfortunate limitation that will (hopefully) be lifted in future.
+//!
+//! [messages]: crate::messages
+//! [`StardustPlugin`]: crate::plugin::StardustPlugin
+//! [add_channel]: ChannelSetupAppExt::add_channel
+//!
+//! # Advanced channels
+//! Only compile-time information is used from channel types.
+//! Your types will never be instantiated. While most often,
+//! you will use a zero-sized newtype like a field-less struct.
+//!
+//! ## Reusing types
+//! Let's say you have an event called `MovementEvent`, that is
+//! used to inform systems of player movements. If you want to
+//! send this event to other peers, you could create a new type
+//! and use it in `add_channel`, or you could use `MovementEvent`.
+//!
+//! ```no_run
+//! # use bevy::prelude::*;
+//! # use bevy_stardust::prelude::*;
+//! #
+//! #[derive(Event)]
+//! struct MovementEvent {
+//! change: Vec3,
+//! }
+//!
+//! fn main() {
+//! // App setup as detailed earlier
+//! let mut app = App::new();
+//! app.add_plugins((DefaultPlugins, StardustPlugin));
+//!
+//! // Register MovementEvent as an event
+//! app.add_event::();
+//!
+//! // And register it as a channel
+//! app.add_channel::(ChannelConfiguration {
+//! consistency: MessageConsistency::UnreliableUnordered,
+//! priority: 32,
//! });
//! }
//! ```
+//!
+//! At this point, you can introduce a system in `NetworkRecv::Read`
+//! to turn the messages on your `MovementEvent` channel into events
+//! in `Events`, which other systems can read from.
+//! This can be useful to make your code less cluttered, especially
+//! for replicated events like this, but there are times where it's
+//! not suitable. It's up to you to decide when you want to use this.
+//!
+//! You can also use generic type parameters as an organisational tool.
+//! As long as the type still implements `Channel`, it's just fine!
+//! ```no_run
+//! # use bevy_stardust::prelude::*;
+//! # use std::marker::PhantomData;
+//! #
+//! pub struct MyGenericChannel(PhantomData);
+//! ```
mod config;
+mod extension;
mod id;
+mod params;
mod registry;
-mod extension;
-pub use config::*;
-pub use id::*;
-pub use registry::*;
+pub use config::{ChannelConfiguration, MessageConsistency};
+pub use id::{Channel, ChannelId, ToChannelId};
+pub use registry::{ChannelRegistry, ChannelMetadata};
+pub use params::{Channels, ChannelData};
pub use extension::ChannelSetupAppExt;
-use std::sync::Arc;
use bevy::prelude::*;
+use registry::ChannelRegistryBuilder;
-pub(super) fn channel_build(app: &mut App) {
- // Create setup channel registry
- app.insert_resource(registry::SetupChannelRegistry(Box::new(ChannelRegistryInner::new())));
-
+pub(crate) fn plugin_build(app: &mut App) {
+ app.insert_resource(ChannelRegistryBuilder(ChannelRegistry::new()));
}
-pub(super) fn channel_finish(app: &mut App) {
- // Remove SetupChannelRegistry and put the inner into an Arc inside ChannelRegistry
- // This dramatically improves
- let registry = app.world.remove_resource::().unwrap();
- app.insert_resource(FinishedChannelRegistry(Arc::from(registry.0)));
+pub(crate) fn plugin_cleanup(app: &mut App) {
+ let world = app.world_mut();
+ let mut builder = world.remove_resource::().unwrap();
+ builder.0.channel_data.shrink_to_fit();
+ world.insert_resource(builder.finish());
}
\ No newline at end of file
diff --git a/stardust/src/channels/params.rs b/stardust/src/channels/params.rs
new file mode 100644
index 000000000..96c873a61
--- /dev/null
+++ b/stardust/src/channels/params.rs
@@ -0,0 +1,136 @@
+use std::{any::TypeId, marker::PhantomData, sync::Arc, ops::Deref};
+use bevy::ecs::{component::Tick, system::{SystemMeta, SystemParam}, world::unsafe_world_cell::UnsafeWorldCell};
+use super::registry::*;
+use super::*;
+
+/// Access to registered channels and channel data.
+///
+/// This is only available after [`StardustPlugin::cleanup`](crate::plugin::StardustPlugin::cleanup) is called.
+/// Attempts to access this type before this point will cause a panic.
+///
+/// For asynchronous contexts, [`clone_arc`](Self::clone_arc) can be used
+/// to get a reference to the registry that will exist longer than the system.
+/// This can be used in the [`ComputeTaskPool`] or [`AsyncComputeTaskPool`].
+///
+/// [`StardustPlugin`]: crate::plugin::StardustPlugin
+/// [`cleanup`]: bevy::app::Plugin::cleanup
+/// [`ComputeTaskPool`]: bevy::tasks::ComputeTaskPool
+/// [`AsyncComputeTaskPool`]: bevy::tasks::AsyncComputeTaskPool
+#[derive(SystemParam)]
+pub struct Channels<'w> {
+ // This hides the ChannelRegistryFinished type so that it
+ // cannot be removed from the World, which would be bad
+ finished: Res<'w, ChannelRegistryFinished>,
+}
+
+impl<'w> Channels<'w> {
+ /// Returns an `Arc` to the underlying `ChannelRegistry`.
+ /// This allows the registry to be used in asynchronous contexts.
+ pub fn clone_arc(&self) -> Arc {
+ self.finished.0.clone()
+ }
+}
+
+impl<'a> AsRef for Channels<'a> {
+ #[inline]
+ fn as_ref(&self) -> &ChannelRegistry {
+ &self.finished.0
+ }
+}
+
+impl<'a> Deref for Channels<'a> {
+ type Target = ChannelRegistry;
+
+ #[inline]
+ fn deref(&self) -> &Self::Target {
+ self.as_ref()
+ }
+}
+
+/// A `SystemParam` that provides rapid, cached access to data about channel `C`.
+///
+/// Unlike [`Channels`], `ChannelData` accesses data when the system is run by the scheduler.
+/// The data that `Channels` returns is cached, allowing fast repeat access.
+/// Using `ChannelData` is more convenient if `C` is known at compile time.
+///
+/// # Panics
+/// Panics when used as a [`SystemParam`] if `C` is not registered.
+///
+/// If `C` may not be registered, use [`Channels`] instead.
+pub struct ChannelData<'a, C: Channel> {
+ registration: &'a Registration,
+ phantom: PhantomData,
+}
+
+impl ChannelData<'_, C> {
+ /// Returns the [`ChannelId`] assigned to `C`.
+ #[inline]
+ pub fn id(&self) -> ChannelId {
+ self.metadata().channel_id
+ }
+
+ /// Returns the [`ChannelMetadata`] of channel `C`.
+ #[inline]
+ pub fn metadata(&self) -> &ChannelMetadata {
+ &self.registration.metadata
+ }
+
+ /// Returns the [`ChannelConfiguration`] of channel `C`.
+ #[inline]
+ pub fn config(&self) -> &ChannelConfiguration {
+ &self.registration.config
+ }
+}
+
+impl<'a, C: Channel> Clone for ChannelData<'a, C> {
+ fn clone(&self) -> ChannelData<'a, C> {
+ Self {
+ registration: self.registration,
+ phantom: PhantomData,
+ }
+ }
+}
+
+impl<'a, C: Channel> Copy for ChannelData<'a, C> {}
+
+pub struct ChannelDataState {
+ // Directly use the State type from the SystemParam implementation
+ // This avoids type errors if it's changed in future. It shouldn't, but eh.
+ // The lifetime should be irrelevant here. If it isn't, a type error is thrown.
+ res_state: as SystemParam>::State,
+ channel: ChannelId,
+}
+
+unsafe impl SystemParam for ChannelData<'_, C>
+where
+ C: Channel,
+{
+ type State = ChannelDataState;
+ type Item<'world, 'state> = ChannelData<'world, C>;
+
+ fn init_state(world: &mut World, system_meta: &mut SystemMeta) -> Self::State {
+ let res_state = as SystemParam>::init_state(world, system_meta);
+ let registry = world.resource::();
+ let channel = registry.id(TypeId::of::()).unwrap();
+ return ChannelDataState { res_state, channel };
+ }
+
+ unsafe fn get_param<'world, 'state>(
+ state: &'state mut Self::State,
+ system_meta: &SystemMeta,
+ world: UnsafeWorldCell<'world>,
+ change_tick: Tick,
+ ) -> Self::Item<'world, 'state> {
+ let registry = as SystemParam>::get_param(
+ &mut state.res_state,
+ system_meta,
+ world,
+ change_tick
+ ).into_inner();
+
+ return ChannelData {
+ registration: registry.get_registration(state.channel).unwrap(),
+ phantom: PhantomData,
+ }
+ }
+}
\ No newline at end of file
diff --git a/stardust/src/channels/registry.rs b/stardust/src/channels/registry.rs
index c983a2791..cd026c162 100644
--- a/stardust/src/channels/registry.rs
+++ b/stardust/src/channels/registry.rs
@@ -1,112 +1,39 @@
//! The channel registry.
-use std::{any::TypeId, collections::BTreeMap, ops::{Deref, DerefMut}, sync::Arc};
-use bevy::{ecs::{component::ComponentId, system::SystemParam}, prelude::*};
+use std::{any::{type_name, TypeId}, collections::BTreeMap, ops::Deref, sync::Arc};
+use bevy::prelude::*;
use crate::prelude::ChannelConfiguration;
use super::{id::{Channel, ChannelId}, ToChannelId};
-/// Mutable access to the channel registry, only available during app setup.
#[derive(Resource)]
-pub(crate) struct SetupChannelRegistry(pub(crate) Box);
+pub(super) struct ChannelRegistryBuilder(pub ChannelRegistry);
-impl Deref for SetupChannelRegistry {
- type Target = ChannelRegistryInner;
-
- #[inline]
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl DerefMut for SetupChannelRegistry {
- #[inline]
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
-}
-
-/// Immutable access to the channel registry, only available after app setup.
-///
-/// In almost all cases, you should just use the [`ChannelRegistry`] systemparam.
-/// However, this type can be cloned and will point to the same inner value.
-/// This makes it useful for asynchronous programming, like in futures.
-#[derive(Resource, Clone)]
-pub struct FinishedChannelRegistry(pub(crate) Arc);
-
-impl Deref for FinishedChannelRegistry {
- type Target = ChannelRegistryInner;
-
- #[inline]
- fn deref(&self) -> &Self::Target {
- &self.0
+impl ChannelRegistryBuilder {
+ pub fn finish(self) -> ChannelRegistryFinished {
+ ChannelRegistryFinished(Arc::new(self.0))
}
}
-/// Access to the configuration of registered channels, at any point.
-///
-/// If you're writing async code, you might want to look at [`FinishedChannelRegistry`].
-pub struct ChannelRegistry<'a>(&'a ChannelRegistryInner);
-
-unsafe impl<'a> SystemParam for ChannelRegistry<'a> {
- type State = (ComponentId, ComponentId);
- type Item<'w, 's> = ChannelRegistry<'w>;
-
- fn init_state(world: &mut World, system_meta: &mut bevy::ecs::system::SystemMeta) -> Self::State {
- // SAFETY: Since we can't register accesses, we do it through Res which can
- (
- as SystemParam>::init_state(world, system_meta),
- as SystemParam>::init_state(world, system_meta),
- )
- }
-
- unsafe fn get_param<'w, 's>(
- state: &'s mut Self::State,
- _system_meta: &bevy::ecs::system::SystemMeta,
- world: bevy::ecs::world::unsafe_world_cell::UnsafeWorldCell<'w>,
- _change_tick: bevy::ecs::component::Tick,
- ) -> Self::Item<'w, 's> {
- if let Some(ptr) = world.get_resource_by_id(state.0) {
- return ChannelRegistry(ptr.deref::().0.as_ref());
- }
-
- if let Some(ptr) = world.get_resource_by_id(state.1) {
- return ChannelRegistry(ptr.deref::().0.as_ref());
- }
-
- panic!("Neither SetupChannelRegistry or FinishedChannelRegistry were present when attempting to create ChannelRegistry")
- }
-}
+#[derive(Resource)]
+pub(super) struct ChannelRegistryFinished(pub Arc);
-impl ChannelRegistry<'_> {
- /// Gets the id fom the `ToChannelId` implementation.
- #[inline]
- pub fn channel_id(&self, from: impl ToChannelId) -> Option {
- self.0.channel_id(from)
- }
+impl Deref for ChannelRegistryFinished {
+ type Target = ChannelRegistry;
- /// Gets the channel configuration for `id`.
#[inline]
- pub fn channel_config(&self, id: impl ToChannelId) -> Option<&ChannelData> {
- self.0.channel_config(id)
- }
-}
-
-impl Deref for ChannelRegistry<'_> {
- type Target = ChannelRegistryInner;
-
fn deref(&self) -> &Self::Target {
&self.0
}
}
-/// Stores channel configuration data. Accessible through the [`ChannelRegistry`] system parameter.
-pub struct ChannelRegistryInner {
+/// The inner registry
+pub struct ChannelRegistry {
pub(super) channel_type_ids: BTreeMap,
- pub(super) channel_data: Vec,
+ pub(super) channel_data: Vec,
}
-impl ChannelRegistryInner {
- pub(in crate) fn new() -> Self {
+impl ChannelRegistry {
+ pub(super) fn new() -> Self {
Self {
channel_type_ids: BTreeMap::new(),
channel_data: vec![],
@@ -124,32 +51,22 @@ impl ChannelRegistryInner {
// Check the channel doesn't already exist
let type_id = TypeId::of::();
- #[cfg(feature="reflect")]
- let type_path = C::type_path();
+ let type_name = type_name::();
if self.channel_type_ids.get(&type_id).is_some() {
- #[cfg(feature="reflect")]
- panic!("A channel was registered twice: {type_path}");
- #[cfg(not(feature="reflect"))]
panic!("A channel was registered twice: {}", std::any::type_name::());
}
// Add to map
- let channel_id = ChannelId::try_from(self.channel_count()).unwrap();
+ let channel_id = ChannelId::try_from(self.count()).unwrap();
self.channel_type_ids.insert(type_id, channel_id);
- #[cfg(feature="reflect")]
- self.channel_data.push(ChannelData {
- type_id,
- type_path,
- channel_id,
-
- config,
- });
-
- #[cfg(not(feature="reflect"))]
- self.channel_data.push(ChannelData {
- type_id,
- channel_id,
+ self.channel_data.push(Registration {
+ metadata: ChannelMetadata {
+ type_id,
+ type_name,
+ channel_id,
+ _hidden: (),
+ },
config,
});
@@ -159,33 +76,37 @@ impl ChannelRegistryInner {
/// Gets the id from the `ToChannelId` implementation.
#[inline]
- pub fn channel_id(&self, value: impl ToChannelId) -> Option {
+ pub fn id(&self, value: impl ToChannelId) -> Option {
value.to_channel_id(self)
}
+ pub(super) fn get_registration(&self, id: impl ToChannelId) -> Option<&Registration> {
+ self.channel_data
+ .get(Into::::into(id.to_channel_id(self)?))
+ }
+
+ /// Returns a reference to the channel metadata.
+ pub fn metadata(&self, id: impl ToChannelId) -> Option<&ChannelMetadata> {
+ self.get_registration(id).map(|v| &v.metadata)
+ }
+
/// Returns a reference to the channel configuration.
- pub fn channel_config(&self, id: impl ToChannelId) -> Option<&ChannelData> {
- self.channel_data.get(Into::::into(id.to_channel_id(self)?))
+ pub fn config(&self, id: impl ToChannelId) -> Option<&ChannelConfiguration> {
+ self.get_registration(id).map(|v| &v.config)
}
/// Returns whether the channel exists.
- pub fn channel_exists(&self, id: ChannelId) -> bool {
+ pub fn exists(&self, id: ChannelId) -> bool {
self.channel_data.len() >= Into::::into(id)
}
/// Returns how many channels currently exist.
- pub fn channel_count(&self) -> u32 {
+ pub fn count(&self) -> u32 {
TryInto::::try_into(self.channel_data.len()).unwrap()
}
-
- /// Returns an iterator of all existing channel ids.
- pub fn channel_ids(&self) -> impl Iterator- {
- (0..self.channel_count()).into_iter()
- .map(|f| ChannelId::try_from(f).unwrap())
- }
}
-impl Default for ChannelRegistryInner {
+impl Default for ChannelRegistry {
fn default() -> Self {
Self {
channel_type_ids: BTreeMap::new(),
@@ -194,27 +115,29 @@ impl Default for ChannelRegistryInner {
}
}
-/// Channel information generated when `add_channel` is run.
-pub struct ChannelData {
+// AsRef is not reflexive, so we must implement it here
+// https://doc.rust-lang.org/std/convert/trait.AsRef.html#reflexivity
+impl AsRef
for ChannelRegistry {
+ #[inline]
+ fn as_ref(&self) -> &ChannelRegistry { self }
+}
+
+/// Metadata about a channel, generated during channel registration.
+pub struct ChannelMetadata {
/// The channel's `TypeId`.
pub type_id: TypeId,
- /// The channel's `TypePath` (from `bevy::reflect`)
- #[cfg(feature="reflect")]
- pub type_path: &'static str,
+ /// The channel's type name, from the `Any` trait.
+ /// This is only useful for debugging, and is not stable across compilation.
+ pub type_name: &'static str,
/// The channel's sequential ID assigned by the registry.
pub channel_id: ChannelId,
- /// The config of the channel.
- /// Since `ChannelData` implements `Deref` for `ChannelConfiguration`, this is just clutter.
- config: ChannelConfiguration,
+ _hidden: (),
}
-impl std::ops::Deref for ChannelData {
- type Target = ChannelConfiguration;
-
- fn deref(&self) -> &Self::Target {
- &self.config
- }
+pub(super) struct Registration {
+ pub metadata: ChannelMetadata,
+ pub config: ChannelConfiguration,
}
\ No newline at end of file
diff --git a/stardust/src/connections/debug.rs b/stardust/src/connections/debug.rs
deleted file mode 100644
index 45b61c474..000000000
--- a/stardust/src/connections/debug.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use bevy::prelude::*;
-
-/// Used to intentionally reduce the performance of peers for testing purposes.
-/// If applied to a `NetworkPeer` entity, reduces performance for that peer specifically.
-#[derive(Debug, Component)]
-pub struct NetworkPerformanceReduction {
- /// Chance to drop a packet when sending, if the transport is packet-based.
- /// This chance is from `0.0` (never) to `1.0` (always), with `0.5` dropping 50% of the time.
- pub packet_drop_chance: f32,
-
- /// Chance to mangle or otherwise invalidate a packet, if the transport is packet based.
- /// This chance is from `0.0` (never) to `1.0` (always), with `0.5` mangling 50% of the time.
- /// The degree to which the packet is mangled is up to the transport layer.
- pub packet_mangle_chance: f32,
-
- /// Artificial delay in transmitting, in milliseconds.
- /// 1000 milliseconds is the same as one second.
- pub transmit_delay_millis: u32,
-}
\ No newline at end of file
diff --git a/stardust/src/connections/events.rs b/stardust/src/connections/events.rs
new file mode 100644
index 000000000..84e538418
--- /dev/null
+++ b/stardust/src/connections/events.rs
@@ -0,0 +1,156 @@
+//! Connection events.
+
+use std::{fmt::Display, sync::Arc, time::Duration};
+use bevy::prelude::*;
+
+/// Sent by transport layers when a peer is connecting.
+///
+/// This event should be "followed up" by another event,
+/// such as [`PeerConnectedEvent`] or [`PeerDisconnectedEvent`].
+#[derive(Event)]
+pub struct PeerConnectingEvent {
+ /// The peer that is connecting.
+ pub peer: Entity,
+}
+
+/// Sent by transport layers when a peer has connected.
+///
+/// This may be sent after [`PeerConnectingEvent`],
+/// but can also occur on its own without any preceding events.
+#[derive(Event)]
+pub struct PeerConnectedEvent {
+ /// The peer that has connected.
+ pub peer: Entity,
+}
+
+/// Sent by the application to tell a transport layer to disconnect a peer.
+#[derive(Debug, Clone, Event)]
+pub struct DisconnectPeerEvent {
+ /// The peer to be disconnected.
+ pub peer: Entity,
+
+ /// The reason for disconnection.
+ pub reason: DisconnectReason,
+
+ /// A human-readable string associated with the disconnection.
+ /// This is useful to communicate information that isn't in the reason enum,
+ /// such as a ban reason or detailed error code.
+ pub comment: Option>,
+
+ /// Whether or not the peer should be disconnected immediately.
+ /// This may cause data loss if set to `true`, and should be used sparingly.
+ pub force: bool,
+}
+
+/// Sent by transport layers when a peer begins disconnecting.
+///
+/// This event should be followed up with [`PeerDisconnectedEvent`],
+/// which includes the reason for the disconnection.
+#[derive(Debug, Clone, Event)]
+pub struct PeerDisconnectingEvent {
+ /// The peer that is disconnecting.
+ pub peer: Entity,
+}
+
+/// Sent by transport layers when a peer is fully disconnected.
+///
+/// This may occur after [`PeerConnectingEvent`] or after [`PeerDisconnectingEvent`],
+/// but can also occur on its own without any preceding events.
+#[derive(Debug, Clone, Event)]
+pub struct PeerDisconnectedEvent {
+ /// The peer that disconnected.
+ pub peer: Entity,
+
+ /// The reason for disconnection, if one is available.
+ pub reason: DisconnectReason,
+
+ /// A human-readable string associated with the disconnection.
+ /// This is useful to communicate information that isn't in the reason enum,
+ /// such as a ban reason or detailed error code.
+ pub comment: Option>,
+}
+
+impl Display for PeerDisconnectedEvent {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_fmt(format_args!("peer {:?} disconnected: {}", self.peer, self.reason))?;
+
+ // If a comment is present, show it
+ if let Some(comment) = &self.comment {
+ f.write_fmt(format_args!(" ({comment})"))?;
+ }
+
+ return Ok(())
+ }
+}
+
+/// A reason for disconnection.
+#[derive(Debug, Clone, Default)]
+#[non_exhaustive]
+pub enum DisconnectReason {
+ /// No reason given.
+ #[default]
+ Unspecified,
+
+ /// The connection was finished gracefully,
+ /// and both sides terminated with no data loss.
+ Finished,
+
+ /// The peer failed some kind of verification check for protocol,
+ /// such as checking game versions, or game modifications.
+ /// This most often will occur during a handshake.
+ FailedVerification,
+
+ /// The peer failed some kind of authentication check for their identity,
+ /// such as an account ID, key exchange, or a TLS certificate.
+ /// This most often will occur during a handshake.
+ FailedAuthentication,
+
+ /// The connection was refused by the remote peer,
+ /// as their acceptance would exceed the limit for some resource.
+ ///
+ /// This reason is returned for instances such as a
+ /// server at capacity, or a full lobby in a party game.
+ ResourceCapacity,
+
+ /// The peer stopped responding.
+ TimedOut {
+ /// The duration between the last data received from the peer, and the time of disconnection.
+ after: Duration,
+ },
+
+ /// The transport layer identified the peer as violating
+ /// its protocol, and was subsequently disconnected.
+ ProtocolViolation,
+
+ /// The peer behaved unexpectedly, and was disconnected by the application.
+ /// This is useful for instances such as kicking buggy or hacked clients.
+ Misbehaving,
+}
+
+impl Display for DisconnectReason {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ use DisconnectReason::*;
+
+ match self {
+ Unspecified => f.write_str("no reason given"),
+ Finished => f.write_str("finished"),
+ FailedVerification => f.write_str("failed verification"),
+ FailedAuthentication => f.write_str("failed authentication"),
+ ResourceCapacity => f.write_str("at capacity"),
+ ProtocolViolation => f.write_str("transport protocol violation"),
+ Misbehaving => f.write_str("peer misbehaving"),
+
+ TimedOut { after } => {
+ let (secs, millis) = (after.as_secs(), after.subsec_millis());
+ if (secs, millis) == (0, 0) { return f.write_str("timed out immediately"); }
+
+ f.write_str("timed out after ")?;
+ if secs != 0 { f.write_fmt(format_args!("{secs} seconds "))?; }
+ if secs != 0 && millis != 0 { f.write_str("and ")?; }
+ if millis != 0 { f.write_fmt(format_args!("{millis} milliseconds"))?; }
+
+ return Ok(())
+ },
+ }
+ }
+}
\ No newline at end of file
diff --git a/stardust/src/connections/groups.rs b/stardust/src/connections/groups.rs
deleted file mode 100644
index 8ed87326d..000000000
--- a/stardust/src/connections/groups.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-//! Organisation of network peers.
-
-use bevy::prelude::*;
-use smallvec::SmallVec;
-
-/// A collection of network peers, used for organisational purposes.
-///
-/// This can be used for anything, such as teams of players, rooms for replication, or administrative permissions.
-#[derive(Debug, Component)]
-pub struct NetworkGroup(pub(crate) SmallVec<[Entity; 8]>);
-
-impl Default for NetworkGroup {
- fn default() -> Self {
- Self(SmallVec::default())
- }
-}
-
-impl NetworkGroup {
- /// Adds the peer to the network group.
- /// Does nothing if the peer is already included.
- pub fn add(&mut self, peer: Entity) {
- match self.0.binary_search(&peer) {
- Ok(_) => {},
- Err(idx) => self.0.insert(idx, peer),
- }
- }
-
- /// Removes the peer from the network group.
- /// Does nothing if the peer isn't present.
- pub fn remove(&mut self, peer: Entity) {
- match self.0.binary_search(&peer) {
- Ok(idx) => { self.0.remove(idx); },
- Err(_) => {},
- }
- }
-
- /// Returns `true` if the peer is part of the network group.
- pub fn contains(&self, peer: Entity) -> bool {
- match self.0.binary_search(&peer) {
- Ok(_) => true,
- Err(_) => false,
- }
- }
-}
\ No newline at end of file
diff --git a/stardust/src/connections/lifestage.rs b/stardust/src/connections/lifestage.rs
new file mode 100644
index 000000000..4c02ed901
--- /dev/null
+++ b/stardust/src/connections/lifestage.rs
@@ -0,0 +1,57 @@
+use bevy::{ecs::{query::{QueryData, QueryFilter, WorldQuery}, storage::TableRow}, prelude::*};
+
+/// The lifestage of a connection.
+///
+/// This exists to model the average lifecycle of a connection, from an initial handshake to being disconnected.
+/// An `Ord` implementation is provided, with variants being 'greater' if they're later in the model lifecycle.
+#[derive(Debug, Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Reflect)]
+#[reflect(Debug, Component, PartialEq)]
+#[non_exhaustive]
+pub enum PeerLifestage {
+ /// Midway through a [handshake].
+ /// Messages sent to peers in this stage will likely be ignored.
+ ///
+ /// [handshake]: https://en.wikipedia.org/wiki/Handshake_(computing)
+ Handshaking,
+
+ /// Fully connected and communicating normally.
+ Established,
+
+ /// In the process of closing the connection.
+ ///
+ /// This step may be skipped and peers will jump directly to the `Closed` stage from any other variant.
+ Closing,
+
+ /// The connection is closed.
+ Closed,
+}
+
+/// A [`QueryFilter`] for entities in the [`Established`](PeerLifestage::Established) lifestage.
+///
+/// ```rust
+/// # use bevy::prelude::*;
+/// # use bevy_stardust::prelude::*;
+/// #
+/// fn my_system(query: Query<&Peer, Established>) {
+/// for peer in &query {
+/// println!("Hello, world!");
+/// }
+/// }
+/// ```
+#[derive(QueryData)]
+pub struct Established<'w> {
+ lifestage: Option<&'w PeerLifestage>,
+}
+
+impl<'w> QueryFilter for Established<'w> {
+ const IS_ARCHETYPAL: bool = false;
+
+ unsafe fn filter_fetch(
+ fetch: &mut Self::Fetch<'_>,
+ entity: Entity,
+ table_row: TableRow,
+ ) -> bool {
+ Self::fetch(fetch, entity, table_row).lifestage
+ .is_some_and(|e| *e == PeerLifestage::Established)
+ }
+}
\ No newline at end of file
diff --git a/stardust/src/connections/messages.rs b/stardust/src/connections/messages.rs
new file mode 100644
index 000000000..08825f5af
--- /dev/null
+++ b/stardust/src/connections/messages.rs
@@ -0,0 +1,114 @@
+use std::marker::PhantomData;
+use bevy::prelude::*;
+use crate::{channels::ChannelId, messages::*};
+use super::Peer;
+
+/// A message queue for a [peer entity], exposing a subset of [`MessageQueue`]'s API.
+///
+/// [`PeerMessages`] has a generic `D`, which defines its [direction].
+///
+/// [`PeerMessages`] components are cleared automatically in the [`NetworkSend::Clear`] system set.
+/// Unread messages will be discarded unless the [`Message`] objects are cloned.
+///
+/// [peer entity]: crate::connections
+/// [direction]: crate::messages::NetDirection
+/// [`NetworkSend::Clear`]: crate::scheduling::NetworkSend::Clear
+#[derive(Default, Component)]
+pub struct PeerMessages {
+ queue: MessageQueue,
+ phantom: PhantomData,
+}
+
+impl PeerMessages {
+ /// Creates a new [`PeerMessages`].
+ pub fn new() -> Self {
+ Self {
+ queue: MessageQueue::new(),
+ phantom: PhantomData,
+ }
+ }
+
+ /// Returns the total number of messages stored in the queue.
+ #[inline]
+ pub fn count(&self) -> usize {
+ self.queue.count()
+ }
+
+ /// Returns the sum of bytes from all messages in the queue.
+ #[inline]
+ pub fn bytes(&self) -> usize {
+ self.queue.bytes()
+ }
+
+ /// Pushes a single message to the queue.
+ #[inline]
+ pub fn push_one(&mut self, message: ChannelMessage) {
+ self.queue.push_one(message);
+ }
+
+ /// Pushes many messages from `iter` to the queue.
+ /// This can be faster than calling [`push_one`](Self::push_one) repeatedly.
+ #[inline]
+ pub fn push_many(&mut self, iter: I)
+ where
+ I: IntoIterator- ,
+ {
+ self.queue.push_many(iter);
+ }
+
+ /// Pushes many messages from `iter` to a single channel.
+ /// This can be faster than calling [`push_one`](Self::push_one) or [`push_many`](Self::push_many) repeatedly.
+ #[inline]
+ pub fn push_channel
(&mut self, channel: ChannelId, iter: I)
+ where
+ I: IntoIterator- ,
+ {
+ self.queue.push_channel(channel, iter);
+ }
+
+ /// Returns an iterator over channels, and their associated queues.
+ #[inline]
+ pub fn iter(&self) -> ChannelIter {
+ self.queue.iter()
+ }
+
+ /// Returns an iterator over all messages in a specific channel.
+ #[inline]
+ pub fn iter_channel(&self, channel: ChannelId) -> MessageIter {
+ self.queue.iter_channel(channel)
+ }
+}
+
+impl<'a, D: MessageDirection> IntoIterator for &'a PeerMessages
{
+ type Item = <&'a MessageQueue as IntoIterator>::Item;
+ type IntoIter = <&'a MessageQueue as IntoIterator>::IntoIter;
+
+ #[inline]
+ fn into_iter(self) -> Self::IntoIter {
+ self.queue.into_iter()
+ }
+}
+
+impl AsRef for PeerMessages {
+ /// Borrows the inner [`MessageQueue`].
+ #[inline]
+ fn as_ref(&self) -> &MessageQueue {
+ &self.queue
+ }
+}
+
+impl AsMut for PeerMessages {
+ /// Mutably borrows the inner [`MessageQueue`].
+ #[inline]
+ fn as_mut(&mut self) -> &mut MessageQueue {
+ &mut self.queue
+ }
+}
+
+pub(crate) fn clear_message_queues_system(
+ mut instances: Query<&mut PeerMessages, With>,
+) {
+ for mut messages in instances.iter_mut() {
+ messages.queue.clear()
+ }
+}
\ No newline at end of file
diff --git a/stardust/src/connections/mod.rs b/stardust/src/connections/mod.rs
index a9fa59d5d..4e717a57b 100644
--- a/stardust/src/connections/mod.rs
+++ b/stardust/src/connections/mod.rs
@@ -1,13 +1,35 @@
-//! Connection-related functionality.
+//! Virtual connections.
+//!
+//! In Stardust, a virtual connection is represented as an entity
+//! with the [`Peer`] component, referred to as a **peer entity**.
+//! Peer entities don't do anything on their own. Instead, their
+//! behavior is defined by new components.
+//!
+//! # I/O and Messaging
+//! Peers have an I/O abstraction called [`PeerMessages`] that
+//! acts as a queue of messages, bridging the gap between game systems
+//! and transport layers. For example, `PeerMessages` is used
+//! by transport layers to queue unread messages, which the application
+//! and other plugins can use to read incoming messages.
+//!
+//! For more information about messaging, see the [messages module](crate::messages).
+//!
+//! # Additional Data
+//! The `Peer` component by itself does not store much data.
+//! Instead, that's left up to additional components.
+//! Components that store peer-related data on peer entities
+//! are prefixed with `Peer`, such as [`PeerUid`].
-mod debug;
-mod groups;
+mod lifestage;
+mod messages;
mod peer;
-mod security;
+mod stats;
-pub(crate) mod systems;
+pub(crate) use messages::clear_message_queues_system;
-pub use debug::*;
-pub use groups::*;
-pub use peer::*;
-pub use security::*;
\ No newline at end of file
+pub mod events;
+
+pub use messages::PeerMessages;
+pub use peer::{Peer, PeerAddress, PeerUid};
+pub use stats::PeerRtt;
+pub use lifestage::{PeerLifestage, Established};
\ No newline at end of file
diff --git a/stardust/src/connections/peer.rs b/stardust/src/connections/peer.rs
index 4da6cc0d1..dae8c6c05 100644
--- a/stardust/src/connections/peer.rs
+++ b/stardust/src/connections/peer.rs
@@ -1,120 +1,59 @@
-//! "Peers" aka other computers over the network.
-
-use std::time::Instant;
+use std::{net::IpAddr, time::Instant};
use bevy::prelude::*;
-/// An active connection to a remote peer.
-///
-/// The term 'peer' is used interchangeably for any kind of connection to another instance of the application.
-/// If you're writing a star-topology system, you can treat these as servers and clients.
-/// If you're writing a mesh-topology system, you can treat these as another peer in the mesh.
-///
-/// The `NetworkPeer` component is intended to be queried freely, but not created by the developer.
-/// Instead, it should be managed by transport layer plugins.
-///
-/// Entities with this component may also have the following components:
-/// - [`NetworkMessages`](crate::messages::NetworkMessages), relating to messages
-/// - [`NetworkPeerUid`], relating to persistent data
-/// - [`NetworkPeerLifestage`], relating to connection state
-/// - [`SecurityLevel`](super::security::SecurityLevel), relating to encryption
-#[derive(Debug, Component)]
-#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))]
-pub struct NetworkPeer {
+/// A component for entities that represent a virtual connection.
+#[derive(Debug, Component, Reflect)]
+#[reflect(Debug, Component)]
+#[non_exhaustive]
+pub struct Peer {
/// The point in time this peer was added to the `World`.
pub joined: Instant,
-
- /// The quality of the connection, from `0.0` to `1.0`.
- /// This is subjective and defined by the transport layer.
- /// `None` means a value is not provided.
- pub quality: Option,
-
- /// Round-trip time estimate, in milliseconds.
- pub ping: u32,
-
- disconnect_requested: bool,
}
-impl NetworkPeer {
+impl Peer {
/// Creates the component in the `Handshaking` state.
pub fn new() -> Self {
Self {
joined: Instant::now(),
- quality: None,
- ping: 0,
- disconnect_requested: false,
}
}
-
- /// Signals to the transport layer to disconnect the peer.
- /// This operation cannot be undone.
- pub fn disconnect(&mut self) {
- self.disconnect_requested = true
- }
-
- /// Returns `true` if [`disconnect`] has been used.
- /// This is intended for use by transport layers, and you should use [`NetworkPeerLifestage`] instead.
- pub fn disconnect_requested(&self) -> bool {
- self.disconnect_requested
- }
}
-/// The lifestage of a connection.
-///
-/// This exists to model the average lifecycle of a connection, from an initial handshake to being disconnected.
-/// An `Ord` implementation is provided, with variants being 'greater' if they're later in the model lifecycle.
-#[derive(Debug, Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
-#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))]
-#[non_exhaustive]
-pub enum NetworkPeerLifestage {
- /// Midway through a [handshake].
- /// Messages sent to peers in this stage will likely be ignored.
- ///
- /// [handshake]: https://en.wikipedia.org/wiki/Handshake_(computing)
- Handshaking,
-
- /// Fully connected and communicating normally.
- Established,
-
- /// In the process of closing the connection.
- ///
- /// This step may be skipped and peers will jump directly to the `Closed` stage from any other variant.
- Closing,
-
- /// The connection is closed, and the entity will soon be despawned automatically.
- Closed,
-}
+/// The IP address of a network peer, if it has one.
+#[derive(Component, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PeerAddress(pub IpAddr);
-/// A unique identifier for a [`NetworkPeer`], to store persistent data across multiple connections.
+/// A unique identifier for a [`Peer`], to store persistent data across multiple connections.
/// This component should only be constructed by the app developer, but can be read by any plugins.
///
/// If you're working with another ID namespace, like UUIDs and Steam IDs, you should
/// map the ids from that space into a unique value here through some kind of associative array.
-#[derive(Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))]
-pub struct NetworkPeerUid(pub u64);
+#[derive(Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Reflect)]
+#[reflect(Debug, Component, PartialEq, Hash)]
+pub struct PeerUid(pub u64);
-impl std::fmt::Debug for NetworkPeerUid {
+impl std::fmt::Debug for PeerUid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{:X}", self.0))
}
}
-impl std::fmt::Display for NetworkPeerUid {
+impl std::fmt::Display for PeerUid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
::fmt(self, f)
}
}
-impl From for NetworkPeerUid {
+impl From for PeerUid {
#[inline]
fn from(value: u64) -> Self {
Self(value)
}
}
-impl From for u64 {
+impl From for u64 {
#[inline]
- fn from(value: NetworkPeerUid) -> Self {
+ fn from(value: PeerUid) -> Self {
value.0
}
}
\ No newline at end of file
diff --git a/stardust/src/connections/security.rs b/stardust/src/connections/security.rs
deleted file mode 100644
index ee0cb11e3..000000000
--- a/stardust/src/connections/security.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-use bevy::prelude::*;
-
-/// How 'secure' a connection is.
-/// This is set by the transport layer that controls the connection.
-/// See variant documentation for specific information.
-///
-/// This type implements `Ord`, with 'greater' orderings corresponding to better security.
-///
-/// This value is set by the transport layer managing this peer.
-/// It's up to it to provide an appropriate value here.
-#[derive(Debug, Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
-#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))]
-pub enum SecurityLevel {
- /// Communication is encrypted but not authenticated, or is fully plain text.
- ///
- /// **For end users:**
- /// This kind of connection should not be used for anything that must remain secret or private.
- /// It is vulnerable to [man in the middle attacks] like reading and modifying in-flight information.
- ///
- /// [man in the middle attacks]: https://en.wikipedia.org/wiki/Man-in-the-middle_attack
- Unauthenticated,
-
- /// Communication is both encrypted and authenticated.
- ///
- /// **For end users:**
- /// - Encrypted traffic cannot be viewed by a man in the middle at any point once the handshake finishes.
- /// - You can exchange private information with the client in as much confidence as you have in your transport layers.
- ///
- /// Note that these guarantees are only valid if your transport layers are well implemented and use secure cryptography methods.
- /// Keep any cryptography-implementing transport layers up to date as much as possible, and use good judgement.
- ///
- /// Additionally, since transport layers can read any and all outgoing messages, it's up to you to verify that they're safe.
- /// Regardless, it's not a good idea to transfer something like credit card details in the first place without incredible precautions.
- /// Some things (like banking. especially banking) should be left up to the experts.
- ///
- /// **For transport layer implementors:**
- /// - For TLS, this should be set if a full chain of trust is set up.
- /// - Only TLS versions > 1.2 are acceptable (1.3 onward).
- /// - You should always use the latest version of TLS. There's not really a reason not to.
- /// - Broken or flawed cryptography methods are not suitable for this variant. Broken cryptography is as bad as no cryptography.
- /// - If in doubt, *pick a lower level.*
- ///
- /// **Examples of authenticated connections:**
- /// - [Pre-shared keys](https://en.wikipedia.org/wiki/Pre-shared_key)
- /// - [Transport Layer Security](https://en.wikipedia.org/wiki/Transport_Layer_Security)
- /// - [netcode.io](https://github.com/networkprotocol/netcode.io/blob/master/STANDARD.md)
- Authenticated,
-}
diff --git a/stardust/src/connections/stats.rs b/stardust/src/connections/stats.rs
new file mode 100644
index 000000000..b2d239f3d
--- /dev/null
+++ b/stardust/src/connections/stats.rs
@@ -0,0 +1,43 @@
+use std::{ops::{Deref, DerefMut}, time::Duration};
+use bevy::prelude::*;
+
+/// Round-trip time estimate for [peer entities].
+///
+/// Round-trip time (RTT) is the duration of time that it takes
+/// for one message to be sent to a peer, and then a response
+/// to be sent back by the recipient. This estimate is set by
+/// the transport layer managing a connection.
+///
+/// [peer entities]: crate::connections
+#[derive(Debug, Default, Clone, Copy, Component)]
+pub struct PeerRtt(pub Duration);
+
+impl Deref for PeerRtt {
+ type Target = Duration;
+
+ #[inline]
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for PeerRtt {
+ #[inline]
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl From for Duration {
+ #[inline]
+ fn from(value: PeerRtt) -> Self {
+ value.0
+ }
+}
+
+impl From for PeerRtt {
+ #[inline]
+ fn from(value: Duration) -> Self {
+ Self(value)
+ }
+}
\ No newline at end of file
diff --git a/stardust/src/connections/systems.rs b/stardust/src/connections/systems.rs
deleted file mode 100644
index 7a7d052c0..000000000
--- a/stardust/src/connections/systems.rs
+++ /dev/null
@@ -1,13 +0,0 @@
-use bevy::prelude::*;
-use super::{NetworkPeer, NetworkPeerLifestage};
-
-pub(crate) fn despawn_closed_connections_system(
- mut commands: Commands,
- query: Query<(Entity, &NetworkPeerLifestage), (With, Changed)>,
-) {
- for (id, stage) in query.iter() {
- if *stage == NetworkPeerLifestage::Closed {
- commands.entity(id).despawn();
- }
- }
-}
\ No newline at end of file
diff --git a/stardust/src/diagnostics/connections.rs b/stardust/src/diagnostics/connections.rs
new file mode 100644
index 000000000..7eb4e9a79
--- /dev/null
+++ b/stardust/src/diagnostics/connections.rs
@@ -0,0 +1,27 @@
+use bevy::{prelude::*, diagnostic::*};
+use crate::prelude::*;
+
+/// Adds diagnostics about connections.
+pub struct PeerDiagnosticPlugin;
+
+impl Plugin for PeerDiagnosticPlugin {
+ fn build(&self, app: &mut App) {
+ app.register_diagnostic(Diagnostic::new(Self::COUNT)
+ .with_smoothing_factor(0.0)
+ .with_max_history_length(1));
+
+ app.add_systems(Update, diagnostic_system);
+ }
+}
+
+impl PeerDiagnosticPlugin {
+ /// Diagnostic path for the amount of entities with [`Peer`].
+ pub const COUNT: DiagnosticPath = DiagnosticPath::const_new("net/core/peers/total");
+}
+
+fn diagnostic_system(
+ mut diagnostics: Diagnostics,
+ query: Query<(), With>,
+) {
+ diagnostics.add_measurement(&PeerDiagnosticPlugin::COUNT, || query.iter().count() as f64);
+}
\ No newline at end of file
diff --git a/stardust/src/diagnostics/messages.rs b/stardust/src/diagnostics/messages.rs
new file mode 100644
index 000000000..952b4f710
--- /dev/null
+++ b/stardust/src/diagnostics/messages.rs
@@ -0,0 +1,36 @@
+use bevy::{prelude::*, diagnostic::*};
+use crate::prelude::*;
+
+/// Adds diagnostics about how many messages are being sent.
+pub struct MessageCountDiagnosticsPlugin;
+
+impl Plugin for MessageCountDiagnosticsPlugin {
+ fn build(&self, app: &mut App) {
+ app.register_diagnostic(Diagnostic::new(Self::INCOMING_COUNT));
+ app.register_diagnostic(Diagnostic::new(Self::OUTGOING_COUNT));
+
+ app.add_systems(PreUpdate, diagnostic_system::.in_set(NetworkRecv::Synchronise));
+ app.add_systems(PostUpdate, diagnostic_system::.in_set(NetworkSend::Diagnostics));
+ }
+}
+
+impl MessageCountDiagnosticsPlugin {
+ /// The number of incoming messages in queues for all peers.
+ pub const INCOMING_COUNT: DiagnosticPath = DiagnosticPath::const_new("net/core/messages/outgoing");
+
+ /// The number of outgoing messages in queues for all peers.
+ pub const OUTGOING_COUNT: DiagnosticPath = DiagnosticPath::const_new("net/core/messages/outgoing");
+}
+
+fn diagnostic_system(
+ mut diagnostics: Diagnostics,
+ query: Query<&PeerMessages, With>,
+) {
+ let count = query.iter().map(|m| m.count()).sum::() as f64;
+ let path = match D::net_dir() {
+ NetDirection::Outgoing => MessageCountDiagnosticsPlugin::OUTGOING_COUNT,
+ NetDirection::Incoming => MessageCountDiagnosticsPlugin::INCOMING_COUNT,
+ };
+
+ diagnostics.add_measurement(&path, || count as f64);
+}
\ No newline at end of file
diff --git a/stardust/src/diagnostics/mod.rs b/stardust/src/diagnostics/mod.rs
new file mode 100644
index 000000000..a0ee5be2f
--- /dev/null
+++ b/stardust/src/diagnostics/mod.rs
@@ -0,0 +1,12 @@
+//! Diagnostic tools for Stardust.
+//! These are generic, and transport layers may provide better tools for their peers specifically.
+
+mod connections;
+mod messages;
+mod slowdown;
+mod stats;
+
+pub use connections::PeerDiagnosticPlugin;
+pub use messages::MessageCountDiagnosticsPlugin;
+pub use slowdown::{DropPackets, SimulateLatency};
+pub use stats::PeerStats;
\ No newline at end of file
diff --git a/stardust/src/diagnostics/slowdown.rs b/stardust/src/diagnostics/slowdown.rs
new file mode 100644
index 000000000..dd093ff4f
--- /dev/null
+++ b/stardust/src/diagnostics/slowdown.rs
@@ -0,0 +1,32 @@
+use std::time::Duration;
+use bevy::prelude::*;
+
+/// Instructs transport layers to drop packets randomly, simulating an unstable connection.
+///
+/// This value ranges between `0.0` (never drop) to `1.0` (always drop), with `0.5` dropping 50% of the time.
+#[derive(Debug, Default, Clone, Component, Reflect)]
+#[reflect(Debug, Default, Component)]
+pub struct DropPackets(#[reflect(@0.0..=1.0)] pub f32);
+
+impl DropPackets {
+ /// Never drop packets.
+ pub const NEVER: Self = Self(0.0);
+
+ /// Always drop packets.
+ pub const ALWAYS: Self = Self(1.0);
+}
+
+/// Instructs transport layers to artifically increase latency, simulating a distant connection.
+///
+/// This latency increase is implemented by the transport layer, as a minimum latency value.
+/// You can think of it as a function `min(a,b)` where `a` is their real latency, and `b` is the value in this component.
+#[derive(Debug, Default, Clone, Component, Reflect)]
+#[reflect(Debug, Default, Component)]
+pub struct SimulateLatency(pub Duration);
+
+impl From for SimulateLatency {
+ #[inline]
+ fn from(value: Duration) -> Self {
+ Self(value)
+ }
+}
\ No newline at end of file
diff --git a/stardust/src/diagnostics/stats.rs b/stardust/src/diagnostics/stats.rs
new file mode 100644
index 000000000..7bbaa0663
--- /dev/null
+++ b/stardust/src/diagnostics/stats.rs
@@ -0,0 +1,38 @@
+use std::time::Instant;
+use bevy::prelude::*;
+
+/// A statistics tracking component for [peer entities].
+///
+/// When added to a peer, it tracks data about the peer's connection.
+/// This is useful for debugging and system administrator tools.
+/// These values are set by the transport layer managing the connection.
+///
+/// Note that round-trip time is not tracked in this component.
+/// RTT is tracked in its own component, called [`PeerRtt`].
+///
+/// [peer entities]: crate::connections
+/// [`PeerRtt`]: crate::connections::PeerRtt
+#[derive(Debug, Default, Clone, Component, Reflect)]
+#[reflect(Default, Component)]
+#[non_exhaustive]
+pub struct PeerStats {
+ /// The last time any data was received by the transport layer.
+ /// May be `None` if data has never been received.
+ pub last_recv: Option,
+
+ /// Outgoing data in kilobits per second, including overhead from the transport layer.
+ pub all_kbps_out: u32,
+
+ /// Outgoing data in kilobits per second, only counting bytes in individual messages.
+ /// If messages are sent piecemeal (in multiple chunks received on different ticks),
+ /// the received data is still counted.
+ pub msg_kbps_out: u32,
+
+ /// Incoming data in kilobits per second, including overhead from the transport layer.
+ pub all_kbps_in: u32,
+
+ /// Incoming data in kilobits per second, only counting bytes in individual messages.
+ /// If messages are sent piecemeal (in multiple chunks received on different ticks),
+ /// the received data is still counted.
+ pub msg_kbps_in: u32,
+}
\ No newline at end of file
diff --git a/stardust/src/hashing/mod.rs b/stardust/src/hashing/mod.rs
deleted file mode 100644
index a49cf8628..000000000
--- a/stardust/src/hashing/mod.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-//! Hashing of Stardust's configuration and related plugins.
-
-mod stablehash;
-mod resource;
-
-use bevy::prelude::*;
-
-pub(crate) use resource::{PendingHashValues, finalise_hasher_system};
-
-pub use stablehash::StableHash;
-pub use resource::ProtocolConfigHash;
-
-mod sealed {
- pub trait Sealed {}
- impl Sealed for bevy::app::App {}
-}
-
-/// Extends Bevy's `App` to add methods for generating the [ProtocolId].
-pub trait HashingAppExt: sealed::Sealed {
- /// Hashes `value` immediately.
- ///
- /// Using this function depends on the ordering of its use. `f(A) f(B)` has a different result to `f(B) f(A)`.
- /// If you don't want this, use `net_hash_string`.
- fn net_hash_value(&mut self, value: impl StableHash);
-}
-
-impl HashingAppExt for App {
- fn net_hash_value(&mut self, value: impl StableHash) {
- let mut hasher = self.world.resource_mut::();
- value.hash(&mut hasher.state);
- }
-}
\ No newline at end of file
diff --git a/stardust/src/hashing/resource.rs b/stardust/src/hashing/resource.rs
deleted file mode 100644
index 489971714..000000000
--- a/stardust/src/hashing/resource.rs
+++ /dev/null
@@ -1,45 +0,0 @@
-use std::hash::Hasher;
-use bevy::prelude::*;
-use gxhash::GxHasher;
-use super::stablehash::STABLE_HASHER_SEED;
-
-/// A unique value generated during `App` creation, used to ensure two clients have consistent network setups.
-#[derive(Resource)]
-pub struct ProtocolConfigHash {
- int: u64,
-}
-
-impl ProtocolConfigHash {
- /// Returns the integer representation of the hash.
- pub fn int(&self) -> u64 {
- self.int
- }
-}
-
-/// Stores the state of the hasher before a result is finalized
-#[derive(Resource)]
-pub(crate) struct PendingHashValues {
- pub state: Box,
-}
-
-impl PendingHashValues {
- pub fn new() -> Self {
- Self {
- state: Box::new(GxHasher::with_seed(STABLE_HASHER_SEED)),
- }
- }
-}
-
-/// Adds the `UniqueNetworkHash` resource to the world.
-pub fn finalise_hasher_system(
- world: &mut World
-) {
- // Remove hasher resource
- let hasher = world.remove_resource::().unwrap();
-
- // Get hasher values
- let int = hasher.state.finish();
-
- // Insert hash resource
- world.insert_resource(ProtocolConfigHash { int });
-}
\ No newline at end of file
diff --git a/stardust/src/hashing/stablehash.rs b/stardust/src/hashing/stablehash.rs
deleted file mode 100644
index fd2f479d5..000000000
--- a/stardust/src/hashing/stablehash.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-use std::hash::Hasher;
-
-/// Pre-defined seed used in GxHasher.
-pub(super) const STABLE_HASHER_SEED: i64 = 0x68066CFE6F752C27;
-
-/// A stably hashable type, for comparing configurations across the network.
-/// Since `#[derive(Hash)]` does not guarantee stability, this trait exists instead.
-/// You should implement it manually.
-///
-/// This must always feed the same bytes into the hasher no matter the architecture, platform, Rust version, or compilation.
-/// If this guarantee is not upheld, different compilations of the same application may become incompatible.
-/// If possible, you should always go through the `StableHash` implementation of a type, rather than using the `Hasher`'s API.
-///
-/// Notes for implementors:
-/// - Only write bytes (`write`, `write_u8`) - don't use other functions
-pub trait StableHash {
- /// Hashes the type through `H`.
- fn hash(&self, state: &mut H);
-}
-
-impl StableHash for () {
- fn hash(&self, _state: &mut H) {}
-}
-
-macro_rules! impl_stablehash_simple {
- ($type:ident) => {
- impl StableHash for $type {
- fn hash(&self, state: &mut H) {
- state.write(&self.to_be_bytes());
- }
- }
- };
-}
-
-impl_stablehash_simple!(u8);
-impl_stablehash_simple!(u16);
-impl_stablehash_simple!(u32);
-impl_stablehash_simple!(u64);
-impl_stablehash_simple!(u128);
-impl_stablehash_simple!(usize);
-impl_stablehash_simple!(i8);
-impl_stablehash_simple!(i16);
-impl_stablehash_simple!(i32);
-impl_stablehash_simple!(i64);
-impl_stablehash_simple!(i128);
-impl_stablehash_simple!(isize);
-
-impl StableHash for &[u8] {
- fn hash(&self, state: &mut H) {
- state.write(self);
- }
-}
-
-impl StableHash for bool {
- fn hash(&self, state: &mut H) {
- match self {
- true => state.write_u8(u8::MAX),
- false => state.write_u8(u8::MIN),
- }
- }
-}
-
-impl StableHash for &str {
- fn hash(&self, state: &mut H) {
- state.write(self.as_bytes());
- state.write_u8(0xFF);
- }
-}
-
-// Simple (and probably ineffective) check for hash stability in gxhash and our StableHash impls.
-// The reference point is a Surface Pro 7 with an Intel i5-1035G4 (8) @ 3.700GHz CPU.
-// The computer is running Linux Mint 21.2 x86_64 on kernel 6.6.6-surface-1.
-// Compiled on rustc 1.72.1 (d5c2e9c34 2023-09-13) on the release channel.
-// gxhash is version 3.0.0 exactly
-// If this test ever fails, you should report it immediately.
-#[test]
-fn hash_stability_check() {
- let mut hasher = gxhash::GxHasher::with_seed(STABLE_HASHER_SEED);
-
- false.hash(&mut hasher);
- 123456789u32.hash(&mut hasher);
- 123456789i32.hash(&mut hasher);
- "Hello, World!".hash(&mut hasher);
-
- let hash = hasher.finish();
- assert_eq!(hash, 0xCE46E06873D99619);
-}
\ No newline at end of file
diff --git a/stardust/src/lib.rs b/stardust/src/lib.rs
index 7a9fe5c41..60a19e167 100644
--- a/stardust/src/lib.rs
+++ b/stardust/src/lib.rs
@@ -3,10 +3,8 @@
pub mod channels;
pub mod connections;
+pub mod diagnostics;
pub mod messages;
pub mod plugin;
pub mod prelude;
-pub mod scheduling;
-
-#[cfg(feature="hashing")]
-pub mod hashing;
\ No newline at end of file
+pub mod scheduling;
\ No newline at end of file
diff --git a/stardust/src/messages/direction.rs b/stardust/src/messages/direction.rs
index 1ad88d619..cc7e82a2a 100644
--- a/stardust/src/messages/direction.rs
+++ b/stardust/src/messages/direction.rs
@@ -1,54 +1,67 @@
-use std::{any::Any, fmt::Debug};
+use std::fmt::Debug;
+use bevy::reflect::Reflect;
/// The direction a message is going, as an enum for dynamic use.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))]
-pub enum Direction {
- /// Messages being sent to a remote peer.
+///
+/// For use in the type system, see the [`MessageDirection`] trait.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Reflect)]
+#[reflect(Debug, PartialEq, Hash)]
+pub enum NetDirection {
+ /// Messages being transmitted from this peer to a remote peer.
+ /// Corresponds to, and is returned by, [`Outgoing`].
Outgoing,
- /// Messages being received from a remote peer.
+
+ /// Messages being transmitted to this peer by a remote peer.
+ /// Corresponds to, and is returned by, [`Incoming`].
Incoming,
}
/// The direction a message is going, as a trait for use in the type system.
///
-/// Implemented by:
-/// - [`Outgoing`], corresponding to [`Direction::Outgoing`]
-/// - [`Incoming`], corresponding to [`Direction::Incoming`]
-#[cfg(not(feature="reflect"))]
-pub trait DirectionType: Debug + Send + Sync + Any + sealed::Sealed {
- /// Returns the corresponding [`Direction`].
- fn as_enum() -> Direction;
+/// This is a [sealed trait] implemented by two [empty enums], [`Outgoing`] and [`Incoming`].
+/// These are intended to be used exclusively within the type system, such as on [`PeerMessages`].
+/// This allows types to have 'directional' variations to indicate their role in message flow.
+///
+/// The enum [`NetDirection`] is useful to carry this information at runtime.
+/// It is also returned by any `MessageDirection` implementor with the `net_dir` function.
+///
+/// [sealed trait]: https://rust-lang.github.io/api-guidelines/future-proofing.html#sealed-traits-protect-against-downstream-implementations-c-sealed
+/// [empty enums]: https://doc.rust-lang.org/nomicon/exotic-sizes.html#empty-types
+/// [`PeerMessages`]: crate::connections::PeerMessages
+pub trait MessageDirection: Debug + Send + Sync + Reflect + sealed::Sealed {
+ /// Returns the corresponding [`NetDirection`].
+ fn net_dir() -> NetDirection;
}
-/// The direction a message is going, as a trait for use in the type system.
+/// Messages being transmitted from this peer to a remote peer.
///
-/// Implemented by:
-/// - [`Outgoing`], corresponding to [`Direction::Outgoing`]
-/// - [`Incoming`], corresponding to [`Direction::Incoming`]
-#[cfg(feature="reflect")]
-pub trait DirectionType: Debug + Send + Sync + Any + bevy::reflect::Reflect + sealed::Sealed {
- /// Returns the corresponding [`Direction`].
- fn as_enum() -> Direction;
-}
+/// Counterpart to [`Incoming`], and corresponds to [`NetDirection::Incoming`].
+///
+/// This type **cannot** be instantiated and is only intended for use in the type system.
+/// For more information on message direction, see the [`MessageDirection`] trait.
+#[derive(Debug, Clone, Copy, Reflect)]
+#[reflect(Debug)]
+pub enum Outgoing {}
-/// Messages being sent to a remote peer. Counterpart to [`Incoming`].
-#[derive(Debug, Clone, Copy)]
-#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))]
-pub struct Outgoing;
-impl DirectionType for Outgoing {
- fn as_enum() -> Direction {
- Direction::Outgoing
+impl MessageDirection for Outgoing {
+ fn net_dir() -> NetDirection {
+ NetDirection::Outgoing
}
}
-/// Messages being received from a remote peer. Counterpart to [`Outgoing`].
-#[derive(Debug, Clone, Copy)]
-#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))]
-pub struct Incoming;
-impl DirectionType for Incoming {
- fn as_enum() -> Direction {
- Direction::Incoming
+/// Messages being transmitted to this peer by a remote peer.
+///
+/// Counterpart to [`Outgoing`], and corresponds to [`NetDirection::Outgoing`].
+///
+/// This type **cannot** be instantiated and is only intended for use in the type system.
+/// For more information on message direction, see the [`MessageDirection`] trait.
+#[derive(Debug, Clone, Copy, Reflect)]
+#[reflect(Debug)]
+pub enum Incoming {}
+
+impl MessageDirection for Incoming {
+ fn net_dir() -> NetDirection {
+ NetDirection::Incoming
}
}
diff --git a/stardust/src/messages/message.rs b/stardust/src/messages/message.rs
new file mode 100644
index 000000000..368b62d5d
--- /dev/null
+++ b/stardust/src/messages/message.rs
@@ -0,0 +1,144 @@
+use std::{ops::Deref, str::{from_utf8, Utf8Error}};
+use bytes::Bytes;
+use crate::channels::ChannelId;
+
+/// An individual, whole message. The most basic communication primitive.
+///
+/// Messages are cheaply clonable and contiguous, being a `#[repr(transparent)]` wrapper around a [`Bytes`].
+///
+/// ## Constraints
+/// A `Message` is **unaltered** - it is exactly the same series of bytes as what was sent by the peer.
+/// All outside data is untrusted, and you should employ defensive programming when handling user data.
+/// It's recommended to use the `untrusted` crate or the `octs` crate, but not `bytes`,
+/// since `bytes` methods panic rather than returning an error.
+#[derive(Clone)]
+#[repr(transparent)]
+pub struct Message(Bytes);
+
+impl Message {
+ /// Returns the length of the message, in bytes.
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.0.len()
+ }
+
+ /// Returns `true` if the length of the message is `0`.
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+
+ /// Returns the message as a slice of bytes.
+ #[inline]
+ pub fn as_slice(&self) -> &[u8] {
+ &self.0
+ }
+
+ /// Attempts to read the message as a string slice.
+ ///
+ /// This fails if the message is not valid UTF-8.
+ /// See the [`from_utf8`] docs for more information.
+ #[inline]
+ pub fn as_str(&self) -> Result<&str, Utf8Error> {
+ from_utf8(&self.0)
+ }
+
+ /// Create a [`Message`] from a [`Bytes`].
+ ///
+ /// This is different from the [`From`] implementation,
+ /// as it can be used in const contexts.
+ #[inline]
+ pub const fn from_bytes(bytes: Bytes) -> Self {
+ Self(bytes)
+ }
+
+ /// Create a [`Message`] from a static slice of bytes.
+ #[inline]
+ pub const fn from_static(slc: &'static [u8]) -> Self {
+ Self::from_bytes(Bytes::from_static(slc))
+ }
+
+ /// Create a [`Message`] from a static string slice.
+ #[inline]
+ pub const fn from_static_str(str: &'static str) -> Self {
+ Self::from_static(str.as_bytes())
+ }
+}
+
+impl From for Message {
+ #[inline]
+ fn from(bytes: Bytes) -> Self {
+ Self::from_bytes(bytes)
+ }
+}
+
+impl From for Bytes {
+ #[inline]
+ fn from(message: Message) -> Self {
+ message.0
+ }
+}
+
+impl AsRef<[u8]> for Message {
+ #[inline]
+ fn as_ref(&self) -> &[u8] {
+ &self.0[..]
+ }
+}
+
+impl Deref for Message {
+ type Target = [u8];
+
+ fn deref(&self) -> &Self::Target {
+ &self.0[..]
+ }
+}
+
+impl std::fmt::Debug for Message {
+ #[inline]
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ self.0.fmt(f)
+ }
+}
+
+/// A [`Message`] with an associated [`ChannelId`].
+#[derive(Clone)]
+pub struct ChannelMessage {
+ /// The channel's identifier.
+ pub channel: ChannelId,
+
+ /// The contents of the message.
+ pub message: Message,
+}
+
+impl From<(ChannelId, Message)> for ChannelMessage {
+ fn from(value: (ChannelId, Message)) -> Self {
+ Self {
+ channel: value.0,
+ message: value.1,
+ }
+ }
+}
+
+impl From<(ChannelId, Bytes)> for ChannelMessage {
+ fn from(value: (ChannelId, Bytes)) -> Self {
+ Self {
+ channel: value.0,
+ message: Message::from_bytes(value.1),
+ }
+ }
+}
+
+impl AsRef for ChannelMessage {
+ #[inline]
+ fn as_ref(&self) -> &Message {
+ &self.message
+ }
+}
+
+impl AsRef<[u8]> for ChannelMessage {
+ #[inline]
+ fn as_ref(&self) -> &[u8] {
+ self.message.as_ref()
+ }
+}
\ No newline at end of file
diff --git a/stardust/src/messages/mod.rs b/stardust/src/messages/mod.rs
index 22d085cbf..11ab87777 100644
--- a/stardust/src/messages/mod.rs
+++ b/stardust/src/messages/mod.rs
@@ -1,11 +1,32 @@
-//! Message related stuff.
+//! This module exposes APIs for working with messages.
+//!
+//! Messages are represented with the [`Message`] type, which is heap-allocated and cheaply clonable.
+//! This is the smallest unit of information transmission between [peers] used by Stardust.
+//! The contents of a message can be absolutely anything, from chat messages to game state.
+//! Stardust will never do anything with the contents of your messages - that's up to your systems.
+//!
+//! Messages are, in reality, just an abstraction. The nuts and bolts of how the messages
+//! are actually exchanged between computers is entirely up to installed transport layers.
+//! A message may be sent via datagrams, a byte stream, or something else entirely. You as
+//! the developer don't need to worry about what's going on behind the scenes, because it
+//! should just work.
+//!
+//! You will primarily handle messages through [`PeerMessages`]. This component is attached
+//! to [peer entities][peers], and acts as a queue for incoming and outgoing messages, depending
+//! on the choice of `D`. The documentation for `PeerMessages` goes into much further detail about
+//! how you use these message queues.
+//!
+//! [peers]: crate::connections
+//! [`PeerMessages`]: crate::connections::PeerMessages
-mod queue;
mod direction;
+mod message;
+mod queue;
-pub(crate) mod systems;
-
-pub use queue::NetworkMessages;
-pub use direction::*;
+// Re-exports
+pub use bytes;
-pub use bytes::{Buf, BufMut, Bytes, BytesMut};
\ No newline at end of file
+// Public types
+pub use direction::{NetDirection, MessageDirection, Incoming, Outgoing};
+pub use message::{Message, ChannelMessage};
+pub use queue::{MessageQueue, ChannelIter, MessageIter};
\ No newline at end of file
diff --git a/stardust/src/messages/queue.rs b/stardust/src/messages/queue.rs
index d8451c728..ad270ff70 100644
--- a/stardust/src/messages/queue.rs
+++ b/stardust/src/messages/queue.rs
@@ -1,66 +1,295 @@
-use std::{collections::HashMap, marker::PhantomData};
-use bevy::prelude::*;
-use bytes::Bytes;
+use bevy::utils::HashMap;
+use smallvec::SmallVec;
use crate::prelude::*;
-use super::direction::DirectionType;
-static EMPTY_SLICE: &[Bytes] = &[];
+type IdxVec = SmallVec<[usize; 2]>;
-/// A queue-like structure for storing messages, separated by channels.
-///
-/// The items in this queue **do not** persist across frames.
-/// They are cleared in [`NetworkWrite::Clear`].
-#[derive(Component)]
-pub struct NetworkMessages {
- pub(crate) queue_map: HashMap>,
- phantom: PhantomData
+/// An efficient queue of messages, organised by channel.
+pub struct MessageQueue {
+ messages: Vec,
+ indexes: HashMap,
}
-impl NetworkMessages {
- /// Creates a new `Messages` store. Doesn't allocate until [`push`](Self::push) is used.
+impl MessageQueue {
+ /// Creates a new `Messages` store. Doesn't allocate until [`push_one`](Self::push_one) is used.
pub fn new() -> Self {
Self {
- queue_map: HashMap::new(),
- phantom: PhantomData,
+ messages: Vec::new(),
+ indexes: HashMap::new(),
}
}
- /// Clears all queues but doesn't reallocate.
- pub(crate) fn clear(&mut self) {
- self.queue_map
+ /// Clears all queues but doesn't resize anything.
+ ///
+ /// If you want to free allocations, use [`reset`](Self::reset).
+ pub fn clear(&mut self) {
+ // Clear the message map
+ self.messages.clear();
+
+ // Clear all indexes in the map
+ self.indexes
.iter_mut()
.for_each(|(_, v)| v.clear())
}
- /// Counts how many messages are queued in all channels.
- pub fn count(&self) -> usize {
- self.queue_map
- .iter()
- .map(|(_,v)| v.len())
- .sum()
+ /// Clears all data in the queue, and frees any allocations.
+ pub fn reset(&mut self) {
+ *self = MessageQueue::new();
+ }
+
+ /// Reserves capacity for at least `additional` new messages to be efficiently inserted.
+ pub fn reserve(&mut self, additional: usize) {
+ self.messages.reserve(additional);
}
- /// Pushes a new item to the queue.
- pub fn push(&mut self, channel: ChannelId, bytes: Bytes) {
- self.queue_map
+ /// Reserves capacity for at least `additional` new messages associated with `channel` to be efficiently inserted.
+ pub fn reserve_channel(&mut self, channel: ChannelId, additional: usize) {
+ // Reserve space in the messages vector
+ self.reserve(additional);
+
+ // Reserve space in the index vector
+ self.indexes
.entry(channel)
- .or_insert(Vec::with_capacity(1))
- .push(bytes);
+ .or_insert_with(|| SmallVec::new())
+ .reserve(additional);
+ }
+
+ /// Returns the total number of messages stored in the queue.
+ #[inline]
+ pub fn count(&self) -> usize {
+ self.messages.len()
+ }
+
+ /// Returns the sum of bytes from all messages in the queue.
+ #[inline]
+ pub fn bytes(&self) -> usize {
+ self.messages.iter().map(|v| v.len()).sum()
+ }
+
+ /// Pushes a single message to the queue.
+ pub fn push_one(&mut self, message: ChannelMessage) {
+ // Add to the messages vec
+ let idx = self.messages.len();
+ self.messages.push(message.message);
+
+ // Add index to the map
+ self.indexes
+ .entry(message.channel)
+ .or_insert(IdxVec::with_capacity(1))
+ .push(idx);
+ }
+
+ /// Pushes messages from `iter` to the queue.
+ /// This can be faster than calling [`push_one`](Self::push_one) repeatedly.
+ pub fn push_many(&mut self, iter: I)
+ where
+ I: IntoIterator- ,
+ {
+ // Convert the iterator and find the maximum expected size
+ let iter = iter.into_iter();
+ let size = match iter.size_hint() {
+ (v, None) => v,
+ (v, Some(a)) => v.max(a),
+ };
+
+ // Expand the message vector to fit, if necessary
+ self.messages.reserve(size);
+
+ // Push everything as per usual
+ for message in iter {
+ self.push_one(message);
+ }
+ }
+
+ /// Pushes many messages from `iter` to a single channel.
+ /// This can be faster than calling [`push_one`](Self::push_one) or [`push_many`](Self::push_many) repeatedly.
+ pub fn push_channel
(&mut self, channel: ChannelId, iter: I)
+ where
+ I: IntoIterator- ,
+ {
+ // Convert the iterator and find the maximum expected size
+ let iter = iter.into_iter();
+ let size = match iter.size_hint() {
+ (v, None) => v,
+ (v, Some(a)) => v.max(a),
+ };
+
+ // Add index to the map
+ let indexes = self.indexes
+ .entry(channel)
+ .or_insert(IdxVec::with_capacity(size));
+
+ // Expand the vectors to fit, if necessary
+ self.messages.reserve(size);
+ indexes.reserve(size);
+
+ // Insert all payloads
+ for payload in iter {
+ let idx = self.messages.len();
+ self.messages.push(payload);
+ indexes.push(idx);
+ }
+ }
+
+ /// Returns an iterator over channels, and their associated queues.
+ pub fn iter(&self) -> ChannelIter {
+ ChannelIter {
+ messages: &self.messages,
+ map_iter: self.indexes.iter(),
+ }
+ }
+
+ /// Returns an iterator over all messages in a specific channel.
+ pub fn iter_channel(&self, channel: ChannelId) -> MessageIter {
+ match self.indexes.get(&channel) {
+ // The index map exists, return a real MessageIter
+ Some(indexes) => MessageIter {
+ messages: &self.messages,
+ indexes: indexes.as_slice(),
+ },
+
+ // MessageIter tracks progress by shrinking a slice every iteration.
+ // If the slice we give it is empty, it instantly returns None.
+ // By returning an iterator that ends instantly, we don't need to
+ // make this function have a return type of Option
.
+ None => MessageIter {
+ messages: &self.messages,
+ indexes: &[],
+ },
+ }
+ }
+}
+
+impl Default for MessageQueue {
+ #[inline]
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl Extend for MessageQueue {
+ #[inline]
+ fn extend>(&mut self, iter: T) {
+ self.push_many(iter);
+ }
+}
+
+impl<'a> IntoIterator for &'a MessageQueue {
+ type IntoIter = ChannelIter<'a>;
+ type Item = as Iterator>::Item;
+
+ #[inline]
+ fn into_iter(self) -> Self::IntoIter {
+ self.iter()
+ }
+}
+
+/// An iterator over individual channels in a [`MessageQueue`].
+///
+/// Produces [`ChannelId`] values, and [`MessageIter`] iterators.
+/// The order of iteration over channels is unspecified, and may change unpredictably.
+#[derive(Clone)]
+pub struct ChannelIter<'a> {
+ messages: &'a [Message],
+ map_iter: bevy::utils::hashbrown::hash_map::Iter<'a, ChannelId, IdxVec>,
+}
+
+impl<'a> Iterator for ChannelIter<'a> {
+ type Item = (ChannelId, MessageIter<'a>);
+
+ fn next(&mut self) -> Option {
+ let (c, i) = self.map_iter.next()?;
+ return Some((c.clone(), MessageIter {
+ messages: &self.messages,
+ indexes: i.as_slice(),
+ }));
+ }
+
+ #[inline]
+ fn size_hint(&self) -> (usize, Option) {
+ self.map_iter.size_hint()
+ }
+}
+
+/// An iterator over individual messages in channel, produced by a [`ChannelIter`].
+///
+/// Produces the contents of the messages in the order they were added to the queue.
+#[derive(Clone)]
+pub struct MessageIter<'a> {
+ messages: &'a [Message],
+ indexes: &'a [usize],
+}
+
+impl<'a> Iterator for MessageIter<'a> {
+ type Item = Message;
+
+ fn next(&mut self) -> Option {
+ // If there's no items left, return
+ let ln = self.indexes.len();
+ if ln == 0 { return None }
+
+ // Get the next message we want
+ let idx = self.indexes[0];
+ let val = self.messages[idx].clone();
+
+ // Change the slice to cut off the first item
+ // This is cheaper than storing a cursor value
+ self.indexes = &self.indexes[1..];
+
+ // Return the message
+ return Some(val);
+ }
+
+ #[inline]
+ fn size_hint(&self) -> (usize, Option) {
+ let len = self.indexes.len();
+ (len, Some(len))
}
+}
- /// Returns a slice of the queue for channel `channel`.
- pub fn channel_queue(&self, channel: ChannelId) -> &[Bytes] {
- self.queue_map
- .get(&channel)
- .map_or(EMPTY_SLICE, |v| v.as_slice())
+impl<'a> ExactSizeIterator for MessageIter<'a> {
+ #[inline]
+ fn len(&self) -> usize {
+ self.indexes.len()
}
+}
+
+#[test]
+fn message_queue_ordering_test() {
+ let mut queue = MessageQueue::new();
- /// Returns an iterator over all queues, including their channel ids.
- /// The iterator does not contain empty queues.
- pub fn all_queues(&self) -> impl Iterator- {
- self.queue_map
- .iter()
- .filter(|(_,v)| v.len() != 0)
- .map(|(k,v)| (k.clone(), v.as_slice()))
+ fn map_messages(set: &'static [&'static [u8]]) -> impl Iterator
- {
+ set.iter().map(|v| Message::from_bytes(Bytes::from_static(v)))
}
+
+ const MESSAGE_SET_A: &[&[u8]] = &[
+ b"Hello, world!",
+ b"It's a very nice day, isn't it?",
+ b"Yeah, I agree!",
+ ];
+
+ const MESSAGE_SET_B: &[&[u8]] = &[
+ b"Hello, world!",
+ b"It's a miserable day, isn't it?",
+ b"No, I think it's fine!",
+ ];
+
+ const MESSAGE_SET_C: &[&[u8]] = &[
+ b"Hello, world!",
+ b"It's an alright day, isn't it?",
+ b"That's a good way of putting it.",
+ ];
+
+ // Add all the messages to the channel
+ queue.push_channel(ChannelId::from(0), map_messages(MESSAGE_SET_A));
+ queue.push_channel(ChannelId::from(0), map_messages(MESSAGE_SET_B));
+ queue.push_channel(ChannelId::from(1), map_messages(MESSAGE_SET_C));
+
+ queue.iter_channel(ChannelId::from(0))
+ .zip(MESSAGE_SET_A.iter().chain(MESSAGE_SET_B))
+ .for_each(|(a, b)| assert_eq!(a.as_slice(), *b));
+
+ queue.iter_channel(ChannelId::from(1))
+ .zip(MESSAGE_SET_C)
+ .for_each(|(a, b)| assert_eq!(a.as_slice(), *b));
}
\ No newline at end of file
diff --git a/stardust/src/messages/systems.rs b/stardust/src/messages/systems.rs
deleted file mode 100644
index 76b3fd595..000000000
--- a/stardust/src/messages/systems.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-use bevy::prelude::*;
-use super::*;
-
-pub(crate) fn clear_message_queue_system