Inspirel banner

Programming Distributed Systems with YAMI4

9.3.5 Python

The Python publisher and subscriber are simple scripts that only require the visibility of the YAMI4 implementation files from the place where the scripts are started - please see the chapter about library structure and compilation for instructions on how to ensure this is the case.

Assuming that the environment is properly set up, the following invocation is enough to start the publisher program with the shortest possible target:

$ python3 publisher.py 'tcp://*:*'

The publisher program is entirely implemented in the publisher.py file, which is dissected in detail below.

First the necessary import statements:

import random
import sys
import time
import yami

Apart from the yami and sys packages, which were needed in the other examples as well, this program relies also on the random number generator and ability to pause for a defined period of time - these packages help to implement the ``domain'' part of the program.

The publisher verifies its command-line arguments:

if len(sys.argv) != 2:
    print("expecting one parameter: publisher destination")
    exit()

publisher_address = sys.argv[1]

The publisher is very similar in its structure to a typical server, as already presented in previous examples, but here an important new entity is the ValuePublisher object that encapsulates the subscription management for a single data source:

try:
    random_value = yami.ValuePublisher()

The random_value object declared above will be later registered as a regular YAMI4 object that can be seen by remote clients.

    with yami.Agent() as publisher_agent:
        resolved_address = publisher_agent.add_listener(
            publisher_address)

        print("The publisher is listening on", resolved_address)

The value publisher object is registered as a regular object. In this example the "random_number" is the name that will be seen by remote clients.

        publisher_agent.register_value_publisher(
            "random_number", random_value)

The publisher performs a simple work of generating random numbers every second and publishing them to the currently subscribed clients.

All values that are published need to be expressed in terms of the parameters object, which is consistent with the general YAMI4 data model. In this example a single random value is published as an integer entry in the parameters object:

        # publish random numbers forever
        while True:
            rnd = random.randint(0, 99)
            content = {"value":rnd}

            print("publishing value", rnd)

            random_value.publish(content)

The above operation is asynchronous just like any other message-sending operation. Internally, it scans the list of currently subscribed clients and sends them an update message with the given parameters object. The actual messages are sent in background.

The publisher's main loop involves one-second delay, but in practical programs data publishing need not be periodic.

            time.sleep(1)

The publisher concludes with basic exception handling.

except Exception as e:
    print("error:", e)

The subscriber program is implemented in a single subscriber.py file.

Subscribers have both client- and server-side properties in the sense that they need to be able to both send a subscription message to the publisher - which is the client-side activity - and to receive the updates - which makes them act like servers.

The example subscriber code starts with relevant imports:

import sys
import yami

To enable the subscriber to receive subscription updates, the appropriate message handler needs to be implemented. As with other servers, this is achieved with a simple global function named call(), although any other callable entity could be used as well.

The update handler simply prints the received value on the console:

def update_handler(message):
    content = message.get_parameters()
    value = content["value"]
    print("received update:", value)


The main part of the program obtains the publisher target from its command-line arguments and registers the message handler to enable proper routing of subscription updates.

An important property of this program is that even though it technically acts like a server (it receives updates via registered message handler), it does not have any listener. The listener is not needed here, because the subscription updates will be sent back using the same communication channel that is initially used for sending the subscription message - this is an example of reverting the channel's natural direction.

if len(sys.argv) != 2:
    print("expecting one parameter: publisher destination")
    exit()

publisher_address = sys.argv[1]

try:
    with yami.Agent() as subscriber_agent:

        # prepare subscription update callback

        update_object_name = "update_handler"

        subscriber_agent.register_object(
            update_object_name, update_handler)

Once the message handler is registered as a regular object, the subscription message can be sent to the publisher. It is important to properly inform the publisher of the intended object name where the updates are to be delivered - here, the object is registered with the name "update_handler" and this name is sent to the publisher as the "destination_object" entry in the subscription message's payload:

        # subscribe to the producer

        params = {"destination_object":update_object_name}

        subscriber_agent.send_one_way(publisher_address,
            "random_number", "subscribe", params)

        print("subscribed, waiting for updates")

Above, the message that is sent to publisher is named ``subscribe'', which is automatically recognized by the value publisher object. Another recognized name is ``unsubscribe'', which causes the given subscriber to be removed from the list.

After all these preparations, the subscriber is ready to receive updates and falls into a dummy input operation to leave the background threads drive the whole program's activity.

        # block forever and receive updates in background

        dummy = sys.stdin.read()

except Exception as e:
    print("error:", e)