From 34810a4db7a69539e17fe68ae4c0022a57f66e0e Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Mon, 8 Jul 2024 21:01:31 +0200 Subject: [PATCH] feat(rt-wasmtime): always propagate serving context Signed-off-by: Roman Volosatovs --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/runtime-wasmtime/Cargo.toml | 2 +- crates/runtime-wasmtime/src/lib.rs | 102 +++++++++++++++++------------ 4 files changed, 62 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9def2e02..df332b5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3723,7 +3723,7 @@ dependencies = [ [[package]] name = "wrpc-runtime-wasmtime" -version = "0.17.6" +version = "0.18.0" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 0fb976f5..fb0b83e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,7 +136,7 @@ wit-component = { version = "0.212", default-features = false } wit-parser = { version = "0.212", default-features = false } wrpc-cli = { version = "0.2", path = "./crates/cli", default-features = false } wrpc-introspect = { version = "0.1", default-features = false, path = "./crates/introspect" } -wrpc-runtime-wasmtime = { version = "0.17", path = "./crates/runtime-wasmtime", default-features = false } +wrpc-runtime-wasmtime = { version = "0.18", path = "./crates/runtime-wasmtime", default-features = false } wrpc-transport = { version = "0.26", path = "./crates/transport", default-features = false } wrpc-transport-nats = { version = "0.22", path = "./crates/transport-nats", default-features = false } wrpc-transport-quic = { version = "0.1", path = "./crates/transport-quic", default-features = false } diff --git a/crates/runtime-wasmtime/Cargo.toml b/crates/runtime-wasmtime/Cargo.toml index 349865c1..0c254d39 100644 --- a/crates/runtime-wasmtime/Cargo.toml +++ b/crates/runtime-wasmtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wrpc-runtime-wasmtime" -version = "0.17.6" +version = "0.18.0" description = "wRPC wasmtime integration" authors.workspace = true diff --git a/crates/runtime-wasmtime/src/lib.rs b/crates/runtime-wasmtime/src/lib.rs index 9777a9a4..fc7de5f8 100644 --- a/crates/runtime-wasmtime/src/lib.rs +++ b/crates/runtime-wasmtime/src/lib.rs @@ -998,7 +998,9 @@ pub trait ServeExt: wrpc_transport::Serve { instance: &str, name: &str, ) -> impl Future< - Output = anyhow::Result> + Send + 'static>, + Output = anyhow::Result< + impl Stream)>> + Send + 'static, + >, > + Send where T: WasiView + 'static, @@ -1019,46 +1021,55 @@ pub trait ServeExt: wrpc_transport::Serve { let params_ty = Arc::clone(¶ms_ty); let results_ty = Arc::clone(&results_ty); async move { - let mut store = store.lock().await; - let mut params = vec![Val::Bool(false); params_ty.len()]; - let mut rx = pin!(rx); - for (i, (v, ty)) in zip(&mut params, params_ty.iter()).enumerate() { - read_value(&mut *store, &mut rx, v, ty, &[i]) - .await - .with_context(|| format!("failed to decode parameter value {i}"))?; - } - let mut results = vec![Val::Bool(false); results_ty.len()]; - func.call_async(&mut *store, ¶ms, &mut results) - .await - .context("failed to call function")?; - let mut buf = BytesMut::default(); - let mut deferred = vec![]; - for (i, (ref v, ty)) in zip(results, results_ty.iter()).enumerate() { - let mut enc = ValEncoder::new(store.as_context_mut(), ty); - enc.encode(v, &mut buf) - .with_context(|| format!("failed to encode result value {i}"))?; - deferred.push(enc.deferred); - } - debug!("transmitting results"); - tx.write_all(&buf) - .await - .context("failed to transmit results")?; - tx.shutdown() - .await - .context("failed to shutdown outgoing stream")?; - try_join_all( - zip(0.., deferred) - .filter_map(|(i, f)| f.map(|f| (tx.index(&[i]), f))) - .map(|(w, f)| async move { - let w = w?; - f(w).await - }), - ) - .await?; - func.post_return_async(&mut *store) - .await - .context("failed to perform post-return cleanup")?; - Ok(cx) + Ok(( + cx, + async move { + let mut store = store.lock().await; + let mut params = vec![Val::Bool(false); params_ty.len()]; + let mut rx = pin!(rx); + for (i, (v, ty)) in zip(&mut params, params_ty.iter()).enumerate() { + read_value(&mut *store, &mut rx, v, ty, &[i]) + .await + .with_context(|| { + format!("failed to decode parameter value {i}") + })?; + } + let mut results = vec![Val::Bool(false); results_ty.len()]; + func.call_async(&mut *store, ¶ms, &mut results) + .await + .context("failed to call function")?; + let mut buf = BytesMut::default(); + let mut deferred = vec![]; + for (i, (ref v, ty)) in zip(results, results_ty.iter()).enumerate() { + let mut enc = ValEncoder::new(store.as_context_mut(), ty); + enc.encode(v, &mut buf).with_context(|| { + format!("failed to encode result value {i}") + })?; + deferred.push(enc.deferred); + } + debug!("transmitting results"); + tx.write_all(&buf) + .await + .context("failed to transmit results")?; + tx.shutdown() + .await + .context("failed to shutdown outgoing stream")?; + try_join_all( + zip(0.., deferred) + .filter_map(|(i, f)| f.map(|f| (tx.index(&[i]), f))) + .map(|(w, f)| async move { + let w = w?; + f(w).await + }), + ) + .await?; + func.post_return_async(&mut *store) + .await + .context("failed to perform post-return cleanup")?; + Ok(()) + } + .await, + )) } })) } @@ -1074,7 +1085,10 @@ pub trait ServeExt: wrpc_transport::Serve { name: &str, ) -> impl Future< Output = anyhow::Result< - impl Stream> + Unpin + Send + 'static, + impl Stream)>> + + Unpin + + Send + + 'static, >, > + Send where @@ -1173,7 +1187,9 @@ pub trait ServeExt: wrpc_transport::Serve { instance: InstancePre, mut store: wasmtime::Store, ) -> impl Future< - Output = anyhow::Result> + 'static>, + Output = anyhow::Result< + impl Stream)>> + 'static, + >, > + Send where T: WasiView + 'static,