Skip to content

Commit

Permalink
fix scheduler memory leak
Browse files Browse the repository at this point in the history
Signed-off-by: 王凯 <[email protected]>
  • Loading branch information
Wang-Kai committed Aug 9, 2024
1 parent e811170 commit 5b13317
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pkg/scheduler/api/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func PodKey(pod *v1.Pod) TaskID {
return TaskID(key)
}

// GetTaskStatus is the exported entry point for deriving a TaskStatus from a
// pod. It delegates directly to the unexported getTaskStatus and returns its
// result unchanged, so callers outside this package can compute a pod's task
// status.
func GetTaskStatus(pod *v1.Pod) TaskStatus {
return getTaskStatus(pod)
}

func getTaskStatus(pod *v1.Pod) TaskStatus {
switch pod.Status.Phase {
case v1.PodRunning:
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ type TaskInfo struct {
CustomBindErrHandlerSucceeded bool
}

// GetJobID is the exported entry point for resolving the JobID of a pod. It
// delegates directly to the unexported getJobID and returns its result
// unchanged, so callers outside this package can look up the job a pod
// belongs to.
func GetJobID(pod *v1.Pod) JobID {
return getJobID(pod)
}

func getJobID(pod *v1.Pod) JobID {
if gn, found := pod.Annotations[v1beta1.KubeGroupNameAnnotationKey]; found && len(gn) != 0 {
// Make sure Pod and PodGroup belong to the same namespace.
Expand Down
47 changes: 46 additions & 1 deletion pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,55 @@ func (sc *SchedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
klog.V(4).Infof("Pod <%s/%v> already in cache with allocated status, ignore the update event", newPod.Namespace, newPod.Name)
return nil
}
// the version has not changed, nothing to do
if oldPod.ResourceVersion == newPod.ResourceVersion {
return nil
}
// task status has not changed, nothing to do
oldTaskStatus := schedulingapi.GetTaskStatus(oldPod)
newTaskStatus := schedulingapi.GetTaskStatus(newPod)
if oldTaskStatus == newTaskStatus {
return nil
}

var (
jobID = schedulingapi.GetJobID(newPod)
taskID = schedulingapi.TaskID(newPod.UID)
jobInfo *schedulingapi.JobInfo
taskInfo *schedulingapi.TaskInfo
)

jobInfo = sc.Jobs[jobID]
if jobInfo == nil {
return fmt.Errorf("job %s not found in cache.Jobs", jobID)
}
taskInfo = jobInfo.Tasks[taskID]
if taskInfo == nil {
return fmt.Errorf("task %s not found in job.Tasks", taskID)
}

// delete taskInfo from job and node
if err := sc.deleteTask(taskInfo); err != nil {
return err
}

if err := sc.deletePod(oldPod); err != nil {
// update taskInfo from the new pod:
// 1. update the taskInfo.Pod pointer; we no longer hold the oldPod pointer, so oldPod can be garbage-collected
// 2. update the node name for the pod bind event
// 3. update any other fields related to annotations
// 4. update the taskInfo status
taskInfo.Pod = newPod
taskInfo.NodeName = newPod.Spec.NodeName
taskInfo.Preemptable = schedulingapi.GetPodPreemptable(newPod)
taskInfo.RevocableZone = schedulingapi.GetPodRevocableZone(newPod)
taskInfo.NumaInfo = schedulingapi.GetPodTopologyInfo(newPod)
taskInfo.Status = newTaskStatus

// reuse the taskInfo struct and add it to the job and node, to avoid it being recycled by the GC
if err := sc.addTask(taskInfo); err != nil {
return err
}

// when a pod is deleted, its owner reference is set to nil, just like an orphan pod
if len(utils.GetController(newPod)) == 0 {
newPod.OwnerReferences = oldPod.OwnerReferences
Expand Down

0 comments on commit 5b13317

Please sign in to comment.