From 1437cee7261dd4ddb53a9796291364ead37526d2 Mon Sep 17 00:00:00 2001 From: Pyifan <37059868+Pyifan@users.noreply.github.com> Date: Thu, 14 Nov 2024 21:09:59 +0800 Subject: [PATCH] Fix App driver stop take 3 (#1146) Co-authored-by: Pyifan --- .../3041_changed.app_stop_refactor.rst | 1 + testplan/common/entity/base.py | 79 +++------- testplan/common/utils/process.py | 60 ++++++-- testplan/testing/base.py | 6 - testplan/testing/environment/base.py | 73 +++------- testplan/testing/multitest/driver/app.py | 136 ++++++++++-------- testplan/testing/multitest/driver/base.py | 13 ++ .../testing/multitest/test_bad_app.py | 67 ++------- .../multitest/driver/myapp/test_app.py | 45 ++---- 9 files changed, 196 insertions(+), 284 deletions(-) create mode 100644 doc/newsfragments/3041_changed.app_stop_refactor.rst diff --git a/doc/newsfragments/3041_changed.app_stop_refactor.rst b/doc/newsfragments/3041_changed.app_stop_refactor.rst new file mode 100644 index 000000000..a40cbb016 --- /dev/null +++ b/doc/newsfragments/3041_changed.app_stop_refactor.rst @@ -0,0 +1 @@ +Make sure when stop() is called on App type driver, we clean up all orphaned processes. diff --git a/testplan/common/entity/base.py b/testplan/common/entity/base.py index 2a2feb7e9..c03d1dca5 100644 --- a/testplan/common/entity/base.py +++ b/testplan/common/entity/base.py @@ -60,9 +60,6 @@ def __init__(self, parent: Optional["Entity"] = None): self.__dict__["_resources"] = OrderedDict() self.__dict__["start_exceptions"] = OrderedDict() self.__dict__["stop_exceptions"] = OrderedDict() - # warnings for info display only, no interference needed (really?) 
- self.__dict__["start_warnings"] = OrderedDict() - self.__dict__["stop_warnings"] = OrderedDict() def add(self, item: "Resource", uid: Optional[str] = None) -> str: """ @@ -177,17 +174,6 @@ def all_status(self, target) -> bool: for resource in self._resources ) - def _record_resource_warning(self, message, resource, msg_store): - fetch_msg = "\n".join(resource.fetch_error_log()) - - msg = message.format( - resource=resource, - traceback_exc=traceback.format_exc(), - fetch_msg=fetch_msg, - ) - resource.logger.warning(msg) - msg_store[resource] = "WARNING: " + msg - def _record_resource_exception(self, message, resource, msg_store): fetch_msg = "\n".join(resource.fetch_error_log()) @@ -219,7 +205,6 @@ def start(self): msg_store=self.start_exceptions, ) - # XXX: in the case of failover, put exc in warnings instead? failover = resource.failover() if failover: self._resources[resource.uid()] = failover @@ -296,34 +281,20 @@ def stop(self, is_reversed=False): if is_reversed is True: resources = resources[::-1] - # XXX: with the legacy design, we cannot really distinguish exceptions - # XXX: raised by ``stopping`` from those raised by ``stopped_check``, - # XXX: we have this ambiguity regarding ``stop``. 
- # Stop all resources resources_to_wait_for: List[Resource] = [] for resource in resources: try: resource.stop() - except Exception as e: - if isinstance(e, tuple(resource.SUPPRESSED_STOP_EXC)): - self._record_resource_warning( - message="While stopping resource {resource}" - " (mitigated by forcefully stopping resource)" - ":\n{traceback_exc}\n{fetch_msg}", - resource=resource, - msg_store=self.stop_warnings, - ) - else: - self._record_resource_exception( - message="While stopping resource {resource}" - ":\n{traceback_exc}\n{fetch_msg}", - resource=resource, - msg_store=self.stop_exceptions, - ) - + except Exception: + self._record_resource_exception( + message="While stopping resource {resource}" + ":\n{traceback_exc}\n{fetch_msg}", + resource=resource, + msg_store=self.stop_exceptions, + ) # Resource status should be STOPPED even it failed to stop - resource.force_stopped() + resource.force_stop() else: if ( resource.async_start @@ -336,24 +307,15 @@ def stop(self, is_reversed=False): for resource in resources_to_wait_for: try: resource.wait(resource.STATUS.STOPPED) - except Exception as e: - if isinstance(e, tuple(resource.SUPPRESSED_STOP_EXC)): - self._record_resource_warning( - message="While waiting for resource {resource} to stop" - " (mitigated by forcefully stopping resource)" - ":\n{traceback_exc}\n{fetch_msg}", - resource=resource, - msg_store=self.stop_warnings, - ) - else: - self._record_resource_exception( - message="While waiting for resource {resource} to stop" - ":\n{traceback_exc}\n{fetch_msg}", - resource=resource, - msg_store=self.stop_exceptions, - ) + except Exception: + self._record_resource_exception( + message="While waiting for resource {resource} to stop" + ":\n{traceback_exc}\n{fetch_msg}", + resource=resource, + msg_store=self.stop_exceptions, + ) # Resource status should be STOPPED even it failed to stop - resource.force_stopped() + resource.force_stop() resource.logger.info("%s stopped", resource) def stop_in_pool(self, pool, 
is_reversed=False): @@ -393,7 +355,7 @@ def stop_in_pool(self, pool, is_reversed=False): resource.logger.info("%s stopped", resource) else: # Resource status should be STOPPED even it failed to stop - resource.force_stopped() + resource.force_stop() def _log_exception(self, resource, func, exception_record): """ @@ -1385,7 +1347,6 @@ class Resource(Entity): CONFIG = ResourceConfig STATUS = ResourceStatus - SUPPRESSED_STOP_EXC = [] def __init__(self, **options): super(Resource, self).__init__(**options) @@ -1562,9 +1523,9 @@ def _wait_stopped(self, timeout: Optional[float] = None): :param timeout: timeout in seconds """ - self._after_stopped() + self._mark_stopped() - def _after_stopped(self): + def _mark_stopped(self): """ Common logic after a successful Resource stop. """ @@ -1617,7 +1578,7 @@ def restart(self): if self.async_start: self.wait(self.STATUS.STARTED) - def force_stopped(self): + def force_stop(self): """ Change the status to STOPPED (e.g. exception raised). """ diff --git a/testplan/common/utils/process.py b/testplan/common/utils/process.py index b2cf88cc2..69488f73f 100644 --- a/testplan/common/utils/process.py +++ b/testplan/common/utils/process.py @@ -184,6 +184,55 @@ def kill_process_psutil( return alive +def kill_proc_and_child_procs( + proc: subprocess.Popen, + child_procs: List[psutil.Process], + log: Callable[[BaseException], None] = None, + timeout: int = 5, +) -> None: + """ + Kill a process and its child processes. + + This function attempts to kill a given process and its child processes. + It first kills the main process, waits for it to terminate, and then + attempts to kill any remaining child processes. + + :param proc: The main process to kill. + :type proc: subprocess.Popen + :param child_procs: A list of child processes to kill. + :type child_procs: List[psutil.Process] + :param log: A callable to log exceptions, defaults to None. 
+ :type log: Callable[[BaseException], None], optional + :param timeout: The timeout in seconds to wait for processes to terminate, defaults to 5. + :type timeout: int, optional + :return: None; exceptions raised while killing are passed to ``log`` when it is provided. + :rtype: None + """ + + if proc is not None and proc.poll() is None: + try: + proc.kill() + proc.wait() + wait_process_clean(proc.pid, timeout=timeout) + except Exception as exc: + if log: + log(exc) + + if child_procs: + # we did not send sig term to child processes + # thus no point to wait for them to terminate + _, alive = psutil.wait_procs(child_procs, timeout=0) + for p in alive: + try: + p.kill() + p.wait() + except psutil.NoSuchProcess: + pass # already reaped + except Exception as exc: + if log: + log(exc) + + def cleanup_child_procs( procs: List[psutil.Process], timeout: float, @@ -206,17 +255,6 @@ def cleanup_child_procs( log(exc) -def any_alive_child_procs(procs: List[psutil.Process]) -> bool: - for p in procs: - try: - # orphaned zombie process should be reaped by pid 1 - if p.is_running() and p.status() != psutil.STATUS_ZOMBIE: - return True - except psutil.NoSuchProcess: - pass - return False - - DEFAULT_CLOSE_FDS = platform.system() != "Windows" diff --git a/testplan/testing/base.py b/testplan/testing/base.py index bd8e792d5..3c2e349b5 100644 --- a/testplan/testing/base.py +++ b/testplan/testing/base.py @@ -464,9 +464,6 @@ def _start_resource(self) -> None: self._record_driver_connection(case_report) case_report.pass_if_empty() - if self.resources.start_warnings: - for msg in self.resources.start_warnings.values(): - case_report.logger.warning(msg) if self.resources.start_exceptions: for msg in self.resources.start_exceptions.values(): case_report.logger.error(msg) @@ -490,9 +487,6 @@ def _stop_resource(self, is_reversed=True) -> None: ) case_report.pass_if_empty() - if self.resources.stop_warnings: - for msg in self.resources.stop_warnings.values(): -
case_report.logger.warning(msg) if self.resources.stop_exceptions: for msg in self.resources.stop_exceptions.values(): case_report.logger.error(msg) diff --git a/testplan/testing/environment/base.py b/testplan/testing/environment/base.py index b294cb093..dddd0f937 100644 --- a/testplan/testing/environment/base.py +++ b/testplan/testing/environment/base.py @@ -206,32 +206,17 @@ def stop(self, is_reversed=False): self._pocketwatches[driver.uid()].record_start() driver.stop() except Exception as e: - do_suppress = isinstance( - e, tuple(driver.SUPPRESSED_STOP_EXC) + self._record_resource_exception( + message="While stopping driver {resource}" + ":\n{traceback_exc}\n{fetch_msg}", + resource=driver, + msg_store=self.stop_exceptions, ) - if do_suppress: - self._record_resource_warning( - message="While stopping driver {resource}" - " (mitigated by forcefully stopping driver)" - ":\n{traceback_exc}\n{fetch_msg}", - resource=driver, - msg_store=self.stop_warnings, - ) - else: - self._record_resource_exception( - message="While stopping driver {resource}" - ":\n{traceback_exc}\n{fetch_msg}", - resource=driver, - msg_store=self.stop_exceptions, - ) # driver status should be STOPPED even it failed to stop - driver.force_stopped() + driver.force_stop() driver.logger.info("%s force stopped", driver) self._rt_dependency.mark_processing(driver) - if do_suppress: - self._rt_dependency.mark_processed(driver) - else: - self._rt_dependency.mark_failed_to_process(driver) + self._rt_dependency.mark_failed_to_process(driver) else: self._rt_dependency.mark_processing(driver) @@ -239,44 +224,20 @@ def stop(self, is_reversed=False): for driver in self._rt_dependency.drivers_processing(): watch: DriverPocketwatch = self._pocketwatches[driver.uid()] try: - if time.time() >= watch.start_time + watch.total_wait: - # we got a timed-out here - raise TimeoutException( - f"Timeout when stopping {driver}. 
" - f"{TimeoutExceptionInfo(watch.start_time).msg()}" - ) - res = None - if watch.should_check(): - res = driver.stopped_check() - if res: - driver._after_stopped() + if driver.stopped_check_with_watch(watch): + driver._mark_stopped() driver.logger.info("%s stopped", driver) self._rt_dependency.mark_processed(driver) - except Exception as e: - do_suppress = isinstance( - e, tuple(driver.SUPPRESSED_STOP_EXC) + except Exception: + self._record_resource_exception( + message="While waiting for driver {resource} to stop:\n" + "{traceback_exc}\n{fetch_msg}", + resource=driver, + msg_store=self.stop_exceptions, ) - if do_suppress: - self._record_resource_warning( - message="While waiting for driver {resource} to stop" - " (mitigated by forcefully stopping driver)" - ":\n{traceback_exc}\n{fetch_msg}", - resource=driver, - msg_store=self.stop_warnings, - ) - else: - self._record_resource_exception( - message="While waiting for driver {resource} to stop" - ":\n{traceback_exc}\n{fetch_msg}", - resource=driver, - msg_store=self.stop_exceptions, - ) # driver status should be STOPPED even it failed to stop - driver.force_stopped() + driver.force_stop() driver.logger.info("%s force stopped", driver) - if do_suppress: - self._rt_dependency.mark_processed(driver) - else: - self._rt_dependency.mark_failed_to_process(driver) + self._rt_dependency.mark_failed_to_process(driver) time.sleep(MINIMUM_CHECK_INTERVAL) diff --git a/testplan/testing/multitest/driver/app.py b/testplan/testing/multitest/driver/app.py index 2464c1b88..1466a07a4 100644 --- a/testplan/testing/multitest/driver/app.py +++ b/testplan/testing/multitest/driver/app.py @@ -7,6 +7,7 @@ import signal import socket import subprocess +import time import uuid import warnings from typing import Dict, List, Optional, Union @@ -31,13 +32,11 @@ from testplan.common.utils.match import LogMatcher from testplan.common.utils.path import StdFiles, archive, makedirs from testplan.common.utils.process import ( - any_alive_child_procs, - 
cleanup_child_procs, kill_process, subprocess_popen, - wait_process_clean, + kill_proc_and_child_procs, ) -from testplan.common.utils.timing import TimeoutException +from testplan.common.utils.timing import TimeoutException, TimeoutExceptionInfo from testplan.testing.multitest.driver.base import Driver, DriverConfig from testplan.testing.multitest.driver.connection import ( @@ -49,9 +48,15 @@ class OrphanedProcessException(Exception): - def __init__(self, driver, psutil_procs, *args, **kwargs): - msg = f"Orphaned processes detected after stopping {driver}: {psutil_procs}" - super().__init__(msg, *args, **kwargs) + """ + Exception raised when there are orphaned processes after stopping the + driver. + """ + + def __init__(self, driver, procs): + self.driver = driver + self.procs = procs + super().__init__(f"Orphaned processes detected: {procs} for {driver}") class AppConfig(DriverConfig): @@ -129,7 +134,6 @@ class App(Driver): SubprocessFileConnectionExtractor(), SubprocessPortConnectionExtractor(), ] - SUPPRESSED_STOP_EXC = [TimeoutException, OrphanedProcessException] def __init__( self, @@ -169,7 +173,7 @@ def __init__( self._resolved_bin = None self._env = None - self._child_procs = [] # for orphaned procs elimination + self._alive_child_procs = [] # for orphaned procs elimination @emphasized @property @@ -192,6 +196,13 @@ def retcode(self) -> Optional[int]: self._retcode = self.proc.poll() return self._retcode + @property + def alive_child_procs(self) -> List[psutil.Process]: + _, self._alive_child_procs = psutil.wait_procs( + self._alive_child_procs, timeout=0 + ) + return self._alive_child_procs + @emphasized @property def cmd(self) -> str: @@ -399,9 +410,12 @@ def stop_timeout(self) -> float: def stopping(self) -> None: """Stops the application binary process.""" if self.proc is not None and self.retcode is None: - self._child_procs = psutil.Process(self.pid).children( + # take a snapshot of all child procs + # so that we can check if they all terminate later 
+ self._alive_child_procs = psutil.Process(self.pid).children( recursive=True ) + if self.cfg.stop_signal is None: self.proc.terminate() else: @@ -411,41 +425,68 @@ def stopping(self) -> None: def stopped_check(self) -> ActionResult: if self.proc is None: return True - return self.retcode is not None - def force_stopped(self): - if self.proc is not None and self.retcode is None: - self.logger.info( - "%s still alive after %d seconds, force killing", + if self.retcode is None: + return False + + if self.alive_child_procs: + raise OrphanedProcessException(self, self._alive_child_procs) + + return True + + def stopped_check_with_watch(self, watch) -> ActionResult: + def log_fn(exc): + self.logger.warning("While killing driver %s: %s", self, exc) + + if time.time() >= watch.start_time + watch.total_wait: + # not raising a timeout here + self.logger.warning( + "%s still alive after %d seconds, killing", self, self.stop_timeout, ) + kill_proc_and_child_procs( + self.proc, self.alive_child_procs, log_fn + ) + + if watch.should_check(): try: - self.proc.kill() - self.proc.wait() - wait_process_clean(self.proc.pid, timeout=self.stop_timeout) - except (OSError, RuntimeError, TimeoutException) as exc: - self.logger.info( - "While killing process %s: %s", self.proc, exc + return self.stopped_check() + except OrphanedProcessException as exc: + self.logger.warning(exc) + kill_proc_and_child_procs( + self.proc, self.alive_child_procs, log_fn ) + return True - def _log_exc(exc): - self.logger.info( - "While killing child process of %s: %s", self.proc, exc - ) + return False - cleanup_child_procs(self._child_procs, self.stop_timeout, _log_exc) + def _wait_stopped(self, timeout: Optional[float] = None) -> None: + # for App driver and its inheritance, always force stop - self.proc = None - if self.std: - self.std.close() + def log_fn(exc): + self.logger.warning("While killing driver %s: %s", self, exc) - # reset env, binary etc. 
as they need re-eval in case of restart - self._env = None - self._binary = None - self._log_matcher = None + try: + super()._wait_stopped(timeout) + except TimeoutException: + self.logger.warning( + "%s still alive after %d seconds, killing", + self, + self.stop_timeout, + ) + kill_proc_and_child_procs( + self.proc, self.alive_child_procs, log_fn + ) + self._mark_stopped() + except OrphanedProcessException as exc: + self.logger.warning(exc) + kill_proc_and_child_procs( + self.proc, self.alive_child_procs, log_fn + ) + self._mark_stopped() - super().force_stopped() + # other exceptions get propagated def post_stop(self): rc = self.retcode @@ -468,14 +509,6 @@ def post_stop(self): ) raise RuntimeError(err_msg) - if any_alive_child_procs(self._child_procs): - self.logger.warning( - "Orphaned processes detected after stopping %s: %s", - self, - self._child_procs, - ) - raise OrphanedProcessException(self, self._child_procs) - def make_runpath_dirs(self) -> None: """ Create mandatory directories and install files from given templates @@ -503,22 +536,9 @@ def restart(self, clean: bool = True) -> None: all persistence is deleted, else a normal restart. 
""" - try: - self.stop() - if self.async_start: - self.wait(self.status.STOPPED) - except Exception as e: - if isinstance(e, tuple(self.SUPPRESSED_STOP_EXC)): - self.logger.warning( - "When stopping %s within restart call: %s", self, e - ) - self.force_stopped() - else: - self.logger.error( - "When stopping %s within restart call: %s", self, e - ) - self.force_stopped() - raise + self.stop() + if self.async_start: + self.wait(self.status.STOPPED) if clean: self._move_app_path() diff --git a/testplan/testing/multitest/driver/base.py b/testplan/testing/multitest/driver/base.py index 7052d6539..ec681f131 100644 --- a/testplan/testing/multitest/driver/base.py +++ b/testplan/testing/multitest/driver/base.py @@ -212,6 +212,19 @@ def stopped_check(self) -> ActionResult: """ return True + def stopped_check_with_watch(self, watch) -> ActionResult: + + if time.time() >= watch.start_time + watch.total_wait: + raise TimeoutException( + f"Timeout when stopping {self}. " + f"{TimeoutExceptionInfo(watch.start_time).msg()}" + ) + + if watch.should_check(): + return self.stopped_check() + + return False + def starting(self) -> None: """Triggers driver start.""" self._setup_file_logger() diff --git a/tests/functional/testplan/testing/multitest/test_bad_app.py b/tests/functional/testplan/testing/multitest/test_bad_app.py index 9b25c7676..b303e0f13 100644 --- a/tests/functional/testplan/testing/multitest/test_bad_app.py +++ b/tests/functional/testplan/testing/multitest/test_bad_app.py @@ -55,34 +55,30 @@ def make_app(app_args, driver_args, name="app"): @pytest.mark.parametrize( - "app_args, driver_args, suite_cls, warning_pattern", + "app_args, driver_args, suite_cls", ( # normal operation - ([], {}, GoodSuite, None), + ([], {}, GoodSuite), # illegal operation would cause plan crash - ([], {}, BadSuite, None), + ([], {}, BadSuite), # terms all child, parent exit normally though sigterm trapped - (["--mask-sigterm", "parent"], {}, GoodSuite, None), - # parent term timeout, exc 
suppressed & parent killed + (["--mask-sigterm", "parent"], {}, GoodSuite), + # parent term timeout, parent killed ( ["--mask-sigterm", "parent", "--sleep-time", "5"], {"sigint_timeout": 1}, GoodSuite, - r"WARNING.*Timeout when stopping App\[app\].*", ), # parent killed with child orphaned, exc suppressed ( ["--mask-sigterm", "parent", "--sleep-time", "5"], {"stop_timeout": 1, "stop_signal": signal.SIGKILL}, GoodSuite, - r"WARNING.*Orphaned processes detected after stopping App\[app\].*", ), ), ids=count(0), ) -def test_basic_loose( - app_args, driver_args, suite_cls, warning_pattern, mocker -): +def test_basic_loose(app_args, driver_args, suite_cls, mocker): mock_warn = mocker.patch("warnings.warn") mockplan = Mockplan( name="bad_app_mock_test", @@ -102,59 +98,12 @@ def test_basic_loose( r"sigint_timeout.*deprecated", mock_warn.call_args[0][0] ) - # force_stopped triggered, direct child terminated + # force_stop triggered, direct child terminated curr_proc = psutil.Process() child_procs = curr_proc.children(recursive=True) assert len(child_procs) == 0 assert report.status != report.status.ERROR - if warning_pattern: - stopping_case = report.entries[0].entries[2].entries[0] - assert re.match( - warning_pattern, stopping_case.logs[0]["message"], flags=re.DOTALL - ) - - -@pytest.mark.parametrize( - "app_args, driver_args, error_pattern", - ( - ( - ["--mask-sigterm", "parent", "--sleep-time", "5"], - {"stop_timeout": 1}, - r"ERROR.*Timeout when stopping App\[app\].*", - ), - ( - ["--mask-sigterm", "parent", "--sleep-time", "5"], - {"stop_timeout": 1, "stop_signal": signal.SIGKILL}, - r"ERROR.*Orphaned processes detected after stopping App\[app\].*", - ), - ), - ids=count(0), -) -def test_basic_tight(app_args, driver_args, error_pattern): - App.SUPPRESSED_STOP_EXC = [] - mockplan = Mockplan( - name="bad_app_mock_test", - ) - mockplan.add( - mt.MultiTest( - "dummy_mt", - GoodSuite(), - environment=[make_app(app_args, driver_args)], - ) - ) - report = 
mockplan.run().report - - # force_stopped triggered, direct child terminated - curr_proc = psutil.Process() - child_procs = curr_proc.children(recursive=True) - assert len(child_procs) == 0 - - assert report.status == report.status.ERROR - stopping_case = report.entries[0].entries[2].entries[0] - assert re.match( - error_pattern, stopping_case.logs[0]["message"], flags=re.DOTALL - ) @pytest.mark.parametrize( @@ -170,7 +119,7 @@ def test_basic_tight(app_args, driver_args, error_pattern): ["--mask-sigterm", "all"], {"stop_timeout": 1}, {"app2": "app"}, - ["app3", "app2"], + ["app3", "app", "app2"], ), ), ids=count(0), diff --git a/tests/unit/testplan/testing/multitest/driver/myapp/test_app.py b/tests/unit/testplan/testing/multitest/driver/myapp/test_app.py index c926592af..4ea6a7af3 100644 --- a/tests/unit/testplan/testing/multitest/driver/myapp/test_app.py +++ b/tests/unit/testplan/testing/multitest/driver/myapp/test_app.py @@ -371,29 +371,17 @@ def run_app(cwd, runpath): @skip_on_windows(reason='No need to dive into Windows "signals".') @pytest.mark.parametrize( - "app_args, force_stop, num_leftover", + "app_args, return_code", ( - ([], False, 0), - (["--mask-sigterm", "parent"], False, 0), - (["--mask-sigterm", "parent"], True, 0), - ( - ["--mask-sigterm", "parent", "--sleep-time", "5"], - False, - 1, - ), - ( - ["--mask-sigterm", "parent", "--sleep-time", "5"], - True, - 0, - ), - (["--mask-sigterm", "child"], False, 2), - (["--mask-sigterm", "child"], True, 0), - (["--mask-sigterm", "all"], False, 3), - (["--mask-sigterm", "all"], True, 0), + ([], 0), + (["--mask-sigterm", "parent"], 0), + (["--mask-sigterm", "parent", "--sleep-time", "5"], -9), + (["--mask-sigterm", "child"], 1), + (["--mask-sigterm", "all"], -9), ), ids=count(0), ) -def test_multiproc_app_stop(runpath, app_args, force_stop, num_leftover): +def test_multiproc_app_stop(runpath, app_args, return_code): """Test App driver stopping behaviour when used with a binary that create processes.""" app = 
App( name="dummy_multi_proc", @@ -407,28 +395,15 @@ def test_multiproc_app_stop(runpath, app_args, force_stop, num_leftover): runpath=runpath, stop_timeout=1, async_start=False, - expected_retcode=0, + expected_retcode=return_code, ) # TODO: make start, stop behave consistent regardlessly async_start app.start() - try: - app.stop() - except Exception as e: - if force_stop: - app.force_stopped() - # XXX: we don't wait on orphaned child procs, give OS some time - time.sleep(0.01) - else: - assert "Timeout when stopping App" in str( - e - ) or "but actual return code" in str(e) + app.stop() procs = reduce( lambda x, y: psutil.pid_exists(y) and x + [psutil.Process(y)] or x, map(lambda x: int(app.extracts[x]), ["pid", "pid1", "pid2"]), [], ) - assert len(procs) == num_leftover - for p in procs: - p.kill() - p.wait() + assert len(procs) == 0