Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster_name property to avoid several clusters merging accidentally #36

Merged
merged 5 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scuttlebutt-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
pub struct ApiResponse {
pub cluster_name: String,
pub cluster_state: SerializableClusterState,
pub live_nodes: Vec<NodeId>,
pub dead_nodes: Vec<NodeId>,
Expand Down
2 changes: 2 additions & 0 deletions scuttlebutt-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Api {
async fn index(&self) -> Json<serde_json::Value> {
let scuttlebutt_guard = self.scuttlebutt.lock().await;
let response = ApiResponse {
cluster_name: scuttlebutt_guard.cluster_name().to_string(),
cluster_state: SerializableClusterState::from(scuttlebutt_guard.cluster_state()),
live_nodes: scuttlebutt_guard.live_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: scuttlebutt_guard.dead_nodes().cloned().collect::<Vec<_>>(),
Expand Down Expand Up @@ -67,6 +68,7 @@ async fn main() -> Result<(), std::io::Error> {
NodeId::from(opt.listen_addr.as_str()),
&opt.seeds[..],
&opt.listen_addr,
"testing".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand Down
1 change: 1 addition & 0 deletions scuttlebutt-test/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ fn test_multiple_nodes() -> anyhow::Result<()> {
.node_states
.get("localhost:10003")
.is_some());
assert_eq!(info.cluster_name, "testing");
assert_eq!(info.live_nodes.len(), 4);
assert_eq!(info.dead_nodes.len(), 0);

Expand Down
31 changes: 28 additions & 3 deletions scuttlebutt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use failure_detector::FailureDetectorConfig;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::digest::Digest;
use crate::message::ScuttleButtMessage;
Expand Down Expand Up @@ -120,6 +120,7 @@ pub struct ScuttleButt {
mtu: usize,
address: String,
self_node_id: NodeId,
cluster_name: String,
cluster_state: ClusterState,
heartbeat: u64,
/// The failure detector instance.
Expand All @@ -135,6 +136,7 @@ impl ScuttleButt {
self_node_id: NodeId,
seed_ids: HashSet<String>,
address: String,
cluster_name: String,
initial_key_values: Vec<(impl ToString, impl ToString)>,
failure_detector_config: FailureDetectorConfig,
) -> Self {
Expand All @@ -143,6 +145,7 @@ impl ScuttleButt {
mtu: 60_000,
address,
self_node_id,
cluster_name,
cluster_state: ClusterState::with_seed_ids(seed_ids),
heartbeat: 0,
failure_detector: FailureDetector::new(failure_detector_config),
Expand All @@ -169,12 +172,26 @@ impl ScuttleButt {

pub fn create_syn_message(&mut self) -> ScuttleButtMessage {
let digest = self.compute_digest();
ScuttleButtMessage::Syn { digest }
ScuttleButtMessage::Syn {
cluster_name: self.cluster_name.clone(),
digest,
}
}

pub fn process_message(&mut self, msg: ScuttleButtMessage) -> Option<ScuttleButtMessage> {
match msg {
ScuttleButtMessage::Syn { digest } => {
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => {
if cluster_name != self.cluster_name {
warn!(
cluster_name = ?cluster_name,
fulmicoton marked this conversation as resolved.
Show resolved Hide resolved
"ignoring syn message with mismatching cluster name"
);
return None;
}

let self_digest = self.compute_digest();
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let delta = self.cluster_state.compute_delta(
Expand Down Expand Up @@ -275,6 +292,10 @@ impl ScuttleButt {
&self.self_node_id
}

pub fn cluster_name(&self) -> &str {
&self.cluster_name
}

/// Computes digest.
///
/// This method also increments the heartbeat, to force the presence
Expand Down Expand Up @@ -351,6 +372,7 @@ mod tests {
NodeId::from(address.as_str()),
seeds,
address,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig {
dead_node_grace_period: DEAD_NODE_GRACE_PERIOD,
Expand Down Expand Up @@ -417,13 +439,15 @@ mod tests {
NodeId::from("node1"),
HashSet::new(),
"node1".to_string(),
"test-cluster".to_string(),
vec![("key1a", "1"), ("key2a", "2")],
FailureDetectorConfig::default(),
);
let mut node2 = ScuttleButt::with_node_id_and_seeds(
NodeId::from("node2"),
HashSet::new(),
"node2".to_string(),
"test-cluster".to_string(),
vec![("key1b", "1"), ("key2b", "2")],
FailureDetectorConfig::default(),
);
Expand Down Expand Up @@ -570,6 +594,7 @@ mod tests {
NodeId::new("new_node".to_string(), address.clone()),
&["localhost:40001".to_string()],
address,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand Down
22 changes: 18 additions & 4 deletions scuttlebutt/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use crate::serialize::Serializable;
#[derive(Debug)]
pub enum ScuttleButtMessage {
/// Node A initiates handshakes.
Syn { digest: Digest },
Syn {
cluster_name: String,
digest: Digest,
},
/// Node B returns a partial update as described
/// in the scuttlebutt reconcialiation algorithm,
/// and returns its own checksum.
Expand Down Expand Up @@ -68,9 +71,13 @@ impl MessageType {
impl Serializable for ScuttleButtMessage {
fn serialize(&self, buf: &mut Vec<u8>) {
match self {
ScuttleButtMessage::Syn { digest } => {
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => {
buf.push(MessageType::Syn.to_code());
digest.serialize(buf);
cluster_name.serialize(buf);
}
ScuttleButtMessage::SynAck { digest, delta } => {
buf.push(MessageType::SynAck.to_code());
Expand All @@ -94,7 +101,11 @@ impl Serializable for ScuttleButtMessage {
match code {
MessageType::Syn => {
let digest = Digest::deserialize(buf)?;
Ok(Self::Syn { digest })
let cluster_name = String::deserialize(buf)?;
Ok(Self::Syn {
cluster_name,
digest,
})
}
MessageType::SynAck => {
let digest = Digest::deserialize(buf)?;
Expand All @@ -110,7 +121,10 @@ impl Serializable for ScuttleButtMessage {

fn serialized_len(&self) -> usize {
match self {
ScuttleButtMessage::Syn { digest } => 1 + digest.serialized_len(),
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => 1 + cluster_name.serialized_len() + digest.serialized_len(),
ScuttleButtMessage::SynAck { digest, delta } => {
1 + digest.serialized_len() + delta.serialized_len()
}
Expand Down
58 changes: 57 additions & 1 deletion scuttlebutt/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl ScuttleServer {
node_id: NodeId,
seed_nodes: &[String],
address: impl Into<String>,
cluster_name: String,
initial_key_values: Vec<(impl ToString, impl ToString)>,
failure_detector_config: FailureDetectorConfig,
) -> Self {
Expand All @@ -72,6 +73,7 @@ impl ScuttleServer {
node_id,
seed_nodes.iter().cloned().collect(),
address.into(),
cluster_name,
initial_key_values,
failure_detector_config,
);
Expand Down Expand Up @@ -397,6 +399,7 @@ mod tests {
"0.0.0.0:1112".into(),
&[],
"0.0.0.0:1112",
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand All @@ -407,7 +410,13 @@ mod tests {

let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap();
match msg {
ScuttleButtMessage::Syn { .. } => (),
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => {
assert_eq!(cluster_name, "test-cluster");
assert_eq!(digest.node_max_version.len(), 1);
}
message => panic!("unexpected message: {:?}", message),
}

Expand All @@ -422,6 +431,7 @@ mod tests {
"offline".into(),
HashSet::new(),
"offline".to_string(),
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand All @@ -430,6 +440,7 @@ mod tests {
server_addr.into(),
&[],
server_addr,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand All @@ -451,13 +462,50 @@ mod tests {
server.shutdown().await.unwrap();
}

#[tokio::test]
async fn ignore_mismatched_cluster_name() {
let server_addr = "0.0.0.0:2223";
let socket = UdpSocket::bind("0.0.0.0:2224").await.unwrap();
let mut outsider = ScuttleButt::with_node_id_and_seeds(
"offline".into(),
HashSet::new(),
"offline".to_string(),
"another-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);

let server = ScuttleServer::spawn(
server_addr.into(),
&[],
server_addr,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);

let mut buf = Vec::new();
let syn = outsider.create_syn_message();
syn.serialize(&mut buf);
socket.send_to(&buf[..], server_addr).await.unwrap();

// server will drop the message, we expect the recv to timeout
let mut buf = [0; super::UDP_MTU];
let resp =
tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buf)).await;
assert!(resp.is_err(), "unexpected response from peer");

server.shutdown().await.unwrap();
}

#[tokio::test]
async fn ignore_broken_payload() {
let server_addr = "0.0.0.0:3331";
let server = ScuttleServer::spawn(
server_addr.into(),
&[],
server_addr,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand All @@ -466,6 +514,7 @@ mod tests {
"offline".into(),
HashSet::new(),
"offline".to_string(),
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand Down Expand Up @@ -497,6 +546,7 @@ mod tests {
server_addr.into(),
&[],
server_addr,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand All @@ -505,6 +555,7 @@ mod tests {
"offline".into(),
HashSet::new(),
"offline".to_string(),
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand Down Expand Up @@ -537,6 +588,7 @@ mod tests {
"0.0.0.0:5552".into(),
&[server_addr.into()],
"0.0.0.0:5552",
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand All @@ -559,6 +611,7 @@ mod tests {
test_addr.into(),
HashSet::new(),
test_addr.to_string(),
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand All @@ -570,6 +623,7 @@ mod tests {
NodeId::from(server_addr),
&[],
server_addr,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand Down Expand Up @@ -617,13 +671,15 @@ mod tests {
NodeId::from("0.0.0.0:6663"),
&[],
"0.0.0.0:6663",
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
let node2 = ScuttleServer::spawn(
NodeId::from("0.0.0.0:6664"),
&["0.0.0.0:6663".to_string()],
"0.0.0.0:6664",
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);
Expand Down