Added controller and autoscaler into this extension (#10)
* Added controller and autoscaler into this extension

* Modify the go.mod
houshengbo authored Oct 24, 2023
1 parent 86d00ab commit d6b223f
Showing 1,635 changed files with 249,065 additions and 14,551 deletions.
102 changes: 102 additions & 0 deletions cmd/autoscaler/leaderelection.go
@@ -0,0 +1,102 @@
/*
Copyright 2023 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"

"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/controller"
"knative.dev/pkg/leaderelection"
"knative.dev/pkg/reconciler"
)

type leaderAwareReconciler interface {
reconciler.LeaderAware
controller.Reconciler
}

// leaderAware is intended to wrap a controller.Reconciler in order to disable
// leader election. It accomplishes this by not implementing the reconciler.LeaderAware
// interface, so bucket promotion/demotion needs to be done manually.
//
// The controller's reconciler needs to be set prior to calling Run.
type leaderAware struct {
reconciler leaderAwareReconciler
enqueue func(bkt reconciler.Bucket, key types.NamespacedName)
}

func (l *leaderAware) Reconcile(ctx context.Context, key string) error {
return l.reconciler.Reconcile(ctx, key)
}

func setupSharedElector(ctx context.Context, controllers []*controller.Impl) (leaderelection.Elector, error) {
reconcilers := make([]*leaderAware, 0, len(controllers))

for _, c := range controllers {
if r, ok := c.Reconciler.(leaderAwareReconciler); ok {
la := &leaderAware{reconciler: r, enqueue: c.MaybeEnqueueBucketKey}
// rewire the controller's reconciler so it isn't leader aware
// this prevents the universal bucket from being promoted
c.Reconciler = la
reconcilers = append(reconcilers, la)
}
}

// the elector component config on the ctx will override the queueName
// value so we leave this empty
queueName := ""

// this is a noop function since we will use each controller's
// MaybeEnqueueBucketKey when we promote buckets
noopEnqueue := func(reconciler.Bucket, types.NamespacedName) {}

el, err := leaderelection.BuildElector(ctx, coalesce(reconcilers), queueName, noopEnqueue)

if err != nil {
return nil, err
}

electorWithBuckets, ok := el.(leaderelection.ElectorWithInitialBuckets)
if !ok || len(electorWithBuckets.InitialBuckets()) == 0 {
return el, nil
}

for _, r := range reconcilers {
for _, b := range electorWithBuckets.InitialBuckets() {
// Controllers haven't started yet, so there is no need to enqueue anything.
r.reconciler.Promote(b, nil)
}
}
return el, nil
}

func coalesce(reconcilers []*leaderAware) reconciler.LeaderAware {
return &reconciler.LeaderAwareFuncs{
PromoteFunc: func(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
for _, r := range reconcilers {
r.reconciler.Promote(b, r.enqueue)
}
return nil
},
DemoteFunc: func(b reconciler.Bucket) {
for _, r := range reconcilers {
r.reconciler.Demote(b)
}
},
}
}
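
For orientation in this large diff: the way main.go (below) consumes this helper boils down to the following condensed sketch, with error handling and the errgroup plumbing elided. All names come from the code in this commit; electorCtx is the context that carries the elector builder, as set up in main.go.

	controllers := []*controller.Impl{
		kpa.NewController(ctx, cmw, multiScaler),
		metric.NewController(ctx, cmw, collector),
	}
	// Build one shared elector over both controllers; promoting a bucket
	// fans out to every wrapped reconciler via coalesce above.
	elector, err := setupSharedElector(electorCtx, controllers)
	if err != nil {
		logger.Fatalw("Failed to set up elector", zap.Error(err))
	}
	go elector.Run(ctx)
	controller.StartAll(ctx, controllers...)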
295 changes: 295 additions & 0 deletions cmd/autoscaler/main.go
@@ -0,0 +1,295 @@
/*
Copyright 2023 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Multitenant autoscaler executable.
package main

import (
"context"
"errors"
"fmt"
"log"
"net/http"
"time"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
kubeclient "knative.dev/pkg/client/injection/kube/client"

netcfg "knative.dev/networking/pkg/config"
filteredpodinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
filteredinformerfactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
configmap "knative.dev/pkg/configmap/informer"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/leaderelection"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
"knative.dev/pkg/version"
autoscalingv1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1"
"knative.dev/serving/pkg/apis/serving"
"knative.dev/serving/pkg/autoscaler/bucket"
asmetrics "knative.dev/serving/pkg/autoscaler/metrics"
"knative.dev/serving/pkg/autoscaler/scaling"
"knative.dev/serving/pkg/autoscaler/statforwarder"
"knative.dev/serving/pkg/autoscaler/statserver"
smetrics "knative.dev/serving/pkg/metrics"
"knative.dev/serving/pkg/reconciler/autoscaling/kpa"
"knative.dev/serving/pkg/reconciler/metric"
"knative.dev/serving/pkg/resources"
)

const (
statsServerAddr = ":8080"
statsBufferLen = 1000
component = "autoscaler"
controllerNum = 2
)

func main() {
// Set up signals so we handle the first shutdown signal gracefully.
ctx := signals.NewContext()

// Report stats on Go memory usage every 30 seconds.
metrics.MemStatsOrDie(ctx)

cfg := injection.ParseAndGetRESTConfigOrDie()

log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
log.Printf("Registering %d filtered informers", len(injection.Default.GetFilteredInformers()))
log.Printf("Registering %d controllers", controllerNum)

// Adjust our client's rate limits based on the number of controllers we are running.
cfg.QPS = controllerNum * rest.DefaultQPS
cfg.Burst = controllerNum * rest.DefaultBurst
ctx = filteredinformerfactory.WithSelectors(ctx, serving.RevisionUID)
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

kubeClient := kubeclient.Get(ctx)

// We sometimes start up faster than we can reach kube-api. Poll on failure to prevent us from terminating.
var err error
if perr := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(context.Context) (bool, error) {
if err = version.CheckMinimumVersion(kubeClient.Discovery()); err != nil {
log.Print("Failed to get k8s version ", err)
}
return err == nil, nil
}); perr != nil {
log.Fatal("Timed out attempting to get k8s version: ", err)
}

// Set up our logger.
loggingConfig, err := sharedmain.GetLoggingConfig(ctx)
if err != nil {
log.Fatal("Error loading/parsing logging configuration: ", err)
}
logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, component)
defer flush(logger)
ctx = logging.WithLogger(ctx, logger)

// statsCh is the main communication channel between the stats server and multiscaler.
statsCh := make(chan asmetrics.StatMessage, statsBufferLen)
defer close(statsCh)

profilingHandler := profiling.NewHandler(logger, false)

cmw := configmap.NewInformedWatcher(kubeclient.Get(ctx), system.Namespace())
// Watch the logging config map and dynamically update logging levels.
cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
// Watch the observability config map
cmw.Watch(metrics.ConfigMapName(),
metrics.ConfigMapWatcher(ctx, component, nil /* SecretFetcher */, logger),
profilingHandler.UpdateFromConfigMap)

podLister := filteredpodinformer.Get(ctx, serving.RevisionUID).Lister()
networkCM, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, netcfg.ConfigMapName, metav1.GetOptions{})
if err != nil {
logger.Fatalw("Failed to fetch network config", zap.Error(err))
}
networkConfig, err := netcfg.NewConfigFromMap(networkCM.Data)
if err != nil {
logger.Fatalw("Failed to construct network config", zap.Error(err))
}

collector := asmetrics.NewMetricCollector(
statsScraperFactoryFunc(podLister, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode), logger)

// Set up scalers.
multiScaler := scaling.NewMultiScaler(ctx.Done(),
uniScalerFactoryFunc(podLister, collector), logger)

controllers := []*controller.Impl{
kpa.NewController(ctx, cmw, multiScaler),
metric.NewController(ctx, cmw, collector),
}

// Start watching the configs.
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start watching configs", zap.Error(err))
}

// Start all of the informers and wait for them to sync.
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatalw("Failed to start informers", zap.Error(err))
}

// accept is the func to call when this pod owns the Revision for this StatMessage.
accept := func(sm asmetrics.StatMessage) {
collector.Record(sm.Key, time.Unix(sm.Stat.Timestamp, 0), sm.Stat)
multiScaler.Poke(sm.Key, sm.Stat)
}

cc := componentConfigAndIP(ctx)

// We don't want an elector on the controller context,
// since the controllers will share a single elector.
var electorCtx context.Context

var f *statforwarder.Forwarder
if b, bs, err := leaderelection.NewStatefulSetBucketAndSet(int(cc.Buckets)); err == nil {
logger.Info("Running with StatefulSet leader election")
electorCtx = leaderelection.WithStatefulSetElectorBuilder(ctx, cc, b)
f = statforwarder.New(ctx, bs)
if err := statforwarder.StatefulSetBasedProcessor(ctx, f, accept); err != nil {
logger.Fatalw("Failed to set up statefulset processors", zap.Error(err))
}
} else {
logger.Info("Running with Standard leader election")
electorCtx = leaderelection.WithStandardLeaderElectorBuilder(ctx, kubeClient, cc)
f = statforwarder.New(ctx, bucket.AutoscalerBucketSet(cc.Buckets))
if err := statforwarder.LeaseBasedProcessor(ctx, f, accept); err != nil {
logger.Fatalw("Failed to set up lease tracking", zap.Error(err))
}
}

elector, err := setupSharedElector(electorCtx, controllers)
if err != nil {
logger.Fatalw("Failed to setup elector", zap.Error(err))
}

// Set up a statserver.
statsServer := statserver.New(statsServerAddr, statsCh, logger, f.IsBucketOwner)
defer f.Cancel()

go func() {
for sm := range statsCh {
// Set the timestamp when first receiving the stat.
if sm.Stat.Timestamp == 0 {
sm.Stat.Timestamp = time.Now().Unix()
}
f.Process(sm)
}
}()

profilingServer := profiling.NewServer(profilingHandler)

eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
elector.Run(egCtx)
return nil
})
eg.Go(statsServer.ListenAndServe)
eg.Go(profilingServer.ListenAndServe)
eg.Go(func() error {
return controller.StartAll(egCtx, controllers...)
})

// This will block until either a signal arrives or one of the grouped functions
// returns an error.
<-egCtx.Done()

statsServer.Shutdown(5 * time.Second)
profilingServer.Shutdown(context.Background())
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := eg.Wait(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Errorw("Error while running server", zap.Error(err))
}
}

func uniScalerFactoryFunc(podLister corev1listers.PodLister,
metricClient asmetrics.MetricClient) scaling.UniScalerFactory {
return func(decider *scaling.Decider) (scaling.UniScaler, error) {
configName := decider.Labels[serving.ConfigurationLabelKey]
if configName == "" {
return nil, fmt.Errorf("label %q not found or empty in Decider %s", serving.ConfigurationLabelKey, decider.Name)
}
revisionName := decider.Labels[serving.RevisionLabelKey]
if revisionName == "" {
return nil, fmt.Errorf("label %q not found or empty in Decider %s", serving.RevisionLabelKey, decider.Name)
}
serviceName := decider.Labels[serving.ServiceLabelKey] // This can be empty.

// Create a stats reporter which tags statistics by PA namespace, configuration name, and PA name.
ctx := smetrics.RevisionContext(decider.Namespace, serviceName, configName, revisionName)

podAccessor := resources.NewPodAccessor(podLister, decider.Namespace, revisionName)
return scaling.New(ctx, decider.Namespace, decider.Name, metricClient,
podAccessor, &decider.Spec), nil
}
}
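
// An illustrative (hypothetical) Decider showing the labels the factory
// above requires before it can build a UniScaler; the serving.ServiceLabelKey
// label may legitimately be empty. The names and values here are examples
// only, not part of the commit.
var exampleDecider = &scaling.Decider{
	ObjectMeta: metav1.ObjectMeta{
		Namespace: "default",
		Name:      "example-revision",
		Labels: map[string]string{
			serving.ConfigurationLabelKey: "example-config",
			serving.RevisionLabelKey:      "example-revision",
			serving.ServiceLabelKey:       "example-service",
		},
	},
}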

func statsScraperFactoryFunc(podLister corev1listers.PodLister, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode) asmetrics.StatsScraperFactory {
return func(metric *autoscalingv1alpha1.Metric, logger *zap.SugaredLogger) (asmetrics.StatsScraper, error) {
if metric.Spec.ScrapeTarget == "" {
return nil, nil
}

revisionName := metric.Labels[serving.RevisionLabelKey]
if revisionName == "" {
return nil, fmt.Errorf("label %q not found or empty in Metric %s", serving.RevisionLabelKey, metric.Name)
}

podAccessor := resources.NewPodAccessor(podLister, metric.Namespace, revisionName)
return asmetrics.NewStatsScraper(metric, revisionName, podAccessor, usePassthroughLb, meshMode, logger), nil
}
}

func flush(logger *zap.SugaredLogger) {
logger.Sync()
metrics.FlushExporter()
}

func componentConfigAndIP(ctx context.Context) leaderelection.ComponentConfig {
id, err := bucket.Identity()
if err != nil {
logging.FromContext(ctx).Fatalw("Failed to generate Lease holder identity", zap.Error(err))
}

// Set up leader election config
leaderElectionConfig, err := sharedmain.GetLeaderElectionConfig(ctx)
if err != nil {
logging.FromContext(ctx).Fatalw("Error loading leader election configuration", zap.Error(err))
}

cc := leaderElectionConfig.GetComponentConfig(component)
cc.LeaseName = func(i uint32) string {
return bucket.AutoscalerBucketName(i, cc.Buckets)
}
cc.Identity = id

return cc
}
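
As a reading aid, a minimal sketch (illustrative only; it would run inside main(), and the log format is not part of the commit) of how the LeaseName closure set in componentConfigAndIP maps bucket ordinals to per-bucket Lease names via bucket.AutoscalerBucketName:

	cc := componentConfigAndIP(ctx)
	for i := uint32(0); i < cc.Buckets; i++ {
		// One Lease name per bucket ordinal.
		log.Printf("bucket %d uses lease %q", i, cc.LeaseName(i))
	}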