Skip to content

Commit

Permalink
refactor: organize module layout (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Oct 6, 2024
1 parent 79c35aa commit d8f3eeb
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 174 deletions.
56 changes: 0 additions & 56 deletions balius-runtime/src/adapter.rs

This file was deleted.

23 changes: 23 additions & 0 deletions balius-runtime/src/kv/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use crate::wit::balius::app::kv as wit;

#[derive(Clone)]
pub enum Kv {
Mock,
}

#[async_trait::async_trait]
impl wit::Host for Kv {
async fn get_value(&mut self, key: String) -> Result<wit::Payload, wit::KvError> {
todo!()
}

async fn set_value(&mut self, key: String, value: wit::Payload) -> Result<(), wit::KvError> {
println!("{}:{}", key, hex::encode(value));

Ok(())
}

async fn list_values(&mut self, prefix: String) -> Result<Vec<String>, wit::KvError> {
todo!()
}
}
192 changes: 172 additions & 20 deletions balius-runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use pallas::ledger::traverse::MultiEraBlock;
use serde_json::json;
use std::{collections::HashSet, path::Path};
use std::{
collections::{HashMap, HashSet},
path::Path,
sync::{Arc, Mutex},
};
use thiserror::Error;

mod wit {
Expand All @@ -11,13 +15,13 @@ mod wit {
});
}

mod adapter;
mod loader;
mod router;
mod store;

// implementations
pub mod kv;
pub mod ledgers;
pub mod submit;

pub use store::Store;

Expand Down Expand Up @@ -100,24 +104,48 @@ pub struct ChainPoint(pub BlockSlot, pub BlockHash);

pub type LogSeq = u64;

struct WorkerState {
pub worker_id: String,
pub router: router::Router,
pub ledger: Option<ledgers::Ledger>,
pub kv: Option<kv::Kv>,
pub submit: Option<submit::Submit>,
}

#[async_trait::async_trait]
impl wit::balius::app::driver::Host for WorkerState {
async fn register_channel(
&mut self,
id: u32,
pattern: wit::balius::app::driver::EventPattern,
) -> () {
self.router.register_channel(&self.worker_id, id, &pattern);
}
}

struct LoadedWorker {
store: wasmtime::Store<WorkerState>,
instance: wit::Worker,
}

type WorkerMap = HashMap<String, LoadedWorker>;

#[derive(Clone)]
pub struct Runtime {
loader: loader::Loader,
engine: wasmtime::Engine,
linker: wasmtime::component::Linker<WorkerState>,
loaded: Arc<Mutex<WorkerMap>>,

router: router::Router,
store: store::Store,
ledger: ledgers::Ledger,
ledger: Option<ledgers::Ledger>,
kv: Option<kv::Kv>,
submit: Option<submit::Submit>,
}

impl Runtime {
pub fn new(store: store::Store, ledger: ledgers::Ledger) -> Result<Self, Error> {
let router = router::Router::new();

Ok(Self {
loader: loader::Loader::new(router.clone())?,
router,
store,
ledger,
})
pub fn builder(store: store::Store) -> RuntimeBuilder {
RuntimeBuilder::new(store)
}

pub fn cursor(&self) -> Result<Option<LogSeq>, Error> {
Expand All @@ -131,22 +159,62 @@ impl Runtime {
id: &str,
wasm_path: impl AsRef<Path>,
config: serde_json::Value,
) -> Result<(), Error> {
self.loader
.register_worker(id, wasm_path, config, self.ledger.clone())
.await?;
) -> wasmtime::Result<()> {
let component = wasmtime::component::Component::from_file(&self.engine, wasm_path)?;

let mut store = wasmtime::Store::new(
&self.engine,
WorkerState {
worker_id: id.to_owned(),
router: self.router.clone(),
ledger: self.ledger.clone(),
kv: self.kv.clone(),
submit: self.submit.clone(),
},
);

let instance = wit::Worker::instantiate_async(&mut store, &component, &self.linker).await?;

let config = serde_json::to_vec(&config).unwrap();
instance.call_init(&mut store, &config).await?;

self.loaded
.lock()
.unwrap()
.insert(id.to_owned(), LoadedWorker { store, instance });

Ok(())
}

pub async fn dispatch_event(
&self,
worker: &str,
channel: u32,
event: &wit::Event,
) -> Result<wit::Response, Error> {
let mut lock = self.loaded.lock().unwrap();

let worker = lock
.get_mut(worker)
.ok_or(Error::WorkerNotFound(worker.to_string()))?;

let result = worker
.instance
.call_handle(&mut worker.store, channel, event)
.await?;

let response = result.map_err(|err| Error::Handle(err.code, err.message))?;

Ok(response)
}

async fn fire_and_forget(
&self,
event: &wit::Event,
targets: HashSet<router::Target>,
) -> Result<(), Error> {
for target in targets {
let result = self
.loader
.dispatch_event(&target.worker, target.channel, event)
.await;

Expand Down Expand Up @@ -199,7 +267,6 @@ impl Runtime {
let evt = wit::Event::Request(serde_json::to_vec(&params).unwrap());

let reply = self
.loader
.dispatch_event(&target.worker, target.channel, &evt)
.await?;

Expand All @@ -213,3 +280,88 @@ impl Runtime {
Ok(json)
}
}

pub struct RuntimeBuilder {
store: store::Store,
engine: wasmtime::Engine,
linker: wasmtime::component::Linker<WorkerState>,
ledger: Option<ledgers::Ledger>,
kv: Option<kv::Kv>,
submit: Option<submit::Submit>,
}

impl RuntimeBuilder {
pub fn new(store: store::Store) -> Self {
let mut config = wasmtime::Config::new();
config.async_support(true);
let engine = wasmtime::Engine::new(&config).unwrap();
let mut linker = wasmtime::component::Linker::new(&engine);

wit::balius::app::driver::add_to_linker(&mut linker, |state: &mut WorkerState| state)
.unwrap();

Self {
store,
engine,
linker,
ledger: None,
kv: None,
submit: None,
}
}

pub fn with_ledger(mut self, ledger: ledgers::Ledger) -> Self {
self.ledger = Some(ledger);

wit::balius::app::ledger::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
state.ledger.as_mut().unwrap()
})
.unwrap();

self
}

pub fn with_kv(mut self, kv: kv::Kv) -> Self {
self.kv = Some(kv);

wit::balius::app::kv::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
state.kv.as_mut().unwrap()
})
.unwrap();

self
}

pub fn with_submit(mut self, submit: submit::Submit) -> Self {
self.submit = Some(submit);

wit::balius::app::submit::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
state.submit.as_mut().unwrap()
})
.unwrap();

self
}

pub fn build(self) -> Result<Runtime, Error> {
let RuntimeBuilder {
store,
engine,
linker,
ledger,
kv,
submit,
} = self;

Ok(Runtime {
loaded: Default::default(),
router: router::Router::new(),
engine,
linker,
store,
ledger,
kv,
submit,
})
}
}
Loading

0 comments on commit d8f3eeb

Please sign in to comment.