
Konstantinos Blatsoukas


Debezium integration

Design and implementation using RabbitMQ and Debezium

Using RabbitMQ as the queue and Debezium Server (a standalone alternative to running the Debezium connectors on Kafka Connect) for monitoring the database, the following components are used:

Debezium Server: Standalone software that monitors a database for changes, and sends the changes to a queue system (like RabbitMQ).

RabbitMQ Broker: A queuing system that stores the database change events in queues.

RabbitMQ Consumer: Consumes the database change events from a RabbitMQ queue.

The guide (using RabbitMQ as a queue broker) is divided into three parts:

High-Level Design: A brief description of each component's functionality and how they are interconnected.

Implementation with Docker Compose: A step-by-step guide to setting up the components using a Docker Compose file.

RabbitMQ Consumer Integration: How to integrate the RabbitMQ Consumer with the Sapient app.

High-level design

The high-level design is depicted in the following image:

[Image: high-level design diagram]

The main components of the system are:

Debezium Server: Monitors the PostgreSQL database for changes. When a row-level change occurs (INSERT, UPDATE, or DELETE, not a structural change), an event is sent to the RabbitMQ broker.

RabbitMQ Broker: Stores the database changes in multiple queues. In our case, because each change must be broadcast to all queues, the exchange type is set to fanout.

RabbitMQ Consumer integration with the Sapient app: Consumes the database changes from a queue. Each app instance has its own queue, and every queue is bound to a single exchange.
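
To make the fanout behaviour concrete, here is a minimal in-memory sketch. It is a toy model, not a RabbitMQ client: the `FanoutExchange` class, the queue names, and the event fields are all hypothetical and only illustrate that every bound queue receives its own copy of each event.

```python
from collections import deque


class FanoutExchange:
    """Toy in-memory model of a fanout exchange (not a RabbitMQ client)."""

    def __init__(self, name):
        self.name = name
        self.queues = {}

    def bind(self, queue_name):
        # Create the queue on first bind; a fanout exchange ignores routing keys.
        return self.queues.setdefault(queue_name, deque())

    def publish(self, event):
        # Every bound queue receives its own copy of the event.
        for queue in self.queues.values():
            queue.append(dict(event))


exchange = FanoutExchange("db_changes")
instance_a = exchange.bind("app-instance-a")  # hypothetical queue names
instance_b = exchange.bind("app-instance-b")
exchange.publish({"op": "u", "table": "preferences"})
```

After the single `publish` call, both `instance_a` and `instance_b` hold one event, which is why each app instance can consume every database change independently.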

Implementation through a Docker Compose file

Create a folder named debezium_conf. In this folder, create a file named application.properties.

The file contains the Debezium Server configuration. Two crucial parts of the configuration are:

Source configuration (the database that we want to monitor, PostgreSQL in our case)

Sink configuration (the queue system that receives the changes, RabbitMQ in our case)

The contents of the application.properties file:



# Sink connector config - RabbitMQ
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=rabbitmq
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.ackTimeout=3000
debezium.sink.rabbitmq.exchange=db_changes

# Source connector config - PostgreSQL
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.dbname=sapient_insights
debezium.source.database.hostname=database
debezium.source.database.password=camlin
debezium.source.database.port=5432
debezium.source.database.user=camlin
debezium.source.offset.flush.interval.ms=0
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.plugin.name=pgoutput
debezium.source.topic.prefix=tutorial
debezium.source.table.include.list=public.preferences
# Key Converter configuration
debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.key.converter.schemas.enable=false

# Value Converter configuration
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false

# Format config
debezium.format.key=json
debezium.format.value=json

# Quarkus
quarkus.log.console.json=false

# SMT configuration
debezium.transforms=flatten
debezium.transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
debezium.transforms.flatten.delimiter=.



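The SMT section in the configuration flattens the nested change event into a single level, joining field names with the configured delimiter. The sketch below approximates that effect on a plain dictionary. It is not the Kafka Connect implementation, and the column names `id` and `theme` are made up for illustration; `before`, `after`, `source`, and `op` are the usual Debezium envelope fields.

```python
def flatten(value, delimiter=".", prefix=""):
    """Approximate the Flatten SMT on a nested dict (illustrative only)."""
    flat = {}
    for key, item in value.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(item, dict):
            # Nested structures are expanded into delimiter-joined keys.
            flat.update(flatten(item, delimiter, name))
        else:
            # Scalars, lists, and None are kept as leaf values.
            flat[name] = item
    return flat


# Hypothetical change event for an insert into public.preferences.
event = {
    "before": None,
    "after": {"id": 1, "theme": "dark"},
    "source": {"schema": "public", "table": "preferences"},
    "op": "c",
}
flat_event = flatten(event)
```

With this input, `flat_event` contains keys such as `after.id`, `after.theme`, and `source.table`, which is the shape a consumer should expect when reading from the queue.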

Add the Debezium Server to the Docker Compose file:



services:
  ...
  debezium:
    image: debezium/server:2.7
    healthcheck:
      test: curl http://debezium:8080/q/health || exit 1
      interval: 5s
      timeout: 5s
      retries: 5
    depends_on:
      - database
      - rabbitmq
    restart: on-failure
    volumes:
      - ./debezium_conf:/debezium/conf:ro
    ports:
      - "8080:8080"
  ...



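The healthcheck in the service definition polls the `/q/health` endpoint exposed by Debezium Server. The same check can be run from the host with a small script. This is a sketch under a few assumptions: port 8080 is published to localhost as in the Compose service, the endpoint returns a JSON body with a `status` field equal to `UP` when healthy, and the function names are hypothetical.

```python
import json
import time
import urllib.request


def fetch_health(url="http://localhost:8080/q/health"):
    # Assumes port 8080 is published to the host, as in the Compose service.
    with urllib.request.urlopen(url, timeout=5) as response:
        return json.load(response)


def wait_until_up(fetch=fetch_health, retries=5, delay=5.0, sleep=time.sleep):
    """Poll the health endpoint, mirroring the Compose retries and interval."""
    for attempt in range(1, retries + 1):
        try:
            if fetch().get("status") == "UP":
                return True
        except (OSError, ValueError):
            # Not reachable yet, an HTTP error status, or a non-JSON body.
            pass
        if attempt < retries:
            sleep(delay)
    return False
```

Calling `wait_until_up()` after `docker compose up` returns `True` once the server reports itself healthy, or `False` after the retries are exhausted.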

Create a folder named rabbitmq_conf. In this folder, create three files:

definitions.json: defines an exchange, a user, and the permissions for this user. The exchange is of type fanout:



{
  "rabbit_version": "3.13.1",
  "rabbitmq_version": "3.13.1",
  "product_name": "RabbitMQ",
  "product_version": "3.13.1",
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "users": [
    {
      "name": "guest",
      "password_hash": "9/1i+jKFRpbTRV1PtRnzFFYibT3cEpP92JeZ8YKGtflf4e/u",
      "tags": ["administrator"]
    }
  ],
  "permissions": [
    {
      "user": "guest",
      "vhost": "/",
      "configure": ".*",
      "read": ".*",
      "write": ".*"
    }
  ],
  "exchanges": [
    {
      "name": "db_changes",
      "vhost": "/",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ]
}


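The `password_hash` value in the definitions file is not the plain password. Assuming RabbitMQ's default SHA-256 password hashing (a 4-byte random salt, followed by the SHA-256 digest of salt plus password, all Base64-encoded), a hash for your own password can be sketched as follows. The function names are hypothetical, and the result should be checked against your broker before relying on it.

```python
import base64
import hashlib
import os


def rabbit_password_hash(password, salt=None):
    """Salted SHA-256 hash in the layout RabbitMQ uses by default (assumed)."""
    salt = os.urandom(4) if salt is None else salt  # 32-bit random salt
    digest = hashlib.sha256(salt + password.encode("utf-8")).digest()
    # The salt is prepended to the digest before Base64 encoding.
    return base64.b64encode(salt + digest).decode("ascii")


def matches(password, password_hash):
    """Recompute the hash with the embedded salt and compare."""
    salt = base64.b64decode(password_hash)[:4]
    return rabbit_password_hash(password, salt) == password_hash
```

A generated value is 48 characters long (36 bytes before encoding), the same length as the hash used for the guest user in definitions.json.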

rabbitmq.conf: sets the path of the definitions file that is loaded at startup, the default user, and the listening ports.



default_user = guest
default_pass = guest

listeners.tcp.default = 5672
management.tcp.port = 15672
management.load_definitions = /etc/rabbitmq/definitions.json



Dockerfile: the Dockerfile from which the RabbitMQ image is built (it copies in the definitions and the RabbitMQ configuration).



FROM rabbitmq:3-management

COPY rabbitmq.conf /etc/rabbitmq
COPY definitions.json /etc/rabbitmq



Add RabbitMQ to the Docker Compose file:



services:
  ....
  rabbitmq:
    build:
      context: ./rabbitmq_conf/
      dockerfile: Dockerfile
    image: rabbitmq-demo
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
      - "25672:25672"
  ....



RabbitMQ Consumer integration

We need a mechanism for receiving the database change notifications from the RabbitMQ Broker asynchronously (without blocking the app). A RabbitMQ Consumer makes this possible. The aio-pika library provides an asynchronous client for this purpose; for more information, see the aio-pika documentation.

Add the following lines:



import backoff
import json
import uuid

from aio_pika import connect_robust, IncomingMessage
from loguru import logger

from .websocket import manager
from . import processor
from ..config import config


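# NOTE: the password position reuses RABBIT_USER_NAME. This only works while the
# user name and password are identical ("guest"/"guest" in rabbitmq.conf);
# substitute your configuration's password setting otherwise.
RABBIT_URL_CONNECTION = (
    f"amqp://{config.RABBIT_USER_NAME}:{config.RABBIT_USER_NAME}@{config.RABBIT_HOST}/"
)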


@backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=7,
    jitter=backoff.full_jitter,
    on_backoff=lambda details: logger.warning(
        f"Retrying to connect to RabbitMQ: attempt {details['tries']}"
    ),
)
async def start_rabbit_consumer():
    connection = await connect_robust(RABBIT_URL_CONNECTION)
    channel = await connection.channel()

    queue = await channel.declare_queue(str(uuid.uuid4()), durable=True)
    await queue.bind(config.RABBIT_EXCHANGE)

    await queue.consume(process_message, no_ack=False)


async def start_consumer():
    try:
        await start_rabbit_consumer()
    except Exception as e:
        logger.error(f"Failed to connect to RabbitMQ: {e}")


async def process_message(message: IncomingMessage):
    async with message.process():
        message_dict = json.loads(message.body.decode("utf-8"))
        transformed_message = processor.transform(None, message_dict)
        await manager.send_message(json.dumps(transformed_message))



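The `backoff.on_exception` decorator in the consumer retries the connection with exponential delays and full jitter. The sketch below reproduces the arithmetic with the standard library only, assuming the backoff library's defaults for `backoff.expo` (base 2, factor 1). The helper names are hypothetical and are not part of the backoff package.

```python
import random


def backoff_ceilings(max_tries=7, base=2, factor=1):
    """Upper bounds of the waits between attempts for an exponential policy."""
    # There is one wait between consecutive attempts: max_tries - 1 in total.
    return [factor * base**n for n in range(max_tries - 1)]


def full_jitter_delay(ceiling, rng=random):
    # Full jitter picks a uniform random delay between 0 and the ceiling.
    return rng.uniform(0, ceiling)


ceilings = backoff_ceilings()
worst_case_total = sum(ceilings)
```

With `max_tries=7`, the six waits are capped at 1, 2, 4, 8, 16, and 32 seconds, so the consumer gives up after at most about 63 seconds of waiting, plus the time spent in the connection attempts themselves.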
