Skip to content

Commit

Permalink
feat(rt-wasmtime): add more serving functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jul 8, 2024
1 parent a5daa65 commit d09db49
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/runtime-wasmtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-runtime-wasmtime"
version = "0.17.4"
version = "0.17.5"
description = "wRPC wasmtime integration"

authors.workspace = true
Expand Down
142 changes: 139 additions & 3 deletions crates/runtime-wasmtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::Arc;
use anyhow::{bail, Context as _};
use bytes::{BufMut as _, Bytes, BytesMut};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::stream::{self, FuturesUnordered, SelectAll};
use futures::{Stream, TryStreamExt as _};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
use tokio::sync::Mutex;
Expand All @@ -26,7 +26,7 @@ use wasm_tokio::{
CoreVecEncoderBytes, Leb128Encoder, Utf8Codec,
};
use wasmtime::component::types::{self, Case, Field};
use wasmtime::component::{Func, LinkerInstance, ResourceType, Type, Val};
use wasmtime::component::{Func, Instance, InstancePre, LinkerInstance, ResourceType, Type, Val};
use wasmtime::{AsContextMut, Engine, StoreContextMut};
use wasmtime_wasi::pipe::AsyncReadStream;
use wasmtime_wasi::{InputStream, StreamError, WasiView};
Expand Down Expand Up @@ -990,6 +990,7 @@ where
}

pub trait ServeExt: wrpc_transport::Serve {
/// Serve [`Func`]
fn serve_function<T>(
&self,
store: impl Into<Arc<Mutex<wasmtime::Store<T>>>>,
Expand Down Expand Up @@ -1062,6 +1063,141 @@ pub trait ServeExt: wrpc_transport::Serve {
}))
}
}

/// Serve [`types::ComponentItem`]
fn serve_item<T>(
&self,
engine: &Engine,
instance: &Instance,
ty: types::ComponentItem,
store: impl Into<Arc<Mutex<wasmtime::Store<T>>>>,
instance_name: &str,
name: &str,
) -> impl Future<
Output = anyhow::Result<
impl Stream<Item = anyhow::Result<Self::Context>> + Unpin + Send + 'static,
>,
> + Send
where
T: WasiView + 'static,
{
let store = store.into();
async move {
match ty {
types::ComponentItem::ComponentFunc(_) => {
debug!(instance_name, name, "serving function export");
let name = name.to_string();
let func = {
let mut store = store.lock().await;
let mut instance = instance.exports(&mut *store);
if instance_name.is_empty() {
instance.root()
} else {
instance.instance(instance_name).with_context(|| {
format!("instance export `{instance_name}` not found")
})?
}
.func(&name)
.with_context(|| format!("function export`{name}` not found"))
}?;
let s = self
.serve_function(Arc::clone(&store), func, instance_name, &name)
.await?;
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = _> + Send>>)
}
types::ComponentItem::CoreFunc(_) => {
bail!("serving core function exports not supported yet")
}
types::ComponentItem::Module(_) => {
bail!("serving module exports not supported yet");
}
types::ComponentItem::Component(_) => {
bail!("serving component exports not supported yet");
}
types::ComponentItem::ComponentInstance(ty) => {
let mut instance_store = store.lock().await;
let mut instance = instance.exports(&mut *instance_store);
let mut instance = instance
.instance(name)
.with_context(|| format!("instance export `{instance_name}` not found"))?;
let instance_name = name;
let mut invocations: SelectAll<Pin<Box<dyn Stream<Item = _> + Send>>> =
SelectAll::new();
for (name, ty) in ty.exports(engine) {
match ty {
types::ComponentItem::ComponentFunc(_) => {
debug!(name, "serving instance function export");
let func = instance.func(name).with_context(|| {
format!(
"function `{name}` not found in instance export `{instance_name}`"
)
})?;
let func = self
.serve_function(Arc::clone(&store), func, instance_name, name)
.await?;
invocations.push(Box::pin(func));
}
types::ComponentItem::CoreFunc(_) => {
bail!("serving instance core function exports not supported yet")
}
types::ComponentItem::Module(_) => {
bail!("serving instance module exports not supported yet")
}
types::ComponentItem::Component(_) => {
bail!("serving instance component exports not supported yet")
}
types::ComponentItem::ComponentInstance(_) => {
bail!("serving nested instance exports not supported yet")
}
types::ComponentItem::Type(_) => {}
types::ComponentItem::Resource(_) => {
bail!("serving instance resource exports not supported yet")
}
}
}
Ok(Box::pin(invocations) as Pin<Box<dyn Stream<Item = _> + Send>>)
}
types::ComponentItem::Type(_) => {
Ok(Box::pin(stream::empty()) as Pin<Box<dyn Stream<Item = _> + Send>>)
}
types::ComponentItem::Resource(_) => {
bail!("serving root resource exports not supported yet")
}
}
}
}

/// Serve all exports of this [`InstancePre`]
fn serve_exports<T>(
&self,
engine: &Engine,
instance: InstancePre<T>,
mut store: wasmtime::Store<T>,
) -> impl Future<
Output = anyhow::Result<impl Stream<Item = anyhow::Result<Self::Context>> + 'static>,
> + Send
where
T: WasiView + 'static,
{
async move {
let ty = instance.component().component_type();
let exports = ty.exports(engine);
let instance = instance
.instantiate_async(&mut store)
.await
.context("failed to instantiate component")?;
let store = Arc::new(Mutex::new(store));
let mut invocations: SelectAll<Pin<Box<dyn Stream<Item = _> + Send>>> =
SelectAll::new();
for (name, ty) in exports {
let s = self
.serve_item(engine, &instance, ty, Arc::clone(&store), "", name)
.await?;
invocations.push(Box::pin(s));
}
Ok(invocations)
}
}
}

impl<T: wrpc_transport::Serve> ServeExt for T {}
Expand All @@ -1083,6 +1219,6 @@ mod tests {
.unwrap()
.try_for_each(|_| async { Ok(()) })
.await
.unwrap()
.unwrap();
}
}

0 comments on commit d09db49

Please sign in to comment.