Skip to content

Commit

Permalink
use geyser blockmeta to progress finalizedslot (#367)
Browse files Browse the repository at this point in the history
introduce BlockInfo on a new BlockInfoStream; maps from yellowstone BlockMeta
fix: last_finalized_slot must progress unconditionally
  • Loading branch information
grooviegermanikus authored Mar 22, 2024
1 parent defdc20 commit cd9df11
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 93 deletions.
16 changes: 8 additions & 8 deletions accounts/src/account_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use anyhow::bail;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_account_decoder::{UiAccount, UiDataSliceConfig};
use solana_lite_rpc_core::types::BlockInfoStream;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
account_data::{AccountData, AccountNotificationMessage, AccountStream},
account_filter::AccountFilters,
},
types::BlockStream,
AnyhowJoinHandle,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
Expand Down Expand Up @@ -151,7 +151,7 @@ impl AccountService {
pub fn process_account_stream(
&self,
mut account_stream: AccountStream,
mut block_stream: BlockStream,
mut blockinfo_stream: BlockInfoStream,
) -> Vec<AnyhowJoinHandle> {
let this = self.clone();
let processed_task = tokio::spawn(async move {
Expand Down Expand Up @@ -187,19 +187,19 @@ impl AccountService {
let this = self.clone();
let block_processing_task = tokio::spawn(async move {
loop {
match block_stream.recv().await {
Ok(block_notification) => {
if block_notification.commitment_config.is_processed() {
match blockinfo_stream.recv().await {
Ok(block_info) => {
if block_info.commitment_config.is_processed() {
// processed commitment is not processed in this loop
continue;
}
let commitment = Commitment::from(block_notification.commitment_config);
let commitment = Commitment::from(block_info.commitment_config);
let updated_accounts = this
.account_store
.process_slot_data(block_notification.slot, commitment)
.process_slot_data(block_info.slot, commitment)
.await;

if block_notification.commitment_config.is_finalized() {
if block_info.commitment_config.is_finalized() {
ACCOUNT_UPDATES_FINALIZED.add(updated_accounts.len() as i64)
} else {
ACCOUNT_UPDATES_CONFIRMED.add(updated_accounts.len() as i64);
Expand Down
2 changes: 1 addition & 1 deletion blockstore/tests/blockstore_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn storage_test() {
let (slot_notifier, _jh_multiplex_slotstream) =
create_grpc_multiplex_processed_slots_subscription(grpc_sources.clone());

let (blocks_notifier, _jh_multiplex_blockstream) =
let (blocks_notifier, _blockmeta_output_stream, _jh_multiplex_blockstream) =
create_grpc_multiplex_blocks_subscription(grpc_sources);

let (epoch_cache, _) = EpochCache::bootstrap_epoch(&rpc_client).await.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion cluster-endpoints/src/endpoint_stremers.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use solana_lite_rpc_core::{
structures::account_data::AccountStream,
types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream},
types::{BlockInfoStream, BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream},
};

/// subscribers to broadcast channels should assume that channels are not getting closed unless the system is shutting down
pub struct EndpointStreaming {
pub blocks_notifier: BlockStream,
pub blockinfo_notifier: BlockInfoStream,
pub slot_notifier: SlotStream,
pub vote_account_notifier: VoteAccountStream,
pub cluster_info_notifier: ClusterInfoStream,
Expand Down
144 changes: 89 additions & 55 deletions cluster-endpoints/src/grpc_multiplex.rs

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub fn create_grpc_subscription(
let (slot_multiplex_channel, jh_multiplex_slotstream) =
create_grpc_multiplex_processed_slots_subscription(grpc_sources.clone());

let (block_multiplex_channel, jh_multiplex_blockstream) =
let (block_multiplex_channel, blockmeta_channel, jh_multiplex_blockstream) =
create_grpc_multiplex_blocks_subscription(grpc_sources.clone());

let cluster_info_polling = poll_cluster_info(rpc_client.clone(), cluster_info_sx);
Expand All @@ -283,6 +283,7 @@ pub fn create_grpc_subscription(
create_grpc_account_streaming(grpc_sources, accounts_filter);
let streamers = EndpointStreaming {
blocks_notifier: block_multiplex_channel,
blockinfo_notifier: blockmeta_channel,
slot_notifier: slot_multiplex_channel,
cluster_info_notifier,
vote_account_notifier,
Expand All @@ -300,6 +301,7 @@ pub fn create_grpc_subscription(
} else {
let streamers = EndpointStreaming {
blocks_notifier: block_multiplex_channel,
blockinfo_notifier: blockmeta_channel,
slot_notifier: slot_multiplex_channel,
cluster_info_notifier,
vote_account_notifier,
Expand Down
3 changes: 3 additions & 0 deletions cluster-endpoints/src/json_rpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub fn create_json_rpc_polling_subscription(
) -> anyhow::Result<(EndpointStreaming, Vec<AnyhowJoinHandle>)> {
let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(16);
let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(16);
let (blockinfo_sx, blockinfo_notifier) = tokio::sync::broadcast::channel(16);
let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(16);
let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(16);
// does not support accounts support with rpc polling
Expand All @@ -26,6 +27,7 @@ pub fn create_json_rpc_polling_subscription(
let mut block_polling_tasks = poll_block(
rpc_client.clone(),
block_sx,
blockinfo_sx,
slot_notifier.resubscribe(),
num_parallel_tasks,
);
Expand All @@ -39,6 +41,7 @@ pub fn create_json_rpc_polling_subscription(

let streamers = EndpointStreaming {
blocks_notifier,
blockinfo_notifier,
slot_notifier,
cluster_info_notifier,
vote_account_notifier,
Expand Down
17 changes: 17 additions & 0 deletions cluster-endpoints/src/rpc_polling/poll_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{bail, Context};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use solana_lite_rpc_core::structures::produced_block::{ProducedBlockInner, TransactionInfo};
use solana_lite_rpc_core::{
structures::{
Expand Down Expand Up @@ -54,6 +55,7 @@ pub async fn process_block(
pub fn poll_block(
rpc_client: Arc<RpcClient>,
block_notification_sender: Sender<ProducedBlock>,
blockinfo_notification_sender: Sender<BlockInfo>,
slot_notification: Receiver<SlotNotification>,
num_parallel_tasks: usize,
) -> Vec<AnyhowJoinHandle> {
Expand All @@ -66,6 +68,7 @@ pub fn poll_block(

for _i in 0..num_parallel_tasks {
let block_notification_sender = block_notification_sender.clone();
let blockinfo_notification_sender = blockinfo_notification_sender.clone();
let rpc_client = rpc_client.clone();
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
Expand All @@ -79,9 +82,13 @@ pub fn poll_block(
process_block(rpc_client.as_ref(), slot, commitment_config).await;
match processed_block {
Some(processed_block) => {
let block_info = map_block_info(&processed_block);
block_notification_sender
.send(processed_block)
.context("Processed block should be sent")?;
blockinfo_notification_sender
.send(block_info)
.context("Processed block info should be sent")?;
// schedule to get finalized commitment
if commitment_config.commitment != CommitmentLevel::Finalized {
let retry_at = tokio::time::Instant::now()
Expand Down Expand Up @@ -332,6 +339,16 @@ pub fn from_ui_block(
ProducedBlock::new(inner, commitment_config)
}

fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo {
BlockInfo {
slot: produced_block.slot,
block_height: produced_block.block_height,
blockhash: produced_block.blockhash,
commitment_config: produced_block.commitment_config,
block_time: produced_block.block_time,
}
}

#[inline]
fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64
Expand Down
12 changes: 12 additions & 0 deletions core/src/stores/block_information_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::structures::block_info::BlockInfo;
use crate::structures::produced_block::ProducedBlock;
use solana_sdk::hash::Hash;

Expand All @@ -33,6 +34,17 @@ impl BlockInformation {
block_time: block.block_time,
}
}
pub fn from_block_info(block_info: &BlockInfo) -> Self {
BlockInformation {
slot: block_info.slot,
block_height: block_info.block_height,
last_valid_blockheight: block_info.block_height + MAX_RECENT_BLOCKHASHES as u64,
cleanup_slot: block_info.block_height + 1000,
blockhash: block_info.blockhash,
commitment_config: block_info.commitment_config,
block_time: block_info.block_time,
}
}
}

/// - Block Information Store
Expand Down
11 changes: 11 additions & 0 deletions core/src/structures/block_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::hash::Hash;

#[derive(Clone, Debug)]
pub struct BlockInfo {
pub slot: u64,
pub block_height: u64,
pub blockhash: Hash,
pub commitment_config: CommitmentConfig,
pub block_time: u64,
}
1 change: 1 addition & 0 deletions core/src/structures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod account_data;
pub mod account_filter;
pub mod block_info;
pub mod epoch;
pub mod identity_stakes;
pub mod leader_data;
Expand Down
9 changes: 9 additions & 0 deletions core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@ use std::sync::Arc;
use solana_rpc_client_api::response::{RpcContactInfo, RpcVoteAccountStatus};
use tokio::sync::broadcast::Receiver;

use crate::structures::block_info::BlockInfo;
use crate::{
structures::{produced_block::ProducedBlock, slot_notification::SlotNotification},
traits::subscription_sink::SubscriptionSink,
};

// full blocks, commitment level: processed, confirmed, finalized
// note: there is no guarantee about the order
// note: there is no guarantee about the order wrt commitment level
// note: there is no guarantee about the order wrt block vs block meta
pub type BlockStream = Receiver<ProducedBlock>;
// block info (slot, blockhash, etc), commitment level: processed, confirmed, finalized
// note: there is no guarantee about the order wrt commitment level
pub type BlockInfoStream = Receiver<BlockInfo>;
pub type SlotStream = Receiver<SlotNotification>;

pub type VoteAccountStream = Receiver<RpcVoteAccountStatus>;
pub type ClusterInfoStream = Receiver<Vec<RpcContactInfo>>;
pub type SubscptionHanderSink = Arc<dyn SubscriptionSink>;
45 changes: 26 additions & 19 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ use solana_lite_rpc_core::structures::account_filter::AccountFilters;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
produced_block::ProducedBlock,
};
use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::types::{BlockInfoStream, BlockStream};
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
Expand All @@ -55,6 +54,7 @@ use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
use solana_lite_rpc_services::tx_sender::TxSender;

use lite_rpc::postgres_logger;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use solana_lite_rpc_prioritization_fees::start_block_priofees_task;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
Expand All @@ -75,27 +75,27 @@ use tracing_subscriber::EnvFilter;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

async fn get_latest_block(
mut block_stream: BlockStream,
async fn get_latest_block_info(
mut blockinfo_stream: BlockInfoStream,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
) -> BlockInfo {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(500), block_stream.recv()).await {
Ok(Ok(block)) => {
if block.commitment_config == commitment_config {
return block;
match timeout(Duration::from_millis(500), blockinfo_stream.recv()).await {
Ok(Ok(block_info)) => {
if block_info.commitment_config == commitment_config {
return block_info;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block ({}) ... {:.02}ms",
"waiting for latest block info ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
panic!("Did not recv blocks");
panic!("Did not recv block info");
}
}
}
Expand Down Expand Up @@ -201,6 +201,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let EndpointStreaming {
// note: blocks_notifier will be dropped at some point
blocks_notifier,
blockinfo_notifier,
cluster_info_notifier,
slot_notifier,
vote_account_notifier,
Expand Down Expand Up @@ -235,8 +236,10 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:

let account_service = AccountService::new(account_storage, account_notification_sender);

account_service
.process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe());
account_service.process_account_stream(
account_stream.resubscribe(),
blockinfo_notifier.resubscribe(),
);

account_service
.populate_from_rpc(
Expand All @@ -250,21 +253,24 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
None
};

info!("Waiting for first finalized block...");
let finalized_block =
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);
info!("Waiting for first finalized block info...");
let finalized_block_info = get_latest_block_info(
blockinfo_notifier.resubscribe(),
CommitmentConfig::finalized(),
)
.await;
info!("Got finalized block info: {:?}", finalized_block_info.slot);

let (epoch_data, _current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;

let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
BlockInformationStore::new(BlockInformation::from_block_info(&finalized_block_info));

let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalized_block.slot),
slot_cache: SlotCache::new(finalized_block_info.slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {
store: Arc::new(DashMap::new()),
Expand All @@ -281,6 +287,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
// to avoid laggin we resubscribe to block notification
let data_caching_service = data_cache_service.listen(
blocks_notifier.resubscribe(),
blockinfo_notifier.resubscribe(),
slot_notifier.resubscribe(),
cluster_info_notifier,
vote_account_notifier,
Expand Down
7 changes: 6 additions & 1 deletion lite-rpc/src/service_spawner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use solana_lite_rpc_core::types::BlockInfoStream;
use solana_lite_rpc_core::{
stores::data_cache::DataCache,
structures::notifications::NotificationSender,
Expand All @@ -14,6 +15,7 @@ use solana_lite_rpc_services::{
tx_sender::TxSender,
};
use std::time::Duration;

pub struct ServiceSpawner {
pub prometheus_addr: String,
pub data_cache: DataCache,
Expand All @@ -38,9 +40,11 @@ impl ServiceSpawner {
}
}

pub async fn spawn_data_caching_service(
// TODO remove
pub async fn _spawn_data_caching_service(
&self,
block_notifier: BlockStream,
blockinfo_notifier: BlockInfoStream,
slot_notification: SlotStream,
cluster_info_notification: ClusterInfoStream,
va_notification: VoteAccountStream,
Expand All @@ -52,6 +56,7 @@ impl ServiceSpawner {

data_service.listen(
block_notifier,
blockinfo_notifier,
slot_notification,
cluster_info_notification,
va_notification,
Expand Down
Loading

0 comments on commit cd9df11

Please sign in to comment.