Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1206: add async to aws:s3 and aws:ecr #1192

Merged
merged 27 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
55a1545
init
ramonpetgrave64 Jun 22, 2023
3aa19af
note about nest_asyncio
ramonpetgrave64 Jun 23, 2023
04d7fe9
Hotfix (#1190)
jychp Jun 23, 2023
17f1288
0.81.0 (#1191)
ramonpetgrave64 Jun 23, 2023
362cfd3
Set to None the gsuite token instead of github one (#1204)
heryxpc Jul 10, 2023
94bfabc
0.82.0.dev1
ramonpetgrave64 Jul 11, 2023
38a7a40
'0.82.0.dev2'
ramonpetgrave64 Jul 12, 2023
9feb4fd
Update usage docs and roadmap links (#1196)
achantavy Jul 14, 2023
23185a6
#1210: EBSVolume => new data model, Allow node attr updates from mult…
achantavy Jul 14, 2023
e245b10
Fix index out of range for drift detection returning no results (#1220)
skiptomyliu Jul 17, 2023
f889b53
Add contributing guidelines for issues (#1226)
achantavy Jul 24, 2023
40b075a
add throttling and typing
ramonpetgrave64 Aug 1, 2023
da5c68d
add boto3_session fixture with stubbed clients
ramonpetgrave64 Aug 2, 2023
4c89a35
cleanup
ramonpetgrave64 Aug 2, 2023
c8273f1
rename
ramonpetgrave64 Aug 3, 2023
bdda3d1
Merge branch 'master' into ramonpetgrave64-s3-async
ramonpetgrave64 Aug 3, 2023
f8eadd0
Update setup.py
ramonpetgrave64 Aug 3, 2023
8257fe4
Merge branch 'master' into ramonpetgrave64-s3-async
ramonpetgrave64 Aug 8, 2023
a92b20e
refactor, add examples
ramonpetgrave64 Aug 8, 2023
92c72ad
typing
ramonpetgrave64 Aug 8, 2023
bdc664f
whitespace
ramonpetgrave64 Aug 8, 2023
a06d145
Merge branch 'master' into ramonpetgrave64-s3-async
achantavy Aug 22, 2023
4ac4dfa
add :param : docs
ramonpetgrave64 Aug 22, 2023
128ef7a
Update util.py
ramonpetgrave64 Aug 22, 2023
8f770f7
lint
ramonpetgrave64 Aug 22, 2023
1ad7a8f
lint
ramonpetgrave64 Aug 22, 2023
4edaefb
lint
ramonpetgrave64 Aug 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions cartography/intel/aws/ecr.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Any
from typing import Dict
from typing import List

Expand All @@ -9,6 +10,8 @@
from cartography.util import batch
from cartography.util import run_cleanup_job
from cartography.util import timeit
from cartography.util import to_asynchronous
from cartography.util import to_synchronous

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -139,6 +142,21 @@ def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
run_cleanup_job('aws_import_ecr_cleanup.json', neo4j_session, common_job_parameters)


def _get_image_data(boto3_session: boto3.session.Session, region: str, repositories: List[Dict]) -> Dict:
ramonpetgrave64 marked this conversation as resolved.
Show resolved Hide resolved
'''
Given a list of repositories, get the image data for each repository,
return as a mapping from repositoryUri to image object
ramonpetgrave64 marked this conversation as resolved.
Show resolved Hide resolved
'''
image_data = {}

async def async_get_images(repo: Dict[str, Any]) -> None:
repo_image_obj = await to_asynchronous(get_ecr_repository_images, boto3_session, region, repo['repositoryName'])
image_data[repo['repositoryUri']] = repo_image_obj
to_synchronous(*[async_get_images(repo) for repo in repositories])

return image_data


@timeit
def sync(
neo4j_session: neo4j.Session, boto3_session: boto3.session.Session, regions: List[str], current_aws_account_id: str,
Expand All @@ -148,9 +166,7 @@ def sync(
logger.info("Syncing ECR for region '%s' in account '%s'.", region, current_aws_account_id)
image_data = {}
repositories = get_ecr_repositories(boto3_session, region)
for repo in repositories:
repo_image_obj = get_ecr_repository_images(boto3_session, region, repo['repositoryName'])
image_data[repo['repositoryUri']] = repo_image_obj
image_data = _get_image_data(boto3_session, region, repositories)
load_ecr_repositories(neo4j_session, repositories, region, current_aws_account_id, update_tag)
repo_images_list = transform_ecr_repository_images(image_data)
load_ecr_repository_images(neo4j_session, repo_images_list, region, update_tag)
Expand Down
30 changes: 23 additions & 7 deletions cartography/intel/aws/s3.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import hashlib
import json
import logging
Expand All @@ -20,6 +21,8 @@
from cartography.util import run_analysis_job
from cartography.util import run_cleanup_job
from cartography.util import timeit
from cartography.util import to_asynchronous
from cartography.util import to_synchronous

logger = logging.getLogger(__name__)
stat_handler = get_stats_client(__name__)
Expand Down Expand Up @@ -55,20 +58,33 @@ def get_s3_bucket_details(
# a local store for s3 clients so that we may re-use clients for an AWS region
s3_regional_clients: Dict[Any, Any] = {}

for bucket in bucket_data['Buckets']:
BucketDetail = Tuple[str, Dict[str, Any], Dict[str, Any], Dict[str, Any], Dict[str, Any], Dict[str, Any]]

async def _get_bucket_detail(bucket: Dict[str, Any]) -> BucketDetail:
# Note: bucket['Region'] is sometimes None because
# client.get_bucket_location() does not return a location constraint for buckets
# in us-east-1 region
client = s3_regional_clients.get(bucket['Region'])
if not client:
client = boto3_session.client('s3', bucket['Region'])
s3_regional_clients[bucket['Region']] = client
acl = get_acl(bucket, client)
policy = get_policy(bucket, client)
encryption = get_encryption(bucket, client)
versioning = get_versioning(bucket, client)
public_access_block = get_public_access_block(bucket, client)
yield bucket['Name'], acl, policy, encryption, versioning, public_access_block
(
acl,
policy,
encryption,
versioning,
public_access_block,
) = await asyncio.gather(
to_asynchronous(get_acl, bucket, client),
to_asynchronous(get_policy, bucket, client),
to_asynchronous(get_encryption, bucket, client),
to_asynchronous(get_versioning, bucket, client),
to_asynchronous(get_public_access_block, bucket, client),
)
return bucket['Name'], acl, policy, encryption, versioning, public_access_block

bucket_details = to_synchronous(*[_get_bucket_detail(bucket) for bucket in bucket_data['Buckets']])
yield from bucket_details


@timeit
Expand Down
49 changes: 49 additions & 0 deletions cartography/util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
import logging
import re
import sys
from functools import partial
from functools import wraps
from string import Template
from typing import Any
from typing import Awaitable
from typing import BinaryIO
from typing import Callable
from typing import cast
Expand All @@ -25,6 +28,7 @@
from cartography.stats import get_stats_client
from cartography.stats import ScopedStatsClient


if sys.version_info >= (3, 7):
from importlib.resources import open_binary, read_text
else:
Expand Down Expand Up @@ -141,6 +145,7 @@ def load_resource_binary(package: str, resource_name: str) -> BinaryIO:
return open_binary(package, resource_name)


R = TypeVar('R')
F = TypeVar('F', bound=Callable[..., Any])


Expand Down Expand Up @@ -297,3 +302,47 @@ def batch(items: Iterable, size: int = DEFAULT_BATCH_SIZE) -> List[List]:
items[i: i + size]
for i in range(0, len(items), size)
]


def to_asynchronous(func: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
'''
Returns a Future that will run a function in the default threadpool.
Helper until we start using pytohn 3.9's asyncio.to_thread
ramonpetgrave64 marked this conversation as resolved.
Show resolved Hide resolved

Calls are also wrapped within a backoff decorator to handle throttling errors.

example:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good that you included an example, but since this is going to be a core function, I think we should have detailed docs on what each of the params is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

future = to_async(my_func, my_arg, my_arg2)
to_sync(future)

NOTE: to use this in a Jupyter notebook, you need to do:
# import nest_asyncio
# nest_asyncio.apply()
'''
CartographyThrottlingException = type('CartographyThrottlingException', (Exception,), {})
throttling_error_codes = ['LimitExceededException', 'Throttling']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This set of functions is located in cartography.util. Other libraries might use different exception names, so it might get unwieldy to have AWS, GCP, Azure names for throttling exceptions all in one place. I wonder if it might make sense to have this in cartography.aws.util (or whatever the equivalent is) first until we have an example of doing this with another module.


@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> R:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nonblock][question] What's the typevar R do?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It binds the return type of func so that mypy can understand that the wrapped function is supposed to still return type R.

try:
return func(*args, **kwargs)
except botocore.exceptions.ClientError as error:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This func is located in cartography.util. If we decide to use this for other sync jobs (non-AWS), will we add all of the different exception types here too, like will GCP exceptions be here too? Might end up having a lot of excepts.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I've refactored to another is_throttling_exception() function

if error.response['Error']['Code'] in throttling_error_codes:
raise CartographyThrottlingException from error
raise

# don't use @backoff as decorator, to preserve typing
wrapped = backoff.on_exception(backoff.expo, CartographyThrottlingException)(wrapper)
call = partial(wrapped, *args, **kwargs)
return asyncio.get_event_loop().run_in_executor(None, call)


def to_synchronous(*awaitables: Awaitable[Any]) -> Any:
'''
Waits for the Awaitable(s) to complete and returns their result(s).
See https://docs.python.org/3.8/library/asyncio-task.html#asyncio-awaitables

example:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing about explicitly documenting the parameters.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added as the normal :param : spec

result = to_sync(my_async_func(my_arg), another_async(my_arg2)))
'''
return asyncio.get_event_loop().run_until_complete(asyncio.gather(*awaitables))
Loading