diff --git a/circle.yml b/circle.yml
new file mode 100644
index 0000000..87b003f
--- /dev/null
+++ b/circle.yml
@@ -0,0 +1,56 @@
+machine:
+  python:
+    version: 2.7.12
+
+dependencies:
+  pre:
+    - sudo apt-get update; sudo apt-get install graphviz
+    - pip install coveralls
+    - pip install -r requirements.txt
+    - mkdir ~/.dataduct
+    - |
+      echo "
+      etl:
+          ROLE: DataPipelineDefaultRole
+          RESOURCE_ROLE: DataPipelineDefaultResourceRole
+          S3_ETL_BUCKET: FILL_ME_IN
+      ec2:
+          CORE_INSTANCE_TYPE: m1.large
+      emr:
+          CLUSTER_AMI: 2.4.7
+      redshift:
+          DATABASE_NAME: FILL_ME_IN
+          CLUSTER_ID: FILL_ME_IN
+          USERNAME: FILL_ME_IN
+          PASSWORD: FILL_ME_IN
+      postgres:
+          DATABASE_NAME: FILL_ME_IN
+          RDS_INSTANCE_ID: FILL_ME_IN
+          USERNAME: FILL_ME_IN
+          PASSWORD: FILL_ME_IN
+          REGION: FILL_ME_IN
+      mysql:
+          DATABASE_KEY:
+              HOST: FILL_ME_IN
+              USERNAME: FILL_ME_IN
+              PASSWORD: FILL_ME_IN" > ~/.dataduct/dataduct.cfg
+    - |
+      echo "
+      [distutils]
+      index-servers = pypi
+      [pypi]
+      repository: $PYPI_REPOSITORY
+      username: $PYPI_USERNAME
+      password: $PYPI_PASSWORD" > ~/.pypirc
+
+test:
+  override:
+    - nosetests --with-coverage --cover-package=. --cover-erase
+#  post:
+#    - coveralls
+
+deployment:
+  pypi:
+    branch: /.*/
+    commands:
+      - python setup.py sdist upload -r pypi
diff --git a/dataduct/etl/etl_pipeline.py b/dataduct/etl/etl_pipeline.py
index 5cd5aed..c67907c 100644
--- a/dataduct/etl/etl_pipeline.py
+++ b/dataduct/etl/etl_pipeline.py
@@ -193,12 +193,17 @@ def create_base_objects(self):
         else:
             self.sns = self.create_pipeline_object(
                 object_class=SNSAlarm,
-                topic_arn=self.topic_arn,
+                topic_arn=self.topic_arn.replace('all:', ''),
                 pipeline_name=self.name,
             )
+        if self.frequency == 'on-demand':
+            scheduleType = 'ONDEMAND'
+        else:
+            scheduleType = 'cron'
         self.default = self.create_pipeline_object(
             object_class=DefaultObject,
             pipeline_log_uri=self.s3_log_dir,
+            scheduleType=scheduleType,
         )
 
     @property
@@ -476,8 +481,12 @@ def create_steps(self, steps_params, is_bootstrap=False,
                     'input_path' not in step_param:
                 step_param['input_node'] = input_node
 
-            if is_teardown:
-                step_param['sns_object'] = self.sns
+            if hasattr(self.sns, 'fields'):
+                if self.topic_arn.startswith('all:'):
+                    # Attach the SNS object to every step, not just teardown,
+                    # so every failure alert carries the error stack trace
+                    step_param['sns_object'] = self.sns
+                elif is_teardown:
+                    step_param['sns_object'] = self.sns
 
             try:
                 step_class = step_param.pop('step_class')
diff --git a/dataduct/pipeline/data_pipeline.py b/dataduct/pipeline/data_pipeline.py
index acc5488..de85cc7 100644
--- a/dataduct/pipeline/data_pipeline.py
+++ b/dataduct/pipeline/data_pipeline.py
@@ -65,7 +65,7 @@ def aws_format(self):
         Returns:
             result(list of dict): list of AWS-readable dict of all objects
         """
-        return [x.aws_format() for x in self.objects]
+        return [x.aws_format() for x in self.objects if hasattr(x, 'fields')]
 
     def add_object(self, pipeline_object):
         """Add an object to the datapipeline
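Note: the hasattr(x, 'fields') guard added to DataPipeline.aws_format filters out pipeline objects that were never fully initialised, such as a Schedule whose constructor returns early for an on-demand pipeline and therefore never sets self.fields. A minimal sketch of that behaviour, using hypothetical stand-in classes rather than the real dataduct objects:

    # Sketch only: FullyBuilt / BailedOut are made-up classes illustrating why
    # aws_format() must skip objects that never defined self.fields.
    class FullyBuilt(object):
        def __init__(self):
            self.fields = {'type': ['Default']}

        def aws_format(self):
            return {'id': 'Default', 'fields': self.fields}

    class BailedOut(object):
        def __init__(self):
            # e.g. an on-demand Schedule that returns before setting any fields
            return

    objects = [FullyBuilt(), BailedOut()]
    print([x.aws_format() for x in objects if hasattr(x, 'fields')])
    # only the fully built object is serialised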
diff --git a/dataduct/pipeline/pipeline_object.py b/dataduct/pipeline/pipeline_object.py
index 1878532..c19cefb 100644
--- a/dataduct/pipeline/pipeline_object.py
+++ b/dataduct/pipeline/pipeline_object.py
@@ -8,7 +8,7 @@
 from ..s3 import S3Path
 from ..utils.exceptions import ETLInputError
 
-
+scheduleType = ''
 
 class PipelineObject(object):
     """DataPipeline class with steps and metadata.
@@ -56,12 +56,15 @@ def s3_files(self):
         Returns:
             result(list of S3Files): List of files to be uploaded to s3
         """
-        result = self.additional_s3_files
-        for _, values in self.fields.iteritems():
-            for value in values:
-                if isinstance(value, S3File) or isinstance(value, S3Directory):
-                    result.append(value)
-        return result
+        if hasattr(self, 'additional_s3_files'):
+            result = self.additional_s3_files
+            for _, values in self.fields.iteritems():
+                for value in values:
+                    if isinstance(value, S3File) or isinstance(value, S3Directory):
+                        result.append(value)
+            return result
+        else:
+            return []
 
     def __getitem__(self, key):
         """Fetch the items associated with a key
@@ -130,16 +133,25 @@ def aws_format(self):
             result: The AWS-readable dict format of the object
         """
         fields = []
-        for key, values in self.fields.iteritems():
-            for value in values:
-                if isinstance(value, PipelineObject):
-                    fields.append({'key': key, 'refValue': value.id})
-                elif isinstance(value, S3Path):
-                    fields.append({'key': key, 'stringValue': value.uri})
-                elif isinstance(value, S3File) or \
-                        isinstance(value, S3Directory):
-                    fields.append({'key': key,
-                                   'stringValue': value.s3_path.uri})
-                else:
-                    fields.append({'key': key, 'stringValue': str(value)})
-        return {'id': self._id, 'name': self._id, 'fields': fields}
+        global scheduleType
+        if hasattr(self, 'fields'):
+            for key, values in self.fields.iteritems():
+                for value in values:
+                    if isinstance(value, PipelineObject):
+                        if scheduleType == 'ONDEMAND' and key == 'schedule':
+                            pass
+                        else:
+                            fields.append({'key': key, 'refValue': value.id})
+                    elif isinstance(value, S3Path):
+                        fields.append({'key': key, 'stringValue': value.uri})
+                    elif isinstance(value, S3File) or \
+                            isinstance(value, S3Directory):
+                        fields.append({'key': key,
+                                       'stringValue': value.s3_path.uri})
+                    else:
+                        if key == 'scheduleType' and str(value) == 'ONDEMAND':
+                            scheduleType = 'ONDEMAND'
+                        fields.append({'key': key, 'stringValue': str(value)})
+            return {'id': self._id, 'name': self._id, 'fields': fields}
+        else:
+            return None
diff --git a/dataduct/pipeline/schedule.py b/dataduct/pipeline/schedule.py
index 4909efa..a62aa6e 100644
--- a/dataduct/pipeline/schedule.py
+++ b/dataduct/pipeline/schedule.py
@@ -37,6 +37,9 @@
     '8-hours': ('8 hours', None),
     '12-hours': ('12 hours', None),
     'one-time': ('15 minutes', 1),
+    'on-demand': ('ondemand', None),
+    '30-min': ('30 minutes', None),
+    '15-min': ('15 minutes', None),
 }
 
 
@@ -62,6 +65,10 @@ def __init__(self,
             load_minutes(int): Minutes at which the pipeline should be run
             **kwargs(optional): Keyword arguments directly passed to base class
         """
+        if frequency == 'on-demand':
+            logger.debug('On-demand schedule requested, so no schedule object is created')
+            return None
+
         current_time = datetime.utcnow()
         # Set the defaults for load hour and minutes
         if load_minutes is None:
diff --git a/dataduct/pipeline/sns_alarm.py b/dataduct/pipeline/sns_alarm.py
index e7dfa8a..95fe464 100644
--- a/dataduct/pipeline/sns_alarm.py
+++ b/dataduct/pipeline/sns_alarm.py
@@ -42,7 +42,7 @@ def __init__(self,
            'Error Stack Trace: #{node.errorStackTrace}'
         ])
 
-        subject = 'Data Pipeline %s failed' % pipeline_name
+        subject = 'Data Pipeline %s #{node.@status}' % pipeline_name
 
         if topic_arn is None:
             topic_arn = SNS_TOPIC_ARN_FAILURE
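Note: the changes above rely on a small naming convention for the SNS topic: a topic_arn that starts with 'all:' means the alarm should be attached to every step (so each failure notification carries the node's error stack trace), and the prefix is stripped before the ARN is handed to the SNSAlarm object. A rough, standalone illustration of that convention follows; the ARN value is a placeholder, not a real topic:

    # Sketch of the 'all:' prefix convention used by create_base_objects and
    # create_steps above; the ARN below is a placeholder.
    def split_topic_arn(topic_arn):
        """Return (clean_arn, alarm_on_all_steps) for a configured topic_arn."""
        if topic_arn is None:
            return None, False
        if topic_arn.startswith('all:'):
            return topic_arn.replace('all:', ''), True
        return topic_arn, False

    print(split_topic_arn('all:arn:aws:sns:us-east-1:123456789012:etl-alerts'))
    # ('arn:aws:sns:us-east-1:123456789012:etl-alerts', True)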
diff --git a/dataduct/s3/utils.py b/dataduct/s3/utils.py
index 61a5d49..304a48a 100644
--- a/dataduct/s3/utils.py
+++ b/dataduct/s3/utils.py
@@ -14,6 +14,22 @@
 LARGE_FILE_LIMIT = 5000*1024*1024 # 5gb
 PROGRESS_SECTIONS = 10
 
+# ----------------------------------------------------------------
+# Fix for an SSL issue where the bucket name has dots in it
+
+try:
+    import ssl
+    _old_match_hostname = ssl.match_hostname
+
+    def _new_match_hostname(cert, hostname):
+        if hostname.endswith('.s3.amazonaws.com'):
+            pos = hostname.find('.s3.amazonaws.com')
+            hostname = hostname[:pos].replace('.', '') + hostname[pos:]
+        return _old_match_hostname(cert, hostname)
+
+    ssl.match_hostname = _new_match_hostname
+except Exception:
+    pass
 
 def get_s3_bucket(bucket_name):
     """Returns an S3 bucket object from boto
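Note: the ssl.match_hostname monkey-patch above works around a known boto/SSL limitation: the wildcard certificate *.s3.amazonaws.com only covers a single DNS label, so a bucket name containing dots (e.g. my.data.bucket) fails hostname verification. The wrapper collapses the dots in the bucket portion before delegating to the original check. A standalone sketch of just the hostname rewrite, with a made-up bucket name:

    # Standalone illustration of the hostname rewrite performed by the patched
    # ssl.match_hostname; the bucket name is made up.
    def collapse_bucket_dots(hostname):
        suffix = '.s3.amazonaws.com'
        if hostname.endswith(suffix):
            pos = hostname.find(suffix)
            # 'my.data.bucket.s3.amazonaws.com' -> 'mydatabucket.s3.amazonaws.com',
            # which the wildcard certificate can match
            hostname = hostname[:pos].replace('.', '') + hostname[pos:]
        return hostname

    print(collapse_bucket_dots('my.data.bucket.s3.amazonaws.com'))
    # mydatabucket.s3.amazonaws.com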