node
Classes:
Net_Node – Drop in networking object to be given to any sub-object behind some external-facing Station object.
- class Net_Node(id: str, upstream: str, port: int, listens: Dict[str, Callable], instance: bool = True, upstream_ip: str = 'localhost', router_port: int | None = None, daemon: bool = True, expand_on_receive: bool = True)[source]
Bases: object
Drop-in networking object to be given to any sub-object behind some external-facing Station object.
To minimize the complexity of the network topology, the typical way to use Net_Nodes is to route messages through their parent Station rather than addressing each other directly. Practically, this means that all messages are sent first to the parent networking.Station object, which then handles them, forwards them, etc. This proved to be horribly misguided and will be changed in v0.5.0 to support simplified messaging to an agent_id.netnode_id address. Until then the networking modules will be in a bit of flux.
To receive messages directly at this Net_Node, pass the router_port argument, which will bind a zmq.ROUTER socket, and messages will be handled as regular ‘listens’. Note that Net_Nodes assume that they are the final recipients of messages, and so don’t handle forwarding messages (unless a listen method explicitly does so), and will automatically deserialize them on receipt.
Note
Listen methods currently receive only the value of a message. This will change in v0.5.0, where they will receive the full message like networking.Station objects.
- Parameters:
id (str) – What are we known as? What do we set our identity as?
upstream (str) – The identity of the ROUTER socket used by our upstream Station object.
port (int) – The port that our upstream ROUTER socket is bound to
listens (dict) – Dictionary of functions to call for different types of messages. Keys match the Message.key.
instance (bool) – Should the node try and use the existing zmq context and tornado loop?
upstream_ip (str) – If this Net_Node is being used on its own (i.e. not behind a Station), it can directly connect to another node at this IP. Otherwise use ‘localhost’ to connect to a station.
router_port (int) – Typically, Net_Nodes only have a single Dealer socket and receive messages from their encapsulating Station, but if you want to take this node offroad and use it independently, an int here binds a Router to the port.
daemon (bool) – Run the IOLoop thread as a daemon (default: True)
- Variables:
context (zmq.Context) – zeromq context
loop (tornado.ioloop.IOLoop) – a tornado ioloop
sock (zmq.Socket) – Our DEALER socket.
id (str) – What are we known as? What do we set our identity as?
upstream (str) – The identity of the ROUTER socket used by our upstream Station object.
port (int) – The port that our upstream ROUTER socket is bound to
listens (dict) – Dictionary of functions to call for different types of messages. Keys match the Message.key.
outbox (dict) – Messages that have been sent but have not been confirmed
timers (dict) – dict of threading.Timers that will check in on outbox messages
logger (logging.Logger) – Used to log messages and network events.
msg_counter (itertools.count) – counter to index our sent messages
loop_thread (threading.Thread) – Thread that holds our loop. Initialized with daemon=True
Attributes:
ip – Find our IP address
Methods:
init_networking() – Creates socket, connects to specified port on localhost, and starts the threaded_loop() as a daemon thread.
threaded_loop() – Run in a thread, either starts the IOLoop, or if it is already started (i.e. running in another thread), breaks.
handle_listen(msg) – Upon receiving a message, call the appropriate listen method in a new thread and send confirmation it was received.
send([to, key, value, msg, repeat, flags, ...]) – Send a message via our sock, DEALER socket.
repeat() – Periodically (according to repeat_interval) resend messages that haven't been confirmed
l_confirm(value) – Confirm that a message was received.
l_stream(msg) – Reconstitute the original stream of messages and call their handling methods
prepare_message(to, key, value, repeat[, ...]) – Instantiate a Message class, give it an ID and the rest of its attributes.
get_stream(id, key[, min_size, upstream, ...]) – Make a queue that another object can dump data into that sends on its own socket.
release()
- repeat_interval = 5
- init_networking()[source]
Creates socket, connects to specified port on localhost, and starts the threaded_loop() as a daemon thread.
- threaded_loop()[source]
Run in a thread, either starts the IOLoop, or if it is already started (ie. running in another thread), breaks.
- handle_listen(msg: List[bytes])[source]
Upon receiving a message, call the appropriate listen method in a new thread and send confirmation it was received.
Note
Unlike Station.handle_listen(), only the Message.value is given to listen methods. This was initially intended to simplify these methods, but this might change in the future to unify the messaging system.
- Parameters:
msg (list) – JSON Message.serialize()d message.
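The dispatch step described for handle_listen() can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the function name dispatch, the confirm callback, and the dict fields id, key and value are assumptions made for the example, and deserialization of the incoming frames is left out.

```python
import threading
from typing import Any, Callable, Dict, Optional


def dispatch(message: Dict[str, Any],
             listens: Dict[str, Callable[[Any], None]],
             confirm: Callable[[str], None]) -> Optional[threading.Thread]:
    """Call the listen method registered for message['key'] in a new thread.

    Only message['value'] is handed to the listen method, as in the note above.
    The message layout (id/key/value) is an assumption for this sketch.
    """
    listen = listens.get(message["key"])
    if listen is None:
        return None  # no listen method registered for this key
    thread = threading.Thread(target=listen, args=(message["value"],), daemon=True)
    thread.start()
    confirm(message["id"])  # tell the sender the message arrived
    return thread


received = []   # values seen by the listen method
confirmed = []  # message IDs that were acknowledged
listens = {"STATE": received.append}

t = dispatch({"id": "node_0", "key": "STATE", "value": "running"},
             listens, confirmed.append)
t.join()
```

Running the listen method in its own thread keeps a slow handler from blocking receipt of the next message.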
- send(to: str | list | None = None, key: str | None = None, value: Any | None = None, msg: Message | None = None, repeat: bool = False, flags=None, force_to: bool = False, blosc: bool = False)[source]
Send a message via our sock, DEALER socket.
to is not required.
If the node doesn’t have a router (or the recipient is not in the Net_Node.senders dict), every message is always sent to upstream. to can be included to send a message further up the network tree to a networking object we’re not directly connected to.
If the node has a router and to is in the senders dict, the message is sent directly via Net_Node.router. Messages can only be sent on router sockets after the recipient has first sent us a message, which is why the recipient must already be in senders.
If the force_to arg is True, send to the to recipient directly via the dealer Net_Node.sock
If to is a list, or is intended to be sent as a multihop message with an explicit path, then networking objects will attempt to forward it along that path (disregarding implicit topology).
Either an already created Message should be passed as msg, or at least key must be provided for a new message created by prepare_message().
A threading.Timer is created to resend the message using repeat() unless repeat is False.
- Parameters:
to (str, list) – The identity of the socket this message is to. If not included, sent to upstream.
key (str) – The type of message - used to select which method the receiver uses to process this message.
value – Any information this message should contain. Can be any type, but must be JSON serializable.
msg (Message) – An already created message.
repeat (bool) – Should this message be resent if confirmation is not received?
flags (dict)
force_to (bool) – If we really really want to use the ‘to’ field to address messages (e.g. node being used for direct communication), overrides default behavior of sending to upstream.
blosc (bool) – Tell the message to compress its serialized contents with blosc
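The addressing rules for send() can be summarized as a small decision function. This is a hedged sketch: choose_route is a hypothetical helper, and the order in which the router and force_to cases are checked is an assumption, since the description above does not state which takes precedence. List-valued (multihop) to addresses are not covered.

```python
from typing import Collection, Optional, Tuple


def choose_route(to: Optional[str],
                 upstream: str,
                 senders: Collection[str] = (),
                 has_router: bool = False,
                 force_to: bool = False) -> Tuple[str, str]:
    """Return (socket name, first recipient) for an outgoing message."""
    # A router can only reply to peers that have already sent us a message.
    if has_router and to is not None and to in senders:
        return ("router", to)
    # force_to addresses the recipient directly over the dealer socket.
    if force_to and to is not None:
        return ("sock", to)
    # Default: everything goes to the upstream Station, which forwards it.
    return ("sock", upstream)


default_route = choose_route(None, upstream="T")
forwarded = choose_route("P", upstream="T")
direct_reply = choose_route("P", upstream="T", senders={"P"}, has_router=True)
forced = choose_route("P", upstream="T", force_to=True)
```

Here "T" and "P" are placeholder identities for an upstream station and a peer.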
- repeat()[source]
Periodically (according to repeat_interval) resend messages that haven’t been confirmed.
TTL is decremented, and messages are resent until their TTL is 0.
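One pass of this resend logic might look like the following sketch. It assumes each outbox entry is a dict with a ttl field and that confirmation simply removes the entry; both are illustrative choices rather than the library's data model. The periodic scheduling (a threading.Timer firing every repeat_interval seconds) is omitted so that the example stays deterministic.

```python
from typing import Callable, Dict


def resend_unconfirmed(outbox: Dict[str, dict],
                       send: Callable[[dict], None]) -> None:
    """Resend every unconfirmed message once, dropping those whose TTL ran out."""
    for msg_id in list(outbox):  # copy the keys so entries can be deleted
        msg = outbox[msg_id]
        if msg["ttl"] <= 0:
            del outbox[msg_id]   # TTL exhausted: give up on this message
            continue
        msg["ttl"] -= 1
        send(msg)


def confirm(outbox: Dict[str, dict], msg_id: str) -> None:
    """Drop a message from the outbox once the recipient has confirmed it."""
    outbox.pop(msg_id, None)


outbox = {"a_0": {"id": "a_0", "ttl": 2}, "a_1": {"id": "a_1", "ttl": 2}}
sent_ids = []

resend_unconfirmed(outbox, lambda m: sent_ids.append(m["id"]))  # both resent
confirm(outbox, "a_0")                                          # a_0 acknowledged
resend_unconfirmed(outbox, lambda m: sent_ids.append(m["id"]))  # only a_1 resent
resend_unconfirmed(outbox, lambda m: sent_ids.append(m["id"]))  # a_1 expires
```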
- l_confirm(value)[source]
Confirm that a message was received.
- Parameters:
value (str) – The ID of the message we are confirming.
- l_stream(msg)[source]
Reconstitute the original stream of messages and call their handling methods
The msg should contain an inner_key that indicates the key, and thus the handling method.
- Parameters:
msg (dict) – Compressed stream sent by Net_Node._stream()
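As a rough illustration of what reconstituting a stream means, the sketch below replays a batch of values through the listen method named by inner_key. The payload field name and the function replay_stream are assumptions for the example, and decompression is skipped entirely.

```python
from typing import Any, Callable, Dict


def replay_stream(msg: Dict[str, Any],
                  listens: Dict[str, Callable[[Any], None]]) -> None:
    """Hand each item of a batched stream message to its listen method."""
    handler = listens[msg["inner_key"]]  # inner_key selects the handler
    for item in msg["payload"]:          # 'payload' is an assumed field name
        handler(item)


frames = []
listens = {"CONTINUOUS": frames.append}
replay_stream({"inner_key": "CONTINUOUS", "payload": [0.1, 0.2, 0.3]}, listens)
```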
- prepare_message(to, key, value, repeat, flags=None, blosc: bool = False)[source]
Instantiate a Message class, give it an ID and the rest of its attributes.
- Parameters:
flags
repeat
to (str) – The identity of the socket this message is to
key (str) – The type of message - used to select which method the receiver uses to process this message.
value – Any information this message should contain. Can be any type, but must be JSON serializable.
blosc (bool) – Whether or not the message should be compressed with blosc
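The idea of giving each message an ID from a running counter can be sketched as follows. SketchMessage and SketchNode are hypothetical stand-ins, not the real Message or Net_Node classes, and the "<node id>_<count>" ID format is an assumption. The repeat and blosc arguments are left out for brevity.

```python
import itertools
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchMessage:
    """Hypothetical stand-in for the real Message class."""
    id: str
    to: str
    sender: str
    key: str
    value: Any
    flags: Dict[str, Any] = field(default_factory=dict)


class SketchNode:
    """Hypothetical stand-in showing only message preparation."""

    def __init__(self, node_id: str) -> None:
        self.id = node_id
        self.msg_counter = itertools.count()  # indexes our sent messages

    def prepare_message(self, to: str, key: str, value: Any,
                        flags: Optional[Dict[str, Any]] = None) -> SketchMessage:
        # Assumed ID scheme: "<node id>_<running count>"
        msg_id = f"{self.id}_{next(self.msg_counter)}"
        return SketchMessage(id=msg_id, to=to, sender=self.id, key=key,
                             value=value, flags=dict(flags or {}))


node = SketchNode("pilot_node")
first = node.prepare_message("T", "STATE", "running")
second = node.prepare_message("T", "DATA", {"trial": 1})
```

Unique IDs are what allow a later confirmation to be matched against the right entry in the outbox.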
- get_stream(id, key, min_size=5, upstream=None, port=None, ip=None, subject=None, q_size: int | None = None)[source]
Make a queue that another object can dump data into that sends on its own socket. Smarter handling of continuous data than just hitting ‘send’ a shitload of times.
- Returns: Place to dump your data
- Return type: Queue
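The queue-plus-worker pattern behind get_stream() can be sketched with the standard library. This is not the library's implementation: the function make_stream_queue, the send callback standing in for a dedicated socket, and the "END" sentinel that flushes and stops the worker are all assumptions for the example.

```python
import queue
import threading
from typing import Any, Callable, List


def make_stream_queue(send: Callable[[List[Any]], None],
                      min_size: int = 5) -> queue.Queue:
    """Return a queue whose contents are sent in batches of at least min_size."""
    q: queue.Queue = queue.Queue()

    def _stream() -> None:
        batch: List[Any] = []
        while True:
            item = q.get()
            try:
                if isinstance(item, str) and item == "END":  # assumed sentinel
                    if batch:
                        send(batch)  # flush whatever is left
                    return
                batch.append(item)
                if len(batch) >= min_size:
                    send(batch)
                    batch = []       # start a fresh batch
            finally:
                q.task_done()        # lets callers wait with q.join()

    threading.Thread(target=_stream, daemon=True).start()
    return q


batches = []
q = make_stream_queue(batches.append, min_size=5)
for sample in range(7):
    q.put(sample)
q.put("END")
q.join()  # block until the worker has handled every item
```

Batching trades a little latency for far fewer messages, which is the point of streaming rather than calling send for every sample.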
- property ip: str
Find our IP address
Todo
this is a copy of the Station.get_ip() method – unify this in v0.5.0
Returns (str): our IPv4 address.
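For readers curious how a process can discover its own IPv4 address, one common standard-library approach is sketched below. It is not necessarily how the ip property or Station.get_ip() is implemented. Connecting a UDP socket sends no packets but makes the operating system pick the outgoing interface. The target 192.0.2.1 is a reserved documentation address chosen for this example, and the loopback fallback is likewise an assumption.

```python
import socket


def find_ipv4() -> str:
    """Best-effort lookup of the local IPv4 address used for outbound traffic."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # UDP connect() only selects a route; nothing is transmitted.
        s.connect(("192.0.2.1", 80))
        return s.getsockname()[0]
    except OSError:
        return "127.0.0.1"  # no usable route: fall back to loopback
    finally:
        s.close()


address = find_ipv4()
```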