diff --git a/Cargo.lock b/Cargo.lock index 51b793b3ddff8e..4b79fb310050b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4342,9 +4342,12 @@ dependencies = [ [[package]] name = "prio-graph" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6492a75ca57066a4479af45efa302bed448680182b0563f96300645d5f896097" +checksum = "2f28921629370a46cf564f6ba1828bd8d1c97f7fad4ee9d1c6438f92feed6b8d" +dependencies = [ + "ahash 0.8.11", +] [[package]] name = "proc-macro-crate" diff --git a/Cargo.toml b/Cargo.toml index 5275cb71d8989b..6b243946b7916d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -178,10 +178,7 @@ members = [ "zk-token-sdk", ] -exclude = [ - "programs/sbf", - "svm/tests/example-programs", -] +exclude = ["programs/sbf", "svm/tests/example-programs"] resolver = "2" @@ -337,7 +334,7 @@ percentage = "0.1.0" pickledb = { version = "0.5.1", default-features = false } predicates = "2.1" pretty-hex = "0.3.0" -prio-graph = "0.2.1" +prio-graph = "0.3.0" proc-macro2 = "1.0.89" proptest = "1.5" prost = "0.11.9" diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 9f6fcc8388a364..2687c04d271991 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -18,18 +18,34 @@ use { }, crossbeam_channel::{Receiver, Sender, TryRecvError}, itertools::izip, - prio_graph::{AccessKind, PrioGraph}, + prio_graph::{AccessKind, GraphNode, PrioGraph}, solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS, solana_measure::measure_us, solana_sdk::{pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction}, }; +#[inline(always)] +fn passthrough_priority( + id: &TransactionPriorityId, + _graph_node: &GraphNode, +) -> TransactionPriorityId { + *id +} + +type SchedulerPrioGraph = PrioGraph< + TransactionPriorityId, + Pubkey, + TransactionPriorityId, + fn(&TransactionPriorityId, &GraphNode) -> TransactionPriorityId, +>; + pub(crate) struct PrioGraphScheduler { in_flight_tracker: InFlightTracker, account_locks: ThreadAwareAccountLocks, consume_work_senders: Vec>, finished_consume_work_receiver: Receiver, look_ahead_window_size: usize, + prio_graph: SchedulerPrioGraph, } impl PrioGraphScheduler { @@ -44,6 +60,7 @@ impl PrioGraphScheduler { consume_work_senders, finished_consume_work_receiver, look_ahead_window_size: 2048, + prio_graph: PrioGraph::new(passthrough_priority), } } @@ -94,7 +111,6 @@ impl PrioGraphScheduler { // these transactions to be scheduled before them. let mut unschedulable_ids = Vec::new(); let mut blocking_locks = ReadWriteAccountSet::default(); - let mut prio_graph = PrioGraph::new(|id: &TransactionPriorityId, _graph_node| *id); // Track metrics on filter. let mut num_filtered_out: usize = 0; @@ -150,7 +166,7 @@ impl PrioGraphScheduler { // Create the initial look-ahead window. // Check transactions against filter, remove from container if it fails. - chunked_pops(container, &mut prio_graph, &mut window_budget); + chunked_pops(container, &mut self.prio_graph, &mut window_budget); let mut unblock_this_batch = Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH); @@ -160,11 +176,11 @@ impl PrioGraphScheduler { let mut num_unschedulable: usize = 0; while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS { // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule. - if prio_graph.is_empty() { + if self.prio_graph.is_empty() { break; } - while let Some(id) = prio_graph.pop() { + while let Some(id) = self.prio_graph.pop() { unblock_this_batch.push(id); // Should always be in the container, during initial testing phase panic. @@ -242,11 +258,11 @@ impl PrioGraphScheduler { // Refresh window budget and do chunked pops saturating_add_assign!(window_budget, unblock_this_batch.len()); - chunked_pops(container, &mut prio_graph, &mut window_budget); + chunked_pops(container, &mut self.prio_graph, &mut window_budget); // Unblock all transactions that were blocked by the transactions that were just sent. for id in unblock_this_batch.drain(..) { - prio_graph.unblock(&id); + self.prio_graph.unblock(&id); } } @@ -259,9 +275,13 @@ impl PrioGraphScheduler { } // Push remaining transactions back into the container - while let Some((id, _)) = prio_graph.pop_and_unblock() { + while let Some((id, _)) = self.prio_graph.pop_and_unblock() { container.push_id_into_queue(id); } + // No more remaining items in the queue. + // Clear here to make sure the next scheduling pass starts fresh + // without detecting any conflicts. + self.prio_graph.clear(); assert_eq!( num_scheduled, num_sent, diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 6340097c96f244..3d30e98328d0ab 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -3641,9 +3641,12 @@ dependencies = [ [[package]] name = "prio-graph" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6492a75ca57066a4479af45efa302bed448680182b0563f96300645d5f896097" +checksum = "2f28921629370a46cf564f6ba1828bd8d1c97f7fad4ee9d1c6438f92feed6b8d" +dependencies = [ + "ahash 0.8.11", +] [[package]] name = "proc-macro-crate"