-
Notifications
You must be signed in to change notification settings - Fork 340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#1206: add async to aws:s3 and aws:ecr #1192
Changes from all commits
55a1545
3aa19af
04d7fe9
17f1288
362cfd3
94bfabc
38a7a40
9feb4fd
23185a6
e245b10
f889b53
40b075a
da5c68d
4c89a35
c8273f1
bdda3d1
f8eadd0
8257fe4
a92b20e
92c72ad
bdc664f
a06d145
4ac4dfa
128ef7a
8f770f7
1ad7a8f
4edaefb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,12 @@ | ||
import asyncio | ||
import logging | ||
import re | ||
import sys | ||
from functools import partial | ||
from functools import wraps | ||
from string import Template | ||
from typing import Any | ||
from typing import Awaitable | ||
from typing import BinaryIO | ||
from typing import Callable | ||
from typing import cast | ||
|
@@ -25,6 +28,7 @@ | |
from cartography.stats import get_stats_client | ||
from cartography.stats import ScopedStatsClient | ||
|
||
|
||
if sys.version_info >= (3, 7): | ||
from importlib.resources import open_binary, read_text | ||
else: | ||
|
@@ -141,6 +145,7 @@ def load_resource_binary(package: str, resource_name: str) -> BinaryIO: | |
return open_binary(package, resource_name) | ||
|
||
|
||
# Generic type variables used in this module's annotations.
# R: the return type of a wrapped callable, letting a wrapper declare that it yields the same type.
# F: any callable (bounded by Callable[..., Any]).
R = TypeVar('R') | ||
F = TypeVar('F', bound=Callable[..., Any]) | ||
|
||
|
||
|
@@ -297,3 +302,94 @@ def batch(items: Iterable, size: int = DEFAULT_BATCH_SIZE) -> List[List]: | |
items[i: i + size] | ||
for i in range(0, len(items), size) | ||
] | ||
|
||
|
||
def is_throttling_exception(exc: Exception) -> bool:
    '''
    Tell whether an exception was raised because a client library throttled the request.

    At present only botocore ClientError instances are recognised, and only when their
    error code is 'LimitExceededException' or 'Throttling'.

    :param exc: the exception to inspect
    :return: True if exc is a recognised throttling error, otherwise False
    '''
    # Error-code layout of ClientError.response is described in:
    # https://boto3.amazonaws.com/v1/documentation/api/1.19.9/guide/error-handling.html
    aws_throttled = (
        isinstance(exc, botocore.exceptions.ClientError)
        and exc.response['Error']['Code'] in ['LimitExceededException', 'Throttling']
    )
    if aws_throttled:
        return True
    # Checks for other client libraries can be added here, if needed, like:
    # https://cloud.google.com/python/docs/reference/storage/1.39.0/retry_timeout#configuring-retries
    # if isinstance(exc, google.api_core.exceptions.TooManyRequests):
    #     return True
    return False
|
||
|
||
def to_asynchronous(func: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
    '''
    Returns a Future that will run a function and its arguments in the default threadpool.
    Helper until we start using python 3.9's asyncio.to_thread

    Calls are also wrapped within a backoff decorator to handle throttling errors: when
    func raises an exception that is_throttling_exception() recognises, the call is retried
    with exponential backoff (backoff.expo). No retry limit is configured here. Any other
    exception raised by func is re-raised unchanged.

    If the calling thread has no current event loop (for example after asyncio.run() has
    returned, or in a worker thread), a new loop is created and installed as the thread's
    current loop, so that the returned Future can later be driven by to_synchronous().

    :param func: the function to be wrapped by the Future
    :param args: a series of arguments to be passed into func
    :param kwargs: a series of keyword arguments to be passed into func
    :return: an Awaitable that resolves to whatever func returns

    example:
        def my_func(arg1, arg2, kwarg1):
            return arg1 + arg2 + kwarg1

        # normal synchronous call:
        result = my_func(1, 2, kwarg1=3)

        # asynchronous call:
        future = to_asynchronous(my_func, 1, 2, kwarg1=3)

        # the result is stored in the future, and can be retrieved
        # from within another async function with:
        await future

        # or from within a synchronous function with our helper:
        to_synchronous(future)

    NOTE: to use this in a Jupyter notebook, you need to do:
        # import nest_asyncio
        # nest_asyncio.apply()
    '''
    # A private exception type, created per call, that only the backoff wrapper below
    # listens for; it marks "func failed because it was throttled".
    CartographyThrottlingException = type('CartographyThrottlingException', (Exception,), {})

    # The TypeVar R binds the wrapper's return type to the return type of func, so the
    # Awaitable handed back to the caller is typed with func's own result type.
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> R:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            if is_throttling_exception(exc):
                # Translate into the retryable marker type, keeping the original as __cause__.
                raise CartographyThrottlingException from exc
            raise

    # don't use @backoff as decorator, to preserve typing
    wrapped = backoff.on_exception(backoff.expo, CartographyThrottlingException)(wrapper)
    call = partial(wrapped, *args, **kwargs)

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop in this thread: create one and install it so that a later
        # get_event_loop() (e.g. in to_synchronous) finds the loop this Future belongs to.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_in_executor(None, call)
|
||
|
||
def to_synchronous(*awaitables: Awaitable[Any]) -> List[Any]:
    '''
    Synchronously waits for the Awaitable(s) to complete and returns their result(s).
    See https://docs.python.org/3.8/library/asyncio-task.html#asyncio-awaitables

    The awaitables are gathered and run to completion on the calling thread's current
    event loop. If the thread has no current event loop (for example after asyncio.run()
    has returned, or in a worker thread), a temporary loop is created for this call and
    closed again afterwards.

    :param awaitables: a series of Awaitable objects, with each object being its own argument.
        i.e., not a single list of Awaitables
    :return: a list with the result of each awaitable, in the order they were passed in;
        an empty list if no awaitables were passed
    :raises RuntimeError: if called while the current event loop is already running
        (e.g. from inside a coroutine)

    example:
        async def my_async_func(my_arg):
            return my_arg

        async def another_async_func(my_arg2):
            return my_arg2

        remember that an invocation of an async function returns a coroutine (an Awaitable),
        which needs to be awaited to get the result. You cannot await it from within
        a non-async function, so you could use this helper to get the result instead

        future_1 = my_async_func(1)
        future_2 = another_async_func(2)

        results = to_synchronous(future_1, future_2)
    '''
    try:
        loop = asyncio.get_event_loop()
        owns_loop = False
    except RuntimeError:
        # No current loop in this thread. Install a temporary one *before* calling
        # gather(), which looks up the current loop to schedule the awaitables.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        owns_loop = True

    try:
        return loop.run_until_complete(asyncio.gather(*awaitables))
    finally:
        if owns_loop:
            # Restore the "no current loop" state and release the loop's resources.
            asyncio.set_event_loop(None)
            loop.close()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good that you included an example, but since this is going to be a core function, I think we should have detailed docs on what each of the params is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added