From f87638937c35d4175269a3e0831321f1ac0d9af5 Mon Sep 17 00:00:00 2001 From: buehlere Date: Wed, 21 Aug 2024 16:43:05 -0400 Subject: [PATCH 01/28] upading access get_request_id function adding the ability to specify an app --- runner/operator/access/__init__.py | 6 +++--- runner/operator/access/v1_0_0/cnv/__init__.py | 2 +- runner/operator/access/v1_0_0/legacy/__init__.py | 2 +- runner/operator/access/v1_0_0/msi/__init__.py | 2 +- runner/operator/access/v1_0_0/snps_and_indels/__init__.py | 2 +- .../operator/access/v1_0_0/structural_variants/__init__.py | 2 +- runner/operator/access/v2_1_0/nucleovar/__init__.py | 0 7 files changed, 8 insertions(+), 8 deletions(-) create mode 100644 runner/operator/access/v2_1_0/nucleovar/__init__.py diff --git a/runner/operator/access/__init__.py b/runner/operator/access/__init__.py index 2ffa3f994..f71fa6e7f 100644 --- a/runner/operator/access/__init__.py +++ b/runner/operator/access/__init__.py @@ -29,7 +29,7 @@ def get_request_id(run_ids, request_id=None): raise Exception("Could not get find request id") -def get_request_id_runs(request_id): +def get_request_id_runs(request_id, app): """ Get the latest completed bam-generation runs for the given request ID @@ -39,7 +39,7 @@ def get_request_id_runs(request_id): operator_run_id = ( Run.objects.filter( tags__igoRequestId=request_id, - app__name__in=["access legacy", "access nucleo"], + app__name__in=app, operator_run__status=RunStatus.COMPLETED, ) .exclude(finished_date__isnull=True) @@ -49,7 +49,7 @@ def get_request_id_runs(request_id): ) request_id_runs = Run.objects.filter( - operator_run_id=operator_run_id, app__name__in=["access legacy", "access nucleo"], status=RunStatus.COMPLETED + operator_run_id=operator_run_id, app__name__in=app, status=RunStatus.COMPLETED ) return request_id_runs diff --git a/runner/operator/access/v1_0_0/cnv/__init__.py b/runner/operator/access/v1_0_0/cnv/__init__.py index 46d930b89..5327fcd88 100644 --- a/runner/operator/access/v1_0_0/cnv/__init__.py +++ b/runner/operator/access/v1_0_0/cnv/__init__.py @@ -43,7 +43,7 @@ def get_sample_inputs(self): :return: list of json_objects """ - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] # Get all unfiltered bam ports for these runs unfiltered_bam_ports = Port.objects.filter( diff --git a/runner/operator/access/v1_0_0/legacy/__init__.py b/runner/operator/access/v1_0_0/legacy/__init__.py index bf7ae3b28..23a7cc51d 100644 --- a/runner/operator/access/v1_0_0/legacy/__init__.py +++ b/runner/operator/access/v1_0_0/legacy/__init__.py @@ -235,7 +235,7 @@ def construct_sample_inputs(samples, request_id, group_id): class AccessLegacyOperator(Operator): def get_jobs(self): - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] ports = Port.objects.filter(run_id__in=run_ids, port_type=PortType.OUTPUT) data = [ diff --git a/runner/operator/access/v1_0_0/msi/__init__.py b/runner/operator/access/v1_0_0/msi/__init__.py index c9e5cb71d..796a346a1 100644 --- a/runner/operator/access/v1_0_0/msi/__init__.py +++ b/runner/operator/access/v1_0_0/msi/__init__.py @@ -51,7 +51,7 @@ def get_sample_inputs(self): :return: list of json_objects """ - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] # Get all standard bam ports for these runs standard_bam_ports = Port.objects.filter( diff --git a/runner/operator/access/v1_0_0/snps_and_indels/__init__.py b/runner/operator/access/v1_0_0/snps_and_indels/__init__.py index 274835e96..e24275695 100644 --- a/runner/operator/access/v1_0_0/snps_and_indels/__init__.py +++ b/runner/operator/access/v1_0_0/snps_and_indels/__init__.py @@ -57,7 +57,7 @@ def get_sample_inputs(self): :return: list of json_objects """ - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] # Get all duplex / simplex bam ports for these runs duplex_output_ports = Port.objects.filter( diff --git a/runner/operator/access/v1_0_0/structural_variants/__init__.py b/runner/operator/access/v1_0_0/structural_variants/__init__.py index 77e04b05f..7ae407b50 100644 --- a/runner/operator/access/v1_0_0/structural_variants/__init__.py +++ b/runner/operator/access/v1_0_0/structural_variants/__init__.py @@ -41,7 +41,7 @@ def get_sample_inputs(self): :return: list of json_objects """ - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] # Get all standard bam ports for these runs standard_bam_ports = Port.objects.filter( diff --git a/runner/operator/access/v2_1_0/nucleovar/__init__.py b/runner/operator/access/v2_1_0/nucleovar/__init__.py new file mode 100644 index 000000000..e69de29bb From 903c14fa910b2ff8eab6fb398f8a40425fb48471 Mon Sep 17 00:00:00 2001 From: buehlere Date: Wed, 21 Aug 2024 16:43:37 -0400 Subject: [PATCH 02/28] remove un-necessary functions --- runner/operator/manifest/access_manifest_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/operator/manifest/access_manifest_operator.py b/runner/operator/manifest/access_manifest_operator.py index 0e3de487a..94e0c84b3 100644 --- a/runner/operator/manifest/access_manifest_operator.py +++ b/runner/operator/manifest/access_manifest_operator.py @@ -11,7 +11,7 @@ from file_system.models import File, FileGroup, FileType from file_system.repository import FileRepository from file_system.helper.access_helper import CmoDMPManifest -from runner.operator.access import get_request_id, get_request_id_runs, create_cwl_file_object +from runner.operator.access import get_request_id import csv logger = logging.getLogger(__name__) From 676fa01dfcba26b2ac44bf1aa93eb31729937de7 Mon Sep 17 00:00:00 2001 From: buehlere Date: Tue, 10 Sep 2024 09:19:42 -0400 Subject: [PATCH 03/28] partial update, not functional --- .../v2_1_0/nucleovar/access_nucleovar.py | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py new file mode 100644 index 000000000..a50b35110 --- /dev/null +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -0,0 +1,188 @@ +import copy +import os +import csv +import logging +from django.db.models import Q +from django.conf import settings +from beagle import __version__ +from datetime import datetime +from file_system.repository.file_repository import FileRepository +from runner.operator.operator import Operator +from runner.models import Pipeline +import runner.operator.chronos_operator.bin.tempo_patient as patient_obj +from notifier.models import JobGroup +from notifier.events import OperatorRequestEvent, ChronosMissingSamplesEvent +from notifier.tasks import send_notification +from runner.run.objects.run_creator_object import RunCreator +from file_system.models import File +from runner.models import Port, RunStatus +from file_system.models import FileMetadata +from runner.models import RunStatus, Port, Run +import json + +WORKDIR = os.path.dirname(os.path.abspath(__file__)) +LOGGER = logging.getLogger(__name__) +ACCESS_CURATED_BAMS_FILE_GROUP_SLUG = "access_curated_normals" +ACCESS_DEFAULT_NORMAL_ID = "DONOR22-TP" +ACCESS_DEFAULT_NORMAL_FILENAME = "DONOR22-TP_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam" +NORMAL_SAMPLE_SEARCH = "-N0" +TUMOR_SAMPLE_SEARCH = "-L0" +DUPLEX_BAM_SEARCH = "__aln_srt_IR_FX-duplex.bam" +SIMPLEX_BAM_SEARCH = "__aln_srt_IR_FX-simplex.bam" +UNFILTERED_BAM_SEARCH = "__aln_srt_IR_FX.bam" +DMP_DUPLEX_REGEX = "-duplex.bam" +DMP_SIMPLEX_REGEX = "-simplex.bam" +DUPLEX_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam" +SIMPLEX_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam" +class NucleoVarOperator(Operator): + + def find_request_bams(run): + """ + Find simplex and duplex bams from a request's nucleo run + - run_ids: run_ids from a request's nucleo run + + :return: list of paired simplex and duplex bams and normal bam + """ + nucleo_output_port_names = [ + "uncollapsed_bam", + "fgbio_group_reads_by_umi_bam", + "fgbio_collapsed_bam", + "fgbio_filter_consensus_reads_duplex_bam", + "fgbio_postprocessing_simplex_bam", + ] + bams = {} + for o in nucleo_output_port_names: + # We are running a multi-sample workflow on just one sample, + # so we create single-element lists here + bam = parse_nucleo_output_ports(run, o) + bams[o] = bam + + return bams + + def parse_nucleo_output_ports(run, port_name): + bam_bai = Port.objects.get(name=port_name, run=run.pk) + if not len(bam_bai.files.all()) in [1, 2]: + raise Exception("Port {} for run {} should have just 1 bam or 1 (bam/bai) pair".format(port_name, run.id)) + bam = [b for b in bam_bai.files.all() if b.file_name.endswith(".bam")][0] + return bam + + def is_tumor_bam(file): + if not file.endswith(".bam"): + return False + t_n_timepoint = file.split("-")[2] + return not t_n_timepoint[0] == "N" + + def find_request_bams(run): + """ + Find simplex and duplex bams from a request's nucleo run + - run_ids: run_ids from a request's nucleo run + + :return: list of paired simplex and duplex bams and normal bam + """ + nucleo_output_port_names = [ + "uncollapsed_bam", + "fgbio_group_reads_by_umi_bam", + "fgbio_collapsed_bam", + "fgbio_filter_consensus_reads_duplex_bam", + "fgbio_postprocessing_simplex_bam", + ] + bams = {} + for o in nucleo_output_port_names: + # We are running a multi-sample workflow on just one sample, + # so we create single-element lists here + bam = self.parse_nucleo_output_ports(run, o) + bams[o] = bam + + return bams + + + def find_curated_normal_bams(): + """ + Find curated normal bams from access curated bam file group + + :return: list of curated normal bams + """ + def split_duplex_simplex(files): + ''' + given a list split file paths using -simplex and -duplex root + :param files: a list of simplex/duplex file path + :return: two lists of file paths: one for simplex, one for duplex + ''' + simplex = [] + duplex = [] + for f in files: + if f.file_name.endswith('-simplex.bam'): + simplex.append(f) + if f.file_name.endswith('-duplex.bam'): + duplex.append(f) + else: + ValueError('Expecting a list of duplex and simplex bams') + return duplex, simplex + + def make_pairs(d, s): + paired = [] + for di in d: + for si in s: + if di.file_name.rstrip("-duplex.bam") == si.file_name.rstrip("-simplex.bam"): + paired.append((di, si)) + return paired + # Cache a set of fillout bams from this request for genotyping (we only need to do this query once) + curated_normals_metadata = FileMetadata.objects.filter( + file__file_group__slug=ACCESS_CURATED_BAMS_FILE_GROUP_SLUG + ) + curated_normal_bams = [f.file for f in curated_normals_metadata] + d,s = split_duplex_simplex(curated_normal_bams) + curated_normal_bams = make_pairs(d, s) + return curated_normal_bams + + + def get_jobs(self): + """ + get_job information tor run NucleoVar Pipeline + - self: NucleoVarOperator(Operator) + + :return: return RunCreator Objects with NucleoVar information + """ + # Run Information + LOGGER.info("Operator JobGroupNotifer ID %s", self.job_group_notifier_id) + app = self.get_pipeline_id() + pipeline = Pipeline.objects.get(id=app) + output_directory = pipeline.output_directory + run_date = datetime.now().strftime("%Y%m%d_%H:%M:%f") + # If no request_id, start happens from runs chaining + # else manual start + if not self.request_id: + most_recent_runs_for_request = Run.objects.filter(pk__in=self.run_ids) + self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] + else: + runs = self.get_request_id_runs(self.request_id, ["access v2 nucleo"]) + # Get Bams generate by Access V2 Nucleo + bams = [] + for run in runs: + bams.append(self.find_request_bams(run)) + + # TUMOR + tumor_bams = [(b['fgbio_filter_consensus_reads_duplex_bam'], b['fgbio_postprocessing_simplex_bam']) for b in bams if self.is_tumor_bam(b['fgbio_postprocessing_duplex_bam'].file_name)] + + # FILLOUT NORMAL AND TUMOR + fillout_simplex_tumors = [b['fgbio_postprocessing_simplex_bam'] for b in bams if self.is_tumor_bam(b['fgbio_postprocessing_simplex_bam'].file_name)] + fillout_duplex_tumors = [b['fgbio_filter_consensus_reads_duplex_bam'] for b in bams if self.is_tumor_bam(b['fgbio_postprocessing_simplex_bam'].file_name)] + fillout_unfiltered_normals = [b['uncollapsed_bam'] for b in bams if not self.is_tumor_bam(b['uncollapsed_bam'].file_name)] + + # NORMAL BAM + normal_bam = File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME)[0] + + # CURATED NORMAL + curated_normal_bams = self.find_curated_normal_bams() + + # SAMPLE INFO + sample_infos = [] + tumor_bams = tumor_bams[0:1] + for d, s in tumor_bams: + sample_info = self.create_sample_info(d, s, bams) + sample_info["normal_bam"] = [normal_bam] + sample_info["tumor_bam"] = [(d, s)] + sample_infos.append(sample_info) + + return [RunCreator(**job) for job in jobs] + \ No newline at end of file From b8f46d97773ddf4bcaeb2ad8b4f3c9bb085bd220 Mon Sep 17 00:00:00 2001 From: buehlere Date: Wed, 11 Sep 2024 13:31:22 -0400 Subject: [PATCH 04/28] init testing version --- .../access/v2_1_0/nucleovar/__init__.py | 1 + .../v2_1_0/nucleovar/access_nucleovar.py | 596 ++++++++++++++++-- 2 files changed, 532 insertions(+), 65 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/__init__.py b/runner/operator/access/v2_1_0/nucleovar/__init__.py index e69de29bb..98e9f24dd 100644 --- a/runner/operator/access/v2_1_0/nucleovar/__init__.py +++ b/runner/operator/access/v2_1_0/nucleovar/__init__.py @@ -0,0 +1 @@ +from .access_nucleovar import NucleoVarOperator \ No newline at end of file diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index a50b35110..ad27b4ef7 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -32,45 +32,391 @@ UNFILTERED_BAM_SEARCH = "__aln_srt_IR_FX.bam" DMP_DUPLEX_REGEX = "-duplex.bam" DMP_SIMPLEX_REGEX = "-simplex.bam" +DMP_UNFILTERED_REGEX = "-unfilter.bam" +DMP_REGEX = "-standard.bam" +IGO_UNFILTERED_REGEX = "__aln_srt_IR_FX.bam" +ACCESS_ASSAY_HC = "HC_ACCESS" +DMP_IMPACT_ASSAYS = ["IMPACT341", "IMPACT410", "IMPACT468", "hemepact_v4"] +DMP_FILE_GROUP = "d4775633-f53f-412f-afa5-46e9a86b654b" DUPLEX_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam" -SIMPLEX_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam" -class NucleoVarOperator(Operator): +SIMPLEX_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-simplex.bam" +UNCOLLAPSED_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR" - def find_request_bams(run): - """ - Find simplex and duplex bams from a request's nucleo run - - run_ids: run_ids from a request's nucleo run - :return: list of paired simplex and duplex bams and normal bam - """ - nucleo_output_port_names = [ - "uncollapsed_bam", - "fgbio_group_reads_by_umi_bam", - "fgbio_collapsed_bam", - "fgbio_filter_consensus_reads_duplex_bam", - "fgbio_postprocessing_simplex_bam", - ] - bams = {} - for o in nucleo_output_port_names: - # We are running a multi-sample workflow on just one sample, - # so we create single-element lists here - bam = parse_nucleo_output_ports(run, o) - bams[o] = bam + +def _create_cwl_bam_object(bam): + """ + Util function to create a simple CWL File object from a bam with a path attribute + + :param bam: + :return: + """ + return {"class": "File", "location": "juno://" + bam} + + +def _remove_normal_dups( + geno_samples_normal_unfiltered, + geno_samples_normal_unfiltered_sample_ids, + capture_samples_duplex_sample_ids, +): + """ + Make sure none of the normals are already present in duplex genotyping samples (GBCMS can not have + duplicate sample IDs) + + :param geno_samples_normal_unfiltered: + :param geno_samples_normal_unfiltered_sample_ids: + :param capture_samples_duplex_sample_ids: + :return: + """ + deduped = [] + deduped_ids = [] + for i, s in enumerate(geno_samples_normal_unfiltered_sample_ids): + if not any([sid in s for sid in capture_samples_duplex_sample_ids]): + deduped_ids.append(s) + deduped.append(geno_samples_normal_unfiltered[i]) + return deduped, deduped_ids + + +def split_duplex_simplex(files): + ''' + given a list split file paths using -simplex and -duplex root + :param files: a list of simplex/duplex file path + :return: two lists of file paths: one for simplex, one for duplex + ''' + simplex = [] + duplex = [] + for f in files: + if f.file_name.endswith('-simplex.bam'): + simplex.append(f) + if f.file_name.endswith('-duplex.bam'): + duplex.append(f) + else: + ValueError('Expecting a list of duplex and simplex bams') + return duplex, simplex + +def make_pairs(d, s): + paired = [] + for di in d: + for si in s: + if di.file_name.rstrip("-duplex.bam") == si.file_name.rstrip("-simplex.bam"): + paired.append((di, si)) + return paired + + +def parse_nucleo_output_ports(run, port_name): + bam_bai = Port.objects.get(name=port_name, run=run.pk) + if not len(bam_bai.files.all()) in [1, 2]: + raise Exception("Port {} for run {} should have just 1 bam or 1 (bam/bai) pair".format(port_name, run.id)) + bam = [b for b in bam_bai.files.all() if b.file_name.endswith(".bam")][0] + return bam + +def is_tumor_bam(file): + if not file.endswith(".bam"): + return False + t_n_timepoint = file.split("-")[2] + return not t_n_timepoint[0] == "N" + + +def get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, request_id=None): + """ + Find a matched normal sample for the given patient ID with the following precedence: + 1. Latest Matched Normal from IGO ACCESS samples (same request) + 2. Latest Matched Normal from IGO ACCESS samples (study level 06302_*) + 3. Latest Matched Normal from IGO ACCESS samples (any request) + 4. Latest Matched Normal from DMP ACCESS samples + 5. Latest Matched Normal from DMP IMPACT samples + 6. Return (None, ''), which will be used as a placeholder for skipping genotyping in the SNV pipeline + + Todo: generalize to work for duplex / simplex / standard, and use in MSI operator + + :param: patient_id - str Patient ID in CMO format (C-ABC123) + :param: request_id - str IGO request ID (06302_AA) + :return: (file_system.models.File - bam, str - sample_id) + """ + patient_normals_search = patient_id + NORMAL_SAMPLE_SEARCH + unfiltered_matched_normal_bam = None + unfiltered_matched_normal_sample_id = "" + warnings = [] + + # Case 1 + if request_id: + # Todo: Joining to Port -> Run makes this query slow, make use of output_metadata for requestId instead + for bam in fillout_unfiltered_normals: + if bam.file_name.startswith(patient_normals_search): + unfiltered_matched_normal_bam = bam + if unfiltered_matched_normal_bam: + unfiltered_matched_normal_bam = unfiltered_matched_normal_bam + unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.replace(UNCOLLAPSED_BAM_STEM, "") + # Case 2 + if not request_id or not unfiltered_matched_normal_bam: + unfiltered_matched_normal_bam = ( + File.objects.filter(file_name__startswith=patient_normals_search, + file_name__endswith=IGO_UNFILTERED_REGEX, + port__run__tags__igoRequestId__startswith=request_id.split("_")[0] + ) + .order_by("-created_date") + .first() + ) - return bams + if unfiltered_matched_normal_bam: + unfiltered_matched_normal_bam = unfiltered_matched_normal_bam + unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.rstrip(".bam") + # Case 3 + if not request_id or not unfiltered_matched_normal_bam: + unfiltered_matched_normal_bam = ( + File.objects.filter(file_name__startswith=patient_normals_search, + file_name__endswith=IGO_UNFILTERED_REGEX + ) + .order_by("-created_date") + .first() + ) + + if unfiltered_matched_normal_bam: + unfiltered_matched_normal_bam = unfiltered_matched_normal_bam + unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.rstrip(".bam") + + # Case 4 + if not unfiltered_matched_normal_bam: + warnings.append( + "WARNING: Could not find matching IGO unfiltered normal bam file for patient {}. " + "Searching for DMP ACCESS sample." + ) + unfiltered_matched_normal_bam = ( + FileMetadata.objects.filter( + file__file_group=DMP_FILE_GROUP, + metadata__patient__cmo=patient_id.lstrip("C-"), + metadata__type="N", + metadata__assay='XS2', + file__path__endswith=DMP_REGEX, + ) + .order_by("-metadata__imported") + .first() + ) + if not unfiltered_matched_normal_bam: + unfiltered_matched_normal_bam = ( + FileMetadata.objects.filter( + file__file_group=DMP_FILE_GROUP, + metadata__patient__cmo=patient_id.lstrip("C-"), + metadata__type="N", + metadata__assay='XS1', + file__path__endswith=DMP_REGEX, + ) + .order_by("-metadata__imported") + .first() + ) + if unfiltered_matched_normal_bam: + unfiltered_matched_normal_bam = unfiltered_matched_normal_bam.file + unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.rstrip(".bam") - def parse_nucleo_output_ports(run, port_name): - bam_bai = Port.objects.get(name=port_name, run=run.pk) - if not len(bam_bai.files.all()) in [1, 2]: - raise Exception("Port {} for run {} should have just 1 bam or 1 (bam/bai) pair".format(port_name, run.id)) - bam = [b for b in bam_bai.files.all() if b.file_name.endswith(".bam")][0] - return bam + + # Case 5 + if not unfiltered_matched_normal_bam: + warnings.append( + "WARNING: Could not find matching DMP ACCESS unfiltered normal bam file for patient {}. " + "Searching for DMP IMPACT sample." + ) + + unfiltered_matched_normal_bam = ( + FileMetadata.objects.filter( + metadata__cmo_assay__in=DMP_IMPACT_ASSAYS, + metadata__patient__cmo=patient_id.lstrip("C-"), + metadata__type="N", + ) + .order_by("-metadata__imported") + .first() + ) + if unfiltered_matched_normal_bam: + unfiltered_matched_normal_bam = unfiltered_matched_normal_bam.file + unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.rstrip(".bam") + + # Case 6 + if not unfiltered_matched_normal_bam: + warnings.append( + "WARNING: Could not find DMP or IGO matching unfiltered normal bam file for patient {}. " + "We will skip matched normal genotyping for this sample." + ) + + unfiltered_matched_normal_bam = None + unfiltered_matched_normal_sample_id = "" - def is_tumor_bam(file): - if not file.endswith(".bam"): - return False - t_n_timepoint = file.split("-")[2] - return not t_n_timepoint[0] == "N" + for msg in warnings: + msg = msg.format(patient_id) + + return unfiltered_matched_normal_bam, unfiltered_matched_normal_sample_id + + +def get_normal_geno_samples(tumor_sample_id, matched_normal_unfiltered_id, fillout_unfiltered_normals): + """ + 20 first Normal fillout samples + + :return: + """ + geno_samples_normal_unfiltered = fillout_unfiltered_normals[:20] + patient_id = "-".join(tumor_sample_id.split("-")[0:2]) + if len(geno_samples_normal_unfiltered) < 20: + fillout_unfiltered_normals_ids = [s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered] + patient_normals_search = patient_id + NORMAL_SAMPLE_SEARCH + study_level_normals = list( + File.objects.filter(file_name__startswith=patient_normals_search, + file_name__endswith=IGO_UNFILTERED_REGEX, + port__run__tags__igoRequestId__startswith=request_id.split("_")[0] + ).exclude(file_name__in=fillout_unfiltered_normals_ids).all() + ) + geno_samples_normal_unfiltered = geno_samples_normal_unfiltered + study_level_normals + LOGGER.info( + "Adding {} fillout samples to Nucleovar run for sample {}:".format( + len(geno_samples_normal_unfiltered), tumor_sample_id + ) + ) + LOGGER.info([s.file_name for s in geno_samples_normal_unfiltered]) + + # Exclude matched normal bam + if matched_normal_unfiltered_id: + geno_samples_normal_unfiltered = [ + s for s in geno_samples_normal_unfiltered if not s.file_name.startswith(matched_normal_unfiltered_id) + ] + geno_samples_normal_unfiltered_sample_ids = [s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered] + return geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids + + +def get_geno_samples(tumor_sample_id, matched_normal_id, fillout_simplex_tumors, fillout_duplex_tumors): + """ + Use the initial fastq metadata to get the capture of the sample, + then, based on this capture ID, find tumor and matched normal simplex and duplex bams for genotyping + + Also include any tumor samples from the same patient + + Limits to 40 samples (or 80 bams, because each has Simplex and Duplex) + + Todo: put metadata on the Bams themselves + + :param tumor_sample_id: str + :return: + """ + + # Get capture ID + capture_id = None + sample_ids = [] + sample_fastq = FileRepository.filter( + file_type="fastq", metadata={settings.CMO_SAMPLE_NAME_METADATA_KEY: tumor_sample_id}, filter_redact=True + ) + if len(sample_fastq) >= 1: + capture_id = sample_fastq[0].metadata["captureName"] + + if capture_id: + # Get samples IDs from this capture from fastqs with this capture ID + sample_id_fastqs = FileRepository.filter( + file_type="fastq", metadata={"captureName": capture_id}, filter_redact=True + ) + sample_ids = list(set([f.metadata[settings.CMO_SAMPLE_NAME_METADATA_KEY] for f in sample_id_fastqs])) + # Don't double-genotype the main sample + sample_ids.remove(tumor_sample_id) + + if len(sample_ids) == 0 or not capture_id: + duplex_capture_samples = [] + simplex_capture_samples = [] + else: + capture_q = Q(*[("file_name__startswith", id) for id in sample_ids], _connector=Q.OR) + duplex_capture_q = Q(file_name__endswith=DUPLEX_BAM_SEARCH) & capture_q + simplex_capture_q = Q(file_name__endswith=SIMPLEX_BAM_SEARCH) & capture_q + + duplex_capture_samples = ( + File.objects.filter(duplex_capture_q) + .distinct("file_name") + .order_by("file_name", "-created_date") + .exclude(file_name__startswith=tumor_sample_id) + .exclude(file_name__startswith=matched_normal_id) + ) + simplex_capture_samples = ( + File.objects.filter(simplex_capture_q) + .distinct("file_name") + .order_by("file_name", "-created_date") + .exclude(file_name=tumor_sample_id) + .exclude(file_name__startswith=matched_normal_id) + ) + + duplex_capture_samples = list(duplex_capture_samples) + simplex_capture_samples = list(simplex_capture_samples) + # Add patient matched Tumors samples + patient_id = "-".join(tumor_sample_id.split("-")[0:2]) + matched_tumor_search = patient_id + TUMOR_SAMPLE_SEARCH + duplex_matched_q = Q(file_name__endswith=DUPLEX_BAM_SEARCH) & Q(file_name__startswith=matched_tumor_search) + simplex_matched_q = Q(file_name__endswith=SIMPLEX_BAM_SEARCH) & Q(file_name__startswith=matched_tumor_search) + + duplex_matched_samples = ( + File.objects.filter(duplex_matched_q) + .distinct("file_name") + .order_by("file_name", "-created_date") + .exclude(file_name__startswith=tumor_sample_id) + .exclude(file_name__startswith=matched_normal_id) + ) + simplex_matched_samples = ( + File.objects.filter(simplex_matched_q) + .distinct("file_name") + .order_by("file_name", "-created_date") + .exclude(file_name__startswith=tumor_sample_id) + .exclude(file_name__startswith=matched_normal_id) + ) + + duplex_geno_samples = list(duplex_matched_samples) + list(duplex_capture_samples) + simplex_geno_samples = list(simplex_matched_samples) + list(simplex_capture_samples) + if len(duplex_geno_samples) < 20: + num_geno_samples_to_add = 20 - len(duplex_geno_samples) + duplex_geno_samples_to_add = fillout_duplex_tumors[:num_geno_samples_to_add] + simplex_geno_samples_to_add = fillout_simplex_tumors[:num_geno_samples_to_add] + # Remove the main tumor sample + duplex_geno_samples_to_add = [s for s in duplex_geno_samples_to_add if s.file_name.replace(DUPLEX_BAM_STEM, "") != tumor_sample_id + ] + simplex_geno_samples_to_add = [s for s in simplex_geno_samples_to_add if s.file_name.replace(SIMPLEX_BAM_STEM, "") != tumor_sample_id] + duplex_geno_samples += duplex_geno_samples_to_add + simplex_geno_samples += simplex_geno_samples_to_add + + # Deduplicate based on PK + duplex_geno_samples = list(set(duplex_geno_samples)) + simplex_geno_samples = list(set(simplex_geno_samples)) + # Deduplicate based on file name + duplex_geno_samples, simplex_geno_samples = _remove_dups_by_file_name( + duplex_geno_samples, simplex_geno_samples + ) + + return duplex_geno_samples, simplex_geno_samples + +def get_dmp_matched_patient_geno_samples(patient_id): + """ + Find DMP ACCESS samples for genotyping + + :param patient_id: str - CMO sample ID (C-123ABC) + :return: (QuerySet - Duplex Bams, QuerySet - Simplex Bams, str[] duplex samples IDs, + str[] simplex sample IDs) + """ + matched_tumors_dmp = FileMetadata.objects.filter( + file__file_group=DMP_FILE_GROUP, + metadata__patient__cmo=patient_id.lstrip("C-"), + metadata__assay='XS2', + metadata__type="T", + file__path__endswith=DMP_REGEX, + ) + if not matched_tumors_dmp: + matched_tumors_dmp = FileMetadata.objects.filter( + file__file_group=DMP_FILE_GROUP, + metadata__patient__cmo=patient_id.lstrip("C-"), + metadata__assay='XS1', + metadata__type="T", + file__path__endswith=DMP_REGEX, + ) + matched_tumors_dmp_simplex = [b.file for b in matched_tumors_dmp] + matched_tumors_dmp_duplex = copy.deepcopy(matched_tumors_dmp_simplex) + + for d in matched_tumors_dmp_duplex: + d.file_name = d.file_name.replace('-standard', '-duplex') + for s in matched_tumors_dmp_simplex: + s.file_name = s.file_name.replace('-standard', '-simplex') + return ( + matched_tumors_dmp_duplex, + matched_tumors_dmp_simplex + ) +class NucleoVarOperator(Operator): + def find_request_bams(run): """ @@ -90,7 +436,7 @@ def find_request_bams(run): for o in nucleo_output_port_names: # We are running a multi-sample workflow on just one sample, # so we create single-element lists here - bam = self.parse_nucleo_output_ports(run, o) + bam = parse_nucleo_output_ports(run, o) bams[o] = bam return bams @@ -101,31 +447,7 @@ def find_curated_normal_bams(): Find curated normal bams from access curated bam file group :return: list of curated normal bams - """ - def split_duplex_simplex(files): - ''' - given a list split file paths using -simplex and -duplex root - :param files: a list of simplex/duplex file path - :return: two lists of file paths: one for simplex, one for duplex - ''' - simplex = [] - duplex = [] - for f in files: - if f.file_name.endswith('-simplex.bam'): - simplex.append(f) - if f.file_name.endswith('-duplex.bam'): - duplex.append(f) - else: - ValueError('Expecting a list of duplex and simplex bams') - return duplex, simplex - - def make_pairs(d, s): - paired = [] - for di in d: - for si in s: - if di.file_name.rstrip("-duplex.bam") == si.file_name.rstrip("-simplex.bam"): - paired.append((di, si)) - return paired + """ # Cache a set of fillout bams from this request for genotyping (we only need to do this query once) curated_normals_metadata = FileMetadata.objects.filter( file__file_group__slug=ACCESS_CURATED_BAMS_FILE_GROUP_SLUG @@ -135,7 +457,117 @@ def make_pairs(d, s): curated_normal_bams = make_pairs(d, s) return curated_normal_bams + def create_sample_info(tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors): + """ + Query DB for all relevant files / metadata necessary for SNV pipeline input: + + - Tumor Duplex Bam + - Tumor Simplex Bam + - Matched Normal Unfiltered bam (from IGO / DMP or None) (external code) + - Other Tumor Duplex bams from same patient or capture (for genotyping) + - Other Tumor Simplex bams from same patient or capture (for genotyping) + + :return: + """ + # Get the Matched, Unfiltered, Normal BAM + matched_normal_unfiltered_bam, matched_normal_unfiltered_id = get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, request_id) + + # Get genotyping bams for Unfiltered Normal samples from the same Study + geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids = get_normal_geno_samples( + tumor_sample_id, matched_normal_unfiltered_id, fillout_unfiltered_normals + ) + # Get genotyping bams for Simplex and Duplex Tumor samples from the same Patient or in the same Capture + geno_samples_duplex, geno_samples_simplex = get_geno_samples( + tumor_sample_id, matched_normal_unfiltered_id, fillout_simplex_tumors, fillout_duplex_tumors + ) + capture_samples_duplex_sample_ids = [s.file_name.replace(DUPLEX_BAM_STEM, "") for s in geno_samples_duplex] + + capture_samples_simplex_sample_ids = [s.file_name.replace(SIMPLEX_BAM_STEM, "") for s in geno_samples_simplex] + # Use sample IDs to remove duplicates from normal geno samples + geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids = _remove_normal_dups( + geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids, capture_samples_duplex_sample_ids + ) + # SNV pipeline requires that all samples have simplex and duplex bams + if set(capture_samples_duplex_sample_ids) != set(capture_samples_simplex_sample_ids): + msg = "ACCESS SNV Operator Error: Duplex sample IDs not matched to Simplex sample IDs" + raise Exception(msg) + # Add in any DMP ACCESS samples + ( + dmp_matched_tumors_duplex, + dmp_matched_tumors_simplex + ) = get_dmp_matched_patient_geno_samples(patient_id) + geno_samples_duplex = geno_samples_duplex + dmp_matched_tumors_duplex + geno_samples_simplex = geno_samples_simplex + dmp_matched_tumors_simplex + geno_samples = make_pairs(geno_samples_duplex, geno_samples_simplex) + sample_info = { + "matched_normal_unfiltered": [matched_normal_unfiltered_bam], + "geno_samples": geno_samples, + "geno_samples_normal_unfiltered": geno_samples_normal_unfiltered, + } + return sample_info + + def mapping_bams(sample_info): + # sample_id,normal_path,duplex_path,simplex_path,type + # patient_id,sample_id,type,maf,standard_bam,standard_bai,duplex_bam,duplex_bai,simplex_bam,simplex_bai + bams = [] + aux_bams = [] + for key, value in sample_info.items(): + for v in value: + map = {} + if key == 'tumor_bam': + map['patient_id'] = '' + map['sample_id'] = v[0].file_name.replace(DUPLEX_BAM_STEM, '') + map['maf'] = '' + map['standard_bam'] = '' + map['standard_bai'] = '' + map['duplex_bam'] = _create_cwl_bam_object(v[0].path) + map['duplex_bai'] = _create_cwl_bam_object(v[0].path.replace('bam','bai')) + map['simplex_bam'] = _create_cwl_bam_object(v[1].path) + map['simplex_bai'] = _create_cwl_bam_object(v[1].path.replace('bam','bai')) + map['type'] = 'CASE' + bams.append(map) + if key == 'normal_bam': + map['patient_id'] = '' + map['sample_id'] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") + map['maf'] = '' + map['standard_bam'] = _create_cwl_bam_object(v.path) + map['standard_bai'] = _create_cwl_bam_object(v.path.replace('bam','bai')) + map['duplex_bam'] = '' + map['duplex_bai'] = '' + map['simplex_bam'] = '' + map['simplex_bai'] = '' + map['type'] = 'CONTROL' + bams.append(map) + if key == 'geno_samples': + map['sample_id'] = v[0].file_name.replace(DUPLEX_BAM_STEM, "") + map['normal_path'] = '' + map['duplex_path'] = _create_cwl_bam_object(v[0].path) + map['simplex_path'] = _create_cwl_bam_object(v[1].path) + map['type'] = 'PLASMA' + aux_bams.append(map) + if key == 'geno_samples_normal_unfiltered': + map['sample_id'] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") + map['normal_path'] = _create_cwl_bam_object(v.path) + map['duplex_path'] = '' + map['simplex_path'] = '' + map['type'] = 'UNMATCHED_NORMAL' + aux_bams.append(map) + if key == 'matched_normal_unfiltered': + map['sample_id'] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") + map['normal_path'] = _create_cwl_bam_object(v.path) + map['duplex_path'] = '' + map['simplex_path'] = '' + map['type'] = 'MATCHED_NORMAL' + aux_bams.append(map) + bams = { + "input": bams, + } + aux_bams = { + "aux_bams": aux_bams, + } + return(bams, aux_bams) + def get_jobs(self): """ get_job information tor run NucleoVar Pipeline @@ -149,14 +581,15 @@ def get_jobs(self): pipeline = Pipeline.objects.get(id=app) output_directory = pipeline.output_directory run_date = datetime.now().strftime("%Y%m%d_%H:%M:%f") - # If no request_id, start happens from runs chaining - # else manual start + # If no request_id, get request id from run information + # else request_id given directly if not self.request_id: most_recent_runs_for_request = Run.objects.filter(pk__in=self.run_ids) self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] else: runs = self.get_request_id_runs(self.request_id, ["access v2 nucleo"]) - # Get Bams generate by Access V2 Nucleo + + # TUMOR AND NORMAL BAMS from the request access v2 nucleo run bams = [] for run in runs: bams.append(self.find_request_bams(run)) @@ -177,12 +610,45 @@ def get_jobs(self): # SAMPLE INFO sample_infos = [] - tumor_bams = tumor_bams[0:1] for d, s in tumor_bams: - sample_info = self.create_sample_info(d, s, bams) + tumor_sample_id = d.file_name.replace(DUPLEX_BAM_STEM, "") + patient_id = "-".join(tumor_sample_id.split("-")[0:2]) + sample_info = self.create_sample_info(tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors) sample_info["normal_bam"] = [normal_bam] sample_info["tumor_bam"] = [(d, s)] sample_infos.append(sample_info) - + + # Build Ridgeback Jobs from Sample Info + # One job per Sample + jobs = [] + for s in sample_infos: + sample = d.file_name.replace(DUPLEX_BAM_STEM, "") + patient_id = "-".join(sample.split("-")[0:2]) + bams, aux_bams = self.mapping_bams(s) + input_json = { + "input": bams, + "aux_bams": aux_bams, + } + sample_metadata = { + settings.PATIENT_ID_METADATA_KEY: patient_id, + settings.REQUEST_ID_METADATA_KEY: self.request_id, + settings.CMO_SAMPLE_NAME_METADATA_KEY: sample, + } + job_tags = { + "pipeline": pipeline.name, + "pipeline_version": pipeline.version, + settings.PATIENT_ID_METADATA_KEY: patient_id, + settings.REQUEST_ID_METADATA_KEY: self.request_id, + settings.CMO_SAMPLE_NAME_METADATA_KEY: sample, + } + job_json = { + "name": "Nucleovar {sample}: {run_date}".format(sample=sample, run_date=run_date), + "app": app, + "inputs": input_json, + "tags": job_tags, + "output_directory": output_directory, + "output_metadata": sample_metadata + } + jobs.append(job_json) return [RunCreator(**job) for job in jobs] \ No newline at end of file From d909332cdaacd2a14e2dd6b5985fb0ad93e77677 Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 09:35:55 -0400 Subject: [PATCH 05/28] resource file update --- .../access/v2_1_0/nucleovar/access_nucleovar.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index ad27b4ef7..e22e097df 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -628,6 +628,17 @@ def get_jobs(self): input_json = { "input": bams, "aux_bams": aux_bams, + "fasta": "/juno/work/access/production/resources/reference/current/Homo_sapiens_assembly19.fasta", + "fai": "/juno/work/access/production/resources/reference/current/Homo_sapiens_assembly19.fasta.fai", + "dict": "/juno/work/access/production/resources/reference/current/Homo_sapiens_assembly19.dict", + "canonical_bed": "/juno/work/access/production/resources/msk-access/v1.0/regions_of_interest/versions/v1.0/MSK-ACCESS-v1_0panelA_canonicaltargets_500buffer.bed", + "target_bed": '/juno/work/access/production/resources/msk-access/v1.0/regions_of_interest/versions/v1.0/MSK-ACCESS-v1_0panelA_canonicaltargets_500buffer.bed', + "rules_json": "/juno/work/access/production/resources/nucleovar/rules.json", + "header_file": '/juno/work/access/production/resources/nucleovar/mutect_annotate_concat_header.txt', + "sample_order_file": '', + "blocklist": "/juno/work/access/production/resources/nucleovar/access_blocklist.txt", + "canonical_tx_ref": "/juno/work/access/production/resources/nucleovar/canonical_target_tx_ref.tsv" + } sample_metadata = { settings.PATIENT_ID_METADATA_KEY: patient_id, From e3ba888e12220e108a57430c8e3f55b35e558d58 Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 10:22:37 -0400 Subject: [PATCH 06/28] Update access_nucleovar.py --- .../v2_1_0/nucleovar/access_nucleovar.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index e22e097df..7c55c5786 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -568,6 +568,35 @@ def mapping_bams(sample_info): } return(bams, aux_bams) + + def get_request_id_runs(request_id, app): + """ + Get the latest completed bam-generation runs for the given request ID + + :param request_id: str - IGO request ID + :return: List[str] - List of most recent runs from given request ID + """ + # if not request_id: + # request_id_runs = Run.objects.filter(pk__in=self.run_ids) + # self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] + # else: + operator_run_id = ( + Run.objects.filter( + tags__igoRequestId=request_id, + app__name__in=app, + operator_run__status=RunStatus.COMPLETED, + ) + .exclude(finished_date__isnull=True) + .order_by("-finished_date") + .first() + .operator_run_id + ) + + request_id_runs = Run.objects.filter( + operator_run_id=operator_run_id, app__name__in=app, status=RunStatus.COMPLETED + ) + return request_id_runs + def get_jobs(self): """ get_job information tor run NucleoVar Pipeline From 4dc030e40c71919a6e21c0b787ec1255374c8475 Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 10:30:06 -0400 Subject: [PATCH 07/28] Update access_nucleovar.py --- runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index 7c55c5786..f0df796de 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -569,7 +569,7 @@ def mapping_bams(sample_info): return(bams, aux_bams) - def get_request_id_runs(request_id, app): + def get_request_id_runs(self, app): """ Get the latest completed bam-generation runs for the given request ID @@ -582,7 +582,7 @@ def get_request_id_runs(request_id, app): # else: operator_run_id = ( Run.objects.filter( - tags__igoRequestId=request_id, + tags__igoRequestId=self.request_id, app__name__in=app, operator_run__status=RunStatus.COMPLETED, ) @@ -616,7 +616,7 @@ def get_jobs(self): most_recent_runs_for_request = Run.objects.filter(pk__in=self.run_ids) self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] else: - runs = self.get_request_id_runs(self.request_id, ["access v2 nucleo"]) + runs = self.get_request_id_runs( ["access v2 nucleo"]) # TUMOR AND NORMAL BAMS from the request access v2 nucleo run bams = [] From fa1a9b2a227eb5ec6e446eee78c2f234b1ff452f Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 10:35:02 -0400 Subject: [PATCH 08/28] Update access_nucleovar.py --- runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index f0df796de..2360c6d21 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -616,7 +616,7 @@ def get_jobs(self): most_recent_runs_for_request = Run.objects.filter(pk__in=self.run_ids) self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] else: - runs = self.get_request_id_runs( ["access v2 nucleo"]) + runs = self.get_request_id_runs( ["access v2 nucleo", "access legacy"]) # TUMOR AND NORMAL BAMS from the request access v2 nucleo run bams = [] From 4831a9adf104156ee04dc7cdcfa3a69e08a21f7b Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 10:42:36 -0400 Subject: [PATCH 09/28] Update access_nucleovar.py --- .../v2_1_0/nucleovar/access_nucleovar.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index 2360c6d21..202ba475e 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -580,22 +580,21 @@ def get_request_id_runs(self, app): # request_id_runs = Run.objects.filter(pk__in=self.run_ids) # self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] # else: - operator_run_id = ( + most_recent_runs_for_request = ( Run.objects.filter( tags__igoRequestId=self.request_id, app__name__in=app, - operator_run__status=RunStatus.COMPLETED, + status=RunStatus.COMPLETED, + operator_run__status=RunStatus.COMPLETED ) - .exclude(finished_date__isnull=True) - .order_by("-finished_date") + .order_by("-created_date") .first() - .operator_run_id + .operator_run.runs.all() ) + if not len(most_recent_runs_for_request): + raise Exception("No matching Nucleo runs found for request {}".format(self.request_id)) - request_id_runs = Run.objects.filter( - operator_run_id=operator_run_id, app__name__in=app, status=RunStatus.COMPLETED - ) - return request_id_runs + return most_recent_runs_for_request def get_jobs(self): """ @@ -616,7 +615,7 @@ def get_jobs(self): most_recent_runs_for_request = Run.objects.filter(pk__in=self.run_ids) self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] else: - runs = self.get_request_id_runs( ["access v2 nucleo", "access legacy"]) + runs = self.get_request_id_runs(["access v2 nucleo", "access legacy"]) # TUMOR AND NORMAL BAMS from the request access v2 nucleo run bams = [] From 0c456510de26ef9a9d7d2c17389128675f8bad62 Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 10:52:32 -0400 Subject: [PATCH 10/28] Update access_nucleovar.py --- .../operator/access/v2_1_0/nucleovar/access_nucleovar.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index 202ba475e..f11bebd24 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -418,7 +418,7 @@ def get_dmp_matched_patient_geno_samples(patient_id): class NucleoVarOperator(Operator): - def find_request_bams(run): + def find_request_bams(self, run): """ Find simplex and duplex bams from a request's nucleo run - run_ids: run_ids from a request's nucleo run @@ -442,7 +442,7 @@ def find_request_bams(run): return bams - def find_curated_normal_bams(): + def find_curated_normal_bams(self): """ Find curated normal bams from access curated bam file group @@ -457,7 +457,7 @@ def find_curated_normal_bams(): curated_normal_bams = make_pairs(d, s) return curated_normal_bams - def create_sample_info(tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors): + def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors): """ Query DB for all relevant files / metadata necessary for SNV pipeline input: @@ -507,7 +507,7 @@ def create_sample_info(tumor_sample_id, patient_id, fillout_unfiltered_normals, return sample_info - def mapping_bams(sample_info): + def mapping_bams(self, sample_info): # sample_id,normal_path,duplex_path,simplex_path,type # patient_id,sample_id,type,maf,standard_bam,standard_bai,duplex_bam,duplex_bai,simplex_bam,simplex_bai bams = [] From 8cd9fdb5de10e54c5a16dbfed213a6bf9ad64944 Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 13:56:50 -0400 Subject: [PATCH 11/28] Update access_nucleovar.py --- .../operator/access/v2_1_0/nucleovar/access_nucleovar.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index f11bebd24..c8ffd6f68 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -457,7 +457,7 @@ def find_curated_normal_bams(self): curated_normal_bams = make_pairs(d, s) return curated_normal_bams - def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors): + def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors, curated_normal_bams): """ Query DB for all relevant files / metadata necessary for SNV pipeline input: @@ -502,7 +502,7 @@ def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_nor sample_info = { "matched_normal_unfiltered": [matched_normal_unfiltered_bam], "geno_samples": geno_samples, - "geno_samples_normal_unfiltered": geno_samples_normal_unfiltered, + "geno_samples_normal_unfiltered": geno_samples_normal_unfiltered + curated_normal_bams, } return sample_info @@ -615,7 +615,7 @@ def get_jobs(self): most_recent_runs_for_request = Run.objects.filter(pk__in=self.run_ids) self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] else: - runs = self.get_request_id_runs(["access v2 nucleo", "access legacy"]) + runs = self.get_request_id_runs(["access v2 nucleo", "access nucleo"]) # TUMOR AND NORMAL BAMS from the request access v2 nucleo run bams = [] @@ -641,7 +641,7 @@ def get_jobs(self): for d, s in tumor_bams: tumor_sample_id = d.file_name.replace(DUPLEX_BAM_STEM, "") patient_id = "-".join(tumor_sample_id.split("-")[0:2]) - sample_info = self.create_sample_info(tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors) + sample_info = self.create_sample_info(tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors, curated_normal_bams) sample_info["normal_bam"] = [normal_bam] sample_info["tumor_bam"] = [(d, s)] sample_infos.append(sample_info) From e469ef60c14160335060c58956f531e9caec5943 Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 14:11:30 -0400 Subject: [PATCH 12/28] Update access_nucleovar.py --- runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py | 1 - 1 file changed, 1 deletion(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index c8ffd6f68..ef0517ef5 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -663,7 +663,6 @@ def get_jobs(self): "target_bed": '/juno/work/access/production/resources/msk-access/v1.0/regions_of_interest/versions/v1.0/MSK-ACCESS-v1_0panelA_canonicaltargets_500buffer.bed', "rules_json": "/juno/work/access/production/resources/nucleovar/rules.json", "header_file": '/juno/work/access/production/resources/nucleovar/mutect_annotate_concat_header.txt', - "sample_order_file": '', "blocklist": "/juno/work/access/production/resources/nucleovar/access_blocklist.txt", "canonical_tx_ref": "/juno/work/access/production/resources/nucleovar/canonical_target_tx_ref.tsv" From 3e2fc219a35cc339eead4882229c040ca7f03b75 Mon Sep 17 00:00:00 2001 From: buehlere Date: Thu, 12 Sep 2024 15:12:46 -0400 Subject: [PATCH 13/28] Update access_nucleovar.py --- runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index ef0517ef5..630220139 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -245,7 +245,7 @@ def get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, reques return unfiltered_matched_normal_bam, unfiltered_matched_normal_sample_id -def get_normal_geno_samples(tumor_sample_id, matched_normal_unfiltered_id, fillout_unfiltered_normals): +def get_normal_geno_samples(request_id, tumor_sample_id, matched_normal_unfiltered_id, fillout_unfiltered_normals): """ 20 first Normal fillout samples @@ -469,12 +469,13 @@ def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_nor :return: """ + request_id = self.request_id # Get the Matched, Unfiltered, Normal BAM matched_normal_unfiltered_bam, matched_normal_unfiltered_id = get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, request_id) # Get genotyping bams for Unfiltered Normal samples from the same Study geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids = get_normal_geno_samples( - tumor_sample_id, matched_normal_unfiltered_id, fillout_unfiltered_normals + request_id, tumor_sample_id, matched_normal_unfiltered_id, fillout_unfiltered_normals ) # Get genotyping bams for Simplex and Duplex Tumor samples from the same Patient or in the same Capture geno_samples_duplex, geno_samples_simplex = get_geno_samples( From 354d3678c8c1c2641237c9b051aad7c7f59996d8 Mon Sep 17 00:00:00 2001 From: buehlere Date: Fri, 13 Sep 2024 10:45:52 -0400 Subject: [PATCH 14/28] Update access_nucleovar.py --- .../v2_1_0/nucleovar/access_nucleovar.py | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index 630220139..9158be92a 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -54,6 +54,30 @@ def _create_cwl_bam_object(bam): return {"class": "File", "location": "juno://" + bam} + +def _remove_dups_by_file_name(duplex_geno_samples, simplex_geno_samples): + """ + Simple util to avoid Genotyping same sample twice + (when bams come from different runs and can't be + de-duplicated based on PK) + + :return: + """ + duplex_geno_samples_dedup_ids = set() + duplex_geno_samples_dedup = [] + for s in duplex_geno_samples: + if not s.file_name in duplex_geno_samples_dedup_ids: + duplex_geno_samples_dedup_ids.add(s.file_name) + duplex_geno_samples_dedup.append(s) + simplex_geno_samples_dedup_ids = set() + simplex_geno_samples_dedup = [] + for s in simplex_geno_samples: + if not s.file_name in simplex_geno_samples_dedup_ids: + simplex_geno_samples_dedup_ids.add(s.file_name) + simplex_geno_samples_dedup.append(s) + return duplex_geno_samples_dedup, simplex_geno_samples_dedup + + def _remove_normal_dups( geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids, @@ -624,12 +648,12 @@ def get_jobs(self): bams.append(self.find_request_bams(run)) # TUMOR - tumor_bams = [(b['fgbio_filter_consensus_reads_duplex_bam'], b['fgbio_postprocessing_simplex_bam']) for b in bams if self.is_tumor_bam(b['fgbio_postprocessing_duplex_bam'].file_name)] + tumor_bams = [(b['fgbio_filter_consensus_reads_duplex_bam'], b['fgbio_postprocessing_simplex_bam']) for b in bams if is_tumor_bam(b['fgbio_postprocessing_duplex_bam'].file_name)] # FILLOUT NORMAL AND TUMOR - fillout_simplex_tumors = [b['fgbio_postprocessing_simplex_bam'] for b in bams if self.is_tumor_bam(b['fgbio_postprocessing_simplex_bam'].file_name)] - fillout_duplex_tumors = [b['fgbio_filter_consensus_reads_duplex_bam'] for b in bams if self.is_tumor_bam(b['fgbio_postprocessing_simplex_bam'].file_name)] - fillout_unfiltered_normals = [b['uncollapsed_bam'] for b in bams if not self.is_tumor_bam(b['uncollapsed_bam'].file_name)] + fillout_simplex_tumors = [b['fgbio_postprocessing_simplex_bam'] for b in bams if is_tumor_bam(b['fgbio_postprocessing_simplex_bam'].file_name)] + fillout_duplex_tumors = [b['fgbio_filter_consensus_reads_duplex_bam'] for b in bams if is_tumor_bam(b['fgbio_postprocessing_simplex_bam'].file_name)] + fillout_unfiltered_normals = [b['uncollapsed_bam'] for b in bams if not is_tumor_bam(b['uncollapsed_bam'].file_name)] # NORMAL BAM normal_bam = File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME)[0] From c82d0c407c16b20f631dbae3bcfbc4e258ccdb25 Mon Sep 17 00:00:00 2001 From: buehlere Date: Fri, 13 Sep 2024 11:00:52 -0400 Subject: [PATCH 15/28] Update access_nucleovar.py --- runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index 9158be92a..a44a93de8 100644 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -648,11 +648,11 @@ def get_jobs(self): bams.append(self.find_request_bams(run)) # TUMOR - tumor_bams = [(b['fgbio_filter_consensus_reads_duplex_bam'], b['fgbio_postprocessing_simplex_bam']) for b in bams if is_tumor_bam(b['fgbio_postprocessing_duplex_bam'].file_name)] + tumor_bams = [(b['fgbio_filter_consensus_reads_duplex_bam'], b['fgbio_postprocessing_simplex_bam']) for b in bams if is_tumor_bam(b['fgbio_filter_consensus_reads_duplex_bam'].file_name)] # FILLOUT NORMAL AND TUMOR - fillout_simplex_tumors = [b['fgbio_postprocessing_simplex_bam'] for b in bams if is_tumor_bam(b['fgbio_postprocessing_simplex_bam'].file_name)] - fillout_duplex_tumors = [b['fgbio_filter_consensus_reads_duplex_bam'] for b in bams if is_tumor_bam(b['fgbio_postprocessing_simplex_bam'].file_name)] + fillout_simplex_tumors = [b['fgbio_postprocessing_simplex_bam'] for b in bams if is_tumor_bam(b['fgbio_filter_consensus_reads_duplex_bam'].file_name)] + fillout_duplex_tumors = [b['fgbio_filter_consensus_reads_duplex_bam'] for b in bams if is_tumor_bam(b['fgbio_filter_consensus_reads_duplex_bam'].file_name)] fillout_unfiltered_normals = [b['uncollapsed_bam'] for b in bams if not is_tumor_bam(b['uncollapsed_bam'].file_name)] # NORMAL BAM From 77a1a56c77ef1b88be593510e0f6497c58d4227a Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Fri, 4 Oct 2024 11:30:43 -0400 Subject: [PATCH 16/28] working on dev --- runner/models.py | 1 + .../v2_1_0/nucleovar/access_nucleovar.py | 132 +++++++++++------- runner/pipeline/nextflow/nextflow_resolver.py | 37 ++++- runner/pipeline/pipeline_cache.py | 2 +- .../objects/nextflow/nextflow_port_object.py | 1 + .../objects/nextflow/nextflow_run_object.py | 6 +- 6 files changed, 122 insertions(+), 57 deletions(-) mode change 100644 => 100755 runner/models.py mode change 100644 => 100755 runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py mode change 100644 => 100755 runner/pipeline/nextflow/nextflow_resolver.py mode change 100644 => 100755 runner/pipeline/pipeline_cache.py mode change 100644 => 100755 runner/run/objects/nextflow/nextflow_port_object.py mode change 100644 => 100755 runner/run/objects/nextflow/nextflow_run_object.py diff --git a/runner/models.py b/runner/models.py old mode 100644 new mode 100755 index 9feaa9f5a..51085ffdd --- a/runner/models.py +++ b/runner/models.py @@ -75,6 +75,7 @@ class Pipeline(BaseModel): tool_walltime = models.IntegerField(blank=True, null=True) memlimit = models.CharField(blank=True, null=True, max_length=20) config = models.CharField(blank=True, null=True, max_length=3000, default=None) + nfcore_template = models.BooleanField(default=False) @property def pipeline_link(self): diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py old mode 100644 new mode 100755 index a44a93de8..07d53477a --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -19,12 +19,14 @@ from file_system.models import FileMetadata from runner.models import RunStatus, Port, Run import json +from file_system.models import File, FileGroup, FileType WORKDIR = os.path.dirname(os.path.abspath(__file__)) LOGGER = logging.getLogger(__name__) ACCESS_CURATED_BAMS_FILE_GROUP_SLUG = "access_curated_normals" ACCESS_DEFAULT_NORMAL_ID = "DONOR22-TP" -ACCESS_DEFAULT_NORMAL_FILENAME = "DONOR22-TP_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam" +ACCESS_DEFAULT_NORMAL_FILENAME_DUPLEX = "DONOR22-TP_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam" +ACCESS_DEFAULT_NORMAL_FILENAME_SIMPLEX = "DONOR22-TP_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-simplex.bam" NORMAL_SAMPLE_SEARCH = "-N0" TUMOR_SAMPLE_SEARCH = "-L0" DUPLEX_BAM_SEARCH = "__aln_srt_IR_FX-duplex.bam" @@ -40,9 +42,19 @@ DMP_FILE_GROUP = "d4775633-f53f-412f-afa5-46e9a86b654b" DUPLEX_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam" SIMPLEX_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-simplex.bam" -UNCOLLAPSED_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR" - - +UNCOLLAPSED_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR.bam" + +def register_file(file): + fname = os.path.basename(file) + file_group = FileGroup.objects.get(id=DMP_FILE_GROUP) + file_type = FileType.objects.get(name="bam") + try: + os.chmod(file, 0o777) + f = File(file_name=fname, path=file, file_type=file_type, file_group=file_group) + f.save() + return() + except Exception as e: + return() def _create_cwl_bam_object(bam): """ @@ -273,34 +285,42 @@ def get_normal_geno_samples(request_id, tumor_sample_id, matched_normal_unfilter """ 20 first Normal fillout samples + If less than 20 normal samples in the request, look for normals in the same study. + :return: """ geno_samples_normal_unfiltered = fillout_unfiltered_normals[:20] patient_id = "-".join(tumor_sample_id.split("-")[0:2]) + # If less than 20 normal samples in the request, look for normals in the same study if len(geno_samples_normal_unfiltered) < 20: fillout_unfiltered_normals_ids = [s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered] patient_normals_search = patient_id + NORMAL_SAMPLE_SEARCH + collected_normals = fillout_unfiltered_normals_ids + [matched_normal_unfiltered_id] + exclude_request_normals = Q() + for normal in collected_normals: + exclude_request_normals |= Q(file_name__startswith=normal) + study_search = Q(file_name__startswith=patient_normals_search, file_name__endswith=IGO_UNFILTERED_REGEX, port__run__tags__igoRequestId__startswith=request_id.split("_")[0]) study_level_normals = list( - File.objects.filter(file_name__startswith=patient_normals_search, - file_name__endswith=IGO_UNFILTERED_REGEX, - port__run__tags__igoRequestId__startswith=request_id.split("_")[0] - ).exclude(file_name__in=fillout_unfiltered_normals_ids).all() - ) + File.objects.filter(study_search) + .distinct("file_name") + .order_by("file_name", "-created_date") + .exclude(file_name__startswith=tumor_sample_id) + .exclude(exclude_request_normals)) geno_samples_normal_unfiltered = geno_samples_normal_unfiltered + study_level_normals + LOGGER.info( "Adding {} fillout samples to Nucleovar run for sample {}:".format( len(geno_samples_normal_unfiltered), tumor_sample_id ) ) - LOGGER.info([s.file_name for s in geno_samples_normal_unfiltered]) # Exclude matched normal bam if matched_normal_unfiltered_id: geno_samples_normal_unfiltered = [ s for s in geno_samples_normal_unfiltered if not s.file_name.startswith(matched_normal_unfiltered_id) ] - geno_samples_normal_unfiltered_sample_ids = [s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered] - return geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids + geno_samples_normal_unfiltered_sample_id = [s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered] + return geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_id def get_geno_samples(tumor_sample_id, matched_normal_id, fillout_simplex_tumors, fillout_duplex_tumors): @@ -433,8 +453,12 @@ def get_dmp_matched_patient_geno_samples(patient_id): for d in matched_tumors_dmp_duplex: d.file_name = d.file_name.replace('-standard', '-duplex') + d.path = d.path.replace('-standard', '-duplex') + register_file(d.path) for s in matched_tumors_dmp_simplex: s.file_name = s.file_name.replace('-standard', '-simplex') + s.path = s.path.replace('-standard', '-simplex') + register_file(s.path) return ( matched_tumors_dmp_duplex, matched_tumors_dmp_simplex @@ -521,13 +545,16 @@ def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_nor dmp_matched_tumors_duplex, dmp_matched_tumors_simplex ) = get_dmp_matched_patient_geno_samples(patient_id) - geno_samples_duplex = geno_samples_duplex + dmp_matched_tumors_duplex - geno_samples_simplex = geno_samples_simplex + dmp_matched_tumors_simplex + #TODO not flipping file name + + geno_samples_duplex = geno_samples_duplex # + dmp_matched_tumors_duplex + geno_samples_simplex = geno_samples_simplex # + dmp_matched_tumors_simplex geno_samples = make_pairs(geno_samples_duplex, geno_samples_simplex) sample_info = { "matched_normal_unfiltered": [matched_normal_unfiltered_bam], "geno_samples": geno_samples, - "geno_samples_normal_unfiltered": geno_samples_normal_unfiltered + curated_normal_bams, + "geno_samples_normal_unfiltered": geno_samples_normal_unfiltered, + "curated_normal_bams": curated_normal_bams } return sample_info @@ -541,32 +568,35 @@ def mapping_bams(self, sample_info): for v in value: map = {} if key == 'tumor_bam': - map['patient_id'] = '' + map['patient_id'] = 'null' map['sample_id'] = v[0].file_name.replace(DUPLEX_BAM_STEM, '') - map['maf'] = '' - map['standard_bam'] = '' - map['standard_bai'] = '' + map['maf'] = 'null' + map['standard_bam'] = 'null' + map['standard_bai'] = 'null' map['duplex_bam'] = _create_cwl_bam_object(v[0].path) - map['duplex_bai'] = _create_cwl_bam_object(v[0].path.replace('bam','bai')) + map['duplex_bai'] = _create_cwl_bam_object(v[0].path.replace('.bam','.bai')) map['simplex_bam'] = _create_cwl_bam_object(v[1].path) - map['simplex_bai'] = _create_cwl_bam_object(v[1].path.replace('bam','bai')) + map['simplex_bai'] = _create_cwl_bam_object(v[1].path.replace('.bam','.bai')) map['type'] = 'CASE' bams.append(map) if key == 'normal_bam': - map['patient_id'] = '' - map['sample_id'] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") - map['maf'] = '' - map['standard_bam'] = _create_cwl_bam_object(v.path) - map['standard_bai'] = _create_cwl_bam_object(v.path.replace('bam','bai')) - map['duplex_bam'] = '' - map['duplex_bai'] = '' - map['simplex_bam'] = '' - map['simplex_bai'] = '' + map['patient_id'] = 'null' + map['sample_id'] = v[0].file_name.replace("-TP_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam", '') + map['maf'] = 'null' + map['standard_bam'] = 'null' + map['standard_bai'] = 'null' + map['duplex_bam'] = _create_cwl_bam_object(v[0].path) + map['duplex_bai'] = _create_cwl_bam_object(v[0].path.replace('.bam','.bai')) + map['simplex_bam'] = _create_cwl_bam_object(v[1].path) + map['simplex_bai'] = _create_cwl_bam_object(v[1].path.replace('.bam','.bai')) map['type'] = 'CONTROL' bams.append(map) if key == 'geno_samples': - map['sample_id'] = v[0].file_name.replace(DUPLEX_BAM_STEM, "") - map['normal_path'] = '' + #TODO jsut do the replace here + sample_id = v[0].file_name.replace(DUPLEX_BAM_STEM, "") + sample_id = sample_id.replace(DMP_DUPLEX_REGEX, "") + map['sample_id'] = sample_id + map['normal_path'] = 'null' map['duplex_path'] = _create_cwl_bam_object(v[0].path) map['simplex_path'] = _create_cwl_bam_object(v[1].path) map['type'] = 'PLASMA' @@ -574,23 +604,24 @@ def mapping_bams(self, sample_info): if key == 'geno_samples_normal_unfiltered': map['sample_id'] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") map['normal_path'] = _create_cwl_bam_object(v.path) - map['duplex_path'] = '' - map['simplex_path'] = '' + map['duplex_path'] = 'null' + map['simplex_path'] = 'null' map['type'] = 'UNMATCHED_NORMAL' aux_bams.append(map) + if key == 'curated_normal_bams': + map['sample_id'] = v[0].file_name.replace(DUPLEX_BAM_STEM, "") + map['normal_path'] = 'null' + map['duplex_path'] = _create_cwl_bam_object(v[0].path) + map['simplex_path'] = _create_cwl_bam_object(v[1].path) + map['type'] = 'CURATED' + aux_bams.append(map) if key == 'matched_normal_unfiltered': map['sample_id'] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") map['normal_path'] = _create_cwl_bam_object(v.path) - map['duplex_path'] = '' - map['simplex_path'] = '' + map['duplex_path'] = 'null' + map['simplex_path'] = 'null' map['type'] = 'MATCHED_NORMAL' aux_bams.append(map) - bams = { - "input": bams, - } - aux_bams = { - "aux_bams": aux_bams, - } return(bams, aux_bams) @@ -637,8 +668,8 @@ def get_jobs(self): # If no request_id, get request id from run information # else request_id given directly if not self.request_id: - most_recent_runs_for_request = Run.objects.filter(pk__in=self.run_ids) - self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] + runs = Run.objects.filter(pk__in=self.run_ids) + self.request_id = RunStatus[0].tags["igoRequestId"] else: runs = self.get_request_id_runs(["access v2 nucleo", "access nucleo"]) @@ -656,7 +687,7 @@ def get_jobs(self): fillout_unfiltered_normals = [b['uncollapsed_bam'] for b in bams if not is_tumor_bam(b['uncollapsed_bam'].file_name)] # NORMAL BAM - normal_bam = File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME)[0] + normal_bam = (File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME_DUPLEX)[0], File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME_SIMPLEX)[0]) # CURATED NORMAL curated_normal_bams = self.find_curated_normal_bams() @@ -675,7 +706,7 @@ def get_jobs(self): # One job per Sample jobs = [] for s in sample_infos: - sample = d.file_name.replace(DUPLEX_BAM_STEM, "") + sample = s["tumor_bam"][0][0].file_name.replace(DUPLEX_BAM_STEM, "") patient_id = "-".join(sample.split("-")[0:2]) bams, aux_bams = self.mapping_bams(s) input_json = { @@ -689,8 +720,10 @@ def get_jobs(self): "rules_json": "/juno/work/access/production/resources/nucleovar/rules.json", "header_file": '/juno/work/access/production/resources/nucleovar/mutect_annotate_concat_header.txt', "blocklist": "/juno/work/access/production/resources/nucleovar/access_blocklist.txt", - "canonical_tx_ref": "/juno/work/access/production/resources/nucleovar/canonical_target_tx_ref.tsv" - + "canonical_tx_ref": "/juno/work/access/production/resources/nucleovar/canonical_target_tx_ref.tsv", + "outdir": '/juno//work/access/production/runs/voyager/nucleo_qc', + "hotspots": '/juno/work/access/production/resources/nucleovar/hotspots.maf', + "annotator": "genomenexus" } sample_metadata = { settings.PATIENT_ID_METADATA_KEY: patient_id, @@ -713,5 +746,4 @@ def get_jobs(self): "output_metadata": sample_metadata } jobs.append(job_json) - return [RunCreator(**job) for job in jobs] - \ No newline at end of file + return [RunCreator(**job) for job in jobs] \ No newline at end of file diff --git a/runner/pipeline/nextflow/nextflow_resolver.py b/runner/pipeline/nextflow/nextflow_resolver.py old mode 100644 new mode 100755 index e7c42463e..7013424d1 --- a/runner/pipeline/nextflow/nextflow_resolver.py +++ b/runner/pipeline/nextflow/nextflow_resolver.py @@ -4,13 +4,44 @@ class NextflowResolver(PipelineResolver): - def __init__(self, github, entrypoint, version=None): + def __init__(self, github, entrypoint, nfcore_template, version=None): super().__init__(github, entrypoint, version) + self.nfcore_template = nfcore_template def resolve(self): dir = self._dir_name() location = self._git_clone(dir) - with open(os.path.join(location, "inputs.template.json"), "r") as f: - pipeline = json.load(f) + if self.nfcore_template: + # Check main schema for CLI inputs + with open(os.path.join(location, "nextflow_schema.json"), "r") as f: + nextflow_schema = json.load(f) + inputs = self.schemas2template(nextflow_schema, location) + pipeline = {"inputs": inputs } + else: + with open(os.path.join(location, "inputs.template.json"), "r") as f: + pipeline = json.load(f) self._cleanup(location) return pipeline + + def schemas2template(self, nextflow_schema, location): + # TODO Not sure if this is an exhaustive search of possible definitions + reference = nextflow_schema["definitions"]["reference_genome_options"]["properties"] + input = nextflow_schema["definitions"]["input_output_options"]["properties"] + properties = {**reference, **input} + # Check for sample-sheet CLI inputs + samplesheets = [key for key, val in properties.items() if val.get("format") == "sample-sheet"] + inputs = [{'id': key, 'schema': {'type': val.get("format")}} for key, val in properties.items() if val.get("format") != "sample-sheet"] + # Check Assets for sample sheet schemas + for schema in samplesheets: + with open(os.path.join(location, f'assets/schema_{schema}.json'), "r") as f: + nextflow_schema = json.load(f) + samplesheet_props = nextflow_schema['items']["properties"] + fields = [{'id': key, 'type': val.get("format")} for key, val in samplesheet_props.items()] + header = '\t'.join([f['id'] for f in fields]) + '\n' + body_start = f'{{{{#{schema}}}}}\n' + body_end = f'\n{{{{/{schema}}}}}' + body = '\t'.join([f'{{{{{f["id"]}}}}}' for f in fields]) + template = header + body_start + body + body_end + samplesheet_input = {'id': schema, 'schema': {'items': {"fields": fields}}, 'template': template} + inputs.append(samplesheet_input) + return inputs diff --git a/runner/pipeline/pipeline_cache.py b/runner/pipeline/pipeline_cache.py old mode 100644 new mode 100755 index cde2ba079..5299d94dd --- a/runner/pipeline/pipeline_cache.py +++ b/runner/pipeline/pipeline_cache.py @@ -16,7 +16,7 @@ def get_pipeline(pipeline): resolved_dict = _pipeline.get("app") else: resolver_class = PipelineCache._get_pipeline_resolver(pipeline.pipeline_type) - resolver = resolver_class(pipeline.github, pipeline.entrypoint, pipeline.version) + resolver = resolver_class(pipeline.github, pipeline.entrypoint, pipeline.nfcore_template, pipeline.version) resolved_dict = resolver.resolve() cache.set( pipeline.id, diff --git a/runner/run/objects/nextflow/nextflow_port_object.py b/runner/run/objects/nextflow/nextflow_port_object.py old mode 100644 new mode 100755 index 644e8f356..30afa978f --- a/runner/run/objects/nextflow/nextflow_port_object.py +++ b/runner/run/objects/nextflow/nextflow_port_object.py @@ -34,6 +34,7 @@ def from_definition(cls, run_id, value, port_type, port_values, notify=False): name = value.get("id") schema = value.get("schema") template = value.get("template") + logger = logging.getLogger(__name__) cls.logger.debug(template) port_type = port_type value = copy.deepcopy(port_values.get(name)) diff --git a/runner/run/objects/nextflow/nextflow_run_object.py b/runner/run/objects/nextflow/nextflow_run_object.py old mode 100644 new mode 100755 index 4ec6e3448..67eeb7b31 --- a/runner/run/objects/nextflow/nextflow_run_object.py +++ b/runner/run/objects/nextflow/nextflow_run_object.py @@ -7,10 +7,10 @@ from runner.models import Run, RunStatus, Port, PortType, ProtocolType from runner.run.objects.nextflow.nextflow_port_object import NextflowPortObject from runner.exceptions import PortProcessorException, RunCreateException, RunObjectConstructException - +logger = logging.getLogger(__name__) class NextflowRunObject(RunObject): - logger = logging.getLogger(__name__) + # logger = logging.getLogger(__name__) def __init__( self, @@ -69,7 +69,7 @@ def from_definition(cls, run_id, inputs): try: app = PipelineCache.get_pipeline(run.app) except Exception as e: - raise RunCreateException("Failed to create run. Failed to resolve CWL %s" % str(e)) + raise RunCreateException("Failed to create run. Failed to resolve Nextflow %s" % str(e)) try: input_ports = [ NextflowPortObject.from_definition(run_id, inp, PortType.INPUT, inputs) for inp in app.get("inputs", []) From 593baaed6ee0cea29efbe1eb187e88702d2b9fbe Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Fri, 4 Oct 2024 11:41:29 -0400 Subject: [PATCH 17/28] revert v1 changes --- runner/operator/access/v1_0_0/cnv/__init__.py | 2 +- runner/operator/access/v1_0_0/legacy/__init__.py | 2 +- runner/operator/access/v1_0_0/msi/__init__.py | 2 +- runner/operator/access/v1_0_0/snps_and_indels/__init__.py | 2 +- runner/operator/access/v1_0_0/structural_variants/__init__.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/runner/operator/access/v1_0_0/cnv/__init__.py b/runner/operator/access/v1_0_0/cnv/__init__.py index 5327fcd88..46d930b89 100644 --- a/runner/operator/access/v1_0_0/cnv/__init__.py +++ b/runner/operator/access/v1_0_0/cnv/__init__.py @@ -43,7 +43,7 @@ def get_sample_inputs(self): :return: list of json_objects """ - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] # Get all unfiltered bam ports for these runs unfiltered_bam_ports = Port.objects.filter( diff --git a/runner/operator/access/v1_0_0/legacy/__init__.py b/runner/operator/access/v1_0_0/legacy/__init__.py index 23a7cc51d..bf7ae3b28 100644 --- a/runner/operator/access/v1_0_0/legacy/__init__.py +++ b/runner/operator/access/v1_0_0/legacy/__init__.py @@ -235,7 +235,7 @@ def construct_sample_inputs(samples, request_id, group_id): class AccessLegacyOperator(Operator): def get_jobs(self): - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] ports = Port.objects.filter(run_id__in=run_ids, port_type=PortType.OUTPUT) data = [ diff --git a/runner/operator/access/v1_0_0/msi/__init__.py b/runner/operator/access/v1_0_0/msi/__init__.py index 796a346a1..c9e5cb71d 100644 --- a/runner/operator/access/v1_0_0/msi/__init__.py +++ b/runner/operator/access/v1_0_0/msi/__init__.py @@ -51,7 +51,7 @@ def get_sample_inputs(self): :return: list of json_objects """ - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] # Get all standard bam ports for these runs standard_bam_ports = Port.objects.filter( diff --git a/runner/operator/access/v1_0_0/snps_and_indels/__init__.py b/runner/operator/access/v1_0_0/snps_and_indels/__init__.py index e24275695..274835e96 100644 --- a/runner/operator/access/v1_0_0/snps_and_indels/__init__.py +++ b/runner/operator/access/v1_0_0/snps_and_indels/__init__.py @@ -57,7 +57,7 @@ def get_sample_inputs(self): :return: list of json_objects """ - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] # Get all duplex / simplex bam ports for these runs duplex_output_ports = Port.objects.filter( diff --git a/runner/operator/access/v1_0_0/structural_variants/__init__.py b/runner/operator/access/v1_0_0/structural_variants/__init__.py index 7ae407b50..77e04b05f 100644 --- a/runner/operator/access/v1_0_0/structural_variants/__init__.py +++ b/runner/operator/access/v1_0_0/structural_variants/__init__.py @@ -41,7 +41,7 @@ def get_sample_inputs(self): :return: list of json_objects """ - run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id, ["access legacy", "access nucleo"])] + run_ids = self.run_ids if self.run_ids else [r.id for r in get_request_id_runs(self.request_id)] # Get all standard bam ports for these runs standard_bam_ports = Port.objects.filter( From 88ca87f088077ac658cf443aafdf85bf89e74911 Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Fri, 4 Oct 2024 11:42:18 -0400 Subject: [PATCH 18/28] revert access init --- .../0062_pipeline_nfcore_template.py | 18 ++++++++++++++++++ runner/operator/access/__init__.py | 6 +++--- 2 files changed, 21 insertions(+), 3 deletions(-) create mode 100644 runner/migrations/0062_pipeline_nfcore_template.py diff --git a/runner/migrations/0062_pipeline_nfcore_template.py b/runner/migrations/0062_pipeline_nfcore_template.py new file mode 100644 index 000000000..5736a57f2 --- /dev/null +++ b/runner/migrations/0062_pipeline_nfcore_template.py @@ -0,0 +1,18 @@ +# Generated by Django 2.2.28 on 2024-09-26 19:28 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('runner', '0061_auto_20240820_1837'), + ] + + operations = [ + migrations.AddField( + model_name='pipeline', + name='nfcore_template', + field=models.BooleanField(default=False), + ), + ] diff --git a/runner/operator/access/__init__.py b/runner/operator/access/__init__.py index f71fa6e7f..2ffa3f994 100644 --- a/runner/operator/access/__init__.py +++ b/runner/operator/access/__init__.py @@ -29,7 +29,7 @@ def get_request_id(run_ids, request_id=None): raise Exception("Could not get find request id") -def get_request_id_runs(request_id, app): +def get_request_id_runs(request_id): """ Get the latest completed bam-generation runs for the given request ID @@ -39,7 +39,7 @@ def get_request_id_runs(request_id, app): operator_run_id = ( Run.objects.filter( tags__igoRequestId=request_id, - app__name__in=app, + app__name__in=["access legacy", "access nucleo"], operator_run__status=RunStatus.COMPLETED, ) .exclude(finished_date__isnull=True) @@ -49,7 +49,7 @@ def get_request_id_runs(request_id, app): ) request_id_runs = Run.objects.filter( - operator_run_id=operator_run_id, app__name__in=app, status=RunStatus.COMPLETED + operator_run_id=operator_run_id, app__name__in=["access legacy", "access nucleo"], status=RunStatus.COMPLETED ) return request_id_runs From 8533d56470738dee11f7554efc9a3924508eaacd Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Mon, 21 Oct 2024 13:06:11 -0400 Subject: [PATCH 19/28] working submissions --- runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py | 6 ++---- runner/pipeline/nextflow/nextflow_resolver.py | 1 + runner/pipeline/pipeline_cache.py | 1 + runner/run/objects/nextflow/nextflow_run_object.py | 1 + 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index 07d53477a..67337acca 100755 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -663,7 +663,7 @@ def get_jobs(self): LOGGER.info("Operator JobGroupNotifer ID %s", self.job_group_notifier_id) app = self.get_pipeline_id() pipeline = Pipeline.objects.get(id=app) - output_directory = pipeline.output_directory + # output_directory = pipeline.output_directory run_date = datetime.now().strftime("%Y%m%d_%H:%M:%f") # If no request_id, get request id from run information # else request_id given directly @@ -718,10 +718,9 @@ def get_jobs(self): "canonical_bed": "/juno/work/access/production/resources/msk-access/v1.0/regions_of_interest/versions/v1.0/MSK-ACCESS-v1_0panelA_canonicaltargets_500buffer.bed", "target_bed": '/juno/work/access/production/resources/msk-access/v1.0/regions_of_interest/versions/v1.0/MSK-ACCESS-v1_0panelA_canonicaltargets_500buffer.bed', "rules_json": "/juno/work/access/production/resources/nucleovar/rules.json", - "header_file": '/juno/work/access/production/resources/nucleovar/mutect_annotate_concat_header.txt', + "header_file": '/juno/work/access/production/resources/nucleovar/mutect1_annotate_concat_header.txt', "blocklist": "/juno/work/access/production/resources/nucleovar/access_blocklist.txt", "canonical_tx_ref": "/juno/work/access/production/resources/nucleovar/canonical_target_tx_ref.tsv", - "outdir": '/juno//work/access/production/runs/voyager/nucleo_qc', "hotspots": '/juno/work/access/production/resources/nucleovar/hotspots.maf', "annotator": "genomenexus" } @@ -742,7 +741,6 @@ def get_jobs(self): "app": app, "inputs": input_json, "tags": job_tags, - "output_directory": output_directory, "output_metadata": sample_metadata } jobs.append(job_json) diff --git a/runner/pipeline/nextflow/nextflow_resolver.py b/runner/pipeline/nextflow/nextflow_resolver.py index 7013424d1..8a51606f9 100755 --- a/runner/pipeline/nextflow/nextflow_resolver.py +++ b/runner/pipeline/nextflow/nextflow_resolver.py @@ -25,6 +25,7 @@ def resolve(self): def schemas2template(self, nextflow_schema, location): # TODO Not sure if this is an exhaustive search of possible definitions + # TODO parse everything in definitions reference = nextflow_schema["definitions"]["reference_genome_options"]["properties"] input = nextflow_schema["definitions"]["input_output_options"]["properties"] properties = {**reference, **input} diff --git a/runner/pipeline/pipeline_cache.py b/runner/pipeline/pipeline_cache.py index 5299d94dd..d4245f0bb 100755 --- a/runner/pipeline/pipeline_cache.py +++ b/runner/pipeline/pipeline_cache.py @@ -25,6 +25,7 @@ def get_pipeline(pipeline): "github": pipeline.github, "entrypoint": pipeline.entrypoint, "version": pipeline.version, + "nfcore_template": pipeline.nfcore_template }, ) return resolved_dict diff --git a/runner/run/objects/nextflow/nextflow_run_object.py b/runner/run/objects/nextflow/nextflow_run_object.py index 67eeb7b31..2a2a55fab 100755 --- a/runner/run/objects/nextflow/nextflow_run_object.py +++ b/runner/run/objects/nextflow/nextflow_run_object.py @@ -176,6 +176,7 @@ def dump_job(self, output_directory=None, log_directory=None): "repository": self.run_obj.app.github, "entrypoint": self.run_obj.app.entrypoint, "version": self.run_obj.app.version, + "nfcore_template": self.run_obj.app.nfcore_template } } inputs = dict() From 3329a454ddd957ea39112b7616144b26c1412b3f Mon Sep 17 00:00:00 2001 From: buehlere Date: Mon, 21 Oct 2024 13:28:17 -0400 Subject: [PATCH 20/28] linting --- .../v2_1_0/nucleovar/access_nucleovar.py | 369 ++++++++++-------- runner/pipeline/nextflow/nextflow_resolver.py | 32 +- runner/pipeline/pipeline_cache.py | 2 +- .../objects/nextflow/nextflow_run_object.py | 4 +- 4 files changed, 228 insertions(+), 179 deletions(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index 67337acca..fe2a212a3 100755 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -18,7 +18,7 @@ from runner.models import Port, RunStatus from file_system.models import FileMetadata from runner.models import RunStatus, Port, Run -import json +import json from file_system.models import File, FileGroup, FileType WORKDIR = os.path.dirname(os.path.abspath(__file__)) @@ -36,7 +36,7 @@ DMP_SIMPLEX_REGEX = "-simplex.bam" DMP_UNFILTERED_REGEX = "-unfilter.bam" DMP_REGEX = "-standard.bam" -IGO_UNFILTERED_REGEX = "__aln_srt_IR_FX.bam" +IGO_UNFILTERED_REGEX = "__aln_srt_IR_FX.bam" ACCESS_ASSAY_HC = "HC_ACCESS" DMP_IMPACT_ASSAYS = ["IMPACT341", "IMPACT410", "IMPACT468", "hemepact_v4"] DMP_FILE_GROUP = "d4775633-f53f-412f-afa5-46e9a86b654b" @@ -44,6 +44,7 @@ SIMPLEX_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-simplex.bam" UNCOLLAPSED_BAM_STEM = "_cl_aln_srt_MD_IR_FX_BR.bam" + def register_file(file): fname = os.path.basename(file) file_group = FileGroup.objects.get(id=DMP_FILE_GROUP) @@ -52,9 +53,10 @@ def register_file(file): os.chmod(file, 0o777) f = File(file_name=fname, path=file, file_type=file_type, file_group=file_group) f.save() - return() + return () except Exception as e: - return() + return () + def _create_cwl_bam_object(bam): """ @@ -66,7 +68,6 @@ def _create_cwl_bam_object(bam): return {"class": "File", "location": "juno://" + bam} - def _remove_dups_by_file_name(duplex_geno_samples, simplex_geno_samples): """ Simple util to avoid Genotyping same sample twice @@ -114,22 +115,23 @@ def _remove_normal_dups( def split_duplex_simplex(files): - ''' + """ given a list split file paths using -simplex and -duplex root - :param files: a list of simplex/duplex file path + :param files: a list of simplex/duplex file path :return: two lists of file paths: one for simplex, one for duplex - ''' - simplex = [] - duplex = [] + """ + simplex = [] + duplex = [] for f in files: - if f.file_name.endswith('-simplex.bam'): + if f.file_name.endswith("-simplex.bam"): simplex.append(f) - if f.file_name.endswith('-duplex.bam'): + if f.file_name.endswith("-duplex.bam"): duplex.append(f) else: - ValueError('Expecting a list of duplex and simplex bams') + ValueError("Expecting a list of duplex and simplex bams") return duplex, simplex + def make_pairs(d, s): paired = [] for di in d: @@ -146,6 +148,7 @@ def parse_nucleo_output_ports(run, port_name): bam = [b for b in bam_bai.files.all() if b.file_name.endswith(".bam")][0] return bam + def is_tumor_bam(file): if not file.endswith(".bam"): return False @@ -159,7 +162,7 @@ def get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, reques 1. Latest Matched Normal from IGO ACCESS samples (same request) 2. Latest Matched Normal from IGO ACCESS samples (study level 06302_*) 3. Latest Matched Normal from IGO ACCESS samples (any request) - 4. Latest Matched Normal from DMP ACCESS samples + 4. Latest Matched Normal from DMP ACCESS samples 5. Latest Matched Normal from DMP IMPACT samples 6. Return (None, ''), which will be used as a placeholder for skipping genotyping in the SNV pipeline @@ -182,31 +185,32 @@ def get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, reques unfiltered_matched_normal_bam = bam if unfiltered_matched_normal_bam: unfiltered_matched_normal_bam = unfiltered_matched_normal_bam - unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.replace(UNCOLLAPSED_BAM_STEM, "") + unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.replace( + UNCOLLAPSED_BAM_STEM, "" + ) # Case 2 if not request_id or not unfiltered_matched_normal_bam: unfiltered_matched_normal_bam = ( - File.objects.filter(file_name__startswith=patient_normals_search, - file_name__endswith=IGO_UNFILTERED_REGEX, - port__run__tags__igoRequestId__startswith=request_id.split("_")[0] - ) + File.objects.filter( + file_name__startswith=patient_normals_search, + file_name__endswith=IGO_UNFILTERED_REGEX, + port__run__tags__igoRequestId__startswith=request_id.split("_")[0], + ) .order_by("-created_date") .first() ) - + if unfiltered_matched_normal_bam: unfiltered_matched_normal_bam = unfiltered_matched_normal_bam unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.rstrip(".bam") # Case 3 if not request_id or not unfiltered_matched_normal_bam: unfiltered_matched_normal_bam = ( - File.objects.filter(file_name__startswith=patient_normals_search, - file_name__endswith=IGO_UNFILTERED_REGEX - ) + File.objects.filter(file_name__startswith=patient_normals_search, file_name__endswith=IGO_UNFILTERED_REGEX) .order_by("-created_date") .first() ) - + if unfiltered_matched_normal_bam: unfiltered_matched_normal_bam = unfiltered_matched_normal_bam unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.rstrip(".bam") @@ -222,7 +226,7 @@ def get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, reques file__file_group=DMP_FILE_GROUP, metadata__patient__cmo=patient_id.lstrip("C-"), metadata__type="N", - metadata__assay='XS2', + metadata__assay="XS2", file__path__endswith=DMP_REGEX, ) .order_by("-metadata__imported") @@ -234,7 +238,7 @@ def get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, reques file__file_group=DMP_FILE_GROUP, metadata__patient__cmo=patient_id.lstrip("C-"), metadata__type="N", - metadata__assay='XS1', + metadata__assay="XS1", file__path__endswith=DMP_REGEX, ) .order_by("-metadata__imported") @@ -244,7 +248,6 @@ def get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, reques unfiltered_matched_normal_bam = unfiltered_matched_normal_bam.file unfiltered_matched_normal_sample_id = unfiltered_matched_normal_bam.file_name.rstrip(".bam") - # Case 5 if not unfiltered_matched_normal_bam: warnings.append( @@ -274,7 +277,7 @@ def get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, reques unfiltered_matched_normal_bam = None unfiltered_matched_normal_sample_id = "" - + for msg in warnings: msg = msg.format(patient_id) @@ -293,21 +296,28 @@ def get_normal_geno_samples(request_id, tumor_sample_id, matched_normal_unfilter patient_id = "-".join(tumor_sample_id.split("-")[0:2]) # If less than 20 normal samples in the request, look for normals in the same study if len(geno_samples_normal_unfiltered) < 20: - fillout_unfiltered_normals_ids = [s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered] + fillout_unfiltered_normals_ids = [ + s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered + ] patient_normals_search = patient_id + NORMAL_SAMPLE_SEARCH collected_normals = fillout_unfiltered_normals_ids + [matched_normal_unfiltered_id] exclude_request_normals = Q() for normal in collected_normals: exclude_request_normals |= Q(file_name__startswith=normal) - study_search = Q(file_name__startswith=patient_normals_search, file_name__endswith=IGO_UNFILTERED_REGEX, port__run__tags__igoRequestId__startswith=request_id.split("_")[0]) + study_search = Q( + file_name__startswith=patient_normals_search, + file_name__endswith=IGO_UNFILTERED_REGEX, + port__run__tags__igoRequestId__startswith=request_id.split("_")[0], + ) study_level_normals = list( File.objects.filter(study_search) - .distinct("file_name") - .order_by("file_name", "-created_date") - .exclude(file_name__startswith=tumor_sample_id) - .exclude(exclude_request_normals)) + .distinct("file_name") + .order_by("file_name", "-created_date") + .exclude(file_name__startswith=tumor_sample_id) + .exclude(exclude_request_normals) + ) geno_samples_normal_unfiltered = geno_samples_normal_unfiltered + study_level_normals - + LOGGER.info( "Adding {} fillout samples to Nucleovar run for sample {}:".format( len(geno_samples_normal_unfiltered), tumor_sample_id @@ -319,7 +329,9 @@ def get_normal_geno_samples(request_id, tumor_sample_id, matched_normal_unfilter geno_samples_normal_unfiltered = [ s for s in geno_samples_normal_unfiltered if not s.file_name.startswith(matched_normal_unfiltered_id) ] - geno_samples_normal_unfiltered_sample_id = [s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered] + geno_samples_normal_unfiltered_sample_id = [ + s.file_name.replace(UNCOLLAPSED_BAM_STEM, "") for s in geno_samples_normal_unfiltered + ] return geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_id @@ -355,7 +367,7 @@ def get_geno_samples(tumor_sample_id, matched_normal_id, fillout_simplex_tumors, sample_ids = list(set([f.metadata[settings.CMO_SAMPLE_NAME_METADATA_KEY] for f in sample_id_fastqs])) # Don't double-genotype the main sample sample_ids.remove(tumor_sample_id) - + if len(sample_ids) == 0 or not capture_id: duplex_capture_samples = [] simplex_capture_samples = [] @@ -409,9 +421,12 @@ def get_geno_samples(tumor_sample_id, matched_normal_id, fillout_simplex_tumors, duplex_geno_samples_to_add = fillout_duplex_tumors[:num_geno_samples_to_add] simplex_geno_samples_to_add = fillout_simplex_tumors[:num_geno_samples_to_add] # Remove the main tumor sample - duplex_geno_samples_to_add = [s for s in duplex_geno_samples_to_add if s.file_name.replace(DUPLEX_BAM_STEM, "") != tumor_sample_id + duplex_geno_samples_to_add = [ + s for s in duplex_geno_samples_to_add if s.file_name.replace(DUPLEX_BAM_STEM, "") != tumor_sample_id + ] + simplex_geno_samples_to_add = [ + s for s in simplex_geno_samples_to_add if s.file_name.replace(SIMPLEX_BAM_STEM, "") != tumor_sample_id ] - simplex_geno_samples_to_add = [s for s in simplex_geno_samples_to_add if s.file_name.replace(SIMPLEX_BAM_STEM, "") != tumor_sample_id] duplex_geno_samples += duplex_geno_samples_to_add simplex_geno_samples += simplex_geno_samples_to_add @@ -419,12 +434,11 @@ def get_geno_samples(tumor_sample_id, matched_normal_id, fillout_simplex_tumors, duplex_geno_samples = list(set(duplex_geno_samples)) simplex_geno_samples = list(set(simplex_geno_samples)) # Deduplicate based on file name - duplex_geno_samples, simplex_geno_samples = _remove_dups_by_file_name( - duplex_geno_samples, simplex_geno_samples - ) + duplex_geno_samples, simplex_geno_samples = _remove_dups_by_file_name(duplex_geno_samples, simplex_geno_samples) return duplex_geno_samples, simplex_geno_samples + def get_dmp_matched_patient_geno_samples(patient_id): """ Find DMP ACCESS samples for genotyping @@ -434,37 +448,35 @@ def get_dmp_matched_patient_geno_samples(patient_id): str[] simplex sample IDs) """ matched_tumors_dmp = FileMetadata.objects.filter( - file__file_group=DMP_FILE_GROUP, - metadata__patient__cmo=patient_id.lstrip("C-"), - metadata__assay='XS2', - metadata__type="T", - file__path__endswith=DMP_REGEX, - ) + file__file_group=DMP_FILE_GROUP, + metadata__patient__cmo=patient_id.lstrip("C-"), + metadata__assay="XS2", + metadata__type="T", + file__path__endswith=DMP_REGEX, + ) if not matched_tumors_dmp: - matched_tumors_dmp = FileMetadata.objects.filter( - file__file_group=DMP_FILE_GROUP, - metadata__patient__cmo=patient_id.lstrip("C-"), - metadata__assay='XS1', - metadata__type="T", - file__path__endswith=DMP_REGEX, - ) + matched_tumors_dmp = FileMetadata.objects.filter( + file__file_group=DMP_FILE_GROUP, + metadata__patient__cmo=patient_id.lstrip("C-"), + metadata__assay="XS1", + metadata__type="T", + file__path__endswith=DMP_REGEX, + ) matched_tumors_dmp_simplex = [b.file for b in matched_tumors_dmp] matched_tumors_dmp_duplex = copy.deepcopy(matched_tumors_dmp_simplex) - + for d in matched_tumors_dmp_duplex: - d.file_name = d.file_name.replace('-standard', '-duplex') - d.path = d.path.replace('-standard', '-duplex') + d.file_name = d.file_name.replace("-standard", "-duplex") + d.path = d.path.replace("-standard", "-duplex") register_file(d.path) for s in matched_tumors_dmp_simplex: - s.file_name = s.file_name.replace('-standard', '-simplex') - s.path = s.path.replace('-standard', '-simplex') + s.file_name = s.file_name.replace("-standard", "-simplex") + s.path = s.path.replace("-standard", "-simplex") register_file(s.path) - return ( - matched_tumors_dmp_duplex, - matched_tumors_dmp_simplex - ) -class NucleoVarOperator(Operator): + return (matched_tumors_dmp_duplex, matched_tumors_dmp_simplex) + +class NucleoVarOperator(Operator): def find_request_bams(self, run): """ @@ -486,26 +498,33 @@ def find_request_bams(self, run): # so we create single-element lists here bam = parse_nucleo_output_ports(run, o) bams[o] = bam - + return bams - def find_curated_normal_bams(self): """ Find curated normal bams from access curated bam file group - :return: list of curated normal bams - """ + :return: list of curated normal bams + """ # Cache a set of fillout bams from this request for genotyping (we only need to do this query once) curated_normals_metadata = FileMetadata.objects.filter( file__file_group__slug=ACCESS_CURATED_BAMS_FILE_GROUP_SLUG ) curated_normal_bams = [f.file for f in curated_normals_metadata] - d,s = split_duplex_simplex(curated_normal_bams) + d, s = split_duplex_simplex(curated_normal_bams) curated_normal_bams = make_pairs(d, s) return curated_normal_bams - - def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors, curated_normal_bams): + + def create_sample_info( + self, + tumor_sample_id, + patient_id, + fillout_unfiltered_normals, + fillout_simplex_tumors, + fillout_duplex_tumors, + curated_normal_bams, + ): """ Query DB for all relevant files / metadata necessary for SNV pipeline input: @@ -519,7 +538,9 @@ def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_nor """ request_id = self.request_id # Get the Matched, Unfiltered, Normal BAM - matched_normal_unfiltered_bam, matched_normal_unfiltered_id = get_unfiltered_matched_normal(patient_id, fillout_unfiltered_normals, request_id) + matched_normal_unfiltered_bam, matched_normal_unfiltered_id = get_unfiltered_matched_normal( + patient_id, fillout_unfiltered_normals, request_id + ) # Get genotyping bams for Unfiltered Normal samples from the same Study geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids = get_normal_geno_samples( @@ -530,7 +551,7 @@ def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_nor tumor_sample_id, matched_normal_unfiltered_id, fillout_simplex_tumors, fillout_duplex_tumors ) capture_samples_duplex_sample_ids = [s.file_name.replace(DUPLEX_BAM_STEM, "") for s in geno_samples_duplex] - + capture_samples_simplex_sample_ids = [s.file_name.replace(SIMPLEX_BAM_STEM, "") for s in geno_samples_simplex] # Use sample IDs to remove duplicates from normal geno samples geno_samples_normal_unfiltered, geno_samples_normal_unfiltered_sample_ids = _remove_normal_dups( @@ -541,89 +562,87 @@ def create_sample_info(self, tumor_sample_id, patient_id, fillout_unfiltered_nor msg = "ACCESS SNV Operator Error: Duplex sample IDs not matched to Simplex sample IDs" raise Exception(msg) # Add in any DMP ACCESS samples - ( - dmp_matched_tumors_duplex, - dmp_matched_tumors_simplex - ) = get_dmp_matched_patient_geno_samples(patient_id) - #TODO not flipping file name - - geno_samples_duplex = geno_samples_duplex # + dmp_matched_tumors_duplex - geno_samples_simplex = geno_samples_simplex # + dmp_matched_tumors_simplex + (dmp_matched_tumors_duplex, dmp_matched_tumors_simplex) = get_dmp_matched_patient_geno_samples(patient_id) + # TODO not flipping file name + + geno_samples_duplex = geno_samples_duplex # + dmp_matched_tumors_duplex + geno_samples_simplex = geno_samples_simplex # + dmp_matched_tumors_simplex geno_samples = make_pairs(geno_samples_duplex, geno_samples_simplex) sample_info = { "matched_normal_unfiltered": [matched_normal_unfiltered_bam], "geno_samples": geno_samples, "geno_samples_normal_unfiltered": geno_samples_normal_unfiltered, - "curated_normal_bams": curated_normal_bams + "curated_normal_bams": curated_normal_bams, } return sample_info - + def mapping_bams(self, sample_info): # sample_id,normal_path,duplex_path,simplex_path,type # patient_id,sample_id,type,maf,standard_bam,standard_bai,duplex_bam,duplex_bai,simplex_bam,simplex_bai bams = [] aux_bams = [] for key, value in sample_info.items(): - for v in value: + for v in value: map = {} - if key == 'tumor_bam': - map['patient_id'] = 'null' - map['sample_id'] = v[0].file_name.replace(DUPLEX_BAM_STEM, '') - map['maf'] = 'null' - map['standard_bam'] = 'null' - map['standard_bai'] = 'null' - map['duplex_bam'] = _create_cwl_bam_object(v[0].path) - map['duplex_bai'] = _create_cwl_bam_object(v[0].path.replace('.bam','.bai')) - map['simplex_bam'] = _create_cwl_bam_object(v[1].path) - map['simplex_bai'] = _create_cwl_bam_object(v[1].path.replace('.bam','.bai')) - map['type'] = 'CASE' + if key == "tumor_bam": + map["patient_id"] = "null" + map["sample_id"] = v[0].file_name.replace(DUPLEX_BAM_STEM, "") + map["maf"] = "null" + map["standard_bam"] = "null" + map["standard_bai"] = "null" + map["duplex_bam"] = _create_cwl_bam_object(v[0].path) + map["duplex_bai"] = _create_cwl_bam_object(v[0].path.replace(".bam", ".bai")) + map["simplex_bam"] = _create_cwl_bam_object(v[1].path) + map["simplex_bai"] = _create_cwl_bam_object(v[1].path.replace(".bam", ".bai")) + map["type"] = "CASE" bams.append(map) - if key == 'normal_bam': - map['patient_id'] = 'null' - map['sample_id'] = v[0].file_name.replace("-TP_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam", '') - map['maf'] = 'null' - map['standard_bam'] = 'null' - map['standard_bai'] = 'null' - map['duplex_bam'] = _create_cwl_bam_object(v[0].path) - map['duplex_bai'] = _create_cwl_bam_object(v[0].path.replace('.bam','.bai')) - map['simplex_bam'] = _create_cwl_bam_object(v[1].path) - map['simplex_bai'] = _create_cwl_bam_object(v[1].path.replace('.bam','.bai')) - map['type'] = 'CONTROL' + if key == "normal_bam": + map["patient_id"] = "null" + map["sample_id"] = v[0].file_name.replace( + "-TP_cl_aln_srt_MD_IR_FX_BR__aln_srt_IR_FX-duplex.bam", "" + ) + map["maf"] = "null" + map["standard_bam"] = "null" + map["standard_bai"] = "null" + map["duplex_bam"] = _create_cwl_bam_object(v[0].path) + map["duplex_bai"] = _create_cwl_bam_object(v[0].path.replace(".bam", ".bai")) + map["simplex_bam"] = _create_cwl_bam_object(v[1].path) + map["simplex_bai"] = _create_cwl_bam_object(v[1].path.replace(".bam", ".bai")) + map["type"] = "CONTROL" bams.append(map) - if key == 'geno_samples': - #TODO jsut do the replace here + if key == "geno_samples": + # TODO jsut do the replace here sample_id = v[0].file_name.replace(DUPLEX_BAM_STEM, "") sample_id = sample_id.replace(DMP_DUPLEX_REGEX, "") - map['sample_id'] = sample_id - map['normal_path'] = 'null' - map['duplex_path'] = _create_cwl_bam_object(v[0].path) - map['simplex_path'] = _create_cwl_bam_object(v[1].path) - map['type'] = 'PLASMA' + map["sample_id"] = sample_id + map["normal_path"] = "null" + map["duplex_path"] = _create_cwl_bam_object(v[0].path) + map["simplex_path"] = _create_cwl_bam_object(v[1].path) + map["type"] = "PLASMA" aux_bams.append(map) - if key == 'geno_samples_normal_unfiltered': - map['sample_id'] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") - map['normal_path'] = _create_cwl_bam_object(v.path) - map['duplex_path'] = 'null' - map['simplex_path'] = 'null' - map['type'] = 'UNMATCHED_NORMAL' + if key == "geno_samples_normal_unfiltered": + map["sample_id"] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") + map["normal_path"] = _create_cwl_bam_object(v.path) + map["duplex_path"] = "null" + map["simplex_path"] = "null" + map["type"] = "UNMATCHED_NORMAL" aux_bams.append(map) - if key == 'curated_normal_bams': - map['sample_id'] = v[0].file_name.replace(DUPLEX_BAM_STEM, "") - map['normal_path'] = 'null' - map['duplex_path'] = _create_cwl_bam_object(v[0].path) - map['simplex_path'] = _create_cwl_bam_object(v[1].path) - map['type'] = 'CURATED' + if key == "curated_normal_bams": + map["sample_id"] = v[0].file_name.replace(DUPLEX_BAM_STEM, "") + map["normal_path"] = "null" + map["duplex_path"] = _create_cwl_bam_object(v[0].path) + map["simplex_path"] = _create_cwl_bam_object(v[1].path) + map["type"] = "CURATED" aux_bams.append(map) - if key == 'matched_normal_unfiltered': - map['sample_id'] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") - map['normal_path'] = _create_cwl_bam_object(v.path) - map['duplex_path'] = 'null' - map['simplex_path'] = 'null' - map['type'] = 'MATCHED_NORMAL' + if key == "matched_normal_unfiltered": + map["sample_id"] = v.file_name.replace(UNCOLLAPSED_BAM_STEM, "") + map["normal_path"] = _create_cwl_bam_object(v.path) + map["duplex_path"] = "null" + map["simplex_path"] = "null" + map["type"] = "MATCHED_NORMAL" aux_bams.append(map) - return(bams, aux_bams) - + return (bams, aux_bams) def get_request_id_runs(self, app): """ @@ -635,13 +654,13 @@ def get_request_id_runs(self, app): # if not request_id: # request_id_runs = Run.objects.filter(pk__in=self.run_ids) # self.request_id = most_recent_runs_for_request[0].tags["igoRequestId"] - # else: + # else: most_recent_runs_for_request = ( Run.objects.filter( tags__igoRequestId=self.request_id, app__name__in=app, status=RunStatus.COMPLETED, - operator_run__status=RunStatus.COMPLETED + operator_run__status=RunStatus.COMPLETED, ) .order_by("-created_date") .first() @@ -651,7 +670,7 @@ def get_request_id_runs(self, app): raise Exception("No matching Nucleo runs found for request {}".format(self.request_id)) return most_recent_runs_for_request - + def get_jobs(self): """ get_job information tor run NucleoVar Pipeline @@ -672,32 +691,56 @@ def get_jobs(self): self.request_id = RunStatus[0].tags["igoRequestId"] else: runs = self.get_request_id_runs(["access v2 nucleo", "access nucleo"]) - + # TUMOR AND NORMAL BAMS from the request access v2 nucleo run bams = [] for run in runs: bams.append(self.find_request_bams(run)) # TUMOR - tumor_bams = [(b['fgbio_filter_consensus_reads_duplex_bam'], b['fgbio_postprocessing_simplex_bam']) for b in bams if is_tumor_bam(b['fgbio_filter_consensus_reads_duplex_bam'].file_name)] - - # FILLOUT NORMAL AND TUMOR - fillout_simplex_tumors = [b['fgbio_postprocessing_simplex_bam'] for b in bams if is_tumor_bam(b['fgbio_filter_consensus_reads_duplex_bam'].file_name)] - fillout_duplex_tumors = [b['fgbio_filter_consensus_reads_duplex_bam'] for b in bams if is_tumor_bam(b['fgbio_filter_consensus_reads_duplex_bam'].file_name)] - fillout_unfiltered_normals = [b['uncollapsed_bam'] for b in bams if not is_tumor_bam(b['uncollapsed_bam'].file_name)] - + tumor_bams = [ + (b["fgbio_filter_consensus_reads_duplex_bam"], b["fgbio_postprocessing_simplex_bam"]) + for b in bams + if is_tumor_bam(b["fgbio_filter_consensus_reads_duplex_bam"].file_name) + ] + + # FILLOUT NORMAL AND TUMOR + fillout_simplex_tumors = [ + b["fgbio_postprocessing_simplex_bam"] + for b in bams + if is_tumor_bam(b["fgbio_filter_consensus_reads_duplex_bam"].file_name) + ] + fillout_duplex_tumors = [ + b["fgbio_filter_consensus_reads_duplex_bam"] + for b in bams + if is_tumor_bam(b["fgbio_filter_consensus_reads_duplex_bam"].file_name) + ] + fillout_unfiltered_normals = [ + b["uncollapsed_bam"] for b in bams if not is_tumor_bam(b["uncollapsed_bam"].file_name) + ] + # NORMAL BAM - normal_bam = (File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME_DUPLEX)[0], File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME_SIMPLEX)[0]) - + normal_bam = ( + File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME_DUPLEX)[0], + File.objects.filter(file_name=ACCESS_DEFAULT_NORMAL_FILENAME_SIMPLEX)[0], + ) + # CURATED NORMAL curated_normal_bams = self.find_curated_normal_bams() - # SAMPLE INFO + # SAMPLE INFO sample_infos = [] for d, s in tumor_bams: tumor_sample_id = d.file_name.replace(DUPLEX_BAM_STEM, "") patient_id = "-".join(tumor_sample_id.split("-")[0:2]) - sample_info = self.create_sample_info(tumor_sample_id, patient_id, fillout_unfiltered_normals, fillout_simplex_tumors, fillout_duplex_tumors, curated_normal_bams) + sample_info = self.create_sample_info( + tumor_sample_id, + patient_id, + fillout_unfiltered_normals, + fillout_simplex_tumors, + fillout_duplex_tumors, + curated_normal_bams, + ) sample_info["normal_bam"] = [normal_bam] sample_info["tumor_bam"] = [(d, s)] sample_infos.append(sample_info) @@ -716,32 +759,32 @@ def get_jobs(self): "fai": "/juno/work/access/production/resources/reference/current/Homo_sapiens_assembly19.fasta.fai", "dict": "/juno/work/access/production/resources/reference/current/Homo_sapiens_assembly19.dict", "canonical_bed": "/juno/work/access/production/resources/msk-access/v1.0/regions_of_interest/versions/v1.0/MSK-ACCESS-v1_0panelA_canonicaltargets_500buffer.bed", - "target_bed": '/juno/work/access/production/resources/msk-access/v1.0/regions_of_interest/versions/v1.0/MSK-ACCESS-v1_0panelA_canonicaltargets_500buffer.bed', + "target_bed": "/juno/work/access/production/resources/msk-access/v1.0/regions_of_interest/versions/v1.0/MSK-ACCESS-v1_0panelA_canonicaltargets_500buffer.bed", "rules_json": "/juno/work/access/production/resources/nucleovar/rules.json", - "header_file": '/juno/work/access/production/resources/nucleovar/mutect1_annotate_concat_header.txt', + "header_file": "/juno/work/access/production/resources/nucleovar/mutect1_annotate_concat_header.txt", "blocklist": "/juno/work/access/production/resources/nucleovar/access_blocklist.txt", "canonical_tx_ref": "/juno/work/access/production/resources/nucleovar/canonical_target_tx_ref.tsv", - "hotspots": '/juno/work/access/production/resources/nucleovar/hotspots.maf', - "annotator": "genomenexus" + "hotspots": "/juno/work/access/production/resources/nucleovar/hotspots.maf", + "annotator": "genomenexus", } sample_metadata = { - settings.PATIENT_ID_METADATA_KEY: patient_id, - settings.REQUEST_ID_METADATA_KEY: self.request_id, - settings.CMO_SAMPLE_NAME_METADATA_KEY: sample, + settings.PATIENT_ID_METADATA_KEY: patient_id, + settings.REQUEST_ID_METADATA_KEY: self.request_id, + settings.CMO_SAMPLE_NAME_METADATA_KEY: sample, } job_tags = { - "pipeline": pipeline.name, - "pipeline_version": pipeline.version, - settings.PATIENT_ID_METADATA_KEY: patient_id, - settings.REQUEST_ID_METADATA_KEY: self.request_id, - settings.CMO_SAMPLE_NAME_METADATA_KEY: sample, + "pipeline": pipeline.name, + "pipeline_version": pipeline.version, + settings.PATIENT_ID_METADATA_KEY: patient_id, + settings.REQUEST_ID_METADATA_KEY: self.request_id, + settings.CMO_SAMPLE_NAME_METADATA_KEY: sample, } job_json = { "name": "Nucleovar {sample}: {run_date}".format(sample=sample, run_date=run_date), "app": app, "inputs": input_json, "tags": job_tags, - "output_metadata": sample_metadata + "output_metadata": sample_metadata, } jobs.append(job_json) - return [RunCreator(**job) for job in jobs] \ No newline at end of file + return [RunCreator(**job) for job in jobs] diff --git a/runner/pipeline/nextflow/nextflow_resolver.py b/runner/pipeline/nextflow/nextflow_resolver.py index 8a51606f9..39c988a47 100755 --- a/runner/pipeline/nextflow/nextflow_resolver.py +++ b/runner/pipeline/nextflow/nextflow_resolver.py @@ -14,10 +14,10 @@ def resolve(self): if self.nfcore_template: # Check main schema for CLI inputs with open(os.path.join(location, "nextflow_schema.json"), "r") as f: - nextflow_schema = json.load(f) + nextflow_schema = json.load(f) inputs = self.schemas2template(nextflow_schema, location) - pipeline = {"inputs": inputs } - else: + pipeline = {"inputs": inputs} + else: with open(os.path.join(location, "inputs.template.json"), "r") as f: pipeline = json.load(f) self._cleanup(location) @@ -29,20 +29,24 @@ def schemas2template(self, nextflow_schema, location): reference = nextflow_schema["definitions"]["reference_genome_options"]["properties"] input = nextflow_schema["definitions"]["input_output_options"]["properties"] properties = {**reference, **input} - # Check for sample-sheet CLI inputs + # Check for sample-sheet CLI inputs samplesheets = [key for key, val in properties.items() if val.get("format") == "sample-sheet"] - inputs = [{'id': key, 'schema': {'type': val.get("format")}} for key, val in properties.items() if val.get("format") != "sample-sheet"] + inputs = [ + {"id": key, "schema": {"type": val.get("format")}} + for key, val in properties.items() + if val.get("format") != "sample-sheet" + ] # Check Assets for sample sheet schemas for schema in samplesheets: - with open(os.path.join(location, f'assets/schema_{schema}.json'), "r") as f: + with open(os.path.join(location, f"assets/schema_{schema}.json"), "r") as f: nextflow_schema = json.load(f) - samplesheet_props = nextflow_schema['items']["properties"] - fields = [{'id': key, 'type': val.get("format")} for key, val in samplesheet_props.items()] - header = '\t'.join([f['id'] for f in fields]) + '\n' - body_start = f'{{{{#{schema}}}}}\n' - body_end = f'\n{{{{/{schema}}}}}' - body = '\t'.join([f'{{{{{f["id"]}}}}}' for f in fields]) - template = header + body_start + body + body_end - samplesheet_input = {'id': schema, 'schema': {'items': {"fields": fields}}, 'template': template} + samplesheet_props = nextflow_schema["items"]["properties"] + fields = [{"id": key, "type": val.get("format")} for key, val in samplesheet_props.items()] + header = "\t".join([f["id"] for f in fields]) + "\n" + body_start = f"{{{{#{schema}}}}}\n" + body_end = f"\n{{{{/{schema}}}}}" + body = "\t".join([f'{{{{{f["id"]}}}}}' for f in fields]) + template = header + body_start + body + body_end + samplesheet_input = {"id": schema, "schema": {"items": {"fields": fields}}, "template": template} inputs.append(samplesheet_input) return inputs diff --git a/runner/pipeline/pipeline_cache.py b/runner/pipeline/pipeline_cache.py index d4245f0bb..c30649f13 100755 --- a/runner/pipeline/pipeline_cache.py +++ b/runner/pipeline/pipeline_cache.py @@ -25,7 +25,7 @@ def get_pipeline(pipeline): "github": pipeline.github, "entrypoint": pipeline.entrypoint, "version": pipeline.version, - "nfcore_template": pipeline.nfcore_template + "nfcore_template": pipeline.nfcore_template, }, ) return resolved_dict diff --git a/runner/run/objects/nextflow/nextflow_run_object.py b/runner/run/objects/nextflow/nextflow_run_object.py index 2a2a55fab..116f103ec 100755 --- a/runner/run/objects/nextflow/nextflow_run_object.py +++ b/runner/run/objects/nextflow/nextflow_run_object.py @@ -7,8 +7,10 @@ from runner.models import Run, RunStatus, Port, PortType, ProtocolType from runner.run.objects.nextflow.nextflow_port_object import NextflowPortObject from runner.exceptions import PortProcessorException, RunCreateException, RunObjectConstructException + logger = logging.getLogger(__name__) + class NextflowRunObject(RunObject): # logger = logging.getLogger(__name__) @@ -176,7 +178,7 @@ def dump_job(self, output_directory=None, log_directory=None): "repository": self.run_obj.app.github, "entrypoint": self.run_obj.app.entrypoint, "version": self.run_obj.app.version, - "nfcore_template": self.run_obj.app.nfcore_template + "nfcore_template": self.run_obj.app.nfcore_template, } } inputs = dict() From 5853822f98698384139ba04311dfd508bded2e87 Mon Sep 17 00:00:00 2001 From: buehlere Date: Mon, 21 Oct 2024 14:25:59 -0400 Subject: [PATCH 21/28] linting and test update --- runner/migrations/0062_pipeline_nfcore_template.py | 6 +++--- runner/operator/access/v2_1_0/nucleovar/__init__.py | 2 +- runner/tests/run/test_nextflow_run.py | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/runner/migrations/0062_pipeline_nfcore_template.py b/runner/migrations/0062_pipeline_nfcore_template.py index 5736a57f2..192077457 100644 --- a/runner/migrations/0062_pipeline_nfcore_template.py +++ b/runner/migrations/0062_pipeline_nfcore_template.py @@ -6,13 +6,13 @@ class Migration(migrations.Migration): dependencies = [ - ('runner', '0061_auto_20240820_1837'), + ("runner", "0061_auto_20240820_1837"), ] operations = [ migrations.AddField( - model_name='pipeline', - name='nfcore_template', + model_name="pipeline", + name="nfcore_template", field=models.BooleanField(default=False), ), ] diff --git a/runner/operator/access/v2_1_0/nucleovar/__init__.py b/runner/operator/access/v2_1_0/nucleovar/__init__.py index 98e9f24dd..8761bf1fd 100644 --- a/runner/operator/access/v2_1_0/nucleovar/__init__.py +++ b/runner/operator/access/v2_1_0/nucleovar/__init__.py @@ -1 +1 @@ -from .access_nucleovar import NucleoVarOperator \ No newline at end of file +from .access_nucleovar import NucleoVarOperator diff --git a/runner/tests/run/test_nextflow_run.py b/runner/tests/run/test_nextflow_run.py index 5bcc4e953..792f9aa9a 100644 --- a/runner/tests/run/test_nextflow_run.py +++ b/runner/tests/run/test_nextflow_run.py @@ -17,7 +17,7 @@ def setUp(self): entrypoint="pipeline.nf", output_file_group=self.nxf_file_group, output_directory="/output/directory/", - config="TEST CONFIG", + config="TEST CONFIG" ) config = """process {\r\n memory = \"8.GB\"\r\n time = { task.attempt < 3 ? 3.h * task.attempt : 500.h }\r\n clusterOptions = \"\"\r\n scratch = true\r\n beforeScript = \". \/etc\/profile.d\/modules.sh; module load singularity\/3.1.1; unset R_LIBS; catch_term () { echo 'caught USR2\/TERM signal'; set +e; false; on_exit ; } ; trap catch_term USR2 TERM\"\r\n}\r\n\r\nparams {\r\n fileTracking = \"{{output_directory}}\"\r\n}""" self.pipeline_config = Pipeline.objects.create( @@ -190,6 +190,7 @@ def test_run_dump_job(self, get_pipeline): "repository": "https://github.com/nextflow_pipeline", "entrypoint": "pipeline.nf", "version": "1.0.0", + "nfcore_template": False } }, ) From f26384497fb9174319ef60a78aa30becc202eec9 Mon Sep 17 00:00:00 2001 From: buehlere Date: Mon, 21 Oct 2024 14:47:23 -0400 Subject: [PATCH 22/28] Update test_nextflow_run.py --- runner/tests/run/test_nextflow_run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/tests/run/test_nextflow_run.py b/runner/tests/run/test_nextflow_run.py index 792f9aa9a..8ed46cae2 100644 --- a/runner/tests/run/test_nextflow_run.py +++ b/runner/tests/run/test_nextflow_run.py @@ -17,7 +17,7 @@ def setUp(self): entrypoint="pipeline.nf", output_file_group=self.nxf_file_group, output_directory="/output/directory/", - config="TEST CONFIG" + config="TEST CONFIG", ) config = """process {\r\n memory = \"8.GB\"\r\n time = { task.attempt < 3 ? 3.h * task.attempt : 500.h }\r\n clusterOptions = \"\"\r\n scratch = true\r\n beforeScript = \". \/etc\/profile.d\/modules.sh; module load singularity\/3.1.1; unset R_LIBS; catch_term () { echo 'caught USR2\/TERM signal'; set +e; false; on_exit ; } ; trap catch_term USR2 TERM\"\r\n}\r\n\r\nparams {\r\n fileTracking = \"{{output_directory}}\"\r\n}""" self.pipeline_config = Pipeline.objects.create( @@ -190,7 +190,7 @@ def test_run_dump_job(self, get_pipeline): "repository": "https://github.com/nextflow_pipeline", "entrypoint": "pipeline.nf", "version": "1.0.0", - "nfcore_template": False + "nfcore_template": False, } }, ) From 57fb7cdac970c79b9dcabc92f91ee018327928d8 Mon Sep 17 00:00:00 2001 From: buehlere Date: Mon, 21 Oct 2024 14:52:53 -0400 Subject: [PATCH 23/28] Update access_nucleovar.py --- runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py | 1 - 1 file changed, 1 deletion(-) diff --git a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py index fe2a212a3..b622b1233 100755 --- a/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py +++ b/runner/operator/access/v2_1_0/nucleovar/access_nucleovar.py @@ -477,7 +477,6 @@ def get_dmp_matched_patient_geno_samples(patient_id): class NucleoVarOperator(Operator): - def find_request_bams(self, run): """ Find simplex and duplex bams from a request's nucleo run From bf841b8fcac341f4d73a87944511532fa3e7ed27 Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Wed, 30 Oct 2024 14:48:33 -0400 Subject: [PATCH 24/28] updating resolvers --- runner/pipeline/cwl/cwl_resolver.py | 2 +- runner/pipeline/nextflow/nextflow_resolver.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) mode change 100644 => 100755 runner/pipeline/cwl/cwl_resolver.py diff --git a/runner/pipeline/cwl/cwl_resolver.py b/runner/pipeline/cwl/cwl_resolver.py old mode 100644 new mode 100755 index 089549821..98578c46e --- a/runner/pipeline/cwl/cwl_resolver.py +++ b/runner/pipeline/cwl/cwl_resolver.py @@ -7,7 +7,7 @@ class CWLResolver(PipelineResolver): - def __init__(self, github, entrypoint, version=None): + def __init__(self, github, entrypoint, version=None, nfcore_template=None): super().__init__(github, entrypoint, version) def resolve(self): diff --git a/runner/pipeline/nextflow/nextflow_resolver.py b/runner/pipeline/nextflow/nextflow_resolver.py index 39c988a47..ab520425a 100755 --- a/runner/pipeline/nextflow/nextflow_resolver.py +++ b/runner/pipeline/nextflow/nextflow_resolver.py @@ -4,7 +4,7 @@ class NextflowResolver(PipelineResolver): - def __init__(self, github, entrypoint, nfcore_template, version=None): + def __init__(self, github, entrypoint, version=None, nfcore_template=None): super().__init__(github, entrypoint, version) self.nfcore_template = nfcore_template From a18b2fd1009918cc8305c45c9b5eecdeeb3157a2 Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Wed, 30 Oct 2024 15:14:51 -0400 Subject: [PATCH 25/28] remove log --- runner/run/objects/nextflow/nextflow_run_object.py | 1 - 1 file changed, 1 deletion(-) diff --git a/runner/run/objects/nextflow/nextflow_run_object.py b/runner/run/objects/nextflow/nextflow_run_object.py index 116f103ec..77c8570bb 100755 --- a/runner/run/objects/nextflow/nextflow_run_object.py +++ b/runner/run/objects/nextflow/nextflow_run_object.py @@ -12,7 +12,6 @@ class NextflowRunObject(RunObject): - # logger = logging.getLogger(__name__) def __init__( self, From d8dc8543a8175a6c08e55c34e621729afb03fc34 Mon Sep 17 00:00:00 2001 From: buehlere Date: Wed, 30 Oct 2024 15:16:11 -0400 Subject: [PATCH 26/28] linting --- runner/run/objects/nextflow/nextflow_run_object.py | 1 - 1 file changed, 1 deletion(-) diff --git a/runner/run/objects/nextflow/nextflow_run_object.py b/runner/run/objects/nextflow/nextflow_run_object.py index 77c8570bb..39682fe4f 100755 --- a/runner/run/objects/nextflow/nextflow_run_object.py +++ b/runner/run/objects/nextflow/nextflow_run_object.py @@ -12,7 +12,6 @@ class NextflowRunObject(RunObject): - def __init__( self, run_id, From 91a9bff9bf583ddcff9e1287dae3ef6b93f52fc1 Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Wed, 30 Oct 2024 16:36:22 -0400 Subject: [PATCH 27/28] correcting nfcore template order --- runner/pipeline/pipeline_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/pipeline/pipeline_cache.py b/runner/pipeline/pipeline_cache.py index c30649f13..fcfcff340 100755 --- a/runner/pipeline/pipeline_cache.py +++ b/runner/pipeline/pipeline_cache.py @@ -16,7 +16,7 @@ def get_pipeline(pipeline): resolved_dict = _pipeline.get("app") else: resolver_class = PipelineCache._get_pipeline_resolver(pipeline.pipeline_type) - resolver = resolver_class(pipeline.github, pipeline.entrypoint, pipeline.nfcore_template, pipeline.version) + resolver = resolver_class(pipeline.github, pipeline.entrypoint, pipeline.version, pipeline.nfcore_template) resolved_dict = resolver.resolve() cache.set( pipeline.id, From c8b31c6571b8362d823bfc82173cd84fb60589d3 Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Wed, 30 Oct 2024 16:40:20 -0400 Subject: [PATCH 28/28] adding reactive nfcore checkbox --- .../templates/admin/runner/change_form.html | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100755 runner/templates/admin/runner/change_form.html diff --git a/runner/templates/admin/runner/change_form.html b/runner/templates/admin/runner/change_form.html new file mode 100755 index 000000000..52a7d1f63 --- /dev/null +++ b/runner/templates/admin/runner/change_form.html @@ -0,0 +1,25 @@ +{% extends "admin/change_form.html" %} + +{% block extrahead %} + {{ block.super }} {# This keeps all the existing head elements from the parent template #} + +{% endblock %}