From 8d60e3b63a3b16256cafd95cc29d2137f26faa50 Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Mon, 12 Dec 2022 20:23:28 -0600 Subject: [PATCH 1/9] support complete and fail activity --- .../src/clients/activity-runtime-client.ts | 27 ++-- .../aws-runtime/src/handlers/orchestrator.ts | 2 + packages/@eventual/core/src/activity.ts | 94 +++++++++++- .../@eventual/core/src/calls/activity-call.ts | 66 ++++++++- .../core/src/calls/condition-call.ts | 5 +- .../core/src/calls/expect-signal-call.ts | 5 +- .../core/src/calls/send-events-call.ts | 5 +- .../core/src/calls/send-signal-call.ts | 5 +- .../core/src/calls/signal-handler-call.ts | 5 +- .../@eventual/core/src/calls/sleep-call.ts | 8 +- .../@eventual/core/src/calls/workflow-call.ts | 5 +- packages/@eventual/core/src/command.ts | 15 ++ packages/@eventual/core/src/error.ts | 10 ++ packages/@eventual/core/src/eventual.ts | 15 +- packages/@eventual/core/src/heartbeat.ts | 2 +- packages/@eventual/core/src/interpret.ts | 25 +++- .../clients/activity-runtime-client.ts | 21 ++- .../src/runtime/clients/workflow-client.ts | 36 ++--- .../core/src/runtime/command-executor.ts | 66 ++++++++- .../src/runtime/handlers/activity-worker.ts | 18 ++- .../core/src/runtime/handlers/orchestrator.ts | 4 + .../core/src/runtime/metrics/constants.ts | 4 + .../@eventual/core/src/workflow-events.ts | 19 +++ packages/@eventual/core/test/command-util.ts | 31 ++++ .../core/test/commend-executor.test.ts | 5 + .../@eventual/core/test/interpret.test.ts | 136 +++++++++++++++++- 26 files changed, 564 insertions(+), 70 deletions(-) diff --git a/packages/@eventual/aws-runtime/src/clients/activity-runtime-client.ts b/packages/@eventual/aws-runtime/src/clients/activity-runtime-client.ts index ef8507ad2..c68fa1d87 100644 --- a/packages/@eventual/aws-runtime/src/clients/activity-runtime-client.ts +++ b/packages/@eventual/aws-runtime/src/clients/activity-runtime-client.ts @@ -67,7 +67,7 @@ export class AWSActivityRuntimeClient implements ActivityRuntimeClient { executionId: string, seq: number, heartbeatTime: string - ): Promise<{ cancelled: boolean }> { + ): Promise<{ closed: boolean }> { const item = await this.props.dynamo.send( new UpdateItemCommand({ Key: { @@ -86,27 +86,36 @@ export class AWSActivityRuntimeClient implements ActivityRuntimeClient { ); return { - cancelled: - (item.Attributes as ActivityExecutionRecord).cancelled?.BOOL ?? false, + closed: + (item.Attributes as ActivityExecutionRecord).closed?.BOOL ?? false, }; } - async cancelActivity(executionId: string, seq: number) { - await this.props.dynamo.send( + /** + * An activity execution is closed when it already has a result (completed or failed). + * + * Close the activity to prevent others from emitting events for it and report to the activity worker + * that the activity is no longer able to report a result. + */ + async closeActivity(executionId: string, seq: number) { + const item = await this.props.dynamo.send( new UpdateItemCommand({ Key: { pk: { S: ActivityExecutionRecord.key(executionId, seq) }, }, UpdateExpression: - "SET cancelled=:cancelled, executionId = :executionId, seq = :seq", + "SET closed=:closed, executionId = :executionId, seq = :seq", ExpressionAttributeValues: { - ":cancelled": { BOOL: true }, + ":closed": { BOOL: true }, ":executionId": { S: executionId }, ":seq": { N: `${seq}` }, }, TableName: this.props.activityTableName, + ReturnValues: ReturnValue.UPDATED_OLD, }) ); + + return { alreadyClosed: item.Attributes?.["closed"]?.BOOL ?? false }; } async getActivity( @@ -132,7 +141,7 @@ export interface ActivityExecutionRecord { executionId: AttributeValue.SMember; seq: AttributeValue.NMember; heartbeatTime?: AttributeValue.SMember; - cancelled?: AttributeValue.BOOLMember; + closed?: AttributeValue.BOOLMember; } export namespace ActivityExecutionRecord { @@ -148,7 +157,7 @@ function createActivityFromRecord( return { executionId: activityRecord.executionId.S, seq: Number(activityRecord.seq.N), - cancelled: Boolean(activityRecord.cancelled?.BOOL ?? false), + closed: Boolean(activityRecord.closed?.BOOL ?? false), heartbeatTime: activityRecord?.heartbeatTime?.S, }; } diff --git a/packages/@eventual/aws-runtime/src/handlers/orchestrator.ts b/packages/@eventual/aws-runtime/src/handlers/orchestrator.ts index 13f3ee519..adbe87181 100644 --- a/packages/@eventual/aws-runtime/src/handlers/orchestrator.ts +++ b/packages/@eventual/aws-runtime/src/handlers/orchestrator.ts @@ -6,6 +6,7 @@ import type { SQSEvent, SQSRecord } from "aws-lambda"; import { logger, loggerMiddlewares } from "../logger.js"; import { AWSMetricsClient } from "../clients/metrics-client.js"; import { + createActivityRuntimeClient, createEventClient, createExecutionHistoryClient, createTimerClient, @@ -24,6 +25,7 @@ const orchestrate = createOrchestrator({ workflowRuntimeClient: createWorkflowRuntimeClient(), workflowClient: createWorkflowClient(), eventClient: createEventClient(), + activityRuntimeClient: createActivityRuntimeClient(), metricsClient: AWSMetricsClient, logger, }); diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index a6e31c5d1..6ed50b06d 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -1,5 +1,14 @@ -import { createActivityCall } from "./calls/activity-call.js"; -import { callableActivities, getActivityContext } from "./global.js"; +import { + createActivityCall, + createFinishActivityCall, +} from "./calls/activity-call.js"; +import { ActivityCancelled, EventualError } from "./error.js"; +import { + callableActivities, + getActivityContext, + getWorkflowClient, +} from "./global.js"; +import { Result } from "./result.js"; import { isActivityWorker, isOrchestratorWorker } from "./runtime/flags.js"; export interface ActivityOptions { @@ -44,6 +53,9 @@ export type UnwrapAsync = Output extends AsyncResult ? O : Output; +export type ActivityOutput> = + A extends ActivityFunction ? UnwrapAsync : never; + const AsyncTokenSymbol = Symbol.for("eventual:AsyncToken"); /** @@ -148,6 +160,84 @@ export function activity( } } +export type ActivityTarget = OwnActivityTarget | ActivityTokenTarget; + +export enum ActivityTargetType { + OwnActivity, + ActivityToken, +} + +export interface OwnActivityTarget { + type: ActivityTargetType.OwnActivity; + seq: number; +} + +export interface ActivityTokenTarget { + type: ActivityTargetType.ActivityToken; + activityToken: string; +} + +export function completeActivity = any>( + activityToken: string, + result: ActivityOutput +): Promise { + if (isOrchestratorWorker()) { + return createFinishActivityCall( + { + type: ActivityTargetType.ActivityToken, + activityToken, + }, + result + ) as any; + } else { + return getWorkflowClient().completeActivity({ activityToken, result }); + } +} + +export function failActivity( + activityToken: string, + error: Error +): Promise; +export function failActivity( + activityToken: string, + error: string, + message: string +): Promise; +export function failActivity( + activityToken: string, + ...args: [error: Error] | [error: string, message: string] +): Promise { + const error = + args.length === 1 ? args[0] : new EventualError(args[0], args[1]); + if (isOrchestratorWorker()) { + return createFinishActivityCall( + { + type: ActivityTargetType.ActivityToken, + activityToken, + }, + Result.failed(error) + ) as any; + } else { + return getWorkflowClient().failActivity({ + activityToken, + error: error.name, + message: error.message, + }); + } +} + +export function cancelActivity( + activityToken: string, + reason: string +): Promise { + if (isOrchestratorWorker()) { + // not a real promise, do not await + return failActivity(activityToken, new ActivityCancelled(reason)) as any; + } else { + return failActivity(activityToken, new ActivityCancelled(reason)); + } +} + /** * Retrieve an activity function that has been registered in a workflow. */ diff --git a/packages/@eventual/core/src/calls/activity-call.ts b/packages/@eventual/core/src/calls/activity-call.ts index 841ed3434..3a88965fe 100644 --- a/packages/@eventual/core/src/calls/activity-call.ts +++ b/packages/@eventual/core/src/calls/activity-call.ts @@ -1,19 +1,21 @@ +import { ActivityTarget, ActivityTargetType } from "../activity.js"; +import { ActivityCancelled, EventualError } from "../error.js"; import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; -import { Resolved, Failed } from "../result.js"; +import { Resolved, Failed, Result } from "../result.js"; export function isActivityCall(a: any): a is ActivityCall { return isEventualOfKind(EventualKind.ActivityCall, a); } export interface ActivityCall - extends EventualBase | Failed> { - seq?: number; + extends CommandCallBase | Failed>, + ActivityExecutionReference { name: string; args: any[]; heartbeatSeconds?: number; @@ -26,12 +28,66 @@ export function createActivityCall( timeoutSeconds?: number, heartbeatSeconds?: number ): ActivityCall { - return registerEventual( + const call = registerEventual( createEventual(EventualKind.ActivityCall, { name, args, timeoutSeconds, heartbeatSeconds, + } as ActivityCall) + ); + + call.complete = function (result) { + return createFinishActivityCall( + { type: ActivityTargetType.OwnActivity, seq: this.seq! }, + Result.resolved(result) + ) as unknown as Promise; + }; + call.fail = function (...args) { + return createFinishActivityCall( + { type: ActivityTargetType.OwnActivity, seq: this.seq! }, + Result.failed( + args.length === 1 ? args[0] : new EventualError(args[0], args[1]) + ) + ) as unknown as Promise; + }; + call.cancel = function (reason) { + return createFinishActivityCall( + { type: ActivityTargetType.OwnActivity, seq: this.seq! }, + Result.failed(new ActivityCancelled(reason)) + ) as unknown as Promise; + }; + + return call; +} + +export interface ActivityExecutionReference { + cancel: (reason: string) => Promise; + fail: ( + ...args: [error: Error] | [error: string, message: string] + ) => Promise; + complete: (result: T) => Promise; +} + +export function isFinishActivityCall(a: any): a is FinishActivityCall { + return isEventualOfKind(EventualKind.FinishActivityCall, a); +} + +export interface FinishActivityCall + extends CommandCallBase { + target: ActivityTarget; + outcome: Resolved | Failed; +} + +export function createFinishActivityCall( + target: ActivityTarget, + outcome: Resolved | Failed +): FinishActivityCall { + return registerEventual( + createEventual(EventualKind.FinishActivityCall, { + target, + outcome, + result: Result.resolved(undefined), }) ); } diff --git a/packages/@eventual/core/src/calls/condition-call.ts b/packages/@eventual/core/src/calls/condition-call.ts index 5a8c9fe07..d12b309fb 100644 --- a/packages/@eventual/core/src/calls/condition-call.ts +++ b/packages/@eventual/core/src/calls/condition-call.ts @@ -1,7 +1,7 @@ import { ConditionPredicate } from "../condition.js"; import { + CommandCallBase, createEventual, - EventualBase, EventualKind, isEventualOfKind, } from "../eventual.js"; @@ -13,8 +13,7 @@ export function isConditionCall(a: any): a is ConditionCall { } export interface ConditionCall - extends EventualBase | Failed> { - seq?: number; + extends CommandCallBase | Failed> { predicate: ConditionPredicate; timeoutSeconds?: number; } diff --git a/packages/@eventual/core/src/calls/expect-signal-call.ts b/packages/@eventual/core/src/calls/expect-signal-call.ts index f15a0a661..9c2d2b1f6 100644 --- a/packages/@eventual/core/src/calls/expect-signal-call.ts +++ b/packages/@eventual/core/src/calls/expect-signal-call.ts @@ -1,8 +1,8 @@ import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { Failed, Resolved } from "../result.js"; @@ -12,8 +12,7 @@ export function isExpectSignalCall(a: any): a is ExpectSignalCall { } export interface ExpectSignalCall - extends EventualBase | Failed> { - seq?: number; + extends CommandCallBase | Failed> { signalId: string; timeoutSeconds?: number; } diff --git a/packages/@eventual/core/src/calls/send-events-call.ts b/packages/@eventual/core/src/calls/send-events-call.ts index a36aae51a..3e0a257fd 100644 --- a/packages/@eventual/core/src/calls/send-events-call.ts +++ b/packages/@eventual/core/src/calls/send-events-call.ts @@ -1,8 +1,8 @@ import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { EventEnvelope } from "../event.js"; @@ -13,8 +13,7 @@ export function isPublishEventsCall(a: any): a is PublishEventsCall { } export interface PublishEventsCall - extends EventualBase> { - seq?: number; + extends CommandCallBase> { events: EventEnvelope[]; id?: string; } diff --git a/packages/@eventual/core/src/calls/send-signal-call.ts b/packages/@eventual/core/src/calls/send-signal-call.ts index d2eec3553..b50175cc5 100644 --- a/packages/@eventual/core/src/calls/send-signal-call.ts +++ b/packages/@eventual/core/src/calls/send-signal-call.ts @@ -1,8 +1,8 @@ import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { Resolved, Result } from "../result.js"; @@ -13,8 +13,7 @@ export function isSendSignalCall(a: any): a is SendSignalCall { } export interface SendSignalCall - extends EventualBase> { - seq?: number; + extends CommandCallBase> { signalId: string; payload?: any; target: SignalTarget; diff --git a/packages/@eventual/core/src/calls/signal-handler-call.ts b/packages/@eventual/core/src/calls/signal-handler-call.ts index ffd35c132..9da6f307a 100644 --- a/packages/@eventual/core/src/calls/signal-handler-call.ts +++ b/packages/@eventual/core/src/calls/signal-handler-call.ts @@ -1,6 +1,6 @@ import { + CommandCallBase, createEventual, - EventualBase, EventualKind, isEventualOfKind, } from "../eventual.js"; @@ -16,9 +16,8 @@ export function isRegisterSignalHandlerCall( } export interface RegisterSignalHandlerCall - extends EventualBase, + extends CommandCallBase, SignalsHandler { - seq?: number; signalId: string; handler: (input: T) => Program | void; } diff --git a/packages/@eventual/core/src/calls/sleep-call.ts b/packages/@eventual/core/src/calls/sleep-call.ts index 19d649565..84ef30b90 100644 --- a/packages/@eventual/core/src/calls/sleep-call.ts +++ b/packages/@eventual/core/src/calls/sleep-call.ts @@ -1,8 +1,8 @@ import { EventualKind, - EventualBase, isEventualOfKind, createEventual, + CommandCallBase, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { Resolved } from "../result.js"; @@ -16,14 +16,12 @@ export function isSleepUntilCall(a: any): a is SleepUntilCall { } export interface SleepForCall - extends EventualBase> { - seq?: number; + extends CommandCallBase> { durationSeconds: number; } export interface SleepUntilCall - extends EventualBase> { - seq?: number; + extends CommandCallBase> { isoDate: string; } diff --git a/packages/@eventual/core/src/calls/workflow-call.ts b/packages/@eventual/core/src/calls/workflow-call.ts index c58c08afb..eac9fad09 100644 --- a/packages/@eventual/core/src/calls/workflow-call.ts +++ b/packages/@eventual/core/src/calls/workflow-call.ts @@ -1,7 +1,7 @@ import { + CommandCallBase, createEventual, Eventual, - EventualBase, EventualKind, isEventualOfKind, } from "../eventual.js"; @@ -24,11 +24,10 @@ export function isWorkflowCall(a: Eventual): a is WorkflowCall { * An {@link Eventual} representing an awaited call to a {@link Workflow}. */ export interface WorkflowCall - extends EventualBase>, + extends CommandCallBase>, ChildExecution { name: string; input?: any; - seq?: number; opts?: WorkflowOptions; } diff --git a/packages/@eventual/core/src/command.ts b/packages/@eventual/core/src/command.ts index 99e2c9957..09ebf3952 100644 --- a/packages/@eventual/core/src/command.ts +++ b/packages/@eventual/core/src/command.ts @@ -1,9 +1,11 @@ import { EventEnvelope } from "./event.js"; +import { ActivityTarget, Failed, Resolved } from "./index.js"; import { SignalTarget } from "./signals.js"; import { WorkflowOptions } from "./workflow.js"; export type Command = | ExpectSignalCommand + | FinishActivityCommand | ScheduleActivityCommand | ScheduleWorkflowCommand | PublishEventsCommand @@ -19,6 +21,7 @@ interface CommandBase { export enum CommandType { ExpectSignal = "ExpectSignal", + FinishActivity = "FinishActivity", PublishEvents = "PublishEvents", SendSignal = "SendSignal", SleepFor = "SleepFor", @@ -48,6 +51,18 @@ export function isScheduleActivityCommand( return a.kind === CommandType.StartActivity; } +export interface FinishActivityCommand + extends CommandBase { + target: ActivityTarget; + outcome: Resolved | Failed; +} + +export function isFinishActivityCommand( + a: Command +): a is FinishActivityCommand { + return a.kind === CommandType.FinishActivity; +} + // TODO support a timeout at the parent workflow level. The current timeout fails the whole workflow and not just the waiter. export interface ScheduleWorkflowCommand extends CommandBase { diff --git a/packages/@eventual/core/src/error.ts b/packages/@eventual/core/src/error.ts index f87204ae8..e340a7fb4 100644 --- a/packages/@eventual/core/src/error.ts +++ b/packages/@eventual/core/src/error.ts @@ -66,6 +66,16 @@ export class HeartbeatTimeout extends Timeout { this.name = "HeartbeatTimeout"; } } + +/** + * Thrown when an activity was cancelled by the workflow. + */ +export class ActivityCancelled extends EventualError { + constructor(reason: string) { + super("ActivityCancelled", reason); + } +} + /** * Thrown when a particular context only support synchronous operations (ex: condition predicate). */ diff --git a/packages/@eventual/core/src/eventual.ts b/packages/@eventual/core/src/eventual.ts index 173ebfcd3..d4df79f16 100644 --- a/packages/@eventual/core/src/eventual.ts +++ b/packages/@eventual/core/src/eventual.ts @@ -1,4 +1,9 @@ -import { ActivityCall, isActivityCall } from "./calls/activity-call.js"; +import { + ActivityCall, + FinishActivityCall, + isActivityCall, + isFinishActivityCall, +} from "./calls/activity-call.js"; import { AwaitAll, createAwaitAll } from "./await-all.js"; import { chain, Chain } from "./chain.js"; import type { Program } from "./interpret.js"; @@ -44,6 +49,11 @@ export interface EventualBase { result?: R; } +export interface CommandCallBase + extends EventualBase { + seq?: number; +} + export enum EventualKind { ActivityCall = 1, AwaitAll = 0, @@ -52,6 +62,7 @@ export enum EventualKind { Chain = 2, ConditionCall = 9, ExpectSignalCall = 6, + FinishActivityCall = 14, PublishEventsCall = 13, Race = 11, RegisterSignalHandlerCall = 7, @@ -95,6 +106,7 @@ export type CommandCall = | ActivityCall | ConditionCall | ExpectSignalCall + | FinishActivityCall | RegisterSignalHandlerCall | PublishEventsCall | SendSignalCall @@ -107,6 +119,7 @@ export function isCommandCall(call: Eventual): call is CommandCall { isActivityCall(call) || isConditionCall(call) || isExpectSignalCall(call) || + isFinishActivityCall(call) || isPublishEventsCall(call) || isRegisterSignalHandlerCall(call) || isSendSignalCall(call) || diff --git a/packages/@eventual/core/src/heartbeat.ts b/packages/@eventual/core/src/heartbeat.ts index 6bad9e4c1..0f85236a0 100644 --- a/packages/@eventual/core/src/heartbeat.ts +++ b/packages/@eventual/core/src/heartbeat.ts @@ -1,5 +1,5 @@ import { getActivityContext, getWorkflowClient } from "./global.js"; -import { HeartbeatResponse } from "./runtime/clients/workflow-client.js"; +import { HeartbeatResponse } from "./runtime/clients/activity-runtime-client.js"; import { isActivityWorker, isOrchestratorWorker } from "./runtime/flags.js"; /** diff --git a/packages/@eventual/core/src/interpret.ts b/packages/@eventual/core/src/interpret.ts index 21acfffed..8faa2ff12 100644 --- a/packages/@eventual/core/src/interpret.ts +++ b/packages/@eventual/core/src/interpret.ts @@ -6,7 +6,7 @@ import { EventualCallCollector, } from "./eventual.js"; import { isAwaitAll } from "./await-all.js"; -import { isActivityCall } from "./calls/activity-call.js"; +import { isActivityCall, isFinishActivityCall } from "./calls/activity-call.js"; import { DeterminismError, HeartbeatTimeout, @@ -36,6 +36,7 @@ import { isActivityTimedOut, isActivityHeartbeatTimedOut, isEventsPublished, + isActivityFinished, } from "./workflow-events.js"; import { Result, @@ -66,6 +67,7 @@ import { isAwaitAllSettled } from "./await-all-settled.js"; import { isAwaitAny } from "./await-any.js"; import { isRace } from "./race.js"; import { isPublishEventsCall } from "./calls/send-events-call.js"; +import { ActivityTargetType } from "./index.js"; export interface WorkflowResult { /** @@ -240,6 +242,13 @@ export function interpret( seq: call.seq!, events: call.events, }; + } else if (isFinishActivityCall(call)) { + return { + kind: CommandType.FinishActivity, + seq: call.seq!, + outcome: call.outcome, + target: call.target, + }; } return assertNever(call); @@ -269,6 +278,18 @@ export function interpret( if (isCommandCall(activity)) { if (isExpectSignalCall(activity)) { subscribeToSignal(activity.signalId, activity); + } else if (isFinishActivityCall(activity)) { + if (activity.target.type === ActivityTargetType.OwnActivity) { + const act = callTable[activity.target.seq]; + if (act === undefined) { + throw new DeterminismError( + `Call for seq ${activity.target.seq} was not emitted.` + ); + } + if (!act.result) { + act.result = activity.outcome; + } + } } else if (isConditionCall(activity)) { // if the condition is resolvable, don't add it to the calls. const result = tryResolveResult(activity); @@ -551,6 +572,8 @@ function isCorresponding(event: ScheduledEvent, call: CommandCall) { return isConditionCall(call); } else if (isEventsPublished(event)) { return isPublishEventsCall(call); + } else if (isActivityFinished(event)) { + return isFinishActivityCall(call); } return assertNever(event); } diff --git a/packages/@eventual/core/src/runtime/clients/activity-runtime-client.ts b/packages/@eventual/core/src/runtime/clients/activity-runtime-client.ts index ad9b05f37..1e8654d1e 100644 --- a/packages/@eventual/core/src/runtime/clients/activity-runtime-client.ts +++ b/packages/@eventual/core/src/runtime/clients/activity-runtime-client.ts @@ -21,13 +21,16 @@ export interface ActivityRuntimeClient { executionId: string, seq: number, heartbeatTime: string - ): Promise<{ cancelled: boolean }>; + ): Promise; /** - * Marks an activity as cancelled. An activity can use the {@link heartbeat} call + * Marks an activity as closed. An activity can use the {@link heartbeat} call * to retrieve this value. */ - cancelActivity(executionId: string, seq: number): Promise; + closeActivity( + executionId: string, + seq: number + ): Promise<{ alreadyClosed: boolean }>; /** * Retrieves the activity to check the cancellation status, heartbeat, or other properties. @@ -43,5 +46,15 @@ export interface ActivityExecution { seq: number; claims?: string[]; heartbeatTime?: string; - cancelled?: boolean; + closed?: boolean; +} + +export interface HeartbeatResponse { + /** + * True when the activity has already completed elsewhere. + * + * This is the only way for a long running activity to know that the activity + * is no longer looking for a result. + */ + closed: boolean; } diff --git a/packages/@eventual/core/src/runtime/clients/workflow-client.ts b/packages/@eventual/core/src/runtime/clients/workflow-client.ts index 67d86f417..15a6b1628 100644 --- a/packages/@eventual/core/src/runtime/clients/workflow-client.ts +++ b/packages/@eventual/core/src/runtime/clients/workflow-client.ts @@ -10,7 +10,7 @@ import { Execution, ExecutionStatus } from "../../execution.js"; import { Signal } from "../../signals.js"; import { Workflow, WorkflowInput, WorkflowOptions } from "../../workflow.js"; import { decodeActivityToken } from "../activity-token.js"; -import { ActivityRuntimeClient } from "./activity-runtime-client.js"; +import { ActivityRuntimeClient, HeartbeatResponse } from "./activity-runtime-client.js"; export abstract class WorkflowClient { constructor(private activityRuntimeClient: ActivityRuntimeClient) {} @@ -107,7 +107,7 @@ export abstract class WorkflowClient { const execution = await this.getExecution(data.payload.executionId); if (execution?.status !== ExecutionStatus.IN_PROGRESS) { - return { cancelled: true }; + return { closed: true }; } return await this.activityRuntimeClient.heartbeatActivity( @@ -121,13 +121,23 @@ export abstract class WorkflowClient { E extends ActivityCompleted | ActivityFailed >(activityToken: string, event: Omit) { const data = decodeActivityToken(activityToken); - await this.submitWorkflowTask( - data.payload.executionId, - createEvent({ - ...event, - seq: data.payload.seq, - }) - ); + // mark the activity as cancelled because we are supplying a value + const { alreadyClosed: alreadyCancelled } = + await this.activityRuntimeClient.closeActivity( + data.payload.executionId, + data.payload.seq + ); + if (!alreadyCancelled) { + await this.submitWorkflowTask( + data.payload.executionId, + createEvent({ + ...event, + seq: data.payload.seq, + }) + ); + } else { + throw new Error("Activity already completed."); + } } } @@ -192,11 +202,3 @@ export interface HeartbeatRequest { activityToken: string; } -export interface HeartbeatResponse { - /** - * True when the activity has been cancelled. - * - * This is the only way for a long running activity to know it was canelled. - */ - cancelled: boolean; -} diff --git a/packages/@eventual/core/src/runtime/command-executor.ts b/packages/@eventual/core/src/runtime/command-executor.ts index c77c43185..8088c01d1 100644 --- a/packages/@eventual/core/src/runtime/command-executor.ts +++ b/packages/@eventual/core/src/runtime/command-executor.ts @@ -1,7 +1,9 @@ import { Command, ExpectSignalCommand, + FinishActivityCommand, isExpectSignalCommand, + isFinishActivityCommand, isPublishEventsCommand, isScheduleActivityCommand, isScheduleWorkflowCommand, @@ -31,13 +33,23 @@ import { ConditionStarted, ConditionTimedOut, SignalSent, + ActivityFinished, + ActivityCompleted, + ActivityFailed, } from "../workflow-events.js"; -import { EventsPublished, isChildExecutionTarget } from "../index.js"; +import { + ActivityTargetType, + EventsPublished, + isChildExecutionTarget, + isResolved, +} from "../index.js"; import { assertNever } from "../util.js"; import { Workflow } from "../workflow.js"; import { formatChildExecutionName, formatExecutionId } from "./execution-id.js"; import { ActivityWorkerRequest } from "./handlers/activity-worker.js"; import { + ActivityRuntimeClient, + decodeActivityToken, EventClient, Schedule, TimerClient, @@ -50,6 +62,7 @@ interface CommandExecutorProps { timerClient: TimerClient; workflowClient: WorkflowClient; eventClient: EventClient; + activityRuntimeClient: ActivityRuntimeClient; } /** @@ -81,6 +94,8 @@ export class CommandExecutor { return startCondition(command); } else if (isPublishEventsCommand(command)) { return publishEvents(command); + } else if (isFinishActivityCommand(command)) { + return finishActivity(command); } else { return assertNever(command, `unknown command type`); } @@ -235,5 +250,54 @@ export class CommandExecutor { seq: command.seq!, }); } + + async function finishActivity(command: FinishActivityCommand) { + if (command.target.type === ActivityTargetType.OwnActivity) { + await self.props.activityRuntimeClient.closeActivity( + executionId, + command.target.seq + ); + return createEvent({ + executionId, + activitySeq: command.target.seq, + seq: command.seq, + type: WorkflowEventType.ActivityFinished, + }); + } else { + const data = decodeActivityToken(command.target.activityToken); + if (isResolved(command.outcome)) { + await self.props.workflowClient.submitWorkflowTask( + data.payload.executionId, + createEvent({ + type: WorkflowEventType.ActivityCompleted, + seq: data.payload.seq, + result: command.outcome.value, + }) + ); + } else { + await self.props.workflowClient.submitWorkflowTask( + data.payload.executionId, + createEvent({ + type: WorkflowEventType.ActivityFailed, + seq: data.payload.seq, + error: + command.outcome.error instanceof Error + ? command.outcome.error.name + : "Error", + message: + command.outcome.error instanceof Error + ? command.outcome.error.message + : JSON.stringify(command.outcome.error), + }) + ); + } + return createEvent({ + executionId: data.payload.executionId, + activitySeq: data.payload.seq, + seq: command.seq, + type: WorkflowEventType.ActivityFinished, + }); + } + } } } diff --git a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts index c43c60022..cd192e036 100644 --- a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts +++ b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts @@ -227,9 +227,23 @@ export function createActivityWorker({ event: ActivityCompleted | ActivityFailed, duration: number ) { - await timed(metrics, ActivityMetrics.SubmitWorkflowTaskDuration, () => - workflowClient.submitWorkflowTask(request.executionId, event) + const { alreadyClosed } = await timed( + metrics, + ActivityMetrics.SubmitWorkflowTaskDuration, + () => + activityRuntimeClient.closeActivity( + request.executionId, + request.command.seq + ) ); + // if an activity is closed, do not send the result on completion. + if (alreadyClosed) { + await timed(metrics, ActivityMetrics.SubmitWorkflowTaskDuration, () => + workflowClient.submitWorkflowTask(request.executionId, event) + ); + } else { + logger.info("Activity was already closed, do not emit result."); + } logActivityCompleteMetrics(isWorkflowFailed(event), duration); } diff --git a/packages/@eventual/core/src/runtime/handlers/orchestrator.ts b/packages/@eventual/core/src/runtime/handlers/orchestrator.ts index 9acde41e3..370c68d7a 100644 --- a/packages/@eventual/core/src/runtime/handlers/orchestrator.ts +++ b/packages/@eventual/core/src/runtime/handlers/orchestrator.ts @@ -27,6 +27,7 @@ import { import { isFailed, isResolved, isResult, Result } from "../../result.js"; import { lookupWorkflow, progressWorkflow, Workflow } from "../../workflow.js"; import { + ActivityRuntimeClient, EventClient, ExecutionHistoryClient, MetricsClient, @@ -48,6 +49,7 @@ import { promiseAllSettledPartitioned } from "../utils.js"; * The Orchestrator's client dependencies. */ export interface OrchestratorDependencies { + activityRuntimeClient: ActivityRuntimeClient; executionHistoryClient: ExecutionHistoryClient; timerClient: TimerClient; workflowRuntimeClient: WorkflowRuntimeClient; @@ -71,6 +73,7 @@ export interface OrchestratorResult { * inject its own client implementations designed for that platform. */ export function createOrchestrator({ + activityRuntimeClient, executionHistoryClient, timerClient, workflowRuntimeClient, @@ -86,6 +89,7 @@ export function createOrchestrator({ workflowClient, workflowRuntimeClient, eventClient, + activityRuntimeClient, }); return async (eventsByExecutionId) => { diff --git a/packages/@eventual/core/src/runtime/metrics/constants.ts b/packages/@eventual/core/src/runtime/metrics/constants.ts index 0a048a5c4..92904e11f 100644 --- a/packages/@eventual/core/src/runtime/metrics/constants.ts +++ b/packages/@eventual/core/src/runtime/metrics/constants.ts @@ -162,6 +162,10 @@ export namespace ActivityMetrics { * amount of time it took to submit a workflow task to SQS to resume the workflow. */ export const SubmitWorkflowTaskDuration = "SubmitWorkflowTaskDuration"; + /** + * amount of time it took to make the activity as cancelled. + */ + export const CancelActivityDuration = "CancelActivityDuration"; } export namespace SchedulerForwarderMetrics { diff --git a/packages/@eventual/core/src/workflow-events.ts b/packages/@eventual/core/src/workflow-events.ts index 208ce6bed..45a5a2bad 100644 --- a/packages/@eventual/core/src/workflow-events.ts +++ b/packages/@eventual/core/src/workflow-events.ts @@ -19,6 +19,7 @@ export interface HistoryEventBase extends Omit { export enum WorkflowEventType { ActivityCompleted = "ActivityCompleted", ActivityFailed = "ActivityFailed", + ActivityFinished = "ActivityFinished", ActivityHeartbeatTimedOut = "ActivityHeartbeatTimedOut", ActivityScheduled = "ActivityScheduled", ActivityTimedOut = "ActivityTimedOut", @@ -55,6 +56,7 @@ export type WorkflowEvent = export type ScheduledEvent = | ActivityScheduled + | ActivityFinished | ChildWorkflowScheduled | ConditionStarted | EventsPublished @@ -135,6 +137,16 @@ export interface ActivityCompleted extends HistoryEventBase { result: any; } +/** + * Event generated when the workflow calls complete, fail, + * or cancel on it's activity or the activity of another execution. + */ +export interface ActivityFinished extends HistoryEventBase { + type: WorkflowEventType.ActivityFinished; + executionId: string; + activitySeq: number; +} + export interface ActivityFailed extends HistoryEventBase { type: WorkflowEventType.ActivityFailed; error: string; @@ -201,6 +213,12 @@ export function isActivityCompleted( return event.type === WorkflowEventType.ActivityCompleted; } +export function isActivityFinished( + event: WorkflowEvent +): event is ActivityFinished { + return event.type === WorkflowEventType.ActivityFinished; +} + export function isActivityFailed( event: WorkflowEvent ): event is ActivityFailed { @@ -377,6 +395,7 @@ export function isWorkflowTimedOut( export const isScheduledEvent = or( isActivityScheduled, + isActivityFinished, isChildWorkflowScheduled, isConditionStarted, isEventsPublished, diff --git a/packages/@eventual/core/test/command-util.ts b/packages/@eventual/core/test/command-util.ts index a66672c13..a5e3edc2e 100644 --- a/packages/@eventual/core/test/command-util.ts +++ b/packages/@eventual/core/test/command-util.ts @@ -2,6 +2,7 @@ import { ulid } from "ulidx"; import { CommandType, ExpectSignalCommand, + FinishActivityCommand, PublishEventsCommand, ScheduleActivityCommand, ScheduleWorkflowCommand, @@ -31,8 +32,11 @@ import { WorkflowEventType, WorkflowTimedOut, ActivityHeartbeatTimedOut, + ActivityFinished, } from "../src/workflow-events.js"; import { SignalTarget } from "../src/signals.js"; +import { Failed, Resolved } from "../src/result.js"; +import { ActivityTarget } from "../src/index.js"; export function createSleepUntilCommand( untilTime: string, @@ -69,6 +73,19 @@ export function createScheduledActivityCommand( }; } +export function createFinishActivityCommand( + outcome: Resolved | Failed, + target: ActivityTarget, + seq: number +): FinishActivityCommand { + return { + kind: CommandType.FinishActivity, + seq, + outcome, + target, + }; +} + export function createScheduledWorkflowCommand( name: string, input: any, @@ -191,6 +208,20 @@ export function activityScheduled( }; } +export function activityFinished( + executionId: string, + activitySeq: number, + seq: number +): ActivityFinished { + return { + type: WorkflowEventType.ActivityFinished, + executionId, + activitySeq, + seq, + timestamp: new Date(0).toISOString(), + }; +} + export function activityHeartbeatTimedOut( seq: number, /** Relative seconds from 0 */ diff --git a/packages/@eventual/core/test/commend-executor.test.ts b/packages/@eventual/core/test/commend-executor.test.ts index 6aeeb211e..78012ed94 100644 --- a/packages/@eventual/core/test/commend-executor.test.ts +++ b/packages/@eventual/core/test/commend-executor.test.ts @@ -14,6 +14,7 @@ import { WorkflowEventType, } from "../src/workflow-events.js"; import { + ActivityRuntimeClient, EventClient, EventEnvelope, formatChildExecutionName, @@ -45,12 +46,16 @@ const mockWorkflowRuntimeClient = { const mockEventClient = { publish: jest.fn() as EventClient["publish"], } satisfies Partial as EventClient; +const mockActivityRuntimeClient = { + closeActivity: jest.fn() as ActivityRuntimeClient["closeActivity"], +} satisfies Partial as ActivityRuntimeClient; const testExecutor = new CommandExecutor({ timerClient: mockTimerClient, workflowClient: mockWorkflowClient, workflowRuntimeClient: mockWorkflowRuntimeClient, eventClient: mockEventClient, + activityRuntimeClient: mockActivityRuntimeClient, }); const workflow = { diff --git a/packages/@eventual/core/test/interpret.test.ts b/packages/@eventual/core/test/interpret.test.ts index 1bee6000d..573a4aed3 100644 --- a/packages/@eventual/core/test/interpret.test.ts +++ b/packages/@eventual/core/test/interpret.test.ts @@ -1,8 +1,13 @@ -import { createActivityCall } from "../src/calls/activity-call.js"; +import { + createActivityCall, + createFinishActivityCall, +} from "../src/calls/activity-call.js"; import { chain } from "../src/chain.js"; import { DeterminismError, HeartbeatTimeout, Timeout } from "../src/error.js"; import { + ActivityTargetType, Context, + createActivityToken, createAwaitAll, Eventual, interpret, @@ -19,10 +24,14 @@ import { WorkflowHandler, WorkflowResult, } from "../src/index.js"; -import { createSleepUntilCall } from "../src/calls/sleep-call.js"; +import { + createSleepForCall, + createSleepUntilCall, +} from "../src/calls/sleep-call.js"; import { activityCompleted, activityFailed, + activityFinished, activityHeartbeatTimedOut, activityScheduled, activityTimedOut, @@ -30,6 +39,7 @@ import { conditionStarted, conditionTimedOut, createExpectSignalCommand, + createFinishActivityCommand, createPublishEventCommand, createScheduledActivityCommand, createScheduledWorkflowCommand, @@ -239,7 +249,7 @@ test("should handle partial blocks with partial completes", () => { }); }); -describe("activity", () => +describe("activity", () => { describe("heartbeat", () => { const wf = workflow(function* () { return createActivityCall("getPumpedUp", [], undefined, 100); @@ -300,7 +310,125 @@ describe("activity", () => commands: [], }); }); - })); + }); + + describe("finish activity", () => { + describe("complete own activity", () => { + const wf = workflow(function* () { + const act = createActivityCall("getPumpedUp", [], undefined, 100); + yield createSleepForCall(100); + yield createFinishActivityCall( + { seq: 0, type: ActivityTargetType.OwnActivity }, + Result.resolved("hi") + ); + return act; + }); + + test("finish first", () => { + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + scheduledSleep("", 1), + completedSleep(1), + activityFinished("", 0, 2), + ]) + ).toMatchObject({ + result: Result.resolved("hi"), + commands: [], + }); + }); + + test("complete own after real complete", () => { + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + scheduledSleep("", 1), + activityCompleted("bye", 0), + completedSleep(1), + activityFinished("", 0, 2), + ]) + ).toMatchObject({ + result: Result.resolved("bye"), + commands: [], + }); + }); + }); + + test("fail own activity", () => { + const wf = workflow(function* () { + const act = createActivityCall("getPumpedUp", [], undefined, 100); + yield createFinishActivityCall( + { seq: 0, type: ActivityTargetType.OwnActivity }, + Result.failed(new Timeout()) + ); + return act; + }); + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + activityFinished("", 0, 1), + ]) + ).toMatchObject({ + result: Result.failed(new Timeout()), + commands: [], + }); + }); + + describe("complete external activity", () => { + test("finish", () => { + const wf = workflow(function* () { + const act = createActivityCall("getPumpedUp", [], undefined, 100); + yield createFinishActivityCall( + { + type: ActivityTargetType.ActivityToken, + activityToken: createActivityToken("exec1", 100), + }, + Result.failed(new Timeout()) + ); + return act; + }); + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + activityFinished("exec1", 100, 1), + ]) + ).toMatchObject({ + commands: [], + }); + }); + + test("command", () => { + const wf = workflow(function* () { + const act = createActivityCall("getPumpedUp", [], undefined, 100); + yield createFinishActivityCall( + { + type: ActivityTargetType.ActivityToken, + activityToken: createActivityToken("exec1", 100), + }, + Result.failed(new Timeout()) + ); + return act; + }); + expect( + interpret(wf.definition(undefined, context), [ + activityScheduled("getPumpedUp", 0), + ]) + ).toMatchObject({ + commands: [ + createFinishActivityCommand( + Result.failed(new Timeout()), + { + type: ActivityTargetType.ActivityToken, + activityToken: createActivityToken("exec1", 100), + }, + 1 + ), + ], + }); + }); + }); + }); +}); test("should throw when scheduled does not correspond to call", () => { expect(() => From 0871669a938861462d140543d695c6bd551eb423 Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Mon, 12 Dec 2022 20:32:39 -0600 Subject: [PATCH 2/9] change finish to override --- packages/@eventual/core/src/activity.ts | 6 ++-- .../@eventual/core/src/calls/activity-call.ts | 20 +++++------ packages/@eventual/core/src/command.ts | 14 ++++---- packages/@eventual/core/src/eventual.ts | 10 +++--- packages/@eventual/core/src/interpret.ts | 17 +++++---- .../core/src/runtime/command-executor.ts | 20 +++++------ .../src/runtime/handlers/activity-worker.ts | 6 ++-- .../@eventual/core/src/workflow-events.ts | 16 ++++----- packages/@eventual/core/test/command-util.ts | 16 ++++----- .../@eventual/core/test/interpret.test.ts | 36 +++++++++---------- 10 files changed, 82 insertions(+), 79 deletions(-) diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index 6ed50b06d..fbf744c45 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -1,6 +1,6 @@ import { createActivityCall, - createFinishActivityCall, + createOverrideActivityCall, } from "./calls/activity-call.js"; import { ActivityCancelled, EventualError } from "./error.js"; import { @@ -182,7 +182,7 @@ export function completeActivity = any>( result: ActivityOutput ): Promise { if (isOrchestratorWorker()) { - return createFinishActivityCall( + return createOverrideActivityCall( { type: ActivityTargetType.ActivityToken, activityToken, @@ -210,7 +210,7 @@ export function failActivity( const error = args.length === 1 ? args[0] : new EventualError(args[0], args[1]); if (isOrchestratorWorker()) { - return createFinishActivityCall( + return createOverrideActivityCall( { type: ActivityTargetType.ActivityToken, activityToken, diff --git a/packages/@eventual/core/src/calls/activity-call.ts b/packages/@eventual/core/src/calls/activity-call.ts index 3a88965fe..a57d9c6b1 100644 --- a/packages/@eventual/core/src/calls/activity-call.ts +++ b/packages/@eventual/core/src/calls/activity-call.ts @@ -38,13 +38,13 @@ export function createActivityCall( ); call.complete = function (result) { - return createFinishActivityCall( + return createOverrideActivityCall( { type: ActivityTargetType.OwnActivity, seq: this.seq! }, Result.resolved(result) ) as unknown as Promise; }; call.fail = function (...args) { - return createFinishActivityCall( + return createOverrideActivityCall( { type: ActivityTargetType.OwnActivity, seq: this.seq! }, Result.failed( args.length === 1 ? args[0] : new EventualError(args[0], args[1]) @@ -52,7 +52,7 @@ export function createActivityCall( ) as unknown as Promise; }; call.cancel = function (reason) { - return createFinishActivityCall( + return createOverrideActivityCall( { type: ActivityTargetType.OwnActivity, seq: this.seq! }, Result.failed(new ActivityCancelled(reason)) ) as unknown as Promise; @@ -69,22 +69,22 @@ export interface ActivityExecutionReference { complete: (result: T) => Promise; } -export function isFinishActivityCall(a: any): a is FinishActivityCall { - return isEventualOfKind(EventualKind.FinishActivityCall, a); +export function isOverrideActivityCall(a: any): a is OverrideActivityCall { + return isEventualOfKind(EventualKind.OverrideActivityCall, a); } -export interface FinishActivityCall - extends CommandCallBase { +export interface OverrideActivityCall + extends CommandCallBase { target: ActivityTarget; outcome: Resolved | Failed; } -export function createFinishActivityCall( +export function createOverrideActivityCall( target: ActivityTarget, outcome: Resolved | Failed -): FinishActivityCall { +): OverrideActivityCall { return registerEventual( - createEventual(EventualKind.FinishActivityCall, { + createEventual(EventualKind.OverrideActivityCall, { target, outcome, result: Result.resolved(undefined), diff --git a/packages/@eventual/core/src/command.ts b/packages/@eventual/core/src/command.ts index 09ebf3952..b9c678564 100644 --- a/packages/@eventual/core/src/command.ts +++ b/packages/@eventual/core/src/command.ts @@ -5,7 +5,7 @@ import { WorkflowOptions } from "./workflow.js"; export type Command = | ExpectSignalCommand - | FinishActivityCommand + | OverrideActivityCommand | ScheduleActivityCommand | ScheduleWorkflowCommand | PublishEventsCommand @@ -21,7 +21,7 @@ interface CommandBase { export enum CommandType { ExpectSignal = "ExpectSignal", - FinishActivity = "FinishActivity", + OverrideActivity = "OverrideActivity", PublishEvents = "PublishEvents", SendSignal = "SendSignal", SleepFor = "SleepFor", @@ -51,16 +51,16 @@ export function isScheduleActivityCommand( return a.kind === CommandType.StartActivity; } -export interface FinishActivityCommand - extends CommandBase { +export interface OverrideActivityCommand + extends CommandBase { target: ActivityTarget; outcome: Resolved | Failed; } -export function isFinishActivityCommand( +export function isOverrideActivityCommand( a: Command -): a is FinishActivityCommand { - return a.kind === CommandType.FinishActivity; +): a is OverrideActivityCommand { + return a.kind === CommandType.OverrideActivity; } // TODO support a timeout at the parent workflow level. The current timeout fails the whole workflow and not just the waiter. diff --git a/packages/@eventual/core/src/eventual.ts b/packages/@eventual/core/src/eventual.ts index d4df79f16..33d42a58e 100644 --- a/packages/@eventual/core/src/eventual.ts +++ b/packages/@eventual/core/src/eventual.ts @@ -1,8 +1,8 @@ import { ActivityCall, - FinishActivityCall, + OverrideActivityCall, isActivityCall, - isFinishActivityCall, + isOverrideActivityCall, } from "./calls/activity-call.js"; import { AwaitAll, createAwaitAll } from "./await-all.js"; import { chain, Chain } from "./chain.js"; @@ -62,7 +62,7 @@ export enum EventualKind { Chain = 2, ConditionCall = 9, ExpectSignalCall = 6, - FinishActivityCall = 14, + OverrideActivityCall = 14, PublishEventsCall = 13, Race = 11, RegisterSignalHandlerCall = 7, @@ -106,7 +106,7 @@ export type CommandCall = | ActivityCall | ConditionCall | ExpectSignalCall - | FinishActivityCall + | OverrideActivityCall | RegisterSignalHandlerCall | PublishEventsCall | SendSignalCall @@ -119,7 +119,7 @@ export function isCommandCall(call: Eventual): call is CommandCall { isActivityCall(call) || isConditionCall(call) || isExpectSignalCall(call) || - isFinishActivityCall(call) || + isOverrideActivityCall(call) || isPublishEventsCall(call) || isRegisterSignalHandlerCall(call) || isSendSignalCall(call) || diff --git a/packages/@eventual/core/src/interpret.ts b/packages/@eventual/core/src/interpret.ts index 8faa2ff12..47f685963 100644 --- a/packages/@eventual/core/src/interpret.ts +++ b/packages/@eventual/core/src/interpret.ts @@ -6,7 +6,10 @@ import { EventualCallCollector, } from "./eventual.js"; import { isAwaitAll } from "./await-all.js"; -import { isActivityCall, isFinishActivityCall } from "./calls/activity-call.js"; +import { + isActivityCall, + isOverrideActivityCall, +} from "./calls/activity-call.js"; import { DeterminismError, HeartbeatTimeout, @@ -36,7 +39,7 @@ import { isActivityTimedOut, isActivityHeartbeatTimedOut, isEventsPublished, - isActivityFinished, + isActivityOverridden, } from "./workflow-events.js"; import { Result, @@ -242,9 +245,9 @@ export function interpret( seq: call.seq!, events: call.events, }; - } else if (isFinishActivityCall(call)) { + } else if (isOverrideActivityCall(call)) { return { - kind: CommandType.FinishActivity, + kind: CommandType.OverrideActivity, seq: call.seq!, outcome: call.outcome, target: call.target, @@ -278,7 +281,7 @@ export function interpret( if (isCommandCall(activity)) { if (isExpectSignalCall(activity)) { subscribeToSignal(activity.signalId, activity); - } else if (isFinishActivityCall(activity)) { + } else if (isOverrideActivityCall(activity)) { if (activity.target.type === ActivityTargetType.OwnActivity) { const act = callTable[activity.target.seq]; if (act === undefined) { @@ -572,8 +575,8 @@ function isCorresponding(event: ScheduledEvent, call: CommandCall) { return isConditionCall(call); } else if (isEventsPublished(event)) { return isPublishEventsCall(call); - } else if (isActivityFinished(event)) { - return isFinishActivityCall(call); + } else if (isActivityOverridden(event)) { + return isOverrideActivityCall(call); } return assertNever(event); } diff --git a/packages/@eventual/core/src/runtime/command-executor.ts b/packages/@eventual/core/src/runtime/command-executor.ts index 8088c01d1..831472e19 100644 --- a/packages/@eventual/core/src/runtime/command-executor.ts +++ b/packages/@eventual/core/src/runtime/command-executor.ts @@ -1,9 +1,9 @@ import { Command, ExpectSignalCommand, - FinishActivityCommand, + OverrideActivityCommand, isExpectSignalCommand, - isFinishActivityCommand, + isOverrideActivityCommand, isPublishEventsCommand, isScheduleActivityCommand, isScheduleWorkflowCommand, @@ -33,7 +33,7 @@ import { ConditionStarted, ConditionTimedOut, SignalSent, - ActivityFinished, + ActivityOverridden, ActivityCompleted, ActivityFailed, } from "../workflow-events.js"; @@ -94,8 +94,8 @@ export class CommandExecutor { return startCondition(command); } else if (isPublishEventsCommand(command)) { return publishEvents(command); - } else if (isFinishActivityCommand(command)) { - return finishActivity(command); + } else if (isOverrideActivityCommand(command)) { + return overrideActivity(command); } else { return assertNever(command, `unknown command type`); } @@ -251,17 +251,17 @@ export class CommandExecutor { }); } - async function finishActivity(command: FinishActivityCommand) { + async function overrideActivity(command: OverrideActivityCommand) { if (command.target.type === ActivityTargetType.OwnActivity) { await self.props.activityRuntimeClient.closeActivity( executionId, command.target.seq ); - return createEvent({ + return createEvent({ executionId, activitySeq: command.target.seq, seq: command.seq, - type: WorkflowEventType.ActivityFinished, + type: WorkflowEventType.ActivityOverridden, }); } else { const data = decodeActivityToken(command.target.activityToken); @@ -291,11 +291,11 @@ export class CommandExecutor { }) ); } - return createEvent({ + return createEvent({ executionId: data.payload.executionId, activitySeq: data.payload.seq, seq: command.seq, - type: WorkflowEventType.ActivityFinished, + type: WorkflowEventType.ActivityOverridden, }); } } diff --git a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts index cd192e036..8275af571 100644 --- a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts +++ b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts @@ -174,7 +174,7 @@ export function createActivityWorker({ endTime ); - await finishActivity( + await overrideActivity( event, recordAge + (endTime.getTime() - start.getTime()) ); @@ -200,7 +200,7 @@ export function createActivityWorker({ endTime ); - await finishActivity( + await overrideActivity( event, recordAge + (endTime.getTime() - start.getTime()) ); @@ -223,7 +223,7 @@ export function createActivityWorker({ metrics.putMetric(ActivityMetrics.TotalDuration, duration); } - async function finishActivity( + async function overrideActivity( event: ActivityCompleted | ActivityFailed, duration: number ) { diff --git a/packages/@eventual/core/src/workflow-events.ts b/packages/@eventual/core/src/workflow-events.ts index 45a5a2bad..076f201ee 100644 --- a/packages/@eventual/core/src/workflow-events.ts +++ b/packages/@eventual/core/src/workflow-events.ts @@ -19,7 +19,7 @@ export interface HistoryEventBase extends Omit { export enum WorkflowEventType { ActivityCompleted = "ActivityCompleted", ActivityFailed = "ActivityFailed", - ActivityFinished = "ActivityFinished", + ActivityOverridden = "ActivityOverridden", ActivityHeartbeatTimedOut = "ActivityHeartbeatTimedOut", ActivityScheduled = "ActivityScheduled", ActivityTimedOut = "ActivityTimedOut", @@ -56,7 +56,7 @@ export type WorkflowEvent = export type ScheduledEvent = | ActivityScheduled - | ActivityFinished + | ActivityOverridden | ChildWorkflowScheduled | ConditionStarted | EventsPublished @@ -141,8 +141,8 @@ export interface ActivityCompleted extends HistoryEventBase { * Event generated when the workflow calls complete, fail, * or cancel on it's activity or the activity of another execution. */ -export interface ActivityFinished extends HistoryEventBase { - type: WorkflowEventType.ActivityFinished; +export interface ActivityOverridden extends HistoryEventBase { + type: WorkflowEventType.ActivityOverridden; executionId: string; activitySeq: number; } @@ -213,10 +213,10 @@ export function isActivityCompleted( return event.type === WorkflowEventType.ActivityCompleted; } -export function isActivityFinished( +export function isActivityOverridden( event: WorkflowEvent -): event is ActivityFinished { - return event.type === WorkflowEventType.ActivityFinished; +): event is ActivityOverridden { + return event.type === WorkflowEventType.ActivityOverridden; } export function isActivityFailed( @@ -395,7 +395,7 @@ export function isWorkflowTimedOut( export const isScheduledEvent = or( isActivityScheduled, - isActivityFinished, + isActivityOverridden, isChildWorkflowScheduled, isConditionStarted, isEventsPublished, diff --git a/packages/@eventual/core/test/command-util.ts b/packages/@eventual/core/test/command-util.ts index a5e3edc2e..37b23743c 100644 --- a/packages/@eventual/core/test/command-util.ts +++ b/packages/@eventual/core/test/command-util.ts @@ -2,7 +2,7 @@ import { ulid } from "ulidx"; import { CommandType, ExpectSignalCommand, - FinishActivityCommand, + OverrideActivityCommand, PublishEventsCommand, ScheduleActivityCommand, ScheduleWorkflowCommand, @@ -32,7 +32,7 @@ import { WorkflowEventType, WorkflowTimedOut, ActivityHeartbeatTimedOut, - ActivityFinished, + ActivityOverridden, } from "../src/workflow-events.js"; import { SignalTarget } from "../src/signals.js"; import { Failed, Resolved } from "../src/result.js"; @@ -73,13 +73,13 @@ export function createScheduledActivityCommand( }; } -export function createFinishActivityCommand( +export function createOverrideActivityCommand( outcome: Resolved | Failed, target: ActivityTarget, seq: number -): FinishActivityCommand { +): OverrideActivityCommand { return { - kind: CommandType.FinishActivity, + kind: CommandType.OverrideActivity, seq, outcome, target, @@ -208,13 +208,13 @@ export function activityScheduled( }; } -export function activityFinished( +export function activityOverridden( executionId: string, activitySeq: number, seq: number -): ActivityFinished { +): ActivityOverridden { return { - type: WorkflowEventType.ActivityFinished, + type: WorkflowEventType.ActivityOverridden, executionId, activitySeq, seq, diff --git a/packages/@eventual/core/test/interpret.test.ts b/packages/@eventual/core/test/interpret.test.ts index 573a4aed3..34ab7ba3d 100644 --- a/packages/@eventual/core/test/interpret.test.ts +++ b/packages/@eventual/core/test/interpret.test.ts @@ -1,6 +1,6 @@ import { createActivityCall, - createFinishActivityCall, + createOverrideActivityCall, } from "../src/calls/activity-call.js"; import { chain } from "../src/chain.js"; import { DeterminismError, HeartbeatTimeout, Timeout } from "../src/error.js"; @@ -31,7 +31,7 @@ import { import { activityCompleted, activityFailed, - activityFinished, + activityOverridden as activityOverridden, activityHeartbeatTimedOut, activityScheduled, activityTimedOut, @@ -39,7 +39,7 @@ import { conditionStarted, conditionTimedOut, createExpectSignalCommand, - createFinishActivityCommand, + createOverrideActivityCommand as createOverrideActivityCommand, createPublishEventCommand, createScheduledActivityCommand, createScheduledWorkflowCommand, @@ -312,25 +312,25 @@ describe("activity", () => { }); }); - describe("finish activity", () => { + describe("override activity", () => { describe("complete own activity", () => { const wf = workflow(function* () { const act = createActivityCall("getPumpedUp", [], undefined, 100); yield createSleepForCall(100); - yield createFinishActivityCall( + yield createOverrideActivityCall( { seq: 0, type: ActivityTargetType.OwnActivity }, Result.resolved("hi") ); return act; }); - test("finish first", () => { + test("override first", () => { expect( interpret(wf.definition(undefined, context), [ activityScheduled("getPumpedUp", 0), scheduledSleep("", 1), completedSleep(1), - activityFinished("", 0, 2), + activityOverridden("", 0, 2), ]) ).toMatchObject({ result: Result.resolved("hi"), @@ -338,14 +338,14 @@ describe("activity", () => { }); }); - test("complete own after real complete", () => { + test("override own after real complete", () => { expect( interpret(wf.definition(undefined, context), [ activityScheduled("getPumpedUp", 0), scheduledSleep("", 1), activityCompleted("bye", 0), completedSleep(1), - activityFinished("", 0, 2), + activityOverridden("", 0, 2), ]) ).toMatchObject({ result: Result.resolved("bye"), @@ -354,10 +354,10 @@ describe("activity", () => { }); }); - test("fail own activity", () => { + test("override with fail own activity", () => { const wf = workflow(function* () { const act = createActivityCall("getPumpedUp", [], undefined, 100); - yield createFinishActivityCall( + yield createOverrideActivityCall( { seq: 0, type: ActivityTargetType.OwnActivity }, Result.failed(new Timeout()) ); @@ -366,7 +366,7 @@ describe("activity", () => { expect( interpret(wf.definition(undefined, context), [ activityScheduled("getPumpedUp", 0), - activityFinished("", 0, 1), + activityOverridden("", 0, 1), ]) ).toMatchObject({ result: Result.failed(new Timeout()), @@ -374,11 +374,11 @@ describe("activity", () => { }); }); - describe("complete external activity", () => { - test("finish", () => { + describe("override external activity", () => { + test("override", () => { const wf = workflow(function* () { const act = createActivityCall("getPumpedUp", [], undefined, 100); - yield createFinishActivityCall( + yield createOverrideActivityCall( { type: ActivityTargetType.ActivityToken, activityToken: createActivityToken("exec1", 100), @@ -390,7 +390,7 @@ describe("activity", () => { expect( interpret(wf.definition(undefined, context), [ activityScheduled("getPumpedUp", 0), - activityFinished("exec1", 100, 1), + activityOverridden("exec1", 100, 1), ]) ).toMatchObject({ commands: [], @@ -400,7 +400,7 @@ describe("activity", () => { test("command", () => { const wf = workflow(function* () { const act = createActivityCall("getPumpedUp", [], undefined, 100); - yield createFinishActivityCall( + yield createOverrideActivityCall( { type: ActivityTargetType.ActivityToken, activityToken: createActivityToken("exec1", 100), @@ -415,7 +415,7 @@ describe("activity", () => { ]) ).toMatchObject({ commands: [ - createFinishActivityCommand( + createOverrideActivityCommand( Result.failed(new Timeout()), { type: ActivityTargetType.ActivityToken, From 073d78927478d7b7d131224a05a3fcebc16f4d16 Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Mon, 12 Dec 2022 21:26:40 -0600 Subject: [PATCH 3/9] override test --- apps/tests/aws-runtime/test/test-service.ts | 141 +++++++++++++++++++- apps/tests/aws-runtime/test/tester.test.ts | 22 ++- packages/@eventual/core/src/activity.ts | 4 +- 3 files changed, 160 insertions(+), 7 deletions(-) diff --git a/apps/tests/aws-runtime/test/test-service.ts b/apps/tests/aws-runtime/test/test-service.ts index 412f6c18d..1476a5d7a 100644 --- a/apps/tests/aws-runtime/test/test-service.ts +++ b/apps/tests/aws-runtime/test/test-service.ts @@ -11,6 +11,8 @@ import { workflow, heartbeat, HeartbeatTimeout, + completeActivity, + failActivity, } from "@eventual/core"; import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; import { AsyncWriterTestEvent } from "./async-writer-handler.js"; @@ -148,11 +150,13 @@ export const childWorkflow = workflow( } ); -const slowActivity = activity( - "slowAct", - { timeoutSeconds: 5 }, - () => new Promise((resolve) => setTimeout(resolve, 10 * 1000)) -); +const delay = (seconds: number) => + new Promise((resolve) => setTimeout(resolve, seconds * 1000)); + +const slowActivity = activity("slowAct", { timeoutSeconds: 5 }, async () => { + await delay(10); + return "done finally"; +}); const slowWf = workflow("slowWorkflow", { timeoutSeconds: 5 }, () => sleepFor(10) @@ -332,3 +336,130 @@ const sendFinishEvent = activity("sendFinish", async (executionId: string) => { proxy: true, }); }); + +const activityOverrideEvent = event<{ + executionId: string; + token: string; + location: "handler" | "signal"; + type: "complete" | "fail"; +}>("activityOverrideEvent"); + +const activityOverrideActivity = activity( + "eventPublish", + async ({ + type, + location, + executionId, + }: { + executionId: string; + location: "handler" | "signal"; + type: "complete" | "fail"; + }) => { + return asyncResult( + async (token) => + await activityOverrideEvent.publish({ + token, + location, + type, + executionId, + }) + ); + } +); + +const activityOverrideSignal = new Signal<{ + token: string; + type: "complete" | "fail"; +}>("activityOverrideSignal"); + +activityOverrideEvent.on(async ({ token, location, type, executionId }) => { + if (location === "handler") { + if (type === "complete") { + await completeActivity(token, "from the event handler!"); + } else { + await failActivity(token, new Error("WHY!!!")); + } + } else { + await activityOverrideSignal.send(executionId, { token, type }); + } +}); + +/** + * Activity which waits to be closed/cancelled. + * If it is closed, it signals the workflow with "complete", if not it signals with "fail". + */ +const activityOverrideAwareActivity = activity( + "overrideAware", + async ({ executionId }: { executionId: string }) => { + let n = 0; + while (n < 10) { + await delay(1); + const { closed } = await heartbeat(); + if (closed) { + await activityOverrideSignal.send(executionId, { + token: "", + type: "complete", + }); + return; + } + } + + await activityOverrideSignal.send(executionId, { token: "", type: "fail" }); + } +); + +export const overrideWorkflow = workflow( + "override", + async (_, { execution: { id: executionId } }) => { + const act = slowActivity(); + const act2 = slowActivity(); + const act3 = slowActivity(); + act.cancel("because"); + act2.fail(new Error("ahhh")); + act3.complete("hi!"); + + const results1 = await Promise.allSettled([act, act2, act3]); + + const signalHandler = activityOverrideSignal.on(async ({ token, type }) => { + if (type === "complete") { + await completeActivity(token, "from the signal handler!"); + } else { + await failActivity(token, new Error("BECAUSE!!!")); + } + }); + + const results2 = await Promise.allSettled([ + activityOverrideActivity({ + location: "handler", + type: "complete", + executionId, + }), + activityOverrideActivity({ + location: "handler", + type: "fail", + executionId, + }), + activityOverrideActivity({ + location: "signal", + type: "complete", + executionId, + }), + activityOverrideActivity({ + location: "signal", + type: "fail", + executionId, + }), + ]); + + const aware = activityOverrideAwareActivity({ executionId }); + await sleepFor(1); + aware.cancel("because"); + + signalHandler.dispose(); + + // the override activity SHOULD send a signal when it realizes it is cancelled. + const result = await expectSignal(activityOverrideSignal); + + return [results1, results2, result]; + } +); diff --git a/apps/tests/aws-runtime/test/tester.test.ts b/apps/tests/aws-runtime/test/tester.test.ts index 93e018ab9..635f5ea9c 100644 --- a/apps/tests/aws-runtime/test/tester.test.ts +++ b/apps/tests/aws-runtime/test/tester.test.ts @@ -1,4 +1,8 @@ -import { HeartbeatTimeout } from "@eventual/core"; +import { + ActivityCancelled, + EventualError, + HeartbeatTimeout, +} from "@eventual/core"; import { eventualRuntimeTestHarness } from "./runtime-test-harness.js"; import { eventDrivenWorkflow, @@ -10,6 +14,7 @@ import { workflow2, workflow3, workflow4, + overrideWorkflow, } from "./test-service.js"; jest.setTimeout(100 * 1000); @@ -62,4 +67,19 @@ eventualRuntimeTestHarness(({ testCompletion }) => { ]); testCompletion("event-driven", eventDrivenWorkflow, "done!"); + + testCompletion("overrideActivities", overrideWorkflow, [ + [ + { status: "rejected", reason: new ActivityCancelled("because") }, + { status: "rejected", reason: new EventualError("ahhh", "because") }, + { status: "fulfilled", value: "hi!" }, + ], + [ + { status: "fulfilled", value: "from the event handler!" }, + { status: "rejected", reason: new EventualError("Error", "WHY!!!") }, + { status: "fulfilled", value: "from the signal handler!" }, + { status: "rejected", reason: new EventualError("Error", "BECAUSE!!!") }, + ], + { token: "", type: "complete" }, + ]); }); diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index fbf744c45..c56f005c9 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -1,4 +1,5 @@ import { + ActivityExecutionReference, createActivityCall, createOverrideActivityCall, } from "./calls/activity-call.js"; @@ -35,7 +36,8 @@ export interface ActivityFunction< Arguments extends any[], Output extends any = any > { - (...args: Arguments): Promise>>; + (...args: Arguments): Promise>> & + ActivityExecutionReference>; } export interface ActivityHandler< From bffc121e9d7be48cf5dcf50e432bbe4b7145c421 Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Mon, 12 Dec 2022 22:33:12 -0600 Subject: [PATCH 4/9] tests pass --- apps/tests/aws-runtime/test/tester.test.ts | 24 ++++++++++++++----- packages/@eventual/aws-cdk/src/activities.ts | 2 ++ packages/@eventual/core/src/activity.ts | 2 +- packages/@eventual/core/src/interpret.ts | 11 +++++++-- .../src/runtime/handlers/activity-worker.ts | 8 +++---- packages/@eventual/core/src/signals.ts | 1 + 6 files changed, 35 insertions(+), 13 deletions(-) diff --git a/apps/tests/aws-runtime/test/tester.test.ts b/apps/tests/aws-runtime/test/tester.test.ts index 635f5ea9c..02fcea5bc 100644 --- a/apps/tests/aws-runtime/test/tester.test.ts +++ b/apps/tests/aws-runtime/test/tester.test.ts @@ -36,7 +36,7 @@ eventualRuntimeTestHarness(({ testCompletion }) => { { status: "fulfilled", value: ["HELLO SAM", "HELLO CHRIS", "HELLO SAM"] }, { status: "fulfilled", value: ["hello sam", "hello chris", "hello sam"] }, { status: "fulfilled", value: "hello sam" }, - { status: "rejected", reason: "Error" }, + { status: "rejected", reason: { name: "Error", message: "failed" } }, ]); testCompletion("parent-child", parentWorkflow, "done"); @@ -50,7 +50,10 @@ eventualRuntimeTestHarness(({ testCompletion }) => { testCompletion("asyncActivities", asyncWorkflow, [ "hello from the async writer!", - "AsyncWriterError", + { + name: "AsyncWriterError", + message: "I was told to fail this activity, sorry.", + }, ]); testCompletion("heartbeat", heartbeatWorkflow, 10, [ @@ -70,15 +73,24 @@ eventualRuntimeTestHarness(({ testCompletion }) => { testCompletion("overrideActivities", overrideWorkflow, [ [ - { status: "rejected", reason: new ActivityCancelled("because") }, - { status: "rejected", reason: new EventualError("ahhh", "because") }, + { status: "rejected", reason: new ActivityCancelled("because").toJSON() }, + { + status: "rejected", + reason: new EventualError("Error", "ahhh").toJSON(), + }, { status: "fulfilled", value: "hi!" }, ], [ { status: "fulfilled", value: "from the event handler!" }, - { status: "rejected", reason: new EventualError("Error", "WHY!!!") }, + { + status: "rejected", + reason: new EventualError("Error", "WHY!!!").toJSON(), + }, { status: "fulfilled", value: "from the signal handler!" }, - { status: "rejected", reason: new EventualError("Error", "BECAUSE!!!") }, + { + status: "rejected", + reason: new EventualError("Error", "BECAUSE!!!").toJSON(), + }, ], { token: "", type: "complete" }, ]); diff --git a/packages/@eventual/aws-cdk/src/activities.ts b/packages/@eventual/aws-cdk/src/activities.ts index 151e47171..f5029b435 100644 --- a/packages/@eventual/aws-cdk/src/activities.ts +++ b/packages/@eventual/aws-cdk/src/activities.ts @@ -87,10 +87,12 @@ export class Activities extends Construct implements IActivities, IGrantable { public configureCompleteActivity(func: Function) { this.props.workflows.configureSendWorkflowEvent(func); + this.configureUpdateActivity(func); } public grantCompleteActivity(grantable: IGrantable) { this.props.workflows.grantSendWorkflowEvent(grantable); + this.grantUpdateActivity(grantable); } public configureUpdateActivity(func: Function) { diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index c56f005c9..74b7ecb77 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -189,7 +189,7 @@ export function completeActivity = any>( type: ActivityTargetType.ActivityToken, activityToken, }, - result + Result.resolved(result) ) as any; } else { return getWorkflowClient().completeActivity({ activityToken, result }); diff --git a/packages/@eventual/core/src/interpret.ts b/packages/@eventual/core/src/interpret.ts index 47f685963..09c70b5d1 100644 --- a/packages/@eventual/core/src/interpret.ts +++ b/packages/@eventual/core/src/interpret.ts @@ -12,6 +12,7 @@ import { } from "./calls/activity-call.js"; import { DeterminismError, + EventualError, HeartbeatTimeout, SynchronousOperationError, Timeout, @@ -491,7 +492,13 @@ export function interpret( (r): PromiseFulfilledResult | PromiseRejectedResult => isResolved(r) ? { status: "fulfilled", value: r.value } - : { status: "rejected", reason: r.error } + : { + status: "rejected", + reason: + r.error instanceof Error + ? new EventualError(r.error.name, r.error.message) + : r.error, + } ) ); } @@ -554,7 +561,7 @@ export function interpret( ? Result.failed(new Timeout("Activity Timed Out")) : isActivityHeartbeatTimedOut(event) ? Result.failed(new HeartbeatTimeout("Activity Heartbeat TimedOut")) - : Result.failed(event.error); + : Result.failed(new EventualError(event.error, event.message)); } } diff --git a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts index 8275af571..972be5c3c 100644 --- a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts +++ b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts @@ -174,7 +174,7 @@ export function createActivityWorker({ endTime ); - await overrideActivity( + await finishActivity( event, recordAge + (endTime.getTime() - start.getTime()) ); @@ -200,7 +200,7 @@ export function createActivityWorker({ endTime ); - await overrideActivity( + await finishActivity( event, recordAge + (endTime.getTime() - start.getTime()) ); @@ -223,7 +223,7 @@ export function createActivityWorker({ metrics.putMetric(ActivityMetrics.TotalDuration, duration); } - async function overrideActivity( + async function finishActivity( event: ActivityCompleted | ActivityFailed, duration: number ) { @@ -237,7 +237,7 @@ export function createActivityWorker({ ) ); // if an activity is closed, do not send the result on completion. - if (alreadyClosed) { + if (!alreadyClosed) { await timed(metrics, ActivityMetrics.SubmitWorkflowTaskDuration, () => workflowClient.submitWorkflowTask(request.executionId, event) ); diff --git a/packages/@eventual/core/src/signals.ts b/packages/@eventual/core/src/signals.ts index 482a40a5a..7a1438e7f 100644 --- a/packages/@eventual/core/src/signals.ts +++ b/packages/@eventual/core/src/signals.ts @@ -252,6 +252,7 @@ export function sendSignal( return getWorkflowClient().sendSignal({ executionId, signal, + payload, id: id ?? ulid(), }); } From b7ce50895f93e8277d71fc680502a9dd89673d6f Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Mon, 12 Dec 2022 22:40:56 -0600 Subject: [PATCH 5/9] docs --- packages/@eventual/core/src/activity.ts | 50 ++++++++++++++++++- .../@eventual/core/src/calls/activity-call.ts | 16 +++--- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index 74b7ecb77..036130c6d 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -1,5 +1,4 @@ import { - ActivityExecutionReference, createActivityCall, createOverrideActivityCall, } from "./calls/activity-call.js"; @@ -179,6 +178,41 @@ export interface ActivityTokenTarget { activityToken: string; } +export interface ActivityExecutionReference { + /** + * Cancel this activity. + * + * The activity will reject with a {@link ActivityCancelled} error. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ + cancel: (reason: string) => Promise; + /** + * Causes the activity to reject with the provided value within the workflow. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ + + fail: ( + ...args: [error: Error] | [error: string, message: string] + ) => Promise; + /** + * Causes the activity to resolve the provided value to the workflow. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ + complete: (result: T) => Promise; +} + +/** + * Causes the activity to resolve the provided value to the workflow. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ export function completeActivity = any>( activityToken: string, result: ActivityOutput @@ -196,6 +230,12 @@ export function completeActivity = any>( } } +/** + * Causes the activity to reject with the provided value within the workflow. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ export function failActivity( activityToken: string, error: Error @@ -228,6 +268,14 @@ export function failActivity( } } +/** + * Cancel any activity using it's activityToken. + * + * The activity will reject with a {@link ActivityCancelled} error. + * + * If the activity is calling {@link heartbeat}, closed: true will be + * return to signal the workflow considers the activity finished. + */ export function cancelActivity( activityToken: string, reason: string diff --git a/packages/@eventual/core/src/calls/activity-call.ts b/packages/@eventual/core/src/calls/activity-call.ts index a57d9c6b1..f96cd2dae 100644 --- a/packages/@eventual/core/src/calls/activity-call.ts +++ b/packages/@eventual/core/src/calls/activity-call.ts @@ -1,4 +1,8 @@ -import { ActivityTarget, ActivityTargetType } from "../activity.js"; +import { + ActivityExecutionReference, + ActivityTarget, + ActivityTargetType, +} from "../activity.js"; import { ActivityCancelled, EventualError } from "../error.js"; import { EventualKind, @@ -15,7 +19,7 @@ export function isActivityCall(a: any): a is ActivityCall { export interface ActivityCall extends CommandCallBase | Failed>, - ActivityExecutionReference { + ActivityExecutionReference { name: string; args: any[]; heartbeatSeconds?: number; @@ -61,14 +65,6 @@ export function createActivityCall( return call; } -export interface ActivityExecutionReference { - cancel: (reason: string) => Promise; - fail: ( - ...args: [error: Error] | [error: string, message: string] - ) => Promise; - complete: (result: T) => Promise; -} - export function isOverrideActivityCall(a: any): a is OverrideActivityCall { return isEventualOfKind(EventualKind.OverrideActivityCall, a); } From 046ff4f8ab705450db81df1f87907eb2a3ab6c01 Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Mon, 12 Dec 2022 22:58:32 -0600 Subject: [PATCH 6/9] fixed tests for new errors --- packages/@eventual/core/src/error.ts | 2 +- packages/@eventual/core/src/interpret.ts | 5 +++- .../@eventual/core/test/interpret.test.ts | 27 +++++++++++++------ 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/packages/@eventual/core/src/error.ts b/packages/@eventual/core/src/error.ts index e340a7fb4..c319827e0 100644 --- a/packages/@eventual/core/src/error.ts +++ b/packages/@eventual/core/src/error.ts @@ -9,7 +9,7 @@ export class EventualError extends Error { toJSON() { return { name: this.name, - message: this.message, + ...(this.message ? { message: this.message } : {}), }; } } diff --git a/packages/@eventual/core/src/interpret.ts b/packages/@eventual/core/src/interpret.ts index 09c70b5d1..73b8c4eb7 100644 --- a/packages/@eventual/core/src/interpret.ts +++ b/packages/@eventual/core/src/interpret.ts @@ -496,7 +496,10 @@ export function interpret( status: "rejected", reason: r.error instanceof Error - ? new EventualError(r.error.name, r.error.message) + ? new EventualError( + r.error.name, + r.error.message ?? undefined + ) : r.error, } ) diff --git a/packages/@eventual/core/test/interpret.test.ts b/packages/@eventual/core/test/interpret.test.ts index 34ab7ba3d..ae40ca295 100644 --- a/packages/@eventual/core/test/interpret.test.ts +++ b/packages/@eventual/core/test/interpret.test.ts @@ -3,7 +3,12 @@ import { createOverrideActivityCall, } from "../src/calls/activity-call.js"; import { chain } from "../src/chain.js"; -import { DeterminismError, HeartbeatTimeout, Timeout } from "../src/error.js"; +import { + DeterminismError, + EventualError, + HeartbeatTimeout, + Timeout, +} from "../src/error.js"; import { ActivityTargetType, Context, @@ -166,7 +171,13 @@ test("should catch error of failed Activity", () => { activityFailed("error", 0), ]) ).toMatchObject({ - commands: [createScheduledActivityCommand("handle-error", ["error"], 1)], + commands: [ + createScheduledActivityCommand( + "handle-error", + [new EventualError("error").toJSON()], + 1 + ), + ], }); }); @@ -1066,7 +1077,7 @@ describe("Race", () => { activityCompleted("B", 1), ]) ).toMatchObject({ - result: Result.failed("A"), + result: Result.failed(new EventualError("A").toJSON()), }); expect( @@ -1076,7 +1087,7 @@ describe("Race", () => { activityFailed("B", 1), ]) ).toMatchObject({ - result: Result.failed("B"), + result: Result.failed(new EventualError("B").toJSON()), }); }); }); @@ -1146,8 +1157,8 @@ describe("AwaitAllSettled", () => { ]) ).toMatchObject[]>>({ result: Result.resolved([ - { status: "rejected", reason: "A" }, - { status: "rejected", reason: "B" }, + { status: "rejected", reason: new EventualError("A").toJSON() }, + { status: "rejected", reason: new EventualError("B").toJSON() }, ]), commands: [], }); @@ -1161,7 +1172,7 @@ describe("AwaitAllSettled", () => { ]) ).toMatchObject[]>>({ result: Result.resolved([ - { status: "rejected", reason: "A" }, + { status: "rejected", reason: new EventualError("A").toJSON() }, { status: "fulfilled", value: "B" }, ]), commands: [], @@ -1434,7 +1445,7 @@ test("workflow calling other workflow", () => { workflowFailed("error", 0), ]) ).toMatchObject({ - result: Result.failed("error"), + result: Result.failed(new EventualError("error").toJSON()), commands: [], }); }); From d879d04830c9f90097e656a7620da9976e7e34ae Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Tue, 13 Dec 2022 00:41:54 -0600 Subject: [PATCH 7/9] remove return --- packages/@eventual/core/src/activity.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index 036130c6d..f7960f7fa 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -194,7 +194,6 @@ export interface ActivityExecutionReference { * If the activity is calling {@link heartbeat}, closed: true will be * return to signal the workflow considers the activity finished. */ - fail: ( ...args: [error: Error] | [error: string, message: string] ) => Promise; From 398c6539fe3c0d743a1721952716eab823962749 Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Tue, 13 Dec 2022 00:51:46 -0600 Subject: [PATCH 8/9] support activity.complete from within a workflow --- packages/@eventual/core/src/activity.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index 614f3a36a..0d6e2ccbc 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -1,4 +1,7 @@ -import { createActivityCall, createOverrideActivityCall } from "./calls/activity-call.js"; +import { + createActivityCall, + createOverrideActivityCall, +} from "./calls/activity-call.js"; import { ActivityCancelled, EventualError } from "./error.js"; import { callableActivities, @@ -181,8 +184,8 @@ export function activity( Output >; } - func.complete = async function (request) { - return getWorkflowClient().completeActivity(request); + func.complete = function (request) { + return completeActivity(request.activityToken, request.result); }; return func; } From d8a5175de24d00ea35c08786085cf3f672b07f8e Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Tue, 13 Dec 2022 01:02:22 -0600 Subject: [PATCH 9/9] remove complete and fail activity from workflow. --- apps/tests/aws-runtime/test/test-service.ts | 24 ++++++++--- apps/tests/aws-runtime/test/tester.test.ts | 9 +---- packages/@eventual/core/src/activity.ts | 40 +------------------ .../@eventual/core/src/calls/activity-call.ts | 18 +-------- 4 files changed, 24 insertions(+), 67 deletions(-) diff --git a/apps/tests/aws-runtime/test/test-service.ts b/apps/tests/aws-runtime/test/test-service.ts index 1476a5d7a..ce0534a27 100644 --- a/apps/tests/aws-runtime/test/test-service.ts +++ b/apps/tests/aws-runtime/test/test-service.ts @@ -337,6 +337,10 @@ const sendFinishEvent = activity("sendFinish", async (executionId: string) => { }); }); +/** + * An event which is raised by the activityOverrideActivity. + * Can provide a value to the activity via activity token or defer the responsibility to the workflow via signal. + */ const activityOverrideEvent = event<{ executionId: string; token: string; @@ -344,6 +348,9 @@ const activityOverrideEvent = event<{ type: "complete" | "fail"; }>("activityOverrideEvent"); +/** + * An async activity which will be cancelled using it's activity token. + */ const activityOverrideActivity = activity( "eventPublish", async ({ @@ -367,6 +374,10 @@ const activityOverrideActivity = activity( } ); +/** + * A signal called by the activityOverrideEvent handler to pass the activity token to the workflow. + * Used to test activity completion from the workflow. + */ const activityOverrideSignal = new Signal<{ token: string; type: "complete" | "fail"; @@ -408,17 +419,20 @@ const activityOverrideAwareActivity = activity( } ); +/** + * Activity Override Tests. + * + * 1. cancel an activity from the workflow and record the result + * 2. Use the activity token to cancel, fail, or complete an async token from within an event handler or workflow + * 3. Cancel an activity and use a signal to signify that it knows it was cancelled and record the result. + */ export const overrideWorkflow = workflow( "override", async (_, { execution: { id: executionId } }) => { const act = slowActivity(); - const act2 = slowActivity(); - const act3 = slowActivity(); act.cancel("because"); - act2.fail(new Error("ahhh")); - act3.complete("hi!"); - const results1 = await Promise.allSettled([act, act2, act3]); + const results1 = await Promise.allSettled([act]); const signalHandler = activityOverrideSignal.on(async ({ token, type }) => { if (type === "complete") { diff --git a/apps/tests/aws-runtime/test/tester.test.ts b/apps/tests/aws-runtime/test/tester.test.ts index 02fcea5bc..2a074eb67 100644 --- a/apps/tests/aws-runtime/test/tester.test.ts +++ b/apps/tests/aws-runtime/test/tester.test.ts @@ -72,14 +72,7 @@ eventualRuntimeTestHarness(({ testCompletion }) => { testCompletion("event-driven", eventDrivenWorkflow, "done!"); testCompletion("overrideActivities", overrideWorkflow, [ - [ - { status: "rejected", reason: new ActivityCancelled("because").toJSON() }, - { - status: "rejected", - reason: new EventualError("Error", "ahhh").toJSON(), - }, - { status: "fulfilled", value: "hi!" }, - ], + [{ status: "rejected", reason: new ActivityCancelled("because").toJSON() }], [ { status: "fulfilled", value: "from the event handler!" }, { diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index 0d6e2ccbc..3a2002edd 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -37,7 +37,7 @@ export interface ActivityFunction< Output extends any = any > { (...args: Arguments): Promise>> & - ActivityExecutionReference>; + ActivityExecutionReference; /** * Complete an activity request by its {@link CompleteActivityRequest.activityToken}. @@ -207,7 +207,7 @@ export interface ActivityTokenTarget { activityToken: string; } -export interface ActivityExecutionReference { +export interface ActivityExecutionReference { /** * Cancel this activity. * @@ -217,22 +217,6 @@ export interface ActivityExecutionReference { * return to signal the workflow considers the activity finished. */ cancel: (reason: string) => Promise; - /** - * Causes the activity to reject with the provided value within the workflow. - * - * If the activity is calling {@link heartbeat}, closed: true will be - * return to signal the workflow considers the activity finished. - */ - fail: ( - ...args: [error: Error] | [error: string, message: string] - ) => Promise; - /** - * Causes the activity to resolve the provided value to the workflow. - * - * If the activity is calling {@link heartbeat}, closed: true will be - * return to signal the workflow considers the activity finished. - */ - complete: (result: T) => Promise; } /** @@ -296,26 +280,6 @@ export function failActivity( } } -/** - * Cancel any activity using it's activityToken. - * - * The activity will reject with a {@link ActivityCancelled} error. - * - * If the activity is calling {@link heartbeat}, closed: true will be - * return to signal the workflow considers the activity finished. - */ -export function cancelActivity( - activityToken: string, - reason: string -): Promise { - if (isOrchestratorWorker()) { - // not a real promise, do not await - return failActivity(activityToken, new ActivityCancelled(reason)) as any; - } else { - return failActivity(activityToken, new ActivityCancelled(reason)); - } -} - /** * Retrieve an activity function that has been registered in a workflow. */ diff --git a/packages/@eventual/core/src/calls/activity-call.ts b/packages/@eventual/core/src/calls/activity-call.ts index f96cd2dae..95f367c6e 100644 --- a/packages/@eventual/core/src/calls/activity-call.ts +++ b/packages/@eventual/core/src/calls/activity-call.ts @@ -3,7 +3,7 @@ import { ActivityTarget, ActivityTargetType, } from "../activity.js"; -import { ActivityCancelled, EventualError } from "../error.js"; +import { ActivityCancelled } from "../error.js"; import { EventualKind, isEventualOfKind, @@ -19,7 +19,7 @@ export function isActivityCall(a: any): a is ActivityCall { export interface ActivityCall extends CommandCallBase | Failed>, - ActivityExecutionReference { + ActivityExecutionReference { name: string; args: any[]; heartbeatSeconds?: number; @@ -41,20 +41,6 @@ export function createActivityCall( } as ActivityCall) ); - call.complete = function (result) { - return createOverrideActivityCall( - { type: ActivityTargetType.OwnActivity, seq: this.seq! }, - Result.resolved(result) - ) as unknown as Promise; - }; - call.fail = function (...args) { - return createOverrideActivityCall( - { type: ActivityTargetType.OwnActivity, seq: this.seq! }, - Result.failed( - args.length === 1 ? args[0] : new EventualError(args[0], args[1]) - ) - ) as unknown as Promise; - }; call.cancel = function (reason) { return createOverrideActivityCall( { type: ActivityTargetType.OwnActivity, seq: this.seq! },