Skip to content

Commit

Permalink
Add Blinker signals to updater (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
patricksanders authored Jun 8, 2020
1 parent d5d8619 commit 00116ba
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ ENV/
# Rope project settings
.ropeproject

# PyCharm project settings
.idea/

# mypy stuff
.mypy_cache/

Expand Down
56 changes: 56 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,62 @@ The `regex` query is only supported in Postgres (natively) and SQLite (via some
### TLS
We recommend enabling TLS for any service. Instructions for setting up TLS are out of scope for this document.

## Signals

> New in v0.3.1
Aardvark uses [Blinker](https://pythonhosted.org/blinker/) for signals in its update process. These signals can be used
for things like emitting metrics, additional logging, or taking more actions on accounts. You can use them by writing a
script that defines your handlers and calls `aardvark.manage.main()`. For example, create a file called
`signals_example.py` with the following contents:

```python
import logging

from aardvark.manage import main
from aardvark.updater import AccountToUpdate

logger = logging.getLogger('aardvark_signals')


@AccountToUpdate.on_ready.connect
def handle_on_ready(sender):
logger.info(f"got on_ready from {sender}")


@AccountToUpdate.on_complete.connect
def handle_on_complete(sender):
logger.info(f"got on_complete from {sender}")


if __name__ == "__main__":
main()
```

This file can now be invoked in the same way as `manage.py`:

```bash
python signals_example.py update -a cool_account
```

The log output will be similar to the following:

```
INFO: getting bucket swag-bucket
INFO: Thread #1 updating account 123456789012 with all arns
INFO: got on_ready from <aardvark.updater.AccountToUpdate object at 0x10c379b50>
INFO: got on_complete from <aardvark.updater.AccountToUpdate object at 0x10c379b50>
INFO: Thread #1 persisting data for account 123456789012
INFO: Thread #1 FINISHED persisting data for account 123456789012
```

### Available signals

| Class | Signals |
|-------|---------|
| `manage.UpdateAccountThread` | `on_ready`, `on_complete`, `on_failure` |
| `updater.AccountToUpdate` | `on_ready`, `on_complete`, `on_error`, `on_failure` |

## TODO:

See [TODO](TODO.md)
8 changes: 4 additions & 4 deletions aardvark/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
__summary__ = ("Multi-Account AWS IAM Access Advisor API")
__uri__ = "https://github.com/Netflix-Skunkworks/aardvark"

__version__ = "0.3.0"
__version__ = "0.3.1"

__author__ = "Patrick Kelley, Travis McPeak"
__email__ = "[email protected], tmcpeak@netflix.com"
__author__ = "Patrick Kelley, Travis McPeak, Patrick Sanders"
__email__ = "aardvark-maintainers@netflix.com"

__license__ = "Apache License, Version 2.0"
__copyright__ = "Copyright 2017 {0}".format(__author__)
__copyright__ = "Copyright 2020 {0}".format(__author__)
9 changes: 7 additions & 2 deletions aardvark/manage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#ensure absolute import for python3
from __future__ import absolute_import

import json
import os
try:
import queue as Queue # Queue renamed to queue in py3
Expand All @@ -11,8 +10,8 @@
import threading

import better_exceptions # noqa
from blinker import Signal
from bunch import Bunch
from distutils.spawn import find_executable
from flask import current_app
from flask_script import Manager, Command, Option
from swag_client.backend import SWAGManager
Expand Down Expand Up @@ -52,6 +51,9 @@

class UpdateAccountThread(threading.Thread):
global ACCOUNT_QUEUE, DB_LOCK, QUEUE_LOCK, UPDATE_DONE
on_ready = Signal()
on_complete = Signal()
on_failure = Signal()

def __init__(self, thread_ID):
self.thread_ID = thread_ID
Expand All @@ -60,6 +62,7 @@ def __init__(self, thread_ID):

def run(self):
while not UPDATE_DONE:
self.on_ready.send(self)

QUEUE_LOCK.acquire()

Expand All @@ -75,6 +78,7 @@ def run(self):
ret_code, aa_data = account.update_account()

if ret_code != 0: # retrieve wasn't successful, put back on queue
self.on_failure.send(self)
QUEUE_LOCK.acquire()
ACCOUNT_QUEUE.put((account_num, role_name, arns))
QUEUE_LOCK.release()
Expand All @@ -85,6 +89,7 @@ def run(self):
persist_aa_data(self.app, aa_data)
DB_LOCK.release()

self.on_complete.send(self)
self.app.logger.info("Thread #{} FINISHED persisting data for account {}".format(self.thread_ID, account_num))
else:
QUEUE_LOCK.release()
Expand Down
23 changes: 17 additions & 6 deletions aardvark/updater/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
import copy
import time

from blinker import Signal
from cloudaux.aws.iam import list_roles, list_users
from cloudaux.aws.sts import boto3_cached_conn
from cloudaux.aws.decorators import rate_limited


class AccountToUpdate(object):
on_ready = Signal()
on_complete = Signal()
on_error = Signal()
on_failure = Signal()

def __init__(self, current_app, account_number, role_name, arns_list):
self.current_app = current_app
self.account_number = account_number
Expand All @@ -34,6 +40,7 @@ def update_account(self):
:return: Return code and JSON Access Advisor data for given account
"""
self.on_ready.send(self)
arns = self._get_arns()

if not arns:
Expand All @@ -43,10 +50,12 @@ def update_account(self):
client = self._get_client()
try:
details = self._call_access_advisor(client, list(arns))
except Exception:
self.current_app.logger.exception('Failed to call access advisor')
except Exception as e:
self.on_failure.send(self, error=e)
self.current_app.logger.exception('Failed to call access advisor', exc_info=True)
return 255, None
else:
self.on_complete.send(self)
return 0, details

def _get_arns(self):
Expand Down Expand Up @@ -123,8 +132,9 @@ def _generate_job_ids(self, iam, arns):
except iam.exceptions.NoSuchEntityException:
""" We're here because this ARN disappeared since the call to self._get_arns(). Log the missing ARN and move along. """
self.current_app.logger.info('ARN {arn} found gone when fetching details'.format(arn=role_arn))
except Exception:
self.current_app.logger.error('Could not gather data from {0}.'.format(role_arn))
except Exception as e:
self.on_error.send(self, error=e)
self.current_app.logger.error('Could not gather data from {0}.'.format(role_arn), exc_info=True)
return jobs

def _get_job_results(self, iam, jobs):
Expand All @@ -146,8 +156,9 @@ def _get_job_results(self, iam, jobs):
role_arn = jobs[job_id]
try:
details = self._get_service_last_accessed_details(iam, job_id)
except Exception:
self.current_app.logger.error('Could not gather data from {0}.'.format(role_arn))
except Exception as e:
self.on_error.send(self, error=e)
self.current_app.logger.error('Could not gather data from {0}.'.format(role_arn), exc_info=True)
continue

# Check job status
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ aniso8601==8.0.0
astroid==2.3.1
attrs==19.3.0
better-exceptions==0.1.7
blinker==1.4
boto==2.49.0
boto3==1.9.252
botocore==1.12.252
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
install_requires = [
'requests>=2.9.1',
'better_exceptions==0.1.7',
'blinker>=1.4',
'Bunch==1.0.1',
'Flask-SQLAlchemy==2.2',
'cloudaux>=1.2.0',
Expand Down

0 comments on commit 00116ba

Please sign in to comment.