"""
Hardware that uses the GPIO pins of the Raspi. These classes rely on `pigpio
<http://abyz.me.uk/rpi/pigpio/>`_, whose daemon (pigpiod) must be running in the background --
typically this is handled with a launch script/system daemon (see the launch_pilot.sh script generated by setup_autopilot.py)
Autopilot uses a custom version of pigpio (`<https://github.com/sneakers-the-rat/pigpio>`_) that
returns isoformatted timestamps rather than tick numbers in callbacks. See the ``setup_pilot.sh`` script.
Note:
Autopilot uses the "Board" rather than "Broadcom" numbering system, see :ref:`the numbering note. <numbering-note>`
:class:`.GPIO` objects convert internally between board and bcm numbers using :attr:`.GPIO.pin` ,
:attr:`.GPIO.pin_bcm` , :data:`.BOARD_TO_BCM` , and :data:`.BCM_TO_BOARD` .
Note:
This module does not include hardware that uses the GPIO pins over a specific protocol like i2c
"""
import os
import sys
import threading
import time
import numpy as np
from datetime import datetime
import itertools
import typing
import warnings
from collections import deque as dq
from autopilot import prefs
from autopilot.hardware import Hardware, BOARD_TO_BCM
from autopilot import external
ENABLED = False
"""
False if pigpio cannot be imported -- and GPIO devices cannot be used.
True if pigpio can be imported
"""
try:
import pigpio
TRIGGER_MAP = {
'U': pigpio.RISING_EDGE,
1: pigpio.RISING_EDGE,
True: pigpio.RISING_EDGE,
'D': pigpio.FALLING_EDGE,
0: pigpio.FALLING_EDGE,
False: pigpio.FALLING_EDGE,
'B': pigpio.EITHER_EDGE,
(0,1): pigpio.EITHER_EDGE
}
"""
Maps user input descriptions of triggers to the corresponding pigpio object.
"""
INVERSE_TRIGGER_MAP = {
pigpio.RISING_EDGE: 'U',
pigpio.FALLING_EDGE: 'D',
pigpio.EITHER_EDGE: 'B'
}
"""
Inverse of :data:`.TRIGGER_MAP`. Used to assign canonical references to triggers --
ie. it is possible to take multiple params (1, True, 'U') -> pigpio trigger objects,
but there is one preferred way to refer to a pigpio object.
"""
PULL_MAP = {
1: pigpio.PUD_UP,
True: pigpio.PUD_UP,
'U': pigpio.PUD_UP,
0: pigpio.PUD_DOWN,
False: pigpio.PUD_DOWN,
'D': pigpio.PUD_DOWN,
None: pigpio.PUD_OFF
}
"""
Maps user input descriptions of internal resistor pullups/downs to the corresponding
pigpio object.
"""
INVERSE_PULL_MAP = {
pigpio.PUD_UP: 'U',
pigpio.PUD_DOWN: 'D',
pigpio.PUD_OFF: None
}
"""
Inverse of :data:`.PULL_MAP`, mapping pigpio objects for internal resistor pullups/downs to
their canonical form ('U', 'D', None for pullup, pulldown, or no pull)
"""
ENABLED = True
except ImportError:
if prefs.get('AGENT') == "PILOT":
warnings.warn("pigpio could not be imported, gpio not enabled", ImportWarning)
[docs]def clear_scripts(max_scripts=256):
"""
Stop and delete all scripts running on the pigpio client.
To be called, eg. between tasks to ensure none are left hanging by badly behaved GPIO devices
Args:
max_scripts (int): maximum number of scripts allowed by pigpio. Set in ``pigpio.c`` and not exported
to the python module, so have to hardcode it again here, default for pigpio fork is 256
"""
if globals()['ENABLED'] == False:
warnings.warn('pigpio was not imported, so scripts cannot be cleared')
return
pig = pigpio.pi()
for i in range(max_scripts):
try:
pig.stop_script(i)
except pigpio.error as e:
if 'unknown script id' in str(e):
continue
try:
pig.delete_script(i)
except Exception as e:
warnings.warn(f'Error deleting script {i}, got exception: \n{e}')
[docs]class GPIO(Hardware):
"""
Metaclass for hardware that uses GPIO. Should not be instantiated on its own.
Handles initializing pigpio and wraps some of its commonly used methods
Args:
pin (int): The `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin of this object.
polarity (int): Logic direction. if 1: on=High=1, off=Low=0; if 0: off=Low=0, on=High=1
pull (str, int): state of pullup/down resistor. Can be set as 'U'/'D' or 1/0 to pull up/down. See :data:`.PULL_MAP`
trigger (str, int, bool): whether callbacks are triggered on rising ('U', 1, True), falling ('D', 0, False),
or both edges ('B', (0,1))
kwargs: passed to the :class:`.Hardware` superclass.
Attributes:
pig (:class:`pigpio.pi`): An object that manages connection to the pigpio daemon. See docs at http://abyz.me.uk/rpi/pigpio/python.html
CONNECTED (bool): Whether the connection to pigpio was successful
pigpiod: Reference to the pigpiod process launched by :func:`.external.start_pigpiod`
pin (int): The `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin of this object.
pin_bcm (int): The BCM number of the connected pin -- used by pigpio. Converted from pin passed as argument on initialization,
which is assumed to be the board number.
pull (str, int): state of pullup/down resistor. Can be set as 'U'/'D' or 1/0 to pull up/down
polarity (int): Logic direction. if 1: on=High=1, off=Low=0; if 0: off=Low=0, on=High=1
on (int): if polarity == 1, high/1. if polarity == 0, low/0
off (int): if polarity == 1, low/0. if polarity == 0, high/1
trigger (str, int, bool): whether callbacks are triggered on rising ('U', 1, True), falling ('D', 0, False),
or both edges ('B', (0,1))
trigger_edge: The pigpio object representing RISING_EDGE, FALLING_EDGE, BOTH_EDGES. Set by :attr`.trigger`
"""
def __init__(self, pin=None, polarity=1, pull = None, trigger = None, **kwargs):
super(GPIO, self).__init__(**kwargs)
if not ENABLED:
raise RuntimeError('Couldnt import pigpio, so GPIO objects cant be used')
# initialize attributes
self._polarity = None
self._pull = None
self._pin = None
self._trigger = None
self.trigger_edge = None
try:
self.pin_bcm = None
except AttributeError:
# if a subclass has made this a property, don't fail here
self.logger.warning('pin_bcm is defined as a property without a setter so cant be set')
self.pig = None # type: typing.Optional[pigpio.pi]
self.pigpiod = None
# init pigpio
self.CONNECTED = False
self.CONNECTED = self.init_pigpio()
# set default attributes
self.pin = pin
self.polarity = polarity
self.pull = pull
self.trigger = trigger
if not self.CONNECTED:
RuntimeError('No connection could be made to the pigpio daemon')
[docs] def init_pigpio(self) -> bool:
"""
Create a socket connection to the pigpio daemon and set as :attr:`GPIO.pig`
Returns:
bool: True if connection was successful, False otherwise
"""
self.pigpiod = external.start_pigpiod()
self.pig = pigpio.pi()
if self.pig.connected:
return True
else:
return False
@property
def pin(self):
"""
`Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin.
When assigned, also updates :attr:`.pin_bcm` with the BCM-numbered pin.
"""
return self._pin
@pin.setter
def pin(self, pin):
if pin is None:
return
assert(int(pin))
self._pin = int(pin)
self.pin_bcm = BOARD_TO_BCM[self._pin]
@property
def state(self) -> bool:
"""
Instantaneous state of GPIO pin, on (``True``) or off (``False``)
Returns:
bool
"""
return bool(self.pig.read(self.pin_bcm))
@property
def pull(self):
"""
State of internal pullup/down resistor.
See :data:`.PULL_MAP` for possible values.
Returns:
int: 'U'/'D'/None for pulled up, down or not set.
"""
return self._pull
@pull.setter
def pull(self, direction):
try:
pull = PULL_MAP[direction]
self._pull = INVERSE_PULL_MAP[pull]
self._pull = direction
self.pig.set_pull_up_down(self.pin_bcm, pull)
except KeyError:
ValueError("pull must be one of 0/'D'/False for pulldown, 1/'U'/True for pullup, or None to clear")
@property
def polarity(self):
"""
Logic direction. if 1: on=High=1, off=Low=0; if 0: off=Low=0, on=High=1.
When set, updates :attr:`~.GPIO.on` and :attr:`~.GPIO.off` accordingly
"""
return self._polarity
@polarity.setter
def polarity(self, polarity):
if polarity == 1:
self.on = 1
self.off = 0
elif polarity == 0:
self.on = 0
self.off = 1
else:
raise ValueError('polarity must be 0 or 1')
self._polarity = polarity
@property
def trigger(self):
"""
dict: Maps strings (('U',1,True), ('D',0,False), ('B',[0,1])) to pigpio edge types
(RISING_EDGE, FALLING_EDGE, EITHER_EDGE), respectively.
"""
return self._trigger
@trigger.setter
def trigger(self, trigger):
try:
self.trigger_edge = TRIGGER_MAP[trigger]
self._trigger = INVERSE_TRIGGER_MAP[self.trigger_edge]
except KeyError:
ValueError('trigger must be one of {}. instead got {}'.format(INVERSE_TRIGGER_MAP.values(),
trigger))
[docs] def release(self):
"""
Release the connection to the pigpio daemon.
Note:
the Hardware metaclass will call this method on object deletion.
"""
self.logger.debug('releasing')
try:
self.pull = None
except:
pass
self.pig.stop()
[docs]class Digital_Out(GPIO):
"""
TTL/Digital logic out through a GPIO pin.
Args:
pin (int): The `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin of this object
pulse_width (int): Width of digital output :meth:`~.Digital_Out.pulse` (us). range: 1-100
polarity (bool): Whether 'on' is High (1, default) and pulses bring the voltage High, or vice versa (0)
Attributes:
scripts (dict): maps script IDs to pigpio script handles
pigs_function (bytes): when using pigpio scripts, what function is used to set the value of the output?
(eg. 'w' for digital out, 'gdc' for pwm, more info here: http://abyz.me.uk/rpi/pigpio/pigs.html)
script_counter (:class:`itertools.count`): generate script IDs if not explicitly given to :meth:`~.Digital_Out.series`.
generated IDs are of the form 'series_#'
"""
output = True
type="DIGITAL_OUT"
pigs_function = b"w"
def __init__(self, pin=None, pulse_width=100, polarity=1, **kwargs):
"""
"""
super(Digital_Out, self).__init__(pin, polarity=polarity, **kwargs)
self._last_script = None
self.scripts = {}
self.script_handles = {}
self.script_counter = itertools.count()
self.pulse_width = np.clip(pulse_width, 0, 100).astype(np.int)
if pulse_width > 100 or pulse_width < 0:
Warning('pulse_width must be <100(ms) & >0 and has been clipped to {}'.format(self.pulse_width))
# setup pin if we are a true single-pin digital output and not a subclass
if isinstance(self.pin_bcm, int):
self.pig.set_mode(self.pin_bcm, pigpio.OUTPUT)
self.set(self.off)
[docs] def set(self, value: bool):
"""
Set pin logic level.
Default uses :meth:`pigpio.pi.write`, but can be overwritten by inheriting classes
Stops the last running script when called.
Args:
value (int, bool): (1, True) to set High, (0, False) to set Low.
"""
self.stop_script()
self.pig.write(self.pin_bcm, value)
[docs] def turn(self, direction='on'):
"""
Change output state using on/off parlance. logic direction varies based on :attr:`Digital_Out.polarity`
Stops the last running script when called.
Args:
direction (str, bool): 'on', 1, or True to turn to :attr:`~.Digital_Out.on` and vice versa for :attr:`~.Digital_Out.off`
"""
self.stop_script()
if direction in ('on', True, 1):
self.set(self.on)
elif direction in ('off', False, 0):
self.set(self.off)
[docs] def toggle(self):
"""
If pin is High, set Low, and vice versa.
Stops the last running script when called.
"""
self.stop_script()
if self.pig.read(self.pin_bcm):
self.set(0)
else:
self.set(1)
[docs] def pulse(self, duration=None):
"""
Send a timed :attr:`~Digital_Out.on` pulse.
Args:
duration (int): If None (default), uses :attr:`~.Digital_Out.duration`, otherwise
duration of pulse from 1-100us.
"""
if not duration:
self.pig.gpio_trigger(self.pin_bcm, self.pulse_width, self.on)
elif duration:
duration = np.clip(duration, 0, 100).astype(np.int)
self.pig.gpio_trigger(self.pin_bcm,
duration,
self.on)
else:
raise RuntimeError('How did we even get here? Pulse value must be an int 1-100')
[docs] def _series_script(self, values, durations = None, unit="ms", repeat=None, finish_off = True):
"""
Create a pigpio script to set a pin to a series of values for a series of durations.
Typically shouldn't be called by itself, is used by :meth:`~.Digital_Out.series` or :meth:`~.Digital_Out.store_series`
For more information on pigpio scripts, see: `<http://abyz.me.uk/rpi/pigpio/pigs.html#Scripts>`_
Args:
values (list): A list of tuples of (value, duration) or a list of values in (1,0) to set self.pin_bcm to.
durations (list): If ``values`` is not a list of tuples, a list of durations.
len(durations) must be either == len(values) or else len(durations) == 1,
in which case the duration is repeated.
unit ("ms", "us"): units of durations in milliseconds or microseconds
repeat (int): If the script should be repeated, how many times? A value of 2 results in
the script being run 2 times total, not 2 *additional* times (or, 3 total times)
finish_off (bool): If true, the script ends by turning the pin to :attr:`~.Digital_Out.off`
Returns:
(str): the constructed script string
"""
# if single value as an int/float/etc, wrap in list.
if not (isinstance(values, list) or isinstance(values, tuple)):
values = [values]
if durations:
if isinstance(durations, float) or isinstance(durations, int):
durations = [round(durations)]
if len(values) == len(durations):
iter_series = zip(values, durations)
elif len(durations) == 1:
iter_series = itertools.product(values, durations)
else:
raise ValueError("length of values and durations must be equal, or length of durations must be 1. got len(values)={}, len(durations)={}".format(len(values), len(durations)))
else:
iter_series = values.__iter__()
if unit == "ms":
wait_fn = "mils"
elif unit == "us":
wait_fn = "mics"
else:
raise ValueError("Unit for durations must be ms (milliseconds) or us (microseconds)")
string_pieces = [b" ".join((self.pigs_function,
str(self.pin_bcm).encode('utf-8'),
str(val).encode('utf-8'),
bytes(wait_fn, 'utf-8'),
str(round(dur)).encode('utf-8'))) for val, dur in iter_series]
script_str = b" ".join(string_pieces)
if repeat:
try:
repeat = int(repeat)
except:
raise ValueError('Repeat must be coerceable to an integer, got {}'.format(repeat))
script_str = b" ".join(("LD v0", str(repeat-1), # "LD (load) variable 0 with number of repeats"
"tag 999", # create a tag that can be returned to
script_str, # do the script one time
"dcr v0" # decrement v0.
"jp 999") # jump to the tag at the beginning if v0 is still positive
)
# turn off when finished
if finish_off:
script_str += b" ".join((self.pigs_function, str(self.pin_bcm).encode('utf-8'), str(self.off).encode('utf-8')))
return script_str
[docs] def store_series(self, id, **kwargs):
"""
Create, and store a pigpio script for a series of output values to be called by :meth:`~.Digital_Out.series`
Args:
id (str): shorthand key used to call this series with :meth:`.series`
kwargs: passed to :meth:`~.Digital_Out._series_script`
"""
# actually might want to overwrite script_handles w/ same ids, like a solenoid that changes its 'open' duration
# if id in self.script_handles.keys():
# Exception("Script with id {} already created!".format(id))
series_script = self._series_script(**kwargs)
# check if there is an identical script already and use that instead
matches = [script_id for script_id, test_script in self.scripts.items() if test_script == series_script]
if len(matches)>0:
script_id = self.script_handles[matches[0]]
else:
try:
script_id = self.pig.store_script(series_script)
except Exception as e:
if 'illegal script command' in str(e):
raise Exception(f'got pigpio exception: {e} from attempted script {series_script}')
else:
raise e
self.script_handles[id] = script_id
self.scripts[id] = series_script
[docs] def series(self, id=None, delete=None, **kwargs):
"""
Execute a script that sets the pin to a series of values for a series of durations.
See :meth:`~.Digital_Out._series_script` for series parameterization.
Ideally one would use :meth:`.store_series` and use the returned id to call this function.
Otherwise, this method calls :meth:`.store_series` and runs it.
Args:
id (str, int): ID of the script, if not already created, created with :meth:`.store_script`.
If None (default), an ID is generated with :attr:`~.Digital_Out.script_counter` of the form
``'script_#'``
kwargs: passed to :meth:`~.Digital_Out._series_script`
"""
return_id = False
if id:
if id not in self.script_handles.keys():
self.store_series(id, **kwargs)
# if we weren't called with an ID, have to have a list of values/etc to create a script
else:
id = "script_{}".format(next(self.script_counter))
self.store_series(id, **kwargs)
# if we haven't been explicitly told to not delete it, we should
if delete is None:
delete = True
# since we've generated a script ID, we should return it
return_id = True
script_status = self.pig.script_status(self.script_handles[id])
if script_status == pigpio.PI_SCRIPT_INITING:
check_times = 0
while self.pig.script_status(self.script_handles[id]) == pigpio.PI_SCRIPT_INITING:
# TODO: Expose this as a parameter -- how long to try and init scripts before skipping, mebs some general 'timeout' variable for all blocking ops.
time.sleep(0.005)
check_times += 1
if check_times > 200:
break
try:
self.pig.run_script(self.script_handles[id])
self._last_script = id
except pigpio.error as e:
self.logger.exception(f'Couldnt run script: {e}')
finally:
if delete:
self.delete_script(id)
if return_id:
return id
[docs] def delete_script(self, script_id):
"""
spawn a thread to delete a script with id ``script_id``
This is a 'soft' deletion -- it checks if the script is running, and waits for up to 10 seconds
before actually deleting it.
The script is deleted from the pigpio daemon, from ``script_handles`` and from ``scripts``
Args:
script_id (str): a script ID in :attr:`.Digital_Out.script_handles`
"""
delete_script = threading.Thread(target=self._delete_script, args=(script_id,))
delete_script.start()
def _delete_script(self, script_id):
checktimes = 0
while self.pig.script_status(self.script_handles[script_id]) == pigpio.PI_SCRIPT_RUNNING:
time.sleep(1)
checktimes += 1
if checktimes > 10:
continue
self.pig.delete_script(self.script_handles[script_id])
del self.scripts[self.script_handles[script_id]]
del self.script_handles[script_id]
[docs] def delete_all_scripts(self):
"""
Stop and delete all scripts
This is a "hard" deletion -- the script will be immediately stopped if it's running.
"""
for script_handle, script_id in self.script_handles.items():
try:
self.stop_script(script_handle)
except Exception as e:
self.logger.exception(e)
try:
self.pig.delete_script(script_id)
except AttributeError:
pass
except Exception as e:
if 'unknown script id' in str(e):
pass
else:
self.logger.exception(e)
try:
del self.scripts[self.script_handles[script_id]]
except KeyError:
pass
try:
del self.script_handles[script_id]
except KeyError:
pass
[docs] def stop_script(self, id=None):
"""
Stops a running pigpio script
Args:
id (str, none): If None, stops the last run script. if str, stops script with that id.
"""
if not id:
id = self._last_script
if id is None:
# if there was no passed id and _last_script is None, return peacefully
return
# if we were passed a keyed, named script id, convert it to the pigio integer script id
if id in self.script_handles.keys():
script_id = self.script_handles[id]
status, _ = self.pig.script_status(script_id)
# if the script is still running, stop it and set the pin to off.
if status == pigpio.PI_SCRIPT_RUNNING:
self.pig.stop_script(script_id)
#self.set(self.off)
# set last_script to None so we won't try to double-stop this script
self._last_script = None
else:
self.logger.warning('No script with id {} found to stop'.format(id))
[docs] def release(self):
"""
Stops and deletes all scripts, sets to :attr:`~.Digital_Out.off`, and calls :meth:`.GPIO.release`
"""
self.logger.debug('releasing')
try:
self.delete_all_scripts()
#self.stop_script()
self.set(self.off)
time.sleep(0.1)
except AttributeError:
# self.pig has already been deleted (release has already been called)
# so self.pig has no attribute 'send' because it is None
pass
finally:
super(Digital_Out, self).release()
[docs]class Digital_In(GPIO):
"""
Record digital input and call one or more callbacks on logic transition.
Args:
pin (int): `Board-numbered <https://raspberrypi.stackexchange.com/a/12967>`_ GPIO pin.
event (:class:`threading.Event`): For callbacks assigned with :meth:`.assign_cb` with ``evented = True``,
set this event whenever the callback is triggered. Can be used to handle
stage transition logic here instead of the :class:`.Task` object, as is typical.
record (bool): Whether all logic transitions should be recorded as a list of ('EVENT', 'Timestamp') tuples.
max_events (int): Maximum size of the :attr:`.events` deque
**kwargs: passed to :class:`GPIO`
Sets the internal pullup/down resistor to :attr:`.Digital_In.off` and
:attr:`.Digital_In.trigger` to :attr:`.Digital_In.on` upon instantiation.
Note:
pull and trigger are set by polarity on initialization in digital inputs, unlike other GPIO classes.
They are not mutually synchronized however, ie. after initialization if any one of these attributes are changed, the other two will remain the same.
Attributes:
pig (:meth:`pigpio.pi`): The pigpio connection.
pin (int): Broadcom-numbered pin, converted from the argument given on instantiation
callbacks (list): A list of :meth:`pigpio.callback`s kept to clear them on exit
polarity (int): Logic direction, if 1: off=0, on=1, pull=low, trigger=high and vice versa for 0
events (list): if :attr:`.record` is True, a deque of ('EVENT', 'TIMESTAMP') tuples of length ``max_events``
"""
is_trigger=True
type = 'DIGI_IN'
input = True
def __init__(self, pin, event=None, record=True, max_events=256, **kwargs):
"""
"""
super(Digital_In, self).__init__(pin, **kwargs)
# pull the resistor in the off direction and set the trigger to be the on direction
self.pull = self.off
self.trigger = self.on
# We can be passed a threading.Event object if we want to handle stage logic here
# rather than in the parent as is typical.
self.event = event
# List to store callback handles assigned with assign_cb
self.callbacks = []
# List to store logic transition events
self.events = dq(maxlen=max_events)
self.record = record
if self.record:
self.assign_cb(self.record_event, add=True, evented=False, manual_trigger='B')
# Setup pin
self.pig.set_mode(self.pin_bcm, pigpio.INPUT)
[docs] def assign_cb(self, callback_fn, add=True, evented=False, manual_trigger=None):
"""
Sets ``callback_fn`` to be called when :attr:`.Digital_In.trigger` is detected.
``callback_fn`` must accept three parameters:
* GPIO (int, 0-31): the BCM number of the pin that was triggered
* level (0-2):
* 0: change to low (falling)
* 1: change to high (rising)
* 2: no change (watchdog timeout)
* timestamp (str): If using the Autopilot version of pigpio, an isoformatted timestamp
Args:
callback_fn (callable): The function to be called when triggered
add (bool): Are we adding another callback?
If False, the previous callbacks are cleared.
evented (bool): Should triggering this event also set the internal :attr:`~.Digital_In.event`?
Note that :attr:`.Digital_In.event` must have been passed.
manual_trigger ('U', 'D', 'B'): Override :attr:`.Digital_In.trigger` if needed.
"""
# If we aren't adding, we clear any existing callbacks
if not add:
self.clear_cb()
if self.record:
# if we're clearing all callbacks (maybe by accident)
# but we're configured to record events, re-add the record cb.
self.assign_cb(self.record_event, add=True, evented=False, manual_trigger="B")
# We can set the direction of the trigger manually,
# for example if we want to set 'BOTH' only sometimes
if not manual_trigger:
trigger_ud = self.trigger_edge
else:
trigger_ud = TRIGGER_MAP[manual_trigger]
# We can handle eventing (blocking) here if we want (usually this is handled in the parent)
# This won't work if we weren't init'd with an event.
if evented and self.event:
cb = self.pig.callback(self.pin_bcm, trigger_ud, self.event.set)
self.callbacks.append(cb)
elif evented and not self.event:
raise Exception('We have no internal event to set!')
cb = self.pig.callback(self.pin_bcm, trigger_ud, callback_fn)
self.callbacks.append(cb)
[docs] def clear_cb(self):
"""
Tries to call `.cancel()` on each of the callbacks in :attr:`~Digital_In.callbacks`
"""
for cb in self.callbacks:
try:
cb.cancel()
except:
self.logger.warning('could not cancel callback: {}'.format(cb))
self.callbacks = []
[docs] def record_event(self, pin, level, timestamp):
"""
On either direction of logic transition, record the time
Args:
pin (int): BCM numbered pin passed from pigpio
level (bool): High/Low status of current pin
timestamp (str): isoformatted timestamp
"""
# use self.pin rather than incoming pin b/c method is bound
# (ie. only will be called by pin assigned to)
# and self.pin is board rather than bcm numbered
self.events.append((self.pin, level, timestamp))
[docs] def release(self):
"""
Clears any callbacks and calls :meth:`GPIO.release`
"""
self.logger.debug('releasing')
self.clear_cb()
super(Digital_In, self).release()
[docs]class PWM(Digital_Out):
"""
PWM output from GPIO.
"""
output = True
type="PWM"
pigs_function = b'pwm'
def __init__(self, pin, range=255, **kwargs):
"""
Args:
pin (int): Board numbered GPIO pin
range (int): Maximum value of PWM duty-cycle. Default 255.
**kwargs: passed to :class:`Digital_Out`
"""
self._range = None
super(PWM, self).__init__(pin, **kwargs)
self.range = range
#self.pig.set_mode(self.pin_bcm, pigpio.OUTPUT)
# set to off in case of common anode/inverted polarity
self.set(self.off)
[docs] def set(self, value):
"""
Sets PWM duty cycle normalized to :attr:`.polarity` and transformed by :meth:`._clean_value`
Stops the last running script
Args:
value (int, float):
- if int > 1, sets value (or :attr:`PWM.range`-value if :attr:`PWM.polarity` is inverted).
- if 0 <= float <= 1, transforms to a proportion of :attr:`.range` (inverted if needed as well).
"""
value = self._clean_value(value)
self.stop_script()
self.pig.set_PWM_dutycycle(self.pin_bcm, value)
def _clean_value(self, value):
if value > 1:
if value > self.range:
self.logger.warning('clipping {} to range {}'.format(value, self.range))
value = np.clip(value, 0, self.range).astype(np.int)
elif 0 <= value <= 1:
value = np.round(value * self.range).astype(np.int)
else:
self.logger.exception('PWM value must be an integer between 0 and range: {}, or a float between 0 and 1. got {}'.format(self.range, value))
if self.polarity == 0:
value = self.range - value
return value
@property
def range(self):
"""
Maximum value of PWM dutycycle.
Doesn't set duration of PWM, but set values will be divided by this range.
eg. if ``range == 200``, calling :meth:`PWM.set(100)` would result in a 50% duty cycle
Args:
(int): 25-40000
"""
if not self._range:
self._range = self.pig.get_PWM_range(self.pin_bcm)
return self._range
@range.setter
def range(self, value):
try:
value = int(value)
except ValueError:
ValueError('PWM Range must be coerceable to an integer, got {}'.format(value))
if (value < 25) or (value > 40000):
Warning('PWM Range must be between 25 and 40000, got {}, clipping to range'.format(value))
value = np.clip(value, 25, 40000).astype(np.int)
self.pig.set_PWM_range(self.pin_bcm, value)
self._range = value
# set polarity again so off/on update
self.polarity = self._polarity
@property
def polarity(self):
"""
Logic direction.
- if 1: on=High=:attr:`~PWM.range`, off=Low=0;
- if 0: off=Low=0, on=High=:attr:`~PWM.range`.
When set, updates :attr:`~PWM.on` and :attr:`~PWM.off`
"""
return self._polarity
@polarity.setter
def polarity(self, polarity):
if polarity == 1:
self.on = self.range
self.off = 0
elif polarity == 0:
self.on = 0
self.off = self.range
else:
ValueError('polarity must be 0 or 1')
return
self._polarity = polarity
[docs] def release(self):
"""
Turn off and call :meth:`Digital_Out.release`
Returns:
"""
# FIXME: reimplementing parent release method here because of inconsistent use of self.off -- unify API and fix!!
self.logger.debug('releasing')
try:
self.delete_all_scripts()
self.set(0) # clean values should handle inversion, don't use self.off
time.sleep(0.1)
self.pig.stop()
except AttributeError:
# has already been called, pig already destroyed
pass
[docs]class LED_RGB(Digital_Out):
output = True
type = "LEDS"
def __init__(self, pins = None, r = None, g = None, b = None, polarity = 1, blink = True, **kwargs):
"""
An RGB LED, wrapper around three :class:`PWM` objects.
Args:
pins (list): A list of (board) pin numbers.
Either `pins` OR all `r`, `g`, `b` must be passed.
r (int): Board number of Red pin - must be passed with `g` and `b`
g (int): Board number of Green pin - must be passed with `r` and `b`
b (int): Board number of Blue pin - must be passed with `r` and `g`:
polarity (0, 1):
0: common anode (low turns LED on)
1: common cathode (low turns LED off)
blink (bool): Flash RGB at the end of init to show we're alive and bc it's real cute.
**kwargs: passed to :class:`Digital_Out`
Attributes:
channels (dict): The three PWM objects, {'r':PWM, ... etc}
"""
# self._pin = {}
self.channels = {}
self.scripts = {}
if 'pin' in kwargs.keys():
raise ValueError('pin passed to LED_RGB, need a list of 3 pins instead (r, g, b). pin, used by single-pin GPIO objects, is otherwise ambiguous with pins.')
super(LED_RGB, self).__init__(**kwargs)
self.flash_params = None
if pins and len(pins) == 3:
for color, pin in zip(('r', 'g', 'b'), pins):
self.channels[color] = PWM(pin, polarity=polarity, name=f"{self.name}_{color.upper()}")
elif r and g and b:
self.channels = {'r':PWM(r, polarity=polarity, name="{}_R".format(self.name)),
'g':PWM(g, polarity=polarity, name="{}_G".format(self.name)),
'b':PWM(b, polarity=polarity, name="{}_B".format(self.name))}
else:
raise ValueError('Must either set with pins= list/tuple of r,g,b pins, or pass all three as separate params')
self.store_series(id='blink',
colors=((1,0,0),(0,1,0),(0,0,1),(0,0,0)),
durations=250)
if blink:
self.series(id='blink')
@property
def range(self) -> dict:
"""
Returns:
dict: ranges for each of the :attr:`LED_RGB.channels`
"""
return {'r':self.channels['r'].range,
'g':self.channels['g'].range,
'b':self.channels['b'].range}
@range.setter
def range(self, range):
for channel in self.channels.items():
channel.range = range
[docs] def set(self, value=None, r=None, g=None, b=None):
"""
Set the color of the LED.
Can either pass
- a full (R, G, B) tuple to ``value``,
- a single ``value`` that is applied to each channel,
- if ``value`` is not passed, individual ``r``, ``g``, or ``b`` values can be passed (any combination can be set in a single call)
Stops the last run script
Args:
value (int, float, tuple, list): If list or tuple, an (R, G, B) color. If float or int, applied to each color channe.
Can be set with floats 0-1, or ints >= 1 (See :attr:`PWM.range`).
If None, use ``r``, ``g``, and ``b``.
r (float, int): value to set red channel
g (float, int): value to set green channel
b (float, int): value to set blue channel
"""
# Stop any running scripts (like blinks)
self.stop_script()
# Set either by `value` or by individual r, g, b
if value is not None:
# Set by `value`
if isinstance(value, int) or isinstance(value, float):
# Apply `value` to each channel
for channel in self.channels.values():
channel.set(value)
elif len(value) == 3:
# Assume value is a tuple of r, g, b
for channel_key, color_val in zip(('r','g','b'), value):
self.channels[channel_key].set(color_val)
else:
raise ValueError('Value must either be a single value or a tuple of (r,g,b)')
else:
# Set by individually specified r, g, b arguments
if r is not None:
self.channels['r'].set(r)
if g is not None:
self.channels['g'].set(g)
if b is not None:
self.channels['b'].set(b)
[docs] def toggle(self):
self.logger.warning('toggle not well defined for PWM')
[docs] def pulse(self, duration=None):
self.logger.warning('Use flash for LEDs instead')
[docs] def _series_script(self, colors, durations = None, unit="ms", repeat=None, finish_off = True):
"""
Create a script to flash a series of colors.
Like :meth:`Digital_Out._series_script`, but sets all pins at once.
Args:
colors (list): a list of (R, G, B) colors, or a list of ((R,G,B),duration) tuples.
durations (int, list): Duration of each color. if a single value, used for all ``colors``.
if a list, ``len(durations) == len(colors)``. If None, colors must be ``((R,G,B),duration)`` tuples.
unit ('ms', 'us'): unit of durations, milliseconds or microseconds
repeat (int): Number of repetitions. If None, script runs once.
finish_off (bool): Whether the channels should be set to ``off`` when the script completes
Returns:
str: constructed pigpio script string.
"""
if durations:
if isinstance(durations, int) or isinstance(durations, float):
iter_series = itertools.product(colors, [round(durations)])
elif len(colors) == len(durations):
iter_series = zip(colors, durations)
elif len(durations) == 1:
iter_series = itertools.product(colors, durations)
else:
ValueError("length of colors and durations must be equal, or length of durations must be 1. got len(colors)={}, len(durations)={}".format(len(colors), len(durations)))
else:
iter_series = colors.__iter__()
if unit == "ms":
wait_fn = "mils"
elif unit == "us":
wait_fn = "mics"
else:
ValueError("Unit for durations must be ms (milliseconds) or us (microseconds)")
string_pieces = []
for color, duration in iter_series:
# iter over keys rather than values of channels dict b/c dicts are unordered and colors are an ordered tuple
for channel_key, color_value in zip(('r', 'g', 'b'), color):
string_pieces.append(b" ".join((self.channels[channel_key].pigs_function,
bytes(str(self.channels[channel_key].pin_bcm), 'utf-8'),
bytes(str(self.channels[channel_key]._clean_value(color_value)), 'utf-8'))))
string_pieces.append(bytes("{} {}".format(wait_fn, duration), 'utf-8'))
script_str = b" ".join(string_pieces)
if repeat:
try:
repeat = int(repeat)
except:
ValueError('Repeat must be coerceable to an integer, got {}'.format(repeat))
script_str = b" ".join(("LD v0", str(repeat-1), # "LD (load) variable 0 with number of repeats"
"tag 999", # create a tag that can be returned to
script_str, # do the script one time
"dcr v0" # decrement v0.
"jp 999") # jump to the tag at the beginning if v0 is still positive
)
# turn off when finished
if finish_off:
#script_str += b" "
finish_pieces = [script_str]
for channel in self.channels.values():
finish_pieces.append(b" ".join((channel.pigs_function, bytes(str(channel.pin_bcm), 'utf-8'), bytes(str(channel.off), 'utf-8'))))
script_str = b" ".join(finish_pieces)
return script_str
[docs] def flash(self, duration, frequency=10, colors=((1,1,1),(0,0,0))):
"""
Specify a color series by total duration and flash frequency.
Largely a convenience function for on/off flashes.
Args:
duration (int, float): Duration of flash in ms.
frequency (int, float): Frequency of flashes in Hz
colors (list): A list of RGB values 0-255 like::
[[255,255,255],[0,0,0]]
"""
# Duration is total in ms, frequency in Hz
# Get number of flashes in duration rounded down
n_rep = int(float(duration) / 1000. * float(frequency))
flashes = colors * n_rep
# Invert frequency to duration for single flash
# divide by 2 b/c each 'color' is half the duration
single_dur = ((1. / frequency) * 1000) / 2.
flash_params = (flashes, single_dur)
new_flash = True
if self.flash_params:
if self.flash_params == flash_params:
# do nothing
new_flash = False
if new_flash:
self.store_series('flash',
colors=flashes,
durations=single_dur)
self.flash_params = flash_params
self.series('flash')
#
# def __getattr__(self, name):
# """if a method is called that we don't explicitly redefine, try to apply it to each of our channels"""
#
# attrs = {color: object.__getattribute__(chan, name) for color, chan in self.channels.items()}
# if hasattr(list(attrs.values())[0], '__call__'):
# def newfunc(*args, **kwargs):
# result = {}
# # if called with a dictionary of {'r':params, 'g':params, ...}, call that explicitly
# if isinstance(args[0], dict):
# if any([k in args[0].keys() for k in self.channels.keys()]):
# for color, method in attrs.items():
# if color in args[0].keys():
# result[color] = method(**args[0][color])
# return result
#
# # otherwise, call all channels with the same argument
# for color, method in attrs.items():
# result[color] = method(*args, **kwargs)
# return result
#
# return newfunc
# else:
# return attrs
[docs] def release(self):
"""
Release each channel and stop pig without calling superclass.
"""
self.logger.debug('releasing')
for chan in self.channels.values():
chan.release()
self.pig.stop()
@property
def pin(self):
"""
Dict of the board pin number of each channel, ``{'r' : self.channels['r'].pin, ... }``
"""
try:
return {'r':self.channels['r'].pin,
'g':self.channels['g'].pin,
'b':self.channels['b'].pin}
except KeyError:
return {}
@pin.setter
def pin(self, pin):
"""
Pins don't get set like this for the LED_RGB, ignore.
"""
if pin is None:
# just remnants of the attempt to set from the GPIO metaclass
return
self.logger.warning('pin cant be set via the attribute')
@property
def pin_bcm(self):
"""
Dict of the broadcom pin number of each channel, ``{'r' : self.channels['r'].pin_bcm, ... }``
"""
try:
return {'r':self.channels['r'].pin_bcm,
'g':self.channels['g'].pin_bcm,
'b':self.channels['b'].pin_bcm}
except KeyError:
return {}
@pin_bcm.setter
def pin_bcm(self, pin_bcm):
"""
Pins don't get set like this for the LED_RGB, ignore\
"""
if pin_bcm is None:
return
self.logger.warning('pin_bcm cant be set via the attribute')
@property
def pull(self):
return None
@pull.setter
def pull(self, direction):
if direction is None:
return
self.logger.warning('pull cant be set via the attribute')
[docs]class Solenoid(Digital_Out):
"""
Solenoid valve for water delivery.
Args:
pin (int): Board pin number, converted to BCM on init.
polarity (0, 1): Whether HIGH opens the port (1) or closes it (0)
duration (int, float): duration of open, ms.
vol (int, float): desired volume of reward in uL, must have computed calibration results, see :meth:`~.Terminal.calibrate_ports`
**kwargs: passed to :class:`Digital_Out`
Only NC solenoids should be used, as there is no way to guarantee
that a pin will maintain its voltage when it is released, and you will
spill water all over the place.
Attributes:
calibration (dict): Dict with with line coefficients fitting volume to open duration, see :meth:`~.Terminal.calibrate_ports`.
Retrieved from prefs, specifically ``prefs.get('PORT_CALIBRATION')[name]``
mode ('DURATION', 'VOLUME'): Whether open duration is given in ms, or computed from calibration
duration (int, float): Duration of valve opening, in ms. When set, creates a script 'open' that is used to open the valve for a precise amount of time
"""
output = True
type = "SOLENOID"
DURATION_MIN = 2 #: Minimum allowed duration in ms
def __init__(self, pin, polarity=1, duration=20, vol=None, **kwargs):
"""
"""
super(Solenoid, self).__init__(pin, polarity=polarity, **kwargs)
self.calibration = None
self._duration = None
# Pigpio has us create waves to deliver timed output
# Since we typically only use one duration,
# we make the wave once and only make it again when asked to
# We start with passed or default duration (ms)
if vol:
self.duration = self.dur_from_vol(vol)
self.mode = 'VOLUME'
else:
self.duration = duration
self.mode='DURATION'
@property
def duration(self):
return self._duration
@duration.setter
def duration(self, duration):
duration = round(duration)
if duration < self.DURATION_MIN:
Warning('Duration cannot be less than {}, so value was clipped. given {}'.format(self.DURATION_MIN, duration))
duration = self.DURATION_MIN
self._duration = duration
self.store_series(id="open", values=self.on, durations=self._duration, unit="ms", finish_off=True, repeat=None)
[docs] def dur_from_vol(self, vol):
"""
Given a desired volume, compute an open duration.
Must have calibration available in prefs, see :meth:`~.Terminal.calibrate_ports`.
Args:
vol (float, int): desired reward volume in uL
Returns:
int: computed opening duration for given volume
"""
# find our pin name
if not self.name:
self.name = self.get_name()
# legacy, compatibility code -- if calibrations were made in the olde way
# then try and load them. they will be saved in the new way by the calibration setter in the root class
# prefs should have loaded any calibration
if self.calibration is None:
try:
self.calibration = prefs.get('PORT_CALIBRATION')[self.name]
except KeyError:
# try using name prepended with PORTS_, which happens for hardware objects with implicit names
self.calibration = prefs.get('PORT_CALIBRATION')[self.name.replace('PORTS_', '')]
except Exception as e:
self.logger.exception(f'couldnt get calibration, using default LUT y = 3.5 + 2. got error {e}')
self.calibration = {'slope': 3.5, 'intercept': 2}
# compute duration from slope and intercept
duration = round(float(self.calibration['intercept']) + (float(self.calibration['slope']) * float(vol)))
return duration
[docs] def open(self, duration=None):
"""
Open the valve.
Uses the 'open' script created when assigning duration.
Args:
duration (float): If provided, open for this duration instead of
the duration stored on instantiation.
"""
if duration:
self.duration = duration
self.series(id="open")