From 51228a70c4b4c6dcc29f22ead12a0dcffc19e03b Mon Sep 17 00:00:00 2001 From: Robin Andersson Date: Tue, 12 Nov 2024 10:59:44 -0500 Subject: [PATCH] improved code and added test for next start index --- src/common/io.py | 24 +++++++---- tests/test_io.py | 103 ++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 114 insertions(+), 13 deletions(-) diff --git a/src/common/io.py b/src/common/io.py index d5966382..7b20c208 100644 --- a/src/common/io.py +++ b/src/common/io.py @@ -1630,14 +1630,24 @@ def get_variables_data(self, for name in names: trajectories[name] = self._get_variable_data_as_trajectory(name, time, start_index, stop_index) - largest_trajectory_length = -1 - for v, t in trajectories.items(): - largest_trajectory_length = max(largest_trajectory_length, len(t.x)) + largest_trajectory_length = self._find_max_trajectory_length(trajectories) new_start_index = (start_index + largest_trajectory_length) if trajectories else start_index self._last_set_of_indices = (start_index, stop_index) # update them before we exit return trajectories, new_start_index + def _find_max_trajectory_length(self, trajectories): + """ + Given a dict of trajectories, find the length of the largest trajectory + among the set of continuous variables. We disregard parameters/constants since they are not stored + with the same amount of data points as trajectories for continuous variables. + """ + length = 0 + for var_name, trajectory in trajectories.items(): + if self.is_variable(var_name): # since we only consider continuous variables + length = max(length, len(trajectory.x)) + return length + def _calculate_events_and_steps(self, name): if name in self._data_3: return self._data_3[name] @@ -1712,15 +1722,13 @@ def is_variable(self, name): return True elif '{}.'.format(DiagnosticsBase.calculated_diagnostics['nbr_state_limits_step']['name']) in name: return True + variable_index = self.get_variable_index(name) data_mat = self._dataInfo[0][variable_index] - if data_mat<1: + if data_mat < 1: data_mat = 1 - if data_mat == 1: - return False - else: - return True + return data_mat != 1 def is_negated(self, name): """ diff --git a/tests/test_io.py b/tests/test_io.py index 746117e0..c9e270ef 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -24,11 +24,30 @@ from collections import OrderedDict from pyfmi import testattr -from pyfmi.fmi import FMUException, FMUModelME2 -from pyfmi.common.io import (ResultHandler, ResultDymolaTextual, ResultDymolaBinary, JIOError, ResultSizeError, - ResultHandlerCSV, ResultCSVTextual, ResultHandlerBinaryFile, ResultHandlerFile) -from pyfmi.common.io import get_result_handler -from pyfmi.common.diagnostics import DIAGNOSTICS_PREFIX, setup_diagnostics_variables +from pyfmi.fmi import ( + FMUException, + FMUModelME2, + FMI2_PARAMETER, + FMI2_CONSTANT, + FMI2_LOCAL +) +from pyfmi.common.io import ( + ResultHandler, + ResultDymolaTextual, + ResultDymolaBinary, + JIOError, + ResultSizeError, + ResultHandlerCSV, + ResultCSVTextual, + ResultHandlerBinaryFile, + ResultHandlerFile, + Trajectory, + get_result_handler +) +from pyfmi.common.diagnostics import ( + DIAGNOSTICS_PREFIX, + setup_diagnostics_variables +) import pyfmi.fmi as fmi from pyfmi.tests.test_util import Dummy_FMUModelME1, Dummy_FMUModelME2, Dummy_FMUModelCS2 @@ -1695,6 +1714,80 @@ def test_csv_options_cs2(self): class TestResultDymolaBinary: + def test_next_start_index(self): + """ + Test that calculation of the next start index works as expected. + + This test sets up a dummy FMU and dummy trajectories since we need + trajectories of uneven lengths. + + """ + # Begin by setting up minimal required environment in order to perform the test + fmu = Dummy_FMUModelME2([], os.path.join(file_path, "files", "FMUs", "XML", "ME2.0", "CoupledClutches.fmu"), + _connect_dll=False) + + result_handler = ResultHandlerBinaryFile(fmu) + + opts = fmu.simulate_options() + opts["result_handling"] = "binary" + opts["result_handler"] = result_handler + + fmu.setup_experiment() + fmu.initialize() + opts["initialize"] = False + + result_handler.set_options(opts) # required in order to call simulation_start() + result_handler.initialize_complete() + result_handler.simulation_start() + + fmu.set('J4.phi', 1) # arbitrary + result_handler.integration_point() + rdb = ResultDymolaBinary(fmu.get_last_result_file(), allow_file_updates=True) + + # Actual test starts below + vars_to_test = [ + 'J1.J', # this is a parameter + 'clutch1.Backward' # this is a constant + ] + + # if this is not True, then the rest of test does not hold + assert vars_to_test[0] in result_handler.model.get_model_variables(causality = FMI2_PARAMETER).keys() + assert vars_to_test[1] in result_handler.model.get_model_variables(variability = FMI2_CONSTANT).keys() + assert 'J4.phi' in result_handler.model.states.keys() + + + for v in vars_to_test: + trajectories1 = { + 'J4.phi': Trajectory(np.array([]), np.array([])), + v: Trajectory(np.array([0]), np.array([1])) + } + + trajectories2 = { + 'J4.phi': Trajectory(np.array([0]), np.array([1])), + v: Trajectory(np.array([0, 1]), np.array([1, 1])) + } + + trajectories3 = { + 'J4.phi': Trajectory(np.array([0]), np.array([1])), + v: Trajectory(np.array([0]), np.array([1])) + } + + trajectories4 = { + 'J4.phi': Trajectory(np.array([0, 1]), np.array([1, 1])), + v: Trajectory(np.array([0]), np.array([1])) + } + + trajectories5 = { + 'J4.phi': Trajectory(np.array([0, 1, 2]), np.array([1, 1, 1])), + v: Trajectory(np.array([0]), np.array([1])) + } + + assert rdb._find_max_trajectory_length(trajectories1) == 0 + assert rdb._find_max_trajectory_length(trajectories2) == 1 + assert rdb._find_max_trajectory_length(trajectories3) == 1 + assert rdb._find_max_trajectory_length(trajectories4) == 2 + assert rdb._find_max_trajectory_length(trajectories5) == 3 + def _test_get_variables_data(self, dynamic_diagnostics: bool, nbr_of_calls: int, diag_data_ratio: int, vars_to_test: list, stop_index_function: callable, result_file_name: str) -> dict: """