Bouchaala Reda
Creating a simple Message Bus: Episode 3

It's been a while since the last post, hey! Let's waste no time and dive straight in.

This is the one everything has been building toward.

We have a producer that sends messages to the broker. We have a broker that stores them in topic queues. All that's missing is the consumer — the thing that actually reads those messages.

Let's build it.

The Consumer

// internal/consumer/consumer.go

type Consumer struct {
    host  string
    port  string
    topic string
}

Simple. It knows where the broker is and which topic it cares about.

The interesting part is Subscribe:

func (c *Consumer) Subscribe() ([]string, error) {
    conn, err := net.Dial("tcp", net.JoinHostPort(c.host, c.port))
    if err != nil {
        return nil, err
    }
    defer conn.Close()

    // Tell the broker which topic we want.
    fmt.Fprintf(conn, "%s\n", c.topic)

    // Read messages back, one per line, until the broker closes the connection.
    var messages []string
    scanner := bufio.NewScanner(conn)
    for scanner.Scan() {
        messages = append(messages, scanner.Text())
    }
    if err := scanner.Err(); err != nil {
        return nil, err
    }

    return messages, nil
}
  1. We connect to the broker.
  2. We send the topic name we want to subscribe to, terminated with a newline.
  3. We read back whatever the broker sends us, line by line.
  4. We return the messages.

That's the entire consumer. Pretty much the mirror image of the producer.

Teaching the Broker About Consumers

The broker needs a new port for consumers to connect to. Let's add that to the config first:

// internal/broker/config.go

type BrokerConfig struct {
    ProducerHost string
    ProducerPort string

    ConsumerHost string
    ConsumerPort string
}

And now ConsumerListen:

// internal/broker/broker.go

func (broker *Broker) ConsumerListen() error {
    listener, err := net.Listen("tcp", net.JoinHostPort(broker.config.ConsumerHost, broker.config.ConsumerPort))
    if err != nil {
        return err
    }
    defer listener.Close()

    for {
        conn, err := listener.Accept()
        if err != nil {
            return err
        }

        // The first line from the consumer is the topic name.
        scanner := bufio.NewScanner(conn)
        if !scanner.Scan() {
            // Consumer disconnected before sending a topic.
            conn.Close()
            continue
        }
        topicName := scanner.Text()

        log.Printf("consumer subscribed to topic %s", topicName)

        broker.mu.Lock()
        topic, exists := broker.topics[topicName]
        if !exists || topic == nil {
            broker.mu.Unlock()
            conn.Close()
            continue
        }

        // Drain the topic queue into the connection, one message per line.
        for topic.Queue.Len() > 0 {
            message := topic.Queue.Dequeue()
            fmt.Fprintf(conn, "%s\n", string(message.Data))
        }
        broker.mu.Unlock()

        conn.Close()
    }
}
  1. We listen on the consumer port.
  2. A consumer connects and sends us a topic name.
  3. We look up the topic.
  4. If it exists, we dequeue all its messages and write them back, one line each.
  5. We close the connection and wait for the next consumer.
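For context, the Len/Dequeue calls above assume roughly this queue shape from the earlier episodes. This is a minimal FIFO sketch, not necessarily the repo's exact implementation:

```go
// Minimal FIFO sketch matching the Len/Dequeue calls above.
// The real queue from earlier episodes may differ in detail.
type Message struct {
    Data []byte
}

type Queue struct {
    items []*Message
}

// Enqueue appends a message to the back of the queue.
func (q *Queue) Enqueue(m *Message) { q.items = append(q.items, m) }

// Len reports how many messages are waiting.
func (q *Queue) Len() int { return len(q.items) }

// Dequeue pops the oldest message; callers must check Len() first.
func (q *Queue) Dequeue() *Message {
    m := q.items[0]
    q.items = q.items[1:]
    return m
}
```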

Remember When I Said No Goroutines?

Yeah, about that.

ProducerListen and ConsumerListen both block forever in a loop. We need both running at the same time, which means we finally have to reach for a goroutine.

But that brings a problem: both methods touch broker.topics, and doing that from multiple goroutines without protection is a data race. So we add a sync.Mutex to the broker:

// internal/broker/broker.go

type Broker struct {
    config  *BrokerConfig
    decoder decoder.Decoder
    topics  map[string]*Topic
    mu      sync.Mutex
}

And we lock around every access to topics — in HandleNewMessage and ConsumerListen.

I said I'd avoid goroutines as much as possible, and we did. But this one we actually need.

Putting it all together

In the broker CLI, we run ProducerListen in a goroutine and let ConsumerListen block the main thread:

// cmd/broker/broker.go

go func() {
    log.Println("Broker: start listening for incoming producer messages...")
    if err := b.ProducerListen(); err != nil {
        log.Fatal(err)
    }
}()

log.Println("Broker: start listening for incoming consumer connections...")
if err := b.ConsumerListen(); err != nil {
    log.Fatal(err)
}

Two new flags: -chost (default 127.0.0.1) and -cport (default 9991).

And the consumer CLI is as straightforward as the producer:

// cmd/consumer/consumer.go

client := consumer.New(brokerHost, brokerPort, topic)

messages, err := client.Subscribe()
if err != nil {
    log.Fatal(err)
}

for _, message := range messages {
    log.Printf("[%s] %s", topic, message)
}

Flags: -host, -port (default 9991), -topic (required).

Testing it out

Open three terminals. Rebuild first:

make

Start the broker:

./build/broker

Send a few messages from the second terminal:

./build/producer -topic orders -message "first order"
./build/producer -topic orders -message "second order"
./build/producer -topic orders -message "third order"

Now consume them from the third terminal:

./build/consumer -topic orders

You should see:

2024/06/10 11:00:01 [orders] first order
2024/06/10 11:00:01 [orders] second order
2024/06/10 11:00:01 [orders] third order

All the pieces, finally together.

What's next

This is a working message bus, but a very basic one. A few interesting directions to take it:

  1. A consumer that keeps listening instead of pulling once and exiting (push vs. pull)
  2. Multiple consumers on the same topic
  3. Persistence: right now messages are gone when the broker restarts

For now though, this is exactly what I set out to build. Something short and simple, something I understand end to end.

But I doubt I'll end the series here; we'll tackle the points above (especially number 1) in future posts for sure, so stay tuned!

Hope this was useful in some way. See you in the next one!
