Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added a 100ms sleep to the keep alive system to prevent it from having excessive cpu usage #117

Merged
merged 5 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 61 additions & 32 deletions src/bin/src/packet_handlers/login_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use ferrumc_net::packets::incoming::login_acknowledged::LoginAcknowledgedEvent;
use ferrumc_net::packets::incoming::login_start::LoginStartEvent;
use ferrumc_net::packets::incoming::server_bound_known_packs::ServerBoundKnownPacksEvent;
use ferrumc_net::packets::outgoing::client_bound_known_packs::ClientBoundKnownPacksPacket;
use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket;
use ferrumc_net::packets::outgoing::game_event::GameEventPacket;
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::packets::outgoing::login_play::LoginPlayPacket;
use ferrumc_net::packets::outgoing::login_success::LoginSuccessPacket;
use ferrumc_net::packets::outgoing::registry_data::get_registry_packets;
Expand All @@ -18,7 +19,6 @@ use ferrumc_net::packets::outgoing::synchronize_player_position::SynchronizePlay
use ferrumc_net::GlobalState;
use ferrumc_net_codec::encode::NetEncodeOpts;
use tracing::{debug, trace};
use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket;

#[event_handler]
async fn handle_login_start(
Expand All @@ -31,19 +31,23 @@ async fn handle_login_start(
let username = login_start_event.login_start_packet.username.as_str();
debug!("Received login start from user with username {}", username);


// Add the player identity component to the ECS for the entity.
state.universe.add_component::<PlayerIdentity>(
login_start_event.conn_id,
PlayerIdentity::new(username.to_string(), uuid),
)?;

//Send a Login Success Response to further the login sequence
let mut writer = state
.universe
.get_mut::<StreamWriter>(login_start_event.conn_id)?;

writer.send_packet(&LoginSuccessPacket::new(uuid, username), &NetEncodeOpts::WithLength).await?;
writer
.send_packet(
&LoginSuccessPacket::new(uuid, username),
&NetEncodeOpts::WithLength,
)
.await?;

Ok(login_start_event)
}
Expand All @@ -62,15 +66,16 @@ async fn handle_login_acknowledged(

*connection_state = ConnectionState::Configuration;


// Send packets packet
let client_bound_known_packs = ClientBoundKnownPacksPacket::new();

let mut writer = state
.universe
.get_mut::<StreamWriter>(login_acknowledged_event.conn_id)?;

writer.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength).await?;
writer
.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength)
.await?;

Ok(login_acknowledged_event)
}
Expand All @@ -87,10 +92,17 @@ async fn handle_server_bound_known_packs(
.get_mut::<StreamWriter>(server_bound_known_packs_event.conn_id)?;

let registry_packets = get_registry_packets();
writer.send_packet(&registry_packets, &NetEncodeOpts::None).await?;

writer.send_packet(&FinishConfigurationPacket::new(), &NetEncodeOpts::WithLength).await?;

writer
.send_packet(&registry_packets, &NetEncodeOpts::None)
.await?;

writer
.send_packet(
&FinishConfigurationPacket::new(),
&NetEncodeOpts::WithLength,
)
.await?;

Ok(server_bound_known_packs_event)
}

Expand All @@ -103,34 +115,51 @@ async fn handle_ack_finish_configuration(

let conn_id = ack_finish_configuration_event.conn_id;

let mut conn_state = state
.universe
.get_mut::<ConnectionState>(conn_id)?;
let mut conn_state = state.universe.get_mut::<ConnectionState>(conn_id)?;

*conn_state = ConnectionState::Play;

let mut writer = state
.universe
.get_mut::<StreamWriter>(conn_id)?;

writer.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&SetDefaultSpawnPositionPacket::default(), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&SynchronizePlayerPositionPacket::default(), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&GameEventPacket::start_waiting_for_level_chunks(), &NetEncodeOpts::WithLength).await?;
let mut writer = state.universe.get_mut::<StreamWriter>(conn_id)?;

writer
.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength)
.await?;
writer
.send_packet(
&SetDefaultSpawnPositionPacket::default(),
&NetEncodeOpts::WithLength,
)
.await?;
writer
.send_packet(
&SynchronizePlayerPositionPacket::default(),
&NetEncodeOpts::WithLength,
)
.await?;
writer
.send_packet(
&GameEventPacket::start_waiting_for_level_chunks(),
&NetEncodeOpts::WithLength,
)
.await?;

send_keep_alive(conn_id, state, &mut writer).await?;


Ok(ack_finish_configuration_event)
}
async fn send_keep_alive(conn_id: usize, state: GlobalState, writer: &mut ComponentRefMut<'_, StreamWriter>) -> Result<(), NetError> {
let keep_alive_packet = KeepAlivePacket::default();
writer.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength).await?;

let id = keep_alive_packet.id;

state.universe.add_component::<KeepAlive>(conn_id, id)?;

async fn send_keep_alive(
conn_id: usize,
state: GlobalState,
writer: &mut ComponentRefMut<'_, StreamWriter>,
) -> Result<(), NetError> {
let keep_alive_packet = OutgoingKeepAlivePacket::default();
writer
.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength)
.await?;

state
.universe
.add_component::<OutgoingKeepAlivePacket>(conn_id, keep_alive_packet)?;

Ok(())
}
}
69 changes: 40 additions & 29 deletions src/bin/src/systems/keep_alive_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use crate::systems::definition::System;
use async_trait::async_trait;
use ferrumc_core::identity::player_identity::PlayerIdentity;
use ferrumc_net::connection::{ConnectionState, StreamWriter};
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::incoming::keep_alive::IncomingKeepAlivePacket;
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::utils::broadcast::{BroadcastOptions, BroadcastToAll};
use ferrumc_net::GlobalState;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tracing::{error, info, trace, warn};
use ferrumc_net::utils::broadcast::{BroadcastOptions, BroadcastToAll};

pub struct KeepAliveSystem {
shutdown: AtomicBool,
Expand All @@ -20,19 +21,17 @@ impl KeepAliveSystem {
}
}
}
const FIFTEEN_SECONDS_MS: i64 = 15000; // 15 seconds in milliseconds

#[async_trait]
impl System for KeepAliveSystem {
async fn start(self: Arc<Self>, state: GlobalState) {
info!("Started keep_alive");
let mut last_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as i64;
loop {
if self.shutdown.load(Ordering::Relaxed) {
break;
}

while !self.shutdown.load(Ordering::Relaxed) {
let online_players = state.universe.query::<&PlayerIdentity>();

let current_time = std::time::SystemTime::now()
Expand All @@ -45,19 +44,24 @@ impl System for KeepAliveSystem {
last_time = current_time;
}

let fifteen_seconds_ms = 15000; // 15 seconds in milliseconds

let entities = state
.universe
.query::<(&mut StreamWriter, &ConnectionState, &KeepAlive)>()
.query::<(
&mut StreamWriter,
&ConnectionState,
&IncomingKeepAlivePacket,
)>()
.into_entities()
.into_iter()
.filter_map(|entity| {
let conn_state = state.universe.get::<ConnectionState>(entity).ok()?;
let keep_alive = state.universe.get_mut::<KeepAlive>(entity).ok()?;
let keep_alive = state
.universe
.get_mut::<IncomingKeepAlivePacket>(entity)
.ok()?;

if matches!(*conn_state, ConnectionState::Play)
&& (current_time - keep_alive.id) >= fifteen_seconds_ms
&& (current_time - keep_alive.id) >= FIFTEEN_SECONDS_MS
{
Some(entity)
} else {
Expand All @@ -67,24 +71,31 @@ impl System for KeepAliveSystem {
.collect::<Vec<_>>();
if !entities.is_empty() {
trace!("there are {:?} players to keep alive", entities.len());
}

let packet = KeepAlivePacket::default();

let broadcast_opts = BroadcastOptions::default().only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut keep_alive) = state.universe.get_mut::<KeepAlive>(entity) else {
warn!("Failed to get <KeepAlive> component for entity {}", entity);
return;
};

*keep_alive = KeepAlive::from(current_time);
});

if let Err(e) = state.broadcast(&packet, broadcast_opts).await {
error!("Error sending keep alive packet: {}", e);
};

let packet = OutgoingKeepAlivePacket { id: current_time };

let broadcast_opts = BroadcastOptions::default()
.only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut outgoing_keep_alive) =
state.universe.get_mut::<OutgoingKeepAlivePacket>(entity)
else {
warn!(
"Failed to get <OutgoingKeepAlive> component for entity {}",
entity
);
return;
};

*outgoing_keep_alive = OutgoingKeepAlivePacket { id: current_time };
});

if let Err(e) = state.broadcast(&packet, broadcast_opts).await {
error!("Error sending keep alive packet: {}", e);
};
}
// TODO, this should be configurable as some people may have bad network so the clients may end up disconnecting from the server moments before the keep alive is sent
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

15 seconds delay means the client gets 2 chances. The default timeout for keepalive is 30 seconds.

tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
}
}

Expand Down
39 changes: 34 additions & 5 deletions src/lib/net/src/packets/incoming/keep_alive.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::packets::outgoing::keep_alive::KeepAlive;
use crate::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use crate::packets::IncomingPacket;
use crate::{NetResult, ServerState};
use ferrumc_ecs::components::storage::ComponentRefMut;
use ferrumc_ecs::errors::ECSError;
use ferrumc_macros::{packet, NetDecode};
use std::sync::Arc;
use tracing::debug;
use tracing::{debug, warn};

#[derive(NetDecode)]
#[packet(packet_id = 0x18, state = "play")]
Expand All @@ -13,15 +15,42 @@ pub struct IncomingKeepAlivePacket {

impl IncomingPacket for IncomingKeepAlivePacket {
async fn handle(self, conn_id: usize, state: Arc<ServerState>) -> NetResult<()> {
let mut last_keep_alive = state.universe.get_mut::<KeepAlive>(conn_id)?;
// TODO handle errors.
let last_keep_alive = state.universe.get_mut::<OutgoingKeepAlivePacket>(conn_id)?;

if self.id != last_keep_alive.id {
debug!(
"Invalid keep alive packet received from {:?} with id {:?} (expected {:?})",
"Invalid keep alive packet received from entity {:?} with id {:?} (expected {:?})",
conn_id, self.id, last_keep_alive.id
);
return NetResult::Err(crate::errors::NetError::Packet(
crate::errors::PacketError::InvalidState(0x18),
));
// TODO Kick player
}

let result = state.universe.get_mut::<IncomingKeepAlivePacket>(conn_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the problem happens I'm guessing.


if result.is_err() {
let err = result.as_ref().err().unwrap();
if matches!(err, ECSError::ComponentTypeNotFound) {
state
.universe
.add_component(conn_id, IncomingKeepAlivePacket { id: self.id })?;
let mut last_received_keep_alive = state.universe.get_mut(conn_id)?;
*last_received_keep_alive = self;
} else {
warn!(
"Failed to get or create <IncomingKeepAlive> component: {:?}",
err
);
return Err(crate::errors::NetError::ECSError(result.err().unwrap()));
}
} else {
*last_keep_alive = KeepAlive::from(self.id);
let mut last_received_keep_alive: ComponentRefMut<'_, IncomingKeepAlivePacket> =
result.unwrap();

*last_received_keep_alive = self;
Comment on lines +33 to +53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just do a match statement?

match result {
    Ok(component) => {
        // ...
    },
    Err(e) => {
        // ...
    }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesnt fix the problem of the error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state.universe.get_mut::<IncomingKeepAlivePacket>(conn_id)

is the culprit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a deadlock from dashmap?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If something stupid is being done, probably. I haven't read the code in too much depth, but this shouldn't be the case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it wasnt, I think the error system is a bit inconsistent, I expected only ComponentTypeNotFound, but also got ComponentRetrievalError, when they're both reporting what should be the same error since the IncomingKeepAlive hasn't been created yet, I've fixed this in #122

}

Ok(())
Expand Down
29 changes: 4 additions & 25 deletions src/lib/net/src/packets/outgoing/keep_alive.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,18 @@
use ferrumc_macros::{packet, NetEncode};
use std::io::Write;

#[derive(Debug, NetEncode)]
pub struct KeepAlive {
pub id: i64,
}

mod adapters {
impl From<i64> for super::KeepAlive {
fn from(id: i64) -> Self {
Self { id }
}
}
}

#[derive(NetEncode)]
#[packet(packet_id = 0x26)]
pub struct KeepAlivePacket {
pub id: KeepAlive,
pub struct OutgoingKeepAlivePacket {
pub id: i64,
}

impl Default for KeepAlivePacket {
impl Default for OutgoingKeepAlivePacket {
fn default() -> Self {
let current_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards?? LMAO")
.as_millis() as i64;
Self::new(current_ms)
}
}

impl KeepAlivePacket {
pub fn new(id: i64) -> Self {
Self {
id: KeepAlive::from(id),
}
Self { id: current_ms }
}
}