From e18c0469dce3092126562d892bd019b6fe61c2d6 Mon Sep 17 00:00:00 2001
From: tserakhau
Date: Wed, 4 Dec 2024 12:08:18 +0300
Subject: [PATCH] Kafka source offset policy

Allow specifying an offset policy so that a topic is read either from the
start or from the end of its data. This simplifies initial setup and makes
it possible to crop bad data from the past.

closes #127
---
Pull Request resolved: https://github.com/doublecloud/transfer/pull/128
Co-authored-by: tserakhau
commit_hash:fa95aafd98a46c99c8a7127b770109fa32150d78
---
 pkg/providers/kafka/model_source.go | 10 ++++++
 pkg/providers/kafka/source.go       |  5 +++
 pkg/providers/kafka/source_test.go  | 56 +++++++++++++++++++++++++++++
 3 files changed, 71 insertions(+)
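
Usage sketch (notes section, ignored by git am): the policy is opted into
through the new KafkaSource field. Connection, auth, and parser settings
are elided here; "topic2" mirrors the test below and is otherwise an
arbitrary placeholder.

    // import "github.com/doublecloud/transfer/pkg/providers/kafka"
    src := &kafka.KafkaSource{
        Topic: "topic2", // placeholder topic
        // AtEndOffsetPolicy skips everything already in the topic and
        // consumes only messages produced after the source starts;
        // AtStartOffsetPolicy re-reads the topic from the beginning, and
        // the zero value NoOffsetPolicy keeps the previous behavior.
        OffsetPolicy: kafka.AtEndOffsetPolicy,
    }
    _ = src // wire into a transfer as usual
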
diff --git a/pkg/providers/kafka/model_source.go b/pkg/providers/kafka/model_source.go
index 553d6ea3..a4dacdad 100644
--- a/pkg/providers/kafka/model_source.go
+++ b/pkg/providers/kafka/model_source.go
@@ -23,8 +23,18 @@ type KafkaSource struct {
 	ParserConfig        map[string]interface{}
 	IsHomo              bool // enabled kafka mirror protocol which can work only with kafka target
 	SynchronizeIsNeeded bool // true, if we need to send synchronize events on releasing partitions
+
+	OffsetPolicy OffsetPolicy // specifies from which part of the topic message consumption starts
 }
 
+type OffsetPolicy string
+
+const (
+	NoOffsetPolicy      = OffsetPolicy("") // Not specified
+	AtStartOffsetPolicy = OffsetPolicy("at_start")
+	AtEndOffsetPolicy   = OffsetPolicy("at_end")
+)
+
 var _ model.Source = (*KafkaSource)(nil)
 
 func (s *KafkaSource) MDBClusterID() string {
diff --git a/pkg/providers/kafka/source.go b/pkg/providers/kafka/source.go
index a4c60b23..54774c47 100644
--- a/pkg/providers/kafka/source.go
+++ b/pkg/providers/kafka/source.go
@@ -525,6 +525,11 @@ func newSourceWithCallbacks(cfg *KafkaSource, logger log.Logger, registry metric
 		}),
 		kgo.ConsumeTopics(topics...),
 	)
+	if cfg.OffsetPolicy == AtStartOffsetPolicy {
+		opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()))
+	} else if cfg.OffsetPolicy == AtEndOffsetPolicy {
+		opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
+	}
 	kfClient, err := kgo.NewClient(opts...)
 	if err != nil {
diff --git a/pkg/providers/kafka/source_test.go b/pkg/providers/kafka/source_test.go
index 172fd425..39cec02b 100644
--- a/pkg/providers/kafka/source_test.go
+++ b/pkg/providers/kafka/source_test.go
@@ -160,3 +160,59 @@ func TestNonExistsTopic(t *testing.T) {
 	_, err = NewSource("asd", kafkaSource, logger.Log, solomon.NewRegistry(solomon.NewRegistryOpts()))
 	require.Error(t, err)
 }
+
+func TestOffsetPolicy(t *testing.T) {
+	parserConfigMap, err := parsers.ParserConfigStructToMap(&jsonparser.ParserConfigJSONCommon{
+		Fields:        []abstract.ColSchema{{ColumnName: "ts", DataType: "DateTime"}, {ColumnName: "msg", DataType: "string"}},
+		AddRest:       false,
+		AddDedupeKeys: true,
+	})
+	require.NoError(t, err)
+	kafkaSource, err := SourceRecipe()
+	require.NoError(t, err)
+	kafkaSource.Topic = "topic2"
+	kafkaSource.ParserConfig = parserConfigMap
+
+	kafkaClient, err := client.NewClient(kafkaSource.Connection.Brokers, nil, nil)
+	require.NoError(t, err)
+	require.NoError(t, kafkaClient.CreateTopicIfNotExist(logger.Log, kafkaSource.Topic, nil))
+
+	lgr, closer, err := logger.NewKafkaLogger(&logger.KafkaConfig{
+		Broker:   kafkaSource.Connection.Brokers[0],
+		Topic:    kafkaSource.Topic,
+		User:     kafkaSource.Auth.User,
+		Password: kafkaSource.Auth.Password,
+	})
+	require.NoError(t, err)
+
+	defer closer.Close()
+	for i := 0; i < 3; i++ {
+		lgr.Infof("log item: %v", i)
+	}
+	time.Sleep(time.Second) // just in case
+
+	kafkaSource.OffsetPolicy = AtStartOffsetPolicy // Will read the old items (0, 1 and 2)
+	src, err := NewSource("asd", kafkaSource, logger.Log, solomon.NewRegistry(solomon.NewRegistryOpts()))
+	require.NoError(t, err)
+	items, err := src.Fetch()
+	require.NoError(t, err)
+	src.Stop()
+	require.True(t, len(items) >= 3) // At least 3 old items
+	abstract.Dump(items)
+
+	go func() {
+		time.Sleep(time.Second)
+		for i := 3; i < 5; i++ {
+			lgr.Infof("log item: %v", i)
+		}
+	}()
+
+	kafkaSource.OffsetPolicy = AtEndOffsetPolicy // Will read only the new items (3 and 4)
+	src, err = NewSource("asd", kafkaSource, logger.Log, solomon.NewRegistry(solomon.NewRegistryOpts()))
+	require.NoError(t, err)
+	items, err = src.Fetch()
+	require.NoError(t, err)
+	src.Stop()
+	abstract.Dump(items)
+	require.Len(t, items, 2)
+}
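
Reviewer note on the source.go change: kgo.ConsumeResetOffset only picks
where consumption starts when there is no committed offset to resume from
(and on out-of-range fetches), assuming franz-go's usual reset semantics,
so committed group offsets still take precedence. A minimal standalone
sketch of the option wiring the patch appends; broker address and topic
are placeholders:

    package main

    import "github.com/twmb/franz-go/pkg/kgo"

    func main() {
        opts := []kgo.Opt{
            kgo.SeedBrokers("localhost:9092"), // placeholder broker
            kgo.ConsumeTopics("topic2"),       // placeholder topic
            // What AtEndOffsetPolicy appends in newSourceWithCallbacks:
            // begin at the log end, so only messages produced after startup
            // are consumed; AtStartOffsetPolicy uses AtStart() instead.
            kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
        }
        client, err := kgo.NewClient(opts...)
        if err != nil {
            panic(err)
        }
        defer client.Close()
        // client.PollFetches(ctx) would then fetch from the chosen offset.
    }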