From 4ee89ab84933185bb9dfabb65dcf6da9125ef1cc Mon Sep 17 00:00:00 2001 From: Riccardo Zaglia Date: Fri, 14 Jul 2023 16:14:23 +0800 Subject: [PATCH] Move from `StrResult` to `anyhow::Result` * Add `ToAny`, `ToCon` and `AnyhowToCon` * Remove all current tracing macros and add `bail_con!()` --- Cargo.lock | 4 + alvr/audio/src/lib.rs | 185 +++++++++++------------ alvr/audio/src/windows.rs | 47 +++--- alvr/client_core/src/audio.rs | 26 ++-- alvr/client_core/src/c_api.rs | 5 +- alvr/client_core/src/connection.rs | 31 ++-- alvr/client_core/src/decoder.rs | 8 +- alvr/client_core/src/lib.rs | 2 +- alvr/client_core/src/platform/android.rs | 15 +- alvr/client_core/src/sockets.rs | 14 +- alvr/client_core/src/storage.rs | 2 +- alvr/client_openxr/src/lib.rs | 5 +- alvr/common/Cargo.toml | 1 + alvr/common/src/connection_result.rs | 64 ++++++++ alvr/common/src/lib.rs | 37 +---- alvr/common/src/logging.rs | 72 +++------ alvr/dashboard/src/data_sources.rs | 2 +- alvr/dashboard/src/steamvr_launcher.rs | 13 +- alvr/events/src/lib.rs | 2 +- alvr/server/src/connection.rs | 76 +++++----- alvr/server/src/face_tracking.rs | 10 +- alvr/server/src/lib.rs | 10 +- alvr/server/src/openvr_props.rs | 2 +- alvr/server/src/sockets.rs | 22 ++- alvr/server/src/web_server.rs | 46 +++--- alvr/server_io/src/lib.rs | 42 +++-- alvr/server_io/src/openvr_drivers.rs | 38 +++-- alvr/server_io/src/openvrpaths.rs | 42 ++--- alvr/session/src/lib.rs | 21 ++- alvr/sockets/src/control_socket.rs | 50 +++--- alvr/sockets/src/stream_socket/mod.rs | 65 ++++---- alvr/sockets/src/stream_socket/tcp.rs | 34 ++--- alvr/sockets/src/stream_socket/udp.rs | 22 +-- 33 files changed, 482 insertions(+), 533 deletions(-) create mode 100644 alvr/common/src/connection_result.rs diff --git a/Cargo.lock b/Cargo.lock index fecd1f2ffd..25ae2f2508 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,6 +246,7 @@ dependencies = [ name = "alvr_common" version = "21.0.0-dev00" dependencies = [ + "anyhow", "backtrace", "glam", "log", @@ -529,6 +530,9 @@ name = "anyhow" version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +dependencies = [ + "backtrace", +] [[package]] name = "app_dirs2" diff --git a/alvr/audio/src/lib.rs b/alvr/audio/src/lib.rs index 04a2888397..c191d27639 100644 --- a/alvr/audio/src/lib.rs +++ b/alvr/audio/src/lib.rs @@ -5,10 +5,11 @@ mod windows; pub use crate::windows::*; use alvr_common::{ + anyhow::{self, anyhow, bail, Context, Result}, + info, once_cell::sync::Lazy, parking_lot::{Mutex, RwLock}, - prelude::*, - RelaxedAtomic, + ConnectionError, RelaxedAtomic, ToAny, }; use alvr_session::{ AudioBufferingConfig, CustomAudioDeviceConfig, LinuxAudioBackend, MicrophoneDevicesConfig, @@ -38,49 +39,41 @@ static VIRTUAL_MICROPHONE_PAIRS: Lazy> = Lazy::new(|| { .collect() }); -fn device_from_custom_config(host: &Host, config: &CustomAudioDeviceConfig) -> StrResult { +fn device_from_custom_config(host: &Host, config: &CustomAudioDeviceConfig) -> Result { Ok(match config { CustomAudioDeviceConfig::NameSubstring(name_substring) => host - .devices() - .map_err(err!())? + .devices()? .find(|d| { d.name() .map(|name| name.to_lowercase().contains(&name_substring.to_lowercase())) .unwrap_or(false) }) - .ok_or_else(|| { + .with_context(|| { format!("Cannot find audio device which name contains \"{name_substring}\"") })?, CustomAudioDeviceConfig::Index(index) => host - .devices() - .map_err(err!())? + .devices()? .nth(*index) - .ok_or_else(|| format!("Cannot find audio device at index {index}"))?, + .with_context(|| format!("Cannot find audio device at index {index}"))?, }) } -fn microphone_pair_from_sink_name(host: &Host, sink_name: &str) -> StrResult<(Device, Device)> { +fn microphone_pair_from_sink_name(host: &Host, sink_name: &str) -> Result<(Device, Device)> { let sink = host - .output_devices() - .map_err(err!())? + .output_devices()? .find(|d| d.name().unwrap_or_default().contains(sink_name)) - .ok_or_else(|| { - "VB-CABLE or Voice Meeter not found. Please install or reinstall either one".to_owned() - })?; + .context("VB-CABLE or Voice Meeter not found. Please install or reinstall either one")?; if let Some(source_name) = VIRTUAL_MICROPHONE_PAIRS.get(sink_name) { Ok(( sink, - host.input_devices() - .map_err(err!())? + host.input_devices()? .find(|d| { d.name() .map(|name| name.contains(source_name)) .unwrap_or(false) }) - .ok_or_else(|| { - "Matching output microphone not found. Did you rename it?".to_owned() - })?, + .context("Matching output microphone not found. Did you rename it?")?, )) } else { unreachable!("Invalid argument") @@ -98,7 +91,7 @@ impl AudioDevice { pub fn new_output( linux_backend: Option, config: Option<&CustomAudioDeviceConfig>, - ) -> StrResult { + ) -> Result { #[cfg(target_os = "linux")] let host = match linux_backend { Some(LinuxAudioBackend::Alsa) => cpal::host_from_id(cpal::HostId::Alsa).unwrap(), @@ -111,7 +104,7 @@ impl AudioDevice { let device = match config { None => host .default_output_device() - .ok_or_else(|| "No output audio device found".to_owned())?, + .context("No output audio device found")?, Some(config) => device_from_custom_config(&host, config)?, }; @@ -121,13 +114,13 @@ impl AudioDevice { }) } - pub fn new_input(config: Option) -> StrResult { + pub fn new_input(config: Option) -> Result { let host = cpal::default_host(); let device = match config { None => host .default_input_device() - .ok_or_else(|| "No input audio device found".to_owned())?, + .context("No input audio device found")?, Some(config) => device_from_custom_config(&host, &config)?, }; @@ -141,7 +134,7 @@ impl AudioDevice { pub fn new_virtual_microphone_pair( linux_backend: Option, config: MicrophoneDevicesConfig, - ) -> StrResult<(Self, Self)> { + ) -> Result<(Self, Self)> { #[cfg(target_os = "linux")] let host = match linux_backend { Some(LinuxAudioBackend::Alsa) => cpal::host_from_id(cpal::HostId::Alsa).unwrap(), @@ -153,7 +146,7 @@ impl AudioDevice { let (sink, source) = match config { MicrophoneDevicesConfig::Automatic => { - let mut pair = Err(String::new()); + let mut pair = Err(anyhow!("No microphones found")); for sink_name in VIRTUAL_MICROPHONE_PAIRS.keys() { pair = microphone_pair_from_sink_name(&host, sink_name); if pair.is_ok() { @@ -193,13 +186,12 @@ impl AudioDevice { )) } - pub fn input_sample_rate(&self) -> StrResult { + pub fn input_sample_rate(&self) -> Result { let config = self .inner .default_input_config() // On Windows, loopback devices are not recognized as input devices. Use output config. - .or_else(|_| self.inner.default_output_config()) - .map_err(err!())?; + .or_else(|_| self.inner.default_output_config())?; Ok(config.sample_rate().0) } @@ -213,11 +205,10 @@ pub fn is_same_device(device1: &AudioDevice, device2: &AudioDevice) -> bool { } } -#[derive(Clone)] pub enum AudioRecordState { Recording, ShouldStop, - Err(String), + Err(Option), } #[allow(unused_variables)] @@ -227,17 +218,16 @@ pub fn record_audio_blocking( device: &AudioDevice, channels_count: u16, mute: bool, -) -> StrResult { +) -> Result<()> { let config = device .inner .default_input_config() // On Windows, loopback devices are not recognized as input devices. Use output config. - .or_else(|_| device.inner.default_output_config()) - .map_err(err!())?; + .or_else(|_| device.inner.default_output_config())?; if config.channels() > 2 { // todo: handle more than 2 channels - return fmt_e!( + bail!( "Audio devices with more than 2 channels are not supported. {}", "Please turn off surround audio." ); @@ -251,70 +241,67 @@ pub fn record_audio_blocking( let state = Arc::new(Mutex::new(AudioRecordState::Recording)); - let stream = device - .inner - .build_input_stream_raw( - &stream_config, - config.sample_format(), - { - let state = Arc::clone(&state); - let runtime = Arc::clone(&runtime); - move |data, _| { - let data = if config.sample_format() == SampleFormat::F32 { - data.bytes() - .chunks_exact(4) - .flat_map(|b| { - f32::from_ne_bytes([b[0], b[1], b[2], b[3]]) - .to_sample::() - .to_ne_bytes() - .to_vec() - }) - .collect() - } else { - data.bytes().to_vec() - }; - - let data = if config.channels() == 1 && channels_count == 2 { - data.chunks_exact(2) - .flat_map(|c| vec![c[0], c[1], c[0], c[1]]) - .collect() - } else if config.channels() == 2 && channels_count == 1 { - data.chunks_exact(4) - .flat_map(|c| vec![c[0], c[1]]) - .collect() - } else { - data - }; - - if let Some(runtime) = &*runtime.read() { - sender.send(runtime, &(), data).ok(); - } else { - *state.lock() = AudioRecordState::ShouldStop; - } + let stream = device.inner.build_input_stream_raw( + &stream_config, + config.sample_format(), + { + let state = Arc::clone(&state); + let runtime = Arc::clone(&runtime); + move |data, _| { + let data = if config.sample_format() == SampleFormat::F32 { + data.bytes() + .chunks_exact(4) + .flat_map(|b| { + f32::from_ne_bytes([b[0], b[1], b[2], b[3]]) + .to_sample::() + .to_ne_bytes() + .to_vec() + }) + .collect() + } else { + data.bytes().to_vec() + }; + + let data = if config.channels() == 1 && channels_count == 2 { + data.chunks_exact(2) + .flat_map(|c| vec![c[0], c[1], c[0], c[1]]) + .collect() + } else if config.channels() == 2 && channels_count == 1 { + data.chunks_exact(4) + .flat_map(|c| vec![c[0], c[1]]) + .collect() + } else { + data + }; + + if let Some(runtime) = &*runtime.read() { + sender.send(runtime, &(), data).ok(); + } else { + *state.lock() = AudioRecordState::ShouldStop; } - }, - { - let state = Arc::clone(&state); - move |e| *state.lock() = AudioRecordState::Err(e.to_string()) - }, - None, - ) - .map_err(err!())?; + } + }, + { + let state = Arc::clone(&state); + move |e| *state.lock() = AudioRecordState::Err(Some(e.into())) + }, + None, + )?; #[cfg(windows)] if mute && device.is_output { crate::windows::set_mute_windows_device(device, true).ok(); } - let mut res = stream.play().map_err(err!()); + let mut res = stream.play().to_any(); if res.is_ok() { while matches!(*state.lock(), AudioRecordState::Recording) && runtime.read().is_some() { thread::sleep(Duration::from_millis(500)) } - if let AudioRecordState::Err(e) = state.lock().clone() { - res = Err(e); + if let AudioRecordState::Err(e) = &mut *state.lock() { + res = Err(e.take().unwrap()); } } @@ -367,14 +354,14 @@ pub fn receive_samples_loop( channels_count: usize, batch_frames_count: usize, average_buffer_frames_count: usize, -) -> StrResult { +) -> Result<()> { let mut receiver_buffer = ReceiverBuffer::new(); let mut recovery_sample_buffer = vec![]; while running.value() { match receiver.recv_buffer(Duration::from_millis(500), &mut receiver_buffer) { Ok(true) => (), Ok(false) | Err(ConnectionError::Timeout) => continue, - Err(ConnectionError::Other(e)) => return fmt_e!("{e}"), + Err(ConnectionError::Other(e)) => return Err(e), }; let (_, packet) = receiver_buffer.get()?; @@ -519,7 +506,7 @@ pub fn play_audio_loop( sample_rate: u32, config: AudioBufferingConfig, receiver: StreamReceiver<()>, -) -> StrResult { +) -> Result<()> { // Size of a chunk of frames. It corresponds to the duration if a fade-in/out in frames. let batch_frames_count = sample_rate as usize * config.batch_ms as usize / 1000; @@ -529,18 +516,16 @@ pub fn play_audio_loop( let sample_buffer = Arc::new(Mutex::new(VecDeque::new())); - let (_stream, handle) = OutputStream::try_from_device(&device.inner).map_err(err!())?; + let (_stream, handle) = OutputStream::try_from_device(&device.inner)?; - handle - .play_raw(StreamingSource { - sample_buffer: Arc::clone(&sample_buffer), - current_batch: vec![], - current_batch_cursor: 0, - channels_count: channels_count as _, - sample_rate, - batch_frames_count, - }) - .map_err(err!())?; + handle.play_raw(StreamingSource { + sample_buffer: Arc::clone(&sample_buffer), + current_batch: vec![], + current_batch_cursor: 0, + channels_count: channels_count as _, + sample_rate, + batch_frames_count, + })?; receive_samples_loop( running, diff --git a/alvr/audio/src/windows.rs b/alvr/audio/src/windows.rs index 452b84f42b..43006a2b58 100644 --- a/alvr/audio/src/windows.rs +++ b/alvr/audio/src/windows.rs @@ -1,8 +1,8 @@ use crate::AudioDevice; -use alvr_common::prelude::*; +use alvr_common::anyhow::{bail, Result}; use rodio::DeviceTrait; -fn get_windows_device(device: &AudioDevice) -> StrResult { +fn get_windows_device(device: &AudioDevice) -> Result { use widestring::U16CStr; use windows::Win32::{ Devices::FunctionDiscovery::PKEY_Device_FriendlyName, @@ -10,54 +10,49 @@ fn get_windows_device(device: &AudioDevice) -> StrResult StrResult { +pub fn get_windows_device_id(device: &AudioDevice) -> Result { use widestring::U16CStr; use windows::Win32::System::Com; unsafe { let imm_device = get_windows_device(device)?; - let id_str_ptr = imm_device.GetId().map_err(err!())?; - let id_str = U16CStr::from_ptr_str(id_str_ptr.0) - .to_string() - .map_err(err!())?; + let id_str_ptr = imm_device.GetId()?; + let id_str = U16CStr::from_ptr_str(id_str_ptr.0).to_string()?; Com::CoTaskMemFree(Some(id_str_ptr.0 as _)); Ok(id_str) @@ -65,7 +60,7 @@ pub fn get_windows_device_id(device: &AudioDevice) -> StrResult { } // device must be an output device -pub fn set_mute_windows_device(device: &AudioDevice, mute: bool) -> StrResult { +pub fn set_mute_windows_device(device: &AudioDevice, mute: bool) -> Result<()> { use windows::{ core::GUID, Win32::{Media::Audio::Endpoints::IAudioEndpointVolume, System::Com::CLSCTX_ALL}, @@ -74,13 +69,9 @@ pub fn set_mute_windows_device(device: &AudioDevice, mute: bool) -> StrResult { unsafe { let imm_device = get_windows_device(device)?; - let endpoint_volume = imm_device - .Activate::(CLSCTX_ALL, None) - .map_err(err!())?; + let endpoint_volume = imm_device.Activate::(CLSCTX_ALL, None)?; - endpoint_volume - .SetMute(mute, &GUID::zeroed()) - .map_err(err!())?; + endpoint_volume.SetMute(mute, &GUID::zeroed())?; } Ok(()) diff --git a/alvr/client_core/src/audio.rs b/alvr/client_core/src/audio.rs index ffce9d8e78..556f70ac24 100644 --- a/alvr/client_core/src/audio.rs +++ b/alvr/client_core/src/audio.rs @@ -1,8 +1,8 @@ use alvr_audio::{AudioDevice, AudioRecordState}; use alvr_common::{ + anyhow::{bail, Result}, parking_lot::{Mutex, RwLock}, - prelude::*, - RelaxedAtomic, + RelaxedAtomic, ToAny, }; use alvr_session::AudioBufferingConfig; use alvr_sockets::{StreamReceiver, StreamSender}; @@ -46,7 +46,7 @@ impl AudioInputCallback for RecorderCallback { } fn on_error_before_close(&mut self, _: &mut dyn AudioInputStreamSafe, error: oboe::Error) { - *self.state.lock() = AudioRecordState::Err(error.to_string()); + *self.state.lock() = AudioRecordState::Err(Some(error.into())); } } @@ -57,7 +57,7 @@ pub fn record_audio_blocking( device: &AudioDevice, channels_count: u16, mute: bool, -) -> StrResult { +) -> Result<()> { let sample_rate = device.input_sample_rate()?; let state = Arc::new(Mutex::new(AudioRecordState::Recording)); @@ -77,18 +77,17 @@ pub fn record_audio_blocking( sender, state: Arc::clone(&state), }) - .open_stream() - .map_err(err!())?; + .open_stream()?; - let mut res = stream.start().map_err(err!()); + let mut res = stream.start().to_any(); if res.is_ok() { while matches!(*state.lock(), AudioRecordState::Recording) && runtime.read().is_some() { thread::sleep(Duration::from_millis(500)) } - if let AudioRecordState::Err(e) = state.lock().clone() { - res = Err(e); + if let AudioRecordState::Err(e) = &mut *state.lock() { + res = Err(e.take().unwrap()); } } @@ -132,11 +131,11 @@ pub fn play_audio_loop( sample_rate: u32, config: AudioBufferingConfig, receiver: StreamReceiver<()>, -) -> StrResult { +) -> Result<()> { // the client sends invalid sample rates sometimes, and we crash if we try and use one // (batch_frames_count ends up zero and the audio callback gets confused) if sample_rate < 8000 { - return fmt_e!("Invalid audio sample rate"); + bail!("Invalid audio sample rate"); } let batch_frames_count = sample_rate as usize * config.batch_ms as usize / 1000; @@ -159,10 +158,9 @@ pub fn play_audio_loop( sample_buffer: Arc::clone(&sample_buffer), batch_frames_count, }) - .open_stream() - .map_err(err!())?; + .open_stream()?; - stream.start().map_err(err!())?; + stream.start()?; alvr_audio::receive_samples_loop( running, diff --git a/alvr/client_core/src/c_api.rs b/alvr/client_core/src/c_api.rs index a9ba3e2474..5cf32150f2 100644 --- a/alvr/client_core/src/c_api.rs +++ b/alvr/client_core/src/c_api.rs @@ -3,11 +3,12 @@ use crate::{ ClientCoreEvent, }; use alvr_common::{ + debug, error, glam::{Quat, UVec2, Vec2, Vec3}, + info, once_cell::sync::Lazy, parking_lot::Mutex, - prelude::*, - DeviceMotion, Fov, Pose, + warn, DeviceMotion, Fov, Pose, }; use alvr_packets::{ButtonEntry, ButtonValue, Tracking}; use alvr_session::{CodecType, FoveatedRenderingConfig}; diff --git a/alvr/client_core/src/connection.rs b/alvr/client_core/src/connection.rs index be5046fcde..ae389d7a44 100644 --- a/alvr/client_core/src/connection.rs +++ b/alvr/client_core/src/connection.rs @@ -10,11 +10,12 @@ use crate::{ }; use alvr_audio::AudioDevice; use alvr_common::{ + debug, error, glam::UVec2, + info, once_cell::sync::Lazy, parking_lot::{Mutex, RwLock}, - prelude::*, - ALVR_VERSION, + warn, AnyhowToCon, ConResult, ConnectionError, ToCon, ALVR_VERSION, }; use alvr_packets::{ ClientConnectionResult, ClientControlPacket, ClientStatistics, Haptics, ServerControlPacket, @@ -119,12 +120,12 @@ fn connection_pipeline( .worker_threads(2) .enable_all() .build() - .map_err(to_con_e!())?; + .to_con()?; let (mut proto_control_socket, server_ip) = { let config = Config::load(); - let announcer_socket = AnnouncerSocket::new(&config.hostname).map_err(to_con_e!())?; - let listener_socket = alvr_sockets::get_server_listener(&runtime).map_err(to_con_e!())?; + let announcer_socket = AnnouncerSocket::new(&config.hostname).to_con()?; + let listener_socket = alvr_sockets::get_server_listener(&runtime).to_con()?; loop { if !IS_ALIVE.value() { @@ -183,22 +184,20 @@ fn connection_pipeline( }), }, ) - .map_err(to_con_e!())?; - let config_packet = proto_control_socket - .recv::(&runtime, Duration::from_secs(1)) - .map_err(to_con_e!())?; + .to_con()?; + let config_packet = + proto_control_socket.recv::(&runtime, Duration::from_secs(1))?; let settings = { let mut session_desc = SessionConfig::default(); session_desc - .merge_from_json(&json::from_str(&config_packet.session).map_err(to_con_e!())?) - .map_err(to_con_e!())?; + .merge_from_json(&json::from_str(&config_packet.session).to_con()?) + .to_con()?; session_desc.to_settings() }; let negotiated_config = - json::from_str::>(&config_packet.negotiated) - .map_err(to_con_e!())?; + json::from_str::>(&config_packet.negotiated).to_con()?; let view_resolution = negotiated_config .get("view_resolution") @@ -260,7 +259,7 @@ fn connection_pipeline( settings.connection.client_send_buffer_bytes, settings.connection.client_recv_buffer_bytes, ) - .map_err(to_con_e!())?; + .to_con()?; if let Err(e) = control_sender.send(&runtime, &ClientControlPacket::StreamReady) { info!("Server disconnected. Cause: {e}"); @@ -347,7 +346,7 @@ fn connection_pipeline( }); let game_audio_thread = if let Switch::Enabled(config) = settings.audio.game_audio { - let device = AudioDevice::new_output(None, None).map_err(to_con_e!())?; + let device = AudioDevice::new_output(None, None).to_con()?; thread::spawn(move || { alvr_common::show_err(audio::play_audio_loop( @@ -364,7 +363,7 @@ fn connection_pipeline( }; let microphone_thread = if matches!(settings.audio.microphone, Switch::Enabled(_)) { - let device = AudioDevice::new_input(None).map_err(to_con_e!())?; + let device = AudioDevice::new_input(None).to_con()?; let microphone_sender = stream_socket.request_stream(AUDIO); diff --git a/alvr/client_core/src/decoder.rs b/alvr/client_core/src/decoder.rs index f2574b0877..a2960a40fc 100644 --- a/alvr/client_core/src/decoder.rs +++ b/alvr/client_core/src/decoder.rs @@ -4,9 +4,6 @@ use alvr_packets::DecoderInitializationConfig; use alvr_session::{CodecType, MediacodecDataType}; use std::time::Duration; -#[cfg(target_os = "android")] -use alvr_common::prelude::*; - #[derive(Clone)] pub struct DecoderInitConfig { pub codec: CodecType, @@ -80,7 +77,10 @@ pub fn push_nal(timestamp: Duration, nal: &[u8]) -> bool { } else { #[cfg(target_os = "android")] if let Some(decoder) = &*DECODER_ENQUEUER.lock() { - matches!(show_err(decoder.push_frame_nal(timestamp, nal)), Some(true)) + matches!( + alvr_common::show_err(decoder.push_frame_nal(timestamp, nal)), + Some(true) + ) } else { false } diff --git a/alvr/client_core/src/lib.rs b/alvr/client_core/src/lib.rs index dafc397efd..e6f5dc933d 100644 --- a/alvr/client_core/src/lib.rs +++ b/alvr/client_core/src/lib.rs @@ -25,10 +25,10 @@ pub use logging_backend::init_logging; pub use platform::try_get_permission; use alvr_common::{ + error, glam::{UVec2, Vec2}, once_cell::sync::Lazy, parking_lot::Mutex, - prelude::*, Fov, RelaxedAtomic, }; use alvr_packets::{BatteryPacket, ButtonEntry, ClientControlPacket, Tracking, ViewsConfig}; diff --git a/alvr/client_core/src/platform/android.rs b/alvr/client_core/src/platform/android.rs index 69c43f8500..b20ac96c6a 100644 --- a/alvr/client_core/src/platform/android.rs +++ b/alvr/client_core/src/platform/android.rs @@ -1,9 +1,10 @@ use crate::decoder::DecoderInitConfig; use alvr_common::{ + anyhow::{bail, Result}, + error, info, once_cell::sync::Lazy, parking_lot::{Condvar, Mutex}, - prelude::*, - RelaxedAtomic, + warn, RelaxedAtomic, }; use alvr_session::{CodecType, MediacodecDataType}; use jni::{ @@ -306,7 +307,7 @@ unsafe impl Send for VideoDecoderEnqueuer {} impl VideoDecoderEnqueuer { // Block until the buffer has been written or timeout is reached. Returns false if timeout. - pub fn push_frame_nal(&self, timestamp: Duration, data: &[u8]) -> StrResult { + pub fn push_frame_nal(&self, timestamp: Duration, data: &[u8]) -> Result { let Some(decoder) = &*self.inner.lock() else { // This might happen only during destruction return Ok(false); @@ -325,14 +326,12 @@ impl VideoDecoderEnqueuer { // NB: the function expects the timestamp in micros, but nanos is used to have // complete precision, so when converted back to Duration it can compare correctly // to other Durations - decoder - .queue_input_buffer(buffer, 0, data.len(), timestamp.as_nanos() as _, 0) - .map_err(err!())?; + decoder.queue_input_buffer(buffer, 0, data.len(), timestamp.as_nanos() as _, 0)?; Ok(true) } Ok(DequeuedInputBufferResult::TryAgainLater) => Ok(false), - Err(e) => fmt_e!("{e}"), + Err(e) => bail!("{e}"), } } } @@ -410,7 +409,7 @@ pub fn video_decoder_split( config: DecoderInitConfig, csd_0: Vec, dequeued_frame_callback: impl Fn(Duration) + Send + 'static, -) -> StrResult<(VideoDecoderEnqueuer, VideoDecoderDequeuer)> { +) -> Result<(VideoDecoderEnqueuer, VideoDecoderDequeuer)> { let running = Arc::new(RelaxedAtomic::new(true)); let decoder_enqueuer = Arc::new(Mutex::new(None::)); let decoder_ready_notifier = Arc::new(Condvar::new()); diff --git a/alvr/client_core/src/sockets.rs b/alvr/client_core/src/sockets.rs index f5eb219731..1a0607596a 100644 --- a/alvr/client_core/src/sockets.rs +++ b/alvr/client_core/src/sockets.rs @@ -1,4 +1,4 @@ -use alvr_common::{StrResult, *}; +use alvr_common::{anyhow::Result, ALVR_NAME}; use alvr_sockets::{CONTROL_PORT, LOCAL_IP}; use std::net::{Ipv4Addr, UdpSocket}; @@ -8,9 +8,9 @@ pub struct AnnouncerSocket { } impl AnnouncerSocket { - pub fn new(hostname: &str) -> StrResult { - let socket = UdpSocket::bind((LOCAL_IP, CONTROL_PORT)).map_err(err!())?; - socket.set_broadcast(true).map_err(err!())?; + pub fn new(hostname: &str) -> Result { + let socket = UdpSocket::bind((LOCAL_IP, CONTROL_PORT))?; + socket.set_broadcast(true)?; let mut packet = [0; 56]; packet[0..ALVR_NAME.len()].copy_from_slice(ALVR_NAME.as_bytes()); @@ -20,10 +20,10 @@ impl AnnouncerSocket { Ok(Self { socket, packet }) } - pub fn broadcast(&self) -> StrResult { + pub fn broadcast(&self) -> Result<()> { self.socket - .send_to(&self.packet, (Ipv4Addr::BROADCAST, CONTROL_PORT)) - .map_err(err!())?; + .send_to(&self.packet, (Ipv4Addr::BROADCAST, CONTROL_PORT))?; + Ok(()) } } diff --git a/alvr/client_core/src/storage.rs b/alvr/client_core/src/storage.rs index 3580b53533..c60f76cf15 100644 --- a/alvr/client_core/src/storage.rs +++ b/alvr/client_core/src/storage.rs @@ -1,4 +1,4 @@ -use alvr_common::prelude::*; +use alvr_common::{error, info}; use app_dirs2::{AppDataType, AppInfo}; use rand::Rng; use serde::{Deserialize, Serialize}; diff --git a/alvr/client_openxr/src/lib.rs b/alvr/client_openxr/src/lib.rs index 289257ef80..587ec22bb6 100644 --- a/alvr/client_openxr/src/lib.rs +++ b/alvr/client_openxr/src/lib.rs @@ -2,10 +2,11 @@ mod interaction; use alvr_client_core::{opengl::RenderViewInput, ClientCoreEvent}; use alvr_common::{ + error, glam::{Quat, UVec2, Vec2, Vec3}, - prelude::*, + info, settings_schema::Switch, - DeviceMotion, Fov, Pose, RelaxedAtomic, HEAD_ID, LEFT_HAND_ID, RIGHT_HAND_ID, + warn, DeviceMotion, Fov, Pose, RelaxedAtomic, HEAD_ID, LEFT_HAND_ID, RIGHT_HAND_ID, }; use alvr_packets::{FaceData, Tracking}; use alvr_session::ClientsideFoveationMode; diff --git a/alvr/common/Cargo.toml b/alvr/common/Cargo.toml index ab48a9f2e7..ee0df872be 100644 --- a/alvr/common/Cargo.toml +++ b/alvr/common/Cargo.toml @@ -7,6 +7,7 @@ authors.workspace = true license.workspace = true [dependencies] +anyhow = { version = "1", features = ["backtrace"] } backtrace = "0.3" glam = { version = "0.24", features = ["serde"] } log = "0.4" diff --git a/alvr/common/src/connection_result.rs b/alvr/common/src/connection_result.rs new file mode 100644 index 0000000000..9f97249954 --- /dev/null +++ b/alvr/common/src/connection_result.rs @@ -0,0 +1,64 @@ +use anyhow::Result; +use std::{error::Error, fmt::Display}; + +pub enum ConnectionError { + Timeout, + Other(anyhow::Error), +} + +impl Display for ConnectionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ConnectionError::Timeout => write!(f, "Timeout"), + ConnectionError::Other(s) => write!(f, "{}", s), + } + } +} + +pub type ConResult = Result; + +pub fn timeout() -> ConResult { + Err(ConnectionError::Timeout) +} + +#[macro_export] +macro_rules! con_bail { + ($($args:tt)+) => { + return Err(alvr_common::ConnectionError::Other(alvr_common::anyhow::anyhow!($($args)+))) + }; +} + +pub trait ToCon { + fn to_con(self) -> ConResult; +} + +impl ToCon for Option { + fn to_con(self) -> ConResult { + match self { + Some(value) => Ok(value), + None => Err(ConnectionError::Other(anyhow::anyhow!("Unexpected None"))), + } + } +} + +impl ToCon for Result { + fn to_con(self) -> ConResult { + match self { + Ok(value) => Ok(value), + Err(e) => Err(ConnectionError::Other(e.into())), + } + } +} + +pub trait AnyhowToCon { + fn to_con(self) -> ConResult; +} + +impl AnyhowToCon for Result { + fn to_con(self) -> ConResult { + match self { + Ok(value) => Ok(value), + Err(e) => Err(ConnectionError::Other(e)), + } + } +} diff --git a/alvr/common/src/lib.rs b/alvr/common/src/lib.rs index ebaf858f02..9e65d517c6 100644 --- a/alvr/common/src/lib.rs +++ b/alvr/common/src/lib.rs @@ -1,22 +1,13 @@ mod average; +mod connection_result; mod logging; mod paths; mod primitives; mod version; -use std::{ - fmt::Display, - sync::atomic::{AtomicBool, Ordering}, -}; - -pub mod prelude { - pub use crate::{ - con_e, con_fmt_e, enone, err, err_dbg, fmt_e, logging::*, timeout, to_con_e, ConResult, - ConnectionError, StrResult, - }; - pub use log::{debug, error, info, warn}; -} +use std::sync::atomic::{AtomicBool, Ordering}; +pub use anyhow; pub use log; pub use once_cell; pub use parking_lot; @@ -24,6 +15,8 @@ pub use semver; pub use settings_schema; pub use average::*; +pub use connection_result::*; +pub use log::{debug, error, info, warn}; pub use logging::*; pub use paths::*; pub use primitives::*; @@ -31,26 +24,6 @@ pub use version::*; pub const ALVR_NAME: &str = "ALVR"; -pub type StrResult = Result; - -pub enum ConnectionError { - Timeout, - Other(String), -} -impl Display for ConnectionError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ConnectionError::Timeout => write!(f, "Timeout"), - ConnectionError::Other(s) => write!(f, "{}", s), - } - } -} -pub type ConResult = Result; - -pub fn timeout() -> ConResult { - Err(ConnectionError::Timeout) -} - // Simple wrapper for AtomicBool when using Ordering::Relaxed. Deref cannot be implemented (cannot // return local reference) pub struct RelaxedAtomic(AtomicBool); diff --git a/alvr/common/src/logging.rs b/alvr/common/src/logging.rs index 4ec4fe81fa..a53e7be1a6 100644 --- a/alvr/common/src/logging.rs +++ b/alvr/common/src/logging.rs @@ -1,7 +1,8 @@ +use anyhow::Result; use backtrace::Backtrace; use serde::{Deserialize, Serialize}; use settings_schema::SettingsSchema; -use std::{fmt::Display, future::Future}; +use std::{error::Error, fmt::Display}; #[derive( SettingsSchema, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, @@ -134,63 +135,24 @@ pub fn show_err_blocking(res: Result) -> Option { res.map_err(|e| show_e_block(e, true)).ok() } -pub async fn show_err_async( - future_res: impl Future>, -) -> Option { - show_err(future_res.await) +pub trait ToAny { + fn to_any(self) -> Result; } -#[macro_export] -macro_rules! fmt_e { - ($($args:tt)+) => { - Err(format!($($args)+)) - }; -} - -#[macro_export] -macro_rules! err { - () => { - |e| format!("At {}:{}: {e}", file!(), line!()) - }; -} - -// trace_err variant for errors that do not implement fmt::Display -#[macro_export] -macro_rules! err_dbg { - () => { - |e| format!("At {}:{}: {e:?}", file!(), line!()) - }; -} - -#[macro_export] -macro_rules! enone { - () => { - || format!("At {}:{}", file!(), line!()) - }; -} - -#[macro_export] -macro_rules! con_fmt_e { - ($($args:tt)+) => { - Err(ConnectionError::Other(format!($($args)+))) - }; -} - -#[macro_export] -macro_rules! con_e { - () => { - |e| match e { - ConnectionError::Timeout => ConnectionError::Timeout, - ConnectionError::Other(e) => { - ConnectionError::Other(format!("At {}:{}: {e}", file!(), line!())) - } +impl ToAny for Option { + fn to_any(self) -> Result { + match self { + Some(value) => Ok(value), + None => Err(anyhow::anyhow!("Unexpected None")), } - }; + } } -#[macro_export] -macro_rules! to_con_e { - () => { - |e| ConnectionError::Other(format!("At {}:{}: {e}", file!(), line!())) - }; +impl ToAny for Result { + fn to_any(self) -> Result { + match self { + Ok(value) => Ok(value), + Err(e) => Err(e.into()), + } + } } diff --git a/alvr/dashboard/src/data_sources.rs b/alvr/dashboard/src/data_sources.rs index 127cb4ecc4..224aa00530 100644 --- a/alvr/dashboard/src/data_sources.rs +++ b/alvr/dashboard/src/data_sources.rs @@ -1,4 +1,4 @@ -use alvr_common::{parking_lot::Mutex, prelude::*, RelaxedAtomic}; +use alvr_common::{debug, error, info, parking_lot::Mutex, warn, RelaxedAtomic}; use alvr_events::{Event, EventType}; use alvr_packets::ServerRequest; use alvr_server_io::ServerDataManager; diff --git a/alvr/dashboard/src/steamvr_launcher.rs b/alvr/dashboard/src/steamvr_launcher.rs index 44b2e32923..e53e978550 100644 --- a/alvr/dashboard/src/steamvr_launcher.rs +++ b/alvr/dashboard/src/steamvr_launcher.rs @@ -1,5 +1,5 @@ use crate::data_sources; -use alvr_common::{once_cell::sync::Lazy, parking_lot::Mutex, prelude::*}; +use alvr_common::{debug, once_cell::sync::Lazy, parking_lot::Mutex}; use alvr_filesystem as afs; use alvr_session::{DriverLaunchAction, DriversBackup}; use std::{ @@ -32,7 +32,7 @@ pub fn is_steamvr_running() -> bool { } #[cfg(target_os = "linux")] -pub fn maybe_wrap_vrcompositor_launcher() -> StrResult { +pub fn maybe_wrap_vrcompositor_launcher() -> alvr_common::anyhow::Result<()> { use std::fs; let steamvr_bin_dir = alvr_server_io::steamvr_root_dir()? @@ -42,17 +42,16 @@ pub fn maybe_wrap_vrcompositor_launcher() -> StrResult { // In case of SteamVR update, vrcompositor will be restored if fs::read_link(&launcher_path).is_ok() { - fs::remove_file(&launcher_path).map_err(err!())?; // recreate the link + fs::remove_file(&launcher_path)?; // recreate the link } else { - fs::rename(&launcher_path, steamvr_bin_dir.join("vrcompositor.real")).map_err(err!())?; + fs::rename(&launcher_path, steamvr_bin_dir.join("vrcompositor.real"))?; } std::os::unix::fs::symlink( afs::filesystem_layout_from_dashboard_exe(&env::current_exe().unwrap()) .vrcompositor_wrapper(), &launcher_path, - ) - .map_err(err!())?; + )?; Ok(()) } @@ -138,7 +137,7 @@ impl Launcher { } #[cfg(target_os = "linux")] - show_err(maybe_wrap_vrcompositor_launcher()); + alvr_common::show_err(maybe_wrap_vrcompositor_launcher()); if !is_steamvr_running() { debug!("SteamVR is dead. Launching..."); diff --git a/alvr/events/src/lib.rs b/alvr/events/src/lib.rs index d6d44b61ec..b6d056288f 100644 --- a/alvr/events/src/lib.rs +++ b/alvr/events/src/lib.rs @@ -1,4 +1,4 @@ -use alvr_common::{prelude::*, DeviceMotion, Pose}; +use alvr_common::{info, DeviceMotion, LogEntry, Pose}; use alvr_packets::{AudioDevicesList, ButtonValue}; use alvr_session::SessionConfig; use serde::{Deserialize, Serialize}; diff --git a/alvr/server/src/connection.rs b/alvr/server/src/connection.rs index 7ac24e1f0e..02730d82e0 100644 --- a/alvr/server/src/connection.rs +++ b/alvr/server/src/connection.rs @@ -11,12 +11,14 @@ use crate::{ }; use alvr_audio::AudioDevice; use alvr_common::{ + con_bail, debug, error, glam::{UVec2, Vec2}, + info, once_cell::sync::Lazy, parking_lot::{Mutex, RwLock}, - prelude::*, settings_schema::Switch, - RelaxedAtomic, DEVICE_ID_TO_PATH, HEAD_ID, LEFT_HAND_ID, RIGHT_HAND_ID, + warn, AnyhowToCon, ConResult, ConnectionError, RelaxedAtomic, ToCon, DEVICE_ID_TO_PATH, + HEAD_ID, LEFT_HAND_ID, RIGHT_HAND_ID, }; use alvr_events::{ButtonEvent, EventType, HapticsEvent, TrackingEvent}; use alvr_packets::{ @@ -306,7 +308,7 @@ pub fn handshake_loop() { } fn try_connect(mut client_ips: HashMap) -> ConResult { - let runtime = Runtime::new().map_err(to_con_e!())?; + let runtime = Runtime::new().to_con()?; let (mut proto_socket, client_ip) = ProtoControlSocket::connect_to( &runtime, @@ -366,9 +368,7 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { display_name, streaming_capabilities, .. - } = proto_socket - .recv(&runtime, Duration::from_secs(1)) - .map_err(to_con_e!())? + } = proto_socket.recv(&runtime, Duration::from_secs(1))? { SERVER_DATA_MANAGER.write().update_client_list( client_hostname.clone(), @@ -394,7 +394,7 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { let streaming_caps = if let Some(streaming_caps) = maybe_streaming_caps { streaming_caps } else { - return con_fmt_e!("Only streaming clients are supported for now"); + con_bail!("Only streaming clients are supported for now"); }; let settings = SERVER_DATA_MANAGER.read().settings().clone(); @@ -447,38 +447,37 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { warn!("Chosen refresh rate not supported. Using {fps}Hz"); } - let game_audio_sample_rate = if let Switch::Enabled(game_audio_config) = - &settings.audio.game_audio - { - let game_audio_device = AudioDevice::new_output( - Some(settings.audio.linux_backend), - game_audio_config.device.as_ref(), - ) - .map_err(to_con_e!())?; - - #[cfg(not(target_os = "linux"))] - if let Switch::Enabled(microphone_desc) = &settings.audio.microphone { - let (sink, source) = AudioDevice::new_virtual_microphone_pair( + let game_audio_sample_rate = + if let Switch::Enabled(game_audio_config) = &settings.audio.game_audio { + let game_audio_device = AudioDevice::new_output( Some(settings.audio.linux_backend), - microphone_desc.devices.clone(), + game_audio_config.device.as_ref(), ) - .map_err(to_con_e!())?; - if alvr_audio::is_same_device(&game_audio_device, &sink) - || alvr_audio::is_same_device(&game_audio_device, &source) - { - return con_fmt_e!("Game audio and microphone cannot point to the same device!"); + .to_con()?; + + #[cfg(not(target_os = "linux"))] + if let Switch::Enabled(microphone_desc) = &settings.audio.microphone { + let (sink, source) = AudioDevice::new_virtual_microphone_pair( + Some(settings.audio.linux_backend), + microphone_desc.devices.clone(), + ) + .to_con()?; + if alvr_audio::is_same_device(&game_audio_device, &sink) + || alvr_audio::is_same_device(&game_audio_device, &source) + { + con_bail!("Game audio and microphone cannot point to the same device!"); + } } - } - game_audio_device.input_sample_rate().map_err(to_con_e!())? - } else { - 0 - }; + game_audio_device.input_sample_rate().to_con()? + } else { + 0 + }; let client_config = StreamConfigPacket { session: { let session = SERVER_DATA_MANAGER.read().session().clone(); - serde_json::to_string(&session).map_err(to_con_e!())? + serde_json::to_string(&session).to_con()? }, negotiated: serde_json::json!({ "view_resolution": stream_view_resolution, @@ -487,9 +486,7 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { }) .to_string(), }; - proto_socket - .send(&runtime, &client_config) - .map_err(to_con_e!())?; + proto_socket.send(&runtime, &client_config).to_con()?; let (mut control_sender, mut control_receiver) = proto_socket.split(); @@ -512,15 +509,15 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { control_sender .send(&runtime, &ServerControlPacket::StartStream) - .map_err(to_con_e!())?; + .to_con()?; match control_receiver.recv(&runtime, Duration::from_secs(1)) { Ok(ClientControlPacket::StreamReady) => (), Ok(_) => { - return con_fmt_e!("Got unexpected packet waiting for stream ack"); + con_bail!("Got unexpected packet waiting for stream ack"); } Err(e) => { - return con_fmt_e!("Error while waiting for stream ack: {e}"); + con_bail!("Error while waiting for stream ack: {e}"); } } @@ -545,8 +542,7 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { settings.connection.server_send_buffer_bytes, settings.connection.server_recv_buffer_bytes, settings.connection.packet_size as _, - ) - .map_err(to_con_e!())?; + )?; let mut video_sender = stream_socket.request_stream(VIDEO); let game_audio_sender = stream_socket.request_stream(AUDIO); @@ -648,7 +644,7 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { Some(settings.audio.linux_backend), config.devices, ) - .map_err(to_con_e!())?; + .to_con()?; #[cfg(windows)] if let Ok(id) = alvr_audio::get_windows_device_id(&source) { diff --git a/alvr/server/src/face_tracking.rs b/alvr/server/src/face_tracking.rs index f47d7f6cdd..8dc63fda8f 100644 --- a/alvr/server/src/face_tracking.rs +++ b/alvr/server/src/face_tracking.rs @@ -1,4 +1,4 @@ -use alvr_common::{glam::EulerRot, prelude::*}; +use alvr_common::{anyhow::Result, glam::EulerRot}; use alvr_packets::FaceData; use alvr_session::FaceTrackingSinkConfig; use bytes::{BufMut, BytesMut}; @@ -16,16 +16,14 @@ pub struct FaceTrackingSink { } impl FaceTrackingSink { - pub fn new(config: FaceTrackingSinkConfig, local_osc_port: u16) -> StrResult { + pub fn new(config: FaceTrackingSinkConfig, local_osc_port: u16) -> Result { let port = match config { FaceTrackingSinkConfig::VrchatEyeOsc { port } => port, FaceTrackingSinkConfig::VrcFaceTracking => VRCFT_PORT, }; - let socket = UdpSocket::bind(format!("127.0.0.1:{local_osc_port}")).map_err(err!())?; - socket - .connect(format!("127.0.0.1:{port}")) - .map_err(err!())?; + let socket = UdpSocket::bind(format!("127.0.0.1:{local_osc_port}"))?; + socket.connect(format!("127.0.0.1:{port}"))?; Ok(Self { config, diff --git a/alvr/server/src/lib.rs b/alvr/server/src/lib.rs index 1dad86a393..159b11fdd4 100644 --- a/alvr/server/src/lib.rs +++ b/alvr/server/src/lib.rs @@ -23,11 +23,11 @@ mod bindings { use bindings::*; use alvr_common::{ + error, glam::Quat, log, once_cell::sync::Lazy, parking_lot::{Mutex, RwLock}, - prelude::*, }; use alvr_events::EventType; use alvr_filesystem::{self as afs, Layout}; @@ -51,7 +51,9 @@ use sysinfo::{ProcessRefreshKind, RefreshKind, SystemExt}; use tokio::{runtime::Runtime, sync::broadcast}; static FILESYSTEM_LAYOUT: Lazy = Lazy::new(|| { - afs::filesystem_layout_from_openvr_driver_root_dir(&alvr_server_io::get_driver_dir().unwrap()) + afs::filesystem_layout_from_openvr_driver_root_dir( + &alvr_server_io::get_driver_dir_from_registered().unwrap(), + ) }); static SERVER_DATA_MANAGER: Lazy> = Lazy::new(|| RwLock::new(ServerDataManager::new(&FILESYSTEM_LAYOUT.session()))); @@ -226,9 +228,7 @@ fn init() { logging_backend::init_logging(events_sender.clone()); if let Some(runtime) = WEBSERVER_RUNTIME.lock().as_mut() { - runtime.spawn(alvr_common::show_err_async(web_server::web_server( - events_sender, - ))); + runtime.spawn(async { alvr_common::show_err(web_server::web_server(events_sender).await) }); } SERVER_DATA_MANAGER.write().clean_client_list(); diff --git a/alvr/server/src/openvr_props.rs b/alvr/server/src/openvr_props.rs index f1db5be3c1..11c89adc8d 100644 --- a/alvr/server/src/openvr_props.rs +++ b/alvr/server/src/openvr_props.rs @@ -3,7 +3,7 @@ // todo: add more emulation modes use crate::{FfiOpenvrProperty, FfiOpenvrPropertyValue, SERVER_DATA_MANAGER}; -use alvr_common::{prelude::*, settings_schema::Switch, HEAD_ID, LEFT_HAND_ID, RIGHT_HAND_ID}; +use alvr_common::{info, settings_schema::Switch, HEAD_ID, LEFT_HAND_ID, RIGHT_HAND_ID}; use alvr_session::{ ControllersEmulationMode, HeadsetEmulationMode, OpenvrPropValue, OpenvrPropertyKey::{self, *}, diff --git a/alvr/server/src/sockets.rs b/alvr/server/src/sockets.rs index c9cf52382c..0833bd6167 100644 --- a/alvr/server/src/sockets.rs +++ b/alvr/server/src/sockets.rs @@ -1,4 +1,4 @@ -use alvr_common::{prelude::*, StrResult, ALVR_NAME}; +use alvr_common::{anyhow::Result, con_bail, ConResult, ToCon, ALVR_NAME}; use alvr_sockets::{CONTROL_PORT, HANDSHAKE_PACKET_SIZE_BYTES, LOCAL_IP}; use std::{ io::ErrorKind, @@ -12,11 +12,9 @@ pub struct WelcomeSocket { } impl WelcomeSocket { - pub fn new(read_timeout: Duration) -> StrResult { - let socket = UdpSocket::bind((LOCAL_IP, CONTROL_PORT)).map_err(err!())?; - socket - .set_read_timeout(Some(read_timeout)) - .map_err(err!())?; + pub fn new(read_timeout: Duration) -> Result { + let socket = UdpSocket::bind((LOCAL_IP, CONTROL_PORT))?; + socket.set_read_timeout(Some(read_timeout))?; Ok(Self { socket, @@ -30,9 +28,9 @@ impl WelcomeSocket { Ok(pair) => pair, Err(e) => { if matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock) { - return timeout(); + return alvr_common::timeout(); } else { - return con_fmt_e!("{e}"); + con_bail!("{e}"); } } }; @@ -46,14 +44,14 @@ impl WelcomeSocket { let received_protocol_id = u64::from_le_bytes(protocol_id_bytes); if received_protocol_id != alvr_common::protocol_id() { - return con_fmt_e!("Found incompatible client! Upgrade or downgrade\nExpected protocol ID {}, Found {received_protocol_id}", + con_bail!("Found incompatible client! Upgrade or downgrade\nExpected protocol ID {}, Found {received_protocol_id}", alvr_common::protocol_id()); } let mut hostname_bytes = [0; 32]; hostname_bytes.copy_from_slice(&self.buffer[24..56]); let hostname = std::str::from_utf8(&hostname_bytes) - .map_err(to_con_e!())? + .to_con()? .trim_end_matches('\x00') .to_owned(); @@ -61,11 +59,11 @@ impl WelcomeSocket { } else if &self.buffer[..16] == b"\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00ALVR" || &self.buffer[..5] == b"\x01ALVR" { - con_fmt_e!("Found old client. Please upgrade") + con_bail!("Found old client. Please upgrade") } else { // Unexpected packet. // Note: no need to check for v12 and v13, not found in the wild anymore - con_fmt_e!("Found unrelated packet during discovery") + con_bail!("Found unrelated packet during discovery") } } } diff --git a/alvr/server/src/web_server.rs b/alvr/server/src/web_server.rs index e34d59038c..dca6f53858 100644 --- a/alvr/server/src/web_server.rs +++ b/alvr/server/src/web_server.rs @@ -2,7 +2,10 @@ use crate::{ connection::ClientDisconnectRequest, DECODER_CONFIG, DISCONNECT_CLIENT_NOTIFIER, FILESYSTEM_LAYOUT, SERVER_DATA_MANAGER, VIDEO_MIRROR_SENDER, VIDEO_RECORDING_FILE, }; -use alvr_common::{log, prelude::*}; +use alvr_common::{ + anyhow::{self, Result}, + error, info, log, warn, +}; use alvr_events::{Event, EventType}; use alvr_packets::{ClientListAction, ServerRequest}; use alvr_session::ConnectionState; @@ -22,28 +25,21 @@ use tokio_util::codec::{BytesCodec, FramedRead}; pub const WS_BROADCAST_CAPACITY: usize = 256; -fn reply(code: StatusCode) -> StrResult> { - Response::builder() - .status(code) - .body(Body::empty()) - .map_err(err!()) +fn reply(code: StatusCode) -> Result> { + Ok(Response::builder().status(code).body(Body::empty())?) } -async fn from_request_body(request: Request) -> StrResult { - json::from_reader( - hyper::body::aggregate(request) - .await - .map_err(err!())? - .reader(), - ) - .map_err(err!()) +async fn from_request_body(request: Request) -> Result { + Ok(json::from_reader( + hyper::body::aggregate(request).await?.reader(), + )?) } async fn websocket( request: Request, sender: broadcast::Sender, message_builder: impl Fn(T) -> protocol::Message + Send + Sync + 'static, -) -> StrResult> { +) -> Result> { if let Some(key) = request.headers().typed_get::() { tokio::spawn(async move { match hyper::upgrade::on(request).await { @@ -79,8 +75,7 @@ async fn websocket( let mut response = Response::builder() .status(StatusCode::SWITCHING_PROTOCOLS) - .body(Body::empty()) - .map_err(err!())?; + .body(Body::empty())?; let h = response.headers_mut(); h.typed_insert(headers::Upgrade::websocket()); @@ -96,7 +91,7 @@ async fn websocket( async fn http_api( request: Request, events_sender: broadcast::Sender, -) -> StrResult> { +) -> Result> { let mut response = match request.uri().path() { // New unified requests "/api/dashboard-request" => { @@ -251,9 +246,7 @@ async fn http_api( builder = builder.header(CONTENT_TYPE, "application/wasm"); } - builder - .body(Body::wrap_stream(FramedRead::new(file, BytesCodec::new()))) - .map_err(err!())? + builder.body(Body::wrap_stream(FramedRead::new(file, BytesCodec::new())))? } else { reply(StatusCode::NOT_FOUND)? } @@ -263,7 +256,7 @@ async fn http_api( response.headers_mut().insert( CACHE_CONTROL, - HeaderValue::from_str("no-cache, no-store, must-revalidate").map_err(err!())?, + HeaderValue::from_str("no-cache, no-store, must-revalidate")?, ); response .headers_mut() @@ -272,7 +265,7 @@ async fn http_api( Ok(response) } -pub async fn web_server(events_sender: broadcast::Sender) -> StrResult { +pub async fn web_server(events_sender: broadcast::Sender) -> Result<()> { let web_server_port = SERVER_DATA_MANAGER .read() .settings() @@ -282,7 +275,7 @@ pub async fn web_server(events_sender: broadcast::Sender) -> StrResult { let service = service::make_service_fn(|_| { let events_sender = events_sender.clone(); async move { - StrResult::Ok(service::service_fn(move |request| { + Ok::<_, anyhow::Error>(service::service_fn(move |request| { let events_sender = events_sender.clone(); async move { let res = http_api(request, events_sender).await; @@ -296,11 +289,10 @@ pub async fn web_server(events_sender: broadcast::Sender) -> StrResult { } }); - hyper::Server::bind(&SocketAddr::new( + Ok(hyper::Server::bind(&SocketAddr::new( "0.0.0.0".parse().unwrap(), web_server_port, )) .serve(service) - .await - .map_err(err!()) + .await?) } diff --git a/alvr/server_io/src/lib.rs b/alvr/server_io/src/lib.rs index 60dd24c4b4..d52ab3ea82 100644 --- a/alvr/server_io/src/lib.rs +++ b/alvr/server_io/src/lib.rs @@ -6,7 +6,10 @@ pub use firewall::*; pub use openvr_drivers::*; pub use openvrpaths::*; -use alvr_common::prelude::*; +use alvr_common::{ + anyhow::{bail, Result}, + error, info, +}; use alvr_events::EventType; use alvr_packets::{AudioDevicesList, ClientListAction, GpuVendor, PathSegment, PathValuePair}; use alvr_session::{ClientConnectionConfig, ConnectionState, SessionConfig, Settings}; @@ -20,8 +23,10 @@ use std::{ }; use wgpu::AdapterInfo; -fn save_session(session: &SessionConfig, path: &Path) -> StrResult { - fs::write(path, json::to_string_pretty(session).map_err(err!())?).map_err(err!()) +fn save_session(session: &SessionConfig, path: &Path) -> Result<()> { + fs::write(path, json::to_string_pretty(session)?)?; + + Ok(()) } // SessionConfig wrapper that saves session.json on destruction. @@ -155,7 +160,7 @@ impl ServerDataManager { } // Note: "value" can be any session subtree, in json format. - pub fn set_values(&mut self, descs: Vec) -> StrResult { + pub fn set_values(&mut self, descs: Vec) -> Result<()> { let mut session_json = serde_json::to_value(self.session.clone()).unwrap(); for desc in descs { @@ -166,22 +171,14 @@ impl ServerDataManager { if let Some(name) = session_ref.get_mut(name) { name } else { - return fmt_e!( - "From path {:?}: segment \"{}\" not found", - desc.path, - name - ); + bail!("From path {:?}: segment \"{name}\" not found", desc.path); } } PathSegment::Index(index) => { if let Some(index) = session_ref.get_mut(index) { index } else { - return fmt_e!( - "From path {:?}: segment [{}] not found", - desc.path, - index - ); + bail!("From path {:?}: segment [{index}] not found", desc.path); } } }; @@ -190,7 +187,7 @@ impl ServerDataManager { } // session_json has been updated - self.session = serde_json::from_value(session_json).map_err(err!())?; + self.session = serde_json::from_value(session_json)?; self.settings = self.session.to_settings(); save_session(&self.session, &self.session_path).unwrap(); @@ -327,24 +324,21 @@ impl ServerDataManager { } #[cfg_attr(not(target_os = "linux"), allow(unused_variables))] - pub fn get_audio_devices_list(&self) -> StrResult { + pub fn get_audio_devices_list(&self) -> Result { #[cfg(target_os = "linux")] let host = match self.session.to_settings().audio.linux_backend { - alvr_session::LinuxAudioBackend::Alsa => cpal::host_from_id(cpal::HostId::Alsa), - alvr_session::LinuxAudioBackend::Jack => cpal::host_from_id(cpal::HostId::Jack), - } - .map_err(err!())?; + alvr_session::LinuxAudioBackend::Alsa => cpal::host_from_id(cpal::HostId::Alsa)?, + alvr_session::LinuxAudioBackend::Jack => cpal::host_from_id(cpal::HostId::Jack)?, + }; #[cfg(not(target_os = "linux"))] let host = cpal::default_host(); let output = host - .output_devices() - .map_err(err!())? + .output_devices()? .filter_map(|d| d.name().ok()) .collect::>(); let input = host - .input_devices() - .map_err(err!())? + .input_devices()? .filter_map(|d| d.name().ok()) .collect::>(); diff --git a/alvr/server_io/src/openvr_drivers.rs b/alvr/server_io/src/openvr_drivers.rs index 9bda0c005a..e54afb971b 100644 --- a/alvr/server_io/src/openvr_drivers.rs +++ b/alvr/server_io/src/openvr_drivers.rs @@ -1,5 +1,8 @@ use crate::openvrpaths; -use alvr_common::prelude::*; +use alvr_common::{ + anyhow::{bail, Result}, + ToAny, +}; use serde_json as json; use std::{ collections::{HashMap, HashSet}, @@ -7,19 +10,17 @@ use std::{ path::PathBuf, }; -pub fn get_registered_drivers() -> StrResult> { +pub fn get_registered_drivers() -> Result> { Ok(openvrpaths::from_openvr_paths( openvrpaths::load_openvr_paths_json()? .get_mut("external_drivers") - .ok_or_else(enone!())?, + .to_any()?, )) } -pub fn driver_registration(driver_paths: &[PathBuf], register: bool) -> StrResult { +pub fn driver_registration(driver_paths: &[PathBuf], register: bool) -> Result<()> { let mut openvr_paths_json = openvrpaths::load_openvr_paths_json()?; - let paths_json_ref = openvr_paths_json - .get_mut("external_drivers") - .ok_or_else(enone!())?; + let paths_json_ref = openvr_paths_json.get_mut("external_drivers").to_any()?; let mut paths: HashSet<_> = openvrpaths::from_openvr_paths(paths_json_ref) .into_iter() @@ -40,25 +41,22 @@ pub fn driver_registration(driver_paths: &[PathBuf], register: bool) -> StrResul openvrpaths::save_openvr_paths_json(&openvr_paths_json) } -fn get_driver_dir_from_registered() -> StrResult { +pub fn get_driver_dir_from_registered() -> Result { for dir in get_registered_drivers()? { - let maybe_driver_name = || -> StrResult<_> { - let manifest_string = - fs::read_to_string(dir.join("driver.vrdrivermanifest")).map_err(err!())?; + let maybe_driver_name = || -> Result<_> { + let manifest_string = fs::read_to_string(dir.join("driver.vrdrivermanifest"))?; let mut manifest_map = - json::from_str::>(&manifest_string).map_err(err!())?; + json::from_str::>(&manifest_string)?; - manifest_map.remove("name").ok_or_else(enone!()) + manifest_map.remove("name").to_any() }(); - if maybe_driver_name == Ok(json::Value::String("alvr_server".to_owned())) { - return Ok(dir); + if let Ok(json::Value::String(str)) = maybe_driver_name { + if str == "alvr_server" { + return Ok(dir); + } } } - fmt_e!("ALVR driver path not registered") -} -pub fn get_driver_dir() -> StrResult { - get_driver_dir_from_registered() - .map_err(|e| format!("ALVR driver path not stored and not registered ({e})")) + bail!("ALVR driver path not registered") } diff --git a/alvr/server_io/src/openvrpaths.rs b/alvr/server_io/src/openvrpaths.rs index 72dd11fed5..84baa8e685 100644 --- a/alvr/server_io/src/openvrpaths.rs +++ b/alvr/server_io/src/openvrpaths.rs @@ -1,4 +1,7 @@ -use alvr_common::prelude::*; +use alvr_common::{ + anyhow::{bail, Result}, + ToAny, +}; use encoding_rs_io::DecodeReaderBytes; use serde_json as json; use std::{ @@ -7,37 +10,39 @@ use std::{ path::PathBuf, }; -fn openvr_source_file_path() -> StrResult { +fn openvr_source_file_path() -> Result { let path = if cfg!(windows) { dirs::cache_dir() } else { dirs::config_dir() } - .ok_or_else(enone!())? + .to_any()? .join("openvr/openvrpaths.vrpath"); if path.exists() { Ok(path) } else { - fmt_e!("{} does not exist", path.to_string_lossy()) + bail!("{} does not exist", path.to_string_lossy()) } } -pub(crate) fn load_openvr_paths_json() -> StrResult { - let file = File::open(openvr_source_file_path()?).map_err(err!())?; +pub(crate) fn load_openvr_paths_json() -> Result { + let file = File::open(openvr_source_file_path()?)?; let mut file_content_decoded = String::new(); - DecodeReaderBytes::new(&file) - .read_to_string(&mut file_content_decoded) - .map_err(err!())?; + DecodeReaderBytes::new(&file).read_to_string(&mut file_content_decoded)?; + + let value = json::from_str(&file_content_decoded)?; - json::from_str(&file_content_decoded).map_err(err!()) + Ok(value) } -pub(crate) fn save_openvr_paths_json(openvr_paths: &json::Value) -> StrResult { - let file_content = json::to_string_pretty(openvr_paths).map_err(err!())?; +pub(crate) fn save_openvr_paths_json(openvr_paths: &json::Value) -> Result<()> { + let file_content = json::to_string_pretty(openvr_paths)?; + + fs::write(openvr_source_file_path()?, file_content)?; - fs::write(openvr_source_file_path()?, file_content).map_err(err!()) + Ok(()) } pub(crate) fn from_openvr_paths(paths: &json::Value) -> Vec { @@ -63,15 +68,12 @@ pub(crate) fn to_openvr_paths(paths: &[PathBuf]) -> json::Value { json::Value::Array(paths_vec) } -fn get_single_openvr_path(path_type: &str) -> StrResult { +fn get_single_openvr_path(path_type: &str) -> Result { let openvr_paths_json = load_openvr_paths_json()?; - let paths_json = openvr_paths_json.get(path_type).ok_or_else(enone!())?; - from_openvr_paths(paths_json) - .get(0) - .cloned() - .ok_or_else(enone!()) + let paths_json = openvr_paths_json.get(path_type).to_any()?; + from_openvr_paths(paths_json).get(0).cloned().to_any() } -pub fn steamvr_root_dir() -> StrResult { +pub fn steamvr_root_dir() -> Result { get_single_openvr_path("runtime") } diff --git a/alvr/session/src/lib.rs b/alvr/session/src/lib.rs index 0f53b0af5a..fa7a9af86b 100644 --- a/alvr/session/src/lib.rs +++ b/alvr/session/src/lib.rs @@ -3,7 +3,11 @@ mod settings; pub use settings::*; pub use settings_schema; -use alvr_common::{prelude::*, semver::Version, ALVR_VERSION}; +use alvr_common::{ + anyhow::{bail, Result}, + semver::Version, + ToAny, ALVR_VERSION, +}; use serde::{Deserialize, Serialize}; use serde_json as json; use settings_schema::{NumberType, SchemaNode}; @@ -159,7 +163,7 @@ impl SessionConfig { // deserialization will fail if the type of values does not match. Because of this, // `session_settings` must be handled separately to do a better job of retrieving data using the // settings schema. - pub fn merge_from_json(&mut self, json_value: &json::Value) -> StrResult { + pub fn merge_from_json(&mut self, json_value: &json::Value) -> Result<()> { const SESSION_SETTINGS_STR: &str = "session_settings"; if let Ok(session_desc) = json::from_value(json_value.clone()) { @@ -167,15 +171,16 @@ impl SessionConfig { return Ok(()); } - let old_session_json = json::to_value(&self).map_err(err!())?; - let old_session_fields = old_session_json.as_object().ok_or_else(enone!())?; + // Note: unwrap is safe because current session is expected to serialize correctly + let old_session_json = json::to_value(&self).unwrap(); + let old_session_fields = old_session_json.as_object().unwrap(); let maybe_session_settings_json = json_value .get(SESSION_SETTINGS_STR) .map(|new_session_settings_json| { extrapolate_session_settings_from_session_settings( - &old_session_json[SESSION_SETTINGS_STR], + &old_session_fields[SESSION_SETTINGS_STR], new_session_settings_json, &Settings::schema(settings::session_settings_default()), ) @@ -196,7 +201,9 @@ impl SessionConfig { let mut session_desc_mut = json::from_value::(json::Value::Object(new_fields)).unwrap_or_default(); - match json::from_value::(maybe_session_settings_json.ok_or_else(enone!())?) + match maybe_session_settings_json + .to_any() + .and_then(|s| serde_json::from_value::(s).map_err(|e| e.into())) { Ok(session_settings) => { session_desc_mut.session_settings = session_settings; @@ -206,7 +213,7 @@ impl SessionConfig { Err(e) => { *self = session_desc_mut; - fmt_e!("Error while deserializing extrapolated session settings: {e}") + bail!("Error while deserializing extrapolated session settings: {e}") } } } diff --git a/alvr/sockets/src/control_socket.rs b/alvr/sockets/src/control_socket.rs index 1aeaed8c09..aaecca52a5 100644 --- a/alvr/sockets/src/control_socket.rs +++ b/alvr/sockets/src/control_socket.rs @@ -1,5 +1,5 @@ use super::{Ldc, CONTROL_PORT, LOCAL_IP}; -use alvr_common::prelude::*; +use alvr_common::{anyhow::Result, ConResult, ToCon}; use bytes::Bytes; use futures::{ stream::{SplitSink, SplitStream}, @@ -20,11 +20,11 @@ pub struct ControlSocketSender { } impl ControlSocketSender { - pub fn send(&mut self, runtime: &Runtime, packet: &S) -> StrResult { - let packet_bytes = bincode::serialize(packet).map_err(err!())?; - runtime - .block_on(self.inner.send(packet_bytes.into())) - .map_err(err!()) + pub fn send(&mut self, runtime: &Runtime, packet: &S) -> Result<()> { + let packet_bytes = bincode::serialize(packet)?; + runtime.block_on(self.inner.send(packet_bytes.into()))?; + + Ok(()) } } @@ -37,20 +37,18 @@ impl ControlSocketReceiver { pub fn recv(&mut self, runtime: &Runtime, timeout: Duration) -> ConResult { let packet_bytes = runtime.block_on(async { tokio::select! { - res = self.inner.next() => { - res.map(|p| p.map_err(to_con_e!())).ok_or_else(enone!()).map_err(to_con_e!()) - } + res = self.inner.next() => res.map(|p| p.to_con()).to_con(), _ = time::sleep(timeout) => alvr_common::timeout(), } })??; - bincode::deserialize(&packet_bytes).map_err(to_con_e!()) + bincode::deserialize(&packet_bytes).to_con() } } -pub fn get_server_listener(runtime: &Runtime) -> StrResult { - runtime - .block_on(TcpListener::bind((LOCAL_IP, CONTROL_PORT))) - .map_err(err!()) +pub fn get_server_listener(runtime: &Runtime) -> Result { + let listener = runtime.block_on(TcpListener::bind((LOCAL_IP, CONTROL_PORT)))?; + + Ok(listener) } // Proto-control-socket that can send and receive any packet. After the split, only the packets of @@ -78,7 +76,7 @@ impl ProtoControlSocket { .collect::>(); runtime.block_on(async { tokio::select! { - res = TcpStream::connect(client_addresses.as_slice()) => res.map_err(to_con_e!()), + res = TcpStream::connect(client_addresses.as_slice()) => res.to_con(), _ = time::sleep(timeout) => alvr_common::timeout(), } })? @@ -86,7 +84,7 @@ impl ProtoControlSocket { PeerType::Server(listener) => { let (socket, _) = runtime.block_on(async { tokio::select! { - res = listener.accept() => res.map_err(to_con_e!()), + res = listener.accept() => res.to_con(), _ = time::sleep(timeout) => alvr_common::timeout(), } })?; @@ -94,18 +92,17 @@ impl ProtoControlSocket { } }; - socket.set_nodelay(true).map_err(to_con_e!())?; - let peer_ip = socket.peer_addr().map_err(to_con_e!())?.ip(); + socket.set_nodelay(true).to_con()?; + let peer_ip = socket.peer_addr().to_con()?.ip(); let socket = Framed::new(socket, Ldc::new()); Ok((Self { inner: socket }, peer_ip)) } - pub fn send(&mut self, runtime: &Runtime, packet: &S) -> StrResult { - let packet_bytes = bincode::serialize(packet).map_err(err!())?; - runtime - .block_on(self.inner.send(packet_bytes.into())) - .map_err(err!()) + pub fn send(&mut self, runtime: &Runtime, packet: &S) -> Result<()> { + runtime.block_on(self.inner.send(bincode::serialize(packet)?.into()))?; + + Ok(()) } pub fn recv( @@ -116,14 +113,13 @@ impl ProtoControlSocket { let packet_bytes = runtime .block_on(async { tokio::select! { - res = self.inner.next() => res.map(|p| p.map_err(to_con_e!())), + res = self.inner.next() => res.map(|p| p.to_con()), _ = time::sleep(timeout) => Some(alvr_common::timeout()), } }) - .ok_or_else(enone!()) - .map_err(to_con_e!())??; + .to_con()??; - bincode::deserialize(&packet_bytes).map_err(to_con_e!()) + bincode::deserialize(&packet_bytes).to_con() } pub fn split( diff --git a/alvr/sockets/src/stream_socket/mod.rs b/alvr/sockets/src/stream_socket/mod.rs index b24d2de840..6a868dacd1 100644 --- a/alvr/sockets/src/stream_socket/mod.rs +++ b/alvr/sockets/src/stream_socket/mod.rs @@ -7,7 +7,7 @@ mod tcp; mod udp; -use alvr_common::prelude::*; +use alvr_common::{anyhow::Result, con_bail, debug, error, info, AnyhowToCon, ConResult}; use alvr_session::{SocketBufferSize, SocketProtocol}; use bytes::{Buf, BufMut, BytesMut}; use futures::SinkExt; @@ -28,11 +28,11 @@ pub fn set_socket_buffers( socket: &socket2::Socket, send_buffer_bytes: SocketBufferSize, recv_buffer_bytes: SocketBufferSize, -) -> StrResult { +) -> Result<()> { info!( "Initial socket buffer size: send: {}B, recv: {}B", - socket.send_buffer_size().map_err(err!())?, - socket.recv_buffer_size().map_err(err!())? + socket.send_buffer_size()?, + socket.recv_buffer_size()? ); { @@ -48,7 +48,7 @@ pub fn set_socket_buffers( } else { info!( "Set socket send buffer succeeded: {}", - socket.send_buffer_size().map_err(err!())? + socket.send_buffer_size()? ); } } @@ -67,7 +67,7 @@ pub fn set_socket_buffers( } else { info!( "Set socket recv buffer succeeded: {}", - socket.recv_buffer_size().map_err(err!())? + socket.recv_buffer_size()? ); } } @@ -124,38 +124,34 @@ pub struct StreamSender { } impl StreamSender { - fn send_buffer(&self, runtime: &Runtime, buffer: BytesMut) -> StrResult { + fn send_buffer(&self, runtime: &Runtime, buffer: BytesMut) -> Result<()> { match &self.socket { - StreamSendSocket::Udp(socket) => runtime - .block_on( - socket - .inner - .lock() - .feed((buffer.freeze(), socket.peer_addr)), - ) - .map_err(err!()), - StreamSendSocket::Tcp(socket) => runtime - .block_on(socket.lock().feed(buffer.freeze())) - .map_err(err!()), + StreamSendSocket::Udp(socket) => Ok(runtime.block_on( + socket + .inner + .lock() + .feed((buffer.freeze(), socket.peer_addr)), + )?), + StreamSendSocket::Tcp(socket) => { + Ok(runtime.block_on(socket.lock().feed(buffer.freeze()))?) + } } } - pub fn send(&mut self, runtime: &Runtime, header: &T, payload_buffer: Vec) -> StrResult { + pub fn send(&mut self, runtime: &Runtime, header: &T, payload_buffer: Vec) -> Result<()> { // packet layout: // [ 2B (stream ID) | 4B (packet index) | 4B (packet shard count) | 4B (shard index)] // this escluses length delimited coding, which is handled by the TCP backend const OFFSET: usize = 2 + 4 + 4 + 4; let max_shard_data_size = self.max_packet_size - OFFSET; - let header_size = bincode::serialized_size(header).map_err(err!()).unwrap() as usize; + let header_size = bincode::serialized_size(header).unwrap() as usize; self.header_buffer.clear(); if self.header_buffer.capacity() < header_size { // If the buffer is empty, with this call we request a capacity of "header_size". self.header_buffer.reserve(header_size); } - bincode::serialize_into(&mut self.header_buffer, header) - .map_err(err!()) - .unwrap(); + bincode::serialize_into(&mut self.header_buffer, header).unwrap(); let header_shards = self.header_buffer.chunks(max_shard_data_size); let payload_shards = payload_buffer.chunks(max_shard_data_size); @@ -175,13 +171,8 @@ impl StreamSender { } match &self.socket { - StreamSendSocket::Udp(socket) => runtime - .block_on(socket.inner.lock().flush()) - .map_err(err!())?, - - StreamSendSocket::Tcp(socket) => { - runtime.block_on(socket.lock().flush()).map_err(err!())? - } + StreamSendSocket::Udp(socket) => runtime.block_on(socket.inner.lock().flush())?, + StreamSendSocket::Tcp(socket) => runtime.block_on(socket.lock().flush())?, } self.next_packet_index += 1; @@ -212,9 +203,9 @@ impl ReceiverBuffer { } impl ReceiverBuffer { - pub fn get(&self) -> StrResult<(T, &[u8])> { + pub fn get(&self) -> Result<(T, &[u8])> { let mut data: &[u8] = &self.inner; - let header = bincode::deserialize_from(&mut data).map_err(err!())?; + let header = bincode::deserialize_from(&mut data)?; Ok((header, data)) } @@ -240,7 +231,7 @@ impl StreamReceiver { let mut shard = match self.receiver.recv_timeout(timeout) { Ok(shard) => Ok(shard), Err(RecvTimeoutError::Timeout) => alvr_common::timeout(), - Err(RecvTimeoutError::Disconnected) => con_fmt_e!("Disconnected"), + Err(RecvTimeoutError::Disconnected) => con_bail!("Disconnected"), }?; let shard_packet_index = shard.get_u32(); let shards_count = shard.get_u32() as usize; @@ -299,7 +290,7 @@ impl StreamReceiver { loop { if self.recv_buffer(timeout, &mut buffer)? { - return Ok(buffer.get().map_err(to_con_e!())?.0); + return Ok(buffer.get().to_con()?.0); } } } @@ -317,7 +308,7 @@ impl StreamSocketBuilder { stream_socket_config: SocketProtocol, send_buffer_bytes: SocketBufferSize, recv_buffer_bytes: SocketBufferSize, - ) -> StrResult { + ) -> Result { Ok(match stream_socket_config { SocketProtocol::Udp => StreamSocketBuilder::Udp(udp::bind( runtime, @@ -383,8 +374,8 @@ impl StreamSocketBuilder { ) -> ConResult { let (send_socket, receive_socket) = match protocol { SocketProtocol::Udp => { - let socket = udp::bind(runtime, port, send_buffer_bytes, recv_buffer_bytes) - .map_err(to_con_e!())?; + let socket = + udp::bind(runtime, port, send_buffer_bytes, recv_buffer_bytes).to_con()?; let (send_socket, receive_socket) = udp::connect(socket, client_ip, port); ( diff --git a/alvr/sockets/src/stream_socket/tcp.rs b/alvr/sockets/src/stream_socket/tcp.rs index 08a7e8229e..424df2d83f 100644 --- a/alvr/sockets/src/stream_socket/tcp.rs +++ b/alvr/sockets/src/stream_socket/tcp.rs @@ -1,5 +1,5 @@ use crate::{Ldc, LOCAL_IP}; -use alvr_common::{parking_lot::Mutex, prelude::*}; +use alvr_common::{anyhow::Result, con_bail, parking_lot::Mutex, ConResult, ToCon}; use alvr_session::SocketBufferSize; use bytes::{Buf, Bytes, BytesMut}; use futures::{ @@ -27,16 +27,16 @@ pub fn bind( port: u16, send_buffer_bytes: SocketBufferSize, recv_buffer_bytes: SocketBufferSize, -) -> StrResult { - let socket = runtime - .block_on(TcpListener::bind((LOCAL_IP, port))) - .map_err(err!())?; - let socket = socket2::Socket::from(socket.into_std().map_err(err!())?); +) -> Result { + let socket = runtime.block_on(TcpListener::bind((LOCAL_IP, port)))?; + let socket = socket2::Socket::from(socket.into_std()?); super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok(); let _tokio_guard = runtime.enter(); - TcpListener::from_std(socket.into()).map_err(err!()) + let socket = TcpListener::from_std(socket.into())?; + + Ok(socket) } pub fn accept_from_server( @@ -47,16 +47,16 @@ pub fn accept_from_server( ) -> ConResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> { let (socket, server_address) = runtime.block_on(async { tokio::select! { - res = listener.accept() => res.map_err(to_con_e!()), + res = listener.accept() => res.to_con(), _ = time::sleep(timeout) => alvr_common::timeout(), } })?; if server_address.ip() != server_ip { - return con_fmt_e!("Connected to wrong client: {server_address} != {server_ip}"); + con_bail!("Connected to wrong client: {server_address} != {server_ip}"); } - socket.set_nodelay(true).map_err(to_con_e!())?; + socket.set_nodelay(true).to_con()?; let socket = Framed::new(socket, Ldc::new()); let (send_socket, receive_socket) = socket.split(); @@ -73,20 +73,20 @@ pub fn connect_to_client( ) -> ConResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> { let socket = runtime.block_on(async { tokio::select! { - res = TcpStream::connect((client_ip, port)) => res.map_err(to_con_e!()), + res = TcpStream::connect((client_ip, port)) => res.to_con(), _ = time::sleep(timeout) => alvr_common::timeout(), } })?; - let socket = socket2::Socket::from(socket.into_std().map_err(to_con_e!())?); + let socket = socket2::Socket::from(socket.into_std().to_con()?); super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok(); let socket = { let _tokio_guard = runtime.enter(); - TcpStream::from_std(socket.into()).map_err(to_con_e!())? + TcpStream::from_std(socket.into()).to_con()? }; - socket.set_nodelay(true).map_err(to_con_e!())?; + socket.set_nodelay(true).to_con()?; let socket = Framed::new(socket, Ldc::new()); let (send_socket, receive_socket) = socket.split(); @@ -101,7 +101,7 @@ pub fn recv( ) -> ConResult { if let Some(maybe_packet) = runtime.block_on(async { tokio::select! { - res = socket.next() => res.map(|p| p.map_err(to_con_e!())), + res = socket.next() => res.map(|p| p.to_con()), _ = time::sleep(timeout) => Some(alvr_common::timeout()), } }) { @@ -109,11 +109,11 @@ pub fn recv( let stream_id = packet.get_u16(); if let Some(enqueuer) = packet_enqueuers.get_mut(&stream_id) { - enqueuer.send(packet).map_err(to_con_e!())?; + enqueuer.send(packet).to_con()?; } Ok(()) } else { - con_fmt_e!("Socket closed") + con_bail!("Socket closed") } } diff --git a/alvr/sockets/src/stream_socket/udp.rs b/alvr/sockets/src/stream_socket/udp.rs index 437c1d31b7..221358179f 100644 --- a/alvr/sockets/src/stream_socket/udp.rs +++ b/alvr/sockets/src/stream_socket/udp.rs @@ -1,5 +1,5 @@ use crate::{Ldc, LOCAL_IP}; -use alvr_common::{parking_lot::Mutex, prelude::*}; +use alvr_common::{anyhow::Result, con_bail, parking_lot::Mutex, ConResult, ToCon}; use alvr_session::SocketBufferSize; use bytes::{Buf, Bytes, BytesMut}; use futures::{ @@ -36,16 +36,16 @@ pub fn bind( port: u16, send_buffer_bytes: SocketBufferSize, recv_buffer_bytes: SocketBufferSize, -) -> StrResult { - let socket = runtime - .block_on(UdpSocket::bind((LOCAL_IP, port))) - .map_err(err!())?; - let socket = socket2::Socket::from(socket.into_std().map_err(err!())?); +) -> Result { + let socket = runtime.block_on(UdpSocket::bind((LOCAL_IP, port)))?; + let socket = socket2::Socket::from(socket.into_std()?); super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok(); let _tokio_guard = runtime.enter(); - UdpSocket::from_std(socket.into()).map_err(err!()) + let socket = UdpSocket::from_std(socket.into())?; + + Ok(socket) } pub fn connect( @@ -77,11 +77,11 @@ pub fn recv( ) -> ConResult { if let Some(maybe_packet) = runtime.block_on(async { tokio::select! { - res = socket.inner.next() => res.map(|p| p.map_err(to_con_e!())), + res = socket.inner.next() => res.map(|p| p.to_con()), _ = time::sleep(timeout) => Some(alvr_common::timeout()), } }) { - let (mut packet_bytes, address) = maybe_packet.map_err(to_con_e!())?; + let (mut packet_bytes, address) = maybe_packet?; if address != socket.peer_addr { // Non fatal @@ -90,11 +90,11 @@ pub fn recv( let stream_id = packet_bytes.get_u16(); if let Some(enqueuer) = packet_enqueuers.get_mut(&stream_id) { - enqueuer.send(packet_bytes).map_err(to_con_e!())?; + enqueuer.send(packet_bytes).to_con()?; } Ok(()) } else { - con_fmt_e!("Socket closed") + con_bail!("Socket closed") } }