Source code for autopilot.gui.menus.tests

import ast
import datetime
import itertools
import threading
import time

import numpy as np
import pandas as pd
import pyqtgraph as pg
from PySide2 import QtWidgets, QtGui

from autopilot import prefs
from autopilot.gui.gui import gui_event
from autopilot.networking import Net_Node
from autopilot.utils.loggers import init_logger

[docs]class Bandwidth_Test(QtWidgets.QDialog): """ Test the limits of the rate of messaging from the connected Pilots. Asks pilots to send messages at varying rates and with varying payload sizes, and with messages with/without receipts. Measures drop rates and message latency Attributes: rate_list (list): List of rates (Hz) to test payload_list (list): List of payload sizes (KB) to test messages (list): list of messages received during test """ def __init__(self, pilots): super(Bandwidth_Test, self).__init__() self.pilots = pilots self.rate_list = [] self.payload_list = [] self.test_pilots = [] self.finished_pilots = [] self.messages = [] self.results = [] self.delays = [] self.drops = [] self.speeds = [] self.rates =[] self.end_test = threading.Event() self.end_test.clear() self.listens = { 'BANDWIDTH_MSG': self.register_msg } self.node = Net_Node(id="bandwidth", upstream='T', port = prefs.get('MSGPORT'), listens=self.listens) self.node.send('T', 'INIT') self.logger = init_logger(self) self.init_ui()
[docs] def init_ui(self): """ Look we're just making the stuff in the window over here alright? relax. """ # two panes: left selects the pilots and sets params of the test, # right plots outcomes # main layout l/r self.layout = QtWidgets.QHBoxLayout() # left layout for settings self.settings = QtWidgets.QFormLayout() self.n_messages = QtWidgets.QLineEdit('1000') self.n_messages.setValidator(QtGui.QIntValidator()) self.receipts = QtWidgets.QCheckBox('Get receipts?') self.receipts.setChecked(True) self.random = QtWidgets.QCheckBox() self.blosc = QtWidgets.QCheckBox() self.preserialized = QtWidgets.QCheckBox() self.rates = QtWidgets.QLineEdit('50') self.rates.setObjectName('rates') self.rates.editingFinished.connect(self.validate_list) self.rate_list = [50] self.payloads = QtWidgets.QLineEdit('0') self.payloads.setObjectName('payloads') self.payloads.editingFinished.connect(self.validate_list) self.payload_list = [0] # checkboxes for which pis to include in test self.pilot_box = QtWidgets.QGroupBox('Pilots') self.pilot_checks = {} self.pilot_layout = QtWidgets.QVBoxLayout() for p in self.pilots.keys(): cb = QtWidgets.QCheckBox(p) cb.setChecked(True) self.pilot_checks[p] = cb self.pilot_layout.addWidget(cb) # gotta have progress bars self.all_pbar = QtWidgets.QProgressBar() self.this_pbar = QtWidgets.QProgressBar() # buttons to start test/save results self.start_btn = QtWidgets.QPushButton('Start Test') self.start_btn.clicked.connect(self.start) self.save_btn = QtWidgets.QPushButton('Save Results') self.save_btn.setEnabled(False) self.save_btn.clicked.connect( # combine settings self.settings.addRow('N messages per test', self.n_messages) self.settings.addRow('Confirm sent messages?', self.receipts) self.settings.addRow('Use Random Arrays? (otherwise zeros)', self.random) self.settings.addRow('Compress with blosc?', self.blosc) self.settings.addRow('Preserialize message? \n(dont serialize each message separately)', self.preserialized) self.settings.addRow('Message Rates per Pilot \n(in Hz, list of integers like "[1, 2, 3]")', self.rates) self.settings.addRow('Payload sizes per message \n(in KB, list of integers like "[32, 64, 128]")', self.payloads) self.settings.addRow('Which Pilots to include in test', self.pilot_layout) self.settings.addRow('Progress: All tests', self.all_pbar) self.settings.addRow('Progress: This test', self.this_pbar) self.settings.addRow(self.start_btn, self.save_btn) ########### # plotting widget self.drop_plot = pg.PlotWidget(title='Message Drop Rate') self.delay_plot = pg.PlotWidget(title='Mean Delay') self.speed_plot = pg.PlotWidget(title='Requested vs. Actual speed') # the actual graphical objects that draw stuff for us self.drop_line = self.drop_plot.plot(symbol='t', symbolBrush=(100, 100, 255, 50)) self.delay_line = self.delay_plot.plot(symbol='t', symbolBrush=(100, 100, 255, 50)) self.speed_line = self.speed_plot.plot(symbol='t', symbolBrush=(100, 100, 255, 50)) self.drop_line.setPen((255,0,0)) self.delay_line.setPen((255,0,0)) self.speed_line.setPen((255,0,0)) self.plot_layout = QtWidgets.QVBoxLayout() self.plot_layout.addWidget(self.drop_plot) self.plot_layout.addWidget(self.delay_plot) self.plot_layout.addWidget(self.speed_plot) # add panes self.layout.addLayout(self.settings, 1) self.layout.addLayout(self.plot_layout, 1) self.setLayout(self.layout)
[docs] def start(self): """ Start the test!!! """ # lists to store our results for plotting and etc. self.results = [] self.delays = [] self.drops = [] self.speeds = [] self.rates =[] # first make sure we got everything we need if len(self.rate_list) == 0: warning_msg = QtWidgets.QMessageBox(QtWidgets.QMessageBox.Warning, "No rates to test!", "Couldn't find a list of rates to test, did you enter one?") warning_msg.exec_() return if len(self.payload_list) ==0 : warning_msg = QtWidgets.QMessageBox(QtWidgets.QMessageBox.Warning, "No payloads to test!", "Couldn't find a list of payloads to test, did you enter one?") warning_msg.exec_() return # get list of checked pis test_pilots = [] for pilot, p_box in self.pilot_checks.items(): if p_box.isChecked(): test_pilots.append(pilot) self.test_pilots = test_pilots # stash some run parameters get_receipts = self.receipts.isChecked() n_messages = self.n_messages.text() random = self.random.isChecked() blosc = self.blosc.isChecked() preserialized = self.preserialized.isChecked() # 'n messages for this test' in case user changes it during run self.n_messages_test = int(n_messages) self.save_btn.setEnabled(False) self.start_btn.setEnabled(False) # set pbars if len(self.payload_list) == 0: payload_len = 1 else: payload_len = len(self.payload_list) self.all_pbar.setMaximum(len(self.rate_list)*payload_len) self.this_pbar.setMaximum(self.n_messages_test*len(test_pilots)) self.all_pbar.reset() # save tests to do, disable play button, and get to doing it self.tests_todo = [x for x in itertools.product(self.rate_list, self.payload_list, [self.n_messages_test], [get_receipts], [blosc], [random], [preserialized])] # used to update pbar self.test_counter = itertools.count() self.current_test = self.tests_todo.pop() self.send_test(*self.current_test)
# # start a timer that continues the test if messages are dropped # try: # self.repeat_timer.cancel() # except: # pass # # self.repeat_timer = threading.Timer(self.current_test[0] * self.current_test[2] * 20, # self.process_test, args=self.current_test) # self.repeat_timer.daemon = True # self.repeat_timer.start()
[docs] def send_test(self, rate:int, payload:int, n_msg:int, confirm:bool, blosc:bool, random:bool, preserialized:bool): """ Send a message describing the test to each of the pilots in :attr:`Bandwidth_Test.test_pilots` Args: rate (int): Rate of message sending in Hz payload (int): Size of message payload in bytes n_msg (int): Number of messages to send confirm (bool): If True, use message confirmation, if False no confirmation. blosc (bool): Use blosc compression? random (bool): Use random arrays? preserialized (bool): Serialize the message once, rather than serializing every time? Returns: """ self.finished_pilots = [] self.messages = [] msg = {'rate': rate, 'payload': payload, 'n_msg': n_msg, 'confirm': confirm, 'blosc':blosc, 'random':random, 'preserialized':preserialized } self.end_test.clear() self.this_pbar.reset() self.msg_counter = itertools.count() for p in self.test_pilots: self.node.send(to=p, key="BANDWIDTH", value=msg)
[docs] @gui_event def process_test(self, rate, n_msg, confirm, blosc, random, preserialized): """ Process the results of the test and update the plot window. Reads message results from :attr:`~Bandwidth_Test.messages`, appends computed results to :attr:`~Bandwidth_Test.results`, and starts the next test if any remain. Args: rate (int): Rate of current test in Hz n_msg (int): Number of expected messages in this test confirm (bool): Whether message confirmations were enabled for this test. """ # start a timer that continues the test if messages are dropped try: self.repeat_timer.cancel() except: pass # process messages msg_df = pd.DataFrame.from_records(self.messages, columns=['pilot', 'n_msg', 'timestamp_sent', 'timestamp_rcvd', 'payload_size', 'message_size']) msg_df = msg_df.astype({'timestamp_sent':'datetime64', 'timestamp_rcvd':'datetime64'}) # compute summary try: mean_delay = np.mean(msg_df['timestamp_rcvd'] - msg_df['timestamp_sent']).total_seconds() except AttributeError: mean_delay = np.mean(msg_df['timestamp_rcvd'] - msg_df['timestamp_sent']) try: send_jitter = np.std(msg_df.groupby('pilot').timestamp_sent.diff()).total_seconds() except AttributeError: print(np.std(msg_df.groupby('pilot').timestamp_sent.diff())) send_jitter = np.std(msg_df.groupby('pilot').timestamp_sent.diff()) try: delay_jitter = np.std(msg_df['timestamp_rcvd'] - msg_df['timestamp_sent']).total_seconds() except AttributeError: delay_jitter = np.std(msg_df['timestamp_rcvd'] - msg_df['timestamp_sent']) drop_rate = np.mean(1.0-(msg_df.groupby('pilot').n_msg.count() / float(n_msg))) try: mean_speed = 1.0/msg_df.groupby('pilot').timestamp_rcvd.diff().mean().total_seconds() except AttributeError: mean_speed = 1.0/msg_df.groupby('pilot').timestamp_rcvd.diff().mean() mean_payload = msg_df.payload_size.mean() mean_message = msg_df.message_size.mean() #print(msg_df.groupby('pilot').timestamp_rcvd.diff()) # plot self.rates.append(rate) self.drops.append(drop_rate) self.delays.append(mean_delay) self.speeds.append(mean_speed) self.results.append((rate, mean_payload, mean_message, n_msg, confirm, blosc, random, preserialized, len(self.test_pilots), mean_delay, drop_rate, mean_speed, send_jitter, delay_jitter)) self.delay_line.setData(x=self.rates, y=self.delays) self.drop_line.setData(x=self.rates, y=self.drops) self.speed_line.setData(x=self.rates, y=self.speeds) # self.drop_plot.setYRange(np.min(self.drops), np.max(self.drops), # padding=(np.max(self.drops) - np.min(self.drops)) * .1) # self.delay_plot.setYRange(np.min(self.delays), np.max(self.delays), # padding=(np.max(self.delays) - np.min(self.delays)) * .1) # self.speed_plot.setYRange(np.min(self.speeds), np.max(self.speeds)) self.all_pbar.setValue(next(self.test_counter) + 1) if len(self.tests_todo) == 0: self.save_btn.setEnabled(True) self.start_btn.setEnabled(True) else: time.sleep(2.5) self.current_test = self.tests_todo.pop() self.send_test(*self.current_test) # self.repeat_timer = threading.Timer(self.current_test[0]*self.current_test[2]*10, # self.process_test, args=self.current_test) # self.repeat_timer.daemon = True # self.repeat_timer.start() self.repaint()
[docs] @gui_event def save(self): """ Select save file location for test results (csv) and then save them there """ fileName, filtr = QtWidgets.QFileDialog.getSaveFileName(self, "Where should we save these results?", prefs.get('DATADIR'), "CSV files (*.csv)", "") # make and save results df try: res_df = pd.DataFrame.from_records(self.results, columns=['rate', 'payload_size', 'message_size', 'n_messages', 'confirm', 'blosc', 'random', 'preserialized', 'n_pilots', 'mean_delay', 'drop_rate', 'actual_rate', 'send_jitter', 'delay_jitter']) res_df.to_csv(fileName) reply = QtWidgets.QMessageBox.information(self, "Results saved!", "Results saved to {}".format(fileName)) except Exception as e: reply = QtWidgets.QMessageBox.critical(self, "Error saving", "Error while saving your results:\n{}".format(e))
[docs] def register_msg(self, value): """ Receive message from pilot, stash timestamp, number and pilot Args: value (dict): Value should contain * Pilot * Timestamp * Message number * Payload """ # have to iterate over contents to get true size, # and then add size of container itself. # payload size is distinct from the serialized message size, this is the end size # as it ends up on the disk of the receiver # pdb.set_trace() # payload_size = np.sum([sys.getsizeof(v) for k, v in value.items()]) + sys.getsizeof(value) if 'test_end' in value.keys(): self.finished_pilots.append(value['pilot']) if len(self.finished_pilots) == len(self.test_pilots): self.process_test(value['rate'], value['n_msg'], value['confirm'], value['blosc'], value['random'], value['preserialized']) return payload_size = value['payload_size'] receive_time = try: if value['payload_n']>0: assert isinstance(value['payload'], np.ndarray) #print(value['payload'].shape, value['payload_n'], type(value['payload'])) assert value['payload'].shape[0] == int(value['payload_n']*128) self.messages.append((value['pilot'], int(value['n_msg']), value['timestamp'], receive_time, payload_size, value['message_size'])) except AssertionError: self.logger.exception(f"Payload was not a numpy array or didnt have the expected size\ngot a {type(value['payload'])}") #payload_size = np.frombuffer(base64.b64decode(value['payload']),dtype=np.bool).nbytes msgs_rcvd = next(self.msg_counter) if msgs_rcvd % float(round(self.n_messages_test/100.0)) < 1.0: self.update_pbar(msgs_rcvd+1)
[docs] @gui_event def update_pbar(self, val): self.this_pbar.setValue(val+1)
[docs] def validate_list(self): """ Checks that the entries in :py:attr:`Bandwidth_Test.rates` and :py:attr:`Bandwidth_Test.payloads` are well formed. ie. that they are of the form 'integer, integer, integer'... pops a window that warns about ill formed entry and clears line edit if badly formed If the list validates, stored as either :py:attr:`Bandwidth_Test.rate_list` or :py:attr:`Bandwidth_Test.payload_list` """ # pdb.set_trace() sender = self.sender() text = sender.text() # user doesn't have to add open/close brackets in input, make sure if not text.startswith('['): text = '[ ' + text if not text.endswith(']'): text = text + ' ]' # validate form of string try: a_list = ast.literal_eval(text) except SyntaxError: warning_msg = QtWidgets.QMessageBox(QtWidgets.QMessageBox.Warning, "Improperly formatted list!", "The input received wasn't a properly formatted list of integers. Make sure your input is of the form '1, 2, 3' or '[ 1, 2, 3 ]'\ninstead got : {}".format(text)) sender.setText('') warning_msg.exec_() return # validate integers for i in a_list: if not isinstance (i, int): warning_msg = QtWidgets.QMessageBox(QtWidgets.QMessageBox.Warning, "Improperly formatted list!", "The input received wasn't a properly formatted list of integers. Make sure your input is of the form '1, 2, 3' or '[ 1, 2, 3 ]'\ninstead got : {}".format( text)) sender.setText('') warning_msg.exec_() return # if passes our validation, set list if sender.objectName() == 'rates': self.rate_list = a_list elif sender.objectName() == 'payloads': self.payload_list = a_list else: Warning('Not sure what list this is, object name is: {}'.format(sender.objectName()))