Skip to content

Commit

Permalink
Merge pull request #2128 from eqlabs/chris/p2p-test-opts
Browse files Browse the repository at this point in the history
Add p2p.experimental cli options
  • Loading branch information
CHr15F0x authored Jul 19, 2024
2 parents 88f6ddf + 9036def commit 333ac6a
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 30 deletions.
2 changes: 1 addition & 1 deletion crates/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod core_addr {
Decoder::Hex.decode(b"4737c0c1B4D5b1A687B42610DdabEE781152359c");
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EthereumStateUpdate {
pub state_root: StateCommitment,
pub block_number: BlockNumber,
Expand Down
36 changes: 25 additions & 11 deletions crates/p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,21 @@ impl Behaviour {
kademlia_config.set_provider_publication_interval(Some(PROVIDER_PUBLICATION_INTERVAL));
// This makes sure that the DHT we're implementing is incompatible with the
// "default" IPFS DHT from libp2p.
kademlia_config.set_protocol_names(vec![StreamProtocol::try_from_owned(
kademlia_protocol_name(chain_id),
)
.unwrap()]);
if cfg.kad_names.is_empty() {
kademlia_config.set_protocol_names(vec![StreamProtocol::try_from_owned(
kademlia_protocol_name(chain_id),
)
.unwrap()]);
} else {
kademlia_config.set_protocol_names(
cfg.kad_names
.iter()
.cloned()
.map(StreamProtocol::try_from_owned)
.collect::<Result<Vec<_>, _>>()
.expect("valid protocol names"),
);
}

let peer_id = identity.public().to_peer_id();

Expand All @@ -473,11 +484,14 @@ impl Behaviour {
)
.expect("valid gossipsub params");

let headers_sync = request_response_behavior::<codec::Headers>();
let classes_sync = request_response_behavior::<codec::Classes>();
let state_diffs_sync = request_response_behavior::<codec::StateDiffs>();
let transactions_sync = request_response_behavior::<codec::Transactions>();
let events_sync = request_response_behavior::<codec::Events>();
let p2p_stream_cfg = p2p_stream::Config::default()
.with_request_timeout(cfg.stream_timeout)
.with_max_concurrent_streams(cfg.max_concurrent_streams);
let headers_sync = request_response_behavior::<codec::Headers>(p2p_stream_cfg);
let classes_sync = request_response_behavior::<codec::Classes>(p2p_stream_cfg);
let state_diffs_sync = request_response_behavior::<codec::StateDiffs>(p2p_stream_cfg);
let transactions_sync = request_response_behavior::<codec::Transactions>(p2p_stream_cfg);
let events_sync = request_response_behavior::<codec::Events>(p2p_stream_cfg);

let (relay_transport, relay) = relay::client::new(peer_id);

Expand Down Expand Up @@ -842,12 +856,12 @@ impl Behaviour {
}
}

fn request_response_behavior<C>() -> p2p_stream::Behaviour<C>
fn request_response_behavior<C>(cfg: p2p_stream::Config) -> p2p_stream::Behaviour<C>
where
C: Default + p2p_stream::Codec + Clone + Send,
C::Protocol: Default,
{
p2p_stream::Behaviour::new(std::iter::once(C::Protocol::default()), Default::default())
p2p_stream::Behaviour::new(std::iter::once(C::Protocol::default()), cfg)
}

#[allow(dead_code)]
Expand Down
6 changes: 6 additions & 0 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ pub struct Config {
pub ip_whitelist: Vec<IpNet>,
pub bootstrap: BootstrapConfig,
pub inbound_connections_rate_limit: RateLimit,
/// Alternative protocol names for Kademlia
pub kad_names: Vec<String>,
/// Request timeout for p2p-stream
pub stream_timeout: Duration,
/// Applies to each of the p2p-stream protocols separately
pub max_concurrent_streams: usize,
}

#[derive(Debug, Clone)]
Expand Down
30 changes: 30 additions & 0 deletions crates/p2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ impl Default for TestPeer {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
},
Keypair::generate_ed25519(),
)
Expand Down Expand Up @@ -265,6 +268,9 @@ async fn periodic_bootstrap() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};
let mut boot = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -414,6 +420,9 @@ async fn reconnect_too_quickly() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -516,6 +525,9 @@ async fn duplicate_connection() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};
let keypair = Keypair::generate_ed25519();
let mut peer1 = TestPeer::new(cfg.clone(), keypair.clone());
Expand Down Expand Up @@ -602,6 +614,9 @@ async fn outbound_peer_eviction() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -731,6 +746,9 @@ async fn inbound_peer_eviction() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -816,6 +834,9 @@ async fn evicted_peer_reconnection() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -907,6 +928,9 @@ async fn ip_whitelist() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};
let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let peer2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -940,6 +964,9 @@ async fn ip_whitelist() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};
let mut peer3 = TestPeer::new(cfg, Keypair::generate_ed25519());

Expand Down Expand Up @@ -974,6 +1001,9 @@ async fn rate_limit() {
max: 2,
interval: RATE_LIMIT_INTERVAL,
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down
2 changes: 1 addition & 1 deletion crates/p2p_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl fmt::Display for OutboundRequestId {
}

/// The configuration for a `Behaviour` protocol.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct Config {
request_timeout: Duration,
max_concurrent_streams: usize,
Expand Down
130 changes: 126 additions & 4 deletions crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ struct P2PCli {
Example:
'/ip4/127.0.0.1/9001/p2p/12D3KooWBEkKyufuqCMoZLRhVzq4xdHxVWhhYeBpjw92GSyZ6xaN,/ip4/127.0.0.1/9002/p2p/12D3KooWBEkKyufuqCMoZLRhVzq4xdHxVWhhYeBpjw92GSyZ6xaN'"#,
value_name = "MULTIADDRESS_LIST",
value_delimiter = ',',
env = "PATHFINDER_P2P_BOOTSTRAP_ADDRESSES"
)]
bootstrap_addresses: Vec<String>,
Expand All @@ -392,6 +393,7 @@ Example:
Example:
'/ip4/127.0.0.1/9003/p2p/12D3KooWBEkKyufuqCMoZLRhVzq4xdHxVWhhYeBpjw92GSyZ6xaP,/ip4/127.0.0.1/9004/p2p/12D3KooWBEkKyufuqCMoZLRhVzq4xdHxVWhhYeBpjw92GSyZ6xaR'"#,
value_name = "MULTIADDRESS_LIST",
value_delimiter = ',',
env = "PATHFINDER_P2P_PREDEFINED_PEERS"
)]
predefined_peers: Vec<String>,
Expand Down Expand Up @@ -445,6 +447,65 @@ Example:
env = "IP_WHITELIST"
)]
ip_whitelist: Vec<IpNet>,

#[arg(
long = "p2p.experimental.kad-names",
long_help = "Comma separated list of custom Kademlia protocol names.",
value_name = "LIST",
default_value = "/starknet/kad/<STARKNET_CHAIN_ID>/1.0.0",
value_delimiter = ',',
env = "PATHFINDER_P2P_EXPERIMENTAL_KAD_NAMES"
)]
kad_names: Vec<String>,

#[arg(
long = "p2p.experimental.l1-checkpoint-override",
long_help = "Override L1 sync checkpoint retrieved from the Ethereum API. This option \
points to a json encoded file containing an L1 checkpoint from which \
pathfinder will sync backwards till genesis before switching to syncing \
forward and following the head of the chain. Example contents: { \
\"block_hash\": \"0x1\", \"block_number\": 2, \"state_root\": \"0x3\" }",
value_name = "JSON_FILE",
env = "PATHFINDER_P2P_EXPERIMENTAL_L1_CHECKPOINT_OVERRIDE"
)]
l1_checkpoint_override: Option<String>,

#[arg(
long = "p2p.experimental.stream-timeout",
long_help = "Timeout of the request/response-stream protocol.",
value_name = "SECONDS",
default_value = "60",
env = "PATHFINDER_P2P_EXPERIMENTAL_STREAM_TIMEOUT"
)]
stream_timeout: u32,

#[arg(
long = "p2p.experimental.max-concurrent-streams",
long_help = "Maximum allowed number of concurrent streams per each \
request/response-stream protocol.",
value_name = "LIMIT",
default_value = "100",
env = "PATHFINDER_P2P_EXPERIMENTAL_MAX_CONCURRENT_STREAMS"
)]
max_concurrent_streams: usize,

#[arg(
long = "p2p.experimental.direct-connection-timeout",
long_help = "A direct (not relayed) peer can only connect once in this period.",
value_name = "SECONDS",
default_value = "30",
env = "PATHFINDER_P2P_EXPERIMENTAL_DIRECT_CONNECTION_TIMEOUT"
)]
direct_connection_timeout: u32,

#[arg(
long = "p2p.experimental.eviction-timeout",
long_help = "How long to prevent evicted peers from reconnecting.",
value_name = "SECONDS",
default_value = "900",
env = "PATHFINDER_P2P_EXPERIMENTAL_EVICTION_TIMEOUT"
)]
eviction_timeout: u32,
}

#[cfg(feature = "p2p")]
Expand Down Expand Up @@ -615,6 +676,12 @@ pub struct P2PConfig {
pub max_outbound_connections: usize,
pub ip_whitelist: Vec<IpNet>,
pub low_watermark: usize,
pub kad_names: Vec<String>,
pub l1_checkpoint_override: Option<pathfinder_ethereum::EthereumStateUpdate>,
pub stream_timeout: Duration,
pub max_concurrent_streams: usize,
pub direct_connection_timeout: Duration,
pub eviction_timeout: Duration,
}

#[cfg(not(feature = "p2p"))]
Expand Down Expand Up @@ -689,14 +756,14 @@ impl P2PConfig {
use clap::error::ErrorKind;
use p2p::libp2p::multiaddr::Result;

let parse_multiaddr_vec = |multiaddrs: Vec<String>| -> Vec<Multiaddr> {
let parse_multiaddr_vec = |field: &str, multiaddrs: Vec<String>| -> Vec<Multiaddr> {
multiaddrs
.into_iter()
.map(|addr| Multiaddr::from_str(&addr))
.collect::<Result<Vec<_>>>()
.unwrap_or_else(|error| {
Cli::command()
.error(ErrorKind::ValueValidation, error)
.error(ErrorKind::ValueValidation, format!("{field}: {error}"))
.exit()
})
};
Expand Down Expand Up @@ -728,6 +795,17 @@ impl P2PConfig {
.exit()
}

if args.kad_names.is_empty() || args.kad_names.iter().any(|x| x.is_empty()) {
Cli::command()
.error(
ErrorKind::ValueValidation,
"p2p.experimental.kad-names must contain at least one non-empty string",
)
.exit()
}

let l1_checkpoint_override = parse_l1_checkpoint_or_exit(args.l1_checkpoint_override);

Self {
max_inbound_direct_connections: args.max_inbound_direct_connections.try_into().unwrap(),
max_inbound_relayed_connections: args
Expand All @@ -738,14 +816,58 @@ impl P2PConfig {
proxy: args.proxy,
identity_config_file: args.identity_config_file,
listen_on: args.listen_on,
bootstrap_addresses: parse_multiaddr_vec(args.bootstrap_addresses),
predefined_peers: parse_multiaddr_vec(args.predefined_peers),
bootstrap_addresses: parse_multiaddr_vec(
"p2p.bootstrap-addresses",
args.bootstrap_addresses,
),
predefined_peers: parse_multiaddr_vec("p2p.predefined-peers", args.predefined_peers),
ip_whitelist: args.ip_whitelist,
low_watermark: 0,
kad_names: args.kad_names,
l1_checkpoint_override,
stream_timeout: Duration::from_secs(args.stream_timeout.into()),
max_concurrent_streams: args.max_concurrent_streams,
direct_connection_timeout: Duration::from_secs(args.direct_connection_timeout.into()),
eviction_timeout: Duration::from_secs(args.eviction_timeout.into()),
}
}
}

#[cfg(feature = "p2p")]
fn parse_l1_checkpoint_or_exit(
l1_checkpoint_override: Option<String>,
) -> Option<pathfinder_ethereum::EthereumStateUpdate> {
use clap::error::ErrorKind;
use pathfinder_common::{BlockHash, BlockNumber, StateCommitment};

#[derive(serde::Deserialize)]
struct Dto {
state_root: StateCommitment,
block_number: BlockNumber,
block_hash: BlockHash,
}

fn exit_now(e: impl std::fmt::Display) {
Cli::command()
.error(
ErrorKind::ValueValidation,
format!("p2p.experimental.l1-anchor: {e}"),
)
.exit()
}

l1_checkpoint_override.map(|f| {
// SAFETY: unwraps are safe because we exit the process on error
let f = std::fs::File::open(f).map_err(exit_now).unwrap();
let dto: Dto = serde_json::from_reader(f).map_err(exit_now).unwrap();
pathfinder_ethereum::EthereumStateUpdate {
state_root: dto.state_root,
block_number: dto.block_number,
block_hash: dto.block_hash,
}
})
}

#[cfg(not(feature = "p2p"))]
impl DebugConfig {
fn parse(_: ()) -> Self {
Expand Down
Loading

0 comments on commit 333ac6a

Please sign in to comment.