Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement a shutdown signal listener function #153

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 94 additions & 45 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,66 +77,73 @@ pub struct Server {
impl Server {
async fn main_loop(&self) -> ShutdownType {
// waiting for exit signal
// TODO: there should be a signal handling function
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
tokio::select! {
_ = fast_shutdown_signal.recv() => {
let shutdown_signal = wait_for_shutdown_signal().await;
match shutdown_signal {
ShutdownSignal::Fast => {
info!("SIGINT received, exiting");
ShutdownType::Quick
},
_ = graceful_terminate_signal.recv() => {
}
ShutdownSignal::GracefulTerminate => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
}
_ = graceful_upgrade_signal.recv() => {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(
self.configuration.as_ref().upgrade_sock.as_str())
{
Ok(_) => {info!("listener sockets sent");},
Err(e) => {
error!("Unable to send listener sockets to new process: {e}");
// sentry log error on fd send failure
#[cfg(not(debug_assertions))]
sentry::capture_error(&e);
}
}
sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
info!("Broadcasting graceful shutdown");
// gracefully exiting
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
return ShutdownType::Graceful;
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
} else {
info!("No socks to send, shutting down.");
ShutdownType::Graceful
ShutdownSignal::GracefulUpgrade => {
let mut wait_for_sig_int = unix::signal(unix::SignalKind::interrupt())
.expect("Failed to create SIGINT listener.");
tokio::select! {
_ = wait_for_sig_int.recv() => {}
_ = self.graceful_upgrade() => {}
}
},
ShutdownType::Graceful
}
}
}

async fn graceful_upgrade(&self) {
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
Ok(_) => {
info!("listener sockets sent");
}
Err(e) => {
error!("Unable to send listener sockets to new process: {e}");
// sentry log error on fd send failure
#[cfg(not(debug_assertions))]
sentry::capture_error(&e);
}
}
sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
info!("Broadcasting graceful shutdown");
// gracefully exiting
match self.shutdown_watch.send(true) {
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
}
}
info!("Broadcast graceful shutdown complete");
} else {
info!("No socks to send, shutting down.");
}
}

Expand Down Expand Up @@ -339,3 +346,45 @@ impl Server {
}
}
}

enum ShutdownSignal {
Fast,
GracefulTerminate,
GracefulUpgrade,
}

async fn wait_for_shutdown_signal() -> ShutdownSignal {
let sig_int = async {
tokio::signal::ctrl_c()
.await
.expect("Failed to create SIGINT listener.");
};

#[cfg(unix)]
let sig_term = async {
unix::signal(unix::SignalKind::terminate())
.expect("Failed to create SIGTERM listener.")
.recv()
.await;
};

#[cfg(unix)]
let sig_quit = async {
unix::signal(unix::SignalKind::quit())
.expect("Failed to create SIGQUIT listener.")
.recv()
.await;
};

#[cfg(not(unix))]
let sig_term = std::future::pending::<()>();

#[cfg(not(unix))]
let sig_quit = std::future::pending::<()>();

tokio::select! {
_ = sig_int => ShutdownSignal::Fast,
_ = sig_term => ShutdownSignal::GracefulTerminate,
_ = sig_quit => ShutdownSignal::GracefulUpgrade,
}
}