DEV Community

Cover image for Track every PostgreSQL data change using Debezium

Track every PostgreSQL data change using Debezium

Emtiaj Hasan on August 27, 2023

Imagine we have a Customer Relationship Management (CRM), sales, and inventory systems. As every system's responsibility is specific, these systems...
Collapse
 
daveybrown profile image
daveybrown

Hey Emtiaj, I just worked through this. It's a really well written post, and I successfuly got your app up and running and logging consumed events 🎉

In docker compose I used version 16 of Postgres - SHOW shared_preload_libraries didn't return wal2json for me, but everything still worked. The guide is a solid foundation for me to build upon, so thank you :)

I did it live on a 2 hour stream, with some commentary, if you like I can share the link.

Collapse
 
emtiajium profile image
Emtiaj Hasan • Edited

That is awesome!

Yes, please. Sharing is caring! 🫠

By the way, I had planned to use AWS Kinesis instead of Kafka and write another blog post, but laziness is all with me. 😭

Collapse
 
daveybrown profile image
daveybrown

It's at youtu.be/kGixHsxirmA 🤲 maybe it can be useful for someone.

Kinesis looks cool, but I'm very disconnected from the AWS world. Would it be a big advantage over Kafka?

Thread Thread
 
emtiajium profile image
Emtiaj Hasan

I wanted to play with it as I heavily use SNS and SQS.

Just out of curiosity, you know!

Thread Thread
 
daveybrown profile image
daveybrown

Cool, I see :)

Also, thanks for the intro to NestJS. I'm researching it now. Do you use it professionally?

Thread Thread
 
emtiajium profile image
Emtiaj Hasan

In my company, for my vocabulary flashcard app's day-to-day's implementation, I use it. 😁

A cool framework!

Collapse
 
krishnesh_kumar_48371d150 profile image
Krishnesh Kumar

what if we have two tables related with Many2Many in this case both events from related table can go to different partition of the topic how we can handle that? I tried to add a partition key to the record via debzium configuration but it don't worked for me.

I tried partition routing but still I am getting the same result i.e my one of the user record is going to partition 0 and m2m related table is going to partition 1 and so forth. Here i am sharing my configuration,

{
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres-abcdef",
      "database.port": "5432",
      "database.user": "testx",
      "database.password": "qwerty123",
      "database.dbname": "dashboard",
      "database.server.name": "dashboard",
      "plugin.name": "pgoutput",
      "publication.name": "users_publication",
      "slot.name": "users_slot",
      "table.include.list": "public.zenauth_abcuser, public.zenauth_usercustomer",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.postgres",
      "snapshot.mode": "never",
      "topic.prefix": "dashboard",
      "snapshot.isolation.mode": "read_committed",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "key.converter.schemas.enable": "false",

      "transforms": "PartitionRouting,Reroute",

      "transforms.PartitionRouting.type":"io.debezium.transforms.partitions.PartitionRouting",
      "transforms.PartitionRouting.partition.topic.num": "3",
      "transforms.PartitionRouting.partition.payload.fields":"after.username",

      "transforms.PartitionRouting.predicate":"allTopic",
      "predicates":"allTopic",
      "predicates.allTopic.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
      "predicates.allTopic.pattern":"public.*",

      "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
      "transforms.Reroute.topic.regex": ".*",
      "transforms.Reroute.topic.replacement": "dashboard.users",


      "errors.tolerance": "all",
      "errors.log.enable": "true",
      "errors.log.include.messages": "true"
    }
Enter fullscreen mode Exit fullscreen mode

Also can you please help what if we have a delete event in that after object will be empty in case of create before object will be empty how we can handle these type of condition?

"transforms.PartitionRouting.partition.payload.fields":"after.username",

Also I saw in doc that we can refer these using change.username is that the correct way?

Collapse
 
emtiajium profile image
Emtiaj Hasan

@krishnesh Kumar, sorry to say that I didn't experience this type of scenario, therefore, I cannot help here. If I manage to replicate this scenario and fix it, for sure, I will share it with you. In the meantime, if you find a solution, don't bother to share it here.

Thanks!

Collapse
 
chirag_goyal_52ddd55fb487 profile image
Chirag Goyal

How this will detect the change if we have shared database

Collapse
 
emtiajium profile image
Emtiaj Hasan

Can you please explain a bit more?
Thanks!