chore(block-scheduler): pin fallback offset during start-up #15642

Merged · 3 commits · Jan 9, 2025
32 changes: 21 additions & 11 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -91,28 +91,38 @@ type BlockScheduler struct {
queue *JobQueue
metrics *Metrics

offsetManager partition.OffsetManager
planner Planner
fallbackOffsetMillis int64
offsetManager partition.OffsetManager
planner Planner
}

// NewScheduler creates a new scheduler instance
func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) (*BlockScheduler, error) {
// pin the fallback offset at the time of scheduler creation to ensure planner uses the same fallback offset on subsequent runs
// without this, planner would create jobs that are unaligned when the partition has no commits so far.
fallbackOffsetMillis := int64(partition.KafkaStartOffset)
if cfg.LookbackPeriod > 0 {
fallbackOffsetMillis = time.Now().UnixMilli() - cfg.LookbackPeriod.Milliseconds()
}

var planner Planner
switch cfg.Strategy {
case RecordCountStrategy:
planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, cfg.LookbackPeriod, logger)
planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, fallbackOffsetMillis, logger)
default:
return nil, fmt.Errorf("invalid strategy: %s", cfg.Strategy)
}

s := &BlockScheduler{
cfg: cfg,
planner: planner,
offsetManager: offsetManager,
logger: logger,
metrics: NewMetrics(r),
queue: queue,
cfg: cfg,
planner: planner,
offsetManager: offsetManager,
logger: logger,
metrics: NewMetrics(r),
queue: queue,
fallbackOffsetMillis: fallbackOffsetMillis,
}

s.Service = services.NewBasicService(nil, s.running, nil)
return s, nil
}
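A minimal, self-contained sketch of what this hunk pins down (the helper name, the main wrapper, the -2 sentinel value, and the one-hour lookback are illustrative assumptions, not code from the PR): computing the fallback once at construction means every later planning run resolves uncommitted partitions against the same timestamp, instead of a window that drifts with each call to time.Now().

package main

import (
	"fmt"
	"time"
)

// kafkaStartOffset stands in for partition.KafkaStartOffset; assumed here to be
// Kafka's conventional "earliest" sentinel (-2).
const kafkaStartOffset int64 = -2

// resolveFallbackOffsetMillis mirrors the start-up logic in NewScheduler: with a
// positive lookback the fallback is a fixed wall-clock timestamp in milliseconds,
// otherwise the Kafka start-offset sentinel is used.
func resolveFallbackOffsetMillis(lookback time.Duration, now time.Time) int64 {
	if lookback > 0 {
		return now.UnixMilli() - lookback.Milliseconds()
	}
	return kafkaStartOffset
}

func main() {
	// Pinned once at start-up: every later planning run reuses this value,
	// so jobs stay aligned even when a partition has no commits yet.
	fallback := resolveFallbackOffsetMillis(time.Hour, time.Now())
	fmt.Println("pinned fallback offset (ms):", fallback)
}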
@@ -139,7 +149,7 @@ func (s *BlockScheduler) running(ctx context.Context) error {
}

func (s *BlockScheduler) runOnce(ctx context.Context) error {
lag, err := s.offsetManager.GroupLag(ctx, s.cfg.LookbackPeriod)
lag, err := s.offsetManager.GroupLag(ctx, s.fallbackOffsetMillis)
if err != nil {
level.Error(s.logger).Log("msg", "failed to get group lag", "err", err)
return err
@@ -302,5 +312,5 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error
}

func (s *BlockScheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newStatusPageHandler(s.queue, s.offsetManager, s.cfg.LookbackPeriod).ServeHTTP(w, req)
newStatusPageHandler(s.queue, s.offsetManager, s.fallbackOffsetMillis).ServeHTTP(w, req)
}
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/scheduler_test.go
@@ -27,7 +27,7 @@ type mockOffsetManager struct {

func (m *mockOffsetManager) Topic() string { return m.topic }
func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup }
func (m *mockOffsetManager) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
func (m *mockOffsetManager) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
return nil, nil
}
func (m *mockOffsetManager) FetchLastCommittedOffset(_ context.Context, _ int32) (int64, error) {
14 changes: 7 additions & 7 deletions pkg/blockbuilder/scheduler/status.go
@@ -24,7 +24,7 @@ type jobQueue interface {
}

type offsetReader interface {
GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error)
GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error)
}

type partitionInfo struct {
@@ -35,17 +35,17 @@ type partitionInfo struct {
}

type statusPageHandler struct {
jobQueue jobQueue
offsetReader offsetReader
lookbackPeriod time.Duration
jobQueue jobQueue
offsetReader offsetReader
fallbackOffsetMillis int64
}

func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, lookbackPeriod time.Duration) *statusPageHandler {
return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, lookbackPeriod: lookbackPeriod}
func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, fallbackOffsetMillis int64) *statusPageHandler {
return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, fallbackOffsetMillis: fallbackOffsetMillis}
}

func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
offsets, err := h.offsetReader.GroupLag(context.Background(), h.lookbackPeriod)
offsets, err := h.offsetReader.GroupLag(context.Background(), h.fallbackOffsetMillis)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/status_test.go
@@ -69,7 +69,7 @@ func TestStatusPageHandler_ServeHTTP(t *testing.T) {
},
}

handler := newStatusPageHandler(mockLister, mockReader, time.Hour)
handler := newStatusPageHandler(mockLister, mockReader, 0)
req := httptest.NewRequest(http.MethodGet, "/test", nil)
w := httptest.NewRecorder()
handler.ServeHTTP(w, req)
25 changes: 12 additions & 13 deletions pkg/blockbuilder/scheduler/strategy.go
@@ -3,7 +3,6 @@ package scheduler
import (
"context"
"sort"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -14,7 +13,7 @@ import (

// OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
type OffsetReader interface {
GroupLag(context.Context, time.Duration) (map[int32]kadm.GroupMemberLag, error)
GroupLag(context.Context, int64) (map[int32]kadm.GroupMemberLag, error)
}

type Planner interface {
@@ -32,18 +31,18 @@ var validStrategies = []string{

// tries to consume up to targetRecordCount records per partition
type RecordCountPlanner struct {
targetRecordCount int64
lookbackPeriod time.Duration
offsetReader OffsetReader
logger log.Logger
targetRecordCount int64
fallbackOffsetMillis int64
offsetReader OffsetReader
logger log.Logger
}

func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, lookbackPeriod time.Duration, logger log.Logger) *RecordCountPlanner {
func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, fallbackOffsetMillis int64, logger log.Logger) *RecordCountPlanner {
return &RecordCountPlanner{
targetRecordCount: targetRecordCount,
lookbackPeriod: lookbackPeriod,
offsetReader: offsetReader,
logger: logger,
targetRecordCount: targetRecordCount,
fallbackOffsetMillis: fallbackOffsetMillis,
offsetReader: offsetReader,
logger: logger,
}
}

@@ -52,8 +51,8 @@ func (p *RecordCountPlanner) Name() string {
}

func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) {
level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount, "lookback_period", p.lookbackPeriod.String())
offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount)
offsets, err := p.offsetReader.GroupLag(ctx, p.fallbackOffsetMillis)
if err != nil {
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
return nil, err
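For context, a hedged usage sketch of the new constructor shape (the stub reader and the record count of 1000 are assumptions made for illustration): the planner is now handed the pinned timestamp rather than a lookback duration, so repeated Plan calls query group lag against an identical fallback.

package scheduler

import (
	"context"

	"github.com/go-kit/log"
	"github.com/twmb/franz-go/pkg/kadm"
)

// stubOffsetReader satisfies OffsetReader with a canned, empty lag response;
// purely illustrative, not part of the PR.
type stubOffsetReader struct{}

func (stubOffsetReader) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
	return map[int32]kadm.GroupMemberLag{}, nil
}

// buildPlanner shows the caller's side of the change: it supplies the pinned
// fallback timestamp instead of a time.Duration lookback.
func buildPlanner(fallbackOffsetMillis int64) *RecordCountPlanner {
	return NewRecordCountPlanner(stubOffsetReader{}, 1000, fallbackOffsetMillis, log.NewNopLogger())
}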
4 changes: 2 additions & 2 deletions pkg/blockbuilder/scheduler/strategy_test.go
@@ -16,7 +16,7 @@ type mockOffsetReader struct {
groupLag map[int32]kadm.GroupMemberLag
}

func (m *mockOffsetReader) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
func (m *mockOffsetReader) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
return m.groupLag, nil
}

@@ -182,7 +182,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
TargetRecordCount: tc.recordCount,
}
require.NoError(t, cfg.Validate())
planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())
planner := NewRecordCountPlanner(mockReader, tc.recordCount, 0, log.NewNopLogger())
jobs, err := planner.Plan(context.Background(), 0, tc.minOffsetsPerJob)
require.NoError(t, err)

19 changes: 4 additions & 15 deletions pkg/kafka/partition/offset_manager.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -22,9 +21,8 @@ type OffsetManager interface {
Topic() string
ConsumerGroup() string

// GroupLag returns the lag for the consumer group; if lookbackPeriod is greater than 0, then the lag is calculated
// based on the current time minus the lookback period; otherwise, the lag is calculated based on Kafka's start offset
GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error)
// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error)
FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
Commit(ctx context.Context, partition int32, offset int64) error
@@ -182,17 +180,8 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition
return partition.Offset, nil
}

// GroupLag returns the lag for the consumer group; if lookbackPeriod is greater than 0, then the lag is calculated
// based on the current time minus the lookback period; otherwise, the lag is calculated based on Kafka's start offset
func (r *KafkaOffsetManager) GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error) {
lookbackMills := int64(lookbackPeriod / time.Millisecond)
var fallbackOffsetMillis int64
if lookbackMills > 0 {
fallbackOffsetMillis = time.Now().UnixMilli() - lookbackMills
} else {
fallbackOffsetMillis = int64(KafkaStartOffset)
}

// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error) {
lag, err := GetGroupLag(ctx, r.adminClient, r.cfg.Topic, r.ConsumerGroup(), fallbackOffsetMillis)
if err != nil {
return nil, err
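To round out the interface change, a sketch of what callers now do for themselves, reproducing the derivation deleted above from inside GroupLag (the helper name and its placement in this package are assumptions; OffsetManager and KafkaStartOffset are taken from the diff):

package partition

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
)

// groupLagWithLookback translates an optional lookback window into a pinned
// fallback timestamp before asking for the group lag, which is exactly what
// GroupLag used to do on every call. The block scheduler now performs this
// derivation once at start-up and caches the result. Illustrative only.
func groupLagWithLookback(ctx context.Context, om OffsetManager, lookback time.Duration) (map[int32]kadm.GroupMemberLag, error) {
	fallbackOffsetMillis := int64(KafkaStartOffset)
	if lookback > 0 {
		fallbackOffsetMillis = time.Now().UnixMilli() - lookback.Milliseconds()
	}
	return om.GroupLag(ctx, fallbackOffsetMillis)
}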