Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add early stop in pool. #1122

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions testplan/common/entity/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,39 @@ def start_in_pool(self, pool):
resource.wait(resource.STATUS.STARTED)
resource.logger.info("%s started", resource)

def stop(self, is_reversed=False):
def sync_stop_resource(self, resource: "Resource"):
"""
Stop a resource and log exceptions.
"""
if resource.status in (
resource.STATUS.STOPPING,
resource.STATUS.STOPPED,
):
return

resource.logger.info("Stopping %s", resource)
try:
resource.stop()
except Exception:
self._record_resource_exception(
message="While stopping resource {resource}:\n"
"{traceback_exc}\n{fetch_msg}",
resource=resource,
msg_store=self.stop_exceptions,
)

# Resource status should be STOPPED even it failed to stop
resource.force_stopped()
else:
if resource.async_start:
resource.wait(resource.STATUS.STOPPED)
resource.logger.info("%s stopped", resource)

def stop(self, is_reversed: bool = False):
"""
Stop all resources, optionally in reverse order, and log exceptions.

:param is_reversed: flag whether to stop resources in reverse order
:type is_reversed: ``bool``
"""
resources = list(self._resources.values())
if is_reversed is True:
Expand All @@ -284,6 +311,11 @@ def stop(self, is_reversed=False):
# Stop all resources
resources_to_wait_for = []
for resource in resources:
if resource.status in (
resource.STATUS.STOPPING,
resource.STATUS.STOPPED,
):
continue
try:
resource.stop()
except Exception:
Expand Down Expand Up @@ -335,6 +367,11 @@ def stop_in_pool(self, pool, is_reversed=False):
# Stop all resources
resources_to_wait_for = []
for resource in resources:
if resource.status in (
resource.STATUS.STOPPING,
resource.STATUS.STOPPED,
):
continue
pool.apply_async(
self._log_exception(
resource, resource.stop, self.stop_exceptions
Expand Down
50 changes: 38 additions & 12 deletions testplan/runners/pools/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@
import threading
import time
import traceback
from typing import Dict, Generator, List, Optional, Tuple, Type, Union
from typing import (
Dict,
Generator,
List,
Optional,
Tuple,
Type,
Union,
Callable,
)

from schema import And, Or, Use

Expand Down Expand Up @@ -36,6 +45,9 @@ def __init__(self) -> None:
self.q = queue.PriorityQueue()
self.count = 0

def size(self) -> int:
return self.q.qsize()

def put(self, priority: int, item: str) -> None:
self.q.put((priority, self.count, item))
self.count += 1
Expand Down Expand Up @@ -336,6 +348,7 @@ class Pool(Executor):

CONFIG = PoolConfig
CONN_MANAGER = QueueServer
RESERVE_WORKER_NUM = 3

def __init__(
self,
Expand Down Expand Up @@ -367,7 +380,9 @@ def __init__(
# Methods for handling different Message types. These are expected to
# take the worker, request and response objects as the only required
# positional args.
self._request_handlers = {
self._request_handlers: Dict[
str, Callable[[Worker, Message, Message], None]
] = {
Message.ConfigRequest: self._handle_cfg_request,
Message.TaskPullRequest: self._handle_taskpull_request,
Message.TaskResults: self._handle_taskresults,
Expand All @@ -382,6 +397,13 @@ def __init__(
# NOTE: the critical section would be rather meaningless
self._discard_pending_lock = threading.RLock()
self._discard_pending = False
self._stopping_queue = []
self.worker_status = {
"active": [],
"inactive": [],
"initializing": [],
"abort": [],
}

def uid(self) -> str:
"""Pool name."""
Expand Down Expand Up @@ -532,6 +554,10 @@ def _handle_taskpull_request(
try:
priority, uid = self.unassigned.get()
except queue.Empty:
self.logger.debug("No tasks to assign to %s", worker)
if self._early_stop_worker(worker.uid()):
worker.respond(response.make(Message.Stop))
return
break

task = self._input[uid]
Expand Down Expand Up @@ -699,6 +725,10 @@ def _handle_setupfailed(
worker.respond(response.make(Message.Ack))
self._decommission_worker(worker, "Aborting {}, setup failed.")

def _early_stop_worker(self, worker_uid: Union[str, int]) -> bool:
# Thread pool doesn't support early stop
return False

def _decommission_worker(self, worker: Worker, message: str) -> None:
"""
Decommission a worker by move all assigned task back to pool
Expand All @@ -721,12 +751,7 @@ def _workers_monitoring(self) -> None:
1) handler status
2) heartbeat if available
"""
previous_status = {
"active": [],
"inactive": [],
"initializing": [],
"abort": [],
}

loop_interval = self.cfg.worker_heartbeat or 5 # seconds
break_outer_loop = False

Expand All @@ -747,8 +772,9 @@ def _workers_monitoring(self) -> None:
and self.status != self.status.STOPPING
and self.status != self.status.STOPPED
):
if self._handle_inactive(worker, reason):
status = "active"
if worker.uid() not in self._stopping_queue:
if self._handle_inactive(worker, reason):
status = "active"
else:
self.logger.user_info(
"%s is aborting/stopping, exit monitor.", self
Expand All @@ -761,12 +787,12 @@ def _workers_monitoring(self) -> None:
if break_outer_loop:
break

if hosts_status != previous_status:
if hosts_status != self.worker_status:
self.logger.info(
"Hosts status update at %s", datetime.datetime.now()
)
self.logger.info(pprint.pformat(hosts_status))
previous_status = hosts_status
self.worker_status = hosts_status

if (
not hosts_status["active"]
Expand Down
27 changes: 25 additions & 2 deletions testplan/runners/pools/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ class RemotePool(Pool):

CONFIG = RemotePoolConfig
CONN_MANAGER = ZMQServer
QUEUE_WAIT_INTERVAL = 3
MAX_THREAD_POOL_SIZE = 5

def __init__(
self,
Expand Down Expand Up @@ -322,7 +324,6 @@ def __init__(
options.update(self.filter_locals(locals()))
super(RemotePool, self).__init__(**options)
self._options = options # pass to remote worker later

self._request_handlers[
Message.MetadataPull
] = self._worker_setup_metadata
Expand All @@ -341,7 +342,9 @@ def resource_monitor_address(self) -> Optional[str]:
return self.parent.resource_monitor_server.address

@staticmethod
def _worker_setup_metadata(worker, request, response) -> None:
def _worker_setup_metadata(
worker: RemoteWorker, _: Message, response: Message
) -> None:
worker.respond(
response.make(Message.Metadata, data=worker.setup_metadata)
)
Expand Down Expand Up @@ -386,6 +389,26 @@ def _start_thread_pool(self) -> None:
"Please upgrade to the suggested python interpreter."
)

def _early_stop_worker(self, worker_uid: Union[str, int]) -> bool:
with self._pool_lock:
if self.pool and (worker_uid not in self._stopping_queue):
alive_worker_ids = set(
[worker.uid() for worker in self.worker_status["active"]]
)
if self.RESERVE_WORKER_NUM + self.unassigned.size() < len(
alive_worker_ids - set(self._stopping_queue)
):
self.logger.user_info(
"Early stop worker %s", self._workers[worker_uid]
)
self._stopping_queue.append(worker_uid)
self.pool.apply_async(
self._workers.sync_stop_resource,
(self._workers[worker_uid],),
)
return True
return False

def starting(self) -> None:
self._start_thread_pool()
super(RemotePool, self).starting()
Expand Down
Loading