Recentemente tenho estudado e trabalhado bastante com o Apache Kafka. Dentro do ecossistema golang existem diversas bibliotecas disponíveis para realizar essa integração, alguns exemplos são:
Cada uma tem suas particularidades, vantagens e desvantagens. Tenho realizado testes com todas e em um post futuro podemos falar mais sobre cada uma delas. Antes de irmos direto pro código vou falar um pouco sobre os Headers e quando essa funcionalidade foi incluída no Kafka.
Headers
O Kafka é totalmente agnóstico em relação ao conteúdo da mensagem que o Producer envia, ou seja, ele deixa a cargo do usuário a tarefa de enriquecer ou atribuir mais significado a uma mensagem. Uma alternativa para contornar esse problema é utilizar padrões estruturados como JSON ou AVRO onde o usuário é livre para definir os campos necessários e pode incluir metadados facilmente.
Um header é um par (chave,valor) e uma única mensagem pode conter diversos headers. Esse é um conceito encontrado em sistemas de mensagem como JMS e de transporte como TCP e HTTP e eles podem ser utilizados para roteamento, filtros e anotações. O Kafka adicionou suporte a headers a partir da sua versão v0.11.0.0. Podemos utilizar os headers para adicionar informarções extras as mensagens quem podem interessar a diferentes Consumers.
Vamos pro código
Depois dessa pequena introdução vamos ao que interessa! Antes de mais nada, precisamos de um ambiente com o Kafka configurado e executando e da biblioteca instalada no ambiente de desenvolvimento. A instalação é bem simples só precisa executar o seguinte comando:
go get -u github.com/Shopify/sarama
Após a instalação já podemos começar a escrever o código que vai estabelecer a conexão com o Kafka e produzir as mensagens. O primeiro passo é instanciar o objeto de configuração e criar o Producer.
func initProducer() (sarama.SyncProducer, error) {
// setup sarama log to stdout
sarama.Logger = log.New(os.Stdout, "", log.Ltime)
// producer config
config := sarama.NewConfig()
config.Producer.Retry.Max = 5
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
config.Version = sarama.V0_11_0_0
// create producer
prd, err := sarama.NewSyncProducer([]string{kafkaConn}, config)
return prd, err
}
Esta etapa pode parecer banal e passar despercebida quando não se tem experiência com a biblioteca. A documentação da biblioteca não deixa claro que precisamos especificar a versão do Kafka que será utilizada e sem essa configuração não iremos conseguir utilizar os headers. No meu primeiro contato com a biblioteca cheguei a perder um bom tempo sem conseguir produzir no tópico por causa dessa configuração. Após gastar algum tempo pesquisando encontrei uma issuer no repositório da biblioteca que me ajudou a entender o problema. Para quem tiver curiosidade deixarei o link para a issuer a seguir:
Header information not received in the consumer #1074
Versions
Sarama Version: v1.16.0 Kafka Version: kafka_2.11-1.0.0.tgz Go Version: go1.10 darwin/amd64
Configuration
I have written a small test reproducer. Please see attached files producer.go and consumer.go.
Logs
The output from running my test producer which has the headers set: 41973 producer.go:24] producer message &{send test message for headers [{[] []} {[116 101 115 116 72 101 97 100 101 114 49] [116 101 115 116 86 97 108 117 101 49]}] 0 0 0001-01-01 00:00:00 +0000 UTC 0 0} 41973 producer.go:27] producer message header key testHeader1, value testValue1
The output from running my test consumer which has zero headers: 41959 consumer.go:25] Message topic: send, partition: 0, offset: 0, key: , value: test message for headers 41959 consumer.go:26] Consumer message header size 0
Problem Description
When sending a message with header from producer side, the header information got lost from the consumer side.
Some information about the project I am working on: In my project, I am using the kafka header to propagate the zipkin tracing information. I am using the sarama library for the producer side and sarama-cluster library for the consumer side.
In the simple reproduce I wrote, I used sarama library for both producer and consumer to rule out potential issues from sarama-cluster library.
For the documentation, the consumer message header is supported for kafka version 0.11+. And I am using Kafka 1.0.0 version which should have the support.
Agora que tudo está configurado podemos escrever o código que vai ser responsável por produzir as mensagens no tópico. A biblioteca do Sarama possui uma estrutura para a mensagem do produtor e do consumidor. Essa estrutura irá receber informações como o tópico para o qual a mensagem será enviada, a chave dessa mensagem, os metadados, os headers e até mesmo dados que serão preenchidos somente quando a mensagem for entregue no barramento Kafka, como o offset e a partição.
func produce(message string, headers map[string]string, producer sarama.SyncProducer) {
// publish sync
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(message),
Headers: convertHeaders(headers),
}
p, o, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Error publish: ", err.Error())
}
fmt.Println("Partition: ", p)
fmt.Println("Offset: ", o)
}
func convertHeaders(headers map[string]string) []sarama.RecordHeader {
output := make([]sarama.RecordHeader, 0)
for key, value := range headers {
output = append(output, sarama.RecordHeader{
Key: []byte(key),
Value: []byte(value),
})
}
return output
}
Depois de conhecer as especificidades da biblioteca o código acima é bastante simples e de fácil compreensão. Precisamos apenas instanciar uma mensagem e atribuir os valores. Para enviarmos os headers precisamos converter para o padrão da biblioteca e por isso temos a função convertHeaders. Essa função é responsável por preencher a estrutura sarama.RecordHeader que é a modelagem da biblioteca para lidar com essa funcionalidade. Ela é bem simples e como já descrito anteriormente possui apenas uma chave e um valor que é associado a essa chave. Com tudo isso configurado utilizaremos o Producer que foi criado e configurado anteriormente e enviaremos a mensagem.
A biblioteca Sarama é sem dúvidas a mais consolidada e utilizada mas é um pouco mais verbosa e baixo nível, outras bibliotecas adicionam camadas de abstração que facilitam a vida do programador e tornam o utilização mais amigável. Cada biblioteca tem sua particularidade e as escolhas de design trazem consigo um trade off, por ser mais verbosa e com menos abstrações a biblioteca tem uma maior curva de aprendizado mas possibilita maior controle e flexibilidade ao programador, mas isso é um assunto para uma publicação futura. Abaixo deixo o código completo disponível para quem quiser copiar, estudar e realizar seu próprio teste.
Isso é tudo pessoal
Ficou com a alguma dúvida ? Ou sentiu que esqueci de alguma coisa ? Se sinta a vontade para deixar um comentário. Continuarei compartilhando o que estou estudando, aprendendo e aplicando no meu dia a dia.
Top comments (0)