DEV Community

Railander Marques
Railander Marques

Posted on

Stream Dynamo para RDS Mysql

Visão Geral

Esta implementação tem como objetivo capturar eventos de alteração na tabela DynamoDB “account_answers_full” (atraves de lambda) e replicar esses eventos para um banco de dados relacional RDS (MySQL). Para isso, utiliza duas funções Lambda que se comunicam através de uma fila SQS da AWS.

Uma Lambda captura eventos de alteração na tabela DynamoDB 'account_answers_full' e os envia como mensagens para uma fila SQS, utilizando uma DLQ para gerenciar mensagens não processadas e permitir o reprocessamento dos dados em caso de falhas temporárias. Em seguida, outra Lambda consome essas mensagens da SQS, estabelece conexão com o RDS via ENI (recomenda-se utilizar RDS Proxy para gerenciar essas conexões), processa e executa operações correspondentes no banco de dados RDS MySQL.

Image description


Passo 1: DynamoDB para SQS

Image description

  1. Tabela DynamoDB:
    • Nome: account_answers_full
    • Índices Globais Secundários:
      • uid-question-index: uid (Número), question (Número)
      • uid-section_id-index: uid (Número), section_id (Número)
      • uid-usection_id-index: uid (Número), usection_id (Número)\n
  2. Trigger Lambda 1 (dynamo-to-sqs-connector):
    • Esta Lambda é acionada por eventos de alteração na tabela DynamoDB (INSERT, MODIFY, REMOVE).
    • Extrai os detalhes do evento, empacota-os em uma mensagem e os envia para uma fila SQS ==(Use fila FIFO, fila standard pode causar problemas de duplicação)==.
  • Código da Lambda 1:
const { SendMessageCommand, SQSClient } = require("@aws-sdk/client-sqs");

const sqs = new SQSClient({});
const queueUrl = process.env.SQS_QUEUE_URL; // VARIÁVEL DE AMBIENTE COM SQS FIFO - SOMENTE FIFO FUNCIONA NESSE CÓDIGO

exports.handler = async (event) => {
  for (const record of event.Records) {
    if (['INSERT', 'MODIFY', 'REMOVE'].includes(record.eventName)) {
      try {
        const item = record.dynamodb.NewImage ?? record.dynamodb.OldImage
        const message = {
          eventName: record.eventName,
          dynamodb: record.dynamodb
        };

        // Usando usection_id, section_id e question como parte do MessageGroupId
        const usection_id = item.usection_id.N;
        const section_id = item.section_id.N;
        const question = item.question.N;
        const messageGroupId = `usection-${usection_id}-section-${section_id}-question-${question}`;


        const command = new SendMessageCommand({
          QueueUrl: queueUrl,
          MessageBody: JSON.stringify(message),
          MessageGroupId: messageGroupId
        });

        await sqs.send(command);

        console.info(`Message sent to SQS: ${JSON.stringify(message)}`);
      } catch (error) {
        console.error(`Error sending message to SQS: ${error.message}`, { error });
      }
    }
  }

  console.info('Lambda execution completed');
};
Enter fullscreen mode Exit fullscreen mode

[lambda_1_eprep-9c1a3420-b362-42e0-8493-033e3e0e99b1.zip 740]
https://github.com/aldeiacloud/StreamDynamoMysql/raw/refs/heads/main/lambda_1_eprep-9c1a3420-b362-42e0-8493-033e3e0e99b1.zip

  • Variáveis de Ambiente:

    • SQS_QUEUE_URL: URL da fila SQS para onde as mensagens são enviadas. (Perceba que tem o Endpoint).

Image description

  • Logs:

Image description

Image description

ScreenShot Trigger 1:

Image description



Passo 2: SQS para RDS

Image description

  1. Trigger Lambda 2 (sqs-to-rds-connector):
    • Acionada por mensagens na fila SQS FIFO (com DLQ) enviadas pela Lambda 1.
    • Processa as mensagens, extrai os detalhes do evento DynamoDB e executa operações correspondentes no banco de dados RDS.
  • Código da Lambda 2:
const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
const mysql = require('mysql');
const { promisify } = require('util')

// Configuração personalizada para aumentar o timeout nas tentativas de conexão
// Configuração do cliente DLQ com o endpoint HTTP
const dlqEndpointUrl = process.env.DLQ_ENDPOINT_URL;

const sqs = new SQSClient({
    maxRetries: 10,
    httpOptions: {
        timeout: 120000
    }
});

// Carrega as credenciais do RDS a partir das variáveis de ambiente
const dbConfig = {
    user: process.env.RDS_USERNAME,
    password: process.env.RDS_PASSWORD,
    host: process.env.RDS_HOST,
    database: process.env.RDS_DATABASE,
    port: process.env.RDS_PORT
};

// Função para conectar ao banco de dados MySQL com tentativas de retry
async function connectToMySQL(retryAttempts = 5, delay = 120000) {
    for (let attempt = 0; attempt < retryAttempts; attempt++) {
        try {
            const connection = mysql.createConnection(dbConfig);
            await promisify(connection.connect).call(connection);
            return connection;
        } catch (err) {
            console.error("Error connecting to MySQL", { error: err.message });
            if (attempt < retryAttempts - 1) {
                console.info(`Retrying in ${delay / 1000} seconds...`);
                await new Promise(res => setTimeout(res, delay));
            } else {
                throw err;
            }
        }
    }
}

// Função para processar um registro do SQS
async function processRecord(record) {
    let connection;
    try {
        console.info("Processing SQS message:", record);

        const messageBody = JSON.parse(record.body);
        console.info(`Parsed message body: ${record.body}`);

        const eventName = messageBody.eventName;
        const dynamodb = messageBody.dynamodb;

        const keys = dynamodb.Keys;
        const newImage = dynamodb?.NewImage || {};

        const usectionId = keys.usection_id.N;
        const question = keys.question.N;
        const sectionId = newImage?.section_id ? newImage?.section_id?.N : "";

        connection = await connectToMySQL();
        const query = promisify(connection.query).bind(connection);

        let sqlScript, sqlValue;

        // Realiza a operação correspondente com base no evento
        if (eventName === "INSERT") {
            sqlScript = "INSERT INTO account_answers (usection_id, question, section_id) VALUES (?, ?, ?)";
            sqlValue = [usectionId, question, sectionId];
            await query(sqlScript, sqlValue);
            console.info("Record inserted successfully into account_answers table", { sqlValue });

        } else if (eventName === "MODIFY") {
            sqlScript = "UPDATE account_answers SET section_id = ? WHERE usection_id = ? AND question = ?";
            sqlValue = [sectionId, usectionId, question];
            await query(sqlScript, sqlValue);
            console.info("Record modified successfully in account_answers table", { sqlValue });

        } else if (eventName === "REMOVE") {
            sqlScript = "DELETE FROM account_answers WHERE usection_id = ? AND question = ?";
            sqlValue = [usectionId, question];
            await query(sqlScript, sqlValue);
            console.info("Record removed successfully from account_answers table", { sqlValue });
        }

    } catch (err) {
        console.error("Error processing SQS message", JSON.stringify(err));
        throw new Error(err ?? "Unknow error processing SQS message");
    } finally {
        if (connection) {
            connection.end();
        }
    }
}

// Função para reprocessar mensagens da DLQ com timeout e intervalo entre tentativas
async function reprocessDlqMessages(timeoutSeconds = 300, retryIntervalSeconds = 1) {
    try {
        console.info("Starting reprocessing of DLQ messages");

        const startTime = Date.now();

        while (true) {
            // Verifica se o tempo máximo de execução foi atingido
            if ((Date.now() - startTime) > (timeoutSeconds * 1000)) {
                console.info("Timeout reached for DLQ reprocessing");
                break;
            }

            try {
                const command = new ReceiveMessageCommand({
                    QueueUrl: dlqEndpointUrl,
                    MaxNumberOfMessages: 10,
                    WaitTimeSeconds: 10
                });

                const response = await sqs.send(command)

                const messages = response.Messages || [];
                if (messages.length === 0) {
                    console.info("No more messages in DLQ");
                    break;
                }

                for (const message of messages) {
                    try {
                        await processWithRetry(message)
                        console.log('deu certo o retry dlq')
                        const command = new DeleteMessageCommand({
                            QueueUrl: dlqEndpointUrl,
                            ReceiptHandle: message.ReceiptHandle
                        });
                        await sqs.send(command);
                    } catch (err) {
                        console.error("Failed to reprocess message from DLQ", { message, error: err.message });
                    }
                }

                // Aguarda antes de tentar a próxima iteração para evitar sobrecarga no endpoint
                await new Promise(res => setTimeout(res, retryIntervalSeconds * 1000));

            } catch (err) {
                console.error("Error receiving messages from DLQ", { error: err.message });
                await new Promise(res => setTimeout(res, retryIntervalSeconds * 1000));  // Espera antes de tentar novamente em caso de falha
            }
        }

        console.info("Reprocessing of DLQ messages completed");

    } catch (err) {
        console.error("Error reprocessing DLQ messages", { error: err.message });
        throw err;
    }
}

// Função para processar um registro com tentativas de retry
async function processWithRetry(record, retries = 3, backoffInSeconds = 2) {
    let attempt = 0;
    while (attempt < retries) {
        try {
            await processRecord(record);
            console.log('sucesso')
            return;
        } catch (err) {
            attempt++;
            console.error(`Attempt ${attempt} failed: ${JSON.stringify(err)}`);
            if (attempt < retries) {
                await new Promise(res => setTimeout(res, backoffInSeconds * 1000 * (2 ** (attempt - 1))));  // Exponential backoff
            }else {
                throw err;
            }
        }
    }
}

// Função Lambda Handler
exports.handler = async (event, context) => {
    console.info("Starting the lambda_handler function for SQS to RDS");

    try {
        for (const record of event.Records) {
            try {
                await processWithRetry(record);
            } catch (err) {
                // Envia mensagem para DLQ para reprocessamento posterior
                console.error("Failed to process record after retries, sending to DLQ");
                try {
                    const command = new SendMessageCommand({
                        QueueUrl: dlqEndpointUrl,
                        MessageBody: JSON.stringify(record)
                    });

                    await sqs.send(command)
                } catch (err) {
                    console.error("Failed to send message to DLQ", { error: err.message });
                }
            }
        }

        // Reprocessa mensagens da DLQ se houver
        await reprocessDlqMessages();

    } catch (err) {
        console.error("Error processing records from SQS", { error: err.message });
    }

    console.info("lambda_handler function execution completed for SQS to RDS");
};
Enter fullscreen mode Exit fullscreen mode

[ae3075c0-896a-4473-acc2-b0e151a1a280.zip 2628477]
https://github.com/aldeiacloud/StreamDynamoMysql/raw/refs/heads/main/ae3075c0-896a-4473-acc2-b0e151a1a280.zip


:::info
Explicação e Resumo do Código

:::

1. Importações e Configurações Iniciais

const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
const mysql = require('mysql');
const { promisify } = require('util');
Enter fullscreen mode Exit fullscreen mode
  • SQSClient e comandos: Importa classes para interagir com o SQS, que é um serviço de fila gerenciado da AWS.
  • mysql: Para conexão com um banco de dados MySQL.
  • promisify: Utilizado para transformar funções que usam callbacks em funções que retornam promessas, facilitando o uso de async/await.

2. Configuração do Cliente SQS


const dlqEndpointUrl = process.env.DLQ_ENDPOINT_URL;

const sqs = new SQSClient({
    maxRetries: 10,
    httpOptions: {
        timeout: 120000
    }
});
Enter fullscreen mode Exit fullscreen mode
  • dlqEndpointUrl: URL da Dead Letter Queue (DLQ), onde mensagens falhas são enviadas.
  • SQSClient: Configura o cliente com um número máximo de tentativas e um timeout para requisições HTTP.

3. Configuração do Banco de Dados


const dbConfig = {
    user: process.env.RDS_USERNAME,
    password: process.env.RDS_PASSWORD,
    host: process.env.RDS_HOST,
    database: process.env.RDS_DATABASE,
    port: process.env.RDS_PORT
};
Enter fullscreen mode Exit fullscreen mode
  • Carrega as credenciais do RDS (Relational Database Service) a partir de variáveis de ambiente.

4. Conexão ao Banco de Dados


async function connectToMySQL(retryAttempts = 5, delay = 120000) {
    // Tenta conectar ao MySQL com múltiplas tentativas
}
Enter fullscreen mode Exit fullscreen mode
  • Tenta criar uma conexão com o banco de dados e, se falhar, aguarda um tempo antes de tentar novamente.

5. Processamento de Mensagens do SQS


async function processRecord(record) {
    let connection;
    try {
        const messageBody = JSON.parse(record.body);
        const eventName = messageBody.eventName;
        const dynamodb = messageBody.dynamodb;
        const keys = dynamodb.Keys;
        const newImage = dynamodb?.NewImage || {};

        const usectionId = keys.usection_id.N;
        const question = keys.question.N;
        const sectionId = newImage?.section_id ? newImage?.section_id?.N : "";

        connection = await connectToMySQL();
        const query = promisify(connection.query).bind(connection);

        let sqlScript, sqlValue;

        // Operações com base no tipo de evento
        if (eventName === "INSERT") {
            sqlScript = "INSERT INTO account_answers (usection_id, question, section_id) VALUES (?, ?, ?)";
            sqlValue = [usectionId, question, sectionId];
            await query(sqlScript, sqlValue);
            console.info("Record inserted successfully");

        } else if (eventName === "MODIFY") {
            sqlScript = "UPDATE account_answers SET section_id = ? WHERE usection_id = ? AND question = ?";
            sqlValue = [sectionId, usectionId, question];
            await query(sqlScript, sqlValue);
            console.info("Record modified successfully");

        } else if (eventName === "REMOVE") {
            sqlScript = "DELETE FROM account_answers WHERE usection_id = ? AND question = ?";
            sqlValue = [usectionId, question];
            await query(sqlScript, sqlValue);
            console.info("Record removed successfully");
        }

    } catch (err) {
        console.error("Error processing SQS message", JSON.stringify(err));
        throw new Error(err ?? "Unknown error processing SQS message");
    } finally {
        if (connection) {
            connection.end();
        }
    }
}
Enter fullscreen mode Exit fullscreen mode
  • processRecord: Recebe uma mensagem do SQS e processa com base no tipo de evento (INSERT, MODIFY, REMOVE).
    • INSERT: Adiciona um novo registro na tabela account_answers.
    • MODIFY: Atualiza um registro existente.
    • REMOVE: Remove um registro existente.
  • A conexão com o MySQL é encerrada após a operação.

6. Reprocessamento de Mensagens da DLQ


async function reprocessDlqMessages(timeoutSeconds = 300, retryIntervalSeconds = 1) {
    // Tenta reprocessar mensagens na DLQ
}
Enter fullscreen mode Exit fullscreen mode
  • Este método tenta ler mensagens da DLQ e processá-las novamente.
  • Se a reprocessamento falhar, loga um erro e continua tentando.

7. Tentativas de Processamento com Retry


async function processWithRetry(record, retries = 3, backoffInSeconds = 2) {
    // Tenta processar um registro várias vezes com um intervalo entre as tentativas
}
Enter fullscreen mode Exit fullscreen mode
  • Implementa um mecanismo de retry com um backoff exponencial, permitindo que o sistema tente processar mensagens que falharam antes.\n

8. Função Lambda Handler


exports.handler = async (event, context) => {
    // Função principal que é chamada quando a Lambda é acionada
}
Enter fullscreen mode Exit fullscreen mode
  • Processa cada registro recebido do SQS.
  • Se uma mensagem falhar após as tentativas, ela é enviada para a DLQ.
  • Também tenta reprocessar mensagens da DLQ ao final do processamento.

Resumo

  1. Inserção: Quando um evento de inserção ocorre, um novo registro é adicionado ao banco.
  2. Atualização: Para eventos de modificação, um registro existente é atualizado.
  3. Remoção: Para eventos de remoção, um registro específico é deletado.
  4. SQS e DLQ: Mensagens são enviadas para uma fila (SQS) e, se falharem várias vezes, são redirecionadas para uma DLQ para posterior análise.
  5. Conexões: O código gerencia as conexões ao MySQL, garantindo que sejam fechadas após o uso.

Screenshots:

Image description

Image description

Image description

Image description

Image description

Observações:
:::warning

  1. Colocar todas as permissões dos serviços necessárias, incluindo SQS, Dynamo, CloudWatch...
  2. A Lambda 2, que conecta no banco de dados, precisa ter uma ENI, para fazer a conexão (Verificar SG)
  3. Esse tutorial esta configurado para fazer insert, delete ou update em 1 tabela especifica em 3 colunas, caso necessite de mais, precisa ajustar as 2 lambdas no tratamento do SQS e no script SQL.

:::

Configuração do SQS e Teste de Carga:

Filas SQS Fifo

Image description

Fila 1 - SQS

Image description

Image description

Image description

Fila 2 - SQS DLQ

Image description

Image description

Image description


:::info
Ponderações:

:::

Lambda 2

Duração baixa e quantidade de invocação alta, 100% de sucesso.

Image description

Quantidade Máxima de conexão no RDS:

Image description

Lambda 1

Percebemos um total de concurrents maior no envio do dado do Dynamo para o SQS, sem erros no envio, invocações muito altas, alimentando a fila de SQS, que contem uma outra fila DLQ para reprocessar em caso de falha temporária.

Image description


:::success
Fiz outro teste de carga, reiniciando o banco de dados antes da execução e a arquitetura conseguiu inserir as 10 mil linhas após o banco iniciar:

:::

Image description

(…)

Image description

Image description


:::success
Resultados e Observações:

Não tive problemas de muitas conexões no RDS ou problemas no SQS:

1- Endpoint SQS para evitar timeout no endpoint para conexão no SQS, tanto na DLQ tanto na principal;

2- Escolha da quantidade de concurrency da lambda 2, quanto maior, mais conexões no RDS;

3- Garantir o uso da fila do tipo Fifo com desduplicação ativa.

4- Fiz um teste reiniciando o banco de dados e os dados foram enviados para o banco após reiniciar.

:::


Top comments (0)