Source code for autopilot.gui.menus.swarm

import multiprocessing as mp
import threading
import time
import typing

from PySide2 import QtWidgets, QtGui

from autopilot import prefs
from autopilot.gui.dialog import pop_dialog
from autopilot.utils.loggers import init_logger
from autopilot.gui.plots.video import Video
from autopilot.networking import Net_Node


class Stream_Video(QtWidgets.QDialog):
    """
    Dialogue to stream, display, and save video.

    The dialog shows a :class:`.Video` widget on the left and, on the right,
    comboboxes to pick a pilot and one of its cameras, a button to start/stop
    streaming, a button to start/stop writing the received frames to a file,
    and a read-only listing of the selected camera's parameters.
    """

    def __init__(self, pilots: dict, *args, **kwargs):
        """
        Args:
            pilots (dict): The :attr:`.Terminal.pilot_db` with the ``prefs`` of each pilot
                (given by :meth:`.Pilot.handshake`)
            *args: passed on to :class:`QtWidgets.QDialog`
            **kwargs: passed on to :class:`QtWidgets.QDialog`
        """
        super(Stream_Video, self).__init__(*args, **kwargs)

        self.writer = None  # type: typing.Optional['Video_Writer']
        self.writer_q = mp.Queue()
        self.writer_file = ""
        # set while received frames should also be pushed into ``writer_q``
        self.writing = threading.Event()
        self.writing.clear()

        self.logger = init_logger(self)

        self.pilots = pilots

        # --------------------------------------------------
        # Parse hardware devices
        # --------------------------------------------------
        # {pilot: {'group.id': hardware_params}}
        self.cameras = {}
        for pilot, pilot_params in self.pilots.items():
            pilot_prefs = pilot_params.get('prefs', None)
            if pilot_prefs is None:
                # not inside an ``except`` block, so there is no traceback to
                # attach -- log a plain warning rather than ``exception``
                self.logger.warning(f'pilot {pilot} had no prefs in its pilots_db entry')
                continue

            self.cameras[pilot] = {}
            # iterate through nested hardware dictionary, looking for cameras
            hardware = pilot_prefs.get('HARDWARE', {'': {}})
            for hw_group, hw_items in hardware.items():
                for hw_id, hw_params in hw_items.items():
                    # if it has cameras in its type (eg. 'cameras.PiCamera')
                    # or a group that starts with cam...
                    if 'cameras' in hw_params.get('type', '') or hw_group.lower().startswith('cam'):
                        # store an abbreviated version of the name and its params for the comboboxes
                        self.cameras[pilot]['.'.join((hw_group, hw_id))] = hw_params

        self.id = f'{prefs.get("NAME")}_video'

        self.video = Video(('stream',))
        # frames arrive as 'CONTINUOUS' messages and are handled by l_frame
        self.node = Net_Node(id=self.id,
                             upstream="T",
                             port=prefs.get('MSGPORT'),
                             listens={'CONTINUOUS': self.l_frame},
                             instance=True)

        # NOTE(review): this attribute shadows the inherited ``QWidget.layout()``
        # accessor on the instance; kept because external code may read it.
        self.layout = None  # type: typing.Optional[QtWidgets.QHBoxLayout]
        self.comboboxes = {}  # type: typing.Dict[str, QtWidgets.QComboBox]
        self.buttons = {}  # type: typing.Dict[str, QtWidgets.QPushButton]
        self.cam_info = {}  # type: typing.Dict[str, typing.Union[QtWidgets.QFormLayout, QtWidgets.QLabel]]

        # keep reference to the pilot/camera that was started, in case the
        # combobox values change while streaming
        self._streaming_pilot = ''
        self._streaming_camera = ''
        self._streaming_cam_id = ''

        self.init_ui()
        self.show()

    def init_ui(self):
        """
        Build the widgets and layouts and connect their signals.

        Populates :attr:`.comboboxes`, :attr:`.buttons` and :attr:`.cam_info`,
        and installs the top-level horizontal layout on the dialog.
        """
        self.layout = QtWidgets.QHBoxLayout()

        # video takes 3/4 of the horizontal space, controls 1/4
        self.layout.addWidget(self.video, 3)

        # --------------------------------------------------
        # Controls layout on right - comboboxes and buttons
        # --------------------------------------------------
        self.button_layout = QtWidgets.QVBoxLayout()

        # combobox to select pilot
        self.comboboxes['pilot'] = QtWidgets.QComboBox()
        self.comboboxes['pilot'].addItem('Select Pilot...')
        for pilot in sorted(self.pilots.keys()):
            self.comboboxes['pilot'].addItem(pilot)
        self.comboboxes['pilot'].currentIndexChanged.connect(self.populate_cameras)

        # and to select camera device
        self.comboboxes['camera'] = QtWidgets.QComboBox()
        self.comboboxes['camera'].addItem('Select Camera...')
        self.comboboxes['camera'].currentIndexChanged.connect(self.camera_selected)

        # buttons to control video
        self.buttons['start'] = QtWidgets.QPushButton('Start Streaming')
        self.buttons['start'].setCheckable(True)
        self.buttons['start'].setChecked(False)
        self.buttons['start'].setDisabled(True)
        self.buttons['start'].toggled.connect(self.toggle_start)

        # save button to start saving frames
        self.buttons['write'] = QtWidgets.QPushButton('Write Video...')
        self.buttons['write'].setCheckable(True)
        self.buttons['write'].setChecked(False)
        self.buttons['write'].setDisabled(True)
        self.buttons['write'].toggled.connect(self.write_video)

        # Infobox to display camera params
        self.cam_info['label'] = QtWidgets.QLabel()
        self.cam_info['form'] = QtWidgets.QFormLayout()

        # --------------------------------------------------
        # add to button layout
        self.button_layout.addWidget(self.comboboxes['pilot'])
        self.button_layout.addWidget(self.comboboxes['camera'])
        self.button_layout.addWidget(self.buttons['start'])
        self.button_layout.addWidget(self.buttons['write'])
        self.button_layout.addWidget(self.cam_info['label'])
        self.button_layout.addLayout(self.cam_info['form'])
        self.button_layout.addStretch(1)

        self.layout.addLayout(self.button_layout, 1)
        self.setLayout(self.layout)

    @property
    def current_pilot(self) -> str:
        """Text currently shown in the pilot combobox (may be the placeholder)."""
        return self.comboboxes['pilot'].currentText()

    @property
    def current_camera(self) -> str:
        """Text currently shown in the camera combobox (may be the placeholder)."""
        return self.comboboxes['camera'].currentText()

    def populate_cameras(self):
        """
        Refill the camera combobox for the currently selected pilot.

        Connected to the pilot combobox's ``currentIndexChanged`` signal.
        Resets the info box and unchecks/disables both buttons.
        """
        current_pilot = self.current_pilot
        self.comboboxes['camera'].clear()
        self._clear_info()

        self.buttons['start'].setChecked(False)
        self.buttons['start'].setDisabled(True)
        self.buttons['write'].setChecked(False)
        self.buttons['write'].setDisabled(True)

        # the placeholder text is not a key of self.cameras, and a pilot whose
        # prefs listed no cameras has an empty dict -- both have nothing to offer
        pilot_cameras = self.cameras.get(current_pilot)
        if pilot_cameras:
            self.comboboxes['camera'].addItem('Select Camera...')
            for cam_name in sorted(pilot_cameras):
                self.comboboxes['camera'].addItem(cam_name)
        else:
            self.comboboxes['camera'].addItem('No Camera Configured!')

    def camera_selected(self):
        """
        Show the selected camera's parameters and enable the start button.

        Connected to the camera combobox's ``currentIndexChanged`` signal.
        If the selection is not a known camera of the current pilot (eg. the
        placeholder entry) the info box is emptied and the start button disabled.
        """
        current_pilot = self.current_pilot
        current_camera = self.current_camera

        # drop whatever the previous selection displayed so rows don't accumulate
        self._clear_info()

        cam_params = self.cameras.get(current_pilot, {}).get(current_camera)
        if cam_params is None:
            self.buttons['start'].setDisabled(True)
            return

        self.cam_info['label'].setText(current_camera)
        for param_name, param_val in cam_params.items():
            self.cam_info['form'].addRow(param_name, QtWidgets.QLabel(str(param_val)))

        self.buttons['start'].setDisabled(False)

    def toggle_start(self):
        """
        Start or stop streaming, depending on the start button's checked state.

        Connected to the start button's ``toggled`` signal. Sends a
        ``STREAM_VIDEO`` message through :attr:`.node` with ``starting``,
        ``camera`` and ``stream_to`` fields. When stopping while the write
        button is checked, the write button is toggled off first.
        """
        if self.buttons['start'].isChecked():
            # starting!!
            self.comboboxes['pilot'].setDisabled(True)
            self.comboboxes['camera'].setDisabled(True)
            self.buttons['write'].setDisabled(False)

            # remember what we started so the stop message goes to the same place
            self._streaming_pilot = self.current_pilot
            self._streaming_camera = self.current_camera
            # frames are looked up in l_frame by the hardware id (last part of 'group.id')
            self._streaming_cam_id = self._streaming_camera.split('.')[-1]

            self.buttons['start'].setText('Streaming...')
            self.node.send(to=self._streaming_pilot, key="STREAM_VIDEO",
                           value={
                               'starting': True,
                               'camera': self._streaming_camera,
                               'stream_to': self.id
                           })
        else:
            # the comboboxes may already have changed (populate_cameras clears
            # the camera box before unchecking this button), so prefer the
            # values stored when streaming was started
            stop_pilot = self._streaming_pilot or self.current_pilot
            stop_camera = self._streaming_camera or self.current_camera
            self.node.send(to=stop_pilot, key="STREAM_VIDEO",
                           value={
                               'starting': False,
                               'camera': stop_camera,
                               'stream_to': self.id
                           })

            if self.buttons['write'].isChecked():
                self.buttons['start'].setDisabled(True)
                # emits ``toggled``, which is connected to write_video
                self.buttons['write'].toggle()
                # write_video re-enables the write button once the writer has stopped
                while not self.buttons['write'].isEnabled():
                    time.sleep(0.001)
                self.buttons['start'].setDisabled(False)

            self.comboboxes['pilot'].setDisabled(False)
            self.comboboxes['camera'].setDisabled(False)
            self.buttons['write'].setDisabled(True)
            self.buttons['start'].setText('Start Streaming')

    def write_video(self):
        """
        Start or stop writing received frames, depending on the write button's
        checked state.

        Connected to the write button's ``toggled`` signal.

        When checked and no writer exists: asks for an output file, creates a
        fresh queue and a ``Video_Writer`` reading from it, starts it, and sets
        :attr:`.writing` so :meth:`.l_frame` begins queueing frames. If the file
        dialog is cancelled the button is unchecked again and nothing is started.

        When unchecked and a writer exists: clears :attr:`.writing`, puts
        ``'END'`` on the queue, waits for the queue to drain and for the writer
        to exit, and resets the button.
        """
        # import here so only import when this particular widget is used.
        # (until we refactor GUI objects)
        from autopilot.hardware.cameras import Video_Writer

        if self.buttons['write'].isChecked():
            if self.writer is None:
                self.writer_file, _ = QtWidgets.QFileDialog.getSaveFileName(
                    self,
                    "Select Output Video Location",
                    prefs.get("DATADIR"),
                    "Video File (*.mp4)"
                )

                if not self.writer_file:
                    # dialog was cancelled -- don't start a writer with no path.
                    # Unchecking re-emits ``toggled``; with no writer that is a no-op.
                    self.logger.info('No output file selected, not writing video')
                    self.buttons['write'].setChecked(False)
                    return

                # remake queue just in case
                self.writer_q = mp.Queue()

                # try to get fps
                try:
                    fps = int(self.cameras[self.current_pilot][self.current_camera]['fps'])
                except (KeyError, TypeError, ValueError):
                    self.logger.warning('Camera does not have a usable "fps" parameter, using 30')
                    fps = 30

                self.writer = Video_Writer(
                    q=self.writer_q,
                    path=self.writer_file,
                    fps=fps,
                    timestamps=True,
                    blosc=False
                )
                self.writer.start()
                self.writing.set()
                self.buttons['write'].setText('Writing')
        else:
            if self.writer is not None:
                # stop queueing new frames, then tell the writer we're done
                self.writing.clear()
                self.writer_q.put('END')

                self.logger.info('Waiting for writer to finish...')
                self.buttons['write'].setDisabled(True)
                while not self.writer_q.empty():
                    try:
                        remaining = f'Writer finishing {self.writer_q.qsize()} frames'
                    except NotImplementedError:
                        # qsize() is not available on every platform
                        remaining = 'Writer finishing...'
                    self.buttons['write'].setText(remaining)
                    time.sleep(0.2)

                # give the writer up to 3 additional seconds if it needs them
                self.writer.join(3)

                if self.writer.exitcode is None:
                    # ask if we want to wait
                    waitforit = pop_dialog(
                        'Wait for writer?',
                        details="Writer isn't finished but queue is empty, wait for it to finish? Otherwise we'll try to terminate it",
                        msg_type='question',
                        buttons=('Ok', 'Abort')
                    )
                    self.logger.debug(f'Wait-for-writer dialog returned: {waitforit}')

                    # NOTE(review): pop_dialog's return type is not visible from
                    # here -- confirm it actually returns a bool, otherwise this
                    # branch is never taken and the writer is always terminated.
                    if waitforit == True:  # noqa: E712
                        start_time = time.time()
                        while self.writer.exitcode is None:
                            waited = time.time() - start_time
                            self.buttons['write'].setText(f'Waiting for Writer ({waited:.1f})')
                            self.writer.join(0.1)
                    else:
                        # no active exception here, so log a warning rather than ``exception``
                        self.logger.warning("Had to terminate Video Writer!")
                        self.writer.terminate()

                self.writer = None
                self.buttons['write'].setText("Write Video...")
                self.buttons['write'].setDisabled(False)

    def _clear_info(self):
        """Blank the camera info label and remove every item from the info form."""
        self.cam_info['label'].setText('')
        while self.cam_info['form'].count():
            child = self.cam_info['form'].takeAt(0)
            if child.widget():
                child.widget().deleteLater()

    def l_frame(self, value):
        """
        Listen for a streamed frame: display it and, if writing, queue it.

        Registered in ``__init__`` as the :class:`.Net_Node` listener for
        ``'CONTINUOUS'`` messages.

        Args:
            value (dict): message payload; read here for a ``'timestamp'`` key
                and a key equal to the streaming camera's hardware id that
                holds the frame.
        """
        cam_id = self._streaming_cam_id
        if cam_id not in value:
            # eg. a frame from a camera we are no longer (or not yet) streaming
            self.logger.debug(f'Dropping frame without data for camera "{cam_id}"')
            return

        frame = value[cam_id]
        self.video.update_frame('stream', frame)
        if self.writing.is_set():
            self.writer_q.put_nowait((value['timestamp'], frame))

    def closeEvent(self, arg__1: QtGui.QCloseEvent):
        """
        Stop streaming (and, through :meth:`.toggle_start`, writing) before closing.

        Args:
            arg__1 (QtGui.QCloseEvent): the close event, passed on to the superclass.
        """
        if self.buttons['start'].isChecked():
            self.buttons['start'].toggle()
            # this will also stop the writer
            # wait (at most ``max_wait`` seconds) for the start button to be re-enabled
            max_wait = 10
            waited = 0
            while not self.buttons['start'].isEnabled() and waited < max_wait:
                time.sleep(1)
                waited += 1

        super(Stream_Video, self).closeEvent(arg__1)