Skip to content

Commit

Permalink
refactor(node-framework): use owned type to identify Tasks (matter-…
Browse files Browse the repository at this point in the history
…labs#2124)

## What ❔

This PR refactors `Task`-related traits to use an owned type `TaskId`
instead of `&'static str`.

## Why ❔

There are use cases where a task's id needs to be generated dynamically
(e.g. broad trait implementations) which this PR accommodates for.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
itegulov authored Jun 3, 2024
1 parent e71f6f9 commit c8914d8
Show file tree
Hide file tree
Showing 26 changed files with 168 additions and 134 deletions.
14 changes: 7 additions & 7 deletions core/node/node_framework/examples/showcase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use zksync_node_framework::{
resource::Resource,
service::{ServiceContext, StopReceiver, ZkStackServiceBuilder},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -96,14 +96,14 @@ impl PutTask {

#[async_trait::async_trait]
impl Task for PutTask {
fn name(&self) -> &'static str {
fn id(&self) -> TaskId {
// Task names simply have to be unique. They are used for logging and debugging.
"put_task"
"put_task".into()
}

/// This method will be invoked by the framework when the task is started.
async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
tracing::info!("Starting the task {}", self.name());
tracing::info!("Starting the task {}", self.id());

// We have to respect the stop receiver and should exit as soon as we receive
// a stop signal.
Expand Down Expand Up @@ -138,12 +138,12 @@ impl CheckTask {

#[async_trait::async_trait]
impl Task for CheckTask {
fn name(&self) -> &'static str {
"check_task"
fn id(&self) -> TaskId {
"check_task".into()
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
tracing::info!("Starting the task {}", self.name());
tracing::info!("Starting the task {}", self.id());

tokio::select! {
_ = self.run_inner() => {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use zksync_config::configs::chain::CircuitBreakerConfig;
use crate::{
implementations::resources::circuit_breakers::CircuitBreakersResource,
service::{ServiceContext, StopReceiver},
task::UnconstrainedTask,
task::{TaskId, UnconstrainedTask},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -43,8 +43,8 @@ struct CircuitBreakerCheckerTask {

#[async_trait::async_trait]
impl UnconstrainedTask for CircuitBreakerCheckerTask {
fn name(&self) -> &'static str {
"circuit_breaker_checker"
fn id(&self) -> TaskId {
"circuit_breaker_checker".into()
}

async fn run_unconstrained(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
pools::{MasterPool, PoolResource},
},
service::{ServiceContext, StopReceiver},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -55,8 +55,8 @@ struct CommitmentGeneratorTask {

#[async_trait::async_trait]
impl Task for CommitmentGeneratorTask {
fn name(&self) -> &'static str {
"commitment_generator"
fn id(&self) -> TaskId {
"commitment_generator".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down
10 changes: 5 additions & 5 deletions core/node/node_framework/src/implementations/layers/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
sync_state::SyncStateResource,
},
service::{ServiceContext, StopReceiver},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -110,8 +110,8 @@ pub struct MainNodeConsensusTask {

#[async_trait::async_trait]
impl Task for MainNodeConsensusTask {
fn name(&self) -> &'static str {
"consensus"
fn id(&self) -> TaskId {
"consensus".into()
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down Expand Up @@ -147,8 +147,8 @@ pub struct FetcherTask {

#[async_trait::async_trait]
impl Task for FetcherTask {
fn name(&self) -> &'static str {
"consensus_fetcher"
fn id(&self) -> TaskId {
"consensus_fetcher".into()
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
pools::{MasterPool, PoolResource},
},
service::{ServiceContext, StopReceiver},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -75,8 +75,8 @@ pub struct ConsistencyCheckerTask {

#[async_trait::async_trait]
impl Task for ConsistencyCheckerTask {
fn name(&self) -> &'static str {
"consistency_checker"
fn id(&self) -> TaskId {
"consistency_checker".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use zksync_dal::{ConnectionPool, Core};
use crate::{
implementations::resources::pools::{MasterPool, PoolResource, ReplicaPool},
service::{ServiceContext, StopReceiver},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -46,8 +46,8 @@ pub struct ContractVerificationApiTask {

#[async_trait::async_trait]
impl Task for ContractVerificationApiTask {
fn name(&self) -> &'static str {
"contract_verification_api"
fn id(&self) -> TaskId {
"contract_verification_api".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
pools::{MasterPool, PoolResource, ReplicaPool},
},
service::{ServiceContext, StopReceiver},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -173,8 +173,8 @@ struct EthTxAggregatorTask {

#[async_trait::async_trait]
impl Task for EthTxAggregatorTask {
fn name(&self) -> &'static str {
"eth_tx_aggregator"
fn id(&self) -> TaskId {
"eth_tx_aggregator".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -189,8 +189,8 @@ struct EthTxManagerTask {

#[async_trait::async_trait]
impl Task for EthTxManagerTask {
fn name(&self) -> &'static str {
"eth_tx_manager"
fn id(&self) -> TaskId {
"eth_tx_manager".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
pools::{MasterPool, PoolResource},
},
service::{ServiceContext, StopReceiver},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -75,8 +75,8 @@ struct EthWatchTask {

#[async_trait::async_trait]
impl Task for EthWatchTask {
fn name(&self) -> &'static str {
"eth_watch"
fn id(&self) -> TaskId {
"eth_watch".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use zksync_node_api_server::healthcheck::HealthCheckHandle;
use crate::{
implementations::resources::healthcheck::AppHealthCheckResource,
service::{ServiceContext, StopReceiver},
task::UnconstrainedTask,
task::{TaskId, UnconstrainedTask},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -53,8 +53,8 @@ struct HealthCheckTask {

#[async_trait::async_trait]
impl UnconstrainedTask for HealthCheckTask {
fn name(&self) -> &'static str {
"healthcheck_server"
fn id(&self) -> TaskId {
"healthcheck_server".into()
}

async fn run_unconstrained(
Expand Down
46 changes: 23 additions & 23 deletions core/node/node_framework/src/implementations/layers/house_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use zksync_house_keeper::{
use crate::{
implementations::resources::pools::{PoolResource, ProverPool, ReplicaPool},
service::{ServiceContext, StopReceiver},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -179,8 +179,8 @@ struct PostgresMetricsScrapingTask {

#[async_trait::async_trait]
impl Task for PostgresMetricsScrapingTask {
fn name(&self) -> &'static str {
"postgres_metrics_scraping"
fn id(&self) -> TaskId {
"postgres_metrics_scraping".into()
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -203,8 +203,8 @@ struct L1BatchMetricsReporterTask {

#[async_trait::async_trait]
impl Task for L1BatchMetricsReporterTask {
fn name(&self) -> &'static str {
"l1_batch_metrics_reporter"
fn id(&self) -> TaskId {
"l1_batch_metrics_reporter".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -219,8 +219,8 @@ struct FriProverJobRetryManagerTask {

#[async_trait::async_trait]
impl Task for FriProverJobRetryManagerTask {
fn name(&self) -> &'static str {
"fri_prover_job_retry_manager"
fn id(&self) -> TaskId {
"fri_prover_job_retry_manager".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -235,8 +235,8 @@ struct FriWitnessGeneratorJobRetryManagerTask {

#[async_trait::async_trait]
impl Task for FriWitnessGeneratorJobRetryManagerTask {
fn name(&self) -> &'static str {
"fri_witness_generator_job_retry_manager"
fn id(&self) -> TaskId {
"fri_witness_generator_job_retry_manager".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -253,8 +253,8 @@ struct WaitingToQueuedFriWitnessJobMoverTask {

#[async_trait::async_trait]
impl Task for WaitingToQueuedFriWitnessJobMoverTask {
fn name(&self) -> &'static str {
"waiting_to_queued_fri_witness_job_mover"
fn id(&self) -> TaskId {
"waiting_to_queued_fri_witness_job_mover".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -271,8 +271,8 @@ struct FriWitnessGeneratorStatsReporterTask {

#[async_trait::async_trait]
impl Task for FriWitnessGeneratorStatsReporterTask {
fn name(&self) -> &'static str {
"fri_witness_generator_stats_reporter"
fn id(&self) -> TaskId {
"fri_witness_generator_stats_reporter".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -289,8 +289,8 @@ struct FriProverStatsReporterTask {

#[async_trait::async_trait]
impl Task for FriProverStatsReporterTask {
fn name(&self) -> &'static str {
"fri_prover_stats_reporter"
fn id(&self) -> TaskId {
"fri_prover_stats_reporter".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -305,8 +305,8 @@ struct FriProofCompressorStatsReporterTask {

#[async_trait::async_trait]
impl Task for FriProofCompressorStatsReporterTask {
fn name(&self) -> &'static str {
"fri_proof_compressor_stats_reporter"
fn id(&self) -> TaskId {
"fri_proof_compressor_stats_reporter".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -323,8 +323,8 @@ struct FriProofCompressorJobRetryManagerTask {

#[async_trait::async_trait]
impl Task for FriProofCompressorJobRetryManagerTask {
fn name(&self) -> &'static str {
"fri_proof_compressor_job_retry_manager"
fn id(&self) -> TaskId {
"fri_proof_compressor_job_retry_manager".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -341,8 +341,8 @@ struct FriProverJobArchiverTask {

#[async_trait::async_trait]
impl Task for FriProverJobArchiverTask {
fn name(&self) -> &'static str {
"fri_prover_job_archiver"
fn id(&self) -> TaskId {
"fri_prover_job_archiver".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand All @@ -356,8 +356,8 @@ struct FriProverGpuArchiverTask {

#[async_trait::async_trait]
impl Task for FriProverGpuArchiverTask {
fn name(&self) -> &'static str {
"fri_prover_gpu_archiver"
fn id(&self) -> TaskId {
"fri_prover_gpu_archiver".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions core/node/node_framework/src/implementations/layers/l1_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
l1_tx_params::L1TxParamsResource,
},
service::{ServiceContext, StopReceiver},
task::Task,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -80,8 +80,8 @@ struct GasAdjusterTask {

#[async_trait::async_trait]
impl Task for GasAdjusterTask {
fn name(&self) -> &'static str {
"gas_adjuster"
fn id(&self) -> TaskId {
"gas_adjuster".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
Expand Down
Loading

0 comments on commit c8914d8

Please sign in to comment.