Microsoft Azure

Tutorial: Set up a Change Data Capture architecture on Azure using Debezium, Postgres and Kafka

By Abhishek Gupta (@abhirockzz) · 10 min read

Change Data Capture (CDC) is a technique used to track row-level changes in database tables in response to create, update and delete operations. Different databases expose these change data events through different mechanisms, for example logical decoding in PostgreSQL and the binary log (binlog) in MySQL. This is a powerful capability, but it is only useful if there is a way to tap into these event logs and make them available to other services that depend on that information.

Debezium does just that! It is a distributed platform that builds on top of the Change Data Capture features available in different databases. It provides a set of Kafka Connect connectors that tap into row-level changes (using CDC) in database tables and convert them into event streams. These event streams are sent to Apache Kafka, a scalable event streaming platform and a perfect fit for the job! Once the change log events are in Kafka, they are available to all downstream applications.

This is different from the "polling" technique used by the Kafka Connect JDBC connector, which periodically queries the tables for new or modified rows.

The diagram (from the debezium.io website) summarises it nicely!

This blog is a guide to getting started with a change data capture based system on Azure using Debezium, Azure DB for PostgreSQL and Azure Event Hubs (for Kafka). It uses the Debezium PostgreSQL connector to stream database modifications from PostgreSQL to Kafka topics in Azure Event Hubs.

The related config files are available in the GitHub repo https://github.com/abhirockzz/

Although I have used managed Azure services for demonstration purposes, these instructions should work for other setups as well, e.g. a local Kafka cluster and PostgreSQL instance.

Set up PostgreSQL and Kafka on Azure

This section will provide pointers on how to configure Azure Event Hubs and Azure DB for PostgreSQL. All you need is a Microsoft Azure account - go ahead and sign up for a free one!

Azure DB for PostgreSQL

Azure DB for PostgreSQL is a managed, relational database service based on the community version of the open-source PostgreSQL database engine, and is available in two deployment modes.

At the time of writing, it supports PostgreSQL version 11.6.

You can set up PostgreSQL on Azure using a variety of options, including the Azure Portal, Azure CLI, Azure PowerShell or an ARM template. Once you've done that, you can easily connect to the database using your favourite programming language, such as Java, .NET, Node.js, Python or Go.

Although the above references are for Single Server deployment mode, please note that Hyperscale (Citus) is another deployment mode you can use for "workloads that are approaching -- or already exceed -- 100 GB of data."

Please keep the following PostgreSQL details handy, since you will need them to configure the Debezium connector in the subsequent sections: database hostname (and port), username and password.

Azure Event Hubs

Azure Event Hubs is a fully managed data streaming platform and event ingestion service. It also provides a Kafka endpoint that supports Apache Kafka protocol 1.0 and later and works with existing Kafka client applications and other tools in the Kafka ecosystem including Kafka Connect (demonstrated in this blog).

You can use the Azure Portal, Azure CLI, PowerShell or an ARM template to create an Azure Event Hubs namespace and other resources. To ensure that the Kafka functionality is enabled, choose the Standard or Dedicated tier (the Basic tier doesn't support Kafka on Event Hubs).

After the setup, keep the Connection String handy, since you will need it to configure Kafka Connect. You can retrieve it using the Azure Portal or Azure CLI.

Install Kafka

To run Kafka Connect, I will be using a local Kafka installation just for convenience. Just download Apache Kafka, unzip its contents and you're good to go!

Download Debezium connector and start Kafka Connect

To start with, clone this Git repo:

git clone https://github.com/abhirockzz/debezium-azure-postgres-cdc

cd debezium-azure-postgres-cdc

Download the Debezium PostgreSQL source connector JARs (1.2.0 is the latest version at the time of writing):

DEBEZIUM_CONNECTOR_VERSION=1.2.0

curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/${DEBEZIUM_CONNECTOR_VERSION}.Final/debezium-connector-postgres-${DEBEZIUM_CONNECTOR_VERSION}.Final-plugin.tar.gz --output debezium-connector-postgres.tar.gz

tar -xvzf debezium-connector-postgres.tar.gz
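If you would rather script this step, the same download can be sketched in Python using only the standard library. The helper names are hypothetical, and the URL pattern is simply the one used by the curl command above.

```python
import tarfile
import urllib.request

MAVEN_BASE = "https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres"


def plugin_url(version: str) -> str:
    """Maven Central URL of the Debezium PostgreSQL connector plugin archive."""
    release = f"{version}.Final"  # GA releases are published with a .Final suffix
    return f"{MAVEN_BASE}/{release}/debezium-connector-postgres-{release}-plugin.tar.gz"


def download_and_extract(version: str,
                         archive: str = "debezium-connector-postgres.tar.gz",
                         dest: str = ".") -> None:
    """Same as the curl + tar -xvzf commands above (needs network access)."""
    urllib.request.urlretrieve(plugin_url(version), archive)
    with tarfile.open(archive, "r:gz") as tar:
        tar.extractall(dest)  # expected to create ./debezium-connector-postgres/


# download_and_extract("1.2.0")
```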

You should now see a new folder named debezium-connector-postgres. Copy the connector JAR files to your Kafka installation:

export KAFKA_HOME=[path to kafka installation e.g. /Users/foo/work/kafka_2.12-2.3.0]

cp debezium-connector-postgres/*.jar $KAFKA_HOME/libs

# confirm that the JARs were copied
ls -lrt $KAFKA_HOME/libs | grep debezium
ls -lrt $KAFKA_HOME/libs | grep protobuf
ls -lrt $KAFKA_HOME/libs | grep postgresql
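The three ls | grep checks above can also be expressed as a small Python helper (hypothetical name) that groups the JARs by keyword and flags any keyword with no match. It assumes KAFKA_HOME is exported as shown above and does nothing if it is not set.

```python
import os
from pathlib import Path


def connector_jars(libs_dir: str, keywords=("debezium", "protobuf", "postgresql")) -> dict:
    """Group the JAR files in libs_dir by the keywords the ls | grep checks look for."""
    jars = sorted(p.name for p in Path(libs_dir).glob("*.jar"))
    return {kw: [jar for jar in jars if kw in jar] for kw in keywords}


kafka_home = os.environ.get("KAFKA_HOME")
if kafka_home:
    for kw, names in connector_jars(os.path.join(kafka_home, "libs")).items():
        # An empty list means the copy step missed that dependency
        print(f"{kw}: {', '.join(names) if names else 'MISSING'}")
```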

Before starting the Kafka Connect cluster, edit the connect.properties file and replace the placeholders for the following attributes with appropriate values: bootstrap.servers, sasl.jaas.config, producer.sasl.jaas.config and consumer.sasl.jaas.config.
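Event Hubs' Kafka endpoint listens on port 9093 and authenticates clients with SASL PLAIN, using the literal string $ConnectionString as the username and the namespace connection string as the password. Under that assumption, here is a minimal Python sketch that derives the four values from your connection string (the function names are hypothetical, and it only prints values; it does not edit connect.properties or touch any other SASL settings in that file):

```python
from urllib.parse import urlparse

# Event Hubs' Kafka endpoint expects SASL PLAIN with this literal username
JAAS_TEMPLATE = ('org.apache.kafka.common.security.plain.PlainLoginModule required '
                 'username="$ConnectionString" password="{password}";')


def connect_placeholders(connection_string: str) -> dict:
    """Derive the four connect.properties values from an Event Hubs connection string."""
    conn = connection_string.strip()
    # "Endpoint=sb://<ns>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=..."
    fields = dict(part.split("=", 1) for part in conn.strip(";").split(";") if part)
    host = urlparse(fields["Endpoint"]).hostname
    jaas = JAAS_TEMPLATE.format(password=conn)
    return {
        "bootstrap.servers": f"{host}:9093",  # Kafka endpoint port for Event Hubs
        "sasl.jaas.config": jaas,
        "producer.sasl.jaas.config": jaas,
        "consumer.sasl.jaas.config": jaas,
    }


def as_properties(values: dict) -> str:
    """Render key=value lines in Java .properties style."""
    return "\n".join(f"{key}={value}" for key, value in values.items())
```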

Start the Kafka Connect cluster (I am running it in distributed mode):

export KAFKA_HOME=[path to kafka installation e.g. /Users/foo/work/kafka_2.12-2.3.0]

$KAFKA_HOME/bin/connect-distributed.sh connect.properties

Wait for the Kafka Connect instance to start. You should then see the Kafka Connect internal topics in Azure Event Hubs.

Configure PostgreSQL

Before installing the connector, we need to:

  • Ensure that the PostgreSQL instance is accessible from your Kafka Connect cluster
  • Ensure that the PostgreSQL replication setting is set to "Logical"
  • Create a table which you can use to try out the change data capture feature

If you're using Azure DB for PostgreSQL, create a firewall rule using the az postgres server firewall-rule create command to whitelist your Kafka Connect host. In my case, it was a local Kafka Connect cluster, so I simply navigated to the Azure portal (Connection security section of my PostgreSQL instance) and chose Add current client IP address to make sure that my local IP was added to the firewall rule.

To change the replication mode for Azure DB for PostgreSQL, you can use the az postgres server configuration set command:

az postgres server configuration set --resource-group <name of resource group> --server-name <name of server> --name azure.replication_support --value logical

Alternatively, use the Replication menu of your PostgreSQL instance in the Azure Portal.

After updating the configuration, you will need to restart the server, which you can do using the CLI (az postgres server restart) or the portal.

Once the database is up and running, create the table. I have used the psql CLI in this example, but feel free to use any other tool. For example, to connect to your PostgreSQL database on Azure over SSL (you will be prompted for the password):

PGSSLMODE=require psql -h <POSTGRESQL_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME>

# example
PGSSLMODE=require psql -h my-pgsql.postgres.database.azure.com -p 5432 -U foo@my-pgsql -W -d postgres

-- to create the table (run inside psql)
CREATE TABLE todos (id SERIAL, description VARCHAR(30), todo_status VARCHAR(10), PRIMARY KEY(id));
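The example connection above uses a <user>@<server> login, which is how Single Server identifies users. As a sketch (hypothetical helper), the same details can be expressed as a libpq keyword/value connection string with sslmode=require set explicitly; psql accepts such a string as its database argument, e.g. psql "host=... sslmode=require".

```python
def azure_pg_dsn(server: str, user: str, dbname: str = "postgres", port: int = 5432) -> str:
    """libpq keyword/value connection string for an Azure DB for PostgreSQL Single Server."""
    host = f"{server}.postgres.database.azure.com"
    # Single Server logins use the <user>@<server> form, e.g. foo@my-pgsql
    login = user if "@" in user else f"{user}@{server}"
    # sslmode=require makes libpq refuse a non-TLS connection
    return f"host={host} port={port} dbname={dbname} user={login} sslmode=require"
```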

Install Debezium PostgreSQL source connector

Update the pg-source-connector.json file with the details for the Azure PostgreSQL instance. Here is an example:

{
    "name": "todo-test-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<POSTGRES_INSTANCE_NAME>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<DB_USER_NAME>",
        "database.password": "<PASSWORD>",
        "database.dbname": "<DB_NAME e.g. postgres>",
        "database.server.name": "<LOGICAL_NAMESPACE e.g. todo-server>",
        "plugin.name": "wal2json",
        "table.whitelist": "<TABLE_NAMES e.g. public.todos>"
    }
}

Let's go through the configuration:

For detailed info, check the Debezium documentation.

  • connector.class: name of the connector class (this is a static value)
  • database.hostname and database.port: IP address or hostname for your PostgreSQL instance as well as the port (e.g. 5432)
  • database.user and database.password: username and password for your PostgreSQL instance
  • database.dbname: database name e.g. postgres
  • database.server.name: Logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored.
  • table.whitelist: comma-separated list of regular expressions specifying which tables you want to monitor for change data capture
  • plugin.name: name of the logical decoding plug-in e.g. wal2json

At the time of writing, Debezium supports the following plugins: decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. I have used wal2json in this example, and it's supported on Azure as well!
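If you manage several environments, generating the connector payload can be less error-prone than hand-editing JSON. The sketch below (hypothetical function name) simply reproduces the keys of pg-source-connector.json with plugin.name defaulting to wal2json; note that table.whitelist is the Debezium 1.2 property name, and later releases deprecate it in favour of table.include.list.

```python
import json


def pg_source_connector(name: str, host: str, user: str, password: str, dbname: str,
                        server_name: str, tables: list, plugin: str = "wal2json",
                        port: int = 5432) -> dict:
    """Build the Kafka Connect payload that pg-source-connector.json contains."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": host,
            "database.port": str(port),  # string, matching the values in the JSON file
            "database.user": user,
            "database.password": password,
            "database.dbname": dbname,
            "database.server.name": server_name,  # becomes the topic prefix
            "plugin.name": plugin,  # e.g. wal2json or pgoutput
            # comma-separated regular expressions matched against <schema>.<table>
            "table.whitelist": ",".join(tables),
        },
    }


if __name__ == "__main__":
    connector = pg_source_connector(
        "todo-test-connector", "my-pgsql.postgres.database.azure.com",
        "foo@my-pgsql", "<PASSWORD>", "postgres", "todo-server", ["public.todos"])
    print(json.dumps(connector, indent=4))
```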

Finally, install the connector!

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

Kafka Connect will now start monitoring the todos table for create, update and delete events.
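To confirm that the connector was accepted, you can query the Kafka Connect REST API, which exposes a GET /connectors/<name>/status endpoint. The Python sketch below uses only the standard library to build the same POST as the curl command and to read the connector state; the helper names are hypothetical, and it assumes the worker listens on localhost:8083 as above.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # REST endpoint used by the curl command


def create_connector_request(connector: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Equivalent of: curl -X POST -H "Content-Type: application/json" --data @... /connectors"""
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def connector_state(name: str, base_url: str = CONNECT_URL) -> str:
    """Read GET /connectors/<name>/status; a healthy connector reports "RUNNING"."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)["connector"]["state"]


# Usage against a running worker:
# with open("pg-source-connector.json") as f:
#     urllib.request.urlopen(create_connector_request(json.load(f)))
# print(connector_state("todo-test-connector"))
```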

Change data capture in action

Insert records:

PGSSLMODE=require psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME>

INSERT INTO todos (description, todo_status) VALUES ('install postgresql', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('install kafka', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup source connector', 'pending');

The connector should now spring into action and send the CDC events to an Event Hubs topic named <server name in config>.<table name>, e.g. todo-server.public.todos.
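In other words, the PostgreSQL connector names each topic <database.server.name>.<schema>.<table>. A tiny sketch (hypothetical helper) that derives the topic names for a list of schema-qualified tables, failing loudly on a name without a schema:

```python
def cdc_topics(server_name: str, tables: list) -> list:
    """Topic names for schema-qualified tables, following <server>.<schema>.<table>."""
    topics = []
    for qualified in tables:
        schema, table = qualified.split(".", 1)  # raises ValueError if no schema is given
        topics.append(f"{server_name}.{schema}.{table}")
    return topics
```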

Let's introspect the contents of the topic to make sure everything is working as expected. I am using kafkacat in this example, but you can also write a consumer app with any Kafka client.

Update the metadata.broker.list and sasl.password attributes in kafkacat.conf to include the Kafka broker details. In a different terminal, use it to read the CDC payloads:

export KAFKACAT_CONFIG=kafkacat.conf
# Kafka broker, e.g. for Event Hubs: my-eventhubs-namespace.servicebus.windows.net:9093
export BROKER=<kafka broker>
# <server name in config>.<table name>, e.g. todo-server.public.todos
export TOPIC=<server name>.<table name>

kafkacat -C -b $BROKER -t $TOPIC -o beginning
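For reference, here is a sketch of how a complete kafkacat.conf for Event Hubs could be generated. It assumes the file uses the standard librdkafka property names (security.protocol, sasl.mechanisms, sasl.username) alongside the two attributes mentioned above, so compare it with the kafkacat.conf in the repo before overwriting that file. The function name is hypothetical.

```python
def kafkacat_conf(namespace: str, connection_string: str) -> str:
    """Render a kafkacat (librdkafka) config for an Event Hubs namespace."""
    settings = {
        "metadata.broker.list": f"{namespace}.servicebus.windows.net:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "$ConnectionString",  # literal value, not a shell variable
        "sasl.password": connection_string.strip(),
    }
    return "".join(f"{key}={value}\n" for key, value in settings.items())


# with open("kafkacat.conf", "w") as f:
#     f.write(kafkacat_conf("my-eventhubs-namespace", "<connection string>"))
```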

You should see the JSON payloads representing the change data events generated in PostgreSQL in response to the rows you had just added to the todos table. Here is a snippet of the payload:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "install postgresql",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fullfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }
}

The event consists of the payload along with its schema (omitted for brevity). In the payload section, notice how the create operation ("op": "c") is represented: "before": null means that this was a newly INSERTed row, "after" provides the values for each column in the row, "source" provides metadata about the PostgreSQL instance from which this event was picked up, and so on.
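A downstream consumer typically needs to turn such an event into "what happened to which row". The following sketch (hypothetical helper) decodes one message value, maps the op codes c, u, d and r (snapshot read) to words, and treats a null value as a tombstone, which Debezium emits after a delete by default. It assumes JSON-encoded values like the one above, with or without the schema envelope.

```python
import json

OPS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_change(value):
    """One-line summary of a Debezium change event read from the topic.

    value is the raw message value (a JSON string), or None for a tombstone.
    """
    if value is None:
        # Debezium follows a delete with a null-valued tombstone by default
        return "tombstone"
    event = json.loads(value)
    payload = event.get("payload", event)  # also accept events without the schema envelope
    op = OPS.get(payload["op"], payload["op"])
    source = payload["source"]
    # Creates and updates carry the new row in "after"; deletes only have "before"
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return f'{op} on {source["schema"]}.{source["table"]}: {row}'
```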

You can try the same with update or delete operations as well and introspect the CDC events, e.g.:

UPDATE todos SET todo_status = 'complete' WHERE description = 'setup source connector';
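For updates, the interesting part is usually which columns changed. Whether "before" is populated depends on the table's REPLICA IDENTITY setting in PostgreSQL; running ALTER TABLE todos REPLICA IDENTITY FULL; makes PostgreSQL log the complete old row so that it can appear in the event. A hedged sketch (hypothetical helper) that diffs the two row images of an update event's payload:

```python
def changed_columns(payload: dict) -> dict:
    """Map column -> (old, new) for the columns an update event changed.

    Depending on the table's REPLICA IDENTITY, "before" may be null or carry only
    the key columns; a column absent from "before" is reported with old=None
    because its previous value is unknown.
    """
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {col: (before.get(col), new) for col, new in after.items()
            if col not in before or before[col] != new}
```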

(Optional) Install File sink connector

As a bonus, you can quickly test this with a File Sink connector as well. It is available out of the box in the Kafka distribution, so all you need to do is install the connector. Just replace the topics and file attributes in the file-sink-connector.json file:

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "<server name>.<table name> e.g. todo-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/work/pg-cdc.txt>"
    }
}

To create the connector:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

Play around with the database records and monitor the records in the configured output sink file, for example:

tail -f /Users/foo/work/pg-cdc.txt

Conclusion

If you've reached this far, thanks for reading (this rather lengthy tutorial)!

Change Data Capture is a powerful technique which can help "unlock the database" by providing near real-time access to its changes. This was a "getting started" guide meant to help you get up and running quickly, experiment and iterate further. Hope you found it useful!


Discussion


Many thanks, Abhishek!

A small (potential) missing step: at least for the pgoutput plugin (which Debezium kind of recommends for PostgreSQL 10+), a publication, named specifically dbz_publication, must be created on the PostgreSQL database for all the tables to be tracked, with a SQL command like create publication dbz_publication for table table1,...,tableN;.

Indeed, when the Debezium Kafka Connector starts, it checks whether such a publication exists, and if not, it tries to create one tracking all the tables (create publication dbz_publication for all tables;), but that requires superuser privileges on the PostgreSQL database, which Azure does not provide (the database admin roles are limited to pg_admin only, not superuser, which Microsoft reserves for its own administrators).

 

Thank you for sharing, Denis! I used wal2json in the blog, but this is relevant if using pgoutput with managed offerings (such as Azure PostgreSQL). I'll try to capture this information in another blog post... thanks again :)

You might also want to take a look at the publication.autocreate.mode property.