Aarav Joshi

Building High-Performance Message Queues in Go: A Developer's Guide

Efficient message queuing is critical for building responsive, scalable applications. Go is a strong fit for implementing these systems because of its built-in concurrency model (goroutines and channels) and efficient resource management. I've spent years working with message queues in Go and want to share practical insights on creating robust solutions for real-time applications.

Message queues serve as intermediaries between components in distributed systems, enabling asynchronous communication, load balancing, and system resilience. With Go's goroutines and channels, we can construct highly efficient queuing mechanisms tailored to specific application needs.

The fundamental concept behind message queues is simple: producers send messages to a queue, and consumers retrieve them for processing. However, building a production-ready implementation requires careful consideration of concurrency, persistence, and error handling.
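
As a minimal sketch of that producer/consumer flow, a buffered channel can already act as a small bounded queue. The function name `produceAndConsume` and its parameters are illustrative only and are not used elsewhere in this article.

```go
package main

import (
    "fmt"
    "sync"
)

// produceAndConsume sends the integers 1..n through a buffered channel and
// returns their sum as computed by a single consumer goroutine.
func produceAndConsume(n, buffer int) int {
    queue := make(chan int, buffer) // the channel acts as a bounded FIFO queue
    var wg sync.WaitGroup
    sum := 0

    wg.Add(1)
    go func() { // consumer
        defer wg.Done()
        for v := range queue { // ends once the channel is closed and drained
            sum += v
        }
    }()

    for i := 1; i <= n; i++ { // producer
        queue <- i // blocks while the buffer is full
    }
    close(queue)
    wg.Wait() // after Wait returns it is safe to read sum
    return sum
}

func main() {
    fmt.Println(produceAndConsume(100, 8)) // 5050
}
```

The blocking send is the simplest form of backpressure: a slow consumer automatically slows the producer down.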

Let's start with a basic in-memory queue implementation:

package queue

import (
    "sync"
    "time"
)

type Message struct {
    ID        string
    Body      []byte
    Timestamp time.Time
    Metadata  map[string]string
}

type InMemoryQueue struct {
    messages  []*Message
    mutex     sync.RWMutex
    notEmpty  *sync.Cond
    maxSize   int
    consumers []chan *Message
}

func NewInMemoryQueue(maxSize int) *InMemoryQueue {
    q := &InMemoryQueue{
        messages:  make([]*Message, 0, maxSize),
        maxSize:   maxSize,
        consumers: make([]chan *Message, 0),
    }
    q.notEmpty = sync.NewCond(&q.mutex)
    return q
}

func (q *InMemoryQueue) Enqueue(msg *Message) bool {
    q.mutex.Lock()
    defer q.mutex.Unlock()

    if len(q.messages) >= q.maxSize {
        return false
    }

    q.messages = append(q.messages, msg)
    q.notEmpty.Signal()

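    // Also offer the message to every subscriber. Subscribers see the message
    // in addition to (not instead of) whoever later calls Dequeue.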
    for _, ch := range q.consumers {
        select {
        case ch <- msg:
            // Message sent to consumer
        default:
            // Consumer's buffer is full, skip
        }
    }

    return true
}

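// Dequeue blocks until a message is available and returns the oldest one.
func (q *InMemoryQueue) Dequeue() *Message {
    q.mutex.Lock()
    defer q.mutex.Unlock()

    // Wait releases the mutex while sleeping and re-acquires it on wake-up.
    for len(q.messages) == 0 {
        q.notEmpty.Wait()
    }

    msg := q.messages[0]
    q.messages[0] = nil // drop the reference so the backing array does not pin the message
    q.messages = q.messages[1:]
    return msg
}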

func (q *InMemoryQueue) Subscribe(bufferSize int) <-chan *Message {
    q.mutex.Lock()
    defer q.mutex.Unlock()

    ch := make(chan *Message, bufferSize)
    q.consumers = append(q.consumers, ch)
    return ch
}
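
One limitation of a `sync.Cond`-based `Dequeue` is that a waiting caller cannot be cancelled or timed out. The following sketch shows a channel-backed alternative that accepts a `context.Context`. The `ChanQueue` type and `ErrQueueFull` are hypothetical names introduced for this illustration, and the queue carries plain strings to stay short.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// ErrQueueFull is returned when the queue has no free capacity.
var ErrQueueFull = errors.New("queue full")

// ChanQueue is a hypothetical bounded queue backed by a buffered channel.
type ChanQueue struct {
    items chan string
}

func NewChanQueue(capacity int) *ChanQueue {
    return &ChanQueue{items: make(chan string, capacity)}
}

// Enqueue never blocks; it reports ErrQueueFull instead.
func (q *ChanQueue) Enqueue(item string) error {
    select {
    case q.items <- item:
        return nil
    default:
        return ErrQueueFull
    }
}

// Dequeue waits for an item or until ctx is cancelled or times out.
func (q *ChanQueue) Dequeue(ctx context.Context) (string, error) {
    select {
    case item := <-q.items:
        return item, nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    q := NewChanQueue(1)
    _ = q.Enqueue("hello")
    fmt.Println(q.Enqueue("overflow")) // queue full

    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    fmt.Println(q.Dequeue(ctx)) // hello <nil>
    _, err := q.Dequeue(ctx)    // queue is empty, so this returns once ctx expires
    fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```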

This simple implementation demonstrates core concepts but lacks many features needed for production use. Real-world message queues require persistence, delivery guarantees, and failure handling.

For persistent message storage, we can integrate with a database or file system:

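// Additional imports used from here on: "database/sql", "encoding/json".

type PersistentQueue struct {
    // Embedded by pointer. Copying an InMemoryQueue value would copy its mutex
    // while notEmpty still referenced the original, which breaks Dequeue.
    *InMemoryQueue
    db          *sql.DB
    persistLock sync.Mutex
}

func NewPersistentQueue(maxSize int, dbConn *sql.DB) (*PersistentQueue, error) {
    q := &PersistentQueue{
        InMemoryQueue: NewInMemoryQueue(maxSize),
        db:            dbConn,
    }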

    // Create table if not exists
    _, err := dbConn.Exec(`
        CREATE TABLE IF NOT EXISTS messages (
            id TEXT PRIMARY KEY,
            body BLOB,
            timestamp INTEGER,
            metadata TEXT,
            processed BOOLEAN DEFAULT FALSE
        )
    `)
    if err != nil {
        return nil, err
    }

    // Load unprocessed messages
    err = q.loadMessages()
    if err != nil {
        return nil, err
    }

    return q, nil
}

func (q *PersistentQueue) loadMessages() error {
    rows, err := q.db.Query("SELECT id, body, timestamp, metadata FROM messages WHERE processed = FALSE ORDER BY timestamp")
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var id string
        var body []byte
        var timestamp int64
        var metadataJSON string

        err = rows.Scan(&id, &body, &timestamp, &metadataJSON)
        if err != nil {
            return err
        }

        metadata := make(map[string]string)
        err = json.Unmarshal([]byte(metadataJSON), &metadata)
        if err != nil {
            return err
        }

        msg := &Message{
            ID:        id,
            Body:      body,
            Timestamp: time.Unix(timestamp, 0),
            Metadata:  metadata,
        }

        q.InMemoryQueue.Enqueue(msg)
    }

    return rows.Err()
}

func (q *PersistentQueue) Enqueue(msg *Message) bool {
    q.persistLock.Lock()
    defer q.persistLock.Unlock()

    metadataJSON, err := json.Marshal(msg.Metadata)
    if err != nil {
        return false
    }

    _, err = q.db.Exec(
        "INSERT INTO messages (id, body, timestamp, metadata) VALUES (?, ?, ?, ?)",
        msg.ID, msg.Body, msg.Timestamp.Unix(), metadataJSON,
    )
    if err != nil {
        return false
    }

    return q.InMemoryQueue.Enqueue(msg)
}

func (q *PersistentQueue) MarkProcessed(msgID string) error {
    _, err := q.db.Exec("UPDATE messages SET processed = TRUE WHERE id = ?", msgID)
    return err
}
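
Two details of this storage format are easy to miss: a nil metadata map marshals to `null`, and `Timestamp.Unix()` keeps only whole seconds, so messages created within the same second lose their relative order when reloaded with `ORDER BY timestamp`. The sketch below illustrates both points without a database. The helper names (`encodeMetadata`, `decodeMetadata`, `roundTripSeconds`, `roundTripNanos`) and the `sample` value are assumptions made for this example.

```go
package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// sample is an arbitrary timestamp with a sub-second component.
var sample = time.Date(2024, 1, 2, 3, 4, 5, 678000000, time.UTC)

// encodeMetadata serialises metadata for a TEXT column.
// A nil map is stored as "{}" rather than "null".
func encodeMetadata(md map[string]string) (string, error) {
    if md == nil {
        md = map[string]string{}
    }
    b, err := json.Marshal(md)
    return string(b), err
}

// decodeMetadata is the inverse of encodeMetadata.
func decodeMetadata(s string) (map[string]string, error) {
    md := make(map[string]string)
    err := json.Unmarshal([]byte(s), &md)
    return md, err
}

// roundTripSeconds mimics storing Timestamp.Unix() and reloading it.
func roundTripSeconds(t time.Time) time.Time { return time.Unix(t.Unix(), 0) }

// roundTripNanos mimics storing Timestamp.UnixNano() instead.
func roundTripNanos(t time.Time) time.Time { return time.Unix(0, t.UnixNano()) }

func main() {
    s, _ := encodeMetadata(map[string]string{"retries": "2"})
    md, _ := decodeMetadata(s)
    fmt.Println(s, md["retries"]) // {"retries":"2"} 2

    fmt.Println(roundTripSeconds(sample).Equal(sample)) // false: the 678ms are lost
    fmt.Println(roundTripNanos(sample).Equal(sample))   // true
}
```

If ordering within a second matters, storing nanoseconds or an auto-incrementing sequence column is one way to preserve it.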

For high-throughput applications, we need to consider message batching to minimize I/O operations:

func (q *PersistentQueue) EnqueueBatch(messages []*Message) (int, error) {
    q.persistLock.Lock()
    defer q.persistLock.Unlock()

    tx, err := q.db.Begin()
    if err != nil {
        return 0, err
    }
    // Rollback is a no-op once the transaction has been committed.
    defer tx.Rollback()

    stmt, err := tx.Prepare("INSERT INTO messages (id, body, timestamp, metadata) VALUES (?, ?, ?, ?)")
    if err != nil {
        return 0, err
    }
    defer stmt.Close()

    // Persist the whole batch first. Any failure aborts the transaction, so the
    // database and the in-memory queue cannot diverge.
    for _, msg := range messages {
        metadataJSON, err := json.Marshal(msg.Metadata)
        if err != nil {
            return 0, err
        }
        if _, err := stmt.Exec(msg.ID, msg.Body, msg.Timestamp.Unix(), string(metadataJSON)); err != nil {
            return 0, err
        }
    }

    if err := tx.Commit(); err != nil {
        return 0, err
    }

    // Only committed messages become visible to consumers.
    successCount := 0
    for _, msg := range messages {
        if q.InMemoryQueue.Enqueue(msg) {
            successCount++
        }
    }
    return successCount, nil
}
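
Callers usually need to cut a large backlog into fixed-size batches before handing it to a batch method like the one above. A minimal sketch of that step follows; the helper name `chunk` is an assumption, and it works on string IDs only to keep the example short.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// The batches share the backing array of items; no elements are copied.
func chunk(items []string, size int) [][]string {
    if size <= 0 {
        return nil
    }
    var batches [][]string
    for start := 0; start < len(items); start += size {
        end := start + size
        if end > len(items) {
            end = len(items)
        }
        batches = append(batches, items[start:end])
    }
    return batches
}

func main() {
    ids := []string{"a", "b", "c", "d", "e"}
    for i, b := range chunk(ids, 2) {
        fmt.Println(i, b)
    }
    // 0 [a b]
    // 1 [c d]
    // 2 [e]
}
```

Batch size is a trade-off: larger batches mean fewer transactions, but more work is lost and retried when one of them fails.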

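To provide at-least-once delivery, consumers must acknowledge each message. Anything left unacknowledged past a visibility timeout is redelivered, and messages that exhaust their retries are moved to a dead-letter queue: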

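// Additional import used below: "log".

type ConsumerConfig struct {
    BatchSize      int
    VisibilityTime time.Duration
    MaxRetries     int
}

// inFlightMsg is a delivered message that has not been acknowledged yet.
// Retries are tracked here rather than in msg.Metadata, which avoids writing
// to a map the consumer may be reading (and to a nil map).
type inFlightMsg struct {
    msg      *Message
    deadline time.Time
    retries  int
}

// ConsumeWithAck delivers messages on the first channel and expects their IDs
// back on the second. Unacknowledged messages are redelivered after
// VisibilityTime, up to MaxRetries times. The goroutines run for the life of
// the process; a production version would also accept a context for shutdown.
func (q *PersistentQueue) ConsumeWithAck(config ConsumerConfig) (<-chan *Message, chan<- string) {
    messagesChan := make(chan *Message, config.BatchSize)
    ackChan := make(chan string, config.BatchSize)
    incoming := make(chan *Message)

    // Dequeue blocks while the queue is empty, so it gets its own goroutine.
    // Calling it from the select loop below would stall acks and the ticker.
    go func() {
        for {
            incoming <- q.Dequeue()
        }
    }()

    go func() {
        // Messages leave q.messages on Dequeue, so keep them here for redelivery.
        pending := make(map[string]inFlightMsg)
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        ack := func(id string) {
            delete(pending, id)
            if err := q.MarkProcessed(id); err != nil {
                log.Printf("failed to mark message %s processed: %v", id, err)
            }
        }

        for {
            select {
            case id := <-ackChan:
                ack(id)

            case <-ticker.C:
                now := time.Now()
                for id, p := range pending {
                    if now.Before(p.deadline) {
                        continue
                    }
                    if p.retries >= config.MaxRetries {
                        if err := q.moveToDeadLetter(p.msg); err != nil {
                            log.Printf("failed to dead-letter message %s: %v", id, err)
                        }
                        delete(pending, id)
                        continue
                    }
                    select {
                    case messagesChan <- p.msg:
                        pending[id] = inFlightMsg{msg: p.msg, deadline: now.Add(config.VisibilityTime), retries: p.retries + 1}
                    default:
                        // Consumer buffer is full; try again on the next tick.
                    }
                }

            case msg := <-incoming:
                // Keep draining acks while waiting for buffer space so that a
                // consumer blocked on ackChan cannot deadlock this loop.
                for delivered := false; !delivered; {
                    select {
                    case messagesChan <- msg:
                        delivered = true
                    case id := <-ackChan:
                        ack(id)
                    }
                }
                pending[msg.ID] = inFlightMsg{msg: msg, deadline: time.Now().Add(config.VisibilityTime)}
            }
        }
    }()

    return messagesChan, ackChan
}

// moveToDeadLetter assumes a dead_letter_queue table with the same columns as
// messages already exists; nothing in this article creates it.
func (q *PersistentQueue) moveToDeadLetter(msg *Message) error {
    tx, err := q.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op after a successful Commit

    if _, err := tx.Exec("INSERT INTO dead_letter_queue SELECT * FROM messages WHERE id = ?", msg.ID); err != nil {
        return err
    }
    if _, err := tx.Exec("DELETE FROM messages WHERE id = ?", msg.ID); err != nil {
        return err
    }
    return tx.Commit()
}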
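
The visibility-timeout bookkeeping is easier to reason about (and to test) when it is separated from goroutines and the database. The sketch below is one way to do that, with time passed in explicitly. The `inFlight` type, its methods, and the `epoch` value are hypothetical and exist only for this illustration.

```go
package main

import (
    "fmt"
    "time"
)

// epoch is an arbitrary fixed start time for the demonstration.
var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// inFlight tracks delivered-but-unacknowledged message IDs.
type inFlight struct {
    visibility time.Duration
    maxRetries int
    deadlines  map[string]time.Time
    retries    map[string]int
}

func newInFlight(visibility time.Duration, maxRetries int) *inFlight {
    return &inFlight{
        visibility: visibility,
        maxRetries: maxRetries,
        deadlines:  make(map[string]time.Time),
        retries:    make(map[string]int),
    }
}

// Deliver records that id was handed to a consumer at time now.
func (f *inFlight) Deliver(id string, now time.Time) {
    f.deadlines[id] = now.Add(f.visibility)
}

// Ack stops tracking id; the message is done.
func (f *inFlight) Ack(id string) {
    delete(f.deadlines, id)
    delete(f.retries, id)
}

// Expire returns the IDs to redeliver and the IDs to dead-letter at time now.
func (f *inFlight) Expire(now time.Time) (redeliver, dead []string) {
    for id, deadline := range f.deadlines {
        if now.Before(deadline) {
            continue
        }
        if f.retries[id] < f.maxRetries {
            f.retries[id]++
            f.deadlines[id] = now.Add(f.visibility)
            redeliver = append(redeliver, id)
        } else {
            delete(f.deadlines, id)
            delete(f.retries, id)
            dead = append(dead, id)
        }
    }
    return redeliver, dead
}

func main() {
    f := newInFlight(30*time.Second, 1)
    f.Deliver("m1", epoch)
    f.Deliver("m2", epoch)
    f.Ack("m2")

    fmt.Println(f.Expire(epoch.Add(10 * time.Second))) // [] []
    fmt.Println(f.Expire(epoch.Add(31 * time.Second))) // [m1] []
    fmt.Println(f.Expire(epoch.Add(62 * time.Second))) // [] [m1]
}
```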

To spread load across several databases (and, with additional plumbing, across processes or machines), we can partition the queue by hashing a message key:

type PartitionedQueue struct {
    partitions []*PersistentQueue
    partitioner func(*Message) int
}

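// Additional imports used below: "errors", "hash/fnv".
func NewPartitionedQueue(partitionCount int, maxSizePerPartition int, dbConnections []*sql.DB) (*PartitionedQueue, error) {
    if partitionCount <= 0 {
        // Guards the modulo in the default partitioner against division by zero.
        return nil, errors.New("partition count must be positive")
    }
    if len(dbConnections) != partitionCount {
        return nil, errors.New("number of DB connections must match partition count")
    }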

    q := &PartitionedQueue{
        partitions: make([]*PersistentQueue, partitionCount),
        partitioner: func(msg *Message) int {
            // Default partitioning by message ID hash
            h := fnv.New32a()
            h.Write([]byte(msg.ID))
            return int(h.Sum32() % uint32(partitionCount))
        },
    }

    for i := 0; i < partitionCount; i++ {
        partition, err := NewPersistentQueue(maxSizePerPartition, dbConnections[i])
        if err != nil {
            return nil, err
        }
        q.partitions[i] = partition
    }

    return q, nil
}

func (q *PartitionedQueue) SetPartitioner(fn func(*Message) int) {
    q.partitioner = fn
}

func (q *PartitionedQueue) Enqueue(msg *Message) bool {
    partition := q.partitioner(msg)
    return q.partitions[partition].Enqueue(msg)
}

func (q *PartitionedQueue) EnqueueBatch(messages []*Message) int {
    // Group messages by partition
    partitionedMsgs := make([][]*Message, len(q.partitions))
    for _, msg := range messages {
        partition := q.partitioner(msg)
        partitionedMsgs[partition] = append(partitionedMsgs[partition], msg)
    }

    // Enqueue to each partition
    total := 0
    var wg sync.WaitGroup
    results := make([]int, len(q.partitions))

    for i, msgs := range partitionedMsgs {
        if len(msgs) == 0 {
            continue
        }

        wg.Add(1)
        go func(idx int, batch []*Message) {
            defer wg.Done()
            count, _ := q.partitions[idx].EnqueueBatch(batch)
            results[idx] = count
        }(i, msgs)
    }

    wg.Wait()

    for _, count := range results {
        total += count
    }

    return total
}
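
The hash-based partitioner can be tried in isolation. This sketch uses the same FNV-1a hash; `partitionFor` and the sample keys are names assumed for the illustration.

```go
package main

import (
    "fmt"
    "hash/fnv"
)

// partitionFor maps a key to a partition index in [0, partitions).
// It assumes partitions > 0.
func partitionFor(key string, partitions int) int {
    h := fnv.New32a()
    h.Write([]byte(key)) // Write on a hash.Hash never returns an error
    return int(h.Sum32() % uint32(partitions))
}

func main() {
    counts := make([]int, 4)
    for i := 0; i < 1000; i++ {
        counts[partitionFor(fmt.Sprintf("order-%d", i), 4)]++
    }
    fmt.Println(counts) // distribution across the four partitions

    // The same key always lands in the same partition, so messages that share
    // a key keep their relative order.
    fmt.Println(partitionFor("customer-42", 4) == partitionFor("customer-42", 4)) // true
}
```

Note that changing the partition count changes where most keys land, so repartitioning a live queue needs a migration plan.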

For monitoring queue health and performance, we can add metrics collection:

type QueueMetrics struct {
    EnqueueCount        int64
    DequeueCount        int64
    EnqueueErrors       int64
    Size                int64
    OldestMessageAge    time.Duration
    ProcessingLatency   time.Duration
    AverageMessageSize  int64
    mutex               sync.Mutex
}

func NewQueueMetrics() *QueueMetrics {
    return &QueueMetrics{}
}

// All fields are guarded by m.mutex, so plain reads and writes suffice;
// layering sync/atomic calls on top adds cost without adding safety.
func (m *QueueMetrics) RecordEnqueue(success bool, size int) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    if !success {
        m.EnqueueErrors++
        return
    }

    m.EnqueueCount++
    m.Size++

    // Running mean of message size (integer arithmetic, so approximate).
    m.AverageMessageSize = (m.AverageMessageSize*(m.EnqueueCount-1) + int64(size)) / m.EnqueueCount
}

func (m *QueueMetrics) RecordDequeue(processingTime time.Duration) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    m.DequeueCount++
    m.Size--

    // Running mean of the recorded latency.
    n := time.Duration(m.DequeueCount)
    m.ProcessingLatency = (m.ProcessingLatency*(n-1) + processingTime) / n
}

func (m *QueueMetrics) UpdateOldestMessageAge(age time.Duration) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.OldestMessageAge = age
}

// GetMetrics returns a consistent snapshot; durations are in milliseconds.
func (m *QueueMetrics) GetMetrics() map[string]interface{} {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    return map[string]interface{}{
        "enqueue_count":        m.EnqueueCount,
        "dequeue_count":        m.DequeueCount,
        "enqueue_errors":       m.EnqueueErrors,
        "size":                 m.Size,
        "oldest_message_age":   m.OldestMessageAge.Milliseconds(),
        "processing_latency":   m.ProcessingLatency.Milliseconds(),
        "average_message_size": m.AverageMessageSize,
    }
}
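
When a metric is a simple counter, lock-free updates with `sync/atomic` are an alternative to a mutex. The sketch below shows that variant for two counters only. The `Counters` type and the `hammer` helper are assumptions for this example and are not part of `QueueMetrics`.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// Counters holds metrics that are only ever incremented or read.
type Counters struct {
    enqueued int64
    failed   int64
}

// RecordEnqueue bumps one of the two counters without taking a lock.
func (c *Counters) RecordEnqueue(ok bool) {
    if ok {
        atomic.AddInt64(&c.enqueued, 1)
    } else {
        atomic.AddInt64(&c.failed, 1)
    }
}

// Snapshot reads both counters. Each read is atomic, but the pair is not
// read as a single consistent unit.
func (c *Counters) Snapshot() (enqueued, failed int64) {
    return atomic.LoadInt64(&c.enqueued), atomic.LoadInt64(&c.failed)
}

// hammer records n successes and n failures from each of workers goroutines.
func hammer(c *Counters, workers, n int) {
    var wg sync.WaitGroup
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < n; i++ {
                c.RecordEnqueue(true)
                c.RecordEnqueue(false)
            }
        }()
    }
    wg.Wait()
}

func main() {
    var c Counters
    hammer(&c, 8, 1000)
    fmt.Println(c.Snapshot()) // 8000 8000
}
```

Derived values such as running averages touch several fields at once, which is where a mutex remains the simpler choice.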

In real-time applications, priority queuing becomes essential for ensuring timely processing of critical messages:

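type PriorityQueue struct {
    queues  []*InMemoryQueue
    levels  int
    metrics *QueueMetrics
    ready   chan struct{} // holds at most one "a message may be available" token
}

func NewPriorityQueue(levels int, maxSizePerLevel int) *PriorityQueue {
    pq := &PriorityQueue{
        queues:  make([]*InMemoryQueue, levels),
        levels:  levels,
        metrics: NewQueueMetrics(),
        ready:   make(chan struct{}, 1),
    }

    for i := 0; i < levels; i++ {
        pq.queues[i] = NewInMemoryQueue(maxSizePerLevel)
    }

    return pq
}

// TryDequeue removes the oldest message without blocking.
func (q *InMemoryQueue) TryDequeue() (*Message, bool) {
    q.mutex.Lock()
    defer q.mutex.Unlock()

    if len(q.messages) == 0 {
        return nil, false
    }
    msg := q.messages[0]
    q.messages[0] = nil
    q.messages = q.messages[1:]
    return msg, true
}

// signal wakes one waiting Dequeue call, if any, without blocking.
func (pq *PriorityQueue) signal() {
    select {
    case pq.ready <- struct{}{}:
    default:
    }
}

func (pq *PriorityQueue) Enqueue(msg *Message, priority int) bool {
    if priority < 0 || priority >= pq.levels {
        priority = pq.levels - 1 // Default to lowest priority
    }

    success := pq.queues[priority].Enqueue(msg)
    pq.metrics.RecordEnqueue(success, len(msg.Body))
    if success {
        pq.signal()
    }
    return success
}

// Dequeue returns the next message, preferring level 0 (highest priority),
// and blocks while every level is empty. Calling Subscribe here would create
// a fresh, empty channel on every call and never see queued messages.
func (pq *PriorityQueue) Dequeue() *Message {
    start := time.Now()

    for {
        for i := 0; i < pq.levels; i++ {
            if msg, ok := pq.queues[i].TryDequeue(); ok {
                pq.signal() // more messages may remain for other waiters
                pq.metrics.RecordDequeue(time.Since(start)) // time spent waiting in Dequeue
                return msg
            }
        }
        <-pq.ready // wait until an Enqueue (or another Dequeue) signals
    }
}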
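
When the number of priority levels is large or not known in advance, a binary heap is a common alternative to one queue per level. The following sketch uses the standard library's `container/heap`. The `item`, `itemHeap`, and `HeapQueue` names are hypothetical, and the type is deliberately not safe for concurrent use.

```go
package main

import (
    "container/heap"
    "fmt"
)

type item struct {
    body     string
    priority int    // lower value = more urgent
    seq      uint64 // insertion order, keeps FIFO order within a priority
}

// itemHeap implements heap.Interface.
type itemHeap []item

func (h itemHeap) Len() int { return len(h) }
func (h itemHeap) Less(i, j int) bool {
    if h[i].priority != h[j].priority {
        return h[i].priority < h[j].priority
    }
    return h[i].seq < h[j].seq
}
func (h itemHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *itemHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *itemHeap) Pop() interface{} {
    old := *h
    last := old[len(old)-1]
    *h = old[:len(old)-1]
    return last
}

// HeapQueue is a non-thread-safe priority queue built on itemHeap.
type HeapQueue struct {
    h   itemHeap
    seq uint64
}

func (q *HeapQueue) Enqueue(body string, priority int) {
    q.seq++
    heap.Push(&q.h, item{body: body, priority: priority, seq: q.seq})
}

// Dequeue returns the most urgent item, or false if the queue is empty.
func (q *HeapQueue) Dequeue() (string, bool) {
    if q.h.Len() == 0 {
        return "", false
    }
    return heap.Pop(&q.h).(item).body, true
}

func main() {
    var q HeapQueue
    q.Enqueue("report", 2)
    q.Enqueue("alert-1", 0)
    q.Enqueue("email", 1)
    q.Enqueue("alert-2", 0)
    for body, ok := q.Dequeue(); ok; body, ok = q.Dequeue() {
        fmt.Println(body)
    }
    // Prints alert-1, alert-2, email, report (one per line).
}
```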

For applications that need exactly-once processing semantics, the usual approach is at-least-once delivery combined with idempotent consumers that recognize and skip duplicates:

type IdempotentConsumer struct {
    queue       *PersistentQueue
    processedIDs map[string]bool
    processingFn func(*Message) error
    db          *sql.DB
    mutex       sync.RWMutex
}

func NewIdempotentConsumer(queue *PersistentQueue, db *sql.DB, processingFn func(*Message) error) (*IdempotentConsumer, error) {
    consumer := &IdempotentConsumer{
        queue:       queue,
        processedIDs: make(map[string]bool),
        processingFn: processingFn,
        db:          db,
    }

    // Create processed messages table
    _, err := db.Exec(`
        CREATE TABLE IF NOT EXISTS processed_messages (
            id TEXT PRIMARY KEY,
            processed_at INTEGER
        )
    `)
    if err != nil {
        return nil, err
    }

    // Load already processed IDs
    err = consumer.loadProcessedIDs()
    if err != nil {
        return nil, err
    }

    return consumer, nil
}

func (c *IdempotentConsumer) loadProcessedIDs() error {
    rows, err := c.db.Query("SELECT id FROM processed_messages")
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var id string
        if err := rows.Scan(&id); err != nil {
            return err
        }
        c.processedIDs[id] = true
    }

    return rows.Err()
}

func (c *IdempotentConsumer) Start(workers int) {
    for i := 0; i < workers; i++ {
        go c.worker()
    }
}

func (c *IdempotentConsumer) worker() {
    messages, acks := c.queue.ConsumeWithAck(ConsumerConfig{
        BatchSize:      100,
        VisibilityTime: time.Minute,
        MaxRetries:     3,
    })

    for msg := range messages {
        if c.isProcessed(msg.ID) {
            // Skip already processed message
            acks <- msg.ID
            continue
        }

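        if err := c.processingFn(msg); err != nil {
            // No ack: the message is redelivered after the visibility timeout.
            continue
        }
        if err := c.markProcessed(msg.ID); err != nil {
            // Processing and bookkeeping are not atomic; a failure here means a
            // later duplicate of this message would not be detected.
            log.Printf("failed to record message %s as processed: %v", msg.ID, err)
        }
        acks <- msg.ID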
    }
}

func (c *IdempotentConsumer) isProcessed(id string) bool {
    c.mutex.RLock()
    defer c.mutex.RUnlock()
    return c.processedIDs[id]
}

func (c *IdempotentConsumer) markProcessed(id string) error {
    c.mutex.Lock()
    defer c.mutex.Unlock()

    _, err := c.db.Exec(
        "INSERT INTO processed_messages (id, processed_at) VALUES (?, ?)",
        id, time.Now().Unix(),
    )
    if err != nil {
        return err
    }

    c.processedIDs[id] = true
    return nil
}
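
Stripped of persistence, the core of an idempotent consumer is a "run at most once per ID" guard. The sketch below shows that guard in memory. `Deduper` and `Handle` are hypothetical names, and holding the lock while the handler runs is a simplification that serializes all handlers.

```go
package main

import (
    "fmt"
    "sync"
)

// Deduper runs a handler at most once per successfully handled message ID.
type Deduper struct {
    mu   sync.Mutex
    seen map[string]bool
}

func NewDeduper() *Deduper { return &Deduper{seen: make(map[string]bool)} }

// Handle calls fn unless id was already handled successfully and reports
// whether fn ran. The lock is held while fn runs, which closes the gap
// between checking and marking at the cost of serialising handlers.
func (d *Deduper) Handle(id string, fn func() error) (ran bool, err error) {
    d.mu.Lock()
    defer d.mu.Unlock()

    if d.seen[id] {
        return false, nil
    }
    if err := fn(); err != nil {
        return true, err // not marked, so a redelivery will try again
    }
    d.seen[id] = true
    return true, nil
}

func main() {
    d := NewDeduper()
    charged := 0
    charge := func() error { charged++; return nil }

    d.Handle("payment-1", charge)
    d.Handle("payment-1", charge) // duplicate delivery: skipped
    d.Handle("payment-2", charge)
    fmt.Println(charged) // 2
}
```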

When implementing message queues in Go, it's important to keep an eye on memory usage. Periodically logging runtime memory statistics alongside the queue depth helps spot unbounded growth early:

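// Size reports the number of messages currently held in memory.
// PersistentQueue gains this method through its embedded InMemoryQueue.
func (q *InMemoryQueue) Size() int {
    q.mutex.RLock()
    defer q.mutex.RUnlock()
    return len(q.messages)
}

// Additional imports used below: "log", "runtime".
func monitorQueueMemoryUsage(queue *PersistentQueue, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    var memStats runtime.MemStats

    for range ticker.C {
        // ReadMemStats briefly stops the world, so keep the interval coarse.
        runtime.ReadMemStats(&memStats)

        log.Printf("Queue memory stats - Alloc: %v MiB, Sys: %v MiB, NumGC: %v",
            memStats.Alloc/1024/1024,
            memStats.Sys/1024/1024,
            memStats.NumGC,
        )

        log.Printf("Queue size: %d messages", queue.Size())
    }
}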

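For high-throughput systems, a fixed-size ring buffer avoids the repeated slice growth and re-slicing of the simple queue: the slots are allocated once and reused as messages come and go: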

type RingBuffer struct {
    buffer     []*Message
    size       int
    capacity   int
    head       int
    tail       int
    mutex      sync.RWMutex
    notEmpty   *sync.Cond
    notFull    *sync.Cond
}

func NewRingBuffer(capacity int) *RingBuffer {
    rb := &RingBuffer{
        buffer:   make([]*Message, capacity),
        capacity: capacity,
        size:     0,
        head:     0,
        tail:     0,
    }
    rb.notEmpty = sync.NewCond(&rb.mutex)
    rb.notFull = sync.NewCond(&rb.mutex)
    return rb
}

func (rb *RingBuffer) Enqueue(msg *Message) bool {
    rb.mutex.Lock()
    defer rb.mutex.Unlock()

    for rb.size == rb.capacity {
        rb.notFull.Wait()
    }

    rb.buffer[rb.tail] = msg
    rb.tail = (rb.tail + 1) % rb.capacity
    rb.size++

    rb.notEmpty.Signal()
    return true
}

func (rb *RingBuffer) Dequeue() *Message {
    rb.mutex.Lock()
    defer rb.mutex.Unlock()

    for rb.size == 0 {
        rb.notEmpty.Wait()
    }

    msg := rb.buffer[rb.head]
    rb.buffer[rb.head] = nil // Help GC
    rb.head = (rb.head + 1) % rb.capacity
    rb.size--

    rb.notFull.Signal()
    return msg
}
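
The index arithmetic is the part of a ring buffer that is easiest to get wrong, so here is a small single-goroutine sketch that makes the wrap-around visible. `Ring`, `TryPush`, and `TryPop` are hypothetical names; unlike the blocking version above, this variant reports "full" and "empty" instead of waiting, and it stores ints for brevity.

```go
package main

import "fmt"

// Ring is a fixed-capacity FIFO of ints; not safe for concurrent use.
type Ring struct {
    buf        []int
    head, size int
}

func NewRing(capacity int) *Ring { return &Ring{buf: make([]int, capacity)} }

// TryPush appends v and reports false if the ring is full.
func (r *Ring) TryPush(v int) bool {
    if r.size == len(r.buf) {
        return false
    }
    r.buf[(r.head+r.size)%len(r.buf)] = v // tail index derived from head and size
    r.size++
    return true
}

// TryPop removes the oldest value and reports false if the ring is empty.
func (r *Ring) TryPop() (int, bool) {
    if r.size == 0 {
        return 0, false
    }
    v := r.buf[r.head]
    r.head = (r.head + 1) % len(r.buf)
    r.size--
    return v, true
}

func main() {
    r := NewRing(3)
    for i := 1; i <= 4; i++ {
        fmt.Println(r.TryPush(i)) // true, true, true, false
    }
    r.TryPop()                // frees the slot that held 1
    fmt.Println(r.TryPush(5)) // true: the write wraps around to index 0
    for v, ok := r.TryPop(); ok; v, ok = r.TryPop() {
        fmt.Println(v) // 2, 3, 5
    }
}
```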

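To protect the queue from producers that outpace it, we can apply rate limiting at the enqueue side. This example uses the token-bucket limiter from the golang.org/x/time/rate package: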

type RateLimitedQueue struct {
    queue      *PersistentQueue
    limiter    *rate.Limiter
}

func NewRateLimitedQueue(queue *PersistentQueue, messagesPerSecond int) *RateLimitedQueue {
    return &RateLimitedQueue{
        queue:   queue,
        limiter: rate.NewLimiter(rate.Limit(messagesPerSecond), messagesPerSecond),
    }
}

func (rlq *RateLimitedQueue) Enqueue(msg *Message) bool {
    if !rlq.limiter.Allow() {
        // Optionally block instead of rejecting
        // rlq.limiter.Wait(context.Background())
        return false
    }

    return rlq.queue.Enqueue(msg)
}
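
To make the rate and burst parameters concrete, here is a hand-rolled token bucket with an injected clock. It is only an illustration of the idea and is not how `golang.org/x/time/rate` is implemented; `TokenBucket`, `AllowAt`, and `epoch` are names assumed for this sketch, and the type is not safe for concurrent use.

```go
package main

import (
    "fmt"
    "time"
)

// epoch is an arbitrary fixed start time for the demonstration.
var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// TokenBucket refills at ratePerSec tokens per second up to burst tokens.
type TokenBucket struct {
    ratePerSec float64
    burst      float64
    tokens     float64
    last       time.Time
}

// NewTokenBucket returns a bucket that starts full at time now.
func NewTokenBucket(ratePerSec float64, burst int, now time.Time) *TokenBucket {
    return &TokenBucket{
        ratePerSec: ratePerSec,
        burst:      float64(burst),
        tokens:     float64(burst),
        last:       now,
    }
}

// AllowAt reports whether one event may happen at time now and, if so,
// consumes a token.
func (b *TokenBucket) AllowAt(now time.Time) bool {
    if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
        b.tokens += elapsed * b.ratePerSec
        if b.tokens > b.burst {
            b.tokens = b.burst
        }
        b.last = now
    }
    if b.tokens < 1 {
        return false
    }
    b.tokens--
    return true
}

func main() {
    b := NewTokenBucket(2, 2, epoch) // 2 messages per second, burst of 2
    fmt.Println(b.AllowAt(epoch), b.AllowAt(epoch), b.AllowAt(epoch)) // true true false
    fmt.Println(b.AllowAt(epoch.Add(500 * time.Millisecond)))         // true: one token refilled
}
```

The burst size decides how large a spike is accepted at once, while the rate decides the sustained throughput.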

When working with message queues in production, I've found that implementing proper error handling and observability is crucial. Add logging throughout the queue operations to aid in troubleshooting:

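// This version replaces the earlier PersistentQueue.Enqueue; a type cannot
// declare the same method twice.
func (q *PersistentQueue) Enqueue(msg *Message) bool {
    q.persistLock.Lock()
    defer q.persistLock.Unlock()

    start := time.Now()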

    metadataJSON, err := json.Marshal(msg.Metadata)
    if err != nil {
        log.Printf("Failed to marshal metadata for message %s: %v", msg.ID, err)
        return false
    }

    _, err = q.db.Exec(
        "INSERT INTO messages (id, body, timestamp, metadata) VALUES (?, ?, ?, ?)",
        msg.ID, msg.Body, msg.Timestamp.Unix(), metadataJSON,
    )
    if err != nil {
        log.Printf("Failed to persist message %s: %v", msg.ID, err)
        return false
    }

    success := q.InMemoryQueue.Enqueue(msg)

    duration := time.Since(start)
    log.Printf("Enqueued message %s (success=%v, duration=%v)", msg.ID, success, duration)

    return success
}

I've personally found that message queue performance can degrade over time without proper maintenance. Implementing a cleaner process helps maintain optimal performance:

func startQueueMaintenance(q *PersistentQueue, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for range ticker.C {
        cleanupStart := time.Now()

        // Remove old processed messages
        result, err := q.db.Exec(
            "DELETE FROM processed_messages WHERE processed_at < ?",
            time.Now().Add(-30*24*time.Hour).Unix(), // 30 days retention
        )
        if err != nil {
            log.Printf("Failed to clean up processed messages: %v", err)
            continue
        }

        rowsAffected, _ := result.RowsAffected()

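        // Reclaim space. VACUUM exists in SQLite and PostgreSQL but not in every
        // database, and in SQLite it rewrites the whole file, so run it sparingly.
        _, err = q.db.Exec("VACUUM")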
        if err != nil {
            log.Printf("Failed to vacuum database: %v", err)
        }

        log.Printf("Queue maintenance completed in %v, removed %d old processed messages",
            time.Since(cleanupStart), rowsAffected)
    }
}

By implementing these patterns and techniques in Go, we can create highly efficient message queue systems tailored to specific application requirements. The language's concurrency model, garbage collection, and performance characteristics make it particularly well-suited for real-time messaging applications.

Whether building chat applications, financial transaction systems, or IoT platforms, a properly implemented message queue in Go provides the foundation for robust, scalable architectures. The code examples I've shared reflect real-world implementations I've refined over years of experience, focusing on performance, reliability, and maintainability.

