
Commit

support auto-deleting WorkloadRebalancer when its TTL is up
Signed-off-by: chaosi-zju <[email protected]>
chaosi-zju committed May 24, 2024
1 parent a9f9020 commit f5076cd
Showing 9 changed files with 246 additions and 31 deletions.
14 changes: 14 additions & 0 deletions api/openapi-spec/swagger.json
@@ -16759,6 +16759,11 @@
"workloads"
],
"properties": {
"ttlSecondsAfterFinished": {
"description": "TTLSecondsAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each target workload is finished with result of Successful or Failed). If this field is set, ttlSecondsAfterFinished after the WorkloadRebalancer finishes, it is eligible to be automatically deleted. If this field is unset, the WorkloadRebalancer won't be automatically deleted. If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.",
"type": "integer",
"format": "int32"
},
"workloads": {
"description": "Workloads used to specify the list of expected resource. Nil or empty list is not allowed.",
"type": "array",
@@ -16773,6 +16778,15 @@
"description": "WorkloadRebalancerStatus contains information about the current status of a WorkloadRebalancer updated periodically by schedule trigger controller.",
"type": "object",
"properties": {
"finishTime": {
"description": "FinishTime represents the finish time of rebalancer.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
},
"observedGeneration": {
"description": "ObservedGeneration is the generation(.metadata.generation) observed by the controller. If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed the rebalance result or hasn't done the rebalance yet.",
"type": "integer",
"format": "int64"
},
"observedWorkloads": {
"description": "ObservedWorkloads contains information about the execution states and messages of target resources.",
"type": "array",
@@ -41,6 +41,15 @@ spec:
description: Spec represents the specification of the desired behavior
of WorkloadRebalancer.
properties:
ttlSecondsAfterFinished:
description: |-
TTLSecondsAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (meaning
each target workload has finished with a result of Successful or Failed).
If this field is set, the WorkloadRebalancer becomes eligible to be automatically deleted ttlSecondsAfterFinished seconds after it finishes.
If this field is unset, the WorkloadRebalancer won't be automatically deleted.
If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.
format: int32
type: integer
workloads:
description: |-
Workloads is used to specify the list of expected resources.
@@ -76,6 +85,17 @@
status:
description: Status represents the status of WorkloadRebalancer.
properties:
finishTime:
description: FinishTime represents the finish time of the rebalancer.
format: date-time
type: string
observedGeneration:
description: |-
ObservedGeneration is the generation(.metadata.generation) observed by the controller.
If ObservedGeneration is less than the generation in metadata, it means the controller hasn't confirmed
the rebalance result or hasn't performed the rebalance yet.
format: int64
type: integer
observedWorkloads:
description: ObservedWorkloads contains information about the execution
states and messages of target resources.
20 changes: 19 additions & 1 deletion pkg/apis/apps/v1alpha1/workloadrebalancer_types.go
@@ -57,6 +57,14 @@ type WorkloadRebalancerSpec struct {
// +kubebuilder:validation:MinItems=1
// +required
Workloads []ObjectReference `json:"workloads"`

// TTLSecondsAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (meaning
// each target workload has finished with a result of Successful or Failed).
// If this field is set, the WorkloadRebalancer becomes eligible to be automatically deleted ttlSecondsAfterFinished seconds after it finishes.
// If this field is unset, the WorkloadRebalancer won't be automatically deleted.
// If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.
// +optional
TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`
}
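
For context, here is a minimal sketch of how a client could create a WorkloadRebalancer that uses the new field. The rebalancer name, the workload reference, and the client wiring are illustrative assumptions, not part of this commit:

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
	"sigs.k8s.io/controller-runtime/pkg/client"

	appsv1alpha1 "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1"
)

// createSelfCleaningRebalancer creates a WorkloadRebalancer that becomes
// eligible for automatic deletion 300 seconds after it finishes.
func createSelfCleaningRebalancer(ctx context.Context, c client.Client) error {
	rebalancer := &appsv1alpha1.WorkloadRebalancer{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-rebalancer"},
		Spec: appsv1alpha1.WorkloadRebalancerSpec{
			TTLSecondsAfterFinished: pointer.Int32(300),
			Workloads: []appsv1alpha1.ObjectReference{
				{APIVersion: "apps/v1", Kind: "Deployment", Namespace: "default", Name: "demo-deploy"},
			},
		},
	}
	return c.Create(ctx, rebalancer)
}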

// ObjectReference represents the expected resource.
@@ -85,6 +93,16 @@ type WorkloadRebalancerStatus struct {
// ObservedWorkloads contains information about the execution states and messages of target resources.
// +optional
ObservedWorkloads []ObservedWorkload `json:"observedWorkloads,omitempty"`

// ObservedGeneration is the generation(.metadata.generation) observed by the controller.
// If ObservedGeneration is less than the generation in metadata, it means the controller hasn't confirmed
// the rebalance result or hasn't performed the rebalance yet.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`

// FinishTime represents the finish time of the rebalancer.
// +optional
FinishTime *metav1.Time `json:"finishTime,omitempty"`
}
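
Taken together, the two new status fields let a consumer decide whether a rebalancer has actually finished for its current spec. A minimal helper sketch (not part of this commit):

// rebalancerFinished reports whether the controller has observed the latest
// spec (ObservedGeneration caught up with metadata.generation) and recorded
// a finish time for it.
func rebalancerFinished(r *appsv1alpha1.WorkloadRebalancer) bool {
	return r.Status.ObservedGeneration == r.Generation && r.Status.FinishTime != nil
}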

// ObservedWorkload represents the observed resource.
@@ -117,7 +135,7 @@ type RebalanceFailedReason string

const (
// RebalanceObjectNotFound means the binding referenced by the resource was not found.
RebalanceObjectNotFound RebalanceFailedReason = "NotFound"
RebalanceObjectNotFound RebalanceFailedReason = "ReferencedBindingNotFound"
)

// +kubebuilder:resource:scope="Cluster"
9 changes: 9 additions & 0 deletions pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

94 changes: 75 additions & 19 deletions pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go
@@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"sort"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,10 +49,17 @@ type RebalancerController struct {
Client client.Client
}

type rebalancerResult struct {
newStatus *appsv1alpha1.WorkloadRebalancerStatus
statusChanged bool
successNum int64
retryNum int64
}

// SetupWithManager creates a controller and register to controller manager.
func (c *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) error {
var predicateFunc = predicate.Funcs{
CreateFunc: func(event.CreateEvent) bool { return true },
CreateFunc: func(_ event.CreateEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj := e.ObjectOld.(*appsv1alpha1.WorkloadRebalancer)
newObj := e.ObjectNew.(*appsv1alpha1.WorkloadRebalancer)
@@ -84,18 +92,34 @@ func (c *RebalancerController) Reconcile(ctx context.Context, req controllerrunt
}

// 2. get and update referenced binding to trigger a rescheduling
newStatus, successNum, retryNum := c.doWorkloadRebalance(ctx, rebalancer)
result := c.doWorkloadRebalance(ctx, rebalancer)

// 3. update status of WorkloadRebalancer
if err := c.updateWorkloadRebalancerStatus(ctx, rebalancer, newStatus); err != nil {
return controllerruntime.Result{}, err
// 3. update status of WorkloadRebalancer if statusChanged
if result.statusChanged {
if err := c.updateWorkloadRebalancerStatus(ctx, rebalancer, result.newStatus); err != nil {
return controllerruntime.Result{}, err
}
}

klog.Infof("Finish handling WorkloadRebalancer (%s), %d/%d resource success in all, while %d resource need retry",
rebalancer.Name, successNum, len(newStatus.ObservedWorkloads), retryNum)
rebalancer.Name, result.successNum, len(result.newStatus.ObservedWorkloads), result.retryNum)

if result.retryNum > 0 {
return controllerruntime.Result{}, fmt.Errorf("%d resources failed to trigger rescheduling and need retry", result.retryNum)
}

if retryNum > 0 {
return controllerruntime.Result{}, fmt.Errorf("%d resources failed to trigger rescheduling and need retry", retryNum)
// 4. when no more resources need retry, check whether the rebalancer needs cleanup.
rebalancer.Status = *result.newStatus
if rebalancer.Spec.TTLSecondsAfterFinished != nil && rebalancer.Status.FinishTime != nil {
remainingTTL := timeLeft(rebalancer)
if remainingTTL > 0 {
return controllerruntime.Result{RequeueAfter: remainingTTL}, nil
}
if err := c.deleteWorkloadRebalancer(ctx, rebalancer); err != nil {
return controllerruntime.Result{}, err
}
}

return controllerruntime.Result{}, nil
}
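
The TTL handling above relies on the usual controller-runtime idiom: rather than polling, the reconciler computes the remaining TTL and asks to be requeued exactly when it elapses. A distilled sketch of that pattern (the helper name is illustrative):

// reconcileTTL returns a Result that requeues the object when its TTL
// expires; once the TTL has passed, the caller proceeds with deletion.
func reconcileTTL(finish time.Time, ttl time.Duration) (controllerruntime.Result, error) {
	remaining := time.Until(finish.Add(ttl))
	if remaining > 0 {
		// Nothing to do until the TTL elapses; wake up again exactly then.
		return controllerruntime.Result{RequeueAfter: remaining}, nil
	}
	// TTL expired: the caller can delete the object now.
	return controllerruntime.Result{}, nil
}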

@@ -132,21 +156,19 @@ func (c *RebalancerController) syncWorkloadsFromSpecToStatus(rebalancer *appsv1a
sort.Slice(observedWorkloads, func(i, j int) bool {
wi := observedWorkloads[i].Workload
wj := observedWorkloads[j].Workload
stri := fmt.Sprintf("%s#%s#%s#%s", wi.APIVersion, wi.Kind, wi.Namespace, wi.Name)
strj := fmt.Sprintf("%s#%s#%s#%s", wj.APIVersion, wj.Kind, wj.Namespace, wj.Name)
stri := fmt.Sprintf("%s %s %s %s", wi.APIVersion, wi.Kind, wi.Namespace, wi.Name)
strj := fmt.Sprintf("%s %s %s %s", wj.APIVersion, wj.Kind, wj.Namespace, wj.Name)
return stri < strj
})

return &appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads}
return &appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads, ObservedGeneration: rebalancer.Generation}
}

func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) (
newStatus *appsv1alpha1.WorkloadRebalancerStatus, successNum int64, retryNum int64) {
func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) rebalancerResult {
// get previous status and update based on it
newStatus := c.syncWorkloadsFromSpecToStatus(rebalancer)

newStatus = c.syncWorkloadsFromSpecToStatus(rebalancer)

successNum, retryNum = int64(0), int64(0)
successNum, retryNum := int64(0), int64(0)
for i, resource := range newStatus.ObservedWorkloads {
if resource.Result == appsv1alpha1.RebalanceSuccessful {
successNum++
@@ -196,7 +218,20 @@ func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalanc
c.recordAndCountRebalancerSuccess(&newStatus.ObservedWorkloads[i], &successNum)
}
}
return

statusChanged := false
newStatus.FinishTime = rebalancer.Status.FinishTime
// compare newStatus with the old status, ignoring the finishTime field.
if !reflect.DeepEqual(rebalancer.Status, *newStatus) {
statusChanged = true
// retryNum == 0 means the latest rebalance has finished, so finishTime should be updated.
if retryNum == 0 {
finishTime := metav1.Now()
newStatus.FinishTime = &finishTime
}
}

return rebalancerResult{newStatus: newStatus, statusChanged: statusChanged, successNum: successNum, retryNum: retryNum}
}
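
Note why the old FinishTime is copied into newStatus before the comparison: the controller stamps metav1.Now() only when the status otherwise changed, so an idle reconcile doesn't see a fresh timestamp, report a spurious diff, and patch the status forever. A distilled sketch of the guard (the helper name is illustrative):

// statusChangedIgnoringFinishTime compares two statuses with the volatile
// finishTime field neutralized, so timestamps alone never force a patch.
func statusChangedIgnoringFinishTime(old, updated *appsv1alpha1.WorkloadRebalancerStatus) bool {
	updated.FinishTime = old.FinishTime
	return !reflect.DeepEqual(*old, *updated)
}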

func (c *RebalancerController) needTriggerReschedule(creationTimestamp metav1.Time, rescheduleTriggeredAt *metav1.Time) bool {
@@ -220,16 +255,37 @@ func (c *RebalancerController) recordAndCountRebalancerFailed(resource *appsv1al

func (c *RebalancerController) updateWorkloadRebalancerStatus(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer,
newStatus *appsv1alpha1.WorkloadRebalancerStatus) error {
rebalancerPatch := client.MergeFrom(rebalancer)
modifiedRebalancer := rebalancer.DeepCopy()
modifiedRebalancer.Status = *newStatus

return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
klog.V(4).Infof("Start to patch WorkloadRebalancer(%s) status", rebalancer.Name)
if err = c.Client.Status().Patch(ctx, modifiedRebalancer, rebalancerPatch); err != nil {
if err = c.Client.Status().Patch(ctx, modifiedRebalancer, client.MergeFrom(rebalancer)); err != nil {
klog.Errorf("Failed to patch WorkloadRebalancer (%s) status, err: %+v", rebalancer.Name, err)
return err
}
klog.V(4).Infof("Patch WorkloadRebalancer(%s) successful", rebalancer.Name)
return nil
})
}

func (c *RebalancerController) deleteWorkloadRebalancer(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) error {
klog.V(4).Infof("Start to clean up WorkloadRebalancer(%s)", rebalancer.Name)

options := &client.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &rebalancer.ResourceVersion}}
if err := c.Client.Delete(ctx, rebalancer, options); err != nil {
klog.Errorf("Cleaning up WorkloadRebalancer(%s) failed: %+v.", rebalancer.Name, err)
return err
}

klog.V(4).Infof("Cleaning up WorkloadRebalancer(%s) successful", rebalancer.Name)
return nil
}
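
The ResourceVersion precondition above makes the cleanup a guarded delete: if the rebalancer was modified after the reconciler read it (for example, the user extended ttlSecondsAfterFinished), the API server rejects the delete with a Conflict and the next reconcile re-evaluates the TTL. A distilled sketch of the pattern (the helper name is illustrative):

// guardedDelete deletes obj only if it still has the ResourceVersion we
// last read; a concurrent update aborts the cleanup instead of losing it.
func guardedDelete(ctx context.Context, c client.Client, obj client.Object) error {
	rv := obj.GetResourceVersion()
	opts := &client.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &rv}}
	if err := c.Delete(ctx, obj, opts); err != nil {
		if apierrors.IsConflict(err) {
			// The object changed since we read it; let the next reconcile decide.
			return nil
		}
		return err
	}
	return nil
}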

func timeLeft(r *appsv1alpha1.WorkloadRebalancer) time.Duration {
expireAt := r.Status.FinishTime.Add(time.Duration(*r.Spec.TTLSecondsAfterFinished) * time.Second)
remainingTTL := time.Until(expireAt)

klog.V(4).Infof("Found Rebalancer(%s) finished at: %+v, remainingTTL: %+v", r.Name, r.Status.FinishTime.UTC(), remainingTTL)
return remainingTTL
}
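
As a worked example of timeLeft: a rebalancer that finished 90 seconds ago with ttlSecondsAfterFinished set to 300 still has roughly 210 seconds of TTL left, so the reconciler requeues instead of deleting. A hypothetical in-package check:

func ExampleTimeLeft() {
	finished := metav1.NewTime(time.Now().Add(-90 * time.Second))
	r := &appsv1alpha1.WorkloadRebalancer{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-rebalancer"},
		Spec:       appsv1alpha1.WorkloadRebalancerSpec{TTLSecondsAfterFinished: pointer.Int32(300)},
		Status:     appsv1alpha1.WorkloadRebalancerStatus{FinishTime: &finished},
	}
	fmt.Println(timeLeft(r).Round(time.Second)) // approximately 3m30s remaining
}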
@@ -23,8 +23,10 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -121,6 +123,22 @@
},
},
}
ttlFinishedRebalancer = &appsv1alpha1.WorkloadRebalancer{
ObjectMeta: metav1.ObjectMeta{Name: "ttl-finished-rebalancer", CreationTimestamp: oneHourAgo},
Spec: appsv1alpha1.WorkloadRebalancerSpec{
TTLSecondsAfterFinished: pointer.Int32(5),
Workloads: []appsv1alpha1.ObjectReference{deploy1Obj},
},
Status: appsv1alpha1.WorkloadRebalancerStatus{
FinishTime: &oneHourAgo,
ObservedWorkloads: []appsv1alpha1.ObservedWorkload{
{
Workload: deploy1Obj,
Result: appsv1alpha1.RebalanceSuccessful,
},
},
},
}
)

func TestRebalancerController_Reconcile(t *testing.T) {
@@ -131,6 +149,7 @@ func TestRebalancerController_Reconcile(t *testing.T) {
existObjsWithStatus []client.Object
wantErr bool
wantStatus appsv1alpha1.WorkloadRebalancerStatus
needsCleanup bool
}{
{
name: "reconcile pendingRebalancer",
@@ -222,6 +241,15 @@
},
},
},
{
name: "reconcile ttlFinishedRebalancer",
req: controllerruntime.Request{
NamespacedName: types.NamespacedName{Name: ttlFinishedRebalancer.Name},
},
existObjects: []client.Object{deploy1, binding1, ttlFinishedRebalancer},
existObjsWithStatus: []client.Object{ttlFinishedRebalancer},
needsCleanup: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -238,8 +266,14 @@
// 2. check final WorkloadRebalancer status
rebalancerGet := &appsv1alpha1.WorkloadRebalancer{}
if err := c.Client.Get(context.TODO(), tt.req.NamespacedName, rebalancerGet); err != nil {
if apierrors.IsNotFound(err) && tt.needsCleanup {
t.Logf("WorkloadRebalancer %s has be cleaned up as expected", tt.req.NamespacedName)
return
}
t.Fatalf("get WorkloadRebalancer failed: %+v", err)
}
// we can't predict `FinishTime` in `wantStatus`, so skip comparing this field.
tt.wantStatus.FinishTime = rebalancerGet.Status.FinishTime
if !reflect.DeepEqual(rebalancerGet.Status, tt.wantStatus) {
t.Fatalf("update WorkloadRebalancer failed, got: %+v, want: %+v", rebalancerGet.Status, tt.wantStatus)
}
@@ -288,17 +322,17 @@ func TestRebalancerController_updateWorkloadRebalancerStatus(t *testing.T) {
WithObjects(tt.rebalancer, tt.modifiedRebalancer).
WithStatusSubresource(tt.rebalancer, tt.modifiedRebalancer).Build(),
}
newStatus := tt.modifiedRebalancer.Status
err := c.updateWorkloadRebalancerStatus(context.TODO(), tt.rebalancer, &newStatus)
wantStatus := tt.modifiedRebalancer.Status
err := c.updateWorkloadRebalancerStatus(context.TODO(), tt.rebalancer, &wantStatus)
if (err == nil && tt.wantErr) || (err != nil && !tt.wantErr) {
t.Fatalf("updateWorkloadRebalancerStatus() error = %v, wantErr %v", err, tt.wantErr)
}
rebalancerGet := &appsv1alpha1.WorkloadRebalancer{}
if err := c.Client.Get(context.TODO(), client.ObjectKey{Name: tt.modifiedRebalancer.Name}, rebalancerGet); err != nil {
if err := c.Client.Get(context.TODO(), client.ObjectKey{Name: tt.rebalancer.Name}, rebalancerGet); err != nil {
t.Fatalf("get WorkloadRebalancer failed: %+v", err)
}
if !reflect.DeepEqual(rebalancerGet.Status, newStatus) {
t.Fatalf("update WorkloadRebalancer failed, got: %+v, want: %+v", rebalancerGet.Status, tt.modifiedRebalancer.Status)
if !reflect.DeepEqual(rebalancerGet.Status, wantStatus) {
t.Fatalf("update WorkloadRebalancer failed, got: %+v, want: %+v", rebalancerGet.Status, wantStatus)
}
})
}
