Skip to content

Commit

Permalink
Fix invalid state transition on attempt to cancel (#817)
Browse files Browse the repository at this point in the history
* Fix invalid state transition on attempt to cancel

The client channel implementation of Close calls the registered cancel
handlers by iterating over all blocked receivers.  In the case
of Cancel this causes the blocked receiver collection to be
modified during iteration, which in turn causes some handlers to be
called multiple times.
This fix clones the blocked receivers and uses the cloned copy
for iteration.  This guarantees that all callbacks are called
once irrespective of any modifications to the list of blocked
receivers during the iteration.

Also added a new API to allow partial replay of history from a
json file up to any specified event ID.

* move ActivityCancelRepro test workflow to the end
  • Loading branch information
samarabbas authored and vancexu committed Aug 29, 2019
1 parent 1e3023d commit eb10667
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 4 deletions.
4 changes: 3 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,9 @@ func (c *channelImpl) sendAsyncImpl(v interface{}, pair *sendCallback) (ok bool)

func (c *channelImpl) Close() {
c.closed = true
for _, callback := range c.blockedReceives {
// Use a copy of blockedReceives for iteration as invoking callback could result in modification
copy := append(c.blockedReceives[:0:0], c.blockedReceives...)
for _, callback := range copy {
callback.fn(nil, false)
}
// All blocked sends are going to panic
Expand Down
29 changes: 26 additions & 3 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,16 @@ func ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error {
// The logger is an optional parameter. Defaults to the noop logger.
func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error {

history, err := extractHistoryFromFile(jsonfileName)
return ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, 0)
}

// ReplayPartialWorkflowHistoryFromJSONFile executes a single decision task for the given json history file up to the
// provided lastEventID (inclusive).
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
// The logger is an optional parameter. Defaults to the noop logger.
func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error {

history, err := extractHistoryFromFile(jsonfileName, lastEventID)

if err != nil {
return err
Expand Down Expand Up @@ -409,7 +418,7 @@ func replayWorkflowHistory(logger *zap.Logger, service workflowserviceclient.Int
return err
}

func extractHistoryFromFile(jsonfileName string) (*shared.History, error) {
func extractHistoryFromFile(jsonfileName string, lastEventID int64) (*shared.History, error) {
raw, err := ioutil.ReadFile(jsonfileName)
if err != nil {
return nil, err
Expand All @@ -421,7 +430,21 @@ func extractHistoryFromFile(jsonfileName string) (*shared.History, error) {
if err != nil {
return nil, err
}
history := &shared.History{Events: deserializedEvents}

if lastEventID <= 0 {
return &shared.History{Events: deserializedEvents}, nil
}

// Caller is potentially asking for subset of history instead of all history events
events := []*shared.HistoryEvent{}
for _, event := range deserializedEvents {
events = append(events, event)
if event.GetEventId() == lastEventID {
// Copy history upto last event (inclusive)
break
}
}
history := &shared.History{Events: events}

return history, nil
}
1 change: 1 addition & 0 deletions test/fixtures/activity.cancel.sm.repro.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"eventId":1,"timestamp":1563844217060613000,"eventType":"WorkflowExecutionStarted","version":-24,"workflowExecutionStartedEventAttributes":{"workflowType":{"name":"go.uber.org/cadence/test.(*Workflows).ActivityCancelRepro-fm"},"taskList":{"name":"tl-1"},"executionStartToCloseTimeoutSeconds":10,"taskStartToCloseTimeoutSeconds":1,"identity":"97228@samar-C02XG22GJGH6@"}},{"eventId":2,"timestamp":1563844217060620000,"eventType":"DecisionTaskScheduled","version":-24,"decisionTaskScheduledEventAttributes":{"taskList":{"name":"tl-1"},"startToCloseTimeoutSeconds":1,"attempt":0}},{"eventId":3,"timestamp":1563844217066914000,"eventType":"DecisionTaskStarted","version":-24,"decisionTaskStartedEventAttributes":{"scheduledEventId":2,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"9c612c81-6cd9-402d-866f-e5652e9c4823"}},{"eventId":4,"timestamp":1563844217073526000,"eventType":"DecisionTaskCompleted","version":-24,"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3,"identity":"97228@samar-C02XG22GJGH6@tl-1"}},{"eventId":5,"timestamp":1563844217073598000,"eventType":"TimerStarted","version":-24,"timerStartedEventAttributes":{"timerId":"0","startToFireTimeoutSeconds":10,"decisionTaskCompletedEventId":4}},{"eventId":6,"timestamp":1563844217073620000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"1","activityType":{"name":"toUpperWithDelay"},"taskList":{"name":"tl-1"},"input":"ImhlbGxvIgo1MDAwMDAwMDAwCg==","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSeconds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":7,"timestamp":1563844217073670000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"2","activityType":{"name":"toUpper"},"taskList":{"name":"bad_tl"},"input":"ImhlbGxvIgo=","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSe
conds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":8,"timestamp":1563844217073679000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"3","activityType":{"name":"toUpper"},"taskList":{"name":"bad_tl"},"input":"ImhlbGxvIgo=","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSeconds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":9,"timestamp":1563844217080804000,"eventType":"ActivityTaskStarted","version":-24,"activityTaskStartedEventAttributes":{"scheduledEventId":6,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"8b1ab5fd-5f15-4867-af33-97a7b00da341","attempt":0}},{"eventId":10,"timestamp":1563844222089088000,"eventType":"ActivityTaskCompleted","version":-24,"activityTaskCompletedEventAttributes":{"result":"IkhFTExPIgo=","scheduledEventId":6,"startedEventId":9,"identity":"97228@samar-C02XG22GJGH6@tl-1"}},{"eventId":11,"timestamp":1563844222089104000,"eventType":"DecisionTaskScheduled","version":-24,"decisionTaskScheduledEventAttributes":{"taskList":{"name":"tl-1"},"startToCloseTimeoutSeconds":1,"attempt":0}},{"eventId":12,"timestamp":1563844222096052000,"eventType":"DecisionTaskStarted","version":-24,"decisionTaskStartedEventAttributes":{"scheduledEventId":11,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"89f09b7a-2f34-497f-b3c4-99ede5efaf30"}},{"eventId":13,"timestamp":1563844222102892000,"eventType":"DecisionTaskFailed","version":-24,"decisionTaskFailedEventAttributes":{"scheduledEventId":11,"startedEventId":12,"cause":"WORKFLOW_WORKER_UNHANDLED_FAILURE","details":"aW52YWxpZCBzdGF0ZSB0cmFuc2l0aW9uOiBhdHRlbXB0IHRvIGNhbmNlbCwgRGVjaXNpb25UeXBlOiBBY3Rpdml0eSwgSUQ6IDMsIHN0YXRlPUNhbmNlbGVkQWZ0ZXJJbml0aWF0ZWQsIGlzRG9uZSgpPWZhbHNlLCBoaXN0b3J5PVtDcmVhdGVkIGhhbmRsZURlY2lzaW9uU2VudCBEZWNpc2lvblNlbnQgaGFuZGxlSW5pdGlhdGVkRXZlbnQgSW5pdGlhdGVkIGNhbmNlbCBDYW5jZWxlZEFmdGVySW5pdGlhdGVkXQ==","identity":"97228@sama
r-C02XG22GJGH6@tl-1"}},{"eventId":14,"timestamp":1563844227061245000,"eventType":"WorkflowExecutionTimedOut","version":-24,"workflowExecutionTimedOutEventAttributes":{"timeoutType":"START_TO_CLOSE"}}]
13 changes: 13 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ func (ts *IntegrationTestSuite) TestChildWFWithMemoAndSearchAttributes() {
ts.Equal("memoVal, searchAttrVal", result)
}

// TestActivityCancelUsingReplay replays the recorded history in
// fixtures/activity.cancel.sm.repro.json up to event ID 12 (inclusive) and
// asserts that the replay reports no error.
func (ts *IntegrationTestSuite) TestActivityCancelUsingReplay() {
	logger, err := zap.NewDevelopment()
	// Check the logger construction error before err is reused below;
	// previously it was silently overwritten by the replay call.
	ts.Nil(err)

	err = worker.ReplayPartialWorkflowHistoryFromJSONFile(logger, "fixtures/activity.cancel.sm.repro.json", 12)
	ts.Nil(err)
}

// TestActivityCancelRepro runs the ActivityCancelRepro workflow under the
// workflow ID "test-activity-cancel-sm" and asserts that it finishes without
// error and that the activities recorded as invoked equal the list handed
// back through the result pointer (presumably the workflow's return value;
// see executeWorkflow).
func (ts *IntegrationTestSuite) TestActivityCancelRepro() {
	var want []string

	runErr := ts.executeWorkflow("test-activity-cancel-sm", ts.workflows.ActivityCancelRepro, &want)
	ts.Nil(runErr)

	got := ts.activities.invoked()
	ts.EqualValues(want, got)
}

func (ts *IntegrationTestSuite) registerDomain() {
client := client.NewDomainClient(ts.rpcClient.Interface, &client.Options{})
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
Expand Down
64 changes: 64 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,69 @@ func (w *Workflows) ChildWorkflowSuccess(ctx workflow.Context) (result string, e
return
}

// ActivityCancelRepro is the workflow used to reproduce the activity-cancel
// state machine failure. Under a single cancellable context it starts three
// workflow go-routines: the first cancels that context once its
// "toUpperWithDelay" activity completes successfully, while the other two wait
// on "toUpper" activities scheduled on the "bad_tl" task list. The workflow
// itself then blocks in workflow.Sleep and finally returns the fixed list
// []string{"toUpperWithDelay"} with a nil error.
func (w *Workflows) ActivityCancelRepro(ctx workflow.Context) ([]string, error) {
	ctx, cancelFunc := workflow.WithCancel(ctx)

	// Go-routine 1: run the delayed activity and, only if it succeeds,
	// trigger cancellation of the root context.
	workflow.Go(ctx, func(ctx1 workflow.Context) {
		opts := workflow.ActivityOptions{
			ScheduleToStartTimeout: 10 * time.Second,
			ScheduleToCloseTimeout: 10 * time.Second,
			StartToCloseTimeout:    9 * time.Second,
		}
		actCtx := workflow.WithActivityOptions(ctx1, opts)

		var upper string
		future := workflow.ExecuteActivity(actCtx, "toUpperWithDelay", "hello", 1*time.Second)
		if err := future.Get(actCtx, &upper); err != nil {
			workflow.GetLogger(actCtx).Sugar().Infof("Activity Failed: Err: %v", err)
			return
		}

		cancelFunc()
	})

	// waitOnUnstartedActivity schedules "toUpper" on the "bad_tl" task list and
	// waits for it, logging any failure. The activity is expected to stay
	// scheduled without being started (presumably no worker polls "bad_tl").
	waitOnUnstartedActivity := func(ctx1 workflow.Context) {
		opts := workflow.ActivityOptions{
			ScheduleToStartTimeout: 10 * time.Second,
			ScheduleToCloseTimeout: 10 * time.Second,
			StartToCloseTimeout:    1 * time.Second,
			TaskList:               "bad_tl",
		}
		actCtx := workflow.WithActivityOptions(ctx1, opts)

		var upper string
		future := workflow.ExecuteActivity(actCtx, "toUpper", "hello")
		if err := future.Get(actCtx, &upper); err != nil {
			workflow.GetLogger(actCtx).Sugar().Infof("Activity Failed: Err: %v", err)
		}
	}

	// Go-routines 2 and 3: both block on an activity that is scheduled but not started.
	workflow.Go(ctx, waitOnUnstartedActivity)
	workflow.Go(ctx, waitOnUnstartedActivity)

	// Block the workflow on a sleep; its returned error is intentionally not inspected.
	workflow.Sleep(ctx, 10*time.Second)

	return []string{"toUpperWithDelay"}, nil
}

func (w *Workflows) child(ctx workflow.Context, arg string, mustFail bool) (string, error) {
var result string
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
Expand Down Expand Up @@ -339,6 +402,7 @@ func (w *Workflows) register() {
workflow.Register(w.sleep)
workflow.Register(w.child)
workflow.Register(w.childForMemoAndSearchAttr)
workflow.Register(w.ActivityCancelRepro)
}

func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {
Expand Down
10 changes: 10 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string)
return internal.ReplayWorkflowHistoryFromJSONFile(logger, jsonfileName)
}

// ReplayPartialWorkflowHistoryFromJSONFile executes a single decision task for the json history file, downloaded from
// the cli, up to the provided lastEventID (inclusive).
// To download the history file: cadence workflow showid <workflow_id> -of <output_filename>
// See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
// The logger is an optional parameter. Defaults to the noop logger.
// This is a thin wrapper that delegates to internal.ReplayPartialWorkflowHistoryFromJSONFile.
func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error {
return internal.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID)
}

// ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it.
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
// The logger is the only optional parameter. Defaults to the noop logger.
Expand Down

0 comments on commit eb10667

Please sign in to comment.