From c8914d878dfe5887886dc7145bb10116721d203d Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 3 Jun 2024 16:10:20 +1000 Subject: [PATCH] refactor(node-framework): use owned type to identify `Task`s (#2124) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ This PR refactors `Task`-related traits to use an owned type `TaskId` instead of `&'static str`. ## Why ❔ There are use cases where a task's id needs to be generated dynamically (e.g. broad trait implementations) which this PR accommodates for. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `zk spellcheck`. --- core/node/node_framework/examples/showcase.rs | 14 +++--- .../layers/circuit_breaker_checker.rs | 6 +-- .../layers/commitment_generator.rs | 6 +-- .../src/implementations/layers/consensus.rs | 10 ++-- .../layers/consistency_checker.rs | 6 +-- .../layers/contract_verification_api.rs | 6 +-- .../src/implementations/layers/eth_sender.rs | 10 ++-- .../src/implementations/layers/eth_watch.rs | 6 +-- .../layers/healtcheck_server.rs | 6 +-- .../implementations/layers/house_keeper.rs | 46 +++++++++---------- .../src/implementations/layers/l1_gas.rs | 6 +-- .../layers/metadata_calculator.rs | 10 ++-- .../layers/prometheus_exporter.rs | 6 +-- .../layers/proof_data_handler.rs | 6 +-- .../src/implementations/layers/sigint.rs | 6 +-- .../layers/state_keeper/mempool_io.rs | 10 ++-- .../layers/state_keeper/mod.rs | 10 ++-- .../layers/tee_verifier_input_producer.rs | 6 +-- .../implementations/layers/web3_api/caches.rs | 6 +-- .../implementations/layers/web3_api/server.rs | 12 ++--- .../layers/web3_api/tx_sender.rs | 10 ++-- core/node/node_framework/src/precondition.rs | 4 +- .../node_framework/src/service/context.rs | 10 ++-- .../node_framework/src/service/runnables.rs | 26 +++++------ core/node/node_framework/src/service/tests.rs | 14 +++--- core/node/node_framework/src/task.rs | 44 ++++++++++++++++-- 26 files changed, 168 insertions(+), 134 deletions(-) diff --git a/core/node/node_framework/examples/showcase.rs b/core/node/node_framework/examples/showcase.rs index 0a1552f3350..98baa5bc968 100644 --- a/core/node/node_framework/examples/showcase.rs +++ b/core/node/node_framework/examples/showcase.rs @@ -10,7 +10,7 @@ use std::{ use zksync_node_framework::{ resource::Resource, service::{ServiceContext, StopReceiver, ZkStackServiceBuilder}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -96,14 +96,14 @@ impl PutTask { #[async_trait::async_trait] impl Task for PutTask { - fn name(&self) -> &'static str { + fn id(&self) -> TaskId { // Task names simply have to be unique. They are used for logging and debugging. - "put_task" + "put_task".into() } /// This method will be invoked by the framework when the task is started. async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { - tracing::info!("Starting the task {}", self.name()); + tracing::info!("Starting the task {}", self.id()); // We have to respect the stop receiver and should exit as soon as we receive // a stop signal. @@ -138,12 +138,12 @@ impl CheckTask { #[async_trait::async_trait] impl Task for CheckTask { - fn name(&self) -> &'static str { - "check_task" + fn id(&self) -> TaskId { + "check_task".into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { - tracing::info!("Starting the task {}", self.name()); + tracing::info!("Starting the task {}", self.id()); tokio::select! { _ = self.run_inner() => {}, diff --git a/core/node/node_framework/src/implementations/layers/circuit_breaker_checker.rs b/core/node/node_framework/src/implementations/layers/circuit_breaker_checker.rs index f493d8081ef..b8fff34b7e9 100644 --- a/core/node/node_framework/src/implementations/layers/circuit_breaker_checker.rs +++ b/core/node/node_framework/src/implementations/layers/circuit_breaker_checker.rs @@ -4,7 +4,7 @@ use zksync_config::configs::chain::CircuitBreakerConfig; use crate::{ implementations::resources::circuit_breakers::CircuitBreakersResource, service::{ServiceContext, StopReceiver}, - task::UnconstrainedTask, + task::{TaskId, UnconstrainedTask}, wiring_layer::{WiringError, WiringLayer}, }; @@ -43,8 +43,8 @@ struct CircuitBreakerCheckerTask { #[async_trait::async_trait] impl UnconstrainedTask for CircuitBreakerCheckerTask { - fn name(&self) -> &'static str { - "circuit_breaker_checker" + fn id(&self) -> TaskId { + "circuit_breaker_checker".into() } async fn run_unconstrained( diff --git a/core/node/node_framework/src/implementations/layers/commitment_generator.rs b/core/node/node_framework/src/implementations/layers/commitment_generator.rs index aeb668dca17..5d2f6393129 100644 --- a/core/node/node_framework/src/implementations/layers/commitment_generator.rs +++ b/core/node/node_framework/src/implementations/layers/commitment_generator.rs @@ -7,7 +7,7 @@ use crate::{ pools::{MasterPool, PoolResource}, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -55,8 +55,8 @@ struct CommitmentGeneratorTask { #[async_trait::async_trait] impl Task for CommitmentGeneratorTask { - fn name(&self) -> &'static str { - "commitment_generator" + fn id(&self) -> TaskId { + "commitment_generator".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/consensus.rs b/core/node/node_framework/src/implementations/layers/consensus.rs index 5a91e796eb5..06bca1bba3a 100644 --- a/core/node/node_framework/src/implementations/layers/consensus.rs +++ b/core/node/node_framework/src/implementations/layers/consensus.rs @@ -14,7 +14,7 @@ use crate::{ sync_state::SyncStateResource, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -110,8 +110,8 @@ pub struct MainNodeConsensusTask { #[async_trait::async_trait] impl Task for MainNodeConsensusTask { - fn name(&self) -> &'static str { - "consensus" + fn id(&self) -> TaskId { + "consensus".into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -147,8 +147,8 @@ pub struct FetcherTask { #[async_trait::async_trait] impl Task for FetcherTask { - fn name(&self) -> &'static str { - "consensus_fetcher" + fn id(&self) -> TaskId { + "consensus_fetcher".into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/consistency_checker.rs b/core/node/node_framework/src/implementations/layers/consistency_checker.rs index 4f2ec2ededc..a387fc19ead 100644 --- a/core/node/node_framework/src/implementations/layers/consistency_checker.rs +++ b/core/node/node_framework/src/implementations/layers/consistency_checker.rs @@ -8,7 +8,7 @@ use crate::{ pools::{MasterPool, PoolResource}, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -75,8 +75,8 @@ pub struct ConsistencyCheckerTask { #[async_trait::async_trait] impl Task for ConsistencyCheckerTask { - fn name(&self) -> &'static str { - "consistency_checker" + fn id(&self) -> TaskId { + "consistency_checker".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/contract_verification_api.rs b/core/node/node_framework/src/implementations/layers/contract_verification_api.rs index 2e0dcf540ea..5e76c32ddd5 100644 --- a/core/node/node_framework/src/implementations/layers/contract_verification_api.rs +++ b/core/node/node_framework/src/implementations/layers/contract_verification_api.rs @@ -4,7 +4,7 @@ use zksync_dal::{ConnectionPool, Core}; use crate::{ implementations::resources::pools::{MasterPool, PoolResource, ReplicaPool}, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -46,8 +46,8 @@ pub struct ContractVerificationApiTask { #[async_trait::async_trait] impl Task for ContractVerificationApiTask { - fn name(&self) -> &'static str { - "contract_verification_api" + fn id(&self) -> TaskId { + "contract_verification_api".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/eth_sender.rs b/core/node/node_framework/src/implementations/layers/eth_sender.rs index ed27fe86321..3cf2cf597c3 100644 --- a/core/node/node_framework/src/implementations/layers/eth_sender.rs +++ b/core/node/node_framework/src/implementations/layers/eth_sender.rs @@ -14,7 +14,7 @@ use crate::{ pools::{MasterPool, PoolResource, ReplicaPool}, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -173,8 +173,8 @@ struct EthTxAggregatorTask { #[async_trait::async_trait] impl Task for EthTxAggregatorTask { - fn name(&self) -> &'static str { - "eth_tx_aggregator" + fn id(&self) -> TaskId { + "eth_tx_aggregator".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -189,8 +189,8 @@ struct EthTxManagerTask { #[async_trait::async_trait] impl Task for EthTxManagerTask { - fn name(&self) -> &'static str { - "eth_tx_manager" + fn id(&self) -> TaskId { + "eth_tx_manager".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/eth_watch.rs b/core/node/node_framework/src/implementations/layers/eth_watch.rs index c12d9290753..df931901311 100644 --- a/core/node/node_framework/src/implementations/layers/eth_watch.rs +++ b/core/node/node_framework/src/implementations/layers/eth_watch.rs @@ -12,7 +12,7 @@ use crate::{ pools::{MasterPool, PoolResource}, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -75,8 +75,8 @@ struct EthWatchTask { #[async_trait::async_trait] impl Task for EthWatchTask { - fn name(&self) -> &'static str { - "eth_watch" + fn id(&self) -> TaskId { + "eth_watch".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/healtcheck_server.rs b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs index 34c41fd70a9..c6138c71108 100644 --- a/core/node/node_framework/src/implementations/layers/healtcheck_server.rs +++ b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs @@ -7,7 +7,7 @@ use zksync_node_api_server::healthcheck::HealthCheckHandle; use crate::{ implementations::resources::healthcheck::AppHealthCheckResource, service::{ServiceContext, StopReceiver}, - task::UnconstrainedTask, + task::{TaskId, UnconstrainedTask}, wiring_layer::{WiringError, WiringLayer}, }; @@ -53,8 +53,8 @@ struct HealthCheckTask { #[async_trait::async_trait] impl UnconstrainedTask for HealthCheckTask { - fn name(&self) -> &'static str { - "healthcheck_server" + fn id(&self) -> TaskId { + "healthcheck_server".into() } async fn run_unconstrained( diff --git a/core/node/node_framework/src/implementations/layers/house_keeper.rs b/core/node/node_framework/src/implementations/layers/house_keeper.rs index 1eb559ea5e1..7b3e52c7ed5 100644 --- a/core/node/node_framework/src/implementations/layers/house_keeper.rs +++ b/core/node/node_framework/src/implementations/layers/house_keeper.rs @@ -19,7 +19,7 @@ use zksync_house_keeper::{ use crate::{ implementations::resources::pools::{PoolResource, ProverPool, ReplicaPool}, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -179,8 +179,8 @@ struct PostgresMetricsScrapingTask { #[async_trait::async_trait] impl Task for PostgresMetricsScrapingTask { - fn name(&self) -> &'static str { - "postgres_metrics_scraping" + fn id(&self) -> TaskId { + "postgres_metrics_scraping".into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -203,8 +203,8 @@ struct L1BatchMetricsReporterTask { #[async_trait::async_trait] impl Task for L1BatchMetricsReporterTask { - fn name(&self) -> &'static str { - "l1_batch_metrics_reporter" + fn id(&self) -> TaskId { + "l1_batch_metrics_reporter".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -219,8 +219,8 @@ struct FriProverJobRetryManagerTask { #[async_trait::async_trait] impl Task for FriProverJobRetryManagerTask { - fn name(&self) -> &'static str { - "fri_prover_job_retry_manager" + fn id(&self) -> TaskId { + "fri_prover_job_retry_manager".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -235,8 +235,8 @@ struct FriWitnessGeneratorJobRetryManagerTask { #[async_trait::async_trait] impl Task for FriWitnessGeneratorJobRetryManagerTask { - fn name(&self) -> &'static str { - "fri_witness_generator_job_retry_manager" + fn id(&self) -> TaskId { + "fri_witness_generator_job_retry_manager".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -253,8 +253,8 @@ struct WaitingToQueuedFriWitnessJobMoverTask { #[async_trait::async_trait] impl Task for WaitingToQueuedFriWitnessJobMoverTask { - fn name(&self) -> &'static str { - "waiting_to_queued_fri_witness_job_mover" + fn id(&self) -> TaskId { + "waiting_to_queued_fri_witness_job_mover".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -271,8 +271,8 @@ struct FriWitnessGeneratorStatsReporterTask { #[async_trait::async_trait] impl Task for FriWitnessGeneratorStatsReporterTask { - fn name(&self) -> &'static str { - "fri_witness_generator_stats_reporter" + fn id(&self) -> TaskId { + "fri_witness_generator_stats_reporter".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -289,8 +289,8 @@ struct FriProverStatsReporterTask { #[async_trait::async_trait] impl Task for FriProverStatsReporterTask { - fn name(&self) -> &'static str { - "fri_prover_stats_reporter" + fn id(&self) -> TaskId { + "fri_prover_stats_reporter".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -305,8 +305,8 @@ struct FriProofCompressorStatsReporterTask { #[async_trait::async_trait] impl Task for FriProofCompressorStatsReporterTask { - fn name(&self) -> &'static str { - "fri_proof_compressor_stats_reporter" + fn id(&self) -> TaskId { + "fri_proof_compressor_stats_reporter".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -323,8 +323,8 @@ struct FriProofCompressorJobRetryManagerTask { #[async_trait::async_trait] impl Task for FriProofCompressorJobRetryManagerTask { - fn name(&self) -> &'static str { - "fri_proof_compressor_job_retry_manager" + fn id(&self) -> TaskId { + "fri_proof_compressor_job_retry_manager".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -341,8 +341,8 @@ struct FriProverJobArchiverTask { #[async_trait::async_trait] impl Task for FriProverJobArchiverTask { - fn name(&self) -> &'static str { - "fri_prover_job_archiver" + fn id(&self) -> TaskId { + "fri_prover_job_archiver".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -356,8 +356,8 @@ struct FriProverGpuArchiverTask { #[async_trait::async_trait] impl Task for FriProverGpuArchiverTask { - fn name(&self) -> &'static str { - "fri_prover_gpu_archiver" + fn id(&self) -> TaskId { + "fri_prover_gpu_archiver".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/l1_gas.rs b/core/node/node_framework/src/implementations/layers/l1_gas.rs index d9e554aad04..8deafd4e294 100644 --- a/core/node/node_framework/src/implementations/layers/l1_gas.rs +++ b/core/node/node_framework/src/implementations/layers/l1_gas.rs @@ -14,7 +14,7 @@ use crate::{ l1_tx_params::L1TxParamsResource, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -80,8 +80,8 @@ struct GasAdjusterTask { #[async_trait::async_trait] impl Task for GasAdjusterTask { - fn name(&self) -> &'static str { - "gas_adjuster" + fn id(&self) -> TaskId { + "gas_adjuster".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs index 4b1e1d00cb5..935bb283fe8 100644 --- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs +++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs @@ -18,7 +18,7 @@ use crate::{ web3_api::TreeApiClientResource, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -118,8 +118,8 @@ pub struct MetadataCalculatorTask { #[async_trait::async_trait] impl Task for MetadataCalculatorTask { - fn name(&self) -> &'static str { - "metadata_calculator" + fn id(&self) -> TaskId { + "metadata_calculator".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -141,8 +141,8 @@ pub struct TreeApiTask { #[async_trait::async_trait] impl Task for TreeApiTask { - fn name(&self) -> &'static str { - "tree_api" + fn id(&self) -> TaskId { + "tree_api".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs b/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs index 95477291e43..6c7d4f915df 100644 --- a/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs +++ b/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs @@ -4,7 +4,7 @@ use zksync_health_check::{HealthStatus, HealthUpdater, ReactiveHealthCheck}; use crate::{ implementations::resources::healthcheck::AppHealthCheckResource, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -50,8 +50,8 @@ impl WiringLayer for PrometheusExporterLayer { #[async_trait::async_trait] impl Task for PrometheusExporterTask { - fn name(&self) -> &'static str { - "prometheus_exporter" + fn id(&self) -> TaskId { + "prometheus_exporter".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/proof_data_handler.rs b/core/node/node_framework/src/implementations/layers/proof_data_handler.rs index f9960036cec..7952ca6a585 100644 --- a/core/node/node_framework/src/implementations/layers/proof_data_handler.rs +++ b/core/node/node_framework/src/implementations/layers/proof_data_handler.rs @@ -11,7 +11,7 @@ use crate::{ pools::{MasterPool, PoolResource}, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -73,8 +73,8 @@ struct ProofDataHandlerTask { #[async_trait::async_trait] impl Task for ProofDataHandlerTask { - fn name(&self) -> &'static str { - "proof_data_handler" + fn id(&self) -> TaskId { + "proof_data_handler".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/sigint.rs b/core/node/node_framework/src/implementations/layers/sigint.rs index a028be97995..2d11f152537 100644 --- a/core/node/node_framework/src/implementations/layers/sigint.rs +++ b/core/node/node_framework/src/implementations/layers/sigint.rs @@ -2,7 +2,7 @@ use tokio::sync::oneshot; use crate::{ service::{ServiceContext, StopReceiver}, - task::UnconstrainedTask, + task::{TaskId, UnconstrainedTask}, wiring_layer::{WiringError, WiringLayer}, }; @@ -29,8 +29,8 @@ struct SigintHandlerTask; #[async_trait::async_trait] impl UnconstrainedTask for SigintHandlerTask { - fn name(&self) -> &'static str { - "sigint_handler" + fn id(&self) -> TaskId { + "sigint_handler".into() } async fn run_unconstrained( diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs index 91be11ea8a8..65e86bef520 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs @@ -22,7 +22,7 @@ use crate::{ }, resource::Unique, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -142,8 +142,8 @@ struct L2BlockSealerTask(zksync_state_keeper::L2BlockSealerTask); #[async_trait::async_trait] impl Task for L2BlockSealerTask { - fn name(&self) -> &'static str { - "state_keeper/l2_block_sealer" + fn id(&self) -> TaskId { + "state_keeper/l2_block_sealer".into() } async fn run(self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -157,8 +157,8 @@ struct MempoolFetcherTask(MempoolFetcher); #[async_trait::async_trait] impl Task for MempoolFetcherTask { - fn name(&self) -> &'static str { - "state_keeper/mempool_fetcher" + fn id(&self) -> TaskId { + "state_keeper/mempool_fetcher".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs index 8d56bdd671a..edbe1d6e12f 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs @@ -21,7 +21,7 @@ use crate::{ }, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -105,8 +105,8 @@ struct StateKeeperTask { #[async_trait::async_trait] impl Task for StateKeeperTask { - fn name(&self) -> &'static str { - "state_keeper" + fn id(&self) -> TaskId { + "state_keeper".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -134,8 +134,8 @@ struct RocksdbCatchupTask(AsyncCatchupTask); #[async_trait::async_trait] impl Task for RocksdbCatchupTask { - fn name(&self) -> &'static str { - "state_keeper/rocksdb_catchup_task" + fn id(&self) -> TaskId { + "state_keeper/rocksdb_catchup_task".into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/tee_verifier_input_producer.rs b/core/node/node_framework/src/implementations/layers/tee_verifier_input_producer.rs index a595e2eeb20..76ae0b26971 100644 --- a/core/node/node_framework/src/implementations/layers/tee_verifier_input_producer.rs +++ b/core/node/node_framework/src/implementations/layers/tee_verifier_input_producer.rs @@ -8,7 +8,7 @@ use crate::{ pools::{MasterPool, PoolResource}, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -52,8 +52,8 @@ pub struct TeeVerifierInputProducerTask { #[async_trait::async_trait] impl Task for TeeVerifierInputProducerTask { - fn name(&self) -> &'static str { - "tee_verifier_input_producer" + fn id(&self) -> TaskId { + "tee_verifier_input_producer".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/web3_api/caches.rs b/core/node/node_framework/src/implementations/layers/web3_api/caches.rs index 7c6d160c333..c01a62748fa 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/caches.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/caches.rs @@ -8,7 +8,7 @@ use crate::{ web3_api::MempoolCacheResource, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -49,8 +49,8 @@ pub struct MempoolCacheUpdateTask(mempool_cache::MempoolCacheUpdateTask); #[async_trait::async_trait] impl Task for MempoolCacheUpdateTask { - fn name(&self) -> &'static str { - "mempool_cache_update_task" + fn id(&self) -> TaskId { + "mempool_cache_update_task".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server.rs b/core/node/node_framework/src/implementations/layers/web3_api/server.rs index 08eaa4b8044..c81b475c3ec 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/server.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/server.rs @@ -14,7 +14,7 @@ use crate::{ web3_api::{MempoolCacheResource, TreeApiClientResource, TxSenderResource}, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -206,10 +206,10 @@ type ApiJoinHandle = JoinHandle>; #[async_trait::async_trait] impl Task for Web3ApiTask { - fn name(&self) -> &'static str { + fn id(&self) -> TaskId { match self.transport { - Transport::Http => "web3_http_server", - Transport::Ws => "web3_ws_server", + Transport::Http => "web3_http_server".into(), + Transport::Ws => "web3_ws_server".into(), } } @@ -232,8 +232,8 @@ struct ApiTaskGarbageCollector { #[async_trait::async_trait] impl Task for ApiTaskGarbageCollector { - fn name(&self) -> &'static str { - "api_task_garbage_collector" + fn id(&self) -> TaskId { + "api_task_garbage_collector".into() } async fn run(self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs b/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs index eea9148f6a6..c7a568e5cb4 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs @@ -14,7 +14,7 @@ use crate::{ web3_api::{TxSenderResource, TxSinkResource}, }, service::{ServiceContext, StopReceiver}, - task::Task, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -123,8 +123,8 @@ impl fmt::Debug for PostgresStorageCachesTask { #[async_trait::async_trait] impl Task for PostgresStorageCachesTask { - fn name(&self) -> &'static str { - "postgres_storage_caches" + fn id(&self) -> TaskId { + "postgres_storage_caches".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { @@ -138,8 +138,8 @@ struct VmConcurrencyBarrierTask { #[async_trait::async_trait] impl Task for VmConcurrencyBarrierTask { - fn name(&self) -> &'static str { - "vm_concurrency_barrier_task" + fn id(&self) -> TaskId { + "vm_concurrency_barrier_task".into() } async fn run(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/precondition.rs b/core/node/node_framework/src/precondition.rs index 0e47da6a631..a612c5b90a8 100644 --- a/core/node/node_framework/src/precondition.rs +++ b/core/node/node_framework/src/precondition.rs @@ -2,12 +2,12 @@ use std::sync::Arc; use tokio::sync::Barrier; -use crate::service::StopReceiver; +use crate::{service::StopReceiver, task::TaskId}; #[async_trait::async_trait] pub trait Precondition: 'static + Send + Sync { /// Unique name of the precondition. - fn name(&self) -> &'static str; + fn id(&self) -> TaskId; async fn check(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; } diff --git a/core/node/node_framework/src/service/context.rs b/core/node/node_framework/src/service/context.rs index 4ec76ca1d2a..81d094630c3 100644 --- a/core/node/node_framework/src/service/context.rs +++ b/core/node/node_framework/src/service/context.rs @@ -39,7 +39,7 @@ impl<'a> ServiceContext<'a> { /// Added tasks will be launched after the wiring process will be finished and all the preconditions /// are met. pub fn add_task(&mut self, task: Box) -> &mut Self { - tracing::info!("Layer {} has added a new task: {}", self.layer, task.name()); + tracing::info!("Layer {} has added a new task: {}", self.layer, task.id()); self.service.runnables.tasks.push(task); self } @@ -50,7 +50,7 @@ impl<'a> ServiceContext<'a> { tracing::info!( "Layer {} has added a new unconstrained task: {}", self.layer, - task.name() + task.id() ); self.service.runnables.unconstrained_tasks.push(task); self @@ -61,7 +61,7 @@ impl<'a> ServiceContext<'a> { tracing::info!( "Layer {} has added a new precondition: {}", self.layer, - precondition.name() + precondition.id() ); self.service.runnables.preconditions.push(precondition); self @@ -72,7 +72,7 @@ impl<'a> ServiceContext<'a> { tracing::info!( "Layer {} has added a new oneshot task: {}", self.layer, - task.name() + task.id() ); self.service.runnables.oneshot_tasks.push(task); self @@ -86,7 +86,7 @@ impl<'a> ServiceContext<'a> { tracing::info!( "Layer {} has added a new unconstrained oneshot task: {}", self.layer, - task.name() + task.id() ); self.service .runnables diff --git a/core/node/node_framework/src/service/runnables.rs b/core/node/node_framework/src/service/runnables.rs index 7b3e3f7f43b..7f35e384d6c 100644 --- a/core/node/node_framework/src/service/runnables.rs +++ b/core/node/node_framework/src/service/runnables.rs @@ -27,22 +27,22 @@ pub(super) struct Runnables { impl fmt::Debug for Runnables { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // Macro that iterates over a `Vec`, invokes `.name()` method and collects the results into a `Vec`. + // Macro that iterates over a `Vec`, invokes `.id()` method and collects the results into a `Vec`. // Returns a reference to created `Vec` to satisfy the `.field` method signature. - macro_rules! names { + macro_rules! ids { ($vec:expr) => { - &$vec.iter().map(|x| x.name()).collect::>() + &$vec.iter().map(|x| x.id()).collect::>() }; } f.debug_struct("Runnables") - .field("preconditions", names!(self.preconditions)) - .field("tasks", names!(self.tasks)) - .field("oneshot_tasks", names!(self.oneshot_tasks)) - .field("unconstrained_tasks", names!(self.unconstrained_tasks)) + .field("preconditions", ids!(self.preconditions)) + .field("tasks", ids!(self.tasks)) + .field("oneshot_tasks", ids!(self.oneshot_tasks)) + .field("unconstrained_tasks", ids!(self.unconstrained_tasks)) .field( "unconstrained_oneshot_tasks", - names!(self.unconstrained_oneshot_tasks), + ids!(self.unconstrained_oneshot_tasks), ) .finish() } @@ -127,7 +127,7 @@ impl Runnables { stop_receiver: StopReceiver, ) { for task in std::mem::take(&mut self.unconstrained_tasks) { - let name = task.name(); + let name = task.id(); let stop_receiver = stop_receiver.clone(); let task_future = Box::pin(async move { task.run_unconstrained(stop_receiver) @@ -145,7 +145,7 @@ impl Runnables { stop_receiver: StopReceiver, ) { for task in std::mem::take(&mut self.tasks) { - let name = task.name(); + let name = task.id(); let stop_receiver = stop_receiver.clone(); let task_barrier = task_barrier.clone(); let task_future = Box::pin(async move { @@ -164,7 +164,7 @@ impl Runnables { stop_receiver: StopReceiver, ) { for precondition in std::mem::take(&mut self.preconditions) { - let name = precondition.name(); + let name = precondition.id(); let stop_receiver = stop_receiver.clone(); let task_barrier = task_barrier.clone(); let task_future = Box::pin(async move { @@ -184,7 +184,7 @@ impl Runnables { stop_receiver: StopReceiver, ) { for oneshot_task in std::mem::take(&mut self.oneshot_tasks) { - let name = oneshot_task.name(); + let name = oneshot_task.id(); let stop_receiver = stop_receiver.clone(); let task_barrier = task_barrier.clone(); let task_future = Box::pin(async move { @@ -203,7 +203,7 @@ impl Runnables { stop_receiver: StopReceiver, ) { for unconstrained_oneshot_task in std::mem::take(&mut self.unconstrained_oneshot_tasks) { - let name = unconstrained_oneshot_task.name(); + let name = unconstrained_oneshot_task.id(); let stop_receiver = stop_receiver.clone(); let task_future = Box::pin(async move { unconstrained_oneshot_task diff --git a/core/node/node_framework/src/service/tests.rs b/core/node/node_framework/src/service/tests.rs index 81a7eaabdc6..b5bcc3aaa25 100644 --- a/core/node/node_framework/src/service/tests.rs +++ b/core/node/node_framework/src/service/tests.rs @@ -9,7 +9,7 @@ use crate::{ ServiceContext, StopReceiver, WiringError, WiringLayer, ZkStackServiceBuilder, ZkStackServiceError, }, - task::Task, + task::{Task, TaskId}, }; // `ZkStack` Service's `new()` method has to have a check for nested runtime. @@ -127,8 +127,8 @@ struct ErrorTask; #[async_trait::async_trait] impl Task for ErrorTask { - fn name(&self) -> &'static str { - "error_task" + fn id(&self) -> TaskId { + "error_task".into() } async fn run(self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { anyhow::bail!("error task") @@ -178,8 +178,8 @@ struct SuccessfulTask(Arc, Arc>); #[async_trait::async_trait] impl Task for SuccessfulTask { - fn name(&self) -> &'static str { - "successful_task" + fn id(&self) -> TaskId { + "successful_task".into() } async fn run(self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { self.0.wait().await; @@ -196,8 +196,8 @@ struct RemainingTask(Arc, Arc>); #[async_trait::async_trait] impl Task for RemainingTask { - fn name(&self) -> &'static str { - "remaining_task" + fn id(&self) -> TaskId { + "remaining_task".into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { diff --git a/core/node/node_framework/src/task.rs b/core/node/node_framework/src/task.rs index f5ba08de193..a72d640731e 100644 --- a/core/node/node_framework/src/task.rs +++ b/core/node/node_framework/src/task.rs @@ -28,12 +28,46 @@ //! - A task that must be started as soon as possible, e.g. healthcheck server. //! - A task that may be a driving force for some precondition to be met. -use std::sync::Arc; +use std::{ + fmt::{Display, Formatter}, + ops::Deref, + sync::Arc, +}; use tokio::sync::Barrier; use crate::service::StopReceiver; +/// A unique human-readable identifier of a task. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct TaskId(String); + +impl TaskId { + pub fn new(value: String) -> Self { + TaskId(value) + } +} + +impl Display for TaskId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +impl From<&str> for TaskId { + fn from(value: &str) -> Self { + TaskId(value.to_owned()) + } +} + +impl Deref for TaskId { + type Target = str; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// A task implementation. /// /// Note: any `Task` added to the service will only start after all the [preconditions](crate::precondition::Precondition) @@ -41,7 +75,7 @@ use crate::service::StopReceiver; #[async_trait::async_trait] pub trait Task: 'static + Send { /// Unique name of the task. - fn name(&self) -> &'static str; + fn id(&self) -> TaskId; /// Runs the task. /// @@ -85,7 +119,7 @@ impl dyn Task { #[async_trait::async_trait] pub trait OneshotTask: 'static + Send { /// Unique name of the task. - fn name(&self) -> &'static str; + fn id(&self) -> TaskId; /// Runs the task. /// @@ -130,7 +164,7 @@ impl dyn OneshotTask { #[async_trait::async_trait] pub trait UnconstrainedTask: 'static + Send { /// Unique name of the task. - fn name(&self) -> &'static str; + fn id(&self) -> TaskId; /// Runs the task without waiting for any precondition to be met. async fn run_unconstrained(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; @@ -141,7 +175,7 @@ pub trait UnconstrainedTask: 'static + Send { #[async_trait::async_trait] pub trait UnconstrainedOneshotTask: 'static + Send { /// Unique name of the task. - fn name(&self) -> &'static str; + fn id(&self) -> TaskId; /// Runs the task without waiting for any precondition to be met. async fn run_unconstrained_oneshot(