Arnab Das
Building a Producer-Consumer Queue with Redis and Haskell Using Hedis

TL;DR: We'll build a production-grade producer-consumer queue in Haskell using Redis as the message broker via the Hedis client library. By the end, you'll have a working system that can handle high-throughput job dispatch and consumption — the same pattern I used to process 1M+ payment refunds at Juspay.


Why Redis for a Queue?

When people think "message queue," they reach for Kafka or RabbitMQ. But Redis is often the right call when you need:

  • Low latency — sub-millisecond enqueue/dequeue
  • Simplicity — no broker clusters to manage
  • Atomicity — LPUSH/BRPOP are atomic operations, safe under concurrency
  • Visibility — you can inspect the queue state instantly with LLEN

At Juspay, we routed payment refunds through a Redis-backed producer-consumer system. The queue absorbed burst traffic from merchant-triggered refund events and fed a pool of consumers that processed each refund, updated sub-statuses, and called downstream banking APIs — all without a single dropped message.

Let's build that.


What We're Building

┌──────────────┐        LPUSH         ┌─────────────────┐       BRPOP        ┌──────────────┐
│   Producer   │ ──────────────────▶  │   Redis Queue   │ ─────────────────▶ │   Consumer   │
│  (Job sender)│                      │  (List: jobs)   │                    │ (Job worker) │
└──────────────┘                      └─────────────────┘                    └──────────────┘
  • Producer pushes JSON-encoded jobs onto a Redis list using LPUSH
  • Consumer blocks on BRPOP — waking up the instant a job arrives
  • Multiple consumers can run in parallel, each pulling distinct jobs atomically

Prerequisites

  • GHC + Cabal (or Stack) installed
  • A running Redis instance (redis-server or Docker: docker run -p 6379:6379 redis)
  • Basic familiarity with Haskell (do notation, IO)

Project Setup

Create a new Cabal project:

mkdir redis-queue && cd redis-queue
cabal init --non-interactive

Add dependencies to your redis-queue.cabal file:

build-depends:
    base        >= 4.14,
    hedis       >= 0.15,
    aeson       >= 2.0,
    text        >= 1.2,
    bytestring  >= 0.11,
    async       >= 2.2

Install and confirm Hedis is available:

cabal build

Understanding Hedis Basics

Hedis wraps all Redis commands in the Redis monad, which you run against a Connection. Here's the mental model:

-- Open a connection pool
conn <- connect defaultConnectInfo

-- Run Redis commands inside runRedis
runRedis conn $ do
    set "hello" "world"
    get "hello"   -- returns Right (Just "world")

Every command returns Either Reply a — the Left branch is a Redis protocol error, Right is success. In practice you'll pattern-match or use either to handle errors.
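A minimal sketch of that handling with either, assuming a local Redis and a helper name of my choosing (fetchGreeting is not part of Hedis):

```haskell
{-# LANGUAGE OverloadedStrings #-}
module EitherDemo where

import Database.Redis

-- Run GET and collapse the Either in one place:
-- Left Reply is a protocol-level error, Right carries the value (or Nothing)
fetchGreeting :: Connection -> IO ()
fetchGreeting conn = do
    reply <- runRedis conn $ get "hello"
    putStrLn $ either
        (\err  -> "Redis error: " ++ show err)
        (\mval -> "Value: "       ++ show mval)
        reply
```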


Step 1 — Define the Job Type

Create src/Job.hs:

{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}

module Job where

import Data.Aeson      (FromJSON, ToJSON)
import Data.ByteString (ByteString)
import Data.Text       (Text)
import GHC.Generics    (Generic)

-- Our job payload — swap this for whatever your domain needs
data Job = Job
  { jobId   :: Text
  , jobType :: Text
  , payload :: Text
  } deriving (Show, Eq, Generic)

instance ToJSON   Job
instance FromJSON Job

-- The Redis key we'll use as our queue. Hedis keys are strict
-- ByteStrings, so we import Data.ByteString (not the lazy variant)
-- and let OverloadedStrings handle the literal.
queueKey :: ByteString
queueKey = "jobs:queue"

Keeping the job type generic means you can serialise anything that has a ToJSON instance — refund requests, email notifications, image processing tasks, whatever fits your system.
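Serialisation bugs tend to surface at the queue boundary, so it's worth checking that Job survives a JSON round trip. A standalone sketch (the type is inlined here so the snippet compiles on its own; roundTrips is a name of my choosing):

```haskell
{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson   (FromJSON, ToJSON, decode, encode)
import Data.Text    (Text)
import GHC.Generics (Generic)

-- Same shape as the Job type above, inlined for a standalone check
data Job = Job
  { jobId   :: Text
  , jobType :: Text
  , payload :: Text
  } deriving (Show, Eq, Generic)

instance ToJSON   Job
instance FromJSON Job

-- encode and decode should be inverses for any Job value
roundTrips :: Job -> Bool
roundTrips job = decode (encode job) == Just job
```

In GHCi, `roundTrips (Job "txn-001" "refund" "{}")` should evaluate to True.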


Step 2 — The Producer

Create src/Producer.hs:

module Producer where

import Database.Redis
import Data.Aeson          (encode)
import Data.ByteString.Lazy (toStrict)
import Control.Monad        (forM_)
import Job

-- Push a single job onto the left end of the list
enqueue :: Connection -> Job -> IO ()
enqueue conn job = do
    let encoded = toStrict (encode job)
    result <- runRedis conn $ lpush queueKey [encoded]
    case result of
        Left err    -> putStrLn $ "Enqueue error: " ++ show err
        Right count -> putStrLn $ "Job enqueued. Queue depth: " ++ show count

-- Simulate a burst of jobs — e.g. end-of-day refund batch
producerMain :: Connection -> IO ()
producerMain conn = do
    let jobs =
          [ Job "txn-001" "refund" "{\"amount\": 500,  \"currency\": \"INR\"}"
          , Job "txn-002" "refund" "{\"amount\": 1200, \"currency\": \"INR\"}"
          , Job "txn-003" "notify" "{\"email\": \"user@example.com\"}"
          , Job "txn-004" "refund" "{\"amount\": 300,  \"currency\": \"USD\"}"
          , Job "txn-005" "notify" "{\"email\": \"other@example.com\"}"
          ]
    putStrLn "Producer starting — pushing jobs..."
    forM_ jobs (enqueue conn)
    putStrLn "Producer done."

Key point: lpush is atomic. Even if 100 producers call it simultaneously, each job lands on the queue exactly once. Redis serialises concurrent writes internally — no locks needed on your side.
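To see this hold under contention, here's a hypothetical stress sketch (module and function names are mine) that fires off 100 concurrent enqueues using the producer above:

```haskell
{-# LANGUAGE OverloadedStrings #-}
module BurstDemo where

import Control.Concurrent.Async (forConcurrently_)
import Data.Text (pack)
import Database.Redis
import Job
import Producer (enqueue)

-- 100 threads each enqueue one job concurrently. Because LPUSH is
-- atomic, LLEN jobs:queue afterwards is exactly 100 (assuming no
-- consumer is running): no lost updates, no client-side locks.
burst :: Connection -> IO ()
burst conn =
    forConcurrently_ [1 :: Int .. 100] $ \i ->
        enqueue conn (Job (pack ("txn-" ++ show i)) "refund" "{}")
```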


Step 3 — The Consumer

Create src/Consumer.hs:

module Consumer where

import Database.Redis
import Data.Aeson          (decode)
import Data.ByteString.Lazy (fromStrict)
import Control.Monad        (forever)
import Job

-- Process a single job — replace this with your real business logic
processJob :: Job -> IO ()
processJob job = putStrLn $
    "[Worker] Processing " ++ show (jobType job) ++
    " | ID: "              ++ show (jobId job)   ++
    " | Payload: "         ++ show (payload job)

-- Block until a job is available, then process it
consumeOne :: Connection -> IO ()
consumeOne conn = do
    result <- runRedis conn $ brpop [queueKey] 30  -- 30s timeout
    case result of
        Left err           -> putStrLn $ "Redis error: " ++ show err
        Right Nothing      -> putStrLn   "Timeout — no jobs in 30s, polling again..."
        Right (Just (_, raw)) ->
            case decode (fromStrict raw) of
                Nothing  -> putStrLn $ "Failed to decode job: " ++ show raw
                Just job -> processJob job

-- Run forever, consuming jobs as they arrive
consumerMain :: Connection -> IO ()
consumerMain conn = do
    putStrLn "Consumer started — waiting for jobs..."
    forever (consumeOne conn)

brpop is the magic here. It blocks the connection until an item is available on any of the listed keys, then atomically pops and returns it. The 30 is a timeout in seconds — after which it returns Right Nothing so you can loop cleanly rather than hanging forever.

This is fundamentally different from polling (RPOP in a loop with threadDelay) — blocking means zero CPU burn while the queue is empty.
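For contrast, the polling version looks like this (a sketch of the anti-pattern, not something to deploy): it costs a round trip every half second even when the queue is empty.

```haskell
module PollingConsumer where

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Data.Aeson (decode)
import Data.ByteString.Lazy (fromStrict)
import Database.Redis
import Consumer (processJob)
import Job

-- Anti-pattern sketch: non-blocking RPOP plus sleep. Compare with
-- brpop, which parks the connection server-side until a job arrives.
pollLoop :: Connection -> IO ()
pollLoop conn = forever $ do
    result <- runRedis conn $ rpop queueKey
    case result of
        Right (Just raw) ->
            maybe (putStrLn "decode failed") processJob (decode (fromStrict raw))
        _ ->
            threadDelay 500000  -- 0.5s of dead time between polls
```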


Step 4 — Wire It Together

Create app/Main.hs:

module Main where

import Database.Redis
import Control.Concurrent.Async (concurrently_)
import Producer
import Consumer

main :: IO ()
main = do
    -- Connect to local Redis; swap defaultConnectInfo for your host/port/auth
    conn <- connect defaultConnectInfo

    -- Run producer and consumer concurrently
    -- In production you'd run these as separate processes/services
    concurrently_
        (producerMain conn)
        (consumerMain conn)

concurrently_ from the async package runs both actions in parallel on separate OS threads, waiting for both to finish. In a real deployment you'd run the producer and consumer as separate services — this just wires them together for a clean demo.


Running It

# Terminal 1 — start Redis
redis-server

# Terminal 2 — run the app
cabal run redis-queue

Expected output (producer and consumer run concurrently, so the exact interleaving may vary; note that show escapes the quotes inside each payload):

Producer starting — pushing jobs...
Job enqueued. Queue depth: 1
Job enqueued. Queue depth: 2
Job enqueued. Queue depth: 3
Job enqueued. Queue depth: 4
Job enqueued. Queue depth: 5
Producer done.
Consumer started — waiting for jobs...
[Worker] Processing "refund" | ID: "txn-001" | Payload: "{\"amount\": 500,  \"currency\": \"INR\"}"
[Worker] Processing "refund" | ID: "txn-002" | Payload: "{\"amount\": 1200, \"currency\": \"INR\"}"
[Worker] Processing "notify" | ID: "txn-003" | Payload: "{\"email\": \"user@example.com\"}"
[Worker] Processing "refund" | ID: "txn-004" | Payload: "{\"amount\": 300,  \"currency\": \"USD\"}"
[Worker] Processing "notify" | ID: "txn-005" | Payload: "{\"email\": \"other@example.com\"}"

Step 5 — Scaling to Multiple Consumers

Want parallel workers? Spawn multiple consumers against the same queue:

import Control.Concurrent.Async (concurrently_, replicateConcurrently_)

main :: IO ()
main = do
    conn <- connect defaultConnectInfo
    -- Run the producer alongside 4 parallel consumer workers
    concurrently_
        (producerMain conn)
        (replicateConcurrently_ 4 (consumerMain conn))

Because BRPOP is atomic, each job is delivered to exactly one consumer — no double-processing. Redis handles the fan-out natively.

You can verify this live:

# In a Redis CLI while the app runs:
redis-cli LLEN jobs:queue   # current queue depth
redis-cli MONITOR           # watch every command in real time

Step 6 — Dead Letter Handling (Production Hardening)

In production, jobs can fail. You don't want failed jobs silently disappearing. Add a dead-letter queue:

{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception (SomeException, catch)

deadLetterKey :: ByteString
deadLetterKey = "jobs:dead"

-- Consume with failure handling
consumeSafe :: Connection -> IO ()
consumeSafe conn = do
    result <- runRedis conn $ brpop [queueKey] 30
    case result of
        Right (Just (_, raw)) ->
            case decode (fromStrict raw) of
                Nothing  -> do
                    -- Malformed payload — send to dead letter queue
                    _ <- runRedis conn $ lpush deadLetterKey [raw]
                    putStrLn "Malformed job moved to dead letter queue"
                Just job ->
                    -- Wrap in exception handler for business logic failures
                    processJob job `catch` \(e :: SomeException) -> do
                        _ <- runRedis conn $ lpush deadLetterKey [raw]
                        putStrLn $ "Job failed, dead-lettered: " ++ show e
        _ -> pure ()

Now failed jobs accumulate in jobs:dead where you can inspect, replay, or alert on them — no silent data loss.
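To replay dead-lettered jobs, a small sketch (function name is mine) can drain jobs:dead back onto the main queue with RPOPLPUSH, which moves each item in a single atomic Redis operation:

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Replay where

import Data.ByteString (ByteString)
import Database.Redis
import Job (queueKey)

-- Same key as in the dead-letter section above
deadLetterKey :: ByteString
deadLetterKey = "jobs:dead"

-- Drain jobs:dead back onto the main queue, one atomic move at a time.
-- Because pop and push happen in one server-side operation, a crash
-- mid-replay never loses a job.
replayDeadLetters :: Connection -> IO ()
replayDeadLetters conn = do
    result <- runRedis conn $ rpoplpush deadLetterKey queueKey
    case result of
        Right (Just _) -> replayDeadLetters conn   -- keep draining
        _              -> putStrLn "Dead-letter queue drained"
```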


Connecting to a Real Redis Host

For production (Redis Cloud, AWS ElastiCache, etc.):

import Database.Redis

productionConnInfo :: ConnectInfo
productionConnInfo = defaultConnectInfo
    { connectHost     = "your-redis-host.example.com"
    , connectPort     = PortNumber 6379
    , connectAuth     = Just "your-auth-password"
    , connectDatabase = 0
    , connectMaxConnections = 50   -- connection pool size
    }

main :: IO ()
main = do
    conn <- connect productionConnInfo
    ...

For TLS (Redis Cloud, Upstash, etc.), use checkedConnect with connectTLSParams set.


What We Built vs. What Juspay Ran

The pattern here is the same core design behind Juspay's refund processing pipeline — with a few additions at scale:

| This tutorial | Production at Juspay |
| --- | --- |
| In-memory job type | Protobuf-encoded payloads |
| Single queue key | Separate queues per refund type/priority |
| brpop timeout loop | Supervised consumer pools with health checks |
| putStrLn processing | Downstream bank API calls + DB writes |
| Basic dead-letter | Dead-letter + retry with exponential backoff |

The Redis primitives (LPUSH, BRPOP, atomic pops) are identical. Scaling up is mostly operational — more consumer replicas, queue-per-priority, monitoring via LLEN metrics fed into dashboards.


Key Takeaways

  • LPUSH + BRPOP is Redis's native producer-consumer primitive — atomic, fast, and simple
  • Hedis gives you a type-safe, monadic interface to Redis in Haskell with connection pooling built in
  • Blocking pop (BRPOP) beats polling — zero CPU overhead while the queue is idle
  • Dead-letter queues are non-negotiable in production — never let failed jobs disappear silently
  • This pattern scales horizontally: add consumers, add producers, the queue fans out automatically

Full Source Code

The complete working project is on GitHub: https://github.com/arnabdas1999/redis-hedis-queue


What's Next?

  • Priority queues — use multiple Redis lists (jobs:high, jobs:low) and pass both keys to BRPOP; Redis pops from the first non-empty list
  • Delayed jobs — use a Redis Sorted Set with the scheduled timestamp as the score; a scheduler process moves ready jobs to the main queue
  • Exactly-once delivery — combine BRPOPLPUSH with a processing list and a visibility timeout
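The priority-queue idea in the first bullet is a one-line change to the consumer: BRPOP checks its keys left to right, so listing the high-priority queue first gives it strict precedence. A sketch (key names are illustrative):

```haskell
{-# LANGUAGE OverloadedStrings #-}
module PriorityConsumer where

import Database.Redis

-- BRPOP scans the key list in order on each wake-up, so "jobs:high"
-- is always drained before "jobs:low" gets a look-in.
popByPriority :: Connection -> IO ()
popByPriority conn = do
    result <- runRedis conn $ brpop ["jobs:high", "jobs:low"] 30
    case result of
        Right (Just (key, raw)) ->
            putStrLn $ "Popped from " ++ show key ++ ": " ++ show raw
        _ -> putStrLn "No jobs on either queue within 30s"
```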

Drop questions in the comments — happy to dig into any of these.


Arnab Das is an MS student at NYU Tandon and a software engineer who worked on payment infrastructure at Juspay, processing 200M+ daily transactions. Find him on LinkedIn and GitHub.
