Release/0.8.rc0 (#54)
* Bumping version

* Feature/cde crawls (#51)

* adding cde kgx files

* extracted elements path

* adding methods for extracting dug elements from graph

* adding indexing step to dag

* adding dug config for expansion queries

* adapting to latest dug code

* adapting to latest dug code

* moving cde kgx files to a new entry in metadata, adding dug release, fixing long-running python dep resolution

* adding enabled flag for concept -> dug element crawls

* adding legacy resolver for the Makefile as well

* parse bool as string in yaml, since all env vars are going to be strings

* Update config.yaml

* Update requirements.txt

updating dug lib to the current release

* resolving long-running pip installs, with Jeff's help

Co-authored-by: Yaphetkg <[email protected]>

* Remove logging of config; contains redis and S3 keys (#52)

* fix bug that deleted the first downloaded graph set when two sets are downloaded (#53)

Co-authored-by: Yaphetkg <[email protected]>

* Update _version.py

* Update _version.py

Co-authored-by: Carl Schreep <[email protected]>
Co-authored-by: Yaphetkg <[email protected]>
Co-authored-by: Mac Chaffee <[email protected]>
4 people authored Feb 14, 2022
1 parent c5f34bb commit a425bf3
Showing 10 changed files with 70 additions and 10 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
@@ -4,6 +4,8 @@ RUN apt-get update && \
apt-get install -y git gcc python3-dev nano vim
USER airflow
COPY requirements.txt requirements.txt
# dependency resolution was taking hours and eventually failing;
# @TODO fix click lib dependency
RUN pip install -r requirements.txt
RUN pip uninstall -y elasticsearch-dsl
RUN rm -f requirements.txt
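
The commit message mentions adding pip's legacy resolver to the Makefile as well; switching resolvers is one way to sidestep the hours-long backtracking noted in the comment above. A minimal sketch of that switch, assuming pip >= 20.3 (the version where the backtracking resolver became the default and this flag was introduced):

    # Assumed workaround, not part of this commit's Dockerfile: fall back to
    # pip's legacy resolver until the click dependency pin is fixed.
    RUN pip install --use-deprecated=legacy-resolver -r requirements.txt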
2 changes: 1 addition & 1 deletion dags/_version.py
@@ -1 +1 @@
version = "0.7.0"
version = "0.8.0"
28 changes: 26 additions & 2 deletions dags/dug_helpers/dug_utils.py
@@ -46,6 +46,7 @@ def __init__(self, config: RogerConfig, to_string=True):
graph_name = self.config["redisgraph"]["graph"]
source = f"redis:{graph_name}"
self.tranql_queries: dict = self.factory.build_tranql_queries(source)
self.node_to_element_queries: list = self.factory.build_element_extraction_parameters(source)

indexing_config = config.indexing
self.variables_index = indexing_config.get('variables_index')
@@ -430,17 +431,19 @@ def crawl_concepts(self, concepts, data_set_name):
"""
crawl_dir = Util.dug_crawl_path('crawl_output')
output_file_name = os.path.join(data_set_name, 'expanded_concepts.pickle')
extracted_dug_elements_file_name = os.path.join(data_set_name, 'extracted_graph_elements.pickle')
output_file = Util.dug_expanded_concepts_path(output_file_name)
extracted_output_file = Util.dug_expanded_concepts_path(extracted_dug_elements_file_name)
Path(crawl_dir).mkdir(parents=True, exist_ok=True)

extracted_dug_elements = []
log.debug("Creating Dug Crawler object")
crawler = Crawler(
crawl_file="",
parser=None,
annotator=None,
tranqlizer=self.tranqlizer,
tranql_queries=self.tranql_queries,
http_session=self.cached_session
http_session=self.cached_session,
)
crawler.crawlspace = crawl_dir
counter = 0
@@ -450,11 +453,22 @@ def crawl_concepts(self, concepts, data_set_name):
crawler.expand_concept(concept)
concept.set_search_terms()
concept.set_optional_terms()
for query in self.node_to_element_queries:
casting_config = query['casting_config']
tranql_source = query['tranql_source']
dug_element_type = query['output_dug_type']
extracted_dug_elements += crawler.expand_to_dug_element(
concept=concept,
casting_config=casting_config,
dug_element_type=dug_element_type,
tranql_source=tranql_source
)
concept.clean()
percent_complete = int((counter / total) * 100)
if percent_complete % 10 == 0:
log.info(f"{percent_complete}%")
Util.write_object(obj=concepts, path=output_file)
Util.write_object(obj=extracted_dug_elements, path=extracted_output_file)

def index_concepts(self, concepts):
log.info("Indexing Concepts")
@@ -552,6 +566,7 @@ def clear_kg_index(self):
def clear_concepts_index(self):
self.clear_index(self.concepts_index)


class DugUtil():

@staticmethod
@@ -637,6 +652,15 @@ def index_variables(config=None, to_string=False):
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def index_extracted_elements(config=None, to_string=False):
with Dug(config, to_string=to_string) as dug:
elements_object_files = Util.dug_extracted_elements_objects()
for file in elements_object_files:
dug.index_elements(file)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def index_concepts(config=None, to_string=False):
with Dug(config=config, to_string=to_string) as dug:
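The crawl loop in crawl_concepts reads three keys from every node-to-element query. A minimal sketch of one such entry, with values assumed from the cde block shown in config.yaml below:

    # Only the three key names are taken from the loop above; the values
    # here are illustrative assumptions.
    example_query = {
        "casting_config": {"node_type": "biolink:Publication"},  # how graph nodes are cast
        "tranql_source": "redis:test",                           # TranQL source to query
        "output_dug_type": "cde",                                # Dug element type to emit
    }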
9 changes: 8 additions & 1 deletion dags/index_dag.py
@@ -1,5 +1,6 @@
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy import DummyOperator

from roger.dag_util import get_executor_config, default_args, create_python_task
from dug_helpers.dug_utils import DugUtil
@@ -19,8 +20,14 @@
validate_index_variables = create_python_task(dag,"ValidateIndexVariables", DugUtil.validate_indexed_variables)
crawl_tags = create_python_task(dag, "CrawlConcepts", DugUtil.crawl_tranql)
index_concepts = create_python_task(dag, "IndexConcepts", DugUtil.index_concepts)
dummy_stepover = DummyOperator(
task_id="continue",
)
index_extracted_dug_elements = create_python_task(dag, "IndexExtractedElements", DugUtil.index_extracted_elements)
validate_index_concepts = create_python_task(dag, "ValidateIndexConcepts", DugUtil.validate_indexed_concepts)
finish = BashOperator (task_id='Finish', bash_command='echo finish')
""" Build the DAG. """
intro >> index_variables >> validate_index_variables >> finish
intro >> crawl_tags >> index_concepts >> validate_index_concepts >> finish
intro >> crawl_tags >> index_concepts >> dummy_stepover
intro >> crawl_tags >> index_extracted_dug_elements >> dummy_stepover
dummy_stepover >> validate_index_concepts >> finish
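
The DummyOperator acts as a fan-in point: IndexConcepts and IndexExtractedElements run in parallel, and ValidateIndexConcepts starts only after both succeed. A self-contained sketch of the same pattern, with illustrative task names rather than the real DAG:

    from airflow.models import DAG
    from airflow.operators.dummy import DummyOperator
    from airflow.utils.dates import days_ago

    with DAG(dag_id="fan_in_example", start_date=days_ago(1), schedule_interval=None) as dag:
        index_a = DummyOperator(task_id="index_a")
        index_b = DummyOperator(task_id="index_b")
        stepover = DummyOperator(task_id="continue")   # fan-in; both branches must finish
        validate = DummyOperator(task_id="validate")
        [index_a, index_b] >> stepover >> validate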
6 changes: 6 additions & 0 deletions dags/metadata.yaml
@@ -79,6 +79,12 @@ kgx:
files:
- panther.json
name: test
- version: v3.0
name: cde-graph
format: jsonl
files:
- cde/annotated_edges_v3.0.jsonl
- cde/annotated_nodes_v3.0.jsonl
dug_inputs:
versions:
- name: bdc
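A dataset in metadata.yaml is only downloaded when its name is listed in the configured KGX data sets (see data_set_list in core.py below). A hedged sketch of enabling the new graph set, assuming the YAML key mirrors the self.config.kgx.data_sets attribute access:

    # Assumed YAML shape, inferred from the attribute access in core.py:
    kgx:
      data_sets:
        - cde-graph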
11 changes: 11 additions & 0 deletions dags/roger/config/__init__.py
@@ -117,7 +117,16 @@ class IndexingConfig(DictLike):
"anat_to_pheno": ["anatomical_entity", "phenotypic_feature"],
})
tranql_endpoint: str = "http://tranql:8081/tranql/query?dynamic_id_resolution=true&asynchronous=false"
# node-to-element queries are skipped by default
node_to_element_queries: dict = field(default_factory=lambda: {})

def __post_init__(self):
node_to_el_enabled = str(self.node_to_element_queries.get("enabled")).lower() == "true"
final_node_to_element_queries = {}
if node_to_el_enabled:
for key in filter(lambda k: k != "enabled", self.node_to_element_queries.keys()):
final_node_to_element_queries[key] = self.node_to_element_queries[key]
self.node_to_element_queries = final_node_to_element_queries

@dataclass
class ElasticsearchConfig(DictLike):
@@ -127,6 +136,7 @@ class ElasticsearchConfig(DictLike):
nboost_host: str = ""



class RogerConfig(DictLike):

OS_VAR_PREFIX = "ROGER_"
@@ -178,6 +188,7 @@ def to_dug_conf(self) -> DugConfig:
'min_tranql_score': self.indexing.tranql_min_score,
},
ontology_greenlist=self.annotation.ontology_greenlist,
node_to_element_queries=self.indexing.node_to_element_queries,
)

@property
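Because environment-variable overrides always arrive as strings, __post_init__ compares the text "true" instead of trusting YAML's native booleans (see the commit message above). The same check in isolation, as a small sketch:

    # Sketch of the string-to-bool parsing used in __post_init__ above.
    def parse_bool(value) -> bool:
        return str(value).strip().lower() == "true"

    assert parse_bool("True") is True
    assert parse_bool("false") is False
    assert parse_bool(False) is False   # a YAML-native bool still parses correctly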
6 changes: 5 additions & 1 deletion dags/roger/config/config.yaml
@@ -69,6 +69,10 @@ indexing:
"chemical_mixture_to_disease": ["chemical_mixture", "disease"]
"phen_to_anat": ["phenotypic_feature", "anatomical_entity"]
tranql_endpoint: "http://tranql:8081/tranql/query?dynamic_id_resolution=true&asynchronous=false"
node_to_element_queries:
enabled: false
cde:
node_type: biolink:Publication

elasticsearch:
host: elasticsearch
@@ -131,4 +135,4 @@ s3:
host: ""
bucket: ""
access_key: ""
secret_key: ""
secret_key: ""
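
With enabled: false, __post_init__ empties node_to_element_queries, so the cde block above stays inert. Turning the crawl on would look like this (same keys as above, only the flag flipped):

    node_to_element_queries:
      enabled: true
      cde:
        node_type: biolink:Publication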
11 changes: 9 additions & 2 deletions dags/roger/core.py
@@ -211,6 +211,11 @@ def dug_expanded_concept_objects():
file_pattern = Util.dug_expanded_concepts_path(os.path.join('*','expanded_concepts.pickle'))
return sorted(glob.glob(file_pattern))

@staticmethod
def dug_extracted_elements_objects():
file_pattern = Util.dug_expanded_concepts_path(os.path.join('*', 'extracted_graph_elements.pickle'))
return sorted(glob.glob(file_pattern))

@staticmethod
def dug_crawl_path(name):
return str(ROGER_DATA_DIR / 'dug' / 'crawl' / name)
@@ -593,13 +598,14 @@ def get (self, dataset_version = "v1.0"):
"""
metadata = Util.read_relative_object ("../metadata.yaml")
data_set_list = self.config.kgx.data_sets
kgx_files_remote = []
for item in metadata['kgx']['versions']:
if item['version'] == dataset_version and item['name'] in data_set_list:
log.info(f"Getting KGX dataset {item['name']} , version {item['version']}")
if item['format'] == 'json':
kgx_files_remote = self.get_kgx_json_format(item['files'], item['version'])
kgx_files_remote += self.get_kgx_json_format(item['files'], item['version'])
elif item['format'] == 'jsonl':
kgx_files_remote = self.get_kgx_jsonl_format(item['files'], item['version'])
kgx_files_remote += self.get_kgx_jsonl_format(item['files'], item['version'])
else:
raise ValueError(f"Unrecognized format in metadata.yaml: {item['format']}, valid formats are `json` "
f"and `jsonl`.")
@@ -851,6 +857,7 @@ def merge_node_and_edges (self, nodes, edges, current_metric , data_set_name ):
# add predicate labels to edges;
for edge_id in edges:
edges[edge_id]['predicate_label'] = self.biolink.get_label(edges[edge_id]['predicate'])
edges[edge_id]['id'] = edges[edge_id].get('id', edge_id.replace('edge-', ''))
merge_time = time.time() - merge_time
current_metric['merge_time'] = merge_time
write_to_redis_time = time.time()
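The switch from = to += in get() is the fix for #53: with plain assignment, downloading a second graph set overwrote the remote-file list of the first. The difference in isolation:

    # Illustrative file names; only the assignment-vs-accumulation point is real.
    kgx_files_remote = []
    for files in (["a_nodes.jsonl"], ["b_nodes.jsonl"]):
        kgx_files_remote = files        # bug: only the last set survives
    assert kgx_files_remote == ["b_nodes.jsonl"]

    kgx_files_remote = []
    for files in (["a_nodes.jsonl"], ["b_nodes.jsonl"]):
        kgx_files_remote += files       # fix: both sets are kept
    assert kgx_files_remote == ["a_nodes.jsonl", "b_nodes.jsonl"]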
2 changes: 0 additions & 2 deletions dags/roger/dag_util.py
@@ -29,8 +29,6 @@ def task_wrapper(python_callable, **kwargs):
del kwargs['dag_run']
# overrides values
config.dag_run = dag_run
logger.info("Config")
logger.info(config.dict)
return python_callable(to_string=False, config=config)


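The deleted lines close #52: logging the full config leaked redis and S3 credentials. If config logging is ever needed again, a hypothetical redaction helper (field names are assumptions, not repo code) could mask secrets instead of dropping the log line entirely:

    SENSITIVE = {"secret_key", "access_key", "password"}

    def redacted(config: dict) -> dict:
        # Mask secret-looking fields so the rest of the config stays loggable.
        return {key: "***" if key in SENSITIVE else value
                for key, value in config.items()}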
3 changes: 2 additions & 1 deletion requirements.txt
@@ -2,6 +2,7 @@ argcomplete==1.12.0
apache-airflow==2.1.2
boto3==1.18.23
botocore==1.21.23
black==21.10b0
flatten-dict
psycopg2-binary==2.8.6
redis==3.5.3
@@ -10,7 +11,7 @@ redisgraph-bulk-loader==0.9.5
requests<2.24.0
pytest==6.2.2
PyYAML==5.3.1
git+git://github.com/helxplatform/dug@2.7.0#egg=dug
git+git://github.com/helxplatform/dug@v2.8.0
elasticsearch==7.11.0
biolinkml>=1.5.10
orjson