Source code for autopilot.hardware.usb

"""
Hardware that uses USB
"""
import sys
import threading
import time

from queue import Queue, Empty

import numpy as np
from inputs import devices

from autopilot import prefs
from autopilot.networking import Net_Node
from autopilot.hardware import Hardware


[docs]class Wheel(Hardware): """ A continuously measured mouse wheel. Uses a USB computer mouse. Warning: 'vel' thresh_type not implemented """ input = True type = "Wheel" trigger = False # even though this is a triggerable option, typically don't want to assign a cb and instead us a GPIO # TODO: Make the standard-style trigger. # TODO: Make wheel movements available locally with a deque THRESH_TYPES = ['dist', 'x', 'y', 'vel'] MODES = ('vel_total', 'steady', 'dist', 'timed') MOVE_DTYPE = [('vel', 'i4'), ('dir', 'U5'), ('timestamp', 'f8')] def __init__(self, mouse_idx=0, fs=10, thresh=100, thresh_type='dist', start=True, digi_out = False, mode='vel_total', integrate_dur=5): """ Args: mouse_idx (int): fs (int): thresh (int): thresh_type ('dist'): start (bool): digi_out (:class:`~.Digital_Out`, bool): mode ('vel_total'): integrate_dur (int): """ # try to get mouse from inputs # TODO: More robust - specify mouse by hardware attrs try: self.mouse = devices.mice[mouse_idx] except IndexError: Warning('Could not find requested mouse with index {}\nAttempting to use mouse idx 0'.format(mouse_idx)) self.mouse = devices.mice[0] # frequency of our updating self.fs = fs # time between updates self.update_dur = 1./float(self.fs) self.thresh = thresh # thresh type can be 'dist', 'x', 'y', or 'vel' if thresh_type not in self.THRESH_TYPES: ValueError('thresh_type must be one of {}, given {}'.format(self.THRESH_TYPES, thresh_type)) self.thresh_type = thresh_type # mode can be 'vel_total', 'vel_x', 'vel_y' or 'dist' - report either velocity or distance # mode can also be ' # TODO: Do two parameters - type 'vel' or 'dist' and measure 'x', 'y', 'total'z self.mode = mode # TODO: Implement this if self.mode == "steady": self.thresh_val = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE) else: self.thresh_val = 0.0 self.integrate_dur = integrate_dur # event to signal quitting self.quit_evt = threading.Event() self.quit_evt.clear() # event to signal when to start accumulating movements to trigger self.measure_evt = threading.Event() self.measure_time = 0 # queue to I/O mouse movements summarized at fs Hz self.q = Queue() # lock to prevent race between putting and getting self.qlock = threading.Lock() self.listens = {'MEASURE':self.l_measure, 'CLEAR':self.l_clear, 'STOP':self.l_stop} self.node = Net_Node('wheel_{}'.format(mouse_idx), upstream=prefs.get('NAME'), port=prefs.get('MSGPORT'), listens=self.listens, ) # if we are being used in a child object, we send our trigger via a GPIO pin self.digi_out = digi_out self.thread = None if start: self.start()
[docs] def start(self): self.thread = threading.Thread(target=self._record) self.thread.daemon = True self.thread.start()
def _mouse(self): while self.quit_evt: events = self.mouse.read() self.q.put(events) def _record(self): moves = np.array([], dtype=self.MOVE_DTYPE) threading.Thread(target=self._mouse).start() last_update = time.time() while not self.quit_evt.is_set(): try: events = self.q.get_nowait() except Empty: events = None if events is None: move = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE) else: # make a numpy record array of events with 3 fields: # velocity, dir(ection), timestamp (system seconds) move = np.array([(int(event.state), event.code, float(event.timestamp))\ for event in events if event.code in ('REL_X', 'REL_Y')], dtype=self.MOVE_DTYPE) moves = np.concatenate([moves, move]) # If we have been told to start measuring for a trigger... if self.measure_evt.is_set(): do_trigger = self.check_thresh(move) if do_trigger: self.thresh_trig() self.measure_evt.clear() # take the integral of velocities # If it's time to report velocity, do it. nowtime = time.time() if (nowtime-last_update)>self.update_dur: # TODO: Implement distance/position reporting y_vel = self.calc_move(moves, 'y') x_vel = self.calc_move(moves, 'x') self.node.send(key='CONTINUOUS', value={'x':x_vel, 'y':y_vel, 't':nowtime}, repeat=False) moves = np.array([], dtype=self.MOVE_DTYPE) last_update = nowtime
[docs] def check_thresh(self, move): """ Updates thresh_val and checks whether it's above/below threshold Args: move (np.array): Structured array with fields ('vel', 'dir', 'timestamp') Returns: """ # Determine whether the threshold was surpassed do_trigger = False if self.mode == 'vel_total': thresh_update = self.calc_move(move) # If instantaneous velocity is above thresh... if thresh_update > self.thresh: do_trigger = True elif self.mode == 'steady': # If movements in the recent past are below a certain value # self.thresh_val should be set to a structured array by l_measure try: self.thresh_val = np.concatenate([self.thresh_val, move]) except TypeError: print('THRESH_VAL:', self.thresh_val, 'MOVE:', move) # trim to movements in the time window thresh_val = self.thresh_val[self.thresh_val['timestamp'] > time.time()-self.integrate_dur] thresh_update = self.calc_move(thresh_val) if (thresh_update < self.thresh) and (self.measure_time+self.integrate_dur < time.time()): do_trigger = True elif self.mode == 'dist': thresh_update = self.calc_move(move) self.thresh_val += thresh_update if self.thresh_val > self.thresh: do_trigger = True else: Warning ("mode is not defined! mode is {}".format(self.mode)) return do_trigger
[docs] def calc_move(self, move, thresh_type=None): """ Calculate distance move depending on type (x, y, total dist) Args: move (): thresh_type (): Returns: """ if thresh_type is None: thresh_type = self.thresh_type # FIXME: rly inefficient # get the value of the movement depending on what we're measuring if thresh_type == 'x': distance = np.sum(move['vel'][move['dir'] == "REL_X"]) elif thresh_type == 'y': distance = np.sum(move['vel'][move['dir'] == "REL_Y"]) elif thresh_type == "dist": x_dist = np.sum(move['vel'][move['dir'] == "REL_X"]) y_dist = np.sum(move['vel'][move['dir'] == "REL_Y"]) distance = np.abs(np.sqrt(float(x_dist ** 2) + float(y_dist ** 2))) return distance
[docs] def thresh_trig(self): if self.digi_out: self.digi_out.pulse() self.measure_evt.clear()
[docs] def assign_cb(self, trigger_fn): # want to have callback write an output pin -- so callback should go back to # the task to write a GPIO pin. self.trig_fn = trigger_fn
[docs] def l_measure(self, value): """ Task has signaled that we need to start measuring movements for a trigger Args: value (): """ if 'mode' in value.keys(): if value['mode'] in self.MODES: self.mode = value['mode'] else: Warning('incorrect mode sent: {}, needs to be one of {}'.format(value['mode'], self.MODES)) if 'thresh' in value.keys(): self.thresh = float(value['thresh']) if self.mode == "steady": self.thresh_val = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE) else: self.thresh_val = 0.0 self.measure_time = time.time() self.measure_evt.set() sys.stdout.flush()
[docs] def l_clear(self, value): """ Stop measuring! Args: value (): Returns: """ self.measure_evt.clear()
[docs] def l_stop(self, value): """ Stop measuring and clear system resources Args: value (): Returns: """ self.measure_evt.set() self.release()
[docs] def release(self): self.quit_evt.clear()
[docs]class Scale(Hardware): """ Note: Not implemented, working on using a digital scale to make weighing faster. """ MODEL={ 'stamps.com':{ 'vendor_id':0x1446, 'product_id': 0x6a73 } } def __init__(self, model='stamps.com', vendor_id = None, product_id = None): """ Args: model: vendor_id: product_id: """ self.vendor_id = self.MODEL[model]['vendor_id'] self.product_id = self.MODEL[model]['product_id'] if vendor_id: self.vendor_id = vendor_id if product_id: self.product_id = product_id # find device self.device = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id) # default configuration self.device.set_configuration()