Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add decompression traits and a test case #2295

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added
- [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`).
- [2295](https://github.com/FuelLabs/fuel-core/pull/2295): A test case for decompressing DA-compressed blocks.

### Changed
- [2295](https://github.com/FuelLabs/fuel-core/pull/2295): `CombinedDb::from_config` now respects `state_rewind_policy` with tmp RocksDB.

## [Version 0.40.0]

Expand Down
4 changes: 3 additions & 1 deletion benches/benches/block_target_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use fuel_core::{
Config,
FuelService,
},
state::historical_rocksdb::StateRewindPolicy,
};
use fuel_core_benches::{
default_gas_costs::default_gas_costs,
Expand Down Expand Up @@ -265,7 +266,8 @@ fn service_with_many_contracts(
.build()
.unwrap();
let _drop = rt.enter();
let mut database = Database::rocksdb_temp();
let mut database = Database::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create database");

let mut chain_config = ChainConfig::local_testnet();

Expand Down
17 changes: 16 additions & 1 deletion crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ impl CombinedDatabase {
})
}

/// A test-only temporary rocksdb database with given rewind policy.
#[cfg(feature = "rocksdb")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also be "test-helpers" if it's "test-only"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 5251318

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted this in beaee56 since it seems like we're using it with when empty path is provided, and I'm not sure if we need that outside tests as well.

pub fn from_state_rewind_policy(
Dentosal marked this conversation as resolved.
Show resolved Hide resolved
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
Ok(Self {
on_chain: Database::rocksdb_temp(state_rewind_policy)?,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should use ShallowTempDir here? we moved it from fuel-core to the benches crate but if the use case is the same we can use it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we have a way to clean up the directory after shutdown.. not sure if this method deletes the db after we are done with it.

off_chain: Database::rocksdb_temp(state_rewind_policy)?,
relayer: Database::rocksdb_temp(state_rewind_policy)?,
Dentosal marked this conversation as resolved.
Show resolved Hide resolved
gas_price: Default::default(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, why the difference for the gas_price database?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xgreenx might know, I have no idea

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because gas price table and relayer table doesn't need to support state rewind in the case of the temporary database(right now at least). Because we don't have any logic around it.

})
}

pub fn from_config(config: &CombinedDatabaseConfig) -> DatabaseResult<Self> {
let combined_database = match config.database_type {
#[cfg(feature = "rocksdb")]
Expand All @@ -103,7 +116,9 @@ impl CombinedDatabase {
tracing::warn!(
"No RocksDB path configured, initializing database with a tmp directory"
);
CombinedDatabase::default()
CombinedDatabase::from_state_rewind_policy(
config.state_rewind_policy,
)?
} else {
tracing::info!(
"Opening database {:?} with cache size \"{}\" and state rewind policy \"{:?}\"",
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,11 @@ where
}

#[cfg(feature = "rocksdb")]
pub fn rocksdb_temp() -> Self {
let db = RocksDb::<Historical<Description>>::default_open_temp(None).unwrap();
let historical_db =
HistoricalRocksDB::new(db, StateRewindPolicy::NoRewind).unwrap();
pub fn rocksdb_temp(rewind_policy: StateRewindPolicy) -> Result<Self> {
let db = RocksDb::<Historical<Description>>::default_open_temp(None)?;
let historical_db = HistoricalRocksDB::new(db, rewind_policy)?;
let data = Arc::new(historical_db);
Self::from_storage(DataSource::new(data, Stage::default()))
Ok(Self::from_storage(DataSource::new(data, Stage::default())))
}
}

Expand All @@ -273,7 +272,8 @@ where
}
#[cfg(feature = "rocksdb")]
{
Self::rocksdb_temp()
Self::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create a temporary database")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

pub mod api_service;
mod da_compression;
pub mod da_compression;
pub mod database;
pub(crate) mod metrics_extension;
pub mod ports;
Expand Down
181 changes: 172 additions & 9 deletions crates/fuel-core/src/graphql_api/da_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@ use fuel_core_compression::{
config::Config,
ports::{
EvictorDb,
HistoryLookup,
TemporalRegistry,
UtxoIdToPointer,
},
};
use fuel_core_storage::{
not_found,
tables::{
Coins,
FuelBlocks,
Messages,
},
StorageAsMut,
StorageAsRef,
StorageInspect,
};
use fuel_core_types::{
blockchain::block::Block,
Expand Down Expand Up @@ -49,8 +56,8 @@ where
{
let compressed = compress(
config,
CompressTx {
db_tx,
CompressDbTx {
db_tx: DbTx { db_tx },
block_events,
},
block,
Expand All @@ -65,14 +72,23 @@ where
Ok(())
}

struct CompressTx<'a, Tx> {
db_tx: &'a mut Tx,
pub struct DbTx<'a, Tx> {
pub db_tx: &'a mut Tx,
}
netrome marked this conversation as resolved.
Show resolved Hide resolved

struct CompressDbTx<'a, Tx> {
db_tx: DbTx<'a, Tx>,
block_events: &'a [Event],
}

pub struct DecompressDbTx<'a, Tx, Onchain> {
pub db_tx: DbTx<'a, Tx>,
pub onchain_db: Onchain,
}

macro_rules! impl_temporal_registry {
($type:ty) => { paste::paste! {
impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand Down Expand Up @@ -150,15 +166,87 @@ macro_rules! impl_temporal_registry {
}
}

impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressDbTx<'a, Tx, Offchain>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx> EvictorDb<$type> for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn set_latest_assigned_key(
&mut self,
key: fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<()> {
self.db_tx
self.db_tx.db_tx
.storage_as_mut::<DaCompressionTemporalRegistryEvictorCache>()
.insert(&MetadataKey::$type, &key)?;
Ok(())
Expand All @@ -168,7 +256,7 @@ macro_rules! impl_temporal_registry {
&self,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>> {
Ok(self
.db_tx
.db_tx.db_tx
.storage_as_ref::<DaCompressionTemporalRegistryEvictorCache>()
.get(&MetadataKey::$type)?
.map(|v| v.into_owned())
Expand All @@ -185,7 +273,7 @@ impl_temporal_registry!(ContractId);
impl_temporal_registry!(ScriptCode);
impl_temporal_registry!(PredicateCode);

impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx>
impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand All @@ -210,3 +298,78 @@ where
anyhow::bail!("UtxoId not found in the block events");
}
}

impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain>
where
Tx: OffChainDatabaseTransaction,
Onchain: StorageInspect<Coins, Error = fuel_core_storage::Error>
+ StorageInspect<Messages, Error = fuel_core_storage::Error>
+ StorageInspect<FuelBlocks, Error = fuel_core_storage::Error>,
{
fn utxo_id(
&self,
c: fuel_core_types::fuel_tx::CompressedUtxoId,
) -> anyhow::Result<fuel_core_types::fuel_tx::UtxoId> {
if c.tx_pointer.block_height() == 0u32.into() {
// This is a genesis coin, which is handled differently.
// See CoinConfigGenerator::generate which generates the genesis coins.
let mut bytes = [0u8; 32];
let tx_index = c.tx_pointer.tx_index();
bytes[..std::mem::size_of_val(&tx_index)]
.copy_from_slice(&tx_index.to_be_bytes());
return Ok(fuel_core_types::fuel_tx::UtxoId::new(
fuel_core_types::fuel_tx::TxId::from(bytes),
0,
));
}

let block_info = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::FuelBlocks>()
Dentosal marked this conversation as resolved.
Show resolved Hide resolved
.get(&c.tx_pointer.block_height())?
.ok_or(not_found!(fuel_core_storage::tables::FuelBlocks))?;

let tx_id = *block_info
.transactions()
.get(c.tx_pointer.tx_index() as usize)
.ok_or(anyhow::anyhow!(
"Transaction not found in the block: {:?}",
c.tx_pointer
))?;

Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index))
}

fn coin(
&self,
utxo_id: fuel_core_types::fuel_tx::UtxoId,
) -> anyhow::Result<fuel_core_compression::ports::CoinInfo> {
let coin = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Coins>()
.get(&dbg!(utxo_id))?
Dentosal marked this conversation as resolved.
Show resolved Hide resolved
Dentosal marked this conversation as resolved.
Show resolved Hide resolved
.ok_or(not_found!(fuel_core_storage::tables::Coins))?;
Ok(fuel_core_compression::ports::CoinInfo {
owner: *coin.owner(),
asset_id: *coin.asset_id(),
amount: *coin.amount(),
})
}

fn message(
&self,
nonce: fuel_core_types::fuel_types::Nonce,
) -> anyhow::Result<fuel_core_compression::ports::MessageInfo> {
let message = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Messages>()
.get(&nonce)?
.ok_or(not_found!(fuel_core_storage::tables::Messages))?;
Ok(fuel_core_compression::ports::MessageInfo {
sender: *message.sender(),
recipient: *message.recipient(),
amount: message.amount(),
data: message.data().clone(),
})
}
}
Loading
Loading