diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 29236a998455ba..d69cd895ccafd8 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -510,6 +510,8 @@ pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig { storage_access: StorageAccess::Mmap, scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify, enable_experimental_accumulator_hash: false, + num_clean_threads: None, + num_foreground_threads: None, num_hash_threads: None, }; pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig { @@ -528,6 +530,8 @@ pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig storage_access: StorageAccess::Mmap, scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify, enable_experimental_accumulator_hash: false, + num_clean_threads: None, + num_foreground_threads: None, num_hash_threads: None, }; @@ -640,6 +644,10 @@ pub struct AccountsDbConfig { pub storage_access: StorageAccess, pub scan_filter_for_shrinking: ScanFilter, pub enable_experimental_accumulator_hash: bool, + /// Number of threads for background cleaning operations (`thread_pool_clean') + pub num_clean_threads: Option, + /// Number of threads for foreground operations (`thread_pool`) + pub num_foreground_threads: Option, /// Number of threads for background accounts hashing (`thread_pool_hash`) pub num_hash_threads: Option, } @@ -1766,6 +1774,10 @@ pub fn make_hash_thread_pool(num_threads: Option) -> ThreadPool { .unwrap() } +pub fn default_num_foreground_threads() -> usize { + get_thread_count() +} + #[cfg(feature = "frozen-abi")] impl solana_frozen_abi::abi_example::AbiExample for AccountsDb { fn example() -> Self { @@ -1893,16 +1905,30 @@ impl AccountsDb { let bank_hash_stats = Mutex::new(HashMap::from([(0, BankHashStats::default())])); - // Increase the stack for accounts threads + // Increase the stack for foreground threads // rayon needs a lot of stack const ACCOUNTS_STACK_SIZE: usize = 8 * 1024 * 1024; + let num_foreground_threads = accounts_db_config + .num_foreground_threads + .map(Into::into) + .unwrap_or_else(default_num_foreground_threads); let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count()) + .num_threads(num_foreground_threads) .thread_name(|i| format!("solAccounts{i:02}")) .stack_size(ACCOUNTS_STACK_SIZE) .build() .expect("new rayon threadpool"); - let thread_pool_clean = make_min_priority_thread_pool(); + + let num_clean_threads = accounts_db_config + .num_clean_threads + .map(Into::into) + .unwrap_or_else(quarter_thread_count); + let thread_pool_clean = rayon::ThreadPoolBuilder::new() + .thread_name(|i| format!("solAccountsLo{i:02}")) + .num_threads(num_clean_threads) + .build() + .expect("new rayon threadpool"); + let thread_pool_hash = make_hash_thread_pool(accounts_db_config.num_hash_threads); let mut new = Self { diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index a849d08af385f6..ff430931c9ec48 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -27,6 +27,7 @@ use { std::{ collections::{btree_map::BTreeMap, HashSet}, fmt::Debug, + num::NonZeroUsize, ops::{ Bound, Bound::{Excluded, Included, Unbounded}, @@ -45,10 +46,11 @@ pub const ITER_BATCH_SIZE: usize = 1000; pub const BINS_DEFAULT: usize = 8192; pub const BINS_FOR_TESTING: usize = 2; // we want > 1, but each bin is a few disk files with a disk based index, so fewer is better pub const BINS_FOR_BENCHMARKS: usize = 8192; -pub const FLUSH_THREADS_TESTING: usize = 1; +// The unsafe is safe because we're using a fixed, known non-zero value +pub const FLUSH_THREADS_TESTING: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndexConfig { bins: Some(BINS_FOR_TESTING), - flush_threads: Some(FLUSH_THREADS_TESTING), + num_flush_threads: Some(FLUSH_THREADS_TESTING), drives: None, index_limit_mb: IndexLimitMb::Unlimited, ages_to_stay_in_cache: None, @@ -57,7 +59,7 @@ pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndex }; pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig { bins: Some(BINS_FOR_BENCHMARKS), - flush_threads: Some(FLUSH_THREADS_TESTING), + num_flush_threads: Some(FLUSH_THREADS_TESTING), drives: None, index_limit_mb: IndexLimitMb::Unlimited, ages_to_stay_in_cache: None, @@ -218,7 +220,7 @@ pub enum IndexLimitMb { #[derive(Debug, Default, Clone)] pub struct AccountsIndexConfig { pub bins: Option, - pub flush_threads: Option, + pub num_flush_threads: Option, pub drives: Option>, pub index_limit_mb: IndexLimitMb, pub ages_to_stay_in_cache: Option, @@ -227,6 +229,10 @@ pub struct AccountsIndexConfig { pub started_from_validator: bool, } +pub fn default_num_flush_threads() -> NonZeroUsize { + NonZeroUsize::new(std::cmp::max(2, num_cpus::get() / 4)).expect("non-zero system threads") +} + #[derive(Debug, Default, Clone)] pub struct AccountSecondaryIndexes { pub keys: Option, diff --git a/accounts-db/src/accounts_index_storage.rs b/accounts-db/src/accounts_index_storage.rs index 3a654c84c25f97..48c5f55c5d9f64 100644 --- a/accounts-db/src/accounts_index_storage.rs +++ b/accounts-db/src/accounts_index_storage.rs @@ -1,7 +1,7 @@ use { crate::{ accounts_index::{ - in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue, + self, in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue, IndexValue, }, bucket_map_holder::BucketMapHolder, @@ -9,6 +9,7 @@ use { }, std::{ fmt::Debug, + num::NonZeroUsize, sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, @@ -58,14 +59,14 @@ impl BgThreads { fn new + Into>( storage: &Arc>, in_mem: &[Arc>], - threads: usize, + threads: NonZeroUsize, can_advance_age: bool, exit: Arc, ) -> Self { // stop signal used for THIS batch of bg threads let local_exit = Arc::new(AtomicBool::default()); let handles = Some( - (0..threads) + (0..threads.get()) .map(|idx| { // the first thread we start is special let can_advance_age = can_advance_age && idx == 0; @@ -123,7 +124,7 @@ impl + Into> AccountsIndexStorage< *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new( &self.storage, &self.in_mem, - Self::num_threads(), + accounts_index::default_num_flush_threads(), false, // cannot advance age from any of these threads self.exit.clone(), )); @@ -151,25 +152,21 @@ impl + Into> AccountsIndexStorage< self.in_mem.iter().for_each(|mem| mem.shrink_to_fit()) } - fn num_threads() -> usize { - std::cmp::max(2, num_cpus::get() / 4) - } - /// allocate BucketMapHolder and InMemAccountsIndex[] pub fn new(bins: usize, config: &Option, exit: Arc) -> Self { - let threads = config + let num_flush_threads = config .as_ref() - .and_then(|config| config.flush_threads) - .unwrap_or_else(Self::num_threads); + .and_then(|config| config.num_flush_threads) + .unwrap_or_else(accounts_index::default_num_flush_threads); - let storage = Arc::new(BucketMapHolder::new(bins, config, threads)); + let storage = Arc::new(BucketMapHolder::new(bins, config, num_flush_threads.get())); let in_mem = (0..bins) .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin))) .collect::>(); Self { - _bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit.clone()), + _bg_threads: BgThreads::new(&storage, &in_mem, num_flush_threads, true, exit.clone()), storage, in_mem, startup_worker_threads: Mutex::default(), diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 21de36f4abe8d6..314e82846b5e2c 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -2,7 +2,7 @@ use { clap::{value_t_or_exit, Arg, ArgMatches}, - solana_accounts_db::accounts_db, + solana_accounts_db::{accounts_db, accounts_index}, solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, std::{num::NonZeroUsize, ops::RangeInclusive}, @@ -10,7 +10,10 @@ use { // Need this struct to provide &str whose lifetime matches that of the CLAP Arg's pub struct DefaultThreadArgs { + pub accounts_db_clean_threads: String, + pub accounts_db_foreground_threads: String, pub accounts_db_hash_threads: String, + pub accounts_index_flush_threads: String, pub ip_echo_server_threads: String, pub replay_forks_threads: String, pub replay_transactions_threads: String, @@ -21,7 +24,12 @@ pub struct DefaultThreadArgs { impl Default for DefaultThreadArgs { fn default() -> Self { Self { + accounts_db_clean_threads: AccountsDbCleanThreadsArg::bounded_default().to_string(), + accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default() + .to_string(), accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(), + accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default() + .to_string(), ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(), replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(), replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default() @@ -34,7 +42,10 @@ impl Default for DefaultThreadArgs { pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { vec![ + new_thread_arg::(&defaults.accounts_db_clean_threads), + new_thread_arg::(&defaults.accounts_db_foreground_threads), new_thread_arg::(&defaults.accounts_db_hash_threads), + new_thread_arg::(&defaults.accounts_db_foreground_threads), new_thread_arg::(&defaults.ip_echo_server_threads), new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), @@ -55,7 +66,10 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> { } pub struct NumThreadConfig { + pub accounts_db_clean_threads: NonZeroUsize, + pub accounts_db_foreground_threads: NonZeroUsize, pub accounts_db_hash_threads: NonZeroUsize, + pub accounts_index_flush_threads: NonZeroUsize, pub ip_echo_server_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, @@ -65,11 +79,26 @@ pub struct NumThreadConfig { pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { NumThreadConfig { + accounts_db_clean_threads: value_t_or_exit!( + matches, + AccountsDbCleanThreadsArg::NAME, + NonZeroUsize + ), + accounts_db_foreground_threads: value_t_or_exit!( + matches, + AccountsDbForegroundThreadsArg::NAME, + NonZeroUsize + ), accounts_db_hash_threads: value_t_or_exit!( matches, AccountsDbHashThreadsArg::NAME, NonZeroUsize ), + accounts_index_flush_threads: value_t_or_exit!( + matches, + AccountsIndexFlushThreadsArg::NAME, + NonZeroUsize + ), ip_echo_server_threads: value_t_or_exit!( matches, IpEchoServerThreadsArg::NAME, @@ -126,6 +155,28 @@ trait ThreadArg { } } +struct AccountsDbCleanThreadsArg; +impl ThreadArg for AccountsDbCleanThreadsArg { + const NAME: &'static str = "accounts_db_clean_threads"; + const LONG_NAME: &'static str = "accounts-db-clean-threads"; + const HELP: &'static str = "Number of threads to use for cleaning AccountsDb"; + + fn default() -> usize { + accounts_db::quarter_thread_count() + } +} + +struct AccountsDbForegroundThreadsArg; +impl ThreadArg for AccountsDbForegroundThreadsArg { + const NAME: &'static str = "accounts_db_foreground_threads"; + const LONG_NAME: &'static str = "accounts-db-foreground-threads"; + const HELP: &'static str = "Number of threads to use for AccountsDb block processing"; + + fn default() -> usize { + accounts_db::default_num_foreground_threads() + } +} + struct AccountsDbHashThreadsArg; impl ThreadArg for AccountsDbHashThreadsArg { const NAME: &'static str = "accounts_db_hash_threads"; @@ -137,6 +188,17 @@ impl ThreadArg for AccountsDbHashThreadsArg { } } +struct AccountsIndexFlushThreadsArg; +impl ThreadArg for AccountsIndexFlushThreadsArg { + const NAME: &'static str = "accounts_index_flush_threads"; + const LONG_NAME: &'static str = "accounts-index-flush-threads"; + const HELP: &'static str = "Number of threads to use for flushing the accounts index"; + + fn default() -> usize { + accounts_index::default_num_flush_threads().get() + } +} + struct IpEchoServerThreadsArg; impl ThreadArg for IpEchoServerThreadsArg { const NAME: &'static str = "ip_echo_server_threads"; diff --git a/validator/src/main.rs b/validator/src/main.rs index 6d185f8c8e9bb3..5d218eedd4a9f4 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -910,7 +910,10 @@ pub fn main() { }; let cli::thread_args::NumThreadConfig { + accounts_db_clean_threads, + accounts_db_foreground_threads, accounts_db_hash_threads, + accounts_index_flush_threads, ip_echo_server_threads, replay_forks_threads, replay_transactions_threads, @@ -1181,6 +1184,7 @@ pub fn main() { let mut accounts_index_config = AccountsIndexConfig { started_from_validator: true, // this is the only place this is set + num_flush_threads: Some(accounts_index_flush_threads), ..AccountsIndexConfig::default() }; if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) { @@ -1312,6 +1316,8 @@ pub fn main() { scan_filter_for_shrinking, enable_experimental_accumulator_hash: matches .is_present("accounts_db_experimental_accumulator_hash"), + num_clean_threads: Some(accounts_db_clean_threads), + num_foreground_threads: Some(accounts_db_foreground_threads), num_hash_threads: Some(accounts_db_hash_threads), ..AccountsDbConfig::default() };