Skip to content

Commit

Permalink
Merge pull request #3016 from wangdayong228/metrics-for-async-rpcs
Browse files Browse the repository at this point in the history
Metrics for async rpcs
  • Loading branch information
wangdayong228 authored Dec 31, 2024
2 parents ffba844 + 312d82e commit b8e2238
Show file tree
Hide file tree
Showing 13 changed files with 1,094 additions and 153 deletions.
713 changes: 658 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ members = [
"crates/pos/types/pow-types",
"crates/pos/types/types",
"crates/pos/config/management/network-address-encryption",
"crates/rpc/rpc-middlewares",
]

resolver = "2"
Expand Down Expand Up @@ -194,6 +195,7 @@ cfx-rpc = { path = "./crates/rpc/rpc-eth-impl" }
cfx-rpc-utils = { path = "./crates/rpc/rpc-utils" }
cfx-rpc-builder = { path = "./crates/rpc/rpc-builder" }
cfx-rpc-cfx-impl = { path = "./crates/rpc/rpc-cfx-impl" }
cfx-rpc-middlewares = { path = "./crates/rpc/rpc-middlewares" }
bounded-executor = { path = "./crates/pos/common/bounded-executor" }
diem-channel = { path = "./crates/pos/common/channel", package = "channel" }
channel = { path = "./crates/pos/common/channel" }
Expand Down Expand Up @@ -320,7 +322,7 @@ secp256k1 = "0.30.0"
clap = "2"
rand = "0.7"
rand_xorshift = "0.2"
rand_08 = { package = "rand", version = "0.8"}
rand_08 = { package = "rand", version = "0.8" }
log = "0.4"
log4rs = "1.2.0"
env_logger = "0.11"
Expand Down Expand Up @@ -366,6 +368,7 @@ docopt = "1.0"
vergen = "7.0.0"
target_info = "0.1"
libc = "0.2"
rustls = "0.21"

# conflux forked crates
rocksdb = { git = "https://github.com/Conflux-Chain/rust-rocksdb.git", rev = "3773afe5b953997188f37c39308105b5deb0faac" }
Expand Down
1 change: 1 addition & 0 deletions crates/client/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ pub fn initialize_not_light_node_modules(
let async_eth_rpc_http_server =
tokio_runtime.block_on(launch_async_rpc_servers(
conf.rpc_impl_config(),
conf.raw_conf.throttling_conf.clone(),
conf.raw_conf.public_evm_rpc_async_apis.clone(),
consensus.clone(),
sync.clone(),
Expand Down
13 changes: 8 additions & 5 deletions crates/client/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,19 @@ where

// start espace rpc server v2(async)
pub async fn launch_async_rpc_servers(
config: RpcImplConfiguration, apis: RpcModuleSelection,
consensus: SharedConsensusGraph, sync: SharedSynchronizationService,
tx_pool: SharedTransactionPool, addr: Option<SocketAddr>,
rpc_conf: RpcImplConfiguration, throttling_conf_file: Option<String>,
apis: RpcModuleSelection, consensus: SharedConsensusGraph,
sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
addr: Option<SocketAddr>,
) -> Result<Option<RpcServerHandle>, String> {
if addr.is_none() {
return Ok(None);
}

let enable_metrics = rpc_conf.enable_metrics;

let rpc_module_builder =
RpcModuleBuilder::new(config, consensus, sync, tx_pool);
RpcModuleBuilder::new(rpc_conf, consensus, sync, tx_pool);

info!(
"Enabled evm async rpc modules: {:?}",
Expand All @@ -545,7 +548,7 @@ pub async fn launch_async_rpc_servers(
.with_http_address(addr.unwrap());

let server_handle = server_config
.start(&transport_rpc_modules)
.start(&transport_rpc_modules, throttling_conf_file, enable_metrics)
.await
.map_err(|e| e.to_string())?;

Expand Down
13 changes: 11 additions & 2 deletions crates/rpc/rpc-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ strum = { workspace = true, features = ["derive"] }
alloy-primitives = { workspace = true}
thiserror = { workspace = true }

jsonrpc-core ={ workspace = true}
jsonrpsee-core = { workspace = true }
jsonrpsee-types = { workspace = true }
jsonrpsee = { workspace = true, features = ["server"] }
# tower-http = { workspace = true, features = ["full"] }
tower = { workspace = true, features = ["full"] }
http.workspace = true
pin-project = { workspace = true }
Expand All @@ -31,4 +31,13 @@ cfx-rpc-eth-api = { workspace = true }
cfxcore = { workspace = true }
cfx-types = { workspace = true }
cfx-rpc-eth-types = { workspace = true }
cfx-rpc-cfx-types = { workspace = true }
cfx-rpc-cfx-types = { workspace = true }
throttling = { workspace = true }
cfx-util-macros = { workspace = true }
log = { workspace = true }
cfx-rpc-utils = { workspace = true }
cfx-rpc-middlewares = { workspace = true }
futures-util = { workspace = true, features = ["io", "async-await-macro"] }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
164 changes: 75 additions & 89 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ mod error;
mod id_provider;
mod module;

use cfx_rpc_middlewares::{Metrics, Throttle};
pub use error::*;
pub use id_provider::EthSubscriptionIdProvider;
use log::debug;
pub use module::{EthRpcModule, RpcModuleSelection};

use cfx_rpc::{helpers::ChainInfo, *};
Expand Down Expand Up @@ -267,7 +269,7 @@ impl RpcRegistryInner {
/// started, See also [`ServerBuilder::build`] and
/// [`Server::start`](jsonrpsee::server::Server::start).
#[derive(Debug)]
pub struct RpcServerConfig<RpcMiddleware = Identity> {
pub struct RpcServerConfig {
/// Configs for JSON-RPC Http.
http_server_config: Option<ServerBuilder<Identity, Identity>>,
/// Allowed CORS Domains for http
Expand All @@ -280,12 +282,12 @@ pub struct RpcServerConfig<RpcMiddleware = Identity> {
ws_cors_domains: Option<String>,
/// Address where to bind the ws server to
ws_addr: Option<SocketAddr>,
/// Configurable RPC middleware
#[allow(dead_code)]
rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
// /// Configurable RPC middleware
// #[allow(dead_code)]
// rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
}

impl Default for RpcServerConfig<Identity> {
impl Default for RpcServerConfig {
fn default() -> Self {
Self {
http_server_config: None,
Expand All @@ -294,7 +296,7 @@ impl Default for RpcServerConfig<Identity> {
ws_server_config: None,
ws_cors_domains: None,
ws_addr: None,
rpc_middleware: RpcServiceBuilder::new(),
// rpc_middleware: RpcServiceBuilder::new(),
}
}
}
Expand Down Expand Up @@ -337,21 +339,21 @@ impl RpcServerConfig {
}
}

impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
impl RpcServerConfig {
/// Configure rpc middleware
pub fn set_rpc_middleware<T>(
self, rpc_middleware: RpcServiceBuilder<T>,
) -> RpcServerConfig<T> {
RpcServerConfig {
http_server_config: self.http_server_config,
http_cors_domains: self.http_cors_domains,
http_addr: self.http_addr,
ws_server_config: self.ws_server_config,
ws_cors_domains: self.ws_cors_domains,
ws_addr: self.ws_addr,
rpc_middleware,
}
}
// pub fn set_rpc_middleware<T>(
// self, rpc_middleware: RpcServiceBuilder<T>,
// ) -> RpcServerConfig<T> {
// RpcServerConfig {
// http_server_config: self.http_server_config,
// http_cors_domains: self.http_cors_domains,
// http_addr: self.http_addr,
// ws_server_config: self.ws_server_config,
// ws_cors_domains: self.ws_cors_domains,
// ws_addr: self.ws_addr,
// rpc_middleware,
// }
// }

/// Configure the cors domains for http _and_ ws
pub fn with_cors(self, cors_domain: Option<String>) -> Self {
Expand Down Expand Up @@ -426,9 +428,20 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
// Returns the [`RpcServerHandle`] with the handle to the started servers.
pub async fn start(
self, modules: &TransportRpcModules,
throttling_conf_file: Option<String>, enable_metrics: bool,
) -> Result<RpcServerHandle, RpcError> {
let mut http_handle = None;
let mut ws_handle = None;
// TODO: handle enable metrics
debug!("enable metrics: {}", enable_metrics);

let rpc_middleware = RpcServiceBuilder::new()
.layer_fn(move |s| {
Throttle::new(
throttling_conf_file.as_ref().map(|s| s.as_str()),
"rpc",
s,
)
})
.layer_fn(|s| Metrics::new(s));

let http_socket_addr =
self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Expand Down Expand Up @@ -468,21 +481,7 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {

if let Some(builder) = self.http_server_config {
let server = builder
// .set_http_middleware(
// tower::ServiceBuilder::new()
// .option_layer(Self::maybe_cors_layer(cors)?)
// .option_layer(Self::maybe_jwt_layer(self.
// jwt_secret)), )
// .set_rpc_middleware(
// self.rpc_middleware.clone().layer(
// modules
// .http
// .as_ref()
// .or(modules.ws.as_ref())
// .map(RpcRequestMetrics::same_port)
// .unwrap_or_default(),
// ),
// )
.set_rpc_middleware(rpc_middleware)
.build(http_socket_addr)
.await
.map_err(|err| {
Expand All @@ -501,38 +500,33 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
modules.http.as_ref().or(modules.ws.as_ref())
{
let handle = server.start(module.clone());
http_handle = Some(handle.clone());
ws_handle = Some(handle);
let http_handle = Some(handle.clone());
let ws_handle = Some(handle);

return Ok(RpcServerHandle {
http_local_addr: Some(addr),
ws_local_addr: Some(addr),
http: http_handle,
ws: ws_handle,
});
}
return Ok(RpcServerHandle {
http_local_addr: Some(addr),
ws_local_addr: Some(addr),
http: http_handle,
ws: ws_handle,
});

return Err(RpcError::Custom(
"No valid RpcModule found from modules".to_string(),
));
}
}

let mut ws_local_addr = None;
let mut ws_server = None;
let mut http_local_addr = None;
let mut http_server = None;

let mut result = RpcServerHandle {
http_local_addr: None,
ws_local_addr: None,
http: None,
ws: None,
};
if let Some(builder) = self.ws_server_config {
let server = builder
.ws_only()
// .set_http_middleware(
// tower::ServiceBuilder::new()
// .option_layer(Self::maybe_cors_layer(self.
// ws_cors_domains.clone())?)
// .option_layer(Self::maybe_jwt_layer(self.
// jwt_secret)), )
// .set_rpc_middleware(
// self.rpc_middleware
// .clone()
// .layer(modules.ws.as_ref().
// map(RpcRequestMetrics::ws).unwrap_or_default()),
// )
.set_rpc_middleware(rpc_middleware.clone())
.build(ws_socket_addr)
.await
.map_err(|err| {
Expand All @@ -543,24 +537,20 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
})?;

ws_local_addr = Some(addr);
ws_server = Some(server);
let ws_local_addr = Some(addr);
let ws_server = Some(server);
let ws_handle = ws_server.map(|ws_server| {
ws_server.start(modules.ws.clone().expect("ws server error"))
});

result.ws = ws_handle;
result.ws_local_addr = ws_local_addr;
}

if let Some(builder) = self.http_server_config {
let server = builder
.http_only()
// .set_http_middleware(
// tower::ServiceBuilder::new()
// .option_layer(Self::maybe_cors_layer(self.
// http_cors_domains.clone())?)
// .option_layer(Self::maybe_jwt_layer(self.
// jwt_secret)), )
// .set_rpc_middleware(
// self.rpc_middleware.clone().layer(
// modules.http.as_ref().map(RpcRequestMetrics::http).
// unwrap_or_default(), ),
// )
.set_rpc_middleware(rpc_middleware)
.build(http_socket_addr)
.await
.map_err(|err| {
Expand All @@ -572,22 +562,18 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
let local_addr = server.local_addr().map_err(|err| {
RpcError::server_error(err, ServerKind::Http(http_socket_addr))
})?;
http_local_addr = Some(local_addr);
http_server = Some(server);
let http_local_addr = Some(local_addr);
let http_server = Some(server);
let http_handle = http_server.map(|http_server| {
http_server
.start(modules.http.clone().expect("http server error"))
});

result.http = http_handle;
result.http_local_addr = http_local_addr;
}

http_handle = http_server.map(|http_server| {
http_server.start(modules.http.clone().expect("http server error"))
});
ws_handle = ws_server.map(|ws_server| {
ws_server.start(modules.ws.clone().expect("ws server error"))
});
Ok(RpcServerHandle {
http_local_addr,
ws_local_addr,
http: http_handle,
ws: ws_handle,
})
Ok(result)
}
}

Expand Down
31 changes: 31 additions & 0 deletions crates/rpc/rpc-middlewares/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "cfx-rpc-middlewares"
edition = "2021"
version.workspace = true
authors.workspace = true
description.workspace = true
documentation.workspace = true
homepage.workspace = true
keywords.workspace = true
repository.workspace = true
license-file.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
jsonrpc-core = { workspace = true }
jsonrpsee = { workspace = true ,features = ["server","ws-client"]}
rustls = { workspace = true, features = ["dangerous_configuration"] }
throttling = { workspace = true }
futures-util = { workspace = true }
log = { workspace = true }
jsonrpsee-types = { workspace = true }
cfx-rpc-utils = { workspace = true }
cfx-util-macros = { workspace = true }
lazy_static = { workspace = true }
parking_lot = { workspace = true }
metrics = { workspace = true }
futures = { workspace = true }
tracing-subscriber = {version = "=0.3.0",features = ["env-filter"]}
anyhow = {workspace = true}
tokio = { workspace = true }
Loading

0 comments on commit b8e2238

Please sign in to comment.