DEV Community

cloudy

AWS Security Hub Findings to Azure Blob

AWS Security Hub provides a comprehensive view of your high-priority security alerts and your compliance status across AWS accounts.

AWS Security Hub collects data from GuardDuty, Inspector, Config, Macie, Firewall Manager, Systems Manager and other connected partner services to provide a single view of the resources in your account.

The integration with AWS Organizations allows you to automatically enable Security Hub and its automated security checks in any existing and newly created accounts in the organization. This enables you to centrally view security findings from all the member AWS accounts.

We had a requirement to visualise the findings in PowerBI, so I wrote the following script to send the existing findings to Azure Blob Storage.

Once the security findings are in Azure Blob Storage they can be processed and consumed within PowerBI.

This requires the following:

  • Azure Blobstorage container
  • Azure Blob SAS URL for secure authentication

```python
import json
import os
from datetime import datetime

import boto3
from azure.storage.blob import BlobClient

# Timestamped blob name so each export is kept rather than overwritten.
date = datetime.now().strftime("%Y%m%d%H%M%S")
filename = "findings-" + date + ".json"

# The SAS connection string is a credential: read it from the environment
# (e.g. a Lambda environment variable) rather than hardcoding it in the script.
# Expected form: 'BlobEndpoint=https://<account>.blob.core.windows.net/;SharedAccessSignature=<sas-token>'
connect_str = os.environ['AZURE_BLOB_CONNECTION_STRING']
container = 'containername'

blob_client = BlobClient.from_connection_string(
    conn_str=connect_str, container_name=container, blob_name=filename
)

securityhub_client = boto3.client('securityhub', region_name='ap-southeast-2')

# Only active findings whose compliance check failed.
_filter = {
    'ComplianceStatus': [
        {
            'Value': 'FAILED',
            'Comparison': 'EQUALS'
        }
    ],
    'RecordState': [
        {
            'Value': 'ACTIVE',
            'Comparison': 'EQUALS'
        }
    ],
}

# Page through every matching finding. GetFindings returns at most 100 per
# call, and NextToken is only sent once the previous response supplied one.
results = []
token = None
while True:
    kwargs = {'Filters': _filter, 'MaxResults': 100}
    if token:
        kwargs['NextToken'] = token
    response = securityhub_client.get_findings(**kwargs)

    results.extend(response['Findings'])

    token = response.get('NextToken')
    if not token:
        break

print(json.dumps(results, indent=4))

# Upload the full JSON document; PowerBI reads it from the container.
blob_client.upload_blob(json.dumps(results))
```
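The uploaded file is a JSON array of nested ASFF documents, which PowerBI cannot slice directly. As an added example (not part of the original post), this flattens findings into one row per resource and counts findings per severity; the sample findings are constructed for the example and are not real Security Hub output.

```python
import json
from collections import Counter

# Constructed ASFF-style sample findings, trimmed to the fields used below
# (made up for this example, not real Security Hub output).
SAMPLE_FINDINGS = [
    {"Id": "example-finding-1", "AwsAccountId": "111122223333",
     "Title": "S3 buckets should prohibit public read access",
     "Severity": {"Label": "CRITICAL"}, "Compliance": {"Status": "FAILED"},
     "Workflow": {"Status": "NEW"},
     "Resources": [
         {"Type": "AwsS3Bucket", "Id": "arn:aws:s3:::example-bucket-a", "Region": "ap-southeast-2"},
         {"Type": "AwsS3Bucket", "Id": "arn:aws:s3:::example-bucket-b", "Region": "ap-southeast-2"}]},
    {"Id": "example-finding-2", "AwsAccountId": "444455556666",
     "Title": "EC2 instances should use IMDSv2",
     "Severity": {"Label": "MEDIUM"}, "Compliance": {"Status": "FAILED"},
     "Workflow": {"Status": "NOTIFIED"},
     "Resources": [
         {"Type": "AwsEc2Instance", "Id": "i-0123456789abcdef0", "Region": "ap-southeast-2"}]},
]


def flatten_findings(findings):
    """Return one flat row per (finding, resource) pair."""
    # ASFF nests Severity, Compliance, Workflow and a list of Resources;
    # a finding that names two resources becomes two rows.
    rows = []
    for f in findings:
        base = {
            "FindingId": f.get("Id"),
            "AwsAccountId": f.get("AwsAccountId"),
            "Title": f.get("Title"),
            "SeverityLabel": f.get("Severity", {}).get("Label"),
            "ComplianceStatus": f.get("Compliance", {}).get("Status"),
            "WorkflowStatus": f.get("Workflow", {}).get("Status"),
        }
        for r in f.get("Resources") or [{}]:  # keep a row even with no Resources
            rows.append({**base, "ResourceType": r.get("Type"),
                         "ResourceId": r.get("Id"), "ResourceRegion": r.get("Region")})
    return rows


def severity_counts(findings):
    """Count findings (not rows) per severity label, e.g. for a dashboard tile."""
    return dict(Counter(f.get("Severity", {}).get("Label") for f in findings))


if __name__ == "__main__":
    print(json.dumps(flatten_findings(SAMPLE_FINDINGS), indent=2))
    print(severity_counts(SAMPLE_FINDINGS))
```

Running `flatten_findings` over the list returned by `get_findings` before the upload gives PowerBI a table it can filter by account, severity and resource without any transformation on the Azure side.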
