Skip to content

Commit

Permalink
chore(blockbuilder): mark a job as complete only after stopping sync …
Browse files Browse the repository at this point in the history
…updates (#15670)
  • Loading branch information
ashwanthgoli authored Jan 9, 2025
1 parent bef2043 commit b5c627b
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 11 deletions.
4 changes: 0 additions & 4 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,6 @@ block_builder:
[scheduler_grpc_client_config: <grpc_client>]

block_scheduler:
# Consumer group used by block scheduler to track the last consumed offset.
# CLI flag: -block-scheduler.consumer-group
[consumer_group: <string> | default = "block-scheduler"]

# How often the scheduler should plan jobs.
# CLI flag: -block-scheduler.interval
[interval: <duration> | default = 15m]
Expand Down
14 changes: 9 additions & 5 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
completion.Success = false
}

// Remove the job from the in-flight set first, so sync requests stop before the job is reported as complete.
i.jobsMtx.Lock()
delete(i.inflightJobs, job.ID())
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

if _, err := withBackoff(
ctx,
i.cfg.Backoff,
Expand All @@ -292,16 +298,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
return true, err
}

i.jobsMtx.Lock()
delete(i.inflightJobs, job.ID())
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

return true, err
}

func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
level.Debug(logger).Log("msg", "beginning job")
start := time.Now()

indexer := newTsdbCreator()
appender := newAppender(i.id,
Expand Down Expand Up @@ -505,6 +507,8 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
level.Info(logger).Log(
"msg", "successfully processed job",
"last_offset", lastOffset,
"duration", time.Since(start),
"records", lastOffset-job.Offsets().Min,
)

return lastOffset, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
case types.JobStatusInProgress:
case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired:
// Job already completed, re-enqueue a new one
level.Warn(q.logger).Log("msg", "job already completed, re-enqueuing", "job", jobID, "status", jobMeta.Status)
registerInProgress()
return
default:
Expand Down
2 changes: 0 additions & 2 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var (
)

type Config struct {
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
Expand All @@ -36,7 +35,6 @@ type Config struct {

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Interval, prefix+"interval", 15*time.Minute, "How often the scheduler should plan jobs.")
f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 0, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.")
f.StringVar(
&cfg.Strategy,
Expand Down

0 comments on commit b5c627b

Please sign in to comment.