implement materialised queries in solr (#11)
jamesamcl authored Jan 8, 2025
1 parent 1dfd074 commit 4d087db
Showing 219 changed files with 10,833 additions and 501 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/docker.yml
@@ -39,12 +39,12 @@ jobs:
           docker tag ghcr.io/ebispot/grebi_resolver_service:${{github.sha}} ghcr.io/ebispot/grebi_resolver_service:${{ github.ref_name }}
           docker push --all-tags ghcr.io/ebispot/grebi_resolver_service
-      - name: Build and push GrEBI summary service Docker image
+      - name: Build and push GrEBI metadata service Docker image
         run: |
-          cd webapp/grebi_summary_service
-          docker build -t ghcr.io/ebispot/grebi_summary_service:${{ github.sha }} .
-          docker tag ghcr.io/ebispot/grebi_summary_service:${{github.sha}} ghcr.io/ebispot/grebi_summary_service:${{ github.ref_name }}
-          docker push --all-tags ghcr.io/ebispot/grebi_summary_service
+          cd webapp/grebi_metadata_service
+          docker build -t ghcr.io/ebispot/grebi_metadata_service:${{ github.sha }} .
+          docker tag ghcr.io/ebispot/grebi_metadata_service:${{github.sha}} ghcr.io/ebispot/grebi_metadata_service:${{ github.ref_name }}
+          docker push --all-tags ghcr.io/ebispot/grebi_metadata_service
       - name: Build and push GrEBI UI Docker image
         run: |
10 changes: 5 additions & 5 deletions dataload/04_index/grebi_index/src/main.rs
@@ -30,10 +30,10 @@ struct Args {
     subgraph_name: String,
 
     #[arg(long)]
-    out_summary_json_path: String,
+    out_graph_metadata_json_path: String,
 
     #[arg(long)]
-    out_metadata_jsonl_path: String,
+    out_entity_metadata_jsonl_path: String,
 
     #[arg(long)]
     out_names_txt: String,
@@ -57,8 +57,8 @@ fn main() {
     let mut all_names:BTreeSet<Vec<u8>> = BTreeSet::new();
     let mut all_ids:BTreeSet<Vec<u8>> = BTreeSet::new();
 
-    let mut summary_writer = BufWriter::new(File::create(&args.out_summary_json_path).unwrap());
-    let mut metadata_writer = BufWriter::new(File::create(&args.out_metadata_jsonl_path).unwrap());
+    let mut graph_metadata_writer = BufWriter::new(File::create(&args.out_graph_metadata_json_path).unwrap());
+    let mut metadata_writer = BufWriter::new(File::create(&args.out_entity_metadata_jsonl_path).unwrap());
     let mut names_writer = BufWriter::new(File::create(&args.out_names_txt).unwrap());
     let mut ids_writer = BufWriter::new(File::create(&args.out_ids_txt).unwrap());
@@ -223,7 +223,7 @@ fn main() {
 
     let start_time3 = std::time::Instant::now();
 
-    summary_writer.write_all(
+    graph_metadata_writer.write_all(
         serde_json::to_string_pretty(&json!({
             "subgraph_name": args.subgraph_name,
             "entity_props": entity_props_to_count.iter().map(|(k,v)| {
14 changes: 7 additions & 7 deletions dataload/05_link/grebi_link/src/main.rs
@@ -48,13 +48,13 @@ struct Args {
     in_metadata_jsonl: String,
 
     #[arg(long)]
-    in_summary_json: String,
+    in_graph_metadata_json: String,
 
     #[arg(long)]
     out_edges_jsonl: String,
 
     #[arg(long)]
-    out_summary_json: String,
+    out_graph_metadata_json: String,
 
     #[arg(long)]
     groups_txt: String,
@@ -119,7 +119,7 @@ fn main() -> std::io::Result<()> {
 
     let mut types_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
     {
-        let summary_json:Map<String, Value> = serde_json::from_reader(File::open(&args.in_summary_json).unwrap()).unwrap();
+        let summary_json:Map<String, Value> = serde_json::from_reader(File::open(&args.in_graph_metadata_json).unwrap()).unwrap();
         for (k, v) in summary_json["types"].as_object().unwrap() {
             types_to_count.insert(k.as_bytes().to_vec(), v.as_object().unwrap()["count"].as_i64().unwrap());
         }
@@ -134,8 +134,8 @@ fn main() -> std::io::Result<()> {
     let stdout = io::stdout().lock();
     let mut nodes_writer = BufWriter::new(stdout);
 
-    let summary_file = File::create(args.out_summary_json).unwrap();
-    let mut summary_writer = BufWriter::new(summary_file);
+    let summary_file = File::create(args.out_graph_metadata_json).unwrap();
+    let mut graph_metadata_writer = BufWriter::new(summary_file);
 
     let mut edge_summary:EdgeSummaryTable = HashMap::new();
 
@@ -263,7 +263,7 @@ fn main() -> std::io::Result<()> {
         }
     }
 
-    summary_writer.write_all(serde_json::to_string_pretty(&json!({
+    graph_metadata_writer.write_all(serde_json::to_string_pretty(&json!({
         "entity_prop_defs": entity_prop_defs,
         "edge_prop_defs": edge_prop_defs,
         "types": type_defs,
@@ -275,7 +275,7 @@ fn main() -> std::io::Result<()> {
         "edges": edge_summary
     })).unwrap().as_bytes()).unwrap();
 
-    summary_writer.flush().unwrap();
+    graph_metadata_writer.flush().unwrap();
 
     Ok(())
 }
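Note: the net effect of the renames above is that the artefact formerly called the "summary" JSON is now consistently the graph metadata JSON. A rough sketch of its shape, using only keys visible in this diff (the values are placeholder assumptions, not the real schema):

    {
      "subgraph_name": "...",
      "entity_prop_defs": { "...": {} },
      "edge_prop_defs": { "...": {} },
      "types": { "...": { "count": 123 } },
      "edges": {}
    }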
File renamed without changes.
File renamed without changes.
@@ -31,7 +31,7 @@ struct Args {
     in_edges_jsonl: String,
 
     #[arg(long)]
-    in_summary_jsons: String,
+    in_graph_metadata_jsons: String,
 
     #[arg(long)]
     out_nodes_csv_path: String,
@@ -59,7 +59,7 @@ fn main() -> std::io::Result<()> {
     let mut all_edge_props: HashSet<String> = HashSet::new();
 
 
-    for f in args.in_summary_jsons.split(",") {
+    for f in args.in_graph_metadata_jsons.split(",") {
         let summary:Value = serde_json::from_reader(File::open(f).unwrap()).unwrap();
         for prop in summary["edge_props"].as_object().unwrap().keys() {
             all_edge_props.insert(prop.to_string());
@@ -34,8 +34,8 @@ def main():
         '--bind ' + os.path.abspath(".") + ':/mnt',
         '--bind ' + shlex.quote(neo_data_path) + ':/data',
         '--bind ' + shlex.quote(neo_logs_path) + ':/logs',
-        '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '07_create_db/neo4j/neo4j_import.dockersh')) + ':/import.sh',
-        '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '07_create_db/neo4j/cypher')) + ':/cypher',
+        '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '06_create_neo_db/neo4j_import.dockersh')) + ':/import.sh',
+        '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '06_create_neo_db/cypher')) + ':/cypher',
         '--writable-tmpfs',
         '--network=none',
         '--env NEO4J_AUTH=none',
@@ -49,8 +49,8 @@ def main():
     ] + list(map(lambda f: "-v " + os.path.abspath(f) + ":/mnt/" + os.path.basename(f), glob.glob(args.in_csv_path + "/neo_*"))) + [
         '-v ' + shlex.quote(neo_data_path) + ':/data',
         '-v ' + shlex.quote(neo_logs_path) + ':/logs',
-        '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '07_create_db/neo4j/neo4j_import.dockersh')) + ':/import.sh',
-        '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '07_create_db/neo4j/cypher')) + ':/cypher',
+        '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '06_create_neo_db/neo4j_import.dockersh')) + ':/import.sh',
+        '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '06_create_neo_db/cypher')) + ':/cypher',
         '-e NEO4J_AUTH=none',
         'neo4j:5.18.0',
         'bash /import.sh'
21 changes: 21 additions & 0 deletions dataload/07_run_queries/add_query_metadatas_to_graph_metadata.py
@@ -0,0 +1,21 @@
+
+import json
+import sys
+
+def main():
+    graph_metadata_filename = sys.argv[1]
+    query_metadata_filenames = sys.argv[2:]
+
+    with open(graph_metadata_filename, 'r') as file:
+        graph_metadata = json.load(file)
+    graph_metadata['materialised_queries'] = []
+    for query_metadata_filename in query_metadata_filenames:
+        with open(query_metadata_filename, 'r') as file:
+            query_metadata = json.load(file)
+        graph_metadata['materialised_queries'].append(query_metadata)
+
+    print(json.dumps(graph_metadata, indent=2))
+
+if __name__=="__main__":
+    main()
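A minimal usage sketch for this new merge script (file names hypothetical): it takes the graph metadata JSON first, then any number of per-query metadata JSONs, and prints the merged document with a materialised_queries list to stdout:

    python3 add_query_metadatas_to_graph_metadata.py graph_metadata.json query_1.json query_2.json > graph_metadata.merged.json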

File renamed without changes.
26 changes: 26 additions & 0 deletions dataload/07_run_queries/jsonl_to_csv.py
@@ -0,0 +1,26 @@
+
+import sys
+import pandas as pd
+import json
+
+def main():
+    data = []
+    for line in sys.stdin:
+        line = line.strip()
+        if line:
+            obj = json.loads(line)
+            for key, value in obj.items():
+                if isinstance(value, list):
+                    obj[key] = ';'.join(map(str, value))
+                elif isinstance(value, dict):
+                    obj[key] = json.dumps(value)
+                else:
+                    obj[key] = str(value)
+            data.append(obj)
+
+    df = pd.DataFrame(data)
+    df.to_csv(sys.stdout, index=False)
+
+if __name__ == "__main__":
+    main()
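This new filter reads JSONL on stdin and writes CSV on stdout, joining list values with ';' and re-serialising nested objects as JSON strings. A hypothetical invocation over one of the .results.jsonl files produced by run_queries.dockerpy (below):

    python3 jsonl_to_csv.py < example_query.results.jsonl > example_query.results.csv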

@@ -4,7 +4,7 @@ import os
 from pathlib import Path
 from pandas import DataFrame
 import json
-from timeit import default_timer as timer
+from datetime import datetime
 
 os.system('echo "dbms.security.auth_enabled=false" >> /var/lib/neo4j/conf/neo4j.conf')
 
@@ -15,6 +15,8 @@ from py2neo import Graph
 import yaml
 graph = Graph("bolt://localhost:7687")
 
+metadatas = []
+
 for file in os.listdir("/materialised_queries"):
     if not file.endswith(".yaml"):
         continue
@@ -23,22 +25,29 @@ for file in os.listdir("/materialised_queries"):
 
     query = yaml.safe_load(open(f"/materialised_queries/{file}"))
 
-    start_time = timer()
+    start_time = datetime.now()
 
     print(f"Running query {query_id}")
-    df = DataFrame(graph.run(query['cypher_query']).data())
 
-    end_time = timer()
+    with open(f"/out/{query_id}.results.jsonl", "w") as f:
+        for row in graph.run(query['cypher_query']).data():
+            json.dump(row, f, skipkeys=True)
+            f.write("\n")
+
+    end_time = datetime.now()
 
-    query['start_time'] = start_time
-    query['end_time'] = end_time
-    query['time'] = end_time - start_time
     query['id'] = query_id
+    query['start_time'] = start_time.strftime("%Y-%m-%d %H:%M:%S")
+    query['end_time'] = end_time.strftime("%Y-%m-%d %H:%M:%S")
+    query['time'] = (end_time - start_time).total_seconds()
 
-    print(f"Saving {len(df)} rows to {Path(f'/out/{query_id}.csv.gz')}")
-    df.to_csv(Path(f"/out/{query_id}.csv.gz"), index=False, compression="gzip")
+    metadatas.append(query)
 
     with open(f"/out/{query_id}.json", "w") as f:
-        json.dump(query, f)
+        json.dump(query, f, skipkeys=True)
+
+with open(f"/out/queries.json", "w") as f:
+    json.dump(metadatas, f, skipkeys=True)
 
 os.system("sleep 20")
 os.system("neo4j stop")
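For context, a sketch of a query definition that run_queries.dockerpy would pick up from /materialised_queries (the file name and Cypher are hypothetical; the script only requires a cypher_query key, derives query_id elsewhere in the loop — presumably from the file name — and adds id, start_time, end_time, and time to this document before writing {query_id}.json):

    # example_query.yaml (hypothetical; n.id is a placeholder property)
    cypher_query: |
      MATCH (n) RETURN n.id AS id LIMIT 10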
@@ -12,7 +12,7 @@ def main():
 
     parser = argparse.ArgumentParser(description='Materialise Cypher queries as CSV')
     parser.add_argument('--in-db-path', type=str, help='Path with the neo4j database to query', required=True)
-    parser.add_argument('--out-csvs-path', type=str, help='Path for the output csv files of materialised results', required=True)
+    parser.add_argument('--out-jsons-path', type=str, help='Path for the output json files of materialised results', required=True)
     args = parser.parse_args()
 
     has_singularity = os.system('which singularity') == 0
@@ -23,9 +23,9 @@ def main():
     neo_data_path = os.path.abspath(os.path.join(neo_path, "data"))
     neo_logs_path = os.path.abspath(os.path.join(neo_path, "logs"))
 
-    csvs_path = args.out_csvs_path
+    jsons_path = args.out_jsons_path
 
-    os.makedirs(csvs_path)
+    os.makedirs(jsons_path)
 
     if has_singularity:
         cmd = ' '.join([
@@ -34,24 +34,29 @@ def main():
             '--bind ' + os.path.abspath(".") + ':/mnt',
             '--bind ' + shlex.quote(neo_data_path) + ':/data',
             '--bind ' + shlex.quote(neo_logs_path) + ':/logs',
-            '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
+            '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '07_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
             '--bind ' + os.path.abspath(os.environ['GREBI_QUERY_YAMLS_PATH']) + ':/materialised_queries',
-            '--bind ' + os.path.abspath(args.out_csvs_path) + ':/out',
+            '--bind ' + os.path.abspath(args.out_jsons_path) + ':/out',
             '--writable-tmpfs',
             '--network=none',
             '--env NEO4J_AUTH=none',
+            '--env NEO4J_server_memory_heap_initial__size=300G',
+            '--env NEO4J_server_memory_heap_max__size=300G',
+            '--env NEO4J_server_memory_pagecache_size=150G',
+            '--env NEO4J_dbms_memory_transaction_total_max=150G',
+            '--env TINI_SUBREAPER=true',
             'docker://ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0',
             'python3 /run_queries.py'
         ])
     else:
         cmd = ' '.join([
             'docker run',
-            '--user="$(id -u):$(id -g)"'
+            '--user="$(id -u):$(id -g)"',
             '-v ' + shlex.quote(neo_data_path) + ':/data',
             '-v ' + shlex.quote(neo_logs_path) + ':/logs',
-            '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
+            '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '07_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
             '-v ' + os.path.abspath(os.environ['GREBI_QUERY_YAMLS_PATH']) + ':/materialised_queries',
-            '-v ' + os.path.abspath(args.out_csvs_path) + ':/out',
+            '-v ' + os.path.abspath(args.out_jsons_path) + ':/out',
             '-e NEO4J_AUTH=none',
             'ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0',
             'python3 /run_queries.py'
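A hypothetical invocation of this stage after the rename (the two flags are exactly those defined by the argparse block above; the script name and paths are assumptions):

    python3 materialise.py --in-db-path tmp/neo4j_db --out-jsons-path tmp/query_results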
14 changes: 14 additions & 0 deletions dataload/08_create_other_dbs/solr/grebi_link_results/Cargo.toml
@@ -0,0 +1,14 @@
+
+[package]
+name = "grebi_link_results"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+serde_json = { version = "1.0.108", features=["preserve_order"] }
+grebi_shared = { path = "../../../grebi_shared" }
+csv = "1.3.0"
+lmdb-zero = "0.4.4"
+bloomfilter = "1.0.13"
+jemallocator = "0.5.4"
+clap = { version = "4.4.11", features = ["derive"] }