Skip to content

Commit

Permalink
Dvir/write tx to file (#1997)
Browse files Browse the repository at this point in the history
* perf(storage)!: write thin transaction outputs to a file

* perf(storage)!: write transactions to a file
  • Loading branch information
DvirYo-starkware authored May 16, 2024
1 parent bf5a159 commit 6390dde
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 23 deletions.
24 changes: 19 additions & 5 deletions crates/papyrus_storage/src/body/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ use crate::body::{EventsTable, EventsTableKey, TransactionIndex};
use crate::db::serialization::{NoVersionValueWrapper, VersionZeroWrapper};
use crate::db::table_types::{DbCursor, DbCursorTrait, SimpleTable, Table};
use crate::db::{DbTransaction, RO};
use crate::{StorageResult, StorageTxn};
use crate::mmap_file::LocationInFile;
use crate::{FileHandlers, StorageResult, StorageTxn};

/// An identifier of an event.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Ord)]
Expand Down Expand Up @@ -159,6 +160,7 @@ impl EventIterByContractAddress<'_> {
/// and finally, by the event index in the transaction output.
pub struct EventIterByEventIndex<'txn, 'env> {
txn: &'txn DbTransaction<'env, RO>,
file_handlers: &'txn FileHandlers<RO>,
tx_current: Option<TransactionOutputsKeyValue>,
tx_cursor: TransactionOutputsTableCursor<'txn>,
events_table: EventsTable<'env>,
Expand Down Expand Up @@ -206,7 +208,12 @@ impl EventIterByEventIndex<'_, '_> {

// There are no more events in the current transaction, so we go over the rest of the
// transactions until we find an event.
self.tx_current = self.tx_cursor.next()?;
let Some((tx_index, location)) = self.tx_cursor.next()? else {
self.tx_current = None;
return Ok(());
};
self.tx_current =
Some((tx_index, self.file_handlers.get_transaction_output_unchecked(location)?));
self.event_index_in_tx_current = EventIndexInTransactionOutput(0);
}

Expand Down Expand Up @@ -248,12 +255,19 @@ impl<'txn, 'env> StorageTxn<'env, RO> {
) -> StorageResult<EventIterByEventIndex<'txn, 'env>> {
let transaction_outputs_table = self.open_table(&self.tables.transaction_outputs)?;
let mut tx_cursor = transaction_outputs_table.cursor(&self.txn)?;
let tx_current = tx_cursor.lower_bound(&event_index.0)?;
let events_table = self.open_table(&self.tables.events)?;
let first_txn_location = tx_cursor.lower_bound(&event_index.0)?;
let first_relevant_transaction = match first_txn_location {
None => None,
Some((tx_index, location)) => {
Some((tx_index, self.file_handlers.get_transaction_output_unchecked(location)?))
}
};

let mut it = EventIterByEventIndex {
txn: &self.txn,
tx_current,
file_handlers: &self.file_handlers,
tx_current: first_relevant_transaction,
tx_cursor,
events_table,
event_index_in_tx_current: event_index.1,
Expand Down Expand Up @@ -476,4 +490,4 @@ type EventsTableCursor<'txn> =
type TransactionOutputsKeyValue = (TransactionIndex, ThinTransactionOutput);
/// A cursor of the transaction outputs table.
type TransactionOutputsTableCursor<'txn> =
DbCursor<'txn, RO, TransactionIndex, VersionZeroWrapper<ThinTransactionOutput>, SimpleTable>;
DbCursor<'txn, RO, TransactionIndex, VersionZeroWrapper<LocationInFile>, SimpleTable>;
95 changes: 80 additions & 15 deletions crates/papyrus_storage/src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,21 @@ use crate::body::events::{EventIndex, ThinTransactionOutput};
use crate::db::serialization::{NoVersionValueWrapper, ValueSerde, VersionZeroWrapper};
use crate::db::table_types::{DbCursorTrait, SimpleTable, Table};
use crate::db::{DbTransaction, TableHandle, TransactionKind, RW};
use crate::{MarkerKind, MarkersTable, StorageError, StorageResult, StorageScope, StorageTxn};
use crate::mmap_file::LocationInFile;
use crate::{
FileHandlers,
MarkerKind,
MarkersTable,
StorageError,
StorageResult,
StorageScope,
StorageTxn,
};

type TransactionsTable<'env> =
TableHandle<'env, TransactionIndex, VersionZeroWrapper<Transaction>, SimpleTable>;
TableHandle<'env, TransactionIndex, VersionZeroWrapper<LocationInFile>, SimpleTable>;
type TransactionOutputsTable<'env> =
TableHandle<'env, TransactionIndex, VersionZeroWrapper<ThinTransactionOutput>, SimpleTable>;
TableHandle<'env, TransactionIndex, VersionZeroWrapper<LocationInFile>, SimpleTable>;
type TransactionHashToIdxTable<'env> =
TableHandle<'env, TransactionHash, NoVersionValueWrapper<TransactionIndex>, SimpleTable>;
type TransactionIdxToHashTable<'env> =
Expand Down Expand Up @@ -175,17 +184,26 @@ impl<'env, Mode: TransactionKind> BodyStorageReader for StorageTxn<'env, Mode> {
transaction_index: TransactionIndex,
) -> StorageResult<Option<Transaction>> {
let transactions_table = self.open_table(&self.tables.transactions)?;
let transaction = transactions_table.get(&self.txn, &transaction_index)?;
Ok(transaction)
let Some(tx_location) = transactions_table.get(&self.txn, &transaction_index)? else {
return Ok(None);
};
let transaction = self.file_handlers.get_transaction_unchecked(tx_location)?;
Ok(Some(transaction))
}

// Returns the thin transaction output stored for `transaction_index`, or `Ok(None)` when the
// transaction outputs table has no entry for that index.
fn get_transaction_output(
&self,
transaction_index: TransactionIndex,
) -> StorageResult<Option<ThinTransactionOutput>> {
let transaction_outputs_table = self.open_table(&self.tables.transaction_outputs)?;
// NOTE(review): the next two statements return the table value directly. They look like the
// superseded implementation (the table value is resolved as a file location further below) that
// was left interleaved with its replacement - confirm and drop them.
let transaction_output = transaction_outputs_table.get(&self.txn, &transaction_index)?;
Ok(transaction_output)
// Look up the location of the output; a missing table entry is not an error.
let Some(tx_output_location) =
transaction_outputs_table.get(&self.txn, &transaction_index)?
else {
return Ok(None);
};
// Resolve the location through the file handlers. A location that cannot be resolved is
// returned as an error by `get_transaction_output_unchecked` and propagated here.
let transaction_output =
self.file_handlers.get_transaction_output_unchecked(tx_output_location)?;
Ok(Some(transaction_output))
}

fn get_transaction_events(
Expand Down Expand Up @@ -237,7 +255,7 @@ impl<'env, Mode: TransactionKind> BodyStorageReader for StorageTxn<'env, Mode> {
block_number: BlockNumber,
) -> StorageResult<Option<Vec<Transaction>>> {
let transactions_table = self.open_table(&self.tables.transactions)?;
self.get_transactions_in_block(block_number, transactions_table)
self.get_transaction_objects_in_block(block_number, transactions_table)
}

fn get_block_transaction_hashes(
Expand All @@ -254,7 +272,7 @@ impl<'env, Mode: TransactionKind> BodyStorageReader for StorageTxn<'env, Mode> {
block_number: BlockNumber,
) -> StorageResult<Option<Vec<ThinTransactionOutput>>> {
let transaction_outputs_table = self.open_table(&self.tables.transaction_outputs)?;
self.get_transactions_in_block(block_number, transaction_outputs_table)
self.get_transaction_outputs_in_block(block_number, transaction_outputs_table)
}

fn get_block_transactions_count(
Expand Down Expand Up @@ -312,6 +330,50 @@ impl<'env, Mode: TransactionKind> StorageTxn<'env, Mode> {
}
Ok(Some(res))
}

// TODO(dvir): remove this function when we have a general table interface also for values
// written to a file.
// Returns the thin transaction outputs of the given block.
//
// The locations are fetched with `get_transactions_in_block` and each one is then resolved
// through the file handlers, in the order the locations were returned. `Ok(None)` from
// `get_transactions_in_block` is passed through unchanged, and the first location that fails
// to resolve aborts the call with that error.
fn get_transaction_outputs_in_block(
    &self,
    block_number: BlockNumber,
    transaction_output_offsets_table: TransactionOutputsTable<'env>,
) -> StorageResult<Option<Vec<ThinTransactionOutput>>> {
    match self.get_transactions_in_block(block_number, transaction_output_offsets_table)? {
        None => Ok(None),
        Some(output_locations) => output_locations
            .into_iter()
            .map(|output_location| {
                self.file_handlers.get_transaction_output_unchecked(output_location)
            })
            // Collecting into a `Result` stops at the first error, like the `?` in a loop.
            .collect::<StorageResult<Vec<_>>>()
            .map(Some),
    }
}

// TODO(dvir): remove this function when we have a general table interface also for values
// written to a file.
// Returns the transactions of the given block.
//
// The locations are fetched with `get_transactions_in_block` and each one is then resolved
// through the file handlers, in the order the locations were returned. `Ok(None)` from
// `get_transactions_in_block` is passed through unchanged, and the first location that fails
// to resolve aborts the call with that error.
fn get_transaction_objects_in_block(
    &self,
    block_number: BlockNumber,
    transaction_offsets_table: TransactionsTable<'env>,
) -> StorageResult<Option<Vec<Transaction>>> {
    match self.get_transactions_in_block(block_number, transaction_offsets_table)? {
        None => Ok(None),
        Some(tx_locations) => tx_locations
            .into_iter()
            .map(|tx_location| self.file_handlers.get_transaction_unchecked(tx_location))
            // Collecting into a `Result` stops at the first error, like the `?` in a loop.
            .collect::<StorageResult<Vec<_>>>()
            .map(Some),
    }
}
}

impl<'env> BodyStorageWriter for StorageTxn<'env, RW> {
Expand All @@ -332,6 +394,7 @@ impl<'env> BodyStorageWriter for StorageTxn<'env, RW> {
write_transactions(
&block_body,
&self.txn,
&self.file_handlers,
&transactions_table,
&transaction_hash_to_idx_table,
&transaction_idx_to_hash_table,
Expand All @@ -340,6 +403,7 @@ impl<'env> BodyStorageWriter for StorageTxn<'env, RW> {
write_transaction_outputs(
block_body,
&self.txn,
&self.file_handlers,
&transaction_outputs_table,
&events_table,
block_number,
Expand Down Expand Up @@ -428,6 +492,7 @@ impl<'env> BodyStorageWriter for StorageTxn<'env, RW> {
fn write_transactions<'env>(
block_body: &BlockBody,
txn: &DbTransaction<'env, RW>,
file_handlers: &FileHandlers<RW>,
transactions_table: &'env TransactionsTable<'env>,
transaction_hash_to_idx_table: &'env TransactionHashToIdxTable<'env>,
transaction_idx_to_hash_table: &'env TransactionIdxToHashTable<'env>,
Expand All @@ -445,14 +510,16 @@ fn write_transactions<'env>(
tx_hash,
transaction_index,
)?;
transactions_table.insert(txn, &transaction_index, tx)?;
let location = file_handlers.append_transaction(tx);
transactions_table.insert(txn, &transaction_index, &location)?;
}
Ok(())
}

fn write_transaction_outputs<'env>(
block_body: BlockBody,
txn: &DbTransaction<'env, RW>,
file_handlers: &FileHandlers<RW>,
transaction_outputs_table: &'env TransactionOutputsTable<'env>,
events_table: &'env EventsTable<'env>,
block_number: BlockNumber,
Expand All @@ -461,11 +528,9 @@ fn write_transaction_outputs<'env>(
let transaction_index = TransactionIndex(block_number, TransactionOffsetInBlock(index));

write_events(&tx_output, txn, events_table, transaction_index)?;
transaction_outputs_table.insert(
txn,
&transaction_index,
&ThinTransactionOutput::from(tx_output),
)?;
let location =
file_handlers.append_transaction_output(&ThinTransactionOutput::from(tx_output));
transaction_outputs_table.insert(txn, &transaction_index, &location)?;
}
Ok(())
}
Expand Down
65 changes: 62 additions & 3 deletions crates/papyrus_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,10 @@ struct_field_names! {
state_diffs: TableIdentifier<BlockNumber, VersionZeroWrapper<LocationInFile>, SimpleTable>,
transaction_hash_to_idx: TableIdentifier<TransactionHash, NoVersionValueWrapper<TransactionIndex>, SimpleTable>,
transaction_idx_to_hash: TableIdentifier<TransactionIndex, NoVersionValueWrapper<TransactionHash>, SimpleTable>,
transaction_outputs: TableIdentifier<TransactionIndex, VersionZeroWrapper<ThinTransactionOutput>, SimpleTable>,
transactions: TableIdentifier<TransactionIndex, VersionZeroWrapper<Transaction>, SimpleTable>,
// TODO(dvir): transactions are now written to a file as well; combine all the mappings from tx_index into the same table.
// Afterward, add an option to iterate over txs/outputs by tx_hash as well.
transaction_outputs: TableIdentifier<TransactionIndex, VersionZeroWrapper<LocationInFile>, SimpleTable>,
transactions: TableIdentifier<TransactionIndex, VersionZeroWrapper<LocationInFile>, SimpleTable>,

// Version tables
starknet_version: TableIdentifier<BlockNumber, VersionZeroWrapper<StarknetVersion>, SimpleTable>,
Expand Down Expand Up @@ -670,6 +672,8 @@ struct FileHandlers<Mode: TransactionKind> {
contract_class: FileHandler<VersionZeroWrapper<ContractClass>, Mode>,
casm: FileHandler<VersionZeroWrapper<CasmContractClass>, Mode>,
deprecated_contract_class: FileHandler<VersionZeroWrapper<DeprecatedContractClass>, Mode>,
thin_transaction_output: FileHandler<VersionZeroWrapper<ThinTransactionOutput>, Mode>,
transaction: FileHandler<VersionZeroWrapper<Transaction>, Mode>,
}

impl FileHandlers<RW> {
Expand Down Expand Up @@ -704,6 +708,19 @@ impl FileHandlers<RW> {
self.casm.flush();
self.deprecated_contract_class.flush();
}

// Appends a thin transaction output to the corresponding file and returns its location.
//
// The append is performed on the `thin_transaction_output` handler of a clone of `self`
// (presumably because `append` needs mutable or owned access to the handler - confirm against
// `FileHandler::append`).
// NOTE(review): the visible tail of `flush` in this impl only shows `casm` and
// `deprecated_contract_class` being flushed - confirm that `thin_transaction_output` is
// flushed as well.
fn append_transaction_output(
&self,
transaction_output: &ThinTransactionOutput,
) -> LocationInFile {
self.clone().thin_transaction_output.append(transaction_output)
}

// Appends a transaction to the corresponding file and returns its location.
//
// The append is performed on the `transaction` handler of a clone of `self` (presumably
// because `append` needs mutable or owned access to the handler - confirm against
// `FileHandler::append`).
// NOTE(review): the visible tail of `flush` in this impl only shows `casm` and
// `deprecated_contract_class` being flushed - confirm that `transaction` is flushed as well.
fn append_transaction(&self, transaction: &Transaction) -> LocationInFile {
self.clone().transaction.append(transaction)
}
}

impl<Mode: TransactionKind> FileHandlers<Mode> {
Expand All @@ -714,6 +731,8 @@ impl<Mode: TransactionKind> FileHandlers<Mode> {
("contract_class".to_string(), self.contract_class.stats()),
("casm".to_string(), self.casm.stats()),
("deprecated_contract_class".to_string(), self.deprecated_contract_class.stats()),
("thin_transaction_output".to_string(), self.thin_transaction_output.stats()),
("transaction".to_string(), self.transaction.stats()),
])
}

Expand Down Expand Up @@ -754,6 +773,24 @@ impl<Mode: TransactionKind> FileHandlers<Mode> {
msg: format!("DeprecatedContractClass at location {:?} not found.", location),
})
}

// Returns the thin transaction output at the given location or an error in case it doesn't
// exist.
//
// A missing value is reported as `StorageError::DBInconsistency`; an error returned by the
// file handler's `get` is propagated with `?`.
fn get_transaction_output_unchecked(
    &self,
    location: LocationInFile,
) -> StorageResult<ThinTransactionOutput> {
    // `ok_or_else` builds the error lazily, so a successful read does not pay for the
    // `format!` allocation of a message that is then thrown away.
    self.thin_transaction_output.get(location)?.ok_or_else(|| StorageError::DBInconsistency {
        msg: format!("ThinTransactionOutput at location {:?} not found.", location),
    })
}

// Returns the transaction at the given location or an error in case it doesn't exist.
//
// A missing value is reported as `StorageError::DBInconsistency`; an error returned by the
// file handler's `get` is propagated with `?`.
fn get_transaction_unchecked(&self, location: LocationInFile) -> StorageResult<Transaction> {
    // `ok_or_else` builds the error lazily, so a successful read does not pay for the
    // `format!` allocation of a message that is then thrown away.
    self.transaction.get(location)?.ok_or_else(|| StorageError::DBInconsistency {
        msg: format!("Transaction at location {:?} not found.", location),
    })
}
}

fn open_storage_files(
Expand All @@ -765,6 +802,7 @@ fn open_storage_files(
let db_transaction = db_reader.begin_ro_txn()?;
let table = db_transaction.open_table(file_offsets_table)?;

// TODO(dvir): consider using a loop here to avoid code duplication.
let thin_state_diff_offset =
table.get(&db_transaction, &OffsetKind::ThinStateDiff)?.unwrap_or_default();
let (thin_state_diff_writer, thin_state_diff_reader) = open_file(
Expand All @@ -788,23 +826,40 @@ fn open_storage_files(
let deprecated_contract_class_offset =
table.get(&db_transaction, &OffsetKind::DeprecatedContractClass)?.unwrap_or_default();
let (deprecated_contract_class_writer, deprecated_contract_class_reader) = open_file(
mmap_file_config,
mmap_file_config.clone(),
db_config.path().join("deprecated_contract_class.dat"),
deprecated_contract_class_offset,
)?;

let transaction_output_offset =
table.get(&db_transaction, &OffsetKind::ThinTransactionOutput)?.unwrap_or_default();
let (transaction_output_writer, transaction_output_reader) = open_file(
mmap_file_config.clone(),
db_config.path().join("transaction_output.dat"),
transaction_output_offset,
)?;

let transaction_offset =
table.get(&db_transaction, &OffsetKind::Transaction)?.unwrap_or_default();
let (transaction_writer, transaction_reader) =
open_file(mmap_file_config, db_config.path().join("transaction.dat"), transaction_offset)?;

Ok((
FileHandlers {
thin_state_diff: thin_state_diff_writer,
contract_class: contract_class_writer,
casm: casm_writer,
deprecated_contract_class: deprecated_contract_class_writer,
thin_transaction_output: transaction_output_writer,
transaction: transaction_writer,
},
FileHandlers {
thin_state_diff: thin_state_diff_reader,
contract_class: contract_class_reader,
casm: casm_reader,
deprecated_contract_class: deprecated_contract_class_reader,
thin_transaction_output: transaction_output_reader,
transaction: transaction_reader,
},
))
}
Expand All @@ -820,6 +875,10 @@ pub enum OffsetKind {
Casm,
/// A deprecated contract class file.
DeprecatedContractClass,
/// A thin transaction output file.
ThinTransactionOutput,
/// A transaction file.
Transaction,
}

/// A storage query. Used for benchmarking in the storage_benchmark binary.
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_storage/src/serialization/serializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ auto_storage_serde! {
ContractClass = 1,
Casm = 2,
DeprecatedContractClass = 3,
ThinTransactionOutput = 4,
Transaction = 5,
}
pub struct PaymasterData(pub Vec<StarkFelt>);
pub struct PoseidonHash(pub StarkFelt);
Expand Down

0 comments on commit 6390dde

Please sign in to comment.