networking

Classes for network communication.

There are two general types of network objects -

  • Station and its children are independent processes that should only be instantiated once

    per piece of hardware. They are used to distribute messages between Net_Node s, forward messages up the networking tree, and responding to messages that don’t need any input from the Pilot or Terminal.

  • Net_Node is a pop-in networking class that can be given to any other object that

    wants to send or receive messages.

Classes:

IOLoop(*args, **kwargs)

An I/O event loop.

Message([msg, expand_arrays])

A formatted message.

Net_Node(id, upstream, port, listens[, …])

Drop in networking object to be given to any sub-object behind some external-facing Station object.

Pilot_Station()

Station object used by Pilot objects.

Station()

Independent networking class used for messaging between computers.

Terminal_Station(pilots)

Station object used by Terminal objects.

ZMQStream(socket[, io_loop])

A utility class to register callbacks when a zmq socket sends and receives

count

count(start=0, step=1) –> count object

Functions:

copy(x)

Shallow copy operation on arbitrary Python objects.

pprint(object[, stream, indent, width, …])

Pretty-print a Python object to a stream [default is sys.stdout].

serialize_array(array)

Pack an array with blosc.pack_array() and serialize with base64.b64encode()

class Station[source]

Bases: multiprocessing.context.Process

Independent networking class used for messaging between computers.

These objects send and handle networking.Message s by using a dictionary of listens, or methods that are called to respond to different types of messages.

Each sent message is given an ID, and a thread is spawned to periodically resend it (up until some time-to-live, typically 5 times) until confirmation is received.

By default, the only listen these objects have is l_confirm(), which responds to message confirmations. Accordingly, listens should be added by using dict.update() rather than reassigning the attribute.

Station objects can be made with or without a pusher, a zmq.DEALER socket that connects to the zmq.ROUTER socket of an upstream Station object.

This class should not be instantiated on its own, but should instead be subclassed in order to provide methods used by handle_listen().

Variables

Attributes:

child

ctx

id

ip

listen_port

listener

listens

log_formatter

log_handler

logger

loop

push_id

push_ip

push_outbox

push_port

pusher

repeat_interval

routes

send_outbox

senders

timers

Methods:

get_ip()

Find our IP address

handle_listen(msg)

Upon receiving a message, call the appropriate listen method in a new thread.

init_logging()

Initialize logging to a timestamped file in prefs.LOGDIR .

l_confirm(msg)

Confirm that a message was received.

l_stream(msg)

Reconstitute the original stream of messages and call their handling methods

prepare_message(to, key, value[, repeat, flags])

If a message originates with us, a Message class is instantiated, given an ID and the rest of its attributes.

push([to, key, value, msg, repeat, flags])

Send a message via our pusher , DEALER socket.

release()

repeat()

Periodically (according to repeat_interval) resend messages that haven’t been confirmed

run()

A zmq.Context and tornado.IOLoop are spawned, the listener and optionally the pusher are instantiated and connected to handle_listen() using on_recv() .

send([to, key, value, msg, repeat, flags])

Send a message via our listener , ROUTER socket.

ctx = None
loop = None
push_ip = None
push_port = None
push_id = ''
listen_port = None
pusher = None
listener = None
logger = None
log_handler = None
log_formatter = None
id = None
senders = {}
push_outbox = {}
send_outbox = {}
timers = {}
child = False
routes = {}
repeat_interval = 5.0
ip = None
listens = {}
run()[source]

A zmq.Context and tornado.IOLoop are spawned, the listener and optionally the pusher are instantiated and connected to handle_listen() using on_recv() .

The process is kept open by the tornado.IOLoop .

prepare_message(to, key, value, repeat=True, flags=None)[source]

If a message originates with us, a Message class is instantiated, given an ID and the rest of its attributes.

Parameters
  • flags

  • repeat

  • to (str) – The identity of the socket this message is to

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

send(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]

Send a message via our listener , ROUTER socket.

Either an already created Message should be passed as msg, or at least to and key must be provided for a new message created by prepare_message() .

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters
  • flags

  • to (str) – The identity of the socket this message is to

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

  • msg (.Message) – An already created message.

  • repeat (bool) – Should this message be resent if confirmation is not received?

push(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]

Send a message via our pusher , DEALER socket.

Unlike send() , to is not required. Every message is always sent to push_id . to can be included to send a message further up the network tree to a networking object we’re not directly connected to.

Either an already created Message should be passed as msg, or at least key must be provided for a new message created by prepare_message() .

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters
  • flags

  • to (str) – The identity of the socket this message is to. If not included, sent to push_id() .

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

  • msg (.Message) – An already created message.

  • repeat (bool) – Should this message be resent if confirmation is not received?

repeat()[source]

Periodically (according to repeat_interval) resend messages that haven’t been confirmed

TTL is decremented, and messages are resent until their TTL is 0.

l_confirm(msg)[source]

Confirm that a message was received.

Parameters

msg (Message) – A confirmation message - note that this message has its own unique ID, so the value of this message contains the ID of the message that is being confirmed

l_stream(msg)[source]

Reconstitute the original stream of messages and call their handling methods

The msg should contain an inner_key that indicates the key, and thus the handling method.

Parameters

msg (dict) – Compressed stream sent by Net_Node._stream()

handle_listen(msg)[source]

Upon receiving a message, call the appropriate listen method in a new thread.

If the message is to us, send confirmation.

If the message is not to us, attempt to forward it.

Parameters

msg (str) – JSON Message.serialize() d message.

init_logging()[source]

Initialize logging to a timestamped file in prefs.LOGDIR .

get_ip()[source]

Find our IP address

returns (str): our IPv4 address.

release()[source]
class Terminal_Station(pilots)[source]

Bases: autopilot.core.networking.Station

Station object used by Terminal objects.

Spawned without a pusher.

Listens

Key

Method

Description

‘PING’

l_ping()

We are asked to confirm that we are alive

‘INIT’

l_init()

Ask all pilots to confirm that they are alive

‘CHANGE’

l_change()

Change a parameter on the Pi

‘STOPALL’

l_stopall()

Stop all pilots and plots

‘KILL’

l_kill()

Terminal wants us to die :(

‘DATA’

l_data()

Stash incoming data from a Pilot

‘STATE’

l_state()

A Pilot has changed state

‘HANDSHAKE’

l_handshake()

A Pi is telling us it’s alive and its IP

‘FILE’

l_file()

The pi needs some file from us

Parameters

pilots (dict) – The Terminal.pilots dictionary.

Methods:

l_change(msg)

Change a parameter on the Pi

l_continuous(msg)

Handle the storage of continuous data

l_data(msg)

Stash incoming data from a Pilot

l_file(msg)

A Pilot needs some file from us.

l_handshake(msg)

A Pi is telling us it’s alive and its IP.

l_init(msg)

Ask all pilots to confirm that they are alive

l_kill(msg)

Terminal wants us to die :(

l_ping(msg)

We are asked to confirm that we are alive

l_state(msg)

A Pilot has changed state.

l_stopall(msg)

Stop all pilots and plots

start_plot_timer()

Start a timer that controls how often streamed video frames are sent to gui.Video plots.

Attributes:

plot_timer

sent_plot

plot_timer = None
sent_plot = {}
start_plot_timer()[source]

Start a timer that controls how often streamed video frames are sent to gui.Video plots.

l_ping(msg)[source]

We are asked to confirm that we are alive

Respond with a blank ‘STATE’ message.

Parameters

msg (Message)

l_init(msg)[source]

Ask all pilots to confirm that they are alive

Sends a “PING” to everyone in the pilots dictionary.

Parameters

msg (Message)

l_change(msg)[source]

Change a parameter on the Pi

Warning

Not Implemented

Parameters

msg (Message)

l_stopall(msg)[source]

Stop all pilots and plots

Parameters

msg (Message)

l_kill(msg)[source]

Terminal wants us to die :(

Stop the Station.loop

Parameters

msg (Message)

l_data(msg)[source]

Stash incoming data from a Pilot

Just forward this along to the internal terminal object (‘_T’) and a copy to the relevant plot.

Parameters

msg (Message)

l_continuous(msg)[source]

Handle the storage of continuous data

Forwards all data on to the Terminal’s internal Net_Node, send to Plot according to update rate in prefs.DRAWFPS

Parameters

msg (dict) – A continuous data message

l_state(msg)[source]

A Pilot has changed state.

Stash in ‘state’ field of pilot dict and send along to _T

Parameters

msg (Message)

l_handshake(msg)[source]

A Pi is telling us it’s alive and its IP.

Send along to _T

Parameters

msg (Message)

l_file(msg)[source]

A Pilot needs some file from us.

Send it back after base64.b64encode() ing it.

Todo

Split large files into multiple messages…

Parameters

msg (Message) – The value field of the message should contain some relative path to a file contained within prefs.SOUNDDIR . eg. ‘/songs/sadone.wav’ would return ‘os.path.join(prefs.SOUNDDIR/songs.sadone.wav’

class Pilot_Station[source]

Bases: autopilot.core.networking.Station

Station object used by Pilot objects.

Spawned with a pusher connected back to the Terminal .

Listens

Key

Method

Description

‘STATE’ ‘COHERE’ ‘PING’ ‘START’ ‘STOP’ ‘PARAM’ ‘FILE’

l_state() l_cohere() l_ping() l_start() l_stop() l_change() l_file()

Pilot has changed state Make sure our data and the Terminal’s match. The Terminal wants to know if we’re listening We are being sent a task to start We are being told to stop the current task The Terminal is changing some task parameter We are receiving a file

Methods:

l_change(msg)

The terminal is changing a parameter

l_child(msg)

Telling our child to run a task.

l_cohere(msg)

Send our local version of the data table so the terminal can double check

l_continuous(msg)

Forwards continuous data sent by children back to terminal.

l_file(msg)

We are receiving a file.

l_forward(msg)

Just forward the message to the pi.

l_noop(msg)

l_ping(msg)

The Terminal wants to know our status

l_start(msg)

We are being sent a task to start

l_state(msg)

Pilot has changed state

l_stop(msg)

Tell the pi to stop the task

l_noop(msg)[source]
l_state(msg)[source]

Pilot has changed state

Stash it and alert the Terminal

Parameters

msg (Message)

l_cohere(msg)[source]

Send our local version of the data table so the terminal can double check

Warning

Not Implemented

Parameters

msg (Message)

l_ping(msg)[source]

The Terminal wants to know our status

Push back our current state.

Parameters

msg (Message)

l_start(msg)[source]

We are being sent a task to start

If we need any files, request them.

Then send along to the pilot.

Parameters

msg (Message) – value will contain a dictionary containing a task description.

l_stop(msg)[source]

Tell the pi to stop the task

Parameters

msg (Message)

l_change(msg)[source]

The terminal is changing a parameter

Warning

Not implemented

Parameters

msg (Message)

l_file(msg)[source]

We are receiving a file.

Decode from b64 and save. Set the file_block.

Parameters

msg (Message) – value will have ‘path’ and ‘file’, where the path determines where in prefs.SOUNDDIR the b64 encoded ‘file’ will be saved.

l_continuous(msg)[source]

Forwards continuous data sent by children back to terminal.

Continuous data sources from this pilot should be streamed directly to the terminal.

Parameters

msg (Message) – Continuous data message

l_child(msg)[source]

Telling our child to run a task.

Parameters

msg ()

Returns:

l_forward(msg)[source]

Just forward the message to the pi.

class Net_Node(id, upstream, port, listens, instance=True, upstream_ip='localhost', daemon=True, expand_on_receive=True)[source]

Bases: object

Drop in networking object to be given to any sub-object behind some external-facing Station object.

These objects are intended to communicate locally, within a piece of hardware, though not necessarily within the same process.

To minimize the complexity of the network topology, Net_Nodes must communicate through a Station ROUTER, rather than address each other directly.

Parameters
  • id (str) – What are we known as? What do we set our identity as?

  • upstream (str) – The identity of the ROUTER socket used by our upstream Station object.

  • port (int) – The port that our upstream ROUTER socket is bound to

  • listens (dict) – Dictionary of functions to call for different types of messages. keys match the Message.key.

  • instance (bool) – Should the node try and use the existing zmq context and tornado loop?

  • upstream_ip (str) – If this Net_Node is being used on its own (ie. not behind a Station), it can directly connect to another node at this IP. Otherwise use ‘localhost’ to connect to a station.

  • route_port (int) – Typically, Net_Nodes only have a single Dealer socket and receive messages from their encapsulating Station, but if you want to take this node offroad and use it independently, an int here binds a Router to the port.

Variables

Attributes:

context

id

listens

log_formatter

log_handler

logger

loop

loop_thread

outbox

port

repeat_interval

sock

timers

upstream

Methods:

get_stream(id, key[, min_size, upstream, …])

Make a queue that another object can dump data into that sends on its own socket.

handle_listen(msg)

Upon receiving a message, call the appropriate listen method in a new thread and send confirmation it was received.

init_logging()

Initialize logging to a timestamped file in prefs.LOGDIR .

init_networking()

Creates socket, connects to specified port on localhost, and starts the threaded_loop() as a daemon thread.

l_confirm(value)

Confirm that a message was received.

l_stream(msg)

Reconstitute the original stream of messages and call their handling methods

prepare_message(to, key, value, repeat[, flags])

Instantiate a Message class, give it an ID and the rest of its attributes.

release()

repeat()

Periodically (according to repeat_interval) resend messages that haven’t been confirmed

send([to, key, value, msg, repeat, flags, …])

Send a message via our sock , DEALER socket.

threaded_loop()

Run in a thread, either starts the IOLoop, or if it is already started (ie.

outbox = {}
timers = {}
logger = None
log_handler = None
log_formatter = None
sock = None
loop_thread = None
repeat_interval = 5
context = None
loop = None
listens = {}
id = None
upstream = None
port = None
init_networking()[source]

Creates socket, connects to specified port on localhost, and starts the threaded_loop() as a daemon thread.

threaded_loop()[source]

Run in a thread, either starts the IOLoop, or if it is already started (ie. running in another thread), breaks.

handle_listen(msg)[source]

Upon receiving a message, call the appropriate listen method in a new thread and send confirmation it was received.

Note

Unlike Station.handle_listen() , only the Message.value is given to listen methods. This was initially intended to simplify these methods, but this might change in the future to unify the messaging system.

Parameters

msg (str) – JSON Message.serialize() d message.

send(to=None, key=None, value=None, msg=None, repeat=True, flags=None, force_to=False)[source]

Send a message via our sock , DEALER socket.

to is not required. Every message is always sent to upstream . to can be included to send a message further up the network tree to a networking object we’re not directly connected to.

Either an already created Message should be passed as msg, or at least key must be provided for a new message created by prepare_message() .

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters
  • to (str, list) – The identity of the socket this message is to. If not included, sent to upstream() .

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

  • msg (.Message) – An already created message.

  • repeat (bool) – Should this message be resent if confirmation is not received?

  • flags (dict)

  • force_to (bool) – If we really really want to use the ‘to’ field to address messages (eg. node being used for direct communication), overrides default behavior of sending to upstream.

repeat()[source]

Periodically (according to repeat_interval) resend messages that haven’t been confirmed

TTL is decremented, and messages are resent until their TTL is 0.

l_confirm(value)[source]

Confirm that a message was received.

Parameters

value (str) – The ID of the message we are confirming.

l_stream(msg)[source]

Reconstitute the original stream of messages and call their handling methods

The msg should contain an inner_key that indicates the key, and thus the handling method.

Parameters

msg (dict) – Compressed stream sent by Net_Node._stream()

prepare_message(to, key, value, repeat, flags=None)[source]

Instantiate a Message class, give it an ID and the rest of its attributes.

Parameters
  • flags

  • repeat

  • to (str) – The identity of the socket this message is to

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

get_stream(id, key, min_size=5, upstream=None, port=None, ip=None, subject=None)[source]

Make a queue that another object can dump data into that sends on its own socket. Smarter handling of continuous data than just hitting ‘send’ a shitload of times. :returns: Place to dump ur data :rtype: Queue

init_logging()[source]

Initialize logging to a timestamped file in prefs.LOGDIR .

The logger name will be ‘node.{id}’ .

release()[source]
class Message(msg=None, expand_arrays=False, **kwargs)[source]

Bases: object

A formatted message.

id, to, sender, and key are required attributes, but any other key-value pair passed on init is added to the message’s attributes and included in the message.

Can be indexed and set like a dictionary (message[‘key’], etc.)

Variables
  • id (str) – ID that uniquely identifies a message. format {sender.id}_{number}

  • to (str) – ID of socket this message is addressed to

  • sender (str) – ID of socket where this message originates

  • key (str) – Type of message, used to select a listen method to process it

  • value – Body of message, can be any type but must be JSON serializable.

  • timestamp (str) – Timestamp of message creation

  • ttl (int) – Time-To-Live, each message is sent this many times at max, each send decrements ttl.

Parameters
  • *args

  • **kwargs

Methods:

__contains__(key)

Parameters

key

__delitem__(key)

Parameters

key

__getitem__(key)

Parameters

key

__setitem__(key, value)

Parameters
  • key

_serialize_numpy(array)

Serialize a numpy array for sending over the wire

expand()

Don’t decompress numpy arrays by default for faster IO, explicitly expand them when needed

get_timestamp()

Get a Python timestamp

serialize()

Serializes all attributes in __dict__ using json.

validate()

Checks if id, to, sender, and key are all defined.

Attributes:

changed

flags

id

key

sender

serialized

timestamp

to

ttl

value

id = None
to = None
sender = None
key = None
value = None
changed = False
flags = {}
timestamp = None
ttl = 2
serialized = None
__getitem__(key)[source]
Parameters

key

__setitem__(key, value)[source]
Parameters
  • key

  • value

_serialize_numpy(array)[source]

Serialize a numpy array for sending over the wire

Parameters

array

Returns:

expand()[source]

Don’t decompress numpy arrays by default for faster IO, explicitly expand them when needed

Returns

__delitem__(key)[source]
Parameters

key

__contains__(key)[source]
Parameters

key

get_timestamp()[source]

Get a Python timestamp

Returns

Isoformatted timestamp from datetime

Return type

str

validate()[source]

Checks if id, to, sender, and key are all defined.

Returns

Does message have all required attributes set?

Return type

bool (True)

serialize()[source]

Serializes all attributes in __dict__ using json.

Returns

JSON serialized message.

Return type

str

serialize_array(array)[source]

Pack an array with blosc.pack_array() and serialize with base64.b64encode()

Parameters

array (numpy.ndarray) – Array to serialize

Returns

{‘NUMPY_ARRAY’: base-64 encoded, blosc-compressed array.}

Return type

dict