diff --git a/testplan/common/entity/base.py b/testplan/common/entity/base.py index 9a58a6a00..6558134f6 100644 --- a/testplan/common/entity/base.py +++ b/testplan/common/entity/base.py @@ -270,12 +270,39 @@ def start_in_pool(self, pool): resource.wait(resource.STATUS.STARTED) resource.logger.info("%s started", resource) - def stop(self, is_reversed=False): + def sync_stop_resource(self, resource: "Resource"): + """ + Stop a resource and log exceptions. + """ + if resource.status in ( + resource.STATUS.STOPPING, + resource.STATUS.STOPPED, + ): + return + + resource.logger.info("Stopping %s", resource) + try: + resource.stop() + except Exception: + self._record_resource_exception( + message="While stopping resource {resource}:\n" + "{traceback_exc}\n{fetch_msg}", + resource=resource, + msg_store=self.stop_exceptions, + ) + + # Resource status should be STOPPED even it failed to stop + resource.force_stopped() + else: + if resource.async_start: + resource.wait(resource.STATUS.STOPPED) + resource.logger.info("%s stopped", resource) + + def stop(self, is_reversed: bool = False): """ Stop all resources, optionally in reverse order, and log exceptions. :param is_reversed: flag whether to stop resources in reverse order - :type is_reversed: ``bool`` """ resources = list(self._resources.values()) if is_reversed is True: @@ -284,6 +311,11 @@ def stop(self, is_reversed=False): # Stop all resources resources_to_wait_for = [] for resource in resources: + if resource.status in ( + resource.STATUS.STOPPING, + resource.STATUS.STOPPED, + ): + continue try: resource.stop() except Exception: @@ -335,6 +367,11 @@ def stop_in_pool(self, pool, is_reversed=False): # Stop all resources resources_to_wait_for = [] for resource in resources: + if resource.status in ( + resource.STATUS.STOPPING, + resource.STATUS.STOPPED, + ): + continue pool.apply_async( self._log_exception( resource, resource.stop, self.stop_exceptions diff --git a/testplan/runners/pools/base.py b/testplan/runners/pools/base.py index d5e8b0c79..8b7cd8b04 100644 --- a/testplan/runners/pools/base.py +++ b/testplan/runners/pools/base.py @@ -7,7 +7,16 @@ import threading import time import traceback -from typing import Dict, Generator, List, Optional, Tuple, Type, Union +from typing import ( + Dict, + Generator, + List, + Optional, + Tuple, + Type, + Union, + Callable, +) from schema import And, Or, Use @@ -36,6 +45,9 @@ def __init__(self) -> None: self.q = queue.PriorityQueue() self.count = 0 + def size(self) -> int: + return self.q.qsize() + def put(self, priority: int, item: str) -> None: self.q.put((priority, self.count, item)) self.count += 1 @@ -336,6 +348,7 @@ class Pool(Executor): CONFIG = PoolConfig CONN_MANAGER = QueueServer + RESERVE_WORKER_NUM = 3 def __init__( self, @@ -367,7 +380,9 @@ def __init__( # Methods for handling different Message types. These are expected to # take the worker, request and response objects as the only required # positional args. - self._request_handlers = { + self._request_handlers: Dict[ + str, Callable[[Worker, Message, Message], None] + ] = { Message.ConfigRequest: self._handle_cfg_request, Message.TaskPullRequest: self._handle_taskpull_request, Message.TaskResults: self._handle_taskresults, @@ -382,6 +397,13 @@ def __init__( # NOTE: the critical section would be rather meaningless self._discard_pending_lock = threading.RLock() self._discard_pending = False + self._stopping_queue = [] + self.worker_status = { + "active": [], + "inactive": [], + "initializing": [], + "abort": [], + } def uid(self) -> str: """Pool name.""" @@ -532,6 +554,10 @@ def _handle_taskpull_request( try: priority, uid = self.unassigned.get() except queue.Empty: + self.logger.debug("No tasks to assign to %s", worker) + if self._early_stop_worker(worker.uid()): + worker.respond(response.make(Message.Stop)) + return break task = self._input[uid] @@ -699,6 +725,10 @@ def _handle_setupfailed( worker.respond(response.make(Message.Ack)) self._decommission_worker(worker, "Aborting {}, setup failed.") + def _early_stop_worker(self, worker_uid: Union[str, int]) -> bool: + # Thread pool doesn't support early stop + return False + def _decommission_worker(self, worker: Worker, message: str) -> None: """ Decommission a worker by move all assigned task back to pool @@ -721,12 +751,7 @@ def _workers_monitoring(self) -> None: 1) handler status 2) heartbeat if available """ - previous_status = { - "active": [], - "inactive": [], - "initializing": [], - "abort": [], - } + loop_interval = self.cfg.worker_heartbeat or 5 # seconds break_outer_loop = False @@ -747,8 +772,9 @@ def _workers_monitoring(self) -> None: and self.status != self.status.STOPPING and self.status != self.status.STOPPED ): - if self._handle_inactive(worker, reason): - status = "active" + if worker.uid() not in self._stopping_queue: + if self._handle_inactive(worker, reason): + status = "active" else: self.logger.user_info( "%s is aborting/stopping, exit monitor.", self @@ -761,12 +787,12 @@ def _workers_monitoring(self) -> None: if break_outer_loop: break - if hosts_status != previous_status: + if hosts_status != self.worker_status: self.logger.info( "Hosts status update at %s", datetime.datetime.now() ) self.logger.info(pprint.pformat(hosts_status)) - previous_status = hosts_status + self.worker_status = hosts_status if ( not hosts_status["active"] diff --git a/testplan/runners/pools/remote.py b/testplan/runners/pools/remote.py index c492ac12d..780875830 100644 --- a/testplan/runners/pools/remote.py +++ b/testplan/runners/pools/remote.py @@ -287,6 +287,8 @@ class RemotePool(Pool): CONFIG = RemotePoolConfig CONN_MANAGER = ZMQServer + QUEUE_WAIT_INTERVAL = 3 + MAX_THREAD_POOL_SIZE = 5 def __init__( self, @@ -322,7 +324,6 @@ def __init__( options.update(self.filter_locals(locals())) super(RemotePool, self).__init__(**options) self._options = options # pass to remote worker later - self._request_handlers[ Message.MetadataPull ] = self._worker_setup_metadata @@ -341,7 +342,9 @@ def resource_monitor_address(self) -> Optional[str]: return self.parent.resource_monitor_server.address @staticmethod - def _worker_setup_metadata(worker, request, response) -> None: + def _worker_setup_metadata( + worker: RemoteWorker, _: Message, response: Message + ) -> None: worker.respond( response.make(Message.Metadata, data=worker.setup_metadata) ) @@ -386,6 +389,26 @@ def _start_thread_pool(self) -> None: "Please upgrade to the suggested python interpreter." ) + def _early_stop_worker(self, worker_uid: Union[str, int]) -> bool: + with self._pool_lock: + if self.pool and (worker_uid not in self._stopping_queue): + alive_worker_ids = set( + [worker.uid() for worker in self.worker_status["active"]] + ) + if self.RESERVE_WORKER_NUM + self.unassigned.size() < len( + alive_worker_ids - set(self._stopping_queue) + ): + self.logger.user_info( + "Early stop worker %s", self._workers[worker_uid] + ) + self._stopping_queue.append(worker_uid) + self.pool.apply_async( + self._workers.sync_stop_resource, + (self._workers[worker_uid],), + ) + return True + return False + def starting(self) -> None: self._start_thread_pool() super(RemotePool, self).starting()