DEV Community

Bouchaala Reda
Bouchaala Reda

Posted on • Updated on

Creating a simple Message Bus: Episode 1

In my perpetual quest to be a better engineer and to understand tools/architectures better, I decided to start building stuff.

Build a message bus, database, reverse proxy... etc.
Whatever. Just build something I'm interested in learning more of.

To not think about this as a huge task, I decided to commit myself to build stuff in the simplest way possible. No fancy shenanigans.

Start small and simple. Add more along the way.

I'll be working with Go, not because I'm a Go expert, but because I like it and I feel like it helps with my productivity. I'll probably be learning more about Go along the way. Two birds with one stone kind of thing.

I also want to point out, that I write a post after I'm done with a portion of the code, sort of journaling my way through it.

which means that, the code could be incomplete or does not work (as I'm writing this, I'm thinking but that's what tests are for, but lets leave the tests for some other time). And it also means I'll probably be jumping between files a lot.


I wanted to start with a message bus.

Let's define it and start this series of posts by creating the project structure and maybe a bit more.

A message bus, is a messaging system that allows different systems to communicate with each other via sending and receiving of these messages.

So this message bus, is a system (also called the broker) that allows senders (also called producers) to send messages (just data, it could contain anything) to receivers (also called consumers).

In other words,

  1. A producer prepares a messages, points it to the broker and says "here, deliver this message please" to this destination
  2. The broker gets the message and delivers it to one or more consumers that are subscribing to said destination.

Message Bus
(image source: here)

Project layout

So we have three actors: a broker, consumer and producer.

Let's start by creating an empty project structure. I'll call the go module mbus. Short and nice.

# Create the dir and cd into it
mkdir mbus
cd mbus

# Create the go module
go mod init mbus

# Create the project layout
mkdir cmd internal build
mkdir cmd/{broker,producer,consumer}
mkdir internal/{broker,producer,consumer}
Enter fullscreen mode Exit fullscreen mode

Our base project layout is created.

To make our lives easier, let's create a very simple Makefile

all: clean build

.PHONY: build
build:
    go build -o build/broker cmd/broker/broker.go
    go build -o build/producer cmd/producer/producer.go
    go build -o build/consumer cmd/consumer/consumer.go

.PHONY: clean
clean:
    rm -f build/broker
    rm -f build/consumer
    rm -f build/producer
Enter fullscreen mode Exit fullscreen mode

So running make in the command line will rebuild our project. You could use something like gowatch, but again I'm keeping it simple.

Message structure

Let's define what "message" is in our application.

  1. It needs to have some data, it could be json, it could be Base64 encoded image... we don't know and we don't care.
  2. It needs to have some sort of destination name, for us to know where to send it to. In the "message bus" world, it's often called a "topic" or a "routing key" if you want to sound like a real nerd. I like "routing key" but let's use topic since it's shorter.

The message will be our contract between all parties, so let's call it apiv1 and put it inside internal, like so

mkdir internal/apiv1
touch internal/apiv1/message.go
Enter fullscreen mode Exit fullscreen mode
// internal/apiv1/message.go

package apiv1

type Message struct {
    Data  []byte
    Len   int
    Topic string
}

func NewMessage(topic string, data []byte) *Message {
    return &Message{
        Data:  data,
        Len:   len(data),
        Topic: topic,
    }
}
Enter fullscreen mode Exit fullscreen mode

Nice and simple.

The Len field is something we might not use, but when dealing with slices it's always a good idea to keep the length of it around. We'll see, if we don't need it we can just remove it later on.

Now, let's create the "producer" part of the app and call it a day.

Producing messages

If you remember from our intro, a producer is very simple: it has a message and a topic, and it just sends them off to a broker.

Knowing that, let's create a command line app that will accept a host and port pair to point it to the broker, a topic and a message.

// cmd/producer/producer.go
package main

import (
    "flag"
    "log"

    "mbus/internal/producer"
)

var (
    brokerHost string
    brokerPort string
    topic      string
    message    string
)

func main() {
    parseFlags()

    client := producer.New(brokerHost, brokerPort)

    err := client.Publish(topic, message)
    if err != nil {
        log.Fatalf(err.Error())
    }
}

func parseFlags() {
    flag.StringVar(&brokerHost, "host", "127.0.0.1", "Broker host")
    flag.StringVar(&brokerPort, "port", "9990", "Broker port")
    flag.StringVar(&topic, "topic", "", "Topic to produce the message for")
    flag.StringVar(&message, "message", "", "The message contents")

    flag.Parse()

    if topic == "" {
        log.Fatalf("please provide a topic")
    }

    if message == "" {
        log.Fatalf("please provide a message to be sent")
    }
}
Enter fullscreen mode Exit fullscreen mode
  1. Parsing flags to get command line arguments,
  2. Creating a client by using mbus/internal/producer package (which we'll create after this)
  3. Publishing the message to the topic, using the client.

The interesting stuff is at internal/producer/producer.go which we'll create in a minute, first I want to show you what a Producer looks like.

// internal/producer/producer.go
type Producer struct {
    host string
    port string

    conn net.Conn

    encoder encoder.Encoder
}
Enter fullscreen mode Exit fullscreen mode
  • The first two fields are there to know where the broker is in our network.
  • The second field, represents the TCP connection to the broker.
  • The next one is the encoder. More on this bellow.

In order for us to send a Message object down the wire, we need to properly encode it to binary. We have a bunch of options in Go, but I'll go with msgpack. (Offical Website)

The encoder.Encoder is an interface so we can swap out the msgpack implementation with another one.

I'm used to a lot of OOP so that encoded is embedded inside the publisher (composition), but I realize that maybe that's not the best way to things all the time.

But it works for now, so let's leave it be.

// Creating a shared folder for all shared things
mkdir -p internal/shared/encoder
Enter fullscreen mode Exit fullscreen mode

The Encoder interface is pretty simple:

// internal/shared/encoder/encoder.go

package encoder

import "mbus/internal/apiv1"

type Encoder interface {
    Encode(*apiv1.Message) ([]byte, error)
}
Enter fullscreen mode Exit fullscreen mode

Let's create a msgpack encoder, but first let's install the msgpack package:

go get -u github.com/vmihailenco/msgpack
Enter fullscreen mode Exit fullscreen mode
// internal/shared/encoder/msgpack.go
package encoder

import (
    "mbus/internal/apiv1"

    "github.com/vmihailenco/msgpack"
)

type MsgpackEncoder struct {
}

func (e *MsgpackEncoder) Encode(msg *apiv1.Message) ([]byte, error) {
    data, err := msgpack.Marshal(msg)
    if err != nil {
        return nil, err
    }

    return data, nil
}
Enter fullscreen mode Exit fullscreen mode

Pretty simple stuff.

Now let's get back to our producer by creating a constructor method:

// internal/producer/producer.go

func New(host, port string) *Producer {
    return &Producer{
        host:    host,
        port:    port,
        conn:    nil,
        encoder: &encoder.MsgpackEncoder{},
    }
}
Enter fullscreen mode Exit fullscreen mode

Here, we create a new Publisher and use MsgpackEncoder for decoding.

Now, let's add a method to the Publisher so we can start publishing messages:

// internal/producer/producer.go
func (c *Producer) Publish(topic, message string) error {
    // we could connect in the New function before returning
    // but it's better to defer it and call it here, whenever
    // the user tries to publish a message.
    err := c.connect()
    if err != nil {
        return err
    }

    msg := apiv1.NewMessage(topic, []byte(message))

    data, err := c.encoder.Encode(msg)
    if err != nil {
        return err
    }

    n, err := c.conn.Write(data)
    if err != nil {
        return err
    }

    if n != len(data) {
        return errors.New("could not write all data")
    }

    return nil
}

func (c *Producer) connect() error {
    conn, err := net.Dial("tcp", net.JoinHostPort(c.host, c.port))
    if err != nil {
        return nil
    }

    c.conn = conn
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Again very simple.

We connect to the broker, create a Message object, encode it, and send it to the broker using the connection established.

That's it. Producer part done. I told you the producer is the easiest one.

Next one will be the broker.

But first, let's at least manually test (since we don't have unit tests, lazy me) that our producer is actually sending stuff somewhere.

For that, we can use netcat for this. Run this command in another terminal:

nc -l -p 9990 -t 127.0.0.1
Enter fullscreen mode Exit fullscreen mode

This will tell netcat (nc) to listen for TCP connections, on 127.0.0.1 port 9990. Kind of like a temporary test double for our broker 😁

Now, let's compile our app and run the producer:

make
./build/producer -topic sales -message hey
Enter fullscreen mode Exit fullscreen mode

You should see something printed on the terminal where you ran nc

Done, test coverage 100%.

Jokes aside, we'll probably add tests in another episode.

But for now, we'll call it a day.


Like I said at the start of this post, I'm just starting out with this so I don't really know where I'm going with this, and that's part of the fun. But it also means you could find the code not working or incomplete.

In any way, if you find a mistake or have some feedback, I'd love to hear it.

until then, see you in another episode!

Top comments (1)

Collapse
 
zendyani profile image
Abdeldjalil

Awsome, thanks for sharing.