Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1206: add async to aws:s3 and aws:ecr #1192

Merged
merged 27 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
55a1545
init
ramonpetgrave64 Jun 22, 2023
3aa19af
note about nest_asyncio
ramonpetgrave64 Jun 23, 2023
04d7fe9
Hotfix (#1190)
jychp Jun 23, 2023
17f1288
0.81.0 (#1191)
ramonpetgrave64 Jun 23, 2023
362cfd3
Set to None the gsuite token instead of github one (#1204)
heryxpc Jul 10, 2023
94bfabc
0.82.0.dev1
ramonpetgrave64 Jul 11, 2023
38a7a40
'0.82.0.dev2'
ramonpetgrave64 Jul 12, 2023
9feb4fd
Update usage docs and roadmap links (#1196)
achantavy Jul 14, 2023
23185a6
#1210: EBSVolume => new data model, Allow node attr updates from mult…
achantavy Jul 14, 2023
e245b10
Fix index out of range for drift detection returning no results (#1220)
skiptomyliu Jul 17, 2023
f889b53
Add contributing guidelines for issues (#1226)
achantavy Jul 24, 2023
40b075a
add throttling and typing
ramonpetgrave64 Aug 1, 2023
da5c68d
add boto3_session fixture with stubbed clients
ramonpetgrave64 Aug 2, 2023
4c89a35
cleanup
ramonpetgrave64 Aug 2, 2023
c8273f1
rename
ramonpetgrave64 Aug 3, 2023
bdda3d1
Merge branch 'master' into ramonpetgrave64-s3-async
ramonpetgrave64 Aug 3, 2023
f8eadd0
Update setup.py
ramonpetgrave64 Aug 3, 2023
8257fe4
Merge branch 'master' into ramonpetgrave64-s3-async
ramonpetgrave64 Aug 8, 2023
a92b20e
refactor, add examples
ramonpetgrave64 Aug 8, 2023
92c72ad
typing
ramonpetgrave64 Aug 8, 2023
bdc664f
whitespace
ramonpetgrave64 Aug 8, 2023
a06d145
Merge branch 'master' into ramonpetgrave64-s3-async
achantavy Aug 22, 2023
4ac4dfa
add :param : docs
ramonpetgrave64 Aug 22, 2023
128ef7a
Update util.py
ramonpetgrave64 Aug 22, 2023
8f770f7
lint
ramonpetgrave64 Aug 22, 2023
1ad7a8f
lint
ramonpetgrave64 Aug 22, 2023
4edaefb
lint
ramonpetgrave64 Aug 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions cartography/intel/aws/ecr.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Any
from typing import Dict
from typing import List

Expand All @@ -9,6 +10,8 @@
from cartography.util import batch
from cartography.util import run_cleanup_job
from cartography.util import timeit
from cartography.util import to_asynchronous
from cartography.util import to_synchronous

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -139,6 +142,25 @@ def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
run_cleanup_job('aws_import_ecr_cleanup.json', neo4j_session, common_job_parameters)


def _get_image_data(
boto3_session: boto3.session.Session,
region: str,
repositories: List[Dict[str, Any]],
) -> Dict[str, Any]:
'''
Given a list of repositories, get the image data for each repository,
return as a mapping from repositoryUri to image object
'''
image_data = {}

async def async_get_images(repo: Dict[str, Any]) -> None:
repo_image_obj = await to_asynchronous(get_ecr_repository_images, boto3_session, region, repo['repositoryName'])
image_data[repo['repositoryUri']] = repo_image_obj
to_synchronous(*[async_get_images(repo) for repo in repositories])

return image_data


@timeit
def sync(
neo4j_session: neo4j.Session, boto3_session: boto3.session.Session, regions: List[str], current_aws_account_id: str,
Expand All @@ -148,9 +170,7 @@ def sync(
logger.info("Syncing ECR for region '%s' in account '%s'.", region, current_aws_account_id)
image_data = {}
repositories = get_ecr_repositories(boto3_session, region)
for repo in repositories:
repo_image_obj = get_ecr_repository_images(boto3_session, region, repo['repositoryName'])
image_data[repo['repositoryUri']] = repo_image_obj
image_data = _get_image_data(boto3_session, region, repositories)
load_ecr_repositories(neo4j_session, repositories, region, current_aws_account_id, update_tag)
repo_images_list = transform_ecr_repository_images(image_data)
load_ecr_repository_images(neo4j_session, repo_images_list, region, update_tag)
Expand Down
30 changes: 23 additions & 7 deletions cartography/intel/aws/s3.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import hashlib
import json
import logging
Expand All @@ -20,6 +21,8 @@
from cartography.util import run_analysis_job
from cartography.util import run_cleanup_job
from cartography.util import timeit
from cartography.util import to_asynchronous
from cartography.util import to_synchronous

logger = logging.getLogger(__name__)
stat_handler = get_stats_client(__name__)
Expand Down Expand Up @@ -55,20 +58,33 @@ def get_s3_bucket_details(
# a local store for s3 clients so that we may re-use clients for an AWS region
s3_regional_clients: Dict[Any, Any] = {}

for bucket in bucket_data['Buckets']:
BucketDetail = Tuple[str, Dict[str, Any], Dict[str, Any], Dict[str, Any], Dict[str, Any], Dict[str, Any]]

async def _get_bucket_detail(bucket: Dict[str, Any]) -> BucketDetail:
# Note: bucket['Region'] is sometimes None because
# client.get_bucket_location() does not return a location constraint for buckets
# in us-east-1 region
client = s3_regional_clients.get(bucket['Region'])
if not client:
client = boto3_session.client('s3', bucket['Region'])
s3_regional_clients[bucket['Region']] = client
acl = get_acl(bucket, client)
policy = get_policy(bucket, client)
encryption = get_encryption(bucket, client)
versioning = get_versioning(bucket, client)
public_access_block = get_public_access_block(bucket, client)
yield bucket['Name'], acl, policy, encryption, versioning, public_access_block
(
acl,
policy,
encryption,
versioning,
public_access_block,
) = await asyncio.gather(
to_asynchronous(get_acl, bucket, client),
to_asynchronous(get_policy, bucket, client),
to_asynchronous(get_encryption, bucket, client),
to_asynchronous(get_versioning, bucket, client),
to_asynchronous(get_public_access_block, bucket, client),
)
return bucket['Name'], acl, policy, encryption, versioning, public_access_block

bucket_details = to_synchronous(*[_get_bucket_detail(bucket) for bucket in bucket_data['Buckets']])
yield from bucket_details


@timeit
Expand Down
89 changes: 89 additions & 0 deletions cartography/util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
import logging
import re
import sys
from functools import partial
from functools import wraps
from string import Template
from typing import Any
from typing import Awaitable
from typing import BinaryIO
from typing import Callable
from typing import cast
Expand All @@ -25,6 +28,7 @@
from cartography.stats import get_stats_client
from cartography.stats import ScopedStatsClient


if sys.version_info >= (3, 7):
from importlib.resources import open_binary, read_text
else:
Expand Down Expand Up @@ -141,6 +145,7 @@ def load_resource_binary(package: str, resource_name: str) -> BinaryIO:
return open_binary(package, resource_name)


R = TypeVar('R')
F = TypeVar('F', bound=Callable[..., Any])


Expand Down Expand Up @@ -297,3 +302,87 @@ def batch(items: Iterable, size: int = DEFAULT_BATCH_SIZE) -> List[List]:
items[i: i + size]
for i in range(0, len(items), size)
]


def is_throttling_exception(exc: Exception) -> bool:
'''
Returns True if the exception is caused by a client libraries throttling mechanism
'''
# https://boto3.amazonaws.com/v1/documentation/api/1.19.9/guide/error-handling.html
if isinstance(exc, botocore.exceptions.ClientError):
if exc.response['Error']['Code'] in ['LimitExceededException', 'Throttling']:
return True
# add other exceptions here, if needed, like:
# https://cloud.google.com/python/docs/reference/storage/1.39.0/retry_timeout#configuring-retries
# if isinstance(exc, google.api_core.exceptions.TooManyRequests):
# return True
return False


def to_asynchronous(func: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
'''
Returns a Future that will run a function and its arguments in the default threadpool.
Helper until we start using python 3.9's asyncio.to_thread

Calls are also wrapped within a backoff decorator to handle throttling errors.

example:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good that you included an example, but since this is going to be a core function, I think we should have detailed docs on what each of the params is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

def my_func(arg1, arg2, kwarg1):
return arg1 + arg2 + kwarg1

# normal synchronous call:
result = my_func(1, 2, kwarg1=3)

# asynchronous call:
future = to_asynchronous(my_func, 1, 2, kwarg1=3)

# the result is stored in the future, and can be retrieved
# from within another async function with:
await future

# or from within a synchronous function with our helper:
to_synchronous(future)

NOTE: to use this in a Jupyter notebook, you need to do:
# import nest_asyncio
# nest_asyncio.apply()
'''
CartographyThrottlingException = type('CartographyThrottlingException', (Exception,), {})

@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> R:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nonblock][question] What's the typevar R do?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It binds the return type of func so that mypy can understand that the wrapped function is supposed to still return type R.

try:
return func(*args, **kwargs)
except Exception as exc:
if is_throttling_exception(exc):
raise CartographyThrottlingException from exc
raise

# don't use @backoff as decorator, to preserve typing
wrapped = backoff.on_exception(backoff.expo, CartographyThrottlingException)(wrapper)
call = partial(wrapped, *args, **kwargs)
return asyncio.get_event_loop().run_in_executor(None, call)


def to_synchronous(*awaitables: Awaitable[Any]) -> List[Any]:
'''
Synchronously waits for the Awaitable(s) to complete and returns their result(s).
See https://docs.python.org/3.8/library/asyncio-task.html#asyncio-awaitables

example:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing about explicitly documenting the parameters.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added as the normal :param : spec

async def my_async_func(my_arg):
return my_arg

async def another_async_func(my_arg2):
return my_arg2

remember that an invocation of an async function returns a Future (Awaitable),
which needs to be awaited to get the result. You cannot await a Future from within
a non-async function, so you could use this helper to get the result from a Future

future_1 = my_async_func(1)
future_2 = another_async_func(2)

results = to_synchronous(future_1, future_2)
'''
return asyncio.get_event_loop().run_until_complete(asyncio.gather(*awaitables))
Loading