-
Notifications
You must be signed in to change notification settings - Fork 877
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support unpinned workflows #6887
Conversation
…g-behaviors # Conflicts: # api/deployment/v1/message.go-helpers.pb.go # api/deployment/v1/message.pb.go # common/worker_versioning/worker_versioning.go # proto/internal/temporal/server/api/deployment/v1/message.proto
…g-behaviors # Conflicts: # api/persistence/v1/executions.pb.go # service/matching/task_queue_partition_manager.go
…shahab/transfer-versioning-info # Conflicts: # service/history/workflow/mutable_state_impl.go
…er-versioning-info # Conflicts: # api/deployment/v1/message.pb.go # api/historyservice/v1/request_response.pb.go # api/persistence/v1/tasks.pb.go # api/taskqueue/v1/message.pb.go # proto/internal/temporal/server/api/deployment/v1/message.proto # proto/internal/temporal/server/api/historyservice/v1/request_response.proto
…haviors # Conflicts: # common/worker_versioning/worker_versioning.go
…er-versioning-info
…g-behaviors # Conflicts: # common/worker_versioning/worker_versioning.go # service/history/workflow/util.go
common/log/tag/tags.go
Outdated
func Deployment(deployment *deployment.Deployment) ZapTag { | ||
val := "UNVERSIONED" | ||
if deployment != nil { | ||
val = deployment.SeriesName + ":" + deployment.GetBuildId() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use GetXXX()
format everywhere for consistency. Also all these log tag thing is aiming to 0 allocations and 0 overhead if tag is not used but I guess there is no good way to make it work here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make deployment
a string
here and call DeploymentToString
before passing deployment to this func. Let's make tags trivial simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case there already is a deployment != nil
check so I'd just use direct field access.
Even better: I think you can delay the allocation until the tag is used by doing something like:
type deploymentStringer *deploymentpb.Deployment
// implement fmt.Stringer
func (d deploymentStringer) String() string {
if pb := (*deploymentpb.Deployment)(d); pb != nil {
return pb.SeriesName + ":" + pb.BuildId
}
return "unversioned"
}
func Deployment(d *deploymentpb.Deployment) ZapTag {
return NewAnyTag("deployment", deploymentStringer(d))
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried the stringer thing but it gave me "Invalid receiver type 'deploymentStringer' ('*deploymentpb.Deployment' is a pointer type)" error. So I got rid of it, still improved the implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh right.. I think you can work around that restriction by making a one-field struct with the pointer and using that as a value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I figured but I'm not sure how much value the delayed allocation adds in this case, is it worth all the extra code?
@@ -110,7 +110,7 @@ message VersioningData { | |||
message DeploymentData { | |||
// Set of deployments that this task queue belongs to. | |||
// Current deployment is defined implicitly as the deployment with the most recent last_became_current_time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you specify TaskQueueData.last_became_current_time
?
@@ -110,7 +110,7 @@ message VersioningData { | |||
message DeploymentData { | |||
// Set of deployments that this task queue belongs to. | |||
// Current deployment is defined implicitly as the deployment with the most recent last_became_current_time. | |||
repeated Deployment deployment = 1; | |||
repeated Deployment deployments = 1; | |||
|
|||
message Deployment { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This nested Deployment
is a bad sign. DeploymentDataItem
?
} | ||
wfDeployment := mutableState.GetEffectiveDeployment() | ||
if !directiveDeployment.Equal(wfDeployment) { | ||
// This must be a task scheduled before the workflow transitions to the current |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same way as we refer "workflow task" as WFT everywhere in comments, I would suggest to refer "activity task" as AT, not just "task".
return nil, serviceerrors.NewObsoleteMatchingTask("wrong task queue type") | ||
} | ||
|
||
dispatchDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dispatchDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities) | |
pollerDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities) |
} | ||
|
||
dispatchDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities) | ||
directiveDeployment := req.GetDirectiveDeployment() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
directiveDeployment := req.GetDirectiveDeployment() | |
scheduledDeployment := req.GetScheduledDeployment() |
-- deployment which history wanted when it scheduled a WFT.
if errors.Is(err, workflow.ErrPinnedWorkflowCannotTransition) { | ||
// This must be a task from a time that the workflow was unpinned, but it's | ||
// now pinned so can't transition. Matching can drop the task safely. | ||
return nil, serviceerrors.NewObsoleteMatchingTask("workflow is not eligible for transition") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, serviceerrors.NewObsoleteMatchingTask("workflow is not eligible for transition") | |
return nil, serviceerrors.NewObsoleteMatchingTask(workflow.ErrPinnedWorkflowCannotTransition.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partial review, will finish the rest in an hour or two
common/log/tag/tags.go
Outdated
func Deployment(deployment *deployment.Deployment) ZapTag { | ||
val := "UNVERSIONED" | ||
if deployment != nil { | ||
val = deployment.SeriesName + ":" + deployment.GetBuildId() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case there already is a deployment != nil
check so I'd just use direct field access.
Even better: I think you can delay the allocation until the tag is used by doing something like:
type deploymentStringer *deploymentpb.Deployment
// implement fmt.Stringer
func (d deploymentStringer) String() string {
if pb := (*deploymentpb.Deployment)(d); pb != nil {
return pb.SeriesName + ":" + pb.BuildId
}
return "unversioned"
}
func Deployment(d *deploymentpb.Deployment) ZapTag {
return NewAnyTag("deployment", deploymentStringer(d))
}
@@ -54,7 +56,7 @@ func Invoke( | |||
namespace := namespaceEntry.Name() | |||
|
|||
response := &historyservice.RecordActivityTaskStartedResponse{} | |||
var dropTask bool | |||
var startedTransition bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uh oh, this was a mess to merge before since the logic got extracted. see what I did in the merge commit and if you like that or want to do it differently
dispatchDeployment := worker_versioning.DeploymentFromCapabilities(request.PollRequest.WorkerVersionCapabilities) | ||
directiveDeployment := request.GetDirectiveDeployment() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename as discussed
// activities. | ||
// If there is a pending workflow task that is not started yet, it'll be rescheduled after | ||
// transition start. | ||
StartDeploymentTransition(deployment *deploymentpb.Deployment) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this doesn't work for pinned, which is the call to move a pinned / change the pinned state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the question. Pinned state normally is set in afterAddWorkflowTaskCompletedEvent
, also could come from manual override which is applied in ApplyWorkflowExecutionOptionsUpdatedEvent
.
service/matching/db.go
Outdated
@@ -222,7 +222,7 @@ func (db *taskQueueDB) updateApproximateBacklogCount( | |||
// Prevent under-counting | |||
if db.approximateBacklogCount.Load()+delta < 0 { | |||
db.logger.Info("ApproximateBacklogCounter could have under-counted.", | |||
tag.WorkerBuildId(db.queue.BuildId()), tag.WorkflowNamespace(db.queue.Partition().NamespaceId())) | |||
tag.WorkerBuildId(db.queue.Version().MetricsTagValue()), tag.WorkflowNamespace(db.queue.Partition().NamespaceId())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be
tag.WorkerBuildId(db.queue.Version().MetricsTagValue()), tag.WorkflowNamespace(db.queue.Partition().NamespaceId())) | |
tag.WorkerBuildId(db.queue.Version().MetricsTagValue()), tag.WorkflowNamespaceId(db.queue.Partition().NamespaceId())) |
? mixing up name/id
…g-unpinned # Conflicts: # service/matching/matching_engine.go # service/matching/matching_engine_test.go # service/matching/physical_task_queue_key.go # service/matching/physical_task_queue_key_test.go # service/matching/physical_task_queue_manager.go # service/matching/task_queue_partition_manager.go # tests/versioning_3_test.go
service/matching/deployment_util.go
Outdated
@@ -23,7 +23,7 @@ func findDeployment(deployments []*persistencespb.DeploymentData_Deployment, dep | |||
return -1 | |||
} | |||
|
|||
func findCurrentDeployment(deployments []*persistencespb.DeploymentData_Deployment) int { | |||
func findCurrentDeployment(deployments []*persistencespb.DeploymentData_Deployment) *deploymentpb.Deployment { | |||
maxCurrentIndex := -1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might as well change this to keep the pointer instead of the index
// History should've scheduled another task on the right task queue and deployment. | ||
// Dropping this one. | ||
e.logger.Info("dropping obsolete workflow task", | ||
tag.WorkflowTaskQueueName(taskQueueName), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the log tag boilerplate in this function is getting a little out of hand. let's add some helpers later?
version PhysicalTaskQueueVersion | ||
} | ||
|
||
PhysicalTaskQueueVersion struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wasn't this in the previous pr? do you need to merge so we only see the diff from this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes it was, and the previous PR is merged, it might be that Github has not updated the diff?! I'm going to merge some changes now and hopefully the diff gets refreshed.
// out, future wft will be redirected by matching again and the transition will | ||
// eventually happen. If an activity starts while the speculative is also started on the | ||
// new deployment, the activity will cause the transition to be created and persisted in | ||
// the MS. | ||
if workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to move this check above and don't start transition if WFT is speculative. It is because we don't want any extra changes in MS which are not persisted, but then decisions (like dropping AT) are made based on this, and then MS is lost.
And it is also ok not to do anything here because when new deployment is pinned new normal WFT is created and it will start transition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ^^^ comment probably worth to be moved in code comment (with some modifications).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of moving the check up, I moved the StartDeploymentTransition call down. I hope it counts :D
taskDeployment := attrs.GetDeployment() | ||
taskBehavior := attrs.GetVersioningBehavior() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskDeployment := attrs.GetDeployment() | |
taskBehavior := attrs.GetVersioningBehavior() | |
wftDeployment := attrs.GetDeployment() | |
wftBehavior := attrs.GetVersioningBehavior() |
same reason: I would avoid word "task" everywhere where possible.
} | ||
} | ||
|
||
if m.ms.GetDeploymentTransition() != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is very unclear that assignment on the line 1094 actually clears this. I would remove ms.GetDeploymentTransition()
and operate versioningInfo
here and everywhere else.
|
||
// Change deployment and behavior based on the completed wft. | ||
if wfBehaviorBefore != wftBehavior || !wfDeploymentBefore.Equal(wftDeployment) { | ||
if versioningInfo == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can't be nil
here. At least I don't see how.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can: the wf was unversioned, hence its versioningInfo was nil. A speculative task redirected to a new deployment without touching versioningInfo at start time, and now it is completing with a non-nil wftDeployment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 1089 will give NPE then.
wfDeploymentBefore := m.ms.GetEffectiveDeployment() | ||
wfBehaviorBefore := m.ms.GetEffectiveVersioningBehavior() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It really feels like we need a message/struct which combines Deployment
and Behavior
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at the usages. Whenever Behavior is used, Deployment is also used, but not vice versa (in many places we only care about deployment). I think we can keep it as is.
if versioningInfo.DeploymentTransition != nil { | ||
// It's possible that the completed WFT is not yet from the current transition because when | ||
// the transition started, the current wft was already started. In this case, we allow the | ||
// started wft to run and when completed, we create another wft immediately. | ||
if versioningInfo.DeploymentTransition.GetDeployment().Equal(wftDeployment) { | ||
versioningInfo.DeploymentTransition = nil | ||
completedTransition = true | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Equal
supports nil
on both sides:
if versioningInfo.DeploymentTransition != nil { | |
// It's possible that the completed WFT is not yet from the current transition because when | |
// the transition started, the current wft was already started. In this case, we allow the | |
// started wft to run and when completed, we create another wft immediately. | |
if versioningInfo.DeploymentTransition.GetDeployment().Equal(wftDeployment) { | |
versioningInfo.DeploymentTransition = nil | |
completedTransition = true | |
} | |
} | |
// It's possible that the completed WFT is not yet from the current transition because when | |
// the transition started, the current wft was already started. In this case, we allow the | |
// started wft to run and when completed, we create another wft immediately. | |
if versioningInfo.GetDeploymentTransition().GetDeployment().Equal(wftDeployment) { | |
versioningInfo.DeploymentTransition = nil | |
completedTransition = true | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't want to set completedTransition to true if there was no transition and wftDeployment is nil.
|
||
wftDeployment := attrs.GetDeployment() | ||
wftBehavior := attrs.GetVersioningBehavior() | ||
versioningInfo := m.ms.GetExecutionInfo().GetVersioningInfo() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can add early exit here. If versioningInfo
is nil
, all the rest doesn't need to be checked at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I read a comment about speculative WFT :-). Now I believe that you just need to move that block with versionInfo
initialization here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you make a suggested change? I'm not totally following.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment saying that versionInfo
can be nil
.
versioningInfo := m.ms.GetExecutionInfo().GetVersioningInfo() | ||
|
||
var completedTransition bool | ||
if versioningInfo.DeploymentTransition != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if versioningInfo.DeploymentTransition != nil { | |
if versioningInfo.GetDeploymentTransition() != nil { |
…g-unpinned # Conflicts: # api/historyservice/v1/request_response.pb.go # service/history/api/updateworkflowoptions/api.go # tests/deployment_test.go
## What changed? <!-- Describe what has changed in this PR --> Add Matching and History changes to properly route unpinned workflow tasks. ## Why? <!-- Tell your future self why have you made these changes --> ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Tests will come in separate PR. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
What changed?
Add Matching and History changes to properly route unpinned workflow tasks.
Why?
How did you test it?
Tests will come in separate PR.
Potential risks
Documentation
Is hotfix candidate?