Skip to content

Commit

Permalink
Passing signal to metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Jan 25, 2025
1 parent bbee53a commit d257ae1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
39 changes: 36 additions & 3 deletions src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class Hello {
class Counter {
private count: number;
private watchTarget = new WatchTarget<number>();
public metadata?: { extraSum: number };
public metadata?: { extraSum: number; signal?: AbortSignal };
private listenerAttached = Promise.withResolvers<void>();

constructor(protected state: ActorState) {
this.count = 0;
Expand All @@ -43,6 +44,17 @@ class Counter {
});
}

waitForAbortSignalAttach() {
return this.listenerAttached.promise;
}
shouldFailWhenSignalIsAborted() {
const p = Promise.withResolvers<string>();
this.metadata?.signal?.addEventListener("abort", () => {
p.resolve("HELLO ITS RESOLVED!");
}, { once: true });
this.listenerAttached.resolve();
return p.promise;
}
callSayHello(): Promise<string> {
const hello = this.state.stub(Hello).id(this.state.id);
return hello.sayHello();
Expand Down Expand Up @@ -105,16 +117,37 @@ const runServer = (
};
};

Deno.test("counter tests", async () => {
Deno.test("actor tests", async () => {
const rt = new StdActorRuntime();
let reqCount = 0;
await using _server = runServer(rt, () => {

let nextRequest = Promise.withResolvers<() => void>();

let abortRequestFn = nextRequest.promise;
await using _server = runServer(rt, (rq) => {
reqCount++;
nextRequest.resolve(() => {
rq.signal.dispatchEvent(new Event("abort"));
});
nextRequest = Promise.withResolvers<() => void>();
abortRequestFn = nextRequest.promise;
});
const actorId = "1234";
const counterStub = actors.stub(Counter, { maxWsChunkSize: 64 });

const counterActor = counterStub.id(actorId);

const waitForAttach = counterActor.waitForAbortSignalAttach().then((p) => p);
await nextRequest.promise;
const shouldReturnHelloOnlyWhenAborted = counterActor
.shouldFailWhenSignalIsAborted().then(
(p) => p,
);
const abort = await abortRequestFn;
await waitForAttach;
abort();
assertEquals(await shouldReturnHelloOnlyWhenAborted, "HELLO ITS RESOLVED!");

using rpcActor = counterStub.id("12345").rpc();

const helloUploadActor = actors.stub(Hello).id("123456");
Expand Down
4 changes: 4 additions & 0 deletions src/actors/silo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ActorState } from "./state.ts";
import type { ActorStorage } from "./storage.ts";
import {
type ActorInvoker,
type BaseMetadata,
create,
createHttpInvoker,
type EnrichMetadataFn,
Expand Down Expand Up @@ -101,6 +102,9 @@ export class ActorSilo<TEnv extends object = object> {
)
: metadata;

if (metadata && typeof metadata === "object") {
(metadata as BaseMetadata).signal = req?.signal;
}
if (isWellKnownRPCMethod(String(method))) {
const chan = rpc(this.invoker, metadata);
return chan;
Expand Down
12 changes: 10 additions & 2 deletions src/actors/stubutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ export type EnrichMetadataFn<
metadata: TMetadata,
req: Request,
) => EnrichedMetadata;

export interface BaseMetadata {
signal?: AbortSignal;
}
/**
* Represents an actor proxy.
*/
Expand All @@ -233,11 +237,15 @@ export type ActorProxy<Actor> =
& (Actor extends { metadata?: infer TMetadata } ? Actor extends {
enrichMetadata: EnrichMetadataFn<infer TPartialMetadata, any>;
} ? {
withMetadata(metadata: TPartialMetadata): ActorProxy<Actor>;
withMetadata(
metadata: Omit<TPartialMetadata, keyof BaseMetadata>,
): ActorProxy<Actor>;
rpc(): ActorProxy<Actor> & Disposable;
}
: {
withMetadata(metadata: TMetadata): ActorProxy<Actor>;
withMetadata(
metadata: Omit<TMetadata, keyof BaseMetadata>,
): ActorProxy<Actor>;
rpc(): ActorProxy<Actor> & Disposable;
}
: { rpc(): ActorProxy<Actor> & Disposable });
Expand Down

0 comments on commit d257ae1

Please sign in to comment.