Skip to content

Commit

Permalink
feat: install payload delay (#134)
Browse files Browse the repository at this point in the history
this adds a hack that wraps the engine_getPayloadV3 and applies a delay,
that delays the request up to 500ms into the slot.

the motivation for this is, so we can give the payload builder a bit
more time to include more txs, because the previous block building
window was observed to be ~250ms
  • Loading branch information
mattsse authored Jan 22, 2025
1 parent 1ab2877 commit 5fc8fe8
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ serde_json = "1"
thiserror = "2"
futures = "0.3"
url = "2.5"
parking_lot = "0.12"

# misc-testing
rstest = "0.18.2"
14 changes: 14 additions & 0 deletions bin/odyssey/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use eyre::Context;
use odyssey_node::{
broadcaster::periodic_broadcaster,
chainspec::OdysseyChainSpecParser,
delayed_resolve::{DelayedResolver, MAX_DELAY_INTO_SLOT},
forwarder::forward_raw_transactions,
node::OdysseyNode,
rpc::{EthApiExt, EthApiOverrideServer},
Expand All @@ -40,6 +41,7 @@ use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher, Node
use reth_optimism_cli::Cli;
use reth_optimism_node::{args::RollupArgs, node::OpAddOnsBuilder};
use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions};
use std::time::Duration;
use tracing::{info, warn};

#[global_allocator]
Expand Down Expand Up @@ -110,6 +112,18 @@ fn main() {
ctx.modules.merge_configured(walltime.into_rpc())?;
info!(target: "reth::cli", "Walltime configured");

// wrap the getPayloadV3 method in a delay
let engine_module = ctx.auth_module.module_mut().clone();
let delay_into_slot = std::env::var("MAX_PAYLOAD_DELAY")
.ok()
.and_then(|val| val.parse::<u64>().map(Duration::from_millis).ok())
.unwrap_or(MAX_DELAY_INTO_SLOT);

let delayed_payload = DelayedResolver::new(engine_module, delay_into_slot);
delayed_payload.clone().spawn(ctx.provider().canonical_state_stream());
ctx.auth_module.replace_auth_methods(delayed_payload.into_rpc_module())?;
info!(target: "reth::cli", "Configured payload delay");

Ok(())
})
.launch_with_fn(|builder| {
Expand Down
4 changes: 4 additions & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ reth-trie-common.workspace = true
reth-trie-db.workspace = true
reth-network.workspace = true
reth-network-types.workspace = true
reth-chain-state.workspace = true

alloy-consensus.workspace = true
alloy-eips.workspace = true
Expand All @@ -50,6 +51,9 @@ tokio.workspace = true
tracing.workspace = true
eyre.workspace = true
jsonrpsee.workspace = true
futures.workspace = true
parking_lot.workspace = true
serde.workspace = true

[lints]
workspace = true
146 changes: 146 additions & 0 deletions crates/node/src/delayed_resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//! Helper that delays resolving the payload
use futures::{Stream, StreamExt};
use jsonrpsee::{
core::traits::ToRpcParams,
types::{error::INVALID_PARAMS_CODE, ErrorObject, Params},
MethodsError, RpcModule,
};
use parking_lot::Mutex;
use reth_chain_state::CanonStateNotification;
use serde::de::Error;
use serde_json::value::RawValue;
use std::{
sync::Arc,
time::{Duration, Instant},
};

/// Delay into the slot
pub const MAX_DELAY_INTO_SLOT: Duration = Duration::from_millis(500);

/// The getpayload fn we want to delay
pub const GET_PAYLOAD_V3: &str = "engine_getPayloadV3";

/// A helper that tracks the block clock timestamp and can delay resolving the payload to give the
/// payload builder more time to build a block.
#[derive(Debug, Clone)]
pub struct DelayedResolver {
inner: Arc<DelayedResolverInner>,
}

impl DelayedResolver {
/// Creates a new instance with the engine module and the duration we should target
pub fn new(engine_module: RpcModule<()>, max_delay_into_slot: Duration) -> Self {
Self {
inner: Arc::new(DelayedResolverInner {
last_block_time: Mutex::new(Instant::now()),
engine_module,
max_delay_into_slot,
}),
}
}

/// Listen for new blocks and track the local timestamp.
pub fn spawn<St>(self, mut st: St)
where
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
{
tokio::task::spawn(async move {
while st.next().await.is_some() {
*self.inner.last_block_time.lock() = Instant::now();
}
});
}

async fn call(&self, params: Params<'static>) -> Result<serde_json::Value, MethodsError> {
let last = *self.inner.last_block_time.lock();
let now = Instant::now();
// how far we're into the slot
let offset = now.duration_since(last);

if offset < self.inner.max_delay_into_slot {
// if we received the request before the max delay exceeded we can delay the request to
// give the payload builder more time to build the payload.
let delay = self.inner.max_delay_into_slot.saturating_sub(offset);
tokio::time::sleep(delay).await;
}

let params = params
.as_str()
.ok_or_else(|| MethodsError::Parse(serde_json::Error::missing_field("payload id")))?;

self.inner.engine_module.call(GET_PAYLOAD_V3, PayloadParam(params.to_string())).await
}

/// Converts this type into a new [`RpcModule`] that delegates the get payload call.
pub fn into_rpc_module(self) -> RpcModule<()> {
let mut module = RpcModule::new(());
module
.register_async_method(GET_PAYLOAD_V3, move |params, _ctx, _| {
let value = self.clone();
async move {
value.call(params).await.map_err(|err| match err {
MethodsError::JsonRpc(err) => err,
err => ErrorObject::owned(
INVALID_PARAMS_CODE,
format!("invalid payload call: {:?}", err),
None::<()>,
),
})
}
})
.unwrap();

module
}
}

#[derive(Debug)]
struct DelayedResolverInner {
/// Tracks the time when the last block was emitted
last_block_time: Mutex<Instant>,
engine_module: RpcModule<()>,
/// By how much we want to delay getPayload into the slot
max_delay_into_slot: Duration,
}

struct PayloadParam(String);

impl ToRpcParams for PayloadParam {
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
RawValue::from_string(self.0).map(Some)
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_rpc_types::engine::PayloadId;

/// Mocked payload object
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)]
struct Payload {
attributes: serde_json::Value,
header: serde_json::Value,
}

#[tokio::test]
async fn test_delayed_forward() {
use jsonrpsee::{core::RpcResult, RpcModule};

let mut module = RpcModule::new(());
module
.register_method::<RpcResult<Payload>, _>(GET_PAYLOAD_V3, |params, _, _| {
params.one::<PayloadId>()?;
Ok(Payload::default())
})
.unwrap();

let id = PayloadId::default();

let _echo: Payload = module.call(GET_PAYLOAD_V3, [id]).await.unwrap();

let delayer = DelayedResolver::new(module, MAX_DELAY_INTO_SLOT).into_rpc_module();
let _echo: Payload = delayer.call(GET_PAYLOAD_V3, [id]).await.unwrap();
}
}
1 change: 1 addition & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

pub mod broadcaster;
pub mod chainspec;
pub mod delayed_resolve;
pub mod evm;
pub mod forwarder;
pub mod node;
Expand Down
1 change: 1 addition & 0 deletions crates/wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ impl<T> OdysseyWallet<T> {
Self { inner: Arc::new(inner) }
}

#[allow(clippy::missing_const_for_fn)]
fn chain_id(&self) -> ChainId {
self.inner.chain_id
}
Expand Down

0 comments on commit 5fc8fe8

Please sign in to comment.