Skip to content

Commit

Permalink
feat(rt-wasmtime): always propagate serving context
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jul 8, 2024
1 parent 6c6f71c commit 34810a4
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ wit-component = { version = "0.212", default-features = false }
wit-parser = { version = "0.212", default-features = false }
wrpc-cli = { version = "0.2", path = "./crates/cli", default-features = false }
wrpc-introspect = { version = "0.1", default-features = false, path = "./crates/introspect" }
wrpc-runtime-wasmtime = { version = "0.17", path = "./crates/runtime-wasmtime", default-features = false }
wrpc-runtime-wasmtime = { version = "0.18", path = "./crates/runtime-wasmtime", default-features = false }
wrpc-transport = { version = "0.26", path = "./crates/transport", default-features = false }
wrpc-transport-nats = { version = "0.22", path = "./crates/transport-nats", default-features = false }
wrpc-transport-quic = { version = "0.1", path = "./crates/transport-quic", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion crates/runtime-wasmtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-runtime-wasmtime"
version = "0.17.6"
version = "0.18.0"
description = "wRPC wasmtime integration"

authors.workspace = true
Expand Down
102 changes: 59 additions & 43 deletions crates/runtime-wasmtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,9 @@ pub trait ServeExt: wrpc_transport::Serve {
instance: &str,
name: &str,
) -> impl Future<
Output = anyhow::Result<impl Stream<Item = anyhow::Result<Self::Context>> + Send + 'static>,
Output = anyhow::Result<
impl Stream<Item = anyhow::Result<(Self::Context, anyhow::Result<()>)>> + Send + 'static,
>,
> + Send
where
T: WasiView + 'static,
Expand All @@ -1019,46 +1021,55 @@ pub trait ServeExt: wrpc_transport::Serve {
let params_ty = Arc::clone(&params_ty);
let results_ty = Arc::clone(&results_ty);
async move {
let mut store = store.lock().await;
let mut params = vec![Val::Bool(false); params_ty.len()];
let mut rx = pin!(rx);
for (i, (v, ty)) in zip(&mut params, params_ty.iter()).enumerate() {
read_value(&mut *store, &mut rx, v, ty, &[i])
.await
.with_context(|| format!("failed to decode parameter value {i}"))?;
}
let mut results = vec![Val::Bool(false); results_ty.len()];
func.call_async(&mut *store, &params, &mut results)
.await
.context("failed to call function")?;
let mut buf = BytesMut::default();
let mut deferred = vec![];
for (i, (ref v, ty)) in zip(results, results_ty.iter()).enumerate() {
let mut enc = ValEncoder::new(store.as_context_mut(), ty);
enc.encode(v, &mut buf)
.with_context(|| format!("failed to encode result value {i}"))?;
deferred.push(enc.deferred);
}
debug!("transmitting results");
tx.write_all(&buf)
.await
.context("failed to transmit results")?;
tx.shutdown()
.await
.context("failed to shutdown outgoing stream")?;
try_join_all(
zip(0.., deferred)
.filter_map(|(i, f)| f.map(|f| (tx.index(&[i]), f)))
.map(|(w, f)| async move {
let w = w?;
f(w).await
}),
)
.await?;
func.post_return_async(&mut *store)
.await
.context("failed to perform post-return cleanup")?;
Ok(cx)
Ok((
cx,
async move {
let mut store = store.lock().await;
let mut params = vec![Val::Bool(false); params_ty.len()];
let mut rx = pin!(rx);
for (i, (v, ty)) in zip(&mut params, params_ty.iter()).enumerate() {
read_value(&mut *store, &mut rx, v, ty, &[i])
.await
.with_context(|| {
format!("failed to decode parameter value {i}")
})?;
}
let mut results = vec![Val::Bool(false); results_ty.len()];
func.call_async(&mut *store, &params, &mut results)
.await
.context("failed to call function")?;
let mut buf = BytesMut::default();
let mut deferred = vec![];
for (i, (ref v, ty)) in zip(results, results_ty.iter()).enumerate() {
let mut enc = ValEncoder::new(store.as_context_mut(), ty);
enc.encode(v, &mut buf).with_context(|| {
format!("failed to encode result value {i}")
})?;
deferred.push(enc.deferred);
}
debug!("transmitting results");
tx.write_all(&buf)
.await
.context("failed to transmit results")?;
tx.shutdown()
.await
.context("failed to shutdown outgoing stream")?;
try_join_all(
zip(0.., deferred)
.filter_map(|(i, f)| f.map(|f| (tx.index(&[i]), f)))
.map(|(w, f)| async move {
let w = w?;
f(w).await
}),
)
.await?;
func.post_return_async(&mut *store)
.await
.context("failed to perform post-return cleanup")?;
Ok(())
}
.await,
))
}
}))
}
Expand All @@ -1074,7 +1085,10 @@ pub trait ServeExt: wrpc_transport::Serve {
name: &str,
) -> impl Future<
Output = anyhow::Result<
impl Stream<Item = anyhow::Result<Self::Context>> + Unpin + Send + 'static,
impl Stream<Item = anyhow::Result<(Self::Context, anyhow::Result<()>)>>
+ Unpin
+ Send
+ 'static,
>,
> + Send
where
Expand Down Expand Up @@ -1173,7 +1187,9 @@ pub trait ServeExt: wrpc_transport::Serve {
instance: InstancePre<T>,
mut store: wasmtime::Store<T>,
) -> impl Future<
Output = anyhow::Result<impl Stream<Item = anyhow::Result<Self::Context>> + 'static>,
Output = anyhow::Result<
impl Stream<Item = anyhow::Result<(Self::Context, anyhow::Result<()>)>> + 'static,
>,
> + Send
where
T: WasiView + 'static,
Expand Down

0 comments on commit 34810a4

Please sign in to comment.