From 67dff94358899a2d1af261d71f3d949b50d2cbd0 Mon Sep 17 00:00:00 2001 From: Michael Haigh Date: Mon, 29 Apr 2024 16:11:57 -0400 Subject: [PATCH] argparse memory optimization Signed-off-by: Michael Haigh --- tkSrc/classes.py | 6 ++- tkSrc/clone.py | 41 ++++++++----------- tkSrc/create.py | 95 ++++++++++++++++++++----------------------- tkSrc/deploy.py | 22 +++++----- tkSrc/destroy.py | 12 +++--- tkSrc/helpers.py | 100 ++++++++++++++++++++++++---------------------- tkSrc/ipr.py | 17 ++++---- tkSrc/manage.py | 47 ++++++++++------------ tkSrc/unmanage.py | 11 +++-- tkSrc/update.py | 38 +++++++++--------- toolkit.py | 26 +++++++----- 11 files changed, 205 insertions(+), 210 deletions(-) diff --git a/tkSrc/classes.py b/tkSrc/classes.py index fc11e6a..ff0b23c 100644 --- a/tkSrc/classes.py +++ b/tkSrc/classes.py @@ -15,6 +15,8 @@ limitations under the License. """ +from tkSrc.helpers import parserError + class ArgparseChoicesLists: """This Class defines a set of Lists which are used in the "choices" section of the Argparse @@ -93,7 +95,7 @@ def buildList(self, name, key, fKey=None, fVal=None): except TypeError: return [] - def getSingleDict(self, name, key, value, parser): + def getSingleDict(self, name, key, value): """Returns a single dict within the "items" list of the main resource dict, based on a matching key/value pair""" try: @@ -101,6 +103,6 @@ def getSingleDict(self, name, key, value, parser): x for x in getattr(self, name)["items"] if self.recursiveGet(key, x) == value ) except StopIteration: - parser.error( + parserError( f"A resource with a '{key}:{value}' pair in the '{name}' dict was not found" ) diff --git a/tkSrc/clone.py b/tkSrc/clone.py index 2aff1c0..44a2799 100644 --- a/tkSrc/clone.py +++ b/tkSrc/clone.py @@ -261,7 +261,6 @@ def doV3Clone( skip_tls_verify, quiet, verbose, - parser, ard, sourceApp, appName, @@ -274,7 +273,7 @@ def doV3Clone( """Performs a live clone by first creating either a snapshot (same-cluster) or backup (cross- cluster) of the sourcApp, and then it calls doV3Restore for the remaining restore operation, which consists of a BackupRestore or SnapshotRestore and an Application definition.""" - bucketDict = helpers.getCommonAppVault(v3, cluster, parser, skip_tls_verify=skip_tls_verify) + bucketDict = helpers.getCommonAppVault(v3, cluster, skip_tls_verify=skip_tls_verify) if dry_run == "client": print(f"# This must be applied on the source cluster specified by '{v3}'") if crossCluster: @@ -317,7 +316,6 @@ def doV3Clone( skip_tls_verify, quiet, verbose, - parser, ard, restoreSourceDict, appName, @@ -335,7 +333,6 @@ def setupV3Restore( skip_tls_verify, quiet, verbose, - parser, ard, restoreSource, appName, @@ -361,9 +358,9 @@ def setupV3Restore( ).main("snapshots") # For restore, we need to figure out if a backup or a snapshot source was provided if restoreSource in ard.buildList("backups", "metadata.name"): - restoreSourceDict = ard.getSingleDict("backups", "metadata.name", restoreSource, parser) + restoreSourceDict = ard.getSingleDict("backups", "metadata.name", restoreSource) elif restoreSource in ard.buildList("snapshots", "metadata.name"): - restoreSourceDict = ard.getSingleDict("snapshots", "metadata.name", restoreSource, parser) + restoreSourceDict = ard.getSingleDict("snapshots", "metadata.name", restoreSource) # crossCluster requires a backup, so create a backup from the specified snapshot if crossCluster: if dry_run == "client": @@ -391,14 +388,15 @@ def setupV3Restore( else: restoreSourceDict = waitForDpCompletion(restoreSourceDict, v3, skip_tls_verify) else: - parser.error(f"the restoreSource '{restoreSource}' is not a valid backup or snapshot") + helpers.parserError( + f"the restoreSource '{restoreSource}' is not a valid backup or snapshot" + ) doV3Restore( v3, dry_run, skip_tls_verify, quiet, verbose, - parser, ard, restoreSourceDict, appName, @@ -418,7 +416,6 @@ def doV3Restore( skip_tls_verify, quiet, verbose, - parser, ard, restoreSourceDict, appName, @@ -432,11 +429,9 @@ def doV3Restore( ): """Restores an app by creating a BackupRestore or SnapshotRestore (based on the contents of restoreSourceDict), and then creates the Application definition.""" - oApp = ard.getSingleDict( - "apps", "metadata.name", restoreSourceDict["spec"]["applicationRef"], parser - ) + oApp = ard.getSingleDict("apps", "metadata.name", restoreSourceDict["spec"]["applicationRef"]) namespaceMapping = helpers.createNamespaceMapping( - oApp["spec"]["includedNamespaces"], newNamespace, multiNsMapping, parser + oApp["spec"]["includedNamespaces"], newNamespace, multiNsMapping ) template = helpers.setupJinja("restore") try: @@ -450,7 +445,6 @@ def doV3Restore( restoreSourceDict["spec"]["appVaultRef"], v3, cluster, - parser, skip_tls_verify=skip_tls_verify, ) if crossCluster @@ -458,7 +452,7 @@ def doV3Restore( ), namespaceMapping=helpers.prependDump(namespaceMapping, prepend=4), resourceFilter=helpers.prependDump( - helpers.createFilterSet(filterSelection, filterSet, None, parser, v3=True), + helpers.createFilterSet(filterSelection, filterSet, None, v3=True), prepend=4, ), newStorageClass=newStorageClass, @@ -489,19 +483,19 @@ def doV3Restore( ) except KeyError as err: rName = restoreSourceDict["metadata"]["name"] - parser.error( + helpers.parserError( f"{err} key not found in '{rName}' object, please ensure " f"{rName}' is a valid backup/snapshot" ) -def main(args, parser, ard): +def main(args, ard): # Ensure proper use of resource filters if args.subcommand == "restore": if (args.filterSelection and not args.filterSet) or ( args.filterSet and not args.filterSelection ): - parser.error( + helpers.parserError( "either both or none of --filterSelection and --filterSet should be specified" ) @@ -521,7 +515,6 @@ def main(args, parser, ard): args.skip_tls_verify, args.quiet, args.verbose, - parser, ard, args.sourceApp, args.appName, @@ -540,7 +533,6 @@ def main(args, parser, ard): args.skip_tls_verify, args.quiet, args.verbose, - parser, ard, args.restoreSource, args.appName, @@ -583,7 +575,7 @@ def main(args, parser, ard): dataProtections = ard.snapshots snapshot = args.restoreSource else: - parser.error( + helpers.parserError( f"the restoreSource '{args.restoreSource}' is not a valid backup or snapshot" ) for app in ard.apps["items"]: @@ -592,17 +584,17 @@ def main(args, parser, ard): oApp = app # Ensure appIDstr is not equal to "", if so bad values were passed in with plaidMode if not oApp: - parser.error( + helpers.parserError( "the corresponding app was not found in the system, please check " + "your inputs and try again." ) doClone( - helpers.isRFC1123(args.appName, parser=parser), + helpers.isRFC1123(args.appName), args.cluster, oApp, helpers.createNamespaceMapping( - oApp["namespaceScopedResources"], args.newNamespace, args.multiNsMapping, parser + oApp["namespaceScopedResources"], args.newNamespace, args.multiNsMapping ), args.newStorageClass, backup, @@ -612,7 +604,6 @@ def main(args, parser, ard): args.filterSelection, args.filterSet, astraSDK.apps.getAppAssets().main(oApp["id"]), - parser, ), pollTimer=args.pollTimer, background=args.background, diff --git a/tkSrc/create.py b/tkSrc/create.py index a9fa0f2..b169af0 100644 --- a/tkSrc/create.py +++ b/tkSrc/create.py @@ -26,7 +26,7 @@ from tkSrc import helpers -def monitorProtectionTask(protectionID, protectionType, appID, background, pollTimer, parser): +def monitorProtectionTask(protectionID, protectionType, appID, background, pollTimer): """Ensure backup/snapshot task was created successfully, then monitor""" if protectionID is False: return False @@ -35,7 +35,7 @@ def monitorProtectionTask(protectionID, protectionType, appID, background, pollT elif protectionType == "snapshot": protection_class = astraSDK.snapshots.getSnaps() else: - parser.error(f"unknown protection type: {protectionType}") + helpers.parserError(f"unknown protection type: {protectionType}") print(f"Starting {protectionType} of {appID}") if background: @@ -75,7 +75,7 @@ def monitorProtectionTask(protectionID, protectionType, appID, background, pollT return False -def monitorV3ProtectionTask(protection, pollTimer, parser, v3, skip_tls_verify): +def monitorV3ProtectionTask(protection, pollTimer, v3, skip_tls_verify): name = protection["metadata"]["name"] singular = protection["kind"].lower() resource_class = astraSDK.k8s.getResources(config_context=v3, skip_tls_verify=skip_tls_verify) @@ -122,9 +122,9 @@ def createV3ConnectorOperator(v3, dry_run, skip_tls_verify, verbose, operator_ve ) -def createCloudCredential(quiet, verbose, path, name, cloudType, parser): +def createCloudCredential(quiet, verbose, path, name, cloudType): """Create a public cloud (AWS/Azure/GCP) credential via the API""" - credDict = helpers.openJson(path, parser) + credDict = helpers.openJson(path) encodedStr = base64.b64encode(json.dumps(credDict).encode("utf-8")).decode("utf-8") rc = astraSDK.credentials.createCredential(quiet=quiet, verbose=verbose).main( "astra-sa@" + name, @@ -137,9 +137,9 @@ def createCloudCredential(quiet, verbose, path, name, cloudType, parser): raise SystemExit("astraSDK.credentials.createCredential() failed") -def createV3CloudCredential(v3, dry_run, skip_tls_verify, quiet, verbose, path, name, parser): +def createV3CloudCredential(v3, dry_run, skip_tls_verify, quiet, verbose, path, name): """Create a public cloud (AWS/Azure/GCP) credential via a Kubernetes secret""" - credDict = helpers.openJson(path, parser) + credDict = helpers.openJson(path) encodedStr = base64.b64encode(json.dumps(credDict).encode("utf-8")).decode("utf-8") data = {"credentials.json": encodedStr} namespace = "astra-connector" @@ -163,7 +163,7 @@ def createV3CloudCredential(v3, dry_run, skip_tls_verify, quiet, verbose, path, ).main(f"{name}-", data, generateName=True, namespace=namespace) -def createLdapCredential(quiet, verbose, username, password, parser): +def createLdapCredential(quiet, verbose, username, password): """Create an LDAP bind credential via the API""" bindDn = base64.b64encode(username.encode("utf-8")).decode("utf-8") enpass = base64.b64encode(password.encode("utf-8")).decode("utf-8") @@ -268,7 +268,6 @@ def createV3Hook( skip_tls_verify, quiet, verbose, - parser, app, name, filePath, @@ -281,11 +280,11 @@ def createV3Hook( containerName=None, ): """Creates an exec hook via a Kubernetes custom resource""" - encodedStr = helpers.openScript(filePath, parser) + encodedStr = helpers.openScript(filePath) template = helpers.setupJinja("hook") v3_dict = yaml.safe_load( template.render( - name=helpers.isRFC1123(name, parser=parser), + name=helpers.isRFC1123(name), action=operation.split("-")[1], appName=app, arguments=helpers.prependDump(helpers.createHookList(hookArguments), prepend=4), @@ -417,7 +416,7 @@ def createV3Snapshot( ) -def main(args, parser, ard): +def main(args, ard): if args.objectType == "backup": if args.v3: if ard.needsattr("buckets"): @@ -425,9 +424,9 @@ def main(args, parser, ard): config_context=args.v3, skip_tls_verify=args.skip_tls_verify ).main("appvaults") if args.bucket is None: - args.bucket = ard.getSingleDict("buckets", "status.state", "available", parser)[ - "metadata" - ]["name"] + args.bucket = ard.getSingleDict("buckets", "status.state", "available")["metadata"][ + "name" + ] backup = createV3Backup( args.v3, args.dry_run, @@ -441,13 +440,11 @@ def main(args, parser, ard): args.reclaimPolicy, ) if backup and not args.dry_run and not args.background: - monitorV3ProtectionTask( - backup, args.pollTimer, parser, args.v3, args.skip_tls_verify - ) + monitorV3ProtectionTask(backup, args.pollTimer, args.v3, args.skip_tls_verify) else: protectionID = astraSDK.backups.takeBackup(quiet=args.quiet, verbose=args.verbose).main( args.app, - helpers.isRFC1123(args.name, parser=parser), + helpers.isRFC1123(args.name), bucketID=args.bucket, snapshotID=args.snapshot, ) @@ -457,12 +454,11 @@ def main(args, parser, ard): args.app, args.background, args.pollTimer, - parser, ) if rc is False: raise SystemExit("monitorProtectionTask() failed") elif args.objectType == "cluster": - kubeconfigDict = helpers.openYaml(args.filePath, parser) + kubeconfigDict = helpers.openYaml(args.filePath) encodedStr = base64.b64encode(json.dumps(kubeconfigDict).encode("utf-8")).decode("utf-8") rc = astraSDK.credentials.createCredential(quiet=args.quiet, verbose=args.verbose).main( kubeconfigDict["clusters"][0]["name"], @@ -483,9 +479,9 @@ def main(args, parser, ard): elif args.objectType == "group": ldapGroups = astraSDK.groups.getLdapGroups().main(dnFilter=args.dn, matchType="eq") if len(ldapGroups["items"]) == 0: - parser.error(f"0 LDAP groups found with DN '{args.dn}'") + helpers.parserError(f"0 LDAP groups found with DN '{args.dn}'") elif len(ldapGroups["items"]) > 1: - parser.error(f"multiple LDAP users found with DN '{args.dn}'") + helpers.parserError(f"multiple LDAP users found with DN '{args.dn}'") # First create the group grc = astraSDK.groups.createGroup(quiet=args.quiet, verbose=args.verbose).main(args.dn) if grc: @@ -510,7 +506,6 @@ def main(args, parser, ard): args.skip_tls_verify, args.quiet, args.verbose, - parser, args.app, args.name, args.filePath, @@ -541,11 +536,9 @@ def main(args, parser, ard): if rc is False: raise SystemExit("astraSDK.hooks.createHook() failed") elif args.objectType == "ldap": - credential = createLdapCredential( - args.quiet, args.verbose, args.username, args.password, parser - ) + credential = createLdapCredential(args.quiet, args.verbose, args.username, args.password) ard.settings = astraSDK.settings.getSettings().main() - ldapSetting = ard.getSingleDict("settings", "name", "astra.account.ldap", parser) + ldapSetting = ard.getSingleDict("settings", "name", "astra.account.ldap") rc = astraSDK.settings.createLdap(quiet=args.quiet, verbose=args.verbose).main( ldapSetting["id"], args.url, @@ -564,28 +557,28 @@ def main(args, parser, ard): naStr = "" if args.v3 else "*" if args.granularity == "hourly": if args.hour: - parser.error("'hourly' granularity must not specify -H / --hour") + helpers.parserError("'hourly' granularity must not specify -H / --hour") args.hour = naStr args.dayOfWeek = naStr args.dayOfMonth = naStr elif args.granularity == "daily": if not isinstance(args.hour, int) and not args.hour: - parser.error("'daily' granularity requires -H / --hour") + helpers.parserError("'daily' granularity requires -H / --hour") args.dayOfWeek = naStr args.dayOfMonth = naStr elif args.granularity == "weekly": if not isinstance(args.hour, int) and not args.hour: - parser.error("'weekly' granularity requires -H / --hour") + helpers.parserError("'weekly' granularity requires -H / --hour") if not isinstance(args.dayOfWeek, int) and not args.dayOfWeek: - parser.error("'weekly' granularity requires -W / --dayOfWeek") + helpers.parserError("'weekly' granularity requires -W / --dayOfWeek") args.dayOfMonth = naStr elif args.granularity == "monthly": if not isinstance(args.hour, int) and not args.hour: - parser.error("'monthly' granularity requires -H / --hour") + helpers.parserError("'monthly' granularity requires -H / --hour") if args.dayOfWeek: - parser.error("'monthly' granularity must not specify -W / --dayOfWeek") + helpers.parserError("'monthly' granularity must not specify -W / --dayOfWeek") if not args.dayOfMonth: - parser.error("'monthly' granularity requires -M / --dayOfMonth") + helpers.parserError("'monthly' granularity requires -M / --dayOfMonth") args.dayOfWeek = naStr if args.v3: if ard.needsattr("buckets"): @@ -593,9 +586,9 @@ def main(args, parser, ard): config_context=args.v3, skip_tls_verify=args.skip_tls_verify ).main("appvaults") if args.bucket is None: - args.bucket = ard.getSingleDict("buckets", "status.state", "available", parser)[ - "metadata" - ]["name"] + args.bucket = ard.getSingleDict("buckets", "status.state", "available")["metadata"][ + "name" + ] createV3Protection( args.v3, args.dry_run, @@ -637,9 +630,11 @@ def main(args, parser, ard): hours = "00" minutes = args.offset.zfill(2) if int(hours) < 0 or int(hours) > 23: - parser.error(f"offset {args.offset} hours must be between 0 and 23, inclusive") + helpers.parserError(f"offset {args.offset} hours must be between 0 and 23, inclusive") elif int(minutes) < 0 or int(minutes) > 59: - parser.error(f"offset '{args.offset}' minutes must be between 0 and 59, inclusive") + helpers.parserError( + f"offset '{args.offset}' minutes must be between 0 and 59, inclusive" + ) dtstart = "DTSTART:20220101T" + hours + minutes + "00Z\n" # Create RRULE string rrule = "RRULE:FREQ=MINUTELY;INTERVAL=" @@ -689,7 +684,7 @@ def main(args, parser, ard): else: raise SystemExit("astraSDK.replications.createReplicationpolicy() failed") elif args.objectType == "script": - encodedStr = helpers.openScript(args.filePath, parser) + encodedStr = helpers.openScript(args.filePath) rc = astraSDK.scripts.createScript(quiet=args.quiet, verbose=args.verbose).main( name=args.name, source=encodedStr, description=args.description ) @@ -702,9 +697,9 @@ def main(args, parser, ard): config_context=args.v3, skip_tls_verify=args.skip_tls_verify ).main("appvaults") if args.bucket is None: - args.bucket = ard.getSingleDict("buckets", "status.state", "available", parser)[ - "metadata" - ]["name"] + args.bucket = ard.getSingleDict("buckets", "status.state", "available")["metadata"][ + "name" + ] snapshot = createV3Snapshot( args.v3, args.dry_run, @@ -719,13 +714,10 @@ def main(args, parser, ard): readyToUseTimeout=args.readyToUseTimeout, ) if snapshot and not args.dry_run and not args.background: - monitorV3ProtectionTask( - snapshot, args.pollTimer, parser, args.v3, args.skip_tls_verify - ) + monitorV3ProtectionTask(snapshot, args.pollTimer, args.v3, args.skip_tls_verify) else: protectionID = astraSDK.snapshots.takeSnap(quiet=args.quiet, verbose=args.verbose).main( - args.app, - helpers.isRFC1123(args.name, parser=parser), + args.app, helpers.isRFC1123(args.name) ) rc = monitorProtectionTask( protectionID, @@ -733,7 +725,6 @@ def main(args, parser, ard): args.app, args.background, args.pollTimer, - parser, ) if rc is False: raise SystemExit("monitorProtectionTask() failed") @@ -742,9 +733,9 @@ def main(args, parser, ard): if args.ldap: ldapUsers = astraSDK.users.getLdapUsers().main(emailFilter=args.email, matchType="eq") if len(ldapUsers["items"]) == 0: - parser.error(f"0 LDAP users found with email '{args.email}'") + helpers.parserError(f"0 LDAP users found with email '{args.email}'") elif len(ldapUsers["items"]) > 1: - parser.error(f"multiple LDAP users found with email '{args.email}'") + helpers.parserError(f"multiple LDAP users found with email '{args.email}'") args.firstName = ldapUsers["items"][0]["firstName"] args.lastName = ldapUsers["items"][0]["lastName"] args.ldap = "ldap" diff --git a/tkSrc/deploy.py b/tkSrc/deploy.py index bbc97a8..8041a23 100644 --- a/tkSrc/deploy.py +++ b/tkSrc/deploy.py @@ -181,7 +181,7 @@ def deployHelm( raise SystemExit(f"cpp.main({period}...) returned False") -def main(args, parser, ard): +def main(args, ard): if args.objectType == "acp": if args.v3: # Ensure the trident orchestrator is already running @@ -192,9 +192,11 @@ def main(args, parser, ard): skip_tls_verify=args.skip_tls_verify, ).main("tridentorchestrators") if torc is None or len(torc["items"]) == 0: - parser.error("trident operator not found on current Kubernetes context") + helpers.parserError("trident operator not found on current Kubernetes context") elif len(torc["items"]) > 1: - parser.error("multiple trident operators found on current Kubernetes context") + helpers.parserError( + "multiple trident operators found on current Kubernetes context" + ) # Handle the registry secret if not args.regCred: cred = astraSDK.k8s.createRegCred( @@ -212,7 +214,7 @@ def main(args, parser, ard): ard.credentials = astraSDK.k8s.getSecrets( config_context=args.v3, skip_tls_verify=args.skip_tls_verify ).main(namespace="trident") - cred = ard.getSingleDict("credentials", "metadata.name", args.regCred, parser) + cred = ard.getSingleDict("credentials", "metadata.name", args.regCred) # Handle default registry if not args.registry: try: @@ -224,7 +226,7 @@ def main(args, parser, ard): ) ) except KeyError as err: - parser.error( + helpers.parserError( f"{args.regCred} does not appear to be a Docker secret: {err} key not found" ) # Create the patch spec @@ -247,7 +249,7 @@ def main(args, parser, ard): else: raise SystemExit("astraSDK.k8s.updateClusterResource() failed") else: - parser.error( + helpers.parserError( "'deploy acp' is currently only supported as a --v3 command, please re-run with " "--v3 and an optional context, kubeconfig_file, or context@kubeconfig_file mapping" ) @@ -260,12 +262,12 @@ def main(args, parser, ard): config_context=args.v3, skip_tls_verify=args.skip_tls_verify ).main("appvaults") if args.bucket is None: - args.bucket = ard.getSingleDict("buckets", "status.state", "available", parser)[ - "metadata" - ]["name"] + args.bucket = ard.getSingleDict("buckets", "status.state", "available")["metadata"][ + "name" + ] deployHelm( args.chart, - helpers.isRFC1123(args.app, parser=parser), + helpers.isRFC1123(args.app), args.namespace, args.set, args.values, diff --git a/tkSrc/destroy.py b/tkSrc/destroy.py index cc66e7f..cdaca68 100644 --- a/tkSrc/destroy.py +++ b/tkSrc/destroy.py @@ -18,7 +18,7 @@ import astraSDK -def main(args, parser, ard): +def main(args, ard): if args.objectType == "backup": if args.v3: rc = astraSDK.k8s.destroyResource( @@ -39,7 +39,7 @@ def main(args, parser, ard): elif args.objectType == "cluster": if ard.needsattr("clusters"): ard.clusters = astraSDK.clusters.getClusters().main() - cluster = ard.getSingleDict("clusters", "id", args.cluster, parser) + cluster = ard.getSingleDict("clusters", "id", args.cluster) rc = astraSDK.clusters.deleteCluster(quiet=args.quiet, verbose=args.verbose).main( cluster["id"], cluster["cloudID"] ) @@ -65,7 +65,7 @@ def main(args, parser, ard): elif args.objectType == "group": if ard.needsattr("rolebindings"): ard.rolebindings = astraSDK.rolebindings.getRolebindings().main() - rb = ard.getSingleDict("rolebindings", "groupID", args.groupID, parser) + rb = ard.getSingleDict("rolebindings", "groupID", args.groupID) if astraSDK.rolebindings.destroyRolebinding(quiet=args.quiet, verbose=args.verbose).main( rb["id"] ): @@ -97,7 +97,7 @@ def main(args, parser, ard): raise SystemExit(f"Failed destroying hook: {args.hook}") elif args.objectType == "ldap": ard.settings = astraSDK.settings.getSettings().main() - ldapSetting = ard.getSingleDict("settings", "name", "astra.account.ldap", parser) + ldapSetting = ard.getSingleDict("settings", "name", "astra.account.ldap") if astraSDK.settings.destroyLdap(quiet=args.quiet, verbose=args.verbose).main( ldapSetting["id"] ): @@ -192,8 +192,8 @@ def main(args, parser, ard): ard.users = astraSDK.users.getUsers().main() if ard.needsattr("rolebindings"): ard.rolebindings = astraSDK.rolebindings.getRolebindings().main() - user = ard.getSingleDict("users", "id", args.userID, parser) - rb = ard.getSingleDict("rolebindings", "userID", args.userID, parser) + user = ard.getSingleDict("users", "id", args.userID) + rb = ard.getSingleDict("rolebindings", "userID", args.userID) if astraSDK.rolebindings.destroyRolebinding(quiet=args.quiet, verbose=args.verbose).main( rb["id"] ): diff --git a/tkSrc/helpers.py b/tkSrc/helpers.py index ed6f322..4e3cca6 100644 --- a/tkSrc/helpers.py +++ b/tkSrc/helpers.py @@ -89,14 +89,14 @@ def createCriteriaList(images, namespaces, pods, labels, names): ) -def createNamespaceMapping(appNamespaces, singleNs, multiNsMapping, parser): +def createNamespaceMapping(appNamespaces, singleNs, multiNsMapping): """Create a list of dictionaries of source and destination namespaces for cloning an application, as the user can provide a variety of input. Return object format: [ { "source": "sourcens1", "destination": "destns1" }, { "source": "sourcens2", "destination": "destns2" } ]""" # Ensure that multiNsMapping was used for multi-namespace apps if multiNsMapping is None and len(appNamespaces) > 1: - parser.error("for multi-namespace apps, --multiNsMapping must be used.") + parserError("for multi-namespace apps, --multiNsMapping must be used.") # For single-namespace apps, the namespace mapping is **not** a required field elif singleNs is None and multiNsMapping is None: return None @@ -105,7 +105,7 @@ def createNamespaceMapping(appNamespaces, singleNs, multiNsMapping, parser): return [ { "source": appNamespaces[0]["namespace"], - "destination": isRFC1123(singleNs, parser=parser), + "destination": isRFC1123(singleNs), } ] # Handle multiNsMapping cases @@ -137,7 +137,7 @@ def createNamespaceMapping(appNamespaces, singleNs, multiNsMapping, parser): returnList.append( { "source": mapping.split("=")[0], - "destination": isRFC1123(mapping.split("=")[1], parser=parser), + "destination": isRFC1123(mapping.split("=")[1]), } ) return returnList @@ -310,7 +310,7 @@ def run(command, captureOutput=False, ignoreErrors=False, env=None): return True -def isRFC1123(string, parser=None, ignore_length=False): +def isRFC1123(string, ignore_length=False): """isRFC1123 returns the input 'string' if it conforms to RFC 1123 spec, otherwise it throws an error and exits""" regex = re.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?$") @@ -322,18 +322,15 @@ def isRFC1123(string, parser=None, ignore_length=False): "and end with an alphanumeric character, and must be at most 63 characters (for " "example 'my-name' or '123-abc')." ) - if parser is not None: - parser.error(error) - else: - raise SystemExit(f"Error: {error}") + parserError(error) -def dupeKeyError(key, parser): +def dupeKeyError(key): """Print an error message if duplicate keys are used""" - parser.error(f"'{key}' should not be specified multiple times within a single --filterSet arg") + parserError(f"'{key}' should not be specified multiple times within a single --filterSet arg") -def createSetDict(setDict, filterStr, assets, parser, v3=False): +def createSetDict(setDict, filterStr, assets, v3=False): """Given a filterStr such as: label=app.kubernetes.io/tier=backend,name=mysql,kind=Deployment Return a setDict with the following format: @@ -353,15 +350,13 @@ def createSetDict(setDict, filterStr, assets, parser, v3=False): elif "label" in key: setDict.setdefault("labelSelectors", []).append(val) elif "group" in key: - setDict["group"] = val if not setDict.get("group") else dupeKeyError("group", parser) + setDict["group"] = val if not setDict.get("group") else dupeKeyError("group") elif "version" in key: - setDict["version"] = ( - val if not setDict.get("version") else dupeKeyError("version", parser) - ) + setDict["version"] = val if not setDict.get("version") else dupeKeyError("version") elif "kind" in key: - setDict["kind"] = val if not setDict.get("kind") else dupeKeyError("kind", parser) + setDict["kind"] = val if not setDict.get("kind") else dupeKeyError("kind") else: - parser.error( + parserError( f"'{key}' not one of ['namespace', 'name', 'label', 'group', 'version', 'kind']" ) # TODO: Add v3 validation once ASTRACTL-31946 is complete @@ -369,7 +364,7 @@ def createSetDict(setDict, filterStr, assets, parser, v3=False): # Validate the inputs are valid assets for this app for key in ["group", "version", "kind"]: if setDict.get(key) and setDict[key] not in [a["GVK"][key] for a in assets["items"]]: - parser.error( + parserError( f"'{setDict[key]}' is not a valid '{key}' for this application, please run '" f"list assets {assets['metadata']['appID']}' to view possible '{key}' choices" ) @@ -382,14 +377,14 @@ def createSetDict(setDict, filterStr, assets, parser, v3=False): if setDict[key1] not in [ a["GVK"][key1] for a in assets["items"] if a["GVK"][key2] == setDict[key2] ]: - parser.error( + parserError( f"'{key1}={setDict[key1]}' does not match with '{key2}={setDict[key2]}'" f", please run 'list assets {assets['metadata']['appID']}' to view " "valid GVK combinations" ) -def createFilterSet(selection, filters, assets, parser, v3=False): +def createFilterSet(selection, filters, assets, v3=False): """createFilterSet takes in a selection string, and a filters array of arrays, for example: [ ['group=apps,version=v1,kind=Deployment'], @@ -424,14 +419,14 @@ def createFilterSet(selection, filters, assets, parser, v3=False): setDict = {} if isinstance(fil, list): for f in fil: - createSetDict(setDict, f, assets, parser, v3=v3) + createSetDict(setDict, f, assets, v3=v3) else: - createSetDict(setDict, fil, assets, parser, v3=v3) + createSetDict(setDict, fil, assets, v3=v3) rFilter[filterKey].append(setDict) return rFilter -def createSingleSecretKeyDict(credKeyPair, ard, parser): +def createSingleSecretKeyDict(credKeyPair, ard): """Given a credKeyPair list like ['s3-creds', 'accessKeyID'] Return a dict with the following format: @@ -449,28 +444,28 @@ def createSingleSecretKeyDict(credKeyPair, ard, parser): cred, key = credKeyPair else: key, cred = credKeyPair - credDict = ard.getSingleDict("credentials", "metadata.name", cred, parser) + credDict = ard.getSingleDict("credentials", "metadata.name", cred) if key not in credDict["data"].keys(): - parser.error( + parserError( f"'{credKeyPair[1]}' not found in '{credKeyPair[0]}' data keys: " f"{', '.join(credDict['data'].keys())}" ) return {"valueFromSecret": {"name": cred, "key": key}} -def createSecretKeyDict(keyNameList, credential, provider, ard, parser): +def createSecretKeyDict(keyNameList, credential, provider, ard): """Use keyNameList to ensure number of credential arguments inputted is correct, and build the full providerCredentials dictionary""" # Ensure correct credential length matches keyNameList length if len(credential) != len(keyNameList): - parser.error( + parserError( f"-s/--credential must be specified {len(keyNameList)} time(s) for " f"'{provider}' provider" ) # argparse ensures len(args.credential) is 2, but can't ensure a valid name/key pair providerCredentials = {} for i, credKeyPair in enumerate(credential): - providerCredentials[keyNameList[i]] = createSingleSecretKeyDict(credKeyPair, ard, parser) + providerCredentials[keyNameList[i]] = createSingleSecretKeyDict(credKeyPair, ard) return providerCredentials @@ -483,14 +478,14 @@ def prependDump(obj, prepend, indent=2): return None -def checkv3Support(args, parser, supportedDict): +def checkv3Support(args, supportedDict): """Function to ensure a v3-specific actoolkit command is currently supported by v3""" if supported := supportedDict.get(args.subcommand): if supported is True: return True elif args.objectType in supported: return True - parser.error(f"'{args.subcommand} {args.objectType}' is not currently a supported --v3 command") + parserError(f"'{args.subcommand} {args.objectType}' is not currently a supported --v3 command") def setupJinja( @@ -528,7 +523,7 @@ def sameK8sCluster(cluster1, cluster2, skip_tls_verify=False): return False -def getCommonAppVault(cluster1, cluster2, parser, skip_tls_verify=False): +def getCommonAppVault(cluster1, cluster2, skip_tls_verify=False): """Function which takes in two cluster contexts, and finds and returns an appVault that's common between the two of them, as designated by status.uid""" c1AppVaults = astraSDK.k8s.getResources( @@ -543,10 +538,10 @@ def getCommonAppVault(cluster1, cluster2, parser, skip_tls_verify=False): if c2av.get("status") and c2av["status"].get("uid"): if c1av["status"]["uid"] == c2av["status"]["uid"]: return c1av - parser.error(f"A common appVault was not found between cluster {cluster1} and {cluster2}") + parserError(f"A common appVault was not found between cluster {cluster1} and {cluster2}") -def swapAppVaultRef(sourceAppVaultRef, sourceCluster, destCluster, parser, skip_tls_verify=False): +def swapAppVaultRef(sourceAppVaultRef, sourceCluster, destCluster, skip_tls_verify=False): """Function which takes in the name of a sourceCluster's appVaultRef, and then returns the name of the destCluster's same appVaultRef (appVaults can be named differently across clusters due to Astra Control auto-appending a unique identifier).""" @@ -562,9 +557,9 @@ def swapAppVaultRef(sourceAppVaultRef, sourceCluster, destCluster, parser, skip_ ) sourceUid = sourceAppVault["status"]["uid"] except StopIteration: - parser.error(f"'{sourceAppVaultRef}' not found on the source cluster,\n{sourceAppVaults=}") + parserError(f"'{sourceAppVaultRef}' not found on the source cluster,\n{sourceAppVaults=}") except KeyError as err: - parser.error(f"{err} key not found in '{sourceAppVaultRef}' object,\n{sourceAppVaults=}") + parserError(f"{err} key not found in '{sourceAppVaultRef}' object,\n{sourceAppVaults=}") try: destAppVault = next(a for a in destAppVaults["items"] if a["status"]["uid"] == sourceUid) return destAppVault["metadata"]["name"] @@ -573,36 +568,36 @@ def swapAppVaultRef(sourceAppVaultRef, sourceCluster, destCluster, parser, skip_ {"name": d["metadata"]["name"], "uid": d["status"]["uid"]} for d in destAppVaults["items"] ] - parser.error( + parserError( f"An appVault with status.uid of '{sourceUid}' not found on the destination cluster," f" destination app vaults: {destAppVaultSum}" ) except KeyError as err: - parser.error(f"{err} key not found in 'destAppVault' object,\n{destAppVaults=}") + parserError(f"{err} key not found in 'destAppVault' object,\n{destAppVaults=}") -def openJson(path, parser): +def openJson(path): """Given a file path, open the json file, and return a dict of its contents""" with open(path, encoding="utf8") as f: try: return json.loads(f.read().rstrip()) except json.decoder.JSONDecodeError: - parser.error(f"{path} does not seem to be valid JSON") + parserError(f"{path} does not seem to be valid JSON") -def openScript(path, parser): +def openScript(path): """Given a file path, open the text file, and return a str of its contents""" with open(path, encoding="utf8") as f: return base64.b64encode(f.read().rstrip().encode("utf-8")).decode("utf-8") -def openYaml(path, parser): +def openYaml(path): """Given a file path, open the yaml file, and return a dict of its contents""" with open(path, encoding="utf8") as f: try: return yaml.load(f.read().rstrip(), Loader=yaml.SafeLoader) except (yaml.scanner.ScannerError, IsADirectoryError): - parser.error(f"{path} does not seem to be valid YAML") + parserError(f"{path} does not seem to be valid YAML") def getNestedValue(obj, key): @@ -621,15 +616,15 @@ def getNestedValue(obj, key): yield result -def extractAwsKeys(path, parser): +def extractAwsKeys(path): """Returns a tuple of the AccessKeyId, SecretAccessKey in an AWS credential JSON""" - awsCreds = openJson(path, parser) + awsCreds = openJson(path) accessKeyID = "".join(getNestedValue(awsCreds, "AccessKeyId")) secretAccessKey = "".join(getNestedValue(awsCreds, "SecretAccessKey")) if not accessKeyID: - parser.error(f"'AccessKeyId' not found in '{path}'") + parserError(f"'AccessKeyId' not found in '{path}'") if not secretAccessKey: - parser.error(f"'SecretAccessKey' not found in '{path}'") + parserError(f"'SecretAccessKey' not found in '{path}'") return accessKeyID, secretAccessKey @@ -639,3 +634,14 @@ def combineResources(*args): for resource in args: base_dict["items"] += resource["items"] return base_dict + + +def parserError(message): + """Prints an argparse-like generic error message""" + prog = "actoolkit" + usage = ( + "usage: actoolkit [-h] [-v] [-o {json,yaml,table}] [-q] [-f] [--v3] " + "[--dry-run {client,server}] [--insecure-skip-tls-verify] " + "{deploy,clone,restore,ipr,list,get,copy,create,manage,define,destroy,unmanage,update} ..." + ) + raise SystemExit(f"{usage}\n{prog}: error: {message}") diff --git a/tkSrc/ipr.py b/tkSrc/ipr.py index aa5a922..f7f74e3 100644 --- a/tkSrc/ipr.py +++ b/tkSrc/ipr.py @@ -30,7 +30,6 @@ def doV3Ipr( skip_tls_verify, quiet, verbose, - parser, ard, backup=None, snapshot=None, @@ -42,13 +41,13 @@ def doV3Ipr( ard.backups = astraSDK.k8s.getResources( config_context=v3, skip_tls_verify=skip_tls_verify ).main("backups") - iprSourceDict = ard.getSingleDict("backups", "metadata.name", backup, parser) + iprSourceDict = ard.getSingleDict("backups", "metadata.name", backup) elif snapshot: if ard.needsattr("snapshots"): ard.snapshots = astraSDK.k8s.getResources( config_context=v3, skip_tls_verify=skip_tls_verify ).main("snapshots") - iprSourceDict = ard.getSingleDict("snapshots", "metadata.name", snapshot, parser) + iprSourceDict = ard.getSingleDict("snapshots", "metadata.name", snapshot) template = helpers.setupJinja("ipr") try: @@ -59,7 +58,7 @@ def doV3Ipr( appArchivePath=iprSourceDict["status"]["appArchivePath"], appVaultRef=iprSourceDict["spec"]["appVaultRef"], resourceFilter=helpers.prependDump( - helpers.createFilterSet(filterSelection, filterSet, None, parser, v3=True), + helpers.createFilterSet(filterSelection, filterSet, None, v3=True), prepend=4, ), ) @@ -82,17 +81,19 @@ def doV3Ipr( ) except KeyError as err: iprSourceName = backup if backup else snapshot - parser.error( + helpers.parserError( f"{err} key not found in '{iprSourceName}' object, please ensure " f"'{iprSourceName}' is a valid backup/snapshot" ) -def main(args, parser, ard): +def main(args, ard): if (args.filterSelection and not args.filterSet) or ( args.filterSet and not args.filterSelection ): - parser.error("either both or none of --filterSelection and --filterSet should be specified") + helpers.parserError( + "either both or none of --filterSelection and --filterSet should be specified" + ) if args.v3: doV3Ipr( @@ -101,7 +102,6 @@ def main(args, parser, ard): args.skip_tls_verify, args.quiet, args.verbose, - parser, ard, backup=args.backup, snapshot=args.snapshot, @@ -117,7 +117,6 @@ def main(args, parser, ard): args.filterSelection, args.filterSet, astraSDK.apps.getAppAssets().main(args.app), - parser, ), ) if rc: diff --git a/tkSrc/manage.py b/tkSrc/manage.py index 76fa0d5..c99e0d0 100644 --- a/tkSrc/manage.py +++ b/tkSrc/manage.py @@ -97,7 +97,6 @@ def manageV3Bucket( skipCertValidation, credential, ard, - parser, ): """Manage a bucket via a Kubernetes custom resource""" # Create providerCredentials based on provider input @@ -110,14 +109,14 @@ def manageV3Bucket( template = helpers.setupJinja("appVault") v3_dict = yaml.safe_load( template.render( - bucketName=helpers.isRFC1123(bucketName, parser=parser), + bucketName=helpers.isRFC1123(bucketName), providerType=provider, accountName=storageAccount, endpoint=serverURL, secure=("false" if http else None), skipCertValidation=("true" if skipCertValidation else None), providerCredentials=helpers.prependDump( - helpers.createSecretKeyDict(keyNameList, credential, provider, ard, parser), + helpers.createSecretKeyDict(keyNameList, credential, provider, ard), prepend=4, ), ) @@ -153,9 +152,8 @@ def manageV3Cluster( cloudID, headless, ard, - parser, ): - helpers.isRFC1123(clusterName, parser=parser) + helpers.isRFC1123(clusterName) create.createV3ConnectorOperator(v3, dry_run, skip_tls_verify, verbose, operator_version) # Create the astra API token secret if not headless: @@ -183,7 +181,7 @@ def manageV3Cluster( ard.credentials = astraSDK.k8s.getSecrets( config_context=v3, skip_tls_verify=skip_tls_verify ).main() - cred = ard.getSingleDict("credentials", "metadata.name", regCred, parser) + cred = ard.getSingleDict("credentials", "metadata.name", regCred) # Create the AstraConnector CR if headless: connector = astraSDK.k8s.createHeadlessConnector( @@ -205,7 +203,7 @@ def manageV3Cluster( raise SystemExit("astraSDK.k8s.createAstraConnector() failed") -def validateBucketArgs(args, parser): +def validateBucketArgs(args): """Validate that user provided inputs for managing a bucket are valid""" # Validate serverURL and storageAccount args depending upon provider type if args.serverURL is None and args.provider in [ @@ -214,35 +212,37 @@ def validateBucketArgs(args, parser): "ontap-s3", "storagegrid-s3", ]: - parser.error(f"--serverURL must be provided for '{args.provider}' provider.") + helpers.parserError(f"--serverURL must be provided for '{args.provider}' provider.") if args.storageAccount is None and args.provider == "azure": - parser.error("--storageAccount must be provided for 'azure' provider.") + helpers.parserError("--storageAccount must be provided for 'azure' provider.") # Error if credential was specified with any other argument if args.credential is not None: if args.json is not None or args.accessKey is not None or args.accessSecret is not None: - parser.error( + helpers.parserError( "if an existing credential is specified, no other credentialGroup arguments " "can be specified" ) # Error if json was specified with accessKey/Secret elif args.json is not None: if args.accessKey is not None or args.accessSecret is not None: - parser.error( + helpers.parserError( "if a public cloud JSON credential file is specified, no other credentialGroup " "arguments can be specified" ) # If neither credential or json was specified, make sure both accessKey/Secret were elif args.accessKey is None or args.accessSecret is None: - parser.error( + helpers.parserError( "either an (existing credential) OR (public cloud JSON credential) OR " "(accessKey AND accessSecret) must be specified" ) # If json was specified, ensure provider is a public cloud if args.json is not None and args.provider not in ["aws", "azure", "gcp"]: - parser.error("--json should only be specified for public cloud providers (aws, azure, gcp)") + helpers.parserError( + "--json should only be specified for public cloud providers (aws, azure, gcp)" + ) -def main(args, parser, ard): +def main(args, ard): if args.objectType == "app" or args.objectType == "application": if args.additionalNamespace: args.additionalNamespace = helpers.createNamespaceList( @@ -295,7 +295,7 @@ def main(args, parser, ard): api_res_list = [f"{a['apiVersion']}/{a['kind']}" for a in ard.apiresources["items"]] for csr in args.clusterScopedResource: if csr[0] not in api_res_list: - parser.error( + helpers.parserError( f"argument -c/--clusterScopedResource: invalid choice: '{csr[0]}' " f"(choose from {', '.join(api_res_list)})" ) @@ -317,7 +317,7 @@ def main(args, parser, ard): ) else: rc = astraSDK.apps.manageApp(quiet=args.quiet, verbose=args.verbose).main( - helpers.isRFC1123(args.appName, parser=parser), + helpers.isRFC1123(args.appName), args.namespace, args.clusterID, args.labelSelectors, @@ -328,7 +328,7 @@ def main(args, parser, ard): raise SystemExit("astraSDK.apps.manageApp() failed") elif args.objectType == "bucket" or args.objectType == "appVault": - validateBucketArgs(args, parser) + validateBucketArgs(args) if ard.needsattr("credentials"): ard.credentials = astraSDK.k8s.getSecrets( config_context=args.v3, skip_tls_verify=args.skip_tls_verify @@ -336,7 +336,7 @@ def main(args, parser, ard): if args.v3: if args.accessKey or (args.json and args.provider == "aws"): if args.json and args.provider == "aws": - args.accessKey, args.accessSecret = helpers.extractAwsKeys(args.json, parser) + args.accessKey, args.accessSecret = helpers.extractAwsKeys(args.json) crc = create.createV3S3Credential( args.v3, args.dry_run, @@ -356,7 +356,6 @@ def main(args, parser, ard): args.verbose, args.json, args.bucketName, - parser, ) if args.accessKey or args.json: args.credential = [] @@ -377,7 +376,6 @@ def main(args, parser, ard): args.skipCertValidation, args.credential, ard, - parser, ) else: if args.accessKey: @@ -392,7 +390,6 @@ def main(args, parser, ard): args.json, args.bucketName, args.provider, - parser, ) args.credential = crc["id"] manageBucket( @@ -420,7 +417,6 @@ def main(args, parser, ard): args.cloudID, args.headless, ard, - parser, ) else: rc = astraSDK.clusters.manageCluster(quiet=args.quiet, verbose=args.verbose).main( @@ -434,14 +430,15 @@ def main(args, parser, ard): # First create the credential if args.cloudType != "private": if args.credentialPath is None: - parser.error(f"--credentialPath is required for cloudType of {args.cloudType}") + helpers.parserError( + f"--credentialPath is required for cloudType of {args.cloudType}" + ) rc = create.createCloudCredential( args.quiet, args.verbose, args.credentialPath, args.cloudName, args.cloudType, - parser, ) credentialID = rc["id"] # Next manage the cloud @@ -455,7 +452,7 @@ def main(args, parser, ard): raise SystemExit("astraSDK.clouds.manageCloud() failed") elif args.objectType == "ldap": ard.settings = astraSDK.settings.getSettings().main() - ldapSetting = ard.getSingleDict("settings", "name", "astra.account.ldap", parser) + ldapSetting = ard.getSingleDict("settings", "name", "astra.account.ldap") rc = astraSDK.settings.manageLdap(quiet=args.quiet, verbose=args.verbose).main( ldapSetting["id"], ldapSetting["currentConfig"] ) diff --git a/tkSrc/unmanage.py b/tkSrc/unmanage.py index bb072dc..4733356 100644 --- a/tkSrc/unmanage.py +++ b/tkSrc/unmanage.py @@ -15,10 +15,11 @@ limitations under the License. """ +from tkSrc import helpers import astraSDK -def main(args, parser, ard): +def main(args, ard): if args.objectType == "app" or args.objectType == "application": if args.v3: astraSDK.k8s.destroyResource( @@ -58,9 +59,11 @@ def main(args, parser, ard): config_context=args.v3, skip_tls_verify=args.skip_tls_verify ).main("astraconnectors", version="v1", group="astra.netapp.io") if ard.connectors is None or len(ard.connectors["items"]) == 0: - parser.error("AstraConnector operator not found on current Kubernetes context") + helpers.parserError( + "AstraConnector operator not found on current Kubernetes context" + ) elif len(ard.connectors["items"]) > 1: - parser.error( + helpers.parserError( "multiple AstraConnector operators found on current Kubernetes context" ) connector = ard.connectors["items"][0] @@ -141,7 +144,7 @@ def main(args, parser, ard): raise SystemExit("astraSDK.clusters.unmanageCloud() failed") elif args.objectType == "ldap": ard.settings = astraSDK.settings.getSettings().main() - ldapSetting = ard.getSingleDict("settings", "name", "astra.account.ldap", parser) + ldapSetting = ard.getSingleDict("settings", "name", "astra.account.ldap") rc = astraSDK.settings.unmanageLdap(quiet=args.quiet, verbose=args.verbose).main( ldapSetting["id"], ldapSetting["currentConfig"] ) diff --git a/tkSrc/update.py b/tkSrc/update.py index a5d6340..e39a674 100644 --- a/tkSrc/update.py +++ b/tkSrc/update.py @@ -19,24 +19,24 @@ import json import yaml - +from tkSrc import helpers import astraSDK -def main(args, parser, ard): +def main(args, ard): if args.objectType == "bucket": # Validate that both credentialID and accessKey/accessSecret were not specified if args.credentialID is not None and ( args.accessKey is not None or args.accessSecret is not None ): - parser.error( + helpers.parserError( "if a credentialID is specified, neither accessKey nor accessSecret" + " should be specified." ) # Validate args and create credential if credentialID was not specified if args.credentialID is None: if args.accessKey is None or args.accessSecret is None: - parser.error( + helpers.parserError( "if a credentialID is not specified, both accessKey and " + "accessSecret arguments must be provided." ) @@ -54,7 +54,7 @@ def main(args, parser, ard): cloudName="s3", ) except StopIteration: - parser.error(f"{args.bucketID} does not seem to be a valid bucketID") + helpers.parserError(f"{args.bucketID} does not seem to be a valid bucketID") if crc: args.credentialID = crc["id"] else: @@ -71,14 +71,14 @@ def main(args, parser, ard): try: credDict = json.loads(f.read().rstrip()) except json.decoder.JSONDecodeError: - parser.error(f"{args.credentialPath} does not seem to be valid JSON") + helpers.parserError(f"{args.credentialPath} does not seem to be valid JSON") encodedStr = base64.b64encode(json.dumps(credDict).encode("utf-8")).decode("utf-8") if ard.needsattr("clouds"): ard.clouds = astraSDK.clouds.getClouds().main() try: cloud = next(c for c in ard.clouds["items"] if c["id"] == args.cloudID) except StopIteration: - parser.error(f"{args.cloudID} does not seem to be a valid cloudID") + helpers.parserError(f"{args.cloudID} does not seem to be a valid cloudID") rc = astraSDK.credentials.createCredential(quiet=args.quiet, verbose=args.verbose).main( "astra-sa@" + cloud["name"], "service-account", @@ -101,7 +101,7 @@ def main(args, parser, ard): # Get the cluster information based on the clusterID input if ard.needsattr("clusters"): ard.clusters = astraSDK.clusters.getClusters().main() - cluster = ard.getSingleDict("clusters", "id", args.clusterID, parser) + cluster = ard.getSingleDict("clusters", "id", args.clusterID) # Currently this is required to be True, but this will not always be the case if args.credentialPath: with open(args.credentialPath, encoding="utf8") as f: @@ -126,16 +126,16 @@ def main(args, parser, ard): elif args.objectType == "protection" or args.objectType == "schedule": if ard.needsattr("protections"): ard.protections = astraSDK.protections.getProtectionpolicies().main() - protection = ard.getSingleDict("protections", "id", args.protection, parser) + protection = ard.getSingleDict("protections", "id", args.protection) granularity = protection["granularity"] if granularity == "hourly" and args.hour: - parser.error(f"{granularity} granularity must not specify -H / --hour") + helpers.parserError(f"{granularity} granularity must not specify -H / --hour") if granularity == "hourly" or granularity == "daily" or granularity == "monthly": if args.dayOfWeek: - parser.error(f"{granularity} granularity must not specify -W / --dayOfWeek") + helpers.parserError(f"{granularity} granularity must not specify -W / --dayOfWeek") if granularity == "hourly" or granularity == "daily" or granularity == "weekly": if args.dayOfMonth: - parser.error(f"{granularity} granularity must not specify -M / --dayOfMonth") + helpers.parserError(f"{granularity} granularity must not specify -M / --dayOfMonth") rc = astraSDK.protections.updateProtectionpolicy( quiet=args.quiet, verbose=args.verbose ).main( @@ -161,19 +161,19 @@ def main(args, parser, ard): if ard.needsattr("replications"): ard.replications = astraSDK.replications.getReplicationpolicies().main() if not ard.replications: # Gracefully handle ACS env - parser.error("'replication' commands are currently only supported in ACC.") + helpers.parserError("'replication' commands are currently only supported in ACC.") repl = None for replication in ard.replications["items"]: if args.replicationID == replication["id"]: repl = replication if not repl: - parser.error(f"replicationID {args.replicationID} not found") + helpers.parserError(f"replicationID {args.replicationID} not found") # Make call based on operation type if args.operation == "resync": if not args.dataSource: - parser.error("--dataSource must be provided for 'resync' operations") + helpers.parserError("--dataSource must be provided for 'resync' operations") if repl["state"] != "failedOver": - parser.error( + helpers.parserError( "to resync a replication, it must be in a 'failedOver' state" + f", not a(n) '{repl['state']}' state" ) @@ -200,7 +200,7 @@ def main(args, parser, ard): destinationClusterID=repl["sourceClusterID"], ) else: - parser.error( + helpers.parserError( f"dataSource '{args.dataSource}' not one of:\n" + f"\t{repl['sourceAppID']}\t(original sourceAppID)\n" + f"\t{repl['sourceClusterID']}\t(original sourceClusterID)\n" @@ -209,7 +209,7 @@ def main(args, parser, ard): ) elif args.operation == "reverse": if repl["state"] != "established" and repl["state"] != "failedOver": - parser.error( + helpers.parserError( "to reverse a replication, it must be in an `established` or " + f"'failedOver' state, not a(n) '{repl['state']}' state" ) @@ -225,7 +225,7 @@ def main(args, parser, ard): ) else: # failover if repl["state"] != "established": - parser.error( + helpers.parserError( "to failover a replication, it must be in an 'established' state" + f", not a(n) '{repl['state']}' state" ) diff --git a/toolkit.py b/toolkit.py index fe44050..f6f14d6 100755 --- a/toolkit.py +++ b/toolkit.py @@ -170,6 +170,8 @@ def main(argv=sys.argv): tkParser = tkSrc.parser.ToolkitParser(acl, plaidMode=plaidMode, v3=v3) parser = tkParser.main() args = parser.parse_args(args=argv) + tkParser, parser = None, None # Memory optimization + if args.v3: v3_dict = {"deploy": ["acp", "chart"]} v3_dict.update( @@ -220,32 +222,34 @@ def main(argv=sys.argv): ], ) ) - tkSrc.helpers.checkv3Support(args, parser, v3_dict) + tkSrc.helpers.checkv3Support(args, v3_dict) if args.dry_run and not args.v3: - parser.error("--dry-run can only be used in conjunction with --v3") + tkSrc.helpers.parserError("--dry-run can only be used in conjunction with --v3") elif args.skip_tls_verify and not args.v3: - parser.error("--insecure-skip-tls-verify can only be used in conjunction with --v3") + tkSrc.helpers.parserError( + "--insecure-skip-tls-verify can only be used in conjunction with --v3" + ) if args.subcommand == "deploy": - tkSrc.deploy.main(args, parser, ard) + tkSrc.deploy.main(args, ard) elif args.subcommand == "clone" or args.subcommand == "restore": - tkSrc.clone.main(args, parser, ard) + tkSrc.clone.main(args, ard) elif args.subcommand == "ipr": - tkSrc.ipr.main(args, parser, ard) + tkSrc.ipr.main(args, ard) elif args.subcommand == "list" or args.subcommand == "get": tkSrc.list.main(args) elif args.subcommand == "copy": tkSrc.copy.main(args) elif args.subcommand == "create": - tkSrc.create.main(args, parser, ard) + tkSrc.create.main(args, ard) elif args.subcommand == "manage" or args.subcommand == "define": - tkSrc.manage.main(args, parser, ard) + tkSrc.manage.main(args, ard) elif args.subcommand == "destroy": - tkSrc.destroy.main(args, parser, ard) + tkSrc.destroy.main(args, ard) elif args.subcommand == "unmanage": - tkSrc.unmanage.main(args, parser, ard) + tkSrc.unmanage.main(args, ard) elif args.subcommand == "update": - tkSrc.update.main(args, parser, ard) + tkSrc.update.main(args, ard) if __name__ == "__main__":