From 89171d156417b76133da66ef132f10f97964153c Mon Sep 17 00:00:00 2001 From: qzhu Date: Fri, 27 Sep 2024 17:34:19 +0800 Subject: [PATCH 1/3] [YUNIKORN-2884] Task fail with post allocated but the pod will keep pending --- pkg/cache/scheduler_callback.go | 11 +++++------ pkg/cache/scheduler_callback_test.go | 4 +++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go index 643b4c719..d00853bc6 100644 --- a/pkg/cache/scheduler_callback.go +++ b/pkg/cache/scheduler_callback.go @@ -93,12 +93,11 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons // update cache callback.context.ForgetPod(release.GetAllocationKey()) - // TerminationType 0 mean STOPPED_BY_RM - if release.TerminationType != si.TerminationType_STOPPED_BY_RM { - // send release app allocation to application states machine - ev := NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.AllocationKey) - dispatcher.Dispatch(ev) - } + // TerminationType 0 mean STOPPED_BY_RM, but we also need to do the release when task failed, + // we also should send release event to application in case task failed but the pod is still pending. + // send release app allocation to application states machine + ev := NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.AllocationKey) + dispatcher.Dispatch(ev) } return nil diff --git a/pkg/cache/scheduler_callback_test.go b/pkg/cache/scheduler_callback_test.go index d98f5f6f7..619b58f5e 100644 --- a/pkg/cache/scheduler_callback_test.go +++ b/pkg/cache/scheduler_callback_test.go @@ -215,7 +215,9 @@ func TestUpdateAllocation_AllocationReleased_StoppedByRM(t *testing.T) { assert.NilError(t, err, "error updating allocation") assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1)) err = utils.WaitForCondition(deleteCalled.Load, 10*time.Millisecond, 500*time.Millisecond) - assert.Error(t, err, "timeout waiting for condition") // pod is not expected to be deleted + // Pod should be deleted, because TerminationType_STOPPED_BY_RM will also be called when task fail. + // If we don't delete the pod, the pod will be stuck in pending state. + assert.NilError(t, err, "pod has not been deleted") } func TestUpdateApplication_Accepted(t *testing.T) { From 5ea3ac059c0ccf3e6f333a4e7e26c7f683cc203b Mon Sep 17 00:00:00 2001 From: qzhu Date: Fri, 27 Sep 2024 19:07:04 +0800 Subject: [PATCH 2/3] Support deletion when it existed --- pkg/cache/application.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/cache/application.go b/pkg/cache/application.go index 8438957ba..ad111ae17 100644 --- a/pkg/cache/application.go +++ b/pkg/cache/application.go @@ -27,6 +27,7 @@ import ( "github.com/looplab/fsm" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/apache/yunikorn-k8shim/pkg/common" @@ -636,7 +637,19 @@ func (app *Application) handleReleaseAppAllocationEvent(taskID string, terminati if task, ok := app.taskMap[taskID]; ok { task.setTaskTerminationType(terminationType) - err := task.DeleteTaskPod() + pod := task.GetTaskPod() + // Get the pod first to check if it exists + // For example, users manually killed the pod, so the pod already deleted before release. + _, err := task.context.apiProvider.GetAPIs().KubeClient.Get(pod.Namespace, pod.Name) + if err != nil { + if k8serrors.IsNotFound(err) { + // Pod does not exist, no need to delete + log.Log(log.ShimCacheApplication).Info("pod not found, it already deleted or released") + } else { + log.Log(log.ShimCacheApplication).Error("failed to get pod", zap.Error(err)) + } + } + err = task.DeleteTaskPod() if err != nil { log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err)) } From 8b843cc4a8af4f1449bc30ecd9bb27b7b25667e5 Mon Sep 17 00:00:00 2001 From: qzhu Date: Fri, 27 Sep 2024 19:15:44 +0800 Subject: [PATCH 3/3] Fix --- pkg/cache/application.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/cache/application.go b/pkg/cache/application.go index ad111ae17..1aaecef4d 100644 --- a/pkg/cache/application.go +++ b/pkg/cache/application.go @@ -641,18 +641,16 @@ func (app *Application) handleReleaseAppAllocationEvent(taskID string, terminati // Get the pod first to check if it exists // For example, users manually killed the pod, so the pod already deleted before release. _, err := task.context.apiProvider.GetAPIs().KubeClient.Get(pod.Namespace, pod.Name) - if err != nil { - if k8serrors.IsNotFound(err) { - // Pod does not exist, no need to delete - log.Log(log.ShimCacheApplication).Info("pod not found, it already deleted or released") - } else { - log.Log(log.ShimCacheApplication).Error("failed to get pod", zap.Error(err)) + if err != nil && k8serrors.IsNotFound(err) { + // Pod does not exist, no need to delete + log.Log(log.ShimCacheApplication).Info("pod not found, it already deleted or released") + } else { + // Pod exists, delete it + err = task.DeleteTaskPod() + if err != nil { + log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err)) } } - err = task.DeleteTaskPod() - if err != nil { - log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err)) - } app.publishPlaceholderTimeoutEvents(task) } else { log.Log(log.ShimCacheApplication).Warn("task not found",