Source code for autopilot.core.subject

"""

Classes for managing data and protocol access and storage.

Currently named subject, but will likely be refactored to include other data
models should the need arise.

"""
# TODO: store pilot in biography
import os
import sys
import threading
import tables
from tables.nodes import filenode
import datetime
import json
import pandas as pd
import warnings
import typing
from copy import copy
import autopilot
from autopilot import prefs
from autopilot.stim.sound.sounds import STRING_PARAMS
from autopilot.core.loggers import init_logger

import queue


# suppress pytables natural name warnings
warnings.simplefilter('ignore', category=tables.NaturalNameWarning)

import numpy as np


class Subject(object):
    """
    Class for managing one subject's data and protocol.

    Creates a :mod:`tables` hdf5 file in `prefs.get('DATADIR')` with the general structure::

        / root
        |--- current (tables.filenode) storing the current task as serialized JSON
        |--- data (group)
        |    |--- task_name (group)
        |    |    |--- S##_step_name
        |    |    |    |--- trial_data
        |    |    |    |--- continuous_data
        |    |    |--- ...
        |--- history (group)
        |    |--- hashes - history of git commit hashes
        |    |--- history - history of changes: protocols assigned, params changed, etc.
        |    |--- weights - history of pre and post-task weights
        |    |--- past_protocols (group) - stash past protocol params on reassign
        |    |    |--- date_protocol_name - tables.filenode of a previous protocol's params.
        |    |    |--- ...
        |--- info - group with biographical information as attributes

    Attributes:
        lock (:class:`threading.Lock`): manages access to the hdf5 file
        name (str): Subject ID
        file (str): Path to hdf5 file - usually `{prefs.get('DATADIR')}/{self.name}.h5`
        current (dict): current task parameters. loaded from the 'current' :mod:`~tables.filenode` of the h5 file
        step (int): current step
        protocol_name (str): name of currently assigned protocol
        current_trial (int): number of current trial
        running (bool): Flag that signals whether the subject is currently running a task or not.
        data_queue (:class:`queue.Queue`): Queue to dump data while running task
        thread (:class:`threading.Thread`): thread used to keep file open while running task
        did_graduate (:class:`threading.Event`): Event used to signal if the subject has graduated the current step
        STRUCTURE (list): list of tuples mapping node locations (eg. '/data') to node types,
            each with the order:

            * full path, eg. '/history/weights'
            * relative path, eg. '/history'
            * name, eg. 'weights'
            * type, either 'group' for groups or a :class:`tables.IsDescription`
              subclass for tables, eg. :class:`.Subject.Weight_Table`
    """

    def __init__(self, name: str=None, dir: str=None, file: str=None,
                 new: bool=False, biography: dict=None):
        """
        Args:
            name (str): subject ID
            dir (str): path where the .h5 file is located, if `None`, `prefs.get('DATADIR')` is used
            file (str): load a subject from a filename. if `None`, ignored.
            new (bool): if True, a new file is made (a new file is made if one does not exist anyway)
            biography (dict): If making a new subject file, a dictionary with biographical data can be passed
        """
        # try to get name first off for logger
        self.name = name
        self.logger = init_logger(self)

        self.STRUCTURE = [
            ('/data', '/', 'data', 'group'),
            ('/history', '/', 'history', 'group'),
            ('/history/hashes', '/history', 'hashes', self.Hash_Table),
            ('/history/history', '/history', 'history', self.History_Table),
            ('/history/weights', '/history', 'weights', self.Weight_Table),
            ('/history/past_protocols', '/history', 'past_protocols', 'group'),
            ('/info', '/', 'info', 'group')
        ]

        # use a filter to compress continuous data
        self.continuous_filter = tables.Filters(complib='blosc', complevel=6)

        self.lock = threading.Lock()

        if not dir:
            try:
                dir = prefs.get('DATADIR')
            except AttributeError:
                dir = os.path.split(file)[0]

        if not name:
            if not file:
                raise Exception('Need to either have a name or a file, how else would we find the .h5 file?')
            if not os.path.isfile(file):
                raise Exception('no file was found at passed file: {}'.format(file))
            self.file = file
        else:
            if file:
                self.logger.warning('file passed, but so was name, defaulting to using name + dir')
            self.name = str(name)
            self.file = os.path.join(dir, name + '.h5')

        if new or not os.path.isfile(self.file):
            # set new to true in case new'd from absence of file
            new = True
            self.new_subject_file(biography)

        # before we open, make sure we have the stuff we need
        self.ensure_structure()

        h5f = self.open_hdf()

        if not name:
            try:
                self.name = h5f.root.info._v_attrs['name']
            except KeyError:
                self.logger.warning('No Name attribute saved, trying to recover from filename')
                self.name = os.path.splitext(os.path.split(file)[-1])[0]

        # If subject has a protocol, load it to a dict
        self.current = None
        self.step = None
        self.protocol_name = None
        if "/current" in h5f:
            # We load the info from 'current' but don't keep the node open.
            # Stash it as a dict for better access from Python
            current_node = filenode.open_node(h5f.root.current)
            protocol_string = current_node.readall()
            self.current = json.loads(protocol_string)
            self.step = int(current_node.attrs['step'])
            self.protocol_name = current_node.attrs['protocol_name']
        elif not new:
            # if we're not being created for the first time, warn that there is no protocol assigned to the subject
            self.logger.warning('Subject has no protocol assigned!')

        # get last session number if we have it
        try:
            self.session = int(h5f.root.info._v_attrs['session'])
        except KeyError:
            self.session = None

        # We will get handles to trial and continuous data when we start running
        self.current_trial = None

        # Is the subject currently running (ie. we expect data to be incoming)?
        # Used to keep the subject object alive, otherwise we close the file whenever we don't need it
        self.running = False

        # We use a threading queue to dump data into a kept-alive h5f file
        self.data_queue = None
        self.thread = None
        self.did_graduate = threading.Event()

        # Every time we are initialized we stash the git hash
        history_row = h5f.root.history.hashes.row
        history_row['time'] = self.get_timestamp()
        try:
            history_row['hash'] = prefs.get('HASH')
            # FIXME: less implicit way of getting hash plz
        except AttributeError:
            history_row['hash'] = ''
        history_row.append()

        # we have to always open and close the h5f
        self.close_hdf(h5f)
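
    # A minimal usage sketch (hypothetical names and values), assuming
    # prefs.get('DATADIR') and prefs.get('PROTOCOLDIR') are configured and a
    # 'free_water.json' protocol exists:
    #
    #   sub = Subject(name='test_mouse_0', new=True,
    #                 biography={'dob': '2021-01-01', 'baseline_mass': 22.1})
    #   sub.assign_protocol('free_water', step_n=0)
    #   task_params = sub.prepare_run()
    #   sub.save_data({'trial_num': 0, 'TRIAL_END': True})
    #   sub.stop_run()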
    def open_hdf(self, mode='r+'):
        """
        Opens the hdf5 file.

        This should be called at the start of every method that accesses the h5 file,
        and :meth:`~.Subject.close_hdf` should be called at the end. Otherwise
        the file will close and we risk file corruption.

        See the pytables docs
        `here <https://www.pytables.org/cookbook/threading.html>`_ and
        `here <https://www.pytables.org/FAQ.html#can-pytables-be-used-in-concurrent-access-scenarios>`_

        Args:
            mode (str): a file access mode, can be:

                * 'r': Read-only - no data can be modified.
                * 'w': Write - a new file is created (an existing file with the same name would be deleted).
                * 'a': Append - an existing file is opened for reading and writing, and if the file does not exist it is created.
                * 'r+' (default): Similar to 'a', but the file must already exist.

        Returns:
            :class:`tables.File`: Opened hdf file.
        """
        # TODO: Use a decorator around methods instead of explicitly calling
        with self.lock:
            return tables.open_file(self.file, mode=mode)
    def close_hdf(self, h5f):
        # type: (tables.file.File) -> None
        """
        Flushes & closes the open hdf file.
        Must be called whenever :meth:`~.Subject.open_hdf` is used.

        Args:
            h5f (:class:`tables.File`): the hdf file opened by :meth:`~.Subject.open_hdf`
        """
        with self.lock:
            h5f.flush()
            h5f.close()
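
    # The pattern every file-touching method follows, roughly:
    #
    #   h5f = self.open_hdf()          # acquires self.lock, opens in 'r+'
    #   node = h5f.get_node('/info')   # ... read or write nodes ...
    #   self.close_hdf(h5f)            # flushes and closes under the lock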
    def new_subject_file(self, biography):
        """
        Create a new subject file and make the general filestructure.

        If a file already exists, open it in append mode, otherwise create it.

        Args:
            biography (dict): Biographical details like DOB, mass, etc.
                Typically created by :class:`~.gui.New_Subject_Wizard.Biography_Tab`.
        """
        # If a file already exists, we open it for appending so we don't lose data.
        # For now we are assuming that the existing file has the basic structure,
        # but that's probably a bad assumption for full reliability
        if os.path.isfile(self.file):
            h5f = self.open_hdf(mode='a')
        else:
            h5f = self.open_hdf(mode='w')

            # Make Basic file structure
            h5f.create_group("/", "data", "Trial Record Data")
            h5f.create_group("/", "info", "Biographical Info")
            history_group = h5f.create_group("/", "history", "History")

            # When a whole protocol is changed, we stash the old protocol as a filenode in the past_protocols group
            h5f.create_group("/history", "past_protocols", 'Past Protocol Files')

            # Also canonical to the basic file structure is the 'current' filenode which stores the current protocol,
            # but since we want to be able to tell that a protocol hasn't been assigned yet we don't instantiate it here.
            # See http://www.pytables.org/usersguide/filenode.html
            # filenode.new_node(h5f, where="/", name="current")

            # We keep track of changes to parameters, promotions, etc. in the history table
            h5f.create_table(history_group, 'history', self.History_Table, "Change History")

            # Make table for weights
            h5f.create_table(history_group, 'weights', self.Weight_Table, "Subject Weights")

            # And another table to stash the git hash every time we're open.
            h5f.create_table(history_group, 'hashes', self.Hash_Table, "Git commit hash history")

        # Save biographical information as node attributes
        if biography:
            for k, v in biography.items():
                h5f.root.info._v_attrs[k] = v

        h5f.root.info._v_attrs['name'] = self.name
        h5f.root.info._v_attrs['session'] = 0

        self.close_hdf(h5f)
    def ensure_structure(self):
        """
        Ensure that our h5f has the appropriate baseline structure as defined in `self.STRUCTURE`

        Checks that all groups and tables are made, makes them if not
        """
        h5f = self.open_hdf()
        for node in self.STRUCTURE:
            try:
                _ = h5f.get_node(node[0])
            except tables.exceptions.NoSuchNodeError:
                # try to make it
                if isinstance(node[3], str):
                    if node[3] == 'group':
                        h5f.create_group(node[1], node[2])
                elif issubclass(node[3], tables.IsDescription):
                    h5f.create_table(node[1], node[2], description=node[3])
        self.close_hdf(h5f)
    def update_biography(self, params):
        """
        Change or make a new biographical attribute, stored as
        attributes of the `info` group.

        Args:
            params (dict): biographical attributes to be updated.
        """
        h5f = self.open_hdf()
        for k, v in params.items():
            h5f.root.info._v_attrs[k] = v
        _ = self.close_hdf(h5f)
    def update_history(self, type, name, value, step=None):
        """
        Update the history table when changes are made to the subject's protocol.

        The current protocol is flushed to the past_protocols group and an updated
        filenode is created.

        Note:
            This **only** updates the history table, and does not make the changes itself.

        Args:
            type (str): What type of change is being made? Can be one of

                * 'param' - a parameter of one task stage
                * 'step' - the step of the current protocol
                * 'protocol' - the whole protocol is being updated.

            name (str): the name of either the parameter being changed or the new protocol
            value (str): the value that the parameter or step is being changed to,
                or the protocol dictionary flattened to a string.
            step (int): When type is 'param', changes the parameter at a particular step,
                otherwise the current step is used.
        """
        self.logger.info(f'Updating subject {self.name} history - type: {type}, name: {name}, value: {value}, step: {step}')

        # Make sure the updates are written to the subject file
        if type == 'param':
            if not step:
                self.current[self.step][name] = value
            else:
                self.current[step][name] = value
            self.flush_current()
        elif type == 'step':
            self.step = int(value)
            self.flush_current()
        elif type == 'protocol':
            self.flush_current()

        # Check that we're all strings in here
        if not isinstance(type, str):
            type = str(type)
        if not isinstance(name, str):
            name = str(name)
        if not isinstance(value, str):
            value = str(value)

        # log the change
        h5f = self.open_hdf()
        history_row = h5f.root.history.history.row
        history_row['time'] = self.get_timestamp(simple=True)
        history_row['type'] = type
        history_row['name'] = name
        history_row['value'] = value
        history_row.append()
        _ = self.close_hdf(h5f)
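
    # For example (hypothetical parameter name), changing one stage parameter
    # at step 1 both updates the stored protocol and logs a history row:
    #
    #   sub.update_history('param', name='reward_duration', value='20', step=1)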
    # def update_params(self, param, value):
    #     """
    #     Args:
    #         param:
    #         value:
    #     """
    #     # TODO: this
    #     pass
    def assign_protocol(self, protocol, step_n=0):
        """
        Assign a protocol to the subject.

        If the subject has a currently assigned task, stashes it with :meth:`~.Subject.stash_current`

        Creates groups and tables according to the data descriptions in the task class being assigned,
        eg. as described in :class:`.Task.TrialData`.

        Updates the history table.

        Args:
            protocol (str): the protocol to be assigned. Can be one of

                * the name of the protocol (its filename minus .json) if it is in `prefs.get('PROTOCOLDIR')`
                * the filename of the protocol (its filename with .json) if it is in `prefs.get('PROTOCOLDIR')`
                * the full path and filename of the protocol.

            step_n (int): Which step is being assigned?
        """
        # Protocol will be passed as a .json filename in prefs.get('PROTOCOLDIR')
        h5f = self.open_hdf()

        ## Assign new protocol
        if not protocol.endswith('.json'):
            protocol = protocol + '.json'

        # try prepending the protocoldir if we were passed just the name
        if not os.path.exists(protocol):
            fullpath = os.path.join(prefs.get('PROTOCOLDIR'), protocol)
            if not os.path.exists(fullpath):
                raise Exception('Could not find either {} or {}'.format(protocol, fullpath))
            protocol = fullpath

        # Set name and step
        # Strip off path and extension to get the protocol name
        protocol_name = os.path.splitext(protocol)[0].split(os.sep)[-1]

        # check if this is the same protocol so we don't reset session number
        same_protocol = False
        if (protocol_name == self.protocol_name) and (step_n == self.step):
            same_protocol = True

        # Load protocol to dict
        with open(protocol) as protocol_file:
            prot_dict = json.load(protocol_file)

        # Check if there is an existing protocol, archive it if there is.
        if "/current" in h5f:
            _ = self.close_hdf(h5f)
            self.stash_current()
            h5f = self.open_hdf()

        # Make filenode and save as serialized json
        current_node = filenode.new_node(h5f, where='/', name='current')
        current_node.write(json.dumps(prot_dict).encode('utf-8'))
        h5f.flush()

        # save some protocol attributes
        self.current = prot_dict

        current_node.attrs['protocol_name'] = protocol_name
        self.protocol_name = protocol_name

        current_node.attrs['step'] = step_n
        self.step = int(step_n)

        # always start out on session 0 on a new task,
        # unless this is the same task as was already assigned
        if not same_protocol:
            h5f.root.info._v_attrs['session'] = 0
            self.session = 0

        # Make file group for protocol
        if "/data/{}".format(protocol_name) not in h5f:
            current_group = h5f.create_group('/data', protocol_name)
        else:
            current_group = h5f.get_node('/data', protocol_name)

        # Create groups for each step.
        # There are two types of data - continuous and trialwise.
        # Each gets a single table within a group: since each step should have
        # consistent data requirements over time and hdf5 doesn't need to be in
        # memory, we can just keep appending to keep things simple.
        for i, step in enumerate(self.current):
            # First we get the task class for this step
            task_class = autopilot.get_task(step['task_type'])
            step_name = step['step_name']

            # group name is S##_'step_name'
            group_name = "S{:02d}_{}".format(i, step_name)

            if group_name not in current_group:
                step_group = h5f.create_group(current_group, group_name)
            else:
                step_group = current_group._f_get_child(group_name)

            # The task class *should* have at least one PyTables DataTypes descriptor
            try:
                if task_class.TrialData is not None:
                    trial_descriptor = task_class.TrialData
                    # add a session column, everyone needs a session column
                    if 'session' not in trial_descriptor.columns.keys():
                        trial_descriptor.columns.update({'session': tables.Int32Col()})
                    # same thing with trial_num
                    if 'trial_num' not in trial_descriptor.columns.keys():
                        trial_descriptor.columns.update({'trial_num': tables.Int32Col()})
                    # if this task has sounds, make columns for them
                    # TODO: Make stim managers return a list of properties for their sounds
                    if 'stim' in step.keys():
                        if 'groups' in step['stim'].keys():
                            # managers have stim nested within groups, but this is still really ugly
                            sound_params = {}
                            for g in step['stim']['groups']:
                                for side, sounds in g['sounds'].items():
                                    for sound in sounds:
                                        for k, v in sound.items():
                                            if k in STRING_PARAMS:
                                                sound_params[k] = tables.StringCol(1024)
                                            else:
                                                sound_params[k] = tables.Float64Col()
                            trial_descriptor.columns.update(sound_params)
                        elif 'sounds' in step['stim'].keys():
                            # for now we just assume they're floats
                            sound_params = {}
                            for side, sounds in step['stim']['sounds'].items():
                                # each side has a list of sounds
                                for sound in sounds:
                                    for k, v in sound.items():
                                        if k in STRING_PARAMS:
                                            sound_params[k] = tables.StringCol(1024)
                                        else:
                                            sound_params[k] = tables.Float64Col()
                            trial_descriptor.columns.update(sound_params)

                    h5f.create_table(step_group, "trial_data", trial_descriptor)
                else:
                    self.logger.warning('No trial data descriptor found, making default table with session and trial_num')
                    h5f.create_table(step_group, "trial_data", {
                        'session': tables.Int32Col(),
                        'trial_num': tables.Int32Col()
                    })
            except tables.NodeError:
                # we already have made this table, that's fine
                pass

            try:
                # if we have continuous data, make a folder for each data stream.
                # each session will make its own subfolder,
                # which contains tables for each of the streams for that session
                if hasattr(task_class, "ContinuousData"):
                    cont_group = h5f.create_group(step_group, "continuous_data")
                    # save data names as attributes
                    data_names = tuple(task_class.ContinuousData.keys())
                    cont_group._v_attrs['data'] = data_names
            except tables.NodeError:
                # already made it
                pass

        _ = self.close_hdf(h5f)

        # Update history
        self.update_history(type='protocol', name=protocol_name, value=self.current)
        self.update_history(type='step',
                            name=self.current[self.step]['step_name'],
                            value=self.step)
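
    # For reference, a sketch (hypothetical task) of the kind of TrialData
    # descriptor assign_protocol expects on a task class:
    #
    #   class ExampleTask:
    #       class TrialData(tables.IsDescription):
    #           target = tables.StringCol(64)
    #           response = tables.StringCol(64)
    #           correct = tables.Int32Col()
    #           timestamp = tables.StringCol(64)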
    def flush_current(self):
        """
        Flushes the 'current' attribute in the subject object to the current filenode in the .h5

        Used to make sure the stored .json representation of the current task stays up to date
        with the params set in the subject object
        """
        h5f = self.open_hdf()
        h5f.remove_node('/current')
        current_node = filenode.new_node(h5f, where='/', name='current')
        current_node.write(json.dumps(self.current).encode('utf-8'))
        current_node.attrs['step'] = self.step
        current_node.attrs['protocol_name'] = self.protocol_name
        self.close_hdf(h5f)
        self.logger.debug('current protocol flushed')
    def stash_current(self):
        """
        Save the current protocol in the history group and delete the node.

        Typically this is called when assigning a new protocol.

        Stored as the date that it was changed followed by its name if it has one
        """
        h5f = self.open_hdf()
        try:
            protocol_name = h5f.get_node_attr('/current', 'protocol_name')
            archive_name = '_'.join([self.get_timestamp(simple=True), protocol_name])
        except AttributeError:
            warnings.warn("protocol_name attribute couldn't be accessed, using timestamp to stash protocol")
            archive_name = self.get_timestamp(simple=True)

        # TODO: When would we want to prefer the .h5f copy over the live one?
        # current_node = filenode.open_node(h5f.root.current)
        # old_protocol = current_node.readall()

        archive_node = filenode.new_node(h5f, where='/history/past_protocols', name=archive_name)
        archive_node.write(json.dumps(self.current).encode('utf-8'))

        h5f.remove_node('/current')
        self.close_hdf(h5f)
        self.logger.debug('current protocol stashed')
    def prepare_run(self):
        """
        Prepares the Subject object to receive data while running the task.

        Gets information about the current task and trial number,
        spawns a :class:`~.tasks.graduation.Graduation` object,
        spawns :attr:`~.Subject.data_queue` and calls :meth:`~.Subject.data_thread`.

        Returns:
            Dict: the parameters for the current step, with subject id, step number,
                current trial, and session number included.
        """
        if self.current is None:
            e = RuntimeError('No task assigned to subject, cant prepare_run. use Subject.assign_protocol or protocol reassignment wizard in the terminal GUI')
            self.logger.exception(f"{e}")
            raise e

        trial_table = None
        cont_table = None

        # get step history
        try:
            step_df = self.get_step_history(use_history=True)
        except Exception as e:
            self.logger.exception(f"Couldnt get step history to trim data given to graduation objects, got exception {e}")
            step_df = None

        h5f = self.open_hdf()

        # Get current task parameters and handles to tables
        task_params = self.current[self.step]
        step_name = task_params['step_name']

        # file structure is '/data/protocol_name/##_step_name/tables'
        group_name = "/data/{}/S{:02d}_{}".format(self.protocol_name, self.step, step_name)

        # tasks without TrialData will have some default table, so this should always be present
        trial_table = h5f.get_node(group_name, 'trial_data')

        # first try to find some timestamp column to filter past data we give to the graduation object,
        # in case the subject has been stepped back down to a previous stage, for example.
        # FIXME: Hardcoding parameter names, should have a guaranteed 'trial_timestamp' column for each trial
        slice_start = 0
        try:
            ts_cols = [col for col in trial_table.colnames if 'timestamp' in col]
            # just use the first timestamp column
            if len(ts_cols) > 0:
                trial_ts = pd.DataFrame({'timestamp': trial_table.col(ts_cols[0])})
                trial_ts['timestamp'] = pd.to_datetime(trial_ts['timestamp'].str.decode('utf-8'))
            else:
                self.logger.warning(
                    'No timestamp column could be found in trial data, cannot trim data given to graduation objects')
                trial_ts = None

            if trial_ts is not None and step_df is not None:
                # see where, if any, the timestamp column is older than the last time the step was changed
                good_rows = np.where(trial_ts['timestamp'] >= step_df['timestamp'].iloc[-1])[0]
                if len(good_rows) > 0:
                    slice_start = np.min(good_rows)
                # otherwise, if we found no good rows but have trials,
                # say not to use them by slicing at the end of the table
                else:
                    slice_start = trial_table.nrows
        except Exception as e:
            self.logger.exception(
                f"Couldnt trim data given to graduation objects with step change history, got exception {e}")

        trial_tab = trial_table.read(start=slice_start)
        trial_tab_keys = tuple(trial_tab.dtype.fields.keys())

        # get last trial number and session
        try:
            self.current_trial = trial_tab['trial_num'][-1] + 1
        except IndexError:
            if 'trial_num' not in trial_tab_keys:
                self.logger.info('No previous trials detected, setting current_trial to 0')
            self.current_trial = 0

        # should have gotten session from current node when we started,
        # so sessions increment over the lifespan of the subject, even if reassigned.
        if not self.session:
            try:
                self.session = trial_tab['session'][-1]
            except IndexError:
                if 'session' not in trial_tab_keys:
                    self.logger.warning('previous session couldnt be found, setting to 0')
                self.session = 0

        self.session += 1
        h5f.root.info._v_attrs['session'] = self.session
        h5f.flush()

        # prepare continuous data group and tables
        task_class = autopilot.get_task(task_params['task_type'])
        cont_group = None
        if hasattr(task_class, 'ContinuousData'):
            cont_group = h5f.get_node(group_name, 'continuous_data')
            try:
                session_group = h5f.create_group(cont_group, "session_{}".format(self.session))
            except tables.NodeError:
                session_group = h5f.get_node(cont_group, "session_{}".format(self.session))
            # don't create arrays for each dtype here, we will create them as we receive data

        # spawn graduation object if the current step specifies one
        self.graduation = None
        if 'graduation' in task_params.keys():
            try:
                grad_type = task_params['graduation']['type']
                grad_params = task_params['graduation']['value'].copy()

                # add other params asked for by the task class
                grad_obj = autopilot.get('graduation', grad_type)

                if grad_obj.PARAMS:
                    # these are params that should be set in the protocol settings
                    for param in grad_obj.PARAMS:
                        # for now, try to find it in our attributes,
                        # but don't overwrite if it already has what it needs in case of name overlap
                        # TODO: See where else we would want to get these from
                        if hasattr(self, param) and param not in grad_params.keys():
                            grad_params.update({param: getattr(self, param)})

                if grad_obj.COLS:
                    # these are columns in our trial table,
                    # so give the data to the graduation object
                    for col in grad_obj.COLS:
                        try:
                            grad_params.update({col: trial_tab[col]})
                        except KeyError:
                            self.logger.warning('Graduation object requested column {}, but it was not found in the trial table'.format(col))

                self.graduation = grad_obj(**grad_params)
                self.did_graduate.clear()
            except Exception as e:
                self.logger.exception(f'Exception in graduation parameter specification, graduation is disabled.\ngot error: {e}')

        self.close_hdf(h5f)

        # spawn thread to accept data
        self.data_queue = queue.Queue()
        self.thread = threading.Thread(target=self.data_thread, args=(self.data_queue,))
        self.thread.start()
        self.running = True

        # return a task parameter dictionary
        task = copy(self.current[self.step])
        task['subject'] = self.name
        task['step'] = int(self.step)
        task['current_trial'] = int(self.current_trial)
        task['session'] = int(self.session)
        return task
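
    # For reference, a sketch (hypothetical values) of the 'graduation' entry
    # prepare_run looks for in the current step's parameters:
    #
    #   "graduation": {
    #       "type": "accuracy",
    #       "value": {"threshold": "0.75", "window": "400"}
    #   }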
    def data_thread(self, queue):
        """
        Thread that keeps the hdf file open and receives data while the task is running.

        Receives data through :attr:`~.Subject.queue` as dictionaries. Data can be
        partial-trial data (eg. each phase of a trial) as long as the task returns a dict with
        'TRIAL_END' as a key at the end of each trial.

        Each dict given to the queue should have a `trial_num`, and this method can
        properly store data without passing `TRIAL_END` if so. I recommend being explicit, however.

        Checks graduation state at the end of each trial.

        Args:
            queue (:class:`queue.Queue`): passed by :meth:`~.Subject.prepare_run` and used by other
                objects to pass data to be stored.
        """
        h5f = self.open_hdf()

        task_params = self.current[self.step]
        step_name = task_params['step_name']

        # file structure is '/data/protocol_name/##_step_name/tables'
        group_name = f"/data/{self.protocol_name}/S{self.step:02d}_{step_name}"
        trial_table = h5f.get_node(group_name, 'trial_data')
        trial_keys = trial_table.colnames
        trial_row = trial_table.row

        # try to get continuous data table if any
        cont_data = tuple()
        cont_tables = {}
        cont_rows = {}
        try:
            continuous_group = h5f.get_node(group_name, 'continuous_data')
            session_group = h5f.get_node(continuous_group, 'session_{}'.format(self.session))
            cont_data = continuous_group._v_attrs['data']
        except AttributeError:
            continuous_table = False

        # start getting data,
        # stop when 'END' gets put in the queue
        for data in iter(queue.get, 'END'):
            # wrap everything in try because this thread shouldn't crash
            try:
                # if we get continuous data, this should be simple because we always get a whole row.
                # there must be a more elegant way to check if something is a key and it is true...
                # yet here we are
                if 'continuous' in data.keys():
                    for k, v in data.items():
                        # if this isn't data that we're expecting, ignore it
                        if k not in cont_data:
                            continue

                        # if we haven't made a table yet, do it
                        if k not in cont_tables.keys():
                            # make atom for this data
                            try:
                                # if it's a numpy array...
                                col_atom = tables.Atom.from_type(v.dtype.name, v.shape)
                            except AttributeError:
                                temp_array = np.array(v)
                                col_atom = tables.Atom.from_type(temp_array.dtype.name, temp_array.shape)

                            # should have come in with a timestamp
                            # TODO: Log if no timestamp is received
                            try:
                                temp_timestamp_arr = np.array(data['timestamp'])
                                timestamp_atom = tables.Atom.from_type(temp_timestamp_arr.dtype.name,
                                                                       temp_timestamp_arr.shape)
                            except KeyError:
                                self.logger.warning('no timestamp sent with continuous data')
                                continue

                            cont_tables[k] = h5f.create_table(session_group, k, description={
                                k: tables.Col.from_atom(col_atom),
                                'timestamp': tables.Col.from_atom(timestamp_atom)
                            }, filters=self.continuous_filter)

                            cont_rows[k] = cont_tables[k].row

                        cont_rows[k][k] = v
                        cont_rows[k]['timestamp'] = data['timestamp']
                        cont_rows[k].append()

                    # continue, the rest is for handling trial data
                    continue

                if 'trial_num' in data.keys():
                    # if we haven't recorded a trial number for this row yet, stamp it
                    if not trial_row['trial_num']:
                        trial_row['trial_num'] = data['trial_num']

                    # if we've already recorded a trial number for this row,
                    # and the trial number we just got is not the same,
                    # we edit that row if we already have some data on it or else start a new row
                    if trial_row['trial_num'] and (trial_row['trial_num'] != data['trial_num']):
                        # find row with this trial number if it exists.
                        # this will return a list of rows with matching trial_num.
                        # if it's empty, we didn't receive a TRIAL_END and should create a new row
                        other_row = [r for r in trial_table.where("trial_num == {}".format(data['trial_num']))]

                        if len(other_row) == 0:
                            # proceed to fill the row below
                            trial_row.append()

                        elif len(other_row) == 1:
                            # update the row and continue so we don't double write.
                            # have to be in the middle of iteration to use update()
                            for row in trial_table.where("trial_num == {}".format(data['trial_num'])):
                                for k, v in data.items():
                                    if k in trial_keys:
                                        row[k] = v
                                row.update()
                            continue

                        else:
                            # we have more than one row with this trial_num.
                            # shouldn't happen, but we don't want to throw any data away
                            self.logger.warning('Found multiple rows with same trial_num: {}'.format(data['trial_num']))
                            # continue just for data conservancy's sake
                            trial_row.append()

                for k, v in data.items():
                    # some bug where some columns are not always detected;
                    # rather than failing out here, just log the error
                    if k in trial_keys:
                        try:
                            trial_row[k] = v
                        except KeyError:
                            self.logger.warning("Data dropped: key: {}, value: {}".format(k, v))

                # TODO: Or if all the values have been filled, shouldn't need explicit TRIAL_END flags
                if 'TRIAL_END' in data.keys():
                    trial_row['session'] = self.session
                    if self.graduation:
                        # set our graduation flag, the terminal will get the rest rolling
                        did_graduate = self.graduation.update(trial_row)
                        if did_graduate is True:
                            self.did_graduate.set()
                    trial_row.append()
                    trial_table.flush()

                # always flush so that our row iteration routines above will find what they're looking for
                trial_table.flush()
            except Exception as e:
                # we shouldn't throw any exception in this thread, just log it and move on
                self.logger.exception(f'exception in data thread: {e}')

        self.close_hdf(h5f)
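
    # For reference, sketches (hypothetical keys and values) of the two kinds
    # of dicts data_thread consumes from the queue:
    #
    #   trial data, possibly sent in several chunks per trial:
    #       {'trial_num': 12, 'response': 'L', 'TRIAL_END': True}
    #
    #   continuous data, always one whole row at a time:
    #       {'continuous': True, 'timestamp': '2019-02-01T17:08:02.058808',
    #        'x_pos': np.array([...])}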
    def save_data(self, data):
        """
        Alternate and equivalent method of putting data in the queue as
        `Subject.data_queue.put(data)`

        Args:
            data (dict): trial data. each should have a 'trial_num', and a dictionary with key
                'TRIAL_END' should be passed at the end of each trial.
        """
        self.data_queue.put(data)
    def stop_run(self):
        """
        puts 'END' in the data_queue, which causes :meth:`~.Subject.data_thread` to end.
        """
        self.data_queue.put('END')
        self.thread.join(5)
        self.running = False
        if self.thread.is_alive():
            self.logger.warning('Data thread did not exit')
    def to_csv(self, path, task='current', step='all'):
        """
        Export trial data to .csv

        Args:
            path (str): output path of .csv
            task (str, int): not implemented, but in the future pull data from 'current' or other named task
            step (str, int, list, tuple): Step to select, see :meth:`.Subject.get_trial_data`
        """
        # TODO: Jonny just scratching out temporarily, doesn't have all features implemented
        df = self.get_trial_data(step=step)
        df['subject'] = self.name
        df.to_csv(path)
        print("""Subject {} dataframe saved to:\n {}
        ========================
        N Trials: {}
        N Sessions: {}""".format(self.name, path, df.shape[0], len(df.session.unique())))
    def get_trial_data(self,
                       step: typing.Union[int, list, str] = -1,
                       what: str = "data"):
        """
        Get trial data from the current task.

        Args:
            step (int, list, 'all'): Step that should be returned, can be one of

                * -1: most recent step
                * int: a single step
                * list of two integers eg. [0, 5], an inclusive range of steps.
                * string: the name of a step (excluding S##_)
                * 'all': all steps.

            what (str): What should be returned?

                * 'data' : Dataframe of requested steps' trial data
                * 'variables': dict of variables *without* loading data into memory

        Returns:
            :class:`pandas.DataFrame`: DataFrame of requested steps' trial data,
                or a dict of column descriptions if `what == 'variables'`.
        """
        # step= -1 is just most recent step,
        # step= int is an integer specified step
        # step= [n1, n2] is from step n1 to n2 inclusive
        # step= 'all' or anything that isn't an int or a list is all steps
        h5f = self.open_hdf()
        group_name = "/data/{}".format(self.protocol_name)
        group = h5f.get_node(group_name)
        step_groups = sorted(group._v_children.keys())

        if step == -1:
            # find the last trial step with data
            for step_name in reversed(step_groups):
                if group._v_children[step_name].trial_data.attrs['NROWS'] > 0:
                    step_groups = [step_name]
                    break
        elif isinstance(step, int):
            if step > len(step_groups):
                raise ValueError('You provided a step number ({}) greater than the number of steps in the subjects assigned protocol: ({})'.format(step, len(step_groups)))
            step_groups = [step_groups[step]]
        elif isinstance(step, str) and step != 'all':
            # since step names have S##_ prepended in the hdf5 file,
            # but we want to be able to call them by their human readable name,
            # we have to make sure we have the right form
            _step_groups = [s for s in step_groups if s == step]
            if len(_step_groups) == 0:
                _step_groups = [s for s in step_groups if step in s]
            step_groups = _step_groups
        elif isinstance(step, list):
            if isinstance(step[0], int):
                step_groups = step_groups[int(step[0]):int(step[1])]
            elif isinstance(step[0], str):
                _step_groups = []
                for a_step in step:
                    step_name = [s for s in step_groups if s == a_step]
                    if len(step_name) == 0:
                        step_name = [s for s in step_groups if a_step in s]
                    _step_groups.extend(step_name)
                step_groups = _step_groups

        self.logger.debug('step groups: {}'.format(step_groups))

        if what == "variables":
            return_data = {}

        for step_key in step_groups:
            step_n = int(step_key[1:3])  # beginning of keys will be 'S##'
            step_tab = group._v_children[step_key]._v_children['trial_data']
            if what == "data":
                step_df = pd.DataFrame(step_tab.read())
                step_df['step'] = step_n
                step_df['step_name'] = step_key
                try:
                    return_data = return_data.append(step_df, ignore_index=True)
                except NameError:
                    return_data = step_df
            elif what == "variables":
                return_data[step_key] = step_tab.coldescrs

        self.close_hdf(h5f)

        return return_data
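
    # A usage sketch (hypothetical step name): pull one step's trials as a
    # DataFrame, or just the column descriptions without loading data:
    #
    #   df = sub.get_trial_data(step='free_water')
    #   variables = sub.get_trial_data(step='all', what='variables')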
    def apply_along(self, along='session', step=-1):
        """
        Generator that yields trial data so an operation can be applied along an axis.
        Currently only iterates over steps when `along == 'session'`.

        Args:
            along (str): axis to apply along. Only 'session' is currently handled.
            step (int): step to select; -1 selects the most recent step with data.

        Yields:
            :class:`pandas.DataFrame`: trial data for each selected step,
                with a 'step' column added.
        """
        h5f = self.open_hdf()
        group_name = "/data/{}".format(self.protocol_name)
        group = h5f.get_node(group_name)
        step_groups = sorted(group._v_children.keys())

        if along == "session":
            if step == -1:
                # find the last trial step with data
                for step_name in reversed(step_groups):
                    if group._v_children[step_name].trial_data.attrs['NROWS'] > 0:
                        step_groups = [step_name]
                        break
            elif isinstance(step, int):
                if step > len(step_groups):
                    raise ValueError(
                        'You provided a step number ({}) greater than the number of steps in the subjects assigned protocol: ({})'.format(
                            step, len(step_groups)))
                step_groups = [step_groups[step]]

            for step_key in step_groups:
                step_n = int(step_key[1:3])  # beginning of keys will be 'S##'
                step_tab = group._v_children[step_key]._v_children['trial_data']
                step_df = pd.DataFrame(step_tab.read())
                step_df['step'] = step_n
                yield step_df
    def get_step_history(self, use_history=True):
        """
        Gets a dataframe of step numbers, timestamps, and step names
        as a coarse view of training status.

        Args:
            use_history (bool): whether to use the history table or to reconstruct steps and dates from the trial table itself.
                compatibility fix for old versions that didn't stash step changes when the whole protocol was updated.

        Returns:
            :class:`pandas.DataFrame`
        """
        h5f = self.open_hdf()
        if use_history:
            history = h5f.root.history.history
            step_df = pd.DataFrame(history.read())
            if step_df.shape[0] == 0:
                self.close_hdf(h5f)
                return None

            # decode bytes to unicode
            # https://stackoverflow.com/a/63028569/13113166
            for col, dtype in step_df.dtypes.items():
                if dtype == object:
                    # Only process byte object columns.
                    step_df[col] = step_df[col].apply(lambda x: x.decode("utf-8"))

            # filter to step only
            step_df = step_df[step_df['type'] == 'step'].drop('type', axis=1)

            # rename and retype
            step_df = step_df.rename(columns={
                'value': 'step_n',
                'time': 'timestamp',
                'name': 'name'})

            step_df['timestamp'] = pd.to_datetime(step_df['timestamp'],
                                                  format='%y%m%d-%H%M%S')
            step_df['step_n'] = pd.to_numeric(step_df['step_n'])

        else:
            group_name = "/data/{}".format(self.protocol_name)
            group = h5f.get_node(group_name)
            step_groups = sorted(group._v_children.keys())

            # find the last trial step with data
            for step_name in reversed(step_groups):
                if group._v_children[step_name].trial_data.attrs['NROWS'] > 0:
                    step_groups = [step_name]
                    break

            # Iterate through steps, find first timestamp, use that.
            for step_key in step_groups:
                step_n = int(step_key[1:3])  # beginning of keys will be 'S##'
                step_name = self.current[step_n]['step_name']
                step_tab = group._v_children[step_key]._v_children['trial_data']
                # find name of column that is a timestamp
                colnames = step_tab.cols._v_colnames
                try:
                    ts_column = [col for col in colnames if "timestamp" in col][0]
                    ts = step_tab.read(start=0, stop=1, field=ts_column)
                except IndexError:
                    self.logger.warning('No timestamp column found, only returning step numbers and names that were reached')
                    ts = 0

                step_df = pd.DataFrame(
                    {'step_n': step_n,
                     'timestamp': ts,
                     'name': step_name
                     })
                try:
                    return_df = return_df.append(step_df, ignore_index=True)
                except NameError:
                    return_df = step_df

            step_df = return_df

        self.close_hdf(h5f)

        return step_df
    def get_timestamp(self, simple=False):
        # type: (bool) -> str
        """
        Makes a timestamp.

        Args:
            simple (bool):
                if True: returns as format '%y%m%d-%H%M%S', eg '190201-170811'
                if False: returns in isoformat, eg. '2019-02-01T17:08:02.058808'

        Returns:
            str
        """
        # Timestamps have two different applications, and thus two different formats:
        # coarse timestamps that should be human-readable,
        # and fine timestamps for data analysis that don't need to be
        if simple:
            return datetime.datetime.now().strftime('%y%m%d-%H%M%S')
        else:
            return datetime.datetime.now().isoformat()
    def get_weight(self, which='last', include_baseline=False):
        """
        Gets start and stop weights.

        TODO:
            add ability to get weights by session number, dates, and ranges.

        Args:
            which (str): if 'last', gets most recent weights. Otherwise returns all weights.
            include_baseline (bool): if True, includes baseline and minimum mass.

        Returns:
            dict
        """
        # get either the last start/stop weights, optionally including baseline
        # TODO: Get by session
        weights = {}

        h5f = self.open_hdf()
        weight_table = h5f.root.history.weights

        if which == 'last':
            for column in weight_table.colnames:
                try:
                    weights[column] = weight_table.read(-1, field=column)[0]
                except IndexError:
                    weights[column] = None
        else:
            for column in weight_table.colnames:
                try:
                    weights[column] = weight_table.read(field=column)
                except IndexError:
                    weights[column] = None

        if include_baseline is True:
            try:
                baseline = float(h5f.root.info._v_attrs['baseline_mass'])
            except KeyError:
                baseline = 0.0
            minimum = baseline * 0.8
            weights['baseline_mass'] = baseline
            weights['minimum_mass'] = minimum

        self.close_hdf(h5f)
        return weights
    def set_weight(self, date, col_name, new_value):
        """
        Updates an existing weight in the weight table.

        TODO:
            Yes, i know this is bad. Merge with update_weights

        Args:
            date (str): date in the 'simple' format, %y%m%d-%H%M%S
            col_name ('start', 'stop'): are we updating a pre-task or post-task weight?
            new_value (float): New mass.
        """
        h5f = self.open_hdf()
        weight_table = h5f.root.history.weights
        # there should only be one matching row since it includes seconds
        for row in weight_table.where('date == b"{}"'.format(date)):
            row[col_name] = new_value
            row.update()

        self.close_hdf(h5f)
    def update_weights(self, start=None, stop=None):
        """
        Store either a starting or stopping mass.

        `start` and `stop` can be passed simultaneously, `start` can be given in one
        call and `stop` in a later call, but `stop` should not be given before `start`.

        Args:
            start (float): Mass before running task in grams
            stop (float): Mass after running task in grams.
        """
        h5f = self.open_hdf()
        if start is not None:
            weight_row = h5f.root.history.weights.row
            weight_row['date'] = self.get_timestamp(simple=True)
            weight_row['session'] = self.session
            weight_row['start'] = float(start)
            weight_row.append()
        elif stop is not None:
            # TODO: Make this more robust - don't assume we got a start weight
            h5f.root.history.weights.cols.stop[-1] = stop
        else:
            self.logger.warning("Need either a start or a stop weight")

        _ = self.close_hdf(h5f)
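
    # A usage sketch: store the pre-task mass before running, and the post-task
    # mass in a later call (hypothetical values, in grams):
    #
    #   sub.update_weights(start=22.1)
    #   # ... run task ...
    #   sub.update_weights(stop=21.4)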
    def graduate(self):
        """
        Increase the current step by one, unless it is the last step.
        """
        if len(self.current) <= self.step + 1:
            self.logger.warning('Tried to graduate from the last step!\n Task has {} steps and we are on {}'.format(len(self.current), self.step + 1))
            return

        # increment step, update_history should handle the rest
        step = self.step + 1
        name = self.current[step]['step_name']
        self.update_history('step', name, step)
    class History_Table(tables.IsDescription):
        """
        Class to describe parameter and protocol change history

        Attributes:
            time (str): timestamps
            type (str): Type of change - protocol, parameter, step
            name (str): Name - Which parameter was changed, name of protocol,
                manual vs. graduation step change
            value (str): Value - What was the parameter/protocol/etc. changed to,
                step if protocol.
        """
        time = tables.StringCol(256)
        type = tables.StringCol(256)
        name = tables.StringCol(256)
        value = tables.StringCol(4028)

    class Weight_Table(tables.IsDescription):
        """
        Class to describe table for weight history

        Attributes:
            start (float): Pre-task mass
            stop (float): Post-task mass
            date (str): Timestamp in simple format
            session (int): Session number
        """
        start = tables.Float32Col()
        stop = tables.Float32Col()
        date = tables.StringCol(256)
        session = tables.Int32Col()

    class Hash_Table(tables.IsDescription):
        """
        Class to describe table for hash history

        Attributes:
            time (str): Timestamps
            hash (str): Hash of the currently checked out commit of the git repository.
        """
        time = tables.StringCol(256)
        hash = tables.StringCol(40)