Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add api at mutable state to track potential reapply events #7001

Merged
merged 10 commits into from
Jan 11, 2025
37 changes: 29 additions & 8 deletions service/history/ndc/transaction_manager_new_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,40 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
currentWorkflow.GetReleaseFn()(nil)
currentWorkflow = nil

targetWorkflowSnapshot, targetWorkflowEventsSeq, err := targetWorkflow.GetMutableState().CloseTransactionAsSnapshot(
ms := targetWorkflow.GetMutableState()

eventReapplyCandidates := ms.GetReapplyCandidateEvents()
targetWorkflowSnapshot, targetWorkflowEventsSeq, err := ms.CloseTransactionAsSnapshot(
targetWorkflowPolicy,
)
if err != nil {
return err
}

if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
targetWorkflowEventsSeq,
); err != nil {
return err
if len(targetWorkflowEventsSeq) != 0 {
if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
targetWorkflowEventsSeq,
); err != nil {
return err
}
} else if len(eventReapplyCandidates) != 0 {
eventsToApply := []*persistence.WorkflowEvents{
{
NamespaceID: ms.GetExecutionInfo().NamespaceId,
WorkflowID: ms.GetExecutionInfo().WorkflowId,
RunID: ms.GetExecutionState().RunId,
Events: eventReapplyCandidates,
},
}
if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
eventsToApply,
); err != nil {
return err
}
}

// target workflow is in zombie state, no need to update current record.
Expand All @@ -245,7 +266,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
createMode,
prevRunID,
prevLastWriteVersion,
targetWorkflow.GetMutableState(),
ms,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return(
targetWorkflowSnapshot, targetWorkflowEventsSeq, nil,
)
targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil)

s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)
Expand Down Expand Up @@ -340,6 +341,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return(
targetWorkflowSnapshot, targetWorkflowEventsSeq, nil,
)
targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil)

s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)
Expand Down
65 changes: 48 additions & 17 deletions service/history/ndc/workflow_state_replicator.go
yycptt marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,13 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
if err != nil {
return err
}
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
if err != nil {
return err
}
for _, event := range events {
localMutableState.AddReapplyCandidateEvent(event)
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: isNewBranch,
Expand All @@ -830,6 +837,7 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
}
prevTxnID = txnID
isNewBranch = false

localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data))
}
return nil
Expand All @@ -856,6 +864,9 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
if err != nil {
return err
}
for _, event := range events {
localMutableState.AddReapplyCandidateEvent(event)
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: isNewBranch,
Expand Down Expand Up @@ -936,6 +947,24 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
return err
}

ns, err := r.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return err
}

mutableState, err := workflow.NewSanitizedMutableState(
r.shardContext,
r.shardContext.GetEventsCache(),
r.logger,
ns,
sourceMutableState,
common.EmptyEventTaskID, // will be updated below
lastEventItem.GetVersion(),
)
if err != nil {
return err
}

lastFirstTxnID, err := r.backfillHistory(
ctx,
sourceCluster,
Expand All @@ -946,36 +975,23 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
// Use the history tree id to be the original run id.
// https://github.com/temporalio/temporal/issues/6501
branchInfo.GetTreeId(),
mutableState,
lastEventItem.GetEventId(),
lastEventItem.GetVersion(),
newHistoryBranchToken,
isStateBased,
)
if err != nil {
return err
}

ns, err := r.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return err
}

mutableState, err := workflow.NewSanitizedMutableState(
r.shardContext,
r.shardContext.GetEventsCache(),
r.logger,
ns,
sourceMutableState,
lastFirstTxnID,
lastEventItem.GetVersion(),
)
if err != nil {
return err
}
mutableState.GetExecutionInfo().LastFirstEventTxnId = lastFirstTxnID

err = mutableState.SetCurrentBranchToken(newHistoryBranchToken)
if err != nil {
return err
}

if newRunInfo != nil {
err = r.createNewRunWorkflow(
ctx,
Expand Down Expand Up @@ -1073,9 +1089,11 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory(
workflowID string,
runID string,
originalRunID string,
mutableState *workflow.MutableStateImpl,
lastEventID int64,
lastEventVersion int64,
branchToken []byte,
isStateBased bool,
) (taskID int64, retError error) {

if runID != originalRunID {
Expand Down Expand Up @@ -1153,6 +1171,18 @@ BackfillLoop:
return common.EmptyEventTaskID, err
}

if isStateBased {
// If the backfill succeeds but the later event reapply fails, then on the task's next retry
// we still need to reapply the events that have already been stored in the local DB.
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
if err != nil {
return common.EmptyEventTaskID, err
}
for _, event := range events {
mutableState.AddReapplyCandidateEvent(event)
}
}

if historyBlob.nodeID <= lastBatchNodeID {
// The history batch already in DB.
if len(sortedAncestors) > sortedAncestorsIdx {
Expand Down Expand Up @@ -1206,6 +1236,7 @@ BackfillLoop:
if err != nil {
return common.EmptyEventTaskID, err
}

_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: prevBranchID != branchID,
Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/workflow_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W
RunId: s.runID,
}).AnyTimes()
mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any())
mockMutableState.EXPECT().AddReapplyCandidateEvent(gomock.Any()).AnyTimes()
mockWeCtx := workflow.NewMockContext(s.controller)
sourceClusterName := "test-cluster"
mockShard := shard.NewMockContext(s.controller)
Expand Down
40 changes: 31 additions & 9 deletions service/history/workflow/context.go
yycptt marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,25 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
}
}

eventsToReapply := resetWorkflowEventsSeq
if len(resetWorkflowEventsSeq) == 0 {
eventsToReapply = []*persistence.WorkflowEvents{
{
NamespaceID: c.workflowKey.NamespaceID,
WorkflowID: c.workflowKey.WorkflowID,
RunID: c.workflowKey.RunID,
Events: resetMutableState.GetReapplyCandidateEvents(),
},
}
}

if err := c.conflictResolveEventReapply(
ctx,
shardContext,
conflictResolveMode,
resetWorkflowEventsSeq,
eventsToReapply,
// The new run is created by applying events, so the history builder in newMutableState contains the events to be re-applied.
// We can therefore use newWorkflowEventsSeq directly to reapply events.
newWorkflowEventsSeq,
// current workflow events will not participate in the events reapplication
); err != nil {
Expand Down Expand Up @@ -613,11 +627,25 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
return err
}

eventsToReapply := updateWorkflowEventsSeq
if len(updateWorkflowEventsSeq) == 0 {
eventsToReapply = []*persistence.WorkflowEvents{
{
NamespaceID: c.workflowKey.NamespaceID,
WorkflowID: c.workflowKey.WorkflowID,
RunID: c.workflowKey.RunID,
Events: c.MutableState.GetReapplyCandidateEvents(),
},
}
}

if err := c.updateWorkflowExecutionEventReapply(
ctx,
shardContext,
updateMode,
updateWorkflowEventsSeq,
eventsToReapply,
// The new run is created by applying events, so the history builder in newMutableState contains the events to be re-applied.
// We can therefore use newWorkflowEventsSeq directly to reapply events.
newWorkflowEventsSeq,
yycptt marked this conversation as resolved.
Show resolved Hide resolved
); err != nil {
return err
Expand Down Expand Up @@ -862,13 +890,7 @@ func (c *ContextImpl) ReapplyEvents(

for _, e := range events.Events {
event := e
switch event.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:

if shouldReapplyEvent(shardContext.StateMachineRegistry(), event) {
reapplyEvents = append(reapplyEvents, event)
}
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,5 +450,8 @@ type (
// If there is a pending workflow task that is not started yet, it'll be rescheduled after
// transition start.
StartDeploymentTransition(deployment *deploymentpb.Deployment) error

AddReapplyCandidateEvent(event *historypb.HistoryEvent)
GetReapplyCandidateEvents() []*historypb.HistoryEvent
}
)
17 changes: 17 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ type (
activityInfosUserDataUpdated map[int64]struct{}
timerInfosUserDataUpdated map[string]struct{}

// In-memory field tracking candidate events that may need to be reapplied during a workflow update.
// It should only be used in state-based replication, because state-based replication does not have
// events inside the history builder. This is only for cross-run reapply (from zombie workflow to current workflow).
reapplyEventsCandidate []*historypb.HistoryEvent
yycptt marked this conversation as resolved.
Show resolved Hide resolved

InsertTasks map[tasks.Category][]tasks.Task

speculativeWorkflowTaskTimeoutTask *tasks.WorkflowTaskTimeoutTask
Expand Down Expand Up @@ -309,6 +314,7 @@ func NewMutableState(
updateInfoUpdated: make(map[string]struct{}),
timerInfosUserDataUpdated: make(map[string]struct{}),
activityInfosUserDataUpdated: make(map[int64]struct{}),
reapplyEventsCandidate: []*historypb.HistoryEvent{},

QueryRegistry: NewQueryRegistry(),

Expand Down Expand Up @@ -6128,6 +6134,7 @@ func (ms *MutableStateImpl) cleanupTransaction() error {
ms.updateInfoUpdated = make(map[string]struct{})
ms.timerInfosUserDataUpdated = make(map[string]struct{})
ms.activityInfosUserDataUpdated = make(map[int64]struct{})
ms.reapplyEventsCandidate = nil

ms.stateInDB = ms.executionState.State
ms.nextEventIDInDB = ms.GetNextEventID()
Expand Down Expand Up @@ -7396,3 +7403,13 @@ func (ms *MutableStateImpl) reschedulePendingActivities() error {

return nil
}

// AddReapplyCandidateEvent appends event to the in-memory list of reapply
// candidates, but only when shouldReapplyEvent accepts it for the shard's
// state machine registry. Events that do not qualify are dropped silently.
func (ms *MutableStateImpl) AddReapplyCandidateEvent(event *historypb.HistoryEvent) {
	registry := ms.shard.StateMachineRegistry()
	if !shouldReapplyEvent(registry, event) {
		return
	}
	ms.reapplyEventsCandidate = append(ms.reapplyEventsCandidate, event)
}

// GetReapplyCandidateEvents returns the reapply candidate events accumulated
// so far. The internal slice is returned directly (not a copy), and the result
// may be nil or empty when no candidates have been recorded.
func (ms *MutableStateImpl) GetReapplyCandidateEvents() []*historypb.HistoryEvent {
return ms.reapplyEventsCandidate
}
26 changes: 26 additions & 0 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading