📝 Updating documentation
rcap107 committed Jul 8, 2024
1 parent 2af6001 commit ba7ca23
Showing 6 changed files with 73 additions and 14 deletions.
2 changes: 0 additions & 2 deletions analysis_query_results.py
@@ -3,8 +3,6 @@
to measure the overall statistics of the candidates returned by each.
The results of this script are saved in results/stats/ and are used for some of the plotting scripts.
Author: Riccardo Cappuzzo
"""

import datetime
13 changes: 7 additions & 6 deletions import_from_starmie.py
@@ -1,4 +1,10 @@
# %%
"""
Simple script used to import the results obtained by Starmie.
Given the path to the Starmie results, this script builds the data structures
needed to run the pipeline.
"""

import argparse
from pathlib import Path

@@ -10,20 +16,15 @@ def parse_args():
parser.add_argument(
"base_path", type=Path, help="Path to STARMIE dir containing the query results."
)
# parser.add_argument(
# "data_lake_version", type=str, help="Data lake version to evaluate."
# )

return parser.parse_args()


if __name__ == "__main__":
args = parse_args()

# data_lake_version = args.data_lake_version
root_path = args.base_path
data_lake_version = root_path.stem
# base_path = Path(root_path, data_lake_version)

print(root_path)

13 changes: 13 additions & 0 deletions metadata_creation.py
@@ -1,3 +1,12 @@
"""
One-time script to be run on each data lake to prepare the metadata used by all subsequent operations.
Provide the path to the root data folder that contains the data lake; the script will then recursively explore all folders
and add every file with the ".parquet" extension to the metadata index.
The metadata index is used throughout the pipeline to identify each table in a data lake.
"""

import argparse
import os
from pathlib import Path
@@ -29,9 +38,11 @@ def prepare_metadata_from_case(data_folder, flat=False):
case += "_flat"
if data_folder.exists() and data_folder.is_dir():
print(f"Building metadata for folder {data_folder}")
# Importing here rather than at the top: if the folder is missing, we find out immediately without paying for the imports.
from src.data_structures.metadata import MetadataIndex
from src.utils.indexing import save_single_table

# Creating the dirtree
os.makedirs(f"data/metadata/{case}", exist_ok=True)
os.makedirs("data/metadata/_mdi", exist_ok=True)

@@ -42,6 +53,8 @@
raise RuntimeError("No parquet files found. Is the path correct? ")

metadata_dest = f"data/metadata/{case}"

# Prepare the metadata by running on all available cores
Parallel(n_jobs=-1, verbose=0)(
delayed(save_single_table)(dataset_path, "yadl", metadata_dest)
for dataset_path in tqdm(data_folder.glob(match_pattern), total=total_files)
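The `Parallel` call above fans `save_single_table` out over all discovered parquet files. As a rough, self-contained sketch of that discovery-plus-parallel-save pattern (the stubbed `save_single_table` and the `data/yadl` paths are placeholders for illustration, not the project's actual implementation):

```python
from pathlib import Path

from joblib import Parallel, delayed


def save_single_table(dataset_path, data_lake_variant, metadata_dest):
    # Placeholder: the real helper lives in src.utils.indexing and builds a
    # metadata entry for a single table.
    print(f"Indexing {dataset_path} for {data_lake_variant} into {metadata_dest}")


data_folder = Path("data/yadl")         # hypothetical data lake root
metadata_dest = "data/metadata/yadl"    # hypothetical destination
parquet_files = list(data_folder.glob("**/*.parquet"))  # recursive search

# One job per table on all available cores (n_jobs=-1), as in the script above.
Parallel(n_jobs=-1)(
    delayed(save_single_table)(path, "yadl", metadata_dest)
    for path in parquet_files
)
```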
5 changes: 5 additions & 0 deletions prepare_retrieval_methods.py
@@ -1,3 +1,8 @@
"""
This script builds the objects used by the different retrieval methods, according to the config
specified by the user. Default configurations are provided in the folder `config/retrieval/prepare`.
"""

import argparse
from pathlib import Path

23 changes: 18 additions & 5 deletions profile_retrieval.py
@@ -1,10 +1,20 @@
"""
This script profiles the runtime and peak memory usage of the different retrieval methods. To improve the reliability
of the measurements, the same operations can be repeated multiple times to reduce the variance in the results.
In each iteration the script first builds the index and then queries it, overwriting any pre-built index.
Note that Starmie is profiled in a different repository, so it is not included here.
"""

import argparse
import datetime as dt
import os
from pathlib import Path

import polars as pl
from memory_profiler import memory_usage, profile
from memory_profiler import memory_usage
from sklearn.model_selection import ParameterGrid
from tqdm import tqdm

@@ -19,6 +29,8 @@


def wrapper_prepare_exact_matching(queries, method_config, index_dir):
"""Utility function to collect the functions required to prepare Exact Matching.
"""
time_save = 0
time_create = 0
for query_case in tqdm(queries, position=1, total=len(queries), desc="Query: "):
@@ -213,7 +225,7 @@ def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--data_lake_version", action="store")
parser.add_argument("--retrieval_method", action="store")
parser.add_argument("--rerank", action="store_true")
parser.add_argument("--rerank", action="store_true", help="Set True with retrieval_method=minhash to test hybrid minhash.")

return parser.parse_args()

@@ -226,6 +238,7 @@ def parse_args():

data_lake_version = args.data_lake_version

# Open Data US has specific queries, so they're prepared explicitly here.
if data_lake_version == "open_data_us":
base_table_root = "data/source_tables/open_data_us"
queries = [
@@ -273,6 +286,7 @@ def parse_args():
]

else:
# All YADL data lakes have the same format for the queries.
base_table_root = "data/source_tables/yadl/"
queries = [
(
@@ -305,9 +319,9 @@ def parse_args():
),
]

# retrieval_method = "minhash"
retrieval_method = args.retrieval_method

# In the following, new parameters can be added to build a parameter grid and test all combinations.
if retrieval_method == "exact_matching":
method_config = {
"metadata_dir": [f"data/metadata/{data_lake_version}"],
@@ -321,7 +335,6 @@ def parse_args():
"metadata_dir": [f"data/metadata/{data_lake_version}"],
"n_jobs": [32],
"thresholds": [20],
# "thresholds": [20, 60, 80],
"no_tag": [False],
"rerank": [args.rerank],
}
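Each `method_config` above maps every parameter to a list of candidate values. A minimal sketch of how such a dict expands into individual profiling runs via scikit-learn's `ParameterGrid` (the values mirror the configuration shown above; `open_data_us` stands in for the actual `data_lake_version`):

```python
from sklearn.model_selection import ParameterGrid

method_config = {
    "metadata_dir": ["data/metadata/open_data_us"],
    "n_jobs": [32],
    "thresholds": [20],
    "no_tag": [False],
    "rerank": [False],
}

# ParameterGrid yields one dict per combination of the listed values;
# with single-element lists this is exactly one configuration.
for params in ParameterGrid(method_config):
    print(params)
```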
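The module docstring mentions peak-memory measurements. A minimal sketch of how `memory_profiler.memory_usage` (imported above) can report the peak memory of a single call, assuming the profiling here follows the same pattern (the `build_index` stub is hypothetical):

```python
from memory_profiler import memory_usage


def build_index():
    # Hypothetical stand-in for an index-construction step.
    data = [list(range(10_000)) for _ in range(200)]
    return len(data)


# memory_usage runs the callable and samples its memory; with max_usage=True
# it reports the peak (in MiB). Depending on the memory_profiler version the
# result is a float or a one-element list.
peak = memory_usage((build_index, (), {}), max_usage=True)
print("Peak memory (MiB):", peak)
```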
31 changes: 30 additions & 1 deletion query_indices.py
@@ -1,3 +1,17 @@
"""
This script is used to query the retrieval methods. Since the querying step may take a long time and can be done offline,
it is run separately from the rest of the pipeline.
The script reads a configuration file provided by the user. Sample query files for the different data
lakes and settings are provided in the folder `config/retrieval/query`.
Run the script as follows:
```
python query_indices.py path/to/config.toml
```
"""

import argparse
import os
from pathlib import Path
@@ -21,32 +35,43 @@


def parse_args():
"""Parse the configuration file from CLI.
"""
parser = argparse.ArgumentParser()
parser.add_argument("config_file", action="store", type=argparse.FileType("r"))

return parser.parse_args()


def prepare_dirtree():
"""Prepare the dirtree to ensure that the required folders are found.
"""
os.makedirs("data/metadata/queries", exist_ok=True)
os.makedirs("results/query_results", exist_ok=True)


if __name__ == "__main__":

# Read the config file
args = parse_args()
prepare_dirtree()
config = toml.load(args.config_file)

# Get the parameters
jd_methods = config["join_discovery_method"]
data_lake_version = config["data_lake"]
rerank = config.get("hybrid", False)
# Iterations are used only to re-run the same configuration multiple times to log multiple runs and reduce variance
# in the results
iterations = config.get("iterations", 1)
query_cases = config["query_cases"]

# Load the metadata index.
mdata_index = get_metadata_index(data_lake_version)

for it in tqdm(range(iterations), position=1):
if "minhash" in jd_methods:
# If the method is minhash, the index is loaded only once and then queried multiple times.
index_name = "minhash_hybrid" if rerank else "minhash"
# The SimpleIndexLogger is used to track the time required to query each index
logger_minhash = SimpleIndexLogger(
@@ -71,7 +96,7 @@ def prepare_dirtree():
index = minhash_index
index_logger = logger_minhash
elif jd_method == "exact_matching":
# Otherwise, the index is loaded once for each query and the logger is created with it.
# The Exact Matching index must be loaded once for each query.
index_logger = SimpleIndexLogger(
index_name=jd_method,
step="query",
@@ -91,6 +116,8 @@ def prepare_dirtree():
index_logger.end_time("load")

elif jd_method == "starmie":
# Indexing was done by Starmie, so here we just load the query results and convert
# them to the format used by the pipeline.
index_logger = SimpleIndexLogger(
index_name=jd_method,
step="query",
@@ -108,6 +135,8 @@ def prepare_dirtree():
else:
raise ValueError(f"Unknown jd_method {jd_method}")
index_logger.update_query_parameters(tname, query_column)

# The index has been loaded; now it is queried and the results are saved.
query_result, index_logger = query_index(
index,
query_tab_path,
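For reference, the keys read in the `__main__` block above suggest the rough shape of a query configuration. A hedged sketch follows; the exact schema, in particular the contents of `query_cases`, is an assumption, and the authoritative samples live in `config/retrieval/query`:

```python
import toml

# Hypothetical minimal config mirroring the keys read by query_indices.py.
sample_config = """
data_lake = "open_data_us"
join_discovery_method = ["exact_matching"]
hybrid = false
iterations = 1
query_cases = []
"""

config = toml.loads(sample_config)
jd_methods = config["join_discovery_method"]
data_lake_version = config["data_lake"]
rerank = config.get("hybrid", False)
iterations = config.get("iterations", 1)
query_cases = config["query_cases"]
print(jd_methods, data_lake_version, rerank, iterations, query_cases)
```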
