diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index a849d08af385f6..ff430931c9ec48 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -27,6 +27,7 @@ use { std::{ collections::{btree_map::BTreeMap, HashSet}, fmt::Debug, + num::NonZeroUsize, ops::{ Bound, Bound::{Excluded, Included, Unbounded}, @@ -45,10 +46,11 @@ pub const ITER_BATCH_SIZE: usize = 1000; pub const BINS_DEFAULT: usize = 8192; pub const BINS_FOR_TESTING: usize = 2; // we want > 1, but each bin is a few disk files with a disk based index, so fewer is better pub const BINS_FOR_BENCHMARKS: usize = 8192; -pub const FLUSH_THREADS_TESTING: usize = 1; +// The unsafe is safe because we're using a fixed, known non-zero value +pub const FLUSH_THREADS_TESTING: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndexConfig { bins: Some(BINS_FOR_TESTING), - flush_threads: Some(FLUSH_THREADS_TESTING), + num_flush_threads: Some(FLUSH_THREADS_TESTING), drives: None, index_limit_mb: IndexLimitMb::Unlimited, ages_to_stay_in_cache: None, @@ -57,7 +59,7 @@ pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndex }; pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig { bins: Some(BINS_FOR_BENCHMARKS), - flush_threads: Some(FLUSH_THREADS_TESTING), + num_flush_threads: Some(FLUSH_THREADS_TESTING), drives: None, index_limit_mb: IndexLimitMb::Unlimited, ages_to_stay_in_cache: None, @@ -218,7 +220,7 @@ pub enum IndexLimitMb { #[derive(Debug, Default, Clone)] pub struct AccountsIndexConfig { pub bins: Option, - pub flush_threads: Option, + pub num_flush_threads: Option, pub drives: Option>, pub index_limit_mb: IndexLimitMb, pub ages_to_stay_in_cache: Option, @@ -227,6 +229,10 @@ pub struct AccountsIndexConfig { pub started_from_validator: bool, } +pub fn default_num_flush_threads() -> NonZeroUsize { + NonZeroUsize::new(std::cmp::max(2, num_cpus::get() / 4)).expect("non-zero system threads") +} + #[derive(Debug, Default, Clone)] pub struct AccountSecondaryIndexes { pub keys: Option, diff --git a/accounts-db/src/accounts_index_storage.rs b/accounts-db/src/accounts_index_storage.rs index 3a654c84c25f97..48c5f55c5d9f64 100644 --- a/accounts-db/src/accounts_index_storage.rs +++ b/accounts-db/src/accounts_index_storage.rs @@ -1,7 +1,7 @@ use { crate::{ accounts_index::{ - in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue, + self, in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue, IndexValue, }, bucket_map_holder::BucketMapHolder, @@ -9,6 +9,7 @@ use { }, std::{ fmt::Debug, + num::NonZeroUsize, sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, @@ -58,14 +59,14 @@ impl BgThreads { fn new + Into>( storage: &Arc>, in_mem: &[Arc>], - threads: usize, + threads: NonZeroUsize, can_advance_age: bool, exit: Arc, ) -> Self { // stop signal used for THIS batch of bg threads let local_exit = Arc::new(AtomicBool::default()); let handles = Some( - (0..threads) + (0..threads.get()) .map(|idx| { // the first thread we start is special let can_advance_age = can_advance_age && idx == 0; @@ -123,7 +124,7 @@ impl + Into> AccountsIndexStorage< *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new( &self.storage, &self.in_mem, - Self::num_threads(), + accounts_index::default_num_flush_threads(), false, // cannot advance age from any of these threads self.exit.clone(), )); @@ -151,25 +152,21 @@ impl + Into> AccountsIndexStorage< self.in_mem.iter().for_each(|mem| mem.shrink_to_fit()) } - fn num_threads() -> usize { - std::cmp::max(2, num_cpus::get() / 4) - } - /// allocate BucketMapHolder and InMemAccountsIndex[] pub fn new(bins: usize, config: &Option, exit: Arc) -> Self { - let threads = config + let num_flush_threads = config .as_ref() - .and_then(|config| config.flush_threads) - .unwrap_or_else(Self::num_threads); + .and_then(|config| config.num_flush_threads) + .unwrap_or_else(accounts_index::default_num_flush_threads); - let storage = Arc::new(BucketMapHolder::new(bins, config, threads)); + let storage = Arc::new(BucketMapHolder::new(bins, config, num_flush_threads.get())); let in_mem = (0..bins) .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin))) .collect::>(); Self { - _bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit.clone()), + _bg_threads: BgThreads::new(&storage, &in_mem, num_flush_threads, true, exit.clone()), storage, in_mem, startup_worker_threads: Mutex::default(), diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 43c5f663c03447..f77cdec1491207 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -2,7 +2,7 @@ use { clap::{value_t_or_exit, Arg, ArgMatches}, - solana_accounts_db::accounts_db, + solana_accounts_db::{accounts_db, accounts_index}, solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, std::{num::NonZeroUsize, ops::RangeInclusive}, @@ -13,6 +13,7 @@ pub struct DefaultThreadArgs { pub accounts_db_clean_threads: String, pub accounts_db_hash_threads: String, pub accounts_db_process_threads: String, + pub accounts_index_flush_threads: String, pub ip_echo_server_threads: String, pub replay_forks_threads: String, pub replay_transactions_threads: String, @@ -26,6 +27,8 @@ impl Default for DefaultThreadArgs { accounts_db_clean_threads: AccountsDbCleanThreadsArg::bounded_default().to_string(), accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(), accounts_db_process_threads: AccountsDbProcessThreadsArg::bounded_default().to_string(), + accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default() + .to_string(), ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(), replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(), replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default() @@ -41,6 +44,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { new_thread_arg::(&defaults.accounts_db_clean_threads), new_thread_arg::(&defaults.accounts_db_hash_threads), new_thread_arg::(&defaults.accounts_db_process_threads), + new_thread_arg::(&defaults.accounts_db_process_threads), new_thread_arg::(&defaults.ip_echo_server_threads), new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), @@ -64,6 +68,7 @@ pub struct NumThreadConfig { pub accounts_db_clean_threads: NonZeroUsize, pub accounts_db_hash_threads: NonZeroUsize, pub accounts_db_process_threads: NonZeroUsize, + pub accounts_index_flush_threads: NonZeroUsize, pub ip_echo_server_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, @@ -88,6 +93,11 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { AccountsDbProcessThreadsArg::NAME, NonZeroUsize ), + accounts_index_flush_threads: value_t_or_exit!( + matches, + AccountsIndexFlushThreadsArg::NAME, + NonZeroUsize + ), ip_echo_server_threads: value_t_or_exit!( matches, IpEchoServerThreadsArg::NAME, @@ -177,6 +187,17 @@ impl ThreadArg for AccountsDbProcessThreadsArg { } } +struct AccountsIndexFlushThreadsArg; +impl ThreadArg for AccountsIndexFlushThreadsArg { + const NAME: &'static str = "accounts_index_flush_threads"; + const LONG_NAME: &'static str = "accounts-index-flush-threads"; + const HELP: &'static str = "Number of threads to use for flushing the accounts index"; + + fn default() -> usize { + accounts_index::default_num_flush_threads().get() + } +} + struct IpEchoServerThreadsArg; impl ThreadArg for IpEchoServerThreadsArg { const NAME: &'static str = "ip_echo_server_threads"; diff --git a/validator/src/main.rs b/validator/src/main.rs index 6108ba716d77e6..06cd91c19b4b84 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -913,6 +913,7 @@ pub fn main() { accounts_db_clean_threads, accounts_db_hash_threads, accounts_db_process_threads, + accounts_index_flush_threads, ip_echo_server_threads, replay_forks_threads, replay_transactions_threads, @@ -1183,6 +1184,7 @@ pub fn main() { let mut accounts_index_config = AccountsIndexConfig { started_from_validator: true, // this is the only place this is set + num_flush_threads: Some(accounts_index_flush_threads), ..AccountsIndexConfig::default() }; if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {