Skip to content

Commit

Permalink
Add early stop in pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxuan-ms committed Sep 2, 2024
1 parent f3ad1c5 commit 6ab796b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 17 deletions.
41 changes: 39 additions & 2 deletions testplan/common/entity/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,39 @@ def start_in_pool(self, pool):
resource.wait(resource.STATUS.STARTED)
resource.logger.info("%s started", resource)

def stop(self, is_reversed=False):
def sync_stop_resource(self, resource: "Resource"):
"""
Stop a resource and log exceptions.
"""
if resource.status in (
resource.STATUS.STOPPING,
resource.STATUS.STOPPED,
):
return

resource.logger.info("Stopping %s", resource)
try:
resource.stop()
except Exception:
self._record_resource_exception(
message="While stopping resource {resource}:\n"
"{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)

# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
else:
if resource.async_start:
resource.wait(resource.STATUS.STOPPED)
resource.logger.info("%s stopped", resource)

def stop(self, is_reversed: bool = False):
"""
Stop all resources, optionally in reverse order, and log exceptions.
:param is_reversed: flag whether to stop resources in reverse order
:type is_reversed: ``bool``
"""
resources = list(self._resources.values())
if is_reversed is True:
Expand All @@ -285,6 +312,11 @@ def stop(self, is_reversed=False):
# Stop all resources
resources_to_wait_for = []
for resource in resources:
if resource.status in (
resource.STATUS.STOPPING,
resource.STATUS.STOPPED,
):
continue
try:
resource.stop()
except Exception:
Expand Down Expand Up @@ -326,6 +358,11 @@ def stop_in_pool(self, pool, is_reversed=False):
# Stop all resources
resources_to_wait_for = []
for resource in resources:
if resource.status in (
resource.STATUS.STOPPING,
resource.STATUS.STOPPED,
):
continue
pool.apply_async(
self._log_exception(
resource, resource.stop, self.stop_exceptions
Expand Down
50 changes: 38 additions & 12 deletions testplan/runners/pools/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@
import threading
import time
import traceback
from typing import Dict, Generator, List, Optional, Tuple, Type, Union
from typing import (
Dict,
Generator,
List,
Optional,
Tuple,
Type,
Union,
Callable,
)

from schema import And, Or

Expand Down Expand Up @@ -36,6 +45,9 @@ def __init__(self) -> None:
self.q = queue.PriorityQueue()
self.count = 0

def size(self) -> int:
return self.q.qsize()

def put(self, priority: int, item: str) -> None:
self.q.put((priority, self.count, item))
self.count += 1
Expand Down Expand Up @@ -339,6 +351,7 @@ class Pool(Executor):

CONFIG = PoolConfig
CONN_MANAGER = QueueServer
RESERVE_WORKER_NUM = 3

def __init__(
self,
Expand Down Expand Up @@ -370,7 +383,9 @@ def __init__(
# Methods for handling different Message types. These are expected to
# take the worker, request and response objects as the only required
# positional args.
self._request_handlers = {
self._request_handlers: Dict[
str, Callable[[Worker, Message, Message], None]
] = {
Message.ConfigRequest: self._handle_cfg_request,
Message.TaskPullRequest: self._handle_taskpull_request,
Message.TaskResults: self._handle_taskresults,
Expand All @@ -385,6 +400,13 @@ def __init__(
# NOTE: the critical section would be rather meaningless
self._discard_pending_lock = threading.RLock()
self._discard_pending = False
self._stopping_queue = []
self.worker_status = {
"active": [],
"inactive": [],
"initializing": [],
"abort": [],
}

def uid(self) -> str:
"""Pool name."""
Expand Down Expand Up @@ -535,6 +557,10 @@ def _handle_taskpull_request(
try:
priority, uid = self.unassigned.get()
except queue.Empty:
self.logger.debug("No tasks to assign to %s", worker)
if self._early_stop_worker(worker.uid()):
worker.respond(response.make(Message.Stop))
return
break

task = self._input[uid]
Expand Down Expand Up @@ -702,6 +728,10 @@ def _handle_setupfailed(
worker.respond(response.make(Message.Ack))
self._decommission_worker(worker, "Aborting {}, setup failed.")

def _early_stop_worker(self, worker_uid: Union[str, int]) -> bool:
# Thread pool doesn't support early stop
return False

def _decommission_worker(self, worker: Worker, message: str) -> None:
"""
Decommission a worker by move all assigned task back to pool
Expand All @@ -724,12 +754,7 @@ def _workers_monitoring(self) -> None:
1) handler status
2) heartbeat if available
"""
previous_status = {
"active": [],
"inactive": [],
"initializing": [],
"abort": [],
}

loop_interval = self.cfg.worker_heartbeat or 5 # seconds
break_outer_loop = False

Expand All @@ -750,8 +775,9 @@ def _workers_monitoring(self) -> None:
and self.status != self.status.STOPPING
and self.status != self.status.STOPPED
):
if self._handle_inactive(worker, reason):
status = "active"
if worker.uid() not in self._stopping_queue:
if self._handle_inactive(worker, reason):
status = "active"
else:
self.logger.user_info(
"%s is aborting/stopping, exit monitor.", self
Expand All @@ -764,12 +790,12 @@ def _workers_monitoring(self) -> None:
if break_outer_loop:
break

if hosts_status != previous_status:
if hosts_status != self.worker_status:
self.logger.info(
"Hosts status update at %s", datetime.datetime.now()
)
self.logger.info(pprint.pformat(hosts_status))
previous_status = hosts_status
self.worker_status = hosts_status

if (
not hosts_status["active"]
Expand Down
36 changes: 33 additions & 3 deletions testplan/runners/pools/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
import signal
import socket
import threading
import time
from multiprocessing.pool import ThreadPool
from typing import Callable, Dict, List, Optional, Tuple, Type, Union

Expand Down Expand Up @@ -286,6 +288,8 @@ class RemotePool(Pool):

CONFIG = RemotePoolConfig
CONN_MANAGER = ZMQServer
QUEUE_WAIT_INTERVAL = 3
MAX_THREAD_POOL_SIZE = 5

def __init__(
self,
Expand Down Expand Up @@ -321,7 +325,6 @@ def __init__(
options.update(self.filter_locals(locals()))
super(RemotePool, self).__init__(**options)
self._options = options # pass to remote worker later

self._request_handlers[
Message.MetadataPull
] = self._worker_setup_metadata
Expand All @@ -340,11 +343,17 @@ def resource_monitor_address(self) -> Optional[str]:
return self.parent.resource_monitor_server.address

@staticmethod
def _worker_setup_metadata(worker, request, response) -> None:
def _worker_setup_metadata(
worker: RemoteWorker, _: Message, response: Message
) -> None:
worker.respond(
response.make(Message.Metadata, data=worker.setup_metadata)
)

def post_start(self) -> None:
super().post_start()
self._stop_worker_thread.start()

def _add_workers(self) -> None:
"""TODO."""
for instance in self._instances.values():
Expand Down Expand Up @@ -378,13 +387,34 @@ def _start_thread_pool(self) -> None:
size = len(self._instances)
try:
if size > 2:
self.pool = ThreadPool(5 if size > 5 else size)
self.pool = ThreadPool(min(self.MAX_THREAD_POOL_SIZE, size))
except Exception as exc:
if isinstance(exc, AttributeError):
self.logger.warning(
"Please upgrade to the suggested python interpreter."
)

def _early_stop_worker(self, worker_uid: Union[str, int]) -> bool:
with self._pool_lock:
if self.pool and (worker_uid not in self._stopping_queue):
alive_worker_ids = set(
[worker.uid() for worker in self.worker_status["active"]]
)
if self.RESERVE_WORKER_NUM + self.unassigned.size() < len(
alive_worker_ids - set(self._stopping_queue)
):
self.logger.user_info(
"Early stop worker %s", self._workers[worker_uid]
)
print(f"Early stop worker {self._workers[worker_uid]}")
self._stopping_queue.append(worker_uid)
self.pool.apply_async(
self._workers.sync_stop_resource,
(self._workers[worker_uid],),
)
return True
return False

def starting(self) -> None:
self._start_thread_pool()
super(RemotePool, self).starting()
Expand Down

0 comments on commit 6ab796b

Please sign in to comment.