Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Add support for multi get operation for database queries #2396

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c77045f
feat: Multi-get with boxed iterators
netrome Oct 24, 2024
e66db04
feat: Multi get on bĺueprint
netrome Oct 24, 2024
9eace87
feat: get_multi on RocksDB
netrome Oct 24, 2024
8ba0c9d
feat: Use local fuel-vm
netrome Oct 24, 2024
76b919b
feat: Implementation for structured storage
netrome Oct 24, 2024
2333dfb
feat: Use multi-get
netrome Oct 25, 2024
49994b9
feat: Messages impl
netrome Oct 25, 2024
2fb9e88
feat: Don't rely on modified StorageInspect
netrome Oct 25, 2024
b2c2632
wip: Introduce specific StorageBatchInspect trait
netrome Oct 25, 2024
df63d48
wip: Use specific trait
netrome Oct 25, 2024
4d5ca7d
feat: Use boxed iterator
netrome Oct 25, 2024
36a50bb
feat: Use multi-get when getting full block
netrome Oct 25, 2024
6275b36
feat: Use multi-get when getting coins
netrome Oct 25, 2024
e1e50fa
feat: Use multi-get when getting transactions
netrome Oct 26, 2024
f76d210
refactor: Rename multi_get -> get_batch
netrome Oct 29, 2024
4feadfd
feat: Implement multi-get for more backends
netrome Oct 29, 2024
e141f56
chore: Clean up comments and add docstrings
netrome Oct 29, 2024
8e3fd0f
chore: Add changelog entry
netrome Oct 29, 2024
39664bb
fix: Take slice instead of Vec as input to function
netrome Oct 29, 2024
52eca64
test: Assert the returned tx is in the expected place in get_full_blo…
netrome Oct 29, 2024
5b0e975
fix: Typo
netrome Oct 29, 2024
bc955fd
fix: Typo
netrome Oct 29, 2024
491c452
feat: Add metrics for RocksDB get_batch implementation
netrome Oct 29, 2024
676a5d4
fix: Clippy
netrome Oct 29, 2024
8b35460
fix: Remove stale TODO comment
netrome Oct 31, 2024
f697707
Revert "feat: Add metrics for RocksDB get_batch implementation"
netrome Oct 31, 2024
3d0a71f
refactor: Return iterator over results in old multi_get function and …
netrome Oct 31, 2024
5de80b7
perf: Only fall back to fetch off chain transactions if any result is…
netrome Oct 31, 2024
d7a30c8
fix: Whitespace
netrome Oct 31, 2024
f27bdbb
feat: Take Cow as keys in `KeyValueInspect::get_batch`
netrome Oct 31, 2024
9ea8894
Merge branch 'master' into 2344-add-support-for-multi-get-operation-i…
netrome Oct 31, 2024
2c2d222
Proposals to multi get PR (#2419)
xgreenx Oct 31, 2024
df33ef2
feat: Simplify `DatabaseCoins` port
netrome Oct 31, 2024
79e42da
Another proposals to multi get PR (#2420)
xgreenx Oct 31, 2024
cbb6efc
feat: Don't require `Send` in BoxedIter
netrome Nov 1, 2024
07f9b94
Merge branch 'master' into 2344-add-support-for-multi-get-operation-i…
netrome Nov 1, 2024
75fedc7
fix: Cargo fmt
netrome Nov 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.

#### Breaking
- [2396](https://github.com/FuelLabs/fuel-core/pull/2396): Return `StorageResult<Transaction>` in `OffChainDatabase::old_transaction`.

### Added
- [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`).
- [2347](https://github.com/FuelLabs/fuel-core/pull/2364): Add activity concept in order to protect against infinitely increasing DA gas price scenarios
- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future.
- [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit.
- [2396](https://github.com/FuelLabs/fuel-core/pull/2396): Add support for multi get operation for database queries.

## [Version 0.40.0]

Expand Down
4 changes: 2 additions & 2 deletions crates/client/src/client/schema/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl LowerHex for TxPointer {
}
}

#[derive(cynic::Scalar, Debug, Clone)]
#[derive(cynic::Scalar, Debug, Clone, PartialEq, Eq)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes needed? I can't seem to find a place where these additional derives are necessary.

pub struct HexString(pub Bytes);

impl From<HexString> for Vec<u8> {
Expand All @@ -194,7 +194,7 @@ impl Deref for HexString {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

pub struct Bytes(pub Vec<u8>);

impl FromStr for Bytes {
Expand Down
25 changes: 11 additions & 14 deletions crates/fuel-core/src/database/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
};
use fuel_core_storage::{
iter::{
IntoBoxedIter,
IterDirection,
IteratorOverTable,
},
Expand All @@ -23,6 +24,7 @@ use fuel_core_storage::{
Error as StorageError,
Result as StorageResult,
StorageAsRef,
StorageBatchInspect,
};
use fuel_core_types::{
blockchain::{
Expand All @@ -37,7 +39,6 @@ use fuel_core_types::{
fuel_types::BlockHeight,
};
use itertools::Itertools;
use std::borrow::Cow;

impl OffChainIterableKeyValueView {
pub fn get_block_height(&self, id: &BlockId) -> StorageResult<Option<BlockHeight>> {
Expand Down Expand Up @@ -65,20 +66,16 @@ impl OnChainIterableKeyValueView {
/// Retrieve the full block and all associated transactions
pub fn get_full_block(&self, height: &BlockHeight) -> StorageResult<Option<Block>> {
let db_block = self.storage::<FuelBlocks>().get(height)?;

if let Some(block) = db_block {
// fetch all the transactions
// TODO: Use multiget when it's implemented.
// https://github.com/FuelLabs/fuel-core/issues/2344
let txs = block
.transactions()
.iter()
.map(|tx_id| {
self.storage::<Transactions>()
.get(tx_id)
.and_then(|tx| tx.ok_or(not_found!(Transactions)))
.map(Cow::into_owned)
})
.try_collect()?;
let transaction_ids = block.transactions().iter().into_boxed();
let txs = <Self as StorageBatchInspect<Transactions>>::get_batch(
self,
transaction_ids,
)
.map(|res| res.and_then(|opt| opt.ok_or(not_found!(Transactions))))
.try_collect()?;

Ok(Some(block.into_owned().uncompress(txs)))
} else {
Ok(None)
Expand Down
48 changes: 30 additions & 18 deletions crates/fuel-core/src/graphql_api/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use fuel_core_storage::{
IntoBoxedIter,
IterDirection,
},
not_found,
tables::Transactions,
transactional::AtomicView,
Error as StorageError,
IsNotFound,
Expand Down Expand Up @@ -65,6 +63,7 @@ use fuel_core_types::{
use futures::Stream;
use std::{
borrow::Cow,
collections::BTreeMap,
sync::Arc,
};

Expand Down Expand Up @@ -141,29 +140,42 @@ impl ReadView {
pub fn transaction(&self, tx_id: &TxId) -> StorageResult<Transaction> {
let result = self.on_chain.transaction(tx_id);
if result.is_not_found() {
if let Some(tx) = self.off_chain.old_transaction(tx_id)? {
Ok(tx)
} else {
Err(not_found!(Transactions))
}
self.off_chain.old_transaction(tx_id)
} else {
result
}
}

pub async fn transactions(
&self,
tx_ids: Vec<TxId>,
) -> Vec<StorageResult<Transaction>> {
// TODO: Use multiget when it's implemented.
// https://github.com/FuelLabs/fuel-core/issues/2344
let result = tx_ids
pub async fn transactions(&self, tx_ids: &[TxId]) -> Vec<StorageResult<Transaction>> {
let on_chain_results: BTreeMap<_, _> = tx_ids
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is nice that this function handles this hard use case.

In most cases, we have only on_chain data unless we did regenesis. It would be nice if this did not affect performance for the primary use case.

Can we first request values from the on-chain database, and if any of them are not found, we will fall back into the logic with enumerate and BTreeMap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I'll do it.

Copy link
Contributor Author

@netrome netrome Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 5de80b7 let me know what you think.

.iter()
.enumerate()
.zip(self.on_chain.transactions(tx_ids.iter().into_boxed()))
.collect();

let off_chain_indexed_txids: Vec<_> = on_chain_results
.iter()
.map(|tx_id| self.transaction(tx_id))
.collect::<Vec<_>>();
// Give a chance to other tasks to run.
.filter_map(|(indexed_tx_id, result)| {
result.is_not_found().then_some(*indexed_tx_id)
})
.collect();

let off_chain_results = off_chain_indexed_txids.iter().copied().zip(
self.off_chain.old_transactions(
off_chain_indexed_txids
.iter()
.map(|(_, tx_id)| *tx_id)
.into_boxed(),
),
);

let mut results = on_chain_results;
results.extend(off_chain_results);

// Give a chance for other tasks to run.
tokio::task::yield_now().await;
result

results.into_values().collect()
}

pub fn block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> {
Expand Down
49 changes: 41 additions & 8 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use fuel_core_storage::{
},
Error as StorageError,
Result as StorageResult,
StorageBatchInspect,
StorageInspect,
StorageRead,
};
Expand All @@ -30,12 +31,15 @@ use fuel_core_types::{
DaBlockHeight,
},
},
entities::relayer::{
message::{
MerkleProof,
Message,
entities::{
coins::coin::Coin,
relayer::{
message::{
MerkleProof,
Message,
},
transaction::RelayedTransactionStatus,
},
transaction::RelayedTransactionStatus,
},
fuel_tx::{
Bytes32,
Expand Down Expand Up @@ -104,7 +108,12 @@ pub trait OffChainDatabase: Send + Sync {

fn old_block_consensus(&self, height: &BlockHeight) -> StorageResult<Consensus>;

fn old_transaction(&self, id: &TxId) -> StorageResult<Option<Transaction>>;
fn old_transaction(&self, id: &TxId) -> StorageResult<Transaction>;

fn old_transactions<'a>(
&'a self,
ids: BoxedIter<'a, &'a TxId>,
) -> BoxedIter<'a, StorageResult<Transaction>>;

fn relayed_tx_status(
&self,
Expand All @@ -120,7 +129,7 @@ pub trait OnChainDatabase:
+ Sync
+ DatabaseBlocks
+ DatabaseMessages
+ StorageInspect<Coins, Error = StorageError>
+ DatabaseCoins
+ StorageRead<BlobData, Error = StorageError>
+ StorageInspect<StateTransitionBytecodeVersions, Error = StorageError>
+ StorageInspect<UploadedBytecodes, Error = StorageError>
Expand All @@ -135,6 +144,12 @@ pub trait DatabaseBlocks {
/// Get a transaction by its id.
fn transaction(&self, tx_id: &TxId) -> StorageResult<Transaction>;

/// Get a batch of transactions by their ids.
fn transactions<'a>(
&'a self,
tx_ids: BoxedIter<'a, &'a TxId>,
) -> BoxedIter<'a, StorageResult<Transaction>>;

/// Get a block by its height.
fn block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock>;

Expand All @@ -159,13 +174,20 @@ pub trait DatabaseDaCompressedBlocks {
}

/// Trait that specifies all the getters required for messages.
pub trait DatabaseMessages: StorageInspect<Messages, Error = StorageError> {
pub trait DatabaseMessages:
StorageInspect<Messages, Error = StorageError> + StorageBatchInspect<Messages>
{
fn all_messages(
&self,
start_message_id: Option<Nonce>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<Message>>;

fn message_batch<'a>(
&'a self,
ids: BoxedIter<'a, &'a Nonce>,
) -> BoxedIter<'a, StorageResult<Message>>;

fn message_exists(&self, nonce: &Nonce) -> StorageResult<bool>;
}

Expand All @@ -176,6 +198,17 @@ pub trait DatabaseRelayedTransactions {
) -> StorageResult<Option<RelayedTransactionStatus>>;
}

/// Trait that specifies all the getters required for coins
pub trait DatabaseCoins:
StorageInspect<Coins, Error = StorageError> + StorageBatchInspect<Coins>
{
fn coin(&self, utxo_id: UtxoId) -> StorageResult<Coin>;
fn coins<'a>(
&'a self,
utxo_ids: BoxedIter<'a, &'a UtxoId>,
) -> BoxedIter<'a, StorageResult<Coin>>;
}

/// Trait that specifies all the getters required for contract.
pub trait DatabaseContracts:
StorageInspect<ContractsRawCode, Error = StorageError>
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/query/balance/asset_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<'a> AssetsQuery<'a> {
})
.try_filter_map(move |chunk| async move {
let chunk = database.messages(chunk).await;
Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
Ok::<_, StorageError>(Some(chunk))
})
.try_flatten()
.filter(|result| {
Expand Down
27 changes: 9 additions & 18 deletions crates/fuel-core/src/query/coin.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::fuel_core_graphql_api::database::ReadView;
use fuel_core_storage::{
iter::IterDirection,
not_found,
tables::Coins,
iter::{
IntoBoxedIter,
IterDirection,
},
Error as StorageError,
Result as StorageResult,
StorageAsRef,
};
use fuel_core_types::{
entities::coins::coin::Coin,
Expand All @@ -20,27 +20,18 @@ use futures::{

impl ReadView {
pub fn coin(&self, utxo_id: UtxoId) -> StorageResult<Coin> {
let coin = self
.on_chain
.as_ref()
.storage::<Coins>()
.get(&utxo_id)?
.ok_or(not_found!(Coins))?
.into_owned();

Ok(coin.uncompress(utxo_id))
self.on_chain.coin(utxo_id)
}

pub async fn coins(
&self,
utxo_ids: Vec<UtxoId>,
) -> impl Iterator<Item = StorageResult<Coin>> + '_ {
// TODO: Use multiget when it's implemented.
// https://github.com/FuelLabs/fuel-core/issues/2344
let coins = utxo_ids.into_iter().map(|id| self.coin(id));
// Give a chance to other tasks to run.
let coins: Vec<_> = self.on_chain.coins(utxo_ids.iter().into_boxed()).collect();

// Give a chance for other tasks to run.
tokio::task::yield_now().await;
coins
coins.into_iter()
}

pub fn owned_coins(
Expand Down
18 changes: 11 additions & 7 deletions crates/fuel-core/src/query/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::fuel_core_graphql_api::database::ReadView;
use fuel_core_storage::{
iter::{
BoxedIter,
IntoBoxedIter,
IterDirection,
},
not_found,
Expand Down Expand Up @@ -81,13 +82,16 @@ impl ReadView {
pub async fn messages(
&self,
ids: Vec<Nonce>,
) -> impl Iterator<Item = StorageResult<Message>> + '_ {
// TODO: Use multiget when it's implemented.
// https://github.com/FuelLabs/fuel-core/issues/2344
let messages = ids.into_iter().map(|id| self.message(&id));
// Give a chance to other tasks to run.
) -> impl Stream<Item = StorageResult<Message>> {
let messages: Vec<_> = self
.on_chain
.message_batch(ids.iter().into_boxed())
.collect();

// Give a chance for other tasks to run.
tokio::task::yield_now().await;
messages

futures::stream::iter(messages)
}

pub fn owned_messages<'a>(
Expand All @@ -104,7 +108,7 @@ impl ReadView {
})
.try_filter_map(move |chunk| async move {
let chunk = self.messages(chunk).await;
Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
Ok::<_, StorageError>(Some(chunk))
})
.try_flatten()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/query/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl ReadView {
})
.try_filter_map(move |chunk| async move {
let tx_ids = chunk.iter().map(|(_, tx_id)| *tx_id).collect::<Vec<_>>();
let txs = self.transactions(tx_ids).await;
let txs = self.transactions(&tx_ids).await;
let txs = txs
.into_iter()
.zip(chunk)
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/schema/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl Block {
.filter_map(move |tx_ids: Vec<TxId>| {
let async_query = query.as_ref().clone();
async move {
let txs = async_query.transactions(tx_ids.clone()).await;
let txs = async_query.transactions(&tx_ids).await;
let txs = txs
.into_iter()
.zip(tx_ids.into_iter())
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl TxQuery {
.iter()
.map(|sorted| sorted.tx_id.0)
.collect::<Vec<_>>();
let txs = async_query.transactions(tx_ids).await;
let txs = async_query.transactions(&tx_ids).await;
let txs = txs.into_iter().zip(chunk.into_iter()).map(
|(result, sorted)| {
result.map(|tx| {
Expand Down
Loading
Loading