Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add api at mutable state to track potential reapply events #7001

Merged
merged 10 commits into from
Jan 11, 2025
37 changes: 29 additions & 8 deletions service/history/ndc/transaction_manager_new_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,40 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
currentWorkflow.GetReleaseFn()(nil)
currentWorkflow = nil

targetWorkflowSnapshot, targetWorkflowEventsSeq, err := targetWorkflow.GetMutableState().CloseTransactionAsSnapshot(
ms := targetWorkflow.GetMutableState()

eventReapplyCandidates := ms.GetReapplyCandidateEvents()
targetWorkflowSnapshot, targetWorkflowEventsSeq, err := ms.CloseTransactionAsSnapshot(
targetWorkflowPolicy,
)
if err != nil {
return err
}

if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
targetWorkflowEventsSeq,
); err != nil {
return err
if len(targetWorkflowEventsSeq) != 0 {
if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
targetWorkflowEventsSeq,
); err != nil {
return err
}
} else if len(eventReapplyCandidates) != 0 {
eventsToApply := []*persistence.WorkflowEvents{
{
NamespaceID: ms.GetExecutionInfo().NamespaceId,
WorkflowID: ms.GetExecutionInfo().WorkflowId,
RunID: ms.GetExecutionState().RunId,
Events: eventReapplyCandidates,
},
}
if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
eventsToApply,
); err != nil {
return err
}
}

// target workflow is in zombie state, no need to update current record.
Expand All @@ -245,7 +266,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
createMode,
prevRunID,
prevLastWriteVersion,
targetWorkflow.GetMutableState(),
ms,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return(
targetWorkflowSnapshot, targetWorkflowEventsSeq, nil,
)
targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil)

s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)
Expand Down Expand Up @@ -340,6 +341,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return(
targetWorkflowSnapshot, targetWorkflowEventsSeq, nil,
)
targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil)

s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)
Expand Down
53 changes: 42 additions & 11 deletions service/history/ndc/workflow_state_replicator.go
yycptt marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,13 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
if err != nil {
return err
}
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
if err != nil {
return err
}
for _, event := range events {
localMutableState.AddReapplyCandidateEvent(event)
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: isNewBranch,
Expand All @@ -830,6 +837,7 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
}
prevTxnID = txnID
isNewBranch = false

localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data))
}
return nil
Expand All @@ -856,6 +864,9 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
if err != nil {
return err
}
for _, event := range events {
localMutableState.AddReapplyCandidateEvent(event)
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: isNewBranch,
Expand Down Expand Up @@ -936,7 +947,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
return err
}

lastFirstTxnID, err := r.backfillHistory(
lastFirstTxnID, eventsToReapply, err := r.backfillHistory(
ctx,
sourceCluster,
namespaceID,
Expand All @@ -949,6 +960,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
lastEventItem.GetEventId(),
lastEventItem.GetVersion(),
newHistoryBranchToken,
isStateBased,
)
if err != nil {
return err
Expand Down Expand Up @@ -976,6 +988,13 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
if err != nil {
return err
}

if isStateBased {
for _, event := range eventsToReapply {
mutableState.AddReapplyCandidateEvent(event)
yycptt marked this conversation as resolved.
Show resolved Hide resolved
}
}

if newRunInfo != nil {
err = r.createNewRunWorkflow(
ctx,
Expand Down Expand Up @@ -1076,7 +1095,8 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory(
lastEventID int64,
lastEventVersion int64,
branchToken []byte,
) (taskID int64, retError error) {
isStateBased bool,
) (taskID int64, eventsToReapply []*historypb.HistoryEvent, retError error) {

if runID != originalRunID {
// At this point, it already acquired the workflow lock on the run ID.
Expand All @@ -1092,7 +1112,7 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory(
locks.PriorityLow,
)
if err != nil {
return common.EmptyEventTaskID, err
return common.EmptyEventTaskID, nil, err
}
defer func() {
if rec := recover(); rec != nil {
Expand All @@ -1119,7 +1139,7 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory(
}
case *serviceerror.NotFound:
default:
return common.EmptyEventTaskID, err
return common.EmptyEventTaskID, nil, err
}
}

Expand All @@ -1137,7 +1157,7 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory(
historyBranchUtil := r.executionMgr.GetHistoryBranchUtil()
historyBranch, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken)
if err != nil {
return common.EmptyEventTaskID, err
return common.EmptyEventTaskID, nil, err
}

prevTxnID := common.EmptyEventTaskID
Expand All @@ -1150,7 +1170,17 @@ BackfillLoop:
for remoteHistoryIterator.HasNext() {
historyBlob, err := remoteHistoryIterator.Next()
if err != nil {
return common.EmptyEventTaskID, err
return common.EmptyEventTaskID, nil, err
}

if isStateBased {
// If backfill succeeds but later event reapply fails, during task's next retry,
// we still need to reapply events that have been stored in local DB.
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
if err != nil {
return common.EmptyEventTaskID, nil, err
}
eventsToReapply = append(eventsToReapply, events...)
yycptt marked this conversation as resolved.
Show resolved Hide resolved
}

if historyBlob.nodeID <= lastBatchNodeID {
Expand Down Expand Up @@ -1180,7 +1210,7 @@ BackfillLoop:
currentAncestor = sortedAncestors[sortedAncestorsIdx]
branchID = currentAncestor.GetBranchId()
if historyBlob.nodeID < currentAncestor.GetBeginNodeId() || historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
return common.EmptyEventTaskID, serviceerror.NewInternal(
return common.EmptyEventTaskID, nil, serviceerror.NewInternal(
fmt.Sprintf("The backfill history blob node id %d is not in acestoer range [%d, %d]",
historyBlob.nodeID,
currentAncestor.GetBeginNodeId(),
Expand All @@ -1200,12 +1230,13 @@ BackfillLoop:
runID,
)
if err != nil {
return common.EmptyEventTaskID, err
return common.EmptyEventTaskID, nil, err
}
txnID, err := r.shardContext.GenerateTaskID()
if err != nil {
return common.EmptyEventTaskID, err
return common.EmptyEventTaskID, nil, err
}

_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: prevBranchID != branchID,
Expand All @@ -1221,13 +1252,13 @@ BackfillLoop:
),
})
if err != nil {
return common.EmptyEventTaskID, err
return common.EmptyEventTaskID, nil, err
}
prevTxnID = txnID
prevBranchID = branchID
}

return prevTxnID, nil
return prevTxnID, eventsToReapply, nil
}

func (r *WorkflowStateReplicatorImpl) getHistoryFromLocalPaginationFn(
Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/workflow_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W
RunId: s.runID,
}).AnyTimes()
mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any())
mockMutableState.EXPECT().AddReapplyCandidateEvent(gomock.Any()).AnyTimes()
mockWeCtx := workflow.NewMockContext(s.controller)
sourceClusterName := "test-cluster"
mockShard := shard.NewMockContext(s.controller)
Expand Down
78 changes: 59 additions & 19 deletions service/history/workflow/context.go
yycptt marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
}
}()

eventReapplyCandidates := resetMutableState.GetReapplyCandidateEvents()
var newEventsReapplyCandidates []*historypb.HistoryEvent
resetWorkflow, resetWorkflowEventsSeq, err := resetMutableState.CloseTransactionAsSnapshot(
resetWorkflowTransactionPolicy,
)
Expand All @@ -400,6 +402,7 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
}
}()

newEventsReapplyCandidates = newMutableState.GetReapplyCandidateEvents()
newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(
*newWorkflowTransactionPolicy,
)
Expand All @@ -426,15 +429,48 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
}
}

if err := c.conflictResolveEventReapply(
ctx,
shardContext,
conflictResolveMode,
resetWorkflowEventsSeq,
newWorkflowEventsSeq,
// current workflow events will not participate in the events reapplication
); err != nil {
return err
if len(resetWorkflowEventsSeq) != 0 || len(newWorkflowEventsSeq) != 0 {
yycptt marked this conversation as resolved.
Show resolved Hide resolved
if err := c.conflictResolveEventReapply(
ctx,
shardContext,
conflictResolveMode,
resetWorkflowEventsSeq,
newWorkflowEventsSeq,
// current workflow events will not participate in the events reapplication
); err != nil {
return err
}
} else if len(eventReapplyCandidates) > 0 || len(newEventsReapplyCandidates) > 0 {
eventsToApply := []*persistence.WorkflowEvents{
{
NamespaceID: c.workflowKey.NamespaceID,
WorkflowID: c.workflowKey.WorkflowID,
RunID: c.workflowKey.RunID,
Events: eventReapplyCandidates,
},
}
var newEventsToReapply []*persistence.WorkflowEvents
if len(newEventsReapplyCandidates) > 0 {
newEventsToReapply = []*persistence.WorkflowEvents{
{
NamespaceID: newContext.GetWorkflowKey().NamespaceID,
WorkflowID: newContext.GetWorkflowKey().WorkflowID,
RunID: newContext.GetWorkflowKey().RunID,
Events: newEventsReapplyCandidates,
},
}
}
if err := c.conflictResolveEventReapply(
ctx,
shardContext,
conflictResolveMode,
eventsToApply,
newEventsToReapply,
// current workflow events will not participate in the events reapplication
); err != nil {
return err
}

}

if _, _, _, err := NewTransaction(shardContext).ConflictResolveWorkflowExecution(
Expand Down Expand Up @@ -581,7 +617,6 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
c.Clear()
}
}()

updateWorkflow, updateWorkflowEventsSeq, err := c.MutableState.CloseTransactionAsMutation(
updateWorkflowTransactionPolicy,
)
Expand All @@ -597,7 +632,6 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
newContext.Clear()
}
}()

newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(
*newWorkflowTransactionPolicy,
)
Expand All @@ -613,11 +647,23 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
return err
}

eventsToReapply := updateWorkflowEventsSeq
if len(updateWorkflowEventsSeq) == 0 {
eventsToReapply = []*persistence.WorkflowEvents{
{
NamespaceID: c.workflowKey.NamespaceID,
WorkflowID: c.workflowKey.WorkflowID,
RunID: c.workflowKey.RunID,
Events: c.MutableState.GetReapplyCandidateEvents(),
},
}
}

if err := c.updateWorkflowExecutionEventReapply(
ctx,
shardContext,
updateMode,
updateWorkflowEventsSeq,
eventsToReapply,
newWorkflowEventsSeq,
yycptt marked this conversation as resolved.
Show resolved Hide resolved
); err != nil {
return err
Expand Down Expand Up @@ -862,13 +908,7 @@ func (c *ContextImpl) ReapplyEvents(

for _, e := range events.Events {
event := e
switch event.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:

if shouldReapplyEvent(shardContext.StateMachineRegistry(), event) {
reapplyEvents = append(reapplyEvents, event)
}
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,5 +450,8 @@ type (
// If there is a pending workflow task that is not started yet, it'll be rescheduled after
// transition start.
StartDeploymentTransition(deployment *deploymentpb.Deployment) error

AddReapplyCandidateEvent(event *historypb.HistoryEvent)
GetReapplyCandidateEvents() []*historypb.HistoryEvent
}
)
Loading
Loading