diff --git a/qiita_db/processing_job.py b/qiita_db/processing_job.py index 423c42f51..fd1d3ecec 100644 --- a/qiita_db/processing_job.py +++ b/qiita_db/processing_job.py @@ -1311,8 +1311,12 @@ def _complete_artifact_definition(self, artifact_data): data_type=data_type, name=job_params['name']) self._set_status('success') - if list(self.children): - self._update_and_launch_children({atype: artifact.id}) + # we need to update the children jobs to replace the input + # for the newly created artifact via the validator + for c in self.children: + self._helper_update_children({atype: artifact.id}) + + self._update_and_launch_children() def _complete_artifact_transformation(self, artifacts_data): """Performs the needed steps to complete an artifact transformation job @@ -1682,6 +1686,42 @@ def validator_jobs(self): for jid in qdb.sql_connection.TRN.execute_fetchflatten(): yield ProcessingJob(jid) + def _helper_update_children(self, new_map): + ready = [] + sql = """SELECT command_parameters, pending + FROM qiita.processing_job + WHERE processing_job_id = %s""" + sql_update = """UPDATE qiita.processing_job + SET command_parameters = %s, + pending = %s + WHERE processing_job_id = %s""" + sql_link = """INSERT INTO qiita.artifact_processing_job + (artifact_id, processing_job_id) + VALUES (%s, %s)""" + + for c in self.children: + qdb.sql_connection.TRN.add(sql, [c.id]) + params, pending = qdb.sql_connection.TRN.execute_fetchflatten() + for pname, out_name in pending[self.id].items(): + a_id = new_map[out_name] + params[pname] = str(a_id) + del pending[self.id] + # Link the input artifact with the child job + qdb.sql_connection.TRN.add(sql_link, [a_id, c.id]) + + # Force to insert a NULL in the DB if pending is empty + pending = pending if pending else None + qdb.sql_connection.TRN.add(sql_update, + [dumps(params), pending, c.id]) + qdb.sql_connection.TRN.execute() + + if pending is None: + # The child already has all the parameters + # Add it to the ready list + ready.append(c) + + return ready + def _update_children(self, mapping): """Updates the children of the current job to populate the input params @@ -1695,7 +1735,6 @@ def _update_children(self, mapping): list of qiita_db.processing_job.ProcessingJob The list of childrens that are ready to be submitted """ - ready = [] with qdb.sql_connection.TRN: sql = """SELECT command_output_id, name FROM qiita.command_output @@ -1705,37 +1744,7 @@ def _update_children(self, mapping): res = qdb.sql_connection.TRN.execute_fetchindex() new_map = {name: mapping[oid] for oid, name in res} - sql = """SELECT command_parameters, pending - FROM qiita.processing_job - WHERE processing_job_id = %s""" - sql_update = """UPDATE qiita.processing_job - SET command_parameters = %s, - pending = %s - WHERE processing_job_id = %s""" - sql_link = """INSERT INTO qiita.artifact_processing_job - (artifact_id, processing_job_id) - VALUES (%s, %s)""" - for c in self.children: - qdb.sql_connection.TRN.add(sql, [c.id]) - params, pending = qdb.sql_connection.TRN.execute_fetchflatten() - for pname, out_name in pending[self.id].items(): - a_id = new_map[out_name] - params[pname] = str(a_id) - del pending[self.id] - # Link the input artifact with the child job - qdb.sql_connection.TRN.add(sql_link, [a_id, c.id]) - - # Force to insert a NULL in the DB if pending is empty - pending = pending if pending else None - qdb.sql_connection.TRN.add(sql_update, - [dumps(params), pending, c.id]) - qdb.sql_connection.TRN.execute() - - if pending is None: - # The child already has all the parameters - # Add it to the ready list - ready.append(c) - return ready + return self._helper_update_children(new_map) def _update_and_launch_children(self, mapping): """Updates the children of the current job to populate the input params