# Source code for autopilot.hardware.usb

"""
Hardware that uses USB
"""
import sys
import threading
import time
import warnings
if sys.version_info >= (3, 0):
    from queue import Queue, Empty
else:
    from Queue import Queue, Empty

import numpy as np
from inputs import devices

from autopilot import prefs
from autopilot.core.networking import Net_Node
from autopilot.hardware import Hardware


class Wheel(Hardware):
    """
    A continuously measured mouse wheel.

    Uses a USB computer mouse, read through ``inputs.devices.mice``.
    A background thread collects ``REL_X``/``REL_Y`` events, sends their
    summed values over ``self.node`` (key ``'CONTINUOUS'``) roughly every
    ``1/fs`` seconds, and -- while measuring is enabled via the ``'MEASURE'``
    listen -- checks movements against ``thresh`` to fire a trigger.

    Warning:
        'vel' thresh_type not implemented
    """

    input = True
    type = "Wheel"
    # even though this is a triggerable option, typically don't want to
    # assign a cb and instead use a GPIO
    trigger = False
    # TODO: Make the standard-style trigger.
    # TODO: Make wheel movements available locally with a deque

    # values accepted for ``thresh_type``
    THRESH_TYPES = ['dist', 'x', 'y', 'vel']

    # values accepted for ``mode``; only 'vel_total', 'steady' and 'dist'
    # have a branch in check_thresh
    MODES = ('vel_total', 'steady', 'dist', 'timed')

    # dtype of the structured arrays used to hold movements
    MOVE_DTYPE = [('vel', 'i4'), ('dir', 'U5'), ('timestamp', 'f8')]

    def __init__(self, mouse_idx=0, fs=10, thresh=100, thresh_type='dist', start=True,
                 digi_out=False, mode='vel_total', integrate_dur=5):
        """
        Args:
            mouse_idx (int): index into ``inputs.devices.mice`` of the mouse
                to read. Falls back to index 0 (with a warning) if not found.
            fs (int): rate, in Hz, at which summed movements are sent
                over ``self.node``.
            thresh (int): value that movements are compared against in
                :meth:`check_thresh`.
            thresh_type (str): one of :attr:`THRESH_TYPES`. Selects what
                :meth:`calc_move` computes by default.
            start (bool): if True, start the recording thread immediately.
            digi_out (:class:`~.Digital_Out`, bool): object whose ``pulse()``
                method is called when the threshold is crossed, or False.
            mode (str): how :meth:`check_thresh` decides to trigger,
                expected to be one of :attr:`MODES` (not validated here).
            integrate_dur (int): length in seconds of the time window used
                by the 'steady' mode.

        Raises:
            ValueError: if ``thresh_type`` is not in :attr:`THRESH_TYPES`.
        """
        # thresh type can be 'dist', 'x', 'y', or 'vel'.
        # Validate before touching any hardware so a bad argument fails fast.
        if thresh_type not in self.THRESH_TYPES:
            raise ValueError('thresh_type must be one of {}, given {}'.format(self.THRESH_TYPES, thresh_type))

        # try to get mouse from inputs
        # TODO: More robust - specify mouse by hardware attrs
        try:
            self.mouse = devices.mice[mouse_idx]
        except IndexError:
            warnings.warn('Could not find requested mouse with index {}\nAttempting to use mouse idx 0'.format(mouse_idx))
            self.mouse = devices.mice[0]

        # frequency of our updating
        self.fs = fs
        # time between updates
        self.update_dur = 1./float(self.fs)

        self.thresh = thresh
        self.thresh_type = thresh_type

        # TODO: Do two parameters - type 'vel' or 'dist' and measure 'x', 'y', 'total'
        self.mode = mode

        # accumulator used by check_thresh: a structured array of recent
        # movements in 'steady' mode, a running scalar otherwise
        if self.mode == "steady":
            self.thresh_val = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE)
        else:
            self.thresh_val = 0.0

        self.integrate_dur = integrate_dur

        # event to signal quitting
        self.quit_evt = threading.Event()
        self.quit_evt.clear()
        # event to signal when to start accumulating movements to trigger
        self.measure_evt = threading.Event()
        self.measure_time = 0
        # queue to I/O mouse movements summarized at fs Hz
        self.q = Queue()
        # lock to prevent race between putting and getting
        self.qlock = threading.Lock()

        self.listens = {'MEASURE': self.l_measure,
                        'CLEAR': self.l_clear,
                        'STOP': self.l_stop}
        self.node = Net_Node('wheel_{}'.format(mouse_idx),
                             upstream=prefs.get('NAME'),
                             port=prefs.get('MSGPORT'),
                             listens=self.listens,
                             )

        # if we are being used in a child object, we send our trigger via a GPIO pin
        self.digi_out = digi_out

        self.thread = None

        if start:
            self.start()

    def start(self):
        """
        Start the recording loop (:meth:`_record`) in a daemon thread.
        """
        self.thread = threading.Thread(target=self._record)
        self.thread.daemon = True
        self.thread.start()

    def _mouse(self):
        """
        Read events from the mouse and put them in ``self.q`` until
        ``quit_evt`` is set.
        """
        # Test the event's state: the Event object itself is always truthy,
        # so looping on the object could never terminate.
        while not self.quit_evt.is_set():
            events = self.mouse.read()
            self.q.put(events)

    def _record(self):
        """
        Main loop: drain the event queue, check the trigger threshold while
        measuring is enabled, and periodically report summed movements.
        """
        moves = np.array([], dtype=self.MOVE_DTYPE)

        # Daemon so a reader waiting on the device cannot keep the
        # interpreter alive after the main thread exits.
        reader = threading.Thread(target=self._mouse)
        reader.daemon = True
        reader.start()

        last_update = time.time()

        # NOTE(review): this loop busy-polls the queue with get_nowait();
        # left as-is to keep trigger latency unchanged.
        while not self.quit_evt.is_set():
            try:
                events = self.q.get_nowait()
            except Empty:
                events = None

            if events is None:
                # Placeholder "no movement" sample so check_thresh still runs.
                # It is deliberately NOT appended to `moves`: it adds nothing
                # to the reported sums and would grow the array on every poll.
                move = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE)
            else:
                # make a numpy record array of events with 3 fields:
                # velocity, dir(ection), timestamp (system seconds)
                move = np.array([(int(event.state), event.code, float(event.timestamp))
                                 for event in events if event.code in ('REL_X', 'REL_Y')],
                                dtype=self.MOVE_DTYPE)
                moves = np.concatenate([moves, move])

            # If we have been told to start measuring for a trigger...
            if self.measure_evt.is_set():
                do_trigger = self.check_thresh(move)
                if do_trigger:
                    self.thresh_trig()
                    self.measure_evt.clear()

            # If it's time to report velocity, do it.
            nowtime = time.time()
            if (nowtime - last_update) > self.update_dur:
                # TODO: Implement distance/position reporting
                y_vel = self.calc_move(moves, 'y')
                x_vel = self.calc_move(moves, 'x')

                self.node.send(key='CONTINUOUS',
                               value={'x': x_vel, 'y': y_vel, 't': nowtime},
                               repeat=False)

                moves = np.array([], dtype=self.MOVE_DTYPE)
                last_update = nowtime

    def check_thresh(self, move):
        """
        Updates thresh_val and checks whether it's above/below threshold

        Args:
            move (np.array): Structured array with fields
                ('vel', 'dir', 'timestamp')

        Returns:
            bool: True if the trigger condition of the current ``mode`` is met.
        """
        do_trigger = False

        if self.mode == 'vel_total':
            # If instantaneous movement is above thresh...
            thresh_update = self.calc_move(move)
            if thresh_update > self.thresh:
                do_trigger = True

        elif self.mode == 'steady':
            # If movements in the recent past are below a certain value.
            # self.thresh_val should be set to a structured array by l_measure
            try:
                self.thresh_val = np.concatenate([self.thresh_val, move])
            except TypeError:
                print('THRESH_VAL:', self.thresh_val, 'MOVE:', move)

            # Trim to movements in the time window, and keep the trimmed
            # array so the accumulator does not grow without bound.
            self.thresh_val = self.thresh_val[self.thresh_val['timestamp'] > time.time() - self.integrate_dur]

            thresh_update = self.calc_move(self.thresh_val)

            # only trigger once a full window has elapsed since measuring began
            if (thresh_update < self.thresh) and (self.measure_time + self.integrate_dur < time.time()):
                do_trigger = True

        elif self.mode == 'dist':
            # accumulate movement since measuring began
            thresh_update = self.calc_move(move)
            self.thresh_val += thresh_update

            if self.thresh_val > self.thresh:
                do_trigger = True

        else:
            warnings.warn("mode is not defined! mode is {}".format(self.mode))

        return do_trigger

    def calc_move(self, move, thresh_type=None):
        """
        Calculate distance move depending on type (x, y, total dist)

        Args:
            move (np.array): Structured array with fields
                ('vel', 'dir', 'timestamp')
            thresh_type (str): 'x', 'y' or 'dist'. If None, uses
                ``self.thresh_type``.

        Returns:
            The sum of 'vel' over ``REL_X`` rows ('x') or ``REL_Y`` rows
            ('y'), or the Euclidean norm of those two sums ('dist').

        Raises:
            NotImplementedError: if ``thresh_type`` is 'vel'.
            ValueError: if ``thresh_type`` is anything else unsupported.
        """
        if thresh_type is None:
            thresh_type = self.thresh_type

        # FIXME: rly inefficient
        # get the value of the movement depending on what we're measuring
        if thresh_type == 'x':
            distance = np.sum(move['vel'][move['dir'] == "REL_X"])
        elif thresh_type == 'y':
            distance = np.sum(move['vel'][move['dir'] == "REL_Y"])
        elif thresh_type == "dist":
            x_dist = np.sum(move['vel'][move['dir'] == "REL_X"])
            y_dist = np.sum(move['vel'][move['dir'] == "REL_Y"])
            distance = np.sqrt(float(x_dist ** 2) + float(y_dist ** 2))
        elif thresh_type == 'vel':
            # explicit error instead of an UnboundLocalError on `distance`
            raise NotImplementedError("'vel' thresh_type not implemented")
        else:
            raise ValueError('thresh_type must be one of {}, given {}'.format(self.THRESH_TYPES, thresh_type))

        return distance

    def thresh_trig(self):
        """
        Fire the trigger: pulse ``digi_out`` if one was given, then stop
        measuring.
        """
        # NOTE(review): a callback stored by assign_cb is not invoked here.
        if self.digi_out:
            self.digi_out.pulse()

        self.measure_evt.clear()

    def assign_cb(self, trigger_fn):
        """
        Store a trigger callback as ``self.trig_fn``.

        Args:
            trigger_fn (callable): callback to store.
        """
        # want to have callback write an output pin -- so callback should go back to
        # the task to write a GPIO pin.
        self.trig_fn = trigger_fn

    def l_measure(self, value):
        """
        Task has signaled that we need to start measuring movements for a trigger

        Args:
            value (dict): may contain 'mode' (one of :attr:`MODES`) and/or
                'thresh' (converted with ``float``) to update before
                measuring starts.
        """
        if 'mode' in value:
            if value['mode'] in self.MODES:
                self.mode = value['mode']
            else:
                warnings.warn('incorrect mode sent: {}, needs to be one of {}'.format(value['mode'], self.MODES))

        if 'thresh' in value:
            self.thresh = float(value['thresh'])

        # reset the accumulator to the form check_thresh expects for this mode
        if self.mode == "steady":
            self.thresh_val = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE)
        else:
            self.thresh_val = 0.0
        self.measure_time = time.time()

        self.measure_evt.set()

        sys.stdout.flush()

    def l_clear(self, value):
        """
        Stop measuring!

        Args:
            value: unused.
        """
        self.measure_evt.clear()

    def l_stop(self, value):
        """
        Stop measuring and clear system resources

        Args:
            value: unused.
        """
        # clear (not set) the measure event: setting it would start measuring
        self.measure_evt.clear()
        self.release()

    def release(self):
        """
        Signal the reader and recording threads to exit.
        """
        # Both loops run until quit_evt is set, so it must be set here.
        # NOTE(review): the reader presumably blocks in mouse.read(), so it
        # may only notice after the next mouse event -- confirm.
        self.quit_evt.set()
class Scale(Hardware):
    """
    Note:
        Not implemented, working on using a digital scale to make weighing faster.
    """

    # known scale models and their USB vendor/product ids
    MODEL = {
        'stamps.com': {
            'vendor_id': 0x1446,
            'product_id': 0x6a73
        }
    }

    def __init__(self, model='stamps.com', vendor_id=None, product_id=None):
        """
        Find the USB scale and select its default configuration.

        Args:
            model (str): key into :attr:`MODEL` giving the default
                vendor and product ids.
            vendor_id (int): if given (truthy), overrides the model's
                vendor id.
            product_id (int): if given (truthy), overrides the model's
                product id.

        Raises:
            KeyError: if ``model`` is not a key of :attr:`MODEL`.
            ValueError: if no matching USB device is found.
        """
        # Imported here because the module never imports ``usb``; deferring
        # it also keeps the module importable without pyusb installed.
        import usb.core

        self.vendor_id = self.MODEL[model]['vendor_id']
        self.product_id = self.MODEL[model]['product_id']

        if vendor_id:
            self.vendor_id = vendor_id
        if product_id:
            self.product_id = product_id

        # find device
        self.device = usb.core.find(idVendor=self.vendor_id,
                                    idProduct=self.product_id)
        # find() returns None when nothing matches; fail with a clear message
        # rather than an AttributeError on the next line.
        if self.device is None:
            raise ValueError('Could not find USB scale with vendor_id {} and product_id {}'.format(
                hex(self.vendor_id), hex(self.product_id)))

        # default configuration
        self.device.set_configuration()