diff --git a/cluster-autoscaler/loop/trigger.go b/cluster-autoscaler/loop/trigger.go
index e5dd513c158b..52ef962ba1b0 100644
--- a/cluster-autoscaler/loop/trigger.go
+++ b/cluster-autoscaler/loop/trigger.go
@@ -43,29 +43,19 @@ type scalingTimesGetter interface {
 	LastScaleDownDeleteTime() time.Time
 }
 
-// provisioningRequestProcessingTimesGetter exposes recent provisioning request processing activity regardless of wether the
-// ProvisioningRequest was marked as accepted or failed. This is because a ProvisioningRequest being processed indicates that
-// there are other ProvisioningRequests that require processing regardless of the outcome of the current one. Thus, the next iteration
-// should be started immediately.
-type provisioningRequestProcessingTimesGetter interface {
-	LastProvisioningRequestProcessTime() time.Time
-}
-
 // LoopTrigger object implements criteria used to start new autoscaling iteration
 type LoopTrigger struct {
-	podObserver                          *UnschedulablePodObserver
-	scanInterval                         time.Duration
-	scalingTimesGetter                   scalingTimesGetter
-	provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter
+	podObserver        *UnschedulablePodObserver
+	scanInterval       time.Duration
+	scalingTimesGetter scalingTimesGetter
 }
 
 // NewLoopTrigger creates a LoopTrigger object
-func NewLoopTrigger(scalingTimesGetter scalingTimesGetter, provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter, podObserver *UnschedulablePodObserver, scanInterval time.Duration) *LoopTrigger {
+func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
 	return &LoopTrigger{
-		podObserver:                          podObserver,
-		scanInterval:                         scanInterval,
-		scalingTimesGetter:                   scalingTimesGetter,
-		provisioningRequestProcessTimeGetter: provisioningRequestProcessTimeGetter,
+		podObserver:        podObserver,
+		scanInterval:       scanInterval,
+		scalingTimesGetter: scalingTimesGetter,
 	}
 }
@@ -76,18 +66,14 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {
 	// To improve scale-up throughput, Cluster Autoscaler starts new iteration
 	// immediately if the previous one was productive.
-	if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) {
-		t.logTriggerReason("Autoscaler loop triggered immediately after a scale up")
-		return
-	}
-
-	if !t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
-		t.logTriggerReason("Autoscaler loop triggered immediately after a scale down")
-		return
-	}
-
-	if t.provisioningRequestWasProcessed(lastRun) {
-		t.logTriggerReason("Autoscaler loop triggered immediately after a provisioning request was processed")
+	if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
+		!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
+		select {
+		case <-t.podObserver.unschedulablePodChan:
+			klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
+		default:
+			klog.Infof("Autoscaler loop triggered immediately after a productive iteration")
+		}
 		return
 	}
@@ -132,20 +118,6 @@ func StartPodObserver(ctx context.Context, kubeClient kube_client.Interface) *Un
 	}
 }
 
-// logTriggerReason logs a message if the next iteration was not triggered by unschedulable pods appearing, else it logs a message that the next iteration was triggered by unschedulable pods appearing
-func (t *LoopTrigger) logTriggerReason(message string) {
-	select {
-	case <-t.podObserver.unschedulablePodChan:
-		klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
-	default:
-		klog.Infof(message)
-	}
-}
-
-func (t *LoopTrigger) provisioningRequestWasProcessed(lastRun time.Time) bool {
-	return t.provisioningRequestProcessTimeGetter != nil && !t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessTime().Before(lastRun)
-}
-
 // isRecentUnschedulablePod checks if the object is an unschedulable pod observed recently.
 func isRecentUnschedulablePod(obj any) bool {
 	pod, ok := obj.(*apiv1.Pod)
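
Note on the Wait() hunk above: with the provisioning-request getter gone, only a recent scale-up or scale-down delete counts as a productive iteration and short-circuits the wait. The following minimal, self-contained Go sketch models that pattern; toyTrigger, its field names, and the timer-based fallback are illustrative assumptions, not the real LoopTrigger (whose remaining wait logic sits outside this hunk).

    // Toy model only: plain channels and fmt stand in for the pod observer and klog.
    package main

    import (
        "fmt"
        "time"
    )

    type toyTrigger struct {
        lastScaleUp   time.Time
        lastScaleDown time.Time
        podEvents     chan struct{} // fed when an unschedulable pod appears
        scanInterval  time.Duration
    }

    // wait starts the next iteration immediately after a productive one,
    // otherwise blocks until a pod event arrives or the scan interval elapses.
    func (t *toyTrigger) wait(lastRun time.Time) {
        if !t.lastScaleUp.Before(lastRun) || !t.lastScaleDown.Before(lastRun) {
            select {
            case <-t.podEvents:
                fmt.Println("triggered by unschedulable pod appearing")
            default:
                fmt.Println("triggered immediately after a productive iteration")
            }
            return
        }
        select {
        case <-t.podEvents:
            fmt.Println("triggered by unschedulable pod appearing")
        case <-time.After(t.scanInterval):
            fmt.Println("triggered by scan interval")
        }
    }

    func main() {
        tr := &toyTrigger{podEvents: make(chan struct{}, 1), scanInterval: 100 * time.Millisecond}
        tr.lastScaleUp = time.Now()           // pretend the previous iteration scaled up
        tr.wait(time.Now().Add(-time.Second)) // fires immediately
        tr.wait(time.Now())                   // nothing productive since lastRun: falls back to the timer
    }
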
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index fbbba0d5b142..d56d581afb31 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -468,7 +468,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
 	}()
 }
 
-func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) {
+func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
 	// Create basic config from flags.
 	autoscalingOptions := createAutoscalingOptions()
@@ -487,7 +487,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
 	predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(informerFactory, autoscalingOptions.SchedulerConfig)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
 	drainabilityRules := rules.Default(deleteOptions)
@@ -508,14 +508,13 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
 	opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
 	podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker, scheduling.ScheduleAnywhere)
 
-	var ProvisioningRequestInjector *provreq.ProvisioningRequestPodsInjector
 	if autoscalingOptions.ProvisioningRequestEnabled {
 		podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
 		restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
 		client, err := provreqclient.NewProvisioningRequestClient(restConfig)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
 		provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
 			checkcapacity.New(client),
@@ -526,11 +525,11 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
 		opts.ScaleUpOrchestrator = scaleUpOrchestrator
 		provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
 		opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
-		ProvisioningRequestInjector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
+		injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
-		podListProcessor.AddProcessor(ProvisioningRequestInjector)
+		podListProcessor.AddProcessor(injector)
 		podListProcessor.AddProcessor(provreqProcesor)
 	}
 
@@ -595,7 +594,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
 	// Create autoscaler.
 	autoscaler, err := core.NewAutoscaler(opts, informerFactory)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	// Start informers. This must come after fully constructing the autoscaler because
@@ -603,22 +602,13 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
 	stop := make(chan struct{})
 	informerFactory.Start(stop)
 
-	podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(autoscalingOptions.KubeClientOpts))
-
-	// A ProvisioningRequestPodsInjector is used as provisioningRequestProcessingTimesGetter here to obtain the last time a
-	// ProvisioningRequest was processed. This is because the ProvisioningRequestPodsInjector in addition to injecting pods
-	// also marks the ProvisioningRequest as accepted or failed.
-	trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, podObserver, *scanInterval)
-
-	return autoscaler, trigger, nil
+	return autoscaler, nil
 }
 
 func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
 	metrics.RegisterAll(*emitPerNodeGroupMetrics)
 
-	context, cancel := ctx.WithCancel(ctx.Background())
-	defer cancel()
-	autoscaler, trigger, err := buildAutoscaler(context, debuggingSnapshotter)
+	autoscaler, err := buildAutoscaler(debuggingSnapshotter)
 	if err != nil {
 		klog.Fatalf("Failed to create autoscaler: %v", err)
 	}
@@ -635,7 +625,11 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
 	}
 
 	// Autoscale ad infinitum.
+	context, cancel := ctx.WithCancel(ctx.Background())
+	defer cancel()
 	if *frequentLoopsEnabled {
+		podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
+		trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
 		lastRun := time.Now()
 		for {
 			trigger.Wait(lastRun)
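
Note on the run() hunk above: loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval) type-checks because Go satisfies interfaces structurally, so the autoscaler only has to provide the two methods of the unexported scalingTimesGetter declared in loop/trigger.go. A minimal sketch of that mechanism; scalingTimesGetter mirrors the interface in the diff, while fakeAutoscaler and newTrigger are hypothetical stand-ins, not the real types.

    package main

    import (
        "fmt"
        "time"
    )

    // scalingTimesGetter mirrors the interface declared in loop/trigger.go.
    type scalingTimesGetter interface {
        LastScaleUpTime() time.Time
        LastScaleDownDeleteTime() time.Time
    }

    // fakeAutoscaler stands in for the real autoscaler; implementing the two
    // methods is all it takes for newTrigger to accept it.
    type fakeAutoscaler struct{ up, down time.Time }

    func (f *fakeAutoscaler) LastScaleUpTime() time.Time         { return f.up }
    func (f *fakeAutoscaler) LastScaleDownDeleteTime() time.Time { return f.down }

    // newTrigger mimics the NewLoopTrigger(podObserver, scalingTimesGetter, scanInterval) shape.
    func newTrigger(getter scalingTimesGetter, scanInterval time.Duration) {
        fmt.Printf("trigger wired: scanInterval=%v lastScaleUp=%v\n", scanInterval, getter.LastScaleUpTime())
    }

    func main() {
        newTrigger(&fakeAutoscaler{up: time.Now()}, 10*time.Second)
    }
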
diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go
index 70dc78965afc..5fc88380bb0a 100644
--- a/cluster-autoscaler/processors/provreq/injector.go
+++ b/cluster-autoscaler/processors/provreq/injector.go
@@ -24,6 +24,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
 	provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
 	provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -37,12 +38,11 @@ import (
 
 // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
 type ProvisioningRequestPodsInjector struct {
-	initialRetryTime                   time.Duration
-	maxBackoffTime                     time.Duration
-	backoffDuration                    *lru.Cache
-	clock                              clock.PassiveClock
-	client                             *provreqclient.ProvisioningRequestClient
-	lastProvisioningRequestProcessTime time.Time
+	initialRetryTime time.Duration
+	maxBackoffTime   time.Duration
+	backoffDuration  *lru.Cache
+	clock            clock.PassiveClock
+	client           *provreqclient.ProvisioningRequestClient
 }
 
 // IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -78,7 +78,6 @@ func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.Prov
 		klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
 		return err
 	}
-	p.lastProvisioningRequestProcessTime = p.clock.Now()
 	return nil
 }
 
@@ -88,7 +87,6 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
 	if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
 		klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
 	}
-	p.lastProvisioningRequestProcessTime = p.clock.Now()
 }
 
 // GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
@@ -114,7 +112,7 @@
 			continue
 		}
 
-		podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
+		provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
 		if err != nil {
 			klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
 			p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
@@ -123,8 +121,7 @@
 		if err := p.MarkAsAccepted(pr); err != nil {
 			continue
 		}
-
-		return podsFromProvReq, nil
+		return provreqpods, nil
 	}
 	return nil, nil
 }
@@ -154,7 +151,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
 func (p *ProvisioningRequestPodsInjector) CleanUp() {}
 
 // NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
-func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (*ProvisioningRequestPodsInjector, error) {
+func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (pods.PodListProcessor, error) {
 	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
 	if err != nil {
 		return nil, err
@@ -165,8 +162,3 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffT
 func key(pr *provreqwrapper.ProvisioningRequest) string {
 	return string(pr.UID)
 }
-
-// LastProvisioningRequestProcessTime returns the time when the last provisioning request was processed.
-func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessTime() time.Time {
-	return p.lastProvisioningRequestProcessTime
-}
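
Note on the constructor change above: NewProvisioningRequestPodsInjector now returns pods.PodListProcessor rather than the concrete *ProvisioningRequestPodsInjector, so injector-specific accessors (such as the removed LastProvisioningRequestProcessTime) are no longer visible to callers. A small sketch of that effect with stand-in types, not the cluster-autoscaler interfaces:

    package main

    import "fmt"

    // podListProcessor stands in for the processor interface a constructor can return.
    type podListProcessor interface {
        Process(pods []string) []string
    }

    type concreteInjector struct{}

    func (concreteInjector) Process(pods []string) []string {
        return append(pods, "in-memory provisioning-request pod")
    }

    // LastProcessTime is only reachable through the concrete type.
    func (concreteInjector) LastProcessTime() string { return "concrete-only method" }

    // newInjector mirrors the new constructor shape: it returns the interface.
    func newInjector() podListProcessor { return concreteInjector{} }

    func main() {
        p := newInjector()
        fmt.Println(p.Process([]string{"unschedulable-pod"}))
        // p.LastProcessTime() would not compile here: the method is hidden behind the interface.
    }
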
diff --git a/cluster-autoscaler/processors/provreq/injector_test.go b/cluster-autoscaler/processors/provreq/injector_test.go
index 0e51026c254c..37e4e720168d 100644
--- a/cluster-autoscaler/processors/provreq/injector_test.go
+++ b/cluster-autoscaler/processors/provreq/injector_test.go
@@ -131,7 +131,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
 		client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
 		backoffTime := lru.New(100)
 		backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
-		injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now}
+		injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client}
 		getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
 		if err != nil {
 			t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
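
Note on the test update above: the struct literal is positional, so dropping the lastProvisioningRequestProcessTime field forces an edit to every such literal. A keyed literal, sketched below with a hypothetical stand-in type, names only the fields it sets and is unaffected by unrelated field additions or removals:

    package main

    import (
        "fmt"
        "time"
    )

    // toyInjector is a stand-in for the shape of ProvisioningRequestPodsInjector.
    type toyInjector struct {
        initialRetryTime time.Duration
        maxBackoffTime   time.Duration
        // further fields can come and go without breaking the keyed literal below
    }

    func main() {
        injector := toyInjector{
            initialRetryTime: 1 * time.Minute,
            maxBackoffTime:   10 * time.Minute,
        }
        fmt.Printf("%+v\n", injector)
    }
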