Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
dkoslicki committed Apr 10, 2024
2 parents 307cd57 + a06161b commit 6aa4233
Show file tree
Hide file tree
Showing 34 changed files with 1,198 additions and 675 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ on:
workflow_dispatch:

push:
branches: [ master, production, itrb-test ]
branches: [ master, production, itrb-test, dev ]
paths:
- 'code/**'
- 'DockerBuild/**'
- 'requirements.txt'
- '.github/workflows/pytest.yml'
pull_request:
branches: [ master, production, itrb-test ]
branches: [ master, production, itrb-test, dev ]
paths:
- 'code/**'
- 'DockerBuild/**'
Expand Down
340 changes: 119 additions & 221 deletions code/ARAX/ARAXQuery/ARAX_connect.py

Large diffs are not rendered by default.

35 changes: 30 additions & 5 deletions code/ARAX/ARAXQuery/ARAX_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def query_return_stream(self, query, mode='ARAX'):
while i_message < n_messages:
with self.lock:
i_message_obj = self.response.messages[i_message].copy()
yield(json.dumps(i_message_obj) + "\n")
yield(json.dumps(i_message_obj, allow_nan=False) + "\n")
i_message += 1
idle_ticks = 0.0

Expand All @@ -147,7 +147,7 @@ def query_return_stream(self, query, mode='ARAX'):
query_plan_counter = self_query_plan_counter
with self.lock:
self_response_query_plan = self.response.query_plan.copy()
yield(json.dumps(self_response_query_plan, sort_keys=True) + "\n")
yield(json.dumps(self_response_query_plan, allow_nan=False, sort_keys=True) + "\n")
idle_ticks = 0.0
time.sleep(0.2)
idle_ticks += 0.2
Expand All @@ -161,14 +161,14 @@ def query_return_stream(self, query, mode='ARAX'):
# #### If there are any more logging messages in the queue, send them first
n_messages = len(self.response.messages)
while i_message < n_messages:
yield(json.dumps(self.response.messages[i_message]) + "\n")
yield(json.dumps(self.response.messages[i_message], allow_nan=False) + "\n")
i_message += 1

#### Also emit any updates to the query_plan
self_response_query_plan_counter = self.response.query_plan['counter']
if query_plan_counter < self_response_query_plan_counter:
query_plan_counter = self_response_query_plan_counter
yield(json.dumps(self.response.query_plan, sort_keys=True) + "\n")
yield(json.dumps(self.response.query_plan, allow_nan=False, sort_keys=True) + "\n")

# Remove the little DONE flag the other thread used to signal this thread that it is done
self.response.status = re.sub('DONE,', '', self.response.status)
Expand All @@ -178,7 +178,21 @@ def query_return_stream(self, query, mode='ARAX'):
self.response.envelope.status = 'Success'

# Stream the resulting message back to the client
yield(json.dumps(self.response.envelope.to_dict(), sort_keys=True) + "\n")
try:
msg_str = json.dumps(self.response.envelope.to_dict(),
allow_nan=False,
sort_keys=True) + "\n"
except ValueError as v:
self.response.envelope.message.results = []
self.response.envelope.message.auxiliary_graphs = None
self.response.envelope.message.knowledge_graph = {'edges': dict(), 'nodes': dict()}
self.response.envelope.status = 'ERROR'
error_message_str = f"error dumping result to JSON: {str(v)}"
self.response.error(error_message_str)
eprint(error_message_str)
msg_str = json.dumps(self.response.envelope.to_dict(),
sort_keys=True) + "\n"
yield msg_str

# Wait until both threads rejoin here and the return
main_query_thread.join()
Expand Down Expand Up @@ -1816,6 +1830,17 @@ def main():
"scoreless_resultify(ignore_edge_direction=true)",
"rank_results()"
]}}
elif params.example_number == 2262:
query = {"operations": {"actions": [
"create_message",
"add_qnode(name=DOID:1227, key=n00)",
"add_qnode(categories=biolink:ChemicalEntity, key=n01)",
"add_qedge(subject=n01, object=n00, key=e00, predicates=biolink:treats)",
"expand(edge_key=e00, kp=infores:rtx-kg2)",
"filter_kg(action=remove_edges_by_predicate, edge_predicate=biolink:treats, remove_connected_nodes=t, qedge_keys=[e00])",
"resultify(ignore_edge_direction=true)",
"return(message=true, store=false)"
]}}
else:
eprint(f"Invalid test number {params.example_number}. Try 1 through 17")
return
Expand Down
247 changes: 0 additions & 247 deletions code/ARAX/ARAXQuery/Overlay/ngd/estimate_coverage.py

This file was deleted.

56 changes: 56 additions & 0 deletions code/ARAX/ARAXQuery/Path_Finder/BidirectionalPathFinder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import sys
import os
import math
import threading

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from BreadthFirstSearch import BreadthFirstSearch
from model.Node import Node
from model.Path import Path
from model.PathContainer import PathContainer


class BidirectionalPathFinder:
    """Find all simple paths between two nodes via two concurrent BFS runs.

    One breadth-first search is launched from each endpoint; paths are
    joined at the nodes both searches reach (the "meeting" nodes).
    """

    def __init__(self, repository):
        # Repository supplies neighbor lookups to BreadthFirstSearch.
        self.repo = repository

    def find_all_paths(self, node_id_1, node_id_2, hops_numbers=1):
        """Return all simple paths of up to ``hops_numbers`` hops between
        ``node_id_1`` and ``node_id_2``, sorted by ascending path weight.

        Returns an empty list when no search is meaningful (zero hop
        budget, or identical endpoints).
        """
        # Fix: the original returned a set() on these early exits but a
        # sorted list on the normal path; return a list in all cases so
        # callers get a consistent type.
        if hops_numbers == 0 or node_id_1 == node_id_2:
            return []

        # Split the hop budget between the two searches; the search from
        # node_id_1 gets the extra hop when hops_numbers is odd.
        hops_forward = (hops_numbers + 1) // 2
        hops_backward = hops_numbers // 2

        container_forward = PathContainer()
        bfs_forward = BreadthFirstSearch(self.repo, container_forward)

        container_backward = PathContainer()
        bfs_backward = BreadthFirstSearch(self.repo, container_backward)

        # Run the two traversals concurrently (presumably I/O-bound
        # against the repository — TODO confirm).
        thread_1 = threading.Thread(
            target=lambda: bfs_forward.traverse(node_id_1, hops_forward))
        thread_2 = threading.Thread(
            target=lambda: bfs_backward.traverse(node_id_2, hops_backward))
        thread_1.start()
        thread_2.start()
        thread_1.join()
        thread_2.join()

        # Nodes reached by both searches are candidate join points.
        meeting_nodes = (container_forward.path_dict.keys()
                         & container_backward.path_dict.keys())

        paths = set()
        for node in meeting_nodes:
            for path_1 in container_forward.path_dict[node]:
                for path_2 in container_backward.path_dict[node]:
                    # Copy the forward half verbatim.
                    joined = [Node(link.id, link.weight)
                              for link in path_1.links]
                    # Reverse the backward half, shifting each link's
                    # weight to the preceding node so edge weights stay
                    # attached to the correct hop after reversal.
                    for i in range(len(path_2.links) - 2, -1, -1):
                        joined.append(Node(path_2.links[i].id,
                                           path_2.links[i + 1].weight))
                    # Keep only simple paths (no repeated nodes).
                    if len(joined) == len(set(joined)):
                        paths.add(Path(0, joined))

        return sorted(paths, key=lambda path: path.compute_weight())
Loading

0 comments on commit 6aa4233

Please sign in to comment.