diff --git a/tests/reset_workflow_test.go b/tests/reset_workflow_test.go index ea0ed8a678b..76e882f72c6 100644 --- a/tests/reset_workflow_test.go +++ b/tests/reset_workflow_test.go @@ -540,7 +540,7 @@ func (t *resetTest) reset(eventId int64) string { func (t *resetTest) run() { t.totalSignals = 2 t.totalUpdates = 2 - t.tv = t.WorkflowUpdateBaseSuite.startWorkflow(t.tv) + t.WorkflowUpdateBaseSuite.startWorkflow(t.tv) poller := &testcore.TaskPoller{ Client: t.FrontendClient(), @@ -686,8 +686,8 @@ func (s *ResetWorkflowTestSuite) testResetWorkflowSignalReapplyBuffer( - depending on the reapply type, the buffered signal is applied post-reset or not */ - tv = s.startWorkflow(tv) - s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(tv.RunID())) + runID := s.startWorkflow(tv) + s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(runID)) var resetRunID string wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -700,31 +700,23 @@ func (s *ResetWorkflowTestSuite) testResetWorkflowSignalReapplyBuffer( // (1) send Signal _, err := s.FrontendClient().SignalWorkflowExecution(testcore.NewContext(), &workflowservice.SignalWorkflowExecutionRequest{ - RequestId: uuid.New(), - Namespace: s.Namespace().String(), - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: tv.WorkflowID(), - RunId: tv.RunID(), - }, - SignalName: "random signal name", - Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{{ - Data: []byte("random data"), - }}}, - Identity: tv.WorkerIdentity(), + RequestId: tv.Any().String(), + Namespace: s.Namespace().String(), + WorkflowExecution: tv.WorkflowExecution(), + SignalName: tv.Any().String(), + Input: tv.Any().Payloads(), + Identity: tv.WorkerIdentity(), }) s.NoError(err) // (2) send Reset resp, err := s.FrontendClient().ResetWorkflowExecution(testcore.NewContext(), &workflowservice.ResetWorkflowExecutionRequest{ - Namespace: s.Namespace().String(), - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: tv.WorkflowID(), - RunId: tv.RunID(), - }, + Namespace: s.Namespace().String(), + WorkflowExecution: tv.WorkflowExecution(), Reason: "reset execution from test", WorkflowTaskFinishEventId: 3, - RequestId: uuid.New(), + RequestId: tv.Any().String(), ResetReapplyType: reapplyType, }) s.NoError(err) diff --git a/tests/update_workflow_suite_base.go b/tests/update_workflow_suite_base.go index c55cd7ace9e..2e73fa1432b 100644 --- a/tests/update_workflow_suite_base.go +++ b/tests/update_workflow_suite_base.go @@ -122,12 +122,12 @@ func (s *WorkflowUpdateBaseSuite) waitUpdateAdmitted(tv *testvars.TestVars) { }, 5*time.Second, 10*time.Millisecond, "update %s did not reach Admitted stage", tv.UpdateID()) } -func (s *WorkflowUpdateBaseSuite) startWorkflow(tv *testvars.TestVars) *testvars.TestVars { +func (s *WorkflowUpdateBaseSuite) startWorkflow(tv *testvars.TestVars) string { s.T().Helper() startResp, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), s.startWorkflowRequest(tv)) s.NoError(err) - return tv.WithRunID(startResp.GetRunId()) + return startResp.GetRunId() } func (s *WorkflowUpdateBaseSuite) startWorkflowRequest(tv *testvars.TestVars) *workflowservice.StartWorkflowExecutionRequest { diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index 436f5f3f1df..257f7202c35 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -122,16 +122,16 @@ func (s *UpdateWorkflowSuite) speculativeWorkflowTaskOutcomes( func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_AcceptComplete() { testCases := []struct { - Name string - UseRunID bool + Name string + ApplyRunID func(tv *testvars.TestVars, runID string) *testvars.TestVars }{ { - Name: "with RunID", - UseRunID: true, + Name: "with RunID", + ApplyRunID: func(tv *testvars.TestVars, runID string) *testvars.TestVars { return tv.WithRunID(runID) }, }, { - Name: "without RunID", - UseRunID: false, + Name: "without RunID", + ApplyRunID: func(tv *testvars.TestVars, _ string) *testvars.TestVars { return tv }, }, } @@ -139,12 +139,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Ac s.Run(tc.Name, func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) - runID := tv.RunID() - if !tc.UseRunID { - // Clear RunID in tv to test code paths when APIs have to fetch current RunID themselves. - tv = tv.WithRunID("") - } + runID := s.startWorkflow(tv) capture := s.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() defer s.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) @@ -207,7 +202,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Ac _, err := poller.PollAndProcessWorkflowTask() s.NoError(err) - updateResultCh := s.sendUpdateNoError(tv) + updateResultCh := s.sendUpdateNoError(tc.ApplyRunID(tv, runID)) // Process update in workflow. res, err := poller.PollAndProcessWorkflowTask(testcore.WithoutRetries) @@ -234,7 +229,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Ac s.Equal(1, commits) s.Equal(0, rollbacks) - events := s.GetHistory(s.Namespace().String(), tv.WorkflowExecution()) + events := s.GetHistory(s.Namespace().String(), tc.ApplyRunID(tv, runID).WorkflowExecution()) s.EqualHistoryEvents(` 1 WorkflowExecutionStarted @@ -253,16 +248,16 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Ac func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NotEmptySpeculativeWorkflowTask_AcceptComplete() { testCases := []struct { - Name string - UseRunID bool + Name string + ApplyRunID func(tv *testvars.TestVars, runID string) *testvars.TestVars }{ { - Name: "with RunID", - UseRunID: true, + Name: "with RunID", + ApplyRunID: func(tv *testvars.TestVars, runID string) *testvars.TestVars { return tv.WithRunID(runID) }, }, { - Name: "without RunID", - UseRunID: false, + Name: "without RunID", + ApplyRunID: func(tv *testvars.TestVars, _ string) *testvars.TestVars { return tv }, }, } @@ -270,10 +265,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NotEmptySpeculativeWorkflowTask s.Run(tc.Name, func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) - if !tc.UseRunID { - tv = tv.WithRunID("") - } + runID := s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -343,7 +335,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NotEmptySpeculativeWorkflowTask _, err := poller.PollAndProcessWorkflowTask() s.NoError(err) - updateResultCh := s.sendUpdateNoError(tv) + updateResultCh := s.sendUpdateNoError(tc.ApplyRunID(tv, runID)) // Process update in workflow. res, err := poller.PollAndProcessWorkflowTask(testcore.WithoutRetries) @@ -356,7 +348,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NotEmptySpeculativeWorkflowTask s.Equal(2, wtHandlerCalls) s.Equal(2, msgHandlerCalls) - events := s.GetHistory(s.Namespace().String(), tv.WorkflowExecution()) + events := s.GetHistory(s.Namespace().String(), tc.ApplyRunID(tv, runID).WorkflowExecution()) s.EqualHistoryEvents(` 1 WorkflowExecutionStarted @@ -377,16 +369,16 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NotEmptySpeculativeWorkflowTask func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalScheduledWorkflowTask_AcceptComplete() { testCases := []struct { - Name string - UseRunID bool + Name string + ApplyRunID func(tv *testvars.TestVars, runID string) *testvars.TestVars }{ { - Name: "with RunID", - UseRunID: true, + Name: "with RunID", + ApplyRunID: func(tv *testvars.TestVars, runID string) *testvars.TestVars { return tv.WithRunID(runID) }, }, { - Name: "without RunID", - UseRunID: false, + Name: "without RunID", + ApplyRunID: func(tv *testvars.TestVars, _ string) *testvars.TestVars { return tv }, }, } @@ -394,10 +386,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalScheduledWorkflowTas s.Run(tc.Name, func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) - if !tc.UseRunID { - tv = tv.WithRunID("") - } + runID := s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -446,7 +435,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalScheduledWorkflowTas T: s.T(), } - updateResultCh := s.sendUpdateNoError(tv) + updateResultCh := s.sendUpdateNoError(tc.ApplyRunID(tv, runID)) // Process update in workflow. res, err := poller.PollAndProcessWorkflowTask(testcore.WithoutRetries) @@ -460,7 +449,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalScheduledWorkflowTas s.Equal(1, wtHandlerCalls) s.Equal(1, msgHandlerCalls) - events := s.GetHistory(s.Namespace().String(), tv.WorkflowExecution()) + events := s.GetHistory(s.Namespace().String(), tc.ApplyRunID(tv, runID).WorkflowExecution()) s.EqualHistoryEvents(` 1 WorkflowExecutionStarted @@ -477,16 +466,16 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalScheduledWorkflowTas func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NormalScheduledWorkflowTask_AcceptComplete() { testCases := []struct { - Name string - UseRunID bool + Name string + ApplyRunID func(tv *testvars.TestVars, runID string) *testvars.TestVars }{ { - Name: "with RunID", - UseRunID: true, + Name: "with RunID", + ApplyRunID: func(tv *testvars.TestVars, runID string) *testvars.TestVars { return tv.WithRunID(runID) }, }, { - Name: "without RunID", - UseRunID: false, + Name: "without RunID", + ApplyRunID: func(tv *testvars.TestVars, _ string) *testvars.TestVars { return tv }, }, } @@ -494,10 +483,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NormalScheduledWorkflowTask_Acc s.Run(tc.Name, func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) - if !tc.UseRunID { - tv = tv.WithRunID("") - } + runID := s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -563,7 +549,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NormalScheduledWorkflowTask_Acc err = s.SendSignal(s.Namespace().String(), tv.WorkflowExecution(), tv.Any().String(), tv.Any().Payloads(), tv.Any().String()) s.NoError(err) - updateResultCh := s.sendUpdateNoError(tv) + updateResultCh := s.sendUpdateNoError(tc.ApplyRunID(tv, runID)) // Process update in workflow. It will be attached to existing WT. res, err := poller.PollAndProcessWorkflowTask(testcore.WithoutRetries) @@ -577,7 +563,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NormalScheduledWorkflowTask_Acc s.Equal(2, wtHandlerCalls) s.Equal(2, msgHandlerCalls) - events := s.GetHistory(s.Namespace().String(), tv.WorkflowExecution()) + events := s.GetHistory(s.Namespace().String(), tc.ApplyRunID(tv, runID).WorkflowExecution()) s.EqualHistoryEvents(` 1 WorkflowExecutionStarted @@ -598,7 +584,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NormalScheduledWorkflowTask_Acc func (s *UpdateWorkflowSuite) TestUpdateWorkflow_RunningWorkflowTask_NewEmptySpeculativeWorkflowTask_Rejected() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) capture := s.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() defer s.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) @@ -718,7 +704,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_RunningWorkflowTask_NewNotEmpty tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) var updateResultCh <-chan *workflowservice.UpdateWorkflowExecutionResponse @@ -857,7 +843,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_RunningWorkflowTask_NewNotEmpty func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompletedWorkflow() { s.Run("receive outcome from completed Update", func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -927,7 +913,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompletedWorkflow() { s.Run("receive update failure from accepted Update", func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -1303,7 +1289,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ValidateWorkerMessages() { s.Run(tc.Name, func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { if tc.CommandFn == nil { @@ -1356,16 +1342,16 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ValidateWorkerMessages() { func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_AcceptComplete() { testCases := []struct { - Name string - UseRunID bool + Name string + ApplyRunID func(tv *testvars.TestVars, runID string) *testvars.TestVars }{ { - Name: "with RunID", - UseRunID: true, + Name: "with RunID", + ApplyRunID: func(tv *testvars.TestVars, runID string) *testvars.TestVars { return tv.WithRunID(runID) }, }, { - Name: "without RunID", - UseRunID: false, + Name: "without RunID", + ApplyRunID: func(tv *testvars.TestVars, _ string) *testvars.TestVars { return tv }, }, } @@ -1373,10 +1359,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A s.Run(tc.Name, func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) - if !tc.UseRunID { - tv = tv.WithRunID("") - } + runID := s.startWorkflow(tv) // Drain existing first WT from regular task queue, but respond with sticky queue enabled response, next WT will go to sticky queue. _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, @@ -1418,7 +1401,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A // This is to make sure that sticky poller above reached server first. // And when update comes, stick poller is already available. time.Sleep(500 * time.Millisecond) //nolint:forbidigo - updateResult := <-s.sendUpdateNoError(tv) + updateResult := <-s.sendUpdateNoError(tc.ApplyRunID(tv, runID)) s.EqualValues("success-result-of-"+tv.UpdateID(), testcore.DecodeString(s.T(), updateResult.GetOutcome().GetSuccess())) @@ -1432,7 +1415,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A 7 WorkflowTaskCompleted 8 WorkflowExecutionUpdateAccepted {"AcceptedRequestSequencingEventId": 5} // WTScheduled event which delivered update to the worker. 9 WorkflowExecutionUpdateCompleted {"AcceptedEventId": 8} - `, s.GetHistory(s.Namespace().String(), tv.WorkflowExecution())) + `, s.GetHistory(s.Namespace().String(), tc.ApplyRunID(tv, runID).WorkflowExecution())) }) } } @@ -1440,7 +1423,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_AcceptComplete_StickyWorkerUnavailable() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -1542,7 +1525,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StickySpeculativeWorkflowTask_A func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalScheduledWorkflowTask_Reject() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -1616,7 +1599,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalScheduledWorkflowTas func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Reject() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, taskpoller.DrainWorkflowTask) s.NoError(err) @@ -1698,7 +1681,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_EmptySpeculativeWorkflowTask_Re func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NotEmptySpeculativeWorkflowTask_Reject() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -1835,7 +1818,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_NotEmptySpeculativeWorkflowTask func (s *UpdateWorkflowSuite) TestUpdateWorkflow_1stAccept_2ndAccept_2ndComplete_1stComplete() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) tv1 := tv.WithUpdateIDNumber(1).WithMessageIDNumber(1).WithActivityIDNumber(1) tv2 := tv.WithUpdateIDNumber(2).WithMessageIDNumber(2).WithActivityIDNumber(2) @@ -2045,7 +2028,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_1stAccept_2ndAccept_2ndComplete func (s *UpdateWorkflowSuite) TestUpdateWorkflow_1stAccept_2ndReject_1stComplete() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) tv1 := tv.WithUpdateIDNumber(1).WithMessageIDNumber(1).WithActivityIDNumber(1) tv2 := tv.WithUpdateIDNumber(2).WithMessageIDNumber(2).WithActivityIDNumber(2) @@ -2202,7 +2185,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_1stAccept_2ndReject_1stComplete func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, taskpoller.DrainWorkflowTask) s.NoError(err) @@ -2324,7 +2307,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Fail() func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StartedSpeculativeWorkflowTask_ConvertToNormalBecauseOfBufferedSignal() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -2438,7 +2421,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StartedSpeculativeWorkflowTask_ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTask_ConvertToNormalBecauseOfSignal() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -2545,11 +2528,9 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_StartTo WorkflowTaskTimeout: durationpb.New(1 * time.Second), // Important! } - startResp, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) + _, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) s.NoError(err) - tv = tv.WithRunID(startResp.GetRunId()) - wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { wtHandlerCalls++ @@ -2692,7 +2673,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_StartTo func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_ScheduleToStartTimeout() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) // Drain first WT and respond with sticky enabled response to enable sticky task queue. stickyScheduleToStartTimeout := 1 * time.Second @@ -2749,7 +2730,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Schedul func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_ScheduleToStartTimeoutOnNormalTaskQueue() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -2851,7 +2832,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Schedul func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StartedSpeculativeWorkflowTask_TerminateWorkflow() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -2956,7 +2937,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StartedSpeculativeWorkflowTask_ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTask_TerminateWorkflow() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -3050,16 +3031,18 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() type completionCommand struct { name string finalStatus enumspb.WorkflowExecutionStatus - command func(_ *testvars.TestVars) *commandpb.Command + applyRunID func(tv *testvars.TestVars, runID string) *testvars.TestVars + command func(tv *testvars.TestVars) *commandpb.Command } testCases := []testCase{ { name: "update admitted", description: "update in stateAdmitted must get an error", updateErr: map[string]string{ - "workflow completed": "workflow execution already completed", - "workflow continued as new": "workflow operation can not be applied because workflow is closing", - "workflow failed": "workflow execution already completed", + "workflow completed": "workflow execution already completed", + "workflow continued as new without runID": "workflow operation can not be applied because workflow is closing", + "workflow continued as new with runID": "workflow operation can not be applied because workflow is closing", + "workflow failed": "workflow execution already completed", }, updateFailure: "", commands: func(_ *testvars.TestVars) []*commandpb.Command { return nil }, @@ -3101,6 +3084,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() { name: "workflow completed", finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + applyRunID: func(tv *testvars.TestVars, _ string) *testvars.TestVars { return tv }, command: func(_ *testvars.TestVars) *commandpb.Command { return &commandpb.Command{ CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, @@ -3109,8 +3093,23 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() }, }, { - name: "workflow continued as new", + name: "workflow continued as new with runID", finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW, + applyRunID: func(tv *testvars.TestVars, runID string) *testvars.TestVars { return tv.WithRunID(runID) }, + command: func(tv *testvars.TestVars) *commandpb.Command { + return &commandpb.Command{ + CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + }}, + } + }, + }, + { + name: "workflow continued as new without runID", + finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, // This is the status of new run because update doesn't go to particular runID. + applyRunID: func(tv *testvars.TestVars, _ string) *testvars.TestVars { return tv }, command: func(tv *testvars.TestVars) *commandpb.Command { return &commandpb.Command{ CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, @@ -3124,6 +3123,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() { name: "workflow failed", finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, + applyRunID: func(tv *testvars.TestVars, _ string) *testvars.TestVars { return tv }, command: func(tv *testvars.TestVars) *commandpb.Command { return &commandpb.Command{ CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION, @@ -3140,7 +3140,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() s.Run(tc.name+" "+wfCC.name, func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + runID := s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -3187,7 +3187,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() _, err := poller.PollAndProcessWorkflowTask() s.NoError(err) - updateResultCh := s.sendUpdate(testcore.NewContext(), tv) + updateResultCh := s.sendUpdate(testcore.NewContext(), wfCC.applyRunID(tv, runID)) // Complete workflow. _, err = poller.PollAndProcessWorkflowTask() @@ -3212,10 +3212,14 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() s.Nil(updateResult.response.GetOutcome().GetFailure(), tc.description) } + if expectedUpdateErr == "" && tc.updateFailure == "" { + s.Equal(runID, updateResult.response.GetUpdateRef().GetWorkflowExecution().GetRunId(), "update wasn't applied to the same run as was started") + } + // Check that update didn't block workflow completion. descResp, err := s.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{ Namespace: s.Namespace().String(), - Execution: tv.WorkflowExecution(), + Execution: wfCC.applyRunID(tv, runID).WorkflowExecution(), }) s.NoError(err) s.Equal(wfCC.finalStatus, descResp.WorkflowExecutionInfo.Status) @@ -3230,7 +3234,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbeat() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) // Drain first WT. _, err := s.TaskPoller.PollAndHandleWorkflowTask(tv, taskpoller.DrainWorkflowTask) @@ -3304,7 +3308,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_Heartbe func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTask_LostUpdate() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -3412,7 +3416,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTas func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StartedSpeculativeWorkflowTask_LostUpdate() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -3534,7 +3538,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StartedSpeculativeWorkflowTask_ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalWorkflowTask_UpdateResurrectedAfterRegistryCleared() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -3643,7 +3647,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_FirstNormalWorkflowTask_UpdateR func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTask_DeduplicateID() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -3736,7 +3740,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTas func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StartedSpeculativeWorkflowTask_DeduplicateID() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) var updateResultCh2 <-chan *workflowservice.UpdateWorkflowExecutionResponse @@ -3846,7 +3850,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_CompletedSpeculativeWorkflowTas s.Run(tc.Name, func() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -3987,7 +3991,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StaleSpeculativeWorkflowTask_Fa */ tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -4131,7 +4135,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StaleSpeculativeWorkflowTask_Fa The second speculative WT respond back, server accept it. */ tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -4252,7 +4256,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StaleSpeculativeWorkflowTask_Fa */ tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) tv1 := tv.WithUpdateIDNumber(1).WithMessageIDNumber(1) tv2 := tv.WithUpdateIDNumber(2).WithMessageIDNumber(2) @@ -4369,7 +4373,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_StaleSpeculativeWorkflowTask_Fa func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_WorkerSkippedProcessing_RejectByServer() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) tv1 := tv.WithUpdateIDNumber(1).WithMessageIDNumber(1) tv2 := tv.WithUpdateIDNumber(2).WithMessageIDNumber(2) @@ -4490,7 +4494,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_WorkerS func (s *UpdateWorkflowSuite) TestUpdateWorkflow_LastWorkflowTask_HasUpdateMessage() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) poller := &testcore.TaskPoller{ Client: s.FrontendClient(), @@ -4535,7 +4539,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_LastWorkflowTask_HasUpdateMessa func (s *UpdateWorkflowSuite) TestUpdateWorkflow_SpeculativeWorkflowTask_QueryFailureClearsWFContext() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) wtHandlerCalls := 0 wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { @@ -4684,7 +4688,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_UpdatesAreSentToWorkerInOrderOf nUpdates := 10 tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) for i := 0; i < nUpdates; i++ { // Sequentially send updates one by one. s.sendUpdateNoError(tv.WithUpdateIDNumber(i)) @@ -4745,7 +4749,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_UpdatesAreSentToWorkerInOrderOf func (s *UpdateWorkflowSuite) TestUpdateWorkflow_WaitAccepted_GotCompleted() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) + s.startWorkflow(tv) poller := &testcore.TaskPoller{ Client: s.FrontendClient(), @@ -4783,9 +4787,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWorkflow_WaitAccepted_GotCompleted() { func (s *UpdateWorkflowSuite) TestUpdateWorkflow_ContinueAsNew_UpdateIsNotCarriedOver() { tv := testvars.New(s.T()) - tv = s.startWorkflow(tv) - firstRunID := tv.RunID() - tv = tv.WithRunID("") + firstRunID := s.startWorkflow(tv) tv1 := tv.WithUpdateIDNumber(1).WithMessageIDNumber(1) tv2 := tv.WithUpdateIDNumber(2).WithMessageIDNumber(2)