Skip to content

Commit

Permalink
fix(rt-wasmtime): take Arc<Mutex<Store<T>>> in serve
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jul 8, 2024
1 parent d5d06fa commit a5daa65
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/runtime-wasmtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-runtime-wasmtime"
version = "0.17.3"
version = "0.17.4"
description = "wRPC wasmtime integration"

authors.workspace = true
Expand Down
44 changes: 36 additions & 8 deletions crates/runtime-wasmtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::{Stream, TryStreamExt as _};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
use tokio::sync::Mutex;
use tokio::try_join;
use tokio_util::codec::{Encoder, FramedRead};
use tokio_util::compat::FuturesAsyncReadCompatExt as _;
Expand Down Expand Up @@ -991,7 +992,7 @@ where
pub trait ServeExt: wrpc_transport::Serve {
fn serve_function<T>(
&self,
store: impl AsContextMut<Data = T> + Send + Clone + 'static,
store: impl Into<Arc<Mutex<wasmtime::Store<T>>>>,
func: Func,
instance: &str,
name: &str,
Expand All @@ -1001,26 +1002,32 @@ pub trait ServeExt: wrpc_transport::Serve {
where
T: WasiView + 'static,
{
let store = store.into();
async move {
let params_ty: Arc<[_]> = Arc::from(func.params(&store));
let results_ty: Arc<[_]> = Arc::from(func.results(&store));
let (params_ty, results_ty): (Arc<[_]>, Arc<[_]>) = {
let store = store.lock().await;
(
Arc::from(func.params(&*store)),
Arc::from(func.results(&*store)),
)
};
// TODO: set paths
let invocations = self.serve(instance, name, []).await?;

Ok(invocations.and_then(move |(cx, mut tx, rx)| {
let mut store = store.clone();
let store = store.clone();
let params_ty = Arc::clone(&params_ty);
let results_ty = Arc::clone(&results_ty);
async move {
let mut store = store.lock().await;
let mut params = vec![Val::Bool(false); params_ty.len()];
let mut rx = pin!(rx);
for (i, (v, ty)) in zip(&mut params, params_ty.iter()).enumerate() {
read_value(&mut store, &mut rx, v, ty, &[i])
read_value(&mut *store, &mut rx, v, ty, &[i])
.await
.with_context(|| format!("failed to decode parameter value {i}"))?;
}
let mut results = vec![Val::Bool(false); results_ty.len()];
func.call_async(&mut store, &params, &mut results)
func.call_async(&mut *store, &params, &mut results)
.await
.context("failed to call function")?;
let mut buf = BytesMut::default();
Expand All @@ -1047,7 +1054,7 @@ pub trait ServeExt: wrpc_transport::Serve {
}),
)
.await?;
func.post_return_async(&mut store)
func.post_return_async(&mut *store)
.await
.context("failed to perform post-return cleanup")?;
Ok(cx)
Expand All @@ -1058,3 +1065,24 @@ pub trait ServeExt: wrpc_transport::Serve {
}

impl<T: wrpc_transport::Serve> ServeExt for T {}

#[cfg(test)]
mod tests {
use tokio::sync::Mutex;

use super::*;

#[allow(unused)]
async fn serve_function<T: WasiView + 'static>(
srv: impl wrpc_transport::Serve,
store: wasmtime::Store<T>,
func: Func,
) {
srv.serve_function(Mutex::new(store), func, "foo", "bar")
.await
.unwrap()
.try_for_each(|_| async { Ok(()) })
.await
.unwrap()
}
}

0 comments on commit a5daa65

Please sign in to comment.