diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 58f709f0be2f8..4e36e5ce3018e 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -82,10 +82,9 @@ var ( ) type metrics struct { - queueDuration prometheus.Histogram - inflightRequests prometheus.Summary - chunkRefsUnfiltered prometheus.Counter - chunkRefsFiltered prometheus.Counter + queueDuration prometheus.Histogram + inflightRequests prometheus.Summary + chunkRemovals *prometheus.CounterVec } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -106,29 +105,15 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * MaxAge: time.Minute, AgeBuckets: 6, }), - chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "chunkrefs_pre_filtering", - Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.", - }), - chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunkrefs_post_filtering", - Help: "Total amount of chunk refs post filtering.", - }), + Name: "chunk_removals_total", + Help: "Total amount of removals received from the block querier partitioned by state. The state 'accepted' means that the removals are processed, the state 'dropped' means that the removals were received after the task context was done (e.g. client timeout, etc).", + }, []string{"state"}), } } -func (m *metrics) addUnfilteredCount(n int) { - m.chunkRefsUnfiltered.Add(float64(n)) -} - -func (m *metrics) addFilteredCount(n int) { - m.chunkRefsFiltered.Add(float64(n)) -} - // SyncMap is a map structure which can be synchronized using the RWMutex type SyncMap[k comparable, v any] struct { sync.RWMutex @@ -324,12 +309,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, errors.New("from time must not be after through time") } - numChunksUnfiltered := len(req.Refs) - // Shortcut if request does not contain filters if len(req.Filters) == 0 { - g.metrics.addUnfilteredCount(numChunksUnfiltered) - g.metrics.addFilteredCount(len(req.Refs)) return &logproto.FilterChunkRefResponse{ ChunkRefs: req.Refs, }, nil @@ -374,15 +355,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk // When enqueuing, we also add the task to the pending tasks g.pendingTasks.Add(task.ID, task) }) - go consumeTask(ctx, task, tasksCh, logger) + go g.consumeTask(ctx, task, tasksCh) } responses := responsesPool.Get(numSeries) defer responsesPool.Put(responses) remaining := len(tasks) -outer: - for { + for remaining > 0 { select { case <-ctx.Done(): return nil, errors.Wrap(ctx.Err(), "request failed") @@ -393,23 +373,17 @@ outer: } responses = append(responses, task.responses...) remaining-- - if remaining == 0 { - break outer - } } } - for _, o := range responses { - if o.Removals.Len() == 0 { - continue - } - removeNotMatchingChunks(req, o, g.logger) - } + preFilterSeries := len(req.Refs) - g.metrics.addUnfilteredCount(numChunksUnfiltered) - g.metrics.addFilteredCount(len(req.Refs)) + // TODO(chaudum): Don't wait for all responses before starting to filter chunks. 
+ filtered := g.processResponses(req, responses) - level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs)) + postFilterSeries := len(req.Refs) + + level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered) return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil } @@ -419,16 +393,18 @@ outer: // task is closed by the worker. // Once the tasks is closed, it will send the task with the results from the // block querier to the supplied task channel. -func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log.Logger) { - logger = log.With(logger, "task", task.ID) +func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Task) { + logger := log.With(g.logger, "task", task.ID) for res := range task.resCh { select { case <-ctx.Done(): level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) + g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len())) default: level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) task.responses = append(task.responses, res) + g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len())) } } @@ -441,7 +417,18 @@ func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log } } -func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, logger log.Logger) { +func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) { + for _, o := range responses { + if o.Removals.Len() == 0 { + continue + } + filtered += g.removeNotMatchingChunks(req, o) + } + return +} + +func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) { + // binary search index of fingerprint idx := sort.Search(len(req.Refs), func(i int) bool { return req.Refs[i].Fingerprint >= uint64(res.Fp) @@ -449,13 +436,15 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, // fingerprint not found if idx >= len(req.Refs) { - level.Error(logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) + level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) return } // if all chunks of a fingerprint are are removed // then remove the whole group from the response if len(req.Refs[idx].Refs) == res.Removals.Len() { + filtered += len(req.Refs[idx].Refs) + req.Refs[idx] = nil // avoid leaking pointer req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...) return @@ -465,10 +454,13 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, toRemove := res.Removals[i] for j := 0; j < len(req.Refs[idx].Refs); j++ { if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum { + filtered += 1 + req.Refs[idx].Refs[j] = nil // avoid leaking pointer req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...) 
j-- // since we removed the current item at index, we have to redo the same index } } } + return } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index f07e014b84dc3..fede86484a96b 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -423,6 +423,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { } func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { + g := &Gateway{ + logger: log.NewNopLogger(), + } t.Run("removing chunks partially", func(t *testing.T) { req := &logproto.FilterChunkRefRequest{ Refs: []*logproto.GroupedChunkRefs{ @@ -450,7 +453,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { }}, }, } - removeNotMatchingChunks(req, res, log.NewNopLogger()) + n := g.removeNotMatchingChunks(req, res) + require.Equal(t, 2, n) require.Equal(t, expected, req) }) @@ -474,7 +478,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { expected := &logproto.FilterChunkRefRequest{ Refs: []*logproto.GroupedChunkRefs{}, } - removeNotMatchingChunks(req, res, log.NewNopLogger()) + n := g.removeNotMatchingChunks(req, res) + require.Equal(t, 3, n) require.Equal(t, expected, req) }) diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index 4b2366e83f287..02608bfdf71c4 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -5,17 +5,56 @@ import ( "sort" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" ) +type querierMetrics struct { + chunksTotal prometheus.Counter + chunksFiltered prometheus.Counter + seriesTotal prometheus.Counter + seriesFiltered prometheus.Counter +} + +func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem string) *querierMetrics { + return &querierMetrics{ + chunksTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunks_total", + Help: "Total amount of chunks pre filtering. Does not count chunks in failed requests.", + }), + chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunks_filtered_total", + Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.", + }), + seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_total", + Help: "Total amount of series pre filtering. Does not count series in failed requests.", + }), + seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_filtered_total", + Help: "Total amount of series that have been filtered out. Does not count series in failed requests.", + }), + } +} + // BloomQuerier is a store-level abstraction on top of Client // It is used by the index gateway to filter ChunkRefs based on given line fiter expression. 
type BloomQuerier struct { - c Client - logger log.Logger + c Client + logger log.Logger + metrics *querierMetrics } func NewQuerier(c Client, logger log.Logger) *BloomQuerier { @@ -37,6 +76,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from defer groupedChunksRefPool.Put(grouped) grouped = groupChunkRefs(chunkRefs, grouped) + preFilterChunks := len(chunkRefs) + preFilterSeries := len(grouped) + refs, err := bq.c.FilterChunks(ctx, tenant, from, through, grouped, filters...) if err != nil { return nil, err @@ -55,6 +97,15 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from }) } } + + postFilterChunks := len(result) + postFilterSeries := len(refs) + + bq.metrics.chunksTotal.Add(float64(preFilterChunks)) + bq.metrics.chunksFiltered.Add(float64(preFilterChunks - postFilterChunks)) + bq.metrics.seriesTotal.Add(float64(preFilterSeries)) + bq.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries)) + return result, nil } diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 5c57c0a2e4952..ec44081c1b30c 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -163,7 +163,7 @@ func (w *worker) running(_ context.Context) error { err = p.run(taskCtx, tasks) if err != nil { - w.metrics.processDuration.WithLabelValues(w.id, labelSuccess).Observe(time.Since(start).Seconds()) + w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds()) w.metrics.tasksProcessed.WithLabelValues(w.id, labelFailure).Add(float64(len(tasks))) level.Error(w.logger).Log("msg", "failed to process tasks", "err", err) } else { diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index c0c6406ea9077..2ae99af502b51 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -16,6 +16,7 @@ Entries should include a reference to the pull request that introduced the chang ## 5.43.2 - [ENHANCEMENT] Added missing default values to support ServerSideApply +- [BUGFIX] Added `alibabacloud` to `isUsingObjectStorage` check. ## 5.43.1 diff --git a/production/helm/loki/templates/_helpers.tpl b/production/helm/loki/templates/_helpers.tpl index 9dd70123189e9..502c7650010b6 100644 --- a/production/helm/loki/templates/_helpers.tpl +++ b/production/helm/loki/templates/_helpers.tpl @@ -597,7 +597,7 @@ Create the service endpoint including port for MinIO. 
{{/* Determine if deployment is using object storage */}} {{- define "loki.isUsingObjectStorage" -}} -{{- or (eq .Values.loki.storage.type "gcs") (eq .Values.loki.storage.type "s3") (eq .Values.loki.storage.type "azure") (eq .Values.loki.storage.type "swift") -}} +{{- or (eq .Values.loki.storage.type "gcs") (eq .Values.loki.storage.type "s3") (eq .Values.loki.storage.type "azure") (eq .Values.loki.storage.type "swift") (eq .Values.loki.storage.type "alibabacloud") -}} {{- end -}} {{/* Configure the correct name for the memberlist service */}} diff --git a/production/ksonnet/loki/bloom-compactor.libsonnet b/production/ksonnet/loki/bloom-compactor.libsonnet deleted file mode 100644 index d8c5e862fa106..0000000000000 --- a/production/ksonnet/loki/bloom-compactor.libsonnet +++ /dev/null @@ -1,125 +0,0 @@ -{ - local k = import 'ksonnet-util/kausal.libsonnet', - local container = k.core.v1.container, - local containerPort = k.core.v1.containerPort, - local pvc = k.core.v1.persistentVolumeClaim, - local service = k.core.v1.service, - local statefulSet = k.apps.v1.statefulSet, - local volume = k.core.v1.volume, - local volumeMount = k.core.v1.volumeMount, - - local name = 'bloom-compactor', - - _config+:: { - bloom_compactor+: { - // number of replicas - replicas: if $._config.use_bloom_filters then 3 else 0, - // PVC config - pvc_size: if $._config.use_bloom_filters then error 'bloom_compactor.pvc_size needs to be defined' else '', - pvc_class: if $._config.use_bloom_filters then error 'bloom_compactor.pvc_class needs to be defined' else '', - }, - loki+: - if $._config.use_bloom_filters - then - { - bloom_compactor: { - enabled: true, - working_directory: '/data/blooms', - compaction_interval: '15m', - max_compaction_parallelism: 1, - }, - } - else {}, - }, - - local cfg = self._config.bloom_compactor, - - local volumeName = name + '-data', - local volumeMounts = [volumeMount.new(volumeName, '/data')], - - bloom_compactor_args:: - if $._config.use_bloom_filters - then - $._config.commonArgs { - target: 'bloom-compactor', - } - else {}, - - bloom_compactor_ports:: [ - containerPort.new(name='http-metrics', port=$._config.http_listen_port), - containerPort.new(name='grpc', port=9095), - ], - - bloom_compactor_data_pvc:: - if $._config.use_bloom_filters - then - pvc.new(volumeName) - // set disk size - + pvc.mixin.spec.resources.withRequests({ storage: $._config.bloom_compactor.pvc_size }) - // mount the volume as read-write by a single node - + pvc.mixin.spec.withAccessModes(['ReadWriteOnce']) - // set persistent volume storage class - + pvc.mixin.spec.withStorageClassName($._config.bloom_compactor.pvc_class) - else {}, - - - bloom_compactor_container:: - if $._config.use_bloom_filters - then - container.new(name, $._images.bloom_compactor) - // add default ports - + container.withPorts($.bloom_compactor_ports) - // add target specific CLI arguments - + container.withArgsMixin(k.util.mapToFlags($.bloom_compactor_args)) - // mount the data pvc at given mountpoint - + container.withVolumeMountsMixin(volumeMounts) - // add globale environment variables - + container.withEnvMixin($._config.commonEnvs) - // add HTTP readiness probe - + container.mixin.readinessProbe.httpGet.withPath('/ready') - + container.mixin.readinessProbe.httpGet.withPort($._config.http_listen_port) - + container.mixin.readinessProbe.withTimeoutSeconds(1) - // define container resource requests - + k.util.resourcesRequests('2', '4Gi') - // define container resource limits - + k.util.resourcesLimits(null, '8Gi') - else {}, - - 
bloom_compactor_statefulset: - if $._config.use_bloom_filters - then - statefulSet.new(name, cfg.replicas, [$.bloom_compactor_container], $.bloom_compactor_data_pvc) - // add clusterIP service - + statefulSet.mixin.spec.withServiceName(name) - // perform rolling update when statefulset configuration changes - + statefulSet.mixin.spec.updateStrategy.withType('RollingUpdate') - // launch or terminate pods in parallel, *does not* affect upgrades - + statefulSet.mixin.spec.withPodManagementPolicy('Parallel') - // 10001 is the user/group ID assigned to Loki in the Dockerfile - + statefulSet.mixin.spec.template.spec.securityContext.withRunAsUser(10001) - + statefulSet.mixin.spec.template.spec.securityContext.withRunAsGroup(10001) - + statefulSet.mixin.spec.template.spec.securityContext.withFsGroup(10001) - // ensure statefulset is updated when loki config changes - + $.config_hash_mixin - // ensure no other workloads are scheduled - + k.util.antiAffinity - // mount the loki config.yaml - + k.util.configVolumeMount('loki', '/etc/loki/config') - // mount the runtime overrides.yaml - + k.util.configVolumeMount('overrides', '/etc/loki/overrides') - else {}, - - bloom_compactor_service: - if $._config.use_bloom_filters - then - k.util.serviceFor($.bloom_compactor_statefulset, $._config.service_ignored_labels) - else {}, - - bloom_compactor_headless_service: - if $._config.use_bloom_filters - then - k.util.serviceFor($.bloom_compactor_statefulset, $._config.service_ignored_labels) - + service.mixin.metadata.withName(name + '-headless') - + service.mixin.spec.withClusterIp('None') - else {}, -} diff --git a/production/ksonnet/loki/bloom-gateway.libsonnet b/production/ksonnet/loki/bloom-gateway.libsonnet deleted file mode 100644 index 18e50c7e0d91b..0000000000000 --- a/production/ksonnet/loki/bloom-gateway.libsonnet +++ /dev/null @@ -1,170 +0,0 @@ -{ - local k = import 'ksonnet-util/kausal.libsonnet', - local container = k.core.v1.container, - local containerPort = k.core.v1.containerPort, - local pvc = k.core.v1.persistentVolumeClaim, - local service = k.core.v1.service, - local statefulSet = k.apps.v1.statefulSet, - local volume = k.core.v1.volume, - local volumeMount = k.core.v1.volumeMount, - - local name = 'bloom-gateway', - - _config+:: { - bloom_gateway+: { - // number of replicas - replicas: if $._config.use_bloom_filters then 3 else 0, - // if true, the host needs to have local SSD disks mounted, otherwise PVCs are used - use_local_ssd: false, - // PVC config - pvc_size: if !self.use_local_ssd then error 'bloom_gateway.pvc_size needs to be defined when using PVC' else '', - pvc_class: if !self.use_local_ssd then error 'bloom_gateway.pvc_class needs to be defined when using PVC' else '', - // local SSD config - hostpath: if self.use_local_ssd then error 'bloom_gateway.hostpath needs to be defined when using local SSDs' else '', - node_selector: if self.use_local_ssd then error 'bloom_gateway.node_selector needs to be defined when using local SSDs' else {}, - tolerations: if self.use_local_ssd then error 'bloom_gateway.tolerations needs to be defined when using local SSDs' else [], - }, - loki+: - if $._config.use_bloom_filters - then - { - bloom_gateway+: { - enabled: true, - worker_concurrency: 8, - ring: { - replication_factor: 3, - }, - client: { - cache_results: false, - }, - }, - storage_config+: { - bloom_shipper+: { - working_directory: '/data/blooms', - blocks_downloading_queue: { - workers_count: 10, - }, - blocks_cache: { - enabled: true, - max_size_mb: error 'set 
bloom_shipper.blocks_cache.max_size_mb to ~80% of available disk size', - ttl: '24h', - }, - }, - }, - } - else {}, - }, - - local cfg = self._config.bloom_gateway, - - local volumeName = name + '-data', - - local volumes = - if cfg.use_local_ssd - then [volume.fromHostPath(volumeName, cfg.hostpath)] - else [], - - local volumeMounts = [ - volumeMount.new(volumeName, '/data'), - ], - - bloom_gateway_args:: - if $._config.use_bloom_filters - then - $._config.commonArgs { - target: 'bloom-gateway', - } - else {}, - - bloom_gateway_ports:: [ - containerPort.new(name='http-metrics', port=$._config.http_listen_port), - containerPort.new(name='grpc', port=9095), - ], - - bloom_gateway_data_pvc:: - if $._config.use_bloom_filters && !cfg.use_local_ssd - then - pvc.new(volumeName) - // set disk size - + pvc.mixin.spec.resources.withRequests({ storage: $._config.bloom_gateway.pvc_size }) - // mount the volume as read-write by a single node - + pvc.mixin.spec.withAccessModes(['ReadWriteOnce']) - // set persistent volume storage class - + pvc.mixin.spec.withStorageClassName($._config.bloom_compactor.pvc_class) - else - null, - - bloom_gateway_container:: - if $._config.use_bloom_filters - then - container.new(name, $._images.bloom_gateway) - // add default ports - + container.withPorts($.bloom_gateway_ports) - // add target specific CLI arguments - + container.withArgsMixin(k.util.mapToFlags($.bloom_gateway_args)) - // mount local SSD or PVC - + container.withVolumeMountsMixin(volumeMounts) - // add globale environment variables - + container.withEnvMixin($._config.commonEnvs) - // add HTTP readiness probe - + container.mixin.readinessProbe.httpGet.withPath('/ready') - + container.mixin.readinessProbe.httpGet.withPort($._config.http_listen_port) - + container.mixin.readinessProbe.withTimeoutSeconds(1) - // define container resource requests - + k.util.resourcesRequests('2', '4Gi') - // define container resource limits - + k.util.resourcesLimits(null, '8Gi') - else {}, - - bloom_gateway_statefulset: - if $._config.use_bloom_filters - then - statefulSet.new(name, cfg.replicas, [$.bloom_gateway_container]) - // add clusterIP service - + statefulSet.mixin.spec.withServiceName(name) - // perform rolling update when statefulset configuration changes - + statefulSet.mixin.spec.updateStrategy.withType('RollingUpdate') - // launch or terminate pods in parallel, *does not* affect upgrades - + statefulSet.mixin.spec.withPodManagementPolicy('Parallel') - // 10001 is the user/group ID assigned to Loki in the Dockerfile - + statefulSet.mixin.spec.template.spec.securityContext.withRunAsUser(10001) - + statefulSet.mixin.spec.template.spec.securityContext.withRunAsGroup(10001) - + statefulSet.mixin.spec.template.spec.securityContext.withFsGroup(10001) - // ensure statefulset is updated when loki config changes - + $.config_hash_mixin - // ensure no other workloads are scheduled - + k.util.antiAffinity - // mount the loki config.yaml - + k.util.configVolumeMount('loki', '/etc/loki/config') - // mount the runtime overrides.yaml - + k.util.configVolumeMount('overrides', '/etc/loki/overrides') - // configuration specific to SSD/PVC usage - + ( - if cfg.use_local_ssd - then - // ensure the pod is scheduled on a node with local SSDs if needed - statefulSet.mixin.spec.template.spec.withNodeSelector(cfg.node_selector) - // tolerate the local-ssd taint - + statefulSet.mixin.spec.template.spec.withTolerationsMixin(cfg.tolerations) - // mount the local SSDs - + statefulSet.mixin.spec.template.spec.withVolumesMixin(volumes) - 
else - // create persistent volume claim - statefulSet.mixin.spec.withVolumeClaimTemplates([$.bloom_gateway_data_pvc]) - ) - else {}, - - bloom_gateway_service: - if $._config.use_bloom_filters - then - k.util.serviceFor($.bloom_gateway_statefulset, $._config.service_ignored_labels) - else {}, - - bloom_gateway_headless_service: - if $._config.use_bloom_filters - then - k.util.serviceFor($.bloom_gateway_statefulset, $._config.service_ignored_labels) - + service.mixin.metadata.withName(name + '-headless') - + service.mixin.spec.withClusterIp('None') - else {}, -} diff --git a/production/ksonnet/loki/bloomfilters.libsonnet b/production/ksonnet/loki/bloomfilters.libsonnet deleted file mode 100644 index 78231a808e1a0..0000000000000 --- a/production/ksonnet/loki/bloomfilters.libsonnet +++ /dev/null @@ -1,8 +0,0 @@ -{ - _config+:: { - // globally enable/disable bloom gateway and bloom compactor - use_bloom_filters: false, - }, -} -+ (import 'bloom-compactor.libsonnet') -+ (import 'bloom-gateway.libsonnet') diff --git a/production/ksonnet/loki/loki.libsonnet b/production/ksonnet/loki/loki.libsonnet index 871a68025e990..ad0489a69cd3f 100644 --- a/production/ksonnet/loki/loki.libsonnet +++ b/production/ksonnet/loki/loki.libsonnet @@ -26,9 +26,6 @@ // BoltDB and TSDB Shipper support. Anything that modifies the compactor must be imported after this. (import 'shipper.libsonnet') + -// Accelerated search using bloom filters -(import 'bloomfilters.libsonnet') + - (import 'table-manager.libsonnet') + // Multi-zone ingester related config
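
Note on wiring: the querier.go hunks above add a `metrics *querierMetrics` field to `BloomQuerier` and a `newQuerierMetrics` constructor, but this excerpt never shows where that field is initialized — `NewQuerier(c Client, logger log.Logger)` only appears as unchanged context. Below is a minimal in-package sketch of how the constructor would presumably be wired up, assuming it gains a `prometheus.Registerer` parameter; the signature change and the namespace/subsystem strings are assumptions for illustration, not taken from this diff.

// Hypothetical wiring, not part of this patch. Assumes the surrounding
// querier.go imports (prometheus, go-kit log) already added by the diff.
func NewQuerier(c Client, registerer prometheus.Registerer, logger log.Logger) *BloomQuerier {
	return &BloomQuerier{
		c:      c,
		logger: logger,
		// "loki" / "bloom_querier" are placeholder namespace/subsystem
		// values; the real ones are not visible in this excerpt.
		metrics: newQuerierMetrics(registerer, "loki", "bloom_querier"),
	}
}

A caller would then pass its registerer through, e.g. `bloomgateway.NewQuerier(client, prometheus.DefaultRegisterer, logger)`. Some initialization along these lines is needed, since the new `bq.metrics.*` counter calls in `FilterChunkRefs` would otherwise dereference a nil `metrics` pointer.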