When I first started working with message queues in Go, I quickly realized that most existing solutions either lacked persistence guarantees or sacrificed performance for durability. Building a custom message queue that balances both requirements became a fascinating challenge that taught me valuable lessons about storage systems and concurrent programming patterns.
Message queues serve as the backbone of modern distributed systems, enabling asynchronous communication between services while providing reliability guarantees. The key to building an effective persistent message queue lies in understanding the trade-offs between write performance, read efficiency, and storage overhead.
Architecture Foundation
The core architecture revolves around segmented storage, where messages are written to append-only log files that rotate based on size thresholds. This approach provides several advantages including efficient sequential writes, simplified cleanup operations, and better cache locality for recent messages.
I designed the system with three primary components: the MessageQueue coordinator, Topic managers, and Segment handlers. Each component has specific responsibilities that minimize lock contention and maximize throughput.
type MessageQueue struct {
	mu              sync.RWMutex
	topics          map[string]*Topic
	dataDir         string
	maxSegmentSize  int64
	retentionPeriod time.Duration
	flushInterval   time.Duration
	messageCounter  uint64
	ctx             context.Context
	cancel          context.CancelFunc
	wg              sync.WaitGroup
}
The MessageQueue struct maintains global state while delegating topic-specific operations to individual Topic instances. This separation allows for fine-grained locking and better concurrency characteristics.
Binary Message Format
Efficient message serialization directly impacts both storage overhead and processing speed. I chose a compact binary format that includes checksums for data integrity while minimizing space usage.
type MessageHeader struct {
	CRC       uint32
	Size      uint32
	ID        uint64
	Timestamp int64
	TopicLen  uint8
	DataLen   uint32
	Retries   int32
}
The header contains all necessary metadata for message processing and recovery. The CRC32 checksum ensures data integrity, while the size field enables efficient message boundary detection during reads.
The serialization process writes the fields in a fixed order and uses little-endian encoding so the on-disk format decodes identically across platforms:
func (s *Segment) serializeMessage(msg *Message) ([]byte, error) {
	topicBytes := []byte(msg.Topic)
	// Layout: CRC(4) + Size(4) + ID(8) + Timestamp(8) + TopicLen(1) +
	// topic + DataLen(4) + data + Retries(4)
	totalSize := 4 + 4 + 8 + 8 + 1 + len(topicBytes) + 4 + len(msg.Data) + 4
	buf := make([]byte, totalSize)
	header := MessageHeader{
		Size:      uint32(totalSize),
		ID:        msg.ID,
		Timestamp: msg.Timestamp.UnixNano(),
		TopicLen:  uint8(len(topicBytes)),
		DataLen:   uint32(len(msg.Data)),
		Retries:   msg.Retries,
	}
	// buf[0:4] is reserved for the CRC, which is computed and written last.
	binary.LittleEndian.PutUint32(buf[4:], header.Size)
	offset := 8
	binary.LittleEndian.PutUint64(buf[offset:], header.ID)
	offset += 8
	binary.LittleEndian.PutUint64(buf[offset:], uint64(header.Timestamp))
	offset += 8
	buf[offset] = header.TopicLen
	offset++
	copy(buf[offset:], topicBytes)
	offset += len(topicBytes)
	binary.LittleEndian.PutUint32(buf[offset:], header.DataLen)
	offset += 4
	copy(buf[offset:], msg.Data)
	offset += len(msg.Data)
	binary.LittleEndian.PutUint32(buf[offset:], uint32(header.Retries))
	// The checksum covers everything after the CRC field itself.
	crc := crc32.ChecksumIEEE(buf[4:])
	binary.LittleEndian.PutUint32(buf[0:4], crc)
	return buf, nil
}
Segment Management
Segments represent individual log files that store messages for a topic. The segment rotation strategy balances file size management with operational efficiency. When a segment reaches the configured size threshold, the system creates a new active segment while maintaining read access to older segments.
func (t *Topic) rotateSegment() error {
	if t.activeSegment != nil {
		if err := t.activeSegment.close(); err != nil {
			return err
		}
	}
	segmentID := uint64(len(t.segments))
	segment, err := t.createSegment(segmentID)
	if err != nil {
		return err
	}
	t.activeSegment = segment
	t.segments = append(t.segments, segment)
	return nil
}
Each segment maintains its own file handle and buffered writer for optimal I/O performance. Buffered writes reduce system-call overhead, while periodic flushing keeps the durability guarantee within the configured window.
func (s *Segment) writeMessage(message *Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return fmt.Errorf("segment is closed")
	}
	data, err := s.serializeMessage(message)
	if err != nil {
		return fmt.Errorf("failed to serialize message: %w", err)
	}
	n, err := s.writer.Write(data)
	if err != nil {
		return fmt.Errorf("failed to write message: %w", err)
	}
	s.size += int64(n)
	s.messageCount++
	return nil
}
Consumer Implementation
The consumer model supports multiple concurrent consumers per topic with independent offset tracking. Each consumer maintains its own position within the topic's message stream, so consumers read in parallel at their own pace without interfering with one another.
type Consumer struct {
	id       string
	topic    *Topic
	offset   uint64
	mu       sync.Mutex
	lastSeen time.Time
}

func (c *Consumer) Consume() (*Message, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lastSeen = time.Now()
	segment, err := c.topic.findSegmentForOffset(c.offset)
	if err != nil {
		return nil, err
	}
	if segment == nil {
		return nil, io.EOF
	}
	message, nextOffset, err := segment.readMessageAtOffset(c.offset)
	if err != nil {
		return nil, err
	}
	c.offset = nextOffset
	return message, nil
}
The consumer implementation tracks the last seen timestamp for monitoring and cleanup purposes. Inactive consumers can be identified and their resources reclaimed through background maintenance processes.
Configuration Options
Flexible configuration options allow the message queue to adapt to different use cases and performance requirements. The functional options pattern provides a clean API for customization:
func WithMaxSegmentSize(size int64) QueueOption {
	return func(mq *MessageQueue) {
		mq.maxSegmentSize = size
	}
}

func WithRetentionPeriod(period time.Duration) QueueOption {
	return func(mq *MessageQueue) {
		mq.retentionPeriod = period
	}
}

func WithFlushInterval(interval time.Duration) QueueOption {
	return func(mq *MessageQueue) {
		mq.flushInterval = interval
	}
}
These options control segment rotation frequency, data retention policies, and durability guarantees. Tuning these parameters based on workload characteristics significantly impacts performance and resource utilization.
Background Maintenance
The maintenance subsystem handles periodic tasks essential for queue health and performance. These include data flushing, segment cleanup, and consumer management operations.
func (mq *MessageQueue) startMaintenance() {
	mq.wg.Add(1)
	go func() {
		defer mq.wg.Done()
		ticker := time.NewTicker(mq.flushInterval)
		defer ticker.Stop()
		for {
			select {
			case <-mq.ctx.Done():
				return
			case <-ticker.C:
				mq.flush()
				mq.cleanup()
			}
		}
	}()
}
The flush operation ensures buffered data reaches persistent storage within the configured interval, providing durability guarantees without sacrificing write performance:
func (mq *MessageQueue) flush() {
	mq.mu.RLock()
	defer mq.mu.RUnlock()
	for _, topic := range mq.topics {
		topic.mu.RLock()
		if topic.activeSegment != nil {
			topic.activeSegment.mu.Lock()
			if !topic.activeSegment.closed {
				topic.activeSegment.writer.Flush()
				topic.activeSegment.file.Sync()
			}
			topic.activeSegment.mu.Unlock()
		}
		topic.mu.RUnlock()
	}
}
Cleanup and Retention
The cleanup process implements the retention policy by removing segments older than the configured threshold. This prevents unbounded disk usage while maintaining recent message availability:
func (mq *MessageQueue) cleanup() {
	cutoff := time.Now().Add(-mq.retentionPeriod)
	mq.mu.RLock()
	defer mq.mu.RUnlock()
	for _, topic := range mq.topics {
		topic.mu.Lock()
		var retained []*Segment
		for _, segment := range topic.segments {
			// Never delete the active segment, even if it predates the
			// cutoff; new writes still target it.
			if segment == topic.activeSegment || segment.created.After(cutoff) {
				retained = append(retained, segment)
			} else {
				segment.close()
				os.Remove(segment.file.Name())
			}
		}
		topic.segments = retained
		topic.mu.Unlock()
	}
}
Performance Considerations
Several design decisions optimize performance for high-throughput scenarios. Sequential writes to append-only logs maximize disk throughput while buffered I/O reduces system call overhead. The segmented approach enables parallel operations across different segments and topics.
Lock granularity balances concurrency with consistency requirements. Topic-level locks allow independent operation of different topics while segment-level locks enable concurrent reads from multiple segments within a topic.
Memory usage remains predictable through careful buffer management and prompt resource cleanup. The system avoids loading entire segments into memory, instead reading messages on demand with efficient offset tracking.
Error Handling and Recovery
Robust error handling ensures system reliability in the face of various failure modes. CRC checksums detect data corruption while careful file operations prevent partial writes from corrupting the message stream.
The graceful shutdown process ensures all buffered data reaches persistent storage before termination:
func (mq *MessageQueue) Close() error {
	mq.cancel()
	mq.wg.Wait()
	mq.mu.Lock()
	defer mq.mu.Unlock()
	for _, topic := range mq.topics {
		topic.mu.Lock()
		for _, segment := range topic.segments {
			segment.close()
		}
		topic.mu.Unlock()
	}
	return nil
}
Production Readiness
This implementation provides a solid foundation for production message queuing systems. The modular design enables easy extension for features like message priorities, dead letter queues, and distributed coordination.
Monitoring integration points allow tracking of key metrics including message throughput, consumer lag, and segment statistics. These metrics prove essential for operational visibility and capacity planning.
The persistent storage approach ensures message durability across system restarts while the efficient binary format minimizes storage overhead and processing costs. This combination makes it suitable for both high-volume transactional workloads and long-term message retention scenarios.
Building a high-performance persistent message queue requires careful attention to storage patterns, concurrency control, and operational requirements. This implementation demonstrates how thoughtful design decisions create a system that delivers both performance and reliability for demanding production environments.