Skip to content

Commit

Permalink
feat: add HTTP-accessible emit | broadcast | whisper methods;
Browse files Browse the repository at this point in the history
- Closes #7
- Closes #13
  • Loading branch information
lukeed committed Jan 12, 2022
1 parent 37a375f commit 5aa8bbc
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
40 changes: 39 additions & 1 deletion example/worker/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,50 @@ export class Room extends Replica<Bindings> {
async receive(req: Request) {
console.log('[ HELLO ][receive] req.url', req.url);

let { pathname } = new URL(req.url);
let { pathname, searchParams } = new URL(req.url);

if (pathname === '/ws') {
return this.connect(req);
}

if (pathname === '/broadcast') {
await this.broadcast({
type: 'user:msg',
from: 'HTTP',
text: 'SENT VIA "/broadcast" HTTP route',
time: Date.now(),
});

return new Response('DONE');
}

if (pathname === '/whisper') {
let target = searchParams.get('target') || 'aaa';

await this.whisper(target, {
type: 'user:msg',
from: 'HTTP',
text: 'SENT VIA "/whisper" HTTP route',
time: Date.now(),
meta: 'whisper',
to: target,
});

return new Response('DONE');
}

if (pathname === '/emit') {
this.emit({
type: 'user:msg',
from: 'HTTP',
text: 'SENT VIA "/emit" HTTP route',
time: Date.now(),
meta: 'group'
});

return new Response('DONE');
}

// NOTE: can employ whatever routing logic
return new Response(`PATH: "${pathname}"`);
}
Expand Down
18 changes: 18 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ export abstract class Replica<T extends Bindings> {
*/
connect(req: Request): Promise<Response>;

/**
* Send a message (via HTTP) to WebSockets owned by the Replica
* @NOTE This is the HTTP-accessible version of `Socket.emit`
*/
emit(msg: Message): void;

/**
* Send a message (via HTTP) to ALL WebSockets within the CLUSTER.
* @NOTE This is the HTTP-accessible version of `Socket.broadcast`
*/
broadcast(msg: Message): Promise<void>;

/**
* Send a message (via HTTP) to a specific WebSocket target.
* @NOTE This is the HTTP-accessible version of `Socket.whisper`
*/
whisper(target: string, msg: Message): Promise<void>;

/**
* Respond to another Replica's gossip.
* @NOTE Must return a JSON-serializable value.
Expand Down
17 changes: 16 additions & 1 deletion src/replica.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export abstract class Replica<T extends ModuleWorker.Bindings> implements DOG.Re
readonly #parent: DurableObjectNamespace;
readonly #self: DurableObjectNamespace;

#gid?: string;

constructor(state: DurableObjectState, env: T) {
this.uid = state.id.toString();
this.#neighbors = new Set;
Expand Down Expand Up @@ -168,7 +170,7 @@ export abstract class Replica<T extends ModuleWorker.Bindings> implements DOG.Re
if (this.#neighbors.size < 1) return [];

let list = await this.#dispatch({
group: 'Q', // ignored
group: this.#gid!,
sender: this.uid, // this replica
route: ROUTES.GOSSIP,
body: msg == null ? msg : JSON.stringify(msg)
Expand All @@ -192,6 +194,7 @@ export abstract class Replica<T extends ModuleWorker.Bindings> implements DOG.Re
}

if (pathname === ROUTES.NEIGHBOR) {
this.#gid = this.#gid || gid; // save
// rid === HEADERS.NEIGHBORID
this.#neighbors.add(rid);
return new Response;
Expand Down Expand Up @@ -248,6 +251,18 @@ export abstract class Replica<T extends ModuleWorker.Bindings> implements DOG.Re
}
}

emit(msg: DOG.Message): void {
this.#emit(this.#gid!, msg, true);
}

broadcast(msg: DOG.Message): Promise<void> {
return this.#broadcast(this.#gid!, this.uid, msg, true);
}

whisper(target: string, msg: DOG.Message): Promise<void> {
return this.#whisper(this.#gid!, this.uid, target, msg);
}

/**
* Share a message ONLY with this REPLICA's connections
*/
Expand Down

0 comments on commit 5aa8bbc

Please sign in to comment.