"""
Hardware that uses USB
"""
import sys
import threading
import time
if sys.version_info >= (3, 0):
from queue import Queue, Empty
else:
from Queue import Queue, Empty
import numpy as np
from inputs import devices
from autopilot import prefs
from autopilot.core.networking import Net_Node
from autopilot.hardware import Hardware
[docs]class Wheel(Hardware):
"""
A continuously measured mouse wheel.
Uses a USB computer mouse.
Warning:
'vel' thresh_type not implemented
"""
input = True
type = "Wheel"
trigger = False # even though this is a triggerable option, typically don't want to assign a cb and instead us a GPIO
# TODO: Make the standard-style trigger.
# TODO: Make wheel movements available locally with a deque
THRESH_TYPES = ['dist', 'x', 'y', 'vel']
MODES = ('vel_total', 'steady', 'dist', 'timed')
MOVE_DTYPE = [('vel', 'i4'), ('dir', 'U5'), ('timestamp', 'f8')]
def __init__(self, mouse_idx=0, fs=10, thresh=100, thresh_type='dist', start=True,
digi_out = False, mode='vel_total', integrate_dur=5):
"""
Args:
mouse_idx (int):
fs (int):
thresh (int):
thresh_type ('dist'):
start (bool):
digi_out (:class:`~.Digital_Out`, bool):
mode ('vel_total'):
integrate_dur (int):
"""
# try to get mouse from inputs
# TODO: More robust - specify mouse by hardware attrs
try:
self.mouse = devices.mice[mouse_idx]
except IndexError:
Warning('Could not find requested mouse with index {}\nAttempting to use mouse idx 0'.format(mouse_idx))
self.mouse = devices.mice[0]
# frequency of our updating
self.fs = fs
# time between updates
self.update_dur = 1./float(self.fs)
self.thresh = thresh
# thresh type can be 'dist', 'x', 'y', or 'vel'
if thresh_type not in self.THRESH_TYPES:
ValueError('thresh_type must be one of {}, given {}'.format(self.THRESH_TYPES, thresh_type))
self.thresh_type = thresh_type
# mode can be 'vel_total', 'vel_x', 'vel_y' or 'dist' - report either velocity or distance
# mode can also be '
# TODO: Do two parameters - type 'vel' or 'dist' and measure 'x', 'y', 'total'z
self.mode = mode
# TODO: Implement this
if self.mode == "steady":
self.thresh_val = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE)
else:
self.thresh_val = 0.0
self.integrate_dur = integrate_dur
# event to signal quitting
self.quit_evt = threading.Event()
self.quit_evt.clear()
# event to signal when to start accumulating movements to trigger
self.measure_evt = threading.Event()
self.measure_time = 0
# queue to I/O mouse movements summarized at fs Hz
self.q = Queue()
# lock to prevent race between putting and getting
self.qlock = threading.Lock()
self.listens = {'MEASURE':self.l_measure,
'CLEAR':self.l_clear,
'STOP':self.l_stop}
self.node = Net_Node('wheel_{}'.format(mouse_idx),
upstream=prefs.get('NAME'),
port=prefs.get('MSGPORT'),
listens=self.listens,
)
# if we are being used in a child object, we send our trigger via a GPIO pin
self.digi_out = digi_out
self.thread = None
if start:
self.start()
[docs] def start(self):
self.thread = threading.Thread(target=self._record)
self.thread.daemon = True
self.thread.start()
def _mouse(self):
while self.quit_evt:
events = self.mouse.read()
self.q.put(events)
def _record(self):
moves = np.array([], dtype=self.MOVE_DTYPE)
threading.Thread(target=self._mouse).start()
last_update = time.time()
while not self.quit_evt.is_set():
try:
events = self.q.get_nowait()
except Empty:
events = None
if events is None:
move = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE)
else:
# make a numpy record array of events with 3 fields:
# velocity, dir(ection), timestamp (system seconds)
move = np.array([(int(event.state), event.code, float(event.timestamp))\
for event in events if event.code in ('REL_X', 'REL_Y')],
dtype=self.MOVE_DTYPE)
moves = np.concatenate([moves, move])
# If we have been told to start measuring for a trigger...
if self.measure_evt.is_set():
do_trigger = self.check_thresh(move)
if do_trigger:
self.thresh_trig()
self.measure_evt.clear()
# take the integral of velocities
# If it's time to report velocity, do it.
nowtime = time.time()
if (nowtime-last_update)>self.update_dur:
# TODO: Implement distance/position reporting
y_vel = self.calc_move(moves, 'y')
x_vel = self.calc_move(moves, 'x')
self.node.send(key='CONTINUOUS', value={'x':x_vel, 'y':y_vel, 't':nowtime},
repeat=False)
moves = np.array([], dtype=self.MOVE_DTYPE)
last_update = nowtime
[docs] def check_thresh(self, move):
"""
Updates thresh_val and checks whether it's above/below threshold
Args:
move (np.array): Structured array with fields ('vel', 'dir', 'timestamp')
Returns:
"""
# Determine whether the threshold was surpassed
do_trigger = False
if self.mode == 'vel_total':
thresh_update = self.calc_move(move)
# If instantaneous velocity is above thresh...
if thresh_update > self.thresh:
do_trigger = True
elif self.mode == 'steady':
# If movements in the recent past are below a certain value
# self.thresh_val should be set to a structured array by l_measure
try:
self.thresh_val = np.concatenate([self.thresh_val, move])
except TypeError:
print('THRESH_VAL:', self.thresh_val, 'MOVE:', move)
# trim to movements in the time window
thresh_val = self.thresh_val[self.thresh_val['timestamp'] > time.time()-self.integrate_dur]
thresh_update = self.calc_move(thresh_val)
if (thresh_update < self.thresh) and (self.measure_time+self.integrate_dur < time.time()):
do_trigger = True
elif self.mode == 'dist':
thresh_update = self.calc_move(move)
self.thresh_val += thresh_update
if self.thresh_val > self.thresh:
do_trigger = True
else:
Warning ("mode is not defined! mode is {}".format(self.mode))
return do_trigger
[docs] def calc_move(self, move, thresh_type=None):
"""
Calculate distance move depending on type (x, y, total dist)
Args:
move ():
thresh_type ():
Returns:
"""
if thresh_type is None:
thresh_type = self.thresh_type
# FIXME: rly inefficient
# get the value of the movement depending on what we're measuring
if thresh_type == 'x':
distance = np.sum(move['vel'][move['dir'] == "REL_X"])
elif thresh_type == 'y':
distance = np.sum(move['vel'][move['dir'] == "REL_Y"])
elif thresh_type == "dist":
x_dist = np.sum(move['vel'][move['dir'] == "REL_X"])
y_dist = np.sum(move['vel'][move['dir'] == "REL_Y"])
distance = np.abs(np.sqrt(float(x_dist ** 2) + float(y_dist ** 2)))
return distance
[docs] def thresh_trig(self):
if self.digi_out:
self.digi_out.pulse()
self.measure_evt.clear()
[docs] def assign_cb(self, trigger_fn):
# want to have callback write an output pin -- so callback should go back to
# the task to write a GPIO pin.
self.trig_fn = trigger_fn
[docs] def l_measure(self, value):
"""
Task has signaled that we need to start measuring movements for a trigger
Args:
value ():
"""
if 'mode' in value.keys():
if value['mode'] in self.MODES:
self.mode = value['mode']
else:
Warning('incorrect mode sent: {}, needs to be one of {}'.format(value['mode'], self.MODES))
if 'thresh' in value.keys():
self.thresh = float(value['thresh'])
if self.mode == "steady":
self.thresh_val = np.array([(0, "REL_Y", 0)], dtype=self.MOVE_DTYPE)
else:
self.thresh_val = 0.0
self.measure_time = time.time()
self.measure_evt.set()
sys.stdout.flush()
[docs] def l_clear(self, value):
"""
Stop measuring!
Args:
value ():
Returns:
"""
self.measure_evt.clear()
[docs] def l_stop(self, value):
"""
Stop measuring and clear system resources
Args:
value ():
Returns:
"""
self.measure_evt.set()
self.release()
[docs] def release(self):
self.quit_evt.clear()
[docs]class Scale(Hardware):
"""
Note:
Not implemented, working on using a digital scale to
make weighing faster.
"""
MODEL={
'stamps.com':{
'vendor_id':0x1446,
'product_id': 0x6a73
}
}
def __init__(self, model='stamps.com', vendor_id = None, product_id = None):
"""
Args:
model:
vendor_id:
product_id:
"""
self.vendor_id = self.MODEL[model]['vendor_id']
self.product_id = self.MODEL[model]['product_id']
if vendor_id:
self.vendor_id = vendor_id
if product_id:
self.product_id = product_id
# find device
self.device = usb.core.find(idVendor=self.vendor_id,
idProduct=self.product_id)
# default configuration
self.device.set_configuration()