Tutorial - 2¶
Repeated actions¶
Timers can be used to repeat an action after a period of time. To illustrate
this, let us modify the Push-Pull example a bit and make use of the each() method:
from osbrain import run_agent
from osbrain import run_nameserver


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


def annoy(agent, say, more=None):
    message = say if not more else say + ' ' + more + '!'
    agent.send('annoy', message)


if __name__ == '__main__':

    run_nameserver()
    orange = run_agent('Orange')
    apple = run_agent('Apple')
    addr = orange.bind('PUSH', alias='annoy')
    apple.connect(addr, handler=log_message)

    # Multiple timers with parameters
    orange.each(1., annoy, 'Hey')
    orange.each(1.4142, annoy, 'Apple')
    orange.each(3.1415, annoy, 'Hey', more='Apple')
Note that if an action takes longer to run than the time available before the next execution, the timer will simply fall behind.
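To make "falling behind" concrete, the following sketch simulates a periodic timer in virtual time, without osBrain and without actually sleeping. It is only a toy model under one stated assumption: two executions never overlap, so each run starts at its scheduled time or when the previous run ends, whichever is later. The function name simulate_timer is hypothetical, and the text above does not specify how osBrain schedules late executions internally.

```python
def simulate_timer(period, duration, runs):
    """Return (scheduled, started) pairs for a toy periodic timer.

    Assumption (not taken from osBrain): executions never overlap, so a
    run starts at its scheduled time or when the previous run finishes,
    whichever is later.
    """
    timeline = []
    previous_end = 0.0
    for k in range(runs):
        scheduled = k * period
        started = max(scheduled, previous_end)
        previous_end = started + duration
        timeline.append((scheduled, started))
    return timeline


if __name__ == '__main__':
    # The action takes 1.5 s but is scheduled every 1.0 s
    for scheduled, started in simulate_timer(period=1.0, duration=1.5, runs=4):
        print('scheduled at %.1f s, started at %.1f s, lag %.1f s'
              % (scheduled, started, started - scheduled))
```

In this model the lag grows by 0.5 s on every execution, because each run needs 0.5 s more than the period allows.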
OOP¶
Although the approach of using proxies for the whole configuration process is valid, sometimes the developer may prefer to use OOP to define the behavior of an agent.
This, of course, can be done with osBrain:
import time

from osbrain import run_agent
from osbrain import run_nameserver
from osbrain import Agent


class Greeter(Agent):
    def on_init(self):
        self.bind('PUSH', alias='main')

    def hello(self, name):
        self.send('main', 'Hello, %s!' % name)


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    alice = run_agent('Alice', base=Greeter)
    bob = run_agent('Bob')

    # System configuration
    bob.connect(alice.addr('main'), handler=log_message)

    # Send messages
    while True:
        time.sleep(1)
        alice.hello('Bob')
Most of the code is similar to the one presented in the Push-Pull example. However, you may notice some differences:
- When running Alice, a new parameter base is passed to the osbrain.core.run_agent() function. This means that the user-defined agent class will be used instead of the default agent class. In this case, this class is named Greeter.
- The Greeter class implements two methods:
  - on_init(): executed on initialization; in this case, it simply binds a 'PUSH' communication channel with the alias main.
  - hello(): sends a Hello message through the main channel when it is executed.
- When connecting Bob to Alice, we need the address that Alice bound to. As the binding was executed on initialization, we need to use the addr() method, which returns the address associated with the alias passed as a parameter (in the example above, main).
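The relationship between base, on_init() and addr() can be illustrated with a small stand-alone model. This is a sketch only: ToyAgent, ToyGreeter and run_toy_agent are hypothetical names, the address format is made up, and nothing here reflects how osBrain actually implements agents or sockets.

```python
class ToyAgent:
    """Hypothetical stand-in for an agent base class (not osbrain.Agent)."""

    def __init__(self, name):
        self.name = name
        self.addresses = {}
        # Initialization hook, called once the agent is set up
        self.on_init()

    def on_init(self):
        """Hook for subclasses; the default agent does nothing here."""

    def bind(self, kind, alias):
        # Made-up address format, for illustration only
        address = '%s://%s/%s' % (kind.lower(), self.name, alias)
        self.addresses[alias] = address
        return address

    def addr(self, alias):
        # Look up the address that was stored under this alias
        return self.addresses[alias]


class ToyGreeter(ToyAgent):
    def on_init(self):
        self.bind('PUSH', alias='main')


def run_toy_agent(name, base=ToyAgent):
    # The class passed as `base` decides which behavior the agent gets
    return base(name)


alice = run_toy_agent('Alice', base=ToyGreeter)
print(alice.addr('main'))
```

Because the binding happens inside on_init(), the caller never sees the return value of bind(); looking the address up by alias afterwards is the only way to obtain it.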
Filtering¶
The publish-subscribe pattern is very useful on its own, and it becomes even more powerful when combined with filtering.
Any time we publish a message from an agent, a topic can be specified. If a topic is specified, then only the agents that are subscribed to that topic will receive the message. This filtering is done on the publisher side, meaning that the network does not suffer from excessive message passing.
In the following example we have Alice publishing messages using topic a or b at random. Then we have Bob subscribed to both topics, Eve subscribed to topic a only and Dave subscribed to topic b only.
import time
import random

from osbrain import run_agent
from osbrain import run_nameserver


def log_a(agent, message):
    agent.log_info('Log a: %s' % message)


def log_b(agent, message):
    agent.log_info('Log b: %s' % message)


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')
    eve = run_agent('Eve')
    dave = run_agent('Dave')

    # System configuration
    addr = alice.bind('PUB', alias='main')
    bob.connect(addr, handler={'a': log_a, 'b': log_b})
    eve.connect(addr, handler={'a': log_a})
    dave.connect(addr, handler={'b': log_b})

    # Send messages
    while True:
        time.sleep(1)
        topic = random.choice(['a', 'b'])
        message = 'Hello, %s!' % topic
        alice.send('main', message, topic=topic)
Note how we can specify different handlers for different topics when subscribing agents.
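The effect of publisher-side filtering with per-topic handlers can be modeled without any networking. The sketch below is a simplified illustration, not osBrain's implementation: ToyPublisher and its methods are hypothetical, and topics are matched by exact equality, whereas a real messaging layer may use a different matching rule.

```python
class ToyPublisher:
    """Hypothetical in-process model of a publisher that filters by topic."""

    def __init__(self):
        self.subscribers = []  # list of (name, {topic: handler})

    def subscribe(self, name, handlers):
        self.subscribers.append((name, handlers))

    def publish(self, message, topic):
        """Deliver only to subscribers of `topic`; return their names."""
        delivered = []
        for name, handlers in self.subscribers:
            handler = handlers.get(topic)
            if handler is None:
                continue  # Filtered out: nothing is sent to this subscriber
            handler(name, message)
            delivered.append(name)
        return delivered


def log_a(name, message):
    print('(%s) Log a: %s' % (name, message))


def log_b(name, message):
    print('(%s) Log b: %s' % (name, message))


alice = ToyPublisher()
alice.subscribe('Bob', {'a': log_a, 'b': log_b})
alice.subscribe('Eve', {'a': log_a})
alice.subscribe('Dave', {'b': log_b})

received_a = alice.publish('Hello, a!', topic='a')
received_b = alice.publish('Hello, b!', topic='b')
```

Here a message on topic a reaches Bob and Eve only, and a message on topic b reaches Bob and Dave only, mirroring the subscriptions in the example above.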
Adding new methods¶
Note that proxies can not only be used to execute methods remotely in the agent, but they can also be used to add new methods or change already existing methods in the remote agent.
In the following example you can see how we can create a couple of functions that are then added to the remote agent as new methods.
In order to add new methods (or change current methods) we only need to call set_method() from the proxy.
from osbrain import run_agent
from osbrain import run_nameserver


def set_x(self, value):
    self.x = value


def set_y(self, value):
    self.y = value


def add_xy(self):
    return self.x + self.y


if __name__ == '__main__':

    # System deployment
    run_nameserver()
    agent = run_agent('Example')

    # System configuration
    agent.set_method(set_x, set_y, add=add_xy)

    # Trying the new methods
    agent.set_x(1)
    agent.set_y(2)
    print(agent.add())
Note that set_method() accepts any number of parameters:
- In case they are not named parameters, the function names will be used as the method names in the remote agent.
- In case they are named parameters, then the method in the remote agent will be named after the parameter name.
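The naming rule described above can be reproduced locally on a plain Python object. The following is a minimal sketch, assuming a hypothetical helper named attach_methods; it binds functions with types.MethodType and does not involve proxies, serialization or osBrain itself.

```python
import types


def attach_methods(obj, *functions, **named_functions):
    """Hypothetical local analog of the naming rule (not osBrain code)."""
    # Unnamed parameters: the function name becomes the method name
    for function in functions:
        setattr(obj, function.__name__, types.MethodType(function, obj))
    # Named parameters: the parameter name becomes the method name
    for name, function in named_functions.items():
        setattr(obj, name, types.MethodType(function, obj))


class Example:
    pass


def set_x(self, value):
    self.x = value


def set_y(self, value):
    self.y = value


def add_xy(self):
    return self.x + self.y


example = Example()
attach_methods(example, set_x, set_y, add=add_xy)
example.set_x(1)
example.set_y(2)
print(example.add())
```

Note that add_xy is only reachable as add() on the object, since it was passed as a named parameter.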
Lambdas¶
osBrain uses dill for serialization when communicating with remote agents through a proxy. This means that almost anything can be serialized to an agent using a proxy.
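One likely motivation for dill (an assumption, as the reason is not stated here) is that the standard library's pickle module cannot serialize lambda functions, while dill is designed to handle them. The sketch below uses only the standard library to show the limitation; the helper name can_pickle is hypothetical.

```python
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize `obj`."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# A plain string is serializable with the standard library
print(can_pickle('Received 1'))

# A lambda is not: pickle stores functions by name, and a lambda has
# no importable name to look up
print(can_pickle(lambda agent, msg: 'Received ' + str(msg)))
```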
In order to further simplify some tasks, lambda functions can be used to configure remote agents:
from osbrain import run_agent
from osbrain import run_nameserver


if __name__ == '__main__':

    run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    addr = alice.bind('REP', handler=lambda agent, msg: 'Received ' + str(msg))
    bob.connect(addr, alias='main')

    for i in range(10):
        bob.send('main', i)
        reply = bob.recv('main')
        print(reply)
Note the similarities between this example and the one shown in Request-Reply. In fact, the only difference is the binding from Alice, in which we are using a lambda function for the handler.
Reply early¶
The easiest way to reply to a request is to return a value from the handler, as seen in Request-Reply:
def reply(agent, message):
    return 'Received ' + str(message)
However, an agent can reply earlier if needed:
def reply(agent, message):
    agent.send('main', 'Received ' + str(message))  # Reply now
    agent.log_info('Already sent a reply back!')   # Do some stuff later
Note that, in this case, we need to manually send the reply through the corresponding socket.
Shutting down¶
Although not covered in the examples until now (because many times you just want the multi-agent system to run forever until, perhaps, an event occurs), it is possible to actively kill the system using proxies:
import time

from osbrain import run_agent
from osbrain import run_nameserver


def tick(agent):
    agent.log_info('tick')


if __name__ == '__main__':

    ns = run_nameserver()
    a0 = run_agent('Agent0')
    a1 = run_agent('Agent1')

    a0.each(1, tick)
    a1.each(1, tick)

    time.sleep(3)
    a0.shutdown()
    time.sleep(3)
    ns.shutdown()
Note
Shutting down the nameserver will result in all agents registered in the nameserver being shut down as well.