Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add api at mutable state to track potential reapply events #7001

Merged
merged 10 commits into from
Jan 11, 2025
37 changes: 29 additions & 8 deletions service/history/ndc/transaction_manager_new_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,40 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
currentWorkflow.GetReleaseFn()(nil)
currentWorkflow = nil

targetWorkflowSnapshot, targetWorkflowEventsSeq, err := targetWorkflow.GetMutableState().CloseTransactionAsSnapshot(
ms := targetWorkflow.GetMutableState()

eventReapplyCandidates := ms.GetReapplyCandidateEvents()
targetWorkflowSnapshot, targetWorkflowEventsSeq, err := ms.CloseTransactionAsSnapshot(
targetWorkflowPolicy,
)
if err != nil {
return err
}

if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
targetWorkflowEventsSeq,
); err != nil {
return err
if len(targetWorkflowEventsSeq) != 0 {
if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
targetWorkflowEventsSeq,
); err != nil {
return err
}
} else if len(eventReapplyCandidates) != 0 {
eventsToApply := []*persistence.WorkflowEvents{
{
NamespaceID: ms.GetExecutionInfo().NamespaceId,
WorkflowID: ms.GetExecutionInfo().WorkflowId,
RunID: ms.GetExecutionState().RunId,
Events: eventReapplyCandidates,
},
}
if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
eventsToApply,
); err != nil {
return err
}
}

// target workflow is in zombie state, no need to update current record.
Expand All @@ -245,7 +266,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
createMode,
prevRunID,
prevLastWriteVersion,
targetWorkflow.GetMutableState(),
ms,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
)
Expand Down
81 changes: 81 additions & 0 deletions service/history/ndc/transaction_manager_new_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return(
targetWorkflowSnapshot, targetWorkflowEventsSeq, nil,
)
targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil)

s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)
Expand All @@ -296,6 +297,85 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
s.True(currentReleaseCalled)
}

// TestDispatchForNewWorkflow_CreateAsZombie_ReapplyCandidates exercises dispatchForNewWorkflow
// when the target workflow does not happen after the current one and is created with
// CreateWorkflowModeBypassCurrent. Closing the transaction returns an empty event sequence while
// the mutable state reports reapply candidate events, so the test expects those candidates to be
// passed to ReapplyEvents wrapped in a single WorkflowEvents entry for the target run.
func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_ReapplyCandidates() {
	var (
		ctx        = context.Background()
		nsID       = namespace.ID("some random namespace ID")
		wfID       = "some random workflow ID"
		targetRun  = "some random run ID"
		currentRun = "other random runID"
	)

	// Flags flipped by the release functions so the test can assert both were invoked.
	targetReleased, currentReleased := false, false

	// Target workflow and its collaborators.
	targetWF := NewMockWorkflow(s.controller)
	targetCtx := workflow.NewMockContext(s.controller)
	targetMS := workflow.NewMockMutableState(s.controller)
	targetWF.EXPECT().GetContext().Return(targetCtx).AnyTimes()
	targetWF.EXPECT().GetMutableState().Return(targetMS).AnyTimes()
	targetWF.EXPECT().GetReleaseFn().Return(
		wcache.ReleaseCacheFunc(func(error) { targetReleased = true }),
	).AnyTimes()

	// Current workflow only needs to hand out its release function.
	currentWF := NewMockWorkflow(s.controller)
	currentWF.EXPECT().GetReleaseFn().Return(
		wcache.ReleaseCacheFunc(func(error) { currentReleased = true }),
	).AnyTimes()

	// Closing the transaction yields a snapshot but no event batches.
	snapshot := &persistence.WorkflowSnapshot{
		ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
			NamespaceId: nsID.String(),
			WorkflowId:  wfID,
		},
	}
	closedEventsSeq := []*persistence.WorkflowEvents{}

	targetMS.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
		NamespaceId: nsID.String(),
		WorkflowId:  wfID,
	}).AnyTimes()
	targetMS.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
		RunId: targetRun,
	}).AnyTimes()
	targetMS.EXPECT().
		CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).
		Return(snapshot, closedEventsSeq, nil)

	// The mutable state reports one reapply candidate; it must be reapplied as a single batch.
	candidates := []*historypb.HistoryEvent{
		{EventId: common.FirstEventID + rand.Int63()},
	}
	expectedReapply := []*persistence.WorkflowEvents{{
		NamespaceID: nsID.String(),
		WorkflowID:  wfID,
		RunID:       targetRun,
		Events:      candidates,
	}}
	targetMS.EXPECT().GetReapplyCandidateEvents().Return(candidates)

	s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, nsID, wfID).Return(currentRun, nil)
	s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, nsID, wfID, currentRun).Return(currentWF, nil)

	targetWF.EXPECT().HappensAfter(currentWF).Return(false, nil)
	targetWF.EXPECT().SuppressBy(currentWF).Return(workflow.TransactionPolicyPassive, nil)

	targetCtx.EXPECT().CreateWorkflowExecution(
		gomock.Any(),
		s.mockShard,
		persistence.CreateWorkflowModeBypassCurrent,
		"",
		int64(0),
		targetMS,
		snapshot,
		closedEventsSeq,
	).Return(nil)
	targetCtx.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, expectedReapply).Return(nil)

	dispatchErr := s.createMgr.dispatchForNewWorkflow(ctx, targetWF)
	s.NoError(dispatchErr)
	s.True(targetReleased)
	s.True(currentReleased)
}

func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_Dedup() {
ctx := context.Background()

Expand Down Expand Up @@ -340,6 +420,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return(
targetWorkflowSnapshot, targetWorkflowEventsSeq, nil,
)
targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil)

s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)
Expand Down
65 changes: 48 additions & 17 deletions service/history/ndc/workflow_state_replicator.go
yycptt marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,13 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
if err != nil {
return err
}
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
if err != nil {
return err
}
for _, event := range events {
localMutableState.AddReapplyCandidateEvent(event)
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: isNewBranch,
Expand All @@ -830,6 +837,7 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
}
prevTxnID = txnID
isNewBranch = false

localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data))
}
return nil
Expand All @@ -856,6 +864,9 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
if err != nil {
return err
}
for _, event := range events {
localMutableState.AddReapplyCandidateEvent(event)
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: isNewBranch,
Expand Down Expand Up @@ -936,6 +947,24 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
return err
}

ns, err := r.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return err
}

mutableState, err := workflow.NewSanitizedMutableState(
r.shardContext,
r.shardContext.GetEventsCache(),
r.logger,
ns,
sourceMutableState,
common.EmptyEventTaskID, // will be updated below
lastEventItem.GetVersion(),
)
if err != nil {
return err
}

lastFirstTxnID, err := r.backfillHistory(
ctx,
sourceCluster,
Expand All @@ -946,36 +975,23 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
// Use the history tree id to be the original run id.
// https://github.com/temporalio/temporal/issues/6501
branchInfo.GetTreeId(),
mutableState,
lastEventItem.GetEventId(),
lastEventItem.GetVersion(),
newHistoryBranchToken,
isStateBased,
)
if err != nil {
return err
}

ns, err := r.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return err
}

mutableState, err := workflow.NewSanitizedMutableState(
r.shardContext,
r.shardContext.GetEventsCache(),
r.logger,
ns,
sourceMutableState,
lastFirstTxnID,
lastEventItem.GetVersion(),
)
if err != nil {
return err
}
mutableState.GetExecutionInfo().LastFirstEventTxnId = lastFirstTxnID

err = mutableState.SetCurrentBranchToken(newHistoryBranchToken)
if err != nil {
return err
}

if newRunInfo != nil {
err = r.createNewRunWorkflow(
ctx,
Expand Down Expand Up @@ -1073,9 +1089,11 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory(
workflowID string,
runID string,
originalRunID string,
mutableState *workflow.MutableStateImpl,
lastEventID int64,
lastEventVersion int64,
branchToken []byte,
isStateBased bool,
) (taskID int64, retError error) {

if runID != originalRunID {
Expand Down Expand Up @@ -1153,6 +1171,18 @@ BackfillLoop:
return common.EmptyEventTaskID, err
}

if isStateBased {
// If backfill succeeds but the later event reapply fails, then on the task's next retry
// we still need to reapply the events that have already been stored in the local DB.
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
if err != nil {
return common.EmptyEventTaskID, err
}
for _, event := range events {
mutableState.AddReapplyCandidateEvent(event)
}
}

if historyBlob.nodeID <= lastBatchNodeID {
// The history batch already in DB.
if len(sortedAncestors) > sortedAncestorsIdx {
Expand Down Expand Up @@ -1206,6 +1236,7 @@ BackfillLoop:
if err != nil {
return common.EmptyEventTaskID, err
}

_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: prevBranchID != branchID,
Expand Down
22 changes: 22 additions & 0 deletions service/history/ndc/workflow_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/proto"
)

type (
Expand Down Expand Up @@ -958,6 +959,19 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_MutationProv
s.IsType(&serviceerrors.SyncState{}, err)
}

// historyEventMatcher is an argument matcher for mock expectations: it compares a
// *historypb.HistoryEvent argument against a fixed expected event (see Matches).
type historyEventMatcher struct {
// expected is the event an argument must be proto-equal to in order to match.
expected *historypb.HistoryEvent
}

// Matches reports whether x is a *historypb.HistoryEvent that is proto.Equal to the
// expected event. Any other dynamic type yields false.
func (m *historyEventMatcher) Matches(x interface{}) bool {
	actual, isEvent := x.(*historypb.HistoryEvent)
	if !isEvent {
		return false
	}
	return proto.Equal(actual, m.expected)
}

// String returns a human-readable description of the matcher, embedding the expected
// event formatted with %v.
func (m *historyEventMatcher) String() string {
	want := m.expected
	return fmt.Sprintf("is equal to %v", want)
}

func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_WithGapAndTailEvents() {
namespaceID := uuid.New()
versionHistories := &historyspb.VersionHistories{
Expand Down Expand Up @@ -1038,6 +1052,14 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W
RunId: s.runID,
}).AnyTimes()
mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any())

allEvents := append(gapEvents, requestedEvents...)
allEvents = append(allEvents, tailEvents...)
for _, event := range allEvents {
mockMutableState.EXPECT().AddReapplyCandidateEvent(&historyEventMatcher{expected: event}).
Times(1)
}

mockWeCtx := workflow.NewMockContext(s.controller)
sourceClusterName := "test-cluster"
mockShard := shard.NewMockContext(s.controller)
Expand Down
Loading
Loading