Visão Geral
Esta implementação tem como objetivo capturar eventos de alteração na tabela DynamoDB “account_answers_full” (atraves de lambda) e replicar esses eventos para um banco de dados relacional RDS (MySQL). Para isso, utiliza duas funções Lambda que se comunicam através de uma fila SQS da AWS.
Uma Lambda captura eventos de alteração na tabela DynamoDB 'account_answers_full' e os envia como mensagens para uma fila SQS, utilizando uma DLQ para gerenciar mensagens não processadas e permitir o reprocessamento dos dados em caso de falhas temporárias. Em seguida, outra Lambda consome essas mensagens da SQS, estabelece conexão com o RDS via ENI (recomenda-se utilizar RDS Proxy para gerenciar essas conexões), processa e executa operações correspondentes no banco de dados RDS MySQL.
Passo 1: DynamoDB para SQS
-
Tabela DynamoDB:
- Nome: account_answers_full
- Índices Globais Secundários:
- uid-question-index: uid (Número), question (Número)
- uid-section_id-index: uid (Número), section_id (Número)
- uid-usection_id-index: uid (Número), usection_id (Número)\n
-
Trigger Lambda 1 (dynamo-to-sqs-connector):
- Esta Lambda é acionada por eventos de alteração na tabela DynamoDB (INSERT, MODIFY, REMOVE).
- Extrai os detalhes do evento, empacota-os em uma mensagem e os envia para uma fila SQS ==(Use fila FIFO, fila standard pode causar problemas de duplicação)==.
- Código da Lambda 1:
const { SendMessageCommand, SQSClient } = require("@aws-sdk/client-sqs");
const sqs = new SQSClient({});
const queueUrl = process.env.SQS_QUEUE_URL; // VARIÁVEL DE AMBIENTE COM SQS FIFO - SOMENTE FIFO FUNCIONA NESSE CÓDIGO
exports.handler = async (event) => {
for (const record of event.Records) {
if (['INSERT', 'MODIFY', 'REMOVE'].includes(record.eventName)) {
try {
const item = record.dynamodb.NewImage ?? record.dynamodb.OldImage
const message = {
eventName: record.eventName,
dynamodb: record.dynamodb
};
// Usando usection_id, section_id e question como parte do MessageGroupId
const usection_id = item.usection_id.N;
const section_id = item.section_id.N;
const question = item.question.N;
const messageGroupId = `usection-${usection_id}-section-${section_id}-question-${question}`;
const command = new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(message),
MessageGroupId: messageGroupId
});
await sqs.send(command);
console.info(`Message sent to SQS: ${JSON.stringify(message)}`);
} catch (error) {
console.error(`Error sending message to SQS: ${error.message}`, { error });
}
}
}
console.info('Lambda execution completed');
};
[lambda_1_eprep-9c1a3420-b362-42e0-8493-033e3e0e99b1.zip 740]
https://github.com/aldeiacloud/StreamDynamoMysql/raw/refs/heads/main/lambda_1_eprep-9c1a3420-b362-42e0-8493-033e3e0e99b1.zip
-
Variáveis de Ambiente:
- SQS_QUEUE_URL: URL da fila SQS para onde as mensagens são enviadas. (Perceba que tem o Endpoint).
- Logs:
ScreenShot Trigger 1:
Passo 2: SQS para RDS
-
Trigger Lambda 2 (sqs-to-rds-connector):
- Acionada por mensagens na fila SQS FIFO (com DLQ) enviadas pela Lambda 1.
- Processa as mensagens, extrai os detalhes do evento DynamoDB e executa operações correspondentes no banco de dados RDS.
- Código da Lambda 2:
const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
const mysql = require('mysql');
const { promisify } = require('util')
// Configuração personalizada para aumentar o timeout nas tentativas de conexão
// Configuração do cliente DLQ com o endpoint HTTP
const dlqEndpointUrl = process.env.DLQ_ENDPOINT_URL;
const sqs = new SQSClient({
maxRetries: 10,
httpOptions: {
timeout: 120000
}
});
// Carrega as credenciais do RDS a partir das variáveis de ambiente
const dbConfig = {
user: process.env.RDS_USERNAME,
password: process.env.RDS_PASSWORD,
host: process.env.RDS_HOST,
database: process.env.RDS_DATABASE,
port: process.env.RDS_PORT
};
// Função para conectar ao banco de dados MySQL com tentativas de retry
async function connectToMySQL(retryAttempts = 5, delay = 120000) {
for (let attempt = 0; attempt < retryAttempts; attempt++) {
try {
const connection = mysql.createConnection(dbConfig);
await promisify(connection.connect).call(connection);
return connection;
} catch (err) {
console.error("Error connecting to MySQL", { error: err.message });
if (attempt < retryAttempts - 1) {
console.info(`Retrying in ${delay / 1000} seconds...`);
await new Promise(res => setTimeout(res, delay));
} else {
throw err;
}
}
}
}
// Função para processar um registro do SQS
async function processRecord(record) {
let connection;
try {
console.info("Processing SQS message:", record);
const messageBody = JSON.parse(record.body);
console.info(`Parsed message body: ${record.body}`);
const eventName = messageBody.eventName;
const dynamodb = messageBody.dynamodb;
const keys = dynamodb.Keys;
const newImage = dynamodb?.NewImage || {};
const usectionId = keys.usection_id.N;
const question = keys.question.N;
const sectionId = newImage?.section_id ? newImage?.section_id?.N : "";
connection = await connectToMySQL();
const query = promisify(connection.query).bind(connection);
let sqlScript, sqlValue;
// Realiza a operação correspondente com base no evento
if (eventName === "INSERT") {
sqlScript = "INSERT INTO account_answers (usection_id, question, section_id) VALUES (?, ?, ?)";
sqlValue = [usectionId, question, sectionId];
await query(sqlScript, sqlValue);
console.info("Record inserted successfully into account_answers table", { sqlValue });
} else if (eventName === "MODIFY") {
sqlScript = "UPDATE account_answers SET section_id = ? WHERE usection_id = ? AND question = ?";
sqlValue = [sectionId, usectionId, question];
await query(sqlScript, sqlValue);
console.info("Record modified successfully in account_answers table", { sqlValue });
} else if (eventName === "REMOVE") {
sqlScript = "DELETE FROM account_answers WHERE usection_id = ? AND question = ?";
sqlValue = [usectionId, question];
await query(sqlScript, sqlValue);
console.info("Record removed successfully from account_answers table", { sqlValue });
}
} catch (err) {
console.error("Error processing SQS message", JSON.stringify(err));
throw new Error(err ?? "Unknow error processing SQS message");
} finally {
if (connection) {
connection.end();
}
}
}
// Função para reprocessar mensagens da DLQ com timeout e intervalo entre tentativas
async function reprocessDlqMessages(timeoutSeconds = 300, retryIntervalSeconds = 1) {
try {
console.info("Starting reprocessing of DLQ messages");
const startTime = Date.now();
while (true) {
// Verifica se o tempo máximo de execução foi atingido
if ((Date.now() - startTime) > (timeoutSeconds * 1000)) {
console.info("Timeout reached for DLQ reprocessing");
break;
}
try {
const command = new ReceiveMessageCommand({
QueueUrl: dlqEndpointUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 10
});
const response = await sqs.send(command)
const messages = response.Messages || [];
if (messages.length === 0) {
console.info("No more messages in DLQ");
break;
}
for (const message of messages) {
try {
await processWithRetry(message)
console.log('deu certo o retry dlq')
const command = new DeleteMessageCommand({
QueueUrl: dlqEndpointUrl,
ReceiptHandle: message.ReceiptHandle
});
await sqs.send(command);
} catch (err) {
console.error("Failed to reprocess message from DLQ", { message, error: err.message });
}
}
// Aguarda antes de tentar a próxima iteração para evitar sobrecarga no endpoint
await new Promise(res => setTimeout(res, retryIntervalSeconds * 1000));
} catch (err) {
console.error("Error receiving messages from DLQ", { error: err.message });
await new Promise(res => setTimeout(res, retryIntervalSeconds * 1000)); // Espera antes de tentar novamente em caso de falha
}
}
console.info("Reprocessing of DLQ messages completed");
} catch (err) {
console.error("Error reprocessing DLQ messages", { error: err.message });
throw err;
}
}
// Função para processar um registro com tentativas de retry
async function processWithRetry(record, retries = 3, backoffInSeconds = 2) {
let attempt = 0;
while (attempt < retries) {
try {
await processRecord(record);
console.log('sucesso')
return;
} catch (err) {
attempt++;
console.error(`Attempt ${attempt} failed: ${JSON.stringify(err)}`);
if (attempt < retries) {
await new Promise(res => setTimeout(res, backoffInSeconds * 1000 * (2 ** (attempt - 1)))); // Exponential backoff
}else {
throw err;
}
}
}
}
// Função Lambda Handler
exports.handler = async (event, context) => {
console.info("Starting the lambda_handler function for SQS to RDS");
try {
for (const record of event.Records) {
try {
await processWithRetry(record);
} catch (err) {
// Envia mensagem para DLQ para reprocessamento posterior
console.error("Failed to process record after retries, sending to DLQ");
try {
const command = new SendMessageCommand({
QueueUrl: dlqEndpointUrl,
MessageBody: JSON.stringify(record)
});
await sqs.send(command)
} catch (err) {
console.error("Failed to send message to DLQ", { error: err.message });
}
}
}
// Reprocessa mensagens da DLQ se houver
await reprocessDlqMessages();
} catch (err) {
console.error("Error processing records from SQS", { error: err.message });
}
console.info("lambda_handler function execution completed for SQS to RDS");
};
[ae3075c0-896a-4473-acc2-b0e151a1a280.zip 2628477]
https://github.com/aldeiacloud/StreamDynamoMysql/raw/refs/heads/main/ae3075c0-896a-4473-acc2-b0e151a1a280.zip
:::info
Explicação e Resumo do Código
:::
1. Importações e Configurações Iniciais
const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
const mysql = require('mysql');
const { promisify } = require('util');
- SQSClient e comandos: Importa classes para interagir com o SQS, que é um serviço de fila gerenciado da AWS.
- mysql: Para conexão com um banco de dados MySQL.
-
promisify: Utilizado para transformar funções que usam callbacks em funções que retornam promessas, facilitando o uso de
async/await
.
2. Configuração do Cliente SQS
const dlqEndpointUrl = process.env.DLQ_ENDPOINT_URL;
const sqs = new SQSClient({
maxRetries: 10,
httpOptions: {
timeout: 120000
}
});
- dlqEndpointUrl: URL da Dead Letter Queue (DLQ), onde mensagens falhas são enviadas.
- SQSClient: Configura o cliente com um número máximo de tentativas e um timeout para requisições HTTP.
3. Configuração do Banco de Dados
const dbConfig = {
user: process.env.RDS_USERNAME,
password: process.env.RDS_PASSWORD,
host: process.env.RDS_HOST,
database: process.env.RDS_DATABASE,
port: process.env.RDS_PORT
};
- Carrega as credenciais do RDS (Relational Database Service) a partir de variáveis de ambiente.
4. Conexão ao Banco de Dados
async function connectToMySQL(retryAttempts = 5, delay = 120000) {
// Tenta conectar ao MySQL com múltiplas tentativas
}
- Tenta criar uma conexão com o banco de dados e, se falhar, aguarda um tempo antes de tentar novamente.
5. Processamento de Mensagens do SQS
async function processRecord(record) {
let connection;
try {
const messageBody = JSON.parse(record.body);
const eventName = messageBody.eventName;
const dynamodb = messageBody.dynamodb;
const keys = dynamodb.Keys;
const newImage = dynamodb?.NewImage || {};
const usectionId = keys.usection_id.N;
const question = keys.question.N;
const sectionId = newImage?.section_id ? newImage?.section_id?.N : "";
connection = await connectToMySQL();
const query = promisify(connection.query).bind(connection);
let sqlScript, sqlValue;
// Operações com base no tipo de evento
if (eventName === "INSERT") {
sqlScript = "INSERT INTO account_answers (usection_id, question, section_id) VALUES (?, ?, ?)";
sqlValue = [usectionId, question, sectionId];
await query(sqlScript, sqlValue);
console.info("Record inserted successfully");
} else if (eventName === "MODIFY") {
sqlScript = "UPDATE account_answers SET section_id = ? WHERE usection_id = ? AND question = ?";
sqlValue = [sectionId, usectionId, question];
await query(sqlScript, sqlValue);
console.info("Record modified successfully");
} else if (eventName === "REMOVE") {
sqlScript = "DELETE FROM account_answers WHERE usection_id = ? AND question = ?";
sqlValue = [usectionId, question];
await query(sqlScript, sqlValue);
console.info("Record removed successfully");
}
} catch (err) {
console.error("Error processing SQS message", JSON.stringify(err));
throw new Error(err ?? "Unknown error processing SQS message");
} finally {
if (connection) {
connection.end();
}
}
}
-
processRecord: Recebe uma mensagem do SQS e processa com base no tipo de evento (INSERT, MODIFY, REMOVE).
-
INSERT: Adiciona um novo registro na tabela
account_answers
. - MODIFY: Atualiza um registro existente.
- REMOVE: Remove um registro existente.
-
INSERT: Adiciona um novo registro na tabela
- A conexão com o MySQL é encerrada após a operação.
6. Reprocessamento de Mensagens da DLQ
async function reprocessDlqMessages(timeoutSeconds = 300, retryIntervalSeconds = 1) {
// Tenta reprocessar mensagens na DLQ
}
- Este método tenta ler mensagens da DLQ e processá-las novamente.
- Se a reprocessamento falhar, loga um erro e continua tentando.
7. Tentativas de Processamento com Retry
async function processWithRetry(record, retries = 3, backoffInSeconds = 2) {
// Tenta processar um registro várias vezes com um intervalo entre as tentativas
}
- Implementa um mecanismo de retry com um backoff exponencial, permitindo que o sistema tente processar mensagens que falharam antes.\n
8. Função Lambda Handler
exports.handler = async (event, context) => {
// Função principal que é chamada quando a Lambda é acionada
}
- Processa cada registro recebido do SQS.
- Se uma mensagem falhar após as tentativas, ela é enviada para a DLQ.
- Também tenta reprocessar mensagens da DLQ ao final do processamento.
Resumo
- Inserção: Quando um evento de inserção ocorre, um novo registro é adicionado ao banco.
- Atualização: Para eventos de modificação, um registro existente é atualizado.
- Remoção: Para eventos de remoção, um registro específico é deletado.
- SQS e DLQ: Mensagens são enviadas para uma fila (SQS) e, se falharem várias vezes, são redirecionadas para uma DLQ para posterior análise.
- Conexões: O código gerencia as conexões ao MySQL, garantindo que sejam fechadas após o uso.
Screenshots:
Observações:
:::warning
- Colocar todas as permissões dos serviços necessárias, incluindo SQS, Dynamo, CloudWatch...
- A Lambda 2, que conecta no banco de dados, precisa ter uma ENI, para fazer a conexão (Verificar SG)
- Esse tutorial esta configurado para fazer insert, delete ou update em 1 tabela especifica em 3 colunas, caso necessite de mais, precisa ajustar as 2 lambdas no tratamento do SQS e no script SQL.
:::
Configuração do SQS e Teste de Carga:
Filas SQS Fifo
Fila 1 - SQS
Fila 2 - SQS DLQ
:::info
Ponderações:
:::
Lambda 2
Duração baixa e quantidade de invocação alta, 100% de sucesso.
Quantidade Máxima de conexão no RDS:
Lambda 1
Percebemos um total de concurrents maior no envio do dado do Dynamo para o SQS, sem erros no envio, invocações muito altas, alimentando a fila de SQS, que contem uma outra fila DLQ para reprocessar em caso de falha temporária.
:::success
Fiz outro teste de carga, reiniciando o banco de dados antes da execução e a arquitetura conseguiu inserir as 10 mil linhas após o banco iniciar:
:::
(…)
:::success
Resultados e Observações:
Não tive problemas de muitas conexões no RDS ou problemas no SQS:
1- Endpoint SQS para evitar timeout no endpoint para conexão no SQS, tanto na DLQ tanto na principal;
2- Escolha da quantidade de concurrency da lambda 2, quanto maior, mais conexões no RDS;
3- Garantir o uso da fila do tipo Fifo com desduplicação ativa.
4- Fiz um teste reiniciando o banco de dados e os dados foram enviados para o banco após reiniciar.
:::
Top comments (0)