-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bloom blocks downloading queue #11201
bloom blocks downloading queue #11201
Conversation
Signed-off-by: Vladyslav Diachenko <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
took a brief look, letting @chaudum do the lions share of review on this one
# Conflicts: # pkg/validation/limits.go
Signed-off-by: Vladyslav Diachenko <[email protected]>
Trivy scan found the following vulnerabilities:
|
Signed-off-by: Vladyslav Diachenko <[email protected]>
Signed-off-by: Vladyslav Diachenko <[email protected]>
…ng additional goroutine Signed-off-by: Vladyslav Diachenko <[email protected]>
// we need to have errCh with size that can keep max count of errors to prevent the case when | ||
// the queue worker reported the error to this channel before the current goroutine | ||
// and this goroutine will go to the deadlock because it won't be able to report an error | ||
// because nothing reads this channel at this moment. | ||
errCh := make(chan error, len(references)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I understand that correctly. On line 168 you return after sending the first and only error to the errCh
. Could you just make buffered channel of size 1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this channel is also passed to the queue and queue workers will report errors to this channel also...
Signed-off-by: Vladyslav Diachenko <[email protected]>
return Block{}, fmt.Errorf("error while period lookup: %w", err) | ||
} | ||
objectClient := b.periodicObjectClients[period] | ||
readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you'll see this with the next rebase, but now result of this get call is a compressed tar archive that needs to be downloaded and decompressed, which needs to be handled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's already handled in
directory, err := d.extractBlock(&block, time.Now()) |
}() | ||
return blocksChannel, errChannel | ||
// GetBlock downloads the blocks from objectStorage and returns the downloaded block | ||
func (b *BloomClient) GetBlock(ctx context.Context, reference BlockRef) (Block, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
asking to understand, is there no use case fetching multiple blocks at once? i can see underlying call is the same to object store, i'm asking from api design perspective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we will download many blocks in parallel using the downloading queue. I see that in the compactor it might be necessary to download many blocks in parallel also, but I believe we can spin up the goroutines in compactor's code to download many blocks in parallel. wdyt?
# Conflicts: # pkg/validation/limits.go
Signed-off-by: Vladyslav Diachenko <[email protected]>
Signed-off-by: Vladyslav Diachenko <[email protected]>
Signed-off-by: Vladyslav Diachenko <[email protected]>
Signed-off-by: Vladyslav Diachenko <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few comments, but nothing that would block merging.
//TODO implement me | ||
panic("implement me") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//TODO implement me | |
panic("implement me") | |
return 0 |
//TODO implement me | ||
panic("implement me") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//TODO implement me | |
panic("implement me") | |
return true |
//TODO implement me | ||
panic("implement me") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//TODO implement me | |
panic("implement me") | |
return 1 |
} | ||
|
||
func (d *blockDownloader) stop() { | ||
_ = services.StopManagerAndAwaitStopped(d.ctx, d.manager) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should log the error?
} | ||
|
||
func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | ||
f.IntVar(&cfg.WorkersCount, prefix+"workers-count", 100, "The count of parallel workers that download Bloom Blocks.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should be more conservative regarding the default value. 10
should be more than sufficient?
|
||
func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | ||
f.IntVar(&cfg.WorkersCount, prefix+"workers-count", 100, "The count of parallel workers that download Bloom Blocks.") | ||
f.IntVar(&cfg.MaxTasksEnqueuedPerTenant, prefix+"max_tasks_enqueued_per_tenant", 10_000, "Maximum number of task in queue per tenant per bloom-gateway. Enqueuing the tasks above this limit will fail an error.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
implemented bloom blocks downloading queue to control the concurrency of downloading the blocks from the storage Signed-off-by: Vladyslav Diachenko <[email protected]>
implemented bloom blocks downloading queue to control the concurrency of downloading the blocks from the storage