Aarav Joshi
Building Distributed Task Queues in Go with Redis: A Production-Ready Guide


Distributed task queues form a critical component in modern software architectures, enabling efficient processing of background jobs and workload distribution. I've worked extensively with task queues in production environments, and Redis stands out as an excellent choice for implementing them in Go.

A task queue system consists of three main components: the queue itself, producers that add tasks, and consumers (workers) that process them. Redis provides a strong foundation with its atomic operations, sorted sets, and hashes.
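The snippets below pass status values around as raw strings ("pending", "failed"). Collecting them as constants is a small optional hardening; these names are illustrative conventions, not from any library:

```go
package main

import "fmt"

// Task lifecycle states used by the queue examples in this post.
// Keeping them in one place avoids typo-induced "unknown status" bugs.
const (
	statusPending = "pending"
	statusFailed  = "failed"
)

func main() {
	fmt.Println(statusPending, statusFailed)
}
```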

Let's start with a basic implementation and gradually enhance it with advanced features.

type Task struct {
    ID        string    `json:"id"`
    Priority  int       `json:"priority"`
    Payload   any       `json:"payload"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
    Retries   int       `json:"retries"`
}

type Queue struct {
    redis  *redis.Client
    name   string
    logger *log.Logger
}

The queue implementation needs to handle concurrent access efficiently. Here's how we can achieve this:

func (q *Queue) Enqueue(ctx context.Context, task *Task) error {
    task.CreatedAt = time.Now()
    task.Status = "pending"

    encoded, err := json.Marshal(task)
    if err != nil {
        return fmt.Errorf("failed to encode task: %w", err)
    }

    return q.redis.ZAdd(ctx, q.name, &redis.Z{
        Score:  float64(task.Priority),
        Member: encoded,
    }).Err()
}
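Enqueue assumes the caller has already assigned a unique task ID. A stdlib-only sketch of generating one (newTaskID is an illustrative helper; a UUID library works equally well):

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newTaskID returns a random 16-byte identifier encoded as hex,
// giving 32 characters with negligible collision probability.
func newTaskID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	id, err := newTaskID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id)) // 32 hex characters
}
```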

Worker processes need careful management. Here's an implementation with retry logic and error handling:

func (q *Queue) StartWorker(ctx context.Context, handler func(*Task) error) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            result, err := q.redis.ZPopMax(ctx, q.name).Result()
            if err != nil {
                time.Sleep(time.Second)
                continue
            }
            if len(result) == 0 {
                // Empty queue: back off briefly instead of spinning.
                time.Sleep(time.Second)
                continue
            }

            var task Task
            if err := json.Unmarshal([]byte(result[0].Member.(string)), &task); err != nil {
                q.logger.Printf("Failed to decode task: %v", err)
                continue
            }

            if err := handler(&task); err != nil {
                task.Retries++
                if task.Retries < 3 {
                    if reqErr := q.Enqueue(ctx, &task); reqErr != nil {
                        q.logger.Printf("Failed to re-enqueue task %s: %v", task.ID, reqErr)
                    }
                } else {
                    q.markAsFailed(ctx, &task, err)
                }
            }
        }
    }
}

Error handling and monitoring are crucial in distributed systems. Here's how we can implement them:

func (q *Queue) markAsFailed(ctx context.Context, task *Task, err error) {
    task.Status = "failed"
    encoded, marshalErr := json.Marshal(task)
    if marshalErr != nil {
        q.logger.Printf("Failed to encode failed task %s: %v", task.ID, marshalErr)
        return
    }
    q.redis.HSet(ctx, q.name+":failed", task.ID, encoded)
    q.redis.HSet(ctx, q.name+":errors", task.ID, err.Error())
}

type Stats struct {
    PendingTasks int64
    FailedTasks  int64
}

func (q *Queue) GetStats(ctx context.Context) (Stats, error) {
    pipe := q.redis.Pipeline()
    pending := pipe.ZCard(ctx, q.name)
    failed := pipe.HLen(ctx, q.name+":failed")

    _, err := pipe.Exec(ctx)
    if err != nil {
        return Stats{}, err
    }

    return Stats{
        PendingTasks: pending.Val(),
        FailedTasks:  failed.Val(),
    }, nil
}

Prioritization is already handled by the sorted-set score in Enqueue. For deferred work, we can also schedule tasks to run at a future time:

func (q *Queue) ScheduleTask(ctx context.Context, task *Task, executeAt time.Time) error {
    encoded, err := json.Marshal(task)
    if err != nil {
        return err
    }

    return q.redis.ZAdd(ctx, q.name+":scheduled", &redis.Z{
        Score:  float64(executeAt.Unix()),
        Member: encoded,
    }).Err()
}

func (q *Queue) processScheduledTasks(ctx context.Context) {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            now := float64(time.Now().Unix())
            tasks, err := q.redis.ZRangeByScore(ctx, q.name+":scheduled", &redis.ZRangeBy{
                Min: "-inf",
                Max: fmt.Sprintf("%f", now),
            }).Result()

            if err != nil {
                continue
            }

            for _, encoded := range tasks {
                var task Task
                if err := json.Unmarshal([]byte(encoded), &task); err != nil {
                    continue
                }
                if err := q.Enqueue(ctx, &task); err != nil {
                    continue
                }
                // Remove the task from the scheduled set so it is not
                // re-enqueued again on the next tick.
                q.redis.ZRem(ctx, q.name+":scheduled", encoded)
            }
        }
    }
}

For distributed environments, we need to handle worker coordination:

func (q *Queue) registerWorker(ctx context.Context, workerID string) error {
    return q.redis.HSet(ctx, q.name+":workers", workerID, time.Now().Unix()).Err()
}

func (q *Queue) heartbeat(ctx context.Context, workerID string) {
    ticker := time.NewTicker(time.Second * 30)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            q.redis.HDel(ctx, q.name+":workers", workerID)
            return
        case <-ticker.C:
            q.redis.HSet(ctx, q.name+":workers", workerID, time.Now().Unix())
        }
    }
}

Here's how to use the queue in a real application:

func main() {
    client := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    queue := NewQueue(client, "myapp:tasks")
    ctx := context.Background()

    // Move due scheduled tasks onto the main queue
    go queue.processScheduledTasks(ctx)

    // Start multiple workers
    for i := 0; i < 3; i++ {
        workerID := fmt.Sprintf("worker-%d", i)
        go func() {
            queue.registerWorker(ctx, workerID)
            go queue.heartbeat(ctx, workerID)

            queue.StartWorker(ctx, func(task *Task) error {
                // Process task
                return nil
            })
        }()
    }

    // Schedule tasks
    queue.ScheduleTask(ctx, &Task{
        ID:       "task-1",
        Priority: 1,
        Payload:  map[string]any{"action": "send_email"},
    }, time.Now().Add(time.Hour))

    select {}
}

This implementation provides a robust foundation for a distributed task queue system. I've used similar patterns in production systems processing millions of tasks daily. The key is to balance complexity with reliability, ensuring the system can handle failures gracefully while maintaining performance.

Remember to implement proper monitoring and alerting for production use. Redis metrics, worker health, and task processing rates should be tracked to ensure system health.

The system can be extended with features like task cancellation, progress tracking, and result storage. These additions should be made based on specific use case requirements while maintaining the core reliability and performance characteristics.

