Design and implementation using RabbitMQ and Debezium
Using RabbitMQ as a queue, and the Debezium Server for monitoring the database, the following components are used:
Debezium Server: standalone software that monitors a database for changes and sends the change events to a queue system (such as RabbitMQ).
RabbitMQ Broker: a queuing system that keeps the database change events in queues.
RabbitMQ Consumer: consumes the database change events from a RabbitMQ queue.
The guide (using RabbitMQ as a queue broker) is divided into three parts:
High-Level Design: A brief description of each component's functionality and how they are interconnected.
Implementation with Docker Compose: A step-by-step guide to setting up the components using a Docker Compose file.
RabbitMQ Consumer Integration: How to integrate the RabbitMQ Consumer with the Sapient app.
High level design
The high-level design is depicted in the following image:
The main components of the system are:
Debezium Server: monitors PostgreSQL for database changes. If any data change occurs (CREATE, UPDATE, or DELETE; structural changes are not captured), an event is sent to the RabbitMQ broker.
RabbitMQ Broker: stores the database change events in multiple queues. In our case, because each change must be broadcast to all queues, the exchange type is set to fanout (see the sketch after this list).
RabbitMQ Consumer integration in the Sapient app: consumes the database changes from a queue. Each queue corresponds to one app instance and is bound to a single exchange.
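To see the fanout behaviour in isolation, here is a minimal sketch that publishes a test message to the db_changes exchange (the exchange name comes from the configuration below; a local broker with the default guest/guest credentials is assumed). Every queue bound to the exchange receives its own copy:
import asyncio

from aio_pika import Message, connect_robust


async def publish_test_event():
    # Connect to a local broker (guest/guest defaults are an assumption).
    connection = await connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()
        # Fanout exchanges ignore the routing key: every bound queue gets a copy.
        exchange = await channel.get_exchange("db_changes")
        await exchange.publish(Message(b'{"op": "test"}'), routing_key="")


asyncio.run(publish_test_event())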
Implementation through a Docker Compose file
Create a folder named debezium_conf. In this folder, create a file named application.properties.
The file contains the Debezium Server configuration. Two crucial parts of the configuration are:
source configuration (the database we want to monitor, PostgreSQL in our case)
sink configuration (the queue system to which we want to send the changes, RabbitMQ in our case)
The contents of the application.properties file:
# Sink connector config - RabbitMQ
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=rabbitmq
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.ackTimeout=3000
debezium.sink.rabbitmq.exchange=db_changes
# Source connector config - PostgreSQL
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.dbname=sapient_insights
debezium.source.database.hostname=database
debezium.source.database.password=camlin
debezium.source.database.port=5432
debezium.source.database.user=camlin
debezium.source.offset.flush.interval.ms=0
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.plugin.name=pgoutput
debezium.source.topic.prefix=tutorial
debezium.source.table.include.list=public.preferences
# Key Converter configuration
debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.key.converter.schemas.enable=false
# Value Converter configuration
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false
# Format config
debezium.format.key=json
debezium.format.value=json
# Quarkus
quarkus.log.console.json=false
# SMT configuration
debezium.transforms=flatten
debezium.transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
debezium.transforms.flatten.delimiter=.
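A note on the SMT section: the Flatten transform collapses Debezium's nested change-event envelope into a single-level JSON object, joining key paths with the configured delimiter. A sketch of the effect, using hypothetical id and theme columns for the preferences table:
# Hypothetical Debezium change event for public.preferences (columns assumed).
nested = {
    "before": None,
    "after": {"id": 1, "theme": "dark"},
    "source": {"schema": "public", "table": "preferences"},
    "op": "u",  # c = create, u = update, d = delete
    "ts_ms": 1718000000000,
}

# After Flatten$Value with delimiter ".", the consumer receives keys such as:
flattened = {
    "before": None,
    "after.id": 1,
    "after.theme": "dark",
    "source.schema": "public",
    "source.table": "preferences",
    "op": "u",
    "ts_ms": 1718000000000,
}
print(flattened["after.theme"])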
Add the Debezium Server to the Docker Compose file:
services:
  ...
  debezium:
    image: debezium/server:2.7
    healthcheck:
      test: curl http://debezium:8080/q/health || exit 1
      interval: 5s
      timeout: 5s
      retries: 5
    depends_on:
      - database
      - rabbitmq
    restart: on-failure
    volumes:
      - ./debezium_conf:/debezium/conf:ro
    ports:
      - "8080:8080"
  ...
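Once the stack is up, the same health endpoint used by the healthcheck above is reachable from the host through the published port; a quick standard-library sketch to verify it:
import json
import urllib.request

# Quarkus serves the Debezium Server health report at /q/health.
with urllib.request.urlopen("http://localhost:8080/q/health") as response:
    print(json.load(response))  # expect a payload with "status": "UP"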
Create a folder named rabbitmq_conf. In this folder, create three files:
definitions.json: defines an exchange, a user, and the permissions for this user. The exchange is of the fanout type (a sketch for generating the password_hash value follows the three files):
{
  "rabbit_version": "3.13.1",
  "rabbitmq_version": "3.13.1",
  "product_name": "RabbitMQ",
  "product_version": "3.13.1",
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "users": [
    {
      "name": "guest",
      "password_hash": "9/1i+jKFRpbTRV1PtRnzFFYibT3cEpP92JeZ8YKGtflf4e/u",
      "tags": ["administrator"]
    }
  ],
  "permissions": [
    {
      "user": "guest",
      "vhost": "/",
      "configure": ".*",
      "read": ".*",
      "write": ".*"
    }
  ],
  "exchanges": [
    {
      "name": "db_changes",
      "vhost": "/",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ]
}
rabbitmq.conf: defines the file from which the above definitions are loaded. Moreover, a default user and the listening ports are set:
default_user = guest
default_pass = guest
listeners.tcp.default = 5672
management.tcp.port = 15672
management.load_definitions = /etc/rabbitmq/definitions.json
Dockerfile: the Dockerfile from which the RabbitMQ image is built (it copies in the definitions and the RabbitMQ configuration):
FROM rabbitmq:3-management
COPY rabbitmq.conf /etc/rabbitmq
COPY definitions.json /etc/rabbitmq
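A note on the password_hash value in definitions.json: RabbitMQ's default scheme is SHA-256 over a 4-byte random salt concatenated with the UTF-8 password bytes, with salt + digest then Base64-encoded. A minimal sketch for generating a compatible hash:
import base64
import hashlib
import os


def rabbitmq_password_hash(password: str) -> str:
    # Mirrors RabbitMQ's default rabbit_password_hashing_sha256 scheme.
    salt = os.urandom(4)
    digest = hashlib.sha256(salt + password.encode("utf-8")).digest()
    return base64.b64encode(salt + digest).decode("ascii")


print(rabbitmq_password_hash("guest"))
Because the salt is random, each run yields a different, equally valid hash.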
Add RabbitMQ to the Docker Compose file:
services:
  ....
  rabbitmq:
    build:
      context: ./rabbitmq_conf/
      dockerfile: Dockerfile
    image: rabbitmq-demo
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
      - "25672:25672"
  ....
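To confirm that the broker loaded the definitions, the management HTTP API (published on port 15672 above) can be queried for the exchange; a sketch assuming the guest/guest credentials:
import base64
import json
import urllib.request

# The management API exposes an exchange at /api/exchanges/<vhost>/<name>;
# the default vhost "/" is URL-encoded as %2F.
request = urllib.request.Request("http://localhost:15672/api/exchanges/%2F/db_changes")
credentials = base64.b64encode(b"guest:guest").decode("ascii")
request.add_header("Authorization", f"Basic {credentials}")
with urllib.request.urlopen(request) as response:
    print(json.load(response))  # expect "type": "fanout"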
RabbitMQ Consumer integration
We need a mechanism for receiving the database change notifications from the RabbitMQ Broker in an asynchronous (non-blocking) way. This is possible through a RabbitMQ Consumer. The aio-pika library provides an asynchronous solution for this; for more information about aio-pika, see this link.
Add the following lines:
import backoff
import json
import uuid

from aio_pika import connect_robust, IncomingMessage
from loguru import logger

from .websocket import manager
from . import processor
from ..config import config

# Connection string; assumes the config exposes a RABBIT_PASSWORD setting.
RABBIT_URL_CONNECTION = (
    f"amqp://{config.RABBIT_USER_NAME}:{config.RABBIT_PASSWORD}@{config.RABBIT_HOST}/"
)


@backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=7,
    jitter=backoff.full_jitter,
    on_backoff=lambda details: logger.warning(
        f"Retrying to connect to RabbitMQ: attempt {details['tries']}"
    ),
)
async def start_rabbit_consumer():
    connection = await connect_robust(RABBIT_URL_CONNECTION)
    channel = await connection.channel()
    # Each app instance declares its own uniquely named queue...
    queue = await channel.declare_queue(str(uuid.uuid4()), durable=True)
    # ...and binds it to the fanout exchange, so every instance sees every event.
    await queue.bind(config.RABBIT_EXCHANGE)
    await queue.consume(process_message, no_ack=False)


async def start_consumer():
    try:
        await start_rabbit_consumer()
    except Exception as e:
        logger.error(f"Failed to connect to RabbitMQ: {e}")


async def process_message(message: IncomingMessage):
    # message.process() acknowledges the message once the block exits cleanly.
    async with message.process():
        message_dict = json.loads(message.body.decode("utf-8"))
        transformed_message = processor.transform(None, message_dict)
        await manager.send_message(json.dumps(transformed_message))
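Finally, start_consumer must be scheduled on the application's event loop at startup. A minimal sketch of the wiring, assuming the Sapient app is a FastAPI application (the rabbit_consumer module path is illustrative):
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

from .rabbit_consumer import start_consumer  # illustrative import path


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Run the consumer as a background task for the lifetime of the app.
    task = asyncio.create_task(start_consumer())
    yield
    task.cancel()


app = FastAPI(lifespan=lifespan)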