
Robin Moffatt

Originally published at rmoff.net

Learning Golang (some rough notes) - S02E02 - Adding error handling to the Producer

I looked last time at the very bare basics of writing a Kafka producer using Go. It worked, but only with everything lined up and pointing the right way. There was no error handling of any sort. Let’s see about fixing this now.

A bit of code tidying

To make the code more readable to me, I split out the configuration into a new variable that’s then passed to the NewProducer, so instead of the more compact but possibly less readable

p, _ := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092"})

I have this:

c := kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092"}

p, _ := kafka.NewProducer(&c)

Catching errors from NewProducer()

Instead of p, _ when invoking NewProducer (the _ being Go’s blank identifier, which discards the returned value), we supply a variable into which the error can be stored:

p, e := kafka.NewProducer(&c)

Now we can check it and report back if there’s a problem:

if e != nil {
    fmt.Printf("😢 Oh noes, there's an error creating the Producer! %v\n", e)
} else {
    // do producing stuff
}

If I make a deliberate mistake (misspelling bootstrap.servers as boostrap.servers) so that NewProducer returns an error, we get this:

😢 Oh noes, there's an error creating the Producer! No such configuration property: "boostrap.servers"
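The if/else above works, but a common Go idiom is to handle the error and return early, which keeps the happy path unindented. Here is a minimal sketch of that shape using only the standard library. Note that `producer` and `newProducer` are hypothetical stand-ins for the Kafka client (so the sketch runs without a broker), and the key check is invented purely to trigger the same kind of error:

```go
package main

import "fmt"

// producer and newProducer are hypothetical stand-ins for the Kafka
// client, so that this sketch runs without a broker or librdkafka.
type producer struct{ servers string }

// newProducer rejects any configuration key it does not recognise.
func newProducer(cfg map[string]string) (*producer, error) {
	for k := range cfg {
		if k != "bootstrap.servers" {
			return nil, fmt.Errorf("No such configuration property: %q", k)
		}
	}
	return &producer{servers: cfg["bootstrap.servers"]}, nil
}

func main() {
	// Deliberately misspelled key, as in the example above.
	p, err := newProducer(map[string]string{"boostrap.servers": "localhost:9092"})
	if err != nil {
		fmt.Printf("😢 Oh noes, there's an error creating the Producer! %v\n", err)
		return // bail out early; nothing below runs without a producer
	}

	// Happy path continues here, without an else branch.
	fmt.Println("producer ready for", p.servers)
}
```

Whether you prefer this to if/else is largely a matter of taste; the behaviour is the same.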

Be Assertive!

We can take this a step further and look at the type of error that’s returned. Whilst e is a generic error interface value, we can try converting it to a kafka.Error. Why would we want to do this? Well, kafka.Error exposes properties including an ErrorCode that describes the type of error. From that code we can handle the error in a more useful way than just dumping it to the screen. For example, if it’s an error about configuration properties (as in the example above) we could tell the user where to find the reference information for this; including that pointer for an unrelated error would be redundant (and possibly confusing).

To get at the underlying kafka.Error we use a type assertion:

ke := e.(kafka.Error)

On its own, that form panics if e isn’t actually a kafka.Error, so we use the two-value form and check for success:

if ke, ok := e.(kafka.Error); ok {
    // it's a kafka.Error
} else {
    // it's an error, but not a kafka.Error
}

Once we’ve established that it’s a kafka.Error we can use the Code() function to access the ErrorCode and handle it with a switch:

if e != nil {
    if ke, ok := e.(kafka.Error); ok {
        switch ec := ke.Code(); ec {
        case kafka.ErrInvalidArg:
            fmt.Printf("😢 Can't create the producer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
        default:
            fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
        }
    } else {
        // It's not a kafka.Error
        fmt.Printf("😢 Oh noes, there's a generic error creating the Producer! %v", e.Error())
    }
}

So now the same mistake as before in configuring bootstrap.servers is caught and reported like this:

😢 Can't create the producer because you've configured it wrong (code: -186)!
    No such configuration property: "boostrap.servers"

To see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Handling Producer Errors

When it comes to the Produce I’ve done the same as above - split out the creation of the message into a new variable for clarity, and added a check for error:

// Build the message
m := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic},
    Value: []byte("Hello world")}

// Produce the message
if e := p.Produce(&m, nil); e != nil {
    fmt.Printf("😢 Darn, there's an error producing the message! %v", e.Error())
}

But if I kill my Kafka broker and run my code, I don’t get an error. Why not? Because if you look at the documentation for Produce you’ll see that it says

This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately

So all we’re doing is "fire and forget". Put it on an internal queue, and we’re done. We still don’t know if it was delivered.
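To make the "fire and forget" behaviour concrete, here is a deliberately tiny model of an internal queue, built on a buffered channel. It is a hypothetical illustration only (the `queue` type and `enqueue` method are invented, and this is not how the real client is implemented), but it shows why an enqueue-style call can return nil when nothing is listening at the other end:

```go
package main

import (
	"errors"
	"fmt"
)

// queue is a hypothetical, much-simplified model of an internal
// transmit queue.
type queue struct{ pending chan string }

var errQueueFull = errors.New("queue full")

// enqueue returns as soon as the message is buffered. The only error
// it can report is a local one: the buffer being full.
func (q *queue) enqueue(msg string) error {
	select {
	case q.pending <- msg:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	q := &queue{pending: make(chan string, 2)}

	// Nothing is reading from the queue (think: the broker is down),
	// yet calls succeed for as long as there is room in the buffer.
	for _, m := range []string{"a", "b", "c"} {
		fmt.Printf("enqueue(%q) -> %v\n", m, q.enqueue(m))
	}
}
```

The error returned by the call tells you about the queue, not about delivery; finding out about delivery needs a separate feedback path.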

Producer Events

Since the Producer is called asynchronously, it uses Go channels to provide events back to the calling application about what’s going on. These can be of different types, covering both errors and the status of messages that have been sent for producing.

  • Error events contain … errors ;-)

  • Message events contain information about messages that have been sent for producing, including whether it worked or not.

You can consume these events in two different ways:

  1. Poll the producer’s Events() channel and triage by type

  2. Create a dedicated delivery report channel, and poll Events() for errors only

Here’s an example of the first option, in which we use a goroutine to listen to all events and handle them based on type:

go func() {
    for {
        // The `select` blocks until one of its `case` conditions
        // is met - therefore we run it in a goroutine.
        select {
            case ev := <-p.Events():
                // Look at the type of Event we've received
                switch ev.(type) {

                case *kafka.Message:
                    // It's a delivery report
                    km := ev.(*kafka.Message)
                    if km.TopicPartition.Error != nil {
                        fmt.Printf("☠️ Failed to send message '%v' to topic '%v'\n\tErr: %v\n",
                            string(km.Value),
                            string(*km.TopicPartition.Topic),
                            km.TopicPartition.Error)
                    } else {
                        fmt.Printf("✅ Message '%v' delivered to topic '%v' (partition %d at offset %d)\n",
                            string(km.Value),
                            string(*km.TopicPartition.Topic),
                            km.TopicPartition.Partition,
                            km.TopicPartition.Offset)
                    }

                case kafka.Error:
                    // It's an error
                    em := ev.(kafka.Error)
                    fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)
                default:
                    // It's not anything we were expecting
                    fmt.Printf("Got an event that's not a Message or Error 👻\n\t%v\n", ev)

                }
        }
    }
}()
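The handler above switches on `ev.(type)` and then repeats the assertion inside each case. A type switch can also bind the converted value directly, which removes the second assertion. A standard-library sketch of that form, where `event`, `message` and `clientError` are hypothetical stand-ins for kafka.Event, *kafka.Message and kafka.Error:

```go
package main

import "fmt"

// event, message and clientError are hypothetical stand-ins for
// kafka.Event, *kafka.Message and kafka.Error.
type event interface{}

type message struct {
	value string
	err   error
}

type clientError struct{ msg string }

func (e clientError) Error() string { return e.msg }

// handle binds the concrete value to e in each case, so no second
// type assertion is needed inside the case body.
func handle(ev event) string {
	switch e := ev.(type) {
	case *message:
		if e.err != nil {
			return fmt.Sprintf("failed to send %q: %v", e.value, e.err)
		}
		return fmt.Sprintf("delivered %q", e.value)
	case clientError:
		return "error: " + e.Error()
	default:
		// Here e keeps the original interface type.
		return fmt.Sprintf("unexpected event: %v", e)
	}
}

func main() {
	fmt.Println(handle(&message{value: "Hello world"}))
	fmt.Println(handle(clientError{msg: "broker unreachable"}))
	fmt.Println(handle(nil))
}
```

The last call shows that a nil event falls through to the default case, which becomes relevant in the next section.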

Now when we produce a message successfully we receive a kafka.Message with a nil value in TopicPartition.Error and the offset of the produced message in TopicPartition.Offset:

✅ Message 'Hello world' delivered to topic 'test_topic_02' (partition 0 at offset 0)

And if there’s a problem we get full details of it

☠️ Uh oh, caught an error:
    foobar:9092/1: Failed to resolve 'foobar:9092': nodename nor servname provided, or not known (after 64ms in state CONNECT)
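The second option listed above, a dedicated delivery report channel, isn't shown in this post. In the real client the channel is passed as the second argument to Produce (the `nil` in the calls here). The sketch below only models the shape of that idea with the standard library; `report` and `produce` are hypothetical stand-ins, not the client's API:

```go
package main

import "fmt"

// report and produce are hypothetical stand-ins; they only model the
// shape of "pass a channel in, read the delivery report back from it".
type report struct {
	value string
	err   error
}

// produce returns immediately and later sends exactly one report for
// the message on the channel the caller supplied.
func produce(value string, delivery chan<- report) {
	go func() {
		// A real client would talk to the broker here.
		delivery <- report{value: value}
	}()
}

func main() {
	delivery := make(chan report, 1)
	produce("Hello world", delivery)

	r := <-delivery // blocks until this message's report arrives
	if r.err != nil {
		fmt.Printf("failed to deliver %q: %v\n", r.value, r.err)
		return
	}
	fmt.Printf("delivered %q\n", r.value)
}
```

The trade-off is that waiting on the channel after every message makes the send effectively synchronous, which is simple to reason about but gives up some throughput.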

Phantom events

If you run the code as-is you’ll notice you get this

✅ Message 'Hello world' delivered to topic 'test_topic_02' (partition 0 at offset 3)

--
✨ All messages flushed from the queue
Got an event that's not a Message or Error 👻
    <nil>
Got an event that's not a Message or Error 👻
    <nil>
Got an event that's not a Message or Error 👻
    <nil>
Got an event that's not a Message or Error 👻
    <nil>
Got an event that's not a Message or Error 👻
    <nil>
…

After Close() is called, our goroutine event handler is still receiving events. We don’t want this (once the Producer is closed, its Events() channel is meaningless), so use this pattern (inspired by this code) to avoid it:

// For signalling termination from main to go-routine
termChan := make(chan bool, 1)
// For signalling that termination is done from go-routine to main
doneChan := make(chan bool)

go func() {
    doTerm := false
    for !doTerm {
        select {

            // channels that we're listening to

        case <-termChan:
            doTerm = true
        }
    }

    close(doneChan)
}()

// …

// We're ready to finish
termChan <- true
// wait for go-routine to terminate
<-doneChan
// Now we can exit
p.Close()
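A plausible explanation for the stream of `<nil>` events, assuming Close() closes the channel returned by Events(): in Go, receiving from a closed channel never blocks and yields the zero value, which for an interface type is nil. The standard-library sketch below shows that behaviour, along with the two-value receive that lets a loop notice the channel has closed; the `drain` helper is invented for illustration:

```go
package main

import "fmt"

// drain reads from events until the channel is closed and returns
// how many real events it saw.
func drain(events <-chan interface{}) int {
	n := 0
	for {
		ev, ok := <-events
		if !ok {
			// Channel closed: ev is just the zero value (nil).
			return n
		}
		fmt.Printf("event: %v\n", ev)
		n++
	}
}

func main() {
	events := make(chan interface{}, 1)
	events <- "delivery report"
	close(events)

	fmt.Println("real events:", drain(events))

	// Without the ok check, a closed channel keeps yielding nil.
	fmt.Println(<-events, <-events)
}
```

Checking `ok` is an alternative way for a handler to stop; the termination channel shown above has the advantage that main decides when the handler exits, before the Producer is closed.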

Make sure you Flush()

Once we’ve sent our message to the Producer, we get control back straight away, because it’s an asynchronous process. If we don’t put anything else in place the code will run on through to the Close() and exit. We want to make sure we’ve sent all the messages successfully - or not. To do this we use the Flush() function with a timeout (in milliseconds) for how long we’ll wait before considering sending messages to have failed. It returns the number of messages still waiting to be delivered.

// Flush the Producer queue
if r := p.Flush(10000); r > 0 {
    fmt.Printf("\n--\n⚠️ Failed to flush all messages after 10 seconds. %d message(s) remain\n", r)
} else {
    fmt.Println("\n--\n✨ All messages flushed from the queue")
}

With this in place we get a confirmation on exit of success:

✨ All messages flushed from the queue

or failure:

⚠️ Failed to flush all messages after 10 seconds. 1 message(s) remain
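The contract here (wait up to a timeout, then report how many messages are still outstanding) can be modelled in a few lines. This is a hypothetical sketch using only the standard library; the `outbox` type and its polling loop are invented for illustration and say nothing about how the real Flush() is implemented:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// outbox is a hypothetical model of a producer queue; inFlight counts
// messages that have been enqueued but not yet acknowledged.
type outbox struct{ inFlight int64 }

// flush polls until nothing is in flight or timeoutMs has elapsed,
// and returns the number of messages still outstanding.
func (o *outbox) flush(timeoutMs int) int {
	deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
	for time.Now().Before(deadline) {
		if atomic.LoadInt64(&o.inFlight) == 0 {
			return 0
		}
		time.Sleep(time.Millisecond)
	}
	return int(atomic.LoadInt64(&o.inFlight))
}

func main() {
	o := &outbox{inFlight: 1}

	// Simulate the acknowledgement arriving after 20ms.
	go func() {
		time.Sleep(20 * time.Millisecond)
		atomic.AddInt64(&o.inFlight, -1)
	}()

	fmt.Println("remaining after a short wait: ", o.flush(5))
	fmt.Println("remaining after a longer wait:", o.flush(500))
}
```

A non-zero return is a signal to decide what to do next: wait longer, log the shortfall, or give up.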

The finished result

package main

import (
    "fmt"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

    // --
    // The topic is passed as a pointer to the Producer, so we can't
    // use a hard-coded literal. And a variable is a nicer way to do
    // it anyway ;-)
    topic := "test_topic_02"

    // --
    // Create Producer instance
    // https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewProducer

    // Store the config
    c := kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092"}

    // Variable p holds the new Producer instance.
    p, e := kafka.NewProducer(&c)

    // Check for errors in creating the Producer
    if e != nil {
        if ke, ok := e.(kafka.Error); ok {
            switch ec := ke.Code(); ec {
            case kafka.ErrInvalidArg:
                fmt.Printf("😢 Can't create the producer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
            default:
                fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
            }
        } else {
            // It's not a kafka.Error
            fmt.Printf("😢 Oh noes, there's a generic error creating the Producer! %v", e.Error())
        }

    } else {

        // For signalling termination from main to go-routine
        termChan := make(chan bool, 1)
        // For signalling that termination is done from go-routine to main
        doneChan := make(chan bool)

        // --
        // Send a message using Produce()
        // https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#Producer.Produce
        //
        // Build the message
        m := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic},
            Value: []byte("Hello world")}

        // Handle any events that we get
        go func() {
            doTerm := false
            for !doTerm {
                // The `select` blocks until one of its `case` conditions
                // is met - therefore we run it in a goroutine.
                select {
                case ev := <-p.Events():
                    // Look at the type of Event we've received
                    switch ev.(type) {

                    case *kafka.Message:
                        // It's a delivery report
                        km := ev.(*kafka.Message)
                        if km.TopicPartition.Error != nil {
                            fmt.Printf("☠️ Failed to send message '%v' to topic '%v'\n\tErr: %v\n",
                                string(km.Value),
                                string(*km.TopicPartition.Topic),
                                km.TopicPartition.Error)
                        } else {
                            fmt.Printf("✅ Message '%v' delivered to topic '%v' (partition %d at offset %d)\n",
                                string(km.Value),
                                string(*km.TopicPartition.Topic),
                                km.TopicPartition.Partition,
                                km.TopicPartition.Offset)
                        }

                    case kafka.Error:
                        // It's an error
                        em := ev.(kafka.Error)
                        fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)
                    default:
                        // It's not anything we were expecting
                        fmt.Printf("Got an event that's not a Message or Error 👻\n\t%v\n", ev)

                    }
                case <-termChan:
                    doTerm = true

                }
            }
            close(doneChan)
        }()
        // Produce the message
        if e := p.Produce(&m, nil); e != nil {
            fmt.Printf("😢 Darn, there's an error producing the message! %v", e.Error())
        }

        // --
        // Flush the Producer queue
        t := 10000
        if r := p.Flush(t); r > 0 {
            fmt.Printf("\n--\n⚠️ Failed to flush all messages after %d milliseconds. %d message(s) remain\n", t, r)
        } else {
            fmt.Println("\n--\n✨ All messages flushed from the queue")
        }
        // --
        // Stop listening to events and close the producer
        // We're ready to finish
        termChan <- true
        // wait for go-routine to terminate
        <-doneChan
        // Now we can exit
        p.Close()

    }

}
