Struggling with server shutdown #493
-
I don't really understand when the `RpcSystem` future completes. If I slightly modify the hello_world example to support exactly one connection (i.e. by removing the loop and calling `.await` on the spawned `rpc_system` task), I get:
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 3 {
println!("usage: {} server ADDRESS[:PORT]", args[0]);
return Ok(());
}
let addr = args[2]
.to_socket_addrs()?
.next()
.expect("could not parse address");
// In my case, I want to share the server between multiple connections so I moved it here
let hello_world_client: hello_world::Client = capnp_rpc::new_client(HelloWorldImpl);
// I create a `local_set` object here for later
let local_set = tokio::task::LocalSet::new();
local_set.spawn_local(async move {
let listener = tokio::net::TcpListener::bind(&addr).await?;
// Note that I removed the loop here
{
let (stream, _) = listener.accept().await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let network = twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let rpc_system =
RpcSystem::new(Box::new(network), Some(hello_world_client.clone().client));
tokio::task::spawn_local(rpc_system).await;
}
Ok::<(), Box<dyn std::error::Error>>(())
});
// This awaits until all the tasks are done
local_set.await;
Ok(())
}
This does what I expect: the server responds to the first client and shuts down gracefully. Now if I add a client task to the same `LocalSet`, so that the client and the server run in the same process:
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 3 {
println!("usage: {} server ADDRESS[:PORT]", args[0]);
return Ok(());
}
let addr = args[2]
.to_socket_addrs()?
.next()
.expect("could not parse address");
let hello_world_client: hello_world::Client = capnp_rpc::new_client(HelloWorldImpl);
let local_set = tokio::task::LocalSet::new();
// This is the server_task, like above
local_set.spawn_local(async move {
let listener = tokio::net::TcpListener::bind(&addr).await?;
{
let (stream, _) = listener.accept().await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let network = twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let rpc_system =
RpcSystem::new(Box::new(network), Some(hello_world_client.clone().client));
tokio::task::spawn_local(rpc_system).await;
}
Ok::<(), Box<dyn std::error::Error>>(())
});
// This is the client_task, imported from `client.rs`
local_set.spawn_local(async move {
let stream = tokio::net::TcpStream::connect(&addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let hello_world: hello_world::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc_system);
let mut request = hello_world.say_hello_request();
request.get().init_request().set_name("blah");
let reply = request.send().promise.await?;
println!(
"received: {}",
reply.get()?.get_reply()?.get_message()?.to_str()?
);
Ok::<(), Box<dyn std::error::Error>>(())
});
// This never finishes (?!)
local_set.await;
Ok(())
}
This time, `local_set.await` never finishes. Why is that? What am I missing?
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
When the client and server are in separate processes, the client process terminates and closes the connection, causing the server process to also shut down. When they are in the same process, you need to trigger the disconnect yourself. You can do so using the `RpcSystem::get_disconnector()` method (capnproto-rust/capnp-rpc/src/lib.rs, lines 295 to 299 in 17a4bec). This works:
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 3 {
println!("usage: {} server ADDRESS[:PORT]", args[0]);
return Ok(());
}
let addr = args[2]
.to_socket_addrs()?
.next()
.expect("could not parse address");
let hello_world_client: hello_world::Client = capnp_rpc::new_client(HelloWorldImpl);
let local_set = tokio::task::LocalSet::new();
// This is the server_task, like above
local_set.spawn_local(async move {
let listener = tokio::net::TcpListener::bind(&addr).await?;
{
let (stream, _) = listener.accept().await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let network = twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let rpc_system =
RpcSystem::new(Box::new(network), Some(hello_world_client.clone().client));
tokio::task::spawn_local(rpc_system).await;
}
Ok::<(), Box<dyn std::error::Error>>(())
});
// This is the client_task, imported from `client.rs`
local_set.spawn_local(async move {
let stream = tokio::net::TcpStream::connect(&addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let hello_world: hello_world::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
let disconnector = rpc_system.get_disconnector();
tokio::task::spawn_local(rpc_system);
let mut request = hello_world.say_hello_request();
request.get().init_request().set_name("blah");
let reply = request.send().promise.await?;
println!(
"received: {}",
reply.get()?.get_reply()?.get_message()?.to_str()?
);
disconnector.await?;
Ok::<(), Box<dyn std::error::Error>>(())
});
// This finishes.
local_set.await;
Ok(())
}
Beta Was this translation helpful? Give feedback.
When the client and server are in separate processes, the client process terminates and closes the connection, causing the server process to also shut down. When they are in the same process, you need to trigger the disconnect yourself. You can do so using the
`RpcSystem::get_disconnector()`
method: capnproto-rust/capnp-rpc/src/lib.rs,
lines 295 to 299 in 17a4bec