diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go
index 55ecc066dc582..ea1f17ee56dc5 100644
--- a/pkg/blockbuilder/scheduler/scheduler.go
+++ b/pkg/blockbuilder/scheduler/scheduler.go
@@ -91,28 +91,38 @@ type BlockScheduler struct {
 	queue   *JobQueue
 	metrics *Metrics
 
-	offsetManager partition.OffsetManager
-	planner       Planner
+	fallbackOffsetMillis int64
+	offsetManager        partition.OffsetManager
+	planner              Planner
 }
 
 // NewScheduler creates a new scheduler instance
 func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) (*BlockScheduler, error) {
+	// pin the fallback offset at the time of scheduler creation to ensure planner uses the same fallback offset on subsequent runs
+	// without this, planner would create jobs that are unaligned when the partition has no commits so far.
+	fallbackOffsetMillis := int64(partition.KafkaStartOffset)
+	if cfg.LookbackPeriod > 0 {
+		fallbackOffsetMillis = time.Now().UnixMilli() - cfg.LookbackPeriod.Milliseconds()
+	}
+
 	var planner Planner
 	switch cfg.Strategy {
 	case RecordCountStrategy:
-		planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, cfg.LookbackPeriod, logger)
+		planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, fallbackOffsetMillis, logger)
 	default:
 		return nil, fmt.Errorf("invalid strategy: %s", cfg.Strategy)
 	}
 
 	s := &BlockScheduler{
-		cfg:           cfg,
-		planner:       planner,
-		offsetManager: offsetManager,
-		logger:        logger,
-		metrics:       NewMetrics(r),
-		queue:         queue,
+		cfg:                  cfg,
+		planner:              planner,
+		offsetManager:        offsetManager,
+		logger:               logger,
+		metrics:              NewMetrics(r),
+		queue:                queue,
+		fallbackOffsetMillis: fallbackOffsetMillis,
 	}
+
 	s.Service = services.NewBasicService(nil, s.running, nil)
 	return s, nil
 }
@@ -139,7 +149,7 @@ func (s *BlockScheduler) running(ctx context.Context) error {
 }
 
 func (s *BlockScheduler) runOnce(ctx context.Context) error {
-	lag, err := s.offsetManager.GroupLag(ctx, s.cfg.LookbackPeriod)
+	lag, err := s.offsetManager.GroupLag(ctx, s.fallbackOffsetMillis)
 	if err != nil {
 		level.Error(s.logger).Log("msg", "failed to get group lag", "err", err)
 		return err
 	}
@@ -302,5 +312,5 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error {
 }
 
 func (s *BlockScheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-	newStatusPageHandler(s.queue, s.offsetManager, s.cfg.LookbackPeriod).ServeHTTP(w, req)
+	newStatusPageHandler(s.queue, s.offsetManager, s.fallbackOffsetMillis).ServeHTTP(w, req)
 }
diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go
index 55234fbd4d9f0..24c8b129638e4 100644
--- a/pkg/blockbuilder/scheduler/scheduler_test.go
+++ b/pkg/blockbuilder/scheduler/scheduler_test.go
@@ -27,7 +27,7 @@ type mockOffsetManager struct {
 }
 
 func (m *mockOffsetManager) Topic() string         { return m.topic }
 func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup }
-func (m *mockOffsetManager) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
+func (m *mockOffsetManager) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
 	return nil, nil
 }
 func (m *mockOffsetManager) FetchLastCommittedOffset(_ context.Context, _ int32) (int64, error) {
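Note: the scheduler change above is easiest to see in isolation. Below is a minimal, self-contained sketch of the pattern (hypothetical names; `kafkaStartOffset` is assumed to match `partition.KafkaStartOffset`, i.e. -2, Kafka's "earliest" sentinel): the fallback timestamp is resolved once in the constructor, so every planning cycle hands `GroupLag` the same value instead of re-deriving `now - lookback` on each run.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed sentinel matching partition.KafkaStartOffset (-2); declared
// locally to keep the sketch standalone.
const kafkaStartOffset int64 = -2

type scheduler struct {
	// pinned once at construction; reused by every planning run
	fallbackOffsetMillis int64
}

// newScheduler mirrors NewScheduler above: resolve "now - lookback" exactly
// once so subsequent runs agree on where uncommitted partitions start.
func newScheduler(lookback time.Duration) *scheduler {
	fallback := kafkaStartOffset
	if lookback > 0 {
		fallback = time.Now().UnixMilli() - lookback.Milliseconds()
	}
	return &scheduler{fallbackOffsetMillis: fallback}
}

// runOnce stands in for BlockScheduler.runOnce: it hands the pinned value
// to the lag calculation rather than recomputing it from the clock.
func (s *scheduler) runOnce() int64 { return s.fallbackOffsetMillis }

func main() {
	s := newScheduler(time.Hour)
	first := s.runOnce()
	time.Sleep(50 * time.Millisecond)
	second := s.runOnce()
	// With the old behavior (recomputing now-lookback per run) these would
	// differ by ~50ms, shifting job boundaries between runs; pinned, they match.
	fmt.Println(first == second) // true
}
```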
diff --git a/pkg/blockbuilder/scheduler/status.go b/pkg/blockbuilder/scheduler/status.go
index ef08598fef8f5..c6cdaa847e1b3 100644
--- a/pkg/blockbuilder/scheduler/status.go
+++ b/pkg/blockbuilder/scheduler/status.go
@@ -24,7 +24,7 @@ type jobQueue interface {
 }
 
 type offsetReader interface {
-	GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error)
+	GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error)
 }
 
 type partitionInfo struct {
@@ -35,17 +35,17 @@ type partitionInfo struct {
 }
 
 type statusPageHandler struct {
-	jobQueue       jobQueue
-	offsetReader   offsetReader
-	lookbackPeriod time.Duration
+	jobQueue             jobQueue
+	offsetReader         offsetReader
+	fallbackOffsetMillis int64
 }
 
-func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, lookbackPeriod time.Duration) *statusPageHandler {
-	return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, lookbackPeriod: lookbackPeriod}
+func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, fallbackOffsetMillis int64) *statusPageHandler {
+	return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, fallbackOffsetMillis: fallbackOffsetMillis}
 }
 
 func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
-	offsets, err := h.offsetReader.GroupLag(context.Background(), h.lookbackPeriod)
+	offsets, err := h.offsetReader.GroupLag(context.Background(), h.fallbackOffsetMillis)
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
diff --git a/pkg/blockbuilder/scheduler/status_test.go b/pkg/blockbuilder/scheduler/status_test.go
index ac471b344b2cc..2777386f06597 100644
--- a/pkg/blockbuilder/scheduler/status_test.go
+++ b/pkg/blockbuilder/scheduler/status_test.go
@@ -69,7 +69,7 @@ func TestStatusPageHandler_ServeHTTP(t *testing.T) {
 		},
 	}
 
-	handler := newStatusPageHandler(mockLister, mockReader, time.Hour)
+	handler := newStatusPageHandler(mockLister, mockReader, 0)
 	req := httptest.NewRequest(http.MethodGet, "/test", nil)
 	w := httptest.NewRecorder()
 	handler.ServeHTTP(w, req)
diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go
index 6e2f0acb38ced..445b58276cd5a 100644
--- a/pkg/blockbuilder/scheduler/strategy.go
+++ b/pkg/blockbuilder/scheduler/strategy.go
@@ -3,7 +3,6 @@ package scheduler
 import (
 	"context"
 	"sort"
-	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -14,7 +13,7 @@ import (
 // OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
 type OffsetReader interface {
-	GroupLag(context.Context, time.Duration) (map[int32]kadm.GroupMemberLag, error)
+	GroupLag(context.Context, int64) (map[int32]kadm.GroupMemberLag, error)
 }
 
 type Planner interface {
@@ -32,18 +31,18 @@ var validStrategies = []string{
 
 // tries to consume upto targetRecordCount records per partition
 type RecordCountPlanner struct {
-	targetRecordCount int64
-	lookbackPeriod    time.Duration
-	offsetReader      OffsetReader
-	logger            log.Logger
+	targetRecordCount    int64
+	fallbackOffsetMillis int64
+	offsetReader         OffsetReader
+	logger               log.Logger
 }
 
-func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, lookbackPeriod time.Duration, logger log.Logger) *RecordCountPlanner {
+func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, fallbackOffsetMillis int64, logger log.Logger) *RecordCountPlanner {
 	return &RecordCountPlanner{
-		targetRecordCount: targetRecordCount,
-		lookbackPeriod:    lookbackPeriod,
-		offsetReader:      offsetReader,
-		logger:            logger,
+		targetRecordCount:    targetRecordCount,
+		fallbackOffsetMillis: fallbackOffsetMillis,
+		offsetReader:         offsetReader,
+		logger:               logger,
 	}
 }
 
@@ -52,8 +51,8 @@ func (p *RecordCountPlanner) Name() string {
 }
 
 func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) {
-	level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount, "lookback_period", p.lookbackPeriod.String())
-	offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
+	level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount)
+	offsets, err := p.offsetReader.GroupLag(ctx, p.fallbackOffsetMillis)
 	if err != nil {
 		level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
 		return nil, err
diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go
index b94ebd5624727..ac0f4226ac33a 100644
--- a/pkg/blockbuilder/scheduler/strategy_test.go
+++ b/pkg/blockbuilder/scheduler/strategy_test.go
@@ -16,7 +16,7 @@ type mockOffsetReader struct {
 	groupLag map[int32]kadm.GroupMemberLag
 }
 
-func (m *mockOffsetReader) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
+func (m *mockOffsetReader) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
 	return m.groupLag, nil
 }
 
@@ -182,7 +182,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
 				TargetRecordCount: tc.recordCount,
 			}
 			require.NoError(t, cfg.Validate())
-			planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())
+			planner := NewRecordCountPlanner(mockReader, tc.recordCount, 0, log.NewNopLogger())
 			jobs, err := planner.Plan(context.Background(), 0, tc.minOffsetsPerJob)
 			require.NoError(t, err)
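Note: for illustration, here is a hedged sketch of what the new `OffsetReader` contract looks like from the callee's side. It is standalone, so the interface is re-declared locally rather than imported from the scheduler package, and `stubOffsetReader` is a hypothetical stand-in like the mocks in the tests above: the second argument is a pre-resolved timestamp in milliseconds (or the start-offset sentinel), never a duration to resolve against the current clock.

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
)

// Local re-declaration of the updated OffsetReader interface from
// strategy.go; real code would use the scheduler package's type.
type OffsetReader interface {
	GroupLag(context.Context, int64) (map[int32]kadm.GroupMemberLag, error)
}

// stubOffsetReader receives a pinned fallback timestamp (or sentinel)
// and has nothing left to compute from the wall clock.
type stubOffsetReader struct {
	lag map[int32]kadm.GroupMemberLag
}

func (s *stubOffsetReader) GroupLag(_ context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error) {
	fmt.Println("lag requested with pinned fallback:", fallbackOffsetMillis)
	return s.lag, nil
}

func main() {
	var r OffsetReader = &stubOffsetReader{}
	// 0 plays the same inert role it does in the updated tests above.
	_, _ = r.GroupLag(context.Background(), 0)
}
```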
diff --git a/pkg/kafka/partition/offset_manager.go b/pkg/kafka/partition/offset_manager.go
index efec3a47a4168..14fb77d244d58 100644
--- a/pkg/kafka/partition/offset_manager.go
+++ b/pkg/kafka/partition/offset_manager.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -22,9 +21,8 @@ type OffsetManager interface {
 	Topic() string
 	ConsumerGroup() string
 
-	// GroupLag returns the lag for the consumer group; if lookbackPeriod is greater than 0, then the lag is calculated
-	// based on the current time minus the lookback period; otherwise, the lag is calculated based on Kafka's start offset
-	GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error)
+	// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
+	GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error)
 	FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
 	FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
 	Commit(ctx context.Context, partition int32, offset int64) error
@@ -182,17 +180,8 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error) {
 	return partition.Offset, nil
 }
 
-// GroupLag returns the lag for the consumer group; if lookbackPeriod is greater than 0, then the lag is calculated
-// based on the current time minus the lookback period; otherwise, the lag is calculated based on Kafka's start offset
-func (r *KafkaOffsetManager) GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error) {
-	lookbackMills := int64(lookbackPeriod / time.Millisecond)
-	var fallbackOffsetMillis int64
-	if lookbackMills > 0 {
-		fallbackOffsetMillis = time.Now().UnixMilli() - lookbackMills
-	} else {
-		fallbackOffsetMillis = int64(KafkaStartOffset)
-	}
-
+// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
+func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error) {
 	lag, err := GetGroupLag(ctx, r.adminClient, r.cfg.Topic, r.ConsumerGroup(), fallbackOffsetMillis)
 	if err != nil {
 		return nil, err
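Note: the `fallbackOffsetMillis` parameter now plays a dual role that used to be resolved inside `GroupLag`. To make it concrete, here is a minimal sketch (hypothetical helper names; the sentinel is assumed to be -2, matching `partition.KafkaStartOffset`) of the branch a lag calculation such as `GetGroupLag` has to take for a partition with no committed offset: a sentinel selects the partition's start offset, while any other value is treated as a milliseconds timestamp and resolved to the first offset produced at or after it.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed to match partition.KafkaStartOffset (-2, Kafka's "earliest" sentinel).
const kafkaStartOffset int64 = -2

// fallbackStartOffset is a hypothetical helper: for a partition with no
// commit, a sentinel fallback selects the partition's start offset; any
// other value is a milliseconds timestamp resolved to the first offset at
// or after it (the kind of lookup kadm's ListOffsetsAfterMilli performs).
func fallbackStartOffset(fallbackOffsetMillis, startOffset int64, offsetAfterMilli func(int64) int64) int64 {
	if fallbackOffsetMillis == kafkaStartOffset {
		return startOffset
	}
	return offsetAfterMilli(fallbackOffsetMillis)
}

func main() {
	// Toy offset index: pretend one record per second since the epoch.
	offsetAfterMilli := func(millis int64) int64 { return millis / 1000 }

	// No lookback configured: lag is measured from the partition start.
	fmt.Println(fallbackStartOffset(kafkaStartOffset, 42, offsetAfterMilli)) // 42

	// With a lookback, the scheduler passes its pinned "now - 1h" timestamp.
	pinned := time.Now().UnixMilli() - time.Hour.Milliseconds()
	fmt.Println(fallbackStartOffset(pinned, 42, offsetAfterMilli))
}
```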