Vladyslav Len

Posted on • Originally published at

PostgreSQL to NATS Streaming


Postgres is eating the world of databases. It stands out as the Swiss Army knife among databases, so more and more developers adopt PostgreSQL to store their project data.
But as projects grow, the need to stream changes from the database to other services usually arises. This is where DataBrew Cloud and the open-source Blink come in.

Why would you do that?

Streaming data is not a silver bullet, but it still has a lot of use cases. Here are some of them:

  • Building event-driven architecture
  • Real-time analytics
  • Sharing data with external systems

What are the benefits?

Streaming data from Postgres, also called CDC (Change Data Capture), is the process of reading changes directly from the WAL (write-ahead log) instead of querying your tables, which may put a significant load on the database.

It also means a consumer can be offline for a while and still receive all the changes once it comes back online, because the replication slot makes Postgres retain the WAL until the consumer has confirmed it.
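
To make the offline-consumer guarantee more concrete, here is a toy in-memory model in Python. It is not how Postgres implements replication slots: the `ToySlot` class and its method names are made up for illustration. It only shows the idea that changes are retained until the consumer confirms them.

```python
class ToySlot:
    """Toy stand-in for a replication slot: keeps changes until confirmed."""

    def __init__(self):
        self._log = []       # ordered changes, like entries appended to the WAL
        self._confirmed = 0  # how many changes the consumer has acknowledged

    def write(self, change):
        # The database keeps appending changes whether or not anyone is reading.
        self._log.append(change)

    def read_pending(self):
        # A consumer that reconnects receives everything it has not confirmed yet.
        return self._log[self._confirmed:]

    def confirm(self, count):
        # Acknowledge `count` more changes so they are not delivered again.
        self._confirmed = min(self._confirmed + count, len(self._log))


slot = ToySlot()
slot.write({"op": "insert", "log_id": 1})
slot.write({"op": "update", "log_id": 1})  # the consumer is offline here
pending = slot.read_pending()              # the consumer comes back
slot.confirm(len(pending))
```

The flip side of this guarantee is that unconfirmed changes take up space, so a consumer that never returns keeps the retained log growing.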


Postgres setup

First, let's make sure your database is ready for CDC.
Check your wal_level:

SHOW wal_level;

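If the result is not logical, change it to logical, for example by running ALTER SYSTEM SET wal_level = logical; as a superuser and then restarting PostgreSQL (the new value only takes effect after a restart).

The wal_level parameter controls how much information the database writes to the WAL.
We want it set to logical because that makes the database record changes in a form that can be decoded and read later.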
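
If you prefer to script this check, the following Python sketch shows one way to do it. It assumes you already have a DB-API 2.0 cursor from your Postgres driver of choice; the function name `wal_level_fix` is hypothetical. The function only reads the setting and returns the statement you would need to run, leaving the actual change and the server restart to you.

```python
ENABLE_LOGICAL_SQL = "ALTER SYSTEM SET wal_level = 'logical';"


def wal_level_fix(cursor):
    """Return the SQL needed to enable logical WAL, or None if already set.

    `cursor` is any DB-API 2.0 cursor connected to the target database.
    """
    cursor.execute("SHOW wal_level;")
    (current,) = cursor.fetchone()
    if current == "logical":
        return None
    # wal_level can only change at server start, so a restart is
    # required after running the returned statement.
    return ENABLE_LOGICAL_SQL
```

Note that ALTER SYSTEM cannot run inside a transaction block, so run the returned statement on a connection in autocommit mode.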

NATS setup

Make sure you have a NATS server running. You can use the official Docker image:

docker run -p 4222:4222 -ti nats:latest

You should see logs like this:

[1] 2019/05/24 15:42:58.228063 [INF] Starting nats-server version #.#.#
[1] 2019/05/24 15:42:58.228115 [INF] Git commit [#######]
[1] 2019/05/24 15:42:58.228201 [INF] Starting http monitor on
[1] 2019/05/24 15:42:58.228740 [INF] Listening for client connections on
[1] 2019/05/24 15:42:58.228765 [INF] Server is ready
[1] 2019/05/24 15:42:58.229003 [INF] Listening for route connections on

If you are going to use DataBrew Cloud, you must ensure your Postgres and NATS instances are accessible from the internet. You can use a service like ngrok to expose your local services, or you can deploy them in the cloud.
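
Before configuring anything else, it can help to confirm that both services accept TCP connections on the addresses you plan to use. A minimal Python sketch follows; the helper name `is_reachable` is made up. It checks TCP reachability only and says nothing about credentials or TLS.

```python
import socket


def is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False
```

For the local setup in this article you would call `is_reachable("localhost", 4222)` for NATS and `is_reachable("localhost", 5432)` for Postgres. For DataBrew Cloud, run the check from a machine outside your network against the public address.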

Start with DataBrew Cloud

First, you need to create a new account in DataBrew Cloud or log into an existing one.

Then you need to create a new pipeline. You can do this by clicking on the "New Pipeline" button in the top right corner.

Add Postgres source

First, we must configure our PostgreSQL database as a source for the pipeline. Click on the "Add Connector" button and select "Postgres-CDC" from the list.

Create new Postgres-CDC Connector

Then fill in the connection details for your Postgres database in the connector settings form:

Postgres-CDC Connector settings

Once you have filled in all the details, press "Check Connection" to make sure the connection works.

You will then be asked to choose the table you want to stream changes from. Select the one you need to proceed.

Add NATS sink

To create a full pipeline you need to add a sink for the data. In our case, it will be NATS.

Click on the "Add Connector" button and select "NATS" from the list.
The flow is much the same as with the Postgres-CDC connector. You need to provide the connection details for your NATS server.

Create NATS Connector destination

Provide the connection details and press "Check Connection" to ensure the connection is working.

NATS Connector settings

Creating the pipeline

Once you have both connectors configured, you can press the "Create Pipeline" button to create the pipeline.

Select the previously created Postgres-CDC connection as the source and the NATS connector as the destination. In our case the connection name is "Taxi rides", as we are going to stream the changes from the "taxi_rides" table.

Select Postgres as pipeline source

Select NATS as pipeline destination

Now is the time to save and deploy our pipeline. Press the "Save pipeline" button. We are not going to add any processors to our data flow just yet.

Save new pipeline

After you save the pipeline and press the "Deploy" button, you will see the logs of the pipeline execution.

Please keep in mind that the first pipeline deployment may take a few seconds.

If everything is correct, you will see logs like this:

2024-05-22 22:11:59 INFO Metrics: Component has been loaded
2024-05-22 22:11:59 INFO Source: Loaded driver=postgres_cdc
2024-05-22 22:11:59 INFO Sinks: Loaded driver=nats
2024/05/22 22:11:59 INFO [source]: PostgreSQL-CDC: Create publication for table schemas with query CREATE PUBLICATION pglog_stream_rs_databrew_replication_slot_174_2231 FOR TABLE public.taxi_rides;
2024/05/22 22:11:59 INFO [source]: PostgreSQL-CDC: Created Postgresql publication publication_name=rs_databrew_replication_slot_174_2231
2024/05/22 22:11:59 INFO [source]: PostgreSQL-CDC: System identification result SystemID:=7293538614695768105 Timeline:=1 XLogPos:=E4/5C009318 DBName:=mocks
2024/05/22 22:12:00 INFO [source]: PostgreSQL-CDC: Processing database snapshot schema=public
  table={TableName:public.taxi_rides Schema:schema:
  │   fields: 11
  │     - _cq_sync_time: type=utf8, nullable
  │     - distance_traveled: type=float64, nullable
  │     - driver_id: type=int32, nullable
  │     - duration: type=int32, nullable
  │     - end_location: type=utf8, nullable
  │     - fare_amount: type=float64, nullable
  │     - log_id: type=int32
  │     - passenger_id: type=int32, nullable
  │     - payment_method: type=utf8, nullable
  │     - start_location: type=utf8, nullable
  │     - timestamp: type=utf8, nullable}
2024/05/22 22:12:00 INFO [source]: PostgreSQL-CDC: Query snapshot batch-size=13500
2024/05/22 22:12:00 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=0
2024/05/22 22:12:05 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=13500
2024/05/22 22:12:08 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=27000
2024/05/22 22:12:09 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=40500
2024-05-22 22:12:09 INFO Stream: Messages stat messages_received=53649 messages_sent=53649 messages_dropped_or_filtered=0
2024/05/22 22:12:09 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=54000
2024/05/22 22:12:10 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=67500
2024/05/22 22:12:11 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=81000
2024/05/22 22:12:11 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=94500
2024/05/22 22:12:12 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=108000
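
The log lines above show the snapshot being read in batches of 13500 rows, with the offset growing by the batch size each time. Here is a sketch of that pagination pattern in Python. The `snapshot_batches` name and the idea that the reader stops at a known row count are assumptions, since the logs do not show how the connector decides when the snapshot is complete.

```python
def snapshot_batches(total_rows, batch_size=13500):
    """Yield (offset, limit) pairs that cover `total_rows` rows in order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    offset = 0
    while offset < total_rows:
        # The last batch may be smaller than batch_size.
        yield offset, min(batch_size, total_rows - offset)
        offset += batch_size


# Batches for a hypothetical table of 30000 rows.
batches = list(snapshot_batches(30000))
```

Each pair corresponds to one "Query snapshot" log line: the first element is the offset that appears in the log, the second is how many rows that batch would return.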

Start with Open Source Blink

Blink is an Open-Source project from DataBrew that allows you to stream data from various sources to various destinations.

In this section, we will cover how to start with Blink and stream data from Postgres to NATS.

Assuming you already have Postgres and NATS set up, let's start with Blink.

Download and install Blink

You can read more about the installation in the Installing Blink guide.

Create a new pipeline

Unlike DataBrew Cloud, Blink is a CLI tool. You create a new pipeline by defining its configuration in a YAML file.

Here is an example of the pipeline configuration for our use case.
Save it as blink.yaml:

service:
  pipeline_id: 223
source:
  driver: postgres_cdc
  config:
    host: localhost
    slot_name: slot_example_name
    user: postgres
    password: 12345
    port: 5432
    schema: public
    stream_snapshot: false
    snapshot_memory_safety_factor: 0.1
    snapshot_batch_size: 10000
    ssl_required: true
    database: mocks
  stream_schema:
    - stream: public.taxi_rides
      columns:
        - name: log_id
          databrewType: Int32
          nativeConnectorType: integer
          pk: true
          nullable: false
        - name: _cq_sync_time
          databrewType: String
          nativeConnectorType: timestamp without time zone
          pk: false
          nullable: true
        - name: distance_traveled
          databrewType: Float64
          nativeConnectorType: double precision
          pk: false
          nullable: true
        - name: driver_id
          databrewType: Int32
          nativeConnectorType: integer
          pk: false
          nullable: true
        - name: duration
          databrewType: Int32
          nativeConnectorType: integer
          pk: false
          nullable: true
        - name: end_location
          databrewType: String
          nativeConnectorType: text
          pk: false
          nullable: true
        - name: fare_amount
          databrewType: Float64
          nativeConnectorType: double precision
          pk: false
          nullable: true
        - name: passenger_id
          databrewType: Int32
          nativeConnectorType: integer
          pk: false
          nullable: true
        - name: payment_method
          databrewType: String
          nativeConnectorType: text
          pk: false
          nullable: true
        - name: start_location
          databrewType: String
          nativeConnectorType: text
          pk: false
          nullable: true
        - name: timestamp
          databrewType: String
          nativeConnectorType: text
          pk: false
          nullable: true
processors: []
sink:
  driver: nats
  config:
    url: localhost:4222
    subject: taxi_rides
    username: ""
    password: ""
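
Writing the column list by hand gets tedious for wide tables. As a rough sketch, the Python helper below renders one column entry per field using the same keys as the configuration above. The type mapping only covers the Postgres types that appear in this example, and the helper name `render_columns` is made up; it is not part of Blink. It also assumes that primary-key columns are the only non-nullable ones, which holds for this table but may not for yours.

```python
# Mapping taken from the example configuration; extend it for other types.
PG_TO_DATABREW = {
    "integer": "Int32",
    "double precision": "Float64",
    "text": "String",
    "timestamp without time zone": "String",
}


def render_columns(columns, indent=8):
    """Render YAML column entries.

    `columns` is a list of (name, postgres_type, is_primary_key) tuples.
    """
    pad = " " * indent
    lines = []
    for name, pg_type, is_pk in columns:
        lines.append(f"{pad}- name: {name}")
        lines.append(f"{pad}  databrewType: {PG_TO_DATABREW[pg_type]}")
        lines.append(f"{pad}  nativeConnectorType: {pg_type}")
        lines.append(f"{pad}  pk: {str(is_pk).lower()}")
        lines.append(f"{pad}  nullable: {str(not is_pk).lower()}")
    return "\n".join(lines)


print(render_columns([
    ("log_id", "integer", True),
    ("fare_amount", "double precision", False),
]))
```

The default indent of 8 spaces matches the depth of the column entries in the example file, so the output can be pasted under the stream definition.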

Start the pipeline

If you have Blink installed locally, you can start the pipeline by running the following command:

blink start -c blink.yaml

You should see the following output:

2024/05/22 22:12:41 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=567000
2024/05/22 22:12:42 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=580500

The logs above display the data that is being streamed from the snapshot of existing data in the Postgres table.

Your logs may be slightly different as you may have different data in your Postgres table.

Check the data in NATS

The last step is to check the data in NATS. You can use the NATS CLI tool to subscribe to the subject:

nats sub -s nats:// "taxi_rides"

If you did everything correctly, you should see the following logs:

22:17:42 Subscribing on taxi_rides

[#1] Received on "taxi_rides"
[{"_cq_sync_time":null,"distance_traveled":19.15,"driver_id":1,"duration":320,"end_location":"55 Schlimgen Road","fare_amount":309.56,"log_id":8540,"passenger_id":583,"payment_method":"cash","start_location":"87 Fisk Driv","timestamp":"08/18/2022"}]
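
As the output shows, each message body is a JSON array of row objects. Here is a minimal sketch of decoding such a payload in Python. The `decode_rows` helper is hypothetical; in a real consumer you would call it from your NATS client's message handler with the raw message bytes.

```python
import json


def decode_rows(payload):
    """Decode a message body (bytes or str) into a list of row dicts."""
    rows = json.loads(payload)
    if not isinstance(rows, list):
        raise ValueError("expected a JSON array of rows")
    return rows


# Shortened version of the message shown above.
sample = (
    b'[{"distance_traveled":19.15,"driver_id":1,"fare_amount":309.56,'
    b'"log_id":8540,"payment_method":"cash"}]'
)
for row in decode_rows(sample):
    print(row["log_id"], row["fare_amount"], row["payment_method"])
```

Nullable columns such as _cq_sync_time arrive as JSON null, which Python decodes to None, so downstream code should handle missing values.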

In this article, we explored how to set up a streaming pipeline from PostgreSQL to NATS using DataBrew Cloud and the open-source tool Blink. We covered the initial setup of PostgreSQL for Change Data Capture (CDC), configuring NATS, and creating a streaming pipeline with DataBrew Cloud. Additionally, we demonstrated how to achieve the same result using Blink, a powerful CLI tool from DataBrew.

By leveraging these tools, you can build efficient and scalable data streaming solutions for various use cases such as event-driven architectures, real-time analytics, and seamless data integration with external systems. Streaming data from PostgreSQL using CDC ensures minimal load on your database and reliable data delivery, even if your consumer is temporarily offline.

If you found this guide helpful and are interested in supporting our work, please consider giving a star to our project on GitHub. Your support helps us continue to develop and improve these tools.

Top comments (2)

Parker Waiters

Could you explain more about how DataBrew Cloud and Blink handle schema changes in the PostgreSQL database during data streaming?

Vladyslav Len

Hey, sure.
At this stage, we are not sure if we need to propagate schema changes.
Therefore, if your source schema gains new columns, DataBrew Cloud and Blink will not parse them. But if it drops some of the required fields, the pipeline will stop, as your external system probably expects these columns to be delivered.

Currently, schema change handling is in our plans, and we are aiming to make it configurable and seamless.

