Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add decompression traits and a test case #2295

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Changed

- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
- [2295](https://github.com/FuelLabs/fuel-core/pull/2295): `CombinedDb::from_config` now respects `state_rewind_policy` with tmp RocksDB.

### Added
- [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`).
- [2295](https://github.com/FuelLabs/fuel-core/pull/2295): A test case for decompressing DA-compressed blocks.
- [2347](https://github.com/FuelLabs/fuel-core/pull/2364): Add activity concept in order to protect against infinitely increasing DA gas price scenarios
- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future.
- [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit.
Expand Down
4 changes: 3 additions & 1 deletion benches/benches/block_target_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use fuel_core::{
Config,
FuelService,
},
state::historical_rocksdb::StateRewindPolicy,
};
use fuel_core_benches::{
default_gas_costs::default_gas_costs,
Expand Down Expand Up @@ -265,7 +266,8 @@ fn service_with_many_contracts(
.build()
.unwrap();
let _drop = rt.enter();
let mut database = Database::rocksdb_temp();
let mut database = Database::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create database");

let mut chain_config = ChainConfig::local_testnet();

Expand Down
17 changes: 16 additions & 1 deletion crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ impl CombinedDatabase {
})
}

/// A test-only temporary rocksdb database with given rewind policy.
#[cfg(feature = "rocksdb")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also be "test-helpers" if it's "test-only"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 5251318

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted this in beaee56 since it seems like we're using it with when empty path is provided, and I'm not sure if we need that outside tests as well.

pub fn from_state_rewind_policy(
Dentosal marked this conversation as resolved.
Show resolved Hide resolved
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
Ok(Self {
on_chain: Database::rocksdb_temp(state_rewind_policy)?,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should use ShallowTempDir here? we moved it from fuel-core to the benches crate but if the use case is the same we can use it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we have a way to clean up the directory after shutdown.. not sure if this method deletes the db after we are done with it.

off_chain: Database::rocksdb_temp(state_rewind_policy)?,
relayer: Database::rocksdb_temp(state_rewind_policy)?,
Dentosal marked this conversation as resolved.
Show resolved Hide resolved
gas_price: Default::default(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, why the difference for the gas_price database?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xgreenx might know, I have no idea

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because gas price table and relayer table doesn't need to support state rewind in the case of the temporary database(right now at least). Because we don't have any logic around it.

})
}

pub fn from_config(config: &CombinedDatabaseConfig) -> DatabaseResult<Self> {
let combined_database = match config.database_type {
#[cfg(feature = "rocksdb")]
Expand All @@ -114,7 +127,9 @@ impl CombinedDatabase {
tracing::warn!(
"No RocksDB path configured, initializing database with a tmp directory"
);
CombinedDatabase::default()
CombinedDatabase::from_state_rewind_policy(
config.state_rewind_policy,
)?
} else {
tracing::info!(
"Opening database {:?} with cache size \"{}\" and state rewind policy \"{:?}\"",
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,11 @@ where
}

#[cfg(feature = "rocksdb")]
pub fn rocksdb_temp() -> Self {
let db = RocksDb::<Historical<Description>>::default_open_temp(None).unwrap();
let historical_db =
HistoricalRocksDB::new(db, StateRewindPolicy::NoRewind).unwrap();
pub fn rocksdb_temp(rewind_policy: StateRewindPolicy) -> Result<Self> {
let db = RocksDb::<Historical<Description>>::default_open_temp(None)?;
let historical_db = HistoricalRocksDB::new(db, rewind_policy)?;
let data = Arc::new(historical_db);
Self::from_storage(DataSource::new(data, Stage::default()))
Ok(Self::from_storage(DataSource::new(data, Stage::default())))
}
}

Expand All @@ -275,7 +274,8 @@ where
}
#[cfg(feature = "rocksdb")]
{
Self::rocksdb_temp()
Self::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create a temporary database")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

pub mod api_service;
mod da_compression;
pub mod da_compression;
pub mod database;
pub(crate) mod metrics_extension;
pub mod ports;
Expand Down
181 changes: 172 additions & 9 deletions crates/fuel-core/src/graphql_api/da_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@ use fuel_core_compression::{
config::Config,
ports::{
EvictorDb,
HistoryLookup,
TemporalRegistry,
UtxoIdToPointer,
},
};
use fuel_core_storage::{
not_found,
tables::{
Coins,
FuelBlocks,
Messages,
},
StorageAsMut,
StorageAsRef,
StorageInspect,
};
use fuel_core_types::{
blockchain::block::Block,
Expand Down Expand Up @@ -49,8 +56,8 @@ where
{
let compressed = compress(
config,
CompressTx {
db_tx,
CompressDbTx {
db_tx: DbTx { db_tx },
block_events,
},
block,
Expand All @@ -65,14 +72,23 @@ where
Ok(())
}

struct CompressTx<'a, Tx> {
db_tx: &'a mut Tx,
pub struct DbTx<'a, Tx> {
pub db_tx: &'a mut Tx,
}
netrome marked this conversation as resolved.
Show resolved Hide resolved

struct CompressDbTx<'a, Tx> {
db_tx: DbTx<'a, Tx>,
block_events: &'a [Event],
}

pub struct DecompressDbTx<'a, Tx, Onchain> {
pub db_tx: DbTx<'a, Tx>,
pub onchain_db: Onchain,
}

macro_rules! impl_temporal_registry {
($type:ty) => { paste::paste! {
impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand Down Expand Up @@ -150,15 +166,87 @@ macro_rules! impl_temporal_registry {
}
}

impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressDbTx<'a, Tx, Offchain>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx> EvictorDb<$type> for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn set_latest_assigned_key(
&mut self,
key: fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<()> {
self.db_tx
self.db_tx.db_tx
.storage_as_mut::<DaCompressionTemporalRegistryEvictorCache>()
.insert(&MetadataKey::$type, &key)?;
Ok(())
Expand All @@ -168,7 +256,7 @@ macro_rules! impl_temporal_registry {
&self,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>> {
Ok(self
.db_tx
.db_tx.db_tx
.storage_as_ref::<DaCompressionTemporalRegistryEvictorCache>()
.get(&MetadataKey::$type)?
.map(|v| v.into_owned())
Expand All @@ -185,7 +273,7 @@ impl_temporal_registry!(ContractId);
impl_temporal_registry!(ScriptCode);
impl_temporal_registry!(PredicateCode);

impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx>
impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand All @@ -210,3 +298,78 @@ where
anyhow::bail!("UtxoId not found in the block events");
}
}

impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain>
where
Tx: OffChainDatabaseTransaction,
Onchain: StorageInspect<Coins, Error = fuel_core_storage::Error>
+ StorageInspect<Messages, Error = fuel_core_storage::Error>
+ StorageInspect<FuelBlocks, Error = fuel_core_storage::Error>,
{
fn utxo_id(
&self,
c: fuel_core_types::fuel_tx::CompressedUtxoId,
) -> anyhow::Result<fuel_core_types::fuel_tx::UtxoId> {
if c.tx_pointer.block_height() == 0u32.into() {
// This is a genesis coin, which is handled differently.
// See CoinConfigGenerator::generate which generates the genesis coins.
let mut bytes = [0u8; 32];
let tx_index = c.tx_pointer.tx_index();
bytes[..std::mem::size_of_val(&tx_index)]
.copy_from_slice(&tx_index.to_be_bytes());
return Ok(fuel_core_types::fuel_tx::UtxoId::new(
fuel_core_types::fuel_tx::TxId::from(bytes),
0,
));
}

let block_info = self
.onchain_db
.storage_as_ref::<FuelBlocks>()
.get(&c.tx_pointer.block_height())?
.ok_or(not_found!(FuelBlocks))?;

let tx_id = *block_info
.transactions()
.get(c.tx_pointer.tx_index() as usize)
.ok_or(anyhow::anyhow!(
"Transaction not found in the block: {:?}",
c.tx_pointer
))?;

Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index))
}

fn coin(
&self,
utxo_id: fuel_core_types::fuel_tx::UtxoId,
) -> anyhow::Result<fuel_core_compression::ports::CoinInfo> {
let coin = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Coins>()
.get(&utxo_id)?
.ok_or(not_found!(fuel_core_storage::tables::Coins))?;
Ok(fuel_core_compression::ports::CoinInfo {
owner: *coin.owner(),
asset_id: *coin.asset_id(),
amount: *coin.amount(),
})
}

fn message(
&self,
nonce: fuel_core_types::fuel_types::Nonce,
) -> anyhow::Result<fuel_core_compression::ports::MessageInfo> {
let message = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Messages>()
.get(&nonce)?
.ok_or(not_found!(fuel_core_storage::tables::Messages))?;
Ok(fuel_core_compression::ports::MessageInfo {
sender: *message.sender(),
recipient: *message.recipient(),
amount: message.amount(),
data: message.data().clone(),
})
}
}
Loading
Loading