DEV Community

Cover image for Pub/sub with PyZMQ: Part 1
Muhammad Syuqri
Muhammad Syuqri

Posted on • Updated on

Pub/sub with PyZMQ: Part 1

This series of guides will go into the usage of the publish-subscribe pattern with PyZMQ. I will start off with simple examples and then go into some of the use cases that I have applied using this pattern. You will find Gists with sample code to get you started with.

Here are the topics I will go through for this guide:

What is publish/subscribe?

It is a messaging pattern that is meant for allowing senders to send their messages to recipients without having to know who their recipients are. The senders are known as publishers while the recipients are known as subscribers.

A simple analogy for this pattern would be a convention. A convention can be held in a large hall. In the hall there may be multiple sections for speakers to give their talks at. A speaker can give his/her talk regardless of whether or not anyone is listening. As attendees, we can roam around and drop in at any of these sections to listen to any speaker. When we sit in at a particular section, we are only listening to a single speaker. The speakers in this analogy refer to the publishers, while attendees refer to the subscribers.

In this guide, I will be covering a one-to-one publish/subscribe pattern, which simplifies the explanation process for now.

What is PyZMQ?

PyZMQ is the Python-based binding for the popular open-source ZeroMQ (ØMQ/ZMQ) messaging library. ZMQ comes with no messaging brokers on the side, meaning that there is no intermediary server/module which is needed to translate the messages sent between sender and recipient to a uniform messaging protocol. This makes ZMQ lightweight and easy to get started with.

ZMQ also comes with support for multiple messaging patterns apart from pubsub and programming languages, allowing cross-language communication. However, this guide will only be focusing on the pubsub pattern in Python.

Getting started

Install the pyzmq package

pip install pyzmq

We can then proceed to write a simple publisher to start sending out messages.

# simple_pub.py
import zmq

host = "127.0.0.1"
port = "5001"

# Creates a socket instance
context = zmq.Context()
socket = context.socket(zmq.PUB)

# Binds the socket to a predefined port on localhost
socket.bind("tcp://{}:{}".format(host, port))

# Sends a string message
socket.send_string("hello")

The publisher script will run and send a single message with the string "hello" and exit.

The subscriber code can be written as such:

# simple_sub.py
import zmq

host = "127.0.0.1"
port = "5001"

# Creates a socket instance
context = zmq.Context()
socket = context.socket(zmq.SUB)

# Connects to a bound socket
socket.connect("tcp://{}:{}".format(host, port))

# Subscribes to all topics
socket.subscribe("")

# Receives a string format message
socket.recv_string()

The subscriber code is very similar to the publisher one. The only difference is that the subscriber will now connect to the publisher socket to receive messages from the publisher. The subscriber can also subscribe to certain topics, but for the example above, we will subscribe to all topics indicated by the empty string.

Open up a terminal. If you try to run the simple_sub code now, you will see that the code execution is blocked. This is because socket.recv_string() is blocking the execution, waiting for a message to be received. You cannot interrupt the execution using the ctrl+Z command as well. As such, it can be quite troublesome to always have to close the terminal window to force the code to exit or run the publisher code to ensure the completion of the subscriber code. Let's use either of these simple solutions for now.

Open up a separate terminal. When you run simple_pub, you might expect that the subscriber would receive the message. However, this will not be the case as bind() executes in an asynchronous manner. This means that the entire script actually finishes even before the bind() statement manages to complete.

Let's edit simple_pub to include a sleep statement, which is a simplified solution to this problem. This way, the asynchronous bind() should run to completion before the publishing of the message.

# simple_pub.py

# ...

# Binds the socket to a predefined port on localhost
socket.bind("tcp://{}:{}".format(host, port))

time.sleep(1) # new sleep statement

# Receives a string format message
socket.send_string("hello")

Let's run the publisher code again.

Now we can see that the code execution in simple_sub has run to completion and printed out the word hello in string format. If we use the generic socket.recv(), we will end up with a bytes type.

>>> socket.recv_string()
hello
>>> socket.recv()
b'hello'

It really depends on the use case as PyZMQ provides a few functions to send and receive different serialized/encoded messages. These include:

  • send_json/recv_json
  • send_multipart/recv_multipart
  • send_pyobj/recv_pyobj
  • send_serialized/recv_serialized

More details on each of these can be found in the PyZMQ Socket docs.

Specifying topics

In the pubsub pattern, subscribing to a topic means that you only listen in for particular messages that you are interested in and disregard the rest.

Using the convention analogy again, let's say you would like to only listen to certain segments of speaker A's presentation. You are only interested in Machine Learning and Blockchain topics, but find other topics a bore. Whenever the speaker mentions Machine Learning, you take down the notes related to that topic. Whenever the speaker mentions Blockchain, you take down the notes related to that topic. Whenever the speaker mentions Dev.to, you close your notebook and ignore whatever he/she says. This is what subscribing to a topic in the pubsub pattern equates to.

To do this with PyZMQ, you can use socket.subscribe(). Previously, we have subscribed to all topics by using .subscribe(""). From the high-level perspective, the function allows us to subscribe to a topic. What we do not see are the inner workings of ZMQ, which actually creates a filter to sieve the messages that we are interested in. I will not elaborate too much on this as it can be found here for your leisurely read.

Let us now only subscribe to the topic "ML".

# simple_sub.py

# ...

# Subscribes to all topics
socket.subscribe("ML")

# Receives a string format message
socket.recv_string()

This means that any message published starting with "ML" will be received by the subscriber. So, we have to alter the messages sent by the publisher to achieve this.

# simple_pub.py

# ...

# Sends a string message
socket.send_string("ML hello")

When we run the code, we can see that the message is received by the subscriber. To demonstrate that all other topics are filtered out, you can try changing the "Dev.to" in the subscriber code to any other characters and you will see that the code execution for the subscriber will be blocked again, indicating that it has not received messages based on the topics it has subscribed to.

So now, we have the topic and the actual message that we want combined together into a single message that is sent from the publisher. We can use .split() to separate the topic and actual message in the subscriber.

# simple_sub.py

# ...

whole_message = socket.recv_string()
topic, actual_message = whole_message.split(" ")

Alternatively, we can use send_multipart() and recv_multipart() accordingly, which will be explained next.

Multipart messages

Multipart messages are sent from the publisher by including all of your message parts together with a flag which indicates more message parts are to follow. When we were using send_string(), we were only sending a single message disguised as two parts separated by a space.

Now, we shall attempt to use send_multipart() in the publisher to send two separate parts so that the topic and message can be distinguished without using any split().

# multipart_pub.py
import zmq
import time

host = "127.0.0.1"
port = "5001"

# Creates a socket instance
context = zmq.Context()
socket = context.socket(zmq.PUB)

# Binds the socket to a predefined port on localhost
socket.bind("tcp://{}:{}".format(host, port))

time.sleep(1)

# Sends a multipart message
socket.send_multipart([b"ML", b"hello"])

Note: When using .send() and .send_multipart(), the message/message parts need to be bytes type respectively. Previously when we were using send_string(), the function helped us encode the string into bytes.

# multipart_sub.py
import zmq

host = "127.0.0.1"
port = "5001"

# Creates a socket instance
context = zmq.Context()
socket = context.socket(zmq.SUB)

# Connects to a bound socket
socket.connect("tcp://{}:{}".format(host, port))

# Subscribes to all topics
socket.subscribe("ML")

# Receives a multipart message
print(socket.recv_multipart())

After running both scripts together, we can see the output of the execution in multipart_sub:

>>> print(socket.recv_multipart())
[b'ML', b'hello']

The first element in the list is the topic while the second one is the message part which we are interested in.

You can also craft your own multipart message by using the zmq.SNDMORE flag.

# multipart_pub.py

# ...

# Sends the first part
socket.send_string("ML", flags=zmq.SNDMORE)
# Sends the second part
socket.send_string("hello")

There are no flags applied to the second message because the default flag is 0, which indicates that there are now more parts following the message part being sent. The subscriber will still receive the same multipart message as shown previously.

In a more realistic scenario, you might want to send a JSON instead so that key/value retrieval can be done. To do this, simply replace send_string() in the second part with send_json(). send_json() will automatically serialize the dictionary that you pass to it into JSON format.

# multipart_pub.py

# ...

# Sends the first part
socket.send_string("ML", flags=zmq.SNDMORE)
# Sends the second part
socket.send_json({"message": "hello"})

The end result on the subscriber end would be as follows:

>>> print(socket.recv_multipart())
[b'ML', b'{"message":"hello"}']

To ease the deserialization of the JSON back into a Python dictionary, we can use the recv_json() function instead. We can replace the recv_multipart() with two lines of code as follows:

# print(socket.recv_multipart()) 

print(socket.recv_string())
print(socket.recv_json())

Which will then yield the following results:

ML
{'message': 'hello'}

Future Guides

I hope that this post serves as a good starting point for anyone who might be keen on using the pubsub pattern with PyZMQ. For this post, I have mainly focused on the following:

  • Single publisher to single subscriber messaging
  • Subscribing to topics
  • Sending/receiving multipart messages

For the next post, I do hope to cover the following to come up with solutions with some of the limitations touched on in this post:

  • 1-to-N/N-to-1 publisher/subscriber messaging
  • N-to-N publisher/subscriber messaging
  • Using Poller to prevent code execution blocking

Here are the links for the codes:

Thanks for reading! I do hope to learn from your feedback and comments, so feel free do drop them below.

Top comments (4)

Collapse
 
dminor profile image
Darsh Shukla

Awesome 🔥. Well explained. I will try it soon.
I tried to implement the minimalist conceptual architecture of pub/sub using python coroutine.
Have a look : dev.to/dminor/python-part-2-genera...

Collapse
 
limbumanshi profile image
Manshi Limbu

Great article. Waiting for the Part 2 !

Collapse
 
dansyuqri profile image
Muhammad Syuqri

Thank you for reading! ☺️

Collapse
 
marcoguarddog profile image
MarcoGuardDog

You are amazing, thanks alot good man