diff --git a/Cargo.lock b/Cargo.lock index 5783f5d..e95d586 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aho-corasick" version = "1.1.3" @@ -47,7 +62,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" dependencies = [ - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -57,7 +72,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -67,10 +82,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3" [[package]] -name = "autocfg" -version = "1.3.0" +name = "backtrace" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +checksum = "17c6a35df3749d2e8bb1b7b21a976d82b15548788d2735b9d82f329268f71a11" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] [[package]] name = "bindgen" @@ -105,6 +129,18 @@ version = "0.9.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3280efcf6d66bc77c2cf9b67dc8acee47a217d9be67dd590b3230dffe663724d" +[[package]] +name = "bytes" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" + +[[package]] +name = "cc" +version = "1.0.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695" + [[package]] name = "cexpr" version = "0.6.0" @@ -177,7 +213,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -186,12 +222,36 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "gimli" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" + [[package]] name = "glob" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "humantime" version = "2.1.0" @@ -257,9 +317,10 @@ dependencies = [ "bpaf", "env_logger", "log", - "nix", "serde", "serde_json", + "tokio", + "tokio-stream", "utils", ] @@ -307,19 +368,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] -name = "memoffset" -version = "0.9.1" +name = "minimal-lexical" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "miniz_oxide" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae" dependencies = [ - "autocfg", + "adler", ] [[package]] -name = "minimal-lexical" -version = "0.2.1" +name = "mio" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.48.0", +] [[package]] name = "nix" @@ -331,7 +403,6 @@ dependencies = [ "cfg-if", "cfg_aliases", "libc", - "memoffset", ] [[package]] @@ -344,6 +415,31 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8ec7ab813848ba4522158d5517a6093db1ded27575b070f4177b8d12b41db5e" +dependencies = [ + "memchr", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + [[package]] name = "proc-macro2" version = "1.0.82" @@ -391,6 +487,12 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -407,7 +509,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -453,6 +555,25 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "syn" version = "2.0.61" @@ -473,7 +594,61 @@ dependencies = [ "cfg-if", "fastrand", "rustix", - "windows-sys", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio" +version = "1.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", ] [[package]] @@ -496,13 +671,43 @@ dependencies = [ "serde", ] +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", ] [[package]] @@ -511,28 +716,46 @@ version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.5" @@ -545,24 +768,48 @@ version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.5" diff --git a/Cargo.toml b/Cargo.toml index fdeef66..b67df79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,4 +23,6 @@ rustix = { version = "0.38.34", default-features = false } serde = { version = "1.0.203", default-features = false } serde_json = { version = "1.0.117", default-features = false } tempfile = { version = "3.10.1", default-features = false } +tokio = { version = "1.38.0", default-features = false } +tokio-stream = { version = "0.1.15", default-features = false } utils = { path = "crates/utils", default-features = false } diff --git a/crates/krun-guest/src/net.rs b/crates/krun-guest/src/net.rs index e4cabd0..629ccd0 100644 --- a/crates/krun-guest/src/net.rs +++ b/crates/krun-guest/src/net.rs @@ -1,5 +1,5 @@ use std::fs; -use std::os::unix::process::ExitStatusExt; +use std::os::unix::process::ExitStatusExt as _; use std::process::Command; use anyhow::{anyhow, Context, Result}; diff --git a/crates/krun-guest/src/pulse.rs b/crates/krun-guest/src/pulse.rs index ed091e7..cf2b97f 100644 --- a/crates/krun-guest/src/pulse.rs +++ b/crates/krun-guest/src/pulse.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; -use std::env; -use std::fs; use std::path::Path; use std::process::{Command, Stdio}; +use std::{env, fs}; use anyhow::{Context, Result}; use utils::env::find_in_path; diff --git a/crates/krun-server/Cargo.toml b/crates/krun-server/Cargo.toml index cf8b3fc..45eb74b 100644 --- a/crates/krun-server/Cargo.toml +++ b/crates/krun-server/Cargo.toml @@ -13,9 +13,10 @@ anyhow = { workspace = true, features = ["std"] } bpaf = { workspace = true, features = [] } env_logger = { workspace = true, features = ["auto-color", "humantime", "unstable-kv"] } log = { workspace = true, features = ["kv"] } -nix = { workspace = true, features = ["socket"] } serde = { workspace = true, features = [] } serde_json = { workspace = true, features = ["std"] } +tokio = { workspace = true, features = ["io-util", "macros", "net", "process", "rt-multi-thread", "sync"] } +tokio-stream = { workspace = true, features = ["net", "sync"] } utils = { workspace = true, features = [] } [features] diff --git a/crates/krun-server/src/bin/krun-server.rs b/crates/krun-server/src/bin/krun-server.rs index a61d34a..e552d0c 100644 --- a/crates/krun-server/src/bin/krun-server.rs +++ b/crates/krun-server/src/bin/krun-server.rs @@ -1,32 +1,94 @@ -use std::net::TcpListener; -use std::os::fd::AsRawFd; -use std::panic; -use std::process::Command; +use std::os::unix::process::ExitStatusExt as _; -use anyhow::{Context, Result}; +use anyhow::Result; use krun_server::cli_options::options; -use krun_server::server::start_server; -use nix::sys::socket::{shutdown, Shutdown}; +use krun_server::server::{start_server, State}; +use log::error; +use tokio::net::TcpListener; +use tokio::process::Command; +use tokio::sync::watch; +use tokio_stream::wrappers::WatchStream; +use tokio_stream::StreamExt as _; -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { env_logger::init(); let options = options().run(); - let listener = TcpListener::bind(format!("0.0.0.0:{}", options.server_port))?; - let listener_fd = listener.as_raw_fd(); + let listener = TcpListener::bind(format!("0.0.0.0:{}", options.server_port)).await?; + let (state_tx, state_rx) = watch::channel(State { + connection_idle: true, + child_processes: 0, + }); - let server_thread = start_server(listener); - - Command::new(&options.command) + let server_handle = tokio::spawn(start_server(listener, state_tx)); + tokio::pin!(server_handle); + let command_status = Command::new(&options.command) .args(options.command_args) - .status() - .with_context(|| format!("Failed to execute command {:?}", options.command))?; + .status(); + tokio::pin!(command_status); + let mut state_rx = WatchStream::new(state_rx); - shutdown(listener_fd, Shutdown::Both)?; - if let Err(err) = server_thread.join() { - panic::resume_unwind(err); - } + let mut server_died = false; + let mut command_exited = false; - Ok(()) + loop { + tokio::select! { + res = &mut server_handle, if !server_died => { + // If an error is received here, accepting connections from the + // TCP listener failed due to non-transient errors and the + // server is giving up and shutting down. + // + // Errors encountered when handling individual connections do + // not bubble up to this point. + if let Err(err) = res { + error!(err:% = err; "server task failed"); + server_died = true; + } + }, + res = &mut command_status, if !command_exited => { + match res { + Ok(status) => { + if !status.success() { + if let Some(code) = status.code() { + eprintln!( + "{:?} process exited with status code: {code}", + options.command + ); + } else { + eprintln!( + "{:?} process terminated by signal: {}", + options.command, + status + .signal() + .expect("either one of status code or signal should be set") + ); + } + } + }, + Err(err) => { + eprintln!( + "Failed to execute {:?} as child process: {err}", + options.command + ); + }, + } + command_exited = true; + }, + Some(state) = state_rx.next(), if command_exited => { + if state.connection_idle && state.child_processes == 0 { + // Server is idle (not currently handling an accepted + // incoming connection) and no more child processes. + // We're done. + return Ok(()); + } + println!( + "Waiting for {} other commands launched through this krun server to exit...", + state.child_processes + ); + println!("Press Ctrl+C to force quit"); + }, + } + } } diff --git a/crates/krun-server/src/server.rs b/crates/krun-server/src/server.rs index 7839003..4ead4e7 100644 --- a/crates/krun-server/src/server.rs +++ b/crates/krun-server/src/server.rs @@ -1,39 +1,131 @@ use std::collections::HashMap; use std::env; -use std::io::{BufRead, BufReader, Write}; -use std::net::{TcpListener, TcpStream}; -use std::process::{Command, Stdio}; -use std::thread::{self, JoinHandle}; +use std::os::unix::process::ExitStatusExt as _; +use std::path::PathBuf; +use std::process::Stdio; -use anyhow::{anyhow, Result}; -use log::debug; -use utils::{launch::Launch, stdio::make_stdout_stderr}; +use anyhow::{anyhow, Context, Result}; +use log::{debug, error}; +use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufStream}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::process::{Child, Command}; +use tokio::sync::watch; +use tokio::task::JoinSet; +use tokio_stream::wrappers::TcpListenerStream; +use tokio_stream::StreamExt as _; +use utils::launch::Launch; +use utils::stdio::make_stdout_stderr; -pub fn start_server(listener: TcpListener) -> JoinHandle<()> { - thread::spawn(move || { - if let Err(err) = work(listener) { - debug!(err:?; "server thread is terminating") - } - }) +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +pub struct State { + pub connection_idle: bool, + pub child_processes: usize, } -fn work(listener: TcpListener) -> Result<()> { - for stream in listener.incoming() { - let stream = stream?; +pub async fn start_server(listener: TcpListener, state_tx: watch::Sender) { + let mut listener_stream = TcpListenerStream::new(listener); + let mut child_set = JoinSet::new(); + + loop { + tokio::select! { + Some(stream) = listener_stream.next() => { + state_tx.send_if_modified(|state| { + let connection_idle = false; + if state.connection_idle == connection_idle { + return false; + } + state.connection_idle = connection_idle; + true + }); + let stream = match stream { + Ok(stream) => stream, + Err(err) => { + eprintln!("Failed to accept incoming connection: {err}"); + state_tx.send_if_modified(|state| { + let connection_idle = true; + if state.connection_idle == connection_idle { + return false; + } + state.connection_idle = connection_idle; + true + }); + continue; + }, + }; + let stream = BufStream::new(stream); - if let Err(err) = handle_connection(stream) { - println!("Error processing client request: {err:?}"); + match handle_connection(stream).await { + Ok((command, mut child)) => { + child_set.spawn(async move { (command, child.wait().await) }); + state_tx.send_if_modified(|state| { + let child_processes = child_set.len(); + if state.child_processes == child_processes { + return false; + } + state.child_processes = child_processes; + true + }); + }, + Err(err) => { + eprintln!("Failed to process client request: {err:?}"); + }, + } + state_tx.send_if_modified(|state| { + let connection_idle = true; + if state.connection_idle == connection_idle { + return false; + } + state.connection_idle = connection_idle; + true + }); + }, + Some(res) = child_set.join_next() => { + match res { + Ok((command, res)) => match res { + Ok(status) => { + debug!(command:?; "child process exited"); + if !status.success() { + if let Some(code) = status.code() { + eprintln!( + "{command:?} process exited with status code: {code}" + ); + } else { + eprintln!( + "{command:?} process terminated by signal: {}", + status + .signal() + .expect( + "either one of status code or signal should be set" + ) + ); + } + } + }, + Err(err) => { + eprintln!("Failed to wait for {command:?} process to exit: {err}"); + }, + }, + Err(err) => { + error!(err:% = err; "child task failed"); + }, + } + state_tx.send_if_modified(|state| { + let child_processes = child_set.len(); + if state.child_processes == child_processes { + return false; + } + state.child_processes = child_processes; + true + }); + }, } } - - Ok(()) } -fn read_request(mut stream: &TcpStream) -> Result { - let mut buf_reader = BufReader::new(&mut stream); +async fn read_request(stream: &mut BufStream) -> Result { let mut buf = String::new(); loop { - if buf_reader.read_line(&mut buf)? == 0 { + if stream.read_line(&mut buf).await? == 0 { return Err(anyhow!("unexpected EOF")); } if buf.contains("EOM") { @@ -43,32 +135,34 @@ fn read_request(mut stream: &TcpStream) -> Result { } } -fn handle_connection(mut stream: TcpStream) -> Result<()> { +async fn handle_connection(mut stream: BufStream) -> Result<(PathBuf, Child)> { let mut envs: HashMap = env::vars().collect(); let Launch { command, command_args, env, - } = read_request(&stream)?; + } = read_request(&mut stream).await?; + debug!(command:?, command_args:?, env:?; "received launch request"); envs.extend(env); let (stdout, stderr) = make_stdout_stderr(&command, &envs)?; - let err = Command::new(&command) + let res = Command::new(&command) .args(command_args) .envs(envs) .stdin(Stdio::null()) .stdout(stdout) .stderr(stderr) - .spawn(); - if let Err(err) = err { - let msg = format!("Failed to execute command {command:?}: {err}"); - stream.write_all(msg.as_bytes()).ok(); + .spawn() + .with_context(|| format!("Failed to execute {command:?} as child process")); + if let Err(err) = &res { + let msg = format!("{err:?}"); + stream.write_all(msg.as_bytes()).await.ok(); } else { - stream.write_all(b"OK").ok(); + stream.write_all(b"OK").await.ok(); } - stream.flush().ok(); + stream.flush().await.ok(); - Ok(()) + res.map(|child| (command, child)) } diff --git a/rustfmt.toml b/rustfmt.toml index 1b26771..abb3c84 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,6 +1,6 @@ edition = "2021" -# empty_item_single_line = false +# empty_item_single_line = true # error_on_line_overflow = true # format_code_in_doc_comments = true # format_strings = true