Refactored Metahyper and Its Test Cases #54

Closed
wants to merge 3 commits
83 changes: 53 additions & 30 deletions neps/metahyper/api.py
@@ -49,6 +49,13 @@ def store_in_run_data(
ConfigInRun.previous_pipeline_directory = previous_pipeline_directory
ConfigInRun.optimization_dir = Path(optimization_dir)

@staticmethod
def empty_config_data():
ConfigInRun.config = None
ConfigInRun.config_id = None
ConfigInRun.pipeline_directory = None
ConfigInRun.previous_pipeline_directory = None


class Sampler(ABC):
# pylint: disable=no-self-use,unused-argument
@@ -164,6 +171,8 @@ def _process_sampler_info(
finally:
decision_locker.release_lock()
should_break = True
else:
time.sleep(3)


def _load_sampled_paths(optimization_dir: Path | str, serializer, logger):
@@ -264,16 +273,14 @@ def read(optimization_dir: Path | str, serializer=None, logger=None, do_lock=Tru


def _check_max_evaluations(
optimization_dir,
max_evaluations,
serializer,
previous_results,
pending_configs,
pending_configs_free,
logger,
max_evaluations,
continue_until_max_evaluation_completed,
):
logger.debug("Checking if max evaluations is reached")
previous_results, pending_configs, pending_configs_free = read(
optimization_dir, serializer, logger
)
evaluation_count = len(previous_results)

# Taking into account pending evaluations
@@ -283,12 +290,16 @@ def _check_max_evaluations(
return evaluation_count >= max_evaluations


def _sample_config(optimization_dir, sampler, serializer, logger, pre_load_hooks):
# First load the results and state of the optimizer
previous_results, pending_configs, pending_configs_free = read(
optimization_dir, serializer, logger, do_lock=False
)

def _sample_config(
previous_results,
pending_configs,
pending_configs_free,
optimization_dir,
sampler,
serializer,
logger,
pre_load_hooks,
):
base_result_directory = optimization_dir / "results"

logger.debug("Sampling a new configuration")
@@ -458,25 +469,29 @@ def metahyper_run(

evaluations_in_this_run = 0
while True:
if max_evaluations_total is not None and _check_max_evaluations(
optimization_dir,
max_evaluations_total,
serializer,
logger,
continue_until_max_evaluation_completed,
):
logger.info("Maximum total evaluations is reached, shutting down")
break

if (
max_evaluations_per_run is not None
and evaluations_in_this_run >= max_evaluations_per_run
):
logger.info("Maximum evaluations per run is reached, shutting down")
break

if decision_locker.acquire_lock():
try:
previous_results, pending_configs, pending_configs_free = read(
optimization_dir, serializer, logger, do_lock=False
)
if max_evaluations_total is not None and _check_max_evaluations(
previous_results,
pending_configs,
pending_configs_free,
logger,
max_evaluations_total,
continue_until_max_evaluation_completed,
):
logger.info("Maximum total evaluations is reached, shutting down")
break

if (
max_evaluations_per_run is not None
and evaluations_in_this_run >= max_evaluations_per_run
):
logger.info("Maximum evaluations per run is reached, shutting down")
break

with sampler.using_state(sampler_state_file, serializer):
if sampler.budget is not None:
if sampler.used_budget >= sampler.budget:
@@ -488,7 +503,14 @@
pipeline_directory,
previous_pipeline_directory,
) = _sample_config(
optimization_dir, sampler, serializer, logger, pre_load_hooks
previous_results,
pending_configs,
pending_configs_free,
optimization_dir,
sampler,
serializer,
logger,
pre_load_hooks,
)
# Storing the config details in ConfigInRun
ConfigInRun.store_in_run_data(
@@ -556,6 +578,7 @@ def metahyper_run(
)
else:
logger.info(f"Finished evaluating config {config_id}")
ConfigInRun.empty_config_data()
finally:
config_locker.release_lock()

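For context, a minimal sketch of the control flow this file's changes move towards: the optimizer state is read once per iteration under the decision lock and handed to both the max-evaluation check and `_sample_config`, and the per-config data stored on `ConfigInRun` is cleared after each evaluation via the new `empty_config_data()`. The names below are simplified stand-ins, not the real neps/metahyper API, and the callables passed into `run_one_iteration` are hypothetical.

from __future__ import annotations

from pathlib import Path


class ConfigInRunSketch:
    # Stand-in for ConfigInRun: class-level storage for the configuration
    # currently being evaluated.
    config = None
    config_id = None
    pipeline_directory: Path | None = None
    previous_pipeline_directory: Path | None = None

    @staticmethod
    def empty_config_data():
        # Same idea as the new ConfigInRun.empty_config_data(): reset the
        # per-config fields once the evaluation has finished.
        ConfigInRunSketch.config = None
        ConfigInRunSketch.config_id = None
        ConfigInRunSketch.pipeline_directory = None
        ConfigInRunSketch.previous_pipeline_directory = None


def check_max_evaluations(previous_results, pending_configs, max_evaluations,
                          continue_until_max_evaluation_completed=False):
    # The refactored _check_max_evaluations no longer calls read() itself;
    # it operates on state the caller already loaded under the lock.
    evaluation_count = len(previous_results)
    if not continue_until_max_evaluation_completed:
        # Assumption for this sketch: pending evaluations also count unless the
        # run should continue until the completed evaluations reach the budget.
        evaluation_count += len(pending_configs)
    return evaluation_count >= max_evaluations


def run_one_iteration(read_state, sample, evaluate, max_evaluations):
    # read_state/sample/evaluate stand in for read(), _sample_config() and the
    # user's pipeline function.
    previous_results, pending_configs, pending_free = read_state()  # one read per loop
    if check_max_evaluations(previous_results, pending_configs, max_evaluations):
        return False
    config_id, config = sample(previous_results, pending_configs, pending_free)
    ConfigInRunSketch.config, ConfigInRunSketch.config_id = config, config_id
    try:
        evaluate(config)
    finally:
        ConfigInRunSketch.empty_config_data()  # clear run data after evaluation
    return True
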
3 changes: 3 additions & 0 deletions neps/status/status.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import time
from pathlib import Path
from typing import Any

@@ -313,6 +314,8 @@ def _save_data_to_csv(
finally:
locker.release_lock()
should_break = True
else:
time.sleep(3)


def post_run_csv(root_directory: str | Path, logger=None) -> None:
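The `else: time.sleep(3)` added here mirrors the one added in `neps/metahyper/api.py` above: when the file lock is held by another worker, the process backs off for a few seconds instead of spinning on the lock. Below is a minimal, self-contained sketch of that retry loop; `ToyLocker` and `save_with_retry` are stand-ins for the real file locker and CSV-writing code, not part of this PR.

import time


class ToyLocker:
    """Stand-in for the real file locker; the first acquire attempt fails."""

    def __init__(self):
        self._attempts = 0

    def acquire_lock(self) -> bool:
        self._attempts += 1
        return self._attempts > 1  # pretend another worker held the lock once

    def release_lock(self) -> None:
        pass


def save_with_retry(locker: ToyLocker, write) -> None:
    should_break = False
    while not should_break:
        if locker.acquire_lock():
            try:
                write()  # e.g. writing run_status.csv / config_data.csv
            finally:
                locker.release_lock()
                should_break = True
        else:
            time.sleep(3)  # back off instead of busy-waiting, as in this diff


if __name__ == "__main__":
    save_with_retry(ToyLocker(), lambda: print("csv files written"))
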
93 changes: 59 additions & 34 deletions tests/test_metahyper/test_locking.py
@@ -1,29 +1,27 @@
import concurrent.futures
import re
import shutil
import subprocess
from pathlib import Path

import numpy as np
import pandas as pd
import pytest
from more_itertools import first_true


def launch_example_processes(n_workers: int = 3) -> list:
processes = []
for _ in range(n_workers):
processes.append(
subprocess.Popen( # pylint: disable=consider-using-with
"python -m neps_examples.basic_usage.hyperparameters && python -m neps_examples.basic_usage.analyse",
stdout=subprocess.PIPE,
shell=True,
text=True,
)
)
return processes
def run_worker():
command = "python -m neps_examples.basic_usage.hyperparameters && python -m neps_examples.basic_usage.analyse"

# Run the command and capture the output
result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, text=True)

# Return stdout and return code
return result.stdout, result.returncode


@pytest.mark.metahyper
def test_filelock() -> None:
def test_filelock(n_workers=2) -> None:
"""Test that the filelocking method of parallelization works as intended."""
# Note: Not using tmpdir
#
@@ -39,27 +37,34 @@ def test_filelock() -> None:
results_dir = Path("results") / "hyperparameters_example" / "results"
try:
assert not results_dir.exists()
# Wait for them
p_list = launch_example_processes(n_workers=2)
for p in p_list:
p.wait()
out, _ = p.communicate()
lines = out.splitlines()
exit_codes = []
with concurrent.futures.ProcessPoolExecutor(max_workers=n_workers) as executor:
# Submit each worker function to the concurrent executor
futures = [executor.submit(run_worker) for _ in range(n_workers)]

for future in concurrent.futures.as_completed(futures):
stdout, return_code = future.result()

pending_re = r"#Pending configs with worker:\s+(\d+)"
eval_re = r"#Evaluated configs:\s+(\d+)"
exit_codes.append(return_code)
lines = stdout.splitlines()

evaluated = first_true(re.match(eval_re, l) for l in lines) # noqa
pending = first_true(re.match(pending_re, l) for l in lines) # noqa
pending_re = r"#Pending configs with worker:\s+(\d+)"
eval_re = r"#Evaluated configs:\s+(\d+)"

assert evaluated is not None
assert pending is not None
evaluated = first_true(re.match(eval_re, l) for l in lines) # noqa
pending = first_true(re.match(pending_re, l) for l in lines) # noqa

evaluated_configs = int(evaluated.groups()[0])
pending_configs = int(pending.groups()[0])
assert evaluated is not None
assert pending is not None

# Make sure the evaluated configs and the ones pending add up to 15
assert evaluated_configs + pending_configs == 15
evaluated_configs = int(evaluated.groups()[0])
pending_configs = int(pending.groups()[0])

# Make sure the evaluated configs and the ones pending add up to 15
assert evaluated_configs + pending_configs == 15

# Make sure all processes don't fail
assert np.array_equal(exit_codes, np.zeros(n_workers))

# Make sure there are 15 completed configurations
expected = sorted(f"config_{i}" for i in range(1, 16))
@@ -74,22 +79,42 @@


@pytest.mark.summary_csv
def test_summary_csv():
def test_summary_csv(n_workers=2):
# Testing the csv files output.
summary_dir = Path("results") / "hyperparameters_example" / "summary_csv"
try:
if not summary_dir.exists():
p_list = launch_example_processes(n_workers=2)
for p in p_list:
p.wait()
assert not summary_dir.exists()
exit_codes = []
with concurrent.futures.ProcessPoolExecutor(max_workers=n_workers) as executor:
# Submit each worker function to the concurrent executor
futures = [executor.submit(run_worker) for _ in range(n_workers)]

# Wait for all workers to complete
done, _ = concurrent.futures.wait(
futures, return_when=concurrent.futures.ALL_COMPLETED
)

for future in done:
_, return_code = future.result()
exit_codes.append(return_code)

# Make sure all processes don't fail
assert np.array_equal(exit_codes, np.zeros(n_workers))

# Make sure the directory is created
assert summary_dir.is_dir()

run_data_df = pd.read_csv(summary_dir / "run_status.csv")
run_data_df.set_index("description", inplace=True)
num_evaluated_configs_csv = run_data_df.loc["num_evaluated_configs", "value"]
# Make sure all configs are evaluated (expected)
assert num_evaluated_configs_csv == 15

config_data_df = pd.read_csv(summary_dir / "config_data.csv")
# Make sure the total number of rows in our csv is equal to evaluated configs
assert config_data_df.shape[0] == 15

# Make sure that the status of all config is complete, hence all are evaluated
assert (config_data_df["status"] == "complete").all()
except Exception as e:
raise e
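The shape of the new tests, reduced to a self-contained sketch: instead of collecting `subprocess.Popen` handles and calling `wait()`/`communicate()` on each, the workers are submitted to a `concurrent.futures.ProcessPoolExecutor`, and each returns its stdout and return code for the test to assert on. The `echo` command below is a placeholder for the real `neps_examples` invocation used in the tests.

from __future__ import annotations

import concurrent.futures
import subprocess


def run_worker() -> tuple[str, int]:
    # Placeholder for the real command:
    # "python -m neps_examples.basic_usage.hyperparameters && python -m neps_examples.basic_usage.analyse"
    command = "echo '#Evaluated configs: 15'"
    result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, text=True)
    return result.stdout, result.returncode


def run_all(n_workers: int = 2) -> list[int]:
    exit_codes = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(run_worker) for _ in range(n_workers)]
        for future in concurrent.futures.as_completed(futures):
            stdout, return_code = future.result()
            exit_codes.append(return_code)
            print(stdout.strip())
    return exit_codes


if __name__ == "__main__":
    # All workers are expected to exit cleanly, mirroring the exit-code
    # assertion in the tests.
    assert all(code == 0 for code in run_all())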