
Batch task queue user data persistence updates #7039

Open · wants to merge 19 commits into base: main

Conversation

dnr (Member) commented Dec 30, 2024

What changed?

Multiple user data updates for task queues in the same namespace that arrive within a short period of time are batched into a smaller number of persistence operations (transactions).

Since multiple updates share a transaction, a version conflict in one update causes the unrelated updates in the same batch to fail too. This is detected: a non-retryable error is returned for the conflicting update, while a retryable error is returned for the others.

Why?

With deployments, we sometimes have to update user data on multiple task queues at once (all in the same namespace), and on Cassandra these updates go through a lightweight transaction (LWT). This can cause a backlog, since LWT throughput is fairly low.

This change batches multiple updates into one persistence operation (an LWT on Cassandra, a transaction on SQL). The batching is transparent: updates that arrive within a short period of time are automatically batched in the matching engine.
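The "transparent batching" idea can be sketched with channels. This is not the PR's stream_batcher API; the item type, the 10 ms window, and the 100-item cap here are illustrative assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// item is one caller's update; done reports the outcome of the shared flush.
type item struct {
	name string
	done chan error
}

// runBatcher drains whatever arrives within a short window and issues one
// persistence operation (flush) for the whole batch, then fans the result
// back out to every caller.
func runBatcher(in <-chan item, flush func([]item) error) {
	for it := range in {
		batch := []item{it}
		timeout := time.After(10 * time.Millisecond) // batching window
	collect:
		for len(batch) < 100 { // cap batch size
			select {
			case next, ok := <-in:
				if !ok {
					break collect
				}
				batch = append(batch, next)
			case <-timeout:
				break collect
			}
		}
		err := flush(batch) // one transaction / LWT for the whole batch
		for _, b := range batch {
			b.done <- err
		}
	}
}

func main() {
	in := make(chan item, 3)
	done := make(chan error, 3)
	// Pre-fill so all three updates land in the same window.
	for _, n := range []string{"tq1", "tq2", "tq3"} {
		in <- item{name: n, done: done}
	}
	go runBatcher(in, func(batch []item) error {
		fmt.Printf("flushing %d updates in one transaction\n", len(batch))
		return nil
	})
	for i := 0; i < 3; i++ {
		<-done // each caller still sees an individual result
	}
}
```

Callers keep the one-update-at-a-time API; only the number of round trips to the store changes.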

How did you test it?

  • unit tests for the batcher component
  • added persistence tests for user data, including conflict behavior (there were none before)
  • existing tests for user data updates

Potential risks

  • small extra latency on all user data updates

@dnr dnr requested a review from a team as a code owner December 30, 2024 20:52
ShahabT (Collaborator) left a comment:
Generally lgtm, but someone else should review line-by-line.

if err != nil {
	if m.Db.IsDupEntryError(err) {
		return &persistence.ConditionFailedError{Msg: err.Error()}
	}
	return err
}
Contributor commented:
With this type of error handling you will end up with a "partially updated" state, but you don't really know which updates passed (unless I'm missing something). Is there a reason to stop on the first error? Or would it make sense to keep going and return an array of failed updates?

dnr (Member, Author) replied:
It's a transaction, so you can't end up with things partially updated, right?

Contributor replied:
It was not clear that all of this happens inside a transaction. Probably worth a comment.

Review threads:
  • common/persistence/task_manager.go (resolved)
  • common/stream_batcher/batcher.go (3 threads, resolved; 2 outdated)
  • service/matching/matching_engine_test.go (resolved)
// try to add more items. stop after a gap of MaxGap, total time of MaxTotalWait, or
// MaxItems items.
maxWaitC, maxWaitT := s.clock.NewTimer(s.opts.MaxDelay)
loop:
Contributor commented:
I'm not sure I understand all the possible implications, so I'll trust that this works and is covered by tests.

stephanos (Contributor) left a comment:
First half of my review. I'll review the stream_batcher next.

Review threads:
  • common/persistence/data_interfaces.go (resolved)
  • service/matching/matching_engine.go (3 threads, outdated, resolved)
  • common/stream_batcher/batcher.go (9 threads, resolved; 7 outdated)
  • common/stream_batcher/batcher_test.go (resolved)
Comment on lines +559 to +579

// No error, but not applied. That means we had a conflict.
// Iterate through results to identify first conflicting row.
for {
	name, nameErr := getTypedFieldFromRow[string]("task_queue_name", previous)
	previousVersion, verErr := getTypedFieldFromRow[int64]("version", previous)
	update, hasUpdate := request.Updates[name]
	if nameErr == nil && verErr == nil && hasUpdate && update.Version != previousVersion {
		if update.Conflicting != nil {
			*update.Conflicting = true
		}
		return &p.ConditionFailedError{
			Msg: fmt.Sprintf("Failed to update task queues: task queue %q version %d != %d",
				name, update.Version, previousVersion),
		}
	}
	clear(previous)
	if !iter.MapScan(previous) {
		break
	}
}
return &p.ConditionFailedError{Msg: "Failed to update task queues: unknown conflict"}
Contributor commented:
It's possible that I missed it (it's a big PR), but this is untested, isn't it? If so, that makes me a little nervous.

}
if len(request.BuildIdsAdded) > 0 {
err = tx.AddToBuildIdToTaskQueueMapping(ctx, sqlplugin.AddToBuildIdToTaskQueueMapping{
for taskQueue, update := range request.Updates {
Contributor commented:
This could be up to 100 separate Update writes, right? I assume self-hosted users won't usually see a lot of parallel updates?

4 participants