osbrain.agent
— osBrain agent logic¶
Core agent classes.
-
class
osbrain.agent.
Agent
(name=None, host=None, serializer=None, transport=None, attributes=None)¶ Bases:
object
A base agent class which is to be served by an AgentProcess.
An AgentProcess runs a Pyro multiplexed server and serves one Agent object.
Parameters: - name : str, default is None
Name of the Agent.
- host : str, default is None
Host address where the agent will bind to. When not set,
'127.0.0.1'
(localhost) is used.- transport : str, AgentAddressTransport, default is None
Transport protocol.
- attributes : dict, default is None
A dictionary that defines initial attributes for the agent.
Attributes: - name : str
Name of the agent.
- host : str
Host address where the agent is binding to.
- socket : dict
A dictionary in which the key is the address or the alias and the value is the actual socket.
- adddress : dict
A dictionary in which the key is the address or the alias and the value is the actual address.
- handler : dict
A dictionary in which the key is the socket and the values are the handlers for each socket.
- poll_timeout : int
Polling timeout, in milliseconds. After this timeout, if no message is received, the agent executes de
idle()
method before going back to polling.- keep_alive : bool
When set to
True
, the agent will continue executing the main loop.- running : bool
Set to
True
if the agent is running (executing the main loop).
Methods
addr
(alias)Return the address of a socket given by its alias. after
(delay, method, *args[, alias])Execute an action after a delay. bind
(kind[, alias, handler, addr, …])Bind to an agent address. close
(alias)Close a socket given its alias and clear its entry from the Agent.socket
dictionary.close_all
()Close all non-internal zmq sockets. connect
(server[, alias, handler])Connect to a server agent address. each
(period, method, *args[, alias])Execute a repeated action with a defined period. execute_as_function
(function, *args, **kwargs)Execute a function passed as parameter. execute_as_method
(function, *args, **kwargs)Execute a function as a method, without adding it to the set of agent methods. get_attr
(name)Return the specified attribute of the agent. get_unique_external_zmq_sockets
()Return an iterable containing all the zmq.Socket objects from self.socket
which are not internal, without repetition.has_socket
(alias)Return whether the agent has the passed socket internally stored. idle
()This function is to be executed when the agent is idle. iterate
()Agent’s main iteration. list_timers
()Return a list with all timer aliases currently running. log_debug
(message[, logger])Log a debug message. log_error
(message[, logger])Log an error message. log_info
(message[, logger])Log an info message. log_warning
(message[, logger])Log a warning message. loop
()Agent’s main loop. on_init
()This user-defined method is to be executed after initialization. ping
()A test method to check the readiness of the agent. raise_exception
()Raise an exception (for testing purposes). recv
(address)Receive a message from the specified address. run
()Run the agent. safe_call
(method, *args, **kwargs)A safe call to a method. send
(address, message[, topic, handler, …])Send a message through the specified address. send_recv
(address, message)This method is only used in REQREP communication patterns. set_attr
(**kwargs)Set object attributes. set_logger
(logger[, alias])Connect the agent to a logger and start logging messages to it. set_method
(*args, **kwargs)Set object methods. stop
()Stop the agent. stop_all_timers
()Stop all currently running timers. stop_timer
(alias)Stop a currently running timer. kill register registered shutdown -
addr
(alias)¶ Return the address of a socket given by its alias.
Parameters: - alias : str
Alias of the socket whose address is to be retreived.
Returns: - AgentAddress
Address of the agent socket associated with the alias.
-
after
(delay, method, *args, alias=None, **kwargs)¶ Execute an action after a delay.
Parameters: - delay : float
Execute the action after
delay
seconds.- method
Method (action) to be executed by the agent.
- alias : str, default is None
An alias for the generated timer.
- *args : tuple
Parameters to pass for the method execution.
- **kwargs : dict
Named parameters to pass for the method execution.
Returns: - str
The timer alias or identifier.
-
bind
(kind, alias=None, handler=None, addr=None, transport=None, serializer=None)¶ Bind to an agent address.
Parameters: - kind : str, AgentAddressKind
The agent address kind: PUB, REQ…
- alias : str, default is None
Optional alias for the socket.
- handler, default is None
If the socket receives input messages, the handler/s is/are to be set with this parameter.
- addr : str, default is None
The address to bind to.
- transport : str, AgentAddressTransport, default is None
Transport protocol.
Returns: - AgentAddress
The address where the agent binded to.
-
close
(alias)¶ Close a socket given its alias and clear its entry from the
Agent.socket
dictionary.
-
close_all
()¶ Close all non-internal zmq sockets.
-
connect
(server, alias=None, handler=None)¶ Connect to a server agent address.
Parameters: - server : AgentAddress
Agent address to connect to.
- alias : str, default is None
Optional alias for the new address.
- handler, default is None
If the new socket receives input messages, the handler/s is/are to be set with this parameter.
-
each
(period, method, *args, alias=None, **kwargs)¶ Execute a repeated action with a defined period.
Parameters: - period : float
Repeat the action execution with a delay of
period
seconds between executions.- method
Method (action) to be executed by the agent.
- alias : str, default is None
An alias for the generated timer.
- *args : tuple
Parameters to pass for the method execution.
- **kwargs : dict
Named parameters to pass for the method execution.
Returns: - str
The timer alias or identifier.
-
execute_as_function
(function, *args, **kwargs)¶ Execute a function passed as parameter.
-
execute_as_method
(function, *args, **kwargs)¶ Execute a function as a method, without adding it to the set of agent methods.
-
get_attr
(name: str)¶ Return the specified attribute of the agent.
Parameters: - name
Name of the attribute to be retrieved.
-
get_unique_external_zmq_sockets
()¶ Return an iterable containing all the zmq.Socket objects from
self.socket
which are not internal, without repetition.Originally, a socket was internal if its alias was one of the following:
- loopback
- _loopback_safe
- inproc://loopback
- inproc://_loopback_safe
However, since we are storing more than one entry in the
self.socket
dictionary per zmq.socket (by storing its AgentAddress, for example), we need a way to simply get all non-internal zmq.socket objects, and this is precisely what this function does.
-
has_socket
(alias)¶ Return whether the agent has the passed socket internally stored.
-
idle
()¶ This function is to be executed when the agent is idle.
After a timeout occurs when the agent’s poller receives no data in any of its sockets, the agent may execute this function.
-
iterate
()¶ Agent’s main iteration.
This iteration is normally executed inside the main loop.
The agent is polling all its sockets for input data. It will wait for
poll_timeout
; after this period, the methodidle
will be executed before polling again.Returns: - int
1 if an error occurred during the iteration (we would expect this to happen if an interruption occurs during polling).
0 otherwise.
-
kill
()¶
-
list_timers
()¶ Return a list with all timer aliases currently running.
Returns: - list (str)
A list with all the timer aliases currently running.
-
log_debug
(message, logger='_logger')¶ Log a debug message.
Parameters: - message : str
Message to log.
- logger : str
Alias of the logger.
-
log_error
(message, logger='_logger')¶ Log an error message.
Parameters: - message : str
Message to log.
- logger : str
Alias of the logger.
-
log_info
(message, logger='_logger')¶ Log an info message.
Parameters: - message : str
Message to log.
- logger : str
Alias of the logger.
-
log_warning
(message, logger='_logger')¶ Log a warning message.
Parameters: - message : str
Message to log.
- logger : str
Alias of the logger.
-
loop
()¶ Agent’s main loop.
This loop is executed until the
keep_alive
attribute is False or until an error occurs.
-
on_init
()¶ This user-defined method is to be executed after initialization.
-
ping
()¶ A test method to check the readiness of the agent. Used for testing purposes, where timing is very important. Do not remove.
-
raise_exception
()¶ Raise an exception (for testing purposes).
-
recv
(address)¶ Receive a message from the specified address.
This method is only used in REQREP communication patterns.
Parameters: - address :
Returns: - anything
The content received in the address.
-
register
(socket, address, alias=None, handler=None)¶
-
registered
(address)¶
-
run
()¶ Run the agent.
-
safe_call
(method, *args, **kwargs)¶ A safe call to a method.
A safe call is simply sent to be executed by the main thread.
Parameters: - method : str
Method name to be executed by the main thread.
- *args : arguments
Method arguments.
- *kwargs : keyword arguments
Method keyword arguments.
-
send
(address, message, topic=None, handler=None, wait=None, on_error=None)¶ Send a message through the specified address.
Note that replies in a REQREP pattern do not use this function in order to be sent.
Parameters: - address : AgentAddress or AgentChannel
The address to send the message through.
- message
The message to be sent.
- topic : str
The topic, in case it is relevant (i.e.: for PUB sockets).
- handler : function, method or string
Code that will be executed on input messages if relevant (i.e.: for PULL sockets).
- wait : float
For channel requests, wait at most this number of seconds for a response from the server.
- on_error : function, method or string
Code to be executed if
wait
is passed and the response is not received.
-
send_recv
(address, message)¶ This method is only used in REQREP communication patterns.
-
set_attr
(**kwargs)¶ Set object attributes.
Parameters: - kwargs : [name, value]
Keyword arguments will be used to set the object attributes.
-
set_logger
(logger, alias='_logger')¶ Connect the agent to a logger and start logging messages to it.
-
set_method
(*args, **kwargs)¶ Set object methods.
Parameters: - args : [function]
New methods will be created for each function, taking the same name as the original function.
- kwargs : [name, function]
New methods will be created for each function, taking the name specified by the parameter.
Returns: - str
Name of the registered method in the agent.
-
shutdown
()¶
-
stop
()¶ Stop the agent. Agent will stop running.
-
stop_all_timers
()¶ Stop all currently running timers.
-
stop_timer
(alias)¶ Stop a currently running timer.
Parameters: - alias : str
The alias or identifier of the timer.
-
class
osbrain.agent.
AgentProcess
(name, nsaddr=None, addr=None, serializer=None, transport=None, base=<class 'osbrain.agent.Agent'>, attributes=None)¶ Bases:
multiprocessing.context.Process
Agent class. Instances of an Agent are system processes which can be run independently.
Attributes: - authkey
daemon
Return whether process is a daemon
exitcode
Return exit code of process or
None
if it has yet to stopident
Return identifier (PID) of process or
None
if it has yet to start- name
pid
Return identifier (PID) of process or
None
if it has yet to startsentinel
Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
Methods
is_alive
()Return whether process is alive join
([timeout])Wait until child process terminates run
()Method to be run in sub-process; can be overridden in sub-class sigint_handler
(signal, frame)Handle interruption signals. start
()Start child process terminate
()Terminate process; sends SIGTERM signal or uses TerminateProcess() kill -
kill
()¶
-
run
()¶ Method to be run in sub-process; can be overridden in sub-class
-
sigint_handler
(signal, frame)¶ Handle interruption signals.
-
start
()¶ Start child process
-
osbrain.agent.
compose_message
(message: bytes, topic: bytes, serializer: osbrain.address.AgentAddressSerializer) → bytes¶ Compose a message and leave it ready to be sent through a socket.
This is used in PUB-SUB patterns to combine the topic and the message in a single bytes buffer.
Parameters: - message
Message to be composed.
- topic
Topic to combine the message with.
- serializer
Serialization for the message part.
Returns: - The bytes representation of the final message to be sent.
-
osbrain.agent.
deserialize_message
(message, serializer)¶ Check if a message needs to be deserialized and do it if that is the case.
Parameters: - message : bytes, memoryview
The serialized message.
- serializer : AgentAddressSerializer
The type of (de)serializer that should be used.
Returns: - anything
The deserialized message, or the same message in case no deserialization is needed.
-
osbrain.agent.
execute_code_after_yield
(generator)¶ Some responses are dispatched with yield (generator handler). In those cases we still want to execute the remaining code in the generator, and also make sure it does not yield any more.
Raises: - ValueError
If the generator yielded once more, which is unexpected.
-
osbrain.agent.
run_agent
(name, nsaddr=None, addr=None, base=<class 'osbrain.agent.Agent'>, serializer=None, transport=None, safe=None, attributes=None)¶ Ease the agent creation process.
This function will create a new agent, start the process and then run its main loop through a proxy.
Parameters: - name : str
Agent name or alias.
- nsaddr : SocketAddress, default is None
Name server address.
- addr : SocketAddress, default is None
New agent address, if it is to be fixed.
- transport : str, AgentAddressTransport, default is None
Transport protocol.
- safe : bool, default is None
Use safe calls by default from the Proxy.
- attributes : dict, default is None
A dictionary that defines initial attributes for the agent.
Returns: - proxy
A proxy to the new agent.
-
osbrain.agent.
serialize_message
(message, serializer)¶ Check if a message needs to be serialized and do it if that is the case.
Parameters: - message : anything
The message to serialize.
- serializer : AgentAddressSerializer
The type of serializer that should be used.
Returns: - bytes
The serialized message, or the same message in case no serialization is needed.