Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ingester RF-1 #13365

Merged
merged 9 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ server:
http_listen_port: 3100
grpc_listen_port: 9096
log_level: debug
grpc_server_max_concurrent_streams: 1000

common:
instance_addr: 127.0.0.1
Expand All @@ -17,6 +18,9 @@ common:
kvstore:
store: inmemory

ingester_rf1:
enabled: false

query_range:
results_cache:
cache:
Expand Down
358 changes: 309 additions & 49 deletions docs/sources/shared/configuration.md

Large diffs are not rendered by default.

104 changes: 104 additions & 0 deletions pkg/ingester-rf1/clientpool/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package clientpool

import (
"flag"
"io"
"time"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/util/server"

"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)

var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_client_request_duration_seconds",
Help: "Time spent doing Ingester RF1 requests.",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"})

type HealthAndIngesterClient interface {
grpc_health_v1.HealthClient
Close() error
}

type ClosableHealthAndIngesterClient struct {
logproto.PusherRF1Client
grpc_health_v1.HealthClient
io.Closer
}

// Config for an ingester client.
type Config struct {
PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures how connections are pooled."`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures how the gRPC connection to ingesters work as a client."`
GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"`
GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`

// Internal is used to indicate that this client communicates on behalf of
// a machine and not a user. When Internal = true, the client won't attempt
// to inject an userid into the context.
Internal bool `yaml:"-"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester-rf1.client", f)
cfg.PoolConfig.RegisterFlagsWithPrefix("ingester-rf1.", f)

f.DurationVar(&cfg.PoolConfig.RemoteTimeout, "ingester-rf1.client.healthcheck-timeout", 1*time.Second, "How quickly a dead client will be removed after it has been detected to disappear. Set this to a value to allow time for a secondary health check to recover the missing client.")
f.DurationVar(&cfg.RemoteTimeout, "ingester-rf1.client.timeout", 5*time.Second, "The remote request timeout on the client side.")
}

// New returns a new ingester client.
func NewClient(cfg Config, addr string) (HealthAndIngesterClient, error) {
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation(&cfg))
if err != nil {
return nil, err
}

opts = append(opts, dialOpts...)
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
return ClosableHealthAndIngesterClient{
PusherRF1Client: logproto.NewPusherRF1Client(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
Closer: conn,
}, nil
}

func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
var unaryInterceptors []grpc.UnaryClientInterceptor
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor)
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
if !cfg.Internal {
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
}
unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(ingesterClientRequestDuration))

var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor)
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
if !cfg.Internal {
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
}
streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(ingesterClientRequestDuration))

return unaryInterceptors, streamInterceptors
}
46 changes: 46 additions & 0 deletions pkg/ingester-rf1/clientpool/ingester_client_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package clientpool

import (
"flag"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var clients prometheus.Gauge

// PoolConfig is config for creating a Pool.
type PoolConfig struct {
ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period"`
HealthCheckIngesters bool `yaml:"health_check_ingesters"`
RemoteTimeout time.Duration `yaml:"remote_timeout"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.ClientCleanupPeriod, prefix+"client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
f.BoolVar(&cfg.HealthCheckIngesters, prefix+"health-check-ingesters", true, "Run a health check on each ingester client during periodic cleanup.")
f.DurationVar(&cfg.RemoteTimeout, prefix+"remote-timeout", 1*time.Second, "Timeout for the health check.")
}

func NewPool(name string, cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger, metricsNamespace string) *ring_client.Pool {
poolCfg := ring_client.PoolConfig{
CheckInterval: cfg.ClientCleanupPeriod,
HealthCheckEnabled: cfg.HealthCheckIngesters,
HealthCheckTimeout: cfg.RemoteTimeout,
}

if clients == nil {
clients = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "ingester_rf1_clients",
Help: "The current number of RF1 ingester clients.",
})
}
// TODO(chaudum): Allow configuration of metric name by the caller.
return ring_client.NewPool(name, poolCfg, ring_client.NewRingServiceDiscovery(ring), factory, clients, logger)
}
186 changes: 186 additions & 0 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package ingesterrf1

import (
"fmt"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
)

const (
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
flushBackoff = 1 * time.Second

nameLabel = "__name__"
logsValue = "logs"

flushReasonIdle = "idle"
flushReasonMaxAge = "max_age"
flushReasonForced = "forced"
flushReasonFull = "full"
flushReasonSynced = "synced"
)

// Note: this is called both during the WAL replay (zero or more times)
// and then after replay as well.
func (i *Ingester) InitFlushQueues() {
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
go i.flushLoop(j)
}
}

// Flush implements ring.FlushTransferer
// Flush triggers a flush of all the chunks and closes the flush queues.
// Called from the Lifecycler as part of the ingester shutdown.
func (i *Ingester) Flush() {
i.flush()
}

// TransferOut implements ring.FlushTransferer
// Noop implemenetation because ingesters have a WAL now that does not require transferring chunks any more.
// We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.
func (i *Ingester) TransferOut(_ context.Context) error {
return ring.ErrTransferDisabled
}

func (i *Ingester) flush() {
// TODO: Flush the last chunks
// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
flushQueue.Close()
}

i.flushQueuesDone.Wait()
level.Debug(i.logger).Log("msg", "flush queues have drained")
}

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
// local testing.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

type flushOp struct {
from model.Time
userID string
fp model.Fingerprint
immediate bool
}

func (o *flushOp) Key() string {
return fmt.Sprintf("%s-%s-%v", o.userID, o.fp, o.immediate)
}

func (o *flushOp) Priority() int64 {
return -int64(o.from)
}

func (i *Ingester) flushLoop(j int) {
l := log.With(i.logger, "loop", j)
defer func() {
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
i.flushQueuesDone.Done()
}()

for {
o := i.flushQueues[j].Dequeue()
if o == nil {
return
}
op := o.(*flushCtx)

err := i.flushOp(l, op)
if err != nil {
level.Error(l).Log("msg", "failed to flush", "err", err)
// Immediately re-queue another attempt at flushing this segment.
// TODO: Add some backoff or something?
i.flushQueues[j].Enqueue(op)
} else {
// Close the channel and trigger all waiting listeners to return
// TODO: Figure out how to return an error if we want to?
close(op.flushDone)
}
}
}

func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushSegment(ctx, flushCtx.segmentWriter)
if err == nil {
break
}
level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
b.Wait()
}
return b.Err()
}

// flushChunk flushes the given chunk to the store.
//
// If the flush is successful, metrics for this flush are to be reported.
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// segments to have another opportunity to be flushed.
func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error {
if err := i.store.PutWal(ctx, ch); err != nil {
i.metrics.chunksFlushFailures.Inc()
return fmt.Errorf("store put chunk: %w", err)
}
i.metrics.flushedChunksStats.Inc(1)
// TODO: report some flush metrics
return nil
}

// reportFlushedChunkStatistics calculate overall statistics of flushed chunks without compromising the flush process.
func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) {
byt, err := ch.Encoded()
if err != nil {
level.Error(i.logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
return
}

i.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1)

compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)

if ok && compressedSize > 0 {
i.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}

utilization := ch.Data.Utilization()
i.metrics.chunkUtilization.Observe(utilization)
numEntries := desc.chunk.Size()
i.metrics.chunkEntries.Observe(float64(numEntries))
i.metrics.chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()

boundsFrom, boundsTo := desc.chunk.Bounds()
i.metrics.chunkAge.Observe(time.Since(boundsFrom).Seconds())
i.metrics.chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())

i.metrics.flushedChunksBytesStats.Record(compressedSize)
i.metrics.flushedChunksLinesStats.Record(float64(numEntries))
i.metrics.flushedChunksUtilizationStats.Record(utilization)
i.metrics.flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
i.metrics.flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Seconds())
}
Loading
Loading