diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 0b6b553f7..e11cd0960 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -715,7 +715,9 @@ func (c *channelImpl) sendAsyncImpl(v interface{}, pair *sendCallback) (ok bool) func (c *channelImpl) Close() { c.closed = true - for _, callback := range c.blockedReceives { + // Use a copy of blockedReceives for iteration as invoking callback could result in modification + copy := append(c.blockedReceives[:0:0], c.blockedReceives...) + for _, callback := range copy { callback.fn(nil, false) } // All blocked sends are going to panic diff --git a/internal/worker.go b/internal/worker.go index d6aeee545..c4c69c674 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -300,7 +300,16 @@ func ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error { // The logger is an optional parameter. Defaults to the noop logger. func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error { - history, err := extractHistoryFromFile(jsonfileName) + return ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, 0) +} + +// ReplayWorkflowHistoryFromJSONFile executes a single decision task for the given json history file upto provided +// lastEventID(inclusive). +// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. +// The logger is an optional parameter. Defaults to the noop logger. +func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error { + + history, err := extractHistoryFromFile(jsonfileName, lastEventID) if err != nil { return err @@ -409,7 +418,7 @@ func replayWorkflowHistory(logger *zap.Logger, service workflowserviceclient.Int return err } -func extractHistoryFromFile(jsonfileName string) (*shared.History, error) { +func extractHistoryFromFile(jsonfileName string, lastEventID int64) (*shared.History, error) { raw, err := ioutil.ReadFile(jsonfileName) if err != nil { return nil, err @@ -421,7 +430,21 @@ func extractHistoryFromFile(jsonfileName string) (*shared.History, error) { if err != nil { return nil, err } - history := &shared.History{Events: deserializedEvents} + + if lastEventID <= 0 { + return &shared.History{Events: deserializedEvents}, nil + } + + // Caller is potentially asking for subset of history instead of all history events + events := []*shared.HistoryEvent{} + for _, event := range deserializedEvents { + events = append(events, event) + if event.GetEventId() == lastEventID { + // Copy history upto last event (inclusive) + break + } + } + history := &shared.History{Events: events} return history, nil } diff --git a/test/fixtures/activity.cancel.sm.repro.json b/test/fixtures/activity.cancel.sm.repro.json new file mode 100755 index 000000000..a4d3e5ba8 --- /dev/null +++ b/test/fixtures/activity.cancel.sm.repro.json @@ -0,0 +1 @@ +[{"eventId":1,"timestamp":1563844217060613000,"eventType":"WorkflowExecutionStarted","version":-24,"workflowExecutionStartedEventAttributes":{"workflowType":{"name":"go.uber.org/cadence/test.(*Workflows).ActivityCancelRepro-fm"},"taskList":{"name":"tl-1"},"executionStartToCloseTimeoutSeconds":10,"taskStartToCloseTimeoutSeconds":1,"identity":"97228@samar-C02XG22GJGH6@"}},{"eventId":2,"timestamp":1563844217060620000,"eventType":"DecisionTaskScheduled","version":-24,"decisionTaskScheduledEventAttributes":{"taskList":{"name":"tl-1"},"startToCloseTimeoutSeconds":1,"attempt":0}},{"eventId":3,"timestamp":1563844217066914000,"eventType":"DecisionTaskStarted","version":-24,"decisionTaskStartedEventAttributes":{"scheduledEventId":2,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"9c612c81-6cd9-402d-866f-e5652e9c4823"}},{"eventId":4,"timestamp":1563844217073526000,"eventType":"DecisionTaskCompleted","version":-24,"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3,"identity":"97228@samar-C02XG22GJGH6@tl-1"}},{"eventId":5,"timestamp":1563844217073598000,"eventType":"TimerStarted","version":-24,"timerStartedEventAttributes":{"timerId":"0","startToFireTimeoutSeconds":10,"decisionTaskCompletedEventId":4}},{"eventId":6,"timestamp":1563844217073620000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"1","activityType":{"name":"toUpperWithDelay"},"taskList":{"name":"tl-1"},"input":"ImhlbGxvIgo1MDAwMDAwMDAwCg==","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSeconds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":7,"timestamp":1563844217073670000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"2","activityType":{"name":"toUpper"},"taskList":{"name":"bad_tl"},"input":"ImhlbGxvIgo=","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSeconds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":8,"timestamp":1563844217073679000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"3","activityType":{"name":"toUpper"},"taskList":{"name":"bad_tl"},"input":"ImhlbGxvIgo=","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSeconds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":9,"timestamp":1563844217080804000,"eventType":"ActivityTaskStarted","version":-24,"activityTaskStartedEventAttributes":{"scheduledEventId":6,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"8b1ab5fd-5f15-4867-af33-97a7b00da341","attempt":0}},{"eventId":10,"timestamp":1563844222089088000,"eventType":"ActivityTaskCompleted","version":-24,"activityTaskCompletedEventAttributes":{"result":"IkhFTExPIgo=","scheduledEventId":6,"startedEventId":9,"identity":"97228@samar-C02XG22GJGH6@tl-1"}},{"eventId":11,"timestamp":1563844222089104000,"eventType":"DecisionTaskScheduled","version":-24,"decisionTaskScheduledEventAttributes":{"taskList":{"name":"tl-1"},"startToCloseTimeoutSeconds":1,"attempt":0}},{"eventId":12,"timestamp":1563844222096052000,"eventType":"DecisionTaskStarted","version":-24,"decisionTaskStartedEventAttributes":{"scheduledEventId":11,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"89f09b7a-2f34-497f-b3c4-99ede5efaf30"}},{"eventId":13,"timestamp":1563844222102892000,"eventType":"DecisionTaskFailed","version":-24,"decisionTaskFailedEventAttributes":{"scheduledEventId":11,"startedEventId":12,"cause":"WORKFLOW_WORKER_UNHANDLED_FAILURE","details":"aW52YWxpZCBzdGF0ZSB0cmFuc2l0aW9uOiBhdHRlbXB0IHRvIGNhbmNlbCwgRGVjaXNpb25UeXBlOiBBY3Rpdml0eSwgSUQ6IDMsIHN0YXRlPUNhbmNlbGVkQWZ0ZXJJbml0aWF0ZWQsIGlzRG9uZSgpPWZhbHNlLCBoaXN0b3J5PVtDcmVhdGVkIGhhbmRsZURlY2lzaW9uU2VudCBEZWNpc2lvblNlbnQgaGFuZGxlSW5pdGlhdGVkRXZlbnQgSW5pdGlhdGVkIGNhbmNlbCBDYW5jZWxlZEFmdGVySW5pdGlhdGVkXQ==","identity":"97228@samar-C02XG22GJGH6@tl-1"}},{"eventId":14,"timestamp":1563844227061245000,"eventType":"WorkflowExecutionTimedOut","version":-24,"workflowExecutionTimedOutEventAttributes":{"timeoutType":"START_TO_CLOSE"}}] \ No newline at end of file diff --git a/test/integration_test.go b/test/integration_test.go index b472306d5..c962dab51 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -274,6 +274,19 @@ func (ts *IntegrationTestSuite) TestChildWFWithMemoAndSearchAttributes() { ts.Equal("memoVal, searchAttrVal", result) } +func (ts *IntegrationTestSuite) TestActivityCancelUsingReplay() { + logger, err := zap.NewDevelopment() + err = worker.ReplayPartialWorkflowHistoryFromJSONFile(logger, "fixtures/activity.cancel.sm.repro.json", 12) + ts.Nil(err) +} + +func (ts *IntegrationTestSuite) TestActivityCancelRepro() { + var expected []string + err := ts.executeWorkflow("test-activity-cancel-sm", ts.workflows.ActivityCancelRepro, &expected) + ts.Nil(err) + ts.EqualValues(expected, ts.activities.invoked()) +} + func (ts *IntegrationTestSuite) registerDomain() { client := client.NewDomainClient(ts.rpcClient.Interface, &client.Options{}) ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) diff --git a/test/workflow_test.go b/test/workflow_test.go index b9fcd7647..db1351a45 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -293,6 +293,69 @@ func (w *Workflows) ChildWorkflowSuccess(ctx workflow.Context) (result string, e return } +func (w *Workflows) ActivityCancelRepro(ctx workflow.Context) ([]string, error) { + ctx, cancelFunc := workflow.WithCancel(ctx) + + // First go-routine which triggers cancellation on completion of first activity + workflow.Go(ctx, func(ctx1 workflow.Context) { + activityCtx := workflow.WithActivityOptions(ctx1, workflow.ActivityOptions{ + ScheduleToStartTimeout: 10 * time.Second, + ScheduleToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 9 * time.Second, + }) + + activityF := workflow.ExecuteActivity(activityCtx, "toUpperWithDelay", "hello", 1 * time.Second) + var ans string + err := activityF.Get(activityCtx, &ans) + if err != nil { + workflow.GetLogger(activityCtx).Sugar().Infof("Activity Failed: Err: %v", err) + return + } + + // Trigger cancellation of root context + cancelFunc() + }) + + // Second go-routine which get blocked on ActivitySchedule and not started + workflow.Go(ctx, func(ctx1 workflow.Context) { + activityCtx := workflow.WithActivityOptions(ctx1, workflow.ActivityOptions{ + ScheduleToStartTimeout: 10 * time.Second, + ScheduleToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 1 * time.Second, + TaskList: "bad_tl", + }) + + activityF := workflow.ExecuteActivity(activityCtx, "toUpper", "hello") + var ans string + err := activityF.Get(activityCtx, &ans) + if err != nil { + workflow.GetLogger(activityCtx).Sugar().Infof("Activity Failed: Err: %v", err) + } + }) + + // Third go-routine which get blocked on ActivitySchedule and not started + workflow.Go(ctx, func(ctx1 workflow.Context) { + activityCtx := workflow.WithActivityOptions(ctx1, workflow.ActivityOptions{ + ScheduleToStartTimeout: 10 * time.Second, + ScheduleToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 1 * time.Second, + TaskList: "bad_tl", + }) + + activityF := workflow.ExecuteActivity(activityCtx, "toUpper", "hello") + var ans string + err := activityF.Get(activityCtx, &ans) + if err != nil { + workflow.GetLogger(activityCtx).Sugar().Infof("Activity Failed: Err: %v", err) + } + }) + + // Cause the workflow to block on sleep + workflow.Sleep(ctx, 10 * time.Second) + + return []string{"toUpperWithDelay"}, nil +} + func (w *Workflows) child(ctx workflow.Context, arg string, mustFail bool) (string, error) { var result string ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) @@ -339,6 +402,7 @@ func (w *Workflows) register() { workflow.Register(w.sleep) workflow.Register(w.child) workflow.Register(w.childForMemoAndSearchAttr) + workflow.Register(w.ActivityCancelRepro) } func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions { diff --git a/worker/worker.go b/worker/worker.go index 6d3e84235..675f00e28 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -97,6 +97,16 @@ func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) return internal.ReplayWorkflowHistoryFromJSONFile(logger, jsonfileName) } +// ReplayPartialWorkflowHistoryFromJSONFile executes a single decision task for the json history file upto provided +//// lastEventID(inclusive), downloaded from the cli. +// To download the history file: cadence workflow showid -of +// See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation +// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. +// The logger is an optional parameter. Defaults to the noop logger. +func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error { + return internal.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID) +} + // ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it. // Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. // The logger is the only optional parameter. Defaults to the noop logger.