Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support independent activities for pinned workflows #6957

Merged
merged 8 commits into from
Jan 15, 2025
4,410 changes: 2,215 additions & 2,195 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/searchattribute"
serviceerrors "go.temporal.io/server/common/serviceerror"
"google.golang.org/protobuf/types/known/emptypb"
)

Expand Down Expand Up @@ -286,3 +287,37 @@ func FindCurrentDeployment(deployments *persistencespb.DeploymentData) *deployme
}
return currentDeployment
}

// ValidateTaskVersionDirective checks that the version directive attached to a task still
// agrees with the workflow's present effective versioning behavior (wfBehavior) and effective
// deployment (wfDeployment).
//
// scheduledDeployment is only consulted as a fallback when the directive carries no deployment.
//
// It returns nil when the task is still valid. Otherwise it returns the error built by
// serviceerrors.NewObsoleteMatchingTask, describing which attribute no longer matches.
func ValidateTaskVersionDirective(
	directive *taskqueuespb.TaskVersionDirective,
	wfBehavior enumspb.VersioningBehavior,
	wfDeployment *deploymentpb.Deployment,
	scheduledDeployment *deploymentpb.Deployment,
) error {
	// Effective behavior of the workflow at the time History scheduled the task.
	scheduledBehavior := directive.GetBehavior()

	// Versioning 3 pre-release (v1.26, Dec 2024) is not populating request.VersionDirective, so
	// the behavior check is skipped until v1.28 whenever the directive behavior is unspecified.
	// TODO (shahab): remove this condition after v1.27 is released.
	behaviorIsKnown := scheduledBehavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED

	if behaviorIsKnown && scheduledBehavior != wfBehavior {
		// The task was scheduled before the workflow changed behavior. Matching can drop it.
		msg := fmt.Sprintf(
			"task was scheduled when workflow had versioning behavior %s, now it has versioning behavior %s.",
			scheduledBehavior, wfBehavior)
		return serviceerrors.NewObsoleteMatchingTask(msg)
	}

	// Effective deployment of the workflow at the time History scheduled the task.
	taskDeployment := directive.GetDeployment()
	if taskDeployment == nil {
		// TODO: remove this once the ScheduledDeployment field is removed from proto
		taskDeployment = scheduledDeployment
	}

	if taskDeployment.Equal(wfDeployment) {
		return nil
	}

	// The task was scheduled before the workflow transitioned to its current deployment.
	// Matching can drop it.
	msg := fmt.Sprintf(
		"task was scheduled when workflow was on build %s, now it is on build %s.",
		taskDeployment.GetBuildId(), wfDeployment.GetBuildId())
	return serviceerrors.NewObsoleteMatchingTask(msg)
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,10 @@ message RecordWorkflowTaskStartedRequest {
temporal.server.api.clock.v1.VectorClock clock = 7;
temporal.server.api.taskqueue.v1.BuildIdRedirectInfo build_id_redirect_info = 8;
// The deployment passed by History when the task was scheduled.
// Deprecated: use `version_directive.deployment` instead.
temporal.api.deployment.v1.Deployment scheduled_deployment = 9;

// Versioning directive that was sent by history when scheduling the task.
temporal.server.api.taskqueue.v1.TaskVersionDirective version_directive = 10;
}

message RecordWorkflowTaskStartedResponse {
Expand Down Expand Up @@ -286,11 +288,11 @@ message RecordActivityTaskStartedRequest {
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
int32 stamp = 9;
// The deployment passed by History when the task was scheduled.
// Deprecated: use `version_directive.deployment` instead.
temporal.api.deployment.v1.Deployment scheduled_deployment = 10;
// Whether the scheduled deployment contains the activity's task queue. Used by History to
// determine if the activity redirect should affect the workflow.
// Only set if `scheduled_deployment` is set (i.e. the task is redirected).
bool scheduled_deployment_contains_task_queue = 11;
reserved 11;
// Versioning directive that was sent by history when scheduling the task.
temporal.server.api.taskqueue.v1.TaskVersionDirective version_directive = 12;
}

message RecordActivityTaskStartedResponse {
Expand Down
34 changes: 16 additions & 18 deletions service/history/api/recordactivitytaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,12 @@ func recordActivityTaskStarted(
return nil, false, serviceerror.NewNotFound(errorMessage)
}

// TODO (shahab): support independent activities. Independent activities do not need all
// the deployment validations and do not start workflow transition.

wfBehavior := mutableState.GetEffectiveVersioningBehavior()
wfDeployment := mutableState.GetEffectiveDeployment()
pollerDeployment := worker_versioning.DeploymentFromCapabilities(request.PollRequest.WorkerVersionCapabilities)
// Effective deployment of the workflow when History scheduled the WFT.
scheduledDeployment := request.GetScheduledDeployment()
if scheduledDeployment == nil {
// Matching does not send the directive deployment when it's the same as poller's.
scheduledDeployment = pollerDeployment
}
if !scheduledDeployment.Equal(wfDeployment) {
// This must be an AT scheduled before the workflow transitions to the current
// deployment. Matching can drop it.
return nil, false, serviceerrors.NewObsoleteMatchingTask("wrong directive deployment")
err = worker_versioning.ValidateTaskVersionDirective(request.GetVersionDirective(), wfBehavior, wfDeployment, request.ScheduledDeployment)
if err != nil {
return nil, false, err
}

if mutableState.GetDeploymentTransition() != nil {
Expand All @@ -192,9 +183,11 @@ func recordActivityTaskStarted(
return nil, false, serviceerrors.NewActivityStartDuringTransition()
}

if !pollerDeployment.Equal(wfDeployment) {
// Task is redirected, see if this activity should start a transition on the workflow. The
// workflow transition happens only if the workflow TQ's current deployment is the same as
if !pollerDeployment.Equal(wfDeployment) &&
// Independent activities of pinned workflows are redirected. They should not start a transition on wf.
wfBehavior != enumspb.VERSIONING_BEHAVIOR_PINNED {
// AT of an unpinned workflow is redirected, see if a transition on the workflow should start.
// The workflow transition happens only if the workflow TQ's current deployment is the same as
// the poller deployment. Otherwise, it means the activity is independently versioned, we
// allow it to start without affecting the workflow.
wfTqCurrentDeployment, err := getTaskQueueCurrentDeployment(ctx,
Expand All @@ -211,6 +204,8 @@ func recordActivityTaskStarted(
if errors.Is(err, workflow.ErrPinnedWorkflowCannotTransition) {
// This must be a task from a time that the workflow was unpinned, but it's
// now pinned so can't transition. Matching can drop the task safely.
// TODO (shahab): remove this special error check because it is not
// expected to happen once scheduledBehavior is always populated. see TODOs above.
return nil, false, serviceerrors.NewObsoleteMatchingTask(err.Error())
}
return nil, false, err
Expand All @@ -235,8 +230,11 @@ func recordActivityTaskStarted(
taggedMetrics,
namespaceName,
// passing the root partition all the time as we don't care about partition ID in this metric
tqid.UnsafeTaskQueueFamily(namespaceEntry.ID().String(), ai.GetTaskQueue()).TaskQueue(enumspb.TASK_QUEUE_TYPE_ACTIVITY).RootPartition(),
shardContext.GetConfig().BreakdownMetricsByTaskQueue(namespaceName, ai.GetTaskQueue(), enumspb.TASK_QUEUE_TYPE_ACTIVITY),
tqid.UnsafeTaskQueueFamily(namespaceEntry.ID().String(),
ai.GetTaskQueue()).TaskQueue(enumspb.TASK_QUEUE_TYPE_ACTIVITY).RootPartition(),
shardContext.GetConfig().BreakdownMetricsByTaskQueue(namespaceName,
ai.GetTaskQueue(),
enumspb.TASK_QUEUE_TYPE_ACTIVITY),
),
).Record(scheduleToStartLatency)

Expand Down
12 changes: 6 additions & 6 deletions service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,12 @@ func Invoke(
return nil, serviceerrors.NewObsoleteMatchingTask("wrong task queue type")
}

wfBehavior := mutableState.GetEffectiveVersioningBehavior()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any opportunity to share code between this and recordactivitytaskstarted? actually I'm not even sure that would make this easier to understand, maybe it's simpler to just do it inline in both. (especially after future cleanups.) just wondering.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted a method and shared the directive verification logic.

wfDeployment := mutableState.GetEffectiveDeployment()
pollerDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities)
// Effective deployment of the workflow when History scheduled the WFT.
scheduledDeployment := req.GetScheduledDeployment()
if !scheduledDeployment.Equal(wfDeployment) {
// This must be an AT scheduled before the workflow transitions to the current
// deployment. Matching can drop it.
return nil, serviceerrors.NewObsoleteMatchingTask("wrong directive deployment")
err = worker_versioning.ValidateTaskVersionDirective(req.GetVersionDirective(), wfBehavior, wfDeployment, req.ScheduledDeployment)
if err != nil {
return nil, err
}

_, workflowTask, err = mutableState.AddWorkflowTaskStartedEvent(
Expand Down Expand Up @@ -210,6 +208,8 @@ func Invoke(
if errors.Is(err, workflow.ErrPinnedWorkflowCannotTransition) {
// This must be a task from a time that the workflow was unpinned, but it's
// now pinned so can't transition. Matching can drop the task safely.
// TODO (shahab): remove this special error check because it is not
// expected to happen once scheduledBehavior is always populated. see TODOs above.
return nil, serviceerrors.NewObsoleteMatchingTask(err.Error())
}
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion service/matching/deployment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (

func findDeployment(deployments *persistencespb.DeploymentData, deployment *deploymentpb.Deployment) int {
for i, d := range deployments.GetDeployments() {
if d.Deployment.SeriesName == deployment.SeriesName && d.Deployment.BuildId == deployment.BuildId {
if d.Deployment.Equal(deployment) {
return i
}
}
Expand Down
4 changes: 4 additions & 0 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2443,7 +2443,9 @@ func (e *matchingEngineImpl) recordWorkflowTaskStarted(
RequestId: uuid.New(),
PollRequest: pollReq,
BuildIdRedirectInfo: task.redirectInfo,
// TODO: stop sending ScheduledDeployment. [cleanup-old-wv]
ScheduledDeployment: task.event.Data.VersionDirective.GetDeployment(),
VersionDirective: task.event.Data.VersionDirective,
}

return e.historyClient.RecordWorkflowTaskStarted(ctx, recordStartedRequest)
Expand All @@ -2466,7 +2468,9 @@ func (e *matchingEngineImpl) recordActivityTaskStarted(
PollRequest: pollReq,
BuildIdRedirectInfo: task.redirectInfo,
Stamp: task.event.Data.GetStamp(),
// TODO: stop sending ScheduledDeployment. [cleanup-old-wv]
ScheduledDeployment: task.event.Data.VersionDirective.GetDeployment(),
VersionDirective: task.event.Data.VersionDirective,
}

return e.historyClient.RecordActivityTaskStarted(ctx, recordStartedRequest)
Expand Down
2 changes: 1 addition & 1 deletion service/matching/physical_task_queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (c *physicalTaskQueueManagerImpl) ProcessSpooledTask(
// Don't try to set read level here because it may have been advanced already.
return nil
}
return c.partitionMgr.ProcessSpooledTask(ctx, task, c.queue.Version().BuildId())
return c.partitionMgr.ProcessSpooledTask(ctx, task, c.queue)
}

// DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned,
Expand Down
88 changes: 57 additions & 31 deletions service/matching/task_queue_partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,18 +332,19 @@ func (pm *taskQueuePartitionManagerImpl) PollTask(
func (pm *taskQueuePartitionManagerImpl) ProcessSpooledTask(
ctx context.Context,
task *internalTask,
assignedBuildId string,
backlogQueue *PhysicalTaskQueueKey,
) error {
taskInfo := task.event.GetData()
// This task came from taskReader so task.event is always set here.
directive := taskInfo.GetVersionDirective()
assignedBuildId := backlogQueue.Version().BuildId()
if assignedBuildId != "" {
// construct directive based on the build ID of the spool queue
directive = worker_versioning.MakeBuildIdDirective(assignedBuildId)
}
// Redirect and re-resolve if we're blocked in matcher and user data changes.
for {
_, syncMatchQueue, userDataChanged, err := pm.getPhysicalQueuesForAdd(
newBacklogQueue, syncMatchQueue, userDataChanged, err := pm.getPhysicalQueuesForAdd(
ctx,
directive,
nil,
Expand All @@ -361,6 +362,22 @@ func (pm *taskQueuePartitionManagerImpl) ProcessSpooledTask(
// make sure to reset redirectInfo in case it was set in a previous loop cycle
task.redirectInfo = nil
}
if !backlogQueue.version.Deployment().Equal(newBacklogQueue.QueueKey().version.Deployment()) {
// Backlog queue has changed, so spool the task to the new queue. This should happen rarely:
// an activity of a pinned workflow was deemed independent and sent to the default queue,
// but at dispatch time the determination is different because activity pollers on the
// pinned deployment have since reached the server.
// TODO: before spooling, try to sync-match the task on the new queue
err = newBacklogQueue.SpoolTask(taskInfo)
if err != nil {
// return the error so task_reader retries the outer call
return err
}
// Finish the task because now it is copied to the other backlog. It should be considered
// invalid because a poller did not receive the task.
task.finish(nil, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we return after this line? or continue to try to dispatch on syncMatchQueue?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should return, good catch. Fixed.

return nil
}
err = syncMatchQueue.DispatchSpooledTask(ctx, task, userDataChanged)
if err != errInterrupted {
return err
Expand Down Expand Up @@ -779,10 +796,16 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
directive *taskqueuespb.TaskVersionDirective,
forwardInfo *taskqueuespb.TaskForwardInfo,
runId string,
) (pinnedQueue physicalTaskQueueManager, syncMatchQueue physicalTaskQueueManager, userDataChanged <-chan struct{}, err error) {
) (spoolQueue physicalTaskQueueManager, syncMatchQueue physicalTaskQueueManager, userDataChanged <-chan struct{}, err error) {
wfBehavior := directive.GetBehavior()
deployment := directive.GetDeployment()

perTypeUserData, userDataChanged, err := pm.getPerTypeUserData()
if err != nil {
return nil, nil, nil, err
}
deploymentData := perTypeUserData.GetDeploymentData()

if wfBehavior == enumspb.VERSIONING_BEHAVIOR_PINNED {
if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
// TODO (shahab): we can verify the passed deployment matches the last poller's deployment
Expand All @@ -793,26 +816,29 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
if err != nil {
return nil, nil, nil, err
}
pinnedQueue, err = pm.getVersionedQueue(ctx, "", "", deployment, true)
if err != nil {
return nil, nil, nil, err
}
if forwardInfo == nil {
// Task is not forwarded, so it can be spooled if sync match fails.
// Spool queue and sync match queue is the same for pinned workflows.
return pinnedQueue, pinnedQueue, nil, nil
} else {
// Forwarded from child partition - only do sync match.
return nil, pinnedQueue, nil, nil
}
}

perTypeUserData, perTypeUserDataChanged, err := pm.getPerTypeUserData()
if err != nil {
return nil, nil, nil, err
// We ignore the pinned directive if this is an activity task but the activity task queue is
// not present in the workflow's pinned deployment. Such activities are considered
// independent activities and are treated as unpinned, sent to their TQ's current deployment.
isIndependentActivity := pm.partition.TaskType() == enumspb.TASK_QUEUE_TYPE_ACTIVITY &&
findDeployment(deploymentData, deployment) == -1
if !isIndependentActivity {
pinnedQueue, err := pm.getVersionedQueue(ctx, "", "", deployment, true)
if err != nil {
return nil, nil, nil, err
}
if forwardInfo == nil {
// Task is not forwarded, so it can be spooled if sync match fails.
// Spool queue and sync match queue are the same for pinned workflows.
return pinnedQueue, pinnedQueue, userDataChanged, nil
} else {
// Forwarded from child partition - only do sync match.
return nil, pinnedQueue, userDataChanged, nil
}
}
}

currentDeployment := worker_versioning.FindCurrentDeployment(perTypeUserData.GetDeploymentData())
currentDeployment := worker_versioning.FindCurrentDeployment(deploymentData)
if currentDeployment != nil &&
// Make sure the wf is not v1-2 versioned
directive.GetAssignedBuildId() == "" {
Expand All @@ -824,17 +850,17 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
}

// TODO (shahab): we can verify the passed deployment matches the last poller's deployment
return pm.defaultQueue, pm.defaultQueue, perTypeUserDataChanged, nil
return pm.defaultQueue, pm.defaultQueue, userDataChanged, nil
}

currentDeploymentQueue, err := pm.getVersionedQueue(ctx, "", "", currentDeployment, true)
if forwardInfo == nil {
// Task is not forwarded, so it can be spooled if sync match fails.
// Unpinned tasks are spooled in default queue
return pm.defaultQueue, currentDeploymentQueue, perTypeUserDataChanged, err
return pm.defaultQueue, currentDeploymentQueue, userDataChanged, err
} else {
// Forwarded from child partition - only do sync match.
return nil, currentDeploymentQueue, perTypeUserDataChanged, err
return nil, currentDeploymentQueue, userDataChanged, err
}
}

Expand All @@ -855,17 +881,17 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
return nil, syncMatchQueue, nil, err
}

userData, userDataChanged, err := pm.userDataManager.GetUserData()
if err != nil {
return nil, nil, nil, err
}

if directive.GetBuildId() == nil {
// The task belongs to an unversioned execution. Keep using unversioned. But also return
// userDataChanged so if current deployment is set, the task redirects to that deployment.
return pm.defaultQueue, pm.defaultQueue, userDataChanged, nil
}

userData, userDataChanged, err := pm.userDataManager.GetUserData()
if err != nil {
return nil, nil, nil, err
}

data := userData.GetData().GetVersioningData()

var buildId, redirectBuildId string
Expand Down Expand Up @@ -921,7 +947,7 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
}

if versionSet != "" {
pinnedQueue = pm.defaultQueue
spoolQueue = pm.defaultQueue
syncMatchQueue, err = pm.getVersionedQueue(ctx, versionSet, "", nil, true)
if err != nil {
return nil, nil, nil, err
Expand All @@ -932,13 +958,13 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
return nil, nil, nil, err
}
// redirect rules are not applied when spooling a task. They'll be applied when dispatching the spool task.
pinnedQueue, err = pm.getPhysicalQueue(ctx, buildId)
spoolQueue, err = pm.getPhysicalQueue(ctx, buildId)
if err != nil {
return nil, nil, nil, err
}
}

return pinnedQueue, syncMatchQueue, userDataChanged, err
return spoolQueue, syncMatchQueue, userDataChanged, err
}

func (pm *taskQueuePartitionManagerImpl) getVersionSetForAdd(directive *taskqueuespb.TaskVersionDirective, data *persistencespb.VersioningData) (string, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ type (
PollTask(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, bool, error)
// ProcessSpooledTask dispatches a task to a poller. When there are no pollers to pick
// up the task, this method will return error. Task will not be persisted to db
ProcessSpooledTask(ctx context.Context, task *internalTask, assignedBuildId string) error
ProcessSpooledTask(
ctx context.Context,
task *internalTask,
backlogQueue *PhysicalTaskQueueKey,
) error
// DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned,
// if dispatched to local poller then nil and nil is returned.
DispatchQueryTask(ctx context.Context, taskId string, request *matchingservice.QueryWorkflowRequest) (*matchingservice.QueryWorkflowResponse, error)
Expand Down
Loading
Loading