Add basic structure of bloom gateways (#10782)
### Summary

This pull request adds the basic structure for the new bloom gateway component.

- Adds new `bloom-gateway` target that runs with multiple instances joined by a ring
- Adds a querier and client component on the index gateway to filter chunk refs
- Adds the gRPC protobuf definitions for communication between index gateways and bloom gateways
- Adds a store component used on the bloom gateways to query binary bloom files

```

         Querier   Query Frontend
            |           |
    ................................... service boundary
            |           |
            +----+------+
                 |
         indexgateway.Gateway**
                 |
       bloomgateway.BloomQuerier
                 |
       bloomgateway.GatewayClient
                 |
      logproto.BloomGatewayClient
                 |
    ................................... service boundary
                 |
          bloomgateway.Gateway
                 |
           bloomshipper.Store
                 |
          bloomshipper.Shipper
                 |
    bloomshipper.BloomFileClient**
                 |
            ObjectClient**
                 |
    ................................... service boundary
                 |
          object storage

** not part of this PR
```

This PR still contains a lot of TODOs and possibilities for optimisations, which will be addressed in subsequent pull requests.

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Oct 16, 2023
1 parent 048587f commit b49b3ce
Showing 35 changed files with 4,000 additions and 214 deletions.
131 changes: 131 additions & 0 deletions docs/sources/configure/_index.md
@@ -161,6 +161,10 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# object store.
[index_gateway: <index_gateway>]

# The bloom_gateway block configures the Loki bloom gateway server, responsible
# for serving queries for filtering chunks based on filter expressions.
[bloom_gateway: <bloom_gateway>]

# The storage_config block configures one of many possible stores for both the
# index and chunks. Which configuration to be picked should be defined in
# schema_config block.
@@ -1687,6 +1691,125 @@ ring:
[replication_factor: <int> | default = 3]
```

### bloom_gateway

The `bloom_gateway` block configures the Loki bloom gateway server, responsible for serving queries for filtering chunks based on filter expressions.

```yaml
# Defines the ring to be used by the bloom gateway servers and clients. In case
# this isn't configured, this block supports inheriting configuration from the
# common ring section.
ring:
  kvstore:
    # Backend storage to use for the ring. Supported values are: consul, etcd,
    # inmemory, memberlist, multi.
    # CLI flag: -bloom-gateway.ring.store
    [store: <string> | default = "consul"]

    # The prefix for the keys in the store. Should end with a /.
    # CLI flag: -bloom-gateway.ring.prefix
    [prefix: <string> | default = "collectors/"]

    # Configuration for a Consul client. Only applies if the selected kvstore is
    # consul.
    # The CLI flags prefix for this block configuration is: bloom-gateway.ring
    [consul: <consul>]

    # Configuration for an ETCD v3 client. Only applies if the selected kvstore
    # is etcd.
    # The CLI flags prefix for this block configuration is: bloom-gateway.ring
    [etcd: <etcd>]

    multi:
      # Primary backend storage used by multi-client.
      # CLI flag: -bloom-gateway.ring.multi.primary
      [primary: <string> | default = ""]

      # Secondary backend storage used by multi-client.
      # CLI flag: -bloom-gateway.ring.multi.secondary
      [secondary: <string> | default = ""]

      # Mirror writes to secondary store.
      # CLI flag: -bloom-gateway.ring.multi.mirror-enabled
      [mirror_enabled: <boolean> | default = false]

      # Timeout for storing value to secondary store.
      # CLI flag: -bloom-gateway.ring.multi.mirror-timeout
      [mirror_timeout: <duration> | default = 2s]

  # Period at which to heartbeat to the ring. 0 = disabled.
  # CLI flag: -bloom-gateway.ring.heartbeat-period
  [heartbeat_period: <duration> | default = 15s]

  # The heartbeat timeout after which compactors are considered unhealthy within
  # the ring. 0 = never (timeout disabled).
  # CLI flag: -bloom-gateway.ring.heartbeat-timeout
  [heartbeat_timeout: <duration> | default = 1m]

  # File path where tokens are stored. If empty, tokens are not stored at
  # shutdown and restored at startup.
  # CLI flag: -bloom-gateway.ring.tokens-file-path
  [tokens_file_path: <string> | default = ""]

  # True to enable zone-awareness and replicate blocks across different
  # availability zones.
  # CLI flag: -bloom-gateway.ring.zone-awareness-enabled
  [zone_awareness_enabled: <boolean> | default = false]

  # Instance ID to register in the ring.
  # CLI flag: -bloom-gateway.ring.instance-id
  [instance_id: <string> | default = "<hostname>"]

  # Name of network interface to read address from.
  # CLI flag: -bloom-gateway.ring.instance-interface-names
  [instance_interface_names: <list of strings> | default = [<private network interfaces>]]

  # Port to advertise in the ring (defaults to server.grpc-listen-port).
  # CLI flag: -bloom-gateway.ring.instance-port
  [instance_port: <int> | default = 0]

  # IP address to advertise in the ring.
  # CLI flag: -bloom-gateway.ring.instance-addr
  [instance_addr: <string> | default = ""]

  # The availability zone where this instance is running. Required if
  # zone-awareness is enabled.
  # CLI flag: -bloom-gateway.ring.instance-availability-zone
  [instance_availability_zone: <string> | default = ""]

  # Enable using an IPv6 instance address.
  # CLI flag: -bloom-gateway.ring.instance-enable-ipv6
  [instance_enable_ipv6: <boolean> | default = false]

  # Factor for data replication on the bloom gateways.
  # CLI flag: -bloom-gateway.replication-factor
  [replication_factor: <int> | default = 3]

# Flag to enable or disable the usage of the bloom gateway component.
# CLI flag: -bloom-gateway.enabled
[enabled: <boolean> | default = false]

client:
  # Configures the behavior of the connection pool.
  pool_config:
    [client_cleanup_period: <duration>]

    [health_check_ingesters: <boolean>]

    [remote_timeout: <duration>]

  # The grpc_client block configures the gRPC client used to communicate between
  # two Loki components.
  # The CLI flags prefix for this block configuration is:
  # bloom-gateway-client.grpc
  [grpc_client_config: <grpc_client>]

  # Flag to control whether requests sent to the gateway should be logged or
  # not.
  # CLI flag: -bloom-gateway-client.log-gateway-requests
  [log_gateway_requests: <boolean> | default = false]
```

### storage_config

The `storage_config` block configures one of many possible stores for both the index and chunks. Which configuration to be picked should be defined in schema_config block.
@@ -2722,6 +2845,11 @@ shard_streams:
# CLI flag: -index-gateway.shard-size
[index_gateway_shard_size: <int> | default = 0]

# The shard size defines how many bloom gateways should be used by a tenant for
# querying.
# CLI flag: -bloom-gateway.shard-size
[bloom_gateway_shard_size: <int> | default = 1]

# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
@@ -3362,6 +3490,7 @@ ring:

Configuration for a Consul client. Only applies if the selected kvstore is `consul`. The supported CLI flags `<prefix>` used to reference this configuration block are:

- `bloom-gateway.ring`
- `common.storage.ring`
- `compactor.ring`
- `distributor.ring`
@@ -3406,6 +3535,7 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons

Configuration for an ETCD v3 client. Only applies if the selected kvstore is `etcd`. The supported CLI flags `<prefix>` used to reference this configuration block are:

- `bloom-gateway.ring`
- `common.storage.ring`
- `compactor.ring`
- `distributor.ring`
@@ -3707,6 +3837,7 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
The `grpc_client` block configures the gRPC client used to communicate between two Loki components. The supported CLI flags `<prefix>` used to reference this configuration block are:

- `bigtable`
- `bloom-gateway-client.grpc`
- `boltdb.shipper.index-gateway-client.grpc`
- `frontend.grpc-client-config`
- `ingester.client`
162 changes: 162 additions & 0 deletions pkg/bloomgateway/bloomgateway.go
@@ -0,0 +1,162 @@
/*
Bloom Gateway package

The bloom gateway is a component that can be run as a standalone microservice
target and provides capabilities for filtering ChunkRefs based on a given list
of line filter expressions.

         Querier   Query Frontend
            |           |
    ................................... service boundary
            |           |
            +----+------+
                 |
         indexgateway.Gateway
                 |
       bloomgateway.BloomQuerier
                 |
       bloomgateway.GatewayClient
                 |
      logproto.BloomGatewayClient
                 |
    ................................... service boundary
                 |
          bloomgateway.Gateway
                 |
           bloomshipper.Store
                 |
          bloomshipper.Shipper
                 |
     bloomshipper.BloomFileClient
                 |
            ObjectClient
                 |
    ................................... service boundary
                 |
          object storage
*/
package bloomgateway

import (
	"context"
	"sort"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"
	"github.com/grafana/dskit/tenant"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/storage"
	"github.com/grafana/loki/pkg/storage/config"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
var errInvalidTenant = errors.New("invalid tenant in chunk refs")

type metrics struct{}

func newMetrics(r prometheus.Registerer) *metrics {
	return &metrics{}
}

type Gateway struct {
	services.Service

	cfg     Config
	logger  log.Logger
	metrics *metrics

	bloomStore bloomshipper.Store

	sharding ShardingStrategy
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
	g := &Gateway{
		cfg:      cfg,
		logger:   logger,
		metrics:  newMetrics(reg),
		sharding: shardingStrategy,
	}

	client, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, cm)
	if err != nil {
		return nil, err
	}

	bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, logger)
	if err != nil {
		return nil, err
	}

	bloomStore, err := bloomshipper.NewBloomStore(bloomShipper)
	if err != nil {
		return nil, err
	}

	g.bloomStore = bloomStore
	g.Service = services.NewIdleService(g.starting, g.stopping)

	return g, nil
}

func (g *Gateway) starting(ctx context.Context) error {
	return nil
}

func (g *Gateway) stopping(_ error) error {
	g.bloomStore.Stop()
	return nil
}

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	for _, ref := range req.Refs {
		if ref.UserID != tenantID {
			return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.UserID)
		}
	}

	// Sort ChunkRefs by fingerprint in ascending order
	sort.Slice(req.Refs, func(i, j int) bool {
		return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint
	})

	chunkRefs := req.Refs

	// Only query bloom filters if filters are present
	if len(req.Filters) > 0 {
		chunkRefs, err = g.bloomStore.FilterChunkRefs(ctx, tenantID, req.From.Time(), req.Through.Time(), req.Refs, req.Filters...)
		if err != nil {
			return nil, err
		}
	}

	// TODO(chaudum): Re-use buffers for response.
	resp := make([]*logproto.GroupedChunkRefs, 0)
	for idx, chunkRef := range chunkRefs {
		fp := chunkRef.Fingerprint
		shortRef := &logproto.ShortRef{From: chunkRef.From, Through: chunkRef.Through, Checksum: chunkRef.Checksum}
		if idx == 0 || fp > resp[len(resp)-1].Fingerprint {
			r := &logproto.GroupedChunkRefs{
				Fingerprint: fp,
				Tenant:      tenantID,
				Refs:        []*logproto.ShortRef{shortRef},
			}
			resp = append(resp, r)
			continue
		}
		resp[len(resp)-1].Refs = append(resp[len(resp)-1].Refs, shortRef)
	}

	return &logproto.FilterChunkRefResponse{ChunkRefs: resp}, nil
}
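
The grouping step at the end of `FilterChunkRefs` depends on the refs being sorted by fingerprint, because it only ever compares a ref against the last group. The following is a minimal standalone sketch of that sort-then-group pattern, not the code from this commit: the types `chunkRef`, `shortRef` and `groupedChunkRefs` and the function `groupByFingerprint` are simplified, hypothetical stand-ins for the `logproto` types, and their field types are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// chunkRef is a hypothetical, simplified stand-in for logproto.ChunkRef.
type chunkRef struct {
	Fingerprint uint64
	From        int64
	Through     int64
	Checksum    uint32
}

// shortRef is a hypothetical stand-in for logproto.ShortRef.
type shortRef struct {
	From     int64
	Through  int64
	Checksum uint32
}

// groupedChunkRefs is a hypothetical stand-in for logproto.GroupedChunkRefs.
type groupedChunkRefs struct {
	Fingerprint uint64
	Tenant      string
	Refs        []shortRef
}

// groupByFingerprint sorts refs in place by ascending fingerprint and then
// merges consecutive refs that share a fingerprint into a single group.
func groupByFingerprint(tenant string, refs []chunkRef) []groupedChunkRefs {
	sort.Slice(refs, func(i, j int) bool {
		return refs[i].Fingerprint < refs[j].Fingerprint
	})

	groups := make([]groupedChunkRefs, 0)
	for _, ref := range refs {
		short := shortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
		last := len(groups) - 1
		// Start a new group for the first ref and whenever the fingerprint increases.
		if last < 0 || ref.Fingerprint > groups[last].Fingerprint {
			groups = append(groups, groupedChunkRefs{
				Fingerprint: ref.Fingerprint,
				Tenant:      tenant,
				Refs:        []shortRef{short},
			})
			continue
		}
		// Same fingerprint as the previous ref: extend the current group.
		groups[last].Refs = append(groups[last].Refs, short)
	}
	return groups
}

func main() {
	refs := []chunkRef{
		{Fingerprint: 2, From: 10, Through: 20, Checksum: 1},
		{Fingerprint: 1, From: 0, Through: 10, Checksum: 2},
		{Fingerprint: 2, From: 20, Through: 30, Checksum: 3},
	}
	for _, g := range groupByFingerprint("tenant-a", refs) {
		fmt.Printf("fingerprint=%d refs=%d\n", g.Fingerprint, len(g.Refs))
	}
}
```

With the three refs above, the sketch yields two groups: one ref for fingerprint 1 and two for fingerprint 2. Since `sort.Slice` is not a stable sort, the order of refs within a group is not guaranteed.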