DEV Community

Bouchaala Reda
Bouchaala Reda

Posted on

Creating a simple Message Bus: Episode 2

Hello again to this series where I try to build a Message Bus, in the simplest way possible to understand the architecture and how it works.

In the first episode of this series, we talked about what a Message Bus is, why it's useful, and started off with the producer part of the code.

If you haven't read that one, please do as we're going to build on top of it.

Here's a simple diagram to help us see the components (Broker, Consumer, Producer) and how they interact with each other.

Image description


In this episode, I'm going to implement the broker part responsible of handling a new message, and storing it somewhere.

In other words, we're going to fully implement the green lines in the diagram above.

In the next episode however, we're going to tackle the consumers and that's when (hopefully, remember I don't know exactly how things are going... just going with the flow) all the pieces come together.

Enough talk, let's create out broker:

// internal/broker/broker.go

type Broker struct {
    config *BrokerConfig

    // utils
    decoder decoder.Decoder

    // core
    topics map[string]*Topic
}
Enter fullscreen mode Exit fullscreen mode
  • The config field is used to group all configuration in one place instead of having the Broker struct bloated with fields.
  • Our broker needs to be able to "decode" messages sent by the producers, so a Decoder interface is needed, we'll implement it next.
  • The topics field is a map between topic names, and Topic objects. We'll get back to this shortly.

1. Broker config

Here's how the BrokerConfig looks like:

// internal/broker/config.go
type BrokerConfig struct {
    // fields relevant to producers
    ProducerHost string
    ProducerPort string
}
Enter fullscreen mode Exit fullscreen mode

For the time being, it only contains fields relevant to the producer part, but we'll add to it along the way.

2. Decoder

The interface of the decoder is

// internal/shared/decoder/decoder.go

package decoder

import "mbus/internal/apiv1"

type Decoder interface {
    Decode([]byte) (*apiv1.Message, error)
}
Enter fullscreen mode Exit fullscreen mode

And we implement it using msgpack:

// internal/shared/decoder/msgpack.go
package decoder

import (
    "mbus/internal/apiv1"

    "github.com/vmihailenco/msgpack"
)

type MsgpackDecoder struct {
}

func (*MsgpackDecoder) Decode(data []byte) (*apiv1.Message, error) {
    var msg apiv1.Message

    err := msgpack.Unmarshal(data, &msg)
    if err != nil {
        return nil, err
    }

    return &msg, nil
}
Enter fullscreen mode Exit fullscreen mode

3. Topics

Let's zoom inside the broker's internals to understand what map looks like.

Image description

  1. We receive a message from a producer on the "orders" topic (in read life, the message might be a JSON object describing the order).
  2. We route it to the appropriate topic object using our topics map.
  3. We add the message to the topic and diaptch

Note: Dispatch is a term used a lot in both event-based and messaging systems, and it just means delivering the event/message to all its listeners/consumers.

Because we don't have the consumer part of the system build yet, I wanted to just save the messages on the topic object itself until we build the consumer part.

To do that, we need the topic object to have a queue (First In, First Out) that we add messages to.

We can build a simple queue using a Go slice, but it's always best to use language-provided tools and mechanisms in its standard libary. We can use container/list package which implements a doubly-linked list as a queue.

Implementing a queue as a doubly-linked list is better in terms of memory reuse compared to a slice... We're not building Kafka as to worry about memory optimizations but just saying.

Alright. I can hear you say "Enough talk, show me the code."

Let's first implement a queue using container/list:

// internal/shared/queue/queue.go

package queue

import (
    "container/list"
    "mbus/internal/apiv1"
)

type MessageQueue struct {
    list *list.List
}

func New() *MessageQueue {
    return &MessageQueue{
        list: list.New(),
    }
}

func (q *MessageQueue) Enqueue(message *apiv1.Message) {
    q.list.PushBack(message)
}

func (q *MessageQueue) Dequeue() *apiv1.Message {
    if q.list.Len() == 0 {
        return nil
    }

    message := q.list.Front()
    q.list.Remove(message)
    return message.Value.(*apiv1.Message)
}

func (q *MessageQueue) Len() int {
    return q.list.Len()
}
Enter fullscreen mode Exit fullscreen mode

Simple and effective.

Alright, with that done here's our Topic:

// internal/broker/topic.go
package broker

import (
    "mbus/internal/apiv1"
    "mbus/internal/shared/queue"
)

type Topic struct {
    Name  string
    Queue *queue.MessageQueue
}

func NewTopic(name string) *Topic {
    return &Topic{
        Name:  name,
        Queue: queue.New(),
    }
}

func (topic *Topic) Dispatch(message *apiv1.Message) {
    // for now, we only save the message in a queue
    topic.Queue.Enqueue(message)
}
Enter fullscreen mode Exit fullscreen mode

Again, quite simple stuff here.

Now that we have everything ready, let's implement the broker part responsible of handling a new message from producers.

// internal/broker/broker.go

func (broker *Broker) ProducerListen() error {
    listener, err := net.Listen("tcp", net.JoinHostPort(broker.config.ProducerHost, broker.config.ProducerPort))
    if err != nil {
        return err
    }

    for {
        // Accept new connection
        conn, err := listener.Accept()
        if err != nil {
            return err
        }

        // read it
        data, err := io.ReadAll(conn)
        if err != nil {
            return err
        }

        message, err := broker.decoder.Decode(data)
        if err != nil {
            return err
        }

        // do something with the message
        // for now, just print it
        broker.HandleNewMessage(message)

        // close it off
        conn.Close()
    }
}
Enter fullscreen mode Exit fullscreen mode

Alright, let's see what ProducerListen does:

  1. It creates a listener on the host/port combination provided in the config (we'll get them from command-line arguments once we implement the cmd for the broker)
  2. We start by accepting a connection on the TCP listener we created. Mind you, this call will block (i.e. put our program to sleep) until a new connection is established.
  3. After that, we read all data from that connection
  4. We decode the raw data into a apiv1.Message object
  5. We call HandleNewMessage with the message (we'll work on that function shortly)
  6. Close off the connection.
  7. And we keep doing steps 2-6 over and over again.

Our ProducerListen call is blocking in two ways:

  • The call to listener.Accept will block the main thread (the main application path) so our broker won't be able to do anything else but to wait as well. That's kind of bad because we can't send/receive data to/from consumers.
  • After we get a connection, you can see that we do a bunch of operations on it: read it, decode it and handle it. During this time, we can't receive new messages from producers because our main thread is busi handling the message we got. So if all those steps take 1 second to finish, we can only around receive 1 message per second. Our broker throughput is not its selling point.

Both of these points can be solved by introducing goroutines which allow us to take some of the work to the "background" and not halt the entire application when doing a blocking operation.

Introducing goroutines however, will complicate things a bit because we will need to protect ourselves from race conditions using synchronization services such as a Mutex or Semaphore.

But I said in the begenning of these series: everything will be as simple as it can be.

And I meant it. I want to avoid goroutines as much as possible, unless we need to.

Our code is trivial and easy to understand that way. Which is the goal of this series anyway.

Once we finalize everything we can gradually start making things better and better.

Alright, the HandleNewMessage is quite simple:

// internal/broker/broker.go
func (broker *Broker) HandleNewMessage(message *apiv1.Message) {
    // get the topic
    topic, exists := broker.topics[message.Topic]

    // if it does not exist, create it
    if exists == false || topic == nil {
        log.Printf("creating topic %s", message.Topic)
        broker.topics[message.Topic] = NewTopic(message.Topic)
    }

    // add the message to the topic
    log.Printf("new message dispatched on topic %s", message.Topic)
    broker.topics[message.Topic].Dispatch(message)
}
Enter fullscreen mode Exit fullscreen mode
  1. We test whether we have a topic registerd with the message topic
  2. If not, we create one using NewTopic
  3. We call Dispatch on the topic.

If you recall from earlier, our Dispatch function from the Topic class only adds the message to a queue.

Putting it all together

We got all the pieces, now we just need to glue them together with a command-line program:

// cmd/broker/broker.go

package main

import (
    "flag"
    "log"
    "mbus/internal/broker"
)

var (
    produceHost string
    producePort string
)

func main() {
    parseFlags()

    config := &broker.BrokerConfig{
        ProducerHost: produceHost,
        ProducerPort: producePort,
    }

    broker, err := broker.New(config)
    if err != nil {
        log.Fatalf(err.Error())
    }

    // Listen for producers sending in messages
    log.Println("Broker: start listening for incoming producer messages...")
    err = broker.ProducerListen()
    if err != nil {
        log.Fatalf(err.Error())
    }
}

func parseFlags() {
    flag.StringVar(&produceHost, "phost", "127.0.0.1", "The host to listen to for producer messages")
    flag.StringVar(&producePort, "pport", "9990", "The port to listen to for producer messages")

    flag.Parse()
}
Enter fullscreen mode Exit fullscreen mode
  1. We parse command-line flags (host and port where producers send messages to)
  2. We create a BrokerConfig object with the config we got.
  3. We create a broker instance with our config object
  4. Then we start listening for new messages!

Testing it out

At the end of each episode, we try to test our code to see if it works.

We do manual testing here. Nice.

Let's add this piece of code to our broker:

// internal/broker/broker.go

func (broker *Broker) TestingThisOut() {
    for _, topic := range broker.topics {
        if topic == nil {
            continue
        }

        for topic.Queue.Len() > 0 {
            message := topic.Queue.Dequeue()

            log.Printf("Received new message from topic '%s': '%s'", topic.Name, string(message.Data))
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

This will allow us to print any new messages we have in the queue of any topic that has any.

And let's add this to our broker command-line, between creating the broker and ProducerListen:

// cmd/broker/broker.go
// broker.New call

    ticker := time.Tick(3 * time.Second)
    go func(ticker <-chan time.Time) {
        for {
            select {
            case <-ticker:
                broker.TestingThisOut()
            }
        }
    }(ticker)

// calling ProducerListen here.
Enter fullscreen mode Exit fullscreen mode

This code, will run forever in the background, periodically every 3 seconds, and will call TestingThisOut function (well thought of name) which will print messages stored in queues of topics.

Now we're ready to test.

Open two terminals. Make sure you run make to rebuild the project.

Run the broker in one of them:

./build/broker
Enter fullscreen mode Exit fullscreen mode

And send a message in the second terminal

./build/producer -topic orders -message "look ma! a new order"
Enter fullscreen mode Exit fullscreen mode

Get back to the terminal where you ran your broker, and you should see something like this:

2024/06/02 10:39:43 Broker: start listening for incoming producer messages...
2024/06/02 10:39:52 creating topic orders
2024/06/02 10:39:52 new message dispatched on topic orders
2024/06/02 10:39:55 Received message from topic orders: look ma! a new order
Enter fullscreen mode Exit fullscreen mode

And that concludes this episode.

See you in the next one!

Top comments (0)