From 4014ee149f2afb0f6c362e739fd6bf0eeac5d05b Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 23 Jun 2024 09:44:47 -0300 Subject: [PATCH] refactor(grpc): improve sync mapping (#276) --- src/serve/grpc/sync.rs | 43 +++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/serve/grpc/sync.rs b/src/serve/grpc/sync.rs index 3a6d11fe..f333c121 100644 --- a/src/serve/grpc/sync.rs +++ b/src/serve/grpc/sync.rs @@ -1,6 +1,6 @@ use futures_core::Stream; use futures_util::StreamExt; -use itertools::Itertools; +use itertools::{Either, Itertools}; use pallas::interop::utxorpc as interop; use pallas::interop::utxorpc::{spec as u5c, Mapper}; use std::pin::Pin; @@ -33,6 +33,15 @@ fn raw_to_anychain( } } +fn raw_to_blockref(raw: &wal::RawBlock) -> u5c::sync::BlockRef { + let RawBlock { slot, hash, .. } = raw; + + u5c::sync::BlockRef { + index: *slot, + hash: hash.to_vec().into(), + } +} + fn roll_to_tip_response( mapper: &Mapper, log: &wal::LogValue, @@ -105,31 +114,23 @@ impl u5c::sync::chain_sync_service_server::ChainSyncService for ChainSyncService let len = msg.max_items as usize + 1; - let mut page = self + let page = self .wal .read_block_page(from.as_ref(), len) - .map_err(|_err| Status::internal("can't query block"))? - .collect_vec(); - - let next_token = if page.len() == len { - let RawBlock { slot, hash, .. } = page.remove(len - 1); + .map_err(|_err| Status::internal("can't query block"))?; - Some(u5c::sync::BlockRef { - index: slot, - hash: hash.to_vec().into(), - }) - } else { - None - }; - - let blocks = page - .into_iter() - .map(|x| raw_to_anychain(&self.mapper, &x)) - .collect(); + let (items, next_token): (_, Vec<_>) = + page.into_iter().enumerate().partition_map(|(idx, x)| { + if idx < len - 1 { + Either::Left(raw_to_anychain(&self.mapper, &x)) + } else { + Either::Right(raw_to_blockref(&x)) + } + }); let response = u5c::sync::DumpHistoryResponse { - block: blocks, - next_token, + block: items, + next_token: next_token.into_iter().next(), }; Ok(Response::new(response))