diff --git a/bench/src/helpers.rs b/bench/src/helpers.rs index 377ca214..db2632a7 100644 --- a/bench/src/helpers.rs +++ b/bench/src/helpers.rs @@ -157,7 +157,7 @@ fn transaction_size_small() { let rand_string = random_strings.first().unwrap(); let tx = BenchHelper::create_memo_tx_small(rand_string, &payer_keypair, blockhash); - assert_eq!(bincode::serialized_size(&tx).unwrap(), 179); + assert_eq!(bincode::serialized_size(&tx).unwrap(), 231); } #[test] @@ -172,5 +172,5 @@ fn transaction_size_large() { let rand_string = random_strings.first().unwrap(); let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash); - assert_eq!(bincode::serialized_size(&tx).unwrap(), 1186); + assert_eq!(bincode::serialized_size(&tx).unwrap(), 1230); } diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index c24e8d14..5f4cc9f8 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -163,6 +163,7 @@ pub fn create_grpc_multiplex_blocks_subscription( let mut cleanup_without_confirmed_recv_blocks_meta: u32 = 0; let mut cleanup_without_finalized_recv_blocks_meta: u32 = 0; let mut confirmed_block_not_yet_processed = HashSet::::new(); + let mut finalized_block_not_yet_processed = HashSet::::new(); // start logging errors when we recieve first finalized block let mut startup_completed = false; @@ -182,6 +183,11 @@ pub fn create_grpc_multiplex_blocks_subscription( warn!("produced block channel has no receivers {e:?}"); } } + if finalized_block_not_yet_processed.remove(&processed_block.blockhash) { + if let Err(e) = producedblock_sender.send(processed_block.to_finalized_block()) { + warn!("produced block channel has no receivers {e:?}"); + } + } recent_processed_blocks.insert(processed_block.blockhash.clone(), processed_block); }, meta_confirmed = confirmed_blockmeta_stream.next() => { @@ -206,6 +212,7 @@ pub fn create_grpc_multiplex_blocks_subscription( let finalized_block = cached_processed_block.to_finalized_block(); last_finalized_slot = finalized_block.slot; startup_completed = true; + log::info!("sending finalized block"); debug!("got finalized blockmeta {} with blockhash {}", finalized_block.slot, finalized_block.blockhash.clone()); if let Err(e) = producedblock_sender.send(finalized_block) { @@ -214,6 +221,7 @@ pub fn create_grpc_multiplex_blocks_subscription( } else if startup_completed { // this warning is ok for first few blocks when we start lrpc log::warn!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash); + finalized_block_not_yet_processed.insert(blockhash); } }, _ = cleanup_tick.tick() => {