How to build a fan-out messaging solution with RabbitMQ, using infrastructure as code and Python scripts

Marcos
5 min read · Jun 3, 2023


Hello, I am Marcos, and I wrote this text to show a type of messaging solution called fan-out.

What is fan-out?

The fan-out pattern is a messaging solution where messages are broadcast in a one-to-many fashion. The main idea is that a single sender can deliver the same message to multiple receivers.
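To make the one-to-many idea concrete before touching RabbitMQ, here is a broker-free toy model of a fanout exchange in plain Python. This is only an illustration of the pattern's semantics, not how RabbitMQ is implemented:

```python
from collections import defaultdict

class FanoutExchange:
    """Toy model: every queue bound to the exchange gets a copy of each message."""

    def __init__(self):
        self.queues = defaultdict(list)

    def bind(self, queue_name):
        # Binding just registers the queue; it starts out empty.
        self.queues[queue_name]

    def publish(self, message):
        # Fan-out: one publish, one copy delivered to every bound queue.
        for queue in self.queues.values():
            queue.append(message)

exchange = FanoutExchange()
exchange.bind('queue-1')
exchange.bind('queue-2')
exchange.bind('queue-3')
exchange.publish('hello')

print({name: msgs for name, msgs in exchange.queues.items()})
```

The producer calls publish once, yet all three queues end up with their own copy of the message. That is exactly the behavior we will now reproduce with a real broker.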

We will use RabbitMQ to implement this solution. Online simulators can help you design and visualize this kind of topology before building it.

What is RabbitMQ?

RabbitMQ is an open-source messaging service that makes use of the AMQP (Advanced Message Queuing Protocol) protocol. Rabbit is compatible with many programming languages and allows you to handle message traffic simply and reliably. It is worth mentioning that it also has a native administration interface and is cross-platform.

How are we going to use it locally?

There are two options for using RabbitMQ locally: install it directly or run it in a Docker container. In this example, we are going to use Docker and create the infrastructure using infrastructure as code.

Creating a docker-compose.yaml file

version: '3'

services:
  rabbitmq:
    build:
      context: ./rabbitmq
      dockerfile: Dockerfile
    container_name: rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=user
      - RABBITMQ_DEFAULT_PASS=password
    ports:
      - "5672:5672"
      - "15672:15672"
    networks:
      - messaging

networks:
  messaging:
    driver: bridge

Now, create a directory called rabbitmq and the following files inside it:

rabbitmq.conf

default_user = user
default_pass = password
listeners.tcp.default = 5672
management.tcp.port = 15672
management.load_definitions = /etc/rabbitmq/definitions.json

definitions.json

{
  "vhosts": [
    {
      "name": "${RABBITMQ_DEFAULT_VHOST}"
    }
  ],
  "users": [
    {
      "name": "user",
      "password": "password",
      "tags": "administrator"
    }
  ],
  "permissions": [
    {
      "user": "user",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "policies": [
    {
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "name": "lagoon-ha",
      "pattern": "${RABBITMQ_DEFAULT_HA_PATTERN}",
      "definition": {
        "ha-mode": "exactly",
        "ha-params": 2,
        "ha-sync-mode": "automatic",
        "ha-sync-batch-size": 5
      }
    }
  ],
  "queues": [
    {
      "name": "queue-1",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    },
    {
      "name": "queue-2",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    },
    {
      "name": "queue-3",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    }
  ],
  "exchanges": [
    {
      "name": "fanout_exchange",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "fanout_exchange",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "destination": "queue-1",
      "destination_type": "queue",
      "routing_key": "queue-1-rk",
      "arguments": {}
    },
    {
      "source": "fanout_exchange",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "destination": "queue-2",
      "destination_type": "queue",
      "routing_key": "queue-2-rk",
      "arguments": {}
    },
    {
      "source": "fanout_exchange",
      "vhost": "${RABBITMQ_DEFAULT_VHOST}",
      "destination": "queue-3",
      "destination_type": "queue",
      "routing_key": "queue-3-rk",
      "arguments": {}
    }
  ]
}

Dockerfile

FROM rabbitmq:3.9-management

COPY definitions.json /etc/rabbitmq/definitions.json
COPY rabbitmq.conf /etc/rabbitmq/rabbitmq.conf

# Optional: print the config during the build to confirm it was copied correctly
RUN cat /etc/rabbitmq/rabbitmq.conf

That’s all.

To explain what is happening here: first, we set up the vhost, users, permissions, and policies:

"vhosts":[
#..
],
"users": [
#..
],
"permissions":[
#..
],
"policies":[
#..
]

And here we create the queues, exchanges, and bindings:

"queues": [
#..
],
"exchanges": [
#..
],
"bindings": [
#..
]
}
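A typo in definitions.json usually only shows up when the broker starts. As an optional sanity check (this helper is my addition, not part of the original project), a few lines of Python can verify that every binding references a declared exchange and queue:

```python
import json

def check_bindings(defs):
    """Return a list of bindings that reference an undeclared exchange or queue."""
    exchanges = {e['name'] for e in defs.get('exchanges', [])}
    queues = {q['name'] for q in defs.get('queues', [])}
    problems = []
    for binding in defs.get('bindings', []):
        if binding['source'] not in exchanges:
            problems.append('unknown exchange: %s' % binding['source'])
        if binding['destination'] not in queues:
            problems.append('unknown queue: %s' % binding['destination'])
    return problems

# usage: check_bindings(json.load(open('rabbitmq/definitions.json')))
```

For the file above, the check should return an empty list, meaning all bindings are consistent.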

The structure of these files is this:

docker-compose.yaml
/rabbitmq/
--/definitions.json
--/Dockerfile
--/rabbitmq.conf

Now, we must run these two commands:

- docker-compose build
- docker-compose up -d

Now access http://localhost:15672 and log in with the credentials we set in definitions.json. You can see that the queues and the exchange were created, and how the exchange was configured.
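Besides the web UI, the management plugin also exposes an HTTP API on the same port. As an optional check (a sketch using only the standard library and the credentials from definitions.json), you can list the declared queues programmatically:

```python
import base64
import json
import urllib.request

def queue_names(api_payload):
    """Pull the queue names out of the management API's /api/queues response."""
    return sorted(item['name'] for item in api_payload)

def fetch_queues(host='localhost', port=15672, user='user', password='password'):
    """Fetch the queue list from the RabbitMQ management HTTP API (basic auth)."""
    token = base64.b64encode(f'{user}:{password}'.encode()).decode()
    req = urllib.request.Request(f'http://{host}:{port}/api/queues',
                                 headers={'Authorization': 'Basic ' + token})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

With the stack up, queue_names(fetch_queues()) should return ['queue-1', 'queue-2', 'queue-3'].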

Sending messages to the exchange using Python

Here is a simple producer script using pika (install it with pip install pika):

import pika

credentials = pika.PlainCredentials('user', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',
                                                               5672,
                                                               '${RABBITMQ_DEFAULT_VHOST}',
                                                               credentials))

channel = connection.channel()

message = 'test message sent'
channel.basic_publish(exchange='fanout_exchange',
                      routing_key='',
                      body=message)
print(" [x] Sent", message)

connection.close()

After running it a few times, the queues received the messages.

The main thing here is that the producer sends a single message to the exchange, and the exchange delivers a copy to every bound queue. The producer does not know how many queues exist. Note that a fanout exchange ignores the routing key, which is why we can pass an empty string.
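In real systems the payload is usually structured data rather than a plain string. A small optional helper (my addition, not part of the original script) serializes an event to JSON bytes, which can be passed directly as the body of basic_publish:

```python
import json

def make_payload(event: dict) -> bytes:
    """Serialize an event to UTF-8 JSON bytes, suitable for basic_publish(body=...)."""
    return json.dumps(event, sort_keys=True).encode('utf-8')

payload = make_payload({'type': 'order_created', 'order_id': 42})
print(payload)
```

On the consumer side, the original event is recovered with json.loads(body) inside the callback.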

Consuming messages with Python

Here I created three scripts, one to consume each queue. The first one, for queue-1:

import pika, sys, os

def main():
    credentials = pika.PlainCredentials('user', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',
                                                                   5672,
                                                                   '${RABBITMQ_DEFAULT_VHOST}',
                                                                   credentials))
    channel = connection.channel()

    queueName = 'queue-1'
    channel.queue_declare(queue=queueName, durable=True)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue=queueName, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
queue-2

import pika, sys, os

def main():
    credentials = pika.PlainCredentials('user', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',
                                                                   5672,
                                                                   '${RABBITMQ_DEFAULT_VHOST}',
                                                                   credentials))
    channel = connection.channel()

    queueName = 'queue-2'
    channel.queue_declare(queue=queueName, durable=True)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue=queueName, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

queue-3

import pika, sys, os

def main():
    credentials = pika.PlainCredentials('user', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',
                                                                   5672,
                                                                   '${RABBITMQ_DEFAULT_VHOST}',
                                                                   credentials))
    channel = connection.channel()

    queueName = 'queue-3'
    channel.queue_declare(queue=queueName, durable=True)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue=queueName, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

All three consumers showed the same result.

Looking at the admin page, it's possible to see that the messages were consumed.
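The three consumer scripts differ only in the queue name. As a possible refactor (my suggestion, not from the original repo), a single script can take the queue name as a command-line argument:

```python
import sys

VALID_QUEUES = ('queue-1', 'queue-2', 'queue-3')

def pick_queue(argv):
    """Return the queue name passed on the command line, defaulting to queue-1."""
    name = argv[1] if len(argv) > 1 else 'queue-1'
    if name not in VALID_QUEUES:
        raise SystemExit('unknown queue: %s' % name)
    return name

def consume(queue_name):
    import pika  # imported lazily so pick_queue stays testable without a broker
    credentials = pika.PlainCredentials('user', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost', 5672, '${RABBITMQ_DEFAULT_VHOST}', credentials))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)

    def callback(ch, method, properties, body):
        print(" [x] %s received %r" % (queue_name, body))

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages on %s. To exit press CTRL+C' % queue_name)
    channel.start_consuming()
```

Adding consume(pick_queue(sys.argv)) under an if __name__ == '__main__': guard and running python consumer.py queue-2 gives the same behavior as the second script above, with one file instead of three.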

That’s it for today.

The code is here: https://github.com/mmarcosab/rabbitmq-fan-out
