diff --git a/CHANGELOG.md b/CHANGELOG.md index 053d3258a87..a446cf4a78d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2413](https://github.com/FuelLabs/fuel-core/issues/2413): block production immediately errors if unable to lock the mutex. ### Changed +- [2295](https://github.com/FuelLabs/fuel-core/pull/2295): `CombinedDb::from_config` now respects `state_rewind_policy` with tmp RocksDB. - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. ## [Version 0.40.0] diff --git a/benches/benches/block_target_gas.rs b/benches/benches/block_target_gas.rs index 9574720a3df..fddd14a24c9 100644 --- a/benches/benches/block_target_gas.rs +++ b/benches/benches/block_target_gas.rs @@ -27,6 +27,7 @@ use fuel_core::{ Config, FuelService, }, + state::historical_rocksdb::StateRewindPolicy, }; use fuel_core_benches::{ default_gas_costs::default_gas_costs, @@ -265,7 +266,8 @@ fn service_with_many_contracts( .build() .unwrap(); let _drop = rt.enter(); - let mut database = Database::rocksdb_temp(); + let mut database = Database::rocksdb_temp(StateRewindPolicy::NoRewind) + .expect("Failed to create database"); let mut chain_config = ChainConfig::local_testnet(); diff --git a/crates/client/assets/debugAdapterProtocol.json b/crates/client/assets/debugAdapterProtocol.json index 44a0c2eed9c..b435aef0f85 100644 --- a/crates/client/assets/debugAdapterProtocol.json +++ b/crates/client/assets/debugAdapterProtocol.json @@ -1440,7 +1440,7 @@ { "$ref": "#/definitions/Request" }, { "type": "object", - "description": "Replaces all existing instruction breakpoints. Typically, instruction breakpoints would be set from a diassembly window. \nTo clear all instruction breakpoints, specify an empty array.\nWhen an instruction breakpoint is hit, a 'stopped' event (with reason 'instruction breakpoint') is generated.\nClients should only call this request if the capability 'supportsInstructionBreakpoints' is true.", + "description": "Replaces all existing instruction breakpoints. Typically, instruction breakpoints would be set from a disassembly window. \nTo clear all instruction breakpoints, specify an empty array.\nWhen an instruction breakpoint is hit, a 'stopped' event (with reason 'instruction breakpoint') is generated.\nClients should only call this request if the capability 'supportsInstructionBreakpoints' is true.", "properties": { "command": { "type": "string", diff --git a/crates/compression/src/lib.rs b/crates/compression/src/lib.rs index bd4b0fdcbba..d41deccefa1 100644 --- a/crates/compression/src/lib.rs +++ b/crates/compression/src/lib.rs @@ -16,6 +16,7 @@ pub use registry::RegistryKeyspace; use fuel_core_types::{ blockchain::header::PartialBlockHeader, fuel_tx::CompressedTransaction, + fuel_types::BlockHeight, }; use registry::RegistrationsPerTable; @@ -42,6 +43,15 @@ impl Default for VersionedCompressedBlock { } } +impl VersionedCompressedBlock { + /// Returns the height of the compressed block. + pub fn height(&self) -> &BlockHeight { + match self { + VersionedCompressedBlock::V0(block) => block.header.height(), + } + } +} + #[cfg(test)] mod tests { use fuel_core_compression as _; diff --git a/crates/compression/src/registry.rs b/crates/compression/src/registry.rs index 0bf1e3a5967..1fd20365d1a 100644 --- a/crates/compression/src/registry.rs +++ b/crates/compression/src/registry.rs @@ -78,7 +78,7 @@ macro_rules! tables { impl RegistrationsPerTable { - pub(crate) fn write_to_registry(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()> + pub fn write_to_registry(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()> where R: TemporalRegistryAll { diff --git a/crates/fuel-core/src/combined_database.rs b/crates/fuel-core/src/combined_database.rs index 8696a8b3969..c0b6d291af1 100644 --- a/crates/fuel-core/src/combined_database.rs +++ b/crates/fuel-core/src/combined_database.rs @@ -105,6 +105,19 @@ impl CombinedDatabase { }) } + /// A test-only temporary rocksdb database with given rewind policy. + #[cfg(feature = "rocksdb")] + pub fn temp_database_with_state_rewind_policy( + state_rewind_policy: StateRewindPolicy, + ) -> DatabaseResult { + Ok(Self { + on_chain: Database::rocksdb_temp(state_rewind_policy)?, + off_chain: Database::rocksdb_temp(state_rewind_policy)?, + relayer: Default::default(), + gas_price: Default::default(), + }) + } + pub fn from_config(config: &CombinedDatabaseConfig) -> DatabaseResult { let combined_database = match config.database_type { #[cfg(feature = "rocksdb")] @@ -114,7 +127,9 @@ impl CombinedDatabase { tracing::warn!( "No RocksDB path configured, initializing database with a tmp directory" ); - CombinedDatabase::default() + CombinedDatabase::temp_database_with_state_rewind_policy( + config.state_rewind_policy, + )? } else { tracing::info!( "Opening database {:?} with cache size \"{}\" and state rewind policy \"{:?}\"", diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 5061692ed92..50c286ea85b 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -251,12 +251,11 @@ where } #[cfg(feature = "rocksdb")] - pub fn rocksdb_temp() -> Self { - let db = RocksDb::>::default_open_temp(None).unwrap(); - let historical_db = - HistoricalRocksDB::new(db, StateRewindPolicy::NoRewind).unwrap(); + pub fn rocksdb_temp(rewind_policy: StateRewindPolicy) -> Result { + let db = RocksDb::>::default_open_temp(None)?; + let historical_db = HistoricalRocksDB::new(db, rewind_policy)?; let data = Arc::new(historical_db); - Self::from_storage(DataSource::new(data, Stage::default())) + Ok(Self::from_storage(DataSource::new(data, Stage::default()))) } } @@ -275,7 +274,8 @@ where } #[cfg(feature = "rocksdb")] { - Self::rocksdb_temp() + Self::rocksdb_temp(StateRewindPolicy::NoRewind) + .expect("Failed to create a temporary database") } } } @@ -408,7 +408,7 @@ impl Modifiable for GenesisDatabase { } } -fn commit_changes_with_height_update( +pub fn commit_changes_with_height_update( database: &mut Database, changes: Changes, heights_lookup: impl Fn( diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index 772bbc815ea..4e469a205d7 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -9,7 +9,7 @@ use std::{ }; pub mod api_service; -mod da_compression; +pub mod da_compression; pub mod database; pub(crate) mod metrics_extension; pub mod ports; diff --git a/crates/fuel-core/src/graphql_api/da_compression.rs b/crates/fuel-core/src/graphql_api/da_compression.rs index e9d11d1c22e..722f63b5080 100644 --- a/crates/fuel-core/src/graphql_api/da_compression.rs +++ b/crates/fuel-core/src/graphql_api/da_compression.rs @@ -14,14 +14,21 @@ use fuel_core_compression::{ config::Config, ports::{ EvictorDb, + HistoryLookup, TemporalRegistry, UtxoIdToPointer, }, }; use fuel_core_storage::{ not_found, + tables::{ + Coins, + FuelBlocks, + Messages, + }, StorageAsMut, StorageAsRef, + StorageInspect, }; use fuel_core_types::{ blockchain::block::Block, @@ -49,8 +56,8 @@ where { let compressed = compress( config, - CompressTx { - db_tx, + CompressDbTx { + db_tx: DbTx { db_tx }, block_events, }, block, @@ -65,14 +72,23 @@ where Ok(()) } -struct CompressTx<'a, Tx> { - db_tx: &'a mut Tx, +pub struct DbTx<'a, Tx> { + pub db_tx: &'a mut Tx, +} + +struct CompressDbTx<'a, Tx> { + db_tx: DbTx<'a, Tx>, block_events: &'a [Event], } +pub struct DecompressDbTx<'a, Tx, Onchain> { + pub db_tx: DbTx<'a, Tx>, + pub onchain_db: Onchain, +} + macro_rules! impl_temporal_registry { ($type:ty) => { paste::paste! { - impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx> + impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx> where Tx: OffChainDatabaseTransaction, { @@ -150,7 +166,79 @@ macro_rules! impl_temporal_registry { } } - impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx> + impl<'a, Tx> TemporalRegistry<$type> for CompressDbTx<'a, Tx> + where + Tx: OffChainDatabaseTransaction, + { + fn read_registry( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result<$type> { + self.db_tx.read_registry(key) + } + + fn read_timestamp( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result { + <_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key) + } + + fn write_registry( + &mut self, + key: &fuel_core_types::fuel_compression::RegistryKey, + value: &$type, + timestamp: Tai64, + ) -> anyhow::Result<()> { + self.db_tx.write_registry(key, value, timestamp) + } + + fn registry_index_lookup( + &self, + value: &$type, + ) -> anyhow::Result> + { + self.db_tx.registry_index_lookup(value) + } + } + + impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressDbTx<'a, Tx, Offchain> + where + Tx: OffChainDatabaseTransaction, + { + fn read_registry( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result<$type> { + self.db_tx.read_registry(key) + } + + fn read_timestamp( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result { + <_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key) + } + + fn write_registry( + &mut self, + key: &fuel_core_types::fuel_compression::RegistryKey, + value: &$type, + timestamp: Tai64, + ) -> anyhow::Result<()> { + self.db_tx.write_registry(key, value, timestamp) + } + + fn registry_index_lookup( + &self, + value: &$type, + ) -> anyhow::Result> + { + self.db_tx.registry_index_lookup(value) + } + } + + impl<'a, Tx> EvictorDb<$type> for CompressDbTx<'a, Tx> where Tx: OffChainDatabaseTransaction, { @@ -158,7 +246,7 @@ macro_rules! impl_temporal_registry { &mut self, key: fuel_core_types::fuel_compression::RegistryKey, ) -> anyhow::Result<()> { - self.db_tx + self.db_tx.db_tx .storage_as_mut::() .insert(&MetadataKey::$type, &key)?; Ok(()) @@ -168,7 +256,7 @@ macro_rules! impl_temporal_registry { &self, ) -> anyhow::Result> { Ok(self - .db_tx + .db_tx.db_tx .storage_as_ref::() .get(&MetadataKey::$type)? .map(|v| v.into_owned()) @@ -185,7 +273,7 @@ impl_temporal_registry!(ContractId); impl_temporal_registry!(ScriptCode); impl_temporal_registry!(PredicateCode); -impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx> +impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx> where Tx: OffChainDatabaseTransaction, { @@ -210,3 +298,78 @@ where anyhow::bail!("UtxoId not found in the block events"); } } + +impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain> +where + Tx: OffChainDatabaseTransaction, + Onchain: StorageInspect + + StorageInspect + + StorageInspect, +{ + fn utxo_id( + &self, + c: fuel_core_types::fuel_tx::CompressedUtxoId, + ) -> anyhow::Result { + if c.tx_pointer.block_height() == 0u32.into() { + // This is a genesis coin, which is handled differently. + // See CoinConfigGenerator::generate which generates the genesis coins. + let mut bytes = [0u8; 32]; + let tx_index = c.tx_pointer.tx_index(); + bytes[..std::mem::size_of_val(&tx_index)] + .copy_from_slice(&tx_index.to_be_bytes()); + return Ok(fuel_core_types::fuel_tx::UtxoId::new( + fuel_core_types::fuel_tx::TxId::from(bytes), + 0, + )); + } + + let block_info = self + .onchain_db + .storage_as_ref::() + .get(&c.tx_pointer.block_height())? + .ok_or(not_found!(FuelBlocks))?; + + let tx_id = *block_info + .transactions() + .get(c.tx_pointer.tx_index() as usize) + .ok_or(anyhow::anyhow!( + "Transaction not found in the block: {:?}", + c.tx_pointer + ))?; + + Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index)) + } + + fn coin( + &self, + utxo_id: fuel_core_types::fuel_tx::UtxoId, + ) -> anyhow::Result { + let coin = self + .onchain_db + .storage_as_ref::() + .get(&utxo_id)? + .ok_or(not_found!(fuel_core_storage::tables::Coins))?; + Ok(fuel_core_compression::ports::CoinInfo { + owner: *coin.owner(), + asset_id: *coin.asset_id(), + amount: *coin.amount(), + }) + } + + fn message( + &self, + nonce: fuel_core_types::fuel_types::Nonce, + ) -> anyhow::Result { + let message = self + .onchain_db + .storage_as_ref::() + .get(&nonce)? + .ok_or(not_found!(fuel_core_storage::tables::Messages))?; + Ok(fuel_core_compression::ports::MessageInfo { + sender: *message.sender(), + recipient: *message.recipient(), + amount: message.amount(), + data: message.data().clone(), + }) + } +} diff --git a/crates/fuel-core/src/state/generic_database.rs b/crates/fuel-core/src/state/generic_database.rs index 8306df1b1fb..b6f5f2ea464 100644 --- a/crates/fuel-core/src/state/generic_database.rs +++ b/crates/fuel-core/src/state/generic_database.rs @@ -44,7 +44,7 @@ impl GenericDatabase { } pub fn into_inner(self) -> Storage { - self.storage.into_inner() + self.storage.into_storage() } } diff --git a/crates/storage/src/structured_storage.rs b/crates/storage/src/structured_storage.rs index e78e9637484..9a76595bdf8 100644 --- a/crates/storage/src/structured_storage.rs +++ b/crates/storage/src/structured_storage.rs @@ -105,7 +105,7 @@ impl StructuredStorage { } /// Returns the inner storage. - pub fn into_inner(self) -> S { + pub fn into_storage(self) -> S { self.inner } } diff --git a/crates/storage/src/transactional.rs b/crates/storage/src/transactional.rs index 14ec74159ed..34fd040f512 100644 --- a/crates/storage/src/transactional.rs +++ b/crates/storage/src/transactional.rs @@ -112,6 +112,13 @@ impl StorageTransaction { self.inner.changes } + /// Returns the storage and changes to it. + pub fn into_inner(self) -> (S, Changes) { + let storage = self.inner.storage; + let changes = self.inner.changes; + (storage, changes) + } + /// Resets the changes to the storage. pub fn reset_changes(&mut self) { self.inner.changes = Default::default(); @@ -259,10 +266,7 @@ pub trait WriteTransaction { fn write_transaction(&mut self) -> StorageTransaction<&mut Self>; } -impl WriteTransaction for S -where - S: Modifiable, -{ +impl WriteTransaction for S { fn write_transaction(&mut self) -> StorageTransaction<&mut Self> { StorageTransaction::transaction( self, diff --git a/tests/tests/da_compression.rs b/tests/tests/da_compression.rs index 43fce2a27e6..b4f82b39a53 100644 --- a/tests/tests/da_compression.rs +++ b/tests/tests/da_compression.rs @@ -1,7 +1,13 @@ use core::time::Duration; use fuel_core::{ - combined_database::CombinedDatabase, - fuel_core_graphql_api::worker_service::DaCompressionConfig, + chain_config::TESTNET_WALLET_SECRETS, + fuel_core_graphql_api::{ + da_compression::{ + DbTx, + DecompressDbTx, + }, + worker_service::DaCompressionConfig, + }, p2p_test_helpers::*, service::{ Config, @@ -9,11 +15,19 @@ use fuel_core::{ }, }; use fuel_core_client::client::{ + pagination::PaginationRequest, types::TransactionStatus, FuelClient, }; -use fuel_core_compression::VersionedCompressedBlock; +use fuel_core_compression::{ + decompress::decompress, + VersionedCompressedBlock, +}; use fuel_core_poa::signer::SignMode; +use fuel_core_storage::transactional::{ + HistoricalView, + IntoTransaction, +}; use fuel_core_types::{ fuel_asm::{ op, @@ -21,9 +35,11 @@ use fuel_core_types::{ }, fuel_crypto::SecretKey, fuel_tx::{ + Address, GasCosts, Input, TransactionBuilder, + TxPointer, }, secrecy::Secret, }; @@ -31,30 +47,56 @@ use rand::{ rngs::StdRng, SeedableRng, }; +use std::str::FromStr; #[tokio::test] async fn can_fetch_da_compressed_block_from_graphql() { let mut rng = StdRng::seed_from_u64(10); let poa_secret = SecretKey::random(&mut rng); - let db = CombinedDatabase::default(); let mut config = Config::local_node(); config.consensus_signer = SignMode::Key(Secret::new(poa_secret.into())); + config.utxo_validation = true; let compression_config = fuel_core_compression::Config { temporal_registry_retention: Duration::from_secs(3600), }; config.da_compression = DaCompressionConfig::Enabled(compression_config); - let srv = FuelService::from_combined_database(db.clone(), config) - .await - .unwrap(); + let srv = FuelService::new_node(config).await.unwrap(); let client = FuelClient::from(srv.bound_address); + let wallet_secret = + SecretKey::from_str(TESTNET_WALLET_SECRETS[1]).expect("Expected valid secret"); + let wallet_address = Address::from(*wallet_secret.public_key().hash()); + + let coin = client + .coins( + &wallet_address, + None, + PaginationRequest { + cursor: None, + results: 1, + direction: fuel_core_client::client::pagination::PageDirection::Forward, + }, + ) + .await + .expect("Unable to get coins") + .results + .into_iter() + .next() + .expect("Expected at least one coin"); + let tx = TransactionBuilder::script([op::ret(RegId::ONE)].into_iter().collect(), vec![]) .max_fee_limit(0) .script_gas_limit(1_000_000) .with_gas_costs(GasCosts::free()) - .add_fee_input() + .add_unsigned_coin_input( + wallet_secret, + coin.utxo_id, + coin.amount, + coin.asset_id, + TxPointer::new(coin.block_created.into(), coin.tx_created_idx), + ) .finalize_as_transaction(); let status = client.submit_and_await_commit(&tx).await.unwrap(); @@ -66,9 +108,25 @@ async fn can_fetch_da_compressed_block_from_graphql() { } }; - let block = client.da_compressed_block(block_height).await.unwrap(); - let block = block.expect("Unable to get compressed block"); - let _: VersionedCompressedBlock = postcard::from_bytes(&block).unwrap(); + let block = client + .da_compressed_block(block_height) + .await + .unwrap() + .expect("Unable to get compressed block"); + let block: VersionedCompressedBlock = postcard::from_bytes(&block).unwrap(); + + // Reuse the existing offchain db to decompress the block + let db = &srv.shared.database; + let mut tx_inner = db.off_chain().clone().into_transaction(); + let db_tx = DecompressDbTx { + db_tx: DbTx { + db_tx: &mut tx_inner, + }, + onchain_db: db.on_chain().view_at(&0u32.into()).unwrap(), + }; + let decompressed = decompress(compression_config, db_tx, block).await.unwrap(); + + assert!(decompressed.transactions.len() == 2); } #[tokio::test(flavor = "multi_thread")]