add default-workflow to future artifact - SPP (#3307)
* add default-workflow to future artifact - SPP

* fix error

* current_job -> starting_job

* fix test

* addressing @charles-cowart comments
antgonza authored Aug 4, 2023
1 parent 62511cc commit a412b45
Showing 3 changed files with 102 additions and 11 deletions.
15 changes: 13 additions & 2 deletions qiita_db/handlers/artifact.py
@@ -267,6 +267,7 @@ def post(self):
         atype = self.get_argument('artifact_type')
         aname = self.get_argument('command_artifact_name', 'Name')
         files = self.get_argument('files')
+        add_default_workflow = self.get_argument('add_default_workflow', False)

         if job_id is None and prep_id is None:
             raise HTTPError(
@@ -314,8 +315,18 @@ def post(self):
         values['template'] = prep_id
         cmd = qdb.software.Command.get_validator(atype)
         params = qdb.software.Parameters.load(cmd, values_dict=values)
-        new_job = PJ.create(user, params, True)
-        new_job.submit()
+        if add_default_workflow:
+            pwk = qdb.processing_job.ProcessingWorkflow.from_scratch(
+                user, params, name=f'ProcessingWorkflow for {job_id}')
+            # the new job is the first job in the workflow
+            new_job = list(pwk.graph.nodes())[0]
+            # adding default pipeline to the preparation
+            pt = qdb.metadata_template.prep_template.PrepTemplate(prep_id)
+            pt.add_default_workflow(user, pwk)
+            pwk.submit()
+        else:
+            new_job = PJ.create(user, params, True)
+            new_job.submit()

         r_client.set('prep_template_%d' % prep_id,
                      dumps({'job_id': new_job.id, 'is_qiita_job': True}))
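For context, a minimal sketch of how a client might exercise the new flag against this endpoint once deployed. The URL path and form-field names come from the handler and test in this commit; the host, token, user, prep id and file paths are hypothetical placeholders:

# Minimal client-side sketch of the updated endpoint; field names mirror
# the handler above, everything else is a hypothetical placeholder.
from json import dumps

import requests

data = {
    'user_email': '[email protected]',            # hypothetical user
    'prep_id': 123,                              # hypothetical preparation
    'artifact_type': 'FASTQ',
    'files': dumps([('/path/fwd.fastq', 'raw_forward_seqs'),
                    ('/path/bcs.fastq', 'raw_barcodes')]),
    # new in this commit: also build and submit the default workflow
    'add_default_workflow': True,
}
resp = requests.post('https://qiita.example.org/qiita_db/artifact/',
                     headers={'Authorization': 'Bearer <token>'},  # assumed
                     data=data)
job_id = resp.json()['job_id']  # id of the Validate job created server-side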
52 changes: 52 additions & 0 deletions qiita_db/handlers/tests/test_artifact.py
@@ -411,6 +411,58 @@ def test_post(self):
             sleep(0.5)
         self.assertIsNotNone(new_prep.artifact)

+    def test_post_insert_artifact_and_add_default_processing(self):
+        # now let's test adding an artifact + default processing to a new
+        # preparation
+        new_prep = qdb.metadata_template.prep_template.PrepTemplate.create(
+            pd.DataFrame({'new_col': {'1.SKB1.640202': 1,
+                                      '1.SKD3.640198': 2,
+                                      '1.SKM4.640180': 3}}),
+            qdb.study.Study(1), '16S')
+
+        # creating the fastq files to be added
+        fd, fp1 = mkstemp(suffix='_seqs.fastq')
+        close(fd)
+        self._clean_up_files.append(fp1)
+        with open(fp1, 'w') as f:
+            f.write("@HWI-ST753:189:D1385ACXX:1:1101:1214:1906 1:N:0:\n"
+                    "NACGTAGGGTGCAAGCGTTGTCCGGAATNA\n"
+                    "+\n"
+                    "#1=DDFFFHHHHHJJJJJJJJJJJJGII#0\n")
+
+        fd, fp2 = mkstemp(suffix='_barcodes.fastq')
+        close(fd)
+        self._clean_up_files.append(fp2)
+        with open(fp2, 'w') as f:
+            f.write("@HWI-ST753:189:D1385ACXX:1:1101:1214:1906 2:N:0:\n"
+                    "NNNCNNNNNNNNN\n"
+                    "+\n"
+                    "#############\n")
+
+        data = {'user_email': '[email protected]',
+                'artifact_type': 'FASTQ',
+                'prep_id': new_prep.id,
+                'files': dumps([(fp1, 'raw_forward_seqs'),
+                                (fp2, 'raw_barcodes')]),
+                'add_default_workflow': True}
+        obs = self.post('/qiita_db/artifact/', headers=self.header, data=data)
+        self.assertEqual(obs.code, 200)
+        jid = loads(obs.body)['job_id']
+        # if we got to this point, then we should have a job and that job
+        # should have children jobs (generated by the default workflow)
+        job = qdb.processing_job.ProcessingJob(jid)
+        children = [c.command.name for c in job.children]
+        grandchildren = [gc.command.name for c in job.children
+                         for gc in c.children]
+        self.assertEqual('Validate', job.command.name)
+        self.assertEqual(['Split libraries FASTQ'], children)
+        self.assertEqual(['Pick closed-reference OTUs'], grandchildren)
+
+        # just to avoid any potential issues, let's wait for the main job
+        # to finish
+        while job.status not in ('error', 'success'):
+            sleep(0.5)
+

 if __name__ == '__main__':
     main()
46 changes: 37 additions & 9 deletions qiita_db/metadata_template/prep_template.py
@@ -726,13 +726,15 @@ def modification_timestamp(self):
     def max_samples():
         return qdb.util.max_preparation_samples()

-    def add_default_workflow(self, user):
-        """The modification timestamp of the prep information
+    def add_default_workflow(self, user, workflow=None):
+        """Adds the commands of the default workflow to this preparation

         Parameters
         ----------
         user : qiita_db.user.User
             The user that requested to add the default workflows
+        workflow : qiita_db.processing_job.ProcessingWorkflow, optional
+            The workflow to which the default processing will be added

         Returns
         -------
@@ -745,6 +747,13 @@ def add_default_workflow(self, user):
             a. If this preparation doesn't have valid workflows
             b. This preparation has been fully processed (no new steps needed)
             c. If there is no valid initial artifact to start the workflow
+
+        Notes
+        -----
+        This method adds the commands of a default workflow (definition) to
+        the preparation. If a workflow (object) is passed, the commands are
+        added to the last artifact in that workflow; if it is None, a new
+        workflow is created (the default).
         """
         # helper functions to avoid duplication of code

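To make the Notes concrete, a hedged sketch of the two call patterns. The classes and methods are the ones appearing in this diff, but the ids, the shape of `values`, and the surrounding setup are illustrative assumptions, not code from this commit:

# Illustrative only: ids and the values dict below are assumptions.
from json import dumps

import qiita_db as qdb

user = qdb.user.User('[email protected]')
prep = qdb.metadata_template.prep_template.PrepTemplate(1)  # hypothetical id

# workflow=None (the default): the method creates a brand-new
# ProcessingWorkflow hanging off the preparation's existing artifact.
prep.add_default_workflow(user)

# workflow passed in (the handler's new path): the default commands are
# appended to a workflow whose only job, Validate, will create the
# initial artifact once it runs.
values = {'template': prep.id, 'artifact_type': 'FASTQ',  # assumed shape
          'files': dumps([('/path/fwd.fastq', 'raw_forward_seqs')])}
cmd = qdb.software.Command.get_validator('FASTQ')
params = qdb.software.Parameters.load(cmd, values_dict=values)
pwk = qdb.processing_job.ProcessingWorkflow.from_scratch(user, params)
prep.add_default_workflow(user, pwk)
pwk.submit()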
@@ -806,9 +815,14 @@ def _get_predecessors(workflow, node):
         # workflow

         # 1.
-        prep_jobs = [j for c in self.artifact.descendants.nodes()
-                     for j in c.jobs(show_hidden=True)
-                     if j.command.software.type == 'artifact transformation']
+        # let's assume that if there is a workflow, there are no jobs
+        if workflow is not None:
+            prep_jobs = []
+        else:
+            prep_jobs = [j for c in self.artifact.descendants.nodes()
+                         for j in c.jobs(show_hidden=True)
+                         if j.command.software.type ==
+                         'artifact transformation']
         merging_schemes = {
             qdb.archive.Archive.get_merging_scheme_from_job(j): {
                 x: y.id for x, y in j.outputs.items()}
@@ -821,7 +835,14 @@

         # 2.
         pt_dt = self.data_type()
-        pt_artifact = self.artifact.artifact_type
+        # if there is a workflow, we would need to get the artifact_type
+        # from the job
+        if workflow is not None:
+            starting_job = list(workflow.graph.nodes())[0]
+            pt_artifact = starting_job.parameters.values['artifact_type']
+        else:
+            starting_job = None
+            pt_artifact = self.artifact.artifact_type
         workflows = [wk for wk in qdb.software.DefaultWorkflow.iter()
                      if wk.artifact_type == pt_artifact and
                      pt_dt in wk.data_type]
@@ -846,7 +867,6 @@
             raise ValueError('This preparation is complete')

         # 3.
-        workflow = None
         for wk, wk_data in missing_artifacts.items():
             previous_jobs = dict()
             for ma, node in wk_data.items():
@@ -886,12 +906,20 @@

                 cmds_to_create.append([pdp_cmd, params, reqp])

-        init_artifacts = {wkartifact_type: self.artifact.id}
+        if starting_job is not None:
+            init_artifacts = {
+                wkartifact_type: f'{starting_job.id}:'}
+        else:
+            init_artifacts = {wkartifact_type: self.artifact.id}

         cmds_to_create.reverse()
        current_job = None
         for i, (cmd, params, rp) in enumerate(cmds_to_create):
-            previous_job = current_job
+            if starting_job is not None:
+                previous_job = starting_job
+                starting_job = None
+            else:
+                previous_job = current_job
             if previous_job is None:
                 req_params = dict()
                 for iname, dname in rp.items():
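The seed-job threading in that last loop is the subtle part of this change: the first command to be created must hang off the externally supplied starting job (if any), and every later command off the job created just before it. The standalone sketch below reproduces only that control flow, with plain strings standing in for jobs and commands; nothing in it is Qiita API:

# Standalone illustration of the previous_job/starting_job threading; the
# command names match the default 16S pipeline used in the test above.
def build_chain(cmds_to_create, starting_job=None):
    chain = []
    current_job = None
    for cmd in cmds_to_create:
        # use the externally supplied starting job exactly once, for the
        # first command; afterwards each command follows the one before it
        if starting_job is not None:
            previous_job = starting_job
            starting_job = None
        else:
            previous_job = current_job
        current_job = (cmd, previous_job)  # stand-in for workflow.add(...)
        chain.append(current_job)
    return chain

cmds = ['Split libraries FASTQ', 'Pick closed-reference OTUs']
# no seed: the first command is the root of a brand-new workflow
print(build_chain(cmds))
# seeded (the handler's path): the first command hangs off the Validate job
print(build_chain(cmds, starting_job='Validate'))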
