"""
Interfaces for pytables and hdf5 generally
"""
import typing
from typing import Union
from abc import abstractmethod
from typing import Optional, List
from pydantic import create_model
from datetime import datetime
import tables
from autopilot import Autopilot_Type
from autopilot.data.interfaces.base import Interface_Mapset, Interface_Map, Interface, resolve_type, _NUMPY_TO_BUILTIN
from autopilot.data.modeling.base import Node, Group
if typing.TYPE_CHECKING:
from autopilot.data.modeling.base import Table
_datetime_conversion: typing.Callable[[datetime], str] = lambda x: x.isoformat()
class H5F_Node(Node):
    """
    Base class for H5F Nodes

    Attributes:
        path (str): Location of the node within the file,
            eg. ``/this/is/my/path``
        title (str): Title given to the node when it is created
        filters (:class:`tables.filters.Filters`): Filters given to the node
            when it is created
        attrs (dict): Attributes written to the node when it is created
    """
    path: str
    title: Optional[str] = ''
    filters: Optional[tables.filters.Filters] = None
    attrs: Optional[dict] = None

    def __init__(self, **data):
        # The logger is set up first, then the base initializer consumes the
        # field values (``_init_logger`` is expected to come from the base class).
        self._init_logger()
        super().__init__(**data)

    @property
    def parent(self) -> str:
        """
        The parent node under which this node hangs.

        Eg. if ``self.path`` is ``/this/is/my/path``, then
        parent will be ``/this/is/my``. For a path with a single
        component (``/path``) this is the empty string.

        Returns:
            str
        """
        head, _, _ = self.path.rpartition('/')
        return head

    @property
    def name(self) -> str:
        """
        Our path without :attr:`.parent`: everything after the final ``/``

        Returns:
            str
        """
        _, _, tail = self.path.rpartition('/')
        return tail

    @abstractmethod
    def make(self, h5f: tables.file.File):
        """
        Abstract method to make whatever this node is

        Args:
            h5f (:class:`tables.file.File`): The file to make the node in
        """

    class Config:
        arbitrary_types_allowed = True
class H5F_Group(H5F_Node):
    """
    Description of a pytables group and its location

    Attributes:
        children (list): Optional nodes that are made, in order, after
            this group has been made.
    """
    children: Optional[List[Union[H5F_Node, 'H5F_Group']]] = None

    def make(self, h5f: tables.file.File):
        """
        Make the group, if it doesn't already exist.

        If it exists, the group itself is left untouched (its attributes are
        not rewritten). Any :attr:`.children` are made in either case, and the
        file is flushed afterwards.

        Args:
            h5f (:class:`tables.file.File`): The file to create the group in

        Raises:
            ValueError: if a node already exists at :attr:`.path` but it
                is not a group
        """
        try:
            node = h5f.get_node(self.path)
        except tables.exceptions.NoSuchNodeError:
            group = h5f.create_group(self.parent, self.name,
                                     title=self.title, createparents=True,
                                     filters=self.filters)
            self._logger.debug(f"Made group {'/'.join([self.parent, self.name])}")
            if self.attrs is not None:
                # An AttributeSet is written through attribute/item
                # assignment; it does not document a dict-style ``update()``.
                for key, value in self.attrs.items():
                    group._v_attrs[key] = value
        else:
            # no exception, so something already exists at this path
            if not isinstance(node, tables.group.Group):
                raise ValueError(f'{self.path} already exists, but it isnt a group! instead its a {type(node)}')

        if self.children is not None:
            for child in self.children:
                child.make(h5f)
        h5f.flush()
class H5F_Table(H5F_Node):
    """
    Description of a pytables table and its location

    Attributes:
        description (:class:`tables.IsDescription`): Column description the
            table is created from
        expectedrows (int): Passed as ``expectedrows`` when creating the table
    """
    description: tables.description.MetaIsDescription
    expectedrows: int = 10000

    def make(self, h5f: tables.file.File):
        """
        Make this table according to its description

        * If nothing exists at :attr:`.path`, the table is created and
          :attr:`.attrs` (if any) are written to it.
        * If a table exists whose column names differ from the requested
          description, it is remade with :meth:`._remake_table`.
        * If a table exists with the same column names, it is left as is.

        The file is flushed afterwards.

        Args:
            h5f (:class:`tables.file.File`): The file to create the table in

        Raises:
            ValueError: if a node already exists at :attr:`.path` but it
                is not a table
        """
        try:
            node = h5f.get_node(self.path)
        except tables.exceptions.NoSuchNodeError:
            tab = h5f.create_table(self.parent, self.name, self.description,
                                   title=self.title, filters=self.filters,
                                   createparents=True, expectedrows=self.expectedrows)
            self._logger.debug(f"Made table {'/'.join([self.parent, self.name])}")
            if self.attrs is not None:
                # An AttributeSet is written through attribute/item
                # assignment; it does not document a dict-style ``update()``.
                for key, value in self.attrs.items():
                    tab._v_attrs[key] = value
        else:
            # Handled outside the ``try`` so that a NoSuchNodeError raised while
            # remaking is not mistaken for "the table does not exist yet".
            if not isinstance(node, tables.table.Table):
                raise ValueError(f'{self.path} already exists, but it isnt a Table! instead its a {type(node)}')

            existing_cols = node.description._v_names
            requested_cols = list(self.description.columns.keys())
            # only column *names* are compared, not their types
            if set(existing_cols) != set(requested_cols):
                self._logger.warning(f"Found existing table with columns {existing_cols}, but requested a table with {requested_cols}, remaking.")
                self._remake_table(h5f)
            else:
                self._logger.warning('Found existing table that matches the requested description, not remaking.')
        h5f.flush()

    def _remake_table(self, h5f: tables.file.File):
        """
        Remake an existing table, preserving original data. Mostly for adding new columns

        A new table is built under a temporary name (``<name>_tmp``), given as
        many rows as the old table, and every column present in both tables is
        copied across. If the old table has columns that the new description
        lacks, the old table is kept as ``<name>_bak--<n>`` (first unused ``n``),
        otherwise it is removed. Finally the new table is moved to :attr:`.name`.

        Args:
            h5f (:class:`tables.file.File`): The file containing the table
        """
        # existing table
        old_tab = h5f.get_node(self.path)

        # new table, made under a temporary name; remove anything already there
        tmp_name = f"{self.name}_tmp"
        try:
            node = h5f.get_node('/'.join([self.parent, tmp_name]))
            node.remove()
        except tables.NoSuchNodeError:
            pass

        new_tab = h5f.create_table(self.parent, tmp_name, self.description,
                                   title=self.title, filters=self.filters,
                                   createparents=True, expectedrows=self.expectedrows)

        # check which columns to read and whether we should keep the old table
        old_cols = old_tab.colnames
        new_cols = new_tab.colnames
        would_lose = list(set(old_cols) - set(new_cols))
        to_keep = list(set(old_cols).intersection(new_cols))
        remove_old = not would_lose

        backup_name = f'{self.name}_bak--0'
        if would_lose:
            # bump the numeric suffix until the backup name is unused
            while backup_name in old_tab._v_parent._v_children.keys():
                name_pieces = backup_name.split('--')
                backup_name = '--'.join([*name_pieces[:-1], str(int(name_pieces[-1]) + 1)])
            self._logger.warning(f"Updating table would delete columns {would_lose}, keeping as {backup_name}")

        # create new (default-valued) rows, one per row of the old table
        row = new_tab.row
        for _ in range(old_tab.nrows):
            row.append()
        new_tab.flush()

        # copy columns
        for add_column in to_keep:
            getattr(new_tab.cols, add_column)[:] = getattr(old_tab.cols, add_column)[:]
        new_tab.flush()

        # move or delete old table
        if remove_old:
            self._logger.debug(f'Removing table {old_tab}')
            old_tab.remove()
        else:
            old_tab.move(self.parent, backup_name)

        # move new table
        new_tab.move(self.parent, self.name)

    class Config:
        fields = {'description': {'exclude': True}}
# Mapping from (the names of) python types to the pytables column types used to
# store them. Entries are looked up by type name with ``Tables_Mapset.get``
# in :func:`model_to_description`.
Tables_Mapset = Interface_Mapset(
    bool = tables.BoolCol,
    int = tables.Int64Col,
    float = tables.Float64Col,
    # str, bytes and datetime are all stored in string columns.
    # NOTE(review): ``args=[1024]`` is presumably passed on to ``StringCol`` as
    # its item size -- confirm against ``Interface_Map``.
    str = Interface_Map(
        equals=tables.StringCol,
        args=[1024]
    ),
    bytes = Interface_Map(
        equals=tables.StringCol,
        args=[1024]
    ),
    # datetimes additionally declare a conversion to their isoformat string
    datetime = Interface_Map(
        equals=tables.StringCol,
        args=[1024],
        conversion = _datetime_conversion
    ),
    group = H5F_Group
)
class Tables_Interface(Interface):
    """
    :class:`.Interface` that uses :data:`.Tables_Mapset` as its type map
    """
    map = Tables_Mapset

    def make(self, h5f: tables.file.File) -> bool:
        """
        Placeholder: currently does nothing and returns ``None``.

        Args:
            h5f (:class:`tables.file.File`): unused
        """
def model_to_description(table: typing.Type['Table']) -> typing.Type[tables.IsDescription]:
    """
    Make a table description from the type annotations in a model

    Each field's type is resolved with :func:`.resolve_type` and the name of
    the resolved type is looked up in :data:`.Tables_Mapset` to get its column.

    Args:
        table (:class:`.modeling.base.Table`): Table description

    Returns:
        :class:`tables.IsDescription`: a new subclass, named after ``table``,
        with one column attribute per model field
    """
    def _column_for(field):
        # resolve the annotation, then look its type name up in the mapset
        resolved = resolve_type(field.type_, resolve_literal=True)
        return Tables_Mapset.get(resolved.__name__)

    columns = {
        name: _column_for(field)
        for name, field in table.__fields__.items()
    }
    return type(table.__name__, (tables.IsDescription,), columns)
"""
Mapping between dtype.kind and builtin types
see https://numpy.org/doc/stable/reference/generated/numpy.dtype.kind.html#numpy.dtype.kind
"""
def description_to_model(description: typing.Type[tables.IsDescription], cls: typing.Type['Table']) -> typing.Type['Table']:
    """
    Make a pydantic :class:`.modeling.base.Table` from a :class:`tables.IsDescription`

    Every column becomes a required field whose type is the builtin type
    looked up from the column's ``dtype.kind``, or a list of that type.

    Args:
        description (:class:`tables.IsDescription`): to convert. Objects that
            expose their columns as ``_v_colobjects`` (eg.
            :class:`tables.description.Description`) are accepted as well.
        cls (:class:`.modeling.base.Table`): Subclass of Table to make

    Returns:
        Subclass of Table (the model class itself, not an instance),
        named after ``cls``

    Raises:
        TypeError: if ``description`` has neither ``columns`` nor ``_v_colobjects``
        KeyError: if a column's ``dtype.kind`` is not in ``_NUMPY_TO_BUILTIN``
    """
    if hasattr(description, 'columns'):
        columns = description.columns
    elif hasattr(description, '_v_colobjects'):
        # compatibility with tables.description.Description types
        columns = description._v_colobjects
    else:
        raise TypeError(f"Dont know how to convert table from {description}")

    fields = {}
    for key, col in columns.items():
        builtin_type = _NUMPY_TO_BUILTIN[col.dtype.kind]
        # tables always accept Lists or singletons; ``...`` marks the field required
        fields[key] = (Union[List[builtin_type], builtin_type], ...)

    return create_model(cls.__name__, __base__=cls, **fields)