Skip to content

Commit

Permalink
feat: implement input resolver for gRPC endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Jan 17, 2024
1 parent 9b6ce5c commit 770343b
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
config.serve,
wal.clone(),
chain.clone(),
ledger.clone(),
));

dolos::sync::pipeline(
Expand Down
4 changes: 2 additions & 2 deletions src/bin/dolos/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ pub struct Args {}
pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, chain, _) = crate::common::open_data_stores(&config)?;
let (wal, chain, ledger) = crate::common::open_data_stores(&config)?;

dolos::serve::serve(config.serve, wal, chain).await?;
dolos::serve::serve(config.serve, wal, chain, ledger).await?;

Ok(())
}
11 changes: 8 additions & 3 deletions src/serve/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tonic::transport::{Certificate, Server, ServerTlsConfig};
use tracing::info;
use utxorpc::proto::sync::v1::chain_sync_service_server::ChainSyncServiceServer;

use crate::prelude::*;
use crate::{prelude::*, storage::applydb::ApplyDB};

mod sync;

Expand All @@ -17,9 +17,14 @@ pub struct Config {
tls_client_ca_root: Option<PathBuf>,
}

pub async fn serve(config: Config, wal: wal::Store, chain: chain::Store) -> Result<(), Error> {
pub async fn serve(
config: Config,
wal: wal::Store,
chain: chain::Store,
ledger: ApplyDB,
) -> Result<(), Error> {
let addr = config.listen_address.parse().unwrap();
let service = sync::ChainSyncServiceImpl::new(wal, chain);
let service = sync::ChainSyncServiceImpl::new(wal, chain, ledger);
let service = ChainSyncServiceServer::new(service);

let reflection = tonic_reflection::server::Builder::configure()
Expand Down
61 changes: 51 additions & 10 deletions src/serve/grpc/sync.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use futures_core::Stream;
use pallas::{
crypto::hash::Hash,
ledger::traverse::{Era, MultiEraOutput},
storage::rolldb::{chain, wal},
};
use std::pin::Pin;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status};
use utxorpc::proto::sync::v1::*;

use crate::storage::applydb::ApplyDB;

fn bytes_to_hash(raw: &[u8]) -> Hash<32> {
let array: [u8; 32] = raw.try_into().unwrap();
Hash::<32>::new(array)
Expand All @@ -18,22 +21,57 @@ fn bytes_to_hash(raw: &[u8]) -> Hash<32> {
// AnyChainBlock { chain: Some(block) }
// }

fn raw_to_anychain(raw: &[u8]) -> AnyChainBlock {
let block = pallas::interop::utxorpc::map_block_cbor(raw);
fn fetch_stxi(hash: Hash<32>, idx: u64, ledger: &ApplyDB) -> utxorpc::proto::cardano::v1::TxOutput {
let (era, cbor) = ledger.get_stxi(hash, idx).unwrap().unwrap();
let era = Era::try_from(era).unwrap();
let txo = MultiEraOutput::decode(era, &cbor).unwrap();
pallas::interop::utxorpc::map_tx_output(&txo)
}

fn raw_to_anychain(raw: &[u8], ledger: &ApplyDB) -> AnyChainBlock {
let mut block = pallas::interop::utxorpc::map_block_cbor(raw);

let input_refs: Vec<_> = block
.body
.iter()
.flat_map(|b| b.tx.iter())
.flat_map(|t| t.inputs.iter())
.map(|i| (bytes_to_hash(&i.tx_hash), i.output_index))
.collect();

// TODO: turn this into a multi-get
let mut stxis: Vec<_> = input_refs
.into_iter()
.map(|(hash, idx)| (hash.clone(), idx, fetch_stxi(hash, idx as u64, &ledger)))

Check failure on line 45 in src/serve/grpc/sync.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

using `clone` on type `Hash<32>` which implements the `Copy` trait

Check failure on line 45 in src/serve/grpc/sync.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

this expression creates a reference which is immediately dereferenced by the compiler
.collect();

for tx in block.body.as_mut().unwrap().tx.iter_mut() {
for input in tx.inputs.iter_mut() {
let key = (bytes_to_hash(&input.tx_hash), input.output_index);
let index = stxis
.binary_search_by_key(&key, |&(a, b, _)| (a, b))
.unwrap();

let (_, _, stxi) = stxis.remove(index);
input.as_output = Some(stxi);
}
}

//pallas::interop::utxorpc::map_tx_output(x)

AnyChainBlock {
chain: utxorpc::proto::sync::v1::any_chain_block::Chain::Cardano(block).into(),
}
}

fn roll_to_tip_response(log: wal::Log) -> FollowTipResponse {
fn roll_to_tip_response(log: wal::Log, ledger: &ApplyDB) -> FollowTipResponse {
utxorpc::proto::sync::v1::FollowTipResponse {
action: match log {
wal::Log::Apply(_, _, block) => {
follow_tip_response::Action::Apply(raw_to_anychain(&block)).into()
follow_tip_response::Action::Apply(raw_to_anychain(&block, ledger)).into()
}
wal::Log::Undo(_, _, block) => {
follow_tip_response::Action::Undo(raw_to_anychain(&block)).into()
follow_tip_response::Action::Undo(raw_to_anychain(&block, ledger)).into()
}
// TODO: shouldn't we have a u5c event for origin?
wal::Log::Origin => None,
Expand All @@ -45,11 +83,12 @@ fn roll_to_tip_response(log: wal::Log) -> FollowTipResponse {
pub struct ChainSyncServiceImpl {
wal: wal::Store,
chain: chain::Store,
ledger: ApplyDB,
}

impl ChainSyncServiceImpl {
pub fn new(wal: wal::Store, chain: chain::Store) -> Self {
Self { wal, chain }
pub fn new(wal: wal::Store, chain: chain::Store, ledger: ApplyDB) -> Self {
Self { wal, chain, ledger }
}
}

Expand All @@ -75,7 +114,7 @@ impl chain_sync_service_server::ChainSyncService for ChainSyncServiceImpl {
.map_err(|_err| Status::internal("can't query block"))?
.iter()
.flatten()
.map(|b| raw_to_anychain(b))
.map(|b| raw_to_anychain(b, &self.ledger))
.collect();

let response = FetchBlockResponse { block: out };
Expand Down Expand Up @@ -116,7 +155,7 @@ impl chain_sync_service_server::ChainSyncService for ChainSyncServiceImpl {
.map(|x| x.ok_or(Status::internal("can't query history")))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.map(|raw| raw_to_anychain(&raw))
.map(|raw| raw_to_anychain(&raw, &self.ledger))
.collect();

let response = DumpHistoryResponse {
Expand All @@ -131,8 +170,10 @@ impl chain_sync_service_server::ChainSyncService for ChainSyncServiceImpl {
&self,
_request: Request<FollowTipRequest>,
) -> Result<Response<Self::FollowTipStream>, tonic::Status> {
let ledger = self.ledger.clone();

let s = wal::RollStream::start_after(self.wal.clone(), None)
.map(|log| Ok(roll_to_tip_response(log)));
.map(move |log| Ok(roll_to_tip_response(log, &ledger)));

Ok(Response::new(Box::pin(s)))
}
Expand Down
16 changes: 13 additions & 3 deletions src/serve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pallas::storage::rolldb::{chain, wal};
use serde::{Deserialize, Serialize};
use tracing::info;

use crate::prelude::*;
use crate::{prelude::*, storage::applydb::ApplyDB};

pub mod grpc;
pub mod ouroboros;
Expand All @@ -18,12 +18,22 @@ pub struct Config {
///
/// Uses specified config to start listening for network connections on either
/// gRPC, Ouroboros or both protocols.
pub async fn serve(config: Config, wal: wal::Store, chain: chain::Store) -> Result<(), Error> {
pub async fn serve(
config: Config,
wal: wal::Store,
chain: chain::Store,
ledger: ApplyDB,
) -> Result<(), Error> {
let mut tasks = vec![];

if let Some(cfg) = config.grpc {
info!("found gRPC config");
tasks.push(tokio::spawn(grpc::serve(cfg, wal.clone(), chain.clone())));
tasks.push(tokio::spawn(grpc::serve(
cfg,
wal.clone(),
chain.clone(),
ledger,
)));
}

if let Some(cfg) = config.ouroboros {
Expand Down

0 comments on commit 770343b

Please sign in to comment.