Source code for autopilot.transform.geometry

import typing
from time import time

import numpy as np
from scipy.spatial import distance
from scipy.spatial.transform import Rotation as R
from scipy.optimize import curve_fit

from autopilot.transform.transforms import Transform
from autopilot.transform.timeseries import Kalman


[docs]class Distance(Transform): """ Given an n_samples x n_dimensions array, compute pairwise or mean distances """ format_in = {'type': np.ndarray} format_out = {'type': np.ndarray} def __init__(self, pairwise: bool=False, n_dim: int = 2, metric: str='euclidean', squareform: bool=True, *args, **kwargs): """ Args: pairwise (bool): If False (default), return mean distance. if True, return all distances n_dim (int): number of dimensions (input array will be filtered like ``input[:,0:n_dim]`` metric (str): any metric acceptable to :func:`scipy.spatial.distance.pdist squareform (bool): if pairwise is True, if True return square distance matrix, otherwise return compressed distance matrix (dist(X[i], X[j] = y[i*j]) *args: **kwargs: """ super(Distance, self).__init__(*args, **kwargs) self.pairwise = pairwise self.n_dim = n_dim self.metric = metric self.squareform = squareform
[docs] def process(self, input: np.ndarray): # filter to input_dimension input = input[:,0:self.n_dim] output = distance.pdist(input, metric=self.metric) if self.pairwise: if self.squareform: output = distance.squareform(output) else: output = np.mean(output) return output
[docs]class Angle(Transform): """ Get angle between line formed by two points and horizontal axis """ format_in = {'type': np.ndarray} format_out = {'type': float} def __init__(self, abs=True, degrees=True, *args, **kwargs): super(Angle, self).__init__(*args, **kwargs) self.abs = abs self.degrees = degrees
[docs] def process(self, input): angle = np.arctan2(input[1][1]-input[0][1], input[1][0]-input[0][0]) if self.abs: angle += np.pi if self.degrees: angle = angle*(180/np.pi) return angle
[docs]class IMU_Orientation(Transform): """ Compute absolute orientation (roll, pitch) from accelerometer and gyroscope measurements (eg from :class:`.hardware.i2c.I2C_9DOF` ) Uses a :class:`.timeseries.Kalman` filter, and implements :cite:`patonisFusionMethodCombining2018a` to fuse the sensors Can be used with accelerometer data only, or with combined accelerometer/gyroscope data for greater accuracy Arguments: invert_gyro (bool): if the gyroscope's orientation is inverted from accelerometer measurement, multiply gyro readings by -1 before using use_kalman (bool): Whether to use kalman filtering (True, default), or return raw trigonometric transformation of accelerometer readings (if provided, gyroscope readings will be ignored) Attributes: kalman (:class:`.transform.timeseries.Kalman`): If ``use_kalman == True`` , the Kalman Filter. References: :cite:`patonisFusionMethodCombining2018a` :cite:`abyarjooImplementingSensorFusion2015` """ def __init__(self, use_kalman:bool = True, invert_gyro:bool=False, *args, **kwargs): super(IMU_Orientation, self).__init__(*args, **kwargs) self.invert_gyro = invert_gyro # type: bool self._last_update = None # type: typing.Optional[float] self._dt = 0 # type: float # preallocate orientation array for filtered values self.orientation = np.zeros((2), dtype=float) # type: np.ndarray # and for unfiltered values so they aren't ambiguous self._orientation = np.zeros((2), dtype=float) # type: np.ndarray self.kalman = None # type: typing.Optional[Kalman] if use_kalman: self.kalman = Kalman(dim_state=2, dim_measurement=2, dim_control=2) # type: typing.Optional[Kalman]
[docs] def process(self, accelgyro:typing.Union[typing.Tuple[np.ndarray, np.ndarray], np.ndarray]) -> np.ndarray: """ Args: accelgyro (tuple, :class:`numpy.ndarray`): tuple of (accelerometer[x,y,z], gyro[x,y,z]) readings as arrays, or an array of just accelerometer[x,y,z] Returns: :class:`numpy.ndarray`: filtered [roll, pitch] calculations in degrees """ # check what we were given... if isinstance(accelgyro, (tuple, list)) and len(accelgyro) == 2: # combined accelerometer and gyroscope readings accel, gyro = accelgyro elif isinstance(accelgyro, np.ndarray) and np.squeeze(accelgyro).shape[0] == 3: # just accelerometer readings accel = accelgyro gyro = None else: # idk lol self.logger.exception(f'Need input to be a tuple of accelerometer and gyroscope readings, or an array of accelerometer readings. got {accelgyro}') return # convert accelerometer readings to roll and pitch pitch = 180*np.arctan2(accel[0], np.sqrt(accel[1]**2 + accel[2]**2))/np.pi roll = 180*np.arctan2(accel[1], np.sqrt(accel[0]**2 + accel[2]**2))/np.pi if self.kalman is None: # store orientations in external attribute if not using kalman filter self.orientation[:] = (roll, pitch) return self.orientation.copy() else: # if using kalman filter, use private array to store raw orientation self._orientation[:] = (roll, pitch) # TODO: Don't assume that we're fed samples instantatneously -- ie. once data representations are stable, need to accept a timestamp here rather than making one if self._last_update is None or gyro is None: # first time through don't have dt to scale gyro by self.orientation[:] = np.squeeze(self.kalman.process(self._orientation)) self._last_update = time() else: if self.invert_gyro: gyro *= -1 # get dt for time since last update update_time = time() self._dt = update_time-self._last_update self._last_update = update_time if self._dt>1: # if it's been really long, the gyro read is pretty much useless and will give ridiculous reads self.orientation[:] = np.squeeze(self.kalman.process(self._orientation)) else: # run predict and update stages separately to incorporate gyro self.kalman.predict(u=gyro[0:2]*self._dt) self.orientation[:] = np.squeeze(self.kalman.update(self._orientation)) return self.orientation.copy()
[docs]class Rotate(Transform): """ Rotate in 3 dimensions using :class:`scipy.spatial.transform.Rotation` Args: dims ( "xyz" ): string specifying which axes the rotation will be around, eg ``"xy"`` , ``"xyz"``` rotation_type (str): Format of rotation input, must be one available to the :class:`~scipy.spatial.transform.Rotation` class (but currently only euler angles are supported) degrees (bool): whether to output rotation in degrees (True, default) or radians inverse ("xyz"): dimensions in the "rotation" input to :meth:`.Rotate.process` to inverse before applying rotation rotation (tuple, list, :class:`numpy.ndarray`, None): If supplied, use the same rotation for all processed data. If None, :meth:`.Rotate.process` will expect a tuple of (data, rotation). """ _DIMS = { 'x': 0, 'y': 1, 'z': 2 } def __init__(self, dims="xyz", rotation_type="euler", degrees=True, inverse="", rotation=None, *args, **kwargs): super(Rotate, self).__init__(*args, **kwargs) self.degrees = degrees self.rotation_type = rotation_type # parse dimensions and inverse into slices if not dims: e = ValueError('need to provide some dimensino to rotate around, got empty dims') self.logger.exception(e) raise e # store dims and something we can slice with for dims and inverse self.dims = dims self._dims = [self._DIMS[dim] for dim in dims] if not inverse: self.inverse = False self._inverse = None else: self.inverse = inverse self._inverse = [self._DIMS[dim] for dim in inverse] # stash rotation creation method depending on rotation_type if rotation_type == "euler": self._rotate_constructor = R.from_euler else: e = NotImplementedError('Only euler is implemented currently!') self.logger.exception(e) raise e # if we were provided an initial rotation, instantiate rotation here if rotation: # inverse what must be inverted if self.inverse: rotation[self._inverse] *= -1 self._rotation = rotation self._rotator = self._rotate_constructor(self.dims, self._rotation, degrees=self.degrees) else: self._rotation = None self._rotator = None
[docs] def process(self, input): """ Args: input (tuple, :class:`numpy.ndarray`): a tuple of (input[x,y,z], rotation[x,y,z]) where input is to be rotated according to the axes in rotation (indicated in :attr:`.Rotate.dims` ). If only an input array is provided, a static rotation array must have been provided in the constructor (otherwise the most recent rotation will be used) Returns: :class:`numpy.ndarray` - rotated input array """ if isinstance(input, (tuple, list)) and len(input) == 2: # split out input coords and rotation input, rotate = input # invert what must be inverted if self.inverse: rotate[self._inverse] *= -1 else: rotate = None # if given a new rotation, use it if rotate is not None and (self._rotation is None or not np.array_equal(rotate, self._rotation)): self._rotator = self._rotate_constructor(self.dims, rotate, degrees=self.degrees) self._rotation = rotate # apply itttt and return try: return self._rotator.apply(input) except AttributeError: if self._rotator is None: e = RuntimeError('No rotation was provided, and none is available!') self.logger.exception(e) raise e
[docs]class Spheroid(Transform): """ Fit and transform 3d coordinates according to some spheroid. Eg. for calibrating accelerometer readings by transforming them from their uncalibrated spheroid to the expected sphere with radius == 9.8m/s/s centered at (0,0,0). Does not estimate/correct for rotation of the spheroid. Examples: ###### # Calibrate an accelerometer by transforming readings to a 9.8-radius sphere centered at 0 >>> sphere = Spheroid(target=(9.8,9.8,9.8,0,0,0)) # take some readings... # imagine we're taking them from some sensor idk # say our sensor slightly exaggerates gravity in the z-axis... >>> readings = np.array((0.,0.,10.5)) # fit our object (need >>1 sample) >>> sphere.fit(readings) # transform to proper gravity >>> sphere.process(readings) [0., 0., 9.8] Args: target (tuple): parameterization of spheroid to transform to, if none is passed, transform to unit circle centered at (0,0,0). parameterized as:: (a, # radius of x dimension b, # radius of y dimension c, # radius of z dimension x, # x-offset y, # y-offset z) # z-offset source (tuple): parameterization of spheroid to transform from in the same 6-tuple form as ``target``, if None is passed, assume we will use :meth:`.Spheroid.fit` fit (None, :class:`numpy.ndarray`): Initialize with values to fit, if None assume fit will be called later. References: * https://jekel.me/2020/Least-Squares-Ellipsoid-Fit/ * http://www.juddzone.com/ALGORITHMS/least_squares_3D_ellipsoid.html """ def __init__(self, target=(1,1,1,0,0,0), source:tuple=(None, None, None, None, None, None), fit:typing.Optional[np.ndarray]=None, *args, **kwargs): super(Spheroid, self).__init__(*args, **kwargs) self.target = target self.source = source self._scale = None self._offset_source = None self._offset_target = None self._update_arrays() if fit is not None: self.fit(fit, **kwargs) def _update_arrays(self): if not any([val is None for val in self.source]): self._scale = np.array((self.target[0]/self.source[0], self.target[1]/self.source[1], self.target[2]/self.source[2])) self._offset_source = np.array((self.source[3], self.source[4], self.source[5])) self._offset_target = np.array((self.target[3], self.target[4], self.target[5]))
[docs] def fit(self, points, **kwargs): """ Fit a spheroid from a set of noisy measurements updates the :attr:`._scale` and :attr:`._offset` private arrays used to manipulate input data .. note:: It's usually important to pass ``bounds`` to :func:`scipy.optimize.curve_fit` !!! passed as a 2-tuple of ``((min_a, min_b, ...), (max_a, max_b...))`` In particular such that a, b, and c are positive. If no bounds are passed, assume at least that much. Args: points (:class:`numpy.ndarray`): (M, 3) array of points to fit **kwargs (): passed on to :func:`scipy.optimize.curve_fit` Returns: tuple: parameters of fit ellipsoid (a,b,c,x,y,z) """ if 'bounds' in kwargs.keys(): bounds = kwargs.pop('bounds') else: bounds = ((0, 0, 0, -np.inf, -np.inf, -np.inf), (np.inf, np.inf, np.inf, np.inf, np.inf, np.inf)) y = np.ones((points.shape[0])) parameters, _ = curve_fit(_ellipsoid_func, points, y, bounds=bounds, **kwargs) self.source = parameters self._update_arrays()
[docs] def process(self, input:np.ndarray): """ Transform input (x,y,z) points such that points in :attr:`.source` are mapped to those in :attr:`.target` Args: input (:class:`numpy.ndarray`): x, y, and z coordinates Returns: :class:`numpy.ndarray` : coordinates transformed according to the spheroid requested """ if self._scale is None or self._offset_target is None or self._offset_source is None: self.logger.exception('process called without fit being performed or source ellipsoid provided! returning untransformed points!') return input # move to the center, then scale, then offset. return ((input - self._offset_source) * self._scale) + self._offset_target
[docs] def generate(self, n:int, which:str='source', noise:float=0): """ Generate random points from the ellipsoid Args: n (int): number of points to generate which ('str'): which spheroid to generate from? ('source' - default, or 'target') noise (float): noise to add to points Returns: :class:`numpy.ndarray` : (n, 3) array of generated points """ if which == "source": if not any([val is None for val in self.source]): a,b,c,x,y,z = self.source else: self.logger.exception('Cannot generate from source, dont have ellipsoid parameterization') return elif which == "target": a, b, c, x, y, z = self.target else: self.logger.exception(f"Dont know how to generate points for which == {which}") return u = np.random.rand(n) v = np.random.rand(n) theta = u * 2.0 * np.pi phi = np.arccos(2.0 * v - 1.0) sinTheta, cosTheta = np.sin(theta), np.cos(theta) sinPhi, cosPhi = np.sin(phi), np.cos(phi) rx = (a * sinPhi * cosTheta) + x + (np.random.rand(n) * noise) ry = (b * sinPhi * sinTheta) + y + (np.random.rand(n) * noise) rz = (c * cosPhi) + z + (np.random.rand(n) * noise) return np.column_stack((rx, ry, rz))
[docs]def _ellipsoid_func(fit, a, b, c, x, y, z): """ Ellipsoid equation for use with :meth:`.Ellipsoid.fit` Args: fit (:class:`numpy.ndarray`): (M, 3) array of x,y,z points to fit a (float): X-scale parameter to fit b (float): Y-scale parameter to fit c (float): Z-scale parameter to fit x (float): X-offset parameter to fit y (float): Y-offset parameter to fit z (float): Z-offset parameter to fit Returns: float: result of ellipsoid function, minimize parameters to == 1 """ x_fit, y_fit, z_fit = fit[:,0], fit[:,1], fit[:,2] return ((x_fit - x)**2 / a**2) + ((y_fit - y)**2 / b**2) + ((z_fit - z)**2 / c**2)