forked from chm052/gene_topology
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlib.py
123 lines (90 loc) · 3.13 KB
/
lib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
INDEX_G1 = 1
INDEX_G2 = 3
INDEX_SCORE = 5
INDEX_EXTRA_DATA = 6
def read(filename, skip_header=True):
f = open(filename)
if skip_header:
f.readline()
return f
def get_seed_list(seed_file):
seed = []
for line in seed_file:
seed.append(line.strip())
return seed
def relevant_score(score):
return float(score) != 0
def find_networks(f):
# initialise empty set of sets
# each set represents a connected network
networks = []
for line in f:
list_line = line.split("\t")
g1 = list_line[INDEX_G1]
g2 = list_line[INDEX_G2]
score = list_line[INDEX_SCORE]
# if the nodes are connected
if relevant_score(score):
g1_index = -1
g2_index = -1
g1_network = []
g2_network = []
for index, network in enumerate(networks):
if g1 in network:
g1_index = index
g1_network = network
if g2 in network:
g2_index = index
g2_network = network
if g1_index != -1 and g2_index != -1:
break
# if new network
if g1_index == -1 and g2_index == -1:
networks.append({g1, g2})
elif g1_index != -1 and g2_index != -1:
# if 2 different current networks, union those networks
if g1_index != g2_index:
networks[g1_index].update(networks[g2_index])
del networks[g2_index]
# otherwise ignore - they are already recorded
# if 1 current network
elif g1_index != -1:
g1_network.add(g2)
elif g2_index != -1:
g2_network.add(g1)
else:
raise Exception()
return networks
def filter_by_seed(networks, seed):
seeded_networks = []
for network in networks:
# if there is a seed in the network
if len(network.intersection(seed)) > 0:
seeded_networks.append(network)
# otherwise, ignore it
return seeded_networks
def get_data_for_seeded_networks(seeded_networks, f, test):
f.seek(0)
f.readline() # skip header
results = []
# open the output files for writing
out_files = [open("output/gene_topology_output_network_" + str(index), "w")
for index in range(len(seeded_networks))]
for line in f:
list_line = line.split("\t")
g1 = list_line[INDEX_G1]
g2 = list_line[INDEX_G2]
score = list_line[INDEX_SCORE]
extra = list_line[INDEX_EXTRA_DATA:]
if relevant_score(score):
for index, seeded_network in enumerate(seeded_networks):
if g1 in seeded_network: # g2 must also be in the network
row = [g1, g2, score]
row.extend(extra)
results.append(row)
if not test:
out_files[index].write("\t".join(row) + "\n")
break
for out_file in out_files:
out_file.close()
return results