Real-time data pipelines are becoming essential for modern applications. Whether you want to trigger event-driven microservices, update analytics dashboards, or implement an outbox pattern, reacting to database changes in real time is often a pain.
Postgres provides logical replication, but connecting it to AWS services like SQS typically requires complex setups involving Kafka, Debezium, or custom scripts.
I've been working with PostgreSQL logical replication for over six years - that experience inspired me to build pg2sqs: a lightweight tool that reads the Postgres Write-Ahead Log (WAL) and streams database changes to AWS SQS in real time. It's fast, easy to set up, and works right out of the box.
Why You Might Need This
Consider these common scenarios:
- You want event-driven microservices to react to database changes.
- You need real-time analytics on transactional data.
- You want a simple outbox pattern without the overhead of Kafka.
Existing solutions work, but they can be heavy and complex. My goal was to create something practical, lightweight, and deployable in minutes.
How it works
At a high level, the app does three things:
- Connects to Postgres over a logical replication connection.
- Receives WAL entries and converts them into JSON messages.
- Sends the messages to AWS SQS.
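For example, an insert into a users table arrives in SQS as a compact JSON message - the same shape you'll see in the demo at the end of this post:

{"id":1,"email":"alice@example.com","created_at":"2025-10-02T20:22:54.859794Z","kind":"insert"}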
Advanced features under the hood
- High-Performance WAL Processing: The app buffers WAL entries in memory and sends them to SQS using the Batch API. You can configure both the number of simultaneous batch requests and the flush interval, so it scales with your workload and network conditions.
- Reliable Delivery with Automatic Retries: Each batch response is carefully processed, and any failed entries are retried automatically using a backoff algorithm. Retry parameters are fully configurable, giving you precise control over timing and limits.
- Flexible Queue Options: Supports FIFO queues with a configurable GroupID per queue, and allows separate queues for inserts, updates, and deletes, keeping your event streams organized.
- Multi-Replica Support & High Availability: The app can connect to multiple Postgres replicas and handles primary disconnects gracefully. If the primary goes down, the app keeps running, waiting for a replica promotion. For Postgres version ≥16, it also manages replication slots on replicas, ensuring that entries already processed before a failover aren't reprocessed - maintaining exactly-once semantics.
Getting started
Let's prepare an AWS SQS queue. In this example, we'll use a users.fifo FIFO queue created via the AWS Management Console.
Alternatively, you can let pg2sqs create the queue automatically on startup - just make sure the IAM user has the required permissions.
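If you prefer the command line over the console, the queue can also be created with the AWS CLI - for example (whether you want content-based deduplication or other attributes depends on your setup):

aws sqs create-queue --queue-name users.fifo --attributes FifoQueue=true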
For Postgres and pg2sqs, I created the following docker-compose file:
version: "3.9"

services:
  postgres:
    image: postgres:17
    container_name: pg2sqs-postgres
    command: ["postgres", "-c", "wal_level=logical", "-c", "max_wal_senders=5", "-c", "max_replication_slots=5"]
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: pg2sqspassword
      POSTGRES_DB: postgres
    volumes:
      - ./sql/01_table_pub.sql:/docker-entrypoint-initdb.d/01_table_pub.sql
      - ./sql/02_slot.sql:/docker-entrypoint-initdb.d/02_slot.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      retries: 5
    networks:
      - pg2sqs-net

  pg2sqs:
    image: alikpgwalk/pg2sqs:latest
    container_name: pg2sqs
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      PG2SQS_POSTGRES_STANDBYTIMEOUT: 35s
      PG2SQS_POSTGRES_RECEIVETIMEOUT: 35s
      PG2SQS_POSTGRES_CONN_DATABASE: postgres
      PG2SQS_POSTGRES_CONN_HOST: pg2sqs-postgres
      PG2SQS_POSTGRES_CONN_PORT: 5432
      PG2SQS_POSTGRES_CONN_USER: postgres
      PG2SQS_POSTGRES_CONN_PASSWORD: pg2sqspassword
      PG2SQS_POSTGRES_REPL_PUB: users_pub
      PG2SQS_POSTGRES_REPL_SLOT: users_slot
      PG2SQS_FLUSHINTERVAL: 1s
      PG2SQS_FLUSHWORKERS: 5
      PG2SQS_MAXWRITEQUEUESIZE: 10000
      PG2SQS_WRITETIMEOUT: 10s
      PG2SQS_SHUTDOWNTIMEOUT: 20s
      PG2SQS_STATSINTERVAL: 30s
      PG2SQS_RETRYPOLICY_MAXRETRIES: 5
      PG2SQS_RETRYPOLICY_MAXCONNECTIONRETRIES: 0
      PG2SQS_RETRYPOLICY_INITIALBACKOFF: 15s
      PG2SQS_RETRYPOLICY_MULTIPLIER: 1
      PG2SQS_RETRYPOLICY_JITTER: 0.4
      PG2SQS_RETRYPOLICY_MAXBACKOFF: 120s
      PG2SQS_T_0_NAME: public.users
      PG2SQS_T_0_COLUMNS: all
      PG2SQS_T_0_Q_NAME: users.fifo
      PG2SQS_T_0_Q_GROUPID: '$${%table%}$${id}'
      PGWALK_LIC_PG2SQS: CzluOrIpH440IyK0MOPNOoOjOdSsqBxzp9qW9V6H8C6YDdDN4PWyd3zIBTMS7lqJtYa6Lo10jnDPJ7fHiUzuMa2ArLDaWuoqwvyhb14SosDNVDsMOxUA5hb9w4Hv3DGsAheFi9O3FhwkTepGMqeK2BwmxFGtgUwdhYVG9TVgGkTmTHmIW2btK5ft6Jmjy5Np
      AWS_REGION: <AWS REGION e.g. us-east-1>
      AWS_ENDPOINT_URL_SQS: <AWS SQS Endpoint url e.g. https://sqs.us-east-1.amazonaws.com>
      AWS_ACCESS_KEY_ID: <AWS ACCESS KEY ID>
      AWS_SECRET_ACCESS_KEY: <AWS SECRET>
    networks:
      - pg2sqs-net

networks:
  pg2sqs-net:
    driver: bridge
Let's break this down.
The postgres service is created from the postgres:17 image. We set the password to pg2sqspassword and override the default command to set wal_level, max_wal_senders, and max_replication_slots. These settings are necessary to enable logical replication.
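Once the container is up, you can quickly verify that the override took effect; it should print logical:

docker exec pg2sqs-postgres psql -U postgres -c "SHOW wal_level;"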
For production use it's recommended to create a separate user with replication permissions. However, for the sake of simplicity, in this article we will use the postgres user. For more details, see the pg2sqs documentation.
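As a rough sketch, such a user could look like the following (the role name is illustrative, and the exact grants depend on your setup - an initial table copy, for instance, additionally requires SELECT on the published tables):

-- Illustrative role name; adjust privileges to your needs
CREATE ROLE pg2sqs_repl WITH LOGIN REPLICATION PASSWORD 'change-me';
GRANT SELECT ON TABLE users TO pg2sqs_repl;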
We mount two SQL files into the Postgres container, placing them in the docker-entrypoint-initdb.d directory. Postgres automatically runs any .sql files found in this directory during initialization.
The first file creates the users table and the users_pub publication:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email TEXT NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT now()
);
CREATE PUBLICATION users_pub FOR TABLE users;
The second file creates a logical replication slot:
SELECT pg_create_logical_replication_slot('users_slot', 'pgoutput');
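If you want to confirm that both objects were created during initialization, you can query the catalogs:

-- The slot should use the pgoutput plugin, the publication should be users_pub
SELECT slot_name, plugin, slot_type FROM pg_replication_slots;
SELECT pubname FROM pg_publication;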
Then we create the pg2sqs service from the alikpgwalk/pg2sqs:latest image and configure it via environment variables. The most important are:
- PG2SQS_T_... - these variables configure how a table maps to an SQS queue. For the first table we use T_0, for the second T_1, and so on (see the sketch after this list).
  - PG2SQS_T_0_NAME - the table name
  - PG2SQS_T_0_COLUMNS - a list of columns, or all for all columns
  - PG2SQS_T_0_Q_NAME - the SQS queue name
  - PG2SQS_T_0_Q_GROUPID - a template expression for the GroupID. Because we're using a FIFO queue, a GroupID is required. When creating messages, the app evaluates the GroupID expression into something like public.users<id>. This ensures that all messages for a specific id are delivered in order by SQS. Also note the double $$: it only escapes the original $ symbol in the Compose file; the actual value is ${%table%}${id}.
- PGWALK_LIC_PG2SQS - the license env var. Use the value provided in the compose file. This license key allows free usage of pg2sqs until January 31st, 2026.
- AWS_ env vars - set these according to your AWS account settings. For example, if you're using the us-east-1 region, you can set AWS_REGION: us-east-1 and AWS_ENDPOINT_URL_SQS: https://sqs.us-east-1.amazonaws.com. AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY need to be configured via IAM for your AWS account. Do not commit actual values into the repository.
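For instance, mapping a second table would follow the same pattern with the T_1 prefix; the orders table and queue names below are purely illustrative:

      # added under the pg2sqs service's environment section
      PG2SQS_T_1_NAME: public.orders
      PG2SQS_T_1_COLUMNS: all
      PG2SQS_T_1_Q_NAME: orders.fifo
      PG2SQS_T_1_Q_GROUPID: '$${%table%}$${id}'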
We connect both containers to the pg2sqs-net network in Docker, which we define at the end of the Compose file. This network allows us to reference pg2sqs-postgres by its service name instead of an IP address. Without a shared network, we would have to rely on IPs, which are less convenient and not stable.
For a complete reference see the pg2sqs documentation.
pg2sqs in action
Now let's start the containers with the docker compose up -d command, and insert a user into the users table:
docker exec -i pg2sqs-postgres psql -U postgres -d postgres -c "INSERT INTO users (email) VALUES ('alice@example.com');"
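To check the queue contents from the command line, you can, for example, poll it with the AWS CLI (the queue URL is a placeholder for your account):

aws sqs receive-message \
  --queue-url https://sqs.us-east-1.amazonaws.com/<ACCOUNT_ID>/users.fifo \
  --max-number-of-messages 1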
If we then check the messages in the users.fifo queue, we see the following message:
{"id":1,"email":"alice@example.com","created_at":"2025-10-02T20:22:54.859794Z","kind":"insert"}
That's everything you need to get started. The process is straightforward, and pg2sqs even prints statistics that look like this:
{
  "level": "info",
  "ts": "2025-10-02T16:30:01.153717123Z",
  "logger": "pg_replicator",
  "msg": "statistics",
  "pg2sqs_tx_last_committed": 0,
  "pg2sqs_tx_last_reported": 756,
  "pg2sqs_lsn_last_reported": "0/1584C31",
  "pg2sqs_tx_total": 1,
  "pg2sqs_tx_streamed_total": 0,
  "pg2sqs_tx_last_processed": 756,
  "pg2sqs_lsn_last_processed": "0/1584C30",
  "pg2sqs_downstream_write_queue_length": 0,
  "pg2sqs_downstream_retry_queue_length": 0,
  "pg2sqs_downstream_tx_last_sent": 756,
  "pg2sqs_inserts_total{table=\"public.users\"}": 1,
  "pg2sqs_updates_total{table=\"public.users\"}": 0,
  "pg2sqs_deletes_total{table=\"public.users\"}": 0,
  "pg2sqs_sqs_requests_total{queue=\"users.fifo\"}": 1,
  "pg2sqs_sqs_failures_total{queue=\"users.fifo\"}": 0,
  "pg2sqs_sqs_requests_inflight_total": 0,
  "pg2sqs_sqs_tx_last_sent": 756,
  "pg2sqs_sqs_tx_last_ok": 756
}
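These statistics are emitted as structured log lines, so the easiest way to follow them is to tail the pg2sqs container:

docker logs -f pg2sqs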
To learn more, check out the pg2sqs documentation or try integrating it with your own projects. This setup makes it easy to start replicating Postgres changes into SQS and experiment with different configurations.
If you have any feedback or questions, feel free to email us at pg2sqs@pgwalk.com.