Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Feb 13, 2024
1 parent 8a38957 commit c007e51
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 110 deletions.
121 changes: 65 additions & 56 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ authors = ["Santiago Carmuega <[email protected]>"]


[dependencies]
pallas = { git = "https://github.com/txpipe/pallas.git", rev = "691b6059e25b08bbe54cf8122d812bf37dd492e1" , features = ["unstable"] }
# pallas = { path = "../pallas/pallas", features = ["unstable"] }
# pallas = { version = "^0.23", features = ["unstable"] }
pallas = { path = "../pallas/pallas", features = ["unstable"] }

gasket = { version = "^0.5", features = ["derive"] }
# gasket = { path = "../../construkts/gasket-rs/gasket", features = ["derive"] }
Expand Down
45 changes: 7 additions & 38 deletions src/serve/grpc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pallas::{
crypto::hash::Hash,
storage::rolldb::{chain, wal, Error},
};
use std::pin::Pin;
use std::{ops::Deref, pin::Pin};
use tokio_stream::StreamExt;
use tonic::{Code, Request, Response, Status};
use utxorpc::proto::sync::v1::*;
Expand Down Expand Up @@ -133,44 +133,13 @@ impl chain_sync_service_server::ChainSyncService for ChainSyncServiceImpl {
) -> Result<Response<Self::FollowTipStream>, tonic::Status> {
let request = request.into_inner();

let has_intersect = !request.intersect.is_empty();

for intersect in request.intersect {
let slot = intersect.index;
let hash: [u8; 32] = intersect.hash.to_vec().try_into().unwrap();

// start a RollStream where the first entry is the intersect point (if found)
let s = wal::RollStream::start_from_point(self.wal.clone(), (slot, hash.into())).map(
|res| {
res.map(|log| roll_to_tip_response(log))
.map_err(|e| match e {
Error::NotFound => Status::not_found("intersect not found"),
e => Status::internal(format!("kvtable error: {e:?}")),
})
},
);

let mut s = Box::pin(s);

// check the first entry so see if intersect was successful
match s.next().await {
// successfully intersected, return iter after intersect
Some(Ok(_)) => return Ok(Response::new(s)),
// try next intersect if intersect not found
Some(Err(s)) if s.code() == Code::NotFound => continue,
// return error encountered while trying to create iterator
Some(Err(s)) => return Err(Status::internal(format!("FT2 {s:?}"))),
// unreachable?
None => return Err(Status::internal("FT3")),
}
}

// intersects were provided but we couldn't intersect WAL using them
if has_intersect {
return Err(Status::not_found("could not find intersect"));
}
let intersect: Vec<_> = request
.intersect
.iter()
.map(|x| (x.index, bytes_to_hash(&x.hash)))
.collect();

let s = wal::RollStream::start_after(self.wal.clone(), None)
let s = wal::RollStream::intersect(self.wal.clone(), intersect)
.map(|log| Ok(roll_to_tip_response(log)));

Ok(Response::new(Box::pin(s)))
Expand Down
Loading

0 comments on commit c007e51

Please sign in to comment.