diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 91b560e106c87..82b5294daa191 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -210,10 +210,6 @@ block_builder: [scheduler_grpc_client_config: ] block_scheduler: - # Consumer group used by block scheduler to track the last consumed offset. - # CLI flag: -block-scheduler.consumer-group - [consumer_group: | default = "block-scheduler"] - # How often the scheduler should plan jobs. # CLI flag: -block-scheduler.interval [interval: | default = 15m] diff --git a/go.mod b/go.mod index 071ee3704f32d..ef312d46309fa 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/Azure/azure-storage-blob-go v0.15.0 github.com/Azure/go-autorest/autorest/adal v0.9.24 github.com/Azure/go-autorest/autorest/azure/auth v0.5.13 - github.com/IBM/sarama v1.44.0 + github.com/IBM/sarama v1.45.0 github.com/Masterminds/sprig/v3 v3.3.0 github.com/NYTimes/gziphandler v1.1.1 github.com/Workiva/go-datastructures v1.1.5 @@ -116,7 +116,7 @@ require ( github.com/DmitriyVTitov/size v1.5.0 github.com/IBM/go-sdk-core/v5 v5.18.3 github.com/IBM/ibm-cos-sdk-go v1.12.0 - github.com/axiomhq/hyperloglog v0.2.2 + github.com/axiomhq/hyperloglog v0.2.3 github.com/buger/jsonparser v1.1.1 github.com/d4l3k/messagediff v1.2.1 github.com/dolthub/swiss v0.2.1 @@ -147,7 +147,7 @@ require ( go4.org/netipx v0.0.0-20230125063823-8449b0a6169f golang.org/x/oauth2 v0.25.0 golang.org/x/text v0.21.0 - google.golang.org/protobuf v1.36.1 + google.golang.org/protobuf v1.36.2 gotest.tools v2.2.0+incompatible k8s.io/apimachinery v0.32.0 k8s.io/utils v0.0.0-20241104163129-6fe5fd82f078 diff --git a/go.sum b/go.sum index 695b88caaca38..0452a1696821f 100644 --- a/go.sum +++ b/go.sum @@ -134,8 +134,8 @@ github.com/IBM/go-sdk-core/v5 v5.18.3 h1:q6IDU3N2bHGwijK9pMnzKC5gqdaRII56NzB4ZNd github.com/IBM/go-sdk-core/v5 v5.18.3/go.mod h1:5kILxqEWOrwMhoD2b7J6Xv9Z2M6YIdT/6Oy+XRSsCGQ= github.com/IBM/ibm-cos-sdk-go v1.12.0 h1:Wrk3ve4JS3euhl7XjNFd3RlvPT56199G2/rKaPWpRKU= github.com/IBM/ibm-cos-sdk-go v1.12.0/go.mod h1:v/VBvFuysZMIX9HcaIrz6a+FLVw9px8fq6XabFwD+E4= -github.com/IBM/sarama v1.44.0 h1:puNKqcScjSAgVLramjsuovZrS0nJZFVsrvuUymkWqhE= -github.com/IBM/sarama v1.44.0/go.mod h1:MxQ9SvGfvKIorbk077Ff6DUnBlGpidiQOtU2vuBaxVw= +github.com/IBM/sarama v1.45.0 h1:IzeBevTn809IJ/dhNKhP5mpxEXTmELuezO2tgHD9G5E= +github.com/IBM/sarama v1.45.0/go.mod h1:EEay63m8EZkeumco9TDXf2JT3uDnZsZqFgV46n4yZdY= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/MasslessParticle/azure-storage-blob-go v0.14.1-0.20240322194317-344980fda573 h1:DCPjdUAi+jcGnL7iN+A7uNY8xG584oMRuisYh/VE21E= github.com/MasslessParticle/azure-storage-blob-go v0.14.1-0.20240322194317-344980fda573/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck= @@ -222,8 +222,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGI github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8= github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= -github.com/axiomhq/hyperloglog v0.2.2 h1:9X9rOdYx82zXKgd1aMsDZNUw3d7DKAHhd2J305HZPA8= -github.com/axiomhq/hyperloglog v0.2.2/go.mod h1:DLUK9yIzpU5B6YFLjxTIcbHu1g4Y1WQb1m5RH3radaM= +github.com/axiomhq/hyperloglog v0.2.3 h1:2ZGwz3FGcx77e9/aNjqJijsGhH6RZOlglzxnDpVBCQY= +github.com/axiomhq/hyperloglog v0.2.3/go.mod h1:DLUK9yIzpU5B6YFLjxTIcbHu1g4Y1WQb1m5RH3radaM= github.com/baidubce/bce-sdk-go v0.9.212 h1:B3PUoaFi4m13wP7gWObznjPLZ5umQ1BHjO/UoSsj3x4= github.com/baidubce/bce-sdk-go v0.9.212/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= @@ -1653,8 +1653,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= +google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/blockbuilder/builder/builder.go b/pkg/blockbuilder/builder/builder.go index 61ff6ffcbc782..018ccc1365441 100644 --- a/pkg/blockbuilder/builder/builder.go +++ b/pkg/blockbuilder/builder/builder.go @@ -279,6 +279,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin completion.Success = false } + // remove from inflight jobs to stop sending sync requests + i.jobsMtx.Lock() + delete(i.inflightJobs, job.ID()) + i.metrics.inflightJobs.Set(float64(len(i.inflightJobs))) + i.jobsMtx.Unlock() + if _, err := withBackoff( ctx, i.cfg.Backoff, @@ -292,16 +298,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin return true, err } - i.jobsMtx.Lock() - delete(i.inflightJobs, job.ID()) - i.metrics.inflightJobs.Set(float64(len(i.inflightJobs))) - i.jobsMtx.Unlock() - return true, err } func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) { level.Debug(logger).Log("msg", "beginning job") + start := time.Now() indexer := newTsdbCreator() appender := newAppender(i.id, @@ -505,6 +507,8 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types level.Info(logger).Log( "msg", "successfully processed job", "last_offset", lastOffset, + "duration", time.Since(start), + "records", lastOffset-job.Offsets().Min, ) return lastOffset, nil diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 019ca61a0c487..4758561fd6c2b 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -332,6 +332,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) { case types.JobStatusInProgress: case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired: // Job already completed, re-enqueue a new one + level.Warn(q.logger).Log("msg", "job already completed, re-enqueuing", "job", jobID, "status", jobMeta.Status) registerInProgress() return default: diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index d4e8c0935ce68..55ecc066dc582 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -26,7 +26,6 @@ var ( ) type Config struct { - ConsumerGroup string `yaml:"consumer_group"` Interval time.Duration `yaml:"interval"` LookbackPeriod time.Duration `yaml:"lookback_period"` Strategy string `yaml:"strategy"` @@ -36,7 +35,6 @@ type Config struct { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.Interval, prefix+"interval", 15*time.Minute, "How often the scheduler should plan jobs.") - f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.") f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 0, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.") f.StringVar( &cfg.Strategy, diff --git a/pkg/logql/count_min_sketch.go b/pkg/logql/count_min_sketch.go index e24e089ad307b..927ff6ff772e7 100644 --- a/pkg/logql/count_min_sketch.go +++ b/pkg/logql/count_min_sketch.go @@ -3,6 +3,8 @@ package logql import ( "container/heap" "fmt" + "slices" + "strings" "github.com/axiomhq/hyperloglog" "github.com/cespare/xxhash/v2" @@ -187,16 +189,17 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou } func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) { + slices.SortFunc(metric, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) v.buffer = metric.Bytes(v.buffer) v.F.Add(v.buffer, value) - // Add our metric if we haven't seen it - // TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's // an else block the compares the series labels. However, that's not trivial. Besides, instance.Series has the // same issue in its deduping logic. id := xxhash.Sum64(v.buffer) + + // Add our metric if we haven't seen it if _, ok := v.observed[id]; !ok { heap.Push(v, metric) v.observed[id] = struct{}{} diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index c5ef408cc21ed..9e494f99237d4 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -2,7 +2,9 @@ package log import ( "fmt" + "slices" "sort" + "strings" "sync" "github.com/prometheus/prometheus/model/labels" @@ -585,7 +587,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { // Get all labels at once and sort them b.buf = b.UnsortedLabels(b.buf) - sort.Sort(b.buf) + // sort.Sort(b.buf) + slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) hash := b.hasher.Hash(b.buf) if cached, ok := b.resultCache[hash]; ok { diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 5ac3a87503634..af332c8cb54c7 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -241,25 +241,32 @@ func (p *fakeParseHints) ShouldExtract(key string) bool { p.checkCount++ return key == p.label || p.extractAll } + func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool { return prefix == p.label || p.extractAll } + func (p *fakeParseHints) NoLabels() bool { return false } + func (p *fakeParseHints) RecordExtracted(_ string) { p.count++ } + func (p *fakeParseHints) AllRequiredExtracted() bool { return !p.extractAll && p.count == 1 } + func (p *fakeParseHints) Reset() { p.checkCount = 0 p.count = 0 } + func (p *fakeParseHints) PreserveError() bool { return false } + func (p *fakeParseHints) ShouldContinueParsingLine(_ string, _ *LabelsBuilder) bool { return p.keepGoing } @@ -656,30 +663,36 @@ func Benchmark_Parser(b *testing.B) { b.Run(tt.name, func(b *testing.B) { line := []byte(tt.line) b.Run("no labels hints", func(b *testing.B) { + b.ReportAllocs() builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) b.Run("labels hints", func(b *testing.B) { + b.ReportAllocs() builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "", nil) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) b.Run("inline stages", func(b *testing.B) { + b.ReportAllocs() stages := []Stage{NewStringLabelFilter(tt.LabelFilterParseHint)} builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) builder.parserKeyHints = NewParserHint(nil, nil, false, false, ", nil", stages) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) }) @@ -1251,7 +1264,6 @@ func TestXExpressionParserFailures(t *testing.T) { }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { _, err := NewLogfmtExpressionParser([]LabelExtractionExpr{tt.expression}, false) diff --git a/pkg/logql/sketch/cms.go b/pkg/logql/sketch/cms.go index b510ee8504ea0..67f72be976c19 100644 --- a/pkg/logql/sketch/cms.go +++ b/pkg/logql/sketch/cms.go @@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) { Depth: d, Width: w, Counters: make2dslice(w, d), - HyperLogLog: hyperloglog.New16(), + HyperLogLog: hyperloglog.New16NoSparse(), }, nil } diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index c6fba0fbf49a1..b2aa50c292154 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -170,10 +170,12 @@ func (in instance) For( go func() { err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error { res, err := fn(queries[i]) + if err != nil { + return err + } response := logql.Resp{ I: i, Res: res, - Err: err, } // Feed the result into the channel unless the work has completed. @@ -181,7 +183,7 @@ func (in instance) For( case <-ctx.Done(): case ch <- response: } - return err + return nil }) if err != nil { ch <- logql.Resp{ @@ -192,15 +194,19 @@ func (in instance) For( close(ch) }() + var err error for resp := range ch { - if resp.Err != nil { - return nil, resp.Err + if err != nil { + continue } - if err := acc.Accumulate(ctx, resp.Res, resp.I); err != nil { - return nil, err + if resp.Err != nil { + err = resp.Err + continue } + err = acc.Accumulate(ctx, resp.Res, resp.I) } - return acc.Result(), nil + + return acc.Result(), err } // convert to matrix diff --git a/pkg/ruler/base/ruler.go b/pkg/ruler/base/ruler.go index adb1a7c136def..4e466810570b1 100644 --- a/pkg/ruler/base/ruler.go +++ b/pkg/ruler/base/ruler.go @@ -407,9 +407,9 @@ func grafanaLinkForExpression(expr, datasourceUID string) string { } marshaledExpression, _ := json.Marshal(exprStruct) - escapedExpression := url.QueryEscape(string(marshaledExpression)) - str := `/explore?left={"queries":[%s]}` - return fmt.Sprintf(str, escapedExpression) + params := url.Values{} + params.Set("left", fmt.Sprintf(`{"queries":[%s]}`, marshaledExpression)) + return `/explore?` + params.Encode() } // SendAlerts implements a rules.NotifyFunc for a Notifier. diff --git a/pkg/ruler/base/ruler_test.go b/pkg/ruler/base/ruler_test.go index bdb437ed9279d..6636f742456a2 100644 --- a/pkg/ruler/base/ruler_test.go +++ b/pkg/ruler/base/ruler_test.go @@ -1733,7 +1733,7 @@ func TestSendAlerts(t *testing.T) { Annotations: []labels.Label{{Name: "a2", Value: "v2"}}, StartsAt: time.Unix(2, 0), EndsAt: time.Unix(3, 0), - GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left={\"queries\":[%s]}", escapedExpression), + GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left=%%7B%%22queries%%22%%3A%%5B%s%%5D%%7D", escapedExpression), }, }, }, @@ -1753,7 +1753,7 @@ func TestSendAlerts(t *testing.T) { Annotations: []labels.Label{{Name: "a2", Value: "v2"}}, StartsAt: time.Unix(2, 0), EndsAt: time.Unix(4, 0), - GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left={\"queries\":[%s]}", escapedExpression), + GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left=%%7B%%22queries%%22%%3A%%5B%s%%5D%%7D", escapedExpression), }, }, }, diff --git a/vendor/github.com/IBM/sarama/Dockerfile.kafka b/vendor/github.com/IBM/sarama/Dockerfile.kafka index b4d5c6acbbcef..d2234e3918f2d 100644 --- a/vendor/github.com/IBM/sarama/Dockerfile.kafka +++ b/vendor/github.com/IBM/sarama/Dockerfile.kafka @@ -1,17 +1,17 @@ -FROM registry.access.redhat.com/ubi8/ubi-minimal:8.10@sha256:cf095e5668919ba1b4ace3888107684ad9d587b1830d3eb56973e6a54f456e67 +FROM registry.access.redhat.com/ubi9/ubi-minimal:9.5@sha256:daa61d6103e98bccf40d7a69a0d4f8786ec390e2204fd94f7cc49053e9949360 USER root RUN microdnf update -y \ - && microdnf install -y curl gzip java-11-openjdk-headless tar tzdata-java \ + && microdnf install -y git gzip java-17-openjdk-headless tar tzdata-java \ && microdnf reinstall -y tzdata \ && microdnf clean all -ENV JAVA_HOME=/usr/lib/jvm/jre-11 +ENV JAVA_HOME=/usr/lib/jvm/jre-17 # https://docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html # Ensure Java doesn't cache any dns results -RUN cd /etc/java/java-11-openjdk/*/conf/security \ +RUN cd /etc/java/java-17-openjdk/*/conf/security \ && sed -e '/networkaddress.cache.ttl/d' -e '/networkaddress.cache.negative.ttl/d' -i java.security \ && echo 'networkaddress.cache.ttl=0' >> java.security \ && echo 'networkaddress.cache.negative.ttl=0' >> java.security @@ -19,24 +19,46 @@ RUN cd /etc/java/java-11-openjdk/*/conf/security \ ARG SCALA_VERSION="2.13" ARG KAFKA_VERSION="3.6.2" +WORKDIR /tmp + # https://github.com/apache/kafka/blob/2e2b0a58eda3e677763af974a44a6aaa3c280214/tests/docker/Dockerfile#L77-L105 ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" SHELL ["/bin/bash", "-o", "pipefail", "-c"] -RUN mkdir -p "/opt/kafka-${KAFKA_VERSION}" \ +RUN --mount=type=bind,target=.,rw=true \ + mkdir -p "/opt/kafka-${KAFKA_VERSION}" \ && chmod a+rw "/opt/kafka-${KAFKA_VERSION}" \ - && curl -s "$KAFKA_MIRROR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" + && if [ "$KAFKA_VERSION" = "4.0.0" ]; then \ + microdnf install -y java-17-openjdk-devel \ + && git clone --depth=50 --single-branch -b 4.0 https://github.com/apache/kafka /usr/src/kafka \ + && cd /usr/src/kafka \ + && : PIN TO COMMIT BEFORE KAFKA-17616 ZOOKEEPER REMOVAL STARTED \ + && git reset --hard d1504649fb \ + && export JAVA_TOOL_OPTIONS=-XX:MaxRAMPercentage=80 \ + && sed -e '/version=/s/-SNAPSHOT//' -e '/org.gradle.jvmargs/d' -e '/org.gradle.parallel/s/true/false/' -i gradle.properties && ./gradlew -PmaxParallelForks=1 -PmaxScalacThreads=1 --no-daemon releaseTarGz -x siteDocsTar -x javadoc \ + && tar xzf core/build/distributions/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" \ + && cp /tmp/server.properties "/opt/kafka-${KAFKA_VERSION}/config/" \ + && microdnf remove -y java-17-openjdk-devel \ + && rm -rf /usr/src/kafka ; \ + else \ + curl -s "$KAFKA_MIRROR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" ; \ + fi # older kafka versions depend upon jaxb-api being bundled with the JDK, but it # was removed from Java 11 so work around that by including it in the kafka # libs dir regardless -WORKDIR /tmp RUN curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb-api-2.3.0.jar" \ && for DIR in /opt/kafka-*; do cp -v jaxb-api-2.3.0.jar $DIR/libs/ ; done \ && rm -f jaxb-api-2.3.0.jar +# older kafka versions with the zookeeper 3.4.13 client aren't compatible with Java 17 so quietly bump them to 3.5.9 +RUN [ -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" ] || exit 0 ; \ + rm -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" \ + && curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper/3.5.9/zookeeper-3.5.9.jar" \ + && curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-jute-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper-jute/3.5.9/zookeeper-jute-3.5.9.jar" + WORKDIR /opt/kafka-${KAFKA_VERSION} -ENV JAVA_MAJOR_VERSION=11 +ENV JAVA_MAJOR_VERSION=17 RUN sed -e "s/JAVA_MAJOR_VERSION=.*/JAVA_MAJOR_VERSION=${JAVA_MAJOR_VERSION}/" -i"" ./bin/kafka-run-class.sh diff --git a/vendor/github.com/IBM/sarama/Makefile b/vendor/github.com/IBM/sarama/Makefile index 7cefc2a2c36bf..ba6f46e41a5ba 100644 --- a/vendor/github.com/IBM/sarama/Makefile +++ b/vendor/github.com/IBM/sarama/Makefile @@ -9,7 +9,7 @@ FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go') $(GOBIN)/tparse: - GOBIN=$(GOBIN) go install github.com/mfridman/tparse@v0.11.1 + GOBIN=$(GOBIN) go install github.com/mfridman/tparse@v0.16.0 get: $(GO) get ./... $(GO) mod verify diff --git a/vendor/github.com/IBM/sarama/admin.go b/vendor/github.com/IBM/sarama/admin.go index 6549c7e6fb07f..8aa1f374e4d63 100644 --- a/vendor/github.com/IBM/sarama/admin.go +++ b/vendor/github.com/IBM/sarama/admin.go @@ -3,6 +3,7 @@ package sarama import ( "errors" "fmt" + "io" "math/rand" "strconv" "sync" @@ -144,6 +145,10 @@ type ClusterAdmin interface { // locally cached value if it's available. Controller() (*Broker, error) + // Coordinator returns the coordinating broker for a consumer group. It will + // return a locally cached value if it's available. + Coordinator(group string) (*Broker, error) + // Remove members from the consumer group by given member identities. // This operation is supported by brokers with version 2.3 or higher // This is for static membership feature. KIP-345 @@ -195,14 +200,25 @@ func (ca *clusterAdmin) Controller() (*Broker, error) { return ca.client.Controller() } +func (ca *clusterAdmin) Coordinator(group string) (*Broker, error) { + return ca.client.Coordinator(group) +} + func (ca *clusterAdmin) refreshController() (*Broker, error) { return ca.client.RefreshController() } -// isErrNotController returns `true` if the given error type unwraps to an -// `ErrNotController` response from Kafka -func isErrNotController(err error) bool { - return errors.Is(err, ErrNotController) +// isRetriableControllerError returns `true` if the given error type unwraps to +// an `ErrNotController` or `EOF` response from Kafka +func isRetriableControllerError(err error) bool { + return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF) +} + +// isRetriableGroupCoordinatorError returns `true` if the given error type +// unwraps to an `ErrNotCoordinatorForConsumer`, +// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka +func isRetriableGroupCoordinatorError(err error) bool { + return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF) } // retryOnError will repeatedly call the given (error-returning) func in the @@ -252,7 +268,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -269,7 +285,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO } if !errors.Is(topicErr.Err, ErrNoError) { - if errors.Is(topicErr.Err, ErrNotController) { + if isRetriableControllerError(topicErr.Err) { _, _ = ca.refreshController() } return topicErr @@ -281,14 +297,14 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { controller, err := ca.Controller() if err != nil { return err } request := NewMetadataRequest(ca.conf.Version, topics) response, err = controller.GetMetadata(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -301,7 +317,7 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { controller, err := ca.Controller() if err != nil { return err @@ -309,7 +325,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32 request := NewMetadataRequest(ca.conf.Version, nil) response, err = controller.GetMetadata(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -441,7 +457,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error { request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -485,7 +501,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -526,7 +542,7 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][ request.AddBlock(topic, int32(i), assignment[i]) } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -573,7 +589,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in request.AddBlock(topic, partitions) var rsp *ListPartitionReassignmentsResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -581,7 +597,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in _ = b.Open(ca.client.Config()) rsp, err = b.ListPartitionReassignments(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -924,7 +940,7 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s } var res *ElectLeadersResponse - err := ca.retryOnError(isErrNotController, func() error { + if err := ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -932,12 +948,17 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s _ = b.Open(ca.client.Config()) res, err = b.ElectLeaders(request) - if isErrNotController(err) { - _, _ = ca.refreshController() + if err != nil { + return err } - return err - }) - if err != nil { + if !errors.Is(res.ErrorCode, ErrNoError) { + if isRetriableControllerError(res.ErrorCode) { + _, _ = ca.refreshController() + } + return res.ErrorCode + } + return nil + }); err != nil { return nil, err } return res.ReplicaElectionResults, nil @@ -947,11 +968,11 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group groupsPerBroker := make(map[*Broker][]string) for _, group := range groups { - controller, err := ca.client.Coordinator(group) + coordinator, err := ca.client.Coordinator(group) if err != nil { return nil, err } - groupsPerBroker[controller] = append(groupsPerBroker[controller], group) + groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group) } for broker, brokerGroups := range groupsPerBroker { @@ -1043,22 +1064,36 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e } func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return nil, err - } - + var response *OffsetFetchResponse request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions) + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() + + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - return coordinator.FetchOffset(request) + response, err = coordinator.FetchOffset(request) + if err != nil { + return err + } + if !errors.Is(response.Err, ErrNoError) { + return response.Err + } + + return nil + }) + + return response, err } func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + var response *DeleteOffsetsResponse request := &DeleteOffsetsRequest{ Group: group, partitions: map[string][]int32{ @@ -1066,27 +1101,35 @@ func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, pa }, } - resp, err := coordinator.DeleteOffsets(request) - if err != nil { - return err - } + return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() - if !errors.Is(resp.ErrorCode, ErrNoError) { - return resp.ErrorCode - } + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - if !errors.Is(resp.Errors[topic][partition], ErrNoError) { - return resp.Errors[topic][partition] - } - return nil + response, err = coordinator.DeleteOffsets(request) + if err != nil { + return err + } + if !errors.Is(response.ErrorCode, ErrNoError) { + return response.ErrorCode + } + if !errors.Is(response.Errors[topic][partition], ErrNoError) { + return response.Errors[topic][partition] + } + + return nil + }) } func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + var response *DeleteGroupsResponse request := &DeleteGroupsRequest{ Groups: []string{group}, } @@ -1094,21 +1137,34 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { request.Version = 1 } - resp, err := coordinator.DeleteGroups(request) - if err != nil { - return err - } + return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() - groupErr, ok := resp.GroupErrorCodes[group] - if !ok { - return ErrIncompleteResponse - } + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - if !errors.Is(groupErr, ErrNoError) { - return groupErr - } + response, err = coordinator.DeleteGroups(request) + if err != nil { + return err + } - return nil + groupErr, ok := response.GroupErrorCodes[group] + if !ok { + return ErrIncompleteResponse + } + + if !errors.Is(groupErr, ErrNoError) { + return groupErr + } + + return nil + }) } func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) { @@ -1206,7 +1262,7 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU } var rsp *AlterUserScramCredentialsResponse - err := ca.retryOnError(isErrNotController, func() error { + err := ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -1284,18 +1340,14 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie return nil } -func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) { +func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) { if !ca.conf.Version.IsAtLeast(V2_4_0_0) { return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0") } - - controller, err := ca.client.Coordinator(groupId) - if err != nil { - return nil, err - } + var response *LeaveGroupResponse request := &LeaveGroupRequest{ Version: 3, - GroupId: groupId, + GroupId: group, } for _, instanceId := range groupInstanceIds { groupInstanceId := instanceId @@ -1303,5 +1355,28 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInsta GroupInstanceId: &groupInstanceId, }) } - return controller.LeaveGroup(request) + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() + + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + + response, err = coordinator.LeaveGroup(request) + if err != nil { + return err + } + if !errors.Is(response.Err, ErrNoError) { + return response.Err + } + + return nil + }) + + return response, err } diff --git a/vendor/github.com/IBM/sarama/balance_strategy.go b/vendor/github.com/IBM/sarama/balance_strategy.go index 30d41779c1e9b..b5bc30a13bdfc 100644 --- a/vendor/github.com/IBM/sarama/balance_strategy.go +++ b/vendor/github.com/IBM/sarama/balance_strategy.go @@ -989,6 +989,7 @@ func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicParti return reversePairPartition } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) { if src == dst { return currentPath, false @@ -1023,6 +1024,7 @@ func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, cur return currentPath, false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) in(cycle []string, cycles [][]string) bool { superCycle := make([]string, len(cycle)-1) for i := 0; i < len(cycle)-1; i++ { @@ -1037,6 +1039,7 @@ func (p *partitionMovements) in(cycle []string, cycles [][]string) bool { return false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) hasCycles(pairs []consumerPair) bool { cycles := make([][]string, 0) for _, pair := range pairs { @@ -1068,6 +1071,7 @@ func (p *partitionMovements) hasCycles(pairs []consumerPair) bool { return false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) isSticky() bool { for topic, movements := range p.PartitionMovementsByTopic { movementPairs := make([]consumerPair, len(movements)) @@ -1085,6 +1089,7 @@ func (p *partitionMovements) isSticky() bool { return true } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func indexOfSubList(source []string, target []string) int { targetSize := len(target) maxCandidate := len(source) - targetSize diff --git a/vendor/github.com/IBM/sarama/create_topics_request.go b/vendor/github.com/IBM/sarama/create_topics_request.go index 8382d17c20a74..e8c0f01472c35 100644 --- a/vendor/github.com/IBM/sarama/create_topics_request.go +++ b/vendor/github.com/IBM/sarama/create_topics_request.go @@ -16,6 +16,21 @@ type CreateTopicsRequest struct { ValidateOnly bool } +func NewCreateTopicsRequest(version KafkaVersion, topicDetails map[string]*TopicDetail, timeout time.Duration) *CreateTopicsRequest { + r := &CreateTopicsRequest{ + TopicDetails: topicDetails, + Timeout: timeout, + } + if version.IsAtLeast(V2_0_0_0) { + r.Version = 3 + } else if version.IsAtLeast(V0_11_0_0) { + r.Version = 2 + } else if version.IsAtLeast(V0_10_2_0) { + r.Version = 1 + } + return r +} + func (c *CreateTopicsRequest) encode(pe packetEncoder) error { if err := pe.putArrayLength(len(c.TopicDetails)); err != nil { return err diff --git a/vendor/github.com/IBM/sarama/delete_topics_request.go b/vendor/github.com/IBM/sarama/delete_topics_request.go index 252c0d0259461..f38f32770be7b 100644 --- a/vendor/github.com/IBM/sarama/delete_topics_request.go +++ b/vendor/github.com/IBM/sarama/delete_topics_request.go @@ -8,6 +8,21 @@ type DeleteTopicsRequest struct { Timeout time.Duration } +func NewDeleteTopicsRequest(version KafkaVersion, topics []string, timeout time.Duration) *DeleteTopicsRequest { + d := &DeleteTopicsRequest{ + Topics: topics, + Timeout: timeout, + } + if version.IsAtLeast(V2_1_0_0) { + d.Version = 3 + } else if version.IsAtLeast(V2_0_0_0) { + d.Version = 2 + } else if version.IsAtLeast(V0_11_0_0) { + d.Version = 1 + } + return d +} + func (d *DeleteTopicsRequest) encode(pe packetEncoder) error { if err := pe.putStringArray(d.Topics); err != nil { return err diff --git a/vendor/github.com/IBM/sarama/docker-compose.yml b/vendor/github.com/IBM/sarama/docker-compose.yml index a0e3d2e21e925..1e66cca0ce5d4 100644 --- a/vendor/github.com/IBM/sarama/docker-compose.yml +++ b/vendor/github.com/IBM/sarama/docker-compose.yml @@ -1,6 +1,6 @@ services: zookeeper-1: - hostname: 'zookeeper-1' + container_name: 'zookeeper-1' image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always @@ -13,7 +13,7 @@ services: ZOO_MAX_CLIENT_CNXNS: '0' ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' zookeeper-2: - hostname: 'zookeeper-2' + container_name: 'zookeeper-2' image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always @@ -26,7 +26,7 @@ services: ZOO_MAX_CLIENT_CNXNS: '0' ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' zookeeper-3: - hostname: 'zookeeper-3' + container_name: 'zookeeper-3' image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always @@ -39,7 +39,7 @@ services: ZOO_MAX_CLIENT_CNXNS: '0' ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' kafka-1: - hostname: 'kafka-1' + container_name: 'kafka-1' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -74,6 +74,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '1' KAFKA_CFG_BROKER_RACK: '1' @@ -85,7 +86,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-2: - hostname: 'kafka-2' + container_name: 'kafka-2' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -120,6 +121,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '2' KAFKA_CFG_BROKER_RACK: '2' @@ -131,7 +133,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-3: - hostname: 'kafka-3' + container_name: 'kafka-3' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -166,6 +168,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '3' KAFKA_CFG_BROKER_RACK: '3' @@ -177,7 +180,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-4: - hostname: 'kafka-4' + container_name: 'kafka-4' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -212,6 +215,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '4' KAFKA_CFG_BROKER_RACK: '4' @@ -223,7 +227,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-5: - hostname: 'kafka-5' + container_name: 'kafka-5' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -258,6 +262,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '5' KAFKA_CFG_BROKER_RACK: '5' @@ -269,7 +274,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" toxiproxy: - hostname: 'toxiproxy' + container_name: 'toxiproxy' image: 'ghcr.io/shopify/toxiproxy:2.4.0' init: true healthcheck: diff --git a/vendor/github.com/IBM/sarama/errors.go b/vendor/github.com/IBM/sarama/errors.go index 2c431aecb05f0..842d302571ae7 100644 --- a/vendor/github.com/IBM/sarama/errors.go +++ b/vendor/github.com/IBM/sarama/errors.go @@ -304,7 +304,7 @@ func (err KError) Error() string { case ErrOffsetsLoadInProgress: return "kafka server: The coordinator is still loading offsets and cannot currently process requests" case ErrConsumerCoordinatorNotAvailable: - return "kafka server: Offset's topic has not yet been created" + return "kafka server: The coordinator is not available" case ErrNotCoordinatorForConsumer: return "kafka server: Request was for a consumer group that is not coordinated by this broker" case ErrInvalidTopic: diff --git a/vendor/github.com/IBM/sarama/server.properties b/vendor/github.com/IBM/sarama/server.properties new file mode 100644 index 0000000000000..21ba1c7d9c61b --- /dev/null +++ b/vendor/github.com/IBM/sarama/server.properties @@ -0,0 +1,138 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required. +# See kafka.server.KafkaConfig for additional details and defaults +# + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. If not configured, the host name will be equal to the value of +# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 +#listeners=PLAINTEXT://:9092 + +# Listener name, hostname and port the broker will advertise to clients. +# If not set, it uses the value for "listeners". +#advertised.listeners=PLAINTEXT://your.host.name:9092 + +# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details +#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + +# The number of threads that the server uses for receiving requests from the network and sending responses to the network +num.network.threads=3 + +# The number of threads that the server uses for processing requests, which may include disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma separated list of directories under which to store log files +log.dirs=/tmp/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Internal Topic Settings ############################# +# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" +# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. +offsets.topic.replication.factor=1 +transaction.state.log.replication.factor=1 +transaction.state.log.min.isr=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion due to age +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log unless the remaining +# segments drop below log.retention.bytes. Functions independently of log.retention.hours. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +#log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=18000 + + +############################# Group Coordinator Settings ############################# + +# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. +# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. +# The default value for this is 3 seconds. +# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. +# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. +group.initial.rebalance.delay.ms=0 diff --git a/vendor/github.com/IBM/sarama/utils.go b/vendor/github.com/IBM/sarama/utils.go index d5b77e0d92b36..b0e1aceff14d4 100644 --- a/vendor/github.com/IBM/sarama/utils.go +++ b/vendor/github.com/IBM/sarama/utils.go @@ -206,6 +206,7 @@ var ( V3_8_0_0 = newKafkaVersion(3, 8, 0, 0) V3_8_1_0 = newKafkaVersion(3, 8, 1, 0) V3_9_0_0 = newKafkaVersion(3, 9, 0, 0) + V4_0_0_0 = newKafkaVersion(4, 0, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, @@ -277,9 +278,10 @@ var ( V3_8_0_0, V3_8_1_0, V3_9_0_0, + V4_0_0_0, } MinVersion = V0_8_2_0 - MaxVersion = V3_9_0_0 + MaxVersion = V4_0_0_0 DefaultVersion = V2_1_0_0 // reduced set of protocol versions to matrix test diff --git a/vendor/github.com/axiomhq/hyperloglog/compressed.go b/vendor/github.com/axiomhq/hyperloglog/compressed.go index d140b88c16cdc..865901bbc690d 100644 --- a/vendor/github.com/axiomhq/hyperloglog/compressed.go +++ b/vendor/github.com/axiomhq/hyperloglog/compressed.go @@ -1,6 +1,9 @@ package hyperloglog -import "encoding/binary" +import ( + "encoding/binary" + "slices" +) // Original author of this file is github.com/clarkduvall/hyperloglog type iterable interface { @@ -52,32 +55,26 @@ func (v *compressedList) Clone() *compressedList { return newV } -func (v *compressedList) MarshalBinary() (data []byte, err error) { - // Marshal the variableLengthList - bdata, err := v.b.MarshalBinary() - if err != nil { - return nil, err - } - - // At least 4 bytes for the two fixed sized values plus the size of bdata. - data = make([]byte, 0, 4+4+len(bdata)) +func (v *compressedList) AppendBinary(data []byte) ([]byte, error) { + // At least 4 bytes for the two fixed sized values + data = slices.Grow(data, 4+4) // Marshal the count and last values. - data = append(data, []byte{ + data = append(data, // Number of items in the list. - byte(v.count >> 24), - byte(v.count >> 16), - byte(v.count >> 8), + byte(v.count>>24), + byte(v.count>>16), + byte(v.count>>8), byte(v.count), // The last item in the list. - byte(v.last >> 24), - byte(v.last >> 16), - byte(v.last >> 8), + byte(v.last>>24), + byte(v.last>>16), + byte(v.last>>8), byte(v.last), - }...) + ) - // Append the list - return append(data, bdata...), nil + // Append the variableLengthList + return v.b.AppendBinary(data) } func (v *compressedList) UnmarshalBinary(data []byte) error { @@ -130,20 +127,20 @@ func (v *compressedList) Iter() *iterator { type variableLengthList []uint8 -func (v variableLengthList) MarshalBinary() (data []byte, err error) { +func (v variableLengthList) AppendBinary(data []byte) ([]byte, error) { // 4 bytes for the size of the list, and a byte for each element in the // list. - data = make([]byte, 0, 4+v.Len()) + data = slices.Grow(data, 4+v.Len()) // Length of the list. We only need 32 bits because the size of the set // couldn't exceed that on 32 bit architectures. sz := v.Len() - data = append(data, []byte{ - byte(sz >> 24), - byte(sz >> 16), - byte(sz >> 8), + data = append(data, + byte(sz>>24), + byte(sz>>16), + byte(sz>>8), byte(sz), - }...) + ) // Marshal each element in the list. for i := 0; i < sz; i++ { diff --git a/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go b/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go index 24b39e43562aa..f4fbb01e96592 100644 --- a/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go +++ b/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "slices" "sort" ) @@ -203,8 +204,16 @@ func (sk *Sketch) mergeSparse() { } // MarshalBinary implements the encoding.BinaryMarshaler interface. +// +// When the result will be appended to another buffer, consider using +// AppendBinary to avoid additional allocations and copying. func (sk *Sketch) MarshalBinary() (data []byte, err error) { - data = make([]byte, 0, 8+len(sk.regs)) + return sk.AppendBinary(nil) +} + +// AppendBinary implements the encoding.BinaryAppender interface. +func (sk *Sketch) AppendBinary(data []byte) ([]byte, error) { + data = slices.Grow(data, 8+len(sk.regs)) // Marshal a version marker. data = append(data, version) // Marshal p. @@ -217,18 +226,13 @@ func (sk *Sketch) MarshalBinary() (data []byte, err error) { data = append(data, byte(1)) // Add the tmp_set - tsdata, err := sk.tmpSet.MarshalBinary() + data, err := sk.tmpSet.AppendBinary(data) if err != nil { return nil, err } - data = append(data, tsdata...) // Add the sparse Sketch - sdata, err := sk.sparseList.MarshalBinary() - if err != nil { - return nil, err - } - return append(data, sdata...), nil + return sk.sparseList.AppendBinary(data) } // It's using the dense Sketch. @@ -236,12 +240,12 @@ func (sk *Sketch) MarshalBinary() (data []byte, err error) { // Add the dense sketch Sketch. sz := len(sk.regs) - data = append(data, []byte{ - byte(sz >> 24), - byte(sz >> 16), - byte(sz >> 8), + data = append(data, + byte(sz>>24), + byte(sz>>16), + byte(sz>>8), byte(sz), - }...) + ) // Marshal each element in the list. for _, v := range sk.regs { diff --git a/vendor/github.com/axiomhq/hyperloglog/sparse.go b/vendor/github.com/axiomhq/hyperloglog/sparse.go index 0151740df9859..ca2b939516664 100644 --- a/vendor/github.com/axiomhq/hyperloglog/sparse.go +++ b/vendor/github.com/axiomhq/hyperloglog/sparse.go @@ -2,6 +2,7 @@ package hyperloglog import ( "math/bits" + "slices" "github.com/kamstrup/intmap" ) @@ -83,29 +84,29 @@ func (s *set) Clone() *set { return &set{m: newS} } -func (s *set) MarshalBinary() (data []byte, err error) { +func (s *set) AppendBinary(data []byte) ([]byte, error) { // 4 bytes for the size of the set, and 4 bytes for each key. // list. - data = make([]byte, 0, 4+(4*s.m.Len())) + data = slices.Grow(data, 4+(4*s.m.Len())) // Length of the set. We only need 32 bits because the size of the set // couldn't exceed that on 32 bit architectures. sl := s.m.Len() - data = append(data, []byte{ - byte(sl >> 24), - byte(sl >> 16), - byte(sl >> 8), + data = append(data, + byte(sl>>24), + byte(sl>>16), + byte(sl>>8), byte(sl), - }...) + ) // Marshal each element in the set. s.m.ForEach(func(k uint32) bool { - data = append(data, []byte{ - byte(k >> 24), - byte(k >> 16), - byte(k >> 8), + data = append(data, + byte(k>>24), + byte(k>>16), + byte(k>>8), byte(k), - }...) + ) return true }) diff --git a/vendor/google.golang.org/protobuf/internal/impl/message_opaque.go b/vendor/google.golang.org/protobuf/internal/impl/message_opaque.go index d407dd791e89f..d7ec53f074ac4 100644 --- a/vendor/google.golang.org/protobuf/internal/impl/message_opaque.go +++ b/vendor/google.golang.org/protobuf/internal/impl/message_opaque.go @@ -88,9 +88,7 @@ func opaqueInitHook(mi *MessageInfo) bool { mi.oneofs = map[protoreflect.Name]*oneofInfo{} for i := 0; i < mi.Desc.Oneofs().Len(); i++ { od := mi.Desc.Oneofs().Get(i) - if !od.IsSynthetic() { - mi.oneofs[od.Name()] = makeOneofInfo(od, si.structInfo, mi.Exporter) - } + mi.oneofs[od.Name()] = makeOneofInfoOpaque(mi, od, si.structInfo, mi.Exporter) } mi.denseFields = make([]*fieldInfo, fds.Len()*2) @@ -119,6 +117,26 @@ func opaqueInitHook(mi *MessageInfo) bool { return true } +func makeOneofInfoOpaque(mi *MessageInfo, od protoreflect.OneofDescriptor, si structInfo, x exporter) *oneofInfo { + oi := &oneofInfo{oneofDesc: od} + if od.IsSynthetic() { + fd := od.Fields().Get(0) + index, _ := presenceIndex(mi.Desc, fd) + oi.which = func(p pointer) protoreflect.FieldNumber { + if p.IsNil() { + return 0 + } + if !mi.present(p, index) { + return 0 + } + return od.Fields().Get(0).Number() + } + return oi + } + // Dispatch to non-opaque oneof implementation for non-synthetic oneofs. + return makeOneofInfo(od, si, x) +} + func (mi *MessageInfo) fieldInfoForMapOpaque(si opaqueStructInfo, fd protoreflect.FieldDescriptor, fs reflect.StructField) fieldInfo { ft := fs.Type if ft.Kind() != reflect.Map { diff --git a/vendor/google.golang.org/protobuf/internal/version/version.go b/vendor/google.golang.org/protobuf/internal/version/version.go index 3018450df7998..386c823aa641c 100644 --- a/vendor/google.golang.org/protobuf/internal/version/version.go +++ b/vendor/google.golang.org/protobuf/internal/version/version.go @@ -52,7 +52,7 @@ import ( const ( Major = 1 Minor = 36 - Patch = 1 + Patch = 2 PreRelease = "" ) diff --git a/vendor/modules.txt b/vendor/modules.txt index 5ff5397c6ec11..bf14fd4052559 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -263,8 +263,8 @@ github.com/IBM/ibm-cos-sdk-go/private/protocol/restxml github.com/IBM/ibm-cos-sdk-go/private/protocol/xml/xmlutil github.com/IBM/ibm-cos-sdk-go/service/s3 github.com/IBM/ibm-cos-sdk-go/service/s3/s3iface -# github.com/IBM/sarama v1.44.0 -## explicit; go 1.20 +# github.com/IBM/sarama v1.45.0 +## explicit; go 1.21 github.com/IBM/sarama # github.com/Masterminds/goutils v1.1.1 ## explicit @@ -487,7 +487,7 @@ github.com/aws/smithy-go/time github.com/aws/smithy-go/tracing github.com/aws/smithy-go/transport/http github.com/aws/smithy-go/transport/http/internal/io -# github.com/axiomhq/hyperloglog v0.2.2 +# github.com/axiomhq/hyperloglog v0.2.3 ## explicit; go 1.23 github.com/axiomhq/hyperloglog # github.com/baidubce/bce-sdk-go v0.9.212 @@ -2184,7 +2184,7 @@ google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version ## explicit; go 1.21 google.golang.org/grpc/stats/opentelemetry google.golang.org/grpc/stats/opentelemetry/internal -# google.golang.org/protobuf v1.36.1 +# google.golang.org/protobuf v1.36.2 ## explicit; go 1.21 google.golang.org/protobuf/encoding/protodelim google.golang.org/protobuf/encoding/protojson