Here are the 2 prerequisites for testing the solution: integrating Lambda with RDS via Secrets Manager on AWS.
AWS Account: You need to have an AWS account to access AWS services. If you don’t have one, you can sign up for an AWS account at aws.amazon.com.
Budget Alarms: Set up budget alarms in AWS Budgets to monitor your AWS spending. This helps you avoid unexpected charges while testing the solution.
You need to create a VPC with private subnets so that the RDS instance can run in a private subnet.
Before creating the RDS instance, I will create a DB subnet group so that I can select it when launching the instance.
I am going to launch an RDS MySQL instance within the Free Tier.
DB instance class: db.t2.micro
DB subnet group: secret-manager-subnet
I will deploy my RDS instance into the VPC and the DB subnet group created in the steps above.
I will select the option for the master password to be managed in Secrets Manager.
I will create the Lambda function in our VPC and private subnets, and leave role creation at the default.
Then I will add the necessary permissions to the role: RDS, EC2 (CreateNetworkInterface, which a VPC-attached function needs) and Secrets Manager.
We need the PyMySQL module, which is not included in the Lambda Python runtime, so the easiest way is to add an AWS-provided layer to the Python function. I will add AWSSDKPandas-Python312.
The role attached to the Lambda function must be allowed to read the secret, so modify the policy to grant read permission (GetSecretValue) on Secrets Manager.
I will use the following Python code to test the connection via Secrets Manager. In the general configuration, I will set the function timeout to 1 min 3 sec.
```python
import json

import boto3
import pymysql


def lambda_handler(event, context):
    # Initialize the Secrets Manager client
    client = boto3.client('secretsmanager')

    # Retrieve the secret value (an RDS-managed secret holds username and password)
    secret_name = "<SECRET MANAGER NAME>"
    response = client.get_secret_value(SecretId=secret_name)
    secret = json.loads(response['SecretString'])

    # Extract database credentials
    db_username = secret['username']
    db_password = secret['password']
    db_host = "<RDS ENDPOINT>"

    # Establish a connection to the database
    conn = None  # so the finally block works even if connect() raises
    try:
        conn = pymysql.connect(host=db_host, user=db_username,
                               password=db_password, connect_timeout=5)
        print("Successfully connected to the database")
        # Perform operations here if needed
        # For example:
        # with conn.cursor() as cursor:
        #     cursor.execute("SELECT * FROM your_table;")
        #     result = cursor.fetchall()
        #     print(result)
    except Exception as e:
        print("Database connection error:", e)
        # Report the failure instead of returning 200 on every invocation
        return {
            'statusCode': 500,
            'body': json.dumps('Database connection failed')
        }
    finally:
        # Close the database connection
        if conn:
            conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps('Database connection successful!')
    }
```
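Because RDS-managed secrets are rotated, a warm Lambda container that keeps the credentials between invocations can end up holding a stale password. The following is an added example, not code from the original post: a small cache with a TTL plus a one-time refresh when the database answers "access denied". The names `fetch` (standing in for the boto3 get_secret_value call) and `connect` (standing in for pymysql.connect) are assumed placeholders.

```python
import json
import time

REQUIRED_KEYS = ("username", "password")
ACCESS_DENIED = 1045  # MySQL error code for a rejected user/password


def parse_db_secret(secret_string):
    """Parse the SecretString of an RDS-managed master-user secret and
    check that both fields the handler needs are present and non-empty."""
    data = json.loads(secret_string)
    missing = [k for k in REQUIRED_KEYS if not data.get(k)]
    if missing:
        raise ValueError("secret is missing fields: " + ", ".join(missing))
    return {"username": data["username"], "password": data["password"]}


class CachedSecret:
    """Keep the parsed secret between warm invocations instead of calling
    Secrets Manager on every request. fetch() returns the raw SecretString
    (assumed placeholder for boto3 get_secret_value); the entry expires after
    ttl seconds and can be invalidated early when the database rejects it."""

    def __init__(self, fetch, ttl=300, clock=time.monotonic):
        self._fetch, self._ttl, self._clock = fetch, ttl, clock
        self._value, self._expires = None, 0.0
        self.fetch_count = 0  # exposed so callers can observe cache hits

    def get(self):
        if self._value is None or self._clock() >= self._expires:
            self._value = parse_db_secret(self._fetch())
            self._expires = self._clock() + self._ttl
            self.fetch_count += 1
        return self._value

    def invalidate(self):
        self._value = None


def connect_with_refresh(cache, connect):
    """Call connect(creds) with the cached credentials; if the server answers
    'access denied' (a rotated password), refresh the secret once and retry."""
    try:
        return connect(cache.get())
    except Exception as err:
        # pymysql.err.OperationalError carries (errno, message) in err.args
        errno = err.args[0] if err.args else None
        if errno != ACCESS_DENIED:
            raise
        cache.invalidate()
        return connect(cache.get())
```

In the handler, create the `CachedSecret` once at module scope so that it survives across warm invocations.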
I will configure a custom test event to invoke the function.
If you have any concerns or doubts, or need help regarding this post, you can DM me on LinkedIn. I also invite you to my Discord server to stay updated on cloud and DevOps.