Skip to content
This repository has been archived by the owner on Dec 19, 2024. It is now read-only.

Commit

Permalink
More in sending system
Browse files Browse the repository at this point in the history
  • Loading branch information
Veritius committed Dec 1, 2023
1 parent a168131 commit 00fef20
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 49 deletions.
15 changes: 9 additions & 6 deletions udp/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bevy::prelude::*;

#[derive(Debug, Resource)]
pub(crate) struct BoundSocketManager {
pub sockets: BTreeMap<u16, BoundSocket>,
sockets: BTreeMap<u16, BoundSocket>,
}

impl BoundSocketManager {
Expand Down Expand Up @@ -54,13 +54,15 @@ impl BoundSocketManager {
}
});

// Add to the least populated socket
// Get the least populated socket in the map
let smallest = self.smallest();
self.sockets
let mut socket = self.sockets
.get_mut(&smallest)
.unwrap() // TODO: Handle this case
.peers
.push(peer);
.unwrap(); // TODO: Handle this case

// Add to the least populated socket
socket.peers.push(peer);
socket.peers.sort_unstable();
}

pub fn iter(&self) -> impl Iterator<Item = (u16, &BoundSocket)> {
Expand All @@ -71,6 +73,7 @@ impl BoundSocketManager {
#[derive(Debug)]
pub(crate) struct BoundSocket {
pub socket: UdpSocket,
/// Peers associated with this socket. This list is sorted.
pub peers: Vec<Entity>,
}

Expand Down
77 changes: 35 additions & 42 deletions udp/src/receiving/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,41 @@ pub(crate) fn blocking_receive_packets_system(
.map(|x| (x.0, Mutex::new((x.1, x.2))))
.collect::<BTreeMap<_,_>>();

// Create a block to only have variables be shadowed inside here, since the task futures are move
{
// Prevent moves by shadowing variables as explicit references
let peer_locks = &peer_locks;
let policy = &policy;
// Task pool for performant, pleasing parallel processing of ports
rayon::scope(|scope| {
for (port, socket) in sockets.iter() {
scope.spawn(|_| {
let peers = &socket.peers;

// Task pool for performant, pleasing parallel processing of ports
rayon::scope(|scope| {
for (port, socket) in sockets.iter() {
scope.spawn(move |_| {
let peers = &socket.peers;
// Take locks for all the peers in our table
let mut peer_locks = peers.iter()
.map(|id| {
let lock = match peer_locks.get(id).unwrap().try_lock() {
Ok(lock) => lock,
Err(error) => {
panic!("Peer data mutex in receiving system may have had two simultaneous locks, this should not happen. Error is as follows: {error}");
},
};
(lock.1.address, (*id, lock))
})
.collect::<BTreeMap<_, _>>();

// Take locks for all the peers in our table
let mut peer_locks = peers.iter()
.map(|id| {
let lock = match peer_locks.get(id).unwrap().try_lock() {
Ok(lock) => lock,
Err(error) => {
panic!("Peer data mutex in receiving system may have had two simultaneous locks, this should not happen. Error is as follows: {error}");
},
};
(lock.1.address, (*id, lock))
})
.collect::<BTreeMap<_, _>>();

// Read all packets
let mut buffer = [0u8; MAXIMUM_PACKET_LENGTH];
loop {
// Read a packet from the socket
let (len, origin) = match socket.socket.recv_from(&mut buffer) {
Ok(v) => v,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(e) => {
error!("Error while reading UDP packets: {e}");
break
},
};

todo!()
}
});
}
});
}
// Read all packets
let mut buffer = [0u8; MAXIMUM_PACKET_LENGTH];
loop {
// Read a packet from the socket
let (len, origin) = match socket.socket.recv_from(&mut buffer) {
Ok(v) => v,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(e) => {
error!("Error while reading UDP packets: {e}");
break
},
};

todo!();
}
});
}
});
}
29 changes: 28 additions & 1 deletion udp/src/sending/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,32 @@ pub(crate) fn blocking_send_packets_system(
.map(|x| (x.0, Mutex::new((x.1, x.2))))
.collect::<BTreeMap<_,_>>();

todo!();
rayon::scope(|scope| {
for (port, socket) in sockets.iter() {
scope.spawn(|_| {
let peers = &socket.peers;

// Take locks for all the peers in our table
let mut peer_locks = peers.iter()
.map(|id| {
let lock = match peer_locks.get(id).unwrap().try_lock() {
Ok(lock) => lock,
Err(error) => {
panic!("Peer data mutex in sending system may have had two simultaneous locks, this should not happen. Error is as follows: {error}");
},
};
(*id, lock)
})
.collect::<BTreeMap<_, _>>();

let iter = outgoing
.iter_all()
.filter(|(_, target, _)| { peers.binary_search(target).is_ok() });

for (channel, target, data) in iter {
todo!();
}
});
}
});
}

0 comments on commit 00fef20

Please sign in to comment.