Source code for

import copy
import datetime
import json
import os
from collections import OrderedDict as odict

import numpy as np
from PySide2 import QtWidgets, QtGui

from autopilot import prefs
from autopilot.utils.loggers import init_logger
from autopilot.gui.gui import gui_event
from autopilot.networking import Net_Node

[docs]class Calibrate_Water(QtWidgets.QDialog): """ A window to calibrate the volume of water dispensed per ms. """ def __init__(self, pilots): """ Args: pilots (:py:attr:`.Terminal.pilots`): A dictionary of pilots message_fn (:py:meth:`.Net_Node.send`): The method the Terminal uses to send messages via its net node. """ super(Calibrate_Water, self).__init__() self.pilots = pilots self.pilot_widgets = {} self.init_ui()
[docs] def init_ui(self): self.layout = QtWidgets.QVBoxLayout() # Container Widget self.container = QtWidgets.QWidget() # Layout of Container Widget self.container_layout = QtWidgets.QVBoxLayout(self) self.container.setLayout(self.container_layout) screen_geom = QtWidgets.QDesktopWidget().availableGeometry() # get max pixel value for each subwidget widget_height = np.floor(screen_geom.height()-50/float(len(self.pilots))) for p in self.pilots: self.pilot_widgets[p] = Pilot_Ports(p) self.pilot_widgets[p].setMaximumHeight(widget_height) self.pilot_widgets[p].setMaximumWidth(screen_geom.width()) self.pilot_widgets[p].setSizePolicy(QtWidgets.QSizePolicy.Maximum, QtWidgets.QSizePolicy.Maximum) self.container_layout.addWidget(self.pilot_widgets[p]) # Scroll Area Properties self.scroll = QtWidgets.QScrollArea() self.scroll.setWidgetResizable(False) self.scroll.setWidget(self.container) self.layout.addWidget(self.scroll) # ok/cancel buttons buttonBox = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel) buttonBox.accepted.connect(self.accept) buttonBox.rejected.connect(self.reject) self.layout.addWidget(buttonBox) self.setLayout(self.layout) # prevent from expanding # set max size to screen size self.setMaximumHeight(screen_geom.height()) self.setMaximumWidth(screen_geom.width()) self.setSizePolicy(QtWidgets.QSizePolicy.Maximum, QtWidgets.QSizePolicy.Maximum) self.scrollArea = QtWidgets.QScrollArea(self) self.scrollArea.setWidgetResizable(True)
[docs]class Reassign(QtWidgets.QDialog): """ A dialog that lets subjects be batch reassigned to new protocols or steps. """ def __init__(self, subjects, protocols): """ Args: subjects (dict): A dictionary that contains each subject's protocol and step, ie.:: {'subject_id':['protocol_name', step_int], ... } protocols (list): list of protocol files in the `prefs.get('PROTOCOLDIR')`. Not entirely sure why we don't just list them ourselves here. """ super(Reassign, self).__init__() # FIXME: get logger in a superclass, good god. self.logger = init_logger(self) self.subjects = subjects self.protocols = protocols self.protocol_dir = prefs.get('PROTOCOLDIR') self.init_ui()
[docs] def init_ui(self): """ Initializes graphical elements. Makes a row for each subject where its protocol and step can be changed. """ self.grid = QtWidgets.QGridLayout() self.subject_objects = {} for i, (subject, protocol) in enumerate(self.subjects.items()): subject_name = copy.deepcopy(subject) step = protocol[1] protocol = protocol[0] # subject label subject_lab = QtWidgets.QLabel(subject) self.subject_objects[subject] = [QtWidgets.QComboBox(), QtWidgets.QComboBox()] protocol_box = self.subject_objects[subject][0] protocol_box.setObjectName(subject_name) protocol_box.insertItems(0, self.protocols) # add blank at the end protocol_box.addItem('') # set current item if subject has matching protocol protocol_bool = [protocol == p for p in self.protocols] if any(protocol_bool): protocol_ind = np.where(protocol_bool)[0][0] protocol_box.setCurrentIndex(protocol_ind) else: # set to blank protocol_box.setCurrentIndex(protocol_box.count()-1) protocol_box.currentIndexChanged.connect(self.set_protocol) # create & populate step box step_box = self.subject_objects[subject][1] step_box.setObjectName(subject_name) self.populate_steps(subject_name) if step: step_box.setCurrentIndex(step) step_box.currentIndexChanged.connect(self.set_step) # add to layout self.grid.addWidget(subject_lab, i%25, 0+(np.floor(i/25))*3) self.grid.addWidget(protocol_box, i%25, 1+(np.floor(i/25))*3) self.grid.addWidget(step_box, i%25, 2+(np.floor(i/25))*3) buttonBox = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel) buttonBox.accepted.connect(self.accept) buttonBox.rejected.connect(self.reject) main_layout = QtWidgets.QVBoxLayout() main_layout.addLayout(self.grid) main_layout.addWidget(buttonBox) self.setLayout(main_layout)
[docs] def populate_steps(self, subject): """ When a protocol is selected, populate the selection box with the steps that can be chosen. Args: subject (str): ID of subject whose steps are being populated """ protocol_box = self.subject_objects[subject][0] step_box = self.subject_objects[subject][1] while step_box.count(): step_box.removeItem(0) # Load the protocol and parse its steps protocol_str = protocol_box.currentText() # if unassigned, will be the blank string (which evals False here) # so do nothing in that case if protocol_str: protocol_file = os.path.join(self.protocol_dir, protocol_str + '.json') try: with open(protocol_file) as protocol_file_open: protocol = json.load(protocol_file_open) except json.decoder.JSONDecodeError: self.logger.exception(f'Steps could not be populated because task could not be loaded due to malformed JSON in protocol file {protocol_file}') return except Exception: self.logger.exception(f'Steps could not be populated due to an unknown error loading {protocol_file}. Catching and continuing to populate window') return step_list = [] for i, s in enumerate(protocol): step_list.append(s['step_name']) step_box.insertItems(0, step_list)
[docs] def set_protocol(self): """ When the protocol is changed, stash that and call :py:meth:`.Reassign.populate_steps` . Returns: """ subject = self.sender().objectName() protocol_box = self.subject_objects[subject][0] step_box = self.subject_objects[subject][1] self.subjects[subject][0] = protocol_box.currentText() self.subjects[subject][1] = 0 self.populate_steps(subject)
[docs] def set_step(self): """ When the step is changed, stash that. """ subject = self.sender().objectName() protocol_box = self.subject_objects[subject][0] step_box = self.subject_objects[subject][1] self.subjects[subject][1] = step_box.currentIndex()
[docs]class Weights(QtWidgets.QTableWidget): """ A table for viewing and editing the most recent subject weights. """ def __init__(self, subject_weights, subjects): """ Args: subject_weights (list): a list of weights of the format returned by :py:meth:`.Subject.get_weight(baseline=True)`. subjects (dict): the Terminal's :py:attr:`.Terminal.subjects` dictionary of :class:`.Subject` objects. """ super(Weights, self).__init__() self.subject_weights = subject_weights self.subjects = subjects # subject objects from terminal self.colnames = odict() self.colnames['subject'] = "Subject" self.colnames['date'] = "Date" self.colnames['baseline_mass'] = "Baseline" self.colnames['minimum_mass'] = "Minimum" self.colnames['start'] = 'Starting Mass' self.colnames['stop'] = 'Stopping Mass' self.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding) self.init_ui() self.cellChanged.connect(self.set_weight) self.changed_cells = [] # if we change cells, store the row, column and value so terminal can update
[docs] def init_ui(self): """ Initialized graphical elements. Literally just filling a table. """ # set shape (rows by cols self.shape = (len(self.subject_weights), len(self.colnames.keys())) self.setRowCount(self.shape[0]) self.setColumnCount(self.shape[1]) for row in range(self.shape[0]): for j, col in enumerate(self.colnames.keys()): try: if col == "date": format_date = datetime.datetime.strptime(self.subject_weights[row][col], '%y%m%d-%H%M%S') format_date = format_date.strftime('%b %d') item = QtWidgets.QTableWidgetItem(format_date) elif col == "stop": stop_wt = str(self.subject_weights[row][col]) minimum = float(self.subject_weights[row]['minimum_mass']) item = QtWidgets.QTableWidgetItem(stop_wt) if float(stop_wt) < minimum: item.setBackground(QtGui.QColor(255,0,0)) else: item = QtWidgets.QTableWidgetItem(str(self.subject_weights[row][col])) except: item = QtWidgets.QTableWidgetItem(str(self.subject_weights[row][col])) self.setItem(row, j, item) # make headers self.setHorizontalHeaderLabels(list(self.colnames.values())) self.resizeColumnsToContents() self.updateGeometry() self.adjustSize() self.sortItems(0)
[docs] def set_weight(self, row, column): """ Updates the most recent weights in :attr:`.gui.Weights.subjects` objects. Note: Only the daily weight measurements can be changed this way - not subject name, baseline weight, etc. Args: row (int): row of table column (int): column of table """ if column > 3: # if this is one of the daily weights new_val = self.item(row, column).text() try: new_val = float(new_val) except ValueError: ValueError("New value must be able to be coerced to a float! input: {}".format(new_val)) return # get subject, date and column name subject_name = self.item(row, 0).text() date = self.subject_weights[row]['date'] column_name = self.colnames.keys()[column] # recall colnames is an ordered dictionary self.subjects[subject_name].set_weight(date, column_name, new_val)
[docs]class Pilot_Ports(QtWidgets.QWidget): """ Created by :class:`.Calibrate_Water`, Each pilot's ports and buttons to control repeated release. """ def __init__(self, pilot, n_clicks=1000, click_dur=30): """ Args: pilot (str): name of pilot to calibrate n_clicks (int): number of times to open the port during calibration click_dur (int): how long to open the port (in ms) """ super(Pilot_Ports, self).__init__() self.pilot = pilot # when starting, stash the duration sent to the pi in case it's changed during. self.open_params = {} # store volumes per dispense here. self.volumes = {} self.listens = { 'CAL_PROGRESS': self.l_progress } self.node = Net_Node(id="Cal_{}".format(self.pilot), upstream="T", port=prefs.get('MSGPORT'), listens=self.listens) self.init_ui()
[docs] def init_ui(self): """ Init the layout for one pilot's ports: * pilot name * port buttons * 3 times and vol dispersed :return: """ layout = QtWidgets.QHBoxLayout() pilot_lab = QtWidgets.QLabel(self.pilot) pilot_font = QtGui.QFont() pilot_font.setBold(True) pilot_font.setPointSize(14) pilot_lab.setFont(pilot_font) pilot_lab.setStyleSheet('border: 1px solid black') layout.addWidget(pilot_lab) # make param setting boxes param_layout = QtWidgets.QFormLayout() self.n_clicks = QtWidgets.QLineEdit(str(1000)) self.n_clicks.setSizePolicy(QtWidgets.QSizePolicy.Fixed,QtWidgets.QSizePolicy.Fixed) self.n_clicks.setValidator(QtGui.QIntValidator()) self.interclick_interval = QtWidgets.QLineEdit(str(50)) self.interclick_interval.setSizePolicy(QtWidgets.QSizePolicy.Fixed,QtWidgets.QSizePolicy.Fixed) self.interclick_interval.setValidator(QtGui.QIntValidator()) param_layout.addRow("n clicks", self.n_clicks) param_layout.addRow("interclick (ms)", self.interclick_interval) layout.addLayout(param_layout) # buttons and fields for each port #button_layout = QtWidgets.QVBoxLayout() vol_layout = QtWidgets.QGridLayout() self.dur_boxes = {} self.vol_boxes = {} self.pbars = {} self.flowrates = {} for i, port in enumerate(['L', 'C', 'R']): # init empty dict to store volumes and params later self.volumes[port] = {} # button to start calibration port_button = QtWidgets.QPushButton(port) port_button.setObjectName(port) port_button.clicked.connect(self.start_calibration) vol_layout.addWidget(port_button, i, 0) # set click duration dur_label = QtWidgets.QLabel("Click dur (ms)") self.dur_boxes[port] = QtWidgets.QLineEdit(str(20)) self.dur_boxes[port].setSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed) self.dur_boxes[port].setValidator(QtGui.QIntValidator()) vol_layout.addWidget(dur_label, i, 1) vol_layout.addWidget(self.dur_boxes[port], i, 2) # Divider divider = QtWidgets.QFrame() divider.setFrameShape(QtWidgets.QFrame.VLine) vol_layout.addWidget(divider, i, 3) # input dispensed volume vol_label = QtWidgets.QLabel("Dispensed volume (mL)") self.vol_boxes[port] = QtWidgets.QLineEdit() self.vol_boxes[port].setSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed) self.vol_boxes[port].setObjectName(port) self.vol_boxes[port].setValidator(QtGui.QDoubleValidator()) self.vol_boxes[port].textEdited.connect(self.update_volumes) vol_layout.addWidget(vol_label, i, 4) vol_layout.addWidget(self.vol_boxes[port], i, 5) self.pbars[port] = QtWidgets.QProgressBar() vol_layout.addWidget(self.pbars[port], i, 6) # display flow rate #self.flowrates[port] = QtWidgets.QLabel('?uL/ms') #self.flowrates[port].setSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed) #vol_layout.addWidget(self.flowrates[port], i, 7) layout.addLayout(vol_layout) self.setLayout(layout)
[docs] def update_volumes(self): """ Store the result of a volume calibration test in :attr:`~Pilot_Ports.volumes` """ port = self.sender().objectName() if port in self.open_params.keys(): open_dur = self.open_params[port]['dur'] n_clicks = self.open_params[port]['n_clicks'] click_iti = self.open_params[port]['click_iti'] else: Warning('Volume can only be updated after a calibration has been run') return vol = float(self.vol_boxes[port].text()) self.volumes[port][open_dur] = { 'vol': vol, 'n_clicks': n_clicks, 'click_iti': click_iti, 'timestamp': }
# set flowrate label #flowrate = ((vol * 1000.0) / n_clicks) / open_dur #frame_geom = self.flowrates[port].frameGeometry() #self.flowrates[port].setMaximumHeight(frame_geom.height()) #self.flowrates[port].setText("{} uL/ms".format(flowrate))
[docs] def start_calibration(self): """ Send the calibration test parameters to the :class:`.Pilot` Sends a message with a ``'CALIBRATE_PORT'`` key, which is handled by :meth:`.Pilot.l_cal_port` """ port = self.sender().objectName() # stash params at the time of starting calibration self.open_params[port] = { 'dur':int(self.dur_boxes[port].text()), 'n_clicks': int(self.n_clicks.text()), 'click_iti': int(self.interclick_interval.text()) } self.pbars[port].setMaximum(self.open_params[port]['n_clicks']) self.pbars[port].setValue(0) msg = self.open_params[port] msg.update({'port':port}) self.node.send(to=self.pilot, key="CALIBRATE_PORT", value=msg)
[docs] @gui_event def l_progress(self, value): """ Value should contain * Pilot * Port * Current Click (click_num) :param value: :return: """ self.pbars[value['port']].setValue(int(value['click_num']))