Skip to content

Commit

Permalink
Merge pull request #33 from bytes-zone/split
Browse files Browse the repository at this point in the history
split
  • Loading branch information
BrianHicks authored Dec 26, 2024
2 parents 510b6c2 + 90524ae commit 5d08f60
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 57 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 25 additions & 12 deletions beeps_core/src/gmap.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::merge::Merge;
use std::hash::Hash;
use std::{
collections::{
hash_map::{Drain, Entry, Iter},
HashMap,
},
fmt,
use std::collections::{
hash_map::{Drain, Entry, Iter},
HashMap,
};
use std::fmt;
use std::hash::Hash;

/// A grow-only map (G-Map.) Allows any hashable type as a key, but values must
/// implement `Merge`.
Expand Down Expand Up @@ -81,6 +79,16 @@ where
K: Eq + Hash,
V: Merge,
{
type Part = (K, V);

/// Break this map apart into its individual `(key, value)` entries, yielding
/// one part per entry.
fn split(self) -> impl Iterator<Item = Self::Part> {
    let entries = self.0;
    entries.into_iter()
}

/// Fold a single `(key, value)` entry into this map by handing it to
/// `upsert`.
fn merge_part(&mut self, part: Self::Part) {
    let (key, value) = part;
    self.upsert(key, value);
}

fn merge(mut self, mut other: Self) -> Self {
for (k, v) in other.drain() {
self.upsert(k, v);
Expand Down Expand Up @@ -179,7 +187,7 @@ mod test {

let result = map.get(&"test").unwrap();

prop_assert_eq!(result, &lww1.merge(lww2));
prop_assert_eq!(result, &Merge::merge(lww1, lww2));
}
}
}
Expand All @@ -194,7 +202,7 @@ mod test {
let map1 = GMap::<&str, Lww<i32>>::new();
let map2 = GMap::<&str, Lww<i32>>::new();

let merged = map1.merge(map2);
let merged = Merge::merge(map1, map2);

assert_eq!(merged.len(), 0);
}
Expand All @@ -207,7 +215,7 @@ mod test {
let mut map2 = GMap::<&str, Lww<u8>>::new();
map2.upsert("bar", Lww::new(2, Hlc::zero()));

let merged = map1.merge(map2);
let merged = Merge::merge(map1, map2);

assert_eq!(merged.get(&"foo").unwrap().value(), &1);
assert_eq!(merged.get(&"bar").unwrap().value(), &2);
Expand All @@ -225,14 +233,19 @@ mod test {
let mut map2 = GMap::<&str, Lww<u8>>::new();
map2.upsert("test", lww2.clone());

let merged_lww = lww1.merge(lww2);
let merged_map = map1.merge(map2);
let merged_lww = Merge::merge(lww1, lww2);
let merged_map = Merge::merge(map1, map2);

let result = merged_map.get(&"test").unwrap();

prop_assert_eq!(result, &merged_lww);
}

// Property: merging two maps wholesale must agree with merging the second
// map's parts into the first one at a time. The comparison itself lives in
// the shared helper in `crate::merge`.
#[test]
fn merge_or_merge_parts(a: GMap<u8, Lww<bool>>, b: GMap<u8, Lww<bool>>) {
crate::merge::test_merge_or_merge_parts(a, b);
}

#[test]
fn merge_idempotent(a: GMap<u8, Lww<u8>>) {
crate::merge::test_idempotent(a);
Expand Down
22 changes: 20 additions & 2 deletions beeps_core/src/gset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,20 @@ impl<T: Ord> GSet<T> {
}
}

impl<T: Ord> Merge for GSet<T> {
impl<T> Merge for GSet<T>
where
T: Ord,
{
type Part = T;

/// Yield every element of the set as its own part.
fn split(self) -> impl Iterator<Item = Self::Part> {
    let elements = self.0;
    elements.into_iter()
}

/// Merge a single element into the set by inserting it.
fn merge_part(&mut self, element: Self::Part) {
    self.insert(element);
}

fn merge(mut self, mut other: Self) -> Self {
self.0.append(&mut other.0);

Expand Down Expand Up @@ -76,11 +89,16 @@ impl<'a, T: Ord> IntoIterator for &'a GSet<T> {
mod test {
use super::*;

mod test {
mod merge {
use super::*;
use proptest::prelude::*;

proptest! {
// Property: merging two sets wholesale must agree with merging the second
// set's parts into the first one at a time. The comparison itself lives in
// the shared helper in `crate::merge`.
#[test]
fn merge_or_merge_parts(a: GSet<u8>, b: GSet<u8>) {
crate::merge::test_merge_or_merge_parts(a, b);
}

#[test]
fn test_idempotent(a: GSet<u8>) {
crate::merge::test_idempotent(a);
Expand Down
31 changes: 21 additions & 10 deletions beeps_core/src/lww.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{hlc::Hlc, merge::Merge};
use crate::hlc::Hlc;
use crate::merge::Merge;
use core::fmt::{self, Debug, Formatter};
use std::iter;

/// A last-write-wins register. Values can be anything you like. We decide which
/// writes "win" when merging with a hybrid logical clock.
Expand Down Expand Up @@ -43,12 +45,14 @@ impl<T> Merge for Lww<T>
where
T: Clone,
{
fn merge(self, other: Self) -> Self {
if other.clock > self.clock {
other
} else {
self
}
type Part = Lww<T>;

/// A register is its own (single) part, so splitting yields exactly one
/// item: the register itself.
fn split(self) -> impl Iterator<Item = Self::Part> {
    let whole: Self::Part = self;
    iter::once(whole)
}

fn merge_part(&mut self, part: Self::Part) {
self.set(part.value, part.clock);
}
}

Expand All @@ -70,7 +74,7 @@ mod test {
fn overwrites_if_clock_is_newer() {
let first_clock = Hlc::zero();

let lww = Lww::new(1, first_clock).merge(Lww::new(2, first_clock.next()));
let lww = Merge::merge(Lww::new(1, first_clock), Lww::new(2, first_clock.next()));

assert_eq!(lww.value, 2);
}
Expand All @@ -79,7 +83,7 @@ mod test {
fn rejects_if_clock_is_equal() {
let first_clock = Hlc::zero();

let lww = Lww::new(1, first_clock).merge(Lww::new(2, first_clock));
let lww = Merge::merge(Lww::new(1, first_clock), Lww::new(2, first_clock));

assert_eq!(lww.value, 1);
}
Expand All @@ -88,7 +92,7 @@ mod test {
fn rejects_if_clock_is_older() {
let first_clock = Hlc::zero();

let merged = Lww::new(1, first_clock.next()).merge(Lww::new(2, first_clock));
let merged = Merge::merge(Lww::new(1, first_clock.next()), Lww::new(2, first_clock));

assert_eq!(merged.value, 1);
}
Expand All @@ -109,4 +113,11 @@ mod test {
crate::merge::test_idempotent(a);
}
}

// Property: merging two registers wholesale must agree with merging the
// second register's parts into the first one at a time. The comparison itself
// lives in the shared helper in `crate::merge`.
proptest! {
#[test]
fn merge_or_merge_parts(a: Lww<bool>, b: Lww<bool>) {
crate::merge::test_merge_or_merge_parts(a, b);
}
}
}
101 changes: 73 additions & 28 deletions beeps_core/src/merge.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,102 @@
/// Merge two CRDTs into one.
pub trait Merge {
/// Split a data structure into parts (for sync and storage) and
/// merge them back together later.
pub trait Merge
where
Self: Sized,
{
/// The "parts" that we split this data structure into. These can be
/// whatever you like, but should generally be the smallest parts possible.
type Part;

/// Split this data structure into multiple parts. We use this for
/// delta-state sync as well as getting minimal parts for storage in a
/// database.
///
/// Implementations of this method should follow three principles:
///
/// 1. `split` should return the smallest possible parts.
/// 2. Calling `merge_part` on an empty value for each part of `a.split()`
/// should result in a value equal to the original `a`.
/// 3. Calling `a.merge_part` for each part of `b.split()` should give the
/// same result as `a.merge(b)`.
fn split(self) -> impl Iterator<Item = Self::Part>;

/// Merge a single part into this data structure. (For example, this is used
/// to rebuild a value from the parts we load from the database.)
fn merge_part(&mut self, part: Self::Part);

/// Merge two `Merge`s into one. This happens when we sync state between
/// replicas. In order for CRDT semantics to hold, this operation must be
/// commutative, associative, and idempotent. There are tests to help
/// guarantee this below.
#[must_use]
fn merge(self, other: Self) -> Self;
fn merge(mut self, other: Self) -> Self {
for part in other.split() {
self.merge_part(part);
}

self
}
}

/// Test that a Merge implementation is idempotent (needed so that merging
/// multiple times does not change the state.)
/// Test that a Merge implementation is idempotent (in other words, merging
/// multiple times should not change the state.)
#[cfg(test)]
pub fn test_idempotent<T>(orig: T)
pub fn test_idempotent<T>(thing: T)
where
T: Merge + Clone + PartialEq + std::fmt::Debug,
{
let a1 = orig.clone();
let a2 = orig.clone();
assert_eq!(thing.clone().merge(thing.clone()), thing);
}

let merged = a1.merge(a2);
/// Test that the implementation is commutative (in other words, the order of
/// merges should not affect the final result.)
#[cfg(test)]
pub fn test_commutative<T>(a: T, b: T)
where
T: Merge + Clone + PartialEq + std::fmt::Debug,
{
let ab = a.clone().merge(b.clone());
let ba = b.merge(a);

assert_eq!(merged, orig, "idempotency failure");
assert_eq!(ab, ba);
}

/// Test that a Merge implementation is commutative (needed so that the order of
/// merges does not matter.)
/// Test that a Merge implementation is associative (in other words, the order
/// in which replicas are merged should not affect the final result.)
#[cfg(test)]
pub fn test_commutative<T>(m1: T, m2: T)
pub fn test_associative<T>(a: T, b: T, c: T)
where
T: Merge + Clone + PartialEq + std::fmt::Debug,
{
let a1 = m1.clone();
let a2 = m2.clone();
let merged1 = a1.merge(a2);

let merged2 = m1.merge(m2);
let ab_c = a.clone().merge(b.clone()).merge(c.clone());
let a_bc = a.merge(b.merge(c));

assert_eq!(merged1, merged2, "commutativity failure");
assert_eq!(ab_c, a_bc);
}

/// Test that a Merge implementation is associative (needed so that the order of
/// replicas does not matter)
/// Test that `merge` and `merge_part` hold the proper relationship. That is:
///
/// a.merge(b)
///
/// Should give the same result as:
///
/// for part in b.split() {
/// a.merge_part(part)
/// }
///
/// This is only useful if `merge` is implemented separately from `merge_part`,
/// as the default implementation does essentially the second code sample.
#[cfg(test)]
pub fn test_associative<T>(m1: T, m2: T, m3: T)
pub fn test_merge_or_merge_parts<T>(a: T, b: T)
where
T: Merge + Clone + PartialEq + std::fmt::Debug,
{
let a1 = m1.clone();
let a2 = m2.clone();
let a3 = m3.clone();
let merged1 = a1.merge(a2).merge(a3);
let merged = a.clone().merge(b.clone());

let merged2 = m1.merge(m2.merge(m3));
let mut from_parts = a;
for part in b.split() {
from_parts.merge_part(part);
}

assert_eq!(merged1, merged2, "associativity failure");
assert_eq!(from_parts, merged);
}
40 changes: 35 additions & 5 deletions beeps_core/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::gmap::GMap;
use crate::gset::GSet;
use crate::hlc::Hlc;
use crate::lww::Lww;
use crate::merge::Merge;
use crate::{gmap::GMap, gset::GSet};
use chrono::{DateTime, Utc};

/// The state that gets synced between replicas.
Expand Down Expand Up @@ -82,12 +83,41 @@ impl Default for State {
}
}

/// Parts of the `State` that can be split and merged independently.
///
/// Each variant wraps the part produced by splitting one field of `State`, so
/// a stream of `Part`s can be routed back to the field it came from.
///
/// `Debug`, `Clone`, and `PartialEq` are derived so parts can be logged,
/// duplicated, and compared (e.g. in tests).
#[derive(Debug, Clone, PartialEq)]
pub enum Part {
    /// A part to be applied to `minutes_per_ping`
    MinutesPerPing(Lww<u16>),

    /// A part to be applied to `pings`
    Ping(DateTime<Utc>),

    /// A part to be applied to `tags`: the timestamp key together with the
    /// register holding that entry's (optional) tag.
    Tag((DateTime<Utc>, Lww<Option<String>>)),
}

impl Merge for State {
fn merge(mut self, other: Self) -> Self {
self.minutes_per_ping = self.minutes_per_ping.merge(other.minutes_per_ping);
self.pings = self.pings.merge(other.pings);
type Part = Part;

/// Split the state into parts, field by field: first the parts of
/// `minutes_per_ping`, then those of `pings`, then those of `tags`. Each
/// part is wrapped in the `Part` variant that names its field.
fn split(self) -> impl Iterator<Item = Self::Part> {
    let minutes_per_ping = self.minutes_per_ping.split().map(Part::MinutesPerPing);
    let pings = self.pings.split().map(Part::Ping);
    let tags = self.tags.split().map(Part::Tag);

    minutes_per_ping.chain(pings).chain(tags)
}

self
/// Route a part to the field named by its variant and merge it in there.
fn merge_part(&mut self, part: Part) {
    match part {
        Part::MinutesPerPing(minutes) => self.minutes_per_ping.merge_part(minutes),
        Part::Ping(ping) => self.pings.merge_part(ping),
        Part::Tag(tag) => self.tags.merge_part(tag),
    }
}
}

Expand Down
1 change: 1 addition & 0 deletions browser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ wasm-bindgen = "0.2.84"
# code size when deploying.
console_error_panic_hook = { version = "0.1.7", optional = true }
chrono = "0.4.39"
serde_json = "1.0.134"

# Enable this to get wasm-pack to leave names in the file, making it possible to
# analyze with Twiggy.
Expand Down
Loading

0 comments on commit 5d08f60

Please sign in to comment.