Skip to content

Commit

Permalink
optimize query.py
Browse files Browse the repository at this point in the history
  • Loading branch information
cl117 committed Aug 27, 2024
1 parent 5605f3e commit 0418a64
Showing 1 changed file with 53 additions and 97 deletions.
150 changes: 53 additions & 97 deletions flask/query.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
import requests
import urllib.parse
from functools import lru_cache
import json
from wor_client import WORClient
import re
from configManager import ConfigManager
from logger import Logger

# Load config once and reuse
config_manager = ConfigManager()
config = config_manager.load_config()

logger_ = Logger()
wor_client_ = WORClient()



def query_parts(_from = '', criteria = '', indexing = False):
def query_parts(_from='', criteria='', indexing=False):
"""
Gets all parts from Virtuoso
Gets all parts from Virtuoso.
Args:
_from: Graph the parts are from
criteria: Any additional criteria
indexing: Whether this query is being called during indexing
Returns: Formatted list of all parts from Virtuoso
"""
query = '''
query = f'''
SELECT DISTINCT
?subject
?displayId
Expand All @@ -35,117 +34,85 @@ def query_parts(_from = '', criteria = '', indexing = False):
?graph
?role
?sboltype
''' + _from + '''
WHERE {
''' + criteria + '''
{_from}
WHERE {{
{criteria}
?subject a ?type .
?subject sbh:topLevel ?subject .''' + ('''\n GRAPH ?graph { ?subject ?a ?t } .''' if indexing else "") + '''
OPTIONAL { ?subject sbol2:displayId ?displayId . }
OPTIONAL { ?subject sbol2:version ?version . }
OPTIONAL { ?subject dcterms:title ?name . }
OPTIONAL { ?subject dcterms:description ?description . }
OPTIONAL { ?subject sbol2:role ?role . }
OPTIONAL { ?subject sbol2:type ?sboltype . }
}
?subject sbh:topLevel ?subject .
{("GRAPH ?graph { ?subject ?a ?t } ." if indexing else "")}
OPTIONAL {{ ?subject sbol2:displayId ?displayId . }}
OPTIONAL {{ ?subject sbol2:version ?version . }}
OPTIONAL {{ ?subject dcterms:title ?name . }}
OPTIONAL {{ ?subject dcterms:description ?description . }}
OPTIONAL {{ ?subject sbol2:role ?role . }}
OPTIONAL {{ ?subject sbol2:type ?sboltype . }}
}}
'''

return memoized_query_sparql(query)


@lru_cache(maxsize=32)
def memoized_query_sparql(query):
"""
Speeds up SPARQL queries using a LRU cache
Speeds up SPARQL queries using a LRU cache.
Args:
query: SPARQL Query
Returns: Results of the SPARQL query
"""
return query_sparql(query)


def query_sparql(query):
"""
Query instances of Virtuoso
Query instances of Virtuoso.
Args:
query: SPARQL query
Returns:
Returns: Deduplicated results of the SPARQL query
"""
endpoints = [config_manager.load_config()['sparql_endpoint']]
endpoints = [config['sparql_endpoint']]

if config_manager.load_config()['distributed_search']:
if config.get('distributed_search'):
instances = wor_client_.get_wor_instance()
for instance in instances:
endpoints.append(instance['instanceUrl'] + '/sparql?')
endpoints.extend(instance['instanceUrl'] + '/sparql?' for instance in instances)

results = []

for endpoint in endpoints:
try:
results.extend(page_query(query, endpoint))
except:
logger_.log('[ERROR] failed querying:' + endpoint)
raise Exception("Endpoint not responding")
except Exception as e:
logger_.log(f'[ERROR] failed querying: {endpoint} - {str(e)}')
continue

return deduplicate_results(results)


def deduplicate_results(results):
"""
Removes duplicates from all SPARQL queries to various Virtuoso instances
Removes duplicates from all SPARQL queries to various Virtuoso instances.
Args:
results: List of results which may contain duplicates
Returns: Deduplicated list of results
"""
deduped = set()

seen = set()
deduped = []
for result in results:
deduped.add(json.dumps(result, sort_keys=True))

return [json.loads(result) for result in deduped]

result_tuple = tuple(sorted(result.items()))
if result_tuple not in seen:
seen.add(result_tuple)
deduped.append(result)
return deduped

def page_query(query, endpoint):
"""
Query to get results from a particular page in SynBioHub
Query to get results from a particular page in SynBioHub.
Args:
query: Query to run
endpoint: Virtuoso endpoint to hit
Returns: List of parts
"""
logger_.log('Current endpoint: ' + endpoint)

bar = [
"[ ]",
"[= ]",
"[=== ]",
"[==== ]",
"[===== ]",
"[====== ]",
"[======= ]",
"[========]",
"[ =======]",
"[ ======]",
"[ =====]",
"[ ====]",
"[ ===]",
"[ ==]",
"[ =]",
"[ ]",
"[ ]"
]
bar_counter = 0

if endpoint != config_manager.load_config()['sparql_endpoint']:
query = re.sub(r'''FROM.*\n''', '', query)

logger_.log(f'Current endpoint: {endpoint}')
query_prefix = '''
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX dcterms: <http://purl.org/dc/terms/>
Expand All @@ -163,61 +130,50 @@ def page_query(query, endpoint):

offset = 0
limit = 10000

results = []

while True:
print(bar[bar_counter % len(bar)], end="\r")
bar_counter+= 1
if endpoint != config['sparql_endpoint']:
query = re.sub(r'''FROM.*\n''', '', query)

full_query = query_prefix + query + 'OFFSET ' + str(offset) + ' LIMIT ' + str(limit)
while True:
full_query = f"{query_prefix} {query} OFFSET {offset} LIMIT {limit}"
new_results = send_query(full_query, endpoint)
results.extend(new_results)

if len(new_results) != limit:
if len(new_results) < limit:
break

offset += limit

return results


def send_query(query, endpoint):
"""
Sends a query to Virtuoso
Sends a query to Virtuoso.
Args:
query: Query to be sent
endpoint: Endpoint where Virtuoso resides
Returns: List of parts from Virtuoso
"""
params = {'query': query}

if endpoint == config_manager.load_config()['sparql_endpoint']:
params['default-graph-uri'] = '' # config_manager.load_config()['synbiohub_public_graph']
if endpoint == config['sparql_endpoint']:
params['default-graph-uri'] = '' # Modify this if needed

url = endpoint + urllib.parse.urlencode(params)
url = f"{endpoint}{urllib.parse.urlencode(params)}"
headers = {'Accept': 'application/json'}

try:
r = requests.get(url, headers=headers)
except Exception as e:
logger_.log("[ERROR] exception when connecting: " + str(e))
r.raise_for_status() # Raises an error for bad HTTP responses
except requests.RequestException as e:
logger_.log(f"[ERROR] exception when connecting: {str(e)}")
raise Exception("Local SynBioHub isn't responding")

if r.status_code != 200:
logger_.log('[ERROR] Got status code when querying: ' + str(r.status_code))
logger_.log(r.text)
raise Exception(url + ' is not responding')

results = []

for binding in r.json()['results']['bindings']:
result = {}
for key in binding:
result[key] = binding[key]['value']
results.append(result)
results = [
{key: binding[key]['value'] for key in binding}
for binding in r.json()['results']['bindings']
]

return results

0 comments on commit 0418a64

Please sign in to comment.