test leader and vote RPC calls
musitdev committed Nov 11, 2023
1 parent ec6703d commit 5266b64
Showing 6 changed files with 98 additions and 53 deletions.
5 changes: 3 additions & 2 deletions core/src/stores/data_cache.rs
@@ -4,6 +4,7 @@ use solana_sdk::hash::Hash;
use solana_sdk::slot_history::Slot;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::sync::{atomic::AtomicU64, Arc};
use tokio::sync::RwLock;

use crate::{
stores::{
@@ -37,7 +38,7 @@ pub struct DataCache {
pub identity_stakes: IdentityStakes,
pub cluster_info: ClusterInfo,
pub epoch_data: EpochCache,
pub leader_schedule: Arc<CalculatedSchedule>,
pub leader_schedule: Arc<RwLock<CalculatedSchedule>>,
}

impl DataCache {
@@ -93,7 +94,7 @@ impl DataCache {
store: Arc::new(DashMap::new()),
},
epoch_data: EpochCache::new_for_tests(),
leader_schedule: Arc::new(CalculatedSchedule::default()),
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
}
}
}
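The data_cache.rs change swaps `Arc<CalculatedSchedule>` for `Arc<RwLock<CalculatedSchedule>>`, so the RPC bridge can read the schedule while the stake/vote loop replaces it in place. A minimal sketch of that access pattern, assuming tokio's async RwLock; the `CalculatedSchedule` struct below is a simplified stand-in, not the crate's real definition:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Simplified stand-in for the crate's CalculatedSchedule.
#[derive(Default, Clone, Debug)]
struct CalculatedSchedule {
    current_epoch: Option<u64>,
}

#[tokio::main]
async fn main() {
    // Same shape as DataCache::leader_schedule after this commit.
    let leader_schedule = Arc::new(RwLock::new(CalculatedSchedule::default()));

    // Writer side (stake/vote loop): replace the schedule in place.
    {
        let mut guard = leader_schedule.write().await;
        *guard = CalculatedSchedule { current_epoch: Some(512) };
    } // write guard dropped here, so readers are no longer blocked

    // Reader side (RPC bridge): concurrent readers share the lock.
    let snapshot = leader_schedule.read().await.clone();
    println!("epoch: {:?}", snapshot.current_epoch);
}
```

Compared to the earlier `Arc::make_mut`/`Arc::new` approach, the writer no longer needs a uniquely owned Arc, which is what allows the in-place update in stake_vote/src/lib.rs further down.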
103 changes: 66 additions & 37 deletions core/src/structures/leaderschedule.rs
@@ -4,6 +4,8 @@ use solana_rpc_client_api::config::RpcGetVoteAccountsConfig;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::ParsePubkeyError;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::slot_history::Slot;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
use std::collections::HashMap;
use std::str::FromStr;

@@ -46,7 +48,13 @@ impl CalculatedSchedule {
commitment: Option<CommitmentConfig>,
data_cache: &DataCache,
) -> Option<HashMap<String, Vec<usize>>> {
let commitment = commitment.unwrap_or_else(CommitmentConfig::default);
log::info!(
"get_leader_schedule_for_slot current:{:?} next:{:?} ",
self.current.clone().unwrap_or_default(),
self.next.clone().unwrap_or_default()
);

let commitment = commitment.unwrap_or_default();
let slot = match slot {
Some(slot) => slot,
None => {
@@ -67,48 +75,69 @@ impl CalculatedSchedule {
get_schedule(self.current.as_ref()).or_else(|| get_schedule(self.next.as_ref()))
}

// pub async fn get_slot_leaders(&self, start_slot: Slot, limit: u64) -> Result<Vec<String>> {
// debug!(
// "get_slot_leaders rpc request received (start: {} limit: {})",
// start_slot, limit
// );
pub async fn get_slot_leaders(
&self,
start_slot: Slot,
limit: u64,
epock_schedule: &EpochSchedule,
) -> Result<Vec<Pubkey>, String> {
log::debug!(
"get_slot_leaders rpc request received (start: {} limit: {})",
start_slot,
limit
);
pub const MAX_GET_SLOT_LEADERS: usize =
solana_rpc_client_api::request::MAX_GET_SLOT_LEADERS;

let mut limit = limit as usize;
if limit > MAX_GET_SLOT_LEADERS {
return Err(format!(
"Invalid Params: Invalid limit; max {MAX_GET_SLOT_LEADERS}"
));
}

// let limit = limit as usize;
// if limit > MAX_GET_SLOT_LEADERS {
// return Err(Error::invalid_params(format!(
// "Invalid limit; max {MAX_GET_SLOT_LEADERS}"
// )));
// }
// let bank = self.bank(commitment);
let (epoch, slot_index) = epock_schedule.get_epoch_and_slot_index(start_slot);
let mut slot_leaders = Vec::with_capacity(limit);

// let (mut epoch, mut slot_index) =
// bank.epoch_schedule().get_epoch_and_slot_index(start_slot);
let mut extend_slot_from_epoch = |leader_schedule: &[Pubkey], slot_index: usize| {
let take = limit.saturating_sub(slot_leaders.len());
slot_leaders.extend(leader_schedule.iter().skip(slot_index).take(take));
limit -= slot_leaders.len();
};

// let mut slot_leaders = Vec::with_capacity(limit);
// while slot_leaders.len() < limit {
// if let Some(leader_schedule) =
// self.leader_schedule_cache.get_epoch_leader_schedule(epoch)
// {
// slot_leaders.extend(
// leader_schedule
// .get_slot_leaders()
// .iter()
// .skip(slot_index as usize)
// .take(limit.saturating_sub(slot_leaders.len())),
// );
// } else {
// return Err(Error::invalid_params(format!(
// "Invalid slot range: leader schedule for epoch {epoch} is unavailable"
// )));
// }
// log::info!(
// "get_slot_leaders epoch:{epoch} current:{:?} next:{:?} ",
// self.current.clone().unwrap_or_default(),
// self.next.clone().unwrap_or_default()
// );

// epoch += 1;
// slot_index = 0;
// }
// }
//TODO manage more leader schedule data in storage.
//Here we only search the current and next epoch.
let res = [
(&self.current, slot_index as usize, epoch),
(&self.next, slot_index as usize, epoch),
(&self.next, 0, epoch + 1),
]
.into_iter()
.filter_map(|(epoch_data, slot_index, epoch)| {
epoch_data.as_ref().and_then(|epoch_data| {
(epoch_data.epoch == epoch).then_some((epoch_data, slot_index))
})
})
.map(|(epoch_data, slot_index)| {
extend_slot_from_epoch(&epoch_data.schedule_by_slot, slot_index);
})
.collect::<Vec<()>>();
match res.is_empty() {
true => Err(format!(
"Invalid Params: Invalid slot range: leader schedule for epoch {epoch} is unavailable"
)),
false => Ok(slot_leaders),
}
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct LeaderScheduleData {
pub schedule_by_node: HashMap<String, Vec<usize>>,
pub schedule_by_slot: Vec<Pubkey>,
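The new `get_slot_leaders` converts `start_slot` into an `(epoch, slot_index)` pair through the epoch schedule, then takes leaders from the cached schedules until `limit` entries are collected, failing if a needed epoch is not cached. A rough, self-contained sketch of that collection idea (closer in shape to the upstream-style loop the removed comments referenced than to the committed closure-based version); plain `String` leaders and a `HashMap` keyed by epoch stand in for the crate's `LeaderScheduleData`:

```rust
use std::collections::HashMap;

/// Sketch only: collect up to `limit` slot leaders starting at (`epoch`, `slot_index`),
/// spilling into later epochs, and fail if a needed epoch is not cached.
fn slot_leaders(
    schedules: &HashMap<u64, Vec<String>>, // epoch -> leader per slot index
    mut epoch: u64,
    mut slot_index: usize,
    limit: usize,
) -> Result<Vec<String>, String> {
    let mut leaders = Vec::with_capacity(limit);
    while leaders.len() < limit {
        let Some(schedule) = schedules.get(&epoch) else {
            return Err(format!("leader schedule for epoch {epoch} is unavailable"));
        };
        let take = limit - leaders.len();
        leaders.extend(schedule.iter().skip(slot_index).take(take).cloned());
        epoch += 1;
        slot_index = 0; // later epochs start at their first slot
    }
    Ok(leaders)
}

fn main() {
    let mut schedules = HashMap::new();
    schedules.insert(500u64, vec!["A".into(), "B".into(), "C".into()]);
    schedules.insert(501u64, vec!["D".into(), "E".into()]);
    // 3 leaders starting at slot index 2 of epoch 500: spills into epoch 501.
    println!("{:?}", slot_leaders(&schedules, 500, 2, 3)); // Ok(["C", "D", "E"])
}
```

The committed version only consults `self.current` and `self.next` (see the TODO about keeping more schedules in storage), so a request reaching beyond the next epoch returns the "leader schedule ... unavailable" error.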
15 changes: 13 additions & 2 deletions lite-rpc/src/bridge.rs
@@ -378,11 +378,12 @@ impl LiteRpcServer for LiteBridge {
slot: Option<u64>,
config: Option<RpcLeaderScheduleConfig>,
) -> crate::rpc::Result<Option<HashMap<String, Vec<usize>>>> {
log::warn!("receive get_leader_schedule rpc call");
//TODO verify leader identity.
let schedule = self
.data_cache
.leader_schedule
.read()
.await
.get_leader_schedule_for_slot(slot, config.and_then(|c| c.commitment), &self.data_cache)
.await;
Ok(schedule)
@@ -392,7 +393,17 @@ impl LiteRpcServer for LiteBridge {
start_slot: u64,
limit: u64,
) -> crate::rpc::Result<Vec<Pubkey>> {
todo!()
let epock_schedule = self.data_cache.epoch_data.get_epoch_schedule();

self.data_cache
.leader_schedule
.read()
.await
.get_slot_leaders(start_slot, limit, epock_schedule)
.await
.map_err(|err| {
jsonrpsee::core::Error::Custom(format!("error during query processing:{err}"))
})
}

async fn get_vote_accounts(
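With the bridge methods wired up, both calls can be exercised against a running lite-rpc instance. A hedged sketch using the standard `solana_client` nonblocking client (the crate choice and the local URL/port are assumptions; adjust them to your deployment, and note that error shapes may differ from upstream Solana):

```rust
use solana_client::nonblocking::rpc_client::RpcClient;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumption: lite-rpc is listening locally on this port.
    let client = RpcClient::new("http://127.0.0.1:8890".to_string());

    // Exercises LiteBridge::get_leader_schedule (leader -> slot indexes map).
    let schedule = client.get_leader_schedule(None).await?;
    println!("leaders in schedule: {:?}", schedule.map(|s| s.len()));

    // Exercises LiteBridge::get_slot_leaders via the new CalculatedSchedule path.
    let start_slot = client.get_slot().await?;
    let leaders = client.get_slot_leaders(start_slot, 10).await?;
    println!("next 10 leaders: {leaders:?}");

    Ok(())
}
```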
8 changes: 4 additions & 4 deletions lite-rpc/src/main.rs
@@ -1,8 +1,5 @@
pub mod rpc_tester;

use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use std::time::Duration;

use anyhow::bail;
use clap::Parser;
use dashmap::DashMap;
Expand All @@ -11,6 +8,9 @@ use lite_rpc::postgres_logger::PostgresLogger;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION, NB_SLOTS_TRANSACTIONS_TO_CACHE};
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use std::time::Duration;
use tokio::sync::RwLock;

use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
@@ -143,7 +143,7 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
save_for_additional_slots: NB_SLOTS_TRANSACTIONS_TO_CACHE,
},
epoch_data,
leader_schedule: Arc::new(CalculatedSchedule::default()),
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
};

let lata_cache_service = DataCachingService {
9 changes: 5 additions & 4 deletions stake_vote/src/lib.rs
@@ -36,7 +36,7 @@ const VOTESTORE_INITIAL_CAPACITY: usize = 600000;
type Slot = u64;

pub async fn start_stakes_and_votes_loop(
mut data_cache: DataCache,
data_cache: DataCache,
mut slot_notification: SlotStream,
mut vote_account_rpc_request: Receiver<(
GetVoteAccountsConfig,
@@ -226,8 +226,8 @@ pub async fn start_stakes_and_votes_loop(
Ok(Some(boot_res))=> {
match boot_res {
Ok(current_schedule_data) => {
//let data_schedule = Arc::make_mut(&mut data_cache.leader_schedule);
data_cache.leader_schedule = Arc::new(current_schedule_data);
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = current_schedule_data;
}
Err(err) => {
log::warn!("Error during current leader schedule bootstrap from files:{err}")
@@ -254,7 +254,8 @@
if let Some(new_leader_schedule) = new_leader_schedule {
//clone old schedule values if there's other use.
//only done once per epoch. Avoids using a Mutex.
let data_schedule = Arc::make_mut(&mut data_cache.leader_schedule);
log::info!("End leader schedule calculus for epoch:{}", new_leader_schedule.epoch);
let mut data_schedule = data_cache.leader_schedule.write().await;
data_schedule.current = data_schedule.next.take();
data_schedule.next = Some(new_leader_schedule.rpc_data);
}
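At an epoch boundary the stake/vote loop rotates the cached schedules: what was `next` becomes `current`, and the freshly computed schedule becomes the new `next`. A toy sketch of that rotation under the new write lock; the `Schedules` struct and `u64` epoch numbers below are placeholders for the crate's `CalculatedSchedule`/`LeaderScheduleData`:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Placeholder for CalculatedSchedule { current, next } from leaderschedule.rs.
#[derive(Default, Debug)]
struct Schedules {
    current: Option<u64>, // epoch number stands in for LeaderScheduleData
    next: Option<u64>,
}

async fn rotate(schedules: &Arc<RwLock<Schedules>>, new_epoch: u64) {
    let mut guard = schedules.write().await;
    // Same move as in start_stakes_and_votes_loop: next -> current, new -> next.
    guard.current = guard.next.take();
    guard.next = Some(new_epoch);
}

#[tokio::main]
async fn main() {
    let schedules = Arc::new(RwLock::new(Schedules::default()));
    rotate(&schedules, 501).await; // first computed schedule becomes `next`
    rotate(&schedules, 502).await; // 501 rotates into `current`
    println!("{:?}", schedules.read().await); // current: Some(501), next: Some(502)
}
```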
11 changes: 7 additions & 4 deletions stake_vote/src/vote.rs
@@ -235,15 +235,18 @@ pub fn merge_program_account_in_vote_map(
});
}

//TODO put in config instead of const.
// Validators that are this number of slots behind are considered delinquent
pub const DELINQUENT_VALIDATOR_SLOT_DISTANCE: u64 = 128;
pub fn get_rpc_vote_accounts_info(
current_slot: Slot,
votes: &VoteMap,
vote_accounts: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
config: GetVoteAccountsConfig,
) -> RpcVoteAccountStatus {
pub const DELINQUENT_VALIDATOR_SLOT_DISTANCE: u64 =
solana_rpc_client_api::request::DELINQUENT_VALIDATOR_SLOT_DISTANCE;
let delinquent_validator_slot_distance = config
.delinquent_slot_distance
.unwrap_or(DELINQUENT_VALIDATOR_SLOT_DISTANCE);
//From Solana rpc::rpc::metaz::get_vote_accounts() code.
let (current_vote_accounts, delinquent_vote_accounts): (
Vec<RpcVoteAccountInfo>,
@@ -258,8 +261,8 @@
vote.convert_to_rpc_vote_account_info(stake, epoch_vote_account)
})
.partition(|vote_account_info| {
if current_slot >= DELINQUENT_VALIDATOR_SLOT_DISTANCE {
vote_account_info.last_vote > current_slot - DELINQUENT_VALIDATOR_SLOT_DISTANCE
if current_slot >= delinquent_validator_slot_distance {
vote_account_info.last_vote > current_slot - delinquent_validator_slot_distance
} else {
vote_account_info.last_vote > 0
}
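The vote.rs change makes the delinquency window configurable via `GetVoteAccountsConfig::delinquent_slot_distance`, falling back to the upstream `DELINQUENT_VALIDATOR_SLOT_DISTANCE` constant. A small stand-alone sketch of the classification predicate used in the partition; the values in `main` are made up for illustration:

```rust
/// Returns true if the account still counts as current (non-delinquent).
/// Mirrors the partition predicate in get_rpc_vote_accounts_info.
fn is_current(last_vote: u64, current_slot: u64, delinquent_distance: u64) -> bool {
    if current_slot >= delinquent_distance {
        // Voted within the last `delinquent_distance` slots.
        last_vote > current_slot - delinquent_distance
    } else {
        // Chain is younger than the distance: any recorded vote counts as current.
        last_vote > 0
    }
}

fn main() {
    let distance = 128; // default DELINQUENT_VALIDATOR_SLOT_DISTANCE
    assert!(is_current(10_000, 10_050, distance)); // recent vote -> current
    assert!(!is_current(9_000, 10_050, distance)); // stale vote -> delinquent
    assert!(is_current(5, 100, distance));         // chain younger than distance
    println!("classification sketch ok");
}
```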
