Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(consensus): Rename persistent certification pool section for consistency #2262

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions rs/artifact_pool/src/bin/consensus_pool_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,13 @@ fn export(path: &str, matches: &clap::ArgMatches) {
}
}
"Certification" => {
for x in certification_pool
.persistent_pool
.certifications()
.get_all()
{
for x in certification_pool.validated.certifications().get_all() {
println!("{}", to_string(&CertificationMessage::Certification(x)));
}
}
"CertificationShare" => {
for x in certification_pool
.persistent_pool
.validated
.certification_shares()
.get_all()
{
Expand Down Expand Up @@ -250,7 +246,7 @@ fn import(path: &str) {
});
consensus_pool.validated.mutate(ops);
} else if let Ok(msg) = from_str(&s) {
certification_pool.persistent_pool.insert(msg)
certification_pool.validated.insert(msg)
} else {
panic!("Failed to parse JSON: {}", s);
}
Expand Down
47 changes: 21 additions & 26 deletions rs/artifact_pool/src/certification_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct CertificationPoolImpl {
unvalidated_cert_index: HeightIndex<CertificationMessageHash>,
unvalidated: BTreeMap<CertificationMessageHash, CertificationMessage>,

pub persistent_pool: Box<dyn MutablePoolSection + Send + Sync>,
pub validated: Box<dyn MutablePoolSection + Send + Sync>,

unvalidated_pool_metrics: PoolMetrics,
validated_pool_metrics: PoolMetrics,
Expand Down Expand Up @@ -82,7 +82,7 @@ impl CertificationPoolImpl {
unvalidated_share_index: HeightIndex::default(),
unvalidated_cert_index: HeightIndex::default(),
unvalidated: BTreeMap::default(),
persistent_pool,
validated: persistent_pool,
invalidated_artifacts: metrics_registry.int_counter(
"certification_invalidated_artifacts",
"The number of invalidated certification artifacts",
Expand All @@ -102,12 +102,12 @@ impl CertificationPoolImpl {
}

fn validated_certifications(&self) -> Box<dyn Iterator<Item = Certification> + '_> {
self.persistent_pool.certifications().get_all()
self.validated.certifications().get_all()
}

fn insert_validated_certification(&self, certification: Certification) {
if let Some(existing_certification) = self
.persistent_pool
.validated
.certifications()
.get_by_height(certification.height)
.next()
Expand All @@ -116,7 +116,7 @@ impl CertificationPoolImpl {
panic!("Certifications are not expected to be added more than once per height.");
}
} else {
self.persistent_pool
self.validated
.insert(CertificationMessage::Certification(certification))
}
}
Expand Down Expand Up @@ -147,11 +147,11 @@ impl CertificationPoolImpl {
self.validated_pool_metrics
.pool_artifacts
.with_label_values(&[CERTIFICATION_ARTIFACT_TYPE])
.set(self.persistent_pool.certifications().size() as i64);
.set(self.validated.certifications().size() as i64);
self.validated_pool_metrics
.pool_artifacts
.with_label_values(&[CERTIFICATION_SHARE_ARTIFACT_TYPE])
.set(self.persistent_pool.certification_shares().size() as i64);
.set(self.validated.certification_shares().size() as i64);

// Unvalidated artifacts metrics
self.unvalidated_pool_metrics
Expand Down Expand Up @@ -216,7 +216,7 @@ impl MutablePool<CertificationMessage> for CertificationPoolImpl {
.received_artifact_bytes
.with_label_values(&[msg.label()])
.observe(std::mem::size_of_val(&msg) as f64);
self.persistent_pool.insert(msg);
self.validated.insert(msg);
}

ChangeAction::MoveToValidated(msg) => {
Expand All @@ -237,7 +237,7 @@ impl MutablePool<CertificationMessage> for CertificationPoolImpl {

match msg {
CertificationMessage::CertificationShare(share) => {
self.persistent_pool
self.validated
.insert(CertificationMessage::CertificationShare(share));
}
CertificationMessage::Certification(cert) => {
Expand All @@ -253,7 +253,7 @@ impl MutablePool<CertificationMessage> for CertificationPoolImpl {
ChangeAction::RemoveAllBelow(height) => {
self.remove_all_unvalidated_below(height);
transmits.extend(
self.persistent_pool
self.validated
.purge_below(height)
.drain(..)
.map(ArtifactTransmit::Abort),
Expand Down Expand Up @@ -299,23 +299,18 @@ pub trait MutablePoolSection {

impl CertificationPool for CertificationPoolImpl {
fn certification_at_height(&self, height: Height) -> Option<Certification> {
self.persistent_pool
.certifications()
.get_by_height(height)
.next()
self.validated.certifications().get_by_height(height).next()
}

fn shares_at_height(
&self,
height: Height,
) -> Box<dyn Iterator<Item = CertificationShare> + '_> {
self.persistent_pool
.certification_shares()
.get_by_height(height)
self.validated.certification_shares().get_by_height(height)
}

fn validated_shares(&self) -> Box<dyn Iterator<Item = CertificationShare> + '_> {
self.persistent_pool.certification_shares().get_all()
self.validated.certification_shares().get_all()
}

fn unvalidated_shares_at_height(
Expand Down Expand Up @@ -394,8 +389,8 @@ impl ValidatedPoolReader<CertificationMessage> for CertificationPoolImpl {
}

fn get_all_for_broadcast(&self) -> Box<dyn Iterator<Item = CertificationMessage> + '_> {
let certification_range = self.persistent_pool.certifications().height_range();
let share_range = self.persistent_pool.certification_shares().height_range();
let certification_range = self.validated.certifications().height_range();
let share_range = self.validated.certification_shares().height_range();

let ranges = [certification_range.as_ref(), share_range.as_ref()]
.into_iter()
Expand All @@ -408,11 +403,11 @@ impl ValidatedPoolReader<CertificationMessage> for CertificationPoolImpl {
// For all heights above the minimum, return the validated certification of the subnet,
// or the share signed by this node if we don't have the aggregate.
let iterator = (min.get()..=max.get()).map(Height::from).flat_map(|h| {
let mut certifications = self.persistent_pool.certifications().get_by_height(h);
let mut certifications = self.validated.certifications().get_by_height(h);
if let Some(certification) = certifications.next() {
vec![CertificationMessage::Certification(certification)]
} else {
self.persistent_pool
self.validated
.certification_shares()
.get_by_height(h)
.filter(|share| share.signed.signature.signer == self.node_id)
Expand Down Expand Up @@ -798,11 +793,11 @@ mod tests {
let cert_msg = fake_cert(8);

assert!(pool
.persistent_pool
.validated
.get(&CertificationMessageId::from(&share_msg))
.is_none());
assert!(pool
.persistent_pool
.validated
.get(&CertificationMessageId::from(&cert_msg))
.is_none());

Expand All @@ -822,13 +817,13 @@ mod tests {
);
assert_eq!(
share_msg,
pool.persistent_pool
pool.validated
.get(&CertificationMessageId::from(&share_msg))
.unwrap()
);
assert_eq!(
cert_msg,
pool.persistent_pool
pool.validated
.get(&CertificationMessageId::from(&cert_msg))
.unwrap()
);
Expand Down
2 changes: 1 addition & 1 deletion rs/consensus/src/certification/certifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ mod tests {

for height in 1..=4 {
cert_pool
.persistent_pool
.validated
.insert(CertificationMessage::Certification(Certification {
height: Height::from(height),
signed: Signed {
Expand Down
63 changes: 25 additions & 38 deletions rs/recovery/src/steps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Step for MergeCertificationPoolsStep {
"Moving certifications of all nodes to new pool."
);
pools.iter().for_each(|(ip, p)| {
p.persistent_pool.certifications().get_all().for_each(|c| {
p.validated.certifications().get_all().for_each(|c| {
if let Some(cert) = new_pool.certification_at_height(c.height) {
if cert != c {
warn!(
Expand All @@ -176,13 +176,13 @@ impl Step for MergeCertificationPoolsStep {
"Height {}: inserting certification from node {ip}", c.height
);
new_pool
.persistent_pool
.validated
.insert(CertificationMessage::Certification(c))
}
})
});

let max_full_cert = new_pool.persistent_pool.certifications().get_highest().ok();
let max_full_cert = new_pool.validated.certifications().get_highest().ok();

if let Some(cert) = max_full_cert.as_ref() {
info!(
Expand All @@ -196,17 +196,12 @@ impl Step for MergeCertificationPoolsStep {
// Analyze and move shares
let max_cert_share = pools
.values()
.flat_map(|p| {
p.persistent_pool
.certification_shares()
.get_highest_iter()
.next()
})
.flat_map(|p| p.validated.certification_shares().get_highest_iter().next())
.max_by_key(|c| c.height);

let min_share_height = pools
.values()
.flat_map(|p| p.persistent_pool.certification_shares().height_range())
.flat_map(|p| p.validated.certification_shares().height_range())
.map(|range| range.min.get())
.min();

Expand Down Expand Up @@ -247,7 +242,7 @@ impl Step for MergeCertificationPoolsStep {
"Inserting share from node {ip}: {:?}", s.signed
);
new_pool
.persistent_pool
.validated
.insert(CertificationMessage::CertificationShare(s))
});
}
Expand Down Expand Up @@ -1121,25 +1116,25 @@ mod tests {
// only one of them should be kept after the merge.
let cert1 = make_certification(1, vec![1, 2, 3]);
let cert1_2 = make_certification(1, vec![4, 5, 6]);
pool1.persistent_pool.insert(cert1);
pool2.persistent_pool.insert(cert1_2);
pool1.validated.insert(cert1);
pool2.validated.insert(cert1_2);

// Add the same certification for height 2 to both pools,
// it should only exists in the merged pool once.
let cert2 = make_certification(2, vec![1, 2, 3]);
pool1.persistent_pool.insert(cert2.clone());
pool2.persistent_pool.insert(cert2);
pool1.validated.insert(cert2.clone());
pool2.validated.insert(cert2);

// Add two more certifications for heights 3 and 4, one to each pool.
let cert3 = make_certification(3, vec![1, 2, 3]);
let cert4 = make_certification(4, vec![1, 2, 3]);
pool1.persistent_pool.insert(cert4);
pool2.persistent_pool.insert(cert3);
pool1.validated.insert(cert4);
pool2.validated.insert(cert3);

// Add a share at height 3 to one pool. It should not be added to the
// merged pool as it is lower than the highest full certification (4).
let share3 = make_share(3, vec![1], 1);
pool1.persistent_pool.insert(share3);
pool1.validated.insert(share3);

step.exec().expect("Failed to execute step.");

Expand All @@ -1151,19 +1146,15 @@ mod tests {
);

assert_eq!(
new_pool.persistent_pool.certifications().get_all().count(),
new_pool.validated.certifications().get_all().count(),
4 // One for each height 1-4
);
assert_eq!(
new_pool
.persistent_pool
.certification_shares()
.get_all()
.count(),
new_pool.validated.certification_shares().get_all().count(),
0
);
let range = new_pool
.persistent_pool
.validated
.certifications()
.height_range()
.expect("no height range");
Expand Down Expand Up @@ -1193,14 +1184,14 @@ mod tests {
let share6 = make_share(6, vec![6], 1);
let share6_2 = make_share(6, vec![6, 2], 2);

pool1.persistent_pool.insert(cert4);
pool1.persistent_pool.insert(share3);
pool1.persistent_pool.insert(share4);
pool1.persistent_pool.insert(share5.clone());
pool1.persistent_pool.insert(share6_2);
pool1.validated.insert(cert4);
pool1.validated.insert(share3);
pool1.validated.insert(share4);
pool1.validated.insert(share5.clone());
pool1.validated.insert(share6_2);

pool2.persistent_pool.insert(share5);
pool2.persistent_pool.insert(share6);
pool2.validated.insert(share5);
pool2.validated.insert(share6);

step.exec().expect("Failed to execute step.");

Expand All @@ -1212,15 +1203,11 @@ mod tests {
);

assert_eq!(
new_pool
.persistent_pool
.certification_shares()
.get_all()
.count(),
new_pool.validated.certification_shares().get_all().count(),
3 // share5, share6, share6_2
);
let range = new_pool
.persistent_pool
.validated
.certification_shares()
.height_range()
.expect("no height range");
Expand Down
6 changes: 2 additions & 4 deletions rs/replay/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ fn find_malicious_nodes(
) -> HashSet<NodeId> {
let mut malicious = HashSet::new();
if let Some(range) = certification_pool
.persistent_pool
.validated
.certification_shares()
.height_range()
{
Expand Down Expand Up @@ -1494,9 +1494,7 @@ mod tests {
make_share(3, vec![3], 7),
];

shares
.into_iter()
.for_each(|s| pool.persistent_pool.insert(s));
shares.into_iter().for_each(|s| pool.validated.insert(s));

let malicious = find_malicious_nodes(&pool, Height::new(0), &verify);
assert_eq!(malicious.len(), 1);
Expand Down
Loading