Skip to content

Commit

Permalink
add _helper_update_children
Browse files Browse the repository at this point in the history
  • Loading branch information
antgonza committed Sep 27, 2023
1 parent 6b9f1bd commit 1751948
Showing 1 changed file with 43 additions and 34 deletions.
77 changes: 43 additions & 34 deletions qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1311,8 +1311,12 @@ def _complete_artifact_definition(self, artifact_data):
data_type=data_type, name=job_params['name'])
self._set_status('success')

if list(self.children):
self._update_and_launch_children({atype: artifact.id})
# we need to update the children jobs to replace the input
# for the newly created artifact via the validator
for c in self.children:
self._helper_update_children({atype: artifact.id})

self._update_and_launch_children()

def _complete_artifact_transformation(self, artifacts_data):
"""Performs the needed steps to complete an artifact transformation job
Expand Down Expand Up @@ -1682,6 +1686,42 @@ def validator_jobs(self):
for jid in qdb.sql_connection.TRN.execute_fetchflatten():
yield ProcessingJob(jid)

def _helper_update_children(self, new_map):
ready = []
sql = """SELECT command_parameters, pending
FROM qiita.processing_job
WHERE processing_job_id = %s"""
sql_update = """UPDATE qiita.processing_job
SET command_parameters = %s,
pending = %s
WHERE processing_job_id = %s"""
sql_link = """INSERT INTO qiita.artifact_processing_job
(artifact_id, processing_job_id)
VALUES (%s, %s)"""

for c in self.children:
qdb.sql_connection.TRN.add(sql, [c.id])
params, pending = qdb.sql_connection.TRN.execute_fetchflatten()
for pname, out_name in pending[self.id].items():
a_id = new_map[out_name]
params[pname] = str(a_id)
del pending[self.id]
# Link the input artifact with the child job
qdb.sql_connection.TRN.add(sql_link, [a_id, c.id])

# Force to insert a NULL in the DB if pending is empty
pending = pending if pending else None
qdb.sql_connection.TRN.add(sql_update,
[dumps(params), pending, c.id])
qdb.sql_connection.TRN.execute()

if pending is None:
# The child already has all the parameters
# Add it to the ready list
ready.append(c)

return ready

def _update_children(self, mapping):
"""Updates the children of the current job to populate the input params
Expand All @@ -1695,7 +1735,6 @@ def _update_children(self, mapping):
list of qiita_db.processing_job.ProcessingJob
The list of childrens that are ready to be submitted
"""
ready = []
with qdb.sql_connection.TRN:
sql = """SELECT command_output_id, name
FROM qiita.command_output
Expand All @@ -1705,37 +1744,7 @@ def _update_children(self, mapping):
res = qdb.sql_connection.TRN.execute_fetchindex()
new_map = {name: mapping[oid] for oid, name in res}

sql = """SELECT command_parameters, pending
FROM qiita.processing_job
WHERE processing_job_id = %s"""
sql_update = """UPDATE qiita.processing_job
SET command_parameters = %s,
pending = %s
WHERE processing_job_id = %s"""
sql_link = """INSERT INTO qiita.artifact_processing_job
(artifact_id, processing_job_id)
VALUES (%s, %s)"""
for c in self.children:
qdb.sql_connection.TRN.add(sql, [c.id])
params, pending = qdb.sql_connection.TRN.execute_fetchflatten()
for pname, out_name in pending[self.id].items():
a_id = new_map[out_name]
params[pname] = str(a_id)
del pending[self.id]
# Link the input artifact with the child job
qdb.sql_connection.TRN.add(sql_link, [a_id, c.id])

# Force to insert a NULL in the DB if pending is empty
pending = pending if pending else None
qdb.sql_connection.TRN.add(sql_update,
[dumps(params), pending, c.id])
qdb.sql_connection.TRN.execute()

if pending is None:
# The child already has all the parameters
# Add it to the ready list
ready.append(c)
return ready
return self._helper_update_children(new_map)

def _update_and_launch_children(self, mapping):
"""Updates the children of the current job to populate the input params
Expand Down

0 comments on commit 1751948

Please sign in to comment.