From bdb98389bbf434fec442c1d059558cb7f584a874 Mon Sep 17 00:00:00 2001 From: Uri Laserson Date: Fri, 1 May 2015 21:11:08 -0400 Subject: [PATCH] [EGGO-18] WIP: Setup config files --- client.cfg | 3 +- conf/{ => director}/aws.conf | 0 ...-cloudera-us-east-1-public-subnet.template | 24 +- conf/eggo/eggo.cfg | 45 ++++ eggo/config.py | 61 ++++- eggo/dag.py | 209 +++++++++++------- eggo/director.py | 25 ++- 7 files changed, 258 insertions(+), 109 deletions(-) rename conf/{ => director}/aws.conf (100%) rename conf/{ => director}/cfn-cloudera-us-east-1-public-subnet.template (97%) create mode 100644 conf/eggo/eggo.cfg diff --git a/client.cfg b/client.cfg index d7718ec..1fc25c7 100644 --- a/client.cfg +++ b/client.cfg @@ -1,6 +1,7 @@ +# Luigi client config + [core] logging_conf_file: luigi_logging.ini [hadoop] command: hadoop -# command: /root/ephemeral-hdfs/bin/hadoop diff --git a/conf/aws.conf b/conf/director/aws.conf similarity index 100% rename from conf/aws.conf rename to conf/director/aws.conf diff --git a/conf/cfn-cloudera-us-east-1-public-subnet.template b/conf/director/cfn-cloudera-us-east-1-public-subnet.template similarity index 97% rename from conf/cfn-cloudera-us-east-1-public-subnet.template rename to conf/director/cfn-cloudera-us-east-1-public-subnet.template index 2cfcf87..e3a4805 100644 --- a/conf/cfn-cloudera-us-east-1-public-subnet.template +++ b/conf/director/cfn-cloudera-us-east-1-public-subnet.template @@ -2,14 +2,14 @@ "AWSTemplateFormatVersion" : "2010-09-09", "Description" : "This template creates a VPC infrastructure for a single-AZ, single public subnet deployment of CDH", "Parameters" : { - + "KeyPairName" : { "Type" : "String" }, "DMZCIDR" : { "Type" : "String", "Default" : "10.1.1.0/24" }, "VPCCIDR" : { "Type" : "String", "Default" : "10.1.0.0/16" }, "AZ" : { "Type" : "String", "Default" : "us-east-1b" }, "DomainDNSName" : { "Type" : "String", "Default" : "ec2.internal" } }, - + "Resources" : { "VPC" : { @@ -24,7 +24,7 @@ ] } }, - + "DHCPOptions" : { "Type" : "AWS::EC2::DHCPOptions", "Properties" : { @@ -32,7 +32,7 @@ "DomainNameServers" : ["AmazonProvidedDNS"] } }, - + "VPCDHCPOptionsAssociation" : { "Type" : "AWS::EC2::VPCDHCPOptionsAssociation", "Properties" : { @@ -40,7 +40,7 @@ "DhcpOptionsId" : {"Ref" : "DHCPOptions"} } }, - + "DMZSubnet" : { "Type" : "AWS::EC2::Subnet", "Properties" : { @@ -64,7 +64,7 @@ ] } }, - + "ClusterSG" : { "Type" : "AWS::EC2::SecurityGroup", "Properties" : { @@ -76,7 +76,7 @@ "VpcId" : { "Ref" : "VPC" } } }, - + "AttachGateway" : { "Type" : "AWS::EC2::VPCGatewayAttachment", "Properties" : { @@ -84,7 +84,7 @@ "InternetGatewayId" : { "Ref" : "InternetGateway" } } }, - + "DMZRouteTable" : { "Type" : "AWS::EC2::RouteTable", "Properties" : { @@ -95,7 +95,7 @@ ] } }, - + "DMZRoute" : { "Type" : "AWS::EC2::Route", "Properties" : { @@ -104,7 +104,7 @@ "GatewayId" : { "Ref" : "InternetGateway" } } }, - + "DMZSubnetRouteTableAssociation" : { "Type" : "AWS::EC2::SubnetRouteTableAssociation", "Properties" : { @@ -113,7 +113,7 @@ } } }, - + "Outputs" : { "VPC" : { "Value" : { "Ref" : "VPC" }, @@ -127,5 +127,5 @@ "Value" : { "Ref" : "ClusterSG"}, "Description" : "Cluster security group" } - } + } } \ No newline at end of file diff --git a/conf/eggo/eggo.cfg b/conf/eggo/eggo.cfg new file mode 100644 index 0000000..5acd2fb --- /dev/null +++ b/conf/eggo/eggo.cfg @@ -0,0 +1,45 @@ +; Eggo "global" config + +[paths] +; The location of the result data sets +; supports S3 (s3n://), HDFS (hdfs://), and local (file://) targets +; the specified URL will be the 
root of the eggo directory structure (see spec)
+eggo_base_url: s3n://bdg-eggo
+
+; Path to stage the raw input data on the target distributed fs
+raw_data_prefix: raw
+
+; Path to store tmp/intermediate data in the target distributed fs
+; tmp data always ends up in eggo_base_url/tmp/dataset_name/tmp_data_prefix/...
+;tmp_data_prefix:  ; if not set manually, module load generates a random prefix
+
+; Absolute path to store tmp data on remote worker machines' local file systems
+; e.g., on EC2 machines, this could be the ephemeral drive mount point /mnt
+worker_local_tmp_data_dir: /mnt
+
+
+[hadoop]
+; NOTE: make sure the Luigi hadoop config is appropriately set in client.cfg
+hadoop_home: /root/ephemeral-hdfs  ; overridden by HADOOP_HOME env var
+streaming_jar:  ; this will override the similar option in Luigi's client.cfg
+
+[aws]
+; These will all be overridden by corresponding env variables (in ALL_CAPS)
+aws_access_key_id:
+aws_secret_access_key:
+ec2_key_pair:
+ec2_private_key_file:
+
+
+[director]
+; TODO: add docs for these options
+region: us-east-1
+launcher_instance_type: m3.large
+launcher_ami: ami-a25415cb  ; RHEL 6.4 x86
+cluster_ami: %(launcher_ami)s
+stack_name: bdg-eggo-%(ec2_key_pair)s
+; the paths to the director configs are resolved relative to the CWD (which may
+; be the same as EGGO_HOME); set them to absolute paths if desired, or use
+; "%(eggo_home)s" to access the EGGO_HOME env variable
+cloudformation_template: conf/director/cfn-cloudera-us-east-1-public-subnet.template
+director_conf_template: conf/director/aws.conf
diff --git a/eggo/config.py b/eggo/config.py
index 2745a92..7b96421 100644
--- a/eggo/config.py
+++ b/eggo/config.py
@@ -15,19 +15,62 @@
 # limitations under the License.
 
 import os
+from ConfigParser import SafeConfigParser
 
 from eggo.util import random_id
 
 
-# path to store raw input data
-RAW_DATA_KEY_PREFIX = 'raw'
-# each module load/invocation will generate a new temp location in the distributed fs
-TMP_DATA_KEY_PREFIX = random_id()
-EGGO_BASE_URL = os.environ.get('EGGO_BASE_URL', 's3n://bdg-eggo')
-EGGO_TMP_URL = os.path.join(EGGO_BASE_URL, TMP_DATA_KEY_PREFIX)
-EGGO_RAW_URL = os.path.join(EGGO_BASE_URL, RAW_DATA_KEY_PREFIX)
+# EGGO CONFIGURATION
+
+def _init_eggo_config():
+    # if EGGO_HOME is set, make it available for config interpolation
+    eh = ({'eggo_home': os.environ['EGGO_HOME']}
+          if 'EGGO_HOME' in os.environ else {})
 
-def validate_config(d):
-    """Validate a JSON config file for an eggo dataset"""
+    eggo_config = SafeConfigParser(defaults=eh,
+                                   dict_type=dict,
+                                   allow_no_value=False)
+
+    with open(os.environ['EGGO_CONFIG'], 'r') as ip:
+        eggo_config.readfp(ip, os.environ['EGGO_CONFIG'])
+
+    # Get HADOOP_HOME from env var if set
+    if 'HADOOP_HOME' in os.environ:
+        eggo_config.set('hadoop',
+                        'hadoop_home',
+                        os.environ['HADOOP_HOME'])
+
+    # Set AWS variables from environment if available
+    if 'AWS_ACCESS_KEY_ID' in os.environ:
+        eggo_config.set('aws',
+                        'aws_access_key_id',
+                        os.environ['AWS_ACCESS_KEY_ID'])
+    if 'AWS_SECRET_ACCESS_KEY' in os.environ:
+        eggo_config.set('aws',
+                        'aws_secret_access_key',
+                        os.environ['AWS_SECRET_ACCESS_KEY'])
+    if 'EC2_KEY_PAIR' in os.environ:
+        eggo_config.set('aws',
+                        'ec2_key_pair',
+                        os.environ['EC2_KEY_PAIR'])
+    if 'EC2_PRIVATE_KEY_FILE' in os.environ:
+        eggo_config.set('aws',
+                        'ec2_private_key_file',
+                        os.environ['EC2_PRIVATE_KEY_FILE'])
+
+    # Set tmp_data_prefix if not provided in the config file; each module
+    # load/invocation will generate a new tmp location in the distributed fs
+    if not eggo_config.has_option('paths',
+                                  'tmp_data_prefix'):
+        eggo_config.set('paths', 'tmp_data_prefix', random_id())
+
+    return eggo_config
+
+
+eggo_config = _init_eggo_config()
+
+
+# TOAST CONFIGURATION
+
+def validate_toast_config(d):
+    """Validate a JSON config file for an eggo dataset (a "toast")."""
     pass
diff --git a/eggo/dag.py b/eggo/dag.py
index 97cd579..4b2be7c 100644
--- a/eggo/dag.py
+++ b/eggo/dag.py
@@ -19,63 +19,127 @@
 import os
 import sys
 import json
-from time import sleep
 from shutil import rmtree
 from tempfile import mkdtemp
 from subprocess import Popen
 
 from luigi import Task, Config
-from luigi.s3 import S3Target, S3FlagTarget, S3Client
+from luigi.s3 import S3Target, S3Client
 from luigi.hdfs import HdfsClient, HdfsTarget
+from luigi.file import LocalTarget
 from luigi.hadoop import JobTask, HadoopJobRunner
 from luigi.parameter import Parameter
 
-from eggo.config import validate_config, EGGO_BASE_URL, EGGO_RAW_URL, EGGO_TMP_URL
+from eggo.config import eggo_config, validate_toast_config
 from eggo.util import random_id, build_dest_filename
 
 
-def raw_data_url(dataset_name):
-    return os.path.join(EGGO_RAW_URL, dataset_name) + '/'
+class JsonFileParameter(Parameter):
+    def parse(self, p):
+        with open(p, 'r') as ip:
+            json_data = json.load(ip)
+        validate_toast_config(json_data)
+        return json_data
 
 
-def target_url(dataset_name, format='bdg', edition='basic'):
-    return os.path.join(EGGO_BASE_URL, dataset_name, format, edition) + '/'
+class ToastConfig(Config):
+    config = JsonFileParameter()  # the toast (JSON) configuration
+
+    def raw_data_url(self):
+        return os.path.join(eggo_config.get('paths', 'eggo_base_url'),
+                            eggo_config.get('paths', 'raw_data_prefix'),
+                            self.config['name'])
 
-def dataset_url(dataset_name):
-    return os.path.join(EGGO_BASE_URL, dataset_name) + '/'
+    def dataset_url(self):
+        return os.path.join(eggo_config.get('paths', 'eggo_base_url'),
+                            self.config['name'])
+
+    def edition_url(self, format='bdg', edition='basic'):
+        return os.path.join(eggo_config.get('paths', 'eggo_base_url'),
+                            self.config['name'], format, edition)
 
-def flag_target(path):
-    if path.startswith("s3:") or path.startswith("s3n:"):
-        return S3FlagTarget(path)
-    return HdfsTarget(path)
+    def tmp_data_url(self):
+        return os.path.join(eggo_config.get('paths', 'eggo_base_url'), 'tmp',
+                            self.config['name'],
+                            eggo_config.get('paths', 'tmp_data_prefix'))
 
 
-class JsonFileParameter(Parameter):
+class FlagTargetMixin(object):
+    def exists(self):
+        return self.fs.exists(os.path.join(self.path, self.flag))
 
-    def parse(self, p):
-        with open(p, 'r') as ip:
-            json_data = json.load(ip)
-        validate_config(json_data)
-        return json_data
+
+class S3FlagTarget(S3Target, FlagTargetMixin):
+    # NOTE: we are implementing our own S3FlagTarget even though Luigi supplies
+    # this class because the Luigi version requires paths to end in a slash,
+    # which is annoying to keep track of. Instead, we join paths using
+    # os.path.join()
+    def __init__(self, path, flag='_SUCCESS'):
+        super(S3FlagTarget, self).__init__(path)
+        self.flag = flag
+
+
+class HdfsFlagTarget(HdfsTarget, FlagTargetMixin):
+    def __init__(self, path, flag='_SUCCESS'):
+        super(HdfsFlagTarget, self).__init__(path)
+        self.flag = flag
 
 
-class ToastConfig(Config):
-    config = JsonFileParameter()
+class LocalFlagTarget(LocalTarget, FlagTargetMixin):
+    def __init__(self, path, flag='_SUCCESS'):
+        super(LocalFlagTarget, self).__init__(path)
+        self.flag = flag
 
 
-def _dnload_to_local_upload_to_hadoop(source, destination, compression):
-    EPHEMERAL_MOUNT = os.environ.get('EPHEMERAL_MOUNT', '/mnt')
-    tmp_dir = mkdtemp(prefix='tmp_eggo_', dir=EPHEMERAL_MOUNT)
+def flag_target(path):
+    if (path.startswith('s3:') or path.startswith('s3n:')
+            or path.startswith('s3a:')):
+        return S3FlagTarget(path)
+    elif path.startswith('hdfs:'):
+        return HdfsFlagTarget(path)
+    elif path.startswith('file:'):
+        return LocalFlagTarget(path)
+    else:
+        raise ValueError('Unrecognized URI protocol: {path}'.format(path=path))
+
+
+def file_target(path):
+    if (path.startswith('s3:') or path.startswith('s3n:')
+            or path.startswith('s3a:')):
+        return S3Target(path)
+    elif path.startswith('hdfs:'):
+        return HdfsTarget(path)
+    elif path.startswith('file:'):
+        return LocalTarget(path)
+    else:
+        raise ValueError('Unrecognized URI protocol: {path}'.format(path=path))
+
+
+def create_SUCCESS_file(path):
+    if (path.startswith('s3:') or path.startswith('s3n:')
+            or path.startswith('s3a:')):
+        s3_client = S3Client(eggo_config.get('aws', 'aws_access_key_id'),
+                             eggo_config.get('aws', 'aws_secret_access_key'))
+        s3_client.put_string('', os.path.join(path, '_SUCCESS'))
+    elif path.startswith('hdfs:'):
+        hdfs_client = HdfsClient()
+        hdfs_client.put('/dev/null', os.path.join(path, '_SUCCESS'))
+    elif path.startswith('file:'):
+        open(os.path.join(path, '_SUCCESS'), 'a').close()
+
+
+def _dnload_to_local_upload_to_dfs(source, destination, compression):
     # source: (string) URL suitable for curl
     # destination: (string) full Hadoop path of destination file name
     # compression: (bool) whether file needs to be decompressed
+    tmp_local_dir = mkdtemp(
+        prefix='tmp_eggo_',
+        dir=eggo_config.get('paths', 'worker_local_tmp_data_dir'))
     try:
         # 1. dnload file
-        dnload_cmd = 'pushd {tmp_dir} && curl -L -O {source} && popd'
-        p = Popen(dnload_cmd.format(tmp_dir=tmp_dir, source=source),
+        dnload_cmd = 'pushd {tmp_local_dir} && curl -L -O {source} && popd'
+        p = Popen(dnload_cmd.format(tmp_local_dir=tmp_local_dir,
+                                    source=source),
                   shell=True)
         p.wait()
 
@@ -83,56 +147,54 @@ def _dnload_to_local_upload_to_hadoop(source, destination, compression):
         if compression:
             compression_type = os.path.splitext(source)[-1]
             if compression_type == '.gz':
-                decompr_cmd = ('pushd {tmp_dir} && gunzip *.gz && popd')
+                decompr_cmd = ('pushd {tmp_local_dir} && gunzip *.gz && popd')
             else:
                 raise ValueError("Unknown compression type: {0}".format(
                     compression_type))
-            p = Popen(decompr_cmd.format(tmp_dir=tmp_dir), shell=True)
+            p = Popen(decompr_cmd.format(tmp_local_dir=tmp_local_dir),
+                      shell=True)
            p.wait()
 
        # 3. upload to tmp distributed filesystem location (e.g. 
S3) - hadoop_home = os.environ.get('HADOOP_HOME', '/root/ephemeral-hdfs') - tmp_hadoop_path = os.path.join(EGGO_TMP_URL, random_id()) - upload_cmd = 'pushd {tmp_dir} && ' \ - '{hadoop_home}/bin/hadoop fs -mkdir -p {tmp_hadoop_dir} && ' \ - '{hadoop_home}/bin/hadoop fs -put ./* {tmp_path} && popd' - p = Popen(upload_cmd.format(tmp_dir=tmp_dir, hadoop_home=hadoop_home, - tmp_hadoop_dir=EGGO_TMP_URL, - tmp_path=tmp_hadoop_path), + tmp_staged_dir = os.path.join(ToastConfig().tmp_data_url(), 'staged') + upload_cmd = ('pushd {tmp_local_dir} && ' + '{hadoop_home}/bin/hadoop fs -mkdir -p {tmp_dfs_dir} && ' + '{hadoop_home}/bin/hadoop fs -put ./* {tmp_dfs_dir} && ' + 'popd') + p = Popen(upload_cmd.format(tmp_local_dir=tmp_local_dir, + hadoop_home=eggo_config.get('hadoop', + 'hadoop_home'), + tmp_dfs_dir=tmp_staged_dir), shell=True) p.wait() # 4. rename to final target location rename_cmd = '{hadoop_home}/bin/hadoop fs -mv {tmp_path} {final_path}' - p = Popen(rename_cmd.format(tmp_path=tmp_hadoop_path, hadoop_home=hadoop_home, + p = Popen(rename_cmd.format(tmp_path=tmp_staged_dir, + hadoop_home=eggo_config.get('hadoop', + 'hadoop_home'), final_path=destination), shell=True) p.wait() except: raise finally: - rmtree(tmp_dir) - + rmtree(tmp_local_dir) -def create_SUCCESS_file(s3_path): - s3client = S3Client(os.environ['AWS_ACCESS_KEY_ID'], - os.environ['AWS_SECRET_ACCESS_KEY']) - s3client.put_string('', os.path.join(s3_path, '_SUCCESS')) - -class DownloadFileToS3Task(Task): +class DownloadFileToDFSTask(Task): """Download a file, decompress, and move to S3.""" source = Parameter() # string: URL suitable for curl - target = Parameter() # string: full S3 path of destination file name + target = Parameter() # string: full URL path of destination file name compression = Parameter() # bool: whether file needs to be decompressed def run(self): - _dnload_to_local_upload_to_hadoop( + _dnload_to_local_upload_to_dfs( self.source, self.target, self.compression) def output(self): - return S3Target(path=self.target) + return file_target(path=self.target) class DownloadDatasetTask(Task): @@ -144,7 +206,7 @@ def requires(self): for source in ToastConfig().config['sources']: dest_name = build_dest_filename(source['url'], decompress=source['compression']) - yield DownloadFileToS3Task( + yield DownloadFileToDFSTask( source=source['url'], target=os.path.join(self.destination, dest_name), compression=source['compression']) @@ -160,9 +222,9 @@ class PrepareHadoopDownloadTask(Task): hdfs_path = Parameter() def run(self): - EPHEMERAL_MOUNT = os.environ.get('EPHEMERAL_MOUNT', '/mnt') - tmp_dir = mkdtemp(prefix='tmp_eggo_', dir=EPHEMERAL_MOUNT) - + tmp_dir = mkdtemp( + prefix='tmp_eggo_', + dir=eggo_config.get('paths', 'worker_local_tmp_data_dir')) try: # build the remote command for each source tmp_command_file = '{0}/command_file'.format(tmp_dir) @@ -195,8 +257,9 @@ def requires(self): def job_runner(self): addl_conf = {'mapred.map.tasks.speculative.execution': 'false', 'mapred.task.timeout': 12000000} - streaming_args=['-cmdenv', 'AWS_ACCESS_KEY_ID=' + os.environ['AWS_ACCESS_KEY_ID'], - '-cmdenv', 'AWS_SECRET_ACCESS_KEY=' + os.environ['AWS_SECRET_ACCESS_KEY']] + # TODO: can we delete this with Director? does it set AWS cred in core-site.xml? 
+ streaming_args=['-cmdenv', 'AWS_ACCESS_KEY_ID=' + eggo_config.get('aws', 'aws_access_key_id'), + '-cmdenv', 'AWS_SECRET_ACCESS_KEY=' + eggo_config.get('aws', 'aws_secret_access_key')] return HadoopJobRunner(streaming_jar=os.environ['STREAMING_JAR'], streaming_args=streaming_args, jobconfs=addl_conf, @@ -210,12 +273,12 @@ def mapper(self, line): decompress=source['compression']) dest_url = os.path.join(self.destination, dest_name) if dest_url.startswith("s3:") or dest_url.startswith("s3n:"): - client = S3Client(os.environ['AWS_ACCESS_KEY_ID'], - os.environ['AWS_SECRET_ACCESS_KEY']) + client = S3Client(eggo_config.get('aws', 'aws_access_key_id'), + eggo_config.get('aws', 'aws_secret_access_key')) else: client = HdfsClient() if not client.exists(dest_url): - _dnload_to_local_upload_to_hadoop( + _dnload_to_local_upload_to_dfs( source['url'], dest_url, source['compression']) yield (source['url'], 1) # dummy output @@ -227,10 +290,10 @@ def output(self): class DeleteDatasetTask(Task): def run(self): - hadoop_home = os.environ.get('HADOOP_HOME', '/root/ephemeral-hdfs') delete_raw_cmd = '{hadoop_home}/bin/hadoop fs -rm -r {raw} {target}'.format( - hadoop_home=hadoop_home, raw=raw_data_url(ToastConfig().config['name']), - target=dataset_url(ToastConfig().config['name'])) + hadoop_home=eggo_config.get('hadoop', 'hadoop_home'), + raw=ToastConfig().raw_data_url(), + target=ToastConfig().dataset_url()) p = Popen(delete_raw_cmd, shell=True) p.wait() @@ -242,7 +305,8 @@ class ADAMBasicTask(Task): edition = 'basic' def requires(self): - return DownloadDatasetHadoopTask(destination=raw_data_url(ToastConfig().config['name'])) + return DownloadDatasetHadoopTask( + destination=ToastConfig().raw_data_url()) def run(self): format = ToastConfig().config['sources'][0]['format'].lower() @@ -254,8 +318,8 @@ def run(self): tmp_hadoop_path = '/tmp/{rand_id}.{format}'.format(rand_id=random_id(), format=format) distcp_cmd = '{hadoop_home}/bin/hadoop distcp {source} {target}'.format( - hadoop_home=os.environ['HADOOP_HOME'], - source=raw_data_url(ToastConfig().config['name']), target=tmp_hadoop_path) + hadoop_home=eggo_config.get('hadoop', 'hadoop_home'), + source=ToastConfig().raw_data_url(), target=tmp_hadoop_path) p = Popen(distcp_cmd, shell=True) p.wait() @@ -265,14 +329,12 @@ def run(self): adam_home=os.environ['ADAM_HOME'], spark_master_url=os.environ['SPARK_MASTER_URL'], adam_command=self.adam_command, source=tmp_hadoop_path, - target=target_url(ToastConfig().config['name'], - edition=self.edition)) + target=ToastConfig().edition_url(edition=self.edition)) p = Popen(adam_cmd, shell=True) p.wait() def output(self): - return flag_target( - target_url(ToastConfig().config['name'], edition=self.edition)) + return flag_target(ToastConfig().edition_url(edition=self.edition)) class ADAMFlattenTask(Task): @@ -291,23 +353,20 @@ def run(self): ' {source} {target}').format( adam_home=os.environ['ADAM_HOME'], spark_master_url=os.environ['SPARK_MASTER_URL'], - source=target_url(ToastConfig().config['name'], - edition=self.source_edition), - target=target_url(ToastConfig().config['name'], - edition=self.edition)) + source=ToastConfig().edition_url( + edition=self.source_edition), + target=ToastConfig().edition_url(edition=self.edition)) p = Popen(adam_cmd, shell=True) p.wait() def output(self): - return flag_target( - target_url(ToastConfig().config['name'], edition=self.edition)) + return flag_target(ToastConfig().edition_url(edition=self.edition)) class ToastTask(Task): def output(self): - return flag_target( - 
target_url(ToastConfig().config['name'], edition=self.edition)) + return flag_target(ToastConfig().edition_url(edition=self.edition)) class VCF2ADAMTask(Task): diff --git a/eggo/director.py b/eggo/director.py index ffafbdb..f43dc4c 100644 --- a/eggo/director.py +++ b/eggo/director.py @@ -26,17 +26,18 @@ from fabric.api import local, env, run, execute, prefix, put, open_shell from tempfile import mkdtemp -REGION = 'us-east-1' -LAUNCHER_AMI = 'ami-a25415cb' # RHEL 6.4 x86 -LAUNCHER_INSTANCE_TYPE = 'm3.large' -CLUSTER_AMI = LAUNCHER_AMI -AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] -AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] -EC2_KEY_PAIR = os.environ['EC2_KEY_PAIR'] -EC2_PRIVATE_KEY_FILE = os.environ['EC2_PRIVATE_KEY_FILE'] -STACK_NAME = 'bdg-eggo-{key_pair}'.format(key_pair=EC2_KEY_PAIR) -CLOUDFORMATION_TEMPLATE='conf/cfn-cloudera-us-east-1-public-subnet.template' -DIRECTOR_CONF_TEMPLATE='conf/aws.conf' +AWS_ACCESS_KEY_ID = eggo_config.get('aws', 'aws_access_key_id') +AWS_SECRET_ACCESS_KEY = eggo_config.get('aws', 'aws_secret_access_key') +EC2_KEY_PAIR = eggo_config.get('aws', 'ec2_key_pair') +EC2_PRIVATE_KEY_FILE = eggo_config.get('aws', 'ec2_private_key_file') + +REGION = eggo_config.get('director', 'region') +LAUNCHER_INSTANCE_TYPE = eggo_config.get('director', 'launcher_instance_type') +LAUNCHER_AMI = eggo_config.get('director', 'launcher_ami') +CLUSTER_AMI = eggo_config.get('director', 'cluster_ami') +STACK_NAME = eggo_config.get('director', 'stack_name') +CLOUDFORMATION_TEMPLATE = eggo_config.get('director', 'cloudformation_template') +DIRECTOR_CONF_TEMPLATE = eggo_config.get('director', 'director_conf_template') def provision(): # create cloud formation stack (VPC etc) @@ -68,7 +69,7 @@ def web_proxy(instance_name, port): local('ssh -i {private_key} -o UserKnownHostsFile=/dev/null ' '-o StrictHostKeyChecking=no -L {port}:{cm_private_ip}:{port} ' 'ec2-user@{cm_public_ip}'.format( - private_key=os.environ['EC2_PRIVATE_KEY_FILE'], + private_key=EC2_PRIVATE_KEY_FILE, port=port, cm_private_ip=instance.private_ip_address, cm_public_ip=instance.ip_address))
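
A minimal sketch of how the new configuration flow is intended to be driven
(the config path and AWS values below are placeholders, and exact call sites
may still change while this is WIP):

    import os

    # _init_eggo_config() reads the file named by EGGO_CONFIG at import time
    os.environ['EGGO_CONFIG'] = 'conf/eggo/eggo.cfg'
    # env variables (ALL_CAPS) override the corresponding [aws] options
    os.environ['AWS_ACCESS_KEY_ID'] = '<access-key>'
    os.environ['AWS_SECRET_ACCESS_KEY'] = '<secret-key>'

    from eggo.config import eggo_config

    eggo_config.get('paths', 'eggo_base_url')    # 's3n://bdg-eggo' per eggo.cfg
    eggo_config.get('aws', 'aws_access_key_id')  # taken from the environment
    eggo_config.get('paths', 'tmp_data_prefix')  # random id generated per module load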