Skip to content

Commit

Permalink
feat(rs-bindgen): help compiler figure out the lifetimes
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jul 10, 2024
1 parent 3bee000 commit 31f87b6
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 108 deletions.
176 changes: 89 additions & 87 deletions crates/wit-bindgen-rust/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ impl InterfaceGenerator<'_> {
uwrite!(
self.src,
r#"
pub async fn serve_interface<T: {wrpc_transport}::Serve, U>(
wrpc: &T,
pub fn serve_interface<'a, T: {wrpc_transport}::Serve, U>(
wrpc: &'a T,
handler: impl Handler<T::Context> + {resource_traits} ::core::marker::Send + ::core::marker::Sync + ::core::clone::Clone + 'static,
shutdown: impl ::core::future::Future<Output = U>,
) -> {anyhow}::Result<U> {{
let ("#,
) -> impl ::core::future::Future<Output = {anyhow}::Result<U>> + wrpc_transport::Captures<'a> {{
async move {{
let ("#,
anyhow = self.gen.anyhow_path(),
resource_traits = trait_names.join(""),
wrpc_transport = self.gen.wrpc_transport_path()
Expand Down Expand Up @@ -222,39 +223,39 @@ pub async fn serve_interface<T: {wrpc_transport}::Serve, U>(
uwrite!(
self.src,
r#"
async {{
{anyhow}::Context::context({wrpc_transport}::ServeExt::serve_values(
wrpc,
"{instance}",
"{}",
::std::sync::Arc::from("#,
rpc_func_name(func),
anyhow = self.gen.anyhow_path(),
wrpc_transport = self.gen.wrpc_transport_path(),
);
if paths.is_empty() {
self.src.push_str(
r"{
let paths: [::std::boxed::Box<[::core::option::Option<usize>]>; 0] = [];
paths
}",
);
} else {
self.src.push_str("[");
for path in paths {
self.src.push_str("::std::boxed::Box::from(");
self.src.push_str(&path);
self.src.push_str("), ");
}
self.src.push_str("]");
}
uwriteln!(
self.src,
r#")
)
.await,
"failed to serve `{instance}.{}`")
}},"#,
async {{
{anyhow}::Context::context({wrpc_transport}::ServeExt::serve_values(
wrpc,
"{instance}",
"{}",
::std::sync::Arc::from("#,
rpc_func_name(func),
anyhow = self.gen.anyhow_path(),
wrpc_transport = self.gen.wrpc_transport_path(),
);
if paths.is_empty() {
self.src.push_str(
r"{
let paths: [::std::boxed::Box<[::core::option::Option<usize>]>; 0] = [];
paths
}",
);
} else {
self.src.push_str("[");
for path in paths {
self.src.push_str("::std::boxed::Box::from(");
self.src.push_str(&path);
self.src.push_str("), ");
}
self.src.push_str("]");
}
uwriteln!(
self.src,
r#")
)
.await,
"failed to serve `{instance}.{}`")
}},"#,
func.name,
);
}
Expand All @@ -273,25 +274,25 @@ pub async fn serve_interface<T: {wrpc_transport}::Serve, U>(
uwrite!(
self.src,
r"
let mut {name} = ::core::pin::pin!({name});",
let mut {name} = ::core::pin::pin!({name});",
);
}
uwrite!(
self.src,
r"
let mut shutdown = ::core::pin::pin!(shutdown);
loop {{
{tokio}::select! {{",
let mut shutdown = ::core::pin::pin!(shutdown);
loop {{
{tokio}::select! {{",
tokio = self.gen.tokio_path()
);
for func in &funcs_to_export {
let name = to_rust_ident(&func.name);
uwrite!(
self.src,
r"
invocation = {futures}::StreamExt::next(&mut {}_{name}) => {{
match invocation {{
Some(Ok((cx, (",
invocation = {futures}::StreamExt::next(&mut {}_{name}) => {{
match invocation {{
Some(Ok((cx, (",
match func.kind {
FunctionKind::Freestanding => "f",
FunctionKind::Method(_) => "m",
Expand All @@ -306,7 +307,7 @@ pub async fn serve_interface<T: {wrpc_transport}::Serve, U>(
uwrite!(
self.src,
r"
), rx, tx))) => {{",
), rx, tx))) => {{",
);
let (trait_name, name) = match func.kind {
FunctionKind::Freestanding => ("Handler", name),
Expand All @@ -327,9 +328,9 @@ pub async fn serve_interface<T: {wrpc_transport}::Serve, U>(
uwrite!(
self.src,
r#"
let rx = rx.map({tracing}::Instrument::in_current_span).map({tokio}::spawn);
{tracing}::trace!(instance = "{instance}", func = "{wit_name}", "calling handler");
match {trait_name}::{name}(&handler, cx"#,
let rx = rx.map({tracing}::Instrument::in_current_span).map({tokio}::spawn);
{tracing}::trace!(instance = "{instance}", func = "{wit_name}", "calling handler");
match {trait_name}::{name}(&handler, cx"#,
tokio = self.gen.tokio_path(),
tracing = self.gen.tracing_path(),
wit_name = func.name,
Expand All @@ -340,9 +341,9 @@ pub async fn serve_interface<T: {wrpc_transport}::Serve, U>(
uwrite!(
self.src,
r#"
).await {{
Ok(results) => {{
match tx("#,
).await {{
Ok(results) => {{
match tx("#,
);
if func.results.len() == 1 {
// wrap single-element results into a tuple for correct indexing
Expand All @@ -353,44 +354,44 @@ pub async fn serve_interface<T: {wrpc_transport}::Serve, U>(
uwrite!(
self.src,
r#"
).await {{
Ok(()) => {{
if let Some(rx) = rx {{
{tracing}::trace!(instance = "{instance}", func = "{wit_name}", "receiving async invocation parameters");
if let Err(err) = {anyhow}::Context::context(
rx.await,
"receipt of async `.{wit_name}` invocation parameters failed",
)? {{
{tracing}::warn!(?err, instance = "{instance}", func = "{wit_name}", "failed to receive async invocation parameters");
).await {{
Ok(()) => {{
if let Some(rx) = rx {{
{tracing}::trace!(instance = "{instance}", func = "{wit_name}", "receiving async invocation parameters");
if let Err(err) = {anyhow}::Context::context(
rx.await,
"receipt of async `.{wit_name}` invocation parameters failed",
)? {{
{tracing}::warn!(?err, instance = "{instance}", func = "{wit_name}", "failed to receive async invocation parameters");
}}
}}
continue;
}}
continue;
}}
Err(err) => {{
if let Some(rx) = rx {{
rx.abort();
Err(err) => {{
if let Some(rx) = rx {{
rx.abort();
}}
{tracing}::warn!(?err, instance = "{instance}", func = "{wit_name}", "failed to transmit invocation results");
}}
{tracing}::warn!(?err, instance = "{instance}", func = "{wit_name}", "failed to transmit invocation results");
}}
}},
Err(err) => {{
if let Some(rx) = rx {{
rx.abort();
}}
{tracing}::warn!(?err, instance = "{instance}", func = "{wit_name}", "failed to serve invocation");
}}
}},
Err(err) => {{
if let Some(rx) = rx {{
rx.abort();
}}
{tracing}::warn!(?err, instance = "{instance}", func = "{wit_name}", "failed to serve invocation");
}}
}}
}},
Some(Err(err)) => {{
{tracing}::error!(?err, instance = "{instance}", func = "{wit_name}", "failed to accept invocation");
}},
None => {{
{tracing}::warn!(instance = "{instance}", func = "{wit_name}", "invocation stream unexpectedly finished");
{anyhow}::bail!("`{instance}.{wit_name}` stream unexpectedly finished")
}},
}}
}},"#,
}},
Some(Err(err)) => {{
{tracing}::error!(?err, instance = "{instance}", func = "{wit_name}", "failed to accept invocation");
}},
None => {{
{tracing}::warn!(instance = "{instance}", func = "{wit_name}", "invocation stream unexpectedly finished");
{anyhow}::bail!("`{instance}.{wit_name}` stream unexpectedly finished")
}},
}}
}},"#,
anyhow = self.gen.anyhow_path(),
tracing = self.gen.tracing_path(),
wit_name = func.name,
Expand All @@ -400,10 +401,11 @@ pub async fn serve_interface<T: {wrpc_transport}::Serve, U>(
uwriteln!(
self.src,
r#"
v = &mut shutdown => {{
{tracing}::debug!("shutdown received");
return Ok(v)
}},
v = &mut shutdown => {{
{tracing}::debug!("shutdown received");
return Ok(v)
}},
}}
}}
}}
}}"#,
Expand Down
36 changes: 19 additions & 17 deletions crates/wit-bindgen-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,28 +374,30 @@ with: {{\n\t{with_name:?}: generate\n}}
uwriteln!(
self.src,
r#"
pub async fn serve<T: {wrpc_transport}::Serve>(
wrpc: &T,
pub fn serve<'a, T: {wrpc_transport}::Serve>(
wrpc: &'a T,
handler: impl {bound} + ::core::marker::Send + ::core::marker::Sync + ::core::clone::Clone + 'static,
shutdown: impl ::core::future::Future<Output = ()>,
) -> {anyhow}::Result<()> {{
) -> impl ::core::future::Future<Output = {anyhow}::Result<()>> + {wrpc_transport}::Captures<'a> {{
use {futures}::FutureExt as _;
let shutdown = shutdown.shared();
{tokio}::try_join!("#
);
for path in &self.export_paths {
if !path.is_empty() {
self.src.push_str(path);
self.src.push_str("::");
async move {{
{tokio}::try_join!("#
);
for path in &self.export_paths {
if !path.is_empty() {
self.src.push_str(path);
self.src.push_str("::");
}
self.src
.push_str("serve_interface(wrpc, handler.clone(), shutdown.clone()),");
}
self.src
.push_str("serve_interface(wrpc, handler.clone(), shutdown.clone()),");
}
uwriteln!(
self.src,
r#"
)?;
Ok(())
uwriteln!(
self.src,
r#"
)?;
Ok(())
}}
}}"#
);
}
Expand Down
12 changes: 8 additions & 4 deletions tests/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn assert_bindgen<C, I, S>(clt: Arc<I>, srv: Arc<S>) -> anyhow::Result<()>
where
C: Send + Sync + Default,
I: wrpc::Invoke<Context = C> + 'static,
S: wrpc::Serve<Context = C>,
S: wrpc::Serve<Context = C> + Send + 'static,
{
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let shutdown_rx = async move { shutdown_rx.await.expect("shutdown sender dropped") }.shared();
Expand Down Expand Up @@ -268,9 +268,13 @@ where
}
}

serve(srv.as_ref(), Component::default(), shutdown_rx.clone())
.await
.context("failed to serve `wrpc-test:integration/test`")
tokio::spawn({
let srv = Arc::clone(&srv);
let shutdown_rx = shutdown_rx.clone();
async move { serve(srv.as_ref(), Component::default(), shutdown_rx).await }
})
.await?
.context("failed to serve `wrpc-test:integration/test`")
},
async {
wrpc::generate!({
Expand Down

0 comments on commit 31f87b6

Please sign in to comment.