diff --git a/qiita_db/handlers/artifact.py b/qiita_db/handlers/artifact.py
index 5080bae46..03e69a090 100644
--- a/qiita_db/handlers/artifact.py
+++ b/qiita_db/handlers/artifact.py
@@ -267,6 +267,7 @@ def post(self):
         atype = self.get_argument('artifact_type')
         aname = self.get_argument('command_artifact_name', 'Name')
         files = self.get_argument('files')
+        add_default_workflow = self.get_argument('add_default_workflow', False)
 
         if job_id is None and prep_id is None:
             raise HTTPError(
@@ -314,8 +315,18 @@ def post(self):
             values['template'] = prep_id
         cmd = qdb.software.Command.get_validator(atype)
         params = qdb.software.Parameters.load(cmd, values_dict=values)
-        new_job = PJ.create(user, params, True)
-        new_job.submit()
+        if add_default_workflow:
+            pwk = qdb.processing_job.ProcessingWorkflow.from_scratch(
+                user, params, name=f'ProcessingWorkflow for {job_id}')
+            # the new job is the first job in the workflow
+            new_job = list(pwk.graph.nodes())[0]
+            # adding default pipeline to the preparation
+            pt = qdb.metadata_template.prep_template.PrepTemplate(prep_id)
+            pt.add_default_workflow(user, pwk)
+            pwk.submit()
+        else:
+            new_job = PJ.create(user, params, True)
+            new_job.submit()
 
         r_client.set('prep_template_%d' % prep_id,
                      dumps({'job_id': new_job.id, 'is_qiita_job': True}))
diff --git a/qiita_db/handlers/tests/test_artifact.py b/qiita_db/handlers/tests/test_artifact.py
index 009454b87..d826a5e28 100644
--- a/qiita_db/handlers/tests/test_artifact.py
+++ b/qiita_db/handlers/tests/test_artifact.py
@@ -411,6 +411,58 @@ def test_post(self):
             sleep(0.5)
         self.assertIsNotNone(new_prep.artifact)
 
+    def test_post_insert_artifact_and_add_default_processing(self):
+        # now let's test adding an artifact + default processing to a new
+        # preparation
+        new_prep = qdb.metadata_template.prep_template.PrepTemplate.create(
+            pd.DataFrame({'new_col': {'1.SKB1.640202': 1,
+                                      '1.SKD3.640198': 2,
+                                      '1.SKM4.640180': 3}}),
+            qdb.study.Study(1), '16S')
+
+        # creating the fastq files to be added
+        fd, fp1 = mkstemp(suffix='_seqs.fastq')
+        close(fd)
+        self._clean_up_files.append(fp1)
+        with open(fp1, 'w') as f:
+            f.write("@HWI-ST753:189:D1385ACXX:1:1101:1214:1906 1:N:0:\n"
+                    "NACGTAGGGTGCAAGCGTTGTCCGGAATNA\n"
+                    "+\n"
+                    "#1=DDFFFHHHHHJJJJJJJJJJJJGII#0\n")
+
+        fd, fp2 = mkstemp(suffix='_barcodes.fastq')
+        close(fd)
+        self._clean_up_files.append(fp2)
+        with open(fp2, 'w') as f:
+            f.write("@HWI-ST753:189:D1385ACXX:1:1101:1214:1906 2:N:0:\n"
+                    "NNNCNNNNNNNNN\n"
+                    "+\n"
+                    "#############\n")
+
+        data = {'user_email': 'demo@microbio.me',
+                'artifact_type': 'FASTQ',
+                'prep_id': new_prep.id,
+                'files': dumps([(fp1, 'raw_forward_seqs'),
+                                (fp2, 'raw_barcodes')]),
+                'add_default_workflow': True}
+        obs = self.post('/qiita_db/artifact/', headers=self.header, data=data)
+        self.assertEqual(obs.code, 200)
+        jid = loads(obs.body)['job_id']
+        # if we got to this point, then we should have a job and that job
+        # should have children jobs (generated by the default workflow)
+        job = qdb.processing_job.ProcessingJob(jid)
+        children = [c.command.name for c in job.children]
+        grandchildren = [gc.command.name for c in job.children
+                         for gc in c.children]
+        self.assertEqual('Validate', job.command.name)
+        self.assertEqual(['Split libraries FASTQ'], children)
+        self.assertEqual(['Pick closed-reference OTUs'], grandchildren)
+
+        # just to avoid any tentative issues, let's wait for the main job to
+        # finish
+        while job.status not in ('error', 'success'):
+            sleep(0.5)
+
 
 if __name__ == '__main__':
     main()
diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index b23b6dc9f..cf38644eb 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -726,13 +726,15 @@ def modification_timestamp(self):
     def max_samples():
         return qdb.util.max_preparation_samples()
 
-    def add_default_workflow(self, user):
-        """The modification timestamp of the prep information
+    def add_default_workflow(self, user, workflow=None):
+        """Adds the commands of the default workflow to this preparation
 
         Parameters
         ----------
         user : qiita_db.user.User
             The user that requested to add the default workflows
+        workflow : qiita_db.processing_job.ProcessingWorkflow, optional
+            The workflow to which the default processing will be added
 
         Returns
         -------
@@ -745,6 +747,13 @@ def add_default_workflow(self, user):
             a. If this preparation doesn't have valid workflows
             b. This preparation has been fully processed (no new steps needed)
             c. If there is no valid initial artifact to start the workflow
+
+        Notes
+        -----
+        This method adds the commands of a default workflow (definition) to
+        the preparation. If a workflow (object) is passed, the commands are
+        added after the last artifact in that workflow; if it is None, a new
+        (default) workflow is created.
         """
 
         # helper functions to avoid duplication of code
@@ -806,9 +815,14 @@ def _get_predecessors(workflow, node):
         # workflow
 
         # 1.
-        prep_jobs = [j for c in self.artifact.descendants.nodes()
-                     for j in c.jobs(show_hidden=True)
-                     if j.command.software.type == 'artifact transformation']
+        # let's assume that if there is a workflow, there are no jobs
+        if workflow is not None:
+            prep_jobs = []
+        else:
+            prep_jobs = [j for c in self.artifact.descendants.nodes()
+                         for j in c.jobs(show_hidden=True)
+                         if j.command.software.type ==
+                         'artifact transformation']
         merging_schemes = {
             qdb.archive.Archive.get_merging_scheme_from_job(j): {
                 x: y.id for x, y in j.outputs.items()}
@@ -821,7 +835,14 @@ def _get_predecessors(workflow, node):
 
         # 2.
         pt_dt = self.data_type()
-        pt_artifact = self.artifact.artifact_type
+        # if there is a workflow, we would need to get the artifact_type from
+        # the job
+        if workflow is not None:
+            starting_job = list(workflow.graph.nodes())[0]
+            pt_artifact = starting_job.parameters.values['artifact_type']
+        else:
+            starting_job = None
+            pt_artifact = self.artifact.artifact_type
         workflows = [wk for wk in qdb.software.DefaultWorkflow.iter()
                      if wk.artifact_type == pt_artifact and
                      pt_dt in wk.data_type]
@@ -846,7 +867,6 @@ def _get_predecessors(workflow, node):
             raise ValueError('This preparation is complete')
 
         # 3.
-        workflow = None
         for wk, wk_data in missing_artifacts.items():
             previous_jobs = dict()
             for ma, node in wk_data.items():
@@ -886,12 +906,20 @@ def _get_predecessors(workflow, node):
 
                 cmds_to_create.append([pdp_cmd, params, reqp])
 
-            init_artifacts = {wkartifact_type: self.artifact.id}
+            if starting_job is not None:
+                init_artifacts = {
+                    wkartifact_type: f'{starting_job.id}:'}
+            else:
+                init_artifacts = {wkartifact_type: self.artifact.id}
 
             cmds_to_create.reverse()
             current_job = None
             for i, (cmd, params, rp) in enumerate(cmds_to_create):
-                previous_job = current_job
+                if starting_job is not None:
+                    previous_job = starting_job
+                    starting_job = None
+                else:
+                    previous_job = current_job
                 if previous_job is None:
                     req_params = dict()
                     for iname, dname in rp.items():
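
For context, a minimal, hypothetical client-side sketch (not part of the diff above) of how the new add_default_workflow flag can be sent to the /qiita_db/artifact/ endpoint once this change is deployed. The server URL, OAuth token, preparation id, and file paths below are placeholders; only the payload keys mirror what the handler and the new test use.

    # Hypothetical usage sketch; QIITA_URL, TOKEN, the prep id and the file
    # paths are placeholders, not values taken from this change.
    from json import dumps

    import requests

    QIITA_URL = 'https://localhost:21174'       # placeholder server
    TOKEN = 'REPLACE-WITH-OAUTH2-BEARER-TOKEN'  # placeholder credential

    data = {
        'user_email': 'demo@microbio.me',
        'artifact_type': 'FASTQ',
        'prep_id': 42,  # an existing preparation id
        'files': dumps([('/path/to/seqs.fastq', 'raw_forward_seqs'),
                        ('/path/to/barcodes.fastq', 'raw_barcodes')]),
        # new flag added by this change; omit it to keep the previous
        # behavior of only submitting the Validate job
        'add_default_workflow': True}

    resp = requests.post(f'{QIITA_URL}/qiita_db/artifact/',
                         headers={'Authorization': f'Bearer {TOKEN}'},
                         data=data, verify=False)
    resp.raise_for_status()
    print('Validate job id:', resp.json()['job_id'])

When the flag is truthy, the handler builds a ProcessingWorkflow from the Validate job and attaches the preparation's default pipeline to it, so the returned job ends up with children (e.g. Split libraries FASTQ) and grandchildren (e.g. Pick closed-reference OTUs), as exercised by the new test.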