diff --git a/ethexe/cli/src/args.rs b/ethexe/cli/src/args.rs index 7b015efd3d2..ede80f875ad 100644 --- a/ethexe/cli/src/args.rs +++ b/ethexe/cli/src/args.rs @@ -20,7 +20,7 @@ use crate::{ config, - params::{NetworkParams, PrometheusParams}, + params::{NetworkParams, PrometheusParams, RpcParams}, }; use anyhow::{anyhow, bail, Result}; use clap::{Parser, Subcommand}; @@ -28,7 +28,7 @@ use ethexe_ethereum::Ethereum; use ethexe_signer::Address; use gprimitives::CodeId; use serde::Deserialize; -use std::{fs, path::PathBuf}; +use std::{fs, num::NonZero, path::PathBuf}; #[derive(Clone, Debug, Parser, Deserialize)] #[command(version, about, long_about = None)] @@ -77,9 +77,6 @@ pub struct Args { #[arg(long = "validator-address")] pub sender_address: Option, - #[arg(long = "rpc-port")] - pub rpc_port: Option, - /// Max depth to discover last commitment. #[arg(long = "max-depth")] pub max_commitment_depth: Option, @@ -89,6 +86,16 @@ pub struct Args { #[arg(long, default_value = "12")] pub block_time: u64, + /// Amount of physical threads tokio runtime will use for program processing. + /// + /// The default value is the number of cores available to the system. + #[arg(long = "worker-threads")] + pub worker_threads_override: Option>, + + /// Amount of virtual threads (workers) for programs processing. + #[arg(long, default_value = "16")] + pub virtual_threads: NonZero, + /// Run a temporary node. /// /// A temporary directory will be created to store the configuration and will be deleted @@ -108,6 +115,10 @@ pub struct Args { #[clap(flatten)] pub prometheus_params: Option, + #[allow(missing_docs)] + #[clap(flatten)] + pub rpc_params: RpcParams, + #[command(subcommand)] pub extra_command: Option, } diff --git a/ethexe/cli/src/config.rs b/ethexe/cli/src/config.rs index 09267a16625..daf0a4ad456 100644 --- a/ethexe/cli/src/config.rs +++ b/ethexe/cli/src/config.rs @@ -23,6 +23,7 @@ use anyhow::{ensure, Context as _, Result}; use directories::ProjectDirs; use ethexe_network::NetworkEventLoopConfig; use ethexe_prometheus_endpoint::Registry; +use ethexe_rpc::RpcConfig; use ethexe_signer::{Address, PublicKey}; use std::{iter, net::SocketAddr, path::PathBuf, str::FromStr, time::Duration}; use tempfile::TempDir; @@ -101,6 +102,14 @@ pub struct Config { /// Block production time. pub block_time: Duration, + /// Amount of physical threads tokio runtime will use for program processing. + /// + /// The default value is the number of cores available to the system. + pub worker_threads_override: Option, + + /// Amount of virtual threads (workers) for programs processing. + pub virtual_threads: usize, + /// Path of the state database pub database_path: PathBuf, @@ -116,14 +125,14 @@ pub struct Config { /// Sender address to send Ethereum transaction. pub sender_address: Option, - // Network configuration + /// Network configuration. pub net_config: Option, - // Prometheus configuration + /// Prometheus configuration. pub prometheus_config: Option, - /// RPC port - pub rpc_port: Option, + /// Rpc endpoint configuration. + pub rpc_config: Option, } impl TryFrom for Config { @@ -195,6 +204,8 @@ impl TryFrom for Config { .context("failed to parse router address")?, max_commitment_depth: args.max_commitment_depth.unwrap_or(1000), block_time: Duration::from_secs(args.block_time), + worker_threads_override: args.worker_threads_override.map(|v| u8::from(v).into()), + virtual_threads: u8::from(args.virtual_threads).into(), database_path: base_path.join("db"), key_path: base_path.join("key"), sequencer, @@ -204,7 +215,7 @@ impl TryFrom for Config { prometheus_config: args.prometheus_params.and_then(|params| { params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "ethexe-dev".to_string()) }), - rpc_port: args.rpc_port, + rpc_config: args.rpc_params.as_config(), }) } } diff --git a/ethexe/cli/src/params/mod.rs b/ethexe/cli/src/params/mod.rs index 888304b46db..3934f2dbb1f 100644 --- a/ethexe/cli/src/params/mod.rs +++ b/ethexe/cli/src/params/mod.rs @@ -16,8 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -mod network_params; -mod prometheus_params; +mod network; +mod prometheus; +mod rpc; -pub use network_params::*; -pub use prometheus_params::*; +pub use network::NetworkParams; +pub use prometheus::PrometheusParams; +pub use rpc::RpcParams; diff --git a/ethexe/cli/src/params/network_params.rs b/ethexe/cli/src/params/network.rs similarity index 100% rename from ethexe/cli/src/params/network_params.rs rename to ethexe/cli/src/params/network.rs diff --git a/ethexe/cli/src/params/prometheus_params.rs b/ethexe/cli/src/params/prometheus.rs similarity index 100% rename from ethexe/cli/src/params/prometheus_params.rs rename to ethexe/cli/src/params/prometheus.rs diff --git a/ethexe/cli/src/params/rpc.rs b/ethexe/cli/src/params/rpc.rs new file mode 100644 index 00000000000..72cf961c1a6 --- /dev/null +++ b/ethexe/cli/src/params/rpc.rs @@ -0,0 +1,58 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use clap::Args; +use ethexe_rpc::RpcConfig; +use serde::Deserialize; +use std::net::{Ipv4Addr, SocketAddr}; + +/// Parameters used to config prometheus. +#[derive(Debug, Clone, Args, Deserialize)] +pub struct RpcParams { + /// Rpc endpoint port. + #[arg(long, default_value = "9944")] + pub rpc_port: u16, + + /// Expose rpc endpoint on all interfaces + #[arg(long, default_value = "false")] + pub rpc_external: bool, + + /// Do not start rpc endpoint. + #[arg(long, default_value = "false")] + pub no_rpc: bool, +} + +impl RpcParams { + /// Creates [`RpcConfig`]. + pub fn as_config(&self) -> Option { + if self.no_rpc { + return None; + }; + + let ip = if self.rpc_external { + Ipv4Addr::UNSPECIFIED + } else { + Ipv4Addr::LOCALHOST + } + .into(); + + let listen_addr = SocketAddr::new(ip, self.rpc_port); + + Some(RpcConfig { listen_addr }) + } +} diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index b9b17dde143..2b4d039ed3e 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -33,7 +33,7 @@ use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_ethereum::{primitives::U256, router::RouterQuery}; use ethexe_network::{db_sync, NetworkReceiverEvent}; use ethexe_observer::{RequestBlockData, RequestEvent}; -use ethexe_processor::LocalOutcome; +use ethexe_processor::{LocalOutcome, ProcessorConfig}; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; use ethexe_validator::BlockCommitmentValidationRequest; @@ -126,7 +126,22 @@ impl Service { ) .await?; - let processor = ethexe_processor::Processor::new(db.clone())?; + let processor = ethexe_processor::Processor::with_config( + ProcessorConfig { + worker_threads_override: config.worker_threads_override, + virtual_threads: config.virtual_threads, + }, + db.clone(), + )?; + + if let Some(worker_threads) = processor.config().worker_threads_override { + log::info!("🔧 Overriding amount of physical threads for runtime: {worker_threads}"); + } + + log::info!( + "🔧 Amount of virtual threads for programs processing: {}", + processor.config().virtual_threads + ); let signer = ethexe_signer::Signer::new(config.key_path.clone())?; @@ -179,8 +194,9 @@ impl Service { .transpose()?; let rpc = config - .rpc_port - .map(|port| ethexe_rpc::RpcService::new(port, db.clone())); + .rpc_config + .as_ref() + .map(|config| ethexe_rpc::RpcService::new(config.clone(), db.clone())); Ok(Self { db, @@ -440,8 +456,10 @@ impl Service { }; let mut rpc_handle = if let Some(rpc) = rpc { - let (rpc_run, rpc_port) = rpc.run_server().await?; - log::info!("🌐 Rpc server started at: {}", rpc_port); + log::info!("🌐 Rpc server starting at: {}", rpc.port()); + + let rpc_run = rpc.run_server().await?; + Some(tokio::spawn(rpc_run.stopped())) } else { None @@ -879,6 +897,7 @@ mod utils { mod tests { use super::Service; use crate::config::{Config, PrometheusConfig}; + use ethexe_rpc::RpcConfig; use std::{ net::{Ipv4Addr, SocketAddr}, time::Duration, @@ -904,6 +923,8 @@ mod tests { .expect("infallible"), max_commitment_depth: 1000, block_time: Duration::from_secs(1), + worker_threads_override: None, + virtual_threads: 1, database_path: tmp_dir.join("db"), key_path: tmp_dir.join("key"), sequencer: Default::default(), @@ -914,7 +935,9 @@ mod tests { SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9635), "dev".to_string(), )), - rpc_port: Some(9090), + rpc_config: Some(RpcConfig { + listen_addr: SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9944), + }), }) .await .unwrap(); @@ -929,6 +952,8 @@ mod tests { .expect("infallible"), max_commitment_depth: 1000, block_time: Duration::from_secs(1), + worker_threads_override: None, + virtual_threads: 1, database_path: tmp_dir.join("db"), key_path: tmp_dir.join("key"), sequencer: Default::default(), @@ -936,7 +961,7 @@ mod tests { sender_address: Default::default(), net_config: None, prometheus_config: None, - rpc_port: None, + rpc_config: None, }) .await .unwrap(); diff --git a/ethexe/processor/src/handling/mod.rs b/ethexe/processor/src/handling/mod.rs index 752a4ff87b1..3d08e4805a0 100644 --- a/ethexe/processor/src/handling/mod.rs +++ b/ethexe/processor/src/handling/mod.rs @@ -28,6 +28,7 @@ pub(crate) mod events; pub(crate) mod run; pub struct ProcessingHandler { + pub block_hash: H256, pub db: Database, pub transitions: InBlockTransitions, } @@ -70,6 +71,7 @@ impl Processor { let transitions = InBlockTransitions::new(header, states, schedule); Ok(ProcessingHandler { + block_hash, db: self.db.clone(), transitions, }) diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index d9a66560cd0..0ab3c1fbc13 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -16,7 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::host::{InstanceCreator, InstanceWrapper}; +use crate::{ + host::{InstanceCreator, InstanceWrapper}, + ProcessorConfig, +}; use core_processor::common::JournalNote; use ethexe_db::{CodesStorage, Database}; use ethexe_runtime_common::{InBlockTransitions, JournalHandler, TransitionController}; @@ -34,36 +37,47 @@ enum Task { } pub fn run( - threads_amount: usize, + config: &ProcessorConfig, db: Database, instance_creator: InstanceCreator, in_block_transitions: &mut InBlockTransitions, ) { tokio::task::block_in_place(|| { - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(threads_amount) - .enable_all() - .build() - .unwrap(); + let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); - rt.block_on(async { run_in_async(db, instance_creator, in_block_transitions).await }) + if let Some(worker_threads) = config.worker_threads_override { + rt_builder.worker_threads(worker_threads); + }; + + rt_builder.enable_all(); + + let rt = rt_builder.build().unwrap(); + + rt.block_on(async { + run_in_async( + config.virtual_threads, + db, + instance_creator, + in_block_transitions, + ) + .await + }) }) } // TODO: Returning Vec is a temporary solution. // In future need to send all messages to users and all state hashes changes to sequencer. async fn run_in_async( + virtual_threads: usize, db: Database, instance_creator: InstanceCreator, in_block_transitions: &mut InBlockTransitions, ) { - let num_workers = 4; - let mut task_senders = vec![]; let mut handles = vec![]; // create workers - for id in 0..num_workers { + for id in 0..virtual_threads { let (task_sender, task_receiver) = mpsc::channel(100); task_senders.push(task_sender); let handle = tokio::spawn(worker( @@ -79,7 +93,7 @@ async fn run_in_async( // Send tasks to process programs in workers, until all queues are empty. let mut no_more_to_do = true; - for index in (0..in_block_transitions.states_amount()).step_by(num_workers) { + for index in (0..in_block_transitions.states_amount()).step_by(virtual_threads) { let result_receivers = one_batch(index, &task_senders, in_block_transitions).await; let mut super_journal = vec![]; diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 29660d78652..2421bf22c1c 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -21,10 +21,10 @@ use anyhow::{anyhow, ensure, Result}; use ethexe_common::{mirror::RequestEvent as MirrorEvent, BlockRequestEvent}; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; -use ethexe_runtime_common::{state::Storage, InBlockTransitions}; +use ethexe_runtime_common::state::Storage; use gear_core::{ids::prelude::CodeIdExt, message::ReplyInfo}; use gprimitives::{ActorId, CodeId, MessageId, H256}; -use handling::run; +use handling::{run, ProcessingHandler}; use host::InstanceCreator; pub use common::LocalOutcome; @@ -37,8 +37,24 @@ mod handling; #[cfg(test)] mod tests; +#[derive(Clone, Debug)] +pub struct ProcessorConfig { + pub worker_threads_override: Option, + pub virtual_threads: usize, +} + +impl Default for ProcessorConfig { + fn default() -> Self { + Self { + worker_threads_override: None, + virtual_threads: 16, + } + } +} + #[derive(Clone)] pub struct Processor { + config: ProcessorConfig, db: Database, creator: InstanceCreator, } @@ -46,9 +62,22 @@ pub struct Processor { /// TODO: consider avoiding re-instantiations on processing events. /// Maybe impl `struct EventProcessor`. impl Processor { + /// Creates processor with default config. pub fn new(db: Database) -> Result { + Self::with_config(Default::default(), db) + } + + pub fn with_config(config: ProcessorConfig, db: Database) -> Result { let creator = InstanceCreator::new(host::runtime())?; - Ok(Self { db, creator }) + Ok(Self { + config, + db, + creator, + }) + } + + pub fn config(&self) -> &ProcessorConfig { + &self.config } pub fn overlaid(mut self) -> OverlaidProcessor { @@ -94,7 +123,7 @@ impl Processor { } handler.run_schedule(); - self.run(block_hash, &mut handler.transitions); + self.process_queue(&mut handler); let (transitions, states, schedule) = handler.transitions.finalize(); @@ -109,14 +138,14 @@ impl Processor { Ok(outcomes) } - pub fn run(&mut self, chain_head: H256, in_block_transitions: &mut InBlockTransitions) { - self.creator.set_chain_head(chain_head); + pub fn process_queue(&mut self, handler: &mut ProcessingHandler) { + self.creator.set_chain_head(handler.block_hash); run::run( - 8, + self.config(), self.db.clone(), self.creator.clone(), - in_block_transitions, + &mut handler.transitions, ); } } @@ -163,12 +192,7 @@ impl OverlaidProcessor { }, )?; - run::run( - 8, - self.0.db.clone(), - self.0.creator.clone(), - &mut handler.transitions, - ); + self.0.process_queue(&mut handler); let res = handler .transitions diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index 2ba8775c095..721df5b5b6e 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -291,12 +291,7 @@ fn ping_pong() { ) .expect("failed to send message"); - run::run( - 8, - processor.db.clone(), - processor.creator.clone(), - &mut handler.transitions, - ); + processor.process_queue(&mut handler); let to_users = handler.transitions.current_messages(); @@ -412,12 +407,7 @@ fn async_and_ping() { ) .expect("failed to send message"); - run::run( - 8, - processor.db.clone(), - processor.creator.clone(), - &mut handler.transitions, - ); + processor.process_queue(&mut handler); let to_users = handler.transitions.current_messages(); @@ -440,8 +430,6 @@ fn async_and_ping() { fn many_waits() { init_logger(); - let threads_amount = 8; - let wat = r#" (module (import "env" "memory" (memory 1)) @@ -519,12 +507,8 @@ fn many_waits() { } handler.run_schedule(); - run::run( - threads_amount, - processor.db.clone(), - processor.creator.clone(), - &mut handler.transitions, - ); + processor.process_queue(&mut handler); + assert_eq!( handler.transitions.current_messages().len(), amount as usize @@ -544,12 +528,8 @@ fn many_waits() { .expect("failed to send message"); } - run::run( - threads_amount, - processor.db.clone(), - processor.creator.clone(), - &mut handler.transitions, - ); + processor.process_queue(&mut handler); + // unchanged assert_eq!( handler.transitions.current_messages().len(), @@ -604,14 +584,8 @@ fn many_waits() { } let mut handler = processor.handler(ch11).unwrap(); - handler.run_schedule(); - run::run( - threads_amount, - processor.db.clone(), - processor.creator.clone(), - &mut handler.transitions, - ); + processor.process_queue(&mut handler); assert_eq!( handler.transitions.current_messages().len(), diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 2621f98f41b..c26aeb12b29 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -42,32 +42,34 @@ struct PerConnection { svc_builder: TowerServiceBuilder, } +/// Configuration of the RPC endpoint. +#[derive(Debug, Clone)] pub struct RpcConfig { - port: u16, - db: Database, + /// Listen address. + pub listen_addr: SocketAddr, } pub struct RpcService { config: RpcConfig, + db: Database, } impl RpcService { - pub fn new(port: u16, db: Database) -> Self { - Self { - config: RpcConfig { port, db }, - } + pub fn new(config: RpcConfig, db: Database) -> Self { + Self { config, db } + } + + pub const fn port(&self) -> u16 { + self.config.listen_addr.port() } - pub async fn run_server(self) -> anyhow::Result<(ServerHandle, u16)> { - let listener = - TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], self.config.port))).await?; + pub async fn run_server(self) -> anyhow::Result { + let listener = TcpListener::bind(self.config.listen_addr).await?; let service_builder = Server::builder().to_service_builder(); let mut module = JsonrpcModule::new(()); - module.merge(ProgramServer::into_rpc(ProgramApi::new( - self.config.db.clone(), - )))?; - module.merge(BlockServer::into_rpc(BlockApi::new(self.config.db.clone())))?; + module.merge(ProgramServer::into_rpc(ProgramApi::new(self.db.clone())))?; + module.merge(BlockServer::into_rpc(BlockApi::new(self.db.clone())))?; let (stop_handle, server_handle) = stop_channel(); @@ -136,6 +138,6 @@ impl RpcService { } }); - Ok((server_handle, self.config.port)) + Ok(server_handle) } }