Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: activity override and cancellation #101

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 136 additions & 5 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
workflow,
heartbeat,
HeartbeatTimeout,
completeActivity,
failActivity,
} from "@eventual/core";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
import { AsyncWriterTestEvent } from "./async-writer-handler.js";
Expand Down Expand Up @@ -148,11 +150,13 @@ export const childWorkflow = workflow(
}
);

const slowActivity = activity(
"slowAct",
{ timeoutSeconds: 5 },
() => new Promise((resolve) => setTimeout(resolve, 10 * 1000))
);
const delay = (seconds: number) =>
new Promise((resolve) => setTimeout(resolve, seconds * 1000));

const slowActivity = activity("slowAct", { timeoutSeconds: 5 }, async () => {
await delay(10);
return "done finally";
});

const slowWf = workflow("slowWorkflow", { timeoutSeconds: 5 }, () =>
sleepFor(10)
Expand Down Expand Up @@ -332,3 +336,130 @@ const sendFinishEvent = activity("sendFinish", async (executionId: string) => {
proxy: true,
});
});

const activityOverrideEvent = event<{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its a good time to add comments about what each of the elements in this service are, and what they're used for in the tests. Its starting to become a lot to piece together.

executionId: string;
token: string;
location: "handler" | "signal";
type: "complete" | "fail";
}>("activityOverrideEvent");

const activityOverrideActivity = activity(
"eventPublish",
async ({
type,
location,
executionId,
}: {
executionId: string;
location: "handler" | "signal";
type: "complete" | "fail";
}) => {
return asyncResult(
async (token) =>
await activityOverrideEvent.publish({
token,
location,
type,
executionId,
})
);
}
);

const activityOverrideSignal = new Signal<{
token: string;
type: "complete" | "fail";
}>("activityOverrideSignal");

activityOverrideEvent.on(async ({ token, location, type, executionId }) => {
if (location === "handler") {
if (type === "complete") {
await completeActivity(token, "from the event handler!");
} else {
await failActivity(token, new Error("WHY!!!"));
}
} else {
await activityOverrideSignal.send(executionId, { token, type });
}
});

/**
* Activity which waits to be closed/cancelled.
* If it is closed, it signals the workflow with "complete", if not it signals with "fail".
*/
const activityOverrideAwareActivity = activity(
"overrideAware",
async ({ executionId }: { executionId: string }) => {
let n = 0;
while (n < 10) {
await delay(1);
const { closed } = await heartbeat();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does heartbeat throw if it's been cancelled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the activity chooses what to do with this information, it can keep running if the developer wants. (ex: fire and forget activities).

if (closed) {
await activityOverrideSignal.send(executionId, {
token: "",
type: "complete",
});
return;
}
}

await activityOverrideSignal.send(executionId, { token: "", type: "fail" });
}
);

export const overrideWorkflow = workflow(
"override",
async (_, { execution: { id: executionId } }) => {
const act = slowActivity();
const act2 = slowActivity();
const act3 = slowActivity();
act.cancel("because");
act2.fail(new Error("ahhh"));
act3.complete("hi!");

const results1 = await Promise.allSettled([act, act2, act3]);

const signalHandler = activityOverrideSignal.on(async ({ token, type }) => {
if (type === "complete") {
await completeActivity(token, "from the signal handler!");
} else {
await failActivity(token, new Error("BECAUSE!!!"));
}
});

const results2 = await Promise.allSettled([
activityOverrideActivity({
location: "handler",
type: "complete",
executionId,
}),
activityOverrideActivity({
location: "handler",
type: "fail",
executionId,
}),
activityOverrideActivity({
location: "signal",
type: "complete",
executionId,
}),
activityOverrideActivity({
location: "signal",
type: "fail",
executionId,
}),
]);

const aware = activityOverrideAwareActivity({ executionId });
await sleepFor(1);
aware.cancel("because");

signalHandler.dispose();

// the override activity SHOULD send a signal when it realizes it is cancelled.
const result = await expectSignal(activityOverrideSignal);

return [results1, results2, result];
}
);
38 changes: 35 additions & 3 deletions apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { HeartbeatTimeout } from "@eventual/core";
import {
ActivityCancelled,
EventualError,
HeartbeatTimeout,
} from "@eventual/core";
import { eventualRuntimeTestHarness } from "./runtime-test-harness.js";
import {
eventDrivenWorkflow,
Expand All @@ -10,6 +14,7 @@ import {
workflow2,
workflow3,
workflow4,
overrideWorkflow,
} from "./test-service.js";

jest.setTimeout(100 * 1000);
Expand All @@ -31,7 +36,7 @@ eventualRuntimeTestHarness(({ testCompletion }) => {
{ status: "fulfilled", value: ["HELLO SAM", "HELLO CHRIS", "HELLO SAM"] },
{ status: "fulfilled", value: ["hello sam", "hello chris", "hello sam"] },
{ status: "fulfilled", value: "hello sam" },
{ status: "rejected", reason: "Error" },
{ status: "rejected", reason: { name: "Error", message: "failed" } },
]);

testCompletion("parent-child", parentWorkflow, "done");
Expand All @@ -45,7 +50,10 @@ eventualRuntimeTestHarness(({ testCompletion }) => {

testCompletion("asyncActivities", asyncWorkflow, [
"hello from the async writer!",
"AsyncWriterError",
{
name: "AsyncWriterError",
message: "I was told to fail this activity, sorry.",
},
]);

testCompletion("heartbeat", heartbeatWorkflow, 10, [
Expand All @@ -62,4 +70,28 @@ eventualRuntimeTestHarness(({ testCompletion }) => {
]);

testCompletion("event-driven", eventDrivenWorkflow, "done!");

testCompletion("overrideActivities", overrideWorkflow, [
[
{ status: "rejected", reason: new ActivityCancelled("because").toJSON() },
{
status: "rejected",
reason: new EventualError("Error", "ahhh").toJSON(),
},
{ status: "fulfilled", value: "hi!" },
],
[
{ status: "fulfilled", value: "from the event handler!" },
{
status: "rejected",
reason: new EventualError("Error", "WHY!!!").toJSON(),
},
{ status: "fulfilled", value: "from the signal handler!" },
{
status: "rejected",
reason: new EventualError("Error", "BECAUSE!!!").toJSON(),
},
],
{ token: "", type: "complete" },
]);
});
2 changes: 2 additions & 0 deletions packages/@eventual/aws-cdk/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ export class Activities extends Construct implements IActivities, IGrantable {

public configureCompleteActivity(func: Function) {
this.props.workflows.configureSendWorkflowEvent(func);
this.configureUpdateActivity(func);
}

public grantCompleteActivity(grantable: IGrantable) {
this.props.workflows.grantSendWorkflowEvent(grantable);
this.grantUpdateActivity(grantable);
}

public configureUpdateActivity(func: Function) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class AWSActivityRuntimeClient implements ActivityRuntimeClient {
executionId: string,
seq: number,
heartbeatTime: string
): Promise<{ cancelled: boolean }> {
): Promise<{ closed: boolean }> {
const item = await this.props.dynamo.send(
new UpdateItemCommand({
Key: {
Expand All @@ -86,27 +86,36 @@ export class AWSActivityRuntimeClient implements ActivityRuntimeClient {
);

return {
cancelled:
(item.Attributes as ActivityExecutionRecord).cancelled?.BOOL ?? false,
closed:
(item.Attributes as ActivityExecutionRecord).closed?.BOOL ?? false,
};
}

async cancelActivity(executionId: string, seq: number) {
await this.props.dynamo.send(
/**
* An activity execution is closed when it already has a result (completed or failed).
*
* Close the activity to prevent others from emitting events for it and report to the activity worker
* that the activity is no longer able to report a result.
*/
async closeActivity(executionId: string, seq: number) {
const item = await this.props.dynamo.send(
new UpdateItemCommand({
Key: {
pk: { S: ActivityExecutionRecord.key(executionId, seq) },
},
UpdateExpression:
"SET cancelled=:cancelled, executionId = :executionId, seq = :seq",
"SET closed=:closed, executionId = :executionId, seq = :seq",
ExpressionAttributeValues: {
":cancelled": { BOOL: true },
":closed": { BOOL: true },
":executionId": { S: executionId },
":seq": { N: `${seq}` },
},
TableName: this.props.activityTableName,
ReturnValues: ReturnValue.UPDATED_OLD,
})
);

return { alreadyClosed: item.Attributes?.["closed"]?.BOOL ?? false };
}

async getActivity(
Expand All @@ -132,7 +141,7 @@ export interface ActivityExecutionRecord {
executionId: AttributeValue.SMember;
seq: AttributeValue.NMember;
heartbeatTime?: AttributeValue.SMember;
cancelled?: AttributeValue.BOOLMember;
closed?: AttributeValue.BOOLMember;
}

export namespace ActivityExecutionRecord {
Expand All @@ -148,7 +157,7 @@ function createActivityFromRecord(
return {
executionId: activityRecord.executionId.S,
seq: Number(activityRecord.seq.N),
cancelled: Boolean(activityRecord.cancelled?.BOOL ?? false),
closed: Boolean(activityRecord.closed?.BOOL ?? false),
heartbeatTime: activityRecord?.heartbeatTime?.S,
};
}
2 changes: 2 additions & 0 deletions packages/@eventual/aws-runtime/src/handlers/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { SQSEvent, SQSRecord } from "aws-lambda";
import { logger, loggerMiddlewares } from "../logger.js";
import { AWSMetricsClient } from "../clients/metrics-client.js";
import {
createActivityRuntimeClient,
createEventClient,
createExecutionHistoryClient,
createTimerClient,
Expand All @@ -24,6 +25,7 @@ const orchestrate = createOrchestrator({
workflowRuntimeClient: createWorkflowRuntimeClient(),
workflowClient: createWorkflowClient(),
eventClient: createEventClient(),
activityRuntimeClient: createActivityRuntimeClient(),
metricsClient: AWSMetricsClient,
logger,
});
Expand Down
Loading