-
Notifications
You must be signed in to change notification settings - Fork 888
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GetDeploymentReachability API #6859
Changes from all commits
670232c
7899a50
13fb4c7
12992a5
7a00f7e
97d4584
0349b73
ec84921
ad0dbc0
2f4a81f
079c710
a986591
aff94cc
51095f9
2b41bc7
215efc9
6de1893
e613d0c
6341511
e972d44
0714b00
96f47f9
e53ceef
9143dca
a3f7999
ee4a890
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ import ( | |
"github.com/temporalio/sqlparser" | ||
commonpb "go.temporal.io/api/common/v1" | ||
deploymentpb "go.temporal.io/api/deployment/v1" | ||
enumspb "go.temporal.io/api/enums/v1" | ||
persistencespb "go.temporal.io/server/api/persistence/v1" | ||
taskqueuespb "go.temporal.io/server/api/taskqueue/v1" | ||
"go.temporal.io/server/common/namespace" | ||
|
@@ -41,14 +42,44 @@ import ( | |
) | ||
|
||
const ( | ||
buildIdSearchAttributePrefixAssigned = "assigned" | ||
buildIdSearchAttributePrefixVersioned = "versioned" | ||
buildIdSearchAttributePrefixUnversioned = "unversioned" | ||
BuildIdSearchAttributeDelimiter = ":" | ||
buildIdSearchAttributePrefixReachability = "reachability" | ||
buildIdSearchAttributePrefixAssigned = "assigned" | ||
buildIdSearchAttributePrefixVersioned = "versioned" | ||
buildIdSearchAttributePrefixUnversioned = "unversioned" | ||
BuildIdSearchAttributeDelimiter = ":" | ||
// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows | ||
UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned | ||
) | ||
|
||
// TODO (carly): fix delimiter | ||
// escapeBuildIdSearchAttributeDelimiter is a helper which escapes the BuildIdSearchAttributeDelimiter character in the input string | ||
// by inserting a `|` immediately before every occurrence of the delimiter. Input without the delimiter is returned unchanged. | ||
// NOTE(review): a `|` that is already present in the input is not itself escaped here, so the result may be ambiguous | ||
// to decode -- presumably this is what the TODO above refers to; confirm before relying on round-tripping. | ||
func escapeBuildIdSearchAttributeDelimiter(s string) string { | ||
// n = -1 places no limit on the number of replacements, so all occurrences are escaped. | ||
s = strings.Replace(s, BuildIdSearchAttributeDelimiter, `|`+BuildIdSearchAttributeDelimiter, -1) | ||
return s | ||
} | ||
|
||
// ReachabilityBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID in the form | ||
// 'reachability:<behavior>:<deployment_series_name>:<deployment_build_id>' | ||
func ReachabilityBuildIdSearchAttribute(behavior enumspb.VersioningBehavior, deployment *deploymentpb.Deployment) string { | ||
var escapedDeployment string | ||
if deployment == nil { | ||
escapedDeployment = "UNVERSIONED" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. hmm... behavior is not meaningful in this case, do we need to add a There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yeah honestly I think I never call this with a nil deployment, I was just not thinking / being paranoid about a nil pointer error There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I can remove the nil-check There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. interesting, in the general case, it is possible that the behavior is unpinned but deployment is nil (unpinned override of a new/unversioned wf). So this is further evidence that we should not make it generic, and keep it specific to pinned when the deployment cannot be null. |
||
} else { | ||
escapedDeployment = fmt.Sprintf("%s%s%s", | ||
escapeBuildIdSearchAttributeDelimiter(deployment.GetSeriesName()), | ||
BuildIdSearchAttributeDelimiter, | ||
escapeBuildIdSearchAttributeDelimiter(deployment.GetBuildId()), | ||
) | ||
} | ||
return sqlparser.String(sqlparser.NewStrVal([]byte(fmt.Sprintf("%s%s%s%s%s", | ||
buildIdSearchAttributePrefixReachability, | ||
BuildIdSearchAttributeDelimiter, | ||
escapeBuildIdSearchAttributeDelimiter(behavior.String()), | ||
carlydf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
BuildIdSearchAttributeDelimiter, | ||
escapedDeployment, | ||
)))) | ||
} | ||
carlydf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// AssignedBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID | ||
func AssignedBuildIdSearchAttribute(buildId string) string { | ||
return buildIdSearchAttributePrefixAssigned + BuildIdSearchAttributeDelimiter + buildId | ||
|
@@ -144,7 +175,7 @@ func DeploymentToString(deployment *deploymentpb.Deployment) string { | |
if deployment == nil { | ||
return "UNVERSIONED" | ||
} | ||
return deployment.SeriesName + ":" + deployment.GetBuildId() | ||
return deployment.GetSeriesName() + ":" + deployment.GetBuildId() | ||
} | ||
|
||
// MakeDirectiveForWorkflowTask returns a versioning directive based on the following parameters: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ import ( | |
"context" | ||
"errors" | ||
"fmt" | ||
"math" | ||
"math/rand" | ||
"reflect" | ||
"slices" | ||
|
@@ -2639,6 +2640,12 @@ func (ms *MutableStateImpl) UpdateBuildIdAssignment(buildId string) error { | |
return ms.updateBuildIdsSearchAttribute(&commonpb.WorkerVersionStamp{UseVersioning: true, BuildId: buildId}, limit) | ||
} | ||
|
||
// For v3 versioned workflows (ms.GetEffectiveVersioningBehavior() != UNSPECIFIED), this will append a tag formed as | ||
// reachability:<effective_behavior>:<deployment_series_name>:<deployment_build_id> to the BuildIds search attribute, | ||
Comment on lines
+2643
to
+2644
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. trying to remember what was the use for the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. hm.. I think the reason to get reachability of unpinned would be to be able to further segment the "reachable" value into "reachable by unpinned only", which would mean you can feel more comfortable decommissioning a deployment, if it is only "reachable by unpinned" There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. but now, we just treat "reachable by unpinned only" as equivalent to "unreachable" There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This plus my previous comment makes me think maybe instead of a general (later we can add There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yeah you're right, that was our reason. But we abandoned that thinking. but why spend costly visibility space for something we don't need? If we want to add it in the future, I think we can start adding this SA to unpinned as well (as There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. wait actually, I think we want There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 
or, to save storage, we can remove the previous pinned SA (I guess I can just remove the string from the string list that I read, and then write the new list?) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yeah there should at most be one pinned:... value in the list. Even if we keep reachability:... there should at most be one of those values in the list. |
||
// if it does not already exist there. The deployment will be execution_info.deployment, or the override deployment if | ||
// a pinned override is set. | ||
// For all other workflows (ms.GetEffectiveVersioningBehavior() == UNSPECIFIED), this will append a tag based on the | ||
// workflow's versioning status. | ||
func (ms *MutableStateImpl) updateBuildIdsSearchAttribute(stamp *commonpb.WorkerVersionStamp, maxSearchAttributeValueSize int) error { | ||
changed, err := ms.addBuildIdToSearchAttributesWithNoVisibilityTask(stamp, maxSearchAttributeValueSize) | ||
if err != nil { | ||
|
@@ -2671,34 +2678,55 @@ func (ms *MutableStateImpl) loadBuildIds() ([]string, error) { | |
return searchAttributeValues, nil | ||
} | ||
|
||
// getReachabilityDeployment ignores DeploymentTransition.Deployment. If there is a pinned override, | ||
// it returns the override deployment. If there is an unpinned override or no override, it returns | ||
// execution_info.deployment, which is the last deployment that this workflow completed a task on. | ||
// NOTE(review): the result may be nil when no deployment is recorded -- callers should confirm they handle that. | ||
func (ms *MutableStateImpl) getReachabilityDeployment() *deploymentpb.Deployment { | ||
versioningInfo := ms.GetExecutionInfo().GetVersioningInfo() | ||
if override := versioningInfo.GetVersioningOverride(); override != nil { | ||
// Only a PINNED override replaces the deployment; any other override behavior falls through below. | ||
if override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED { | ||
return override.GetDeployment() | ||
} | ||
} | ||
// No override, or a non-pinned override: use the deployment stored on the versioning info. | ||
return versioningInfo.GetDeployment() | ||
} | ||
|
||
// Takes a list of loaded build IDs from a search attribute and adds a new build ID to it. Also makes sure that the | ||
// resulting SA list begins with either "unversioned" or "assigned:<bld>" based on workflow's Build ID assignment status. | ||
// Returns a potentially modified list. | ||
// [cleanup-old-wv] old versioning does not add "assigned:<bld>" value to the SA. | ||
// [cleanup-versioning-2] versioning-2 adds "assigned:<bld>" which is no longer used in versioning-3 | ||
func (ms *MutableStateImpl) addBuildIdToLoadedSearchAttribute( | ||
existingValues []string, | ||
stamp *commonpb.WorkerVersionStamp, | ||
) []string { | ||
var newValues []string | ||
if !stamp.GetUseVersioning() { | ||
if !stamp.GetUseVersioning() && ms.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { // unversioned workflows may still have non-nil deployment, so we don't check deployment | ||
newValues = append(newValues, worker_versioning.UnversionedSearchAttribute) | ||
} else if ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { | ||
newValues = append(newValues, worker_versioning.ReachabilityBuildIdSearchAttribute( | ||
ms.GetEffectiveVersioningBehavior(), | ||
ms.getReachabilityDeployment(), | ||
)) | ||
} else if ms.GetAssignedBuildId() != "" { | ||
newValues = append(newValues, worker_versioning.AssignedBuildIdSearchAttribute(ms.GetAssignedBuildId())) | ||
} | ||
|
||
buildId := worker_versioning.VersionStampToBuildIdSearchAttribute(stamp) | ||
found := slices.Contains(newValues, buildId) | ||
for _, existingValue := range existingValues { | ||
if existingValue == buildId { | ||
found = true | ||
if ms.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { | ||
buildId := worker_versioning.VersionStampToBuildIdSearchAttribute(stamp) | ||
found := slices.Contains(newValues, buildId) | ||
for _, existingValue := range existingValues { | ||
if existingValue == buildId { | ||
found = true | ||
} | ||
if !worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(existingValue) { | ||
newValues = append(newValues, existingValue) | ||
} | ||
} | ||
if !worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(existingValue) { | ||
newValues = append(newValues, existingValue) | ||
if !found { | ||
newValues = append(newValues, buildId) | ||
} | ||
} | ||
if !found { | ||
newValues = append(newValues, buildId) | ||
} | ||
return newValues | ||
} | ||
|
||
|
@@ -4251,23 +4279,16 @@ func (ms *MutableStateImpl) AddWorkflowExecutionOptionsUpdatedEvent( | |
|
||
func (ms *MutableStateImpl) ApplyWorkflowExecutionOptionsUpdatedEvent(event *historypb.HistoryEvent) error { | ||
override := event.GetWorkflowExecutionOptionsUpdatedEventAttributes().GetVersioningOverride() | ||
previousCurrentDeployment := ms.GetEffectiveDeployment() | ||
previousEffectiveDeployment := ms.GetEffectiveDeployment() | ||
previousEffectiveVersioningBehavior := ms.GetEffectiveVersioningBehavior() | ||
if ms.GetExecutionInfo().GetVersioningInfo() == nil { | ||
ms.GetExecutionInfo().VersioningInfo = &workflowpb.WorkflowExecutionVersioningInfo{} | ||
} | ||
ms.GetExecutionInfo().VersioningInfo.VersioningOverride = override | ||
|
||
// If effective deployment or behavior change, we need to reschedule any pending tasks, because History will reject | ||
// the task's start request if the task is being started by a poller that is not from the workflow's effective | ||
// deployment according to History. Therefore, it is important for matching to match tasks with the correct pollers. | ||
// Even if the effective deployment does not change, we still need to reschedule tasks into the appropriate | ||
// default/unpinned queue or the pinned queue, because the two queues will be handled differently if the task queue's | ||
// Current Deployment changes between now and when the task is started. | ||
// | ||
// We choose to let any started WFT that is running on the old deployment finish running, instead of forcing it to fail. | ||
if !proto.Equal(ms.GetEffectiveDeployment(), previousCurrentDeployment) || | ||
if !proto.Equal(ms.GetEffectiveDeployment(), previousEffectiveDeployment) || | ||
ms.GetEffectiveVersioningBehavior() != previousEffectiveVersioningBehavior { | ||
// TODO (carly) part 2: if safe mode, do replay test on new deployment if deployment changed, if fail, revert changes and abort | ||
// If there is an ongoing transition, we remove it so that tasks from this workflow (including the pending WFT | ||
// that initiated the transition) can run on our override deployment as soon as possible. | ||
// | ||
|
@@ -4291,21 +4312,33 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionOptionsUpdatedEvent(event *his | |
// and it will start the same transition in the workflow. So removing the transition would not make a difference | ||
// and would in fact add some extra work for the server. | ||
ms.executionInfo.GetVersioningInfo().DeploymentTransition = nil | ||
// TODO (carly) part 2: if safe mode, do replay test on new deployment if deployment changed, if fail, revert changes and abort | ||
|
||
// If effective deployment or behavior change, we need to reschedule any pending tasks, because History will reject | ||
// the task's start request if the task is being started by a poller that is not from the workflow's effective | ||
// deployment according to History. Therefore, it is important for matching to match tasks with the correct pollers. | ||
// Even if the effective deployment does not change, we still need to reschedule tasks into the appropriate | ||
// default/unpinned queue or the pinned queue, because the two queues will be handled differently if the task queue's | ||
// Current Deployment changes between now and when the task is started. | ||
// | ||
// We choose to let any started WFT that is running on the old deployment finish running, instead of forcing it to fail. | ||
ms.ClearStickyTaskQueue() | ||
if ms.HasPendingWorkflowTask() && !ms.HasStartedWorkflowTask() && | ||
// Speculative WFT is directly (without transfer task) added to matching when scheduled. | ||
// It is protected by timeout on both normal and sticky task queues. | ||
// If there is no poller for previous deployment, it will time out, | ||
// and will be rescheduled as normal WFT. | ||
ms.GetPendingWorkflowTask().Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { | ||
// sticky queue was just cleared, so the following call only generates | ||
// a WorkflowTask, not a WorkflowTaskTimeoutTask. | ||
// Sticky queue was just cleared, so the following call only generates a WorkflowTask, not a WorkflowTaskTimeoutTask. | ||
err := ms.taskGenerator.GenerateScheduleWorkflowTaskTasks(ms.GetPendingWorkflowTask().ScheduledEventID) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
// For v3 versioned workflows (ms.GetEffectiveVersioningBehavior() != UNSPECIFIED), this will update the reachability | ||
// search attribute based on the execution_info.deployment and/or override deployment if one exists. | ||
if err := ms.updateBuildIdsSearchAttribute(nil, math.MaxInt32); err != nil { | ||
return err | ||
} | ||
} | ||
return ms.reschedulePendingActivities() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
apparently escaping is harder than we think. see #6864 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a todo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here is the correct escape func. Maybe we should bring the function here and use it in all places. probably we want to rename this folder to common/deployment after we clean up old stuff.