From c087a9f49b202b8baebad785b983195161077564 Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 8 Dec 2024 16:10:38 +1030 Subject: [PATCH 01/22] Queues chunks to be fetched when a player moves --- Cargo.toml | 1 + src/bin/src/main.rs | 2 ++ src/bin/src/packet_handlers/login_process.rs | 4 ++- .../transform/update_player_position.rs | 34 +++++++++++++++++++ src/bin/src/systems/chunk_fetcher.rs | 27 +++++++++++++++ src/bin/src/systems/mod.rs | 1 + src/lib/core/Cargo.toml | 6 ++-- src/lib/core/src/chunks/chunk_receiver.rs | 27 +++++++++++++++ src/lib/core/src/chunks/mod.rs | 1 + src/lib/core/src/lib.rs | 1 + 10 files changed, 101 insertions(+), 3 deletions(-) create mode 100644 src/bin/src/systems/chunk_fetcher.rs create mode 100644 src/lib/core/src/chunks/chunk_receiver.rs create mode 100644 src/lib/core/src/chunks/mod.rs diff --git a/Cargo.toml b/Cargo.toml index b79853e4..11be7aa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -145,6 +145,7 @@ hashbrown = "0.15.0" tinyvec = "1.8.0" dashmap = "6.1.0" uuid = { version = "1.1", features = ["v4", "v3", "serde"] } +whirlwind = "0.1.1" # Macros lazy_static = "1.5.0" diff --git a/src/bin/src/main.rs b/src/bin/src/main.rs index 62fd09b4..31aa9027 100644 --- a/src/bin/src/main.rs +++ b/src/bin/src/main.rs @@ -14,6 +14,8 @@ use std::sync::Arc; use systems::definition; use tracing::{error, info, Level}; +pub const VIEW_DISTANCE: i32 = 12; + #[derive(clap::Parser)] struct CLIArgs { #[clap(long)] diff --git a/src/bin/src/packet_handlers/login_process.rs b/src/bin/src/packet_handlers/login_process.rs index c26609aa..4c725290 100644 --- a/src/bin/src/packet_handlers/login_process.rs +++ b/src/bin/src/packet_handlers/login_process.rs @@ -25,6 +25,7 @@ use ferrumc_net::packets::outgoing::synchronize_player_position::SynchronizePlay use ferrumc_net_codec::encode::NetEncodeOpts; use ferrumc_state::GlobalState; use tracing::{debug, trace}; +use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; #[event_handler] async fn handle_login_start( @@ -131,7 +132,8 @@ async fn handle_ack_finish_configuration( .universe .add_component::(conn_id, Position::default())? .add_component::(conn_id, Rotation::default())? - .add_component::(conn_id, OnGround::default())?; + .add_component::(conn_id, OnGround::default())? + .add_component::(conn_id, ChunkReceiver::default())?; let mut writer = state.universe.get_mut::(conn_id)?; diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index e5e05cb8..ec227a2f 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -1,3 +1,4 @@ +use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; use ferrumc_core::transform::grounded::OnGround; use ferrumc_core::transform::position::Position; use ferrumc_core::transform::rotation::Rotation; @@ -13,9 +14,30 @@ async fn handle_player_move( state: GlobalState, ) -> Result { let conn_id = event.conn_id; + let mut calculate_chunks = false; if let Some(ref new_position) = event.position { let mut position = conn_id.get_mut::(&state)?; + let mut chunk_recv = state.universe.get_mut::(conn_id)?; + if let Some(last_chunk) = &chunk_recv.last_chunk { + let new_chunk = ( + new_position.x as i32 / 16, + new_position.z as i32 / 16, + String::from("overworld"), + ); + if *last_chunk != new_chunk { + chunk_recv.last_chunk = Some(new_chunk); + calculate_chunks = true; + } + } else { + chunk_recv.last_chunk = Some(( + new_position.x as i32 / 16, + new_position.z as i32 / 16, + String::from("overworld"), + )); + calculate_chunks = true; + } + *position = Position::new(new_position.x, new_position.y, new_position.z); } @@ -31,5 +53,17 @@ async fn handle_player_move( *on_ground = OnGround(new_grounded); } + if calculate_chunks { + let chunk_recv = state.universe.get_mut::(conn_id)?; + chunk_recv.can_see.clear().await; + let (center_x, center_z, dimension) = chunk_recv.last_chunk.as_ref().unwrap(); + for x in center_x - crate::VIEW_DISTANCE..=center_x + crate::VIEW_DISTANCE { + for z in center_z - crate::VIEW_DISTANCE..=center_z + crate::VIEW_DISTANCE { + chunk_recv.needed_chunks.insert((x, z, dimension.clone()), None).await; + chunk_recv.can_see.insert((x, z, dimension.clone())).await; + } + } + } + Ok(event) } diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs new file mode 100644 index 00000000..9c1ae3cc --- /dev/null +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -0,0 +1,27 @@ +use crate::systems::definition::System; +use async_trait::async_trait; +use ferrumc_state::GlobalState; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use tracing::{debug, info}; + +pub struct ChunkFetcher { + stop: AtomicBool, +} + +#[async_trait] +impl System for ChunkFetcher { + async fn start(self: Arc, state: GlobalState) { + info!("Chunk fetcher system started"); + + while !self.stop.load(std::sync::atomic::Ordering::Relaxed) {} + } + + async fn stop(self: Arc, state: GlobalState) { + self.stop.store(true, std::sync::atomic::Ordering::Relaxed); + } + + fn name(&self) -> &'static str { + "Chunk Fetcher" + } +} diff --git a/src/bin/src/systems/mod.rs b/src/bin/src/systems/mod.rs index 06228141..7f6fd015 100644 --- a/src/bin/src/systems/mod.rs +++ b/src/bin/src/systems/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod definition; +mod chunk_fetcher; mod chunk_sender; mod keep_alive_system; mod tcp_listener_system; diff --git a/src/lib/core/Cargo.toml b/src/lib/core/Cargo.toml index 79e2e1c3..81764183 100644 --- a/src/lib/core/Cargo.toml +++ b/src/lib/core/Cargo.toml @@ -6,5 +6,7 @@ edition = "2021" [dependencies] thiserror = { workspace = true } -tokio = { workspace = true} -ferrumc-ecs = {workspace = true} +tokio = { workspace = true } +ferrumc-ecs = { workspace = true } +whirlwind = { workspace = true } +ferrumc-world = { workspace = true } diff --git a/src/lib/core/src/chunks/chunk_receiver.rs b/src/lib/core/src/chunks/chunk_receiver.rs new file mode 100644 index 00000000..39319387 --- /dev/null +++ b/src/lib/core/src/chunks/chunk_receiver.rs @@ -0,0 +1,27 @@ +use ferrumc_world::chunk_format::Chunk; +use tokio::time::Instant; +use whirlwind::{ShardMap, ShardSet}; + +pub struct ChunkReceiver { + pub needed_chunks: ShardMap<(i32, i32, String), Option>, + pub can_see: ShardSet<(i32, i32, String)>, + pub last_update: Instant, + pub last_chunk: Option<(i32, i32, String)>, +} + +impl Default for ChunkReceiver { + fn default() -> Self { + Self::new() + } +} + +impl ChunkReceiver { + pub fn new() -> Self { + Self { + needed_chunks: ShardMap::new(), + can_see: ShardSet::new(), + last_update: Instant::now(), + last_chunk: None, + } + } +} diff --git a/src/lib/core/src/chunks/mod.rs b/src/lib/core/src/chunks/mod.rs new file mode 100644 index 00000000..c64e1905 --- /dev/null +++ b/src/lib/core/src/chunks/mod.rs @@ -0,0 +1 @@ +pub mod chunk_receiver; \ No newline at end of file diff --git a/src/lib/core/src/lib.rs b/src/lib/core/src/lib.rs index dc9eeb0e..0796791e 100644 --- a/src/lib/core/src/lib.rs +++ b/src/lib/core/src/lib.rs @@ -4,3 +4,4 @@ pub mod errors; pub mod identity; pub mod state; pub mod transform; +pub mod chunks; From 18170dbdb2d650d392bb9275575c926a2690503c Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 8 Dec 2024 16:51:27 +1030 Subject: [PATCH 02/22] Should now send chunks --- .../transform/update_player_position.rs | 11 +- src/bin/src/systems/chunk_sender.rs | 129 ++++-------------- src/lib/core/Cargo.toml | 2 +- src/lib/core/src/chunks/chunk_receiver.rs | 26 +++- 4 files changed, 52 insertions(+), 116 deletions(-) diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index ec227a2f..ceea4fc0 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -54,15 +54,8 @@ async fn handle_player_move( } if calculate_chunks { - let chunk_recv = state.universe.get_mut::(conn_id)?; - chunk_recv.can_see.clear().await; - let (center_x, center_z, dimension) = chunk_recv.last_chunk.as_ref().unwrap(); - for x in center_x - crate::VIEW_DISTANCE..=center_x + crate::VIEW_DISTANCE { - for z in center_z - crate::VIEW_DISTANCE..=center_z + crate::VIEW_DISTANCE { - chunk_recv.needed_chunks.insert((x, z, dimension.clone()), None).await; - chunk_recv.can_see.insert((x, z, dimension.clone())).await; - } - } + let mut chunk_recv = state.universe.get_mut::(conn_id)?; + chunk_recv.calculate_chunks().await; } Ok(event) diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index a16cf191..54f572e7 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -1,22 +1,16 @@ use crate::systems::definition::System; use async_trait::async_trait; -use ferrumc_core::identity::player_identity::PlayerIdentity; -use ferrumc_core::transform::position::Position; +use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; +use ferrumc_ecs::errors::ECSError; use ferrumc_net::connection::StreamWriter; use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; -use ferrumc_net::packets::outgoing::set_center_chunk::SetCenterChunk; use ferrumc_net_codec::encode::NetEncodeOpts; use ferrumc_state::GlobalState; -use std::ops::Div; -use std::simd::num::SimdFloat; -use std::simd::{f64x2, StdFloat}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tracing::{debug, error, info}; -const CHUNK_RADIUS: i32 = 12; - pub(super) struct ChunkSenderSystem { pub stop: AtomicBool, } @@ -38,109 +32,42 @@ impl System for ChunkSenderSystem { debug!("Sending chunks to players"); let players = state .universe - .query::<(&PlayerIdentity, &Position, &mut StreamWriter)>(); - // TODO: This is so ass. Please fix this. - for (_entity, (player, position, mut conn)) in players { - debug!( - "Sending chunks to player: {} @ {}", - player.username, position - ); - // Haha SIMD go brrrrt - let [chunk_x, chunk_z] = f64x2::from_array([position.x, position.z]) - .floor() - .div(f64x2::from_array([16f64, 16f64])) - .cast::() - .to_array(); - if let Err(e) = conn - .send_packet( - &SetCenterChunk::new(chunk_x, chunk_z), - &NetEncodeOpts::WithLength, - ) - .await - { - error!( - "Unable to set the center chunk for {} @ {}, {}: {}", - &player.username, chunk_x, chunk_z, e - ); - continue; - } - let start = std::time::Instant::now(); - let mut chunk_range = (chunk_x - CHUNK_RADIUS..chunk_x + CHUNK_RADIUS) - .flat_map(|z| { - (chunk_z - CHUNK_RADIUS..chunk_z + CHUNK_RADIUS) - .map(move |x| (x, z, "overworld")) - }) - .collect::>(); - - chunk_range.sort_by_key(|&(x, z, _)| { - let dx = x - chunk_x; - let dz = z - chunk_z; - (((dx ^ 2) + (dz ^ 2)) as f64).sqrt() as i32 - }); - - match state.world.load_chunk_batch(chunk_range).await { - Ok(chunks) => { - for chunk in chunks { - match ChunkAndLightData::from_chunk(&chunk) { - Ok(data) => { + .query::<(&mut ChunkReceiver, &mut StreamWriter)>(); + let mut task_set: tokio::task::JoinSet> = + tokio::task::JoinSet::new(); + for (eid, (_, _)) in players { + let state = state.clone(); + task_set.spawn(async move { + let chunk_recv = state.universe.get_mut::<&mut ChunkReceiver>(eid)?; + for possible_chunk in chunk_recv.needed_chunks.iter_mut() { + let (key, possible_chunk) = possible_chunk.pair(); + let _ = chunk_recv.needed_chunks.remove(key); + if let Some(chunk) = possible_chunk { + let packet = &ChunkAndLightData::from_chunk(chunk); + match packet { + Ok(packet) => { + let mut conn = + state.universe.get_mut::<&mut StreamWriter>(eid)?; if let Err(e) = - conn.send_packet(&data, &NetEncodeOpts::WithLength).await + conn.send_packet(packet, &NetEncodeOpts::WithLength).await { - error!( - "Unable to send chunk data to {} @ {}, {}: {}", - &player.username, chunk.x, chunk.z, e - ); - if let Err(e) = conn - .send_packet( - &ChunkAndLightData::empty(chunk.x, chunk.z), - &NetEncodeOpts::WithLength, - ) - .await - { - error!( - "Unable to send empty chunk data to {} @ {}, {}: {}", - &player.username, chunk.x, chunk.z, e - ); - } + error!("Error sending chunk: {:?}", e); } + return Ok(()); } Err(e) => { - error!( - "Unable to convert chunk to chunk and light data for {} @ {}, {}: {}", - &player.username, chunk.x, chunk.z, e - ); - if let Err(e) = conn - .send_packet( - &ChunkAndLightData::empty(chunk.x, chunk.z), - &NetEncodeOpts::WithLength, - ) - .await - { - error!( - "Unable to send empty chunk data to {} @ {}, {}: {}", - &player.username, chunk.x, chunk.z, e - ); - } + error!("Error sending chunk: {:?}", e); } } } } - Err(e) => { - error!( - "Unable to load chunks for {} @ {}, {}: {}", - &player.username, chunk_x, chunk_z, e - ); - } + Ok(()) + }); + } + while let Some(result) = task_set.join_next().await { + if let Err(e) = result { + error!("Error sending chunk: {:?}", e); } - - debug!( - "Sent {} chunks to player: {} @ {:.2},{:.2} in {:?}", - (CHUNK_RADIUS * 2) * 2, - player.username, - position.x, - position.z, - start.elapsed() - ); } tokio::time::sleep(Duration::from_secs(5)).await; diff --git a/src/lib/core/Cargo.toml b/src/lib/core/Cargo.toml index 81764183..853f9f0d 100644 --- a/src/lib/core/Cargo.toml +++ b/src/lib/core/Cargo.toml @@ -8,5 +8,5 @@ edition = "2021" thiserror = { workspace = true } tokio = { workspace = true } ferrumc-ecs = { workspace = true } -whirlwind = { workspace = true } +dashmap = { workspace = true } ferrumc-world = { workspace = true } diff --git a/src/lib/core/src/chunks/chunk_receiver.rs b/src/lib/core/src/chunks/chunk_receiver.rs index 39319387..6aeae30e 100644 --- a/src/lib/core/src/chunks/chunk_receiver.rs +++ b/src/lib/core/src/chunks/chunk_receiver.rs @@ -1,10 +1,11 @@ +use dashmap::{DashMap, DashSet}; use ferrumc_world::chunk_format::Chunk; use tokio::time::Instant; -use whirlwind::{ShardMap, ShardSet}; +const VIEW_DISTANCE: i32 = 12; pub struct ChunkReceiver { - pub needed_chunks: ShardMap<(i32, i32, String), Option>, - pub can_see: ShardSet<(i32, i32, String)>, + pub needed_chunks: DashMap<(i32, i32, String), Option>, + pub can_see: DashSet<(i32, i32, String)>, pub last_update: Instant, pub last_chunk: Option<(i32, i32, String)>, } @@ -18,10 +19,25 @@ impl Default for ChunkReceiver { impl ChunkReceiver { pub fn new() -> Self { Self { - needed_chunks: ShardMap::new(), - can_see: ShardSet::new(), + needed_chunks: DashMap::new(), + can_see: DashSet::new(), last_update: Instant::now(), last_chunk: None, } } } + +impl ChunkReceiver { + pub async fn calculate_chunks(&mut self) { + if let Some(last_chunk) = &self.last_chunk { + self.can_see.clear(); + for x in last_chunk.0 - VIEW_DISTANCE..=last_chunk.0 + VIEW_DISTANCE { + for z in last_chunk.1 - VIEW_DISTANCE..=last_chunk.1 + VIEW_DISTANCE { + self.needed_chunks + .insert((x, z, last_chunk.2.clone()), None); + self.can_see.insert((x, z, last_chunk.2.clone())); + } + } + } + } +} From 89fa5e37ef4b937c884c6e40c78380a319db461e Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 8 Dec 2024 18:08:42 +1030 Subject: [PATCH 03/22] Deadlocks when sending a chunk. what. --- Cargo.toml | 3 ++ src/bin/src/packet_handlers/login_process.rs | 13 +++-- src/bin/src/systems/chunk_fetcher.rs | 52 +++++++++++++++++-- src/bin/src/systems/chunk_sender.rs | 53 ++++++++++---------- src/bin/src/systems/definition.rs | 2 + src/lib/core/Cargo.toml | 2 + src/lib/core/src/chunks/chunk_receiver.rs | 9 ++++ src/lib/utils/logging/Cargo.toml | 1 + src/lib/utils/logging/src/lib.rs | 3 +- 9 files changed, 105 insertions(+), 33 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 11be7aa0..4a532388 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ # - Workspace lints # - Workspace dependencies. + [workspace] resolver = "2" @@ -104,6 +105,7 @@ ferrumc-utils = { path = "src/lib/utils" } ferrumc-world = { path = "src/lib/world" } + # Asynchronous tokio = { version = "1.40.0", features = ["full"] } socket2 = "0.5.7" @@ -114,6 +116,7 @@ async-trait = "0.1.82" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } log = "0.4.22" +console-subscriber = "0.4.1" # Concurrency/Parallelism parking_lot = "0.12.3" diff --git a/src/bin/src/packet_handlers/login_process.rs b/src/bin/src/packet_handlers/login_process.rs index 4c725290..b2a97ffe 100644 --- a/src/bin/src/packet_handlers/login_process.rs +++ b/src/bin/src/packet_handlers/login_process.rs @@ -42,7 +42,8 @@ async fn handle_login_start( state.universe.add_component::( login_start_event.conn_id, PlayerIdentity::new(username.to_string(), uuid), - )?; + )? + .add_component::(login_start_event.conn_id, ChunkReceiver::default())?; //Send a Login Success Response to further the login sequence let mut writer = state @@ -132,8 +133,7 @@ async fn handle_ack_finish_configuration( .universe .add_component::(conn_id, Position::default())? .add_component::(conn_id, Rotation::default())? - .add_component::(conn_id, OnGround::default())? - .add_component::(conn_id, ChunkReceiver::default())?; + .add_component::(conn_id, OnGround::default())?; let mut writer = state.universe.get_mut::(conn_id)?; @@ -170,7 +170,14 @@ async fn handle_ack_finish_configuration( &NetEncodeOpts::WithLength, ) .await?; + + let pos = state.universe.get_mut::(conn_id)?; + let mut chunk_recv = state.universe.get_mut::(conn_id)?; + chunk_recv.last_chunk = Some((pos.x as i32, pos.z as i32, String::from("overworld"))); + chunk_recv.calculate_chunks().await; + send_keep_alive(conn_id, state, &mut writer).await?; + Ok(ack_finish_configuration_event) } diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 9c1ae3cc..9f178a70 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -1,23 +1,69 @@ +use crate::errors::BinaryError; use crate::systems::definition::System; use async_trait::async_trait; +use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; +use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_state::GlobalState; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use tracing::{debug, info}; +use tokio::task::JoinSet; +use tracing::{error, info, trace}; pub struct ChunkFetcher { stop: AtomicBool, } +impl ChunkFetcher { + pub(crate) fn new() -> Self { + Self { + stop: AtomicBool::new(false), + } + } +} + #[async_trait] impl System for ChunkFetcher { async fn start(self: Arc, state: GlobalState) { info!("Chunk fetcher system started"); - while !self.stop.load(std::sync::atomic::Ordering::Relaxed) {} + while !self.stop.load(std::sync::atomic::Ordering::Relaxed) { + let mut taskset: JoinSet> = JoinSet::new(); + let players = state + .universe + .query::<(&PlayerIdentity, &mut ChunkReceiver)>(); + for (eid, (_, chunk_recv)) in players { + let state = state.clone(); + //taskset.spawn(async move { + let player = state + .universe + .get::(eid) + .expect("PlayerIdentity not found"); + trace!("Checking chunks for player: {}", player.username); + for mut chunks in chunk_recv.needed_chunks.iter_mut() { + let (key, chunk) = chunks.pair_mut(); + if chunk.is_none() { + trace!("Fetching chunk: {:?}", key); + let fetched_chunk = state + .world + .load_chunk(key.0, key.1, &key.2.clone()) + .await + .unwrap(); + *chunk = Some(fetched_chunk); + } + } + // Ok(()) + //}); + } + while let Some(result) = taskset.join_next().await { + if let Err(e) = result { + error!("Error fetching chunk: {:?}", e); + } + } + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } } - async fn stop(self: Arc, state: GlobalState) { + async fn stop(self: Arc, _: GlobalState) { self.stop.store(true, std::sync::atomic::Ordering::Relaxed); } diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 54f572e7..75dc71d7 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -1,6 +1,7 @@ use crate::systems::definition::System; use async_trait::async_trait; use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; +use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_ecs::errors::ECSError; use ferrumc_net::connection::StreamWriter; use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; @@ -9,7 +10,8 @@ use ferrumc_state::GlobalState; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, error, info}; +use tokio::task::JoinSet; +use tracing::{debug, error, info, trace}; pub(super) struct ChunkSenderSystem { pub stop: AtomicBool, @@ -29,40 +31,39 @@ impl System for ChunkSenderSystem { info!("Chunk sender system started"); while !self.stop.load(Ordering::Relaxed) { - debug!("Sending chunks to players"); let players = state .universe .query::<(&mut ChunkReceiver, &mut StreamWriter)>(); - let mut task_set: tokio::task::JoinSet> = - tokio::task::JoinSet::new(); - for (eid, (_, _)) in players { + let mut task_set: JoinSet> = JoinSet::new(); + for (eid, (chunk_recv, mut conn)) in players { let state = state.clone(); - task_set.spawn(async move { - let chunk_recv = state.universe.get_mut::<&mut ChunkReceiver>(eid)?; - for possible_chunk in chunk_recv.needed_chunks.iter_mut() { - let (key, possible_chunk) = possible_chunk.pair(); + // task_set.spawn(async move { + trace!("Checking chunks for player"); + for possible_chunk in chunk_recv.needed_chunks.iter_mut() { + let (key, possible_chunk) = possible_chunk.pair(); + if let Some(chunk) = possible_chunk { + trace!("Sending chunk: {:?}", key); let _ = chunk_recv.needed_chunks.remove(key); - if let Some(chunk) = possible_chunk { - let packet = &ChunkAndLightData::from_chunk(chunk); - match packet { - Ok(packet) => { - let mut conn = - state.universe.get_mut::<&mut StreamWriter>(eid)?; - if let Err(e) = - conn.send_packet(packet, &NetEncodeOpts::WithLength).await - { - error!("Error sending chunk: {:?}", e); - } - return Ok(()); - } - Err(e) => { + let packet = &ChunkAndLightData::from_chunk(chunk); + match packet { + Ok(packet) => { + let player = state.universe.get::(eid).unwrap(); + trace!("Sending chunk {}, {} to {}", key.0, key.1, player.username); + if let Err(e) = + conn.send_packet(packet, &NetEncodeOpts::WithLength).await + { error!("Error sending chunk: {:?}", e); } + trace!("Sent chunk {}, {} to {}", key.0, key.1, player.username); + } + Err(e) => { + error!("Error sending chunk: {:?}", e); } } } - Ok(()) - }); + } + // Ok(()) + // }); } while let Some(result) = task_set.join_next().await { if let Err(e) = result { @@ -70,7 +71,7 @@ impl System for ChunkSenderSystem { } } - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_millis(200)).await; } } diff --git a/src/bin/src/systems/definition.rs b/src/bin/src/systems/definition.rs index 9873becb..9e254eea 100644 --- a/src/bin/src/systems/definition.rs +++ b/src/bin/src/systems/definition.rs @@ -8,6 +8,7 @@ use ferrumc_state::GlobalState; use futures::stream::FuturesUnordered; use std::sync::{Arc, LazyLock}; use tracing::{debug, debug_span, info, Instrument}; +use crate::systems::chunk_fetcher::ChunkFetcher; #[async_trait] pub trait System: Send + Sync { @@ -24,6 +25,7 @@ pub fn create_systems() -> Vec> { Arc::new(KeepAliveSystem::new()), Arc::new(TickingSystem), Arc::new(ChunkSenderSystem::new()), + Arc::new(ChunkFetcher::new()), ] } pub async fn start_all_systems(state: GlobalState) -> NetResult<()> { diff --git a/src/lib/core/Cargo.toml b/src/lib/core/Cargo.toml index 853f9f0d..435b049c 100644 --- a/src/lib/core/Cargo.toml +++ b/src/lib/core/Cargo.toml @@ -10,3 +10,5 @@ tokio = { workspace = true } ferrumc-ecs = { workspace = true } dashmap = { workspace = true } ferrumc-world = { workspace = true } +tracing = { workspace = true } +log = "0.4.22" diff --git a/src/lib/core/src/chunks/chunk_receiver.rs b/src/lib/core/src/chunks/chunk_receiver.rs index 6aeae30e..d19e3219 100644 --- a/src/lib/core/src/chunks/chunk_receiver.rs +++ b/src/lib/core/src/chunks/chunk_receiver.rs @@ -1,6 +1,7 @@ use dashmap::{DashMap, DashSet}; use ferrumc_world::chunk_format::Chunk; use tokio::time::Instant; +use tracing::trace; const VIEW_DISTANCE: i32 = 12; pub struct ChunkReceiver { @@ -30,14 +31,22 @@ impl ChunkReceiver { impl ChunkReceiver { pub async fn calculate_chunks(&mut self) { if let Some(last_chunk) = &self.last_chunk { + trace!("Calculating chunks"); self.can_see.clear(); for x in last_chunk.0 - VIEW_DISTANCE..=last_chunk.0 + VIEW_DISTANCE { for z in last_chunk.1 - VIEW_DISTANCE..=last_chunk.1 + VIEW_DISTANCE { + if self.can_see.contains(&(x, z, last_chunk.2.clone())) { + continue; + } self.needed_chunks .insert((x, z, last_chunk.2.clone()), None); self.can_see.insert((x, z, last_chunk.2.clone())); } } + trace!( + "Calculated chunks: {} chunks queued", + self.needed_chunks.len() + ); } } } diff --git a/src/lib/utils/logging/Cargo.toml b/src/lib/utils/logging/Cargo.toml index 91fd770f..5bc7a18d 100644 --- a/src/lib/utils/logging/Cargo.toml +++ b/src/lib/utils/logging/Cargo.toml @@ -9,3 +9,4 @@ tracing-subscriber = { workspace = true } tokio = { workspace = true } ferrumc-profiling = { workspace = true } thiserror = { workspace = true } +console-subscriber = { workspace = true } diff --git a/src/lib/utils/logging/src/lib.rs b/src/lib/utils/logging/src/lib.rs index 17afe9a5..6a780525 100644 --- a/src/lib/utils/logging/src/lib.rs +++ b/src/lib/utils/logging/src/lib.rs @@ -7,7 +7,8 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -pub fn init_logging(trace_level: tracing::Level) { +pub fn init_logging(trace_level: Level) { + // console_subscriber::init(); let env_filter = EnvFilter::from_default_env().add_directive(trace_level.into()); let mut fmt_layer = Layer::default(); From 68c71efda9a5370e2aa5d58e8c115799b35e7a8a Mon Sep 17 00:00:00 2001 From: ReCore Date: Mon, 9 Dec 2024 17:24:05 +1030 Subject: [PATCH 04/22] Deadlocking fixed --- .../transform/update_player_position.rs | 4 + src/bin/src/systems/chunk_fetcher.rs | 1 - src/bin/src/systems/chunk_sender.rs | 73 +++++++++++++------ src/lib/core/src/chunks/chunk_receiver.rs | 17 ++--- 4 files changed, 60 insertions(+), 35 deletions(-) diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index ceea4fc0..a3dd06e0 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -1,4 +1,6 @@ +use tracing::trace; use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; +use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_core::transform::grounded::OnGround; use ferrumc_core::transform::position::Position; use ferrumc_core::transform::rotation::Rotation; @@ -26,6 +28,8 @@ async fn handle_player_move( String::from("overworld"), ); if *last_chunk != new_chunk { + let player = state.universe.get::(conn_id)?; + trace!("Player {} crossed chunk boundary", player.username); chunk_recv.last_chunk = Some(new_chunk); calculate_chunks = true; } diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 9f178a70..6d75b2b5 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -38,7 +38,6 @@ impl System for ChunkFetcher { .universe .get::(eid) .expect("PlayerIdentity not found"); - trace!("Checking chunks for player: {}", player.username); for mut chunks in chunk_recv.needed_chunks.iter_mut() { let (key, chunk) = chunks.pair_mut(); if chunk.is_none() { diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 75dc71d7..5a6246b9 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -5,7 +5,9 @@ use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_ecs::errors::ECSError; use ferrumc_net::connection::StreamWriter; use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; +use ferrumc_net::packets::outgoing::set_center_chunk::SetCenterChunk; use ferrumc_net_codec::encode::NetEncodeOpts; +use ferrumc_net_codec::net_types::var_int::VarInt; use ferrumc_state::GlobalState; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -35,35 +37,60 @@ impl System for ChunkSenderSystem { .universe .query::<(&mut ChunkReceiver, &mut StreamWriter)>(); let mut task_set: JoinSet> = JoinSet::new(); - for (eid, (chunk_recv, mut conn)) in players { + for (eid, (_, _)) in players { let state = state.clone(); - // task_set.spawn(async move { - trace!("Checking chunks for player"); - for possible_chunk in chunk_recv.needed_chunks.iter_mut() { - let (key, possible_chunk) = possible_chunk.pair(); - if let Some(chunk) = possible_chunk { - trace!("Sending chunk: {:?}", key); - let _ = chunk_recv.needed_chunks.remove(key); - let packet = &ChunkAndLightData::from_chunk(chunk); - match packet { - Ok(packet) => { - let player = state.universe.get::(eid).unwrap(); - trace!("Sending chunk {}, {} to {}", key.0, key.1, player.username); - if let Err(e) = - conn.send_packet(packet, &NetEncodeOpts::WithLength).await - { + task_set.spawn(async move { + let chunk_recv = state + .universe + .get_mut::(eid) + .expect("ChunkReceiver not found"); + let mut conn = state + .universe + .get_mut::(eid) + .expect("StreamWriter not found"); + let mut to_drop = Vec::new(); + if let Some(last_chunk) = &chunk_recv.last_chunk { + let packet = SetCenterChunk { + x: VarInt::from(last_chunk.0), + z: VarInt::from(last_chunk.1), + }; + if let Err(e) = conn.send_packet(&packet, &NetEncodeOpts::WithLength).await + { + error!("Error sending chunk: {:?}", e); + } + } else { + debug!("No last chunk found"); + } + for possible_chunk in chunk_recv.needed_chunks.iter_mut() { + if let Some(chunk) = possible_chunk.pair().1 { + let key = possible_chunk.pair().0; + to_drop.push(key.clone()); + match ChunkAndLightData::from_chunk(&chunk.clone()) { + Ok(packet) => { + let player = state.universe.get::(eid).unwrap(); + trace!( + "Sending chunk {}, {} to {}", + key.0, + key.1, + player.username + ); + if let Err(e) = + conn.send_packet(&packet, &NetEncodeOpts::WithLength).await + { + error!("Error sending chunk: {:?}", e); + } + } + Err(e) => { error!("Error sending chunk: {:?}", e); } - trace!("Sent chunk {}, {} to {}", key.0, key.1, player.username); - } - Err(e) => { - error!("Error sending chunk: {:?}", e); } } } - } - // Ok(()) - // }); + for key in to_drop { + chunk_recv.needed_chunks.remove(&key); + } + Ok(()) + }); } while let Some(result) = task_set.join_next().await { if let Err(e) = result { diff --git a/src/lib/core/src/chunks/chunk_receiver.rs b/src/lib/core/src/chunks/chunk_receiver.rs index d19e3219..3c068046 100644 --- a/src/lib/core/src/chunks/chunk_receiver.rs +++ b/src/lib/core/src/chunks/chunk_receiver.rs @@ -31,22 +31,17 @@ impl ChunkReceiver { impl ChunkReceiver { pub async fn calculate_chunks(&mut self) { if let Some(last_chunk) = &self.last_chunk { - trace!("Calculating chunks"); - self.can_see.clear(); + let new_can_see = DashSet::new(); for x in last_chunk.0 - VIEW_DISTANCE..=last_chunk.0 + VIEW_DISTANCE { for z in last_chunk.1 - VIEW_DISTANCE..=last_chunk.1 + VIEW_DISTANCE { - if self.can_see.contains(&(x, z, last_chunk.2.clone())) { - continue; + if !self.can_see.contains(&(x, z, last_chunk.2.clone())) { + self.needed_chunks + .insert((x, z, last_chunk.2.clone()), None); } - self.needed_chunks - .insert((x, z, last_chunk.2.clone()), None); - self.can_see.insert((x, z, last_chunk.2.clone())); + new_can_see.insert((x, z, last_chunk.2.clone())); } } - trace!( - "Calculated chunks: {} chunks queued", - self.needed_chunks.len() - ); + self.can_see = new_can_see; } } } From 7d7a66dd61ca96502aa2cfb9fbc60a9308b1417f Mon Sep 17 00:00:00 2001 From: ReCore Date: Mon, 9 Dec 2024 18:39:12 +1030 Subject: [PATCH 05/22] Changes to logging and view range --- src/bin/src/systems/chunk_sender.rs | 10 ++++++---- src/lib/net/src/packets/outgoing/login_play.rs | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 5a6246b9..2ecb6b40 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -44,19 +44,21 @@ impl System for ChunkSenderSystem { .universe .get_mut::(eid) .expect("ChunkReceiver not found"); + if chunk_recv.needed_chunks.is_empty() { + return Ok(()); + } let mut conn = state .universe .get_mut::(eid) .expect("StreamWriter not found"); let mut to_drop = Vec::new(); if let Some(last_chunk) = &chunk_recv.last_chunk { - let packet = SetCenterChunk { - x: VarInt::from(last_chunk.0), - z: VarInt::from(last_chunk.1), - }; + let packet = SetCenterChunk::new(last_chunk.0, last_chunk.1); if let Err(e) = conn.send_packet(&packet, &NetEncodeOpts::WithLength).await { error!("Error sending chunk: {:?}", e); + } else { + trace!("Sent center chunk as {}, {}", last_chunk.0, last_chunk.1); } } else { debug!("No last chunk found"); diff --git a/src/lib/net/src/packets/outgoing/login_play.rs b/src/lib/net/src/packets/outgoing/login_play.rs index b22f99cc..7b06a638 100644 --- a/src/lib/net/src/packets/outgoing/login_play.rs +++ b/src/lib/net/src/packets/outgoing/login_play.rs @@ -37,8 +37,8 @@ impl LoginPlayPacket<'_> { dimension_length: VarInt::from(1), dimension_names: &["minecraft:overworld"], max_players: VarInt::from(20), - view_distance: VarInt::from(10), - simulation_distance: VarInt::from(10), + view_distance: VarInt::from(12), + simulation_distance: VarInt::from(12), reduced_debug_info: false, enable_respawn_screen: true, do_limited_crafting: false, From 5f25545c033c8169d7435bd8a005b197a0f102c3 Mon Sep 17 00:00:00 2001 From: ReCore Date: Tue, 10 Dec 2024 11:54:32 +1030 Subject: [PATCH 06/22] clippy + fmt --- src/bin/src/packet_handlers/login_process.rs | 15 ++++++++------- .../transform/update_player_position.rs | 2 +- src/bin/src/systems/chunk_fetcher.rs | 6 +----- src/bin/src/systems/chunk_sender.rs | 1 - src/bin/src/systems/definition.rs | 2 +- src/lib/core/src/chunks/chunk_receiver.rs | 1 - src/lib/core/src/chunks/mod.rs | 2 +- src/lib/core/src/lib.rs | 2 +- 8 files changed, 13 insertions(+), 18 deletions(-) diff --git a/src/bin/src/packet_handlers/login_process.rs b/src/bin/src/packet_handlers/login_process.rs index b2a97ffe..0fc450ea 100644 --- a/src/bin/src/packet_handlers/login_process.rs +++ b/src/bin/src/packet_handlers/login_process.rs @@ -1,3 +1,4 @@ +use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_core::transform::grounded::OnGround; use ferrumc_core::transform::position::Position; @@ -25,7 +26,6 @@ use ferrumc_net::packets::outgoing::synchronize_player_position::SynchronizePlay use ferrumc_net_codec::encode::NetEncodeOpts; use ferrumc_state::GlobalState; use tracing::{debug, trace}; -use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; #[event_handler] async fn handle_login_start( @@ -39,10 +39,12 @@ async fn handle_login_start( debug!("Received login start from user with username {}", username); // Add the player identity component to the ECS for the entity. - state.universe.add_component::( - login_start_event.conn_id, - PlayerIdentity::new(username.to_string(), uuid), - )? + state + .universe + .add_component::( + login_start_event.conn_id, + PlayerIdentity::new(username.to_string(), uuid), + )? .add_component::(login_start_event.conn_id, ChunkReceiver::default())?; //Send a Login Success Response to further the login sequence @@ -175,9 +177,8 @@ async fn handle_ack_finish_configuration( let mut chunk_recv = state.universe.get_mut::(conn_id)?; chunk_recv.last_chunk = Some((pos.x as i32, pos.z as i32, String::from("overworld"))); chunk_recv.calculate_chunks().await; - + send_keep_alive(conn_id, state, &mut writer).await?; - Ok(ack_finish_configuration_event) } diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index a3dd06e0..86c253e4 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -1,4 +1,3 @@ -use tracing::trace; use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_core::transform::grounded::OnGround; @@ -9,6 +8,7 @@ use ferrumc_net::errors::NetError; use ferrumc_net::packets::packet_events::TransformEvent; use ferrumc_net::utils::ecs_helpers::EntityExt; use ferrumc_state::GlobalState; +use tracing::trace; #[event_handler] async fn handle_player_move( diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 6d75b2b5..07b48d70 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -31,13 +31,9 @@ impl System for ChunkFetcher { let players = state .universe .query::<(&PlayerIdentity, &mut ChunkReceiver)>(); - for (eid, (_, chunk_recv)) in players { + for (_eid, (_, chunk_recv)) in players { let state = state.clone(); //taskset.spawn(async move { - let player = state - .universe - .get::(eid) - .expect("PlayerIdentity not found"); for mut chunks in chunk_recv.needed_chunks.iter_mut() { let (key, chunk) = chunks.pair_mut(); if chunk.is_none() { diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 2ecb6b40..b9e39430 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -7,7 +7,6 @@ use ferrumc_net::connection::StreamWriter; use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; use ferrumc_net::packets::outgoing::set_center_chunk::SetCenterChunk; use ferrumc_net_codec::encode::NetEncodeOpts; -use ferrumc_net_codec::net_types::var_int::VarInt; use ferrumc_state::GlobalState; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; diff --git a/src/bin/src/systems/definition.rs b/src/bin/src/systems/definition.rs index 9e254eea..b0608b10 100644 --- a/src/bin/src/systems/definition.rs +++ b/src/bin/src/systems/definition.rs @@ -1,3 +1,4 @@ +use crate::systems::chunk_fetcher::ChunkFetcher; use crate::systems::chunk_sender::ChunkSenderSystem; use crate::systems::keep_alive_system::KeepAliveSystem; use crate::systems::tcp_listener_system::TcpListenerSystem; @@ -8,7 +9,6 @@ use ferrumc_state::GlobalState; use futures::stream::FuturesUnordered; use std::sync::{Arc, LazyLock}; use tracing::{debug, debug_span, info, Instrument}; -use crate::systems::chunk_fetcher::ChunkFetcher; #[async_trait] pub trait System: Send + Sync { diff --git a/src/lib/core/src/chunks/chunk_receiver.rs b/src/lib/core/src/chunks/chunk_receiver.rs index 3c068046..b0e3c8ea 100644 --- a/src/lib/core/src/chunks/chunk_receiver.rs +++ b/src/lib/core/src/chunks/chunk_receiver.rs @@ -1,7 +1,6 @@ use dashmap::{DashMap, DashSet}; use ferrumc_world::chunk_format::Chunk; use tokio::time::Instant; -use tracing::trace; const VIEW_DISTANCE: i32 = 12; pub struct ChunkReceiver { diff --git a/src/lib/core/src/chunks/mod.rs b/src/lib/core/src/chunks/mod.rs index c64e1905..4f733ac2 100644 --- a/src/lib/core/src/chunks/mod.rs +++ b/src/lib/core/src/chunks/mod.rs @@ -1 +1 @@ -pub mod chunk_receiver; \ No newline at end of file +pub mod chunk_receiver; diff --git a/src/lib/core/src/lib.rs b/src/lib/core/src/lib.rs index 0796791e..93d6b8d4 100644 --- a/src/lib/core/src/lib.rs +++ b/src/lib/core/src/lib.rs @@ -1,7 +1,7 @@ pub mod errors; // Core structs/types. Usually used in ECS Components. +pub mod chunks; pub mod identity; pub mod state; pub mod transform; -pub mod chunks; From 5ceb663b0810823cf16c339cf084090f69a37a01 Mon Sep 17 00:00:00 2001 From: ReCore Date: Tue, 10 Dec 2024 13:42:40 +1030 Subject: [PATCH 07/22] Fixed the other deadlock (maybe) --- src/bin/Cargo.toml | 2 +- src/bin/src/main.rs | 27 +++++++++++ .../transform/update_player_position.rs | 1 + src/bin/src/systems/chunk_fetcher.rs | 45 ++++++++++++------- src/bin/src/systems/chunk_sender.rs | 35 +++++++-------- src/lib/utils/logging/src/lib.rs | 8 +++- 6 files changed, 80 insertions(+), 38 deletions(-) diff --git a/src/bin/Cargo.toml b/src/bin/Cargo.toml index 803db5a7..0c0aaaea 100644 --- a/src/bin/Cargo.toml +++ b/src/bin/Cargo.toml @@ -28,7 +28,7 @@ ferrumc-general-purpose = { workspace = true } ferrumc-state = { workspace = true } ctor = { workspace = true } -parking_lot = { workspace = true } +parking_lot = { workspace = true, features = ["deadlock_detection"] } tracing = { workspace = true } tokio = { workspace = true } rayon = { workspace = true } diff --git a/src/bin/src/main.rs b/src/bin/src/main.rs index bbd8fdd9..4ee7ba72 100644 --- a/src/bin/src/main.rs +++ b/src/bin/src/main.rs @@ -27,6 +27,8 @@ async fn main() { let cli_args = CLIArgs::parse(); ferrumc_logging::init_logging(cli_args.log.into()); + check_deadlocks(); + match cli_args.command { Some(Command::Setup) => { info!("Starting setup..."); @@ -107,3 +109,28 @@ async fn create_state() -> Result { world: World::new().await, }) } +fn check_deadlocks() { + { + use parking_lot::deadlock; + use std::thread; + use std::time::Duration; + + // Create a background thread which checks for deadlocks every 10s + thread::spawn(move || loop { + thread::sleep(Duration::from_secs(10)); + let deadlocks = deadlock::check_deadlock(); + if deadlocks.is_empty() { + continue; + } + + println!("{} deadlocks detected", deadlocks.len()); + for (i, threads) in deadlocks.iter().enumerate() { + println!("Deadlock #{}", i); + for t in threads { + println!("Thread Id {:#?}", t.thread_id()); + println!("{:#?}", t.backtrace()); + } + } + }); + } +} diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index 86c253e4..198739f1 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -28,6 +28,7 @@ async fn handle_player_move( String::from("overworld"), ); if *last_chunk != new_chunk { + trace!("Player moved to new chunk"); let player = state.universe.get::(conn_id)?; trace!("Player {} crossed chunk boundary", player.username); chunk_recv.last_chunk = Some(new_chunk); diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 07b48d70..37caa622 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -31,27 +31,38 @@ impl System for ChunkFetcher { let players = state .universe .query::<(&PlayerIdentity, &mut ChunkReceiver)>(); - for (_eid, (_, chunk_recv)) in players { + for (_eid, (_, _)) in players { let state = state.clone(); - //taskset.spawn(async move { - for mut chunks in chunk_recv.needed_chunks.iter_mut() { - let (key, chunk) = chunks.pair_mut(); - if chunk.is_none() { - trace!("Fetching chunk: {:?}", key); - let fetched_chunk = state - .world - .load_chunk(key.0, key.1, &key.2.clone()) - .await - .unwrap(); - *chunk = Some(fetched_chunk); + taskset.spawn(async move { + let chunk_recv = state + .universe + .get_mut::(_eid) + .expect("ChunkReceiver not found"); + for mut chunks in chunk_recv.needed_chunks.iter_mut() { + let (key, chunk) = chunks.pair_mut(); + if chunk.is_none() { + trace!("Fetching chunk: {:?}", key); + let fetched_chunk = state + .world + .load_chunk(key.0, key.1, &key.2.clone()) + .await + .unwrap(); + *chunk = Some(fetched_chunk); + } } - } - // Ok(()) - //}); + Ok(()) + }); } while let Some(result) = taskset.join_next().await { - if let Err(e) = result { - error!("Error fetching chunk: {:?}", e); + match result { + Ok(task_res) => { + if let Err(e) = task_res { + error!("Error fetching chunk: {:?}", e); + } + } + Err(e) => { + error!("Error fetching chunk: {:?}", e); + } } } tokio::time::sleep(std::time::Duration::from_millis(5)).await; diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index b9e39430..afd96836 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -1,7 +1,7 @@ use crate::systems::definition::System; use async_trait::async_trait; use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; -use ferrumc_core::identity::player_identity::PlayerIdentity; +use ferrumc_core::transform::position::Position; use ferrumc_ecs::errors::ECSError; use ferrumc_net::connection::StreamWriter; use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; @@ -12,7 +12,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinSet; -use tracing::{debug, error, info, trace}; +use tracing::{error, info, trace}; pub(super) struct ChunkSenderSystem { pub stop: AtomicBool, @@ -34,9 +34,9 @@ impl System for ChunkSenderSystem { while !self.stop.load(Ordering::Relaxed) { let players = state .universe - .query::<(&mut ChunkReceiver, &mut StreamWriter)>(); + .query::<(&mut ChunkReceiver, &mut StreamWriter, &Position)>(); let mut task_set: JoinSet> = JoinSet::new(); - for (eid, (_, _)) in players { + for (eid, (_, _, _)) in players { let state = state.clone(); task_set.spawn(async move { let chunk_recv = state @@ -51,16 +51,14 @@ impl System for ChunkSenderSystem { .get_mut::(eid) .expect("StreamWriter not found"); let mut to_drop = Vec::new(); - if let Some(last_chunk) = &chunk_recv.last_chunk { - let packet = SetCenterChunk::new(last_chunk.0, last_chunk.1); + if let Some(chunk) = &chunk_recv.last_chunk { + let packet = SetCenterChunk::new(chunk.0, chunk.1); if let Err(e) = conn.send_packet(&packet, &NetEncodeOpts::WithLength).await { error!("Error sending chunk: {:?}", e); } else { - trace!("Sent center chunk as {}, {}", last_chunk.0, last_chunk.1); + trace!("Sent center chunk as {}, {}", chunk.0, chunk.1); } - } else { - debug!("No last chunk found"); } for possible_chunk in chunk_recv.needed_chunks.iter_mut() { if let Some(chunk) = possible_chunk.pair().1 { @@ -68,13 +66,7 @@ impl System for ChunkSenderSystem { to_drop.push(key.clone()); match ChunkAndLightData::from_chunk(&chunk.clone()) { Ok(packet) => { - let player = state.universe.get::(eid).unwrap(); - trace!( - "Sending chunk {}, {} to {}", - key.0, - key.1, - player.username - ); + trace!("Sending chunk {}, {} to a player", key.0, key.1,); if let Err(e) = conn.send_packet(&packet, &NetEncodeOpts::WithLength).await { @@ -94,8 +86,15 @@ impl System for ChunkSenderSystem { }); } while let Some(result) = task_set.join_next().await { - if let Err(e) = result { - error!("Error sending chunk: {:?}", e); + match result { + Ok(task_res) => { + if let Err(e) = task_res { + error!("Error sending chunk: {:?}", e); + } + } + Err(e) => { + error!("Error sending chunk: {:?}", e); + } } } diff --git a/src/lib/utils/logging/src/lib.rs b/src/lib/utils/logging/src/lib.rs index 6a780525..aae07be7 100644 --- a/src/lib/utils/logging/src/lib.rs +++ b/src/lib/utils/logging/src/lib.rs @@ -8,8 +8,11 @@ use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; pub fn init_logging(trace_level: Level) { - // console_subscriber::init(); - let env_filter = EnvFilter::from_default_env().add_directive(trace_level.into()); + let console = console_subscriber::spawn(); + let env_filter = EnvFilter::from_default_env() + .add_directive(trace_level.into()) + .add_directive("tokio=off".parse().unwrap()) + .add_directive("runtime=off".parse().unwrap()); let mut fmt_layer = Layer::default(); @@ -24,6 +27,7 @@ pub fn init_logging(trace_level: Level) { let profiler_layer = ProfilerTracingLayer; tracing_subscriber::registry() + .with(console) .with(env_filter) .with(profiler_layer) .with(fmt_layer) From b0c7c7641506086f143a4a2dac59c43c979abdd3 Mon Sep 17 00:00:00 2001 From: ReCore Date: Tue, 10 Dec 2024 15:10:04 +1030 Subject: [PATCH 08/22] Removed console thingy --- src/lib/utils/logging/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib/utils/logging/src/lib.rs b/src/lib/utils/logging/src/lib.rs index aae07be7..0c8bd973 100644 --- a/src/lib/utils/logging/src/lib.rs +++ b/src/lib/utils/logging/src/lib.rs @@ -8,7 +8,7 @@ use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; pub fn init_logging(trace_level: Level) { - let console = console_subscriber::spawn(); + //let console = console_subscriber::spawn(); let env_filter = EnvFilter::from_default_env() .add_directive(trace_level.into()) .add_directive("tokio=off".parse().unwrap()) @@ -27,7 +27,7 @@ pub fn init_logging(trace_level: Level) { let profiler_layer = ProfilerTracingLayer; tracing_subscriber::registry() - .with(console) + // .with(console) .with(env_filter) .with(profiler_layer) .with(fmt_layer) From 046d7fef0662823958906388d09eee6bc22d98ad Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 15 Dec 2024 15:48:23 +1030 Subject: [PATCH 09/22] Chunks now send properly Still deadlocking though --- src/bin/src/systems/chunk_fetcher.rs | 8 ++- src/bin/src/systems/chunk_sender.rs | 54 +++++++++++++++---- src/lib/core/src/chunks/chunk_receiver.rs | 2 +- .../packets/outgoing/chunk_batch_finish.rs | 9 ++++ .../src/packets/outgoing/chunk_batch_start.rs | 6 +++ .../net/src/packets/outgoing/login_play.rs | 4 +- src/lib/net/src/packets/outgoing/mod.rs | 2 + 7 files changed, 68 insertions(+), 17 deletions(-) create mode 100644 src/lib/net/src/packets/outgoing/chunk_batch_finish.rs create mode 100644 src/lib/net/src/packets/outgoing/chunk_batch_start.rs diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 37caa622..b335c340 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -28,15 +28,13 @@ impl System for ChunkFetcher { while !self.stop.load(std::sync::atomic::Ordering::Relaxed) { let mut taskset: JoinSet> = JoinSet::new(); - let players = state - .universe - .query::<(&PlayerIdentity, &mut ChunkReceiver)>(); - for (_eid, (_, _)) in players { + let players = state.universe.query::<&mut ChunkReceiver>(); + for (eid, _) in players { let state = state.clone(); taskset.spawn(async move { let chunk_recv = state .universe - .get_mut::(_eid) + .get_mut::(eid) .expect("ChunkReceiver not found"); for mut chunks in chunk_recv.needed_chunks.iter_mut() { let (key, chunk) = chunks.pair_mut(); diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index afd96836..42c19bab 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -5,8 +5,11 @@ use ferrumc_core::transform::position::Position; use ferrumc_ecs::errors::ECSError; use ferrumc_net::connection::StreamWriter; use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; +use ferrumc_net::packets::outgoing::chunk_batch_finish::ChunkBatchFinish; +use ferrumc_net::packets::outgoing::chunk_batch_start::ChunkBatchStart; use ferrumc_net::packets::outgoing::set_center_chunk::SetCenterChunk; use ferrumc_net_codec::encode::NetEncodeOpts; +use ferrumc_net_codec::net_types::var_int::VarInt; use ferrumc_state::GlobalState; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -46,20 +49,30 @@ impl System for ChunkSenderSystem { if chunk_recv.needed_chunks.is_empty() { return Ok(()); } - let mut conn = state - .universe - .get_mut::(eid) - .expect("StreamWriter not found"); let mut to_drop = Vec::new(); - if let Some(chunk) = &chunk_recv.last_chunk { - let packet = SetCenterChunk::new(chunk.0, chunk.1); - if let Err(e) = conn.send_packet(&packet, &NetEncodeOpts::WithLength).await + { + let mut conn = state + .universe + .get_mut::(eid) + .expect("StreamWriter not found"); + if let Some(chunk) = &chunk_recv.last_chunk { + let packet = SetCenterChunk::new(chunk.0, chunk.1); + if let Err(e) = + conn.send_packet(&packet, &NetEncodeOpts::WithLength).await + { + error!("Error sending chunk: {:?}", e); + } else { + trace!("Sent center chunk as {}, {}", chunk.0, chunk.1); + } + } + if let Err(e) = conn + .send_packet(&ChunkBatchStart {}, &NetEncodeOpts::WithLength) + .await { error!("Error sending chunk: {:?}", e); - } else { - trace!("Sent center chunk as {}, {}", chunk.0, chunk.1); } } + let mut sent_chunks = 0; for possible_chunk in chunk_recv.needed_chunks.iter_mut() { if let Some(chunk) = possible_chunk.pair().1 { let key = possible_chunk.pair().0; @@ -67,10 +80,16 @@ impl System for ChunkSenderSystem { match ChunkAndLightData::from_chunk(&chunk.clone()) { Ok(packet) => { trace!("Sending chunk {}, {} to a player", key.0, key.1,); + let mut conn = state + .universe + .get_mut::(eid) + .expect("StreamWriter not found"); if let Err(e) = conn.send_packet(&packet, &NetEncodeOpts::WithLength).await { error!("Error sending chunk: {:?}", e); + } else { + sent_chunks += 1; } } Err(e) => { @@ -79,6 +98,23 @@ impl System for ChunkSenderSystem { } } } + { + let mut conn = state + .universe + .get_mut::(eid) + .expect("StreamWriter not found"); + if let Err(e) = conn + .send_packet( + &ChunkBatchFinish { + batch_size: VarInt::from(sent_chunks), + }, + &NetEncodeOpts::WithLength, + ) + .await + { + error!("Error sending chunk: {:?}", e); + } + } for key in to_drop { chunk_recv.needed_chunks.remove(&key); } diff --git a/src/lib/core/src/chunks/chunk_receiver.rs b/src/lib/core/src/chunks/chunk_receiver.rs index b0e3c8ea..dcae60c0 100644 --- a/src/lib/core/src/chunks/chunk_receiver.rs +++ b/src/lib/core/src/chunks/chunk_receiver.rs @@ -2,7 +2,7 @@ use dashmap::{DashMap, DashSet}; use ferrumc_world::chunk_format::Chunk; use tokio::time::Instant; -const VIEW_DISTANCE: i32 = 12; +const VIEW_DISTANCE: i32 = 8; pub struct ChunkReceiver { pub needed_chunks: DashMap<(i32, i32, String), Option>, pub can_see: DashSet<(i32, i32, String)>, diff --git a/src/lib/net/src/packets/outgoing/chunk_batch_finish.rs b/src/lib/net/src/packets/outgoing/chunk_batch_finish.rs new file mode 100644 index 00000000..a6869ab9 --- /dev/null +++ b/src/lib/net/src/packets/outgoing/chunk_batch_finish.rs @@ -0,0 +1,9 @@ +use ferrumc_macros::{packet, NetEncode}; +use std::io::Write; +use ferrumc_net_codec::net_types::var_int::VarInt; + +#[derive(NetEncode)] +#[packet(packet_id = 0x0C)] +pub struct ChunkBatchFinish { + pub batch_size: VarInt +} diff --git a/src/lib/net/src/packets/outgoing/chunk_batch_start.rs b/src/lib/net/src/packets/outgoing/chunk_batch_start.rs new file mode 100644 index 00000000..11867687 --- /dev/null +++ b/src/lib/net/src/packets/outgoing/chunk_batch_start.rs @@ -0,0 +1,6 @@ +use ferrumc_macros::{packet, NetEncode}; +use std::io::Write; + +#[derive(NetEncode)] +#[packet(packet_id = 0x0D)] +pub struct ChunkBatchStart {} diff --git a/src/lib/net/src/packets/outgoing/login_play.rs b/src/lib/net/src/packets/outgoing/login_play.rs index 7b06a638..14b8b4fc 100644 --- a/src/lib/net/src/packets/outgoing/login_play.rs +++ b/src/lib/net/src/packets/outgoing/login_play.rs @@ -37,8 +37,8 @@ impl LoginPlayPacket<'_> { dimension_length: VarInt::from(1), dimension_names: &["minecraft:overworld"], max_players: VarInt::from(20), - view_distance: VarInt::from(12), - simulation_distance: VarInt::from(12), + view_distance: VarInt::from(2), + simulation_distance: VarInt::from(2), reduced_debug_info: false, enable_respawn_screen: true, do_limited_crafting: false, diff --git a/src/lib/net/src/packets/outgoing/mod.rs b/src/lib/net/src/packets/outgoing/mod.rs index 9c63db7c..90c59d4c 100644 --- a/src/lib/net/src/packets/outgoing/mod.rs +++ b/src/lib/net/src/packets/outgoing/mod.rs @@ -1,4 +1,6 @@ pub mod chunk_and_light_data; +pub mod chunk_batch_finish; +pub mod chunk_batch_start; pub mod client_bound_known_packs; pub mod disconnect; pub mod finish_configuration; From 4deb3a92855ce6f4411fb588f4d8bc22e600db82 Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 15 Dec 2024 17:41:23 +1030 Subject: [PATCH 10/22] Fixed deadlock (probably) --- .../transform/update_player_position.rs | 5 -- src/bin/src/systems/chunk_fetcher.rs | 51 ++++++++++++------- src/bin/src/systems/chunk_sender.rs | 3 -- .../packets/outgoing/chunk_batch_finish.rs | 4 +- 4 files changed, 35 insertions(+), 28 deletions(-) diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index 198739f1..ceea4fc0 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -1,5 +1,4 @@ use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; -use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_core::transform::grounded::OnGround; use ferrumc_core::transform::position::Position; use ferrumc_core::transform::rotation::Rotation; @@ -8,7 +7,6 @@ use ferrumc_net::errors::NetError; use ferrumc_net::packets::packet_events::TransformEvent; use ferrumc_net::utils::ecs_helpers::EntityExt; use ferrumc_state::GlobalState; -use tracing::trace; #[event_handler] async fn handle_player_move( @@ -28,9 +26,6 @@ async fn handle_player_move( String::from("overworld"), ); if *last_chunk != new_chunk { - trace!("Player moved to new chunk"); - let player = state.universe.get::(conn_id)?; - trace!("Player {} crossed chunk boundary", player.username); chunk_recv.last_chunk = Some(new_chunk); calculate_chunks = true; } diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index b335c340..7450d658 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -2,8 +2,8 @@ use crate::errors::BinaryError; use crate::systems::definition::System; use async_trait::async_trait; use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; -use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_state::GlobalState; +use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::Arc; use tokio::task::JoinSet; @@ -27,31 +27,46 @@ impl System for ChunkFetcher { info!("Chunk fetcher system started"); while !self.stop.load(std::sync::atomic::Ordering::Relaxed) { - let mut taskset: JoinSet> = JoinSet::new(); + let mut task_set: JoinSet> = JoinSet::new(); let players = state.universe.query::<&mut ChunkReceiver>(); for (eid, _) in players { let state = state.clone(); - taskset.spawn(async move { - let chunk_recv = state - .universe - .get_mut::(eid) - .expect("ChunkReceiver not found"); - for mut chunks in chunk_recv.needed_chunks.iter_mut() { - let (key, chunk) = chunks.pair_mut(); - if chunk.is_none() { - trace!("Fetching chunk: {:?}", key); - let fetched_chunk = state - .world - .load_chunk(key.0, key.1, &key.2.clone()) - .await - .unwrap(); - *chunk = Some(fetched_chunk); + task_set.spawn(async move { + let mut copied_chunks = { + let chunk_recv = state + .universe + .get_mut::(eid) + .expect("ChunkReceiver not found"); + let mut copied_chunks = HashMap::new(); + for chunk in chunk_recv.needed_chunks.iter() { + let (key, chunk) = chunk.pair(); + if chunk.is_none() { + copied_chunks.insert(key.clone(), None); + } + } + copied_chunks + }; + for (key, chunk) in copied_chunks.iter_mut() { + let fetched_chunk = state + .world + .load_chunk(key.0, key.1, &key.2.clone()) + .await + .unwrap(); + *chunk = Some(fetched_chunk); + } + { + let chunk_recv = state + .universe + .get_mut::(eid) + .expect("ChunkReceiver not found"); + for (key, chunk) in copied_chunks.iter() { + chunk_recv.needed_chunks.insert(key.clone(), chunk.clone()); } } Ok(()) }); } - while let Some(result) = taskset.join_next().await { + while let Some(result) = task_set.join_next().await { match result { Ok(task_res) => { if let Err(e) = task_res { diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 42c19bab..81950bcd 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -61,8 +61,6 @@ impl System for ChunkSenderSystem { conn.send_packet(&packet, &NetEncodeOpts::WithLength).await { error!("Error sending chunk: {:?}", e); - } else { - trace!("Sent center chunk as {}, {}", chunk.0, chunk.1); } } if let Err(e) = conn @@ -79,7 +77,6 @@ impl System for ChunkSenderSystem { to_drop.push(key.clone()); match ChunkAndLightData::from_chunk(&chunk.clone()) { Ok(packet) => { - trace!("Sending chunk {}, {} to a player", key.0, key.1,); let mut conn = state .universe .get_mut::(eid) diff --git a/src/lib/net/src/packets/outgoing/chunk_batch_finish.rs b/src/lib/net/src/packets/outgoing/chunk_batch_finish.rs index a6869ab9..e15550ea 100644 --- a/src/lib/net/src/packets/outgoing/chunk_batch_finish.rs +++ b/src/lib/net/src/packets/outgoing/chunk_batch_finish.rs @@ -1,9 +1,9 @@ use ferrumc_macros::{packet, NetEncode}; -use std::io::Write; use ferrumc_net_codec::net_types::var_int::VarInt; +use std::io::Write; #[derive(NetEncode)] #[packet(packet_id = 0x0C)] pub struct ChunkBatchFinish { - pub batch_size: VarInt + pub batch_size: VarInt, } From 20c5a737ecebbadea1e85ea9dd2739a0fdf5e9c0 Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 15 Dec 2024 17:57:37 +1030 Subject: [PATCH 11/22] Clippy + changed default log level --- src/bin/src/cli.rs | 2 +- src/bin/src/systems/chunk_fetcher.rs | 2 +- src/bin/src/systems/chunk_sender.rs | 2 +- src/lib/utils/logging/src/lib.rs | 5 +---- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/bin/src/cli.rs b/src/bin/src/cli.rs index 099a40c0..334e5b82 100644 --- a/src/bin/src/cli.rs +++ b/src/bin/src/cli.rs @@ -6,7 +6,7 @@ pub struct CLIArgs { #[command(subcommand)] pub command: Option, #[clap(long)] - #[arg(value_enum, default_value_t = LogLevel(Level::TRACE))] + #[arg(value_enum, default_value_t = LogLevel(Level::DEBUG))] pub log: LogLevel, } diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 7450d658..f7c8b8e5 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::Arc; use tokio::task::JoinSet; -use tracing::{error, info, trace}; +use tracing::{error, info}; pub struct ChunkFetcher { stop: AtomicBool, diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 81950bcd..9fe5fbb7 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -15,7 +15,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinSet; -use tracing::{error, info, trace}; +use tracing::{error, info}; pub(super) struct ChunkSenderSystem { pub stop: AtomicBool, diff --git a/src/lib/utils/logging/src/lib.rs b/src/lib/utils/logging/src/lib.rs index 0c8bd973..85c5b55c 100644 --- a/src/lib/utils/logging/src/lib.rs +++ b/src/lib/utils/logging/src/lib.rs @@ -9,10 +9,7 @@ use tracing_subscriber::EnvFilter; pub fn init_logging(trace_level: Level) { //let console = console_subscriber::spawn(); - let env_filter = EnvFilter::from_default_env() - .add_directive(trace_level.into()) - .add_directive("tokio=off".parse().unwrap()) - .add_directive("runtime=off".parse().unwrap()); + let env_filter = EnvFilter::from_default_env().add_directive(trace_level.into()); let mut fmt_layer = Layer::default(); From d19ddba20968838d83306a615898a8655a4b1e19 Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 15 Dec 2024 18:04:52 +1030 Subject: [PATCH 12/22] Async closures stabilization clippy --- src/lib/storage/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lib/storage/src/lib.rs b/src/lib/storage/src/lib.rs index 91a57a7f..4bbb6e3d 100644 --- a/src/lib/storage/src/lib.rs +++ b/src/lib/storage/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(async_closure)] pub mod compressors; pub mod errors; pub mod lmdb; From 712639cebbcddd8d479cb1f65feb7f3ddb79a9ca Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 15 Dec 2024 18:39:00 +1030 Subject: [PATCH 13/22] Docs --- src/bin/src/systems/chunk_fetcher.rs | 3 +++ src/bin/src/systems/chunk_sender.rs | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index f7c8b8e5..1a086f77 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -32,6 +32,7 @@ impl System for ChunkFetcher { for (eid, _) in players { let state = state.clone(); task_set.spawn(async move { + // Copy the chunks into a new map so we don't lock the component while fetching let mut copied_chunks = { let chunk_recv = state .universe @@ -46,6 +47,7 @@ impl System for ChunkFetcher { } copied_chunks }; + // Fetch the chunks for (key, chunk) in copied_chunks.iter_mut() { let fetched_chunk = state .world @@ -54,6 +56,7 @@ impl System for ChunkFetcher { .unwrap(); *chunk = Some(fetched_chunk); } + // Insert the fetched chunks back into the component { let chunk_recv = state .universe diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 9fe5fbb7..44a490b9 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -46,9 +46,12 @@ impl System for ChunkSenderSystem { .universe .get_mut::(eid) .expect("ChunkReceiver not found"); + if chunk_recv.needed_chunks.is_empty() { return Ok(()); } + // We can't delete from the map while iterating, so we collect the keys to drop + // and then drop them after sending the chunks let mut to_drop = Vec::new(); { let mut conn = state @@ -131,7 +134,7 @@ impl System for ChunkSenderSystem { } } - tokio::time::sleep(Duration::from_millis(200)).await; + tokio::time::sleep(Duration::from_millis(5)).await; } } From 7639aad59f986fe39172ef98c9ba144f78e7d819 Mon Sep 17 00:00:00 2001 From: ReCore Date: Mon, 16 Dec 2024 14:56:25 +1030 Subject: [PATCH 14/22] Deadlocking fixed hopefully --- .../transform/update_player_position.rs | 21 +++++----- src/bin/src/systems/chunk_fetcher.rs | 25 ++++++------ src/bin/src/systems/chunk_sender.rs | 39 ++++++++++++++++--- 3 files changed, 58 insertions(+), 27 deletions(-) diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index ceea4fc0..4223f2e0 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -1,3 +1,4 @@ +use tracing::trace; use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; use ferrumc_core::transform::grounded::OnGround; use ferrumc_core::transform::position::Position; @@ -14,11 +15,11 @@ async fn handle_player_move( state: GlobalState, ) -> Result { let conn_id = event.conn_id; - let mut calculate_chunks = false; if let Some(ref new_position) = event.position { - let mut position = conn_id.get_mut::(&state)?; + trace!("Getting chunk_recv 1 for player move"); let mut chunk_recv = state.universe.get_mut::(conn_id)?; + trace!("Got chunk_recv 1 for player move"); if let Some(last_chunk) = &chunk_recv.last_chunk { let new_chunk = ( new_position.x as i32 / 16, @@ -27,7 +28,7 @@ async fn handle_player_move( ); if *last_chunk != new_chunk { chunk_recv.last_chunk = Some(new_chunk); - calculate_chunks = true; + chunk_recv.calculate_chunks().await; } } else { chunk_recv.last_chunk = Some(( @@ -35,28 +36,30 @@ async fn handle_player_move( new_position.z as i32 / 16, String::from("overworld"), )); - calculate_chunks = true; + chunk_recv.calculate_chunks().await; } + trace!("Getting position 1 for player move"); + let mut position = conn_id.get_mut::(&state)?; + trace!("Got position 1 for player move"); *position = Position::new(new_position.x, new_position.y, new_position.z); } if let Some(ref new_rotation) = event.rotation { + trace!("Getting rotation 1 for player move"); let mut rotation = conn_id.get_mut::(&state)?; + trace!("Got rotation 1 for player move"); *rotation = Rotation::new(new_rotation.yaw, new_rotation.pitch); } if let Some(new_grounded) = event.on_ground { + trace!("Getting on_ground 1 for player move"); let mut on_ground = conn_id.get_mut::(&state)?; + trace!("Got on_ground 1 for player move"); *on_ground = OnGround(new_grounded); } - if calculate_chunks { - let mut chunk_recv = state.universe.get_mut::(conn_id)?; - chunk_recv.calculate_chunks().await; - } - Ok(event) } diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 1a086f77..86930c38 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::Arc; use tokio::task::JoinSet; -use tracing::{error, info}; +use tracing::{error, info, trace}; pub struct ChunkFetcher { stop: AtomicBool, @@ -34,10 +34,12 @@ impl System for ChunkFetcher { task_set.spawn(async move { // Copy the chunks into a new map so we don't lock the component while fetching let mut copied_chunks = { + trace!("Getting chunk_recv 1 for fetcher"); let chunk_recv = state .universe - .get_mut::(eid) + .get::(eid) .expect("ChunkReceiver not found"); + trace!("Got chunk_recv 1 for fetcher"); let mut copied_chunks = HashMap::new(); for chunk in chunk_recv.needed_chunks.iter() { let (key, chunk) = chunk.pair(); @@ -49,19 +51,18 @@ impl System for ChunkFetcher { }; // Fetch the chunks for (key, chunk) in copied_chunks.iter_mut() { - let fetched_chunk = state - .world - .load_chunk(key.0, key.1, &key.2.clone()) - .await - .unwrap(); + let fetched_chunk = + state.world.load_chunk(key.0, key.1, &key.2.clone()).await?; *chunk = Some(fetched_chunk); } // Insert the fetched chunks back into the component { - let chunk_recv = state - .universe - .get_mut::(eid) - .expect("ChunkReceiver not found"); + trace!("Getting chunk_recv 2 for fetcher"); + let Ok(chunk_recv) = state.universe.get::(eid) else { + trace!("A player disconnected before we could get the ChunkReceiver"); + return Ok(()); + }; + trace!("Got chunk_recv 2 for fetcher"); for (key, chunk) in copied_chunks.iter() { chunk_recv.needed_chunks.insert(key.clone(), chunk.clone()); } @@ -81,7 +82,7 @@ impl System for ChunkFetcher { } } } - tokio::time::sleep(std::time::Duration::from_millis(5)).await; + tokio::time::sleep(std::time::Duration::from_millis(1)).await; } } diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 44a490b9..1a8ea062 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -15,7 +15,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinSet; -use tracing::{error, info}; +use tracing::{error, info, trace}; pub(super) struct ChunkSenderSystem { pub stop: AtomicBool, @@ -37,27 +37,37 @@ impl System for ChunkSenderSystem { while !self.stop.load(Ordering::Relaxed) { let players = state .universe - .query::<(&mut ChunkReceiver, &mut StreamWriter, &Position)>(); + .query::<(&mut ChunkReceiver, &mut StreamWriter)>(); let mut task_set: JoinSet> = JoinSet::new(); - for (eid, (_, _, _)) in players { + for (eid, (_, _)) in players { let state = state.clone(); task_set.spawn(async move { + trace!("Getting chunk_recv 1 for sender"); let chunk_recv = state .universe - .get_mut::(eid) + .get::(eid) .expect("ChunkReceiver not found"); - + trace!("Got chunk_recv 1 for sender"); if chunk_recv.needed_chunks.is_empty() { return Ok(()); } + drop(chunk_recv); // We can't delete from the map while iterating, so we collect the keys to drop // and then drop them after sending the chunks let mut to_drop = Vec::new(); { + trace!("Getting conn 1 for sender"); let mut conn = state .universe .get_mut::(eid) .expect("StreamWriter not found"); + trace!("Got conn 1 for sender"); + trace!("Getting chunk_recv 2 for sender"); + let chunk_recv = state + .universe + .get::(eid) + .expect("ChunkReceiver not found"); + trace!("Got chunk_recv 2 for sender"); if let Some(chunk) = &chunk_recv.last_chunk { let packet = SetCenterChunk::new(chunk.0, chunk.1); if let Err(e) = @@ -74,16 +84,24 @@ impl System for ChunkSenderSystem { } } let mut sent_chunks = 0; + trace!("Getting chunk_recv 3 for sender"); + let chunk_recv = state + .universe + .get_mut::(eid) + .expect("ChunkReceiver not found"); + trace!("Got chunk_recv 3 for sender"); for possible_chunk in chunk_recv.needed_chunks.iter_mut() { if let Some(chunk) = possible_chunk.pair().1 { let key = possible_chunk.pair().0; to_drop.push(key.clone()); match ChunkAndLightData::from_chunk(&chunk.clone()) { Ok(packet) => { + trace!("Getting conn 2 for sender"); let mut conn = state .universe .get_mut::(eid) .expect("StreamWriter not found"); + trace!("Got conn 2 for sender"); if let Err(e) = conn.send_packet(&packet, &NetEncodeOpts::WithLength).await { @@ -98,11 +116,14 @@ impl System for ChunkSenderSystem { } } } + drop(chunk_recv); { + trace!("Getting conn 3 for sender"); let mut conn = state .universe .get_mut::(eid) .expect("StreamWriter not found"); + trace!("Got conn 3 for sender"); if let Err(e) = conn .send_packet( &ChunkBatchFinish { @@ -115,6 +136,12 @@ impl System for ChunkSenderSystem { error!("Error sending chunk: {:?}", e); } } + trace!("Getting chunk_recv 4 for sender"); + let chunk_recv = state + .universe + .get_mut::(eid) + .expect("ChunkReceiver not found"); + trace!("Got chunk_recv 4 for sender"); for key in to_drop { chunk_recv.needed_chunks.remove(&key); } @@ -134,7 +161,7 @@ impl System for ChunkSenderSystem { } } - tokio::time::sleep(Duration::from_millis(5)).await; + tokio::time::sleep(Duration::from_millis(1)).await; } } From a1fe01a8e5c5146fe829921f2c178e6e43548636 Mon Sep 17 00:00:00 2001 From: ReCore Date: Mon, 16 Dec 2024 14:57:39 +1030 Subject: [PATCH 15/22] clippy + formatting --- .../src/packet_handlers/transform/update_player_position.rs | 3 +-- src/bin/src/systems/chunk_sender.rs | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index 4223f2e0..6487e68f 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -1,4 +1,3 @@ -use tracing::trace; use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; use ferrumc_core::transform::grounded::OnGround; use ferrumc_core::transform::position::Position; @@ -8,6 +7,7 @@ use ferrumc_net::errors::NetError; use ferrumc_net::packets::packet_events::TransformEvent; use ferrumc_net::utils::ecs_helpers::EntityExt; use ferrumc_state::GlobalState; +use tracing::trace; #[event_handler] async fn handle_player_move( @@ -16,7 +16,6 @@ async fn handle_player_move( ) -> Result { let conn_id = event.conn_id; if let Some(ref new_position) = event.position { - trace!("Getting chunk_recv 1 for player move"); let mut chunk_recv = state.universe.get_mut::(conn_id)?; trace!("Got chunk_recv 1 for player move"); diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 1a8ea062..29fab873 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -1,7 +1,6 @@ use crate::systems::definition::System; use async_trait::async_trait; use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; -use ferrumc_core::transform::position::Position; use ferrumc_ecs::errors::ECSError; use ferrumc_net::connection::StreamWriter; use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; From 0a66bb161ecca21d99bb13efbcc69611a46f6fa7 Mon Sep 17 00:00:00 2001 From: ReCore Date: Mon, 16 Dec 2024 14:58:37 +1030 Subject: [PATCH 16/22] Slowed down chunk cycle --- src/bin/src/systems/chunk_fetcher.rs | 2 +- src/bin/src/systems/chunk_sender.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 86930c38..de6f886b 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -82,7 +82,7 @@ impl System for ChunkFetcher { } } } - tokio::time::sleep(std::time::Duration::from_millis(1)).await; + tokio::time::sleep(std::time::Duration::from_millis(5)).await; } } diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index 29fab873..aac9be9f 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -160,7 +160,7 @@ impl System for ChunkSenderSystem { } } - tokio::time::sleep(Duration::from_millis(1)).await; + tokio::time::sleep(Duration::from_millis(5)).await; } } From 219392d8737e2c3233f8732f650aad72b80d6529 Mon Sep 17 00:00:00 2001 From: ReCore Date: Fri, 27 Dec 2024 17:32:38 +1030 Subject: [PATCH 17/22] Added logging --- src/bin/src/systems/chunk_fetcher.rs | 10 +++++----- src/bin/src/systems/chunk_sender.rs | 18 +++++++++--------- src/lib/ecs/src/components/mod.rs | 17 +++++++++++++++++ 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index de6f886b..5bcabb3d 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -35,10 +35,10 @@ impl System for ChunkFetcher { // Copy the chunks into a new map so we don't lock the component while fetching let mut copied_chunks = { trace!("Getting chunk_recv 1 for fetcher"); - let chunk_recv = state - .universe - .get::(eid) - .expect("ChunkReceiver not found"); + let Ok(chunk_recv) = state.universe.get::(eid) else { + trace!("A player disconnected before we could get the ChunkReceiver"); + return Ok(()); + }; trace!("Got chunk_recv 1 for fetcher"); let mut copied_chunks = HashMap::new(); for chunk in chunk_recv.needed_chunks.iter() { @@ -82,7 +82,7 @@ impl System for ChunkFetcher { } } } - tokio::time::sleep(std::time::Duration::from_millis(5)).await; + tokio::time::sleep(std::time::Duration::from_millis(1)).await; } } diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index aac9be9f..a8a9922d 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -42,10 +42,10 @@ impl System for ChunkSenderSystem { let state = state.clone(); task_set.spawn(async move { trace!("Getting chunk_recv 1 for sender"); - let chunk_recv = state - .universe - .get::(eid) - .expect("ChunkReceiver not found"); + let Ok(chunk_recv) = state.universe.get::(eid) else { + trace!("A player disconnected before we could get the ChunkReceiver"); + return Ok(()); + }; trace!("Got chunk_recv 1 for sender"); if chunk_recv.needed_chunks.is_empty() { return Ok(()); @@ -62,10 +62,10 @@ impl System for ChunkSenderSystem { .expect("StreamWriter not found"); trace!("Got conn 1 for sender"); trace!("Getting chunk_recv 2 for sender"); - let chunk_recv = state - .universe - .get::(eid) - .expect("ChunkReceiver not found"); + let Ok(chunk_recv) = state.universe.get::(eid) else { + trace!("A player disconnected before we could get the ChunkReceiver"); + return Ok(()); + }; trace!("Got chunk_recv 2 for sender"); if let Some(chunk) = &chunk_recv.last_chunk { let packet = SetCenterChunk::new(chunk.0, chunk.1); @@ -160,7 +160,7 @@ impl System for ChunkSenderSystem { } } - tokio::time::sleep(Duration::from_millis(5)).await; + tokio::time::sleep(Duration::from_millis(1)).await; } } diff --git a/src/lib/ecs/src/components/mod.rs b/src/lib/ecs/src/components/mod.rs index 6853fd8e..3b89f238 100644 --- a/src/lib/ecs/src/components/mod.rs +++ b/src/lib/ecs/src/components/mod.rs @@ -238,6 +238,7 @@ use crate::ECSResult; use dashmap::DashMap; use parking_lot::RwLock; use std::any::TypeId; +use tracing::trace; pub mod storage; @@ -300,6 +301,14 @@ impl ComponentManager { } pub fn get<'a, T: Component>(&self, entity_id: usize) -> ECSResult> { let type_id = TypeId::of::(); + #[cfg(debug_assertions)] + { + trace!("Getting static component lock for entity {}", entity_id); + let locked = matches!(self.components.try_get(&type_id), dashmap::try_result::TryResult::Locked); + if locked { + trace!("Static component lock for entity {} is locked", entity_id); + } + } let ptr = *self .components .get(&type_id) @@ -310,6 +319,14 @@ impl ComponentManager { pub fn get_mut<'a, T: Component>(&self, entity_id: usize) -> ECSResult> { let type_id = TypeId::of::(); + #[cfg(debug_assertions)] + { + trace!("Getting mutable component lock for entity {}", entity_id); + let locked = matches!(self.components.try_get_mut(&type_id), dashmap::try_result::TryResult::Locked); + if locked { + trace!("Mutable component lock for entity {} is locked", entity_id); + } + } let ptr = *self .components .get(&type_id) From 76747aadf5c9f9e98272b7408863ac26b6e7f8a8 Mon Sep 17 00:00:00 2001 From: ReCore Date: Fri, 27 Dec 2024 19:25:35 +1030 Subject: [PATCH 18/22] Seems to deadlock less, but still happening --- src/bin/src/main.rs | 14 ++ .../transform/update_player_position.rs | 36 +++-- src/bin/src/systems/chunk_fetcher.rs | 2 +- src/bin/src/systems/chunk_sender.rs | 151 ++++++++++-------- src/lib/ecs/src/components/mod.rs | 77 +++++++-- 5 files changed, 184 insertions(+), 96 deletions(-) diff --git a/src/bin/src/main.rs b/src/bin/src/main.rs index 4ee7ba72..9b8aecef 100644 --- a/src/bin/src/main.rs +++ b/src/bin/src/main.rs @@ -5,11 +5,14 @@ extern crate core; use crate::errors::BinaryError; use clap::Parser; use ferrumc_config::statics::get_global_config; +use ferrumc_core::chunks::chunk_receiver::ChunkReceiver; use ferrumc_ecs::Universe; use ferrumc_general_purpose::paths::get_root_path; +use ferrumc_net::connection::StreamWriter; use ferrumc_net::server::create_server_listener; use ferrumc_state::ServerState; use ferrumc_world::World; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use systems::definition; use tracing::{error, info}; @@ -29,6 +32,17 @@ async fn main() { check_deadlocks(); + { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + std::any::TypeId::of::().hash(&mut hasher); + let digest = hasher.finish(); + println!("ChunkReceiver: {:X}", digest); + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + std::any::TypeId::of::().hash(&mut hasher); + let digest = hasher.finish(); + println!("StreamWriter: {:X}", digest); + } + match cli_args.command { Some(Command::Setup) => { info!("Starting setup..."); diff --git a/src/bin/src/packet_handlers/transform/update_player_position.rs b/src/bin/src/packet_handlers/transform/update_player_position.rs index 6487e68f..e262e267 100644 --- a/src/bin/src/packet_handlers/transform/update_player_position.rs +++ b/src/bin/src/packet_handlers/transform/update_player_position.rs @@ -17,25 +17,27 @@ async fn handle_player_move( let conn_id = event.conn_id; if let Some(ref new_position) = event.position { trace!("Getting chunk_recv 1 for player move"); - let mut chunk_recv = state.universe.get_mut::(conn_id)?; - trace!("Got chunk_recv 1 for player move"); - if let Some(last_chunk) = &chunk_recv.last_chunk { - let new_chunk = ( - new_position.x as i32 / 16, - new_position.z as i32 / 16, - String::from("overworld"), - ); - if *last_chunk != new_chunk { - chunk_recv.last_chunk = Some(new_chunk); + { + let mut chunk_recv = state.universe.get_mut::(conn_id)?; + trace!("Got chunk_recv 1 for player move"); + if let Some(last_chunk) = &chunk_recv.last_chunk { + let new_chunk = ( + new_position.x as i32 / 16, + new_position.z as i32 / 16, + String::from("overworld"), + ); + if *last_chunk != new_chunk { + chunk_recv.last_chunk = Some(new_chunk); + chunk_recv.calculate_chunks().await; + } + } else { + chunk_recv.last_chunk = Some(( + new_position.x as i32 / 16, + new_position.z as i32 / 16, + String::from("overworld"), + )); chunk_recv.calculate_chunks().await; } - } else { - chunk_recv.last_chunk = Some(( - new_position.x as i32 / 16, - new_position.z as i32 / 16, - String::from("overworld"), - )); - chunk_recv.calculate_chunks().await; } trace!("Getting position 1 for player move"); diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 5bcabb3d..bd727f43 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -82,7 +82,7 @@ impl System for ChunkFetcher { } } } - tokio::time::sleep(std::time::Duration::from_millis(1)).await; + // tokio::time::sleep(std::time::Duration::from_nanos(50)).await; } } diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index a8a9922d..af0d005c 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -7,9 +7,10 @@ use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; use ferrumc_net::packets::outgoing::chunk_batch_finish::ChunkBatchFinish; use ferrumc_net::packets::outgoing::chunk_batch_start::ChunkBatchStart; use ferrumc_net::packets::outgoing::set_center_chunk::SetCenterChunk; -use ferrumc_net_codec::encode::NetEncodeOpts; +use ferrumc_net_codec::encode::{NetEncode, NetEncodeOpts}; use ferrumc_net_codec::net_types::var_int::VarInt; use ferrumc_state::GlobalState; +use std::io::Cursor; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -41,92 +42,112 @@ impl System for ChunkSenderSystem { for (eid, (_, _)) in players { let state = state.clone(); task_set.spawn(async move { + let mut packets = Vec::new(); + let mut centre_coords = (0, 0); trace!("Getting chunk_recv 1 for sender"); - let Ok(chunk_recv) = state.universe.get::(eid) else { - trace!("A player disconnected before we could get the ChunkReceiver"); - return Ok(()); - }; - trace!("Got chunk_recv 1 for sender"); - if chunk_recv.needed_chunks.is_empty() { - return Ok(()); + { + let Ok(chunk_recv) = state.universe.get::(eid) else { + trace!("A player disconnected before we could get the ChunkReceiver"); + return Ok(()); + }; + trace!("Got chunk_recv 1 for sender"); + if chunk_recv.needed_chunks.is_empty() { + return Ok(()); + } } - drop(chunk_recv); // We can't delete from the map while iterating, so we collect the keys to drop // and then drop them after sending the chunks let mut to_drop = Vec::new(); { - trace!("Getting conn 1 for sender"); - let mut conn = state - .universe - .get_mut::(eid) - .expect("StreamWriter not found"); - trace!("Got conn 1 for sender"); trace!("Getting chunk_recv 2 for sender"); let Ok(chunk_recv) = state.universe.get::(eid) else { trace!("A player disconnected before we could get the ChunkReceiver"); return Ok(()); }; trace!("Got chunk_recv 2 for sender"); - if let Some(chunk) = &chunk_recv.last_chunk { - let packet = SetCenterChunk::new(chunk.0, chunk.1); - if let Err(e) = - conn.send_packet(&packet, &NetEncodeOpts::WithLength).await - { - error!("Error sending chunk: {:?}", e); - } - } - if let Err(e) = conn - .send_packet(&ChunkBatchStart {}, &NetEncodeOpts::WithLength) - .await { - error!("Error sending chunk: {:?}", e); + trace!("Getting conn 1 for sender"); + trace!("Got conn 1 for sender"); + if let Some(chunk) = &chunk_recv.last_chunk { + centre_coords = (chunk.0, chunk.1); + } } } let mut sent_chunks = 0; trace!("Getting chunk_recv 3 for sender"); - let chunk_recv = state - .universe - .get_mut::(eid) - .expect("ChunkReceiver not found"); - trace!("Got chunk_recv 3 for sender"); - for possible_chunk in chunk_recv.needed_chunks.iter_mut() { - if let Some(chunk) = possible_chunk.pair().1 { - let key = possible_chunk.pair().0; - to_drop.push(key.clone()); - match ChunkAndLightData::from_chunk(&chunk.clone()) { - Ok(packet) => { - trace!("Getting conn 2 for sender"); - let mut conn = state - .universe - .get_mut::(eid) - .expect("StreamWriter not found"); - trace!("Got conn 2 for sender"); - if let Err(e) = - conn.send_packet(&packet, &NetEncodeOpts::WithLength).await - { - error!("Error sending chunk: {:?}", e); - } else { + { + let chunk_recv = state + .universe + .get_mut::(eid) + .expect("ChunkReceiver not found"); + trace!("Got chunk_recv 3 for sender"); + for possible_chunk in chunk_recv.needed_chunks.iter_mut() { + if let Some(chunk) = possible_chunk.pair().1 { + let key = possible_chunk.pair().0; + to_drop.push(key.clone()); + match ChunkAndLightData::from_chunk(&chunk.clone()) { + Ok(packet) => { + packets.push(packet); sent_chunks += 1; } - } - Err(e) => { - error!("Error sending chunk: {:?}", e); + Err(e) => { + error!("Error sending chunk: {:?}", e); + } } } } } - drop(chunk_recv); { - trace!("Getting conn 3 for sender"); - let mut conn = state + trace!("Getting chunk_recv 4 for sender"); + let chunk_recv = state .universe - .get_mut::(eid) - .expect("StreamWriter not found"); - trace!("Got conn 3 for sender"); + .get_mut::(eid) + .expect("ChunkReceiver not found"); + trace!("Got chunk_recv 4 for sender"); + for key in to_drop { + chunk_recv.needed_chunks.remove(&key); + } + } + + { + if packets.is_empty() { + return Ok(()); + } + trace!("Getting conn 2 for sender"); + let Ok(mut conn) = state.universe.get_mut::(eid) else { + error!("Could not get StreamWriter"); + return Ok(()); + }; + trace!("Got conn 2 for sender"); + if let Err(e) = conn + .send_packet( + &SetCenterChunk { + x: VarInt::new(centre_coords.0), + z: VarInt::new(centre_coords.1), + }, + &NetEncodeOpts::WithLength, + ) + .await + { + error!("Error sending chunk: {:?}", e); + } + if let Err(e) = conn + .send_packet(&ChunkBatchStart {}, &NetEncodeOpts::WithLength) + .await + { + error!("Error sending chunk: {:?}", e); + } + for packet in packets { + if let Err(e) = + conn.send_packet(&packet, &NetEncodeOpts::WithLength).await + { + error!("Error sending chunk: {:?}", e); + } + } if let Err(e) = conn .send_packet( &ChunkBatchFinish { - batch_size: VarInt::from(sent_chunks), + batch_size: VarInt::new(sent_chunks), }, &NetEncodeOpts::WithLength, ) @@ -135,15 +156,7 @@ impl System for ChunkSenderSystem { error!("Error sending chunk: {:?}", e); } } - trace!("Getting chunk_recv 4 for sender"); - let chunk_recv = state - .universe - .get_mut::(eid) - .expect("ChunkReceiver not found"); - trace!("Got chunk_recv 4 for sender"); - for key in to_drop { - chunk_recv.needed_chunks.remove(&key); - } + Ok(()) }); } @@ -160,7 +173,7 @@ impl System for ChunkSenderSystem { } } - tokio::time::sleep(Duration::from_millis(1)).await; + // tokio::time::sleep(Duration::from_nanos(50)).await; } } diff --git a/src/lib/ecs/src/components/mod.rs b/src/lib/ecs/src/components/mod.rs index 3b89f238..4d61d053 100644 --- a/src/lib/ecs/src/components/mod.rs +++ b/src/lib/ecs/src/components/mod.rs @@ -237,7 +237,8 @@ use crate::errors::ECSError; use crate::ECSResult; use dashmap::DashMap; use parking_lot::RwLock; -use std::any::TypeId; +use std::any::{Any, TypeId}; +use std::hash::{Hash, Hasher}; use tracing::trace; pub mod storage; @@ -303,10 +304,24 @@ impl ComponentManager { let type_id = TypeId::of::(); #[cfg(debug_assertions)] { - trace!("Getting static component lock for entity {}", entity_id); - let locked = matches!(self.components.try_get(&type_id), dashmap::try_result::TryResult::Locked); + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + type_id.hash(&mut hasher); + let type_hash = hasher.finish(); + trace!( + "Getting static component (ID: {:X}) lock for entity {}", + type_hash, + entity_id + ); + let locked = matches!( + self.components.try_get(&type_id), + dashmap::try_result::TryResult::Locked + ); if locked { - trace!("Static component lock for entity {} is locked", entity_id); + trace!( + "Static component (ID: {:X}) lock for entity {} is locked", + type_hash, + entity_id + ); } } let ptr = *self @@ -314,17 +329,45 @@ impl ComponentManager { .get(&type_id) .ok_or(ECSError::ComponentTypeNotFound)?; let component_set = unsafe { &*(ptr as *const ComponentSparseSet) }; - component_set.get(entity_id) + let res = component_set.get(entity_id); + #[cfg(debug_assertions)] + { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + type_id.hash(&mut hasher); + let type_hash = hasher.finish(); + if res.is_ok() { + trace!( + "Got static component (ID: {:X}) lock for entity {}", + type_hash, + entity_id + ); + } + res + } } pub fn get_mut<'a, T: Component>(&self, entity_id: usize) -> ECSResult> { let type_id = TypeId::of::(); #[cfg(debug_assertions)] { - trace!("Getting mutable component lock for entity {}", entity_id); - let locked = matches!(self.components.try_get_mut(&type_id), dashmap::try_result::TryResult::Locked); + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + type_id.hash(&mut hasher); + let type_hash = hasher.finish(); + trace!( + "Getting mutable component (ID: {:X}) lock for entity {}", + type_hash, + entity_id + ); + let locked = matches!( + self.components.try_get_mut(&type_id), + dashmap::try_result::TryResult::Locked + ); if locked { - trace!("Mutable component lock for entity {} is locked", entity_id); + trace!( + "Mutable component (ID: {:X}) lock for entity {} is locked", + type_hash, + entity_id + ); } } let ptr = *self @@ -332,7 +375,23 @@ impl ComponentManager { .get(&type_id) .ok_or(ECSError::ComponentTypeNotFound)?; let component_set = unsafe { &*(ptr as *const ComponentSparseSet) }; - component_set.get_mut(entity_id) + { + let res = component_set.get_mut(entity_id); + #[cfg(debug_assertions)] + { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + type_id.hash(&mut hasher); + let type_hash = hasher.finish(); + if res.is_ok() { + trace!( + "Got mutable component (ID: {:X}) lock for entity {}", + type_hash, + entity_id + ); + } + } + res + } } pub fn remove(&self, entity_id: usize) -> ECSResult<()> { From 01f3989390a62f9b42104c393dbd88bb389cf145 Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 29 Dec 2024 16:35:48 +1030 Subject: [PATCH 19/22] fixed issue with locking components even when discarding them --- src/bin/src/systems/chunk_fetcher.rs | 8 ++---- src/bin/src/systems/chunk_sender.rs | 43 +++++++++++----------------- 2 files changed, 18 insertions(+), 33 deletions(-) diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index bd727f43..151cb65b 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -28,18 +28,16 @@ impl System for ChunkFetcher { while !self.stop.load(std::sync::atomic::Ordering::Relaxed) { let mut task_set: JoinSet> = JoinSet::new(); - let players = state.universe.query::<&mut ChunkReceiver>(); - for (eid, _) in players { + let players = state.universe.query::<&mut ChunkReceiver>().into_entities(); + for eid in players { let state = state.clone(); task_set.spawn(async move { // Copy the chunks into a new map so we don't lock the component while fetching let mut copied_chunks = { - trace!("Getting chunk_recv 1 for fetcher"); let Ok(chunk_recv) = state.universe.get::(eid) else { trace!("A player disconnected before we could get the ChunkReceiver"); return Ok(()); }; - trace!("Got chunk_recv 1 for fetcher"); let mut copied_chunks = HashMap::new(); for chunk in chunk_recv.needed_chunks.iter() { let (key, chunk) = chunk.pair(); @@ -57,12 +55,10 @@ impl System for ChunkFetcher { } // Insert the fetched chunks back into the component { - trace!("Getting chunk_recv 2 for fetcher"); let Ok(chunk_recv) = state.universe.get::(eid) else { trace!("A player disconnected before we could get the ChunkReceiver"); return Ok(()); }; - trace!("Got chunk_recv 2 for fetcher"); for (key, chunk) in copied_chunks.iter() { chunk_recv.needed_chunks.insert(key.clone(), chunk.clone()); } diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index af0d005c..a3df2aa3 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -37,20 +37,19 @@ impl System for ChunkSenderSystem { while !self.stop.load(Ordering::Relaxed) { let players = state .universe - .query::<(&mut ChunkReceiver, &mut StreamWriter)>(); + .query::<(&mut ChunkReceiver, &mut StreamWriter)>() + .into_entities(); let mut task_set: JoinSet> = JoinSet::new(); - for (eid, (_, _)) in players { + for eid in players { let state = state.clone(); task_set.spawn(async move { let mut packets = Vec::new(); let mut centre_coords = (0, 0); - trace!("Getting chunk_recv 1 for sender"); { let Ok(chunk_recv) = state.universe.get::(eid) else { trace!("A player disconnected before we could get the ChunkReceiver"); return Ok(()); }; - trace!("Got chunk_recv 1 for sender"); if chunk_recv.needed_chunks.is_empty() { return Ok(()); } @@ -59,28 +58,22 @@ impl System for ChunkSenderSystem { // and then drop them after sending the chunks let mut to_drop = Vec::new(); { - trace!("Getting chunk_recv 2 for sender"); let Ok(chunk_recv) = state.universe.get::(eid) else { trace!("A player disconnected before we could get the ChunkReceiver"); return Ok(()); }; - trace!("Got chunk_recv 2 for sender"); - { - trace!("Getting conn 1 for sender"); - trace!("Got conn 1 for sender"); - if let Some(chunk) = &chunk_recv.last_chunk { - centre_coords = (chunk.0, chunk.1); - } + // Store the last chunk's coordinates so we can send the SetCenterChunk packet + // This means we don't need to lock the chunk_recv while sending the chunks + if let Some(chunk) = &chunk_recv.last_chunk { + centre_coords = (chunk.0, chunk.1); } } let mut sent_chunks = 0; - trace!("Getting chunk_recv 3 for sender"); { - let chunk_recv = state - .universe - .get_mut::(eid) - .expect("ChunkReceiver not found"); - trace!("Got chunk_recv 3 for sender"); + let Ok(chunk_recv) = state.universe.get::(eid) else { + trace!("A player disconnected before we could get the ChunkReceiver"); + return Ok(()); + }; for possible_chunk in chunk_recv.needed_chunks.iter_mut() { if let Some(chunk) = possible_chunk.pair().1 { let key = possible_chunk.pair().0; @@ -98,12 +91,10 @@ impl System for ChunkSenderSystem { } } { - trace!("Getting chunk_recv 4 for sender"); - let chunk_recv = state - .universe - .get_mut::(eid) - .expect("ChunkReceiver not found"); - trace!("Got chunk_recv 4 for sender"); + let Ok(chunk_recv) = state.universe.get::(eid) else { + trace!("A player disconnected before we could get the ChunkReceiver"); + return Ok(()); + }; for key in to_drop { chunk_recv.needed_chunks.remove(&key); } @@ -113,12 +104,10 @@ impl System for ChunkSenderSystem { if packets.is_empty() { return Ok(()); } - trace!("Getting conn 2 for sender"); let Ok(mut conn) = state.universe.get_mut::(eid) else { error!("Could not get StreamWriter"); return Ok(()); }; - trace!("Got conn 2 for sender"); if let Err(e) = conn .send_packet( &SetCenterChunk { @@ -183,6 +172,6 @@ impl System for ChunkSenderSystem { } fn name(&self) -> &'static str { - "chunk_sender" + "Chunk Sender" } } From 06447904b83fb61cb68cf0733181f75712589b99 Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 29 Dec 2024 16:38:31 +1030 Subject: [PATCH 20/22] Cargo + fmt --- src/bin/src/systems/chunk_fetcher.rs | 2 +- src/bin/src/systems/chunk_sender.rs | 5 ++--- src/lib/ecs/src/components/mod.rs | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/bin/src/systems/chunk_fetcher.rs b/src/bin/src/systems/chunk_fetcher.rs index 151cb65b..47741926 100644 --- a/src/bin/src/systems/chunk_fetcher.rs +++ b/src/bin/src/systems/chunk_fetcher.rs @@ -78,7 +78,7 @@ impl System for ChunkFetcher { } } } - // tokio::time::sleep(std::time::Duration::from_nanos(50)).await; + tokio::time::sleep(std::time::Duration::from_millis(5)).await; } } diff --git a/src/bin/src/systems/chunk_sender.rs b/src/bin/src/systems/chunk_sender.rs index a3df2aa3..6b359bb3 100644 --- a/src/bin/src/systems/chunk_sender.rs +++ b/src/bin/src/systems/chunk_sender.rs @@ -7,10 +7,9 @@ use ferrumc_net::packets::outgoing::chunk_and_light_data::ChunkAndLightData; use ferrumc_net::packets::outgoing::chunk_batch_finish::ChunkBatchFinish; use ferrumc_net::packets::outgoing::chunk_batch_start::ChunkBatchStart; use ferrumc_net::packets::outgoing::set_center_chunk::SetCenterChunk; -use ferrumc_net_codec::encode::{NetEncode, NetEncodeOpts}; +use ferrumc_net_codec::encode::NetEncodeOpts; use ferrumc_net_codec::net_types::var_int::VarInt; use ferrumc_state::GlobalState; -use std::io::Cursor; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -162,7 +161,7 @@ impl System for ChunkSenderSystem { } } - // tokio::time::sleep(Duration::from_nanos(50)).await; + tokio::time::sleep(Duration::from_millis(5)).await; } } diff --git a/src/lib/ecs/src/components/mod.rs b/src/lib/ecs/src/components/mod.rs index 4d61d053..aa103488 100644 --- a/src/lib/ecs/src/components/mod.rs +++ b/src/lib/ecs/src/components/mod.rs @@ -237,7 +237,7 @@ use crate::errors::ECSError; use crate::ECSResult; use dashmap::DashMap; use parking_lot::RwLock; -use std::any::{Any, TypeId}; +use std::any::TypeId; use std::hash::{Hash, Hasher}; use tracing::trace; From 3715c86c4cee9e8b67d852b4ed8b332b72bdfebf Mon Sep 17 00:00:00 2001 From: ReCore Date: Sun, 29 Dec 2024 16:55:51 +1030 Subject: [PATCH 21/22] Clippy fixes --- src/lib/adapters/anvil/src/lib.rs | 6 +- src/lib/ecs/src/components/mod.rs | 236 +----------------------------- 2 files changed, 4 insertions(+), 238 deletions(-) diff --git a/src/lib/adapters/anvil/src/lib.rs b/src/lib/adapters/anvil/src/lib.rs index d289c9cc..895d6721 100644 --- a/src/lib/adapters/anvil/src/lib.rs +++ b/src/lib/adapters/anvil/src/lib.rs @@ -100,9 +100,9 @@ impl LoadedAnvilFile { pub fn get_locations(&self) -> Vec { let mut locations = Vec::with_capacity(1024); for i in 0..1024 { - let location = u32::from(self.table[i * 4]) << 24 - | u32::from(self.table[i * 4 + 1]) << 16 - | u32::from(self.table[i * 4 + 2]) << 8 + let location = (u32::from(self.table[i * 4]) << 24) + | (u32::from(self.table[i * 4 + 1]) << 16) + | (u32::from(self.table[i * 4 + 2]) << 8) | u32::from(self.table[i * 4 + 3]); if location != 0 { locations.push(location); diff --git a/src/lib/ecs/src/components/mod.rs b/src/lib/ecs/src/components/mod.rs index aa103488..92093503 100644 --- a/src/lib/ecs/src/components/mod.rs +++ b/src/lib/ecs/src/components/mod.rs @@ -1,237 +1,3 @@ -/*use dashmap::DashMap; -use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use sparse_set::SparseSet; -use std::any::{Any, TypeId}; -use std::marker::PhantomData; -use std::ops::{Deref, DerefMut}; -use crate::ECSResult; -use crate::entities::Entity; -use crate::errors::ECSError; - -pub mod sparse_set; - -pub trait Component: Any + Send + Sync {} -impl Component for T {} - -unsafe impl Send for ComponentRef<'_, T> where T: Component {} -unsafe impl Send for ComponentRefMut<'_, T> where T: Component {} - -pub struct ComponentStorage { - pub components: DashMap>>>, -} - -impl Default for ComponentStorage { - fn default() -> Self { - Self::new() - } -} - -impl ComponentStorage { - pub fn new() -> Self { - ComponentStorage { - components: DashMap::new(), - } - } - - pub fn insert(&self, entity: Entity, component: T) { - let type_id = TypeId::of::(); - let mut components = self - .components - .entry(type_id) - .or_default(); - components.insert(entity, RwLock::new(Box::new(component))); - } - - pub fn get_entities_with(&self) -> Vec { - let type_id = TypeId::of::(); - let components = match self.components.get(&type_id) { - Some(components) => components, - None => { - return Vec::new(); - } - }; - - components.value().entities() - } - - pub fn remove(&self, entity: Entity) { - let type_id = TypeId::of::(); - self.components.get_mut(&type_id) - .map(|mut components| components.remove(entity)); - } - - pub fn remove_all_components(&self, entity: Entity) -> ECSResult<()> { - self.components.iter_mut() - .for_each(|mut components| { - // check if its locked or not - if let Some(component) = components.get_mut(entity) { - let lock = component.write(); - // basically wait for component to be able to be written to (or have no readers & writers) - drop(lock); - // Remove else-wise - components.remove(entity); - } - }); - - Ok(()) - } -} -impl ComponentStorage { - pub fn get<'a, T: Component>(&self, entity: Entity) -> ECSResult> - { - let type_id = TypeId::of::(); - let components = self.components.get(&type_id) - .ok_or(ECSError::ComponentNotFound)?; - let component = components.get(entity) - .ok_or(ECSError::ComponentNotFound)?; - - let read_guard = component.try_read() - .ok_or(ECSError::ComponentLocked)?; - - let read_guard = unsafe { - std::mem::transmute::< - RwLockReadGuard<'_, Box>, - RwLockReadGuard<'a, Box>, - >(read_guard) - }; - - Ok(ComponentRef { - read_guard, - _phantom: PhantomData, - }) - } - - pub fn get_mut<'a, T: Component>(&self, entity: Entity) -> ECSResult> - { - let type_id = TypeId::of::(); - let components = self.components.get(&type_id) - .ok_or(ECSError::ComponentNotFound)?; - let component = components.get(entity) - .ok_or(ECSError::ComponentNotFound)?; - - let write_guard = component.try_write() - .ok_or(ECSError::ComponentLocked)?; - - let write_guard = unsafe { - std::mem::transmute::< - RwLockWriteGuard<'_, Box>, - RwLockWriteGuard<'a, Box>, - >(write_guard) - }; - - Ok(ComponentRefMut { - write_guard, - _phantom: PhantomData, - }) - } -} -pub struct ComponentRef<'a, T: Component> { - read_guard: RwLockReadGuard<'a, Box>, - _phantom: PhantomData<&'a T>, -} - -pub struct ComponentRefMut<'a, T: Component> { - write_guard: RwLockWriteGuard<'a, Box>, - _phantom: PhantomData<&'a mut T>, -} -mod debug { - use std::fmt::Debug; - use crate::components::{Component, ComponentRef, ComponentRefMut}; - - impl Debug for ComponentRef<'_, T> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Debug::fmt(&**self, f) - } - } - - impl Debug for ComponentRefMut<'_, T> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Debug::fmt(&**self, f) - } - } -} - -impl Deref for ComponentRef<'_, T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - unsafe { &*(&**self.read_guard as *const dyn Component as *const T) } - } -} - -impl Deref for ComponentRefMut<'_, T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - unsafe { &*(&**self.write_guard as *const dyn Component as *const T) } - } -} - -impl DerefMut for ComponentRefMut<'_, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - unsafe { &mut *(&mut **self.write_guard as *mut dyn Component as *mut T) } - } -} - - - -#[cfg(test)] -mod tests { - use crate::components::ComponentStorage; - use crate::entities::EntityManager; - - struct Position { - x: f32, - y: f32, - } - - #[test] - fn insert_get() { - let entity_manager = EntityManager::new(); - let component_storage = ComponentStorage::new(); - let entity = entity_manager.create_entity(); - let position = Position { x: 0.0, y: 0.0 }; - component_storage.insert(entity, position); - - let position = component_storage.get::(entity).unwrap(); - assert_eq!(position.x, 0.0); - assert_eq!(position.y, 0.0); - } - - #[test] - fn insert_get_mut() { - let entity_manager = EntityManager::new(); - let component_storage = ComponentStorage::new(); - let entity = entity_manager.create_entity(); - let position = Position { x: 0.0, y: 0.0 }; - component_storage.insert(entity, position); - - let mut position = component_storage.get_mut::(entity).unwrap(); - position.x = 1.0; - position.y = 2.0; - - assert_eq!(position.x, 1.0); - assert_eq!(position.y, 2.0); - } - - #[test] - fn test_multi_mut() { - let entity_manager = EntityManager::new(); - let component_storage = ComponentStorage::new(); - let entity = entity_manager.create_entity(); - let position = Position { x: 0.0, y: 0.0 }; - component_storage.insert(entity, position); - - let mut position = component_storage.get_mut::(entity).unwrap(); - position.x = 1.0; - position.y = 2.0; - - let position = component_storage.get_mut::(entity); - - assert!(position.is_err()); - } -} -*/ use crate::components::storage::{Component, ComponentRef, ComponentRefMut, ComponentSparseSet}; use crate::errors::ECSError; use crate::ECSResult; @@ -342,8 +108,8 @@ impl ComponentManager { entity_id ); } - res } + res } pub fn get_mut<'a, T: Component>(&self, entity_id: usize) -> ECSResult> { From a9f2f17cb8cf99424f89cbb57a83a398dd2b1ca0 Mon Sep 17 00:00:00 2001 From: ReCore Date: Mon, 30 Dec 2024 22:06:43 +1030 Subject: [PATCH 22/22] fixed cyclic dep issue --- src/lib/utils/config/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lib/utils/config/Cargo.toml b/src/lib/utils/config/Cargo.toml index 3299a277..8274b6c2 100644 --- a/src/lib/utils/config/Cargo.toml +++ b/src/lib/utils/config/Cargo.toml @@ -13,7 +13,6 @@ tracing = { workspace = true } parking_lot = { workspace = true } base64 = { workspace = true } ferrumc-general-purpose = { workspace = true } -ferrumc-core = { workspace = true } lazy_static = { workspace = true } dashmap = { workspace = true } uuid = { workspace = true }