diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 0572482a..95617323 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -41,4 +41,4 @@ jobs:
 
       - name: Run pytest
         timeout-minutes: 15
-        run: poetry run pytest -m "all_examples or metahyper"
+        run: poetry run pytest -m "all_examples or metahyper or summary_csv"
diff --git a/neps_examples/basic_usage/hyperparameters.py b/neps_examples/basic_usage/hyperparameters.py
index 338be1ab..ad7a344b 100644
--- a/neps_examples/basic_usage/hyperparameters.py
+++ b/neps_examples/basic_usage/hyperparameters.py
@@ -25,5 +25,6 @@ def run_pipeline(float1, float2, categorical, integer1, integer2):
     run_pipeline=run_pipeline,
     pipeline_space=pipeline_space,
     root_directory="results/hyperparameters_example",
+    post_run_summary=True,
     max_evaluations_total=15,
 )
diff --git a/pyproject.toml b/pyproject.toml
index f78987cc..19c4336f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -96,7 +96,7 @@ line_length = 90
 
 [tool.pytest.ini_options]
 addopts = "--basetemp ./tests_tmpdir -m 'core_examples or yaml_api'"
-markers = ["all_examples", "core_examples", "regression_all", "metahyper", "yaml_api"]
+markers = ["all_examples", "core_examples", "regression_all", "metahyper", "yaml_api", "summary_csv"]
 filterwarnings = "ignore::DeprecationWarning:torch.utils.tensorboard.*:"
 
 [tool.mypy]
diff --git a/src/neps/api.py b/src/neps/api.py
index 54cfab8a..8ee1d869 100644
--- a/src/neps/api.py
+++ b/src/neps/api.py
@@ -17,6 +17,7 @@
 from .plot.tensorboard_eval import tblogger
 from .search_spaces.parameter import Parameter
 from .search_spaces.search_space import SearchSpace, pipeline_space_from_configspace
+from .status.status import post_run_csv
 from .utils.common import get_searcher_data
 from .utils.result_utils import get_loss
 
@@ -97,6 +98,7 @@ def run(
     pipeline_space: dict[str, Parameter | CS.ConfigurationSpace] | CS.ConfigurationSpace,
     root_directory: str | Path,
     overwrite_working_directory: bool = False,
+    post_run_summary: bool = False,
     development_stage_id=None,
     task_id=None,
     max_evaluations_total: int | None = None,
@@ -135,6 +137,8 @@
             synchronize multiple calls to run(.) for parallelization.
         overwrite_working_directory: If true, delete the working directory at the start of
             the run. This is, e.g., useful when debugging a run_pipeline function.
+        post_run_summary: If true, creates CSV files after the worker is done,
+            holding summary information about the configs and results.
         development_stage_id: ID for the current development stage. Only needed if
             you work with multiple development stages.
         task_id: ID for the current task. Only needed if you work with multiple
@@ -319,3 +323,6 @@ def run(
         ),
         overwrite_optimization_dir=overwrite_working_directory,
     )
+
+    if post_run_summary:
+        post_run_csv(root_directory=root_directory, logger=logger)
diff --git a/src/neps/status/status.py b/src/neps/status/status.py
index 28103c4e..b5fae9f2 100644
--- a/src/neps/status/status.py
+++ b/src/neps/status/status.py
@@ -4,7 +4,10 @@
 from pathlib import Path
 from typing import Any
 
+import pandas as pd
+
 from metahyper import read
+from metahyper._locker import Locker
 from metahyper.api import ConfigResult
 
 from ..search_spaces.search_space import SearchSpace
@@ -121,3 +124,218 @@ def status(
         print(all_loss_config.read_text(encoding="utf-8"))
 
     return summary["previous_results"], summary["pending_configs"]
+
+
+def _initiate_summary_csv(
+    root_directory: str | Path,
+    logger: logging.Logger,
+) -> tuple[Path, Path, Locker]:
+    """
+    Initialize summary CSV files and an associated locker for file access control.
+
+    Args:
+        root_directory (str | Path): The root directory where the summary CSV directory,
+            containing CSV files and a locker for file access control, will be created.
+        logger (logging.Logger): A logger for log messages.
+
+    Returns:
+        Tuple[Path, Path, Locker]: A tuple containing the file paths for the
+            configuration data CSV, the run data CSV, and a Locker for file access control.
+
+    The locker is used for file access control to ensure data integrity in a
+    multi-threaded or multi-process environment.
+    """
+    root_directory = Path(root_directory)
+    summary_csv_directory = Path(root_directory / "summary_csv")
+    summary_csv_directory.mkdir(parents=True, exist_ok=True)
+
+    csv_config_data = summary_csv_directory / "config_data.csv"
+    csv_run_data = summary_csv_directory / "run_data.csv"
+
+    csv_lock_file = summary_csv_directory / ".csv_lock"
+    csv_lock_file.touch(exist_ok=True)
+    csv_locker = Locker(csv_lock_file, logger.getChild("_locker"))
+
+    return (
+        csv_config_data,
+        csv_run_data,
+        csv_locker,
+    )
+
+
+def _get_dataframes_from_summary(
+    root_directory: str | Path,
+    include_metadatas: bool = True,
+    include_results: bool = True,
+    include_configs: bool = True,
+) -> tuple[pd.DataFrame, pd.DataFrame]:
+    """
+    Generate Pandas DataFrames from summary data retrieved from a run.
+
+    Args:
+        root_directory (str | Path): The root directory of the NePS run.
+        include_metadatas (bool): Include metadata in the DataFrames (Default: True).
+        include_results (bool): Include results in the DataFrames (Default: True).
+        include_configs (bool): Include configurations in the DataFrames (Default: True).
+
+    Returns:
+        Tuple[pd.DataFrame, pd.DataFrame]: The config data and run data DataFrames.
+    """
+    indices_prev = []
+    config_data_prev = []
+    result_data_prev = []
+    metadata_data_prev = []
+
+    indices_pending = []
+    config_data_pending = []
+
+    summary = get_summary_dict(root_directory=root_directory, add_details=True)
+
+    for key_prev, config_result_prev in summary["previous_results"].items():
+        indices_prev.append(str(key_prev))
+        if include_configs:
+            config_data_prev.append(config_result_prev.config)
+        if include_results:
+            result_data_prev.append(config_result_prev.result)
+        if include_metadatas:
+            metadata_data_prev.append(config_result_prev.metadata)
+
+    for key_pending, config_pending in summary["pending_configs"].items():
+        indices_pending.append(str(key_pending))
+        if include_configs:
+            config_data_pending.append(config_pending)
+
+    # Creating the dataframe for previous config results.
+    df_previous = pd.DataFrame({"Config_id": indices_prev})
+    df_previous["Status"] = "Complete"
+    df_previous = pd.concat(
+        [df_previous, pd.json_normalize(config_data_prev).add_prefix("config.")], axis=1
+    )
+    df_previous = pd.concat(
+        [df_previous, pd.json_normalize(metadata_data_prev).add_prefix("metadata.")],
+        axis=1,
+    )
+    df_previous = pd.concat(
+        [df_previous, pd.json_normalize(result_data_prev).add_prefix("result.")], axis=1
+    )
+
+    # Creating dataframe for pending configs.
+    df_pending = pd.DataFrame({"Config_id": indices_pending})
+    df_pending["Status"] = "Pending"
+    df_pending = pd.concat(
+        [df_pending, pd.json_normalize(config_data_pending).add_prefix("config.")],
+        axis=1,
+    )
+
+    # Concatenate the two DataFrames
+    df_config_data = pd.concat([df_previous, df_pending], join="outer", ignore_index=True)
+
+    # Create a dataframe with the specified additional summary data
+    additional_data = {
+        "num_evaluated_configs": summary["num_evaluated_configs"],
+        "num_pending_configs": summary["num_pending_configs"],
+        "num_pending_configs_with_worker": summary["num_pending_configs_with_worker"],
+        "best_loss": summary["best_loss"],
+        "best_config_id": summary["best_config_id"],
+        "num_error": summary["num_error"],
+    }
+
+    df_run_data = pd.DataFrame.from_dict(
+        additional_data, orient="index", columns=["Value"]
+    )
+    df_run_data.index.name = "Description"
+
+    return df_config_data, df_run_data
+
+
+def _save_data_to_csv(
+    config_data_file_path: Path,
+    run_data_file_path: Path,
+    locker: Locker,
+    config_data_df: pd.DataFrame,
+    run_data_df: pd.DataFrame,
+) -> None:
+    """
+    Save data to CSV files while acquiring a lock for data integrity.
+
+    Args:
+        config_data_file_path (Path | str): The path to the CSV file for configuration data.
+        run_data_file_path (Path | str): The path to the CSV file for additional run data.
+        locker (Locker): An object for acquiring and releasing a lock to ensure data integrity.
+        config_data_df (pd.DataFrame): The DataFrame containing configuration data.
+        run_data_df (pd.DataFrame): The DataFrame containing additional run data.
+
+    This function saves data to CSV files while acquiring a lock to prevent concurrent writes.
+    If the lock is acquired, it writes the data to the CSV files and releases the lock.
+ """ + should_break = False + while not should_break: + if locker.acquire_lock(): + try: + pending_configs = run_data_df.loc["num_pending_configs", "Value"] + pending_configs_with_worker = run_data_df.loc[ + "num_pending_configs_with_worker", "Value" + ] + # Represents the last worker + if int(pending_configs) == 0 and int(pending_configs_with_worker) == 0: + config_data_df = config_data_df.sort_values( + by="result.loss", ascending=True + ) + config_data_df.to_csv(config_data_file_path, index=False, mode="w") + run_data_df.to_csv(run_data_file_path, index=True, mode="w") + + if run_data_file_path.exists(): + prev_run_data_df = pd.read_csv(run_data_file_path) + prev_run_data_df.set_index("Description", inplace=True) + + num_evaluated_configs_csv = prev_run_data_df.loc[ + "num_evaluated_configs", "Value" + ] + num_evaluated_configs_run = run_data_df.loc[ + run_data_df.index == "num_evaluated_configs", "Value" + ] + # checks if the current worker has more evaluated configs than the previous + if int(num_evaluated_configs_csv) < int(num_evaluated_configs_run): + config_data_df = config_data_df.sort_values( + by="result.loss", ascending=True + ) + config_data_df.to_csv( + config_data_file_path, index=False, mode="w" + ) + run_data_df.to_csv(run_data_file_path, index=True, mode="w") + # Represents the first worker to be evaluated + else: + config_data_df = config_data_df.sort_values( + by="result.loss", ascending=True + ) + config_data_df.to_csv(config_data_file_path, index=False, mode="w") + run_data_df.to_csv(run_data_file_path, index=True, mode="w") + except Exception as e: + raise RuntimeError(f"Error during data saving: {e}") from e + finally: + locker.release_lock() + should_break = True + + +def post_run_csv(root_directory: str | Path, logger=None) -> None: + if logger is None: + logger = logging.getLogger("neps_status") + + csv_config_data, csv_rundata, csv_locker = _initiate_summary_csv( + root_directory, logger=logger + ) + + df_config_data, df_run_data = _get_dataframes_from_summary( + root_directory, + include_metadatas=True, + include_results=True, + include_configs=True, + ) + + _save_data_to_csv( + csv_config_data, + csv_rundata, + csv_locker, + df_config_data, + df_run_data, + ) diff --git a/tests/test_metahyper/test_locking.py b/tests/test_metahyper/test_locking.py index e397ec47..aae99f4d 100644 --- a/tests/test_metahyper/test_locking.py +++ b/tests/test_metahyper/test_locking.py @@ -3,11 +3,13 @@ import subprocess from pathlib import Path +import pandas as pd import pytest from more_itertools import first_true @pytest.mark.metahyper +@pytest.mark.summary_csv def test_filelock() -> None: """Test that the filelocking method of parallelization works as intended.""" # Note: Not using tmpdir @@ -70,3 +72,25 @@ def test_filelock() -> None: finally: if results_dir.exists(): shutil.rmtree(results_dir) + + +@pytest.mark.summary_csv +def test_summary_csv(): + # Testing the csv files output. 
+    try:
+        summary_dir = Path("results") / "hyperparameters_example" / "summary_csv"
+        assert summary_dir.is_dir()
+
+        run_data_df = pd.read_csv(summary_dir / "run_data.csv")
+        run_data_df.set_index("Description", inplace=True)
+        num_evaluated_configs_csv = run_data_df.loc["num_evaluated_configs", "Value"]
+        assert int(num_evaluated_configs_csv) == 15
+
+        config_data_df = pd.read_csv(summary_dir / "config_data.csv")
+        assert config_data_df.shape[0] == 15
+        assert (config_data_df["Status"] == "Complete").all()
+    except Exception as e:
+        raise e
+    finally:
+        if summary_dir.exists():
+            shutil.rmtree(summary_dir)
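
Usage note (not part of the patch): the sketch below shows how the new post_run_summary flag is exercised end to end and how the resulting CSV files can be loaded, mirroring neps_examples/basic_usage/hyperparameters.py and the test above. The run_pipeline body and the pipeline_space values here are illustrative placeholders, not code from this diff.

# Sketch only: assumes the post_run_summary flag and the summary_csv/ layout
# introduced in this patch; the objective and search space are toy examples.
import neps
import pandas as pd


def run_pipeline(float1, float2, categorical, integer1, integer2):
    # Hypothetical objective; stands in for a real training run.
    return float1 + float2 + integer1 + integer2


pipeline_space = dict(
    float1=neps.FloatParameter(lower=0, upper=1),
    float2=neps.FloatParameter(lower=0, upper=1),
    categorical=neps.CategoricalParameter(choices=["a", "b"]),
    integer1=neps.IntegerParameter(lower=0, upper=10),
    integer2=neps.IntegerParameter(lower=0, upper=10),
)

neps.run(
    run_pipeline=run_pipeline,
    pipeline_space=pipeline_space,
    root_directory="results/hyperparameters_example",
    post_run_summary=True,  # new flag added in this patch
    max_evaluations_total=15,
)

# After the worker finishes, the summaries sit next to the other run artifacts.
config_df = pd.read_csv("results/hyperparameters_example/summary_csv/config_data.csv")
run_df = pd.read_csv("results/hyperparameters_example/summary_csv/run_data.csv")
print(run_df.set_index("Description").loc["best_loss", "Value"])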
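
The summary can also be produced for an already-finished run by calling post_run_csv directly. Again a sketch, assuming src/neps/status/status.py is importable as neps.status.status, as the relative import added to api.py suggests:

import logging

from neps.status.status import post_run_csv

logging.basicConfig(level=logging.INFO)
# Writes summary_csv/config_data.csv and summary_csv/run_data.csv under the
# given root directory; writes are guarded by the summary_csv/.csv_lock file.
post_run_csv(root_directory="results/hyperparameters_example")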