Distributed task queues form a critical component in modern software architectures, enabling efficient processing of background jobs and workload distribution. I've worked extensively with task queues in production environments, and Redis stands out as an excellent choice for implementing them in Go.
A task queue system consists of three main components: the queue itself, producers that add tasks, and consumers (workers) that process them. Redis provides a solid foundation with its atomic operations, sorted sets, and pub/sub capabilities; the implementation below relies on sorted sets to keep tasks ordered by priority.
Let's start with a basic implementation and gradually enhance it with advanced features.
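// Assumes go-redis v8 (its ZAdd takes *redis.Z, as used in the snippets below).
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
)

// Task is the unit of work stored in Redis as JSON.
type Task struct {
	ID        string    `json:"id"`
	Priority  int       `json:"priority"`
	Payload   any       `json:"payload"`
	Status    string    `json:"status"`
	CreatedAt time.Time `json:"created_at"`
	Retries   int       `json:"retries"`
}

// Queue wraps a Redis client; name is the key of the sorted set holding pending tasks.
type Queue struct {
	redis  *redis.Client
	name   string
	logger *log.Logger
}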
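To see what actually ends up in Redis, it can help to round-trip a Task through encoding/json. The sketch below is illustrative only: it copies the Task struct from above, uses made-up field values, and the `roundTrip` helper is a hypothetical name. One thing it shows is that a Payload of type any comes back as a map[string]any with numbers decoded as float64, so handlers should not assume the original concrete type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Task mirrors the struct defined above.
type Task struct {
	ID        string    `json:"id"`
	Priority  int       `json:"priority"`
	Payload   any       `json:"payload"`
	Status    string    `json:"status"`
	CreatedAt time.Time `json:"created_at"`
	Retries   int       `json:"retries"`
}

// roundTrip (hypothetical helper) encodes a task to JSON and decodes it again,
// the same path a task takes between Enqueue and the worker.
func roundTrip(t Task) (Task, error) {
	encoded, err := json.Marshal(t)
	if err != nil {
		return Task{}, err
	}
	var decoded Task
	err = json.Unmarshal(encoded, &decoded)
	return decoded, err
}

func main() {
	// Example values chosen for illustration.
	in := Task{ID: "task-1", Priority: 5, Payload: map[string]any{"attempts": 2}}
	out, err := roundTrip(in)
	if err != nil {
		panic(err)
	}
	payload := out.Payload.(map[string]any)
	fmt.Printf("%T\n", payload["attempts"]) // prints float64: the integer 2 did not stay an int
}
```

If handlers need typed payloads, one option is to decode the payload a second time into a concrete struct inside the handler.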
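The queue implementation needs to handle concurrent access efficiently. Producers may enqueue from many goroutines and processes at once, and because ZADD is a single atomic Redis command, Enqueue needs no additional locking: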
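// Enqueue adds the task to the sorted set, using its priority as the score.
func (q *Queue) Enqueue(ctx context.Context, task *Task) error {
	// Keep the original creation time when a task is re-enqueued for a retry.
	if task.CreatedAt.IsZero() {
		task.CreatedAt = time.Now()
	}
	task.Status = "pending"

	encoded, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("failed to encode task: %w", err)
	}

	return q.redis.ZAdd(ctx, q.name, &redis.Z{
		Score:  float64(task.Priority),
		Member: encoded,
	}).Err()
}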
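Because Enqueue uses the priority as the sorted-set score, ZPOPMAX hands out the highest-priority task first. Redis orders members with equal scores lexicographically, so tasks that share a priority are not served in insertion order. The following in-memory model imitates that ordering; it does not talk to Redis, and the `entry` type and `popMaxOrder` function are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical stand-in for one sorted-set element.
type entry struct {
	Score  float64
	Member string
}

// popMaxOrder returns the entries in the order repeated ZPOPMAX calls would
// yield them: highest score first, ties broken by the greater member.
func popMaxOrder(entries []entry) []entry {
	out := append([]entry(nil), entries...) // copy so the input is left untouched
	sort.Slice(out, func(i, j int) bool {
		if out[i].Score != out[j].Score {
			return out[i].Score > out[j].Score
		}
		return out[i].Member > out[j].Member
	})
	return out
}

func main() {
	order := popMaxOrder([]entry{
		{1, `{"id":"a"}`},
		{5, `{"id":"b"}`},
		{5, `{"id":"c"}`},
	})
	for _, e := range order {
		fmt.Println(e.Score, e.Member)
	}
}
```

Here both priority-5 tasks come out before the priority-1 task, and "c" precedes "b" even if "b" was added first. If strict first-in, first-out order within a priority matters, the score would need to encode arrival time as well.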
Worker processes need careful management. Here's an implementation with retry logic and error handling:
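// StartWorker pops tasks in priority order and runs handler until ctx is cancelled.
func (q *Queue) StartWorker(ctx context.Context, handler func(*Task) error) {
	for {
		// Stop promptly once the context is cancelled.
		if ctx.Err() != nil {
			return
		}

		result, err := q.redis.ZPopMax(ctx, q.name).Result()
		if err != nil || len(result) == 0 {
			// Redis error or empty queue: wait a second, but stay responsive to
			// cancellation. Indexing result[0] on an empty queue would panic.
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second):
			}
			continue
		}

		member, ok := result[0].Member.(string)
		if !ok {
			q.logger.Printf("Unexpected member type %T", result[0].Member)
			continue
		}

		var task Task
		if err := json.Unmarshal([]byte(member), &task); err != nil {
			q.logger.Printf("Failed to decode task: %v", err)
			continue
		}

		if err := handler(&task); err != nil {
			task.Retries++
			if task.Retries < 3 {
				// Retry: put the task back on the queue.
				if enqErr := q.Enqueue(ctx, &task); enqErr != nil {
					q.logger.Printf("Failed to re-enqueue task %s: %v", task.ID, enqErr)
				}
			} else {
				// Out of retries: record the failure for later inspection.
				q.markAsFailed(ctx, &task, err)
			}
		}
	}
}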
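The worker above puts a failed task straight back on the queue, so it may be retried within milliseconds. One common refinement is to wait longer after each failure, for example by handing the computed time to the ScheduleTask method shown later instead of calling Enqueue. The helper below is a sketch of such an exponential backoff; `retryDelay`, the one-second base, and the one-minute cap are assumptions for illustration and not part of the original implementation.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay (hypothetical helper) returns how long to wait before the given
// retry attempt, counted from 1. The delay doubles with each attempt and
// never exceeds max.
func retryDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max // stop early so large attempt counts cannot overflow
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Illustrative settings: start at one second, cap at one minute.
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Println(attempt, retryDelay(attempt, time.Second, time.Minute))
	}
}
```

With these settings the first three retries wait 1s, 2s, and 4s.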
Error handling and monitoring are crucial in distributed systems. Here's how we can implement them:
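// markAsFailed records the task and its last error in side hashes for inspection.
func (q *Queue) markAsFailed(ctx context.Context, task *Task, err error) {
	task.Status = "failed"
	encoded, encErr := json.Marshal(task)
	if encErr != nil {
		q.logger.Printf("Failed to encode failed task %s: %v", task.ID, encErr)
		return
	}

	// Write both hashes in one round trip.
	pipe := q.redis.Pipeline()
	pipe.HSet(ctx, q.name+":failed", task.ID, encoded)
	pipe.HSet(ctx, q.name+":errors", task.ID, err.Error())
	if _, execErr := pipe.Exec(ctx); execErr != nil {
		q.logger.Printf("Failed to record failed task %s: %v", task.ID, execErr)
	}
}

// Stats is a snapshot of the queue depth and the number of failed tasks.
type Stats struct {
	PendingTasks int64
	FailedTasks  int64
}

// GetStats reads both counters in a single pipelined round trip.
func (q *Queue) GetStats(ctx context.Context) (Stats, error) {
	pipe := q.redis.Pipeline()
	pending := pipe.ZCard(ctx, q.name)
	failed := pipe.HLen(ctx, q.name+":failed")
	if _, err := pipe.Exec(ctx); err != nil {
		return Stats{}, err
	}

	return Stats{
		PendingTasks: pending.Val(),
		FailedTasks:  failed.Val(),
	}, nil
}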
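Beyond priorities, we can also schedule tasks to run at a later time. Scheduled tasks wait in a second sorted set, scored by their execution time, and are moved to the main queue once they are due: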
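// ScheduleTask stores the task in a separate sorted set, scored by its execution time.
func (q *Queue) ScheduleTask(ctx context.Context, task *Task, executeAt time.Time) error {
	encoded, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("failed to encode task: %w", err)
	}

	return q.redis.ZAdd(ctx, q.name+":scheduled", &redis.Z{
		Score:  float64(executeAt.Unix()),
		Member: encoded,
	}).Err()
}

// processScheduledTasks moves due tasks from the scheduled set to the main queue.
func (q *Queue) processScheduledTasks(ctx context.Context) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			tasks, err := q.redis.ZRangeByScore(ctx, q.name+":scheduled", &redis.ZRangeBy{
				Min: "-inf",
				Max: fmt.Sprintf("%d", time.Now().Unix()),
			}).Result()
			if err != nil {
				q.logger.Printf("Failed to fetch scheduled tasks: %v", err)
				continue
			}

			for _, encoded := range tasks {
				// Remove the entry first. Only the caller that actually removed it
				// enqueues the task, so it is not enqueued again on the next tick
				// or by another scheduler instance.
				removed, err := q.redis.ZRem(ctx, q.name+":scheduled", encoded).Result()
				if err != nil || removed == 0 {
					continue
				}

				var task Task
				if err := json.Unmarshal([]byte(encoded), &task); err != nil {
					q.logger.Printf("Failed to decode scheduled task: %v", err)
					continue
				}
				if err := q.Enqueue(ctx, &task); err != nil {
					q.logger.Printf("Failed to enqueue scheduled task %s: %v", task.ID, err)
				}
			}
		}
	}
}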
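The scheduled set stores each task with its execution time (Unix seconds) as the score, so the question "what is due?" becomes a range query from -inf to the current time. As a rough in-memory illustration of that selection, the sketch below filters a plain map; the map and the `dueMembers` function are hypothetical stand-ins rather than Redis calls.

```go
package main

import (
	"fmt"
	"sort"
)

// dueMembers (hypothetical helper) returns the members whose score, an
// execution time in Unix seconds, is at or before now. Results are ordered
// earliest first, similar to ZRANGEBYSCORE key -inf now.
func dueMembers(scheduled map[string]int64, now int64) []string {
	var due []string
	for member, at := range scheduled {
		if at <= now {
			due = append(due, member)
		}
	}
	sort.Slice(due, func(i, j int) bool {
		if scheduled[due[i]] != scheduled[due[j]] {
			return scheduled[due[i]] < scheduled[due[j]]
		}
		return due[i] < due[j]
	})
	return due
}

func main() {
	// Illustrative timestamps, not real clock values.
	scheduled := map[string]int64{"task-1": 100, "task-2": 200, "task-3": 300}
	fmt.Println(dueMembers(scheduled, 200)) // task-1 and task-2 are due; task-3 is not
}
```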
For distributed environments, we need to handle worker coordination:
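// registerWorker records the worker, using the current time as its first heartbeat.
func (q *Queue) registerWorker(ctx context.Context, workerID string) error {
	return q.redis.HSet(ctx, q.name+":workers", workerID, time.Now().Unix()).Err()
}

// heartbeat refreshes the worker's timestamp every 30 seconds until ctx is cancelled.
func (q *Queue) heartbeat(ctx context.Context, workerID string) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// ctx is already cancelled here, so a call using it would fail.
			// Deregister with a fresh, short-lived context instead.
			cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			q.redis.HDel(cleanupCtx, q.name+":workers", workerID)
			cancel()
			return
		case <-ticker.C:
			q.redis.HSet(ctx, q.name+":workers", workerID, time.Now().Unix())
		}
	}
}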
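The heartbeat only pays off if something reads it. A supervisor could load the workers hash (HGETALL returns field and value strings) and flag any worker whose last timestamp is too old, for instance older than a few heartbeat intervals. Here is a minimal sketch of that check, assuming the hash has already been fetched into a map; `staleWorkers` and the 90-second threshold are chosen purely for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// staleWorkers (hypothetical helper) returns the IDs of workers whose last
// heartbeat, stored as Unix seconds in string form, is older than
// maxAgeSeconds or cannot be parsed. The result is sorted for stable output.
func staleWorkers(heartbeats map[string]string, nowUnix, maxAgeSeconds int64) []string {
	var stale []string
	for id, raw := range heartbeats {
		last, err := strconv.ParseInt(raw, 10, 64)
		if err != nil || nowUnix-last > maxAgeSeconds {
			stale = append(stale, id)
		}
	}
	sort.Strings(stale)
	return stale
}

func main() {
	// Illustrative data: worker-0 reported 10s ago, worker-1 reported 200s ago.
	heartbeats := map[string]string{"worker-0": "990", "worker-1": "800"}
	fmt.Println(staleWorkers(heartbeats, 1000, 90)) // only worker-1 exceeds the 90s threshold
}
```

What to do with a stale worker (alerting, removing its hash entry, or re-queuing its work) depends on the application.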
Here's how to use the queue in a real application:
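// NewQueue is a minimal constructor sketch; log.Default() keeps q.logger non-nil.
func NewQueue(client *redis.Client, name string) *Queue {
	return &Queue{redis: client, name: name, logger: log.Default()}
}

func main() {
	client := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	queue := NewQueue(client, "myapp:tasks")
	ctx := context.Background()

	// Move scheduled tasks onto the main queue once they are due.
	go queue.processScheduledTasks(ctx)

	// Start multiple workers
	for i := 0; i < 3; i++ {
		workerID := fmt.Sprintf("worker-%d", i)
		go func() {
			if err := queue.registerWorker(ctx, workerID); err != nil {
				log.Printf("failed to register %s: %v", workerID, err)
			}
			go queue.heartbeat(ctx, workerID)
			queue.StartWorker(ctx, func(task *Task) error {
				// Process task
				return nil
			})
		}()
	}

	// Schedule a task to run in one hour
	err := queue.ScheduleTask(ctx, &Task{
		ID:       "task-1",
		Priority: 1,
		Payload:  map[string]interface{}{"action": "send_email"},
	}, time.Now().Add(time.Hour))
	if err != nil {
		log.Fatalf("failed to schedule task: %v", err)
	}

	// Block forever; the workers run in the background.
	select {}
}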
This implementation provides a robust foundation for a distributed task queue system. I've used similar patterns in production systems processing millions of tasks daily. The key is to balance complexity with reliability, ensuring the system can handle failures gracefully while maintaining performance.
Remember to implement proper monitoring and alerting for production use. Redis metrics, worker health, and task processing rates should be tracked to ensure system health.
The system can be extended with features like task cancellation, progress tracking, and result storage. These additions should be made based on specific use case requirements while maintaining the core reliability and performance characteristics.