It's been a while since the last post, hey! Let's waste no time and dive straight in.
This is the one everything has been building toward.
We have a producer that sends messages to the broker. We have a broker that stores them in topic queues. All that's missing is the consumer — the thing that actually reads those messages.
Let's build it.
The Consumer
// internal/consumer/consumer.go
type Consumer struct {
	host  string
	port  string
	topic string
}
Simple. It knows where the broker is and which topic it cares about.
The interesting part is Subscribe:
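func (c *Consumer) Subscribe() ([]string, error) {
	conn, err := net.Dial("tcp", net.JoinHostPort(c.host, c.port))
	if err != nil {
		return nil, err
	}
	defer conn.Close()
	// Tell the broker which topic we want.
	if _, err := fmt.Fprintf(conn, "%s\n", c.topic); err != nil {
		return nil, err
	}
	// Read one message per line until the broker closes the connection.
	var messages []string
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		messages = append(messages, scanner.Text())
	}
	// Scan returns false on EOF and on read errors; Err tells them apart.
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return messages, nil
}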
- We connect to the broker.
- We send the topic name we want to subscribe to, terminated with a newline.
- We read back whatever the broker sends us, line by line.
- We return the messages.
That's the entire consumer. Pretty much the mirror image of the producer.
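To see the wire protocol in isolation, here is a minimal sketch that runs the same subscribe logic against a throwaway in-process broker. The names fakeBroker, subscribe and roundTrip are hypothetical helpers made up for this illustration; they are not part of the project.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// fakeBroker is a hypothetical stand-in for the real broker: it accepts one
// connection, reads the topic line, replies with canned messages, and closes.
func fakeBroker(listener net.Listener, replies []string) {
	conn, err := listener.Accept()
	if err != nil {
		return
	}
	defer conn.Close()

	scanner := bufio.NewScanner(conn)
	if !scanner.Scan() {
		return // the client left before sending a topic
	}
	for _, reply := range replies {
		fmt.Fprintf(conn, "%s\n", reply)
	}
}

// subscribe mirrors the Subscribe method as a free function.
func subscribe(addr, topic string) ([]string, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	if _, err := fmt.Fprintf(conn, "%s\n", topic); err != nil {
		return nil, err
	}

	var messages []string
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() { // the loop ends when the broker closes the connection
		messages = append(messages, scanner.Text())
	}
	return messages, scanner.Err()
}

// roundTrip starts the fake broker on a free local port and subscribes once.
func roundTrip(topic string, replies []string) ([]string, error) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	defer listener.Close()

	go fakeBroker(listener, replies)
	return subscribe(listener.Addr().String(), topic)
}

func main() {
	messages, err := roundTrip("orders", []string{"first order", "second order"})
	fmt.Println(messages, err)
}
```

The detail worth noticing is that the consumer has no way to know how many messages are coming. It simply reads until the broker closes the connection, so the close is effectively the end-of-stream marker.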
Teaching the Broker About Consumers
The broker needs a new port for consumers to connect to. Let's add that to the config first:
// internal/broker/config.go
type BrokerConfig struct {
	ProducerHost string
	ProducerPort string
	ConsumerHost string
	ConsumerPort string
}
And now ConsumerListen:
// internal/broker/broker.go
func (broker *Broker) ConsumerListen() error {
	listener, err := net.Listen("tcp", net.JoinHostPort(broker.config.ConsumerHost, broker.config.ConsumerPort))
	if err != nil {
		return err
	}
	defer listener.Close()
	for {
		conn, err := listener.Accept()
		if err != nil {
			return err
		}
		// The first line a consumer sends is the topic name.
		scanner := bufio.NewScanner(conn)
		if !scanner.Scan() {
			// The consumer disconnected before sending a topic.
			conn.Close()
			continue
		}
		topicName := scanner.Text()
		log.Printf("consumer subscribed to topic %s", topicName)
		broker.mu.Lock()
		topic, exists := broker.topics[topicName]
		if !exists || topic == nil {
			broker.mu.Unlock()
			conn.Close()
			continue
		}
		// Drain the queue, writing one message per line.
		for topic.Queue.Len() > 0 {
			message := topic.Queue.Dequeue()
			fmt.Fprintf(conn, "%s\n", string(message.Data))
		}
		broker.mu.Unlock()
		conn.Close()
	}
}
- We listen on the consumer port.
- A consumer connects and sends us a topic name.
- We look up the topic.
- If it exists, we dequeue all its messages and write them back, one line each.
- We close the connection and wait for the next consumer.
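One thing to be aware of in the steps above: the lock is held while we write to the network, so a slow consumer would stall producers too. A possible variation, sketched here with a hypothetical store type where a plain slice stands in for the real queue, is to drain under the lock and write after releasing it:

```go
package main

import (
	"fmt"
	"io"
	"os"
	"sync"
)

// store is a hypothetical, simplified stand-in for the broker's topic map.
type store struct {
	mu     sync.Mutex
	topics map[string][]string
}

// drain removes and returns every queued message for a topic while holding
// the lock. The boolean reports whether the topic exists.
func (s *store) drain(topicName string) ([]string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	queued, exists := s.topics[topicName]
	if !exists {
		return nil, false
	}
	s.topics[topicName] = nil // keep the topic, empty its queue
	return queued, true
}

// deliver writes the drained messages one per line, with the lock released.
func deliver(w io.Writer, messages []string) error {
	for _, message := range messages {
		if _, err := fmt.Fprintf(w, "%s\n", message); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &store{topics: map[string][]string{"orders": {"first order", "second order"}}}
	if messages, ok := s.drain("orders"); ok {
		if err := deliver(os.Stdout, messages); err != nil {
			fmt.Fprintln(os.Stderr, err)
		}
	}
}
```

The trade-off is the same as in the version above: once messages are drained, a failed write means they are gone. The only thing this changes is how long the mutex is held.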
Remember When I Said No Goroutines?
Yeah, about that.
ProducerListen and ConsumerListen both block forever in a loop. We need both running at the same time, which means we finally have to reach for a goroutine.
But that brings a problem: both methods touch broker.topics, and doing that from multiple goroutines without protection is a data race. So we add a sync.Mutex to the broker:
// internal/broker/broker.go
type Broker struct {
	config  *BrokerConfig
	decoder decoder.Decoder
	topics  map[string]*Topic
	mu      sync.Mutex
}
And we lock around every access to topics — in HandleNewMessage and ConsumerListen.
I said I'd avoid goroutines as much as possible, and we did. But this one we actually need.
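If you want to convince yourself the mutex matters, here is a small self-contained sketch. The registry type is hypothetical and only counts messages per topic, but the locking pattern is the same one the broker uses.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the broker: a map guarded by a mutex.
type registry struct {
	mu     sync.Mutex
	topics map[string]int // topic name -> number of stored messages
}

// add records one message for a topic; safe to call from many goroutines.
func (r *registry) add(topic string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.topics[topic]++
}

// count returns how many messages a topic currently holds.
func (r *registry) count(topic string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.topics[topic]
}

// fill starts n goroutines that each add one message, then waits for them.
func fill(r *registry, topic string, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.add(topic)
		}()
	}
	wg.Wait()
}

func main() {
	r := &registry{topics: make(map[string]int)}
	fill(r, "orders", 100)
	fmt.Println(r.count("orders")) // prints 100
}
```

Remove the Lock and Unlock calls and run it with go run -race, and the race detector should flag the concurrent map writes.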
Putting it all together
In the broker CLI, we run ProducerListen in a goroutine and let ConsumerListen block the main goroutine:
// cmd/broker/broker.go
go func() {
	log.Println("Broker: start listening for incoming producer messages...")
	err := b.ProducerListen()
	if err != nil {
		log.Fatal(err)
	}
}()
log.Println("Broker: start listening for incoming consumer messages...")
err = b.ConsumerListen()
if err != nil {
	log.Fatal(err)
}
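Calling log.Fatal inside the goroutine works, since it exits the whole process. If you would rather have main decide what happens on failure, one alternative is to funnel errors through a channel. This is only a sketch: firstError, failingListener and blockingListener are hypothetical names, with the last two standing in for ProducerListen and ConsumerListen.

```go
package main

import (
	"errors"
	"fmt"
)

// errPortInUse is a made-up error used only for this demonstration.
var errPortInUse = errors.New("port already in use")

// failingListener stands in for a listener that fails right away.
func failingListener() error { return errPortInUse }

// blockingListener stands in for a healthy listener that never returns.
func blockingListener() error { select {} }

// firstError runs each listener in its own goroutine and returns the first
// error any of them reports. The channel is buffered so that a listener
// failing later never blocks on its send.
func firstError(listeners ...func() error) error {
	errs := make(chan error, len(listeners))
	for _, listen := range listeners {
		go func(listen func() error) {
			errs <- listen()
		}(listen)
	}
	return <-errs
}

func main() {
	err := firstError(failingListener, blockingListener)
	fmt.Println(err) // prints "port already in use"
}
```

For two listeners the original approach is perfectly fine. The channel version mostly pays off once there are more of them or when cleanup has to run before exiting.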
Two new flags: -chost (default 127.0.0.1) and -cport (default 9991).
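For reference, this is roughly what wiring up those two flags can look like with the standard flag package. The consumerAddr type and parseConsumerFlags function are hypothetical names for this sketch; only the flag names and defaults come from the post.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// consumerAddr holds the address the broker listens on for consumers.
type consumerAddr struct {
	host string
	port string
}

// parseConsumerFlags parses -chost and -cport from args, using the defaults
// given in the post. A dedicated FlagSet keeps the function easy to test.
func parseConsumerFlags(args []string) (consumerAddr, error) {
	var addr consumerAddr
	flags := flag.NewFlagSet("broker", flag.ContinueOnError)
	flags.StringVar(&addr.host, "chost", "127.0.0.1", "host to listen on for consumers")
	flags.StringVar(&addr.port, "cport", "9991", "port to listen on for consumers")
	err := flags.Parse(args)
	return addr, err
}

func main() {
	addr, err := parseConsumerFlags(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Println(addr.host, addr.port)
}
```

Run without arguments, this prints 127.0.0.1 9991. Passing -cport 7000 overrides only the port.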
And the consumer CLI is as straightforward as the producer:
// cmd/consumer/consumer.go
client := consumer.New(brokerHost, brokerPort, topic)
messages, err := client.Subscribe()
if err != nil {
	log.Fatal(err)
}
for _, message := range messages {
	log.Printf("[%s] %s", topic, message)
}
Flags: -host, -port (default 9991), -topic (required).
Testing it out
Open three terminals. Rebuild first:
make
Start the broker:
./build/broker
Send a few messages from the second terminal:
./build/producer -topic orders -message "first order"
./build/producer -topic orders -message "second order"
./build/producer -topic orders -message "third order"
Now consume them from the third terminal:
./build/consumer -topic orders
You should see:
2024/06/10 11:00:01 [orders] first order
2024/06/10 11:00:01 [orders] second order
2024/06/10 11:00:01 [orders] third order
All the pieces, finally together.
What's next
This is a working message bus, but a very basic one. A few interesting directions to take it:
- A consumer that keeps listening instead of pulling once and exiting (push vs. pull)
- Multiple consumers on the same topic
- Persistence: right now messages are gone when the broker restarts
For now though, this is exactly what I set out to build. Something short and simple, something I understand end to end.
That said, I doubt the series ends here. We'll tackle the points above (especially the first one) in future posts, so stay tuned!
I hope this was useful. See you in the next one!