From d5d1c4d8dda56cb9aadb04142b42e57fecb85a31 Mon Sep 17 00:00:00 2001 From: Lunarix <3759687+Skarlett@users.noreply.github.com> Date: Sun, 17 Sep 2023 23:46:01 -0500 Subject: [PATCH] Fix build (#118) * remove: `dbg!` * use better error messages * warn deemix: only complain of hifi if using hifi * deemix: cleanup DeemixErrors --------- Co-authored-by: Skarlett --- crates/mockingbird/src/check.rs | 5 +- crates/mockingbird/src/deemix.rs | 172 +++++++++++++++++++++---------- crates/mockingbird/src/player.rs | 121 +++++++++++++++++++--- 3 files changed, 225 insertions(+), 73 deletions(-) diff --git a/crates/mockingbird/src/check.rs b/crates/mockingbird/src/check.rs index 14a4cf0..9e9e238 100644 --- a/crates/mockingbird/src/check.rs +++ b/crates/mockingbird/src/check.rs @@ -249,15 +249,14 @@ async fn arl_check( let mut iargs = args.iter::(); - while let Some(Ok(arl)) = dbg!(iargs.next()) - { + while let Some(Ok(arl)) = iargs.next() { if let Err(()) = santitize_arl(&arl) { msg.channel_id.say(&ctx.http, "Invalid ARL").await?; continue; } let arl = arl.trim(); - if dbg!(arl.len()) != 192 { + if arl.len() != 192 { msg.channel_id .say(&ctx.http, "Must provide an ARL") .await?; diff --git a/crates/mockingbird/src/deemix.rs b/crates/mockingbird/src/deemix.rs index 1d6b8a7..6d63c57 100644 --- a/crates/mockingbird/src/deemix.rs +++ b/crates/mockingbird/src/deemix.rs @@ -1,10 +1,9 @@ use std::io::{BufReader, BufRead, Read}; -use serenity::json::JsonError; use songbird::{ constants::SAMPLE_RATE_RAW, input::{ children_to_reader, - error::{Error as SongbirdError, Result as SongbirdResult}, + error::Error as SongbirdError, Codec, Container, Metadata, @@ -20,7 +19,75 @@ use serde_json::Value; use std::os::fd::AsRawFd; use tokio::io::AsyncReadExt; -async fn max_pipe_size() -> Result> { +#[derive(Debug)] +pub enum DeemixError { + BadJson(String), + Metadata, + IO(std::io::Error), + ParseInt(core::num::ParseIntError), + Songbird(SongbirdError), + Tokio(tokio::task::JoinError), +} + +impl Into for DeemixError { + fn into(self) -> SongbirdError { + match self { + DeemixError::BadJson(_) + | DeemixError::ParseInt(_) + | DeemixError::Metadata + => SongbirdError::Metadata, + + DeemixError::IO(e) => SongbirdError::Io(e), + DeemixError::Songbird(e) => e, + DeemixError::Tokio(e) + => SongbirdError::Io( + std::io::Error::new(std::io::ErrorKind::Other, e) + ), + } + } +} + +impl std::fmt::Display for DeemixError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + DeemixError::BadJson(s) => write!(f, "Bad JSON: {}", s), + DeemixError::Metadata => write!(f, "Metadata error"), + DeemixError::IO(e) => write!(f, "Process error: {}", e), + DeemixError::ParseInt(e) => write!(f, "Parse int error: {}", e), + DeemixError::Songbird(e) => write!(f, "Songbird error: {}", e), + DeemixError::Tokio(e) => write!(f, "Tokio error: {}", e), + } + } +} + +impl From for DeemixError { + fn from(e: SongbirdError) -> Self { + DeemixError::Songbird(e) + } +} + +impl From for DeemixError { + fn from(e: std::io::Error) -> Self { + DeemixError::IO(e) + } +} + +impl From for DeemixError { + fn from(e: tokio::task::JoinError) -> Self { + DeemixError::Tokio(e) + } +} + +impl From for DeemixError { + fn from(e: core::num::ParseIntError) -> Self { + DeemixError::ParseInt(e) + } +} + +impl std::error::Error for DeemixError {} + + +async fn max_pipe_size() -> Result { let mut file = tokio::fs::OpenOptions::new() .read(true) .open("/proc/sys/fs/pipe-max-size") @@ -33,7 +100,6 @@ async fn max_pipe_size() -> Result> { Ok(data.parse::()?) } - #[link(name = "fion")] extern { fn availbytes(fd: std::ffi::c_int) -> std::ffi::c_int; @@ -52,9 +118,13 @@ where async fn call_restart(&mut self, time: Option) -> Result { if let Some(time) = time { let ts = format!("{:.3}", time.as_secs_f64()); - _deemix(self.uri.as_ref(), &["-ss", &ts]).await + _deemix(self.uri.as_ref(), &["-ss", &ts]) + .await + .map_err(DeemixError::into) } else { - deemix(self.uri.as_ref()).await + deemix(self.uri.as_ref()) + .await + .map_err(DeemixError::into) } } @@ -76,45 +146,49 @@ pub async fn deemix_metadata(uri: &str) -> std::io::Result { Ok(metadata_from_deemix_output(&serde_json::from_slice(&output.stdout[..])?)) } -#[cfg(feature = "debug")] -fn handle_bad_json( - mut writebuf: Vec, - error: JsonError, - mut reader: BufReader<&mut std::process::ChildStderr> -) -> SongbirdError -{ - let fault_data = writebuf.clone(); - let fault = String::from_utf8_lossy(fault_data.as_slice()); - - tracing::error!("TRIED PARSING: \n {}", fault); - tracing::error!("... [start] flushing buffer to logs..."); - writebuf.clear(); - - // Potentially hangs thread if EOF is never encountered - reader.read_to_end(&mut writebuf).unwrap(); - tracing::error!("{}", String::from_utf8_lossy(&writebuf)); - tracing::error!("... [ end ] flushed buffer to logs..."); - SongbirdError::Json { error, parsed_text: fault.to_string() } -} +fn process_stderr(s: &mut std::process::ChildStderr) -> Result { + let mut o_vec = vec![]; + let mut reader = BufReader::new(s.by_ref()); -#[cfg(not(feature = "debug"))] -fn handle_bad_json(writebuf: Vec, error: JsonError, _reader: BufReader<&mut std::process::ChildStderr> ) -> SongbirdError -{ - let fault = String::from_utf8_lossy(&writebuf); - tracing::error!("TRIED PARSING: \n {}", String::from_utf8_lossy(&writebuf)); - SongbirdError::Json { error, parsed_text: fault.to_string() } + // read until new line + reader.read_until(0xA, &mut o_vec) + .map_err(|_| DeemixError::Metadata)?; + + match serde_json::from_slice::(&o_vec.as_slice()) { + Ok(json) => Ok(json), + Err(_) => { + let mut buf: [u8; 2048] = [0; 2048]; + // If process crashes + // BufReader::read_to_end will hang + // until EOF is encountered (Never) + // reader.read_to_end(&mut o_vec).unwrap(); + // -- so instead, use fixed size buffer + while let Ok(n) = reader.read(&mut buf) { + if n > 0 { + o_vec.extend_from_slice(&buf[..n]); + continue; + } + else { break; } + } + + let text = String::from_utf8_lossy(&o_vec); + return Err(DeemixError::BadJson(text.to_string())); + } + } } + pub async fn deemix( uri: &str, -) -> SongbirdResult{ - _deemix(uri, &[]).await +) -> Result { + _deemix(uri, &[]) + .await } pub async fn _deemix( uri: &str, pre_args: &[&str], -) -> SongbirdResult +) -> Result { let pipesize = max_pipe_size().await.unwrap(); let ffmpeg_args = [ @@ -141,30 +215,22 @@ pub async fn _deemix( unsafe { bigpipe(deemix_out, pipesize); } let stderr = deemix.stderr.take(); - - let (returned_stderr, value) = tokio::task::spawn_blocking(move || { - let mut s = stderr.unwrap(); - let out: SongbirdResult = { - let mut o_vec = vec![]; - let mut serde_read = BufReader::new(s.by_ref()); - // Newline... - if let Ok(len) = serde_read.read_until(0xA, &mut o_vec) { - serde_json::from_slice(&o_vec[..len]).map_err(|e| handle_bad_json(o_vec, e, serde_read)) - } else { - SongbirdResult::Err(SongbirdError::Metadata) - } - }; - + // Read first line of stderr + // for metadata, but read entire buffer if error. + let threadout = tokio::task::spawn_blocking(move || { + let mut s = stderr.unwrap(); + let out = process_stderr(&mut s); (s, out) }) - .await - .map_err(|_| SongbirdError::Metadata)?; + .await?; + + let (returned_stderr, value) = threadout; deemix.stderr = Some(returned_stderr); let metadata_raw = value?; - if let Some(x) = metadata_raw.get("error") { - return Err(SongbirdError::YouTubeDlProcessing(x.clone())); + if let Some(_) = metadata_raw.get("error") { + return Err(DeemixError::Metadata); } let _filesize = metadata_raw["filesize"].as_u64(); diff --git a/crates/mockingbird/src/player.rs b/crates/mockingbird/src/player.rs index cb55bc2..fcea6c2 100644 --- a/crates/mockingbird/src/player.rs +++ b/crates/mockingbird/src/player.rs @@ -47,7 +47,7 @@ async fn next_track(call: &mut Call, uri: &str) -> Result for HandlerError { } } +#[cfg(feature = "deemix")] +impl From for HandlerError { + fn from(err: crate::deemix::DeemixError) -> Self { + HandlerError::DeemixError(err) + } +} + + impl std::fmt::Display for HandlerError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Songbird(err) => write!(f, "Songbird error: {}", err), Self::NotImplemented => write!(f, "This feature is not implemented."), - Self::IOError(err) => write!(f, "IO error: (most likely deemix-metadata failed) {}", err), - Self::Serenity(err) => write!(f, "Serenity error: {}", err), - Self::NoCall => write!(f, "Not in a voice channel to play in") + + Self::IOError(err) + => write!(f, "IO error: (most likely deemix-metadata failed) {}", err), + + Self::Serenity(err) + => write!(f, "Serenity error: {}", err), + + Self::NoCall + => write!(f, "Not in a voice channel to play in"), + + #[cfg(feature = "deemix")] + Self::DeemixError(crate::deemix::DeemixError::BadJson(err)) + => write!(f, "Deemix error: {}", err), + + _ => write!(f, "Unknown error") } } } @@ -111,6 +135,7 @@ fn process_fan_output(buf: &mut VecDeque, json_buf: Vec) -> Result) -> Result<(), HandlerError> { +async fn fan_deezer(uri: &str, buf: &mut VecDeque) -> Result<(), HandlerError> { return Err(HandlerError::NotImplemented) } #[cfg(not(feature="ytdl"))] -async fn fan_ytdl(uri: &str, buf: &mut Vec) -> Result<(), HandlerError> { +async fn fan_ytdl(uri: &str, buf: &mut VecDeque) -> Result { return Err(HandlerError::NotImplemented) } @@ -208,6 +233,7 @@ impl Players { let (track, track_handle) = create_player(input); handler.enqueue(track); + Ok(track_handle) } @@ -264,7 +290,7 @@ async fn play_routine(qctx: Arc) -> Result<(), HandlerError> { let mut call = handler.lock().await; - while let Some(uri) = dbg!(qctx.cold_queue.write().await.pop_front()) { + while let Some(uri) = qctx.cold_queue.write().await.pop_front() { match next_track(&mut call, &uri).await { Ok(track) => { track.add_event( @@ -274,15 +300,45 @@ async fn play_routine(qctx: Arc) -> Result<(), HandlerError> { break }, Err(e) => { + tracing::error!("Failed to play next track: {}", e); + + let response = match e { + HandlerError::NotImplemented + => "Not implemented/enabled".to_string(), + + HandlerError::NoCall + => "No call found".to_string(), + + HandlerError::IOError(e) + => format!("IO Error: {}", e.kind()), + + #[cfg(feature = "deemix")] + HandlerError::DeemixError(crate::deemix::DeemixError::BadJson(text)) + => { + qctx.invited_from.send_files( + &qctx.http, + vec![ + (text.as_bytes(), "error.txt") + ], + |m| m + ).await?; + "Json Error".to_string() + } + + e => format!("Discord error: {}", e) + }; + if tries == 0 { - tracing::error!("Failed to play next track: {}", e); + let _ = qctx.invited_from + .say(&qctx.http, format!("Halting. Last try: {}", &uri)) + .await; break } let _ = qctx.invited_from - .say(&qctx.http, format!("Couldn't play track {}", &uri)) + .say(&qctx.http, format!("Couldn't play track {}\n{}", &uri, &response)) .await; - tracing::error!("Failed to play next track: {}", e); + tries -= 1; } } @@ -358,11 +414,44 @@ async fn join_routine(ctx: &Context, msg: &Message) -> Result, return Err(JoinError::NoCall); }, }; - - match connect_to.bitrate() { - Some(x) if x > 120_000 => {} - None => { msg.reply(&ctx, "**Couldn't detect bitrate.** For the best experience, check that the voice room is using 128kbps.").await; } - Some(x) => { msg.reply(&ctx, format!("**Low quality voice room** detected. For the best experience, use 128kbps. [Currently: {}kbps]", (x / 1000)).await; } + + let chan: Channel = connect_to.to_channel(&ctx.http).await.unwrap(); + + let gchan = match chan { + Channel::Guild(ref gchan) => gchan, + _ => { + msg.reply( + &ctx.http, + "Not supported voice channel" + ).await + .unwrap(); + + return Err(JoinError::NoCall); + } + }; + + match gchan.bitrate { + Some(x) if x > 90_000 => {} + None => { + let _ = msg.reply( + &ctx.http, + r#"**Couldn't detect bitrate.** For the best experience, + check that the voice room is using 128kbps."# + ).await; + } + Some(x) => { + #[cfg(feature = "deemix")] + let _ = msg.reply( + &ctx, + format!( + r#"**Low quality voice room** detected. + + For the best experience, use 128kbps, & spotify links + [Currently: {}kbps]"#, + (x / 1000) + ) + ).await; + } } let manager = songbird::get(ctx) @@ -379,8 +468,6 @@ async fn join_routine(ctx: &Context, msg: &Message) -> Result, let call_lock = manager.get(guild_id).unwrap(); let mut call = call_lock.lock().await; - let chan: Channel = connect_to.to_channel(&ctx.http).await.unwrap(); - let queuectx = if let Channel::Guild(voice_chan_id) = chan { QueueContext {