Source code for autopilot.stim.sound.base

"""
Base classes for sound objects, depending on the selected audio backend. Use the ``'AUDIOSERVER'`` pref to select, or else
use the :func:`.default_sound_class` function.
"""
import typing
import sys
from warnings import warn
from queue import Empty
from abc import abstractmethod
import threading
from time import sleep
from itertools import cycle

import numpy as np

from autopilot import prefs
from autopilot import dehydrate
from autopilot.utils.loggers import init_logger
from autopilot.utils.requires import Requirements, Python_Package
from autopilot.stim.stim import Stim
from autopilot.utils.decorators import Introspect

Backends = {
    'pyo': Python_Package('pyo'),
    'jack': Requirements([
        Python_Package(
            'jack',
            package_name='JACK-Client') # TODO: Add jack system
        # System_Library(
        #   'jackd2',
        #   bin_name = 'jackd',
        #   install_script = autopilot.setup.SCRIPTS['jackd_source'],
        #   ...
        # )
    ]),
    'dummy': True,
    'docs': True
}



[docs]class Sound(Stim): """ Dummy metaclass for sound base-classes. Allows Sounds to be used without a backend to, eg. synthesize waveforms and the like. Placeholder pending a full refactoring of class structure """ PARAMS = [] type = None server_type = 'dummy' @Introspect() def __init__(self, fs: int = None, duration: float = None, **kwargs ): if fs is None: raise ValueError('Dummy classes must be passed an explicit fs (sampling rate) argument, as it cannot be inferred from the server (since there is none)') self.fs = fs self.duration = duration self.logger = init_logger(self) self.table = None # type: typing.Optional[np.ndarray] self.initialized = False
[docs] def get_nsamples(self): """ given our fs and duration, how many samples do we need? literally:: np.ceil((self.duration/1000.)*self.fs).astype(int) """ self.nsamples = np.ceil((self.duration / 1000.) * self.fs).astype(int)
## Import the required modules if Backends['jack'].met or 'pytest' in sys.modules: # This will warn if the jack library is not found from autopilot.stim.sound import jackclient if Backends['pyo'].met: import pyo
[docs]class Pyo_Sound(Stim): """ Metaclass for pyo sound objects. Note: Use of pyo is generally discouraged due to dropout issues and the general opacity of the module. As such this object is intentionally left undocumented. """ @Introspect() def __init__(self): self.PARAMS = None # list of strings of parameters to be defined self.type = None # string human readable name of sound self.duration = None # duration in ms self.amplitude = None self.table = None self.trigger = None self.server_type = 'pyo'
[docs] def play(self): self.table.out()
[docs] def table_wrap(self, audio, duration=None): """Records a PyoAudio generator into a sound table, returns a tableread object which can play the audio with .out() Args: audio: duration: """ if not duration: duration = self.duration # Duration is in ms, so divide by 1000 # See https://groups.google.com/forum/#!topic/pyo-discuss/N-pan7wPF-o # TODO: Get chnls to be responsive to NCHANNELS in prefs. hardcoded for now tab = pyo.NewTable(length=(float(duration) / 1000), chnls=prefs.get('NCHANNELS')) # Prefs should always be declared in the global namespace tabrec = pyo.TableRec(audio, table=tab, fadetime=0.005).play() sleep((float(duration) / 1000)) self.table = pyo.TableRead(tab, freq=tab.getRate(), loop=0)
[docs] def set_trigger(self, trig_fn): """ Args: trig_fn: """ # Using table triggers, call trig_fn when table finishes playing self.trigger = pyo.TrigFunc(self.table['trig'], trig_fn)
[docs]class Jack_Sound(Stim): """ Base class for sounds that use the :class:`~.jackclient.JackClient` audio server. Attributes: PARAMS (list): List of strings of parameters that need to be defined for this sound type (str): Human readable name of sound type duration (float): Duration of sound in ms amplitude (float): Amplitude of sound as proportion of 1 (eg 0.5 is half amplitude) table (:class:`numpy.ndarray`): A Numpy array of samples chunks (list): :attr:`~.Jack_Sound.table` split up into chunks of :data:`~.jackclient.BLOCKSIZE` trigger (callable): A function that is called when the sound completes nsamples (int): Number of samples in the sound padded (bool): Whether the sound had to be padded with zeros when split into chunks (ie. sound duration was not a multiple of BLOCKSIZE). fs (int): sampling rate of client from :data:`.jackclient.FS` blocksize (int): blocksize of client from :data:`.jackclient.BLOCKSIZE` server (:class:`~.jackclient.Jack_Client`): Current Jack Client q (:class:`multiprocessing.Queue`): Audio Buffer queue from :data:`.jackclient.QUEUE` q_lock (:class:`multiprocessing.Lock`): Audio Buffer lock from :data:`.jackclient.Q_LOCK` play_evt (:class:`multiprocessing.Event`): play event from :data:`.jackclient.PLAY` stop_evt (:class:`multiprocessing.Event`): stop event from :data:`.jackclient.STOP` buffered (bool): has this sound been dumped into the :attr:`~.Jack_Sound.q` ? buffered_continuous (bool): Has the sound been dumped into the :attr:`~.Jack_Sound.continuous_q`? """ PARAMS = [] """ list: list of strings of parameters to be defined """ type = None """ str: string human readable name of sound """ server_type = 'jack' """ str: type of server, always 'jack' for `Jack_Sound` s. """ @Introspect() def __init__(self, jack_client: typing.Optional['autopilot.stim.sound.jackclient.JackClient'] = None, **kwargs): """Initialize a new Jack_Sound This sets sound-specific parameters to None, set jack-specific parameters to their equivalents in jackclient, initializes some other flags and a logger. """ # These sound-specific parameters will be set by the derived # objects. self.duration = None # duration in ms self.amplitude = None self.table = None # numpy array of samples self.chunks = None # table split into a list of chunks self.trigger = None self.nsamples = None self.padded = False # whether or not the sound was padded with zeros when chunked self.continuous = False # Initialize a logger self.logger = init_logger(self) if jack_client is not None: self.logger.debug('Getting jack_client objects from passed jackclient') self.server = jack_client self.continuous_flag = self.server.continuous for attr in ('fs', 'blocksize', 'q', 'q_lock', 'play_evt', 'stop_evt', 'continuous_q', 'continuous_loop'): setattr(self, attr, getattr(jack_client, attr)) else: # These jack-specific parameters are copied from jackclient self.fs = jackclient.FS self.blocksize = jackclient.BLOCKSIZE self.server = jackclient.SERVER self.q = jackclient.QUEUE self.q_lock = jackclient.Q_LOCK self.play_evt = jackclient.PLAY self.stop_evt = jackclient.STOP self.continuous_flag = jackclient.CONTINUOUS self.continuous_q = jackclient.CONTINUOUS_QUEUE self.continuous_loop = jackclient.CONTINUOUS_LOOP # Initalize these flags self.initialized = False self.buffered = False self.buffered_continuous = False
[docs] @abstractmethod def init_sound(self): """ Abstract method to initialize sound. Should set the :attr:`.table` attribute .. todo:: ideally should standardize by returning an array, but pyo objects don't return arrays necessarily... """
[docs] def chunk(self, pad=True): """ Split our `table` up into a list of :attr:`.Jack_Sound.blocksize` chunks. Args: pad (bool): If the sound is not evenly divisible into chunks, pad with zeros (True, default), otherwise jackclient will pad with its continuous sound """ # Convert the table to float32 (if it isn't already) sound = self.table.astype(np.float32) # Determine how much longer it would have to be, if it were # padded to a length that is a multiple of self.blocksize oldlen = len(sound) newlen = int( np.ceil(float(oldlen) / self.blocksize) * self.blocksize) ## Only pad if necessary AND requested if pad and newlen > oldlen: # Pad differently depending on mono or stereo if sound.ndim == 1: # Pad with 1d array of zeros to_concat = np.zeros((newlen - oldlen,), np.float32) # Pad sound = np.concatenate([sound, to_concat]) elif sound.ndim == 2: # Each column is a channel n_channels = sound.shape[1] # Raise error if more than two-channel sound # This would actually be fine for this function, but this # almost surely indicates somebody has transposed something if n_channels > 2: raise ValueError("only 1- or 2-channel sound supported") # Pad with 2d array of zeros to_concat = np.zeros( (newlen - oldlen, sound.shape[1]), np.float32) # Pad sound = np.concatenate([sound, to_concat]) else: raise ValueError("sound must be 1d or 2d") # Flag as padded self.padded = True else: # Flag as not padded self.padded = False ## Reshape into chunks, each of length `self.blocksize` if sound.ndim == 1: sound_list = [sound[i:i + self.blocksize] for i in range(0, sound.shape[0], self.blocksize)] elif sound.ndim == 2: sound_list = [sound[i:i + self.blocksize, :] for i in range(0, sound.shape[0], self.blocksize)] else: raise NotImplementedError(f"sounds with more than 2 dimensions arent supported, got ndim {sound.ndim}") self.chunks = sound_list
[docs] def set_trigger(self, trig_fn): """ Set a trigger function to be called when the :attr:`~.Jack_Sound.stop_evt` is set. Args: trig_fn (callable): Some callable """ if callable(trig_fn): self.trigger = trig_fn else: Exception('trigger must be callable')
[docs] def wait_trigger(self): """ Wait for the stop_evt trigger to be set for at least a second after the sound should have ended. Call the trigger when the event is set. """ # wait for our duration plus a second at most. self.stop_evt.wait((self.duration + 1000) / 1000.) # if the sound actually stopped... if self.stop_evt.is_set(): self.trigger()
[docs] def get_nsamples(self): """ given our fs and duration, how many samples do we need? literally:: np.ceil((self.duration/1000.)*self.fs).astype(int) """ self.nsamples = np.ceil((self.duration / 1000.) * self.fs).astype(int)
[docs] def quantize_duration(self, ceiling=True): """ Extend or shorten a sound so that it is a multiple of :data:`.jackclient.BLOCKSIZE` Args: ceiling (bool): If true, extend duration, otherwise decrease duration. """ # get remainder of samples self.get_nsamples() remainder = self.nsamples % self.blocksize if remainder == 0: return # get target number of samples # get target n blocks and multiply by blocksize if ceiling: target_samples = np.ceil(float(self.nsamples) / self.blocksize) * self.blocksize else: target_samples = np.floor(float(self.nsamples) / self.blocksize) * self.blocksize # get new duration self.duration = (target_samples / self.fs) * 1000. # refresh nsamples self.get_nsamples()
[docs] def buffer(self): """ Dump chunks into the sound queue. After the last chunk, a `None` is put into the queue. This tells the jack server that the sound is over and that it should clear the play flag. """ if hasattr(self, 'path'): self.logger.debug('BUFFERING SOUND {}'.format(self.path)) if not self.initialized and not self.table: try: self.init_sound() self.initialized = True except: pass # TODO: Log this, better error handling here if not self.chunks: self.chunk() with self.q_lock: # empty queue # FIXME: Testing whether this is where we get held up on the 'fail after sound play' bug # n_gets = 0 # while not self.q.empty(): # try: # _ = self.q.get_nowait() # except Empty: # # normal, get until it's empty # break # n_gets += 1 # if n_gets > 100000: # break # for frame in self.chunks: self.q.put_nowait([*self.chunks, None]) # The jack server looks for a None object to clear the play flag # self.q.put_nowait(None) self.buffered = True
def _init_continuous(self): """ Create a duration quantized table for playing continuously """ self.table = None self.initialized = False if not self.initialized and not self.table: self.quantize_duration() self.init_sound() self.initialized = True if not self.chunks: self.chunk() # continous sounds should not have any padding - see docstring if self.padded: raise Exception( "Continuous sounds cannot have padded chunks - sounds need to have n_samples % blocksize == 0")
[docs] def buffer_continuous(self): """ Dump chunks into the continuous sound queue for looping. Continuous shoulds should always have full frames - ie. the number of samples in a sound should be a multiple of :data:`.jackclient.BLOCKSIZE`. This method will call :meth:`.quantize_duration` to force duration such that the sound has full frames. An exception will be raised if the sound has been padded. """ # FIXME: Initialized should be more flexible, # for now just deleting whatever init happened because # continuous sounds are in development # self._init_continuous() # Give a dehydrated version of ourself while not self.continuous_q.empty(): try: _ = self.continuous_q.get_nowait() except Empty: # normal, get until it's empty break # put all the chunks into the queue, rather than one at a time # to avoid partial receipt self.continuous_q.put(dehydrate(self)) self.buffered_continuous = True
[docs] def play(self): """ Play ourselves. If we're not buffered, be buffered. Otherwise, set the play event and clear the stop event. If we have a trigger, set a Thread to wait on it. """ if not self.buffered: self.buffer() if hasattr(self, 'path'): self.logger.debug('PLAYING SOUND {}'.format(self.path)) self.play_evt.set() self.stop_evt.clear() self.buffered = False if callable(self.trigger): threading.Thread(target=self.wait_trigger).start() self.logger.debug('played!')
[docs] def play_continuous(self, loop=True): """ Play the sound continuously. Sound will be paused if another sound has its 'play' method called. Currently - only looping is implemented: the full sound is loaded by the jack client and repeated indefinitely. In the future, sound generation methods will be refactored as python generators so sounds can be continuously generated and played. Args: loop (bool): whether the sound will be stored by the jack client and looped (True), or whether the sound will be continuously streamed (False, not implemented) Returns: todo:: merge into single play method that changes behavior if continuous or not """ if not loop: raise NotImplementedError('Continuous, unlooped streaming has not been implemented yet!') if not self.buffered_continuous: self.buffer_continuous() if loop: self.continuous_loop.set() else: self.continuous_loop.clear() # tell the sound server that it has a continuous sound now self.continuous_flag.set() self.continuous = True # after the sound server start playing, it will clear the queue, unbuffering us self.buffered_continuous = False
[docs] def iter_continuous(self) -> typing.Generator: """ Continuously yield frames of audio. If this method is not overridden, just wraps :attr:`.table` in a :class:`itertools.cycle` object and returns from it. Returns: np.ndarray: A single frame of audio """ self._init_continuous() iterator = cycle(self.chunks) yield from iterator
[docs] def stop_continuous(self): """ Stop playing a continuous sound Should be merged into a general stop method """ if not self.continuous: self.logger.warning("stop_continuous called but not a continuous sound!") return self.logger.debug('stop_continuous sound called') self.continuous_flag.clear() self.continuous_loop.clear()
[docs] def end(self): """ Release any resources held by this sound """ self.logger.debug('end_sound called') if self.play_evt.is_set(): self.play_evt.clear() if not self.stop_evt.is_set(): self.stop_evt.set() if self.continuous: while not self.continuous_q.empty(): try: _ = self.continuous_q.get_nowait() except Empty: # normal, get until it's empty break self.buffered_continuous = False self.continuous_flag.clear() self.table = None self.initialized = False
[docs]def get_sound_class(server_type: typing.Optional[str] = None) -> typing.Union[typing.Type[Sound],typing.Type[Jack_Sound],typing.Type[Pyo_Sound]]: """ Get the default sound class as defined by ``'AUDIOSERVER'`` This function is also a convenience class for testing whether a particular audio backend is available Returns: """ ## First, switch the behavior based on the pref AUDIOSERVER # Get the pref if server_type is None: server_type = prefs.get('AUDIOSERVER') # True is a synonym for 'jack', the default server if server_type == True: server_type = 'jack' elif server_type is None: server_type = 'dummy' # Force lower-case if string try: server_type = server_type.lower() except AttributeError: pass # From now on, server_type should be 'jack', 'pyo', 'docs', or None if server_type not in Backends.keys(): warn(f'Requested server type {server_type}, but it doesnt exist. Using dummy') server_type = 'dummy' # if we're testing, set server_type to jack if 'pytest' in sys.modules: server_type = 'jack' return Jack_Sound # check if requirements are met, if so, return the object. Otherwise return dummy. if server_type == 'jack' and Backends['jack'].met: return Jack_Sound elif server_type == 'pyo' and Backends['pyo'].met: return Pyo_Sound elif server_type == 'dummy': return Sound else: warn(f'requested server_type {server_type} but its requirements arent met. Using dummy') return Sound