From 9e13abd454536646b8334a31af4cd600222a569e Mon Sep 17 00:00:00 2001
From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com>
Date: Wed, 8 Jan 2025 19:06:50 -0500
Subject: [PATCH 1/7] fix(deps): update module github.com/ibm/sarama to v1.45.0 (#15636)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
---
 go.mod                                        |   2 +-
 go.sum                                        |   4 +-
 vendor/github.com/IBM/sarama/Dockerfile.kafka |  38 ++-
 vendor/github.com/IBM/sarama/Makefile         |   2 +-
 vendor/github.com/IBM/sarama/admin.go         | 217 ++++++++++++------
 .../github.com/IBM/sarama/balance_strategy.go |   5 +
 .../IBM/sarama/create_topics_request.go       |  15 ++
 .../IBM/sarama/delete_topics_request.go       |  15 ++
 .../github.com/IBM/sarama/docker-compose.yml  |  23 +-
 vendor/github.com/IBM/sarama/errors.go        |   2 +-
 .../github.com/IBM/sarama/server.properties   | 138 +++++++++++
 vendor/github.com/IBM/sarama/utils.go         |   4 +-
 vendor/modules.txt                            |   4 +-
 13 files changed, 373 insertions(+), 96 deletions(-)
 create mode 100644 vendor/github.com/IBM/sarama/server.properties

diff --git a/go.mod b/go.mod
index 071ee3704f32d..f13f5b9f6be1d 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/Azure/azure-storage-blob-go v0.15.0
 	github.com/Azure/go-autorest/autorest/adal v0.9.24
 	github.com/Azure/go-autorest/autorest/azure/auth v0.5.13
-	github.com/IBM/sarama v1.44.0
+	github.com/IBM/sarama v1.45.0
 	github.com/Masterminds/sprig/v3 v3.3.0
 	github.com/NYTimes/gziphandler v1.1.1
 	github.com/Workiva/go-datastructures v1.1.5
diff --git a/go.sum b/go.sum
index 695b88caaca38..15cb39121db77 100644
--- a/go.sum
+++ b/go.sum
@@ -134,8 +134,8 @@ github.com/IBM/go-sdk-core/v5 v5.18.3 h1:q6IDU3N2bHGwijK9pMnzKC5gqdaRII56NzB4ZNd
 github.com/IBM/go-sdk-core/v5 v5.18.3/go.mod h1:5kILxqEWOrwMhoD2b7J6Xv9Z2M6YIdT/6Oy+XRSsCGQ=
 github.com/IBM/ibm-cos-sdk-go v1.12.0 h1:Wrk3ve4JS3euhl7XjNFd3RlvPT56199G2/rKaPWpRKU=
 github.com/IBM/ibm-cos-sdk-go v1.12.0/go.mod h1:v/VBvFuysZMIX9HcaIrz6a+FLVw9px8fq6XabFwD+E4=
-github.com/IBM/sarama v1.44.0 h1:puNKqcScjSAgVLramjsuovZrS0nJZFVsrvuUymkWqhE=
-github.com/IBM/sarama v1.44.0/go.mod h1:MxQ9SvGfvKIorbk077Ff6DUnBlGpidiQOtU2vuBaxVw=
+github.com/IBM/sarama v1.45.0 h1:IzeBevTn809IJ/dhNKhP5mpxEXTmELuezO2tgHD9G5E=
+github.com/IBM/sarama v1.45.0/go.mod h1:EEay63m8EZkeumco9TDXf2JT3uDnZsZqFgV46n4yZdY=
 github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
 github.com/MasslessParticle/azure-storage-blob-go v0.14.1-0.20240322194317-344980fda573 h1:DCPjdUAi+jcGnL7iN+A7uNY8xG584oMRuisYh/VE21E=
 github.com/MasslessParticle/azure-storage-blob-go v0.14.1-0.20240322194317-344980fda573/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck=
diff --git a/vendor/github.com/IBM/sarama/Dockerfile.kafka b/vendor/github.com/IBM/sarama/Dockerfile.kafka
index b4d5c6acbbcef..d2234e3918f2d 100644
--- a/vendor/github.com/IBM/sarama/Dockerfile.kafka
+++ b/vendor/github.com/IBM/sarama/Dockerfile.kafka
@@ -1,17 +1,17 @@
-FROM registry.access.redhat.com/ubi8/ubi-minimal:8.10@sha256:cf095e5668919ba1b4ace3888107684ad9d587b1830d3eb56973e6a54f456e67
+FROM registry.access.redhat.com/ubi9/ubi-minimal:9.5@sha256:daa61d6103e98bccf40d7a69a0d4f8786ec390e2204fd94f7cc49053e9949360

 USER root

 RUN microdnf update -y \
-  && microdnf install -y curl gzip java-11-openjdk-headless tar tzdata-java \
+  && microdnf install -y git gzip java-17-openjdk-headless tar tzdata-java \
   && microdnf reinstall -y tzdata \
   && microdnf clean all

-ENV JAVA_HOME=/usr/lib/jvm/jre-11
+ENV JAVA_HOME=/usr/lib/jvm/jre-17

 # https://docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html
 # Ensure Java doesn't cache any dns results
-RUN cd /etc/java/java-11-openjdk/*/conf/security \
+RUN cd /etc/java/java-17-openjdk/*/conf/security \
   && sed -e '/networkaddress.cache.ttl/d' -e '/networkaddress.cache.negative.ttl/d' -i java.security \
   && echo 'networkaddress.cache.ttl=0' >> java.security \
   && echo 'networkaddress.cache.negative.ttl=0' >> java.security
@@ -19,24 +19,46 @@ RUN cd /etc/java/java-11-openjdk/*/conf/security \
 ARG SCALA_VERSION="2.13"
 ARG KAFKA_VERSION="3.6.2"

+WORKDIR /tmp
+
 # https://github.com/apache/kafka/blob/2e2b0a58eda3e677763af974a44a6aaa3c280214/tests/docker/Dockerfile#L77-L105
 ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
 SHELL ["/bin/bash", "-o", "pipefail", "-c"]
-RUN mkdir -p "/opt/kafka-${KAFKA_VERSION}" \
+RUN --mount=type=bind,target=.,rw=true \
+  mkdir -p "/opt/kafka-${KAFKA_VERSION}" \
   && chmod a+rw "/opt/kafka-${KAFKA_VERSION}" \
-  && curl -s "$KAFKA_MIRROR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}"
+  && if [ "$KAFKA_VERSION" = "4.0.0" ]; then \
+    microdnf install -y java-17-openjdk-devel \
+    && git clone --depth=50 --single-branch -b 4.0 https://github.com/apache/kafka /usr/src/kafka \
+    && cd /usr/src/kafka \
+    && : PIN TO COMMIT BEFORE KAFKA-17616 ZOOKEEPER REMOVAL STARTED \
+    && git reset --hard d1504649fb \
+    && export JAVA_TOOL_OPTIONS=-XX:MaxRAMPercentage=80 \
+    && sed -e '/version=/s/-SNAPSHOT//' -e '/org.gradle.jvmargs/d' -e '/org.gradle.parallel/s/true/false/' -i gradle.properties && ./gradlew -PmaxParallelForks=1 -PmaxScalacThreads=1 --no-daemon releaseTarGz -x siteDocsTar -x javadoc \
+    && tar xzf core/build/distributions/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" \
+    && cp /tmp/server.properties "/opt/kafka-${KAFKA_VERSION}/config/" \
+    && microdnf remove -y java-17-openjdk-devel \
+    && rm -rf /usr/src/kafka ; \
+  else \
+    curl -s "$KAFKA_MIRROR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" ; \
+  fi

 # older kafka versions depend upon jaxb-api being bundled with the JDK, but it
 # was removed from Java 11 so work around that by including it in the kafka
 # libs dir regardless
-WORKDIR /tmp
 RUN curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb-api-2.3.0.jar" \
   && for DIR in /opt/kafka-*; do cp -v jaxb-api-2.3.0.jar $DIR/libs/ ; done \
   && rm -f jaxb-api-2.3.0.jar

+# older kafka versions with the zookeeper 3.4.13 client aren't compatible with Java 17 so quietly bump them to 3.5.9
+RUN [ -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" ] || exit 0 ; \
+  rm -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" \
+  && curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper/3.5.9/zookeeper-3.5.9.jar" \
+  && curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-jute-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper-jute/3.5.9/zookeeper-jute-3.5.9.jar"
+
 WORKDIR /opt/kafka-${KAFKA_VERSION}

-ENV JAVA_MAJOR_VERSION=11
+ENV JAVA_MAJOR_VERSION=17

 RUN sed -e "s/JAVA_MAJOR_VERSION=.*/JAVA_MAJOR_VERSION=${JAVA_MAJOR_VERSION}/" -i"" ./bin/kafka-run-class.sh
diff --git a/vendor/github.com/IBM/sarama/Makefile b/vendor/github.com/IBM/sarama/Makefile
index 7cefc2a2c36bf..ba6f46e41a5ba 100644
--- a/vendor/github.com/IBM/sarama/Makefile
+++ b/vendor/github.com/IBM/sarama/Makefile
@@ -9,7 +9,7 @@ FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name
 TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')

 $(GOBIN)/tparse:
-	GOBIN=$(GOBIN) go install github.com/mfridman/tparse@v0.11.1
+	GOBIN=$(GOBIN) go install github.com/mfridman/tparse@v0.16.0
 get:
 	$(GO) get ./...
 	$(GO) mod verify
diff --git a/vendor/github.com/IBM/sarama/admin.go b/vendor/github.com/IBM/sarama/admin.go
index 6549c7e6fb07f..8aa1f374e4d63 100644
--- a/vendor/github.com/IBM/sarama/admin.go
+++ b/vendor/github.com/IBM/sarama/admin.go
@@ -3,6 +3,7 @@ package sarama
 import (
 	"errors"
 	"fmt"
+	"io"
 	"math/rand"
 	"strconv"
 	"sync"
@@ -144,6 +145,10 @@ type ClusterAdmin interface {
 	// locally cached value if it's available.
 	Controller() (*Broker, error)

+	// Coordinator returns the coordinating broker for a consumer group. It will
+	// return a locally cached value if it's available.
+	Coordinator(group string) (*Broker, error)
+
 	// Remove members from the consumer group by given member identities.
 	// This operation is supported by brokers with version 2.3 or higher
 	// This is for static membership feature. KIP-345
@@ -195,14 +200,25 @@ func (ca *clusterAdmin) Controller() (*Broker, error) {
 	return ca.client.Controller()
 }

+func (ca *clusterAdmin) Coordinator(group string) (*Broker, error) {
+	return ca.client.Coordinator(group)
+}
+
 func (ca *clusterAdmin) refreshController() (*Broker, error) {
 	return ca.client.RefreshController()
 }

-// isErrNotController returns `true` if the given error type unwraps to an
-// `ErrNotController` response from Kafka
-func isErrNotController(err error) bool {
-	return errors.Is(err, ErrNotController)
+// isRetriableControllerError returns `true` if the given error type unwraps to
+// an `ErrNotController` or `EOF` response from Kafka
+func isRetriableControllerError(err error) bool {
+	return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF)
+}
+
+// isRetriableGroupCoordinatorError returns `true` if the given error type
+// unwraps to an `ErrNotCoordinatorForConsumer`,
+// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka
+func isRetriableGroupCoordinatorError(err error) bool {
+	return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF)
 }

 // retryOnError will repeatedly call the given (error-returning) func in the
@@ -252,7 +268,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 		request.Version = 1
 	}

-	return ca.retryOnError(isErrNotController, func() error {
+	return ca.retryOnError(isRetriableControllerError, func() error {
 		b, err := ca.Controller()
 		if err != nil {
 			return err
@@ -269,7 +285,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 		}

 		if !errors.Is(topicErr.Err, ErrNoError) {
-			if errors.Is(topicErr.Err, ErrNotController) {
+			if isRetriableControllerError(topicErr.Err) {
 				_, _ = ca.refreshController()
 			}
 			return topicErr
@@ -281,14 +297,14 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
 	var response *MetadataResponse

-	err = ca.retryOnError(isErrNotController, func() error {
+	err = ca.retryOnError(isRetriableControllerError, func() error {
 		controller, err := ca.Controller()
 		if err != nil {
 			return err
 		}

 		request := NewMetadataRequest(ca.conf.Version, topics)
 		response, err = controller.GetMetadata(request)
-		if isErrNotController(err) {
+		if isRetriableControllerError(err) {
 			_, _ = ca.refreshController()
 		}
 		return err
@@ -301,7 +317,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
 	var response *MetadataResponse

-	err = ca.retryOnError(isErrNotController, func() error {
+	err = ca.retryOnError(isRetriableControllerError, func() error {
 		controller, err := ca.Controller()
 		if err != nil {
 			return err
@@ -309,7 +325,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
 		request := NewMetadataRequest(ca.conf.Version, nil)
 		response, err = controller.GetMetadata(request)
-		if isErrNotController(err) {
+		if isRetriableControllerError(err) {
 			_, _ = ca.refreshController()
 		}
 		return err
@@ -441,7 +457,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
 		request.Version = 1
 	}

-	return ca.retryOnError(isErrNotController, func() error {
+	return ca.retryOnError(isRetriableControllerError, func() error {
 		b, err := ca.Controller()
 		if err != nil {
 			return err
@@ -485,7 +501,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
 		request.Version = 1
 	}

-	return ca.retryOnError(isErrNotController, func() error {
+	return ca.retryOnError(isRetriableControllerError, func() error {
 		b, err := ca.Controller()
 		if err != nil {
 			return err
@@ -526,7 +542,7 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][
 		request.AddBlock(topic, int32(i), assignment[i])
 	}

-	return ca.retryOnError(isErrNotController, func() error {
+	return ca.retryOnError(isRetriableControllerError, func() error {
 		b, err := ca.Controller()
 		if err != nil {
 			return err
@@ -573,7 +589,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in
 	request.AddBlock(topic, partitions)

 	var rsp *ListPartitionReassignmentsResponse
-	err = ca.retryOnError(isErrNotController, func() error {
+	err = ca.retryOnError(isRetriableControllerError, func() error {
 		b, err := ca.Controller()
 		if err != nil {
 			return err
@@ -581,7 +597,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in
 		_ = b.Open(ca.client.Config())

 		rsp, err = b.ListPartitionReassignments(request)
-		if isErrNotController(err) {
+		if isRetriableControllerError(err) {
 			_, _ = ca.refreshController()
 		}
 		return err
@@ -924,7 +940,7 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s
 	}

 	var res *ElectLeadersResponse
-	err := ca.retryOnError(isErrNotController, func() error {
+	if err := ca.retryOnError(isRetriableControllerError, func() error {
 		b, err := ca.Controller()
 		if err != nil {
 			return err
@@ -932,12 +948,17 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s
 		_ = b.Open(ca.client.Config())

 		res, err = b.ElectLeaders(request)
-		if isErrNotController(err) {
-			_, _ = ca.refreshController()
+		if err != nil {
+			return err
 		}
-		return err
-	})
-	if err != nil {
+		if !errors.Is(res.ErrorCode, ErrNoError) {
+			if isRetriableControllerError(res.ErrorCode) {
+				_, _ = ca.refreshController()
+			}
+			return res.ErrorCode
+		}
+		return nil
+	}); err != nil {
 		return nil, err
 	}
 	return res.ReplicaElectionResults, nil
@@ -947,11 +968,11 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group
 	groupsPerBroker := make(map[*Broker][]string)

 	for _, group := range groups {
-		controller, err := ca.client.Coordinator(group)
+		coordinator, err := ca.client.Coordinator(group)
 		if err != nil {
 			return nil, err
 		}
-		groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
+		groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group)
 	}

 	for broker, brokerGroups := range groupsPerBroker {
@@ -1043,22 +1064,36 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
 }

 func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
-	coordinator, err := ca.client.Coordinator(group)
-	if err != nil {
-		return nil, err
-	}
-
+	var response *OffsetFetchResponse
 	request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)
+	err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+		defer func() {
+			if err != nil && isRetriableGroupCoordinatorError(err) {
+				_ = ca.client.RefreshCoordinator(group)
+			}
+		}()
+
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return err
+		}

-	return coordinator.FetchOffset(request)
+		response, err = coordinator.FetchOffset(request)
+		if err != nil {
+			return err
+		}
+		if !errors.Is(response.Err, ErrNoError) {
+			return response.Err
+		}
+
+		return nil
+	})
+
+	return response, err
 }

 func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
-	coordinator, err := ca.client.Coordinator(group)
-	if err != nil {
-		return err
-	}
-
+	var response *DeleteOffsetsResponse
 	request := &DeleteOffsetsRequest{
 		Group: group,
 		partitions: map[string][]int32{
@@ -1066,27 +1101,35 @@ func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, pa
 		},
 	}

-	resp, err := coordinator.DeleteOffsets(request)
-	if err != nil {
-		return err
-	}
+	return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+		defer func() {
+			if err != nil && isRetriableGroupCoordinatorError(err) {
+				_ = ca.client.RefreshCoordinator(group)
+			}
+		}()

-	if !errors.Is(resp.ErrorCode, ErrNoError) {
-		return resp.ErrorCode
-	}
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return err
+		}

-	if !errors.Is(resp.Errors[topic][partition], ErrNoError) {
-		return resp.Errors[topic][partition]
-	}
-	return nil
+		response, err = coordinator.DeleteOffsets(request)
+		if err != nil {
+			return err
+		}
+		if !errors.Is(response.ErrorCode, ErrNoError) {
+			return response.ErrorCode
+		}
+		if !errors.Is(response.Errors[topic][partition], ErrNoError) {
+			return response.Errors[topic][partition]
+		}
+
+		return nil
+	})
 }

 func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
-	coordinator, err := ca.client.Coordinator(group)
-	if err != nil {
-		return err
-	}
-
+	var response *DeleteGroupsResponse
 	request := &DeleteGroupsRequest{
 		Groups: []string{group},
 	}
@@ -1094,21 +1137,34 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
 		request.Version = 1
 	}

-	resp, err := coordinator.DeleteGroups(request)
-	if err != nil {
-		return err
-	}
+	return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+		defer func() {
+			if err != nil && isRetriableGroupCoordinatorError(err) {
+				_ = ca.client.RefreshCoordinator(group)
+			}
+		}()

-	groupErr, ok := resp.GroupErrorCodes[group]
-	if !ok {
-		return ErrIncompleteResponse
-	}
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return err
+		}

-	if !errors.Is(groupErr, ErrNoError) {
-		return groupErr
-	}
+		response, err = coordinator.DeleteGroups(request)
+		if err != nil {
+			return err
+		}

-	return nil
+		groupErr, ok := response.GroupErrorCodes[group]
+		if !ok {
+			return ErrIncompleteResponse
+		}
+
+		if !errors.Is(groupErr, ErrNoError) {
+			return groupErr
+		}
+
+		return nil
+	})
 }

 func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
@@ -1206,7 +1262,7 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU
 	}

 	var rsp *AlterUserScramCredentialsResponse
-	err := ca.retryOnError(isErrNotController, func() error {
+	err := ca.retryOnError(isRetriableControllerError, func() error {
 		b, err := ca.Controller()
 		if err != nil {
 			return err
@@ -1284,18 +1340,14 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie
 	return nil
 }

-func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
+func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
 	if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
 		return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
 	}
-
-	controller, err := ca.client.Coordinator(groupId)
-	if err != nil {
-		return nil, err
-	}
+	var response *LeaveGroupResponse
 	request := &LeaveGroupRequest{
 		Version: 3,
-		GroupId: groupId,
+		GroupId: group,
 	}
 	for _, instanceId := range groupInstanceIds {
 		groupInstanceId := instanceId
@@ -1303,5 +1355,28 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInsta
 			GroupInstanceId: &groupInstanceId,
 		})
 	}
-	return controller.LeaveGroup(request)
+	err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
+		defer func() {
+			if err != nil && isRetriableGroupCoordinatorError(err) {
+				_ = ca.client.RefreshCoordinator(group)
+			}
+		}()
+
+		coordinator, err := ca.client.Coordinator(group)
+		if err != nil {
+			return err
+		}
+
+		response, err = coordinator.LeaveGroup(request)
+		if err != nil {
+			return err
+		}
+		if !errors.Is(response.Err, ErrNoError) {
+			return response.Err
+		}
+
+		return nil
+	})
+
+	return response, err
}
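All of the admin.go changes above funnel through one retry shape: classify the error, refresh the cached controller or group coordinator, and call the closure again. A minimal standalone sketch of that shape follows; the names mirror the vendored helpers, but sarama's types, backoff, and attempt bookkeeping are elided, so treat it as an illustration rather than the library's implementation.

package main

import (
	"errors"
	"fmt"
	"io"
)

// stand-in for sarama's ErrNotCoordinatorForConsumer et al.
var errNotCoordinator = errors.New("not coordinator for group")

// isRetriable mirrors isRetriableGroupCoordinatorError: retry when the
// coordinator moved or the connection was closed abruptly (io.EOF).
func isRetriable(err error) bool {
	return errors.Is(err, errNotCoordinator) || errors.Is(err, io.EOF)
}

// retryOnError calls fn up to attempts times while shouldRetry(err) is true,
// mirroring clusterAdmin.retryOnError (backoff omitted).
func retryOnError(attempts int, shouldRetry func(error) bool, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil || !shouldRetry(err) {
			return err
		}
		// a real implementation would refresh the cached broker and back off here
	}
	return err
}

func main() {
	calls := 0
	err := retryOnError(3, isRetriable, func() error {
		calls++
		if calls < 3 {
			return errNotCoordinator // e.g. a stale coordinator cache
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}

In the vendored version, the deferred RefreshCoordinator inside each closure is what makes the next attempt pick up the new broker before retrying.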
diff --git a/vendor/github.com/IBM/sarama/balance_strategy.go b/vendor/github.com/IBM/sarama/balance_strategy.go
index 30d41779c1e9b..b5bc30a13bdfc 100644
--- a/vendor/github.com/IBM/sarama/balance_strategy.go
+++ b/vendor/github.com/IBM/sarama/balance_strategy.go
@@ -989,6 +989,7 @@ func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicParti
 	return reversePairPartition
 }

+//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
 func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
 	if src == dst {
 		return currentPath, false
@@ -1023,6 +1024,7 @@ func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, cur
 	return currentPath, false
 }

+//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
 func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
 	superCycle := make([]string, len(cycle)-1)
 	for i := 0; i < len(cycle)-1; i++ {
@@ -1037,6 +1039,7 @@ func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
 	return false
 }

+//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
 func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
 	cycles := make([][]string, 0)
 	for _, pair := range pairs {
@@ -1068,6 +1071,7 @@ func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
 	return false
 }

+//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
 func (p *partitionMovements) isSticky() bool {
 	for topic, movements := range p.PartitionMovementsByTopic {
 		movementPairs := make([]consumerPair, len(movements))
@@ -1085,6 +1089,7 @@ func (p *partitionMovements) isSticky() bool {
 	return true
 }

+//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
 func indexOfSubList(source []string, target []string) int {
 	targetSize := len(target)
 	maxCandidate := len(source) - targetSize
diff --git a/vendor/github.com/IBM/sarama/create_topics_request.go b/vendor/github.com/IBM/sarama/create_topics_request.go
index 8382d17c20a74..e8c0f01472c35 100644
--- a/vendor/github.com/IBM/sarama/create_topics_request.go
+++ b/vendor/github.com/IBM/sarama/create_topics_request.go
@@ -16,6 +16,21 @@ type CreateTopicsRequest struct {
 	ValidateOnly bool
 }

+func NewCreateTopicsRequest(version KafkaVersion, topicDetails map[string]*TopicDetail, timeout time.Duration) *CreateTopicsRequest {
+	r := &CreateTopicsRequest{
+		TopicDetails: topicDetails,
+		Timeout:      timeout,
+	}
+	if version.IsAtLeast(V2_0_0_0) {
+		r.Version = 3
+	} else if version.IsAtLeast(V0_11_0_0) {
+		r.Version = 2
+	} else if version.IsAtLeast(V0_10_2_0) {
+		r.Version = 1
+	}
+	return r
+}
+
 func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
 	if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
 		return err
diff --git a/vendor/github.com/IBM/sarama/delete_topics_request.go b/vendor/github.com/IBM/sarama/delete_topics_request.go
index 252c0d0259461..f38f32770be7b 100644
--- a/vendor/github.com/IBM/sarama/delete_topics_request.go
+++ b/vendor/github.com/IBM/sarama/delete_topics_request.go
@@ -8,6 +8,21 @@ type DeleteTopicsRequest struct {
 	Timeout time.Duration
 }

+func NewDeleteTopicsRequest(version KafkaVersion, topics []string, timeout time.Duration) *DeleteTopicsRequest {
+	d := &DeleteTopicsRequest{
+		Topics:  topics,
+		Timeout: timeout,
+	}
+	if version.IsAtLeast(V2_1_0_0) {
+		d.Version = 3
+	} else if version.IsAtLeast(V2_0_0_0) {
+		d.Version = 2
+	} else if version.IsAtLeast(V0_11_0_0) {
+		d.Version = 1
+	}
+	return d
+}
+
 func (d *DeleteTopicsRequest) encode(pe packetEncoder) error {
 	if err := pe.putStringArray(d.Topics); err != nil {
 		return err
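The two new constructors encode the same version-negotiation idea: inspect the configured broker version and pick the newest request schema that cluster can parse. A hedged sketch of the cascade, with a simplified stand-in for sarama's KafkaVersion type:

package main

import "fmt"

// kafkaVersion is an illustrative [major, minor] pair, not sarama's type.
type kafkaVersion [2]int

func (v kafkaVersion) isAtLeast(o kafkaVersion) bool {
	return v[0] > o[0] || (v[0] == o[0] && v[1] >= o[1])
}

// deleteTopicsVersion mirrors the cascade in NewDeleteTopicsRequest:
// newest thresholds are checked first, and anything older falls back to v0.
func deleteTopicsVersion(broker kafkaVersion) int16 {
	switch {
	case broker.isAtLeast(kafkaVersion{2, 1}):
		return 3
	case broker.isAtLeast(kafkaVersion{2, 0}):
		return 2
	case broker.isAtLeast(kafkaVersion{0, 11}):
		return 1
	default:
		return 0
	}
}

func main() {
	fmt.Println(deleteTopicsVersion(kafkaVersion{3, 6}))  // 3
	fmt.Println(deleteTopicsVersion(kafkaVersion{0, 10})) // 0
}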
diff --git a/vendor/github.com/IBM/sarama/docker-compose.yml b/vendor/github.com/IBM/sarama/docker-compose.yml
index a0e3d2e21e925..1e66cca0ce5d4 100644
--- a/vendor/github.com/IBM/sarama/docker-compose.yml
+++ b/vendor/github.com/IBM/sarama/docker-compose.yml
@@ -1,6 +1,6 @@
 services:
   zookeeper-1:
-    hostname: 'zookeeper-1'
+    container_name: 'zookeeper-1'
     image: 'docker.io/library/zookeeper:3.7.2'
     init: true
     restart: always
@@ -13,7 +13,7 @@ services:
       ZOO_MAX_CLIENT_CNXNS: '0'
       ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok'
   zookeeper-2:
-    hostname: 'zookeeper-2'
+    container_name: 'zookeeper-2'
     image: 'docker.io/library/zookeeper:3.7.2'
     init: true
     restart: always
@@ -26,7 +26,7 @@ services:
       ZOO_MAX_CLIENT_CNXNS: '0'
       ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok'
   zookeeper-3:
-    hostname: 'zookeeper-3'
+    container_name: 'zookeeper-3'
    image: 'docker.io/library/zookeeper:3.7.2'
     init: true
     restart: always
@@ -39,7 +39,7 @@ services:
       ZOO_MAX_CLIENT_CNXNS: '0'
       ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok'
   kafka-1:
-    hostname: 'kafka-1'
+    container_name: 'kafka-1'
     image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}'
     init: true
     build:
@@ -74,6 +74,7 @@ services:
       KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
       KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
       KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2'
       KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2'
       KAFKA_CFG_BROKER_ID: '1'
       KAFKA_CFG_BROKER_RACK: '1'
@@ -85,7 +86,7 @@ services:
       KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
       KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions"
   kafka-2:
-    hostname: 'kafka-2'
+    container_name: 'kafka-2'
     image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}'
     init: true
     build:
@@ -120,6 +121,7 @@ services:
       KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
       KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
       KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2'
       KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2'
       KAFKA_CFG_BROKER_ID: '2'
       KAFKA_CFG_BROKER_RACK: '2'
@@ -131,7 +133,7 @@ services:
       KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
       KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions"
   kafka-3:
-    hostname: 'kafka-3'
+    container_name: 'kafka-3'
     image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}'
     init: true
     build:
@@ -166,6 +168,7 @@ services:
       KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
       KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
       KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2'
       KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2'
       KAFKA_CFG_BROKER_ID: '3'
       KAFKA_CFG_BROKER_RACK: '3'
@@ -177,7 +180,7 @@ services:
       KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
       KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions"
   kafka-4:
-    hostname: 'kafka-4'
+    container_name: 'kafka-4'
     image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}'
     init: true
     build:
@@ -212,6 +215,7 @@ services:
       KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
       KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
       KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2'
       KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2'
       KAFKA_CFG_BROKER_ID: '4'
       KAFKA_CFG_BROKER_RACK: '4'
@@ -223,7 +227,7 @@ services:
       KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
       KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions"
   kafka-5:
-    hostname: 'kafka-5'
+    container_name: 'kafka-5'
     image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}'
     init: true
     build:
@@ -258,6 +262,7 @@ services:
       KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
       KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
       KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2'
       KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2'
       KAFKA_CFG_BROKER_ID: '5'
       KAFKA_CFG_BROKER_RACK: '5'
@@ -269,7 +274,7 @@ services:
       KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
       KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions"
   toxiproxy:
-    hostname: 'toxiproxy'
+    container_name: 'toxiproxy'
     image: 'ghcr.io/shopify/toxiproxy:2.4.0'
     init: true
     healthcheck:
diff --git a/vendor/github.com/IBM/sarama/errors.go b/vendor/github.com/IBM/sarama/errors.go
index 2c431aecb05f0..842d302571ae7 100644
--- a/vendor/github.com/IBM/sarama/errors.go
+++ b/vendor/github.com/IBM/sarama/errors.go
@@ -304,7 +304,7 @@ func (err KError) Error() string {
 	case ErrOffsetsLoadInProgress:
 		return "kafka server: The coordinator is still loading offsets and cannot currently process requests"
 	case ErrConsumerCoordinatorNotAvailable:
-		return "kafka server: Offset's topic has not yet been created"
+		return "kafka server: The coordinator is not available"
 	case ErrNotCoordinatorForConsumer:
 		return "kafka server: Request was for a consumer group that is not coordinated by this broker"
 	case ErrInvalidTopic:
diff --git a/vendor/github.com/IBM/sarama/server.properties b/vendor/github.com/IBM/sarama/server.properties
new file mode 100644
index 0000000000000..21ba1c7d9c61b
--- /dev/null
+++ b/vendor/github.com/IBM/sarama/server.properties
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
+# See kafka.server.KafkaConfig for additional details and defaults
+#
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. If not configured, the host name will be equal to the value of
+# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
+
+# Listener name, hostname and port the broker will advertise to clients.
+# If not set, it uses the value for "listeners".
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=18000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
diff --git a/vendor/github.com/IBM/sarama/utils.go b/vendor/github.com/IBM/sarama/utils.go
index d5b77e0d92b36..b0e1aceff14d4 100644
--- a/vendor/github.com/IBM/sarama/utils.go
+++ b/vendor/github.com/IBM/sarama/utils.go
@@ -206,6 +206,7 @@ var (
 	V3_8_0_0  = newKafkaVersion(3, 8, 0, 0)
 	V3_8_1_0  = newKafkaVersion(3, 8, 1, 0)
 	V3_9_0_0  = newKafkaVersion(3, 9, 0, 0)
+	V4_0_0_0  = newKafkaVersion(4, 0, 0, 0)

 	SupportedVersions = []KafkaVersion{
 		V0_8_2_0,
@@ -277,9 +278,10 @@ var (
 		V3_8_0_0,
 		V3_8_1_0,
 		V3_9_0_0,
+		V4_0_0_0,
 	}
 	MinVersion     = V0_8_2_0
-	MaxVersion     = V3_9_0_0
+	MaxVersion     = V4_0_0_0
 	DefaultVersion = V2_1_0_0

 	// reduced set of protocol versions to matrix test
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5ff5397c6ec11..5040e9ea62e0e 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -263,8 +263,8 @@ github.com/IBM/ibm-cos-sdk-go/private/protocol/restxml
 github.com/IBM/ibm-cos-sdk-go/private/protocol/xml/xmlutil
 github.com/IBM/ibm-cos-sdk-go/service/s3
 github.com/IBM/ibm-cos-sdk-go/service/s3/s3iface
-# github.com/IBM/sarama v1.44.0
-## explicit; go 1.20
+# github.com/IBM/sarama v1.45.0
+## explicit; go 1.21
 github.com/IBM/sarama
 # github.com/Masterminds/goutils v1.1.1
 ## explicit

From b6df82e9654fb8ae529fdc442ae2427971fa7955 Mon Sep 17 00:00:00 2001
From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com>
Date: Wed, 8 Jan 2025 19:07:21 -0500
Subject: [PATCH 2/7] fix(deps): update module github.com/axiomhq/hyperloglog to v0.2.3 (#15641)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
---
 go.mod                                        |  2 +-
 go.sum                                        |  4 +-
 .../axiomhq/hyperloglog/compressed.go         | 51 +++++++++----------
 .../axiomhq/hyperloglog/hyperloglog.go        | 30 ++++++-----
 .../github.com/axiomhq/hyperloglog/sparse.go  | 25 ++++-----
 vendor/modules.txt                            |  2 +-
 6 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/go.mod b/go.mod
index f13f5b9f6be1d..dd11345babaea 100644
--- a/go.mod
+++ b/go.mod
@@ -116,7 +116,7 @@ require (
 	github.com/DmitriyVTitov/size v1.5.0
 	github.com/IBM/go-sdk-core/v5 v5.18.3
 	github.com/IBM/ibm-cos-sdk-go v1.12.0
-	github.com/axiomhq/hyperloglog v0.2.2
+	github.com/axiomhq/hyperloglog v0.2.3
 	github.com/buger/jsonparser v1.1.1
 	github.com/d4l3k/messagediff v1.2.1
 	github.com/dolthub/swiss v0.2.1
diff --git a/go.sum b/go.sum
index 15cb39121db77..85319be119f5d 100644
--- a/go.sum
+++ b/go.sum
@@ -222,8 +222,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGI
 github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8=
 github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro=
 github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
-github.com/axiomhq/hyperloglog v0.2.2 h1:9X9rOdYx82zXKgd1aMsDZNUw3d7DKAHhd2J305HZPA8=
-github.com/axiomhq/hyperloglog v0.2.2/go.mod h1:DLUK9yIzpU5B6YFLjxTIcbHu1g4Y1WQb1m5RH3radaM=
+github.com/axiomhq/hyperloglog v0.2.3 h1:2ZGwz3FGcx77e9/aNjqJijsGhH6RZOlglzxnDpVBCQY=
+github.com/axiomhq/hyperloglog v0.2.3/go.mod h1:DLUK9yIzpU5B6YFLjxTIcbHu1g4Y1WQb1m5RH3radaM=
 github.com/baidubce/bce-sdk-go v0.9.212 h1:B3PUoaFi4m13wP7gWObznjPLZ5umQ1BHjO/UoSsj3x4=
 github.com/baidubce/bce-sdk-go v0.9.212/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
 github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps=
diff --git a/vendor/github.com/axiomhq/hyperloglog/compressed.go b/vendor/github.com/axiomhq/hyperloglog/compressed.go
index d140b88c16cdc..865901bbc690d 100644
--- a/vendor/github.com/axiomhq/hyperloglog/compressed.go
+++ b/vendor/github.com/axiomhq/hyperloglog/compressed.go
@@ -1,6 +1,9 @@
 package hyperloglog

-import "encoding/binary"
+import (
+	"encoding/binary"
+	"slices"
+)

 // Original author of this file is github.com/clarkduvall/hyperloglog
 type iterable interface {
@@ -52,32 +55,26 @@ func (v *compressedList) Clone() *compressedList {
 	return newV
 }

-func (v *compressedList) MarshalBinary() (data []byte, err error) {
-	// Marshal the variableLengthList
-	bdata, err := v.b.MarshalBinary()
-	if err != nil {
-		return nil, err
-	}
-
-	// At least 4 bytes for the two fixed sized values plus the size of bdata.
-	data = make([]byte, 0, 4+4+len(bdata))
+func (v *compressedList) AppendBinary(data []byte) ([]byte, error) {
+	// At least 4 bytes for the two fixed sized values
+	data = slices.Grow(data, 4+4)

 	// Marshal the count and last values.
-	data = append(data, []byte{
+	data = append(data,
 		// Number of items in the list.
-		byte(v.count >> 24),
-		byte(v.count >> 16),
-		byte(v.count >> 8),
+		byte(v.count>>24),
+		byte(v.count>>16),
+		byte(v.count>>8),
 		byte(v.count),
 		// The last item in the list.
-		byte(v.last >> 24),
-		byte(v.last >> 16),
-		byte(v.last >> 8),
+		byte(v.last>>24),
+		byte(v.last>>16),
+		byte(v.last>>8),
 		byte(v.last),
-	}...)
+	)

-	// Append the list
-	return append(data, bdata...), nil
+	// Append the variableLengthList
+	return v.b.AppendBinary(data)
 }

 func (v *compressedList) UnmarshalBinary(data []byte) error {
@@ -130,20 +127,20 @@ func (v *compressedList) Iter() *iterator {

 type variableLengthList []uint8

-func (v variableLengthList) MarshalBinary() (data []byte, err error) {
+func (v variableLengthList) AppendBinary(data []byte) ([]byte, error) {
 	// 4 bytes for the size of the list, and a byte for each element in the
 	// list.
-	data = make([]byte, 0, 4+v.Len())
+	data = slices.Grow(data, 4+v.Len())

 	// Length of the list. We only need 32 bits because the size of the set
 	// couldn't exceed that on 32 bit architectures.
 	sz := v.Len()
-	data = append(data, []byte{
-		byte(sz >> 24),
-		byte(sz >> 16),
-		byte(sz >> 8),
+	data = append(data,
+		byte(sz>>24),
+		byte(sz>>16),
+		byte(sz>>8),
 		byte(sz),
-	}...)
+	)

 	// Marshal each element in the list.
 	for i := 0; i < sz; i++ {
diff --git a/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go b/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go
index 24b39e43562aa..f4fbb01e96592 100644
--- a/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go
+++ b/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"slices"
 	"sort"
 )

@@ -203,8 +204,16 @@ func (sk *Sketch) mergeSparse() {
 }

 // MarshalBinary implements the encoding.BinaryMarshaler interface.
+//
+// When the result will be appended to another buffer, consider using
+// AppendBinary to avoid additional allocations and copying.
 func (sk *Sketch) MarshalBinary() (data []byte, err error) {
-	data = make([]byte, 0, 8+len(sk.regs))
+	return sk.AppendBinary(nil)
+}
+
+// AppendBinary implements the encoding.BinaryAppender interface.
+func (sk *Sketch) AppendBinary(data []byte) ([]byte, error) {
+	data = slices.Grow(data, 8+len(sk.regs))
 	// Marshal a version marker.
 	data = append(data, version)
 	// Marshal p.
@@ -217,18 +226,13 @@ func (sk *Sketch) MarshalBinary() (data []byte, err error) {
 		data = append(data, byte(1))

 		// Add the tmp_set
-		tsdata, err := sk.tmpSet.MarshalBinary()
+		data, err := sk.tmpSet.AppendBinary(data)
 		if err != nil {
 			return nil, err
 		}
-		data = append(data, tsdata...)

 		// Add the sparse Sketch
-		sdata, err := sk.sparseList.MarshalBinary()
-		if err != nil {
-			return nil, err
-		}
-		return append(data, sdata...), nil
+		return sk.sparseList.AppendBinary(data)
 	}

 	// It's using the dense Sketch.
@@ -236,12 +240,12 @@ func (sk *Sketch) MarshalBinary() (data []byte, err error) {

 	// Add the dense sketch Sketch.
 	sz := len(sk.regs)
-	data = append(data, []byte{
-		byte(sz >> 24),
-		byte(sz >> 16),
-		byte(sz >> 8),
+	data = append(data,
+		byte(sz>>24),
+		byte(sz>>16),
+		byte(sz>>8),
 		byte(sz),
-	}...)
+	)

 	// Marshal each element in the list.
 	for _, v := range sk.regs {
diff --git a/vendor/github.com/axiomhq/hyperloglog/sparse.go b/vendor/github.com/axiomhq/hyperloglog/sparse.go
index 0151740df9859..ca2b939516664 100644
--- a/vendor/github.com/axiomhq/hyperloglog/sparse.go
+++ b/vendor/github.com/axiomhq/hyperloglog/sparse.go
@@ -2,6 +2,7 @@ package hyperloglog

 import (
 	"math/bits"
+	"slices"

 	"github.com/kamstrup/intmap"
 )
@@ -83,29 +84,29 @@ func (s *set) Clone() *set {
 	return &set{m: newS}
 }

-func (s *set) MarshalBinary() (data []byte, err error) {
+func (s *set) AppendBinary(data []byte) ([]byte, error) {
 	// 4 bytes for the size of the set, and 4 bytes for each key.
 	// list.
-	data = make([]byte, 0, 4+(4*s.m.Len()))
+	data = slices.Grow(data, 4+(4*s.m.Len()))

 	// Length of the set. We only need 32 bits because the size of the set
 	// couldn't exceed that on 32 bit architectures.
 	sl := s.m.Len()
-	data = append(data, []byte{
-		byte(sl >> 24),
-		byte(sl >> 16),
-		byte(sl >> 8),
+	data = append(data,
+		byte(sl>>24),
+		byte(sl>>16),
+		byte(sl>>8),
 		byte(sl),
-	}...)
+	)

 	// Marshal each element in the set.
 	s.m.ForEach(func(k uint32) bool {
-		data = append(data, []byte{
-			byte(k >> 24),
-			byte(k >> 16),
-			byte(k >> 8),
+		data = append(data,
+			byte(k>>24),
+			byte(k>>16),
+			byte(k>>8),
 			byte(k),
-		}...)
+		)
 		return true
 	})
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5040e9ea62e0e..a22ba57b6d10c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -487,7 +487,7 @@ github.com/aws/smithy-go/time
 github.com/aws/smithy-go/tracing
 github.com/aws/smithy-go/transport/http
 github.com/aws/smithy-go/transport/http/internal/io
-# github.com/axiomhq/hyperloglog v0.2.2
+# github.com/axiomhq/hyperloglog v0.2.3
 ## explicit; go 1.23
 github.com/axiomhq/hyperloglog
 # github.com/baidubce/bce-sdk-go v0.9.212
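The same mechanical change repeats across compressed.go, hyperloglog.go, and sparse.go: MarshalBinary used to build temporary byte slices and copy them together, while the new AppendBinary methods grow the caller's buffer once with slices.Grow and append fields in place, so MarshalBinary reduces to AppendBinary(nil). A minimal sketch of the shape — the `set` type here is illustrative, not the library's; "encoding.BinaryAppender" is the stdlib interface the doc comment refers to in newer Go releases:

package main

import (
	"fmt"
	"slices"
)

type set struct{ keys []uint32 }

// AppendBinary grows the destination once, then appends the length and each
// key big-endian, without allocating intermediate slices.
func (s *set) AppendBinary(data []byte) ([]byte, error) {
	data = slices.Grow(data, 4+4*len(s.keys))
	n := len(s.keys)
	data = append(data, byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
	for _, k := range s.keys {
		data = append(data, byte(k>>24), byte(k>>16), byte(k>>8), byte(k))
	}
	return data, nil
}

// MarshalBinary delegates, exactly as Sketch.MarshalBinary now does.
func (s *set) MarshalBinary() ([]byte, error) { return s.AppendBinary(nil) }

func main() {
	s := &set{keys: []uint32{1, 2}}
	buf, _ := s.MarshalBinary()
	fmt.Println(len(buf)) // 12: 4-byte length + two 4-byte keys
}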
From 74885a20c735e8cd02bb590336bb7ae81c54bb33 Mon Sep 17 00:00:00 2001
From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com>
Date: Wed, 8 Jan 2025 19:08:04 -0500
Subject: [PATCH 3/7] fix(deps): update module google.golang.org/protobuf to v1.36.2 (#15635)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
---
 go.mod                                         |  2 +-
 go.sum                                         |  4 ++--
 .../protobuf/internal/impl/message_opaque.go   | 24 +++++++++++++++---
 .../protobuf/internal/version/version.go       |  2 +-
 vendor/modules.txt                             |  2 +-
 5 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/go.mod b/go.mod
index dd11345babaea..ef312d46309fa 100644
--- a/go.mod
+++ b/go.mod
@@ -147,7 +147,7 @@ require (
 	go4.org/netipx v0.0.0-20230125063823-8449b0a6169f
 	golang.org/x/oauth2 v0.25.0
 	golang.org/x/text v0.21.0
-	google.golang.org/protobuf v1.36.1
+	google.golang.org/protobuf v1.36.2
 	gotest.tools v2.2.0+incompatible
 	k8s.io/apimachinery v0.32.0
 	k8s.io/utils v0.0.0-20241104163129-6fe5fd82f078
diff --git a/go.sum b/go.sum
index 85319be119f5d..0452a1696821f 100644
--- a/go.sum
+++ b/go.sum
@@ -1653,8 +1653,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
-google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
+google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
+google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/vendor/google.golang.org/protobuf/internal/impl/message_opaque.go b/vendor/google.golang.org/protobuf/internal/impl/message_opaque.go
index d407dd791e89f..d7ec53f074ac4 100644
--- a/vendor/google.golang.org/protobuf/internal/impl/message_opaque.go
+++ b/vendor/google.golang.org/protobuf/internal/impl/message_opaque.go
@@ -88,9 +88,7 @@ func opaqueInitHook(mi *MessageInfo) bool {
 		mi.oneofs = map[protoreflect.Name]*oneofInfo{}
 		for i := 0; i < mi.Desc.Oneofs().Len(); i++ {
 			od := mi.Desc.Oneofs().Get(i)
-			if !od.IsSynthetic() {
-				mi.oneofs[od.Name()] = makeOneofInfo(od, si.structInfo, mi.Exporter)
-			}
+			mi.oneofs[od.Name()] = makeOneofInfoOpaque(mi, od, si.structInfo, mi.Exporter)
 		}

 		mi.denseFields = make([]*fieldInfo, fds.Len()*2)
@@ -119,6 +117,26 @@ func opaqueInitHook(mi *MessageInfo) bool {
 	return true
 }

+func makeOneofInfoOpaque(mi *MessageInfo, od protoreflect.OneofDescriptor, si structInfo, x exporter) *oneofInfo {
+	oi := &oneofInfo{oneofDesc: od}
+	if od.IsSynthetic() {
+		fd := od.Fields().Get(0)
+		index, _ := presenceIndex(mi.Desc, fd)
+		oi.which = func(p pointer) protoreflect.FieldNumber {
+			if p.IsNil() {
+				return 0
+			}
+			if !mi.present(p, index) {
+				return 0
+			}
+			return od.Fields().Get(0).Number()
+		}
+		return oi
+	}
+	// Dispatch to non-opaque oneof implementation for non-synthetic oneofs.
+	return makeOneofInfo(od, si, x)
+}
+
 func (mi *MessageInfo) fieldInfoForMapOpaque(si opaqueStructInfo, fd protoreflect.FieldDescriptor, fs reflect.StructField) fieldInfo {
 	ft := fs.Type
 	if ft.Kind() != reflect.Map {
diff --git a/vendor/google.golang.org/protobuf/internal/version/version.go b/vendor/google.golang.org/protobuf/internal/version/version.go
index 3018450df7998..386c823aa641c 100644
--- a/vendor/google.golang.org/protobuf/internal/version/version.go
+++ b/vendor/google.golang.org/protobuf/internal/version/version.go
@@ -52,7 +52,7 @@ import (
 const (
 	Major      = 1
 	Minor      = 36
-	Patch      = 1
+	Patch      = 2
 	PreRelease = ""
 )
diff --git a/vendor/modules.txt b/vendor/modules.txt
index a22ba57b6d10c..bf14fd4052559 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -2184,7 +2184,7 @@ google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version
 ## explicit; go 1.21
 google.golang.org/grpc/stats/opentelemetry
 google.golang.org/grpc/stats/opentelemetry/internal
-# google.golang.org/protobuf v1.36.1
+# google.golang.org/protobuf v1.36.2
 ## explicit; go 1.21
 google.golang.org/protobuf/encoding/protodelim
 google.golang.org/protobuf/encoding/protojson

From e347eb7c803d573e56b4730bbaeb83bbd4a6596d Mon Sep 17 00:00:00 2001
From: matthewhudsonedb <94822000+matthewhudsonedb@users.noreply.github.com>
Date: Thu, 9 Jan 2025 18:30:54 +1100
Subject: [PATCH 4/7] fix: Fix loki ruler generator url left parameter url encoding (#15601)

The generated URL's `left` parameter was not URL encoded: the double
quotes that the `str` format string placed around "queries" broke
hyperlinking.
---
 pkg/ruler/base/ruler.go      | 6 +++---
 pkg/ruler/base/ruler_test.go | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/pkg/ruler/base/ruler.go b/pkg/ruler/base/ruler.go
index adb1a7c136def..4e466810570b1 100644
--- a/pkg/ruler/base/ruler.go
+++ b/pkg/ruler/base/ruler.go
@@ -407,9 +407,9 @@ func grafanaLinkForExpression(expr, datasourceUID string) string {
 	}

 	marshaledExpression, _ := json.Marshal(exprStruct)
-	escapedExpression := url.QueryEscape(string(marshaledExpression))
-	str := `/explore?left={"queries":[%s]}`
-	return fmt.Sprintf(str, escapedExpression)
+	params := url.Values{}
+	params.Set("left", fmt.Sprintf(`{"queries":[%s]}`, marshaledExpression))
+	return `/explore?` + params.Encode()
 }

 // SendAlerts implements a rules.NotifyFunc for a Notifier.
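The before/after behaviour of grafanaLinkForExpression is easy to reproduce standalone; a small sketch (the expression JSON is shortened for illustration):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	expr := `{"expr":"rate({job=\"app\"}[5m])"}`

	// Before: the literal braces and quotes land in the URL verbatim,
	// which breaks hyperlinking in notification channels.
	fmt.Println(`/explore?left={"queries":[` + expr + `]}`)

	// After: url.Values percent-encodes the whole JSON payload.
	params := url.Values{}
	params.Set("left", fmt.Sprintf(`{"queries":[%s]}`, expr))
	fmt.Println(`/explore?` + params.Encode())
	// /explore?left=%7B%22queries%22%3A%5B...%5D%7D
}

This also explains the test expectations below: the GeneratorURL fixtures change from raw braces to their %7B/%22/%5B escapes.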
diff --git a/pkg/ruler/base/ruler_test.go b/pkg/ruler/base/ruler_test.go
index bdb437ed9279d..6636f742456a2 100644
--- a/pkg/ruler/base/ruler_test.go
+++ b/pkg/ruler/base/ruler_test.go
@@ -1733,7 +1733,7 @@ func TestSendAlerts(t *testing.T) {
 					Annotations:  []labels.Label{{Name: "a2", Value: "v2"}},
 					StartsAt:     time.Unix(2, 0),
 					EndsAt:       time.Unix(3, 0),
-					GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left={\"queries\":[%s]}", escapedExpression),
+					GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left=%%7B%%22queries%%22%%3A%%5B%s%%5D%%7D", escapedExpression),
 				},
 			},
 		},
@@ -1753,7 +1753,7 @@ func TestSendAlerts(t *testing.T) {
 					Annotations:  []labels.Label{{Name: "a2", Value: "v2"}},
 					StartsAt:     time.Unix(2, 0),
 					EndsAt:       time.Unix(4, 0),
-					GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left={\"queries\":[%s]}", escapedExpression),
+					GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left=%%7B%%22queries%%22%%3A%%5B%s%%5D%%7D", escapedExpression),
 				},
 			},
 		},

From 5f476a39e130a22dc5b641af84a0c902f83a4ad3 Mon Sep 17 00:00:00 2001
From: jackyin
Date: Thu, 9 Jan 2025 18:25:03 +0800
Subject: [PATCH 5/7] fix: Fix goroutine leak in queryrange downstreamer (#15665)

---
 pkg/querier/queryrange/downstreamer.go | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go
index c6fba0fbf49a1..b2aa50c292154 100644
--- a/pkg/querier/queryrange/downstreamer.go
+++ b/pkg/querier/queryrange/downstreamer.go
@@ -170,10 +170,12 @@ func (in instance) For(
 	go func() {
 		err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error {
 			res, err := fn(queries[i])
+			if err != nil {
+				return err
+			}
 			response := logql.Resp{
 				I:   i,
 				Res: res,
-				Err: err,
 			}

 			// Feed the result into the channel unless the work has completed.
@@ -181,7 +183,7 @@ func (in instance) For(
 			case <-ctx.Done():
 			case ch <- response:
 			}
-			return err
+			return nil
 		})
 		if err != nil {
 			ch <- logql.Resp{
@@ -192,15 +194,19 @@ func (in instance) For(
 		close(ch)
 	}()

+	var err error
 	for resp := range ch {
-		if resp.Err != nil {
-			return nil, resp.Err
+		if err != nil {
+			continue
 		}
-		if err := acc.Accumulate(ctx, resp.Res, resp.I); err != nil {
-			return nil, err
+		if resp.Err != nil {
+			err = resp.Err
+			continue
 		}
+		err = acc.Accumulate(ctx, resp.Res, resp.I)
 	}
-	return acc.Result(), nil
+
+	return acc.Result(), err
 }

 // convert to matrix
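The shape of the leak this patch fixes, reduced to a hedged standalone sketch: if the consumer returns from the receive loop on the first error, producers blocked on `ch <-` never finish and the channel is never closed. Draining the channel while remembering the first error lets the producing goroutine always run to completion. The types below are illustrative stand-ins for the downstreamer's logql.Resp.

package main

import "fmt"

type resp struct {
	i   int
	err error
}

// consumeAll keeps draining ch even after an error, so senders never block;
// it reports only the first error seen, like the patched For loop.
func consumeAll(ch <-chan resp) error {
	var firstErr error
	for r := range ch {
		if firstErr != nil {
			continue // discard the rest, but keep unblocking the sender
		}
		if r.err != nil {
			firstErr = r.err
			continue
		}
		// accumulate r.i here in the real code
	}
	return firstErr
}

func main() {
	ch := make(chan resp) // unbuffered, so an early-returning reader would strand the sender
	go func() {
		defer close(ch)
		for i := 0; i < 3; i++ {
			var err error
			if i == 1 {
				err = fmt.Errorf("query %d failed", i)
			}
			ch <- resp{i: i, err: err}
		}
	}()
	fmt.Println(consumeAll(ch)) // query 1 failed
}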
From bef20431cbbf302e584c4eea2eb423537bcf86e7 Mon Sep 17 00:00:00 2001
From: Karsten Jeschkies
Date: Thu, 9 Jan 2025 11:27:23 +0100
Subject: [PATCH 6/7] perf(approx_topk): Reduce memory usage of HyperLogLog in approx_topk. (#15559)

The count min sketch data structure backing the new approx_topk
aggregation uses HyperLogLog (HLL) to track the actual cardinality of
the aggregated vector. We were using the sparse version of the HLL.
However, that resulted in memory and allocation overhead.
---
 pkg/logql/count_min_sketch.go |  7 +++++--
 pkg/logql/log/labels.go       |  5 ++++-
 pkg/logql/log/parser_test.go  | 14 +++++++++++++-
 pkg/logql/sketch/cms.go       |  2 +-
 4 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/pkg/logql/count_min_sketch.go b/pkg/logql/count_min_sketch.go
index e24e089ad307b..927ff6ff772e7 100644
--- a/pkg/logql/count_min_sketch.go
+++ b/pkg/logql/count_min_sketch.go
@@ -3,6 +3,8 @@ package logql
 import (
 	"container/heap"
 	"fmt"
+	"slices"
+	"strings"

 	"github.com/axiomhq/hyperloglog"
 	"github.com/cespare/xxhash/v2"
@@ -187,16 +189,17 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou
 }

 func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
+	slices.SortFunc(metric, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
+
 	v.buffer = metric.Bytes(v.buffer)

 	v.F.Add(v.buffer, value)

-	// Add our metric if we haven't seen it
-
 	// TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's
 	// an else block the compares the series labels. However, that's not trivial. Besides, instance.Series has the
 	// same issue in its deduping logic.
 	id := xxhash.Sum64(v.buffer)
+
+	// Add our metric if we haven't seen it
 	if _, ok := v.observed[id]; !ok {
 		heap.Push(v, metric)
 		v.observed[id] = struct{}{}
diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go
index c5ef408cc21ed..9e494f99237d4 100644
--- a/pkg/logql/log/labels.go
+++ b/pkg/logql/log/labels.go
@@ -2,7 +2,9 @@ package log

 import (
 	"fmt"
+	"slices"
 	"sort"
+	"strings"
 	"sync"

 	"github.com/prometheus/prometheus/model/labels"
@@ -585,7 +587,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult {

 	// Get all labels at once and sort them
 	b.buf = b.UnsortedLabels(b.buf)
-	sort.Sort(b.buf)
+	// sort.Sort(b.buf)
+	slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
 	hash := b.hasher.Hash(b.buf)

 	if cached, ok := b.resultCache[hash]; ok {
diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go
index 5ac3a87503634..af332c8cb54c7 100644
--- a/pkg/logql/log/parser_test.go
+++ b/pkg/logql/log/parser_test.go
@@ -241,25 +241,32 @@ func (p *fakeParseHints) ShouldExtract(key string) bool {
 	p.checkCount++
 	return key == p.label || p.extractAll
 }
+
 func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool {
 	return prefix == p.label || p.extractAll
 }
+
 func (p *fakeParseHints) NoLabels() bool {
 	return false
 }
+
 func (p *fakeParseHints) RecordExtracted(_ string) {
 	p.count++
 }
+
 func (p *fakeParseHints) AllRequiredExtracted() bool {
 	return !p.extractAll && p.count == 1
 }
+
 func (p *fakeParseHints) Reset() {
 	p.checkCount = 0
 	p.count = 0
 }
+
 func (p *fakeParseHints) PreserveError() bool {
 	return false
 }
+
 func (p *fakeParseHints) ShouldContinueParsingLine(_ string, _ *LabelsBuilder) bool {
 	return p.keepGoing
 }
@@ -656,30 +663,36 @@ func Benchmark_Parser(b *testing.B) {
 		b.Run(tt.name, func(b *testing.B) {
 			line := []byte(tt.line)
 			b.Run("no labels hints", func(b *testing.B) {
+				b.ReportAllocs()
 				builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
 				for n := 0; n < b.N; n++ {
 					builder.Reset()
 					_, _ = tt.s.Process(0, line, builder)
+					builder.LabelsResult()
 				}
 			})

 			b.Run("labels hints", func(b *testing.B) {
+				b.ReportAllocs()
 				builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
 				builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "", nil)
 				for n := 0; n < b.N; n++ {
 					builder.Reset()
 					_, _ = tt.s.Process(0, line, builder)
+					builder.LabelsResult()
 				}
 			})

 			b.Run("inline stages", func(b *testing.B) {
+				b.ReportAllocs()
 				stages := []Stage{NewStringLabelFilter(tt.LabelFilterParseHint)}
 				builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
 				builder.parserKeyHints = NewParserHint(nil, nil, false, false, ", nil", stages)
 				for n := 0; n < b.N; n++ {
 					builder.Reset()
 					_, _ = tt.s.Process(0, line, builder)
+					builder.LabelsResult()
 				}
 			})
 		})
@@ -1251,7 +1264,6 @@ func TestXExpressionParserFailures(t *testing.T) {
 		},
 	}
 	for _, tt := range tests {
-
 		t.Run(tt.name, func(t *testing.T) {
 			_, err := NewLogfmtExpressionParser([]LabelExtractionExpr{tt.expression}, false)
diff --git a/pkg/logql/sketch/cms.go b/pkg/logql/sketch/cms.go
index b510ee8504ea0..67f72be976c19 100644
--- a/pkg/logql/sketch/cms.go
+++ b/pkg/logql/sketch/cms.go
@@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) {
 		Depth:       d,
 		Width:       w,
 		Counters:    make2dslice(w, d),
-		HyperLogLog: hyperloglog.New16(),
+		HyperLogLog: hyperloglog.New16NoSparse(),
 	}, nil
 }
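Context for the one-line cms.go change, as a hedged sketch assuming the github.com/axiomhq/hyperloglog module: New16 starts in a compact sparse representation and converts to dense registers as cardinality grows, which costs allocations along the way; New16NoSparse pays the fixed 2^16-register cost up front and skips the conversion churn the commit message describes. Both count the same way.

package main

import (
	"fmt"

	"github.com/axiomhq/hyperloglog"
)

func main() {
	sparse := hyperloglog.New16()        // cheaper at tiny cardinalities
	dense := hyperloglog.New16NoSparse() // fixed-size, no sparse-to-dense conversion

	for i := 0; i < 100000; i++ {
		key := []byte(fmt.Sprintf("series-%d", i))
		sparse.Insert(key)
		dense.Insert(key)
	}

	// Both estimates approximate 100000.
	fmt.Println(sparse.Estimate(), dense.Estimate())
}

For approx_topk, where each sketch is expected to see many series anyway, the dense variant trades a predictable fixed allocation for fewer per-insert allocations.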
From b5c627b81360d7042fd04e4d1827b9ef79ac70e0 Mon Sep 17 00:00:00 2001
From: Ashwanth
Date: Thu, 9 Jan 2025 19:32:14 +0530
Subject: [PATCH 7/7] chore(blockbuilder): mark a job as complete only after stopping sync updates (#15670)

---
 docs/sources/shared/configuration.md    |  4 ----
 pkg/blockbuilder/builder/builder.go     | 14 +++++++++-----
 pkg/blockbuilder/scheduler/queue.go     |  1 +
 pkg/blockbuilder/scheduler/scheduler.go |  2 --
 4 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 91b560e106c87..82b5294daa191 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -210,10 +210,6 @@ block_builder:
   [scheduler_grpc_client_config: ]

 block_scheduler:
-  # Consumer group used by block scheduler to track the last consumed offset.
-  # CLI flag: -block-scheduler.consumer-group
-  [consumer_group: <string> | default = "block-scheduler"]
-
   # How often the scheduler should plan jobs.
   # CLI flag: -block-scheduler.interval
   [interval: <duration> | default = 15m]
diff --git a/pkg/blockbuilder/builder/builder.go b/pkg/blockbuilder/builder/builder.go
index 61ff6ffcbc782..018ccc1365441 100644
--- a/pkg/blockbuilder/builder/builder.go
+++ b/pkg/blockbuilder/builder/builder.go
@@ -279,6 +279,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
 		completion.Success = false
 	}

+	// remove from inflight jobs to stop sending sync requests
+	i.jobsMtx.Lock()
+	delete(i.inflightJobs, job.ID())
+	i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
+	i.jobsMtx.Unlock()
+
 	if _, err := withBackoff(
 		ctx,
 		i.cfg.Backoff,
@@ -292,16 +298,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
 		return true, err
 	}

-	i.jobsMtx.Lock()
-	delete(i.inflightJobs, job.ID())
-	i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
-	i.jobsMtx.Unlock()
-
 	return true, err
 }

 func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
 	level.Debug(logger).Log("msg", "beginning job")
+	start := time.Now()

 	indexer := newTsdbCreator()
 	appender := newAppender(i.id,
@@ -505,6 +507,8 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
 	level.Info(logger).Log(
 		"msg", "successfully processed job",
 		"last_offset", lastOffset,
+		"duration", time.Since(start),
+		"records", lastOffset-job.Offsets().Min,
 	)

 	return lastOffset, nil
diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go
index 019ca61a0c487..4758561fd6c2b 100644
--- a/pkg/blockbuilder/scheduler/queue.go
+++ b/pkg/blockbuilder/scheduler/queue.go
@@ -332,6 +332,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
 	case types.JobStatusInProgress:
 	case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired:
 		// Job already completed, re-enqueue a new one
+		level.Warn(q.logger).Log("msg", "job already completed, re-enqueuing", "job", jobID, "status", jobMeta.Status)
 		registerInProgress()
 		return
 	default:
diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go
index d4e8c0935ce68..55ecc066dc582 100644
--- a/pkg/blockbuilder/scheduler/scheduler.go
+++ b/pkg/blockbuilder/scheduler/scheduler.go
@@ -26,7 +26,6 @@ var (
 )

 type Config struct {
-	ConsumerGroup  string        `yaml:"consumer_group"`
 	Interval       time.Duration `yaml:"interval"`
 	LookbackPeriod time.Duration `yaml:"lookback_period"`
 	Strategy       string        `yaml:"strategy"`
@@ -36,7 +35,6 @@ type Config struct {

 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.Interval, prefix+"interval", 15*time.Minute, "How often the scheduler should plan jobs.")
-	f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
 	f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 0, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.")
 	f.StringVar(
 		&cfg.Strategy,