Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/suppress certain exceptions during app stop #1145

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
If :py:class:`App <~testplan.testing.multitest.driver.app.App>` driver times out during shutdown or leaves orphaned processes after shutdown, Testplan will now emit a warning and perform a forced cleanup instead of failing the tests.
74 changes: 56 additions & 18 deletions testplan/common/entity/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def __init__(self, parent: Optional["Entity"] = None):
self.__dict__["_resources"] = OrderedDict()
self.__dict__["start_exceptions"] = OrderedDict()
self.__dict__["stop_exceptions"] = OrderedDict()
# warnings for info display only, no interference needed (really?)
self.__dict__["start_warnings"] = OrderedDict()
self.__dict__["stop_warnings"] = OrderedDict()

def add(self, item: "Resource", uid: Optional[str] = None) -> str:
"""
Expand Down Expand Up @@ -174,6 +177,17 @@ def all_status(self, target) -> bool:
for resource in self._resources
)

def _record_resource_warning(self, message, resource, msg_store):
fetch_msg = "\n".join(resource.fetch_error_log())

msg = message.format(
resource=resource,
traceback_exc=traceback.format_exc(),
fetch_msg=fetch_msg,
)
resource.logger.warning(msg)
msg_store[resource] = "WARNING: " + msg

def _record_resource_exception(self, message, resource, msg_store):
fetch_msg = "\n".join(resource.fetch_error_log())

Expand All @@ -183,14 +197,14 @@ def _record_resource_exception(self, message, resource, msg_store):
fetch_msg=fetch_msg,
)
resource.logger.error(msg)
msg_store[resource] = msg
msg_store[resource] = "ERROR: " + msg

def start(self):
"""
Starts all resources sequentially and log errors.
"""
# Trigger start all resources
resources_to_wait_for = []
resources_to_wait_for: List[Resource] = []
for resource in self._resources.values():
if not resource.auto_start:
continue
Expand All @@ -205,6 +219,7 @@ def start(self):
msg_store=self.start_exceptions,
)

# XXX: in the case of failover, put exc in warnings instead?
failover = resource.failover()
if failover:
self._resources[resource.uid()] = failover
Expand Down Expand Up @@ -277,22 +292,35 @@ def stop(self, is_reversed=False):
:param is_reversed: flag whether to stop resources in reverse order
:type is_reversed: ``bool``
"""
resources = list(self._resources.values())
resources: List[Resource] = list(self._resources.values())
if is_reversed is True:
resources = resources[::-1]

# XXX: with the legacy design, we cannot really distinguish exceptions
# XXX: raised by ``stopping`` from those raised by ``stopped_check``,
# XXX: we have this ambiguity regarding ``stop``.

# Stop all resources
resources_to_wait_for = []
resources_to_wait_for: List[Resource] = []
for resource in resources:
try:
resource.stop()
except Exception:
self._record_resource_exception(
message="While stopping resource {resource}:"
"\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
except Exception as e:
if isinstance(e, tuple(resource.SUPPRESSED_STOP_EXC)):
self._record_resource_warning(
message="While stopping resource {resource}"
" (mitigated by forcefully stopping resource)"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While stopping resource {resource}"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)

# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
Expand All @@ -308,13 +336,22 @@ def stop(self, is_reversed=False):
for resource in resources_to_wait_for:
try:
resource.wait(resource.STATUS.STOPPED)
except Exception:
self._record_resource_exception(
message="While waiting for resource {resource} to stop:"
"\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
except Exception as e:
if isinstance(e, tuple(resource.SUPPRESSED_STOP_EXC)):
self._record_resource_warning(
message="While waiting for resource {resource} to stop"
" (mitigated by forcefully stopping resource)"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While waiting for resource {resource} to stop"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
resource.logger.info("%s stopped", resource)
Expand Down Expand Up @@ -1348,6 +1385,7 @@ class Resource(Entity):

CONFIG = ResourceConfig
STATUS = ResourceStatus
SUPPRESSED_STOP_EXC = []

def __init__(self, **options):
super(Resource, self).__init__(**options)
Expand Down
11 changes: 11 additions & 0 deletions testplan/common/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ def cleanup_child_procs(
log(exc)


def any_alive_child_procs(procs: List[psutil.Process]) -> bool:
for p in procs:
try:
# orphaned zombie process should be reaped by pid 1
if p.is_running() and p.status() != psutil.STATUS_ZOMBIE:
return True
except psutil.NoSuchProcess:
pass
return False


DEFAULT_CLOSE_FDS = platform.system() != "Windows"


Expand Down
8 changes: 8 additions & 0 deletions testplan/testing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ def _start_resource(self) -> None:
)
self._record_driver_connection(case_report)
case_report.pass_if_empty()

if self.resources.start_warnings:
for msg in self.resources.start_warnings.values():
case_report.logger.warning(msg)
if self.resources.start_exceptions:
for msg in self.resources.start_exceptions.values():
case_report.logger.error(msg)
Expand All @@ -485,6 +489,10 @@ def _stop_resource(self, is_reversed=True) -> None:
ResourceTimings.RESOURCE_TEARDOWN, case_report
)
case_report.pass_if_empty()

if self.resources.stop_warnings:
for msg in self.resources.stop_warnings.values():
case_report.logger.warning(msg)
if self.resources.stop_exceptions:
for msg in self.resources.stop_exceptions.values():
case_report.logger.error(msg)
Expand Down
64 changes: 49 additions & 15 deletions testplan/testing/environment/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import copy
import time
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional

Expand Down Expand Up @@ -135,6 +134,8 @@ def start(self):
)
# exception occurred, skip rest of drivers
self._rt_dependency.purge_drivers_to_process()
self._rt_dependency.mark_processing(driver)
self._rt_dependency.mark_failed_to_process(driver)
break
else:
self._rt_dependency.mark_processing(driver)
Expand Down Expand Up @@ -204,15 +205,33 @@ def stop(self, is_reversed=False):
try:
self._pocketwatches[driver.uid()].record_start()
driver.stop()
except Exception:
self._record_resource_exception(
message="While stopping driver {resource}:\n"
"{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
except Exception as e:
do_suppress = isinstance(
e, tuple(driver.SUPPRESSED_STOP_EXC)
)
if do_suppress:
self._record_resource_warning(
message="While stopping driver {resource}"
" (mitigated by forcefully stopping driver)"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While stopping driver {resource}"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
)
# driver status should be STOPPED even it failed to stop
driver.force_stopped()
driver.logger.info("%s force stopped", driver)
self._rt_dependency.mark_processing(driver)
if do_suppress:
self._rt_dependency.mark_processed(driver)
else:
self._rt_dependency.mark_failed_to_process(driver)
else:
self._rt_dependency.mark_processing(driver)

Expand All @@ -233,16 +252,31 @@ def stop(self, is_reversed=False):
driver._after_stopped()
driver.logger.info("%s stopped", driver)
self._rt_dependency.mark_processed(driver)
except Exception:
self._record_resource_exception(
message="While waiting for driver {resource} to stop:\n"
"{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
except Exception as e:
do_suppress = isinstance(
e, tuple(driver.SUPPRESSED_STOP_EXC)
)
if do_suppress:
self._record_resource_warning(
message="While waiting for driver {resource} to stop"
" (mitigated by forcefully stopping driver)"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While waiting for driver {resource} to stop"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
)
# driver status should be STOPPED even it failed to stop
driver.force_stopped()
driver.logger.info("%s stopped", driver)
self._rt_dependency.mark_processed(driver)
driver.logger.info("%s force stopped", driver)
if do_suppress:
self._rt_dependency.mark_processed(driver)
else:
self._rt_dependency.mark_failed_to_process(driver)

time.sleep(MINIMUM_CHECK_INTERVAL)
39 changes: 34 additions & 5 deletions testplan/testing/multitest/driver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,29 @@
from testplan.common.utils.match import LogMatcher
from testplan.common.utils.path import StdFiles, archive, makedirs
from testplan.common.utils.process import (
any_alive_child_procs,
cleanup_child_procs,
kill_process,
subprocess_popen,
wait_process_clean,
)
from testplan.common.utils.timing import TimeoutException

from .base import Driver, DriverConfig
from .connection import (
from testplan.testing.multitest.driver.base import Driver, DriverConfig
from testplan.testing.multitest.driver.connection import (
SubprocessFileConnectionExtractor,
SubprocessPortConnectionExtractor,
)

IS_WIN = platform.system() == "Windows"


class OrphanedProcessException(Exception):
def __init__(self, driver, psutil_procs, *args, **kwargs):
msg = f"Orphaned processes detected after stopping {driver}: {psutil_procs}"
super().__init__(msg, *args, **kwargs)


class AppConfig(DriverConfig):
"""
Configuration object for
Expand Down Expand Up @@ -122,6 +129,7 @@ class App(Driver):
SubprocessFileConnectionExtractor(),
SubprocessPortConnectionExtractor(),
]
SUPPRESSED_STOP_EXC = [TimeoutException, OrphanedProcessException]

def __init__(
self,
Expand Down Expand Up @@ -460,6 +468,14 @@ def post_stop(self):
)
raise RuntimeError(err_msg)

if any_alive_child_procs(self._child_procs):
self.logger.warning(
"Orphaned processes detected after stopping %s: %s",
self,
self._child_procs,
)
raise OrphanedProcessException(self, self._child_procs)

def make_runpath_dirs(self) -> None:
"""
Create mandatory directories and install files from given templates
Expand Down Expand Up @@ -487,9 +503,22 @@ def restart(self, clean: bool = True) -> None:
all persistence is deleted, else a normal restart.

"""
self.stop()
if self.async_start:
self.wait(self.status.STOPPED)
try:
self.stop()
if self.async_start:
self.wait(self.status.STOPPED)
except Exception as e:
if isinstance(e, tuple(self.SUPPRESSED_STOP_EXC)):
self.logger.warning(
"When stopping %s within restart call: %s", self, e
)
self.force_stopped()
else:
self.logger.error(
"When stopping %s within restart call: %s", self, e
)
self.force_stopped()
raise

if clean:
self._move_app_path()
Expand Down
Loading