Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster_name property to avoid several clusters merging accidentally #36

Merged
merged 5 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scuttlebutt-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
pub struct ApiResponse {
pub cluster_name: String,
pub cluster_state: SerializableClusterState,
pub live_nodes: Vec<NodeId>,
pub dead_nodes: Vec<NodeId>,
Expand Down
2 changes: 2 additions & 0 deletions scuttlebutt-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Api {
async fn index(&self) -> Json<serde_json::Value> {
let scuttlebutt_guard = self.scuttlebutt.lock().await;
let response = ApiResponse {
cluster_name: scuttlebutt_guard.cluster_name().to_string(),
cluster_state: SerializableClusterState::from(scuttlebutt_guard.cluster_state()),
live_nodes: scuttlebutt_guard.live_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: scuttlebutt_guard.dead_nodes().cloned().collect::<Vec<_>>(),
Expand Down Expand Up @@ -67,6 +68,7 @@ async fn main() -> Result<(), std::io::Error> {
NodeId::from(opt.listen_addr.as_str()),
&opt.seeds[..],
&opt.listen_addr,
"testing".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand Down
1 change: 1 addition & 0 deletions scuttlebutt-test/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ fn test_multiple_nodes() -> anyhow::Result<()> {
.node_states
.get("localhost:10003")
.is_some());
assert_eq!(info.cluster_name, "testing");
assert_eq!(info.live_nodes.len(), 4);
assert_eq!(info.dead_nodes.len(), 0);

Expand Down
2 changes: 1 addition & 1 deletion scuttlebutt/src/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{NodeId, Version};
///
/// It is equivalent to a map
/// peer -> max version.
#[derive(Debug, Default)]
#[derive(Debug, Default, PartialEq)]
pub struct Digest {
pub(crate) node_max_version: BTreeMap<NodeId, Version>,
}
Expand Down
35 changes: 32 additions & 3 deletions scuttlebutt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use failure_detector::FailureDetectorConfig;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::digest::Digest;
use crate::message::ScuttleButtMessage;
Expand Down Expand Up @@ -120,6 +120,7 @@ pub struct ScuttleButt {
mtu: usize,
address: String,
self_node_id: NodeId,
cluster_name: String,
cluster_state: ClusterState,
heartbeat: u64,
/// The failure detector instance.
Expand All @@ -135,6 +136,7 @@ impl ScuttleButt {
self_node_id: NodeId,
seed_ids: HashSet<String>,
address: String,
cluster_name: String,
initial_key_values: Vec<(impl ToString, impl ToString)>,
failure_detector_config: FailureDetectorConfig,
) -> Self {
Expand All @@ -143,6 +145,7 @@ impl ScuttleButt {
mtu: 60_000,
address,
self_node_id,
cluster_name,
cluster_state: ClusterState::with_seed_ids(seed_ids),
heartbeat: 0,
failure_detector: FailureDetector::new(failure_detector_config),
Expand All @@ -169,12 +172,26 @@ impl ScuttleButt {

pub fn create_syn_message(&mut self) -> ScuttleButtMessage {
let digest = self.compute_digest();
ScuttleButtMessage::Syn { digest }
ScuttleButtMessage::Syn {
cluster_name: self.cluster_name.clone(),
digest,
}
}

pub fn process_message(&mut self, msg: ScuttleButtMessage) -> Option<ScuttleButtMessage> {
match msg {
ScuttleButtMessage::Syn { digest } => {
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => {
if cluster_name != self.cluster_name {
warn!(
cluster_name = %cluster_name,
"rejecting syn message with mismatching cluster name"
);
return Some(ScuttleButtMessage::BadCluster);
}

let self_digest = self.compute_digest();
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let delta = self.cluster_state.compute_delta(
Expand Down Expand Up @@ -202,6 +219,10 @@ impl ScuttleButt {
self.cluster_state.apply_delta(delta);
None
}
ScuttleButtMessage::BadCluster => {
warn!("message rejected by peer: cluster name mismatch");
None
}
}
}

Expand Down Expand Up @@ -275,6 +296,10 @@ impl ScuttleButt {
&self.self_node_id
}

pub fn cluster_name(&self) -> &str {
&self.cluster_name
}

/// Computes digest.
///
/// This method also increments the heartbeat, to force the presence
Expand Down Expand Up @@ -351,6 +376,7 @@ mod tests {
NodeId::from(address.as_str()),
seeds,
address,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig {
dead_node_grace_period: DEAD_NODE_GRACE_PERIOD,
Expand Down Expand Up @@ -417,13 +443,15 @@ mod tests {
NodeId::from("node1"),
HashSet::new(),
"node1".to_string(),
"test-cluster".to_string(),
vec![("key1a", "1"), ("key2a", "2")],
FailureDetectorConfig::default(),
);
let mut node2 = ScuttleButt::with_node_id_and_seeds(
NodeId::from("node2"),
HashSet::new(),
"node2".to_string(),
"test-cluster".to_string(),
vec![("key1b", "1"), ("key2b", "2")],
FailureDetectorConfig::default(),
);
Expand Down Expand Up @@ -570,6 +598,7 @@ mod tests {
NodeId::new("new_node".to_string(), address.clone()),
&["localhost:40001".to_string()],
address,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand Down
57 changes: 52 additions & 5 deletions scuttlebutt/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,22 @@ use crate::serialize::Serializable;
/// between node A and node B.
/// The names {Syn, SynAck, Ack} of the different steps are borrowed from
/// TCP Handshake.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum ScuttleButtMessage {
/// Node A initiates handshakes.
Syn { digest: Digest },
Syn {
cluster_name: String,
digest: Digest,
},
/// Node B returns a partial update as described
/// in the scuttlebutt reconcialiation algorithm,
/// and returns its own checksum.
SynAck { digest: Digest, delta: Delta },
/// Node A returns a partial update for B.
Ack { delta: Delta },
/// Node B rejects the Syn message because of a
/// cluster name mismatch between the peers.
BadCluster,
}

#[derive(Copy, Clone)]
Expand All @@ -49,6 +55,7 @@ enum MessageType {
Syn = 0,
SynAck = 1u8,
Ack = 2u8,
BadCluster = 3u8,
}

impl MessageType {
Expand All @@ -57,6 +64,7 @@ impl MessageType {
0 => Some(Self::Syn),
1 => Some(Self::SynAck),
2 => Some(Self::Ack),
3 => Some(Self::BadCluster),
_ => None,
}
}
Expand All @@ -68,9 +76,13 @@ impl MessageType {
impl Serializable for ScuttleButtMessage {
fn serialize(&self, buf: &mut Vec<u8>) {
match self {
ScuttleButtMessage::Syn { digest } => {
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => {
buf.push(MessageType::Syn.to_code());
digest.serialize(buf);
cluster_name.serialize(buf);
}
ScuttleButtMessage::SynAck { digest, delta } => {
buf.push(MessageType::SynAck.to_code());
Expand All @@ -81,6 +93,9 @@ impl Serializable for ScuttleButtMessage {
buf.push(MessageType::Ack.to_code());
delta.serialize(buf);
}
ScuttleButtMessage::BadCluster => {
buf.push(MessageType::BadCluster.to_code());
}
}
}

Expand All @@ -94,7 +109,11 @@ impl Serializable for ScuttleButtMessage {
match code {
MessageType::Syn => {
let digest = Digest::deserialize(buf)?;
Ok(Self::Syn { digest })
let cluster_name = String::deserialize(buf)?;
Ok(Self::Syn {
cluster_name,
digest,
})
}
MessageType::SynAck => {
let digest = Digest::deserialize(buf)?;
Expand All @@ -105,16 +124,44 @@ impl Serializable for ScuttleButtMessage {
let delta = Delta::deserialize(buf)?;
Ok(Self::Ack { delta })
}
MessageType::BadCluster => Ok(Self::BadCluster),
}
}

fn serialized_len(&self) -> usize {
match self {
ScuttleButtMessage::Syn { digest } => 1 + digest.serialized_len(),
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => 1 + cluster_name.serialized_len() + digest.serialized_len(),
ScuttleButtMessage::SynAck { digest, delta } => {
1 + digest.serialized_len() + delta.serialized_len()
}
ScuttleButtMessage::Ack { delta } => 1 + delta.serialized_len(),
ScuttleButtMessage::BadCluster => 1,
}
}
}

#[cfg(test)]
mod tests {
use crate::serialize::test_serdeser_aux;
use crate::{Digest, ScuttleButtMessage};

#[test]
fn test_syn() {
let mut digest = Digest::default();
digest.add_node("node1".into(), 1);
digest.add_node("node2".into(), 2);
let syn = ScuttleButtMessage::Syn {
cluster_name: "cluster-a".to_string(),
digest,
};
test_serdeser_aux(&syn, 58);
}

#[test]
fn test_bad_cluster() {
test_serdeser_aux(&ScuttleButtMessage::BadCluster, 1);
}
}
Loading