Skip to content

Commit

Permalink
Merge pull request #874 from minrk/safer-killpg
Browse files Browse the repository at this point in the history
Only kill children in process group at shutdown
  • Loading branch information
Carreau authored Feb 21, 2022
2 parents 78c83ad + 5c16fde commit dcde5cf
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 68 deletions.
12 changes: 8 additions & 4 deletions ipykernel/debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

from .compiler import (get_file_name, get_tmp_directory, get_tmp_hash_seed)

# This import is required to have the next ones working...
from debugpy.server import api # noqa
from _pydevd_bundle import pydevd_frame_utils
from _pydevd_bundle.pydevd_suspended_frames import SuspendedFramesManager, _FramesTracker
try:
# This import is required to have the next ones working...
from debugpy.server import api # noqa
from _pydevd_bundle import pydevd_frame_utils
from _pydevd_bundle.pydevd_suspended_frames import SuspendedFramesManager, _FramesTracker
_is_debugpy_available = True
except ImportError:
_is_debugpy_available = False

# Required for backwards compatibility
ROUTING_ID = getattr(zmq, 'ROUTING_ID', None) or zmq.IDENTITY
Expand Down
7 changes: 1 addition & 6 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .zmqshell import ZMQInteractiveShell
from .eventloops import _use_appnope
from .compiler import XCachingCompiler
from .debugger import Debugger, _is_debugpy_available

try:
from IPython.core.interactiveshell import _asyncio_runner
Expand All @@ -33,12 +34,6 @@
except ImportError:
_use_experimental_60_completion = False

try:
import debugpy
from .debugger import Debugger
_is_debugpy_available = True
except ImportError:
_is_debugpy_available = False

_EXPERIMENTAL_KEY_NAME = '_jupyter_types_experimental'

Expand Down
101 changes: 46 additions & 55 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
SIGKILL = "windown-SIGKILL-sentinel"


try:
import psutil
except ImportError:
psutil = None


try:
Expand All @@ -37,6 +33,7 @@
# jupyter_client < 5, use local now()
now = datetime.now

import psutil
import zmq
from IPython.core.error import StdinNotImplementedError
from jupyter_client.session import Session
Expand Down Expand Up @@ -808,7 +805,8 @@ def _send_interupt_children(self):
pid = os.getpid()
pgid = os.getpgid(pid)
# Prefer process-group over process
if pgid and hasattr(os, "killpg"):
# but only if the kernel is the leader of the process group
if pgid and pgid == pid and hasattr(os, "killpg"):
try:
os.killpg(pgid, SIGINT)
return
Expand Down Expand Up @@ -897,8 +895,6 @@ async def usage_request(self, stream, ident, parent):
reply_content = {
'hostname': socket.gethostname()
}
if psutil is None:
return reply_content
current_process = psutil.Process()
all_processes = [current_process] + current_process.children(recursive=True)
process_metric_value = self.get_process_metric_value
Expand Down Expand Up @@ -1136,67 +1132,62 @@ def _input_request(self, prompt, ident, parent, password=False):
raise EOFError
return value

def _killpg(self, signal):
def _signal_children(self, signum):
"""
similar to killpg but use psutil if it can on windows
or if pgid is none
Send a signal to all our children
Like `killpg`, but does not include the current process
(or possible parents).
"""
pgid = os.getpgid(os.getpid())
if pgid and hasattr(os, "killpg"):
for p in self._process_children():
self.log.debug(f"Sending {Signals(signum)!r} to subprocess {p}")
try:
os.killpg(pgid, signal)
except (OSError) as e:
self.log.exception(f"OSError running killpg, not killing children.")
return
elif psutil is not None:
children = parent.children(recursive=True)
for p in children:
try:
if signal == SIGTERM:
p.terminate()
elif signal == SIGKILL:
p.kill()
except psutil.NoSuchProcess:
pass
if signum == SIGTERM:
p.terminate()
elif signum == SIGKILL:
p.kill()
else:
p.send_signal(signum)
except psutil.NoSuchProcess:
pass

async def _progressively_terminate_all_children(self):
def _process_children(self):
"""Retrieve child processes in the kernel's process group
pgid = os.getpgid(os.getpid())
if psutil is None:
# blindly send quickly sigterm/sigkill to processes if psutil not there.
self.log.info("Please install psutil for a cleaner subprocess shutdown.")
self._send_interupt_children()
await asyncio.sleep(0.05)
self.log.debug("Sending SIGTERM to {pgid}")
self._killpg(SIGTERM)
await asyncio.sleep(0.05)
self.log.debug("Sending SIGKILL to {pgid}")
self._killpg(pgid, SIGKILL)
Avoids:
- including parents and self with killpg
- including all children that may have forked-off a new group
"""
kernel_process = psutil.Process()
all_children = kernel_process.children(recursive=True)
if os.name == "nt":
return all_children
kernel_pgid = os.getpgrp()
process_group_children = []
for child in all_children:
try:
child_pgid = os.getpgid(child.pid)
except OSError:
pass
else:
if child_pgid == kernel_pgid:
process_group_children.append(child)
return process_group_children

async def _progressively_terminate_all_children(self):
sleeps = (0.01, 0.03, 0.1, 0.3, 1, 3, 10)
children = psutil.Process().children(recursive=True)
if not children:
if not self._process_children():
self.log.debug("Kernel has no children.")
return
self.log.debug(f"Trying to interrupt then kill subprocesses : {children}")
self._send_interupt_children()

for signum in (SIGTERM, SIGKILL):
self.log.debug(
f"Will try to send {signum} ({Signals(signum)!r}) to subprocesses :{children}"
)
for delay in sleeps:
children = psutil.Process().children(recursive=True)
try:
if not children:
self.log.warning(
"No more children, continuing shutdown routine."
)
return
except psutil.NoSuchProcess:
pass
self._killpg(15)
children = self._process_children()
if not children:
self.log.debug("No more children, continuing shutdown routine.")
return
# signals only children, not current process
self._signal_children(signum)
self.log.debug(
f"Will sleep {delay}s before checking for children and retrying. {children}"
)
Expand Down
2 changes: 1 addition & 1 deletion ipykernel/kernelspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from jupyter_client.kernelspec import KernelSpecManager

from .ipkernel import _is_debugpy_available
from .debugger import _is_debugpy_available

pjoin = os.path.join

Expand Down
76 changes: 75 additions & 1 deletion ipykernel/tests/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import ast
import os.path
import platform
import signal
import subprocess
import sys
import time
from subprocess import Popen
from tempfile import TemporaryDirectory

from flaky import flaky
import psutil
import pytest
from packaging import version

import IPython
from IPython.paths import locate_profile
Expand Down Expand Up @@ -496,3 +498,75 @@ def test_control_thread_priority():
# comparing first to last ought to be enough, since queues preserve order
# use <= in case of very-fast handling and/or low resolution timers
assert control_dates[-1] <= shell_dates[0]


def _child():
    """Body of the helper subprocesses launched by ``_start_children``.

    Prints its own pid, installs a SIGTERM handler that reports the signal
    and exits after a short pause, then sleeps for 30 seconds.
    """
    own_pid = os.getpid()
    print("in child", own_pid)

    def _report_then_exit(sig, _frame):
        print(f"Received signal {sig}")
        # take some time so retries are triggered
        time.sleep(0.5)
        sys.exit(-sig)

    signal.signal(signal.SIGTERM, _report_then_exit)
    time.sleep(30)


def _start_children():
    """Spawn two ``_child`` subprocesses and publish their pids.

    One subprocess is started with ``start_new_session=False`` and the other
    with ``start_new_session=True``. The current pid and both child pids are
    stored in the IPython user namespace under ``pid``, ``child_pg`` and
    ``child_newpg``.
    """
    user_ns = IPython.get_ipython().user_ns
    child_cmd = [sys.executable, "-c", f"from {__name__} import _child; _child()"]

    same_session = Popen(child_cmd, start_new_session=False)
    new_session = Popen(child_cmd, start_new_session=True)

    published = (
        ("pid", os.getpid()),
        ("child_pg", same_session.pid),
        ("child_newpg", new_session.pid),
    )
    for key, value in published:
        user_ns[key] = value

    # give them time to start up and register signal handlers
    time.sleep(1)


@pytest.mark.skipif(
    platform.python_implementation() == "PyPy",
    reason="does not work on PyPy",
)
def test_shutdown_subprocesses():
    """Kernel shutdown stops children in its process group, and only those.

    The kernel runs ``_start_children``, which spawns one child with
    ``start_new_session=False`` and one with ``start_new_session=True``.
    After ``kc.shutdown()`` the kernel and the first child must be gone.
    On non-Windows platforms the second child must still be running.

    Both helper processes are terminated in a ``finally`` block so that a
    failing assertion does not leave them behind.
    """
    with new_kernel() as kc:
        km = kc.parent
        _msg_id, reply = execute(
            f"from {__name__} import _start_children\n_start_children()",
            kc=kc,
            user_expressions={
                "pid": "pid",
                "child_pg": "child_pg",
                "child_newpg": "child_newpg",
            },
        )
        print(reply)
        expressions = reply["user_expressions"]

        def _process(name):
            # Each user expression comes back as the repr of an integer pid.
            return psutil.Process(int(expressions[name]["data"]["text/plain"]))

        kernel_process = _process("pid")
        child_pg = _process("child_pg")
        child_newpg = _process("child_newpg")

        try:
            wait_for_idle(kc)

            kc.shutdown()
            for _ in range(300):  # 30s timeout
                if not km.is_alive():
                    break
                time.sleep(0.1)
            assert not km.is_alive()
            assert not kernel_process.is_running()
            # child in the process group shut down
            assert not child_pg.is_running()
            # child outside the process group was not shut down (unix only)
            if os.name != 'nt':
                assert child_newpg.is_running()
        finally:
            # Best-effort cleanup: runs whether or not the assertions passed,
            # so neither helper keeps running after the test.
            for proc in (child_pg, child_newpg):
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    pass
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def run(self):
'tornado>=4.2,<7.0',
'matplotlib-inline>=0.1.0,<0.2.0',
'appnope;platform_system=="Darwin"',
'psutil;platform_system=="Windows"',
'psutil',
'nest_asyncio',
],
extras_require={
Expand Down

0 comments on commit dcde5cf

Please sign in to comment.