Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dead slot notification in geyser #3163

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ impl ReplayStage {
&bank_notification_sender,
&rewards_recorder_sender,
&rpc_subscriptions,
&slot_status_notifier,
&mut duplicate_slots_tracker,
&duplicate_confirmed_slots,
&mut epoch_slots_frozen_slots,
Expand Down Expand Up @@ -2253,6 +2254,7 @@ impl ReplayStage {
root: Slot,
err: &BlockstoreProcessorError,
rpc_subscriptions: &Arc<RpcSubscriptions>,
slot_status_notifier: &Option<SlotStatusNotifier>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
duplicate_confirmed_slots: &DuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
Expand Down Expand Up @@ -2293,11 +2295,21 @@ impl ReplayStage {

blockstore.slots_stats.mark_dead(slot);

let err = format!("error: {err:?}");

if let Some(slot_status_notifier) = slot_status_notifier {
slot_status_notifier
.read()
.unwrap()
.notify_slot_dead(slot, err.clone());
}

rpc_subscriptions.notify_slot_update(SlotUpdate::Dead {
slot,
err: format!("error: {err:?}"),
err,
timestamp: timestamp(),
});

let dead_state = DeadState::new_from_state(
slot,
duplicate_slots_tracker,
Expand Down Expand Up @@ -3005,6 +3017,7 @@ impl ReplayStage {
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
slot_status_notifier: &Option<SlotStatusNotifier>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
duplicate_confirmed_slots: &DuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
Expand Down Expand Up @@ -3045,6 +3058,7 @@ impl ReplayStage {
root,
err,
rpc_subscriptions,
slot_status_notifier,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
Expand Down Expand Up @@ -3093,6 +3107,7 @@ impl ReplayStage {
root,
&BlockstoreProcessorError::InvalidTransaction(err),
rpc_subscriptions,
slot_status_notifier,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
Expand Down Expand Up @@ -3124,6 +3139,7 @@ impl ReplayStage {
root,
&result_err,
rpc_subscriptions,
slot_status_notifier,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
Expand Down Expand Up @@ -3302,6 +3318,7 @@ impl ReplayStage {
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
slot_status_notifier: &Option<SlotStatusNotifier>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
duplicate_confirmed_slots: &DuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
Expand Down Expand Up @@ -3384,6 +3401,7 @@ impl ReplayStage {
bank_notification_sender,
rewards_recorder_sender,
rpc_subscriptions,
slot_status_notifier,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
Expand Down Expand Up @@ -4212,6 +4230,7 @@ pub(crate) mod tests {
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc::{create_test_transaction_entries, populate_blockstore_for_tests},
slot_status_notifier::SlotStatusNotifierInterface,
},
solana_runtime::{
accounts_background_service::AbsRequestSender,
Expand All @@ -4237,7 +4256,7 @@ pub(crate) mod tests {
std::{
fs::remove_dir_all,
iter,
sync::{atomic::AtomicU64, Arc, RwLock},
sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
},
tempfile::tempdir,
test_case::test_case,
Expand Down Expand Up @@ -4882,6 +4901,34 @@ pub(crate) mod tests {
);
}

struct SlotStatusNotifierForTest {
dead_slots: Arc<Mutex<HashSet<Slot>>>,
}

impl SlotStatusNotifierForTest {
pub fn new(dead_slots: Arc<Mutex<HashSet<Slot>>>) -> Self {
Self { dead_slots }
}
}

impl SlotStatusNotifierInterface for SlotStatusNotifierForTest {
fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option<Slot>) {}

fn notify_slot_processed(&self, _slot: Slot, _parent: Option<Slot>) {}

fn notify_slot_rooted(&self, _slot: Slot, _parent: Option<Slot>) {}

fn notify_first_shred_received(&self, _slot: Slot) {}

fn notify_completed(&self, _slot: Slot) {}

fn notify_created_bank(&self, _slot: Slot, _parent: Slot) {}

fn notify_slot_dead(&self, slot: Slot, _error: String) {
self.dead_slots.lock().unwrap().insert(slot);
}
}

// Given a shred and a fatal expected error, check that replaying that shred causes causes the fork to be
// marked as dead. Returns the error for caller to verify.
fn check_dead_fork<F>(shred_to_insert: F) -> result::Result<(), BlockstoreProcessorError>
Expand Down Expand Up @@ -4950,13 +4997,20 @@ pub(crate) mod tests {
));
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
let dead_slots = Arc::new(Mutex::new(HashSet::default()));

let slot_status_notifier: Option<SlotStatusNotifier> = Some(Arc::new(RwLock::new(
SlotStatusNotifierForTest::new(dead_slots.clone()),
)));

if let Err(err) = &res {
ReplayStage::mark_dead_slot(
&blockstore,
&bank1,
0,
err,
&rpc_subscriptions,
&slot_status_notifier,
&mut DuplicateSlotsTracker::default(),
&DuplicateConfirmedSlots::new(),
&mut EpochSlotsFrozenSlots::default(),
Expand All @@ -4967,7 +5021,7 @@ pub(crate) mod tests {
&mut PurgeRepairSlotCounter::default(),
);
}

assert!(dead_slots.lock().unwrap().contains(&bank1.slot()));
// Check that the erroring bank was marked as dead in the progress map
assert!(progress
.get(&bank1.slot())
Expand Down
8 changes: 6 additions & 2 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ pub enum GeyserPluginError {
}

/// The current status of a slot
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(u32)]
pub enum SlotStatus {
/// The highest slot of the heaviest fork processed by the node. Ledger state at this slot is
Expand All @@ -328,6 +328,9 @@ pub enum SlotStatus {

/// A new bank fork is created with the slot
CreatedBank,

/// A slot is marked dead
Dead(String),
}

impl SlotStatus {
Expand All @@ -339,6 +342,7 @@ impl SlotStatus {
SlotStatus::FirstShredReceived => "first_shread_received",
SlotStatus::Completed => "completed",
SlotStatus::CreatedBank => "created_bank",
SlotStatus::Dead(_error) => "dead",
}
}
}
Expand Down Expand Up @@ -419,7 +423,7 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
&self,
slot: Slot,
parent: Option<u64>,
status: SlotStatus,
status: &SlotStatus,
) -> Result<()> {
Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion geyser-plugin-manager/src/slot_status_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl {
fn notify_created_bank(&self, slot: Slot, parent: Slot) {
self.notify_slot_status(slot, Some(parent), SlotStatus::CreatedBank);
}

fn notify_slot_dead(&self, slot: Slot, error: String) {
self.notify_slot_status(slot, None, SlotStatus::Dead(error));
}
}

impl SlotStatusNotifierImpl {
Expand All @@ -52,7 +56,7 @@ impl SlotStatusNotifierImpl {

for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-update-slot");
match plugin.update_slot_status(slot, parent, slot_status) {
match plugin.update_slot_status(slot, parent, &slot_status) {
Err(err) => {
error!(
"Failed to update slot status at slot {}, error: {} to plugin {}",
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/slot_status_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub trait SlotStatusNotifierInterface {

/// Notified when the slot has bank created.
fn notify_created_bank(&self, slot: Slot, parent: Slot);

/// Notified when the slot is marked "Dead"
fn notify_slot_dead(&self, slot: Slot, error: String);
}

pub type SlotStatusNotifier = Arc<RwLock<dyn SlotStatusNotifierInterface + Sync + Send>>;
Loading