refactored perform_single_analysis and fixed documentation
fxjung committed Feb 9, 2024
1 parent 3ad80ae commit 61bd951
Showing 3 changed files with 135 additions and 75 deletions.
45 changes: 20 additions & 25 deletions doc/extras.rst
@@ -1,12 +1,7 @@
Module ``extras``
=================

This package contains additional functionality that is not a core part of the simulator
but rather makes it more convenient to set up and use it. Currently this involves code
for performing multiple simulations while varying parameters ("parameter scan"), reading
and writing parameter sets and simulation output to and from disk as JSON/`JSON Lines`_.
In addition, convenience methods for easily creating graph transport spaces are
included.
This package contains additional functionality that is not a core part of the simulator but rather makes it more convenient to set up and use it. Currently this involves code for performing multiple simulations while varying parameters ("parameter scan"), reading and writing parameter sets and simulation output to and from disk as JSON/`JSON Lines`_. In addition, convenience methods for easily creating graph transport spaces are included.


Simulations and Parameter Scans
@@ -69,31 +64,31 @@ not specified through arguments. The order of precedence is, last taking highest
``default_base_params``, ``base_params``, ``zip_params``, ``product_params``.
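
As an illustration of this precedence, a minimal sketch follows; the concrete parameter names and the nested dictionary layout are assumptions made for the example, not taken from this diff:

.. code-block:: python

   from ridepy.extras.simulation_set import SimulationSet

   # Values supplied later in the precedence chain win: product_params override
   # zip_params, which override base_params, which override default_base_params.
   simulation_set = SimulationSet(
       data_dir="sim_data",
       base_params={"general": {"n_reqs": 100}},
       zip_params={"general": {"n_vehicles": [10, 20], "seat_capacity": [4, 8]}},
       product_params={"general": {"n_reqs": [100, 200]}},
   )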


Executing Simulations
Executing simulations
~~~~~~~~~~~~~~~~~~~~~

Simulations are executed when `SimulationSet.run()` is called. Independent simulations
are performed through executing `.perform_single_simulation()` for each parameter set
using multiprocessing. The events that are generated by the simulation are written to
disk in the `JSON Lines`_ format. The simulation parameters are also written to disk, in
separate JSON files. This includes all data necessary to perform the respective
simulation. For more detail, see :ref:`JSON IO`. For each simulation run, a unique
identifier is generated and the data is stored to ``<uuid>.jsonl`` for the events and
``<uuid>_params.json`` for the simulation parameters. The identifier hashes the
parameter set, thereby allowing to continue an interrupted simulation set run later.
The IDs generated can be retrieved using `SimulationSet.simulation_ids`. Alternatively the
filenames of the resulting JSONL/JSON files are also directly available through
`SimulationSet.param_paths` and `SimulationSet.result_paths`.
Simulations are executed when `SimulationSet.run()` is called. Independent simulations are performed by executing `.perform_single_simulation()` for each parameter set using multiprocessing. The events generated by the simulation are written to disk in the `JSON Lines`_ format. The simulation parameters are also written to disk, in separate JSON files; these include all data necessary to perform the respective simulation. For more detail, see :ref:`JSON IO`. For each simulation run, a unique identifier is generated and the data is stored to ``<uuid>.jsonl`` for the events and ``<uuid>_params.json`` for the simulation parameters. The identifier is a hash of the parameter set, which makes it possible to continue an interrupted simulation set run later. The generated IDs can be retrieved using `SimulationSet.simulation_ids`. Alternatively, the filenames of the resulting JSONL/JSON files are directly available through `SimulationSet.param_paths` and `SimulationSet.event_paths`.
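
A short usage sketch of this workflow, assuming ``simulation_set`` has been configured as above (the exact constructor arguments are not shown in this diff):

.. code-block:: python

   # Execute all parameter combinations; each run writes <uuid>.jsonl (events)
   # and <uuid>_params.json (parameters) into the data directory.
   simulation_set.run()

   # Hash-based identifiers and the corresponding on-disk files.
   ids = simulation_set.simulation_ids
   param_files = simulation_set.param_paths
   event_files = simulation_set.event_paths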


.. automodule:: ridepy.extras.simulation_set
:members: SimulationSet
.. autoclass:: SimulationSet

.. automodule:: ridepy.extras.simulation_set
:members: perform_single_simulation
.. automethod:: run

.. automodule:: ridepy.extras.simulation_set
:members: simulate_parameter_combinations
.. autoattribute:: simulation_ids

.. autoattribute:: param_paths

.. autoattribute:: event_paths

.. autofunction:: perform_single_simulation


Running analysis on the simulation results
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The simulation results can be automatically analyzed using the `ridepy.extras.analytics` module, storing the results to disk.

.. automethod:: SimulationSet.run_analytics
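
A hedged usage sketch of the analytics step, based on the parameters documented in the diff below (argument values are illustrative):

.. code-block:: python

   # Compute stops, requests, vehicle quantities, and system quantities for all
   # simulations in the set, storing parquet files alongside the event files.
   simulation_set.run_analytics()

   # Recompute selected outputs even if they already exist on disk.
   simulation_set.run_analytics(update_existing=["stops", "requests"])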

JSON IO
-------
3 changes: 0 additions & 3 deletions src/ridepy/cli.py
@@ -134,9 +134,6 @@ def analyze(
_, sys_quant = perform_single_analysis(
sim_id=simulation_id,
data_dir=output_directory,
update_existing=False,
compute_system_quantities=compute_system_quantities,
compute_vehicle_quantities=True,
)
system_quantities.append(sys_quant)
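
For comparison, a sketch of how the trimmed call relates to the refactored interface of ``perform_single_analysis``; the call below simply spells out the new defaults and is illustrative only, not part of the committed ``cli.py``:

_, sys_quant = perform_single_analysis(
    sim_id=simulation_id,
    data_dir=output_directory,
    # Spelling out the new defaults: compute every output that is not yet on disk ...
    tasks_if_not_existent={"stops", "requests", "vehicle_quantities", "system_quantities"},
    # ... and, as with the old update_existing=False, recompute nothing that already exists.
    tasks_if_existent=set(),
)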

162 changes: 115 additions & 47 deletions src/ridepy/extras/simulation_set.py
@@ -65,15 +65,56 @@ def get_params(directory: Path, sim_id: str, param_path_suffix: str = "_params.j
def perform_single_analysis(
sim_id: str,
data_dir: Path,
tasks_if_not_existent: Optional[Iterable[str]] = None,
tasks_if_existent: Optional[Iterable[str]] = None,
param_path_suffix: str = "_params.json",
event_path_suffix: str = ".jsonl",
update_existing: bool = False,
compute_system_quantities: bool = True,
compute_vehicle_quantities: bool = True,
stops_path_suffix: str = "_stops.pq",
requests_path_suffix: str = "_requests.pq",
vehicle_quantities_path_suffix: str = "_vehicle_quantities.pq",
) -> tuple[str, dict]:
) -> tuple[str, dict[str, Any]]:
"""
Compute stops, requests, vehicle quantities, and system quantities from
simulation events.
Parameters
----------
sim_id
Simulation ID
data_dir
Directory from which to read the events and to which to write the resulting
parquet output files
tasks_if_not_existent
Collection of tasks to perform if the respective output files do not exist.
Valid entries are 'stops', 'requests', 'vehicle_quantities', and
'system_quantities'. If None, everything is computed if not existent.
tasks_if_existent
Collection of tasks to perform even if the respective output files do exist,
i.e. tasks to recompute. Valid entries are 'stops', 'requests',
'vehicle_quantities', and 'system_quantities'. If None, nothing is recomputed.
param_path_suffix
Appending this suffix to the simulation ID yields the path to the JSON parameter
file.
event_path_suffix
Appending this suffix to the simulation ID yields the path to the JSONL
event file.
stops_path_suffix
Appending this suffix to the simulation ID yields the path to the parquet
stops file.
requests_path_suffix
Appending this suffix to the simulation ID yields the path to the parquet
requests file.
vehicle_quantities_path_suffix
        Appending this suffix to the simulation ID yields the path to the
        parquet vehicle quantities file.
Returns
-------
sim_id
Simulation ID
system_quantities
Dictionary containing the computed system quantities as entries
"""
stops_path = make_file_path(sim_id, data_dir, stops_path_suffix)
requests_path = make_file_path(sim_id, data_dir, requests_path_suffix)
vehicle_quantities_path = make_file_path(
@@ -82,16 +123,27 @@ def perform_single_analysis(

tasks = set()

if update_existing:
tasks |= {"stops", "requests", "vehicle_quantities", "system_quantities"}
if not stops_path.exists():
tasks_if_not_existent = tasks_if_not_existent or {
"stops",
"requests",
"vehicle_quantities",
"system_quantities",
}
tasks_if_existent = tasks_if_existent or set()

if "stops" in tasks_if_not_existent and not stops_path.exists():
tasks.add("stops")
if not requests_path.exists():
if "requests" in tasks_if_not_existent and not requests_path.exists():
tasks.add("requests")
if compute_system_quantities:
tasks.add("system_quantities")
if compute_vehicle_quantities and not vehicle_quantities_path.exists():
if (
"vehicle_quantities" in tasks_if_not_existent
and not vehicle_quantities_path.exists()
):
tasks.add("vehicle_quantities")
if "system_quantities" in tasks_if_not_existent:
tasks.add("system_quantities")

tasks |= tasks_if_existent

system_quantities = {}
if tasks:
@@ -726,28 +778,40 @@ def __len__(self) -> int:

def run_analytics(
self,
update_existing: bool = False,
only_stops_and_requests: bool = False,
update_existing: Union[bool, list[str]] = False,
check_for_changes: bool = True,
stops_path_suffix: str = "_stops.pq",
requests_path_suffix: str = "_requests.pq",
only_stops_and_requests: bool = False, # only compute stops and requests
vehicle_quantities_path_suffix: str = "_vehicle_quantities.pq",
system_quantities_filename: str = "system_quantities.pq",
) -> None:
"""
Compute analytics from simulation events and store them to disk
in parquet format.
Parameters
----------
only_stops_and_requests
Only compute stops and requests, not vehicle and system quantities.
update_existing
Recompute existing outputs
Recompute existing outputs. If a list is given, only recompute
the list entries. Valid list items are 'system_quantities',
'vehicle_quantities', 'stops', and 'requests'.
check_for_changes
If True, only update if simulation ids have changed.
If False, do update in any case.
If True, only update system quantities if simulation ids have changed.
If False, do update system quantities in any case.
stops_path_suffix
Appending this suffix to the simulation ID yields the path to the parquet
stops file.
requests_path_suffix
only_stops_and_requests
Appending this suffix to the simulation ID yields the path to the parquet
requests file.
vehicle_quantities_path_suffix
Appending this suffix to the simulation ID yields the path to the parquet
vehicle quantities file.
system_quantities_filename
Filename of the parquet file to store the system quantities in.
"""
self.system_quantities_path = self.data_dir / system_quantities_filename

@@ -756,33 +820,38 @@ def run_analytics(
"no simulations have been run (simulation_ids empty)", UserWarning
)
else:
if only_stops_and_requests:
compute_system_quantities = compute_vehicle_quantities = False

# In any case, stops and requests have to be computed if they don't exist
tasks_if_not_existent = {"stops", "requests"}

if not only_stops_and_requests:
                # Additionally, we now also compute vehicle and system quantities
# if they don't exist
tasks_if_not_existent |= {"vehicle_quantities", "system_quantities"}

if isinstance(update_existing, Iterable):
# If we have been handed a list of tasks to update, we only update these,
                # not recomputing tasks that should not have been computed in the first place
tasks_if_existent = set(update_existing) & tasks_if_not_existent
elif update_existing == True:
tasks_if_existent = tasks_if_not_existent
elif update_existing == False:
tasks_if_existent = set()
else:
compute_vehicle_quantities = True

if self.system_quantities_path.exists():
# file exists, should we recompute?
if update_existing:
# we update existing files. do we still recompute
# if there are no changes?
if check_for_changes:
# we check for changes and only recompute if there are some
sqdf = pd.read_parquet(self.system_quantities_path)
compute_system_quantities = set(sqdf.index) != set(
self.simulation_ids
)
del sqdf
else:
# we don't care whether there are changes or not,
# thus we recompute in any case
compute_system_quantities = True
else:
# file exists and we are not updating, thus not recomputing
compute_system_quantities = False
else:
# file doesn't exist, we have to compute
compute_system_quantities = True
raise ValueError(f"Got invalid value for {update_existing=}")

if self.system_quantities_path.exists():
tasks_if_not_existent -= {"system_quantities"}
if check_for_changes:
# Currently, we only check for changes in the sense that new
# simulations with new ids have been performed and are thus missing from
# the system quantities output. For vehicle quantities, stops, and requests,
# check_for_changes does not apply, as we compute these in any case, should
# they be missing.
sqdf = pd.read_parquet(self.system_quantities_path)
if set(sqdf.index) == set(self.simulation_ids):
tasks_if_existent -= {"system_quantities"}
del sqdf

with loky.get_reusable_executor(max_workers=self.max_workers) as executor:
sim_ids, system_quantities = zip(
@@ -791,9 +860,8 @@ def run_analytics(
ft.partial(
perform_single_analysis,
data_dir=self.data_dir,
update_existing=update_existing,
compute_system_quantities=compute_system_quantities,
compute_vehicle_quantities=compute_vehicle_quantities,
tasks_if_not_existent=tasks_if_not_existent,
tasks_if_existent=tasks_if_existent,
param_path_suffix=self._param_path_suffix,
event_path_suffix=self._event_path_suffix,
stops_path_suffix=stops_path_suffix,
@@ -812,7 +880,7 @@ def run_analytics(
UserWarning,
)

if compute_system_quantities:
if "system_quantities" in tasks_if_not_existent | tasks_if_existent:
system_quantities_df = pd.DataFrame(system_quantities, index=sim_ids)
system_quantities_df.rename_axis("simulation_id", inplace=True)
system_quantities_df.to_parquet(self.system_quantities_path)
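
Once the analytics have completed, the aggregated system quantities can be read back from the parquet file written above; a brief sketch (the available columns depend on the analytics module):

import pandas as pd

# One row per simulation, indexed by simulation_id, as written by run_analytics().
sqdf = pd.read_parquet(simulation_set.system_quantities_path)
print(sqdf.head())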
