Skip to content

Commit

Permalink
add Graceful Shutdown support to Server
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Sep 18, 2018
1 parent 11f95e6 commit 76c3f7d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bitflags = "1"
bytes = "0.4.8"
futures = "0.1"
http = "0.1"
hyper = "0.12.6"
hyper = "0.12.10"
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0-alpha.6"
Expand Down
80 changes: 67 additions & 13 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@ pub struct Server<S> {
service: S,
}

// Getting all various generic bounds to make this a re-usable method is
// very complicated, so instead this is just a macro.
macro_rules! bind_inner {
($this:ident, $addr:expr) => ({
let inner = Arc::new($this.service.into_warp_service());
let service = move || {
let inner = inner.clone();
service_fn(move |req| {
ReplyFuture {
inner: inner.call(req)
}
})
};
let srv = HyperServer::bind(&$addr.into())
.http1_pipeline_flush($this.pipeline)
.serve(service);
let addr = srv.local_addr();
(addr, srv)
});
}

impl<S> Server<S>
where
S: IntoWarpService + 'static,
Expand Down Expand Up @@ -55,22 +76,55 @@ where
/// Returns the bound address and a `Future` that can be executed on
/// any runtime.
pub fn bind_ephemeral(self, addr: impl Into<SocketAddr> + 'static) -> (SocketAddr, impl Future<Item=(), Error=()> + 'static) {
let inner = Arc::new(self.service.into_warp_service());
let service = move || {
let inner = inner.clone();
service_fn(move |req| {
ReplyFuture {
inner: inner.call(req)
}
})
};
let srv = HyperServer::bind(&addr.into())
.http1_pipeline_flush(self.pipeline)
.serve(service);
let addr = srv.local_addr();
let (addr, srv) = bind_inner!(self, addr);
(addr, srv.map_err(|e| error!("server error: {}", e)))
}

/// Create a server with graceful shutdown signal.
///
/// When the signal completes, the server will start the graceful shutdown
/// process.
///
/// Returns the bound address and a `Future` that can be executed on
/// any runtime.
///
/// # Example
///
/// ```no_run
/// extern crate futures;
/// extern crate warp;
///
/// use futures::sync::oneshot;
/// use warp::Filter;
///
/// # fn main() {
/// let routes = warp::any()
/// .map(|| "Hello, World!");
///
/// let (tx, rx) = oneshot::channel();
///
/// let (addr, server) = warp::serve(routes)
/// .bind_with_graceful_shutdown(([127, 0, 0, 1], 3030), rx);
///
/// // Spawn the server into a runtime
/// warp::spawn(server);
///
/// // Later, start the shutdown...
/// let _ = tx.send(());
/// # }
/// ```
pub fn bind_with_graceful_shutdown(
self,
addr: impl Into<SocketAddr> + 'static,
signal: impl Future<Item=()> + Send + 'static,
) -> (SocketAddr, impl Future<Item=(), Error=()> + 'static) {
let (addr, srv) = bind_inner!(self, addr);
let fut = srv
.with_graceful_shutdown(signal)
.map_err(|e| error!("server error: {}", e));
(addr, fut)
}

// Generally shouldn't be used, as it can slow down non-pipelined responses.
//
// It's only real use is to make silly pipeline benchmarks look better.
Expand Down

0 comments on commit 76c3f7d

Please sign in to comment.