Skip to content

Commit

Permalink
[EGGO-18] WIP: convert dfs_tmp_data_url to an instance method and wrap the download/upload steps in try/finally blocks (DFS tmp-dir cleanup still TODO)
Browse files Browse the repository at this point in the history
  • Loading branch information
laserson committed May 7, 2015
1 parent 4152060 commit a771591
Showing 1 changed file with 35 additions and 25 deletions.
60 changes: 35 additions & 25 deletions eggo/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ def edition_url(self, format='bdg', edition='basic'):
return os.path.join(eggo_config.get('core', 'eggo_base_url'),
self.config['name'], format, edition)

@classmethod
def dfs_tmp_data_url(cls):
def dfs_tmp_data_url(self):
return os.path.join(eggo_config.get('core', 'eggo_base_url'),
eggo_config.get('paths', 'dfs_tmp_data_prefix'),
cls.config['name'],
self.config['name'],
eggo_config.get('paths', 'random_id'))


Expand Down Expand Up @@ -139,10 +138,11 @@ def _dnload_to_local_upload_to_dfs(source, destination, compression):
# source: (string) URL suitable for curl
# destination: (string) full Hadoop path of destination file name
# compression: (bool) whether file needs to be decompressed
tmp_local_dir = mkdtemp(
try:
tmp_local_dir = mkdtemp(
prefix='tmp_eggo_',
dir=eggo_config.get('paths', 'worker_data_dir'))
try:
# 1. dnload file
dnload_cmd = 'pushd {tmp_local_dir} && curl -L -O {source} && popd'
p = Popen(dnload_cmd.format(tmp_local_dir=tmp_local_dir,
Expand All @@ -162,27 +162,37 @@ def _dnload_to_local_upload_to_dfs(source, destination, compression):
shell=True)
p.wait()

# 3. upload to tmp distributed filesystem location (e.g. S3)
tmp_staged_dir = os.path.join(ToastConfig.dfs_tmp_data_url(), 'staged')
upload_cmd = ('pushd {tmp_local_dir} && '
'{hadoop_home}/bin/hadoop fs -mkdir -p {tmp_dfs_dir} && '
'{hadoop_home}/bin/hadoop fs -put ./* {tmp_dfs_dir} && '
'popd')
p = Popen(upload_cmd.format(tmp_local_dir=tmp_local_dir,
hadoop_home=eggo_config.get('worker_env',
'hadoop_home'),
tmp_dfs_dir=tmp_staged_dir),
shell=True)
p.wait()
try:
# 3. upload to tmp distributed filesystem location (e.g. S3)
tmp_staged_dir = os.path.join(
eggo_config.get('core', 'eggo_base_url'),
eggo_config.get('paths', 'dfs_tmp_data_prefix'),
eggo_config.get('paths', 'random_id'),
'staged',
random_id())
upload_cmd = ('pushd {tmp_local_dir} && '
'{hadoop_home}/bin/hadoop fs -mkdir -p {tmp_dfs_dir} && '
'{hadoop_home}/bin/hadoop fs -put ./* {tmp_dfs_dir} && '
'popd')
p = Popen(upload_cmd.format(tmp_local_dir=tmp_local_dir,
hadoop_home=eggo_config.get('worker_env',
'hadoop_home'),
tmp_dfs_dir=tmp_staged_dir),
shell=True)
p.wait()

# 4. rename to final target location
rename_cmd = '{hadoop_home}/bin/hadoop fs -mv {tmp_path} {final_path}'
p = Popen(rename_cmd.format(tmp_path=tmp_staged_dir,
hadoop_home=eggo_config.get('worker_env',
'hadoop_home'),
final_path=destination),
shell=True)
p.wait()
# 4. rename to final target location
rename_cmd = '{hadoop_home}/bin/hadoop fs -mv {tmp_path} {final_path}'
p = Popen(rename_cmd.format(tmp_path=tmp_staged_dir,
hadoop_home=eggo_config.get('worker_env',
'hadoop_home'),
final_path=destination),
shell=True)
p.wait()
except:
raise
finally:
pass # TODO: clean up dfs tmp dir
except:
raise
finally:
Expand Down

0 comments on commit a771591

Please sign in to comment.