coral2-dws: workflow resource can become stranded with a finalizer when job is canceled #165

Closed
garlick opened this issue May 23, 2024 · 2 comments · Fixed by #168

garlick commented May 23, 2024

Problem (reported in Slack by Dean R.): "Last week we noticed that if we cancelled a job early, flux-coral2-dws could leave a Workflow resource stranded with a finalizer. Right now on elcap1 we're running a fix that has held up to early cancellation, and we've run it under stress when there is poor connectivity to the kube-apiserver. It is holding up much better now."
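
For context, the finalizer is what makes the stranding possible: deleting a Kubernetes object that still lists a finalizer only sets metadata.deletionTimestamp, and the object lingers until the finalizer list is emptied. A minimal sketch of that mechanic with the kubernetes Python client; the CRD coordinates and Workflow name below are illustrative placeholders, not values taken from coral2_dws.py:

# Sketch only: CRD group/version/namespace/plural and the Workflow name are
# illustrative placeholders, not the plugin's actual values.
import kubernetes as k8s

k8s.config.load_kube_config()
api = k8s.client.CustomObjectsApi()
CRD = ("dataworkflowservices.github.io", "v1alpha2", "default", "workflows")
NAME = "fluxjob-example"

# Deleting an object that still carries a finalizer only marks it for deletion.
api.delete_namespaced_custom_object(*CRD, NAME)

wf = api.get_namespaced_custom_object(*CRD, NAME)
print(wf["metadata"].get("deletionTimestamp"))  # set, yet the object persists
print(wf["metadata"].get("finalizers"))         # still holds the plugin's finalizer

# The object is only garbage-collected once the finalizer list is emptied.
api.patch_namespaced_custom_object(*CRD, NAME, {"metadata": {"finalizers": []}})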

This is the diff (relative to 0.12.0):

--- /usr/bin/coral2_dws.py-orig	2024-05-16 12:26:43.291189496 -0700
+++ /usr/bin/coral2_dws.py	2024-05-22 12:12:55.412562876 -0700
@@ -10,7 +10,6 @@
 import os
 import syslog
 import json
-import re
 import functools
 import argparse
 import logging
@@ -18,6 +17,7 @@
 import time
 import pathlib
 import math
+import sys
 
 import kubernetes as k8s
 from kubernetes.client.rest import ApiException
@@ -68,7 +68,7 @@
     try:
         msg = rpc.get()
     except Exception as exc:
-        LOGGER.warning("RPC error %s", str(exc))
+        LOGGER.warning("RPC error %s: (type=%s)", str(exc), type(exc))
     else:
         if msg is not None:
             LOGGER.debug("RPC response was %s", msg)
@@ -111,13 +111,28 @@
             handle.log(syslog.LOG_ERR, f"{os.path.basename(__file__)}: {errstr}")
             handle.respond(msg, {"success": False, "errstr": errstr})
             LOGGER.error(
-                "Error in responding to %s RPC for %s: %s", topic, jobid, errst
r
+                "Error in responding to %s RPC for %s: %s: (type=%s)", topic, j
obid, errstr, type(exc)
             )
         else:
             handle.respond(msg, {"success": True})
 
     return wrapper
 
+def remove_finalizer(workflow_name, k8s_api, workflow=None):
+    """Remove the finalizer from the workflow so it can be deleted."""
+    if workflow is None:
+        workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, workflow_name)
+
+    try:
+        workflow["metadata"]["finalizers"].remove(_FINALIZER)
+    except ValueError:
+        pass
+    else:
+        k8s_api.patch_namespaced_custom_object(
+            *WORKFLOW_CRD,
+            workflow_name,
+            {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
+        )
 
 def move_workflow_desiredstate(workflow_name, desiredstate, k8s_api):
     """Helper function for moving workflow to a desiredState."""
@@ -292,6 +307,8 @@
         # the job hit an exception before beginning to run; transition
         # the workflow immediately to 'teardown'
         move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
+        # Remove the finalizer so the resource can be deleted.
+        remove_finalizer(winfo.name, k8s_api, workflow=None)
         winfo.toredown = True
     else:
         move_workflow_desiredstate(winfo.name, "PostRun", k8s_api)
@@ -304,6 +321,11 @@
         and workflow["status"]["ready"]
     )
 
+def state_active(workflow, state):
+    """Helper function for checking whether a workflow is working on a given st
ate."""
+    return (
+        workflow["spec"]["desiredState"] == workflow["status"]["state"] == stat
e
+    )
 
 def workflow_state_change_cb(event, handle, k8s_api):
     """Exception-catching wrapper around _workflow_state_change_cb_inner."""
@@ -324,17 +346,18 @@
         return
     try:
         _workflow_state_change_cb_inner(workflow, jobid, winfo, handle, k8s_api)
-    except Exception:
+    except Exception as exc:
         LOGGER.exception(
-            "Failed to process event update for workflow with jobid %s:", jobid
+            "Failed to process event update for workflow with jobid %s: (type=%
s)", jobid, type(exc)
         )
         try:
             move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
-        except ApiException:
+            # Remove the finalizer so the resource can be deleted.
+            remove_finalizer(winfo.name, k8s_api, workflow=workflow)
+        except ApiException as exc2:
             LOGGER.exception(
-                "Failed to move workflow with jobid %s to 'teardown' "
-                "state after error: ",
-                jobid,
+                "Failed to move workflow with jobid %s to 'teardown' (type=%s)"
,
+                jobid, type(exc2)
             )
         else:
             winfo.toredown = True
@@ -349,18 +372,15 @@
     if winfo.deleted:
         # deletion request has been submitted, nothing to do
         return
-    if state_complete(workflow, "Teardown"):
-        # delete workflow object and tell DWS jobtap plugin that the job is done
-        try:
-            workflow["metadata"]["finalizers"].remove(_FINALIZER)
-        except ValueError:
-            pass
-        else:
-            k8s_api.patch_namespaced_custom_object(
-                *WORKFLOW_CRD,
-                winfo.name,
-                {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}
},
-            )
+    if state_active(workflow, "Teardown") and not state_complete(workflow, "Teardown"):
+        # Remove the finalizer as soon as the workflow begins working on its
+        # teardown state.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
+    elif state_complete(workflow, "Teardown"):
+        # Delete workflow object and tell DWS jobtap plugin that the job is done.
+        # Attempt to remove the finalizer again in case the state transitioned
+        # too quickly for it to be noticed earlier.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
         k8s_api.delete_namespaced_custom_object(*WORKFLOW_CRD, winfo.name)
         winfo.deleted = True
         handle.rpc("job-manager.dws.epilog-remove", payload={"id": jobid}).then
(
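
The important property of the new remove_finalizer helper is that it is idempotent, which is why the patch can call it from several places (the early-teardown path, the error path, and both the "working on Teardown" and "Teardown complete" branches) without worrying about a double removal. A rough, self-contained check of that behavior, using a stub in place of the real CustomObjectsApi; WORKFLOW_CRD, _FINALIZER, and the stub are illustrative stand-ins, not the plugin's actual values:

# Stand-ins for illustration only; the plugin's real WORKFLOW_CRD and
# _FINALIZER values may differ.
WORKFLOW_CRD = ("example.group.io", "v1", "default", "workflows")
_FINALIZER = "example.org/coral2-dws"

class StubK8sApi:
    """Records patch bodies instead of talking to a kube-apiserver."""

    def __init__(self, workflow):
        self.workflow = workflow
        self.patches = []

    def get_namespaced_custom_object(self, group, version, namespace, plural, name):
        return self.workflow

    def patch_namespaced_custom_object(self, group, version, namespace, plural, name, body):
        self.patches.append(body)

def remove_finalizer(workflow_name, k8s_api, workflow=None):
    """Same shape as the helper in the patch above."""
    if workflow is None:
        workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, workflow_name)
    try:
        workflow["metadata"]["finalizers"].remove(_FINALIZER)
    except ValueError:
        pass  # already removed; a repeat call is a no-op
    else:
        k8s_api.patch_namespaced_custom_object(
            *WORKFLOW_CRD,
            workflow_name,
            {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
        )

wf = {"metadata": {"finalizers": [_FINALIZER, "other.io/finalizer"]}}
api = StubK8sApi(wf)
remove_finalizer("fluxjob-123", api, workflow=wf)
remove_finalizer("fluxjob-123", api, workflow=wf)  # second call changes nothing
assert len(api.patches) == 1
assert wf["metadata"]["finalizers"] == ["other.io/finalizer"]
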
@roehrich-hpe

See also #159

@roehrich-hpe

Today (May 31) there's a new version of the plugin installed on elcap1. Here's an updated version of my patch for that new plugin. This is not yet tested:

--- elcap-coral2_dws.py-20240531	2024-05-31 12:56:32
+++ elcap-coral2_dws.py-20240531-mine	2024-05-31 13:21:17
@@ -11,7 +11,6 @@
 import sys
 import syslog
 import json
-import re
 import functools
 import argparse
 import logging
@@ -118,6 +117,23 @@
             handle.respond(msg, {"success": True})
 
     return wrapper
+
+
+def remove_finalizer(workflow_name, k8s_api, workflow=None):
+    """Remove the finalizer from the workflow so it can be deleted."""
+    if workflow is None:
+        workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, workflow_name)
+
+    try:
+        workflow["metadata"]["finalizers"].remove(_FINALIZER)
+    except ValueError:
+        pass
+    else:
+        k8s_api.patch_namespaced_custom_object(
+            *WORKFLOW_CRD,
+            workflow_name,
+            {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
+        )
 
 
 def move_workflow_desiredstate(workflow_name, desiredstate, k8s_api):
@@ -293,6 +309,8 @@
         # the job hit an exception before beginning to run; transition
         # the workflow immediately to 'teardown'
         move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
+        # Remove the finalizer so the resource can be deleted.
+        remove_finalizer(winfo.name, k8s_api, workflow=None)
         winfo.toredown = True
     else:
         move_workflow_desiredstate(winfo.name, "PostRun", k8s_api)
@@ -305,6 +323,13 @@
         and workflow["status"]["ready"]
     )
 
+
+def state_active(workflow, state):
+    """Helper function for checking whether a workflow is working on a given state."""
+    return (
+        workflow["spec"]["desiredState"] == workflow["status"]["state"] == state
+    )
+ 
 
 def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion):
     """Exception-catching wrapper around _workflow_state_change_cb_inner."""
@@ -333,6 +358,8 @@
         )
         try:
             move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
+            # Remove the finalizer so the resource can be deleted.
+            remove_finalizer(winfo.name, k8s_api, workflow=workflow)
         except ApiException:
             LOGGER.exception(
                 "Failed to move workflow with jobid %s to 'teardown' "
@@ -354,18 +381,15 @@
     if winfo.deleted:
         # deletion request has been submitted, nothing to do
         return
-    if state_complete(workflow, "Teardown"):
-        # delete workflow object and tell DWS jobtap plugin that the job is done
-        try:
-            workflow["metadata"]["finalizers"].remove(_FINALIZER)
-        except ValueError:
-            pass
-        else:
-            k8s_api.patch_namespaced_custom_object(
-                *WORKFLOW_CRD,
-                winfo.name,
-                {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
-            )
+    if state_active(workflow, "Teardown") and not state_complete(workflow, "Teardown"):
+        # Remove the finalizer as soon as the workflow begins working on its
+        # teardown state.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
+    elif state_complete(workflow, "Teardown"):
+        # Delete workflow object and tell DWS jobtap plugin that the job is done.
+        # Attempt to remove the finalizer again in case the state transitioned
+        # too quickly for it to be noticed earlier.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
         k8s_api.delete_namespaced_custom_object(*WORKFLOW_CRD, winfo.name)
         winfo.deleted = True
         handle.rpc("job-manager.dws.epilog-remove", payload={"id": jobid}).then(
@@ -422,6 +446,8 @@
     elif state_complete(workflow, "DataOut"):
         # move workflow to next stage, teardown
         move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
+        # Remove the finalizer so the resource can be deleted.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
         winfo.toredown = True
     if workflow["status"].get("status") == "Error":
         # a fatal error has occurred
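
One way to spot Workflows stranded this way is to look for objects that already carry a deletionTimestamp but still list finalizers. A small sketch with the kubernetes Python client; the CRD coordinates are illustrative placeholders for whatever group/version/namespace/plural the DWS Workflow CRD actually uses:

# Sketch only: substitute the real DWS Workflow CRD coordinates.
import kubernetes as k8s

k8s.config.load_kube_config()
api = k8s.client.CustomObjectsApi()
CRD = ("dataworkflowservices.github.io", "v1alpha2", "default", "workflows")

for wf in api.list_namespaced_custom_object(*CRD)["items"]:
    meta = wf["metadata"]
    if meta.get("deletionTimestamp") and meta.get("finalizers"):
        print(f"stranded: {meta['name']} finalizers={meta['finalizers']}")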
