Skip to content

Commit

Permalink
Merge pull request #681 from streamich/json-crdt-improvements
Browse files Browse the repository at this point in the history
JSON CRDT improvements
  • Loading branch information
streamich authored Jul 28, 2024
2 parents 5ab4105 + 808a99c commit a479799
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 40 deletions.
44 changes: 26 additions & 18 deletions src/json-crdt-patch/Patch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as operations from './operations';
import {ITimestampStruct, ts, printTs} from './clock';
import {SESSION} from './constants';
import {ITimestampStruct, ts, printTs, Timestamp} from './clock';
import {printTree} from 'tree-dump/lib/printTree';
import {encode, decode} from './codec/binary';
import type {Printable} from 'tree-dump/lib/types';
Expand Down Expand Up @@ -121,7 +120,8 @@ export class Patch implements Printable {
for (let i = 0; i < length; i++) {
const op = ops[i];
if (op instanceof operations.DelOp) patchOps.push(new operations.DelOp(ts(op.id), ts(op.obj), op.what));
else if (op instanceof operations.NewConOp) patchOps.push(new operations.NewConOp(ts(op.id), op.val));
else if (op instanceof operations.NewConOp)
patchOps.push(new operations.NewConOp(ts(op.id), op.val instanceof Timestamp ? ts(op.val) : op.val));
else if (op instanceof operations.NewVecOp) patchOps.push(new operations.NewVecOp(ts(op.id)));
else if (op instanceof operations.NewValOp) patchOps.push(new operations.NewValOp(ts(op.id)));
else if (op instanceof operations.NewObjOp) patchOps.push(new operations.NewObjOp(ts(op.id)));
Expand All @@ -144,36 +144,44 @@ export class Patch implements Printable {
op.data.map(([key, value]) => [key, ts(value)]),
),
);
else if (op instanceof operations.InsVecOp)
patchOps.push(
new operations.InsVecOp(
ts(op.id),
ts(op.obj),
op.data.map(([key, value]) => [key, ts(value)]),
),
);
else if (op instanceof operations.NopOp) patchOps.push(new operations.NopOp(ts(op.id), op.len));
}
return patch;
}

/**
* The .rebase() operation is meant to work only with patch that use
* the server clock. When receiving a patch from a client, the starting
* ID of the patch can be out of sync with the server clock. For example,
* if some other user has in the meantime pushed operations to the server.
* The `.rebase()` operation is meant to be applied to patches which have not
* yet been advertised to the server (other peers), or when
* the server clock is used and concurrent change on the server happened.
*
* The .rebase() operation returns a new `Patch` with the IDs recalculated
* such that the first operation has ID of the patch is equal to the
* actual server time tip.
* such that the first operation has the `time` equal to `newTime`.
*
* @param serverTime Real server time tip (ID of the next expected operation).
* @param newTime Time where the patch ID should begin (ID of the first operation).
* @param transformAfter Time after (and including) which the IDs should be
* transformed. If not specified, equals to the time of the first operation.
*/
public rebase(serverTime: number, transformHorizon: number): Patch {
public rebase(newTime: number, transformAfter?: number): Patch {
const id = this.getId();
if (!id) throw new Error('EMPTY_PATCH');
const sid = id.sid;
const patchStartTime = id.time;
if (patchStartTime === serverTime) return this;
const delta = serverTime - patchStartTime;
transformAfter ??= patchStartTime;
if (patchStartTime === newTime) return this;
const delta = newTime - patchStartTime;
return this.rewriteTime((id: ITimestampStruct): ITimestampStruct => {
const sessionId = id.sid;
const isServerTimestamp = sessionId === SESSION.SERVER;
if (!isServerTimestamp) return id;
if (id.sid !== sid) return id;
const time = id.time;
if (time < transformHorizon) return id;
return ts(SESSION.SERVER, time + delta);
if (time < transformAfter) return id;
return ts(sid, time + delta);
});
}

Expand Down
44 changes: 43 additions & 1 deletion src/json-crdt-patch/__tests__/Patch.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import {Model} from '../../json-crdt/model';
import {s} from '../builder/schema';
import {LogicalClock, ts} from '../clock';
import {SESSION} from '../constants';
import {InsArrOp} from '../operations';
Expand All @@ -13,7 +15,7 @@ describe('.rebase()', () => {
expect(patch.ops[0].id.time).toBe(10);
});

test('does not rewrite references, which are commited on the server', () => {
test('does not rewrite references, which are committed on the server', () => {
const builder = new PatchBuilder(new LogicalClock(SESSION.SERVER, 5));
builder.insArr(ts(SESSION.SERVER, 3), ts(SESSION.SERVER, 3), [ts(0, 10)]);
expect((builder.patch.ops[0] as InsArrOp).ref.time).toBe(3);
Expand All @@ -28,5 +30,45 @@ describe('.rebase()', () => {
const patch = builder.patch.rebase(10, 5);
expect((patch.ops[0] as InsArrOp).ref.time).toBe(12);
});

test('can advance patch ID', () => {
const model = Model.create();
model.api.root({
foo: 'bar',
num: s.con(123),
arr: [null],
vec: s.vec(s.con(1), s.con(2)),
id: s.con(ts(4, 5)),
val: s.val(s.con('asdf')),
bin: s.bin(new Uint8Array([1, 2, 3])),
});
const patch1 = model.api.flush();
const patch2 = patch1.rebase(1000);
expect(patch1.getId()!.time).not.toBe(1000);
expect(patch2.getId()!.time).toBe(1000);
expect(patch2.getId()!.sid).toBe(patch1.getId()!.sid);
const model2 = Model.create();
model2.applyPatch(patch2);
expect(model2.view()).toEqual(model.view());
});

test('transforms "con" ID values, if they share the patch SID', () => {
const model = Model.create();
const sid = model.clock.sid;
model.api.root({
id1: s.con(ts(4, 5)),
id2: s.con(ts(model.clock.sid, 5)),
});
const patch1 = model.api.flush();
const base = patch1.getId()!.time;
const patch2 = patch1.rebase(1000);
expect(patch1.getId()!.time).not.toBe(1000);
expect(patch2.getId()!.time).toBe(1000);
expect(patch2.getId()!.sid).toBe(patch1.getId()!.sid);
const model2 = Model.create();
model2.applyPatch(patch2);
expect((model2.view() as any).id1).toEqual(ts(4, 5));
expect((model2.view() as any).id2).toEqual(ts(sid, 1000 - base + 5));
});
});
});
26 changes: 22 additions & 4 deletions src/json-crdt/model/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
*/
public static readonly sid = randomSessionId;

/**
* Use this method to generate a random session ID for an existing document.
* It checks for the uniqueness of the session ID given the current peers in
* the document. This reduces the chance of collision substantially.
*
* @returns A random session ID that is not used by any peer in the current
* document.
*/
public rndSid(): number {
const clock = this.clock;
const sid = clock.sid;
const peers = clock.peers;
while (true) {
const candidate = randomSessionId();
if (sid !== candidate && !peers.has(candidate)) return candidate;
}
}

/**
* Create a CRDT model which uses logical clock. Logical clock assigns a
* logical timestamp to every node and operation. Logical timestamp consists
Expand All @@ -46,7 +64,7 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
* @deprecated Use `Model.create()` instead.
*/
public static readonly withLogicalClock = (clockOrSessionId?: clock.ClockVector | number): Model => {
return Model.create(undefined, clockOrSessionId);
return Model.create(void 0, clockOrSessionId);
};

/**
Expand All @@ -62,7 +80,7 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
* @deprecated Use `Model.create()` instead: `Model.create(undefined, SESSION.SERVER)`.
*/
public static readonly withServerClock = (time: number = 1): Model => {
return Model.create(undefined, new clock.ServerClockVector(SESSION.SERVER, time));
return Model.create(void 0, new clock.ServerClockVector(SESSION.SERVER, time));
};

/**
Expand Down Expand Up @@ -193,7 +211,7 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
const first = patches[0];
const sid = first.getId()!.sid;
if (!sid) throw new Error('NO_SID');
const model = Model.withLogicalClock(sid);
const model = Model.create(void 0, sid);
model.applyBatch(patches);
return model;
}
Expand Down Expand Up @@ -436,7 +454,7 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
* @param sessionId Session ID to use for the new model.
* @returns A copy of this model with a new session ID.
*/
public fork(sessionId: number = Model.sid()): Model<N> {
public fork(sessionId: number = this.rndSid()): Model<N> {
const copy = Model.fromBinary(this.toBinary()) as unknown as Model<N>;
if (copy.clock.sid !== sessionId && copy.clock instanceof clock.ClockVector)
copy.clock = copy.clock.fork(sessionId);
Expand Down
51 changes: 34 additions & 17 deletions src/json-crdt/model/__tests__/Model.cloning.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {Model} from '../Model';

describe('clone()', () => {
test('can clone a simple document', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
const builder1 = new PatchBuilder(doc1.clock);
const obj = builder1.json({foo: 'bar', gg: [123]});
builder1.root(obj);
Expand All @@ -17,7 +17,7 @@ describe('clone()', () => {
});

test('can modify the cloned copy independently', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
const builder1 = new PatchBuilder(doc1.clock);
const obj = builder1.json({foo: 'bar', hh: true});
builder1.root(obj);
Expand All @@ -34,7 +34,7 @@ describe('clone()', () => {
});

test('can clone a document with string edits', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
doc1.api.root({
foo: 'abc',
});
Expand All @@ -48,7 +48,7 @@ describe('clone()', () => {
});

test('can clone a document with string deletes', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
doc1.api.root({
foo: 'abc',
});
Expand All @@ -63,7 +63,7 @@ describe('clone()', () => {
});

test('can clone a document with object edits', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
doc1.api.root({
foo: {
a: 1,
Expand All @@ -90,7 +90,7 @@ describe('clone()', () => {
});

test('can clone array with edits', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
doc1.api.root({
foo: {
a: [1],
Expand All @@ -110,7 +110,7 @@ describe('clone()', () => {
});

test('can clone an empty model', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
const doc2 = doc1.clone();
expect(doc1.clock.sid === doc2.clock.sid).toBe(true);
expect(doc1.view()).toBe(undefined);
Expand All @@ -126,7 +126,7 @@ describe('clone()', () => {

describe('fork()', () => {
test('can fork a simple document', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
const builder1 = new PatchBuilder(doc1.clock);
const obj = builder1.json([1, 2, 'lol']);
builder1.root(obj);
Expand All @@ -138,7 +138,7 @@ describe('fork()', () => {
});

test('forked document has a different session ID', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
const builder1 = new PatchBuilder(doc1.clock);
const obj = builder1.json([1, 2, 'lol']);
builder1.root(obj);
Expand All @@ -149,7 +149,7 @@ describe('fork()', () => {
});

test('can modify the cloned copy independently', () => {
const doc1 = Model.withLogicalClock();
const doc1 = Model.create();
const builder1 = new PatchBuilder(doc1.clock);
const arr = builder1.json([1, 2, 'lol']);
builder1.root(arr);
Expand All @@ -165,12 +165,29 @@ describe('fork()', () => {
expect(doc2.view()).toEqual([true, 1, 2, 'lol']);
expect(doc1.clock.sid !== doc2.clock.sid).toBe(true);
});

test('does not reuse existing session IDs when forking', () => {
const rnd = Math.random;
let i = 0;
Math.random = () => {
i++;
return i < 20 ? 0.5 : i < 24 ? 0.1 : i < 30 ? 0.5 : rnd();
};
const model = Model.create();
model.api.root(123);
const model2 = model.fork();
const model3 = model2.fork();
expect(model.clock.sid).not.toBe(model2.clock.sid);
expect(model3.clock.sid).not.toBe(model2.clock.sid);
expect(model3.clock.sid).not.toBe(model.clock.sid);
Math.random = rnd;
});
});

describe('reset()', () => {
test('resets model state', () => {
const doc1 = Model.withLogicalClock();
const doc2 = Model.withLogicalClock();
const doc1 = Model.create();
const doc2 = Model.create();
doc1.api.root({foo: 123});
doc2.api.root({
text: 'hello',
Expand All @@ -190,8 +207,8 @@ describe('reset()', () => {
});

test('models can be edited separately', () => {
const doc1 = Model.withLogicalClock();
const doc2 = Model.withLogicalClock();
const doc1 = Model.create();
const doc2 = Model.create();
doc1.api.root({foo: 123});
doc2.api.root({
text: 'hello',
Expand All @@ -207,8 +224,8 @@ describe('reset()', () => {
});

test('emits change event on reset', async () => {
const doc1 = Model.withLogicalClock();
const doc2 = Model.withLogicalClock();
const doc1 = Model.create();
const doc2 = Model.create();
doc1.api.root({foo: 123});
doc2.api.root({
text: 'hello',
Expand All @@ -222,7 +239,7 @@ describe('reset()', () => {
});

test('preserves API nodes when model is reset', async () => {
const doc1 = Model.withLogicalClock().setSchema(
const doc1 = Model.create().setSchema(
schema.obj({
text: schema.str('hell'),
}),
Expand Down

0 comments on commit a479799

Please sign in to comment.