Interfaces for pytables and hdf5 generally
import typing
from typing import Union
from abc import abstractmethod
from typing import Optional, List
from pydantic import create_model

from datetime import datetime

import tables

from autopilot import Autopilot_Type
from import Interface_Mapset, Interface_Map, Interface, resolve_type, _NUMPY_TO_BUILTIN
from import Node, Group

if typing.TYPE_CHECKING:
    from import Table

_datetime_conversion: typing.Callable[[datetime], str] = lambda x: x.isoformat()

[docs]class H5F_Node(Node): """ Base class for H5F Nodes """ path:str title:Optional[str]='' filters:Optional[tables.filters.Filters]=None attrs:Optional[dict]=None def __init__(self, **data): self._init_logger() super().__init__(**data) @property def parent(self) -> str: """ The parent node under which this node hangs. Eg. if ``self.path`` is ``/this/is/my/path``, then parent will be ``/this/is/my`` Returns: str """ return '/'.join(self.path.split('/')[:-1]) @property def name(self) -> str: """ Our path without :attr:`.parent` Returns: str """ return self.path.split('/')[-1]
[docs] @abstractmethod def make(self, h5f:tables.file.File): """ Abstract method to make whatever this node is """
class Config: arbitrary_types_allowed = True
[docs]class H5F_Group(H5F_Node): """ Description of a pytables group and its location """ children: Optional[List[Union[H5F_Node, 'H5F_Group']]] = None
[docs] def make(self, h5f:tables.file.File): """ Make the group, if it doesn't already exist. If it exists, do nothing Args: h5f (:class:`tables.file.File`): The file to create the table in """ try: node = h5f.get_node(self.path) # if no exception, already exists if not isinstance(node, raise ValueError(f'{self.path} already exists, but it isnt a group! instead its a {type(node)}') except tables.exceptions.NoSuchNodeError: group = h5f.create_group(self.parent,, title=self.title, createparents=True, filters=self.filters) self._logger.debug(f"Made group {'/'.join([self.parent,])}") if self.attrs is not None: group._v_attrs.update(self.attrs) if self.children is not None: for c in self.children: c.make(h5f) h5f.flush()
[docs]class H5F_Table(H5F_Node): description: tables.description.MetaIsDescription expectedrows:int=10000
[docs] def make(self, h5f:tables.file.File): """ Make this table according to its description Args: h5f (:class:`tables.file.File`): The file to create the table in """ try: node = h5f.get_node(self.path) if not isinstance(node, tables.table.Table): raise ValueError(f'{self.path} already exists, but it isnt a Table! instead its a {type(node)}') elif set(node.description._v_names) != set(list(self.description.columns.keys())): self._logger.warning(f"Found existing table with columns {node.description._v_names}, but requested a table with {list(self.description.columns.keys())}, remaking.") self._remake_table(h5f) else: self._logger.warning('Found existing table that matches the requested description, not remaking.') except tables.exceptions.NoSuchNodeError: tab = h5f.create_table(self.parent,, self.description, title=self.title, filters=self.filters, createparents=True,expectedrows=self.expectedrows) self._logger.debug(f"Made table {'/'.join([self.parent,])}") if self.attrs is not None: tab._v_attrs.update(self.attrs) h5f.flush()
def _remake_table(self, h5f:tables.file.File): """Remake an existing table, preserving original data. Mostly for adding new columns""" # existing table old_tab = h5f.get_node(self.path) # new table tmp_name = f"{}_tmp" try: node = h5f.get_node('/'.join([self.parent, tmp_name])) node.remove() except tables.NoSuchNodeError: pass new_tab = h5f.create_table(self.parent, tmp_name, self.description, title=self.title, filters=self.filters, createparents=True,expectedrows=self.expectedrows) # check which columns to read and whether we should keep the old table old_cols = old_tab.colnames new_cols = new_tab.colnames remove_old = False would_lose = list(set(old_cols)-set(new_cols)) to_keep = list(set(old_cols).intersection(new_cols)) backup_name = f'{}_bak--0' if len(would_lose) > 0: while backup_name in old_tab._v_parent._v_children.keys(): name_pieces = backup_name.split('--') backup_name = '--'.join([*name_pieces[:-1],str(int(name_pieces[-1])+1)]) self._logger.warning(f"Updating table would delete columns {would_lose}, keeping as {backup_name}") else: remove_old = True # create new rows for i in range(old_tab.nrows): new_tab.row.append() new_tab.flush() # copy columns for add_column in to_keep: getattr(new_tab.cols, add_column)[:] = getattr(old_tab.cols, add_column)[:] new_tab.flush() # move or delete old table if remove_old: self._logger.debug(f'Removing table {old_tab}') old_tab.remove() else: old_tab.move(self.parent, backup_name) # move new table new_tab.move(self.parent, class Config: fields = {'description': {'exclude': True}}
Tables_Mapset = Interface_Mapset( bool = tables.BoolCol, int = tables.Int64Col, float = tables.Float64Col, str = Interface_Map( equals=tables.StringCol, args=[1024] ), bytes = Interface_Map( equals=tables.StringCol, args=[1024] ), datetime = Interface_Map( equals=tables.StringCol, args=[1024], conversion = _datetime_conversion ), group = H5F_Group )
[docs]class Tables_Interface(Interface): map = Tables_Mapset
[docs] def make(self, h5f:tables.file.File) -> bool: pass
[docs]def model_to_description(table: typing.Type['Table']) -> typing.Type[tables.IsDescription]: """ Make a table description from the type annotations in a model Args: table (:class:`.modeling.base.Table`): Table description Returns: :class:`tables.IsDescription` """ # get column descriptions cols = {} for key, field in table.__fields__.items(): type_ = resolve_type(field.type_, resolve_literal=True) type_str = type_.__name__ cols[key] = Tables_Mapset.get(type_str) description = type(table.__name__, (tables.IsDescription,), cols) # type: typing.Type[tables.IsDescription] return description
""" Mapping between dtype.kind and builtin types see """
[docs]def description_to_model(description: typing.Type[tables.IsDescription], cls:typing.Type['Table']) -> 'Table': """ Make a pydantic :class:`.modeling.base.Table` from a :class:`tables.IsDescription` Args: description (:class:`tables.IsDescription`): to convert cls (:class:`.modeling.base.Table`): Subclass of Table to make Returns: Subclass of Table """ description_dict = {} if hasattr(description, 'columns'): iterator = description.columns elif hasattr(description, '_v_colobjects'): # compatibility iwth tables.description.Description types iterator = description._v_colobjects else: raise TypeError(f"Dont know how to convert table from {description}") for key, col in iterator.items(): python_type = _NUMPY_TO_BUILTIN[col.dtype.kind] # tables always accept Lists or singletons python_type = Union[List[python_type], python_type] description_dict[key] = (python_type, ...) model = create_model(cls.__name__, __base__=cls, **description_dict) return model