From 9bd796fd966fa785210c2204b13f3e12701e3d81 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sat, 13 Jul 2024 13:05:07 -0300 Subject: [PATCH] refactor: support side-by-side state schemas --- src/state/mod.rs | 6 + src/state/redb/mod.rs | 75 ++++++++-- src/state/redb/tables.rs | 266 +++++++++++++++++++++++++++++++++++ src/state/redb/v1.rs | 297 +++------------------------------------ 4 files changed, 353 insertions(+), 291 deletions(-) create mode 100644 src/state/redb/tables.rs diff --git a/src/state/mod.rs b/src/state/mod.rs index 4428f52b..3da486b2 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -62,6 +62,12 @@ impl LedgerStore { } } +impl From for LedgerStore { + fn from(value: redb::LedgerStore) -> Self { + Self::Redb(value) + } +} + impl interop::LedgerContext for LedgerStore { fn get_utxos<'a>(&self, refs: &[interop::TxoRef]) -> Option { let refs: Vec<_> = refs.iter().map(|x| TxoRef::from(*x)).collect(); diff --git a/src/state/redb/mod.rs b/src/state/redb/mod.rs index 86de449f..c943a4e9 100644 --- a/src/state/redb/mod.rs +++ b/src/state/redb/mod.rs @@ -1,13 +1,58 @@ +mod tables; mod v1; use ::redb::Database; -use std::{collections::HashSet, path::Path, sync::Arc}; +use itertools::Itertools as _; +use redb::TableHandle as _; +use std::{ + collections::HashSet, + hash::{Hash as _, Hasher as _}, + path::Path, +}; use tracing::warn; use crate::ledger::*; const DEFAULT_CACHE_SIZE_MB: usize = 500; +fn compute_schema_hash(db: &Database) -> Result, LedgerError> { + let mut hasher = std::hash::DefaultHasher::new(); + + let mut names = db + .begin_read() + .map_err(|e| LedgerError::StorageError(e.into()))? + .list_tables() + .map_err(|e| LedgerError::StorageError(e.into()))? + .map(|t| t.name().to_owned()) + .collect_vec(); + + if names.is_empty() { + // this db hasn't been initialized, we can't compute hash + return Ok(None); + } + + // sort to make sure we don't depend on some redb implementation regarding order + // of the tables. + names.sort(); + + names.into_iter().for_each(|n| n.hash(&mut hasher)); + + let hash = hasher.finish(); + + Ok(Some(hash)) +} + +fn open_db(path: impl AsRef, cache_size: Option) -> Result { + let db = Database::builder() + .set_repair_callback(|x| warn!(progress = x.progress() * 100f64, "ledger db is repairing")) + .set_cache_size(1024 * 1024 * cache_size.unwrap_or(DEFAULT_CACHE_SIZE_MB)) + //.create_with_backend(redb::backends::InMemoryBackend::new())?; + .create(path) + .map_err(|x| LedgerError::StorageError(x.into()))?; + + Ok(db) +} + #[derive(Clone)] pub enum LedgerStore { SchemaV1(v1::LedgerStore), @@ -15,16 +60,18 @@ pub enum LedgerStore { impl LedgerStore { pub fn open(path: impl AsRef, cache_size: Option) -> Result { - let inner = Database::builder() - .set_repair_callback(|x| { - warn!(progress = x.progress() * 100f64, "ledger db is repairing") - }) - .set_cache_size(1024 * 1024 * cache_size.unwrap_or(DEFAULT_CACHE_SIZE_MB)) - //.create_with_backend(redb::backends::InMemoryBackend::new())?; - .create(path) - .map_err(|x| LedgerError::StorageError(x.into()))?; - - Ok(Self::SchemaV1(v1::LedgerStore(Arc::new(inner)))) + let db = open_db(path, cache_size)?; + let hash = compute_schema_hash(&db)?; + + let schema = match hash { + // use latest schema if no hash + None => v1::LedgerStore::from(db).into(), + // v1 hash + Some(13844724490616556453) => v1::LedgerStore::from(db).into(), + Some(x) => panic!("can't recognize db hash {}", x), + }; + + Ok(schema) } pub fn cursor(&self) -> Result, LedgerError> { @@ -70,8 +117,8 @@ impl LedgerStore { } } -impl From for super::LedgerStore { - fn from(value: LedgerStore) -> Self { - super::LedgerStore::Redb(value) +impl From for LedgerStore { + fn from(value: v1::LedgerStore) -> Self { + Self::SchemaV1(value) } } diff --git a/src/state/redb/tables.rs b/src/state/redb/tables.rs new file mode 100644 index 00000000..d48953f8 --- /dev/null +++ b/src/state/redb/tables.rs @@ -0,0 +1,266 @@ +use std::collections::{HashMap, HashSet}; + +use ::redb::{Error, MultimapTableDefinition, TableDefinition, WriteTransaction}; +use itertools::Itertools as _; +use pallas::{crypto::hash::Hash, ledger::traverse::MultiEraOutput}; +use redb::{ReadTransaction, ReadableTable as _, TableError}; + +use crate::ledger::*; + +pub struct BlocksTable; + +impl BlocksTable { + pub const DEF: TableDefinition<'static, u64, &'static [u8; 32]> = + TableDefinition::new("blocks"); + + pub fn last(rx: &ReadTransaction) -> Result, Error> { + let table = match rx.open_table(Self::DEF) { + Ok(x) => x, + Err(TableError::TableDoesNotExist(_)) => return Ok(None), + Err(x) => return Err(x.into()), + }; + + let last = table.last()?; + let last = last.map(|(k, v)| ChainPoint(k.value(), Hash::new(*v.value()))); + + Ok(last) + } + + pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + let mut table = wx.open_table(Self::DEF)?; + + if let Some(ChainPoint(slot, hash)) = delta.new_position.as_ref() { + let v: &[u8; 32] = hash; + table.insert(slot, v)?; + } + + if let Some(ChainPoint(slot, _)) = delta.undone_position.as_ref() { + table.remove(slot)?; + } + + Ok(()) + } +} + +type UtxosKey = (&'static [u8; 32], u32); +type UtxosValue = (u16, &'static [u8]); + +pub struct UtxosTable; + +impl UtxosTable { + pub const DEF: TableDefinition<'static, UtxosKey, UtxosValue> = TableDefinition::new("utxos"); + + pub fn get_sparse( + rx: &ReadTransaction, + refs: Vec, + ) -> Result, Error> { + let table = rx.open_table(Self::DEF)?; + let mut out = HashMap::new(); + + for key in refs { + if let Some(body) = table.get(&(&key.0 as &[u8; 32], key.1))? { + let (era, cbor) = body.value(); + let era = pallas::ledger::traverse::Era::try_from(era).unwrap(); + let cbor = cbor.to_owned(); + let value = EraCbor(era, cbor); + + out.insert(key, value); + } + } + + Ok(out) + } + + pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + let mut table = wx.open_table(Self::DEF)?; + + for (k, v) in delta.produced_utxo.iter() { + let k: (&[u8; 32], u32) = (&k.0, k.1); + let v: (u16, &[u8]) = (v.0.into(), &v.1); + table.insert(k, v)?; + } + + for (k, _) in delta.undone_utxo.iter() { + let k: (&[u8; 32], u32) = (&k.0, k.1); + table.remove(k)?; + } + + Ok(()) + } + + pub fn compact( + wx: &WriteTransaction, + _slot: BlockSlot, + tombstone: &[TxoRef], + ) -> Result<(), Error> { + let mut table = wx.open_table(Self::DEF)?; + + for txo in tombstone { + let k: (&[u8; 32], u32) = (&txo.0, txo.1); + table.remove(k)?; + } + + Ok(()) + } +} + +pub struct PParamsTable; + +impl PParamsTable { + pub const DEF: TableDefinition<'static, u64, (u16, &'static [u8])> = + TableDefinition::new("pparams"); + + pub fn get_range(rx: &ReadTransaction, until: BlockSlot) -> Result, Error> { + let table = rx.open_table(Self::DEF)?; + + let mut out = vec![]; + + for item in table.range(..until)? { + let (_, body) = item?; + let (era, cbor) = body.value(); + let era = pallas::ledger::traverse::Era::try_from(era).unwrap(); + out.push(PParamsBody(era, Vec::from(cbor))); + } + + Ok(out) + } + + pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + let mut table = wx.open_table(PParamsTable::DEF)?; + + if let Some(ChainPoint(slot, _)) = delta.new_position { + for PParamsBody(era, body) in delta.new_pparams.iter() { + let v: (u16, &[u8]) = (u16::from(*era), body); + table.insert(slot, v)?; + } + } + + if let Some(ChainPoint(slot, _)) = delta.undone_position { + table.remove(slot)?; + } + + Ok(()) + } +} + +pub struct TombstonesTable; + +impl TombstonesTable { + pub const DEF: MultimapTableDefinition<'static, BlockSlot, (&'static [u8; 32], TxoIdx)> = + MultimapTableDefinition::new("tombstones"); + + pub fn get_range( + rx: &ReadTransaction, + until: BlockSlot, + ) -> Result)>, Error> { + let table = rx.open_multimap_table(Self::DEF)?; + + let mut out = vec![]; + + for entry in table.range(..until)? { + let (slot, tss) = entry?; + + let tss: Vec<_> = tss + .into_iter() + .map_ok(|x| (*x.value().0, x.value().1)) + .map_ok(|(hash, idx)| TxoRef(hash.into(), idx)) + .try_collect()?; + + out.push((slot.value(), tss)); + } + + Ok(out) + } + + pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + let mut table = wx.open_multimap_table(Self::DEF)?; + + if let Some(ChainPoint(slot, _)) = delta.new_position.as_ref() { + for (stxi, _) in delta.consumed_utxo.iter() { + let stxi: (&[u8; 32], u32) = (&stxi.0, stxi.1); + table.insert(slot, stxi)?; + } + } + + if let Some(ChainPoint(slot, _)) = delta.undone_position.as_ref() { + table.remove_all(slot)?; + } + + Ok(()) + } + + pub fn compact( + wx: &WriteTransaction, + slot: BlockSlot, + _tombstone: &[TxoRef], + ) -> Result<(), Error> { + let mut table = wx.open_multimap_table(Self::DEF)?; + + table.remove_all(slot)?; + + Ok(()) + } +} + +pub struct ByAddressIndex; + +impl ByAddressIndex { + pub const DEF: MultimapTableDefinition<'static, &'static [u8], UtxosKey> = + MultimapTableDefinition::new("byaddress"); + + pub fn get_utxo_by_address_set( + rx: &ReadTransaction, + address: &[u8], + ) -> Result, Error> { + let table = rx.open_multimap_table(Self::DEF)?; + + let mut out = HashSet::new(); + + for item in table.get(address)? { + let item = item?; + let (hash, idx) = item.value(); + out.insert(TxoRef((*hash).into(), idx)); + } + + Ok(out) + } + + fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + let mut table = wx.open_multimap_table(Self::DEF)?; + + for (utxo, body) in delta.produced_utxo.iter() { + // TODO: decoding here is very inefficient + let body = MultiEraOutput::try_from(body).unwrap(); + + if let Ok(address) = body.address() { + let k = address.to_vec(); + let v: (&[u8; 32], u32) = (&utxo.0, utxo.1); + table.insert(k.as_slice(), v)?; + } + } + + for (stxi, body) in delta.consumed_utxo.iter() { + // TODO: decoding here is very inefficient + let body = MultiEraOutput::try_from(body).unwrap(); + + if let Ok(address) = body.address() { + let k = address.to_vec(); + let v: (&[u8; 32], u32) = (&stxi.0, stxi.1); + table.remove(k.as_slice(), v)?; + } + } + + for (stxi, body) in delta.undone_utxo.iter() { + // TODO: decoding here is very inefficient + let body = MultiEraOutput::try_from(body).unwrap(); + + if let Ok(address) = body.address() { + let k = address.to_vec(); + let v: (&[u8; 32], u32) = (&stxi.0, stxi.1); + table.remove(k.as_slice(), v)?; + } + } + + Ok(()) + } +} diff --git a/src/state/redb/v1.rs b/src/state/redb/v1.rs index c3108755..c927255c 100644 --- a/src/state/redb/v1.rs +++ b/src/state/redb/v1.rs @@ -1,204 +1,9 @@ -use ::redb::{ - Database, Durability, Error, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, - TableDefinition, TableError, WriteTransaction, -}; -use itertools::Itertools as _; -use pallas::{ - crypto::hash::Hash, - ledger::traverse::{Era, MultiEraOutput}, -}; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use ::redb::{Database, Durability, Error}; +use std::sync::Arc; use crate::ledger::*; -trait LedgerTable { - fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error>; - fn compact(wx: &WriteTransaction, slot: BlockSlot, tombstone: &[TxoRef]) -> Result<(), Error>; -} - -const BLOCKS: TableDefinition = TableDefinition::new("blocks"); -struct BlocksTable; - -impl LedgerTable for BlocksTable { - fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { - let mut table = wx.open_table(BLOCKS)?; - - if let Some(ChainPoint(slot, hash)) = delta.new_position.as_ref() { - let v: &[u8; 32] = hash; - table.insert(slot, v)?; - } - - if let Some(ChainPoint(slot, _)) = delta.undone_position.as_ref() { - table.remove(slot)?; - } - - Ok(()) - } - - fn compact( - _wx: &WriteTransaction, - _slot: BlockSlot, - _tombstone: &[TxoRef], - ) -> Result<(), Error> { - // do nothing - Ok(()) - } -} - -type UtxosKey<'a> = (&'a [u8; 32], u32); -type UtxosValue<'a> = (u16, &'a [u8]); - -const UTXOS: TableDefinition = TableDefinition::new("utxos"); -struct UtxosTable; - -impl LedgerTable for UtxosTable { - fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { - let mut table = wx.open_table(UTXOS)?; - - for (k, v) in delta.produced_utxo.iter() { - let k: (&[u8; 32], u32) = (&k.0, k.1); - let v: (u16, &[u8]) = (v.0.into(), &v.1); - table.insert(k, v)?; - } - - for (k, _) in delta.undone_utxo.iter() { - let k: (&[u8; 32], u32) = (&k.0, k.1); - table.remove(k)?; - } - - Ok(()) - } - - fn compact(wx: &WriteTransaction, _slot: BlockSlot, tombstone: &[TxoRef]) -> Result<(), Error> { - let mut table = wx.open_table(UTXOS)?; - - for txo in tombstone { - let k: (&[u8; 32], u32) = (&txo.0, txo.1); - table.remove(k)?; - } - - Ok(()) - } -} - -const PPARAMS: TableDefinition = TableDefinition::new("pparams"); -struct PParamsTable; - -impl LedgerTable for PParamsTable { - fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { - let mut table = wx.open_table(PPARAMS)?; - - if let Some(ChainPoint(slot, _)) = delta.new_position { - for PParamsBody(era, body) in delta.new_pparams.iter() { - let v: (u16, &[u8]) = (u16::from(*era), body); - table.insert(slot, v)?; - } - } - - if let Some(ChainPoint(slot, _)) = delta.undone_position { - table.remove(slot)?; - } - - Ok(()) - } - - fn compact( - _wx: &WriteTransaction, - _slot: BlockSlot, - _tombstone: &[TxoRef], - ) -> Result<(), Error> { - // do nothing - Ok(()) - } -} - -pub const TOMBSTONES: MultimapTableDefinition = - MultimapTableDefinition::new("tombstones"); -struct TombstonesTable; - -impl LedgerTable for TombstonesTable { - fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { - let mut table = wx.open_multimap_table(TOMBSTONES)?; - - if let Some(ChainPoint(slot, _)) = delta.new_position.as_ref() { - for (stxi, _) in delta.consumed_utxo.iter() { - let stxi: (&[u8; 32], u32) = (&stxi.0, stxi.1); - table.insert(slot, stxi)?; - } - } - - if let Some(ChainPoint(slot, _)) = delta.undone_position.as_ref() { - table.remove_all(slot)?; - } - - Ok(()) - } - - fn compact(wx: &WriteTransaction, slot: BlockSlot, _tombstone: &[TxoRef]) -> Result<(), Error> { - let mut table = wx.open_multimap_table(TOMBSTONES)?; - - table.remove_all(slot)?; - - Ok(()) - } -} - -pub const BY_ADDRESS_INDEX: MultimapTableDefinition<&[u8], UtxosKey> = - MultimapTableDefinition::new("byaddress"); -struct ByAddressIndex; - -impl LedgerTable for ByAddressIndex { - fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { - let mut table = wx.open_multimap_table(BY_ADDRESS_INDEX)?; - - for (utxo, body) in delta.produced_utxo.iter() { - // TODO: decoding here is very inefficient - let body = MultiEraOutput::try_from(body).unwrap(); - - if let Ok(address) = body.address() { - let k = address.to_vec(); - let v: (&[u8; 32], u32) = (&utxo.0, utxo.1); - table.insert(k.as_slice(), v)?; - } - } - - for (stxi, body) in delta.consumed_utxo.iter() { - // TODO: decoding here is very inefficient - let body = MultiEraOutput::try_from(body).unwrap(); - - if let Ok(address) = body.address() { - let k = address.to_vec(); - let v: (&[u8; 32], u32) = (&stxi.0, stxi.1); - table.remove(k.as_slice(), v)?; - } - } - - for (stxi, body) in delta.undone_utxo.iter() { - // TODO: decoding here is very inefficient - let body = MultiEraOutput::try_from(body).unwrap(); - - if let Ok(address) = body.address() { - let k = address.to_vec(); - let v: (&[u8; 32], u32) = (&stxi.0, stxi.1); - table.remove(k.as_slice(), v)?; - } - } - - Ok(()) - } - - fn compact( - _wx: &WriteTransaction, - _slot: BlockSlot, - _tombstone: &[TxoRef], - ) -> Result<(), Error> { - // do nothing - Ok(()) - } -} +use super::tables; #[derive(Clone)] pub struct LedgerStore(pub Arc); @@ -213,17 +18,7 @@ impl LedgerStore { pub fn cursor(&self) -> Result, Error> { let rx = self.0.begin_read()?; - - let table = match rx.open_table(BLOCKS) { - Ok(x) => x, - Err(TableError::TableDoesNotExist(_)) => return Ok(None), - Err(x) => return Err(x.into()), - }; - - let last = table.last()?; - let last = last.map(|(k, v)| ChainPoint(k.value(), Hash::new(*v.value()))); - - Ok(last) + tables::BlocksTable::last(&rx) } pub fn apply(&mut self, deltas: &[LedgerDelta]) -> Result<(), Error> { @@ -231,13 +26,10 @@ impl LedgerStore { wx.set_durability(Durability::Eventual); for delta in deltas { - UtxosTable::apply(&wx, delta)?; - PParamsTable::apply(&wx, delta)?; - TombstonesTable::apply(&wx, delta)?; - BlocksTable::apply(&wx, delta)?; - - // indexes? - //ByAddressIndex::apply(&wx, delta)?; + tables::UtxosTable::apply(&wx, delta)?; + tables::PParamsTable::apply(&wx, delta)?; + tables::TombstonesTable::apply(&wx, delta)?; + tables::BlocksTable::apply(&wx, delta)?; } wx.commit()?; @@ -246,30 +38,16 @@ impl LedgerStore { } pub fn finalize(&mut self, until: BlockSlot) -> Result<(), Error> { + let rx = self.0.begin_read()?; + let tss = tables::TombstonesTable::get_range(&rx, until)?; + let mut wx = self.0.begin_write()?; wx.set_durability(Durability::Eventual); - let tss: Vec<_> = { - wx.open_multimap_table(TOMBSTONES)? - .range(..until)? - .map_ok(|(k, v)| { - let values: Vec<_> = v - .into_iter() - .map_ok(|x| (*x.value().0, x.value().1)) - .map_ok(|(hash, idx)| TxoRef(hash.into(), idx)) - .try_collect()?; - - Result::<_, Error>::Ok((k.value(), values)) - }) - .try_collect()? - }; - for ts in tss { - let (slot, txos) = ts?; - UtxosTable::compact(&wx, slot, &txos)?; - PParamsTable::compact(&wx, slot, &txos)?; - BlocksTable::compact(&wx, slot, &txos)?; - TombstonesTable::compact(&wx, slot, &txos)?; + let (slot, txos) = ts; + tables::UtxosTable::compact(&wx, slot, &txos)?; + tables::TombstonesTable::compact(&wx, slot, &txos)?; } wx.commit()?; @@ -284,52 +62,17 @@ impl LedgerStore { } let rx = self.0.begin_read()?; - - let table = rx.open_table(UTXOS)?; - let mut out = HashMap::new(); - - for key in refs { - if let Some(body) = table.get(&(&key.0 as &[u8; 32], key.1))? { - let (era, cbor) = body.value(); - let era = Era::try_from(era).unwrap(); - let cbor = cbor.to_owned(); - let value = EraCbor(era, cbor); - - out.insert(key, value); - } - } - - Ok(out) + tables::UtxosTable::get_sparse(&rx, refs) } pub fn get_pparams(&self, until: BlockSlot) -> Result, Error> { let rx = self.0.begin_read()?; - let table = rx.open_table(PPARAMS)?; - - let mut out = vec![]; - - for item in table.range(..until)? { - let (_, body) = item?; - let (era, cbor) = body.value(); - let era = Era::try_from(era).unwrap(); - out.push(PParamsBody(era, Vec::from(cbor))); - } - - Ok(out) + tables::PParamsTable::get_range(&rx, until) } +} - pub fn get_utxo_by_address_set(&self, address: &[u8]) -> Result, Error> { - let rx = self.0.begin_read()?; - let table = rx.open_multimap_table(BY_ADDRESS_INDEX)?; - - let mut out = HashSet::new(); - - for item in table.get(address)? { - let item = item?; - let (hash, idx) = item.value(); - out.insert(TxoRef((*hash).into(), idx)); - } - - Ok(out) +impl From for LedgerStore { + fn from(value: Database) -> Self { + Self(Arc::new(value)) } }