diff --git a/Cargo.lock b/Cargo.lock index b6a54500..273dd754 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3723,7 +3723,7 @@ dependencies = [ [[package]] name = "wrpc-runtime-wasmtime" -version = "0.17.4" +version = "0.17.5" dependencies = [ "anyhow", "bytes", diff --git a/crates/runtime-wasmtime/Cargo.toml b/crates/runtime-wasmtime/Cargo.toml index af27313a..ac576d53 100644 --- a/crates/runtime-wasmtime/Cargo.toml +++ b/crates/runtime-wasmtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wrpc-runtime-wasmtime" -version = "0.17.4" +version = "0.17.5" description = "wRPC wasmtime integration" authors.workspace = true diff --git a/crates/runtime-wasmtime/src/lib.rs b/crates/runtime-wasmtime/src/lib.rs index 96a181ee..2014bd99 100644 --- a/crates/runtime-wasmtime/src/lib.rs +++ b/crates/runtime-wasmtime/src/lib.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use anyhow::{bail, Context as _}; use bytes::{BufMut as _, Bytes, BytesMut}; use futures::future::try_join_all; -use futures::stream::FuturesUnordered; +use futures::stream::{self, FuturesUnordered, SelectAll}; use futures::{Stream, TryStreamExt as _}; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; use tokio::sync::Mutex; @@ -26,7 +26,7 @@ use wasm_tokio::{ CoreVecEncoderBytes, Leb128Encoder, Utf8Codec, }; use wasmtime::component::types::{self, Case, Field}; -use wasmtime::component::{Func, LinkerInstance, ResourceType, Type, Val}; +use wasmtime::component::{Func, Instance, InstancePre, LinkerInstance, ResourceType, Type, Val}; use wasmtime::{AsContextMut, Engine, StoreContextMut}; use wasmtime_wasi::pipe::AsyncReadStream; use wasmtime_wasi::{InputStream, StreamError, WasiView}; @@ -990,6 +990,7 @@ where } pub trait ServeExt: wrpc_transport::Serve { + /// Serve [`Func`] fn serve_function( &self, store: impl Into>>>, @@ -1062,6 +1063,141 @@ pub trait ServeExt: wrpc_transport::Serve { })) } } + + /// Serve [`types::ComponentItem`] + fn serve_item( + &self, + engine: &Engine, + instance: &Instance, + ty: types::ComponentItem, + store: impl Into>>>, + instance_name: &str, + name: &str, + ) -> impl Future< + Output = anyhow::Result< + impl Stream> + Unpin + Send + 'static, + >, + > + Send + where + T: WasiView + 'static, + { + let store = store.into(); + async move { + match ty { + types::ComponentItem::ComponentFunc(_) => { + debug!(instance_name, name, "serving function export"); + let name = name.to_string(); + let func = { + let mut store = store.lock().await; + let mut instance = instance.exports(&mut *store); + if instance_name.is_empty() { + instance.root() + } else { + instance.instance(instance_name).with_context(|| { + format!("instance export `{instance_name}` not found") + })? + } + .func(&name) + .with_context(|| format!("function export`{name}` not found")) + }?; + let s = self + .serve_function(Arc::clone(&store), func, instance_name, &name) + .await?; + Ok(Box::pin(s) as Pin + Send>>) + } + types::ComponentItem::CoreFunc(_) => { + bail!("serving core function exports not supported yet") + } + types::ComponentItem::Module(_) => { + bail!("serving module exports not supported yet"); + } + types::ComponentItem::Component(_) => { + bail!("serving component exports not supported yet"); + } + types::ComponentItem::ComponentInstance(ty) => { + let mut instance_store = store.lock().await; + let mut instance = instance.exports(&mut *instance_store); + let mut instance = instance + .instance(name) + .with_context(|| format!("instance export `{instance_name}` not found"))?; + let instance_name = name; + let mut invocations: SelectAll + Send>>> = + SelectAll::new(); + for (name, ty) in ty.exports(engine) { + match ty { + types::ComponentItem::ComponentFunc(_) => { + debug!(name, "serving instance function export"); + let func = instance.func(name).with_context(|| { + format!( + "function `{name}` not found in instance export `{instance_name}`" + ) + })?; + let func = self + .serve_function(Arc::clone(&store), func, instance_name, name) + .await?; + invocations.push(Box::pin(func)); + } + types::ComponentItem::CoreFunc(_) => { + bail!("serving instance core function exports not supported yet") + } + types::ComponentItem::Module(_) => { + bail!("serving instance module exports not supported yet") + } + types::ComponentItem::Component(_) => { + bail!("serving instance component exports not supported yet") + } + types::ComponentItem::ComponentInstance(_) => { + bail!("serving nested instance exports not supported yet") + } + types::ComponentItem::Type(_) => {} + types::ComponentItem::Resource(_) => { + bail!("serving instance resource exports not supported yet") + } + } + } + Ok(Box::pin(invocations) as Pin + Send>>) + } + types::ComponentItem::Type(_) => { + Ok(Box::pin(stream::empty()) as Pin + Send>>) + } + types::ComponentItem::Resource(_) => { + bail!("serving root resource exports not supported yet") + } + } + } + } + + /// Serve all exports of this [`InstancePre`] + fn serve_exports( + &self, + engine: &Engine, + instance: InstancePre, + mut store: wasmtime::Store, + ) -> impl Future< + Output = anyhow::Result> + 'static>, + > + Send + where + T: WasiView + 'static, + { + async move { + let ty = instance.component().component_type(); + let exports = ty.exports(engine); + let instance = instance + .instantiate_async(&mut store) + .await + .context("failed to instantiate component")?; + let store = Arc::new(Mutex::new(store)); + let mut invocations: SelectAll + Send>>> = + SelectAll::new(); + for (name, ty) in exports { + let s = self + .serve_item(engine, &instance, ty, Arc::clone(&store), "", name) + .await?; + invocations.push(Box::pin(s)); + } + Ok(invocations) + } + } } impl ServeExt for T {} @@ -1083,6 +1219,6 @@ mod tests { .unwrap() .try_for_each(|_| async { Ok(()) }) .await - .unwrap() + .unwrap(); } }