Source code for autopilot.hardware

"""

Classes that manage hardware logic.

Each hardware class should be able to operate independently - ie. not
be dependent on a particular task class, etc. Other than that there are
very few design requirements:

* Every class should have a .release() method that releases any system
  resources in use by the object, eg. objects that use pigpio must have
  their `pigpio.pi` client stopped; LEDs should be explicitly turned off.
* The very minimal class attributes are described in the :class:`Hardware` metaclass.
* Hardware methods are typically called in their own threads, so care should
  be taken to make any long-running operations internally threadsafe.

.. _numbering-note:

Note:
    This software was primarily developed for the Raspberry Pi, which
    has `two types of numbering schemes <https://pinout.xyz/#>`_ ,
    "board" numbering based on physical position (e.g. pins 1-40, in 2 rows of 20 pins) and "bcm" numbering
    based on the broadcom chip numbering scheme (e.g. GPIO2, GPIO27).

    Board numbering is easier to use, but `pigpio <http://abyz.me.uk/rpi/pigpio/>`_
    , which we use as a bridge between Python and the GPIOs, uses the BCM scheme.
    As such each class that uses the GPIOs takes a board number as its argument
    and converts it to a BCM number in the __init__ method.

    If there is sufficient demand to make this more flexible, we can implement
    an additional `pref` to set the numbering scheme, but the current solution
    works without getting too muddy.

Warning:
    In order to use pigpio, the pigpio daemon must be running. See `the docs <http://abyz.me.uk/rpi/pigpio/python.html>`_
    Usually :class:`~.core.pilot.Pilot` s should be started by the bash script or systemd service
    generated by :mod:`.setup.setup_pilot`, which starts pigpiod.
"""

import typing
import warnings
import json
from pathlib import Path
import importlib

from autopilot import prefs
from autopilot.networking import Net_Node
from autopilot.core.loggers import init_logger
from autopilot.utils.common import NumpyEncoder, NumpyDecoder

# FIXME: Hardcoding names of metaclasses, should have some better system of denoting which classes can be instantiated
# directly for setup and prefs management.
META_CLASS_NAMES = ['Hardware', 'Camera', 'GPIO', 'Directory_Writer', 'Video_Writer']

# pigpio only uses BCM numbers, we need to translate them
# See https://www.element14.com/community/servlet/JiveServlet/previewBody/73950-102-11-339300/pi3_gpio.png
BOARD_TO_BCM = {
     3: 2,   5: 3,   7: 4,   8: 14, 10: 15,
    11: 17, 12: 18, 13: 27, 15: 22, 16: 23,
    18: 24, 19: 10, 21: 9,  22: 25, 23: 11,
    24: 8,  26: 7,  29: 5,  31: 6,  32: 12,
    33: 13, 35: 19, 36: 16, 37: 26, 38: 20,
    40: 21
}
"""
dict: Mapping from board (physical) numbering to BCM numbering. 

See `this pinout <https://pinout.xyz/#>`_.

Hardware objects take board numbered pins and convert them to BCM 
numbers for use with `pigpio`.
"""

BCM_TO_BOARD = dict([reversed(i) for i in BOARD_TO_BCM.items()])
"""
dict: The inverse of :const:`BOARD_TO_BCM`.
"""

[docs]class Hardware(object): """ Generic class inherited by all hardware. Should not be instantiated on its own (but it won't do anything bad so go nuts i guess). Primarily for the purpose of defining necessary attributes. Attributes: name (str): unique name used to identify this object within its group. group (str): hardware group, corresponds to key in prefs.json ``"HARDWARE": {"GROUP": {"ID": {**params}}}`` is_trigger (bool): Is this object a discrete event input device? or, will this device be used to trigger some event? If `True`, will be given a callback by :class:`.Task`, and :meth:`.assign_cb` must be redefined. pin (int): The BCM pin used by this device, or None if no pin is used. type (str): What is this device known as in `.prefs`? Not required. input (bool): Is this an input device? output (bool): Is this an output device? """ # metaclass for hardware objects is_trigger = False pin = None type = "" # what are we known as in prefs? input = False output = False def __init__(self, name=None, group=None, **kwargs): if name: self.name = name else: try: self.name = self.get_name() except: warnings.warn('wasnt passed name and couldnt find from prefs for object: {}'.format(self.__str__)) self.name = None self.group = group self._calibration = None self.logger = init_logger(self) # type: logging.Logger self.listens = {} self.node = None
[docs] def release(self): """ Every hardware device needs to redefine `release()`, and must * Safely unload any system resources used by the object, and * Return the object to a neutral state - eg. LEDs turn off. When not redefined, a warning is given. """ raise Exception('The release method was not overridden by the subclass!')
[docs] def assign_cb(self, trigger_fn): """ Every hardware device that is a :attr:`~Hardware.trigger` must redefine this to accept a function (typically :meth:`.Task.handle_trigger`) that is called when that trigger is activated. When not redefined, a warning is given. """ if self.is_trigger: raise Exception("The assign_cb method was not overridden by the subclass!")
[docs] def get_name(self): """ Usually Hardware is only instantiated with its pin number, but we can get its name from prefs """ # TODO: Unify identification of hardware types across prefs and hardware objects try: our_type = prefs.get('HARDWARE')[self.type] except KeyError: our_type = prefs.get('HARDWARE')[self.__class__.__name__] for name, pin in our_type.items(): if self.pin == pin: return name elif isinstance(pin, dict): if self.pin == pin['pin']: return name
[docs] def init_networking(self, listens=None, **kwargs): """ Spawn a :class:`.Net_Node` to :attr:`Hardware.node` for streaming or networked command Args: listens (dict): Dictionary mapping message keys to handling methods **kwargs: Passed to :class:`.Net_Node` Returns: """ if not listens: listens = self.listens self.node = Net_Node( self.name, upstream=prefs.get('NAME'), port=prefs.get('MSGPORT'), listens=listens, instance=False, **kwargs #upstream_ip=prefs.get('TERMINALIP'), #daemon=False )
@property def calibration(self) -> typing.Optional[dict]: """ Calibration used by the hardware object. Attempt to read from ``prefs.get('CALIBRATIONDIR')/group.name.json`` , if :attr:`Hardware.group` is ``None``, attempt to read from ``prefs.get('CALIBRATIONDIR')/name.json`` Setting the attribute (over)writes the calibration to disk as a `.json` file Will be different for each hardware type, subclasses should document this property separately (eg. by overwriting ``Hardware.calibration.__doc__`` Returns: (dict): if calibration is found, a dictionary of calibration for each property. None if no calibration found """ if self._calibration is None: # try and find calibration file cal_name = None if self.name is not None and self.group is not None: cal_name = ".".join([self.group, self.name]) elif self.name is not None: cal_name = self.name else: self.logger.debug('Hardware object has no group or name, cant find calibration!') if cal_name is not None: cal_name += ".json" path = Path(prefs.get('CALIBRATIONDIR')) / cal_name if path.exists(): with open(path, 'r') as cal_f: self._calibration = json.load(cal_f, cls=NumpyDecoder) self.logger.info(f'Calibration loaded from {path}') else: self.logger.debug(f"No calibration found at {path}!") return self._calibration @calibration.setter def calibration(self, calibration): if calibration is None: self._calibration = calibration return # write to file # try and find calibration file cal_name = None if self.name is not None and self.group is not None: cal_name = ".".join([self.group, self.name]) elif self.name is not None: cal_name = self.name else: self.logger.exception('Hardware has no group or name, dont know where to write calibration! saving in attribute, but will be lost on close!') if cal_name is not None: cal_name += '.json' cal_fn = Path(prefs.get('CALIBRATIONDIR')) / cal_name with open(cal_fn, 'w') as cal_f: json.dump(calibration, cal_f, cls=NumpyEncoder) self.logger.info(f'Calibration saved to {cal_fn}: \n{calibration}') self._calibration = calibration