Skip to content

Commit

Permalink
add support for single proc runs
Browse files Browse the repository at this point in the history
  • Loading branch information
danlessa committed Dec 14, 2023
1 parent f5bae49 commit 015e12b
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 33 deletions.
77 changes: 51 additions & 26 deletions cadCAD/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts
from cadCAD.engine.simulation import Executor as SimExecutor
from cadCAD.engine.execution import single_proc_exec, parallelize_simulations, local_simulations
from cadCAD.types import *

VarDictType = Dict[str, List[Any]]
StatesListsType = List[Dict[str, Any]]
Expand All @@ -24,6 +25,17 @@ class ExecutionMode:
multi_proc = 'multi_proc'


def auto_mode_switcher(config_amt: int):
try:
if config_amt == 1:
return ExecutionMode.single_mode, single_proc_exec
elif (config_amt > 1):
return ExecutionMode.multi_mode, parallelize_simulations
except AttributeError:
if config_amt < 1:
raise ValueError('N must be >= 1!')


class ExecutionContext:
def __init__(self, context=ExecutionMode.local_mode, method=None, additional_objs=None) -> None:
self.name = context
Expand All @@ -39,15 +51,15 @@ def distroduce_proc(
ExpIDs,
SubsetIDs,
SubsetWindows,
configured_n, # exec_method,
configured_n, # exec_method,
sc, additional_objs=additional_objs
):
return method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs,
SubsetIDs,
SubsetWindows,
configured_n, # exec_method,
configured_n, # exec_method,
sc, additional_objs
)

Expand All @@ -56,8 +68,8 @@ def distroduce_proc(

class Executor:
def __init__(self,
exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False
) -> None:
exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False
) -> None:
self.sc = sc
self.SimExecutor = SimExecutor
self.exec_method = exec_context.method
Expand All @@ -70,7 +82,8 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]:
return [], [], []

config_proc = Processor()
create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
create_tensor_field = TensorFieldReport(
config_proc).create_tensor_field

sessions = []
var_dict_list, states_lists = [], []
Expand Down Expand Up @@ -105,18 +118,30 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]:
var_dict_list.append(x.sim_config['M'])
states_lists.append([x.initial_state])
eps.append(list(x.exogenous_states.values()))
configs_structs.append(config_proc.generate_config(x.initial_state, x.partial_state_update_blocks, eps[config_idx]))
configs_structs.append(config_proc.generate_config(
x.initial_state, x.partial_state_update_blocks, eps[config_idx]))
env_processes_list.append(x.env_processes)
partial_state_updates.append(x.partial_state_update_blocks)
sim_executors.append(SimExecutor(x.policy_ops).simulation)

config_idx += 1

def get_final_dist_results(simulations, psus, eps, sessions):
tensor_fields = [create_tensor_field(psu, ep) for psu, ep in list(zip(psus, eps))]
remote_threshold = 100
config_amt = len(self.configs)

def get_final_dist_results(simulations: List[StateHistory],
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict]):
tensor_fields = [create_tensor_field(
psu, ep) for psu, ep in list(zip(psus, eps))]
return simulations, tensor_fields, sessions

def get_final_results(simulations, psus, eps, sessions, remote_threshold):
def get_final_results(simulations: List[StateHistory],
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict],
remote_threshold: int):
flat_timesteps, tensor_fields = [], []
for sim_result, psu, ep in list(zip(simulations, psus, eps)):
flat_timesteps.append(flatten(sim_result))
Expand All @@ -128,40 +153,40 @@ def get_final_results(simulations, psus, eps, sessions, remote_threshold):
elif config_amt > 1:
return flat_simulations, tensor_fields, sessions

remote_threshold = 100
config_amt = len(self.configs)

def auto_mode_switcher(config_amt):
try:
if config_amt == 1:
return ExecutionMode.single_mode, single_proc_exec
elif (config_amt > 1):
return ExecutionMode.multi_mode, parallelize_simulations
except AttributeError:
if config_amt < 1:
raise ValueError('N must be >= 1!')

final_result = None
original_N = len(configs_as_dicts(self.configs))
if self.exec_context != ExecutionMode.distributed:
# Consider Legacy Support
if self.exec_context != ExecutionMode.local_mode:
self.exec_context, self.exec_method = auto_mode_switcher(config_amt)
if self.exec_context == ExecutionMode.local_mode:
self.exec_context, self.exec_method = auto_mode_switcher(
config_amt)
elif self.exec_context == ExecutionMode.single_mode or self.exec_context == ExecutionMode.single_proc:
self.exec_context, self.exec_method = ExecutionMode.single_mode, single_proc_exec
elif self.exec_context == ExecutionMode.multi_mode or self.exec_context == ExecutionMode.multi_proc:
if config_amt == 1:
raise ValueError("Multi mode must have at least 2 configs")
else:
self.exec_context, self.exec_method = ExecutionMode.multi_mode, parallelize_simulations
else:
raise ValueError("Invalid execution mode specified")


print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs, SubsetIDs, SubsetWindows, original_N
)

final_result = get_final_results(simulations_results, partial_state_updates, eps, sessions, remote_threshold)
final_result = get_final_results(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)
elif self.exec_context == ExecutionMode.distributed:
print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts,
SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.sc
)
final_result = get_final_dist_results(simulations_results, partial_state_updates, eps, sessions)
final_result = get_final_dist_results(
simulations_results, partial_state_updates, eps, sessions)

t2 = time()
print(f"Total execution time: {t2 - t1 :.2f}s")
Expand Down
13 changes: 12 additions & 1 deletion cadCAD/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import TypedDict, Callable, Union, Dict, List, Tuple, Iterator
from collections import deque

State = Dict[str, object]
Parameters = Dict[str, object]
Expand Down Expand Up @@ -34,4 +35,14 @@ class ConfigurationDict(TypedDict):


ExecutorFunction = Callable[[Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs], object]
ExecutionParameter = Tuple[ExecutorFunction, Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs]
ExecutionParameter = Tuple[ExecutorFunction, Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs]


class SessionDict(TypedDict):
user_id: str
experiment_id: int
session_id: str
simulation_id: int
run_id: int
subset_id: int
subset_window: deque
24 changes: 18 additions & 6 deletions testing/test_single_proc_param_sweep.py → testing/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pandas as pd # type: ignore
import types
import inspect
import pytest

def describe_or_return(v: object) -> object:
"""
Expand Down Expand Up @@ -163,21 +164,32 @@ def sweeped(params: Parameters, substep: Substep, history: StateHistory, state:


def test_mc_sweep_experiment():
experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS))
experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.local_mode)
experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.single_mode)
experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.multi_mode)

def test_unique_sweep_experiment():
experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS))
experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.local_mode)
experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.single_mode)
experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.multi_mode)

def test_mc_single_experiment():
experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS))
experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.local_mode)
experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.single_mode)
experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.multi_mode)

def test_unique_single_experiment():
experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS))
experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.local_mode)
experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.single_mode)
with pytest.raises(ValueError) as e_info:
experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.multi_mode)



def experiment_assertions(exp):
exec_context = ExecutionContext(ExecutionMode().single_mode)
def experiment_assertions(exp, mode=None):
if mode == None:
mode = ExecutionMode().local_mode
exec_context = ExecutionContext(mode)
executor = Executor(exec_context=exec_context, configs=exp.configs)
(records, tensor_field, _) = executor.execute()
df = drop_substeps(assign_params(pd.DataFrame(records), exp.configs))
Expand Down

0 comments on commit 015e12b

Please sign in to comment.