Skip to content

Commit

Permalink
feat(ethexe): extend ethexe cli (#4328)
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx authored Nov 11, 2024
1 parent a508edb commit cb8a707
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 95 deletions.
21 changes: 16 additions & 5 deletions ethexe/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

use crate::{
config,
params::{NetworkParams, PrometheusParams},
params::{NetworkParams, PrometheusParams, RpcParams},
};
use anyhow::{anyhow, bail, Result};
use clap::{Parser, Subcommand};
use ethexe_ethereum::Ethereum;
use ethexe_signer::Address;
use gprimitives::CodeId;
use serde::Deserialize;
use std::{fs, path::PathBuf};
use std::{fs, num::NonZero, path::PathBuf};

#[derive(Clone, Debug, Parser, Deserialize)]
#[command(version, about, long_about = None)]
Expand Down Expand Up @@ -77,9 +77,6 @@ pub struct Args {
#[arg(long = "validator-address")]
pub sender_address: Option<String>,

#[arg(long = "rpc-port")]
pub rpc_port: Option<u16>,

/// Max depth to discover last commitment.
#[arg(long = "max-depth")]
pub max_commitment_depth: Option<u32>,
Expand All @@ -89,6 +86,16 @@ pub struct Args {
#[arg(long, default_value = "12")]
pub block_time: u64,

/// Amount of physical threads tokio runtime will use for program processing.
///
/// The default value is the number of cores available to the system.
#[arg(long = "worker-threads")]
pub worker_threads_override: Option<NonZero<u8>>,

/// Amount of virtual threads (workers) for programs processing.
#[arg(long, default_value = "16")]
pub virtual_threads: NonZero<u8>,

/// Run a temporary node.
///
/// A temporary directory will be created to store the configuration and will be deleted
Expand All @@ -108,6 +115,10 @@ pub struct Args {
#[clap(flatten)]
pub prometheus_params: Option<PrometheusParams>,

#[allow(missing_docs)]
#[clap(flatten)]
pub rpc_params: RpcParams,

#[command(subcommand)]
pub extra_command: Option<ExtraCommands>,
}
Expand Down
21 changes: 16 additions & 5 deletions ethexe/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use anyhow::{ensure, Context as _, Result};
use directories::ProjectDirs;
use ethexe_network::NetworkEventLoopConfig;
use ethexe_prometheus_endpoint::Registry;
use ethexe_rpc::RpcConfig;
use ethexe_signer::{Address, PublicKey};
use std::{iter, net::SocketAddr, path::PathBuf, str::FromStr, time::Duration};
use tempfile::TempDir;
Expand Down Expand Up @@ -101,6 +102,14 @@ pub struct Config {
/// Block production time.
pub block_time: Duration,

/// Amount of physical threads tokio runtime will use for program processing.
///
/// The default value is the number of cores available to the system.
pub worker_threads_override: Option<usize>,

/// Amount of virtual threads (workers) for programs processing.
pub virtual_threads: usize,

/// Path of the state database
pub database_path: PathBuf,

Expand All @@ -116,14 +125,14 @@ pub struct Config {
/// Sender address to send Ethereum transaction.
pub sender_address: Option<String>,

// Network configuration
/// Network configuration.
pub net_config: Option<NetworkEventLoopConfig>,

// Prometheus configuration
/// Prometheus configuration.
pub prometheus_config: Option<PrometheusConfig>,

/// RPC port
pub rpc_port: Option<u16>,
/// Rpc endpoint configuration.
pub rpc_config: Option<RpcConfig>,
}

impl TryFrom<Args> for Config {
Expand Down Expand Up @@ -195,6 +204,8 @@ impl TryFrom<Args> for Config {
.context("failed to parse router address")?,
max_commitment_depth: args.max_commitment_depth.unwrap_or(1000),
block_time: Duration::from_secs(args.block_time),
worker_threads_override: args.worker_threads_override.map(|v| u8::from(v).into()),
virtual_threads: u8::from(args.virtual_threads).into(),
database_path: base_path.join("db"),
key_path: base_path.join("key"),
sequencer,
Expand All @@ -204,7 +215,7 @@ impl TryFrom<Args> for Config {
prometheus_config: args.prometheus_params.and_then(|params| {
params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "ethexe-dev".to_string())
}),
rpc_port: args.rpc_port,
rpc_config: args.rpc_params.as_config(),
})
}
}
10 changes: 6 additions & 4 deletions ethexe/cli/src/params/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

mod network_params;
mod prometheus_params;
mod network;
mod prometheus;
mod rpc;

pub use network_params::*;
pub use prometheus_params::*;
pub use network::NetworkParams;
pub use prometheus::PrometheusParams;
pub use rpc::RpcParams;
File renamed without changes.
File renamed without changes.
58 changes: 58 additions & 0 deletions ethexe/cli/src/params/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// This file is part of Gear.
//
// Copyright (C) 2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use clap::Args;
use ethexe_rpc::RpcConfig;
use serde::Deserialize;
use std::net::{Ipv4Addr, SocketAddr};

/// Parameters used to config prometheus.
#[derive(Debug, Clone, Args, Deserialize)]
pub struct RpcParams {
/// Rpc endpoint port.
#[arg(long, default_value = "9944")]
pub rpc_port: u16,

/// Expose rpc endpoint on all interfaces
#[arg(long, default_value = "false")]
pub rpc_external: bool,

/// Do not start rpc endpoint.
#[arg(long, default_value = "false")]
pub no_rpc: bool,
}

impl RpcParams {
/// Creates [`RpcConfig`].
pub fn as_config(&self) -> Option<RpcConfig> {
if self.no_rpc {
return None;
};

let ip = if self.rpc_external {
Ipv4Addr::UNSPECIFIED
} else {
Ipv4Addr::LOCALHOST
}
.into();

let listen_addr = SocketAddr::new(ip, self.rpc_port);

Some(RpcConfig { listen_addr })
}
}
41 changes: 33 additions & 8 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use ethexe_db::{BlockMetaStorage, CodesStorage, Database};
use ethexe_ethereum::{primitives::U256, router::RouterQuery};
use ethexe_network::{db_sync, NetworkReceiverEvent};
use ethexe_observer::{RequestBlockData, RequestEvent};
use ethexe_processor::LocalOutcome;
use ethexe_processor::{LocalOutcome, ProcessorConfig};
use ethexe_sequencer::agro::AggregatedCommitments;
use ethexe_signer::{Digest, PublicKey, Signature, Signer};
use ethexe_validator::BlockCommitmentValidationRequest;
Expand Down Expand Up @@ -126,7 +126,22 @@ impl Service {
)
.await?;

let processor = ethexe_processor::Processor::new(db.clone())?;
let processor = ethexe_processor::Processor::with_config(
ProcessorConfig {
worker_threads_override: config.worker_threads_override,
virtual_threads: config.virtual_threads,
},
db.clone(),
)?;

if let Some(worker_threads) = processor.config().worker_threads_override {
log::info!("🔧 Overriding amount of physical threads for runtime: {worker_threads}");
}

log::info!(
"🔧 Amount of virtual threads for programs processing: {}",
processor.config().virtual_threads
);

let signer = ethexe_signer::Signer::new(config.key_path.clone())?;

Expand Down Expand Up @@ -179,8 +194,9 @@ impl Service {
.transpose()?;

let rpc = config
.rpc_port
.map(|port| ethexe_rpc::RpcService::new(port, db.clone()));
.rpc_config
.as_ref()
.map(|config| ethexe_rpc::RpcService::new(config.clone(), db.clone()));

Ok(Self {
db,
Expand Down Expand Up @@ -440,8 +456,10 @@ impl Service {
};

let mut rpc_handle = if let Some(rpc) = rpc {
let (rpc_run, rpc_port) = rpc.run_server().await?;
log::info!("🌐 Rpc server started at: {}", rpc_port);
log::info!("🌐 Rpc server starting at: {}", rpc.port());

let rpc_run = rpc.run_server().await?;

Some(tokio::spawn(rpc_run.stopped()))
} else {
None
Expand Down Expand Up @@ -879,6 +897,7 @@ mod utils {
mod tests {
use super::Service;
use crate::config::{Config, PrometheusConfig};
use ethexe_rpc::RpcConfig;
use std::{
net::{Ipv4Addr, SocketAddr},
time::Duration,
Expand All @@ -904,6 +923,8 @@ mod tests {
.expect("infallible"),
max_commitment_depth: 1000,
block_time: Duration::from_secs(1),
worker_threads_override: None,
virtual_threads: 1,
database_path: tmp_dir.join("db"),
key_path: tmp_dir.join("key"),
sequencer: Default::default(),
Expand All @@ -914,7 +935,9 @@ mod tests {
SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9635),
"dev".to_string(),
)),
rpc_port: Some(9090),
rpc_config: Some(RpcConfig {
listen_addr: SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9944),
}),
})
.await
.unwrap();
Expand All @@ -929,14 +952,16 @@ mod tests {
.expect("infallible"),
max_commitment_depth: 1000,
block_time: Duration::from_secs(1),
worker_threads_override: None,
virtual_threads: 1,
database_path: tmp_dir.join("db"),
key_path: tmp_dir.join("key"),
sequencer: Default::default(),
validator: Default::default(),
sender_address: Default::default(),
net_config: None,
prometheus_config: None,
rpc_port: None,
rpc_config: None,
})
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(crate) mod events;
pub(crate) mod run;

pub struct ProcessingHandler {
pub block_hash: H256,
pub db: Database,
pub transitions: InBlockTransitions,
}
Expand Down Expand Up @@ -70,6 +71,7 @@ impl Processor {
let transitions = InBlockTransitions::new(header, states, schedule);

Ok(ProcessingHandler {
block_hash,
db: self.db.clone(),
transitions,
})
Expand Down
38 changes: 26 additions & 12 deletions ethexe/processor/src/handling/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::host::{InstanceCreator, InstanceWrapper};
use crate::{
host::{InstanceCreator, InstanceWrapper},
ProcessorConfig,
};
use core_processor::common::JournalNote;
use ethexe_db::{CodesStorage, Database};
use ethexe_runtime_common::{InBlockTransitions, JournalHandler, TransitionController};
Expand All @@ -34,36 +37,47 @@ enum Task {
}

pub fn run(
threads_amount: usize,
config: &ProcessorConfig,
db: Database,
instance_creator: InstanceCreator,
in_block_transitions: &mut InBlockTransitions,
) {
tokio::task::block_in_place(|| {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(threads_amount)
.enable_all()
.build()
.unwrap();
let mut rt_builder = tokio::runtime::Builder::new_multi_thread();

rt.block_on(async { run_in_async(db, instance_creator, in_block_transitions).await })
if let Some(worker_threads) = config.worker_threads_override {
rt_builder.worker_threads(worker_threads);
};

rt_builder.enable_all();

let rt = rt_builder.build().unwrap();

rt.block_on(async {
run_in_async(
config.virtual_threads,
db,
instance_creator,
in_block_transitions,
)
.await
})
})
}

// TODO: Returning Vec<LocalOutcome> is a temporary solution.
// In future need to send all messages to users and all state hashes changes to sequencer.
async fn run_in_async(
virtual_threads: usize,
db: Database,
instance_creator: InstanceCreator,
in_block_transitions: &mut InBlockTransitions,
) {
let num_workers = 4;

let mut task_senders = vec![];
let mut handles = vec![];

// create workers
for id in 0..num_workers {
for id in 0..virtual_threads {
let (task_sender, task_receiver) = mpsc::channel(100);
task_senders.push(task_sender);
let handle = tokio::spawn(worker(
Expand All @@ -79,7 +93,7 @@ async fn run_in_async(
// Send tasks to process programs in workers, until all queues are empty.

let mut no_more_to_do = true;
for index in (0..in_block_transitions.states_amount()).step_by(num_workers) {
for index in (0..in_block_transitions.states_amount()).step_by(virtual_threads) {
let result_receivers = one_batch(index, &task_senders, in_block_transitions).await;

let mut super_journal = vec![];
Expand Down
Loading

0 comments on commit cb8a707

Please sign in to comment.