refactor: support side-by-side state schemas (#290)
scarmuega authored Jul 13, 2024
1 parent 68e590f commit f080cf8
Showing 4 changed files with 353 additions and 291 deletions.
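The gist of the change: the redb-backed LedgerStore becomes an enum with one variant per on-disk schema, and the schema in use is detected by hashing the database's table names. The sketch below is a self-contained illustration of how a second schema would sit alongside v1 under this design; the V2 pieces and their pinned hash are hypothetical, and only the v1 constant comes from the diff itself.

struct V1Store; // stands in for v1::LedgerStore from the diff
struct V2Store; // hypothetical future schema, not part of this commit

enum LedgerStore {
    SchemaV1(V1Store),
    SchemaV2(V2Store), // new schemas are added side by side rather than migrated in place
}

fn select_schema(table_name_hash: Option<u64>) -> LedgerStore {
    match table_name_hash {
        // uninitialized db (no tables yet): adopt the latest schema
        None => LedgerStore::SchemaV2(V2Store),
        // pinned hash of v1's sorted table names (constant taken from the diff)
        Some(13844724490616556453) => LedgerStore::SchemaV1(V1Store),
        // a future v2 would pin its own constant here (this value is made up)
        Some(0xFEED_FACE) => LedgerStore::SchemaV2(V2Store),
        Some(x) => panic!("can't recognize db hash {}", x),
    }
}

fn main() {
    // an existing v1 database keeps working through the SchemaV1 arm
    let _store = select_schema(Some(13844724490616556453));
}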
6 changes: 6 additions & 0 deletions src/state/mod.rs
@@ -62,6 +62,12 @@ impl LedgerStore {
}
}

impl From<redb::LedgerStore> for LedgerStore {
fn from(value: redb::LedgerStore) -> Self {
Self::Redb(value)
}
}

impl interop::LedgerContext for LedgerStore {
fn get_utxos<'a>(&self, refs: &[interop::TxoRef]) -> Option<interop::UtxoMap> {
let refs: Vec<_> = refs.iter().map(|x| TxoRef::from(*x)).collect();
75 changes: 61 additions & 14 deletions src/state/redb/mod.rs
@@ -1,30 +1,77 @@
mod tables;
mod v1;

use ::redb::Database;
use std::{collections::HashSet, path::Path, sync::Arc};
use itertools::Itertools as _;
use redb::TableHandle as _;
use std::{
collections::HashSet,
hash::{Hash as _, Hasher as _},
path::Path,
};
use tracing::warn;

use crate::ledger::*;

const DEFAULT_CACHE_SIZE_MB: usize = 500;

fn compute_schema_hash(db: &Database) -> Result<Option<u64>, LedgerError> {
let mut hasher = std::hash::DefaultHasher::new();

let mut names = db
.begin_read()
.map_err(|e| LedgerError::StorageError(e.into()))?
.list_tables()
.map_err(|e| LedgerError::StorageError(e.into()))?
.map(|t| t.name().to_owned())
.collect_vec();

if names.is_empty() {
// this db hasn't been initialized, so we can't compute a hash
return Ok(None);
}

// sort so the hash doesn't depend on the table ordering redb happens to return
names.sort();

names.into_iter().for_each(|n| n.hash(&mut hasher));

let hash = hasher.finish();

Ok(Some(hash))
}

fn open_db(path: impl AsRef<Path>, cache_size: Option<usize>) -> Result<Database, LedgerError> {
let db = Database::builder()
.set_repair_callback(|x| warn!(progress = x.progress() * 100f64, "ledger db is repairing"))
.set_cache_size(1024 * 1024 * cache_size.unwrap_or(DEFAULT_CACHE_SIZE_MB))
//.create_with_backend(redb::backends::InMemoryBackend::new())?;
.create(path)
.map_err(|x| LedgerError::StorageError(x.into()))?;

Ok(db)
}

#[derive(Clone)]
pub enum LedgerStore {
SchemaV1(v1::LedgerStore),
}

impl LedgerStore {
pub fn open(path: impl AsRef<Path>, cache_size: Option<usize>) -> Result<Self, LedgerError> {
let inner = Database::builder()
.set_repair_callback(|x| {
warn!(progress = x.progress() * 100f64, "ledger db is repairing")
})
.set_cache_size(1024 * 1024 * cache_size.unwrap_or(DEFAULT_CACHE_SIZE_MB))
//.create_with_backend(redb::backends::InMemoryBackend::new())?;
.create(path)
.map_err(|x| LedgerError::StorageError(x.into()))?;

Ok(Self::SchemaV1(v1::LedgerStore(Arc::new(inner))))
let db = open_db(path, cache_size)?;
let hash = compute_schema_hash(&db)?;

let schema = match hash {
// use latest schema if no hash
None => v1::LedgerStore::from(db).into(),
// v1 hash
Some(13844724490616556453) => v1::LedgerStore::from(db).into(),
Some(x) => panic!("can't recognize db hash {}", x),
};

Ok(schema)
}

pub fn cursor(&self) -> Result<Option<ChainPoint>, LedgerError> {
@@ -70,8 +117,8 @@ impl LedgerStore {
}
}

impl From<LedgerStore> for super::LedgerStore {
fn from(value: LedgerStore) -> Self {
super::LedgerStore::Redb(value)
impl From<v1::LedgerStore> for LedgerStore {
fn from(value: v1::LedgerStore) -> Self {
Self::SchemaV1(value)
}
}
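To pin the constant for a new schema version, the hash can be derived the same way compute_schema_hash does it: hash the sorted table names with std's DefaultHasher. A standalone sketch follows; the table names are placeholders rather than necessarily the real v1 set, so the printed value is not the 13844724490616556453 constant above.

use std::hash::{DefaultHasher, Hash as _, Hasher as _};

fn schema_hash(mut names: Vec<String>) -> u64 {
    // sort so the result doesn't depend on the order the tables are listed in
    names.sort();
    let mut hasher = DefaultHasher::new();
    names.into_iter().for_each(|n| n.hash(&mut hasher));
    hasher.finish()
}

fn main() {
    let names = vec!["blocks".to_owned(), "utxos".to_owned(), "pparams".to_owned()];
    println!("pin this constant for the schema: {}", schema_hash(names));
}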
266 changes: 266 additions & 0 deletions src/state/redb/tables.rs
@@ -0,0 +1,266 @@
use std::collections::{HashMap, HashSet};

use ::redb::{Error, MultimapTableDefinition, TableDefinition, WriteTransaction};
use itertools::Itertools as _;
use pallas::{crypto::hash::Hash, ledger::traverse::MultiEraOutput};
use redb::{ReadTransaction, ReadableTable as _, TableError};

use crate::ledger::*;

pub struct BlocksTable;

impl BlocksTable {
pub const DEF: TableDefinition<'static, u64, &'static [u8; 32]> =
TableDefinition::new("blocks");

pub fn last(rx: &ReadTransaction) -> Result<Option<ChainPoint>, Error> {
let table = match rx.open_table(Self::DEF) {
Ok(x) => x,
Err(TableError::TableDoesNotExist(_)) => return Ok(None),
Err(x) => return Err(x.into()),
};

let last = table.last()?;
let last = last.map(|(k, v)| ChainPoint(k.value(), Hash::new(*v.value())));

Ok(last)
}

pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> {
let mut table = wx.open_table(Self::DEF)?;

if let Some(ChainPoint(slot, hash)) = delta.new_position.as_ref() {
let v: &[u8; 32] = hash;
table.insert(slot, v)?;
}

if let Some(ChainPoint(slot, _)) = delta.undone_position.as_ref() {
table.remove(slot)?;
}

Ok(())
}
}

type UtxosKey = (&'static [u8; 32], u32);
type UtxosValue = (u16, &'static [u8]);

pub struct UtxosTable;

impl UtxosTable {
pub const DEF: TableDefinition<'static, UtxosKey, UtxosValue> = TableDefinition::new("utxos");

pub fn get_sparse(
rx: &ReadTransaction,
refs: Vec<TxoRef>,
) -> Result<HashMap<TxoRef, EraCbor>, Error> {
let table = rx.open_table(Self::DEF)?;
let mut out = HashMap::new();

for key in refs {
if let Some(body) = table.get(&(&key.0 as &[u8; 32], key.1))? {
let (era, cbor) = body.value();
let era = pallas::ledger::traverse::Era::try_from(era).unwrap();
let cbor = cbor.to_owned();
let value = EraCbor(era, cbor);

out.insert(key, value);
}
}

Ok(out)
}

pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> {
let mut table = wx.open_table(Self::DEF)?;

for (k, v) in delta.produced_utxo.iter() {
let k: (&[u8; 32], u32) = (&k.0, k.1);
let v: (u16, &[u8]) = (v.0.into(), &v.1);
table.insert(k, v)?;
}

for (k, _) in delta.undone_utxo.iter() {
let k: (&[u8; 32], u32) = (&k.0, k.1);
table.remove(k)?;
}

Ok(())
}

pub fn compact(
wx: &WriteTransaction,
_slot: BlockSlot,
tombstone: &[TxoRef],
) -> Result<(), Error> {
let mut table = wx.open_table(Self::DEF)?;

for txo in tombstone {
let k: (&[u8; 32], u32) = (&txo.0, txo.1);
table.remove(k)?;
}

Ok(())
}
}

pub struct PParamsTable;

impl PParamsTable {
pub const DEF: TableDefinition<'static, u64, (u16, &'static [u8])> =
TableDefinition::new("pparams");

pub fn get_range(rx: &ReadTransaction, until: BlockSlot) -> Result<Vec<PParamsBody>, Error> {
let table = rx.open_table(Self::DEF)?;

let mut out = vec![];

for item in table.range(..until)? {
let (_, body) = item?;
let (era, cbor) = body.value();
let era = pallas::ledger::traverse::Era::try_from(era).unwrap();
out.push(PParamsBody(era, Vec::from(cbor)));
}

Ok(out)
}

pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> {
let mut table = wx.open_table(PParamsTable::DEF)?;

if let Some(ChainPoint(slot, _)) = delta.new_position {
for PParamsBody(era, body) in delta.new_pparams.iter() {
let v: (u16, &[u8]) = (u16::from(*era), body);
table.insert(slot, v)?;
}
}

if let Some(ChainPoint(slot, _)) = delta.undone_position {
table.remove(slot)?;
}

Ok(())
}
}

pub struct TombstonesTable;
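// A note on how these tables fit together (inferred from the code, not a comment in
// the original commit): the tombstones table records, per slot, the TxoRefs consumed
// at that slot. UtxosTable::apply never deletes consumed utxos directly; instead
// UtxosTable::compact removes them later using this tombstone list, and
// TombstonesTable::compact then clears the slot's entries. Keeping spent utxos around
// presumably makes rollbacks cheap: undoing a position only has to drop that slot's
// tombstones (and the utxos the undone block produced), not re-insert anything.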

impl TombstonesTable {
pub const DEF: MultimapTableDefinition<'static, BlockSlot, (&'static [u8; 32], TxoIdx)> =
MultimapTableDefinition::new("tombstones");

pub fn get_range(
rx: &ReadTransaction,
until: BlockSlot,
) -> Result<Vec<(BlockSlot, Vec<TxoRef>)>, Error> {
let table = rx.open_multimap_table(Self::DEF)?;

let mut out = vec![];

for entry in table.range(..until)? {
let (slot, tss) = entry?;

let tss: Vec<_> = tss
.into_iter()
.map_ok(|x| (*x.value().0, x.value().1))
.map_ok(|(hash, idx)| TxoRef(hash.into(), idx))
.try_collect()?;

out.push((slot.value(), tss));
}

Ok(out)
}

pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> {
let mut table = wx.open_multimap_table(Self::DEF)?;

if let Some(ChainPoint(slot, _)) = delta.new_position.as_ref() {
for (stxi, _) in delta.consumed_utxo.iter() {
let stxi: (&[u8; 32], u32) = (&stxi.0, stxi.1);
table.insert(slot, stxi)?;
}
}

if let Some(ChainPoint(slot, _)) = delta.undone_position.as_ref() {
table.remove_all(slot)?;
}

Ok(())
}

pub fn compact(
wx: &WriteTransaction,
slot: BlockSlot,
_tombstone: &[TxoRef],
) -> Result<(), Error> {
let mut table = wx.open_multimap_table(Self::DEF)?;

table.remove_all(slot)?;

Ok(())
}
}

pub struct ByAddressIndex;

// CI annotations (Test Suite, Lint Rust, Check on windows/ubuntu/macOS): struct
// `ByAddressIndex` is never constructed, and its associated items `DEF`,
// `get_utxo_by_address_set`, and `apply` are never used.

impl ByAddressIndex {
pub const DEF: MultimapTableDefinition<'static, &'static [u8], UtxosKey> =

MultimapTableDefinition::new("byaddress");

pub fn get_utxo_by_address_set(
rx: &ReadTransaction,
address: &[u8],
) -> Result<HashSet<TxoRef>, Error> {
let table = rx.open_multimap_table(Self::DEF)?;

let mut out = HashSet::new();

for item in table.get(address)? {
let item = item?;
let (hash, idx) = item.value();
out.insert(TxoRef((*hash).into(), idx));
}

Ok(out)
}

fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> {
let mut table = wx.open_multimap_table(Self::DEF)?;

for (utxo, body) in delta.produced_utxo.iter() {
// TODO: decoding here is very inefficient
let body = MultiEraOutput::try_from(body).unwrap();

if let Ok(address) = body.address() {
let k = address.to_vec();
let v: (&[u8; 32], u32) = (&utxo.0, utxo.1);
table.insert(k.as_slice(), v)?;
}
}

for (stxi, body) in delta.consumed_utxo.iter() {
// TODO: decoding here is very inefficient
let body = MultiEraOutput::try_from(body).unwrap();

if let Ok(address) = body.address() {
let k = address.to_vec();
let v: (&[u8; 32], u32) = (&stxi.0, stxi.1);
table.remove(k.as_slice(), v)?;
}
}

for (stxi, body) in delta.undone_utxo.iter() {
// TODO: decoding here is very inefficient
let body = MultiEraOutput::try_from(body).unwrap();

if let Ok(address) = body.address() {
let k = address.to_vec();
let v: (&[u8; 32], u32) = (&stxi.0, stxi.1);
table.remove(k.as_slice(), v)?;
}
}

Ok(())
}
}
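Each helper above takes an open transaction rather than the Database itself, so a caller can batch every table touched by a delta into one atomic write. A sketch of how they might compose; apply_delta is illustrative rather than part of the diff, and it assumes the same redb error conversions (From impls into redb::Error) that the file already relies on.

fn apply_delta(db: &::redb::Database, delta: &LedgerDelta) -> Result<(), Error> {
    // one write transaction spans every table touched by the delta
    let wx = db.begin_write()?;

    BlocksTable::apply(&wx, delta)?;
    UtxosTable::apply(&wx, delta)?;
    PParamsTable::apply(&wx, delta)?;
    TombstonesTable::apply(&wx, delta)?;

    // commit once so the delta lands atomically or not at all
    wx.commit()?;
    Ok(())
}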
