diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 930c0c9..e25a081 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -7,57 +7,44 @@ import ( "os" "os/signal" "syscall" - "time" "golang.org/x/sync/errgroup" - corev1 "k8s.io/api/core/v1" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "github.com/jlevesy/kudo/audit" "github.com/jlevesy/kudo/escalation" "github.com/jlevesy/kudo/escalationpolicy" "github.com/jlevesy/kudo/grant" + "github.com/jlevesy/kudo/kudo" kudov1alpha1 "github.com/jlevesy/kudo/pkg/apis/k8s.kudo.dev/v1alpha1" "github.com/jlevesy/kudo/pkg/controllersupport" clientset "github.com/jlevesy/kudo/pkg/generated/clientset/versioned" - "github.com/jlevesy/kudo/pkg/generated/clientset/versioned/scheme" kudoinformers "github.com/jlevesy/kudo/pkg/generated/informers/externalversions" "github.com/jlevesy/kudo/pkg/webhooksupport" ) -var ( - masterURL string - kubeconfig string - threadiness int - resyncInterval time.Duration - retryInterval time.Duration - - webhookConfig webhooksupport.ServerConfig -) - -const defaultInformerResyncInterval = time.Hour +var configPath string func main() { - flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") - flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") - flag.StringVar(&webhookConfig.CertPath, "webhook_cert", "", "Path to webhook TLS cert") - flag.StringVar(&webhookConfig.KeyPath, "webhook_key", "", "Path to webhook TLS key") - flag.StringVar(&webhookConfig.Addr, "webhook_addr", ":8080", "Webhook listening address") - flag.IntVar(&threadiness, "threadiness", 10, "Amount of events processed in paralled") - flag.DurationVar(&resyncInterval, "resync_interval", 30*time.Second, "Maximum period to resync an active escalation") - flag.DurationVar(&retryInterval, "retry_interval", 10*time.Second, "Maximum period retry an escalation not fully granted/reclaimed") + flag.StringVar(&configPath, "config", "", "Path to Kudo Configuration") klog.InitFlags(nil) + defer klog.Flush() flag.Parse() + klog.Info("Loading Kudo controller configuration", "path", configPath) + + kudoCfg, err := kudo.LoadConfigurationFromFile(configPath) + if err != nil { + klog.Fatalf("Unable to load kudo configuration: %s", err.Error()) + } + klog.Info("Starting kudo controller") - cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + cfg, err := clientcmd.BuildConfigFromFlags(kudoCfg.Controller.KubeMasterURL, kudoCfg.Controller.KubeConfigPath) if err != nil { klog.Fatalf("Unable to build kube client configuration: %s", err.Error()) } @@ -75,15 +62,22 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + auditSink, err := audit.BuildSinkFromConfig(kudoCfg.Controller, kubeClient) + if err != nil { + klog.Fatalf("Unable to build audit sink: %s", err.Error()) + } var ( serveMux = http.NewServeMux() - kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, defaultInformerResyncInterval) - kudoInformerFactory = kudoinformers.NewSharedInformerFactory(kudoClientSet, defaultInformerResyncInterval) + kubeInformerFactory = kubeinformers.NewSharedInformerFactory( + kubeClient, + kudoCfg.Controller.InformerResyncInterval.Duration, + ) + kudoInformerFactory = kudoinformers.NewSharedInformerFactory( + kudoClientSet, + kudoCfg.Controller.InformerResyncInterval.Duration, + ) escalationsInformer = kudoInformerFactory.K8s().V1alpha1().Escalations().Informer() escalationsClient = kudoClientSet.K8sV1alpha1().Escalations() policiesLister = kudoInformerFactory.K8s().V1alpha1().EscalationPolicies().Lister() @@ -95,19 +89,12 @@ func main() { policiesLister, escalationsClient, granterFactory, - audit.MutliAsyncSink( - audit.NewK8sEventSink( - eventBroadcaster.NewRecorder( - scheme.Scheme, - corev1.EventSource{Component: "kudo-controller"}, - ), - ), - ), - escalation.WithResyncInterval(resyncInterval), - escalation.WithRetryInterval(retryInterval), + auditSink, + escalation.WithResyncInterval(kudoCfg.Controller.ResyncInterval.Duration), + escalation.WithRetryInterval(kudoCfg.Controller.RetryInterval.Duration), ), kudov1alpha1.KindEscalation, - threadiness, + kudoCfg.Controller.Threadiness, ) ) @@ -135,7 +122,7 @@ func main() { klog.Info("Informers warmed up, starting controller...") group.Go(func() error { - return webhooksupport.Serve(ctx, webhookConfig, serveMux) + return webhooksupport.Serve(ctx, kudoCfg.Webhook, serveMux) }) group.Go(func() error {