Skip to content

Commit

Permalink
allow set_thread_pool_size to be set after pool has been once used al…
Browse files Browse the repository at this point in the history
…ready
  • Loading branch information
mjurbanski-reef committed Feb 25, 2024
1 parent bf573f3 commit 2c7ce95
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 42 deletions.
67 changes: 62 additions & 5 deletions b2sdk/utils/thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,84 @@
######################################################################
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable

from b2sdk.utils import B2TraceMetaAbstract


class LazyThreadPool:
"""
Lazily initialized thread pool.
"""

_THREAD_POOL_FACTORY = ThreadPoolExecutor

def __init__(self, max_workers: int | None = None, **kwargs):
self._max_workers = max_workers
self._thread_pool: ThreadPoolExecutor | None = None
super().__init__(**kwargs)

def submit(self, fn: Callable, *args, **kwargs) -> Future:
if self._thread_pool is None:
self._thread_pool = self._THREAD_POOL_FACTORY(self._max_workers)
return self._thread_pool.submit(fn, *args, **kwargs)

def set_size(self, max_workers: int) -> None:
"""
Set the size of the thread pool.
This operation will block until all tasks in the current thread pool are completed.
:param max_workers: New size of the thread pool
:return: None
"""
if self._max_workers == max_workers:
return
old_thread_pool = self._thread_pool
self._thread_pool = self._THREAD_POOL_FACTORY(max_workers=max_workers)
if old_thread_pool is not None:
old_thread_pool.shutdown(wait=True)
self._max_workers = max_workers

def get_size(self) -> int | None:
return self._max_workers


class ThreadPoolMixin(metaclass=B2TraceMetaAbstract):
"""
Mixin class with ThreadPoolExecutor.
"""
DEFAULT_THREAD_POOL_CLASS = staticmethod(ThreadPoolExecutor)

DEFAULT_THREAD_POOL_CLASS = LazyThreadPool

def __init__(
self,
thread_pool: ThreadPoolExecutor | None = None,
max_workers: int | None = None,
**kwargs
**kwargs,
):
"""
:param thread_pool: thread pool to be used
:param max_workers: maximum number of worker threads (ignored if thread_pool is not None)
"""
self._thread_pool = thread_pool if thread_pool is not None \
else self.DEFAULT_THREAD_POOL_CLASS(max_workers=max_workers)
self._thread_pool = (
thread_pool
if thread_pool is not None else self.DEFAULT_THREAD_POOL_CLASS(max_workers=max_workers)
)
self._max_workers = max_workers
super().__init__(**kwargs)

def set_thread_pool_size(self, max_workers: int) -> None:
"""
Set the size of the thread pool.
This operation will block until all tasks in the current thread pool are completed.
:param max_workers: New size of the thread pool
:return: None
"""
return self._thread_pool.set_size(max_workers)

def get_thread_pool_size(self) -> int | None:
return self._thread_pool.get_size()
37 changes: 4 additions & 33 deletions b2sdk/v2/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,17 @@
######################################################################
from __future__ import annotations

from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable

from b2sdk import _v3 as v3


class LazyThreadPool:
"""
Lazily initialized thread pool.
"""

def __init__(self, max_workers: int | None = None, **kwargs):
self._max_workers = max_workers
self._thread_pool = None # type: 'Optional[ThreadPoolExecutor]'
super().__init__(**kwargs)

def submit(self, fn: Callable, *args, **kwargs) -> Future:
if self._thread_pool is None:
self._thread_pool = ThreadPoolExecutor(self._max_workers)
return self._thread_pool.submit(fn, *args, **kwargs)

def set_size(self, max_workers: int) -> None:
if self._max_workers == max_workers:
return
if self._thread_pool is not None:
raise RuntimeError('Thread pool already created')
self._max_workers = max_workers
from b2sdk.utils.thread_pool import LazyThreadPool # noqa: F401


class ThreadPoolMixin(v3.ThreadPoolMixin):
DEFAULT_THREAD_POOL_CLASS = staticmethod(LazyThreadPool)

# This method is used in CLI even though it doesn't belong to the public API
def set_thread_pool_size(self, max_workers: int) -> None:
self._thread_pool.set_size(max_workers)
pass


class DownloadManager(v3.DownloadManager, ThreadPoolMixin):
class DownloadManager(v3.DownloadManager):
pass


class UploadManager(v3.UploadManager, ThreadPoolMixin):
class UploadManager(v3.UploadManager):
pass
1 change: 1 addition & 0 deletions changelog.d/+set_threads.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `set_thread_pool_size`, `get_thread_pool_size` to *Manger classes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
######################################################################
#
# File: test/unit/v2/test_transfer.py
# File: test/unit/v_all/test_transfer.py
#
# Copyright 2022 Backblaze Inc. All Rights Reserved.
#
Expand All @@ -11,19 +11,24 @@

from unittest.mock import Mock

from apiver_deps import DownloadManager, UploadManager

from ..test_base import TestBase
from .apiver.apiver_deps import DownloadManager, UploadManager


class TestDownloadManager(TestBase):
def test_set_thread_pool_size(self) -> None:
download_manager = DownloadManager(services=Mock())
assert download_manager.get_thread_pool_size() is None
download_manager.set_thread_pool_size(21)
self.assertEqual(download_manager._thread_pool._max_workers, 21)
assert download_manager._thread_pool._max_workers == 21
assert download_manager.get_thread_pool_size() == 21


class TestUploadManager(TestBase):
def test_set_thread_pool_size(self) -> None:
upload_manager = UploadManager(services=Mock())
assert upload_manager.get_thread_pool_size() is None
upload_manager.set_thread_pool_size(37)
self.assertEqual(upload_manager._thread_pool._max_workers, 37)
assert upload_manager._thread_pool._max_workers == 37
assert upload_manager.get_thread_pool_size() == 37

0 comments on commit 2c7ce95

Please sign in to comment.