DEV Community

Tomasz Szuster
Tomasz Szuster

Posted on • Updated on

AWS WAF with CDK simple way, L2?

Below content is for those of you that want to use more L2 CDK constructs when creating IaC for AWS WAF.

Short intro, the current state of WAF L2's for CDK is that they mostly don't exist, thus having easy to use methods is far-far away.

Although closer than you may think.

Let's share what I've created sometime ago.

Python WAF class to manage more or less sensible values for starting your CDK project

class WAFv2(Construct):
    """Implement a v2 WAF where logs are sent to the AWS CloudWatch logs."""

    # pylint: disable=W0235
    def __init__(self, scope: Construct, construct_id: str) -> None:
        super().__init__(scope, construct_id)

    @staticmethod
    def __aws_account_takeover_prevention(aws_account_takeover_prevention):
        return wafv2.CfnWebACL.RuleProperty(
            name="AWS-AWSManagedRulesATPRuleSet",
            priority=6,
            override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
            statement=wafv2.CfnWebACL.StatementProperty(
                managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
                    name="AWSManagedRulesATPRuleSet",
                    vendor_name="AWS",
                    managed_rule_group_configs=[
                        wafv2.CfnWebACL.ManagedRuleGroupConfigProperty(
                            login_path=aws_account_takeover_prevention["login_path"],  # type: ignore
                        ),
                        wafv2.CfnWebACL.ManagedRuleGroupConfigProperty(
                            password_field=wafv2.CfnWebACL.FieldIdentifierProperty(
                                identifier=aws_account_takeover_prevention["password_field"]  # type: ignore
                            ),
                        ),
                        wafv2.CfnWebACL.ManagedRuleGroupConfigProperty(
                            payload_type="FORM_ENCODED",
                        ),
                        wafv2.CfnWebACL.ManagedRuleGroupConfigProperty(
                            username_field=wafv2.CfnWebACL.FieldIdentifierProperty(
                                identifier=aws_account_takeover_prevention["username_field"]  # type: ignore
                            ),
                        ),
                    ],
                )
            ),
            visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
                cloud_watch_metrics_enabled=True,
                metric_name="AWS-AWSManagedRulesATPRuleSet",
                sampled_requests_enabled=True,
            ),
        )

    @staticmethod
    def __aws_sqli_rule():
        return wafv2.CfnWebACL.RuleProperty(
            name="AWS-AWSManagedRulesSQLiRuleSet",
            priority=5,
            override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
            statement=wafv2.CfnWebACL.StatementProperty(
                managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
                    name="AWSManagedRulesSQLiRuleSet",
                    vendor_name="AWS",
                )
            ),
            visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
                cloud_watch_metrics_enabled=True,
                metric_name="AWS-AWSManagedRulesSQLiRuleSet",
                sampled_requests_enabled=True,
            ),
        )

    @staticmethod
    def __aws_bad_inputs_rule():
        return wafv2.CfnWebACL.RuleProperty(
            name="AWS-AWSManagedRulesKnownBadInputsRuleSet",
            priority=4,
            override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
            statement=wafv2.CfnWebACL.StatementProperty(
                managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
                    name="AWSManagedRulesKnownBadInputsRuleSet",
                    vendor_name="AWS",
                )
            ),
            visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
                cloud_watch_metrics_enabled=True,
                metric_name="AWS-AWSManagedRulesKnownBadInputsRuleSet",
                sampled_requests_enabled=True,
            ),
        )

    @staticmethod
    def __rate_list(rate_value: int):
        return wafv2.CfnWebACL.RuleProperty(
            name=f"Custom-RateLimit{rate_value}",
            priority=1,
            action=wafv2.CfnWebACL.RuleActionProperty(block=wafv2.CfnWebACL.BlockActionProperty()),
            statement=wafv2.CfnWebACL.StatementProperty(
                rate_based_statement=wafv2.CfnWebACL.RateBasedStatementProperty(
                    aggregate_key_type="IP",
                    limit=rate_value,
                )
            ),
            visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
                cloud_watch_metrics_enabled=True,
                metric_name=f"Custom-RateLimit{rate_value}",
                sampled_requests_enabled=True,
            ),
        )

    @staticmethod
    def __aws_ip_reputation_list():
        return wafv2.CfnWebACL.RuleProperty(
            name="AWS-AWSManagedRulesAmazonIpReputationList",
            priority=0,
            override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
            statement=wafv2.CfnWebACL.StatementProperty(
                managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
                    name="AWSManagedRulesAmazonIpReputationList",
                    vendor_name="AWS",
                )
            ),
            visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
                cloud_watch_metrics_enabled=True,
                metric_name="AWS-AWSManagedRulesAmazonIpReputationList",
                sampled_requests_enabled=True,
            ),
        )

    @staticmethod
    def __aws_common_rule(aws_common_excluded_rules):
        return wafv2.CfnWebACL.RuleProperty(
            name="AWS-AWSManagedRulesCommonRuleSet",
            priority=2,
            override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
            statement=wafv2.CfnWebACL.StatementProperty(
                managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
                    excluded_rules=aws_common_excluded_rules,
                    name="AWSManagedRulesCommonRuleSet",
                    vendor_name="AWS",
                )
            ),
            visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
                cloud_watch_metrics_enabled=True,
                metric_name="AWS-AWSManagedRulesCommonRuleSet",
                sampled_requests_enabled=True,
            ),
        )

    @staticmethod
    def __aws_anonymous_list():
        return wafv2.CfnWebACL.RuleProperty(
            name="AWS-AWSManagedRulesAnonymousIpList",
            priority=3,
            override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
            statement=wafv2.CfnWebACL.StatementProperty(
                managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
                    name="AWSManagedRulesAnonymousIpList",
                    vendor_name="AWS",
                )
            ),
            visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
                cloud_watch_metrics_enabled=True,
                metric_name="AWS-AWSManagedRulesAnonymousIpList",
                sampled_requests_enabled=True,
            ),
        )

    def web_acl(
        self,
        name: str,
        rate_value: Union[int, None],
        aws_common_rule: bool = True,
        aws_common_rule_ignore_list: Optional[list] = None,
        aws_anony_list: bool = False,
        aws_bad_inputs_rule: bool = False,
        aws_sqli_rule: bool = False,
        aws_account_takeover_prevention: Union[bool, Dict[Any, Any]] = False,
        waf_scope: Literal["REGIONAL", "CLOUDFRONT"] = "REGIONAL",
    ) -> wafv2.CfnWebACL:
        """Create AWS WAF with opinionated default rules.

        The minimal configuration will create WAF ACL with aws_reputation_list

        :param aws_common_rule_ignore_list: List of strings that contain rules to be ignored.
        :param aws_account_takeover_prevention: The definition for account takeover prevention rule
        :param aws_sqli_rule: The WAF managed rule by AWS AWS-AWSManagedRulesSQLiRuleSet
        :param aws_bad_inputs_rule: The WAF managed rule by AWS AWS-AWSManagedRulesKnownBadInputsRuleSet
        :param aws_anony_list: The WAF managed rule by AWS AWS-AWSManagedRulesAnonymousIpList
        :param aws_common_rule: The WAF managed rule by AWS AWS-AWSManagedRulesCommonRuleSet
        :param rate_value: The number of packets per seconds for custom rate limiting
        :param waf_scope: The WAF scope, it could be regional for API GW, Cognito and ALB or
        CLOUDFRONT for cloudfront distributions
        :param name: Then name of WAF ACL
        :return:
        """

        # 0. Reputation List. The first rule is enabled by default
        aws_ip_rep_list = self.__aws_ip_reputation_list()
        waf_rules = [aws_ip_rep_list]

        if rate_value:
            # 1. Custom Rate Limit
            rate_list = self.__rate_list(rate_value)
            waf_rules.append(rate_list)

        # 2. Common Rule
        aws_common_excluded_rules = None
        if aws_common_rule_ignore_list:
            aws_common_excluded_rules = [
                wafv2.CfnWebACL.ExcludedRuleProperty(name=rule_name) for rule_name in aws_common_rule_ignore_list
            ]
        if aws_common_rule:
            aws_common_rule = self.__aws_common_rule(aws_common_excluded_rules)
            waf_rules.append(aws_common_rule)

        if aws_anony_list:
            # 3. AnonymousIpList
            aws_anony_list = self.__aws_anonymous_list()
            waf_rules.append(aws_anony_list)

        if aws_bad_inputs_rule:
            # 4. Known Bad Inputs Rule
            aws_bad_inputs_rule = self.__aws_bad_inputs_rule()
            waf_rules.append(aws_bad_inputs_rule)

        if aws_sqli_rule:
            # 5. SQLi Rule
            aws_sqli_rule = self.__aws_sqli_rule()
            waf_rules.append(aws_sqli_rule)

        if aws_account_takeover_prevention:
            # 6. Account takeover prevention
            aws_account_takeover_prevention_rule = self.__aws_account_takeover_prevention(
                aws_account_takeover_prevention
            )
            waf_rules.append(aws_account_takeover_prevention_rule)

        return wafv2.CfnWebACL(
            self,
            "WAF ACL",
            default_action=wafv2.CfnWebACL.DefaultActionProperty(allow={}),
            scope=waf_scope,
            visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
                cloud_watch_metrics_enabled=True, metric_name="web-acl", sampled_requests_enabled=True
            ),
            name=name,
            rules=waf_rules,
        )

    def web_acl_association(self, resource_arn, web_acl_arn: str) -> wafv2.CfnWebACLAssociation:
        """Associate AWS Resource with WAF.

        :param resource_arn: The ARN of resource that will be protected by WAF
        :param web_acl_arn: The WEB Application Access Control List ARN
        :return: wafv2.CfnWebACLAssociation
        """
        return wafv2.CfnWebACLAssociation(self, "ACLAssociation", resource_arn=resource_arn, web_acl_arn=web_acl_arn)

    def web_acl_log(self, web_acl_arn: str, log_group_name: str) -> wafv2.CfnLoggingConfiguration:
        """Configure provided a log group as a target for WAF log destination.

        :param web_acl_arn: The WEB Application Access Control List ARN
        :param log_group_name: The name of log group
        :return: AWS CDK WAFv2.CfnLoggingConfiguration
        """
        log_group = logs.LogGroup(
            self,
            id="web_acl_log_group",
            log_group_name=log_group_name,
            retention=logs.RetentionDays.ONE_WEEK,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        return wafv2.CfnLoggingConfiguration(
            self,
            id="web_acl_cfn_log_configuration",
            log_destination_configs=[
                cdk.Stack.of(self).format_arn(
                    arn_format=cdk.ArnFormat.COLON_RESOURCE_NAME,
                    service="logs",
                    resource="log-group",
                    resource_name=log_group.log_group_name,
                )
            ],
            resource_arn=web_acl_arn,
        )

Enter fullscreen mode Exit fullscreen mode

What we can have from it?
Long story short, 5 of AWS managed rules and 1 custom rate limit

  • AWSManagedRulesAmazonIpReputationList
  • AWSManagedRulesCommonRuleSet
  • AWSManagedRulesAnonymousIpList
  • AWSManagedRulesKnownBadInputsRuleSet
  • AWSManagedRulesSQLiRuleSet
  • AWSManagedRulesATPRuleSet
  • Custom-RateLimit

Example use:

alb_wafv2_construct = WAFv2(self, construct_id="wafv2_construct")

alb_wafv2_acl = alb_wafv2_construct.web_acl(
            name=f"{standard_prefix}-wafv2",
            rate_value=500,
            aws_common_rule=True,
            aws_sqli_rule=True,
            aws_anony_list=True,
            aws_bad_inputs_rule=True,
            aws_account_takeover_prevention={ 
                 "login_path": "/important_page/login",
                 "payload_type": "FORM_ENCODED",
                 "password_field": "PasswordField",
                 "username_field": "LoginField",
             },
        )

alb_wafv2_construct.web_acl_association(resource_arn=alb.load_balancer_arn, web_acl_arn=alb_wafv2_acl.attr_arn)

alb_wafv2_log_group = alb_wafv2_construct.web_acl_log(log_group_name=f"aws-waf-logs-{standard_prefix}-wafv2")

alb_wafv2_construct.web_acl_log_config(log_group=alb_wafv2_log_group, web_acl_arn=alb_wafv2_acl.attr_arn)

Enter fullscreen mode Exit fullscreen mode

And more or less you have a good starting point with WAF ACL attached to your ALB.

Top comments (2)

Collapse
 
enrialonso profile image
Enrique Alonso

I think it's important to emphasize in this post that the logs must have a prefix in this case starts with "aws-waf-logs-", as explained in the AWS documentation. If you can include this information in the post, it would be helpful for other people who come across it. Thank you for sharing!

Collapse
 
airmonitor profile image
Tomasz Szuster

Hey @enrialonso

Yup, good finding.
This was already in the example code thre

alb_wafv2_log_group = alb_wafv2_construct.web_acl_log(log_group_name=f"aws-waf-logs-{standard_prefix}-wafv2")
Enter fullscreen mode Exit fullscreen mode