Hi there! I'm Shrijith Venkatrama, founder of Hexmos. Right now, I’m building LiveAPI, a first-of-its-kind tool for helping you automatically index API endpoints across all your repositories. LiveAPI helps you discover, understand, and use APIs in large tech infrastructures with ease.
Job queues are a game-changer for handling background tasks in Go applications. If you're building a web app and need to process tasks like sending emails, generating reports, or syncing data without slowing down your API, a job queue is your friend. Enter River, a Go library that turns your PostgreSQL database into a robust job queue. No need for Redis or RabbitMQ: your existing Postgres can handle it all.
In this post, we'll walk through setting up River with PostgreSQL, creating workers, inserting jobs, and managing them effectively. We'll keep it hands-on with complete, runnable code examples and practical tips to get you started. Let's dive into using River to make your Go apps more scalable and responsive.
Why Use PostgreSQL as a Job Queue?
PostgreSQL is already a battle-tested database for many Go developers. Using it as a job queue with River has some clear advantages:
- No extra infrastructure: Reuse your existing Postgres database, avoiding the need for additional services like Redis.
- Transactional safety: River leverages Postgres transactions to ensure jobs are enqueued reliably.
- Simplicity: River's Go-native design and use of generics make it intuitive for Go developers.
- Scalability: Handle thousands of jobs with configurable workers and queues.
River is particularly great for apps that already rely on Postgres and want to keep their stack lean. It’s not a one-size-fits-all solution, but if you’re looking to avoid managing another system, it’s worth a look. Check out River’s documentation for more context.
Setting Up River in Your Go Project
To get started, you need a Go project and a PostgreSQL database. River works seamlessly with the pgx driver, but it also supports Go’s database/sql for flexibility. Here’s how to set it up.
Step 1: Install River Packages
Run these commands in your Go project directory (where go.mod exists):
go get github.com/riverqueue/river
go get github.com/riverqueue/river/riverdriver/riverpgxv5
This pulls in the core River package and the pgx/v5 driver. If you prefer database/sql, you can use riverdatabasesql instead, but we’ll stick with pgx for this guide.
Step 2: Install the River CLI
River requires a few database tables for job storage and leader election. The River CLI handles migrations for you:
go install github.com/riverqueue/river/cmd/river@latest
Step 3: Run Migrations
With your DATABASE_URL (e.g., postgres://user:password@localhost:5432/mydb), run:
river migrate-up --database-url "$DATABASE_URL"
This creates the necessary tables. You can verify by checking your database for tables like river_job. If you need to roll back, river migrate-down is available, but use it cautiously.
Defining Jobs and Workers
Every job in River needs two things: a JobArgs struct to hold the job’s data and a Worker to process it. The JobArgs struct defines the job’s arguments plus a Kind method that returns a unique identifier for the job type, while the Worker contains the logic to execute the job.
Here’s an example of a job that sorts a list of strings and prints them:
package main

import (
	"context"
	"fmt"
	"sort"

	"github.com/riverqueue/river"
)

// SortArgs defines the arguments for the sort job.
type SortArgs struct {
	Strings []string `json:"strings"`
}

// Kind returns the unique identifier for this job type.
func (SortArgs) Kind() string { return "sort" }

// SortWorker processes the sort job.
type SortWorker struct {
	river.WorkerDefaults[SortArgs]
}

// Work executes the job, sorting the strings and printing them.
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
	sort.Strings(job.Args.Strings)
	fmt.Printf("Sorted strings: %v\n", job.Args.Strings)
	return nil
}

// Output when running with input ["zebra", "apple", "monkey"]:
// Sorted strings: [apple monkey zebra]
Key points:
- The SortArgs struct uses JSON tags for serialization.
- The Kind method returns a string that uniquely identifies the job type; River stores it with each job in the database.
- Embedding WorkerDefaults simplifies implementing the Worker interface.
- The Work method contains the job’s logic and must return nil on success or an error on failure.
Registering Workers with River
Before River can process jobs, you need to register your workers. This tells River which workers handle which job kinds. Here’s how to register the SortWorker:
package main

import (
	"github.com/riverqueue/river"
)

func registerWorkers() *river.Workers {
	workers := river.NewWorkers()
	river.AddWorker(workers, &SortWorker{})
	return workers
}
Key points:
- AddWorker panics on invalid configuration (for example, registering two workers for the same kind), so call it during startup.
- If you prefer error handling, use AddWorkerSafely instead.
- Register all workers before passing the bundle to river.NewClient.
Starting a River Client
The River Client manages job insertion and processing. It needs a database connection pool, a driver, and a configuration with queues and workers. Here’s a complete example that sets up a client to process jobs from a “default” queue with up to 100 workers:
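package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// startRiverClient connects to Postgres and starts a River client that works
// jobs from the default queue with up to 100 concurrent workers.
func startRiverClient(ctx context.Context, workers *river.Workers) (*river.Client[pgx.Tx], error) {
	dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		return nil, err
	}
	// The pgx driver's transaction type is pgx.Tx, so the client is a *river.Client[pgx.Tx].
	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		dbPool.Close()
		return nil, err
	}
	if err := riverClient.Start(ctx); err != nil {
		dbPool.Close()
		return nil, err
	}
	return riverClient, nil
}

func main() {
	ctx := context.Background()
	workers := registerWorkers()
	client, err := startRiverClient(ctx, workers)
	if err != nil {
		log.Fatalf("Failed to start client: %v", err)
	}
	defer client.Stop(ctx)
	// Keep the process alive until Ctrl+C; otherwise main would return
	// immediately and no jobs would be worked.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)
	<-sigCh
}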
Key points:
- Use pgxpool.New to create a connection pool.
- Configure queues with MaxWorkers to control concurrency.
- Call Start to begin processing jobs.
- Always defer Stop to ensure graceful shutdown.
For more on shutdown strategies, see River’s graceful shutdown guide.
Inserting Jobs into the Queue
To add jobs to the queue, use the River Client’s Insert or InsertTx methods. InsertTx enqueues the job inside a transaction you control, so the job is committed atomically with any other writes in that transaction: if the transaction rolls back, the job is never enqueued. Here’s an example of inserting a sort job:
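package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// insertSortJob enqueues a sort job inside a transaction. The pool is passed
// in explicitly so the transaction and the River driver share one database.
func insertSortJob(ctx context.Context, dbPool *pgxpool.Pool, client *river.Client[pgx.Tx]) error {
	tx, err := dbPool.Begin(ctx)
	if err != nil {
		return err
	}
	// Rolling back after a successful Commit is harmless; the error is ignored.
	defer tx.Rollback(ctx)
	_, err = client.InsertTx(ctx, tx, SortArgs{
		Strings: []string{"zebra", "apple", "monkey"},
	}, nil)
	if err != nil {
		return err
	}
	// The job becomes visible to workers only once this commit succeeds.
	return tx.Commit(ctx)
}

func main() {
	ctx := context.Background()
	dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer dbPool.Close()
	client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: registerWorkers(),
	})
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	if err := client.Start(ctx); err != nil {
		log.Fatalf("Failed to start client: %v", err)
	}
	defer client.Stop(ctx)
	if err := insertSortJob(ctx, dbPool, client); err != nil {
		log.Fatalf("Failed to insert job: %v", err)
	}
	// Wait briefly to allow the job to process.
	// In a real app, you'd use a more robust synchronization mechanism.
	time.Sleep(2 * time.Second)
	// Output in console:
	// Sorted strings: [apple monkey zebra]
}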
Key points:
- Use InsertTx for transactional safety.
- Pass nil as the last argument unless you need advanced options like priority or queue name.
- Once the transaction commits, a worker picks up the job and processes it asynchronously.
Managing Queues and Concurrency
River lets you configure multiple queues, each with its own worker count, and assign a priority to individual jobs. This is useful for separating high-priority tasks (e.g., user notifications) from low-priority ones (e.g., data cleanup). Here’s a table summarizing the relevant options:
Option | Set in | Description | Example Value |
---|---|---|---|
MaxWorkers | river.QueueConfig | Max concurrent workers for the queue | 100 |
Priority | river.InsertOpts | Job priority (1 = highest, 4 = lowest) | 1 |
Queue | river.InsertOpts | Name of the queue the job is inserted into | "high_priority" |
To configure multiple queues, update the client’s config:
package main

import (
	"github.com/riverqueue/river"
)

// configureMultiQueue sets up two queues with separate worker pools.
// Queues only control concurrency; priority is set per job via InsertOpts.
func configureMultiQueue(workers *river.Workers) *river.Config {
	return &river.Config{
		Queues: map[string]river.QueueConfig{
			"high_priority": {MaxWorkers: 50},
			"low_priority":  {MaxWorkers: 20},
		},
		Workers: workers,
	}
}
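Key points:
- Assign jobs to specific queues with the Queue field of InsertOpts, for example &river.InsertOpts{Queue: "high_priority", Priority: 1}.
- Higher MaxWorkers values increase throughput but consume more resources.
- Use job priorities so that critical jobs are fetched before less urgent ones in the same queue.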
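River’s documentation describes Priority as a fetch-order rule: available jobs with a higher priority (lower number) are fetched before lower-priority ones. The stdlib-only sketch below models that rule in memory with a hypothetical pendingJob type; the tie-break by ID is my assumption, and the real selection happens in SQL inside Postgres with more details than this.

```go
package main

import (
	"fmt"
	"sort"
)

// pendingJob is a hypothetical in-memory stand-in for a row in river_job.
type pendingJob struct {
	ID       int
	Priority int // 1 = highest, 4 = lowest
	Kind     string
}

// fetchOrder returns jobs in the order a worker would pick them up under
// the stated assumption: lowest priority number first, then lowest ID.
func fetchOrder(jobs []pendingJob) []pendingJob {
	out := append([]pendingJob(nil), jobs...) // leave the input untouched
	sort.Slice(out, func(i, j int) bool {
		if out[i].Priority != out[j].Priority {
			return out[i].Priority < out[j].Priority
		}
		return out[i].ID < out[j].ID
	})
	return out
}

func main() {
	jobs := []pendingJob{
		{ID: 1, Priority: 4, Kind: "cleanup"},
		{ID: 2, Priority: 1, Kind: "notify"},
		{ID: 3, Priority: 2, Kind: "report"},
		{ID: 4, Priority: 1, Kind: "notify"},
	}
	// Prints jobs 2, 4, 3, 1 in that order.
	for _, j := range fetchOrder(jobs) {
		fmt.Println(j.ID, j.Kind)
	}
}
```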
Handling Errors and Retries
River automatically retries failed jobs with an exponential backoff. You can customize retry behavior in the Insert or InsertTx call using InsertOpts. Here’s an example of a job that fails on its first two attempts and succeeds on the third:
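package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// RetryArgs carries no data. The worker decides what to do based on
// job.Attempt, which River increments each time the job is worked.
type RetryArgs struct{}

func (RetryArgs) Kind() string { return "retry" }

type RetryWorker struct {
	river.WorkerDefaults[RetryArgs]
}

func (w *RetryWorker) Work(ctx context.Context, job *river.Job[RetryArgs]) error {
	// job.Args is fixed at insert time, so read the attempt counter from the
	// job row instead. Returning an error marks this attempt as failed and
	// schedules a retry until MaxAttempts is reached.
	if job.Attempt < 3 {
		return errors.New("simulated failure")
	}
	fmt.Printf("Job succeeded on attempt %d\n", job.Attempt)
	return nil
}

// insertRetryJob enqueues the job with a custom retry limit.
// RetryWorker must be registered with river.AddWorker before the client starts.
func insertRetryJob(ctx context.Context, client *river.Client[pgx.Tx]) error {
	_, err := client.Insert(ctx, RetryArgs{}, &river.InsertOpts{
		MaxAttempts: 5,
	})
	return err
}

// Output after two failed attempts and their backoff delays:
// Job succeeded on attempt 3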
Key points:
- Set MaxAttempts in InsertOpts to control retry limits; if you leave it unset, River falls back to its default of 25 attempts.
- River’s default backoff strategy makes each retry wait progressively longer.
- Check the river_job table in Postgres to monitor job status.
Scaling and Best Practices
To make the most of River in production, consider these tips:
- Separate Insert and Worker Processes: Use insert-only clients (configured without Queues or Workers) in your frontend (e.g., API servers) and dedicated worker processes for job execution. This isolates resource usage.
- Monitor Job States: Query the river_job table to track job status (e.g., completed, retryable, discarded).
- Use Transactions: Prefer InsertTx over Insert whenever a job should be enqueued atomically with other database writes.
- Tune Worker Counts: Adjust MaxWorkers based on your server’s capacity and workload.
- Handle Shutdown Gracefully: Use client.Stop to let active jobs complete before shutting down.
For advanced use cases, explore River’s batch insertion for enqueuing multiple jobs efficiently.
Next Steps for Your Job Queue Journey
River makes it easy to integrate a job queue into your Go application using PostgreSQL. By leveraging your existing database, you can keep your stack simple while handling background tasks reliably. Start by experimenting with the examples above, then explore advanced features like custom queues, job priorities, and batch inserts.
To go deeper, check out the River documentation for topics like periodic jobs, job timeouts, and observability. If you’re curious about real-world use cases, try implementing River in a small project—like a notification system or a data import pipeline—and see how it fits your needs. With River, you’ve got a powerful tool to make your Go apps more scalable and responsive without the overhead of managing another system.