# Source code for autopilot.networking.message

import base64
import datetime
import json

import blosc

class Message(object):
    """
    A formatted message that takes ``value``, sends it to ``to``, who should
    call the listen method indicated by the ``key``.

    Additional message behavior can be indicated by passing ``flags``.

    Numpy arrays given in the value field are automatically serialized when
    sending (and optionally deserialized when receiving, see
    ``expand_arrays``) using base64 encoding and blosc compression.

    `id`, `to`, `sender`, and `key` are required attributes, but any other
    key-value pair passed on init is added to the message's attributes and
    included in the message.

    Can be indexed and set like a dictionary (``message['key']``, etc.)

    Attributes:
        id (str): ID that uniquely identifies a message.
            format {}_{number}
        to (str): ID of socket this message is addressed to
        sender (str): ID of socket where this message originates
        key (str): Type of message, used to select a listen method to
            process it
        value: Body of message, can be any type but must be JSON
            serializable.
        timestamp (str): Timestamp of message creation
        ttl (int): Time-To-Live, each message is sent this many times at
            max, each send decrements ttl.
        flags (dict): Flags determine additional message behavior. If a flag
            has no value associated with it, add it as a key with ``None``
            as the value (eg. ``self.flags['MINPRINT'] = None``), the value
            doesn't matter.

            * ``MINPRINT`` - don't print the value in logs (eg. when a large
              array is being sent)
            * ``NOREPEAT`` - sender will not seek, and recipients will not
              attempt to send message receipt confirmations
            * ``NOLOG`` - don't log this message! for streaming, or other
              instances where the constant printing of the logger is
              performance prohibitive
        changed (bool): Whether the cached ``serialized`` form is stale.
        serialized: Cached serialized form of the message, or ``None``.
    """

    def __init__(self, msg=None, expand_arrays = False, **kwargs):
        """
        Messages don't need to have all attributes on creation,
        but do need them to serialize.

        Args:
            msg: An already-serialized (JSON) message to recreate. Its
                fields take precedence over same-named ``kwargs``.
            expand_arrays (bool): If True, decode ``{"NUMPY_ARRAY": ...}``
                objects found in ``msg`` back into arrays while parsing.
            **kwargs: Every key-value pair is set as an attribute of the
                message.
        """
        self.id = None  # number of message, format {}_{number}
        self.to = None
        self.sender = None
        self.key = None
        # value is the only attribute that can be left None,
        # ie. with signal-type messages like "STOP"
        self.value = None
        self.changed = False
        self.serialized = None

        # optional attrs should be instance attributes so they are caught
        # by __dict__ (which is what gets serialized)
        self.flags = {}
        self.timestamp = None
        self.ttl = kwargs.get('ttl', 2)

        if msg:
            # keep the wire form around so it can be re-sent without
            # re-encoding
            self.serialized = msg
            if expand_arrays:
                deserialized = json.loads(
                    msg, object_pairs_hook=self._deserialize_numpy)
            else:
                deserialized = json.loads(msg)
            kwargs.update(deserialized)

        for k, v in kwargs.items():
            setattr(self, k, v)

        # if we're not a previous message being recreated,
        # get a timestamp for our creation
        if 'timestamp' not in kwargs:
            self.get_timestamp()

    def __str__(self):
        # type: () -> str
        # TODO: Make verbose/debugging mode, print value in that case.
        if self.key == 'FILE' or ('MINPRINT' in self.flags):
            # the value is omitted for file transfers and MINPRINT messages
            me_string = "ID: {}; TO: {}; SENDER: {}; KEY: {}, FLAGS: {}".format(
                self.id, self.to, self.sender, self.key, self.flags)
        else:
            me_string = "ID: {}; TO: {}; SENDER: {}; KEY: {}; FLAGS: {}; VALUE: {}".format(
                self.id, self.to, self.sender, self.key, self.flags,
                self.value)
        return me_string

    # enable dictionary-like behavior

    def __getitem__(self, key):
        """
        Return the attribute named ``key``.

        Args:
            key: Attribute name.

        Raises:
            KeyError: if the message has no such attribute.
        """
        # TODO: Recursively walk looking for 'NUMPY_ARRAY' and expand
        # before giving
        return self.__dict__[key]

    def __setitem__(self, key, value):
        """
        Set the attribute named ``key`` to ``value``.

        Setting anything other than the cache bookkeeping attributes marks
        the cached serialization as stale, mirroring ``__delitem__``.

        Args:
            key: Attribute name.
            value: New value.
        """
        self.__dict__[key] = value
        if key not in ('changed', 'serialized'):
            self.__dict__['changed'] = True

    def _serialize_numpy(self, array):
        """
        Serialize a numpy array for sending over the wire.

        Passed to ``json.dumps`` as ``default``, so it is called for every
        object json cannot encode natively.

        Args:
            array: Array to compress with ``blosc.pack_array``.

        Returns:
            dict: ``{'NUMPY_ARRAY': <base64 ascii string>}``
        """
        compressed = base64.b64encode(blosc.pack_array(array)).decode('ascii')
        return {'NUMPY_ARRAY': compressed}

    def _deserialize_numpy(self, obj_pairs):
        """
        ``object_pairs_hook`` for ``json.loads`` that reverses
        ``_serialize_numpy``.

        Args:
            obj_pairs (list): ``(key, value)`` pairs of one JSON object.

        Returns:
            The unpacked array if the object is exactly
            ``{"NUMPY_ARRAY": ...}``, otherwise a plain dict of the pairs.
        """
        if (len(obj_pairs) == 1) and obj_pairs[0][0] == "NUMPY_ARRAY":
            return blosc.unpack_array(base64.b64decode(obj_pairs[0][1]))
        else:
            return dict(obj_pairs)

    def expand(self):
        """
        Don't decompress numpy arrays by default for faster IO, explicitly
        expand them when needed.

        NOTE: currently a placeholder that does nothing.
        """
        pass

    def __delitem__(self, key):
        """
        Delete the attribute named ``key`` and mark the cached
        serialization as stale.

        Args:
            key: Attribute name.

        Raises:
            KeyError: if the message has no such attribute.
        """
        self.changed = True
        del self.__dict__[key]

    def __contains__(self, key):
        """
        Args:
            key: Attribute name.

        Returns:
            bool: whether the message has an attribute named ``key``.
        """
        return key in self.__dict__

    def __len__(self):
        """Number of attributes currently set on the message."""
        return len(self.__dict__)

    def get_timestamp(self):
        """
        Stamp the message with the current time.

        Returns:
            str: Isoformatted timestamp from ``datetime``, also stored in
            ``self.timestamp``.
        """
        self.timestamp = datetime.datetime.now().isoformat()
        return self.timestamp

    def validate(self):
        """
        Checks if `id`, `to`, `sender`, and `key` are all defined.

        Returns:
            bool (True): Does message have all required attributes set?
        """
        required = (self.id, self.to, self.sender, self.key)
        return all(prop is not None for prop in required)

    def serialize(self):
        """
        Serializes all attributes in `__dict__` using json.

        The result is cached in ``self.serialized`` and reused until the
        message is marked as changed.

        Returns:
            The cached serialization if it is still current; otherwise the
            utf-8 encoded JSON message (bytes); or ``False`` if the message
            is missing a required attribute or could not be encoded.
        """
        if not self.changed and self.serialized:
            return self.serialized

        if not self.validate():
            return False

        # Work on a filtered copy: deleting from ``self.__dict__`` itself
        # would remove the instance attribute, and a failed encode would
        # then leave the message without ``serialized`` at all.
        # 'serialized' is excluded so the payload doesn't embed itself.
        payload = {k: v for k, v in self.__dict__.items()
                   if k != 'serialized'}

        try:
            msg_enc = json.dumps(
                payload, default=self._serialize_numpy).encode('utf-8')
        except Exception:
            # best effort: callers test for False rather than catching
            return False

        self.serialized = msg_enc
        self.changed = False
        return msg_enc