Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add auto scale out #6

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/v1alpha1/trainingjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ type TrainingJobSpec struct {
// Defaults to 1.
// +optional
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

// ScalePolicy defines whether to enable auto scale of workers if possible.
// +optional
ScalePolicy *string `json:"scalePolicy,omitempty"`
// AutoScaleIntervalSeconds defines the period of seconds between each auto scale.
// +optional
AutoScaleIntervalSeconds *int32 `json:"autoScaleIntervalSeconds,omitempty"`
// AutoScaleTimeoutSeconds defines the timeout seconds of auto scale.
// +optional
AutoScaleTimeoutSeconds *int32 `json:"autoScaleTimeoutSeconds,omitempty"`
}

type ETReplicaSpecs struct {
Expand Down
15 changes: 15 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions config/crd/bases/kai.alibabacloud.com_trainingjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ spec:
spec:
description: TrainingJobSpec defines the desired state of TrainingJob
properties:
autoScaleIntervalSeconds:
description: AutoScaleIntervalSeconds defines the period of seconds
between each auto scale.
format: int32
type: integer
autoScaleTimeoutSeconds:
description: AutoScaleTimeoutSeconds defines the timeout seconds of
auto scale.
format: int32
type: integer
cleanPodPolicy:
description: CleanPodPolicy defines the policy that whether to kill
pods after the job completes. Defaults to None.
Expand Down Expand Up @@ -13132,6 +13142,10 @@ spec:
- worker
type: object
x-kubernetes-preserve-unknown-fields: true
scalePolicy:
description: ScalePolicy defines whether to enable auto scale of workers
if possible.
type: string
launcherAttachMode:
description: Specifies the mode when launcher attach to workers. available
option is ssh / kubexec Defaults is kubexec.
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (

// scalingStartReason is the reason recorded when a scaling operation is started.
scalingStartReason = "ScalingStart"
// autoScalingException is the event reason recorded when creating or deleting an auto scale-out fails.
autoScalingException = "TrainingJobAutoScaleFailed"
)

// initializeTrainingJobStatuses initializes the ReplicaStatuses for TrainingJob.
Expand Down
81 changes: 81 additions & 0 deletions pkg/controllers/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers
import (
"context"
"fmt"
"time"

kaiv1alpha1 "github.com/AliyunContainerService/et-operator/api/v1alpha1"
logger "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -56,6 +57,12 @@ func (r *TrainingJobReconciler) syncWorkersState(job *kaiv1alpha1.TrainingJob) e

r.workerReplicasStatus(job.GetJobStatus(), workers)

r.handleWorkersAutoScale(job, workers)
//err = r.handleWorkersAutoScale(job, workers)
//if err != nil {
// return err
//}

err = r.handleWorkersFailed(job, workers)
if err != nil {
return err
Expand All @@ -78,6 +85,80 @@ func (r *TrainingJobReconciler) waitWorkersRunning(job *kaiv1alpha1.TrainingJob)
return nil
}

// handleWorkersAutoScale creates a ScaleOut object that grows the job towards
// Worker.MaxReplicas when the job opts in with ScalePolicy "Auto".
//
// It does nothing (and returns nil) when:
//   - ScalePolicy is unset or not "Auto";
//   - Worker.MaxReplicas is unset;
//   - the number of entries in Status.CurrentWorkers differs from len(pods);
//   - the job is already at or above MaxReplicas;
//   - a Scaling condition with reason scalingStartReason exists and is younger
//     than the auto scale interval.
//
// AutoScaleIntervalSeconds defaults to 60 and AutoScaleTimeoutSeconds to 30
// when not set on the spec. After a successful create, the ScaleOut object is
// scheduled for deletion once the interval has elapsed, and a Scaling
// condition is recorded on the job status.
//
// The only error returned is the one from creating the ScaleOut object.
func (r *TrainingJobReconciler) handleWorkersAutoScale(job *kaiv1alpha1.TrainingJob, pods []corev1.Pod) error {
	if job.Spec.ScalePolicy == nil || *job.Spec.ScalePolicy != "Auto" {
		logger.Info("no need to autoscale")
		return nil
	}

	// MaxReplicas is a pointer; without an upper bound there is no target to
	// scale to, and dereferencing it blindly would panic.
	maxWorkers := job.Spec.ETReplicaSpecs.Worker.MaxReplicas
	if maxWorkers == nil {
		logger.Info("no need to autoscale when worker maxReplicas is not set.")
		return nil
	}

	currentWorkers := job.Status.CurrentWorkers
	if len(currentWorkers) != len(pods) {
		logger.Info("no need to autoscale when currentworkers not equals to worker pods.")
		return nil
	}

	var autoScaleTimeout int32 = 30
	var autoScaleInterval int32 = 60
	if job.Spec.AutoScaleIntervalSeconds != nil {
		autoScaleInterval = *job.Spec.AutoScaleIntervalSeconds
	}
	if job.Spec.AutoScaleTimeoutSeconds != nil {
		autoScaleTimeout = *job.Spec.AutoScaleTimeoutSeconds
	}
	interval := time.Duration(autoScaleInterval) * time.Second

	scaleCount := *maxWorkers - int32(len(currentWorkers))
	if scaleCount <= 0 {
		return nil
	}

	// A previous scaling attempt blocks a new one until a full interval has
	// passed since its condition was last updated. If several matching
	// conditions exist, the last one in the list decides.
	hasScaling := false
	scalingExpired := false
	for _, condition := range job.Status.Conditions {
		if condition.Type == common.Scaling && condition.Reason == scalingStartReason {
			hasScaling = true
			scalingExpired = condition.LastUpdateTime.Add(interval).Before(time.Now())
		}
	}
	if hasScaling && !scalingExpired {
		return nil
	}

	scaleOut := kaiv1alpha1.ScaleOut{
		ObjectMeta: metav1.ObjectMeta{
			Name:      job.Name + "-autoscaleout",
			Namespace: job.Namespace,
		},
		Spec: kaiv1alpha1.ScaleOutSpec{
			Selector: kaiv1alpha1.Selector{Name: job.Name},
			ScaleScriptSpec: kaiv1alpha1.ScaleScriptSpec{
				Timeout: &autoScaleTimeout,
			},
			ToAdd: &kaiv1alpha1.ToAddSpec{Count: &scaleCount},
		},
	}

	// Messages are pre-formatted, so they are logged with Info/Warn rather
	// than Infof: a '%' inside the dumped spec must not be treated as a verb.
	msg := fmt.Sprintf("trainingjob(%s/%s): create autoscaleout %+v", job.Namespace, job.Name, scaleOut.Spec)
	logger.Info(msg)
	if err := r.Client.Create(context.Background(), &scaleOut); err != nil {
		failMsg := fmt.Sprintf("trainingjob(%s/%s): failed to create autoscale %+v, err: %v", job.Namespace, job.Name, scaleOut.Spec, err)
		r.recorder.Event(job, corev1.EventTypeWarning, autoScalingException, failMsg)
		logger.Warn(failMsg)
		return err
	}

	// Remove the ScaleOut object once the interval has elapsed so that the
	// fixed name can be reused by the next auto scale.
	// NOTE(review): this timer lives only in this process; it is lost if the
	// controller restarts before it fires — confirm this is acceptable.
	time.AfterFunc(interval, func() {
		if err := r.Client.Delete(context.Background(), &scaleOut); err != nil {
			failMsg := fmt.Sprintf("trainingjob(%s/%s): failed to delete autoscale %+v, err: %v", job.Namespace, job.Name, scaleOut, err)
			r.recorder.Event(job, corev1.EventTypeWarning, autoScalingException, failMsg)
			logger.Warn(failMsg)
			// Do not report a successful delete after a failure.
			return
		}
		logger.Info(fmt.Sprintf("trainingjob(%s/%s): delete autoscaleout %+v", job.Namespace, job.Name, scaleOut.Spec))
	})

	r.recorder.Event(job, corev1.EventTypeNormal, scalingStartReason, msg)
	updateJobConditions(job.GetJobStatus(), common.Scaling, scalingStartReason, msg)
	return nil
}

func (r *TrainingJobReconciler) handleWorkersFailed(job *kaiv1alpha1.TrainingJob, pods []corev1.Pod) error {
lastRunningPods := map[string]bool{}
for _, worker := range job.Status.CurrentWorkers {
Expand Down