<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Alik</title>
    <description>The latest articles on DEV Community by Alik (@alik-pgwalk).</description>
    <link>https://dev.to/alik-pgwalk</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3543286%2Fb1d6fd70-c2a6-4fb3-9005-c0502b216386.png</url>
      <title>DEV Community: Alik</title>
      <link>https://dev.to/alik-pgwalk</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/alik-pgwalk"/>
    <language>en</language>
    <item>
      <title>Stream Postgres WAL to AWS SQS: A Lightweight Approach</title>
      <dc:creator>Alik</dc:creator>
      <pubDate>Tue, 07 Oct 2025 13:00:00 +0000</pubDate>
      <link>https://dev.to/alik-pgwalk/stream-postgres-wal-to-aws-sqs-a-lightweight-approach-2dgm</link>
      <guid>https://dev.to/alik-pgwalk/stream-postgres-wal-to-aws-sqs-a-lightweight-approach-2dgm</guid>
      <description>&lt;p&gt;Real-time data pipelines are becoming essential for modern applications. Whether you want to trigger event-driven microservices, update analytics dashboards, or implement an outbox pattern, reacting to database changes in real time is often a pain.&lt;/p&gt;

&lt;p&gt;Postgres provides &lt;strong&gt;logical replication&lt;/strong&gt;, but connecting it to AWS services like SQS typically requires complex setups involving Kafka, Debezium, or custom scripts.&lt;/p&gt;

&lt;p&gt;I've been working with PostgreSQL logical replication for over six years - that experience inspired me to build &lt;strong&gt;&lt;a href="https://www.pgwalk.com/pg2sqs/" rel="noopener noreferrer"&gt;pg2sqs&lt;/a&gt;&lt;/strong&gt;: a lightweight tool that reads the Postgres Write-Ahead Log (WAL) and streams database changes to &lt;strong&gt;AWS SQS&lt;/strong&gt; in real time. It's fast, easy to set up, and works right out of the box.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvo1jszi5wfbuyexsbbhw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvo1jszi5wfbuyexsbbhw.png" alt="PostgreSQL logical replication to AWS SQS flow diagram showing pg2sqs reading WAL changes and forwarding them to queues" width="800" height="533"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;Why You Might Need This&lt;/h2&gt;

&lt;p&gt;Consider these common scenarios:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You want &lt;strong&gt;event-driven microservices&lt;/strong&gt; to react to database changes.&lt;/li&gt;
&lt;li&gt;You need &lt;strong&gt;real-time analytics&lt;/strong&gt; on transactional data.&lt;/li&gt;
&lt;li&gt;You want a simple &lt;strong&gt;outbox pattern&lt;/strong&gt; without the overhead of Kafka.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Existing solutions work, but they can be heavy and complex. My goal was to create something &lt;strong&gt;practical, lightweight, and deployable in minutes&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;How It Works&lt;/h2&gt;

&lt;p&gt;At a high level, the app does three things:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Connects to Postgres over a &lt;strong&gt;logical replication&lt;/strong&gt; connection.&lt;/li&gt;
&lt;li&gt;Receives WAL entries and converts them into JSON messages.&lt;/li&gt;
&lt;li&gt;Sends the messages to &lt;strong&gt;AWS SQS&lt;/strong&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;Advanced Features Under the Hood&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;High-Performance WAL Processing&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;The app buffers WAL entries in memory and sends them to SQS using the &lt;strong&gt;Batch API&lt;/strong&gt;. You can configure both the number of simultaneous batch requests and the flush interval, so it scales with your workload and network conditions.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Reliable Delivery with Automatic Retries&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Each batch response is carefully processed, and any failed entries are &lt;strong&gt;retried automatically&lt;/strong&gt; using a backoff algorithm. Retry parameters are fully configurable, giving you precise control over timing and limits.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Flexible Queue Options&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Supports &lt;strong&gt;FIFO&lt;/strong&gt; queues with configurable &lt;code&gt;GroupID&lt;/code&gt; per queue.&lt;/li&gt;
&lt;li&gt;Allows separate queues for inserts, updates, and deletes, keeping your event streams organized.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Multi-Replica Support &amp;amp; High Availability&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;The app can connect to multiple Postgres replicas and handles &lt;strong&gt;master disconnects gracefully&lt;/strong&gt;. If the primary goes down, the app keeps running, waiting for a replica promotion.&lt;/li&gt;
&lt;li&gt;For &lt;strong&gt;Postgres version ≥16&lt;/strong&gt;, it also manages &lt;strong&gt;replication slots on replicas&lt;/strong&gt;, ensuring that entries already processed before a failover aren't reprocessed - maintaining exactly-once semantics.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;
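&lt;p&gt;To make the retry behavior concrete, here is a rough sketch of exponential backoff with jitter, with parameter names mirroring the &lt;code&gt;PG2SQS_RETRYPOLICY_*&lt;/code&gt; settings. This illustrates the general technique only - it is not pg2sqs's actual internal code, and the real algorithm may differ in detail:&lt;/p&gt;

```python
import random

def backoff_delay(attempt, initial=15.0, multiplier=2.0, jitter=0.4, max_backoff=120.0):
    """Illustrative delay before retry number `attempt` (1-based).

    Exponential growth capped at max_backoff, with random jitter
    of +/- (jitter * delay) applied on top.
    """
    delay = min(initial * (multiplier ** (attempt - 1)), max_backoff)
    # Jitter spreads retries out randomly to avoid thundering-herd effects.
    spread = delay * jitter
    return delay + random.uniform(-spread, spread)

# Without jitter the schedule is deterministic: 15s, 30s, 60s, 120s, 120s, ...
for attempt in range(1, 6):
    print(f"retry {attempt}: ~{backoff_delay(attempt, jitter=0.0):.0f}s")
```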

&lt;h2&gt;Getting Started&lt;/h2&gt;

&lt;p&gt;Let's prepare an &lt;strong&gt;AWS SQS&lt;/strong&gt; queue. In this example, we'll use a &lt;strong&gt;users.fifo&lt;/strong&gt; FIFO queue created via the AWS Management Console.&lt;br&gt;
Alternatively, you can let &lt;strong&gt;pg2sqs&lt;/strong&gt; create the queue automatically on startup - just make sure the IAM user has the required permissions.&lt;/p&gt;

&lt;p&gt;For Postgres and pg2sqs, I created the following &lt;code&gt;docker-compose&lt;/code&gt; file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: "3.9"

services:
  postgres:
    image: postgres:17
    container_name: pg2sqs-postgres
    command: ["postgres", "-c", "wal_level=logical", "-c", "max_wal_senders=5", "-c", "max_replication_slots=5"]
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: pg2sqspassword
      POSTGRES_DB: postgres
    volumes:
      - ./sql/01_table_pub.sql:/docker-entrypoint-initdb.d/01_table_pub.sql
      - ./sql/02_slot.sql:/docker-entrypoint-initdb.d/02_slot.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      retries: 5
    networks:
      - pg2sqs-net

  pg2sqs:
    image: alikpgwalk/pg2sqs:latest
    container_name: pg2sqs
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      PG2SQS_POSTGRES_STANDBYTIMEOUT: 35s
      PG2SQS_POSTGRES_RECEIVETIMEOUT: 35s
      PG2SQS_POSTGRES_CONN_DATABASE: postgres
      PG2SQS_POSTGRES_CONN_HOST: pg2sqs-postgres
      PG2SQS_POSTGRES_CONN_PORT: 5432
      PG2SQS_POSTGRES_CONN_USER: postgres
      PG2SQS_POSTGRES_CONN_PASSWORD: pg2sqspassword
      PG2SQS_POSTGRES_REPL_PUB: users_pub
      PG2SQS_POSTGRES_REPL_SLOT: users_slot
      PG2SQS_FLUSHINTERVAL: 1s
      PG2SQS_FLUSHWORKERS: 5
      PG2SQS_MAXWRITEQUEUESIZE: 10000
      PG2SQS_WRITETIMEOUT: 10s
      PG2SQS_SHUTDOWNTIMEOUT: 20s
      PG2SQS_STATSINTERVAL: 30s
      PG2SQS_RETRYPOLICY_MAXRETRIES: 5
      PG2SQS_RETRYPOLICY_MAXCONNECTIONRETRIES: 0
      PG2SQS_RETRYPOLICY_INITIALBACKOFF: 15s
      PG2SQS_RETRYPOLICY_MULTIPLIER: 1
      PG2SQS_RETRYPOLICY_JITTER: 0.4
      PG2SQS_RETRYPOLICY_MAXBACKOFF: 120s
      PG2SQS_T_0_NAME: public.users
      PG2SQS_T_0_COLUMNS: all
      PG2SQS_T_0_Q_NAME: users.fifo
      PG2SQS_T_0_Q_GROUPID: '$${%table%}$${id}'
      PGWALK_LIC_PG2SQS: CzluOrIpH440IyK0MOPNOoOjOdSsqBxzp9qW9V6H8C6YDdDN4PWyd3zIBTMS7lqJtYa6Lo10jnDPJ7fHiUzuMa2ArLDaWuoqwvyhb14SosDNVDsMOxUA5hb9w4Hv3DGsAheFi9O3FhwkTepGMqeK2BwmxFGtgUwdhYVG9TVgGkTmTHmIW2btK5ft6Jmjy5Np
      AWS_REGION: &amp;lt;AWS REGION e.g. us-east-1&amp;gt;
      AWS_ENDPOINT_URL_SQS: &amp;lt;AWS SQS Endpoint url e.g. https://sqs.us-east-1.amazonaws.com&amp;gt;
      AWS_ACCESS_KEY_ID: &amp;lt;AWS ACCESS KEY ID&amp;gt; 
      AWS_SECRET_ACCESS_KEY: &amp;lt;AWS SECRET&amp;gt;
    networks:
      - pg2sqs-net

networks:
  pg2sqs-net:
    driver: bridge
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's break this down.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;postgres&lt;/strong&gt; service is created from the &lt;code&gt;postgres:17&lt;/code&gt; image. We set the password to &lt;code&gt;pg2sqspassword&lt;/code&gt; and override the default command to set &lt;code&gt;wal_level&lt;/code&gt;, &lt;code&gt;max_wal_senders&lt;/code&gt;, and &lt;code&gt;max_replication_slots&lt;/code&gt;. These settings are necessary to enable logical replication.&lt;/p&gt;
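&lt;p&gt;If you run Postgres outside Docker, the same settings can go into &lt;code&gt;postgresql.conf&lt;/code&gt; instead of the command override (a restart is required for &lt;code&gt;wal_level&lt;/code&gt; to take effect):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;wal_level = logical
max_wal_senders = 5
max_replication_slots = 5
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;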

&lt;blockquote&gt;
&lt;p&gt;&lt;em&gt;For production use, it's recommended to create a separate user with replication permissions. For simplicity, in this article we'll use the &lt;code&gt;postgres&lt;/code&gt; user. For more details, see the &lt;a href="https://github.com/pgwalk/pg2sqs/blob/main/docs/config.md#security" rel="noopener noreferrer"&gt;pg2sqs documentation&lt;/a&gt;.&lt;/em&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;We mount two SQL files into the Postgres container, placing them in the &lt;code&gt;docker-entrypoint-initdb.d&lt;/code&gt; directory. Postgres automatically runs any &lt;code&gt;.sql&lt;/code&gt; files found in this directory during initialization.&lt;/p&gt;

&lt;p&gt;The first file creates the &lt;code&gt;users&lt;/code&gt; table and the &lt;code&gt;users_pub&lt;/code&gt; publication:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE users (
 id SERIAL PRIMARY KEY,
 email TEXT NOT NULL UNIQUE,
 created_at TIMESTAMP DEFAULT now()
);
CREATE PUBLICATION users_pub FOR TABLE users;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The second file creates a logical replication slot:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT pg_create_logical_replication_slot('users_slot', 'pgoutput');
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we create the &lt;strong&gt;pg2sqs&lt;/strong&gt; service from the &lt;code&gt;alikpgwalk/pg2sqs:latest&lt;/code&gt; image and configure it using environment variables. The most important ones are:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;code&gt;PG2SQS_T_...&lt;/code&gt; - these variables configure how a table maps to an SQS queue. For the first table we use &lt;code&gt;T_0&lt;/code&gt;, for the second &lt;code&gt;T_1&lt;/code&gt;, and so on.

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;PG2SQS_T_0_NAME&lt;/code&gt; - the table name &lt;/li&gt;
&lt;li&gt;
&lt;code&gt;PG2SQS_T_0_COLUMNS&lt;/code&gt; - columns list or &lt;code&gt;all&lt;/code&gt; for all columns &lt;/li&gt;
&lt;li&gt;
&lt;code&gt;PG2SQS_T_0_Q_NAME&lt;/code&gt; - the SQS queue name &lt;/li&gt;
&lt;li&gt;
&lt;code&gt;PG2SQS_T_0_Q_GROUPID&lt;/code&gt; - a template expression for the GroupID. Because we're using a FIFO queue, a GroupID is required. When creating messages, the app evaluates the GroupID expression into something like &lt;code&gt;public.users&amp;lt;id&amp;gt;&lt;/code&gt;. This ensures that SQS delivers all messages for a specific id in order. Also note the double &lt;strong&gt;$$&lt;/strong&gt;: it escapes the &lt;strong&gt;$&lt;/strong&gt; symbol in the Compose file only. The actual value is &lt;code&gt;${%table%}${id}&lt;/code&gt;.
&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;PGWALK_LIC_PG2SQS&lt;/code&gt; - the license env var. Use the value provided in the compose file. This license key allows free usage of pg2sqs until &lt;strong&gt;January 31st, 2026&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;AWS_&lt;/code&gt; env vars - set these according to your AWS account. For example, for the &lt;code&gt;us-east-1&lt;/code&gt; region you can set &lt;code&gt;AWS_REGION: us-east-1&lt;/code&gt; and &lt;code&gt;AWS_ENDPOINT_URL_SQS: https://sqs.us-east-1.amazonaws.com&lt;/code&gt;. &lt;code&gt;AWS_ACCESS_KEY_ID&lt;/code&gt; and &lt;code&gt;AWS_SECRET_ACCESS_KEY&lt;/code&gt; need to be configured via IAM for your AWS account. &lt;strong&gt;Do not commit actual values into the repository.&lt;/strong&gt;
&lt;/li&gt;
&lt;/ol&gt;
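&lt;p&gt;To make the GroupID template concrete, here is a small sketch of how &lt;code&gt;${%table%}${id}&lt;/code&gt; could be expanded for a row. The substitution function is hypothetical and for illustration only; pg2sqs's actual evaluator may work differently:&lt;/p&gt;

```python
import re

def eval_groupid(template, table, row):
    """Expand ${%table%} to the table name and ${col} to that column's value."""
    def sub(match):
        key = match.group(1)
        if key == "%table%":
            return table
        return str(row[key])
    return re.sub(r"\$\{([^}]+)\}", sub, template)

# For the users row with id=1, the GroupID becomes "public.users1",
# so all events for that row land in the same FIFO message group.
print(eval_groupid("${%table%}${id}", "public.users", {"id": 1}))
```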

&lt;p&gt;We connect both containers to the &lt;code&gt;pg2sqs-net&lt;/code&gt; network in Docker, which we define at the end of the Compose file. This network allows us to reference &lt;code&gt;pg2sqs-postgres&lt;/code&gt; by its service name instead of an IP address. Without a shared network, we would have to rely on IPs, which are less convenient and not stable.&lt;/p&gt;

&lt;p&gt;For a complete reference see the &lt;a href="https://github.com/pgwalk/pg2sqs" rel="noopener noreferrer"&gt;pg2sqs documentation&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;&lt;strong&gt;pg2sqs&lt;/strong&gt; in Action&lt;/h2&gt;

&lt;p&gt;Now let's start the containers with the &lt;code&gt;docker compose up -d&lt;/code&gt; command,&lt;br&gt;
and insert a user into the &lt;code&gt;users&lt;/code&gt; table:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -i pg2sqs-postgres psql -U postgres -d postgres -c "INSERT INTO users (email) VALUES ('alice@example.com');"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we then check the messages in the &lt;code&gt;users.fifo&lt;/code&gt; queue, we'll see the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{"id":1,"email":"alice@example.com","created_at":"2025-10-02T20:22:54.859794Z","kind":"insert"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
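&lt;p&gt;On the consumer side, handling such a message is straightforward. A minimal Python sketch, parsing the message body shown above and dispatching on its &lt;code&gt;kind&lt;/code&gt; field (field names as in pg2sqs's JSON output above; the SQS receive loop itself is omitted):&lt;/p&gt;

```python
import json

# The SQS message body as delivered by pg2sqs (example from above).
body = '{"id":1,"email":"alice@example.com","created_at":"2025-10-02T20:22:54.859794Z","kind":"insert"}'

event = json.loads(body)

# Each row change is tagged as "insert", "update", or "delete".
if event["kind"] == "insert":
    print(f"new user {event['id']}: {event['email']}")
```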



&lt;p&gt;That's everything you need to get started. The process is straightforward, and &lt;code&gt;pg2sqs&lt;/code&gt; even prints statistics that look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
 "level": "info",
 "ts": "2025-10-02T16:30:01.153717123Z",
 "logger": "pg_replicator",
 "msg": "statistics",
 "pg2sqs_tx_last_committed": 0,
 "pg2sqs_tx_last_reported": 756,
 "pg2sqs_lsn_last_reported": "0/1584C31",
 "pg2sqs_tx_total": 1,
 "pg2sqs_tx_streamed_total": 0,
 "pg2sqs_tx_last_processed": 756,
 "pg2sqs_lsn_last_processed": "0/1584C30",
 "pg2sqs_downstream_write_queue_length": 0,
 "pg2sqs_downstream_retry_queue_length": 0,
 "pg2sqs_downstream_tx_last_sent": 756,
 "pg2sqs_inserts_total{table=\"public.users\"}": 1,
 "pg2sqs_updates_total{table=\"public.users\"}": 0,
 "pg2sqs_deletes_total{table=\"public.users\"}": 0,
 "pg2sqs_sqs_requests_total{queue=\"users.fifo\"}": 1,
 "pg2sqs_sqs_failures_total{queue=\"users.fifo\"}": 0,
 "pg2sqs_sqs_requests_inflight_total": 0,
 "pg2sqs_sqs_tx_last_sent": 756,
 "pg2sqs_sqs_tx_last_ok": 756
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
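&lt;p&gt;Since the statistics are emitted as structured JSON, they're easy to feed into monitoring. A small sketch that extracts a couple of health indicators from such a log line (field names taken from the example above; how you collect the log lines is up to your logging setup):&lt;/p&gt;

```python
import json

# An abbreviated statistics log line like the one pg2sqs prints above.
line = '{"level":"info","msg":"statistics","pg2sqs_sqs_requests_total{queue=\\"users.fifo\\"}":1,"pg2sqs_sqs_failures_total{queue=\\"users.fifo\\"}":0}'

stats = json.loads(line)

requests = stats['pg2sqs_sqs_requests_total{queue="users.fifo"}']
failures = stats['pg2sqs_sqs_failures_total{queue="users.fifo"}']

# Alert if any SQS batch requests failed.
failure_rate = failures / requests if requests else 0.0
print(f"SQS failure rate: {failure_rate:.1%}")
```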



&lt;p&gt;To learn more, check out the &lt;a href="https://github.com/pgwalk/pg2sqs" rel="noopener noreferrer"&gt;pg2sqs documentation&lt;/a&gt; or try integrating it with your own projects. This setup makes it easy to start replicating Postgres changes into SQS and experiment with different configurations.&lt;br&gt;
If you have any feedback or questions, feel free to email us at &lt;strong&gt;&lt;a href="mailto:pg2sqs@pgwalk.com"&gt;pg2sqs@pgwalk.com&lt;/a&gt;&lt;/strong&gt;.&lt;/p&gt;

</description>
      <category>postgres</category>
      <category>aws</category>
      <category>sqs</category>
      <category>cloud</category>
    </item>
  </channel>
</rss>
