Skip to content

Commit

Permalink
suppress certain exceptions during app stop
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenyu-ms committed Nov 7, 2024
1 parent f90ae3b commit a9a1624
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 56 deletions.
74 changes: 56 additions & 18 deletions testplan/common/entity/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def __init__(self, parent: Optional["Entity"] = None):
self.__dict__["_resources"] = OrderedDict()
self.__dict__["start_exceptions"] = OrderedDict()
self.__dict__["stop_exceptions"] = OrderedDict()
# warnings for info display only, no interference needed (really?)
self.__dict__["start_warnings"] = OrderedDict()
self.__dict__["stop_warnings"] = OrderedDict()

def add(self, item: "Resource", uid: Optional[str] = None) -> str:
"""
Expand Down Expand Up @@ -174,6 +177,17 @@ def all_status(self, target) -> bool:
for resource in self._resources
)

def _record_resource_warning(self, message, resource, msg_store):
fetch_msg = "\n".join(resource.fetch_error_log())

msg = message.format(
resource=resource,
traceback_exc=traceback.format_exc(),
fetch_msg=fetch_msg,
)
resource.logger.warning(msg)
msg_store[resource] = "WARNING: " + msg

def _record_resource_exception(self, message, resource, msg_store):
fetch_msg = "\n".join(resource.fetch_error_log())

Expand All @@ -183,14 +197,14 @@ def _record_resource_exception(self, message, resource, msg_store):
fetch_msg=fetch_msg,
)
resource.logger.error(msg)
msg_store[resource] = msg
msg_store[resource] = "ERROR: " + msg

def start(self):
"""
Starts all resources sequentially and log errors.
"""
# Trigger start all resources
resources_to_wait_for = []
resources_to_wait_for: List[Resource] = []
for resource in self._resources.values():
if not resource.auto_start:
continue
Expand All @@ -205,6 +219,7 @@ def start(self):
msg_store=self.start_exceptions,
)

# XXX: in the case of failover, put exc in warnings instead?
failover = resource.failover()
if failover:
self._resources[resource.uid()] = failover
Expand Down Expand Up @@ -277,22 +292,35 @@ def stop(self, is_reversed=False):
:param is_reversed: flag whether to stop resources in reverse order
:type is_reversed: ``bool``
"""
resources = list(self._resources.values())
resources: List[Resource] = list(self._resources.values())
if is_reversed is True:
resources = resources[::-1]

# XXX: with the legacy design, we cannot really distinguish exceptions
# XXX: raised by ``stopping`` from those raised by ``stopped_check``,
# XXX: we have this ambiguity regarding ``stop``.

# Stop all resources
resources_to_wait_for = []
resources_to_wait_for: List[Resource] = []
for resource in resources:
try:
resource.stop()
except Exception:
self._record_resource_exception(
message="While stopping resource {resource}:"
"\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
except Exception as e:
if isinstance(e, tuple(resource.SUPPRESSED_STOP_EXC)):
self._record_resource_warning(
message="While stopping resource {resource}"
" (mitigated by forcefully stopping resource)"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While stopping resource {resource}"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)

# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
Expand All @@ -308,13 +336,22 @@ def stop(self, is_reversed=False):
for resource in resources_to_wait_for:
try:
resource.wait(resource.STATUS.STOPPED)
except Exception:
self._record_resource_exception(
message="While waiting for resource {resource} to stop:"
"\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
except Exception as e:
if isinstance(e, tuple(resource.SUPPRESSED_STOP_EXC)):
self._record_resource_warning(
message="While waiting for resource {resource} to stop"
" (mitigated by forcefully stopping resource)"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While waiting for resource {resource} to stop"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
resource.logger.info("%s stopped", resource)
Expand Down Expand Up @@ -1348,6 +1385,7 @@ class Resource(Entity):

CONFIG = ResourceConfig
STATUS = ResourceStatus
SUPPRESSED_STOP_EXC = []

def __init__(self, **options):
super(Resource, self).__init__(**options)
Expand Down
8 changes: 8 additions & 0 deletions testplan/testing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ def _start_resource(self) -> None:
)
self._record_driver_connection(case_report)
case_report.pass_if_empty()

if self.resources.start_warnings:
for msg in self.resources.start_warnings.values():
case_report.logger.warning(msg)
if self.resources.start_exceptions:
for msg in self.resources.start_exceptions.values():
case_report.logger.error(msg)
Expand All @@ -485,6 +489,10 @@ def _stop_resource(self, is_reversed=True) -> None:
ResourceTimings.RESOURCE_TEARDOWN, case_report
)
case_report.pass_if_empty()

if self.resources.stop_warnings:
for msg in self.resources.stop_warnings.values():
case_report.logger.warning(msg)
if self.resources.stop_exceptions:
for msg in self.resources.stop_exceptions.values():
case_report.logger.error(msg)
Expand Down
62 changes: 48 additions & 14 deletions testplan/testing/environment/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import copy
import time
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional

Expand Down Expand Up @@ -135,6 +134,8 @@ def start(self):
)
# exception occurred, skip rest of drivers
self._rt_dependency.purge_drivers_to_process()
self._rt_dependency.mark_processing(driver)
self._rt_dependency.mark_failed_to_process(driver)
break
else:
self._rt_dependency.mark_processing(driver)
Expand Down Expand Up @@ -205,14 +206,32 @@ def stop(self, is_reversed=False):
self._pocketwatches[driver.uid()].record_start()
driver.stop()
except Exception:
self._record_resource_exception(
message="While stopping driver {resource}:\n"
"{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
do_suppress = isinstance(
e, tuple(driver.SUPPRESSED_STOP_EXC)
)
if do_suppress:
self._record_resource_warning(
message="While stopping driver {resource}"
" (mitigated by forcefully stopping resource)"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While stopping driver {resource}"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
)
# driver status should be STOPPED even it failed to stop
driver.force_stopped()
driver.logger.info("%s force stopped", driver)
self._rt_dependency.mark_processing(driver)
if do_suppress:
self._rt_dependency.mark_processed(driver)
else:
self._rt_dependency.mark_failed_to_process(driver)
else:
self._rt_dependency.mark_processing(driver)

Expand All @@ -233,16 +252,31 @@ def stop(self, is_reversed=False):
driver._after_stopped()
driver.logger.info("%s stopped", driver)
self._rt_dependency.mark_processed(driver)
except Exception:
self._record_resource_exception(
message="While waiting for driver {resource} to stop:\n"
"{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
except Exception as e:
do_suppress = isinstance(
e, tuple(driver.SUPPRESSED_STOP_EXC)
)
if do_suppress:
self._record_resource_warning(
message="While waiting for driver {resource} to stop"
" (mitigated by forcefully stopping resource)"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While waiting for driver {resource} to stop"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
)
# driver status should be STOPPED even it failed to stop
driver.force_stopped()
driver.logger.info("%s stopped", driver)
self._rt_dependency.mark_processed(driver)
driver.logger.info("%s force stopped", driver)
if do_suppress:
self._rt_dependency.mark_processed(driver)
else:
self._rt_dependency.mark_failed_to_process(driver)

time.sleep(MINIMUM_CHECK_INTERVAL)
39 changes: 34 additions & 5 deletions testplan/testing/multitest/driver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,29 @@
from testplan.common.utils.match import LogMatcher
from testplan.common.utils.path import StdFiles, archive, makedirs
from testplan.common.utils.process import (
any_alive_child_procs,
cleanup_child_procs,
kill_process,
subprocess_popen,
wait_process_clean,
)
from testplan.common.utils.timing import TimeoutException

from .base import Driver, DriverConfig
from .connection import (
from testplan.testing.multitest.driver.base import Driver, DriverConfig
from testplan.testing.multitest.driver.connection import (
SubprocessFileConnectionExtractor,
SubprocessPortConnectionExtractor,
)

IS_WIN = platform.system() == "Windows"


class OrphanedProcessException(Exception):
def __init__(self, driver, psutil_procs, *args, **kwargs):
msg = f"Orphaned processes detected after stopping {driver}: {psutil_procs}"
super().__init__(msg, *args, **kwargs)


class AppConfig(DriverConfig):
"""
Configuration object for
Expand Down Expand Up @@ -122,6 +129,7 @@ class App(Driver):
SubprocessFileConnectionExtractor(),
SubprocessPortConnectionExtractor(),
]
SUPPRESSED_STOP_EXC = [TimeoutException, OrphanedProcessException]

def __init__(
self,
Expand Down Expand Up @@ -460,6 +468,14 @@ def post_stop(self):
)
raise RuntimeError(err_msg)

if any_alive_child_procs(self._child_procs):
self.logger.warning(
"Orphaned processes detected after stopping %s: %s",
self,
self._child_procs,
)
raise OrphanedProcessException(self, self._child_procs)

def make_runpath_dirs(self) -> None:
"""
Create mandatory directories and install files from given templates
Expand Down Expand Up @@ -487,9 +503,22 @@ def restart(self, clean: bool = True) -> None:
all persistence is deleted, else a normal restart.
"""
self.stop()
if self.async_start:
self.wait(self.status.STOPPED)
try:
self.stop()
if self.async_start:
self.wait(self.status.STOPPED)
except Exception as e:
if isinstance(e, tuple(self.SUPPRESSED_STOP_EXC)):
self.logger.warning(
"When stopping %s within restart call: %s", self, e
)
self.force_stopped()
else:
self.logger.error(
"When stopping %s within restart call: %s", self, e
)
self.force_stopped()
raise

if clean:
self._move_app_path()
Expand Down
Loading

0 comments on commit a9a1624

Please sign in to comment.