From 47b6e50f7762c8d03018cd0fc540fdff8e03a261 Mon Sep 17 00:00:00 2001 From: chaosi-zju Date: Sun, 28 Apr 2024 09:42:06 +0800 Subject: [PATCH] support auto delete WorkloadRebalancer when time up Signed-off-by: chaosi-zju --- api/openapi-spec/swagger.json | 14 ++ .../apps.karmada.io_workloadrebalancers.yaml | 23 +++ .../app/controllermanager.go | 5 +- .../apps/v1alpha1/workloadrebalancer_types.go | 20 ++- .../apps/v1alpha1/zz_generated.deepcopy.go | 9 ++ pkg/controllers/context/context.go | 2 + .../ttlafterfinished_worker.go | 141 ++++++++++++++++++ .../workloadrebalancer_controller.go | 27 +++- pkg/generated/openapi/zz_generated.openapi.go | 22 ++- 9 files changed, 257 insertions(+), 6 deletions(-) create mode 100644 pkg/controllers/workloadrebalancer/ttlafterfinished_worker.go diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 363e317c6b19..4b6229649b56 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -16759,6 +16759,11 @@ "workloads" ], "properties": { + "ttlMinutesAfterFinished": { + "description": "ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each target workload is finished with result of Successful or Failed). If this field is set, ttlSecondsAfterFinished after the WorkloadRebalancer finishes, it is eligible to be automatically deleted. If this field is unset, the WorkloadRebalancer won't be automatically deleted. If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.", + "type": "integer", + "format": "int32" + }, "workloads": { "description": "Workloads used to specify the list of expected resource. Nil or empty list is not allowed.", "type": "array", @@ -16773,6 +16778,15 @@ "description": "WorkloadRebalancerStatus contains information about the current status of a WorkloadRebalancer updated periodically by schedule trigger controller.", "type": "object", "properties": { + "lastUpdateTime": { + "description": "LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself. optional", + "$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time" + }, + "observedGeneration": { + "description": "ObservedGeneration is the generation(.metadata.generation) observed by the controller. If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed the rebalance result or hasn't done the rebalance yet. optional", + "type": "integer", + "format": "int64" + }, "observedWorkloads": { "description": "ObservedWorkloads contains information about the execution states and messages of target resources.", "type": "array", diff --git a/charts/karmada/_crds/bases/apps/apps.karmada.io_workloadrebalancers.yaml b/charts/karmada/_crds/bases/apps/apps.karmada.io_workloadrebalancers.yaml index db7ee0a95b39..e657c0a7f5fe 100644 --- a/charts/karmada/_crds/bases/apps/apps.karmada.io_workloadrebalancers.yaml +++ b/charts/karmada/_crds/bases/apps/apps.karmada.io_workloadrebalancers.yaml @@ -41,6 +41,15 @@ spec: description: Spec represents the specification of the desired behavior of WorkloadRebalancer. properties: + ttlMinutesAfterFinished: + description: |- + ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each + target workload is finished with result of Successful or Failed). + If this field is set, ttlSecondsAfterFinished after the WorkloadRebalancer finishes, it is eligible to be automatically deleted. + If this field is unset, the WorkloadRebalancer won't be automatically deleted. + If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes. + format: int32 + type: integer workloads: description: |- Workloads used to specify the list of expected resource. @@ -76,6 +85,20 @@ spec: status: description: Status represents the status of WorkloadRebalancer. properties: + lastUpdateTime: + description: |- + LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself. + optional + format: date-time + type: string + observedGeneration: + description: |- + ObservedGeneration is the generation(.metadata.generation) observed by the controller. + If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed + the rebalance result or hasn't done the rebalance yet. + optional + format: int64 + type: integer observedWorkloads: description: ObservedWorkloads contains information about the execution states and messages of target resources. diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index d270804c6afe..20b059be0b68 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -711,7 +711,8 @@ func startRemedyController(ctx controllerscontext.Context) (enabled bool, err er func startWorkloadRebalancerController(ctx controllerscontext.Context) (enabled bool, err error) { workloadRebalancer := workloadrebalancer.RebalancerController{ - Client: ctx.Mgr.GetClient(), + Client: ctx.Mgr.GetClient(), + ControlPlaneClient: ctx.ControlPlaneClient, } err = workloadRebalancer.SetupWithManager(ctx.Mgr) if err != nil { @@ -727,6 +728,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop dynamicClientSet := dynamic.NewForConfigOrDie(restConfig) discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig) kubeClientSet := kubeclientset.NewForConfigOrDie(restConfig) + controlPlaneClient := gclient.NewForConfigOrDie(restConfig) overrideManager := overridemanager.New(mgr.GetClient(), mgr.GetEventRecorderFor(overridemanager.OverrideManagerName)) skippedResourceConfig := util.NewSkippedResourceConfig() @@ -812,6 +814,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop StopChan: stopChan, DynamicClientSet: dynamicClientSet, KubeClientSet: kubeClientSet, + ControlPlaneClient: controlPlaneClient, OverrideManager: overrideManager, ControlPlaneInformerManager: controlPlaneInformerManager, ResourceInterpreter: resourceInterpreter, diff --git a/pkg/apis/apps/v1alpha1/workloadrebalancer_types.go b/pkg/apis/apps/v1alpha1/workloadrebalancer_types.go index 42554865f9b6..ac12cd33dd36 100644 --- a/pkg/apis/apps/v1alpha1/workloadrebalancer_types.go +++ b/pkg/apis/apps/v1alpha1/workloadrebalancer_types.go @@ -57,6 +57,14 @@ type WorkloadRebalancerSpec struct { // +kubebuilder:validation:MinItems=1 // +required Workloads []ObjectReference `json:"workloads"` + + // ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each + // target workload is finished with result of Successful or Failed). + // If this field is set, ttlSecondsAfterFinished after the WorkloadRebalancer finishes, it is eligible to be automatically deleted. + // If this field is unset, the WorkloadRebalancer won't be automatically deleted. + // If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes. + // +optional + TTLMinutesAfterFinished *int32 `json:"ttlMinutesAfterFinished,omitempty"` } // ObjectReference the expected resource. @@ -85,6 +93,16 @@ type WorkloadRebalancerStatus struct { // ObservedWorkloads contains information about the execution states and messages of target resources. // +optional ObservedWorkloads []ObservedWorkload `json:"observedWorkloads,omitempty"` + + // ObservedGeneration is the generation(.metadata.generation) observed by the controller. + // If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed + // the rebalance result or hasn't done the rebalance yet. + // optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + // LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself. + // optional + LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` } // ObservedWorkload the observed resource. @@ -117,7 +135,7 @@ type RebalanceFailedReason string const ( // RebalanceObjectNotFound the resource referenced binding not found. - RebalanceObjectNotFound RebalanceFailedReason = "NotFound" + RebalanceObjectNotFound RebalanceFailedReason = "ReferencedBindingNotFound" ) // +kubebuilder:resource:scope="Cluster" diff --git a/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go index 04953a1cf84c..fc3e24a8888b 100644 --- a/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -127,6 +127,11 @@ func (in *WorkloadRebalancerSpec) DeepCopyInto(out *WorkloadRebalancerSpec) { *out = make([]ObjectReference, len(*in)) copy(*out, *in) } + if in.TTLMinutesAfterFinished != nil { + in, out := &in.TTLMinutesAfterFinished, &out.TTLMinutesAfterFinished + *out = new(int32) + **out = **in + } return } @@ -148,6 +153,10 @@ func (in *WorkloadRebalancerStatus) DeepCopyInto(out *WorkloadRebalancerStatus) *out = make([]ObservedWorkload, len(*in)) copy(*out, *in) } + if in.LastUpdateTime != nil { + in, out := &in.LastUpdateTime, &out.LastUpdateTime + *out = (*in).DeepCopy() + } return } diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index d07274a85aa4..b64053519a83 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -26,6 +26,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config" "github.com/karmada-io/karmada/pkg/resourceinterpreter" @@ -109,6 +110,7 @@ type Context struct { StopChan <-chan struct{} DynamicClientSet dynamic.Interface KubeClientSet clientset.Interface + ControlPlaneClient client.Client OverrideManager overridemanager.OverrideManager ControlPlaneInformerManager genericmanager.SingleClusterInformerManager ResourceInterpreter resourceinterpreter.ResourceInterpreter diff --git a/pkg/controllers/workloadrebalancer/ttlafterfinished_worker.go b/pkg/controllers/workloadrebalancer/ttlafterfinished_worker.go new file mode 100644 index 000000000000..8b156b7c8937 --- /dev/null +++ b/pkg/controllers/workloadrebalancer/ttlafterfinished_worker.go @@ -0,0 +1,141 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloadrebalancer + +import ( + "context" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + appsv1alpha1 "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" +) + +func (c *RebalancerController) deleteExpiredRebalancer(key util.QueueKey) error { + objKey, ok := key.(client.ObjectKey) + if !ok { + klog.Errorf("Invalid object key: %+v", key) + return nil + } + klog.V(4).Infof("Checking if WorkloadRebalancer(%s) is ready for cleanup", objKey.Name) + + // 1. Get WorkloadRebalancer from cache. + rebalancer := &appsv1alpha1.WorkloadRebalancer{} + if err := c.Client.Get(context.TODO(), objKey, rebalancer); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + // 2. Use the WorkloadRebalancer from cache to see if the TTL expires. + if expiredAt, err := c.processTTL(rebalancer); err != nil { + return err + } else if expiredAt == nil { + return nil + } + + // 3. The WorkloadRebalancer's TTL is assumed to have expired, but the WorkloadRebalancer TTL might be stale. + // Before deleting the WorkloadRebalancer, do a final sanity check. + // If TTL is modified before we do this check, we cannot be sure if the TTL truly expires. + fresh := &appsv1alpha1.WorkloadRebalancer{} + if err := c.ControlPlaneClient.Get(context.TODO(), objKey, fresh); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + // 4. Use the latest WorkloadRebalancer directly from api server to see if the TTL truly expires. + if expiredAt, err := c.processTTL(fresh); err != nil { + return err + } else if expiredAt == nil { + return nil + } + + // 5. deletes the WorkloadRebalancer if TTL truly expires. + options := &client.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &fresh.ResourceVersion}} + if err := c.ControlPlaneClient.Delete(context.TODO(), fresh, options); err != nil { + klog.Errorf("Cleaning up WorkloadRebalancer(%s) failed: %+v", fresh.Name, err) + return err + } + klog.V(4).Infof("Cleaning up WorkloadRebalancer(%s) successful.", fresh.Name) + + return nil +} + +// processTTL checks whether a given WorkloadRebalancer's TTL has expired, and add it to the queue after +// the TTL is expected to expire if the TTL will expire later. +func (c *RebalancerController) processTTL(r *appsv1alpha1.WorkloadRebalancer) (expiredAt *time.Time, err error) { + if !needsCleanup(r) { + return nil, nil + } + + remainingTTL, expireAt, err := timeLeft(r) + if err != nil { + return nil, err + } + + // TTL has expired + if *remainingTTL <= 0 { + return expireAt, nil + } + + c.ttlAfterFinishedWorker.AddAfter(client.ObjectKey{Name: r.Name}, *remainingTTL) + return nil, nil +} + +func timeLeft(r *appsv1alpha1.WorkloadRebalancer) (*time.Duration, *time.Time, error) { + now := time.Now() + finishAt := r.Status.LastUpdateTime.Time + expireAt := finishAt.Add(time.Duration(*r.Spec.TTLMinutesAfterFinished) * time.Second) + + if finishAt.After(now) { + klog.Infof("Found Rebalancer(%s) finished in the future. This is likely due to time skew in the cluster, cleanup will be deferred.", r.Name) + } + + remainingTTL := expireAt.Sub(now) + klog.V(4).Infof("Found Rebalancer(%s) finished, finishTime: %+v, remainingTTL: %+v, startTime: %+v, deadlineTTL: %+v", + r.Name, finishAt.UTC(), remainingTTL, now.UTC(), expireAt.UTC()) + + return &remainingTTL, &expireAt, nil +} + +// needsCleanup checks whether a WorkloadRebalancer has finished and has a TTL set. +func needsCleanup(r *appsv1alpha1.WorkloadRebalancer) bool { + return r.Spec.TTLMinutesAfterFinished != nil && isRebalancerFinished(r) +} + +// isRebalancerFinished checks whether the given WorkloadRebalancer has finished execution. +// It does not discriminate between successful and failed terminations. +func isRebalancerFinished(r *appsv1alpha1.WorkloadRebalancer) bool { + // if a finished WorkloadRebalancer is updated and didn't have time to refresh the status, + // it is regarded as non-finished since observedGeneration not equal to generation. + if r.Status.ObservedGeneration != r.Generation { + return false + } + for _, workload := range r.Status.ObservedWorkloads { + if workload.Result != appsv1alpha1.RebalanceSuccessful && workload.Result != appsv1alpha1.RebalanceFailed { + return false + } + } + return true +} diff --git a/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go b/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go index 925c11832f7d..26b755462fe1 100644 --- a/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go +++ b/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go @@ -35,32 +35,49 @@ import ( appsv1alpha1 "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/names" ) const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "workload-rebalancer" + + ttlAfterFinishedWorkerNum = 1 ) // RebalancerController is to handle a rebalance to workloads selected by WorkloadRebalancer object. type RebalancerController struct { - Client client.Client + Client client.Client // used to operate WorkloadRebalancer resources from cache. + ControlPlaneClient client.Client // used to fetch arbitrary resources from api server. + + ttlAfterFinishedWorker util.AsyncWorker } // SetupWithManager creates a controller and register to controller manager. func (c *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) error { var predicateFunc = predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, + CreateFunc: func(e event.CreateEvent) bool { + c.ttlAfterFinishedWorker.Add(client.ObjectKey{Name: e.Object.GetName()}) + return true + }, UpdateFunc: func(e event.UpdateEvent) bool { oldObj := e.ObjectOld.(*appsv1alpha1.WorkloadRebalancer) newObj := e.ObjectNew.(*appsv1alpha1.WorkloadRebalancer) + c.ttlAfterFinishedWorker.Add(client.ObjectKey{Name: newObj.GetName()}) return !reflect.DeepEqual(oldObj.Spec, newObj.Spec) }, DeleteFunc: func(event.DeleteEvent) bool { return false }, GenericFunc: func(event.GenericEvent) bool { return false }, } + ttlAfterFinishedWorkerOptions := util.Options{ + Name: "ttl-after-finished-worker", + ReconcileFunc: c.deleteExpiredRebalancer, + } + c.ttlAfterFinishedWorker = util.NewAsyncWorker(ttlAfterFinishedWorkerOptions) + c.ttlAfterFinishedWorker.Run(ttlAfterFinishedWorkerNum, context.Background().Done()) + return controllerruntime.NewControllerManagedBy(mgr). Named(ControllerName). For(&appsv1alpha1.WorkloadRebalancer{}, builder.WithPredicates(predicateFunc)). @@ -137,7 +154,7 @@ func (c *RebalancerController) syncWorkloadsFromSpecToStatus(rebalancer *appsv1a return stri < strj }) - return &appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads} + return &appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads, ObservedGeneration: rebalancer.Generation} } func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) ( @@ -220,6 +237,10 @@ func (c *RebalancerController) recordAndCountRebalancerFailed(resource *appsv1al func (c *RebalancerController) updateWorkloadRebalancerStatus(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer, newStatus *appsv1alpha1.WorkloadRebalancerStatus) error { + if reflect.DeepEqual(rebalancer.Status, newStatus) { + return nil + } + rebalancerPatch := client.MergeFrom(rebalancer) modifiedRebalancer := rebalancer.DeepCopy() modifiedRebalancer.Status = *newStatus diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index e5ce759bb225..013e139d782e 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -759,6 +759,13 @@ func schema_pkg_apis_apps_v1alpha1_WorkloadRebalancerSpec(ref common.ReferenceCa }, }, }, + "ttlMinutesAfterFinished": { + SchemaProps: spec.SchemaProps{ + Description: "ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each target workload is finished with result of Successful or Failed). If this field is set, ttlSecondsAfterFinished after the WorkloadRebalancer finishes, it is eligible to be automatically deleted. If this field is unset, the WorkloadRebalancer won't be automatically deleted. If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.", + Type: []string{"integer"}, + Format: "int32", + }, + }, }, Required: []string{"workloads"}, }, @@ -789,11 +796,24 @@ func schema_pkg_apis_apps_v1alpha1_WorkloadRebalancerStatus(ref common.Reference }, }, }, + "observedGeneration": { + SchemaProps: spec.SchemaProps{ + Description: "ObservedGeneration is the generation(.metadata.generation) observed by the controller. If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed the rebalance result or hasn't done the rebalance yet. optional", + Type: []string{"integer"}, + Format: "int64", + }, + }, + "lastUpdateTime": { + SchemaProps: spec.SchemaProps{ + Description: "LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself. optional", + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, }, }, }, Dependencies: []string{ - "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1.ObservedWorkload"}, + "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1.ObservedWorkload", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, } }