diff --git a/notebooks/resource-allocation/generate-allocation-summary-arrays.py b/notebooks/resource-allocation/generate-allocation-summary-arrays.py
new file mode 100644
index 000000000..8bca68742
--- /dev/null
+++ b/notebooks/resource-allocation/generate-allocation-summary-arrays.py
@@ -0,0 +1,239 @@
+from qiita_core.util import MaxRSS_helper
+from qiita_db.software import Software
+import datetime
+from io import StringIO
+from subprocess import check_output
+import pandas as pd
+from os.path import join
+
+# This is an example script to collect the data we need from SLURM; the plan
+# is that in the near future we will clean these up and add them to Qiita's
+# main code, and then have cronjobs to run them.
+
+# at the time of writing we have:
+# qp-spades spades
+# (*) qp-woltka Woltka v0.1.4
+# qp-woltka SynDNA Woltka
+# qp-woltka Calculate Cell Counts
+# (*) qp-meta Sortmerna v2.1b
+# (*) qp-fastp-minimap2 Adapter and host filtering v2023.12
+# ... and the admin plugin
+# (*) qp-klp
+# Here we are only going to create summaries for (*)
+
+
+sacct = ['sacct', '-p',
+         '--format=JobName,JobID,ElapsedRaw,MaxRSS,ReqMem', '-j']
+# for the non-admin jobs, we will use jobs from (roughly) the last six months
+six_months = datetime.date.today() - datetime.timedelta(weeks=6*4)
+
+print('The current "software - commands" that use job-arrays are:')
+for s in Software.iter():
+    if 'ENVIRONMENT="' in s.environment_script:
+        for c in s.commands:
+            print(f"{s.name} - {c.name}")
+
+# 1. Command: woltka
+
+fn = join('/panfs', 'qiita', 'jobs_woltka.tsv.gz')
+print(f"Generating the summary for the woltka jobs: {fn}.")
+
+cmds = [c for s in Software.iter(False)
+        if 'woltka' in s.name for c in s.commands]
+jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
+        j.heartbeat.date() > six_months and j.input_artifacts]
+
+data = []
+for j in jobs:
+    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
+    jid, mjid = j.external_id.strip().split()
+    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
+    _d = pd.read_csv(rvals, sep='|')
+    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
+                           else MaxRSS_helper(x)).max()
+    jwt = _d.ElapsedRaw.max()
+
+    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
+    _d = pd.read_csv(rvals, sep='|')
+    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
+                           else MaxRSS_helper(x)).max()
+    mwt = _d.ElapsedRaw.max()
+
+    data.append({
+        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main',
+        'db': j.parameters.values['Database'].split('/')[-1]})
+    data.append(
+        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge',
+         'db': j.parameters.values['Database'].split('/')[-1]})
+df = pd.DataFrame(data)
+df.to_csv(fn, sep='\t', index=False)
+
+# 2. qp-meta Sortmerna
+
+fn = join('/panfs', 'qiita', 'jobs_sortmerna.tsv.gz')
+print(f"Generating the summary for the sortmerna jobs: {fn}.")
+
+# as above, we will only use jobs from the last 6 months
+cmds = [c for s in Software.iter(False)
+        if 'meta' in s.name.lower() for c in s.commands]
+jobs = [j for c in cmds if 'sortmerna' in c.name.lower()
+        for j in c.processing_jobs if j.status == 'success' and
+        j.heartbeat.date() > six_months and j.input_artifacts]
+
+data = []
+for j in jobs:
+    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
+    jid, mjid = j.external_id.strip().split()
+    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
+    _d = pd.read_csv(rvals, sep='|')
+    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
+                           else MaxRSS_helper(x)).max()
+    jwt = _d.ElapsedRaw.max()
+
+    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
+    _d = pd.read_csv(rvals, sep='|')
+    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
+                           else MaxRSS_helper(x)).max()
+    mwt = _d.ElapsedRaw.max()
+
+    data.append({
+        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main'})
+    data.append(
+        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge'})
+df = pd.DataFrame(data)
+df.to_csv(fn, sep='\t', index=False)
+
+
+# 3. Adapter and host filtering. Note that there is a new version deployed on
+# Jan 2024 so the current results will not be the most accurate
+
+fn = join('/panfs', 'qiita', 'jobs_adapter_host.tsv.gz')
+print(f"Generating the summary for the adapter/host filtering jobs: {fn}.")
+
+# as above, we will only use jobs from the last 6 months
+cmds = [c for s in Software.iter(False)
+        if 'minimap2' in s.name.lower() for c in s.commands]
+jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
+        j.heartbeat.date() > six_months and j.input_artifacts]
+
+data = []
+for j in jobs:
+    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
+    jid, mjid = j.external_id.strip().split()
+    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
+    _d = pd.read_csv(rvals, sep='|')
+    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
+                           else MaxRSS_helper(x)).max()
+    jwt = _d.ElapsedRaw.max()
+
+    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
+    _d = pd.read_csv(rvals, sep='|')
+    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
+                           else MaxRSS_helper(x)).max()
+    mwt = _d.ElapsedRaw.max()
+
+    data.append({
+        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main'})
+    data.append(
+        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge'})
+df = pd.DataFrame(data)
+df.to_csv(fn, sep='\t', index=False)
+
+
+# 4. The SPP!
+
+fn = join('/panfs', 'qiita', 'jobs_spp.tsv.gz')
+print(f"Generating the summary for the SPP jobs: {fn}.")
+
+# for the SPP we will look at jobs from the last year
+year = datetime.date.today() - datetime.timedelta(days=365)
+cmds = [c for s in Software.iter(False)
+        if s.name == 'qp-klp' for c in s.commands]
+jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
+        j.heartbeat.date() > year]
+
+# for the SPP we need to find the jobs that were actually run, this means
+# looping through the existing slurm jobs and finding them
+max_inter = 2000
+
+data = []
+for job in jobs:
+    jei = int(job.external_id)
+    rvals = StringIO(
+        check_output(sacct + [str(jei)]).decode('ascii'))
+    _d = pd.read_csv(rvals, sep='|')
+    mem = _d.MaxRSS.apply(
+        lambda x: x if type(x) is not str else MaxRSS_helper(x)).max()
+    wt = _d.ElapsedRaw.max()
+    # the current "easy" way to determine if amplicon or other is to check
+    # the file extension of the filename
+    stype = 'other'
+    if job.parameters.values['sample_sheet']['filename'].endswith('.txt'):
+        stype = 'amplicon'
+    rid = job.parameters.values['run_identifier']
+    data.append(
+        {'jid': job.id, 'sjid': jei, 'mem': mem, 'stype': stype, 'wt': wt,
+         'type': 'main', 'rid': rid, 'name': _d.JobName[0]})
+
+    # let's look for the convert job
+    for jid in range(jei + 1, jei + max_inter):
+        rvals = StringIO(check_output(sacct + [str(jid)]).decode('ascii'))
+        _d = pd.read_csv(rvals, sep='|')
+        if [1 for x in _d.JobName.values if x.startswith(job.id)]:
+            cjid = int(_d.JobID[0])
+            mem = _d.MaxRSS.apply(
+                lambda x: x if type(x) is not str else MaxRSS_helper(x)).max()
+            wt = _d.ElapsedRaw.max()
+
+            data.append(
+                {'jid': job.id, 'sjid': cjid, 'mem': mem, 'stype': stype,
+                 'wt': wt, 'type': 'convert', 'rid': rid,
+                 'name': _d.JobName[0]})
+
+            # now let's look for the next step, if amplicon that's fastqc but
+            # if other that's qc/nuqc
+            for jid in range(cjid + 1, cjid + max_inter):
+                rvals = StringIO(
+                    check_output(sacct + [str(jid)]).decode('ascii'))
+                _d = pd.read_csv(rvals, sep='|')
+                if [1 for x in _d.JobName.values if x.startswith(job.id)]:
+                    qc_jid = _d.JobIDRaw.apply(
+                        lambda x: int(x.split('.')[0])).max()
+                    qcmem = _d.MaxRSS.apply(
+                        lambda x: x if type(x) is not str
+                        else MaxRSS_helper(x)).max()
+                    qcwt = _d.ElapsedRaw.max()
+
+                    if stype == 'amplicon':
+                        data.append(
+                            {'jid': job.id, 'sjid': qc_jid, 'mem': qcmem,
+                             'stype': stype, 'wt': qcwt, 'type': 'fastqc',
+                             'rid': rid, 'name': _d.JobName[0]})
+                    else:
+                        data.append(
+                            {'jid': job.id, 'sjid': qc_jid, 'mem': qcmem,
+                             'stype': stype, 'wt': qcwt, 'type': 'qc',
+                             'rid': rid, 'name': _d.JobName[0]})
+                        for jid in range(qc_jid + 1, qc_jid + max_inter):
+                            rvals = StringIO(check_output(
+                                sacct + [str(jid)]).decode('ascii'))
+                            _d = pd.read_csv(rvals, sep='|')
+                            if [1 for x in _d.JobName.values if x.startswith(
+                                    job.id)]:
+                                fqc_jid = _d.JobIDRaw.apply(
+                                    lambda x: int(x.split('.')[0])).max()
+                                fqcmem = _d.MaxRSS.apply(
+                                    lambda x: x if type(x) is not str
+                                    else MaxRSS_helper(x)).max()
+                                fqcwt = _d.ElapsedRaw.max()
+                                data.append(
+                                    {'jid': job.id, 'sjid': fqc_jid,
+                                     'mem': fqcmem, 'stype': stype,
+                                     'wt': fqcwt, 'type': 'fastqc',
+                                     'rid': rid, 'name': _d.JobName[0]})
+                                break
+                    break
+            break
+
+df = pd.DataFrame(data)
+df.to_csv(fn, sep='\t', index=False)
diff --git a/notebooks/resource-allocation/generate-allocation-summary.py b/notebooks/resource-allocation/generate-allocation-summary.py
index e081a5d12..7c8634293 100644
--- a/notebooks/resource-allocation/generate-allocation-summary.py
+++ b/notebooks/resource-allocation/generate-allocation-summary.py
@@ -5,6 +5,7 @@
 from json import loads
 from os.path import join
 
+from qiita_core.util import MaxRSS_helper
 from qiita_db.exceptions import QiitaDBUnknownIDError
 from qiita_db.processing_job import ProcessingJob
 from qiita_db.software import Software
@@ -117,19 +118,8 @@
 print('Make sure that only 0/K/M exist', set(
     df.MaxRSS.apply(lambda x: str(x)[-1])))
 
-
-def _helper(x):
-    if x[-1] == 'K':
-        y = float(x[:-1]) * 1000
-    elif x[-1] == 'M':
-        y = float(x[:-1]) * 1000000
-    else:
-        y = float(x)
-    return y
-
-
 # Generating new columns
-df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: _helper(str(x)))
+df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
 df['ElapsedRawTime'] = df.ElapsedRaw.apply(
     lambda x: timedelta(seconds=float(x)))
 
diff --git a/qiita_core/tests/test_util.py b/qiita_core/tests/test_util.py
index a3fef6942..dd33e902c 100644
--- a/qiita_core/tests/test_util.py
+++ b/qiita_core/tests/test_util.py
@@ -10,7 +10,7 @@
 from qiita_core.util import (
     qiita_test_checker, execute_as_transaction, get_qiita_version,
-    is_test_environment, get_release_info)
+    is_test_environment, get_release_info, MaxRSS_helper)
 from qiita_db.meta_util import (
     generate_biom_and_metadata_release, generate_plugin_releases)
 import qiita_db as qdb
 
@@ -82,6 +82,20 @@ def test_get_release_info(self):
         self.assertEqual(biom_metadata_release, ('', '', ''))
         self.assertNotEqual(archive_release, ('', '', ''))
 
+    def test_MaxRSS_helper(self):
+        tests = [
+            ('6', 6.0),
+            ('6K', 6000),
+            ('6M', 6000000),
+            ('6G', 6000000000),
+            ('6.9', 6.9),
+            ('6.9K', 6900),
+            ('6.9M', 6900000),
+            ('6.9G', 6900000000),
+        ]
+        for x, y in tests:
+            self.assertEqual(MaxRSS_helper(x), y)
+
 
 if __name__ == '__main__':
     main()
diff --git a/qiita_core/util.py b/qiita_core/util.py
index b3f6b4142..9692f210b 100644
--- a/qiita_core/util.py
+++ b/qiita_core/util.py
@@ -151,3 +151,15 @@ def get_release_info(study_status='public'):
     archive_release = ((md5sum, filepath, timestamp))
 
     return (biom_metadata_release, archive_release)
+
+
+def MaxRSS_helper(x):
+    if x[-1] == 'K':
+        y = float(x[:-1]) * 1000
+    elif x[-1] == 'M':
+        y = float(x[:-1]) * 1000000
+    elif x[-1] == 'G':
+        y = float(x[:-1]) * 1000000000
+    else:
+        y = float(x)
+    return y
diff --git a/qiita_ware/commands.py b/qiita_ware/commands.py
index 83efcf5a4..249864ebe 100644
--- a/qiita_ware/commands.py
+++ b/qiita_ware/commands.py
@@ -215,40 +215,64 @@ def submit_EBI(artifact_id, action, send, test=False, test_size=False):
         LogEntry.create(
             'Runtime', 'The submission: %d is larger than allowed (%d), will '
             'try to fix: %d' % (artifact_id, max_size, total_size))
-        # transform current metadata to dataframe for easier curation
-        rows = {k: dict(v) for k, v in ebi_submission.samples.items()}
-        df = pd.DataFrame.from_dict(rows, orient='index')
-        # remove unique columns and same value in all columns
-        nunique = df.apply(pd.Series.nunique)
-        nsamples = len(df.index)
-        cols_to_drop = set(
-            nunique[(nunique == 1) | (nunique == nsamples)].index)
-        # maximize deletion by removing also columns that are almost all the
-        # same or almost all unique
-        cols_to_drop = set(
-            nunique[(nunique <= int(nsamples * .01)) |
-                    (nunique >= int(nsamples * .5))].index)
-        cols_to_drop = cols_to_drop - {'taxon_id', 'scientific_name',
-                                       'description', 'country',
-                                       'collection_date'}
-        all_samples = ebi_submission.sample_template.ebi_sample_accessions
-        samples = [k for k in ebi_submission.samples if all_samples[k] is None]
-        if samples:
-            ebi_submission.write_xml_file(
-                ebi_submission.generate_sample_xml(samples, cols_to_drop),
-                ebi_submission.sample_xml_fp)
+        def _reduce_metadata(low=0.01, high=0.5):
+            # helper to transform the current metadata to a dataframe for
+            # easier curation
+            rows = {k: dict(v) for k, v in ebi_submission.samples.items()}
+            df = pd.DataFrame.from_dict(rows, orient='index')
+            # remove unique columns and same value in all columns
+            nunique = df.apply(pd.Series.nunique)
+            nsamples = len(df.index)
+            cols_to_drop = set(
+                nunique[(nunique == 1) | (nunique == nsamples)].index)
+            # maximize deletion by removing also columns that are almost all
+            # the same or almost all unique
+            cols_to_drop = set(
+                nunique[(nunique <= int(nsamples * low)) |
+                        (nunique >= int(nsamples * high))].index)
+            cols_to_drop = cols_to_drop - {'taxon_id', 'scientific_name',
+                                           'description', 'country',
+                                           'collection_date'}
+            all_samples = ebi_submission.sample_template.ebi_sample_accessions
+
+            if action == 'ADD':
+                samples = [k for k in ebi_submission.samples
+                           if all_samples[k] is None]
+            else:
+                samples = [k for k in ebi_submission.samples
+                           if all_samples[k] is not None]
+            if samples:
+                ebi_submission.write_xml_file(
+                    ebi_submission.generate_sample_xml(samples, cols_to_drop),
+                    ebi_submission.sample_xml_fp)
+
+        # let's try with the default parameters
+        _reduce_metadata()
 
         # now let's recalculate the size to make sure it's fine
         new_total_size = sum([stat(tr).st_size
                               for tr in to_review if tr is not None])
         LogEntry.create(
-            'Runtime', 'The submission: %d after cleaning is %d and was %d' % (
+            'Runtime',
+            'The submission: %d after default cleaning was %d and is %d' % (
                 artifact_id, total_size, new_total_size))
         if new_total_size > max_size:
-            raise ComputeError(
-                'Even after cleaning the submission: %d is too large. Before '
-                'cleaning: %d, after: %d' % (
+            LogEntry.create(
+                'Runtime', 'Submission %d still too big, will try more '
+                'stringent parameters' % (artifact_id))
+
+            _reduce_metadata(0.05, 0.4)
+            new_total_size = sum([stat(tr).st_size
+                                  for tr in to_review if tr is not None])
+            LogEntry.create(
+                'Runtime',
+                'The submission: %d after second cleaning was %d and is %d' % (
                     artifact_id, total_size, new_total_size))
+            if new_total_size > max_size:
+                raise ComputeError(
+                    'Even after cleaning the submission: %d is too large. '
+                    'Before cleaning: %d, after: %d' % (
+                        artifact_id, total_size, new_total_size))
 
     st_acc, sa_acc, bio_acc, ex_acc, run_acc = None, None, None, None, None
     if send: