networking

Classes for network communication.

There are two general types of network objects -

  • Station and its children are independent processes that should only be instantiated once
    per piece of hardware. They are used to distribute messages between Net_Nodes, forward messages up the networking tree, and respond to messages that don’t need any input from the Pilot or Terminal.
  • Net_Node is a pop-in networking class that can be given to any other object that
    wants to send or receive messages.
class autopilot.core.networking.Station[source]

Bases: multiprocessing.process.Process

Independent networking class used for messaging between computers.

These objects send and handle networking.Message objects by using a dictionary of listens: methods that are called to respond to different types of messages.

Each sent message is given an ID, and a thread is spawned to resend it periodically until confirmation is received or its time-to-live (typically 5 sends) runs out.

By default, the only listen these objects have is l_confirm(), which responds to message confirmations. Accordingly, listens should be added by using dict.update() rather than reassigning the attribute.
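The difference between updating and reassigning the listens dictionary can be sketched with a stand-in class. This is only an illustration: BaseStation, MyStation, and l_hello are hypothetical names, the 'CONFIRM' key is an assumption, and no sockets or processes are created.

```python
class BaseStation:
    """Hypothetical stand-in for Station: only the listens bookkeeping."""

    def __init__(self):
        # The base class registers its single default listen.
        # The 'CONFIRM' key is assumed for illustration.
        self.listens = {'CONFIRM': self.l_confirm}

    def l_confirm(self, msg):
        return 'confirmed'


class MyStation(BaseStation):
    def __init__(self):
        super().__init__()
        # update() keeps the inherited entry; assigning a brand new dict
        # to self.listens would silently drop l_confirm.
        self.listens.update({'HELLO': self.l_hello})

    def l_hello(self, msg):
        return 'hello'


station = MyStation()
print(sorted(station.listens))
```

With update(), both the inherited confirmation handler and the new handler remain registered.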

Station objects can be made with or without a pusher, a zmq.DEALER socket that connects to the zmq.ROUTER socket of an upstream Station object.

This class should not be instantiated on its own, but should instead be subclassed in order to provide methods used by handle_listen().

Variables:
ctx = None
loop = None
push_ip = None
push_port = None
push_id = ''
listen_port = None
pusher = None
listener = None
logger = None
do_logging = <multiprocessing.synchronize.Event object>
log_handler = None
log_formatter = None
id = None
senders = {}
push_outbox = {}
send_outbox = {}
timers = {}
child = False
routes = {}
repeat_interval = 5.0
ip = None
listens = {}
run()[source]

A zmq.Context and tornado.IOLoop are spawned, then the listener and, optionally, the pusher are instantiated and connected to handle_listen() using on_recv().

The process is kept open by the tornado.IOLoop.

prepare_message(to, key, value, repeat=True, flags=None)[source]

If a message originates with us, a Message class is instantiated, given an ID and the rest of its attributes.

Parameters:
  • to (str) – The identity of the socket this message is to
  • key (str) – The type of message - used to select which method the receiver uses to process this message.
  • value – Any information this message should contain. Can be any type, but must be JSON serializable.
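As a rough sketch of what preparing a message involves, the function below builds a plain dictionary instead of a real Message. The counter, the field layout, and the sender_id argument are assumptions made for illustration; only the "{sender.id}_{number}" ID format comes from the Message documentation.

```python
import itertools
from datetime import datetime

# Hypothetical counter; the real class keeps its own message numbering.
_msg_counter = itertools.count()


def prepare_message(sender_id, to, key, value, flags=None):
    """Build a message as a plain dict (stand-in for the Message class)."""
    return {
        'id': '{}_{}'.format(sender_id, next(_msg_counter)),
        'to': to,
        'sender': sender_id,
        'key': key,
        'value': value,
        'flags': flags or {},
        'timestamp': datetime.now().isoformat(),
    }


msg = prepare_message('T', 'pilot_1', 'PING', None)
print(msg['id'], msg['key'])
```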
send(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]

Send a message via our listener, a ROUTER socket.

Either an already created Message should be passed as msg, or at least to and key must be provided for a new message created by prepare_message().

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters:
  • to (str) – The identity of the socket this message is to
  • key (str) – The type of message - used to select which method the receiver uses to process this message.
  • value – Any information this message should contain. Can be any type, but must be JSON serializable.
  • msg (.Message) – An already created message.
  • repeat (bool) – Should this message be resent if confirmation is not received?
push(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]

Send a message via our pusher, a DEALER socket.

Unlike send(), to is not required. Every message is always sent to push_id. to can be included to send a message further up the network tree to a networking object we’re not directly connected to.

Either an already created Message should be passed as msg, or at least key must be provided for a new message created by prepare_message().

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters:
  • to (str) – The identity of the socket this message is to. If not included, sent to push_id.
  • key (str) – The type of message - used to select which method the receiver uses to process this message.
  • value – Any information this message should contain. Can be any type, but must be JSON serializable.
  • msg (.Message) – An already created message.
  • repeat (bool) – Should this message be resent if confirmation is not received?
repeat()[source]

Periodically (according to repeat_interval) resend messages that haven’t been confirmed.

TTL is decremented, and messages are resent until their TTL is 0.
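The TTL bookkeeping can be sketched without timers or sockets. In the hypothetical helper below, each call to repeat_once stands for one tick of the repeat interval; outbox, resend, and the dict-based messages are assumptions for illustration, not the class's actual internals.

```python
def repeat_once(outbox, resend):
    """One pass of a hypothetical resend loop.

    outbox maps message ID -> message dict with a 'ttl' field.
    resend is any callable that actually transmits a message.
    """
    for msg_id in list(outbox):      # copy keys: we may delete while iterating
        msg = outbox[msg_id]
        if msg['ttl'] <= 0:
            del outbox[msg_id]       # TTL exhausted: give up on this message
            continue
        msg['ttl'] -= 1              # each resend uses up one unit of TTL
        resend(msg)


sent = []
outbox = {'T_0': {'id': 'T_0', 'ttl': 2}}
for _ in range(3):                   # three ticks of the repeat interval
    repeat_once(outbox, sent.append)
print(len(sent), outbox)
```

A confirmed message would simply be removed from outbox before the next tick, so it is never resent.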

l_confirm(msg)[source]

Confirm that a message was received.

Parameters:msg (Message) – A confirmation message - note that this message has its own unique ID, so the value of this message contains the ID of the message that is being confirmed
handle_listen(msg)[source]

Upon receiving a message, call the appropriate listen method in a new thread.

If the message is to us, send confirmation.

If the message is not to us, attempt to forward it.

Parameters:msg (str) – A JSON-serialized message, as produced by Message.serialize().
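The dispatch logic described for handle_listen() can be sketched as a plain function. This is a socket-free approximation under stated assumptions: my_id, confirm, and forward are hypothetical parameters standing in for the station's identity and its sending methods, and messages are treated as plain dictionaries.

```python
import json
import threading


def handle_listen(raw, my_id, listens, confirm, forward):
    """Dispatch one serialized message (hypothetical, socket-free sketch)."""
    msg = json.loads(raw)
    if msg['to'] != my_id:
        forward(msg)                       # not addressed to us: pass it along
        return None
    listen = listens[msg['key']]           # pick the handler by message key
    thread = threading.Thread(target=listen, args=(msg,))
    thread.start()                         # handler runs off the receive path
    confirm(msg)                           # acknowledge receipt to the sender
    return thread


received, confirmed, forwarded = [], [], []
listens = {'PING': received.append}
raw = json.dumps({'id': 'P_0', 'to': 'T', 'sender': 'P',
                  'key': 'PING', 'value': None})
t = handle_listen(raw, 'T', listens, confirmed.append, forwarded.append)
t.join()
print(received[0]['key'], confirmed[0]['id'], forwarded)
```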
init_logging()[source]

Initialize logging to a timestamped file in prefs.LOGDIR.

get_ip()[source]

Find our IP address.

Returns:Our IPv4 address.
Return type:str

set_logging(do_logging)[source]
class autopilot.core.networking.Terminal_Station(pilots)[source]

Bases: autopilot.core.networking.Station

Station object used by Terminal objects.

Spawned without a pusher.

Listens

Key Method Description
‘PING’ l_ping() We are asked to confirm that we are alive
‘INIT’ l_init() Ask all pilots to confirm that they are alive
‘CHANGE’ l_change() Change a parameter on the Pi
‘STOPALL’ l_stopall() Stop all pilots and plots
‘KILL’ l_kill() Terminal wants us to die :(
‘DATA’ l_data() Stash incoming data from a Pilot
‘STATE’ l_state() A Pilot has changed state
‘HANDSHAKE’ l_handshake() A Pi is telling us it’s alive and its IP
‘FILE’ l_file() The pi needs some file from us
Parameters:pilots (dict) – The Terminal.pilots dictionary.
l_ping(msg)[source]

We are asked to confirm that we are alive

Respond with a blank ‘STATE’ message.

Parameters:msg (Message) –
l_init(msg)[source]

Ask all pilots to confirm that they are alive

Sends a “PING” to everyone in the pilots dictionary.

Parameters:msg (Message) –
l_change(msg)[source]

Change a parameter on the Pi

Warning

Not Implemented

Parameters:msg (Message) –
l_stopall(msg)[source]

Stop all pilots and plots

Parameters:msg (Message) –
l_kill(msg)[source]

Terminal wants us to die :(

Stop the Station.loop

Parameters:msg (Message) –
l_data(msg)[source]

Stash incoming data from a Pilot

Just forward this along to the internal terminal object (‘_T’) and send a copy to the relevant plot.

Parameters:msg (Message) –
l_state(msg)[source]

A Pilot has changed state.

Stash in ‘state’ field of pilot dict and send along to _T

Parameters:msg (Message) –
l_handshake(msg)[source]

A Pi is telling us it’s alive and its IP.

Send along to _T

Parameters:msg (Message) –
l_file(msg)[source]

A Pilot needs some file from us.

Send it back after encoding it with base64.b64encode().

Todo

Split large files into multiple messages…

Parameters:msg (Message) – The value field of the message should contain a relative path to a file within prefs.SOUNDDIR. E.g. ‘/songs/sadone.wav’ would return the file at os.path.join(prefs.SOUNDDIR, ‘songs/sadone.wav’).
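A minimal sketch of the encoding step, assuming the reply carries ‘path’ and ‘file’ fields as described for Pilot_Station.l_file(). encode_file is a hypothetical helper, and a temporary directory stands in for prefs.SOUNDDIR.

```python
import base64
import os
import tempfile


def encode_file(base_dir, rel_path):
    """Read base_dir/rel_path and return a JSON-safe message value."""
    # lstrip('/') matters: os.path.join discards base_dir when the
    # second argument is an absolute path.
    full_path = os.path.join(base_dir, rel_path.lstrip('/'))
    with open(full_path, 'rb') as f:
        # b64encode returns bytes; decode to str so json can serialize it.
        encoded = base64.b64encode(f.read()).decode('ascii')
    return {'path': rel_path, 'file': encoded}


# Usage with a throwaway directory standing in for prefs.SOUNDDIR
sound_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(sound_dir, 'songs'))
with open(os.path.join(sound_dir, 'songs', 'sadone.wav'), 'wb') as f:
    f.write(b'RIFF')
value = encode_file(sound_dir, '/songs/sadone.wav')
print(value['path'])
```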
class autopilot.core.networking.Pilot_Station[source]

Bases: autopilot.core.networking.Station

Station object used by Pilot objects.

Spawned with a pusher connected back to the Terminal.

Listens

Key Method Description
‘STATE’ l_state() Pilot has changed state
‘COHERE’ l_cohere() Make sure our data and the Terminal’s match.
‘PING’ l_ping() The Terminal wants to know if we’re listening
‘START’ l_start() We are being sent a task to start
‘STOP’ l_stop() We are being told to stop the current task
‘PARAM’ l_change() The Terminal is changing some task parameter
‘FILE’ l_file() We are receiving a file
l_noop(msg)[source]
l_state(msg)[source]

Pilot has changed state

Stash it and alert the Terminal

Parameters:msg (Message) –
l_cohere(msg)[source]

Send our local version of the data table so the terminal can double check

Warning

Not Implemented

Parameters:msg (Message) –
l_ping(msg)[source]

The Terminal wants to know our status

Push back our current state.

Parameters:msg (Message) –
l_start(msg)[source]

We are being sent a task to start

If we need any files, request them.

Then send along to the pilot.

Parameters:msg (Message) – value will contain a dictionary containing a task description.
l_stop(msg)[source]

Tell the pi to stop the task

Parameters:msg (Message) –
l_change(msg)[source]

The terminal is changing a parameter

Warning

Not implemented

Parameters:msg (Message) –
l_file(msg)[source]

We are receiving a file.

Decode from b64 and save. Set the file_block.

Parameters:msg (Message) – value will have ‘path’ and ‘file’, where the path determines where in prefs.SOUNDDIR the b64 encoded ‘file’ will be saved.
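The receiving side can be sketched as follows. save_file is a hypothetical helper, a temporary directory stands in for prefs.SOUNDDIR, and the file_block signalling mentioned above is not modeled.

```python
import base64
import os
import tempfile


def save_file(base_dir, value):
    """Decode value['file'] from base64 and write it under base_dir."""
    full_path = os.path.join(base_dir, value['path'].lstrip('/'))
    # Create intermediate directories (e.g. 'songs/') if they are missing.
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, 'wb') as f:
        f.write(base64.b64decode(value['file']))
    return full_path


sound_dir = tempfile.mkdtemp()   # stands in for prefs.SOUNDDIR
value = {'path': '/songs/sadone.wav',
         'file': base64.b64encode(b'RIFF').decode('ascii')}
saved = save_file(sound_dir, value)
print(os.path.basename(saved))
```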
l_continuous(msg)[source]
l_child(msg)[source]

Telling our child to run a task.

Parameters:msg (Message) –

l_forward(msg)[source]

Just forward the message to the pi.

class autopilot.core.networking.Net_Node(id, upstream, port, listens, instance=True, do_logging=True)[source]

Bases: object

Drop in networking object to be given to any sub-object behind some external-facing Station object.

These objects are intended to communicate locally, within a piece of hardware, though not necessarily within the same process.

To minimize the complexity of the network topology, Net_Nodes must communicate through a Station ROUTER, rather than address each other directly.

Parameters:
  • id (str) – What are we known as? What do we set our identity as?
  • upstream (str) – The identity of the ROUTER socket used by our upstream Station object.
  • port (int) – The port that our upstream ROUTER socket is bound to
  • listens (dict) – Dictionary of functions to call for different types of messages. keys match the Message.key.
  • instance (bool) – Should the node try and use the existing zmq context and tornado loop?
outbox = {}
timers = {}
logger = None
do_logging = <threading._Event object>
log_handler = None
log_formatter = None
sock = None
loop_thread = None
repeat_interval = 5
context = None
loop = None
listens = {}
id = None
upstream = None
port = None
init_networking()[source]

Creates socket, connects to specified port on localhost, and starts the threaded_loop() as a daemon thread.

threaded_loop()[source]

Runs in a thread. Starts the IOLoop, or returns immediately if the loop is already running (i.e. in another thread).

handle_listen(msg)[source]

Upon receiving a message, call the appropriate listen method in a new thread and send confirmation it was received.

Note

Unlike Station.handle_listen(), only the Message.value is given to listen methods. This was initially intended to simplify these methods, but this might change in the future to unify the messaging system.

Parameters:msg (str) – A JSON-serialized message, as produced by Message.serialize().
send(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]

Send a message via our sock, a DEALER socket.

to is not required. Every message is always sent to upstream. to can be included to send a message further up the network tree to a networking object we’re not directly connected to.

Either an already created Message should be passed as msg, or at least key must be provided for a new message created by prepare_message().

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters:
  • to (str, list) – The identity of the socket this message is to. If not included, sent to upstream.
  • key (str) – The type of message - used to select which method the receiver uses to process this message.
  • value – Any information this message should contain. Can be any type, but must be JSON serializable.
  • msg (.Message) – An already created message.
  • repeat (bool) – Should this message be resent if confirmation is not received?
repeat()[source]

Periodically (according to repeat_interval) resend messages that haven’t been confirmed.

TTL is decremented, and messages are resent until their TTL is 0.

l_confirm(value)[source]

Confirm that a message was received.

Parameters:value (str) – The ID of the message we are confirming.
prepare_message(to, key, value, repeat, flags=None)[source]

Instantiate a Message class, give it an ID and the rest of its attributes.

Parameters:
  • to (str) – The identity of the socket this message is to
  • key (str) – The type of message - used to select which method the receiver uses to process this message.
  • value – Any information this message should contain. Can be any type, but must be JSON serializable.
init_logging()[source]

Initialize logging to a timestamped file in prefs.LOGDIR.

The logger name will be ‘node.{id}’.
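A sketch of this kind of setup using the standard logging module. Only the ‘node.{id}’ logger name comes from the description above; the file name pattern, timestamp format, and log format string are assumptions, and a temporary directory stands in for prefs.LOGDIR.

```python
import logging
import os
import tempfile
from datetime import datetime


def init_logging(node_id, log_dir):
    """Return a logger named 'node.{id}' writing to a timestamped file."""
    timestamp = datetime.now().strftime('%y%m%d-%H%M%S')   # assumed format
    # Assumed file name pattern, for illustration only.
    log_file = os.path.join(
        log_dir, 'NetNode_{}_{}.log'.format(node_id, timestamp))
    logger = logging.getLogger('node.{}'.format(node_id))
    handler = logging.FileHandler(log_file)
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s : %(message)s'))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger, log_file


log_dir = tempfile.mkdtemp()     # stands in for prefs.LOGDIR
logger, log_file = init_logging('plot_1', log_dir)
logger.info('networking started')
print(logger.name)
```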

class autopilot.core.networking.Message(*args, **kwargs)[source]

Bases: object

A formatted message.

id, to, sender, and key are required attributes, but any other key-value pair passed on init is added to the message’s attributes and included in the message.

Can be indexed and set like a dictionary (message[‘key’], etc.)

Variables:
  • id (str) – ID that uniquely identifies a message. format {sender.id}_{number}
  • to (str) – ID of socket this message is addressed to
  • sender (str) – ID of socket where this message originates
  • key (str) – Type of message, used to select a listen method to process it
  • value – Body of message, can be any type but must be JSON serializable.
  • timestamp (str) – Timestamp of message creation
  • ttl (int) – Time-To-Live, each message is sent this many times at max, each send decrements ttl.
Parameters:
  • *args
  • **kwargs
id = None
to = None
sender = None
key = None
value = None
flags = {}
timestamp = None
ttl = 2
__getitem__(key)[source]
Parameters:key
__setitem__(key, value)[source]
Parameters:
  • key
  • value
__delitem__(key)[source]
Parameters:key
__contains__(key)[source]
Parameters:key
get_timestamp()[source]
validate()[source]

Checks if id, to, sender, and key are all defined.

Returns:Does message have all required attributes set?
Return type:bool (True)
serialize()[source]

Serializes all attributes in __dict__ using json.

Returns:JSON serialized message.
Return type:str
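The behaviour documented for Message (attributes set from keyword arguments, dictionary-style access, validation of required fields, JSON serialization) can be approximated with a reduced stand-in. SimpleMessage is a hypothetical class written for illustration and is not the actual implementation.

```python
import json


class SimpleMessage:
    """Hypothetical, reduced stand-in for Message."""
    REQUIRED = ('id', 'to', 'sender', 'key')

    def __init__(self, **kwargs):
        # Every keyword argument becomes an attribute of the message.
        for k, v in kwargs.items():
            setattr(self, k, v)

    def __getitem__(self, key):
        return self.__dict__[key]

    def __setitem__(self, key, value):
        self.__dict__[key] = value

    def __contains__(self, key):
        return key in self.__dict__

    def validate(self):
        # True only if every required attribute has been set.
        return all(self.__dict__.get(k) is not None for k in self.REQUIRED)

    def serialize(self):
        # Instance attributes only, so every value must be JSON serializable.
        return json.dumps(self.__dict__)


msg = SimpleMessage(id='T_0', to='pilot_1', sender='T', key='PING', value=None)
msg['ttl'] = 2
wire = msg.serialize()
print(msg.validate(), 'ttl' in msg)
```

The serialized string is what a receiving handle_listen() would parse back into a message.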