node

Classes:

Net_Node(id, upstream, port, listens[, ...])

Drop in networking object to be given to any sub-object behind some external-facing Station object.

class Net_Node(id: str, upstream: str, port: int, listens: Dict[str, Callable], instance: bool = True, upstream_ip: str = 'localhost', router_port: Optional[int] = None, daemon: bool = True, expand_on_receive: bool = True)[source]

Bases: object

Drop in networking object to be given to any sub-object behind some external-facing Station object.

To minimize the complexity of the network topology, the typical way to use

``Net_Node``s is through a Station ROUTER, rather than

addressing each other directly. Practically, this means that all messages are sent first to the parent networking.Station object, which then handles them, forwards them, etc. This proved to be horribly misguided and will be changed in v0.5.0 to support simplified messaging to a agent_id.netnode_id address. Until then the networking modules will be in a bit of flux.

To receive messages directly at this Net_Node, pass the router_port which will bind a zmq.ROUTER socket, and messages will be handled as regular ‘listens’ Note that Net_Nodes assume that they are the final recipients of messages, and so don’t handle forwarding messages (unless a listen method explicitly does so), and will automatically deserialize them on receipt.

Note

Listen methods currently receive only the value of a message, this will change in v0.5.0, where they will receive the full message like networking.Station objects.

Parameters
  • id (str) – What are we known as? What do we set our identity as?

  • upstream (str) – The identity of the ROUTER socket used by our upstream Station object.

  • port (int) – The port that our upstream ROUTER socket is bound to

  • listens (dict) – Dictionary of functions to call for different types of messages. keys match the Message.key.

  • instance (bool) – Should the node try and use the existing zmq context and tornado loop?

  • upstream_ip (str) – If this Net_Node is being used on its own (ie. not behind a Station), it can directly connect to another node at this IP. Otherwise use ‘localhost’ to connect to a station.

  • router_port (int) – Typically, Net_Nodes only have a single Dealer socket and receive messages from their encapsulating Station, but if you want to take this node offroad and use it independently, an int here binds a Router to the port.

  • daemon (bool) – Run the IOLoop thread as a daemon (default: True)

Variables
  • context (zmq.Context) – zeromq context

  • loop (tornado.ioloop.IOLoop) – a tornado ioloop

  • sock (zmq.Socket) – Our DEALER socket.

  • id (str) – What are we known as? What do we set our identity as?

  • upstream (str) – The identity of the ROUTER socket used by our upstream Station object.

  • port (int) – The port that our upstream ROUTER socket is bound to

  • listens (dict) – Dictionary of functions to call for different types of messages. keys match the Message.key.

  • outbox (dict) – Messages that have been sent but have not been confirmed

  • timers (dict) – dict of threading.Timer s that will check in on outbox messages

  • logger (logging.Logger) – Used to log messages and network events.

  • msg_counter (itertools.count) – counter to index our sent messages

  • loop_thread (threading.Thread) – Thread that holds our loop. initialized with daemon=True

Attributes:

repeat_interval

ip

Find our IP address

Methods:

init_networking()

Creates socket, connects to specified port on localhost, and starts the threaded_loop() as a daemon thread.

threaded_loop()

Run in a thread, either starts the IOLoop, or if it is already started (ie.

handle_listen(msg)

Upon receiving a message, call the appropriate listen method in a new thread and send confirmation it was received.

send([to, key, value, msg, repeat, flags, ...])

Send a message via our sock , DEALER socket.

repeat()

Periodically (according to repeat_interval) resend messages that haven't been confirmed

l_confirm(value)

Confirm that a message was received.

l_stream(msg)

Reconstitute the original stream of messages and call their handling methods

prepare_message(to, key, value, repeat[, flags])

Instantiate a Message class, give it an ID and the rest of its attributes.

get_stream(id, key[, min_size, upstream, ...])

Make a queue that another object can dump data into that sends on its own socket.

release()

repeat_interval = 5
init_networking()[source]

Creates socket, connects to specified port on localhost, and starts the threaded_loop() as a daemon thread.

threaded_loop()[source]

Run in a thread, either starts the IOLoop, or if it is already started (ie. running in another thread), breaks.

handle_listen(msg: List[bytes])[source]

Upon receiving a message, call the appropriate listen method in a new thread and send confirmation it was received.

Note

Unlike Station.handle_listen() , only the Message.value is given to listen methods. This was initially intended to simplify these methods, but this might change in the future to unify the messaging system.

Parameters

msg (list) – JSON Message.serialize() d message.

send(to: Optional[Union[str, list]] = None, key: Optional[str] = None, value: Optional[Any] = None, msg: Optional[Message] = None, repeat: bool = False, flags=None, force_to: bool = False)[source]

Send a message via our sock , DEALER socket.

to is not required.

  • If the node doesn’t have a router, (or the recipient is not in the Net_Node.senders dict ) every message is always sent to upstream . to can be included to send a message further up the network tree to a networking object we’re not directly connected to.

  • If the node has a router, since messages can only be sent on router sockets after the recipient has first sent us a message, if the to is in the senders dict, it will be directly sent via Net_Node.router

  • If the force_to arg is True, send to the to recipient directly via the dealer Net_Node.sock

  • If to is a list, or is intended to be sent as a multihop message with an explicit path, then networking objects will attempt to forward it along that path (disregarding implicit topology).

Either an already created Message should be passed as msg, or at least key must be provided for a new message created by prepare_message() .

A threading.Timer is created to resend the message using repeat() unless repeat is False.

Parameters
  • to (str, list) – The identity of the socket this message is to. If not included, sent to upstream() .

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

  • msg (.Message) – An already created message.

  • repeat (bool) – Should this message be resent if confirmation is not received?

  • flags (dict)

  • force_to (bool) – If we really really want to use the ‘to’ field to address messages (eg. node being used for direct communication), overrides default behavior of sending to upstream.

repeat()[source]

Periodically (according to repeat_interval) resend messages that haven’t been confirmed

TTL is decremented, and messages are resent until their TTL is 0.

l_confirm(value)[source]

Confirm that a message was received.

Parameters

value (str) – The ID of the message we are confirming.

l_stream(msg)[source]

Reconstitute the original stream of messages and call their handling methods

The msg should contain an inner_key that indicates the key, and thus the handling method.

Parameters

msg (dict) – Compressed stream sent by Net_Node._stream()

prepare_message(to, key, value, repeat, flags=None)[source]

Instantiate a Message class, give it an ID and the rest of its attributes.

Parameters
  • flags

  • repeat

  • to (str) – The identity of the socket this message is to

  • key (str) – The type of message - used to select which method the receiver uses to process this message.

  • value – Any information this message should contain. Can be any type, but must be JSON serializable.

get_stream(id, key, min_size=5, upstream=None, port=None, ip=None, subject=None, q_size: Optional[int] = None)[source]

Make a queue that another object can dump data into that sends on its own socket. Smarter handling of continuous data than just hitting ‘send’ a shitload of times. :returns: Place to dump ur data :rtype: Queue

property ip: str

Find our IP address

Todo

this is a copy of the Station.get_ip() method – unify this in v0.5.0

returns (str): our IPv4 address.

release()[source]