Commit ce1f67b

Merge pull request #10 from clingen-data-model/demo-notebooks

Demo notebooks

theferrit32 authored Aug 28, 2024
2 parents 2b1fca7 + 858b8dd
Showing 5 changed files with 1,369 additions and 3 deletions.
19 changes: 17 additions & 2 deletions clinvar_gk_pilot/gcs.py
@@ -98,7 +98,7 @@ def blob_writer(


def blob_reader(
blob_uri: str, client: storage.Client = None, binary=True
blob_uri: str, client: storage.Client | None = None, binary=True
) -> storage.Blob:
"""
Returns a file-like object that can be used to read from the blob at `blob_uri`
@@ -109,17 +109,32 @@ def blob_reader(
return blob.open("rb" if binary else "r")


def blob_size(blob_uri: str, client: storage.Client = None) -> int:
def blob_size(blob_uri: str, client: storage.Client | None = None) -> int:
"""
Returns the size of the blob in bytes if it exists. Raises an error if it does not.
"""
if client is None:
client = _get_gcs_client()
blob = parse_blob_uri(blob_uri, client=client)
blob.reload() # Refreshes local Blob object properties from the remote object
assert blob.exists()
assert blob.size is not None
return blob.size


def list_blobs(bucket_name: str, prefix: str, client: storage.Client | None = None):
if client is None:
client = _get_gcs_client()

bucket = client.get_bucket(bucket_name)
blobs = bucket.list_blobs(prefix=prefix)

# Generate the list of blob URIs
blob_uris = [f"gs://{bucket_name}/{blob.name}" for blob in blobs]

return blob_uris


def http_download_requests(
http_uri: str,
local_path: PurePath,
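For reference, a minimal usage sketch of the new list_blobs helper (the bucket and prefix are illustrative values taken from the notebook later in this diff; any readable bucket works):

from clinvar_gk_pilot.gcs import list_blobs

# List the gs:// URIs of all objects under a prefix.
uris = list_blobs("clinvar-gk-pilot", prefix="2024-04-07/dev/")
for uri in uris:
    print(uri)  # e.g. gs://clinvar-gk-pilot/2024-04-07/dev/<object name>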
5 changes: 4 additions & 1 deletion clinvar_gk_pilot/logger.py
@@ -1,7 +1,10 @@
import json
import logging.config
from pathlib import Path

with open("log_conf.json", "r") as f:
PROJECT_ROOT = Path(__file__).resolve().parents[1]

with open(PROJECT_ROOT / "log_conf.json", "r", encoding="utf-8") as f:
conf = json.load(f)
logging.config.dictConfig(conf)

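A minimal sketch of what the new path resolution yields, assuming logger.py lives at <repo>/clinvar_gk_pilot/logger.py and log_conf.json sits at the repository root (the absolute path below is illustrative):

from pathlib import Path

module_file = Path("/home/user/clinvar-gk-pilot/clinvar_gk_pilot/logger.py")
project_root = module_file.resolve().parents[1]
print(project_root)                    # /home/user/clinvar-gk-pilot
print(project_root / "log_conf.json")  # /home/user/clinvar-gk-pilot/log_conf.json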
265 changes: 265 additions & 0 deletions notebooks/ASHG-clinvar-GKS-duckdb.ipynb
@@ -0,0 +1,265 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import duckdb\n",
"import gzip\n",
"import json\n",
"import os\n",
"\n",
"from clinvar_gk_pilot.gcs import (\n",
" _local_file_path_for,\n",
" download_to_local_file,\n",
" already_downloaded,\n",
")\n",
"\n",
"# variation_blob_uri = \"gs://clingen-public/clinvar-gk-pilot/2024-04-07/dev/clinvar-variation-20240407.json.gz\"\n",
"# scv_blob_uri = (\n",
"# \"gs://clingen-public/clinvar-gk-pilot/2024-04-07/dev/clinvar-scvs-20240407.json.gz\"\n",
"# )\n",
"\n",
"catvar_blob_uri = (\n",
" \"gs://clinvar-gk-pilot/2024-04-07/dev/combined-catvar_output.ndjson.gz\"\n",
")\n",
"scv_blob_uri = \"gs://clinvar-gk-pilot/2024-04-07/dev/combined-scv_output.ndjson.gz\"\n",
"\n",
"catvar_file = \"combined-catvar_output.ndjson.gz\"\n",
"\n",
"\n",
"variation_local_file_path = _local_file_path_for(catvar_blob_uri)\n",
"if not already_downloaded(catvar_blob_uri):\n",
" print(f\"Downloading {catvar_blob_uri} to {variation_local_file_path}\")\n",
" dl_variation_local_file_path = download_to_local_file(catvar_blob_uri)\n",
" assert dl_variation_local_file_path == variation_local_file_path\n",
"\n",
"scv_local_file_path = _local_file_path_for(scv_blob_uri)\n",
"if not already_downloaded(scv_blob_uri):\n",
" print(f\"Downloading {scv_blob_uri} to {scv_local_file_path}\")\n",
" dl_scv_local_file_path = download_to_local_file(scv_blob_uri)\n",
" assert dl_scv_local_file_path == scv_local_file_path\n",
"\n",
"catvar_file = variation_local_file_path\n",
"scv_file = scv_local_file_path"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Our ClinVar datasets are available as NDJSON files. There is a variation file and an SCV file. The records of the variation file are CategoricalVariation objects, and the records of the SCV file are `VariationPathogenicity` (sub-class of `Statement`)"
]
},
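{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (a minimal sketch, assuming the downloads above succeeded), the files can be opened with vanilla Python to peek at the first record of each and print its top-level keys:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Peek at the first record of each gzipped NDJSON file.\n",
"# Only top-level keys are printed; no particular schema is assumed.\n",
"for path in (catvar_file, scv_file):\n",
"    with gzip.open(path, \"rt\") as f:\n",
"        first_record = json.loads(next(f))\n",
"    print(path, \"->\", sorted(first_record.keys()))"
]
},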
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"################################\n",
"# Query the SCV file for a VRS ID using vanilla Python\n",
"#\n",
"# - for a given ClinVar Variation ID, find the corresponding GA4GH CatVar record in the CatVar\n",
"# file and find the SCVs which reference that variant in the SCV file\n",
"#\n",
"# (NOTE: the SCV file also contains the full CatVar definition in the \"variation\" field, but\n",
"# this example illustrates how to query across both files, since the SCV file can be\n",
"# relationally normalized to extract that redundant entity and refer to the variant\n",
"# by the CatVar ID as a foreign key)\n",
"#\n",
"# - print the SCV interpretations for that variant\n",
"#\n",
"################################\n",
"################################\n",
"# Inputs\n",
"\n",
"################################\n",
"# A CanonicalAllele\n",
"## For searching based on the GKS Categorical Variation (CatVrs) ID\n",
"clinvar_id = \"563765\"\n",
"## For searching based on the GA4GH VRS Variation ID\n",
"vrs_id = \"ga4gh:VA.hf_va4AnlG99NuOjtaXJzh_XvszWWOO9\"\n",
"\n",
"\n",
"################################\n",
"# A CategoricalCnv\n",
"## For searching based on the GKS Categorical Variation (CatVrs) ID\n",
"clinvar_id = \"599353\"\n",
"## For searching based on the GA4GH VRS Variation ID\n",
"vrs_id = \"ga4gh:CX.5iqyOA4L5njh5FpymTPcwQ8oHTilQFmo\" # GRCh38 member\n",
"\n",
"\n",
"catvar_file = \"combined-catvar_output.ndjson.gz\"\n",
"scv_file = \"combined-scv_output.ndjson.gz\"\n",
"################################\n",
"assert os.path.exists(catvar_file)\n",
"assert os.path.exists(scv_file)\n",
"catvar_id = f\"clinvar:{clinvar_id}\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"While these can be read with vanilla python by iterating lines and parsing each as JSON, there are also libraries which can make querying the files simpler and potentially more performant.\n",
"\n",
"One option is DuckDB. DuckDB achieves fast speeds and low memory utilization by memory mapping files and dropping rows from memory that don't match filter criteria. It also has the benefit of being able to query GZIP-compressed NDJSON files directly, so disk storage stays minimal, and presenting a SQL interface to the data, with full support of nested structs so we can access fields from nested JSON objects without manipulating the files. Another benefit is that it gracefully handles heterogeneous record schemas, automatically nulling values that don't exist in particular rows."
]
},
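{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a small illustration (not part of the original workflow), DuckDB can count the records in the compressed SCV file directly, using the same `read_json` options as the query in the next cell:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Count records in the gzipped NDJSON file without decompressing it first.\n",
"count_sql = f\"\"\"\n",
"SELECT count(*) AS record_count\n",
"FROM read_json('{scv_file}', format='newline_delimited', ignore_errors=true)\n",
"\"\"\"\n",
"duckdb.sql(count_sql).show()"
]
},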
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"ename": "ConversionException",
"evalue": "Conversion Error: Malformed JSON at byte 0 of input: unexpected character. Input: ga4gh:CX.5iqyOA4L5njh5FpymTPcwQ8oHTilQFmo",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mConversionException\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[7], line 78\u001b[0m\n\u001b[1;32m 52\u001b[0m \u001b[38;5;66;03m# # Get the actual SCV records\u001b[39;00m\n\u001b[1;32m 53\u001b[0m \u001b[38;5;66;03m# with gzip.open(scv_file, \"rt\") as f:\u001b[39;00m\n\u001b[1;32m 54\u001b[0m \u001b[38;5;66;03m# for line in f:\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 72\u001b[0m \u001b[38;5;66;03m# for row in batch:\u001b[39;00m\n\u001b[1;32m 73\u001b[0m \u001b[38;5;66;03m# scvs.append(row)\u001b[39;00m\n\u001b[1;32m 75\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m scvs\n\u001b[0;32m---> 78\u001b[0m scvs \u001b[38;5;241m=\u001b[39m \u001b[43mquery_scvs_by_vrs_id\u001b[49m\u001b[43m(\u001b[49m\u001b[43mvrs_id\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mscv_file\u001b[49m\u001b[43m)\u001b[49m\n",
"Cell \u001b[0;32mIn[7], line 43\u001b[0m, in \u001b[0;36mquery_scvs_by_vrs_id\u001b[0;34m(vrs_id, scv_file)\u001b[0m\n\u001b[1;32m 41\u001b[0m scv_ids \u001b[38;5;241m=\u001b[39m []\n\u001b[1;32m 42\u001b[0m results \u001b[38;5;241m=\u001b[39m duckdb\u001b[38;5;241m.\u001b[39msql(sql)\n\u001b[0;32m---> 43\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m batch \u001b[38;5;241m:=\u001b[39m \u001b[43mresults\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfetchmany\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m100\u001b[39;49m\u001b[43m)\u001b[49m:\n\u001b[1;32m 44\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m row \u001b[38;5;129;01min\u001b[39;00m batch:\n\u001b[1;32m 45\u001b[0m \u001b[38;5;28mid\u001b[39m, defining_context_id, member, rec \u001b[38;5;241m=\u001b[39m row\n",
"\u001b[0;31mConversionException\u001b[0m: Conversion Error: Malformed JSON at byte 0 of input: unexpected character. Input: ga4gh:CX.5iqyOA4L5njh5FpymTPcwQ8oHTilQFmo"
]
}
],
"source": [
"################################\n",
"# Query the SCV file for the matching VRS ID using DuckDB as a query\n",
"# engine to obtain the list of SCVs we are interested in.\n",
"################################\n",
"\n",
"\n",
"def query_scvs_by_vrs_id(vrs_id: str, scv_file: str):\n",
" scvs = []\n",
"\n",
" sql = f\"\"\"\n",
" SELECT id, definingContext_id, member.id as member_id, a\n",
" FROM\n",
" (\n",
" SELECT\n",
" id,\n",
" variation.definingContext.id as definingContext_id,\n",
" unnest(variation.members) as member,\n",
" a\n",
" FROM read_json('{scv_file}', format='newline_delimited', ignore_errors=true) a\n",
" )\n",
" WHERE definingContext_id = '{vrs_id}'\n",
" OR member_id = '{vrs_id}'\n",
" \"\"\"\n",
"\n",
" sql = f\"\"\"\n",
" SELECT id, definingContext_id, member_id, a\n",
" FROM\n",
" (\n",
" SELECT\n",
" id,\n",
" variation.definingContext.id as definingContext_id,\n",
" unnest(json_extract(variation, '$.members[*].id')) as member_id,\n",
" a\n",
" FROM read_json('{scv_file}', format='newline_delimited', ignore_errors=true) a\n",
" )\n",
" WHERE definingContext_id = '{vrs_id}'\n",
" OR member_id = '{vrs_id}'\n",
" \"\"\"\n",
" # LIMIT 10\n",
"\n",
" scv_ids = []\n",
" results = duckdb.sql(sql)\n",
" while batch := results.fetchmany(100):\n",
" for row in batch:\n",
" id, defining_context_id, member, rec = row\n",
" d = {\"id\": id, \"definingContext_id\": defining_context_id, \"member\": member}\n",
" print(d)\n",
" scv_ids.append(row[0])\n",
" print(f\"Found {len(scv_ids)} SCVs for VRS id {vrs_id}\")\n",
" print(f\"SCV IDs: {scv_ids}\")\n",
"\n",
" # # Get the actual SCV records\n",
" # with gzip.open(scv_file, \"rt\") as f:\n",
" # for line in f:\n",
" # # Do simple string contains check before parsing the line as JSON.\n",
" # # With a small set of scv_ids, this is significantly faster than\n",
" # # parsing every line as JSON first.\n",
" # for scv_id in scv_ids:\n",
" # if scv_id in line:\n",
" # record = json.loads(line)\n",
" # if record[\"id\"] in scv_ids:\n",
" # scvs.append(record)\n",
"\n",
" # # Get the actual SCV records with a second duckdb query\n",
" # sql = f\"\"\"\n",
" # SELECT *\n",
" # FROM read_json('{scv_file}', format='newline_delimited', ignore_errors=true)\n",
" # WHERE id IN ({','.join([f\"'{id}'\" for id in scv_ids])})\n",
" # \"\"\"\n",
" # results = duckdb.sql(sql)\n",
" # while batch := results.fetchmany(100):\n",
" # for row in batch:\n",
" # scvs.append(row)\n",
"\n",
" return scvs\n",
"\n",
"\n",
"scvs = query_scvs_by_vrs_id(vrs_id, scv_file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(scvs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for scv in scvs:\n",
" classification = scv[\"classification\"][\"label\"]\n",
" condition = scv[\"condition\"][\"label\"]\n",
" print(f\"SCV: {scv['id']} \")\n",
" print(f\" Classification: {classification}\")\n",
" print(f\" Condition: {condition}\")\n",
" print()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
(2 more changed files not shown)
