Source code for autopilot.hardware.gpio

"""
Hardware that uses the GPIO pins of the Raspi. These classes rely on `pigpio
<http://abyz.me.uk/rpi/pigpio/>`_, whose daemon (pigpiod) must be running in the background --
typically this is handled with a launch script/system daemon (see the launch_pilot.sh script generated by setup_autopilot.py)

Autopilot uses a custom version of pigpio (`<https://github.com/sneakers-the-rat/pigpio>`_) that
returns isoformatted timestamps rather than tick numbers in callbacks. See the ``setup_pilot.sh`` script.

Note:
    Autopilot uses the "Board" rather than "Broadcom" numbering system, see :ref:`the numbering note. <numbering-note>`
    :class:`.GPIO` objects convert internally between board and bcm numbers using :attr:`.GPIO.pin` ,
    :attr:`.GPIO.pin_bcm` , :data:`.BOARD_TO_BCM` , and :data:`.BCM_TO_BOARD` .


Note:
    This module does not include hardware that uses the GPIO pins over a specific protocol like i2c
"""
import os
import sys
import threading
import time
import numpy as np
from datetime import datetime
import itertools
import typing
import warnings
from collections import deque as dq

from autopilot import prefs
from autopilot.hardware import Hardware, BOARD_TO_BCM
from autopilot import external

ENABLED = False
"""
False if pigpio cannot be imported -- and GPIO devices cannot be used.

True if pigpio can be imported
"""
try:
    import pigpio


    TRIGGER_MAP = {
        'U': pigpio.RISING_EDGE,
        1: pigpio.RISING_EDGE,
        True: pigpio.RISING_EDGE,
        'D': pigpio.FALLING_EDGE,
        0: pigpio.FALLING_EDGE,
        False: pigpio.FALLING_EDGE,
        'B': pigpio.EITHER_EDGE,
        (0,1): pigpio.EITHER_EDGE
    }
    """
    Maps user input descriptions of triggers to the corresponding pigpio object.
    """

    INVERSE_TRIGGER_MAP = {
        pigpio.RISING_EDGE: 'U',
        pigpio.FALLING_EDGE: 'D',
        pigpio.EITHER_EDGE: 'B'
    }
    """
    Inverse of :data:`.TRIGGER_MAP`. Used to assign canonical references to triggers -- 
    ie. it is possible to take multiple params (1, True, 'U') -> pigpio trigger objects,
    but there is one preferred way to refer to a pigpio object.
    """

    PULL_MAP = {
        1: pigpio.PUD_UP,
        True: pigpio.PUD_UP,
        'U': pigpio.PUD_UP,
        0: pigpio.PUD_DOWN,
        False: pigpio.PUD_DOWN,
        'D': pigpio.PUD_DOWN,
        None: pigpio.PUD_OFF
    }
    """
    Maps user input descriptions of internal resistor pullups/downs to the corresponding
    pigpio object.
    """

    INVERSE_PULL_MAP = {
        pigpio.PUD_UP: 'U',
        pigpio.PUD_DOWN: 'D',
        pigpio.PUD_OFF: None
    }
    """
    Inverse of :data:`.PULL_MAP`, mapping pigpio objects for internal resistor pullups/downs to 
    their canonical form ('U', 'D', None for pullup, pulldown, or no pull)
    """

    ENABLED = True

except ImportError:
    if prefs.get('AGENT') == "PILOT":
        warnings.warn("pigpio could not be imported, gpio not enabled", ImportWarning)


[docs]def clear_scripts(max_scripts=256): """ Stop and delete all scripts running on the pigpio client. To be called, eg. between tasks to ensure none are left hanging by badly behaved GPIO devices Args: max_scripts (int): maximum number of scripts allowed by pigpio. Set in ``pigpio.c`` and not exported to the python module, so have to hardcode it again here, default for pigpio fork is 256 """ if globals()['ENABLED'] == False: warnings.warn('pigpio was not imported, so scripts cannot be cleared') return pig = pigpio.pi() for i in range(max_scripts): try: pig.stop_script(i) except pigpio.error as e: if 'unknown script id' in str(e): continue try: pig.delete_script(i) except Exception as e: warnings.warn(f'Error deleting script {i}, got exception: \n{e}')
[docs]class GPIO(Hardware): """ Metaclass for hardware that uses GPIO. Should not be instantiated on its own. Handles initializing pigpio and wraps some of its commonly used methods Args: pin (int): The `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin of this object. polarity (int): Logic direction. if 1: on=High=1, off=Low=0; if 0: off=Low=0, on=High=1 pull (str, int): state of pullup/down resistor. Can be set as 'U'/'D' or 1/0 to pull up/down. See :data:`.PULL_MAP` trigger (str, int, bool): whether callbacks are triggered on rising ('U', 1, True), falling ('D', 0, False), or both edges ('B', (0,1)) kwargs: passed to the :class:`.Hardware` superclass. Attributes: pig (:class:`pigpio.pi`): An object that manages connection to the pigpio daemon. See docs at http://abyz.me.uk/rpi/pigpio/python.html CONNECTED (bool): Whether the connection to pigpio was successful pigpiod: Reference to the pigpiod process launched by :func:`.external.start_pigpiod` pin (int): The `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin of this object. pin_bcm (int): The BCM number of the connected pin -- used by pigpio. Converted from pin passed as argument on initialization, which is assumed to be the board number. pull (str, int): state of pullup/down resistor. Can be set as 'U'/'D' or 1/0 to pull up/down polarity (int): Logic direction. if 1: on=High=1, off=Low=0; if 0: off=Low=0, on=High=1 on (int): if polarity == 1, high/1. if polarity == 0, low/0 off (int): if polarity == 1, low/0. if polarity == 0, high/1 trigger (str, int, bool): whether callbacks are triggered on rising ('U', 1, True), falling ('D', 0, False), or both edges ('B', (0,1)) trigger_edge: The pigpio object representing RISING_EDGE, FALLING_EDGE, BOTH_EDGES. Set by :attr`.trigger` """ def __init__(self, pin=None, polarity=1, pull = None, trigger = None, **kwargs): super(GPIO, self).__init__(**kwargs) if not ENABLED: raise RuntimeError('Couldnt import pigpio, so GPIO objects cant be used') # initialize attributes self._polarity = None self._pull = None self._pin = None self._trigger = None self.trigger_edge = None try: self.pin_bcm = None except AttributeError: # if a subclass has made this a property, don't fail here self.logger.warning('pin_bcm is defined as a property without a setter so cant be set') self.pig = None # type: typing.Optional[pigpio.pi] self.pigpiod = None # init pigpio self.CONNECTED = False self.CONNECTED = self.init_pigpio() # set default attributes self.pin = pin self.polarity = polarity self.pull = pull self.trigger = trigger if not self.CONNECTED: RuntimeError('No connection could be made to the pigpio daemon')
[docs] def init_pigpio(self) -> bool: """ Create a socket connection to the pigpio daemon and set as :attr:`GPIO.pig` Returns: bool: True if connection was successful, False otherwise """ self.pigpiod = external.start_pigpiod() self.pig = pigpio.pi() if self.pig.connected: return True else: return False
@property def pin(self): """ `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin. When assigned, also updates :attr:`.pin_bcm` with the BCM-numbered pin. """ return self._pin @pin.setter def pin(self, pin): if pin is None: return assert(int(pin)) self._pin = int(pin) self.pin_bcm = BOARD_TO_BCM[self._pin] @property def state(self) -> bool: """ Instantaneous state of GPIO pin, on (``True``) or off (``False``) Returns: bool """ return bool(self.pig.read(self.pin_bcm)) @property def pull(self): """ State of internal pullup/down resistor. See :data:`.PULL_MAP` for possible values. Returns: int: 'U'/'D'/None for pulled up, down or not set. """ return self._pull @pull.setter def pull(self, direction): try: pull = PULL_MAP[direction] self._pull = INVERSE_PULL_MAP[pull] self._pull = direction self.pig.set_pull_up_down(self.pin_bcm, pull) except KeyError: ValueError("pull must be one of 0/'D'/False for pulldown, 1/'U'/True for pullup, or None to clear") @property def polarity(self): """ Logic direction. if 1: on=High=1, off=Low=0; if 0: off=Low=0, on=High=1. When set, updates :attr:`~.GPIO.on` and :attr:`~.GPIO.off` accordingly """ return self._polarity @polarity.setter def polarity(self, polarity): if polarity == 1: self.on = 1 self.off = 0 elif polarity == 0: self.on = 0 self.off = 1 else: raise ValueError('polarity must be 0 or 1') self._polarity = polarity @property def trigger(self): """ dict: Maps strings (('U',1,True), ('D',0,False), ('B',[0,1])) to pigpio edge types (RISING_EDGE, FALLING_EDGE, EITHER_EDGE), respectively. """ return self._trigger @trigger.setter def trigger(self, trigger): try: self.trigger_edge = TRIGGER_MAP[trigger] self._trigger = INVERSE_TRIGGER_MAP[self.trigger_edge] except KeyError: ValueError('trigger must be one of {}. instead got {}'.format(INVERSE_TRIGGER_MAP.values(), trigger))
[docs] def release(self): """ Release the connection to the pigpio daemon. Note: the Hardware metaclass will call this method on object deletion. """ self.logger.debug('releasing') try: self.pull = None except: pass self.pig.stop()
[docs]class Digital_Out(GPIO): """ TTL/Digital logic out through a GPIO pin. Args: pin (int): The `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin of this object pulse_width (int): Width of digital output :meth:`~.Digital_Out.pulse` (us). range: 1-100 polarity (bool): Whether 'on' is High (1, default) and pulses bring the voltage High, or vice versa (0) Attributes: scripts (dict): maps script IDs to pigpio script handles pigs_function (bytes): when using pigpio scripts, what function is used to set the value of the output? (eg. 'w' for digital out, 'gdc' for pwm, more info here: http://abyz.me.uk/rpi/pigpio/pigs.html) script_counter (:class:`itertools.count`): generate script IDs if not explicitly given to :meth:`~.Digital_Out.series`. generated IDs are of the form 'series_#' """ output = True type="DIGITAL_OUT" pigs_function = b"w" def __init__(self, pin=None, pulse_width=100, polarity=1, **kwargs): """ """ super(Digital_Out, self).__init__(pin, polarity=polarity, **kwargs) self._last_script = None self.scripts = {} self.script_handles = {} self.script_counter = itertools.count() self.pulse_width = np.clip(pulse_width, 0, 100).astype(np.int) if pulse_width > 100 or pulse_width < 0: Warning('pulse_width must be <100(ms) & >0 and has been clipped to {}'.format(self.pulse_width)) # setup pin if we are a true single-pin digital output and not a subclass if isinstance(self.pin_bcm, int): self.pig.set_mode(self.pin_bcm, pigpio.OUTPUT) self.set(self.off)
[docs] def set(self, value: bool): """ Set pin logic level. Default uses :meth:`pigpio.pi.write`, but can be overwritten by inheriting classes Stops the last running script when called. Args: value (int, bool): (1, True) to set High, (0, False) to set Low. """ self.stop_script() self.pig.write(self.pin_bcm, value)
[docs] def turn(self, direction='on'): """ Change output state using on/off parlance. logic direction varies based on :attr:`Digital_Out.polarity` Stops the last running script when called. Args: direction (str, bool): 'on', 1, or True to turn to :attr:`~.Digital_Out.on` and vice versa for :attr:`~.Digital_Out.off` """ self.stop_script() if direction in ('on', True, 1): self.set(self.on) elif direction in ('off', False, 0): self.set(self.off)
[docs] def toggle(self): """ If pin is High, set Low, and vice versa. Stops the last running script when called. """ self.stop_script() if self.pig.read(self.pin_bcm): self.set(0) else: self.set(1)
[docs] def pulse(self, duration=None): """ Send a timed :attr:`~Digital_Out.on` pulse. Args: duration (int): If None (default), uses :attr:`~.Digital_Out.duration`, otherwise duration of pulse from 1-100us. """ if not duration: self.pig.gpio_trigger(self.pin_bcm, self.pulse_width, self.on) elif duration: duration = np.clip(duration, 0, 100).astype(np.int) self.pig.gpio_trigger(self.pin_bcm, duration, self.on) else: raise RuntimeError('How did we even get here? Pulse value must be an int 1-100')
[docs] def _series_script(self, values, durations = None, unit="ms", repeat=None, finish_off = True): """ Create a pigpio script to set a pin to a series of values for a series of durations. Typically shouldn't be called by itself, is used by :meth:`~.Digital_Out.series` or :meth:`~.Digital_Out.store_series` For more information on pigpio scripts, see: `<http://abyz.me.uk/rpi/pigpio/pigs.html#Scripts>`_ Args: values (list): A list of tuples of (value, duration) or a list of values in (1,0) to set self.pin_bcm to. durations (list): If ``values`` is not a list of tuples, a list of durations. len(durations) must be either == len(values) or else len(durations) == 1, in which case the duration is repeated. unit ("ms", "us"): units of durations in milliseconds or microseconds repeat (int): If the script should be repeated, how many times? A value of 2 results in the script being run 2 times total, not 2 *additional* times (or, 3 total times) finish_off (bool): If true, the script ends by turning the pin to :attr:`~.Digital_Out.off` Returns: (str): the constructed script string """ # if single value as an int/float/etc, wrap in list. if not (isinstance(values, list) or isinstance(values, tuple)): values = [values] if durations: if isinstance(durations, float) or isinstance(durations, int): durations = [round(durations)] if len(values) == len(durations): iter_series = zip(values, durations) elif len(durations) == 1: iter_series = itertools.product(values, durations) else: raise ValueError("length of values and durations must be equal, or length of durations must be 1. got len(values)={}, len(durations)={}".format(len(values), len(durations))) else: iter_series = values.__iter__() if unit == "ms": wait_fn = "mils" elif unit == "us": wait_fn = "mics" else: raise ValueError("Unit for durations must be ms (milliseconds) or us (microseconds)") string_pieces = [b" ".join((self.pigs_function, str(self.pin_bcm).encode('utf-8'), str(val).encode('utf-8'), bytes(wait_fn, 'utf-8'), str(round(dur)).encode('utf-8'))) for val, dur in iter_series] script_str = b" ".join(string_pieces) if repeat: try: repeat = int(repeat) except: raise ValueError('Repeat must be coerceable to an integer, got {}'.format(repeat)) script_str = b" ".join(("LD v0", str(repeat-1), # "LD (load) variable 0 with number of repeats" "tag 999", # create a tag that can be returned to script_str, # do the script one time "dcr v0" # decrement v0. "jp 999") # jump to the tag at the beginning if v0 is still positive ) # turn off when finished if finish_off: script_str += b" ".join((self.pigs_function, str(self.pin_bcm).encode('utf-8'), str(self.off).encode('utf-8'))) return script_str
[docs] def store_series(self, id, **kwargs): """ Create, and store a pigpio script for a series of output values to be called by :meth:`~.Digital_Out.series` Args: id (str): shorthand key used to call this series with :meth:`.series` kwargs: passed to :meth:`~.Digital_Out._series_script` """ # actually might want to overwrite script_handles w/ same ids, like a solenoid that changes its 'open' duration # if id in self.script_handles.keys(): # Exception("Script with id {} already created!".format(id)) series_script = self._series_script(**kwargs) # check if there is an identical script already and use that instead matches = [script_id for script_id, test_script in self.scripts.items() if test_script == series_script] if len(matches)>0: script_id = self.script_handles[matches[0]] else: try: script_id = self.pig.store_script(series_script) except Exception as e: if 'illegal script command' in str(e): raise Exception(f'got pigpio exception: {e} from attempted script {series_script}') else: raise e self.script_handles[id] = script_id self.scripts[id] = series_script
[docs] def series(self, id=None, delete=None, **kwargs): """ Execute a script that sets the pin to a series of values for a series of durations. See :meth:`~.Digital_Out._series_script` for series parameterization. Ideally one would use :meth:`.store_series` and use the returned id to call this function. Otherwise, this method calls :meth:`.store_series` and runs it. Args: id (str, int): ID of the script, if not already created, created with :meth:`.store_script`. If None (default), an ID is generated with :attr:`~.Digital_Out.script_counter` of the form ``'script_#'`` kwargs: passed to :meth:`~.Digital_Out._series_script` """ return_id = False if id: if id not in self.script_handles.keys(): self.store_series(id, **kwargs) # if we weren't called with an ID, have to have a list of values/etc to create a script else: id = "script_{}".format(next(self.script_counter)) self.store_series(id, **kwargs) # if we haven't been explicitly told to not delete it, we should if delete is None: delete = True # since we've generated a script ID, we should return it return_id = True script_status = self.pig.script_status(self.script_handles[id]) if script_status == pigpio.PI_SCRIPT_INITING: check_times = 0 while self.pig.script_status(self.script_handles[id]) == pigpio.PI_SCRIPT_INITING: # TODO: Expose this as a parameter -- how long to try and init scripts before skipping, mebs some general 'timeout' variable for all blocking ops. time.sleep(0.005) check_times += 1 if check_times > 200: break try: self.pig.run_script(self.script_handles[id]) self._last_script = id except pigpio.error as e: self.logger.exception(f'Couldnt run script: {e}') finally: if delete: self.delete_script(id) if return_id: return id
[docs] def delete_script(self, script_id): """ spawn a thread to delete a script with id ``script_id`` This is a 'soft' deletion -- it checks if the script is running, and waits for up to 10 seconds before actually deleting it. The script is deleted from the pigpio daemon, from ``script_handles`` and from ``scripts`` Args: script_id (str): a script ID in :attr:`.Digital_Out.script_handles` """ delete_script = threading.Thread(target=self._delete_script, args=(script_id,)) delete_script.start()
def _delete_script(self, script_id): checktimes = 0 while self.pig.script_status(self.script_handles[script_id]) == pigpio.PI_SCRIPT_RUNNING: time.sleep(1) checktimes += 1 if checktimes > 10: continue self.pig.delete_script(self.script_handles[script_id]) del self.scripts[self.script_handles[script_id]] del self.script_handles[script_id]
[docs] def delete_all_scripts(self): """ Stop and delete all scripts This is a "hard" deletion -- the script will be immediately stopped if it's running. """ for script_handle, script_id in self.script_handles.items(): try: self.stop_script(script_handle) except Exception as e: self.logger.exception(e) try: self.pig.delete_script(script_id) except AttributeError: pass except Exception as e: if 'unknown script id' in str(e): pass else: self.logger.exception(e) try: del self.scripts[self.script_handles[script_id]] except KeyError: pass try: del self.script_handles[script_id] except KeyError: pass
[docs] def stop_script(self, id=None): """ Stops a running pigpio script Args: id (str, none): If None, stops the last run script. if str, stops script with that id. """ if not id: id = self._last_script if id is None: # if there was no passed id and _last_script is None, return peacefully return # if we were passed a keyed, named script id, convert it to the pigio integer script id if id in self.script_handles.keys(): script_id = self.script_handles[id] status, _ = self.pig.script_status(script_id) # if the script is still running, stop it and set the pin to off. if status == pigpio.PI_SCRIPT_RUNNING: self.pig.stop_script(script_id) #self.set(self.off) # set last_script to None so we won't try to double-stop this script self._last_script = None else: self.logger.warning('No script with id {} found to stop'.format(id))
[docs] def release(self): """ Stops and deletes all scripts, sets to :attr:`~.Digital_Out.off`, and calls :meth:`.GPIO.release` """ self.logger.debug('releasing') try: self.delete_all_scripts() #self.stop_script() self.set(self.off) time.sleep(0.1) except AttributeError: # self.pig has already been deleted (release has already been called) # so self.pig has no attribute 'send' because it is None pass finally: super(Digital_Out, self).release()
[docs]class Digital_In(GPIO): """ Record digital input and call one or more callbacks on logic transition. Args: pin (int): `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin. event (:class:`threading.Event`): For callbacks assigned with :meth:`.assign_cb` with ``evented = True``, set this event whenever the callback is triggered. Can be used to handle stage transition logic here instead of the :class:`.Task` object, as is typical. record (bool): Whether all logic transitions should be recorded as a list of ('EVENT', 'Timestamp') tuples. max_events (int): Maximum size of the :attr:`.events` deque **kwargs: passed to :class:`GPIO` Sets the internal pullup/down resistor to :attr:`.Digital_In.off` and :attr:`.Digital_In.trigger` to :attr:`.Digital_In.on` upon instantiation. Note: pull and trigger are set by polarity on initialization in digital inputs, unlike other GPIO classes. They are not mutually synchronized however, ie. after initialization if any one of these attributes are changed, the other two will remain the same. Attributes: pig (:meth:`pigpio.pi`): The pigpio connection. pin (int): Broadcom-numbered pin, converted from the argument given on instantiation callbacks (list): A list of :meth:`pigpio.callback`s kept to clear them on exit polarity (int): Logic direction, if 1: off=0, on=1, pull=low, trigger=high and vice versa for 0 events (list): if :attr:`.record` is True, a deque of ('EVENT', 'TIMESTAMP') tuples of length ``max_events`` """ is_trigger=True type = 'DIGI_IN' input = True def __init__(self, pin, event=None, record=True, max_events=256, **kwargs): """ """ super(Digital_In, self).__init__(pin, **kwargs) # pull the resistor in the off direction and set the trigger to be the on direction self.pull = self.off self.trigger = self.on # We can be passed a threading.Event object if we want to handle stage logic here # rather than in the parent as is typical. self.event = event # List to store callback handles assigned with assign_cb self.callbacks = [] # List to store logic transition events self.events = dq(maxlen=max_events) self.record = record if self.record: self.assign_cb(self.record_event, add=True, evented=False, manual_trigger='B') # Setup pin self.pig.set_mode(self.pin_bcm, pigpio.INPUT)
[docs] def assign_cb(self, callback_fn, add=True, evented=False, manual_trigger=None): """ Sets ``callback_fn`` to be called when :attr:`.Digital_In.trigger` is detected. ``callback_fn`` must accept three parameters: * GPIO (int, 0-31): the BCM number of the pin that was triggered * level (0-2): * 0: change to low (falling) * 1: change to high (rising) * 2: no change (watchdog timeout) * timestamp (str): If using the Autopilot version of pigpio, an isoformatted timestamp Args: callback_fn (callable): The function to be called when triggered add (bool): Are we adding another callback? If False, the previous callbacks are cleared. evented (bool): Should triggering this event also set the internal :attr:`~.Digital_In.event`? Note that :attr:`.Digital_In.event` must have been passed. manual_trigger ('U', 'D', 'B'): Override :attr:`.Digital_In.trigger` if needed. """ # If we aren't adding, we clear any existing callbacks if not add: self.clear_cb() if self.record: # if we're clearing all callbacks (maybe by accident) # but we're configured to record events, re-add the record cb. self.assign_cb(self.record_event, add=True, evented=False, manual_trigger="B") # We can set the direction of the trigger manually, # for example if we want to set 'BOTH' only sometimes if not manual_trigger: trigger_ud = self.trigger_edge else: trigger_ud = TRIGGER_MAP[manual_trigger] # We can handle eventing (blocking) here if we want (usually this is handled in the parent) # This won't work if we weren't init'd with an event. if evented and self.event: cb = self.pig.callback(self.pin_bcm, trigger_ud, self.event.set) self.callbacks.append(cb) elif evented and not self.event: raise Exception('We have no internal event to set!') cb = self.pig.callback(self.pin_bcm, trigger_ud, callback_fn) self.callbacks.append(cb)
[docs] def clear_cb(self): """ Tries to call `.cancel()` on each of the callbacks in :attr:`~Digital_In.callbacks` """ for cb in self.callbacks: try: cb.cancel() except: self.logger.warning('could not cancel callback: {}'.format(cb)) self.callbacks = []
[docs] def record_event(self, pin, level, timestamp): """ On either direction of logic transition, record the time Args: pin (int): BCM numbered pin passed from pigpio level (bool): High/Low status of current pin timestamp (str): isoformatted timestamp """ # use self.pin rather than incoming pin b/c method is bound # (ie. only will be called by pin assigned to) # and self.pin is board rather than bcm numbered self.events.append((self.pin, level, timestamp))
[docs] def release(self): """ Clears any callbacks and calls :meth:`GPIO.release` """ self.logger.debug('releasing') self.clear_cb() super(Digital_In, self).release()
[docs]class PWM(Digital_Out): """ PWM output from GPIO. """ output = True type="PWM" pigs_function = b'pwm' def __init__(self, pin, range=255, **kwargs): """ Args: pin (int): Board numbered GPIO pin range (int): Maximum value of PWM duty-cycle. Default 255. **kwargs: passed to :class:`Digital_Out` """ self._range = None super(PWM, self).__init__(pin, **kwargs) self.range = range #self.pig.set_mode(self.pin_bcm, pigpio.OUTPUT) # set to off in case of common anode/inverted polarity self.set(self.off)
[docs] def set(self, value): """ Sets PWM duty cycle normalized to :attr:`.polarity` and transformed by :meth:`._clean_value` Stops the last running script Args: value (int, float): - if int > 1, sets value (or :attr:`PWM.range`-value if :attr:`PWM.polarity` is inverted). - if 0 <= float <= 1, transforms to a proportion of :attr:`.range` (inverted if needed as well). """ value = self._clean_value(value) self.stop_script() self.pig.set_PWM_dutycycle(self.pin_bcm, value)
def _clean_value(self, value): if value > 1: if value > self.range: self.logger.warning('clipping {} to range {}'.format(value, self.range)) value = np.clip(value, 0, self.range).astype(np.int) elif 0 <= value <= 1: value = np.round(value * self.range).astype(np.int) else: self.logger.exception('PWM value must be an integer between 0 and range: {}, or a float between 0 and 1. got {}'.format(self.range, value)) if self.polarity == 0: value = self.range - value return value @property def range(self): """ Maximum value of PWM dutycycle. Doesn't set duration of PWM, but set values will be divided by this range. eg. if ``range == 200``, calling :meth:`PWM.set(100)` would result in a 50% duty cycle Args: (int): 25-40000 """ if not self._range: self._range = self.pig.get_PWM_range(self.pin_bcm) return self._range @range.setter def range(self, value): try: value = int(value) except ValueError: ValueError('PWM Range must be coerceable to an integer, got {}'.format(value)) if (value < 25) or (value > 40000): Warning('PWM Range must be between 25 and 40000, got {}, clipping to range'.format(value)) value = np.clip(value, 25, 40000).astype(np.int) self.pig.set_PWM_range(self.pin_bcm, value) self._range = value # set polarity again so off/on update self.polarity = self._polarity @property def polarity(self): """ Logic direction. - if 1: on=High=:attr:`~PWM.range`, off=Low=0; - if 0: off=Low=0, on=High=:attr:`~PWM.range`. When set, updates :attr:`~PWM.on` and :attr:`~PWM.off` """ return self._polarity @polarity.setter def polarity(self, polarity): if polarity == 1: self.on = self.range self.off = 0 elif polarity == 0: self.on = 0 self.off = self.range else: ValueError('polarity must be 0 or 1') return self._polarity = polarity
[docs] def release(self): """ Turn off and call :meth:`Digital_Out.release` Returns: """ # FIXME: reimplementing parent release method here because of inconsistent use of self.off -- unify API and fix!! self.logger.debug('releasing') try: self.delete_all_scripts() self.set(0) # clean values should handle inversion, don't use self.off time.sleep(0.1) self.pig.stop() except AttributeError: # has already been called, pig already destroyed pass
[docs]class LED_RGB(Digital_Out): output = True type = "LEDS" def __init__(self, pins = None, r = None, g = None, b = None, polarity = 1, blink = True, **kwargs): """ An RGB LED, wrapper around three :class:`PWM` objects. Args: pins (list): A list of (board) pin numbers. Either `pins` OR all `r`, `g`, `b` must be passed. r (int): Board number of Red pin - must be passed with `g` and `b` g (int): Board number of Green pin - must be passed with `r` and `b` b (int): Board number of Blue pin - must be passed with `r` and `g`: polarity (0, 1): 0: common anode (low turns LED on) 1: common cathode (low turns LED off) blink (bool): Flash RGB at the end of init to show we're alive and bc it's real cute. **kwargs: passed to :class:`Digital_Out` Attributes: channels (dict): The three PWM objects, {'r':PWM, ... etc} """ # self._pin = {} self.channels = {} self.scripts = {} if 'pin' in kwargs.keys(): raise ValueError('pin passed to LED_RGB, need a list of 3 pins instead (r, g, b). pin, used by single-pin GPIO objects, is otherwise ambiguous with pins.') super(LED_RGB, self).__init__(**kwargs) self.flash_params = None if pins and len(pins) == 3: for color, pin in zip(('r', 'g', 'b'), pins): self.channels[color] = PWM(pin, polarity=polarity, name=f"{self.name}_{color.upper()}") elif r and g and b: self.channels = {'r':PWM(r, polarity=polarity, name="{}_R".format(self.name)), 'g':PWM(g, polarity=polarity, name="{}_G".format(self.name)), 'b':PWM(b, polarity=polarity, name="{}_B".format(self.name))} else: raise ValueError('Must either set with pins= list/tuple of r,g,b pins, or pass all three as separate params') self.store_series(id='blink', colors=((1,0,0),(0,1,0),(0,0,1),(0,0,0)), durations=250) if blink: self.series(id='blink') @property def range(self) -> dict: """ Returns: dict: ranges for each of the :attr:`LED_RGB.channels` """ return {'r':self.channels['r'].range, 'g':self.channels['g'].range, 'b':self.channels['b'].range} @range.setter def range(self, range): for channel in self.channels.items(): channel.range = range
[docs] def set(self, value=None, r=None, g=None, b=None): """ Set the color of the LED. Can either pass - a full (R, G, B) tuple to ``value``, - a single ``value`` that is applied to each channel, - if ``value`` is not passed, individual ``r``, ``g``, or ``b`` values can be passed (any combination can be set in a single call) Stops the last run script Args: value (int, float, tuple, list): If list or tuple, an (R, G, B) color. If float or int, applied to each color channe. Can be set with floats 0-1, or ints >= 1 (See :attr:`PWM.range`). If None, use ``r``, ``g``, and ``b``. r (float, int): value to set red channel g (float, int): value to set green channel b (float, int): value to set blue channel """ # Stop any running scripts (like blinks) self.stop_script() # Set either by `value` or by individual r, g, b if value is not None: # Set by `value` if isinstance(value, int) or isinstance(value, float): # Apply `value` to each channel for channel in self.channels.values(): channel.set(value) elif len(value) == 3: # Assume value is a tuple of r, g, b for channel_key, color_val in zip(('r','g','b'), value): self.channels[channel_key].set(color_val) else: raise ValueError('Value must either be a single value or a tuple of (r,g,b)') else: # Set by individually specified r, g, b arguments if r is not None: self.channels['r'].set(r) if g is not None: self.channels['g'].set(g) if b is not None: self.channels['b'].set(b)
[docs] def toggle(self): self.logger.warning('toggle not well defined for PWM')
[docs] def pulse(self, duration=None): self.logger.warning('Use flash for LEDs instead')
[docs] def _series_script(self, colors, durations = None, unit="ms", repeat=None, finish_off = True): """ Create a script to flash a series of colors. Like :meth:`Digital_Out._series_script`, but sets all pins at once. Args: colors (list): a list of (R, G, B) colors, or a list of ((R,G,B),duration) tuples. durations (int, list): Duration of each color. if a single value, used for all ``colors``. if a list, ``len(durations) == len(colors)``. If None, colors must be ``((R,G,B),duration)`` tuples. unit ('ms', 'us'): unit of durations, milliseconds or microseconds repeat (int): Number of repetitions. If None, script runs once. finish_off (bool): Whether the channels should be set to ``off`` when the script completes Returns: str: constructed pigpio script string. """ if durations: if isinstance(durations, int) or isinstance(durations, float): iter_series = itertools.product(colors, [round(durations)]) elif len(colors) == len(durations): iter_series = zip(colors, durations) elif len(durations) == 1: iter_series = itertools.product(colors, durations) else: ValueError("length of colors and durations must be equal, or length of durations must be 1. got len(colors)={}, len(durations)={}".format(len(colors), len(durations))) else: iter_series = colors.__iter__() if unit == "ms": wait_fn = "mils" elif unit == "us": wait_fn = "mics" else: ValueError("Unit for durations must be ms (milliseconds) or us (microseconds)") string_pieces = [] for color, duration in iter_series: # iter over keys rather than values of channels dict b/c dicts are unordered and colors are an ordered tuple for channel_key, color_value in zip(('r', 'g', 'b'), color): string_pieces.append(b" ".join((self.channels[channel_key].pigs_function, bytes(str(self.channels[channel_key].pin_bcm), 'utf-8'), bytes(str(self.channels[channel_key]._clean_value(color_value)), 'utf-8')))) string_pieces.append(bytes("{} {}".format(wait_fn, duration), 'utf-8')) script_str = b" ".join(string_pieces) if repeat: try: repeat = int(repeat) except: ValueError('Repeat must be coerceable to an integer, got {}'.format(repeat)) script_str = b" ".join(("LD v0", str(repeat-1), # "LD (load) variable 0 with number of repeats" "tag 999", # create a tag that can be returned to script_str, # do the script one time "dcr v0" # decrement v0. "jp 999") # jump to the tag at the beginning if v0 is still positive ) # turn off when finished if finish_off: #script_str += b" " finish_pieces = [script_str] for channel in self.channels.values(): finish_pieces.append(b" ".join((channel.pigs_function, bytes(str(channel.pin_bcm), 'utf-8'), bytes(str(channel.off), 'utf-8')))) script_str = b" ".join(finish_pieces) return script_str
[docs] def flash(self, duration, frequency=10, colors=((1,1,1),(0,0,0))): """ Specify a color series by total duration and flash frequency. Largely a convenience function for on/off flashes. Args: duration (int, float): Duration of flash in ms. frequency (int, float): Frequency of flashes in Hz colors (list): A list of RGB values 0-255 like:: [[255,255,255],[0,0,0]] """ # Duration is total in ms, frequency in Hz # Get number of flashes in duration rounded down n_rep = int(float(duration) / 1000. * float(frequency)) flashes = colors * n_rep # Invert frequency to duration for single flash # divide by 2 b/c each 'color' is half the duration single_dur = ((1. / frequency) * 1000) / 2. flash_params = (flashes, single_dur) new_flash = True if self.flash_params: if self.flash_params == flash_params: # do nothing new_flash = False if new_flash: self.store_series('flash', colors=flashes, durations=single_dur) self.flash_params = flash_params self.series('flash')
# # def __getattr__(self, name): # """if a method is called that we don't explicitly redefine, try to apply it to each of our channels""" # # attrs = {color: object.__getattribute__(chan, name) for color, chan in self.channels.items()} # if hasattr(list(attrs.values())[0], '__call__'): # def newfunc(*args, **kwargs): # result = {} # # if called with a dictionary of {'r':params, 'g':params, ...}, call that explicitly # if isinstance(args[0], dict): # if any([k in args[0].keys() for k in self.channels.keys()]): # for color, method in attrs.items(): # if color in args[0].keys(): # result[color] = method(**args[0][color]) # return result # # # otherwise, call all channels with the same argument # for color, method in attrs.items(): # result[color] = method(*args, **kwargs) # return result # # return newfunc # else: # return attrs
[docs] def release(self): """ Release each channel and stop pig without calling superclass. """ self.logger.debug('releasing') for chan in self.channels.values(): chan.release() self.pig.stop()
@property def pin(self): """ Dict of the board pin number of each channel, ``{'r' : self.channels['r'].pin, ... }`` """ try: return {'r':self.channels['r'].pin, 'g':self.channels['g'].pin, 'b':self.channels['b'].pin} except KeyError: return {} @pin.setter def pin(self, pin): """ Pins don't get set like this for the LED_RGB, ignore. """ if pin is None: # just remnants of the attempt to set from the GPIO metaclass return self.logger.warning('pin cant be set via the attribute') @property def pin_bcm(self): """ Dict of the broadcom pin number of each channel, ``{'r' : self.channels['r'].pin_bcm, ... }`` """ try: return {'r':self.channels['r'].pin_bcm, 'g':self.channels['g'].pin_bcm, 'b':self.channels['b'].pin_bcm} except KeyError: return {} @pin_bcm.setter def pin_bcm(self, pin_bcm): """ Pins don't get set like this for the LED_RGB, ignore\ """ if pin_bcm is None: return self.logger.warning('pin_bcm cant be set via the attribute') @property def pull(self): return None @pull.setter def pull(self, direction): if direction is None: return self.logger.warning('pull cant be set via the attribute')
[docs]class Solenoid(Digital_Out): """ Solenoid valve for water delivery. Args: pin (int): Board pin number, converted to BCM on init. polarity (0, 1): Whether HIGH opens the port (1) or closes it (0) duration (int, float): duration of open, ms. vol (int, float): desired volume of reward in uL, must have computed calibration results, see :meth:`~.Terminal.calibrate_ports` **kwargs: passed to :class:`Digital_Out` Only NC solenoids should be used, as there is no way to guarantee that a pin will maintain its voltage when it is released, and you will spill water all over the place. Attributes: calibration (dict): Dict with with line coefficients fitting volume to open duration, see :meth:`~.Terminal.calibrate_ports`. Retrieved from prefs, specifically ``prefs.get('PORT_CALIBRATION')[name]`` mode ('DURATION', 'VOLUME'): Whether open duration is given in ms, or computed from calibration duration (int, float): Duration of valve opening, in ms. When set, creates a script 'open' that is used to open the valve for a precise amount of time """ output = True type = "SOLENOID" DURATION_MIN = 2 #: Minimum allowed duration in ms def __init__(self, pin, polarity=1, duration=20, vol=None, **kwargs): """ """ super(Solenoid, self).__init__(pin, polarity=polarity, **kwargs) self.calibration = None self._duration = None # Pigpio has us create waves to deliver timed output # Since we typically only use one duration, # we make the wave once and only make it again when asked to # We start with passed or default duration (ms) if vol: self.duration = self.dur_from_vol(vol) self.mode = 'VOLUME' else: self.duration = duration self.mode='DURATION' @property def duration(self): return self._duration @duration.setter def duration(self, duration): duration = round(duration) if duration < self.DURATION_MIN: Warning('Duration cannot be less than {}, so value was clipped. given {}'.format(self.DURATION_MIN, duration)) duration = self.DURATION_MIN self._duration = duration self.store_series(id="open", values=self.on, durations=self._duration, unit="ms", finish_off=True, repeat=None)
[docs] def dur_from_vol(self, vol): """ Given a desired volume, compute an open duration. Must have calibration available in prefs, see :meth:`~.Terminal.calibrate_ports`. Args: vol (float, int): desired reward volume in uL Returns: int: computed opening duration for given volume """ # find our pin name if not self.name: self.name = self.get_name() # legacy, compatibility code -- if calibrations were made in the olde way # then try and load them. they will be saved in the new way by the calibration setter in the root class # prefs should have loaded any calibration if self.calibration is None: try: self.calibration = prefs.get('PORT_CALIBRATION')[self.name] except KeyError: # try using name prepended with PORTS_, which happens for hardware objects with implicit names self.calibration = prefs.get('PORT_CALIBRATION')[self.name.replace('PORTS_', '')] except Exception as e: self.logger.exception(f'couldnt get calibration, using default LUT y = 3.5 + 2. got error {e}') self.calibration = {'slope': 3.5, 'intercept': 2} # compute duration from slope and intercept duration = round(float(self.calibration['intercept']) + (float(self.calibration['slope']) * float(vol))) return duration
[docs] def open(self, duration=None): """ Open the valve. Uses the 'open' script created when assigning duration. Args: duration (float): If provided, open for this duration instead of the duration stored on instantiation. """ if duration: self.duration = duration self.series(id="open")