From 4aa3fe59eb6c7d698f7c5fc1ef2f9b5709a80ddc Mon Sep 17 00:00:00 2001 From: Ayelet Zilber Date: Thu, 18 Jul 2024 11:17:23 +0300 Subject: [PATCH] feat(mempool): get txs return requested txs after replenshing queue --- crates/mempool/src/mempool.rs | 19 +++++++++++++++---- crates/mempool/src/mempool_test.rs | 25 ++++++++++++------------- crates/mempool/src/transaction_queue.rs | 4 ++++ 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/crates/mempool/src/mempool.rs b/crates/mempool/src/mempool.rs index 5d142a495..eb9f999ae 100644 --- a/crates/mempool/src/mempool.rs +++ b/crates/mempool/src/mempool.rs @@ -49,11 +49,22 @@ impl Mempool { // library. pub fn get_txs(&mut self, n_txs: usize) -> MempoolResult> { let mut eligible_txs: Vec = Vec::with_capacity(n_txs); - for tx_hash in self.tx_queue.pop_chunk(n_txs) { - let tx = self.tx_pool.remove(tx_hash)?; - eligible_txs.push(tx); + let mut n_remaining_txs = n_txs; + + while n_remaining_txs > 0 && !self.tx_queue.is_empty() { + let mut popped_txs = Vec::with_capacity(n_remaining_txs); + + for tx_hash in self.tx_queue.pop_chunk(n_remaining_txs) { + let tx = self.tx_pool.remove(tx_hash)?; + popped_txs.push(tx); + } + + self.enqueue_next_eligible_txs(&popped_txs); + + n_remaining_txs -= popped_txs.len(); + + eligible_txs.extend(popped_txs); } - self.enqueue_next_eligible_txs(&eligible_txs); Ok(eligible_txs) } diff --git a/crates/mempool/src/mempool_test.rs b/crates/mempool/src/mempool_test.rs index c3d528f78..e4aaa4236 100644 --- a/crates/mempool/src/mempool_test.rs +++ b/crates/mempool/src/mempool_test.rs @@ -177,24 +177,24 @@ fn test_get_txs(#[case] requested_txs: usize) { #[rstest] fn test_get_txs_multi_nonce() { // Setup. - let tx_address_0_nonce_0 = + let tx_nonce_0 = add_tx_input!(tx_hash: 1, sender_address: "0x0", tx_nonce: 0_u8, account_nonce: 0_u8).tx; - let tx_address_0_nonce_1 = + let tx_nonce_1 = add_tx_input!(tx_hash: 2, sender_address: "0x0", tx_nonce: 1_u8, account_nonce: 0_u8).tx; + let tx_nonce_2 = + add_tx_input!(tx_hash: 3, sender_address: "0x0", tx_nonce: 2_u8, account_nonce: 0_u8).tx; - let queue_txs = [TransactionReference::new(&tx_address_0_nonce_0)]; - let pool_txs = [tx_address_0_nonce_0.clone(), tx_address_0_nonce_1.clone()]; + let queue_txs = [TransactionReference::new(&tx_nonce_0)]; + let pool_txs = + [tx_nonce_0.clone(), tx_nonce_1.clone(), tx_nonce_2.clone()]; let mut mempool: Mempool = MempoolState::new(pool_txs, queue_txs).into(); // Test. - let txs = mempool.get_txs(2).unwrap(); + let txs = mempool.get_txs(3).unwrap(); - // Assert that the account's next tx was added the queue. - // TODO(Ayelet): all transactions should be returned after replenishing. - assert_eq!(txs, &[tx_address_0_nonce_0]); - let expected_queue_txs = [TransactionReference::new(&tx_address_0_nonce_1)]; - let expected_pool_txs = [tx_address_0_nonce_1]; - let expected_mempool_state = MempoolState::new(expected_pool_txs, expected_queue_txs); + // Assert: all transactions are returned. + assert_eq!(txs, &[tx_nonce_0, tx_nonce_1, tx_nonce_2]); + let expected_mempool_state = MempoolState::new([], []); expected_mempool_state.assert_eq_mempool_state(&mempool); } @@ -393,7 +393,6 @@ fn test_flow_filling_holes(mut mempool: Mempool) { add_tx(&mut mempool, &input_address_0_nonce_0); let txs = mempool.get_txs(2).unwrap(); - // TODO(Ayelet): all transactions should be returned after replenishing. // Assert: all remaining transactions are returned. - assert_eq!(txs, &[input_address_0_nonce_0.tx]); + assert_eq!(txs, &[input_address_0_nonce_0.tx, input_address_0_nonce_1.tx]); } diff --git a/crates/mempool/src/transaction_queue.rs b/crates/mempool/src/transaction_queue.rs index 4ae982fc5..e95745d15 100644 --- a/crates/mempool/src/transaction_queue.rs +++ b/crates/mempool/src/transaction_queue.rs @@ -63,6 +63,10 @@ impl TransactionQueue { } false } + + pub fn is_empty(&self) -> bool { + self.queue.is_empty() + } } /// Encapsulates a transaction reference to assess its order (i.e., priority).