Skip to content

Commit

Permalink
support auto delete WorkloadRebalancer when time up
Browse files Browse the repository at this point in the history
Signed-off-by: chaosi-zju <[email protected]>
  • Loading branch information
chaosi-zju committed May 9, 2024
1 parent 0dad9dc commit 5ca7500
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 6 deletions.
14 changes: 14 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -16759,6 +16759,11 @@
"workloads"
],
"properties": {
"ttlMinutesAfterFinished": {
"description": "ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each target workload is finished with result of Successful or Failed). If this field is set, ttlMinutesAfterFinished minutes after the WorkloadRebalancer finishes, it is eligible to be automatically deleted. If this field is unset, the WorkloadRebalancer won't be automatically deleted. If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.",
"type": "integer",
"format": "int32"
},
"workloads": {
"description": "Workloads used to specify the list of expected resource. Nil or empty list is not allowed.",
"type": "array",
Expand All @@ -16773,6 +16778,15 @@
"description": "WorkloadRebalancerStatus contains information about the current status of a WorkloadRebalancer updated periodically by schedule trigger controller.",
"type": "object",
"properties": {
"lastUpdateTime": {
"description": "LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself. optional",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
},
"observedGeneration": {
"description": "ObservedGeneration is the generation(.metadata.generation) observed by the controller. If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed the rebalance result or hasn't done the rebalance yet. optional",
"type": "integer",
"format": "int64"
},
"observedWorkloads": {
"description": "ObservedWorkloads contains information about the execution states and messages of target resources.",
"type": "array",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ spec:
description: Spec represents the specification of the desired behavior
of WorkloadRebalancer.
properties:
ttlMinutesAfterFinished:
description: |-
ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each
target workload is finished with result of Successful or Failed).
If this field is set, ttlMinutesAfterFinished minutes after the WorkloadRebalancer finishes, it is eligible to be automatically deleted.
If this field is unset, the WorkloadRebalancer won't be automatically deleted.
If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.
format: int32
type: integer
workloads:
description: |-
Workloads used to specify the list of expected resource.
Expand Down Expand Up @@ -76,6 +85,20 @@ spec:
status:
description: Status represents the status of WorkloadRebalancer.
properties:
lastUpdateTime:
description: |-
LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself.
optional
format: date-time
type: string
observedGeneration:
description: |-
ObservedGeneration is the generation(.metadata.generation) observed by the controller.
If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed
the rebalance result or hasn't done the rebalance yet.
optional
format: int64
type: integer
observedWorkloads:
description: ObservedWorkloads contains information about the execution
states and messages of target resources.
Expand Down
5 changes: 4 additions & 1 deletion cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ func startRemedyController(ctx controllerscontext.Context) (enabled bool, err er

func startWorkloadRebalancerController(ctx controllerscontext.Context) (enabled bool, err error) {
workloadRebalancer := workloadrebalancer.RebalancerController{
Client: ctx.Mgr.GetClient(),
Client: ctx.Mgr.GetClient(),
ControlPlaneClient: ctx.ControlPlaneClient,
}
err = workloadRebalancer.SetupWithManager(ctx.Mgr)
if err != nil {
Expand All @@ -727,6 +728,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
kubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)
controlPlaneClient := gclient.NewForConfigOrDie(restConfig)

overrideManager := overridemanager.New(mgr.GetClient(), mgr.GetEventRecorderFor(overridemanager.OverrideManagerName))
skippedResourceConfig := util.NewSkippedResourceConfig()
Expand Down Expand Up @@ -812,6 +814,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
StopChan: stopChan,
DynamicClientSet: dynamicClientSet,
KubeClientSet: kubeClientSet,
ControlPlaneClient: controlPlaneClient,
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
Expand Down
20 changes: 19 additions & 1 deletion pkg/apis/apps/v1alpha1/workloadrebalancer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ type WorkloadRebalancerSpec struct {
// +kubebuilder:validation:MinItems=1
// +required
Workloads []ObjectReference `json:"workloads"`

// ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each
// target workload is finished with result of Successful or Failed).
// If this field is set, ttlMinutesAfterFinished minutes after the WorkloadRebalancer finishes, it is eligible to be automatically deleted.
// If this field is unset, the WorkloadRebalancer won't be automatically deleted.
// If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.
// +optional
TTLMinutesAfterFinished *int32 `json:"ttlMinutesAfterFinished,omitempty"`
}

// ObjectReference the expected resource.
Expand Down Expand Up @@ -85,6 +93,16 @@ type WorkloadRebalancerStatus struct {
// ObservedWorkloads contains information about the execution states and messages of target resources.
// +optional
ObservedWorkloads []ObservedWorkload `json:"observedWorkloads,omitempty"`

// ObservedGeneration is the generation(.metadata.generation) observed by the controller.
// If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed
// the rebalance result or hasn't done the rebalance yet.
// optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`

// LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself.
// optional
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
}

// ObservedWorkload the observed resource.
Expand Down Expand Up @@ -117,7 +135,7 @@ type RebalanceFailedReason string

const (
// RebalanceObjectNotFound the resource referenced binding not found.
RebalanceObjectNotFound RebalanceFailedReason = "NotFound"
RebalanceObjectNotFound RebalanceFailedReason = "ReferencedBindingNotFound"
)

// +kubebuilder:resource:scope="Cluster"
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
Expand Down Expand Up @@ -109,6 +110,7 @@ type Context struct {
StopChan <-chan struct{}
DynamicClientSet dynamic.Interface
KubeClientSet clientset.Interface
ControlPlaneClient client.Client
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
Expand Down
141 changes: 141 additions & 0 deletions pkg/controllers/workloadrebalancer/ttlafterfinished_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2024 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workloadrebalancer

import (
	"context"
	"fmt"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/client"

	appsv1alpha1 "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util"
)

// deleteExpiredRebalancer deletes the WorkloadRebalancer identified by key once its TTL has elapsed.
//
// The TTL is evaluated twice: first on the object read through c.Client, then on a
// second copy read through c.ControlPlaneClient. The object is deleted only when both
// evaluations report expiry. A key that is not a client.ObjectKey, or an object that can
// no longer be found, is treated as nothing to do and nil is returned. Any other failure
// is returned to the caller.
func (c *RebalancerController) deleteExpiredRebalancer(key util.QueueKey) error {
	objKey, isObjectKey := key.(client.ObjectKey)
	if !isObjectKey {
		klog.Errorf("Invalid object key: %+v", key)
		return nil
	}
	klog.V(4).Infof("Checking if WorkloadRebalancer(%s) is ready for cleanup", objKey.Name)

	ctx := context.TODO()

	// First pass: read the WorkloadRebalancer through c.Client and evaluate its TTL.
	observed := &appsv1alpha1.WorkloadRebalancer{}
	switch err := c.Client.Get(ctx, objKey, observed); {
	case apierrors.IsNotFound(err):
		return nil
	case err != nil:
		return err
	}

	expiredAt, err := c.processTTL(observed)
	if err != nil {
		return err
	}
	if expiredAt == nil {
		return nil
	}

	// Second pass: the copy evaluated above might be stale (for example, the TTL may have
	// been modified in the meantime), so re-read the object through c.ControlPlaneClient
	// and evaluate the TTL again before deleting anything.
	fresh := &appsv1alpha1.WorkloadRebalancer{}
	switch err := c.ControlPlaneClient.Get(ctx, objKey, fresh); {
	case apierrors.IsNotFound(err):
		return nil
	case err != nil:
		return err
	}

	expiredAt, err = c.processTTL(fresh)
	if err != nil {
		return err
	}
	if expiredAt == nil {
		return nil
	}

	// Both evaluations agree that the TTL has expired. The delete carries the
	// resourceVersion of the object that was just checked as a precondition.
	preconditions := &metav1.Preconditions{ResourceVersion: &fresh.ResourceVersion}
	err = c.ControlPlaneClient.Delete(ctx, fresh, &client.DeleteOptions{Preconditions: preconditions})
	if err != nil {
		klog.Errorf("Cleaning up WorkloadRebalancer(%s) failed: %+v", fresh.Name, err)
		return err
	}

	klog.V(4).Infof("Cleaning up WorkloadRebalancer(%s) successful.", fresh.Name)
	return nil
}

// processTTL reports whether the TTL of the given WorkloadRebalancer has expired.
//
// When the WorkloadRebalancer does not need cleanup, (nil, nil) is returned. When the TTL
// has already elapsed, the expiration time is returned. When the TTL is still running,
// the WorkloadRebalancer is re-queued on ttlAfterFinishedWorker to be checked again after
// the remaining time, and (nil, nil) is returned.
func (c *RebalancerController) processTTL(r *appsv1alpha1.WorkloadRebalancer) (expiredAt *time.Time, err error) {
	if !needsCleanup(r) {
		return nil, nil
	}

	remaining, deadline, err := timeLeft(r)
	switch {
	case err != nil:
		return nil, err
	case *remaining > 0:
		// Not expired yet: come back once the remaining time has passed.
		c.ttlAfterFinishedWorker.AddAfter(client.ObjectKey{Name: r.Name}, *remaining)
		return nil, nil
	default:
		// TTL has expired.
		return deadline, nil
	}
}

// timeLeft computes how much longer the given finished WorkloadRebalancer may live.
//
// The finish time is taken from r.Status.LastUpdateTime and the lifetime from
// r.Spec.TTLMinutesAfterFinished, which is interpreted in minutes as the field name states.
// It returns the remaining duration (zero or negative once the TTL has elapsed) together
// with the expiration time. An error is returned, rather than dereferencing a nil pointer,
// when either of the two fields is unset.
func timeLeft(r *appsv1alpha1.WorkloadRebalancer) (*time.Duration, *time.Time, error) {
	if r.Spec.TTLMinutesAfterFinished == nil {
		return nil, nil, fmt.Errorf("WorkloadRebalancer(%s) has no ttlMinutesAfterFinished set", r.Name)
	}
	if r.Status.LastUpdateTime == nil {
		return nil, nil, fmt.Errorf("WorkloadRebalancer(%s) has no lastUpdateTime recorded in its status", r.Name)
	}

	// Convert the minute count to a Duration without overflowing int64 nanoseconds:
	// an int32 number of minutes can exceed what a Duration is able to represent.
	// Non-positive values are treated like zero (eligible immediately after finishing),
	// values too large to represent are clamped to the maximum Duration.
	const maxTTL = time.Duration(1<<63 - 1)
	ttl := maxTTL
	switch minutes := int64(*r.Spec.TTLMinutesAfterFinished); {
	case minutes <= 0:
		ttl = 0
	case minutes <= int64(maxTTL/time.Minute):
		ttl = time.Duration(minutes) * time.Minute
	}

	now := time.Now()
	finishAt := r.Status.LastUpdateTime.Time
	expireAt := finishAt.Add(ttl)

	if finishAt.After(now) {
		klog.Infof("Found Rebalancer(%s) finished in the future. This is likely due to time skew in the cluster, cleanup will be deferred.", r.Name)
	}

	remainingTTL := expireAt.Sub(now)
	klog.V(4).Infof("Found Rebalancer(%s) finished, finishTime: %+v, remainingTTL: %+v, startTime: %+v, deadlineTTL: %+v",
		r.Name, finishAt.UTC(), remainingTTL, now.UTC(), expireAt.UTC())

	return &remainingTTL, &expireAt, nil
}

// needsCleanup reports whether the given WorkloadRebalancer is a candidate for TTL cleanup,
// i.e. it has a TTL configured and has finished execution.
func needsCleanup(r *appsv1alpha1.WorkloadRebalancer) bool {
	if r.Spec.TTLMinutesAfterFinished == nil {
		// No TTL configured: never cleaned up automatically.
		return false
	}
	return isRebalancerFinished(r)
}

// isRebalancerFinished reports whether the given WorkloadRebalancer has finished execution,
// regardless of whether its workloads ended up successful or failed.
func isRebalancerFinished(r *appsv1alpha1.WorkloadRebalancer) bool {
	// A WorkloadRebalancer whose status has not caught up with its latest generation
	// (e.g. it was updated after finishing and the status is not refreshed yet) is
	// regarded as not finished.
	if r.Generation != r.Status.ObservedGeneration {
		return false
	}

	observed := r.Status.ObservedWorkloads
	for i := range observed {
		switch observed[i].Result {
		case appsv1alpha1.RebalanceSuccessful, appsv1alpha1.RebalanceFailed:
			// This workload has reached a final result; keep checking the rest.
		default:
			return false
		}
	}
	return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,49 @@ import (

appsv1alpha1 "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/names"
)

const (
// ControllerName is the controller name that will be used when reporting events.
ControllerName = "workload-rebalancer"

// ttlAfterFinishedWorkerNum is the worker count passed to ttlAfterFinishedWorker.Run
// when the TTL cleanup worker is started in SetupWithManager.
ttlAfterFinishedWorkerNum = 1
)

// RebalancerController is to handle a rebalance to workloads selected by WorkloadRebalancer object.
type RebalancerController struct {
Client client.Client
Client client.Client // used to operate WorkloadRebalancer resources from cache.
ControlPlaneClient client.Client // used to fetch arbitrary resources from api server.

ttlAfterFinishedWorker util.AsyncWorker
}

// SetupWithManager creates a controller and register to controller manager.
func (c *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) error {
var predicateFunc = predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
CreateFunc: func(e event.CreateEvent) bool {
c.ttlAfterFinishedWorker.Add(client.ObjectKey{Name: e.Object.GetName()})
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj := e.ObjectOld.(*appsv1alpha1.WorkloadRebalancer)
newObj := e.ObjectNew.(*appsv1alpha1.WorkloadRebalancer)
c.ttlAfterFinishedWorker.Add(client.ObjectKey{Name: newObj.GetName()})
return !reflect.DeepEqual(oldObj.Spec, newObj.Spec)
},
DeleteFunc: func(event.DeleteEvent) bool { return false },
GenericFunc: func(event.GenericEvent) bool { return false },
}

ttlAfterFinishedWorkerOptions := util.Options{
Name: "ttl-after-finished-worker",
ReconcileFunc: c.deleteExpiredRebalancer,
}
c.ttlAfterFinishedWorker = util.NewAsyncWorker(ttlAfterFinishedWorkerOptions)
c.ttlAfterFinishedWorker.Run(ttlAfterFinishedWorkerNum, context.Background().Done())

return controllerruntime.NewControllerManagedBy(mgr).
Named(ControllerName).
For(&appsv1alpha1.WorkloadRebalancer{}, builder.WithPredicates(predicateFunc)).
Expand Down Expand Up @@ -137,7 +154,7 @@ func (c *RebalancerController) syncWorkloadsFromSpecToStatus(rebalancer *appsv1a
return stri < strj
})

return &appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads}
return &appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads, ObservedGeneration: rebalancer.Generation}
}

func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) (
Expand Down Expand Up @@ -220,6 +237,10 @@ func (c *RebalancerController) recordAndCountRebalancerFailed(resource *appsv1al

func (c *RebalancerController) updateWorkloadRebalancerStatus(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer,
newStatus *appsv1alpha1.WorkloadRebalancerStatus) error {
if reflect.DeepEqual(rebalancer.Status, newStatus) {
return nil
}

rebalancerPatch := client.MergeFrom(rebalancer)
modifiedRebalancer := rebalancer.DeepCopy()
modifiedRebalancer.Status = *newStatus
Expand Down
Loading

0 comments on commit 5ca7500

Please sign in to comment.