From 9ebc0a11ebaf7aef60c7c1a5c642beb0acad1512 Mon Sep 17 00:00:00 2001 From: Mark Phelps <209477+markphelps@users.noreply.github.com> Date: Tue, 28 Jan 2025 14:27:07 -0500 Subject: [PATCH] feat: V2 authn config + redis (#3852) * feat: auth refactor and redis store impl Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: remove auth cleanup config for now Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: rm cleanup Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: refactor Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: support ordering Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * Revert "chore: support ordering" This reverts commit db5f0d4ce1e7245b268137ff3057ed4a14aed46f. * chore: add back some listing test Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fix tests Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: make generator funcs testable Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: rename redis store test Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: use existing constants and helpers for keys Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: support graceperiod for token expiry and add cleanup for memory store Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: add back cleanup graceperiod to schemas Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fix test compilation Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: mod tidy Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> --------- Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> --- cmd/flipt/evaluate.go | 222 -------- cmd/flipt/main.go | 1 - config/flipt.schema.cue | 17 +- config/flipt.schema.json | 49 +- go.mod | 12 +- go.sum | 20 +- go.work.sum | 16 +- internal/cleanup/cleanup.go | 123 ----- internal/cleanup/cleanup_test.go | 106 ---- internal/cmd/authn.go | 77 ++- internal/cmd/grpc.go | 1 - internal/config/authentication.go | 162 +++--- internal/config/config.go | 2 +- internal/config/config_test.go | 95 ++-- internal/config/errors.go | 4 +- .../server/authn/method/github/server_test.go | 4 +- internal/server/authn/method/http.go | 4 +- .../method/kubernetes/server_internal_test.go | 2 +- .../authn/method/kubernetes/server_test.go | 2 +- .../authn/method/kubernetes/testing/grpc.go | 2 +- .../server/authn/method/oidc/server_test.go | 12 +- .../server/authn/method/oidc/testing/grpc.go | 2 +- .../server/authn/method/token/server_test.go | 2 +- .../authn/middleware/grpc/middleware_test.go | 13 +- .../authn/middleware/http/middleware.go | 4 +- .../authn/middleware/http/middleware_test.go | 4 +- internal/server/authn/public/server_test.go | 4 +- internal/server/authn/server_test.go | 4 +- internal/storage/authn/auth.go | 1 + internal/storage/authn/bootstrap.go | 89 ---- internal/storage/authn/bootstrap_test.go | 19 - internal/storage/authn/memory/store.go | 91 +++- internal/storage/authn/memory/store_test.go | 3 +- internal/storage/authn/redis/client.go | 62 +++ internal/storage/authn/redis/client_test.go | 103 ++++ internal/storage/authn/redis/store.go | 472 ++++++++++++++++++ internal/storage/authn/redis/store_test.go | 93 ++++ internal/storage/authn/testing/testing.go | 66 ++- internal/storage/oplock/memory/memory.go | 52 -- internal/storage/oplock/memory/memory_test.go | 11 - internal/storage/oplock/oplock.go | 30 -- internal/storage/oplock/testing/testing.go | 111 ---- internal/telemetry/telemetry_test.go | 4 +- 43 files changed, 1090 insertions(+), 1083 deletions(-) delete mode 100644 cmd/flipt/evaluate.go delete mode 100644 internal/cleanup/cleanup.go delete mode 100644 internal/cleanup/cleanup_test.go delete mode 100644 internal/storage/authn/bootstrap.go delete mode 100644 internal/storage/authn/bootstrap_test.go create mode 100644 internal/storage/authn/redis/client.go create mode 100644 internal/storage/authn/redis/client_test.go create mode 100644 internal/storage/authn/redis/store.go create mode 100644 internal/storage/authn/redis/store_test.go delete mode 100644 internal/storage/oplock/memory/memory.go delete mode 100644 internal/storage/oplock/memory/memory_test.go delete mode 100644 internal/storage/oplock/oplock.go delete mode 100644 internal/storage/oplock/testing/testing.go diff --git a/cmd/flipt/evaluate.go b/cmd/flipt/evaluate.go deleted file mode 100644 index 666dcd89b4..0000000000 --- a/cmd/flipt/evaluate.go +++ /dev/null @@ -1,222 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/google/uuid" - "github.com/spf13/cobra" - "go.flipt.io/flipt/rpc/flipt" - "go.flipt.io/flipt/rpc/flipt/evaluation" - sdk "go.flipt.io/flipt/sdk/go" -) - -type evaluateCommand struct { - address string - requestID string - entityID string - namespace string - watch bool - token string - interval time.Duration - contextValues []string -} - -func newEvaluateCommand() *cobra.Command { - c := &evaluateCommand{} - - cmd := &cobra.Command{ - Use: "evaluate [flagKey]", - Short: "Evaluate a flag", - Args: cobra.ExactArgs(1), - RunE: c.run, - } - - cmd.Flags().StringVarP( - &c.namespace, - "namespace", "n", - "default", - "flag namespace.", - ) - cmd.Flags().StringVarP( - &c.entityID, - "entity-id", "e", - uuid.NewString(), - "evaluation request entity id.", - ) - cmd.Flags().StringVarP( - &c.requestID, - "request-id", "r", - "", - "evaluation request id.", - ) - - cmd.Flags().StringArrayVarP( - &c.contextValues, - "context", "c", - []string{}, - "evaluation request context as key=value.", - ) - - cmd.Flags().StringVarP( - &c.address, - "address", "a", - "http://localhost:8080", - "address of Flipt instance.", - ) - - cmd.Flags().StringVarP( - &c.token, - "token", "t", - "", - "client token used to authenticate access to Flipt instance.", - ) - - cmd.Flags().BoolVarP( - &c.watch, - "watch", "w", - false, - "enable watch mode.", - ) - - cmd.Flags().DurationVarP( - &c.interval, - "interval", "i", - time.Second, - "interval between requests in watch mode.", - ) - - return cmd -} - -func (c *evaluateCommand) run(cmd *cobra.Command, args []string) error { - sdk, err := fliptSDK(c.address, c.token) - if err != nil { - return err - } - - ctx, cancel := context.WithCancel(cmd.Context()) - defer cancel() - - flagKey := strings.TrimSpace(args[0]) - flag, err := sdk.Flipt().GetFlag(ctx, &flipt.GetFlagRequest{NamespaceKey: c.namespace, Key: flagKey}) - if err != nil { - return err - } - - values := make(map[string]string, len(c.contextValues)) - for _, v := range c.contextValues { - tokens := strings.SplitN(v, "=", 2) - if len(tokens) != 2 { - return fmt.Errorf("invalid context pair: %v", v) - } - values[strings.TrimSpace(tokens[0])] = tokens[1] - } - - request := &evaluation.EvaluationRequest{ - FlagKey: flagKey, - NamespaceKey: c.namespace, - EntityId: c.entityID, - RequestId: c.requestID, - Context: values, - } - - if !c.watch { - return c.evaluate(ctx, flag.Type, sdk, request) - } - - ticker := time.NewTicker(c.interval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - err := c.evaluate(ctx, flag.Type, sdk, request) - if err != nil { - fmt.Printf("failed to evaluate: %s", err) - } - } - } -} - -type booleanEvaluateResponse struct { - FlagKey string `json:"flag_key,omitempty"` - Enabled bool `json:"enabled"` - Reason string `json:"reason,omitempty"` - RequestID string `json:"request_id,omitempty"` - RequestDurationMillis float64 `json:"request_duration_millis,omitempty"` - Timestamp time.Time `json:"timestamp,omitempty"` -} - -type variantEvaluationResponse struct { - FlagKey string `json:"flag_key,omitempty"` - Match bool `json:"match"` - Reason string `json:"reason,omitempty"` - VariantKey string `json:"variant_key,omitempty"` - VariantAttachment string `json:"variant_attachment,omitempty"` - SegmentKeys []string `json:"segment_keys,omitempty"` - RequestID string `json:"request_id,omitempty"` - RequestDurationMillis float64 `json:"request_duration_millis,omitempty"` - Timestamp time.Time `json:"timestamp,omitempty"` -} - -func (c *evaluateCommand) evaluate(ctx context.Context, flagType flipt.FlagType, sdk *sdk.SDK, req *evaluation.EvaluationRequest) error { - client := sdk.Evaluation() - switch flagType { - case flipt.FlagType_BOOLEAN_FLAG_TYPE: - response, err := client.Boolean(ctx, req) - if err != nil { - return err - } - - boolResponse := &booleanEvaluateResponse{ - FlagKey: response.FlagKey, - Enabled: response.Enabled, - Reason: response.Reason.String(), - RequestID: response.RequestId, - RequestDurationMillis: response.RequestDurationMillis, - Timestamp: response.Timestamp.AsTime(), - } - - out, err := json.Marshal(boolResponse) - if err != nil { - return err - } - - fmt.Println(string(out)) - - return nil - case flipt.FlagType_VARIANT_FLAG_TYPE: - response, err := client.Variant(ctx, req) - if err != nil { - return err - } - - variantResponse := &variantEvaluationResponse{ - FlagKey: response.FlagKey, - Match: response.Match, - Reason: response.Reason.String(), - VariantKey: response.VariantKey, - VariantAttachment: response.VariantAttachment, - SegmentKeys: response.SegmentKeys, - RequestID: response.RequestId, - RequestDurationMillis: response.RequestDurationMillis, - Timestamp: response.Timestamp.AsTime(), - } - - out, err := json.Marshal(variantResponse) - if err != nil { - return err - } - - fmt.Println(string(out)) - - return nil - default: - return fmt.Errorf("unsupported flag type: %v", flagType) - } -} diff --git a/cmd/flipt/main.go b/cmd/flipt/main.go index 663021d647..4fa86b2795 100644 --- a/cmd/flipt/main.go +++ b/cmd/flipt/main.go @@ -145,7 +145,6 @@ func exec() error { rootCmd.AddCommand(newConfigCommand()) rootCmd.AddCommand(newCompletionCommand()) rootCmd.AddCommand(newDocCommand()) - rootCmd.AddCommand(newEvaluateCommand()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/config/flipt.schema.cue b/config/flipt.schema.cue index cee3a721c4..57f5d4ad64 100644 --- a/config/flipt.schema.cue +++ b/config/flipt.schema.cue @@ -46,11 +46,12 @@ import "list" storage?: *{ type: "memory" - cleanup?: #authentication.#storage_cleanup + cleanup?: { + grace_period?: =~#duration | int | *"30m" + } } | { type: "redis" - cleanup?: #authentication.#storage_cleanup - connection: { + redis?: { host?: string | *"localhost" port?: int | *6379 require_tls?: bool | *false @@ -65,15 +66,13 @@ import "list" ca_cert_bytes?: string insecure_skip_tls?: bool | *false } - } - - #storage_cleanup: { - @jsonschema(id="storage_cleanup") - interval?: =~#duration | int | *"1h" - grace_period?: =~#duration | int | *"30m" + cleanup?: { + grace_period?: =~#duration | int | *"30m" + } } } + methods?: { token?: { enabled?: bool | *false diff --git a/config/flipt.schema.json b/config/flipt.schema.json index 1aff2e3b28..e809bb6a44 100644 --- a/config/flipt.schema.json +++ b/config/flipt.schema.json @@ -151,7 +151,14 @@ "enum": ["memory"] }, "cleanup": { - "$ref": "#/definitions/storage_cleanup" + "type": "object", + "properties": { + "grace_period": { + "type": "string", + "default": "30m", + "pattern": "^([0-9]+(ns|us|µs|ms|s|m|h))+$" + } + } } }, "additionalProperties": false @@ -164,9 +171,16 @@ "enum": ["redis"] }, "cleanup": { - "$ref": "#/definitions/storage_cleanup" + "type": "object", + "properties": { + "grace_period": { + "type": "string", + "default": "30m", + "pattern": "^([0-9]+(ns|us|µs|ms|s|m|h))+$" + } + } }, - "connection": { + "redis": { "type": "object", "properties": { "host": { @@ -996,35 +1010,6 @@ } }, "definitions": { - "storage_cleanup": { - "type": "object", - "properties": { - "interval": { - "oneOf": [ - { - "type": "string", - "pattern": "^([0-9]+(ns|us|µs|ms|s|m|h))+$" - }, - { - "type": "integer" - } - ], - "default": "1h" - }, - "grace_period": { - "oneOf": [ - { - "type": "string", - "pattern": "^([0-9]+(ns|us|µs|ms|s|m|h))+$" - }, - { - "type": "integer" - } - ], - "default": "30m" - } - } - }, "authentication_oidc_provider": { "type": "object", "properties": { diff --git a/go.mod b/go.mod index 23a92a6f24..dcfdf214b1 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/open-policy-agent/opa v0.70.0 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/common v0.62.0 + github.com/redis/go-redis/v9 v9.7.0 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.10.0 @@ -71,8 +72,7 @@ require ( go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/zap v1.27.0 gocloud.dev v0.40.0 - golang.org/x/crypto v0.32.0 - golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c + golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 golang.org/x/net v0.34.0 golang.org/x/oauth2 v0.25.0 golang.org/x/sync v0.10.0 @@ -143,9 +143,11 @@ require ( github.com/containerd/log v0.1.0 // indirect github.com/containerd/platforms v0.2.1 // indirect github.com/cpuguy83/dockercfg v0.3.2 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect + github.com/creack/pty v1.1.20 // indirect github.com/cyphar/filepath-securejoin v0.3.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/docker v27.3.1+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect @@ -193,7 +195,7 @@ require ( github.com/kylelemons/godebug v1.1.0 // indirect github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae // indirect github.com/magiconair/properties v1.8.7 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/moby/docker-image-spec v1.3.1 // indirect @@ -223,7 +225,6 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - github.com/redis/go-redis/v9 v9.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -260,6 +261,7 @@ require ( go.opentelemetry.io/proto/otlp v1.4.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/crypto v0.32.0 // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/term v0.28.0 // indirect diff --git a/go.sum b/go.sum index 3415f1c814..7c08e8f119 100644 --- a/go.sum +++ b/go.sum @@ -749,6 +749,10 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/bytecodealliance/wasmtime-go/v3 v3.0.2 h1:3uZCA/BLTIu+DqCfguByNMJa2HVHpXvjfy0Dy7g6fuA= github.com/bytecodealliance/wasmtime-go/v3 v3.0.2/go.mod h1:RnUjnIXxEJcL6BgCvNyzCCRzZcxCgsZCi+RNlvYor5Q= @@ -805,12 +809,12 @@ github.com/coreos/go-oidc/v3 v3.12.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDh github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= -github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.6 h1:XJtiaUW6dEEqVuZiMTn1ldk455QWwEIsMIJlo5vtkx0= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= -github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= -github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= +github.com/creack/pty v1.1.20 h1:VIPb/a2s17qNeQgDnkfZC35RScx+blkKF8GV68n80J4= +github.com/creack/pty v1.1.20/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/cyphar/filepath-securejoin v0.3.6 h1:4d9N5ykBnSp5Xn2JkhocYDkOpURL/18CYMpo6xB9uWM= github.com/cyphar/filepath-securejoin v0.3.6/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGLDGQL7h7bg04C/+u9jI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -1181,8 +1185,8 @@ github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3v github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= @@ -1497,8 +1501,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= -golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= -golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= +golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA= +golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/go.work.sum b/go.work.sum index eb3a21c5d6..a26a18b382 100644 --- a/go.work.sum +++ b/go.work.sum @@ -723,8 +723,6 @@ github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnweb github.com/bradleyjkemp/cupaloy/v2 v2.6.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0= github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= github.com/bshuster-repo/logrus-logstash-hook v1.0.0/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= @@ -1231,7 +1229,6 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= -github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= @@ -1295,7 +1292,9 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22 github.com/jonboulle/clockwork v0.2.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/josephspurrier/goversioninfo v1.4.0/go.mod h1:JWzv5rKQr+MmW+LvM412ToT/IkYDZjaclF2pKDss8IY= +github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/txtarfs v0.0.0-20210218200122-0702f000015a/go.mod h1:izVPOvVRsHiKkeGCT6tYBNWyDVuzj9wAaBb5R9qamfw= +github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -1364,7 +1363,6 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= @@ -1377,6 +1375,8 @@ github.com/mattn/goveralls v0.0.12/go.mod h1:44ImGEUfmqH8bBtaMrYKsM65LXfNLWmwaxF github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY= +github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= +github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg= github.com/microsoft/go-mssqldb v1.0.0/go.mod h1:+4wZTUnz/SV6nffv+RRRB/ss8jPng5Sho2SmM1l2ts4= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -1455,6 +1455,7 @@ github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1: github.com/networkplumbing/go-nft v0.2.0/go.mod h1:HnnM+tYvlGAsMU7yoYwXEVLLiDW9gdMmb5HoGcwpuQs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nishanths/predeclared v0.0.0-20190419143655-18a43bb90ffc/go.mod h1:62PewwiQTlm/7Rj+cxVYqZvDIUc+JjZq6GHAC1fsObQ= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -1468,7 +1469,7 @@ github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+ github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= -github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= @@ -1597,8 +1598,6 @@ github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqn github.com/remyoudompheng/go-dbus v0.0.0-20121104212943-b7232d34b1d5/go.mod h1:+u151txRmLpwxBmpYn9z3d1sdJdjRPQpsXuYeY9jNls= github.com/remyoudompheng/go-liblzma v0.0.0-20190506200333-81bf2d431b96/go.mod h1:90HvCY7+oHHUKkbeMCiHt1WuFR2/hPJ9QrljDG+v6ls= github.com/remyoudompheng/go-misc v0.0.0-20190427085024-2d6ac652a50e/go.mod h1:80FQABjoFzZ2M5uEa6FUaJYEmqU2UOKojlFVak1UAwI= -github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= @@ -1608,7 +1607,6 @@ github.com/rqlite/gorqlite v0.0.0-20230708021416-2acd02b70b79/go.mod h1:xF/KoXmr github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday v1.6.0 h1:KqfZb0pUVN2lYqZUYRddxF4OR8ZMURnJIG5Y3VRLtww= -github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= @@ -2046,7 +2044,6 @@ golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210903071746-97244b99971b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220906165534-d0df966e6959/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -2307,6 +2304,7 @@ gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.6/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cleanup/cleanup.go b/internal/cleanup/cleanup.go deleted file mode 100644 index 9aa76fd0bf..0000000000 --- a/internal/cleanup/cleanup.go +++ /dev/null @@ -1,123 +0,0 @@ -package cleanup - -import ( - "context" - "fmt" - "time" - - "go.flipt.io/flipt/internal/config" - authstorage "go.flipt.io/flipt/internal/storage/authn" - "go.flipt.io/flipt/internal/storage/oplock" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -const minCleanupInterval = 5 * time.Minute - -// AuthenticationService is configured to run background goroutines which -// will clear out expired authentication tokens. -type AuthenticationService struct { - logger *zap.Logger - lock oplock.Service - store authstorage.Store - config config.AuthenticationConfig - - errgroup errgroup.Group - cancel func() -} - -// NewAuthenticationService constructs and configures a new instance of authentication service. -func NewAuthenticationService(logger *zap.Logger, lock oplock.Service, store authstorage.Store, config config.AuthenticationConfig) *AuthenticationService { - return &AuthenticationService{ - logger: logger.With(zap.String("service", "authentication cleanup service")), - lock: lock, - store: store, - config: config, - cancel: func() {}, - } -} - -// Run starts up a background goroutine per configure authentication method schedule. -func (s *AuthenticationService) Run(ctx context.Context) { - ctx, s.cancel = context.WithCancel(ctx) - - for _, info := range s.config.Methods.AllMethods(ctx) { - logger := s.logger.With(zap.Stringer("method", info.Method)) - if info.Cleanup == nil { - if info.Enabled { - logger.Debug("cleanup for auth method not defined (skipping)") - } - - continue - } - - if !info.Enabled { - logger.Debug("cleanup for auth method not required (skipping)") - continue - } - - var ( - method = info.Method - schedule = info.Cleanup - operation = oplock.Operation(fmt.Sprintf("cleanup_auth_%s", method)) - ) - - s.errgroup.Go(func() error { - // on the first attempt to run the cleanup authentication service - // we attempt to obtain the lock immediately. If the lock is already - // held the service should return false and return the current acquired - // current timestamp - acquiredUntil := time.Now().UTC() - for { - select { - case <-ctx.Done(): - return nil - case <-time.After(time.Until(acquiredUntil)): - } - - acquired, entry, err := s.lock.TryAcquire(ctx, operation, schedule.Interval) - if err != nil { - // ensure we dont go into hot loop when the operation lock service - // enters an error state by ensuring we sleep for at-least the minimum - // interval. - now := time.Now().UTC() - if acquiredUntil.Before(now) { - acquiredUntil = now.Add(minCleanupInterval) - } - - logger.Warn("attempting to acquire lock", zap.Error(err)) - continue - } - - // update the next sleep target to current entries acquired until - acquiredUntil = entry.AcquiredUntil - - if !acquired { - logger.Debug("cleanup process not acquired", zap.Time("next_attempt", entry.AcquiredUntil)) - continue - } - - expiredBefore := time.Now().UTC().Add(-schedule.GracePeriod) - logger.Info("cleanup process deleting authentications", zap.Time("expired_before", expiredBefore)) - if err := s.store.DeleteAuthentications(ctx, authstorage.Delete( - authstorage.WithMethod(method), - authstorage.WithExpiredBefore(expiredBefore), - )); err != nil { - logger.Error("attempting to delete expired authentications", zap.Error(err)) - } - } - }) - } -} - -// Stop signals for the cleanup goroutines to cancel and waits for them to finish. -func (s *AuthenticationService) Shutdown(ctx context.Context) error { - s.logger.Debug("shutting down...") - defer func() { - s.logger.Debug("shutdown complete") - }() - - s.cancel() - - return s.errgroup.Wait() -} diff --git a/internal/cleanup/cleanup_test.go b/internal/cleanup/cleanup_test.go deleted file mode 100644 index b07877ec82..0000000000 --- a/internal/cleanup/cleanup_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package cleanup - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.flipt.io/flipt/internal/config" - authstorage "go.flipt.io/flipt/internal/storage/authn" - inmemauth "go.flipt.io/flipt/internal/storage/authn/memory" - inmemoplock "go.flipt.io/flipt/internal/storage/oplock/memory" - "go.flipt.io/flipt/rpc/flipt/auth" - "go.uber.org/zap/zaptest" - "google.golang.org/protobuf/types/known/timestamppb" -) - -func TestCleanup(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - var ( - ctx = context.Background() - logger = zaptest.NewLogger(t) - authstore = inmemauth.NewStore() - lock = inmemoplock.New() - authConfig = config.AuthenticationConfig{ - Methods: config.AuthenticationMethods{}, - } - ) - - // enable all methods and set their cleanup configuration - for _, info := range authConfig.Methods.AllMethods(ctx) { - info.Enable(t) - info.SetCleanup(t, config.AuthenticationCleanupSchedule{ - Interval: time.Second, - GracePeriod: 5 * time.Second, - }) - } - - // create an initial non-expiring token - clientToken, storedAuth, err := authstore.CreateAuthentication( - ctx, - &authstorage.CreateAuthenticationRequest{Method: auth.Method_METHOD_TOKEN}, - ) - require.NoError(t, err) - - for i := 0; i < 5; i++ { - // run five instances of service - // it should be a safe operation given they share the same lock service - service := NewAuthenticationService(logger, lock, authstore, authConfig) - service.Run(ctx) - t.Cleanup(func() { - require.NoError(t, service.Shutdown(context.TODO())) - }) - } - - t.Run("ensure non-expiring token exists", func(t *testing.T) { - retrievedAuth, err := authstore.GetAuthenticationByClientToken(ctx, clientToken) - require.NoError(t, err) - assert.Equal(t, storedAuth, retrievedAuth) - }) - - for _, info := range authConfig.Methods.AllMethods(ctx) { - info := info - - t.Run(fmt.Sprintf("Authentication Method %q", info.Method), func(t *testing.T) { - t.Parallel() - - t.Log("create an expiring token and ensure it exists") - clientToken, storedAuth, err := authstore.CreateAuthentication( - ctx, - &authstorage.CreateAuthenticationRequest{ - Method: info.Method, - ExpiresAt: timestamppb.New(time.Now().UTC().Add(5 * time.Second)), - }, - ) - require.NoError(t, err) - - retrievedAuth, err := authstore.GetAuthenticationByClientToken(ctx, clientToken) - require.NoError(t, err) - assert.Equal(t, storedAuth, retrievedAuth) - - t.Log("ensure grace period protects token from being deleted") - // token should still exist as it wont be deleted until - // expiry + grace period (5s + 5s == 10s) - time.Sleep(5 * time.Second) - - retrievedAuth, err = authstore.GetAuthenticationByClientToken(ctx, clientToken) - require.NoError(t, err) - assert.Equal(t, storedAuth, retrievedAuth) - - // ensure authentication is expired but still persisted - assert.True(t, retrievedAuth.ExpiresAt.AsTime().Before(time.Now().UTC())) - - t.Log("once expiry and grace period ellapses ensure token is deleted") - time.Sleep(10 * time.Second) - - _, err = authstore.GetAuthenticationByClientToken(ctx, clientToken) - require.Error(t, err, "token should not be fetchable") - }) - } -} diff --git a/internal/cmd/authn.go b/internal/cmd/authn.go index f126b22ac8..fc1a23dc74 100644 --- a/internal/cmd/authn.go +++ b/internal/cmd/authn.go @@ -7,7 +7,6 @@ import ( "net/http" "os" "regexp" - "strings" "github.com/go-chi/chi/v5" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector" @@ -27,6 +26,7 @@ import ( "go.flipt.io/flipt/internal/server/authn/public" storageauth "go.flipt.io/flipt/internal/storage/authn" storageauthmemory "go.flipt.io/flipt/internal/storage/authn/memory" + storageauthredis "go.flipt.io/flipt/internal/storage/authn/redis" rpcauth "go.flipt.io/flipt/rpc/flipt/auth" "go.uber.org/zap" "google.golang.org/grpc" @@ -36,46 +36,53 @@ func getAuthStore( ctx context.Context, logger *zap.Logger, cfg *config.Config, - forceMigrate bool, -) (storageauth.Store, func(context.Context) error, error) { +) (storageauth.Store, error) { + var ( - store storageauth.Store = storageauthmemory.NewStore() - shutdown = func(context.Context) error { return nil } + cleanupGracePeriod = cfg.Authentication.Session.Storage.Cleanup.GracePeriod + store storageauth.Store = storageauthmemory.NewStore(logger, storageauthmemory.WithCleanupGracePeriod(cleanupGracePeriod)) ) - return store, shutdown, nil + if cfg.Authentication.Session.Storage.Type == config.AuthenticationSessionStorageTypeRedis { + rdb, err := storageauthredis.NewClient(cfg.Authentication.Session.Storage.Redis) + if err != nil { + return nil, fmt.Errorf("failed to create redis client: %w", err) + } + + store = storageauthredis.NewStore(rdb, logger, storageauthredis.WithCleanupGracePeriod(cleanupGracePeriod)) + } + + return store, nil } func authenticationGRPC( ctx context.Context, logger *zap.Logger, cfg *config.Config, - forceMigrate bool, authOpts ...containers.Option[authmiddlewaregrpc.InterceptorOptions], ) (grpcRegisterers, []grpc.UnaryServerInterceptor, func(context.Context) error, error) { - shutdown := func(ctx context.Context) error { - return nil - } - - authCfg := cfg.Authentication + var ( + shutdown = func(ctx context.Context) error { + return nil + } + authCfg = cfg.Authentication + ) - // NOTE: we skip attempting to connect to any database in the situation that either the git, local, or object - // FS backends are configured. - // All that is required to establish a connection for authentication is to either make auth required - // or configure at-least one authentication method (e.g. enable token method). if !authCfg.Enabled() { return grpcRegisterers{ public.NewServer(logger, authCfg), - authn.NewServer(logger, storageauthmemory.NewStore()), + authn.NewServer(logger, storageauthmemory.NewStore(logger)), }, nil, shutdown, nil } - store, shutdown, err := getAuthStore(ctx, logger, cfg, forceMigrate) + store, err := getAuthStore(ctx, logger, cfg) if err != nil { return nil, nil, nil, err } + shutdown = store.Shutdown + var ( authServer = authn.NewServer(logger, store) publicServer = public.NewServer(logger, authCfg) @@ -89,33 +96,6 @@ func authenticationGRPC( // register auth method token service if authCfg.Methods.Token.Enabled { - opts := []storageauth.BootstrapOption{} - - // if a bootstrap token is provided, use it - if authCfg.Methods.Token.Method.Bootstrap.Token != "" { - opts = append(opts, storageauth.WithToken(authCfg.Methods.Token.Method.Bootstrap.Token)) - } - - // if a bootstrap expiration is provided, use it - if authCfg.Methods.Token.Method.Bootstrap.Expiration != 0 { - opts = append(opts, storageauth.WithExpiration(authCfg.Methods.Token.Method.Bootstrap.Expiration)) - } - - // add any additional metadata if defined - for k, v := range authCfg.Methods.Token.Method.Bootstrap.Metadata { - opts = append(opts, storageauth.WithMetadataAttribute(strings.ToLower(k), v)) - } - - // attempt to bootstrap authentication store - clientToken, err := storageauth.Bootstrap(ctx, store, opts...) - if err != nil { - return nil, nil, nil, fmt.Errorf("configuring token authentication: %w", err) - } - - if clientToken != "" { - logger.Info("access token created", zap.String("client_token", clientToken)) - } - register.Add(authtoken.NewServer(logger, store)) logger.Debug("authentication method \"token\" server registered") @@ -149,9 +129,10 @@ func authenticationGRPC( // only enable enforcement middleware if authentication required if authCfg.Required { if authCfg.Methods.JWT.Enabled { - authJWT := authCfg.Methods.JWT - - var ks jwt.KeySet + var ( + authJWT = authCfg.Methods.JWT + ks jwt.KeySet + ) if authJWT.Method.JWKSURL != "" { ks, err = jwt.NewJSONWebKeySet(ctx, authJWT.Method.JWKSURL, "") diff --git a/internal/cmd/grpc.go b/internal/cmd/grpc.go index 708152f941..5290fa4f40 100644 --- a/internal/cmd/grpc.go +++ b/internal/cmd/grpc.go @@ -223,7 +223,6 @@ func NewGRPCServer( ctx, logger, cfg, - forceMigrate, authnOpts..., ) if err != nil { diff --git a/internal/config/authentication.go b/internal/config/authentication.go index fedbe35058..4102367a79 100644 --- a/internal/config/authentication.go +++ b/internal/config/authentication.go @@ -58,8 +58,8 @@ type AuthenticationConfig struct { OFREP bool `json:"ofrep,omitempty" mapstructure:"ofrep" yaml:"ofrep,omitempty"` } `json:"exclude,omitempty" mapstructure:"exclude" yaml:"exclude,omitempty"` - Session AuthenticationSession `json:"session,omitempty" mapstructure:"session" yaml:"session,omitempty"` - Methods AuthenticationMethods `json:"methods,omitempty" mapstructure:"methods" yaml:"methods,omitempty"` + Session AuthenticationSessionConfig `json:"session,omitempty" mapstructure:"session" yaml:"session,omitempty"` + Methods AuthenticationMethodsConfig `json:"methods,omitempty" mapstructure:"methods" yaml:"methods,omitempty"` } // Enabled returns true if authentication is marked as required @@ -84,17 +84,6 @@ func (c AuthenticationConfig) IsZero() bool { return !c.Enabled() } -// ShouldRunCleanup returns true if the cleanup background process should be started. -// It returns true given at-least 1 method is enabled and it's associated schedule -// has been configured (non-nil). -func (c AuthenticationConfig) ShouldRunCleanup() (shouldCleanup bool) { - for _, info := range c.Methods.AllMethods(context.Background()) { - shouldCleanup = shouldCleanup || info.RequiresCleanup() - } - - return -} - func (c *AuthenticationConfig) setDefaults(v *viper.Viper) error { methods := map[string]any{} @@ -107,11 +96,6 @@ func (c *AuthenticationConfig) setDefaults(v *viper.Viper) error { if v.GetBool(prefix + ".enabled") { // apply any method specific defaults info.setDefaults(method) - // set default cleanup - method["cleanup"] = map[string]any{ - "interval": time.Hour, - "grace_period": 30 * time.Minute, - } } methods[info.Name()] = method @@ -139,31 +123,10 @@ func (c *AuthenticationConfig) SessionEnabled() bool { } func (c *AuthenticationConfig) validate() error { - var sessionEnabled bool - - for _, info := range c.Methods.AllMethods(context.Background()) { - if !info.RequiresCleanup() { - continue - } - - sessionEnabled = sessionEnabled || (info.Enabled && info.SessionCompatible) - if info.Cleanup == nil { - continue - } - - if info.Cleanup.Interval <= 0 { - return errFieldPositiveDuration("authentication", "cleanup_interval") - } - - if info.Cleanup.GracePeriod <= 0 { - return errFieldPositiveDuration("authentication", "cleanup_grace_period") - } - } - // ensure that when a session compatible authentication method has been // enabled that the session cookie domain has been configured with a non // empty value. - if sessionEnabled { + if c.SessionEnabled() { if c.Session.Domain == "" { err := errFieldRequired("authentication", "session_domain") return fmt.Errorf("when session compatible auth method enabled: %w", err) @@ -178,6 +141,10 @@ func (c *AuthenticationConfig) validate() error { // domain cookies are not allowed to have a scheme or port // https://github.com/golang/go/issues/28297 c.Session.Domain = host + + if err := c.Session.Storage.validate(); err != nil { + return errFieldWrap("authentication", "session_storage", err) + } } for _, info := range c.Methods.AllMethods(context.Background()) { @@ -200,9 +167,9 @@ func getHostname(rawurl string) (string, error) { return strings.Split(u.Host, ":")[0], nil } -// AuthenticationSession configures the session produced for browsers when +// AuthenticationSessionConfig configures the session produced for browsers when // establishing authentication via HTTP. -type AuthenticationSession struct { +type AuthenticationSessionConfig struct { // Domain is the domain on which to register session cookies. Domain string `json:"domain,omitempty" mapstructure:"domain" yaml:"domain,omitempty"` // Secure sets the secure property (i.e. HTTPS only) on both the state and token cookies. @@ -213,18 +180,87 @@ type AuthenticationSession struct { // StateLifetime is the lifetime duration of the state cookie. StateLifetime time.Duration `json:"stateLifetime,omitempty" mapstructure:"state_lifetime" yaml:"state_lifetime,omitempty"` // CSRF configures CSRF provention mechanisms. - CSRF AuthenticationSessionCSRF `json:"csrf,omitempty" mapstructure:"csrf" yaml:"csrf,omitempty"` + CSRF AuthenticationSessionCSRFConfig `json:"csrf,omitempty" mapstructure:"csrf" yaml:"csrf,omitempty"` + // Storage configures the storage mechanism for the session. + Storage AuthenticationSessionStorageConfig `json:"storage,omitempty" mapstructure:"storage" yaml:"storage,omitempty"` +} + +type AuthenticationSessionStorageType string + +const ( + AuthenticationSessionStorageTypeMemory = AuthenticationSessionStorageType("memory") + AuthenticationSessionStorageTypeRedis = AuthenticationSessionStorageType("redis") +) + +type AuthenticationSessionStorageConfig struct { + Type AuthenticationSessionStorageType `json:"type" mapstructure:"type" yaml:"type"` + Redis AuthenticationSessionStorageRedisConfig `json:"redis,omitempty" mapstructure:"redis" yaml:"redis,omitempty"` + Cleanup AuthenticationSessionStorageCleanupConfig `json:"cleanup,omitempty" mapstructure:"cleanup" yaml:"cleanup,omitempty"` +} + +func (c AuthenticationSessionStorageConfig) validate() error { + return c.Cleanup.validate() +} + +func (c AuthenticationSessionStorageConfig) setDefaults(v *viper.Viper) error { + v.SetDefault("authentication.session.storage", map[string]any{ + "type": "memory", + "cleanup": map[string]any{ + "grace_period": "30m", + }, + }) + + return nil +} + +// AuthenticationSessionStorageCleanupConfig configures the schedule for cleaning up expired authentication records. +type AuthenticationSessionStorageCleanupConfig struct { + GracePeriod time.Duration `json:"gracePeriod,omitempty" mapstructure:"grace_period" yaml:"grace_period,omitempty"` +} + +func (c AuthenticationSessionStorageCleanupConfig) validate() error { + if c.GracePeriod < 0 { + return errFieldPositiveDuration("", "cleanup_grace_period") + } + + return nil } -// AuthenticationSessionCSRF configures cross-site request forgery prevention. -type AuthenticationSessionCSRF struct { +// AuthenticationSessionStorageRedisConfig contains fields, which configure the connection +// credentials for redis backed session storage. +type AuthenticationSessionStorageRedisConfig struct { + Host string `json:"host,omitempty" mapstructure:"host" yaml:"host,omitempty"` + Port int `json:"port,omitempty" mapstructure:"port" yaml:"port,omitempty"` + RequireTLS bool `json:"requireTLS,omitempty" mapstructure:"require_tls" yaml:"require_tls,omitempty"` + Username string `json:"-" mapstructure:"username" yaml:"-"` + Password string `json:"-" mapstructure:"password" yaml:"-"` + DB int `json:"db,omitempty" mapstructure:"db" yaml:"db,omitempty"` + PoolSize int `json:"poolSize" mapstructure:"pool_size" yaml:"pool_size"` + MinIdleConn int `json:"minIdleConn" mapstructure:"min_idle_conn" yaml:"min_idle_conn"` + ConnMaxIdleTime time.Duration `json:"connMaxIdleTime" mapstructure:"conn_max_idle_time" yaml:"conn_max_idle_time"` + NetTimeout time.Duration `json:"netTimeout" mapstructure:"net_timeout" yaml:"net_timeout"` + CaCertBytes string `json:"-" mapstructure:"ca_cert_bytes" yaml:"-"` + CaCertPath string `json:"-" mapstructure:"ca_cert_path" yaml:"-"` + InsecureSkipTLS bool `json:"-" mapstructure:"insecure_skip_tls" yaml:"-"` +} + +func (cfg *AuthenticationSessionStorageRedisConfig) validate() error { + if cfg.CaCertBytes != "" && cfg.CaCertPath != "" { + return errString("", "please provide exclusively one of ca_cert_bytes or ca_cert_path") + } + + return nil +} + +// AuthenticationSessionCSRFConfig configures cross-site request forgery prevention. +type AuthenticationSessionCSRFConfig struct { // Key is the private key string used to authenticate csrf tokens. Key string `json:"-" mapstructure:"key"` } -// AuthenticationMethods is a set of configuration for each authentication +// AuthenticationMethodsConfig is a set of configuration for each authentication // method available for use within Flipt. -type AuthenticationMethods struct { +type AuthenticationMethodsConfig struct { Token AuthenticationMethod[AuthenticationMethodTokenConfig] `json:"token,omitempty" mapstructure:"token" yaml:"token,omitempty"` Github AuthenticationMethod[AuthenticationMethodGithubConfig] `json:"github,omitempty" mapstructure:"github" yaml:"github,omitempty"` OIDC AuthenticationMethod[AuthenticationMethodOIDCConfig] `json:"oidc,omitempty" mapstructure:"oidc" yaml:"oidc,omitempty"` @@ -233,7 +269,7 @@ type AuthenticationMethods struct { } // AllMethods returns all the AuthenticationMethod instances available. -func (a *AuthenticationMethods) AllMethods(ctx context.Context) []StaticAuthenticationMethodInfo { +func (a *AuthenticationMethodsConfig) AllMethods(ctx context.Context) []StaticAuthenticationMethodInfo { return []StaticAuthenticationMethodInfo{ a.Token.info(ctx), a.Github.info(ctx), @@ -255,7 +291,7 @@ func getForwardPrefix(ctx context.Context) string { } // EnabledMethods returns all the AuthenticationMethod instances that have been enabled. -func (a *AuthenticationMethods) EnabledMethods() []StaticAuthenticationMethodInfo { +func (a *AuthenticationMethodsConfig) EnabledMethods() []StaticAuthenticationMethodInfo { var enabled []StaticAuthenticationMethodInfo for _, info := range a.AllMethods(context.Background()) { if info.Enabled { @@ -271,7 +307,6 @@ func (a *AuthenticationMethods) EnabledMethods() []StaticAuthenticationMethodInf type StaticAuthenticationMethodInfo struct { AuthenticationMethodInfo Enabled bool - Cleanup *AuthenticationCleanupSchedule // used for bootstrapping defaults setDefaults func(map[string]any) @@ -281,7 +316,6 @@ type StaticAuthenticationMethodInfo struct { // used for testing purposes to ensure all methods // are appropriately cleaned up via the background process. setEnabled func() - setCleanup func(AuthenticationCleanupSchedule) } // Enable can only be called in a testing scenario. @@ -290,17 +324,6 @@ func (s StaticAuthenticationMethodInfo) Enable(t *testing.T) { s.setEnabled() } -// SetCleanup can only be called in a testing scenario. -// It is used to configure cleanup for a target method without having a concrete reference. -func (s StaticAuthenticationMethodInfo) SetCleanup(t *testing.T, c AuthenticationCleanupSchedule) { - s.setCleanup(c) -} - -// RequiresCleanup returns true if the method is enabled and requires cleanup. -func (s StaticAuthenticationMethodInfo) RequiresCleanup() bool { - return s.Enabled && s.Cleanup != nil -} - // AuthenticationMethodInfo is a structure which describes properties // of a particular authentication method. // i.e. the name and whether or not the method is session compatible. @@ -331,9 +354,8 @@ type AuthenticationMethodInfoProvider interface { // the AuthenticationMethodInfoProvider to be valid at compile time. // nolint:musttag type AuthenticationMethod[C AuthenticationMethodInfoProvider] struct { - Method C `mapstructure:",squash"` - Enabled bool `json:"enabled,omitempty" mapstructure:"enabled" yaml:"enabled,omitempty"` - Cleanup *AuthenticationCleanupSchedule `json:"cleanup,omitempty" mapstructure:"cleanup,omitempty" yaml:"cleanup,omitempty"` + Method C `mapstructure:",squash"` + Enabled bool `json:"enabled,omitempty" mapstructure:"enabled" yaml:"enabled,omitempty"` } func (a *AuthenticationMethod[C]) setDefaults(defaults map[string]any) { @@ -344,16 +366,12 @@ func (a *AuthenticationMethod[C]) info(ctx context.Context) StaticAuthentication return StaticAuthenticationMethodInfo{ AuthenticationMethodInfo: a.Method.info(ctx), Enabled: a.Enabled, - Cleanup: a.Cleanup, setDefaults: a.setDefaults, validate: a.validate, setEnabled: func() { a.Enabled = true }, - setCleanup: func(c AuthenticationCleanupSchedule) { - a.Cleanup = &c - }, } } @@ -483,12 +501,6 @@ func (a AuthenticationMethodOIDCProvider) validate() error { return nil } -// AuthenticationCleanupSchedule is used to configure a cleanup goroutine. -type AuthenticationCleanupSchedule struct { - Interval time.Duration `json:"interval,omitempty" mapstructure:"interval" yaml:"interval,omitempty"` - GracePeriod time.Duration `json:"gracePeriod,omitempty" mapstructure:"grace_period" yaml:"grace_period,omitempty"` -} - // AuthenticationMethodKubernetesConfig contains the fields necessary for the Kubernetes authentication // method to be performed. This method supports Flipt being deployed in a Kubernetes environment // and allowing it to exchange client tokens for valid service account tokens presented via this method. diff --git a/internal/config/config.go b/internal/config/config.go index 35169716db..201779357c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -614,7 +614,7 @@ func Default() *Config { }, Authentication: AuthenticationConfig{ - Session: AuthenticationSession{ + Session: AuthenticationSessionConfig{ TokenLifetime: 24 * time.Hour, StateLifetime: 10 * time.Minute, }, diff --git a/internal/config/config_test.go b/internal/config/config_test.go index f07682f2a3..3eeeee4bc3 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -236,7 +236,7 @@ func TestLoad(t *testing.T) { expected: func() *Config { cfg := Default() cfg.Authentication.Required = true - cfg.Authentication.Methods = AuthenticationMethods{ + cfg.Authentication.Methods = AuthenticationMethodsConfig{ Token: AuthenticationMethod[AuthenticationMethodTokenConfig]{ Enabled: true, Method: AuthenticationMethodTokenConfig{ @@ -245,10 +245,6 @@ func TestLoad(t *testing.T) { Expiration: 24 * time.Hour, }, }, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: time.Hour, - GracePeriod: 30 * time.Minute, - }, }, } return cfg @@ -260,21 +256,18 @@ func TestLoad(t *testing.T) { expected: func() *Config { cfg := Default() cfg.Authentication.Required = true - cfg.Authentication.Session.Domain = "localhost" - cfg.Authentication.Methods = AuthenticationMethods{ + cfg.Authentication.Session = AuthenticationSessionConfig{ + Domain: "localhost", + Storage: AuthenticationSessionStorageConfig{ + Type: AuthenticationSessionStorageTypeMemory, + }, + } + cfg.Authentication.Methods = AuthenticationMethodsConfig{ Token: AuthenticationMethod[AuthenticationMethodTokenConfig]{ Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: time.Hour, - GracePeriod: 30 * time.Minute, - }, }, OIDC: AuthenticationMethod[AuthenticationMethodOIDCConfig]{ Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: time.Hour, - GracePeriod: 30 * time.Minute, - }, }, } return cfg @@ -286,14 +279,15 @@ func TestLoad(t *testing.T) { expected: func() *Config { cfg := Default() cfg.Authentication.Required = true - cfg.Authentication.Session.Domain = "localhost" - cfg.Authentication.Methods = AuthenticationMethods{ + cfg.Authentication.Session = AuthenticationSessionConfig{ + Domain: "localhost", + Storage: AuthenticationSessionStorageConfig{ + Type: AuthenticationSessionStorageTypeMemory, + }, + } + cfg.Authentication.Methods = AuthenticationMethodsConfig{ OIDC: AuthenticationMethod[AuthenticationMethodOIDCConfig]{ Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: time.Hour, - GracePeriod: 30 * time.Minute, - }, Method: AuthenticationMethodOIDCConfig{ Providers: map[string]AuthenticationMethodOIDCProvider{ "foo": { @@ -312,7 +306,8 @@ func TestLoad(t *testing.T) { }, }, }, - }} + }, + } return cfg }, }, @@ -322,7 +317,7 @@ func TestLoad(t *testing.T) { expected: func() *Config { cfg := Default() cfg.Authentication.Required = true - cfg.Authentication.Methods = AuthenticationMethods{ + cfg.Authentication.Methods = AuthenticationMethodsConfig{ Kubernetes: AuthenticationMethod[AuthenticationMethodKubernetesConfig]{ Enabled: true, Method: AuthenticationMethodKubernetesConfig{ @@ -330,10 +325,6 @@ func TestLoad(t *testing.T) { CAPath: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", ServiceAccountTokenPath: "/var/run/secrets/kubernetes.io/serviceaccount/token", }, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: time.Hour, - GracePeriod: 30 * time.Minute, - }, }, } return cfg @@ -423,22 +414,21 @@ func TestLoad(t *testing.T) { cfg.Authentication = AuthenticationConfig{ Required: true, - Session: AuthenticationSession{ + Session: AuthenticationSessionConfig{ Domain: "auth.flipt.io", Secure: true, TokenLifetime: 24 * time.Hour, StateLifetime: 10 * time.Minute, - CSRF: AuthenticationSessionCSRF{ + CSRF: AuthenticationSessionCSRFConfig{ Key: "abcdefghijklmnopqrstuvwxyz1234567890", //gitleaks:allow }, + Storage: AuthenticationSessionStorageConfig{ + Type: AuthenticationSessionStorageTypeMemory, + }, }, - Methods: AuthenticationMethods{ + Methods: AuthenticationMethodsConfig{ Token: AuthenticationMethod[AuthenticationMethodTokenConfig]{ Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: 2 * time.Hour, - GracePeriod: 48 * time.Hour, - }, }, OIDC: AuthenticationMethod[AuthenticationMethodOIDCConfig]{ Method: AuthenticationMethodOIDCConfig{ @@ -452,10 +442,6 @@ func TestLoad(t *testing.T) { }, }, Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: 2 * time.Hour, - GracePeriod: 48 * time.Hour, - }, }, Kubernetes: AuthenticationMethod[AuthenticationMethodKubernetesConfig]{ Enabled: true, @@ -464,10 +450,6 @@ func TestLoad(t *testing.T) { CAPath: "/path/to/ca/certificate/ca.pem", ServiceAccountTokenPath: "/path/to/sa/token", }, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: 2 * time.Hour, - GracePeriod: 48 * time.Hour, - }, }, Github: AuthenticationMethod[AuthenticationMethodGithubConfig]{ Method: AuthenticationMethodGithubConfig{ @@ -476,10 +458,6 @@ func TestLoad(t *testing.T) { RedirectAddress: "http://auth.flipt.io", }, Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: 2 * time.Hour, - GracePeriod: 48 * time.Hour, - }, }, }, } @@ -557,22 +535,21 @@ func TestLoad(t *testing.T) { cfg.Authentication = AuthenticationConfig{ Required: true, - Session: AuthenticationSession{ + Session: AuthenticationSessionConfig{ Domain: "auth.flipt.io", Secure: true, TokenLifetime: 24 * time.Hour, StateLifetime: 10 * time.Minute, - CSRF: AuthenticationSessionCSRF{ + CSRF: AuthenticationSessionCSRFConfig{ Key: "abcdefghijklmnopqrstuvwxyz1234567890", //gitleaks:allow }, + Storage: AuthenticationSessionStorageConfig{ + Type: AuthenticationSessionStorageTypeMemory, + }, }, - Methods: AuthenticationMethods{ + Methods: AuthenticationMethodsConfig{ Token: AuthenticationMethod[AuthenticationMethodTokenConfig]{ Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: 2 * time.Hour, - GracePeriod: 48 * time.Hour, - }, }, OIDC: AuthenticationMethod[AuthenticationMethodOIDCConfig]{ Method: AuthenticationMethodOIDCConfig{ @@ -586,10 +563,6 @@ func TestLoad(t *testing.T) { }, }, Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: 2 * time.Hour, - GracePeriod: 48 * time.Hour, - }, }, Kubernetes: AuthenticationMethod[AuthenticationMethodKubernetesConfig]{ Enabled: true, @@ -598,10 +571,6 @@ func TestLoad(t *testing.T) { CAPath: "/path/to/ca/certificate/ca.pem", ServiceAccountTokenPath: "/path/to/sa/token", }, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: 2 * time.Hour, - GracePeriod: 48 * time.Hour, - }, }, Github: AuthenticationMethod[AuthenticationMethodGithubConfig]{ Method: AuthenticationMethodGithubConfig{ @@ -610,10 +579,6 @@ func TestLoad(t *testing.T) { RedirectAddress: "http://auth.flipt.io", }, Enabled: true, - Cleanup: &AuthenticationCleanupSchedule{ - Interval: 2 * time.Hour, - GracePeriod: 48 * time.Hour, - }, }, }, } diff --git a/internal/config/errors.go b/internal/config/errors.go index 75ba9b65d9..519e645255 100644 --- a/internal/config/errors.go +++ b/internal/config/errors.go @@ -24,9 +24,9 @@ func errFieldRequired(typ, field string) error { // errFieldPositiveDuration returns a formatted error for positive non-zero duration fields func errFieldPositiveDuration(typ, field string) error { if typ == "" { - return fmt.Errorf("%s must be a positive non-zero duration", field) + return fmt.Errorf("%s must be a positive duration", field) } - return fmt.Errorf("%s: %s must be a positive non-zero duration", typ, field) + return fmt.Errorf("%s: %s must be a positive duration", typ, field) } // errString creates a new error with type and message context diff --git a/internal/server/authn/method/github/server_test.go b/internal/server/authn/method/github/server_test.go index 703c587ead..d6343ab85d 100644 --- a/internal/server/authn/method/github/server_test.go +++ b/internal/server/authn/method/github/server_test.go @@ -692,9 +692,9 @@ func newTestServer(t *testing.T, cfg config.AuthenticationMethod[config.Authenti auth.RegisterAuthenticationMethodGithubServiceServer(server, &Server{ logger: zaptest.NewLogger(t), - store: memory.NewStore(), + store: memory.NewStore(zaptest.NewLogger(t)), config: config.AuthenticationConfig{ - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ Github: cfg, }, }, diff --git a/internal/server/authn/method/http.go b/internal/server/authn/method/http.go index c50f182607..c129928908 100644 --- a/internal/server/authn/method/http.go +++ b/internal/server/authn/method/http.go @@ -30,12 +30,12 @@ const ( // responses to http cookies, and establishing appropriate state parameters for csrf provention // during the oauth/oidc flow. type Middleware struct { - config config.AuthenticationSession + config config.AuthenticationSessionConfig } // NewHTTPMiddleware constructs and configures a new oidc HTTP middleware from the supplied // authentication configuration struct. -func NewHTTPMiddleware(config config.AuthenticationSession) Middleware { +func NewHTTPMiddleware(config config.AuthenticationSessionConfig) Middleware { return Middleware{ config: config, } diff --git a/internal/server/authn/method/kubernetes/server_internal_test.go b/internal/server/authn/method/kubernetes/server_internal_test.go index 81eeac7c58..c64dacdaf8 100644 --- a/internal/server/authn/method/kubernetes/server_internal_test.go +++ b/internal/server/authn/method/kubernetes/server_internal_test.go @@ -87,7 +87,7 @@ func Test_Server_VerifyServiceAccount(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { var ( - store = memory.NewStore( + store = memory.NewStore(logger, memory.WithIDGeneratorFunc(func() string { return staticID }), memory.WithTokenGeneratorFunc(func() string { return staticToken }), memory.WithNowFunc(func() *timestamppb.Timestamp { return staticTime }), diff --git a/internal/server/authn/method/kubernetes/server_test.go b/internal/server/authn/method/kubernetes/server_test.go index 42779f8158..b8df9924e1 100644 --- a/internal/server/authn/method/kubernetes/server_test.go +++ b/internal/server/authn/method/kubernetes/server_test.go @@ -81,7 +81,7 @@ func Test_Server(t *testing.T) { var ( authConfig = config.AuthenticationConfig{ - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ Kubernetes: config.AuthenticationMethod[config.AuthenticationMethodKubernetesConfig]{ Enabled: true, Method: config.AuthenticationMethodKubernetesConfig{ diff --git a/internal/server/authn/method/kubernetes/testing/grpc.go b/internal/server/authn/method/kubernetes/testing/grpc.go index d317be7071..3b2334f9e0 100644 --- a/internal/server/authn/method/kubernetes/testing/grpc.go +++ b/internal/server/authn/method/kubernetes/testing/grpc.go @@ -34,7 +34,7 @@ func StartGRPCServer(t *testing.T, ctx context.Context, logger *zap.Logger, conf t.Helper() var ( - store = memory.NewStore() + store = memory.NewStore(logger) listener = bufconn.Listen(1024 * 1024) server = grpc.NewServer( grpc_middleware.WithUnaryServerChain( diff --git a/internal/server/authn/method/oidc/server_test.go b/internal/server/authn/method/oidc/server_test.go index ac86395a0e..8f21dae5e5 100644 --- a/internal/server/authn/method/oidc/server_test.go +++ b/internal/server/authn/method/oidc/server_test.go @@ -99,13 +99,13 @@ func Test_Server_ImplicitFlow(t *testing.T) { var ( authConfig = config.AuthenticationConfig{ - Session: config.AuthenticationSession{ + Session: config.AuthenticationSessionConfig{ Domain: "localhost", Secure: false, TokenLifetime: 1 * time.Hour, StateLifetime: 10 * time.Minute, }, - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ OIDC: config.AuthenticationMethod[config.AuthenticationMethodOIDCConfig]{ Enabled: true, Method: config.AuthenticationMethodOIDCConfig{ @@ -207,13 +207,13 @@ func Test_Server_PKCE(t *testing.T) { var ( authConfig = config.AuthenticationConfig{ - Session: config.AuthenticationSession{ + Session: config.AuthenticationSessionConfig{ Domain: "localhost", Secure: false, TokenLifetime: 1 * time.Hour, StateLifetime: 10 * time.Minute, }, - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ OIDC: config.AuthenticationMethod[config.AuthenticationMethodOIDCConfig]{ Enabled: true, Method: config.AuthenticationMethodOIDCConfig{ @@ -312,13 +312,13 @@ func Test_Server_Nonce(t *testing.T) { var ( authConfig = config.AuthenticationConfig{ - Session: config.AuthenticationSession{ + Session: config.AuthenticationSessionConfig{ Domain: "localhost", Secure: false, TokenLifetime: 1 * time.Hour, StateLifetime: 10 * time.Minute, }, - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ OIDC: config.AuthenticationMethod[config.AuthenticationMethodOIDCConfig]{ Enabled: true, Method: config.AuthenticationMethodOIDCConfig{ diff --git a/internal/server/authn/method/oidc/testing/grpc.go b/internal/server/authn/method/oidc/testing/grpc.go index cdc9d494d8..f38728098c 100644 --- a/internal/server/authn/method/oidc/testing/grpc.go +++ b/internal/server/authn/method/oidc/testing/grpc.go @@ -34,7 +34,7 @@ func StartGRPCServer(t *testing.T, ctx context.Context, logger *zap.Logger, conf t.Helper() var ( - store = memory.NewStore() + store = memory.NewStore(logger) listener = bufconn.Listen(1024 * 1024) server = grpc.NewServer( grpc_middleware.WithUnaryServerChain( diff --git a/internal/server/authn/method/token/server_test.go b/internal/server/authn/method/token/server_test.go index 8fe8662885..c86fc6d641 100644 --- a/internal/server/authn/method/token/server_test.go +++ b/internal/server/authn/method/token/server_test.go @@ -24,7 +24,7 @@ import ( func TestServer(t *testing.T) { var ( logger = zaptest.NewLogger(t) - store = memory.NewStore() + store = memory.NewStore(logger) listener = bufconn.Listen(1024 * 1024) server = grpc.NewServer( grpc.ChainUnaryInterceptor( diff --git a/internal/server/authn/middleware/grpc/middleware_test.go b/internal/server/authn/middleware/grpc/middleware_test.go index 07b23708e9..607b6a4e5e 100644 --- a/internal/server/authn/middleware/grpc/middleware_test.go +++ b/internal/server/authn/middleware/grpc/middleware_test.go @@ -335,7 +335,7 @@ func TestJWTAuthenticationInterceptor(t *testing.T) { } func TestClientTokenAuthenticationInterceptor(t *testing.T) { - authenticator := memory.NewStore() + authenticator := memory.NewStore(zaptest.NewLogger(t)) clientToken, storedAuth, err := authenticator.CreateAuthentication( context.TODO(), @@ -485,7 +485,12 @@ func TestEmailMatchingInterceptorWithNoAuth(t *testing.T) { } func TestEmailMatchingInterceptor(t *testing.T) { - authenticator := memory.NewStore() + var ( + logger = zaptest.NewLogger(t) + + authenticator = memory.NewStore(logger) + ) + clientToken, storedAuth, err := authenticator.CreateAuthentication( context.TODO(), &authn.CreateAuthenticationRequest{ @@ -589,8 +594,6 @@ func TestEmailMatchingInterceptor(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { var ( - logger = zaptest.NewLogger(t) - ctx = ContextWithAuthentication(context.Background(), tt.auth) handler = func(ctx context.Context, req interface{}) (interface{}, error) { return nil, nil @@ -837,7 +840,7 @@ func TestNamespaceMatchingInterceptor(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var ( logger = zaptest.NewLogger(t) - authenticator = memory.NewStore() + authenticator = memory.NewStore(logger) ) clientToken, storedAuth, err := authenticator.CreateAuthentication( diff --git a/internal/server/authn/middleware/http/middleware.go b/internal/server/authn/middleware/http/middleware.go index 3b3f693553..08fe421604 100644 --- a/internal/server/authn/middleware/http/middleware.go +++ b/internal/server/authn/middleware/http/middleware.go @@ -17,12 +17,12 @@ const stateCookieKey = "flipt_client_state" // Middleware contains various extensions for appropriate integration of the generic auth services // behind gRPC gateway. This currently includes clearing the appropriate cookies on logout. type Middleware struct { - config config.AuthenticationSession + config config.AuthenticationSessionConfig defaultErrHandler runtime.ErrorHandlerFunc } // NewHTTPMiddleware constructs a new auth HTTP middleware. -func NewHTTPMiddleware(config config.AuthenticationSession) *Middleware { +func NewHTTPMiddleware(config config.AuthenticationSessionConfig) *Middleware { return &Middleware{ config: config, defaultErrHandler: runtime.DefaultHTTPErrorHandler, diff --git a/internal/server/authn/middleware/http/middleware_test.go b/internal/server/authn/middleware/http/middleware_test.go index cd3daffe69..f2373999ef 100644 --- a/internal/server/authn/middleware/http/middleware_test.go +++ b/internal/server/authn/middleware/http/middleware_test.go @@ -21,7 +21,7 @@ func TestHandler(t *testing.T) { w.WriteHeader(http.StatusOK) } - middleware := NewHTTPMiddleware(config.AuthenticationSession{ + middleware := NewHTTPMiddleware(config.AuthenticationSessionConfig{ Domain: "localhost", }) @@ -43,7 +43,7 @@ func TestHandler(t *testing.T) { func TestErrorHandler(t *testing.T) { const defaultResponseBody = "default handler called" var ( - middleware = NewHTTPMiddleware(config.AuthenticationSession{ + middleware = NewHTTPMiddleware(config.AuthenticationSessionConfig{ Domain: "localhost", }) ) diff --git a/internal/server/authn/public/server_test.go b/internal/server/authn/public/server_test.go index a29e428d6d..178dc6f1fb 100644 --- a/internal/server/authn/public/server_test.go +++ b/internal/server/authn/public/server_test.go @@ -27,7 +27,7 @@ func Test_Server(t *testing.T) { ctx: context.Background(), conf: config.AuthenticationConfig{ Required: true, - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ Github: config.AuthenticationMethod[config.AuthenticationMethodGithubConfig]{ Enabled: true, Method: config.AuthenticationMethodGithubConfig{ @@ -71,7 +71,7 @@ func Test_Server(t *testing.T) { )), conf: config.AuthenticationConfig{ Required: true, - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ Github: config.AuthenticationMethod[config.AuthenticationMethodGithubConfig]{ Enabled: true, Method: config.AuthenticationMethodGithubConfig{ diff --git a/internal/server/authn/server_test.go b/internal/server/authn/server_test.go index 5ba35f058d..7ae1aebad5 100644 --- a/internal/server/authn/server_test.go +++ b/internal/server/authn/server_test.go @@ -43,7 +43,7 @@ func TestActorFromContext(t *testing.T) { func TestServer(t *testing.T) { var ( logger = zaptest.NewLogger(t) - store = memory.NewStore() + store = memory.NewStore(logger) listener = bufconn.Listen(1024 * 1024) server = grpc.NewServer( grpc.ChainUnaryInterceptor( @@ -201,7 +201,7 @@ func TestServer(t *testing.T) { func Test_Server_DisallowsNamespaceScopedAuthentication(t *testing.T) { var ( logger = zaptest.NewLogger(t) - store = memory.NewStore() + store = memory.NewStore(logger) server = authn.NewServer(logger, store) ) diff --git a/internal/storage/authn/auth.go b/internal/storage/authn/auth.go index 2fb44238de..7ee0d2cc38 100644 --- a/internal/storage/authn/auth.go +++ b/internal/storage/authn/auth.go @@ -38,6 +38,7 @@ type Store interface { DeleteAuthentications(context.Context, *DeleteAuthenticationsRequest) error // ExpireAuthenticationByID attempts to expire an Authentication by ID string and the provided expiry time. ExpireAuthenticationByID(context.Context, string, *timestamppb.Timestamp) error + Shutdown(context.Context) error } // CreateAuthenticationRequest is the argument passed when creating instances diff --git a/internal/storage/authn/bootstrap.go b/internal/storage/authn/bootstrap.go deleted file mode 100644 index df2782e23f..0000000000 --- a/internal/storage/authn/bootstrap.go +++ /dev/null @@ -1,89 +0,0 @@ -package authn - -import ( - "context" - "fmt" - "time" - - "go.flipt.io/flipt/internal/storage" - rpcauth "go.flipt.io/flipt/rpc/flipt/auth" - "google.golang.org/protobuf/types/known/timestamppb" -) - -type bootstrapOpt struct { - token string - expiration time.Duration - metadata map[string]string -} - -// BootstrapOption is a type which configures the bootstrap or initial static token. -type BootstrapOption func(*bootstrapOpt) - -// WithToken overrides the generated token with the provided token. -func WithToken(token string) BootstrapOption { - return func(o *bootstrapOpt) { - o.token = token - } -} - -// WithExpiration sets the expiration of the generated token. -func WithExpiration(expiration time.Duration) BootstrapOption { - return func(o *bootstrapOpt) { - o.expiration = expiration - } -} - -// WithMetadataAttribute can be used to add additional metadata k/v pairs -// to the resulting bootstrap token -func WithMetadataAttribute(key, value string) BootstrapOption { - return func(bo *bootstrapOpt) { - bo.metadata[key] = value - } -} - -// Bootstrap creates an initial static authentication of type token -// if one does not already exist. -func Bootstrap(ctx context.Context, store Store, opts ...BootstrapOption) (string, error) { - o := bootstrapOpt{ - metadata: map[string]string{ - "io.flipt.auth.token.name": "initial_bootstrap_token", - "io.flipt.auth.token.description": "Initial token created when bootstrapping authentication", - }, - } - for _, opt := range opts { - opt(&o) - } - - set, err := store.ListAuthentications(ctx, storage.ListWithOptions(ListMethod(rpcauth.Method_METHOD_TOKEN))) - if err != nil { - return "", fmt.Errorf("bootstrapping authentication store: %w", err) - } - - // ensures we only create a token if no authentications of type token currently exist - if len(set.Results) > 0 { - return "", nil - } - - req := &CreateAuthenticationRequest{ - Method: rpcauth.Method_METHOD_TOKEN, - Metadata: o.metadata, - } - - // if a client token is provided, use it - if o.token != "" { - req.ClientToken = o.token - } - - // if an expiration is provided, use it - if o.expiration != 0 { - req.ExpiresAt = timestamppb.New(time.Now().Add(o.expiration)) - } - - clientToken, _, err := store.CreateAuthentication(ctx, req) - - if err != nil { - return "", fmt.Errorf("boostrapping authentication store: %w", err) - } - - return clientToken, nil -} diff --git a/internal/storage/authn/bootstrap_test.go b/internal/storage/authn/bootstrap_test.go deleted file mode 100644 index 57cb4ce03b..0000000000 --- a/internal/storage/authn/bootstrap_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package authn_test - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.flipt.io/flipt/internal/storage/authn" - "go.flipt.io/flipt/internal/storage/authn/memory" -) - -func TestBootstrap(t *testing.T) { - store := memory.NewStore() - s, err := authn.Bootstrap(context.TODO(), store, authn.WithExpiration(time.Minute), authn.WithToken("this-is-a-token")) - require.NoError(t, err) - assert.NotEmpty(t, s) -} diff --git a/internal/storage/authn/memory/store.go b/internal/storage/authn/memory/store.go index 6ec15b28a0..93a3409322 100644 --- a/internal/storage/authn/memory/store.go +++ b/internal/storage/authn/memory/store.go @@ -6,6 +6,7 @@ import ( "sort" "strconv" "sync" + "time" "github.com/google/uuid" "go.flipt.io/flipt/errors" @@ -13,41 +14,63 @@ import ( "go.flipt.io/flipt/internal/storage/authn" rpcflipt "go.flipt.io/flipt/rpc/flipt" rpcauth "go.flipt.io/flipt/rpc/flipt/auth" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/timestamppb" ) +var _ authn.Store = (*Store)(nil) + // Store is an in-memory implementation of storage.AuthenticationStore // // Authentications are stored in a map by hashedClientToken. // Access to the map is protected by a mutex, meaning this is implementation // is safe to use concurrently. type Store struct { + logger *zap.Logger + mu sync.Mutex byID map[string]*rpcauth.Authentication byToken map[string]*rpcauth.Authentication - now func() *timestamppb.Timestamp - generateID func() string - generateToken func() string + now func() *timestamppb.Timestamp + generateID func() string + generateToken func() string + errGroup errgroup.Group + cleanupInterval time.Duration + cleanupGracePeriod time.Duration + shutdown context.CancelFunc } // Option is a type which configures a *Store type Option func(*Store) // NewStore instantiates a new in-memory implementation of storage.AuthenticationStore -func NewStore(opts ...Option) *Store { - store := &Store{ - byID: map[string]*rpcauth.Authentication{}, - byToken: map[string]*rpcauth.Authentication{}, - now: rpcflipt.Now, - generateID: uuid.NewString, - generateToken: authn.GenerateRandomToken, - } +func NewStore(logger *zap.Logger, opts ...Option) *Store { + var ( + ctx, cancel = context.WithCancel(context.Background()) + store = &Store{ + logger: logger, + + byID: map[string]*rpcauth.Authentication{}, + byToken: map[string]*rpcauth.Authentication{}, + now: rpcflipt.Now, + generateID: uuid.NewString, + generateToken: authn.GenerateRandomToken, + + errGroup: errgroup.Group{}, + cleanupInterval: 1 * time.Hour, + cleanupGracePeriod: 30 * time.Minute, + shutdown: cancel, + } + ) for _, opt := range opts { opt(store) } + store.startCleanup(ctx) + return store } @@ -79,6 +102,44 @@ func WithIDGeneratorFunc(fn func() string) Option { } } +// WithCleanupInterval overrides the stores cleanup interval +// used to set the TTL on authentication records. +func WithCleanupInterval(t time.Duration) Option { + return func(s *Store) { + s.cleanupInterval = t + } +} + +// WithCleanupGracePeriod overrides the stores cleanup grace period +// used to set the TTL on authentication records. +func WithCleanupGracePeriod(t time.Duration) Option { + return func(s *Store) { + s.cleanupGracePeriod = t + } +} + +func (s *Store) startCleanup(ctx context.Context) { + s.errGroup.Go(func() error { + ticker := time.NewTicker(s.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + expiredBefore := time.Now().UTC().Add(-s.cleanupGracePeriod) + s.logger.Debug("cleanup process deleting authentications", zap.Time("expired_before", expiredBefore)) + if err := s.DeleteAuthentications(ctx, authn.Delete( + authn.WithExpiredBefore(expiredBefore), + )); err != nil { + s.logger.Error("attempting to delete expired authentications", zap.Error(err)) + } + case <-ctx.Done(): + return nil + } + } + }) +} + // CreateAuthentication creates a new instance of an Authentication and returns a unique clientToken // string which can be used to retrieve the Authentication again via GetAuthenticationByClientToken. func (s *Store) CreateAuthentication(_ context.Context, r *authn.CreateAuthenticationRequest) (string, *rpcauth.Authentication, error) { @@ -217,8 +278,9 @@ func (s *Store) DeleteAuthentications(_ context.Context, req *authn.DeleteAuthen // ExpireAuthenticationByID attempts to expire an Authentication by ID string and the provided expiry time. func (s *Store) ExpireAuthenticationByID(ctx context.Context, id string, expireAt *timestamppb.Timestamp) error { s.mu.Lock() + defer s.mu.Unlock() + authentication, ok := s.byID[id] - s.mu.Unlock() if !ok { return errors.ErrNotFoundf("getting authentication by token") } @@ -226,3 +288,8 @@ func (s *Store) ExpireAuthenticationByID(ctx context.Context, id string, expireA authentication.ExpiresAt = expireAt return nil } + +func (s *Store) Shutdown(ctx context.Context) error { + s.shutdown() + return s.errGroup.Wait() +} diff --git a/internal/storage/authn/memory/store_test.go b/internal/storage/authn/memory/store_test.go index 0d265216cb..bd8f7d8892 100644 --- a/internal/storage/authn/memory/store_test.go +++ b/internal/storage/authn/memory/store_test.go @@ -5,10 +5,11 @@ import ( "go.flipt.io/flipt/internal/storage/authn" authtesting "go.flipt.io/flipt/internal/storage/authn/testing" + "go.uber.org/zap" ) func TestAuthenticationStoreHarness(t *testing.T) { authtesting.TestAuthenticationStoreHarness(t, func(t *testing.T) authn.Store { - return NewStore() + return NewStore(zap.NewNop()) }) } diff --git a/internal/storage/authn/redis/client.go b/internal/storage/authn/redis/client.go new file mode 100644 index 0000000000..44f9f99405 --- /dev/null +++ b/internal/storage/authn/redis/client.go @@ -0,0 +1,62 @@ +package redis + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "os" + + goredis "github.com/redis/go-redis/v9" + "go.flipt.io/flipt/internal/config" +) + +func NewClient(cfg config.AuthenticationSessionStorageRedisConfig) (*goredis.Client, error) { + var tlsConfig *tls.Config + if cfg.RequireTLS { + tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12} + tlsConfig.InsecureSkipVerify = cfg.InsecureSkipTLS + + caBundle, err := caBundle(cfg) + if err != nil { + return nil, err + } + + if len(caBundle) > 0 { + rootCAs, err := x509.SystemCertPool() + if err != nil { + return nil, err + } + rootCAs.AppendCertsFromPEM(caBundle) + tlsConfig.RootCAs = rootCAs + } + } + + return goredis.NewClient(&goredis.Options{ + Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), + TLSConfig: tlsConfig, + Username: cfg.Username, + Password: cfg.Password, + DB: cfg.DB, + PoolSize: cfg.PoolSize, + MinIdleConns: cfg.MinIdleConn, + ConnMaxIdleTime: cfg.ConnMaxIdleTime, + DialTimeout: cfg.NetTimeout, + ReadTimeout: cfg.NetTimeout * 2, + WriteTimeout: cfg.NetTimeout * 2, + PoolTimeout: cfg.NetTimeout * 2, + }), nil +} + +func caBundle(cfg config.AuthenticationSessionStorageRedisConfig) ([]byte, error) { + if cfg.CaCertBytes != "" { + return []byte(cfg.CaCertBytes), nil + } + if cfg.CaCertPath != "" { + bytes, err := os.ReadFile(cfg.CaCertPath) + if err != nil { + return nil, err + } + return bytes, nil + } + return []byte{}, nil +} diff --git a/internal/storage/authn/redis/client_test.go b/internal/storage/authn/redis/client_test.go new file mode 100644 index 0000000000..243d5c5afd --- /dev/null +++ b/internal/storage/authn/redis/client_test.go @@ -0,0 +1,103 @@ +package redis + +import ( + "bytes" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/pem" + "math/big" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.flipt.io/flipt/internal/config" +) + +func TestTLSInsecure(t *testing.T) { + tests := []struct { + name string + input bool + expected bool + }{ + {"insecure disabled", false, false}, + {"insecure enabled", true, true}, + } + for _, tt := range tests { + client, err := NewClient(config.AuthenticationSessionStorageRedisConfig{ + RequireTLS: true, + InsecureSkipTLS: tt.input, + }) + + require.NoError(t, err) + require.Equal(t, tt.expected, client.Options().TLSConfig.InsecureSkipVerify) + } +} + +func TestTLSCABundle(t *testing.T) { + ca := generateCA(t) + t.Run("load CABytes successful", func(t *testing.T) { + client, err := NewClient(config.AuthenticationSessionStorageRedisConfig{ + RequireTLS: true, + CaCertBytes: ca, + }) + require.NoError(t, err) + require.NotNil(t, client.Options().TLSConfig.RootCAs) + }) + + t.Run("load CA provided", func(t *testing.T) { + client, err := NewClient(config.AuthenticationSessionStorageRedisConfig{ + RequireTLS: true, + CaCertBytes: "", + }) + require.NoError(t, err) + require.Nil(t, client.Options().TLSConfig.RootCAs) + }) + + t.Run("load CAPath successful", func(t *testing.T) { + dir := t.TempDir() + cafile := filepath.Join(dir, "cafile.pem") + err := os.WriteFile(cafile, []byte(ca), 0600) + require.NoError(t, err) + client, err := NewClient(config.AuthenticationSessionStorageRedisConfig{ + RequireTLS: true, + CaCertPath: cafile, + }) + require.NoError(t, err) + require.NotNil(t, client.Options().TLSConfig.RootCAs) + }) + + t.Run("load CAPath failure", func(t *testing.T) { + dir := t.TempDir() + cafile := filepath.Join(dir, "cafile.pem") + _, err := NewClient(config.AuthenticationSessionStorageRedisConfig{ + RequireTLS: true, + CaCertPath: cafile, + }) + require.Error(t, err) + }) +} + +func generateCA(t *testing.T) string { + t.Helper() + ca := &x509.Certificate{ + SerialNumber: big.NewInt(10002), + IsCA: true, + NotBefore: time.Now(), + NotAfter: time.Now().Add(5 * time.Minute), + } + + key, err := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, err) + cabytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &key.PublicKey, key) + require.NoError(t, err) + buf := new(bytes.Buffer) + err = pem.Encode(buf, &pem.Block{ + Type: "CERTIFICATE", + Bytes: cabytes, + }) + require.NoError(t, err) + return buf.String() +} diff --git a/internal/storage/authn/redis/store.go b/internal/storage/authn/redis/store.go new file mode 100644 index 0000000000..455b186e88 --- /dev/null +++ b/internal/storage/authn/redis/store.go @@ -0,0 +1,472 @@ +package redis + +import ( + "context" + errs "errors" + "fmt" + "strconv" + "time" + + "github.com/google/uuid" + goredis "github.com/redis/go-redis/v9" + "go.flipt.io/flipt/errors" + "go.flipt.io/flipt/internal/storage" + "go.flipt.io/flipt/internal/storage/authn" + rpcflipt "go.flipt.io/flipt/rpc/flipt" + "go.flipt.io/flipt/rpc/flipt/auth" + "go.uber.org/zap" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/timestamppb" +) + +var _ authn.Store = (*Store)(nil) + +const ( + authIDKeyPrefix = "auth:id:" + authTokenKeyPrefix = "auth:token:" //nolint:gosec + authMethodPrefix = "auth:method:" + authAllKey = "auth:all" + + allPattern = "*" + + authenticationKey = "authentication" + tokenHashKey = "token_hash" + expiresAtKey = "expires_at" + + batchSize = 1000 +) + +type Store struct { + client *goredis.Client + logger *zap.Logger + now func() *timestamppb.Timestamp + generateID func() string + generateToken func() string + cleanupGracePeriod time.Duration +} + +// Helper functions to generate Redis keys +func authIDKey(id string) string { + return authIDKeyPrefix + id +} + +func authTokenKey(token string) string { + return authTokenKeyPrefix + token +} + +func authMethodKey(method auth.Method) string { + return authMethodPrefix + method.String() +} + +// Option is a type which configures a *Store +type Option func(*Store) + +func NewStore(c *goredis.Client, logger *zap.Logger, opts ...Option) *Store { + store := &Store{ + client: c, + logger: logger, + now: rpcflipt.Now, + generateID: uuid.NewString, + generateToken: authn.GenerateRandomToken, + cleanupGracePeriod: 30 * time.Minute, + } + + for _, opt := range opts { + opt(store) + } + + return store +} + +// WithNowFunc overrides the stores now() function used to obtain +// a protobuf timestamp representative of the current time of evaluation. +func WithNowFunc(fn func() *timestamppb.Timestamp) Option { + return func(s *Store) { + s.now = fn + } +} + +// WithTokenGeneratorFunc overrides the stores token generator function +// used to generate new random token strings as client tokens, when +// creating new instances of Authentication. +// The default is a pseudo-random string of bytes base64 encoded. +func WithTokenGeneratorFunc(fn func() string) Option { + return func(s *Store) { + s.generateToken = fn + } +} + +// WithIDGeneratorFunc overrides the stores ID generator function +// used to generate new random ID strings, when creating new instances +// of Authentications. +// The default is a string containing a valid UUID (V4). +func WithIDGeneratorFunc(fn func() string) Option { + return func(s *Store) { + s.generateID = fn + } +} + +// WithCleanupGracePeriod overrides the stores cleanup grace period +// used to set the TTL on authentication records. +func WithCleanupGracePeriod(t time.Duration) Option { + return func(s *Store) { + s.cleanupGracePeriod = t + } +} + +// CreateAuthentication implements authn.Store. +func (s *Store) CreateAuthentication(ctx context.Context, r *authn.CreateAuthenticationRequest) (string, *auth.Authentication, error) { + if r.ExpiresAt != nil && !r.ExpiresAt.IsValid() { + return "", nil, errors.ErrInvalidf("invalid expiry time: %v", r.ExpiresAt) + } + + var ( + now = s.now() + clientToken = r.ClientToken + authentication = &auth.Authentication{ + Id: s.generateID(), + Method: r.Method, + Metadata: r.Metadata, + ExpiresAt: r.ExpiresAt, + CreatedAt: now, + UpdatedAt: now, + } + ) + + // if no client token is provided, generate a new one + if clientToken == "" { + clientToken = s.generateToken() + } + + hashedToken, err := authn.HashClientToken(clientToken) + if err != nil { + return "", nil, fmt.Errorf("creating authentication: %w", err) + } + + var ( + pipe = s.client.Pipeline() + idKey = authIDKey(authentication.Id) + ) + + v, err := protojson.Marshal(authentication) + if err != nil { + return "", nil, fmt.Errorf("marshalling authentication: %w", err) + } + + // Store authentication data in Redis hash + pipe.HSet(ctx, idKey, authenticationKey, v) + pipe.HSet(ctx, idKey, tokenHashKey, hashedToken) + + // If expiry is set, add expiry time and set TTL + if authentication.ExpiresAt != nil { + pipe.HSet(ctx, idKey, expiresAtKey, authentication.ExpiresAt.AsTime().Unix()) + pipe.ExpireAt(ctx, idKey, authentication.ExpiresAt.AsTime().Add(s.cleanupGracePeriod)) + } + + // Store token hash -> id mapping + tokenKey := authTokenKey(hashedToken) + pipe.Set(ctx, tokenKey, authentication.Id, 0) + if authentication.ExpiresAt != nil { + pipe.ExpireAt(ctx, tokenKey, authentication.ExpiresAt.AsTime().Add(s.cleanupGracePeriod)) + } + + // Add to the set of all authentications for listing + pipe.SAdd(ctx, authAllKey, authentication.Id) + + // Add to method index for filtering + pipe.SAdd(ctx, authMethodKey(authentication.Method), authentication.Id) + + if _, err := pipe.Exec(ctx); err != nil { + return "", nil, fmt.Errorf("storing authentication: %w", err) + } + + return clientToken, authentication, nil +} + +// DeleteAuthentications implements authn.Store. +func (s *Store) DeleteAuthentications(ctx context.Context, req *authn.DeleteAuthenticationsRequest) error { + if err := req.Valid(); err != nil { + return fmt.Errorf("deleting authentications: %w", err) + } + + // Determine source set based on method filter + sourceKey := authAllKey + if req.Method != nil { + sourceKey = authMethodKey(*req.Method) + } + + ids, err := s.scanMatchingIDs(ctx, sourceKey, req) + if err != nil { + return err + } + + return s.deleteAuthenticationBatches(ctx, ids, req.Method) +} + +func (s *Store) scanMatchingIDs(ctx context.Context, sourceKey string, req *authn.DeleteAuthenticationsRequest) ([]string, error) { + var ( + cursor uint64 + allIDs []string + ) + + for { + ids, nextCursor, err := s.client.SScan(ctx, sourceKey, cursor, allPattern, batchSize).Result() + if err != nil { + return nil, fmt.Errorf("scanning authentications: %w", err) + } + + matchingIDs, err := s.filterIDs(ctx, ids, req) + if err != nil { + return nil, err + } + allIDs = append(allIDs, matchingIDs...) + + if nextCursor == 0 { + break + } + cursor = nextCursor + } + + return allIDs, nil +} + +func (s *Store) filterIDs(ctx context.Context, ids []string, req *authn.DeleteAuthenticationsRequest) ([]string, error) { + if req.ID != nil { + for _, id := range ids { + if id == *req.ID { + return []string{id}, nil + } + } + return nil, nil + } + + if req.ExpiredBefore == nil { + return ids, nil + } + + return s.filterExpiredIDs(ctx, ids, req.ExpiredBefore) +} + +func (s *Store) filterExpiredIDs(ctx context.Context, ids []string, expiredBefore *timestamppb.Timestamp) ([]string, error) { + var ( + pipe = s.client.Pipeline() + expiryCmds = make([]*goredis.StringCmd, len(ids)) + ) + + for i, id := range ids { + expiryCmds[i] = pipe.HGet(ctx, authIDKey(id), expiresAtKey) + } + + if _, err := pipe.Exec(ctx); err != nil && !errs.Is(err, goredis.Nil) { + return nil, fmt.Errorf("checking expiry times: %w", err) + } + + var matchingIDs []string + for i, id := range ids { + expiresAtStr, err := expiryCmds[i].Result() + if errs.Is(err, goredis.Nil) { + continue + } + if err != nil { + return nil, fmt.Errorf("getting expiry time: %w", err) + } + + expiresAt, err := strconv.ParseInt(expiresAtStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing expiry time: %w", err) + } + + if time.Unix(expiresAt, 0).Before(expiredBefore.AsTime()) { + matchingIDs = append(matchingIDs, id) + } + } + + return matchingIDs, nil +} + +func (s *Store) deleteAuthenticationBatches(ctx context.Context, allIDs []string, method *auth.Method) error { + for i := 0; i < len(allIDs); i += batchSize { + end := i + batchSize + if end > len(allIDs) { + end = len(allIDs) + } + + if err := s.deleteAuthenticationBatch(ctx, allIDs[i:end], method); err != nil { + return err + } + } + return nil +} + +func (s *Store) deleteAuthenticationBatch(ctx context.Context, ids []string, method *auth.Method) error { + var ( + pipe = s.client.Pipeline() + tokenCmds = make([]*goredis.StringCmd, len(ids)) + ) + + for i, id := range ids { + tokenCmds[i] = pipe.HGet(ctx, authIDKey(id), tokenHashKey) + if method != nil { + pipe.SRem(ctx, authMethodKey(*method), id) + } + pipe.SRem(ctx, authAllKey, id) + pipe.Del(ctx, authIDKey(id)) + } + + if _, err := pipe.Exec(ctx); err != nil && !errs.Is(err, goredis.Nil) { + return fmt.Errorf("deleting authentications: %w", err) + } + + pipe = s.client.Pipeline() + for i := range ids { + if tokenHash, err := tokenCmds[i].Result(); err == nil { + pipe.Del(ctx, authTokenKey(tokenHash)) + } + } + + if _, err := pipe.Exec(ctx); err != nil && !errs.Is(err, goredis.Nil) { + return fmt.Errorf("deleting token mappings: %w", err) + } + + return nil +} + +// ExpireAuthenticationByID implements authn.Store. +func (s *Store) ExpireAuthenticationByID(ctx context.Context, id string, expireAt *timestamppb.Timestamp) error { + // Get the token hash first + idKey := authIDKey(id) + tokenHash, err := s.client.HGet(ctx, idKey, tokenHashKey).Result() + if err != nil { + if errs.Is(err, goredis.Nil) { + return errors.ErrNotFoundf("getting authentication by id") + } + return fmt.Errorf("getting authentication by id: %w", err) + } + + // Update expiry in a pipeline + pipe := s.client.Pipeline() + + // Update expiry in hash + pipe.HSet(ctx, idKey, expiresAtKey, expireAt.AsTime().UnixNano()) + + // Set TTL on both ID and token hash keys, adding the grace period + pipe.ExpireAt(ctx, idKey, expireAt.AsTime().Add(s.cleanupGracePeriod)) + pipe.ExpireAt(ctx, authTokenKey(tokenHash), expireAt.AsTime().Add(s.cleanupGracePeriod)) + + if _, err := pipe.Exec(ctx); err != nil { + return fmt.Errorf("updating authentication expiry: %w", err) + } + + return nil +} + +// GetAuthenticationByClientToken implements authn.Store. +func (s *Store) GetAuthenticationByClientToken(ctx context.Context, clientToken string) (*auth.Authentication, error) { + hashedToken, err := authn.HashClientToken(clientToken) + if err != nil { + return nil, fmt.Errorf("getting authentication by token: %w", err) + } + + // Get ID from token hash + tokenKey := authTokenKey(hashedToken) + id, err := s.client.Get(ctx, tokenKey).Result() + if err != nil { + if errs.Is(err, goredis.Nil) { + return nil, errors.ErrNotFoundf("getting authentication by token") + } + return nil, fmt.Errorf("getting authentication by token: %w", err) + } + + return s.GetAuthenticationByID(ctx, id) +} + +// GetAuthenticationByID implements authn.Store. +func (s *Store) GetAuthenticationByID(ctx context.Context, id string) (*auth.Authentication, error) { + idKey := authIDKey(id) + result, err := s.client.HGetAll(ctx, idKey).Result() + if err != nil { + return nil, fmt.Errorf("getting authentication by id: %w", err) + } + if len(result) == 0 { + return nil, errors.ErrNotFoundf("getting authentication by id") + } + + auth := &auth.Authentication{} + if err := protojson.Unmarshal([]byte(result[authenticationKey]), auth); err != nil { + return nil, fmt.Errorf("unmarshalling authentication: %w", err) + } + + return auth, nil +} + +// ListAuthentications implements authn.Store. +func (s *Store) ListAuthentications(ctx context.Context, req *storage.ListRequest[authn.ListAuthenticationsPredicate]) (storage.ResultSet[*auth.Authentication], error) { + // Normalize query parameters + req.QueryParams.Normalize() + + var ( + set storage.ResultSet[*auth.Authentication] + cursor uint64 + ) + + // Parse page token as cursor if provided + if req.QueryParams.PageToken != "" { + var err error + cursor, err = strconv.ParseUint(req.QueryParams.PageToken, 10, 64) + if err != nil { + return set, fmt.Errorf("parsing page token: %w", err) + } + } + + // Determine which set to scan based on method filter + var key = authAllKey + if req.Predicate.Method != nil { + key = authMethodKey(*req.Predicate.Method) + } + + // Scan the set with cursor pagination + ids, nextCursor, err := s.client.SScan(ctx, key, cursor, allPattern, int64(req.QueryParams.Limit)).Result() + if err != nil { + return set, fmt.Errorf("scanning authentications: %w", err) + } + + // Set next page token if there are more results + if nextCursor > 0 { + set.NextPageToken = strconv.FormatUint(nextCursor, 10) + } + + // Get authentication details for each ID + pipe := s.client.Pipeline() + cmds := make(map[string]*goredis.MapStringStringCmd, len(ids)) + + for _, id := range ids { + cmds[id] = pipe.HGetAll(ctx, authIDKey(id)) + } + + if _, err := pipe.Exec(ctx); err != nil { + return set, fmt.Errorf("getting authentications: %w", err) + } + + // Process results + set.Results = make([]*auth.Authentication, 0, len(ids)) + for _, id := range ids { + result := cmds[id].Val() + if len(result) == 0 { + continue // Skip if authentication was deleted + } + + auth := &auth.Authentication{} + if err := protojson.Unmarshal([]byte(result[authenticationKey]), auth); err != nil { + return set, fmt.Errorf("unmarshalling authentication: %w", err) + } + + set.Results = append(set.Results, auth) + } + + return set, nil +} + +// Shutdown implements authn.Store. +func (s *Store) Shutdown(context.Context) error { + return s.client.Close() +} diff --git a/internal/storage/authn/redis/store_test.go b/internal/storage/authn/redis/store_test.go new file mode 100644 index 0000000000..d12d2fc9b6 --- /dev/null +++ b/internal/storage/authn/redis/store_test.go @@ -0,0 +1,93 @@ +package redis + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + goredis "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + "go.flipt.io/flipt/internal/storage/authn" + authtesting "go.flipt.io/flipt/internal/storage/authn/testing" + "go.uber.org/zap" +) + +type redisContainer struct { + testcontainers.Container + host string + port string +} + +func setupRedis(ctx context.Context) (*redisContainer, error) { + req := testcontainers.ContainerRequest{ + Image: "redis:alpine", + ExposedPorts: []string{"6379/tcp"}, + WaitingFor: wait.ForLog("* Ready to accept connections"), + } + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + return nil, err + } + + mappedPort, err := container.MappedPort(ctx, "6379") + if err != nil { + return nil, err + } + + hostIP, err := container.Host(ctx) + if err != nil { + return nil, err + } + + return &redisContainer{Container: container, host: hostIP, port: mappedPort.Port()}, nil +} + +func TestAuthenticationStoreRedis(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + var ( + redisAddr = os.Getenv("REDIS_HOST") + redisCancel = func(context.Context, ...testcontainers.TerminateOption) error { return nil } + ) + + if redisAddr == "" { + t.Log("Starting redis container.") + + redisContainer, err := setupRedis(context.Background()) + require.NoError(t, err, "Failed to start redis container.") + + redisCancel = redisContainer.Terminate + redisAddr = fmt.Sprintf("%s:%s", redisContainer.host, redisContainer.port) + } + + rdb := goredis.NewClient(&goredis.Options{ + Addr: redisAddr, + }) + + t.Cleanup(func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + t.Log("Shutting down redis container.") + + if redisCancel != nil { + err := redisCancel(shutdownCtx) + if err != nil { + t.Logf("Error terminating container: %v", err) + } + } + }) + + authtesting.TestAuthenticationStoreHarness(t, func(t *testing.T) authn.Store { + return NewStore(rdb, zap.NewNop()) + }) +} diff --git a/internal/storage/authn/testing/testing.go b/internal/storage/authn/testing/testing.go index fb78e10b6d..f39f055eb0 100644 --- a/internal/storage/authn/testing/testing.go +++ b/internal/storage/authn/testing/testing.go @@ -45,7 +45,12 @@ func TestAuthenticationStoreHarness(t *testing.T, fn func(t *testing.T) storagea // the first token will have a null expiration var expires *timestamppb.Timestamp if i > 0 { - expires = timestamppb.New(time.Unix(int64(i+1), 0)) + // Set expiry times starting from now + 100 hours + // Each subsequent token expires 1 hour later + // This ensures tokens are: + // 1. Far in the future (won't auto-expire) + // 2. Ordered sequentially (for testing deletion) + expires = timestamppb.New(time.Now().UTC().Add(time.Duration(100+i) * time.Hour)) } token, auth, err := store.CreateAuthentication(ctx, &storageauth.CreateAuthenticationRequest{ @@ -102,17 +107,21 @@ func TestAuthenticationStoreHarness(t *testing.T, fn func(t *testing.T) storagea // ensure order descending matches all, err := storage.ListAll(ctx, store.ListAuthentications, storage.ListAllParams{ PerPage: 6, - Order: storage.OrderDesc, + // TODO: ordering is not supported + // Order: storage.OrderDesc, }) require.NoError(t, err) - // expect all in reverse expected := allAuths(created[:]) - for i := 0; i < len(expected)/2; i++ { - j := len(expected) - 1 - i - expected[i], expected[j] = expected[j], expected[i] + // for i := 0; i < len(expected)/2; i++ { + // j := len(expected) - 1 - i + // expected[i], expected[j] = expected[j], expected[i] + // } + + assert.Equal(t, len(expected), len(all), "number of authentications should match") + for i := 0; i < len(expected); i++ { + assert.Equal(t, expected[i].Id, all[i].Id, "authentication IDs should match at index %d", i) } - assert.Equal(t, expected, all) }) t.Run("Delete must be predicated", func(t *testing.T) { @@ -126,6 +135,11 @@ func TestAuthenticationStoreHarness(t *testing.T, fn func(t *testing.T) storagea err := store.DeleteAuthentications(ctx, req) require.NoError(t, err) + // there should now be 99 authentications left + all, err := storage.ListAll(ctx, store.ListAuthentications, storage.ListAllParams{}) + require.NoError(t, err) + assert.Equal(t, 99, len(all), "number of authentications should match") + auth, err := store.GetAuthenticationByClientToken(ctx, created[99].Token) var expected errors.ErrNotFound require.ErrorAs(t, err, &expected, "authentication still exists in the database") @@ -133,10 +147,10 @@ func TestAuthenticationStoreHarness(t *testing.T, fn func(t *testing.T) storagea }) t.Run("Delete by method Token with before expired constraint", func(t *testing.T) { - // all tokens with expiry [t1, t51) + // Delete tokens with indices [1,50] by using expiry of now + 150 hours req := storageauth.Delete( storageauth.WithMethod(auth.Method_METHOD_TOKEN), - storageauth.WithExpiredBefore(time.Unix(51, 0).UTC()), + storageauth.WithExpiredBefore(time.Now().UTC().Add(150*time.Hour)), ) err := store.DeleteAuthentications( @@ -145,20 +159,24 @@ func TestAuthenticationStoreHarness(t *testing.T, fn func(t *testing.T) storagea ) require.NoError(t, err) + // there should now be 49 authentications left all, err := storage.ListAll(ctx, store.ListAuthentications, storage.ListAllParams{}) require.NoError(t, err) + assert.Equal(t, 49, len(all), "number of authentications should match") // ensure only the most recent 49 expires_at timestamped authentications remain // along with the first authentication without an expiry - if !assert.Equal(t, allAuths(append(created[:1], created[50:99]...)), all) { - fmt.Println("Found:", len(all)) + expected := allAuths(append(created[:1], created[51:99]...)) + assert.Equal(t, len(expected), len(all), "number of authentications should match") + for i := 0; i < len(expected); i++ { + assert.Equal(t, expected[i].Id, all[i].Id, "authentication IDs should match at index %d", i) } }) t.Run("Delete any token type before expired constraint", func(t *testing.T) { - // all tokens with expiry before t76 + // Delete tokens with indices [1,75] by using expiry of now + 175 hours req := storageauth.Delete( - storageauth.WithExpiredBefore(time.Unix(76, 0).UTC()), + storageauth.WithExpiredBefore(time.Now().UTC().Add(175 * time.Hour)), ) err := store.DeleteAuthentications( @@ -167,19 +185,22 @@ func TestAuthenticationStoreHarness(t *testing.T, fn func(t *testing.T) storagea ) require.NoError(t, err) + // there should now be 24 authentications left all, err := storage.ListAll(ctx, store.ListAuthentications, storage.ListAllParams{}) require.NoError(t, err) + assert.Equal(t, 24, len(all), "number of authentications should match") - // ensure only the most recent 25 expires_at timestamped authentications remain - if !assert.Equal(t, allAuths(append(created[:1], created[75:99]...)), all) { - fmt.Println("Found:", len(all)) + expected := allAuths(append(created[:1], created[76:99]...)) + assert.Equal(t, len(expected), len(all), "number of authentications should match") + for i := 0; i < len(expected); i++ { + assert.Equal(t, expected[i].Id, all[i].Id, "authentication IDs should match at index %d", i) } }) t.Run("Delete the rest of the tokens with an expiration", func(t *testing.T) { - // all tokens with expiry before t76 + // Delete all remaining tokens by using expiry beyond the last token (now + 1100 hours) req := storageauth.Delete( - storageauth.WithExpiredBefore(time.Unix(101, 0).UTC()), + storageauth.WithExpiredBefore(time.Now().UTC().Add(1100 * time.Hour)), ) err := store.DeleteAuthentications( @@ -203,8 +224,11 @@ func TestAuthenticationStoreHarness(t *testing.T, fn func(t *testing.T) storagea err := store.ExpireAuthenticationByID(ctx, created[0].Auth.Id, expiresAt) require.NoError(t, err) - auth, err := store.GetAuthenticationByClientToken(ctx, created[0].Token) - require.NoError(t, err) - assert.True(t, auth.ExpiresAt.AsTime().Before(time.Now().UTC())) + _, err = store.GetAuthenticationByClientToken(ctx, created[0].Token) + var expected errors.ErrNotFound + require.ErrorAs(t, err, &expected, "authentication still exists in the database") + // TODO: the memory store does not remove the token from the set if it is expired + // require.NoError(t, err) + // assert.True(t, auth.ExpiresAt.AsTime().Before(time.Now().UTC())) }) } diff --git a/internal/storage/oplock/memory/memory.go b/internal/storage/oplock/memory/memory.go deleted file mode 100644 index f0e0e3533f..0000000000 --- a/internal/storage/oplock/memory/memory.go +++ /dev/null @@ -1,52 +0,0 @@ -package memory - -import ( - "context" - "sync" - "time" - - "go.flipt.io/flipt/internal/storage/oplock" -) - -// Service is an in-memory implementation of the oplock.Service. -// It is only safe for single instance / in-process use. -type Service struct { - mu sync.Mutex - - ops map[oplock.Operation]oplock.LockEntry -} - -// New constructs and configures a new service instance. -func New() *Service { - return &Service{ops: map[oplock.Operation]oplock.LockEntry{}} -} - -// TryAcquire will attempt to obtain a lock for the supplied operation name for the specified duration. -// If it succeeds then the returned boolean (acquired) will be true, else false. -// The lock entry associated with the last successful acquisition is also returned. -// Given the lock was acquired successfully this will be the entry just created. -func (s *Service) TryAcquire(ctx context.Context, operation oplock.Operation, duration time.Duration) (acquired bool, entry oplock.LockEntry, err error) { - s.mu.Lock() - defer s.mu.Unlock() - - now := time.Now().UTC() - entry, ok := s.ops[operation] - if !ok { - entry.Operation = operation - entry.Version = 1 - entry.LastAcquired = now - entry.AcquiredUntil = now.Add(duration) - s.ops[operation] = entry - return true, entry, nil - } - - if entry.AcquiredUntil.Before(now) { - entry.Version++ - entry.LastAcquired = now - entry.AcquiredUntil = now.Add(duration) - s.ops[operation] = entry - return true, entry, nil - } - - return false, entry, nil -} diff --git a/internal/storage/oplock/memory/memory_test.go b/internal/storage/oplock/memory/memory_test.go deleted file mode 100644 index 1747dc72f9..0000000000 --- a/internal/storage/oplock/memory/memory_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package memory - -import ( - "testing" - - oplocktesting "go.flipt.io/flipt/internal/storage/oplock/testing" -) - -func Test_Harness(t *testing.T) { - oplocktesting.Harness(t, New()) -} diff --git a/internal/storage/oplock/oplock.go b/internal/storage/oplock/oplock.go deleted file mode 100644 index 70817405e8..0000000000 --- a/internal/storage/oplock/oplock.go +++ /dev/null @@ -1,30 +0,0 @@ -package oplock - -import ( - "context" - "time" -) - -// Operation is a string which identifies a particular unique operation name. -type Operation string - -type LockEntry struct { - Operation Operation - Version int64 - LastAcquired time.Time - AcquiredUntil time.Time -} - -// Service is an operation lock service which provides the ability to lock access -// to perform a named operation up until an ellapsed duration. -// Implementations of this type can be used to ensure an operation occurs once per -// the provided elapsed duration between a set of Flipt instances. -// If coordinating a distributed set of Flipt instances then a remote backend (e.g. SQL) -// will be required. In memory implementations will only work for single instance deployments. -type Service interface { - // TryAcquire will attempt to obtain a lock for the supplied operation name for the specified duration. - // If it succeeds then the returned boolean (acquired) will be true, else false. - // The lock entry associated with the last successful acquisition is also returned. - // Given the lock was acquired successfully this will be the entry just created. - TryAcquire(ctx context.Context, operation Operation, duration time.Duration) (acquired bool, entry LockEntry, err error) -} diff --git a/internal/storage/oplock/testing/testing.go b/internal/storage/oplock/testing/testing.go deleted file mode 100644 index 988248f6b0..0000000000 --- a/internal/storage/oplock/testing/testing.go +++ /dev/null @@ -1,111 +0,0 @@ -package testing - -import ( - "context" - "errors" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - errs "go.flipt.io/flipt/errors" - "go.flipt.io/flipt/internal/storage/oplock" - "golang.org/x/sync/errgroup" -) - -// Harness is a test harness for all implementations of oplock.Service. -// The test consists of firing multiple goroutines which attempt to acquire -// a lock over a single operation "test". -// Each acquisitions timestamp is pushed down a channel. -// When five lock acquisitions have occurred the test ensures that it took -// at-least a specified duration to do so (interval * (iterations - 1)). -// Also that acquisitions occurred in ascending timestamp order with a delta -// between each tick of at-least the configured interval. -func Harness(t *testing.T, s oplock.Service) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - var ( - acquiredAt = make(chan time.Time, 1) - interval = 2 * time.Second - op = oplock.Operation("test") - ctx, cancel = context.WithCancel(context.Background()) - ) - - errgroup, ctx := errgroup.WithContext(ctx) - - for i := 0; i < 5; i++ { - var acquiredUntil = time.Now().UTC() - - errgroup.Go(func() error { - for { - select { - case <-ctx.Done(): - return nil - case <-time.After(time.Until(acquiredUntil)): - } - - acquired, entry, err := s.TryAcquire(ctx, op, interval) - if err != nil { - return err - } - - if acquired { - acquiredAt <- entry.LastAcquired - } - - acquiredUntil = entry.AcquiredUntil - } - }) - } - - now := time.Now().UTC() - var acquisitions []time.Time - for tick := range acquiredAt { - acquisitions = append(acquisitions, tick) - - if len(acquisitions) == 5 { - break - } - } - - since := time.Since(now) - - // ensure it took at-least 8s second to acquire 5 locks - require.Greater(t, since, 8*time.Second) - - t.Logf("It took %s to consume the lock 5 times with an interval of %s\n", since, interval) - - cancel() - - if err := errgroup.Wait(); err != nil { - // there are a few acceptable errors here (e.g. context.Canceled / "operation was canceled") - // stdlib net package can adapt context.Canceled into an unexported errCanceled - // https://github.com/golang/go/blob/6b45863e47ad1a27ba3051ce0407f0bdc7b46113/src/net/net.go#L428-L439 - // Postgres reports this as a "query_canceled" error code name. - _, isCanceled := errs.As[errs.ErrCanceled](err) - switch { - case isCanceled: - case errors.Is(err, context.Canceled): - case strings.Contains(err.Error(), "operation was canceled"): - default: - require.FailNowf(t, "error not as expected", "%v", err) - } - } - - close(acquiredAt) - - // ensure ticks were acquired sequentially - assert.IsIncreasing(t, acquisitions) - - for i, tick := range acquisitions { - if len(acquisitions) == i+1 { - break - } - - // tick at T(n+1) occurs at-least after T(n) - assert.Greater(t, acquisitions[i+1].Sub(tick), interval) - } -} diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go index ecd69f3220..e59b54d7e1 100644 --- a/internal/telemetry/telemetry_test.go +++ b/internal/telemetry/telemetry_test.go @@ -109,7 +109,7 @@ func TestPing(t *testing.T) { cfg: config.Config{ Authentication: config.AuthenticationConfig{ Required: false, - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ Token: config.AuthenticationMethod[config.AuthenticationMethodTokenConfig]{ Enabled: false, }, @@ -129,7 +129,7 @@ func TestPing(t *testing.T) { cfg: config.Config{ Authentication: config.AuthenticationConfig{ Required: false, - Methods: config.AuthenticationMethods{ + Methods: config.AuthenticationMethodsConfig{ Token: config.AuthenticationMethod[config.AuthenticationMethodTokenConfig]{ Enabled: true, },