From f3f96bf5670d113169e9d9cff1dbd93071eefce0 Mon Sep 17 00:00:00 2001 From: ShahabT Date: Fri, 22 Nov 2024 15:11:32 -0800 Subject: [PATCH] Pass versioning info when adding Matching tasks --- service/history/api/queryworkflow/api.go | 23 ++++++-- service/history/api/updateworkflow/api.go | 35 ++++++------ service/history/ndc_standby_task_util.go | 4 ++ .../timer_queue_active_task_executor.go | 1 + .../timer_queue_standby_task_executor.go | 1 + .../transfer_queue_active_task_executor.go | 7 ++- .../transfer_queue_standby_task_executor.go | 2 + .../transfer_queue_task_executor_base.go | 5 ++ .../history/workflow/mutable_state_impl.go | 33 ++++-------- service/history/workflow/util.go | 53 +++++++++++++++++++ service/matching/forwarder.go | 9 ++-- 11 files changed, 123 insertions(+), 50 deletions(-) diff --git a/service/history/api/queryworkflow/api.go b/service/history/api/queryworkflow/api.go index 8710f1fa8d7..884ca1bbf9e 100644 --- a/service/history/api/queryworkflow/api.go +++ b/service/history/api/queryworkflow/api.go @@ -32,6 +32,7 @@ import ( querypb "go.temporal.io/api/query/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" + deploymentpb "go.temporal.io/server/api/deployment/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common" @@ -345,10 +346,11 @@ func queryDirectlyThroughMatching( } nonStickyMatchingRequest := &matchingservice.QueryWorkflowRequest{ - NamespaceId: namespaceID, - QueryRequest: queryRequest, - TaskQueue: msResp.TaskQueue, - VersionDirective: directive, + NamespaceId: namespaceID, + QueryRequest: queryRequest, + TaskQueue: msResp.TaskQueue, + VersionDirective: directive, + WorkflowVersioningInfo: GetWorkflowVersioningInfoMatchingTask(msResp), } nonStickyStartTime := time.Now().UTC() @@ -364,3 +366,16 @@ func queryDirectlyThroughMatching( QueryRejected: matchingResp.GetQueryRejected(), }}, err } + +func GetWorkflowVersioningInfoMatchingTask(msResp *historyservice.GetMutableStateResponse) *deploymentpb.WorkflowVersioningInfo { + effectiveBehavior := workflow.GetEffectiveVersioningBehavior(msResp.GetVersioningInfo()) + if effectiveBehavior == enumspb.VERSIONING_BEHAVIOR_PINNED { + // unversioned + return nil + } + + return &deploymentpb.WorkflowVersioningInfo{ + Behavior: effectiveBehavior, + Deployment: workflow.GetEffectiveDeployment(msResp.GetVersioningInfo()), + } +} diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index c6cf4b0bad9..708d2eef052 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -34,6 +34,7 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" updatepb "go.temporal.io/api/update/v1" "go.temporal.io/api/workflowservice/v1" + deploymentpb "go.temporal.io/server/api/deployment/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -65,9 +66,10 @@ type Updater struct { req *historyservice.UpdateWorkflowExecutionRequest namespaceID namespace.ID - wfKey definition.WorkflowKey - upd *update.Update - directive *taskqueuespb.TaskVersionDirective + wfKey definition.WorkflowKey + upd *update.Update + directive *taskqueuespb.TaskVersionDirective + workflowVersioningInfo *deploymentpb.WorkflowVersioningInfo // Variables referencing mutable state data. // WARNING: any references to mutable state data *have to* be copied @@ -216,6 +218,7 @@ func (u *Updater) ApplyRequest( ms.GetMostRecentWorkerVersionStamp(), ms.HasCompletedAnyWorkflowTask(), ) + u.workflowVersioningInfo = workflow.GetWorkflowVersioningInfoMatchingTask(ms) return &api.UpdateWorkflowAction{ Noop: true, @@ -231,7 +234,7 @@ func (u *Updater) OnSuccess( // Speculative WFT was created and needs to be added directly to matching w/o transfer task. // TODO (alex): This code is copied from transferQueueActiveTaskExecutor.processWorkflowTask. // Helper function needs to be extracted to avoid code duplication. - err := u.addWorkflowTaskToMatching(ctx, u.wfKey, u.taskQueue, u.scheduledEventID, u.scheduleToStartTimeout, u.directive) + err := u.addWorkflowTaskToMatching(ctx) if _, isStickyWorkerUnavailable := err.(*serviceerrors.StickyWorkerUnavailable); isStickyWorkerUnavailable { // If sticky worker is unavailable, switch to original normal task queue. @@ -239,7 +242,7 @@ func (u *Updater) OnSuccess( Name: u.normalTaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, } - err = u.addWorkflowTaskToMatching(ctx, u.wfKey, u.taskQueue, u.scheduledEventID, u.scheduleToStartTimeout, u.directive) + err = u.addWorkflowTaskToMatching(ctx) } if err != nil { @@ -277,14 +280,7 @@ func (u *Updater) OnSuccess( } // TODO (alex-update): Consider moving this func to a better place. -func (u *Updater) addWorkflowTaskToMatching( - ctx context.Context, - wfKey definition.WorkflowKey, - tq *taskqueuepb.TaskQueue, - scheduledEventID int64, - wtScheduleToStartTimeout time.Duration, - directive *taskqueuespb.TaskVersionDirective, -) error { +func (u *Updater) addWorkflowTaskToMatching(ctx context.Context) error { clock, err := u.shardCtx.NewVectorClock() if err != nil { return err @@ -293,14 +289,15 @@ func (u *Updater) addWorkflowTaskToMatching( _, err = u.matchingClient.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{ NamespaceId: u.namespaceID.String(), Execution: &commonpb.WorkflowExecution{ - WorkflowId: wfKey.WorkflowID, - RunId: wfKey.RunID, + WorkflowId: u.wfKey.WorkflowID, + RunId: u.wfKey.RunID, }, - TaskQueue: tq, - ScheduledEventId: scheduledEventID, - ScheduleToStartTimeout: durationpb.New(wtScheduleToStartTimeout), + TaskQueue: u.taskQueue, + ScheduledEventId: u.scheduledEventID, + ScheduleToStartTimeout: durationpb.New(u.scheduleToStartTimeout), Clock: clock, - VersionDirective: directive, + VersionDirective: u.directive, + WorkflowVersioningInfo: u.workflowVersioningInfo, }) if err != nil { return err diff --git a/service/history/ndc_standby_task_util.go b/service/history/ndc_standby_task_util.go index 6e6d03021f1..3d682bee448 100644 --- a/service/history/ndc_standby_task_util.go +++ b/service/history/ndc_standby_task_util.go @@ -30,6 +30,7 @@ import ( "time" taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/server/api/deployment/v1" persistencespb "go.temporal.io/server/api/persistence/v1" taskqueuespb "go.temporal.io/server/api/taskqueue/v1" "go.temporal.io/server/common/log" @@ -117,6 +118,7 @@ type ( taskQueue string activityTaskScheduleToStartTimeout time.Duration versionDirective *taskqueuespb.TaskVersionDirective + workflowVersioningInfo *deployment.WorkflowVersioningInfo } workflowTaskPostActionInfo struct { @@ -167,6 +169,7 @@ func newActivityTaskPostActionInfo( historyResendInfo: resendInfo, activityTaskScheduleToStartTimeout: activityInfo.ScheduleToStartTimeout.AsDuration(), versionDirective: directive, + workflowVersioningInfo: workflow.GetWorkflowVersioningInfoMatchingTask(mutableState), }, nil } @@ -188,6 +191,7 @@ func newActivityRetryTimePostActionInfo( taskQueue: taskQueue, activityTaskScheduleToStartTimeout: activityScheduleToStartTimeout, versionDirective: directive, + workflowVersioningInfo: workflow.GetWorkflowVersioningInfoMatchingTask(mutableState), }, nil } diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index d3c5e889012..69aace5fc07 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -517,6 +517,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask( ScheduleToStartTimeout: durationpb.New(scheduleToStartTimeout), Clock: vclock.NewVectorClock(t.shardContext.GetClusterMetadata().GetClusterID(), t.shardContext.GetShardID(), task.TaskID), VersionDirective: directive, + WorkflowVersioningInfo: workflow.GetWorkflowVersioningInfoMatchingTask(mutableState), }) if err != nil { return err diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index d3351ae0dc7..669c8999521 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -741,6 +741,7 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity( ScheduleToStartTimeout: durationpb.New(activityScheduleToStartTimeout), Clock: vclock.NewVectorClock(t.shardContext.GetClusterMetadata().GetClusterID(), t.shardContext.GetShardID(), activityTask.TaskID), VersionDirective: pushActivityInfo.versionDirective, + WorkflowVersioningInfo: pushActivityInfo.workflowVersioningInfo, }) if err != nil { diff --git a/service/history/transfer_queue_active_task_executor.go b/service/history/transfer_queue_active_task_executor.go index 154c1ae22f1..f5d43de4429 100644 --- a/service/history/transfer_queue_active_task_executor.go +++ b/service/history/transfer_queue_active_task_executor.go @@ -216,13 +216,14 @@ func (t *transferQueueActiveTaskExecutor) processActivityTask( timeout := timestamp.DurationValue(ai.ScheduleToStartTimeout) directive := MakeDirectiveForActivityTask(mutableState, ai) + wfVersioningInfo := workflow.GetWorkflowVersioningInfoMatchingTask(mutableState) // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state and // the rest of logic is making RPC call, which takes time. release(nil) - return t.pushActivity(ctx, task, timeout, directive, workflow.TransactionPolicyActive) + return t.pushActivity(ctx, task, timeout, directive, wfVersioningInfo, workflow.TransactionPolicyActive) } func (t *transferQueueActiveTaskExecutor) processWorkflowTask( @@ -265,6 +266,8 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( directive := MakeDirectiveForWorkflowTask(mutableState) + wfVersioningInfo := workflow.GetWorkflowVersioningInfoMatchingTask(mutableState) + // NOTE: Do not access mutableState after this lock is released. // It is important to release the workflow lock here, because pushWorkflowTask will call matching, // which will call history back (with RecordWorkflowTaskStarted), and it will try to get workflow lock again. @@ -276,6 +279,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( taskQueue, scheduleToStartTimeout.AsDuration(), directive, + wfVersioningInfo, workflow.TransactionPolicyActive, ) @@ -298,6 +302,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( taskQueue, scheduleToStartTimeout.AsDuration(), directive, + nil, workflow.TransactionPolicyActive, ) } diff --git a/service/history/transfer_queue_standby_task_executor.go b/service/history/transfer_queue_standby_task_executor.go index 5d9aa486bc9..e0393936ff5 100644 --- a/service/history/transfer_queue_standby_task_executor.go +++ b/service/history/transfer_queue_standby_task_executor.go @@ -545,6 +545,7 @@ func (t *transferQueueStandbyTaskExecutor) pushActivity( task.(*tasks.ActivityTask), timeout, pushActivityInfo.versionDirective, + pushActivityInfo.workflowVersioningInfo, workflow.TransactionPolicyPassive, ) } @@ -566,6 +567,7 @@ func (t *transferQueueStandbyTaskExecutor) pushWorkflowTask( pushwtInfo.taskqueue, pushwtInfo.workflowTaskScheduleToStartTimeout, pushwtInfo.versionDirective, + nil, workflow.TransactionPolicyPassive, ) } diff --git a/service/history/transfer_queue_task_executor_base.go b/service/history/transfer_queue_task_executor_base.go index 14b9c3ac42d..31a9dfc688f 100644 --- a/service/history/transfer_queue_task_executor_base.go +++ b/service/history/transfer_queue_task_executor_base.go @@ -32,6 +32,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" + deploymentpb "go.temporal.io/server/api/deployment/v1" "go.temporal.io/server/api/matchingservice/v1" taskqueuespb "go.temporal.io/server/api/taskqueue/v1" "go.temporal.io/server/common" @@ -118,6 +119,7 @@ func (t *transferQueueTaskExecutorBase) pushActivity( task *tasks.ActivityTask, activityScheduleToStartTimeout time.Duration, directive *taskqueuespb.TaskVersionDirective, + workflowVersioningInfo *deploymentpb.WorkflowVersioningInfo, transactionPolicy workflow.TransactionPolicy, ) error { resp, err := t.matchingRawClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ @@ -134,6 +136,7 @@ func (t *transferQueueTaskExecutorBase) pushActivity( ScheduleToStartTimeout: durationpb.New(activityScheduleToStartTimeout), Clock: vclock.NewVectorClock(t.shardContext.GetClusterMetadata().GetClusterID(), t.shardContext.GetShardID(), task.TaskID), VersionDirective: directive, + WorkflowVersioningInfo: workflowVersioningInfo, }) if _, isNotFound := err.(*serviceerror.NotFound); isNotFound { // NotFound error is not expected for AddTasks calls @@ -168,6 +171,7 @@ func (t *transferQueueTaskExecutorBase) pushWorkflowTask( taskqueue *taskqueuepb.TaskQueue, workflowTaskScheduleToStartTimeout time.Duration, directive *taskqueuespb.TaskVersionDirective, + workflowVersioningInfo *deploymentpb.WorkflowVersioningInfo, transactionPolicy workflow.TransactionPolicy, ) error { var sst *durationpb.Duration @@ -185,6 +189,7 @@ func (t *transferQueueTaskExecutorBase) pushWorkflowTask( ScheduleToStartTimeout: sst, Clock: vclock.NewVectorClock(t.shardContext.GetClusterMetadata().GetClusterID(), t.shardContext.GetShardID(), task.TaskID), VersionDirective: directive, + WorkflowVersioningInfo: workflowVersioningInfo, }) if _, isNotFound := err.(*serviceerror.NotFound); isNotFound { // NotFound error is not expected for AddTasks calls diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 2c2a94bbf2c..8802ec0c719 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -6733,23 +6733,16 @@ func (ms *MutableStateImpl) disablingTransitionHistory() bool { } // GetEffectiveDeployment returns the effective deployment in the following order: -// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a new -// deployment -// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override at wf -// start time, or later via UpdateWorkflowExecutionOptions. -// 3. Deployment: this is returned when there is no transition and not override (most common case). -// Deployment is set based on the worker-sent deployment in the latest WFT completion. +// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a +// new deployment +// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override +// at wf start time, or later via UpdateWorkflowExecutionOptions. +// 3. Deployment: this is returned when there is no transition and no override (the most +// common case). Deployment is set based on the worker-sent deployment in the latest WFT +// completion. Exception: if Deployment is set but the workflow's effective behavior is +// UNSPECIFIED, it means the workflow is unversioned, so effective deployment will be nil. func (ms *MutableStateImpl) GetEffectiveDeployment() *deploymentpb.Deployment { - versioningInfo := ms.GetExecutionInfo().GetVersioningInfo() - if versioningInfo == nil { - return nil - } else if transition := versioningInfo.GetDeploymentTransition(); transition != nil { - return transition.GetDeployment() - } else if override := versioningInfo.GetVersioningOverride(); override != nil && - override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED { - return override.GetDeployment() - } - return versioningInfo.GetDeployment() + return GetEffectiveDeployment(ms.GetExecutionInfo().GetVersioningInfo()) } func (ms *MutableStateImpl) GetDeploymentTransition() *workflowpb.DeploymentTransition { @@ -6763,13 +6756,7 @@ func (ms *MutableStateImpl) GetDeploymentTransition() *workflowpb.DeploymentTran // 2. Behavior: this is returned when there is no override (most common case). Behavior is // set based on the worker-sent deployment in the latest WFT completion. func (ms *MutableStateImpl) GetEffectiveVersioningBehavior() enumspb.VersioningBehavior { - versioningInfo := ms.GetExecutionInfo().GetVersioningInfo() - if versioningInfo == nil { - return enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED - } else if override := versioningInfo.GetVersioningOverride(); override != nil { - return override.GetBehavior() - } - return versioningInfo.GetBehavior() + return GetEffectiveVersioningBehavior(ms.GetExecutionInfo().GetVersioningInfo()) } // StartDeploymentTransition starts a transition to the given deployment. Returns true diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index 274cfac91d6..103dbda2ec4 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -26,10 +26,12 @@ package workflow import ( commonpb "go.temporal.io/api/common/v1" + deploymentspb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" + deploymentpb "go.temporal.io/server/api/deployment/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/primitives/timestamp" @@ -195,3 +197,54 @@ func (mse MutableStateWithEffects) CanAddEvent() bool { // Event can be added to the history if workflow is still running. return mse.MutableState.IsWorkflowExecutionRunning() } + +func GetWorkflowVersioningInfoMatchingTask(ms MutableState) *deploymentpb.WorkflowVersioningInfo { + effectiveBehavior := ms.GetEffectiveVersioningBehavior() + if effectiveBehavior == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { + // unversioned + return nil + } + + return &deploymentpb.WorkflowVersioningInfo{ + Behavior: effectiveBehavior, + Deployment: ms.GetEffectiveDeployment(), + } +} + +// GetEffectiveDeployment returns the effective deployment in the following order: +// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a +// new deployment +// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override +// at wf start time, or later via UpdateWorkflowExecutionOptions. +// 3. Deployment: this is returned when there is no transition and no override (the most +// common case). Deployment is set based on the worker-sent deployment in the latest WFT +// completion. Exception: if Deployment is set but the workflow's effective behavior is +// UNSPECIFIED, it means the workflow is unversioned, so effective deployment will be nil. +func GetEffectiveDeployment(versioningInfo *workflowpb.WorkflowExecutionVersioningInfo) *deploymentspb.Deployment { + if versioningInfo == nil { + return nil + } else if transition := versioningInfo.GetDeploymentTransition(); transition != nil { + return transition.GetDeployment() + } else if override := versioningInfo.GetVersioningOverride(); override != nil && + override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED { + return override.GetDeployment() + } else if GetEffectiveVersioningBehavior(versioningInfo) != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { + return versioningInfo.GetDeployment() + } + return nil +} + +// GetEffectiveVersioningBehavior returns the effective versioning behavior in the following +// order: +// 1. VersioningOverride.Behavior: this is returned when user has set a behavior override +// at wf start time, or later via UpdateWorkflowExecutionOptions. +// 2. Behavior: this is returned when there is no override (most common case). Behavior is +// set based on the worker-sent deployment in the latest WFT completion. +func GetEffectiveVersioningBehavior(versioningInfo *workflowpb.WorkflowExecutionVersioningInfo) enumspb.VersioningBehavior { + if versioningInfo == nil { + return enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED + } else if override := versioningInfo.GetVersioningOverride(); override != nil { + return override.GetBehavior() + } + return versioningInfo.GetBehavior() +} diff --git a/service/matching/forwarder.go b/service/matching/forwarder.go index 1181b301622..5e094fb2d7d 100644 --- a/service/matching/forwarder.go +++ b/service/matching/forwarder.go @@ -153,6 +153,7 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro Clock: task.event.Data.GetClock(), ScheduleToStartTimeout: expirationDuration, ForwardInfo: fwdr.getForwardInfo(task), + WorkflowVersioningInfo: task.event.Data.GetWorkflowVersioningInfo(), }, ) case enumspb.TASK_QUEUE_TYPE_ACTIVITY: @@ -168,6 +169,7 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro Clock: task.event.Data.GetClock(), ScheduleToStartTimeout: expirationDuration, ForwardInfo: fwdr.getForwardInfo(task), + WorkflowVersioningInfo: task.event.Data.GetWorkflowVersioningInfo(), }, ) default: @@ -212,9 +214,10 @@ func (fwdr *Forwarder) ForwardQueryTask( Name: target.RpcName(), Kind: fwdr.partition.Kind(), }, - QueryRequest: task.query.request.QueryRequest, - VersionDirective: task.query.request.VersionDirective, - ForwardInfo: fwdr.getForwardInfo(task), + QueryRequest: task.query.request.QueryRequest, + VersionDirective: task.query.request.VersionDirective, + ForwardInfo: fwdr.getForwardInfo(task), + WorkflowVersioningInfo: task.query.request.GetWorkflowVersioningInfo(), }) return resp, fwdr.handleErr(err)