forked from lmmpf/PyAutoFEP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsavestate_util.py
397 lines (340 loc) · 17.3 KB
/
savestate_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
#! /usr/bin/env python3
#
# savestate_util.py
#
# Copyright 2018 Luan Carvalho Martins <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
import pickle
import merge_topologies
from all_classes import Namespace
import os
import os_util
from time import strftime
from shutil import copy2, copytree, rmtree
class SavableState(Namespace):
""" A class which also behaves like a dict and can load and save to pickle
Data structure = {
mcs_dict: {
frozenset([smiles_a, smiles_b]): mcs,
},
ligands_data: {
molecule_name: {
'molecule': rdkit.Chem.Mol,
'topology': [top_file_a, top_file_b], # GROMACS-compatible topology files
'image': {
'2d_hs': str # SVG data of 2D depiction with Hs
'2d_nohs': str # SVG data of 2D depiction without Hs
'perturbations': {
molecule_b_name: {
common_core_smarts: {
'2d_hs': svg_data_hs, # 2D SVG, with Hs, aligned to core, atoms highlighted
'2d_nohs': svg_data_nohs # 2D SVG, no Hs, aligned to core, atoms highlighted
}
}
}
}
},
},
'superimpose_data': {
'reference_pose_path': path_to_ref_pose_file,
'ligand_dict': {
molecule_name: rdkit.Chem.Mol, # Molecule aligned to path_to_ref_pose_file
}
},
'thermograph': {
'run_%d%m%Y_%H%M%S': {
'runtype': runtype, # One of: ['optimal', 'star', 'wheel']
'bias': molecule_name # Map was biased towards a molecule, if any
'input_molecules': {
molecule_name: { 'molecule': rdkit.Chem.Mol }
}
'best_solution': networkx.Graph # Graph of the best solution found
'optimization_data': all_classes.AntSolver # Containing data of every optimization step
}
}
}
"""
def __init__(self, input_file='', verbosity=0):
""" Init SavableState by reading a pickle file, or doing nothing. If no input_file is given, a default name will
be generated
:param str input_file: save result to this file
:param int verbosity: control verbosity level
:rtype dict
"""
super().__init__()
if input_file:
saved_data = self._read_data(input_file)
for key, value in saved_data.items():
setattr(self, key, value)
try:
if self.data_file != input_file and verbosity >= 0:
# User moved/renamed progress file or something wired happened
os_util.local_print('Progress file {} claims to be generated as file {}'
''.format(input_file, self.data_file), current_verbosity=verbosity,
msg_verbosity=os_util.verbosity_level.warning)
self.data_file = input_file
except AttributeError:
os_util.local_print('Progress file {} does not contain data_file data. Is it a progress file?'
''.format(input_file, self.data_file),
msg_verbosity=os_util.verbosity_level.error, current_verbosity=verbosity)
raise ValueError('Invalid progress file')
else:
# User did not supplied a name, generate one
from time import strftime
self.data_file = 'savedata_{}.pkl'.format(strftime('%d%m%Y_%H%M%S'))
def _read_data(self, input_file):
""" Reads a pickle file, returns its object or None on fail
:param str input_file: save result to this file
:rtype: dict
"""
try:
with open(input_file, 'rb') as file_handler:
raw_data = pickle.load(file_handler)
except FileNotFoundError:
# input_file does not exist, create file and save data to it
self.data_file = input_file
with open(input_file, 'wb') as file_handler:
pickle.dump(self.__dict__, file_handler)
return self.__dict__
else:
return raw_data
def save_data(self, output_file='', verbosity=0):
""" Save state to a pickle file
:param str output_file: save result to this file
:param int verbosity: controls verbosity level
:rtype: bool
"""
if output_file != '':
self.data_file = output_file
try:
with open('.temp_pickle_test.pkl', 'wb') as file_handler:
pickle.dump(self.__dict__, file_handler)
os.fsync(file_handler.fileno())
except (IOError, FileNotFoundError):
os_util.local_print('Could not save data to {}'.format(self.data_file), current_verbosity=verbosity,
msg_verbosity=os_util.verbosity_level.error)
raise SystemExit(1)
else:
try:
os.replace('.temp_pickle_test.pkl', self.data_file)
except FileNotFoundError as error:
os_util.local_print('Failed to save progress data to file {}'.format(self.data_file),
current_verbosity=verbosity,
msg_verbosity=os_util.verbosity_level.error)
raise FileNotFoundError(error)
os_util.local_print('Saved data to {}'.format(self.data_file), current_verbosity=verbosity,
msg_verbosity=os_util.verbosity_level.debug)
return True
@property
def __dict__(self):
return dict(self)
def update_mol_image(self, mol_name, save=False, verbosity=0):
""" Generate mol images, if needed
:param str mol_name: name of the molecule to be updated
:param bool save: automatically save data
:param int verbosity: controls verbosity level
"""
import rdkit.Chem
from rdkit.Chem.Draw import MolDraw2DSVG
from rdkit.Chem.AllChem import Compute2DCoords
try:
this_mol = self.ligands_data[mol_name]['molecule']
except KeyError:
os_util.local_print('Molecule name {} not found in the ligands data. Cannot draw it to a 2D svg'
''.format(mol_name),
msg_verbosity=os_util.verbosity_level.warning, current_verbosity=verbosity)
return False
self.ligands_data[mol_name].setdefault('images', {})
if not {'2d_hs', '2d_nohs'}.issubset(self.ligands_data[mol_name]['images']):
draw_2d_svg = MolDraw2DSVG(300, 300)
temp_mol = rdkit.Chem.Mol(this_mol)
Compute2DCoords(temp_mol)
draw_2d_svg.drawOptions().addStereoAnnotation=True
draw_2d_svg.DrawMolecule(temp_mol)
draw_2d_svg.FinishDrawing()
svg_data_hs = draw_2d_svg.GetDrawingText()
temp_mol = rdkit.Chem.RemoveHs(temp_mol)
Compute2DCoords(temp_mol)
draw_2d_svg = MolDraw2DSVG(300, 300)
try:
draw_2d_svg.DrawMolecule(temp_mol)
except RuntimeError:
os_util.local_print('Removing hydrogens of {} would break chirality. I will no generate a '
'representation without Hs'.format(mol_name),
msg_verbosity=os_util.verbosity_level.debug, current_verbosity=verbosity)
svg_data_no_hs = svg_data_hs
else:
draw_2d_svg.FinishDrawing()
svg_data_no_hs = draw_2d_svg.GetDrawingText()
self.ligands_data[mol_name]['images'].update({'2d_hs': svg_data_hs, '2d_nohs': svg_data_no_hs})
if save:
self.save_data()
def update_pertubation_image(self, mol_a_name, mol_b_name, core_smarts=None, save=False, verbosity=0, **kwargs):
""" Generate mol images describing a perturbation between the ligand pair
:param str mol_a_name: name of the molecule A
:param str mol_b_name: name of the molecule B
:param str core_smarts: use this smarts as common core
:param bool save: automatically save data
:param int verbosity: controls verbosity level
"""
self.ligands_data[mol_a_name].setdefault('images', {})
self.ligands_data[mol_a_name]['images'].setdefault('perturbations', {})
self.ligands_data[mol_b_name].setdefault('images', {})
self.ligands_data[mol_b_name]['images'].setdefault('perturbations', {})
import rdkit.Chem
this_mol_a = rdkit.Chem.Mol(self.ligands_data[mol_a_name]['molecule'])
this_mol_b = rdkit.Chem.Mol(self.ligands_data[mol_b_name]['molecule'])
if core_smarts is None:
# Get core_smarts using find_mcs
from merge_topologies import find_mcs
this_mol_a.RemoveAllConformers()
this_mol_b.RemoveAllConformers()
core_smarts = find_mcs([this_mol_a, this_mol_b], savestate=self, verbosity=verbosity, **kwargs).smartsString
try:
# Test whether the correct data structure is already present
assert len(self.ligands_data[mol_a_name]['images']['perturbations'][mol_b_name][core_smarts]) > 0
assert len(self.ligands_data[mol_b_name]['images']['perturbations'][mol_a_name][core_smarts]) > 0
except (KeyError, AssertionError):
# It isn't, go on and create the images
os_util.local_print('Perturbation images for molecules {} and {} with common core "{}" were not found. '
'Generating it.'.format(mol_a_name, mol_b_name, core_smarts),
msg_verbosity=os_util.verbosity_level.debug, current_verbosity=verbosity)
else:
return None
from rdkit.Chem.Draw import MolDraw2DSVG
from rdkit.Chem.AllChem import Compute2DCoords, GenerateDepictionMatching2DStructure
core_mol = rdkit.Chem.MolFromSmarts(core_smarts)
# print(core_smarts)
core_mol.UpdatePropertyCache()
Compute2DCoords(core_mol)
for each_name, each_mol, other_mol in zip([mol_a_name, mol_b_name],
[this_mol_a, this_mol_b],
[mol_b_name, mol_a_name]):
GenerateDepictionMatching2DStructure(each_mol, core_mol, acceptFailure=True)
# Draw mol with hydrogens
draw_2d_svg = MolDraw2DSVG(300, 150)
draw_2d_svg.drawOptions().addStereoAnnotation = True
common_atoms = merge_topologies.get_substruct_matches_fallback(each_mol, core_mol, die_on_error=False,
verbosity=verbosity)
if not common_atoms:
draw_2d_svg.DrawMolecule(each_mol, legend=each_name)
draw_2d_svg.FinishDrawing()
svg_data_hs = draw_2d_svg.GetDrawingText()
else:
not_common_atoms = [i.GetIdx() for i in each_mol.GetAtoms() if i.GetIdx() not in common_atoms]
draw_2d_svg.DrawMolecule(each_mol, legend=each_name, highlightAtoms=not_common_atoms)
draw_2d_svg.FinishDrawing()
svg_data_hs = draw_2d_svg.GetDrawingText()
# Draw mol without hydrogens
draw_2d_svg = MolDraw2DSVG(300, 150)
draw_2d_svg.drawOptions().addStereoAnnotation = True
each_mol = rdkit.Chem.RemoveHs(each_mol)
common_atoms = merge_topologies.get_substruct_matches_fallback(each_mol,
rdkit.Chem.RemoveHs(core_mol),
die_on_error=False,
verbosity=verbosity)
if not common_atoms:
draw_2d_svg.DrawMolecule(each_mol, legend=each_name)
draw_2d_svg.FinishDrawing()
svg_data_nohs = draw_2d_svg.GetDrawingText()
else:
not_common_atoms = [i.GetIdx() for i in each_mol.GetAtoms() if i.GetIdx() not in common_atoms]
draw_2d_svg.DrawMolecule(each_mol, legend=each_name, highlightAtoms=not_common_atoms)
draw_2d_svg.FinishDrawing()
svg_data_nohs = draw_2d_svg.GetDrawingText()
perturbation_imgs = self.ligands_data[each_name]['images']['perturbations']
perturbation_imgs.setdefault(other_mol, {})[core_smarts] = {'2d_hs': svg_data_hs, '2d_nohs': svg_data_nohs}
if save:
self.save_data()
class UserStorageDirectory:
def __init__(self, path='', verbosity=0):
""" Constructs a UserStorageDirectory object
:param str path: use this dir, default: $XDG_CONFIG_HOME/fep_automate
:param int verbosity: verbosity level
"""
if not path:
try:
self.path = os.environ['XDG_CONFIG_HOME']
except KeyError:
try:
self.path = os.path.join(os.environ['HOME'], '.config')
except KeyError:
# Is this unix?
self.path = os.path.join(os.curdir, '.config')
os_util.local_print('You seem to be running on a non-UNIX system (or there are issues in your '
'environment). Trying to go on, but you may experience errors.',
msg_verbosity=os_util.verbosity_level.warning, current_verbosity=verbosity)
else:
self.path = path
self.path = os.path.join(self.path, 'fep_automate')
try:
os.mkdir(self.path)
except FileExistsError:
pass
def create_file(self, file_name, contents, verbosity=0):
""" Create file in storage dir
:param str file_name: name of file to be created
:param [str, bytes] contents: save this data to file
:param int verbosity: verbosity level
:rtype: bool
"""
if isinstance(contents, str):
mode = 'w'
elif isinstance(contents, bytes):
mode = 'wb'
else:
raise TypeError('str or bytes expected, got {} instead'.format(type(contents)))
with open(os.path.join(self.path, file_name), mode=mode) as fh:
fh.write(contents)
# Save a backup of the file with and timestamp
backup_name = os.path.join(self.path, '{}_{}{}'.format(os.path.basename(file_name), strftime('%H%M%S_%d%m%Y'),
os.path.splitext(file_name)))
copy2(os.path.join(self.path, file_name), backup_name)
return True
def store_file(self, source, dest_file='', verbosity=0):
""" Copy file or dir to storage dir
:param str source: file to be copied
:param str dest_file: new file name, default: use source_file name
:param int verbosity: verbosity level
:rtype: bool
"""
if not dest_file:
dest_file = os.path.basename(source)
if dest_file == '':
# source is a directory
dest_file = source.split(os.sep)[-1]
if dest_file in ['', '.']:
os_util.local_print('Could not get a name from {} and dest_file was not supplied. Cannot go continue.'
''.format(source),
msg_verbosity=os_util.verbosity_level.error, current_verbosity=verbosity)
raise ValueError('invalid source name')
backup_name = os.path.join(self.path, '{}_{}{}'.format(os.path.basename(dest_file), strftime('%H%M%S_%d%m%Y'),
os.path.splitext(dest_file)))
try:
copy2(source, os.path.join(self.path, dest_file))
copy2(source, backup_name)
except IsADirectoryError:
try:
copytree(source, os.path.join(self.path, dest_file))
except FileExistsError:
rmtree(os.path.join(self.path, dest_file))
copytree(source, os.path.join(self.path, dest_file))
finally:
copytree(source, os.path.join(self.path, backup_name))
return True