From 79c35aa38c02ffc2d24d46e93dd8b737c3a1a424 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sat, 5 Oct 2024 16:54:16 -0300 Subject: [PATCH] refactor: support async host and polymorphic ledger (#28) --- Cargo.lock | 81 ++++++++++++++++++++++++++++-- balius-runtime/Cargo.toml | 5 ++ balius-runtime/src/adapter.rs | 60 +++++++--------------- balius-runtime/src/ledgers/mock.rs | 39 ++++++++++++++ balius-runtime/src/ledgers/mod.rs | 54 ++++++++++++++++++++ balius-runtime/src/ledgers/u5c.rs | 51 +++++++++++++++++++ balius-runtime/src/lib.rs | 40 +++++++++++---- balius-runtime/src/loader.rs | 26 ++++++---- balius-runtime/tests/e2e.rs | 64 +++++++++++------------ 9 files changed, 324 insertions(+), 96 deletions(-) create mode 100644 balius-runtime/src/ledgers/mock.rs create mode 100644 balius-runtime/src/ledgers/mod.rs create mode 100644 balius-runtime/src/ledgers/u5c.rs diff --git a/Cargo.lock b/Cargo.lock index cee477a..d2f6b7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -248,6 +248,7 @@ dependencies = [ name = "balius-runtime" version = "0.1.0" dependencies = [ + "async-trait", "flate2", "hex", "itertools 0.13.0", @@ -258,7 +259,9 @@ dependencies = [ "serde_json", "tar", "thiserror", + "tokio", "tracing", + "utxorpc", "warp", "wasmtime", ] @@ -1296,10 +1299,10 @@ dependencies = [ "http 1.1.0", "hyper 1.4.1", "hyper-util", - "rustls", + "rustls 0.23.13", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.0", "tower-service", ] @@ -2116,7 +2119,7 @@ dependencies = [ "pallas-primitives 0.30.2 (registry+https://github.com/rust-lang/crates.io-index)", "pallas-traverse 0.30.2 (registry+https://github.com/rust-lang/crates.io-index)", "prost-types 0.13.3", - "utxorpc-spec", + "utxorpc-spec 0.10.0", ] [[package]] @@ -2594,6 +2597,20 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls" version = "0.23.13" @@ -2607,6 +2624,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -3099,13 +3129,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls", + "rustls 0.23.13", "rustls-pki-types", "tokio", ] @@ -3199,7 +3240,11 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.6", + "rustls-native-certs", + "rustls-pemfile", + "rustls-pki-types", "tokio", + "tokio-rustls 0.25.0", "tokio-stream", "tower", "tower-layer", @@ -3373,6 +3418,34 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "utxorpc" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd7fd347aa5c43c481e71a37bde134a3e4a0b93ae631b9a80b0eb95f01f10d9c" +dependencies = [ + "bytes", + "thiserror", + "tokio", + "tonic", + "utxorpc-spec 0.9.0", +] + +[[package]] +name = "utxorpc-spec" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66ae3390f4f25d7244d82207573bf7e8ce935f2a13d2c5d708fb27dde9840042" +dependencies = [ + "bytes", + "futures-core", + "pbjson", + "pbjson-types", + "prost 0.12.6", + "serde", + "tonic", +] + [[package]] name = "utxorpc-spec" version = "0.10.0" diff --git a/balius-runtime/Cargo.toml b/balius-runtime/Cargo.toml index 9bfdc94..2226a1f 100644 --- a/balius-runtime/Cargo.toml +++ b/balius-runtime/Cargo.toml @@ -17,3 +17,8 @@ redb = "2.1.3" tracing = "0.1.40" hex = "0.4.3" itertools = "0.13.0" +utxorpc = { version = "0.6.0", optional = true } +async-trait = "0.1.83" + +[dev-dependencies] +tokio = "1.40.0" diff --git a/balius-runtime/src/adapter.rs b/balius-runtime/src/adapter.rs index 6c8a3b8..d59db8f 100644 --- a/balius-runtime/src/adapter.rs +++ b/balius-runtime/src/adapter.rs @@ -1,80 +1,56 @@ +use std::sync::Arc; + use crate::{ + ledgers, router::Router, wit::balius::app::{driver, kv, ledger, submit}, }; -#[derive(Clone)] pub struct Adapter { worker_id: String, router: Router, + pub ledger: ledgers::Ledger, } impl Adapter { - pub fn new(worker_id: String, router: Router) -> Self { - Self { worker_id, router } + pub fn new(worker_id: String, router: Router, ledger: ledgers::Ledger) -> Self { + Self { + worker_id, + router, + ledger, + } } } +#[async_trait::async_trait] impl kv::Host for Adapter { - fn get_value(&mut self, key: String) -> Result { + async fn get_value(&mut self, key: String) -> Result { todo!() } - fn set_value(&mut self, key: String, value: kv::Payload) -> Result<(), kv::KvError> { + async fn set_value(&mut self, key: String, value: kv::Payload) -> Result<(), kv::KvError> { println!("{}:{}", key, hex::encode(value)); Ok(()) } - fn list_values(&mut self, prefix: String) -> Result, kv::KvError> { + async fn list_values(&mut self, prefix: String) -> Result, kv::KvError> { todo!() } } +#[async_trait::async_trait] impl submit::Host for Adapter { - fn submit_tx(&mut self, tx: submit::Cbor) -> Result<(), submit::SubmitError> { + async fn submit_tx(&mut self, tx: submit::Cbor) -> Result<(), submit::SubmitError> { println!("{}", hex::encode(tx)); Ok(()) } } +#[async_trait::async_trait] impl driver::Host for Adapter { - fn register_channel(&mut self, id: u32, pattern: driver::EventPattern) -> () { + async fn register_channel(&mut self, id: u32, pattern: driver::EventPattern) -> () { self.router.register_channel(&self.worker_id, id, &pattern); } } - -impl ledger::Host for Adapter { - fn read_utxos( - &mut self, - refs: Vec, - ) -> Result, ledger::LedgerError> { - let output = pallas::ledger::primitives::babbage::MintedTransactionOutput::PostAlonzo(pallas::ledger::primitives::babbage::MintedPostAlonzoTransactionOutput { - address: pallas::ledger::addresses::Address::from_bech32("addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x").unwrap().to_vec().into(), - value: pallas::ledger::primitives::babbage::Value::Coin(5_000_000), - datum_option: None, - script_ref: None, - }); - - let cbor = pallas::codec::minicbor::to_vec(&output).unwrap(); - - Ok(vec![ledger::Utxo { - ref_: ledger::TxoRef { - tx_hash: hex::decode( - "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f", - ) - .unwrap(), - tx_index: 0, - }, - body: cbor, - }]) - } - - fn search_utxos( - &mut self, - pattern: ledger::UtxoPattern, - ) -> Result, ledger::LedgerError> { - todo!() - } -} diff --git a/balius-runtime/src/ledgers/mock.rs b/balius-runtime/src/ledgers/mock.rs new file mode 100644 index 0000000..ddc710b --- /dev/null +++ b/balius-runtime/src/ledgers/mock.rs @@ -0,0 +1,39 @@ +use crate::wit::balius::app::ledger as wit; + +#[derive(Clone)] +pub struct Ledger; + +#[async_trait::async_trait] +impl wit::Host for Ledger { + async fn read_utxos( + &mut self, + _refs: Vec, + ) -> Result, wit::LedgerError> { + let output = pallas::ledger::primitives::babbage::MintedTransactionOutput::PostAlonzo(pallas::ledger::primitives::babbage::MintedPostAlonzoTransactionOutput { + address: pallas::ledger::addresses::Address::from_bech32("addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x").unwrap().to_vec().into(), + value: pallas::ledger::primitives::babbage::Value::Coin(5_000_000), + datum_option: None, + script_ref: None, + }); + + let cbor = pallas::codec::minicbor::to_vec(&output).unwrap(); + + Ok(vec![wit::Utxo { + ref_: wit::TxoRef { + tx_hash: hex::decode( + "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f", + ) + .unwrap(), + tx_index: 0, + }, + body: cbor, + }]) + } + + async fn search_utxos( + &mut self, + _pattern: wit::UtxoPattern, + ) -> Result, wit::LedgerError> { + todo!() + } +} diff --git a/balius-runtime/src/ledgers/mod.rs b/balius-runtime/src/ledgers/mod.rs new file mode 100644 index 0000000..93f1a88 --- /dev/null +++ b/balius-runtime/src/ledgers/mod.rs @@ -0,0 +1,54 @@ +use crate::wit::balius::app::ledger as wit; + +pub mod mock; + +#[cfg(feature = "utxorpc")] +pub mod u5c; + +#[derive(Clone)] +pub enum Ledger { + Mock(mock::Ledger), + + #[cfg(feature = "utxorpc")] + U5C(u5c::Ledger), +} + +impl From for Ledger { + fn from(ledger: mock::Ledger) -> Self { + Ledger::Mock(ledger) + } +} + +#[cfg(feature = "utxorpc")] +impl From for Ledger { + fn from(ledger: u5c::Ledger) -> Self { + Ledger::U5C(ledger) + } +} + +#[async_trait::async_trait] +impl wit::Host for Ledger { + async fn read_utxos( + &mut self, + refs: Vec, + ) -> Result, wit::LedgerError> { + match self { + Ledger::Mock(ledger) => ledger.read_utxos(refs).await, + + #[cfg(feature = "utxorpc")] + Ledger::U5C(ledger) => ledger.read_utxos(refs).await, + } + } + + async fn search_utxos( + &mut self, + pattern: wit::UtxoPattern, + ) -> Result, wit::LedgerError> { + match self { + Ledger::Mock(ledger) => ledger.search_utxos(pattern).await, + + #[cfg(feature = "utxorpc")] + Ledger::U5C(ledger) => ledger.search_utxos(pattern).await, + } + } +} diff --git a/balius-runtime/src/ledgers/u5c.rs b/balius-runtime/src/ledgers/u5c.rs new file mode 100644 index 0000000..fd031e2 --- /dev/null +++ b/balius-runtime/src/ledgers/u5c.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; +use utxorpc::CardanoQueryClient; + +use crate::wit::balius::app::ledger as wit; + +impl From for crate::Error { + fn from(error: utxorpc::Error) -> Self { + crate::Error::Ledger(error.to_string()) + } +} + +pub struct Config { + endpoint_url: String, + api_key: String, +} + +#[derive(Clone)] +pub struct Ledger { + queries: Arc, +} + +impl Ledger { + pub async fn new(config: Config) -> Result { + let queries = utxorpc::ClientBuilder::new() + .uri(&config.endpoint_url)? + .metadata("dmtr-api-key", config.api_key)? + .build::() + .await; + + Ok(Self { + queries: Arc::new(queries), + }) + } +} + +#[async_trait::async_trait] +impl crate::wit::balius::app::ledger::Host for Ledger { + async fn read_utxos( + &mut self, + refs: Vec, + ) -> Result, wit::LedgerError> { + todo!() + } + + async fn search_utxos( + &mut self, + pattern: wit::UtxoPattern, + ) -> Result, wit::LedgerError> { + todo!() + } +} diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index b312e1e..8678f1b 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -4,7 +4,11 @@ use std::{collections::HashSet, path::Path}; use thiserror::Error; mod wit { - wasmtime::component::bindgen!(in "../wit"); + wasmtime::component::bindgen!({ + path:"../wit", + async: true, + tracing: true, + }); } mod adapter; @@ -12,6 +16,9 @@ mod loader; mod router; mod store; +// implementations +pub mod ledgers; + pub use store::Store; pub type WorkerId = String; @@ -38,6 +45,9 @@ pub enum Error { #[error("address in block failed to parse")] BadAddress(pallas::ledger::addresses::Error), + + #[error("ledger error: {0}")] + Ledger(String), } impl From for Error { @@ -95,16 +105,18 @@ pub struct Runtime { loader: loader::Loader, router: router::Router, store: store::Store, + ledger: ledgers::Ledger, } impl Runtime { - pub fn new(store: store::Store) -> Result { + pub fn new(store: store::Store, ledger: ledgers::Ledger) -> Result { let router = router::Router::new(); Ok(Self { loader: loader::Loader::new(router.clone())?, router, store, + ledger, }) } @@ -114,18 +126,20 @@ impl Runtime { Ok(cursor) } - pub fn register_worker( + pub async fn register_worker( &mut self, id: &str, wasm_path: impl AsRef, config: serde_json::Value, ) -> Result<(), Error> { - self.loader.register_worker(id, wasm_path, config)?; + self.loader + .register_worker(id, wasm_path, config, self.ledger.clone()) + .await?; Ok(()) } - fn fire_and_forget( + async fn fire_and_forget( &self, event: &wit::Event, targets: HashSet, @@ -133,7 +147,8 @@ impl Runtime { for target in targets { let result = self .loader - .dispatch_event(&target.worker, target.channel, event); + .dispatch_event(&target.worker, target.channel, event) + .await; match result { Ok(wit::Response::Acknowledge) => { @@ -152,13 +167,17 @@ impl Runtime { Ok(()) } - pub fn apply_block(&self, block: &MultiEraBlock, wal_seq: LogSeq) -> Result<(), Error> { + pub async fn apply_block( + &self, + block: &MultiEraBlock<'_>, + wal_seq: LogSeq, + ) -> Result<(), Error> { for tx in block.txs() { for utxo in tx.outputs() { let targets = self.router.find_utxo_targets(&utxo)?; let event = wit::Event::Utxo(utxo.encode()); - self.fire_and_forget(&event, targets)?; + self.fire_and_forget(&event, targets).await?; } } @@ -169,7 +188,7 @@ impl Runtime { Ok(()) } - pub fn handle_request( + pub async fn handle_request( &self, worker: &str, method: &str, @@ -181,7 +200,8 @@ impl Runtime { let reply = self .loader - .dispatch_event(&target.worker, target.channel, &evt)?; + .dispatch_event(&target.worker, target.channel, &evt) + .await?; let json = match reply { wit::Response::Acknowledge => json!({}), diff --git a/balius-runtime/src/loader.rs b/balius-runtime/src/loader.rs index 265b9d7..d70914b 100644 --- a/balius-runtime/src/loader.rs +++ b/balius-runtime/src/loader.rs @@ -6,7 +6,7 @@ use std::{ use wasmtime::{component::Component, component::Linker, Engine, Store}; -use crate::{adapter::Adapter, router::Router, wit}; +use crate::{adapter::Adapter, ledgers, router::Router, wit}; struct LoadedWorker { store: Store, @@ -25,13 +25,18 @@ pub struct Loader { impl Loader { pub fn new(router: Router) -> Result { - let engine = Default::default(); + let mut config = wasmtime::Config::new(); + config.async_support(true); + + let engine = Engine::new(&config).unwrap(); let mut linker = Linker::new(&engine); wit::balius::app::driver::add_to_linker(&mut linker, |state: &mut Adapter| state)?; wit::balius::app::kv::add_to_linker(&mut linker, |state: &mut Adapter| state)?; wit::balius::app::submit::add_to_linker(&mut linker, |state: &mut Adapter| state)?; - wit::balius::app::ledger::add_to_linker(&mut linker, |state: &mut Adapter| state)?; + wit::balius::app::ledger::add_to_linker(&mut linker, |state: &mut Adapter| { + &mut state.ledger + })?; Ok(Self { engine, @@ -41,22 +46,24 @@ impl Loader { }) } - pub fn register_worker( + pub async fn register_worker( &mut self, id: &str, wasm_path: impl AsRef, config: serde_json::Value, + ledger: ledgers::Ledger, ) -> wasmtime::Result<()> { let component = Component::from_file(&self.engine, wasm_path)?; let mut store = Store::new( &self.engine, - Adapter::new(id.to_owned(), self.router.clone()), + Adapter::new(id.to_owned(), self.router.clone(), ledger), ); - let instance = wit::Worker::instantiate(&mut store, &component, &self.linker)?; + let instance = wit::Worker::instantiate_async(&mut store, &component, &self.linker).await?; + let config = serde_json::to_vec(&config).unwrap(); - instance.call_init(&mut store, &config)?; + instance.call_init(&mut store, &config).await?; self.loaded .lock() @@ -66,7 +73,7 @@ impl Loader { Ok(()) } - pub fn dispatch_event( + pub async fn dispatch_event( &self, worker: &str, channel: u32, @@ -80,7 +87,8 @@ impl Loader { let result = worker .instance - .call_handle(&mut worker.store, channel, event)?; + .call_handle(&mut worker.store, channel, event) + .await?; let response = result.map_err(|err| super::Error::Handle(err.code, err.message))?; diff --git a/balius-runtime/tests/e2e.rs b/balius-runtime/tests/e2e.rs index dafaab4..933849c 100644 --- a/balius-runtime/tests/e2e.rs +++ b/balius-runtime/tests/e2e.rs @@ -1,46 +1,48 @@ #![cfg(test)] -use balius_runtime::{Runtime, Store}; +use balius_runtime::{ledgers, Runtime, Store}; use serde_json::json; -#[test] -fn faucet_claim() { +#[tokio::test] +async fn faucet_claim() { let store = Store::open("tests/balius.db", None).unwrap(); - let mut runtime = Runtime::new(store).unwrap(); + let mut runtime = Runtime::new(store, ledgers::mock::Ledger.into()).unwrap(); + + let config = json!({ + "validator": { + "ref_txo": { + "transaction_id": "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f", + "index": 0 + }, + "hash": "ef7a1cebb2dc7de884ddf82f8fcbc91fe9750dcd8c12ec7643a99bbe", + "address": "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x" + } + }); runtime - .register_worker("faucet", "tests/faucet.wasm", json!({ - "validator": { - "ref_txo": { - "transaction_id": "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f", - "index": 0 - }, - "hash": "ef7a1cebb2dc7de884ddf82f8fcbc91fe9750dcd8c12ec7643a99bbe", - "address": "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x" - } - } - )) + .register_worker("faucet", "tests/faucet.wasm", config) + .await .unwrap(); - let res = runtime - .handle_request("faucet", "claim", json!({ - "token": "54455354", - "quantity": 1, - "recipient": "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x", - "fuel": { - "Refs": [ - { - "hash": "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f", - "index": 0 - } - ] - } + let req = json!({ + "token": "54455354", + "quantity": 1, + "recipient": "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x", + "fuel": { + "Refs": [ + { + "hash": "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f", + "index": 0 } - )) + ] + } + }); + + let res = runtime + .handle_request("faucet", "claim", req) + .await .unwrap(); println!("{:?}", res); - - panic!() }