diff --git a/service/history/ndc/transaction_manager_new_workflow.go b/service/history/ndc/transaction_manager_new_workflow.go index 1a44fde41d7..02e79964c7f 100644 --- a/service/history/ndc/transaction_manager_new_workflow.go +++ b/service/history/ndc/transaction_manager_new_workflow.go @@ -220,19 +220,40 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie( currentWorkflow.GetReleaseFn()(nil) currentWorkflow = nil - targetWorkflowSnapshot, targetWorkflowEventsSeq, err := targetWorkflow.GetMutableState().CloseTransactionAsSnapshot( + ms := targetWorkflow.GetMutableState() + + eventReapplyCandidates := ms.GetReapplyCandidateEvents() + targetWorkflowSnapshot, targetWorkflowEventsSeq, err := ms.CloseTransactionAsSnapshot( targetWorkflowPolicy, ) if err != nil { return err } - if err := targetWorkflow.GetContext().ReapplyEvents( - ctx, - r.shardContext, - targetWorkflowEventsSeq, - ); err != nil { - return err + if len(targetWorkflowEventsSeq) != 0 { + if err := targetWorkflow.GetContext().ReapplyEvents( + ctx, + r.shardContext, + targetWorkflowEventsSeq, + ); err != nil { + return err + } + } else if len(eventReapplyCandidates) != 0 { + eventsToApply := []*persistence.WorkflowEvents{ + { + NamespaceID: ms.GetExecutionInfo().NamespaceId, + WorkflowID: ms.GetExecutionInfo().WorkflowId, + RunID: ms.GetExecutionState().RunId, + Events: eventReapplyCandidates, + }, + } + if err := targetWorkflow.GetContext().ReapplyEvents( + ctx, + r.shardContext, + eventsToApply, + ); err != nil { + return err + } } // target workflow is in zombie state, no need to update current record. @@ -245,7 +266,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie( createMode, prevRunID, prevLastWriteVersion, - targetWorkflow.GetMutableState(), + ms, targetWorkflowSnapshot, targetWorkflowEventsSeq, ) diff --git a/service/history/ndc/transaction_manager_new_workflow_test.go b/service/history/ndc/transaction_manager_new_workflow_test.go index 89341136a73..7e8097d29f8 100644 --- a/service/history/ndc/transaction_manager_new_workflow_test.go +++ b/service/history/ndc/transaction_manager_new_workflow_test.go @@ -271,6 +271,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return( targetWorkflowSnapshot, targetWorkflowEventsSeq, nil, ) + targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil) s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil) s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil) @@ -296,6 +297,85 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ s.True(currentReleaseCalled) } +func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_ReapplyCandidates() { + ctx := context.Background() + + namespaceID := namespace.ID("some random namespace ID") + workflowID := "some random workflow ID" + targetRunID := "some random run ID" + currentRunID := "other random runID" + + targetReleaseCalled := false + currentReleaseCalled := false + + targetWorkflow := NewMockWorkflow(s.controller) + targetContext := workflow.NewMockContext(s.controller) + targetMutableState := workflow.NewMockMutableState(s.controller) + var targetReleaseFn wcache.ReleaseCacheFunc = func(error) { targetReleaseCalled = true } + targetWorkflow.EXPECT().GetContext().Return(targetContext).AnyTimes() + targetWorkflow.EXPECT().GetMutableState().Return(targetMutableState).AnyTimes() + targetWorkflow.EXPECT().GetReleaseFn().Return(targetReleaseFn).AnyTimes() + + currentWorkflow := NewMockWorkflow(s.controller) + var currentReleaseFn wcache.ReleaseCacheFunc = func(error) { currentReleaseCalled = true } + currentWorkflow.EXPECT().GetReleaseFn().Return(currentReleaseFn).AnyTimes() + + targetWorkflowSnapshot := &persistence.WorkflowSnapshot{ + ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ + NamespaceId: namespaceID.String(), + WorkflowId: workflowID, + }, + } + targetWorkflowEventsSeq := []*persistence.WorkflowEvents{} + + targetMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + NamespaceId: namespaceID.String(), + WorkflowId: workflowID, + }).AnyTimes() + targetMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ + RunId: targetRunID, + }).AnyTimes() + targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return( + targetWorkflowSnapshot, targetWorkflowEventsSeq, nil, + ) + + eventReapplyCandidates := []*historypb.HistoryEvent{{ + EventId: common.FirstEventID + rand.Int63(), + }} + eventsToApply := []*persistence.WorkflowEvents{ + { + NamespaceID: namespaceID.String(), + WorkflowID: workflowID, + RunID: targetRunID, + Events: eventReapplyCandidates, + }, + } + targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(eventReapplyCandidates) + + s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil) + s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil) + + targetWorkflow.EXPECT().HappensAfter(currentWorkflow).Return(false, nil) + targetWorkflow.EXPECT().SuppressBy(currentWorkflow).Return(workflow.TransactionPolicyPassive, nil) + + targetContext.EXPECT().CreateWorkflowExecution( + gomock.Any(), + s.mockShard, + persistence.CreateWorkflowModeBypassCurrent, + "", + int64(0), + targetMutableState, + targetWorkflowSnapshot, + targetWorkflowEventsSeq, + ).Return(nil) + targetContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, eventsToApply).Return(nil) + + err := s.createMgr.dispatchForNewWorkflow(ctx, targetWorkflow) + s.NoError(err) + s.True(targetReleaseCalled) + s.True(currentReleaseCalled) +} + func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_Dedup() { ctx := context.Background() @@ -340,6 +420,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return( targetWorkflowSnapshot, targetWorkflowEventsSeq, nil, ) + targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil) s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil) s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil) diff --git a/service/history/ndc/workflow_state_replicator.go b/service/history/ndc/workflow_state_replicator.go index 6024789698b..53a30744c48 100644 --- a/service/history/ndc/workflow_state_replicator.go +++ b/service/history/ndc/workflow_state_replicator.go @@ -811,6 +811,13 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch( if err != nil { return err } + events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory) + if err != nil { + return err + } + for _, event := range events { + localMutableState.AddReapplyCandidateEvent(event) + } _, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{ ShardID: r.shardContext.GetShardID(), IsNewBranch: isNewBranch, @@ -830,6 +837,7 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch( } prevTxnID = txnID isNewBranch = false + localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data)) } return nil @@ -856,6 +864,9 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch( if err != nil { return err } + for _, event := range events { + localMutableState.AddReapplyCandidateEvent(event) + } _, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{ ShardID: r.shardContext.GetShardID(), IsNewBranch: isNewBranch, @@ -936,6 +947,24 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist( return err } + ns, err := r.namespaceRegistry.GetNamespaceByID(namespaceID) + if err != nil { + return err + } + + mutableState, err := workflow.NewSanitizedMutableState( + r.shardContext, + r.shardContext.GetEventsCache(), + r.logger, + ns, + sourceMutableState, + common.EmptyEventTaskID, // will be updated below + lastEventItem.GetVersion(), + ) + if err != nil { + return err + } + lastFirstTxnID, err := r.backfillHistory( ctx, sourceCluster, @@ -946,36 +975,23 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist( // Use the history tree id to be the original run id. // https://github.com/temporalio/temporal/issues/6501 branchInfo.GetTreeId(), + mutableState, lastEventItem.GetEventId(), lastEventItem.GetVersion(), newHistoryBranchToken, + isStateBased, ) if err != nil { return err } - ns, err := r.namespaceRegistry.GetNamespaceByID(namespaceID) - if err != nil { - return err - } - - mutableState, err := workflow.NewSanitizedMutableState( - r.shardContext, - r.shardContext.GetEventsCache(), - r.logger, - ns, - sourceMutableState, - lastFirstTxnID, - lastEventItem.GetVersion(), - ) - if err != nil { - return err - } + mutableState.GetExecutionInfo().LastFirstEventTxnId = lastFirstTxnID err = mutableState.SetCurrentBranchToken(newHistoryBranchToken) if err != nil { return err } + if newRunInfo != nil { err = r.createNewRunWorkflow( ctx, @@ -1073,9 +1089,11 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory( workflowID string, runID string, originalRunID string, + mutableState *workflow.MutableStateImpl, lastEventID int64, lastEventVersion int64, branchToken []byte, + isStateBased bool, ) (taskID int64, retError error) { if runID != originalRunID { @@ -1153,6 +1171,18 @@ BackfillLoop: return common.EmptyEventTaskID, err } + if isStateBased { + // If backfill suceeds but later event reapply fails, during task's next retry, + // we still need to reapply events that have been stored in local DB. + events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory) + if err != nil { + return common.EmptyEventTaskID, err + } + for _, event := range events { + mutableState.AddReapplyCandidateEvent(event) + } + } + if historyBlob.nodeID <= lastBatchNodeID { // The history batch already in DB. if len(sortedAncestors) > sortedAncestorsIdx { @@ -1206,6 +1236,7 @@ BackfillLoop: if err != nil { return common.EmptyEventTaskID, err } + _, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{ ShardID: r.shardContext.GetShardID(), IsNewBranch: prevBranchID != branchID, diff --git a/service/history/ndc/workflow_state_replicator_test.go b/service/history/ndc/workflow_state_replicator_test.go index 72ae60c1f98..597ef67eed0 100644 --- a/service/history/ndc/workflow_state_replicator_test.go +++ b/service/history/ndc/workflow_state_replicator_test.go @@ -60,6 +60,7 @@ import ( "go.temporal.io/server/service/history/workflow" wcache "go.temporal.io/server/service/history/workflow/cache" "go.uber.org/mock/gomock" + "google.golang.org/protobuf/proto" ) type ( @@ -958,6 +959,19 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_MutationProv s.IsType(&serviceerrors.SyncState{}, err) } +type historyEventMatcher struct { + expected *historypb.HistoryEvent +} + +func (m *historyEventMatcher) Matches(x interface{}) bool { + evt, ok := x.(*historypb.HistoryEvent) + return ok && proto.Equal(evt, m.expected) +} + +func (m *historyEventMatcher) String() string { + return fmt.Sprintf("is equal to %v", m.expected) +} + func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_WithGapAndTailEvents() { namespaceID := uuid.New() versionHistories := &historyspb.VersionHistories{ @@ -1038,6 +1052,14 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W RunId: s.runID, }).AnyTimes() mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any()) + + allEvents := append(gapEvents, requestedEvents...) + allEvents = append(allEvents, tailEvents...) + for _, event := range allEvents { + mockMutableState.EXPECT().AddReapplyCandidateEvent(&historyEventMatcher{expected: event}). + Times(1) + } + mockWeCtx := workflow.NewMockContext(s.controller) sourceClusterName := "test-cluster" mockShard := shard.NewMockContext(s.controller) diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 09034479eb9..6ff3bad2f84 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -426,11 +426,25 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution( } } + eventsToReapply := resetWorkflowEventsSeq + if len(resetWorkflowEventsSeq) == 0 { + eventsToReapply = []*persistence.WorkflowEvents{ + { + NamespaceID: c.workflowKey.NamespaceID, + WorkflowID: c.workflowKey.WorkflowID, + RunID: c.workflowKey.RunID, + Events: resetMutableState.GetReapplyCandidateEvents(), + }, + } + } + if err := c.conflictResolveEventReapply( ctx, shardContext, conflictResolveMode, - resetWorkflowEventsSeq, + eventsToReapply, + // The new run is created by applying events so the history builder in newMutableState contains the events be re-applied. + // So we can use newWorkflowEventsSeq directly to reapply events. newWorkflowEventsSeq, // current workflow events will not participate in the events reapplication ); err != nil { @@ -613,11 +627,25 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew( return err } + eventsToReapply := updateWorkflowEventsSeq + if len(updateWorkflowEventsSeq) == 0 { + eventsToReapply = []*persistence.WorkflowEvents{ + { + NamespaceID: c.workflowKey.NamespaceID, + WorkflowID: c.workflowKey.WorkflowID, + RunID: c.workflowKey.RunID, + Events: c.MutableState.GetReapplyCandidateEvents(), + }, + } + } + if err := c.updateWorkflowExecutionEventReapply( ctx, shardContext, updateMode, - updateWorkflowEventsSeq, + eventsToReapply, + // The new run is created by applying events so the history builder in newMutableState contains the events be re-applied. + // So we can use newWorkflowEventsSeq directly to reapply events. newWorkflowEventsSeq, ); err != nil { return err @@ -862,13 +890,7 @@ func (c *ContextImpl) ReapplyEvents( for _, e := range events.Events { event := e - switch event.GetEventType() { - case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED: - + if shouldReapplyEvent(shardContext.StateMachineRegistry(), event) { reapplyEvents = append(reapplyEvents, event) } } diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index 6b0ccbc3f06..ee518067fe2 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -447,5 +447,8 @@ type ( // If there is a pending workflow task that is not started yet, it'll be rescheduled after // transition start. StartDeploymentTransition(deployment *deploymentpb.Deployment) error + + AddReapplyCandidateEvent(event *historypb.HistoryEvent) + GetReapplyCandidateEvents() []*historypb.HistoryEvent } ) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 13df963bfde..cc43e90c6a9 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -218,6 +218,11 @@ type ( activityInfosUserDataUpdated map[int64]struct{} timerInfosUserDataUpdated map[string]struct{} + // in memory fields to track potential reapply events that needs to be reapplied during workflow update + // should only be used in the state based replication as state based replication does not have + // event inside history builder. This is only for x-run reapply (from zombie wf to current wf) + reapplyEventsCandidate []*historypb.HistoryEvent + InsertTasks map[tasks.Category][]tasks.Task speculativeWorkflowTaskTimeoutTask *tasks.WorkflowTaskTimeoutTask @@ -309,6 +314,7 @@ func NewMutableState( updateInfoUpdated: make(map[string]struct{}), timerInfosUserDataUpdated: make(map[string]struct{}), activityInfosUserDataUpdated: make(map[int64]struct{}), + reapplyEventsCandidate: []*historypb.HistoryEvent{}, QueryRegistry: NewQueryRegistry(), @@ -6133,6 +6139,7 @@ func (ms *MutableStateImpl) cleanupTransaction() error { ms.updateInfoUpdated = make(map[string]struct{}) ms.timerInfosUserDataUpdated = make(map[string]struct{}) ms.activityInfosUserDataUpdated = make(map[int64]struct{}) + ms.reapplyEventsCandidate = nil ms.stateInDB = ms.executionState.State ms.nextEventIDInDB = ms.GetNextEventID() @@ -7401,3 +7408,13 @@ func (ms *MutableStateImpl) reschedulePendingActivities() error { return nil } + +func (ms *MutableStateImpl) AddReapplyCandidateEvent(event *historypb.HistoryEvent) { + if shouldReapplyEvent(ms.shard.StateMachineRegistry(), event) { + ms.reapplyEventsCandidate = append(ms.reapplyEventsCandidate, event) + } +} + +func (ms *MutableStateImpl) GetReapplyCandidateEvents() []*historypb.HistoryEvent { + return ms.reapplyEventsCandidate +} diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 371fcf61cd2..e823c56083f 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -403,6 +403,18 @@ func (mr *MockMutableStateMockRecorder) AddHistorySize(size any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddHistorySize", reflect.TypeOf((*MockMutableState)(nil).AddHistorySize), size) } +// AddReapplyCandidateEvent mocks base method. +func (m *MockMutableState) AddReapplyCandidateEvent(event *history.HistoryEvent) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddReapplyCandidateEvent", event) +} + +// AddReapplyCandidateEvent indicates an expected call of AddReapplyCandidateEvent. +func (mr *MockMutableStateMockRecorder) AddReapplyCandidateEvent(event any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddReapplyCandidateEvent", reflect.TypeOf((*MockMutableState)(nil).AddReapplyCandidateEvent), event) +} + // AddRecordMarkerEvent mocks base method. func (m *MockMutableState) AddRecordMarkerEvent(arg0 int64, arg1 *command.RecordMarkerCommandAttributes) (*history.HistoryEvent, error) { m.ctrl.T.Helper() @@ -2292,6 +2304,20 @@ func (mr *MockMutableStateMockRecorder) GetQueryRegistry() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueryRegistry", reflect.TypeOf((*MockMutableState)(nil).GetQueryRegistry)) } +// GetReapplyCandidateEvents mocks base method. +func (m *MockMutableState) GetReapplyCandidateEvents() []*history.HistoryEvent { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetReapplyCandidateEvents") + ret0, _ := ret[0].([]*history.HistoryEvent) + return ret0 +} + +// GetReapplyCandidateEvents indicates an expected call of GetReapplyCandidateEvents. +func (mr *MockMutableStateMockRecorder) GetReapplyCandidateEvents() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReapplyCandidateEvents", reflect.TypeOf((*MockMutableState)(nil).GetReapplyCandidateEvents)) +} + // GetRequestCancelInfo mocks base method. func (m *MockMutableState) GetRequestCancelInfo(arg0 int64) (*persistence.RequestCancelInfo, bool) { m.ctrl.T.Helper() diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index 0b3d49a9884..901e762a3f5 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -36,6 +36,7 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/internal/effect" "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/hsm" ) func failWorkflowTask( @@ -236,3 +237,22 @@ func GetEffectiveVersioningBehavior(versioningInfo *workflowpb.WorkflowExecution } return versioningInfo.GetBehavior() } + +// shouldReapplyEvent returns true if the event should be reapplied to the workflow execution. +func shouldReapplyEvent(stateMachineRegistry *hsm.Registry, event *historypb.HistoryEvent) bool { + switch event.GetEventType() { // nolint:exhaustive + case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED: + return true + } + + // events registered in the hsm framework that are potentially cherry-pickable + if _, ok := stateMachineRegistry.EventDefinition(event.GetEventType()); ok { + return true + } + + return false +}