Introduction
Software development professionals are hooked on microservice architecture for its flexibility, scalability, and easier maintenance of applications. But when multiple data requests come in from independent services, it can quickly become overwhelming. That’s where message queues come into play—allowing the asynchronous exchange of data among numerous services while mitigating the risks of service failures and downtime mishaps. Redis streams emerge as a highly efficient message queue system specifically designed to meet the demands of microservice architectural design.
Redis stream features allow multiple producers to write messages that consumers can read in order through reliable microservice processing with auto-retention functionalities and acknowledgments.
In this article, we’ll demonstrate how Python coupled with Redis modules connects you to a Redis stream using queue writing operations enclosed within a Python class flexibly transformed into command-line interface applications. As you finish reading this piece, Redis streams will no longer be rocket science between us since we’ve equipped you with everything essential to implementing them efficiently in your future microservices projects! So what are you waiting for? Let’s dive right into it!
Redis streams
Have you heard about Redis streams? They’re an amazing data structure that lets you store and analyze multiple time-series events in real-time. You can use them to track all sorts of events, from user activity on a website to sensor data from IoT devices or messages in a chat system.
Each event in a Redis stream gets a unique ID, generated automatically by Redis. These IDs are sequential numbers that start at 0 and increase by 1 with every new event added to the stream. And once an event is added, it can’t be changed or removed, making Redis streams perfect for capturing and storing data over time.
But that’s not all! Redis streams also let you consume events in real-time. Consumers can read events from a stream by specifying a starting point, like the ID of the last event processed. And as new events are added, consumers can keep reading in real-time, allowing for instant data processing and analysis.
And if that’s not enough, Redis streams support consumer groups. Multiple consumers can read from the same stream, with each group maintaining its own position in the stream. This means you can balance the load and ensure fault tolerance for your data processing needs.
Redis streams are a powerful tool for capturing, storing, and analyzing time-series events. So why not give them a try? Your data will thank you!
Redis Stream Client Example
For the creation of the example project, we are going to use Poetry, but you can follow the example code with any tool capable of creating a Python virtual environment.
Also you can follow the code explanation cloning the code in the Github repository
The following code defines a class called RedisStreamReader
, which is responsible for consuming messages from a Redis stream using a consumer group:
import redis
class RedisStreamReader:
def __init__(self, stream_key, group_name, consumer_name,
server='localhost', port=6379):
self.redis_client = redis.Redis.from_url(f"redis://{server}:{port}")
self.stream_key = stream_key
self.group_name = group_name
self.consumer_name = consumer_name
# self.redis_client.xgroup_create(self.stream_key, self.group_name, mkstream=True)
self.consumer = self.redis_client.xreadgroup(
self.group_name,
self.consumer_name,
{self.stream_key: ">"})
def publish_message(self, message):
self.redis_client.xadd(self.stream_key, message)
def consume_messages(self, consumer_group, consumer_name,
last_id='>', count=1):
messages = self.redis_client.xreadgroup(
groupname=consumer_group,
consumername=consumer_name,
streams={self.stream_key: last_id},
count=count,
block=0
)
return messages
The above code imports the Redis Python library using import redis
.
Next, a class called RedisStreamReader
is defined. The purpose of this class is to provide a way to read and write to a Redis stream using a specified consumer group and consumer name.
On the __init__
method a connection is created to the Redis server, specify the stream key, group name, and consumer name, and the the consumer is initialized. The method takes the following parameters:
stream_key: The name of the Redis stream to read from.
group_name: The name of the consumer group that is associated with the stream.
consumer_name: The name of the consumer within the consumer group.
server: The Redis server hostname or IP address. The default value is “localhost”.
port: The Redis server port number. Redis default port is 6379.
Within the __init__
method, the Redis client is initialized using the redis.Redis.from_url
method, which takes a Redis server URL as its parameter. The stream key, group name, and consumer name are also set as instance variables.
After the initialization of the internal data, the xreadgroup
method is called on the Redis client object to initialize the consumer. This method takes tree parameters:
group_name: The name of the consumer group to read from.
consumer_name: The name of the consumer within the consumer group.
{self.stream_key: “>”}: A dictionary specifying the stream key to read from and the position to start reading from (in this case, “>” means to start reading from the latest message).
The publish_message
method is used to add a new message to the Redis stream. It takes a message parameter, which is a dictionary containing the message data. The method calls the xadd
method on the Redis client object to add the message to the stream.
The consume_messages
method is used to read messages from the Redis stream and it takes the following parameters:
consumer_group: The name of the consumer group to read from.
consumer_name: The name of the consumer within the consumer group.
last_id: The ID of the last message that was read. Default is “>”, meaning to start reading from the latest message.
count: The maximum number of messages to read. Default is 1.
block: The maximum amount of time to wait for new messages to arrive, in milliseconds. Default is 0, meaning to wait indefinitely.
The method calls the xreadgroup
method on the Redis client object to read messages from the stream. The xreadgroup
method takes the following parameters:
groupname: The name of the consumer group to read from.
consumername: The name of the consumer within the consumer group.
streams: A dictionary specifying the stream key to read from and the ID of the last message that was read.
count: The maximum number of messages to read.
block: The maximum amount of time to wait for new messages to arrive, in milliseconds.
The method returns a list of tuples, where each tuple contains the stream key and a list of messages. Each message is represented as a tuple containing the message ID and a dictionary of message data.
This is a very simple example of how to use the Python redis-py package to connect to a Redis server and consume messages from a stream, you can play a little with this code but we are going to play a little more with this knowledge and take more serious actions 😊
Create a Redis client and run a Redis server for testing our code
Now we are going to create a more real Redis stream consumer and test it with a Redis server running into a Docker container.
For this task we are create a Redis client with some more functionality to connect to a Redis server and then use the “Pytest” library to test the new Redis stream consumer.
The code for the new class is this:
import redis
class RedisClient:
def __init__(self, stream_key, group_name, consumer_name,
host='localhost', port=6379):
self.host = host
self.port = port
self.stream_key = stream_key
self.group_name = group_name
self.consumer_name = consumer_name
self.redis_client = redis.Redis(host=self.host,
port=self.port)
def is_connected(self):
return self.redis_client.ping()
def publish_message(self, message):
self.redis_client.xadd(self.stream_key, message)
def consume_messages(self, last_id='>', count=1):
messages = self.redis_client.xreadgroup(
groupname=self.group_name,
consumername=self.consumer_name,
streams={self.stream_key: last_id},
count=count,
block=0
)
return messages
def ack(self, message_id):
self.redis_client.xack(self.stream_key, self.group_name, message_id)
def nack(self, message_id):
self.redis_client.xack(self.stream_key,
self.group_name,
message_id,
False)
def close(self):
self.redis_client.close()
This is a Python class called RedisClient that provides methods for working with Redis streams.
At the top of the code, it imports the redis module, which is a Python client for Redis that allows Python programs to communicate with Redis servers.
This new RedisClient class has six methods. Let’s go through each one and shoot some explanation:
- __init__: This is the constructor method that initializes the RedisClient instance. It takes five arguments:
stream_key
, which is the name of the Redis stream;group_name
, which is the name of the consumer group;consumer_name
, which is the name of the consumer;host
, which is the hostname of the Redis server (default is ‘localhost’); andport
, which is the port number of the Redis server (default is 6379). The method also creates a Redis client object using theredis.Redis
function. - is_connected: This method simply checks if the Redis client is connected to the server by sending a ping command to the Redis server using the
ping
method. It returnsTrue
if the Redis client is connected, andFalse
otherwise. - publish_message: This method adds a new message to the Redis stream specified by
stream_key
param. It uses thexadd
method of the Redis client to add a new message to the stream. - consume_messages: This method reads messages from the Redis stream specified by
stream_key
using a consumer group specified bygroup_name
andconsumer_name
. The method reads messages starting from the ID specified bylast_id
, which defaults to ‘>’, that’s mean the most recent message arrived in the stream. It also readscount
messages, which defaults to 1. It uses thexreadgroup
method of the Redis client to read messages from the stream. - ack: This method acknowledges a message that has been processed by a consumer in the consumer group. It uses the
xack
method of the Redis client to acknowledge the message. - nack: This method rejects a message that has been processed by a consumer in the consumer group. It uses the
xack
method of the Redis client with theFalse
value for the parameter to reject the message. - Finally, the close method closes the Redis client connection using the also called
close
method of the Redis client.
Create some tests to see how this Redis service works.
We are now going to test the functionality of this new defined class, and for this task, a running Redis server is needed. The Redis server can be installed locally on your PC, but in this little tutorial, we are going to use a Docker container running a Redis instance.
We download the Docker official image in this way:
docker pull redis
After that, a Redis image is ready to use in our PC. For running the Redis server run a new container based on that image with the following command:
docker run --name redis-server -p 6379:6379 -d redis
This command creates a container named “my-redis” using the downloaded Redis image, forwards the Redis port 6379 from the container to the host’s 6379 port, and runs the container in detached mode (-d option).
You can now connect to the Redis server that is running in the Docker container from your local PC by using the Redis client and connecting to “localhost:6379”. We recommend the use of “RedisInsight” a Redis graphical user client, but you can use the same Docker image for connecting to the running container server with the following command:
docker run -it --network some-network --rm redis redis-cli -h redis-server
docker stop my-redis && docker rm my-redis
Testing the client code
These tests cover the basic functionality of the Redis client, ensuring that messages can be published to the stream and consumed by a consumer group.
The following provided code is a set of unit tests written using the “Pytest” testing framework. The tests are designed to test the functionality of the RedisClient
class, which is defined in the simple_redisclient.py
module.
import pytest
from redis_stream.redis_client import RedisClient
stream_name = 'mystream'
consumer_group = 'group1'
consumer_name = 'consumer1'
@pytest.fixture
def redis_client():
return RedisClient(stream_name, group_name=consumer_group,
consumer_name=consumer_name,
host='localhost', port=6379)
def test_publish_message(redis_client):
# Clear all the data in the test stream
# Use XTRIM to remove all messages from the stream
redis_client.redis_client.xtrim(stream_name, maxlen=0)
message = {'name': 'Bob', 'age': '25'}
redis_client.publish_message(message)
result = redis_client.redis_client.xread({redis_client.stream_key: 0},
count=1)
res_msg = {key.decode('utf-8'): value.decode('utf-8')
for key, value in result[0][1][0][1].items()}
assert res_msg == message
def test_consume_messages(redis_client):
# consumer_group = 'group1'
# consumer_name = 'consumer1'
last_id = '>'
count = 3
redis_client.redis_client.xtrim(stream_name, maxlen=0)
# publish some messages to the stream
messages = [
{'name': 'Charlie', 'age': '35'},
{'name': 'David', 'age': '40'},
{'name': 'Eve', 'age': '45'}
]
for message in messages:
redis_client.publish_message(message)
# consume messages from the stream
result = redis_client.consume_messages(
# consumer_group,
# consumer_name,
last_id=last_id,
count=count)
res_message = {key.decode('utf-8'): value.decode('utf-8')
for key, value in result[0][1][0][1].items()}
assert len(result) == 1
assert len(result[0][1]) == count
assert res_message == messages[0]
The first test, test_publish_message
, tests the publish_message
method of the RedisClient
class. This method is used to publish (or produce) a message to a Redis stream. In the test, the stream is first cleared using the xtrim
method, and then a message is published using the publish_message
method. The test then reads the message from the stream using the xread
method and checks that the message is the same as the one that was published.
The second test, test_consume_messages
, tests the consume_messages
method of the RedisClient
class. This method is used to consume messages from a Redis stream using a consumer group. In the test, the stream is first cleared using the xtrim
method, and then three messages are published to the stream using the publish_message
method. The test then consumes the messages from the stream using the consume_messages
method and checks that the messages are the same as the ones that were published. It’s important to note that the xtrim
method employed clears the stream structure and assure that only the future messages inserted for the test remain in the stream, for further retrieving the same messages from the read operation (consumer operation).
Both tests use the assert statement to check that the expected results are returned. If the expected results are not returned, the tests will fail.
The test then is invocated with the calling the PyTest command bellow:
pytest
Pytest will find the defined tests into the “tests” folder into the “Poetry” project structure and run all of them.
The output in the console must be similar to this:
==================================================================== test session starts =====================================================================
platform linux -- Python 3.10.6, pytest-7.3.1, pluggy-1.0.0
rootdir: /home/mark/redis_stream/redis_stream
collected 2 items
tests/test_redis_stream_client.py .. [100%]
===================================================================== 2 passed in 0.03s ======================================================================
Create a little server to run our Redis stream consumer.
To call the above code in a CLI application, you can create an instance of the RedisStreamReader
class with the appropriate parameters (we provide some example), and then use its methods to read and acknowledge messages from the stream. For example:
import argparse
from redis_stream.redis_client import RedisClient
default_data = {
'stream_key': 'mystream',
'group_name': 'group1',
'consumer_name': 'consumer1'
}
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--stream', default=default_data['stream_key'],
help='Redis stream key to read from')
parser.add_argument('--group', default=default_data['group_name'],
help='Consumer group name')
parser.add_argument('--consumer', default=default_data['consumer_name'],
help='Consumer name')
parser.add_argument('--server', default='localhost',
help='Redis server host')
parser.add_argument('--port', default=6379, type=int,
help='Redis server port')
return parser.parse_args()
def main():
args = parse_args()
redis_client = RedisClient(args.stream, args.group, args.consumer,
host=args.server, port=args.port)
try:
if redis_client.is_connected():
print('Connected to Redis Server')
while True:
for message in redis_client.consume_messages(last_id='>'):
# process the message
print(f'Recived message: {message}')
else:
print('Error: Redis client is not connected')
except KeyboardInterrupt:
print('\nKeyboard Interrupt: Exiting...')
redis_client.close()
exit()
if __name__ == '__main__':
main()
In this example, we define a main
function that parses the command line arguments using the “argparse” module, creates an instance of RedisStreamReader
, and then loops through the messages returned by the read
method, processing each message and acknowledging it using the ack
method. If the Redis client is not connected, an error message is printed.
To run the CLI application, you can execute the Python script with the appropriate command line arguments. For example:
python redis_stream/main.py
Now you can insert some messages to the stream and see how the messages are delivered to the consumer. For example we can send some messages with the Redis client …
XADD mystream * msg hello
… and the result will be displayed into the console:
Recived message: [b'mystream', [(b'1681988838361-0', {b'msg': b'hello1'})]]
Conclusion
After delving into the topic of Redis streams and Python clients, it’s clear that the benefits of using these technologies in tandem are truly remarkable. Our team has conducted rigorous testing and development to bring forth a prime example of an effective server that utilizes this functionality.
By tapping into the strength of Redis streams and Python, we’ve been able to devise an efficient and dependable system for handling incoming data. The beauty of this approach lies in its ability to allow for instant processing and analysis of data streams, making it the perfect solution for data analytics, IoT applications, and real-time communication.
The marriage of Redis streams and Python is truly a match made in heaven when it comes to managing data streams and constructing practical solutions for contemporary data-driven applications. Having this knowledge at hand, developers can confidently incorporate this technology into their own projects and fully harness the power of real-time data management.
In future articles we are going to provide more concise examples of how to use this technology in the creation of the event driven microservices architectures.