osBrain - 0.3.0

What is osBrain?

osBrain is a general-purpose multi-agent system module written in Python. Agents run independently as system processes and communicate with each other using message passing.

osBrain uses ØMQ for efficient and flexible message passing between agents. It also uses Pyro4 to ease the configuration and deployment of complex systems.

Please read the Software License and Disclaimer.

Introduction and Example

About osBrain: feature overview

osBrain is a general-purpose multi-agent system module written in Python.

  • Agents run independently as system processes and communicate with each other using message passing.
  • Message passing is implemented using ØMQ, and in particular, the PyZMQ Python bindings.
  • ØMQ allows for efficient, asynchronous communication using different commonly used communication patterns such as request-reply, push-pull and publish-subscribe.
  • osBrain integrates Pyro4 to ease the configuration and deployment of complex systems.
  • Thanks to Pyro4, remote agents can be treated as local objects and reconfigured even when they are running. Not just variables, but also new methods can be created in the remote agents.
  • osBrain provides the base for implementing robust, highly-available, flexible multi-agent systems.
  • Being implemented in Python, osBrain can take advantage of a huge set of packages for data analysis, statistics, numerical computing, etc. available in the Python ecosystem.

In order to fully understand osBrain capabilities, it is highly recommended to read the Pyro4 documentation and the ØMQ guide.

osBrain’s history

osBrain was initially developed at OpenSistemas based on the need to create a real-time automated-trading platform. This platform needed to process real-time market data updates quickly and in parallel. Robustness was very important as well, in order to prevent running trading strategies from being affected by a failure in another strategy.

Python was chosen for being a great language for fast prototyping and for having a huge data analysis ecosystem available. It was kept for its final performance and the beautiful source code created with it.

The appearance of osBrain was a consequence of a series of steps that were taken during the development process:

  1. Isolation of agents; creating separate system processes to avoid shared memory and any problems derived from multi-threading development.
  2. Implementation of message passing; making use of the modern, efficient and flexible ØMQ library.
  3. Ease of configuration/deployment; making use of the very convenient, well implemented and documented Pyro4 package.
  4. Separation from the trading platform; what started as a basic architecture to implement a real-time automated-trading platform ended up being a general-purpose multi-agent system architecture.

What can you use osBrain for?

osBrain has been successfully used to develop a real-time automated-trading platform at OpenSistemas, but being a general-purpose multi-agent system, it is not limited to this application. Other applications include:

  • Transportation.
  • Logistics.
  • Defense and military applications.
  • Networking.
  • Load balancing.
  • Self-healing networks.

In general, osBrain can be used whenever a multi-agent system architecture fits the application well:

  • Autonomy of the agents.
  • Local views.
  • Decentralization.

Simple Example

In the following example we run a very simple architecture in three steps:

  1. Run a name server, which will register running agents by their alias.
  2. Run an agent with an alias Example.
  3. Log a Hello world message from the agent.

from osbrain import run_nameserver
from osbrain import run_agent


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    agent = run_agent('Example')

    # Log a message
    agent.log_info('Hello world!')

It is important to note that the agent variable in that example is just a proxy to the remote agent. When log_info() is called, the call is serialized and sent to the remote agent, where it is actually executed. Remember that the agent is running as a separate system process!

Performance

The performance of osBrain, just as the performance of any other system architecture, depends a lot on the actual application. The developer should always take this into account:

  1. Pyro4 is used only for configuration and deployment, which means that the actual performance of the system does not depend on this package.
  2. ØMQ is used with the PyZMQ Python bindings, which means that the system performance depends on the PyZMQ performance.
  3. osBrain uses pickle for serialization, which means that the system performance depends on this package.
  4. ØMQ transport is TCP by default. The network may have a great impact on performance.
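The cost mentioned in point 3 can be explored with the standard library alone. The following is a minimal sketch, not part of osBrain: the message dictionary is a made-up payload and the pickle protocol chosen here is an assumption. It reports how many bytes a message occupies once serialized and checks that the round trip preserves it.

```python
import pickle

# Hypothetical payload, used only for illustration.
message = {
    'symbol': 'EURUSD',
    'bid': 1.0842,
    'ask': 1.0844,
    'ticks': list(range(100)),
}

# Serialize as a sender would, then deserialize as a receiver would.
payload = pickle.dumps(message, protocol=pickle.HIGHEST_PROTOCOL)
restored = pickle.loads(payload)

print('serialized size: %d bytes' % len(payload))
print('round trip ok: %s' % (restored == message))
```

Running the same measurement on messages that are representative of your application gives a first idea of the serialization overhead per message.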

Tutorial - 1

Installation

This tutorial is a step-by-step introduction to osBrain with examples. In order to start playing with this module, you only need to install it.

osBrain requires Python 3. Most probably, Python 3 is already packaged for your favorite distribution (and maybe even installed by default in your system). If you do not have Python 3 available, consider using Conda to create a virtual environment with Python 3.

Installing osBrain is very simple with pip:

pip install osbrain

You should now be able to import osbrain from a Python console:

>>> import osbrain

Hello world

The first example is, of course, a simple hello world! program. Three steps are taken here:

  1. Run a name server.
  2. Run an agent with an alias Example.
  3. Log a Hello world message from the agent.

from osbrain import run_nameserver
from osbrain import run_agent


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    agent = run_agent('Example')

    # Log a message
    agent.log_info('Hello world!')

Running this example from your terminal should simply show you a log message saying Hello world! But what exactly is happening there?

Agents and proxies

An agent, in osBrain, is an entity that runs independently from other agents in the system. An agent, by default, will simply poll for incoming messages before executing the code defined by the developer. This means a single agent, as in the Hello world! example, makes little or no sense. Agents in a multi-agent system start to make sense when connected to each other.

The easiest way to run an agent in an osBrain architecture is by calling the function osbrain.core.run_agent():

>>> agent = run_agent(...)

This function will spawn a new agent and will return an osbrain.core.Proxy to it.

Proxies are simply local objects that allow us to easily access the remote agent. The fact that agents are run independently from each other justifies the need for a proxy.

A proxy allows us to call methods or access attributes of the remote agent in a very convenient way. See for example the previous call:

>>> agent.log_info('Hello world!')

The method log_info() is implemented in osbrain.core.Agent so, when this method is called from the proxy, the call is serialized, sent to the remote running agent and executed there. The return value, if any, is then serialized back and returned by the proxy. So basically you get the impression of working with a local object while your code is executed remotely.
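To make the idea of a proxy more concrete, here is a minimal sketch that uses only the standard library. It is not how osBrain or Pyro4 implement proxies: ToyAgent and ToyProxy are hypothetical names, and the "remote" object lives in the same process. It only shows the principle of serializing a call, executing it on the other side and serializing the result back.

```python
import pickle


class ToyAgent:
    """Stands in for the remote agent (hypothetical, for illustration)."""

    def __init__(self):
        self.log = []

    def log_info(self, message):
        self.log.append(message)
        return len(self.log)


class ToyProxy:
    """Forwards method calls to a target as serialized requests."""

    def __init__(self, target):
        self._target = target

    def __getattr__(self, name):
        def call(*args, **kwargs):
            # Serialize the call, as if it were sent over the network.
            request = pickle.dumps((name, args, kwargs))
            # "Remote" side: deserialize, execute, serialize the result.
            method, r_args, r_kwargs = pickle.loads(request)
            result = getattr(self._target, method)(*r_args, **r_kwargs)
            return pickle.loads(pickle.dumps(result))
        return call


remote = ToyAgent()
agent = ToyProxy(remote)
count = agent.log_info('Hello world!')
print(count, remote.log)
```

The caller only ever touches the proxy, yet the message ends up in the log of the object behind it.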

The name server

A name server is just like another agent, so it runs independently, but with a very specific role. Name servers are used as an address book. This means other agents can be run in the system and can be registered in the name server using a human-readable alias. Aliases help us access these agents easily, even from remote locations.

Note that when calling the osbrain.core.run_agent() function, we are passing a string parameter. This parameter is the alias the agent will use to register itself in the name server.

When we run a name server by calling osbrain.core.run_nameserver(), we also get in return a proxy to this name server:

>>> ns = run_nameserver()

This proxy can be used to list the agents registered in the name server:

from osbrain import run_nameserver
from osbrain import run_agent


if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    run_agent('Agent0')
    run_agent('Agent1')
    run_agent('Agent2')

    # Show agents registered in the name server
    for alias in ns.agents():
        print(alias)

The code above should simply print the aliases of all the agents registered in the name server.

A name server proxy can also be used to create proxies to registered agents. This is especially useful when accessing the multi-agent system from a different console or location, as it will reduce the number of addresses that we need to remember.

from osbrain import run_nameserver
from osbrain import run_agent


if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    run_agent('Agent0')
    run_agent('Agent1')
    run_agent('Agent2')

    # Create a proxy to Agent1 and log a message
    agent = ns.proxy('Agent1')
    agent.log_info('Hello world!')

The code above creates (and registers) three different agents in a name server and then creates, through the name server proxy, a proxy to one of those agents simply using its alias. Then it uses the agent proxy to remotely call a method to log a Hello world! message.
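The address book role of the name server can be pictured with a plain dictionary. The sketch below is only an illustration under that assumption: ToyNameServer, its methods other than agents(), and the addresses are made up, and the real name server runs as a separate process.

```python
class ToyNameServer:
    """A dictionary-backed address book (hypothetical, not osBrain's code)."""

    def __init__(self):
        self._registry = {}

    def register(self, alias, address):
        if alias in self._registry:
            raise ValueError('alias already registered: %s' % alias)
        self._registry[alias] = address

    def agents(self):
        return sorted(self._registry)

    def lookup(self, alias):
        return self._registry[alias]


ns = ToyNameServer()
for i in range(3):
    # Made-up addresses; real ones would be assigned at run time.
    ns.register('Agent%d' % i, ('127.0.0.1', 5000 + i))

print(ns.agents())
print(ns.lookup('Agent1'))
```

With such a registry, a client only needs to know the address of the name server and the alias of the agent it wants to reach.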

Push-Pull

Now that we understand the basics of how proxies, agents and name servers work, let us jump into a more interesting example.

As mentioned before, a multi-agent system only makes sense if agents are connected with each other and share some information using message passing.

In this first example, we will create two agents: Alice and Bob, and we will make Alice send messages to Bob using a simple push-pull communication pattern.

import time
from osbrain import run_agent
from osbrain import run_nameserver


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    # System configuration
    addr = alice.bind('PUSH', alias='main')
    bob.connect(addr, handler=log_message)

    # Send messages
    while True:
        time.sleep(1)
        alice.send('main', 'Hello, Bob!')

So, in this case, we are doing some more stuff. After we spawn Alice and Bob, we connect them.

First, we make Alice bind:

>>> addr = alice.bind('PUSH', alias='main')

There are three things to remark in that line:

  1. The first parameter 'PUSH' represents the communication pattern we want to use. In this case we are using a simple push-pull (unidirectional) pattern to allow Alice to send messages to Bob.
  2. The second parameter is, once again, an alias. We can use this alias to refer to this communication channel in an easier way.
  3. The binding, as you already guessed, takes place in the remote agent, but it actually returns a value, which is the address the agent bound to. This address is serialized back to us so we can use it to connect other agents to it.

The next interesting line of code is the one in which Bob connects to Alice:

>>> bob.connect(addr, handler=log_message)

There are two things to remark in here:

  1. Calling connect() from an agent requires, first, an address. This address is, in this case, the one we got after binding Alice. This method will automatically select the appropriate communication pattern to connect to that address ('PULL' in this case).
  2. Bob will be receiving messages from Alice, so we must set a handler function that will be executed when a message from Alice is received. This handler will be serialized and stored in the remote agent to be executed there when needed.

The handler function, in its most basic form, accepts two parameters:

def handler(agent, message):
    ...

  1. The actual agent (can be named self as well, in an OOP way).
  2. The message that is received.

In the example above, the handler simply logs the message received.
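The two-parameter convention can be illustrated without any networking. In this minimal sketch, ToyReceiver and its deliver() method are hypothetical stand-ins for the receiving agent; the only point is that the agent passes itself as the first argument when it calls the handler.

```python
class ToyReceiver:
    """Minimal stand-in for an agent that owns handlers (hypothetical)."""

    def __init__(self, name):
        self.name = name
        self.handlers = {}
        self.logged = []

    def log_info(self, text):
        self.logged.append('%s: %s' % (self.name, text))

    def connect(self, channel, handler):
        self.handlers[channel] = handler

    def deliver(self, channel, message):
        # The agent passes itself as the first argument of the handler.
        return self.handlers[channel](self, message)


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


bob = ToyReceiver('Bob')
bob.connect('main', handler=log_message)
bob.deliver('main', 'Hello, Bob!')
print(bob.logged)
```

Because the handler receives the agent explicitly, the same function can be reused by any number of agents.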

Request-Reply

Another common communication pattern is request-reply, in which a requester sends a message to the replier and always expects a reply back. It is sometimes useful, especially when some kind of synchronization is required.

from osbrain import run_agent
from osbrain import run_nameserver


def reply(agent, message):
    return 'Received ' + str(message)


if __name__ == '__main__':

    run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    addr = alice.bind('REP', alias='main', handler=reply)
    bob.connect(addr, alias='main')

    for i in range(10):
        bob.send('main', i)
        response = bob.recv('main')
        print(response)

The main difference with respect to the push-pull pattern is that, in this case, Bob must run the recv method in order to get the reply back from Alice.

Note

Although the requester is not required to wait for the reply immediately (i.e., it can do other stuff after sending the request and before receiving the response), it must receive a reply before making another request through the same communication channel. Multiple requests can be made from the same agent as long as it uses a different communication channel for each request.
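The rule described in the note can be modelled as a small state machine. The sketch below is an assumption-laden illustration, not osBrain code: ToyRequestChannel is a hypothetical class that rejects a second request until the reply to the first one has been received, while a second channel remains free to send.

```python
class ToyRequestChannel:
    """Enforces the send/receive alternation of a request socket (sketch)."""

    def __init__(self, replier):
        self._replier = replier
        self._pending = None
        self._awaiting_reply = False

    def send(self, message):
        if self._awaiting_reply:
            raise RuntimeError('previous request has not been answered yet')
        self._pending = self._replier(message)
        self._awaiting_reply = True

    def recv(self):
        if not self._awaiting_reply:
            raise RuntimeError('no request in flight')
        self._awaiting_reply = False
        return self._pending


def reply(message):
    return 'Received ' + str(message)


main = ToyRequestChannel(reply)
other = ToyRequestChannel(reply)

main.send(1)
other.send(2)            # a different channel is free to send
try:
    main.send(3)         # same channel, reply to 1 not yet received
except RuntimeError as error:
    print('rejected:', error)
print(main.recv(), '|', other.recv())
```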

Publish-Subscribe

One of the most useful communication patterns between agents is the publish and subscribe pattern. The publisher will send messages to all subscribed agents.

Here is an example in which Alice is the publisher and Bob and Eve subscribe to Alice. This way, when Alice sends a message, both Bob and Eve will receive it:

import time
from osbrain import run_agent
from osbrain import run_nameserver


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')
    eve = run_agent('Eve')

    # System configuration
    addr = alice.bind('PUB', alias='main')
    bob.connect(addr, handler=log_message)
    eve.connect(addr, handler=log_message)

    # Send messages
    while True:
        time.sleep(1)
        alice.send('main', 'Hello, all!')

Note the similarities between this example and the Push-Pull example. The only differences are that Alice is now binding using the 'PUB' pattern and that, instead of having just Bob connecting to Alice, we now have Eve connecting to Alice as well.

This communication pattern allows for easy filtering. Refer to the Filtering section in the tutorial for more details.

Tutorial - 2

Repeated actions

Timers can be used to repeat an action after a period of time. To illustrate this, let us modify the Push-Pull example a bit and make use of the each method:

from osbrain import run_agent
from osbrain import run_nameserver


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


def annoy(agent, say, more=None):
    message = say if not more else say + ' ' + more + '!'
    agent.send('annoy', message)


if __name__ == '__main__':

    run_nameserver()
    orange = run_agent('Orange')
    apple = run_agent('Apple')
    addr = orange.bind('PUSH', alias='annoy')
    apple.connect(addr, handler=log_message)

    # Multiple timers with parameters
    orange.each(1., annoy, 'Hey')
    orange.each(1.4142, annoy, 'Apple')
    orange.each(3.1415, annoy, 'Hey', more='Apple')

Note that if an action takes longer to run than the time available before the next execution, the timer will simply fall behind.
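What "falling behind" means can be shown with a simulated clock. The following sketch assumes a simple model that may differ from osBrain's actual scheduling: each execution is scheduled one period after the previous start, but cannot begin before the previous run has finished. The function simulate_timer is hypothetical.

```python
def simulate_timer(period, durations):
    """Return the start time of each execution of a periodic action.

    Assumed model: the next execution is scheduled one period after the
    previous start, but cannot begin before the previous run has finished.
    """
    starts = []
    clock = 0.0
    scheduled = 0.0
    for duration in durations:
        start = max(scheduled, clock)
        starts.append(start)
        clock = start + duration       # the action finishes here
        scheduled = start + period     # ideal next start
    return starts


# Period of 1 s; the second run takes 2.5 s, so later runs are delayed.
starts = simulate_timer(1.0, [0.1, 2.5, 0.1, 0.1])
print(starts)
```

With fast actions the starts would be 0, 1, 2 and 3 seconds; here the slow second run pushes the third start to 3.5 seconds and the delay is never recovered.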

OOP

Although the approach of using proxies for the whole configuration process is valid, sometimes the developer may prefer to use OOP to define the behavior of an agent.

This, of course, can be done with osBrain:

import time
from osbrain import run_agent
from osbrain import run_nameserver
from osbrain import Agent


class Greeter(Agent):
    def on_init(self):
        self.bind('PUSH', alias='main')

    def hello(self, name):
        self.send('main', 'Hello, %s!' % name)


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    alice = run_agent('Alice', base=Greeter)
    bob = run_agent('Bob')

    # System configuration
    bob.connect(alice.addr('main'), handler=log_message)

    # Send messages
    while True:
        time.sleep(1)
        alice.hello('Bob')

Most of the code is similar to the one presented in the Push-Pull example; however, you may notice some differences:

  1. When running Alice, a new parameter base is passed to the osbrain.core.run_agent() function. This means that the user-defined agent class will be used instead of the default agent class. In this case, this class is named Greeter.
  2. The Greeter class implements two methods:
    1. on_init(): which is executed on initialization and will, in this case, simply bind a 'PUSH' communication channel.
    2. hello(): which simply sends a Hello message through the main channel when it is executed.
  3. When connecting Bob to Alice, we need the address Alice bound to. As the binding was executed on initialization, we need to use the addr() method, which will return the address associated with the alias passed as parameter (in the example above it is main).
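The role of on_init() as an initialization hook, and of addr() as a way to recover an address bound during that hook, can be sketched in isolation. ToyAgent below is a hypothetical base class with a made-up address format; it is not the osbrain.core.Agent implementation.

```python
class ToyAgent:
    """Base class that calls a user-defined hook after set-up (sketch)."""

    def __init__(self, name):
        self.name = name
        self.address = {}
        self.on_init()

    def on_init(self):
        """Does nothing by default; subclasses may override it."""

    def bind(self, kind, alias):
        # Made-up address format, for illustration only.
        self.address[alias] = '%s@%s' % (kind, self.name)
        return self.address[alias]

    def addr(self, alias):
        return self.address[alias]


class Greeter(ToyAgent):
    def on_init(self):
        self.bind('PUSH', alias='main')


alice = Greeter('Alice')
print(alice.addr('main'))
```

The value returned by bind() inside on_init() is lost to the caller, which is why looking it up by alias afterwards is needed.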

Filtering

The publish-subscribe pattern is useful on its own, and it becomes even more powerful when combined with filtering.

Any time we publish a message from an agent, a topic can be specified. If a topic is specified, then only the agents that are subscribed to that topic will receive the message. This filtering is done on the publisher side, meaning that the network does not suffer from excessive message passing.

In the following example we have Alice publishing messages using topic a or b at random. Then we have Bob subscribed to both topics, Eve subscribed to topic a only and Dave subscribed to topic b only.

import time
import random
from osbrain import run_agent
from osbrain import run_nameserver


def log_a(agent, message):
    agent.log_info('Log a: %s' % message)


def log_b(agent, message):
    agent.log_info('Log b: %s' % message)


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')
    eve = run_agent('Eve')
    dave = run_agent('Dave')

    # System configuration
    addr = alice.bind('PUB', alias='main')
    bob.connect(addr, handler={'a': log_a, 'b': log_b})
    eve.connect(addr, handler={'a': log_a})
    dave.connect(addr, handler={'b': log_b})

    # Send messages
    while True:
        time.sleep(1)
        topic = random.choice(['a', 'b'])
        message = 'Hello, %s!' % topic
        alice.send('main', message, topic=topic)

Note how we can specify different handlers for different topics when subscribing agents.
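The mapping from topics to handlers can be modelled without sockets. The sketch below is an illustration only: dispatch and subscribers are hypothetical names, and the prefix matching mirrors how ØMQ subscriptions behave rather than any osBrain internals.

```python
def dispatch(handlers, topic, message):
    """Call every handler whose subscribed topic is a prefix of `topic`.

    Prefix matching mirrors how ØMQ subscriptions work; how osBrain maps
    topics to handlers internally is not shown in this document.
    """
    results = []
    for subscribed, handler in handlers.items():
        if topic.startswith(subscribed):
            results.append(handler(message))
    return results


subscribers = {
    'Bob': {'a': lambda m: 'Log a: ' + m, 'b': lambda m: 'Log b: ' + m},
    'Eve': {'a': lambda m: 'Log a: ' + m},
    'Dave': {'b': lambda m: 'Log b: ' + m},
}

# Publish one message with topic 'a' and see who handles it.
received = {name: dispatch(handlers, 'a', 'Hello, a!')
            for name, handlers in subscribers.items()}
print(received)
```

Bob and Eve handle the message with their handler for topic a, while Dave, subscribed to topic b only, receives nothing.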

Adding new methods

Note that proxies can not only be used to execute methods remotely in the agent, but they can also be used to add new methods or change already existing methods in the remote agent.

In the following example you can see how we can create a couple of functions that are then added to the remote agent as new methods.

In order to add new methods (or change current methods) we only need to call set_method() from the proxy.

from osbrain import run_agent
from osbrain import run_nameserver


def set_x(self, value):
    self.x = value


def set_y(self, value):
    self.y = value


def add_xy(self):
    return self.x + self.y


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    agent = run_agent('Example')

    # System configuration
    agent.set_method(set_x, set_y, add=add_xy)

    # Trying the new methods
    agent.set_x(1)
    agent.set_y(2)
    print(agent.add())

Note that set_method() accepts any number of parameters:

  • In case they are not named parameters, the function names will be used as the method names in the remote agent.
  • In case they are named parameters, then the method in the remote agent will be named after the parameter name.
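The naming rule can be reproduced locally with types.MethodType. This is a minimal sketch of the rule only: ToyAgent is hypothetical, works on a local object, and the returned list of names is an assumption rather than osBrain's documented return value.

```python
import types


class ToyAgent:
    def set_method(self, *args, **kwargs):
        """Attach functions as bound methods and return their names."""
        names = []
        for function in args:
            # Positional: keep the function's own name.
            setattr(self, function.__name__,
                    types.MethodType(function, self))
            names.append(function.__name__)
        for name, function in kwargs.items():
            # Keyword: the parameter name becomes the method name.
            setattr(self, name, types.MethodType(function, self))
            names.append(name)
        return names


def set_x(self, value):
    self.x = value


def set_y(self, value):
    self.y = value


def add_xy(self):
    return self.x + self.y


agent = ToyAgent()
print(agent.set_method(set_x, set_y, add=add_xy))
agent.set_x(1)
agent.set_y(2)
print(agent.add())
```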

Lambdas

osBrain uses dill for serialization when communicating with remote agents through a proxy. This means that almost anything can be serialized to an agent using a proxy.

In order to further simplify some tasks, lambda functions can be used to configure remote agents:

from osbrain import run_agent
from osbrain import run_nameserver


if __name__ == '__main__':

    run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    addr = alice.bind('REP', handler=lambda agent, msg: 'Received ' + str(msg))
    bob.connect(addr, alias='main')

    for i in range(10):
        bob.send('main', i)
        reply = bob.recv('main')
        print(reply)

See the similarities between this example and the one shown in Request-Reply. In fact, the only difference is the binding from Alice, in which we are using a lambda function for the handler.
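The choice of serializer matters for lambdas: the standard pickle module refuses to serialize them, which is one reason a more capable serializer such as dill is useful here. The following short check uses only the standard library; the variable names are arbitrary.

```python
import pickle

handler = lambda agent, msg: 'Received ' + str(msg)

try:
    pickle.dumps(handler)
    picklable = True
except (pickle.PicklingError, AttributeError) as error:
    picklable = False
    print('pickle refused the lambda:', type(error).__name__)

print('picklable with plain pickle:', picklable)
```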

Reply early

The easiest way to reply to a request is to return a value from the handler, as seen in Request-Reply:

def reply(agent, message):
    return 'Received ' + str(message)

However, an agent can reply earlier if needed:

def reply(agent, message):
    agent.send('main', 'Received ' + str(message))  # Reply now
    agent.log_info('Already sent a reply back!')    # Do some stuff later

Note how, in this case, we need to manually send the reply using the corresponding socket, though.

Shutting down

Although not covered in the examples until now (because many times you just want the multi-agent system to run forever until, perhaps, an event occurs), it is possible to actively kill the system using proxies:

import time

from osbrain import run_agent
from osbrain import run_nameserver


def tick(agent):
    agent.log_info('tick')


if __name__ == '__main__':

    ns = run_nameserver()
    a0 = run_agent('Agent0')
    a1 = run_agent('Agent1')

    a0.each(1, tick)
    a1.each(1, tick)
    time.sleep(3)

    a0.shutdown()
    time.sleep(3)

    ns.shutdown()

Note

Shutting down the name server will result in all agents registered in the name server being shut down as well.

Security

Warning

osBrain should be considered unsafe when used with remote machines. This package has some security risks. Understanding the risks is very important to avoid creating systems that are very easy to compromise by malicious entities.

Serialization in osBrain

osBrain uses the pickle module for serialization when passing messages between agents and can use pickle and dill for serialization when configuring and deploying multi-agent architectures. It is well known that using pickle or dill for this purpose is a security risk. The main problem is that allowing a program to unpickle or undill arbitrary data can lead to arbitrary code execution, which may wreck or compromise your system.
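The risk is easy to demonstrate with the standard library. In the harmless sketch below, the hypothetical Payload class uses the standard __reduce__ hook to make pickle evaluate an arithmetic expression while loading; a malicious sender could pick any other callable instead.

```python
import pickle


class Payload:
    """An object that asks pickle to call a function when it is loaded."""

    def __reduce__(self):
        # pickle will call eval('6 * 7') during loading. A harmless
        # expression is used here; an attacker could choose any callable.
        return (eval, ('6 * 7',))


data = pickle.dumps(Payload())

# The receiver only calls loads(), yet the expression is evaluated.
result = pickle.loads(data)
print(result)
```

The receiver never gets a Payload object back: what it gets is whatever the sender's chosen call returned, after that call has already run.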

Network interface binding

By default osBrain binds every server on localhost, to avoid exposing things on a public network or over the internet by mistake. If you want to expose your osBrain agents to anything other than localhost, you have to explicitly tell osBrain the network interface address it should use. This means it is a conscious effort to expose agents to remote machines.

Protocol encryption

osBrain doesn’t encrypt the data it sends over the network. This means you must not transfer sensitive data on untrusted networks (especially user data, passwords, and such) because it is possible to eavesdrop. Either encrypt the data yourself before passing, or run osBrain over a secure network (VPN or SSH tunnel).

osBrain library API

This chapter describes osBrain’s library API. All osBrain classes and functions are defined in subpackages such as osbrain.core, but for ease of use, the most important ones are also placed in the osbrain package scope.

osbrain — Main API package

osbrain is the main package of osBrain. It imports most of the other packages that it needs and provides shortcuts to the most frequently used objects and functions from those packages. This means you can mostly just import osbrain in your code to start using osBrain.

The classes and functions provided are:

symbol in osbrain        referenced location
class osbrain.Agent      osbrain.core.Agent
osbrain.run_agent()      osbrain.core.run_agent()

See also

Module osbrain.core
The core osBrain classes and functions.
Module osbrain.nameserver
The osBrain name server logic.

osbrain.core — osBrain core logic

Core agent classes.

class osbrain.core.Agent(name=None, host=None)

Bases: object

A base agent class which is to be served by an AgentProcess.

An AgentProcess runs a Pyro multiplexed server and serves one Agent object.

Parameters:
name : str, default is None

Name of the Agent.

host : str, default is None

Host address where the agent will bind to. When not set, ‘127.0.0.1’ (localhost) is used.

Attributes:
name : str

Name of the agent.

host : str

Host address where the agent is binding to.

socket : dict

A dictionary in which the key is the address or the alias and the value is the actual socket.

address : dict

A dictionary in which the key is the alias and the value is the actual address.

handler : dict

A dictionary in which the key is the socket and the values are the handlers for each socket.

poll_timeout : int

Polling timeout, in milliseconds. After this timeout, if no message is received, the agent executes the iddle() method before going back to polling.

keep_alive : bool

When set to True, the agent will continue executing the main loop.

running : bool

Set to True if the agent is running (executing the main loop).

Methods

addr(alias)
bind(kind[, alias, handler, host, port]) Bind to an agent address.
connect(server_address[, alias, handler]) Connect to a server agent address.
each(period, method, *args, **kwargs) Execute a repeated action with a defined period.
handle_loopback(message) Handle incoming messages in the loopback socket.
iddle() This function is to be executed when the agent is idle.
iterate() Agent’s main iteration.
log_debug(message[, logger]) Log a debug message.
log_error(message[, logger]) Log an error message.
log_info(message[, logger]) Log an info message.
log_warning(message[, logger]) Log a warning message.
loop() Agent’s main loop.
loopback(header[, data]) Send a message to the loopback socket.
on_init() This user-defined method is to be executed after initialization.
ping() A simple ping method for testing purposes.
raise_exception() Raise an exception (for testing purposes).
run() Run the agent.
safe(method, *args, **kwargs) A safe call to a method.
safe_ping() A simple loopback ping for testing purposes.
set_attr(**kwargs) Set object attributes.
set_logger(logger[, alias]) Connect the agent to a logger and start logging messages to it.
set_loop(loop)
set_method(*args, **kwargs) Set object methods.
stop() Stop the agent.
subscribe(alias, handlers) Subscribe the agent to another agent.
test() A test method to check the readiness of the agent.
close_sockets  
execute  
get_attr  
kill  
recv  
register  
registered  
self_execute  
send  
send_recv  
set_handler  
shutdown  
str2bytes  
addr(alias)
Parameters:
alias : str

Alias of the socket whose address is to be retrieved.

Returns:
AgentAddress

Address of the agent socket associated with the alias.

bind(kind, alias=None, handler=None, host=None, port=None)

Bind to an agent address.

Parameters:
kind : str, AgentAddressKind

The agent address kind: PUB, REQ…

alias : str, default is None

Optional alias for the socket.

handler, default is None

If the socket receives input messages, the handler/s is/are to be set with this parameter.

host : str, default is None

The host to bind to. When not given, self.host is used.

port : int, default is None

An optional port number. If not set, a random port is used for binding.

close_sockets()
connect(server_address, alias=None, handler=None)

Connect to a server agent address.

Parameters:
server_address : AgentAddress

Agent address to connect to.

alias : str, default is None

Optional alias for the new address.

handler, default is None

If the new socket receives input messages, the handler/s is/are to be set with this parameter.

each(period, method, *args, **kwargs)

Execute a repeated action with a defined period.

Parameters:
period : float

Repeat the action execution with a delay of period seconds between executions.

method

Method (action) to be executed by the agent.

*args : tuple

Parameters to pass for the method execution.

**kwargs : dict

Named parameters to pass for the method execution.

execute(function, *args, **kwargs)
get_attr(name)
handle_loopback(message)

Handle incoming messages in the loopback socket.

iddle()

This function is to be executed when the agent is idle.

After a timeout occurs when the agent’s poller receives no data in any of its sockets, the agent may execute this function.

iterate()

Agent’s main iteration.

This iteration is normally executed inside the main loop.

The agent is polling all its sockets for input data. It will wait for poll_timeout; after this period, the method iddle will be executed before polling again.

Returns:
int

1 if an error occurred during the iteration (we would expect this to happen if an interruption occurs during polling).

0 otherwise.
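The poll, handle or idle cycle described above can be sketched with a standard queue in place of the agent's sockets. This is an illustration under stated assumptions, not the actual implementation: the free function iterate, its parameters and the use of queue.Queue are all hypothetical, and KeyboardInterrupt stands in for an interruption during polling.

```python
import queue


def iterate(inbox, handler, idle, poll_timeout):
    """Poll once: handle one message, or run `idle` after a timeout.

    `poll_timeout` is in milliseconds, matching the attribute described
    above. Returns 1 if polling was interrupted, 0 otherwise.
    """
    try:
        message = inbox.get(timeout=poll_timeout / 1000.0)
    except queue.Empty:
        idle()
        return 0
    except KeyboardInterrupt:
        return 1
    handler(message)
    return 0


inbox = queue.Queue()
inbox.put('Hello')
events = []

# First iteration finds a message; the second one times out and idles.
iterate(inbox, events.append, lambda: events.append('<idle>'), poll_timeout=50)
iterate(inbox, events.append, lambda: events.append('<idle>'), poll_timeout=50)
print(events)
```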

kill()
log_debug(message, logger='_logger')

Log a debug message.

Parameters:
message : str

Message to log.

logger : str

Alias of the logger.

log_error(message, logger='_logger')

Log an error message.

Parameters:
message : str

Message to log.

logger : str

Alias of the logger.

log_info(message, logger='_logger')

Log an info message.

Parameters:
message : str

Message to log.

logger : str

Alias of the logger.

log_warning(message, logger='_logger')

Log a warning message.

Parameters:
message : str

Message to log.

logger : str

Alias of the logger.

loop()

Agent’s main loop.

This loop is executed until the keep_alive attribute is False or until an error occurs.

loopback(header, data=None)

Send a message to the loopback socket.

on_init()

This user-defined method is to be executed after initialization.

ping()

A simple ping method for testing purposes.

raise_exception()

Raise an exception (for testing purposes).

recv(address)
register(socket, address, alias=None, handler=None)
registered(address)
run()

Run the agent.

safe(method, *args, **kwargs)

A safe call to a method.

A safe call is simply sent to be executed by the main thread.

Parameters:
method : str

Method name to be executed by the main thread.

*args : arguments

Method arguments.

**kwargs : keyword arguments

Method keyword arguments.

safe_ping()

A simple loopback ping for testing purposes.

self_execute(function, *args, **kwargs)
send(address, message, topic='')
send_recv(address, message)
set_attr(**kwargs)

Set object attributes.

Parameters:
kwargs : [name, value]

Keyword arguments will be used to set the object attributes.

set_handler(socket, handler)
set_logger(logger, alias='_logger')

Connect the agent to a logger and start logging messages to it.

set_loop(loop)
set_method(*args, **kwargs)

Set object methods.

Parameters:
args : [function]

New methods will be created for each function, taking the same name as the original function.

kwargs : [name, function]

New methods will be created for each function, taking the name specified by the parameter.

Returns:
str

Name of the registered method in the agent.

shutdown()
stop()

Stop the agent. Agent will stop running.

str2bytes(message)
subscribe(alias, handlers)

Subscribe the agent to another agent.

Parameters:
alias : str

Alias of the new subscriber socket.

handlers : dict

A dictionary in which the keys represent the different topics and the values the actual handlers. If, instead of a dictionary, a single handler is given, it will be used to subscribe the agent to any topic.

test()

A test method to check the readiness of the agent. Used for testing purposes, where timing is very important. Do not remove.

class osbrain.core.AgentProcess(name, nsaddr=None, addr=None, base=<class 'osbrain.core.Agent'>)

Bases: multiprocessing.context.Process

Agent class. Instances of an Agent are system processes which can be run independently.

Attributes:
authkey
daemon

Return whether process is a daemon

exitcode

Return exit code of process or None if it has yet to stop

ident

Return identifier (PID) of process or None if it has yet to start

name
pid

Return identifier (PID) of process or None if it has yet to start

sentinel

Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.

Methods

is_alive() Return whether process is alive
join([timeout]) Wait until child process terminates
run() Method to be run in sub-process; can be overridden in sub-class
sigint_handler(signal, frame) Handle interruption signals.
start() Start child process
terminate() Terminate process; sends SIGTERM signal or uses TerminateProcess()
kill  
kill()
run()

Method to be run in sub-process; can be overridden in sub-class

sigint_handler(signal, frame)

Handle interruption signals.

start()

Start child process

osbrain.core.run_agent(name, nsaddr=None, addr=None, base=<class 'osbrain.core.Agent'>)

Ease the agent creation process.

This function will create a new agent, start the process and then run its main loop through a proxy.

Parameters:
name : str

Agent name or alias.

nsaddr : SocketAddress, default is None

Name server address.

addr : SocketAddress, default is None

New agent address, if it is to be fixed.

Returns:
proxy

A proxy to the new agent.

osbrain.address — osBrain address logic

Implementation of address-related features.

class osbrain.address.AgentAddress(host, port, kind=None, role=None)

Bases: osbrain.address.SocketAddress

Agent address information consisting of the host, port, kind and role.

Parameters:
host : str

Agent host.

port : int

Agent port.

kind : int, str, AgentAddressKind

Agent kind.

role : str, AgentAddressRole

Agent role.

Attributes:
host : str

Agent host.

port : int

Agent port.

kind : AgentAddressKind

Agent kind.

role : AgentAddressRole

Agent role.

Methods

socket_addr() Return the agent address as a SocketAddress object.
twin() Return the twin address of the current one.
socket_addr()
Returns:
SocketAddress

Agent address as a SocketAddress object. This means kind and role information are lost.

twin()
Returns:
AgentAddress

The twin address of the current one; while the host and port are kept for the twin, the kind and role change to their corresponding twins, according to the rules defined in the respective classes.
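
The relationship between an address, its socket address and its twin can be sketched with named tuples. SocketAddr, AgentAddr and the two lookup tables are hypothetical simplifications that use plain strings for kind and role; they are not the osBrain classes.

```python
from collections import namedtuple

# Hypothetical, simplified stand-ins for SocketAddress and AgentAddress.
SocketAddr = namedtuple('SocketAddr', ['host', 'port'])

KIND_TWIN = {'REQ': 'REP', 'REP': 'REQ', 'PUB': 'SUB', 'SUB': 'PUB',
             'PUSH': 'PULL', 'PULL': 'PUSH'}
ROLE_TWIN = {'server': 'client', 'client': 'server'}


class AgentAddr(namedtuple('AgentAddr', ['host', 'port', 'kind', 'role'])):
    __slots__ = ()

    def socket_addr(self):
        # Only host and port survive; kind and role are dropped.
        return SocketAddr(self.host, self.port)

    def twin(self):
        # Host and port are kept; kind and role switch to their twins.
        return AgentAddr(self.host, self.port,
                         KIND_TWIN[self.kind], ROLE_TWIN[self.role])


server = AgentAddr('127.0.0.1', 5555, 'REP', 'server')
client = server.twin()
```

Taking the twin twice yields the original address again, which is what lets a client derive its own address from the server address it connects to.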

class osbrain.address.AgentAddressKind

Bases: int

Agent’s address kind class. It can be any ZMQ type (‘REP’, ‘PUB’…).

Inherits from int to be compatible with ZMQ definitions; however, it is represented in its string form. Equality can also be evaluated against its string form.

Methods

requires_handler() Return whether the address kind requires a handler.
twin() Return the twin kind of the current one.
ZMQ_KIND_TWIN = {1: 2, 2: 1, 3: 4, 4: 3, 7: 8, 8: 7}
ZMQ_STR_CONVERSION = {1: 'PUB', 2: 'SUB', 3: 'REQ', 4: 'REP', 'REP': 4, 7: 'PULL', 8: 'PUSH', 'SUB': 2, 'REQ': 3, 'PUB': 1, 'PUSH': 8, 'PULL': 7}
key = 'PULL'
keys = ['SUB', 'REP', 'PUB', 'REQ', 'PUSH', 'PULL']
requires_handler()
Returns:
bool

Whether the Agent’s address kind requires a handler or not. A socket which processes incoming messages would require a handler (e.g. ‘REP’, ‘PULL’, ‘SUB’…).

twin()
Returns:
AgentAddressKind

The twin kind of the current one; REQ would be the twin of REP and vice versa, PUB would be the twin of SUB and vice versa, etc.
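
An int subclass that prints and compares as a string could look roughly like the following. This is a sketch, not the osBrain source: Kind is a hypothetical name, the two tables are copied from the ZMQ_STR_CONVERSION and ZMQ_KIND_TWIN attributes listed for this class, and restricting requires_handler() to exactly three kinds is an assumption.

```python
class Kind(int):
    """Hypothetical sketch of an int-based kind with a string form."""

    NAMES = {1: 'PUB', 2: 'SUB', 3: 'REQ', 4: 'REP', 7: 'PULL', 8: 'PUSH'}
    TWINS = {1: 2, 2: 1, 3: 4, 4: 3, 7: 8, 8: 7}

    def __new__(cls, kind):
        if isinstance(kind, str):
            # Accept the string form and convert it to the ZMQ integer.
            kind = {name: number for number, name in cls.NAMES.items()}[kind]
        return super().__new__(cls, kind)

    def __str__(self):
        return self.NAMES[int(self)]

    __repr__ = __str__

    def __eq__(self, other):
        # Compare against the string form as well as the integer value.
        if isinstance(other, str):
            return str(self) == other
        return int.__eq__(self, other)

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash(int(self))

    def twin(self):
        return Kind(self.TWINS[int(self)])

    def requires_handler(self):
        # Assumption: only these receiving kinds need a handler.
        return str(self) in ('REP', 'PULL', 'SUB')
```

With this sketch, Kind('REQ').twin() compares equal to both 'REP' and 4.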

class osbrain.address.AgentAddressRole

Bases: str

Agent’s address role class. It can either be ‘server’ or ‘client’.

Methods

twin() Return the twin role of the current one.
twin()
Returns:
AgentAddressRole

The twin role of the current one; server would be the twin of client and vice versa.
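
A str subclass restricted to the two documented values is enough to illustrate the role. Role is a hypothetical name, and rejecting other values with ValueError is an assumption rather than documented behaviour.

```python
class Role(str):
    """Hypothetical sketch of a validated, str-based role."""

    def __new__(cls, value):
        # Assumption: anything other than the two documented roles is an error.
        if value not in ('server', 'client'):
            raise ValueError('Invalid role: %r' % (value,))
        return super().__new__(cls, value)

    def twin(self):
        return Role('client' if self == 'server' else 'server')


role = Role('server')
other = role.twin()
```

Because Role is still a str, it can be compared with and used in place of plain strings.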

class osbrain.address.SocketAddress(host, port)

Bases: object

Socket address information consisting of the host and port.

Parameters:
host : str

Agent host.

port : int

Agent port.

Attributes:
host : str

Agent host.

port : int

Agent port.

osbrain.common — osBrain common logic

Miscellaneous utilities.

class osbrain.common.LogLevel

Bases: str

Identifies the log level: ERROR, WARNING, INFO, DEBUG.

osbrain.common.address_to_host_port(addr=None)

Try to convert a string or SocketAddress to a (host, port) tuple.

Parameters:
addr : str, SocketAddress
Returns:
tuple

A (host, port) tuple formed with the corresponding data.
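
The conversion might be sketched as follows. The accepted string format ('host:port', with the port optional) and the (None, None) result for a missing address are assumptions, and to_host_port is a hypothetical name rather than the osBrain function.

```python
from collections import namedtuple


def to_host_port(addr=None):
    """Hypothetical sketch; the accepted string format is an assumption."""
    if addr is None:
        return (None, None)
    if isinstance(addr, str):
        host, sep, port = addr.partition(':')
        # Without a ':' separator there is no port to convert.
        return (host, int(port) if sep else None)
    # Anything else is expected to expose ``host`` and ``port`` attributes.
    return (addr.host, addr.port)


# Stand-in for a SocketAddress-like object.
Address = namedtuple('Address', ['host', 'port'])

from_string = to_host_port('127.0.0.1:1234')
from_object = to_host_port(Address('localhost', 5000))
```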

osbrain.common.periodic(scheduler, interval, action, args=())

Run a scheduler periodically.

This function runs forever and blocks the calling thread.

Parameters:
scheduler : sched.scheduler

Scheduler to run.

interval : numeric

Delay to apply to the scheduler.

action

Action to execute by the scheduler.

args, default is ()

Arguments for the action.
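
A common way to make a sched.scheduler periodic is to let each event re-arm the next one. The sketch below follows that pattern; the times bound is a hypothetical addition so that the example terminates, whereas the documented function runs forever.

```python
import sched
import time


def periodic(scheduler, interval, action, args=(), times=3):
    """Run ``action`` every ``interval`` seconds, ``times`` times in total.

    The ``times`` bound is a hypothetical addition so that the example ends.
    """
    def step(remaining):
        action(*args)
        if remaining > 1:
            # Re-arm the scheduler for the next execution.
            scheduler.enter(interval, 1, step, (remaining - 1,))

    scheduler.enter(interval, 1, step, (times,))
    scheduler.run()  # blocks until no events are left


ticks = []
periodic(sched.scheduler(time.monotonic, time.sleep), 0.01,
         ticks.append, ('tick',))
```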

osbrain.common.repeat(interval, action, args=())

Repeat an action forever after a given number of seconds.

If a sequence of events takes longer to run than the time available before the next event, the repeater will simply fall behind.

This function is executed in a separate thread.

Parameters:
interval : float

Number of seconds between executions.

action

Action to be taken after the interval.

args : tuple, default is ()

Arguments for the action.

Returns:
threading.Thread

Thread running the repeat task.
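
A thread-based repeater can be sketched with the standard library. The stop event is a hypothetical addition that lets the example terminate; the documented function repeats forever. Because the loop waits a full interval after each call, a slow action simply delays the following ones, which matches the "fall behind" behaviour described above.

```python
import threading
import time


def repeat(interval, action, args=(), stop=None):
    """Call ``action(*args)`` every ``interval`` seconds in a daemon thread.

    ``stop`` is a hypothetical threading.Event added so the loop can end.
    """
    stop = stop or threading.Event()

    def loop():
        # Event.wait() returns False on timeout and True once the event is set.
        while not stop.wait(interval):
            action(*args)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


calls = []
stop = threading.Event()
thread = repeat(0.01, calls.append, ('x',), stop=stop)
time.sleep(0.2)
stop.set()
thread.join(timeout=1)
```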

osbrain.common.unbound_method(method)
Returns:
function

Unbound function.
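
In Python 3 a bound method keeps the plain function it wraps in its __func__ attribute. The sketch below relies on that; unbind and Greeter are hypothetical names, and osBrain may obtain the function differently.

```python
def unbind(method):
    """Return the plain function behind a bound method (sketch)."""
    # Bound methods keep the original function in ``__func__``.
    return method.__func__


class Greeter:
    def hello(self):
        return 'hello'


greeter = Greeter()
function = unbind(greeter.hello)
```

The result no longer carries an instance, so the instance has to be passed explicitly, as in function(greeter).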

osbrain.logging — osBrain logging logic

Implementation of logging-related features.

class osbrain.logging.Logger(name=None, host=None)

Bases: osbrain.core.Agent

Specialized Agent for logging. Binds a SUB socket and starts logging incoming messages.

Methods

addr(alias)
bind(kind[, alias, handler, host, port]) Bind to an agent address.
connect(server_address[, alias, handler]) Connect to a server agent address.
each(period, method, *args, **kwargs) Execute a repeated action with a defined period.
handle_loopback(message) Handle incoming messages in the loopback socket.
iddle() This function is to be executed when the agent is idle.
iterate() Agent’s main iteration.
log_debug(message[, logger]) Log a debug message.
log_error(message[, logger]) Log an error message.
log_handler(message, topic) Handle incoming log messages.
log_info(message[, logger]) Log an info message.
log_warning(message[, logger]) Log a warning message.
loop() Agent’s main loop.
loopback(header[, data]) Send a message to the loopback socket.
on_init() This user-defined method is to be executed after initialization.
ping() A simple ping method for testing purposes.
raise_exception() Raise an exception (for testing purposes).
run() Run the agent.
safe(method, *args, **kwargs) A safe call to a method.
safe_ping() A simple loopback ping for testing purposes.
set_attr(**kwargs) Set object attributes.
set_logger(logger[, alias]) Connect the agent to a logger and start logging messages to it.
set_loop(loop)
set_method(*args, **kwargs) Set object methods.
stop() Stop the agent.
subscribe(alias, handlers) Subscribe the agent to another agent.
test() A test method to check the readiness of the agent.
close_sockets  
execute  
get_attr  
kill  
recv  
register  
registered  
self_execute  
send  
send_recv  
set_handler  
shutdown  
str2bytes  
log_handler(message, topic)

Handle incoming log messages.

on_init()

This user-defined method is to be executed after initialization.

osbrain.logging.pyro_log()

Set environment variables to activate Pyro logging. The log level is set to “DEBUG”.

osbrain.logging.run_logger(name, nsaddr=None, addr=None, base=<class 'osbrain.logging.Logger'>)

Ease the logger creation process.

This function will create a new logger, start the process and then run its main loop through a proxy.

Parameters:
name : str

Logger name or alias.

nsaddr : SocketAddress, default is None

Name server address.

addr : SocketAddress, default is None

New logger address, if it is to be fixed.

Returns:
proxy

A proxy to the new logger.

osbrain.proxy — osBrain proxy logic

Implementation of proxy-related features.

class osbrain.proxy.NSProxy(nsaddr=None, timeout=3)

Bases: Pyro4.core.Proxy

A proxy to access a name server.

Parameters:
nsaddr : SocketAddress, str

Name server address.

timeout : float

Timeout, in seconds, to wait until the name server is discovered.

Methods

addr([agent_alias, address_alias]) Return the name server address or the address of an agent’s socket.
proxy(name[, timeout]) Get a proxy to access an agent registered in the name server.
release() Release the connection to the Pyro daemon.
shutdown() Shut down the name server.
shutdown_agents([timeout]) Shut down all agents registered in the name server.
addr(agent_alias=None, address_alias=None)

Return the name server address or the address of an agent’s socket.

Parameters:
agent_alias : str, default is None

The alias of the agent whose socket address is to be retrieved.

address_alias : str, default is None

The alias of the socket address to retrieve from the agent.

Returns:
SocketAddress or AgentAddress

The name server or agent’s socket address.

proxy(name, timeout=3.0)

Get a proxy to access an agent registered in the name server.

Parameters:
name : str

Proxy name, as registered in the name server.

timeout : float

Timeout, in seconds, to wait until the agent is discovered.

Returns:
Proxy

A proxy to access an agent registered in the name server.

release()

Release the connection to the Pyro daemon.

shutdown()

Shut down the name server. All agents will be shut down as well.

shutdown_agents(timeout=3.0)

Shut down all agents registered in the name server.

Parameters:
timeout : float, default is 3.

Timeout, in seconds, to wait for the agents to shut down.

class osbrain.proxy.Proxy(name, nsaddr=None, timeout=3.0)

Bases: Pyro4.core.Proxy

A proxy to access remote agents.

Parameters:
name : str

Proxy name, as registered in the name server.

nsaddr : SocketAddress, str

Name server address.

timeout : float

Timeout, in seconds, to wait until the agent is discovered.

Methods

pyro_addr() Return the socket address of the Pyro server.
release() Release the connection to the Pyro daemon.
pyro_addr()
Returns:
SocketAddress

The socket address of the Pyro server.

release()

Release the connection to the Pyro daemon.

osbrain.proxy.locate_ns(nsaddr, timeout=3)

Locate a name server (ping) to ensure it actually exists.

Parameters:
nsaddr : SocketAddress

The address where the name server should be up and running.

timeout : float

Timeout in seconds before aborting location.

Returns:
nsaddr

The address where the name server was located.

Raises:
NamingError

If the name server could not be located.
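
Locating a service under a timeout is often done with a polling loop. The sketch below assumes that approach and is not taken from osBrain: locate and flaky_ping are hypothetical, the probe is any callable that raises ConnectionError while the server is unreachable, and NamingError is a local stand-in for the Pyro4 exception of the same name.

```python
import time


class NamingError(Exception):
    """Local stand-in for Pyro4.errors.NamingError."""


def locate(ping, timeout=3.0, pause=0.01):
    """Call ``ping`` until it succeeds or ``timeout`` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return ping()
        except ConnectionError:
            if time.monotonic() >= deadline:
                raise NamingError('Name server not found')
            time.sleep(pause)


attempts = []


def flaky_ping():
    # Hypothetical probe: fails twice, then reports an address.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError('not ready')
    return ('127.0.0.1', 9090)


found = locate(flaky_ping, timeout=1.0)
```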

osbrain.nameserver — osBrain nameserver logic

Implementation of name server.

class osbrain.nameserver.NameServer(*args, **kwargs)

Bases: Pyro4.naming.NameServer

Methods

agents() List agents registered in the name server.
async_shutdown() Shut down the name server.
async_shutdown_agents() Shut down all agents registered in the name server.
list([prefix, regex, metadata_all, …]) Retrieve the registered items as a dictionary name-to-URI.
lookup(name[, return_metadata]) Look up the given name; returns a URI if found.
ping() A simple test method to check if the name server is running correctly.
register(name, uri[, safe, metadata]) Register a name with a URI.
remove([name, prefix, regex]) Remove a registration.
set_metadata(name, metadata) Update the metadata for an existing registration.
count  
agents()

List agents registered in the name server.

async_shutdown()

Shut down the name server. All agents will be shut down as well.

async_shutdown_agents()

Shut down all agents registered in the name server.

ping()

A simple test method to check if the name server is running correctly.

class osbrain.nameserver.NameServerProcess(addr=None)

Bases: multiprocessing.context.Process

Name server class. Instances of a name server are system processes which can be run independently.

Attributes:
authkey
daemon

Return whether process is a daemon

exitcode

Return exit code of process or None if it has yet to stop

ident

Return identifier (PID) of process or None if it has yet to start

name
pid

Return identifier (PID) of process or None if it has yet to start

sentinel

Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.

Methods

agents() List agents registered in the name server.
is_alive() Return whether process is alive
join([timeout]) Wait until child process terminates
run() Method to be run in sub-process; can be overridden in sub-class
shutdown() Shut down the name server.
shutdown_all() Shut down all agents registered in the name server.
sigint_handler(signal, frame) Handle interruption signals.
start() Start child process
terminate() Terminate process; sends SIGTERM signal or uses TerminateProcess()
agents()

List agents registered in the name server.

run()

Method to be run in sub-process; can be overridden in sub-class

shutdown()

Shut down the name server. All agents will be shut down as well.

shutdown_all()

Shut down all agents registered in the name server.

sigint_handler(signal, frame)

Handle interruption signals.

start()

Start child process

osbrain.nameserver.random_nameserver()

Start a random name server.

Returns:
SocketAddress

The name server address.

osbrain.nameserver.run_nameserver(addr=None)

Ease the name server creation process.

This function will create a new name server, start the process and then run its main loop through a proxy.

Parameters:
addr : SocketAddress, default is None

Name server address.

Returns:
proxy

A proxy to the name server.

Software License and Disclaimer

Copyright 2016 Open Sistemas de Información Internet S.L.

Licensed under the Apache License, Version 2.0 (the “License”);
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an “AS IS” BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
