Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI args for AccountsDb thread pools to validator #3281

Merged
merged 6 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,8 @@ pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
storage_access: StorageAccess::Mmap,
scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify,
enable_experimental_accumulator_hash: false,
num_clean_threads: None,
num_foreground_threads: None,
num_hash_threads: None,
};
pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig {
Expand All @@ -528,6 +530,8 @@ pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig
storage_access: StorageAccess::Mmap,
scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify,
enable_experimental_accumulator_hash: false,
num_clean_threads: None,
num_foreground_threads: None,
num_hash_threads: None,
};

Expand Down Expand Up @@ -640,6 +644,10 @@ pub struct AccountsDbConfig {
pub storage_access: StorageAccess,
pub scan_filter_for_shrinking: ScanFilter,
pub enable_experimental_accumulator_hash: bool,
/// Number of threads for background cleaning operations (`thread_pool_clean')
pub num_clean_threads: Option<NonZeroUsize>,
/// Number of threads for foreground operations (`thread_pool`)
pub num_foreground_threads: Option<NonZeroUsize>,
/// Number of threads for background accounts hashing (`thread_pool_hash`)
pub num_hash_threads: Option<NonZeroUsize>,
}
Expand Down Expand Up @@ -1766,6 +1774,10 @@ pub fn make_hash_thread_pool(num_threads: Option<NonZeroUsize>) -> ThreadPool {
.unwrap()
}

pub fn default_num_foreground_threads() -> usize {
get_thread_count()
}

#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample for AccountsDb {
fn example() -> Self {
Expand Down Expand Up @@ -1893,16 +1905,30 @@ impl AccountsDb {

let bank_hash_stats = Mutex::new(HashMap::from([(0, BankHashStats::default())]));

// Increase the stack for accounts threads
// Increase the stack for foreground threads
// rayon needs a lot of stack
const ACCOUNTS_STACK_SIZE: usize = 8 * 1024 * 1024;
let num_foreground_threads = accounts_db_config
.num_foreground_threads
.map(Into::into)
.unwrap_or_else(default_num_foreground_threads);
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.num_threads(num_foreground_threads)
.thread_name(|i| format!("solAccounts{i:02}"))
.stack_size(ACCOUNTS_STACK_SIZE)
.build()
.expect("new rayon threadpool");
let thread_pool_clean = make_min_priority_thread_pool();

let num_clean_threads = accounts_db_config
.num_clean_threads
.map(Into::into)
.unwrap_or_else(quarter_thread_count);
let thread_pool_clean = rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solAccountsLo{i:02}"))
.num_threads(num_clean_threads)
.build()
.expect("new rayon threadpool");

let thread_pool_hash = make_hash_thread_pool(accounts_db_config.num_hash_threads);

let mut new = Self {
Expand Down
14 changes: 10 additions & 4 deletions accounts-db/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use {
std::{
collections::{btree_map::BTreeMap, HashSet},
fmt::Debug,
num::NonZeroUsize,
ops::{
Bound,
Bound::{Excluded, Included, Unbounded},
Expand All @@ -45,10 +46,11 @@ pub const ITER_BATCH_SIZE: usize = 1000;
pub const BINS_DEFAULT: usize = 8192;
pub const BINS_FOR_TESTING: usize = 2; // we want > 1, but each bin is a few disk files with a disk based index, so fewer is better
pub const BINS_FOR_BENCHMARKS: usize = 8192;
pub const FLUSH_THREADS_TESTING: usize = 1;
// The unsafe is safe because we're using a fixed, known non-zero value
pub const FLUSH_THREADS_TESTING: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_TESTING),
flush_threads: Some(FLUSH_THREADS_TESTING),
num_flush_threads: Some(FLUSH_THREADS_TESTING),
drives: None,
index_limit_mb: IndexLimitMb::Unlimited,
ages_to_stay_in_cache: None,
Expand All @@ -57,7 +59,7 @@ pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndex
};
pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_BENCHMARKS),
flush_threads: Some(FLUSH_THREADS_TESTING),
num_flush_threads: Some(FLUSH_THREADS_TESTING),
drives: None,
index_limit_mb: IndexLimitMb::Unlimited,
ages_to_stay_in_cache: None,
Expand Down Expand Up @@ -218,7 +220,7 @@ pub enum IndexLimitMb {
#[derive(Debug, Default, Clone)]
pub struct AccountsIndexConfig {
pub bins: Option<usize>,
pub flush_threads: Option<usize>,
pub num_flush_threads: Option<NonZeroUsize>,
pub drives: Option<Vec<PathBuf>>,
pub index_limit_mb: IndexLimitMb,
pub ages_to_stay_in_cache: Option<Age>,
Expand All @@ -227,6 +229,10 @@ pub struct AccountsIndexConfig {
pub started_from_validator: bool,
}

pub fn default_num_flush_threads() -> NonZeroUsize {
NonZeroUsize::new(std::cmp::max(2, num_cpus::get() / 4)).expect("non-zero system threads")
}

#[derive(Debug, Default, Clone)]
pub struct AccountSecondaryIndexes {
pub keys: Option<AccountSecondaryIndexesIncludeExclude>,
Expand Down
23 changes: 10 additions & 13 deletions accounts-db/src/accounts_index_storage.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use {
crate::{
accounts_index::{
in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue,
self, in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue,
IndexValue,
},
bucket_map_holder::BucketMapHolder,
waitable_condvar::WaitableCondvar,
},
std::{
fmt::Debug,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Expand Down Expand Up @@ -58,14 +59,14 @@ impl BgThreads {
fn new<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>(
storage: &Arc<BucketMapHolder<T, U>>,
in_mem: &[Arc<InMemAccountsIndex<T, U>>],
threads: usize,
threads: NonZeroUsize,
can_advance_age: bool,
exit: Arc<AtomicBool>,
) -> Self {
// stop signal used for THIS batch of bg threads
let local_exit = Arc::new(AtomicBool::default());
let handles = Some(
(0..threads)
(0..threads.get())
.map(|idx| {
// the first thread we start is special
let can_advance_age = can_advance_age && idx == 0;
Expand Down Expand Up @@ -123,7 +124,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexStorage<
*self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
&self.storage,
&self.in_mem,
Self::num_threads(),
accounts_index::default_num_flush_threads(),
false, // cannot advance age from any of these threads
self.exit.clone(),
));
Expand Down Expand Up @@ -151,25 +152,21 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexStorage<
self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
}

fn num_threads() -> usize {
std::cmp::max(2, num_cpus::get() / 4)
}

/// allocate BucketMapHolder and InMemAccountsIndex[]
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, exit: Arc<AtomicBool>) -> Self {
let threads = config
let num_flush_threads = config
.as_ref()
.and_then(|config| config.flush_threads)
.unwrap_or_else(Self::num_threads);
.and_then(|config| config.num_flush_threads)
.unwrap_or_else(accounts_index::default_num_flush_threads);

let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
let storage = Arc::new(BucketMapHolder::new(bins, config, num_flush_threads.get()));

let in_mem = (0..bins)
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect::<Vec<_>>();

Self {
_bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit.clone()),
_bg_threads: BgThreads::new(&storage, &in_mem, num_flush_threads, true, exit.clone()),
storage,
in_mem,
startup_worker_threads: Mutex::default(),
Expand Down
64 changes: 63 additions & 1 deletion validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

use {
clap::{value_t_or_exit, Arg, ArgMatches},
solana_accounts_db::accounts_db,
solana_accounts_db::{accounts_db, accounts_index},
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
std::{num::NonZeroUsize, ops::RangeInclusive},
};

// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
pub struct DefaultThreadArgs {
pub accounts_db_clean_threads: String,
pub accounts_db_foreground_threads: String,
pub accounts_db_hash_threads: String,
pub accounts_index_flush_threads: String,
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
Expand All @@ -21,7 +24,12 @@ pub struct DefaultThreadArgs {
impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
accounts_db_clean_threads: AccountsDbCleanThreadsArg::bounded_default().to_string(),
accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
.to_string(),
accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(),
accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
.to_string(),
ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
Expand All @@ -34,7 +42,10 @@ impl Default for DefaultThreadArgs {

pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
vec![
new_thread_arg::<AccountsDbCleanThreadsArg>(&defaults.accounts_db_clean_threads),
new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_db_foreground_threads),
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
Expand All @@ -55,7 +66,10 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
}

pub struct NumThreadConfig {
pub accounts_db_clean_threads: NonZeroUsize,
pub accounts_db_foreground_threads: NonZeroUsize,
pub accounts_db_hash_threads: NonZeroUsize,
pub accounts_index_flush_threads: NonZeroUsize,
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
Expand All @@ -65,11 +79,26 @@ pub struct NumThreadConfig {

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
accounts_db_clean_threads: value_t_or_exit!(
matches,
AccountsDbCleanThreadsArg::NAME,
NonZeroUsize
),
accounts_db_foreground_threads: value_t_or_exit!(
matches,
AccountsDbForegroundThreadsArg::NAME,
NonZeroUsize
),
accounts_db_hash_threads: value_t_or_exit!(
matches,
AccountsDbHashThreadsArg::NAME,
NonZeroUsize
),
accounts_index_flush_threads: value_t_or_exit!(
matches,
AccountsIndexFlushThreadsArg::NAME,
NonZeroUsize
),
ip_echo_server_threads: value_t_or_exit!(
matches,
IpEchoServerThreadsArg::NAME,
Expand Down Expand Up @@ -126,6 +155,28 @@ trait ThreadArg {
}
}

struct AccountsDbCleanThreadsArg;
impl ThreadArg for AccountsDbCleanThreadsArg {
const NAME: &'static str = "accounts_db_clean_threads";
const LONG_NAME: &'static str = "accounts-db-clean-threads";
const HELP: &'static str = "Number of threads to use for cleaning AccountsDb";

fn default() -> usize {
accounts_db::quarter_thread_count()
}
}

struct AccountsDbForegroundThreadsArg;
impl ThreadArg for AccountsDbForegroundThreadsArg {
const NAME: &'static str = "accounts_db_foreground_threads";
const LONG_NAME: &'static str = "accounts-db-foreground-threads";
const HELP: &'static str = "Number of threads to use for AccountsDb block processing";

fn default() -> usize {
accounts_db::default_num_foreground_threads()
}
}

struct AccountsDbHashThreadsArg;
impl ThreadArg for AccountsDbHashThreadsArg {
const NAME: &'static str = "accounts_db_hash_threads";
Expand All @@ -137,6 +188,17 @@ impl ThreadArg for AccountsDbHashThreadsArg {
}
}

struct AccountsIndexFlushThreadsArg;
impl ThreadArg for AccountsIndexFlushThreadsArg {
const NAME: &'static str = "accounts_index_flush_threads";
const LONG_NAME: &'static str = "accounts-index-flush-threads";
const HELP: &'static str = "Number of threads to use for flushing the accounts index";

fn default() -> usize {
accounts_index::default_num_flush_threads().get()
}
}

struct IpEchoServerThreadsArg;
impl ThreadArg for IpEchoServerThreadsArg {
const NAME: &'static str = "ip_echo_server_threads";
Expand Down
6 changes: 6 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,10 @@ pub fn main() {
};

let cli::thread_args::NumThreadConfig {
accounts_db_clean_threads,
accounts_db_foreground_threads,
accounts_db_hash_threads,
accounts_index_flush_threads,
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
Expand Down Expand Up @@ -1181,6 +1184,7 @@ pub fn main() {

let mut accounts_index_config = AccountsIndexConfig {
started_from_validator: true, // this is the only place this is set
num_flush_threads: Some(accounts_index_flush_threads),
..AccountsIndexConfig::default()
};
if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
Expand Down Expand Up @@ -1312,6 +1316,8 @@ pub fn main() {
scan_filter_for_shrinking,
enable_experimental_accumulator_hash: matches
.is_present("accounts_db_experimental_accumulator_hash"),
num_clean_threads: Some(accounts_db_clean_threads),
num_foreground_threads: Some(accounts_db_foreground_threads),
num_hash_threads: Some(accounts_db_hash_threads),
..AccountsDbConfig::default()
};
Expand Down
Loading