Skip to content

Commit

Permalink
Fix App driver stop take 3 (#1146)
Browse files Browse the repository at this point in the history
Co-authored-by: Pyifan <[email protected]>
  • Loading branch information
Pyifan and Pyifan authored Nov 14, 2024
1 parent be96313 commit 1437cee
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 284 deletions.
1 change: 1 addition & 0 deletions doc/newsfragments/3041_changed.app_stop_refactor.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make sure when stop() is called on App type driver, we clean up all orphaned processes.
79 changes: 20 additions & 59 deletions testplan/common/entity/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ def __init__(self, parent: Optional["Entity"] = None):
self.__dict__["_resources"] = OrderedDict()
self.__dict__["start_exceptions"] = OrderedDict()
self.__dict__["stop_exceptions"] = OrderedDict()
# warnings for info display only, no interference needed (really?)
self.__dict__["start_warnings"] = OrderedDict()
self.__dict__["stop_warnings"] = OrderedDict()

def add(self, item: "Resource", uid: Optional[str] = None) -> str:
"""
Expand Down Expand Up @@ -177,17 +174,6 @@ def all_status(self, target) -> bool:
for resource in self._resources
)

def _record_resource_warning(self, message, resource, msg_store):
fetch_msg = "\n".join(resource.fetch_error_log())

msg = message.format(
resource=resource,
traceback_exc=traceback.format_exc(),
fetch_msg=fetch_msg,
)
resource.logger.warning(msg)
msg_store[resource] = "WARNING: " + msg

def _record_resource_exception(self, message, resource, msg_store):
fetch_msg = "\n".join(resource.fetch_error_log())

Expand Down Expand Up @@ -219,7 +205,6 @@ def start(self):
msg_store=self.start_exceptions,
)

# XXX: in the case of failover, put exc in warnings instead?
failover = resource.failover()
if failover:
self._resources[resource.uid()] = failover
Expand Down Expand Up @@ -296,34 +281,20 @@ def stop(self, is_reversed=False):
if is_reversed is True:
resources = resources[::-1]

# XXX: with the legacy design, we cannot really distinguish exceptions
# XXX: raised by ``stopping`` from those raised by ``stopped_check``,
# XXX: we have this ambiguity regarding ``stop``.

# Stop all resources
resources_to_wait_for: List[Resource] = []
for resource in resources:
try:
resource.stop()
except Exception as e:
if isinstance(e, tuple(resource.SUPPRESSED_STOP_EXC)):
self._record_resource_warning(
message="While stopping resource {resource}"
" (mitigated by forcefully stopping resource)"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While stopping resource {resource}"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)

except Exception:
self._record_resource_exception(
message="While stopping resource {resource}"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
resource.force_stop()
else:
if (
resource.async_start
Expand All @@ -336,24 +307,15 @@ def stop(self, is_reversed=False):
for resource in resources_to_wait_for:
try:
resource.wait(resource.STATUS.STOPPED)
except Exception as e:
if isinstance(e, tuple(resource.SUPPRESSED_STOP_EXC)):
self._record_resource_warning(
message="While waiting for resource {resource} to stop"
" (mitigated by forcefully stopping resource)"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While waiting for resource {resource} to stop"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
except Exception:
self._record_resource_exception(
message="While waiting for resource {resource} to stop"
":\n{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)
# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
resource.force_stop()
resource.logger.info("%s stopped", resource)

def stop_in_pool(self, pool, is_reversed=False):
Expand Down Expand Up @@ -393,7 +355,7 @@ def stop_in_pool(self, pool, is_reversed=False):
resource.logger.info("%s stopped", resource)
else:
# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
resource.force_stop()

def _log_exception(self, resource, func, exception_record):
"""
Expand Down Expand Up @@ -1385,7 +1347,6 @@ class Resource(Entity):

CONFIG = ResourceConfig
STATUS = ResourceStatus
SUPPRESSED_STOP_EXC = []

def __init__(self, **options):
super(Resource, self).__init__(**options)
Expand Down Expand Up @@ -1562,9 +1523,9 @@ def _wait_stopped(self, timeout: Optional[float] = None):
:param timeout: timeout in seconds
"""
self._after_stopped()
self._mark_stopped()

def _after_stopped(self):
def _mark_stopped(self):
"""
Common logic after a successful Resource stop.
"""
Expand Down Expand Up @@ -1617,7 +1578,7 @@ def restart(self):
if self.async_start:
self.wait(self.STATUS.STARTED)

def force_stopped(self):
def force_stop(self):
"""
Change the status to STOPPED (e.g. exception raised).
"""
Expand Down
60 changes: 49 additions & 11 deletions testplan/common/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,55 @@ def kill_process_psutil(
return alive


def kill_proc_and_child_procs(
proc: subprocess.Popen,
child_procs: List[psutil.Process],
log: Callable[[BaseException], None] = None,
timeout: int = 5,
) -> None:
"""
Kill a process and its child processes.
This function attempts to kill a given process and its child processes.
It first kills the main process, waits for it to terminate, and then
attempts to kill any remaining child processes.
:param proc: The main process to kill.
:type proc: subprocess.Popen
:param child_procs: A list of child processes to kill.
:type child_procs: List[psutil.Process]
:param log: A callable to log exceptions, defaults to None.
:type log: Callable[[BaseException], None], optional
:param timeout: The timeout in seconds to wait for processes to terminate, defaults to 5.
:type timeout: int, optional
:return: The return code of the main process if it was terminated, otherwise None.
:rtype: Union[int, None]
"""

if proc is not None and proc.poll() is None:
try:
proc.kill()
proc.wait()
wait_process_clean(proc.pid, timeout=timeout)
except Exception as exc:
if log:
log(exc)

if child_procs:
# we did not send sig term to child processes
# thus no point to wait for them to terminate
_, alive = psutil.wait_procs(child_procs, timeout=0)
for p in alive:
try:
p.kill()
p.wait()
except psutil.NoSuchProcess:
pass # already reaped
except Exception as exc:
if log:
log(exc)


def cleanup_child_procs(
procs: List[psutil.Process],
timeout: float,
Expand All @@ -206,17 +255,6 @@ def cleanup_child_procs(
log(exc)


def any_alive_child_procs(procs: List[psutil.Process]) -> bool:
for p in procs:
try:
# orphaned zombie process should be reaped by pid 1
if p.is_running() and p.status() != psutil.STATUS_ZOMBIE:
return True
except psutil.NoSuchProcess:
pass
return False


DEFAULT_CLOSE_FDS = platform.system() != "Windows"


Expand Down
6 changes: 0 additions & 6 deletions testplan/testing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,6 @@ def _start_resource(self) -> None:
self._record_driver_connection(case_report)
case_report.pass_if_empty()

if self.resources.start_warnings:
for msg in self.resources.start_warnings.values():
case_report.logger.warning(msg)
if self.resources.start_exceptions:
for msg in self.resources.start_exceptions.values():
case_report.logger.error(msg)
Expand All @@ -490,9 +487,6 @@ def _stop_resource(self, is_reversed=True) -> None:
)
case_report.pass_if_empty()

if self.resources.stop_warnings:
for msg in self.resources.stop_warnings.values():
case_report.logger.warning(msg)
if self.resources.stop_exceptions:
for msg in self.resources.stop_exceptions.values():
case_report.logger.error(msg)
Expand Down
73 changes: 17 additions & 56 deletions testplan/testing/environment/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,77 +206,38 @@ def stop(self, is_reversed=False):
self._pocketwatches[driver.uid()].record_start()
driver.stop()
except Exception as e:
do_suppress = isinstance(
e, tuple(driver.SUPPRESSED_STOP_EXC)
self._record_resource_exception(
message="While stopping driver {resource}"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
)
if do_suppress:
self._record_resource_warning(
message="While stopping driver {resource}"
" (mitigated by forcefully stopping driver)"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While stopping driver {resource}"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
)
# driver status should be STOPPED even it failed to stop
driver.force_stopped()
driver.force_stop()
driver.logger.info("%s force stopped", driver)
self._rt_dependency.mark_processing(driver)
if do_suppress:
self._rt_dependency.mark_processed(driver)
else:
self._rt_dependency.mark_failed_to_process(driver)
self._rt_dependency.mark_failed_to_process(driver)
else:
self._rt_dependency.mark_processing(driver)

# check current drivers
for driver in self._rt_dependency.drivers_processing():
watch: DriverPocketwatch = self._pocketwatches[driver.uid()]
try:
if time.time() >= watch.start_time + watch.total_wait:
# we got a timed-out here
raise TimeoutException(
f"Timeout when stopping {driver}. "
f"{TimeoutExceptionInfo(watch.start_time).msg()}"
)
res = None
if watch.should_check():
res = driver.stopped_check()
if res:
driver._after_stopped()
if driver.stopped_check_with_watch(watch):
driver._mark_stopped()
driver.logger.info("%s stopped", driver)
self._rt_dependency.mark_processed(driver)
except Exception as e:
do_suppress = isinstance(
e, tuple(driver.SUPPRESSED_STOP_EXC)
except Exception:
self._record_resource_exception(
message="While waiting for driver {resource} to stop:\n"
"{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
)
if do_suppress:
self._record_resource_warning(
message="While waiting for driver {resource} to stop"
" (mitigated by forcefully stopping driver)"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_warnings,
)
else:
self._record_resource_exception(
message="While waiting for driver {resource} to stop"
":\n{traceback_exc}\n{fetch_msg}",
resource=driver,
msg_store=self.stop_exceptions,
)
# driver status should be STOPPED even it failed to stop
driver.force_stopped()
driver.force_stop()
driver.logger.info("%s force stopped", driver)
if do_suppress:
self._rt_dependency.mark_processed(driver)
else:
self._rt_dependency.mark_failed_to_process(driver)
self._rt_dependency.mark_failed_to_process(driver)

time.sleep(MINIMUM_CHECK_INTERVAL)
Loading

0 comments on commit 1437cee

Please sign in to comment.