diff --git a/Cargo.lock b/Cargo.lock index df332b5f..70ca9fcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3723,7 +3723,7 @@ dependencies = [ [[package]] name = "wrpc-runtime-wasmtime" -version = "0.18.0" +version = "0.18.2" dependencies = [ "anyhow", "bytes", diff --git a/crates/runtime-wasmtime/Cargo.toml b/crates/runtime-wasmtime/Cargo.toml index 0c254d39..cf3d59d8 100644 --- a/crates/runtime-wasmtime/Cargo.toml +++ b/crates/runtime-wasmtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wrpc-runtime-wasmtime" -version = "0.18.0" +version = "0.18.2" description = "wRPC wasmtime integration" authors.workspace = true diff --git a/crates/runtime-wasmtime/src/lib.rs b/crates/runtime-wasmtime/src/lib.rs index fc7de5f8..0cbac1a1 100644 --- a/crates/runtime-wasmtime/src/lib.rs +++ b/crates/runtime-wasmtime/src/lib.rs @@ -14,7 +14,6 @@ use futures::future::try_join_all; use futures::stream::{self, FuturesUnordered, SelectAll}; use futures::{Stream, TryStreamExt as _}; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; -use tokio::sync::Mutex; use tokio::try_join; use tokio_util::codec::{Encoder, FramedRead}; use tokio_util::compat::FuturesAsyncReadCompatExt as _; @@ -26,7 +25,7 @@ use wasm_tokio::{ CoreVecEncoderBytes, Leb128Encoder, Utf8Codec, }; use wasmtime::component::types::{self, Case, Field}; -use wasmtime::component::{Func, Instance, InstancePre, LinkerInstance, ResourceType, Type, Val}; +use wasmtime::component::{Func, InstancePre, LinkerInstance, ResourceType, Type, Val}; use wasmtime::{AsContextMut, Engine, StoreContextMut}; use wasmtime_wasi::pipe::AsyncReadStream; use wasmtime_wasi::{InputStream, StreamError, WasiView}; @@ -989,13 +988,67 @@ where }) } +pub async fn call( + mut store: C, + rx: I, + mut tx: O, + params_ty: impl ExactSizeIterator, + results_ty: impl ExactSizeIterator, + func: Func, +) -> anyhow::Result<()> +where + I: AsyncRead + wrpc_transport::Index + Send + Sync + Unpin + 'static, + O: AsyncWrite + wrpc_transport::Index + Send + Sync + Unpin + 'static, + C: AsContextMut, + C::Data: WasiView, +{ + let mut params = vec![Val::Bool(false); params_ty.len()]; + let mut rx = pin!(rx); + for (i, (v, ty)) in zip(&mut params, params_ty).enumerate() { + read_value(&mut store, &mut rx, v, ty, &[i]) + .await + .with_context(|| format!("failed to decode parameter value {i}"))?; + } + let mut results = vec![Val::Bool(false); results_ty.len()]; + func.call_async(&mut store, ¶ms, &mut results) + .await + .context("failed to call function")?; + let mut buf = BytesMut::default(); + let mut deferred = vec![]; + for (i, (ref v, ty)) in zip(results, results_ty).enumerate() { + let mut enc = ValEncoder::new(store.as_context_mut(), ty); + enc.encode(v, &mut buf) + .with_context(|| format!("failed to encode result value {i}"))?; + deferred.push(enc.deferred); + } + debug!("transmitting results"); + tx.write_all(&buf) + .await + .context("failed to transmit results")?; + tx.shutdown() + .await + .context("failed to shutdown outgoing stream")?; + try_join_all( + zip(0.., deferred) + .filter_map(|(i, f)| f.map(|f| (tx.index(&[i]), f))) + .map(|(w, f)| async move { + let w = w?; + f(w).await + }), + ) + .await?; + Ok(()) +} + pub trait ServeExt: wrpc_transport::Serve { - /// Serve [`Func`] + /// Serve [`types::ComponentFunc`] from an [`InstancePre`] instantiating it on each call fn serve_function( &self, - store: impl Into>>>, - func: Func, - instance: &str, + engine: Engine, + instance_pre: InstancePre, + data: T, + ty: types::ComponentFunc, + instance_name: &str, name: &str, ) -> impl Future< Output = anyhow::Result< @@ -1003,67 +1056,55 @@ pub trait ServeExt: wrpc_transport::Serve { >, > + Send where - T: WasiView + 'static, + T: WasiView + Clone + 'static, { - let store = store.into(); async move { - let (params_ty, results_ty): (Arc<[_]>, Arc<[_]>) = { - let store = store.lock().await; - ( - Arc::from(func.params(&*store)), - Arc::from(func.results(&*store)), - ) - }; + debug!(instance = instance_name, name, "serving function export"); // TODO: set paths - let invocations = self.serve(instance, name, []).await?; - Ok(invocations.and_then(move |(cx, mut tx, rx)| { - let store = store.clone(); + let invocations = self.serve(instance_name, name, []).await?; + let instance_name = Arc::::from(instance_name); + let name = Arc::::from(name); + let params_ty: Arc<[_]> = ty.params().collect(); + let results_ty: Arc<[_]> = ty.results().collect(); + Ok(invocations.and_then(move |(cx, tx, rx)| { + let data = data.clone(); + let engine = engine.clone(); + let instance_name = Arc::clone(&instance_name); + let instance_pre = instance_pre.clone(); + let name = Arc::clone(&name); let params_ty = Arc::clone(¶ms_ty); let results_ty = Arc::clone(&results_ty); async move { Ok(( cx, async move { - let mut store = store.lock().await; - let mut params = vec![Val::Bool(false); params_ty.len()]; - let mut rx = pin!(rx); - for (i, (v, ty)) in zip(&mut params, params_ty.iter()).enumerate() { - read_value(&mut *store, &mut rx, v, ty, &[i]) - .await - .with_context(|| { - format!("failed to decode parameter value {i}") - })?; - } - let mut results = vec![Val::Bool(false); results_ty.len()]; - func.call_async(&mut *store, ¶ms, &mut results) - .await - .context("failed to call function")?; - let mut buf = BytesMut::default(); - let mut deferred = vec![]; - for (i, (ref v, ty)) in zip(results, results_ty.iter()).enumerate() { - let mut enc = ValEncoder::new(store.as_context_mut(), ty); - enc.encode(v, &mut buf).with_context(|| { - format!("failed to encode result value {i}") - })?; - deferred.push(enc.deferred); - } - debug!("transmitting results"); - tx.write_all(&buf) - .await - .context("failed to transmit results")?; - tx.shutdown() + let mut store = wasmtime::Store::new(&engine, data); + let instance = instance_pre + .instantiate_async(&mut store) .await - .context("failed to shutdown outgoing stream")?; - try_join_all( - zip(0.., deferred) - .filter_map(|(i, f)| f.map(|f| (tx.index(&[i]), f))) - .map(|(w, f)| async move { - let w = w?; - f(w).await - }), + .context("failed to instantiate component")?; + let func = { + let mut instance = instance.exports(&mut store); + if instance_name.is_empty() { + instance.root() + } else { + instance.instance(&instance_name).with_context(|| { + format!("instance export `{instance_name}` not found") + })? + } + .func(&name) + .with_context(|| format!("function export`{name}` not found"))? + }; + call( + &mut store, + rx, + tx, + params_ty.iter(), + results_ty.iter(), + func, ) .await?; - func.post_return_async(&mut *store) + func.post_return_async(&mut store) .await .context("failed to perform post-return cleanup")?; Ok(()) @@ -1075,13 +1116,14 @@ pub trait ServeExt: wrpc_transport::Serve { } } - /// Serve [`types::ComponentItem`] + /// Serve [`types::ComponentItem`] from an [`InstancePre`] instantiating it on each call fn serve_item( &self, - instance: &Instance, + engine: &Engine, + instance_pre: &InstancePre, + data: T, ty: types::ComponentItem, - store: impl Into>>>, - instance_name: &str, + instance: &str, name: &str, ) -> impl Future< Output = anyhow::Result< @@ -1092,29 +1134,20 @@ pub trait ServeExt: wrpc_transport::Serve { >, > + Send where - T: WasiView + 'static, + T: WasiView + Clone + 'static, { - let store = store.into(); async move { match ty { - types::ComponentItem::ComponentFunc(_) => { - debug!(instance_name, name, "serving function export"); - let name = name.to_string(); - let func = { - let mut store = store.lock().await; - let mut instance = instance.exports(&mut *store); - if instance_name.is_empty() { - instance.root() - } else { - instance.instance(instance_name).with_context(|| { - format!("instance export `{instance_name}` not found") - })? - } - .func(&name) - .with_context(|| format!("function export`{name}` not found")) - }?; + types::ComponentItem::ComponentFunc(ty) => { let s = self - .serve_function(Arc::clone(&store), func, instance_name, &name) + .serve_function( + engine.clone(), + instance_pre.clone(), + data, + ty, + instance, + name, + ) .await?; Ok(Box::pin(s) as Pin + Send>>) } @@ -1128,28 +1161,22 @@ pub trait ServeExt: wrpc_transport::Serve { bail!("serving component exports not supported yet"); } types::ComponentItem::ComponentInstance(ty) => { - let mut instance_store = store.lock().await; - let engine = instance_store.engine().clone(); - let mut instance = instance.exports(&mut *instance_store); - let mut instance = instance - .instance(name) - .with_context(|| format!("instance export `{instance_name}` not found"))?; - let instance_name = name; let mut invocations: SelectAll + Send>>> = SelectAll::new(); - for (name, ty) in ty.exports(&engine) { + for (name, ty) in ty.exports(engine) { match ty { - types::ComponentItem::ComponentFunc(_) => { - debug!(name, "serving instance function export"); - let func = instance.func(name).with_context(|| { - format!( - "function `{name}` not found in instance export `{instance_name}`" - ) - })?; - let func = self - .serve_function(Arc::clone(&store), func, instance_name, name) + types::ComponentItem::ComponentFunc(ty) => { + let s = self + .serve_function( + engine.clone(), + instance_pre.clone(), + data.clone(), + ty, + instance, + name, + ) .await?; - invocations.push(Box::pin(func)); + invocations.push(Box::pin(s)); } types::ComponentItem::CoreFunc(_) => { bail!("serving instance core function exports not supported yet") @@ -1181,33 +1208,28 @@ pub trait ServeExt: wrpc_transport::Serve { } } - /// Serve all exports of this [`InstancePre`] + /// Serve all exports of this [`InstancePre`] instantiating it on each call fn serve_exports( &self, - instance: InstancePre, - mut store: wasmtime::Store, + engine: &Engine, + instance_pre: &InstancePre, + data: T, ) -> impl Future< Output = anyhow::Result< impl Stream)>> + 'static, >, > + Send where - T: WasiView + 'static, + T: WasiView + Clone + 'static, { async move { - let engine = store.engine().clone(); - let ty = instance.component().component_type(); - let exports = ty.exports(&engine); - let instance = instance - .instantiate_async(&mut store) - .await - .context("failed to instantiate component")?; - let store = Arc::new(Mutex::new(store)); + let ty = instance_pre.component().component_type(); + let exports = ty.exports(engine); let mut invocations: SelectAll + Send>>> = SelectAll::new(); for (name, ty) in exports { let s = self - .serve_item(&instance, ty, Arc::clone(&store), "", name) + .serve_item(engine, instance_pre, data.clone(), ty, "", name) .await?; invocations.push(Box::pin(s)); } @@ -1217,24 +1239,3 @@ pub trait ServeExt: wrpc_transport::Serve { } impl ServeExt for T {} - -#[cfg(test)] -mod tests { - use tokio::sync::Mutex; - - use super::*; - - #[allow(unused)] - async fn serve_function( - srv: impl wrpc_transport::Serve, - store: wasmtime::Store, - func: Func, - ) { - srv.serve_function(Mutex::new(store), func, "foo", "bar") - .await - .unwrap() - .try_for_each(|_| async { Ok(()) }) - .await - .unwrap(); - } -}