From 47dcdcd3995c928734bc87a4fc7e575811c186da Mon Sep 17 00:00:00 2001 From: DaveYesland Date: Mon, 8 Jan 2024 20:10:46 -0800 Subject: [PATCH 1/3] Update handeling of credentials and importing credentials --- pacu/main.py | 109 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 65 insertions(+), 44 deletions(-) diff --git a/pacu/main.py b/pacu/main.py index 77319374..b001159e 100644 --- a/pacu/main.py +++ b/pacu/main.py @@ -35,7 +35,7 @@ from pacu.core.models import AWSKey, PacuSession, migrations from pacu.setup_database import setup_database_if_not_present from sqlalchemy import exc, orm # type: ignore - from pacu.utils import get_database_connection, set_sigint_handler + from pacu.utils import get_database_connection, set_sigint_handler, decode_accesskey_id except ModuleNotFoundError: exception_type, exception_value, tb = sys.exc_info() print('Traceback (most recent call last):\n{}{}: {}\n'.format(''.join(traceback.format_tb(tb)), str(exception_type), str(exception_value))) @@ -642,7 +642,7 @@ def parse_commands_from_file(self, command): def parse_awscli_keys_import(self, command): if len(command) == 1: - self.display_command_help('import_keys') + self.import_awscli_key_default() return boto3_session = boto3.session.Session() @@ -655,6 +655,14 @@ def parse_awscli_keys_import(self, command): self.import_awscli_key(command[1]) + def import_awscli_key_default(self) -> None: + answer = input(' No profile specified, do you want to use the systems default credentials? (y/n): ') + if answer.lower() == 'y': + self.set_keys(key_alias='import_from_default', access_key_id='c', secret_access_key='c', session_token='c') + self.get_boto_session() + else: + print(' No keys imported.') + def import_awscli_key(self, profile_name: str) -> None: try: boto3_session = boto3.session.Session(profile_name=profile_name) @@ -860,17 +868,6 @@ def process_service(service): def print_web_console_url(self) -> None: active_session = self.get_active_session() - - if active_session.key_alias is None: - print(' No keys have been set. Not generating the URL.') - return - if not active_session.access_key_id: - print(' No access key has been set. Not generating the URL.') - return - if not active_session.secret_access_key: - print(' No secret key has been set. Not generating the URL.') - return - sts = self.get_boto3_client('sts') if active_session.session_token: @@ -936,26 +933,25 @@ def all_region_prompt(self) -> bool: def export_keys(self, command) -> None: export = input('Export the active keys to the AWS CLI credentials file (~/.aws/credentials)? (y/n) ').rstrip() - if export.lower() == 'y': - session = self.get_active_session() + session = self.get_active_session() - if not session.access_key_id: - print(' No access key has been set. Not exporting credentials.') - return - if not session.secret_access_key: - print(' No secret key has been set. Not exporting credentials.') - return + if not session.access_key_id: + print(' No access key has been set. Not exporting credentials.') + return + if not session.secret_access_key: + print(' No secret key has been set. Not exporting credentials.') + return - config = """ + config = """ \n\n[{}] aws_access_key_id = {} aws_secret_access_key = {} """.format(session.key_alias, session.access_key_id, session.secret_access_key) - if session.session_token: - config = config + 'aws_session_token = "{}"'.format(session.session_token) - - config = config + '\n' + if session.session_token: + config = config + 'aws_session_token = "{}"'.format(session.session_token) + config = config + '\n' + if export.lower() == 'y': with open('{}/.aws/credentials'.format(os.path.expanduser('~')), 'a+') as f: f.write(config) @@ -963,6 +959,7 @@ def export_keys(self, command) -> None: session.key_alias, session.key_alias )) else: + print(config) return # ***** Some module notes ***** @@ -973,15 +970,6 @@ def export_keys(self, command) -> None: def exec_module(self, command: List[str]) -> None: session = self.get_active_session() - # Run key checks so that if no keys have been set, Pacu doesn't default to - # the AWSCLI default profile: - if not session.access_key_id: - print(' No access key has been set. Not running module.') - return - if not session.secret_access_key: - print(' No secret key has been set. Not running module.') - return - module_name = command[1].lower() module = import_module_by_name(module_name, include=['main', 'module_info', 'summary']) @@ -1050,9 +1038,9 @@ def display_command_help(self, command_name: str) -> None: if command_name == 'list' or command_name == 'ls': print('\n list/ls\n List all modules\n') elif command_name == 'import_keys': - print('\n import_keys |--all\n Import AWS keys from the AWS CLI credentials file (located at ~/.aws/credentials) to the ' + print('\n import_keys [profile name]|--all\n Import AWS keys from the AWS CLI credentials file (located at ~/.aws/credentials) to the ' 'current sessions database. Enter the name of a profile you would like to import or supply --all to import all the credentials in the ' - 'file.\n') + 'file. No argument will import the default system AWS credentials.\n') elif command_name == 'assume_role': print('\n assume_role \n Call AssumeRole on the specified role from the current credentials, add the resulting temporary ' 'keys to the Pacu key database and start using these new credentials.') @@ -1201,6 +1189,7 @@ def list_modules(self, search_term, by_category=False): def set_keys(self, key_alias: str = None, access_key_id: str = None, secret_access_key: str = None, session_token: str = None): session = self.get_active_session() + reservered_key_names = ['imported-','from_default-','import_from_default'] # If key_alias is None, then it's being run normally from the command line (set_keys), # otherwise it means it is set programmatically and we don't want any prompts if it is @@ -1219,6 +1208,9 @@ def set_keys(self, key_alias: str = None, access_key_id: str = None, secret_acce new_value = self.input('Key alias [{}]: '.format(session.key_alias)) if new_value == '': new_value = str(session.key_alias) + if any([new_value.startswith(n) for n in reservered_key_names]): + self.print(f'Key alias cannot start with "{", ".join(reservered_key_names)}"') + new_value = "" else: new_value = key_alias.strip() self.print('Key alias [{}]: {}'.format(session.key_alias, new_value), output='file') @@ -1474,14 +1466,43 @@ def check_user_agent(self) -> None: self.print(' {}'.format(new_ua)) def get_boto_session(self, region: str = None) -> boto3.session.Session: - session = self.get_active_session() - - if not session.access_key_id: - raise UserWarning(' No access key has been set. Failed to generate boto3 Client.') - - if not session.secret_access_key: - raise UserWarning(' No secret key has been set. Failed to generate boto3 Client.') + def set_keys_in_session(boto3_sess, key_alias=None): + creds = boto3_sess.get_credentials() + account_id = decode_accesskey_id(creds.access_key) + if not key_alias: + key_alias = f'from_default-{account_id}' + self.set_keys(key_alias=key_alias, access_key_id=creds.access_key, secret_access_key=creds.secret_key, + session_token=creds.token) + session = self.get_active_session() + + # If the user has not set any keys, check if they want to use the default system credentials + if (not session.access_key_id) or (not session.secret_access_key): + answer = '' + if session.key_alias != 'import_from_default': + answer = input(' Access keys not currently set, do you want to use the system default credentials? (y/n): ') + if (answer.lower() == 'y') or (session.key_alias == 'import_from_default'): + self.print(' Setting keys from AWS system default credentials') + boto3_session = boto3.session.Session(region_name=region) + set_keys_in_session(boto3_session) + return boto3_session + else: + raise UserWarning(' No access keys have been set.') + return None + + # Check if the keys were imported from a profile and refresh those creds + if session.key_alias.startswith('imported-'): + boto3_session = boto3.session.Session(profile_name=session.key_alias.replace('imported-', '')) + set_keys_in_session(boto3_session, session.key_alias) + return boto3_session + + # Check if the keys were set from system default and refresh + if session.key_alias.startswith('from_default-'): + boto3_session = boto3.session.Session(region_name=region) + set_keys_in_session(boto3_session, session.key_alias) + return boto3_session + + # If there are explicit keys set, which were not imported just get a session with them return boto3.session.Session( region_name=region, aws_access_key_id=session.access_key_id, From f1b4d1dcabb3025c50d61cf1fa04f2b2140259d9 Mon Sep 17 00:00:00 2001 From: DaveYesland Date: Mon, 8 Jan 2024 20:43:55 -0800 Subject: [PATCH 2/3] Fix lint errors --- pacu/main.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pacu/main.py b/pacu/main.py index b001159e..4e9fee10 100644 --- a/pacu/main.py +++ b/pacu/main.py @@ -1189,7 +1189,7 @@ def list_modules(self, search_term, by_category=False): def set_keys(self, key_alias: str = None, access_key_id: str = None, secret_access_key: str = None, session_token: str = None): session = self.get_active_session() - reservered_key_names = ['imported-','from_default-','import_from_default'] + reservered_key_names = ['imported-', 'from_default-', 'import_from_default'] # If key_alias is None, then it's being run normally from the command line (set_keys), # otherwise it means it is set programmatically and we don't want any prompts if it is @@ -1471,17 +1471,17 @@ def set_keys_in_session(boto3_sess, key_alias=None): account_id = decode_accesskey_id(creds.access_key) if not key_alias: key_alias = f'from_default-{account_id}' - self.set_keys(key_alias=key_alias, access_key_id=creds.access_key, secret_access_key=creds.secret_key, - session_token=creds.token) + self.set_keys(key_alias=key_alias, access_key_id=creds.access_key, secret_access_key=creds.secret_key, session_token=creds.token) session = self.get_active_session() - + key_alias = session.key_alias or '' + # If the user has not set any keys, check if they want to use the default system credentials if (not session.access_key_id) or (not session.secret_access_key): answer = '' - if session.key_alias != 'import_from_default': + if key_alias != 'import_from_default': answer = input(' Access keys not currently set, do you want to use the system default credentials? (y/n): ') - if (answer.lower() == 'y') or (session.key_alias == 'import_from_default'): + if (answer.lower() == 'y') or (key_alias == 'import_from_default'): self.print(' Setting keys from AWS system default credentials') boto3_session = boto3.session.Session(region_name=region) set_keys_in_session(boto3_session) @@ -1491,15 +1491,16 @@ def set_keys_in_session(boto3_sess, key_alias=None): return None # Check if the keys were imported from a profile and refresh those creds - if session.key_alias.startswith('imported-'): - boto3_session = boto3.session.Session(profile_name=session.key_alias.replace('imported-', '')) - set_keys_in_session(boto3_session, session.key_alias) + if key_alias.startswith('imported-'): + profile_name = key_alias.replace('imported-', '') + boto3_session = boto3.session.Session(profile_name=profile_name) + set_keys_in_session(boto3_session, key_alias) return boto3_session - + # Check if the keys were set from system default and refresh - if session.key_alias.startswith('from_default-'): + if key_alias.startswith('from_default-'): boto3_session = boto3.session.Session(region_name=region) - set_keys_in_session(boto3_session, session.key_alias) + set_keys_in_session(boto3_session, key_alias) return boto3_session # If there are explicit keys set, which were not imported just get a session with them From c32b7e2cf4d8273fb7e70dc09b7422700d44f0cc Mon Sep 17 00:00:00 2001 From: DaveYesland Date: Tue, 9 Jan 2024 08:32:55 -0800 Subject: [PATCH 3/3] Make default key update clear what account was added --- pacu/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pacu/main.py b/pacu/main.py index 4e9fee10..eca6dfeb 100644 --- a/pacu/main.py +++ b/pacu/main.py @@ -1471,6 +1471,7 @@ def set_keys_in_session(boto3_sess, key_alias=None): account_id = decode_accesskey_id(creds.access_key) if not key_alias: key_alias = f'from_default-{account_id}' + print(f' Setting keys for account: {account_id}') self.set_keys(key_alias=key_alias, access_key_id=creds.access_key, secret_access_key=creds.secret_key, session_token=creds.token) session = self.get_active_session()