diff --git a/Cargo.lock b/Cargo.lock index 6d692e1..7fcb142 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,6 +593,14 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-backing-primitives" +version = "0.9.0" +dependencies = [ + "sp-api", + "sp-consensus-slots", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -1948,6 +1956,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "cumulus-client-consensus-proposer" +version = "0.1.0" +source = "git+https://github.com/paritytech/polkadot-sdk?branch=release-polkadot-v1.3.0#401f8a3e9448db854f5605b679fa085b8f445039" +dependencies = [ + "anyhow", + "async-trait", + "cumulus-primitives-parachain-inherent", + "sp-consensus", + "sp-inherents", + "sp-runtime", + "sp-state-machine", + "thiserror", +] + [[package]] name = "cumulus-client-network" version = "0.1.0" @@ -5720,6 +5743,7 @@ dependencies = [ "cumulus-client-cli", "cumulus-client-collator", "cumulus-client-consensus-common", + "cumulus-client-consensus-proposer", "cumulus-client-network", "cumulus-client-service", "cumulus-primitives-core", @@ -5785,6 +5809,7 @@ dependencies = [ name = "moonkit-template-runtime" version = "0.9.0" dependencies = [ + "async-backing-primitives", "cumulus-pallet-dmp-queue", "cumulus-pallet-parachain-system", "cumulus-pallet-xcm", @@ -5801,6 +5826,7 @@ dependencies = [ "hex-literal 0.3.4", "log", "nimbus-primitives", + "pallet-async-backing", "pallet-author-inherent", "pallet-author-slot-filter", "pallet-balances", @@ -6031,15 +6057,22 @@ dependencies = [ name = "nimbus-consensus" version = "0.9.0" dependencies = [ + "async-backing-primitives", "async-trait", + "cumulus-client-collator", "cumulus-client-consensus-common", + "cumulus-client-consensus-proposer", "cumulus-primitives-core", "cumulus-primitives-parachain-inherent", + "cumulus-relay-chain-interface", "futures 0.3.28", "log", 
"nimbus-primitives", "parity-scale-codec", "parking_lot 0.12.1", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-primitives", "sc-client-api", "sc-consensus", "sc-consensus-manual-seal", @@ -6048,6 +6081,7 @@ dependencies = [ "sp-block-builder", "sp-blockchain", "sp-consensus", + "sp-consensus-slots", "sp-core", "sp-inherents", "sp-keystore", @@ -6345,6 +6379,26 @@ dependencies = [ "sp-std", ] +[[package]] +name = "pallet-async-backing" +version = "0.9.0" +dependencies = [ + "cumulus-pallet-parachain-system", + "cumulus-primitives-core", + "frame-support", + "frame-system", + "log", + "nimbus-primitives", + "pallet-timestamp", + "parity-scale-codec", + "scale-info", + "sp-consensus-slots", + "sp-core", + "sp-io", + "sp-runtime", + "sp-std", +] + [[package]] name = "pallet-aura-style-filter" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index b6adc92..520efeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,14 +1,8 @@ [workspace] members = [ "client/consensus/nimbus-consensus", - "pallets/aura-style-filter", - "pallets/author-inherent", - "pallets/author-mapping", - "pallets/author-slot-filter", - "pallets/migrations", - "pallets/maintenance-mode", - "pallets/randomness", - "primitives/nimbus-primitives", + "pallets/*", + "primitives/*", "template/node", "template/runtime", "template/pallets/template", @@ -87,6 +81,7 @@ sp-api = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release sp-application-crypto = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } sp-block-builder = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } sp-consensus-babe = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } +sp-consensus-slots = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = 
false } sp-core = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } sp-debug-derive = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } sp-externalities = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } @@ -159,6 +154,7 @@ parachain-info = { git = "https://github.com/paritytech/polkadot-sdk", branch = cumulus-client-cli = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-collator = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-consensus-common = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } +cumulus-client-consensus-proposer = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-consensus-relay-chain = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-network = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-service = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } @@ -171,6 +167,7 @@ cumulus-relay-chain-rpc-interface = { git = "https://github.com/paritytech/polka pallet-xcm = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } pallet-xcm-benchmarks = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } polkadot-core-primitives = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } +polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", 
default-features = false } polkadot-parachain-primitives = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } polkadot-runtime = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } polkadot-runtime-common = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0", default-features = false } @@ -182,6 +179,7 @@ staging-xcm-executor = { git = "https://github.com/paritytech/polkadot-sdk", bra # Polkadot (client) kusama-runtime = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } polkadot-cli = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } +polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } polkadot-primitives = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } polkadot-service = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } rococo-runtime = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } @@ -189,9 +187,11 @@ westend-runtime = { git = "https://github.com/paritytech/polkadot-sdk", branch = xcm-simulator = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } # Local (wasm) +async-backing-primitives = { path = "primitives/async-backing", default-features = false } pallet-author-inherent = { path = "pallets/author-inherent", default-features = false } pallet-author-mapping = { path = "pallets/author-mapping", default-features = false } pallet-author-slot-filter = { path = "pallets/author-slot-filter", default-features = false } +pallet-async-backing = { path = "pallets/async-backing", default-features = false } pallet-maintenance_mode = { path = "pallets/maintenance_mode", default-features = false } 
pallet-migrations = { path = "pallets/migrations", default-features = false } nimbus-primitives = { path = "primitives/nimbus-primitives", default-features = false } diff --git a/client/consensus/nimbus-consensus/Cargo.toml b/client/consensus/nimbus-consensus/Cargo.toml index 0b346a4..0be2f00 100644 --- a/client/consensus/nimbus-consensus/Cargo.toml +++ b/client/consensus/nimbus-consensus/Cargo.toml @@ -8,6 +8,7 @@ version = "0.9.0" sc-client-api = { workspace = true } sc-consensus = { workspace = true } sc-consensus-manual-seal = { workspace = true } +sp-consensus-slots = { workspace = true } sp-api = { workspace = true } sp-application-crypto = { workspace = true } sp-block-builder = { workspace = true } @@ -20,12 +21,21 @@ sp-runtime = { workspace = true } substrate-prometheus-endpoint = { workspace = true } # Cumulus dependencies +cumulus-client-collator = { workspace = true } cumulus-client-consensus-common = { workspace = true } +cumulus-client-consensus-proposer = { workspace = true } cumulus-primitives-core = { workspace = true } cumulus-primitives-parachain-inherent = { workspace = true } +cumulus-relay-chain-interface = { workspace = true } + +# Polkadot dependencies +polkadot-node-primitives = { workspace = true } +polkadot-node-subsystem = { workspace = true } +polkadot-primitives = { workspace = true } # Nimbus Dependencies -nimbus-primitives = { workspace = true } +async-backing-primitives = { workspace = true, features = ["std"] } +nimbus-primitives = { workspace = true, features = ["std"] } # Other deps async-trait = { workspace = true } diff --git a/client/consensus/nimbus-consensus/src/collators.rs b/client/consensus/nimbus-consensus/src/collators.rs new file mode 100644 index 0000000..608b5f8 --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators.rs @@ -0,0 +1,196 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. 
+ +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +//! Stock, pure Nimbus collators. +//! +//! This includes the [`basic`] collator, which only builds on top of the most recently +//! included parachain block, as well as the [`lookahead`] collator, which prospectively +//! builds on parachain blocks which have not yet been included in the relay chain. + +pub mod basic; +pub mod lookahead; + +use crate::*; +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{ParachainBlockImportMarker, ParachainCandidate}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_core::ParachainBlockData; +use cumulus_primitives_parachain_inherent::ParachainInherentData; +use futures::prelude::*; +use log::{debug, info}; +use nimbus_primitives::{CompatibleDigestItem, DigestsProvider, NimbusId, NIMBUS_KEY_ID}; +use polkadot_node_primitives::{Collation, MaybeCompressedPoV}; +use sc_consensus::{BlockImport, BlockImportParams}; +use sp_application_crypto::ByteArray; +use sp_consensus::{BlockOrigin, Proposal}; +use sp_core::{crypto::CryptoTypeId, sr25519, Encode}; +use sp_inherents::InherentData; +use sp_keystore::Keystore; +use sp_runtime::{ + traits::{Block as BlockT, Header as HeaderT}, + DigestItem, +}; +use std::convert::TryInto; +use std::error::Error; +use std::time::Duration; + +/// Propose, seal, and import a block, packaging it into 
a collation. +/// +/// Provide the slot to build at as well as any other necessary pre-digest logs, +/// the inherent data, and the proposal duration and PoV size limits. +/// +/// The Aura pre-digest should not be explicitly provided and is set internally. +/// +/// This does not announce the collation to the parachain network or the relay chain. +pub(crate) async fn collate( + additional_digests_provider: &ADP, + author_id: NimbusId, + block_import: &mut BI, + collator_service: &CS, + keystore: &dyn Keystore, + parent_header: &Block::Header, + proposer: &mut Proposer, + inherent_data: (ParachainInherentData, InherentData), + proposal_duration: Duration, + max_pov_size: usize, +) -> Result<(Collation, ParachainBlockData, Block::Hash), Box> +where + ADP: DigestsProvider::Hash> + 'static, + Block: BlockT, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + CS: CollatorServiceInterface, + Proposer: ProposerInterface + Send + Sync + 'static, +{ + let mut logs = vec![CompatibleDigestItem::nimbus_pre_digest(author_id.clone())]; + logs.extend( + additional_digests_provider.provide_digests(author_id.clone(), parent_header.hash()), + ); + + let Proposal { + block, + storage_changes, + proof, + } = proposer + .propose( + &parent_header, + &inherent_data.0, + inherent_data.1, + sp_runtime::generic::Digest { logs }, + proposal_duration, + Some(max_pov_size), + ) + .await + .map_err(|e| Box::new(e) as Box)?; + + let (header, extrinsics) = block.clone().deconstruct(); + + let sig_digest = seal_header::( + &header, + keystore, + &author_id.to_raw_vec(), + &sr25519::CRYPTO_ID, + ); + + let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header.clone()); + block_import_params.post_digests.push(sig_digest.clone()); + block_import_params.body = Some(extrinsics.clone()); + block_import_params.state_action = sc_consensus::StateAction::ApplyChanges( + sc_consensus::StorageChanges::Changes(storage_changes), + ); + + let post_hash = 
block_import_params.post_hash(); + + // Print the same log line as slots (aura and babe) + info!( + "🔖 Sealed block for proposal at {}. Hash now {:?}, previously {:?}.", + *header.number(), + block_import_params.post_hash(), + header.hash(), + ); + + block_import + .import_block(block_import_params) + .map_err(|e| Box::new(e) as Box) + .await?; + + // Compute info about the block after the digest is added + let mut post_header = header.clone(); + post_header.digest_mut().logs.push(sig_digest.clone()); + let post_block = Block::new(post_header, extrinsics); + + if let Some((collation, block_data)) = collator_service.build_collation( + parent_header, + post_hash, + ParachainCandidate { + block: post_block, + proof: proof, + }, + ) { + tracing::info!( + target: crate::LOG_TARGET, + "PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}", + block_data.header().encode().len() as f64 / 1024f64, + block_data.extrinsics().encode().len() as f64 / 1024f64, + block_data.storage_proof().encode().len() as f64 / 1024f64, + ); + + if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { + tracing::info!( + target: crate::LOG_TARGET, + "Compressed PoV size: {}kb", + pov.block_data.0.len() as f64 / 1024f64, + ); + } + + Ok((collation, block_data, post_hash)) + } else { + Err( + Box::::from("Unable to produce collation") + as Box, + ) + } +} + +pub(crate) fn seal_header( + header: &Block::Header, + keystore: &dyn Keystore, + public_pair: &Vec, + crypto_id: &CryptoTypeId, +) -> DigestItem +where + Block: BlockT, +{ + let pre_hash = header.hash(); + + let raw_sig = Keystore::sign_with( + keystore, + NIMBUS_KEY_ID, + *crypto_id, + public_pair, + pre_hash.as_ref(), + ) + .expect("Keystore should be able to sign") + .expect("We already checked that the key was present"); + + debug!(target: LOG_TARGET, "The signature is \n{:?}", raw_sig); + + let signature = raw_sig + .clone() + .try_into() + .expect("signature bytes produced by keystore should be right 
length"); + + ::nimbus_seal(signature) +} diff --git a/client/consensus/nimbus-consensus/src/collators/basic.rs b/client/consensus/nimbus-consensus/src/collators/basic.rs new file mode 100644 index 0000000..9e26390 --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/basic.rs @@ -0,0 +1,205 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +use crate::*; +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::ParachainBlockImportMarker; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_core::{ + relay_chain::{BlockId as RBlockId, Hash as PHash}, + CollectCollationInfo, ParaId, PersistedValidationData, +}; +use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface}; +use futures::prelude::*; +use nimbus_primitives::{DigestsProvider, NimbusApi, NimbusId}; +use polkadot_node_primitives::CollationResult; +use polkadot_primitives::CollatorPair; +use sc_client_api::{BlockBackend, BlockOf}; +use sp_api::ProvideRuntimeApi; +use sp_blockchain::HeaderBackend; +use sp_core::Decode; +use sp_inherents::CreateInherentDataProviders; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use std::{sync::Arc, time::Duration}; + +/// Parameters for [`run`]. 
+pub struct Params { + /// Additional digest provider + pub additional_digests_provider: ADP, + /// Parachain id + pub para_id: ParaId, + /// A handle to the relay-chain client's "Overseer" or task orchestrator. + pub overseer_handle: OverseerHandle, + /// The underlying block proposer this should call into. + pub proposer: Proposer, + /// The block import handle. + pub block_import: BI, + /// The underlying para client. + pub para_client: Arc, + /// An interface to the relay-chain client. + pub relay_client: RClient, + /// The underlying keystore, which should contain Nimbus consensus keys. + pub keystore: KeystorePtr, + /// The collator key used to sign collations before submitting to validators. + pub collator_key: CollatorPair, + /// Force production of the block even if the collator is not eligible + pub force_authoring: bool, + /// A builder for inherent data builders. + pub create_inherent_data_providers: CIDP, + /// The collator service used for bundling proposals into collations and announcing + /// to the network. + pub collator_service: CS, +} + +/// Run bare Nimbus consensus as a relay-chain-driven collator. 
+pub fn run( + params: Params, +) -> impl Future + Send + 'static +where + Block: BlockT + Send, + CIDP: CreateInherentDataProviders + 'static, + CIDP::InherentDataProviders: Send, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + Client: ProvideRuntimeApi + + BlockOf + + HeaderBackend + + BlockBackend + + Send + + Sync + + 'static, + Client::Api: NimbusApi + CollectCollationInfo, + RClient: RelayChainInterface + Send + Clone + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + 'static, + ADP: DigestsProvider::Hash> + Send + Sync + 'static, +{ + async move { + let mut collation_requests = cumulus_client_collator::relay_chain_driven::init( + params.collator_key, + params.para_id, + params.overseer_handle, + ) + .await; + + let Params { + additional_digests_provider, + mut block_import, + collator_service, + create_inherent_data_providers, + keystore, + para_id, + mut proposer, + para_client, + relay_client, + force_authoring, + .. + } = params; + + while let Some(request) = collation_requests.next().await { + macro_rules! reject_with_error { + ($err:expr) => {{ + request.complete(None); + tracing::error!(target: crate::LOG_TARGET, err = ?{ $err }); + continue; + }}; + } + + macro_rules! try_request { + ($x:expr) => {{ + match $x { + Ok(x) => x, + Err(e) => reject_with_error!(e), + } + }}; + } + + let validation_data = request.persisted_validation_data(); + + let parent_header = try_request!(Block::Header::decode( + &mut &validation_data.parent_head.0[..] 
+ )); + + let parent_hash = parent_header.hash(); + + if !collator_service.check_block_status(parent_hash, &parent_header) { + continue; + } + + let relay_parent_header = match relay_client + .header(RBlockId::hash(*request.relay_parent())) + .await + { + Err(e) => reject_with_error!(e), + Ok(None) => continue, // sanity: would be inconsistent to get `None` here + Ok(Some(h)) => h, + }; + + let nimbus_id = match claim_slot::( + &keystore, + ¶_client, + &parent_header, + &relay_parent_header, + force_authoring, + ) + .await + { + Ok(None) => continue, + Ok(Some(nimbus_id)) => nimbus_id, + Err(e) => reject_with_error!(e), + }; + + let inherent_data = try_request!( + create_inherent_data( + &create_inherent_data_providers, + para_id, + parent_header.hash(), + &validation_data, + &relay_client, + *request.relay_parent(), + nimbus_id.clone(), + ) + .await + ); + + let (collation, _, post_hash) = try_request!( + super::collate::( + &additional_digests_provider, + nimbus_id, + &mut block_import, + &collator_service, + &*keystore, + &parent_header, + &mut proposer, + inherent_data, + Duration::from_millis(500), //params.authoring_duration, + // Set the block limit to 50% of the maximum PoV size. + // + // TODO: If we got benchmarking that includes the proof size, + // we should be able to use the maximum pov size. + (validation_data.max_pov_size / 2) as usize, + ) + .await + ); + + let result_sender = Some(collator_service.announce_with_barrier(post_hash)); + request.complete(Some(CollationResult { + collation, + result_sender, + })); + } + } +} diff --git a/client/consensus/nimbus-consensus/src/collators/lookahead.rs b/client/consensus/nimbus-consensus/src/collators/lookahead.rs new file mode 100644 index 0000000..99e264b --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/lookahead.rs @@ -0,0 +1,522 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. 
+ +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +use crate::*; +use async_backing_primitives::UnincludedSegmentApi; +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{ + self as consensus_common, load_abridged_host_configuration, ParachainBlockImportMarker, + ParentSearchParams, +}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_core::{relay_chain::Hash as PHash, CollectCollationInfo, ParaId}; +use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface}; +use futures::{channel::oneshot, prelude::*}; +use nimbus_primitives::{DigestsProvider, NimbusApi, NimbusId}; +use polkadot_node_primitives::SubmitCollationParams; +use polkadot_node_subsystem::messages::{ + CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest, +}; +use polkadot_primitives::{CollatorPair, OccupiedCoreAssumption}; +use sc_client_api::{BlockBackend, BlockOf}; +use sp_api::ProvideRuntimeApi; +use sp_blockchain::HeaderBackend; +use sp_consensus::SyncOracle; +use sp_consensus_slots::{Slot, SlotDuration}; +use sp_core::Encode; +use sp_inherents::CreateInherentDataProviders; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use std::{sync::Arc, time::Duration}; + +/// Parameters for [`run`]. 
+pub struct Params { + /// Additional digest provider + pub additional_digests_provider: DP, + /// The amount of time to spend authoring each block. + pub authoring_duration: Duration, + /// Used to actually import blocks. + pub block_import: BI, + /// A validation code hash provider, used to get the current validation code hash. + pub code_hash_provider: CHP, + /// The collator key used to sign collations before submitting to validators. + pub collator_key: CollatorPair, + /// The generic collator service used to plug into this consensus engine. + pub collator_service: CS, + /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. + /// the timestamp, slot, and paras inherents should be omitted, as they are set by this + /// collator. + pub create_inherent_data_providers: CIDP, + /// Force production of the block even if the collator is not eligible + pub force_authoring: bool, + /// The underlying keystore, which should contain Aura consensus keys. + pub keystore: KeystorePtr, + /// A handle to the relay-chain client's "Overseer" or task orchestrator. + pub overseer_handle: OverseerHandle, + /// The para client's backend, used to access the database. + pub para_backend: Arc, + /// The underlying para client. + pub para_client: Arc, + /// The para's ID. + pub para_id: ParaId, + /// The underlying block proposer this should call into. + pub proposer: Proposer, + /// The length of slots in the relay chain. + pub relay_chain_slot_duration: Duration, + /// A handle to the relay-chain client. + pub relay_client: RClient, + /// The length of slots in this parachain. + /// If the parachain doesn't have slot and rely only on relay slots, set it to None. + pub slot_duration: Option, + /// A chain synchronization oracle. + pub sync_oracle: SO, +} + +/// Run async-backing-friendly collator. 
+pub fn run( + mut params: Params, +) -> impl Future + Send + 'static +where + Block: BlockT, + Client: ProvideRuntimeApi + + BlockOf + + HeaderBackend + + BlockBackend + + Send + + Sync + + 'static, + Client::Api: NimbusApi + CollectCollationInfo + UnincludedSegmentApi, + Backend: sc_client_api::Backend + 'static, + RClient: RelayChainInterface + Clone + 'static, + CIDP: CreateInherentDataProviders + 'static, + CIDP::InherentDataProviders: Send, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + SO: SyncOracle + Send + Sync + Clone + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + 'static, + CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, + DP: DigestsProvider::Hash> + Send + Sync + 'static, +{ + // This is an arbitrary value which is likely guaranteed to exceed any reasonable + // limit, as it would correspond to 10 non-included blocks. + // + // Since we only search for parent blocks which have already been imported, + // we can guarantee that all imported blocks respect the unincluded segment + // rules specified by the parachain's runtime and thus will never be too deep. 
+	const PARENT_SEARCH_DEPTH: usize = 10; + + async move { + cumulus_client_collator::initialize_collator_subsystems( + &mut params.overseer_handle, + params.collator_key, + params.para_id, + ) + .await; + + let mut import_notifications = match params.relay_client.import_notification_stream().await + { + Ok(s) => s, + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?err, + "Failed to initialize consensus: no relay chain import notification stream" + ); + + return; + } + }; + + // React to each new relay block + while let Some(relay_parent_header) = import_notifications.next().await { + let relay_parent = relay_parent_header.hash(); + + // First, verify if the parachain is active (have a core available on the relay) + if !is_para_scheduled(relay_parent, params.para_id, &mut params.overseer_handle).await { + tracing::trace!( + target: crate::LOG_TARGET, + ?relay_parent, + ?params.para_id, + "Para is not scheduled on any core, skipping import notification", + ); + + continue; + } + + // Get the PoV size limit dynamically + let max_pov_size = match params + .relay_client + .persisted_validation_data( + relay_parent, + params.para_id, + OccupiedCoreAssumption::Included, + ) + .await + { + Ok(None) => continue, + Ok(Some(pvd)) => pvd.max_pov_size, + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?err, + "Failed to gather information from relay-client" + ); + continue; + } + }; + + // Determine which is the current slot + let slot_now = match consensus_common::relay_slot_and_timestamp( + &relay_parent_header, + params.relay_chain_slot_duration, + ) { + None => continue, + Some((relay_slot, relay_timestamp)) => { + let our_slot = if let Some(slot_duration) = params.slot_duration { + Slot::from_timestamp(relay_timestamp, slot_duration) + } else { + // If there is no slot duration, we assume that the parachain uses the relay slot directly + relay_slot + }; + tracing::debug!( + target: crate::LOG_TARGET, + ?relay_slot, + para_slot = ?our_slot, 
?relay_timestamp, + slot_duration = ?params.slot_duration, + relay_chain_slot_duration = ?params.relay_chain_slot_duration, + "Adjusted relay-chain slot to parachain slot" + ); + our_slot + } + }; + + // Search potential parents to build upon + let mut potential_parents = + match cumulus_client_consensus_common::find_potential_parents::( + ParentSearchParams { + relay_parent, + para_id: params.para_id, + ancestry_lookback: max_ancestry_lookback( + relay_parent, + ¶ms.relay_client, + ) + .await, + max_depth: PARENT_SEARCH_DEPTH, + ignore_alternative_branches: true, + }, + &*params.para_backend, + ¶ms.relay_client, + ) + .await + { + Err(e) => { + tracing::error!( + target: crate::LOG_TARGET, + ?relay_parent, + err = ?e, + "Could not fetch potential parents to build upon" + ); + + continue; + } + Ok(potential_parents) => potential_parents, + }; + + // Search the first potential parent parablock that is already included in the relay + let included_block = match potential_parents.iter().find(|x| x.depth == 0) { + None => continue, // also serves as an `is_empty` check. + Some(b) => b.hash, + }; + + // At this point, we found a potential parent parablock that is already included in the relay. + // + // Sort by depth, ascending, to choose the longest chain. If the longest chain has space, + // build upon that. Otherwise, don't build at all. + potential_parents.sort_by_key(|a| a.depth); + let initial_parent = match potential_parents.pop() { + None => continue, + Some(initial_parent) => initial_parent, + }; + + // Build in a loop until not allowed. Note that the selected collators can change + // at any block, so we need to re-claim our slot every time. + // This needs to change to support elastic scaling, but for continuously + // scheduled chains this ensures that the backlog will grow steadily. 
+	let mut parent_hash = initial_parent.hash; + let mut parent_header = initial_parent.header; + let overseer_handle = &mut params.overseer_handle; + for n_built in 0..2 { + // Ask the runtime if we are authorized to create a new parablock on top of this parent. + // (This will claim the slot internally) + let para_client = &*params.para_client; + let keystore = &params.keystore; + let author_id = match can_build_upon::<_, _>( + slot_now, + &parent_header, + &relay_parent_header, + included_block, + para_client, + &keystore, + params.force_authoring, + ) + .await + { + None => break, + Some(author_id) => author_id, + }; + + tracing::debug!( + target: crate::LOG_TARGET, + ?relay_parent, + unincluded_segment_len = initial_parent.depth + n_built, + "Slot claimed. Building" + ); + + // + // Build and announce collations recursively until + // `can_build_upon` fails or building a collation fails. + // + + // Create inherent data for the next parablock + let (parachain_inherent_data, other_inherent_data) = + match crate::create_inherent_data( + &params.create_inherent_data_providers, + params.para_id, + parent_hash, + &PersistedValidationData { + parent_head: parent_header.encode().into(), + relay_parent_number: *relay_parent_header.number(), + relay_parent_storage_root: *relay_parent_header.state_root(), + max_pov_size, + }, + &params.relay_client, + relay_parent, + author_id.clone(), + ) + .await + { + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err); + break; + } + Ok(x) => x, + }; + + // Compute the hash of the parachain runtime bytecode that we are using to build the block. + // The hash will be sent to relay validators alongside the candidate. 
+ let validation_code_hash = match params.code_hash_provider.code_hash_at(parent_hash) + { + None => { + tracing::error!( + target: crate::LOG_TARGET, + ?parent_hash, + "Could not fetch validation code hash" + ); + break; + } + Some(validation_code_hash) => validation_code_hash, + }; + + match super::collate( + ¶ms.additional_digests_provider, + author_id, + &mut params.block_import, + ¶ms.collator_service, + keystore, + &parent_header, + &mut params.proposer, + (parachain_inherent_data, other_inherent_data), + params.authoring_duration, + // Set the block limit to 50% of the maximum PoV size. + // + // TODO: If we got benchmarking that includes the proof size, + // we should be able to use the maximum pov size. + (max_pov_size / 2) as usize, + ) + .await + { + Ok((collation, block_data, new_block_hash)) => { + // Here we are assuming that the import logic protects against equivocations + // and provides sybil-resistance, as it should. + params.collator_service.announce_block(new_block_hash, None); + + // Send a submit-collation message to the collation generation subsystem, + // which then distributes this to validators. + // + // Here we are assuming that the leaf is imported, as we've gotten an + // import notification. + overseer_handle + .send_msg( + CollationGenerationMessage::SubmitCollation( + SubmitCollationParams { + relay_parent, + collation, + parent_head: parent_header.encode().into(), + validation_code_hash, + result_sender: None, + }, + ), + "SubmitCollation", + ) + .await; + + parent_hash = new_block_hash; + parent_header = block_data.into_header(); + } + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err); + break; + } + } + } + } + } +} + +// Checks if we own the slot at the given block and whether there +// is space in the unincluded segment. 
+async fn can_build_upon( + slot: Slot, + parent: &Block::Header, + relay_parent: &PHeader, + included_block: Block::Hash, + client: &Client, + keystore: &KeystorePtr, + force_authoring: bool, +) -> Option +where + Block: BlockT, + Client: ProvideRuntimeApi, + Client::Api: NimbusApi + UnincludedSegmentApi, +{ + let runtime_api = client.runtime_api(); + match crate::claim_slot::( + keystore, + client, + parent, + relay_parent, + force_authoring, + ) + .await + { + Ok(Some(nimbus_id)) => { + // Here we lean on the property that building on an empty unincluded segment must always + // be legal. Skipping the runtime API query here allows us to seamlessly run this + // collator against chains which have not yet upgraded their runtime. + if parent.hash() != included_block { + match runtime_api.can_build_upon(parent.hash(), included_block, slot) { + Ok(true) => Some(nimbus_id), + Ok(false) => None, + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?err, + ?parent, + ?relay_parent, + ?included_block, + "Failed to call runtime api UnincludedSegmentApi::can_build_upon", + ); + None + } + } + } else { + Some(nimbus_id) + } + } + Ok(None) => None, + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?err, + ?parent, + ?relay_parent, + ?included_block, + "Failed to claim slot", + ); + None + } + } +} + +/// Reads allowed ancestry length parameter from the relay chain storage at the given relay parent. +/// +/// Falls back to 0 in case of an error. 
+async fn max_ancestry_lookback( + relay_parent: PHash, + relay_client: &impl RelayChainInterface, +) -> usize { + match load_abridged_host_configuration(relay_parent, relay_client).await { + Ok(Some(config)) => config.async_backing_params.allowed_ancestry_len as usize, + Ok(None) => { + tracing::error!( + target: crate::LOG_TARGET, + "Active config is missing in relay chain storage", + ); + 0 + } + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?err, + ?relay_parent, + "Failed to read active config from relay chain client", + ); + 0 + } + } +} + +// Checks if there exists a scheduled core for the para at the provided relay parent. +// +// Falls back to `false` in case of an error. +async fn is_para_scheduled( + relay_parent: PHash, + para_id: ParaId, + overseer_handle: &mut OverseerHandle, +) -> bool { + let (tx, rx) = oneshot::channel(); + let request = RuntimeApiRequest::AvailabilityCores(tx); + overseer_handle + .send_msg( + RuntimeApiMessage::Request(relay_parent, request), + "LookaheadCollator", + ) + .await; + + let cores = match rx.await { + Ok(Ok(cores)) => cores, + Ok(Err(error)) => { + tracing::error!( + target: crate::LOG_TARGET, + ?error, + ?relay_parent, + "Failed to query availability cores runtime API", + ); + return false; + } + Err(oneshot::Canceled) => { + tracing::error!( + target: crate::LOG_TARGET, + ?relay_parent, + "Sender for availability cores runtime request dropped", + ); + return false; + } + }; + + cores.iter().any(|core| core.para_id() == Some(para_id)) +} diff --git a/client/consensus/nimbus-consensus/src/lib.rs b/client/consensus/nimbus-consensus/src/lib.rs index 78615a9..e721d77 100644 --- a/client/consensus/nimbus-consensus/src/lib.rs +++ b/client/consensus/nimbus-consensus/src/lib.rs @@ -20,150 +20,135 @@ //! stored in its keystore are eligible to author at this slot. If it has an eligible //! key it authors. 
-use cumulus_client_consensus_common::{ - ParachainBlockImport, ParachainCandidate, ParachainConsensus, -}; -use cumulus_primitives_core::{relay_chain::Hash as PHash, ParaId, PersistedValidationData}; +pub mod collators; + +mod import_queue; +mod manual_seal; + pub use import_queue::import_queue; -use log::{debug, info, warn}; -use nimbus_primitives::{ - CompatibleDigestItem, DigestsProvider, NimbusApi, NimbusId, NIMBUS_KEY_ID, +pub use manual_seal::NimbusManualSealConsensusDataProvider; + +use cumulus_primitives_core::{ + relay_chain::{Hash as PHash, Header as PHeader}, + ParaId, PersistedValidationData, }; -use parking_lot::Mutex; -use sc_client_api::backend::Backend; -use sc_consensus::{BlockImport, BlockImportParams}; +use cumulus_primitives_parachain_inherent::ParachainInherentData; +use cumulus_relay_chain_interface::RelayChainInterface; +use futures::prelude::*; +use log::{info, warn}; +use nimbus_primitives::{NimbusApi, NimbusId, NIMBUS_KEY_ID}; +use sc_consensus::BlockImport; use sp_api::ProvideRuntimeApi; use sp_application_crypto::ByteArray; -use sp_consensus::{ - BlockOrigin, EnableProofRecording, Environment, ProofRecording, Proposal, Proposer, -}; -use sp_core::{crypto::CryptoTypeId, sr25519}; use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider}; use sp_keystore::{Keystore, KeystorePtr}; -use sp_runtime::{ - traits::{Block as BlockT, Header as HeaderT}, - DigestItem, -}; -use std::convert::TryInto; -use std::{marker::PhantomData, sync::Arc, time::Duration}; -use tracing::error; -mod import_queue; -mod manual_seal; -pub use manual_seal::NimbusManualSealConsensusDataProvider; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use std::error::Error; const LOG_TARGET: &str = "filtering-consensus"; -/// The implementation of the relay-chain provided consensus for parachains. 
-pub struct NimbusConsensus { - para_id: ParaId, - proposer_factory: Arc>, - create_inherent_data_providers: Arc, - block_import: Arc>>, - parachain_client: Arc, - keystore: KeystorePtr, - skip_prediction: bool, - additional_digests_provider: Arc, - _phantom: PhantomData, -} +/*impl + NimbusConsensus +where + Block: BlockT, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + CS: CollatorServiceInterface, + Client: ProvideRuntimeApi + 'static, + Client::Api: sp_api::Core + NimbusApi, + DP: DigestsProvider::Hash> + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + RClient: RelayChainInterface + Send + Clone + 'static, +{ + +}*/ -impl Clone - for NimbusConsensus +/// Attempt to claim a slot derived from the given relay-parent header's slot. +pub(crate) async fn claim_slot( + keystore: &KeystorePtr, + para_client: &Client, + parent: &Block::Header, + relay_parent_header: &PHeader, + skip_prediction: bool, +) -> Result, Box> +where + Block: BlockT, + Client: ProvideRuntimeApi, + Client::Api: NimbusApi, { - fn clone(&self) -> Self { - Self { - para_id: self.para_id, - proposer_factory: self.proposer_factory.clone(), - create_inherent_data_providers: self.create_inherent_data_providers.clone(), - block_import: self.block_import.clone(), - parachain_client: self.parachain_client.clone(), - keystore: self.keystore.clone(), - skip_prediction: self.skip_prediction, - additional_digests_provider: self.additional_digests_provider.clone(), - _phantom: PhantomData, - } + // Determine if runtime change + let runtime_upgraded = if *parent.number() > sp_runtime::traits::Zero::zero() { + use sp_api::Core as _; + let previous_runtime_version: sp_api::RuntimeVersion = para_client + .runtime_api() + .version(parent.hash()) + .map_err(Box::new)?; + let runtime_version: sp_api::RuntimeVersion = para_client + .runtime_api() + .version(parent.hash()) + .map_err(Box::new)?; + + previous_runtime_version != runtime_version + } else { + false + }; + + let 
maybe_key = if skip_prediction || runtime_upgraded { + first_available_key(&*keystore) + } else { + first_eligible_key::( + para_client.clone(), + &*keystore, + parent, + *relay_parent_header.number(), + ) + }; + + if let Some(key) = maybe_key { + Ok(Some( + NimbusId::from_slice(&key).map_err(|_| "invalid nimbus id (wrong length)")?, + )) + } else { + Ok(None) } } -impl NimbusConsensus +/// Explicitly creates the inherent data for parachain block authoring. +pub(crate) async fn create_inherent_data( + create_inherent_data_providers: &CIDP, + para_id: ParaId, + parent: Block::Hash, + validation_data: &PersistedValidationData, + relay_client: &RClient, + relay_parent: PHash, + author_id: NimbusId, +) -> Result<(ParachainInherentData, InherentData), Box> where - B: BlockT, - PF: 'static, - BI: 'static, - BE: Backend + 'static, - ParaClient: ProvideRuntimeApi + 'static, - CIDP: CreateInherentDataProviders + 'static, - DP: DigestsProvider::Hash> + 'static, + Block: BlockT, + CIDP: CreateInherentDataProviders + 'static, + RClient: RelayChainInterface + Send + Clone + 'static, { - /// Create a new instance of nimbus consensus. - pub fn build( - BuildNimbusConsensusParams { - para_id, - proposer_factory, - create_inherent_data_providers, - block_import, - backend, - parachain_client, - keystore, - skip_prediction, - additional_digests_provider, - }: BuildNimbusConsensusParams, - ) -> Box> - where - Self: ParachainConsensus, - { - Box::new(Self { - para_id, - proposer_factory: Arc::new(Mutex::new(proposer_factory)), - create_inherent_data_providers: Arc::new(create_inherent_data_providers), - block_import: Arc::new(futures::lock::Mutex::new(ParachainBlockImport::new( - block_import, - backend, - ))), - parachain_client, - keystore, - skip_prediction, - additional_digests_provider: Arc::new(additional_digests_provider), - _phantom: PhantomData, - }) - } - - //TODO Could this be a provided implementation now that we have this async inherent stuff? - /// Create the data. 
- async fn inherent_data( - &self, - parent: B::Hash, - validation_data: &PersistedValidationData, - relay_parent: PHash, - author_id: NimbusId, - ) -> Option { - let inherent_data_providers = self - .create_inherent_data_providers - .create_inherent_data_providers( - parent, - (relay_parent, validation_data.clone(), author_id), + let paras_inherent_data = + ParachainInherentData::create_at(relay_parent, relay_client, validation_data, para_id) + .await; + + let paras_inherent_data = match paras_inherent_data { + Some(p) => p, + None => { + return Err( + format!("Could not create paras inherent data at {:?}", relay_parent).into(), ) - .await - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Failed to create inherent data providers.", - ) - }) - .ok()?; + } + }; - inherent_data_providers - .create_inherent_data() - .await - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Failed to create inherent data.", - ) - }) - .ok() - } + let other_inherent_data = create_inherent_data_providers + .create_inherent_data_providers(parent, (relay_parent, validation_data.clone(), author_id)) + .map_err(|e| e as Box) + .await? + .create_inherent_data() + .await + .map_err(Box::new)?; + + Ok((paras_inherent_data, other_inherent_data)) } /// Grabs any available nimbus key from the keystore. @@ -193,15 +178,16 @@ pub(crate) fn first_available_key(keystore: &dyn Keystore) -> Option> { /// If multiple keys are eligible this function still only returns one /// and makes no guarantees which one as that depends on the keystore's iterator behavior. /// This is the standard way of determining which key to author with. 
-pub(crate) fn first_eligible_key( - client: Arc, +pub(crate) fn first_eligible_key( + client: &Client, keystore: &dyn Keystore, - parent: &B::Header, + parent: &Block::Header, slot_number: u32, ) -> Option> where - C: ProvideRuntimeApi, - C::Api: NimbusApi, + Block: BlockT, + Client: ProvideRuntimeApi, + Client::Api: NimbusApi, { // Get all the available keys let available_keys = Keystore::keys(keystore, NIMBUS_KEY_ID).ok()?; @@ -245,215 +231,3 @@ where maybe_key } - -pub(crate) fn seal_header( - header: &B::Header, - keystore: &dyn Keystore, - public_pair: &Vec, - crypto_id: &CryptoTypeId, -) -> DigestItem -where - B: BlockT, -{ - let pre_hash = header.hash(); - - let raw_sig = Keystore::sign_with( - keystore, - NIMBUS_KEY_ID, - *crypto_id, - public_pair, - pre_hash.as_ref(), - ) - .expect("Keystore should be able to sign") - .expect("We already checked that the key was present"); - - debug!(target: LOG_TARGET, "The signature is \n{:?}", raw_sig); - - let signature = raw_sig - .clone() - .try_into() - .expect("signature bytes produced by keystore should be right length"); - - ::nimbus_seal(signature) -} - -#[async_trait::async_trait] -impl ParachainConsensus - for NimbusConsensus -where - B: BlockT, - BI: BlockImport + Send + Sync + 'static, - BE: Backend + Send + Sync + 'static, - PF: Environment + Send + Sync + 'static, - PF::Proposer: Proposer< - B, - ProofRecording = EnableProofRecording, - Proof = ::Proof, - >, - ParaClient: ProvideRuntimeApi + Send + Sync + 'static, - ParaClient::Api: NimbusApi, - CIDP: CreateInherentDataProviders + 'static, - DP: DigestsProvider::Hash> + 'static + Send + Sync, -{ - async fn produce_candidate( - &mut self, - parent: &B::Header, - relay_parent: PHash, - validation_data: &PersistedValidationData, - ) -> Option> { - // Determine if runtime change - let runtime_upgraded = if *parent.number() > sp_runtime::traits::Zero::zero() { - use sp_api::Core as _; - let previous_runtime_version: sp_api::RuntimeVersion = self - 
.parachain_client - .runtime_api() - .version(parent.hash()) - .ok()?; - let runtime_version: sp_api::RuntimeVersion = self - .parachain_client - .runtime_api() - .version(parent.hash()) - .ok()?; - - previous_runtime_version != runtime_version - } else { - false - }; - - let maybe_key = if self.skip_prediction || runtime_upgraded { - first_available_key(&*self.keystore) - } else { - first_eligible_key::( - self.parachain_client.clone(), - &*self.keystore, - parent, - validation_data.relay_parent_number, - ) - }; - - // If there are no eligible keys, print the log, and exit early. - let type_public_pair = match maybe_key { - Some(p) => p, - None => { - return None; - } - }; - - let proposer_future = self.proposer_factory.lock().init(&parent); - - let proposer = proposer_future - .await - .map_err(|e| error!(target: LOG_TARGET, error = ?e, "Could not create proposer.")) - .ok()?; - - let nimbus_id = NimbusId::from_slice(&type_public_pair) - .map_err( - |e| error!(target: LOG_TARGET, error = ?e, "Invalid Nimbus ID (wrong length)."), - ) - .ok()?; - - let inherent_data = self - .inherent_data( - parent.hash(), - &validation_data, - relay_parent, - nimbus_id.clone(), - ) - .await?; - - let mut logs = vec![CompatibleDigestItem::nimbus_pre_digest(nimbus_id.clone())]; - logs.extend( - self.additional_digests_provider - .provide_digests(nimbus_id, parent.hash()), - ); - let inherent_digests = sp_runtime::generic::Digest { logs }; - - let Proposal { - block, - storage_changes, - proof, - } = proposer - .propose( - inherent_data, - inherent_digests, - //TODO: Fix this. - Duration::from_millis(500), - // Set the block limit to 50% of the maximum PoV size. - // - // TODO: If we got benchmarking that includes that encapsulates the proof size, - // we should be able to use the maximum pov size. 
- Some((validation_data.max_pov_size / 2) as usize), - ) - .await - .map_err(|e| error!(target: LOG_TARGET, error = ?e, "Proposing failed.")) - .ok()?; - - let (header, extrinsics) = block.clone().deconstruct(); - - let sig_digest = seal_header::( - &header, - &*self.keystore, - &type_public_pair, - &sr25519::CRYPTO_ID, - ); - - let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header.clone()); - block_import_params.post_digests.push(sig_digest.clone()); - block_import_params.body = Some(extrinsics.clone()); - block_import_params.state_action = sc_consensus::StateAction::ApplyChanges( - sc_consensus::StorageChanges::Changes(storage_changes), - ); - - // Print the same log line as slots (aura and babe) - info!( - "🔖 Sealed block for proposal at {}. Hash now {:?}, previously {:?}.", - *header.number(), - block_import_params.post_hash(), - header.hash(), - ); - - if let Err(err) = self - .block_import - .lock() - .await - .import_block(block_import_params) - .await - { - error!( - target: LOG_TARGET, - at = ?parent.hash(), - error = ?err, - "Error importing built block.", - ); - - return None; - } - - // Compute info about the block after the digest is added - let mut post_header = header.clone(); - post_header.digest_mut().logs.push(sig_digest.clone()); - let post_block = B::new(post_header, extrinsics); - - // Returning the block WITH the seal for distribution around the network. - Some(ParachainCandidate { - block: post_block, - proof, - }) - } -} - -/// Paramaters of [`build_relay_chain_consensus`]. -/// -/// I briefly tried the async keystore approach, but decided to go sync so I can copy -/// code from Aura. Maybe after it is working, Jeremy can help me go async. 
-pub struct BuildNimbusConsensusParams { - pub para_id: ParaId, - pub proposer_factory: PF, - pub create_inherent_data_providers: CIDP, - pub block_import: BI, - pub backend: Arc, - pub parachain_client: Arc, - pub keystore: KeystorePtr, - pub skip_prediction: bool, - pub additional_digests_provider: DP, -} diff --git a/client/consensus/nimbus-consensus/src/manual_seal.rs b/client/consensus/nimbus-consensus/src/manual_seal.rs index b33419a..644f2e7 100644 --- a/client/consensus/nimbus-consensus/src/manual_seal.rs +++ b/client/consensus/nimbus-consensus/src/manual_seal.rs @@ -65,7 +65,7 @@ where // Fetch first eligible key from keystore let maybe_key = crate::first_eligible_key::( - self.client.clone(), + &self.client, &*self.keystore, parent, // For now we author all blocks in slot zero, which is consistent with how we are @@ -121,7 +121,7 @@ where let nimbus_public = NimbusId::from_slice(&claimed_author) .map_err(|_| Error::StringError(String::from("invalid nimbus id (wrong length)")))?; - let sig_digest = crate::seal_header::( + let sig_digest = crate::collators::seal_header::( ¶ms.header, &*self.keystore, &nimbus_public.to_raw_vec(), diff --git a/pallets/async-backing/Cargo.toml b/pallets/async-backing/Cargo.toml new file mode 100644 index 0000000..456a53c --- /dev/null +++ b/pallets/async-backing/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "pallet-async-backing" +authors = [ "Moonsong Labs" ] +description = "Nimbus consensus extension pallet for asynchronous backing" +edition = "2021" +version = "0.9.0" + +[dependencies] +cumulus-pallet-parachain-system = { workspace = true, features = [ "parameterized-consensus-hook" ] } +cumulus-primitives-core = { workspace = true } +frame-support = { workspace = true } +frame-system = { workspace = true } +log = { workspace = true } +nimbus-primitives = { workspace = true } +pallet-timestamp = { workspace = true } +parity-scale-codec = { workspace = true } +scale-info = { workspace = true } +sp-consensus-slots = { 
workspace = true } +sp-core = { workspace = true } +sp-runtime = { workspace = true } +sp-std = { workspace = true } + +[dev-dependencies] +sp-io = { workspace = true } + +[features] +default = [ "std" ] +std = [ + "cumulus-pallet-parachain-system/std", + "cumulus-primitives-core/std", + "frame-support/std", + "frame-system/std", + "nimbus-primitives/std", + "pallet-timestamp/std", + "parity-scale-codec/std", + "scale-info/std", + "sp-consensus-slots/std", + "sp-core/std", + "sp-runtime/std", + "sp-std/std", +] \ No newline at end of file diff --git a/pallets/async-backing/src/consensus_hook.rs b/pallets/async-backing/src/consensus_hook.rs new file mode 100644 index 0000000..6c1e206 --- /dev/null +++ b/pallets/async-backing/src/consensus_hook.rs @@ -0,0 +1,140 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +//! The definition of a [`FixedVelocityConsensusHook`] for consensus logic to manage +//! block velocity. +//! +//! The velocity `V` refers to the rate of block processing by the relay chain. 
+ +use crate::*; +use cumulus_pallet_parachain_system::{ + self as parachain_system, + consensus_hook::{ConsensusHook, UnincludedSegmentCapacity}, + relay_state_snapshot::RelayChainStateProof, +}; +use frame_support::pallet_prelude::*; +use sp_consensus_slots::Slot; +use sp_std::{marker::PhantomData, num::NonZeroU32}; + +#[cfg(tests)] +type RelayChainStateProof = crate::mock::FakeRelayChainStateProof; + +/// A consensus hook for a fixed block processing velocity and unincluded segment capacity. +/// +/// Relay chain slot duration must be provided in milliseconds. +pub struct FixedVelocityConsensusHook(PhantomData); + +impl ConsensusHook + for FixedVelocityConsensusHook +where + ::Moment: Into, +{ + // Validates the number of authored blocks within the slot with respect to the `V + 1` limit. + fn on_state_proof(state_proof: &RelayChainStateProof) -> (Weight, UnincludedSegmentCapacity) { + let relay_chain_slot = state_proof + .read_slot() + .expect("failed to read relay chain slot"); + + Self::on_state_proof_inner(relay_chain_slot) + } +} + +impl FixedVelocityConsensusHook +where + ::Moment: Into, +{ + pub(crate) fn on_state_proof_inner( + relay_chain_slot: cumulus_primitives_core::relay_chain::Slot, + ) -> (Weight, UnincludedSegmentCapacity) { + // Ensure velocity is non-zero. + let velocity = V.max(1); + + // Get and verify the parachain slot + let new_slot = T::GetAndVerifySlot::get_and_verify_slot(&relay_chain_slot) + .expect("slot number mismatch"); + + // Update Slot Info + let authored = match SlotInfo::::get() { + Some((slot, authored)) if slot == new_slot => { + if !T::AllowMultipleBlocksPerSlot::get() { + panic!("Block invalid; Supplied slot number is not high enough"); + } + authored + 1 + } + Some((slot, _)) if slot < new_slot => 1, + Some(..) => { + panic!("slot moved backwards") + } + None => 1, + }; + + // Perform checks. 
+ if authored > velocity + 1 { + panic!("authored blocks limit is reached for the slot") + } + + // Store new slot info + SlotInfo::::put((new_slot, authored)); + + // Account weights + let weight = T::DbWeight::get().reads_writes(1, 1); + + // Return weight and unincluded segment capacity + ( + weight, + NonZeroU32::new(sp_std::cmp::max(C, 1)) + .expect("1 is the minimum value and non-zero; qed") + .into(), + ) + } +} + +impl + FixedVelocityConsensusHook +{ + /// Whether it is legal to extend the chain, assuming the given block is the most + /// recently included one as-of the relay parent that will be built against, and + /// the given slot. + /// + /// This should be consistent with the logic the runtime uses when validating blocks to + /// avoid issues. + /// + /// When the unincluded segment is empty, i.e. `included_hash == at`, where at is the block + /// whose state we are querying against, this must always return `true` as long as the slot + /// is more recent than the included block itself. + pub fn can_build_upon(included_hash: T::Hash, new_slot: Slot) -> bool { + let velocity = V.max(1); + let (last_slot, authored_so_far) = match pallet::Pallet::::slot_info() { + None => return true, + Some(x) => x, + }; + + let size_after_included = + parachain_system::Pallet::::unincluded_segment_size_after(included_hash); + + // can never author when the unincluded segment is full. + if size_after_included >= C { + return false; + } + + if last_slot == new_slot { + authored_so_far < velocity + 1 + } else { + // disallow slot from moving backwards. + last_slot < new_slot + } + } +} diff --git a/pallets/async-backing/src/lib.rs b/pallets/async-backing/src/lib.rs new file mode 100644 index 0000000..16e606b --- /dev/null +++ b/pallets/async-backing/src/lib.rs @@ -0,0 +1,105 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. 
+ +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +#![cfg_attr(not(feature = "std"), no_std)] + +pub mod consensus_hook; + +#[cfg(test)] +mod mock; +#[cfg(test)] +mod tests; + +pub use pallet::*; + +use frame_support::pallet_prelude::*; +use sp_consensus_slots::{Slot, SlotDuration}; + +/// The InherentIdentifier for nimbus's extension inherent +pub const INHERENT_IDENTIFIER: InherentIdentifier = *b"nimb-ext"; + +/// A way to get the current parachain slot and verify it's validity against the relay slot. +/// If you don't need to have slots at parachain level, you can use the `RelaySlot` implementation. +pub trait GetAndVerifySlot { + /// Get the current slot + fn get_and_verify_slot(relay_chain_slot: &Slot) -> Result; +} + +/// Parachain slot implementation that use the relay chain slot directly +pub struct RelaySlot; +impl GetAndVerifySlot for RelaySlot { + fn get_and_verify_slot(relay_chain_slot: &Slot) -> Result { + Ok(*relay_chain_slot) + } +} + +/// Parachain slot implementation that use a slot provider +pub struct ParaSlot( + PhantomData, +); + +impl GetAndVerifySlot + for ParaSlot +where + SlotProvider: Get<(Slot, SlotDuration)>, +{ + fn get_and_verify_slot(relay_chain_slot: &Slot) -> Result { + // Convert relay chain timestamp. 
+ let relay_chain_timestamp = + u64::from(RELAY_CHAIN_SLOT_DURATION_MILLIS).saturating_mul((*relay_chain_slot).into()); + + let (new_slot, para_slot_duration) = SlotProvider::get(); + + let para_slot_from_relay = + Slot::from_timestamp(relay_chain_timestamp.into(), para_slot_duration); + + if new_slot == para_slot_from_relay { + Ok(new_slot) + } else { + Err(()) + } + } +} + +#[frame_support::pallet] +pub mod pallet { + use super::*; + + /// The current storage version. + const STORAGE_VERSION: StorageVersion = StorageVersion::new(1); + + #[pallet::pallet] + #[pallet::storage_version(STORAGE_VERSION)] + pub struct Pallet(PhantomData); + + /// The configuration trait. + #[pallet::config] + pub trait Config: pallet_timestamp::Config + frame_system::Config { + /// Whether or not to allow more than one block per slot. + /// Setting it to 'true' will enable async-backing compatibility. + type AllowMultipleBlocksPerSlot: Get; + + /// A way to get the current parachain slot and verify it's validity against the relay slot. + type GetAndVerifySlot: GetAndVerifySlot; + } + + /// First tuple element is the highest slot that has been seen in the history of this chain. + /// Second tuple element is the number of authored blocks so far. + /// This is a strictly-increasing value if T::AllowMultipleBlocksPerSlot = false. + #[pallet::storage] + #[pallet::getter(fn slot_info)] + pub type SlotInfo = StorageValue<_, (Slot, u32), OptionQuery>; +} diff --git a/pallets/async-backing/src/mock.rs b/pallets/async-backing/src/mock.rs new file mode 100644 index 0000000..900161e --- /dev/null +++ b/pallets/async-backing/src/mock.rs @@ -0,0 +1,96 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+ +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +use crate::{self as async_backing, RelaySlot}; +use frame_support::parameter_types; +use frame_support::traits::{ConstU32, ConstU64}; +use frame_support::weights::RuntimeDbWeight; +use frame_system; +use sp_core::H256; +use sp_runtime::{ + traits::{BlakeTwo256, IdentityLookup}, + BuildStorage, +}; + +type Block = frame_system::mocking::MockBlock; + +frame_support::construct_runtime!( + pub enum Test + { + System: frame_system, + Timestamp: pallet_timestamp, + AsyncBacking: async_backing, + } +); + +parameter_types! { + pub const BlockHashCount: u64 = 250; + pub const TestDbWeight: RuntimeDbWeight = RuntimeDbWeight { + read: 1, + write: 10, + }; +} + +impl frame_system::Config for Test { + type BaseCallFilter = frame_support::traits::Everything; + type BlockWeights = (); + type BlockLength = (); + type DbWeight = TestDbWeight; + type RuntimeOrigin = RuntimeOrigin; + type RuntimeCall = RuntimeCall; + type Nonce = u64; + type Block = Block; + type Hash = H256; + type Hashing = BlakeTwo256; + type AccountId = u64; + type Lookup = IdentityLookup; + type RuntimeEvent = RuntimeEvent; + type BlockHashCount = BlockHashCount; + type Version = (); + type PalletInfo = PalletInfo; + type AccountData = (); + type OnNewAccount = (); + type OnKilledAccount = (); + type SystemWeightInfo = (); + type SS58Prefix = (); + type OnSetCode = (); + type MaxConsumers = ConstU32<16>; +} + +impl pallet_timestamp::Config for Test { + type Moment = u64; + type OnTimestampSet = (); + type MinimumPeriod = ConstU64<1>; + type WeightInfo = (); +} + +parameter_types! 
{ + pub const AllowMultipleBlocksPerSlot: bool = true; + pub const SlotDuration: u64 = 12000; +} + +impl async_backing::Config for Test { + type AllowMultipleBlocksPerSlot = AllowMultipleBlocksPerSlot; + type GetAndVerifySlot = RelaySlot; +} + +/// Build genesis storage according to the mock runtime. +pub fn new_test_ext() -> sp_io::TestExternalities { + let t = frame_system::GenesisConfig::::default() + .build_storage() + .unwrap(); + sp_io::TestExternalities::new(t) +} diff --git a/pallets/async-backing/src/tests.rs b/pallets/async-backing/src/tests.rs new file mode 100644 index 0000000..db7cba4 --- /dev/null +++ b/pallets/async-backing/src/tests.rs @@ -0,0 +1,70 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . 
+ +use crate::consensus_hook::FixedVelocityConsensusHook; +use crate::mock::*; +use std::ops::Deref; + +type ConsensusHook = FixedVelocityConsensusHook; + +fn assert_slot_info_eq(slot_number: u64, authored: u32) { + assert_eq!(AsyncBacking::slot_info().unwrap().0.deref(), &slot_number); + assert_eq!(AsyncBacking::slot_info().unwrap().1, authored); +} + +#[test] +fn on_state_proof_inner_update_slot_info() { + new_test_ext().execute_with(|| { + assert_eq!(AsyncBacking::slot_info(), None); + ConsensusHook::on_state_proof_inner(1.into()); + assert_slot_info_eq(1, 1); + }); +} + +#[test] +fn can_produce_2_parablock_per_relay_slot() { + new_test_ext().execute_with(|| { + assert_eq!(AsyncBacking::slot_info(), None); + ConsensusHook::on_state_proof_inner(1.into()); + assert_slot_info_eq(1, 1); + ConsensusHook::on_state_proof_inner(1.into()); + assert_slot_info_eq(1, 2); + }); +} + +#[test] +#[should_panic = "authored blocks limit is reached for the slot"] +fn can_not_produce_3_parablock_per_relay_slot() { + new_test_ext().execute_with(|| { + assert_eq!(AsyncBacking::slot_info(), None); + ConsensusHook::on_state_proof_inner(1.into()); + assert_slot_info_eq(1, 1); + ConsensusHook::on_state_proof_inner(1.into()); + assert_slot_info_eq(1, 2); + ConsensusHook::on_state_proof_inner(1.into()); + }); +} + +#[test] +fn can_skip_a_relay_slot() { + new_test_ext().execute_with(|| { + assert_eq!(AsyncBacking::slot_info(), None); + ConsensusHook::on_state_proof_inner(1.into()); + assert_slot_info_eq(1, 1); + ConsensusHook::on_state_proof_inner(3.into()); + assert_slot_info_eq(3, 1); + }); +} diff --git a/pallets/author-inherent/src/lib.rs b/pallets/author-inherent/src/lib.rs index 20a4e8d..8447855 100644 --- a/pallets/author-inherent/src/lib.rs +++ b/pallets/author-inherent/src/lib.rs @@ -28,7 +28,6 @@ use parity_scale_codec::{Decode, Encode, FullCodec}; use sp_inherents::{InherentIdentifier, IsFatalError}; use sp_runtime::{ConsensusEngineId, RuntimeString}; -mod exec; pub use 
crate::weights::WeightInfo; pub use exec::BlockExecutor; pub use pallet::*; @@ -38,6 +37,8 @@ mod benchmarks; pub mod weights; +mod exec; + #[cfg(test)] mod mock; #[cfg(test)] @@ -94,11 +95,6 @@ pub mod pallet { #[pallet::storage] pub type Author = StorageValue<_, T::AuthorId, OptionQuery>; - /// The highest slot that has been seen in the history of this chain. - /// This is a strictly-increasing value. - #[pallet::storage] - pub type HighestSlotSeen = StorageValue<_, u32, ValueQuery>; - #[pallet::hooks] impl Hooks> for Pallet { fn on_initialize(_: BlockNumberFor) -> Weight { @@ -126,21 +122,14 @@ pub mod pallet { ensure_none(origin)?; // First check that the slot number is valid (greater than the previous highest) - let slot = T::SlotBeacon::slot(); - assert!( - slot > HighestSlotSeen::::get(), - "Block invalid; Supplied slot number is not high enough" - ); + let new_slot = T::SlotBeacon::slot(); // Now check that the author is valid in this slot assert!( - T::CanAuthor::can_author(&Self::get(), &slot), + T::CanAuthor::can_author(&Self::get(), &new_slot), "Block invalid, supplied author is not eligible." 
); - // Once that is validated, update the stored slot number - HighestSlotSeen::::put(slot); - Ok(Pays::No.into()) } } diff --git a/pallets/author-slot-filter/Cargo.toml b/pallets/author-slot-filter/Cargo.toml index 2204f78..ea721b1 100644 --- a/pallets/author-slot-filter/Cargo.toml +++ b/pallets/author-slot-filter/Cargo.toml @@ -6,11 +6,11 @@ edition = "2021" version = "0.9.0" [dependencies] -parity-scale-codec = { workspace = true } frame-support = { workspace = true } frame-system = { workspace = true } log = { workspace = true } nimbus-primitives = { workspace = true } +parity-scale-codec = { workspace = true } scale-info = { workspace = true } serde = { workspace = true } sp-core = { workspace = true } @@ -20,7 +20,6 @@ sp-std = { workspace = true } # Benchmarks frame-benchmarking = { workspace = true, optional = true } - [dev-dependencies] frame-support-test = { workspace = true } sp-io = { workspace = true } diff --git a/primitives/async-backing/Cargo.toml b/primitives/async-backing/Cargo.toml new file mode 100644 index 0000000..a77eca5 --- /dev/null +++ b/primitives/async-backing/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "async-backing-primitives" +authors = [ "Moonsong Labs" ] +description = "Primitive types and traits used for asynchronous backing pallet" +edition = "2021" +version = "0.9.0" + +[dependencies] +sp-api = { workspace = true } +sp-consensus-slots = { workspace = true } + +[features] +default = [ "std" ] +std = [ + "sp-api/std", + "sp-consensus-slots/std", +] diff --git a/primitives/async-backing/src/lib.rs b/primitives/async-backing/src/lib.rs new file mode 100644 index 0000000..8402a0e --- /dev/null +++ b/primitives/async-backing/src/lib.rs @@ -0,0 +1,50 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. 
+ +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +//! Core primitives for Asynchronous Backing. +//! +//! In particular, this exposes the [`UnincludedSegmentApi`] which is used to regulate +//! the behavior collation within a parachain context. + +#![cfg_attr(not(feature = "std"), no_std)] + +pub use sp_consensus_slots::Slot; + +sp_api::decl_runtime_apis! { + /// This runtime API is used to inform potential block authors whether they will + /// have the right to author at a slot, assuming they have claimed the slot. + /// + /// In particular, this API allows parachains to regulate their "unincluded segment", + /// which is the section of the head of the chain which has not yet been made available in the + /// relay chain. + /// + /// When the unincluded segment is short, parachains will allow authors to create multiple + /// blocks per slot in order to build a backlog. When it is saturated, this API will limit + /// the amount of blocks that can be created. + pub trait UnincludedSegmentApi { + /// Whether it is legal to extend the chain, assuming the given block is the most + /// recently included one as-of the relay parent that will be built against, and + /// the given slot. + /// + /// This should be consistent with the logic the runtime uses when validating blocks to + /// avoid issues. + /// + /// When the unincluded segment is empty, i.e. 
`included_hash == at`, where at is the block + /// whose state we are querying against, this must always return `true` as long as the slot + /// is more recent than the included block itself. + fn can_build_upon(included_hash: Block::Hash, slot: Slot) -> bool; + } +} diff --git a/primitives/nimbus-primitives/Cargo.toml b/primitives/nimbus-primitives/Cargo.toml index 1f77011..3939608 100644 --- a/primitives/nimbus-primitives/Cargo.toml +++ b/primitives/nimbus-primitives/Cargo.toml @@ -7,10 +7,9 @@ version = "0.9.0" [dependencies] async-trait = { workspace = true, optional = true } -parity-scale-codec = { workspace = true } - frame-support = { workspace = true } frame-system = { workspace = true } +parity-scale-codec = { workspace = true } scale-info = { version = "2.0.0", default-features = false, features = [ "derive" ] } sp-api = { workspace = true } sp-application-crypto = { workspace = true } diff --git a/primitives/nimbus-primitives/src/lib.rs b/primitives/nimbus-primitives/src/lib.rs index 2bcf36f..fc3a339 100644 --- a/primitives/nimbus-primitives/src/lib.rs +++ b/primitives/nimbus-primitives/src/lib.rs @@ -91,17 +91,6 @@ impl> SlotBeacon for T { } } -/// PLANNED: A SlotBeacon that starts a new slot based on the timestamp. Behaviorally, this is -/// similar to what aura, babe and company do. Implementation-wise it is different because it -/// depends on the timestamp pallet for its notion of time. -pub struct IntervalBeacon; - -impl SlotBeacon for IntervalBeacon { - fn slot() -> u32 { - todo!() - } -} - /// Trait to determine whether this author is eligible to author in this slot. /// This is the primary trait your nimbus filter needs to implement. 
/// diff --git a/template/node/Cargo.toml b/template/node/Cargo.toml index e2ce2cc..e50b5c8 100644 --- a/template/node/Cargo.toml +++ b/template/node/Cargo.toml @@ -78,6 +78,7 @@ sp-transaction-pool = { workspace = true } cumulus-client-cli = { workspace = true } cumulus-client-collator = { workspace = true } cumulus-client-consensus-common = { workspace = true } +cumulus-client-consensus-proposer = { workspace = true } cumulus-client-network = { workspace = true } cumulus-client-service = { workspace = true } cumulus-primitives-core = { workspace = true } diff --git a/template/node/src/service.rs b/template/node/src/service.rs index 108ff88..d8e899b 100644 --- a/template/node/src/service.rs +++ b/template/node/src/service.rs @@ -4,26 +4,30 @@ use std::{sync::Arc, time::Duration}; // Local Runtime Types -use moonkit_template_runtime::{opaque::Block, AccountId, Balance, Nonce, RuntimeApi}; - -use nimbus_consensus::{ - BuildNimbusConsensusParams, NimbusConsensus, NimbusManualSealConsensusDataProvider, +use moonkit_template_runtime::{ + opaque::{Block, Hash}, + AccountId, Balance, Nonce, RuntimeApi, }; +use nimbus_consensus::NimbusManualSealConsensusDataProvider; +use nimbus_primitives::NimbusApi; + // Cumulus Imports use cumulus_client_cli::CollatorOptions; -use cumulus_client_consensus_common::ParachainConsensus; +use cumulus_client_collator::service::CollatorService; +use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport; +use cumulus_client_consensus_proposer::Proposer; use cumulus_client_network::RequireSecondedInBlockAnnounce; #[allow(deprecated)] use cumulus_client_service::{ - prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams, + prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, StartRelayChainTasksParams, }; -use cumulus_primitives_core::ParaId; +use cumulus_primitives_core::{CollectCollationInfo, ParaId}; use cumulus_primitives_parachain_inherent::{ 
MockValidationDataInherentDataProvider, MockXcmConfig, }; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; -use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface, RelayChainResult}; use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc; use polkadot_service::CollatorPair; @@ -37,7 +41,6 @@ use sc_executor::{ HeapAllocStrategy, NativeElseWasmExecutor, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY, }; use sc_network::{config::FullNetworkConfiguration, NetworkBlock}; -use sc_network_sync::SyncingService; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; use sp_api::ConstructRuntimeApi; @@ -59,6 +62,14 @@ impl sc_executor::NativeExecutionDispatch for TemplateRuntimeExecutor { } } +type ParachainClient = + TFullClient>; + +type ParachainBackend = TFullBackend; + +type ParachainBlockImport = + TParachainBlockImport>, ParachainBackend>; + /// Starts a `ServiceBuilder` for a full service. 
/// /// Use this macro if you don't actually need the full service, but just the builder in order to @@ -69,15 +80,16 @@ pub fn new_partial( parachain: bool, ) -> Result< PartialComponents< - TFullClient>, - TFullBackend, - sc_consensus::LongestChain, Block>, + ParachainClient, + ParachainBackend, + sc_consensus::LongestChain, sc_consensus::DefaultImportQueue, - sc_transaction_pool::FullPool< - Block, - TFullClient>, - >, - (Option, Option), + sc_transaction_pool::FullPool>, + ( + ParachainBlockImport, + Option, + Option, + ), >, sc_service::Error, > @@ -86,7 +98,9 @@ where + Send + Sync + 'static, - RuntimeApi::RuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue + RuntimeApi::RuntimeApi: CollectCollationInfo + + NimbusApi + + sp_transaction_pool::runtime_api::TaggedTransactionQueue + sp_api::Metadata + sp_session::SessionKeys + sp_api::ApiExt @@ -151,6 +165,8 @@ where client.clone(), ); + let block_import = ParachainBlockImport::new(client.clone(), backend.clone()); + let import_queue = nimbus_consensus::import_queue( client.clone(), client.clone(), @@ -172,7 +188,7 @@ where task_manager, transaction_pool, select_chain, - other: (telemetry, telemetry_worker_handle), + other: (block_import, telemetry, telemetry_worker_handle), }; Ok(params) @@ -208,13 +224,12 @@ async fn build_relay_chain_interface( /// /// This is the actual implementation that is abstract over the executor and the runtime api. 
#[sc_tracing::logging::prefix_logs_with("Parachain")] -async fn start_node_impl( +async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, _rpc_ext_builder: RB, - build_consensus: BIC, ) -> sc_service::error::Result<( TaskManager, Arc>>, @@ -224,7 +239,9 @@ where + Send + Sync + 'static, - RuntimeApi::RuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue + RuntimeApi::RuntimeApi: CollectCollationInfo + + NimbusApi + + sp_transaction_pool::runtime_api::TaggedTransactionQueue + sp_api::Metadata + sp_session::SessionKeys + sp_api::ApiExt @@ -240,28 +257,11 @@ where ) -> Result + Send + 'static, - BIC: FnOnce( - Arc>>, - Arc>, - Option<&Registry>, - Option, - &TaskManager, - Arc, - Arc< - sc_transaction_pool::FullPool< - Block, - TFullClient>, - >, - >, - Arc>, - KeystorePtr, - bool, - ) -> Result>, sc_service::Error>, { let parachain_config = prepare_node_config(parachain_config); let params = new_partial::(¶chain_config, true)?; - let (mut telemetry, telemetry_worker_handle) = params.other; + let (block_import, mut telemetry, telemetry_worker_handle) = params.other; let client = params.client.clone(); let backend = params.backend.clone(); @@ -278,9 +278,8 @@ where .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; let block_announce_validator = - RequireSecondedInBlockAnnounce::new(relay_chain_interface.clone(), id); + RequireSecondedInBlockAnnounce::new(relay_chain_interface.clone(), para_id); - let force_authoring = parachain_config.force_authoring; let validator = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); @@ -318,6 +317,8 @@ where }) }; + let force_authoring = parachain_config.force_authoring; + sc_service::spawn_tasks(sc_service::SpawnTasksParams { rpc_builder: rpc_extensions_builder, client: 
client.clone(), @@ -343,53 +344,39 @@ where .overseer_handle() .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + start_relay_chain_tasks(StartRelayChainTasksParams { + client: client.clone(), + announce_block: announce_block.clone(), + para_id, + relay_chain_interface: relay_chain_interface.clone(), + task_manager: &mut task_manager, + da_recovery_profile: if validator { + DARecoveryProfile::Collator + } else { + DARecoveryProfile::FullNode + }, + import_queue: import_queue_service, + relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle.clone()), + sync_service: sync_service.clone(), + })?; + if validator { - let parachain_consensus = build_consensus( + start_consensus::( client.clone(), - backend.clone(), + block_import, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), &task_manager, relay_chain_interface.clone(), transaction_pool, - sync_service.clone(), params.keystore_container.keystore(), + para_id, + collator_key.expect("Command line arguments do not allow this. qed"), + overseer_handle, + announce_block, force_authoring, )?; - - let spawner = task_manager.spawn_handle(); - - let params = StartCollatorParams { - para_id: id, - block_status: client.clone(), - announce_block, - client: client.clone(), - task_manager: &mut task_manager, - relay_chain_interface, - spawner, - parachain_consensus, - import_queue: import_queue_service, - recovery_handle: Box::new(overseer_handle), - collator_key: collator_key.expect("Command line arguments do not allow this. 
qed"), - relay_chain_slot_duration, - sync_service, - }; - #[allow(deprecated)] - start_collator(params).await?; - } else { - let params = StartFullNodeParams { - client: client.clone(), - announce_block, - task_manager: &mut task_manager, - para_id: id, - relay_chain_interface, - relay_chain_slot_duration, - import_queue: import_queue_service, - recovery_handle: Box::new(overseer_handle), - sync_service, - }; - #[allow(deprecated)] - start_full_node(params)?; } start_network.start_network(); @@ -397,83 +384,102 @@ where Ok((task_manager, client)) } +fn start_consensus( + client: Arc>, + block_import: ParachainBlockImport, + prometheus_registry: Option<&Registry>, + telemetry: Option, + task_manager: &TaskManager, + relay_chain_interface: Arc, + transaction_pool: Arc< + sc_transaction_pool::FullPool>, + >, + keystore: KeystorePtr, + para_id: ParaId, + collator_key: CollatorPair, + overseer_handle: OverseerHandle, + announce_block: Arc>) + Send + Sync>, + force_authoring: bool, +) -> Result<(), sc_service::Error> +where + RuntimeApi: ConstructRuntimeApi>> + + Send + + Sync + + 'static, + RuntimeApi::RuntimeApi: CollectCollationInfo + + NimbusApi + + sp_transaction_pool::runtime_api::TaggedTransactionQueue + + sp_api::Metadata + + sp_session::SessionKeys + + sp_api::ApiExt + + sp_offchain::OffchainWorkerApi + + sp_block_builder::BlockBuilder, + sc_client_api::StateBackendFor, Block>: sp_api::StateBackend, + Executor: sc_executor::NativeExecutionDispatch + 'static, +{ + let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + let proposer = Proposer::new(proposer_factory); + + let collator_service = CollatorService::new( + client.clone(), + Arc::new(task_manager.spawn_handle()), + announce_block, + client.clone(), + ); + + let params = nimbus_consensus::collators::basic::Params { + para_id, + overseer_handle, + proposer, + 
create_inherent_data_providers: move |_, _| async move { Ok(()) }, + block_import, + relay_client: relay_chain_interface, + para_client: client, + keystore, + collator_service, + force_authoring, + additional_digests_provider: (), + collator_key, + //authoring_duration: Duration::from_millis(500), + }; + + let fut = nimbus_consensus::collators::basic::run::( + params, + ); + task_manager + .spawn_essential_handle() + .spawn("nimbus", None, fut); + + Ok(()) +} + /// Start a parachain node. pub async fn start_parachain_node( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, ) -> sc_service::error::Result<( TaskManager, Arc>>, )> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, - id, + para_id, |_| Ok(crate::rpc::RpcExtension::new(())), - |client, - backend, - prometheus_registry, - telemetry, - task_manager, - relay_chain_interface, - transaction_pool, - _sync_oracle, - keystore, - force_authoring| { - let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( - task_manager.spawn_handle(), - client.clone(), - transaction_pool, - prometheus_registry, - telemetry.clone(), - ); - - Ok(NimbusConsensus::build(BuildNimbusConsensusParams { - para_id: id, - proposer_factory, - block_import: client.clone(), - backend, - parachain_client: client.clone(), - keystore, - skip_prediction: force_authoring, - create_inherent_data_providers: move |_, - ( - relay_parent, - validation_data, - _author_id, - )| { - let relay_chain_interface = relay_chain_interface.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_interface, - &validation_data, - id, - ).await; - - let time = sp_timestamp::InherentDataProvider::from_system_time(); - - let parachain_inherent = parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create 
parachain inherent", - ) - })?; - - let nimbus_inherent = nimbus_primitives::InherentDataProvider; - - Ok((time, parachain_inherent, nimbus_inherent)) - } - }, - additional_digests_provider: (), - })) - }, ) .await } + use sc_transaction_pool_api::OffchainTransactionPoolFactory; /// Builds a new service for a full client. pub fn start_instant_seal_node(config: Configuration) -> Result { @@ -485,7 +491,7 @@ pub fn start_instant_seal_node(config: Configuration) -> Result(&config, false)?; let net_config = FullNetworkConfiguration::new(&config.network); diff --git a/template/runtime/Cargo.toml b/template/runtime/Cargo.toml index 270c1ce..ef234a3 100644 --- a/template/runtime/Cargo.toml +++ b/template/runtime/Cargo.toml @@ -63,7 +63,7 @@ staging-xcm-executor = { workspace = true } # Cumulus dependencies cumulus-pallet-dmp-queue = { workspace = true } -cumulus-pallet-parachain-system = { workspace = true } +cumulus-pallet-parachain-system = { workspace = true, features = [ "parameterized-consensus-hook" ] } cumulus-pallet-xcm = { workspace = true } cumulus-pallet-xcmp-queue = { workspace = true } cumulus-primitives-core = { workspace = true } @@ -72,10 +72,11 @@ cumulus-primitives-utility = { workspace = true } parachain-info = { workspace = true } # Moonkit Dependencies +async-backing-primitives = { workspace = true } nimbus-primitives = { workspace = true } pallet-author-inherent = {workspace = true } pallet-author-slot-filter = { workspace = true } - +pallet-async-backing = { workspace = true } [build-dependencies] substrate-wasm-builder = { workspace = true } @@ -85,6 +86,7 @@ default = [ "std", ] std = [ + "async-backing-primitives/std", "cumulus-pallet-dmp-queue/std", "cumulus-pallet-parachain-system/std", "cumulus-pallet-xcm/std", @@ -98,6 +100,7 @@ std = [ "frame-system-rpc-runtime-api/std", "log/std", "nimbus-primitives/std", + "pallet-async-backing/std", "pallet-author-inherent/std", "pallet-author-slot-filter/std", "pallet-balances/std", diff --git 
a/template/runtime/src/lib.rs b/template/runtime/src/lib.rs index 64237e3..5f3e484 100644 --- a/template/runtime/src/lib.rs +++ b/template/runtime/src/lib.rs @@ -28,7 +28,7 @@ use frame_support::{ construct_runtime, dispatch::DispatchClass, match_types, parameter_types, - traits::{Everything, Nothing, OnInitialize}, + traits::{ConstBool, Everything, Nothing, OnInitialize}, weights::{ constants::{ BlockExecutionWeight, ExtrinsicBaseWeight, RocksDbWeight, WEIGHT_REF_TIME_PER_SECOND, @@ -175,7 +175,10 @@ impl WeightToFeePolynomial for WeightToFee { /// to even the core data structures. pub mod opaque { use super::*; - use sp_runtime::{generic, traits::BlakeTwo256}; + use sp_runtime::{ + generic, + traits::{BlakeTwo256, Hash as HashT}, + }; pub use sp_runtime::OpaqueExtrinsic as UncheckedExtrinsic; /// Opaque block header type. @@ -184,6 +187,8 @@ pub mod opaque { pub type Block = generic::Block; /// Opaque block identifier type. pub type BlockId = generic::BlockId; + /// Opaque block hash type. + pub type Hash = ::Output; } impl_opaque_keys! 
{ @@ -402,9 +407,21 @@ impl cumulus_pallet_parachain_system::Config for Runtime { type OutboundXcmpMessageSource = XcmpQueue; type XcmpMessageHandler = XcmpQueue; type ReservedXcmpWeight = ReservedXcmpWeight; - type CheckAssociatedRelayNumber = cumulus_pallet_parachain_system::RelayNumberStrictlyIncreases; + type CheckAssociatedRelayNumber = + cumulus_pallet_parachain_system::RelayNumberMonotonicallyIncreases; + type ConsensusHook = ConsensusHook; } +pub const RELAY_CHAIN_SLOT_DURATION_MILLIS: u32 = 6000; +pub const UNINCLUDED_SEGMENT_CAPACITY: u32 = 1; +pub const BLOCK_PROCESSING_VELOCITY: u32 = 1; + +type ConsensusHook = pallet_async_backing::consensus_hook::FixedVelocityConsensusHook< + Runtime, + BLOCK_PROCESSING_VELOCITY, + UNINCLUDED_SEGMENT_CAPACITY, +>; + impl pallet_insecure_randomness_collective_flip::Config for Runtime {} impl parachain_info::Config for Runtime {} @@ -602,6 +619,11 @@ impl pallet_author_slot_filter::Config for Runtime { type WeightInfo = (); } +impl pallet_async_backing::Config for Runtime { + type AllowMultipleBlocksPerSlot = ConstBool; + type GetAndVerifySlot = pallet_async_backing::RelaySlot; +} + parameter_types! { pub const Period: u32 = 6 * HOURS; pub const Offset: u32 = 0; @@ -635,6 +657,7 @@ construct_runtime!( AuthorInherent: pallet_author_inherent::{Pallet, Call, Storage, Inherent} = 20, AuthorFilter: pallet_author_slot_filter::{Pallet, Storage, Event, Config} = 21, PotentialAuthorSet: pallet_account_set::{Pallet, Storage, Config} = 22, + NimbusAsyncBacking: pallet_async_backing::{Pallet, Storage} = 23, // XCM helpers. XcmpQueue: cumulus_pallet_xcmp_queue::{Pallet, Call, Storage, Event} = 30, @@ -772,6 +795,15 @@ impl_runtime_apis! 
{ } } + impl async_backing_primitives::UnincludedSegmentApi for Runtime { + fn can_build_upon( + included_hash: ::Hash, + slot: async_backing_primitives::Slot, + ) -> bool { + ConsensusHook::can_build_upon(included_hash, slot) + } + } + #[cfg(feature = "runtime-benchmarks")] impl frame_benchmarking::Benchmark for Runtime { fn benchmark_metadata(extra: bool) -> ( @@ -822,31 +854,7 @@ impl_runtime_apis! { } } -struct CheckInherents; - -impl cumulus_pallet_parachain_system::CheckInherents for CheckInherents { - fn check_inherents( - block: &Block, - relay_state_proof: &cumulus_pallet_parachain_system::RelayChainStateProof, - ) -> sp_inherents::CheckInherentsResult { - let relay_chain_slot = relay_state_proof - .read_slot() - .expect("Could not read the relay chain slot from the proof"); - - let inherent_data = - cumulus_primitives_timestamp::InherentDataProvider::from_relay_chain_slot_and_duration( - relay_chain_slot, - sp_std::time::Duration::from_secs(6), - ) - .create_inherent_data() - .expect("Could not create the timestamp inherent data"); - - inherent_data.check_extrinsics(block) - } -} - cumulus_pallet_parachain_system::register_validate_block! { Runtime = Runtime, BlockExecutor = pallet_author_inherent::BlockExecutor::, - CheckInherents = CheckInherents, }