From 6ab796b6fe96d4b90c17afd878e365931ed8f972 Mon Sep 17 00:00:00 2001 From: yuxuan-ms Date: Mon, 5 Aug 2024 12:08:24 +0800 Subject: [PATCH] Add early stop in pool. --- testplan/common/entity/base.py | 41 ++++++++++++++++++++++++-- testplan/runners/pools/base.py | 50 ++++++++++++++++++++++++-------- testplan/runners/pools/remote.py | 36 +++++++++++++++++++++-- 3 files changed, 110 insertions(+), 17 deletions(-) diff --git a/testplan/common/entity/base.py b/testplan/common/entity/base.py index 7431defca..2fa3ae3c5 100644 --- a/testplan/common/entity/base.py +++ b/testplan/common/entity/base.py @@ -271,12 +271,39 @@ def start_in_pool(self, pool): resource.wait(resource.STATUS.STARTED) resource.logger.info("%s started", resource) - def stop(self, is_reversed=False): + def sync_stop_resource(self, resource: "Resource"): + """ + Stop a resource and log exceptions. + """ + if resource.status in ( + resource.STATUS.STOPPING, + resource.STATUS.STOPPED, + ): + return + + resource.logger.info("Stopping %s", resource) + try: + resource.stop() + except Exception: + self._record_resource_exception( + message="While stopping resource {resource}:\n" + "{traceback_exc}\n{fetch_msg}", + resource=resource, + msg_store=self.stop_exceptions, + ) + + # Resource status should be STOPPED even it failed to stop + resource.force_stopped() + else: + if resource.async_start: + resource.wait(resource.STATUS.STOPPED) + resource.logger.info("%s stopped", resource) + + def stop(self, is_reversed: bool = False): """ Stop all resources, optionally in reverse order, and log exceptions. :param is_reversed: flag whether to stop resources in reverse order - :type is_reversed: ``bool`` """ resources = list(self._resources.values()) if is_reversed is True: @@ -285,6 +312,11 @@ def stop(self, is_reversed=False): # Stop all resources resources_to_wait_for = [] for resource in resources: + if resource.status in ( + resource.STATUS.STOPPING, + resource.STATUS.STOPPED, + ): + continue try: resource.stop() except Exception: @@ -326,6 +358,11 @@ def stop_in_pool(self, pool, is_reversed=False): # Stop all resources resources_to_wait_for = [] for resource in resources: + if resource.status in ( + resource.STATUS.STOPPING, + resource.STATUS.STOPPED, + ): + continue pool.apply_async( self._log_exception( resource, resource.stop, self.stop_exceptions diff --git a/testplan/runners/pools/base.py b/testplan/runners/pools/base.py index 60733612b..4ad89ae7e 100644 --- a/testplan/runners/pools/base.py +++ b/testplan/runners/pools/base.py @@ -7,7 +7,16 @@ import threading import time import traceback -from typing import Dict, Generator, List, Optional, Tuple, Type, Union +from typing import ( + Dict, + Generator, + List, + Optional, + Tuple, + Type, + Union, + Callable, +) from schema import And, Or @@ -36,6 +45,9 @@ def __init__(self) -> None: self.q = queue.PriorityQueue() self.count = 0 + def size(self) -> int: + return self.q.qsize() + def put(self, priority: int, item: str) -> None: self.q.put((priority, self.count, item)) self.count += 1 @@ -339,6 +351,7 @@ class Pool(Executor): CONFIG = PoolConfig CONN_MANAGER = QueueServer + RESERVE_WORKER_NUM = 3 def __init__( self, @@ -370,7 +383,9 @@ def __init__( # Methods for handling different Message types. These are expected to # take the worker, request and response objects as the only required # positional args. - self._request_handlers = { + self._request_handlers: Dict[ + str, Callable[[Worker, Message, Message], None] + ] = { Message.ConfigRequest: self._handle_cfg_request, Message.TaskPullRequest: self._handle_taskpull_request, Message.TaskResults: self._handle_taskresults, @@ -385,6 +400,13 @@ def __init__( # NOTE: the critical section would be rather meaningless self._discard_pending_lock = threading.RLock() self._discard_pending = False + self._stopping_queue = [] + self.worker_status = { + "active": [], + "inactive": [], + "initializing": [], + "abort": [], + } def uid(self) -> str: """Pool name.""" @@ -535,6 +557,10 @@ def _handle_taskpull_request( try: priority, uid = self.unassigned.get() except queue.Empty: + self.logger.debug("No tasks to assign to %s", worker) + if self._early_stop_worker(worker.uid()): + worker.respond(response.make(Message.Stop)) + return break task = self._input[uid] @@ -702,6 +728,10 @@ def _handle_setupfailed( worker.respond(response.make(Message.Ack)) self._decommission_worker(worker, "Aborting {}, setup failed.") + def _early_stop_worker(self, worker_uid: Union[str, int]) -> bool: + # Thread pool doesn't support early stop + return False + def _decommission_worker(self, worker: Worker, message: str) -> None: """ Decommission a worker by move all assigned task back to pool @@ -724,12 +754,7 @@ def _workers_monitoring(self) -> None: 1) handler status 2) heartbeat if available """ - previous_status = { - "active": [], - "inactive": [], - "initializing": [], - "abort": [], - } + loop_interval = self.cfg.worker_heartbeat or 5 # seconds break_outer_loop = False @@ -750,8 +775,9 @@ def _workers_monitoring(self) -> None: and self.status != self.status.STOPPING and self.status != self.status.STOPPED ): - if self._handle_inactive(worker, reason): - status = "active" + if worker.uid() not in self._stopping_queue: + if self._handle_inactive(worker, reason): + status = "active" else: self.logger.user_info( "%s is aborting/stopping, exit monitor.", self @@ -764,12 +790,12 @@ def _workers_monitoring(self) -> None: if break_outer_loop: break - if hosts_status != previous_status: + if hosts_status != self.worker_status: self.logger.info( "Hosts status update at %s", datetime.datetime.now() ) self.logger.info(pprint.pformat(hosts_status)) - previous_status = hosts_status + self.worker_status = hosts_status if ( not hosts_status["active"] diff --git a/testplan/runners/pools/remote.py b/testplan/runners/pools/remote.py index f31cf5653..c94039f72 100644 --- a/testplan/runners/pools/remote.py +++ b/testplan/runners/pools/remote.py @@ -3,6 +3,8 @@ import os import signal import socket +import threading +import time from multiprocessing.pool import ThreadPool from typing import Callable, Dict, List, Optional, Tuple, Type, Union @@ -286,6 +288,8 @@ class RemotePool(Pool): CONFIG = RemotePoolConfig CONN_MANAGER = ZMQServer + QUEUE_WAIT_INTERVAL = 3 + MAX_THREAD_POOL_SIZE = 5 def __init__( self, @@ -321,7 +325,6 @@ def __init__( options.update(self.filter_locals(locals())) super(RemotePool, self).__init__(**options) self._options = options # pass to remote worker later - self._request_handlers[ Message.MetadataPull ] = self._worker_setup_metadata @@ -340,11 +343,17 @@ def resource_monitor_address(self) -> Optional[str]: return self.parent.resource_monitor_server.address @staticmethod - def _worker_setup_metadata(worker, request, response) -> None: + def _worker_setup_metadata( + worker: RemoteWorker, _: Message, response: Message + ) -> None: worker.respond( response.make(Message.Metadata, data=worker.setup_metadata) ) + def post_start(self) -> None: + super().post_start() + self._stop_worker_thread.start() + def _add_workers(self) -> None: """TODO.""" for instance in self._instances.values(): @@ -378,13 +387,34 @@ def _start_thread_pool(self) -> None: size = len(self._instances) try: if size > 2: - self.pool = ThreadPool(5 if size > 5 else size) + self.pool = ThreadPool(min(self.MAX_THREAD_POOL_SIZE, size)) except Exception as exc: if isinstance(exc, AttributeError): self.logger.warning( "Please upgrade to the suggested python interpreter." ) + def _early_stop_worker(self, worker_uid: Union[str, int]) -> bool: + with self._pool_lock: + if self.pool and (worker_uid not in self._stopping_queue): + alive_worker_ids = set( + [worker.uid() for worker in self.worker_status["active"]] + ) + if self.RESERVE_WORKER_NUM + self.unassigned.size() < len( + alive_worker_ids - set(self._stopping_queue) + ): + self.logger.user_info( + "Early stop worker %s", self._workers[worker_uid] + ) + print(f"Early stop worker {self._workers[worker_uid]}") + self._stopping_queue.append(worker_uid) + self.pool.apply_async( + self._workers.sync_stop_resource, + (self._workers[worker_uid],), + ) + return True + return False + def starting(self) -> None: self._start_thread_pool() super(RemotePool, self).starting()