chore(block-scheduler): pin fallback offset during start-up (#15642)
ashwanthgoli authored Jan 9, 2025
1 parent c581218 commit 732cc8b
Showing 7 changed files with 48 additions and 50 deletions.
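The gist of the change: previously `GroupLag` recomputed the fallback offset (`now - lookbackPeriod`) on every call, so each planning run resolved partitions without commits to a slightly different start timestamp, producing unaligned jobs. This commit computes the fallback once in `NewScheduler` and threads the pinned `fallbackOffsetMillis` through the planner, the status page, and `GroupLag`. A minimal standalone sketch of the difference, using hypothetical stand-ins (`kafkaStartOffset`, `fallbackPerRun`, `newScheduler`) rather than the actual Loki types:

package main

import (
	"fmt"
	"time"
)

// kafkaStartOffset stands in for partition.KafkaStartOffset (a Kafka
// sentinel meaning "start of the partition"); illustrative only.
const kafkaStartOffset int64 = -2

// fallbackPerRun mirrors the old behavior: the fallback is derived from
// time.Now() on every call, so it drifts between planning runs.
func fallbackPerRun(lookback time.Duration) int64 {
	if lookback > 0 {
		return time.Now().UnixMilli() - lookback.Milliseconds()
	}
	return kafkaStartOffset
}

// scheduler mirrors the new behavior: the fallback is computed once at
// construction and every subsequent run reuses the pinned value.
type scheduler struct {
	fallbackOffsetMillis int64
}

func newScheduler(lookback time.Duration) *scheduler {
	return &scheduler{fallbackOffsetMillis: fallbackPerRun(lookback)}
}

func (s *scheduler) runOnce() int64 { return s.fallbackOffsetMillis }

func main() {
	// Old: two runs a few milliseconds apart see two different fallbacks.
	a := fallbackPerRun(time.Hour)
	time.Sleep(5 * time.Millisecond)
	b := fallbackPerRun(time.Hour)
	fmt.Println("per-run fallbacks drift:", a != b) // true

	// New: every run sees the value pinned at start-up.
	s := newScheduler(time.Hour)
	first, second := s.runOnce(), s.runOnce()
	fmt.Println("pinned fallback is stable:", first == second) // true
}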
32 changes: 21 additions & 11 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -91,28 +91,38 @@ type BlockScheduler struct {
 	queue   *JobQueue
 	metrics *Metrics
 
-	offsetManager partition.OffsetManager
-	planner       Planner
+	fallbackOffsetMillis int64
+	offsetManager        partition.OffsetManager
+	planner              Planner
 }
 
 // NewScheduler creates a new scheduler instance
 func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) (*BlockScheduler, error) {
+	// pin the fallback offset at the time of scheduler creation to ensure planner uses the same fallback offset on subsequent runs
+	// without this, planner would create jobs that are unaligned when the partition has no commits so far.
+	fallbackOffsetMillis := int64(partition.KafkaStartOffset)
+	if cfg.LookbackPeriod > 0 {
+		fallbackOffsetMillis = time.Now().UnixMilli() - cfg.LookbackPeriod.Milliseconds()
+	}
+
 	var planner Planner
 	switch cfg.Strategy {
 	case RecordCountStrategy:
-		planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, cfg.LookbackPeriod, logger)
+		planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, fallbackOffsetMillis, logger)
 	default:
 		return nil, fmt.Errorf("invalid strategy: %s", cfg.Strategy)
 	}
 
 	s := &BlockScheduler{
-		cfg:           cfg,
-		planner:       planner,
-		offsetManager: offsetManager,
-		logger:        logger,
-		metrics:       NewMetrics(r),
-		queue:         queue,
+		cfg:                  cfg,
+		planner:              planner,
+		offsetManager:        offsetManager,
+		logger:               logger,
+		metrics:              NewMetrics(r),
+		queue:                queue,
+		fallbackOffsetMillis: fallbackOffsetMillis,
 	}
 
 	s.Service = services.NewBasicService(nil, s.running, nil)
 	return s, nil
 }
@@ -139,7 +149,7 @@ func (s *BlockScheduler) running(ctx context.Context) error {
 }
 
 func (s *BlockScheduler) runOnce(ctx context.Context) error {
-	lag, err := s.offsetManager.GroupLag(ctx, s.cfg.LookbackPeriod)
+	lag, err := s.offsetManager.GroupLag(ctx, s.fallbackOffsetMillis)
 	if err != nil {
 		level.Error(s.logger).Log("msg", "failed to get group lag", "err", err)
 		return err
@@ -302,5 +312,5 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error
 }
 
 func (s *BlockScheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-	newStatusPageHandler(s.queue, s.offsetManager, s.cfg.LookbackPeriod).ServeHTTP(w, req)
+	newStatusPageHandler(s.queue, s.offsetManager, s.fallbackOffsetMillis).ServeHTTP(w, req)
 }
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/scheduler_test.go
@@ -27,7 +27,7 @@ type mockOffsetManager struct {
 
 func (m *mockOffsetManager) Topic() string         { return m.topic }
 func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup }
-func (m *mockOffsetManager) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
+func (m *mockOffsetManager) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
 	return nil, nil
 }
 func (m *mockOffsetManager) FetchLastCommittedOffset(_ context.Context, _ int32) (int64, error) {
14 changes: 7 additions & 7 deletions pkg/blockbuilder/scheduler/status.go
@@ -24,7 +24,7 @@ type jobQueue interface {
 }
 
 type offsetReader interface {
-	GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error)
+	GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error)
 }
 
 type partitionInfo struct {
@@ -35,17 +35,17 @@ type partitionInfo struct {
 }
 
 type statusPageHandler struct {
-	jobQueue       jobQueue
-	offsetReader   offsetReader
-	lookbackPeriod time.Duration
+	jobQueue             jobQueue
+	offsetReader         offsetReader
+	fallbackOffsetMillis int64
 }
 
-func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, lookbackPeriod time.Duration) *statusPageHandler {
-	return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, lookbackPeriod: lookbackPeriod}
+func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, fallbackOffsetMillis int64) *statusPageHandler {
+	return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, fallbackOffsetMillis: fallbackOffsetMillis}
 }
 
 func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
-	offsets, err := h.offsetReader.GroupLag(context.Background(), h.lookbackPeriod)
+	offsets, err := h.offsetReader.GroupLag(context.Background(), h.fallbackOffsetMillis)
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/status_test.go
@@ -69,7 +69,7 @@ func TestStatusPageHandler_ServeHTTP(t *testing.T) {
 		},
 	}
 
-	handler := newStatusPageHandler(mockLister, mockReader, time.Hour)
+	handler := newStatusPageHandler(mockLister, mockReader, 0)
 	req := httptest.NewRequest(http.MethodGet, "/test", nil)
 	w := httptest.NewRecorder()
 	handler.ServeHTTP(w, req)
25 changes: 12 additions & 13 deletions pkg/blockbuilder/scheduler/strategy.go
@@ -3,7 +3,6 @@ package scheduler
 import (
 	"context"
 	"sort"
-	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -14,7 +13,7 @@ import (
 
 // OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
 type OffsetReader interface {
-	GroupLag(context.Context, time.Duration) (map[int32]kadm.GroupMemberLag, error)
+	GroupLag(context.Context, int64) (map[int32]kadm.GroupMemberLag, error)
 }
 
 type Planner interface {
@@ -32,18 +31,18 @@ var validStrategies = []string{
 
 // tries to consume upto targetRecordCount records per partition
 type RecordCountPlanner struct {
-	targetRecordCount int64
-	lookbackPeriod    time.Duration
-	offsetReader      OffsetReader
-	logger            log.Logger
+	targetRecordCount    int64
+	fallbackOffsetMillis int64
+	offsetReader         OffsetReader
+	logger               log.Logger
 }
 
-func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, lookbackPeriod time.Duration, logger log.Logger) *RecordCountPlanner {
+func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, fallbackOffsetMillis int64, logger log.Logger) *RecordCountPlanner {
 	return &RecordCountPlanner{
-		targetRecordCount: targetRecordCount,
-		lookbackPeriod:    lookbackPeriod,
-		offsetReader:      offsetReader,
-		logger:            logger,
+		targetRecordCount:    targetRecordCount,
+		fallbackOffsetMillis: fallbackOffsetMillis,
+		offsetReader:         offsetReader,
+		logger:               logger,
 	}
 }
@@ -52,8 +51,8 @@ func (p *RecordCountPlanner) Name() string {
 }
 
 func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) {
-	level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount, "lookback_period", p.lookbackPeriod.String())
-	offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
+	level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount)
+	offsets, err := p.offsetReader.GroupLag(ctx, p.fallbackOffsetMillis)
 	if err != nil {
 		level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
 		return nil, err
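For context, a hedged sketch of the new constructor contract: the planner no longer knows the lookback duration (hence the dropped `lookback_period` log field); the caller pins the offset once and passes an `int64`. `buildPlanner`, the 1h lookback, and the target of 1000 records are illustrative, not part of this commit, and the sketch assumes the scheduler package context (plus a `time` import, which strategy.go itself no longer needs):

// buildPlanner is a hypothetical helper showing that the caller now
// computes the pinned fallback and the planner just forwards it to GroupLag.
func buildPlanner(reader OffsetReader, logger log.Logger) *RecordCountPlanner {
	lookback := time.Hour // assumed lookback; pinned once here, not per Plan() call
	fallbackOffsetMillis := time.Now().UnixMilli() - lookback.Milliseconds()
	return NewRecordCountPlanner(reader, 1000, fallbackOffsetMillis, logger)
}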
4 changes: 2 additions & 2 deletions pkg/blockbuilder/scheduler/strategy_test.go
@@ -16,7 +16,7 @@ type mockOffsetReader struct {
 	groupLag map[int32]kadm.GroupMemberLag
 }
 
-func (m *mockOffsetReader) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
+func (m *mockOffsetReader) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
 	return m.groupLag, nil
 }
 
@@ -182,7 +182,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
 				TargetRecordCount: tc.recordCount,
 			}
 			require.NoError(t, cfg.Validate())
-			planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())
+			planner := NewRecordCountPlanner(mockReader, tc.recordCount, 0, log.NewNopLogger())
 			jobs, err := planner.Plan(context.Background(), 0, tc.minOffsetsPerJob)
 			require.NoError(t, err)
 
19 changes: 4 additions & 15 deletions pkg/kafka/partition/offset_manager.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -22,9 +21,8 @@ type OffsetManager interface {
 	Topic() string
 	ConsumerGroup() string
 
-	// GroupLag returns the lag for the consumer group; if lookbackPeriod is greater than 0, then the lag is calculated
-	// based on the current time minus the lookback period; otherwise, the lag is calculated based on Kafka's start offset
-	GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error)
+	// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
+	GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error)
 	FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
 	FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
 	Commit(ctx context.Context, partition int32, offset int64) error
@@ -182,17 +180,8 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error) {
 	return partition.Offset, nil
 }
 
-// GroupLag returns the lag for the consumer group; if lookbackPeriod is greater than 0, then the lag is calculated
-// based on the current time minus the lookback period; otherwise, the lag is calculated based on Kafka's start offset
-func (r *KafkaOffsetManager) GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error) {
-	lookbackMills := int64(lookbackPeriod / time.Millisecond)
-	var fallbackOffsetMillis int64
-	if lookbackMills > 0 {
-		fallbackOffsetMillis = time.Now().UnixMilli() - lookbackMills
-	} else {
-		fallbackOffsetMillis = int64(KafkaStartOffset)
-	}
-
+// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
+func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error) {
 	lag, err := GetGroupLag(ctx, r.adminClient, r.cfg.Topic, r.ConsumerGroup(), fallbackOffsetMillis)
 	if err != nil {
 		return nil, err
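The net effect on callers of `GroupLag`: the millisecond-timestamp-or-sentinel decision moved out of `KafkaOffsetManager` and is now made once by the caller. A hedged sketch of that contract, reusing the names from this diff (`partition.KafkaStartOffset`, `GroupLag`); `pinnedGroupLag` itself is a hypothetical helper, and the sketch assumes `context`, `time`, `kadm`, and the `partition` package are imported:

// pinnedGroupLag resolves the fallback once (Kafka start-offset sentinel
// when no lookback is configured, otherwise now minus the lookback period)
// and reuses it for the GroupLag call.
func pinnedGroupLag(ctx context.Context, om partition.OffsetManager, lookback time.Duration) (map[int32]kadm.GroupMemberLag, error) {
	fallbackOffsetMillis := int64(partition.KafkaStartOffset)
	if lookback > 0 {
		fallbackOffsetMillis = time.Now().UnixMilli() - lookback.Milliseconds()
	}
	return om.GroupLag(ctx, fallbackOffsetMillis)
}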
