From cd9df115014c40ca4300ff914196c2f6afa174dd Mon Sep 17 00:00:00 2001 From: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com> Date: Fri, 22 Mar 2024 19:22:38 +0100 Subject: [PATCH] use geyser blockmeta to progress finalizedslot (#367) introduce BlockInfo on a new BlockInfoStream; maps from yellowstone BlockMeta fix: last_finalized_slot must progress unconditionally --- accounts/src/account_service.rs | 16 +- .../tests/blockstore_integration_tests.rs | 2 +- cluster-endpoints/src/endpoint_stremers.rs | 3 +- cluster-endpoints/src/grpc_multiplex.rs | 144 +++++++++++------- cluster-endpoints/src/grpc_subscription.rs | 4 +- .../src/json_rpc_subscription.rs | 3 + .../src/rpc_polling/poll_blocks.rs | 17 +++ core/src/stores/block_information_store.rs | 12 ++ core/src/structures/block_info.rs | 11 ++ core/src/structures/mod.rs | 1 + core/src/types.rs | 9 ++ lite-rpc/src/main.rs | 45 +++--- lite-rpc/src/service_spawner.rs | 7 +- services/src/data_caching_service.rs | 38 ++++- 14 files changed, 219 insertions(+), 93 deletions(-) create mode 100644 core/src/structures/block_info.rs diff --git a/accounts/src/account_service.rs b/accounts/src/account_service.rs index 7b9ab410..bcf1f2b6 100644 --- a/accounts/src/account_service.rs +++ b/accounts/src/account_service.rs @@ -4,13 +4,13 @@ use anyhow::bail; use itertools::Itertools; use prometheus::{opts, register_int_gauge, IntGauge}; use solana_account_decoder::{UiAccount, UiDataSliceConfig}; +use solana_lite_rpc_core::types::BlockInfoStream; use solana_lite_rpc_core::{ commitment_utils::Commitment, structures::{ account_data::{AccountData, AccountNotificationMessage, AccountStream}, account_filter::AccountFilters, }, - types::BlockStream, AnyhowJoinHandle, }; use solana_rpc_client::nonblocking::rpc_client::RpcClient; @@ -151,7 +151,7 @@ impl AccountService { pub fn process_account_stream( &self, mut account_stream: AccountStream, - mut block_stream: BlockStream, + mut blockinfo_stream: BlockInfoStream, ) -> Vec { let this = self.clone(); let processed_task = tokio::spawn(async move { @@ -187,19 +187,19 @@ impl AccountService { let this = self.clone(); let block_processing_task = tokio::spawn(async move { loop { - match block_stream.recv().await { - Ok(block_notification) => { - if block_notification.commitment_config.is_processed() { + match blockinfo_stream.recv().await { + Ok(block_info) => { + if block_info.commitment_config.is_processed() { // processed commitment is not processed in this loop continue; } - let commitment = Commitment::from(block_notification.commitment_config); + let commitment = Commitment::from(block_info.commitment_config); let updated_accounts = this .account_store - .process_slot_data(block_notification.slot, commitment) + .process_slot_data(block_info.slot, commitment) .await; - if block_notification.commitment_config.is_finalized() { + if block_info.commitment_config.is_finalized() { ACCOUNT_UPDATES_FINALIZED.add(updated_accounts.len() as i64) } else { ACCOUNT_UPDATES_CONFIRMED.add(updated_accounts.len() as i64); diff --git a/blockstore/tests/blockstore_integration_tests.rs b/blockstore/tests/blockstore_integration_tests.rs index dadde559..9da84a70 100644 --- a/blockstore/tests/blockstore_integration_tests.rs +++ b/blockstore/tests/blockstore_integration_tests.rs @@ -70,7 +70,7 @@ async fn storage_test() { let (slot_notifier, _jh_multiplex_slotstream) = create_grpc_multiplex_processed_slots_subscription(grpc_sources.clone()); - let (blocks_notifier, _jh_multiplex_blockstream) = + let (blocks_notifier, _blockmeta_output_stream, _jh_multiplex_blockstream) = create_grpc_multiplex_blocks_subscription(grpc_sources); let (epoch_cache, _) = EpochCache::bootstrap_epoch(&rpc_client).await.unwrap(); diff --git a/cluster-endpoints/src/endpoint_stremers.rs b/cluster-endpoints/src/endpoint_stremers.rs index aeab980c..97a65a44 100644 --- a/cluster-endpoints/src/endpoint_stremers.rs +++ b/cluster-endpoints/src/endpoint_stremers.rs @@ -1,11 +1,12 @@ use solana_lite_rpc_core::{ structures::account_data::AccountStream, - types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream}, + types::{BlockInfoStream, BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream}, }; /// subscribers to broadcast channels should assume that channels are not getting closed unless the system is shutting down pub struct EndpointStreaming { pub blocks_notifier: BlockStream, + pub blockinfo_notifier: BlockInfoStream, pub slot_notifier: SlotStream, pub vote_account_notifier: VoteAccountStream, pub cluster_info_notifier: ClusterInfoStream, diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index fb000a97..85c04b5a 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -1,6 +1,5 @@ use anyhow::{bail, Context}; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; -use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; use geyser_grpc_connector::{GeyserFilter, GrpcSourceConfig, Message}; use log::{debug, info, trace, warn}; use solana_lite_rpc_core::structures::produced_block::ProducedBlock; @@ -10,6 +9,7 @@ use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use solana_lite_rpc_core::solana_utils::hash_from_str; +use solana_lite_rpc_core::structures::block_info::BlockInfo; use std::collections::{BTreeSet, HashMap, HashSet}; use std::time::Duration; use tokio::sync::broadcast::Receiver; @@ -109,9 +109,9 @@ fn create_grpc_multiplex_processed_block_task( } // backpressure: the mpsc sender will block grpc stream until capacity is available -fn create_grpc_multiplex_block_meta_task( +fn create_grpc_multiplex_block_info_task( grpc_sources: &Vec, - block_meta_sender: tokio::sync::mpsc::Sender, + block_info_sender: tokio::sync::mpsc::Sender, commitment_config: CommitmentConfig, ) -> Vec { let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10); @@ -134,14 +134,24 @@ fn create_grpc_multiplex_block_meta_task( let proposed_slot = block_meta.slot; if proposed_slot > tip { tip = proposed_slot; - let block_meta = BlockMeta { + let block_meta = BlockInfo { slot: proposed_slot, + block_height: block_meta + .block_height + .expect("block_height from geyser block meta") + .block_height, blockhash: hash_from_str(&block_meta.blockhash) .expect("valid blockhash"), + commitment_config, + block_time: block_meta + .block_time + .expect("block_time from geyser block meta") + .timestamp + as u64, }; let send_started_at = Instant::now(); - let send_result = block_meta_sender + let send_result = block_info_sender .send(block_meta) .await .context("Send block to channel"); @@ -188,7 +198,11 @@ fn create_grpc_multiplex_block_meta_task( /// the channel must never be closed pub fn create_grpc_multiplex_blocks_subscription( grpc_sources: Vec, -) -> (Receiver, AnyhowJoinHandle) { +) -> ( + Receiver, + Receiver, + AnyhowJoinHandle, +) { info!("Setup grpc multiplexed blocks connection..."); if grpc_sources.is_empty() { info!("- no grpc connection configured"); @@ -198,9 +212,13 @@ pub fn create_grpc_multiplex_blocks_subscription( } // return value is the broadcast receiver - // must NEVER be closed form inside this method + // must NEVER be closed from inside this method let (producedblock_sender, blocks_output_stream) = tokio::sync::broadcast::channel::(32); + // provide information about finalized blocks as quickly as possible + // note that produced block stream might most probably lag behind + let (blockinfo_sender, blockinfo_output_stream) = + tokio::sync::broadcast::channel::(32); let mut reconnect_attempts = 0; @@ -210,10 +228,12 @@ pub fn create_grpc_multiplex_blocks_subscription( // channels must NEVER GET CLOSED (unless full restart of multiplexer) let (processed_block_sender, mut processed_block_reciever) = tokio::sync::mpsc::channel::(10); // experiemental - let (block_meta_sender_confirmed, mut block_meta_reciever_confirmed) = - tokio::sync::mpsc::channel::(500); - let (block_meta_sender_finalized, mut block_meta_reciever_finalized) = - tokio::sync::mpsc::channel::(500); + let (block_info_sender_processed, mut block_info_reciever_processed) = + tokio::sync::mpsc::channel::(500); + let (block_info_sender_confirmed, mut block_info_reciever_confirmed) = + tokio::sync::mpsc::channel::(500); + let (block_info_sender_finalized, mut block_info_reciever_finalized) = + tokio::sync::mpsc::channel::(500); let processed_block_sender = processed_block_sender.clone(); reconnect_attempts += 1; @@ -234,15 +254,22 @@ pub fn create_grpc_multiplex_blocks_subscription( task_list.extend(processed_blocks_tasks); // TODO apply same pattern as in create_grpc_multiplex_processed_block_task - let jh_meta_task_confirmed = create_grpc_multiplex_block_meta_task( + + let jh_meta_task_processed = create_grpc_multiplex_block_info_task( + &grpc_sources, + block_info_sender_processed.clone(), + CommitmentConfig::processed(), + ); + task_list.extend(jh_meta_task_processed); + let jh_meta_task_confirmed = create_grpc_multiplex_block_info_task( &grpc_sources, - block_meta_sender_confirmed.clone(), + block_info_sender_confirmed.clone(), CommitmentConfig::confirmed(), ); task_list.extend(jh_meta_task_confirmed); - let jh_meta_task_finalized = create_grpc_multiplex_block_meta_task( + let jh_meta_task_finalized = create_grpc_multiplex_block_info_task( &grpc_sources, - block_meta_sender_finalized.clone(), + block_info_sender_finalized.clone(), CommitmentConfig::finalized(), ); task_list.extend(jh_meta_task_finalized); @@ -265,10 +292,10 @@ pub fn create_grpc_multiplex_blocks_subscription( let mut startup_completed = false; const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without recving data 'recv_loop: loop { - debug!("processed_block_sender: {}, block_meta_sender_confirmed: {}, block_meta_sender_finalized: {}", + debug!("channel capacities: processed_block_sender={}, block_info_sender_confirmed={}, block_info_sender_finalized={}", processed_block_sender.capacity(), - block_meta_sender_confirmed.capacity(), - block_meta_sender_finalized.capacity() + block_info_sender_confirmed.capacity(), + block_info_sender_finalized.capacity() ); tokio::select! { processed_block = processed_block_reciever.recv() => { @@ -277,6 +304,11 @@ pub fn create_grpc_multiplex_blocks_subscription( let processed_block = processed_block.expect("processed block from stream"); trace!("got processed block {} with blockhash {}", processed_block.slot, processed_block.blockhash.clone()); + + if processed_block.commitment_config.is_finalized() { + last_finalized_slot = last_finalized_slot.max(processed_block.slot); + } + if let Err(e) = producedblock_sender.send(processed_block.clone()) { warn!("produced block channel has no receivers {e:?}"); } @@ -291,14 +323,30 @@ pub fn create_grpc_multiplex_blocks_subscription( } } recent_processed_blocks.insert(processed_block.blockhash, processed_block); + }, - meta_confirmed = block_meta_reciever_confirmed.recv() => { + blockinfo_processed = block_info_reciever_processed.recv() => { + let blockinfo_processed = blockinfo_processed.expect("processed block info from stream"); + let blockhash = blockinfo_processed.blockhash; + trace!("got processed blockinfo {} with blockhash {}", + blockinfo_processed.slot, blockhash); + if let Err(e) = blockinfo_sender.send(blockinfo_processed) { + warn!("Processed blockinfo channel has no receivers {e:?}"); + } + }, + blockinfo_confirmed = block_info_reciever_confirmed.recv() => { cleanup_without_confirmed_recv_blocks_meta = 0; - let meta_confirmed = meta_confirmed.expect("confirmed block meta from stream"); - let blockhash = meta_confirmed.blockhash; + let blockinfo_confirmed = blockinfo_confirmed.expect("confirmed block info from stream"); + let blockhash = blockinfo_confirmed.blockhash; + trace!("got confirmed blockinfo {} with blockhash {}", + blockinfo_confirmed.slot, blockhash); + if let Err(e) = blockinfo_sender.send(blockinfo_confirmed) { + warn!("Confirmed blockinfo channel has no receivers {e:?}"); + } + if let Some(cached_processed_block) = recent_processed_blocks.get(&blockhash) { let confirmed_block = cached_processed_block.to_confirmed_block(); - debug!("got confirmed blockmeta {} with blockhash {}", + debug!("got confirmed blockinfo {} with blockhash {}", confirmed_block.slot, confirmed_block.blockhash.clone()); if let Err(e) = producedblock_sender.send(confirmed_block) { warn!("confirmed block channel has no receivers {e:?}"); @@ -309,23 +357,29 @@ pub fn create_grpc_multiplex_blocks_subscription( confirmed_block_not_yet_processed.len(), recent_processed_blocks.len()); } }, - meta_finalized = block_meta_reciever_finalized.recv() => { + blockinfo_finalized = block_info_reciever_finalized.recv() => { cleanup_without_finalized_recv_blocks_meta = 0; - let meta_finalized = meta_finalized.expect("finalized block meta from stream"); - // let _span = debug_span!("sequence_block_meta_finalized", ?meta_finalized.slot).entered(); - let blockhash = meta_finalized.blockhash; + let blockinfo_finalized = blockinfo_finalized.expect("finalized block info from stream"); + last_finalized_slot = last_finalized_slot.max(blockinfo_finalized.slot); + + let blockhash = blockinfo_finalized.blockhash; + trace!("got finalized blockinfo {} with blockhash {}", + blockinfo_finalized.slot, blockhash); + if let Err(e) = blockinfo_sender.send(blockinfo_finalized) { + warn!("Finalized blockinfo channel has no receivers {e:?}"); + } + if let Some(cached_processed_block) = recent_processed_blocks.remove(&blockhash) { let finalized_block = cached_processed_block.to_finalized_block(); - last_finalized_slot = finalized_block.slot; startup_completed = true; - debug!("got finalized blockmeta {} with blockhash {}", + debug!("got finalized blockinfo {} with blockhash {}", finalized_block.slot, finalized_block.blockhash.clone()); if let Err(e) = producedblock_sender.send(finalized_block) { warn!("Finalized block channel has no receivers {e:?}"); } } else if startup_completed { // this warning is ok for first few blocks when we start lrpc - log::warn!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash); + log::warn!("finalized blockinfo received for blockhash {} which was never seen or already emitted", blockhash); finalized_block_not_yet_processed.insert(blockhash); } }, @@ -334,7 +388,7 @@ pub fn create_grpc_multiplex_blocks_subscription( if cleanup_without_recv_full_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV || cleanup_without_confirmed_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV || cleanup_without_finalized_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV { - log::error!("block or block meta geyser stream stopped - restarting multiplexer ({}-{}-{})", + log::error!("block or block info geyser stream stopped - restarting multiplexer ({}-{}-{})", cleanup_without_recv_full_blocks, cleanup_without_confirmed_recv_blocks_meta, cleanup_without_finalized_recv_blocks_meta,); // throttle a bit sleep(Duration::from_millis(1500)).await; @@ -358,7 +412,11 @@ pub fn create_grpc_multiplex_blocks_subscription( } // -- END reconnect loop }); - (blocks_output_stream, jh_block_emitter_task) + ( + blocks_output_stream, + blockinfo_output_stream, + jh_block_emitter_task, + ) } pub fn create_grpc_multiplex_processed_slots_subscription( @@ -443,30 +501,6 @@ pub fn create_grpc_multiplex_processed_slots_subscription( (multiplexed_messages_rx, jh_multiplex_task) } -#[allow(dead_code)] -struct BlockMeta { - pub slot: Slot, - pub blockhash: solana_sdk::hash::Hash, -} - -struct BlockMetaExtractor(CommitmentConfig); - -impl FromYellowstoneExtractor for BlockMetaExtractor { - type Target = BlockMeta; - fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(u64, BlockMeta)> { - match update.update_oneof { - Some(UpdateOneof::BlockMeta(block_meta)) => Some(( - block_meta.slot, - BlockMeta { - slot: block_meta.slot, - blockhash: hash_from_str(&block_meta.blockhash).unwrap(), - }, - )), - _ => None, - } - } -} - fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option { match update.update_oneof { Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot), diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 20c852f1..2f6a0bcd 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -271,7 +271,7 @@ pub fn create_grpc_subscription( let (slot_multiplex_channel, jh_multiplex_slotstream) = create_grpc_multiplex_processed_slots_subscription(grpc_sources.clone()); - let (block_multiplex_channel, jh_multiplex_blockstream) = + let (block_multiplex_channel, blockmeta_channel, jh_multiplex_blockstream) = create_grpc_multiplex_blocks_subscription(grpc_sources.clone()); let cluster_info_polling = poll_cluster_info(rpc_client.clone(), cluster_info_sx); @@ -283,6 +283,7 @@ pub fn create_grpc_subscription( create_grpc_account_streaming(grpc_sources, accounts_filter); let streamers = EndpointStreaming { blocks_notifier: block_multiplex_channel, + blockinfo_notifier: blockmeta_channel, slot_notifier: slot_multiplex_channel, cluster_info_notifier, vote_account_notifier, @@ -300,6 +301,7 @@ pub fn create_grpc_subscription( } else { let streamers = EndpointStreaming { blocks_notifier: block_multiplex_channel, + blockinfo_notifier: blockmeta_channel, slot_notifier: slot_multiplex_channel, cluster_info_notifier, vote_account_notifier, diff --git a/cluster-endpoints/src/json_rpc_subscription.rs b/cluster-endpoints/src/json_rpc_subscription.rs index ba32c475..8c0a4eef 100644 --- a/cluster-endpoints/src/json_rpc_subscription.rs +++ b/cluster-endpoints/src/json_rpc_subscription.rs @@ -16,6 +16,7 @@ pub fn create_json_rpc_polling_subscription( ) -> anyhow::Result<(EndpointStreaming, Vec)> { let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(16); let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(16); + let (blockinfo_sx, blockinfo_notifier) = tokio::sync::broadcast::channel(16); let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(16); let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(16); // does not support accounts support with rpc polling @@ -26,6 +27,7 @@ pub fn create_json_rpc_polling_subscription( let mut block_polling_tasks = poll_block( rpc_client.clone(), block_sx, + blockinfo_sx, slot_notifier.resubscribe(), num_parallel_tasks, ); @@ -39,6 +41,7 @@ pub fn create_json_rpc_polling_subscription( let streamers = EndpointStreaming { blocks_notifier, + blockinfo_notifier, slot_notifier, cluster_info_notifier, vote_account_notifier, diff --git a/cluster-endpoints/src/rpc_polling/poll_blocks.rs b/cluster-endpoints/src/rpc_polling/poll_blocks.rs index 65979e47..e0238226 100644 --- a/cluster-endpoints/src/rpc_polling/poll_blocks.rs +++ b/cluster-endpoints/src/rpc_polling/poll_blocks.rs @@ -1,6 +1,7 @@ use anyhow::{bail, Context}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_lite_rpc_core::solana_utils::hash_from_str; +use solana_lite_rpc_core::structures::block_info::BlockInfo; use solana_lite_rpc_core::structures::produced_block::{ProducedBlockInner, TransactionInfo}; use solana_lite_rpc_core::{ structures::{ @@ -54,6 +55,7 @@ pub async fn process_block( pub fn poll_block( rpc_client: Arc, block_notification_sender: Sender, + blockinfo_notification_sender: Sender, slot_notification: Receiver, num_parallel_tasks: usize, ) -> Vec { @@ -66,6 +68,7 @@ pub fn poll_block( for _i in 0..num_parallel_tasks { let block_notification_sender = block_notification_sender.clone(); + let blockinfo_notification_sender = blockinfo_notification_sender.clone(); let rpc_client = rpc_client.clone(); let block_schedule_queue_rx = block_schedule_queue_rx.clone(); let slot_retry_queue_sx = slot_retry_queue_sx.clone(); @@ -79,9 +82,13 @@ pub fn poll_block( process_block(rpc_client.as_ref(), slot, commitment_config).await; match processed_block { Some(processed_block) => { + let block_info = map_block_info(&processed_block); block_notification_sender .send(processed_block) .context("Processed block should be sent")?; + blockinfo_notification_sender + .send(block_info) + .context("Processed block info should be sent")?; // schedule to get finalized commitment if commitment_config.commitment != CommitmentLevel::Finalized { let retry_at = tokio::time::Instant::now() @@ -332,6 +339,16 @@ pub fn from_ui_block( ProducedBlock::new(inner, commitment_config) } +fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo { + BlockInfo { + slot: produced_block.slot, + block_height: produced_block.block_height, + blockhash: produced_block.blockhash, + commitment_config: produced_block.commitment_config, + block_time: produced_block.block_time, + } +} + #[inline] fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 { (units as u64 * 1000) / additional_fee as u64 diff --git a/core/src/stores/block_information_store.rs b/core/src/stores/block_information_store.rs index 598d9bbc..5ab7c547 100644 --- a/core/src/stores/block_information_store.rs +++ b/core/src/stores/block_information_store.rs @@ -7,6 +7,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::sync::RwLock; +use crate::structures::block_info::BlockInfo; use crate::structures::produced_block::ProducedBlock; use solana_sdk::hash::Hash; @@ -33,6 +34,17 @@ impl BlockInformation { block_time: block.block_time, } } + pub fn from_block_info(block_info: &BlockInfo) -> Self { + BlockInformation { + slot: block_info.slot, + block_height: block_info.block_height, + last_valid_blockheight: block_info.block_height + MAX_RECENT_BLOCKHASHES as u64, + cleanup_slot: block_info.block_height + 1000, + blockhash: block_info.blockhash, + commitment_config: block_info.commitment_config, + block_time: block_info.block_time, + } + } } /// - Block Information Store diff --git a/core/src/structures/block_info.rs b/core/src/structures/block_info.rs new file mode 100644 index 00000000..8f112d87 --- /dev/null +++ b/core/src/structures/block_info.rs @@ -0,0 +1,11 @@ +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::hash::Hash; + +#[derive(Clone, Debug)] +pub struct BlockInfo { + pub slot: u64, + pub block_height: u64, + pub blockhash: Hash, + pub commitment_config: CommitmentConfig, + pub block_time: u64, +} diff --git a/core/src/structures/mod.rs b/core/src/structures/mod.rs index 41c1b30d..889d7c62 100644 --- a/core/src/structures/mod.rs +++ b/core/src/structures/mod.rs @@ -2,6 +2,7 @@ pub mod account_data; pub mod account_filter; +pub mod block_info; pub mod epoch; pub mod identity_stakes; pub mod leader_data; diff --git a/core/src/types.rs b/core/src/types.rs index 96ba3cee..733925d1 100644 --- a/core/src/types.rs +++ b/core/src/types.rs @@ -3,13 +3,22 @@ use std::sync::Arc; use solana_rpc_client_api::response::{RpcContactInfo, RpcVoteAccountStatus}; use tokio::sync::broadcast::Receiver; +use crate::structures::block_info::BlockInfo; use crate::{ structures::{produced_block::ProducedBlock, slot_notification::SlotNotification}, traits::subscription_sink::SubscriptionSink, }; +// full blocks, commitment level: processed, confirmed, finalized +// note: there is no guarantee about the order +// note: there is no guarantee about the order wrt commitment level +// note: there is no guarantee about the order wrt block vs block meta pub type BlockStream = Receiver; +// block info (slot, blockhash, etc), commitment level: processed, confirmed, finalized +// note: there is no guarantee about the order wrt commitment level +pub type BlockInfoStream = Receiver; pub type SlotStream = Receiver; + pub type VoteAccountStream = Receiver; pub type ClusterInfoStream = Receiver>; pub type SubscptionHanderSink = Arc; diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 3f9fbbdb..40eb380f 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -42,10 +42,9 @@ use solana_lite_rpc_core::structures::account_filter::AccountFilters; use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule; use solana_lite_rpc_core::structures::{ epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender, - produced_block::ProducedBlock, }; use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface; -use solana_lite_rpc_core::types::BlockStream; +use solana_lite_rpc_core::types::{BlockInfoStream, BlockStream}; use solana_lite_rpc_core::AnyhowJoinHandle; use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService; use solana_lite_rpc_services::data_caching_service::DataCachingService; @@ -55,6 +54,7 @@ use solana_lite_rpc_services::transaction_replayer::TransactionReplayer; use solana_lite_rpc_services::tx_sender::TxSender; use lite_rpc::postgres_logger; +use solana_lite_rpc_core::structures::block_info::BlockInfo; use solana_lite_rpc_prioritization_fees::start_block_priofees_task; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::commitment_config::CommitmentConfig; @@ -75,27 +75,27 @@ use tracing_subscriber::EnvFilter; #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -async fn get_latest_block( - mut block_stream: BlockStream, +async fn get_latest_block_info( + mut blockinfo_stream: BlockInfoStream, commitment_config: CommitmentConfig, -) -> ProducedBlock { +) -> BlockInfo { let started = Instant::now(); loop { - match timeout(Duration::from_millis(500), block_stream.recv()).await { - Ok(Ok(block)) => { - if block.commitment_config == commitment_config { - return block; + match timeout(Duration::from_millis(500), blockinfo_stream.recv()).await { + Ok(Ok(block_info)) => { + if block_info.commitment_config == commitment_config { + return block_info; } } Err(_elapsed) => { debug!( - "waiting for latest block ({}) ... {:.02}ms", + "waiting for latest block info ({}) ... {:.02}ms", commitment_config.commitment, started.elapsed().as_secs_f32() * 1000.0 ); } Ok(Err(_error)) => { - panic!("Did not recv blocks"); + panic!("Did not recv block info"); } } } @@ -201,6 +201,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let EndpointStreaming { // note: blocks_notifier will be dropped at some point blocks_notifier, + blockinfo_notifier, cluster_info_notifier, slot_notifier, vote_account_notifier, @@ -235,8 +236,10 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let account_service = AccountService::new(account_storage, account_notification_sender); - account_service - .process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe()); + account_service.process_account_stream( + account_stream.resubscribe(), + blockinfo_notifier.resubscribe(), + ); account_service .populate_from_rpc( @@ -250,21 +253,24 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: None }; - info!("Waiting for first finalized block..."); - let finalized_block = - get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await; - info!("Got finalized block: {:?}", finalized_block.slot); + info!("Waiting for first finalized block info..."); + let finalized_block_info = get_latest_block_info( + blockinfo_notifier.resubscribe(), + CommitmentConfig::finalized(), + ) + .await; + info!("Got finalized block info: {:?}", finalized_block_info.slot); let (epoch_data, _current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?; let block_information_store = - BlockInformationStore::new(BlockInformation::from_block(&finalized_block)); + BlockInformationStore::new(BlockInformation::from_block_info(&finalized_block_info)); let data_cache = DataCache { block_information_store, cluster_info: ClusterInfo::default(), identity_stakes: IdentityStakes::new(validator_identity.pubkey()), - slot_cache: SlotCache::new(finalized_block.slot), + slot_cache: SlotCache::new(finalized_block_info.slot), tx_subs: SubscriptionStore::default(), txs: TxStore { store: Arc::new(DashMap::new()), @@ -281,6 +287,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: // to avoid laggin we resubscribe to block notification let data_caching_service = data_cache_service.listen( blocks_notifier.resubscribe(), + blockinfo_notifier.resubscribe(), slot_notifier.resubscribe(), cluster_info_notifier, vote_account_notifier, diff --git a/lite-rpc/src/service_spawner.rs b/lite-rpc/src/service_spawner.rs index 69669509..0be16a1d 100644 --- a/lite-rpc/src/service_spawner.rs +++ b/lite-rpc/src/service_spawner.rs @@ -1,3 +1,4 @@ +use solana_lite_rpc_core::types::BlockInfoStream; use solana_lite_rpc_core::{ stores::data_cache::DataCache, structures::notifications::NotificationSender, @@ -14,6 +15,7 @@ use solana_lite_rpc_services::{ tx_sender::TxSender, }; use std::time::Duration; + pub struct ServiceSpawner { pub prometheus_addr: String, pub data_cache: DataCache, @@ -38,9 +40,11 @@ impl ServiceSpawner { } } - pub async fn spawn_data_caching_service( + // TODO remove + pub async fn _spawn_data_caching_service( &self, block_notifier: BlockStream, + blockinfo_notifier: BlockInfoStream, slot_notification: SlotStream, cluster_info_notification: ClusterInfoStream, va_notification: VoteAccountStream, @@ -52,6 +56,7 @@ impl ServiceSpawner { data_service.listen( block_notifier, + blockinfo_notifier, slot_notification, cluster_info_notification, va_notification, diff --git a/services/src/data_caching_service.rs b/services/src/data_caching_service.rs index a97beec4..826ddef5 100644 --- a/services/src/data_caching_service.rs +++ b/services/src/data_caching_service.rs @@ -7,12 +7,14 @@ use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter}; use solana_lite_rpc_core::stores::{ block_information_store::BlockInformation, data_cache::DataCache, }; +use solana_lite_rpc_core::structures::block_info::BlockInfo; use solana_lite_rpc_core::types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream}; use solana_lite_rpc_core::AnyhowJoinHandle; use solana_sdk::clock::MAX_RECENT_BLOCKHASHES; use solana_sdk::commitment_config::CommitmentLevel; use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus}; use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::Receiver; lazy_static::lazy_static! { static ref NB_CLUSTER_NODES: GenericGauge = @@ -43,13 +45,15 @@ impl DataCachingService { pub fn listen( self, block_notifier: BlockStream, + blockinfo_notifier: Receiver, slot_notification: SlotStream, cluster_info_notification: ClusterInfoStream, va_notification: VoteAccountStream, ) -> Vec { - // clone the ledger to move into the processor task let data_cache = self.data_cache.clone(); - // process all the data into the ledger + let block_information_store_block = data_cache.block_information_store.clone(); + let block_information_store_block_info = data_cache.block_information_store.clone(); + let block_cache_jh = tokio::spawn(async move { let mut block_notifier = block_notifier; loop { @@ -64,8 +68,8 @@ impl DataCachingService { } }; - data_cache - .block_information_store + // note: most likely the block has been added from blockinfo_notifier stream already + block_information_store_block .add_block(BlockInformation::from_block(&block)) .await; @@ -76,9 +80,8 @@ impl DataCachingService { }; for tx in &block.transactions { - let block_info = data_cache - .block_information_store - .get_block_info(&tx.recent_blockhash); + let block_info = + block_information_store_block.get_block_info(&tx.recent_blockhash); let last_valid_blockheight = if let Some(block_info) = block_info { block_info.last_valid_blockheight } else { @@ -118,6 +121,26 @@ impl DataCachingService { } }); + let blockinfo_cache_jh = tokio::spawn(async move { + let mut blockinfo_notifier = blockinfo_notifier; + loop { + let block_info = match blockinfo_notifier.recv().await { + Ok(block_info) => block_info, + Err(RecvError::Lagged(blockinfo_lagged)) => { + warn!("Lagged {} block info - continue", blockinfo_lagged); + continue; + } + Err(RecvError::Closed) => { + bail!("BlockInfo stream has been closed - abort"); + } + }; + + block_information_store_block_info + .add_block(BlockInformation::from_block_info(&block_info)) + .await; + } + }); + let data_cache = self.data_cache.clone(); let slot_cache_jh = tokio::spawn(async move { let mut slot_notification = slot_notification; @@ -174,6 +197,7 @@ impl DataCachingService { vec![ slot_cache_jh, block_cache_jh, + blockinfo_cache_jh, cluster_info_jh, identity_stakes_jh, cleaning_service,