From b2f46de418bcd6a96a77e6a04b22e8f650b086df Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 5 Jan 2025 20:52:52 -0800 Subject: [PATCH] fix(block-scheduler): one job per partition (local branch copy) (#15579) --- docs/sources/shared/configuration.md | 4 -- pkg/blockbuilder/scheduler/scheduler.go | 88 ++++++++++++++++--------- 2 files changed, 56 insertions(+), 36 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 763a7ad905b42..c8bc0e9aaeac5 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -232,10 +232,6 @@ block_scheduler: # CLI flag: -block-scheduler.target-record-count [target_record_count: <int> | default = 1000] - # Maximum number of jobs that the planner can return. - # CLI flag: -block-scheduler.max-jobs-planned-per-interval - [max_jobs_planned_per_interval: <int> | default = 100] - job_queue: # Interval to check for expired job leases # CLI flag: -jobqueue.lease-expiry-check-interval diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 1ea28970f923f..74b507dde74cb 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "net/http" + "sort" "strconv" "strings" "time" @@ -25,13 +26,12 @@ var ( ) type Config struct { - ConsumerGroup string `yaml:"consumer_group"` - Interval time.Duration `yaml:"interval"` - LookbackPeriod time.Duration `yaml:"lookback_period"` - Strategy string `yaml:"strategy"` - TargetRecordCount int64 `yaml:"target_record_count"` - MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"` - JobQueueConfig JobQueueConfig `yaml:"job_queue"` + ConsumerGroup string `yaml:"consumer_group"` + Interval time.Duration `yaml:"interval"` + LookbackPeriod time.Duration `yaml:"lookback_period"` + Strategy string `yaml:"strategy"` + TargetRecordCount int64 `yaml:"target_record_count"` + JobQueueConfig JobQueueConfig 
`yaml:"job_queue"` } func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -56,12 +56,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { RecordCountStrategy, ), ) - f.IntVar( - &cfg.MaxJobsPlannedPerInterval, - prefix+"max-jobs-planned-per-interval", - 100, - "Maximum number of jobs that the planner can return.", - ) cfg.JobQueueConfig.RegisterFlags(f) } @@ -155,7 +149,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { s.publishLagMetrics(lag) - jobs, err := s.planner.Plan(ctx, s.cfg.MaxJobsPlannedPerInterval) + jobs, err := s.planner.Plan(ctx, 1) // TODO(owen-d): parallelize work within a partition if err != nil { level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err) } @@ -163,25 +157,10 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { for _, job := range jobs { // TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID - logger := log.With( - s.logger, - "job", job.Job.ID(), - "priority", job.Priority, - ) - - status, ok := s.queue.Exists(job.Job) - - // scheduler is unaware of incoming job; enqueue - if !ok { - level.Debug(logger).Log( - "msg", "job does not exist, enqueueing", - ) - - // enqueue - if err := s.queue.Enqueue(job.Job, job.Priority); err != nil { - level.Error(logger).Log("msg", "failed to enqueue job", "err", err) - } + added, status, err := s.idempotentEnqueue(job) + // if we've either added or encountered an error, move on; we're done this cycle + if added || err != nil { continue } @@ -232,6 +211,34 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context) (*types.Job, bool, er } } +// if added is true, the job was added to the queue, otherwise status is the current status of the job +func (s *BlockScheduler) idempotentEnqueue(job *JobWithMetadata) (added bool, status types.JobStatus, err error) { + logger := log.With( + s.logger, + "job", job.Job.ID(), + "priority", job.Priority, + ) + + 
status, ok := s.queue.Exists(job.Job) + + // scheduler is unaware of incoming job; enqueue + if !ok { + level.Debug(logger).Log( + "msg", "job does not exist, enqueueing", + ) + + // enqueue + if err := s.queue.Enqueue(job.Job, job.Priority); err != nil { + level.Error(logger).Log("msg", "failed to enqueue job", "err", err) + return false, types.JobStatusUnknown, err + } + + return true, types.JobStatusPending, nil + } + + return false, status, nil +} + func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job, success bool) (err error) { logger := log.With(s.logger, "job", job.ID()) @@ -243,6 +250,23 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job, ); err == nil { s.queue.MarkComplete(job.ID(), types.JobStatusComplete) level.Info(logger).Log("msg", "job completed successfully") + + // TODO(owen-d): cleaner way to enqueue next job for this partition, + // don't make it part of the response cycle to job completion, etc. + jobs, err := s.planner.Plan(ctx, 1) + if err != nil { + level.Error(logger).Log("msg", "failed to plan subsequent jobs", "err", err) + } + + // find first job for this partition + nextJob := sort.Search(len(jobs), func(i int) bool { + return jobs[i].Job.Partition() >= job.Partition() + }) + + if nextJob < len(jobs) && jobs[nextJob].Job.Partition() == job.Partition() { + _, _, _ = s.idempotentEnqueue(jobs[nextJob]) + } + return nil }