Aarav Joshi

Building Real-Time Event Broadcasting Systems in Go: A Practical Guide


Real-time event broadcasting has become an essential component of modern applications. From chat applications to stock tickers, from IoT platforms to monitoring dashboards, the ability to distribute information efficiently to interested parties is crucial. In Go, we can build highly efficient event broadcasting systems that take advantage of the language's concurrency features.

I've spent considerable time working with event-driven architectures in Go, and I'd like to share my insights on implementing effective real-time event broadcasting systems.

Understanding Event Broadcasting

At its core, event broadcasting is about delivering messages to multiple recipients who have expressed interest in specific types of events. The publisher-subscriber (pub/sub) pattern is the backbone of event broadcasting systems.

In Go, we can implement this pattern elegantly using goroutines and channels. The concurrency primitives built into the language make it particularly well-suited for high-performance event distribution.

Core Components of an Event Broadcasting System

A robust event broadcasting system in Go typically consists of these key components:

  • Event Bus: The central component that manages subscriptions and distributes events
  • Events: Data structures representing the messages being passed
  • Publishers: Components that produce events
  • Subscribers: Components that consume events
  • Topics: Categories that allow filtering of events
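
As a quick roadmap, these pieces can be summarized as a small interface. The Broadcaster name is purely illustrative; the rest of the article works directly with a concrete EventBus that happens to satisfy it:

// Broadcaster is an illustrative summary of the API built below.
type Broadcaster interface {
    // Subscribe registers a client for a set of topics and returns
    // a Subscriber whose Channel receives matching events.
    Subscribe(id string, topics []string) (*Subscriber, error)
    // Unsubscribe removes the client and closes its channel.
    Unsubscribe(id string) error
    // Publish delivers a payload to every subscriber of the topic.
    Publish(topic string, payload interface{})
}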

Let me walk through building a complete implementation.

Implementing an Event Bus in Go

First, let's define our basic data structures:

type Event struct {
    Topic   string
    Payload interface{}
    Time    time.Time
}

type Subscriber struct {
    ID      string
    Topics  map[string]bool
    Channel chan Event
}

type EventBus struct {
    subscribers map[string]*Subscriber
    topics      map[string]map[string]bool
    mu          sync.RWMutex
    bufferSize  int
}

The Event struct represents individual messages. The Subscriber tracks which topics a client is interested in and provides a channel for receiving events. The EventBus maintains the mapping between topics and subscribers.

Creating a new event bus is straightforward:

func NewEventBus(bufferSize int) *EventBus {
    return &EventBus{
        subscribers: make(map[string]*Subscriber),
        topics:      make(map[string]map[string]bool),
        bufferSize:  bufferSize,
    }
}

The bufferSize parameter determines how many events can be queued for each subscriber. Once a subscriber's buffer fills up, this implementation drops further events for that subscriber rather than blocking the publisher, as we'll see in the Publish method.

Managing Subscriptions

Adding subscription capability:

func (bus *EventBus) Subscribe(id string, topics []string) (*Subscriber, error) {
    bus.mu.Lock()
    defer bus.mu.Unlock()

    if _, exists := bus.subscribers[id]; exists {
        return nil, fmt.Errorf("subscriber %s already exists", id)
    }

    subscriber := &Subscriber{
        ID:      id,
        Topics:  make(map[string]bool),
        Channel: make(chan Event, bus.bufferSize),
    }

    for _, topic := range topics {
        subscriber.Topics[topic] = true

        if _, exists := bus.topics[topic]; !exists {
            bus.topics[topic] = make(map[string]bool)
        }
        bus.topics[topic][id] = true
    }

    bus.subscribers[id] = subscriber
    return subscriber, nil
}

This method registers a new subscriber and creates a buffered channel for receiving events. It also updates the internal mapping of topics to subscribers.

Equally important is handling unsubscriptions properly:

func (bus *EventBus) Unsubscribe(id string) error {
    bus.mu.Lock()
    defer bus.mu.Unlock()

    subscriber, exists := bus.subscribers[id]
    if !exists {
        return fmt.Errorf("subscriber %s does not exist", id)
    }

    for topic := range subscriber.Topics {
        if topicSubs, exists := bus.topics[topic]; exists {
            delete(topicSubs, id)
            if len(topicSubs) == 0 {
                delete(bus.topics, topic)
            }
        }
    }

    close(subscriber.Channel)
    delete(bus.subscribers, id)
    return nil
}

This method cleans up all references to the subscriber and closes their channel, which is important for preventing goroutine leaks.

Publishing Events

The core of our system is the event publishing mechanism:

func (bus *EventBus) Publish(topic string, payload interface{}) {
    event := Event{
        Topic:   topic,
        Payload: payload,
        Time:    time.Now(),
    }

    bus.mu.RLock()
    defer bus.mu.RUnlock()

    topicSubs, exists := bus.topics[topic]
    if !exists {
        return
    }

    for subID := range topicSubs {
        subscriber := bus.subscribers[subID]
        select {
        case subscriber.Channel <- event:
            // Event was sent successfully
        default:
            // Channel is full, event is dropped
            fmt.Printf("Event dropped for subscriber %s: buffer full\n", subID)
        }
    }
}

This method creates an event and distributes it to all interested subscribers. Notice the select statement with a default case: it keeps publishing non-blocking, so slow consumers can't stall the entire system. The trade-off is that events are dropped when a subscriber's buffer is full.
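
If dropping an event immediately is too aggressive for some subscribers, a bounded wait is a middle ground. This is a minimal sketch, not part of the bus above; note that calling it while holding the read lock delays Subscribe and Unsubscribe for up to the timeout per slow subscriber:

// trySendWithTimeout waits up to timeout for room in the subscriber's
// buffer before giving up, instead of dropping the event instantly.
func trySendWithTimeout(sub *Subscriber, event Event, timeout time.Duration) bool {
    select {
    case sub.Channel <- event:
        return true
    case <-time.After(timeout):
        return false
    }
}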

Handling Events on the Subscriber Side

Subscribers need to process the events they receive:

func handleEvents(ctx context.Context, subscriber *Subscriber) {
    for {
        select {
        case event, ok := <-subscriber.Channel:
            if !ok {
                fmt.Printf("Subscriber %s channel closed\n", subscriber.ID)
                return
            }
            fmt.Printf("Subscriber %s received: %s - %v\n", 
                subscriber.ID, event.Topic, event.Payload)
        case <-ctx.Done():
            fmt.Printf("Subscriber %s context done\n", subscriber.ID)
            return
        }
    }
}

This function continuously listens for events and processes them. The context provides a way to gracefully shut down event processing.
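
Because Payload is an interface{}, real subscribers usually type-switch on it instead of printing it directly. A small sketch, using a hypothetical Order type as an example payload:

// Order is a hypothetical domain type used to illustrate typed payloads.
type Order struct {
    ID     string
    Amount float64
}

// processEvent shows one way a subscriber might dispatch on payload type.
func processEvent(event Event) {
    switch p := event.Payload.(type) {
    case Order:
        fmt.Printf("order %s for %.2f\n", p.ID, p.Amount)
    case string:
        fmt.Println("notice:", p)
    default:
        fmt.Printf("unhandled payload type %T\n", p)
    }
}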

Putting It All Together

Here's a simple application that demonstrates the event bus in action:

func main() {
    bus := NewEventBus(10)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create subscribers
    sub1, _ := bus.Subscribe("client1", []string{"orders", "notifications"})
    sub2, _ := bus.Subscribe("client2", []string{"orders"})
    sub3, _ := bus.Subscribe("admin", []string{"orders", "notifications", "system"})

    // Start handling events
    go handleEvents(ctx, sub1)
    go handleEvents(ctx, sub2)
    go handleEvents(ctx, sub3)

    // Publish events
    bus.Publish("orders", "New order #12345")
    bus.Publish("notifications", "System maintenance in 10 minutes")
    bus.Publish("system", "CPU usage: 80%")

    // Allow time for event processing
    time.Sleep(100 * time.Millisecond)

    // Unsubscribe a client
    bus.Unsubscribe("client2")

    // Publish more events
    bus.Publish("orders", "Order #12345 updated")

    time.Sleep(100 * time.Millisecond)
}

Scaling for Production

The basic implementation provided is a good starting point, but production systems often need additional features:

Persistent Event Storage

For critical systems, we might want to store events in a database:

func (bus *EventBus) PublishWithStorage(topic string, payload interface{}, storage EventStorage) {
    event := Event{
        Topic:   topic,
        Payload: payload,
        Time:    time.Now(),
    }

    // Store event before publishing
    err := storage.StoreEvent(event)
    if err != nil {
        // Handle storage error
        return
    }

    // Continue with normal publishing
    bus.mu.RLock()
    defer bus.mu.RUnlock()
    // ... rest of publish logic
}
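
EventStorage isn't defined in the snippet above. One minimal shape it could take, along with an in-memory implementation that's handy for tests, is sketched below; the interface name and method are assumptions:

// EventStorage persists events before they are distributed.
type EventStorage interface {
    StoreEvent(event Event) error
}

// memoryStorage is a trivial in-memory EventStorage, useful in tests.
type memoryStorage struct {
    mu     sync.Mutex
    events []Event
}

func (s *memoryStorage) StoreEvent(event Event) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.events = append(s.events, event)
    return nil
}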

Dead Letter Queue

To handle events that can't be delivered, we can implement a dead letter queue:

func (bus *EventBus) Publish(topic string, payload interface{}) {
    // ... existing code

    for subID := range topicSubs {
        subscriber := bus.subscribers[subID]
        select {
        case subscriber.Channel <- event:
            // Event was sent successfully
        default:
            // Send to dead letter queue
            bus.deadLetterQueue <- DeadLetter{
                Event:        event,
                SubscriberID: subID,
                Reason:       "Buffer full",
            }
        }
    }
}
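
The snippet assumes a deadLetterQueue channel on the EventBus and a DeadLetter type, neither of which exists in the basic implementation. A sketch of what they might look like, plus a goroutine that drains the queue:

// DeadLetter wraps an undeliverable event with enough context to
// retry or inspect it later.
type DeadLetter struct {
    Event        Event
    SubscriberID string
    Reason       string
}

// processDeadLetters drains the dead letter queue; in production this
// might log, persist, or re-deliver the failed events. It assumes
// EventBus gains a `deadLetterQueue chan DeadLetter` field.
func (bus *EventBus) processDeadLetters(ctx context.Context) {
    for {
        select {
        case dl := <-bus.deadLetterQueue:
            fmt.Printf("dead letter for %s on %s: %s\n",
                dl.SubscriberID, dl.Event.Topic, dl.Reason)
        case <-ctx.Done():
            return
        }
    }
}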

Distributed Event Bus

For scaling beyond a single process, we can integrate with message brokers:

type DistributedEventBus struct {
    localBus  *EventBus
    redisConn redis.Conn
}

func (dbus *DistributedEventBus) Publish(topic string, payload interface{}) {
    event := Event{
        Topic:   topic,
        Payload: payload,
        Time:    time.Now(),
    }

    // Publish locally
    dbus.localBus.Publish(topic, payload)

    // Publish to Redis so other nodes can pick it up
    data, err := json.Marshal(event)
    if err != nil {
        return
    }
    dbus.redisConn.Do("PUBLISH", topic, data)
}
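
The other half of a distributed bus is receiving events published by other nodes and re-delivering them locally. The sketch below assumes the github.com/gomodule/redigo/redis client and a dedicated connection for the subscription (a Redis connection in subscribe mode can't issue other commands); a production version would also need to filter out events that originated on the local node to avoid delivering them twice:

// ReceiveRemote subscribes to the given Redis channels and forwards
// incoming events to the local bus.
func (dbus *DistributedEventBus) ReceiveRemote(subConn redis.Conn, topics ...string) error {
    psc := redis.PubSubConn{Conn: subConn}

    channels := make([]interface{}, len(topics))
    for i, t := range topics {
        channels[i] = t
    }
    if err := psc.Subscribe(channels...); err != nil {
        return err
    }

    for {
        switch msg := psc.Receive().(type) {
        case redis.Message:
            var event Event
            if err := json.Unmarshal(msg.Data, &event); err != nil {
                continue // skip malformed messages
            }
            // Deliver to local subscribers only; publishing back to
            // Redis here would create an infinite loop.
            dbus.localBus.Publish(event.Topic, event.Payload)
        case error:
            return msg
        }
    }
}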

Performance Considerations

When implementing real-time event broadcasting in Go, performance is often a critical concern. I've found these optimizations particularly effective:

  1. Use read-write mutexes to allow concurrent reads when possible
  2. Implement event batching for high-throughput scenarios
  3. Consider memory pooling for events to reduce GC pressure (a sync.Pool sketch follows the batching example below)
  4. Use buffered channels with appropriate sizing

Here's an example of event batching:

func (bus *EventBus) PublishBatch(events []Event) {
    bus.mu.RLock()
    defer bus.mu.RUnlock()

    // Group events by topic for efficiency
    topicEvents := make(map[string][]Event)
    for _, event := range events {
        topicEvents[event.Topic] = append(topicEvents[event.Topic], event)
    }

    // Process each topic's events
    for topic, evts := range topicEvents {
        topicSubs, exists := bus.topics[topic]
        if !exists {
            continue
        }

        for subID := range topicSubs {
            subscriber := bus.subscribers[subID]
            for _, event := range evts {
                // Try to send, but don't block
                select {
                case subscriber.Channel <- event:
                    // Sent successfully
                default:
                    // Drop event or handle overflow
                }
            }
        }
    }
}
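
For the third point, memory pooling, sync.Pool can recycle Event values between publishes. Because subscriber channels carry Event by value, the pooled struct can be reset and returned as soon as the sends complete; the payoff is modest for a small struct like Event and grows when events carry large pre-allocated buffers. A minimal sketch:

// eventPool recycles Event structs to reduce allocations on hot paths.
var eventPool = sync.Pool{
    New: func() interface{} { return new(Event) },
}

func (bus *EventBus) publishPooled(topic string, payload interface{}) {
    ev := eventPool.Get().(*Event)
    ev.Topic = topic
    ev.Payload = payload
    ev.Time = time.Now()

    bus.mu.RLock()
    for subID := range bus.topics[topic] {
        select {
        case bus.subscribers[subID].Channel <- *ev: // value copy, safe to reuse ev afterwards
        default:
            // Buffer full, event dropped as in Publish
        }
    }
    bus.mu.RUnlock()

    *ev = Event{} // clear references so the payload can be collected
    eventPool.Put(ev)
}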

Testing Event Broadcasting Systems

Testing concurrent systems can be challenging. Here's an approach I've found effective:

func TestEventBus(t *testing.T) {
    bus := NewEventBus(5)

    // Add subscribers
    sub1, _ := bus.Subscribe("test1", []string{"topic1"})
    sub2, _ := bus.Subscribe("test2", []string{"topic1", "topic2"})

    // Publish events
    bus.Publish("topic1", "message1")
    bus.Publish("topic2", "message2")

    // Check sub1 received only topic1
    select {
    case event := <-sub1.Channel:
        if event.Topic != "topic1" || event.Payload != "message1" {
            t.Errorf("Expected topic1/message1, got %s/%v", event.Topic, event.Payload)
        }
    case <-time.After(100 * time.Millisecond):
        t.Error("Timed out waiting for message on sub1")
    }

    // Check sub1 doesn't receive topic2
    select {
    case event := <-sub1.Channel:
        t.Errorf("Unexpected event received: %v", event)
    case <-time.After(100 * time.Millisecond):
        // This is expected
    }

    // Check sub2 receives both topics
    eventsReceived := 0
    for i := 0; i < 2; i++ {
        select {
        case <-sub2.Channel:
            eventsReceived++
        case <-time.After(100 * time.Millisecond):
            t.Error("Timed out waiting for message on sub2")
        }
    }

    if eventsReceived != 2 {
        t.Errorf("Expected 2 events, got %d", eventsReceived)
    }
}
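
Unit tests like the one above verify routing; for concurrency bugs, the race detector is the more important tool. Here is a sketch of a stress test intended to be run with go test -race:

// TestEventBusConcurrent hammers the bus from several goroutines to
// surface data races; run it with `go test -race`.
func TestEventBusConcurrent(t *testing.T) {
    bus := NewEventBus(100)
    sub, _ := bus.Subscribe("stress", []string{"load"})

    // Drain the subscriber so publishers rarely see a full buffer.
    done := make(chan struct{})
    go func() {
        for range sub.Channel {
        }
        close(done)
    }()

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                bus.Publish("load", n*1000+j)
            }
        }(i)
    }
    wg.Wait()

    // Unsubscribe closes the channel, which ends the drain goroutine.
    bus.Unsubscribe("stress")
    <-done
}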

Real-world Applications

I've implemented event broadcasting systems in various contexts:

  1. Real-time dashboards that update multiple clients simultaneously
  2. Chat applications where messages need to be distributed to chat room participants
  3. IoT platforms that process and distribute sensor data
  4. Financial systems that broadcast price updates to trading terminals

In each case, the core patterns remain similar, but with adaptations for the specific domain requirements.

Error Handling and Reliability

Production-grade event systems need robust error handling:

func (bus *EventBus) PublishWithRetry(topic string, payload interface{}, maxRetries int) {
    var lastErr error
    for i := 0; i <= maxRetries; i++ {
        err := bus.publishWithAck(topic, payload)
        if err == nil {
            return
        }
        lastErr = err
        time.Sleep(time.Duration(100<<i) * time.Millisecond) // Exponential backoff: 100ms, 200ms, 400ms, ...
    }

    // Handle ultimate failure
    bus.errorHandler(lastErr, Event{Topic: topic, Payload: payload})
}
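
Neither publishWithAck nor errorHandler exists in the basic EventBus; the shapes below are assumptions showing one way they could be wired up. Here publishWithAck reports an error when any targeted subscriber's buffer was full, and errorHandler is assumed to be a func(error, Event) field set when constructing the bus:

// publishWithAck is like Publish but reports delivery failures
// instead of dropping events silently.
func (bus *EventBus) publishWithAck(topic string, payload interface{}) error {
    event := Event{Topic: topic, Payload: payload, Time: time.Now()}

    bus.mu.RLock()
    defer bus.mu.RUnlock()

    dropped := 0
    for subID := range bus.topics[topic] {
        select {
        case bus.subscribers[subID].Channel <- event:
            // Delivered
        default:
            dropped++
        }
    }
    if dropped > 0 {
        return fmt.Errorf("failed to deliver %q to %d subscriber(s)", topic, dropped)
    }
    return nil
}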

Conclusion

Building effective real-time event broadcasting systems in Go leverages many of the language's strengths: concurrency, channels, and efficient handling of I/O. The implementation I've provided offers a solid foundation that can be extended to meet specific requirements.

The key to success lies in carefully managing subscriptions, ensuring non-blocking behavior, and implementing appropriate strategies for handling backpressure. With Go's concurrency primitives, we can create event broadcasting systems that are both simple and highly performant.

I've found that investing time in designing the right event broadcasting architecture pays significant dividends as applications grow. It provides the flexibility to evolve systems while maintaining clean separation between components that produce and consume events.


