diff --git a/audit/config.go b/audit/config.go new file mode 100644 index 0000000..7b93f39 --- /dev/null +++ b/audit/config.go @@ -0,0 +1,55 @@ +package audit + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + + "github.com/jlevesy/kudo/controller" + "github.com/jlevesy/kudo/pkg/apis/k8s.kudo.dev/v1alpha1" + "github.com/jlevesy/kudo/pkg/generated/clientset/versioned/scheme" +) + +const ( + K8sEventsSink = "K8sEvents" +) + +func BuildSinkFromConfig(cfg controller.AuditConfig, kubeClient kubernetes.Interface) (Sink, error) { + var sinks multiAsyncSink + + for _, sinkCfg := range cfg.Sinks { + switch sinkCfg.Kind { + case K8sEventsSink: + k8sCfg, err := v1alpha1.DecodeValueWithKind[controller.K8sEventsConfig](sinkCfg) + if err != nil { + return nil, err + } + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartStructuredLogging(0) + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{ + Interface: kubeClient.CoreV1().Events(k8sCfg.Namespace), + }, + ) + + sinks = append( + sinks, + NewK8sEventSink( + eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: "kudo-controller"}, + ), + ), + ) + default: + return nil, fmt.Errorf("unsupported sink kind %q", sinkCfg.Kind) + } + + } + + return sinks, nil +} diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 930c0c9..3611785 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -7,57 +7,50 @@ import ( "os" "os/signal" "syscall" - "time" "golang.org/x/sync/errgroup" - corev1 "k8s.io/api/core/v1" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "github.com/jlevesy/kudo/audit" + "github.com/jlevesy/kudo/controller" "github.com/jlevesy/kudo/escalation" "github.com/jlevesy/kudo/escalationpolicy" "github.com/jlevesy/kudo/grant" kudov1alpha1 "github.com/jlevesy/kudo/pkg/apis/k8s.kudo.dev/v1alpha1" "github.com/jlevesy/kudo/pkg/controllersupport" clientset "github.com/jlevesy/kudo/pkg/generated/clientset/versioned" - "github.com/jlevesy/kudo/pkg/generated/clientset/versioned/scheme" kudoinformers "github.com/jlevesy/kudo/pkg/generated/informers/externalversions" "github.com/jlevesy/kudo/pkg/webhooksupport" ) var ( - masterURL string - kubeconfig string - threadiness int - resyncInterval time.Duration - retryInterval time.Duration - - webhookConfig webhooksupport.ServerConfig + configPath string + kubeConfig string + masterURL string ) -const defaultInformerResyncInterval = time.Hour - func main() { - flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&configPath, "config", "", "Path to Kudo Configuration") + flag.StringVar(&kubeConfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") - flag.StringVar(&webhookConfig.CertPath, "webhook_cert", "", "Path to webhook TLS cert") - flag.StringVar(&webhookConfig.KeyPath, "webhook_key", "", "Path to webhook TLS key") - flag.StringVar(&webhookConfig.Addr, "webhook_addr", ":8080", "Webhook listening address") - flag.IntVar(&threadiness, "threadiness", 10, "Amount of events processed in paralled") - flag.DurationVar(&resyncInterval, "resync_interval", 30*time.Second, "Maximum period to resync an active escalation") - flag.DurationVar(&retryInterval, "retry_interval", 10*time.Second, "Maximum period retry an escalation not fully granted/reclaimed") klog.InitFlags(nil) + defer klog.Flush() flag.Parse() + klog.Info("Loading Kudo controller configuration", "path", configPath) + + kudoCfg, err := controller.LoadConfigurationFromFile(configPath) + if err != nil { + klog.Fatalf("Unable to load kudo configuration: %s", err.Error()) + } + klog.Info("Starting kudo controller") - cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeConfig) if err != nil { klog.Fatalf("Unable to build kube client configuration: %s", err.Error()) } @@ -75,15 +68,22 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + auditSink, err := audit.BuildSinkFromConfig(kudoCfg.Audit, kubeClient) + if err != nil { + klog.Fatalf("Unable to build audit sink: %s", err.Error()) + } var ( serveMux = http.NewServeMux() - kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, defaultInformerResyncInterval) - kudoInformerFactory = kudoinformers.NewSharedInformerFactory(kudoClientSet, defaultInformerResyncInterval) + kubeInformerFactory = kubeinformers.NewSharedInformerFactory( + kubeClient, + kudoCfg.Controller.InformerResyncInterval.Duration, + ) + kudoInformerFactory = kudoinformers.NewSharedInformerFactory( + kudoClientSet, + kudoCfg.Controller.InformerResyncInterval.Duration, + ) escalationsInformer = kudoInformerFactory.K8s().V1alpha1().Escalations().Informer() escalationsClient = kudoClientSet.K8sV1alpha1().Escalations() policiesLister = kudoInformerFactory.K8s().V1alpha1().EscalationPolicies().Lister() @@ -95,19 +95,12 @@ func main() { policiesLister, escalationsClient, granterFactory, - audit.MutliAsyncSink( - audit.NewK8sEventSink( - eventBroadcaster.NewRecorder( - scheme.Scheme, - corev1.EventSource{Component: "kudo-controller"}, - ), - ), - ), - escalation.WithResyncInterval(resyncInterval), - escalation.WithRetryInterval(retryInterval), + auditSink, + escalation.WithResyncInterval(kudoCfg.Controller.ResyncInterval.Duration), + escalation.WithRetryInterval(kudoCfg.Controller.RetryInterval.Duration), ), kudov1alpha1.KindEscalation, - threadiness, + kudoCfg.Controller.Threadiness, ) ) @@ -135,7 +128,7 @@ func main() { klog.Info("Informers warmed up, starting controller...") group.Go(func() error { - return webhooksupport.Serve(ctx, webhookConfig, serveMux) + return webhooksupport.Serve(ctx, kudoCfg.Webhook, serveMux) }) group.Go(func() error { diff --git a/controller/config.go b/controller/config.go new file mode 100644 index 0000000..47816aa --- /dev/null +++ b/controller/config.go @@ -0,0 +1,81 @@ +package controller + +import ( + "os" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" + + "github.com/jlevesy/kudo/pkg/apis/k8s.kudo.dev/v1alpha1" +) + +var defaultConfig = Config{ + Audit: AuditConfig{ + Sinks: []v1alpha1.ValueWithKind{ + v1alpha1.MustEncodeValueWithKind( + "K8sEvents", + K8sEventsConfig{ + Namespace: "", + }, + ), + }, + }, + Controller: ControllerConfig{ + ResyncInterval: metav1.Duration{Duration: 30 * time.Second}, + RetryInterval: metav1.Duration{Duration: 10 * time.Second}, + InformerResyncInterval: metav1.Duration{Duration: 1 * time.Hour}, + Threadiness: 10, + }, + Webhook: WebhookConfig{ + CertPath: "/var/run/certs/tls.crt", + KeyPath: "/var/run/certs/tls.key", + Addr: ":8443", + ReadTimeout: metav1.Duration{ + Duration: 20 * time.Second, + }, + WriteTimeout: metav1.Duration{ + Duration: 20 * time.Second, + }, + }, +} + +type Config struct { + Audit AuditConfig `json:"audit"` + Controller ControllerConfig `json:"controller"` + Webhook WebhookConfig `json:"webhook"` +} + +type AuditConfig struct { + Sinks []v1alpha1.ValueWithKind `json:"sinks"` +} + +type ControllerConfig struct { + ResyncInterval metav1.Duration `json:"resyncInterval"` + RetryInterval metav1.Duration `json:"retryInterval"` + InformerResyncInterval metav1.Duration `json:"informerResyncInterval"` + Threadiness int `json:"threadiness"` +} + +type WebhookConfig struct { + Addr string `json:"addr"` + CertPath string `json:"certPath"` + KeyPath string `json:"keyPath"` + ReadTimeout metav1.Duration `json:"readTimeout"` + WriteTimeout metav1.Duration `json:"writeTimeout"` +} + +type K8sEventsConfig struct { + Namespace string `json:"namespace"` +} + +func LoadConfigurationFromFile(path string) (Config, error) { + bytes, err := os.ReadFile(path) + if err != nil { + return Config{}, err + } + + var cfg = defaultConfig + + return cfg, yaml.Unmarshal(bytes, &cfg) +} diff --git a/controller/config_test.go b/controller/config_test.go new file mode 100644 index 0000000..b68bb5c --- /dev/null +++ b/controller/config_test.go @@ -0,0 +1,79 @@ +package controller_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jlevesy/kudo/controller" + "github.com/jlevesy/kudo/pkg/apis/k8s.kudo.dev/v1alpha1" +) + +func TestLoadConfig(t *testing.T) { + cfg, err := controller.LoadConfigurationFromFile("./testdata/config.yaml") + require.NoError(t, err) + + wantConfig := controller.Config{ + Audit: controller.AuditConfig{ + Sinks: []v1alpha1.ValueWithKind{ + v1alpha1.MustEncodeValueWithKind( + "K8sEvents", + controller.K8sEventsConfig{ + Namespace: "some-namespace", + }, + ), + }, + }, + Controller: controller.ControllerConfig{ + ResyncInterval: metav1.Duration{Duration: 50 * time.Second}, + RetryInterval: metav1.Duration{Duration: 10 * time.Second}, + InformerResyncInterval: metav1.Duration{Duration: 30 * time.Minute}, + Threadiness: 50, + }, + Webhook: controller.WebhookConfig{ + CertPath: "/some/path/cert.pem", + KeyPath: "/some/path/key.pem", + Addr: ":8444", + ReadTimeout: metav1.Duration{Duration: 50 * time.Second}, + WriteTimeout: metav1.Duration{Duration: 20 * time.Second}, + }, + } + + assert.Equal(t, wantConfig, cfg) +} + +func TestLoadConfig_AppliesDefaultConfig(t *testing.T) { + cfg, err := controller.LoadConfigurationFromFile("./testdata/partial_config.yaml") + require.NoError(t, err) + + wantConfig := controller.Config{ + Audit: controller.AuditConfig{ + Sinks: []v1alpha1.ValueWithKind{ + v1alpha1.MustEncodeValueWithKind( + "K8sEvents", + controller.K8sEventsConfig{ + Namespace: "some-namespace", + }, + ), + }, + }, + Controller: controller.ControllerConfig{ + ResyncInterval: metav1.Duration{Duration: 30 * time.Second}, + RetryInterval: metav1.Duration{Duration: 10 * time.Second}, + InformerResyncInterval: metav1.Duration{Duration: time.Hour}, + Threadiness: 10, + }, + Webhook: controller.WebhookConfig{ + CertPath: "/var/run/certs/tls.crt", + KeyPath: "/var/run/certs/tls.key", + Addr: ":8443", + ReadTimeout: metav1.Duration{Duration: 20 * time.Second}, + WriteTimeout: metav1.Duration{Duration: 20 * time.Second}, + }, + } + + assert.Equal(t, wantConfig, cfg) +} diff --git a/controller/testdata/config.yaml b/controller/testdata/config.yaml new file mode 100644 index 0000000..ac76edb --- /dev/null +++ b/controller/testdata/config.yaml @@ -0,0 +1,17 @@ +audit: + sinks: + - kind: K8sEvents + namespace: "some-namespace" + +controller: + resyncInterval: 50s + retryInterval: 10s + informerResyncInterval: 30m + threadiness: 50 + +webhook: + certPath: "/some/path/cert.pem" + keyPath: "/some/path/key.pem" + addr: ":8444" + readTimeout: 50s + writeTimeout: 20s diff --git a/controller/testdata/partial_config.yaml b/controller/testdata/partial_config.yaml new file mode 100644 index 0000000..0897b6a --- /dev/null +++ b/controller/testdata/partial_config.yaml @@ -0,0 +1,4 @@ +audit: + sinks: + - kind: K8sEvents + namespace: "some-namespace" diff --git a/e2e/runtime.go b/e2e/runtime.go index 1344b3c..fa22f00 100644 --- a/e2e/runtime.go +++ b/e2e/runtime.go @@ -242,8 +242,8 @@ func installKudo(ctx context.Context, kubeConfigPath string) error { "--create-namespace", "--namespace=" + kudoInstallNamespace, "--set=image.devRef=" + imageRef, - "--set=controller.resyncInterval=5s", - "--set=controller.retryInterval=1s", + "--set=config.controller.resyncInterval=5s", + "--set=config.controller.retryInterval=1s", "--set=resources.limits.cpu=1", "--set=resources.requests.cpu=1", "--wait", diff --git a/go.mod b/go.mod index 17e20f6..7340f95 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( k8s.io/cli-runtime v0.25.1 k8s.io/client-go v0.25.1 k8s.io/klog/v2 v2.80.1 + sigs.k8s.io/yaml v1.2.0 ) require ( @@ -67,5 +68,4 @@ require ( sigs.k8s.io/kustomize/api v0.12.1 // indirect sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect - sigs.k8s.io/yaml v1.2.0 // indirect ) diff --git a/helm/templates/_helpers.tpl b/helm/templates/_helpers.tpl index c7b980c..dccb696 100644 --- a/helm/templates/_helpers.tpl +++ b/helm/templates/_helpers.tpl @@ -58,12 +58,19 @@ Create the name of the service account to use {{- end }} {{/* -Create the name of the service account to use +Create the name of the cert secret to use */}} {{- define "helm.certSecretName" -}} {{ include "helm.fullname" . }}-cert {{- end }} +{{/* +Create the name of the config configmap to use +*/}} +{{- define "helm.configConfigMapName" -}} +{{ include "helm.fullname" . }}-config +{{- end }} + {{/* Returns the name of the image according to values. Allow to override standard repository / tag by a full ref diff --git a/helm/templates/configmap.yaml b/helm/templates/configmap.yaml new file mode 100644 index 0000000..9424380 --- /dev/null +++ b/helm/templates/configmap.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ template "helm.configConfigMapName" . }} +data: +{{- with .Values.config }} + config.yaml: |- + {{- toYaml . | nindent 4 }} +{{- end }} diff --git a/helm/templates/deployment.yaml b/helm/templates/deployment.yaml index a3e25c9..e6312f7 100644 --- a/helm/templates/deployment.yaml +++ b/helm/templates/deployment.yaml @@ -31,16 +31,8 @@ spec: image: {{ include "helm.imageName" . }} imagePullPolicy: {{ .Values.image.pullPolicy }} args: - - "-webhook_cert" - - "/var/run/certs/tls.crt" - - "-webhook_key" - - "/var/run/certs/tls.key" - - "-webhook_addr" - - ":8443" - - "-resync_interval" - - {{ .Values.controller.resyncInterval | quote }} - - "-retry_interval" - - {{ .Values.controller.retryInterval | quote }} + - "-config" + - "/var/run/config/config.yaml" ports: - name: https containerPort: 8443 @@ -73,6 +65,9 @@ spec: - name: certs mountPath: /var/run/certs readOnly: true + - name: config + mountPath: /var/run/config + readOnly: true resources: {{- toYaml .Values.resources | nindent 12 }} {{- with .Values.nodeSelector }} @@ -91,3 +86,6 @@ spec: - name: certs secret: secretName: {{ template "helm.certSecretName" . }} + - name: config + configMap: + name: {{ template "helm.configConfigMapName" . }} diff --git a/helm/values.yaml b/helm/values.yaml index 3354b6b..42bea67 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -1,8 +1,36 @@ replicaCount: 1 -controller: - resyncInterval: 30s - retryInterval: 10s +# Controller configuration. +config: + # Configure audit sinks. How kudo persists user escalation activity. + audit: + # List of active sinks and their configuration. + sinks: + - kind: K8sEvents + namespace: "" + # Controller internal configuration. Defaults are usually fine. + controller: + # Maximum period between evaluations of the same escalation. + # A lower period means a more reactive system, however also means more CPU activity. + resyncInterval: 30s + # Period between two retry attempts when kudo fails to create / reclaim external resources. + retryInterval: 10s + # Period between two full state resyc. Ie when kudo reevaluates the status of all known escalations. + informerResyncInterval: 1h + # How many escalation events can be processed in parallel. + threadiness: 10 + # Webhook server configuraiton. Defaults are usually fine. + webhook: + # Address to listen to. + addr: ":8443" + # Path to the webhook certificiate + certPath: "/var/run/certs/tls.crt" + # Path to the webhook private key + keyPath: "/var/run/certs/tls.key" + # HTTP Read timeout. + readTimeout: 20s + # HTTP Write timeout. + writeTimeout: 20s image: repository: ghcr.io/jlevesy/kudo/controller diff --git a/pkg/apis/k8s.kudo.dev/v1alpha1/value.go b/pkg/apis/k8s.kudo.dev/v1alpha1/value.go index d4f1ad1..2b2246e 100644 --- a/pkg/apis/k8s.kudo.dev/v1alpha1/value.go +++ b/pkg/apis/k8s.kudo.dev/v1alpha1/value.go @@ -1,6 +1,7 @@ package v1alpha1 import ( + "bytes" "encoding/json" "errors" ) @@ -27,12 +28,7 @@ func (a *ValueWithKind) UnmarshalJSON(p []byte) error { } func (a ValueWithKind) MarshalJSON() ([]byte, error) { - if len(a.payload) == 0 { - return nil, errors.New("unexpected marshal of an empty payload") - } - - // I'm going to regret this. - return append([]byte(`{"kind":"`+a.Kind+`",`), a.payload[1:]...), nil + return a.payload, nil } func DecodeValueWithKind[V any](v ValueWithKind) (*V, error) { @@ -64,5 +60,15 @@ func EncodeValueWithKind(kind string, value any) (ValueWithKind, error) { return ValueWithKind{}, errors.New("encoding of arrays isn't supported") } - return ValueWithKind{Kind: kind, payload: p}, nil + // I'm going to regret this. + if bytes.Equal(p, []byte("{}")) { + p = []byte(`{"kind":"` + kind + `"}`) + } else { + p = append([]byte(`{"kind":"`+kind+`",`), p[1:]...) + } + + return ValueWithKind{ + Kind: kind, + payload: p, + }, nil } diff --git a/pkg/apis/k8s.kudo.dev/v1alpha1/value_test.go b/pkg/apis/k8s.kudo.dev/v1alpha1/value_test.go index 49436f4..5a2971e 100644 --- a/pkg/apis/k8s.kudo.dev/v1alpha1/value_test.go +++ b/pkg/apis/k8s.kudo.dev/v1alpha1/value_test.go @@ -33,6 +33,8 @@ func TestValueWithKind_EncodeDecode(t *testing.T) { encValue, err := v1alpha1.EncodeValueWithKind(subject.Kind, gotData) require.NoError(t, err) + assert.Equal(t, subject, encValue) + jsonBytes, err := json.Marshal(encValue) require.NoError(t, err) @@ -43,3 +45,25 @@ func TestValueWithKind_EncodeFailsOnArrays(t *testing.T) { _, err := v1alpha1.EncodeValueWithKind("wrong", []int{1, 2, 3, 4}) require.Error(t, err) } + +func TestValueWithKind_EncodeEmptyStruct(t *testing.T) { + v, err := v1alpha1.EncodeValueWithKind("empty", struct{}{}) + require.NoError(t, err) + + got, err := v1alpha1.DecodeValueWithKind[struct{}](v) + require.NoError(t, err) + + assert.Equal(t, &struct{}{}, got) +} + +func TestValueWithKind_DirectEncodeDecode(t *testing.T) { + var subject v1alpha1.ValueWithKind + + err := json.Unmarshal([]byte(rawPayload), &subject) + require.NoError(t, err) + + gotPayload, err := json.Marshal(&subject) + require.NoError(t, err) + + assert.Equal(t, string(rawPayload), string(gotPayload)) +} diff --git a/pkg/webhooksupport/serve.go b/pkg/webhooksupport/serve.go index a54e182..a9a9e65 100644 --- a/pkg/webhooksupport/serve.go +++ b/pkg/webhooksupport/serve.go @@ -6,22 +6,17 @@ import ( "net/http" "time" + "github.com/jlevesy/kudo/controller" "k8s.io/klog/v2" ) -type ServerConfig struct { - CertPath string - KeyPath string - Addr string -} - -func Serve(ctx context.Context, cfg ServerConfig, mux *http.ServeMux) error { +func Serve(ctx context.Context, cfg controller.WebhookConfig, mux *http.ServeMux) error { var ( srv = &http.Server{ Addr: cfg.Addr, Handler: mux, - ReadTimeout: 20 * time.Second, - WriteTimeout: 20 * time.Second, + ReadTimeout: cfg.ReadTimeout.Duration, + WriteTimeout: cfg.WriteTimeout.Duration, MaxHeaderBytes: 1 << 20, // 1048576 }