station
Inheritance diagram: multiprocessing.process.BaseProcess -> multiprocessing.context.Process -> Station -> Pilot_Station, Terminal_Station
Classes:
Station – Independent networking class used for messaging between computers.
Terminal_Station – Station object used by Terminal objects.
Pilot_Station – Station object used by Pilot objects.
- class Station(id: str | None = None, push_ip: str | None = None, push_port: int | None = None, push_id: str | None = None, pusher: bool = False, listen_port: int | None = None, listens: Dict[str, Callable] | None = None)[source]
Bases: Process
Independent networking class used for messaging between computers.
These objects send and handle networking.Messages by using a dictionary of listens, or methods that are called to respond to different types of messages.
Each sent message is given an ID, and a thread is spawned to periodically resend it (up until some time-to-live, typically 5 times) until confirmation is received.
By default, the only listen these objects have is l_confirm(), which responds to message confirmations. Accordingly, listens should be added by using dict.update() rather than reassigning the attribute.
Station objects can be made with or without a pusher, a zmq.DEALER socket that connects to the zmq.ROUTER socket of an upstream Station object.
This class can be instantiated on its own if all of the required arguments are supplied, but the intended pattern of use is to subclass it with any custom listen methods for handling message types and other logic that would be specific for an agent type that uses it.
Note
This object will likely be deprecated in v0.5.0, as the gains of a separate messaging process are not as great as the complications caused by having two different kinds of networking object in the system. In the future we will move to having a single type of networking object that can either be spawned as a separate process or as a thread.
Args are similar to the documented Attributes, and so only those that differ from attributes are documented here
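The listens pattern described above can be illustrated without any networking. The following is a minimal sketch, not autopilot's implementation: ToyStation, ToyPilotStation, dispatch, l_hello, and the 'HELLO' and 'CONFIRM' key strings are hypothetical. Only the idea of a key-to-method dictionary that is extended with dict.update() comes from the description.

```python
from typing import Any, Callable, Dict, Optional


class ToyStation:
    """Hypothetical stand-in for Station: only the listens dispatch is modeled."""

    def __init__(self, listens: Optional[Dict[str, Callable]] = None):
        # The base class always registers its confirmation handler.
        # 'CONFIRM' is an assumed key name used for illustration.
        self.listens: Dict[str, Callable] = {'CONFIRM': self.l_confirm}
        self.confirmed = []
        if listens:
            # update() keeps 'CONFIRM'; reassigning self.listens would drop it.
            self.listens.update(listens)

    def l_confirm(self, value: Any) -> None:
        self.confirmed.append(value)

    def dispatch(self, key: str, value: Any) -> None:
        """Call the listen method registered for this message key."""
        self.listens[key](value)


class ToyPilotStation(ToyStation):
    """Subclass that adds one custom listen (hypothetical 'HELLO' key)."""

    def __init__(self):
        super().__init__()
        self.greetings = []
        self.listens.update({'HELLO': self.l_hello})

    def l_hello(self, value: Any) -> None:
        self.greetings.append(value)


station = ToyPilotStation()
station.dispatch('HELLO', 'terminal')
station.dispatch('CONFIRM', 'msg_0')
print(sorted(station.listens))
```

Because the subclass calls update() instead of assigning a fresh dictionary, the inherited confirmation handler stays registered alongside the new one.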
- Parameters:
pusher (bool) – If True, create a zmq.DEALER socket connected to push_ip, push_port, and push_id. (Default: False).
- Variables:
context (zmq.Context) – zeromq context
loop (tornado.ioloop.IOLoop) – a tornado ioloop
pusher (zmq.Socket) – pusher socket - a dealer socket that connects to other routers
push_ip (str) – If we have a dealer, IP to push messages to
push_port (str) – If we have a dealer, port to push messages to
push_id (str) – identity of the Router we push to
listener (zmq.Socket) – The main router socket to send/recv messages
listen_port (str) – Port our router listens on
logger (logging.Logger) – Used to log messages and network events.
id (str) – What are we known as? What do we set our identity as?
ip (str) – Device IP
listens (dict) – Dictionary of functions to call for different types of messages. Keys match the Message.key.
senders (dict) – Identities of other sockets (keys, i.e. directly connected) and their state (values) if they keep one
push_outbox (dict) – Messages that have been sent but have not been confirmed to our Station.pusher
send_outbox (dict) – Messages that have been sent but have not been confirmed to our Station.listener
timers (dict) – dict of threading.Timers that will check in on outbox messages
msg_counter (itertools.count) – counter to index our sent messages
file_block (threading.Event) – Event to signal when a file is being received.
Attributes:
Methods:
run() – A zmq.Context and tornado.IOLoop are spawned, the listener and optionally the pusher are instantiated and connected to handle_listen() using on_recv().
prepare_message(to, key, value[, repeat, flags]) – If a message originates with us, a Message class is instantiated, given an ID and the rest of its attributes.
send([to, key, value, msg, repeat, flags]) – Send a message via our listener, ROUTER socket.
push([to, key, value, msg, repeat, flags]) – Send a message via our pusher, DEALER socket.
repeat() – Periodically (according to repeat_interval) resend messages that haven't been confirmed
l_confirm(msg) – Confirm that a message was received.
l_stream(msg) – Reconstitute the original stream of messages and call their handling methods
l_kill(msg) – Terminal wants us to die :(
handle_listen(msg) – Upon receiving a message, call the appropriate listen method in a new thread.
get_ip() – Find our IP address
release()
- repeat_interval = 5.0
- run()[source]
A zmq.Context and tornado.IOLoop are spawned, the listener and optionally the pusher are instantiated and connected to handle_listen() using on_recv().
The process is kept open by the tornado.IOLoop.
- prepare_message(to, key, value, repeat=True, flags=None)[source]
If a message originates with us, a Message class is instantiated, given an ID and the rest of its attributes.
- Parameters:
flags
repeat
to (str) – The identity of the socket this message is to
key (str) – The type of message - used to select which method the receiver uses to process this message.
value – Any information this message should contain. Can be any type, but must be JSON serializable.
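As a rough illustration of what preparing a message involves, the sketch below builds a message from to, key, and value and stamps it with an ID drawn from an itertools.count counter, as msg_counter suggests. ToyMessage, ToyPreparer, the sender field, and the "<id>_<counter>" ID scheme are assumptions made for this example and may differ from the real Message class.

```python
import itertools
import json
from dataclasses import dataclass, asdict
from typing import Any


@dataclass
class ToyMessage:
    """Hypothetical stand-in for Message; fields mirror the parameters above."""
    id: str
    sender: str
    to: str
    key: str
    value: Any
    repeat: bool = True

    def serialize(self) -> bytes:
        # value must be JSON serializable, as the parameter description requires.
        return json.dumps(asdict(self)).encode('utf-8')


class ToyPreparer:
    def __init__(self, station_id: str):
        self.id = station_id
        self.msg_counter = itertools.count()  # indexes messages we originate

    def prepare_message(self, to: str, key: str, value: Any,
                        repeat: bool = True) -> ToyMessage:
        # Assumed ID scheme: "<our id>_<counter>"; the real format may differ.
        msg_id = f"{self.id}_{next(self.msg_counter)}"
        return ToyMessage(id=msg_id, sender=self.id, to=to, key=key,
                          value=value, repeat=repeat)


preparer = ToyPreparer('pilot_1')
first = preparer.prepare_message('T', 'STATE', 'IDLE')
second = preparer.prepare_message('T', 'DATA', {'trial': 1})
print(first.id, second.id)
print(json.loads(second.serialize())['value'])
```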
- send(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]
Send a message via our listener, ROUTER socket.
Either an already created Message should be passed as msg, or at least to and key must be provided for a new message created by prepare_message().
A threading.Timer is created to resend the message using repeat() unless repeat is False.
- Parameters:
flags
to (str) – The identity of the socket this message is to
key (str) – The type of message - used to select which method the receiver uses to process this message.
value – Any information this message should contain. Can be any type, but must be JSON serializable.
msg (.Message) – An already created message.
repeat (bool) – Should this message be resent if confirmation is not received?
- push(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]
Send a message via our pusher, DEALER socket.
Unlike send(), to is not required. Every message is always sent to push_id. to can be included to send a message further up the network tree to a networking object we’re not directly connected to.
Either an already created Message should be passed as msg, or at least key must be provided for a new message created by prepare_message().
A threading.Timer is created to resend the message using repeat() unless repeat is False.
- Parameters:
flags
to (str) – The identity of the socket this message is to. If not included, sent to push_id.
key (str) – The type of message - used to select which method the receiver uses to process this message.
value – Any information this message should contain. Can be any type, but must be JSON serializable.
msg (.Message) – An already created message.
repeat (bool) – Should this message be resent if confirmation is not received?
- repeat()[source]
Periodically (according to repeat_interval) resend messages that haven’t been confirmed.
TTL is decremented, and messages are resent until their TTL is 0.
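The resend-until-confirmed behaviour can be sketched with the standard library alone. This is a simplified model under stated assumptions, not the Station code: ResendOutbox, transmit, and confirm are hypothetical names, a single outbox stands in for push_outbox and send_outbox, and the exact TTL bookkeeping is a guess. In normal use the threading.Timer would call repeat(); the demo uses a long interval and calls repeat() by hand so the outcome is deterministic.

```python
import threading
from typing import Any, Callable, Dict


class ResendOutbox:
    """Hypothetical model: resend unconfirmed messages until TTL runs out."""

    def __init__(self, transmit: Callable[[Any], None],
                 interval: float = 5.0, ttl: int = 5):
        self.transmit = transmit
        self.interval = interval
        self.ttl = ttl
        self.outbox: Dict[str, list] = {}            # msg_id -> [message, ttl]
        self.timers: Dict[str, threading.Timer] = {}
        self.lock = threading.Lock()

    def send(self, msg_id: str, message: Any) -> None:
        with self.lock:
            self.outbox[msg_id] = [message, self.ttl]
        self.transmit(message)
        self._schedule(msg_id)

    def _schedule(self, msg_id: str) -> None:
        timer = threading.Timer(self.interval, self.repeat, args=(msg_id,))
        timer.daemon = True
        with self.lock:
            old = self.timers.get(msg_id)
            if old is not None:
                old.cancel()                          # keep one timer per message
            self.timers[msg_id] = timer
        timer.start()

    def repeat(self, msg_id: str) -> None:
        with self.lock:
            entry = self.outbox.get(msg_id)
            if entry is None:
                return                                # already confirmed or dropped
            entry[1] -= 1                             # decrement TTL
            if entry[1] <= 0:
                del self.outbox[msg_id]               # give up on this message
                timer = self.timers.pop(msg_id, None)
                if timer is not None:
                    timer.cancel()
                return
            message = entry[0]
        self.transmit(message)
        self._schedule(msg_id)

    def confirm(self, msg_id: str) -> None:
        with self.lock:
            self.outbox.pop(msg_id, None)
            timer = self.timers.pop(msg_id, None)
        if timer is not None:
            timer.cancel()


sent = []
outbox = ResendOutbox(transmit=sent.append, interval=60.0, ttl=3)
outbox.send('m0', 'hello')   # first transmission
outbox.repeat('m0')          # TTL 3 -> 2, resent
outbox.repeat('m0')          # TTL 2 -> 1, resent
outbox.repeat('m0')          # TTL 1 -> 0, dropped without resending
outbox.send('m1', 'world')
outbox.confirm('m1')         # confirmation cancels further resends
outbox.repeat('m1')          # no-op: nothing left to resend
```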
- l_confirm(msg)[source]
Confirm that a message was received.
- Parameters:
msg (Message) – A confirmation message - note that this message has its own unique ID, so the value of this message contains the ID of the message that is being confirmed
- l_stream(msg)[source]
Reconstitute the original stream of messages and call their handling methods
The msg should contain an inner_key that indicates the key, and thus the handling method.
- Parameters:
msg (dict) – Compressed stream sent by Net_Node._stream()
- l_kill(msg: Message)[source]
Terminal wants us to die :(
Stop the Station.loop
- Parameters:
msg (Message)
- handle_listen(msg: List[bytes])[source]
Upon receiving a message, call the appropriate listen method in a new thread.
If the message is to us, send confirmation.
If the message is not to us, attempt to forward it.
- Parameters:
msg (str) – JSON Message.serialize()d message.
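The routing decision described for handle_listen() can be sketched as follows. This is an illustrative model only: ToyRouter and its confirmations and forwarded lists are hypothetical, and messages are plain dictionaries here rather than serialized zmq frames.

```python
import threading
from typing import Any, Callable, Dict, List


class ToyRouter:
    """Hypothetical stand-in showing only the routing decision."""

    def __init__(self, my_id: str, listens: Dict[str, Callable[[Any], None]]):
        self.id = my_id
        self.listens = listens
        self.confirmations: List[str] = []   # IDs of messages we acknowledged
        self.forwarded: List[dict] = []      # messages passed along unchanged
        self.threads: List[threading.Thread] = []

    def handle_listen(self, msg: dict) -> None:
        if msg['to'] != self.id:
            # Not addressed to us: attempt to forward it.
            self.forwarded.append(msg)
            return
        # Addressed to us: acknowledge receipt, then run the listen method in
        # a new thread so a slow handler does not block the receive loop.
        self.confirmations.append(msg['id'])
        handler = self.listens[msg['key']]
        thread = threading.Thread(target=handler, args=(msg['value'],))
        thread.start()
        self.threads.append(thread)


states = []
router = ToyRouter('T', {'STATE': states.append})
router.handle_listen({'id': 'p_0', 'to': 'T', 'key': 'STATE', 'value': 'RUNNING'})
router.handle_listen({'id': 'p_1', 'to': 'other', 'key': 'STATE', 'value': 'IDLE'})
for t in router.threads:
    t.join()
```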
- class Terminal_Station(pilots)[source]
Bases: Station
Station object used by Terminal objects.
Spawned without a pusher.
Listens
Key | Method | Description
‘PING’ | l_ping() | We are asked to confirm that we are alive
‘INIT’ | l_init() | Ask all pilots to confirm that they are alive
‘CHANGE’ | l_change() | Change a parameter on the Pi
‘STOPALL’ | l_stopall() | Stop all pilots and plots
‘KILL’ | l_kill() | Terminal wants us to die :(
‘DATA’ | l_data() | Stash incoming data from a Pilot
‘STATE’ | l_state() | A Pilot has changed state
‘HANDSHAKE’ | l_handshake() | A Pi is telling us it’s alive and its IP
‘FILE’ | l_file() | The pi needs some file from us
- Parameters:
pilots (dict) – The Terminal.pilots dictionary.
Attributes:
Methods:
start_plot_timer() – Start a timer that controls how often streamed video frames are sent to gui.Video plots.
l_ping(msg) – We are asked to confirm that we are alive
l_init(msg) – Ask all pilots to confirm that they are alive
l_change(msg) – Change a parameter on the Pi
l_stopall(msg) – Stop all pilots and plots
l_data(msg) – Stash incoming data from a Pilot
l_continuous(msg) – Handle the storage of continuous data
l_state(msg) – A Pilot has changed state.
l_handshake(msg) – A Pi is telling us it's alive and its IP.
l_file(msg) – A Pilot needs some file from us.
- plot_timer = None
- sent_plot = {}
- start_plot_timer()[source]
Start a timer that controls how often streamed video frames are sent to gui.Video plots.
- l_ping(msg: Message)[source]
We are asked to confirm that we are alive
Respond with a blank ‘STATE’ message.
- Parameters:
msg (Message)
- l_init(msg: Message)[source]
Ask all pilots to confirm that they are alive
Sends a “PING” to everyone in the pilots dictionary.
- Parameters:
msg (Message)
- l_change(msg: Message)[source]
Change a parameter on the Pi
Warning
Not Implemented
- Parameters:
msg (Message)
- l_data(msg: Message)[source]
Stash incoming data from a Pilot
Just forward this along to the internal terminal object (‘_T’) and a copy to the relevant plot.
- Parameters:
msg (Message)
- l_continuous(msg: Message)[source]
Handle the storage of continuous data
Forwards all data on to the Terminal’s internal Net_Node, and sends it to Plot according to the update rate in prefs.get('DRAWFPS').
- Parameters:
msg (Message) – A continuous data message
- l_state(msg: Message)[source]
A Pilot has changed state.
Stash in ‘state’ field of pilot dict and send along to _T
- Parameters:
msg (Message)
- l_handshake(msg: Message)[source]
A Pi is telling us it’s alive and its IP.
Send along to _T
- Parameters:
msg (Message)
- l_file(msg: Message)[source]
A Pilot needs some file from us.
Send it back after base64.b64encode()ing it.
Todo
Split large files into multiple messages…
- Parameters:
msg (Message) – The value field of the message should contain a path to a file, relative to prefs.get(‘SOUNDDIR’). E.g. ‘songs/sadone.wav’ would return the file at os.path.join(prefs.get(‘SOUNDDIR’), ‘songs/sadone.wav’).
- class Pilot_Station[source]
Bases: Station
Station object used by Pilot objects.
Spawned with a pusher connected back to the Terminal.
Listens
Key | Method | Description
‘STATE’ | l_state() | Pilot has changed state
‘COHERE’ | l_cohere() | Make sure our data and the Terminal’s match.
‘PING’ | l_ping() | The Terminal wants to know if we’re listening
‘START’ | l_start() | We are being sent a task to start
‘STOP’ | l_stop() | We are being told to stop the current task
‘PARAM’ | l_change() | The Terminal is changing some task parameter
‘FILE’ | l_file() | We are receiving a file
Attributes:
Methods:
l_noop(msg)
l_state(msg) – Pilot has changed state
l_cohere(msg) – Send our local version of the data table so the terminal can double check
l_ping([msg]) – The Terminal wants to know our status
l_start(msg) – We are being sent a task to start
l_stop(msg) – Tell the pi to stop the task
l_change(msg) – The terminal is changing a parameter
l_file(msg) – We are receiving a file.
l_continuous(msg) – Forwards continuous data sent by children back to terminal.
l_child(msg) – Tell one or more children to start running a task.
l_forward(msg) – Just forward the message to the pi.
- l_state(msg: Message)[source]
Pilot has changed state
Stash it and alert the Terminal
- Parameters:
msg (Message)
- l_cohere(msg: Message)[source]
Send our local version of the data table so the terminal can double check
Warning
Not Implemented
- Parameters:
msg (Message)
- l_ping(msg: Message | None = None)[source]
The Terminal wants to know our status
Push back our current state.
- Parameters:
msg (Message)
- l_start(msg: Message)[source]
We are being sent a task to start
If we need any files, request them.
Then send along to the pilot.
- Parameters:
msg (Message) – value will contain a dictionary containing a task description.
- l_change(msg: Message)[source]
The terminal is changing a parameter
Warning
Not implemented
- Parameters:
msg (Message)
- l_file(msg: Message)[source]
We are receiving a file.
Decode from b64 and save. Set the file_block.
- Parameters:
msg (Message) – value will have ‘path’ and ‘file’, where the path determines where in prefs.get(‘SOUNDDIR’) the b64 encoded ‘file’ will be saved.
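The file transfer is a base64 round trip: the sending side encodes the file bytes so they fit in a JSON message, and this method decodes them and writes them under the sound directory. The sketch below shows that round trip in isolation. encode_file and save_file are hypothetical helpers, temporary directories stand in for prefs.get('SOUNDDIR') on each machine, and the file_block event is not modeled.

```python
import base64
import json
import os
import tempfile


def encode_file(base_dir: str, rel_path: str) -> str:
    """Sender side: read a file under base_dir and build a JSON payload."""
    with open(os.path.join(base_dir, rel_path), 'rb') as f:
        # b64encode returns bytes; decode to str so it is JSON serializable.
        encoded = base64.b64encode(f.read()).decode('ascii')
    return json.dumps({'path': rel_path, 'file': encoded})


def save_file(base_dir: str, payload: str) -> str:
    """Receiver side: decode the payload and write it under base_dir."""
    value = json.loads(payload)
    full_path = os.path.join(base_dir, value['path'])
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, 'wb') as f:
        f.write(base64.b64decode(value['file']))
    return full_path


with tempfile.TemporaryDirectory() as terminal_dir, \
        tempfile.TemporaryDirectory() as pilot_dir:
    os.makedirs(os.path.join(terminal_dir, 'songs'))
    original = b'\x00\x01RIFF not really a wav'
    with open(os.path.join(terminal_dir, 'songs', 'sadone.wav'), 'wb') as f:
        f.write(original)

    payload = encode_file(terminal_dir, os.path.join('songs', 'sadone.wav'))
    saved = save_file(pilot_dir, payload)
    with open(saved, 'rb') as f:
        round_trip = f.read()

print(round_trip == original)
```

Note that the path must be relative: os.path.join discards the base directory when the second argument is an absolute path.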
- l_continuous(msg: Message)[source]
Forwards continuous data sent by children back to terminal.
Continuous data sources from this pilot should be streamed directly to the terminal.
- Parameters:
msg (Message) – Continuous data message
- l_child(msg: Message)[source]
Tell one or more children to start running a task.
By default, the key argument passed to self.send is ‘START’. However, this can be overridden by providing the desired string as msg.value[‘KEY’].
This checks the pref CHILDID to get the names of one or more children. If that pref is a string, sends the message to just that child. If that pref is a list, sends the message to each child in the list.
- Parameters:
msg (Message) – A message to send to the child or children.
- Returns:
nothing
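The string-or-list handling of CHILDID and the ‘KEY’ override can be sketched as a small function. send_to_children and fake_send are hypothetical names; child_pref stands in for the value of the CHILDID pref, and the send callable stands in for self.send.

```python
from typing import Callable, List, Tuple, Union


def send_to_children(child_pref: Union[str, List[str]], value: dict,
                     send: Callable[..., None]) -> None:
    """Send value to one child (str pref) or to each child (list pref)."""
    # The key defaults to 'START' unless the message value overrides it.
    key = value.get('KEY', 'START')
    children = [child_pref] if isinstance(child_pref, str) else list(child_pref)
    for child in children:
        send(to=child, key=key, value=value)


sent: List[Tuple[str, str]] = []


def fake_send(to, key, value):
    # Record the recipient and key instead of sending over a socket.
    sent.append((to, key))


send_to_children('child_a', {'task': 'demo'}, fake_send)
send_to_children(['child_a', 'child_b'], {'task': 'demo', 'KEY': 'STOP'}, fake_send)
print(sent)
```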