-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Make krun-server wait for child processes
Continue to accept and handle incoming connections, until the server is idle and all child processes have exited. Signed-off-by: Teoh Han Hui <[email protected]>
- Loading branch information
1 parent
d0179f3
commit 3c6f941
Showing
8 changed files
with
488 additions
and
83 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,94 @@ | ||
use std::net::TcpListener; | ||
use std::os::fd::AsRawFd; | ||
use std::panic; | ||
use std::process::Command; | ||
use std::os::unix::process::ExitStatusExt as _; | ||
|
||
use anyhow::{Context, Result}; | ||
use anyhow::Result; | ||
use krun_server::cli_options::options; | ||
use krun_server::server::start_server; | ||
use nix::sys::socket::{shutdown, Shutdown}; | ||
use krun_server::server::{start_server, State}; | ||
use log::error; | ||
use tokio::net::TcpListener; | ||
use tokio::process::Command; | ||
use tokio::sync::watch; | ||
use tokio_stream::wrappers::WatchStream; | ||
use tokio_stream::StreamExt as _; | ||
|
||
fn main() -> Result<()> { | ||
#[tokio::main] | ||
async fn main() -> Result<()> { | ||
env_logger::init(); | ||
|
||
let options = options().run(); | ||
|
||
let listener = TcpListener::bind(format!("0.0.0.0:{}", options.server_port))?; | ||
let listener_fd = listener.as_raw_fd(); | ||
let listener = TcpListener::bind(format!("0.0.0.0:{}", options.server_port)).await?; | ||
let (state_tx, state_rx) = watch::channel(State { | ||
connection_idle: true, | ||
child_processes: 0, | ||
}); | ||
|
||
let server_thread = start_server(listener); | ||
|
||
Command::new(&options.command) | ||
let server_handle = tokio::spawn(start_server(listener, state_tx)); | ||
tokio::pin!(server_handle); | ||
let command_status = Command::new(&options.command) | ||
.args(options.command_args) | ||
.status() | ||
.with_context(|| format!("Failed to execute command {:?}", options.command))?; | ||
.status(); | ||
tokio::pin!(command_status); | ||
let mut state_rx = WatchStream::new(state_rx); | ||
|
||
shutdown(listener_fd, Shutdown::Both)?; | ||
if let Err(err) = server_thread.join() { | ||
panic::resume_unwind(err); | ||
} | ||
let mut server_died = false; | ||
let mut command_exited = false; | ||
|
||
Ok(()) | ||
loop { | ||
tokio::select! { | ||
res = &mut server_handle, if !server_died => { | ||
// If an error is received here, accepting connections from the | ||
// TCP listener failed due to non-transient errors and the | ||
// server is giving up and shutting down. | ||
// | ||
// Errors encountered when handling individual connections do | ||
// not bubble up to this point. | ||
if let Err(err) = res { | ||
error!(err:% = err; "server task failed"); | ||
server_died = true; | ||
} | ||
}, | ||
res = &mut command_status, if !command_exited => { | ||
match res { | ||
Ok(status) => { | ||
if !status.success() { | ||
if let Some(code) = status.code() { | ||
eprintln!( | ||
"{:?} process exited with status code: {code}", | ||
options.command | ||
); | ||
} else { | ||
eprintln!( | ||
"{:?} process terminated by signal: {}", | ||
options.command, | ||
status | ||
.signal() | ||
.expect("either one of status code or signal should be set") | ||
); | ||
} | ||
} | ||
}, | ||
Err(err) => { | ||
eprintln!( | ||
"Failed to execute {:?} as child process: {err}", | ||
options.command | ||
); | ||
}, | ||
} | ||
command_exited = true; | ||
}, | ||
Some(state) = state_rx.next(), if command_exited => { | ||
if state.connection_idle && state.child_processes == 0 { | ||
// Server is idle (not currently handling an accepted | ||
// incoming connection) and no more child processes. | ||
// We're done. | ||
return Ok(()); | ||
} | ||
println!( | ||
"Waiting for {} other commands launched through this krun server to exit...", | ||
state.child_processes | ||
); | ||
println!("Press Ctrl+C to force quit"); | ||
}, | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters