osBrain - 0.3.0¶
What is osBrain?¶
osBrain is a general-purpose multi-agent system module written in Python. Agents run independently as system processes and communicate with each other using message passing.
osBrain uses ØMQ for efficient and flexible messsage passing between agents. It also uses Pyro4 to ease the configuration and deployment of complex systems.
Please read the Software License and Disclaimer.
Contents¶
Introduction and Example¶
About osBrain: feature overview¶
osBrain is a general-purpose multi-agent system module written in Python.
- Agents run independently as system processes and communicate with each other using message passing.
- Message passing is implemented using ØMQ, and in particular, the PyZMQ Python bindings.
- ØMQ allows for efficient, asynchronous communication using different commonly used communication patterns such as request-reply, push-pull and publish-subscribe.
- osBrain integrates Pyro4 to ease the configuration and deployment of complex systems.
- Thanks to Pyro4, remote agents can be treated as local objects and reconfigured even when they are running. Not just variables, but also new methods can be created in the remote agents.
- osBrain provides the base for implementing robust, highly-available, flexible multi-agent systems.
- Being implemented in Python, osBrain can take advantage of a huge set of packages for data analysis, statistics, numerical computing, etc. available in the Python ecosystem.
In order to fully understand osBrain capabilities, it is highly recommended to read the Pyro4 documentation and the ØMQ guide.
OsBrain’s history¶
osBrain was initially developed in OpenSistemas based on the need to create a real-time automated-trading platform. This platform needed to be able to process real-time market data updates fast and in a parallel way. Robustness was very important as well in order to prevent running trading strategies from being affected by a failure in another strategy.
Python was chosen for being a great language for fast prototyping and for having a huge data analysis ecosystem available. It was kept for its final performance and the beautiful source code created with it.
The appearance of osBrain was a consecuence of a series of steps that were taken during the development process:
- Isolation of agents; creating separate system processes to avoid shared memory and any problems derived from multi-threading development.
- Implementation of message passing; making use of the modern, efficient and flexible ØMQ library.
- Ease of configuration/deployment; making use of the very convenient, well implemented and documented Pyro4 package.
- Separation from the trading platform; what it started as a basic architecture to implement a real-time automated-trading platform, ended-up being a general-purpose multi-agent system architecture.
What can you use osBrain for?¶
osBrain has been successfully used to develop a real-time automated-trading platform in OpenSistemas, but being a general-purpose multi-agent system, it is not limited to this application. Other applications include:
- Transportation.
- Logistics.
- Defense and military applications.
- Networking.
- Load balancing.
- Self-healing networks.
In general, osBrain can be used whenever a multi-agent system architecture fits the application well:
- Autonomy of the agents.
- Local views.
- Decentralization.
Simple Example¶
In the following example we run a very simple architecture in three steps:
- Run a name server, which will register agents running by their alias.
- Run an agent with an alias
Example
. - Log a
Hello world
message from the agent.
from osbrain import run_nameserver
from osbrain import run_agent
if __name__ == '__main__':
# System deployment
run_nameserver()
agent = run_agent('Example')
# Log a message
agent.log_info('Hello world!')
It is important to note that the agent
variable in that example is just
what is called a proxy to the remote agent so, when log_info()
is called,
this method is actually serialized and executed in the remote agent. Remember
that the agent is running as a separate system process!
Performance¶
The performance of osBrain, just as the performance of any other system architecture, depends a lot on the actual application. The developer should always take this into account:
- Pyro4 is used only for configuration and deployment, which means that the actual performance of the system does not depend on this package.
- ØMQ is used with the PyZMQ Python bindings, which means that the system performance depends on the PyZMQ performance.
- osBrain uses pickle for serialization, which means that the system performance depends on this package.
- ØMQ transport is TCP by default. The network may have a great impact on performance.
Tutorial - 1¶
Installation¶
This tutorial is a step-by-step introduction to osBrain with examples. In order to start playing with this module, you only need to install it.
osBrain requires Python 3. Most probably, Python 3 is already packaged for your favorite distribution (and maybe even installed by default in your system). If you do not have Python 3 available, consider using Conda to create a virtual environment with Python 3.
Installing osBrain is very simple with pip:
pip install osbrain
You should now be able to import osbrain
from a python console:
>>> import osbrain
Hello world¶
The first example is, of course, a simple hello world! program. Three steps are taken here:
- Run a name server.
- Run an agent with an alias
Example
. - Log a
Hello world
message from the agent.
from osbrain import run_nameserver
from osbrain import run_agent
if __name__ == '__main__':
# System deployment
run_nameserver()
agent = run_agent('Example')
# Log a message
agent.log_info('Hello world!')
Running this example from your terminal should simply show you a log message saying Hello world! but, what exactly is happening there?
Agents and proxies¶
An agent, in osBrain, is an entity that runs independly from other agents in the system. An agent, by default, will simply poll for incoming messages before executing the code defined by the developer. This means a single agent, as in the Hello world! example, makes little or no sense. Agents in a multi-agent system start to make sense when connected to each other.
The easiest way to run an agent in an osBrain architecture is by calling the
function osbrain.core.run_agent()
:
>>> agent = run_agent(...)
This function will spawn a new agent and will return a
osbrain.core.Proxy
to it.
Proxies are simply local objects that allow us to easily have access to the remote agent. The fact that agents are run independently from each other justifies the need of a proxy.
A proxy allows us to call methods or access attributes of the remote agent in a very convenient way. See for example the previous call:
>>> agent.log_info('Hello world')
The method log_info()
is implemented in osbrain.core.Agent
so,
when this method is called from the proxy, this call is actually being
serialized to the remote running agent and gets executed there. The return
value, if any, is then serialized back and returned by the proxy. So basically
so get the impression of being working with a local object while your code is
executed remotelly.
The name server¶
A name server is just like another agent, so it runs independently, but with a very specific role. Name servers are used as an address book. This means other agents can be run in the system and can be registered in the name server using a human-readable alias. Aliases help us accessing these agents easily even from remote locations.
Note that when calling the osbrain.core.run_agent()
function, we are
passing a string parameter. This parameter is the alias the agent will use to
register itself in the name server.
When we run a name server calling the osbrain.core.run_nameserver()
, we
also get in return a proxy to this name server:
>>> ns = run_nameserver()
This proxy can be used to list the agents registered in the name server:
from osbrain import run_nameserver
from osbrain import run_agent
if __name__ == '__main__':
# System deployment
ns = run_nameserver()
run_agent('Agent0')
run_agent('Agent1')
run_agent('Agent2')
# Show agents registered in the name server
for alias in ns.agents():
print(alias)
The code above should simply print the aliases of all the agents registered in the name server.
A name server proxy can also be used to create proxies to registered agents. This is specially useful when accessing the multi-agent system from a different console or location, as it will reduce the number of addresses that we need to remember.
from osbrain import run_nameserver
from osbrain import run_agent
if __name__ == '__main__':
# System deployment
ns = run_nameserver()
run_agent('Agent0')
run_agent('Agent1')
run_agent('Agent2')
# Create a proxy to Agent1 and log a message
agent = ns.proxy('Agent1')
agent.log_info('Hello world!')
The code above creates (and registers) three different agents in a name server and then creates, through the name server proxy, a proxy to one of those agents simply using its alias. Then it uses the agent proxy to remotelly call a method to log a Hello world! message.
Push-Pull¶
Now that we understand the basics of how proxies, agents and name servers work, let us jump into a more interesting example.
As mentioned before, a multi-agent system only makes sense if agents are connected with each other and share some information using message passing.
In this first example, we will create two agents: Alice and Bob, and we will make alice send messages to Bob using a simple push-pull communication pattern.
import time
from osbrain import run_agent
from osbrain import run_nameserver
def log_message(agent, message):
agent.log_info('Received: %s' % message)
if __name__ == '__main__':
# System deployment
run_nameserver()
alice = run_agent('Alice')
bob = run_agent('Bob')
# System configuration
addr = alice.bind('PUSH', alias='main')
bob.connect(addr, handler=log_message)
# Send messages
while True:
time.sleep(1)
alice.send('main', 'Hello, Bob!')
So, in this case, we are doing some more stuff. After we spawn Alice and Bob, we connect them.
First, we make Alice bind:
>>> addr = alice.bind('PUSH', alias='main')
There are three things to remark in that line:
- The first parameter
'PUSH'
represents the communication pattern we want to use. In this case we are using a simple push-pull (unidirectional) pattern to allow Alice to send messages to Bob. - The second parameter is, once again, an alias. We can use this alias to refer to this communication channel in an easier way.
- The binding, as you already guessed, takes place in the remote agent, but it actually returns a value, which is the address the agent binded to. This address is serialized back to us so we can use it to connect other agents to it.
The next interesting line of code is the one in which Bob connects to Alice:
>>> bob.connect(addr, handler=log_message)
There are two things to remark in here:
- Calling
connect()
from an agent requires, first, an address. This address is, in this case, the one we got after binding Alice. This method will automatically select the appropriate communication pattern to connect to this pattern ('PULL'
in this case). - Bob will be receiving messages from Alice, so we must set a handler function that will be executed when a message from Alice is received. This handler will be serialized and stored in the remote agent to be executed there when needed.
The handler function, in its most basic form, accepts two parameters:
def handler(agent, message):
...
- The actual agent (can be named
self
as well, in an OOP way). - The message that is received.
In the example above, the handler simply logs the message received.
Request-Reply¶
Another common communication patter is the request-reply, in which a requester sends a message to the replier and expects always a reply back. It is sometimes useful, specially when some kind of synchronization is required.
from osbrain import run_agent
from osbrain import run_nameserver
def reply(agent, message):
return 'Received ' + str(message)
if __name__ == '__main__':
run_nameserver()
alice = run_agent('Alice')
bob = run_agent('Bob')
addr = alice.bind('REP', alias='main', handler=reply)
bob.connect(addr, alias='main')
for i in range(10):
bob.send('main', i)
reply = bob.recv('main')
print(reply)
The main difference with respect to the push-pull pattern is that, in this
case, Bob must run the recv
method in order to get the reply back from
Alice.
Note
Although the requester is not required to immediately await for the reply (i.e.: can do other stuff after sending the request and before receiving the response), it is required to receive a reply back before making another request through the same communication channel. Multiple requests can be made from the same agent as long as it uses different communication channels for each request.
Publish-Subscribe¶
One of the most useful communication patterns between agents is the publish and subscribe pattern. The publisher will send messages to all subscribed agents.
Here is an example in which Alice is the publisher and Bob and Eve subscribe to Alice. This way, when Alice sends a message, both Bob and Eve will receive it:
import time
from osbrain import run_agent
from osbrain import run_nameserver
def log_message(agent, message):
agent.log_info('Received: %s' % message)
if __name__ == '__main__':
# System deployment
run_nameserver()
alice = run_agent('Alice')
bob = run_agent('Bob')
eve = run_agent('Eve')
# System configuration
addr = alice.bind('PUB', alias='main')
bob.connect(addr, handler=log_message)
eve.connect(addr, handler=log_message)
# Send messages
while True:
time.sleep(1)
alice.send('main', 'Hello, all!')
Note the similarities between this example and the Sender-Receiver example.
The only differences are that Alice is now binding using the 'PUB'
pattern and that, instead of having just Bob connecting to Alice, we now
have Eve as well connecting to Alice.
This communication pattern allows for easy filtering. Refer to the Filtering section in the tutorial for more details.
Tutorial - 2¶
Repeated actions¶
Timers can be used to repeat an action after a period of time. To illustrate
this, let us modify the Push-Pull example a bit and make use of the
each
method:
from osbrain import run_agent
from osbrain import run_nameserver
def log_message(agent, message):
agent.log_info('Received: %s' % message)
def annoy(agent, say, more=None):
message = say if not more else say + ' ' + more + '!'
agent.send('annoy', message)
if __name__ == '__main__':
run_nameserver()
orange = run_agent('Orange')
apple = run_agent('Apple')
addr = orange.bind('PUSH', alias='annoy')
apple.connect(addr, handler=log_message)
# Multiple timers with parameters
orange.each(1., annoy, 'Hey')
orange.each(1.4142, annoy, 'Apple')
orange.each(3.1415, annoy, 'Hey', more='Apple')
Note that if an action takes longer to run than the time available before the next execution, the timer will simply fall behind.
OOP¶
Although the approach of using proxies for the whole configuration process is valid, sometimes the developer may prefer to use OOP to define the behavior of an agent.
This, of course, can be done with osBrain:
import time
from osbrain import run_agent
from osbrain import run_nameserver
from osbrain import Agent
class Greeter(Agent):
def on_init(self):
self.bind('PUSH', alias='main')
def hello(self, name):
self.send('main', 'Hello, %s!' % name)
def log_message(agent, message):
agent.log_info('Received: %s' % message)
if __name__ == '__main__':
# System deployment
run_nameserver()
alice = run_agent('Alice', base=Greeter)
bob = run_agent('Bob')
# System configuration
bob.connect(alice.addr('main'), handler=log_message)
# Send messages
while True:
time.sleep(1)
alice.hello('Bob')
Most of the code is similar to the one presented in the Push-Pull example, however you may notice some differences:
- When runing Alice, a new parameter
base
is passed to theosbrain.core.run_agent()
function. This means that, instead of running the default agent class, the user-defined agent class will be used instead. In this case, this class is namedGreeter
. - The
Greeter
class implements two methods:on_init()
: which is executed on initialization and will, in this case, simply bind a'PUSH'
communication channel.hello()
: which simply logs a Hello message when it is executed.
- When connecting Bob to Alice, we need the address where Alice binded
to. As the binding was executed on initialization, we need to use the
addr()
method, which will return the address associated to the alias passed as parameter (in the example above it ismain
).
Filtering¶
The publish-subscribe pattern is very useful, but it is also very powerful when combined with filtering.
Any time we publish a message from an agent, a topic can be specified. If a topic is specified, then only the agents that are subscribed to that topic will receive the message. This filtering is done in the publisher side, meaning that the network does not suffer from excessive message passing.
In the following example we have Alice publishing messages using topic
a
or b
at random. Then we have Bob subscribed to both topics, Eve
subscribed to topic a
only and Dave subscribed to topic b
only.
import time
import random
from osbrain import run_agent
from osbrain import run_nameserver
def log_a(agent, message):
agent.log_info('Log a: %s' % message)
def log_b(agent, message):
agent.log_info('Log b: %s' % message)
if __name__ == '__main__':
# System deployment
run_nameserver()
alice = run_agent('Alice')
bob = run_agent('Bob')
eve = run_agent('Eve')
dave = run_agent('Dave')
# System configuration
addr = alice.bind('PUB', alias='main')
bob.connect(addr, handler={'a': log_a, 'b': log_b})
eve.connect(addr, handler={'a': log_a})
dave.connect(addr, handler={'b': log_b})
# Send messages
while True:
time.sleep(1)
topic = random.choice(['a', 'b'])
message = 'Hello, %s!' % topic
alice.send('main', message, topic=topic)
Note how we can specify different handlers for different topics when subscribing agents.
Adding new methods¶
Note that proxies can not only be used to execute methods remotely in the agent, but they can also be used to add new methods or change already existing methods in the remote agent.
In the following example you can see how we can create a couple of functions that are then added to the remote agent as new methods.
In order to add new methods (or change current methods) we only need to call
set_method()
from the proxy.
from osbrain import run_agent
from osbrain import run_nameserver
def set_x(self, value):
self.x = value
def set_y(self, value):
self.y = value
def add_xy(self):
return self.x + self.y
if __name__ == '__main__':
# System deployment
run_nameserver()
agent = run_agent('Example')
# System configuration
agent.set_method(set_x, set_y, add=add_xy)
# Trying the new methods
agent.set_x(1)
agent.set_y(2)
print(agent.add())
Note that set_method()
accepts any number of parameters:
- In case they are not named parameters, the function names will be used as the method names in the remote agent.
- In case they are named parameters, then the method in the remote agent will be named after the parameter name.
Lambdas¶
osBrain uses dill for serialization when communicating with remote agents through a proxy. This means that almost anything can be serialized to an agent using a proxy.
In order to further simplify some tasks, lambda functions can be used to configure remote agents:
from osbrain import run_agent
from osbrain import run_nameserver
if __name__ == '__main__':
run_nameserver()
alice = run_agent('Alice')
bob = run_agent('Bob')
addr = alice.bind('REP', handler=lambda agent, msg: 'Received ' + str(msg))
bob.connect(addr, alias='main')
for i in range(10):
bob.send('main', i)
reply = bob.recv('main')
print(reply)
See the similarities between this example and the one showed in Request-Reply. In fact, the only difference is the binding from Alice, in which we are using a lambda function for the handler.
Reply early¶
The easiest way to reply to a request is to return a value from the handler, as seen in Request-Reply:
def reply(agent, message):
return 'Received ' + str(message)
However, an agent can reply earlier if needed:
def reply(agent, message):
agent.send('main', 'Received' + str(message)) # Reply now
agent.log_info('Already sent a reply back!') # Do some stuff later
Note how, in this case, we need to manually send the reply using the corresponding socket, though.
Shutting down¶
Although not covered in the examples until now (because many times you just want the multi-agent system to run forever until, perhaps, an event occurs), it is possible to actively kill the system using proxies:
import time
from osbrain import run_agent
from osbrain import run_nameserver
def tick(agent):
agent.log_info('tick')
if __name__ == '__main__':
ns = run_nameserver()
a0 = run_agent('Agent0')
a1 = run_agent('Agent1')
a0.each(1, tick)
a1.each(1, tick)
time.sleep(3)
a0.shutdown()
time.sleep(3)
ns.shutdown()
Note
Shutting down the nameserver will result in all agents registered in the name server being shut down as well.
Security¶
Warning
osBrain should be considered unsafe when used with remote machines. This package has some security risks. Understanding the risks is very important to avoid creating systems that are very easy to compromise by malicious entities.
Serialization in osBrain¶
osBrain uses pickle
module for serialization when passing messages
between agents and can use pickle
and dill
for
serialization when configuring and deploying the multi-agent architectures.
It is well known that using pickle or dill for this purpose is a security risk.
The main problem is that allowing a program to unpickle or undill arbitrary
data can cause arbitrary code execution and this may wreck or compromise your
system.
Network interface binding¶
By default osBrain binds every server on localhost, to avoid exposing things on a public network or over the internet by mistake. If you want to expose your osBrain agents to anything other than localhost, you have to explicitly tell osBrain the network interface address it should use. This means it is a conscious effort to expose agents to remote machines.
Protocol encryption¶
osBrain doesn’t encrypt the data it sends over the network. This means you must not transfer sensitive data on untrusted networks (especially user data, passwords, and such) because it is possible to eavesdrop. Either encrypt the data yourself before passing, or run osBrain over a secure network (VPN or SSH tunnel).
osBrain library API¶
This chapter describes osBrain’s library API. All osBrain classes and
functions are defined in sub packages such as osbrain.core
, but for
ease of use, the most important ones are also placed in the osbrain
package scope.
osbrain
— Main API package¶
osbrain
is the main package of osBrain. It imports most of the other
packages that it needs and provides shortcuts to the most frequently used
objects and functions from those packages. This means you can mostly just
import osbrain
in your code to start using osBrain.
The classes and functions provided are:
symbol in osbrain |
referenced location |
---|---|
|
osbrain.core.Agent |
|
osbrain.core.run_agent() |
See also
- Module
osbrain.core
- The core osBrain classes and functions.
- Module
osbrain.nameserver
- The osBrain name server logic.
osbrain.core
— osBrain core logic¶
Core agent classes.
-
class
osbrain.core.
Agent
(name=None, host=None)¶ Bases:
object
A base agent class which is to be served by an AgentProcess.
An AgentProcess runs a Pyro multiplexed server and serves one Agent object.
Parameters: - name : str, default is None
Name of the Agent.
- host : str, default is None
Host address where the agent will bind to. When not set, ‘127.0.0.1’ (localhost) is used.
Attributes: - name : str
Name of the agent.
- host : str
Host address where the agent is binding to.
- socket : dict
A dictionary in which the key is the address or the alias and the value is the actual socket.
- adddress : dict
A dictionary in which the key is the alias and the value is the actual address.
- handler : dict
A dictionary in which the key is the socket and the values are the handlers for each socket.
- poll_timeout : int
Polling timeout, in milliseconds. After this timeout, if no message is received, the agent executes de iddle() method before going back to polling.
- keep_alive : bool
When set to True, the agent will continue executing the main loop.
- running : bool
Set to True if the agent is running (executing the main loop).
Methods
addr
(alias)Parameters: bind
(kind[, alias, handler, host, port])Bind to an agent address. connect
(server_address[, alias, handler])Connect to a server agent address. each
(period, method, *args, **kwargs)Execute a repeated action with a defined period. handle_loopback
(message)Handle incoming messages in the loopback socket. iddle
()This function is to be executed when the agent is iddle. iterate
()Agent’s main iteration. log_debug
(message[, logger])Log a debug message. log_error
(message[, logger])Log an error message. log_info
(message[, logger])Log an info message. log_warning
(message[, logger])Log a warning message. loop
()Agent’s main loop. loopback
(header[, data])Send a message to the loopback socket. on_init
()This user-defined method is to be executed after initialization. ping
()A simple ping method testing purposes. raise_exception
()Raise an exception (for testing purposes). run
()Run the agent. safe
(method, *args, **kwargs)A safe call to a method. safe_ping
()A simple loopback ping for testing purposes. set_attr
(**kwargs)Set object attributes. set_logger
(logger[, alias])Connect the agent to a logger and start logging messages to it. set_loop
(loop)set_method
(*args, **kwargs)Set object methods. stop
()Stop the agent. subscribe
(alias, handlers)Subscribe the agent to another agent. test
()A test method to check the readiness of the agent. close_sockets execute get_attr kill recv register registered self_execute send send_recv set_handler shutdown str2bytes -
addr
(alias)¶ Parameters: - alias : str
Alias of the socket whose address is to be retreived.
Returns: - AgentAddress
Address of the agent socket associated with the alias.
-
bind
(kind, alias=None, handler=None, host=None, port=None)¶ Bind to an agent address.
Parameters: - kind : str, AgentAddressKind
The agent address kind: PUB, REQ…
- alias : str, default is None
Optional alias for the socket.
- handler, default is None
If the socket receives input messages, the handler/s is/are to be set with this parameter.
- host : str, default is None
The host to bind to, when not given self.host is taken as default.
- port : int, default is None
An optional port number. If not set, a random port is used for binding.
-
close_sockets
()¶
-
connect
(server_address, alias=None, handler=None)¶ Connect to a server agent address.
Parameters: - server_address : AgentAddress
Agent address to connect to.
- alias : str, default is None
Optional alias for the new address.
- handler, default is None
If the new socket receives input messages, the handler/s is/are to be set with this parameter.
-
each
(period, method, *args, **kwargs)¶ Execute a repeated action with a defined period.
Parameters: - period : float
Repeat the action execution with a delay of period seconds between executions.
- method
Method (action) to be executed by the agent.
- *args : tuple
Parameters to pass for the method execution.
- **kwargs : dict
Named parameters to pass for the method execution.
-
execute
(function, *args, **kwargs)¶
-
get_attr
(name)¶
-
handle_loopback
(message)¶ Handle incoming messages in the loopback socket.
-
iddle
()¶ This function is to be executed when the agent is iddle.
After a timeout occurs when the agent’s poller receives no data in any of its sockets, the agent may execute this function.
-
iterate
()¶ Agent’s main iteration.
This iteration is normally executed inside the main loop.
The agent is polling all its sockets for input data. It will wait for poll_timeout; after this period, the method iddle will be executed before polling again.
Returns: - int
1 if an error occurred during the iteration (we would expect this to happen if an interruption occurs during polling).
0 otherwise.
-
kill
()¶
-
log_debug
(message, logger='_logger')¶ Log a debug message.
Parameters: - message : str
Message to log.
- logger : str
Alias of the logger.
-
log_error
(message, logger='_logger')¶ Log an error message.
Parameters: - message : str
Message to log.
- logger : str
Alias of the logger.
-
log_info
(message, logger='_logger')¶ Log an info message.
Parameters: - message : str
Message to log.
- logger : str
Alias of the logger.
-
log_warning
(message, logger='_logger')¶ Log a warning message.
Parameters: - message : str
Message to log.
- logger : str
Alias of the logger.
-
loop
()¶ Agent’s main loop.
This loop is executed until the keep_alive attribute is False or until an error occurs.
-
loopback
(header, data=None)¶ Send a message to the loopback socket.
-
on_init
()¶ This user-defined method is to be executed after initialization.
-
ping
()¶ A simple ping method testing purposes.
-
raise_exception
()¶ Raise an exception (for testing purposes).
-
recv
(address)¶
-
register
(socket, address, alias=None, handler=None)¶
-
registered
(address)¶
-
run
()¶ Run the agent.
-
safe
(method, *args, **kwargs)¶ A safe call to a method.
A safe call is simply sent to be executed by the main thread.
Parameters: - method : str
Method name to be executed by the main thread.
- *args : arguments
Method arguments.
- *kwargs : keyword arguments
Method keyword arguments.
-
safe_ping
()¶ A simple loopback ping for testing purposes.
-
self_execute
(function, *args, **kwargs)¶
-
send
(address, message, topic='')¶
-
send_recv
(address, message)¶
-
set_attr
(**kwargs)¶ Set object attributes.
Parameters: - kwargs : [name, value]
Keyword arguments will be used to set the object attributes.
-
set_handler
(socket, handler)¶
-
set_logger
(logger, alias='_logger')¶ Connect the agent to a logger and start logging messages to it.
-
set_loop
(loop)¶
-
set_method
(*args, **kwargs)¶ Set object methods.
Parameters: - args : [function]
New methods will be created for each function, taking the same name as the original function.
- kwargs : [name, function]
New methods will be created for each function, taking the name specified by the parameter.
Returns: - str
Name of the registered method in the agent.
-
shutdown
()¶
-
stop
()¶ Stop the agent. Agent will stop running.
-
str2bytes
(message)¶
-
subscribe
(alias, handlers)¶ Subscribe the agent to another agent.
Parameters: - alias : str
Alias of the new subscriber socket.
- handlers : dict
A dictionary in which the keys represent the different topics and the values the actual handlers. If ,instead of a dictionary, a single handler is given, it will be used to subscribe the agent to any topic.
-
test
()¶ A test method to check the readiness of the agent. Used for testing purposes, where timing is very important. Do not remove.
-
class
osbrain.core.
AgentProcess
(name, nsaddr=None, addr=None, base=<class 'osbrain.core.Agent'>)¶ Bases:
multiprocessing.context.Process
Agent class. Instances of an Agent are system processes which can be run independently.
Attributes: - authkey
daemon
Return whether process is a daemon
exitcode
Return exit code of process or None if it has yet to stop
ident
Return identifier (PID) of process or None if it has yet to start
- name
pid
Return identifier (PID) of process or None if it has yet to start
sentinel
Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
Methods
is_alive
()Return whether process is alive join
([timeout])Wait until child process terminates run
()Method to be run in sub-process; can be overridden in sub-class sigint_handler
(signal, frame)Handle interruption signals. start
()Start child process terminate
()Terminate process; sends SIGTERM signal or uses TerminateProcess() kill -
kill
()¶
-
run
()¶ Method to be run in sub-process; can be overridden in sub-class
-
sigint_handler
(signal, frame)¶ Handle interruption signals.
-
start
()¶ Start child process
-
osbrain.core.
run_agent
(name, nsaddr=None, addr=None, base=<class 'osbrain.core.Agent'>)¶ Ease the agent creation process.
This function will create a new agent, start the process and then run its main loop through a proxy.
Parameters: - name : str
Agent name or alias.
- nsaddr : SocketAddress, default is None
Name server address.
- addr : SocketAddress, default is None
New agent address, if it is to be fixed.
Returns: - proxy
A proxy to the new agent.
osbrain.address
— osBrain address logic¶
Implementation of address-related features.
-
class
osbrain.address.
AgentAddress
(host, port, kind=None, role=None)¶ Bases:
osbrain.address.SocketAddress
Agent address information consisting on the host, port, kind and role.
Parameters: - host : str
Agent host.
- port : int
Agent port.
- kind : int, str, AgentAddressKind
Agent kind.
- role : str, AgentAddressRole
Agent role.
Attributes: - host : str
Agent host.
- port : int
Agent port.
- kind : AgentAddressKind
Agent kind.
- role : AgentAddressRole
Agent role.
Methods
socket_addr
()Returns: twin
()Returns: -
socket_addr
()¶ Returns: - SocketAddress
Agent address as a SocketAddress object. This means kind and role information are lost.
-
twin
()¶ Returns: - AgentAddress
The twin address of the current one; while the host and port are kept for the twin, the kind and role change to their corresponding twins, according to the rules defined in the respective classes.
-
class
osbrain.address.
AgentAddressKind
¶ Bases:
int
Agent’s address kind class. It can be any ZMQ type (‘REP’, ‘PUB’…).
Inherits from int to be compatible with ZMQ definitions, however, it is represented in its string form. The equivalence can also be evaluated against its string form.
Attributes: denominator
the denominator of a rational number in lowest terms
imag
the imaginary part of a complex number
numerator
the numerator of a rational number in lowest terms
real
the real part of a complex number
Methods
bit_length
()Number of bits necessary to represent self in binary. conjugate
Returns self, the complex conjugate of any int. from_bytes
(bytes, byteorder, *[, signed])Return the integer represented by the given array of bytes. requires_handler
()Returns: to_bytes
(length, byteorder, *[, signed])Return an array of bytes representing an integer. twin
()Returns: -
ZMQ_KIND_TWIN
= {1: 2, 2: 1, 3: 4, 4: 3, 7: 8, 8: 7}¶
-
ZMQ_STR_CONVERSION
= {1: 'PUB', 2: 'SUB', 3: 'REQ', 4: 'REP', 'REP': 4, 7: 'PULL', 8: 'PUSH', 'SUB': 2, 'REQ': 3, 'PUB': 1, 'PUSH': 8, 'PULL': 7}¶
-
key
= 'PULL'¶
-
keys
= ['SUB', 'REP', 'PUB', 'REQ', 'PUSH', 'PULL']¶
-
requires_handler
()¶ Returns: - bool
Whether the Agent’s address kind requires a handler or not. A socket which processes incoming messages would require a handler (i.e. ‘REP’, ‘PULL’, ‘SUB’…).
-
twin
()¶ Returns: - AgentAddressKind
The twin kind of the current one; REQ would be the twin of REP and viceversa, PUB would be the twin of SUB and viceversa, etc.
-
class
osbrain.address.
AgentAddressRole
¶ Bases:
str
Agent’s address role class. It can either be ‘server’ or ‘client’.
Methods
capitalize
()Return a capitalized version of S, i.e. casefold
()Return a version of S suitable for caseless comparisons. center
(width[, fillchar])Return S centered in a string of length width. count
(sub[, start[, end]])Return the number of non-overlapping occurrences of substring sub in string S[start:end]. encode
([encoding, errors])Encode S using the codec registered for encoding. endswith
(suffix[, start[, end]])Return True if S ends with the specified suffix, False otherwise. expandtabs
([tabsize])Return a copy of S where all tab characters are expanded using spaces. find
(sub[, start[, end]])Return the lowest index in S where substring sub is found, such that sub is contained within S[start:end]. format
(*args, **kwargs)Return a formatted version of S, using substitutions from args and kwargs. format_map
(mapping)Return a formatted version of S, using substitutions from mapping. index
(sub[, start[, end]])Like S.find() but raise ValueError when the substring is not found. isalnum
()Return True if all characters in S are alphanumeric and there is at least one character in S, False otherwise. isalpha
()Return True if all characters in S are alphabetic and there is at least one character in S, False otherwise. isdecimal
()Return True if there are only decimal characters in S, False otherwise. isdigit
()Return True if all characters in S are digits and there is at least one character in S, False otherwise. isidentifier
()Return True if S is a valid identifier according to the language definition. islower
()Return True if all cased characters in S are lowercase and there is at least one cased character in S, False otherwise. isnumeric
()Return True if there are only numeric characters in S, False otherwise. isprintable
()Return True if all characters in S are considered printable in repr() or S is empty, False otherwise. isspace
()Return True if all characters in S are whitespace and there is at least one character in S, False otherwise. istitle
()Return True if S is a titlecased string and there is at least one character in S, i.e. isupper
()Return True if all cased characters in S are uppercase and there is at least one cased character in S, False otherwise. join
(iterable)Return a string which is the concatenation of the strings in the iterable. ljust
(width[, fillchar])Return S left-justified in a Unicode string of length width. lower
()Return a copy of the string S converted to lowercase. lstrip
([chars])Return a copy of the string S with leading whitespace removed. maketrans
(x[, y, z])Return a translation table usable for str.translate(). partition
(sep)Search for the separator sep in S, and return the part before it, the separator itself, and the part after it. replace
(old, new[, count])Return a copy of S with all occurrences of substring old replaced by new. rfind
(sub[, start[, end]])Return the highest index in S where substring sub is found, such that sub is contained within S[start:end]. rindex
(sub[, start[, end]])Like S.rfind() but raise ValueError when the substring is not found. rjust
(width[, fillchar])Return S right-justified in a string of length width. rpartition
(sep)Search for the separator sep in S, starting at the end of S, and return the part before it, the separator itself, and the part after it. rsplit
([sep, maxsplit])Return a list of the words in S, using sep as the delimiter string, starting at the end of the string and working to the front. rstrip
([chars])Return a copy of the string S with trailing whitespace removed. split
([sep, maxsplit])Return a list of the words in S, using sep as the delimiter string. splitlines
([keepends])Return a list of the lines in S, breaking at line boundaries. startswith
(prefix[, start[, end]])Return True if S starts with the specified prefix, False otherwise. strip
([chars])Return a copy of the string S with leading and trailing whitespace removed. swapcase
()Return a copy of S with uppercase characters converted to lowercase and vice versa. title
()Return a titlecased version of S, i.e. translate
(table)Return a copy of the string S in which each character has been mapped through the given translation table. twin
()Returns: upper
()Return a copy of S converted to uppercase. zfill
(width)Pad a numeric string S with zeros on the left, to fill a field of the specified width. -
twin
()¶ Returns: - AgentAddressRole
The twin role of the current one; server would be the twin of client and viceversa.
-
-
class
osbrain.address.
SocketAddress
(host, port)¶ Bases:
object
Socket address information consisting on the host and port.
Parameters: - host : str
Agent host.
- port : int
Agent port.
Attributes: - host : str
Agent host.
- port : int
Agent port.
osbrain.common
— osBrain common logic¶
Miscellaneous utilities.
-
class
osbrain.common.
LogLevel
¶ Bases:
str
Identifies the log level: ERROR, WARNING, INFO, DEBUG.
Methods
capitalize
()Return a capitalized version of S, i.e. casefold
()Return a version of S suitable for caseless comparisons. center
(width[, fillchar])Return S centered in a string of length width. count
(sub[, start[, end]])Return the number of non-overlapping occurrences of substring sub in string S[start:end]. encode
([encoding, errors])Encode S using the codec registered for encoding. endswith
(suffix[, start[, end]])Return True if S ends with the specified suffix, False otherwise. expandtabs
([tabsize])Return a copy of S where all tab characters are expanded using spaces. find
(sub[, start[, end]])Return the lowest index in S where substring sub is found, such that sub is contained within S[start:end]. format
(*args, **kwargs)Return a formatted version of S, using substitutions from args and kwargs. format_map
(mapping)Return a formatted version of S, using substitutions from mapping. index
(sub[, start[, end]])Like S.find() but raise ValueError when the substring is not found. isalnum
()Return True if all characters in S are alphanumeric and there is at least one character in S, False otherwise. isalpha
()Return True if all characters in S are alphabetic and there is at least one character in S, False otherwise. isdecimal
()Return True if there are only decimal characters in S, False otherwise. isdigit
()Return True if all characters in S are digits and there is at least one character in S, False otherwise. isidentifier
()Return True if S is a valid identifier according to the language definition. islower
()Return True if all cased characters in S are lowercase and there is at least one cased character in S, False otherwise. isnumeric
()Return True if there are only numeric characters in S, False otherwise. isprintable
()Return True if all characters in S are considered printable in repr() or S is empty, False otherwise. isspace
()Return True if all characters in S are whitespace and there is at least one character in S, False otherwise. istitle
()Return True if S is a titlecased string and there is at least one character in S, i.e. isupper
()Return True if all cased characters in S are uppercase and there is at least one cased character in S, False otherwise. join
(iterable)Return a string which is the concatenation of the strings in the iterable. ljust
(width[, fillchar])Return S left-justified in a Unicode string of length width. lower
()Return a copy of the string S converted to lowercase. lstrip
([chars])Return a copy of the string S with leading whitespace removed. maketrans
(x[, y, z])Return a translation table usable for str.translate(). partition
(sep)Search for the separator sep in S, and return the part before it, the separator itself, and the part after it. replace
(old, new[, count])Return a copy of S with all occurrences of substring old replaced by new. rfind
(sub[, start[, end]])Return the highest index in S where substring sub is found, such that sub is contained within S[start:end]. rindex
(sub[, start[, end]])Like S.rfind() but raise ValueError when the substring is not found. rjust
(width[, fillchar])Return S right-justified in a string of length width. rpartition
(sep)Search for the separator sep in S, starting at the end of S, and return the part before it, the separator itself, and the part after it. rsplit
([sep, maxsplit])Return a list of the words in S, using sep as the delimiter string, starting at the end of the string and working to the front. rstrip
([chars])Return a copy of the string S with trailing whitespace removed. split
([sep, maxsplit])Return a list of the words in S, using sep as the delimiter string. splitlines
([keepends])Return a list of the lines in S, breaking at line boundaries. startswith
(prefix[, start[, end]])Return True if S starts with the specified prefix, False otherwise. strip
([chars])Return a copy of the string S with leading and trailing whitespace removed. swapcase
()Return a copy of S with uppercase characters converted to lowercase and vice versa. title
()Return a titlecased version of S, i.e. translate
(table)Return a copy of the string S in which each character has been mapped through the given translation table. upper
()Return a copy of S converted to uppercase. zfill
(width)Pad a numeric string S with zeros on the left, to fill a field of the specified width.
-
osbrain.common.
address_to_host_port
(addr=None)¶ Try to convert a string or SocketAddress to a (host, port) tuple.
Parameters: - addr : str, SocketAddress
Returns: - tuple
A (host, port) tuple formed with the corresponding data.
-
osbrain.common.
periodic
(scheduler, interval, action, args=())¶ Run a scheduler periodically.
This function will run forever and blocking.
Parameters: - scheduler : sched.scheduler
Scheduler to run.
- interval : numeric
Delay to apply to the scheduler.
- action
Action to execute by the scheduler.
- args, default is ()
Arguments for the action.
-
osbrain.common.
repeat
(interval, action, args=())¶ Repeat an action forever after a given number of seconds.
If a sequence of events takes longer to run than the time available before the next event, the repeater will simply fall behind.
This function is executed in a separate thread.
Parameters: - interval : float
Number of seconds between executions.
- action
To be taken after the interval.
- args : tuple, default is ()
Arguments for the action.
Returns: - threading.Thread
Thread running the repeat task.
-
osbrain.common.
unbound_method
(method)¶ Returns: - function
Unbounded function.
osbrain.logging
— osBrain logging logic¶
Implementation of logging-related features.
-
class
osbrain.logging.
Logger
(name=None, host=None)¶ Bases:
osbrain.core.Agent
Specialized Agent for logging. Binds a SUB socket and starts logging incoming messages.
Methods
addr
(alias)Parameters: bind
(kind[, alias, handler, host, port])Bind to an agent address. connect
(server_address[, alias, handler])Connect to a server agent address. each
(period, method, *args, **kwargs)Execute a repeated action with a defined period. handle_loopback
(message)Handle incoming messages in the loopback socket. iddle
()This function is to be executed when the agent is iddle. iterate
()Agent’s main iteration. log_debug
(message[, logger])Log a debug message. log_error
(message[, logger])Log an error message. log_handler
(message, topic)Handle incoming log messages. log_info
(message[, logger])Log an info message. log_warning
(message[, logger])Log a warning message. loop
()Agent’s main loop. loopback
(header[, data])Send a message to the loopback socket. on_init
()This user-defined method is to be executed after initialization. ping
()A simple ping method testing purposes. raise_exception
()Raise an exception (for testing purposes). run
()Run the agent. safe
(method, *args, **kwargs)A safe call to a method. safe_ping
()A simple loopback ping for testing purposes. set_attr
(**kwargs)Set object attributes. set_logger
(logger[, alias])Connect the agent to a logger and start logging messages to it. set_loop
(loop)set_method
(*args, **kwargs)Set object methods. stop
()Stop the agent. subscribe
(alias, handlers)Subscribe the agent to another agent. test
()A test method to check the readiness of the agent. close_sockets execute get_attr kill recv register registered self_execute send send_recv set_handler shutdown str2bytes -
log_handler
(message, topic)¶ Handle incoming log messages.
-
on_init
()¶ This user-defined method is to be executed after initialization.
-
-
osbrain.logging.
pyro_log
()¶ Set environment variables to activate Pyro logging. The log level is set to “DEBUG”.
-
osbrain.logging.
run_logger
(name, nsaddr=None, addr=None, base=<class 'osbrain.logging.Logger'>)¶ Ease the logger creation process.
This function will create a new logger, start the process and then run its main loop through a proxy.
Parameters: - name : str
Logger name or alias.
- nsaddr : SocketAddress, default is None
Name server address.
- addr : SocketAddress, default is None
New logger address, if it is to be fixed.
Returns: - proxy
A proxy to the new logger.
osbrain.proxy
— osBrain proxy logic¶
Implementation of proxy-related features.
-
class
osbrain.proxy.
NSProxy
(nsaddr=None, timeout=3)¶ Bases:
Pyro4.core.Proxy
A proxy to access a name server.
Parameters: - nsaddr : SocketAddress, str
Name server address.
- timeout : float
Timeout, in seconds, to wait until the name server is discovered.
Methods
addr
([agent_alias, address_alias])Return the name server address or the address of an agent’s socket. proxy
(name[, timeout])Get a proxy to access an agent registered in the name server. release
()Release the connection to the Pyro daemon. shutdown
()Shutdown the name server. shutdown_agents
([timeout])Shutdown all agents registered in the name server. -
addr
(agent_alias=None, address_alias=None)¶ Return the name server address or the address of an agent’s socket.
Parameters: - agent_alias : str, default is None
The alias of the agent to retrieve its socket address.
- address_alias : str, default is None
The alias of the socket address to retrieve from the agent.
Returns: - SocketAddress or AgentAddress
The name server or agent’s socket address.
-
proxy
(name, timeout=3.0)¶ Get a proxy to access an agent registered in the name server.
Parameters: - name : str
Proxy name, as registered in the name server.
- timeout : float
Timeout, in seconds, to wait until the agent is discovered.
Returns: - Proxy
A proxy to access an agent registered in the name server.
-
release
()¶ Release the connection to the Pyro daemon.
-
shutdown
()¶ Shutdown the name server. All agents will be shutdown as well.
-
shutdown_agents
(timeout=3.0)¶ Shutdown all agents registered in the name server.
Parameters: - timeout : float, default is 3.
Timeout, in seconds, to wait for the agents to shutdown.
-
class
osbrain.proxy.
Proxy
(name, nsaddr=None, timeout=3.0)¶ Bases:
Pyro4.core.Proxy
A proxy to access remote agents.
Parameters: - name : str
Proxy name, as registered in the name server.
- nsaddr : SocketAddress, str
Name server address.
- timeout : float
Timeout, in seconds, to wait until the agent is discovered.
Methods
pyro_addr
()Returns: release
()Release the connection to the Pyro daemon. -
pyro_addr
()¶ Returns: - SocketAddress
The socket address of the Pyro server.
-
release
()¶ Release the connection to the Pyro daemon.
-
osbrain.proxy.
locate_ns
(nsaddr, timeout=3)¶ Locate a name server (ping) to ensure it actually exists.
Parameters: - nsaddr : SocketAddress
The address where the name server should be up and running.
- timeout : float
Timeout in seconds before aborting location.
Returns: - nsaddr
The address where the name server was located.
Raises: - NamingError
If the name server could not be located.
osbrain.nameserver
— osBrain nameserver logic¶
Implementation of name server.
-
class
osbrain.nameserver.
NameServer
(*args, **kwargs)¶ Bases:
Pyro4.naming.NameServer
Methods
agents
()List agents registered in the name server. async_shutdown
()Shutdown the name server. async_shutdown_agents
()Shutdown all agents registered in the name server. list
([prefix, regex, metadata_all, …])Retrieve the registered items as a dictionary name-to-URI. lookup
(name[, return_metadata])Lookup the given name, returns an URI if found. ping
()A simple test method to check if the name server is running correctly. register
(name, uri[, safe, metadata])Register a name with an URI. remove
([name, prefix, regex])Remove a registration. set_metadata
(name, metadata)update the metadata for an existing registration count -
agents
()¶ List agents registered in the name server.
-
async_shutdown
()¶ Shutdown the name server. All agents will be shutdown as well.
-
async_shutdown_agents
()¶ Shutdown all agents registered in the name server.
-
ping
()¶ A simple test method to check if the name server is running correctly.
-
-
class
osbrain.nameserver.
NameServerProcess
(addr=None)¶ Bases:
multiprocessing.context.Process
Name server class. Instances of a name server are system processes which can be run independently.
Attributes: - authkey
daemon
Return whether process is a daemon
exitcode
Return exit code of process or None if it has yet to stop
ident
Return identifier (PID) of process or None if it has yet to start
- name
pid
Return identifier (PID) of process or None if it has yet to start
sentinel
Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
Methods
agents
()List agents registered in the name server. is_alive
()Return whether process is alive join
([timeout])Wait until child process terminates run
()Method to be run in sub-process; can be overridden in sub-class shutdown
()Shutdown the name server. shutdown_all
()Shutdown all agents registered in the name server. sigint_handler
(signal, frame)Handle interruption signals. start
()Start child process terminate
()Terminate process; sends SIGTERM signal or uses TerminateProcess() -
agents
()¶ List agents registered in the name server.
-
run
()¶ Method to be run in sub-process; can be overridden in sub-class
-
shutdown
()¶ Shutdown the name server. All agents will be shutdown as well.
-
shutdown_all
()¶ Shutdown all agents registered in the name server.
-
sigint_handler
(signal, frame)¶ Handle interruption signals.
-
start
()¶ Start child process
-
osbrain.nameserver.
random_nameserver
()¶ Start a random name server.
Returns: - SocketAddress
The name server address.
-
osbrain.nameserver.
run_nameserver
(addr=None)¶ Ease the name server creation process.
This function will create a new nameserver, start the process and then run its main loop through a proxy.
Parameters: - addr : SocketAddress, default is None
Name server address.
Returns: - proxy
A proxy to the name server.
Software License and Disclaimer¶
Copyright 2016 Open Sistemas de Información Internet S.L.Licensed under the Apache License, Version 2.0 (the “License”);you may not use this file except in compliance with the License.You may obtain a copy of the License atUnless required by applicable law or agreed to in writing, softwaredistributed under the License is distributed on an “AS IS” BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.See the License for the specific language governing permissions andlimitations under the License.