From adbdb77daac02c6d4f28820801ea3ebdb8b7be3a Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Wed, 16 Feb 2022 17:53:21 +0100 Subject: [PATCH 1/7] BUG: Kill subprocesses on shutdown. Fixes #jupyter/jupyter_client#104 This should make sure we properly cull all subprocesses at shutdown. It does change one of the private methods from sync to async in order not to use time.sleep or threads, so this may affect subclasses, though I doubt it. It's also not completely clear to me whether this works on Windows, as SIGINT, I believe, is not a thing there. Regardless, as this affects things like dask, and others that are mostly on Unix, it should be an improvement. It does the following, stopping as soon as it does not find any more children of the current process. - Send SIGINT to everything - Immediately send SIGTERM in a loop with an exponential backoff from 0.01 to 1 second, roughly multiplying the delay until the next send by 3 each time. - Switch to sending SIGKILL with the same backoff. There is no delay after SIGINT, as this is just a courtesy. The backoff delays are not configurable. 
I can imagine that on slow systems it may make sens --- ipykernel/kernelbase.py | 89 ++++++++++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 18 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index c1494ffbb..497c6a2f7 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -5,23 +5,26 @@ import asyncio import concurrent.futures -from datetime import datetime -from functools import partial +import inspect import itertools import logging -import inspect import os -from signal import signal, default_int_handler, SIGINT -import sys import socket +import sys import time import uuid import warnings +from datetime import datetime +from functools import partial +from signal import (SIGINT, SIGKILL, SIGTERM, Signals, default_int_handler, + signal) + try: import psutil except ImportError: psutil = None + try: # jupyter_client >= 5, use tz-aware now from jupyter_client.session import utcnow as now @@ -29,20 +32,17 @@ # jupyter_client < 5, use local now() now = datetime.now +import zmq +from IPython.core.error import StdinNotImplementedError +from jupyter_client.session import Session from tornado import ioloop from tornado.queues import Queue, QueueEmpty -import zmq +from traitlets import (Any, Bool, Dict, Float, Instance, Integer, List, Set, + Unicode, default, observe) +from traitlets.config.configurable import SingletonConfigurable from zmq.eventloop.zmqstream import ZMQStream -from traitlets.config.configurable import SingletonConfigurable -from IPython.core.error import StdinNotImplementedError from ipykernel.jsonutil import json_clean -from traitlets import ( - Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, - observe, default -) - -from jupyter_client.session import Session from ._version import kernel_protocol_version @@ -796,13 +796,13 @@ async def comm_info_request(self, stream, ident, parent): reply_content, parent, ident) self.log.debug("%s", msg) - async def interrupt_request(self, 
stream, ident, parent): + def _send_interupt_children(self): + pid = os.getpid() pgid = os.getpgid(pid) if os.name == "nt": self.log.error("Interrupt message not supported on Windows") - else: # Prefer process-group over process if pgid and hasattr(os, "killpg"): @@ -816,6 +816,8 @@ async def interrupt_request(self, stream, ident, parent): except OSError: pass + async def interrupt_request(self, stream, ident, parent): + self._send_interupt_children() content = parent['content'] self.session.send(stream, 'interrupt_reply', content, parent, ident=ident) return @@ -830,7 +832,7 @@ async def shutdown_request(self, stream, ident, parent): content, parent ) - self._at_shutdown() + await self._at_shutdown() self.log.debug('Stopping control ioloop') control_io_loop = self.control_stream.io_loop @@ -1131,9 +1133,60 @@ def _input_request(self, prompt, ident, parent, password=False): raise EOFError return value - def _at_shutdown(self): + async def _progressively_terminate_all_children(self): + + pgid = os.getpgid(os.getpid()) + if not pgid: + self.log.warning(f"No Pgid ({pgid=}), not trying to stop subprocesses.") + return + if psutil is None: + # blindly send quickly sigterm/sigkill to processes if psutil not there. + self.log.warning( + f"Please install psutil for a cleaner subprocess shutdown." 
+ ) + self._send_interupt_children() + try: + await asyncio.sleep(0.05) + self.log.debug("Sending SIGTERM to {pgid=}") + os.killpg(pgid, SIGTERM) + await asyncio.sleep(0.05) + self.log.debug("Sending SIGKILL to {pgid=}") + os.killpg(pgid, SIGKILL) + except Exception: + self.log.exception("Exception during subprocesses termination") + return + + sleeps = (0.01, 0.03, 0.1, 0.3, 1) + children = psutil.Process().children(recursive=True) + if not children: + self.log.debug("Kernel has no children.") + return + self.log.debug(f"Trying to interrupt then kill subprocesses : {children=}") + self._send_interupt_children() + for signum in (SIGTERM, SIGKILL): + self.log.debug( + f"Will try to send {signum} ({Signals(signum)}) to subprocesses :{children}" + ) + for delay in sleeps: + children = psutil.Process().children(recursive=True) + if not children: + self.log.debug("No more children, continuing shutdown routine.") + return + if pgid and hasattr(os, "killpg"): + try: + os.killpg(pgid, signum) + except OSError: + self.log.warning("OSError running killpg, not killing children") + return + self.log.debug( + f"Will sleep {delay}s before checking for children and retrying." + ) + await ascynio.sleep(delay) + + async def _at_shutdown(self): """Actions taken at shutdown by the kernel, called by python's atexit. 
""" + await self._progressively_terminate_all_children() if self._shutdown_message is not None: self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) self.log.debug("%s", self._shutdown_message) From 106b57a924d139bcdff186c3d0ee21e546b323d2 Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Thu, 17 Feb 2022 12:59:13 +0100 Subject: [PATCH 2/7] Fix windows --- ipykernel/kernelbase.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 497c6a2f7..88b926c90 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -16,8 +16,13 @@ import warnings from datetime import datetime from functools import partial -from signal import (SIGINT, SIGKILL, SIGTERM, Signals, default_int_handler, - signal) +from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal + +if sys.platform != "win32": + from signal import SIGKILL +else: + SIGKILL = None + try: import psutil @@ -1149,9 +1154,10 @@ async def _progressively_terminate_all_children(self): await asyncio.sleep(0.05) self.log.debug("Sending SIGTERM to {pgid=}") os.killpg(pgid, SIGTERM) - await asyncio.sleep(0.05) - self.log.debug("Sending SIGKILL to {pgid=}") - os.killpg(pgid, SIGKILL) + if sys.platform != "win32": + await asyncio.sleep(0.05) + self.log.debug("Sending SIGKILL to {pgid=}") + os.killpg(pgid, SIGKILL) except Exception: self.log.exception("Exception during subprocesses termination") return @@ -1163,7 +1169,12 @@ async def _progressively_terminate_all_children(self): return self.log.debug(f"Trying to interrupt then kill subprocesses : {children=}") self._send_interupt_children() - for signum in (SIGTERM, SIGKILL): + if sys.platform != "win32": + sigs = (SIGTERM, SIGKILL) + else: + sigs = SIGTERM + + for signum in sigs: self.log.debug( f"Will try to send {signum} ({Signals(signum)}) to subprocesses :{children}" ) From f24a98958eccba742b2d9a69190fe586bd33000b 
Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Thu, 17 Feb 2022 13:00:13 +0100 Subject: [PATCH 3/7] fix 37 --- ipykernel/kernelbase.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 88b926c90..51ebef516 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -1142,21 +1142,19 @@ async def _progressively_terminate_all_children(self): pgid = os.getpgid(os.getpid()) if not pgid: - self.log.warning(f"No Pgid ({pgid=}), not trying to stop subprocesses.") + self.log.warning(f"No Pgid ({pgid}), not trying to stop subprocesses.") return if psutil is None: # blindly send quickly sigterm/sigkill to processes if psutil not there. - self.log.warning( - f"Please install psutil for a cleaner subprocess shutdown." - ) + self.log.debug("Please install psutil for a cleaner subprocess shutdown.") self._send_interupt_children() try: await asyncio.sleep(0.05) - self.log.debug("Sending SIGTERM to {pgid=}") + self.log.debug("Sending SIGTERM to {pgid}") os.killpg(pgid, SIGTERM) if sys.platform != "win32": await asyncio.sleep(0.05) - self.log.debug("Sending SIGKILL to {pgid=}") + self.log.debug("Sending SIGKILL to {pgid}") os.killpg(pgid, SIGKILL) except Exception: self.log.exception("Exception during subprocesses termination") @@ -1167,7 +1165,7 @@ async def _progressively_terminate_all_children(self): if not children: self.log.debug("Kernel has no children.") return - self.log.debug(f"Trying to interrupt then kill subprocesses : {children=}") + self.log.debug(f"Trying to interrupt then kill subprocesses : {children}") self._send_interupt_children() if sys.platform != "win32": sigs = (SIGTERM, SIGKILL) From 7d7961e037ffb77c230f30ed037f1c1bb9e1439f Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Thu, 17 Feb 2022 19:59:52 +0100 Subject: [PATCH 4/7] need to be even stricter on windows --- ipykernel/kernelbase.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 
13 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 51ebef516..49c205014 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -802,13 +802,11 @@ async def comm_info_request(self, stream, ident, parent): self.log.debug("%s", msg) def _send_interupt_children(self): - - pid = os.getpid() - pgid = os.getpgid(pid) - if os.name == "nt": self.log.error("Interrupt message not supported on Windows") else: + pid = os.getpid() + pgid = os.getpgid(pid) # Prefer process-group over process if pgid and hasattr(os, "killpg"): try: @@ -1139,6 +1137,9 @@ def _input_request(self, prompt, ident, parent, password=False): return value async def _progressively_terminate_all_children(self): + if sys.platform != "win32": + self.log.info(f"Terminating subprocesses not yet supported on windows.") + return pgid = os.getpgid(os.getpid()) if not pgid: @@ -1152,10 +1153,9 @@ async def _progressively_terminate_all_children(self): await asyncio.sleep(0.05) self.log.debug("Sending SIGTERM to {pgid}") os.killpg(pgid, SIGTERM) - if sys.platform != "win32": - await asyncio.sleep(0.05) - self.log.debug("Sending SIGKILL to {pgid}") - os.killpg(pgid, SIGKILL) + await asyncio.sleep(0.05) + self.log.debug("Sending SIGKILL to {pgid}") + os.killpg(pgid, SIGKILL) except Exception: self.log.exception("Exception during subprocesses termination") return @@ -1167,12 +1167,8 @@ async def _progressively_terminate_all_children(self): return self.log.debug(f"Trying to interrupt then kill subprocesses : {children}") self._send_interupt_children() - if sys.platform != "win32": - sigs = (SIGTERM, SIGKILL) - else: - sigs = SIGTERM - for signum in sigs: + for signum in (SIGTERM, SIGKILL): self.log.debug( f"Will try to send {signum} ({Signals(signum)}) to subprocesses :{children}" ) From 0ccc5914847b63145df8343662fff00d3685d077 Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Thu, 17 Feb 2022 20:42:51 +0100 Subject: [PATCH 5/7] Update ipykernel/kernelbase.py 
Co-authored-by: Steven Silvester --- ipykernel/kernelbase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 49c205014..e888ae26a 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -1137,7 +1137,7 @@ def _input_request(self, prompt, ident, parent, password=False): return value async def _progressively_terminate_all_children(self): - if sys.platform != "win32": + if sys.platform == "win32": self.log.info(f"Terminating subprocesses not yet supported on windows.") return From 8518d6f0ea6ddc2edd6259f77affe066d9ff3cee Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Fri, 18 Feb 2022 14:56:00 +0100 Subject: [PATCH 6/7] add psutils on windows and try to kill children --- ipykernel/kernelbase.py | 38 +++++++++++++++++++++++++++++--------- setup.py | 1 + 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index e888ae26a..275a2078e 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -21,7 +21,7 @@ if sys.platform != "win32": from signal import SIGKILL else: - SIGKILL = None + SIGKILL = "windown-SIGKILL-sentinel" try: @@ -1136,6 +1136,31 @@ def _input_request(self, prompt, ident, parent, password=False): raise EOFError return value + + async def _killpg(self, *, signal): + """ + similar to killpg but use psutil if it can on windows + or if pgid is none + + """ + pgid = os.getpgid(os.getpid()) + if pgid and hasattr(os, "killpg"): + try: + os.killpg(pgid, signal) + except OSError: + self.log.warning("OSError running killpg, not killing children") + return + elif psutil is not None: + children = parent.children(recursive=True) + for p in children: + try: + if signal == SIGTERM: + p.terminate() + elif signal == SIGKILL: + p.kill() + except psutil.NoSuchProcess: + pass + async def _progressively_terminate_all_children(self): if sys.platform == "win32": self.log.info(f"Terminating subprocesses not yet 
supported on windows.") @@ -1152,10 +1177,10 @@ async def _progressively_terminate_all_children(self): try: await asyncio.sleep(0.05) self.log.debug("Sending SIGTERM to {pgid}") - os.killpg(pgid, SIGTERM) + self._killpg(SIGTERM) await asyncio.sleep(0.05) self.log.debug("Sending SIGKILL to {pgid}") - os.killpg(pgid, SIGKILL) + self._killpg(pgid, SIGKILL) except Exception: self.log.exception("Exception during subprocesses termination") return @@ -1177,12 +1202,7 @@ async def _progressively_terminate_all_children(self): if not children: self.log.debug("No more children, continuing shutdown routine.") return - if pgid and hasattr(os, "killpg"): - try: - os.killpg(pgid, signum) - except OSError: - self.log.warning("OSError running killpg, not killing children") - return + self._killpg(signum) self.log.debug( f"Will sleep {delay}s before checking for children and retrying." ) diff --git a/setup.py b/setup.py index 95dffbc66..e7af2a86c 100644 --- a/setup.py +++ b/setup.py @@ -68,6 +68,7 @@ def run(self): 'tornado>=4.2,<7.0', 'matplotlib-inline>=0.1.0,<0.2.0', 'appnope;platform_system=="Darwin"', + 'psutil;platform_system=="Windows"', 'nest_asyncio', ], extras_require={ From aaad57563197dd36456506dc642f0ee4db3ae0d8 Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Fri, 18 Feb 2022 15:35:22 +0100 Subject: [PATCH 7/7] handle windows --- ipykernel/kernelbase.py | 73 +++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 275a2078e..7a7fcde45 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -1136,8 +1136,7 @@ def _input_request(self, prompt, ident, parent, password=False): raise EOFError return value - - async def _killpg(self, *, signal): + def _killpg(self, signal): """ similar to killpg but use psutil if it can on windows or if pgid is none @@ -1147,8 +1146,8 @@ async def _killpg(self, *, signal): if pgid and hasattr(os, "killpg"): try: 
os.killpg(pgid, signal) - except OSError: - self.log.warning("OSError running killpg, not killing children") + except (OSError) as e: + self.log.exception(f"OSError running killpg, not killing children.") return elif psutil is not None: children = parent.children(recursive=True) @@ -1162,30 +1161,20 @@ async def _killpg(self, *, signal): pass async def _progressively_terminate_all_children(self): - if sys.platform == "win32": - self.log.info(f"Terminating subprocesses not yet supported on windows.") - return pgid = os.getpgid(os.getpid()) - if not pgid: - self.log.warning(f"No Pgid ({pgid}), not trying to stop subprocesses.") - return if psutil is None: # blindly send quickly sigterm/sigkill to processes if psutil not there. - self.log.debug("Please install psutil for a cleaner subprocess shutdown.") + self.log.info("Please install psutil for a cleaner subprocess shutdown.") self._send_interupt_children() - try: - await asyncio.sleep(0.05) - self.log.debug("Sending SIGTERM to {pgid}") - self._killpg(SIGTERM) - await asyncio.sleep(0.05) - self.log.debug("Sending SIGKILL to {pgid}") - self._killpg(pgid, SIGKILL) - except Exception: - self.log.exception("Exception during subprocesses termination") - return - - sleeps = (0.01, 0.03, 0.1, 0.3, 1) + await asyncio.sleep(0.05) + self.log.debug("Sending SIGTERM to {pgid}") + self._killpg(SIGTERM) + await asyncio.sleep(0.05) + self.log.debug("Sending SIGKILL to {pgid}") + self._killpg(pgid, SIGKILL) + + sleeps = (0.01, 0.03, 0.1, 0.3, 1, 3, 10) children = psutil.Process().children(recursive=True) if not children: self.log.debug("Kernel has no children.") @@ -1195,24 +1184,38 @@ async def _progressively_terminate_all_children(self): for signum in (SIGTERM, SIGKILL): self.log.debug( - f"Will try to send {signum} ({Signals(signum)}) to subprocesses :{children}" + f"Will try to send {signum} ({Signals(signum)!r}) to subprocesses :{children}" ) for delay in sleeps: children = psutil.Process().children(recursive=True) - if not 
children: - self.log.debug("No more children, continuing shutdown routine.") - return - self._killpg(signum) + try: + if not children: + self.log.warning( + "No more children, continuing shutdown routine." + ) + return + except psutil.NoSuchProcess: + pass + self._killpg(15) self.log.debug( - f"Will sleep {delay}s before checking for children and retrying." + f"Will sleep {delay}s before checking for children and retrying. {children}" ) - await ascynio.sleep(delay) + await asyncio.sleep(delay) async def _at_shutdown(self): """Actions taken at shutdown by the kernel, called by python's atexit. """ - await self._progressively_terminate_all_children() - if self._shutdown_message is not None: - self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) - self.log.debug("%s", self._shutdown_message) - self.control_stream.flush(zmq.POLLOUT) + try: + await self._progressively_terminate_all_children() + except Exception as e: + self.log.exception("Exception during subprocesses termination %s", e) + + finally: + if self._shutdown_message is not None: + self.session.send( + self.iopub_socket, + self._shutdown_message, + ident=self._topic("shutdown"), + ) + self.log.debug("%s", self._shutdown_message) + self.control_stream.flush(zmq.POLLOUT)