DEV Community

Joseph Owino

Event-Driven Architecture with Go

🚀 Building Decoupled Services with NATS and RabbitMQ

Ever wondered how Netflix handles millions of user interactions without everything falling apart? Or how Uber processes thousands of ride requests per second? The secret sauce is Event-Driven Architecture (EDA).

In this article, I will explore how to implement event-driven architecture using Go, with a focus on practical implementation using NATS and RabbitMQ message brokers. I will build a complete notification system that demonstrates real-world patterns and best practices.

What's Event-Driven Architecture?

Event-driven architecture is a software design pattern where loosely coupled components communicate through the production and consumption of events. Instead of direct service-to-service calls, services publish events when significant business actions occur, and other services react to these events asynchronously.

Synchronous Communication vs Asynchronous Communication

Synchronous communication, like a phone call, requires both parties to be available at the same moment: the caller waits until the other side responds. Asynchronous communication, like email or a forum post, lets the sender carry on while the receiver responds in its own time. Between services, the first is a blocking request/response call and the second is a published event.

❌ Synchronous (The Old Way):

// Tightly coupled - if email fails, booking fails!
func ProcessBooking(booking *Booking) error {
    // Process booking logic

    if err := emailService.SendConfirmation(booking); err != nil {
        return err // 💥 Booking fails if email fails
    }

    if err := smsService.SendNotification(booking); err != nil {
        return err // 💥 Booking fails if SMS fails  
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

✅ Asynchronous (The New Way):

// Loosely coupled - fire and forget!
func ProcessBooking(booking *Booking) error {
    // Process booking logic

    // Publish event - let others handle their own business
    event := BookingConfirmedEvent{
        BookingID: booking.ID,
        UserID:    booking.UserID,
        Timestamp: time.Now(),
    }

    return eventBus.Publish("booking.confirmed", event)
}

Why Event-Driven Architecture?

🔗 Loose Coupling

Services interact via events without direct dependencies

📈 Better Scalability

Each service can scale independently

🛡️ Improved Resilience

Failures don’t cascade across the system

🧪 Flexibility

New consumers can be added without modifying producers

Building Our First Event-Driven System

Let's build a real-world notification system that handles user bookings like a pro!
The Scenario: We have a booking platform where users can reserve services (think hotel rooms, restaurant tables, or appointment slots). When a booking gets confirmed, we need to:

  • Send a confirmation email to the user
  • Send an SMS notification
  • Update analytics dashboards
  • Trigger follow-up workflows

The beauty of event-driven architecture is that our booking service doesn't need to know about emails, SMS, or analytics - it just announces "Hey, a booking was confirmed!" and lets other services handle their own responsibilities. No stress, no tight coupling, just clean separation of concerns! 🎯

📝 Event Structure Design

type Event struct {
    ID        string                 `json:"id"`
    Type      string                 `json:"type"`
    Source    string                 `json:"source"`
    Data      map[string]interface{} `json:"data"`
    Timestamp time.Time              `json:"timestamp"`
    Version   string                 `json:"version"`
}

type BookingConfirmedEvent struct {
    BookingID   string    `json:"booking_id"`
    UserID      string    `json:"user_id"`
    Email       string    `json:"email"`
    Phone       string    `json:"phone"`
    ServiceType string    `json:"service_type"`
    Timestamp   time.Time `json:"timestamp"`
}
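
The article defines the generic `Event` envelope and the typed `BookingConfirmedEvent` payload but does not show how the two fit together. One possible way, sketched here with a hypothetical `Wrap` helper, a random hex ID, and an assumed version string of "1.0", is to marshal the payload to JSON and decode it into the envelope's `Data` map:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"
)

type Event struct {
	ID        string                 `json:"id"`
	Type      string                 `json:"type"`
	Source    string                 `json:"source"`
	Data      map[string]interface{} `json:"data"`
	Timestamp time.Time              `json:"timestamp"`
	Version   string                 `json:"version"`
}

type BookingConfirmedEvent struct {
	BookingID   string    `json:"booking_id"`
	UserID      string    `json:"user_id"`
	Email       string    `json:"email"`
	Phone       string    `json:"phone"`
	ServiceType string    `json:"service_type"`
	Timestamp   time.Time `json:"timestamp"`
}

// newEventID returns a random 16-byte hex string. This is an assumption;
// a UUID library would work just as well.
func newEventID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// Wrap is a hypothetical helper that places a typed payload in the envelope.
func Wrap(eventType, source string, payload any) (Event, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return Event{}, fmt.Errorf("marshal payload: %w", err)
	}
	var data map[string]interface{}
	if err := json.Unmarshal(raw, &data); err != nil {
		return Event{}, fmt.Errorf("payload is not a JSON object: %w", err)
	}
	id, err := newEventID()
	if err != nil {
		return Event{}, fmt.Errorf("generate id: %w", err)
	}
	return Event{
		ID:        id,
		Type:      eventType,
		Source:    source,
		Data:      data,
		Timestamp: time.Now().UTC(),
		Version:   "1.0", // assumed versioning scheme
	}, nil
}

func main() {
	evt, err := Wrap("booking.confirmed", "booking-service", BookingConfirmedEvent{
		BookingID: "123",
		UserID:    "u-1",
		Email:     "user@example.com",
		Timestamp: time.Now().UTC(),
	})
	if err != nil {
		panic(err)
	}
	out, _ := json.MarshalIndent(evt, "", "  ")
	fmt.Println(string(out))
}
```

The envelope carries routing and versioning metadata, so consumers can inspect `Type` and `Version` before deciding how to decode `Data`.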

🏭 Producer Service (Booking Service)

type BookingService struct {
    nc     *nats.Conn
    logger *log.Logger
}

func NewBookingService(nc *nats.Conn) *BookingService {
    return &BookingService{
        nc:     nc,
        logger: log.New(os.Stdout, "[BookingService] ", log.LstdFlags),
    }
}

func (s *BookingService) ConfirmBooking(ctx context.Context, booking *Booking) error {
    // Do the actual booking work
    s.logger.Printf("Confirming booking %s for user %s", booking.ID, booking.UserID)

    // Create the event
    event := &BookingConfirmedEvent{
        BookingID:   booking.ID,
        UserID:      booking.UserID,
        Email:       booking.Email,
        Phone:       booking.Phone,
        ServiceType: booking.ServiceType,
        Timestamp:   time.Now(),
    }

    // Fire and forget! 🚀
    return s.publishEvent(ctx, "booking.confirmed", event)
}

func (s *BookingService) publishEvent(ctx context.Context, subject string, event interface{}) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    if err := s.nc.Publish(subject, data); err != nil {
        return fmt.Errorf("failed to publish event: %w", err)
    }

    s.logger.Printf("Published event to subject: %s", subject)
    return nil
}
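
`BookingService` above depends directly on `*nats.Conn`, which makes it awkward to exercise without a running server. A possible refinement, sketched here with a hypothetical `Publisher` interface and `fakePublisher` test double, is to depend only on the one method the service uses. `*nats.Conn` has a `Publish(subj string, data []byte) error` method, so it would satisfy this interface unchanged:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Publisher is the single method the booking service needs from a broker client.
type Publisher interface {
	Publish(subject string, data []byte) error
}

// fakePublisher is a hypothetical test double that records messages in memory.
type fakePublisher struct {
	subjects []string
	payloads [][]byte
}

func (f *fakePublisher) Publish(subject string, data []byte) error {
	f.subjects = append(f.subjects, subject)
	f.payloads = append(f.payloads, data)
	return nil
}

// publishJSON mirrors the article's publishEvent, but takes the interface.
func publishJSON(p Publisher, subject string, event any) error {
	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}
	if err := p.Publish(subject, data); err != nil {
		return fmt.Errorf("failed to publish event: %w", err)
	}
	return nil
}

func main() {
	fake := &fakePublisher{}
	err := publishJSON(fake, "booking.confirmed", map[string]string{"booking_id": "123"})
	fmt.Println(err, fake.subjects[0], string(fake.payloads[0]))
	// prints: <nil> booking.confirmed {"booking_id":"123"}
}
```

With this shape, swapping NATS for another broker only requires a small adapter that implements `Publish`.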

📧 Email Service (Consumer)

type EmailService struct {
    nc           *nats.Conn
    subscription *nats.Subscription
    logger       *log.Logger
}

func NewEmailService(nc *nats.Conn) *EmailService {
    return &EmailService{
        nc:     nc,
        logger: log.New(os.Stdout, "[EmailService] ", log.LstdFlags),
    }
}

func (s *EmailService) Start(ctx context.Context) error {
    sub, err := s.nc.Subscribe("booking.confirmed", s.handleBookingConfirmed)
    if err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)
    }

    s.subscription = sub
    s.logger.Println("Email service started, listening for events...")

    <-ctx.Done()
    return s.subscription.Unsubscribe()
}

func (s *EmailService) handleBookingConfirmed(msg *nats.Msg) {
    var event BookingConfirmedEvent
    if err := json.Unmarshal(msg.Data, &event); err != nil {
        s.logger.Printf("failed to unmarshal event: %v", err)
        return
    }

    // Retry logic with exponential backoff - because stuff fails!
    if err := s.sendEmailWithRetry(context.Background(), &event, 3); err != nil {
        s.logger.Printf("failed to send email after retries: %v", err)
        s.handleFailedEmail(&event, err)
        return
    }

    s.logger.Printf("✅ Email sent successfully for booking %s", event.BookingID)
}

func (s *EmailService) sendEmailWithRetry(ctx context.Context, event *BookingConfirmedEvent, maxRetries int) error {
    var lastErr error
    backoff := time.Second

    // maxRetries is the total number of attempts, including the first one
    for i := 0; i < maxRetries; i++ {
        lastErr = s.sendEmail(ctx, event)
        if lastErr == nil {
            return nil
        }
        s.logger.Printf("Email attempt %d failed: %v", i+1, lastErr)

        if i < maxRetries-1 {
            // Wait before the next attempt, but stop early if ctx is cancelled
            select {
            case <-time.After(backoff):
            case <-ctx.Done():
                return ctx.Err()
            }
            backoff *= 2 // Exponential backoff
        }
    }

    return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
}

📱 SMS Service with Circuit Breaker

Let's add a circuit breaker because we're not savages:

type SMSService struct {
    nc             *nats.Conn
    subscription   *nats.Subscription
    logger         *log.Logger
    circuitBreaker *CircuitBreaker
}

func NewSMSService(nc *nats.Conn) *SMSService {
    return &SMSService{
        nc:             nc,
        logger:         log.New(os.Stdout, "[SMSService] ", log.LstdFlags),
        circuitBreaker: NewCircuitBreaker(5, time.Minute),
    }
}

func (s *SMSService) handleBookingConfirmed(msg *nats.Msg) {
    var event BookingConfirmedEvent
    if err := json.Unmarshal(msg.Data, &event); err != nil {
        s.logger.Printf("failed to unmarshal event: %v", err)
        return
    }

    // Circuit breaker pattern - fail fast when things are broken
    if err := s.circuitBreaker.Execute(func() error {
        return s.sendSMS(context.Background(), &event)
    }); err != nil {
        s.logger.Printf("SMS failed (circuit breaker): %v", err)
        return
    }

    s.logger.Printf("📱 SMS sent successfully for booking %s", event.BookingID)
}

⚡ Circuit Breaker Implementation

Here's a simple but effective circuit breaker:

type CircuitState int

const (
    Closed CircuitState = iota
    Open
    HalfOpen
)

type CircuitBreaker struct {
    mu          sync.RWMutex
    state       CircuitState
    failures    int
    lastFailure time.Time
    threshold   int
    timeout     time.Duration
}

func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        threshold: threshold,
        timeout:   timeout,
        state:     Closed,
    }
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
    // Note: the lock is held while fn runs, so calls are serialized
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if cb.state == Open {
        if time.Since(cb.lastFailure) > cb.timeout {
            // Timeout elapsed: let a trial call through
            cb.state = HalfOpen
            cb.failures = 0
        } else {
            return fmt.Errorf("circuit breaker is open - failing fast")
        }
    }

    err := fn()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()

        // A failed trial call in HalfOpen reopens the circuit immediately
        if cb.state == HalfOpen || cb.failures >= cb.threshold {
            cb.state = Open
        }
        return err
    }

    // Success! Reset everything
    cb.failures = 0
    cb.state = Closed
    return nil
}

Message Brokers Comparison

Choosing the right message broker is like choosing the right tool for the job. Let's break it down:

🏎️ NATS - The Speed Demon

Perfect for:

  • Real-time messaging
  • Microservices communication
  • IoT applications
  • When you need FAST

Pros:

  • Blazing fast ⚡
  • Super lightweight
  • Simple to use
  • Built-in clustering

Cons:

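  • No persistence in core NATS (at-most-once delivery); durable streams require JetStream
  • Ordering is preserved per publisher only, not across publishers
  • Fewer routing features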
// NATS is dead simple
func connectToNATS() (*nats.Conn, error) {
    return nats.Connect("nats://localhost:4222",
        nats.ReconnectWait(time.Second),
        nats.MaxReconnects(5),
    )
}

🐰 RabbitMQ - The Swiss Army Knife

Perfect for:

  • Complex routing scenarios
  • Enterprise messaging
  • When you need reliability

Pros:

  • Feature-rich
  • Excellent routing
  • Strong durability
  • Dead letter queues

Cons:

  • More complex
  • Higher resource usage
  • Steeper learning curve
// RabbitMQ gives you ALL the features
func setupRabbitMQ() error {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return err
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        return err
    }
    defer ch.Close()

    // Declare exchange, queues, bindings... lots of setup!
    return ch.ExchangeDeclare("booking_events", "topic", true, false, false, false, nil)
}
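
The `"topic"` exchange declared above routes messages by matching each message's routing key against the binding patterns of its queues. In RabbitMQ's topic syntax, `*` stands for exactly one dot-separated word and `#` for zero or more words. The hypothetical `topicMatch` function below is a local sketch of those rules for experimenting with patterns; the broker does this matching itself, so nothing like it is needed in a real consumer:

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether a routing key matches a topic binding pattern.
func topicMatch(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" may absorb zero or more words, so try every split point.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" must consume exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(topicMatch("booking.*", "booking.confirmed"))    // true
	fmt.Println(topicMatch("booking.*", "booking.confirmed.eu")) // false
	fmt.Println(topicMatch("booking.#", "booking.confirmed.eu")) // true
	fmt.Println(topicMatch("*.confirmed", "booking.confirmed"))  // true
}
```

A binding such as `booking.*` would therefore let an email queue receive `booking.confirmed` and `booking.cancelled` without the producer knowing the queue exists.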

📦 Batch Processing for Performance

When you need to process thousands of events efficiently:

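type BatchProcessor struct {
    mu           sync.Mutex // guards buffer
    batchSize    int
    batchTimeout time.Duration // not used in this snippet
    buffer       []Event
    processFn    func([]Event) error
}

func (bp *BatchProcessor) AddEvent(event Event) {
    bp.mu.Lock()
    bp.buffer = append(bp.buffer, event)

    var batch []Event
    if len(bp.buffer) >= bp.batchSize {
        // Hand the full batch off and start a fresh buffer, so the
        // goroutine never shares a slice with later appends
        batch = bp.buffer
        bp.buffer = make([]Event, 0, len(batch))
    }
    bp.mu.Unlock()

    if batch != nil {
        go bp.processBatch(batch)
    }
}

func (bp *BatchProcessor) processBatch(batch []Event) {
    if err := bp.processFn(batch); err != nil {
        log.Printf("Batch processing failed: %v", err)
    }
}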

Testing Strategies

🧪 Unit Testing Your Event Handlers

func TestEmailService_HandleBookingConfirmed(t *testing.T) {
    tests := []struct {
        name      string
        event     BookingConfirmedEvent
        expectErr bool
    }{
        {
            name: "successful email send",
            event: BookingConfirmedEvent{
                BookingID: "123",
                Email:     "user@example.com",
            },
            expectErr: false,
        },
        {
            name: "invalid email",
            event: BookingConfirmedEvent{
                BookingID: "123",
                Email:     "invalid-email",
            },
            expectErr: true,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            service := NewEmailService(nil)
            err := service.processBookingConfirmed(&tt.event)

            if tt.expectErr && err == nil {
                t.Error("expected error but got none")
            }
            if !tt.expectErr && err != nil {
                t.Errorf("unexpected error: %v", err)
            }
        })
    }
}
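
The test above calls `processBookingConfirmed`, which the article does not show, and expects it to reject `"invalid-email"`. As a guess at the part that would produce that error, the hypothetical `validateBookingEmail` function below checks the address with the standard library's `net/mail` parser before any sending would happen:

```go
package main

import (
	"fmt"
	"net/mail"
)

// validateBookingEmail is a hypothetical check a handler could run first.
// It returns an error when the address cannot be parsed.
func validateBookingEmail(email string) error {
	if _, err := mail.ParseAddress(email); err != nil {
		return fmt.Errorf("invalid email %q: %w", email, err)
	}
	return nil
}

func main() {
	fmt.Println(validateBookingEmail("user@example.com")) // <nil>
	fmt.Println(validateBookingEmail("invalid-email") != nil) // true
}
```

Keeping validation in a function that returns an error, separate from the NATS callback that only logs, is what makes a table-driven test like the one above possible.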

🐳 Integration Testing with TestContainers

func TestIntegration_BookingWorkflow(t *testing.T) {
    // Start NATS in a container
    ctx := context.Background()
    natsContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: testcontainers.ContainerRequest{
            Image:        "nats:latest",
            ExposedPorts: []string{"4222/tcp"},
            WaitingFor:   wait.ForListeningPort("4222/tcp"),
        },
        Started: true,
    })
    require.NoError(t, err)
    defer natsContainer.Terminate(ctx)

    // Test the complete workflow
    // ... connect to NATS, send events, verify results
}

Monitoring and Observability

📊 Metrics That Matter

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    eventsPublished = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "events_published_total",
            Help: "Total number of events published",
        },
        []string{"event_type", "service"},
    )

    eventProcessingDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name: "event_processing_duration_seconds",
            Help: "Duration of event processing",
        },
        []string{"event_type", "service"},
    )
)

func (s *EmailService) handleBookingConfirmedWithMetrics(msg *nats.Msg) {
    start := time.Now()
    defer func() {
        eventProcessingDuration.WithLabelValues("booking.confirmed", "email").Observe(time.Since(start).Seconds())
    }()

    // Process the event...
}

🔍 Distributed Tracing

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
)

func (s *BookingService) ConfirmBookingWithTracing(ctx context.Context, booking *Booking) error {
    tracer := otel.Tracer("booking-service")
    ctx, span := tracer.Start(ctx, "confirm_booking")
    defer span.End()

    span.SetAttributes(
        attribute.String("booking.id", booking.ID),
        attribute.String("booking.user_id", booking.UserID),
    )

    // Your business logic here...

    return nil
}

Wrap Up

Event-driven architecture isn't just a buzzword - it's a proven way to build systems that can handle real-world complexity. Here's what we've covered:

  • Loose coupling - Services don't need to know about each other
  • Better scalability - Scale services independently
  • Fault tolerance - One service failing doesn't bring down everything
  • Real implementation - Working code with proper error handling
  • Testing strategies - Unit tests and integration tests
  • Monitoring - Metrics and tracing for production systems

🎯 Next Steps

  1. Start small - Pick one workflow and convert it to events
  2. Choose your broker - NATS for speed, RabbitMQ for features, Kafka for scale
  3. Add observability - You can't improve what you can't measure
  4. Test everything - Events make testing easier, use that advantage
  5. Scale gradually - Add more consumers as your system grows

What's your experience with event-driven architecture? Drop a comment below! Have you tried building something similar? What challenges did you face? Let's discuss! 💬

If this helped you, give it a ❤️ and follow me for more great content!

