# Source code for autopilot.transform.transforms
"""
Data transformations.
Experimental module.
Reusable transformations from one representation of data to another.
eg. converting frames of a video to locations of objects,
or locations of objects to area labels
.. todo::
This is a preliminary module and is purely synchronous at the moment. It will be expanded to ...
* support multiple asynchronous processing rhythms
* support automatic value coercion
The following design features need to be added
* recursion checks -- make sure a child hasn't already been added to a processing chain.
"""
import types
import typing
from enum import Enum, auto
from autopilot.core.loggers import init_logger
class TransformRhythm(Enum):
    """
    Rhythms by which a :class:`Transform` can process its inputs.

    Attributes:
        FIFO: First-in-first-out, process inputs as they are received,
            potentially slowing down the transformation pipeline
        FILO: First-in-last-out, process the most recent input,
            ignoring previous (lossy transformation)
    """
    # Values are assigned by ``auto()`` in declaration order, so keep the
    # order of the members stable.
    FIFO = auto()
    FILO = auto()
class Transform(object):
    """
    Base class for data transformations.

    Each subclass should define the following

    * :meth:`.process` - a method that takes the input of the transformation
      as its single argument and returns the transformed output
    * :attr:`.format_in` - a `dict` that specifies the input format
    * :attr:`.format_out` - a `dict` that specifies the output format

    Transforms are chained into a pipeline with ``+`` (see :meth:`.__add__`).

    Arguments:
        rhythm (:class:`TransformRhythm`): A rhythm by which the
            transformation object processes its inputs

    Attributes:
        child (:class:`Transform`): Another Transform object chained after
            this one (stored as ``_child``)
    """

    def __init__(self, rhythm: TransformRhythm = TransformRhythm.FILO, *args, **kwargs):
        self._child = None
        self._check = None
        self._rhythm = None
        self._process = None
        self._format_in = None
        self._parent = None
        self._coerce = None

        # goes through the validating property setter below
        self.rhythm = rhythm

        self.logger = init_logger(self)

        # self._wrap_process()

    @property
    def rhythm(self) -> TransformRhythm:
        """
        The rhythm this transform was configured with.

        Returns:
            :class:`TransformRhythm`
        """
        return self._rhythm

    @rhythm.setter
    def rhythm(self, rhythm: TransformRhythm):
        # ``x in EnumClass`` is version dependent for non-members (TypeError
        # on some Python versions, value lookup on others), so check the
        # type explicitly to always get the ValueError below.
        if not isinstance(rhythm, TransformRhythm):
            raise ValueError(f'rhythm must be one of TransformRhythm, got {rhythm}')
        self._rhythm = rhythm

    @property
    def format_in(self) -> dict:
        """
        Specification of the input format; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('Every subclass of Transform must define format_in!')

    @format_in.setter
    def format_in(self, format_in: dict):
        raise NotImplementedError('Every subclass of Transform must define format_in!')

    @property
    def format_out(self) -> dict:
        """
        Specification of the output format; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('Every subclass of Transform must define format_out!')

    @format_out.setter
    def format_out(self, format_out: dict):
        raise NotImplementedError('Every subclass of Transform must define format_out!')

    @property
    def parent(self) -> typing.Union['Transform', None]:
        """
        If this Transform is in a chain of transforms, the transform that precedes it

        Returns:
            :class:`.Transform`, ``None`` if no parent.
        """
        return self._parent

    @parent.setter
    def parent(self, parent):
        if not isinstance(parent, Transform):
            raise TypeError('parents must be subclasses of Transform')
        self._parent = parent

    def process(self, input):
        """
        Transform ``input``; must be overridden by subclasses.

        Once a child has been chained with ``+``, this attribute is replaced
        on the instance by a wrapper that also feeds the result to the child
        (see :meth:`.__add__`).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('Every subclass of Transform must define its own process method!')

    def reset(self):
        """
        If a transformation is stateful, reset state.

        The base implementation has no state to reset: it only emits a
        warning and returns, so calling it on a transform that does not
        override it does not abort the caller.
        """
        # local import keeps this block self-contained
        import warnings
        warnings.warn(
            'reset method not explicitly overridden in transformation, doing nothing!',
            stacklevel=2
        )

    def check_compatible(self, child: 'Transform'):
        """
        Check that this Transformation's :attr:`.format_out` is compatible with another's :attr:`.format_in`

        The child's ``format_in['type']`` may be a list/tuple of accepted
        types, the string ``'any'``, or a single type that must equal this
        transform's ``format_out['type']``. If the child's ``format_in`` has
        a truthy ``'parent'`` entry, this transform must additionally be an
        instance of it.

        .. todo::

            Check for types that can be automatically coerced into one another and set :attr:`_coercion` to appropriate function

        Args:
            child (:class:`Transform`): Transformation to check compatibility

        Returns:
            bool
        """
        accepted = child.format_in['type']
        if isinstance(accepted, (list, tuple)):
            compatible = self.format_out['type'] in accepted
        elif accepted == 'any':
            # 'any' never consults our own format_out
            compatible = True
        else:
            compatible = self.format_out['type'] == accepted

        # if child has a specific requirement of parent transform class, ensure
        parent_req = child.format_in.get('parent', False)
        if parent_req and not isinstance(self, parent_req):
            compatible = False

        return compatible

    def __add__(self, other):
        """
        Add another Transformation in the chain to make a processing pipeline

        If this transform has no child yet, ``other`` becomes its child and
        :meth:`.process` is replaced on this instance by a wrapper that passes
        its output to the child's ``process``. Otherwise ``other`` is handed
        down to the existing child, so it ends up at the tail of the chain.

        Args:
            other (:class:`Transform`): The transformation to be chained

        Returns:
            :class:`Transform`: ``self``, so additions can be written as
            ``a + b + c``.

        Raises:
            RuntimeError: if ``other`` is not a :class:`Transform`.
            ValueError: if ``other`` is this transform or is already part of
                the chain below it (that would make ``process`` recurse
                forever).
        """
        if not isinstance(other, Transform):
            raise RuntimeError('Can only add subclasses of Transform to other Transforms!')

        # Every link performs this check as ``other`` is handed down the
        # chain, so an object already chained below us is rejected too.
        if other is self:
            raise ValueError('Cannot add a Transform to a chain it is already part of!')

        if self._child is None:
            # if we haven't been chained at all yet, claim the child
            # NOTE: the format compatibility check is currently disabled
            # if not self.check_compatible(other):
            #     raise ValueError(f'Incompatible transformation formats: \nOutput: {self.format_out},\nInput: {other.format_in}')
            self._child = other
            self._child.parent = self

            # override our process method with one that calls recursively
            # back it up first
            self._process = self.process

            def new_process(self, input):
                return self._child.process(self._process(input))

            self.process = types.MethodType(new_process, self)
        else:
            # we already have a child,
            # add it to our child instead (potentially recursively)
            self._child = self._child + other

        return self