diff --git a/.coveragerc b/.coveragerc index 7006680..c05a228 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,2 @@ [report] -include = RPi/_GPIO.py +include = RPi/core.py diff --git a/Makefile b/Makefile index d6975b4..de60ce8 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,12 @@ +# Makefile to automate common testing procedures for python3-libgpiod-rpi +# By Joel Savitz +# This is free software, see LICENSE for details + all: test -test: unit-cov style -#@echo "MAKE TEST PASS" +test: test-cov style -unit-cov: +test-cov: @bash test-cov.sh -m && echo "FUNTIONAL PASS" || echo "FAILURE IN UNIT TEST" style: diff --git a/RPi/GPIO/__init__.py b/RPi/GPIO/__init__.py index 6e0178e..2383935 100644 --- a/RPi/GPIO/__init__.py +++ b/RPi/GPIO/__init__.py @@ -1,7 +1,49 @@ -# The extended RPi.GPIO API -from RPi._GPIO import setup, cleanup, output, input, setmode, getmode, add_event_detect, remove_event_detect, event_detected, \ - add_event_callback, wait_for_edge, gpio_function, setwarnings, \ - getbias, setbias, getdirection, setdirection, getactive_state, setactive_state, \ - channel_valid_or_die, \ - BCM, BOARD, UNKNOWN, IN, OUT, RISING, FALLING, BOTH, PUD_UP, PUD_DOWN, PUD_OFF, PUD_DISABLE, \ - HIGH, LOW, PWM, I2C, SPI, HARD_PWM, SERIAL +""" +The RPi.GPIO API +Originally created by Ben Croston +Reimplemented and extended by Joel Savitz and Fabrizio D'Angelo +This is free software, see LICENSE for details +""" + +from RPi.core import\ + BCM,\ + BOARD,\ + BOTH,\ + FALLING,\ + HARD_PWM,\ + HIGH,\ + I2C,\ + IN,\ + LOW,\ + OUT,\ + PUD_DISABLE,\ + PUD_DOWN,\ + PUD_OFF,\ + PUD_UP,\ + PWM,\ + RISING,\ + RPI_INFO,\ + RPI_REVISION,\ + SPI,\ + UNKNOWN,\ + VERSION,\ + add_event_callback,\ + add_event_detect,\ + channel_valid_or_die,\ + cleanup,\ + event_detected, \ + getactive_state,\ + getbias,\ + getdirection,\ + getmode,\ + gpio_function,\ + input,\ + output,\ + remove_event_detect,\ + setactive_state,\ + setbias,\ + setdirection,\ + setmode,\ + setup,\ + setwarnings,\ + wait_for_edge diff --git a/RPi/GPIO_DEVEL/__init__.py b/RPi/GPIO_DEVEL/__init__.py index 6df1e84..3455730 100644 --- a/RPi/GPIO_DEVEL/__init__.py +++ b/RPi/GPIO_DEVEL/__init__.py @@ -1,3 +1,21 @@ -# Development functions, not needed for normal use -from RPi._GPIO import Reset, State_Access, setdebuginfo, is_all_ints, is_all_bools_or_directions,\ - is_iterable, line_get_mode, line_set_mode, _line_mode_none, _line_mode_out, _line_mode_in +""" +The new RPi.GPIO_DEVEL development and debug API +By Joel Savitz and Fabrizio D'Angelo +This is free software, see LICENSE for details +""" + +# We have added functions and constants to this list as we have seen +# necesary but we are open to adding more if there is any interest + +from RPi.core import\ + is_all_bools_or_directions,\ + is_all_ints,\ + is_iterable,\ + Reset,\ + setdebuginfo,\ + State_Access,\ + line_get_mode,\ + line_set_mode,\ + _line_mode_in,\ + _line_mode_none,\ + _line_mode_out diff --git a/RPi/_GPIO.py b/RPi/core.py similarity index 84% rename from RPi/_GPIO.py rename to RPi/core.py index 9405f3a..3137b74 100644 --- a/RPi/_GPIO.py +++ b/RPi/core.py @@ -1,16 +1,18 @@ -# -# Core implementation of python3-libgpiod-rpi -# By Joel Savitz and Fabrizio D'Angelo -# This is free software, see LICENSE for details -# +""" +Core implementation of python3-libgpiod-rpi +By Joel Savitz and Fabrizio D'Angelo +This is free software, see LICENSE for details +""" + import gpiod from warnings import warn import os import sys import time from threading import Thread, Event, Lock +import atexit -# BCM to Board mode conversion table +# BCM to Board mode conversion table for Raspbery Pi 3 Model B pin_to_gpio_rev3 = [ -1, -1, -1, 2, -1, 3, -1, 4, 14, -1, # NOQA 15, 17, 18, 27, -1, 22, 23, -1, 24, 10, # NOQA @@ -18,23 +20,42 @@ -1, 6, 12, 13, -1, 19, 16, 26, 20, -1, 21 # NOQA ] + # === User Facing Data === # Exact values for constants taken from RPi.GPIO source code # file: source/common.h +# [API] RPi.GPIO API version (not python3-libgpiod-rpi version) +# NOTE: we currently only officially support the Raspbery Pi 3 Model B +# but we soon plan to implement support for the Raspbery Pi 4 Model B +# We are limited by the hardware available to the developers +VERSION = "0.7.0" + +# [API] Hardware information +RPI_INFO = { + "P1_REVISION": 3, + "REVISION": "a22082", + "TYPE": "Pi 3 Model B", + "MANUFACTURER": "Embest" + "PROCESSOR" "BCM2837", + "RAM": "1G", +} +# [API] Depcrecated source of hardware information +RPI_REVISION = RPI_INFO["P1_REVISION"] + # [API] Pin numbering modes UNKNOWN = -1 BCM = 11 BOARD = 10 -# [API] Random constants +# [API] Random constants defined but unused in the latest RPi.GPIO release SERIAL = 40 SPI = 41 I2C = 42 HARD_PWM = 43 -# Output modes +# [API] Output modes LOW = gpiod.Line.ACTIVE_LOW HIGH = gpiod.Line.ACTIVE_HIGH @@ -49,13 +70,13 @@ def active_flag(const): return _LINE_ACTIVE_STATE_COSNT_TO_FLAG[const] +# [API] Software pull up/pull down resistor modes # We map RPi.GPIO PUD modes to libgpiod PUD constants PUD_OFF = gpiod.Line.BIAS_AS_IS PUD_UP = gpiod.Line.BIAS_PULL_UP PUD_DOWN = gpiod.Line.BIAS_PULL_DOWN -# We extend RPi.GPIO with the ability to explicitly disable pull up/down -# behavior +# We extend RPi.GPIO with the ability to explicitly disable pull up/down behavior PUD_DISABLE = gpiod.Line.BIAS_DISABLE # libgpiod uses distinct flag values for each line bias constant returned by @@ -74,7 +95,7 @@ def bias_flag(const): return _LINE_BIAS_CONST_TO_FLAG[const] -# internal line modes +# Internal line modes _line_mode_none = 0 _line_mode_in = gpiod.LINE_REQ_DIR_IN _line_mode_out = gpiod.LINE_REQ_DIR_OUT @@ -86,7 +107,7 @@ def bias_flag(const): _line_mode_as_is = gpiod.LINE_REQ_DIR_AS_IS -# [API] Request types +# [API] Line event types FALLING = _line_mode_falling RISING = _line_mode_rising BOTH = _line_mode_both @@ -132,9 +153,9 @@ def __init__(self, channel, target_type, args): def kill(self): self.killswitch.set() - end_critical_section(self.channel, msg="drop lock and join poll thread") + end_critical_section(self.channel, msg="drop lock and join line thread") self.join() - begin_critical_section(self.channel, msg="poll thread dead so get lock") + begin_critical_section(self.channel, msg="line thread dead so get lock") class _Line: @@ -220,11 +241,6 @@ def DCprint(channel, *msgargs): Dprint("[{}]".format(channel), *msgargs) -# Mess with the internal state for development or recreational purposes -def State_Access(): - return _State - - # Reset internal state to default def Reset(): Dprint("Reset begins") @@ -246,23 +262,29 @@ def Reset(): Dprint("Reset commplete") -def is_all_ints(data): +def State_Access(): + # The purpose of this funtion is to allow the user to mess with the + # internal state of the library for development or recreational purposes + return _State + + +def is_all_bools_or_directions(data): if not is_iterable(data): data = [data] if len(data) < 1: return False - return all([isinstance(elem, int) for elem in data]) \ - if not isinstance(data, int)\ + return all([(isinstance(elem, bool) or elem in [HIGH, LOW]) for elem in data]) \ + if not (isinstance(data, bool) or data in [HIGH, LOW])\ else True -def is_all_bools_or_directions(data): +def is_all_ints(data): if not is_iterable(data): data = [data] if len(data) < 1: return False - return all([(isinstance(elem, bool) or elem in [HIGH, LOW]) for elem in data]) \ - if not (isinstance(data, bool) or data in [HIGH, LOW])\ + return all([isinstance(elem, int) for elem in data]) \ + if not isinstance(data, int)\ else True @@ -275,8 +297,29 @@ def is_iterable(data): return True +def setdebuginfo(value): + """Enable or disable debug messages""" + if not value: + Dprint("debuginfo output set to", _State.debuginfo) + _State.debuginfo = bool(value) + if value: + Dprint("debuginfo output set to", _State.debuginfo) + + +def wait_for_edge_validation(edge, bouncetime, timeout): + if edge not in [RISING, FALLING, BOTH]: + raise ValueError("The edge must be set to RISING, FALLING or BOTH") + + if bouncetime is not None and bouncetime <= 0: + raise ValueError("Bouncetime must be greater than 0") + + if timeout and timeout < 0: + # error semantics differ from RPi.GPIO + raise ValueError("Timeout must be greater than or equal to 0") + + def channel_fix_and_validate_bcm(channel): - if channel < 0 or channel > _State.chip.num_lines() - 1: + if channel < 0 or channel > chip_get_num_lines() - 1: raise ValueError("The channel sent is invalid on a Raspberry Pi") else: return channel @@ -312,16 +355,6 @@ def channel_fix_and_validate(channel_raw): return channel_fix_and_validate_board(channel_raw) -def channel_valid_or_die(channel): - """ - Validate a channel/pin number - Returns the pin number on success otherwise throws a ValueError - - channel - an integer to be validated as a channel - """ - channel_fix_and_validate(channel) - - def validate_gpio_dev_exists(): # This function only ever needs to be run once if validate_gpio_dev_exists.found: @@ -349,7 +382,7 @@ def chip_init(): try: _State.chip = gpiod.Chip("gpiochip0") except PermissionError: - print("Script or interpreter must be run as root") + print("Unable to access /dev/gpiochip0. Are you sure you have permission?") sys.exit() Dprint("state chip has value:", _State.chip) @@ -359,6 +392,10 @@ def chip_close(): _State.chip = None +def chip_is_open(): + return _State.chip is not None + + def chip_init_if_needed(): if _State.chip is None: chip_init() @@ -424,6 +461,10 @@ def line_get_active_state(channel): return _State.lines[channel].line.active_state() +def line_get_direction(channel): + return _LINE_MODE_TO_DIR_CONST[line_get_mode(channel)] + + def line_get_bias(channel): return _State.lines[channel].line.bias() @@ -476,20 +517,24 @@ def line_pwm_start(channel, dutycycle): if not line_is_pwm(channel) and \ line_pwm_get_frequency(channel) != -1: begin_critical_section(channel, msg="pwm start") + + # If you forgot to setup this channel as an output, we've got you + line_set_mode(channel, _line_mode_out) + line_pwm_set_dutycycle(channel, dutycycle) _State.lines[channel].thread_start(_line_thread_pwm, args=(channel,)) + end_critical_section(channel, msg="pwm start") return line_is_pwm(channel) + else: + warn("invalid call to pwm_start(). Did you call PWM.__init__() on this channel?") + return False + # If the line is already running a PwM thread # return True, but if there is no thread running # and the user tried to call PWM.start() before # calling PWM.__init__() somewhow, then # return False and tell them to call init. - if line_is_pwm(channel): - return True - else: - warn("invalid call to pwm_start(). Did you call PWM.__init__() on this channel?") - return False def line_pwm_stop(channel): @@ -550,220 +595,239 @@ def line_set_value(channel, value): def line_get_value(channel): - _State.lines[channel].line.get_value() + return _State.lines[channel].line.get_value() -# === Interface Functions === +def line_event_wait_lock(channel, bouncetime, timeout): + begin_critical_section(channel, msg="event wait") + ret = line_event_wait(channel, bouncetime, timeout) + end_critical_section(channel, msg="event wait") + return ret -def setmode(mode): - """ - Set up numbering mode to use for channels. - BOARD - Use Raspberry Pi board numbers - BCM - Use Broadcom GPIO 00..nn numbers - """ - if _State.mode != UNKNOWN: - raise ValueError("A different mode has already been set!") - if mode != BCM and mode != BOARD: - raise ValueError("An invalid mode was passed to setmode()") +# requires lock +def line_event_wait(channel, bouncetime, timeout): + # Split up timeout into appropriate parts + timeout_sec = int(int(timeout) / 1000) + timeout_nsec = (int(timeout) % 1000) * 1000 - _State.mode = mode + ret = None - chip_init_if_needed() + # We only care about bouncetime if it is explicitly speficied in the call to this function or if + # this is not the first call to wait_for_edge on the specified pin + if bouncetime and _State.lines[channel].timestamp and \ + time.time() - _State.lines[channel].timestamp < bouncetime: + pass + elif _State.lines[channel].line.event_wait(sec=timeout_sec, nsec=timeout_nsec): + _State.lines[channel].timestamp = time.time() + if channel not in _State.event_ls: + # Ensure no double appends. FIXME: should this be done outside of a poll thread? + _State.event_ls.append(channel) + event = _State.lines[channel].line.event_read() - Dprint("mode set to", _State.mode) + # A hack to clear the event buffer by reading a bunch of bytes + # from the underlying file representing the GPIO line + eventfd = _State.lines[channel].line.event_get_fd() + os.read(eventfd, 10000) + if event: + ret = channel + return ret -def setwarnings(value): - """Enable or disable warning messages""" - _State.warnings = bool(value) - Dprint("warning output set to", _State.warnings) +def line_add_callback(channel, callback): + begin_critical_section(channel, "add callback") + _State.lines[channel].callbacks.append(callback) + end_critical_section(channel, "add callback") -def setdebuginfo(value): - """Enable or disable debug messages""" - _State.debuginfo = bool(value) - Dprint("debuginfo output set to", _State.debuginfo) +def line_thread_should_die(channel): + return _State.lines[channel].thread.killswitch.is_set() -def setup(channel, direction, pull_up_down=PUD_OFF, initial=None): - """ - Set up a GPIO channel or list of channels with a direction and (optional) pull/up down control - channel - either board pin number or BCM number depending on which mode is set. - direction - IN or OUT - [pull_up_down] - PUD_OFF (default), PUD_UP or PUD_DOWN - [initial] - Initial value for an output channel - """ - # Channel must contain only integral data - if not is_all_ints(channel): - raise ValueError("Channel must be an integer or list/tuple of integers") +TEN_MILLISECONDS_IN_SECONDS = 0.0010 - # Direction must be valid - if direction != IN and direction != OUT: - raise ValueError("An invalid direction was passed to setup()") - if direction == OUT and pull_up_down != PUD_OFF: - raise ValueError("pull_up_down parameter is not valid for outputs") +def line_do_poll(channel, bouncetime, timeout): - if direction == IN and initial: - raise ValueError("initial parameter is not valid for inputs") + while True: + begin_critical_section(channel, msg="do poll") + if line_thread_should_die(channel): + end_critical_section(channel, msg="do poll exit") + break + if line_event_wait(channel, bouncetime, timeout): + callbacks = _State.lines[channel].callbacks + for fn in callbacks(): + fn() + end_critical_section(channel, msg="do poll") + time.sleep(TEN_MILLISECONDS_IN_SECONDS) - if pull_up_down not in [PUD_OFF, PUD_UP, PUD_DOWN, PUD_DISABLE]: - raise ValueError("Invalid value for pull_up_down - should be either PUD_OFF, PUD_UP, PUD_DOWN, or PUD_DISABLE") - # Make the channel data iterable by force - if not is_iterable(channel): - channel = [channel] +def poll_thread(channel, edge, callback, bouncetime): - # This implements BOARD mode - for pin in channel: - pin = channel_fix_and_validate(pin) + # FIXME: this is arbitrary + timeout = 10 # milliseconds + wait_for_edge_validation(edge, bouncetime, timeout) - request_flags = 0 - request_flags |= bias_flag(pull_up_down) + DCprint(channel, "launch poll thread") + line_do_poll(channel, bouncetime, timeout) + DCprint(channel, "terminate poll thread") - for pin in channel: - try: - line_set_mode(pin, direction, request_flags) - if initial is not None: - line_set_value(pin, initial) - except OSError: - warn("This channel is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.\n \ - Further attemps to use channel {} will fail unless setup() is run again sucessfully".format(pin)) +# NOTE: RPi.GPIO specifies: +# Default to 1 kHz frequency 0.0% dutycycle +# but interface functions require explicit arguments +def pwm_thread(channel): + DCprint(channel, "begin PwM thread with dutycycle {}% and frequency {} Hz".format(_State.lines[channel].dutycycle, + _State.lines[channel].frequency)) + + # We wrap the loop with a try except so we can drop the lock and exit if + # access to the channel is suddenly revoked by the main thread + try: + while True: + begin_critical_section(channel, msg="do pwm") + if line_thread_should_die(channel): + end_critical_section(channel, msg="do pwm exit") + break + if _State.lines[channel].dutycycle > 0: + line_set_value(channel, True) + DCprint(channel, "PwM: ON") + # PwM calculation for high voltage part of period: + time.sleep(1 / _State.lines[channel].frequency * (_State.lines[channel].dutycycle / 100.0)) + if _State.lines[channel].dutycycle < 100: + line_set_value(channel, False) + DCprint(channel, "PwM: OFF") + # PwM calculation for low voltage part of period: + time.sleep(1 / _State.lines[channel].frequency * (1.0 - _State.lines[channel].dutycycle / 100.0)) + end_critical_section(channel, msg="do pwm") + time.sleep(TEN_MILLISECONDS_IN_SECONDS) + # arbitrary time to sleep without lock + # TODO: may interfere with overall timing of PwM but it's rough anyway + except (ValueError, PermissionError): + # If this thread suddenly fails to access a channel, exit gracefully + end_critical_section(channel, msg="do pwm sudden exit") + + +# === RPi.GPIO API entry points === -def output(channel, value): - """ - Output to a GPIO channel or list of channel - channel - either board pin number or BCM number depending on which mode is set. - value - 0/1 or False/True or LOW/HIGH - {compat} channel and value parameters may be lists or tuples of equal length +def add_event_callback(channel, callback): """ - if not is_all_ints(channel): - raise ValueError("Channel must be an integer or list/tuple of integers") + Add a callback for an event already defined using add_event_detect() + channel - either board pin number or BCM number depending on which mode is set. + callback - a callback function" - if not is_iterable(channel): - channel = [channel] + {compat} we do not require that the channel be setup as an input + """ # This implements BOARD mode - for chan in channel: - chan = channel_fix_and_validate(chan) - - if (not is_all_ints(value)) and (not is_all_bools_or_directions(value)): - raise ValueError("Value must be an integer/boolean or a list/tuple of integers/booleans") - - if not is_iterable(value): - value = [value] + channel = channel_fix_and_validate(channel) - # Normalize the value argument - for i in range(len(value)): - if value[i] == HIGH: - value[i] = True - if value[i] == LOW: - value[i] = False + if not line_is_poll(channel): + raise RuntimeError("Add event detection using add_event_detect first before adding a callback") - if len(channel) != len(value): - raise RuntimeError("Number of channel != number of value") + if not callable(callback): + raise TypeError("Parameter must be callable") - for chan, val in zip(channel, value): - if line_get_mode(chan) != _line_mode_out: - warn("The GPIO channel has not been set up as an OUTPUT\n\tSkipping channel {}".format(chan)) - else: - try: - line_set_value(chan, bool(val)) - except PermissionError: - warn("Unable to set value of channel {}, did you forget to run setup()? Or did setup() fail?".format(chan)) + line_add_callback(channel, callback) -def input(channel): +def add_event_detect(channel, edge, callback=None, bouncetime=None): """ - Input from a GPIO channel. Returns HIGH=1=True or LOW=0=False - # channel - either board pin number or BCM number depending on which mode is set. + Enable edge detection events for a particular GPIO channel. + channel - either board pin number or BCM number depending on which mode is set. + edge - RISING, FALLING or BOTH + [callback] - A callback function for the event (optional) + [bouncetime] - Switch bounce timeout in ms for callback + + {compat} we do not require that the channel be setup as an input as a prerequiste to running this function, + however the initial value on the channel is undefined """ # This implements BOARD mode channel = channel_fix_and_validate(channel) - # this does't really make sense but it matches rpi gpio source code logic - if getdirection(channel) not in [IN, OUT]: - raise RuntimeError("You must setup() the GPIO channel first") + valid_edges = [RISING, FALLING, BOTH] + if edge not in valid_edges: + raise ValueError("The edge must be set to RISING, FALLING or BOTH") - # TODO I feel like we should do more validation + if callback and not callable(callback): + raise TypeError("Parameter must be callable") - return line_get_value(channel) + if bouncetime and bouncetime <= 0: + raise ValueError("Bouncetime must be greater than 0") + line_set_mode(channel, edge) + line_poll_start(channel, edge, callback, bouncetime) -def getmode(): - """ - Get numbering mode used for channel numbers. - Returns BOARD, BCM or None - """ - return _State.mode if _State.mode else None - - -def getbias(channel): - """ - Get bias mode of an active channel - Returns PUD_OFF, PUD_DOWN, PUD_UP, or PUD disabled if the channel is - active or just PUD_OFF if the channel is not active. +def channel_valid_or_die(channel): """ + Validate a channel/pin number + Returns the pin number on success otherwise throws a ValueError - channel = channel_fix_and_validate(channel) - - if line_is_active(channel): - return line_get_bias(channel) - else: - return PUD_OFF + channel - an integer to be validated as a channel + """ + channel_fix_and_validate(channel) -def setbias(channel, bias): +def cleanup(channels=None): """ - Set bias of an active channel + Clean up by resetting all GPIO channels that have been used by this program to INPUT with no pullup/pulldown and no event detection + [channel] - individual channel or list/tuple of channels to clean up. + Default - clean every channel that has been used. + + {compat} Cleanup is mostly handled by libgpiod and the kernel, but we use this opportunity to kill any running callback poll threads + as well as close any open file descriptors """ - channel = channel_fix_and_validate(channel) + if not chip_is_open(): + chip_init_if_needed() + _State.lines = [_Line(channel) for channel in range(chip_get_num_lines())] - if bias not in [PUD_OFF, PUD_UP, PUD_DOWN, PUD_DISABLE]: - raise ValueError("An invalid bias was passed to setbias()") + destroy = False + if channels is None: + destroy = True + channels = [i for i in range(chip_get_num_lines())] - current = getbias(channel) - if bias != current: - flags = line_get_flags(channel) - flags &= ~bias_flag(getbias(channel)) - flags |= bias_flag(bias) - line_set_flags(channel, flags) + if not is_all_ints(channels): + raise ValueError("Channel must be an integer or list/tuple of integers") + elif isinstance(channels, tuple): + # Convert tuples to lists to make them writable for normalization of values + channels = [c for c in channels] + if not destroy: + Dprint("NOT DESTROY: iterable=", is_iterable(channels)) + if not is_iterable(channels): + channels = [channels] + for i in range(len(channels)): + # This implements BOARD mode + channels[i] = channel_fix_and_validate(channels[i]) -def getdirection(channel): - """ - Get direction of an active channel - Returns OUT if the channel is in an output mode, IN if the channel is in an input mode, - and -1 otherwise - """ + Dprint("cleanup {} lines".format(len(channels))) + if not destroy: + Dprint("channels:", channels) + for chan in channels: + line_set_mode(chan, _line_mode_none) - channel = channel_fix_and_validate(channel) - return _LINE_MODE_TO_DIR_CONST[line_get_mode(channel)] + if destroy: + chip_destroy() -def setdirection(channel, direction): +def event_detected(channel): """ - Set direction of an active channel + Returns True if an edge has occurred on a given GPIO. You need to enable edge detection using add_event_detect() first. + channel - either board pin number or BCM number depending on which mode is set." """ + # This implements BOARD mode channel = channel_fix_and_validate(channel) - if direction != IN and direction != OUT: - raise ValueError("An invalid direction was passed to setdirection()") - - current = getdirection(channel) - if current != -1: - if current == IN and direction == OUT: - line_set_mode(channel, _line_mode_out) - elif current == OUT and direction == IN: - line_set_mode(channel, _line_mode_in) + if channel in _State.event_ls: + _State.event_ls.remove(channel) + return True + else: + return False def getactive_state(channel): @@ -780,285 +844,312 @@ def getactive_state(channel): return -1 -def setactive_state(channel, active_state): +def getbias(channel): """ - Set the active_state of an active channel + Get bias mode of an active channel + Returns PUD_OFF, PUD_DOWN, PUD_UP, or PUD disabled if the channel is + active or just PUD_OFF if the channel is not active. """ channel = channel_fix_and_validate(channel) - if active_state not in [HIGH, LOW]: - raise ValueError("An active state was passed to setactive_state()") + if line_is_active(channel): + return line_get_bias(channel) + else: + return PUD_OFF - current = getactive_state(channel) - if active_state != current: - flags = line_get_flags(channel) - flags &= ~active_flag(getactive_state(channel)) - flags |= active_flag(active_state) - line_set_flags(channel, flags) +def getdirection(channel): + """ + Get direction of an active channel + Returns OUT if the channel is in an output mode, IN if the channel is in an input mode, + and -1 otherwise + """ -def wait_for_edge_validation(edge, bouncetime, timeout): - if edge not in [RISING, FALLING, BOTH]: - raise ValueError("The edge must be set to RISING, FALLING or BOTH") + channel = channel_fix_and_validate(channel) + return line_get_direction(channel) - if bouncetime is not None and bouncetime <= 0: - raise ValueError("Bouncetime must be greater than 0") - if timeout and timeout < 0: - # error semantics differ from RPi.GPIO - raise ValueError("Timeout must be greater than or equal to 0") +def getmode(): + """ + Get numbering mode used for channel numbers. + Returns BOARD, BCM or None + """ + return _State.mode if _State.mode else None -def wait_for_edge(channel, edge, bouncetime=None, timeout=0): - """ - Wait for an edge. Returns the channel number or None on timeout. - channel - either board pin number or BCM number depending on which mode is set. - edge - RISING, FALLING or BOTH - [bouncetime] - time allowed between calls to allow for switchbounce - [timeout] - timeout in ms - {compat} bouncetime units are in seconds. this is subject to change +def gpio_function(channel): """ + Return the current GPIO function (IN, OUT, PWM, SERIAL, I2C, SPI) + channel - either board pin number or BCM number depending on which mode is set. - # Running this function before setup is allowed but the initial pin value is undefined - # RPi.GPIO requires one to setup a pin as input before using it for event detection, - # while libgpiod provides an interface that keeps the two mutually exclusive. We get around - # this by not requiring it, though to maintain the same semantics as RPi.GPIO, we attempt - # to release the channel's handle as a an input value, and acquire a new handle for an - # event value. + {compat} This is a stateless function that will return a constant value for every pin + """ # This implements BOARD mode channel = channel_fix_and_validate(channel) - wait_for_edge_validation(edge, bouncetime, timeout) - - # ensure the line is in the right mode - # FIXME does this break input mode? - try: - line_set_mode(channel, edge) - except OSError: - raise RuntimeError("Channel is currently in use (Device or Resource Busy)") - - return line_event_wait_lock(channel, bouncetime, timeout) - + mode = line_get_mode(channel) -def line_event_wait_lock(channel, bouncetime, timeout): - begin_critical_section(channel, msg="event wait") - ret = line_event_wait(channel, bouncetime, timeout) - end_critical_section(channel, msg="event wait") - return ret + if mode == _line_mode_out: + return OUT + elif mode == _line_mode_in: + return IN + else: + return UNKNOWN + # We will provide support for other potential values of gpio_function + # when the underlying functions (SPI, SERIAL, I2C, HARD_PWM) are implemented. -# requires lock -def line_event_wait(channel, bouncetime, timeout): - # Split up timeout into appropriate parts - timeout_sec = int(int(timeout) / 1000) - timeout_nsec = (int(timeout) % 1000) * 1000 - # We only care about bouncetime if it is explicitly speficied in the call to this function or if - # this is not the first call to wait_for_edge on the specified pin - if bouncetime and _State.lines[channel].timestamp and \ - time.time() - _State.lines[channel].timestamp < bouncetime: - ret = None - elif _State.lines[channel].line.event_wait(sec=timeout_sec, nsec=timeout_nsec): - _State.lines[channel].timestamp = time.time() - if channel not in _State.event_ls: - # Ensure no double appends. FIXME: should this be done outside of a poll thread? - _State.event_ls.append(channel) - event = _State.lines[channel].line.event_read() +def input(channel): + """ + Input from a GPIO channel. Returns HIGH=1=True or LOW=0=False + # channel - either board pin number or BCM number depending on which mode is set. + """ - # A hack to clear the event buffer by reading a bunch of bytes - # from the underlying file representing the GPIO line - eventfd = _State.lines[channel].line.event_get_fd() - os.read(eventfd, 10000) - ret = event - else: - ret = None + # This implements BOARD mode + channel = channel_fix_and_validate(channel) - return ret + # this does't really make sense but it matches rpi gpio source code logic + if getdirection(channel) not in [IN, OUT]: + raise RuntimeError("You must setup() the GPIO channel first") + # TODO I feel like we should do more validation -def line_thread_should_die(channel): - return _State.lines[channel].thread.killswitch.is_set() + return line_get_value(channel) -TEN_MILLISECONDS_IN_SECONDS = 0.0010 +def output(channel, value): + """ + Output to a GPIO channel or list of channel + channel - either board pin number or BCM number depending on which mode is set. + value - 0/1 or False/True or LOW/HIGH + {compat} channel and value parameters may be lists or tuples of equal length + """ + if not is_all_ints(channel): + raise ValueError("Channel must be an integer or list/tuple of integers") -def line_do_poll(channel, bouncetime, timeout): + if not is_iterable(channel): + channel = [channel] - while True: - begin_critical_section(channel, msg="do poll") - if line_thread_should_die(channel): - end_critical_section(channel, msg="do poll exit") - break - if line_event_wait(channel, bouncetime, timeout): - callbacks = _State.lines[channel].callbacks - for fn in callbacks(): - fn() - end_critical_section(channel, msg="do poll") - time.sleep(TEN_MILLISECONDS_IN_SECONDS) + if (not is_all_ints(value)) and (not is_all_bools_or_directions(value)): + raise ValueError("Value must be an integer/boolean or a list/tuple of integers/booleans") + # Convert tuples to lists to make them writable for normalization of values + if isinstance(value, tuple): + value = [v for v in value] -def poll_thread(channel, edge, callback, bouncetime): + # If there is a single value provided, we set each channel to that value + if not is_iterable(value): + value = [value for i in range(len(channel))] - # FIXME: this is arbitrary - timeout = 10 # milliseconds - wait_for_edge_validation(edge, bouncetime, timeout) + # This implements BOARD mode for all input cases + for i in range(len(channel)): + channel[i] = channel_fix_and_validate(channel[i]) - DCprint(channel, "launch poll thread") - line_do_poll(channel, bouncetime, timeout) - DCprint(channel, "terminate poll thread") + # Normalize the value argument + for i in range(len(value)): + if value[i] == HIGH: + value[i] = True + if value[i] == LOW: + value[i] = False + Dprint("channel", channel, "value", value) + if len(channel) != len(value): + raise RuntimeError("Number of channel != number of value") -# NOTE: RPi.GPIO specifies: -# Default to 1 kHz frequency 0.0% dutycycle -# but interface functions require explicit arguments -def pwm_thread(channel): - DCprint(channel, "begin PwM thread with dutycycle {}% and frequency {} Hz".format(_State.lines[channel].dutycycle, - _State.lines[channel].frequency)) - while True: - begin_critical_section(channel, msg="do pwm") - if line_thread_should_die(channel): - end_critical_section(channel, msg="do pwm exit") - break - if _State.lines[channel].dutycycle > 0: - line_set_value(channel, True) - DCprint(channel, "PwM: ON") - # PwM calculation for high voltage part of period: - time.sleep(1 / _State.lines[channel].frequency * (_State.lines[channel].dutycycle / 100.0)) - if _State.lines[channel].dutycycle < 100: - line_set_value(channel, False) - DCprint(channel, "PwM: OFF") - # PwM calculation for low voltage part of period: - time.sleep(1 / _State.lines[channel].frequency * (1.0 - _State.lines[channel].dutycycle / 100.0)) - end_critical_section(channel, msg="do pwm") - time.sleep(TEN_MILLISECONDS_IN_SECONDS) - # arbitrary time to sleep without lock, TODO: may interfere with overall timing of PwM but it's rough anyway + for chan, val in zip(channel, value): + if line_get_mode(chan) != _line_mode_out: + warn("The GPIO channel has not been set up as an OUTPUT\n\tSkipping channel {}".format(chan)) + else: + try: + line_set_value(chan, bool(val)) + except PermissionError: + warn("Unable to set value of channel {}, did you forget to run setup()? Or did setup() fail?".format(chan)) -def add_event_detect(channel, edge, callback=None, bouncetime=None): +def remove_event_detect(channel): """ - Enable edge detection events for a particular GPIO channel. - channel - either board pin number or BCM number depending on which mode is set. - edge - RISING, FALLING or BOTH - [callback] - A callback function for the event (optional) - [bouncetime] - Switch bounce timeout in ms for callback - - {compat} we do not require that the channel be setup as an input as a prerequiste to running this function, - however the initial value on the channel is undefined + Remove edge detection for a particular GPIO channel + channel - either board pin number or BCM number depending on which mode is set. """ # This implements BOARD mode channel = channel_fix_and_validate(channel) - valid_edges = [RISING, FALLING, BOTH] - if edge not in valid_edges: - raise ValueError("The edge must be set to RISING, FALLING or BOTH") + if line_is_poll(channel): + line_kill_poll_lock(channel) + else: + raise ValueError("event detection not setup on channel {}".format(channel)) - if callback and not callable(callback): - raise TypeError("Parameter must be callable") - if bouncetime and bouncetime <= 0: - raise ValueError("Bouncetime must be greater than 0") +def setactive_state(channel, active_state): + """ + Set the active_state of an active channel + """ - line_set_mode(channel, edge) - line_poll_start(channel, edge, callback, bouncetime) + channel = channel_fix_and_validate(channel) + if active_state not in [HIGH, LOW]: + raise ValueError("An active state was passed to setactive_state()") -def add_event_callback(channel, callback): - """ - Add a callback for an event already defined using add_event_detect() - channel - either board pin number or BCM number depending on which mode is set. - callback - a callback function" + current = getactive_state(channel) + if active_state != current: + flags = line_get_flags(channel) + flags &= ~active_flag(getactive_state(channel)) + flags |= active_flag(active_state) + line_set_flags(channel, flags) - {compat} we do not require that the channel be setup as an input + +def setbias(channel, bias): + """ + Set bias of an active channel """ - # This implements BOARD mode channel = channel_fix_and_validate(channel) - if not line_is_poll(channel): - raise RuntimeError("Add event detection using add_event_detect first before adding a callback") + if bias not in [PUD_OFF, PUD_UP, PUD_DOWN, PUD_DISABLE]: + raise ValueError("An invalid bias was passed to setbias()") - if not callable(callback): - raise TypeError("Parameter must be callable") + current = getbias(channel) + if bias != current: + flags = line_get_flags(channel) + flags &= ~bias_flag(getbias(channel)) + flags |= bias_flag(bias) + line_set_flags(channel, flags) - line_add_callback(channel, callback) +def setdirection(channel, direction): + """ + Set direction of an active channel + """ -def line_add_callback(channel, callback): - begin_critical_section(channel, "add callback") - _State.lines[channel].callbacks.append(callback) - end_critical_section(channel, "add callback") + channel = channel_fix_and_validate(channel) + + if direction != IN and direction != OUT: + raise ValueError("An invalid direction was passed to setdirection()") + current = line_get_direction(channel) + if current != -1: + if current == IN and direction == OUT: + line_set_mode(channel, _line_mode_out) + elif current == OUT and direction == IN: + line_set_mode(channel, _line_mode_in) -def remove_event_detect(channel): + +def setmode(mode): """ - Remove edge detection for a particular GPIO channel - channel - either board pin number or BCM number depending on which mode is set. + Set up numbering mode to use for channels. + BOARD - Use Raspberry Pi board numbers + BCM - Use Broadcom GPIO 00..nn numbers """ + if _State.mode != UNKNOWN: + raise ValueError("A different mode has already been set!") - # This implements BOARD mode - channel = channel_fix_and_validate(channel) + if mode != BCM and mode != BOARD: + raise ValueError("An invalid mode was passed to setmode()") - if line_is_poll(channel): - line_kill_poll_lock(channel) - else: - raise ValueError("event detection not setup on channel {}".format(channel)) + _State.mode = mode + chip_init_if_needed() -def event_detected(channel): + Dprint("mode set to", _State.mode) + + +def setup(channel, direction, pull_up_down=PUD_OFF, initial=None): """ - Returns True if an edge has occurred on a given GPIO. You need to enable edge detection using add_event_detect() first. - channel - either board pin number or BCM number depending on which mode is set." + Set up a GPIO channel or list of channels with a direction and (optional) pull/up down control + channel - either board pin number or BCM number depending on which mode is set. + direction - IN or OUT + [pull_up_down] - PUD_OFF (default), PUD_UP or PUD_DOWN + [initial] - Initial value for an output channel """ - # This implements BOARD mode - channel = channel_fix_and_validate(channel) + # Channel must contain only integral data + if not is_all_ints(channel): + raise ValueError("Channel must be an integer or list/tuple of integers") - if channel in _State.event_ls: - _State.event_ls.remove(channel) - return True - else: - return False + # Direction must be valid + if direction != IN and direction != OUT: + raise ValueError("An invalid direction was passed to setup()") + + if direction == OUT and pull_up_down != PUD_OFF: + raise ValueError("pull_up_down parameter is not valid for outputs") + if direction == IN and initial: + raise ValueError("initial parameter is not valid for inputs") -def cleanup(): - """ - Clean up by resetting all GPIO channels that have been used by this program to INPUT with no pullup/pulldown and no event detection - [channel] - individual channel or list/tuple of channels to clean up. - Default - clean every channel that has been used. + if pull_up_down not in [PUD_OFF, PUD_UP, PUD_DOWN, PUD_DISABLE]: + raise ValueError("Invalid value for pull_up_down - should be either PUD_OFF, PUD_UP, PUD_DOWN, or PUD_DISABLE") - {compat} Cleanup is mostly handled by libgpiod and the kernel, but we use this opportunity to kill any running callback poll threads - as well as close any open file descriptors - """ + # Convert tuples to lists to make them writable for validations of channels + if isinstance(channel, tuple): + channel = [c for c in channel] + + # Make the channel data iterable by force + if not is_iterable(channel): + channel = [channel] - Dprint("cleanup {} lines".format(len(_State.lines))) - for channel in range(len(_State.lines)): - line_set_mode(channel, _line_mode_none) + # This implements BOARD mode + for i in range(len(channel)): + channel[i] = channel_fix_and_validate(channel[i]) - chip_destroy() + request_flags = 0 + request_flags |= bias_flag(pull_up_down) + for pin in channel: + try: + line_set_mode(pin, direction, request_flags) + if initial is not None: + line_set_value(pin, initial) + except OSError: + warn("This channel is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.\n \ + Further attemps to use channel {} will fail unless setup() is run again sucessfully".format(pin)) -def get_gpio_number(channel): - return channel_fix_and_validate(channel) +def setwarnings(value): + """Enable or disable warning messages""" + _State.warnings = bool(value) + Dprint("warning output set to", _State.warnings) -def gpio_function(channel): + +def wait_for_edge(channel, edge, bouncetime=None, timeout=0): """ - Return the current GPIO function (IN, OUT, PWM, SERIAL, I2C, SPI) - channel - either board pin number or BCM number depending on which mode is set. + Wait for an edge. Returns the channel number or None on timeout. + channel - either board pin number or BCM number depending on which mode is set. + edge - RISING, FALLING or BOTH + [bouncetime] - time allowed between calls to allow for switchbounce + [timeout] - timeout in ms - {compat} This is a stateless function that will return a constant value for every pin + {compat} bouncetime units are in seconds. this is subject to change """ + # Running this function before setup is allowed but the initial pin value is undefined + # RPi.GPIO requires one to setup a pin as input before using it for event detection, + # while libgpiod provides an interface that keeps the two mutually exclusive. We get around + # this by not requiring it, though to maintain the same semantics as RPi.GPIO, we attempt + # to release the channel's handle as a an input value, and acquire a new handle for an + # event value. + # This implements BOARD mode channel = channel_fix_and_validate(channel) - # error handling is done in the called function - return get_gpio_number(channel) + wait_for_edge_validation(edge, bouncetime, timeout) + + # ensure the line is in the right mode + # FIXME does this break input mode? + try: + line_set_mode(channel, edge) + except OSError: + raise RuntimeError("Channel is currently in use (Device or Resource Busy)") + + return line_event_wait_lock(channel, bouncetime, timeout) + + +# === RPi.GPIO Pulse-width Modulation (PwM) class === class PWM: @@ -1085,7 +1176,7 @@ def start(self, dutycycle): if dutycycle < 0.0 or dutycycle > 100.0: raise ValueError("dutycycle must have a value from 0.0 to 100.0") - line_pwm_start(self.channel, dutycycle) + return line_pwm_start(self.channel, dutycycle) def stop(self): """ @@ -1115,11 +1206,18 @@ def ChangeFrequency(self, frequency): line_pwm_set_frequency(self.channel, frequency) +# === Library initialization === + + # Initialize the library with a reset Reset() + # line thead type to callable entry point mapping _LINE_THREAD_TYPE_TO_TARGET = { _line_thread_poll: poll_thread, _line_thread_pwm: pwm_thread, } + +# Run cleanup() when an interpreter using this module terminates +atexit.register(cleanup) diff --git a/examples/callback2.py b/examples/callback2.py index e7cc204..c46fd73 100755 --- a/examples/callback2.py +++ b/examples/callback2.py @@ -1,3 +1,4 @@ +#!/bin/python3 # Daniel's example (reproduced bug 6 until it was fixed) import RPi.GPIO as GPIO import time diff --git a/examples/flash18.py b/examples/flash18.py index 23baef6..25677e8 100755 --- a/examples/flash18.py +++ b/examples/flash18.py @@ -1,10 +1,8 @@ #!/bin/python3 import RPi.GPIO as GPIO -import RPi.GPIO_DEVEL as GPIO_DEVEL import time GPIO.setmode(GPIO.BCM) -GPIO_DEVEL.setdebuginfo(True) GPIO.setup(18, GPIO.OUT) for i in range(5): diff --git a/examples/input_examples.py b/examples/input_examples.py new file mode 100755 index 0000000..c79de9e --- /dev/null +++ b/examples/input_examples.py @@ -0,0 +1,75 @@ +#!/bin/python3 +# Input examples from Ben Croston +# source: https://sourceforge.net/p/raspberry-gpio-python/wiki/Inputs/ + +import RPi.GPIO as GPIO +import time + +GPIO.setmode(GPIO.BCM) + +channel = 18 + +GPIO.setup(channel, GPIO.IN, pull_up_down=GPIO.PUD_UP) +# or +GPIO.setup(channel, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) + +if GPIO.input(channel): + print('Input was HIGH') +else: + print('Input was LOW') + +# wait for button (real example doensn't have the "or False" at the end) +while GPIO.input(channel) == GPIO.LOW or False: + time.sleep(0.01) # wait 10 ms to give CPU chance to do other things + + +GPIO.wait_for_edge(channel, GPIO.RISING) + +# wait for up to 5 seconds for a rising edge (timeout is in milliseconds) +channel = GPIO.wait_for_edge(channel, GPIO.RISING, timeout=5000) +if channel is None: + print('Timeout occurred') +else: + print('Edge detected on channel', channel) + + +def do_something(): + print("running do_something()") + + +channel = 18 +GPIO.add_event_detect(channel, GPIO.RISING) # add rising edge detection on a channel +do_something() +if GPIO.event_detected(channel): + print('Button pressed') + + +def my_callback(channel): + print('This is a edge event callback function!') + print('Edge detected on channel %s' % channel) + print('This is run in a different thread to your main program') + + +GPIO.add_event_detect(channel, GPIO.RISING, callback=my_callback) # add rising edge detection on a channel +# ...the rest of your program... + + +def my_callback_one(channel): + print('Callback one') + + +def my_callback_two(channel): + print('Callback two') + + +GPIO.add_event_detect(channel, GPIO.RISING) +GPIO.add_event_callback(channel, my_callback_one) +GPIO.add_event_callback(channel, my_callback_two) + +# add rising edge detection on a channel, ignoring further edges for 200ms for switch bounce handling +GPIO.add_event_detect(channel, GPIO.RISING, callback=my_callback, bouncetime=200) + +# This example is on the linked webpage but it is wrong, I checked the RPi.GPIO source code +# GPIO.add_event_callback(channel, my_callback, bouncetime=200) + +GPIO.remove_event_detect(channel) diff --git a/examples/morse.py b/examples/morse.py index 384fcd3..5b425aa 100755 --- a/examples/morse.py +++ b/examples/morse.py @@ -1,121 +1,152 @@ #!/bin/env python3 -#import wrap as GPIO -#import sys -#from time import sleep -## Python program to implement Morse Code Translator - -# NOTE: this is broken - -#''' -#VARIABLE KEY -#'cipher' -> 'stores the morse translated form of the english string' -#'decipher' -> 'stores the english translated form of the morse string' -#'citext' -> 'stores morse code of a single character' -#'i' -> 'keeps count of the spaces between morse characters' -#'message' -> 'stores the string to be encoded or decoded' -#''' - -#pin=GPIO.Pin(25,GPIO.OUT) -#pin.value = 0 -## Dictionary representing the morse code chart -#MORSE_CODE_DICT = { -# 'A':'.-', 'B':'-...', -# 'C':'-.-.', 'D':'-..', 'E':'.', -# 'F':'..-.', 'G':'--.', 'H':'....', -# 'I':'..', 'J':'.---', 'K':'-.-', -# 'L':'.-..', 'M':'--', 'N':'-.', -# 'O':'---', 'P':'.--.', 'Q':'--.-', -# 'R':'.-.', 'S':'...', 'T':'-', -# 'U':'..-', 'V':'...-', 'W':'.--', -# 'X':'-..-', 'Y':'-.--', 'Z':'--..', -# '1':'.----', '2':'..---', '3':'...--', -# '4':'....-', '5':'.....', '6':'-....', -# '7':'--...', '8':'---..', '9':'----.', -# '0':'-----', ', ':'--..--', '.':'.-.-.-', -# '?':'..--..', '/':'-..-.', '-':'-....-', -# '(':'-.--.', ')':'-.--.-' -# } - -## Function to encrypt the string -## according to the morse code chart -#def encrypt(message): -# cipher = '' -# for letter in message: -# if letter != ' ': - -# # Looks up the dictionary and adds the -# # correspponding morse code -# # along with a space to separate -# # morse codes for different characters -# cipher += MORSE_CODE_DICT[letter] + ' ' -# else: -# # 1 space indicates different characters -# # and 2 indicates different words -# cipher += ' ' - -# return cipher - -## Function to decrypt the string -## from morse to english -#def decrypt(message): - -# # extra space added at the end to access the -# # last morse code -# message += ' ' - -# decipher = '' -# citext = '' -# for letter in message: - -# # checks for space -# if (letter != ' '): -# # counter to keep track of space -# i = 0 -# # storing morse code of a single character -# citext += letter -# # in case of space -# else: -# # if i = 1 that indicates a new character -# i += 1 -# # if i = 2 that indicates a new word -# if i == 2: -# # adding space to separate words -# decipher += ' ' -# else: -# # accessing the keys using their values (reverse of encryption) -# decipher += list(MORSE_CODE_DICT.keys())[list(MORSE_CODE_DICT -# .values()).index(citext)] -# citext = '' - -# return decipher - -## Hard-coded driver function to run the program -#def long(): -# pin.toggle() -# sleep(.4) -# pin.toggle() - -#def short(): -# pin.toggle() -# sleep(.2) -# pin.toggle() -#def main(): -# msg = sys.argv[1] -# result = encrypt(msg.upper()) -# for i in result: -# if i == "-": -# long() -# elif i == "." -# short() -# else: -# sleep(.5) -# sleep(.1) -# #print (result) - -# #message = "--. . . -.- ... -....- ..-. --- .-. -....- --. . . -.- ... " -# #result = decrypt(message) -# #print (result) - -## Executes the main function -#if __name__ == '__main__': -# main() +import RPi.GPIO as GPIO +import sys +from time import sleep + +# Python program to implement Morse Code Translator +# Modifed to interface with the RPi.GPIO API +# source: https://www.geeksforgeeks.org/morse-code-translator-python/ + +channel = 12 + +''' +VARIABLE KEY +'cipher' -> 'stores the morse translated form of the english string' +'decipher' -> 'stores the english translated form of the morse string' +'citext' -> 'stores morse code of a single character' +'i' -> 'keeps count of the spaces between morse characters' +'message' -> 'stores the string to be encoded or decoded' +''' + +# Dictionary representing the morse code chart +MORSE_CODE_DICT = { + 'A': '.-', + 'B': '-...', + 'C': '-.-.', + 'D': '-..', + 'E': '.', + 'F': '..-.', + 'G': '--.', + 'H': '....', + 'I': '..', + 'J': '.---', + 'K': '-.-', + 'L': '.-..', + 'M': '--', + 'N': '-.', + 'O': '---', + 'P': '.--.', + 'Q': '--.-', + 'R': '.-.', + 'S': '...', + 'T': '-', + 'U': '..-', + 'V': '...-', + 'W': '.--', + 'X': '-..-', + 'Y': '-.--', + 'Z': '--..', + '1': '.----', + '2': '..---', + '3': '...--', + '4': '....-', + '5': '.....', + '6': '-....', + '7': '--...', + '8': '---..', + '9': '----.', + '0': '-----', + ',': '--..--', + '.': '.-.-.-', + '?': '..--..', + '/': '-..-.', + '-': '-....-', + '(': '-.--.', + ')': '-.--.-', +} + + +# Function to encrypt the string according to the morse code chart +def encrypt(message): + cipher = '' + for letter in message: + if letter != ' ': + + # Looks up the dictionary and adds the correspponding morse code + # along with a space to separate morse codes for different characters + cipher += MORSE_CODE_DICT[letter] + ' ' + else: + # 1 space indicates different character and 2 indicates different words + cipher += ' ' + + return cipher + + +# Function to decrypt the string from morse to english +def decrypt(message): + + # extra space added at the end to access the last morse code + message += ' ' + + decipher = '' + citext = '' + for letter in message: + + # checks for space + if (letter != ' '): + # counter to keep track of space + i = 0 + # storing morse code of a single character + citext += letter + # in case of space + else: + # if i = 1 that indicates a new character + i += 1 + # if i = 2 that indicates a new word + if i == 2: + # adding space to separate words + decipher += ' ' + else: + # accessing the keys using their values (reverse of encryption) + decipher += list(MORSE_CODE_DICT.keys())[list(MORSE_CODE_DICT.values()).index(citext)] + citext = '' + + return decipher + + +def long_pulse(): + GPIO.output(channel, GPIO.HIGH) + sleep(.4) + GPIO.output(channel, GPIO.LOW) + + +def short_pulse(): + GPIO.output(channel, GPIO.HIGH) + sleep(.2) + GPIO.output(channel, GPIO.LOW) + + +def main(): + GPIO.setmode(GPIO.BOARD) + GPIO.setup(channel, GPIO.OUT) + + if len(sys.argv) > 1: + msg = sys.argv[1] + else: + msg = "hello world" + result = encrypt(msg.upper()) + + for i in result: + if i == "-": + long_pulse() + elif i == ".": + short_pulse() + else: + sleep(.5) + sleep(.1) + GPIO.cleanup() + + +# Executes the main function +if __name__ == '__main__': + main() diff --git a/examples/output_examples.py b/examples/output_examples.py new file mode 100755 index 0000000..e1d83c3 --- /dev/null +++ b/examples/output_examples.py @@ -0,0 +1,46 @@ +#!/bin/python3 +# Output and gpio_function examples from Ben Croston +# source: https://sourceforge.net/p/raspberry-gpio-python/wiki/Outputs/ +# source: https://sourceforge.net/p/raspberry-gpio-python/wiki/Checking%20function%20of%20GPIO%20channels/ + +import RPi.GPIO as GPIO + +GPIO.setmode(GPIO.BOARD) +GPIO.setup((11, 12), GPIO.OUT) + +GPIO.output(12, GPIO.HIGH) +# or +GPIO.output(12, 1) +# or +GPIO.output(12, True) + +GPIO.output(12, GPIO.LOW) +# or +GPIO.output(12, 0) +# or +GPIO.output(12, False) + + +chan_list = (11, 12) +GPIO.output(chan_list, GPIO.LOW) # all LOW +GPIO.output(chan_list, (GPIO.HIGH, GPIO.LOW)) # first LOW, second HIGH + +GPIO.setup(11, GPIO.IN) +GPIO.output(12, not GPIO.input(11)) + +GPIO.cleanup(12) + +func = GPIO.gpio_function(12) +print("is channel 12 GPIO.UNKNOWN?", func == GPIO.UNKNOWN) + +print("is channel 12 GPIO.OUT?:", func == GPIO.OUT) + +print("setting up channel 12 as output...") +GPIO.setup(12, GPIO.OUT) + +func = GPIO.gpio_function(12) +print("is channel 12 GPIO.UNKNOWN?", func == GPIO.UNKNOWN) + +print("is channel 12 GPIO.OUT?:", func == GPIO.OUT) + +GPIO.cleanup() diff --git a/examples/pwm.py b/examples/pwm.py deleted file mode 100755 index 060c556..0000000 --- a/examples/pwm.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/python3 -import RPi.GPIO as GPIO -import RPi.GPIO_DEVEL as GPIO_DEVEL -import time - -GPIO.setmode(GPIO.BCM) -# GPIO_DEVEL.setdebuginfo(True) -GPIO.setup(18, GPIO.OUT) -pwm = GPIO.PWM(18, 0.5) -pwm.start(50) - -time.sleep(10) - -for i in range(20): - pwm.ChangeFrequency(2 ** i) - print("set frequency to", 2 ** i) - time.sleep(3) - -pwm.stop() diff --git a/examples/pwm1.py b/examples/pwm1.py new file mode 100755 index 0000000..27b480b --- /dev/null +++ b/examples/pwm1.py @@ -0,0 +1,20 @@ +#!/bin/python3 +import RPi.GPIO as GPIO +import time + +GPIO.setmode(GPIO.BCM) +GPIO.setup(18, GPIO.OUT) +pwm = GPIO.PWM(18, 0.5) +pwm.start(50) + +time.sleep(10) + +try: + for i in range(20): + pwm.ChangeFrequency(2 ** i) + print("set frequency to", 2 ** i) + time.sleep(3) +except KeyboardInterrupt: + pass + +pwm.stop() diff --git a/examples/pwm2.py b/examples/pwm2.py new file mode 100755 index 0000000..b870289 --- /dev/null +++ b/examples/pwm2.py @@ -0,0 +1,25 @@ +#!/bin/python3 +# PWM demo from Ben Croston +# source: https://sourceforge.net/p/raspberry-gpio-python/wiki/PWM/ + +# It works out of the box with no changes! + +import time +import RPi.GPIO as GPIO +GPIO.setmode(GPIO.BOARD) +GPIO.setup(12, GPIO.OUT) + +p = GPIO.PWM(12, 50) # channel=12 frequency=50Hz +p.start(0) +try: + while 1: + for dc in range(0, 101, 5): + p.ChangeDutyCycle(dc) + time.sleep(0.1) + for dc in range(100, -1, -5): + p.ChangeDutyCycle(dc) + time.sleep(0.1) +except KeyboardInterrupt: + pass +p.stop() +GPIO.cleanup() diff --git a/examples/pwm3.py b/examples/pwm3.py new file mode 100755 index 0000000..a72e52f --- /dev/null +++ b/examples/pwm3.py @@ -0,0 +1,42 @@ +#!/bin/python3 +# Another example of PwM usage using the RPi.GPIO API +# By "alex" +# source: https://raspi.tv/2013/rpi-gpio-0-5-2a-now-has-software-pwm-how-to-use-it + +# Don't try to run this as a script or it will all be over very quickly +# it won't do any harm though. +# these are all the elements you need to control PWM on 'normal' GPIO ports +# with RPi.GPIO - requires RPi.GPIO 0.5.2a or higher + +# always needed with RPi.GPIO +import RPi.GPIO as GPIO + + +# choose BCM or BOARD numbering schemes. I use BCM +GPIO.setmode(GPIO.BCM) + +# set GPIO 25 as an output. You can use any GPIO port +GPIO.setup(25, GPIO.OUT) + +# create an object p for PWM on port 25 at 50 Hertz +# you can have more than one of these, but they need +# different names for each port +# e.g. p1, p2, motor, servo1 etc. +p = GPIO.PWM(25, 50) + +# start the PWM on 50 percent duty cycle +# duty cycle value can be 0.0 to 100.0%, floats are OK +p.start(50) + +# change the duty cycle to 90% +p.ChangeDutyCycle(90) + +# change the frequency to 100 Hz (floats also work) +# e.g. 100.5, 5.2 +p.ChangeFrequency(100) + +# stop the PWM output +p.stop() + +# when your program exits, tidy up after yourself +GPIO.cleanup() diff --git a/examples/pwm4.py b/examples/pwm4.py new file mode 100755 index 0000000..8667a50 --- /dev/null +++ b/examples/pwm4.py @@ -0,0 +1,42 @@ +#!/bin/python3 +# A random PwM script pulled from a comment section +# by "Nick" and improved by "AndrewS" (@lurch on GitHub) +# source: https://raspi.tv/2013/rpi-gpio-0-5-2a-now-has-software-pwm-how-to-use-it#comment-29887 +# source: https://raspi.tv/2013/rpi-gpio-0-5-2a-now-has-software-pwm-how-to-use-it#comment-30520 + +import RPi.GPIO as GPIO +import time + +# PWM frequency +HZ = 100 +FADESPEED = 0.002 + +GPIO.setmode(GPIO.BCM) + +gpioPinsList = [["r1", 18], ["g1", 23], ["b1", 25], ["r2", 17], ["g2", 22], ["b2", 24]] +gpioPinsObjs = [] + +# setup GPIO pins as outputs and create PWM objects for each +for i in range(len(gpioPinsList)): + GPIO.setup(gpioPinsList[i][1], GPIO.OUT) + gpioPinsObjs.append(GPIO.PWM(gpioPinsList[i][1], HZ)) + +try: + for pinObj in gpioPinsObjs: + pinObj.start(100) + time.sleep(FADESPEED) + while True: + #fade in + for i in range(101): + for pinObj in gpioPinsObjs: + pinObj.ChangeDutyCycle(0 + i) + time.sleep(FADESPEED) + + #fade out + for i in range(101): + for pinObj in gpioPinsObjs: + pinObj.ChangeDutyCycle(100 - i) + time.sleep(FADESPEED) +except KeyboardInterrupt: + GPIO.cleanup() + pass diff --git a/examples/pwm_usage.py b/examples/pwm_usage.py new file mode 100755 index 0000000..8a1f257 --- /dev/null +++ b/examples/pwm_usage.py @@ -0,0 +1,31 @@ +#!/bin/python3 +# PWM examples from Ben Croston +# source: https://sourceforge.net/p/raspberry-gpio-python/wiki/PWM/ +import RPi.GPIO as GPIO + +GPIO.setmode(GPIO.BOARD) +channel = 11 +frequency = 10 +freq = 100 +dc = 20 + +GPIO.setup(11, GPIO.OUT) + +p = GPIO.PWM(channel, frequency) + +p.start(dc) # where dc is the duty cycle (0.0 <= dc <= 100.0) + +p.ChangeFrequency(freq) # where freq is the new frequency in Hz + +dc = 30 +p.ChangeDutyCycle(dc) # where 0.0 <= dc <= 100.0 + +p.stop() + +GPIO.setup(12, GPIO.OUT) + +p = GPIO.PWM(12, 0.5) +p.start(1) +input('Press return to stop:') # use raw_input for Python 2 +p.stop() +GPIO.cleanup() diff --git a/examples/random_usage.py b/examples/random_usage.py new file mode 100755 index 0000000..a1a2ca3 --- /dev/null +++ b/examples/random_usage.py @@ -0,0 +1,53 @@ +#!/bin/python3 +# Random examples from Ben Croston +# source: https://sourceforge.net/p/raspberry-gpio-python/wiki/BasicUsage/ +import RPi.GPIO as GPIO +import RPi.GPIO_DEVEL as GPIO_DEVEL + +GPIO.setmode(GPIO.BOARD) + +GPIO_DEVEL.Reset() + +GPIO.setmode(GPIO.BCM) + +mode = GPIO.getmode() + +print("Is mode BCM:", mode == GPIO.BCM) + +GPIO.setwarnings(False) + +channel = 18 +GPIO.setup(channel, GPIO.IN) +GPIO.setup(channel, GPIO.OUT) +GPIO.setup(channel, GPIO.OUT, initial=GPIO.HIGH) + +# add as many channels as you want! +# you can tuples instead i.e.: +# chan_list = (11,12) +chan_list = [11, 12] +GPIO.setup(chan_list, GPIO.OUT) + +GPIO.setup(18, GPIO.OUT) + +GPIO.input(channel) + +chan_list = [11, 12] # also works with tuples +GPIO.output(chan_list, GPIO.LOW) # sets all to GPIO.LOW +GPIO.output(chan_list, (GPIO.HIGH, GPIO.LOW)) # sets first HIGH and second LOW + +GPIO.cleanup() + +channel = 18 +channel1 = 19 +channel2 = 21 + +GPIO.cleanup(channel) +GPIO.cleanup((channel1, channel2)) +GPIO.cleanup([channel1, channel2]) + +print("Some useful information:") +print("RPi.GPIO_INFO:", GPIO.RPI_INFO) +print("RPi.GPIO_INFO['P1_REVISION]':", GPIO.RPI_INFO['P1_REVISION']) +print("GPIO.RPI_REVISION", GPIO.RPI_REVISION) # (deprecated) + +print("RPi.GPIO.VERSION:", GPIO.VERSION) diff --git a/examples/try_import.py b/examples/try_import.py new file mode 100755 index 0000000..5894f8c --- /dev/null +++ b/examples/try_import.py @@ -0,0 +1,17 @@ +#!/bin/python3 +# An example of how one may import this library +# by Ben Croston +# Source: https://sourceforge.net/p/raspberry-gpio-python/wiki/BasicUsage/ + +try: + import RPi.GPIO as GPIO +except RuntimeError: + print("Error importing RPi.GPIO! \ + This is probably because you need superuser privileges. \ + You can achieve this by using 'sudo' to run your script") + + +# Note that one may use this library as a non-superuser +# by setting the Linux file group to a group such as gpi +# and adding a user to that group with a command +# like: `sudo usermod -aG gpio ` diff --git a/non_root_permission.sh b/non_root_permission.sh index 1c5d6fb..d34ba49 100755 --- a/non_root_permission.sh +++ b/non_root_permission.sh @@ -3,6 +3,10 @@ # Inspiration: https://forum.armbian.com/topic/8714-gpio-not-working-for-non-root/?do=findComment&comment=86295 # Run this script to enable all users in Linux user group gpio to access the gpio pins +# NOTE: the changes made by this script do _not_ persist after reboot. +# Until we provide an automated solution, we suggest that one +# place this snippet of code in a script that runs at system startup. + if [ "$UID" != "0" ] then echo "Script must be run as root! quitting..." diff --git a/requirements.txt b/requirements.txt index dce821f..c448acf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,20 @@ -pytest==5.2.2 -pytest-cov==2.8.1 -flake8==3.7.9 +bcrypt==3.1.6 +cffi==1.12.3 +coverage==4.5.4 +cryptography==2.6.1 +dbus-python==1.2.8 +gpg==1.13.1 +libcomps==0.1.15 +psutil==5.6.7 +pwquality==1.4.2 +pycairo==1.18.1 +PyGObject==3.34.0 +PyNaCl==1.3.0 +pyparted==3.11.2 +pyrsistent==0.16.0 +PyYAML==5.3.1 +-e git+https://github.com/underground-software/python3-libgpiod-rpi@00e7588bab1489cd81c1ee62349d4d4122d96e3d#egg=RPi.GPIO +rpm==4.15.1 +selinux==2.9 +setools==4.2.2 +systemd-python==234 diff --git a/spec/spec.pdf b/spec/spec.pdf index 46424d7..51cb122 100644 Binary files a/spec/spec.pdf and b/spec/spec.pdf differ diff --git a/spec/spec.tex b/spec/spec.tex index 7c1e416..97affa5 100644 --- a/spec/spec.tex +++ b/spec/spec.tex @@ -26,7 +26,7 @@ \subsection{Scope} \subsection{Overview} -We begin with a a discussion of the problem and our proposed solution. Then, we define first the high-level functional requirements for version 1.0 and then what each API function should do and not do. Finally, we give a high-level description of the data structures and algorithms used to implement the system. +We begin with a discussion of the problem and our proposed solution. Then, we define first the high-level functional requirements for version 1.0 and then what each API function should do and not do. Finally, we give a high-level description of the data structures and algorithms used to implement the system. \subsection{Definitions and Acronyms} @@ -175,10 +175,32 @@ \subsection{Core \texttt{RPi.GPIO} API} \label{coreapi} \item ValueError --- Invalid channel, edge is not one of RISING, FALLING, or BOTH, or a negative bouncetime is specified \end{itemize} +\noindent \texttt{RPi.GPIO}.\textbf{channel\_valid\_or\_die}(channel) + +Validate \texttt{channel} using the current numbering mode. + +If the channel is valid, the function returns with no effect. +If the channel is invalid, a \texttt{ValueError} exception is raised. + +See getmode()/setmode() in section \ref{coreapi} for more information about numbering modes. + +\textit{parameters}: +\begin{enumerate} + \item channel --- GPIO channel +\end{enumerate} + +\textit{exceptions}: +\begin{itemize} + \item ValueError --- Invalid channel +\end{itemize} + \noindent \texttt{RPi.GPIO}.\textbf{cleanup}([channel]) Cleans up library state. Resets all GPIO lines that have been used by this program to INPUT with neither PUD\_UP or PUD\_DOWN set nor event detection enabled on the line. When called with no arguments, this function targets every channel, otherwise, it targets only the channels specified. Subsequent to the actions of cleanup() on a channel, one must pass it to setup() to use it again. +This function is automatically invoked with no parameters +upon terminaton of a python interpreter using this module. + \textit{parameters}: \begin{enumerate} \item channel (optional) --- individual GPIO channel or list/tuple of GPIO channels @@ -252,6 +274,7 @@ \subsection{Core \texttt{RPi.GPIO} API} \label{coreapi} \noindent \texttt{RPi.GPIO}.\textbf{output}(channel) Set the value of an individual GPIO channel or a list/tuple of GPIO channels with an individual value or a list/tuple of values respectively. +One may specify multiple channels and a single value for each channel to be set to the same value. \textit{parameters}: \begin{enumerate} @@ -348,6 +371,8 @@ \subsection{Core \texttt{RPi.GPIO} API} \label{coreapi} Then, the 0-ary callable objects stored in the list \texttt{\_State.lines[channel].callbacks} will be serially invoked. If one subsequently invokes \texttt{event\_dected(channel)}, the event detected by this function will \textit{not} be reported. +Returns \texttt{channel} if any event is detected, otherwise returns None. + \textit{parameters}: \begin{enumerate} \item channel --- GPIO channel @@ -370,6 +395,8 @@ \subsection{Core \texttt{RPi.GPIO} API} \label{coreapi} % TODO: return to this issue % % Also, should this spec include OSError exceptions we introduce? + % \item RuntimeError --- edge detection is already configured on this channel + % Yeah you're allowed to do that ^^ with this library as well, unlike the original RPi.GPIO \end{itemize} \subsection{Class \texttt{RPi.GPIO.PWM}} \label{pwm} @@ -682,7 +709,8 @@ \subsection{Data Design: Class \_Line} The GPIO channel corresponding to this line. -\textit{Default value:} An integer passed to \texttt{\_\_init\_\_} via \texttt{Reset()}. See \texttt{\_State.lines} in section \ref{class_state}. +\textit{Default value:} An integer passed to \texttt{\_\_init\_\_} via \texttt{Reset()}. +See \texttt{\_State.lines} in section \ref{class_state}. \noindent \_Line.\textbf{line} @@ -752,9 +780,8 @@ \subsection{The \texttt{channel\_} and \texttt{chip\_} interfaces} \noindent channel\_\textbf{fix\_and\_validate}(channel) -\noindent channel\_\textbf{valid\_or\_die}(channel) - -Validate \texttt{channel} using the current numbering mode. See getmode()/setmode() in section \ref{coreapi} for more information about numbering modes. The first name is used internally while the latter name is currently not called within the library but may be specified in a future version of the core API. Exposure of this functionality is undefined in version 1.0 +Validate \texttt{channel} using the current numbering mode. +See getmode()/setmode() in section \ref{coreapi} for more information about numbering modes. \textit{parameters}: \begin{enumerate} @@ -792,6 +819,11 @@ \subsection{The \texttt{channel\_} and \texttt{chip\_} interfaces} \item PermissionError --- Unable to open file ``/dev/gpiochip0'', perhaps because script was not run as root. \end{itemize} +\noindent chip\_\textbf{is\_open}() + +Returns the truth value of whether the chip has been initialized. +True if yes, False if no. + \subsection{The \texttt{line\_} interface} This section defines the internal interface functions used to implement operations at the \texttt{gpiod.Line} level. External exposure of these functions is undefined behavior. @@ -824,7 +856,7 @@ \subsection{The \texttt{line\_} interface} \noindent line\_\textbf{event\_wait\_lock}(channel, bouncetime, track) [first: LOCK REQUIRED, second: LOCKS channel] -Wait for an edge event of type previously specified in edge event detection setup. Calls \texttt{gpiod.Line.event\_wait()}. Returns the event returned by \texttt{gpiod.Line.event\_read()} if one occurred and returns None otherwise. +Wait for an edge event of type previously specified in edge event detection setup. Calls \texttt{gpiod.Line.event\_wait()}. Returns the channel number if an event occurred and returns None otherwise. \textit{parameters}: \begin{enumerate} diff --git a/test-style.sh b/test-style.sh index e33d271..11bb782 100755 --- a/test-style.sh +++ b/test-style.sh @@ -21,7 +21,7 @@ scan() { fi } -scan RPi/_GPIO.py +scan RPi/core.py scan tests/test_gpio.py scan tests/test_pwm.py @@ -30,7 +30,7 @@ scan RPi/GPIO_DEVEL/__init__.py for f in $(ls examples) do - if [ "$f" != "__pycache__" ] && [ "$f" != "morse.py" ] + if [ "$f" != "__pycache__" ] then scan examples/"$f" fi diff --git a/tests/test_gpio.py b/tests/test_gpio.py index 928aef9..67a19fa 100644 --- a/tests/test_gpio.py +++ b/tests/test_gpio.py @@ -76,10 +76,10 @@ def test_validate_pin_or_die(): GPIO_DEVEL.Reset() GPIO.setmode(GPIO.BOARD) with pytest.raises(ValueError): - channel = GPIO.channel_valid_or_die(-666) # NOQA + channel = GPIO.channel_valid_or_die(-666) # NOQA with pytest.raises(ValueError): - channel = GPIO.channel_valid_or_die(1) # NOQA + channel = GPIO.channel_valid_or_die(41) # NOQA def test_setmode(): @@ -383,12 +383,22 @@ def test_gpio_function(): GPIO_DEVEL.Reset() GPIO.setmode(GPIO.BCM) - assert GPIO.gpio_function(16) == 16 + assert GPIO.gpio_function(18) == GPIO.UNKNOWN + + GPIO.setup(18, GPIO.OUT) + assert GPIO.gpio_function(18) == GPIO.OUT + GPIO.setup(18, GPIO.IN) + assert GPIO.gpio_function(18) == GPIO.IN GPIO_DEVEL.Reset() GPIO.setmode(GPIO.BOARD) - assert GPIO.gpio_function(16) == 11 + assert GPIO.gpio_function(12) == GPIO.UNKNOWN + + GPIO.setup(12, GPIO.OUT) + assert GPIO.gpio_function(12) == GPIO.OUT + GPIO.setup(12, GPIO.IN) + assert GPIO.gpio_function(12) == GPIO.IN def test_setdebuginfo(): @@ -401,6 +411,8 @@ def test_setdebuginfo(): assert GPIO_DEVEL.State_Access().debuginfo is True + GPIO_DEVEL.setdebuginfo(False) + def test_bias(): GPIO_DEVEL.Reset() @@ -524,4 +536,16 @@ def test_cleanup(): GPIO_DEVEL.Reset() GPIO.setmode(GPIO.BCM) GPIO.setup(21, GPIO.OUT) + + with pytest.raises(ValueError) as e: + GPIO.cleanup(["foo"]) + assert "Channel must be an integer or list/tuple of integers" in str(e.value) + + with pytest.raises(ValueError) as e: + GPIO.cleanup(-1) + assert "is invalid" in str(e.value) + + GPIO.cleanup(21) + GPIO.cleanup((18, 21)) + GPIO_DEVEL.Reset() diff --git a/tests/test_pwm.py b/tests/test_pwm.py index 8f4ae71..1ef72a0 100644 --- a/tests/test_pwm.py +++ b/tests/test_pwm.py @@ -33,13 +33,28 @@ def test_init(): def test_start_stop(): GPIO_DEVEL.Reset() - GPIO_DEVEL.setdebuginfo(True) GPIO.setmode(GPIO.BCM) GPIO.setup(18, GPIO.OUT) foo = GPIO.PWM(18, 100) - foo.start(50) + with pytest.raises(ValueError) as e: + foo.start(-1) + assert "dutycycle must have a value from 0.0 to 100.0" in str(e.value) + + with pytest.raises(ValueError) as e: + foo.start(101) + assert "dutycycle must have a value from 0.0 to 100.0" in str(e.value) + + assert foo.start(50) + + # Can't run start twice but it won't raise an exception + with pytest.warns(Warning): + assert foo.start(51) is False + + time.sleep(.2) + foo.stop() + time.sleep(.2) foo.stop() @@ -47,7 +62,6 @@ def test_start_stop(): def test_change_attributes(): GPIO_DEVEL.Reset() - GPIO_DEVEL.setdebuginfo(True) GPIO.setmode(GPIO.BCM) GPIO.setup(18, GPIO.OUT)