Skip to content

Commit

Permalink
Progress on sync sockets (19)
Browse files Browse the repository at this point in the history
  • Loading branch information
zmerp committed Jul 12, 2023
1 parent c636bd5 commit 30349d7
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 61 deletions.
38 changes: 25 additions & 13 deletions alvr/client_core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use alvr_packets::{
};
use alvr_session::{settings_schema::Switch, SessionConfig};
use alvr_sockets::{
spawn_cancelable, PeerType, ProtoControlSocket, ReceiverBuffer, StreamSender,
StreamSocketBuilder,
PeerType, ProtoControlSocket, ReceiverBuffer, StreamSender, StreamSocketBuilder,
};
use serde_json as json;
use std::{
Expand Down Expand Up @@ -539,6 +538,28 @@ fn connection_pipeline(
}
});

let stream_receive_thread = thread::spawn(move || {
while let Some(runtime) = &*CONNECTION_RUNTIME.read() {
let res = runtime.block_on(async {
tokio::select! {
res = stream_socket.recv() => Some(res),
_ = time::sleep(Duration::from_millis(500)) => None,
}
});
match res {
Some(Ok(())) => (),
Some(Err(e)) => {
info!("Client disconnected. Cause: {e}");
set_hud_message(SERVER_DISCONNECTED_MESSAGE);
DISCONNECT_SERVER_NOTIFIER.notify_waiters();

return;
}
None => continue,
}
}
});

let lifecycle_check_thread = thread::spawn(|| {
while IS_STREAMING.value() && IS_RESUMED.value() && IS_ALIVE.value() {
thread::sleep(Duration::from_millis(500));
Expand All @@ -548,17 +569,7 @@ fn connection_pipeline(
});

CONNECTION_RUNTIME.read().as_ref().unwrap().block_on(async {
let receive_loop = async move { stream_socket.receive_loop().await };
// Run many tasks concurrently. Threading is managed by the runtime, for best performance.
tokio::select! {
res = spawn_cancelable(receive_loop) => {
if let Err(e) = res {
info!("Server disconnected. Cause: {e}");
}
set_hud_message(
SERVER_DISCONNECTED_MESSAGE
);
},
_ = DISCONNECT_SERVER_NOTIFIER.notified() => (),
}
});
Expand All @@ -583,8 +594,9 @@ fn connection_pipeline(
game_audio_thread.join().ok();
microphone_thread.join().ok();
haptics_receive_thread.join().ok();
control_receive_thread.join().ok();
control_send_thread.join().ok();
control_receive_thread.join().ok();
stream_receive_thread.join().ok();
keepalive_sender_thread.join().ok();
lifecycle_check_thread.join().ok();

Expand Down
42 changes: 34 additions & 8 deletions alvr/server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
}
});

let control_thread = thread::spawn({
let control_receive_thread = thread::spawn({
let control_sender = Arc::clone(&control_sender);
let client_hostname = client_hostname.clone();
move || loop {
Expand Down Expand Up @@ -1035,6 +1035,37 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
}
});

let stream_receive_thread = thread::spawn({
let client_hostname = client_hostname.clone();
move || {
while let Some(runtime) = &*CONNECTION_RUNTIME.read() {
let res = runtime.block_on(async {
tokio::select! {
res = stream_socket.recv() => Some(res),
_ = time::sleep(Duration::from_millis(500)) => None,
}
});
match res {
Some(Ok(())) => (),
Some(Err(e)) => {
info!("Client disconnected. Cause: {e}");

SERVER_DATA_MANAGER.write().update_client_list(
client_hostname,
ClientListAction::SetConnectionState(ConnectionState::Disconnecting {
should_be_removed: false,
}),
);
DISCONNECT_CLIENT_NOTIFIER.notify_waiters();

return;
}
None => continue,
}
}
}
});

let lifecycle_check_thread = thread::spawn(|| {
while SHOULD_CONNECT_TO_CLIENTS.value() && CONNECTION_RUNTIME.read().is_some() {
thread::sleep(Duration::from_millis(500));
Expand Down Expand Up @@ -1077,12 +1108,6 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
.unwrap()
.block_on(async move {
tokio::select! {
res = stream_socket.receive_loop() => {
if let Err(e) = res {
info!("Client disconnected. Cause: {e}" );
}
},

_ = RESTART_NOTIFIER.notified() => {
control_sender
.lock()
Expand Down Expand Up @@ -1126,7 +1151,8 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
microphone_thread.join().ok();
tracking_receive_thread.join().ok();
statistics_thread.join().ok();
control_thread.join().ok();
control_receive_thread.join().ok();
stream_receive_thread.join().ok();
keepalive_thread.join().ok();
lifecycle_check_thread.join().ok();
});
Expand Down
25 changes: 0 additions & 25 deletions alvr/sockets/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,3 @@ pub const HANDSHAKE_PACKET_SIZE_BYTES: usize = 56; // this may change in future
pub const KEEPALIVE_INTERVAL: Duration = Duration::from_millis(500);

type Ldc = tokio_util::codec::LengthDelimitedCodec;

mod util {
use alvr_common::prelude::*;
use std::future::Future;
use tokio::{sync::oneshot, task};

// Tokio tasks are not cancelable. This function awaits a cancelable task.
pub async fn spawn_cancelable(
future: impl Future<Output = StrResult> + Send + 'static,
) -> StrResult {
// this channel is actually never used. cancel_receiver will be notified when _cancel_sender
// is dropped
let (_cancel_sender, cancel_receiver) = oneshot::channel::<()>();

task::spawn(async {
tokio::select! {
res = future => res,
_ = cancel_receiver => Ok(()),
}
})
.await
.map_err(err!())?
}
}
pub use util::*;
8 changes: 4 additions & 4 deletions alvr/sockets/src/stream_socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,13 @@ impl StreamSocket {
}
}

pub async fn receive_loop(&self) -> StrResult {
match self.receive_socket.lock().await.take().unwrap() {
pub async fn recv(&self) -> StrResult {
match self.receive_socket.lock().await.as_mut().unwrap() {
StreamReceiveSocket::Udp(socket) => {
udp::receive_loop(socket, Arc::clone(&self.packet_queues)).await
udp::recv(socket, Arc::clone(&self.packet_queues)).await
}
StreamReceiveSocket::Tcp(socket) => {
tcp::receive_loop(socket, Arc::clone(&self.packet_queues)).await
tcp::recv(socket, Arc::clone(&self.packet_queues)).await
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions alvr/sockets/src/stream_socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,20 @@ pub async fn connect_to_client(
Ok((Arc::new(Mutex::new(send_socket)), receive_socket))
}

pub async fn receive_loop(
mut socket: TcpStreamReceiveSocket,
pub async fn recv(
socket: &mut TcpStreamReceiveSocket,
packet_enqueuers: Arc<Mutex<HashMap<u16, mpsc::UnboundedSender<BytesMut>>>>,
) -> StrResult {
while let Some(maybe_packet) = socket.next().await {
if let Some(maybe_packet) = socket.next().await {
let mut packet = maybe_packet.map_err(err!())?;

let stream_id = packet.get_u16();
if let Some(enqueuer) = packet_enqueuers.lock().await.get_mut(&stream_id) {
enqueuer.send(packet).map_err(err!())?;
}
}

Ok(())
Ok(())
} else {
fmt_e!("Socket closed")
}
}
15 changes: 9 additions & 6 deletions alvr/sockets/src/stream_socket/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,25 @@ pub async fn connect(
))
}

pub async fn receive_loop(
mut socket: UdpStreamReceiveSocket,
pub async fn recv(
socket: &mut UdpStreamReceiveSocket,
packet_enqueuers: Arc<Mutex<HashMap<u16, mpsc::UnboundedSender<BytesMut>>>>,
) -> StrResult {
while let Some(maybe_packet) = socket.inner.next().await {
if let Some(maybe_packet) = socket.inner.next().await {
let (mut packet_bytes, address) = maybe_packet.map_err(err!())?;

if address != socket.peer_addr {
continue;
// Non fatal
return Ok(());
}

let stream_id = packet_bytes.get_u16();
if let Some(enqueuer) = packet_enqueuers.lock().await.get_mut(&stream_id) {
enqueuer.send(packet_bytes).map_err(err!())?;
}
}

Ok(())
Ok(())
} else {
fmt_e!("Socket closed")
}
}

0 comments on commit 30349d7

Please sign in to comment.