forked from yishi-lab/AugurLlama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPeptideManager.py
287 lines (225 loc) · 10.3 KB
/
PeptideManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
from Base import *
from Experiment import Experiemnt
from Peptide import Peptide,PeptideMatching,PeptideFragments
from Config import Param
class PeptideManager(object):
def __init__(self):
'''
Initializer for peptide manager
It contains the peptides collection and a cleavage manager
'''
self.peptides = dict()
def load_peptides_from_pd(self,experiment:Experiemnt):
'''
Load peptide identification result from proteome discover
Apply some filters on identification results
Fill survived peptides into peptides collection with key information reserved
:param fpath:
:return:
'''
data = pd.read_excel(experiment.peptide_identification_file)
# ignore post-translational modifications
# turn sequence into upper case
# remove 'Rejected' PSMs
# remove duplicate sequences
data['Sequence'] = data['Annotated Sequence'].apply(lambda seq: seq.upper())
data['PSM Ambiguity'] = data[data['PSM Ambiguity'] != 'Rejected']
data = data.sort_values(['Sequence', 'Charge', 'Isolation Interference [%]']).drop_duplicates(['Annotated Sequence','Charge'], 'first')
for index,row in data.iterrows():
_sequence = row['Sequence']
_mz = round(float(row['m/z [Da]']),4)
_charge = int(row['Charge'])
_retention_time = round(float(row['RT [min]']),4)
_PEP = float(row['Percolator PEP'])
_qvalue = float(row['Percolator q-Value'])
if _sequence in self.peptides.keys():
self.peptides[_sequence].additional.append(Peptide(sequence = _sequence,mz = _mz,charge = _charge, retention_time = _retention_time, PEP = _PEP,qvalue=_qvalue,index=experiment.index))
else:
self.peptides[_sequence]=Peptide(sequence = _sequence,mz = _mz,charge = _charge, retention_time = _retention_time, PEP = _PEP,qvalue=_qvalue,index=experiment.index)
def load_peptide_fragments_annotation(self,experiment:Experiemnt):
dpath = experiment.fragment_spectra_folder
fragments_dict=dict()
for file in os.listdir(dpath):
try:
if not ".txt" in file:
continue
fullpath = os.path.join(dpath,file)
_seq = file.split("_")[0]
if not _seq in self.peptides.keys():
continue
_fragments = np.zeros(len(_seq),int)
data = pd.read_csv(fullpath, sep="\t")
data = data[~data['matches'].isnull()]
for hit in data['matches']:
if 'NH3' in hit or 'H2O' in hit:
continue
# in case there are multiple matches sharing same m/z
# example: y (2) (2+), b (10) (3+)
# we will take the first one
# example: y (2) (2+)
ion = hit.split(',')[0]
# remove information about charge/ loss of water/amonia
# example: ["y","(2)"]
# [0]ion type [1]fragmented postion
ion = ion.strip().split(' ')[:2]
if ion[0] == 'b':
pos = int(ion[1][1:-1])
_fragments[pos - 1] = 1
elif ion[0] == 'y':
pos = int(ion[1][1:-1])
_fragments[len(_seq) - pos] = 1
else:
continue
if _seq in fragments_dict.keys():
merged_fragments = _fragments | fragments_dict[_seq]
fragments_dict[_seq] = merged_fragments
else:
fragments_dict[_seq] = _fragments
except:
raise IOError("Error loading fragment file:{}".format(os.path.join(dpath,file)))
for _seq,_fragments in fragments_dict.items():
self.peptides[_seq].annotate_fragment_ions(PeptideFragments(_fragments))
# def load_peptide_fragments_annotation(self,experiment:Experiemnt):
# '''
# Provided the directory path containing spectra ion match information generated from proteome discover
# parse peptides fragments information
# :param dpath:
# :return:
# '''
# dpath = experiment.fragment_spectra_folder
# for peptide in self.peptides.values():
# _seq = peptide.sequence
# _fragments=[ 0 for i in range(len(_seq))]
#
# #
# # Merge fragments distribution for the identical peptide
# #
# files = [os.path.join(dpath,file) for file in os.listdir(dpath) if _seq == file.split("_")[0]]
# for file in files:
# data = pd.read_csv(file,sep="\t")
# data = data[~data['matches'].isnull()]
# for hit in data['matches']:
# #
# # in case there are multiple matches sharing same m/z
# # example: y (2) (2+), b (10) (3+)
#
# # we will take the first one
# # example: y (2) (2+)
# ion = hit.split(',')[0]
#
# # remove information about charge/ loss of water/amonia
# # example: ["y","(2)"]
# # [0]ion type [1]fragmented postion
# ion = ion.strip().split(' ')[:2]
#
# if ion[0] == 'b':
# pos = int(ion[1][1:-1])
# _fragments[pos - 1] = 1
# elif ion[0] == 'y':
# pos = int(ion[1][1:-1])
# _fragments[len(_seq) - pos] = 1
# else:
# continue
# self.peptides[_seq].annotate_fragment_ions(_fragments)
def get_peptide_fragmentation(self,peptide_sequence:str):
if peptide_sequence not in self.peptides.keys():
warnings.warn("{} not included,skipped")
return None
return self.peptides[peptide_sequence].fragments
def find_parent_protein_id_for_peptides(self,db_file_path:str):
'''
Provided with entire protein database
In-silico digest each protein and
find out proteins that are relevant to peptides that are identified
:param db_file_path:
:return:
'''
index_file_path = CleavageManager.get_peptides_index_file(db_file_path)
db_entries = 0
for record in SeqIO.parse(index_file_path,'fasta'):
db_entries=+1
_id = str(record.name)
_peptides = set(str(record.seq).split("#"))
# find whether there is any peptide that is identified comes from current protein
_peptides_set = self.peptides.keys() & _peptides
for peptide in _peptides_set:
self.peptides[peptide].add_protein_id(_id)
'''
Remove peptides that can be mapped 20% sequences in database
'''
'''
for _peptide in list(self.peptides.keys()):
if len(self.peptides[_peptide].proteins_ids) > 5/db_entries:
self.peptides.pop(_peptide)
'''
def output_peptides_information(self):
output = Param.TASK_NAME + "_peptide_information.csv"
with open(output,'w+') as out:
out.write("peptide,m/z,charge,retention time,PEP,q-value,num(Proteins)\n")
for peptide in self.peptides.values():
out.write(str(peptide))
class CleavageManager(object):
@staticmethod
def verify_index(db_file):
dirname = os.path.dirname(db_file)
basename = os.path.basename(db_file)
index_file = os.path.join(dirname, "_".join(
[os.path.splitext(basename)[0], Param.ENZYME, str(Param.MISS_CLEAVAGE_ALLOWED)]) + ".fasta")
if not verify_file_path(os.path.join(dirname,index_file)):
print("Peptide index for {} does not exist, build index now".format(db_file))
CleavageManager.build_index(db_file,index_file)
print("Finished building index. Location:{}".format(index_file))
@staticmethod
def get_peptides_index_file(db_file):
dirname = os.path.dirname(db_file)
basename = os.path.basename(db_file)
index_file = os.path.join(dirname,"_".join([os.path.splitext(basename)[0], Param.ENZYME, str(Param.MISS_CLEAVAGE_ALLOWED)])+".fasta")
return index_file
@staticmethod
def build_index(db_file:str,index_file:str):
tmp_file = index_file+".tmp"
index_writer = open(tmp_file,"w+")
for record in SeqIO.parse(db_file,'fasta'):
_id = str(record.id)
_seq = str(record.seq)
_peptides = CleavageManager.cleave(_seq)
index_writer.write(">"+_id+"\n")
index_writer.write("#".join(_peptides)+"\n")
index_writer.close()
os.rename(tmp_file,index_file)
@staticmethod
def set_miscleavage(ml:int):
'''
Override the default setting of miscleavages
:param ml:
:return:
'''
if ml <0 :
warnings.warn("miss cleavage cannot be zero, nothing changes")
return False
MISS_CLEAVAGE_ALLOWED = ml
@staticmethod
def cleave(sequence:str):
'''
Cleave sequences into peptides
:param sequence:
:return peptides set:
'''
peptides = set()
cleavage_pos = [0]
for match in re.finditer(Param.CLEAVAGE_RULES[Param.ENZYME], sequence):
cleavage_pos.append(match.span(0)[1])
cleavage_pos.append(len(sequence))
# get cleaved peptides
for miscleavage in range(Param.MISS_CLEAVAGE_ALLOWED + 1):
for i in range(len(cleavage_pos) - 1 - miscleavage):
_start = cleavage_pos[i]
_end = cleavage_pos[i + 1 + miscleavage]
if _end - _start <4:
continue
_pep = sequence[_start:_end]
# _mass = mass.calculate_mass(sequence=_pep)
# if _mass < self.__min_mass or _mass > self.__max_mass:
# continue
peptides.add(_pep)
return peptides