diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index ae2fefee..2d0f05f3 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -49,7 +49,7 @@ fn create_grpc_multiplex_processed_block_task( let jh_merging_streams = tokio::task::spawn(async move { let mut slots_processed = BTreeSet::::new(); let mut last_tick = Instant::now(); - loop { + 'recv_loop: loop { // recv loop if last_tick.elapsed() > Duration::from_millis(800) { warn!( @@ -70,16 +70,23 @@ fn create_grpc_multiplex_processed_block_task( }; match blocks_rx_result { Some(Message::GeyserSubscribeUpdate(subscribe_update)) => { - let mapfilter = - map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG); - if let Some((slot, produced_block)) = mapfilter { - assert_eq!(COMMITMENT_CONFIG, produced_block.commitment_config); + // note: avoid mapping of full block as long as possible + let extracted_slot = extract_slot_from_yellowstone_update(&subscribe_update); + if let Some(slot) = extracted_slot { // check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value // it means that the slot is too old to process - if !slots_processed.contains(&slot) - && (slots_processed.len() < MAX_SIZE / 2 - || slot > slots_processed.first().cloned().unwrap_or_default()) + if slots_processed.contains(&slot) { + continue 'recv_loop; + } + if slots_processed.len() >= MAX_SIZE / 2 + && slot <= slots_processed.first().cloned().unwrap_or_default() { + continue 'recv_loop; + } + + let mapfilter = + map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG); + if let Some((_slot, produced_block)) = mapfilter { let send_started_at = Instant::now(); let send_result = block_sender .send(produced_block) @@ -552,6 +559,16 @@ pub fn create_grpc_multiplex_processed_slots_subscription( (multiplexed_messages_rx, jh_multiplex_task) } +fn extract_slot_from_yellowstone_update(update: &SubscribeUpdate) -> Option { + match &update.update_oneof { + // list is not exhaustive + Some(UpdateOneof::Slot(update_message)) => Some(update_message.slot), + Some(UpdateOneof::BlockMeta(update_message)) => Some(update_message.slot), + Some(UpdateOneof::Block(update_message)) => Some(update_message.slot), + _ => None, + } +} + fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option { match update.update_oneof { Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot),