From d8f3eebe7e2eb47097359b70a4232079f4fdc6ea Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sat, 5 Oct 2024 21:03:26 -0300 Subject: [PATCH] refactor: organize module layout (#29) --- balius-runtime/src/adapter.rs | 56 --------- balius-runtime/src/kv/mod.rs | 23 ++++ balius-runtime/src/lib.rs | 192 +++++++++++++++++++++++++++---- balius-runtime/src/loader.rs | 97 ---------------- balius-runtime/src/submit/mod.rs | 15 +++ balius-runtime/tests/e2e.rs | 5 +- 6 files changed, 214 insertions(+), 174 deletions(-) delete mode 100644 balius-runtime/src/adapter.rs create mode 100644 balius-runtime/src/kv/mod.rs delete mode 100644 balius-runtime/src/loader.rs create mode 100644 balius-runtime/src/submit/mod.rs diff --git a/balius-runtime/src/adapter.rs b/balius-runtime/src/adapter.rs deleted file mode 100644 index d59db8f..0000000 --- a/balius-runtime/src/adapter.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::sync::Arc; - -use crate::{ - ledgers, - router::Router, - wit::balius::app::{driver, kv, ledger, submit}, -}; - -pub struct Adapter { - worker_id: String, - router: Router, - pub ledger: ledgers::Ledger, -} - -impl Adapter { - pub fn new(worker_id: String, router: Router, ledger: ledgers::Ledger) -> Self { - Self { - worker_id, - router, - ledger, - } - } -} - -#[async_trait::async_trait] -impl kv::Host for Adapter { - async fn get_value(&mut self, key: String) -> Result { - todo!() - } - - async fn set_value(&mut self, key: String, value: kv::Payload) -> Result<(), kv::KvError> { - println!("{}:{}", key, hex::encode(value)); - - Ok(()) - } - - async fn list_values(&mut self, prefix: String) -> Result, kv::KvError> { - todo!() - } -} - -#[async_trait::async_trait] -impl submit::Host for Adapter { - async fn submit_tx(&mut self, tx: submit::Cbor) -> Result<(), submit::SubmitError> { - println!("{}", hex::encode(tx)); - - Ok(()) - } -} - -#[async_trait::async_trait] -impl driver::Host for Adapter { - async fn register_channel(&mut self, id: u32, pattern: driver::EventPattern) -> () { - self.router.register_channel(&self.worker_id, id, &pattern); - } -} diff --git a/balius-runtime/src/kv/mod.rs b/balius-runtime/src/kv/mod.rs new file mode 100644 index 0000000..aeb138b --- /dev/null +++ b/balius-runtime/src/kv/mod.rs @@ -0,0 +1,23 @@ +use crate::wit::balius::app::kv as wit; + +#[derive(Clone)] +pub enum Kv { + Mock, +} + +#[async_trait::async_trait] +impl wit::Host for Kv { + async fn get_value(&mut self, key: String) -> Result { + todo!() + } + + async fn set_value(&mut self, key: String, value: wit::Payload) -> Result<(), wit::KvError> { + println!("{}:{}", key, hex::encode(value)); + + Ok(()) + } + + async fn list_values(&mut self, prefix: String) -> Result, wit::KvError> { + todo!() + } +} diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index 8678f1b..de758da 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -1,6 +1,10 @@ use pallas::ledger::traverse::MultiEraBlock; use serde_json::json; -use std::{collections::HashSet, path::Path}; +use std::{ + collections::{HashMap, HashSet}, + path::Path, + sync::{Arc, Mutex}, +}; use thiserror::Error; mod wit { @@ -11,13 +15,13 @@ mod wit { }); } -mod adapter; -mod loader; mod router; mod store; // implementations +pub mod kv; pub mod ledgers; +pub mod submit; pub use store::Store; @@ -100,24 +104,48 @@ pub struct ChainPoint(pub BlockSlot, pub BlockHash); pub type LogSeq = u64; +struct WorkerState { + pub worker_id: String, + pub router: router::Router, + pub ledger: Option, + pub kv: Option, + pub submit: Option, +} + +#[async_trait::async_trait] +impl wit::balius::app::driver::Host for WorkerState { + async fn register_channel( + &mut self, + id: u32, + pattern: wit::balius::app::driver::EventPattern, + ) -> () { + self.router.register_channel(&self.worker_id, id, &pattern); + } +} + +struct LoadedWorker { + store: wasmtime::Store, + instance: wit::Worker, +} + +type WorkerMap = HashMap; + #[derive(Clone)] pub struct Runtime { - loader: loader::Loader, + engine: wasmtime::Engine, + linker: wasmtime::component::Linker, + loaded: Arc>, + router: router::Router, store: store::Store, - ledger: ledgers::Ledger, + ledger: Option, + kv: Option, + submit: Option, } impl Runtime { - pub fn new(store: store::Store, ledger: ledgers::Ledger) -> Result { - let router = router::Router::new(); - - Ok(Self { - loader: loader::Loader::new(router.clone())?, - router, - store, - ledger, - }) + pub fn builder(store: store::Store) -> RuntimeBuilder { + RuntimeBuilder::new(store) } pub fn cursor(&self) -> Result, Error> { @@ -131,14 +159,55 @@ impl Runtime { id: &str, wasm_path: impl AsRef, config: serde_json::Value, - ) -> Result<(), Error> { - self.loader - .register_worker(id, wasm_path, config, self.ledger.clone()) - .await?; + ) -> wasmtime::Result<()> { + let component = wasmtime::component::Component::from_file(&self.engine, wasm_path)?; + + let mut store = wasmtime::Store::new( + &self.engine, + WorkerState { + worker_id: id.to_owned(), + router: self.router.clone(), + ledger: self.ledger.clone(), + kv: self.kv.clone(), + submit: self.submit.clone(), + }, + ); + + let instance = wit::Worker::instantiate_async(&mut store, &component, &self.linker).await?; + + let config = serde_json::to_vec(&config).unwrap(); + instance.call_init(&mut store, &config).await?; + + self.loaded + .lock() + .unwrap() + .insert(id.to_owned(), LoadedWorker { store, instance }); Ok(()) } + pub async fn dispatch_event( + &self, + worker: &str, + channel: u32, + event: &wit::Event, + ) -> Result { + let mut lock = self.loaded.lock().unwrap(); + + let worker = lock + .get_mut(worker) + .ok_or(Error::WorkerNotFound(worker.to_string()))?; + + let result = worker + .instance + .call_handle(&mut worker.store, channel, event) + .await?; + + let response = result.map_err(|err| Error::Handle(err.code, err.message))?; + + Ok(response) + } + async fn fire_and_forget( &self, event: &wit::Event, @@ -146,7 +215,6 @@ impl Runtime { ) -> Result<(), Error> { for target in targets { let result = self - .loader .dispatch_event(&target.worker, target.channel, event) .await; @@ -199,7 +267,6 @@ impl Runtime { let evt = wit::Event::Request(serde_json::to_vec(¶ms).unwrap()); let reply = self - .loader .dispatch_event(&target.worker, target.channel, &evt) .await?; @@ -213,3 +280,88 @@ impl Runtime { Ok(json) } } + +pub struct RuntimeBuilder { + store: store::Store, + engine: wasmtime::Engine, + linker: wasmtime::component::Linker, + ledger: Option, + kv: Option, + submit: Option, +} + +impl RuntimeBuilder { + pub fn new(store: store::Store) -> Self { + let mut config = wasmtime::Config::new(); + config.async_support(true); + let engine = wasmtime::Engine::new(&config).unwrap(); + let mut linker = wasmtime::component::Linker::new(&engine); + + wit::balius::app::driver::add_to_linker(&mut linker, |state: &mut WorkerState| state) + .unwrap(); + + Self { + store, + engine, + linker, + ledger: None, + kv: None, + submit: None, + } + } + + pub fn with_ledger(mut self, ledger: ledgers::Ledger) -> Self { + self.ledger = Some(ledger); + + wit::balius::app::ledger::add_to_linker(&mut self.linker, |state: &mut WorkerState| { + state.ledger.as_mut().unwrap() + }) + .unwrap(); + + self + } + + pub fn with_kv(mut self, kv: kv::Kv) -> Self { + self.kv = Some(kv); + + wit::balius::app::kv::add_to_linker(&mut self.linker, |state: &mut WorkerState| { + state.kv.as_mut().unwrap() + }) + .unwrap(); + + self + } + + pub fn with_submit(mut self, submit: submit::Submit) -> Self { + self.submit = Some(submit); + + wit::balius::app::submit::add_to_linker(&mut self.linker, |state: &mut WorkerState| { + state.submit.as_mut().unwrap() + }) + .unwrap(); + + self + } + + pub fn build(self) -> Result { + let RuntimeBuilder { + store, + engine, + linker, + ledger, + kv, + submit, + } = self; + + Ok(Runtime { + loaded: Default::default(), + router: router::Router::new(), + engine, + linker, + store, + ledger, + kv, + submit, + }) + } +} diff --git a/balius-runtime/src/loader.rs b/balius-runtime/src/loader.rs deleted file mode 100644 index d70914b..0000000 --- a/balius-runtime/src/loader.rs +++ /dev/null @@ -1,97 +0,0 @@ -use std::{ - collections::HashMap, - path::Path, - sync::{Arc, Mutex}, -}; - -use wasmtime::{component::Component, component::Linker, Engine, Store}; - -use crate::{adapter::Adapter, ledgers, router::Router, wit}; - -struct LoadedWorker { - store: Store, - instance: wit::Worker, -} - -type WorkerMap = HashMap; - -#[derive(Clone)] -pub struct Loader { - engine: Engine, - linker: Linker, - router: Router, - loaded: Arc>, -} - -impl Loader { - pub fn new(router: Router) -> Result { - let mut config = wasmtime::Config::new(); - config.async_support(true); - - let engine = Engine::new(&config).unwrap(); - - let mut linker = Linker::new(&engine); - wit::balius::app::driver::add_to_linker(&mut linker, |state: &mut Adapter| state)?; - wit::balius::app::kv::add_to_linker(&mut linker, |state: &mut Adapter| state)?; - wit::balius::app::submit::add_to_linker(&mut linker, |state: &mut Adapter| state)?; - wit::balius::app::ledger::add_to_linker(&mut linker, |state: &mut Adapter| { - &mut state.ledger - })?; - - Ok(Self { - engine, - loaded: Default::default(), - linker, - router, - }) - } - - pub async fn register_worker( - &mut self, - id: &str, - wasm_path: impl AsRef, - config: serde_json::Value, - ledger: ledgers::Ledger, - ) -> wasmtime::Result<()> { - let component = Component::from_file(&self.engine, wasm_path)?; - - let mut store = Store::new( - &self.engine, - Adapter::new(id.to_owned(), self.router.clone(), ledger), - ); - - let instance = wit::Worker::instantiate_async(&mut store, &component, &self.linker).await?; - - let config = serde_json::to_vec(&config).unwrap(); - instance.call_init(&mut store, &config).await?; - - self.loaded - .lock() - .unwrap() - .insert(id.to_owned(), LoadedWorker { store, instance }); - - Ok(()) - } - - pub async fn dispatch_event( - &self, - worker: &str, - channel: u32, - event: &wit::Event, - ) -> Result { - let mut lock = self.loaded.lock().unwrap(); - - let worker = lock - .get_mut(worker) - .ok_or(super::Error::WorkerNotFound(worker.to_string()))?; - - let result = worker - .instance - .call_handle(&mut worker.store, channel, event) - .await?; - - let response = result.map_err(|err| super::Error::Handle(err.code, err.message))?; - - Ok(response) - } -} diff --git a/balius-runtime/src/submit/mod.rs b/balius-runtime/src/submit/mod.rs new file mode 100644 index 0000000..70bfb8e --- /dev/null +++ b/balius-runtime/src/submit/mod.rs @@ -0,0 +1,15 @@ +use crate::wit::balius::app::submit as wit; + +#[derive(Clone)] +pub enum Submit { + Mock, +} + +#[async_trait::async_trait] +impl wit::Host for Submit { + async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError> { + println!("{}", hex::encode(tx)); + + Ok(()) + } +} diff --git a/balius-runtime/tests/e2e.rs b/balius-runtime/tests/e2e.rs index 933849c..c78740a 100644 --- a/balius-runtime/tests/e2e.rs +++ b/balius-runtime/tests/e2e.rs @@ -7,7 +7,10 @@ use serde_json::json; async fn faucet_claim() { let store = Store::open("tests/balius.db", None).unwrap(); - let mut runtime = Runtime::new(store, ledgers::mock::Ledger.into()).unwrap(); + let mut runtime = Runtime::builder(store) + .with_ledger(ledgers::mock::Ledger.into()) + .build() + .unwrap(); let config = json!({ "validator": {