Skip to content

Commit

Permalink
return BadCluster on cluster name mismatch
Browse files Browse the repository at this point in the history
This will make testing and operations easier, by explicitly
refusing the message instead of silently dropping it.
  • Loading branch information
xvello committed Apr 8, 2022
1 parent 63128f0 commit 1318657
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 13 deletions.
2 changes: 1 addition & 1 deletion scuttlebutt/src/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{NodeId, Version};
///
/// It is equivalent to a map
/// peer -> max version.
#[derive(Debug, Default)]
#[derive(Debug, Default, PartialEq)]
pub struct Digest {
pub(crate) node_max_version: BTreeMap<NodeId, Version>,
}
Expand Down
8 changes: 6 additions & 2 deletions scuttlebutt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,9 @@ impl ScuttleButt {
if cluster_name != self.cluster_name {
warn!(
cluster_name = ?cluster_name,
"ignoring syn message with mismatching cluster name"
"rejecting syn message with mismatching cluster name"
);
return None;
return Some(ScuttleButtMessage::BadCluster);
}

let self_digest = self.compute_digest();
Expand Down Expand Up @@ -219,6 +219,10 @@ impl ScuttleButt {
self.cluster_state.apply_delta(delta);
None
}
ScuttleButtMessage::BadCluster => {
warn!("message rejected by peer: cluster name mismatch");
return None;
}
}
}

Expand Down
35 changes: 34 additions & 1 deletion scuttlebutt/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::serialize::Serializable;
/// between node A and node B.
/// The names {Syn, SynAck, Ack} of the different steps are borrowed from
/// TCP Handshake.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum ScuttleButtMessage {
/// Node A initiates handshakes.
Syn {
Expand All @@ -44,6 +44,9 @@ pub enum ScuttleButtMessage {
SynAck { digest: Digest, delta: Delta },
/// Node A returns a partial update for B.
Ack { delta: Delta },
/// Node B rejects the Syn message because of a
/// cluster name mismatch between the peers.
BadCluster,
}

#[derive(Copy, Clone)]
Expand All @@ -52,6 +55,7 @@ enum MessageType {
Syn = 0,
SynAck = 1u8,
Ack = 2u8,
BadCluster = 3u8,
}

impl MessageType {
Expand All @@ -60,6 +64,7 @@ impl MessageType {
0 => Some(Self::Syn),
1 => Some(Self::SynAck),
2 => Some(Self::Ack),
3 => Some(Self::BadCluster),
_ => None,
}
}
Expand Down Expand Up @@ -88,6 +93,9 @@ impl Serializable for ScuttleButtMessage {
buf.push(MessageType::Ack.to_code());
delta.serialize(buf);
}
ScuttleButtMessage::BadCluster => {
buf.push(MessageType::BadCluster.to_code());
}
}
}

Expand Down Expand Up @@ -116,6 +124,7 @@ impl Serializable for ScuttleButtMessage {
let delta = Delta::deserialize(buf)?;
Ok(Self::Ack { delta })
}
MessageType::BadCluster => Ok(Self::BadCluster),
}
}

Expand All @@ -129,6 +138,30 @@ impl Serializable for ScuttleButtMessage {
1 + digest.serialized_len() + delta.serialized_len()
}
ScuttleButtMessage::Ack { delta } => 1 + delta.serialized_len(),
ScuttleButtMessage::BadCluster => 1,
}
}
}

#[cfg(test)]
mod tests {
use crate::{Digest, ScuttleButtMessage};
use crate::serialize::test_serdeser_aux;

#[test]
fn test_syn() {
let mut digest = Digest::default();
digest.add_node("node1".into(), 1);
digest.add_node("node2".into(), 2);
let syn = ScuttleButtMessage::Syn {
cluster_name: "cluster-a".to_string(),
digest
};
test_serdeser_aux(&syn, 58);
}

#[test]
fn test_bad_cluster() {
test_serdeser_aux(&ScuttleButtMessage::BadCluster, 1);
}
}
22 changes: 13 additions & 9 deletions scuttlebutt/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,18 +463,19 @@ mod tests {
}

#[tokio::test]
async fn ignore_mismatched_cluster_name() {
let server_addr = "0.0.0.0:2223";
let socket = UdpSocket::bind("0.0.0.0:2224").await.unwrap();
async fn syn_bad_cluster() {
let outsider_addr = "0.0.0.0:2224";
let socket = UdpSocket::bind(outsider_addr).await.unwrap();
let mut outsider = ScuttleButt::with_node_id_and_seeds(
"offline".into(),
outsider_addr.into(),
HashSet::new(),
"offline".to_string(),
outsider_addr.into(),
"another-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);

let server_addr = "0.0.0.0:2223";
let server = ScuttleServer::spawn(
server_addr.into(),
&[],
Expand All @@ -489,11 +490,14 @@ mod tests {
syn.serialize(&mut buf);
socket.send_to(&buf[..], server_addr).await.unwrap();

// server will drop the message, we expect the recv to timeout
let mut buf = [0; super::UDP_MTU];
let resp =
tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buf)).await;
assert!(resp.is_err(), "unexpected response from peer");
let (len, _addr) = timeout(socket.recv_from(&mut buf)).await.unwrap();

let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap();
match msg {
ScuttleButtMessage::BadCluster => (),
message => panic!("unexpected message: {:?}", message),
}

server.shutdown().await.unwrap();
}
Expand Down

0 comments on commit 1318657

Please sign in to comment.