Skip to content

Commit

Permalink
allow custom stop_signal (add windows tests back)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenyu-ms committed Sep 27, 2024
1 parent 2aaebc3 commit da15631
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 133 deletions.
19 changes: 14 additions & 5 deletions testplan/common/remote/remote_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import re
import signal
import subprocess
import warnings
from typing import Optional

import rpyc
from rpyc import Connection
from schema import Use

from testplan.common.config import ConfigOption
from testplan.common.entity import Resource, ResourceConfig
Expand Down Expand Up @@ -43,7 +45,7 @@ def get_options(cls):
"name": str,
ConfigOption("rpyc_bin", default=RPYC_BIN): str,
ConfigOption("rpyc_port", default=0): int,
ConfigOption("sigint_timeout", default=5): int,
ConfigOption("stop_timeout", default=5): Use(float),
}


Expand All @@ -57,7 +59,7 @@ class RemoteService(Resource, RemoteResource):
:param rpyc_bin: Location of rpyc_classic.py script
:param rpyc_port: Specific port for rpyc connection on the remote host. Defaults to 0
which start the rpyc server on a random port.
:param sigint_timeout: number of seconds to wait between ``SIGINT`` and ``SIGKILL``
:param stop_timeout: Timeout of graceful shutdown (in seconds).
Also inherits all
:py:class:`~testplan.common.entity.base.Resource` and
Expand All @@ -72,11 +74,19 @@ def __init__(
remote_host: str,
rpyc_bin: str = RPYC_BIN,
rpyc_port: str = 0,
sigint_timeout: int = 5,
stop_timeout: float = 5,
**options,
) -> None:
options.update(self.filter_locals(locals()))
options["async_start"] = False
# ``sigint_timeout`` is deprecated
if "sigint_timeout" in options:
options["stop_timeout"] = options.pop("sigint_timeout")
warnings.warn(
"``sigint_timeout`` argument is deprecated, "
"please use ``stop_timeout`` instead.",
DeprecationWarning,
)
super(RemoteService, self).__init__(**options)

self.proc: Optional[subprocess.Popen] = None
Expand Down Expand Up @@ -239,7 +249,6 @@ def stopping(self) -> None:
# actually if remote rpyc server is shutdown, ssh proc is also finished
# but calling kill_process just in case
if self.proc:
kill_process(self.proc, self.cfg.sigint_timeout)
self.proc.wait()
kill_process(self.proc, self.cfg.stop_timeout)

self.status.change(self.STATUS.STOPPED)
61 changes: 41 additions & 20 deletions testplan/common/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import functools
import platform
import shlex
import subprocess
import threading
import time
import traceback
import warnings
from enum import Enum, auto
from signal import Signals
from typing import IO, Any, Callable, List, Union
from typing import IO, Any, Callable, List, Union, Optional

import psutil

Expand All @@ -29,9 +31,9 @@ def _log_proc(msg: Any, warn=False, output: IO = None):
def kill_process(
proc: subprocess.Popen,
timeout: int = 5,
signal_: Signals = None,
output: IO = None,
on_failed_termination: Callable[[int, int], None] = None,
signal_: Optional[Signals] = None,
output: Optional[IO] = None,
on_failed_termination: Optional[Callable[[int, int], None]] = None,
) -> Union[int, None]:
"""
If alive, kills the process.
Expand Down Expand Up @@ -63,6 +65,9 @@ def kill_process(
else:
proc.terminate()

if timeout == 0:
return retcode

sleeper = get_sleeper((0.05, 1), timeout=timeout)
while next(sleeper):
if retcode is None:
Expand All @@ -81,27 +86,19 @@ def kill_process(
except (RuntimeError, OSError) as error:
_log(msg="Could not kill process - {}".format(error), warn=True)

_, alive = psutil.wait_procs(child_procs, timeout=timeout)
for p in alive:
try:
p.kill()
except psutil.NoSuchProcess:
pass # already dead
except Exception as exc:
_log(
msg="While terminating child process - {}".format(exc),
warn=True,
)
def _log_child_exc(exc):
_log(msg="While killing child process - {}".format(exc), warn=True)

cleanup_child_procs(child_procs, timeout, _log_child_exc)
return proc.returncode


def kill_process_psutil(
proc: psutil.Process,
timeout: int = 5,
signal_: Signals = None,
output: IO = None,
on_failed_termination: Callable[[int, int], None] = None,
signal_: Optional[Signals] = None,
output: Optional[IO] = None,
on_failed_termination: Optional[Callable[[int, int], None]] = None,
) -> List[psutil.Process]:
"""
If alive, kills the process (an instance of ``psutil.Process``).
Expand All @@ -123,6 +120,8 @@ def kill_process_psutil(
"""
_log = functools.partial(_log_proc, output=output)
try:
# XXX: do we need to distinguish between parent and child?
# XXX: here we basically apply on_failed_termination to all procs
all_procs = proc.children(recursive=True) + [proc]
except psutil.NoSuchProcess:
return []
Expand Down Expand Up @@ -153,6 +152,28 @@ def kill_process_psutil(
return alive


def cleanup_child_procs(
procs: List[psutil.Process],
timeout: float,
log: Callable[[BaseException], None],
) -> None:
"""
Kill child processes to eliminate possibly orphaned processes.
:param procs: List of child processes to kill.
:param timeout: Timeout in seconds, defaults to 5 seconds.
:param log: Callback function to log exceptions.
"""
_, alive = psutil.wait_procs(procs, timeout=timeout)
for p in alive:
try:
p.kill()
except psutil.NoSuchProcess:
pass # already reaped
except Exception as exc:
log(exc)


DEFAULT_CLOSE_FDS = platform.system() != "Windows"


Expand Down Expand Up @@ -254,8 +275,8 @@ def execute_cmd(

if isinstance(cmd, list):
cmd = [str(a) for a in cmd]
# FIXME: not good enough, need shell escaping
cmd_string = " ".join(cmd) # for logging, easy to copy and execute
# for logging, easy to copy and execute
cmd_string = " ".join(map(shlex.quote, cmd))
else:
cmd_string = cmd

Expand Down
11 changes: 4 additions & 7 deletions testplan/runners/pools/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import traceback
from typing import Dict, Generator, List, Optional, Tuple, Type, Union

from schema import And, Or
from schema import And, Or, Use

from testplan.common import entity
from testplan.common.config import ConfigOption
Expand Down Expand Up @@ -63,6 +63,7 @@ def get_options(cls):
"index": Or(int, str),
ConfigOption("transport", default=QueueClient): object,
ConfigOption("restart_count", default=3): int,
ConfigOption("stop_timeout", default=10): Use(float),
}


Expand All @@ -72,11 +73,9 @@ class WorkerBase(entity.Resource):
and sends back task results.
:param index: Worker index id.
:type index: ``int`` or ``str``
:param transport: Transport class for pool/worker communication.
:type transport: :py:class:`~testplan.runners.pools.connection.Client`
:param restart_count: How many times a worker in pool can be restarted.
:type restart_count: ``int``
:param stop_timeout: Timeout for graceful shutdown (in seconds).
Also inherits all :py:class:`~testplan.common.entity.base.Resource`
options.
Expand Down Expand Up @@ -171,8 +170,6 @@ class Worker(WorkerBase):
Worker that runs a thread and pull tasks from transport
"""

_STOP_TIMEOUT = 10

def __init__(self, **options) -> None:
super().__init__(**options)
self._handler = None
Expand All @@ -195,7 +192,7 @@ def starting(self) -> None:
def stopping(self) -> None:
"""Stops the worker."""
if self._handler:
interruptible_join(self._handler, self._STOP_TIMEOUT)
interruptible_join(self._handler, self.cfg.stop_timeout)
self._handler = None

def aborting(self) -> None:
Expand Down
14 changes: 4 additions & 10 deletions testplan/runners/pools/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def get_options(cls):
"""
return {
ConfigOption("transport", default=ZMQClientProxy): object,
ConfigOption("sigint_timeout", default=5): int,
}


Expand All @@ -44,18 +43,13 @@ class ProcessWorker(Worker):
executes them and sends back task results.
:param transport: Transport class for pool/worker communication.
:param sigint_timeout: number of seconds to wait between ``SIGINT`` and ``SIGKILL``
Also inherits all :py:class:`~testplan.runners.pools.base.Worker` options.
"""

CONFIG = ProcessWorkerConfig

def __init__(
self,
sigint_timeout: int = 5,
**options,
) -> None:
def __init__(self, **options) -> None:
options.update(self.filter_locals(locals()))
super(ProcessWorker, self).__init__(**options)

Expand Down Expand Up @@ -151,8 +145,8 @@ def is_alive(self) -> bool:

def stopping(self) -> None:
"""Stop child process worker."""
if hasattr(self, "_handler") and self._handler:
kill_process(self._handler, self.cfg.sigint_timeout)
if self._handler:
kill_process(self._handler, self.cfg.stop_timeout)
self.status.change(self.STATUS.STOPPED)

def aborting(self) -> None:
Expand Down Expand Up @@ -182,7 +176,7 @@ def get_options(cls):
ConfigOption("port", default=0): int,
ConfigOption(
"abort_signals", default=[signal.SIGINT, signal.SIGTERM]
): [int],
): [signal.Signals],
ConfigOption("worker_type", default=ProcessWorker): object,
ConfigOption("worker_heartbeat", default=5): Or(int, float, None),
}
Expand Down
Loading

0 comments on commit da15631

Please sign in to comment.