Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extend eigen rpc, add get_batch_proof, get_block_by_number #19

Merged
merged 4 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ PROVER_ADDR=http://localhost:50061 cargo run -r -- run --database mdbx --log-lev
curl -H "Content-Type: application/json" -X POST --data '{"jsonrpc":"2.0","method":"eigenrpc_customMethod","params":[],"id": 10}' 127.0.0.1:8546

curl -H "Content-Type: application/json" -X POST --data '{"jsonrpc":"2.0","method":"eigenrpc_getBlockByNumber","params":[0],"id": 10}' 127.0.0.1:8546

curl -H "Content-Type: application/json" -X POST --data '{"jsonrpc":"2.0","method":"eigenrpc_getBatchProof","params":[0],"id": 10}' 127.0.0.1:8546
```

You can also use [cast](https://github.com/foundry-rs/foundry/releases).
Expand All @@ -30,4 +32,6 @@ You can also use [cast](https://github.com/foundry-rs/foundry/releases).
cast rpc --rpc-url http://localhost:8546 eigenrpc_customMethod

cast rpc --rpc-url http://localhost:8546 eigenrpc_getBlockByNumber 0

cast rpc --rpc-url http://localhost:8546 eigenrpc_getBatchProof 0
```
23 changes: 21 additions & 2 deletions src/batch_proposer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::db::keys;
use crate::db::Database;
use crate::db::{keys, prefix, Status};
use anyhow::{anyhow, bail, Result};
use ethers_providers::{Http, Middleware, Provider};
use std::sync::Arc;
Expand Down Expand Up @@ -75,7 +75,26 @@ impl L2Watcher {
.map_err(|e| anyhow!("{:?}", e))
.map(|number| {
log::info!("L2Watcher fetched block({})", number);
db.put(keys::KEY_LAST_SEQUENCE_FINALITY_BLOCK_NUMBER.to_vec(), number.as_u64().to_be_bytes().to_vec())

let last_fetched_block = match db.get(keys::KEY_LAST_SEQUENCE_FINALITY_BLOCK_NUMBER) {
None => {
db.put(keys::KEY_LAST_SEQUENCE_FINALITY_BLOCK_NUMBER.to_vec(), number.as_u64().to_be_bytes().to_vec());
0
}
Some(block_number_bytes) => {
u64::from_be_bytes(block_number_bytes.try_into().unwrap())
}
};

db.put(keys::KEY_LAST_SEQUENCE_FINALITY_BLOCK_NUMBER.to_vec(), number.as_u64().to_be_bytes().to_vec());
for number in last_fetched_block+1..number.as_u64()+1 {
// update block status to sequenced
let status_key = format!("{}{}", std::str::from_utf8(prefix::PREFIX_BLOCK_STATUS).unwrap(), number);
let status = Status::Sequenced;
let encoded_status = serde_json::to_vec(&status).unwrap();
db.put(status_key.as_bytes().to_vec(), encoded_status);

}
})
},
_ = stop_rx.recv() => {
Expand Down
13 changes: 11 additions & 2 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt;
use std::sync::Arc;

use anyhow::Result;
use anyhow::{anyhow, Result};
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -211,15 +212,21 @@ impl RunCmd {
reth_stop_tx.send(()).await.unwrap();
});

// initialize the database
let rollup_db =
lfs::open_db(db_config).map_err(|e| anyhow!("Failed to open db: {:?}", e))?;
let arc_rollup_db = Arc::new(rollup_db);

let (reth_started_signal_tx, reth_started_signal_rx) = mpsc::channel::<()>(1);
let a = aggregator_addr.clone();
let operator_rollup_db = arc_rollup_db.clone();
tokio::spawn(async move {
// Run the operator
Operator::run(
&GLOBAL_ENV.l2addr,
&GLOBAL_ENV.prover_addr,
settlement_spec.clone(),
db_config.clone(),
operator_rollup_db,
a.as_str(),
stop_rx,
reth_started_signal_rx,
Expand All @@ -231,11 +238,13 @@ impl RunCmd {
let rpc_args = self.reth_cmd.rpc.clone();
let dev_args = self.reth_cmd.dev;
let data_dir = self.reth_cmd.datadir.clone();
let reth_rollup_db = arc_rollup_db.clone();

// Launch the custom reth
custom_reth::launch_custom_node(
reth_stop_rx,
reth_started_signal_tx,
reth_rollup_db,
chain_spec,
rpc_args,
data_dir,
Expand Down
74 changes: 69 additions & 5 deletions src/custom_reth/eigen.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use jsonrpsee::core::Serialize;
use std::sync::Arc;
// Reth block related imports
use reth_primitives::{Block, B256};
use reth_provider::BlockReaderIdExt;

// Rpc related imports
use crate::db::{prefix, Database as RollupDatabase, ProofResult, Status};
use jsonrpsee::proc_macros::rpc;
use reth_rpc::eth::error::EthResult;
use reth_interfaces::RethError;
use reth_rpc::eth::error::{EthApiError, EthResult};
use serde::Deserialize;

/// trait interface for a custom rpc namespace: `EigenRpc`
///
Expand All @@ -15,14 +20,17 @@ pub trait EigenRpcExtApi {
#[method(name = "customMethod")]
fn custom_methhod(&self) -> EthResult<Option<Block>>;
#[method(name = "getBlockByNumber")]
fn get_block_by_number(&self, block_no: u64) -> EthResult<Option<Block>>;
fn get_block_by_number(&self, block_no: u64) -> EthResult<Option<BlockExt>>;
#[method(name = "traceTransaction")]
fn trace_transaction(&self, hash: B256) -> EthResult<Option<()>>;
#[method(name = "getBatchProof")]
fn get_batch_proof(&self, block_no: u64) -> EthResult<Option<BatchProofInfo>>;
}

/// The type that implements `EigenRpc` rpc namespace trait
pub struct EigenRpcExt<Provider> {
pub provider: Provider,
pub rollup_db: Arc<Box<dyn RollupDatabase>>,
}

impl<Provider> EigenRpcExtApiServer for EigenRpcExt<Provider>
Expand All @@ -38,10 +46,24 @@ where
Ok(block)
}

// TODO: override the eth_get_block_by_hash to check if the block has been confirmed by L1
fn get_block_by_number(&self, block_no: u64) -> EthResult<Option<Block>> {
fn get_block_by_number(&self, block_no: u64) -> EthResult<Option<BlockExt>> {
let block = self.provider.block_by_number(block_no)?;
Ok(block)
if let Some(block) = block {
let status_key = format!(
"{}{}",
std::str::from_utf8(prefix::PREFIX_BLOCK_STATUS)
.map_err(|e| EthApiError::Internal(RethError::Custom(e.to_string())))?,
block_no
);
let status = match self.rollup_db.get(status_key.as_bytes()) {
Some(status_bytes) => serde_json::from_slice(&status_bytes)
.map_err(|e| EthApiError::Internal(RethError::Custom(e.to_string())))?,
None => Status::Pending,
};
Ok(Some(BlockExt { block, status }))
} else {
Ok(None)
}
}

// TODO return the pre and post data for zkvm
Expand All @@ -50,4 +72,46 @@ where
//let traces = self.provider.trace
Ok(Some(()))
}

fn get_batch_proof(&self, block_no: u64) -> EthResult<Option<BatchProofInfo>> {
let next_proof_key = format!(
"{}{}",
std::str::from_utf8(prefix::PREFIX_BATCH_PROOF)
.map_err(|e| EthApiError::Internal(RethError::Custom(e.to_string())))?,
block_no
);
if let Some(proof_bytes) = self.rollup_db.get(next_proof_key.as_bytes()) {
let proof: ProofResult = serde_json::from_slice(&proof_bytes)
.map_err(|e| EthApiError::Internal(RethError::Custom(e.to_string())))?;
let proof_info = BatchProofInfo {
block_number: proof.block_number,
proof: proof.proof,
public_input: proof.public_input,
pre_state_root: format!("0x{}", hex::encode(proof.pre_state_root)),
post_state_root: format!("0x{}", hex::encode(proof.post_state_root)),
};
Ok(Some(proof_info))
} else {
Ok(None)
}
}
}

/// A custom struct that extends the standard `reth::Block` struct with a `status` field.
/// This additional field represents the status of the block within the rollup process.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BlockExt {
pub block: Block,
pub status: Status,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BatchProofInfo {
pub block_number: u64,
pub proof: String,
pub public_input: String,
// 0x + hex([u8; 32])
pub pre_state_root: String,
// 0x + hex([u8; 32])
pub post_state_root: String,
}
3 changes: 3 additions & 0 deletions src/custom_reth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use thiserror::Error;

use crate::custom_reth::eigen::EigenRpcExt;
use crate::custom_reth::eigen::EigenRpcExtApiServer;
use crate::db::Database as RollupDatabase;
use anyhow::{anyhow, Result};
use jsonrpsee::tracing;
use reth_blockchain_tree::{
Expand Down Expand Up @@ -341,6 +342,7 @@ where
pub async fn launch_custom_node(
mut stop_rx: tokio::sync::mpsc::Receiver<()>,
reth_started_signal_channel: tokio::sync::mpsc::Sender<()>,
rollup_db: Arc<Box<dyn RollupDatabase>>,
spec: Arc<ChainSpec>,
rpc_args: RpcServerArgs,
data_dir: MaybePlatformPath<DataDirPath>,
Expand Down Expand Up @@ -395,6 +397,7 @@ pub async fn launch_custom_node(
// create EigenRpcExt Instance
let custom_rpc = EigenRpcExt {
provider: provider.clone(),
rollup_db: rollup_db.clone(),
};

// add EigenRpcExt to RPC modules
Expand Down
29 changes: 29 additions & 0 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// TODO: Fix me
#![allow(dead_code)]

use jsonrpsee::core::Serialize;
use serde::Deserialize;

mod data_availability_db;
pub(crate) mod lfs;

Expand Down Expand Up @@ -36,4 +39,30 @@ pub(crate) mod keys {

pub(crate) mod prefix {
pub const PREFIX_BATCH_PROOF: &[u8] = b"BATCH_PROOF_";
pub const PREFIX_BLOCK_STATUS: &[u8] = b"BLOCK_STATUS_";
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
/// the tx is pending
Pending,
/// confirmed by sequencer
Sequenced,
/// packing the block into a batch
Batching,
/// confirmed by DA
/// TODO: we skip the DA for now, should support it in the future
Submitted,
/// confirmed by settlement
Finalized,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProofResult {
// TODO: refactor to batch
pub block_number: u64,
pub proof: String,
pub public_input: String,
pub pre_state_root: [u8; 32],
pub post_state_root: [u8; 32],
}
14 changes: 5 additions & 9 deletions src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::{anyhow, Result};
// use ethers_core::types::{Bytes, H160, U256};
use ethers_providers::{Http, Provider};
// use serde::Serialize;
use crate::db::lfs;
use crate::db::Database;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
Expand All @@ -22,7 +22,7 @@ impl Operator {
l2addr: &str,
prover_addr: &str,
settlement_spec: NetworkSpec,
db_config: lfs::DBConfig,
rollup_db: Arc<Box<dyn Database>>,
aggregator_addr: &str,
mut stop_rx: Receiver<()>,
mut reth_started_signal_rx: Receiver<()>,
Expand All @@ -31,10 +31,6 @@ impl Operator {
// initialize the prover
let prover = ProverChannel::new(prover_addr, aggregator_addr);

// initialize the database
let db = lfs::open_db(db_config).map_err(|e| anyhow!("Failed to open db: {:?}", e))?;
let arc_db = Arc::new(db);

// initialize the settlement layer
let settlement_provider = init_settlement_provider(settlement_spec)
.map_err(|e| anyhow!("Failed to init settlement: {:?}", e))?;
Expand All @@ -51,14 +47,14 @@ impl Operator {
log::info!("Initializing reth Provider with address: {}", l2addr);
let l2provider = Provider::<Http>::try_from(l2addr)
.map_err(|e| anyhow!("Failed to init l2 provider: {:?}", e))?;
let mut l2watcher = L2Watcher::new(arc_db.clone(), l2provider);
let mut l2watcher = L2Watcher::new(rollup_db.clone(), l2provider);

// start all components of the eigen-zeth full node
// start the L2Watcher
l2watcher.start().await.unwrap();

// start the verify worker
let arc_db_for_verify_worker = arc_db.clone();
let arc_db_for_verify_worker = rollup_db.clone();
let (verify_stop_tx, verify_stop_rx) = mpsc::channel::<()>(1);
tokio::spawn(async move {
Settler::verify_worker(
Expand All @@ -70,7 +66,7 @@ impl Operator {
});

// start the proof worker
let arc_db_for_proof_worker = arc_db.clone();
let arc_db_for_proof_worker = rollup_db.clone();
let (proof_stop_tx, proof_stop_rx) = mpsc::channel::<()>(1);
tokio::spawn(async move {
Settler::proof_worker(arc_db_for_proof_worker, prover, proof_stop_rx).await
Expand Down
19 changes: 7 additions & 12 deletions src/prover/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! 3) If the task is finished, update the status into proof database, hence the extended RPC module will fetch this and return it to SDK.

use crate::config::env::GLOBAL_ENV;
use crate::db::ProofResult;
use crate::prover::provider::prover_service::prover_request::RequestType;
use crate::prover::provider::prover_service::prover_response::ResponseType;
use crate::prover::provider::prover_service::prover_service_client::ProverServiceClient;
Expand All @@ -14,11 +15,11 @@ use crate::prover::provider::prover_service::{
ProverRequest,
};
use anyhow::{anyhow, bail, Result};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time;
use tokio_stream::wrappers::ReceiverStream;

pub mod prover_service {
Expand Down Expand Up @@ -63,16 +64,6 @@ pub enum ExecuteResult {
Failed(ErrMsg),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProofResult {
// TODO: refactor to batch
pub block_number: u64,
pub proof: String,
pub public_input: String,
pub pre_state_root: [u8; 32],
pub post_state_root: [u8; 32],
}

/// ProveStep ...
#[derive(Debug)]
enum ProveStep {
Expand Down Expand Up @@ -155,7 +146,11 @@ impl ProverChannel {
Err(e) => {
// stop with the error
// TODO: relaunch the endpoint
log::error!("ProverEndpoint error: {:?}", e);
log::error!(
"ProverEndpoint stopped with error, try again later, err: {:?}",
e
);
time::sleep(Duration::from_secs(10)).await;
}
}
}
Expand Down
Loading
Loading