station

Classes:

Station([id, push_ip, push_port, push_id, ...])

Independent networking class used for messaging between computers.

Terminal_Station(pilots)

Station object used by Terminal objects.

Pilot_Station()

Station object used by Pilot objects.

class Station(id: Optional[str] = None, push_ip: Optional[str] = None, push_port: Optional[int] = None, push_id: Optional[str] = None, pusher: bool = False, listen_port: Optional[int] = None, listens: Optional[Dict[str, Callable]] = None)[source]

Bases: multiprocessing.context.Process

Independent networking class used for messaging between computers.

These objects send and handle networking.Message objects by using a dictionary of listens, or methods that are called to respond to different types of messages.

Each sent message is given an ID, and a thread is spawned to periodically resend it (up until some time-to-live, typically 5 times) until confirmation is received.

By default, the only listen these objects have is l_confirm(), which responds to message confirmations. Accordingly, listens should be added by using dict.update() rather than reassigning the attribute.
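The difference between updating and reassigning can be sketched with a plain dict standing in for Station.listens. The 'CONFIRM' key and the l_hello handler are assumptions made for illustration; only l_confirm() is named in this documentation.

```python
# Stand-in for Station.listens; a real Station populates this itself.
def l_confirm(msg):
    return f"confirmed {msg}"


def l_hello(msg):
    # Hypothetical custom listen, not part of the documented class.
    return f"hello {msg}"


listens = {"CONFIRM": l_confirm}  # assumed key for the default listen

# update() adds the new listen and keeps the default one in place.
listens.update({"HELLO": l_hello})
kept = sorted(listens)

# Reassigning the attribute would silently drop the default listen.
reassigned = {"HELLO": l_hello}
lost_default = "CONFIRM" not in reassigned
```

With update(), confirmations keep being handled, which the resend logic described above depends on.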

Station objects can be made with or without a pusher, a zmq.DEALER socket that connects to the zmq.ROUTER socket of an upstream Station object.

This class can be instantiated on its own if all of the required arguments are supplied, but the intended pattern of use is to subclass it with any custom listen methods for handling message types and other logic that would be specific for an agent type that uses it.

Note

This object will likely be deprecated in v0.5.0, as the gains of a separate messaging process are not as great as the complications caused by having two different kinds of networking object in the system. In the future we will move to having a single type of networking object that can either be spawned as a separate process or as a thread.

Arguments are similar to the documented attributes, so only those that differ from the attributes are documented here.

Parameters

pusher (bool) – If True, create a zmq.DEALER socket connected to push_ip, push_port, and push_id. (Default: False).

Variables
  • context (zmq.Context) – zeromq context

  • loop (tornado.ioloop.IOLoop) – a tornado ioloop

  • pusher (zmq.Socket) – pusher socket - a dealer socket that connects to other routers

  • push_ip (str) – If we have a dealer, IP to push messages to

  • push_port (str) – If we have a dealer, port to push messages to

  • push_id (str) – identity of the Router we push to

  • listener (zmq.Socket) – The main router socket to send/recv messages

  • listen_port (str) – Port our router listens on

  • logger (logging.Logger) – Used to log messages and network events.

  • id (str) – What are we known as? What do we set our identity as?

  • ip (str) – Device IP

  • listens (dict) – Dictionary of functions to call for different types of messages. keys match the Message.key.

  • senders (dict) – Identities of other sockets (keys, ie. directly connected) and their state (values) if they keep one

  • push_outbox (dict) – Messages that have been sent but have not been confirmed to our Station.pusher

  • send_outbox (dict) – Messages that have been sent but have not been confirmed to our Station.listener

  • timers (dict) – dict of threading.Timer objects that will check in on outbox messages

  • msg_counter (itertools.count) – counter to index our sent messages

  • file_block (threading.Event) – Event to signal when a file is being received.

Attributes:

repeat_interval

Methods:

run()

A zmq.Context and tornado.IOLoop are spawned, the listener and optionally the pusher are instantiated and connected to handle_listen() using on_recv().

prepare_message(to, key, value[, repeat, flags])

If a message originates with us, a Message class is instantiated, given an ID and the rest of its attributes.

send([to, key, value, msg, repeat, flags])

Send a message via our listener, a ROUTER socket.

push([to, key, value, msg, repeat, flags])

Send a message via our pusher, a DEALER socket.

repeat()

Periodically (according to repeat_interval) resend messages that haven't been confirmed

l_confirm(msg)

Confirm that a message was received.

l_stream(msg)

Reconstitute the original stream of messages and call their handling methods

l_kill(msg)

Terminal wants us to die :(

handle_listen(msg)

Upon receiving a message, call the appropriate listen method in a new thread.

get_ip()

Find our IP address

release()

repeat_interval = 5.0
loop: Optional[tornado.ioloop.IOLoop]
pusher: Union[bool, zmq.sugar.socket.Socket]
run()[source]

A zmq.Context and tornado.IOLoop are spawned, the listener and optionally the pusher are instantiated and connected to handle_listen() using on_recv().

The process is kept open by the tornado.IOLoop.

prepare_message(to, key, value, repeat=True, flags=None)[source]

If a message originates with us, a Message class is instantiated, given an ID and the rest of its attributes.

Parameters
  • flags

  • repeat

  • to (str) – The identity of the socket this message is to

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.
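A minimal sketch of the idea, using a plain dict where the real method builds a Message. The ID scheme (station id plus a counter value), the station id "pilot_1", and the NOREPEAT flag name are assumptions for illustration, not taken from this page.

```python
import itertools
import json

station_id = "pilot_1"           # hypothetical Station.id
msg_counter = itertools.count()  # mirrors the documented msg_counter attribute


def prepare_message(to, key, value, repeat=True, flags=None):
    """Build a message as a plain dict; the real method returns a Message."""
    msg = {
        "id": f"{station_id}_{next(msg_counter)}",  # assumed ID scheme
        "sender": station_id,
        "to": to,
        "key": key,
        "value": value,
        "flags": dict(flags or {}),
    }
    if not repeat:
        msg["flags"]["NOREPEAT"] = True  # hypothetical flag name
    # json.dumps raises TypeError if value is not JSON serializable.
    json.dumps(msg)
    return msg


first = prepare_message("T", "STATE", "IDLE")
second = prepare_message("T", "DATA", {"trial": 1}, repeat=False)
```

Checking serializability when the message is built surfaces a bad value at the call site rather than later, when the message is about to go on the wire.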

send(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]

Send a message via our listener, a ROUTER socket.

Either an already created Message should be passed as msg, or at least to and key must be provided for a new message created by prepare_message().

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters
  • flags

  • to (str) – The identity of the socket this message is to

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

  • msg (.Message) – An already created message.

  • repeat (bool) – Should this message be resent if confirmation is not received?

push(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]

Send a message via our pusher, a DEALER socket.

Unlike send(), to is not required. Every message is always sent to push_id. Including to sends the message further up the network tree, to a networking object we’re not directly connected to.

Either an already created Message should be passed as msg, or at least key must be provided for a new message created by prepare_message().

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters
  • flags

  • to (str) – The identity of the socket this message is to. If not included, the message is sent to push_id.

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

  • msg (.Message) – An already created message.

  • repeat (bool) – Should this message be resent if confirmation is not received?
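The argument rules for send() and push() can be summarised in a small sketch. The resolve_recipient helper, the dict-shaped message, and the default push_id of "T" are all hypothetical; the sketch only models which arguments are required, not the socket traffic.

```python
def resolve_recipient(method, to=None, key=None, msg=None, push_id="T"):
    """Return the (to, key) pair a message would carry, or raise ValueError."""
    if msg is not None:
        # An already created message is sent as it is.
        return msg["to"], msg["key"]
    if key is None:
        raise ValueError("key is required when msg is not given")
    if method == "send" and to is None:
        raise ValueError("send() needs to as well as key")
    if method == "push" and to is None:
        # push() falls back to the upstream router's identity.
        to = push_id
    return to, key


default_push = resolve_recipient("push", key="STATE")
relayed_push = resolve_recipient("push", to="other_node", key="STATE")
```

In the relayed case the message still travels through push_id first; only the final addressee differs.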

repeat()[source]

Periodically (according to repeat_interval) resend messages that haven’t been confirmed

TTL is decremented, and messages are resent until their TTL is 0.
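One pass of such a resend loop might look like the following sketch. It assumes an outbox shaped as {message id: dict with a "ttl" entry} and takes the resend action as a callback; the real method is driven by repeat_interval and works on Message objects.

```python
def repeat_once(outbox, resend):
    """Run one resend pass over an outbox of {msg_id: {"ttl": int}}."""
    for msg_id in list(outbox):  # copy the keys: entries may be deleted
        msg = outbox[msg_id]
        if msg["ttl"] <= 0:
            del outbox[msg_id]   # TTL exhausted, give up on this message
            continue
        msg["ttl"] -= 1
        resend(msg_id)


resent = []
outbox = {"a_0": {"ttl": 2}, "a_1": {"ttl": 0}}
for _ in range(3):
    repeat_once(outbox, resent.append)
```

A confirmed message would be removed from the outbox by the confirmation listen before its TTL runs out, so it never reaches the give-up branch.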

l_confirm(msg)[source]

Confirm that a message was received.

Parameters

msg (Message) – A confirmation message - note that this message has its own unique ID, so the value of this message contains the ID of the message that is being confirmed
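As an illustration of why the confirmed ID lives in the value, here is a sketch with dict-shaped messages and a hypothetical outbox argument. The IDs shown are made up.

```python
def l_confirm(msg, outbox):
    """Drop the confirmed message; msg["value"] holds the ID being confirmed."""
    confirmed_id = msg["value"]
    # pop() with a default makes a duplicate confirmation harmless.
    return outbox.pop(confirmed_id, None) is not None


outbox = {"pilot_1_7": {"key": "DATA", "value": 3}}
# The confirmation has its own ID ("T_42"), distinct from the one it confirms.
confirmation = {"id": "T_42", "key": "CONFIRM", "value": "pilot_1_7"}

first_time = l_confirm(confirmation, outbox)
second_time = l_confirm(confirmation, outbox)
```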

l_stream(msg)[source]

Reconstitute the original stream of messages and call their handling methods

The msg should contain an inner_key that indicates the key, and thus the handling method.

Parameters

msg (dict) – Compressed stream sent by Net_Node._stream()
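The unpack-and-dispatch step can be sketched as follows. The wire format used here (JSON, compressed with zlib, then base64 encoded) and the "payload" field are stand-ins chosen for the example; this page does not specify how Net_Node._stream() actually compresses its messages.

```python
import base64
import json
import zlib


def pack_stream(inner_key, payloads):
    """Hypothetical sender side: bundle several payloads under one inner_key."""
    raw = json.dumps({"inner_key": inner_key, "payload": payloads}).encode()
    return base64.b64encode(zlib.compress(raw)).decode("ascii")


def l_stream(packed, listens):
    """Unpack the bundle and call the listen for inner_key on each payload."""
    stream = json.loads(zlib.decompress(base64.b64decode(packed)))
    handler = listens[stream["inner_key"]]
    for item in stream["payload"]:
        handler(item)
    return len(stream["payload"])


seen = []
handled = l_stream(pack_stream("DATA", [{"t": 0}, {"t": 1}]), {"DATA": seen.append})
```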

l_kill(msg: autopilot.networking.message.Message)[source]

Terminal wants us to die :(

Stop the Station.loop

Parameters

msg (Message)

handle_listen(msg: List[bytes])[source]

Upon receiving a message, call the appropriate listen method in a new thread.

If the message is to us, send confirmation.

If the message is not to us, attempt to forward it.

Parameters

msg (str) – JSON message, as serialized by Message.serialize().
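The routing decision described above can be sketched as a pure function. Messages are plain dicts here, listens maps keys to handler names, and the rule that confirmations are not themselves confirmed is an assumption; decoding the multipart frames and spawning the handler thread are left out.

```python
def plan_actions(msg, my_id, listens, senders):
    """Return the actions a station might take for one decoded message."""
    if msg["to"] != my_id:
        # Not addressed to us: forward only if the recipient is known.
        if msg["to"] in senders:
            return [("forward", msg["to"])]
        return [("drop", msg["to"])]
    actions = []
    if msg["key"] != "CONFIRM":  # assumption: confirmations are not confirmed
        actions.append(("confirm", msg["id"]))
    if msg["key"] in listens:
        actions.append(("call", listens[msg["key"]]))
    return actions


listens = {"PING": "l_ping", "CONFIRM": "l_confirm"}
for_us = plan_actions({"id": "p_0", "to": "T", "key": "PING"}, "T", listens, set())
passing_by = plan_actions({"id": "p_1", "to": "pilot_2", "key": "START"}, "T", listens, {"pilot_2"})
```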

get_ip()[source]

Find our IP address

returns (str): our IPv4 address.
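One common way to discover the local IPv4 address with only the standard library is shown below. This is a general technique and not necessarily how get_ip() is implemented; the probe address and the loopback fallback are assumptions.

```python
import socket


def find_ipv4(fallback="127.0.0.1"):
    """Return the IPv4 address of the interface used for outbound traffic."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # connect() on a UDP socket sends no packets; it only selects a route.
        sock.connect(("10.255.255.255", 1))
        return sock.getsockname()[0]
    except OSError:
        # No usable route (e.g. no network): fall back to loopback.
        return fallback
    finally:
        sock.close()


ip = find_ipv4()
```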

release()[source]
class Terminal_Station(pilots)[source]

Bases: autopilot.networking.station.Station

Station object used by Terminal objects.

Spawned without a pusher.

Listens

Key            Method           Description
‘PING’         l_ping()         We are asked to confirm that we are alive
‘INIT’         l_init()         Ask all pilots to confirm that they are alive
‘CHANGE’       l_change()       Change a parameter on the Pi
‘STOPALL’      l_stopall()      Stop all pilots and plots
‘KILL’         l_kill()         Terminal wants us to die :(
‘DATA’         l_data()         Stash incoming data from a Pilot
‘STATE’        l_state()        A Pilot has changed state
‘HANDSHAKE’    l_handshake()    A Pi is telling us it’s alive and its IP
‘FILE’         l_file()         The pi needs some file from us

Parameters

pilots (dict) – The Terminal.pilots dictionary.

Attributes:

plot_timer

sent_plot

Methods:

start_plot_timer()

Start a timer that controls how often streamed video frames are sent to gui.Video plots.

l_ping(msg)

We are asked to confirm that we are alive

l_init(msg)

Ask all pilots to confirm that they are alive

l_change(msg)

Change a parameter on the Pi

l_stopall(msg)

Stop all pilots and plots

l_data(msg)

Stash incoming data from a Pilot

l_continuous(msg)

Handle the storage of continuous data

l_state(msg)

A Pilot has changed state.

l_handshake(msg)

A Pi is telling us it's alive and its IP.

l_file(msg)

A Pilot needs some file from us.

plot_timer = None
sent_plot = {}
pusher: Union[bool, zmq.sugar.socket.Socket]
loop: Optional[tornado.ioloop.IOLoop]
start_plot_timer()[source]

Start a timer that controls how often streamed video frames are sent to gui.Video plots.

l_ping(msg: autopilot.networking.message.Message)[source]

We are asked to confirm that we are alive

Respond with a blank ‘STATE’ message.

Parameters

msg (Message)

l_init(msg: autopilot.networking.message.Message)[source]

Ask all pilots to confirm that they are alive

Sends a “PING” to everyone in the pilots dictionary.

Parameters

msg (Message)

l_change(msg: autopilot.networking.message.Message)[source]

Change a parameter on the Pi

Warning

Not Implemented

Parameters

msg (Message)

l_stopall(msg: autopilot.networking.message.Message)[source]

Stop all pilots and plots

Parameters

msg (Message)

l_data(msg: autopilot.networking.message.Message)[source]

Stash incoming data from a Pilot

Just forward this along to the internal terminal object (‘_T’) and a copy to the relevant plot.

Parameters

msg (Message)

l_continuous(msg: autopilot.networking.message.Message)[source]

Handle the storage of continuous data

Forwards all data on to the Terminal’s internal Net_Node, and sends it to the Plot at the update rate given by prefs.get('DRAWFPS').

Parameters

msg (Message) – A continuous data message
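Limiting plot updates to a frame rate can be sketched with a timestamp check. The Throttle class is hypothetical, and the fps value stands in for prefs.get('DRAWFPS'); the class itself uses plot_timer rather than this approach, which is chosen here so the example is deterministic.

```python
class Throttle:
    """Allow at most `fps` events per second, judged by caller-supplied times."""

    def __init__(self, fps):
        self.min_interval = 1.0 / fps
        self.last_sent = None

    def ready(self, now):
        """Return True (and record the time) if enough time has passed."""
        if self.last_sent is None or now - self.last_sent >= self.min_interval:
            self.last_sent = now
            return True
        return False


throttle = Throttle(fps=10)  # stand-in for prefs.get('DRAWFPS')
arrival_times = [0.0, 0.03, 0.12, 0.15, 0.30]

# Every message would still be forwarded for storage; only plotting is limited.
plotted = [t for t in arrival_times if throttle.ready(t)]
```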

l_state(msg: autopilot.networking.message.Message)[source]

A Pilot has changed state.

Stash in ‘state’ field of pilot dict and send along to _T

Parameters

msg (Message)

l_handshake(msg: autopilot.networking.message.Message)[source]

A Pi is telling us it’s alive and its IP.

Send along to _T

Parameters

msg (Message)

l_file(msg: autopilot.networking.message.Message)[source]

A Pilot needs some file from us.

Send it back after encoding it with base64.b64encode().

Todo

Split large files into multiple messages…

Parameters

msg (Message) – The value field of the message should contain a relative path to a file contained within prefs.get('SOUNDDIR'), e.g. '/songs/sadone.wav' would return the file found at that path inside the directory given by prefs.get('SOUNDDIR').
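A sketch of the encoding step, under stated assumptions: sound_dir stands in for prefs.get('SOUNDDIR'), the leading slash is stripped before joining so the path stays inside that directory, and the reply carries 'path' and 'file' fields.

```python
import base64
import os
import tempfile


def encode_file(sound_dir, relative_path):
    """Read a file under sound_dir and return it base64 encoded."""
    # Strip the leading slash: os.path.join would otherwise discard sound_dir.
    full_path = os.path.join(sound_dir, relative_path.lstrip("/"))
    with open(full_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")
    return {"path": relative_path, "file": encoded}


# Demonstration with a throwaway directory in place of the real sound directory.
with tempfile.TemporaryDirectory() as sound_dir:
    os.makedirs(os.path.join(sound_dir, "songs"))
    with open(os.path.join(sound_dir, "songs", "sadone.wav"), "wb") as f:
        f.write(b"RIFF")
    reply = encode_file(sound_dir, "/songs/sadone.wav")
```

Decoding to an ASCII string keeps the value JSON serializable, which every message value must be.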

class Pilot_Station[source]

Bases: autopilot.networking.station.Station

Station object used by Pilot objects.

Spawned with a pusher connected back to the Terminal.

Listens

Key         Method        Description
‘STATE’     l_state()     Pilot has changed state
‘COHERE’    l_cohere()    Make sure our data and the Terminal’s match.
‘PING’      l_ping()      The Terminal wants to know if we’re listening
‘START’     l_start()     We are being sent a task to start
‘STOP’      l_stop()      We are being told to stop the current task
‘PARAM’     l_change()    The Terminal is changing some task parameter
‘FILE’      l_file()      We are receiving a file

Methods:

l_noop(msg)

l_state(msg)

Pilot has changed state

l_cohere(msg)

Send our local version of the data table so the terminal can double check

l_ping([msg])

The Terminal wants to know our status

l_start(msg)

We are being sent a task to start

l_stop(msg)

Tell the pi to stop the task

l_change(msg)

The terminal is changing a parameter

l_file(msg)

We are receiving a file.

l_continuous(msg)

Forwards continuous data sent by children back to terminal.

l_child(msg)

Tell one or more children to start running a task.

l_forward(msg)

Just forward the message to the pi.

pusher: Union[bool, zmq.sugar.socket.Socket]
l_noop(msg)[source]
l_state(msg: autopilot.networking.message.Message)[source]

Pilot has changed state

Stash it and alert the Terminal

Parameters

msg (Message)

l_cohere(msg: autopilot.networking.message.Message)[source]

Send our local version of the data table so the terminal can double check

Warning

Not Implemented

Parameters

msg (Message)

l_ping(msg: Optional[autopilot.networking.message.Message] = None)[source]

The Terminal wants to know our status

Push back our current state.

Parameters

msg (Message)

l_start(msg: autopilot.networking.message.Message)[source]

We are being sent a task to start

If we need any files, request them.

Then send along to the pilot.

Parameters

msg (Message) – value will contain a dictionary containing a task description.

loop: Optional[tornado.ioloop.IOLoop]
l_stop(msg: autopilot.networking.message.Message)[source]

Tell the pi to stop the task

Parameters

msg (Message)

l_change(msg: autopilot.networking.message.Message)[source]

The terminal is changing a parameter

Warning

Not implemented

Parameters

msg (Message)

l_file(msg: autopilot.networking.message.Message)[source]

We are receiving a file.

Decode from b64 and save. Set the file_block.

Parameters

msg (Message) – value will have 'path' and 'file', where the path determines where in prefs.get('SOUNDDIR') the b64 encoded 'file' will be saved.
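The receiving side can be sketched like this. Here sound_dir stands in for prefs.get('SOUNDDIR'), the save_file helper is hypothetical, and file_block is modelled as a bare threading.Event rather than the station attribute.

```python
import base64
import os
import tempfile
import threading

file_block = threading.Event()  # stand-in for the documented file_block


def save_file(sound_dir, value):
    """Decode value['file'] from base64 and write it under sound_dir."""
    full_path = os.path.join(sound_dir, value["path"].lstrip("/"))
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, "wb") as f:
        f.write(base64.b64decode(value["file"]))
    file_block.set()  # let anything waiting on the file continue
    return full_path


with tempfile.TemporaryDirectory() as sound_dir:
    value = {
        "path": "/songs/sadone.wav",
        "file": base64.b64encode(b"RIFF").decode("ascii"),
    }
    saved_path = save_file(sound_dir, value)
    with open(saved_path, "rb") as f:
        contents = f.read()
```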

l_continuous(msg: autopilot.networking.message.Message)[source]

Forwards continuous data sent by children back to terminal.

Continuous data sources from this pilot should be streamed directly to the terminal.

Parameters

msg (Message) – Continuous data message

l_child(msg: autopilot.networking.message.Message)[source]

Tell one or more children to start running a task.

By default, the key argument passed to self.send is ‘START’. However, this can be overridden by providing the desired string as msg.value[‘KEY’].

This checks the pref CHILDID to get the names of one or more children. If that pref is a string, sends the message to just that child. If that pref is a list, sends the message to each child in the list.

Parameters

msg (Message) – A message to send to the child or children.

Returns

nothing
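The handling of a string versus a list pref, together with the key override, can be sketched as below. The child_messages helper is hypothetical and returns the (to, key, value) triples that would be handed to self.send; child_pref stands in for the CHILDID pref.

```python
def child_messages(child_pref, value):
    """Return one (to, key, value) triple per child named by child_pref."""
    key = value.get("KEY", "START")  # 'START' unless the sender overrides it
    if isinstance(child_pref, str):
        children = [child_pref]      # a single child given as a string
    else:
        children = list(child_pref)  # several children given as a list
    return [(child, key, value) for child in children]


single = child_messages("child_a", {"task": "demo"})
several = child_messages(["child_a", "child_b"], {"KEY": "STOP"})
```

Checking for str first matters, because iterating over a string would otherwise address one message to each character of the child's name.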

l_forward(msg: autopilot.networking.message.Message)[source]

Just forward the message to the pi.