Skip to content

Commit

Permalink
Merge pull request #3973 from bibibox/add_job_policy_timeout
Browse files Browse the repository at this point in the history
Fix the restartPod action will cancel the restartJob delay action
  • Loading branch information
volcano-sh-bot authored Jan 18, 2025
2 parents 37cb882 + 452e15d commit 1c60d46
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/controllers/apis/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Request struct {
TaskName string
QueueName string
PodName string
PodUID types.UID

Event v1alpha1.Event
ExitCode int32
Expand Down
65 changes: 56 additions & 9 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -72,6 +73,9 @@ type delayAction struct {
// The name of the pod
podName string

// The UID of the pod
podUID types.UID

// The event caused the action
event busv1alpha1.Event

Expand Down Expand Up @@ -405,7 +409,7 @@ func (cc *jobcontroller) processNextReq(count uint32) bool {

// If the action is not an internal action, cancel all delayed actions
if !isInternalAction(delayAct.action) {
cc.cleanupDelayActions(delayAct.jobKey)
cc.cleanupDelayActions(delayAct)
}

return true
Expand All @@ -416,6 +420,11 @@ func (cc *jobcontroller) processNextReq(count uint32) bool {
// - cancel corresponding Pod Pending delayed action
// - if the event is PodRunning state, cancel corresponding Pod Failed and Pod Evicted delayed actions
func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
// Skip cleaning delayed actions for non-pod events
if !cc.isPodEvent(req) {
return
}

if req.Event != busv1alpha1.PodPendingEvent {
key := jobcache.JobKeyByReq(&req)
cc.delayActionMapLock.Lock()
Expand All @@ -426,7 +435,12 @@ func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
shouldCancel := false

if delayAct.event == busv1alpha1.PodPendingEvent {
shouldCancel = true
// For PodPending delayed action, we need to check if the Pod UID matches
// Because if a Pod is deleted and immediately recreated,
// the new Pod's pending event may be queued before the old Pod's delete event
if req.PodUID == delayAct.podUID {
shouldCancel = true
}
}

if (delayAct.event == busv1alpha1.PodFailedEvent || delayAct.event == busv1alpha1.PodEvictedEvent) &&
Expand All @@ -435,7 +449,7 @@ func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
}

if shouldCancel {
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> of Job <%s>", delayAct.action, req.PodName, delayAct.jobKey)
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> because of event <%s> of Job <%s>", delayAct.action, req.PodName, req.Event, delayAct.jobKey)
delayAct.cancel()
delete(taskMap, req.PodName)
}
Expand All @@ -444,6 +458,13 @@ func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
}
}

func (cc *jobcontroller) isPodEvent(req apis.Request) bool {
return req.Event == busv1alpha1.PodPendingEvent ||
req.Event == busv1alpha1.PodRunningEvent ||
req.Event == busv1alpha1.PodFailedEvent ||
req.Event == busv1alpha1.PodEvictedEvent
}

func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayAction) {
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()
Expand Down Expand Up @@ -490,7 +511,7 @@ func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayA

queue.Forget(req)

cc.cleanupDelayActions(delayAct.jobKey)
cc.cleanupDelayActions(delayAct)
}()
}

Expand All @@ -513,16 +534,42 @@ func (cc *jobcontroller) handleJobError(queue workqueue.TypedRateLimitingInterfa
req.Namespace, req.JobName, err)
}

func (cc *jobcontroller) cleanupDelayActions(jobKey string) {
// cleanupDelayActions cleans up delayed actions
// After a delayed action is executed, other delayed actions of the same type need to be cleaned up to avoid duplicate execution
// Parameters:
// - currentDelayAction: the delayed action that has just been executed
//
// Implementation logic:
// 1. Get the type of current delayed action (Job level, Task level or Pod level)
// 2. Iterate through all delayed actions under this Job
// 3. If the delayed action type matches, cancel it and remove from the map
//
// Usage scenarios:
// - When a Pod failure triggers Job termination, need to cancel other Job delayed actions under this Job
func (cc *jobcontroller) cleanupDelayActions(currentDelayAction *delayAction) {
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()

if m, exists := cc.delayActionMap[jobKey]; exists {
actionType := GetActionType(currentDelayAction.action)

if m, exists := cc.delayActionMap[currentDelayAction.jobKey]; exists {
for _, delayAct := range m {
if delayAct.cancel != nil {
delayAct.cancel()
if GetActionType(delayAct.action) == actionType {
// For Task level actions, only cancel delayed actions for the same task
if actionType == TaskAction && delayAct.taskName != currentDelayAction.taskName {
continue
}
// For Pod level actions, only cancel delayed actions for the same pod
if actionType == PodAction && delayAct.podName != currentDelayAction.podName {
continue
}

if delayAct.cancel != nil {
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> because of event <%s> and action <%s> of Job <%s>", delayAct.action, delayAct.podName, currentDelayAction.event, currentDelayAction.action, delayAct.jobKey)
delayAct.cancel()
}
delete(m, delayAct.podName)
}
}
cc.delayActionMap[jobKey] = make(map[string]*delayAction)
}
}
3 changes: 3 additions & 0 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (cc *jobcontroller) addPod(obj interface{}) {
JobName: jobName,
JobUid: jobUid,
PodName: pod.Name,
PodUID: pod.UID,

Event: bus.PodPendingEvent,
JobVersion: int32(dVersion),
Expand Down Expand Up @@ -302,6 +303,7 @@ func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) {
JobUid: jobUid,
TaskName: taskName,
PodName: newPod.Name,
PodUID: newPod.UID,

Event: event,
ExitCode: exitCode,
Expand Down Expand Up @@ -371,6 +373,7 @@ func (cc *jobcontroller) deletePod(obj interface{}) {
JobUid: jobUid,
TaskName: taskName,
PodName: pod.Name,
PodUID: pod.UID,

Event: bus.PodEvictedEvent,
JobVersion: int32(dVersion),
Expand Down
25 changes: 25 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func applyPolicies(job *batch.Job, req *apis.Request) (delayAct *delayAction) {
event: req.Event,
taskName: req.TaskName,
podName: req.PodName,
podUID: req.PodUID,
// default action is sync job
action: v1alpha1.SyncJobAction,
}
Expand Down Expand Up @@ -436,3 +437,27 @@ func GetStateAction(delayAct *delayAction) state.Action {

return action
}

type ActionType int

const (
JobAction ActionType = iota
TaskAction
PodAction
)

func GetActionType(action v1alpha1.Action) ActionType {
switch action {
case v1alpha1.AbortJobAction,
v1alpha1.RestartJobAction,
v1alpha1.TerminateJobAction,
v1alpha1.CompleteJobAction,
v1alpha1.ResumeJobAction:
return JobAction
case v1alpha1.RestartTaskAction:
return TaskAction
case v1alpha1.RestartPodAction:
return PodAction
}
return JobAction
}

0 comments on commit 1c60d46

Please sign in to comment.