Support for on-demand schedule type and time series #243

Open · wants to merge 18 commits into base: develop
56 changes: 56 additions & 0 deletions circle.yml
@@ -0,0 +1,56 @@
machine:
python:
version: 2.7.12

dependencies:
pre:
- sudo apt-get update; sudo apt-get install graphviz
- pip install coveralls
- pip install -r requirements.txt
- mkdir ~/.dataduct
- |
echo "
etl:
ROLE: DataPipelineDefaultRole
RESOURCE_ROLE: DataPipelineDefaultResourceRole
S3_ETL_BUCKET: FILL_ME_IN
ec2:
CORE_INSTANCE_TYPE: m1.large
emr:
CLUSTER_AMI: 2.4.7
redshift:
DATABASE_NAME: FILL_ME_IN
CLUSTER_ID: FILL_ME_IN
USERNAME: FILL_ME_IN
PASSWORD: FILL_ME_IN
postgres:
DATABASE_NAME: FILL_ME_IN
RDS_INSTANCE_ID: FILL_ME_IN
USERNAME: FILL_ME_IN
PASSWORD: FILL_ME_IN
REGION: FILL_ME_IN
mysql:
DATABASE_KEY:
HOST: FILL_ME_IN
USERNAME: FILL_ME_IN
PASSWORD: FILL_ME_IN" > ~/.dataduct/dataduct.cfg
- |
echo "
[distutils]
index-servers = pypi
[pypi]
repository: $PYPI_REPOSITORY
username: $PYPI_USERNAME
password: $PYPI_PASSWORD" > ~/.pypirc

test:
override:
- nosetests --with-coverage --cover-package=. --cover-erase
# post:
# - coveralls

deployment:
pypi:
branch: /.*/
commands:
- python setup.py sdist upload -r pypi
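Note: the two heredocs above only seed placeholder files (~/.dataduct/dataduct.cfg and ~/.pypirc) so the test and deploy steps can run on CircleCI; the FILL_ME_IN values are never exercised by the unit tests. A quick sanity check of the generated config, assuming PyYAML is available in the CI environment (it is not added by this PR), could look like this:

    import os
    import yaml  # assumption: PyYAML is already installed via requirements.txt

    with open(os.path.expanduser('~/.dataduct/dataduct.cfg')) as handle:
        config = yaml.safe_load(handle)

    # Top-level sections written by the heredoc above
    print(sorted(config))          # ['ec2', 'emr', 'etl', 'mysql', 'postgres', 'redshift']
    print(config['etl']['ROLE'])   # DataPipelineDefaultRole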
15 changes: 12 additions & 3 deletions dataduct/etl/etl_pipeline.py
@@ -193,12 +193,17 @@ def create_base_objects(self):
else:
self.sns = self.create_pipeline_object(
object_class=SNSAlarm,
topic_arn=self.topic_arn,
topic_arn=self.topic_arn.replace('all:',''),
pipeline_name=self.name,
)
if self.frequency == 'on-demand':
scheduleType='ONDEMAND'
else:
scheduleType='cron'
self.default = self.create_pipeline_object(
object_class=DefaultObject,
pipeline_log_uri=self.s3_log_dir,
scheduleType=scheduleType
)

@property
@@ -476,8 +481,12 @@ def create_steps(self, steps_params, is_bootstrap=False,
'input_path' not in step_param:
step_param['input_node'] = input_node

if is_teardown:
step_param['sns_object'] = self.sns
if hasattr(self.sns, 'fields'):
if self.topic_arn.startswith("all:"):
# Attach the SNS object to every step, not just teardown, so alerts include the error stack trace
step_param['sns_object'] = self.sns
elif is_teardown:
step_param['sns_object'] = self.sns

try:
step_class = step_param.pop('step_class')
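Note: this file introduces two conventions that the PR does not otherwise document: a frequency of on-demand maps to the Data Pipeline ONDEMAND schedule type, and prefixing topic_arn with all: (e.g. topic_arn: all:arn:aws:sns:us-east-1:111122223333:etl-alerts, a hypothetical ARN) both strips the prefix before the ARN reaches SNSAlarm and attaches the SNS object to every step instead of only the teardown step. A minimal sketch of that branching in isolation, with names that are not part of dataduct:

    def resolve_schedule_and_sns(frequency, topic_arn):
        """Illustrative only; mirrors the branching added in create_base_objects/create_steps."""
        schedule_type = 'ONDEMAND' if frequency == 'on-demand' else 'cron'
        notify_every_step = bool(topic_arn) and topic_arn.startswith('all:')
        clean_arn = topic_arn.replace('all:', '') if topic_arn else topic_arn
        return schedule_type, clean_arn, notify_every_step

    # Hypothetical ARN, purely for illustration:
    print(resolve_schedule_and_sns(
        'on-demand', 'all:arn:aws:sns:us-east-1:111122223333:etl-alerts'))
    # ('ONDEMAND', 'arn:aws:sns:us-east-1:111122223333:etl-alerts', True)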
2 changes: 1 addition & 1 deletion dataduct/pipeline/data_pipeline.py
@@ -65,7 +65,7 @@ def aws_format(self):
Returns:
result(list of dict): list of AWS-readable dict of all objects
"""
return [x.aws_format() for x in self.objects]
return [x.aws_format() for x in self.objects if hasattr(x,'fields')]

def add_object(self, pipeline_object):
"""Add an object to the datapipeline
52 changes: 32 additions & 20 deletions dataduct/pipeline/pipeline_object.py
@@ -8,7 +8,7 @@
from ..s3 import S3Path
from ..utils.exceptions import ETLInputError


scheduleType = ''
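# Module-level flag shared across PipelineObject instances: aws_format() sets it to
# 'ONDEMAND' when it serializes the object that carries the scheduleType field, and
# later aws_format() calls consult it to drop 'schedule' references. Note this makes
# serialization order-dependent: the Default object must be formatted first.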
class PipelineObject(object):
"""DataPipeline class with steps and metadata.

Expand Down Expand Up @@ -56,12 +56,15 @@ def s3_files(self):
Returns:
result(list of S3Files): List of files to be uploaded to s3
"""
result = self.additional_s3_files
for _, values in self.fields.iteritems():
for value in values:
if isinstance(value, S3File) or isinstance(value, S3Directory):
result.append(value)
return result
if hasattr(self,'additional_s3_files'):
result = self.additional_s3_files
for _, values in self.fields.iteritems():
for value in values:
if isinstance(value, S3File) or isinstance(value, S3Directory):
result.append(value)
return result
else:
return []

def __getitem__(self, key):
"""Fetch the items associated with a key
@@ -130,16 +133,25 @@ def aws_format(self):
result: The AWS-readable dict format of the object
"""
fields = []
for key, values in self.fields.iteritems():
for value in values:
if isinstance(value, PipelineObject):
fields.append({'key': key, 'refValue': value.id})
elif isinstance(value, S3Path):
fields.append({'key': key, 'stringValue': value.uri})
elif isinstance(value, S3File) or \
isinstance(value, S3Directory):
fields.append({'key': key,
'stringValue': value.s3_path.uri})
else:
fields.append({'key': key, 'stringValue': str(value)})
return {'id': self._id, 'name': self._id, 'fields': fields}
global scheduleType
if hasattr(self, 'fields'):
for key, values in self.fields.iteritems():
for value in values:
if isinstance(value, PipelineObject):
if scheduleType == 'ONDEMAND' and key == 'schedule':
pass
else:
fields.append({'key': key, 'refValue': value.id})
elif isinstance(value, S3Path):
fields.append({'key': key, 'stringValue': value.uri})
elif isinstance(value, S3File) or \
isinstance(value, S3Directory):
fields.append({'key': key,
'stringValue': value.s3_path.uri})
else:
if key == 'scheduleType' and str(value) == 'ONDEMAND':
scheduleType = 'ONDEMAND'
fields.append({'key': key, 'stringValue': str(value)})
return {'id': self._id, 'name': self._id, 'fields': fields}
else:
return None
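Note: the net effect is easiest to see in the serialized output. For an on-demand pipeline the Default object keeps its scheduleType field, while the 'schedule' reference that objects would normally emit is skipped. A hypothetical (hand-written, not produced by running the code) example of the resulting AWS-readable dict:

    # Hypothetical output of aws_format() for the Default object of an on-demand pipeline.
    default_object = {
        'id': 'DefaultObject',
        'name': 'DefaultObject',
        'fields': [
            {'key': 'scheduleType', 'stringValue': 'ONDEMAND'},
            # No {'key': 'schedule', 'refValue': ...} entry here or on any other
            # object: ondemand pipelines have no Schedule object to reference.
        ],
    }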
7 changes: 7 additions & 0 deletions dataduct/pipeline/schedule.py
@@ -37,6 +37,9 @@
'8-hours': ('8 hours', None),
'12-hours': ('12 hours', None),
'one-time': ('15 minutes', 1),
'on-demand': ('ondemand', None),
'30-min': ('30 minutes', None),
'15-min': ('15 minutes', None),
}


@@ -62,6 +65,10 @@ def __init__(self,
load_minutes(int): Minutes at which the pipeline should be run
**kwargs(optional): Keyword arguments directly passed to base class
"""
if frequency == 'on-demand':
logger.debug("On-demand schedule requested; not creating a Schedule object")
return

current_time = datetime.utcnow()
# Set the defaults for load hour and minutes
if load_minutes is None:
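Note: returning early here means the base PipelineObject constructor never runs, so an on-demand Schedule instance has no fields attribute; runs are instead triggered by activating the pipeline rather than by a cron expression. That missing attribute is exactly what the new hasattr(..., 'fields') guards in data_pipeline.py and pipeline_object.py filter on, as in this minimal sketch (not dataduct's real classes):

    class PipelineObjectSketch(object):
        def __init__(self):
            self.fields = {}

    class ScheduleSketch(PipelineObjectSketch):
        def __init__(self, frequency):
            if frequency == 'on-demand':
                return  # base __init__ skipped, so no 'fields' attribute is created
            super(ScheduleSketch, self).__init__()

    schedules = [ScheduleSketch('on-demand'), ScheduleSketch('1-hour')]
    print([hasattr(s, 'fields') for s in schedules])  # [False, True]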
2 changes: 1 addition & 1 deletion dataduct/pipeline/sns_alarm.py
@@ -42,7 +42,7 @@ def __init__(self,
'Error Stack Trace: #{node.errorStackTrace}'
])

subject = 'Data Pipeline %s failed' % pipeline_name
subject = 'Data Pipeline %s #{node.@status}' % pipeline_name

if topic_arn is None:
topic_arn = SNS_TOPIC_ARN_FAILURE
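Note: #{node.@status} is a Data Pipeline expression that the service substitutes when the notification fires, so the subject now reports whichever terminal status the node reached instead of always saying 'failed'; the Python %-formatting only fills in the pipeline name, for example:

    pipeline_name = 'example_pipeline'  # hypothetical name
    subject = 'Data Pipeline %s #{node.@status}' % pipeline_name
    print(subject)  # Data Pipeline example_pipeline #{node.@status}
    # AWS Data Pipeline later replaces #{node.@status} with e.g. FAILED or CANCELED.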
16 changes: 16 additions & 0 deletions dataduct/s3/utils.py
@@ -14,6 +14,22 @@
LARGE_FILE_LIMIT = 5000*1024*1024 # 5gb
PROGRESS_SECTIONS = 10

# ----------------------------------------------------------------
# Fix for the SSL hostname-matching issue when the bucket name contains dots

try:
import ssl
_old_match_hostname = ssl.match_hostname

def _new_match_hostname(cert, hostname):
if hostname.endswith('.s3.amazonaws.com'):
pos = hostname.find('.s3.amazonaws.com')
hostname = hostname[:pos].replace('.', '') + hostname[pos:]
return _old_match_hostname(cert, hostname)

ssl.match_hostname = _new_match_hostname
except Exception:
pass
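# Editorial note, not part of the patch: Amazon's wildcard certificate
# *.s3.amazonaws.com only matches a single label, so a bucket name containing
# dots (e.g. 'my.dotted.bucket') fails hostname verification. The shim above
# collapses the dots in the bucket portion before delegating to the original matcher:
#   'my.dotted.bucket.s3.amazonaws.com' -> 'mydottedbucket.s3.amazonaws.com'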

def get_s3_bucket(bucket_name):
"""Returns an S3 bucket object from boto