Skip to content

Commit

Permalink
Use Buffered transport everywhere (#696)
Browse files Browse the repository at this point in the history
Partially implement #688.

This PR changes the default `web3::Transport` implementation that is used in the services to be the `transport::Buffered` type. This transport implementation receives JSON RPC requests over a channel and automatically batches them based on runtime configuration parameters. The main motivation for using this are two-fold:
- First, this makes coding of "batched" requests easier (you basically just need to `futures::join` concurrent RPC requests)
- Second, it allows for runtime configuration of maximum batch size (where previously it used a hard-coded `const` value of `100`) as well as number of concurrent batches (which we didn't have previously).

Additionally, I removed manual uses of the `transport::Buffered` in the code since the underlying transport is already buffered, and double-buffering RPC requests doesn't really serve a purpose.

Note that we currently don't specify any configuration parameters to the buffered transport and just use the default of 100 (so it mostly works like it used to).

In follow up PRs I will:
1. Remove the manual batching code of various components (which should simplify a lot of code!)
2. Introduce CLI configuration parameters for configuring the buffered transport

### Test Plan

CI, ran the services locally without issue and saw automatic batching happening:
```
2022-11-01T17:11:21.817Z DEBUG auction{id=2973160 run=351}: shared::transport::http: [base][id:310] sending request: "[{\"jsonrpc\":\"2.0\",\"method\":\"eth_call\",\"params\":[{\"data\":\"0xdd62ed3e0000000000000000000000009008d19f58aabd9ed0d60971565aa8510560ab410000000000000000000000007a250d5630b4cf539739df2c5dacb4c659f2488d\",\"to\":\"0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2\"},\"latest\"],\"id\":307},{\"jsonrpc\":\"2.0\",\"method\":\"eth_call\",\"params\":[{\"data\":\"0xdd62ed3e0000000000000000000000009008d19f58aabd9ed0d60971565aa8510560ab410000000000000000000000007a250d5630b4cf539739df2c5dacb4c659f2488d\",\"to\":\"0xdef1ca1fb7fbcdc777520aa7f396b4e015f497ab\"},\"latest\"],\"id\":308},{\"jsonrpc\":\"2.0\",\"method\":\"eth_call\",\"params\":[{\"data\":\"0xdd62ed3e0000000000000000000000009008d19f58aabd9ed0d60971565aa8510560ab410000000000000000000000007a250d5630b4cf539739df2c5dacb4c659f2488d\",\"to\":\"0xae7ab96520de3a18e5e111b5eaab095312d7fe84\"},\"latest\"],\"id\":309}]"
```
  • Loading branch information
Nicholas Rodrigues Lordello authored Nov 5, 2022
1 parent 3c36167 commit 6ad6170
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 50 deletions.
16 changes: 0 additions & 16 deletions crates/shared/src/fee_subsidy/cow_token.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::{FeeSubsidizing, Subsidy, SubsidyParameters};
use crate::transport::buffered::{Buffered, Configuration};
use anyhow::{Context, Result};
use cached::{Cached, TimedSizedCache};
use contracts::{CowProtocolToken, CowProtocolVirtualToken};
use ethcontract::Web3;
use primitive_types::{H160, U256};
use std::{collections::BTreeMap, sync::Mutex, time::Duration};

Expand Down Expand Up @@ -72,20 +70,6 @@ impl CowSubsidy {
false,
);

// Create buffered transport to do the two calls we make per user in one batch.
let transport = token.raw_instance().web3().transport().clone();
let buffered = Buffered::with_config(
transport,
Configuration {
max_concurrent_requests: None,
max_batch_len: 2,
batch_delay: Duration::from_secs(1),
},
);
let web3 = Web3::new(buffered);
let token = CowProtocolToken::at(&web3, token.address());
let vtoken = CowProtocolVirtualToken::at(&web3, vtoken.address());

Self {
token,
vtoken,
Expand Down
9 changes: 6 additions & 3 deletions crates/shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ pub mod transport;
pub mod univ3_router_api;
pub mod zeroex_api;

use self::{http_client::HttpClientFactory, transport::http::HttpTransport};
use self::{
http_client::HttpClientFactory,
transport::{buffered::Buffered, http::HttpTransport},
};
use ethcontract::{
batch::CallBatch,
dyns::{DynTransport, DynWeb3},
Expand All @@ -64,11 +67,11 @@ pub type Web3CallBatch = CallBatch<Web3Transport>;

/// Create a Web3 instance.
pub fn web3(http_factory: &HttpClientFactory, url: &Url, name: impl ToString) -> Web3 {
let transport = Web3Transport::new(HttpTransport::new(
let transport = Web3Transport::new(Buffered::new(HttpTransport::new(
http_factory.configure(|builder| builder.cookie_store(true)),
url.clone(),
name.to_string(),
));
)));
Web3::new(transport)
}

Expand Down
7 changes: 1 addition & 6 deletions crates/solver/src/settlement_post_processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub trait SettlementSimulating: Send + Sync {
}

pub struct SettlementSimulator {
web3: Web3,
settlement_contract: GPv2Settlement,
gas_price: GasPrice1559,
solver_account: Account,
Expand All @@ -33,7 +32,6 @@ impl SettlementSimulating for SettlementSimulator {
let result = simulate_and_estimate_gas_at_current_block(
std::iter::once((self.solver_account.clone(), settlement, None)),
&self.settlement_contract,
&self.web3,
self.gas_price,
)
.await;
Expand All @@ -42,7 +40,6 @@ impl SettlementSimulating for SettlementSimulator {
}

pub struct PostProcessingPipeline {
web3: Web3,
settlement_contract: GPv2Settlement,
unwrap_factor: f64,
weth: WETH9,
Expand All @@ -59,10 +56,9 @@ impl PostProcessingPipeline {
market_makable_token_list: AutoUpdatingTokenList,
) -> Self {
let weth = WETH9::at(&web3, native_token);
let buffer_retriever = BufferRetriever::new(web3.clone(), settlement_contract.address());
let buffer_retriever = BufferRetriever::new(web3, settlement_contract.address());

Self {
web3,
settlement_contract,
unwrap_factor,
weth,
Expand All @@ -78,7 +74,6 @@ impl PostProcessingPipeline {
gas_price: GasPrice1559,
) -> Settlement {
let simulator = SettlementSimulator {
web3: self.web3.clone(),
settlement_contract: self.settlement_contract.clone(),
gas_price,
solver_account,
Expand Down
1 change: 0 additions & 1 deletion crates/solver/src/settlement_rater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl SettlementRating for SettlementRater {
)
}),
&self.settlement_contract,
&self.web3,
gas_price,
)
.await
Expand Down
38 changes: 15 additions & 23 deletions crates/solver/src/settlement_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,19 @@ const MAX_BASE_GAS_FEE_INCREASE: f64 = 1.125;
pub async fn simulate_and_estimate_gas_at_current_block(
settlements: impl Iterator<Item = (Account, Settlement, Option<AccessList>)>,
contract: &GPv2Settlement,
web3: &Web3,
gas_price: GasPrice1559,
) -> Result<Vec<Result<U256, ExecutionError>>> {
// Collect into Vec to not rely on Itertools::chunk which would make this future !Send.
let settlements: Vec<_> = settlements.collect();

// Needed because sending an empty batch request gets an empty response which doesn't
// deserialize correctly.
if settlements.is_empty() {
return Ok(Vec::new());
}

let web3 = web3::Web3::new(shared::transport::buffered::Buffered::new(
web3.transport().clone(),
));
let contract_with_buffered_transport = GPv2Settlement::at(&web3, contract.address());
// Force settlement simulations to be done in smaller batches. They can be
// quite large and exert significant node pressure.
let mut results = Vec::new();
for chunk in settlements.chunks(SIMULATE_BATCH_SIZE) {
let calls = chunk
.iter()
.map(|(account, settlement, access_list)| {
let tx = settle_method(
gas_price,
&contract_with_buffered_transport,
settlement.clone(),
account.clone(),
)
.tx;
let tx = settle_method(gas_price, contract, settlement.clone(), account.clone()).tx;
let tx = match access_list {
Some(access_list) => tx.access_list(access_list.clone()),
None => tx,
Expand All @@ -77,6 +62,7 @@ pub async fn simulate_and_estimate_gas_at_current_block(
let chuck_results = futures::future::join_all(calls).await;
results.extend(chuck_results);
}

Ok(results)
}

Expand Down Expand Up @@ -276,7 +262,10 @@ mod tests {
#[ignore]
async fn mainnet() {
// Create some bogus settlements to see that the simulation returns an error.
shared::tracing::initialize("solver=debug,shared=debug", tracing::Level::ERROR.into());
shared::tracing::initialize(
"info,solver=debug,shared=debug,shared::transport=trace",
tracing::Level::ERROR.into(),
);
let transport = create_env_test_transport();
let web3 = Web3::new(transport);
let block = web3.eth().block_number().await.unwrap().as_u64();
Expand Down Expand Up @@ -307,7 +296,6 @@ mod tests {
let result = simulate_and_estimate_gas_at_current_block(
settlements.iter().cloned(),
&contract,
&web3,
Default::default(),
)
.await
Expand All @@ -317,7 +305,6 @@ mod tests {
let result = simulate_and_estimate_gas_at_current_block(
std::iter::empty(),
&contract,
&web3,
Default::default(),
)
.await
Expand Down Expand Up @@ -383,6 +370,7 @@ mod tests {
"sellTokenBalance": "erc20",
"buyTokenBalance": "erc20",
"isLiquidityOrder": false,
"class": "ordinary",
});
let order0: Order = serde_json::from_value(value).unwrap();
let value = json!(
Expand Down Expand Up @@ -414,6 +402,7 @@ mod tests {
"sellTokenBalance": "erc20",
"buyTokenBalance": "erc20",
"isLiquidityOrder": true,
"class": "liquidity",
});
let order1: Order = serde_json::from_value(value).unwrap();
let value = json!(
Expand Down Expand Up @@ -445,6 +434,7 @@ mod tests {
"sellTokenBalance": "erc20",
"buyTokenBalance": "erc20",
"isLiquidityOrder": false,
"class": "ordinary",
});
let order2: Order = serde_json::from_value(value).unwrap();

Expand Down Expand Up @@ -686,7 +676,10 @@ mod tests {
#[tokio::test]
#[ignore]
async fn mainnet_chunked() {
shared::tracing::initialize("solver=debug,shared=debug", tracing::Level::ERROR.into());
shared::tracing::initialize(
"info,solver=debug,shared=debug,shared::transport=trace",
tracing::Level::ERROR.into(),
);
let transport = create_env_test_transport();
let web3 = Web3::new(transport);
let contract = GPv2Settlement::deployed(&web3).await.unwrap();
Expand All @@ -700,7 +693,6 @@ mod tests {
let result = simulate_and_estimate_gas_at_current_block(
settlements.iter().cloned(),
&contract,
&web3,
GasPrice1559::default(),
)
.await
Expand Down
1 change: 0 additions & 1 deletion crates/solver/src/settlement_submission/submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,6 @@ mod tests {
crate::settlement_simulation::simulate_and_estimate_gas_at_current_block(
std::iter::once((account.clone(), settlement.clone(), None)),
&contract,
&web3,
Default::default(),
)
.await
Expand Down

0 comments on commit 6ad6170

Please sign in to comment.