Task/bulk sms poor csv performance (#350)
* Add decorator to process iterables in parallel

* Bump utils + waffles version

* Add auto scaling to parallelizing decorator

- Added a few more tests

* Add exception handling and logging

- Tweaked control_chunk_and_worker_size to ensure that chunk sizes fit evenly with the number of available workers, so long as the chunk size does not exceed the max chunk size. This should mean fewer cases where a few items from the iterable are left over and a final thread is spawned to process them after the initial batch.

* Refine break_condition type hinting

Co-authored-by: Jimmy Royer <[email protected]>

* Add is_atomic flag to control behaviour

- Code cleanups

---------

Co-authored-by: Jumana B <[email protected]>
Co-authored-by: Jimmy Royer <[email protected]>
3 people authored Jan 22, 2025
1 parent 195b462 commit 0fb484b
Showing 6 changed files with 235 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/actions/waffles/requirements.txt
@@ -2,4 +2,4 @@ docopt==0.6.2
Flask==2.3.3
markupsafe==2.1.5
setuptools==75.6.0 # required for distutils in Python 3.12
-git+https://github.com/cds-snc/[email protected].1#egg=notifications-utils
+git+https://github.com/cds-snc/[email protected].2#egg=notifications-utils
62 changes: 62 additions & 0 deletions notifications_utils/decorators.py
@@ -1,7 +1,12 @@
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from typing import Callable, Optional

from flask import current_app

from notifications_utils.iterable import chunk_iterable
from notifications_utils.parallel import control_chunk_and_worker_size


def requires_feature(flag):
    def decorator_feature_flag(func):
@@ -14,3 +19,60 @@ def wrapper(*args, **kwargs):
        return wrapper

    return decorator_feature_flag


# Parallel processing decorator
def parallel_process_iterable(
chunk_size=10000, max_workers=None, is_atomic=True, break_condition: Optional[Callable[..., bool]] = None
):
"""Decorator to split processing an iterable into chunks and execute in parallel. This should decorate the function responsible for processing each chunk.
If processing can be stopped early, this condition should be defined in the processing function, and the `break_condition` parameter should be provided.
`chunk_size` and `max_workers` are managed internally to optimize performance based on the size of the data to be processed, but can be overridden if necessary.
Args:
chunk_size (int, optional): Defaults to 10,000
max_workers (_type_, optional): Defaults to the number of CPU cores
is_atomic (bool, optional): Defaults to True. If False, any exceptions raised during processing will be caught and logged but will not stop other threads from
continuing.
break_condition (function, optional): A lambda function that defines when parallel execution can be stopped. This applies to the entire iterable, not just the
current chunk. When any of the threads returns a result that satisfies the break condition, the processing is stopped and the results are returned.
"""

def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
data = args[0]
data_size = len(data)
nonlocal chunk_size, max_workers
chunk_size, max_workers = control_chunk_and_worker_size(data_size, chunk_size, max_workers)

def process_chunk(chunk):
return func(chunk, *args[1:])

# Execute in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_chunk, chunk) for chunk in chunk_iterable(data, chunk_size)]
current_app.logger.info(
f"Beginning parallel processing of {data_size} items in {len(futures)} chunks for {func.__name__} across {max_workers} workers."
)
# Combine results
results = []
for future in futures:
result = future.result()
results.append(result)
if break_condition:
try:
if break_condition(result):
return results
except Exception as e:
current_app.logger.warning(f"Break condition exception: {e}")
if is_atomic:
raise e
else:
continue
return results

return wrapper

return decorator
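
For orientation, a minimal usage sketch (illustrative, not part of this commit): a hypothetical double_rows chunk processor decorated with parallel_process_iterable. It assumes a Flask app object is in scope, since the wrapper logs through current_app, and that the first positional argument is the iterable to split.

    # Hypothetical usage sketch, not part of this commit.
    # Assumes a Flask `app` is available; the wrapper logs via current_app.
    @parallel_process_iterable(chunk_size=5000)
    def double_rows(rows):
        return [row * 2 for row in rows]

    with app.app_context():
        per_chunk_results = double_rows(list(range(20000)))  # one list of results per chunk
        flattened = [item for chunk in per_chunk_results for item in chunk]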
21 changes: 21 additions & 0 deletions notifications_utils/iterable.py
@@ -0,0 +1,21 @@
# Helper function to chunk a list
from itertools import islice
from typing import Generator, Iterable


def chunk_iterable(iterable_collection: Iterable, chunk_size: int) -> Generator:
    """Helper function to chunk an iterable collection in preparation for parallel processing.

    Args:
        iterable_collection (Iterable): The collection to be chunked
        chunk_size (int): The size of each chunk

    Yields:
        list: The next chunk of the iterable
    """
    iterable = iter(iterable_collection)
    while True:
        chunk = list(islice(iterable, chunk_size))
        if not chunk:
            break
        yield chunk
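
The behaviour is easiest to see with small, illustrative values; since each chunk is materialized as a list, the final chunk simply carries the remainder:

    # Illustrative sketch, not part of this commit
    chunks = list(chunk_iterable(range(7), 3))
    assert chunks == [[0, 1, 2], [3, 4, 5], [6]]  # final chunk holds the leftover items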
46 changes: 46 additions & 0 deletions notifications_utils/parallel.py
@@ -0,0 +1,46 @@
import math
import multiprocessing


def control_chunk_and_worker_size(data_size=None, chunk_size=None, max_workers=None):
    """Attempts to optimize the chunk size and number of workers based on the size of the data to be processed. The following rules are applied:

    - Max concurrently allowed workers is the number of CPU cores
    - 1 worker is used when data sets <= 1000
    - Chunk sizes are capped at 10,000 and are initially calculated with: `data_size / max_workers`
    - For chunk sizes < 10,000 worker counts are scaled up
    - For chunk sizes above 80% of the cap, the concurrent workers scale down to half the maximum to limit CPU context switching

    Args:
        data_size (int, optional): Size of the iterable being chunked.
        chunk_size (int, optional): Overrides the default max chunk size of 10,000. Defaults to None.
        max_workers (int, optional): Overrides the default max workers (the number of CPU cores). Defaults to None.

    Returns:
        tuple[int, int]: The optimized chunk size and number of workers to execute in parallel.
    """
    MIN_CHUNK_SIZE = 1000
    MAX_CHUNK_SIZE = 10000 if not chunk_size else chunk_size
    MAX_WORKERS = multiprocessing.cpu_count() if not max_workers else max_workers

    if data_size <= MIN_CHUNK_SIZE:
        return MIN_CHUNK_SIZE, 1

    # Initial chunk size
    ideal_chunk_size = max(data_size // MAX_WORKERS, MIN_CHUNK_SIZE)
    ideal_chunk_size = min(ideal_chunk_size, MAX_CHUNK_SIZE)

    # Adjust the chunk size to ensure no leftovers
    worker_count = min(math.ceil(data_size / ideal_chunk_size), MAX_WORKERS)  # noqa: F821
    chunk_size = math.ceil(data_size / worker_count)

    # Ensure chunk size remains within min and max chunk size bounds
    chunk_size = max(MIN_CHUNK_SIZE, min(chunk_size, MAX_CHUNK_SIZE))
    worker_count = math.ceil(data_size / chunk_size)

    # Suppress workers for larger chunks to avoid memory and/or context switching overhead
    if chunk_size > MAX_CHUNK_SIZE * 0.8:
        worker_count = min(worker_count, MAX_WORKERS // 2)
    else:
        worker_count = min(worker_count, MAX_WORKERS)

    return chunk_size, worker_count
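
To make the scaling rules concrete, these are the values the function produces on a 28-core host (the same assumption the tests below make by patching multiprocessing.cpu_count):

    # Illustrative expected outputs on a 28-core host, not part of this commit
    control_chunk_and_worker_size(800)     # -> (1000, 1): small data set, a single worker
    control_chunk_and_worker_size(80000)   # -> (2858, 28): chunk size grows, workers stay at the CPU count
    control_chunk_and_worker_size(300000)  # -> (10000, 14): chunks hit the 10,000 cap, worker count halves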
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "notifications-utils"
version = "53.1.1"
version = "53.1.2"
description = "Shared python code for Notification - Provides logging utils etc."
authors = ["Canadian Digital Service"]
license = "MIT license"
105 changes: 104 additions & 1 deletion tests/test_decorators.py
@@ -1,4 +1,5 @@
-from notifications_utils.decorators import requires_feature
+import pytest
+from notifications_utils.decorators import control_chunk_and_worker_size, parallel_process_iterable, requires_feature


@requires_feature("FEATURE_FLAG")
@@ -16,3 +17,105 @@ def test_requires_feature_disabled(mocker, app):
    app.config["FEATURE_FLAG"] = False
    result = decorated_function()
    assert result is None


# Sample function to be decorated
@parallel_process_iterable()
def process_chunk(chunk):
    return [x * 2 for x in chunk]


def test_parallel_process_iterable(app):
    data = [1, 2, 3, 4, 5, 6]
    expected_result = [2, 4, 6, 8, 10, 12]
    with app.app_context():
        result = process_chunk(data)
        assert result[0] == expected_result


def test_parallel_process_iterable_with_break_condition(app):
    data = [num for num in range(1500, 1, -1)]

    def break_condition(result):
        return 1500 in result

    @parallel_process_iterable(break_condition=break_condition)
    def process_chunk_with_break(chunk):
        return [x * 2 for x in chunk]

    with app.app_context():
        results = process_chunk_with_break(data)
        assert len(results) == 1  # Only 1 thread should have processed data
        assert any(result == 1500 for result in results[0])


@pytest.mark.parametrize(
    "data_size, expected_worker_count, expected_chunk_size",
    [
        (1000, 1, 1000),  # data_size <= the minimum chunk size
        (900, 1, 900),  # data_size <= the minimum chunk size
        (
            8000,
            8,
            1000,
        ),  # Small overall data and chunk size, less risk of context switching and CPU overhead, should scale to utilize more workers
        (
            9000,
            9,
            1000,
        ),  # Small overall data and chunk size, less risk of context switching and CPU overhead, should scale to utilize more workers
        (
            20000,
            20,
            1000,
        ),  # Hitting the max worker count, ensure the worker count stays capped at 28 and chunk_size scales accordingly
        (80000, 28, 2858),  # Ensure chunk size is scaling, not max workers
    ],
)
def test_parallel_process_iterable_adjusts_workers_and_chunk_size(
    app, data_size, expected_worker_count, expected_chunk_size, mocker
):
    data = [num + 1 for num in range(0, data_size, 1)]
    mocker.patch("multiprocessing.cpu_count", return_value=28)  # m5.large has 28 cores

    @parallel_process_iterable()
    def process_chunk(chunk):
        return [x * 2 for x in chunk]

    with app.app_context():
        results = process_chunk(data)
        assert len(results) == expected_worker_count
        assert any(len(result) == expected_chunk_size for result in results)


def test_parallel_process_iterable_raises_break_condition_exceptions_if_atomic(app):
    data = [num + 1 for num in range(0, 2000, 1)]

    def break_condition(result):
        raise ValueError("Something went wrong")

    @parallel_process_iterable(chunk_size=2, max_workers=2, break_condition=break_condition, is_atomic=True)
    def process_chunk_with_break(chunk):
        return [x * 2 for x in chunk]

    with pytest.raises(ValueError), app.app_context():
        process_chunk_with_break(data)


def test_parallel_process_iterable_continues_on_break_condition_exceptions_if_not_atomic(app):
    data = [num + 1 for num in range(0, 2000, 1)]

    def break_condition(result):
        raise ValueError("Something went wrong")

    @parallel_process_iterable(chunk_size=2, max_workers=2, break_condition=break_condition, is_atomic=False)
    def process_chunk_with_break(chunk):
        return [x * 2 for x in chunk]

    results = process_chunk_with_break(data)
    assert len(results) == 2


def test_control_chunk_and_worker_size_scales_workers_down_when_chunk_size_exceeds_threshold(mocker):
    mocker.patch("multiprocessing.cpu_count", return_value=28)  # m5.large has 28 cores
    assert control_chunk_and_worker_size(300000) == (10000, 14)  # (chunk_size, worker_count)
