From 4fe20963b9e5dcd6fb5de469ec7f2907ea82f284 Mon Sep 17 00:00:00 2001 From: Maico Date: Mon, 30 Oct 2023 15:13:14 -0300 Subject: [PATCH] feat: integrate Pallas (Byron phase-1) validations --- Cargo.lock | 39 +++++++++---- Cargo.toml | 2 +- src/storage/applydb/mod.rs | 110 +++++++++++++++++++++++++++++++++---- src/sync/ledger.rs | 38 +++++++++++-- src/sync/mod.rs | 2 +- 5 files changed, 161 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 798bc85d..26be44af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -313,6 +313,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cbor_event" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "089a0261d1bc59e54e8e11860031efd88593f0e61b921172c474f1f38c2f2d3c" + [[package]] name = "cc" version = "1.0.83" @@ -530,6 +536,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "cryptoxide" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e35f15e1a0699dd988fed910dd78fdc6407f44654cd12589c91fa44ea67d9159" + [[package]] name = "cryptoxide" version = "0.4.4" @@ -1272,7 +1284,7 @@ checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" [[package]] name = "pallas" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "pallas-addresses", "pallas-applying", @@ -1289,7 +1301,7 @@ dependencies = [ [[package]] name = "pallas-addresses" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "base58", "bech32 0.9.1", @@ -1304,8 +1316,11 @@ dependencies = [ [[package]] name = "pallas-applying" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ + "cbor_event", + "cryptoxide 0.1.3", + "pallas-addresses", "pallas-codec", "pallas-crypto", "pallas-primitives", @@ -1316,7 +1331,7 @@ dependencies = [ [[package]] name = "pallas-codec" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "hex", "minicbor", @@ -1327,7 +1342,7 @@ dependencies = [ [[package]] name = "pallas-configs" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "base64 0.21.4", "hex", @@ -1341,9 +1356,9 @@ dependencies = [ [[package]] name = "pallas-crypto" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ - "cryptoxide", + "cryptoxide 0.4.4", "hex", "pallas-codec", "rand_core", @@ -1354,7 +1369,7 @@ dependencies = [ [[package]] name = "pallas-network" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "byteorder", "hex", @@ -1369,7 +1384,7 @@ dependencies = [ [[package]] name = "pallas-primitives" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "base58", "bech32 0.9.1", @@ -1384,7 +1399,7 @@ dependencies = [ [[package]] name = "pallas-rolldb" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "async-stream", "bincode", @@ -1401,7 +1416,7 @@ dependencies = [ [[package]] name = "pallas-traverse" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "hex", "pallas-addresses", @@ -1415,7 +1430,7 @@ dependencies = [ [[package]] name = "pallas-utxorpc" version = "0.19.1" -source = "git+https://github.com/txpipe/pallas.git#919529eaa0432af9547aad4f6089755b68cff89d" +source = "git+https://github.com/txpipe/pallas.git?branch=dev/byron-phase-1-validations#a9a1f305bf2335c5a7424e4d3f6a82cd25b87787" dependencies = [ "pallas-codec", "pallas-primitives", diff --git a/Cargo.toml b/Cargo.toml index b8795194..c3fe5532 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ authors = ["Santiago Carmuega "] [dependencies] # pallas = "0.19.0" # pallas = { path = "../pallas/pallas" } -pallas = { git = "https://github.com/txpipe/pallas.git", features = ["unstable"] } +pallas = { git = "https://github.com/txpipe/pallas.git", branch = "dev/byron-phase-1-validations", features = ["unstable"] } gasket = { version = "^0.5", features = ["derive"] } # gasket = { path = "../../construkts/gasket-rs/gasket", features = ["derive"] } diff --git a/src/storage/applydb/mod.rs b/src/storage/applydb/mod.rs index e9fe5644..92a0327a 100644 --- a/src/storage/applydb/mod.rs +++ b/src/storage/applydb/mod.rs @@ -1,6 +1,17 @@ pub mod genesis; -use pallas::{crypto::hash::Hash, ledger::traverse::MultiEraBlock}; +use pallas::{ + applying::{ + types::{Environment, UTxOs}, + validate, + }, + codec::utils::CborWrap, + crypto::hash::Hash, + ledger::{ + primitives::byron::{Tx, TxIn, TxOut}, + traverse::{Era, MultiEraBlock, MultiEraInput, MultiEraOutput, MultiEraTx}, + }, +}; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, @@ -8,7 +19,7 @@ use std::{ sync::Arc, }; use thiserror::Error; -use tracing::{error, info}; +use tracing::{error, info, warn}; use rocksdb::{Options, WriteBatch, DB}; @@ -34,6 +45,9 @@ pub enum Error { #[error("cbor decoding")] Cbor, + + #[error("unimplemented validation for this era")] + UnimplementedEra, } impl From for Error { @@ -77,7 +91,7 @@ pub struct ApplyBatch<'a> { block_hash: BlockHash, utxo_inserts: HashMap, stxi_inserts: HashMap, - utxo_deletes: Vec, + utxo_deletes: HashMap, } impl<'a> ApplyBatch<'a> { @@ -88,7 +102,7 @@ impl<'a> ApplyBatch<'a> { block_hash, utxo_inserts: HashMap::new(), stxi_inserts: HashMap::new(), - utxo_deletes: Vec::new(), + utxo_deletes: HashMap::new(), } } @@ -96,6 +110,22 @@ impl<'a> ApplyBatch<'a> { self.utxo_inserts.contains_key(&UtxoRef(tx, output)) } + // Meant to be used to get the UTxO associated with a transaction input, assuming the current + // block has already been traversed, appropriately filling utxo_inserts and utxo_deletes. + pub fn get_same_block_utxo(&self, tx_hash: TxHash, ind: OutputIndex) -> Option { + // utxo_inserts contains the UTxOs produced in the current block which haven't been spent. + self.utxo_inserts + .get(&UtxoRef(tx_hash, ind)) + // utxo_deletes contains UTxOs previously stored in the DB, which we don't care + // about, and UTxOs produced (and spent) by transactions in the current block, + // which we care about. + .or(self + .utxo_deletes + .get(&UtxoRef(tx_hash, ind)) + ) + .map(Vec::::clone) + } + pub fn insert_utxo(&mut self, tx: TxHash, output: OutputIndex, body: UtxoBody) { self.utxo_inserts.insert(UtxoRef(tx, output), body); } @@ -105,8 +135,8 @@ impl<'a> ApplyBatch<'a> { let k = UtxoRef(tx, idx); - self.stxi_inserts.insert(k.clone(), body); - self.utxo_deletes.push(k); + self.stxi_inserts.insert(k.clone(), body.clone()); + self.utxo_deletes.insert(k.clone(), body); } pub fn spend_utxo_same_block(&mut self, tx: TxHash, idx: OutputIndex) { @@ -116,8 +146,8 @@ impl<'a> ApplyBatch<'a> { let body = self.utxo_inserts.remove(&k).unwrap(); - self.stxi_inserts.insert(k.clone(), body); - self.utxo_deletes.push(k) + self.stxi_inserts.insert(k.clone(), body.clone()); + self.utxo_deletes.insert(k.clone(), body); } } @@ -129,7 +159,7 @@ impl<'a> From> for WriteBatch { UtxoKV::stage_upsert(from.db, DBSerde(key), DBBytes(value), &mut batch); } - for key in from.utxo_deletes { + for (key, _) in from.utxo_deletes { UtxoKV::stage_delete(from.db, DBSerde(key), &mut batch); } @@ -272,7 +302,7 @@ impl ApplyDB { Ok(dbval.map(|x| x.0)) } - pub fn apply_block(&mut self, cbor: &[u8]) -> Result<(), Error> { + pub fn apply_block(&mut self, cbor: &[u8], env: Option<&Environment>) -> Result<(), Error> { let block = MultiEraBlock::decode(cbor).map_err(|_| Error::Cbor)?; let slot = block.slot(); let hash = block.hash(); @@ -305,6 +335,21 @@ impl ApplyDB { } } + for tx in txs.iter() { + if let (MultiEraTx::Byron(_), Some(e)) = (&tx, env) { + match self.get_inputs(tx, &batch) { + Ok(inputs) => { + let utxos: UTxOs = Self::mk_utxo(&inputs); + match validate(tx, &utxos, e) { + Ok(()) => (), + Err(err) => warn!("Transaction validation failed ({:?})", err), + } + } + Err(err) => warn!("Skipping validation ({:?})", err), + } + } + } + let batch = WriteBatch::from(batch); self.db @@ -314,6 +359,49 @@ impl ApplyDB { Ok(()) } + fn get_inputs( + &self, + metx: &MultiEraTx, + batch: &ApplyBatch, + ) -> Result, Error> { + let mut res: Vec<(TxIn, TxOut)> = Vec::new(); + if let MultiEraTx::Byron(mtxp) = &metx { + let tx: &Tx = &mtxp.transaction; + let inputs: &Vec = &tx.inputs; + for input in inputs { + if let TxIn::Variant0(CborWrap((tx_hash, output_index))) = &input { + let hash: TxHash = *tx_hash; + let idx: OutputIndex = *output_index as u64; + // The input UTxO may be in the database or in the same block. + let utxo: UtxoBody = self.get_utxo(hash, idx)?.map_or( + batch + .get_same_block_utxo(hash, idx) + .ok_or(Error::MissingUtxo(hash, idx)), + Ok, + )?; + match MultiEraOutput::decode(Era::Byron, &utxo) { + Ok(tx_out) => res.push(( + TxIn::Variant0(CborWrap((hash, idx as u32))), + tx_out.as_byron().ok_or(Error::UnimplementedEra)?.clone(), + )), + Err(_) => unreachable!(), + } + } + } + } + Ok(res) + } + + fn mk_utxo<'a>(entries: &'a [(TxIn, TxOut)]) -> UTxOs<'a> { + let mut utxos: UTxOs<'a> = UTxOs::new(); + for (input, output) in entries.iter() { + let multi_era_input: MultiEraInput = MultiEraInput::from_byron(input); + let multi_era_output: MultiEraOutput = MultiEraOutput::from_byron(output); + utxos.insert(multi_era_input, multi_era_output); + } + utxos + } + pub fn undo_block(&mut self, cbor: &[u8]) -> Result<(), Error> { let block = MultiEraBlock::decode(cbor).map_err(|_| Error::Cbor)?; let slot = block.slot(); @@ -437,7 +525,7 @@ mod tests { } } - db.apply_block(&cbor).unwrap(); + db.apply_block(&cbor, None).unwrap(); for tx in block.txs() { for input in tx.consumes() { diff --git a/src/sync/ledger.rs b/src/sync/ledger.rs index fa21d72a..b410e9a3 100644 --- a/src/sync/ledger.rs +++ b/src/sync/ledger.rs @@ -1,5 +1,8 @@ use gasket::framework::*; -use pallas::ledger::configs::byron::GenesisFile; +use pallas::{ + applying::types::{ByronProtParams, Environment, MultiEraProtParams}, + ledger::configs::byron::GenesisFile, +}; use tracing::info; use crate::prelude::*; @@ -12,6 +15,7 @@ pub type UpstreamPort = gasket::messaging::tokio::InputPort; pub struct Stage { ledger: ApplyDB, genesis: GenesisFile, + environment: Environment, pub upstream: UpstreamPort, @@ -23,10 +27,33 @@ pub struct Stage { } impl Stage { - pub fn new(ledger: ApplyDB, genesis: GenesisFile) -> Self { + pub fn new(ledger: ApplyDB, genesis: GenesisFile, prot_magic: u64) -> Self { + let env: Environment = Environment { + prot_params: MultiEraProtParams::Byron(ByronProtParams { + min_fees_const: genesis + .block_version_data + .tx_fee_policy + .summand + .parse::() + .unwrap_or_else(|err| panic!("{:?}", err)), + min_fees_factor: genesis + .block_version_data + .tx_fee_policy + .multiplier + .parse::() + .unwrap_or_else(|err| panic!("{:?}", err)), + max_tx_size: genesis + .block_version_data + .max_tx_size + .parse::() + .unwrap_or_else(|err| panic!("{:?}", err)), + }), + prot_magic: prot_magic as u32, + }; Self { ledger, genesis, + environment: env, upstream: Default::default(), // downstream: Default::default(), block_count: Default::default(), @@ -35,8 +62,6 @@ impl Stage { } } -impl Stage {} - pub struct Worker; #[async_trait::async_trait(?Send)] @@ -58,7 +83,10 @@ impl gasket::framework::Worker for Worker { match unit { RollEvent::Apply(slot, _, cbor) => { info!(slot, "applying block"); - stage.ledger.apply_block(cbor).or_panic()?; + stage + .ledger + .apply_block(cbor, Some(&stage.environment)) + .or_panic()?; } RollEvent::Undo(slot, _, cbor) => { info!(slot, "undoing block"); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 255f0518..42c11a94 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -70,7 +70,7 @@ pub fn pipeline( let mut roll = roll::Stage::new(wal, cursor_chain, cursor_ledger); let mut chain = chain::Stage::new(chain); - let mut ledger = ledger::Stage::new(ledger, genesis); + let mut ledger = ledger::Stage::new(ledger, genesis, config.network_magic); let (to_roll, from_pull) = gasket::messaging::tokio::mpsc_channel(50); pull.downstream.connect(to_roll);