Adding post-run summary files + testing
TarekAbouChakra committed Nov 8, 2023
1 parent 1d114eb commit 5d17aa7
Showing 6 changed files with 252 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -41,4 +41,4 @@ jobs:

- name: Run pytest
timeout-minutes: 15
run: poetry run pytest -m "all_examples or metahyper"
run: poetry run pytest -m "all_examples or metahyper or summary_csv"
1 change: 1 addition & 0 deletions neps_examples/basic_usage/hyperparameters.py
Expand Up @@ -25,5 +25,6 @@ def run_pipeline(float1, float2, categorical, integer1, integer2):
run_pipeline=run_pipeline,
pipeline_space=pipeline_space,
root_directory="results/hyperparameters_example",
post_run_summary=True,
max_evaluations_total=15,
)
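
With post_run_summary=True, the run directory gains a summary_csv/ folder holding config_data.csv and run_data.csv (created by src/neps/status/status.py below). A minimal sketch of inspecting the per-config table afterwards, assuming the run above has finished:

import pandas as pd

# One row per config; "Status" is "Complete" or "Pending", and config/result
# fields are flattened into prefixed columns such as "config.float1".
config_df = pd.read_csv("results/hyperparameters_example/summary_csv/config_data.csv")
print(config_df[["Config_id", "Status", "result.loss"]].head())
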
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -96,7 +96,7 @@ line_length = 90

[tool.pytest.ini_options]
addopts = "--basetemp ./tests_tmpdir -m 'core_examples or yaml_api'"
markers = ["all_examples", "core_examples", "regression_all", "metahyper", "yaml_api"]
markers = ["all_examples", "core_examples", "regression_all", "metahyper", "yaml_api", "summary_csv"]
filterwarnings = "ignore::DeprecationWarning:torch.utils.tensorboard.*:"

[tool.mypy]
7 changes: 7 additions & 0 deletions src/neps/api.py
@@ -17,6 +17,7 @@
from .plot.tensorboard_eval import tblogger
from .search_spaces.parameter import Parameter
from .search_spaces.search_space import SearchSpace, pipeline_space_from_configspace
from .status.status import post_run_csv
from .utils.common import get_searcher_data
from .utils.result_utils import get_loss

@@ -97,6 +98,7 @@ def run(
pipeline_space: dict[str, Parameter | CS.ConfigurationSpace] | CS.ConfigurationSpace,
root_directory: str | Path,
overwrite_working_directory: bool = False,
post_run_summary: bool = False,
development_stage_id=None,
task_id=None,
max_evaluations_total: int | None = None,
@@ -135,6 +137,8 @@
synchronize multiple calls to run(.) for parallelization.
overwrite_working_directory: If true, delete the working directory at the start of
the run. This is, e.g., useful when debugging a run_pipeline function.
        post_run_summary: If true, creates CSV files after the worker is done,
            holding summary information about the configs and results.
development_stage_id: ID for the current development stage. Only needed if
you work with multiple development stages.
task_id: ID for the current task. Only needed if you work with multiple
@@ -319,3 +323,6 @@ def run(
),
overwrite_optimization_dir=overwrite_working_directory,
)

    if post_run_summary:
        post_run_csv(root_directory=root_directory, logger=logger)
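
Because post_run_csv needs only the run directory, the same summary can be regenerated for an already finished run without calling neps.run again. A short sketch, assuming the example run above exists:

from neps.status.status import post_run_csv

# Rebuild summary_csv/config_data.csv and summary_csv/run_data.csv from the
# current state of the run directory.
post_run_csv(root_directory="results/hyperparameters_example")
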
218 changes: 218 additions & 0 deletions src/neps/status/status.py
@@ -4,7 +4,10 @@
from pathlib import Path
from typing import Any

import pandas as pd

from metahyper import read
from metahyper._locker import Locker
from metahyper.api import ConfigResult

from ..search_spaces.search_space import SearchSpace
@@ -121,3 +124,218 @@ def status(
print(all_loss_config.read_text(encoding="utf-8"))

return summary["previous_results"], summary["pending_configs"]


def _initiate_summary_csv(
root_directory: str | Path,
logger: logging.Logger,
) -> tuple[Path, Path, Locker]:
"""
Initialize summary CSV files and an associated locker for file access control.
Args:
root_directory (str | Path): The root directory where the summary CSV directory,
containing CSV files and a locker for file access control, will be created.
logger (logging.Logger): A logger for log messages.
Returns:
Tuple[Path, Path, Locker]: A tuple containing the file paths for the
configuration data CSV, run data CSV, and a locker file.
The locker is used for file access control to ensure data integrity in a
multi-threaded or multi-process environment.
"""
root_directory = Path(root_directory)
    summary_csv_directory = root_directory / "summary_csv"
summary_csv_directory.mkdir(parents=True, exist_ok=True)

csv_config_data = summary_csv_directory / "config_data.csv"
csv_run_data = summary_csv_directory / "run_data.csv"

csv_lock_file = summary_csv_directory / ".csv_lock"
csv_lock_file.touch(exist_ok=True)
csv_locker = Locker(csv_lock_file, logger.getChild("_locker"))

return (
csv_config_data,
csv_run_data,
csv_locker,
)
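
# For orientation, a sketch of the on-disk layout this helper prepares,
# assuming a run rooted at <root_directory> (illustration only):
#
#   <root_directory>/
#       summary_csv/
#           config_data.csv   # per-config table, written by _save_data_to_csv
#           run_data.csv      # run-level summary table
#           .csv_lock         # lock file backing the returned Locker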


def _get_dataframes_from_summary(
root_directory: str | Path,
include_metadatas: bool = True,
include_results: bool = True,
include_configs: bool = True,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Generate Pandas DataFrames from summary data retrieved from a run.
Args:
root_directory (str | Path): The root directory of the NePS run.
include_metadatas (bool): Include metadata in the DataFrames (Default: True).
include_results (bool): Include results in the DataFrames (Default: True).
include_configs (bool): Include configurations in the DataFrames (Default: True).
Returns:
Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
"""
indices_prev = []
config_data_prev = []
result_data_prev = []
metadata_data_prev = []

indices_pending = []
config_data_pending = []

summary = get_summary_dict(root_directory=root_directory, add_details=True)

for key_prev, config_result_prev in summary["previous_results"].items():
indices_prev.append(str(key_prev))
if include_configs:
config_data_prev.append(config_result_prev.config)
if include_results:
result_data_prev.append(config_result_prev.result)
if include_metadatas:
metadata_data_prev.append(config_result_prev.metadata)

for key_pending, config_pending in summary["pending_configs"].items():
indices_pending.append(str(key_pending))
if include_configs:
config_data_pending.append(config_pending)

    # Create the DataFrame for previous config results.
df_previous = pd.DataFrame({"Config_id": indices_prev})
df_previous["Status"] = "Complete"
df_previous = pd.concat(
[df_previous, pd.json_normalize(config_data_prev).add_prefix("config.")], axis=1
)
df_previous = pd.concat(
[df_previous, pd.json_normalize(metadata_data_prev).add_prefix("metadata.")],
axis=1,
)
df_previous = pd.concat(
[df_previous, pd.json_normalize(result_data_prev).add_prefix("result.")], axis=1
)

    # Create the DataFrame for pending configs.
df_pending = pd.DataFrame({"Config_id": indices_pending})
df_pending["Status"] = "Pending"
df_pending = pd.concat(
[df_pending, pd.json_normalize(config_data_pending).add_prefix("config.")],
axis=1,
)

# Concatenate the two DataFrames
df_config_data = pd.concat([df_previous, df_pending], join="outer", ignore_index=True)

    # Create a DataFrame with the run-level summary data.
additional_data = {
"num_evaluated_configs": summary["num_evaluated_configs"],
"num_pending_configs": summary["num_pending_configs"],
"num_pending_configs_with_worker": summary["num_pending_configs_with_worker"],
"best_loss": summary["best_loss"],
"best_config_id": summary["best_config_id"],
"num_error": summary["num_error"],
}

df_run_data = pd.DataFrame.from_dict(
additional_data, orient="index", columns=["Value"]
)
df_run_data.index.name = "Description"

return df_config_data, df_run_data
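
# To illustrate the column layout (a standalone sketch, not executed here):
# pd.json_normalize flattens each config dict into one row of columns, and
# add_prefix namespaces them, so {"float1": 0.5, "integer1": 3} becomes the
# columns "config.float1" and "config.integer1":
#
#   >>> pd.json_normalize([{"float1": 0.5, "integer1": 3}]).add_prefix("config.")
#      config.float1  config.integer1
#   0            0.5                3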


def _save_data_to_csv(
config_data_file_path: Path,
run_data_file_path: Path,
locker: Locker,
config_data_df: pd.DataFrame,
run_data_df: pd.DataFrame,
) -> None:
"""
Save data to CSV files while acquiring a lock for data integrity.
Args:
config_data_file_path (Path | str): The path to the CSV file for configuration data.
run_data_file_path (Path | str): The path to the CSV file for additional run data.
locker (Locker): An object for acquiring and releasing a lock to ensure data integrity.
config_data_df (pd.DataFrame): The DataFrame containing configuration data.
run_data_df (pd.DataFrame): The DataFrame containing additional run data.
This function saves data to CSV files while acquiring a lock to prevent concurrent writes.
If the lock is acquired, it writes the data to the CSV files and releases the lock.
"""
should_break = False
while not should_break:
if locker.acquire_lock():
try:
pending_configs = run_data_df.loc["num_pending_configs", "Value"]
pending_configs_with_worker = run_data_df.loc[
"num_pending_configs_with_worker", "Value"
]
                # The last worker: nothing is pending, so write the final summary.
if int(pending_configs) == 0 and int(pending_configs_with_worker) == 0:
config_data_df = config_data_df.sort_values(
by="result.loss", ascending=True
)
config_data_df.to_csv(config_data_file_path, index=False, mode="w")
run_data_df.to_csv(run_data_file_path, index=True, mode="w")

if run_data_file_path.exists():
prev_run_data_df = pd.read_csv(run_data_file_path)
prev_run_data_df.set_index("Description", inplace=True)

num_evaluated_configs_csv = prev_run_data_df.loc[
"num_evaluated_configs", "Value"
]
                    num_evaluated_configs_run = run_data_df.loc[
                        "num_evaluated_configs", "Value"
                    ]
                    # Check whether the current worker has evaluated more
                    # configs than the previous one recorded in the CSV.
if int(num_evaluated_configs_csv) < int(num_evaluated_configs_run):
config_data_df = config_data_df.sort_values(
by="result.loss", ascending=True
)
config_data_df.to_csv(
config_data_file_path, index=False, mode="w"
)
run_data_df.to_csv(run_data_file_path, index=True, mode="w")
                # The first worker: no CSV exists yet, so write the initial summary.
else:
config_data_df = config_data_df.sort_values(
by="result.loss", ascending=True
)
config_data_df.to_csv(config_data_file_path, index=False, mode="w")
run_data_df.to_csv(run_data_file_path, index=True, mode="w")
except Exception as e:
raise RuntimeError(f"Error during data saving: {e}") from e
finally:
locker.release_lock()
should_break = True
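
# The three write paths above, summarized (comment for orientation only):
#   1. No configs pending anywhere     -> last worker: write the final CSVs.
#   2. run_data.csv already exists     -> overwrite only if this worker has
#                                         evaluated more configs than recorded.
#   3. run_data.csv does not exist yet -> first worker: write the initial CSVs.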


def post_run_csv(root_directory: str | Path, logger=None) -> None:
    """Create config_data.csv and run_data.csv summaries for a finished run."""
    if logger is None:
        logger = logging.getLogger("neps_status")

csv_config_data, csv_rundata, csv_locker = _initiate_summary_csv(
root_directory, logger=logger
)

df_config_data, df_run_data = _get_dataframes_from_summary(
root_directory,
include_metadatas=True,
include_results=True,
include_configs=True,
)

_save_data_to_csv(
csv_config_data,
csv_rundata,
csv_locker,
df_config_data,
df_run_data,
)
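
The run_data.csv produced above is a two-column table: a "Description" index over the keys written by _get_dataframes_from_summary (num_evaluated_configs, num_pending_configs, num_pending_configs_with_worker, best_loss, best_config_id, num_error) and a single "Value" column. A hedged sketch of querying it:

import pandas as pd

run_df = pd.read_csv("results/hyperparameters_example/summary_csv/run_data.csv",
                     index_col="Description")
# Values are read back as strings/objects because the column mixes types,
# so cast counts before comparing, as the code above also does.
print(int(run_df.loc["num_evaluated_configs", "Value"]))
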
24 changes: 24 additions & 0 deletions tests/test_metahyper/test_locking.py
@@ -3,11 +3,13 @@
import subprocess
from pathlib import Path

import pandas as pd
import pytest
from more_itertools import first_true


@pytest.mark.metahyper
@pytest.mark.summary_csv
def test_filelock() -> None:
"""Test that the filelocking method of parallelization works as intended."""
# Note: Not using tmpdir
@@ -70,3 +72,25 @@ def test_filelock() -> None:
finally:
if results_dir.exists():
shutil.rmtree(results_dir)


@pytest.mark.summary_csv
def test_summary_csv():
    # Test the post-run summary CSV output.
    summary_dir = Path("results") / "hyperparameters_example" / "summary_csv"
    try:
        assert summary_dir.is_dir()

        run_data_df = pd.read_csv(summary_dir / "run_data.csv")
        run_data_df.set_index("Description", inplace=True)
        num_evaluated_configs_csv = run_data_df.loc["num_evaluated_configs", "Value"]
        assert int(num_evaluated_configs_csv) == 15

        config_data_df = pd.read_csv(summary_dir / "config_data.csv")
        assert config_data_df.shape[0] == 15
        assert (config_data_df["Status"] == "Complete").all()
    finally:
        if summary_dir.exists():
            shutil.rmtree(summary_dir)
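
To exercise the new tests locally, select the marker directly (mirroring the CI change above); note that test_filelock also carries the summary_csv marker, presumably because test_summary_csv inspects the results that run produces:

poetry run pytest -m summary_csv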
