Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Constrain replicas within [min, max] while training or scaling #10

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v1alpha1/scalein_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ScaleInSpec struct {
}

type ToDeleteSpec struct {
Count int `json:"count,omitempty"`
Count int32 `json:"count,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to change this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep it consistent with the ScaleOut schema.

PodNames []string `json:"podNames,omitempty"`
}

Expand Down
1 change: 1 addition & 0 deletions config/crd/bases/kai.alibabacloud.com_scaleins.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ spec:
toDelete:
properties:
count:
format: int32
type: integer
podNames:
items:
Expand Down
4 changes: 2 additions & 2 deletions examples/scale_in_count.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
apiVersion: kai.alibabacloud.com/v1alpha1
kind: ScaleIn
metadata:
generateName: scalein-sample-
name: scalein-sample
spec:
selector:
name: elastic-training
toDelete:
count: 1
count: 1
4 changes: 2 additions & 2 deletions examples/scale_out.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
apiVersion: kai.alibabacloud.com/v1alpha1
kind: ScaleOut
metadata:
generateName: elastic-training-scaleout-
name: elastic-training-scaleout
spec:
selector:
name: elastic-training
timeout: 300
toAdd:
count: 2
count: 1
2 changes: 1 addition & 1 deletion examples/training_job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ spec:
limits:
nvidia.com/gpu: "1"
requests:
nvidia.com/gpu: "1"
nvidia.com/gpu: "1"
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func main() {
}
// +kubebuilder:scaffold:builder

setupLog.Info("starting manager")
setupLog.Info("starting et-operator manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,11 @@ const (
// reached phase failed with no restarting.
// The training has failed its execution.
JobFailed JobConditionType = "Failed"
ValidateFailed JobConditionType = "ValidateFailed"

Scaling JobConditionType = "Scaling"
ScaleFailed JobConditionType = "ScaleFailed"
ScaleAborted JobConditionType = "ScaleAborted"
ScaleSucceeded JobConditionType = "ScaleSucceeded"
ScriptExecuted JobConditionType = "ScriptExecuted"

Expand Down
27 changes: 22 additions & 5 deletions pkg/controllers/scalein.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ import (
"strings"
)

// validateScaleIn checks that a ScaleIn request is well-formed and that
// removing the requested number of workers keeps the job's worker replica
// count within the bounds enforced by validateReplica.
//
// The number of workers to remove is len(.spec.toDelete.podNames) when pod
// names are supplied, because the scale-in deletion logic gives podNames
// precedence over count; otherwise it is .spec.toDelete.count.
// NOTE(review): this assumes every listed pod name refers to a distinct
// existing worker; confirm against filterPodNames.
//
// It returns an error when .spec.toDelete is missing, when the number of
// workers to remove is not positive, or when validateReplica rejects the
// resulting replica count.
func (r *TrainingJobReconciler) validateScaleIn(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
	toDelete := scaleIn.Spec.ToDelete
	if toDelete == nil {
		return fmt.Errorf(".spec.toDelete.count shouldn't be empty")
	}

	cnt := int32(toDelete.Count)
	if len(toDelete.PodNames) > 0 {
		// A podNames-only request carries no count; derive it from the list
		// so it is not rejected as "count <= 0".
		cnt = int32(len(toDelete.PodNames))
	}
	if cnt <= 0 {
		return fmt.Errorf(".spec.toDelete.count should be greater than 0")
	}

	// Scale-in shrinks the job, so the replica delta is negative.
	deltaCnt := -cnt
	return r.validateReplica(job, &deltaCnt)
}

func (r *TrainingJobReconciler) executeScaleIn(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
if scaleIn.DeletionTimestamp != nil || isScaleFinished(*scaleIn.GetJobStatus()) {
logger.Info("reconcile cancelled, scalein does not need to do reconcile or has been deleted")
Expand All @@ -20,8 +33,12 @@ func (r *TrainingJobReconciler) executeScaleIn(job *kaiv1alpha1.TrainingJob, sca

initializeJobStatus(scaleIn.GetJobStatus())

//TODO: Validate the scalein count for minSize
err := r.setsSaleInToDelete(job, scaleIn)
if err := r.validateScaleIn(job, scaleIn); err != nil {
r.updateScalerAbort(scaleIn, job, err.Error())
return nil
}

err := r.setsScaleInToDelete(job, scaleIn)
if err != nil {
msg := fmt.Sprintf("%s get to delete workers name failed, error: %v", scaleIn.GetFullName(), err)
r.updateScalerFailed(scaleIn, job, msg)
Expand Down Expand Up @@ -132,7 +149,7 @@ func (r *TrainingJobReconciler) isWorkersDeleted(namespace string, workersName [
return true, nil
}

func (r *TrainingJobReconciler) setsSaleInToDelete(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
func (r *TrainingJobReconciler) setsScaleInToDelete(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
podNames := scaleIn.Status.ToDeletePods
if len(podNames) != 0 {
return /*filterPodNames(workers, podNames, false), */ nil
Expand All @@ -148,9 +165,9 @@ func (r *TrainingJobReconciler) setsSaleInToDelete(job *kaiv1alpha1.TrainingJob,
if toDelete.PodNames != nil {
workers = filterPodNames(workers, toDelete.PodNames, false)
} else if toDelete.Count > 0 {
if toDelete.Count < len(workers) {
if int(toDelete.Count) < len(workers) {
allPodNames := getSortPodNames(job.Name, workers)
deletePodNames := allPodNames[len(workers)-toDelete.Count:]
deletePodNames := allPodNames[len(workers)-int(toDelete.Count):]
workers = filterPodNames(workers, deletePodNames, false)
} else {
return fmt.Errorf(".spec.toDelete.count should be less than current replicas %d", len(workers))
Expand Down
13 changes: 7 additions & 6 deletions pkg/controllers/scaleout.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func (r *TrainingJobReconciler) executeScaleOut(job *kaiv1alpha1.TrainingJob, sc

initializeJobStatus(scaleOut.GetJobStatus())

if err := r.validateScaleOut(scaleOut); err != nil {
r.updateScalerFailed(scaleOut, job, err.Error())
return err
if err := r.validateScaleOut(job, scaleOut); err != nil {
r.updateScalerAbort(scaleOut, job, err.Error())
return nil
}

if err := r.setScaleOutWorkers(job, scaleOut); err != nil {
Expand Down Expand Up @@ -66,14 +66,15 @@ func (r *TrainingJobReconciler) executeScaleOut(job *kaiv1alpha1.TrainingJob, sc
return nil
}

func (r *TrainingJobReconciler) validateScaleOut(scaleOut *kaiv1alpha1.ScaleOut) error {
func (r *TrainingJobReconciler) validateScaleOut(job *kaiv1alpha1.TrainingJob, scaleOut *kaiv1alpha1.ScaleOut) error {
if scaleOut.Spec.ToAdd == nil || scaleOut.Spec.ToAdd.Count == nil {
return fmt.Errorf(".spec.toAdd.count shouldn't be empty")
}
if *scaleOut.Spec.ToAdd.Count <= 0 {
cnt := *scaleOut.Spec.ToAdd.Count
if cnt <= 0 {
return fmt.Errorf(".spec.toAdd.count shouldn be greater than 0")
}
return nil
return r.validateReplica(job, &cnt)
}

func (r *TrainingJobReconciler) ScaleOutFailed(job *kaiv1alpha1.TrainingJob, scaleOut *kaiv1alpha1.ScaleOut, msg string) error {
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ func (r *TrainingJobReconciler) updateCurrentScaler(job *kaiv1alpha1.TrainingJob
return nil
}

// updateScalerAbort marks a scale request as aborted.
//
// It logs msg as an error, emits a warning event on scaleObj carrying msg,
// and then stores a ScaleAborted condition through updateScalerState. The
// condition reason is the scaler's scale type followed by ValidateFailed.
// The error returned by updateScalerState is passed back to the caller.
func (r *TrainingJobReconciler) updateScalerAbort(scaleObj Scaler, trainingJob *kaiv1alpha1.TrainingJob, msg string) error {
	logger.Error(msg)
	r.recorder.Event(scaleObj, corev1.EventTypeWarning, "", msg)

	abortReason := fmt.Sprintf("%s%s", scaleObj.GetScaleType(), commonv1.ValidateFailed)
	abortCondition := newCondition(commonv1.ScaleAborted, abortReason, msg)
	return r.updateScalerState(scaleObj, trainingJob, abortCondition)
}

func (r *TrainingJobReconciler) updateScalerFailed(scaleObj Scaler, trainingJob *kaiv1alpha1.TrainingJob, msg string) error {
logger.Error(msg)
r.recorder.Event(scaleObj, corev1.EventTypeWarning, "", msg)
Expand Down
3 changes: 3 additions & 0 deletions pkg/controllers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (
trainingJobRunningReason = "TrainingJobRunning"
// trainingJobFailedReason is added in a trainingjob when it is failed.
trainingJobFailedReason = "TrainingJobFailed"
// trainingJobValidateFailedReason is added in a trainingjob when its spec fails validation.
trainingJobValidateFailedReason = "TrainingJobValidateFailed"
// trainingJobEvict
trainingJobEvict = "TrainingJobEvicted"
// trainingJobWorkerException
Expand Down Expand Up @@ -53,6 +55,7 @@ func isScaleFinished(status common.JobStatus) bool {
// isScaleFailed reports whether status carries the ScaleFailed condition,
// as determined by hasCondition.
func isScaleFailed(status common.JobStatus) bool {
	failed := hasCondition(status, common.ScaleFailed)
	return failed
}

// isScaleSucceeded reports whether status carries the ScaleSucceeded
// condition, as determined by hasCondition.
func isScaleSucceeded(status common.JobStatus) bool {
	succeeded := hasCondition(status, common.ScaleSucceeded)
	return succeeded
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/controllers/trainingjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,30 @@ func GenLabels(jobName string) map[string]string {
}
}

// validateReplica checks that the job's worker replica count, optionally
// adjusted by delta, lies within the job's configured replica bounds.
//
// delta is the planned change in the number of workers (negative when
// scaling in); a nil delta validates the current replica count unchanged.
// A nil MaxReplicas or MinReplicas means that bound is not enforced.
//
// It returns an error when the worker Replicas field is unset, when the
// resulting count exceeds MaxReplicas, or when it falls below MinReplicas.
func (r *TrainingJobReconciler) validateReplica(job *kaiv1alpha1.TrainingJob, delta *int32) error {
	replicas := job.Spec.ETReplicaSpecs.Worker.Replicas
	if replicas == nil {
		return fmt.Errorf("worker replica empty")
	}

	// Work on a copy so the job spec itself is never modified.
	replica := *replicas
	if delta != nil {
		replica += *delta
	}

	if maxReplica := job.Spec.ETReplicaSpecs.Worker.MaxReplicas; maxReplica != nil && replica > *maxReplica {
		return fmt.Errorf("worker replica:%d > max:%d", replica, *maxReplica)
	}
	if minReplica := job.Spec.ETReplicaSpecs.Worker.MinReplicas; minReplica != nil && replica < *minReplica {
		return fmt.Errorf("worker replica:%d < min:%d", replica, *minReplica)
	}
	return nil
}

func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
oldJobStatus := job.Status.DeepCopy()

Expand Down Expand Up @@ -244,6 +268,7 @@ func (r *TrainingJobReconciler) initializeJob(job *kaiv1alpha1.TrainingJob) {
now := metav1.Now()
job.Status.StartTime = &now
}

return
}

Expand All @@ -268,11 +293,18 @@ type Step struct {
}

// reconcileResource validates the job's worker replica settings and, when
// they are valid, runs the reconcile steps produced by newSteps.
//
// If validateReplica rejects the job, a warning is logged, the job status
// receives a JobFailed condition with trainingJobValidateFailedReason, the
// phase is set to JobFailed, and nil is returned. Otherwise the result of
// doSteps is returned, and logged first when it is a non-nil error.
func (r *TrainingJobReconciler) reconcileResource(job *kaiv1alpha1.TrainingJob) error {
	validationErr := r.validateReplica(job, nil)
	if validationErr != nil {
		logger.Warnf("job:%s phase:%s validate replica failed:%s", job.Name, job.Status.Phase, validationErr)
		updateJobConditions(job.GetJobStatus(), commonv1.JobFailed, trainingJobValidateFailedReason, validationErr.Error())
		updatePhase(job.GetJobStatus(), commonv1.JobFailed)
		return nil
	}

	stepErr := r.doSteps(job, r.newSteps())
	if stepErr != nil {
		r.Log.Error(stepErr, "failed to reconcileResource")
	}
	return stepErr
}

Expand Down
1 change: 0 additions & 1 deletion pkg/controllers/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
)

func (r *TrainingJobReconciler) createTrainingJobWorkers(job *kaiv1alpha1.TrainingJob) error {

if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
if cm, err := r.GetOrCreateSecret(job); cm == nil || err != nil {
msg := fmt.Sprintf("job(%s/%s) create secret failed, error: %v", job.Namespace, job.Name, err)
Expand Down