From 8bc03445c3009ccbb2cc5648d2cfb823527de869 Mon Sep 17 00:00:00 2001 From: Ed Page Date: Fri, 4 Aug 2017 13:00:15 -0500 Subject: [PATCH] docs(example): Signal XY example --- nixnet_examples/can_signal_xy_io.py | 112 ++++++++++++++++++++++++++++ tests/test_examples.py | 18 +++++ 2 files changed, 130 insertions(+) create mode 100644 nixnet_examples/can_signal_xy_io.py diff --git a/nixnet_examples/can_signal_xy_io.py b/nixnet_examples/can_signal_xy_io.py new file mode 100644 index 0000000..f574ff0 --- /dev/null +++ b/nixnet_examples/can_signal_xy_io.py @@ -0,0 +1,112 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import datetime +import pprint +import six +import sys +import time + +import nixnet +from nixnet import constants + + +pp = pprint.PrettyPrinter(indent=4) + + +def convert_timestamp(timestamp): + system_epoch = time.gmtime(0) + system_epock_datetime = datetime.datetime(system_epoch.tm_year, system_epoch.tm_mon, system_epoch.tm_mday) + xnet_epoch_datetime = datetime.datetime(1601, 1, 1) + delta = system_epock_datetime - xnet_epoch_datetime + date = datetime.datetime.fromtimestamp(timestamp * 100e-9) - delta + return date + + +def convert_datetime(date): + system_epoch = time.gmtime(0) + system_epock_datetime = datetime.datetime(system_epoch.tm_year, system_epoch.tm_mon, system_epoch.tm_mday) + xnet_epoch_datetime = datetime.datetime(1601, 1, 1) + delta = system_epock_datetime - xnet_epoch_datetime + timestamp = (date + delta - datetime.datetime(1970, 1, 1)) / datetime.timedelta(seconds=1) * 100e9 + return int(timestamp) + + +def main(): + database_name = 'NIXNET_example' + cluster_name = 'CAN_Cluster' + input_signals = ['CANEventSignal1', 'CANEventSignal2'] + output_signals = ['CANEventSignal1', 'CANEventSignal2'] + interface1 = 'CAN1' + interface2 = 'CAN2' + + with nixnet.SignalInXYSession( + interface1, + database_name, + cluster_name, + input_signals) as input_session: + with nixnet.SignalOutXYSession( + interface2, + database_name, + cluster_name, + output_signals) as output_session: + terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ') + if terminated_cable.lower() == "y": + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.OFF + elif terminated_cable.lower() == "n": + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.ON + else: + print("Unrecognised input ({}), assuming 'n'".format(terminated_cable)) + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.ON + + # Start the input session manually to make sure that the first + # signal value sent before the initial read will be received. + input_session.start() + + out_waveforms = [] + for out_signal in output_signals: + user_value = six.moves.input('Enter "{}" signal values [float, ...]: '.format(out_signal)) + try: + out_waveforms.append([float(x.strip()) for x in user_value.split(",")]) + except ValueError: + out_waveforms.append([24.5343, 77.0129]) + print('Unrecognized input ({}). Setting waveform to {}'.format(user_value, out_waveforms[-1])) + + print('The same values should be received. Press q to quit') + i = 0 + while True: + for waveform_index, waveform in enumerate(out_waveforms): + for value_index, value in enumerate(waveform): + out_waveforms[waveform_index][value_index] = value + i + output_session.signals.write(out_waveforms) + print('Sent signal values: {}'.format(pp.pformat(out_waveforms))) + + # Wait 5 s and then read the received values. + # They should be the same as the ones sent. + time_limit = convert_datetime(datetime.datetime.now() + datetime.timedelta(seconds=5)) + + signals = input_session.signals.read(len(out_waveforms[0]), time_limit) + print('Received signals:') + for signal_values in signals: + signal_values = [(value, convert_timestamp(timestamp)) for (value, timestamp) in signal_values] + print(' {}', signal_values) + + i += 1 + for waveform in out_waveforms: + if max(waveform) + i > sys.float_info.max: + i = 0 + break + + inp = six.moves.input('Hit enter to continue (q to quit): ') + if inp.lower() == 'q': + break + + print('Data acquisition stopped.') + + +if __name__ == '__main__': + main() diff --git a/tests/test_examples.py b/tests/test_examples.py index 6fe9400..b8ed819 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -13,6 +13,7 @@ from nixnet_examples import can_frame_stream_io from nixnet_examples import can_signal_conversion from nixnet_examples import can_signal_single_point_io +from nixnet_examples import can_signal_xy_io MockXnetLibrary = mock.create_autospec(_cfuncs.XnetLibrary, spec_set=True, instance=True) @@ -24,6 +25,8 @@ MockXnetLibrary.nx_read_frame.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_write_signal_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_read_signal_single_point.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_write_signal_xy.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_read_signal_xy.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_convert_frames_to_signals_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_convert_signals_to_frames_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_stop.return_value = _ctypedefs.u32(0) @@ -84,6 +87,21 @@ def test_can_signal_single_point_empty_session(input_values): can_signal_single_point_io.main() +@pytest.mark.parametrize("input_values", [ + ['y', '1, 2', '3, 4', 'q'], + ['n', '1, 2', '3, 4', 'q'], + ['invalid', '1, 2', '3, 4', 'q'], + ['y', '1', '3, 4', 'q'], + ['y', 'invalid', '3, 4', 'q'], + ['y', 'invalid', '3, 4'] + 0x100 * [''] + ['q'], +]) +@mock.patch('nixnet._cfuncs.lib', MockXnetLibrary) +@mock.patch('time.sleep', lambda time: None) +def test_can_signal_xy_empty_session(input_values): + with mock.patch('six.moves.input', six_input(input_values)): + can_signal_xy_io.main() + + @pytest.mark.parametrize("input_values", [ ['1, 2'], ['1'],