DEV Community

Alik

Originally published at Medium

Stream Postgres WAL to AWS SQS: A Lightweight Approach

Real-time data pipelines are becoming essential for modern applications. Whether you want to trigger event-driven microservices, update analytics dashboards, or implement an outbox pattern, reacting to database changes in real time is often a pain.

Postgres provides logical replication, but connecting it to AWS services like SQS typically requires complex setups involving Kafka, Debezium, or custom scripts.

I've been working with PostgreSQL logical replication for over six years - that experience inspired me to build pg2sqs: a lightweight tool that reads the Postgres Write-Ahead Log (WAL) and streams database changes to AWS SQS in real time. It's fast, easy to set up, and works right out of the box.

[Figure: PostgreSQL logical replication to AWS SQS flow diagram, showing pg2sqs reading WAL changes and forwarding them to queues]

Why You Might Need This

Consider these common scenarios:

  • You want event-driven microservices to react to database changes.
  • You need real-time analytics on transactional data.
  • You want a simple outbox pattern without the overhead of Kafka.

Existing solutions work, but they can be heavy and complex. My goal was to create something practical, lightweight, and deployable in minutes.

How it works

At a high level, the app does three things:

  • Connects to Postgres using a logical replication connection.
  • Receives WAL entries and converts them into JSON messages.
  • Sends the messages to AWS SQS.

Advanced features under the hood

  1. High-Performance WAL Processing
    • The app buffers WAL entries in memory and sends them to SQS using the Batch API. You can configure both the number of simultaneous batch requests and the flush interval, so it scales with your workload and network conditions.
  2. Reliable Delivery with Automatic Retries
    • Each batch response is carefully processed, and any failed entries are retried automatically using a backoff algorithm. Retry parameters are fully configurable, giving you precise control over timing and limits.
  3. Flexible Queue Options
    • Supports FIFO queues with configurable GroupID per queue.
    • Allows separate queues for inserts, updates, and deletes, keeping your event streams organized.
  4. Multi-Replica Support & High Availability
    • The app can connect to multiple Postgres replicas and handles primary disconnects gracefully. If the primary goes down, the app keeps running and waits for a replica to be promoted.
    • For Postgres 16 and later, it also manages replication slots on replicas, so entries already processed before a failover aren't reprocessed, which maintains exactly-once semantics.
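
To make the batching and retry behavior above more concrete, here is a minimal Python sketch. It is not pg2sqs's actual implementation: the names `backoff_delay`, `chunk`, `send_with_retries`, and the `send_batch` callable are hypothetical, and the backoff formula (initial delay times multiplier to the power of the attempt, capped, then randomized by a jitter fraction) is only one common way to interpret such parameters. The batch size of 10 is the SQS SendMessageBatch limit, and the `Failed` list mirrors the shape of that API's response.

```python
import random
import time
from typing import Callable, List

SQS_MAX_BATCH = 10  # SendMessageBatch accepts at most 10 entries per request


def backoff_delay(attempt: int, initial: float, multiplier: float,
                  max_backoff: float, jitter: float,
                  rng: Callable[[], float] = random.random) -> float:
    """Delay in seconds before retry number `attempt` (0-based).

    Assumed formula: initial * multiplier**attempt, capped at max_backoff,
    then scaled by a random factor in [1 - jitter, 1 + jitter].
    """
    base = min(initial * (multiplier ** attempt), max_backoff)
    factor = 1.0 + jitter * (2.0 * rng() - 1.0)
    return base * factor


def chunk(entries: List[dict], size: int = SQS_MAX_BATCH) -> List[List[dict]]:
    """Split entries into consecutive batches of at most `size` items."""
    return [entries[i:i + size] for i in range(0, len(entries), size)]


def send_with_retries(entries: List[dict],
                      send_batch: Callable[[List[dict]], dict],
                      max_retries: int = 5, initial: float = 15.0,
                      multiplier: float = 1.0, max_backoff: float = 120.0,
                      jitter: float = 0.4,
                      sleep: Callable[[float], None] = time.sleep) -> List[dict]:
    """Send entries in batches and retry only the entries reported as failed.

    `send_batch` is a hypothetical callable: it takes a list of entries
    (each with an "Id" key) and returns a dict with a "Failed" list of
    {"Id": ...} items. Returns the entries still failing after max_retries.
    """
    pending = list(entries)
    for attempt in range(max_retries + 1):  # one initial send plus retries
        failed = []
        for batch in chunk(pending):
            response = send_batch(batch)
            failed_ids = {f["Id"] for f in response.get("Failed", [])}
            failed.extend(e for e in batch if e["Id"] in failed_ids)
        if not failed:
            return []
        pending = failed
        if attempt < max_retries:
            sleep(backoff_delay(attempt, initial, multiplier, max_backoff, jitter))
    return pending
```

The defaults mirror the retry values used in the example configuration in this article. A real implementation for FIFO queues would also need to preserve per-group ordering when only part of a batch fails, which this sketch ignores.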

Getting started

Let's prepare an AWS SQS queue. In this example, we'll use a users.fifo FIFO queue created via the AWS Management Console.
Alternatively, you can let pg2sqs create the queue automatically on startup - just make sure the IAM user has the required permissions.

For Postgres and pg2sqs, I created the following Docker Compose file:

version: "3.9"

services:
  postgres:
    image: postgres:17
    container_name: pg2sqs-postgres
    command: ["postgres", "-c", "wal_level=logical", "-c", "max_wal_senders=5", "-c", "max_replication_slots=5"]
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: pg2sqspassword
      POSTGRES_DB: postgres
    volumes:
      - ./sql/01_table_pub.sql:/docker-entrypoint-initdb.d/01_table_pub.sql
      - ./sql/02_slot.sql:/docker-entrypoint-initdb.d/02_slot.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      retries: 5
    networks:
      - pg2sqs-net

  pg2sqs:
    image: alikpgwalk/pg2sqs:latest
    container_name: pg2sqs
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      PG2SQS_POSTGRES_STANDBYTIMEOUT: 35s
      PG2SQS_POSTGRES_RECEIVETIMEOUT: 35s
      PG2SQS_POSTGRES_CONN_DATABASE: postgres
      PG2SQS_POSTGRES_CONN_HOST: pg2sqs-postgres
      PG2SQS_POSTGRES_CONN_PORT: 5432
      PG2SQS_POSTGRES_CONN_USER: postgres
      PG2SQS_POSTGRES_CONN_PASSWORD: pg2sqspassword
      PG2SQS_POSTGRES_REPL_PUB: users_pub
      PG2SQS_POSTGRES_REPL_SLOT: users_slot
      PG2SQS_FLUSHINTERVAL: 1s
      PG2SQS_FLUSHWORKERS: 5
      PG2SQS_MAXWRITEQUEUESIZE: 10000
      PG2SQS_WRITETIMEOUT: 10s
      PG2SQS_SHUTDOWNTIMEOUT: 20s
      PG2SQS_STATSINTERVAL: 30s
      PG2SQS_RETRYPOLICY_MAXRETRIES: 5
      PG2SQS_RETRYPOLICY_MAXCONNECTIONRETRIES: 0
      PG2SQS_RETRYPOLICY_INITIALBACKOFF: 15s
      PG2SQS_RETRYPOLICY_MULTIPLIER: 1
      PG2SQS_RETRYPOLICY_JITTER: 0.4
      PG2SQS_RETRYPOLICY_MAXBACKOFF: 120s
      PG2SQS_T_0_NAME: public.users
      PG2SQS_T_0_COLUMNS: all
      PG2SQS_T_0_Q_NAME: users.fifo
      PG2SQS_T_0_Q_GROUPID: '$${%table%}$${id}'
      PGWALK_LIC_PG2SQS: CzluOrIpH440IyK0MOPNOoOjOdSsqBxzp9qW9V6H8C6YDdDN4PWyd3zIBTMS7lqJtYa6Lo10jnDPJ7fHiUzuMa2ArLDaWuoqwvyhb14SosDNVDsMOxUA5hb9w4Hv3DGsAheFi9O3FhwkTepGMqeK2BwmxFGtgUwdhYVG9TVgGkTmTHmIW2btK5ft6Jmjy5Np
      AWS_REGION: <AWS REGION e.g. us-east-1>
      AWS_ENDPOINT_URL_SQS: <AWS SQS Endpoint url e.g. https://sqs.us-east-1.amazonaws.com>
      AWS_ACCESS_KEY_ID: <AWS ACCESS KEY ID> 
      AWS_SECRET_ACCESS_KEY: <AWS SECRET>
    networks:
      - pg2sqs-net

networks:
  pg2sqs-net:
    driver: bridge

Let's break this down.

The postgres service is created from the postgres:17 image. We set the password to pg2sqspassword and override the default command to set wal_level, max_wal_senders, and max_replication_slots. These settings are necessary to enable logical replication.

For production use, it's recommended to create a separate user with replication permissions. For the sake of simplicity, this article uses the postgres user. For more details, see the pg2sqs documentation.

We mount two SQL files into the Postgres container, placing them in the docker-entrypoint-initdb.d directory. Postgres automatically runs any .sql files found in this directory during initialization.

The first file creates the users table and the users_pub publication:

CREATE TABLE users (
 id SERIAL PRIMARY KEY,
 email TEXT NOT NULL UNIQUE,
 created_at TIMESTAMP DEFAULT now()
);
CREATE PUBLICATION users_pub FOR TABLE users;

The second file creates a logical replication slot:

SELECT pg_create_logical_replication_slot('users_slot', 'pgoutput');

Then we create the pg2sqs service from the alikpgwalk/pg2sqs:latest image and configure it with environment variables. The most important are:

  1. PG2SQS_T_... variables configure how a table maps to an SQS queue. The first table uses T_0, the second T_1, and so on.
    • PG2SQS_T_0_NAME - the table name 
    • PG2SQS_T_0_COLUMNS - columns list or all for all columns 
    • PG2SQS_T_0_Q_NAME - the SQS queue name
    • PG2SQS_T_0_Q_GROUPID - a template expression for the GroupID. Because we're using a FIFO queue, a GroupID is required. When creating messages, the app evaluates the GroupID expression into something like public.users<id>, where <id> is the row's id value. This ensures that SQS delivers all messages for a specific id in order. Also note the double $$: it is needed only to escape the $ symbol in the Compose file. The actual value is ${%table%}${id}.
  2. PGWALK_LIC_PG2SQS - the license env var. Use the value provided in the compose file. This license key allows free usage of pg2sqs until January 31st, 2026.
  3. AWS_ env vars - set according to your AWS account settings. For example, if you're using the us-east-1 region, you can set AWS_REGION: us-east-1 and AWS_ENDPOINT_URL_SQS: https://sqs.us-east-1.amazonaws.com. AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY need to be configured via IAM for your AWS account. Do not commit actual values into the repository.
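
The GroupID template can be pictured with a small Python sketch. The expansion rules here are an assumption based on the description above (`${%table%}` becomes the qualified table name and `${column}` becomes that column's value), and `expand_group_id` is a hypothetical helper, not part of pg2sqs.

```python
import re
from typing import Any, Mapping

# Matches ${...} placeholders such as ${%table%} or ${id}
_PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def expand_group_id(template: str, table: str, row: Mapping[str, Any]) -> str:
    """Expand ${%table%} and ${column} placeholders in a GroupID template."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name == "%table%":
            return table
        return str(row[name])  # raises KeyError if the column is missing
    return _PLACEHOLDER.sub(substitute, template)


# In the Compose file the value is written with $$ so that Compose passes a
# literal $:  '$${%table%}$${id}'  becomes  '${%table%}${id}'
print(expand_group_id("${%table%}${id}", "public.users", {"id": 1}))  # prints: public.users1
```

Under this reading, every change to the row with id 1 lands in the same message group, so SQS keeps those messages in order relative to each other.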

We connect both containers to the pg2sqs-net network in Docker, which we define at the end of the Compose file. On this shared network, pg2sqs can reach the database by its container name, pg2sqs-postgres (the service name postgres would also resolve), instead of an IP address. Without a shared network, we would have to rely on IPs, which are less convenient and not stable.

For a complete reference see the pg2sqs documentation.

pg2sqs in action

Now let's start the containers with the docker compose up -d command and insert a user into the users table:

docker exec -i pg2sqs-postgres psql -U postgres -d postgres -c "INSERT INTO users (email) VALUES ('alice@example.com');"

If we then check the messages in the users.fifo queue, we see a message like this:

{"id":1,"email":"alice@example.com","created_at":"2025-10-02T20:22:54.859794Z","kind":"insert"}
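
On the consuming side, a service would typically parse the message body and branch on the kind field. The following Python sketch assumes the body format shown above. Only the "insert" value appears in this article, so the "update" and "delete" values are assumptions, and `handle_message` is a hypothetical helper.

```python
import json
from typing import Any, Dict, Tuple


def handle_message(body: str) -> Tuple[str, Dict[str, Any]]:
    """Parse a message body and split it into (kind, row columns)."""
    payload = json.loads(body)
    kind = payload.pop("kind")  # "insert" in the example message
    if kind not in {"insert", "update", "delete"}:  # last two are assumed values
        raise ValueError(f"unexpected kind: {kind!r}")
    return kind, payload


body = ('{"id":1,"email":"alice@example.com",'
        '"created_at":"2025-10-02T20:22:54.859794Z","kind":"insert"}')
kind, row = handle_message(body)
print(kind, row["id"], row["email"])  # prints: insert 1 alice@example.com
```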

That's everything you need to get started. The process is straightforward, and pg2sqs even prints statistics that look like this:

{
 "level": "info",
 "ts": "2025-10-02T16:30:01.153717123Z",
 "logger": "pg_replicator",
 "msg": "statistics",
 "pg2sqs_tx_last_committed": 0,
 "pg2sqs_tx_last_reported": 756,
 "pg2sqs_lsn_last_reported": "0/1584C31",
 "pg2sqs_tx_total": 1,
 "pg2sqs_tx_streamed_total": 0,
 "pg2sqs_tx_last_processed": 756,
 "pg2sqs_lsn_last_processed": "0/1584C30",
 "pg2sqs_downstream_write_queue_length": 0,
 "pg2sqs_downstream_retry_queue_length": 0,
 "pg2sqs_downstream_tx_last_sent": 756,
 "pg2sqs_inserts_total{table=\"public.users\"}": 1,
 "pg2sqs_updates_total{table=\"public.users\"}": 0,
 "pg2sqs_deletes_total{table=\"public.users\"}": 0,
 "pg2sqs_sqs_requests_total{queue=\"users.fifo\"}": 1,
 "pg2sqs_sqs_failures_total{queue=\"users.fifo\"}": 0,
 "pg2sqs_sqs_requests_inflight_total": 0,
 "pg2sqs_sqs_tx_last_sent": 756,
 "pg2sqs_sqs_tx_last_ok": 756
}
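
Because the statistics are logged as JSON, they are easy to check from a script. The Python sketch below reads one statistics log line and reports counters that look unhealthy. The interpretation of each field is an assumption drawn from its name, not from the pg2sqs documentation, and `find_problems` is a hypothetical helper.

```python
import json
from typing import List


def find_problems(log_line: str) -> List[str]:
    """Return human-readable warnings for one pg2sqs statistics log line."""
    stats = json.loads(log_line)
    if stats.get("msg") != "statistics":
        return []
    problems = []
    for key, value in stats.items():
        # Per-queue failure counters look like pg2sqs_sqs_failures_total{queue="..."}
        if key.startswith("pg2sqs_sqs_failures_total") and value > 0:
            problems.append(f"{key} = {value}")
    if stats.get("pg2sqs_downstream_retry_queue_length", 0) > 0:
        problems.append("entries are waiting in the retry queue")
    # Assumed meaning: the last processed transaction should match the last
    # transaction that SQS confirmed.
    if stats.get("pg2sqs_tx_last_processed") != stats.get("pg2sqs_sqs_tx_last_ok"):
        problems.append("last processed transaction not yet confirmed by SQS")
    return problems


sample = json.dumps({
    "msg": "statistics",
    "pg2sqs_tx_last_processed": 756,
    "pg2sqs_sqs_tx_last_ok": 756,
    "pg2sqs_downstream_retry_queue_length": 0,
    'pg2sqs_sqs_failures_total{queue="users.fifo"}': 0,
})
print(find_problems(sample))  # prints: []
```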

To learn more, check out the pg2sqs documentation or try integrating it with your own projects. This setup makes it easy to start replicating Postgres changes into SQS and experiment with different configurations.
If you have any feedback or questions, feel free to email us at pg2sqs@pgwalk.com.
