DEV Community

Pawel Suchanecki


How to Implement the Producer-Consumer Concurrency Design Pattern with asyncio Coroutines

The Producer-Consumer concurrency design pattern is widely used in concurrent programming: a producer generates data and a consumer processes it. The two components run concurrently, and data is passed between them through a buffer or a queue.

In this article, we demonstrate how to use asyncio coroutines to implement the Producer-Consumer pattern, with a practical example: monitoring a log file on a remote machine over an SSH connection. We start by implementing the pattern with the yield statement (an async generator), then modify the code to use a queue for inter-coroutine communication.

Note: the example requires password-less sudo on the remote host (server), because the nginx log file we monitor is not publicly accessible.

Using yield Statement

from fabric import Connection
import asyncio
import datetime

host = '1337-demo-host.org'
user = 'almalinux'
key_filename = '/Users/user/.ssh/id_rsa'
log_file = '/var/log/nginx/access.log'
delay = 5          # seconds between polls
buffer_lines = 5   # number of trailing log lines fetched per poll

async def producer():
    # Async generator: yields each log line not seen in the previous poll.
    conn = Connection(host, user=user, connect_kwargs={'key_filename': key_filename})
    # Using the connection as a context manager closes it when the generator exits.
    with conn, conn.cd('/tmp'):
        tail_cmd = f'sudo tail -n {buffer_lines} {log_file}'
        old_data = []
        while True:
            # conn.run() is synchronous: the event loop is blocked while it runs.
            result = conn.run(tail_cmd, hide='both')
            data = result.stdout.splitlines()
            if old_data != data:
                old_d = set(old_data)
                delta_data = [x for x in data if x not in old_d]
                for d in delta_data:
                    yield d
            old_data = data
            # Non-blocking pause (time.sleep() would stall the event loop).
            await asyncio.sleep(delay)

async def consumer():
    # Drive the generator and print each new line with a local timestamp.
    async for line in producer():
        dt = datetime.datetime.now()
        print(f"TS: {dt}\n{line}", flush=True)

async def main():
    await asyncio.gather(consumer())

if __name__ == '__main__':
    asyncio.run(main())

To make the script asynchronous, we use asyncio, a concurrency framework based on coroutines that lets us write asynchronous code that reads like synchronous code. Coroutines are defined with async def and wait on other operations with await.

In our script, the producer generates data by executing the tail command on the remote machine over an SSH connection. Because it contains a yield statement inside an async def function, it is an async generator: each new log line is yielded to whoever iterates over it. The consumer coroutine iterates over the producer with async for and prints each new log line along with a timestamp.

Note that asyncio.gather() receives only one coroutine here, consumer(); the producer is not scheduled on its own but is driven by the consumer's async for loop. The two sides are therefore tightly coupled: the producer advances only when the consumer asks for the next line. The await keyword in main() waits for the consumer to finish, which in this script happens only when the program is interrupted.
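The same shape can be tried without a remote host. The following is a minimal sketch, not the script above: produce_lines() and collect() are hypothetical names, and a counter stands in for the SSH-backed tail command.

```python
import asyncio

async def produce_lines(count: int, delay: float = 0.01):
    """Async generator standing in for the SSH-backed producer (hypothetical)."""
    for n in range(count):
        await asyncio.sleep(delay)  # non-blocking pause between items
        yield f"line {n}"

async def collect(count: int) -> list:
    """Consumer: pulls items from the generator with `async for`."""
    received = []
    async for line in produce_lines(count):
        received.append(line)
    return received

if __name__ == "__main__":
    print(asyncio.run(collect(3)))
```

Here the generator ends after count items, so the consumer's loop ends too; in the log-monitoring script the generator never ends on its own.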

About Fabric

We used Fabric to establish an SSH connection to the remote machine, execute a remote command, and retrieve its output. Fabric is a Python library that simplifies SSH connections and remote command execution on multiple machines, providing a higher-level API for common tasks.
More on Fabric can be found at the project's website.

Using asyncio.Queue()

In this version, we replace the yield statement with an asyncio.Queue() object for inter-coroutine communication.

As before, the producer coroutine generates data by executing the tail command on the remote machine over an SSH connection, but this time it puts new log lines into the queue. The consumer coroutine reads data from the queue and prints the new log lines along with a timestamp.

queue.put() and queue.get() are coroutines, so both are awaited: await queue.put() adds a new log line to the queue, and await queue.get() waits until a line is available and returns it. queue.task_done() tells the queue that the consumer has finished processing the line it received.

Using a queue decouples the two sides: the producer can keep adding lines while the consumer is still processing earlier ones, and neither needs a reference to the other. asyncio.Queue() is the standard way to pass data between coroutines. Both coroutines run concurrently on a single event loop, taking turns at each await, rather than in parallel threads.
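Before the full script, here is a stripped-down sketch of the queue mechanics on their own. The names produce(), consume(), and run_pipeline() are hypothetical, a plain list stands in for the log lines, and the maxsize=2 bound and the queue.join() shutdown are illustrative choices that the script below does not use.

```python
import asyncio

async def produce(queue: asyncio.Queue, items: list) -> None:
    for item in items:
        await queue.put(item)      # waits here while the queue is full

async def consume(queue: asyncio.Queue, seen: list) -> None:
    while True:
        item = await queue.get()   # waits here until an item is available
        seen.append(item)
        queue.task_done()          # marks this item as fully processed

async def run_pipeline(items: list) -> list:
    queue = asyncio.Queue(maxsize=2)   # small bound, so a fast producer must wait
    seen = []
    consumer_task = asyncio.create_task(consume(queue, seen))
    await produce(queue, items)
    await queue.join()             # returns once every item has been task_done()
    consumer_task.cancel()         # the consumer loops forever, so stop it explicitly
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass
    return seen

if __name__ == "__main__":
    print(asyncio.run(run_pipeline(["a", "b", "c"])))
```

queue.task_done() only has a visible effect when something waits on queue.join(), as run_pipeline() does here. The full script follows the same put/get/task_done shape but runs until interrupted.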

from fabric import Connection
import asyncio
import datetime

host = '1337-demo-host2.org'
user = 'almalinux'
key_filename = '/Users/user/.ssh/id_rsa'
log_file = '/var/log/nginx/access.log'
delay = 5          # seconds between polls
buffer_lines = 5   # number of trailing log lines fetched per poll

async def producer(queue: asyncio.Queue):
    conn = Connection(host, user=user, connect_kwargs={'key_filename': key_filename})
    # Using the connection as a context manager closes it when the coroutine exits.
    with conn, conn.cd('/tmp'):
        tail_cmd = f'sudo tail -n {buffer_lines} {log_file}'
        old_data = []
        while True:
            # conn.run() is synchronous: the event loop is blocked while it runs.
            result = conn.run(tail_cmd, hide='both')
            data = result.stdout.splitlines()
            if old_data != data:
                # Queue only the lines that were not in the previous snapshot.
                old_d = set(old_data)
                delta_data = [x for x in data if x not in old_d]
                for d in delta_data:
                    await queue.put(d)
            old_data = data
            # Non-blocking pause: the consumer runs while the producer sleeps.
            await asyncio.sleep(delay)

async def consumer(queue: asyncio.Queue):
    while True:
        line = await queue.get()   # waits until the producer queues a line
        dt = datetime.datetime.now()
        print(f"TS: {dt}\n{line}", flush=True)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    # Schedule both coroutines on the event loop; neither returns on its own.
    await asyncio.gather(producer(queue), consumer(queue))

if __name__ == '__main__':
    asyncio.run(main())


Overall, this second version demonstrates how to use a queue for inter-coroutine communication in the Producer-Consumer pattern with asyncio coroutines, here to monitor a log file on a remote machine. The use case is trivial, but the technique applies to a wide range of concurrent programming problems, which makes it a useful tool for building scalable systems.
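One caveat applies to both scripts: conn.run() is a synchronous call, so the event loop cannot switch to another coroutine while the SSH command is running. A possible way around this, sketched here under the assumption of Python 3.9 or later, is asyncio.to_thread(), which runs a blocking function in a worker thread. The names blocking_fetch(), ticker(), and demo() are hypothetical, and time.sleep() stands in for the remote command rather than a real Fabric call.

```python
import asyncio
import time

def blocking_fetch(n: int) -> str:
    """Hypothetical stand-in for a blocking call such as conn.run()."""
    time.sleep(0.05)
    return f"result {n}"

async def ticker(ticks: list) -> None:
    """Appends a timestamp every 10 ms; it only runs if the loop is not blocked."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)

async def demo() -> tuple:
    ticks = []
    ticker_task = asyncio.create_task(ticker(ticks))
    # The blocking function runs in a worker thread, so ticker() keeps running.
    result = await asyncio.to_thread(blocking_fetch, 1)
    ticker_task.cancel()
    try:
        await ticker_task
    except asyncio.CancelledError:
        pass
    return result, len(ticks)

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If blocking_fetch() were called directly inside demo(), the ticker would not get a chance to run until the call returned.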

Example:

# python3 producer-consumer-tail-logfile-over-ssh-with-queue.py
TS: 2023-02-25 01:12:23.705987
XXX.XX.XX.XX - - [24/Feb/2023:23:50:35 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706113
XXX.XX.XX.XX - - [24/Feb/2023:23:50:36 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706146
XXX.XX.XX.XX - - [24/Feb/2023:23:50:58 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706171
XXX.XX.XX.XX - - [25/Feb/2023:00:03:59 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706193
XXX.XX.XX.XX - - [25/Feb/2023:00:04:02 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:33.965717
XXX.XX.XX.XX - - [25/Feb/2023:00:12:29 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:18:53.170400
XXX.XX.XX.XX - - [25/Feb/2023:00:18:52 +0000] "RAW / HTTP/1.1" 404 146 "-" "Mozilla/5.0 zgrab/0.x" "-"
TS: 2023-02-25 01:30:35.124575
XXX.XX.XX.XX - - [25/Feb/2023:00:30:31 +0000] "GET /.env HTTP/1.1" 404 146 "-" "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50" "-"
TS: 2023-02-25 01:36:13.271236
XXX.XX.XX.XX - - [25/Feb/2023:00:36:12 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:106.0) Gecko/20100101 Firefox/106.0" "-"
TS: 2023-02-25 01:37:40.304185
XXX.XX.XX.XX - - [25/Feb/2023:00:37:38 +0000] "GET /0bef HTTP/1.0" 400 248 "-" "-" "-"

UPDATE:

delta_data = [x for x in data if x not in old_d] ?

This is a Python list comprehension that creates a new list called delta_data from two inputs: the list data and the set old_d, which is built from the previous snapshot (old_data). The resulting list contains the elements that are present in data but not in old_d. This way, only new lines are put into the queue (queue.put()).

Breakdown of the code

delta_data: the new list we are creating.
data: the input list from which we will extract elements.
old_d: the reference set (the previous lines) that we compare against.
x: a ("temporary") variable that will hold each element of data as we iterate through it.
for x in data: a loop that iterates through each element of data.
if x not in old_d: a condition that checks whether the element x is not in old_d.
[x for x in data if x not in old_d]: the list comprehension that creates a new list delta_data by filtering data based on the above condition.

Summary

Overall, this code finds the elements that have been added to a list (data) since the last time it was checked, by comparing it to a reference set (old_d).

The resulting delta_data list contains only the new lines from the monitored log file, which is what we need to avoid repeating messages. It also means that nothing is put into the queue when no new data has appeared since the last check.
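The filtering step can be tried in isolation. This is a small sketch with made-up sample data; new_lines() is a hypothetical helper that wraps the same comprehension.

```python
def new_lines(data: list, old_data: list) -> list:
    """Return the lines in `data` that were not present in `old_data`."""
    old_d = set(old_data)  # set membership checks are O(1) on average
    return [x for x in data if x not in old_d]

previous = ["a", "b", "c"]
current = ["b", "c", "d", "e"]
print(new_lines(current, previous))   # ['d', 'e']
print(new_lines(current, current))    # []
```

Because the comparison is by line text, a new log line that is character-for-character identical to one in the previous snapshot is not reported.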

Top comments (1)

Pawel Suchanecki

Hi again! 👊

In case you are wondering about this line:

delta_data = [x for x in data if x not in old_d]

Please check the update: a new paragraph with an explanation was added!

Pawel