Skip to content

Commit

Permalink
Merge pull request #394 from DaveYesland/enhancement/improve_credenti…
Browse files Browse the repository at this point in the history
…al_handeling

improve credential handling
  • Loading branch information
DaveYesland authored Jan 9, 2024
2 parents 2a2f50d + c93b5a4 commit a893c0f
Showing 1 changed file with 64 additions and 41 deletions.
105 changes: 64 additions & 41 deletions pacu/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from pacu.core.models import AWSKey, PacuSession, migrations
from pacu.setup_database import setup_database_if_not_present
from sqlalchemy import exc, orm # type: ignore
from pacu.utils import get_database_connection, set_sigint_handler
from pacu.utils import get_database_connection, set_sigint_handler, decode_accesskey_id
except ModuleNotFoundError:
exception_type, exception_value, tb = sys.exc_info()
print('Traceback (most recent call last):\n{}{}: {}\n'.format(''.join(traceback.format_tb(tb)), str(exception_type), str(exception_value)))
Expand Down Expand Up @@ -646,7 +646,7 @@ def parse_commands_from_file(self, command):

def parse_awscli_keys_import(self, command):
if len(command) == 1:
self.display_command_help('import_keys')
self.import_awscli_key_default()
return

boto3_session = boto3.session.Session()
Expand All @@ -659,6 +659,14 @@ def parse_awscli_keys_import(self, command):

self.import_awscli_key(command[1])

def import_awscli_key_default(self) -> None:
answer = input(' No profile specified, do you want to use the systems default credentials? (y/n): ')
if answer.lower() == 'y':
self.set_keys(key_alias='import_from_default', access_key_id='c', secret_access_key='c', session_token='c')
self.get_boto_session()
else:
print(' No keys imported.')

def import_awscli_key(self, profile_name: str) -> None:
try:
boto3_session = boto3.session.Session(profile_name=profile_name)
Expand Down Expand Up @@ -864,17 +872,6 @@ def process_service(service):

def print_web_console_url(self) -> None:
active_session = self.get_active_session()

if active_session.key_alias is None:
print(' No keys have been set. Not generating the URL.')
return
if not active_session.access_key_id:
print(' No access key has been set. Not generating the URL.')
return
if not active_session.secret_access_key:
print(' No secret key has been set. Not generating the URL.')
return

sts = self.get_boto3_client('sts')

if active_session.session_token:
Expand Down Expand Up @@ -940,33 +937,33 @@ def all_region_prompt(self) -> bool:
def export_keys(self, command) -> None:
export = input('Export the active keys to the AWS CLI credentials file (~/.aws/credentials)? (y/n) ').rstrip()

if export.lower() == 'y':
session = self.get_active_session()
session = self.get_active_session()

if not session.access_key_id:
print(' No access key has been set. Not exporting credentials.')
return
if not session.secret_access_key:
print(' No secret key has been set. Not exporting credentials.')
return
if not session.access_key_id:
print(' No access key has been set. Not exporting credentials.')
return
if not session.secret_access_key:
print(' No secret key has been set. Not exporting credentials.')
return

config = """
config = """
\n\n[{}]
aws_access_key_id = {}
aws_secret_access_key = {}
""".format(session.key_alias, session.access_key_id, session.secret_access_key)
if session.session_token:
config = config + 'aws_session_token = "{}"'.format(session.session_token)

config = config + '\n'
if session.session_token:
config = config + 'aws_session_token = "{}"'.format(session.session_token)

config = config + '\n'
if export.lower() == 'y':
with open('{}/.aws/credentials'.format(os.path.expanduser('~')), 'a+') as f:
f.write(config)

print('Successfully exported {}. Use it with the AWS CLI like this: aws ec2 describe instances --profile {}'.format(
session.key_alias, session.key_alias
))
else:
print(config)
return

# ***** Some module notes *****
Expand All @@ -977,15 +974,6 @@ def export_keys(self, command) -> None:
def exec_module(self, command: List[str]) -> None:
session = self.get_active_session()

# Run key checks so that if no keys have been set, Pacu doesn't default to
# the AWSCLI default profile:
if not session.access_key_id:
print(' No access key has been set. Not running module.')
return
if not session.secret_access_key:
print(' No secret key has been set. Not running module.')
return

module_name = command[1].lower()
module = import_module_by_name(module_name, include=['main', 'module_info', 'summary'])

Expand Down Expand Up @@ -1054,9 +1042,9 @@ def display_command_help(self, command_name: str) -> None:
if command_name == 'list' or command_name == 'ls':
print('\n list/ls\n List all modules\n')
elif command_name == 'import_keys':
print('\n import_keys <profile name>|--all\n Import AWS keys from the AWS CLI credentials file (located at ~/.aws/credentials) to the '
print('\n import_keys [profile name]|--all\n Import AWS keys from the AWS CLI credentials file (located at ~/.aws/credentials) to the '
'current sessions database. Enter the name of a profile you would like to import or supply --all to import all the credentials in the '
'file.\n')
'file. No argument will import the default system AWS credentials.\n')
elif command_name == 'assume_role':
print('\n assume_role <role arn>\n Call AssumeRole on the specified role from the current credentials, add the resulting temporary '
'keys to the Pacu key database and start using these new credentials.')
Expand Down Expand Up @@ -1205,6 +1193,7 @@ def list_modules(self, search_term, by_category=False):

def set_keys(self, key_alias: str = None, access_key_id: str = None, secret_access_key: str = None, session_token: str = None):
session = self.get_active_session()
reservered_key_names = ['imported-', 'from_default-', 'import_from_default']

# If key_alias is None, then it's being run normally from the command line (set_keys),
# otherwise it means it is set programmatically and we don't want any prompts if it is
Expand All @@ -1223,6 +1212,9 @@ def set_keys(self, key_alias: str = None, access_key_id: str = None, secret_acce
new_value = self.input('Key alias [{}]: '.format(session.key_alias))
if new_value == '':
new_value = str(session.key_alias)
if any([new_value.startswith(n) for n in reservered_key_names]):
self.print(f'Key alias cannot start with "{", ".join(reservered_key_names)}"')
new_value = ""
else:
new_value = key_alias.strip()
self.print('Key alias [{}]: {}'.format(session.key_alias, new_value), output='file')
Expand Down Expand Up @@ -1478,14 +1470,45 @@ def check_user_agent(self) -> None:
self.print(' {}'.format(new_ua))

def get_boto_session(self, region: str = None) -> boto3.session.Session:
def set_keys_in_session(boto3_sess, key_alias=None):
creds = boto3_sess.get_credentials()
account_id = decode_accesskey_id(creds.access_key)
if not key_alias:
key_alias = f'from_default-{account_id}'
print(f' Setting keys for account: {account_id}')
self.set_keys(key_alias=key_alias, access_key_id=creds.access_key, secret_access_key=creds.secret_key, session_token=creds.token)

session = self.get_active_session()
key_alias = session.key_alias or ''

# If the user has not set any keys, check if they want to use the default system credentials
if (not session.access_key_id) or (not session.secret_access_key):
answer = ''
if key_alias != 'import_from_default':
answer = input(' Access keys not currently set, do you want to use the system default credentials? (y/n): ')
if (answer.lower() == 'y') or (key_alias == 'import_from_default'):
self.print(' Setting keys from AWS system default credentials')
boto3_session = boto3.session.Session(region_name=region)
set_keys_in_session(boto3_session)
return boto3_session
else:
raise UserWarning(' No access keys have been set.')
return None

if not session.access_key_id:
raise UserWarning(' No access key has been set. Failed to generate boto3 Client.')
# Check if the keys were imported from a profile and refresh those creds
if key_alias.startswith('imported-'):
profile_name = key_alias.replace('imported-', '')
boto3_session = boto3.session.Session(profile_name=profile_name)
set_keys_in_session(boto3_session, key_alias)
return boto3_session

if not session.secret_access_key:
raise UserWarning(' No secret key has been set. Failed to generate boto3 Client.')
# Check if the keys were set from system default and refresh
if key_alias.startswith('from_default-'):
boto3_session = boto3.session.Session(region_name=region)
set_keys_in_session(boto3_session, key_alias)
return boto3_session

# If there are explicit keys set, which were not imported just get a session with them
return boto3.session.Session(
region_name=region,
aws_access_key_id=session.access_key_id,
Expand Down

0 comments on commit a893c0f

Please sign in to comment.