From 33370edf847ae64dc0f6afb6f1148c56138635f2 Mon Sep 17 00:00:00 2001 From: Charlie Le Date: Thu, 21 Mar 2024 18:58:28 -0700 Subject: [PATCH] Use flagext package from cortexproject This also removes the dependency on grafana/dskit since it was only used for its flagext package. Fixes: https://github.com/cortexproject/cortex-tools/issues/23 Signed-off-by: Charlie Le --- cmd/benchtool/main.go | 2 +- cmd/blockscopy/main.go | 2 +- cmd/e2ealerting/main.go | 2 +- .../grafana/dskit/backoff/backoff.go | 117 -- .../grafana/dskit/crypto/tls/tls.go | 87 -- .../github.com/grafana/dskit/flagext/cidr.go | 82 -- .../github.com/grafana/dskit/flagext/day.go | 59 - .../grafana/dskit/flagext/deprecated.go | 37 - .../grafana/dskit/flagext/ignored.go | 22 - .../grafana/dskit/flagext/register.go | 24 - .../grafana/dskit/flagext/secret.go | 34 - .../grafana/dskit/flagext/stringslice.go | 17 - .../grafana/dskit/flagext/stringslicecsv.go | 33 - .../github.com/grafana/dskit/flagext/time.go | 60 - .../github.com/grafana/dskit/flagext/url.go | 59 - .../grafana/dskit/kv/codec/codec.go | 69 - .../grafana/dskit/kv/memberlist/broadcast.go | 63 - .../dskit/kv/memberlist/dnsprovider.go | 15 - .../grafana/dskit/kv/memberlist/kv.pb.go | 767 ---------- .../grafana/dskit/kv/memberlist/kv.proto | 22 - .../dskit/kv/memberlist/kv_init_service.go | 433 ------ .../dskit/kv/memberlist/memberlist_client.go | 1307 ----------------- .../dskit/kv/memberlist/memberlist_logger.go | 106 -- .../grafana/dskit/kv/memberlist/mergeable.go | 46 - .../grafana/dskit/kv/memberlist/metrics.go | 201 --- .../dskit/kv/memberlist/tcp_transport.go | 621 -------- .../grafana/dskit/services/README.md | 155 -- .../grafana/dskit/services/basic_service.go | 359 ----- .../grafana/dskit/services/failure_watcher.go | 35 - .../grafana/dskit/services/manager.go | 344 ----- .../grafana/dskit/services/service.go | 122 -- .../grafana/dskit/services/services.go | 145 -- vendor/modules.txt | 6 - 33 files changed, 3 insertions(+), 5450 deletions(-) delete mode 100644 vendor/github.com/grafana/dskit/backoff/backoff.go delete mode 100644 vendor/github.com/grafana/dskit/crypto/tls/tls.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/cidr.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/day.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/deprecated.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/ignored.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/register.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/secret.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/stringslice.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/stringslicecsv.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/time.go delete mode 100644 vendor/github.com/grafana/dskit/flagext/url.go delete mode 100644 vendor/github.com/grafana/dskit/kv/codec/codec.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/dnsprovider.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/kv.pb.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/kv.proto delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/memberlist_logger.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/mergeable.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/metrics.go delete mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go delete mode 100644 vendor/github.com/grafana/dskit/services/README.md delete mode 100644 vendor/github.com/grafana/dskit/services/basic_service.go delete mode 100644 vendor/github.com/grafana/dskit/services/failure_watcher.go delete mode 100644 vendor/github.com/grafana/dskit/services/manager.go delete mode 100644 vendor/github.com/grafana/dskit/services/service.go delete mode 100644 vendor/github.com/grafana/dskit/services/services.go diff --git a/cmd/benchtool/main.go b/cmd/benchtool/main.go index 75c79a811..dcb3247c5 100644 --- a/cmd/benchtool/main.go +++ b/cmd/benchtool/main.go @@ -8,9 +8,9 @@ import ( "os" "os/signal" + "github.com/cortexproject/cortex/pkg/util/flagext" logutil "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/flagext" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/weaveworks/common/logging" diff --git a/cmd/blockscopy/main.go b/cmd/blockscopy/main.go index 505bc549a..ee0c2990e 100644 --- a/cmd/blockscopy/main.go +++ b/cmd/blockscopy/main.go @@ -15,10 +15,10 @@ import ( "cloud.google.com/go/storage" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" - "github.com/grafana/dskit/flagext" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" diff --git a/cmd/e2ealerting/main.go b/cmd/e2ealerting/main.go index 3db4615f1..d6804706d 100644 --- a/cmd/e2ealerting/main.go +++ b/cmd/e2ealerting/main.go @@ -7,8 +7,8 @@ import ( "github.com/cortexproject/cortex-tools/pkg/alerting" "github.com/go-kit/log/level" + "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" - "github.com/grafana/dskit/flagext" "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/logging" "github.com/weaveworks/common/server" diff --git a/vendor/github.com/grafana/dskit/backoff/backoff.go b/vendor/github.com/grafana/dskit/backoff/backoff.go deleted file mode 100644 index 2146f3b92..000000000 --- a/vendor/github.com/grafana/dskit/backoff/backoff.go +++ /dev/null @@ -1,117 +0,0 @@ -package backoff - -import ( - "context" - "flag" - "fmt" - "math/rand" - "time" -) - -// Config configures a Backoff -type Config struct { - MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level - MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level - MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries -} - -// RegisterFlagsWithPrefix for Config. -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.DurationVar(&cfg.MinBackoff, prefix+".backoff-min-period", 100*time.Millisecond, "Minimum delay when backing off.") - f.DurationVar(&cfg.MaxBackoff, prefix+".backoff-max-period", 10*time.Second, "Maximum delay when backing off.") - f.IntVar(&cfg.MaxRetries, prefix+".backoff-retries", 10, "Number of times to backoff and retry before failing.") -} - -// Backoff implements exponential backoff with randomized wait times -type Backoff struct { - cfg Config - ctx context.Context - numRetries int - nextDelayMin time.Duration - nextDelayMax time.Duration -} - -// New creates a Backoff object. Pass a Context that can also terminate the operation. -func New(ctx context.Context, cfg Config) *Backoff { - return &Backoff{ - cfg: cfg, - ctx: ctx, - nextDelayMin: cfg.MinBackoff, - nextDelayMax: doubleDuration(cfg.MinBackoff, cfg.MaxBackoff), - } -} - -// Reset the Backoff back to its initial condition -func (b *Backoff) Reset() { - b.numRetries = 0 - b.nextDelayMin = b.cfg.MinBackoff - b.nextDelayMax = doubleDuration(b.cfg.MinBackoff, b.cfg.MaxBackoff) -} - -// Ongoing returns true if caller should keep going -func (b *Backoff) Ongoing() bool { - // Stop if Context has errored or max retry count is exceeded - return b.ctx.Err() == nil && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries) -} - -// Err returns the reason for terminating the backoff, or nil if it didn't terminate -func (b *Backoff) Err() error { - if b.ctx.Err() != nil { - return b.ctx.Err() - } - if b.cfg.MaxRetries != 0 && b.numRetries >= b.cfg.MaxRetries { - return fmt.Errorf("terminated after %d retries", b.numRetries) - } - return nil -} - -// NumRetries returns the number of retries so far -func (b *Backoff) NumRetries() int { - return b.numRetries -} - -// Wait sleeps for the backoff time then increases the retry count and backoff time -// Returns immediately if Context is terminated -func (b *Backoff) Wait() { - // Increase the number of retries and get the next delay - sleepTime := b.NextDelay() - - if b.Ongoing() { - select { - case <-b.ctx.Done(): - case <-time.After(sleepTime): - } - } -} - -func (b *Backoff) NextDelay() time.Duration { - b.numRetries++ - - // Handle the edge case where the min and max have the same value - // (or due to some misconfig max is < min) - if b.nextDelayMin >= b.nextDelayMax { - return b.nextDelayMin - } - - // Add a jitter within the next exponential backoff range - sleepTime := b.nextDelayMin + time.Duration(rand.Int63n(int64(b.nextDelayMax-b.nextDelayMin))) - - // Apply the exponential backoff to calculate the next jitter - // range, unless we've already reached the max - if b.nextDelayMax < b.cfg.MaxBackoff { - b.nextDelayMin = doubleDuration(b.nextDelayMin, b.cfg.MaxBackoff) - b.nextDelayMax = doubleDuration(b.nextDelayMax, b.cfg.MaxBackoff) - } - - return sleepTime -} - -func doubleDuration(value time.Duration, max time.Duration) time.Duration { - value = value * 2 - - if value <= max { - return value - } - - return max -} diff --git a/vendor/github.com/grafana/dskit/crypto/tls/tls.go b/vendor/github.com/grafana/dskit/crypto/tls/tls.go deleted file mode 100644 index 9886b208d..000000000 --- a/vendor/github.com/grafana/dskit/crypto/tls/tls.go +++ /dev/null @@ -1,87 +0,0 @@ -package tls - -import ( - "crypto/tls" - "crypto/x509" - "flag" - "io/ioutil" - - "github.com/pkg/errors" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" -) - -// ClientConfig is the config for client TLS. -type ClientConfig struct { - CertPath string `yaml:"tls_cert_path"` - KeyPath string `yaml:"tls_key_path"` - CAPath string `yaml:"tls_ca_path"` - ServerName string `yaml:"tls_server_name"` - InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify"` -} - -var ( - errKeyMissing = errors.New("certificate given but no key configured") - errCertMissing = errors.New("key given but no certificate configured") -) - -// RegisterFlagsWithPrefix registers flags with prefix. -func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&cfg.CertPath, prefix+".tls-cert-path", "", "Path to the client certificate file, which will be used for authenticating with the server. Also requires the key path to be configured.") - f.StringVar(&cfg.KeyPath, prefix+".tls-key-path", "", "Path to the key file for the client certificate. Also requires the client certificate to be configured.") - f.StringVar(&cfg.CAPath, prefix+".tls-ca-path", "", "Path to the CA certificates file to validate server certificate against. If not set, the host's root CA certificates are used.") - f.StringVar(&cfg.ServerName, prefix+".tls-server-name", "", "Override the expected name on the server certificate.") - f.BoolVar(&cfg.InsecureSkipVerify, prefix+".tls-insecure-skip-verify", false, "Skip validating server certificate.") -} - -// GetTLSConfig initialises tls.Config from config options -func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { - config := &tls.Config{ - InsecureSkipVerify: cfg.InsecureSkipVerify, - ServerName: cfg.ServerName, - } - - // read ca certificates - if cfg.CAPath != "" { - var caCertPool *x509.CertPool - caCert, err := ioutil.ReadFile(cfg.CAPath) - if err != nil { - return nil, errors.Wrapf(err, "error loading ca cert: %s", cfg.CAPath) - } - caCertPool = x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) - - config.RootCAs = caCertPool - } - - // read client certificate - if cfg.CertPath != "" || cfg.KeyPath != "" { - if cfg.CertPath == "" { - return nil, errCertMissing - } - if cfg.KeyPath == "" { - return nil, errKeyMissing - } - clientCert, err := tls.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath) - if err != nil { - return nil, errors.Wrapf(err, "failed to load TLS certificate %s,%s", cfg.CertPath, cfg.KeyPath) - } - config.Certificates = []tls.Certificate{clientCert} - } - - return config, nil -} - -// GetGRPCDialOptions creates GRPC DialOptions for TLS -func (cfg *ClientConfig) GetGRPCDialOptions(enabled bool) ([]grpc.DialOption, error) { - if !enabled { - return []grpc.DialOption{grpc.WithInsecure()}, nil - } - - tlsConfig, err := cfg.GetTLSConfig() - if err != nil { - return nil, errors.Wrap(err, "error creating grpc dial options") - } - - return []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}, nil -} diff --git a/vendor/github.com/grafana/dskit/flagext/cidr.go b/vendor/github.com/grafana/dskit/flagext/cidr.go deleted file mode 100644 index 72b93b680..000000000 --- a/vendor/github.com/grafana/dskit/flagext/cidr.go +++ /dev/null @@ -1,82 +0,0 @@ -package flagext - -import ( - "net" - "strings" - - "github.com/pkg/errors" -) - -// CIDR is a network CIDR. -type CIDR struct { - Value *net.IPNet -} - -// String implements flag.Value. -func (c CIDR) String() string { - if c.Value == nil { - return "" - } - return c.Value.String() -} - -// Set implements flag.Value. -func (c *CIDR) Set(s string) error { - _, value, err := net.ParseCIDR(s) - if err != nil { - return err - } - c.Value = value - return nil -} - -// CIDRSliceCSV is a slice of CIDRs that is parsed from a comma-separated string. -// It implements flag.Value and yaml Marshalers. -type CIDRSliceCSV []CIDR - -// String implements flag.Value -func (c CIDRSliceCSV) String() string { - values := make([]string, 0, len(c)) - for _, cidr := range c { - values = append(values, cidr.String()) - } - - return strings.Join(values, ",") -} - -// Set implements flag.Value -func (c *CIDRSliceCSV) Set(s string) error { - parts := strings.Split(s, ",") - - for _, part := range parts { - cidr := &CIDR{} - if err := cidr.Set(part); err != nil { - return errors.Wrapf(err, "cidr: %s", part) - } - - *c = append(*c, *cidr) - } - - return nil -} - -// UnmarshalYAML implements yaml.Unmarshaler. -func (c *CIDRSliceCSV) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - - // An empty string means no CIDRs has been configured. - if s == "" { - *c = nil - return nil - } - - return c.Set(s) -} - -// MarshalYAML implements yaml.Marshaler. -func (c CIDRSliceCSV) MarshalYAML() (interface{}, error) { - return c.String(), nil -} diff --git a/vendor/github.com/grafana/dskit/flagext/day.go b/vendor/github.com/grafana/dskit/flagext/day.go deleted file mode 100644 index 8370ac0d5..000000000 --- a/vendor/github.com/grafana/dskit/flagext/day.go +++ /dev/null @@ -1,59 +0,0 @@ -package flagext - -import ( - "time" - - "github.com/prometheus/common/model" -) - -const secondsInDay = 24 * 60 * 60 - -// DayValue is a model.Time that can be used as a flag. -// NB it only parses days! -type DayValue struct { - model.Time - set bool -} - -// NewDayValue makes a new DayValue; will round t down to the nearest midnight. -func NewDayValue(t model.Time) DayValue { - return DayValue{ - Time: model.TimeFromUnix((t.Unix() / secondsInDay) * secondsInDay), - set: true, - } -} - -// String implements flag.Value -func (v DayValue) String() string { - return v.Time.Time().Format(time.RFC3339) -} - -// Set implements flag.Value -func (v *DayValue) Set(s string) error { - t, err := time.Parse("2006-01-02", s) - if err != nil { - return err - } - v.Time = model.TimeFromUnix(t.Unix()) - v.set = true - return nil -} - -// IsSet returns true is the DayValue has been set. -func (v *DayValue) IsSet() bool { - return v.set -} - -// UnmarshalYAML implements yaml.Unmarshaler. -func (v *DayValue) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - return v.Set(s) -} - -// MarshalYAML implements yaml.Marshaler. -func (v DayValue) MarshalYAML() (interface{}, error) { - return v.Time.Time().Format("2006-01-02"), nil -} diff --git a/vendor/github.com/grafana/dskit/flagext/deprecated.go b/vendor/github.com/grafana/dskit/flagext/deprecated.go deleted file mode 100644 index 188be2b04..000000000 --- a/vendor/github.com/grafana/dskit/flagext/deprecated.go +++ /dev/null @@ -1,37 +0,0 @@ -package flagext - -import ( - "flag" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -// DeprecatedFlagsUsed is the metric that counts deprecated flags set. -var DeprecatedFlagsUsed = promauto.NewCounter( - prometheus.CounterOpts{ - Name: "deprecated_flags_inuse_total", - Help: "The number of deprecated flags currently set.", - }) - -type deprecatedFlag struct { - name string - logger log.Logger -} - -func (deprecatedFlag) String() string { - return "deprecated" -} - -func (d deprecatedFlag) Set(string) error { - level.Warn(d.logger).Log("msg", "flag disabled", "flag", d.name) - DeprecatedFlagsUsed.Inc() - return nil -} - -// DeprecatedFlag logs a warning when you try to use it. -func DeprecatedFlag(f *flag.FlagSet, name, message string, logger log.Logger) { - f.Var(deprecatedFlag{name: name, logger: logger}, name, message) -} diff --git a/vendor/github.com/grafana/dskit/flagext/ignored.go b/vendor/github.com/grafana/dskit/flagext/ignored.go deleted file mode 100644 index 2dd49a87e..000000000 --- a/vendor/github.com/grafana/dskit/flagext/ignored.go +++ /dev/null @@ -1,22 +0,0 @@ -package flagext - -import ( - "flag" -) - -type ignoredFlag struct { - name string -} - -func (ignoredFlag) String() string { - return "ignored" -} - -func (d ignoredFlag) Set(string) error { - return nil -} - -// IgnoredFlag ignores set value, without any warning -func IgnoredFlag(f *flag.FlagSet, name, message string) { - f.Var(ignoredFlag{name}, name, message) -} diff --git a/vendor/github.com/grafana/dskit/flagext/register.go b/vendor/github.com/grafana/dskit/flagext/register.go deleted file mode 100644 index 1140843e0..000000000 --- a/vendor/github.com/grafana/dskit/flagext/register.go +++ /dev/null @@ -1,24 +0,0 @@ -package flagext - -import "flag" - -// Registerer is a thing that can RegisterFlags -type Registerer interface { - RegisterFlags(*flag.FlagSet) -} - -// RegisterFlags registers flags with the provided Registerers -func RegisterFlags(rs ...Registerer) { - for _, r := range rs { - r.RegisterFlags(flag.CommandLine) - } -} - -// DefaultValues initiates a set of configs (Registerers) with their defaults. -func DefaultValues(rs ...Registerer) { - fs := flag.NewFlagSet("", flag.PanicOnError) - for _, r := range rs { - r.RegisterFlags(fs) - } - _ = fs.Parse([]string{}) -} diff --git a/vendor/github.com/grafana/dskit/flagext/secret.go b/vendor/github.com/grafana/dskit/flagext/secret.go deleted file mode 100644 index aa7101b14..000000000 --- a/vendor/github.com/grafana/dskit/flagext/secret.go +++ /dev/null @@ -1,34 +0,0 @@ -package flagext - -type Secret struct { - Value string -} - -// String implements flag.Value -func (v Secret) String() string { - return v.Value -} - -// Set implements flag.Value -func (v *Secret) Set(s string) error { - v.Value = s - return nil -} - -// UnmarshalYAML implements yaml.Unmarshaler. -func (v *Secret) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - - return v.Set(s) -} - -// MarshalYAML implements yaml.Marshaler. -func (v Secret) MarshalYAML() (interface{}, error) { - if len(v.Value) == 0 { - return "", nil - } - return "********", nil -} diff --git a/vendor/github.com/grafana/dskit/flagext/stringslice.go b/vendor/github.com/grafana/dskit/flagext/stringslice.go deleted file mode 100644 index 7c4fd45cf..000000000 --- a/vendor/github.com/grafana/dskit/flagext/stringslice.go +++ /dev/null @@ -1,17 +0,0 @@ -package flagext - -import "fmt" - -// StringSlice is a slice of strings that implements flag.Value -type StringSlice []string - -// String implements flag.Value -func (v StringSlice) String() string { - return fmt.Sprintf("%s", []string(v)) -} - -// Set implements flag.Value -func (v *StringSlice) Set(s string) error { - *v = append(*v, s) - return nil -} diff --git a/vendor/github.com/grafana/dskit/flagext/stringslicecsv.go b/vendor/github.com/grafana/dskit/flagext/stringslicecsv.go deleted file mode 100644 index 47ccd54ca..000000000 --- a/vendor/github.com/grafana/dskit/flagext/stringslicecsv.go +++ /dev/null @@ -1,33 +0,0 @@ -package flagext - -import "strings" - -// StringSliceCSV is a slice of strings that is parsed from a comma-separated string -// It implements flag.Value and yaml Marshalers -type StringSliceCSV []string - -// String implements flag.Value -func (v StringSliceCSV) String() string { - return strings.Join(v, ",") -} - -// Set implements flag.Value -func (v *StringSliceCSV) Set(s string) error { - *v = strings.Split(s, ",") - return nil -} - -// UnmarshalYAML implements yaml.Unmarshaler. -func (v *StringSliceCSV) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - - return v.Set(s) -} - -// MarshalYAML implements yaml.Marshaler. -func (v StringSliceCSV) MarshalYAML() (interface{}, error) { - return v.String(), nil -} diff --git a/vendor/github.com/grafana/dskit/flagext/time.go b/vendor/github.com/grafana/dskit/flagext/time.go deleted file mode 100644 index 452857e9d..000000000 --- a/vendor/github.com/grafana/dskit/flagext/time.go +++ /dev/null @@ -1,60 +0,0 @@ -package flagext - -import ( - "fmt" - "time" -) - -// Time usable as flag or in YAML config. -type Time time.Time - -// String implements flag.Value -func (t Time) String() string { - if time.Time(t).IsZero() { - return "0" - } - - return time.Time(t).Format(time.RFC3339) -} - -// Set implements flag.Value -func (t *Time) Set(s string) error { - if s == "0" { - *t = Time(time.Time{}) - return nil - } - - p, err := time.Parse("2006-01-02", s) - if err == nil { - *t = Time(p) - return nil - } - - p, err = time.Parse("2006-01-02T15:04", s) - if err == nil { - *t = Time(p) - return nil - } - - p, err = time.Parse("2006-01-02T15:04:05Z07:00", s) - if err == nil { - *t = Time(p) - return nil - } - - return fmt.Errorf("failed to parse time: %q", s) -} - -// UnmarshalYAML implements yaml.Unmarshaler. -func (t *Time) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - return t.Set(s) -} - -// MarshalYAML implements yaml.Marshaler. -func (t Time) MarshalYAML() (interface{}, error) { - return t.String(), nil -} diff --git a/vendor/github.com/grafana/dskit/flagext/url.go b/vendor/github.com/grafana/dskit/flagext/url.go deleted file mode 100644 index 3b3b8303b..000000000 --- a/vendor/github.com/grafana/dskit/flagext/url.go +++ /dev/null @@ -1,59 +0,0 @@ -package flagext - -import "net/url" - -// URLValue is a url.URL that can be used as a flag. -type URLValue struct { - *url.URL -} - -// String implements flag.Value -func (v URLValue) String() string { - if v.URL == nil { - return "" - } - return v.URL.String() -} - -// Set implements flag.Value -func (v *URLValue) Set(s string) error { - u, err := url.Parse(s) - if err != nil { - return err - } - v.URL = u - return nil -} - -// UnmarshalYAML implements yaml.Unmarshaler. -func (v *URLValue) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - - // An empty string means no URL has been configured. - if s == "" { - v.URL = nil - return nil - } - - return v.Set(s) -} - -// MarshalYAML implements yaml.Marshaler. -func (v URLValue) MarshalYAML() (interface{}, error) { - if v.URL == nil { - return "", nil - } - - // Mask out passwords when marshalling URLs back to YAML. - u := *v.URL - if u.User != nil { - if _, set := u.User.Password(); set { - u.User = url.UserPassword(u.User.Username(), "********") - } - } - - return u.String(), nil -} diff --git a/vendor/github.com/grafana/dskit/kv/codec/codec.go b/vendor/github.com/grafana/dskit/kv/codec/codec.go deleted file mode 100644 index 49540fe3a..000000000 --- a/vendor/github.com/grafana/dskit/kv/codec/codec.go +++ /dev/null @@ -1,69 +0,0 @@ -package codec - -import ( - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" -) - -// Codec allows KV clients to serialise and deserialise values. -type Codec interface { - Decode([]byte) (interface{}, error) - Encode(interface{}) ([]byte, error) - - // CodecID is a short identifier to communicate what codec should be used to decode the value. - // Once in use, this should be stable to avoid confusing other clients. - CodecID() string -} - -// Proto is a Codec for proto/snappy -type Proto struct { - id string - factory func() proto.Message -} - -func NewProtoCodec(id string, factory func() proto.Message) Proto { - return Proto{id: id, factory: factory} -} - -func (p Proto) CodecID() string { - return p.id -} - -// Decode implements Codec -func (p Proto) Decode(bytes []byte) (interface{}, error) { - out := p.factory() - bytes, err := snappy.Decode(nil, bytes) - if err != nil { - return nil, err - } - if err := proto.Unmarshal(bytes, out); err != nil { - return nil, err - } - return out, nil -} - -// Encode implements Codec -func (p Proto) Encode(msg interface{}) ([]byte, error) { - bytes, err := proto.Marshal(msg.(proto.Message)) - if err != nil { - return nil, err - } - return snappy.Encode(nil, bytes), nil -} - -// String is a code for strings. -type String struct{} - -func (String) CodecID() string { - return "string" -} - -// Decode implements Codec. -func (String) Decode(bytes []byte) (interface{}, error) { - return string(bytes), nil -} - -// Encode implements Codec. -func (String) Encode(msg interface{}) ([]byte, error) { - return []byte(msg.(string)), nil -} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go b/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go deleted file mode 100644 index 6657b73a5..000000000 --- a/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go +++ /dev/null @@ -1,63 +0,0 @@ -package memberlist - -import ( - "fmt" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/hashicorp/memberlist" -) - -// ringBroadcast implements memberlist.Broadcast interface, which is used by memberlist.TransmitLimitedQueue. -type ringBroadcast struct { - key string - content []string // Description of what is stored in this value. Used for invalidation. - version uint // local version of the value, generated by merging this change - msg []byte // encoded key and value - finished func(b ringBroadcast) - logger log.Logger -} - -func (r ringBroadcast) Invalidates(old memberlist.Broadcast) bool { - if oldb, ok := old.(ringBroadcast); ok { - if r.key != oldb.key { - return false - } - - // if 'content' (result of Mergeable.MergeContent) of this broadcast is a superset of content of old value, - // and this broadcast has resulted in a newer ring update, we can invalidate the old value - - for _, oldName := range oldb.content { - found := false - for _, newName := range r.content { - if oldName == newName { - found = true - break - } - } - - if !found { - return false - } - } - - // only do this check if this ringBroadcast covers same ingesters as 'b' - // otherwise, we may be invalidating some older messages, which however covered different - // ingesters - if r.version >= oldb.version { - level.Debug(r.logger).Log("msg", "Invalidating forwarded broadcast", "key", r.key, "version", r.version, "oldVersion", oldb.version, "content", fmt.Sprintf("%v", r.content), "oldContent", fmt.Sprintf("%v", oldb.content)) - return true - } - } - return false -} - -func (r ringBroadcast) Message() []byte { - return r.msg -} - -func (r ringBroadcast) Finished() { - if r.finished != nil { - r.finished(r) - } -} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/dnsprovider.go b/vendor/github.com/grafana/dskit/kv/memberlist/dnsprovider.go deleted file mode 100644 index b51a5d055..000000000 --- a/vendor/github.com/grafana/dskit/kv/memberlist/dnsprovider.go +++ /dev/null @@ -1,15 +0,0 @@ -package memberlist - -import ( - "context" -) - -// DNSProvider supports storing or resolving a list of addresses. -type DNSProvider interface { - // Resolve stores a list of provided addresses or their DNS records if requested. - // Implementations may have specific ways of interpreting addresses. - Resolve(ctx context.Context, addrs []string) error - - // Addresses returns the latest addresses present in the DNSProvider. - Addresses() []string -} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv.pb.go b/vendor/github.com/grafana/dskit/kv/memberlist/kv.pb.go deleted file mode 100644 index 4c2eb9265..000000000 --- a/vendor/github.com/grafana/dskit/kv/memberlist/kv.pb.go +++ /dev/null @@ -1,767 +0,0 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: kv.proto - -package memberlist - -import ( - bytes "bytes" - fmt "fmt" - _ "github.com/gogo/protobuf/gogoproto" - proto "github.com/gogo/protobuf/proto" - io "io" - math "math" - math_bits "math/bits" - reflect "reflect" - strings "strings" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package - -// KV Store is just a series of key-value pairs. -type KeyValueStore struct { - Pairs []*KeyValuePair `protobuf:"bytes,1,rep,name=pairs,proto3" json:"pairs,omitempty"` -} - -func (m *KeyValueStore) Reset() { *m = KeyValueStore{} } -func (*KeyValueStore) ProtoMessage() {} -func (*KeyValueStore) Descriptor() ([]byte, []int) { - return fileDescriptor_2216fe83c9c12408, []int{0} -} -func (m *KeyValueStore) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *KeyValueStore) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_KeyValueStore.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *KeyValueStore) XXX_Merge(src proto.Message) { - xxx_messageInfo_KeyValueStore.Merge(m, src) -} -func (m *KeyValueStore) XXX_Size() int { - return m.Size() -} -func (m *KeyValueStore) XXX_DiscardUnknown() { - xxx_messageInfo_KeyValueStore.DiscardUnknown(m) -} - -var xxx_messageInfo_KeyValueStore proto.InternalMessageInfo - -func (m *KeyValueStore) GetPairs() []*KeyValuePair { - if m != nil { - return m.Pairs - } - return nil -} - -// Single Key-Value pair. Key must be non-empty. -type KeyValuePair struct { - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - // ID of the codec used to write the value - Codec string `protobuf:"bytes,3,opt,name=codec,proto3" json:"codec,omitempty"` -} - -func (m *KeyValuePair) Reset() { *m = KeyValuePair{} } -func (*KeyValuePair) ProtoMessage() {} -func (*KeyValuePair) Descriptor() ([]byte, []int) { - return fileDescriptor_2216fe83c9c12408, []int{1} -} -func (m *KeyValuePair) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *KeyValuePair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_KeyValuePair.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *KeyValuePair) XXX_Merge(src proto.Message) { - xxx_messageInfo_KeyValuePair.Merge(m, src) -} -func (m *KeyValuePair) XXX_Size() int { - return m.Size() -} -func (m *KeyValuePair) XXX_DiscardUnknown() { - xxx_messageInfo_KeyValuePair.DiscardUnknown(m) -} - -var xxx_messageInfo_KeyValuePair proto.InternalMessageInfo - -func (m *KeyValuePair) GetKey() string { - if m != nil { - return m.Key - } - return "" -} - -func (m *KeyValuePair) GetValue() []byte { - if m != nil { - return m.Value - } - return nil -} - -func (m *KeyValuePair) GetCodec() string { - if m != nil { - return m.Codec - } - return "" -} - -func init() { - proto.RegisterType((*KeyValueStore)(nil), "memberlist.KeyValueStore") - proto.RegisterType((*KeyValuePair)(nil), "memberlist.KeyValuePair") -} - -func init() { proto.RegisterFile("kv.proto", fileDescriptor_2216fe83c9c12408) } - -var fileDescriptor_2216fe83c9c12408 = []byte{ - // 236 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xc8, 0x2e, 0xd3, 0x2b, - 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x4d, 0xcd, 0x4d, 0x4a, 0x2d, 0xca, 0xc9, 0x2c, 0x2e, - 0x91, 0xd2, 0x4d, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xcf, 0x4f, - 0xcf, 0xd7, 0x07, 0x2b, 0x49, 0x2a, 0x4d, 0x03, 0xf3, 0xc0, 0x1c, 0x30, 0x0b, 0xa2, 0x55, 0xc9, - 0x9e, 0x8b, 0xd7, 0x3b, 0xb5, 0x32, 0x2c, 0x31, 0xa7, 0x34, 0x35, 0xb8, 0x24, 0xbf, 0x28, 0x55, - 0x48, 0x8f, 0x8b, 0xb5, 0x20, 0x31, 0xb3, 0xa8, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 0x48, - 0x42, 0x0f, 0x61, 0xb6, 0x1e, 0x4c, 0x65, 0x40, 0x62, 0x66, 0x51, 0x10, 0x44, 0x99, 0x92, 0x0f, - 0x17, 0x0f, 0xb2, 0xb0, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, - 0x67, 0x10, 0x88, 0x29, 0x24, 0xc2, 0xc5, 0x5a, 0x06, 0x92, 0x96, 0x60, 0x52, 0x60, 0xd4, 0xe0, - 0x09, 0x82, 0x70, 0x40, 0xa2, 0xc9, 0xf9, 0x29, 0xa9, 0xc9, 0x12, 0xcc, 0x60, 0x95, 0x10, 0x8e, - 0x93, 0xc9, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, - 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, - 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, - 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x24, 0x36, 0xb0, 0x5f, 0x8c, 0x01, 0x01, - 0x00, 0x00, 0xff, 0xff, 0x7a, 0x22, 0xdf, 0xec, 0x12, 0x01, 0x00, 0x00, -} - -func (this *KeyValueStore) Equal(that interface{}) bool { - if that == nil { - return this == nil - } - - that1, ok := that.(*KeyValueStore) - if !ok { - that2, ok := that.(KeyValueStore) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - return this == nil - } else if this == nil { - return false - } - if len(this.Pairs) != len(that1.Pairs) { - return false - } - for i := range this.Pairs { - if !this.Pairs[i].Equal(that1.Pairs[i]) { - return false - } - } - return true -} -func (this *KeyValuePair) Equal(that interface{}) bool { - if that == nil { - return this == nil - } - - that1, ok := that.(*KeyValuePair) - if !ok { - that2, ok := that.(KeyValuePair) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - return this == nil - } else if this == nil { - return false - } - if this.Key != that1.Key { - return false - } - if !bytes.Equal(this.Value, that1.Value) { - return false - } - if this.Codec != that1.Codec { - return false - } - return true -} -func (this *KeyValueStore) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 5) - s = append(s, "&memberlist.KeyValueStore{") - if this.Pairs != nil { - s = append(s, "Pairs: "+fmt.Sprintf("%#v", this.Pairs)+",\n") - } - s = append(s, "}") - return strings.Join(s, "") -} -func (this *KeyValuePair) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 7) - s = append(s, "&memberlist.KeyValuePair{") - s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") - s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") - s = append(s, "Codec: "+fmt.Sprintf("%#v", this.Codec)+",\n") - s = append(s, "}") - return strings.Join(s, "") -} -func valueToGoStringKv(v interface{}, typ string) string { - rv := reflect.ValueOf(v) - if rv.IsNil() { - return "nil" - } - pv := reflect.Indirect(rv).Interface() - return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) -} -func (m *KeyValueStore) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *KeyValueStore) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *KeyValueStore) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Pairs) > 0 { - for iNdEx := len(m.Pairs) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Pairs[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintKv(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0xa - } - } - return len(dAtA) - i, nil -} - -func (m *KeyValuePair) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *KeyValuePair) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *KeyValuePair) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Codec) > 0 { - i -= len(m.Codec) - copy(dAtA[i:], m.Codec) - i = encodeVarintKv(dAtA, i, uint64(len(m.Codec))) - i-- - dAtA[i] = 0x1a - } - if len(m.Value) > 0 { - i -= len(m.Value) - copy(dAtA[i:], m.Value) - i = encodeVarintKv(dAtA, i, uint64(len(m.Value))) - i-- - dAtA[i] = 0x12 - } - if len(m.Key) > 0 { - i -= len(m.Key) - copy(dAtA[i:], m.Key) - i = encodeVarintKv(dAtA, i, uint64(len(m.Key))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func encodeVarintKv(dAtA []byte, offset int, v uint64) int { - offset -= sovKv(v) - base := offset - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return base -} -func (m *KeyValueStore) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if len(m.Pairs) > 0 { - for _, e := range m.Pairs { - l = e.Size() - n += 1 + l + sovKv(uint64(l)) - } - } - return n -} - -func (m *KeyValuePair) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Key) - if l > 0 { - n += 1 + l + sovKv(uint64(l)) - } - l = len(m.Value) - if l > 0 { - n += 1 + l + sovKv(uint64(l)) - } - l = len(m.Codec) - if l > 0 { - n += 1 + l + sovKv(uint64(l)) - } - return n -} - -func sovKv(x uint64) (n int) { - return (math_bits.Len64(x|1) + 6) / 7 -} -func sozKv(x uint64) (n int) { - return sovKv(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (this *KeyValueStore) String() string { - if this == nil { - return "nil" - } - repeatedStringForPairs := "[]*KeyValuePair{" - for _, f := range this.Pairs { - repeatedStringForPairs += strings.Replace(f.String(), "KeyValuePair", "KeyValuePair", 1) + "," - } - repeatedStringForPairs += "}" - s := strings.Join([]string{`&KeyValueStore{`, - `Pairs:` + repeatedStringForPairs + `,`, - `}`, - }, "") - return s -} -func (this *KeyValuePair) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&KeyValuePair{`, - `Key:` + fmt.Sprintf("%v", this.Key) + `,`, - `Value:` + fmt.Sprintf("%v", this.Value) + `,`, - `Codec:` + fmt.Sprintf("%v", this.Codec) + `,`, - `}`, - }, "") - return s -} -func valueToStringKv(v interface{}) string { - rv := reflect.ValueOf(v) - if rv.IsNil() { - return "nil" - } - pv := reflect.Indirect(rv).Interface() - return fmt.Sprintf("*%v", pv) -} -func (m *KeyValueStore) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: KeyValueStore: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: KeyValueStore: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Pairs", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthKv - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthKv - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Pairs = append(m.Pairs, &KeyValuePair{}) - if err := m.Pairs[len(m.Pairs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipKv(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthKv - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthKv - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *KeyValuePair) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: KeyValuePair: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: KeyValuePair: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthKv - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthKv - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Key = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthKv - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthKv - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...) - if m.Value == nil { - m.Value = []byte{} - } - iNdEx = postIndex - case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Codec", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthKv - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthKv - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Codec = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipKv(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthKv - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthKv - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func skipKv(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowKv - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowKv - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - return iNdEx, nil - case 1: - iNdEx += 8 - return iNdEx, nil - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowKv - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if length < 0 { - return 0, ErrInvalidLengthKv - } - iNdEx += length - if iNdEx < 0 { - return 0, ErrInvalidLengthKv - } - return iNdEx, nil - case 3: - for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowKv - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipKv(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - if iNdEx < 0 { - return 0, ErrInvalidLengthKv - } - } - return iNdEx, nil - case 4: - return iNdEx, nil - case 5: - iNdEx += 4 - return iNdEx, nil - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - } - panic("unreachable") -} - -var ( - ErrInvalidLengthKv = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowKv = fmt.Errorf("proto: integer overflow") -) diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv.proto b/vendor/github.com/grafana/dskit/kv/memberlist/kv.proto deleted file mode 100644 index cc5f12463..000000000 --- a/vendor/github.com/grafana/dskit/kv/memberlist/kv.proto +++ /dev/null @@ -1,22 +0,0 @@ -syntax = "proto3"; - -package memberlist; - -import "github.com/gogo/protobuf/gogoproto/gogo.proto"; - -option (gogoproto.marshaler_all) = true; -option (gogoproto.unmarshaler_all) = true; - -// KV Store is just a series of key-value pairs. -message KeyValueStore { - repeated KeyValuePair pairs = 1; -} - -// Single Key-Value pair. Key must be non-empty. -message KeyValuePair { - string key = 1; - bytes value = 2; - - // ID of the codec used to write the value - string codec = 3; -} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go b/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go deleted file mode 100644 index e63aac2f4..000000000 --- a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go +++ /dev/null @@ -1,433 +0,0 @@ -package memberlist - -import ( - "context" - "encoding/json" - "fmt" - "html/template" - "net/http" - "sort" - "strconv" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/hashicorp/memberlist" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/atomic" - - "github.com/grafana/dskit/services" -) - -// KVInitService initializes a memberlist.KV on first call to GetMemberlistKV, and starts it. On stop, -// KV is stopped too. If KV fails, error is reported from the service. -type KVInitService struct { - services.Service - - // config used for initialization - cfg *KVConfig - logger log.Logger - dnsProvider DNSProvider - registerer prometheus.Registerer - - // init function, to avoid multiple initializations. - init sync.Once - - // state - kv atomic.Value - err error - watcher *services.FailureWatcher -} - -func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KVInitService { - kvinit := &KVInitService{ - cfg: cfg, - watcher: services.NewFailureWatcher(), - logger: logger, - registerer: registerer, - dnsProvider: dnsProvider, - } - kvinit.Service = services.NewBasicService(nil, kvinit.running, kvinit.stopping).WithName("memberlist KV service") - return kvinit -} - -// GetMemberlistKV will initialize Memberlist.KV on first call, and add it to service failure watcher. -func (kvs *KVInitService) GetMemberlistKV() (*KV, error) { - kvs.init.Do(func() { - kv := NewKV(*kvs.cfg, kvs.logger, kvs.dnsProvider, kvs.registerer) - kvs.watcher.WatchService(kv) - kvs.err = kv.StartAsync(context.Background()) - - kvs.kv.Store(kv) - }) - - return kvs.getKV(), kvs.err -} - -// Returns KV if it was initialized, or nil. -func (kvs *KVInitService) getKV() *KV { - kv := kvs.kv.Load() - if kv == nil { - return nil - } - return kv.(*KV) -} - -func (kvs *KVInitService) running(ctx context.Context) error { - select { - case <-ctx.Done(): - return nil - case err := <-kvs.watcher.Chan(): - // Only happens if KV service was actually initialized in GetMemberlistKV and it fails. - return err - } -} - -func (kvs *KVInitService) stopping(_ error) error { - kv := kvs.getKV() - if kv == nil { - return nil - } - - return services.StopAndAwaitTerminated(context.Background(), kv) -} - -func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) { - kv := kvs.getKV() - if kv == nil { - w.Header().Set("Content-Type", "text/plain") - // Ignore inactionable errors. - _, _ = w.Write([]byte("This instance doesn't use memberlist.")) - return - } - - const ( - downloadKeyParam = "downloadKey" - viewKeyParam = "viewKey" - viewMsgParam = "viewMsg" - deleteMessagesParam = "deleteMessages" - ) - - if err := req.ParseForm(); err == nil { - if req.Form[downloadKeyParam] != nil { - downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. - return - } - - if req.Form[viewKeyParam] != nil { - viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) - return - } - - if req.Form[viewMsgParam] != nil { - msgID, err := strconv.Atoi(req.Form[viewMsgParam][0]) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - sent, received := kv.getSentAndReceivedMessages() - - for _, m := range append(sent, received...) { - if m.ID == msgID { - viewMessage(w, kv, m, getFormat(req)) - return - } - } - - http.Error(w, "message not found", http.StatusNotFound) - return - } - - if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" { - kv.deleteSentReceivedMessages() - - // Redirect back. - w.Header().Set("Location", "?"+deleteMessagesParam+"=false") - w.WriteHeader(http.StatusFound) - return - } - } - - members := kv.memberlist.Members() - sort.Slice(members, func(i, j int) bool { - return members[i].Name < members[j].Name - }) - - sent, received := kv.getSentAndReceivedMessages() - - v := pageData{ - Now: time.Now(), - Memberlist: kv.memberlist, - SortedMembers: members, - Store: kv.storeCopy(), - SentMessages: sent, - ReceivedMessages: received, - } - - accept := req.Header.Get("Accept") - if strings.Contains(accept, "application/json") { - w.Header().Set("Content-Type", "application/json") - - data, err := json.Marshal(v) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // We ignore errors here, because we cannot do anything about them. - // Write will trigger sending Status code, so we cannot send a different status code afterwards. - // Also this isn't internal error, but error communicating with client. - _, _ = w.Write(data) - return - } - - err := pageTemplate.Execute(w, v) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func getFormat(req *http.Request) string { - const viewFormat = "format" - - format := "" - if len(req.Form[viewFormat]) > 0 { - format = req.Form[viewFormat][0] - } - return format -} - -func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) { - c := kv.GetCodec(msg.Pair.Codec) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - val, err := c.Decode(msg.Pair.Value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) - return - } - - formatValue(w, val, format) -} - -func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) { - if store[key].value == nil { - http.Error(w, "value not found", http.StatusNotFound) - return - } - - formatValue(w, store[key].value, format) -} - -func formatValue(w http.ResponseWriter, val interface{}, format string) { - - w.WriteHeader(200) - w.Header().Add("content-type", "text/plain") - - switch format { - case "json", "json-pretty": - enc := json.NewEncoder(w) - if format == "json-pretty" { - enc.SetIndent("", " ") - } - - err := enc.Encode(val) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - - default: - _, _ = fmt.Fprintf(w, "%#v", val) - } -} - -func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) { - if store[key].value == nil { - http.Error(w, "value not found", http.StatusNotFound) - return - } - - val := store[key] - - c := kv.GetCodec(store[key].codecID) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - encoded, err := c.Encode(val.value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) - return - } - - w.Header().Add("content-type", "application/octet-stream") - // Set content-length so that client knows whether it has received full response or not. - w.Header().Add("content-length", strconv.Itoa(len(encoded))) - w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key)) - w.WriteHeader(200) - - // Ignore errors, we cannot do anything about them. - _, _ = w.Write(encoded) -} - -type pageData struct { - Now time.Time - Memberlist *memberlist.Memberlist - SortedMembers []*memberlist.Node - Store map[string]valueDesc - SentMessages []message - ReceivedMessages []message -} - -var pageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ - "StringsJoin": strings.Join, -}).Parse(pageContent)) - -const pageContent = ` - - - - - Memberlist Status - - -

Memberlist Status

-

Current time: {{ .Now }}

- - - -

KV Store

- - - - - - - - - - - - {{ range $k, $v := .Store }} - - - - - - {{ end }} - -
KeyValue DetailsActions
{{ $k }}{{ $v }} - json - | json-pretty - | struct - | download -
- -

Note that value "version" is node-specific. It starts with 0 (on restart), and increases on each received update. Size is in bytes.

- -

Memberlist Cluster Members

- - - - - - - - - - - - {{ range .SortedMembers }} - - - - - - {{ end }} - -
NameAddressState
{{ .Name }}{{ .Address }}{{ .State }}
- -

State: 0 = Alive, 1 = Suspect, 2 = Dead, 3 = Left

- -

Received Messages

- - Delete All Messages (received and sent) - - - - - - - - - - - - - - - - {{ range .ReceivedMessages }} - - - - - - - - - - {{ end }} - -
IDTimeKeyValue in the MessageVersion After Update (0 = no change)ChangesActions
{{ .ID }}{{ .Time.Format "15:04:05.000" }}{{ .Pair.Key }}size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}{{ .Version }}{{ StringsJoin .Changes ", " }} - json - | json-pretty - | struct -
- -

Sent Messages

- - Delete All Messages (received and sent) - - - - - - - - - - - - - - - - {{ range .SentMessages }} - - - - - - - - - - {{ end }} - -
IDTimeKeyValueVersionChangesActions
{{ .ID }}{{ .Time.Format "15:04:05.000" }}{{ .Pair.Key }}size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}{{ .Version }}{{ StringsJoin .Changes ", " }} - json - | json-pretty - | struct -
- -` diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go deleted file mode 100644 index 1b21fa5c4..000000000 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ /dev/null @@ -1,1307 +0,0 @@ -package memberlist - -import ( - "bytes" - "context" - "crypto/rand" - "encoding/binary" - "errors" - "flag" - "fmt" - "math" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/hashicorp/memberlist" - "github.com/prometheus/client_golang/prometheus" - - "github.com/grafana/dskit/backoff" - "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/codec" - "github.com/grafana/dskit/services" -) - -const ( - maxCasRetries = 10 // max retries in CAS operation - noChangeDetectedRetrySleep = time.Second // how long to sleep after no change was detected in CAS -) - -// Client implements kv.Client interface, by using memberlist.KV -type Client struct { - kv *KV // reference to singleton memberlist-based KV - codec codec.Codec -} - -// NewClient creates new client instance. Supplied codec must already be registered in KV. -func NewClient(kv *KV, codec codec.Codec) (*Client, error) { - c := kv.GetCodec(codec.CodecID()) - if c == nil { - return nil, fmt.Errorf("codec not registered in KV: %s", codec.CodecID()) - } - - return &Client{ - kv: kv, - codec: codec, - }, nil -} - -// List is part of kv.Client interface. -func (c *Client) List(ctx context.Context, prefix string) ([]string, error) { - err := c.awaitKVRunningOrStopping(ctx) - if err != nil { - return nil, err - } - - return c.kv.List(prefix), nil -} - -// Get is part of kv.Client interface. -func (c *Client) Get(ctx context.Context, key string) (interface{}, error) { - err := c.awaitKVRunningOrStopping(ctx) - if err != nil { - return nil, err - } - - return c.kv.Get(key, c.codec) -} - -// Delete is part of kv.Client interface. -func (c *Client) Delete(ctx context.Context, key string) error { - return errors.New("memberlist does not support Delete") -} - -// CAS is part of kv.Client interface -func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - err := c.awaitKVRunningOrStopping(ctx) - if err != nil { - return err - } - - return c.kv.CAS(ctx, key, c.codec, f) -} - -// WatchKey is part of kv.Client interface. -func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { - err := c.awaitKVRunningOrStopping(ctx) - if err != nil { - return - } - - c.kv.WatchKey(ctx, key, c.codec, f) -} - -// WatchPrefix calls f whenever any value stored under prefix changes. -// Part of kv.Client interface. -func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) { - err := c.awaitKVRunningOrStopping(ctx) - if err != nil { - return - } - - c.kv.WatchPrefix(ctx, prefix, c.codec, f) -} - -// We want to use KV in Running and Stopping states. -func (c *Client) awaitKVRunningOrStopping(ctx context.Context) error { - s := c.kv.State() - switch s { - case services.Running, services.Stopping: - return nil - case services.New, services.Starting: - err := c.kv.AwaitRunning(ctx) - if ns := c.kv.State(); ns == services.Stopping { - return nil - } - return err - default: - return fmt.Errorf("unexpected state: %v", s) - } -} - -// KVConfig is a config for memberlist.KV -type KVConfig struct { - // Memberlist options. - NodeName string `yaml:"node_name"` - RandomizeNodeName bool `yaml:"randomize_node_name"` - StreamTimeout time.Duration `yaml:"stream_timeout"` - RetransmitMult int `yaml:"retransmit_factor"` - PushPullInterval time.Duration `yaml:"pull_push_interval"` - GossipInterval time.Duration `yaml:"gossip_interval"` - GossipNodes int `yaml:"gossip_nodes"` - GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"` - DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"` - EnableCompression bool `yaml:"compression_enabled"` - - // ip:port to advertise other cluster members. Used for NAT traversal - AdvertiseAddr string `yaml:"advertise_addr"` - AdvertisePort int `yaml:"advertise_port"` - - // List of members to join - JoinMembers flagext.StringSlice `yaml:"join_members"` - MinJoinBackoff time.Duration `yaml:"min_join_backoff"` - MaxJoinBackoff time.Duration `yaml:"max_join_backoff"` - MaxJoinRetries int `yaml:"max_join_retries"` - AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"` - RejoinInterval time.Duration `yaml:"rejoin_interval"` - - // Remove LEFT ingesters from ring after this timeout. - LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"` - - // Timeout used when leaving the memberlist cluster. - LeaveTimeout time.Duration `yaml:"leave_timeout"` - - // How much space to use to keep received and sent messages in memory (for troubleshooting). - MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes"` - - TCPTransport TCPTransportConfig `yaml:",inline"` - - // Where to put custom metrics. Metrics are not registered, if this is nil. - MetricsRegisterer prometheus.Registerer `yaml:"-"` - MetricsNamespace string `yaml:"-"` - - // Codecs to register. Codecs need to be registered before joining other members. - Codecs []codec.Codec `yaml:"-"` -} - -// RegisterFlagsWithPrefix registers flags. -func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - mlDefaults := defaultMemberlistConfig() - - // "Defaults to hostname" -- memberlist sets it to hostname by default. - f.StringVar(&cfg.NodeName, prefix+"memberlist.nodename", "", "Name of the node in memberlist cluster. Defaults to hostname.") // memberlist.DefaultLANConfig will put hostname here. - f.BoolVar(&cfg.RandomizeNodeName, prefix+"memberlist.randomize-node-name", true, "Add random suffix to the node name.") - f.DurationVar(&cfg.StreamTimeout, prefix+"memberlist.stream-timeout", mlDefaults.TCPTimeout, "The timeout for establishing a connection with a remote node, and for read/write operations.") - f.IntVar(&cfg.RetransmitMult, prefix+"memberlist.retransmit-factor", mlDefaults.RetransmitMult, "Multiplication factor used when sending out messages (factor * log(N+1)).") - f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times. It can be an IP, hostname or an entry specified in the DNS Service Discovery format.") - f.DurationVar(&cfg.MinJoinBackoff, prefix+"memberlist.min-join-backoff", 1*time.Second, "Min backoff duration to join other cluster members.") - f.DurationVar(&cfg.MaxJoinBackoff, prefix+"memberlist.max-join-backoff", 1*time.Minute, "Max backoff duration to join other cluster members.") - f.IntVar(&cfg.MaxJoinRetries, prefix+"memberlist.max-join-retries", 10, "Max number of retries to join other cluster members.") - f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", true, "If this node fails to join memberlist cluster, abort.") - f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.") - f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.") - f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 5*time.Second, "Timeout for leaving memberlist cluster.") - f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", mlDefaults.GossipInterval, "How often to gossip.") - f.IntVar(&cfg.GossipNodes, prefix+"memberlist.gossip-nodes", mlDefaults.GossipNodes, "How many nodes to gossip to.") - f.DurationVar(&cfg.PushPullInterval, prefix+"memberlist.pullpush-interval", mlDefaults.PushPullInterval, "How often to use pull/push sync.") - f.DurationVar(&cfg.GossipToTheDeadTime, prefix+"memberlist.gossip-to-dead-nodes-time", mlDefaults.GossipToTheDeadTime, "How long to keep gossiping to dead nodes, to give them chance to refute their death.") - f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") - f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") - f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") - f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.") - f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") - - cfg.TCPTransport.RegisterFlags(f, prefix) -} - -func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix(f, "") -} - -func generateRandomSuffix(logger log.Logger) string { - suffix := make([]byte, 4) - _, err := rand.Read(suffix) - if err != nil { - level.Error(logger).Log("msg", "failed to generate random suffix", "err", err) - return "error" - } - return fmt.Sprintf("%2x", suffix) -} - -// KV implements Key-Value store on top of memberlist library. KV store has API similar to kv.Client, -// except methods also need explicit codec for each operation. -// KV is a Service. It needs to be started first, and is only usable once it enters Running state. -// If joining of the cluster if configured, it is done in Running state, and if join fails and Abort flag is set, service -// fails. -type KV struct { - services.Service - - cfg KVConfig - logger log.Logger - registerer prometheus.Registerer - - // dns discovery provider - provider DNSProvider - - // Protects access to memberlist and broadcasts fields. - initWG sync.WaitGroup - memberlist *memberlist.Memberlist - broadcasts *memberlist.TransmitLimitedQueue - - // KV Store. - storeMu sync.Mutex - store map[string]valueDesc - - // Codec registry - codecs map[string]codec.Codec - - // Key watchers - watchersMu sync.Mutex - watchers map[string][]chan string - prefixWatchers map[string][]chan string - - // Buffers with sent and received messages. Used for troubleshooting only. - // New messages are appended, old messages (based on configured size limit) removed from the front. - messagesMu sync.Mutex - sentMessages []message - sentMessagesSize int - receivedMessages []message - receivedMessagesSize int - messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI. - - // closed on shutdown - shutdown chan struct{} - - // metrics - numberOfReceivedMessages prometheus.Counter - totalSizeOfReceivedMessages prometheus.Counter - numberOfInvalidReceivedMessages prometheus.Counter - numberOfPulls prometheus.Counter - numberOfPushes prometheus.Counter - totalSizeOfPulls prometheus.Counter - totalSizeOfPushes prometheus.Counter - numberOfBroadcastMessagesInQueue prometheus.GaugeFunc - totalSizeOfBroadcastMessagesInQueue prometheus.Gauge - numberOfBroadcastMessagesDropped prometheus.Counter - casAttempts prometheus.Counter - casFailures prometheus.Counter - casSuccesses prometheus.Counter - watchPrefixDroppedNotifications *prometheus.CounterVec - - storeValuesDesc *prometheus.Desc - storeTombstones *prometheus.GaugeVec - storeRemovedTombstones *prometheus.CounterVec - - memberlistMembersCount prometheus.GaugeFunc - memberlistHealthScore prometheus.GaugeFunc - - // make this configurable for tests. Default value is fine for normal usage - // where updates are coming from network, but when running tests with many - // goroutines using same KV, default can be too low. - maxCasRetries int -} - -// Message describes incoming or outgoing message, and local state after applying incoming message, or state when sending message. -// Fields are exported for templating to work. -type message struct { - ID int // Unique local ID of the message. - Time time.Time // Time when message was sent or received. - Size int // Message size - Pair KeyValuePair - - // Following values are computed on the receiving node, based on local state. - Version uint // For sent message, which version the message reflects. For received message, version after applying the message. - Changes []string // List of changes in this message (as computed by *this* node). -} - -type valueDesc struct { - // We store the decoded value here to prevent decoding the entire state for every - // update we receive. Whilst the updates are small and fast to decode, - // the total state can be quite large. - // The CAS function is passed a deep copy because it modifies in-place. - value Mergeable - - // version (local only) is used to keep track of what we're gossiping about, and invalidate old messages - version uint - - // ID of codec used to write this value. Only used when sending full state. - codecID string -} - -func (v valueDesc) Clone() (result valueDesc) { - result = v - if v.value != nil { - result.value = v.value.Clone() - } - return -} - -func (v valueDesc) String() string { - return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID) -} - -var ( - // if merge fails because of CAS version mismatch, this error is returned. CAS operation reacts on it - errVersionMismatch = errors.New("version mismatch") - errNoChangeDetected = errors.New("no change detected") - errTooManyRetries = errors.New("too many retries") -) - -// NewKV creates new gossip-based KV service. Note that service needs to be started, until then it doesn't initialize -// gossiping part. Only after service is in Running state, it is really gossiping. Starting the service will also -// trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned -// and service enters Failed state. -func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KV { - cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer - cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace - - mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - - store: make(map[string]valueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, - } - - mlkv.createAndRegisterMetrics() - - for _, c := range cfg.Codecs { - mlkv.codecs[c.CodecID()] = c - } - - mlkv.Service = services.NewBasicService(mlkv.starting, mlkv.running, mlkv.stopping) - return mlkv -} - -func defaultMemberlistConfig() *memberlist.Config { - return memberlist.DefaultLANConfig() -} - -func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { - tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger) - if err != nil { - return nil, fmt.Errorf("failed to create transport: %v", err) - } - - mlCfg := defaultMemberlistConfig() - mlCfg.Delegate = m - - mlCfg.TCPTimeout = m.cfg.StreamTimeout - mlCfg.RetransmitMult = m.cfg.RetransmitMult - mlCfg.PushPullInterval = m.cfg.PushPullInterval - mlCfg.GossipInterval = m.cfg.GossipInterval - mlCfg.GossipNodes = m.cfg.GossipNodes - mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime - mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime - mlCfg.EnableCompression = m.cfg.EnableCompression - - mlCfg.AdvertiseAddr = m.cfg.AdvertiseAddr - mlCfg.AdvertisePort = m.cfg.AdvertisePort - - if m.cfg.NodeName != "" { - mlCfg.Name = m.cfg.NodeName - } - if m.cfg.RandomizeNodeName { - mlCfg.Name = mlCfg.Name + "-" + generateRandomSuffix(m.logger) - level.Info(m.logger).Log("msg", "Using memberlist cluster node name", "name", mlCfg.Name) - } - - mlCfg.LogOutput = newMemberlistLoggerAdapter(m.logger, false) - mlCfg.Transport = tr - - // Memberlist uses UDPBufferSize to figure out how many messages it can put into single "packet". - // As we don't use UDP for sending packets, we can use higher value here. - mlCfg.UDPBufferSize = 10 * 1024 * 1024 - return mlCfg, nil -} - -func (m *KV) starting(_ context.Context) error { - mlCfg, err := m.buildMemberlistConfig() - if err != nil { - return err - } - - // Wait for memberlist and broadcasts fields creation because - // memberlist may start calling delegate methods if it - // receives traffic. - // See https://godoc.org/github.com/hashicorp/memberlist#Delegate - // - // Note: We cannot check for Starting state, as we want to use delegate during cluster joining process - // that happens in Starting state. - m.initWG.Add(1) - list, err := memberlist.Create(mlCfg) - if err != nil { - return fmt.Errorf("failed to create memberlist: %v", err) - } - - // Finish delegate initialization. - m.memberlist = list - m.broadcasts = &memberlist.TransmitLimitedQueue{ - NumNodes: list.NumMembers, - RetransmitMult: mlCfg.RetransmitMult, - } - m.initWG.Done() - - return nil -} - -var errFailedToJoinCluster = errors.New("failed to join memberlist cluster on startup") - -func (m *KV) running(ctx context.Context) error { - // Join the cluster, if configured. We want this to happen in Running state, because started memberlist - // is good enough for usage from Client (which checks for Running state), even before it connects to the cluster. - if len(m.cfg.JoinMembers) > 0 { - // Lookup SRV records for given addresses to discover members. - members := m.discoverMembers(ctx, m.cfg.JoinMembers) - - err := m.joinMembersOnStartup(ctx, members) - if err != nil { - level.Error(m.logger).Log("msg", "failed to join memberlist cluster", "err", err) - - if m.cfg.AbortIfJoinFails { - return errFailedToJoinCluster - } - } - } - - var tickerChan <-chan time.Time - if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 { - t := time.NewTicker(m.cfg.RejoinInterval) - defer t.Stop() - - tickerChan = t.C - } - - for { - select { - case <-tickerChan: - members := m.discoverMembers(ctx, m.cfg.JoinMembers) - - reached, err := m.memberlist.Join(members) - if err == nil { - level.Info(m.logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached) - } else { - // Don't report error from rejoin, otherwise KV service would be stopped completely. - level.Warn(m.logger).Log("msg", "re-joining memberlist cluster failed", "err", err) - } - - case <-ctx.Done(): - return nil - } - } -} - -// GetCodec returns codec for given ID or nil. -func (m *KV) GetCodec(codecID string) codec.Codec { - return m.codecs[codecID] -} - -// GetListeningPort returns port used for listening for memberlist communication. Useful when BindPort is set to 0. -// This call is only valid after KV service has been started. -func (m *KV) GetListeningPort() int { - return int(m.memberlist.LocalNode().Port) -} - -// JoinMembers joins the cluster with given members. -// See https://godoc.org/github.com/hashicorp/memberlist#Memberlist.Join -// This call is only valid after KV service has been started and is still running. -func (m *KV) JoinMembers(members []string) (int, error) { - if m.State() != services.Running { - return 0, fmt.Errorf("service not Running") - } - return m.memberlist.Join(members) -} - -func (m *KV) joinMembersOnStartup(ctx context.Context, members []string) error { - reached, err := m.memberlist.Join(members) - if err == nil { - level.Info(m.logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached) - return nil - } - - if m.cfg.MaxJoinRetries <= 0 { - return err - } - - level.Debug(m.logger).Log("msg", "attempt to join memberlist cluster failed", "retries", 0, "err", err) - lastErr := err - - cfg := backoff.Config{ - MinBackoff: m.cfg.MinJoinBackoff, - MaxBackoff: m.cfg.MaxJoinBackoff, - MaxRetries: m.cfg.MaxJoinRetries, - } - - backoff := backoff.New(ctx, cfg) - - for backoff.Ongoing() { - backoff.Wait() - - reached, err := m.memberlist.Join(members) - if err != nil { - lastErr = err - level.Debug(m.logger).Log("msg", "attempt to join memberlist cluster failed", "retries", backoff.NumRetries(), "err", err) - continue - } - - level.Info(m.logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached) - return nil - } - - return lastErr -} - -// Provides a dns-based member disovery to join a memberlist cluster w/o knowning members' addresses upfront. -func (m *KV) discoverMembers(ctx context.Context, members []string) []string { - if len(members) == 0 { - return nil - } - - var ms, resolve []string - - for _, member := range members { - if strings.Contains(member, "+") { - resolve = append(resolve, member) - } else { - // No DNS SRV record to lookup, just append member - ms = append(ms, member) - } - } - - err := m.provider.Resolve(ctx, resolve) - if err != nil { - level.Error(m.logger).Log("msg", "failed to resolve members", "addrs", strings.Join(resolve, ",")) - } - - ms = append(ms, m.provider.Addresses()...) - - return ms -} - -// While Stopping, we try to leave memberlist cluster and then shutdown memberlist client. -// We do this in order to send out last messages, typically that ingester has LEFT the ring. -func (m *KV) stopping(_ error) error { - level.Info(m.logger).Log("msg", "leaving memberlist cluster") - - // Wait until broadcast queue is empty, but don't wait for too long. - // Also don't wait if there is just one node left. - // Problem is that broadcast queue is also filled up by state changes received from other nodes, - // so it may never be empty in a busy cluster. However, we generally only care about messages - // generated on this node via CAS, and those are disabled now (via casBroadcastsEnabled), and should be able - // to get out in this timeout. - - waitTimeout := time.Now().Add(10 * time.Second) - for m.broadcasts.NumQueued() > 0 && m.memberlist.NumMembers() > 1 && time.Now().Before(waitTimeout) { - time.Sleep(250 * time.Millisecond) - } - - if cnt := m.broadcasts.NumQueued(); cnt > 0 { - level.Warn(m.logger).Log("msg", "broadcast messages left in queue", "count", cnt, "nodes", m.memberlist.NumMembers()) - } - - err := m.memberlist.Leave(m.cfg.LeaveTimeout) - if err != nil { - level.Error(m.logger).Log("msg", "error when leaving memberlist cluster", "err", err) - } - - close(m.shutdown) - - err = m.memberlist.Shutdown() - if err != nil { - level.Error(m.logger).Log("msg", "error when shutting down memberlist client", "err", err) - } - return nil -} - -// List returns all known keys under a given prefix. -// No communication with other nodes in the cluster is done here. -func (m *KV) List(prefix string) []string { - m.storeMu.Lock() - defer m.storeMu.Unlock() - - var keys []string - for k := range m.store { - if strings.HasPrefix(k, prefix) { - keys = append(keys, k) - } - } - return keys -} - -// Get returns current value associated with given key. -// No communication with other nodes in the cluster is done here. -func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) { - val, _, err := m.get(key, codec) - return val, err -} - -// Returns current value with removed tombstones. -func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) { - m.storeMu.Lock() - v := m.store[key].Clone() - m.storeMu.Unlock() - - if v.value != nil { - // remove ALL tombstones before returning to client. - // No need for clients to see them. - _, _ = v.value.RemoveTombstones(time.Time{}) - } - - return v.value, v.version, nil -} - -// WatchKey watches for value changes for given key. When value changes, 'f' function is called with the -// latest value. Notifications that arrive while 'f' is running are coalesced into one subsequent 'f' call. -// -// Watching ends when 'f' returns false, context is done, or this client is shut down. -func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool) { - // keep one extra notification, to avoid missing notification if we're busy running the function - w := make(chan string, 1) - - // register watcher - m.watchersMu.Lock() - m.watchers[key] = append(m.watchers[key], w) - m.watchersMu.Unlock() - - defer func() { - // unregister watcher on exit - m.watchersMu.Lock() - defer m.watchersMu.Unlock() - - removeWatcherChannel(key, w, m.watchers) - }() - - for { - select { - case <-w: - // value changed - val, _, err := m.get(key, codec) - if err != nil { - level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err) - continue - } - - if !f(val) { - return - } - - case <-m.shutdown: - // stop watching on shutdown - return - - case <-ctx.Done(): - return - } - } -} - -// WatchPrefix watches for any change of values stored under keys with given prefix. When change occurs, -// function 'f' is called with key and current value. -// Each change of the key results in one notification. If there are too many pending notifications ('f' is slow), -// some notifications may be lost. -// -// Watching ends when 'f' returns false, context is done, or this client is shut down. -func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, f func(string, interface{}) bool) { - // we use bigger buffer here, since keys are interesting and we don't want to lose them. - w := make(chan string, 16) - - // register watcher - m.watchersMu.Lock() - m.prefixWatchers[prefix] = append(m.prefixWatchers[prefix], w) - m.watchersMu.Unlock() - - defer func() { - // unregister watcher on exit - m.watchersMu.Lock() - defer m.watchersMu.Unlock() - - removeWatcherChannel(prefix, w, m.prefixWatchers) - }() - - for { - select { - case key := <-w: - val, _, err := m.get(key, codec) - if err != nil { - level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err) - continue - } - - if !f(key, val) { - return - } - - case <-m.shutdown: - // stop watching on shutdown - return - - case <-ctx.Done(): - return - } - } -} - -func removeWatcherChannel(k string, w chan string, watchers map[string][]chan string) { - ws := watchers[k] - for ix, kw := range ws { - if kw == w { - ws = append(ws[:ix], ws[ix+1:]...) - break - } - } - - if len(ws) > 0 { - watchers[k] = ws - } else { - delete(watchers, k) - } -} - -func (m *KV) notifyWatchers(key string) { - m.watchersMu.Lock() - defer m.watchersMu.Unlock() - - for _, kw := range m.watchers[key] { - select { - case kw <- key: - // notification sent. - default: - // cannot send notification to this watcher at the moment - // but since this is a buffered channel, it means that - // there is already a pending notification anyway - } - } - - for p, ws := range m.prefixWatchers { - if strings.HasPrefix(key, p) { - for _, pw := range ws { - select { - case pw <- key: - // notification sent. - default: - c, _ := m.watchPrefixDroppedNotifications.GetMetricWithLabelValues(p) - if c != nil { - c.Inc() - } - - level.Warn(m.logger).Log("msg", "failed to send notification to prefix watcher", "prefix", p) - } - } - } - } -} - -// CAS implements Compare-And-Set/Swap operation. -// -// CAS expects that value returned by 'f' function implements Mergeable interface. If it doesn't, CAS fails immediately. -// -// This method combines Compare-And-Swap with Merge: it calls 'f' function to get a new state, and then merges this -// new state into current state, to find out what the change was. Resulting updated current state is then CAS-ed to -// KV store, and change is broadcast to cluster peers. Merge function is called with CAS flag on, so that it can -// detect removals. If Merge doesn't result in any change (returns nil), then operation fails and is retried again. -// After too many failed retries, this method returns error. -func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error { - var lastError error - -outer: - for retries := m.maxCasRetries; retries > 0; retries-- { - m.casAttempts.Inc() - - if lastError == errNoChangeDetected { - // We only get here, if 'f' reports some change, but Merge function reports no change. This can happen - // with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution. - // By waiting for one second, we hope that Merge will be able to detect change from 'f' function. - - select { - case <-time.After(noChangeDetectedRetrySleep): - // ok - case <-ctx.Done(): - lastError = ctx.Err() - break outer - } - } - - change, newver, retry, err := m.trySingleCas(key, codec, f) - if err != nil { - level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry) - - lastError = err - if !retry { - break - } - continue - } - - if change != nil { - m.casSuccesses.Inc() - m.notifyWatchers(key) - - if m.State() == services.Running { - m.broadcastNewValue(key, change, newver, codec) - } else { - level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key) - } - } - - return nil - } - - if lastError == errVersionMismatch { - // this is more likely error than version mismatch. - lastError = errTooManyRetries - } - - m.casFailures.Inc() - return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError) -} - -// returns change, error (or nil, if CAS succeeded), and whether to retry or not. -// returns errNoChangeDetected if merge failed to detect change in f's output. -func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, error) { - val, ver, err := m.get(key, codec) - if err != nil { - return nil, 0, false, fmt.Errorf("failed to get value: %v", err) - } - - out, retry, err := f(val) - if err != nil { - return nil, 0, retry, fmt.Errorf("fn returned error: %v", err) - } - - if out == nil { - // no change to be done - return nil, 0, false, nil - } - - // Don't even try - r, ok := out.(Mergeable) - if !ok || r == nil { - return nil, 0, retry, fmt.Errorf("invalid type: %T, expected Mergeable", out) - } - - // To support detection of removed items from value, we will only allow CAS operation to - // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. - change, newver, err := m.mergeValueForKey(key, r, ver, codec) - if err == errVersionMismatch { - return nil, 0, retry, err - } - - if err != nil { - return nil, 0, retry, fmt.Errorf("merge failed: %v", err) - } - - if newver == 0 { - // CAS method reacts on this error - return nil, 0, retry, errNoChangeDetected - } - - return change, newver, retry, nil -} - -func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec) { - data, err := codec.Encode(change) - if err != nil { - level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err) - m.numberOfBroadcastMessagesDropped.Inc() - return - } - - kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID()} - pairData, err := kvPair.Marshal() - if err != nil { - level.Error(m.logger).Log("msg", "failed to serialize KV pair", "key", key, "version", version, "err", err) - m.numberOfBroadcastMessagesDropped.Inc() - return - } - - if len(pairData) > 65535 { - // Unfortunately, memberlist will happily let us send bigger messages via gossip, - // but then it will fail to parse them properly, because its own size field is 2-bytes only. - // (github.com/hashicorp/memberlist@v0.1.4/util.go:167, makeCompoundMessage function) - // - // Typically messages are smaller (when dealing with couple of updates only), but can get bigger - // when broadcasting result of push/pull update. - level.Debug(m.logger).Log("msg", "broadcast message too big, not broadcasting", "key", key, "version", version, "len", len(pairData)) - m.numberOfBroadcastMessagesDropped.Inc() - return - } - - m.addSentMessage(message{ - Time: time.Now(), - Size: len(pairData), - Pair: kvPair, - Version: version, - Changes: change.MergeContent(), - }) - - m.queueBroadcast(key, change.MergeContent(), version, pairData) -} - -// NodeMeta is method from Memberlist Delegate interface -func (m *KV) NodeMeta(limit int) []byte { - // we can send local state from here (512 bytes only) - // if state is updated, we need to tell memberlist to distribute it. - return nil -} - -// NotifyMsg is method from Memberlist Delegate interface -// Called when single message is received, i.e. what our broadcastNewValue has sent. -func (m *KV) NotifyMsg(msg []byte) { - m.initWG.Wait() - - m.numberOfReceivedMessages.Inc() - m.totalSizeOfReceivedMessages.Add(float64(len(msg))) - - kvPair := KeyValuePair{} - err := kvPair.Unmarshal(msg) - if err != nil { - level.Warn(m.logger).Log("msg", "failed to unmarshal received KV Pair", "err", err) - m.numberOfInvalidReceivedMessages.Inc() - return - } - - if len(kvPair.Key) == 0 { - level.Warn(m.logger).Log("msg", "received an invalid KV Pair (empty key)") - m.numberOfInvalidReceivedMessages.Inc() - return - } - - codec := m.GetCodec(kvPair.GetCodec()) - if codec == nil { - m.numberOfInvalidReceivedMessages.Inc() - level.Error(m.logger).Log("msg", "failed to decode received value, unknown codec", "codec", kvPair.GetCodec()) - return - } - - // we have a ring update! Let's merge it with our version of the ring for given key - mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) - - changes := []string(nil) - if mod != nil { - changes = mod.MergeContent() - } - - m.addReceivedMessage(message{ - Time: time.Now(), - Size: len(msg), - Pair: kvPair, - Version: version, - Changes: changes, - }) - - if err != nil { - level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) - } else if version > 0 { - m.notifyWatchers(kvPair.Key) - - // Don't resend original message, but only changes. - m.broadcastNewValue(kvPair.Key, mod, version, codec) - } -} - -func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) { - l := len(message) - - b := ringBroadcast{ - key: key, - content: content, - version: version, - msg: message, - finished: func(b ringBroadcast) { - m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) - }, - logger: m.logger, - } - - m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) - m.broadcasts.QueueBroadcast(b) -} - -// GetBroadcasts is method from Memberlist Delegate interface -// It returns all pending broadcasts (within the size limit) -func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { - m.initWG.Wait() - - return m.broadcasts.GetBroadcasts(overhead, limit) -} - -// LocalState is method from Memberlist Delegate interface -// -// This is "pull" part of push/pull sync (either periodic, or when new node joins the cluster). -// Here we dump our entire state -- all keys and their values. There is no limit on message size here, -// as Memberlist uses 'stream' operations for transferring this state. -func (m *KV) LocalState(join bool) []byte { - m.initWG.Wait() - - m.numberOfPulls.Inc() - - m.storeMu.Lock() - defer m.storeMu.Unlock() - - // For each Key/Value pair in our store, we write - // [4-bytes length of marshalled KV pair] [marshalled KV pair] - - buf := bytes.Buffer{} - sent := time.Now() - - kvPair := KeyValuePair{} - for key, val := range m.store { - if val.value == nil { - continue - } - - codec := m.GetCodec(val.codecID) - if codec == nil { - level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key) - continue - } - - encoded, err := codec.Encode(val.value) - if err != nil { - level.Error(m.logger).Log("msg", "failed to encode remote state", "err", err) - continue - } - - kvPair.Reset() - kvPair.Key = key - kvPair.Value = encoded - kvPair.Codec = val.codecID - - ser, err := kvPair.Marshal() - if err != nil { - level.Error(m.logger).Log("msg", "failed to serialize KV Pair", "err", err) - continue - } - - if uint(len(ser)) > math.MaxUint32 { - level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(encoded)) - continue - } - - err = binary.Write(&buf, binary.BigEndian, uint32(len(ser))) - if err != nil { - level.Error(m.logger).Log("msg", "failed to write uint32 to buffer?", "err", err) - continue - } - buf.Write(ser) - - m.addSentMessage(message{ - Time: sent, - Size: len(ser), - Pair: kvPair, // Makes a copy of kvPair. - Version: val.version, - }) - } - - m.totalSizeOfPulls.Add(float64(buf.Len())) - return buf.Bytes() -} - -// MergeRemoteState is method from Memberlist Delegate interface -// -// This is 'push' part of push/pull sync. We merge incoming KV store (all keys and values) with ours. -// -// Data is full state of remote KV store, as generated by LocalState method (run on another node). -func (m *KV) MergeRemoteState(data []byte, join bool) { - received := time.Now() - - m.initWG.Wait() - - m.numberOfPushes.Inc() - m.totalSizeOfPushes.Add(float64(len(data))) - - kvPair := KeyValuePair{} - - var err error - // Data contains individual KV pairs (encoded as protobuf messages), each prefixed with 4 bytes length of KV pair: - // [4-bytes length of marshalled KV pair] [marshalled KV pair] [4-bytes length] [KV pair]... - for len(data) > 0 { - if len(data) < 4 { - err = fmt.Errorf("not enough data left for another KV Pair: %d", len(data)) - break - } - - kvPairLength := binary.BigEndian.Uint32(data) - - data = data[4:] - - if len(data) < int(kvPairLength) { - err = fmt.Errorf("not enough data left for next KV Pair, expected %d, remaining %d bytes", kvPairLength, len(data)) - break - } - - kvPair.Reset() - err = kvPair.Unmarshal(data[:kvPairLength]) - if err != nil { - err = fmt.Errorf("failed to parse KV Pair: %v", err) - break - } - - data = data[kvPairLength:] - - codec := m.GetCodec(kvPair.GetCodec()) - if codec == nil { - level.Error(m.logger).Log("msg", "failed to parse remote state: unknown codec for key", "codec", kvPair.GetCodec(), "key", kvPair.GetKey()) - continue - } - - // we have both key and value, try to merge it with our state - change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) - - changes := []string(nil) - if change != nil { - changes = change.MergeContent() - } - - m.addReceivedMessage(message{ - Time: received, - Size: int(kvPairLength), - Pair: kvPair, // Makes a copy of kvPair. - Version: newver, - Changes: changes, - }) - - if err != nil { - level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) - } else if newver > 0 { - m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec) - } - } - - if err != nil { - level.Error(m.logger).Log("msg", "failed to parse remote state", "err", err) - } -} - -func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec) (Mergeable, uint, error) { - decodedValue, err := codec.Decode(incomingData) - if err != nil { - return nil, 0, fmt.Errorf("failed to decode value: %v", err) - } - - incomingValue, ok := decodedValue.(Mergeable) - if !ok { - return nil, 0, fmt.Errorf("expected Mergeable, got: %T", decodedValue) - } - - return m.mergeValueForKey(key, incomingValue, 0, codec) -} - -// Merges incoming value with value we have in our store. Returns "a change" that can be sent to other -// cluster members to update their state, and new version of the value. -// If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. -// If no modification occurred, new version is 0. -func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion uint, codec codec.Codec) (Mergeable, uint, error) { - m.storeMu.Lock() - defer m.storeMu.Unlock() - - curr := m.store[key].Clone() - // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. - if casVersion > 0 && curr.version != casVersion { - return nil, 0, errVersionMismatch - } - result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0) - if err != nil { - return nil, 0, err - } - - // No change, don't store it. - if change == nil || len(change.MergeContent()) == 0 { - return nil, 0, nil - } - - if m.cfg.LeftIngestersTimeout > 0 { - limit := time.Now().Add(-m.cfg.LeftIngestersTimeout) - total, removed := result.RemoveTombstones(limit) - m.storeTombstones.WithLabelValues(key).Set(float64(total)) - m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed)) - - // Remove tombstones from change too. If change turns out to be empty after this, - // we don't need to change local value either! - // - // Note that "result" and "change" may actually be the same Mergeable. That is why we - // call RemoveTombstones on "result" first, so that we get the correct metrics. Calling - // RemoveTombstones twice with same limit should be noop. - change.RemoveTombstones(limit) - if len(change.MergeContent()) == 0 { - return nil, 0, nil - } - } - - newVersion := curr.version + 1 - m.store[key] = valueDesc{ - value: result, - version: newVersion, - codecID: codec.CodecID(), - } - - return change, newVersion, nil -} - -// returns [result, change, error] -func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, Mergeable, error) { - if oldVal == nil { - return incoming, incoming, nil - } - - // otherwise we have two mergeables, so merge them - change, err := oldVal.Merge(incoming, cas) - return oldVal, change, err -} - -func (m *KV) storeCopy() map[string]valueDesc { - m.storeMu.Lock() - defer m.storeMu.Unlock() - - result := make(map[string]valueDesc, len(m.store)) - for k, v := range m.store { - result[k] = v.Clone() - } - return result -} -func (m *KV) addReceivedMessage(msg message) { - if m.cfg.MessageHistoryBufferBytes == 0 { - return - } - - m.messagesMu.Lock() - defer m.messagesMu.Unlock() - - m.messageCounter++ - msg.ID = m.messageCounter - - m.receivedMessages, m.receivedMessagesSize = addMessageToBuffer(m.receivedMessages, m.receivedMessagesSize, m.cfg.MessageHistoryBufferBytes, msg) -} - -func (m *KV) addSentMessage(msg message) { - if m.cfg.MessageHistoryBufferBytes == 0 { - return - } - - m.messagesMu.Lock() - defer m.messagesMu.Unlock() - - m.messageCounter++ - msg.ID = m.messageCounter - - m.sentMessages, m.sentMessagesSize = addMessageToBuffer(m.sentMessages, m.sentMessagesSize, m.cfg.MessageHistoryBufferBytes, msg) -} - -func (m *KV) getSentAndReceivedMessages() (sent, received []message) { - m.messagesMu.Lock() - defer m.messagesMu.Unlock() - - // Make copy of both slices. - return append([]message(nil), m.sentMessages...), append([]message(nil), m.receivedMessages...) -} - -func (m *KV) deleteSentReceivedMessages() { - m.messagesMu.Lock() - defer m.messagesMu.Unlock() - - m.sentMessages = nil - m.sentMessagesSize = 0 - m.receivedMessages = nil - m.receivedMessagesSize = 0 -} - -func addMessageToBuffer(msgs []message, size int, limit int, msg message) ([]message, int) { - msgs = append(msgs, msg) - size += msg.Size - - for len(msgs) > 0 && size > limit { - size -= msgs[0].Size - msgs = msgs[1:] - } - - return msgs, size -} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_logger.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_logger.go deleted file mode 100644 index 6ccb469b6..000000000 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_logger.go +++ /dev/null @@ -1,106 +0,0 @@ -package memberlist - -import ( - "io" - "regexp" - "strings" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" -) - -// loggerAdapter wraps a Logger and allows it to be passed to the stdlib -// logger's SetOutput. It understand and parses output produced by memberlist -// library (esp. level). Timestamp from memberlist can be ignored (eg. pkg/util/log.Logger -// is set up to auto-include timestamp with every message already) -type loggerAdapter struct { - log.Logger - logTimestamp bool -} - -// newMemberlistLoggerAdapter returns a new loggerAdapter, that can be passed -// memberlist.Config.LogOutput field. -func newMemberlistLoggerAdapter(logger log.Logger, logTimestamp bool) io.Writer { - a := loggerAdapter{ - Logger: logger, - logTimestamp: logTimestamp, - } - return a -} - -func (a loggerAdapter) Write(p []byte) (int, error) { - result := subexps(p) - keyvals := []interface{}{} - var timestamp string - if date, ok := result["date"]; ok && date != "" { - timestamp = date - } - if time, ok := result["time"]; ok && time != "" { - if timestamp != "" { - timestamp += " " - } - timestamp += time - } - if a.logTimestamp && timestamp != "" { - keyvals = append(keyvals, "ts", timestamp) - } - if file, ok := result["file"]; ok && file != "" { - keyvals = append(keyvals, "file", file) - } - if lvl, ok := result["level"]; ok { - lvl = strings.ToLower(lvl) - var lvlVal level.Value - - switch lvl { - case "debug": - lvlVal = level.DebugValue() - case "warn": - lvlVal = level.WarnValue() - case "info": - lvlVal = level.InfoValue() - case "err", "error": - lvlVal = level.ErrorValue() - } - - if lvlVal != nil { - keyvals = append(keyvals, "level", lvlVal) - } else { - keyvals = append(keyvals, "level", lvl) - } - } - if msg, ok := result["msg"]; ok { - keyvals = append(keyvals, "msg", msg) - } - if err := a.Logger.Log(keyvals...); err != nil { - return 0, err - } - return len(p), nil -} - -// 2019/10/01 12:05:06 [DEBUG] memberlist: Failed to join 127.0.0.1: dial tcp 127.0.0.1:8012: connect: connection refused -// 2019/10/01 12:07:34 /Users/test/go/pkg/mod/github.com/hashicorp/memberlist@v0.1.4/memberlist.go:245: [DEBUG] memberlist: Failed to join ::1: dial tcp [::1]:8012: connect: connection refused -const ( - logRegexpDate = `(?P[0-9]{4}/[0-9]{2}/[0-9]{2})?[ ]?` - logRegexpTime = `(?P