osbrain.agent
— osBrain agent logic¶
Core agent classes.
-
class
osbrain.agent.
Agent
(name='', host=None, serializer=None, transport=None, attributes=None)¶ Bases:
object
A base agent class which is to be served by an AgentProcess.
An AgentProcess runs a Pyro multiplexed server and serves one Agent object.
Parameters: - name (str, default is None) – Name of the Agent.
- host (str, default is None) – Host address where the agent will bind to. When not set,
'127.0.0.1'
(localhost) is used. - transport (str, AgentAddressTransport, default is None) – Transport protocol.
- attributes (dict, default is None) – A dictionary that defines initial attributes for the agent.
-
name
¶ Name of the agent.
Type: str
-
_host
¶ Host address where the agent is binding to.
Type: str
-
_uuid
¶ Globally unique identifier for the agent.
Type: bytes
-
_running
¶ Set to
True
if the agent is running (executing the main loop).Type: bool
-
_serializer
¶ Default agent serialization format.
Type: str
-
_transport
¶ Default agent transport protocol.
Type: str, AgentAddressTransport, default is None
-
_socket
¶ A dictionary in which the key is the address or the alias and the value is the actual socket.
Type: dict
-
_address
¶ A dictionary in which the key is the address or the alias and the value is the actual address.
Type: dict
-
_handler
¶ A dictionary in which the key is the socket and the values are the handlers for each socket.
Type: dict
-
_context
¶ ZMQ context to create ZMQ socket objects.
Type: zmq.Context
-
_poller
¶ ZMQ poller to wait for incoming data from sockets.
Type: zmq.Poller
-
_poll_timeout
¶ Polling timeout, in milliseconds. After this timeout, if no message is received, the agent executes de
idle()
method before going back to polling.Type: int
-
_keep_alive
¶ When set to
True
, the agent will continue executing the main loop.Type: bool
-
_async_req_uuid
¶ Stores the UUIDs of the asynchronous request sockets (used in communication channels).
Type: dict
-
_async_req_handler
¶ Stores the handler for every asynchronous request sockets (used in communication channels).
Type: dict
-
_die_now
¶ During shutdown, this attribute is set for the agent to die.
Type: bool
-
_DEBUG
¶ Whether to print debug level messages.
Type: bool
-
_pending_requests
¶ Stores pending (waiting for reply) asynchronous requests. The asynchronous request UUID is used as key and its handler as the value.
Type: dict
-
_timer
¶ Stores all the current active timers, using their aliases as keys.
Type: dict
-
addr
(alias)¶ Return the address of a socket given by its alias.
Parameters: alias (str) – Alias of the socket whose address is to be retrieved. Returns: Address of the agent socket associated with the alias. Return type: AgentAddress
-
after
(delay, method, *args, alias=None, **kwargs)¶ Execute an action after a delay.
Parameters: - delay (float) – Execute the action after
delay
seconds. - method – Method (action) to be executed by the agent.
- alias (str, default is None) – An alias for the generated timer.
- *args (tuple) – Parameters to pass for the method execution.
- **kwargs (dict) – Named parameters to pass for the method execution.
Returns: The timer alias or identifier.
Return type: str
- delay (float) – Execute the action after
-
before_loop
()¶ This user-defined method is to be executed right before the main loop.
-
bind
(kind, alias=None, handler=None, addr=None, transport=None, serializer=None)¶ Bind to an agent address.
Parameters: - kind (str, AgentAddressKind) – The agent address kind: PUB, REQ…
- alias (str, default is None) – Optional alias for the socket.
- default is None (handler,) – If the socket receives input messages, the handler/s is/are to be set with this parameter.
- addr (str, default is None) – The address to bind to.
- transport (str, AgentAddressTransport, default is None) – Transport protocol.
Returns: The address where the agent binded to.
Return type:
-
close
(alias, linger=None)¶ Close a socket given its alias and clear its entry from the
Agent._socket
dictionary.
-
close_all
(linger=None)¶ Close all non-internal zmq sockets.
-
connect
(server, alias=None, handler=None)¶ Connect to a server agent address.
Parameters: - server (AgentAddress) – Agent address to connect to.
- alias (str, default is None) – Optional alias for the new address.
- default is None (handler,) – If the new socket receives input messages, the handler/s is/are to be set with this parameter.
-
each
(period, method, *args, alias=None, **kwargs)¶ Execute a repeated action with a defined period.
Parameters: - period (float) – Repeat the action execution with a delay of
period
seconds between executions. - method – Method (action) to be executed by the agent.
- alias (str, default is None) – An alias for the generated timer.
- *args (tuple) – Parameters to pass for the method execution.
- **kwargs (dict) – Named parameters to pass for the method execution.
Returns: The timer alias or identifier.
Return type: str
- period (float) – Repeat the action execution with a delay of
-
execute_as_function
(function, *args, **kwargs)¶ Execute a function passed as parameter.
-
execute_as_method
(function, *args, **kwargs)¶ Execute a function as a method, without adding it to the set of agent methods.
-
get_attr
(name: str)¶ Return the specified attribute of the agent.
Parameters: name – Name of the attribute to be retrieved.
-
has_socket
(alias)¶ Return whether the agent has the passed socket internally stored.
-
idle
()¶ This function is to be executed when the agent is idle.
After a timeout occurs when the agent’s poller receives no data in any of its sockets, the agent may execute this function.
Note
The timeout is set by the agent’s
poll_timeout
attribute.
-
is_running
()¶ Returns a boolean indicating whether the agent is running or not.
-
kill
()¶ Force shutdown of the agent.
If the agent is running the ZMQ context is terminated to allow the main thread to quit and do the tear down.
-
list_timers
()¶ Return a list with all timer aliases currently running.
Returns: A list with all the timer aliases currently running. Return type: list (str)
-
log_debug
(message, logger='_logger')¶ Log a debug message.
Parameters: - message (str) – Message to log.
- logger (str) – Alias of the logger.
-
log_error
(message, logger='_logger')¶ Log an error message.
Parameters: - message (str) – Message to log.
- logger (str) – Alias of the logger.
-
log_info
(message, logger='_logger')¶ Log an info message.
Parameters: - message (str) – Message to log.
- logger (str) – Alias of the logger.
-
log_warning
(message, logger='_logger')¶ Log a warning message.
Parameters: - message (str) – Message to log.
- logger (str) – Alias of the logger.
-
on_init
()¶ This user-defined method is to be executed after initialization.
-
ping
()¶ A test method to check the readiness of the agent. Used for testing purposes, where timing is very important. Do not remove.
-
raise_exception
()¶ Raise an exception (for testing purposes).
-
recv
(address)¶ Receive a message from the specified address.
This method is only used in REQREP communication patterns.
Parameters: address – Returns: The content received in the address. Return type: anything
-
run
()¶ Start the main loop.
-
safe_call
(method, *args, **kwargs)¶ A safe call to a method.
A safe call is simply sent to be executed by the main thread.
Parameters: - method (str) – Method name to be executed by the main thread.
- *args (arguments) – Method arguments.
- *kwargs (keyword arguments) – Method keyword arguments.
-
send
(address, message, topic=None, handler=None, wait=None, on_error=None)¶ Send a message through the specified address.
Note that replies in a REQREP pattern do not use this function in order to be sent.
Parameters: - address (AgentAddress or AgentChannel) – The address to send the message through.
- message – The message to be sent.
- topic (str) – The topic, in case it is relevant (i.e.: for PUB sockets).
- handler (function, method or string) – Code that will be executed on input messages if relevant (i.e.: for asynchronous requests in channels).
- wait (float) – For channel requests, wait at most this number of seconds for a response from the server.
- on_error (function, method or string) – Code to be executed if
wait
is passed and the response is not received.
-
send_recv
(address, message)¶ This method is only used in REQREP communication patterns.
-
set_attr
(**kwargs)¶ Set object attributes.
Parameters: kwargs ([name, value]) – Keyword arguments will be used to set the object attributes.
-
set_logger
(logger, alias='_logger')¶ Connect the agent to a logger and start logging messages to it.
-
set_method
(*args, **kwargs)¶ Set object methods.
Parameters: - args ([function]) – New methods will be created for each function, taking the same name as the original function.
- kwargs ([name, function]) – New methods will be created for each function, taking the name specified by the parameter.
Returns: Name of the registered method in the agent.
Return type: str
-
shutdown
()¶ Cleanly stop and shut down the agent assuming the agent is running.
Will let the main thread do the tear down.
-
stop
()¶ Stop the agent. Agent will stop running.
-
stop_all_timers
()¶ Stop all currently running timers.
-
stop_timer
(alias)¶ Stop a currently running timer.
Parameters: alias (str) – The alias or identifier of the timer.
-
subscribe
(alias: str, handler: Dict[Union[bytes, str], Any]) → None¶ Subscribe a SUB/SYNC_SUB socket given by its alias to the given topics, and leave the handlers prepared internally.
Parameters: - alias – Alias of the new subscriber socket.
- handler – A dictionary in which the keys represent the different topics and the values the actual handlers. If, instead of a dictionary, a single handler is given, it will be used to subscribe the agent to any topic.
-
unsubscribe
(alias: str, topic: Union[bytes, str]) → None¶ Unsubscribe a SUB/SYNC_SUB socket given by its alias from a given specific topic, and delete its entry from the handlers dictionary.
If instead of a single topic, a tuple or a list of topics is passed, the agent will unsubscribe from all the supplied topics.
-
class
osbrain.agent.
AgentProcess
(name='', nsaddr=None, addr=None, serializer=None, transport=None, base=<class 'osbrain.agent.Agent'>, attributes=None)¶ Bases:
multiprocessing.context.Process
Agent class. Instances of an Agent are system processes which can be run independently.
-
kill
()¶ Force kill the agent process.
-
run
()¶ Begin execution of the agent process and start the main loop.
-
start
()¶ Start the system process.
Raises: RuntimeError
– If an error occurred when initializing the daemon.
-
-
osbrain.agent.
compose_message
(message: bytes, topic: bytes, serializer: osbrain.address.AgentAddressSerializer) → bytes¶ Compose a message and leave it ready to be sent through a socket.
This is used in PUB-SUB patterns to combine the topic and the message in a single bytes buffer.
Parameters: - message – Message to be composed.
- topic – Topic to combine the message with.
- serializer – Serialization for the message part.
Returns: Return type: The bytes representation of the final message to be sent.
-
osbrain.agent.
deserialize_message
(message, serializer)¶ Check if a message needs to be deserialized and do it if that is the case.
Parameters: - message (bytes, memoryview) – The serialized message.
- serializer (AgentAddressSerializer) – The type of (de)serializer that should be used.
Returns: The deserialized message, or the same message in case no deserialization is needed.
Return type: anything
-
osbrain.agent.
execute_code_after_yield
(generator)¶ Some responses are dispatched with yield (generator handler). In those cases we still want to execute the remaining code in the generator, and also make sure it does not yield any more.
Parameters: generator – The handler that already yielded one result and is not expected to yield again. Raises: ValueError
– If the generator yielded once more, which is unexpected.
-
osbrain.agent.
run_agent
(name='', nsaddr=None, addr=None, base=<class 'osbrain.agent.Agent'>, serializer=None, transport=None, safe=None, attributes=None)¶ Ease the agent creation process.
This function will create a new agent, start the process and then run its main loop through a proxy.
Parameters: - name (str, default is '') – Agent name or alias.
- nsaddr (SocketAddress, default is None) – Name server address.
- addr (SocketAddress, default is None) – New agent address, if it is to be fixed.
- transport (str, AgentAddressTransport, default is None) – Transport protocol.
- safe (bool, default is None) – Use safe calls by default from the Proxy.
- attributes (dict, default is None) – A dictionary that defines initial attributes for the agent.
Returns: A proxy to the new agent.
Return type: proxy
-
osbrain.agent.
serialize_message
(message, serializer)¶ Check if a message needs to be serialized and do it if that is the case.
Parameters: - message (anything) – The message to serialize.
- serializer (AgentAddressSerializer) – The type of serializer that should be used.
Returns: The serialized message, or the same message in case no serialization is needed.
Return type: bytes