DEV Community

Cover image for Implementando Transactional Outbox com Go, DynamoDB, MongoDB, Kafka e RabbitMq
Eder Matos
Eder Matos

Posted on

Implementando Transactional Outbox com Go, DynamoDB, MongoDB, Kafka e RabbitMq

Introdução

O padrão Transactional Outbox é uma solução de arquitetura que ajuda a garantir a consistência de dados entre um banco de dados e um sistema de mensageria. Ele é especialmente útil em sistemas distribuídos onde é necessário garantir que uma mensagem seja enviada somente se a transação do banco de dados for bem-sucedida. Este artigo tem como objetivo introduzir esse padrão e mostrar como implementar utilizando a linguagem Go, Apacke Kafka, DynamoDB ou MongoDB, sem grandes esforços.

Teoria por Trás do Padrão Transactional Outbox
O padrão Transactional Outbox resolve o problema de consistência de dados em sistemas distribuídos. Quando uma transação envolve tanto uma atualização no banco de dados quanto o envio de uma mensagem para um sistema de mensageria, o padrão Transactional Outbox garante que essas duas operações sejam executadas de forma atômica. Ou seja, ambas são concluídas com sucesso ou nenhuma delas é concluída. Isso é alcançado ao primeiro salvar o evento em uma tabela no banco de dados e, posteriormente, ler e processar esses eventos para enviá-los ao sistema de mensageria.

Apache Kafka
Apache Kafka é uma plataforma de streaming de eventos distribuída que permite publicar, armazenar e consumir fluxos de registros em tempo real. Ele é frequentemente usado para construir pipelines de dados e sistemas de mensageria resilientes. Kafka é conhecido por sua alta taxa de transferência, baixa latência e capacidade de armazenamento durável.

Amazon DynamoDB
Amazon DynamoDB é um serviço de banco de dados NoSQL totalmente gerenciado que oferece desempenho previsível e alta escalabilidade. DynamoDB Streams é um recurso que captura todas as alterações feitas nas tabelas do DynamoDB, permitindo que aplicações consumam e processem essas alterações em tempo real. Isso é especialmente útil para implementar o padrão Transactional Outbox, pois permite que as mudanças sejam monitoradas e processadas de forma assíncrona.

MongoDB
MongoDB é um banco de dados NoSQL orientado a documentos que utiliza documentos semelhantes a JSON com esquemas. Ele é altamente escalável e flexível, permitindo que desenvolvedores armazenem dados em estruturas complexas de maneira eficiente. MongoDB é amplamente utilizado para aplicações que exigem grande flexibilidade e desempenho em consultas, como aplicações web e móveis.

MongoDB Change Streams
O MongoDB Change Streams permite que as aplicações recebam notificações em tempo real sobre mudanças em documentos e coleções no banco de dados. Isso é particularmente útil para construir sistemas reativos e pipelines de dados que precisam processar eventos à medida que eles ocorrem no banco de dados.

Streams
Streams são fluxos de dados contínuos que podem ser processados em tempo real. Em sistemas de mensageria e banco de dados, streams permitem a captura e processamento de eventos, como alterações em registros de banco de dados ou mensagens publicadas em um tópico.

Implementação Prática

Vamos implementar uma aplicação simples em Go que utiliza o padrão Transactional Outbox para garantir a consistência de dados. Nessa aplicação, teremos um caso de uso onde iremos processar um pagamento. Para realizar essa ação, vamos receber os dados de pagamento, como número de cartão de crédito, data de expiração, CVV, nome do dono do cartão e um ID da ordem de compra. Nesta aplicação, o que vamos fazer é chamar um gateway de pagamento, para realizar o processamento do pagamento.

Caso o gateway retorne com sucesso, vamos emitir um evento informando que o pagamento foi processado com sucesso. Caso o gateway retorne com erro, vamos emitir outro evento dizendo que ocorreu uma falha no pagamento, informando no evento o motivo da falha, literalmente a mensagem de erro. Para isso, vamos iniciar com o seguinte código.

type (
    ProcessPaymentUseCase struct {
        eventEmitter   event.Emitter
        paymentGateway payment.Gateway
    }

    Input struct {
        PurchaseId         string
        Amount             float64
        CardNumber         string
        CardHolderName     string
        CardExpirationDate string
        CardCVV            string
    }
)

func New(eventEmitter event.Emitter, paymentGateway payment.Gateway) *ProcessPaymentUseCase {
    return &ProcessPaymentUseCase{eventEmitter: eventEmitter, paymentGateway: paymentGateway}
}

func (uc *ProcessPaymentUseCase) Execute(input Input) error {
    paymentInput := payment.Input{
        CardNumber:         input.CardNumber,
        CardHolderName:     input.CardHolderName,
        CardExpirationDate: input.CardExpirationDate,
        CardCVV:            input.CardCVV,
        Amount:             input.Amount,
    }
    paymentOutput, err := uc.paymentGateway.Pay(paymentInput)
    if err != nil {
        return uc.eventEmitter.Emit(events.NewPaymentFailedEvent(input.PurchaseId, err.Error()))
    }
    return uc.eventEmitter.Emit(events.NewPaymentProcessedEvent(input.PurchaseId, paymentOutput.TransactionId))
}
Enter fullscreen mode Exit fullscreen mode

Temos o nosso caso de uso, onde ele recebe duas dependências: a primeira é o EventEmitter e a segunda é o PaymentGateway. O EventEmitter é uma interface com a seguinte forma:


type Emitter interface {
    Emit(event *events.Event) error
}

type Event struct {
    ID      string            `json:"id,omitempty"`
    Name    string            `json:"name,omitempty"`
    Payload map[string]string `json:"payload,omitempty"`
}
Enter fullscreen mode Exit fullscreen mode

Já o PaymentGateway tem a seguinte forma:

type (
    Input struct {
        CardNumber         string
        CardHolderName     string
        CardExpirationDate string
        CardCVV            string
        Amount             float64
    }

    Output struct {
        TransactionId string
    }

    Gateway interface {
        Pay(payment Input) (*Output, error)
    }
)

Enter fullscreen mode Exit fullscreen mode

Para criar uma implementação fictícia, vamos criar duas implementações do PaymentGateway. Uma que aceita pagamentos acima de R$ 20,00 e outra que aceita pagamentos apenas abaixo de R$ 20,00, como o código a seguir:


type MasterCardPaymentGateway struct{}

func (m *MasterCardPaymentGateway) Pay(input payment.Input) (*payment.Output, error) {
    if input.Amount <= 20 {
        return nil, errors.New("amount too high")
    }
    return &payment.Output{
        TransactionId: uuid.NewString(),
    }, nil
}

type VisaPaymentGateway struct{}

func (v *VisaPaymentGateway) Pay(input payment.Input) (*payment.Output, error) {
    if input.Amount > 20 {
        return nil, errors.New("amount too high")
    }
    return &payment.Output{
        TransactionId: uuid.NewString(),
    }, nil
}
Enter fullscreen mode Exit fullscreen mode

Então, como o intuito não é realmente enviar essa mensagem para o sistema de mensageria aqui, mas apenas salvar essa mensagem no banco de dados, temos uma implementação que faz exatamente isso. Simplesmente salvamos esse evento no banco. Utilizando o padrão repositório, temos o OutboxRepository, com duas implementações para este nosso artigo. A primeira utilizando MongoDB, a segunda utilizando DynamoDB. Abaixo, então, vemos o código tanto da implementação do EventEmitter como do OutboxRepository.

type OutboxEventEmitter struct {
    outboxRepository repository.OutboxRepository
}

func NewOutboxEventEmitter(outboxRepository repository.OutboxRepository) *OutboxEventEmitter {
    return &OutboxEventEmitter{outboxRepository: outboxRepository}
}

func (d *OutboxEventEmitter) Emit(event *events.Event) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }
    outbox := repository.NewOutbox(event.ID, event.Name, string(payload))
    return d.outboxRepository.Save(outbox)
}
Enter fullscreen mode Exit fullscreen mode

Outbox

type (
    Outbox struct {
        Id          string     `json:"id" bson:"_id"`
        Name        string     `json:"name" bson:"name"`
        Payload     string     `json:"payload" bson:"payload"`
        Status      string     `json:"status" bson:"status"`
        CreatedAt   time.Time  `json:"created_at" bson:"created_at"`
        ProcessedAt *time.Time `json:"processed_at" bson:"processed_at"`
    }

    OutboxRepository interface {
        Save(outbox *Outbox) error
    }
)

func NewOutbox(id, name, payload string) *Outbox {
    return &Outbox{
        Id:        id,
        Name:      name,
        Payload:   payload,
        Status:    "PENDING",
        CreatedAt: time.Now(),
    }
}
Enter fullscreen mode Exit fullscreen mode

Implementação do outbox repository com MongoDB

type mongoOutboxRepository struct {
    collection *mongo.Collection
}

func NewMongoOutboxRepository(collection *mongo.Collection) OutboxRepository {
    return &mongoOutboxRepository{collection: collection}
}

func (r *mongoOutboxRepository) Save(outbox *Outbox) error {
    _, err := r.collection.InsertOne(context.TODO(), outbox)
    return err
}
Enter fullscreen mode Exit fullscreen mode

Implementação do outbox repository com DynamoDB


type dynamoDBOutboxRepository struct {
    tableName    string
    dynamoClient *dynamodb.DynamoDB
}

func NewDynamoDBOutboxRepository(tableName string, dynamoClient *dynamodb.DynamoDB) OutboxRepository {
    return &dynamoDBOutboxRepository{tableName: tableName, dynamoClient: dynamoClient}
}

func (r *dynamoDBOutboxRepository) Save(outbox *Outbox) error {
    item, err := dynamodbattribute.MarshalMap(outbox)
    if err != nil {
        return err
    }
    input := &dynamodb.PutItemInput{
        TableName: aws.String(r.tableName),
        Item:      item,
    }
    _, err = r.dynamoClient.PutItem(input)
    return err
}
Enter fullscreen mode Exit fullscreen mode

E com isso, temos o nosso serviço pronto. Dentro das suas responsabilidades, ele fez tudo o que estava proposto. Para verificarmos, temos o seguinte código de exemplo:

func main() {
    outboxRepository := mongoOutboxRepository()
    outboxEventEmitter := events.NewOutboxEventEmitter(outboxRepository)
    paymentGateway := &gateway.VisaPaymentGateway{}
    processPayment := process_payment.New(outboxEventEmitter, paymentGateway)
    input := process_payment.Input{
        PurchaseId:         uuid.NewString(),
        Amount:             10,
        CardNumber:         "1234123412341234",
        CardHolderName:     "Any name",
        CardExpirationDate: "10/2024",
        CardCVV:            "123",
    }
    err := processPayment.Execute(input)
    if err != nil {
        slog.Error("Payment process is failed", err)
        return
    }
    slog.Info("Payment process is done")
}
Enter fullscreen mode Exit fullscreen mode

Agora então, caso executemos o programa utilizando o MongoDB, podemos ver o registro inserido na coleção do MongoDB. O mesmo vale para o DynamoDB.

Envio das mensagens para o serviço de mensageria

Agora, então, temos que ter alguma forma de ler esses dados da tabela e enviá-los realmente para o serviço de mensageria. Neste caso, vamos utilizar o Kafka e o RabbitMQ. Para isso, vamos criar uma outra aplicação que também terá o Outbox repositório. Terá um emissor de evento para o RabbitMQ e um para o Kafka, e terá que, de alguma forma, obter os dados tanto do MongoDB quanto do DynamoDB, dependendo da estratégia.

Para isso, vamos definir uma interface chamada OutboxStream. Essa interface terá o método FetchEvents, que nos devolverá um canal do Go, onde, nesse canal, teremos o ID do registro que foi inserido ou do registro que deve ser processado neste momento. E, então, teremos uma implementação dessa interface para o MongoDBStreams e uma para o DynamoDBStreams.

type OutboxStream interface {
    FetchEvents() (chan string, error)
}
Enter fullscreen mode Exit fullscreen mode

Uma vez que temos este canal de eventos, podemos percorrer este canal e, para cada item que estiver no canal, podemos obter o registro que está no valor de dados a partir do repositório. Então, começamos o processamento, verificando se o registro já foi processado. Se já foi processado, simplesmente o ignoramos. Agora, se ele não foi processado ou foi processado com erro, seguimos com o processamento.

O processamento consiste basicamente em obter os dados do evento que estão no próprio registro, no campo payload, e, a partir desses dados, enviar ao sistema de mensageria normalmente. Caso o sistema de mensageria retorne algum erro, marcamos esse registro como falho, com erro, e salvamos novamente no banco para ser processado depois. Caso o envio da mensagem para o sistema de mensageria tenha sido bem-sucedido, marcamos esse registro como processado e salvamos no banco de dados.

func main() {
    eventEmitter := NewRabbitMqEventEmitter(RabbitMqServer)
    outboxRepository, outboxStream := mongoOutbox()
    outboxHandler := NewOutboxHandler(outboxRepository, eventEmitter)

    events, err := outboxStream.FetchEvents()
    if err != nil {
        panic(err)
    }

    for id := range events {
        outbox, err := outboxRepository.Get(id)
        if err != nil {
            continue
        }
        outboxHandler.Handle(outbox)
    }
}
Enter fullscreen mode Exit fullscreen mode

Struct responsável pelo processamento do registro


type OutboxHandler struct {
    outboxRepository OutboxRepository
    eventEmitter     EventEmitter
}

func NewOutboxHandler(outboxRepository OutboxRepository, eventEmitter EventEmitter) *OutboxHandler {
    return &OutboxHandler{outboxRepository: outboxRepository, eventEmitter: eventEmitter}
}

func (handler OutboxHandler) Handle(outbox *Outbox) {
    if outbox == nil || outbox.Status == "PROCESSED" {
        return
    }
    var messageEvent Event
    err := json.Unmarshal([]byte(outbox.Payload), &messageEvent)
    if err != nil {
        _ = handler.outboxRepository.Update(outbox)
        slog.Error("Error unmarshalling message event: " + err.Error())
        return
    }
    err = handler.eventEmitter.Emit(&messageEvent)
    if err != nil {
        outbox.MarkAsError()
        _ = handler.outboxRepository.Update(outbox)
        return
    }
    outbox.MarkAsProcessed()
    _ = handler.outboxRepository.Update(outbox)
}
Enter fullscreen mode Exit fullscreen mode

Portanto, agora, a única coisa que nos resta é criar a implementação para o Outbox Stream, tanto para o DynamoDB Stream quanto para o MongoDB Stream. Para isso, vamos começar, primeiramente, com o DynamoDB.

DynamoDB Streams

Aqui, basicamente, temos que entender, então, como o SDK da AWS para Go nos permite consumir os dados do stream. Abaixo, temos o código para consumir esses dados do stream

type DynamoStream struct {
    dynamoStreamClient *dynamodbstreams.DynamoDBStreams
    awsSession         *session.Session
    tableName          string
    dynamoDB           *dynamodb.DynamoDB
}

func NewDynamoStream(awsSession *session.Session, tableName string, dynamoDB *dynamodb.DynamoDB) OutboxStream {
    return &DynamoStream{
        dynamoStreamClient: dynamodbstreams.New(awsSession),
        dynamoDB:           dynamoDB,
        awsSession:         awsSession,
        tableName:          tableName,
    }
}

func (stream *DynamoStream) getStreamArn() (string, error) {
    result, err := stream.dynamoDB.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String(stream.tableName)})
    if err != nil {
        return "", err
    }
    if result.Table.StreamSpecification != nil && *result.Table.StreamSpecification.StreamEnabled {
        return *result.Table.LatestStreamArn, nil
    }
    return "", fmt.Errorf("streams not enabled for table %s", stream.tableName)
}

func (stream *DynamoStream) FetchEvents() (chan string, error) {
    streamArn, err := stream.getStreamArn()
    if err != nil {
        return nil, err
    }
    events := make(chan string)
    describeStreamInput := &dynamodbstreams.DescribeStreamInput{StreamArn: aws.String(streamArn)}
    describeStreamOutput, err := stream.dynamoStreamClient.DescribeStream(describeStreamInput)
    if err != nil {
        return nil, err
    }
    for _, shard := range describeStreamOutput.StreamDescription.Shards {
        go stream.processShard(*shard.ShardId, events, streamArn)
    }
    return events, nil
}

func (stream *DynamoStream) processShard(shardID string, events chan<- string, streamArn string) {
    shardIteratorInput := &dynamodbstreams.GetShardIteratorInput{
        StreamArn:         aws.String(streamArn),
        ShardId:           aws.String(shardID),
        ShardIteratorType: aws.String(dynamodbstreams.ShardIteratorTypeTrimHorizon),
    }

    shardIteratorOutput, err := stream.dynamoStreamClient.GetShardIterator(shardIteratorInput)
    if err != nil {
        return
    }

    ShardIterator := shardIteratorOutput.ShardIterator
    backoff := time.Second

    for {
        getRecordsInput := &dynamodbstreams.GetRecordsInput{ShardIterator: ShardIterator}
        records, err := stream.dynamoStreamClient.GetRecords(getRecordsInput)
        if err != nil {
            continue
        }

        for _, record := range records.Records {
            id := record.Dynamodb.NewImage["id"].S
            status := record.Dynamodb.NewImage["status"].S
            if *record.EventName == "INSERT" {
                backoff = time.Second
                events <- *id
            } else if *record.EventName == "MODIFY" && *status == "ERROR" {
                backoff = time.Second
                go func(id string) {
                    time.Sleep(5 * time.Second)
                    events <- id
                }(*id)
            }
        }
        ShardIterator = records.NextShardIterator
        time.Sleep(backoff)

        if backoff < 30*time.Second {
            backoff *= 2
        } else {
            backoff = 30 * time.Second
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Analisando o código, primeiramente recebemos o nome da tabela do DynamoDB e, a partir disso, obtemos o ARN do stream. Para isso, temos que habilitar o uso do stream dentro do DynamoDB. A partir daí, temos basicamente um loop infinito que fica obtendo novos registros que podem existir no DynamoDB Stream. Podemos percorrer esses registros obtendo informações sobre o registro em si.

No nosso caso, temos o ID do registro e o status. Além disso, podemos obter a informação sobre o que aconteceu com o registro, tendo as opções de Insert, Modify e Remove. No nosso caso, sempre que um novo registro for inserido, devemos pegar o ID desse registro e adicioná-lo ao nosso canal. E caso o registro tenha sido modificado para o status de Error, também devemos pegar o ID desse registro e inseri-lo no nosso canal.

Um detalhe é que, sempre que um registro for atualizado para o status de Error, neste nosso evento, eu optei por esperar 5 segundos fixos antes de uma nova tentativa, mas podemos evoluir para estratégias de retentativas mais elaboradas.

Mas para também implementar uma regra de busca, como estamos em um loop infinito, fazer essa consulta sem um delay pode fazer com que a própria AWS nos bloqueie por questões de rate limit. Então optei por utilizar uma estratégia de back-off, onde inicialmente temos um delay de um segundo, e a cada iteração que não encontra nenhum dado no stream, multiplicamos esse delay por dois.

Na primeira iteração o delay será de um segundo, depois dois, quatro, oito, dezesseis, trinta e dois. Esse é o tempo que será esperado em segundos. Assim que esse valor ultrapassar trinta segundos, então mantemos um back-off fixo de trinta segundos. Portanto, vamos esperar exponencialmente, com um tempo máximo de trinta segundos.

MongoDB Streams

Agora, para o MongoDB, podemos utilizar o método Watch vindo da Collection para sermos notificados sempre que algo acontecer. Esse "algo" nós definimos como filtro do Watch através de uma Pipeline do MongoDB. No nosso caso, vamos tratar de duas Pipelines.

A primeira Pipeline é onde o tipo da operação seja Insert. Então, sempre que houver a inserção de um novo registro nessa Collection, vamos ser notificados. A segunda Pipeline é onde o tipo de operação seja Update e o status do registro que foi atualizado seja Error. Isso indica que o registro foi atualizado para Error, e então vamos ser notificados também. Seguimos a mesma regra que temos na implementação do DynamoDB, esperando 5 segundos no caso de Error antes de mandar para o processamento.

Além disso, ao contrário do DynamoDB Streams, o MongoDB Streams não reprocessa todos os eventos que já aconteceram, apenas novos eventos ou novas operações. Isso significa que dados que estavam esperando para serem processados no banco antes da aplicação subir não serão levados em consideração pelo Stream.

Portanto, assim que a aplicação subir, vamos fazer uma consulta na tabela buscando pelos registros que não foram processados ainda. Esses registros serão então adicionados ao canal.

Com isso, temos o código a seguir:

type MongoStream struct {
    collection *mongo.Collection
}

func NewMongoStream(collection *mongo.Collection) *MongoStream {
    return &MongoStream{collection: collection}
}

func (stream *MongoStream) FetchEvents() (chan string, error) {
    ch := make(chan string)
    go stream.consumeExistingEvents(ch)
    go stream.consumeErrorEvents(ch)
    go stream.consumeNewEvents(ch)
    return ch, nil
}

func (stream *MongoStream) consumeExistingEvents(ch chan string) {
    cursor, err := stream.collection.Find(context.TODO(), bson.M{"status": bson.M{"$ne": "PROCESSED"}})
    if err != nil {
        log.Fatalf("Failed to find existing events: %v", err)
    }
    defer cursor.Close(context.TODO())

    for cursor.Next(context.TODO()) {
        var outbox Outbox
        if err := cursor.Decode(&outbox); err != nil {
            log.Printf("Failed to decode existing outbox: %v", err)
            continue
        }
        ch <- outbox.Id
    }

    if err := cursor.Err(); err != nil {
        log.Printf("Cursor error: %v", err)
    }
}

func (stream *MongoStream) consumeNewEvents(ch chan string) {
    pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}}
    opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
    changeStream, err := stream.collection.Watch(context.TODO(), pipeline, opts)
    if err != nil {
        log.Fatalf("Failed to start change stream: %v", err)
    }

    defer changeStream.Close(context.TODO())
    defer close(ch)

    for changeStream.Next(context.TODO()) {
        var changeEvent struct {
            DocumentKey primitive.M `bson:"documentKey,omitempty"`
        }
        if err := changeStream.Decode(&changeEvent); err != nil {
            log.Printf("Failed to decode change stream document: %v", err)
            continue
        }
        ch <- changeEvent.DocumentKey["_id"].(string)
    }

    if err := changeStream.Err(); err != nil {
        log.Printf("Change stream error: %v", err)
    }
}

func (stream *MongoStream) consumeErrorEvents(ch chan string) {
    pipeline := mongo.Pipeline{bson.D{
        {"$match", bson.D{
            {"operationType", "update"},
            {"fullDocument.status", "ERROR"},
        }},
    }}
    opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
    changeStream, err := stream.collection.Watch(context.TODO(), pipeline, opts)
    if err != nil {
        log.Fatalf("Failed to start change stream: %v", err)
    }

    defer changeStream.Close(context.TODO())
    defer close(ch)

    for changeStream.Next(context.TODO()) {
        var changeEvent struct {
            DocumentKey primitive.M `bson:"documentKey,omitempty"`
        }
        if err := changeStream.Decode(&changeEvent); err != nil {
            log.Printf("Failed to decode change stream document: %v", err)
            continue
        }
        go func() {
            time.Sleep(time.Second * 5)
            ch <- changeEvent.DocumentKey["_id"].(string)
        }()
    }

    if err := changeStream.Err(); err != nil {
        log.Printf("Change stream error: %v", err)
    }
}
Enter fullscreen mode Exit fullscreen mode

E por fim, para fecharmos o ciclo, temos a implementação para o envio do evento tanto para o Kafka quanto para o RabbitMQ.

KafkaEventEmitter

type KafkaEventEmitter struct {
    writer *kafka.Writer
}

func NewKafkaEventEmitter(brokers []string, topic string) *KafkaEventEmitter {
    return &KafkaEventEmitter{
        writer: &kafka.Writer{
            Addr:     kafka.TCP(brokers...),
            Topic:    topic,
            Balancer: &kafka.LeastBytes{},
        },
    }
}

func (k *KafkaEventEmitter) Emit(event *Event) error {
    eventBytes, err := json.Marshal(event)
    if err != nil {
        slog.Error("Error on emit event", "event", event, "error", err)
        return err
    }
    message := kafka.Message{
        Value: eventBytes,
        Topic: event.Name,
        Key:   []byte(event.ID),
    }
    return k.writer.WriteMessages(context.Background(), message)
}
Enter fullscreen mode Exit fullscreen mode

RabbitMqEventEmitter

type RabbitMqEventEmitter struct {
    connection      *amqp.Connection
    producerChannel *amqp.Channel
}

func NewRabbitMqEventEmitter(server string) EventEmitter {
    connection, err := amqp.Dial(server)
    if err != nil {
        panic(err)
    }
    producerChannel, err := connection.Channel()
    if err != nil {
        panic(err)
    }
    return &RabbitMqEventEmitter{
        connection:      connection,
        producerChannel: producerChannel,
    }
}

func (e *RabbitMqEventEmitter) Emit(event *Event) error {
    eventBytes, err := json.Marshal(event)
    if err != nil {
        slog.Error("Error on emit event", "event", event, "error", err)
        return err
    }
    err = e.producerChannel.Publish(
        "amq.direct",
        event.Name,
        false,
        false,
        amqp.Publishing{ContentType: "text/plain", Body: eventBytes},
    )
    if err != nil {
        slog.Error("Error on publish event", "event", event, "error", err)
        return err
    }
    return nil
}
Enter fullscreen mode Exit fullscreen mode

E com isso, finalizamos a aplicação que irá fazer o processamento dos eventos que foram inseridos na nossa tabela. Em caso de falha na publicação no serviço de mensageria, o evento permanece com status de erro para ser processado posteriormente. Podemos ainda definir estratégias para um número limite de tentativas de publicação do evento e, caso esse número seja atingido, emitir notificações via e-mail ou algum alerta em um sistema de monitoramento para que uma equipe veja o motivo do erro.

Pensando em uma grande empresa, pode ser desenvolvido um SDK interno para abstrair a publicação do evento, que no nosso caso é o envio do evento para uma tabela, para que todos os micro-serviços não precisem conhecer os detalhes do DynamoDB, MongoDB ou do lugar onde realmente está sendo salvo.

Evoluções

Uma evolução interessante a ser feita é no próprio registro do evento, tendo também dados referentes ao serviço de mensageria específico. Por exemplo, se estamos utilizando um tópico no Kafka, a informação do nome do tópico poderia estar no evento. Ou, se estamos utilizando o RabbitMQ, no evento poderíamos ter o nome da Exchange e a routing key, que seriam utilizadas para o envio da mensagem. Se o intuito da mensagem é ser publicada no SQS, por exemplo, podemos ter o ARN ou qualquer informação que identifique a fila SQS onde a mensagem deve ser publicada.

Neste artigo abordamos isso de forma bem simples, onde o processador identifica qual o destino da mensagem, mas poderíamos evoluir para que essas informações estivessem no registro que foi salvo no banco de dados também.

Conclusão

O uso do padrão Transactional Outbox traz uma segurança muito maior para a resiliência das transações. Com o uso deste padrão, temos a opção de manter um registro de todos os eventos que ocorreram, o que pode ser extremamente útil para fins de auditoria.

A capacidade de armazenar de maneira consistente os eventos e garantir que eles sejam processados corretamente pelos sistemas de mensageria contribui para a robustez do sistema. Além disso, esse padrão permite a recuperação e reprocessamento de eventos em caso de falhas temporárias, aumentando a confiabilidade da aplicação.

A escolha entre DynamoDB, MongoDB ou qualquer outro serviço para implementar esse padrão pode depender de vários fatores, incluindo a familiaridade da equipe com a tecnologia, requisitos de escalabilidade e o ecossistema ao redor. Ambas as soluções oferecem mecanismos de streaming que podem ser utilizados para garantir a consistência de dados e facilitar a construção de sistemas resilientes.

Espero que este artigo tenha fornecido uma visão clara e detalhada sobre como implementar o padrão Transactional Outbox, juntamente com exemplos de código e considerações práticas para diferentes cenários de uso.

Repositório do Github

Top comments (0)