Alexander Hose for AWS Community Builders

Originally published at alexanderhose.com

How To Safeguard Your AWS S3 Bucket Files From Malicious Intruders With VirusTotal

AWS S3 is a great storage platform for hosting data. However, it's also a target for malicious intruders. 🎯

Most applications allow users to upload their own data to S3 buckets, for example to store images 📷, videos 📹, or any other custom data.

How can you safeguard your AWS S3 bucket from attackers uploading malicious files?

One way to safeguard your AWS S3 bucket files is to use VirusTotal to scan them for malware. 🔍 This free online service allows you to quickly scan files for viruses and other malicious content.

In this article, we will create Lambda functions that perform all the logic to scan the files, and then tie everything together in a Step Function.

Create S3 bucket 🚧

In the first step, we create a new bucket or choose an existing one. The bucket configuration doesn't matter, but we need to ensure that the Amazon EventBridge notifications are enabled. We later use this to trigger our Step Function. You can enable this in the properties of your bucket underneath Event notifications.

S3 Amazon EventBridge notifications
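
If you prefer to enable the setting from code instead of the console, the following is a minimal sketch built around boto3's `put_bucket_notification_configuration`. The helper name `enable_eventbridge` is an assumption for illustration. Because that call replaces the bucket's whole notification configuration, the sketch reads the current configuration first and merges the EventBridge setting into it.

```python
def enable_eventbridge(s3_client, bucket_name):
    """Turn on EventBridge delivery while keeping existing notification settings.

    s3_client is expected to be a boto3 S3 client (hypothetical helper, not
    part of the original article).
    """
    current = s3_client.get_bucket_notification_configuration(Bucket=bucket_name)
    # The GET response carries ResponseMetadata, which the PUT call does not accept.
    config = {k: v for k, v in current.items() if k != "ResponseMetadata"}
    # An empty EventBridgeConfiguration object is what switches the feature on.
    config["EventBridgeConfiguration"] = {}
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration=config,
    )
    return config
```

With boto3 installed and credentials configured, this could be called as `enable_eventbridge(boto3.client("s3"), "virustotal")`, where `virustotal` is the example bucket name used later in the EventBridge rule.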

Generate VirusTotal API key ✨

Next, we go to https://www.virustotal.com/ and create an account. Please note that the free account is a limited version and may not be used for business workflows, commercial products, or services.

After the successful registration, we can navigate to our profile https://www.virustotal.com/gui/user/username/apikey and obtain the API key.

Add VirusTotal API key to AWS Secrets Manager 🔑

We take the API key from the previous step and create a new AWS Secrets Manager secret. For later reference, we need to note down the secret name. The key itself should be named api_key and the secret virustotal_api_key.

VirusTotal secret
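
As a rough sketch of the same step in code, the helper below builds the arguments for Secrets Manager's `create_secret` call. The function name `build_secret_request` is hypothetical; the secret name `virustotal_api_key` and the key `api_key` are the ones used in this article. The secret is stored as a JSON string, which is the shape the Lambda functions later parse.

```python
import json


def build_secret_request(api_key, secret_name="virustotal_api_key", key_name="api_key"):
    """Return keyword arguments for Secrets Manager's create_secret call."""
    return {
        "Name": secret_name,
        # Key/value secrets are stored as a JSON document inside SecretString.
        "SecretString": json.dumps({key_name: api_key}),
    }
```

With boto3 available, the request could be sent with `boto3.client("secretsmanager").create_secret(**build_secret_request("<your VirusTotal key>"))`.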

Create IAM role 🛡️

We need to create an IAM role with access to AWS Secrets Manager (including the KMS key that encrypts the secret) and to the S3 bucket. For the S3 access, I have attached the AmazonS3FullAccess managed policy.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "secretsmanager:GetSecretValue",
                "ssm:GetParameter"
            ],
            "Resource": "*"
        }
    ]
}

🚨 Make sure to adjust the policies to reflect the least privilege principle.
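
One way a narrower policy might look is sketched below. It is an assumption about what the two Lambda functions in this article need, not a verified minimum: reading one secret, reading objects, and writing object tags. The helper name `build_scan_policy` and the example ARN are hypothetical. `kms:Decrypt` is left out on the assumption that the secret uses the default AWS managed key; a customer managed key would need it added for that key's ARN.

```python
import json


def build_scan_policy(secret_arn, bucket_name):
    """Build an IAM policy document scoped to one secret and one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Only the VirusTotal secret, not every secret in the account.
                "Effect": "Allow",
                "Action": "secretsmanager:GetSecretValue",
                "Resource": secret_arn,
            },
            {
                # Read the uploaded file and write the 'malicious' tag.
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObjectTagging"],
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            },
        ],
    }


# Example ARN is made up for illustration.
policy = build_scan_policy(
    "arn:aws:secretsmanager:eu-central-1:123456789012:secret:virustotal_api_key-AbCdEf",
    "virustotal",
)
policy_json = json.dumps(policy, indent=4)
```

The resulting `policy_json` string can be pasted into the IAM console in place of the wildcard policy and the AmazonS3FullAccess attachment.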

As we attach the role to Lambda, we need to set up a trust relationship:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Create lambda functions 💡

We need to create two Lambda functions. The first one will upload the file to VirusTotal and the second one will get the results of the scan. Please make sure that both functions have the AWS-Parameters-and-Secrets-Lambda-Extension layer added. You can do that from the bottom of the Code section in your lambda function.

AWS Lambda layers

VirustotalScan

import json
import os
import urllib3
import boto3

s3Client = boto3.client('s3')

url = "https://www.virustotal.com/api/v3/files"

def lambda_handler(event, context):
    secret_id = "virustotal_api_key"
    secret_key = "api_key"
    auth_headers = {"X-Aws-Parameters-Secrets-Token": os.environ.get('AWS_SESSION_TOKEN')}

    http = urllib3.PoolManager()
    r = http.request("GET", "http://localhost:2773/secretsmanager/get?secretId=" + secret_id, headers=auth_headers)

    parameter = json.loads(r.data)
    VIRUSTOTAL_API_KEY = json.loads(parameter["SecretString"])[secret_key]
    headers = {
        "accept": "application/json",
        "x-apikey": VIRUSTOTAL_API_KEY,
    }

    bucketName = event['detail']['bucket']['name']
    objectName = event['detail']['object']['key']
    fileName = event['detail']['object']['key'].split('/')[-1]
    s3file = s3Client.get_object(Bucket=bucketName, Key=objectName)
    s3ContentType = s3file['ContentType']
    s3file = s3file['Body'].read()

    files = {"file": (fileName, s3file, s3ContentType)}
    response = http.request('POST', url, headers=headers, fields = files)
    analysisURL = json.loads(response.data)["data"]["links"]["self"]
    return {
        'analysisURL': analysisURL,
        'bucket': bucketName,
        'key': objectName
    }

This function will first get the API key from the AWS Secrets Manager.
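
The extension answers on localhost with the same JSON document that Secrets Manager's GetSecretValue returns, so the value has to be decoded twice: once for the response and once for the `SecretString` inside it. A small sketch of that step, with a hypothetical helper name `extract_secret_value` and a slightly more explicit error than the original code:

```python
import json


def extract_secret_value(extension_body, secret_key="api_key"):
    """Pull one key out of the JSON stored in the secret's SecretString."""
    payload = json.loads(extension_body)
    if "SecretString" not in payload:
        # Typically means the request was rejected, e.g. missing permissions.
        raise KeyError("extension response has no SecretString field")
    return json.loads(payload["SecretString"])[secret_key]


# Shape of a response body, with a made-up key value.
sample_body = json.dumps({
    "Name": "virustotal_api_key",
    "SecretString": json.dumps({"api_key": "example-key"}),
})
api_key = extract_secret_value(sample_body)
```

In the Lambda function this would be called as `extract_secret_value(r.data)`.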

To call the VirusTotal API, we need to always add the API key into the header of the HTTP call:

headers = {
    "accept": "application/json",
    "x-apikey": VIRUSTOTAL_API_KEY,
}

Next, we want to get the file that was uploaded to S3 and save it in the s3file variable. We can retrieve this information from the event variable. This variable is populated by the service triggering the lambda function. Additionally, we want to save the file type in the s3ContentType variable.

bucketName = event['detail']['bucket']['name']
objectName = event['detail']['object']['key']
fileName = event['detail']['object']['key'].split('/')[-1]
s3file = s3Client.get_object(Bucket=bucketName, Key=objectName)
s3ContentType = s3file['ContentType']
s3file = s3file['Body'].read()
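
To make the lookups above concrete, here is a trimmed example of what an S3 "Object Created" event delivered through EventBridge roughly looks like, together with the same extraction wrapped in a function. The helper name `parse_s3_event` and all values in `sample_event` are made up for illustration.

```python
def parse_s3_event(event):
    """Extract bucket, object key and bare file name from an EventBridge S3 event."""
    detail = event["detail"]
    object_key = detail["object"]["key"]
    return {
        "bucket": detail["bucket"]["name"],
        "key": object_key,
        # Drop any "folder" prefix so VirusTotal sees just the file name.
        "file_name": object_key.split("/")[-1],
    }


# Trimmed sample event; real events carry more fields.
sample_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "virustotal"},
        "object": {"key": "uploads/2023/report.pdf", "size": 1024},
    },
}

parsed = parse_s3_event(sample_event)
```

Here `parsed["file_name"]` is `report.pdf`, while `parsed["key"]` keeps the full `uploads/2023/report.pdf` path needed for `get_object`.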

Now we can send the file to VirusTotal for the scan.

files = {"file": (fileName, s3file, s3ContentType)}
response = http.request('POST', url, headers=headers, fields = files)
analysisURL = json.loads(response.data)["data"]["links"]["self"]
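
The snippet above assumes the upload always succeeds. A hedged sketch of a more defensive variant is shown below: it checks the HTTP status before reading the analysis link, so that a rejected request (for example when the free API quota is exhausted) surfaces as a clear error instead of a `KeyError`. The helper name `parse_upload_response` and the sample analysis ID are assumptions.

```python
import json


def parse_upload_response(status, body):
    """Return the analysis URL from a VirusTotal upload response.

    status is the HTTP status code, body the raw response bytes or string.
    """
    if status != 200:
        raise RuntimeError(f"VirusTotal upload failed with HTTP {status}")
    return json.loads(body)["data"]["links"]["self"]


# Made-up response body in the shape the article's code expects.
sample_body = json.dumps({
    "data": {
        "type": "analysis",
        "id": "example-id",
        "links": {"self": "https://www.virustotal.com/api/v3/analyses/example-id"},
    }
})
analysis_url = parse_upload_response(200, sample_body)
```

With urllib3, the call would be `parse_upload_response(response.status, response.data)`.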

As the scan can take some time, we return the unique URL of the scan and the S3 file information. This information will be passed to the next lambda function.

return {
    'analysisURL': analysisURL,
    'bucket': bucketName,
    'key': objectName
}

VirustotalResults

import json
import os
import urllib3
import boto3

s3Client = boto3.client('s3')

def lambda_handler(event, context):
    secret_id = "virustotal_api_key"
    secret_key = "api_key"
    auth_headers = {"X-Aws-Parameters-Secrets-Token": os.environ.get('AWS_SESSION_TOKEN')}

    http = urllib3.PoolManager()
    r = http.request("GET", "http://localhost:2773/secretsmanager/get?secretId=" + secret_id, headers=auth_headers)

    parameter = json.loads(r.data)
    VIRUSTOTAL_API_KEY = json.loads(parameter["SecretString"])[secret_key]
    headers = {
        "accept": "application/json",
        "x-apikey": VIRUSTOTAL_API_KEY,
    }

    response = http.request('GET', event['analysisURL'], headers=headers)
    response = json.loads(response.data)
    status = response['data']['attributes']['status']
    if status != "queued":
        suspicious = response['data']['attributes']['stats']['suspicious']
        malicious = response['data']['attributes']['stats']['malicious']
        if(malicious > 0):
            s3Client.put_object_tagging(
                Bucket= event['bucket'],
                Key= event['key'],
                Tagging={
                    'TagSet': [
                        {
                            'Key': 'malicious',
                            'Value': 'true'
                        },
                    ]
                }
            )
        else:
            s3Client.put_object_tagging(
                Bucket= event['bucket'],
                Key= event['key'],
                Tagging={
                    'TagSet': [
                        {
                            'Key': 'malicious',
                            'Value': 'false'
                        },
                    ]
                }
            )
        return {
            'status': 'success',
            'suspicious': suspicious,
            'malicious': malicious
        }
    else:
        return {
            'status': 'failed'
        }

This lambda function will retrieve the results of the VirusTotal scan. First, we take the analysis URL from the previous function. It is stored in the event variable. If we call the analysis URL we get information about the status of the analysis and the results of the different scanners.

response = http.request('GET', event['analysisURL'], headers=headers)
response = json.loads(response.data)
status = response['data']['attributes']['status']

The response from VirusTotal will look like this:

{
   "meta":{
      "file_info":{
         ...
      }
   },
   "data":{
      "attributes":{
         "date":1684306866,
         "status":"completed",
         "stats":{
            "harmless":0,
            "type-unsupported":16,
            "suspicious":0,
            "confirmed-timeout":0,
            "timeout":0,
            "failure":0,
            "malicious":0,
            "undetected":59
         },
         "results":{
            "Bkav":{
               "category":"undetected",
               "engine_name":"Bkav",
               "engine_version":"2.0.0.1",
               "result":"None",
               "method":"blacklist",
               "engine_update":"20230516"
            },
            "Lionic":{
               ...
            }
         }
      },
      "type":"analysis",
      "id":"YjdkNjE5MTM4MDViNGNlMmFhZTZkYmE2MzJmMDg1ZjM6MTY4NDMwNjg2Ng==",
      "links":{
        ...
      }
   }
}

If the scan is not finished yet, the status will be queued; otherwise it will be completed ($.data.attributes.status). If the status is queued, we exit the function and return the status failed.

if status != "queued":
    ...
else:
    return {
        'status': 'failed'
    }
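
A stricter variant of this check is sketched below. It treats only `completed` as finished, on the assumption that VirusTotal can also report an intermediate status such as `in-progress`, in which case the stats would not be final yet. The helper name `analysis_finished` is hypothetical.

```python
def analysis_finished(response):
    """Return True only when the analysis status is 'completed'."""
    return response["data"]["attributes"]["status"] == "completed"


# Minimal responses in the shape shown above; values are illustrative.
queued = {"data": {"attributes": {"status": "queued"}}}
running = {"data": {"attributes": {"status": "in-progress"}}}
done = {"data": {"attributes": {"status": "completed"}}}

results = [analysis_finished(r) for r in (queued, running, done)]
```

Using this in place of `status != "queued"` would make the function return `failed` for every unfinished state, so the Step Function simply waits and tries again.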

If the scan is completed, we tag the S3 object. To decide how to tag it, we take the number of scanners that flagged the file as suspicious or malicious. In my case, I only check whether the malicious count is higher than 0 and, if so, flag the file as malicious. You can adjust the logic to fit your business purpose.

suspicious = response['data']['attributes']['stats']['suspicious']
malicious = response['data']['attributes']['stats']['malicious']
if(malicious > 0):
    s3Client.put_object_tagging(
        Bucket= event['bucket'],
        Key= event['key'],
        Tagging={
            'TagSet': [
                {
                    'Key': 'malicious',
                    'Value': 'true'
                },
            ]
        }
    )
else:
    s3Client.put_object_tagging(
        Bucket= event['bucket'],
        Key= event['key'],
        Tagging={
            'TagSet': [
                {
                    'Key': 'malicious',
                    'Value': 'false'
                },
            ]
        }
    )
return {
    'status': 'success',
    'suspicious': suspicious,
    'malicious': malicious
}
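
As an example of adjusting the logic, the sketch below adds optional thresholds and keeps any tags the object already has. The second part matters because `put_object_tagging` replaces the object's entire tag set. The function names `classify` and `merged_tag_set` and the threshold parameters are assumptions, not part of the original functions.

```python
def classify(stats, malicious_threshold=1, suspicious_threshold=None):
    """Return 'true' or 'false' for the 'malicious' tag based on scanner counts."""
    if stats["malicious"] >= malicious_threshold:
        return "true"
    # Optionally treat a high number of 'suspicious' verdicts as malicious too.
    if suspicious_threshold is not None and stats["suspicious"] >= suspicious_threshold:
        return "true"
    return "false"


def merged_tag_set(existing_tags, verdict):
    """Keep unrelated tags and set (or overwrite) the 'malicious' tag."""
    tags = [tag for tag in existing_tags if tag["Key"] != "malicious"]
    tags.append({"Key": "malicious", "Value": verdict})
    return tags


stats = {"malicious": 0, "suspicious": 3}
default_verdict = classify(stats)
strict_verdict = classify(stats, suspicious_threshold=2)
tag_set = merged_tag_set([{"Key": "owner", "Value": "team-a"}], strict_verdict)
```

In the Lambda function, `existing_tags` could come from `s3Client.get_object_tagging(Bucket=event['bucket'], Key=event['key'])['TagSet']`, which additionally requires the `s3:GetObjectTagging` permission, and `tag_set` would be passed as `Tagging={'TagSet': tag_set}`.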

Create Step Functions 🔄

Now we can put everything together and create the Step Function logic. The Workflow editor makes this very easy: just drag and drop the correct functions to reflect the flow below. You can adjust the wait time as needed. Usually, a wait time of 60 seconds is sufficient.

Step Function setup
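
For readers who prefer a definition over the visual editor, the following sketch builds one possible Amazon States Language definition for such a flow: scan, wait, fetch results, and loop back to the wait while the results function reports `failed`. The state names, the `ResultPath` choice, and the Lambda ARNs are assumptions. The results are written under `$.result` so that `analysisURL`, `bucket`, and `key` are still available if the flow has to loop.

```python
import json


def build_state_machine(scan_arn, results_arn, wait_seconds=60):
    """Return a state machine definition as a dict (hypothetical layout)."""
    return {
        "StartAt": "Scan",
        "States": {
            # Uploads the file; its output becomes the state input from here on.
            "Scan": {"Type": "Task", "Resource": scan_arn, "Next": "Wait"},
            "Wait": {"Type": "Wait", "Seconds": wait_seconds, "Next": "GetResults"},
            "GetResults": {
                "Type": "Task",
                "Resource": results_arn,
                # Keep the original input and store the Lambda output beside it.
                "ResultPath": "$.result",
                "Next": "ScanFinished",
            },
            "ScanFinished": {
                "Type": "Choice",
                "Choices": [
                    {
                        "Variable": "$.result.status",
                        "StringEquals": "failed",
                        "Next": "Wait",
                    }
                ],
                "Default": "Done",
            },
            "Done": {"Type": "Succeed"},
        },
    }


# Placeholder ARNs; replace with the ARNs of your two functions.
definition = json.dumps(
    build_state_machine(
        "arn:aws:lambda:eu-central-1:123456789012:function:VirustotalScan",
        "arn:aws:lambda:eu-central-1:123456789012:function:VirustotalResults",
    ),
    indent=2,
)
```

The `definition` string can be pasted into the code view of the Workflow editor. In practice you may also want to cap the number of loop iterations so that a scan that never finishes does not keep the execution running.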

Create EventBridge rule ⏰

Let's create the EventBridge rule now. First, we need to define the event pattern. You can define the detail type and bucket:

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["virustotal"]
    }
  }
}
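
The same pattern can be generated in code, which is handy if you want to reuse it for several buckets or limit scanning to a key prefix. This is a sketch with a hypothetical helper name `build_event_pattern`; the optional prefix filter relies on EventBridge's prefix matching and is not part of the original setup.

```python
import json


def build_event_pattern(bucket_name, key_prefix=None):
    """Build an EventBridge pattern for 'Object Created' events of one bucket."""
    detail = {"bucket": {"name": [bucket_name]}}
    if key_prefix is not None:
        # Only match objects whose key starts with the given prefix.
        detail["object"] = {"key": [{"prefix": key_prefix}]}
    return {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": detail,
    }


pattern = build_event_pattern("virustotal")
uploads_only = build_event_pattern("virustotal", key_prefix="uploads/")
pattern_json = json.dumps(pattern)
```

With boto3, the rule could then be created with `boto3.client("events").put_rule(Name="virustotal-scan", EventPattern=pattern_json)`, where the rule name is an example.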

Next, we define the previously created Step Function as our target.

EventBridge Target

Every time we upload a new object to our S3 bucket the file will be scanned via VirusTotal and tagged accordingly.

Conclusion 🎉

Integrating VirusTotal into your AWS S3 bucket security strategy can provide an additional layer of protection. 🔒 VirusTotal's comprehensive threat detection capabilities, combined with the step-by-step guide we have provided, can help you detect and mitigate potential risks effectively. 🔍

Remember to configure access control and permissions properly, implement encryption, enable logging and monitoring, conduct regular audits and vulnerability assessments, implement two-factor authentication, utilize AWS security services, educate users, and follow security best practices. These measures will enhance the overall security of your AWS S3 bucket files and reduce the risk of malicious intrusions. 🚧

By prioritizing data security and leveraging tools like VirusTotal, you can have peace of mind knowing that your AWS S3 bucket files are well-protected from potential threats.
