#!/usr/bin/env python
import argparse
import datetime
import multiprocessing
import os
import re
import shutil
import subprocess
import sys
import math
import psutil
import joblib
import pysam
from Bio import SeqIO
# Authorship information
__author__ = "Kemal İnecik"
__license__ = "GPLv3"
__version__ = "0.2"
__maintainer__ = "Kemal İnecik"
__email__ = "[email protected]"
__status__ = "Development"
def main():
"""
If running the module (the source file) as the main program, this function will be called.
:return: None
"""
a = argument_parser() # Get the command line arguments
# Create the job dictionary by using the introductory txt file.
job_list = JobList(a.filepath)
# Make sure the process will be run with correct settings by making it confirmed by the user.
job_list.confirm_job_list()
print(f"{Col.H}Operation started.{os.linesep}{Col.E}") # Print that the processing has just started.
# Create a controller object, which is the main functional object processing the riboseq data.
controller = Controller(a.identifier, a.organism, a.ensembl_release, a.cpu, a.temp,
a.output, a.assignment, job_list.jobs)
controller.start_processing() # Start processing by explicitly calling it.
print(f"{Col.H}Operation successfully ended.{os.linesep}{Col.E}") # Print that the processing has just ended.
def argument_parser() -> argparse.Namespace:
"""
Extract the information given by the user via the command line. It also contains the explanations of the flags
and of the program itself.
:return: NameSpace object containing parsed arguments.
"""
parser = argparse.ArgumentParser(prog="pipeline.py", # Name of the program
description="Deep sequencing data processing pipeline.") # todo
# Introduce arguments, they are already self explanatory.
parser.add_argument("-r", type=str, dest="identifier",
help="the identifier for this run, which will be the directory name for all outputs under "
"provided main output directory. If skipped, the default is a string containing "
"date and time of the program start.",
required=False, default=datetime.datetime.now().strftime("Run_on_%Y_%m_%d_at_%H_%M_%S"))
parser.add_argument("-a", type=str, dest="organism",
help="organism of interest for the analysis. If skipped, the default value is homo_sapiens.",
required=False, default="homo_sapiens",
choices=["homo_sapiens", "mus_musculus", "saccharomyces_cerevisiae", "escherichia_coli"],
)
parser.add_argument("-e", type=int, dest="ensembl_release",
help="ensembl version to be used. Ignored if the 'organism' does not necessitate Ensembl "
"release information. Default value is 102. For E. coli, it must be 48.",
required=False, default=102)
parser.add_argument("-c", type=int, dest="cpu",
help="number of cpu cores to be used. Default value is maximum minus eight.",
required=False, default=multiprocessing.cpu_count() - 8)
parser.add_argument("-s", type=int, dest="assignment",
help="select 3' or 5' assignment for Julia script.",
required=False, default=3,
choices=[3, 5])
parser.add_argument("-f", type=str, required=True, dest="filepath",
help="path of the txt file which contains the task list.")
parser.add_argument("-t", type=str, required=True, dest="temp",
help="absolute path of the directory to be used for temporary files such as genome indexes.")
parser.add_argument("-o", type=str, required=True, dest="output",
help="absolute path of the directory to be used for output files.")
return parser.parse_args() # Return the parsed result.
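# Example invocation (the paths and parameter values below are placeholders, shown only to illustrate how
# the flags defined above fit together):
#   python pipeline.py -f /path/to/task_list.txt -t /path/to/temp_dir -o /path/to/output_dir \
#       -a homo_sapiens -e 102 -c 16 -s 3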
class JobList:
"""
Processes the introductory txt file, raises errors if the input is malformed, creates a dictionary from the
content, and asks the user to confirm the result.
"""
# Steps of the program. Changing the numbers here can make the whole pipeline malfunction. If you add
# any additional step, please take this into consideration: either add the new step after 5, or edit the whole
# script accordingly.
process_map = {1: "01_Preprocessing",
2: "02_Cleanup",
3: "03_LinkingPairs",
4: "04_GenomeAlignment",
5: "05_JuliaAssignment"}
# Default values are based on the data published in Matilde et al. 2021.
# Adapter sequences
default_adapter_single = "ATCGTAGATCGGAAGAGCACACGTCTGAACTCCAGTCAC"
default_adapter1_paired = "ATCGTAGATCGGAAGAGCACACGTCTGAACTCCAGTCAC"
default_adapter2_paired = None # None, if no adapter trimming is required.
# UMI patterns, the below patterns are regex strings.
default_pattern_single = "^(?P<umi_1>.{2}).*(?P<umi_2>.{5})$"
default_pattern1_paired = "^(?P<umi_1>.{2}).*"
default_pattern2_paired = "^(?P<discard_2>.{5})(?P<umi_2>.{5}).*"
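# A minimal illustration (not executed by the pipeline) of what the single-end pattern above captures,
# assuming umi_tools' regex extraction semantics; the read sequence is made up:
#   >>> m = re.match("^(?P<umi_1>.{2}).*(?P<umi_2>.{5})$", "ACGTTGCATTAGGTAC")
#   >>> m.group("umi_1"), m.group("umi_2")
#   ('AC', 'GGTAC')
# For the paired patterns, read 1 contributes a 2-nt UMI and read 2 has 5 nt discarded followed by a 5-nt UMI.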
# Default processing steps; the numbers correspond to the steps of the process_map dictionary above.
default_processes_single = [1, 2, 4, 5]
default_processes_paired_linking = [1, 2, 3, 4, 5]
default_processes_paired = [1, 2, 4]
# For now, the only valid format for raw input files is fastq.gz. Other file formats were not tested.
proper_input_extensions = ["fastq.gz"]
def __init__(self, user_task_file: str):
"""
Initiates the object.
:param user_task_file: Absolute or relative path of introductory txt file
"""
self.user_task_file = user_task_file
self.jobs = self.read_job_list(self.user_task_file)
@staticmethod
def read_job_list(file_path: str) -> dict:
"""
Processes the introductory txt file, raises errors if the input file is not in the allowed format, creates a
dictionary with the file content.
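A minimal sketch of the expected task-file layout, inferred from the parsing logic below (job names,
paths and option values are hypothetical):
>>>my_single_end_job
single
/absolute/path/to/sample1.fastq.gz
adapter=AGATCGGAAGAGC
processes=1,2,4,5
>>>my_paired_end_job
paired_linking
/absolute/path/to/sample2_R1.fastq.gz
/absolute/path/to/sample2_R2.fastq.gz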
:param file_path: Absolute or relative path of introductory txt file
:return: Job IDs as the keys, relevant information as the values.
"""
result = dict() # Create a dictionary to populate with jobs
with open(file_path, "r") as handle: # Open the file in read only mode.
# Read all the data in the file. Split the content first with '>>>' to separate out the jobs. Skip comments
content = f"{os.linesep}".join([line.strip() for line in handle if not line.startswith('#')])
for entry in [r.strip() for r in content.split(">>>") if r.strip()]: # For each job
# Separate out the lines in a given job by new line character.
# If it starts with '#' just ignore this line
line = [r.strip() for r in entry.split(os.linesep) if r.strip()]
print_error_entry = Col.B + os.linesep.join(line) + Col.E # A string to be printed in case of an error.
# Do not allow whitespace inside a field. Note that lines were already stripped above.
if not all([len(i.split()) == 1 for i in line]):
raise ValueError(f"No whitespace is allowed:{os.linesep}{print_error_entry}")
# Make sure the second line (first starts with '>>>') is one of below list.
if line[1] not in ["single", "paired_linking", "paired"]:
raise ValueError(f"Improper sequencing method:{os.linesep}{print_error_entry}")
# Make sure the raw input file exists, is readable, and has one of the allowed extensions.
if not check_file(line[2], JobList.proper_input_extensions):
raise ValueError(f"Improper file or file extension:{os.linesep}{print_error_entry}")
# Make sure that input raw file path is absolute.
if not os.path.isabs(line[2]):
raise ValueError(f"File paths have to be absolute:{os.linesep}{print_error_entry}")
# Make sure the raw input file exists, is readable, and has one of the allowed extensions.
if line[1].startswith("paired") and not check_file(line[3], JobList.proper_input_extensions):
raise ValueError(f"Improper file or file extension:{os.linesep}{print_error_entry}")
# Make sure that input raw file path is absolute.
if line[1].startswith("paired") and not os.path.isabs(line[3]):
raise ValueError(f"File paths have to be absolute:{os.linesep}{print_error_entry}")
# If the job is for single-end sequencing, initiate the job dictionary with default elements.
if line[1] == "single":
temp_dict = {"sequencing_method": line[1],
"input_fastq": line[2],
"adapter": JobList.default_adapter_single,
"pattern_umi": JobList.default_pattern_single,
"processes": JobList.default_processes_single}
else: # If the job is for paired-end sequencing, initiate the job dictionary with default elements.
# The 'paired_linking' and 'paired' jobs need different 'processes' parameters, since
# paired_linking requires an extra LinkingPairs step.
process_temp = JobList.default_processes_paired if line[1] == "paired" \
else JobList.default_processes_paired_linking
temp_dict = {"sequencing_method": line[1],
"input_fastq": [line[2], line[3]],
"adapter1": JobList.default_adapter1_paired,
"adapter2": JobList.default_adapter2_paired,
"pattern_umi1": JobList.default_pattern1_paired,
"pattern_umi2": JobList.default_pattern2_paired,
"processes": process_temp}
# If the user wants to change adapters, UMIs, or processes, the lines after 3 for single-end sequencing,
# and the lines after 4 for paired-end sequencing, will be populated.
non_default_entries = line[3:] if line[1] == "single" else line[4:]
# Allowed keys for the non default entries
if line[1] == "single":
possible_settings = ["adapter", "pattern_umi", "processes"]
else:
possible_settings = ["adapter1", "adapter2", "pattern_umi1", "pattern_umi2", "processes"]
for nde in non_default_entries: # For each non default entry
# First split with '=', and strip the output.
des = [r.strip() for r in nde.split("=") if r.strip()]
if len(des) != 2 or des[0] not in possible_settings: # Ignore if setting is not with allowed keys.
# If there are two '=', there will be more than 2 elements after the split. Alternatively, if there
# is no '=' or one side of the '=' is not populated, there will be one element.
print(f"{Col.W}Manual setting ignored for {line[0]}:{os.linesep}{des}{Col.E}")
elif des[0] == "processes": # If the user wants to customize processes.
# Split by ',' and convert everything into integers. Error will be raised if it is not possible.
temp_dict[des[0]] = sorted([int(r.strip()) for r in des[1].split(',') if r.strip()])
else: # For other customizations.
# Just get the value directly if the value is not 'None'. If 'None', set the value to None.
temp_dict[des[0]] = des[1] if des[1] != "None" else None
# Make sure that all entries are unique
assert line[0] not in result, f"Duplicated key for {line[0]}"
# Add the dictionary for the individual job to the dictionary of all jobs.
result[line[0]] = temp_dict
return result # Return the final dictionary.
def confirm_job_list(self):
"""
Method to make sure the processing of the txt file is correct. It prints out the jobs and requests confirmation
from the user. If the user types anything except 'enter', the program will be aborted.
:return: None
"""
# Print the instruction.
print(f"{Col.H}Please confirm the jobs.{os.linesep}"
f"Press enter to confirm, type anything to stop.{os.linesep}"
f"There are {len(self.jobs)} jobs:{Col.E}{os.linesep}")
for job_id in self.jobs: # For all jobs in the job list.
the_job = self.jobs[job_id] # Get the job information dictionary.
# Convert the processing into str with comma as the delimiter to be printed out
processes_pre = ", ".join([str(i) for i in the_job['processes']])
if the_job["sequencing_method"] == "single": # If the job is a single end sequencing job.
# If the default values are used, just add 'Default'. Otherwise, print the information as it is.
adapter_temp = the_job["adapter"] if the_job["adapter"] != JobList.default_adapter_single \
else f"Default (\"{the_job['adapter']}\")"
pattern_temp = the_job["pattern_umi"] if the_job["pattern_umi"] != JobList.default_pattern_single \
else f"Default (\"{the_job['pattern_umi']}\")"
processes_temp = processes_pre if the_job["processes"] != JobList.default_processes_single \
else f"Default (\"{processes_pre}\")"
# Prepare a string to be printed out in the command line
print_string = (
f"{Col.H}> {job_id}{Col.C}{os.linesep}" # Job ID
f"Sequencing Method : {the_job['sequencing_method']}{os.linesep}"
f"Read : {the_job['input_fastq']}{os.linesep}"
f"Adapter : {adapter_temp}{os.linesep}"
f"Pattern UMI : {pattern_temp}{os.linesep}"
f"Processes : {processes_temp}{Col.E}{os.linesep}"
)
else: # If the job is a paired end sequencing job.
# If the default values are used, just add 'Default'. Otherwise, print the information as it is.
adapter1_temp = the_job["adapter1"] if the_job["adapter1"] != JobList.default_adapter1_paired \
else f"Default (\"{the_job['adapter1']}\")"
adapter2_temp = the_job["adapter2"] if the_job["adapter2"] != JobList.default_adapter2_paired \
else f"Default (\"{the_job['adapter2']}\")"
pattern1_temp = the_job["pattern_umi1"] if the_job["pattern_umi1"] != JobList.default_pattern1_paired \
else f"Default (\"{the_job['pattern_umi1']}\")"
pattern2_temp = the_job["pattern_umi2"] if the_job["pattern_umi2"] != JobList.default_pattern2_paired \
else f"Default (\"{the_job['pattern_umi2']}\")"
processes_method = JobList.default_processes_paired if the_job["sequencing_method"] == "paired" \
else JobList.default_processes_paired_linking
processes_temp = processes_pre if the_job["processes"] != processes_method \
else f"Default (\"{processes_pre}\")"
# Prepare a string to be printed out in the command line
print_string = (
f"{Col.H}> {job_id}{Col.C}{os.linesep}" # Job ID
f"Sequencing Method : {the_job['sequencing_method']}{os.linesep}"
f"Read 1 : {the_job['input_fastq'][0]}{os.linesep}"
f"Read 2 : {the_job['input_fastq'][1]}{os.linesep}"
f"Adapter 1 : {adapter1_temp}{os.linesep}"
f"Adapter 2 : {adapter2_temp}{os.linesep}"
f"Pattern UMI 1 : {pattern1_temp}{os.linesep}"
f"Pattern UMI 2 : {pattern2_temp}{os.linesep}"
f"Processes : {processes_temp}{Col.E}{os.linesep}"
)
confirm = input(print_string) # Request the user's response
if confirm != "": # If anything other than 'enter' was pressed
print(f"{Col.F}Process terminated.{Col.E}")
sys.exit(1) # Terminate the process with an error code 1.
else: # Otherwise just move to the next job.
print(f"{Col.W}Confirmed.{os.linesep}{Col.E}")
class Controller:
def __init__(self, run_identifier: str, organism: str, ensembl_release: int, cpu_cores: int,
temp_repo_dir: str, data_repo_dir: str, assign_from: int, jobs: dict):
# First check the temp and output directories; make sure they exist and are writable.
check_directory([temp_repo_dir, data_repo_dir])
# Make sure the below third party packages are installed.
# The system's samtools will be used because the conda distribution raises some errors.
check_exist_package(["cutadapt", "umi_tools", "bowtie2-build", "STAR", "bowtie2", "/usr/bin/samtools"])
# TODO: Versioning information will be also checked.
# Assign the parameters as object variables
self.temp_repo_dir = temp_repo_dir
self.data_repo_dir = data_repo_dir
self.organism = organism
self.ensembl_release = ensembl_release
self.jobs = jobs
self.run_identifier = run_identifier
self.cpu = cpu_cores
self.assign_from = assign_from
# Make sure the Julia assignment script is in the same directory as this script.
self.julia_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "julia_assignment.jl")
assert check_file(self.julia_path, [".jl"]), f"JuliaAssignment script could not be found.{os.linesep}:" \
f"{self.julia_path}"
# Create an OrganismDatabase object with designated parameters.
self.org_db = OrganismDatabase(self.organism, self.ensembl_release, self.temp_repo_dir)
# Call the method to create the folders under data_repo_dir to save the final and intermediate results.
self.create_output_tree()
# Call the method to create index files for genome, rRNA and transcriptome alignments. Note that only
# paired_linking uses the transcriptome index, but it will be created anyway.
self.index_directories = self.create_index()
def is_already_calculated(self):
pass # Not implemented yet!
# The function decides whether a process for current settings and for the same fastq file was already carried
# out in the past. If so, just skip the process and move on.
def start_processing(self):
"""
Runs the processing for all steps.
:return: None
"""
self.preprocessing()
self.cleanup()
self.linking_pairs()
self.genome_alignment()
self.julia_assignment()
# Also save the Controller object as a joblib file, to be able to see running parameters in the future.
joblib.dump(self, os.path.join(self.data_repo_dir, self.run_identifier, "pipeline_controller.joblib"))
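# The dumped controller can later be reloaded for inspection, e.g. with
# joblib.load(os.path.join(data_repo_dir, run_identifier, "pipeline_controller.joblib")),
# to recover the parameters of a finished run (illustrative usage, not part of the pipeline itself).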
# TODO: Save the pipeline.py and julia script into the output folder.
# TODO: Fastqc fastq analysis tool should be implemented here.
def julia_assignment(self):
"""
For the jobs which are set to be processed by this step, run the Julia assignment.
:return: None
"""
gff_path = self.julia_assignment_gff3_correct() # Get GFF file path
for job_id in self.jobs: # Iterate over the jobs
if 5 in self.jobs[job_id]["processes"]: # If it is set to be processed by this step.
self.julia_assignment_one_job(job_id, gff_path) # Run the processing for this job.
# The last step of the pipeline, no output path is returned.
def julia_assignment_one_job(self, job_id: str, gff_path: str):
"""
Runs Ilia's Julia assignment script with proper settings.
:param job_id: The job id, which exists in the self.jobs dictionary.
:param gff_path: Absolute path of corrected GFF file.
:return: None
"""
jobbing = self.jobs[job_id] # Get the job from the dictionary containing all other jobs.
job_dir = jobbing["processes_dirs"][5] # Directory for Julia script [5] outputs.
input_sam = self.jobs[job_id]["process_genome_alignment"] # Get the SAM file to be used for the assignment
assert JobList.process_map[5].endswith("JuliaAssignment") # Make sure '5' is for JuliaAssignment process.
run_and_check(( # Execute the following string through the shell.
f"cd {job_dir}; " # Change the directory to the directory of JuliaAssignment
f"{shutil.which('julia')} {self.julia_path} " # Which Julia installation and the script to use
f"-g {gff_path} " # Gff3 file. Removed of duplicated gene names by a function in this pipeline.
f"-a {self.assign_from} " # Assignment from 3' or 5', which is an input for this program.
# "-u " # Inherited from Matilde et al 2021. Removed here because umi-tool deduplication was already done.
f"-o {job_dir} {input_sam}" # Designate output file & input file
), shell=True)
# The last step of the pipeline, no output path is returned.
def julia_assignment_gff3_correct(self) -> str:
"""
For the GFF3 file to work with Ilia's tool, gene names must not be duplicated. If the GFF3 file contains
duplicated gene names, Ilia's assignment tool will raise an error. The reason for duplicate gene names is
related to how the annotation sources operate. Some annotation sources, for example Ensembl, use the gene ID
as the identifier for each gene, so there are no duplicated gene IDs in Ensembl. However, in rare cases, a
gene name is attributed to more than one gene ID. As a result, the same gene name appears more than once in
the GFF3 file. As far as I could test, gene names are not duplicated in the RefSeq annotation. Since Ilia's
tool was based on the RefSeq annotation, it does not raise any error there. In short, this function renames
the gene names which are duplicated several times in the GFF3 file.
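Illustration of the renaming (the gene name is hypothetical): if 'Name=ABC1' is attached to two different
gene IDs, each occurrence is rewritten in the order encountered as 'Name=ABC1_duplicated_1' and then
'Name=ABC1_duplicated_2', while every other line is copied to the output verbatim.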
:return: Absolute path of the corrected GFF3 file.
"""
gff_path = self.org_db.get_db("gff3") # Get the path of input, raw GFF3 file.
output_path = os.path.splitext(gff_path)[0] + "_renamed_duplicate_gene_names.gff3" # The final file name
try:
if not os.access(output_path, os.R_OK) or not os.path.isfile(output_path):
with open(gff_path, "r") as gff_handle: # GFF3 is read into memory.
gff_raw = gff_handle.readlines()
n_map = dict() # Gene names are read.
for entry in gff_raw:
if not entry.startswith('#'):
entry = entry.strip().split('\t')
attributes = dict([i.split('=') for i in entry[8].split(';')])
if entry[2] == "gene":
gene_name = attributes['Name']
gene_id = attributes['ID'].split(':')[1]
if gene_name not in n_map:
n_map[gene_name] = [gene_id]
else:
n_map[gene_name].append(gene_id)
duplicated_gene_name = dict() # Duplicated gene names are detected.
for i in n_map:
if len(n_map[i]) > 1:
duplicated_gene_name[i] = 0
# Output is written by changing the names of duplicated gene names.
with open(output_path, "w") as output_handle:
for entry in gff_raw:
if entry.startswith('#'):
output_handle.write(entry)
else:
entry_split = entry.strip().split('\t')
attributes = dict([i.split('=') for i in entry_split[8].split(';')])
if entry_split[2] == "gene" and attributes['Name'] in duplicated_gene_name:
the_name = attributes['Name']
duplicated_gene_name[the_name] += 1
new_name = the_name + f"_duplicated_{duplicated_gene_name[the_name]}"
new_entry = entry.replace(f";Name={the_name};", f";Name={new_name};")
output_handle.write(new_entry)
else:
output_handle.write(entry)
return output_path
except KeyError:
return gff_path
def genome_alignment(self):
for job_id in self.jobs:
if 4 in self.jobs[job_id]["processes"]: # todo: this should not be allowed if the previous step is missing
self.genome_alignment_one_job(job_id)
else:
self.jobs[job_id]["process_genome_alignment"] = self.jobs[job_id]["process_linking_pairs"]
def genome_alignment_one_job(self, job_id):
jobbing = self.jobs[job_id]
job_dir = jobbing["processes_dirs"][4] # Directory for GenomeAlignment [4] outputs.
input_fastq_fasta = self.jobs[job_id]["process_linking_pairs"]
assert JobList.process_map[4].endswith("GenomeAlignment")
prefix = "genome_alignment_"
if jobbing["sequencing_method"] in ["paired", "paired_linking"] \
and (jobbing["pattern_umi1"] or jobbing["pattern_umi2"]):
self.jobs[job_id]["is_umi_extracted"] = True
elif jobbing["sequencing_method"] == "single" and jobbing["pattern_umi"]:
self.jobs[job_id]["is_umi_extracted"] = True
else:
self.jobs[job_id]["is_umi_extracted"] = False
available_memory = int(psutil.virtual_memory().available * 9 / 10)
if jobbing["sequencing_method"] in ["paired"]: # Note: only 'paired' here, not 'paired_linking'.
print(f"Genome alignment for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; " # Change the directory to the index directory
f"{shutil.which('STAR')} " # Define which star installation to use
f"--runThreadN {self.cpu} " # Define how many cores to use.
f"--genomeDir {self.index_directories['index_dna']} "
f"--readFilesIn {input_fastq_fasta[0]} {input_fastq_fasta[1]} "
# All parameters were inherited from Mati-Kai's pipeline.
"--outFilterMultimapNmax 1 "
"--peOverlapNbasesMin 6 "
"--peOverlapMMp 0.1 "
"--outFilterType BySJout "
"--alignIntronMin 5 "
f"--outFileNamePrefix {os.path.join(job_dir, prefix)} "
"--outReadsUnmapped Fastx "
"--outSAMtype BAM SortedByCoordinate "
f"--limitBAMsortRAM {available_memory} " # If not set, error can be raised for some cases.
"--outSAMattributes All XS "
"--quantMode GeneCounts "
"--twopassMode Basic "
"> report_genome_alignment.log"
), shell=True) # The specified command will be executed through the shell.
raw_bam = os.path.join(job_dir, prefix + "Aligned.sortedByCoord.out.bam")
if not jobbing["is_umi_extracted"]:
raw_sam = os.path.join(job_dir, prefix + "Aligned.sortedByCoord.out.sam")
run_and_check(f"cd {job_dir}; {shutil.which('/usr/bin/samtools')} view -h -o {raw_sam} {raw_bam}",
shell=True) # The specified command will be executed through the shell.
self.jobs[job_id]["process_genome_alignment"] = raw_sam
else:
after_sort = os.path.splitext(raw_bam)[0] + '_sorted.bam'
run_and_check(( # Sort bam file and Index
f"cd {job_dir}; "
f"{shutil.which('/usr/bin/samtools')} sort {raw_bam} -o {after_sort}; "
f"{shutil.which('/usr/bin/samtools')} index {after_sort}"
), shell=True) # The specified command will be executed through the shell.
# Deduplication of UMI
output_prefix = "umi-deduplicated"
print(f"UMI deduplication for {job_id} is now running.")
run_and_check(( # Run deduplication
f"cd {job_dir}; "
f"{shutil.which('umi_tools')} dedup " # Define which umi_tools installation to use
f"-I {after_sort} " # Input
f"--output-stats={output_prefix} "
f"--paired "
f"--unpaired-reads discard "
f"-S {output_prefix}.bam "
f"> report_{output_prefix}.log"
), shell=True) # The specified command will be executed through the shell.
run_and_check(( # Convert to sam file
f"cd {job_dir}; "
f"{shutil.which('/usr/bin/samtools')} view -h " # Define which samtools installation to use
f"-o {output_prefix}.sam {output_prefix}.bam" # Output & Input
), shell=True) # The specified command will be executed through the shell.
self.jobs[job_id]["process_genome_alignment"] = os.path.join(job_dir, f"{output_prefix}.sam")
elif jobbing["sequencing_method"] in ["single", "paired_linking"]:
print(f"Genome alignment for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; "
f"{shutil.which('STAR')} " # Define which star installation to use
f"--runThreadN {self.cpu} " # Define how many cores to use.
f"--genomeDir {self.index_directories['index_dna']} "
f"--readFilesIn {input_fastq_fasta} "
# All parameters were inherited from Mati-Kai's pipeline.
"--outFilterMultimapNmax 1 "
# "--peOverlapNbasesMin 6 " It is for paired end sequencing
# "--peOverlapMMp 0.1 " It is for paired end sequencing
"--outFilterType BySJout "
"--alignIntronMin 5 "
f"--outFileNamePrefix {os.path.join(job_dir, prefix)} "
"--outReadsUnmapped Fastx "
"--outSAMtype BAM SortedByCoordinate "
f"--limitBAMsortRAM {available_memory} " # If not set, error can be raised for some cases.
"--outSAMattributes All XS "
"--quantMode GeneCounts "
"--twopassMode Basic "
"> report_genome_alignment.log"
), shell=True) # The specified command will be executed through the shell.
raw_bam = os.path.join(job_dir, prefix + "Aligned.sortedByCoord.out.bam")
if not jobbing["is_umi_extracted"]:
raw_sam = os.path.join(job_dir, prefix + "Aligned.sortedByCoord.out.sam")
run_and_check(f"cd {job_dir}; {shutil.which('/usr/bin/samtools')} view -h -o {raw_sam} {raw_bam}",
shell=True) # The specified command will be executed through the shell.
self.jobs[job_id]["process_genome_alignment"] = raw_sam
else:
after_sort = os.path.splitext(raw_bam)[0] + '_sorted.bam'
run_and_check(( # Sort bam file and Index
f"cd {job_dir}; "
f"{shutil.which('/usr/bin/samtools')} sort {raw_bam} -o {after_sort}; "
f"{shutil.which('/usr/bin/samtools')} index {after_sort}"
), shell=True) # The specified command will be executed through the shell.
# Deduplication of UMI
output_prefix = "umi-deduplicated"
print(f"UMI deduplication for {job_id} is now running.")
run_and_check(( # Run deduplication
f"cd {job_dir}; "
f"{shutil.which('umi_tools')} dedup " # Define which umi_tools installation to use
f"-I {after_sort} " # Input
f"--output-stats={output_prefix} "
f"-S {output_prefix}.bam "
f"> report_{output_prefix}.log"
), shell=True)
run_and_check(( # Convert to sam file
f"cd {job_dir}; "
f"{shutil.which('/usr/bin/samtools')} view -h " # Define which samtools installation to use
f"-o {output_prefix}.sam {output_prefix}.bam" # Output & Input
), shell=True) # The specified command will be executed through the shell.
self.jobs[job_id]["process_genome_alignment"] = os.path.join(job_dir, f"{output_prefix}.sam")
def linking_pairs(self):
for job_id in self.jobs:
if 3 in self.jobs[job_id]["processes"]:
self.linking_pairs_one_job(job_id)
else:
self.jobs[job_id]["process_linking_pairs"] = self.jobs[job_id]["process_cleanup"]
def linking_pairs_one_job(self, job_id):
jobbing = self.jobs[job_id]
job_dir = jobbing["processes_dirs"][3] # Directory for LinkingPairs [3] outputs.
temp_fastq = self.jobs[job_id]["process_cleanup"]
assert JobList.process_map[3].endswith("LinkingPairs")
sam_path = os.path.join(job_dir, "prealignment.sam")
print(f"Prealignment to link pairs for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; "
f"{shutil.which('bowtie2')} " # Run Bowtie2 module
"-D 40 -R 6 -N 0 -L 15 -i S,1,0.50 " # Alignment effort and sensitivity. It is now very high.
# Add warning!
"-I20 -X120 " # Search only those that has 20-120 nt.
"--score-min G,20,5.5 " # Min score lowered
"--ma 3 --mp 5,1 " # ma bonus increased
"--no-unal " # To suppress the non-aligned reads
f"-p{self.cpu} " # Number of core to use
"--no-discordant " # Filter pairs does not obey orientation/length constraints
"--no-mixed " # Do not search for individual pairs if one in a pair does not align.
"--local " # Indicate the alignment is local. Soft clipping will be applied.
"--time " # Print the wall-clock time required to load the index files and align the reads.
f"-x {self.index_directories['index_cdna']} " # Index directory with the base name
"-q " # Specifies the inputs are in fastq format
f"-1 {temp_fastq[0]} " # Read 1
f"-2 {temp_fastq[1]} " # Read 2
f"-S {sam_path} " # Output sam file
"2> report_prealignment.log"
), shell=True) # The specified command will be executed through the shell.
# Take the transcript sequences into random access memory
fasta_transcriptome = self.org_db.get_db("cdna")
transcript_seqs = dict() # Create a dictionary with transcript id as keys
with open(fasta_transcriptome, "r") as handle: # Use previously filtered fasta file
for record in SeqIO.parse(handle, "fasta"):
transcript_seqs[record.id] = str(record.seq) # Sequence as the values
output_fasta = os.path.join(job_dir, "footprints.fasta")
# Processing the SAM file
with open(output_fasta, "w") as output_handle: # Open output fasta file
popup_dict = dict()
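# popup_dict maps a (UMI-stripped) read name to [reference_start of the forward mate, reference_end of the
# reverse mate]; an entry is completed, popped, and written out once both mates have been seen.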
with pysam.AlignmentFile(sam_path, "r") as sam_handle: # Open sam file
sam_iterator = sam_handle.fetch() # Get the iterator
for e in sam_iterator: # Iterate over the entries
# Check if the entry is mapped, paired, and pair is mapped
if not e.is_unmapped and e.is_paired and e.is_proper_pair and not e.mate_is_unmapped:
# Determine start and end positions, taking into account soft-clipping from the ends.
qn = re.search(r"^[^_]*", e.query_name).group() # Remove UMI info if it exists
is_entry_complete = False
if not e.is_reverse and qn not in popup_dict:
popup_dict[qn] = [e.reference_start, None]
elif not e.is_reverse: # and qn in popup_dict
popup_dict[qn][0] = e.reference_start
is_entry_complete = True
elif e.is_reverse and qn not in popup_dict:
popup_dict[qn] = [None, e.reference_end]
elif e.is_reverse: # and qn in popup_dict
popup_dict[qn][1] = e.reference_end
is_entry_complete = True
else:
raise Exception("Unexpected entry!")
# Print the result if the dict is fully completed.
if is_entry_complete:
start_seq, end_seq = popup_dict.pop(qn)
# Different reference_names for pairs is impossible
fp = transcript_seqs[e.reference_name][start_seq: end_seq]
# Write down the identifier and sequence to fasta file
output_handle.write(f">{e.query_name}\n{fp}\n")
self.jobs[job_id]["process_linking_pairs"] = output_fasta
def cleanup(self):
for job_id in self.jobs:
if 2 in self.jobs[job_id]["processes"]:
self.cleanup_one_job(job_id)
else:
self.jobs[job_id]["process_cleanup"] = self.jobs[job_id]["process_umitools"]
def cleanup_one_job(self, job_id):
jobbing = self.jobs[job_id]
job_dir = jobbing["processes_dirs"][2] # Directory for Cleanup [2] outputs.
temp_paths = self.jobs[job_id]["process_umitools"]
assert JobList.process_map[2].endswith("Cleanup")
if jobbing["sequencing_method"] in ["paired", "paired_linking"]:
output_path = os.path.join(job_dir, "Read%_norRNA.fastq")
print(f"rRNA removal for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; " # Change the directory to the index directory
f"{shutil.which('bowtie2')} " # Run Bowtie2 module
f"-p{self.cpu} " # Number of core to use
"--no-mixed " # Do not search for individual pairs if one in a pair does not align.
# Add warning!
# Delete below, no need to be biased!
"-I20 -X120 " # Default -I=0, -X=500. Since I will disregard X>120 and I<20 in the link-pairing module
"--time " # Print the wall-clock time required to load the index files and align the reads.
"--score-min G,20,6 --ma 4 " # Allow looser alignment. --ma 3
"--local --sensitive-local " # Apply soft clipping when aligning. Default sensitivity.
"-k1 " # We are not interested in best alignment as long as it aligns somewhere in the indexed fasta.
f"-x {self.index_directories['index_rrna']} " # Index directory with the base name
"-q " # Indicates the inputs are fastq
f"-1 {temp_paths[0]} " # Read 1
f"-2 {temp_paths[1]} " # Read 2
f"--un-conc {output_path} " # Output fastq files; contains all reads which did not align to rRNAs.
# f"--al-conc {os.path.join(job_dir, 'Read%_only_rRNA.fastq')} " # For testing purposes
"-S /dev/null " # Discard alignment sam file /dev/null
f"2> report_cleanup.log"
), shell=True) # The specified command will be executed through the shell.
self.jobs[job_id]["process_cleanup"] = [os.path.join(job_dir, j)
for j in ["Read1_norRNA.fastq", "Read2_norRNA.fastq"]]
elif jobbing["sequencing_method"] in ["single"]:
output_path = os.path.join(job_dir, "Read1_norRNA.fastq")
print(f"rRNA removal for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; " # Change the directory to the index directory
f"{shutil.which('bowtie2')} " # Run Bowtie2 module
f"-p{self.cpu} " # Number of core to use
"--time " # Print the wall-clock time required to load the index files and align the reads.
"--score-min G,20,6 --ma 4 " # Allow looser alignment. --ma 3
"--local --sensitive-local " # Apply soft clipping when aligning. Default sensitivity.
"-k1 " # We are not interested in best alignment as long as it aligns somewhere in the indexed fasta.
f"-x {self.index_directories['index_rrna']} " # Index directory with the base name
"-q " # Indicates the inputs are fastq
f"{temp_paths} " # Read 1
f"--un {output_path} " # Output fastq file; contains all reads which did not align to rRNAs.
# f"--al {os.path.join(job_dir, 'Read1_only_rRNA.fastq')} " # For testing purposes
"-S /dev/null " # Discard alignment sam file /dev/null
f"2> report_rnaremove.txt"
), shell=True) # The specified command will be executed through the shell.
self.jobs[job_id]["process_cleanup"] = output_path
def preprocessing(self):
# Run cutadapt sequentially for every job that includes the preprocessing step, then run umi_tools for those jobs in parallel.
job_with_preprocessing = [job_id for job_id in self.jobs if 1 in self.jobs[job_id]["processes"]]
for job_id in job_with_preprocessing:
self.preprocessing_cutadapt(job_id)
self.preprocessing_umitools_multiprocessing(job_with_preprocessing)
for job_id in [job_id for job_id in self.jobs if 1 not in self.jobs[job_id]["processes"]]:
self.jobs[job_id]["process_cutadapt"] = self.jobs[job_id]["input_fastq"]
self.jobs[job_id]["process_umitools"] = self.jobs[job_id]["input_fastq"]
def preprocessing_umitools_multiprocessing(self, job_id_list):
assert len(self.jobs) <= self.cpu
executor = multiprocessing.Pool(len(self.jobs))
result = executor.map(self.preprocessing_umitools, job_id_list)
executor.terminate()
executor.join()
for job_id, final_paths in zip(job_id_list, result):
self.jobs[job_id]["process_umitools"] = final_paths
def preprocessing_umitools(self, job_id):
jobbing = self.jobs[job_id]
job_dir = jobbing["processes_dirs"][1] # Directory for Preprocessing [1] outputs.
temp_paths = self.jobs[job_id]["process_cutadapt"]
assert JobList.process_map[1].endswith("Preprocessing")
if jobbing["sequencing_method"] in ["paired", "paired_linking"]:
pattern1, pattern2 = jobbing["pattern_umi1"], jobbing["pattern_umi2"]
if not pattern1 or not pattern2:
return temp_paths
final_paths = [f"{i}_no-adapt_umi-aware.fastq.gz" for i in ["read1", "read2"]]
final_paths = [os.path.join(job_dir, i) for i in final_paths]
print(f"Umitool for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; " # Change the directory to the index directory
f"{shutil.which('umi_tools')} extract " # Define which extract installation to use
"--extract-method=regex "
f"--bc-pattern='{pattern1}' " # Barcode pattern
f"--bc-pattern2='{pattern2}' " # Barcode pattern for the paired read
f"-I {temp_paths[0]} -S {final_paths[0]} " # Input and output for read 1
f"--read2-in={temp_paths[1]} --read2-out={final_paths[1]} " # Input and output for read 2
f"--log=umi_tools.log " # Log the results
f"> 'report_umitool.log'"
), shell=True) # The specified command will be executed through the shell.
return final_paths
elif jobbing["sequencing_method"] in ["single"]:
umitools_pattern = jobbing["pattern_umi"]
if not umitools_pattern:
return temp_paths
final_path = os.path.join(job_dir, "read1_no-adapt_umi-aware.fastq.gz")
print(f"Umitool for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; " # Change the directory to the index directory
f"{shutil.which('umi_tools')} extract " # Define which extract installation to use
"--extract-method=regex "
f"--bc-pattern='{umitools_pattern}' " # Barcode pattern
f"-I {temp_paths} -S {final_path} " # Input and output for read 1
f"--log=umi_tools.log " # Log the results
f"> 'report_umitool.log'"
), shell=True) # The specified command will be executed through the shell.
return final_path
def preprocessing_cutadapt(self, job_id):
jobbing = self.jobs[job_id]
job_dir = jobbing["processes_dirs"][1] # Directory for Preprocessing [1] outputs.
input_fastq = self.jobs[job_id]["input_fastq"]
assert JobList.process_map[1].endswith("Preprocessing")
if jobbing["sequencing_method"] in ["paired", "paired_linking"]:
# Outputs for the first run
temp_paths = [f"read1_cutadapt_temp.fastq.gz", f"read2_cutadapt_temp.fastq.gz"]
temp_paths = [os.path.join(job_dir, i) for i in temp_paths]
# Create flag if adapter is provided
read1_adapter = f"-a {jobbing['adapter1']}" if jobbing['adapter1'] else ""
read2_adapter = f"-A {jobbing['adapter2']}" if jobbing['adapter2'] else ""
print(f"Cutadapt for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; " # Change the directory to the index directory
f"{shutil.which('cutadapt')} " # Define which cutadapt installation to use
f"--cores={self.cpu} " # Define how many cores to use.
"--match-read-wildcards "
f"-q20 -m23 " # Settings for quality and minimum length to cut the adapter sequence
# todo: explain why: check the photo taken in 12 february
# "--discard-untrimmed "
# "-O6 " # Require minimum length overlap between read and adapter for an adapter to be found.
f"{read1_adapter} {read2_adapter}".strip() + " " # Adapter flags. No flanking white space allowed
f"-o {temp_paths[0]} -p {temp_paths[1]} " # Path to output trimmed sequences
f"{input_fastq[0]} {input_fastq[1]} " # Input file
f"1> 'report_cutadapt_temp.log'"
), shell=True) # The specified command will be executed through the shell.
self.jobs[job_id]["process_cutadapt"] = temp_paths
elif jobbing["sequencing_method"] in ["single"]:
temp_path = os.path.join(job_dir, "read1_cutadapt_temp.fastq.gz") # Outputs for the first run
print(f"Cutadapt for {job_id} is now running.")
run_and_check((
f"cd {job_dir}; " # Change the directory to the index directory
f"{shutil.which('cutadapt')} " # Define which cutadapt installation to use
f"--cores={self.cpu} " # Define how many cores to use.
"--match-read-wildcards "
f"-q20 -m23 " # Settings for quality and minimum length to cut the adapter sequence
# If we don't use the two flags below, the UMI tool definitely malfunctions.
"--discard-untrimmed "
"-O6 " # Require minimum length overlap between read and adapter for an adapter to be found.
f"-a {jobbing['adapter']} " # The adapter flags. No flanking white space allowed
f"-o {temp_path} " # Path to output trimmed sequences
f"{input_fastq} " # Input file
f"1> 'report_cutadapt_temp.log'"
), shell=True) # The specified command will be executed through the shell.
self.jobs[job_id]["process_cutadapt"] = temp_path
def create_output_tree(self):
dir_run_identifier = create_dir(self.data_repo_dir, self.run_identifier)
for job_id in self.jobs:
dir_job = create_dir(dir_run_identifier, job_id)
self.jobs[job_id]["processes_dirs"] = {process: create_dir(dir_job, JobList.process_map[process])
for process in self.jobs[job_id]["processes"]}
def create_index(self):
dir_base = create_dir(self.temp_repo_dir, self.organism)
dir_base_index = create_dir(dir_base, "index")
dir_cdna = create_dir(dir_base_index, "cdna")
dir_dna = create_dir(dir_base_index, "dna")
dir_rrna = create_dir(dir_base_index, "rrna")
index_name_cdna = f"{self.organism}_cdna"
index_name_rrna = f"{self.organism}_rrna"
if not self.create_index_search_metadata(dir_cdna):
print("Indexing for cDNA is being calculated.")
temp_cdna_fasta = self.org_db.get_db("cdna")
run_and_check((
f"cd {dir_cdna}; " # Change the directory to the index directory
f"{shutil.which('bowtie2-build')} " # Name of the function
f"--threads {self.cpu} " # Number of threads to be used.
f"{temp_cdna_fasta} " # Input reference fasta file
f"{index_name_cdna} " # The basename of the index files to write
f"> report_{self.organism}_cdna.log"
), shell=True) # The specified command will be executed through the shell.
metadata_index = get_files_metadata(dir_cdna) # Write the file info
joblib.dump(metadata_index, os.path.join(dir_cdna, ".metadata.joblib"))
if not self.create_index_search_metadata(dir_rrna):
print("Indexing for rRNA is being calculated.")
temp_rrna_fasta = self.org_db.get_db("rrna")
run_and_check((
f"cd {dir_rrna}; " # Change the directory to the index directory
f"{shutil.which('bowtie2-build')} " # Name of the function
f"--threads {self.cpu} " # Number of threads to be used.
f"{temp_rrna_fasta} " # Input reference fasta file
f"{index_name_rrna} " # The basename of the index files to write
f"> report_{self.organism}_rrna.log"
), shell=True) # The specified command will be executed through the shell.
metadata_index = get_files_metadata(dir_rrna) # Write the file info
joblib.dump(metadata_index, os.path.join(dir_rrna, ".metadata.joblib"))
if not self.create_index_search_metadata(dir_dna):
# Manual says: "In most cases, the default value of 100 will work as well as the ideal value."
read_length_minus_1 = 100
print("Indexing for DNA is being calculated.")
temp_dna_fasta = self.org_db.get_db("dna")
temp_gtf_fasta = self.org_db.get_db("gtf")
# Calculate genomeSAindexNbases
genome_size = sum([len(rec.seq) for rec in SeqIO.parse(temp_dna_fasta, "fasta")])
sa_index_parameter = min(14, math.floor(math.log(genome_size, 2) / 2 - 1))
# For small genomes, this parameter has to be scaled down.
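# Worked example of the formula above (approximate figures, for illustration only): for an E. coli genome of
# ~4.6 Mb, log2(4.6e6) ≈ 22.1, so min(14, floor(22.1 / 2 - 1)) = 10; for the human genome (~3.1 Gb) the same
# formula gives 14, i.e. the STAR default.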
run_and_check((
f"cd {dir_dna}; " # Change the directory to the index directory
f"{shutil.which('STAR')} " # Define which star installation to use
f"--runThreadN {self.cpu} " # Define how many cores to use.
"--runMode genomeGenerate "
f"--genomeDir {dir_dna} " # Directory to save the files
f"--genomeSAindexNbases {sa_index_parameter} "
f"--genomeFastaFiles {temp_dna_fasta} " # Specifies FASTA file with the genome reference sequences
f"--sjdbGTFfile {temp_gtf_fasta} " # Specifies the file with annotated transcripts in the GTF format
f"--sjdbOverhang {read_length_minus_1} " # The length of the sequence around the annotated junction
f"> report_{self.organism}_dna.log"
), shell=True) # The specified command will be executed through the shell.
metadata_index = get_files_metadata(dir_dna) # Write the file info
joblib.dump(metadata_index, os.path.join(dir_dna, ".metadata.joblib"))
return {"index_cdna": os.path.join(dir_cdna, index_name_cdna),
"index_rrna": os.path.join(dir_rrna, index_name_rrna),
"index_dna": dir_dna}
@staticmethod
def create_index_search_metadata(dir_path):
# Check whether the index files already exist and have not changed since they were created.
try:
index_metadata_path = os.path.join(dir_path, ".metadata.joblib")
assert os.path.isfile(index_metadata_path) and os.access(index_metadata_path, os.R_OK)
metadata_index_previously = joblib.load(index_metadata_path)
metadata_index = get_files_metadata(dir_path)
assert metadata_index_previously == metadata_index
return True
except AssertionError:
return False
class OrganismDatabase:
ID_MAP = {"homo_sapiens": 9606,
"mus_musculus": 10090,
"saccharomyces_cerevisiae": 4932,
"escherichia_coli": 562}
def __init__(self, organism, ensembl_release, temp_repo_dir):
assert organism in ["homo_sapiens", "mus_musculus", "saccharomyces_cerevisiae", "escherichia_coli"]
self.ensembl_release = ensembl_release
self.organism = organism
self.organism_id = OrganismDatabase.ID_MAP[self.organism]
self.temp_repo_dir = temp_repo_dir
self.repository = create_dir(self.temp_repo_dir, self.organism)
if self.organism != "escherichia_coli":
assert 90 <= self.ensembl_release <= 104, "Ensembl Release must be between 90 and 104."
base_temp = f"ftp://ftp.ensembl.org/pub/release-{ensembl_release}"
else:
assert self.ensembl_release == 48, "Ensembl Release must be '48' for escherichia_coli."
base_temp = f"ftp://ftp.ensemblgenomes.org/pub/release-{ensembl_release}/bacteria"
if self.organism == "homo_sapiens":
# Genome GTF
gtf_temp = f"gtf/homo_sapiens/Homo_sapiens.GRCh38.{self.ensembl_release}.chr_patch_hapl_scaff.gtf.gz"
self.gtf = os.path.join(base_temp, gtf_temp)
# Genome GFF3
gff3_temp = f"gff3/homo_sapiens/Homo_sapiens.GRCh38.{self.ensembl_release}.chr_patch_hapl_scaff.gff3.gz"
self.gff3 = os.path.join(base_temp, gff3_temp)
# Genome DNA fasta
dna_temp = "fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz"
self.dna = os.path.join(base_temp, dna_temp)
# Transcriptome DNA fasta
cdna_temp = "fasta/homo_sapiens/cdna/Homo_sapiens.GRCh38.cdna.all.fa.gz"
self.cdna = os.path.join(base_temp, cdna_temp)
# Protein Fasta
pep_temp = "fasta/homo_sapiens/pep/Homo_sapiens.GRCh38.pep.all.fa.gz"
self.pep = os.path.join(base_temp, pep_temp)
# Gerp Conservation score
gerp_temp = "compara/conservation_scores/111_mammals.gerp_conservation_score/" \
"gerp_conservation_scores.homo_sapiens.GRCh38.bw"
self.gerp = os.path.join(base_temp, gerp_temp)
elif self.organism == "mus_musculus":
# Genome GTF
gtf_temp = f"gtf/mus_musculus/Mus_musculus.GRCm38.{self.ensembl_release}.chr_patch_hapl_scaff.gtf.gz"
self.gtf = os.path.join(base_temp, gtf_temp)
# Genome GFF3
gff3_temp = f"gff3/mus_musculus/Mus_musculus.GRCm38.{self.ensembl_release}.chr_patch_hapl_scaff.gff3.gz"
self.gff3 = os.path.join(base_temp, gff3_temp)
# Genome DNA fasta
dna_temp = "fasta/mus_musculus/dna/Mus_musculus.GRCm38.dna.primary_assembly.fa.gz"
self.dna = os.path.join(base_temp, dna_temp)
# Transcriptome DNA fasta
cdna_temp = "fasta/mus_musculus/cdna/Mus_musculus.GRCm38.cdna.all.fa.gz"
self.cdna = os.path.join(base_temp, cdna_temp)
# Protein Fasta
pep_temp = "fasta/mus_musculus/pep/Mus_musculus.GRCm38.pep.all.fa.gz"
self.pep = os.path.join(base_temp, pep_temp)
# Gerp Conservation score
gerp_temp = "compara/conservation_scores/111_mammals.gerp_conservation_score/" \
"gerp_conservation_scores.mus_musculus.GRCm38.bw"
self.gerp = os.path.join(base_temp, gerp_temp)
elif self.organism == "saccharomyces_cerevisiae":