How to build a fan-out messaging solution with RabbitMQ, using infrastructure as code and Python scripts
Hello, I am Marcos, and I wrote this article to demonstrate one type of messaging solution, called fan-out.
What is fan-out?
The fan-out pattern is a messaging solution in which messages are broadcast one-to-many. The main idea is that a single sender can deliver the same message to multiple receivers.
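Before bringing in a real broker, the idea can be sketched in a few lines of plain Python. This is only a toy model, not how RabbitMQ is implemented: the class name `FanoutExchange` and its methods are made up for illustration.

```python
from collections import deque


class FanoutExchange:
    """Toy in-memory exchange: every bound queue gets its own copy."""

    def __init__(self):
        self.queues = {}  # queue name -> deque of pending messages

    def bind(self, queue_name):
        # Binding registers a queue as a receiver of future messages.
        self.queues.setdefault(queue_name, deque())

    def publish(self, message):
        # The sender publishes once; the exchange copies to all queues.
        for pending in self.queues.values():
            pending.append(message)


exchange = FanoutExchange()
for name in ("queue-1", "queue-2", "queue-3"):
    exchange.bind(name)

exchange.publish("hello")
for name, pending in exchange.queues.items():
    print(name, list(pending))
```

The sender calls `publish` once and never touches the queues directly, which is the property the rest of this article relies on.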
We will use RabbitMQ to implement this solution. We can use this site to design and visualize the working solution:
What is RabbitMQ?
RabbitMQ is an open-source message broker that implements AMQP (Advanced Message Queuing Protocol). It has client libraries for many programming languages and lets you handle message traffic simply and reliably. It also ships with a native administration interface and is cross-platform.
How are we going to use it locally?
There are two options for using RabbitMQ locally: install it directly or run it in a Docker container. In this example, we are going to use Docker and create the infrastructure as code.
Creating a docker-compose.yaml file
version: '3'
services:
  rabbitmq:
    build:
      context: ./rabbitmq
      dockerfile: Dockerfile
    container_name: rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=password
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - ./conf/myrabbit.conf:/etc/rabbitmq/rabbitmq.config
    networks:
      - messaging
networks:
  messaging:
    driver: bridge
Now create a directory called rabbitmq with the following files inside it:
rabbitmq.conf
default_user = user
default_pass = password
listeners.tcp.default = 5672
management.tcp.port = 15672
management.load_definitions = /etc/rabbitmq/definitions.json
definitions.json
{
  "vhosts": [
    {
      "name": "${RABBITMQ_DEFAULT_VHOST}"
    }
  ],
  "users": [
    {
      "name": "user",
      "password": "password",
      "tags": "administrator"
    }
  ],
  "permissions": [
    {
      "user": "user",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "policies": [
    {
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "name": "lagoon-ha",
      "pattern": "${RABBITMQ_DEFAULT_HA_PATTERN}",
      "definition": {
        "ha-mode": "exactly",
        "ha-params": 2,
        "ha-sync-mode": "automatic",
        "ha-sync-batch-size": 5
      }
    }
  ],
  "queues": [
    {
      "name": "queue-1",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    },
    {
      "name": "queue-2",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    },
    {
      "name": "queue-3",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    }
  ],
  "exchanges": [
    {
      "name": "fanout_exchange",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "fanout_exchange",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "destination": "queue-1",
      "destination_type": "queue",
      "routing_key": "queue-1-rk",
      "arguments": {}
    },
    {
      "source": "fanout_exchange",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "destination": "queue-2",
      "destination_type": "queue",
      "routing_key": "queue-2-rk",
      "arguments": {}
    },
    {
      "source": "fanout_exchange",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "destination": "queue-3",
      "destination_type": "queue",
      "routing_key": "queue-3-rk",
      "arguments": {}
    }
  ]
}
Dockerfile
FROM rabbitmq:3.9-management
COPY definitions.json /etc/rabbitmq/definitions.json
COPY rabbitmq.conf /etc/rabbitmq/rabbitmq.conf
RUN cat /etc/rabbitmq/rabbitmq.conf
That’s all.
To explain what definitions.json does: the first sections set up the virtual host, the user, its permissions, and the policies.
{
  "vhosts":[
    #..
  ],
  "users": [
    #..
  ],
  "permissions":[
    #..
  ],
  "policies":[
    #..
  ]
The remaining sections create the queues, the exchange, and the bindings that connect them:
  "queues": [
    #..
  ],
  "exchanges": [
    #..
  ],
  "bindings": [
    #..
  ]
}
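Because the bindings refer to the exchange and the queues by name, a typo in definitions.json is easy to miss until the broker starts. As a rough sketch, the check below walks the bindings and reports any that point at an undeclared exchange or queue. The function name `check_bindings` and the `demo` vhost in the sample data are placeholders chosen for this example, not part of the original project.

```python
def check_bindings(definitions):
    """Return a list of problems found in the bindings section."""
    exchanges = {(e["vhost"], e["name"]) for e in definitions.get("exchanges", [])}
    queues = {(q["vhost"], q["name"]) for q in definitions.get("queues", [])}
    problems = []
    for b in definitions.get("bindings", []):
        if (b["vhost"], b["source"]) not in exchanges:
            problems.append(f"unknown exchange: {b['source']}")
        if b["destination_type"] == "queue" and (b["vhost"], b["destination"]) not in queues:
            problems.append(f"unknown queue: {b['destination']}")
    return problems


# Trimmed-down stand-in for definitions.json; "demo" is a placeholder vhost.
sample = {
    "exchanges": [{"name": "fanout_exchange", "vhost": "demo", "type": "fanout"}],
    "queues": [
        {"name": "queue-1", "vhost": "demo"},
        {"name": "queue-2", "vhost": "demo"},
    ],
    "bindings": [
        {"source": "fanout_exchange", "vhost": "demo",
         "destination": "queue-1", "destination_type": "queue"},
        {"source": "fanout_exchange", "vhost": "demo",
         "destination": "queue-2", "destination_type": "queue"},
    ],
}

print(check_bindings(sample))
```

To run it against the real file, load it with `json.load` and pass the resulting dictionary to the function.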
The resulting file structure is:
docker-compose.yaml
/rabbitmq/
--/definitions.json
--/Dockerfile
--/rabbitmq.conf
Now, we must run these two commands:
- docker-compose build
- docker-compose up -d
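The container takes a few seconds to start, so scripts that connect right after `docker-compose up -d` may fail. One possible way to handle this is a small helper that polls the AMQP port until it accepts connections. The helper name `wait_for_port` and its default timings are assumptions made for this sketch.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False


# Example call once the container is starting: wait_for_port("localhost", 5672)
```

An open port only shows that the broker is listening. It does not guarantee that the definitions have finished loading, so treat this as a first check rather than a full health probe.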
Now open http://localhost:15672 and log in with the credentials set in definitions.json (user / password). You can see that the queues and the exchange were created, and how the exchange was configured:
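The same information is available from the management plugin's HTTP API, which can be handy for scripted checks. The sketch below lists the queues of a vhost through the `/api/queues/{vhost}` endpoint using only the standard library. The helper names `queues_url` and `list_queues` are made up for this example, and the host and port defaults assume the compose file above.

```python
import base64
import json
import urllib.request
from urllib.parse import quote


def queues_url(vhost, host="localhost", port=15672):
    # The vhost is a path segment, so it must be percent-encoded.
    return f"http://{host}:{port}/api/queues/{quote(vhost, safe='')}"


def list_queues(vhost, user, password):
    """Return (name, message count) pairs for every queue in the vhost."""
    request = urllib.request.Request(queues_url(vhost))
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", f"Basic {token}")
    with urllib.request.urlopen(request) as response:
        queues = json.load(response)
    # "messages" may be missing until the broker has collected statistics.
    return [(q["name"], q.get("messages", 0)) for q in queues]


print(queues_url("${RABBITMQ_DEFAULT_VHOST}"))
```

With the broker running, `list_queues("${RABBITMQ_DEFAULT_VHOST}", "user", "password")` should return one entry per queue declared in definitions.json.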
Sending messages to the exchange using Python
Here is a simple script using pika:
import pika

credentials = pika.PlainCredentials('user', 'password')
# The vhost is the literal name declared in definitions.json.
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', 5672, '${RABBITMQ_DEFAULT_VHOST}', credentials)
)
channel = connection.channel()

message = 'test message sent'
# A fanout exchange ignores the routing key, so it can be left empty.
channel.basic_publish(exchange='fanout_exchange',
                      routing_key='',
                      body=message)
print(" [x] ", message)
connection.close()
After running the script a few times, all of the queues had received the messages.
The key point is that the producer sends a single message to the exchange, and the exchange delivers a copy to every bound queue. The producer does not know how many queues exist.
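This also explains why the producer can publish with an empty routing key even though each binding declares one: a fanout exchange ignores routing keys entirely. The sketch below contrasts that with a direct exchange. It is a simplified model of the routing rule rather than broker code, and the `route` function and the binding keys in it are modeled on definitions.json purely for illustration.

```python
def route(exchange_type, bindings, routing_key):
    """Return the queues a message reaches, given (queue, binding_key) pairs."""
    if exchange_type == "fanout":
        # Fanout ignores both the routing key and the binding keys.
        return [queue for queue, _ in bindings]
    if exchange_type == "direct":
        # Direct delivers only where the binding key equals the routing key.
        return [queue for queue, key in bindings if key == routing_key]
    raise ValueError(f"unsupported exchange type: {exchange_type}")


bindings = [
    ("queue-1", "queue-1-rk"),
    ("queue-2", "queue-2-rk"),
    ("queue-3", "queue-3-rk"),
]

print(route("fanout", bindings, ""))            # all three queues
print(route("direct", bindings, ""))            # no queue matches
print(route("direct", bindings, "queue-2-rk"))  # only queue-2
```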
Consuming messages with Python
I created three scripts, one to consume each queue:
queue-1
import os
import sys

import pika

def main():
    credentials = pika.PlainCredentials('user', 'password')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, '${RABBITMQ_DEFAULT_VHOST}', credentials)
    )
    channel = connection.channel()
    queueName = 'queue-1'
    channel.queue_declare(queue=queueName, durable=True)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    # auto_ack=True: messages are acknowledged as soon as they are delivered.
    channel.basic_consume(queue=queueName, on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
queue-2
import os
import sys

import pika

def main():
    credentials = pika.PlainCredentials('user', 'password')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, '${RABBITMQ_DEFAULT_VHOST}', credentials)
    )
    channel = connection.channel()
    queueName = 'queue-2'
    channel.queue_declare(queue=queueName, durable=True)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue=queueName, on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
queue-3
import os
import sys

import pika

def main():
    credentials = pika.PlainCredentials('user', 'password')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, '${RABBITMQ_DEFAULT_VHOST}', credentials)
    )
    channel = connection.channel()
    queueName = 'queue-3'
    channel.queue_declare(queue=queueName, durable=True)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue=queueName, on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
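Since the three scripts differ only in the queue name, they could be folded into one script that takes the queue as a command-line argument. The version below is a sketch of that idea, not the code from the repository: the function names `parse_queue_name` and `consume`, the default queue, and the file name `consumer.py` mentioned afterwards are all assumptions.

```python
import sys

# Literal vhost name, as used by the scripts in this article.
VHOST = "${RABBITMQ_DEFAULT_VHOST}"


def parse_queue_name(argv, default="queue-1"):
    # The first command-line argument selects the queue.
    return argv[1] if len(argv) > 1 else default


def consume(queue_name):
    import pika  # imported here so parse_queue_name works without pika

    credentials = pika.PlainCredentials("user", "password")
    parameters = pika.ConnectionParameters("localhost", 5672, VHOST, credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    def callback(ch, method, properties, body):
        print(f" [x] {queue_name} received {body!r}")

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    print(f" [*] Waiting for messages on {queue_name}. To exit press CTRL+C")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    connection.close()


# To use as a script, call: consume(parse_queue_name(sys.argv))
```

Saved as, say, `consumer.py` with the last line uncommented, it could be started three times in separate terminals, once for each of `queue-1`, `queue-2`, and `queue-3`.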
All three consumers produced the same result:
Looking at the admin page, you can see that the messages were consumed.
That’s it for today.
The code is here: https://github.com/mmarcosab/rabbitmq-fan-out