Atsushi Suzuki

Automating Security Hub Findings Summary with Bedrock, Slack Notifications, and Zenhub Task Management

We had been operating a system where Config and GuardDuty security notifications were sent via email. However, it took considerable time to understand the issues and investigate them because the notifications were not immediately actionable.

While searching for a solution, I came across a book at a tech book fair titled "Maximizing the Use of Security Hub" that introduced a system for summarizing Security Hub findings in Japanese using Lambda and Bedrock. This system also included suggested countermeasures and automated notifications. Inspired by this approach, I decided to implement it myself.

Security Hub Book Reference

Implementation

Enabling Security Hub

Before writing any code, you need to enable Security Hub. Security Hub consolidates findings from Config, GuardDuty, and Trusted Advisor, allowing you to manage them in one place. It can also trigger EventBridge events.

Slack API

To send notifications to Slack, you need to create a new app in the Slack API management console and enable Incoming Webhooks to obtain a Webhook URL. This URL will be set as the environment variable SLACK_WEBHOOK_URL when creating the Lambda function.
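Before wiring the webhook into Lambda, it can help to confirm the URL works on its own. The following is a minimal sketch using only the standard library, assuming the same attachment layout the Lambda function sends; `build_slack_payload` and `post_to_slack` are hypothetical helper names, not part of the original code.

```python
import json
import urllib.request


def build_slack_payload(title: str, detail: str, color: str = "#36a64f") -> dict:
    """Build an attachment payload shaped like the one the Lambda function sends."""
    return {"attachments": [{"color": color, "pretext": title, "text": detail}]}


def post_to_slack(webhook_url: str, payload: dict, timeout: float = 10.0) -> int:
    """POST the payload as JSON to an Incoming Webhook URL and return the HTTP status."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Usage (needs a real webhook URL, so it is left commented out):
#   import os
#   payload = build_slack_payload("Webhook test", "Hello from a local smoke test")
#   print(post_to_slack(os.environ["SLACK_WEBHOOK_URL"], payload))
```

If the message shows up in the channel, the same URL can be set as SLACK_WEBHOOK_URL on the Lambda function.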

Slack API Screenshot

Project Structure

Here is the project structure:

- Dockerfile
- lambda_function.py
- requirements.txt
- README.md

Full Code

The Lambda function script uses Bedrock to interpret and summarize Security Hub findings. The results are sent as Slack notifications and GitHub Issues for Zenhub task management.

# lambda_function.py
import json
import os
import boto3
import requests
from botocore.exceptions import ClientError


SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
BEDROCK_MODEL_ID = os.environ['BEDROCK_MODEL_ID']
INCLUDED_SEVERITIES = os.environ.get(
    'INCLUDED_SEVERITIES', 'CRITICAL,HIGH,MEDIUM').split(',')
GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
GITHUB_REPO = os.environ['GITHUB_REPO']
MAX_TOKENS = int(os.environ.get('MAX_TOKENS', 1000))

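# os.environ[...] already raises KeyError for a missing variable; this check also rejects empty values.
if not SLACK_WEBHOOK_URL or not BEDROCK_MODEL_ID or not GITHUB_TOKEN or not GITHUB_REPO:
    raise ValueError("Required environment variables are not set.")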

bedrock = boto3.client('bedrock-runtime', region_name='ap-northeast-1')


def lambda_handler(event, context):
    print(f"Received event: {json.dumps(event)}")

    findings = event.get('detail', {}).get('findings', [])
    if not findings:
        print("No findings to process.")
        return {
            'statusCode': 200,
            'body': json.dumps('No findings in the event.')
        }

    results = []

    for finding in findings:
        interpreted_finding, severity = interpret_finding(finding)

        # Keep the result only if its severity is listed in INCLUDED_SEVERITIES
        if severity in INCLUDED_SEVERITIES:
            results.append(interpreted_finding)

    if results:
        combined_message = "\n\n".join(results)

        issue_title = generate_title_from_summary(combined_message)

        send_slack_notification(
            issue_title,
            combined_message
        )

        create_zenhub_ticket(
            issue_title,
            combined_message
        )

    return {
        'statusCode': 200,
        'body': json.dumps('Processing complete')
    }


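def interpret_finding(finding) -> tuple[str, str]:
    # System prompt (Japanese): "You are a security expert. Interpret the Security Hub
    # finding and give developers a concise explanation and recommended response in Japanese."
    system_message = (
        "あなたはセキュリティ専門家です。Security Hub の検出結果を解釈し、日本語で開発者向けに簡潔な説明と推奨される対応方針を提示してください。"
    )

    # User prompt (Japanese): repeats the request, embeds the finding as JSON, and fixes the
    # reply format: "- Summary of the finding / - Severity / - Impact / - Recommended response".
    human_message = (
        f"以下のSecurity Hub Findingを解釈し、開発者向けに簡潔な説明と推奨される対応方針を提示してください:\n\n"
        f"{json.dumps(finding, indent=2)}\n\n"
        "回答は以下の形式で提供してください:\n"
        "- 検出内容の要約:\n"
        "- 重要度: [CRITICAL/HIGH/MEDIUM/LOW]\n"
        "- 影響:\n"
        "- 推奨される対応:\n"
    )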

    try:
        response = bedrock.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": MAX_TOKENS,
                "system": system_message,
                "messages": [{"role": "user", "content": human_message}]
            }),
            contentType="application/json",
            accept="application/json"
        )
        response_body = json.loads(response['body'].read())
        print("Full Bedrock Response:", json.dumps(response_body, indent=2))

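        # Extract the items of type "text" from the content field
        content_items = response_body.get("content", [])
        extracted_texts = [
            item.get("text", "") for item in content_items if item.get("type") == "text"
        ]

        # Join the extracted text blocks
        interpreted_text = "\n".join(extracted_texts).strip()

        severity = "LOW"  # Default severity
        if "重要度:" in interpreted_text:
            for line in interpreted_text.splitlines():
                if line.startswith("- 重要度:"):  # the "- Severity:" line of the reply
                    # Tolerate an empty value or brackets echoed back by the model, e.g. "[HIGH]"
                    value = line.replace("- 重要度:", "").strip(" []")
                    if value:
                        severity = value.split()[0].strip("[]").upper()
                    break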

        print(
            f"Bedrock interpretation: {interpreted_text}, Severity: {severity}")
        return interpreted_text, severity

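    except ClientError as e:
        print(f"Error calling Bedrock: {e}")
        # The caller unpacks (text, severity), so return a tuple here as well.
        # Fall back to the severity label recorded in the finding itself.
        fallback_severity = finding.get("Severity", {}).get("Label", "LOW")
        # Message (Japanese): "The Bedrock call failed. Please check the original finding."
        return "Bedrockの呼び出しに失敗しました。原文を確認してください。", fallback_severity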


def generate_title_from_summary(summary: str) -> str:
    lines = summary.splitlines()
    for idx, line in enumerate(lines):
        if line.startswith("- 検出内容の要約:"):
            # Take the line after the summary heading and keep the text up to the first "。" (Japanese full stop)
            if idx + 1 < len(lines):
                first_sentence = lines[idx + 1].split("。")[0]
                return first_sentence.strip() if first_sentence else "🔐 Security Hubの検知結果"
    return "🔐 Security Hubの検知結果"


def send_slack_notification(title: str, detail: str) -> None:
    payload = {
        'attachments': [
            {
                'color': '#36a64f',
                'pretext': title,
                'text': detail
            }
        ]
    }
    try:
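        # json= sets the Content-Type header; the timeout keeps a stalled request from using up the Lambda run
        response = requests.post(SLACK_WEBHOOK_URL, json=payload, timeout=10)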
    except requests.exceptions.RequestException as e:
        print(e)
    else:
        print(response.status_code)


def create_zenhub_ticket(title: str, description: str) -> None:
    payload = {
        "title": title,
        "body": description
    }
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Content-Type": "application/json"
    }

    try:
        url = f"https://api.github.com/repos/{GITHUB_REPO}/issues"
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()
        print(
            f"GitHub Issue created successfully: {response.json().get('html_url')}")
    except requests.exceptions.RequestException as e:
        print(f"Error creating GitHub Issue: {e}")
# requirements.txt
boto3
requests

Cost Adjustments via Environment Variables

We used environment variables to configure Bedrock's model (BEDROCK_MODEL_ID), the maximum token count for output (MAX_TOKENS), and the severity levels of findings to include (INCLUDED_SEVERITIES). This allows for flexible cost management.
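One detail worth guarding against when tuning these values: the original code splits INCLUDED_SEVERITIES on commas without trimming, so a value such as `CRITICAL, HIGH` would leave `" HIGH"` in the list and never match. A minimal sketch of a more forgiving loader is shown here; `parse_severities` and `load_settings` are hypothetical helpers, and the defaults simply mirror the ones in lambda_function.py.

```python
import os


def parse_severities(raw: str) -> set[str]:
    """Split a comma-separated list, trimming spaces and normalising case."""
    return {item.strip().upper() for item in raw.split(",") if item.strip()}


def load_settings(env=os.environ) -> dict:
    """Read the three tuning knobs, using the same defaults as lambda_function.py."""
    return {
        "model_id": env["BEDROCK_MODEL_ID"],
        "max_tokens": int(env.get("MAX_TOKENS", "1000")),
        "severities": parse_severities(
            env.get("INCLUDED_SEVERITIES", "CRITICAL,HIGH,MEDIUM")
        ),
    }


# Example with an explicit mapping instead of the real environment
settings = load_settings(
    {"BEDROCK_MODEL_ID": "example-model-id", "INCLUDED_SEVERITIES": "critical, HIGH"}
)
print(sorted(settings["severities"]), settings["max_tokens"])
```

Using a set also makes the `severity in ...` check independent of the order in which the levels are written.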

Initially, the settings were as follows, costing around $10/day:

  • BEDROCK_MODEL_ID: anthropic.claude-3-5-sonnet-20240620-v1:0
  • INCLUDED_SEVERITIES: CRITICAL,HIGH,MEDIUM
  • MAX_TOKENS: 1000

After tweaking the configuration, the cost was reduced to about $1.5/day:

  • BEDROCK_MODEL_ID: anthropic.claude-3-haiku-20240307-v1:0
  • INCLUDED_SEVERITIES: CRITICAL,HIGH
  • MAX_TOKENS: 500

Creating Zenhub Tickets

The create_zenhub_ticket function creates a GitHub Issue from the summarized findings; because Zenhub is built on top of GitHub Issues, the new issue then appears in Zenhub as a task. To do this, generate a Personal Access Token with the repo scope and set it as the GITHUB_TOKEN environment variable. Specify the repository in the GITHUB_REPO environment variable in the format <organization>/<repository>.
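Because GITHUB_REPO is interpolated straight into the API URL, a value in the wrong shape (for example a full https://github.com/... URL) only surfaces later as a failed request. A small sketch of an up-front check follows; `issues_url` is a hypothetical helper, and the character set in the pattern is an assumption about typical organization and repository names rather than GitHub's exact rules.

```python
import re

# Assumption: names consist of letters, digits, ".", "_" and "-"
REPO_PATTERN = re.compile(r"[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+")


def issues_url(repo: str) -> str:
    """Return the Issues API URL for an '<organization>/<repository>' value."""
    if not REPO_PATTERN.fullmatch(repo):
        raise ValueError(
            f"GITHUB_REPO must look like '<organization>/<repository>', got {repo!r}"
        )
    return f"https://api.github.com/repos/{repo}/issues"


print(issues_url("my-org/my-repo"))
```

Calling this once at import time would make a misconfigured repository name fail as soon as the function starts, before any Bedrock call is made.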

Generate GitHub Token

GitHub Token Screenshot

Deploying Lambda with Docker

Since we are using external libraries, the Lambda function is deployed via Docker. Below is the Dockerfile:

FROM public.ecr.aws/lambda/python:3.13

WORKDIR /var/task

COPY requirements.txt ./
COPY lambda_function.py ./

RUN pip install -r requirements.txt

CMD ["lambda_function.lambda_handler"]

Use the following commands to build and push the image to ECR:

docker buildx build --platform linux/amd64 -f Dockerfile -t <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com/securityhub-bedrock-slack:latest .
aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com
docker push <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com/securityhub-bedrock-slack

When creating the Lambda function, choose the Container image option and select the image you just pushed to ECR.

After creating the function, raise its timeout to 1 minute. The default of 3 seconds is too short for the Bedrock call and the function would time out.
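The timeout can also be changed from a script instead of the console. This is a minimal sketch that wraps Lambda's `update_function_configuration` call; the helper name `set_lambda_timeout` and the function name in the usage comment are assumptions, since the article does not state what the function was called.

```python
def set_lambda_timeout(lambda_client, function_name: str, seconds: int = 60) -> dict:
    """Raise the function timeout through the UpdateFunctionConfiguration API."""
    # Lambda accepts timeouts from 1 second up to 900 seconds (15 minutes)
    if not 1 <= seconds <= 900:
        raise ValueError("Lambda timeouts must be between 1 and 900 seconds")
    return lambda_client.update_function_configuration(
        FunctionName=function_name,
        Timeout=seconds,
    )


# Usage (requires boto3 and AWS credentials, so it is left commented out):
#   import boto3
#   client = boto3.client("lambda", region_name="ap-northeast-1")
#   set_lambda_timeout(client, "securityhub-bedrock-slack")  # hypothetical function name
```

Passing the client in as a parameter keeps the helper easy to exercise with a stub before pointing it at a real account.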

EventBridge

Create a new rule in EventBridge and select Security Hub Findings - Imported as the event type.

EventBridge Rule Screenshot

Set the newly created Lambda function as the target.

EventBridge Target Screenshot
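For reference, selecting that event type in the console corresponds to an event pattern on `source` and `detail-type`. The sketch below writes the pattern as a Python dict together with a heavily trimmed, hypothetical sample event that can be passed to `lambda_handler` locally; real findings carry many more fields, and `matches_rule` is only a rough stand-in for EventBridge's own matching.

```python
import json

# Pattern equivalent to choosing "Security Hub Findings - Imported" in the console
EVENT_PATTERN = {
    "source": ["aws.securityhub"],
    "detail-type": ["Security Hub Findings - Imported"],
}


def matches_rule(event: dict) -> bool:
    """Rough local check: compare only source and detail-type against the pattern."""
    return (
        event.get("source") in EVENT_PATTERN["source"]
        and event.get("detail-type") in EVENT_PATTERN["detail-type"]
    )


# Hypothetical sample event, trimmed to the fields the Lambda function reads
SAMPLE_EVENT = {
    "source": "aws.securityhub",
    "detail-type": "Security Hub Findings - Imported",
    "detail": {
        "findings": [
            {"Title": "Example finding", "Severity": {"Label": "HIGH"}},
        ]
    },
}

print(json.dumps(EVENT_PATTERN, indent=2))
print(matches_rule(SAMPLE_EVENT))
```

The findings arrive as a list under `detail.findings`, which is why the handler iterates over `event['detail']['findings']`.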

Slack Notifications and Zenhub Issues

Here is an example of a Slack notification:

Slack Notification Screenshot

And here is an example of a Zenhub Issue:

Zenhub Issue Screenshot

Conclusion

By integrating Security Hub with Bedrock, Slack, and Zenhub, we have successfully automated the process of summarizing findings and creating actionable tasks. This system has significantly reduced the time required to analyze security notifications and improved our response efficiency. Additionally, the flexibility of Bedrock’s configuration allows us to balance cost and output quality based on our needs.
