Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce broadcast API for event sharing #884

Merged
merged 21 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f6ec48a
Implement subscription to broadcast events
gwbaik9717 Aug 20, 2024
6284cfe
Implement publishing broadcast events
gwbaik9717 Aug 20, 2024
0472c0a
Decode broadcasted payload
gwbaik9717 Aug 21, 2024
a1442b7
Add validation logic for serializable payload
gwbaik9717 Aug 21, 2024
323f981
Add test case for throwing error when trying to broadcast unserialize…
gwbaik9717 Aug 21, 2024
b692441
Fix bug in subscribeBroadcastEvent method
gwbaik9717 Aug 21, 2024
66c996a
Add test case for successfully broadcast serializeable payload
gwbaik9717 Aug 22, 2024
bc8b66c
Merge branch 'main' into broadcast-api
gwbaik9717 Aug 22, 2024
b0b2766
Add test cases for subscribing and unsubscribing broadcast events
gwbaik9717 Aug 22, 2024
06d37fa
Modify interface for subscribing broadcast events
gwbaik9717 Aug 26, 2024
8060985
Refactor the broadcast method to be called directly by the document o…
gwbaik9717 Aug 26, 2024
4d516be
Fix lint errors
gwbaik9717 Aug 26, 2024
69650c0
Refactor test code to use EventCollector
gwbaik9717 Aug 27, 2024
86035b8
Refactor by removing unnecessary broadcastEventHanlders
gwbaik9717 Aug 28, 2024
41da599
Refactor test codes
chacha912 Aug 28, 2024
bc49062
Merge branch 'main' into broadcast-api
gwbaik9717 Aug 28, 2024
5ac5e2b
Refactor Broadcast Subscription Interface to Enable Manual Topic Comp…
gwbaik9717 Aug 29, 2024
fe3c0e9
Remove client from document to prevent circular references
gwbaik9717 Aug 29, 2024
e88e3c1
Handle the case when broadcast event fails
gwbaik9717 Aug 30, 2024
43ae22c
Refactor test code to remove undeterministic Promise
gwbaik9717 Aug 30, 2024
ffe525f
Fix bug where publisher receives its own broadcast event
gwbaik9717 Sep 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/sdk/src/client/attachment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,11 @@ export class Attachment<T, P extends Indexable> {
clearTimeout(this.watchLoopTimerID);
this.watchLoopTimerID = undefined;
}

/**
* `unsetClient` unsets the client of the document.
*/
public unsetClient(): void {
this.doc.setClient(null);
}
}
56 changes: 56 additions & 0 deletions packages/sdk/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation';
import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor';
import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor';
import { validateSerializable } from '../util/validator';

/**
* `SyncMode` defines synchronization modes for the PushPullChanges API.
Expand Down Expand Up @@ -303,6 +304,7 @@ export class Client {
}
doc.setActor(this.id!);
doc.update((_, p) => p.set(options.initialPresence || {}));
doc.setClient(this);

const syncMode = options.syncMode ?? SyncMode.Realtime;
return this.enqueueTask(async () => {
Expand Down Expand Up @@ -584,6 +586,59 @@ export class Client {
return this.conditions[condition];
}

/**
* `broadcast` broadcasts the given payload to the given topic.
*/
public broadcast(
docKey: DocumentKey,
topic: string,
payload: any,
): Promise<void> {
if (!this.isActive()) {
throw new YorkieError(
Code.ErrClientNotActivated,
`${this.key} is not active`,
);
}
const attachment = this.attachmentMap.get(docKey);
if (!attachment) {
throw new YorkieError(
Code.ErrDocumentNotAttached,
`${docKey} is not attached`,
);
}

if (!validateSerializable(payload)) {
throw new YorkieError(
Code.ErrInvalidArgument,
'payload is not serializable',
);
}

return this.enqueueTask(async () => {
return this.rpcClient
.broadcast(
{
clientId: this.id!,
documentId: attachment.docID,
topic,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } },
)
.then(() => {
logger.info(
`[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`,
);
})
.catch((err) => {
logger.error(`[BC] c:"${this.getKey()}" err :`, err);
this.handleConnectError(err);
throw err;
});
});
}

/**
* `runSyncLoop` runs the sync loop. The sync loop pushes local changes to
* the server and pulls remote changes from the server.
Expand Down Expand Up @@ -747,6 +802,7 @@ export class Client {
return;
}

attachment.unsetClient();
attachment.cancelWatchStream();
this.attachmentMap.delete(docKey);
}
Expand Down
135 changes: 132 additions & 3 deletions packages/sdk/src/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import { History, HistoryOperation } from '@yorkie-js-sdk/src/document/history';
import { setupDevtools } from '@yorkie-js-sdk/src/devtools';
import * as Devtools from '@yorkie-js-sdk/src/devtools/types';
import { Client } from '@yorkie-js-sdk/src/client/client';

/**
* `DocumentOptions` are the options to create a new document.
Expand Down Expand Up @@ -173,6 +174,11 @@
* `PresenceChanged` means that the presences of the client has updated.
*/
PresenceChanged = 'presence-changed',

/**
* `Broadcast` means that the message is broadcasted to clients who subscribe to the event.
*/
Broadcast = 'broadcast',
}

/**
Expand All @@ -191,7 +197,8 @@
| InitializedEvent<P>
| WatchedEvent<P>
| UnwatchedEvent<P>
| PresenceChangedEvent<P>;
| PresenceChangedEvent<P>
| BroadcastEvent;

/**
* `TransactionEvent` represents document events that occur within
Expand Down Expand Up @@ -371,6 +378,11 @@
value: { clientID: ActorID; presence: P };
}

export interface BroadcastEvent extends BaseDocEvent {
type: DocEventType.Broadcast;
value: { topic: string; payload: any };
}

type DocEventCallbackMap<P extends Indexable> = {
default: NextFn<
| SnapshotEvent
Expand All @@ -388,6 +400,7 @@
connection: NextFn<ConnectionChangedEvent>;
status: NextFn<StatusChangedEvent>;
sync: NextFn<SyncStatusChangedEvent>;
broadcast: (topic: string, payload: any) => void;
all: NextFn<TransactionEvent<P>>;
};
export type DocEventTopic = keyof DocEventCallbackMap<never>;
Expand Down Expand Up @@ -539,6 +552,21 @@
Depth
>;

/*
* `SubscribePair` represents the type of the subscribe pair.
*/
type SubscribePair = {
type: string;
};

/*
* `BroadcastSubscribePair` represents the type of the broadcast subscribe pair.
*/
type BroadcastSubscribePair = {
type: 'broadcast';
topic: string;
} & SubscribePair;

/**
* `Document` is a CRDT-based data type. We can represent the model
* of the application and edit it even while offline.
Expand Down Expand Up @@ -589,6 +617,17 @@
*/
private isUpdating: boolean;

/**
* `broadcastEventHandlers` is a map of broadcast event handlers.
* The key is the topic of the broadcast event, and the value is the handler.
*/
private broadcastEventHandlers: Map<
string,
DocEventCallbackMap<P>['broadcast']
>;
chacha912 marked this conversation as resolved.
Show resolved Hide resolved

private client: Client | null = null;

Check failure on line 629 in packages/sdk/src/document/document.ts

View workflow job for this annotation

GitHub Actions / build (18.x)

Don't use `null` as a type. Use undefined instead of null

constructor(key: string, opts?: DocumentOptions) {
this.opts = opts || {};

Expand Down Expand Up @@ -616,6 +655,8 @@
redo: this.redo.bind(this),
};

this.broadcastEventHandlers = new Map();

setupDevtools(this);
}

Expand Down Expand Up @@ -818,6 +859,18 @@
error?: ErrorFn,
complete?: CompleteFn,
): Unsubscribe;
/**
* `subscribe` registers a callback to subscribe to events on the document.
* The callback will be called when the document is changed.
*/
public subscribe(
type: {
type: 'broadcast';
topic: string;
},
next: DocEventCallbackMap<P>['broadcast'],
error?: ErrorFn,
): Unsubscribe;
/**
* `subscribe` registers a callback to subscribe to events on the document.
*/
Expand All @@ -834,7 +887,11 @@
TPath extends PathOf<T>,
TOperationInfo extends OperationInfoOf<T, TPath>,
>(
arg1: TPath | DocEventTopic | DocEventCallbackMap<P>['default'],
arg1:
| TPath
| DocEventTopic
| DocEventCallbackMap<P>['default']
| SubscribePair,
arg2?:
| NextFn<
| LocalChangeEvent<TOperationInfo, P>
Expand Down Expand Up @@ -1024,6 +1081,32 @@
complete,
);
}
if (typeof arg1 === 'object') {
const { type } = arg1 as SubscribePair;

if (type === 'broadcast') {
const { topic } = arg1 as BroadcastSubscribePair;
const handler = arg2 as DocEventCallbackMap<P>['broadcast'];
const error = arg3 as ErrorFn;
this.broadcastEventHandlers.set(topic, handler);
const unsubscribe = this.eventStream.subscribe((event) => {
for (const docEvent of event) {
if (docEvent.type !== DocEventType.Broadcast) {
continue;
}

if (docEvent.value.topic === topic) {
handler(topic, docEvent.value.payload);
}
}
}, error);

return () => {
unsubscribe();
this.broadcastEventHandlers.delete(topic);
};
}
}
throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`);
}

Expand Down Expand Up @@ -1250,6 +1333,15 @@
return this.root.getGarbageLen();
}

/*
* `setClient` sets the client of this document.
*
* @internal
*/
public setClient(client: Client | null): void {

Check failure on line 1341 in packages/sdk/src/document/document.ts

View workflow job for this annotation

GitHub Actions / build (18.x)

Missing JSDoc comment

Check failure on line 1341 in packages/sdk/src/document/document.ts

View workflow job for this annotation

GitHub Actions / build (18.x)

Don't use `null` as a type. Use undefined instead of null
this.client = client;
}

/**
* `getGarbageLenFromClone` returns the length of elements should be purged from clone.
*/
Expand Down Expand Up @@ -1468,7 +1560,8 @@

if (resp.body.case === 'event') {
const { type, publisher } = resp.body.value;
const event: Array<WatchedEvent<P> | UnwatchedEvent<P>> = [];
const event: Array<WatchedEvent<P> | UnwatchedEvent<P> | BroadcastEvent> =
[];
if (type === PbDocEventType.DOCUMENT_WATCHED) {
this.addOnlineClient(publisher);
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
Expand All @@ -1495,6 +1588,16 @@
value: { clientID: publisher, presence },
});
}
} else if (type === PbDocEventType.DOCUMENT_BROADCAST) {
if (resp.body.value.body) {
const { topic, payload } = resp.body.value.body;
const decoder = new TextDecoder();

event.push({
type: DocEventType.Broadcast,
value: { topic, payload: JSON.parse(decoder.decode(payload)) },
});
}
}

if (event.length > 0) {
Expand Down Expand Up @@ -1584,6 +1687,14 @@
const { clientID, presence } = event.value;
this.presences.set(clientID, presence);
}

if (event.type === DocEventType.Broadcast) {
const { topic, payload } = event.value;
const handler = this.broadcastEventHandlers.get(topic);
if (handler) {
handler(topic, payload);
}
}
}

/**
Expand Down Expand Up @@ -1970,4 +2081,22 @@
public getRedoStackForTest(): Array<Array<HistoryOperation<P>>> {
return this.internalHistory.getRedoStackForTest();
}

/**
* `broadcast` the payload to the given topic.
*/
public broadcast(topic: string, payload: any): Promise<void> {
chacha912 marked this conversation as resolved.
Show resolved Hide resolved
if (this.client) {
try {

Check failure on line 2090 in packages/sdk/src/document/document.ts

View workflow job for this annotation

GitHub Actions / build (18.x)

Unnecessary try/catch wrapper
return this.client.broadcast(this.getKey(), topic, payload);
} catch (e) {
throw e;
}
}

throw new YorkieError(
Code.ErrClientNotFound,
'Document is not attached to a client',
);
}
gwbaik9717 marked this conversation as resolved.
Show resolved Hide resolved
}
31 changes: 31 additions & 0 deletions packages/sdk/src/util/validator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* `validateSerializable` returns whether the given value is serializable or not.
*/
export const validateSerializable = (value: any): boolean => {
try {
const serialized = JSON.stringify(value);

if (serialized === undefined) {
return false;
}
} catch (error) {
return false;
}
return true;
};
Loading
Loading