diff --git a/DockerBuild/test-instance-scripts/build-test-arax-from-fresh-instance.sh b/DockerBuild/test-instance-scripts/build-test-arax-from-fresh-instance.sh index 2e9dbad8e..eb2082982 100755 --- a/DockerBuild/test-instance-scripts/build-test-arax-from-fresh-instance.sh +++ b/DockerBuild/test-instance-scripts/build-test-arax-from-fresh-instance.sh @@ -2,6 +2,12 @@ # Stephen Ramsey, Oregon State University +# When you run this shell script, make sure your CWD is in `/home/ubuntu` +# In a bash terminal session, run the script like this +# (in this example, port 8080 is specified on the CLI): +# +# cd ~ && source <(curl -s https://raw.githubusercontent.com/RTXteam/RTX/master/DockerBuild/test-instance-scripts/build-test-arax-from-fresh-instance.sh) 8080 + set -o nounset -o pipefail -o errexit arax_base=/mnt/data/orangeboard @@ -28,7 +34,7 @@ sudo mkdir -p ${arax_base}/databases sudo chown ubuntu.ubuntu ${arax_base}/databases # do a test login to arax.ncats.io, to make sure rsync won't hang up later -ssh -q -oStrictHostKeyChecking=no rtxconfig@arax.ncats.io exit +ssh -q -oStrictHostKeyChecking=no rtxconfig@arax-databases.rtx.ai exit # do a test login to araxconfig.rtx.ai, to make sure the scp won't hang up later ssh -q -oStrictHostKeyChecking=no araxconfig@araxconfig.rtx.ai exit diff --git a/README.md b/README.md index 4f74b9c15..228506cbb 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,6 @@ - [How does ARAX work?](#how-does-arax-work) - [The Reasoners Standard Application Programming Interface](#the-reasoners-standard-application-programming-interface) - [What knowledge providers does ARAX use?](#what-knowledge-providers-does-arax-use) - - [RTX-KG1](#rtx-kg1) - [RTX-KG2](#rtx-kg2) - [Columbia Open Health Data (COHD)](#columbia-open-health-data-cohd) - [PubMed](#pubmed) @@ -210,49 +209,18 @@ development process for the Reasoners Standard API. Currently, ARAX/RTX directly accesses four main knowledge providers in order to handle queries, along with several additional APIs for identifier mapping. -## RTX-KG1 - -RTX-KG1 is a knowledge graph comprising 130k nodes and 3.5M relationships that -is built by integrating concepts and concept-predicate-concept triples obtained -from 17 different knowledge providers by way of their web APIs: - -1. Pathway Commons 2 -2. Disease Ontology -3. Monarch Project Biolink API -4. Drug-Gene Interactions Database -5. KEGG -6. UniProtKB -7. DisGeNet -8. OMIM -9. ChEMBL -10. SIDER -11. Pharos -12. MyChem.info -13. miRGate -14. Gene Ontology -15. Monarch SciGraph API -16. Reactome -17. PubChem - -RTX-KG1 complies with the Biolink model-based Translator Knowledge Graph object -model standard. RTX-KG1 is hosted in a Neo4j graph database server and can be -accessed at [kg1endpoint.rtx.ai:7474](http://kg1endpoint.rtx.ai:7474) (username -is `neo4j`; contact Team Expander Agent for the password). Alternatively, a -Neo4j dump file (in gzipped tar archive format) of KG1 can be downloaded without -password from the [kg1endpoint server](http://kg1endpoint.rtx.ai). - ## RTX-KG2 -RTX-KG2 is a knowledge graph comprising 7.5M nodes and 34.3M relationships +RTX-KG2 (GitHub project area is [RTXteam/RTX-KG2](https://github.com/RTXteam/RTX-KG2)) +is a knowledge graph comprising 7.5M nodes and 34.3M relationships that is built by integrating concepts and concept-predicate-concept triples obtained from: -1. *All of the KG1 knowledge providers* -2. Unified Medical Language System (UMLS; including SNOMED CT) -3. NCBI Genes -4. Ensembl Genes -5. UniChem -6. 
Semantic Medline Database (SemMedDB) +1. Unified Medical Language System (UMLS; including SNOMED CT) +2. NCBI Genes +3. Ensembl Genes +4. UniChem +5. Semantic Medline Database (SemMedDB) RTX-KG2 complies with the Biomedical Data Translator Knowledge Graph object model standard, which is based on the Biolink model. RTX-KG2 is hosted in a diff --git a/code/ARAX/ARAXQuery/ARAX_background_tasker.py b/code/ARAX/ARAXQuery/ARAX_background_tasker.py index c8890f20b..d43c2ac34 100644 --- a/code/ARAX/ARAXQuery/ARAX_background_tasker.py +++ b/code/ARAX/ARAXQuery/ARAX_background_tasker.py @@ -19,30 +19,33 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) FREQ_KP_INFO_CACHER_SEC = 3600 FREQ_CHECK_ONGOING_SEC = 60 -class ARAXBackgroundTasker: +class ARAXBackgroundTasker: - def __init__(self): + def __init__(self, run_kp_info_cacher=True): + self.run_kp_info_cacher = run_kp_info_cacher timestamp = str(datetime.datetime.now().isoformat()) eprint(f"{timestamp}: INFO: ARAXBackgroundTasker created") - - def run_tasks(self, config): + def run_tasks(self): timestamp = str(datetime.datetime.now().isoformat()) eprint(f"{timestamp}: INFO: ARAXBackgroundTasker starting") - #### Set up the query tracker + # Set up the query tracker query_tracker = ARAXQueryTracker() - kp_info_cacher = KPInfoCacher() - kp_info_cacher_counter = 0 - #### Clear the table of existing queries - eprint(f"{timestamp}: INFO: ARAXBackgroundTasker: Clearing any potential stale queries in ongoing query table") + if self.run_kp_info_cacher: + kp_info_cacher = KPInfoCacher() + kp_info_cacher_counter = 0 + + # Clear the table of existing queries + eprint(f"{timestamp}: INFO: ARAXBackgroundTasker: Clearing any " + "potential stale queries in ongoing query table") query_tracker.clear_ongoing_queries() - #### Print out our packages for debugging - if True: + # Print out our packages for debugging + if False: # set to true to print out the packages eprint("Installed packages:") for location, modname, flag in pkgutil.iter_modules(): location = f"{location}" @@ -50,14 +53,15 @@ def run_tasks(self, config): try: version_str = version(modname) eprint(f" {modname} {version_str}") - except: + except Exception: eprint(f" {modname} ???") else: pass - - #### Check in on the NodeSynonymizer database, which sometimes gets corrupted - node_synonymizer_path = os.path.dirname(os.path.abspath(__file__)) + "/../NodeSynonymizer" + # Check in on the NodeSynonymizer database, which sometimes gets + # corrupted + node_synonymizer_path = os.path.dirname(os.path.abspath(__file__)) + \ + "/../NodeSynonymizer" files = os.listdir(node_synonymizer_path) already_printed_header = False link_counter = 0 @@ -81,21 +85,19 @@ def run_tasks(self, config): try: os.unlink(filepath) except Exception as error: - eprint(f"ERROR: Unable to delete file with error {error}") + eprint("ERROR: Unable to delete file with error " + f"{error}") if file_counter != 1 or link_counter != 1: eprint("ERROR: NodeSynonymizer state is weird. 
" f"file_counter: {file_counter} " f"link_counter: {link_counter} " - "Recommend running the database_manager and restarting") - # try: - # subprocess.check_call( [ 'python3', node_synonymizer_path + "/../ARAXQuery/ARAX_database_manager.py" ] ) - # except Exception as error: - # eprint(f"ERROR: Attempt to run database manager failed with {error}") - + "Recommend restarting, which will rerun the database " + "manager") - #### Check in on the databases directory - node_synonymizer_path = os.path.dirname(os.path.abspath(__file__)) + "/../NodeSynonymizer" + # Check in on the databases directory + node_synonymizer_path = os.path.dirname(os.path.abspath(__file__)) + \ + "/../NodeSynonymizer" files = os.listdir(node_synonymizer_path) eprint("INFO: Current contents of the databases area:") @@ -106,53 +108,59 @@ def run_tasks(self, config): if os.path.islink(filepath): resolved_path = os.path.dirname(os.readlink(filepath)) eprint(f" {resolved_path}") - result = subprocess.run(['ls', '-l', resolved_path], stdout=subprocess.PIPE) + result = subprocess.run(['ls', '-l', resolved_path], + stdout=subprocess.PIPE) eprint(result.stdout.decode('utf-8')) eprint("INFO: End listing databases area contents") - - - #### Loop forever doing various things + # Loop forever doing various things my_pid = os.getpid() while True: - #### Run the KP Info Cacher less frequently - timestamp = str(datetime.datetime.now().isoformat()) - if kp_info_cacher_counter == 0: - eprint(f"{timestamp}: INFO: ARAXBackgroundTasker: Running refresh_kp_info_caches()") - try: - kp_info_cacher.refresh_kp_info_caches() - eprint(f"{timestamp}: INFO: ARAXBackgroundTasker: Completed refresh_kp_info_caches()") - except Exception as error: - exception_type, exception_value, exception_traceback = sys.exc_info() - eprint(f"{timestamp}: INFO: ARAXBackgroundTasker: refresh_kp_info_caches() failed: {error}: {repr(traceback.format_exception(exception_type, exception_value, exception_traceback))}") - kp_info_cacher_counter += 1 - if kp_info_cacher_counter * FREQ_CHECK_ONGOING_SEC > \ - FREQ_KP_INFO_CACHER_SEC: - kp_info_cacher_counter = 0 - - ongoing_queries_by_remote_address = query_tracker.check_ongoing_queries() + # Run the KP Info Cacher less frequently + if self.run_kp_info_cacher: + if kp_info_cacher_counter == 0: + timestamp = str(datetime.datetime.now().isoformat()) + eprint(f"{timestamp}: INFO: ARAXBackgroundTasker: Running " + "refresh_kp_info_caches()") + try: + kp_info_cacher.refresh_kp_info_caches() + eprint(f"{timestamp}: INFO: ARAXBackgroundTasker: " + "Completed refresh_kp_info_caches()") + except Exception as error: + e_type, e_value, e_traceback =\ + sys.exc_info() + err_str = repr(traceback.format_exception(e_type, + e_value, + e_traceback)) + eprint(f"{timestamp}: INFO: ARAXBackgroundTasker: " + "refresh_kp_info_caches() failed: " + f"{error}: {err_str}") + kp_info_cacher_counter += 1 + if kp_info_cacher_counter * FREQ_CHECK_ONGOING_SEC > \ + FREQ_KP_INFO_CACHER_SEC: + kp_info_cacher_counter = 0 + + ongoing_queries_by_addr = query_tracker.check_ongoing_queries() n_ongoing_queries = 0 n_clients = 0 - for client, n_queries in ongoing_queries_by_remote_address.items(): + for client, n_queries in ongoing_queries_by_addr.items(): n_clients += 1 n_ongoing_queries += n_queries load_tuple = psutil.getloadavg() timestamp = str(datetime.datetime.now().isoformat()) - eprint(f"{timestamp}: INFO: ARAXBackgroundTasker (PID {my_pid}) status: waiting. 
Current load is {load_tuple}, n_clients={n_clients}, n_ongoing_queries={n_ongoing_queries}") + eprint(f"{timestamp}: INFO: ARAXBackgroundTasker " + f"(PID {my_pid}) status: waiting. Current " + f"load is {load_tuple}, n_clients={n_clients}, " + f"n_ongoing_queries={n_ongoing_queries}") time.sleep(FREQ_CHECK_ONGOING_SEC) - -################################################################################################## def main(): - background_tasker = ARAXBackgroundTasker() - - config = {} - background_tasker.run_tasks( config ) + background_tasker.run_tasks() if __name__ == "__main__": diff --git a/code/ARAX/ARAXQuery/ARAX_database_manager.py b/code/ARAX/ARAXQuery/ARAX_database_manager.py index c219ed231..e470f81a2 100644 --- a/code/ARAX/ARAXQuery/ARAX_database_manager.py +++ b/code/ARAX/ARAXQuery/ARAX_database_manager.py @@ -2,7 +2,7 @@ # NOTE: this module is only to be used either as a CLI script or in the # __main__.py Flask application at application start-up. Please do not -# instantiate this class and call `check_databases` at query time. -SAR +# instantiate this class and call `update_databases` at query time. -SAR import os import sys @@ -22,9 +22,9 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) class ARAXDatabaseManager: - def __init__(self): + def __init__(self, allow_downloads=False): self.RTXConfig = RTXConfiguration() - + self.allow_downloads = allow_downloads pathlist = os.path.realpath(__file__).split(os.path.sep) RTXindex = pathlist.index("RTX") @@ -209,10 +209,12 @@ def __init__(self): } def update_databases(self, debug = True, response = None): + if not self.allow_downloads: + raise ValueError("in ARAXDatabaseManager, update_databases called with self.allow_downloads=False") debug = True # First ensure we have a db versions file if we're in a docker container (since host has dbs predownloaded) if os.path.exists(self.docker_databases_dir_path) and not os.path.exists(versions_path): - self.write_db_versions_file(debug=True) + self._write_db_versions_file(debug=True) # Then ensure each database/symlink is up to date if os.path.exists(versions_path): @@ -222,7 +224,7 @@ def update_databases(self, debug = True, response = None): # Download databases to a persistent central location if this is a docker instance (like arax.ncats.io) if os.path.exists(self.docker_databases_dir_path): eprint(f"Downloading any missing databases from arax-databases.rtx.ai to {self.docker_databases_dir_path}") - self.download_to_mnt(debug=debug, skip_if_exists=True, remove_unused=False) + self._download_to_mnt(debug=debug, skip_if_exists=True, remove_unused=False) # Check that each database exists locally (or a symlink to it does, in the case of a docker host machine) for database_name, local_path in self.local_paths.items(): # iterate through all databases @@ -231,20 +233,20 @@ def update_databases(self, debug = True, response = None): eprint(f"{database_name} ({local_path}) not present locally, downloading or symlinking now...") if response is not None: response.debug(f"Updating the local file for {database_name}...") - self.download_database(remote_location=self.remote_locations[database_name], - local_destination_path=self.local_paths[database_name], - local_symlink_target_path=self.docker_central_paths[database_name], - debug=debug) + self._download_database(remote_location=self.remote_locations[database_name], + local_destination_path=self.local_paths[database_name], + local_symlink_target_path=self.docker_central_paths[database_name], + debug=debug) elif 
local_versions[database_name]['version'] != self.db_versions[database_name]['version']: # If database is present but wrong version if debug: eprint(f"{database_name} has a local version, '{local_versions[database_name]['version']}', which does not match the remote version, '{self.db_versions[database_name]['version']}'.") eprint("downloading remote version...") if response is not None: response.debug(f"Updating the local file for {database_name}...") - self.download_database(remote_location=self.remote_locations[database_name], - local_destination_path=self.local_paths[database_name], - local_symlink_target_path=self.docker_central_paths[database_name], - debug=debug) + self._download_database(remote_location=self.remote_locations[database_name], + local_destination_path=self.local_paths[database_name], + local_symlink_target_path=self.docker_central_paths[database_name], + debug=debug) if os.path.exists(self.local_paths[database_name]): # check that download worked if so remove old version if debug: eprint("Download successful. Removing local version...") @@ -261,18 +263,21 @@ def update_databases(self, debug = True, response = None): eprint(f"{database_name} ({self.local_paths[database_name]}) not present locally, downloading or symlinking now......") if response is not None: response.debug(f"Updating the local file for {database_name}...") - self.download_database(remote_location=self.remote_locations[database_name], local_destination_path=self.local_paths[database_name], local_symlink_target_path=self.docker_central_paths[database_name], debug=debug) + self._download_database(remote_location=self.remote_locations[database_name], + local_destination_path=self.local_paths[database_name], + local_symlink_target_path=self.docker_central_paths[database_name], + debug=debug) else: if debug: eprint(f"Local version of {database_name} ({local_path}) matches the remote version, skipping...") - self.write_db_versions_file() + self._write_db_versions_file() else: # If database manager has never been run download all databases if debug: eprint("No local verson json file present. Downloading all databases...") if response is not None: response.debug(f"No local verson json file present. Downloading all databases...") - self.force_download_all(debug=debug) - self.write_db_versions_file() + self._force_download_all(debug=debug) + self._write_db_versions_file() return response @staticmethod @@ -330,7 +335,7 @@ def check_date(self, file_path, max_days = 31): else: return True - def download_database(self, remote_location, local_destination_path, local_symlink_target_path, debug=False): + def _download_database(self, remote_location, local_destination_path, local_symlink_target_path, debug=False): if local_symlink_target_path is not None and os.path.exists(local_symlink_target_path): # if on the server symlink instead of downloading self.symlink_database(symlink_path=local_destination_path, target_path=local_symlink_target_path) else: @@ -343,10 +348,9 @@ def rsync_database(self, remote_location, local_path, debug=False): verbose = "" if debug: verbose = "vv" - #os.system(f"rsync -Lhzc{verbose} --progress {remote_location} {local_path}") - eprint(f"ERROR: Wanted to run the following rsync, but it isn't going to work anyway. 
Skipping: rsync -Lhzc{verbose} --progress {remote_location} {local_path}")
+        os.system(f"rsync -Lhzc{verbose} --progress {remote_location} {local_path}")

-    def download_to_mnt(self, debug=False, skip_if_exists=False, remove_unused=False):
+    def _download_to_mnt(self, debug=False, skip_if_exists=False, remove_unused=False):
         """
         This method downloads databases to the docker host machine in a central location.
         """
@@ -355,7 +359,7 @@ def download_to_mnt(self, debug=False, skip_if_exists=False, remove_unused=False
         for database_name in self.remote_locations.keys():
             database_dir = os.path.sep.join(self.docker_central_paths[database_name].split('/')[:-1])
             if debug:
-                print(f"On database {database_name} in download_to_mnt()")
+                print(f"On database {database_name} in _download_to_mnt()")
             if not os.path.exists(database_dir):
                 if debug:
                     print(f"Creating directory {database_dir}...")
@@ -366,23 +370,26 @@ def download_to_mnt(self, debug=False, skip_if_exists=False, remove_unused=False
                 if debug:
                     print(f"Initiating download from location {remote_location}; "
                           f"saving to {docker_host_local_path}")
-                self.download_database(remote_location=remote_location,
-                                       local_destination_path=docker_host_local_path,
-                                       local_symlink_target_path=None,
-                                       debug=debug)
+                self._download_database(remote_location=remote_location,
+                                        local_destination_path=docker_host_local_path,
+                                        local_symlink_target_path=None,
+                                        debug=debug)
             else:
                 print(f" Database already exists, no need to download") if debug else None
-
-    def force_download_all(self, debug=False):
+
+    def _force_download_all(self, debug=False):
         for database_name in self.remote_locations.keys():
             if debug:
                 print(f"Downloading {self.remote_locations[database_name].split('/')[-1]}...")
-            self.download_database(remote_location=self.remote_locations[database_name], local_destination_path=self.local_paths[database_name], local_symlink_target_path=self.docker_central_paths[database_name], debug=debug)
+            self._download_database(remote_location=self.remote_locations[database_name],
+                                    local_destination_path=self.local_paths[database_name],
+                                    local_symlink_target_path=self.docker_central_paths[database_name],
+                                    debug=debug)

     def check_all(self, max_days=31, debug=False):
         update_flag = False
         for database_name, file_path in self.local_paths.items():
             if os.path.exists(file_path):
                 if debug:
                     now_time = datetime.datetime.now()
                     modified_time = time.localtime(os.stat(file_path).st_mtime)
@@ -402,14 +409,7 @@ def check_all(self, max_days=31, debug=False):
                 return True
         return update_flag

-    def update_databases_by_date(self, max_days=31, debug=False):
-        for database_name, local_path in self.local_paths.items():
-            if self.check_date(local_path, max_days=max_days):
-                if debug:
-                    print(f"{database_name} not present or older than {max_days} days. 
Updating file...") - self.download_database(remote_location=self.remote_locations[database_name], local_destination_path=local_path, local_symlink_target_path=self.docker_central_paths[database_name], debug=debug) - - def write_db_versions_file(self, debug=False): + def _write_db_versions_file(self, debug=False): print(f"saving new version file to {versions_path}") if debug else None with open(versions_path, "w") as fid: json.dump(self.db_versions, fid) @@ -441,7 +441,7 @@ def main(): parser.add_argument("-r", "--remove_unused", action='store_true', dest='remove_unused', required=False, help="for -m mode only, remove database files under /mnt databases directory that are NOT used in config_dbs.json") arguments = parser.parse_args() - DBManager = ARAXDatabaseManager() + DBManager = ARAXDatabaseManager(allow_downloads=True) print(f"Local paths:") for db_name, path in DBManager.local_paths.items(): @@ -457,13 +457,13 @@ def main(): if not DBManager.check_versions(debug=True): print("All local versions are up to date") elif arguments.force_download: - DBManager.force_download_all(debug=True) + DBManager._force_download_all(debug=True) elif arguments.mnt: - DBManager.download_to_mnt(debug=True, - skip_if_exists=arguments.skip_if_exists, - remove_unused=arguments.remove_unused) + DBManager._download_to_mnt(debug=True, + skip_if_exists=arguments.skip_if_exists, + remove_unused=arguments.remove_unused) elif arguments.generate_versions_file: - DBManager.write_db_versions_file(debug=True) + DBManager._write_db_versions_file(debug=True) else: DBManager.update_databases(debug=True) diff --git a/code/ARAX/ARAXQuery/ARAX_expander.py b/code/ARAX/ARAXQuery/ARAX_expander.py index 73d8d0ead..ed7f80065 100644 --- a/code/ARAX/ARAXQuery/ARAX_expander.py +++ b/code/ARAX/ARAXQuery/ARAX_expander.py @@ -1,7 +1,6 @@ #!/bin/env python3 import asyncio import copy -import logging import pickle import sys import os @@ -29,7 +28,6 @@ from openapi_server.models.edge import Edge from openapi_server.models.attribute_constraint import AttributeConstraint from Expand.kg2_querier import KG2Querier -from Expand.ngd_querier import NGDQuerier from Expand.trapi_querier import TRAPIQuerier @@ -51,7 +49,6 @@ def trim_to_size(input_list, length): class ARAXExpander: def __init__(self): - self.logger = logging.getLogger('log') self.bh = BiolinkHelper() self.rtxc = RTXConfiguration() self.plover_url = self.rtxc.plover_url @@ -70,34 +67,22 @@ def describe_me(self): :return: """ kp_selector = KPSelector() - all_kps = sorted(list(kp_selector.valid_kps)) # TODO: Should we include with any maturity here? any TRAPI version? + all_kps = sorted(list(kp_selector.valid_kps)) rtxc = RTXConfiguration() command_definition = { "dsl_command": "expand()", "description": f"This command will expand (aka, answer/fill) your query graph in an edge-by-edge " - f"fashion, intelligently selecting which KPs to use for each edge. It selects KPs from the SmartAPI Registry " - f"based on the meta information provided by their TRAPI APIs (when available), whether they " - f"have an endpoint running a matching TRAPI version, and whether they have an endpoint with matching " - f"maturity. For each QEdge, it queries the selected KPs in parallel; it will " - f"timeout for a particular KP if it decides it's taking too long to respond. 
You may also " - f"optionally specify a particular KP to use via the 'kp' parameter (described below).\n\n" - f"Current candidate KPs include (for TRAPI {rtxc.trapi_major_version}, maturity '{rtxc.maturity}'): \n" + f"fashion, intelligently selecting which KPs to use for each edge. It selects KPs from " + f"the SmartAPI Registry based on the meta information provided by their TRAPI APIs, " + f"whether they have an endpoint running a matching TRAPI version, and whether they have " + f"an endpoint with matching maturity. For each QEdge, it queries the selected KPs " + f"concurrently; it will timeout for a particular KP if it decides it's taking too long " + f"to respond (this KP timeout can be controlled by the user). You may also optionally " + f"specify a particular KP to use via the 'kp' parameter (described below).\n\n" + f"Current candidate KPs include (for TRAPI {rtxc.trapi_major_version}, " + f"maturity '{rtxc.maturity}'): \n" f"{', '.join(all_kps)}. \n" - f"\n(Note that this list of KPs may change unexpectedly based on the SmartAPI registry.)" - f"\n\n**Notes specific to usage of ARAX's internal KPs:**\n " - f"1. NGD: The 'infores:arax-normalized-google-distance' KP uses ARAX's in-house normalized " - f"google distance (NGD) database to expand " - "a query graph; it returns edges between nodes with an NGD value below a certain " - "threshold. This threshold is currently hardcoded as 0.5, though this will be made " - "configurable/smarter in the future.\n", - # "2. DTD: The 'infores:arax-drug-treats-disease' KP uses ARAX's in-house drug-treats-disease (DTD) database (built from GraphSage model) to expand " - # "a query graph; it returns edges between nodes with a DTD probability above a certain " - # "threshold. The default threshold is currently set to 0.8. If you set this threshold below 0.8, you should also " - # "set DTD_slow_mode=True otherwise a warninig will occur. This is because the current DTD database only stores the pre-calcualted " - # "DTD probability above or equal to 0.8. Therefore, if an user set threshold below 0.8, it will automatically switch to call DTD model " - # "to do a real-time calculation and this will be quite time-consuming. In addition, if you call DTD database, your query node type would be checked. " - # "In other words, the query node has to have a sysnonym which is drug or disease. If you don't want to check node type, set DTD_slow_mode=true " - # "to call DTD model to do a real-time calculation.", + f"\n(Note that this list of KPs may change unexpectedly based on the SmartAPI registry.)", "parameters": self.get_parameter_info_dict() } return [command_definition] @@ -149,27 +134,7 @@ def get_parameter_info_dict(): "type": "boolean", "description": "Whether to omit supporting data on nodes/edges in the results (e.g., publications, " "description, etc.)." - }, - # "DTD_threshold": { - # "is_required": False, - # "examples": [0.8, 0.5], - # "min": 0, - # "max": 1, - # "default": 0.8, - # "type": "float", - # "description": "Applicable only when the 'infores:arax-drug-treats-disease' KP is used. " - # "Defines what cut-off/threshold to use for expanding the DTD virtual edges." - # }, - # "DTD_slow_mode": { - # "is_required": False, - # "examples": ["true", "false"], - # "enum": ["true", "false", "True", "False", "t", "f", "T", "F"], - # "default": "false", - # "type": "boolean", - # "description": "Applicable only when the 'infores:arax-drug-treats-disease' KP is used. 
" - # "Specifies whether to call DTD model rather than DTD database to do a real-time " - # "calculation for DTD probability." - # } + } } return parameter_info_dict @@ -361,92 +326,11 @@ def apply(self, response, input_parameters, mode: str = "ARAX"): # Default to related_to if no predicate was specified qedge.predicates = [self.bh.get_root_predicate()] - # Send ARAXInfer any one-hop, "inferred", "treats" queries (temporary way of making creative mode work) - if mode != "RTXKG2": - inferred_qedge_keys = [qedge_key for qedge_key, qedge in query_graph.edges.items() if qedge.knowledge_type == "inferred"] - if inferred_qedge_keys: - log.debug(f"knowledge_type='inferred' qedges were detected ({', '.join(inferred_qedge_keys)}); " - f"will determine which model to consult based on the category of source qnode and object qnode, as well as predicate.") - if len(query_graph.edges) == 1: - inferred_qedge_key = inferred_qedge_keys[0] - qedge = query_graph.edges[inferred_qedge_key] - # treats_ancestors = self.bh.get_ancestors("biolink:treats") ## we don't consider the ancestor because both "biolink:treats" and "biolink:regulates" share the same ancestors - if set(['biolink:ameliorates', 'biolink:treats']).intersection(set(qedge.predicates)): # Figure out if this is a "treats" query, then use call XDTD - # Call XDTD and simply return whatever it returns - # Get the subject and object of this edge - subject_qnode = query_graph.nodes[qedge.subject] # drug - object_qnode = query_graph.nodes[qedge.object] # disease - if object_qnode.ids and len(object_qnode.ids) >= 1: - object_curie = object_qnode.ids[0] #FIXME: will need a way to handle multiple IDs - else: - log.error(f"No CURIEs found for qnode {qedge.object}; ARAXInfer/XDTD requires that the" - f" object qnode has 'ids' specified", error_code="NoCURIEs") - #raise Exception(f"No CURIEs found for {object_qnode.name}") - return response - log.info(f"Calling XDTD from Expand for qedge {inferred_qedge_key} (has knowledge_type == inferred) and the subject is {object_curie}") - from ARAX_infer import ARAXInfer - infer_input_parameters = {"action": "drug_treatment_graph_expansion",'node_curie': object_curie, 'qedge_id': inferred_qedge_key} - inferer = ARAXInfer() - infer_response = inferer.apply(response, infer_input_parameters) - # return infer_response - response = infer_response - overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph) - - elif set(['biolink:regulates']).intersection(set(qedge.predicates)): # Figure out if this is a "regulates" query, then use call XCRG models - # Call XCRG models and simply return whatever it returns - # Get the subject and object of this edge - subject_qnode = query_graph.nodes[qedge.subject] # chemical - object_qnode = query_graph.nodes[qedge.object] # gene - qualifier_direction = [qualifier.qualifier_value for qualifier_constraint in qedge.qualifier_constraints for qualifier in qualifier_constraint.qualifier_set if qualifier.qualifier_type_id == 'biolink:object_direction_qualifier'][0] - if qualifier_direction == 'increased': - regulation_type = 'increase' - elif qualifier_direction == 'decreased': - regulation_type = 'decrease' - else: - log.error(f"The action 'chemical_gene_regulation_graph_expansion' only support the qualifier_direction with either 'increased' or 'decreased' but {qualifier_direction} provided.") - return response - if subject_qnode.ids and len(subject_qnode.ids) >= 1: - subject_curie = subject_qnode.ids[0] #FIXME: will need a way to handle multiple IDs - else: - 
subject_curie = None - if object_qnode.ids and len(object_qnode.ids) >= 1: - object_curie = object_qnode.ids[0] #FIXME: will need a way to handle multiple IDs - else: - object_curie = None - if not subject_curie and not object_curie: - log.error(f"No CURIEs found for both query subject node {qedge.subject} and query object node {qedge.object}; ARAXInfer/XCRG requires " - f"that at least subject qnode or object qnode has 'ids' specified", error_code="NoCURIEs") - return response - if subject_curie and object_curie: - log.error(f"The action 'chemical_gene_regulation_graph_expansion' hasn't support the prediction for a single chemical-gene pair yet.", error_code="InvalidCURIEs") - return response - log.info(f"Calling XCRG from Expand for qedge {inferred_qedge_key} (has knowledge_type == inferred) and the subject is {subject_curie} and the object is {object_curie}") - from ARAX_infer import ARAXInfer - if subject_curie: - infer_input_parameters = {"action": "chemical_gene_regulation_graph_expansion", 'subject_qnode_id' : qedge.subject, 'qedge_id': inferred_qedge_key, 'regulation_type': regulation_type} - else: - infer_input_parameters = {"action": "chemical_gene_regulation_graph_expansion", 'object_qnode_id' : qedge.object, 'object_curie': object_curie, 'qedge_id': inferred_qedge_key, 'regulation_type': regulation_type} - inferer = ARAXInfer() - infer_response = inferer.apply(response, infer_input_parameters) - response = infer_response - overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph) - else: - log.info(f"Qedge {inferred_qedge_key} has knowledge_type == inferred, but the query is not " - f"DTD-related (e.g., 'biolink:ameliorates', 'biolink:treats') or CRG-related ('biolink:regulates') according to the specified predicate. Will answer using the normal 'fill' strategy (not creative mode).") - else: - log.warning(f"Expand does not yet know how to answer multi-qedge query graphs when one or more of " - f"the qedges has knowledge_type == inferred. 
Will answer using the normal 'fill' strategy " - f"(not creative mode).") - # Expand any specified edges if qedge_keys_to_expand: query_sub_graph = self._extract_query_subgraph(qedge_keys_to_expand, query_graph, log) - if mode != "RTXKG2": - if inferred_qedge_keys and len(query_graph.edges) == 1: - for edge in query_sub_graph.edges.keys(): - query_sub_graph.edges[edge].knowledge_type = 'lookup' - if log.status != 'OK': - return response + inferred_qedge_keys = [qedge_key for qedge_key, qedge in query_graph.edges.items() if + qedge.knowledge_type == "inferred"] log.debug(f"Query graph for this Expand() call is: {query_sub_graph.to_dict()}") ordered_qedge_keys_to_expand = self._get_order_to_expand_qedges_in(query_sub_graph, log) @@ -466,7 +350,17 @@ def apply(self, response, input_parameters, mode: str = "ARAX"): response.update_query_plan(qedge_key, 'edge_properties', 'object', object_details) response.update_query_plan(qedge_key, 'edge_properties', 'predicate', predicate_details) for kp in kp_selector.valid_kps: - response.update_query_plan(qedge_key, kp, 'Waiting', f'Waiting for processing of {qedge_key} to begin') + response.update_query_plan(qedge_key, kp, 'Waiting', f'Waiting for processing to begin') + + # Get any inferred results from ARAX Infer + if mode != "RTXKG2" and inferred_qedge_keys: + response, overarching_kg = self.get_inferred_answers(inferred_qedge_keys, query_graph, response) + if log.status != 'OK': + return response + # Now mark qedges as 'lookup' if this is an inferred query + if inferred_qedge_keys and len(query_graph.edges) == 1: + for edge in query_sub_graph.edges.keys(): + query_sub_graph.edges[edge].knowledge_type = 'lookup' # Expand the query graph edge-by-edge for qedge_key in ordered_qedge_keys_to_expand: @@ -481,6 +375,7 @@ def apply(self, response, input_parameters, mode: str = "ARAX"): # Create a query graph for this edge (that uses curies found in prior steps) one_hop_qg = self._get_query_graph_for_edge(qedge_key, query_graph, overarching_kg, log) if mode != "RTXKG2": + # Mark these qedges as 'lookup' if this is an 'inferred' query if inferred_qedge_keys and len(query_graph.edges) == 1: for edge in one_hop_qg.edges.keys(): one_hop_qg.edges[edge].knowledge_type = 'lookup' @@ -521,7 +416,7 @@ def apply(self, response, input_parameters, mode: str = "ARAX"): skipped_message = "This KP was constrained by this edge" response.update_query_plan(qedge_key, skipped_kp, "Skipped", skipped_message) - log.info(f"The KPs Expand decided to answer {qedge_key} with are: {kps_to_query}") + log.info(f"Expand decided to use {len(kps_to_query)} KPs to answer {qedge_key}: {kps_to_query}") else: kps_to_query = set(eu.convert_to_list(parameters["kp"])) for kp in kp_selector.valid_kps.difference(kps_to_query): @@ -540,7 +435,6 @@ def apply(self, response, input_parameters, mode: str = "ARAX"): asyncio.set_event_loop(loop) tasks = [self._expand_edge_async(one_hop_qg, kp_to_use, - input_parameters, user_specified_kp, kp_timeout, force_local, @@ -682,7 +576,6 @@ def apply(self, response, input_parameters, mode: str = "ARAX"): decorator = ARAXDecorator() decorator.decorate_edges(response, kind="SEMMEDDB") - # Map canonical curies back to the input curies in the QG (where applicable) #1622 self._map_back_to_input_curies(message.knowledge_graph, query_graph, log) if mode == "RTXKG2": @@ -695,9 +588,135 @@ def apply(self, response, input_parameters, mode: str = "ARAX"): return response + @staticmethod + def get_inferred_answers(inferred_qedge_keys: List[str], + query_graph: QueryGraph, + 
response: ARAXResponse) -> Tuple[ARAXResponse, QGOrganizedKnowledgeGraph]: + # Send ARAXInfer any one-hop, "inferred", "treats" queries (temporary way of making creative mode work) + overarching_kg = QGOrganizedKnowledgeGraph() + if inferred_qedge_keys: + response.debug(f"knowledge_type='inferred' qedges were detected ({', '.join(inferred_qedge_keys)}); " + f"will determine which model to consult based on the category of source qnode and object qnode, as well as predicate.") + if len(query_graph.edges) == 1: + inferred_qedge_key = inferred_qedge_keys[0] + qedge = query_graph.edges[inferred_qedge_key] + # treats_ancestors = self.bh.get_ancestors("biolink:treats") ## we don't consider the ancestor because both "biolink:treats" and "biolink:regulates" share the same ancestors + if set(['biolink:ameliorates', 'biolink:treats']).intersection(set(qedge.predicates)): # Figure out if this is a "treats" query, then use call XDTD + # Call XDTD and simply return whatever it returns + # Get the subject and object of this edge + subject_qnode = query_graph.nodes[qedge.subject] # drug + object_qnode = query_graph.nodes[qedge.object] # disease + if object_qnode.ids and len(object_qnode.ids) >= 1: + object_curie = object_qnode.ids[0] # FIXME: will need a way to handle multiple IDs + else: + response.error(f"No CURIEs found for qnode {qedge.object}; ARAXInfer/XDTD requires that the" + f" object qnode has 'ids' specified", error_code="NoCURIEs") + # raise Exception(f"No CURIEs found for {object_qnode.name}") + return response, overarching_kg + if subject_qnode.ids and len(subject_qnode.ids) >= 1: + subject_curie = subject_qnode.ids # FIXME: will need a way to handle multiple IDs + else: + subject_curie = None + response.info(f"Calling XDTD from Expand for qedge {inferred_qedge_key} (has knowledge_type == inferred) and the subject is {object_curie}") + response.update_query_plan(inferred_qedge_key, "arax-xdtd", + "Waiting", f"Waiting for response") + start = time.time() + + from ARAX_infer import ARAXInfer + infer_input_parameters = {"action": "drug_treatment_graph_expansion", + 'node_curie': object_curie, 'qedge_id': inferred_qedge_key, + 'drug_curie': subject_curie} + inferer = ARAXInfer() + infer_response = inferer.apply(response, infer_input_parameters) + # return infer_response + response = infer_response + overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(response.envelope.message.knowledge_graph) + + wait_time = round(time.time() - start) + if response.status == "OK": + done_message = f"Returned {len(overarching_kg.edges_by_qg_id.get(inferred_qedge_key, dict()))} " \ + f"edges in {wait_time} seconds" + response.update_query_plan(inferred_qedge_key, "arax-xdtd", "Done", done_message) + else: + response.update_query_plan(inferred_qedge_key, "arax-xdtd", "Error", + f"Process error-ed out with {response.status} after {wait_time} seconds") + + elif set(['biolink:regulates']).intersection(set(qedge.predicates)): # Figure out if this is a "regulates" query, then use call XCRG models + # Call XCRG models and simply return whatever it returns + # Get the subject and object of this edge + subject_qnode = query_graph.nodes[qedge.subject] # chemical + object_qnode = query_graph.nodes[qedge.object] # gene + qualifier_direction = \ + [qualifier.qualifier_value for qualifier_constraint in qedge.qualifier_constraints for + qualifier in qualifier_constraint.qualifier_set if + qualifier.qualifier_type_id == 'biolink:object_direction_qualifier'][0] + if qualifier_direction == 'increased': + regulation_type = 
'increase' + elif qualifier_direction == 'decreased': + regulation_type = 'decrease' + else: + response.error(f"The action 'chemical_gene_regulation_graph_expansion' only support the qualifier_direction with either 'increased' or 'decreased' but {qualifier_direction} provided.") + return response, overarching_kg + if subject_qnode.ids and len(subject_qnode.ids) >= 1: + subject_curie = subject_qnode.ids[0] # FIXME: will need a way to handle multiple IDs + else: + subject_curie = None + if object_qnode.ids and len(object_qnode.ids) >= 1: + object_curie = object_qnode.ids[0] # FIXME: will need a way to handle multiple IDs + else: + object_curie = None + if not subject_curie and not object_curie: + response.error(f"No CURIEs found for both query subject node {qedge.subject} and query object node {qedge.object}; ARAXInfer/XCRG requires " + f"that at least subject qnode or object qnode has 'ids' specified", + error_code="NoCURIEs") + return response, overarching_kg + if subject_curie and object_curie: + response.error(f"The action 'chemical_gene_regulation_graph_expansion' hasn't support the prediction for a single chemical-gene pair yet.", + error_code="InvalidCURIEs") + return response, overarching_kg + response.info(f"Calling XCRG from Expand for qedge {inferred_qedge_key} (has knowledge_type == inferred) and the subject is {subject_curie} and the object is {object_curie}") + response.update_query_plan(inferred_qedge_key, "arax-xcrg", + "Waiting", f"Waiting for response") + start = time.time() + + from ARAX_infer import ARAXInfer + if subject_curie: + infer_input_parameters = {"action": "chemical_gene_regulation_graph_expansion", + 'subject_qnode_id': qedge.subject, + 'qedge_id': inferred_qedge_key, + 'regulation_type': regulation_type} + else: + infer_input_parameters = {"action": "chemical_gene_regulation_graph_expansion", + 'object_qnode_id': qedge.object, 'object_curie': object_curie, + 'qedge_id': inferred_qedge_key, + 'regulation_type': regulation_type} + inferer = ARAXInfer() + infer_response = inferer.apply(response, infer_input_parameters) + response = infer_response + overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(response.envelope.message.knowledge_graph) + + wait_time = round(time.time() - start) + if response.status == "OK": + done_message = f"Returned {len(overarching_kg.edges_by_qg_id.get(inferred_qedge_key, dict()))} " \ + f"edges in {wait_time} seconds" + response.update_query_plan(inferred_qedge_key, "arax-xcrg", "Done", done_message) + else: + response.update_query_plan(inferred_qedge_key, "arax-xcrg", "Error", + f"Process error-ed out with {response.status} after {wait_time} seconds") + else: + response.info(f"Qedge {inferred_qedge_key} has knowledge_type == inferred, but the query is not " + f"DTD-related (e.g., 'biolink:ameliorates', 'biolink:treats') or CRG-related ('biolink:regulates') according to the specified predicate. Will answer using the normal 'fill' strategy (not creative mode).") + else: + response.warning( + f"Expand does not yet know how to answer multi-qedge query graphs when one or more of " + f"the qedges has knowledge_type == inferred. 
Will answer using the normal 'fill' strategy " + f"(not creative mode).") + + response.debug(f"Done calling ARAX Infer from Expand; returning to regular Expand execution") + return response, overarching_kg + async def _expand_edge_async(self, edge_qg: QueryGraph, kp_to_use: str, - input_parameters: Dict[str, any], user_specified_kp: bool, kp_timeout: Optional[int], force_local: bool, @@ -721,38 +740,15 @@ async def _expand_edge_async(self, edge_qg: QueryGraph, error_code="InvalidKP") return answer_kg, log - # Route this query to the proper place depending on the KP + # Route this query to the KP's TRAPI API try: - use_custom_querier = kp_to_use in {'infores:arax-drug-treats-disease', 'infores:arax-normalized-google-distance'} - if use_custom_querier: - num_input_curies = max([len(eu.convert_to_list(qnode.ids)) for qnode in edge_qg.nodes.values()]) - waiting_message = f"Query with {num_input_curies} curies sent: waiting for response" - log.update_query_plan(qedge_key, kp_to_use, "Waiting", waiting_message) - start = time.time() - if kp_to_use == 'infores:arax-drug-treats-disease': - # from Expand.DTD_querier import DTDQuerier - # kp_querier = DTDQuerier(log) - log.warning(f"DTD KP has been replaced with xDTD and thus is currently disabled.") - return answer_kg, log - else: - kp_querier = NGDQuerier(log) - answer_kg = kp_querier.answer_one_hop_query(edge_qg) - wait_time = round(time.time() - start) - if log.status == 'OK': - done_message = f"Returned {len(answer_kg.edges_by_qg_id.get(qedge_key, dict()))} edges in {wait_time} seconds" - log.update_query_plan(qedge_key, kp_to_use, "Done", done_message) - else: - log.update_query_plan(qedge_key, kp_to_use, "Error", f"Process error-ed out with {log.status} after {wait_time} seconds") - return answer_kg, log - else: - # This is a general purpose querier for use with any KPs that we query via their TRAPI API - kp_querier = TRAPIQuerier(response_object=log, - kp_name=kp_to_use, - user_specified_kp=user_specified_kp, - kp_timeout=kp_timeout, - kp_selector=kp_selector, - force_local=force_local) - answer_kg = await kp_querier.answer_one_hop_query_async(edge_qg) + kp_querier = TRAPIQuerier(response_object=log, + kp_name=kp_to_use, + user_specified_kp=user_specified_kp, + kp_timeout=kp_timeout, + kp_selector=kp_selector, + force_local=force_local) + answer_kg = await kp_querier.answer_one_hop_query_async(edge_qg) except Exception: tb = traceback.format_exc() error_type, error, _ = sys.exc_info() @@ -780,8 +776,6 @@ async def _expand_edge_async(self, edge_qg: QueryGraph, answer_kg = self._remove_self_edges(answer_kg, kp_to_use, log) return answer_kg, log - - def _expand_edge_kg2_local(self, one_hop_qg: QueryGraph, log: ARAXResponse) -> Tuple[QGOrganizedKnowledgeGraph, ARAXResponse]: qedge_key = next(qedge_key for qedge_key in one_hop_qg.edges) diff --git a/code/ARAX/ARAXQuery/ARAX_infer.py b/code/ARAX/ARAXQuery/ARAX_infer.py index 869ffd4e4..b2be0e7a9 100644 --- a/code/ARAX/ARAXQuery/ARAX_infer.py +++ b/code/ARAX/ARAXQuery/ARAX_infer.py @@ -265,6 +265,8 @@ def check_params(self, allowable_parameters): error_code="UnknownParameter") return -1 elif type(item) == list or type(item) == set: + if key == 'drug_curie': + continue for item_val in item: if item_val not in allowable_parameters[key]: self.response.error( @@ -279,6 +281,8 @@ def check_params(self, allowable_parameters): continue elif key == "node_curie": #FIXME: For now, if it's a node curie, just accept it as it is continue + elif key == "drug_curie": + continue elif key == "subject_curie": 
#FIXME: same as above continue elif key == "object_curie": #FIXME: same as above @@ -345,7 +349,8 @@ def apply(self, response, input_parameters): def __drug_treatment_graph_expansion(self, describe=False): """ Run "drug_treatment_graph_expansion" action. - Allowable parameters: {'node_curie': str, + Allowable parameters: {'node_curie': str, + 'drug_curie': list 'qedge_id': str, 'n_drugs': int 'n_paths': int} @@ -359,6 +364,7 @@ def __drug_treatment_graph_expansion(self, describe=False): if message and parameters and hasattr(message, 'query_graph') and hasattr(message.query_graph, 'nodes'): allowable_parameters = {'action': {'drug_treatment_graph_expansion'}, 'node_curie': {str()}, + 'drug_curie': {str()}, 'qedge_id': set([key for key in self.message.query_graph.edges.keys()]), 'n_drugs': {int()}, 'n_paths': {int()} @@ -366,6 +372,7 @@ def __drug_treatment_graph_expansion(self, describe=False): else: allowable_parameters = {'action': {'drug_treatment_graph_expansion'}, 'node_curie': {'The node to predict drug treatments for.'}, + 'drug_curie': {'This node contains a list of drug curies for which xDTD will try to find the path for between node_curie and itself'}, 'qedge_id': {'The edge to place the predicted mechanism of action on. If none is provided, the query graph must be empty and a new one will be inserted.'}, 'n_drugs': {'The number of drugs to return. Defaults to 50. Maxiumum is only allowable to be 50.'}, 'n_paths': {'The number of paths connecting each drug to return. Defaults to 25. Maxiumum is only allowable to be 25.'} @@ -422,6 +429,8 @@ def __drug_treatment_graph_expansion(self, describe=False): if self.response.status != 'OK': return self.response + if 'drug_curie' not in self.parameters: + self.parameters['drug_curie'] = None # normalized_curie = self.synonymizer.get_canonical_curies(self.parameters['node_curie'])[self.parameters['node_curie']] # if normalized_curie: # preferred_curie = normalized_curie['preferred_curie'] @@ -438,7 +447,21 @@ def __drug_treatment_graph_expansion(self, describe=False): else: self.response.warning(f"Could not get equivalent curies for disease {self.parameters['node_curie']}") return self.response - + if self.parameters['drug_curie']: + + drug_equivalent_curies_dict = self.synonymizer.get_equivalent_nodes(self.parameters['drug_curie']) + drug_equivalent_curies = [] + for key,value in drug_equivalent_curies_dict.items(): + if not value: + continue + drug_equivalent_curies += list(value) + + [self.parameters['drug_curie']] + if drug_equivalent_curies: + self.response.debug(f"Get equivalent curies for Drug Curie {drug_equivalent_curies} from Node Synonymizer for {self.parameters['drug_curie']}") + else: + self.response.warning(f"Could not get equivalent curies for Drug {self.parameters['drug_curie']}") + return self.response for preferred_curie in all_equivalent_curies: try: top_drugs = XDTD.get_top_drugs_for_disease(disease_ids=preferred_curie) @@ -466,6 +489,14 @@ def __drug_treatment_graph_expansion(self, describe=False): ## Limit the number of drugs and paths to the top n top_drugs = top_drugs.iloc[:self.parameters['n_drugs'],:].reset_index(drop=True) top_paths = {(row[0], row[2]):top_paths[(row[0], row[2])][:self.parameters['n_paths']] for row in top_drugs.to_numpy() if (row[0], row[2]) in top_paths} + if self.parameters['drug_curie']: + drugs_set = set(top_drugs['drug_id']) + intersecting_drug_curies = list(drugs_set.intersection(drug_equivalent_curies)) + if not intersecting_drug_curies: + continue + top_drugs = 
top_drugs[top_drugs['drug_id'].isin(intersecting_drug_curies)].reset_index(drop=True) + + top_paths = {pair:path for pair,path in top_paths.items() if pair[0] in intersecting_drug_curies} # # TRAPI-ifies the results of the model qedge_id = self.parameters.get('qedge_id') diff --git a/code/ARAX/ARAXQuery/Expand/kp_info_cacher.py b/code/ARAX/ARAXQuery/Expand/kp_info_cacher.py index 597789ce3..5ee60888b 100644 --- a/code/ARAX/ARAXQuery/Expand/kp_info_cacher.py +++ b/code/ARAX/ARAXQuery/Expand/kp_info_cacher.py @@ -58,9 +58,6 @@ def refresh_kp_info_caches(self): # Transform the info into the format we want allowed_kp_urls = {kp_registration["infores_name"]: self._get_kp_url_from_smartapi_registration(kp_registration) for kp_registration in smart_api_kp_registrations} - # Add entries for our local KPs (that are not web services) - allowed_kp_urls["infores:arax-drug-treats-disease"] = None - allowed_kp_urls["infores:arax-normalized-google-distance"] = None smart_api_cache_contents = {"allowed_kp_urls": allowed_kp_urls, "kps_excluded_by_version": smart_api_helper.kps_excluded_by_version, @@ -257,15 +254,7 @@ def _build_meta_map(self, allowed_kps_dict: Dict[str, str]): for category, meta_node in kp_meta_kg["nodes"].items()}} else: eprint(f"Unable to access {kp_infores_curie}'s /meta_knowledge_graph endpoint " - f"(returned status of {kp_response.status_code} for URL {kp_endpoint_url})") - elif kp_infores_curie == "infores:arax-drug-treats-disease": - meta_map[kp_infores_curie] = {"predicates": self._get_dtd_meta_map(), - "prefixes": dict()} - elif kp_infores_curie == "infores:arax-normalized-google-distance": - # This is just a placeholder; not really used for KP selection - predicates = {"biolink:NamedThing": {"biolink:NamedThing": {"biolink:occurs_together_in_literature_with"}}} - meta_map[kp_infores_curie] = {"predicates": predicates, - "prefixes": dict()} + f"(returned status of {kp_response.status_code} for URL {kp_endpoint_url})") # Make sure the map doesn't contain any 'stale' KPs (KPs that used to be in SmartAPI but no longer are) stale_kps = set(meta_map).difference(allowed_kps_dict) @@ -290,21 +279,6 @@ def _convert_meta_kg_to_meta_map(kp_meta_kg: dict) -> dict: kp_meta_map[subject_category][object_category].add(predicate) return kp_meta_map - @staticmethod - def _get_dtd_meta_map(): - dtd_predicates = {"biolink:treats", "biolink:treated_by"} - drug_ish_dict = {"biolink:Drug": dtd_predicates, - "biolink:SmallMolecule": dtd_predicates} - disease_ish_dict = {"biolink:Disease": dtd_predicates, - "biolink:PhenotypicFeature": dtd_predicates, - "biolink:DiseaseOrPhenotypicFeature": dtd_predicates} - dtd_meta_map = {"biolink:Drug": disease_ish_dict, - "biolink:SmallMolecule": disease_ish_dict, - "biolink:Disease": drug_ish_dict, - "biolink:PhenotypicFeature": drug_ish_dict, - "biolink:DiseaseOrPhenotypicFeature": drug_ish_dict} - return dtd_meta_map - if __name__ == "__main__": KPInfoCacher().refresh_kp_info_caches() diff --git a/code/ARAX/ARAXQuery/Expand/kp_selector.py b/code/ARAX/ARAXQuery/Expand/kp_selector.py index 215516be0..d9897ed67 100644 --- a/code/ARAX/ARAXQuery/Expand/kp_selector.py +++ b/code/ARAX/ARAXQuery/Expand/kp_selector.py @@ -1,5 +1,6 @@ #!/bin/env python3 import os +import pprint import sys from typing import Set, List, Optional from collections import defaultdict @@ -26,7 +27,7 @@ def __init__(self, kg2_mode: bool = False, log: ARAXResponse = ARAXResponse()): self.kp_cacher = KPInfoCacher() self.meta_map, self.kp_urls, self.kps_excluded_by_version, 
self.kps_excluded_by_maturity = self._load_cached_kp_info() self.valid_kps = {"infores:rtx-kg2"} if self.kg2_mode else set(self.kp_urls.keys()) - self.biolink_helper = BiolinkHelper() + self.bh = BiolinkHelper() def _load_cached_kp_info(self) -> tuple: if self.kg2_mode: @@ -40,7 +41,8 @@ def _load_cached_kp_info(self) -> tuple: # Record None URLs for our local KPs allowed_kp_urls = smart_api_info["allowed_kp_urls"] - return meta_map, allowed_kp_urls, smart_api_info["kps_excluded_by_version"], smart_api_info["kps_excluded_by_maturity"] + return (meta_map, allowed_kp_urls, smart_api_info["kps_excluded_by_version"], + smart_api_info["kps_excluded_by_maturity"]) def get_kps_for_single_hop_qg(self, qg: QueryGraph) -> Optional[Set[str]]: """ @@ -49,26 +51,34 @@ def get_kps_for_single_hop_qg(self, qg: QueryGraph) -> Optional[Set[str]]: """ qedge_key = next(qedge_key for qedge_key in qg.edges) qedge = qg.edges[qedge_key] + qedge_predicates = qedge.predicates if qedge.predicates else [self.bh.root_predicate] self.log.debug(f"Selecting KPs to use for qedge {qedge_key}") # confirm that the qg is one hop if len(qg.edges) > 1: - self.log.error(f"Query graph can only have one edge, but instead has {len(qg.edges)}.", error_code="UnexpectedQG") + self.log.error(f"Query graph can only have one edge, but instead has {len(qg.edges)}.", + error_code="UnexpectedQG") return None # isolate possible subject predicate object from qg - sub_categories = set(self.biolink_helper.get_descendants(qg.nodes[qedge.subject].categories)) - obj_categories = set(self.biolink_helper.get_descendants(qg.nodes[qedge.object].categories)) - predicates = set(self.biolink_helper.get_descendants(qedge.predicates)) + sub_categories = set(self.bh.get_descendants(qg.nodes[qedge.subject].categories)) + obj_categories = set(self.bh.get_descendants(qg.nodes[qedge.object].categories)) + predicates = set(self.bh.get_descendants(qedge_predicates)) - symmetrical_predicates = set(filter(self.biolink_helper.is_symmetric, predicates)) + symmetrical_predicates = set(filter(self.bh.is_symmetric, predicates)) # use metamap to check kp for predicate triple self.log.debug(f"selecting from {len(self.valid_kps)} kps") accepting_kps = set() for kp in self.meta_map: - if self._triple_is_in_meta_map(kp, sub_categories, predicates, obj_categories): + if self._triple_is_in_meta_map(kp, + sub_categories, + predicates, + obj_categories): accepting_kps.add(kp) # account for symmetrical predicates by checking if kp accepts with swapped sub and obj categories - elif self._triple_is_in_meta_map(kp, obj_categories, symmetrical_predicates, sub_categories): + elif symmetrical_predicates and self._triple_is_in_meta_map(kp, + obj_categories, + symmetrical_predicates, + sub_categories): accepting_kps.add(kp) else: self.log.update_query_plan(qedge_key, kp, "Skipped", "MetaKG indicates this qedge is unsupported") @@ -100,14 +110,18 @@ def kp_accepts_single_hop_qg(self, qg: QueryGraph, kp: str) -> Optional[bool]: return None qedge = list(qg.edges.values())[0] - sub_categories = set(self.biolink_helper.get_descendants(qg.nodes[qedge.subject].categories)) - obj_categories = set(self.biolink_helper.get_descendants(qg.nodes[qedge.object].categories)) - predicates = set(self.biolink_helper.get_descendants(qedge.predicates)) + sub_categories = set(self.bh.get_descendants(qg.nodes[qedge.subject].categories)) + obj_categories = set(self.bh.get_descendants(qg.nodes[qedge.object].categories)) + qedge_predicates = qedge.predicates if qedge.predicates else [self.bh.root_predicate] + 
predicates = set(self.bh.get_descendants(qedge_predicates)) kp_accepts = self._triple_is_in_meta_map(kp, sub_categories, predicates, obj_categories) # account for symmetrical predicates by checking if kp accepts with swapped sub and obj categories - symmetrical_predicates = set(filter(self.biolink_helper.is_symmetric, predicates)) - kp_accepts = kp_accepts or self._triple_is_in_meta_map(kp, obj_categories, symmetrical_predicates, sub_categories) + symmetrical_predicates = set(filter(self.bh.is_symmetric, predicates)) + kp_accepts = kp_accepts or (symmetrical_predicates and self._triple_is_in_meta_map(kp, + obj_categories, + symmetrical_predicates, + sub_categories)) return kp_accepts @@ -188,18 +202,24 @@ def _get_uppercase_prefix(curie: str) -> str: return curie.split(":")[0].upper() def _get_supported_prefixes(self, categories: List[str], kp: str) -> Set[str]: - bh = BiolinkHelper() - categories_with_descendants = bh.get_descendants(eu.convert_to_list(categories), include_mixins=False) + categories_with_descendants = self.bh.get_descendants(eu.convert_to_list(categories), include_mixins=False) supported_prefixes = {prefix.upper() for category in categories_with_descendants for prefix in self.meta_map[kp]["prefixes"].get(category, set())} return supported_prefixes - def _triple_is_in_meta_map(self, kp: str, subject_categories: Set[str], predicates: Set[str], object_categories: Set[str]) -> bool: - # returns True if at least one possible triple exists in the KP's meta map + def _triple_is_in_meta_map(self, kp: str, + subject_categories: Set[str], + predicates: Set[str], + object_categories: Set[str]) -> bool: + """ + Returns True if at least one possible triple exists in the KP's meta map. NOT meant to handle empty predicates; + you should sub in "biolink:related_to" for QEdges without predicates before calling this method. + """ kp_meta_map = self.meta_map.get(kp) if not kp_meta_map: if kp not in self.valid_kps: - self.log.error(f"{kp} does not seem to be a valid KP for ARAX. Valid KPs are: {self.valid_kps}", error_code="InvalidKP") + self.log.error(f"{kp} does not seem to be a valid KP for ARAX. 
Valid KPs are: {self.valid_kps}", + error_code="InvalidKP") else: self.log.warning(f"Somehow missing meta info for {kp}.") return False @@ -212,7 +232,6 @@ def _triple_is_in_meta_map(self, kp: str, subject_categories: Set[str], predicat object_set = set() _ = [object_set.add(obj) for obj_dict in predicates_map.values() for obj in obj_dict.keys()] object_categories = object_set - any_predicate = False if predicates or kp == "NGD" else True # handle combinations of subject and objects using cross product qg_sub_obj_dict = defaultdict(lambda: set()) @@ -230,6 +249,17 @@ def _triple_is_in_meta_map(self, kp: str, subject_categories: Set[str], predicat if len(accepted_objs) > 0: # check predicates for obj in accepted_objs: - if any_predicate or predicates.intersection(predicates_map[sub][obj]): + if predicates.intersection(predicates_map[sub][obj]): return True return False + + +def main(): + kp_selector = KPSelector() + print(f"Meta map is:") + pp = pprint.PrettyPrinter(indent=2) + pp.pprint(kp_selector.meta_map) + + +if __name__ == "__main__": + main() diff --git a/code/ARAX/ARAXQuery/Expand/ngd_querier.py b/code/ARAX/ARAXQuery/Expand/ngd_querier.py deleted file mode 100644 index 9daa12176..000000000 --- a/code/ARAX/ARAXQuery/Expand/ngd_querier.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/bin/env python3 -import copy -import sys -import os -import traceback -import ast -from typing import List, Dict, Tuple - -sys.path.append(os.path.dirname(os.path.abspath(__file__))) -import expand_utilities as eu -from expand_utilities import QGOrganizedKnowledgeGraph -sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../") # ARAXQuery directory -from ARAX_query import ARAXQuery -from ARAX_response import ARAXResponse -from ARAX_decorator import ARAXDecorator -sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../Overlay/") -from Overlay.compute_ngd import ComputeNGD -sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../UI/OpenAPI/python-flask-server/") -from openapi_server.models.node import Node -from openapi_server.models.edge import Edge -from openapi_server.models.attribute import Attribute -from openapi_server.models.query_graph import QueryGraph -from openapi_server.models.q_node import QNode -from openapi_server.models.message import Message - - -class NGDQuerier: - - def __init__(self, response_object: ARAXResponse): - self.response = response_object - self.ngd_edge_predicate = "biolink:occurs_together_in_literature_with" - self.accepted_qedge_predicates = {"biolink:occurs_together_in_literature_with", "biolink:related_to"} - self.ngd_edge_attribute_name = "normalized_google_distance" - self.ngd_edge_attribute_type = "EDAM-DATA:2526" - self.decorator = ARAXDecorator() - - def answer_one_hop_query(self, query_graph: QueryGraph) -> QGOrganizedKnowledgeGraph: - """ - This function answers a one-hop (single-edge) query using NGD (with the assistance of KG2). - :param query_graph: A TRAPI query graph. - :return: An (almost) TRAPI knowledge graph containing all of the nodes and edges returned as - results for the query. (Organized by QG IDs.) 
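The querier being deleted in this hunk, like the `compute_ngd` overlay documented later in this diff, is built around the normalized Google distance between two terms' literature occurrence counts. Purely as a point of reference, a minimal sketch of that metric under its usual definition is given below, with made-up PubMed-style counts; the function name and numbers are illustrative and this is not ARAX's `calculate_ngd_fast` implementation.

```python
import math


def normalized_google_distance(n_x: int, n_y: int, n_xy: int, n_total: int) -> float:
    """Normalized Google distance for two terms given document counts.

    n_x / n_y: documents mentioning each term alone, n_xy: documents
    mentioning both, n_total: corpus size. Returns infinity when the
    terms never co-occur (or a count is zero).
    """
    if n_xy == 0 or n_x == 0 or n_y == 0:
        return float("inf")
    log_x, log_y, log_xy = math.log(n_x), math.log(n_y), math.log(n_xy)
    return (max(log_x, log_y) - log_xy) / (math.log(n_total) - min(log_x, log_y))


# Hypothetical counts; keep the edge only if NGD falls below a threshold
# (the deleted querier used 0.5).
ngd = normalized_google_distance(n_x=12000, n_y=800, n_xy=150, n_total=30_000_000)
print(f"NGD = {ngd:.3f}; keep edge: {ngd < 0.5}")
```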
- """ - log = self.response - final_kg = QGOrganizedKnowledgeGraph() - - # Verify this is a valid one-hop query graph - self._verify_one_hop_query_graph_is_valid(query_graph, log) - if log.status != 'OK': - return final_kg - qedge_key = next(qedge_key for qedge_key in query_graph.edges) - qedge = query_graph.edges[qedge_key] - if qedge.predicates and not set(qedge.predicates).intersection(self.accepted_qedge_predicates): - log.error(f"NGD can only expand qedges with these predicates: {self.accepted_qedge_predicates}. QEdge" - f" {qedge_key}'s predicate is: {qedge.predicates}", error_code="UnsupportedQG") - return final_kg - source_qnode_key = qedge.subject - target_qnode_key = qedge.object - - # Find potential answers using KG2 - log.debug(f"Finding potential answers using KG2") - modified_qg = copy.deepcopy(query_graph) - for qedge in modified_qg.edges.values(): - qedge.predicates = None - - request_body = {"message": {"query_graph": modified_qg.to_dict()}} - kg2_response, kg2_message = self._run_arax_query(request_body, log) - if log.status != 'OK': - return final_kg - - # Go through those answers from KG2 and calculate ngd for each edge - log.debug(f"Calculating NGD between each potential node pair") - kg2_answer_kg = kg2_message.knowledge_graph - cngd = ComputeNGD(log, kg2_message, None) - cngd.load_curie_to_pmids_data(kg2_answer_kg.nodes) - kg2_edge_ngd_map = dict() - for kg2_edge_key, kg2_edge in kg2_answer_kg.edges.items(): - kg2_node_1_key = kg2_edge.subject - kg2_node_2_key = kg2_edge.object - kg2_node_1 = kg2_answer_kg.nodes.get(kg2_node_1_key) # These are already canonicalized (default behavior) - kg2_node_2 = kg2_answer_kg.nodes.get(kg2_node_2_key) - # Figure out which node corresponds to source qnode (don't necessarily match b/c query was bidirectional) - if source_qnode_key in kg2_node_1.qnode_keys and target_qnode_key in kg2_node_2.qnode_keys: - ngd_subject = kg2_node_1_key - ngd_object = kg2_node_2_key - else: - ngd_subject = kg2_node_2_key - ngd_object = kg2_node_1_key - ngd_value, pmid_set = cngd.calculate_ngd_fast(ngd_subject, ngd_object) - kg2_edge_ngd_map[kg2_edge_key] = {"ngd_value": ngd_value, "subject": ngd_subject, "object": ngd_object, "pmids": [f"PMID:{pmid}" for pmid in pmid_set]} - - # Create edges for those from KG2 found to have a low enough ngd value - threshold = 0.5 - log.debug(f"Creating edges between node pairs with NGD below the threshold ({threshold})") - for kg2_edge_key, ngd_info_dict in kg2_edge_ngd_map.items(): - ngd_value = ngd_info_dict['ngd_value'] - if ngd_value is not None and ngd_value < threshold: # TODO: Make determination of the threshold much more sophisticated - subject = ngd_info_dict["subject"] - object = ngd_info_dict["object"] - pmid_list = ngd_info_dict["pmids"] - ngd_edge_key, ngd_edge = self._create_ngd_edge(ngd_value, subject, object, pmid_list) - ngd_source_node_key, ngd_source_node = self._create_ngd_node(ngd_edge.subject, kg2_answer_kg.nodes.get(ngd_edge.subject)) - ngd_target_node_key, ngd_target_node = self._create_ngd_node(ngd_edge.object, kg2_answer_kg.nodes.get(ngd_edge.object)) - final_kg.add_edge(ngd_edge_key, ngd_edge, qedge_key) - final_kg.add_node(ngd_source_node_key, ngd_source_node, source_qnode_key) - final_kg.add_node(ngd_target_node_key, ngd_target_node, target_qnode_key) - - return final_kg - - def _create_ngd_edge(self, ngd_value: float, subject: str, object: str, pmid_list: list) -> Tuple[str, Edge]: - ngd_edge = Edge() - ngd_edge.predicate = self.ngd_edge_predicate - ngd_edge.subject = subject - 
ngd_edge.object = object - ngd_edge_key = f"NGD:{subject}--{ngd_edge.predicate}--{object}" - ngd_edge.attributes = [Attribute(original_attribute_name=self.ngd_edge_attribute_name, - attribute_type_id=self.ngd_edge_attribute_type, - value=ngd_value)] - kp_description = "ARAX's in-house normalized google distance database." - ngd_edge.attributes += [self.decorator.create_attribute("publications", pmid_list), - eu.get_computed_value_attribute()] - return ngd_edge_key, ngd_edge - - @staticmethod - def _create_ngd_node(kg2_node_key: str, kg2_node: Node) -> Tuple[str, Node]: - ngd_node = Node() - ngd_node_key = kg2_node_key - ngd_node.name = kg2_node.name - ngd_node.categories = kg2_node.categories - return ngd_node_key, ngd_node - - @staticmethod - def _run_arax_query(request_body: dict, log: ARAXResponse) -> Tuple[ARAXResponse, Message]: - araxq = ARAXQuery() - sub_query_response = araxq.query(request_body, mode="RTXKG2") - if sub_query_response.status != 'OK': - log.error(f"Encountered an error running ARAXQuery within Expand: {sub_query_response.show(level=sub_query_response.DEBUG)}") - return sub_query_response, araxq.message - - @staticmethod - def _verify_one_hop_query_graph_is_valid(query_graph: QueryGraph, log: ARAXResponse): - if len(query_graph.edges) != 1: - log.error(f"answer_one_hop_query() was passed a query graph that is not one-hop: " - f"{query_graph.to_dict()}", error_code="InvalidQuery") - elif len(query_graph.nodes) > 2: - log.error(f"answer_one_hop_query() was passed a query graph with more than two nodes: " - f"{query_graph.to_dict()}", error_code="InvalidQuery") - elif len(query_graph.nodes) < 2: - log.error(f"answer_one_hop_query() was passed a query graph with less than two nodes: " - f"{query_graph.to_dict()}", error_code="InvalidQuery") diff --git a/code/ARAX/ARAXQuery/Filter_KG/remove_nodes.py b/code/ARAX/ARAXQuery/Filter_KG/remove_nodes.py index a8f8bc1af..555c96eb2 100644 --- a/code/ARAX/ARAXQuery/Filter_KG/remove_nodes.py +++ b/code/ARAX/ARAXQuery/Filter_KG/remove_nodes.py @@ -5,7 +5,7 @@ import traceback import json import numpy as np - +import re class RemoveNodes: @@ -176,9 +176,14 @@ def remove_general_concept_nodes(self): synonyms += attribute.get('value',[]) if node_dict['name']: synonyms.append(node_dict['name'].lower()) - if block_list_synonyms.intersection([synonym.lower() for synonym in synonyms if synonym]) \ - or block_list_curies.intersection([curie.lower() for curie in curies if curie]): + if block_list_curies.intersection([curie.lower() for curie in curies if curie]): nodes_to_remove.add(key) + continue + for synonym in synonyms: + for block_list_synonym in block_list_synonyms: + if re.match(block_list_synonym, synonym,re.IGNORECASE): + nodes_to_remove.add(key) + for key in nodes_to_remove: del self.message.knowledge_graph.nodes[key] diff --git a/code/ARAX/Documentation/DSL_Documentation.md b/code/ARAX/Documentation/DSL_Documentation.md index b1f0f9ec5..731353153 100644 --- a/code/ARAX/Documentation/DSL_Documentation.md +++ b/code/ARAX/Documentation/DSL_Documentation.md @@ -9,12 +9,12 @@ - [ARAX_expander](#arax_expander) - [expand()](#expand) - [ARAX_overlay](#arax_overlay) - - [overlay(action=compute_jaccard)](#overlayactioncompute_jaccard) - [overlay(action=overlay_clinical_info)](#overlayactionoverlay_clinical_info) - [overlay(action=fisher_exact_test)](#overlayactionfisher_exact_test) - - [overlay(action=compute_ngd)](#overlayactioncompute_ngd) - - [overlay(action=overlay_exposures_data)](#overlayactionoverlay_exposures_data) - 
[overlay(action=add_node_pmids)](#overlayactionadd_node_pmids) + - [overlay(action=overlay_exposures_data)](#overlayactionoverlay_exposures_data) + - [overlay(action=compute_ngd)](#overlayactioncompute_ngd) + - [overlay(action=compute_jaccard)](#overlayactioncompute_jaccard) - [ARAX_filter_kg](#arax_filter_kg) - [filter_kg(action=remove_edges_by_predicate)](#filter_kgactionremove_edges_by_predicate) - [filter_kg(action=remove_edges_by_continuous_attribute)](#filter_kgactionremove_edges_by_continuous_attribute) @@ -219,7 +219,7 @@ The `add_qedge` command adds an additional QEdge to the QueryGraph in the Messag This command will expand (aka, answer/fill) your query graph in an edge-by-edge fashion, intelligently selecting which KPs to use for each edge. It selects KPs from the SmartAPI Registry based on the meta information provided by their TRAPI APIs (when available), whether they have an endpoint running a matching TRAPI version, and whether they have an endpoint with matching maturity. For each QEdge, it queries the selected KPs in parallel; it will timeout for a particular KP if it decides it's taking too long to respond. You may also optionally specify a particular KP to use via the 'kp' parameter (described below). Current candidate KPs include (for TRAPI 1.4, maturity 'development'): -infores:arax-drug-treats-disease, infores:arax-normalized-google-distance, infores:automat-gwas-catalog, infores:automat-hgnc, infores:automat-icees-kg, infores:automat-panther, infores:automat-sri-reference-kg, infores:cohd, infores:connections-hypothesis, infores:cqs, infores:genetics-data-provider, infores:knowledge-collaboratory, infores:molepro, infores:openpredict, infores:rtx-kg2, infores:service-provider-trapi, infores:spoke, infores:text-mining-provider-cooccurrence. +infores:agrkb, infores:arax-drug-treats-disease, infores:arax-normalized-google-distance, infores:automat-biolink, infores:automat-cam-kp, infores:automat-ctd, infores:automat-drug-central, infores:automat-gtex, infores:automat-gtopdb, infores:automat-gwas-catalog, infores:automat-hetio, infores:automat-hgnc, infores:automat-hmdb, infores:automat-human-goa, infores:automat-icees-kg, infores:automat-intact, infores:automat-panther, infores:automat-pharos, infores:automat-robokop, infores:automat-sri-reference-kg, infores:automat-string-db, infores:automat-ubergraph, infores:automat-ubergraph-nonredundant, infores:automat-viral-proteome, infores:cohd, infores:connections-hypothesis, infores:gelinea, infores:genetics-data-provider, infores:knowledge-collaboratory, infores:molepro, infores:openpredict, infores:rtx-kg2, infores:service-provider-trapi, infores:spoke, infores:text-mining-provider-cooccurrence. (Note that this list of KPs may change unexpectedly based on the SmartAPI registry.) @@ -296,58 +296,6 @@ infores:arax-drug-treats-disease, infores:arax-normalized-google-distance, infor - `true` and `false` are examples of valid inputs. ## ARAX_overlay -### overlay(action=compute_jaccard) - -`compute_jaccard` creates virtual edges and adds an edge attribute (with the property name `jaccard_index`) containing the following information: -The jaccard similarity measures how many `intermediate_node_key`'s are shared in common between each `start_node_key` and `object_node_key`. -This is used for purposes such as "find me all drugs (`start_node_key`) that have many proteins (`intermediate_node_key`) in common with this disease (`end_node_key`)." -This can be used for downstream filtering to concentrate on relevant bioentities. 
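The `expand()` documentation earlier in this hunk describes edge-by-edge KP selection and the optional `kp` parameter. For orientation only, a minimal one-hop action list in the style used by the ARAX tests elsewhere in this diff is sketched below; the curie, the qnode/qedge keys, and the choice of KP are example values, and submitting the actions to a running ARAX instance is out of scope here.

```python
# A one-hop query graph expanded against a single KP (RTX-KG2 here);
# the curie and the n00/n01/e00 keys are just example values.
actions_list = [
    "add_qnode(name=MONDO:0007156, key=n00)",
    "add_qnode(categories=biolink:ChemicalEntity, key=n01)",
    "add_qedge(subject=n00, object=n01, key=e00)",
    "expand(kp=infores:rtx-kg2)",
    "return(message=true, store=false)",
]
```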
- -This can be applied to an arbitrary knowledge graph as possible edge types are computed dynamically (i.e. not just those created/recognized by the ARA Expander team). - - -#### parameters: - -* ##### start_node_key - - - A curie id specifying the starting node - - - Acceptable input types: string. - - - This is a required parameter and must be included. - - - `DOID:1872`, `CHEBI:7476`, and `UMLS:C1764836` are examples of valid inputs. - -* ##### intermediate_node_key - - - A curie id specifying the intermediate node - - - Acceptable input types: string. - - - This is a required parameter and must be included. - - - `DOID:1872`, `CHEBI:7476`, and `UMLS:C1764836` are examples of valid inputs. - -* ##### end_node_key - - - A curie id specifying the ending node - - - Acceptable input types: string. - - - This is a required parameter and must be included. - - - `DOID:1872`, `CHEBI:7476`, and `UMLS:C1764836` are examples of valid inputs. - -* ##### virtual_relation_label - - - An optional label to help identify the virtual edge in the relation field. - - - Acceptable input types: string. - - - This is a required parameter and must be included. - - - `N1`, `J2`, and `FET` are examples of valid inputs. - ### overlay(action=overlay_clinical_info) `overlay_clinical_info` overlay edges with information obtained from the knowledge provider (KP) Columbia Open Health Data (COHD). @@ -519,33 +467,39 @@ _, pvalue = stats.fisher_exact([[a, b], [c, d]]) - If not specified the default input will be None. -### overlay(action=compute_ngd) - -`compute_ngd` computes a metric (called the normalized Google distance) based on edge soure/object node co-occurrence in abstracts of all PubMed articles. -This information is then included as an edge attribute with the name `normalized_google_distance`. -You have the choice of applying this to all edges in the knowledge graph, or only between specified subject/object qnode id's. If the later, virtual edges are added with the type specified by `virtual_relation_label`. - -Use cases include: +### overlay(action=add_node_pmids) -* focusing in on edges that are well represented in the literature -* focusing in on edges that are under-represented in the literature +`add_node_pmids` adds PubMed PMID's as node attributes to each node in the knowledge graph. +This information is obtained from mapping node identifiers to MeSH terms and obtaining which PubMed articles have this MeSH term +either labeling in the metadata or has the MeSH term occurring in the abstract of the article. This can be applied to an arbitrary knowledge graph as possible edge types are computed dynamically (i.e. not just those created/recognized by the ARA Expander team). #### parameters: -* ##### default_value +* ##### max_num - - The default value of the normalized Google distance (if its value cannot be determined) + - The maximum number of values to return. Enter 'all' to return everything - - Acceptable input types: string. + - Acceptable input types: int or string. - This is not a required parameter and may be omitted. - - `0` and `inf` are examples of valid inputs. + - `all`, `5`, and `50` are examples of valid inputs. - - If not specified the default input will be inf. + - If not specified the default input will be 100. + +### overlay(action=overlay_exposures_data) + +`overlay_exposures_data` overlays edges with p-values obtained from the ICEES+ (Integrated Clinical and Environmental Exposures Service) knowledge provider. 
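Both the `fisher_exact_test` action quoted above (via `stats.fisher_exact([[a, b], [c, d]])`) and the ICEES+ overlay described here attach p-values to edges. A self-contained illustration of that 2x2 contingency-table call is shown below, using invented counts rather than anything drawn from ARAX data.

```python
from scipy import stats

# Hypothetical 2x2 contingency table:
# a = source nodes connected to the target, b = source nodes not connected,
# c = other nodes connected to the target, d = other nodes not connected.
a, b, c, d = 12, 88, 30, 870
_, pvalue = stats.fisher_exact([[a, b], [c, d]])
print(f"Fisher exact p-value: {pvalue:.3g}")
```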
+This information is included in edge attributes with the name `icees_p-value`. +You have the choice of applying this to all edges in the knowledge graph, or only between specified subject/object qnode IDs. If the latter, the data is added in 'virtual' edges with the type `has_icees_p-value_with`. + +This can be applied to an arbitrary knowledge graph (i.e. not just those created/recognized by Expander Agent). + + +#### parameters: * ##### virtual_relation_label @@ -577,17 +531,34 @@ This can be applied to an arbitrary knowledge graph as possible edge types are c - `n00` and `n01` are examples of valid inputs. -### overlay(action=overlay_exposures_data) +### overlay(action=compute_ngd) -`overlay_exposures_data` overlays edges with p-values obtained from the ICEES+ (Integrated Clinical and Environmental Exposures Service) knowledge provider. -This information is included in edge attributes with the name `icees_p-value`. -You have the choice of applying this to all edges in the knowledge graph, or only between specified subject/object qnode IDs. If the latter, the data is added in 'virtual' edges with the type `has_icees_p-value_with`. +`compute_ngd` computes a metric (called the normalized Google distance) based on edge soure/object node co-occurrence in abstracts of all PubMed articles. +This information is then included as an edge attribute with the name `normalized_google_distance`. +You have the choice of applying this to all edges in the knowledge graph, or only between specified subject/object qnode id's. If the later, virtual edges are added with the type specified by `virtual_relation_label`. -This can be applied to an arbitrary knowledge graph (i.e. not just those created/recognized by Expander Agent). +Use cases include: + +* focusing in on edges that are well represented in the literature +* focusing in on edges that are under-represented in the literature + +This can be applied to an arbitrary knowledge graph as possible edge types are computed dynamically (i.e. not just those created/recognized by the ARA Expander team). #### parameters: +* ##### default_value + + - The default value of the normalized Google distance (if its value cannot be determined) + + - Acceptable input types: string. + + - This is not a required parameter and may be omitted. + + - `0` and `inf` are examples of valid inputs. + + - If not specified the default input will be inf. + * ##### virtual_relation_label - An optional label to help identify the virtual edge in the relation field. @@ -618,28 +589,57 @@ This can be applied to an arbitrary knowledge graph (i.e. not just those created - `n00` and `n01` are examples of valid inputs. -### overlay(action=add_node_pmids) +### overlay(action=compute_jaccard) -`add_node_pmids` adds PubMed PMID's as node attributes to each node in the knowledge graph. -This information is obtained from mapping node identifiers to MeSH terms and obtaining which PubMed articles have this MeSH term -either labeling in the metadata or has the MeSH term occurring in the abstract of the article. +`compute_jaccard` creates virtual edges and adds an edge attribute (with the property name `jaccard_index`) containing the following information: +The jaccard similarity measures how many `intermediate_node_key`'s are shared in common between each `start_node_key` and `object_node_key`. +This is used for purposes such as "find me all drugs (`start_node_key`) that have many proteins (`intermediate_node_key`) in common with this disease (`end_node_key`)." 
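A minimal sketch of the Jaccard index that the `jaccard_index` edge attribute described here reports, computed over hypothetical neighbour sets; the sets and names below are illustrative and do not reflect ARAX's internal implementation.

```python
def jaccard_index(set_a: set, set_b: set) -> float:
    """Jaccard index: size of the intersection divided by size of the union."""
    union = set_a | set_b
    return len(set_a & set_b) / len(union) if union else 0.0


# Hypothetical intermediate-node (e.g. protein) sets for a drug and a disease:
drug_neighbours = {"P1", "P2", "P3", "P4"}
disease_neighbours = {"P2", "P3", "P5"}
print(jaccard_index(drug_neighbours, disease_neighbours))  # 0.4
```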
+This can be used for downstream filtering to concentrate on relevant bioentities. This can be applied to an arbitrary knowledge graph as possible edge types are computed dynamically (i.e. not just those created/recognized by the ARA Expander team). #### parameters: -* ##### max_num +* ##### start_node_key - - The maximum number of values to return. Enter 'all' to return everything + - A curie id specifying the starting node - - Acceptable input types: int or string. + - Acceptable input types: string. - - This is not a required parameter and may be omitted. + - This is a required parameter and must be included. - - `all`, `5`, and `50` are examples of valid inputs. + - `DOID:1872`, `CHEBI:7476`, and `UMLS:C1764836` are examples of valid inputs. - - If not specified the default input will be 100. +* ##### intermediate_node_key + + - A curie id specifying the intermediate node + + - Acceptable input types: string. + + - This is a required parameter and must be included. + + - `DOID:1872`, `CHEBI:7476`, and `UMLS:C1764836` are examples of valid inputs. + +* ##### end_node_key + + - A curie id specifying the ending node + + - Acceptable input types: string. + + - This is a required parameter and must be included. + + - `DOID:1872`, `CHEBI:7476`, and `UMLS:C1764836` are examples of valid inputs. + +* ##### virtual_relation_label + + - An optional label to help identify the virtual edge in the relation field. + + - Acceptable input types: string. + + - This is a required parameter and must be included. + + - `N1`, `J2`, and `FET` are examples of valid inputs. ## ARAX_filter_kg ### filter_kg(action=remove_edges_by_predicate) @@ -1190,7 +1190,7 @@ This can be applied to an arbitrary knowledge graph as possible node categories - This is a required parameter and must be included. - `chemical_substance` and `disease` are examples of valid inputs. - + ### filter_kg(action=remove_general_concept_nodes) `remove_general_concept_nodes` removes nodes from the knowledge graph (KG) That are general concepts. @@ -1738,7 +1738,7 @@ This cannot be applied to non disease/phenotypic feature nodes (nodes that do no * ##### n_drugs - - The number of drug nodes to return. If not provided defaults to 25. Considering the response speed, the maximum number of drugs returned is only allowed to be 25. + - The number of drug nodes to return. If not provided defaults to 50. Considering the response speed, the maximum number of drugs returned is only allowed to be 50. - Acceptable input types: integer. @@ -1746,7 +1746,7 @@ This cannot be applied to non disease/phenotypic feature nodes (nodes that do no - `5`, `15`, and `25` are examples of valid inputs. - - If not specified the default input will be 25. + - If not specified the default input will be 50. 
* ##### n_paths diff --git a/code/ARAX/KnowledgeSources/general_concepts.json b/code/ARAX/KnowledgeSources/general_concepts.json index 3479acb67..e947584ea 100644 --- a/code/ARAX/KnowledgeSources/general_concepts.json +++ b/code/ARAX/KnowledgeSources/general_concepts.json @@ -673,6 +673,7 @@ "rna, untranslated", "secondary", "uterotonics", - "radiotherapy" + "radiotherapy", + "pharmacolog.*" ] } \ No newline at end of file diff --git a/code/ARAX/NodeSynonymizer/node_synonymizer.py b/code/ARAX/NodeSynonymizer/node_synonymizer.py index 6c0d6f09b..0353d3ee7 100644 --- a/code/ARAX/NodeSynonymizer/node_synonymizer.py +++ b/code/ARAX/NodeSynonymizer/node_synonymizer.py @@ -40,15 +40,9 @@ def __init__(self): self.sri_nn_infores_curie = "infores:sri-node-normalizer" self.arax_infores_curie = "infores:arax" - # If the database doesn't seem to exist, try running the DatabaseManager if not pathlib.Path(self.database_path).exists(): - print(f"Synonymizer not present at {self.database_path}; attempting to download with database manager..") - db_manager = ARAXDatabaseManager() - db_manager.update_databases() - - if not pathlib.Path(self.database_path).exists(): - raise ValueError(f"Synonymizer specified in config_dbs file does not exist locally, even after " - f"running the database manager! It should be at: {self.database_path}") + raise ValueError(f"Synonymizer specified in config_dbs file does not exist locally." + f" It should be at: {self.database_path}") else: self.db_connection = sqlite3.connect(self.database_path) @@ -291,6 +285,8 @@ def get_normalizer_results(self, entities: Optional[Union[str, Set[str], List[st # Trim down to minimal output, if requested if output_format == "minimal": for normalizer_info in results_dict.values(): + if normalizer_info is None: + continue keys_to_delete = set(normalizer_info.keys()).difference({"id"}) for dict_key in keys_to_delete: del normalizer_info[dict_key] diff --git a/code/ARAX/ResponseCache/recent_uuid_manager.py b/code/ARAX/ResponseCache/recent_uuid_manager.py index a56cc7ffd..11ac5832c 100644 --- a/code/ARAX/ResponseCache/recent_uuid_manager.py +++ b/code/ARAX/ResponseCache/recent_uuid_manager.py @@ -69,17 +69,29 @@ def get_recent_uuids(self, ars_host='ars.ci.transltr.io', top_n_pks=20): if container_key not in response_dict: return( { "status": 404, "title": "Error decoding Response", "detail": f"Cannot decode recent PK list from ARS {ars_host}: cannot find {container_key}", "type": "about:blank" }, 404) + have_timestamps = True + uuid_list = [] for uuid in response_dict[container_key]: #eprint(f"UUID is {uuid}") uuid_data = self.get_uuid(ars_host, uuid) #eprint(json.dumps(uuid_data,indent=2,sort_keys=True)) result = self.summarize_uuid_data(ars_host, uuid_data) + if 'timestamp' not in result: + have_timestamps = False + else: + uuid_list.append( { 'pk': uuid, 'timestamp': result['timestamp'] } ) response['pks'][uuid] = result response['agents_list'] = result['agents_list'] del(result['agents_list']) response['pks'][uuid]['ars_host'] = ars_host #eprint(json.dumps(response,indent=2,sort_keys=True)) + if have_timestamps: + uuid_list.sort(key=lambda x: x['timestamp']) + response['sorted_pk_list'] = [] + for item in uuid_list: + response['sorted_pk_list'].append(item['pk']) + return response @@ -160,6 +172,9 @@ def summarize_uuid_data(self, ars_host, uuid_data): if 'status' in uuid_data: summary['status'] = uuid_data['status'] + if 'timestamp' in uuid_data: + summary['timestamp'] = uuid_data['timestamp'] + if 'children' in uuid_data: for actor_response in 
uuid_data['children']: code = '-' diff --git a/code/ARAX/test/test_ARAX_expand.py b/code/ARAX/test/test_ARAX_expand.py index 884eaf163..004c0d396 100644 --- a/code/ARAX/test/test_ARAX_expand.py +++ b/code/ARAX/test/test_ARAX_expand.py @@ -7,7 +7,7 @@ import sys import os -from typing import List, Dict, Tuple, Optional +from typing import List, Dict, Optional import pytest @@ -18,7 +18,6 @@ sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../UI/OpenAPI/python-flask-server/") from openapi_server.models.edge import Edge from openapi_server.models.node import Node -from openapi_server.models.query_graph import QueryGraph from openapi_server.models.attribute import Attribute @@ -360,6 +359,7 @@ def test_cohd_expand(): ] nodes_by_qg_id, edges_by_qg_id = _run_query_and_do_standard_testing(actions_list) + @pytest.mark.skip(reason="retire DTD") def test_dtd_expand_1(): actions_list = [ @@ -393,18 +393,6 @@ def test_dtd_expand_2(): assert all([edges_by_qg_id[qedge_key][edge_key].attributes[0].value_url == "https://doi.org/10.1101/765305" for qedge_key in edges_by_qg_id for edge_key in edges_by_qg_id[qedge_key]]) -@pytest.mark.skip # The NGD Expand module has been deprecated... -def test_ngd_expand(): - actions_list = [ - "add_qnode(name=MONDO:0007156, key=n00)", - "add_qnode(categories=biolink:ChemicalEntity, key=n01)", - "add_qedge(subject=n00, object=n01, key=e00)", - "expand(kp=infores:arax-normalized-google-distance)", - "return(message=true, store=false)" - ] - nodes_by_qg_id, edges_by_qg_id = _run_query_and_do_standard_testing(actions_list) - - @pytest.mark.external def test_chp_expand_1(): actions_list = [ @@ -714,6 +702,7 @@ def test_kg2_predicate_hierarchy_reasoning(): assert any(edge for edge in edges_by_qg_id["e00"].values() if edge.predicate == "biolink:affects") assert not any(edge for edge in edges_by_qg_id["e00"].values() if edge.predicate == "biolink:related_to") + @pytest.mark.skip(reason="Dev testing for domain range exclusion") def test_domain_range_exclusion(): actions_list = [ @@ -726,6 +715,7 @@ def test_domain_range_exclusion(): nodes_by_qg_id, edges_by_qg_id = _run_query_and_do_standard_testing(actions_list) assert False + @pytest.mark.slow def test_issue_1373_pinned_curies(): actions_list = [ @@ -1295,7 +1285,7 @@ def test_xdtd_with_other_edges(): } } } - #FIXME: this test is failing since the ability to mix inferred with lookup edges is not yet implemented + # FIXME: this test is failing since the ability to mix inferred with lookup edges is not yet implemented nodes_by_qg_id, edges_by_qg_id = _run_query_and_do_standard_testing(json_query=query, should_throw_error=True) diff --git a/code/UI/OpenAPI/python-flask-server/KG2/openapi_server/__main__.py b/code/UI/OpenAPI/python-flask-server/KG2/openapi_server/__main__.py index 99807759a..20a5cff03 100644 --- a/code/UI/OpenAPI/python-flask-server/KG2/openapi_server/__main__.py +++ b/code/UI/OpenAPI/python-flask-server/KG2/openapi_server/__main__.py @@ -1,121 +1,130 @@ #!/usr/bin/env python3 -import connexion -import flask_cors -import logging -import json -import openapi_server.encoder -import os import sys -import signal -import atexit +import os import traceback -def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) +import json +import setproctitle sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../../../ARAX/ARAXQuery") - -from ARAX_background_tasker import ARAXBackgroundTasker -from ARAX_database_manager import ARAXDatabaseManager - 
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + - "/../../../..") -from RTXConfiguration import RTXConfiguration - -# can change this to logging.DEBUG for debuggging -logging.basicConfig(level=logging.INFO) - -child_pid = None - + "/../../../../..") -def receive_sigterm(signal_number, frame): - if signal_number == signal.SIGTERM: - if child_pid is not None: - try: - os.kill(child_pid, signal.SIGKILL) - except ProcessLookupError: - logging.debug(f"child process {child_pid} is already gone; " - "exiting now") - sys.exit(0) - else: - assert False, "should not ever have child_pid be None here" - - -@atexit.register -def ignore_sigchld(): - logging.debug("Setting SIGCHLD to SIG_IGN before exiting") - signal.signal(signal.SIGCHLD, signal.SIG_IGN) +from RTXConfiguration import RTXConfiguration +from ARAX_database_manager import ARAXDatabaseManager -def receive_sigchld(signal_number, frame): - if signal_number == signal.SIGCHLD: - while True: - try: - pid, _ = os.waitpid(-1, os.WNOHANG) - logging.debug(f"PID returned from call to os.waitpid: {pid}") - if pid == 0: - break - except ChildProcessError as e: - logging.debug(repr(e) + - "; this is expected if there are " - "no more child processes to reap") - break +def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) -def receive_sigpipe(signal_number, frame): - if signal_number == signal.SIGPIPE: - logging.error("pipe error") +FLASK_DEFAULT_TCP_PORT = 5008 +child_pid = None +parent_pid = None def main(): - app = connexion.App(__name__, specification_dir='./openapi/') - app.app.json_encoder = openapi_server.encoder.JSONEncoder - app.add_api('openapi.yaml', - arguments={'title': 'RTX KG2 Translator KP'}, - pythonic_params=True) - flask_cors.CORS(app.app) - signal.signal(signal.SIGCHLD, receive_sigchld) - signal.signal(signal.SIGPIPE, receive_sigpipe) - - # Read any load configuration details for this instance - try: - with open('openapi_server/flask_config.json') as infile: - local_config = json.load(infile) - except Exception: - local_config = {"port": 5008} RTXConfiguration() - dbmanager = ARAXDatabaseManager() + dbmanager = ARAXDatabaseManager(allow_downloads=True) try: - logging.info("Checking for complete databases") + eprint("Checking for complete databases") if dbmanager.check_versions(): - logging.warning("Databases incomplete; running update_databases") + eprint("Databases incomplete; running update_databases") dbmanager.update_databases() else: - logging.info("Databases seem to be complete") + eprint("Databases seem to be complete") except Exception as e: - logging.error(traceback.format_exc()) + eprint(traceback.format_exc()) raise e del dbmanager pid = os.fork() if pid == 0: # I am the child process + from ARAX_background_tasker import ARAXBackgroundTasker sys.stdout = open('/dev/null', 'w') sys.stdin = open('/dev/null', 'r') - - logging.info("Starting background tasker in a child process") - ARAXBackgroundTasker().run_tasks(local_config) + setproctitle.setproctitle("python3 ARAX_background_tasker::run_tasks") + eprint("Starting background tasker in a child process") + try: + ARAXBackgroundTasker(run_kp_info_cacher=False).run_tasks() + except Exception as e: + eprint("Error in ARAXBackgroundTasker.run_tasks()") + eprint(traceback.format_exc()) + raise e + eprint("Background tasker child process ended unexpectedly") elif pid > 0: # I am the parent process + import signal + import atexit + + def receive_sigterm(signal_number, frame): + if signal_number == signal.SIGTERM: + if parent_pid == os.getpid(): + try: + 
os.kill(child_pid, signal.SIGKILL) + except ProcessLookupError: + eprint(f"child process {child_pid} is already gone; " + "exiting now") + os.exit(0) + else: + # handle exit gracefully in the child process + os._exit(0) + + @atexit.register + def ignore_sigchld(): + signal.signal(signal.SIGCHLD, signal.SIG_IGN) + + def receive_sigchld(signal_number, frame): + if signal_number == signal.SIGCHLD: + while True: + try: + pid, _ = os.waitpid(-1, os.WNOHANG) + eprint(f"PID returned from call to os.waitpid: {pid}") + if pid == 0: + break + except ChildProcessError as e: + eprint(repr(e) + + "; this is expected if there are " + "no more child processes to reap") + break + + def receive_sigpipe(signal_number, frame): + if signal_number == signal.SIGPIPE: + eprint("pipe error") + import connexion + import flask_cors + import openapi_server.encoder + app = connexion.App(__name__, specification_dir='./openapi/') + app.app.json_encoder = openapi_server.encoder.JSONEncoder + app.add_api('openapi.yaml', + arguments={'title': 'ARAX KG2 Translator KP'}, + pythonic_params=True) + flask_cors.CORS(app.app) + # Start the service - logging.info(f"Background tasker is running in child process {pid}") + eprint(f"Background tasker is running in child process {pid}") global child_pid child_pid = pid + global parent_pid + parent_pid = os.getpid() + signal.signal(signal.SIGCHLD, receive_sigchld) + signal.signal(signal.SIGPIPE, receive_sigpipe) signal.signal(signal.SIGTERM, receive_sigterm) - logging.info("Starting flask application in the parent process") + + # Read any load configuration details for this instance + try: + with open('openapi_server/flask_config.json') as infile: + local_config = json.load(infile) + except Exception: + local_config = {"port": FLASK_DEFAULT_TCP_PORT} + + eprint("Starting flask application in the parent process") + setproctitle.setproctitle(setproctitle.getproctitle() + + f" [port={local_config['port']}]") app.run(port=local_config['port'], threaded=True) else: - logging.error("[__main__]: fork() unsuccessful") + eprint("[__main__]: fork() unsuccessful") assert False, "****** fork() unsuccessful in __main__" diff --git a/code/UI/OpenAPI/python-flask-server/KG2/openapi_server/controllers/query_controller.py b/code/UI/OpenAPI/python-flask-server/KG2/openapi_server/controllers/query_controller.py index 8ac21f74b..7ebc7c9f1 100644 --- a/code/UI/OpenAPI/python-flask-server/KG2/openapi_server/controllers/query_controller.py +++ b/code/UI/OpenAPI/python-flask-server/KG2/openapi_server/controllers/query_controller.py @@ -1,10 +1,17 @@ -import connexion, flask +import connexion +import flask import json -import os, sys, signal +import os +import sys +import signal import resource -import logging import traceback from typing import Iterable, Callable +import setproctitle + + +def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) + rlimit_child_process_bytes = 34359738368 # 32 GiB @@ -17,15 +24,19 @@ def child_receive_sigpipe(signal_number, frame): if signal_number == signal.SIGPIPE: - logging.info("[query_controller]: child process detected a SIGPIPE; exiting python") + eprint("[query_controller]: child process detected a " + "SIGPIPE; exiting python") os._exit(0) + def run_query_dict_in_child_process(query_dict: dict, query_runner: Callable) -> Iterable[str]: - logging.debug("[query_controller]: Creating pipe and forking a child to handle the query") + eprint("[query_controller]: Creating pipe and " + "forking a child to handle the query") read_fd, write_fd = os.pipe() - # always 
flush stdout and stderr before calling fork(); someone could have turned off auto-flushing and we don't want double-output + # always flush stdout and stderr before calling fork(); someone could have + # turned off auto-flushing and we don't want double-output sys.stderr.flush() sys.stdout.flush() @@ -35,9 +46,11 @@ def run_query_dict_in_child_process(query_dict: dict, sys.stdout = open('/dev/null', 'w') # parent and child process should not share the same stdout stream object sys.stdin = open('/dev/null', 'r') # parent and child process should not share the same stdin stream object os.close(read_fd) # child doesn't read from the pipe, it writes to it + setproctitle.setproctitle("python3 query_controller::run_query_dict_in_child_process") resource.setrlimit(resource.RLIMIT_AS, (rlimit_child_process_bytes, rlimit_child_process_bytes)) # set a virtual memory limit for the child process signal.signal(signal.SIGPIPE, child_receive_sigpipe) # get rid of signal handler so we don't double-print to the log on SIGPIPE error signal.signal(signal.SIGCHLD, signal.SIG_IGN) # disregard any SIGCHLD signal in the child process + signal.signal(signal.SIGTERM, signal.SIG_DFL) try: with os.fdopen(write_fd, "w") as write_fo: # child process needs to get a stream object for the file descriptor `write_fd` json_string_generator = query_runner(query_dict) @@ -50,10 +63,10 @@ def run_query_dict_in_child_process(query_dict: dict, os._exit(0) elif pid > 0: # I am the parent process os.close(write_fd) # the parent does not write to the pipe, it reads from it - logging.debug(f"[query_controller]: child process pid={pid}") + eprint(f"[query_controller]: child process pid={pid}") read_fo = os.fdopen(read_fd, "r") else: - logging.error("[query_controller]: fork() unsuccessful") + eprint("[query_controller]: fork() unsuccessful") assert False, "********** fork() unsuccessful; something went very wrong *********" return read_fo diff --git a/code/UI/OpenAPI/python-flask-server/openapi_server/__main__.py b/code/UI/OpenAPI/python-flask-server/openapi_server/__main__.py index 102d0ebc9..10d164255 100644 --- a/code/UI/OpenAPI/python-flask-server/openapi_server/__main__.py +++ b/code/UI/OpenAPI/python-flask-server/openapi_server/__main__.py @@ -1,121 +1,130 @@ #!/usr/bin/env python3 -import connexion -import flask_cors -import logging -import json -import openapi_server.encoder -import os import sys -import signal -import atexit +import os import traceback -def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) +import json +import setproctitle sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../../ARAX/ARAXQuery") - -from ARAX_background_tasker import ARAXBackgroundTasker -from ARAX_database_manager import ARAXDatabaseManager - sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../..") -from RTXConfiguration import RTXConfiguration - -# can change this to logging.DEBUG for debuggging -logging.basicConfig(level=logging.INFO) - -child_pid = None - -def receive_sigterm(signal_number, frame): - if signal_number == signal.SIGTERM: - if child_pid is not None: - try: - os.kill(child_pid, signal.SIGKILL) - except ProcessLookupError: - logging.debug(f"child process {child_pid} is already gone; " - "exiting now") - sys.exit(0) - else: - assert False, "should not ever have child_pid be None here" - - -@atexit.register -def ignore_sigchld(): - logging.debug("Setting SIGCHLD to SIG_IGN before exiting") - signal.signal(signal.SIGCHLD, signal.SIG_IGN) +from RTXConfiguration import 
RTXConfiguration +from ARAX_database_manager import ARAXDatabaseManager -def receive_sigchld(signal_number, frame): - if signal_number == signal.SIGCHLD: - while True: - try: - pid, _ = os.waitpid(-1, os.WNOHANG) - logging.debug(f"PID returned from call to os.waitpid: {pid}") - if pid == 0: - break - except ChildProcessError as e: - logging.debug(repr(e) + - "; this is expected if there are " - "no more child processes to reap") - break +def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) -def receive_sigpipe(signal_number, frame): - if signal_number == signal.SIGPIPE: - logging.error("pipe error") +FLASK_DEFAULT_TCP_PORT = 5000 +child_pid = None +parent_pid = None def main(): - app = connexion.App(__name__, specification_dir='./openapi/') - app.app.json_encoder = openapi_server.encoder.JSONEncoder - app.add_api('openapi.yaml', - arguments={'title': 'ARAX Translator Reasoner'}, - pythonic_params=True) - flask_cors.CORS(app.app) - signal.signal(signal.SIGCHLD, receive_sigchld) - signal.signal(signal.SIGPIPE, receive_sigpipe) - - # Read any load configuration details for this instance - try: - with open('openapi_server/flask_config.json') as infile: - local_config = json.load(infile) - except Exception: - local_config = {"port": 5000} RTXConfiguration() - dbmanager = ARAXDatabaseManager() + dbmanager = ARAXDatabaseManager(allow_downloads=True) try: - logging.info("Checking for complete databases") + eprint("Checking for complete databases") if dbmanager.check_versions(): - logging.warning("Databases incomplete; running update_databases") + eprint("Databases incomplete; running update_databases") dbmanager.update_databases() else: - logging.info("Databases seem to be complete") + eprint("Databases seem to be complete") except Exception as e: - logging.error(traceback.format_exc()) + eprint(traceback.format_exc()) raise e del dbmanager pid = os.fork() if pid == 0: # I am the child process + from ARAX_background_tasker import ARAXBackgroundTasker sys.stdout = open('/dev/null', 'w') sys.stdin = open('/dev/null', 'r') - - logging.info("Starting background tasker in a child process") - ARAXBackgroundTasker().run_tasks(local_config) + setproctitle.setproctitle("python3 ARAX_background_tasker::run_tasks") + eprint("Starting background tasker in a child process") + try: + ARAXBackgroundTasker().run_tasks() + except Exception as e: + eprint("Error in ARAXBackgroundTasker.run_tasks()") + eprint(traceback.format_exc()) + raise e + eprint("Background tasker child process ended unexpectedly") elif pid > 0: # I am the parent process + import signal + import atexit + + def receive_sigterm(signal_number, frame): + if signal_number == signal.SIGTERM: + if parent_pid == os.getpid(): + try: + os.kill(child_pid, signal.SIGKILL) + except ProcessLookupError: + eprint(f"child process {child_pid} is already gone; " + "exiting now") + os.exit(0) + else: + # handle exit gracefully in the child process + os._exit(0) + + @atexit.register + def ignore_sigchld(): + signal.signal(signal.SIGCHLD, signal.SIG_IGN) + + def receive_sigchld(signal_number, frame): + if signal_number == signal.SIGCHLD: + while True: + try: + pid, _ = os.waitpid(-1, os.WNOHANG) + eprint(f"PID returned from call to os.waitpid: {pid}") + if pid == 0: + break + except ChildProcessError as e: + eprint(repr(e) + + "; this is expected if there are " + "no more child processes to reap") + break + + def receive_sigpipe(signal_number, frame): + if signal_number == signal.SIGPIPE: + eprint("pipe error") + import connexion + import flask_cors 
+ import openapi_server.encoder + app = connexion.App(__name__, specification_dir='./openapi/') + app.app.json_encoder = openapi_server.encoder.JSONEncoder + app.add_api('openapi.yaml', + arguments={'title': 'ARAX Translator Reasoner'}, + pythonic_params=True) + flask_cors.CORS(app.app) + # Start the service - logging.info(f"Background tasker is running in child process {pid}") + eprint(f"Background tasker is running in child process {pid}") global child_pid child_pid = pid + global parent_pid + parent_pid = os.getpid() + signal.signal(signal.SIGCHLD, receive_sigchld) + signal.signal(signal.SIGPIPE, receive_sigpipe) signal.signal(signal.SIGTERM, receive_sigterm) - logging.info("Starting flask application in the parent process") + + # Read any load configuration details for this instance + try: + with open('openapi_server/flask_config.json') as infile: + local_config = json.load(infile) + except Exception: + local_config = {"port": FLASK_DEFAULT_TCP_PORT} + + eprint("Starting flask application in the parent process") + setproctitle.setproctitle(setproctitle.getproctitle() + + f" [port={local_config['port']}]") app.run(port=local_config['port'], threaded=True) else: - logging.error("[__main__]: fork() unsuccessful") + eprint("[__main__]: fork() unsuccessful") assert False, "****** fork() unsuccessful in __main__" diff --git a/code/UI/OpenAPI/python-flask-server/openapi_server/controllers/query_controller.py b/code/UI/OpenAPI/python-flask-server/openapi_server/controllers/query_controller.py index 9999b97ce..1992b2020 100644 --- a/code/UI/OpenAPI/python-flask-server/openapi_server/controllers/query_controller.py +++ b/code/UI/OpenAPI/python-flask-server/openapi_server/controllers/query_controller.py @@ -1,10 +1,17 @@ -import connexion, flask +import connexion +import flask import json -import os, sys, signal +import os +import sys +import signal import resource -import logging import traceback from typing import Iterable, Callable +import setproctitle + + +def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) + rlimit_child_process_bytes = 34359738368 # 32 GiB @@ -17,15 +24,19 @@ def child_receive_sigpipe(signal_number, frame): if signal_number == signal.SIGPIPE: - logging.info("[query_controller]: child process detected a SIGPIPE; exiting python") + eprint("[query_controller]: child process detected a " + "SIGPIPE; exiting python") os._exit(0) + def run_query_dict_in_child_process(query_dict: dict, query_runner: Callable) -> Iterable[str]: - logging.debug("[query_controller]: Creating pipe and forking a child to handle the query") + eprint("[query_controller]: Creating pipe and " + "forking a child to handle the query") read_fd, write_fd = os.pipe() - # always flush stdout and stderr before calling fork(); someone could have turned off auto-flushing and we don't want double-output + # always flush stdout and stderr before calling fork(); someone could have + # turned off auto-flushing and we don't want double-output sys.stderr.flush() sys.stdout.flush() @@ -34,10 +45,12 @@ def run_query_dict_in_child_process(query_dict: dict, if pid == 0: # I am the child process sys.stdout = open('/dev/null', 'w') # parent and child process should not share the same stdout stream object sys.stdin = open('/dev/null', 'r') # parent and child process should not share the same stdin stream object - os.close(read_fd) # child doesn't read from the pipe, it writes to it + os.close(read_fd) # child doesn't read from the pipe, it writes to it + setproctitle.setproctitle("python3 
query_controller::run_query_dict_in_child_process") resource.setrlimit(resource.RLIMIT_AS, (rlimit_child_process_bytes, rlimit_child_process_bytes)) # set a virtual memory limit for the child process signal.signal(signal.SIGPIPE, child_receive_sigpipe) # get rid of signal handler so we don't double-print to the log on SIGPIPE error signal.signal(signal.SIGCHLD, signal.SIG_IGN) # disregard any SIGCHLD signal in the child process + signal.signal(signal.SIGTERM, signal.SIG_DFL) try: with os.fdopen(write_fd, "w") as write_fo: # child process needs to get a stream object for the file descriptor `write_fd` json_string_generator = query_runner(query_dict) @@ -50,10 +63,10 @@ def run_query_dict_in_child_process(query_dict: dict, os._exit(0) elif pid > 0: # I am the parent process os.close(write_fd) # the parent does not write to the pipe, it reads from it - logging.debug(f"[query_controller]: child process pid={pid}") + eprint(f"[query_controller]: child process pid={pid}") read_fo = os.fdopen(read_fd, "r") else: - logging.error("[query_controller]: fork() unsuccessful") + eprint("[query_controller]: fork() unsuccessful") assert False, "********** fork() unsuccessful; something went very wrong *********" return read_fo diff --git a/code/UI/interactive/index.html b/code/UI/interactive/index.html index 08eab4352..7c60af3df 100644 --- a/code/UI/interactive/index.html +++ b/code/UI/interactive/index.html @@ -393,13 +393,13 @@
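The query_controller changes above preserve the existing fork-and-pipe pattern: the child process runs the query under a virtual-memory limit and streams JSON through a pipe that the parent reads back. A stripped-down sketch of that pattern follows, assuming a toy query runner in place of the real one; all names and the memory limit are stand-ins, not the module's actual API.

```python
import json
import os
import resource
import sys

RLIMIT_BYTES = 32 * 1024**3  # stand-in for the controller's child memory cap


def toy_query_runner(query_dict):
    # Stands in for the real query runner; yields JSON strings.
    yield json.dumps({"echo": query_dict})


def run_in_child(query_dict):
    read_fd, write_fd = os.pipe()
    sys.stdout.flush()  # avoid duplicated buffered output after fork()
    sys.stderr.flush()
    pid = os.fork()
    if pid == 0:  # child: write results into the pipe, then exit
        os.close(read_fd)
        resource.setrlimit(resource.RLIMIT_AS, (RLIMIT_BYTES, RLIMIT_BYTES))
        with os.fdopen(write_fd, "w") as write_fo:
            for chunk in toy_query_runner(query_dict):
                write_fo.write(chunk + "\n")
        os._exit(0)
    os.close(write_fd)  # parent: read whatever the child streams back
    return os.fdopen(read_fd, "r")


if __name__ == "__main__":
    with run_in_child({"message": {}}) as read_fo:
        for line in read_fo:
            print("received:", line.strip())
```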