Skip to content

Commit

Permalink
feat(rt-wasmtime): instantiate per call
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jul 9, 2024
1 parent 34810a4 commit 42e7651
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 132 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/runtime-wasmtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-runtime-wasmtime"
version = "0.18.0"
version = "0.18.2"
description = "wRPC wasmtime integration"

authors.workspace = true
Expand Down
261 changes: 131 additions & 130 deletions crates/runtime-wasmtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use futures::future::try_join_all;
use futures::stream::{self, FuturesUnordered, SelectAll};
use futures::{Stream, TryStreamExt as _};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
use tokio::sync::Mutex;
use tokio::try_join;
use tokio_util::codec::{Encoder, FramedRead};
use tokio_util::compat::FuturesAsyncReadCompatExt as _;
Expand All @@ -26,7 +25,7 @@ use wasm_tokio::{
CoreVecEncoderBytes, Leb128Encoder, Utf8Codec,
};
use wasmtime::component::types::{self, Case, Field};
use wasmtime::component::{Func, Instance, InstancePre, LinkerInstance, ResourceType, Type, Val};
use wasmtime::component::{Func, InstancePre, LinkerInstance, ResourceType, Type, Val};
use wasmtime::{AsContextMut, Engine, StoreContextMut};
use wasmtime_wasi::pipe::AsyncReadStream;
use wasmtime_wasi::{InputStream, StreamError, WasiView};
Expand Down Expand Up @@ -989,81 +988,123 @@ where
})
}

pub async fn call<C, I, O>(
mut store: C,
rx: I,
mut tx: O,
params_ty: impl ExactSizeIterator<Item = &Type>,
results_ty: impl ExactSizeIterator<Item = &Type>,
func: Func,
) -> anyhow::Result<()>
where
I: AsyncRead + wrpc_transport::Index<I> + Send + Sync + Unpin + 'static,
O: AsyncWrite + wrpc_transport::Index<O> + Send + Sync + Unpin + 'static,
C: AsContextMut,
C::Data: WasiView,
{
let mut params = vec![Val::Bool(false); params_ty.len()];
let mut rx = pin!(rx);
for (i, (v, ty)) in zip(&mut params, params_ty).enumerate() {
read_value(&mut store, &mut rx, v, ty, &[i])
.await
.with_context(|| format!("failed to decode parameter value {i}"))?;
}
let mut results = vec![Val::Bool(false); results_ty.len()];
func.call_async(&mut store, &params, &mut results)
.await
.context("failed to call function")?;
let mut buf = BytesMut::default();
let mut deferred = vec![];
for (i, (ref v, ty)) in zip(results, results_ty).enumerate() {
let mut enc = ValEncoder::new(store.as_context_mut(), ty);
enc.encode(v, &mut buf)
.with_context(|| format!("failed to encode result value {i}"))?;
deferred.push(enc.deferred);
}
debug!("transmitting results");
tx.write_all(&buf)
.await
.context("failed to transmit results")?;
tx.shutdown()
.await
.context("failed to shutdown outgoing stream")?;
try_join_all(
zip(0.., deferred)
.filter_map(|(i, f)| f.map(|f| (tx.index(&[i]), f)))
.map(|(w, f)| async move {
let w = w?;
f(w).await
}),
)
.await?;
Ok(())
}

pub trait ServeExt: wrpc_transport::Serve {
/// Serve [`Func`]
/// Serve [`types::ComponentFunc`] from an [`InstancePre`] instantiating it on each call
fn serve_function<T>(
&self,
store: impl Into<Arc<Mutex<wasmtime::Store<T>>>>,
func: Func,
instance: &str,
engine: Engine,
instance_pre: InstancePre<T>,
data: T,
ty: types::ComponentFunc,
instance_name: &str,
name: &str,
) -> impl Future<
Output = anyhow::Result<
impl Stream<Item = anyhow::Result<(Self::Context, anyhow::Result<()>)>> + Send + 'static,
>,
> + Send
where
T: WasiView + 'static,
T: WasiView + Clone + 'static,
{
let store = store.into();
async move {
let (params_ty, results_ty): (Arc<[_]>, Arc<[_]>) = {
let store = store.lock().await;
(
Arc::from(func.params(&*store)),
Arc::from(func.results(&*store)),
)
};
debug!(instance = instance_name, name, "serving function export");
// TODO: set paths
let invocations = self.serve(instance, name, []).await?;
Ok(invocations.and_then(move |(cx, mut tx, rx)| {
let store = store.clone();
let invocations = self.serve(instance_name, name, []).await?;
let instance_name = Arc::<str>::from(instance_name);
let name = Arc::<str>::from(name);
let params_ty: Arc<[_]> = ty.params().collect();
let results_ty: Arc<[_]> = ty.results().collect();
Ok(invocations.and_then(move |(cx, tx, rx)| {
let data = data.clone();
let engine = engine.clone();
let instance_name = Arc::clone(&instance_name);
let instance_pre = instance_pre.clone();
let name = Arc::clone(&name);
let params_ty = Arc::clone(&params_ty);
let results_ty = Arc::clone(&results_ty);
async move {
Ok((
cx,
async move {
let mut store = store.lock().await;
let mut params = vec![Val::Bool(false); params_ty.len()];
let mut rx = pin!(rx);
for (i, (v, ty)) in zip(&mut params, params_ty.iter()).enumerate() {
read_value(&mut *store, &mut rx, v, ty, &[i])
.await
.with_context(|| {
format!("failed to decode parameter value {i}")
})?;
}
let mut results = vec![Val::Bool(false); results_ty.len()];
func.call_async(&mut *store, &params, &mut results)
.await
.context("failed to call function")?;
let mut buf = BytesMut::default();
let mut deferred = vec![];
for (i, (ref v, ty)) in zip(results, results_ty.iter()).enumerate() {
let mut enc = ValEncoder::new(store.as_context_mut(), ty);
enc.encode(v, &mut buf).with_context(|| {
format!("failed to encode result value {i}")
})?;
deferred.push(enc.deferred);
}
debug!("transmitting results");
tx.write_all(&buf)
.await
.context("failed to transmit results")?;
tx.shutdown()
let mut store = wasmtime::Store::new(&engine, data);
let instance = instance_pre
.instantiate_async(&mut store)
.await
.context("failed to shutdown outgoing stream")?;
try_join_all(
zip(0.., deferred)
.filter_map(|(i, f)| f.map(|f| (tx.index(&[i]), f)))
.map(|(w, f)| async move {
let w = w?;
f(w).await
}),
.context("failed to instantiate component")?;
let func = {
let mut instance = instance.exports(&mut store);
if instance_name.is_empty() {
instance.root()
} else {
instance.instance(&instance_name).with_context(|| {
format!("instance export `{instance_name}` not found")
})?
}
.func(&name)
.with_context(|| format!("function export`{name}` not found"))?
};
call(
&mut store,
rx,
tx,
params_ty.iter(),
results_ty.iter(),
func,
)
.await?;
func.post_return_async(&mut *store)
func.post_return_async(&mut store)
.await
.context("failed to perform post-return cleanup")?;
Ok(())
Expand All @@ -1075,13 +1116,14 @@ pub trait ServeExt: wrpc_transport::Serve {
}
}

/// Serve [`types::ComponentItem`]
/// Serve [`types::ComponentItem`] from an [`InstancePre`] instantiating it on each call
fn serve_item<T>(
&self,
instance: &Instance,
engine: &Engine,
instance_pre: &InstancePre<T>,
data: T,
ty: types::ComponentItem,
store: impl Into<Arc<Mutex<wasmtime::Store<T>>>>,
instance_name: &str,
instance: &str,
name: &str,
) -> impl Future<
Output = anyhow::Result<
Expand All @@ -1092,29 +1134,20 @@ pub trait ServeExt: wrpc_transport::Serve {
>,
> + Send
where
T: WasiView + 'static,
T: WasiView + Clone + 'static,
{
let store = store.into();
async move {
match ty {
types::ComponentItem::ComponentFunc(_) => {
debug!(instance_name, name, "serving function export");
let name = name.to_string();
let func = {
let mut store = store.lock().await;
let mut instance = instance.exports(&mut *store);
if instance_name.is_empty() {
instance.root()
} else {
instance.instance(instance_name).with_context(|| {
format!("instance export `{instance_name}` not found")
})?
}
.func(&name)
.with_context(|| format!("function export`{name}` not found"))
}?;
types::ComponentItem::ComponentFunc(ty) => {
let s = self
.serve_function(Arc::clone(&store), func, instance_name, &name)
.serve_function(
engine.clone(),
instance_pre.clone(),
data,
ty,
instance,
name,
)
.await?;
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = _> + Send>>)
}
Expand All @@ -1128,28 +1161,22 @@ pub trait ServeExt: wrpc_transport::Serve {
bail!("serving component exports not supported yet");
}
types::ComponentItem::ComponentInstance(ty) => {
let mut instance_store = store.lock().await;
let engine = instance_store.engine().clone();
let mut instance = instance.exports(&mut *instance_store);
let mut instance = instance
.instance(name)
.with_context(|| format!("instance export `{instance_name}` not found"))?;
let instance_name = name;
let mut invocations: SelectAll<Pin<Box<dyn Stream<Item = _> + Send>>> =
SelectAll::new();
for (name, ty) in ty.exports(&engine) {
for (name, ty) in ty.exports(engine) {
match ty {
types::ComponentItem::ComponentFunc(_) => {
debug!(name, "serving instance function export");
let func = instance.func(name).with_context(|| {
format!(
"function `{name}` not found in instance export `{instance_name}`"
)
})?;
let func = self
.serve_function(Arc::clone(&store), func, instance_name, name)
types::ComponentItem::ComponentFunc(ty) => {
let s = self
.serve_function(
engine.clone(),
instance_pre.clone(),
data.clone(),
ty,
instance,
name,
)
.await?;
invocations.push(Box::pin(func));
invocations.push(Box::pin(s));
}
types::ComponentItem::CoreFunc(_) => {
bail!("serving instance core function exports not supported yet")
Expand Down Expand Up @@ -1181,33 +1208,28 @@ pub trait ServeExt: wrpc_transport::Serve {
}
}

/// Serve all exports of this [`InstancePre`]
/// Serve all exports of this [`InstancePre`] instantiating it on each call
fn serve_exports<T>(
&self,
instance: InstancePre<T>,
mut store: wasmtime::Store<T>,
engine: &Engine,
instance_pre: &InstancePre<T>,
data: T,
) -> impl Future<
Output = anyhow::Result<
impl Stream<Item = anyhow::Result<(Self::Context, anyhow::Result<()>)>> + 'static,
>,
> + Send
where
T: WasiView + 'static,
T: WasiView + Clone + 'static,
{
async move {
let engine = store.engine().clone();
let ty = instance.component().component_type();
let exports = ty.exports(&engine);
let instance = instance
.instantiate_async(&mut store)
.await
.context("failed to instantiate component")?;
let store = Arc::new(Mutex::new(store));
let ty = instance_pre.component().component_type();
let exports = ty.exports(engine);
let mut invocations: SelectAll<Pin<Box<dyn Stream<Item = _> + Send>>> =
SelectAll::new();
for (name, ty) in exports {
let s = self
.serve_item(&instance, ty, Arc::clone(&store), "", name)
.serve_item(engine, instance_pre, data.clone(), ty, "", name)
.await?;
invocations.push(Box::pin(s));
}
Expand All @@ -1217,24 +1239,3 @@ pub trait ServeExt: wrpc_transport::Serve {
}

impl<T: wrpc_transport::Serve> ServeExt for T {}

#[cfg(test)]
mod tests {
use tokio::sync::Mutex;

use super::*;

#[allow(unused)]
async fn serve_function<T: WasiView + 'static>(
srv: impl wrpc_transport::Serve,
store: wasmtime::Store<T>,
func: Func,
) {
srv.serve_function(Mutex::new(store), func, "foo", "bar")
.await
.unwrap()
.try_for_each(|_| async { Ok(()) })
.await
.unwrap();
}
}

0 comments on commit 42e7651

Please sign in to comment.