Skip to content

Commit

Permalink
Fix the restartPod action cancelling the restartJob delayed action
Browse files Browse the repository at this point in the history
Signed-off-by: Box Zhang <[email protected]>
  • Loading branch information
bibibox committed Jan 17, 2025
1 parent bff438b commit 16b4725
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 8 deletions.
54 changes: 46 additions & 8 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (cc *jobcontroller) processNextReq(count uint32) bool {

// If the action is not an internal action, cancel all delayed actions
if !isInternalAction(delayAct.action) {
cc.cleanupDelayActions(delayAct.jobKey)
cc.cleanupDelayActions(delayAct)
}

return true
Expand All @@ -416,6 +416,11 @@ func (cc *jobcontroller) processNextReq(count uint32) bool {
// - cancel corresponding Pod Pending delayed action
// - if the event is PodRunning state, cancel corresponding Pod Failed and Pod Evicted delayed actions
func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
// Skip cleaning delayed actions for non-pod events
if !cc.isPodEvent(req) {
return
}

if req.Event != busv1alpha1.PodPendingEvent {
key := jobcache.JobKeyByReq(&req)
cc.delayActionMapLock.Lock()
Expand All @@ -435,7 +440,7 @@ func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
}

if shouldCancel {
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> of Job <%s>", delayAct.action, req.PodName, delayAct.jobKey)
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> because of event <%s> of Job <%s>", delayAct.action, req.PodName, req.Event, delayAct.jobKey)
delayAct.cancel()
delete(taskMap, req.PodName)
}
Expand All @@ -444,6 +449,13 @@ func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
}
}

// isPodEvent reports whether req carries one of the pod events
// (PodPending, PodRunning, PodFailed or PodEvicted). Any other event
// yields false.
func (cc *jobcontroller) isPodEvent(req apis.Request) bool {
	switch req.Event {
	case busv1alpha1.PodPendingEvent,
		busv1alpha1.PodRunningEvent,
		busv1alpha1.PodFailedEvent,
		busv1alpha1.PodEvictedEvent:
		return true
	default:
		return false
	}
}

func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayAction) {
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()
Expand Down Expand Up @@ -490,7 +502,7 @@ func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayA

queue.Forget(req)

cc.cleanupDelayActions(delayAct.jobKey)
cc.cleanupDelayActions(delayAct)
}()
}

Expand All @@ -513,16 +525,42 @@ func (cc *jobcontroller) handleJobError(queue workqueue.TypedRateLimitingInterfa
req.Namespace, req.JobName, err)
}

func (cc *jobcontroller) cleanupDelayActions(jobKey string) {
// cleanupDelayActions cleans up delayed actions
// After a delayed action is executed, other delayed actions of the same type need to be cleaned up to avoid duplicate execution
// Parameters:
// - currentDelayAction: the delayed action that has just been executed
//
// Implementation logic:
// 1. Get the type of current delayed action (Job level, Task level or Pod level)
// 2. Iterate through all delayed actions under this Job
// 3. If the delayed action type matches, cancel it and remove from the map
//
// Usage scenarios:
// - When a Pod failure triggers Job termination, the other Job-level delayed actions under this Job must be cancelled
func (cc *jobcontroller) cleanupDelayActions(currentDelayAction *delayAction) {
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()

if m, exists := cc.delayActionMap[jobKey]; exists {
actionType := GetActionType(currentDelayAction.action)

if m, exists := cc.delayActionMap[currentDelayAction.jobKey]; exists {
for _, delayAct := range m {
if delayAct.cancel != nil {
delayAct.cancel()
if GetActionType(delayAct.action) == actionType {
// For Task level actions, only cancel delayed actions for the same task
if actionType == TaskAction && delayAct.taskName != currentDelayAction.taskName {
continue
}
// For Pod level actions, only cancel delayed actions for the same pod
if actionType == PodAction && delayAct.podName != currentDelayAction.podName {
continue
}

if delayAct.cancel != nil {
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> because of event <%s> and action <%s> of Job <%s>", delayAct.action, delayAct.podName, currentDelayAction.event, currentDelayAction.action, delayAct.jobKey)
delayAct.cancel()
}
delete(m, delayAct.podName)
}
}
cc.delayActionMap[jobKey] = make(map[string]*delayAction)
}
}
24 changes: 24 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,27 @@ func GetStateAction(delayAct *delayAction) state.Action {

return action
}

// ActionType identifies the scope a policy action operates on: the whole
// job, a single task, or a single pod.
type ActionType int

// Scopes returned by GetActionType. The values are assigned by iota, so
// JobAction is the zero value of ActionType.
const (
	// JobAction marks an action that applies to the whole job. It is also
	// what GetActionType returns for actions it does not recognise.
	JobAction ActionType = iota
	// TaskAction marks an action that applies to a single task.
	TaskAction
	// PodAction marks an action that applies to a single pod.
	PodAction
)

// GetActionType classifies action by the scope it operates on.
//
// RestartPodAction maps to PodAction and RestartTaskAction maps to
// TaskAction. Everything else is reported as JobAction: the job-level
// actions (AbortJobAction, RestartJobAction, TerminateJobAction,
// CompleteJobAction, ResumeJobAction) as well as any action not listed here.
func GetActionType(action v1alpha1.Action) ActionType {
	switch action {
	case v1alpha1.RestartPodAction:
		return PodAction
	case v1alpha1.RestartTaskAction:
		return TaskAction
	default:
		// Job-level and unrecognised actions share the same result.
		return JobAction
	}
}

0 comments on commit 16b4725

Please sign in to comment.