Power Up Your Go Apps: Using PostgreSQL as a Job Queue with River

Hi there! I'm Shrijith Venkatramana, founder of Hexmos. Right now, I'm building LiveAPI, a first-of-its-kind tool that automatically indexes API endpoints across all your repositories. LiveAPI helps you discover, understand, and use APIs in large tech infrastructures with ease.

Job queues are a game-changer for handling background tasks in Go applications. If you're building a web app and need to process tasks like sending emails, generating reports, or syncing data without slowing down your API, a job queue is your friend. Enter River, a Go library that turns your PostgreSQL database into a robust job queue. No need for Redis or RabbitMQ—your existing Postgres can handle it all.

In this post, we'll walk through setting up River with PostgreSQL, creating workers, inserting jobs, and managing them effectively. We'll keep it hands-on with complete, runnable code examples and practical tips to get you started. Let's dive into using River to make your Go apps more scalable and responsive.

Why Use PostgreSQL as a Job Queue?

PostgreSQL is already a battle-tested database for many Go developers. Using it as a job queue with River has some clear advantages:

  • No extra infrastructure: Reuse your existing Postgres database, avoiding the need for additional services like Redis.
  • Transactional safety: River leverages Postgres transactions to ensure jobs are enqueued reliably.
  • Simplicity: River's Go-native design and use of generics make it intuitive for Go developers.
  • Scalability: Handle thousands of jobs with configurable workers and queues.

River is particularly great for apps that already rely on Postgres and want to keep their stack lean. It’s not a one-size-fits-all solution, but if you’re looking to avoid managing another system, it’s worth a look. Check out River’s documentation for more context.

Setting Up River in Your Go Project

To get started, you need a Go project and a PostgreSQL database. River works seamlessly with the pgx driver, but it also supports Go’s database/sql for flexibility. Here’s how to set it up.

Step 1: Install River Packages

Run these commands in your Go project directory (where go.mod exists):

go get github.com/riverqueue/river
go get github.com/riverqueue/river/riverdriver/riverpgxv5

This pulls in the core River package and the pgx/v5 driver. If you prefer database/sql, you can use riverdatabasesql instead, but we’ll stick with pgx for this guide.

Step 2: Install the River CLI

River requires a few database tables for job storage and leader election. The River CLI handles migrations for you:

go install github.com/riverqueue/river/cmd/river@latest

Step 3: Run Migrations

With your DATABASE_URL (e.g., postgres://user:password@localhost:5432/mydb), run:

river migrate-up --database-url "$DATABASE_URL"

This creates the necessary tables. You can verify by checking your database for tables like river_job. If you need to roll back, river migrate-down is available, but use it cautiously.
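If you have psql installed, a quick way to confirm the tables were created:

psql "$DATABASE_URL" -c '\dt river*'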

Defining Jobs and Workers

Every job in River needs two things: a JobArgs struct to hold the job’s data and a Worker to process it. The JobArgs struct defines the job’s arguments and a unique Kind identifier, while the Worker contains the logic to execute the job.

Here’s an example of a job that sorts a list of strings and prints them:

package main

import (
    "context"
    "fmt"
    "sort"

    "github.com/riverqueue/river"
)

// SortArgs defines the arguments for the sort job.
type SortArgs struct {
    Strings []string `json:"strings"`
}

// Kind returns the unique identifier for this job type.
func (SortArgs) Kind() string { return "sort" }

// SortWorker processes the sort job.
type SortWorker struct {
    river.WorkerDefaults[SortArgs]
}

// Work executes the job, sorting the strings and printing them.
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    sort.Strings(job.Args.Strings)
    fmt.Printf("Sorted strings: %v\n", job.Args.Strings)
    return nil
}

// Output when running with input ["zebra", "apple", "monkey"]:
// Sorted strings: [apple monkey zebra]

Key points:

  • The SortArgs struct uses JSON tags for serialization.
  • The Kind method uniquely identifies the job in the database.
  • The WorkerDefaults embed simplifies implementing the Worker interface.
  • The Work method contains the job’s logic and must return nil on success or an error on failure.

Registering Workers with River

Before River can process jobs, you need to register your workers. This tells River which workers handle which job kinds. Here’s how to register the SortWorker:

package main

import (
    "github.com/riverqueue/river"
)

func registerWorkers() *river.Workers {
    workers := river.NewWorkers()
    river.AddWorker(workers, &SortWorker{})
    return workers
}

Key points:

  • AddWorker panics on invalid configuration, so use it during startup.
  • If you prefer error handling, use AddWorkerSafely instead (see the sketch after this list).
  • Register all workers before starting the River client.
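If you'd rather get an error back than panic, the same registration with AddWorkerSafely looks roughly like this (the function name is illustrative):

package main

import (
    "fmt"

    "github.com/riverqueue/river"
)

// registerWorkersSafely registers workers and returns an error instead of
// panicking on invalid configuration.
func registerWorkersSafely() (*river.Workers, error) {
    workers := river.NewWorkers()
    if err := river.AddWorkerSafely(workers, &SortWorker{}); err != nil {
        return nil, fmt.Errorf("registering sort worker: %w", err)
    }
    return workers, nil
}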

Starting a River Client

The River Client manages job insertion and processing. It needs a database connection pool, a driver, and a configuration with queues and workers. Here’s a complete example that sets up a client to process jobs from a “default” queue with up to 100 workers; the pool is returned alongside the client so later examples can reuse it for transactions:

package main

import (
    "context"
    "log"
    "os"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// startRiverClient creates a pgx connection pool, builds a River client on top
// of it, and starts job processing. The pool is returned as well so callers
// can reuse it for their own queries and transactions.
func startRiverClient(ctx context.Context, workers *river.Workers) (*river.Client[pgx.Tx], *pgxpool.Pool, error) {
    dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
    if err != nil {
        return nil, nil, err
    }

    riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
        },
        Workers: workers,
    })
    if err != nil {
        dbPool.Close()
        return nil, nil, err
    }

    if err := riverClient.Start(ctx); err != nil {
        dbPool.Close()
        return nil, nil, err
    }

    return riverClient, dbPool, nil
}

func main() {
    ctx := context.Background()
    workers := registerWorkers()
    client, dbPool, err := startRiverClient(ctx, workers)
    if err != nil {
        log.Fatalf("Failed to start client: %v", err)
    }
    defer dbPool.Close()
    defer client.Stop(ctx)
}

Key points:

  • Use pgxpool.New to create a connection pool.
  • Configure queues with MaxWorkers to control concurrency.
  • Call Start to begin processing jobs.
  • Always defer Stop to ensure graceful shutdown.

For more on shutdown strategies, see River’s graceful shutdown guide.
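One common pattern is to block until the process receives an interrupt, then give in-flight jobs a grace period to finish. A minimal sketch (the helper name and the 10-second budget are illustrative, not a River API):

package main

import (
    "context"
    "log"
    "os/signal"
    "syscall"
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/riverqueue/river"
)

// waitAndStop blocks until SIGINT or SIGTERM, then asks River to stop and
// gives running jobs up to 10 seconds to complete.
func waitAndStop(client *river.Client[pgx.Tx]) {
    sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()
    <-sigCtx.Done() // wait for a signal

    stopCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    if err := client.Stop(stopCtx); err != nil {
        log.Printf("river did not stop cleanly: %v", err)
    }
}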

Inserting Jobs into the Queue

To add jobs to the queue, use the River Client’s Insert or InsertTx methods. InsertTx is preferred because the job is enqueued in the same transaction as your other writes, so the job and your data commit or roll back together. Here’s an example that begins a transaction on the pool returned by startRiverClient and inserts a sort job:

package main

import (
    "context"
    "log"
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
)

// insertSortJob enqueues a sort job inside a transaction on the shared pool.
func insertSortJob(ctx context.Context, dbPool *pgxpool.Pool, client *river.Client[pgx.Tx]) error {
    tx, err := dbPool.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    _, err = client.InsertTx(ctx, tx, SortArgs{
        Strings: []string{"zebra", "apple", "monkey"},
    }, nil)
    if err != nil {
        return err
    }

    return tx.Commit(ctx)
}

func main() {
    ctx := context.Background()
    workers := registerWorkers()
    client, dbPool, err := startRiverClient(ctx, workers)
    if err != nil {
        log.Fatalf("Failed to start client: %v", err)
    }
    defer dbPool.Close()
    defer client.Stop(ctx)

    if err := insertSortJob(ctx, dbPool, client); err != nil {
        log.Fatalf("Failed to insert job: %v", err)
    }

    // Wait briefly to allow the job to process.
    // In a real app, you'd use a more robust synchronization mechanism.
    select {
    case <-ctx.Done():
    case <-time.After(2 * time.Second):
    }

    // Output in console:
    // Sorted strings: [apple monkey zebra]
}

Key points:

  • Use InsertTx for transactional safety.
  • Pass nil as the last argument unless you need advanced options like priority or queue name.
  • The job is picked up by a worker and processed asynchronously.

Managing Queues and Concurrency

River lets you configure multiple named queues, each with its own worker count, and assign a priority and target queue to individual jobs. This is useful for separating high-priority tasks (e.g., user notifications) from low-priority ones (e.g., data cleanup). Here’s a summary of the relevant options:

Option      Where it's set           Description                                 Example Value
MaxWorkers  QueueConfig (per queue)  Max concurrent workers for the queue        100
Priority    InsertOpts (per job)     Job priority (1 = highest, 4 = lowest)      1
Queue       InsertOpts (per job)     Name of the queue the job is inserted into  "high_priority"

To configure multiple queues, update the client’s config:

package main

import (
    "github.com/riverqueue/river"
)

// configureMultiQueue defines two named queues with their own worker counts.
// Job priority is not set here; it is a per-job option on InsertOpts.
func configureMultiQueue(workers *river.Workers) *river.Config {
    return &river.Config{
        Queues: map[string]river.QueueConfig{
            "high_priority": {MaxWorkers: 50},
            "low_priority":  {MaxWorkers: 20},
        },
        Workers: workers,
    }
}

Key points:

  • Assign jobs to specific queues and priorities using the InsertOpts struct, as shown in the sketch after this list.
  • Higher MaxWorkers values increase throughput but consume more resources.
  • Use priorities to ensure critical tasks are processed first.
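To route a job to one of these queues, set Queue (and optionally Priority) in InsertOpts when inserting. A minimal sketch, reusing the SortArgs type from earlier (the function name is illustrative):

package main

import (
    "context"

    "github.com/jackc/pgx/v5"
    "github.com/riverqueue/river"
)

// insertHighPrioritySort routes a sort job to the "high_priority" queue.
// The queue name must match one configured on the client that runs workers.
func insertHighPrioritySort(ctx context.Context, client *river.Client[pgx.Tx]) error {
    _, err := client.Insert(ctx, SortArgs{
        Strings: []string{"zebra", "apple"},
    }, &river.InsertOpts{
        Queue:    "high_priority",
        Priority: 1, // 1 is highest, 4 is lowest
    })
    return err
}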

Handling Errors and Retries

River automatically retries failed jobs with exponential backoff. You can cap the number of attempts per job by setting MaxAttempts in the InsertOpts passed to Insert or InsertTx. Here’s an example of a worker that fails its first two attempts and succeeds on the third:

package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/jackc/pgx/v5"
    "github.com/riverqueue/river"
)

type RetryArgs struct{}

func (RetryArgs) Kind() string { return "retry" }

type RetryWorker struct {
    river.WorkerDefaults[RetryArgs]
}

// Work fails on the first two attempts; River retries it with backoff.
// job.Attempt is the current attempt number, tracked by River itself.
func (w *RetryWorker) Work(ctx context.Context, job *river.Job[RetryArgs]) error {
    if job.Attempt < 3 {
        return errors.New("simulated failure")
    }
    fmt.Printf("Job succeeded on attempt %d\n", job.Attempt)
    return nil
}

func insertRetryJob(ctx context.Context, client *river.Client[pgx.Tx]) error {
    _, err := client.Insert(ctx, RetryArgs{}, &river.InsertOpts{
        MaxAttempts: 5,
    })
    return err
}

// Output after retries:
// Job succeeded on attempt 3

Key points:

  • Set MaxAttempts in InsertOpts to control retry limits.
  • River’s default backoff strategy delays retries progressively.
  • Check the river_job table in Postgres to monitor job status.
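To peek at job states directly, you can query the river_job table. A couple of sketch queries (column names follow River's schema):

-- Count jobs by state (available, running, retryable, completed, discarded, ...)
SELECT state, count(*) FROM river_job GROUP BY state;

-- Inspect recent retryable jobs and their recorded errors
SELECT id, kind, attempt, errors FROM river_job WHERE state = 'retryable' ORDER BY id DESC LIMIT 10;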

Scaling and Best Practices

To make the most of River in production, consider these tips:

  • Separate Insert and Worker Processes: Use insert-only clients in your frontend (e.g., API servers) and dedicated worker processes for job execution. This isolates resource usage (see the sketch after this list).
  • Monitor Job States: Query the river_job table to track job status (e.g., completed, retryable, discarded).
  • Use Transactions: Always prefer InsertTx over Insert to ensure data consistency.
  • Tune Worker Counts: Adjust MaxWorkers based on your server’s capacity and workload.
  • Handle Shutdown Gracefully: Use client.Stop to ensure active jobs complete before shutting down.

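An insert-only client is simply a client with no Queues or Workers configured and Start never called: it can enqueue jobs but never fetches or runs them. A minimal sketch (the function name is illustrative):

package main

import (
    "context"
    "os"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// newInsertOnlyClient builds a client suitable for API servers that only
// enqueue jobs: no Queues or Workers are configured and Start is never called,
// so this process does not fetch or execute anything.
func newInsertOnlyClient(ctx context.Context) (*river.Client[pgx.Tx], error) {
    dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
    if err != nil {
        return nil, err
    }
    return river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
}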
For advanced use cases, explore River’s batch insertion for enqueuing multiple jobs efficiently.
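As a sketch of what that can look like with InsertMany (assuming the SortArgs type and the queues configured earlier):

package main

import (
    "context"

    "github.com/jackc/pgx/v5"
    "github.com/riverqueue/river"
)

// insertSortBatch enqueues several jobs in a single call.
func insertSortBatch(ctx context.Context, client *river.Client[pgx.Tx]) error {
    _, err := client.InsertMany(ctx, []river.InsertManyParams{
        {Args: SortArgs{Strings: []string{"banana", "apple"}}},
        {Args: SortArgs{Strings: []string{"delta", "charlie"}}},
        // Per-job options still apply; this one targets the low_priority queue.
        {Args: SortArgs{Strings: []string{"zebra", "yak"}}, InsertOpts: &river.InsertOpts{Queue: "low_priority"}},
    })
    return err
}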

Next Steps for Your Job Queue Journey

River makes it easy to integrate a job queue into your Go application using PostgreSQL. By leveraging your existing database, you can keep your stack simple while handling background tasks reliably. Start by experimenting with the examples above, then explore advanced features like custom queues, job priorities, and batch inserts.

To go deeper, check out the River documentation for topics like periodic jobs, job timeouts, and observability. If you’re curious about real-world use cases, try implementing River in a small project—like a notification system or a data import pipeline—and see how it fits your needs. With River, you’ve got a powerful tool to make your Go apps more scalable and responsive without the overhead of managing another system.
