From 015e12b6859a5b6adf60d5848e2aabf7477060a6 Mon Sep 17 00:00:00 2001 From: Danilo Lessa Bernardineli Date: Thu, 14 Dec 2023 18:51:30 -0300 Subject: [PATCH] add support for single proc runs --- cadCAD/engine/__init__.py | 77 ++++++++++++------- cadCAD/types.py | 13 +++- ...ingle_proc_param_sweep.py => test_runs.py} | 24 ++++-- 3 files changed, 81 insertions(+), 33 deletions(-) rename testing/{test_single_proc_param_sweep.py => test_runs.py} (83%) diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 57d84bc7..c154e72f 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -7,6 +7,7 @@ from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts from cadCAD.engine.simulation import Executor as SimExecutor from cadCAD.engine.execution import single_proc_exec, parallelize_simulations, local_simulations +from cadCAD.types import * VarDictType = Dict[str, List[Any]] StatesListsType = List[Dict[str, Any]] @@ -24,6 +25,17 @@ class ExecutionMode: multi_proc = 'multi_proc' +def auto_mode_switcher(config_amt: int): + try: + if config_amt == 1: + return ExecutionMode.single_mode, single_proc_exec + elif (config_amt > 1): + return ExecutionMode.multi_mode, parallelize_simulations + except AttributeError: + if config_amt < 1: + raise ValueError('N must be >= 1!') + + class ExecutionContext: def __init__(self, context=ExecutionMode.local_mode, method=None, additional_objs=None) -> None: self.name = context @@ -39,7 +51,7 @@ def distroduce_proc( ExpIDs, SubsetIDs, SubsetWindows, - configured_n, # exec_method, + configured_n, # exec_method, sc, additional_objs=additional_objs ): return method( @@ -47,7 +59,7 @@ def distroduce_proc( ExpIDs, SubsetIDs, SubsetWindows, - configured_n, # exec_method, + configured_n, # exec_method, sc, additional_objs ) @@ -56,8 +68,8 @@ def distroduce_proc( class Executor: def __init__(self, - exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False - ) -> None: + exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False + ) -> None: self.sc = sc self.SimExecutor = SimExecutor self.exec_method = exec_context.method @@ -70,7 +82,8 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]: return [], [], [] config_proc = Processor() - create_tensor_field = TensorFieldReport(config_proc).create_tensor_field + create_tensor_field = TensorFieldReport( + config_proc).create_tensor_field sessions = [] var_dict_list, states_lists = [], [] @@ -105,18 +118,30 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]: var_dict_list.append(x.sim_config['M']) states_lists.append([x.initial_state]) eps.append(list(x.exogenous_states.values())) - configs_structs.append(config_proc.generate_config(x.initial_state, x.partial_state_update_blocks, eps[config_idx])) + configs_structs.append(config_proc.generate_config( + x.initial_state, x.partial_state_update_blocks, eps[config_idx])) env_processes_list.append(x.env_processes) partial_state_updates.append(x.partial_state_update_blocks) sim_executors.append(SimExecutor(x.policy_ops).simulation) config_idx += 1 - def get_final_dist_results(simulations, psus, eps, sessions): - tensor_fields = [create_tensor_field(psu, ep) for psu, ep in list(zip(psus, eps))] + remote_threshold = 100 + config_amt = len(self.configs) + + def get_final_dist_results(simulations: List[StateHistory], + psus: List[StateUpdateBlocks], + eps, + sessions: List[SessionDict]): + tensor_fields = [create_tensor_field( + psu, ep) for psu, ep in list(zip(psus, eps))] return simulations, tensor_fields, sessions - def get_final_results(simulations, psus, eps, sessions, remote_threshold): + def get_final_results(simulations: List[StateHistory], + psus: List[StateUpdateBlocks], + eps, + sessions: List[SessionDict], + remote_threshold: int): flat_timesteps, tensor_fields = [], [] for sim_result, psu, ep in list(zip(simulations, psus, eps)): flat_timesteps.append(flatten(sim_result)) @@ -128,25 +153,23 @@ def get_final_results(simulations, psus, eps, sessions, remote_threshold): elif config_amt > 1: return flat_simulations, tensor_fields, sessions - remote_threshold = 100 - config_amt = len(self.configs) - - def auto_mode_switcher(config_amt): - try: - if config_amt == 1: - return ExecutionMode.single_mode, single_proc_exec - elif (config_amt > 1): - return ExecutionMode.multi_mode, parallelize_simulations - except AttributeError: - if config_amt < 1: - raise ValueError('N must be >= 1!') - final_result = None original_N = len(configs_as_dicts(self.configs)) if self.exec_context != ExecutionMode.distributed: # Consider Legacy Support - if self.exec_context != ExecutionMode.local_mode: - self.exec_context, self.exec_method = auto_mode_switcher(config_amt) + if self.exec_context == ExecutionMode.local_mode: + self.exec_context, self.exec_method = auto_mode_switcher( + config_amt) + elif self.exec_context == ExecutionMode.single_mode or self.exec_context == ExecutionMode.single_proc: + self.exec_context, self.exec_method = ExecutionMode.single_mode, single_proc_exec + elif self.exec_context == ExecutionMode.multi_mode or self.exec_context == ExecutionMode.multi_proc: + if config_amt == 1: + raise ValueError("Multi mode must have at least 2 configs") + else: + self.exec_context, self.exec_method = ExecutionMode.multi_mode, parallelize_simulations + else: + raise ValueError("Invalid execution mode specified") + print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( @@ -154,14 +177,16 @@ def auto_mode_switcher(config_amt): ExpIDs, SubsetIDs, SubsetWindows, original_N ) - final_result = get_final_results(simulations_results, partial_state_updates, eps, sessions, remote_threshold) + final_result = get_final_results( + simulations_results, partial_state_updates, eps, sessions, remote_threshold) elif self.exec_context == ExecutionMode.distributed: print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.sc ) - final_result = get_final_dist_results(simulations_results, partial_state_updates, eps, sessions) + final_result = get_final_dist_results( + simulations_results, partial_state_updates, eps, sessions) t2 = time() print(f"Total execution time: {t2 - t1 :.2f}s") diff --git a/cadCAD/types.py b/cadCAD/types.py index 382b62fa..78726b5a 100644 --- a/cadCAD/types.py +++ b/cadCAD/types.py @@ -1,4 +1,5 @@ from typing import TypedDict, Callable, Union, Dict, List, Tuple, Iterator +from collections import deque State = Dict[str, object] Parameters = Dict[str, object] @@ -34,4 +35,14 @@ class ConfigurationDict(TypedDict): ExecutorFunction = Callable[[Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs], object] -ExecutionParameter = Tuple[ExecutorFunction, Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs] \ No newline at end of file +ExecutionParameter = Tuple[ExecutorFunction, Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs] + + +class SessionDict(TypedDict): + user_id: str + experiment_id: int + session_id: str + simulation_id: int + run_id: int + subset_id: int + subset_window: deque \ No newline at end of file diff --git a/testing/test_single_proc_param_sweep.py b/testing/test_runs.py similarity index 83% rename from testing/test_single_proc_param_sweep.py rename to testing/test_runs.py index b850355a..2c33c2b4 100644 --- a/testing/test_single_proc_param_sweep.py +++ b/testing/test_runs.py @@ -6,6 +6,7 @@ import pandas as pd # type: ignore import types import inspect +import pytest def describe_or_return(v: object) -> object: """ @@ -163,21 +164,32 @@ def sweeped(params: Parameters, substep: Substep, history: StateHistory, state: def test_mc_sweep_experiment(): - experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS)) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.local_mode) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.single_mode) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.multi_mode) def test_unique_sweep_experiment(): - experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS)) + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.local_mode) + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.single_mode) + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.multi_mode) def test_mc_single_experiment(): - experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS)) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.local_mode) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.single_mode) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.multi_mode) def test_unique_single_experiment(): - experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS)) + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.local_mode) + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.single_mode) + with pytest.raises(ValueError) as e_info: + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.multi_mode) -def experiment_assertions(exp): - exec_context = ExecutionContext(ExecutionMode().single_mode) +def experiment_assertions(exp, mode=None): + if mode == None: + mode = ExecutionMode().local_mode + exec_context = ExecutionContext(mode) executor = Executor(exec_context=exec_context, configs=exp.configs) (records, tensor_field, _) = executor.execute() df = drop_substeps(assign_params(pd.DataFrame(records), exp.configs))