Skip to content

Commit

Permalink
chore(controller): use kudo config file
Browse files Browse the repository at this point in the history
  • Loading branch information
jlevesy committed Sep 16, 2022
1 parent 7a2d275 commit 31ca4f1
Showing 1 changed file with 29 additions and 42 deletions.
71 changes: 29 additions & 42 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,44 @@ import (
"os"
"os/signal"
"syscall"
"time"

"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"github.com/jlevesy/kudo/audit"
"github.com/jlevesy/kudo/escalation"
"github.com/jlevesy/kudo/escalationpolicy"
"github.com/jlevesy/kudo/grant"
"github.com/jlevesy/kudo/kudo"
kudov1alpha1 "github.com/jlevesy/kudo/pkg/apis/k8s.kudo.dev/v1alpha1"
"github.com/jlevesy/kudo/pkg/controllersupport"
clientset "github.com/jlevesy/kudo/pkg/generated/clientset/versioned"
"github.com/jlevesy/kudo/pkg/generated/clientset/versioned/scheme"
kudoinformers "github.com/jlevesy/kudo/pkg/generated/informers/externalversions"
"github.com/jlevesy/kudo/pkg/webhooksupport"
)

var (
masterURL string
kubeconfig string
threadiness int
resyncInterval time.Duration
retryInterval time.Duration

webhookConfig webhooksupport.ServerConfig
)

const defaultInformerResyncInterval = time.Hour
var configPath string

func main() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&webhookConfig.CertPath, "webhook_cert", "", "Path to webhook TLS cert")
flag.StringVar(&webhookConfig.KeyPath, "webhook_key", "", "Path to webhook TLS key")
flag.StringVar(&webhookConfig.Addr, "webhook_addr", ":8080", "Webhook listening address")
flag.IntVar(&threadiness, "threadiness", 10, "Amount of events processed in paralled")
flag.DurationVar(&resyncInterval, "resync_interval", 30*time.Second, "Maximum period to resync an active escalation")
flag.DurationVar(&retryInterval, "retry_interval", 10*time.Second, "Maximum period retry an escalation not fully granted/reclaimed")
flag.StringVar(&configPath, "config", "", "Path to Kudo Configuration")
klog.InitFlags(nil)
defer klog.Flush()

flag.Parse()

klog.Info("Loading Kudo controller configuration", "path", configPath)

kudoCfg, err := kudo.LoadConfigurationFromFile(configPath)
if err != nil {
klog.Fatalf("Unable to load kudo configuration: %s", err.Error())
}

klog.Info("Starting kudo controller")

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
cfg, err := clientcmd.BuildConfigFromFlags(kudoCfg.Controller.KubeMasterURL, kudoCfg.Controller.KubeConfigPath)
if err != nil {
klog.Fatalf("Unable to build kube client configuration: %s", err.Error())
}
Expand All @@ -75,15 +62,22 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
auditSink, err := audit.BuildSinkFromConfig(kudoCfg.Controller, kubeClient)
if err != nil {
klog.Fatalf("Unable to build audit sink: %s", err.Error())
}

var (
serveMux = http.NewServeMux()

kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, defaultInformerResyncInterval)
kudoInformerFactory = kudoinformers.NewSharedInformerFactory(kudoClientSet, defaultInformerResyncInterval)
kubeInformerFactory = kubeinformers.NewSharedInformerFactory(
kubeClient,
kudoCfg.Controller.InformerResyncInterval.Duration,
)
kudoInformerFactory = kudoinformers.NewSharedInformerFactory(
kudoClientSet,
kudoCfg.Controller.InformerResyncInterval.Duration,
)
escalationsInformer = kudoInformerFactory.K8s().V1alpha1().Escalations().Informer()
escalationsClient = kudoClientSet.K8sV1alpha1().Escalations()
policiesLister = kudoInformerFactory.K8s().V1alpha1().EscalationPolicies().Lister()
Expand All @@ -95,19 +89,12 @@ func main() {
policiesLister,
escalationsClient,
granterFactory,
audit.MutliAsyncSink(
audit.NewK8sEventSink(
eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: "kudo-controller"},
),
),
),
escalation.WithResyncInterval(resyncInterval),
escalation.WithRetryInterval(retryInterval),
auditSink,
escalation.WithResyncInterval(kudoCfg.Controller.ResyncInterval.Duration),
escalation.WithRetryInterval(kudoCfg.Controller.RetryInterval.Duration),
),
kudov1alpha1.KindEscalation,
threadiness,
kudoCfg.Controller.Threadiness,
)
)

Expand Down Expand Up @@ -135,7 +122,7 @@ func main() {
klog.Info("Informers warmed up, starting controller...")

group.Go(func() error {
return webhooksupport.Serve(ctx, webhookConfig, serveMux)
return webhooksupport.Serve(ctx, kudoCfg.Webhook, serveMux)
})

group.Go(func() error {
Expand Down

0 comments on commit 31ca4f1

Please sign in to comment.