Skip to content

Commit

Permalink
Add arg for accounts index flush threads
Browse files Browse the repository at this point in the history
  • Loading branch information
steviez committed Oct 24, 2024
1 parent 4e1b436 commit 4596af9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 18 deletions.
14 changes: 10 additions & 4 deletions accounts-db/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use {
std::{
collections::{btree_map::BTreeMap, HashSet},
fmt::Debug,
num::NonZeroUsize,
ops::{
Bound,
Bound::{Excluded, Included, Unbounded},
Expand All @@ -45,10 +46,11 @@ pub const ITER_BATCH_SIZE: usize = 1000;
pub const BINS_DEFAULT: usize = 8192;
pub const BINS_FOR_TESTING: usize = 2; // we want > 1, but each bin is a few disk files with a disk based index, so fewer is better
pub const BINS_FOR_BENCHMARKS: usize = 8192;
pub const FLUSH_THREADS_TESTING: usize = 1;
// The unsafe is safe because we're using a fixed, known non-zero value
pub const FLUSH_THREADS_TESTING: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_TESTING),
flush_threads: Some(FLUSH_THREADS_TESTING),
num_flush_threads: Some(FLUSH_THREADS_TESTING),
drives: None,
index_limit_mb: IndexLimitMb::Unlimited,
ages_to_stay_in_cache: None,
Expand All @@ -57,7 +59,7 @@ pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndex
};
pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_BENCHMARKS),
flush_threads: Some(FLUSH_THREADS_TESTING),
num_flush_threads: Some(FLUSH_THREADS_TESTING),
drives: None,
index_limit_mb: IndexLimitMb::Unlimited,
ages_to_stay_in_cache: None,
Expand Down Expand Up @@ -218,7 +220,7 @@ pub enum IndexLimitMb {
#[derive(Debug, Default, Clone)]
pub struct AccountsIndexConfig {
pub bins: Option<usize>,
pub flush_threads: Option<usize>,
pub num_flush_threads: Option<NonZeroUsize>,
pub drives: Option<Vec<PathBuf>>,
pub index_limit_mb: IndexLimitMb,
pub ages_to_stay_in_cache: Option<Age>,
Expand All @@ -227,6 +229,10 @@ pub struct AccountsIndexConfig {
pub started_from_validator: bool,
}

pub fn default_num_flush_threads() -> NonZeroUsize {
NonZeroUsize::new(std::cmp::max(2, num_cpus::get() / 4)).expect("non-zero system threads")
}

#[derive(Debug, Default, Clone)]
pub struct AccountSecondaryIndexes {
pub keys: Option<AccountSecondaryIndexesIncludeExclude>,
Expand Down
23 changes: 10 additions & 13 deletions accounts-db/src/accounts_index_storage.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use {
crate::{
accounts_index::{
in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue,
self, in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue,
IndexValue,
},
bucket_map_holder::BucketMapHolder,
waitable_condvar::WaitableCondvar,
},
std::{
fmt::Debug,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Expand Down Expand Up @@ -58,14 +59,14 @@ impl BgThreads {
fn new<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>(
storage: &Arc<BucketMapHolder<T, U>>,
in_mem: &[Arc<InMemAccountsIndex<T, U>>],
threads: usize,
threads: NonZeroUsize,
can_advance_age: bool,
exit: Arc<AtomicBool>,
) -> Self {
// stop signal used for THIS batch of bg threads
let local_exit = Arc::new(AtomicBool::default());
let handles = Some(
(0..threads)
(0..threads.get())
.map(|idx| {
// the first thread we start is special
let can_advance_age = can_advance_age && idx == 0;
Expand Down Expand Up @@ -123,7 +124,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexStorage<
*self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
&self.storage,
&self.in_mem,
Self::num_threads(),
accounts_index::default_num_flush_threads(),
false, // cannot advance age from any of these threads
self.exit.clone(),
));
Expand Down Expand Up @@ -151,25 +152,21 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexStorage<
self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
}

fn num_threads() -> usize {
std::cmp::max(2, num_cpus::get() / 4)
}

/// allocate BucketMapHolder and InMemAccountsIndex[]
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, exit: Arc<AtomicBool>) -> Self {
let threads = config
let num_flush_threads = config
.as_ref()
.and_then(|config| config.flush_threads)
.unwrap_or_else(Self::num_threads);
.and_then(|config| config.num_flush_threads)
.unwrap_or_else(accounts_index::default_num_flush_threads);

let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
let storage = Arc::new(BucketMapHolder::new(bins, config, num_flush_threads.get()));

let in_mem = (0..bins)
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect::<Vec<_>>();

Self {
_bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit.clone()),
_bg_threads: BgThreads::new(&storage, &in_mem, num_flush_threads, true, exit.clone()),
storage,
in_mem,
startup_worker_threads: Mutex::default(),
Expand Down
23 changes: 22 additions & 1 deletion validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use {
clap::{value_t_or_exit, Arg, ArgMatches},
solana_accounts_db::accounts_db,
solana_accounts_db::{accounts_db, accounts_index},
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
std::{num::NonZeroUsize, ops::RangeInclusive},
Expand All @@ -13,6 +13,7 @@ pub struct DefaultThreadArgs {
pub accounts_db_clean_threads: String,
pub accounts_db_hash_threads: String,
pub accounts_db_process_threads: String,
pub accounts_index_flush_threads: String,
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
Expand All @@ -26,6 +27,8 @@ impl Default for DefaultThreadArgs {
accounts_db_clean_threads: AccountsDbCleanThreadsArg::bounded_default().to_string(),
accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(),
accounts_db_process_threads: AccountsDbProcessThreadsArg::bounded_default().to_string(),
accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
.to_string(),
ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
Expand All @@ -41,6 +44,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<AccountsDbCleanThreadsArg>(&defaults.accounts_db_clean_threads),
new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
new_thread_arg::<AccountsDbProcessThreadsArg>(&defaults.accounts_db_process_threads),
new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_db_process_threads),
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
Expand All @@ -64,6 +68,7 @@ pub struct NumThreadConfig {
pub accounts_db_clean_threads: NonZeroUsize,
pub accounts_db_hash_threads: NonZeroUsize,
pub accounts_db_process_threads: NonZeroUsize,
pub accounts_index_flush_threads: NonZeroUsize,
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
Expand All @@ -88,6 +93,11 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
AccountsDbProcessThreadsArg::NAME,
NonZeroUsize
),
accounts_index_flush_threads: value_t_or_exit!(
matches,
AccountsIndexFlushThreadsArg::NAME,
NonZeroUsize
),
ip_echo_server_threads: value_t_or_exit!(
matches,
IpEchoServerThreadsArg::NAME,
Expand Down Expand Up @@ -177,6 +187,17 @@ impl ThreadArg for AccountsDbProcessThreadsArg {
}
}

struct AccountsIndexFlushThreadsArg;
impl ThreadArg for AccountsIndexFlushThreadsArg {
const NAME: &'static str = "accounts_index_flush_threads";
const LONG_NAME: &'static str = "accounts-index-flush-threads";
const HELP: &'static str = "Number of threads to use for flushing the accounts index";

fn default() -> usize {
accounts_index::default_num_flush_threads().get()
}
}

struct IpEchoServerThreadsArg;
impl ThreadArg for IpEchoServerThreadsArg {
const NAME: &'static str = "ip_echo_server_threads";
Expand Down
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ pub fn main() {
accounts_db_clean_threads,
accounts_db_hash_threads,
accounts_db_process_threads,
accounts_index_flush_threads,
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
Expand Down Expand Up @@ -1183,6 +1184,7 @@ pub fn main() {

let mut accounts_index_config = AccountsIndexConfig {
started_from_validator: true, // this is the only place this is set
num_flush_threads: Some(accounts_index_flush_threads),
..AccountsIndexConfig::default()
};
if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
Expand Down

0 comments on commit 4596af9

Please sign in to comment.