Skip to content

Commit

Permalink
docs(example): Signal XY example
Browse files Browse the repository at this point in the history
  • Loading branch information
epage committed Aug 7, 2017
1 parent 51e394e commit 8bc0344
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
112 changes: 112 additions & 0 deletions nixnet_examples/can_signal_xy_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import datetime
import pprint
import six
import sys
import time

import nixnet
from nixnet import constants


pp = pprint.PrettyPrinter(indent=4)


def convert_timestamp(timestamp):
system_epoch = time.gmtime(0)
system_epock_datetime = datetime.datetime(system_epoch.tm_year, system_epoch.tm_mon, system_epoch.tm_mday)
xnet_epoch_datetime = datetime.datetime(1601, 1, 1)
delta = system_epock_datetime - xnet_epoch_datetime
date = datetime.datetime.fromtimestamp(timestamp * 100e-9) - delta
return date


def convert_datetime(date):
system_epoch = time.gmtime(0)
system_epock_datetime = datetime.datetime(system_epoch.tm_year, system_epoch.tm_mon, system_epoch.tm_mday)
xnet_epoch_datetime = datetime.datetime(1601, 1, 1)
delta = system_epock_datetime - xnet_epoch_datetime
timestamp = (date + delta - datetime.datetime(1970, 1, 1)) / datetime.timedelta(seconds=1) * 100e9
return int(timestamp)


def main():
database_name = 'NIXNET_example'
cluster_name = 'CAN_Cluster'
input_signals = ['CANEventSignal1', 'CANEventSignal2']
output_signals = ['CANEventSignal1', 'CANEventSignal2']
interface1 = 'CAN1'
interface2 = 'CAN2'

with nixnet.SignalInXYSession(
interface1,
database_name,
cluster_name,
input_signals) as input_session:
with nixnet.SignalOutXYSession(
interface2,
database_name,
cluster_name,
output_signals) as output_session:
terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ')
if terminated_cable.lower() == "y":
input_session.intf.can_term = constants.CanTerm.ON
output_session.intf.can_term = constants.CanTerm.OFF
elif terminated_cable.lower() == "n":
input_session.intf.can_term = constants.CanTerm.ON
output_session.intf.can_term = constants.CanTerm.ON
else:
print("Unrecognised input ({}), assuming 'n'".format(terminated_cable))
input_session.intf.can_term = constants.CanTerm.ON
output_session.intf.can_term = constants.CanTerm.ON

# Start the input session manually to make sure that the first
# signal value sent before the initial read will be received.
input_session.start()

out_waveforms = []
for out_signal in output_signals:
user_value = six.moves.input('Enter "{}" signal values [float, ...]: '.format(out_signal))
try:
out_waveforms.append([float(x.strip()) for x in user_value.split(",")])
except ValueError:
out_waveforms.append([24.5343, 77.0129])
print('Unrecognized input ({}). Setting waveform to {}'.format(user_value, out_waveforms[-1]))

print('The same values should be received. Press q to quit')
i = 0
while True:
for waveform_index, waveform in enumerate(out_waveforms):
for value_index, value in enumerate(waveform):
out_waveforms[waveform_index][value_index] = value + i
output_session.signals.write(out_waveforms)
print('Sent signal values: {}'.format(pp.pformat(out_waveforms)))

# Wait 5 s and then read the received values.
# They should be the same as the ones sent.
time_limit = convert_datetime(datetime.datetime.now() + datetime.timedelta(seconds=5))

signals = input_session.signals.read(len(out_waveforms[0]), time_limit)
print('Received signals:')
for signal_values in signals:
signal_values = [(value, convert_timestamp(timestamp)) for (value, timestamp) in signal_values]
print(' {}', signal_values)

i += 1
for waveform in out_waveforms:
if max(waveform) + i > sys.float_info.max:
i = 0
break

inp = six.moves.input('Hit enter to continue (q to quit): ')
if inp.lower() == 'q':
break

print('Data acquisition stopped.')


if __name__ == '__main__':
main()
18 changes: 18 additions & 0 deletions tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from nixnet_examples import can_frame_stream_io
from nixnet_examples import can_signal_conversion
from nixnet_examples import can_signal_single_point_io
from nixnet_examples import can_signal_xy_io


MockXnetLibrary = mock.create_autospec(_cfuncs.XnetLibrary, spec_set=True, instance=True)
Expand All @@ -24,6 +25,8 @@
MockXnetLibrary.nx_read_frame.return_value = _ctypedefs.u32(0)
MockXnetLibrary.nx_write_signal_single_point.return_value = _ctypedefs.u32(0)
MockXnetLibrary.nx_read_signal_single_point.return_value = _ctypedefs.u32(0)
MockXnetLibrary.nx_write_signal_xy.return_value = _ctypedefs.u32(0)
MockXnetLibrary.nx_read_signal_xy.return_value = _ctypedefs.u32(0)
MockXnetLibrary.nx_convert_frames_to_signals_single_point.return_value = _ctypedefs.u32(0)
MockXnetLibrary.nx_convert_signals_to_frames_single_point.return_value = _ctypedefs.u32(0)
MockXnetLibrary.nx_stop.return_value = _ctypedefs.u32(0)
Expand Down Expand Up @@ -84,6 +87,21 @@ def test_can_signal_single_point_empty_session(input_values):
can_signal_single_point_io.main()


@pytest.mark.parametrize("input_values", [
['y', '1, 2', '3, 4', 'q'],
['n', '1, 2', '3, 4', 'q'],
['invalid', '1, 2', '3, 4', 'q'],
['y', '1', '3, 4', 'q'],
['y', 'invalid', '3, 4', 'q'],
['y', 'invalid', '3, 4'] + 0x100 * [''] + ['q'],
])
@mock.patch('nixnet._cfuncs.lib', MockXnetLibrary)
@mock.patch('time.sleep', lambda time: None)
def test_can_signal_xy_empty_session(input_values):
with mock.patch('six.moves.input', six_input(input_values)):
can_signal_xy_io.main()


@pytest.mark.parametrize("input_values", [
['1, 2'],
['1'],
Expand Down

0 comments on commit 8bc0344

Please sign in to comment.