diff --git a/ca_cdk_constructs/edge_services/waf_rule_templates.py b/ca_cdk_constructs/edge_services/waf_rule_templates.py index eefdc26..958eba4 100644 --- a/ca_cdk_constructs/edge_services/waf_rule_templates.py +++ b/ca_cdk_constructs/edge_services/waf_rule_templates.py @@ -179,3 +179,122 @@ def ip_rule_property( ) ), ) + +def restricted_uri_string_property( + scope, + name: str, + priority: int, + restricted_uri_string: str, + allowed_addresses: dict[str, list[str]] = {}, + count_only: bool = False, + cloud_watch_metrics_enabled: bool = False, +) -> waf.CfnWebACL.RuleProperty: + """A wrapper that returns an `aws_wafv2.CfnWebACL.RuleProperty` object to be used in a list and passed to the CfnWebACL instance. + + :param scope: `self`, scope in which this resource is defined. + :param name: The name of the rule. You can't change the name of a Rule after you create it. + :param priority: If you define more than one Rule in a WebACL , AWS WAF evaluates each request against the Rules in order based on the value of Priority . AWS WAF processes rules with lower priority first. The priorities don't need to be consecutive, but they must all be different. + :param restricted_uri_string: Access to any URL containing this string will be restricted to the IP addresses in allowed_addresses + :param allowed_addresses: A dictionary of strings that specifies zero or more IP addresses or blocks of IP addresses for "IPV4" and "IPV6". Defaults to {}. All addresses must be specified using Classless Inter-Domain Routing (CIDR) notation. + :param count_only: Set to True to only count and not BLOCK on matching requests, defaults to False. + :param cloud_watch_metrics_enabled: Set to True to enable logging via Cloudwatch, defaults to Fasle. + :return: aws_cdk.aws_wafv2.CfnWebACL.RuleProperty. + + :example: + ``` + restricted_uri_string_property( + scope, + name=f"{scope.stack_name}AllowToAccessUriString-helptoclaim", + priority=0, + restricted_uri_string="helptoclaim" + allowed_addresses={"IPV4": ["1.1.1.1/32"], "IPV6": ["2a00:1d40:11a5::111"]}, + count_only=True, + ) + """ + + # Count must be set to {} or None. + if count_only: + count_only = {} + else: + count_only = None + + # block unless counting + action = waf.CfnWebACL.RuleActionProperty( + count=count_only, block=waf.CfnWebACL.BlockActionProperty() + ) + + # keys of addresses dict can only be "IPV4" or "IPV6" - easiest way to + # check is by using set differences + if set(allowed_addresses.keys()) - set(list(["IPV4", "IPV6"])) != set(list([])): + raise AttributeError("keys for addresses dict must only be 'IPV4' or 'IPV6'!") + + # Need IPv4 and IPv6 IP sets + ipv4_arn = waf.CfnIPSet( + scope, + f"{name}IpSetIPV4", + addresses=allowed_addresses.get("IPV4", []), + description=f"{name}IpSetIPV4", + ip_address_version="IPV4", + scope="CLOUDFRONT", + ).attr_arn + + ipv6_arn = waf.CfnIPSet( + scope, + f"{name}IpSetIPV6", + addresses=allowed_addresses.get("IPV6", []), + description=f"{name}IpSetIPV6", + ip_address_version="IPV6", + scope="CLOUDFRONT", + ).attr_arn + + return waf.CfnWebACL.RuleProperty( + name=f"{name}Rule", + priority=priority, + visibility_config=waf.CfnWebACL.VisibilityConfigProperty( + cloud_watch_metrics_enabled=cloud_watch_metrics_enabled, + metric_name=f"{name}Metric", + sampled_requests_enabled=True, + ), + action=action, + statement=waf.CfnWebACL.StatementProperty( + and_statement=waf.CfnWebACL.AndStatementProperty( + # Block if uri contains our string AND ip address NOT in IPv4 OR IPv6 allowed lists. + statements=[ + waf.CfnWebACL.StatementProperty( + byte_match_statement=waf.CfnWebACL.ByteMatchStatementProperty( + field_to_match=waf.CfnWebACL.FieldToMatchProperty( + uri_path={} + ), + positional_constraint="CONTAINS", + text_transformations=[ + waf.CfnWebACL.TextTransformationProperty( + priority=0, type="URL_DECODE" + ) + ], + search_string=restricted_uri_string, + ), + ), + waf.CfnWebACL.StatementProperty( + not_statement=waf.CfnWebACL.NotStatementProperty( + statement=waf.CfnWebACL.StatementProperty( + or_statement=waf.CfnWebACL.OrStatementProperty( + statements=[ + waf.CfnWebACL.StatementProperty( + ip_set_reference_statement=waf.CfnWebACL.IPSetReferenceStatementProperty( + arn=ipv4_arn + ), + ), + waf.CfnWebACL.StatementProperty( + ip_set_reference_statement=waf.CfnWebACL.IPSetReferenceStatementProperty( + arn=ipv6_arn + ), + ), + ] + ) + ) + ) + ) + ] + ) + ) + ) diff --git a/ca_cdk_constructs/edge_services/waf_v2_builder.py b/ca_cdk_constructs/edge_services/waf_v2_builder.py index 54a49cf..a27ae09 100644 --- a/ca_cdk_constructs/edge_services/waf_v2_builder.py +++ b/ca_cdk_constructs/edge_services/waf_v2_builder.py @@ -4,6 +4,7 @@ from ca_cdk_constructs.edge_services.waf_rule_templates import ( managed_rule_group_property, ip_rule_property, + restricted_uri_string_property, ) @@ -22,6 +23,7 @@ class WafV2Builder: add_custom_rule: Adds a custom rule to the WAFv2 WebACL. add_managed_rule: Adds a managed rule to the WAFv2 WebACL. add_ip_rule: Adds an IP rule to the WAFv2 WebACL. + add_restricted_uri_string_rule: Adds a rule to the WAFv2 WebACL that restricts access to specific URIs to specific IP addresses get_rules: Returns the list of rules added to the WAFv2 WebACL. build: Builds the WAFv2 WebACL. @@ -152,6 +154,37 @@ def add_ip_rule( ) ) + def add_restricted_uri_string_rule( + self, + name: str, + priority: int, + restricted_uri_string: str, + allowed_addresses: dict[str, list[str]] = {}, + count_only: Optional[bool] = False, + cloud_watch_metrics_enabled: Optional[bool] = False, + ) -> None: + """ + Adds an IP rule to the WAFv2 WebACL. + + :param name: The name of the rule. + :param priority: The priority of the rule. + :param restricted_uri_string: Any URI containing this string will be blocked except to allowed_addresses + :param allowed_addresses: The addresses to use. A dictionary of address types to addresses. + :param count_only: Whether to only count the requests. Defaults to False + :param cloud_watch_metrics_enabled: Whether to enable CloudWatch metrics. Defaults to False. + """ + self.rules.append( + restricted_uri_string_property( + self.scope, + name, + priority, + restricted_uri_string, + allowed_addresses, + count_only, + cloud_watch_metrics_enabled, + ) + ) + def get_rules(self) -> list[waf.CfnWebACL.RuleProperty]: """ Returns the list of rules that have been added to the builder. diff --git a/tests/waf/test_waf_v2_builder.py b/tests/waf/test_waf_v2_builder.py index aeb684a..645c61d 100644 --- a/tests/waf/test_waf_v2_builder.py +++ b/tests/waf/test_waf_v2_builder.py @@ -116,3 +116,19 @@ def test_waf_v2_get_rules(waf_builder): rules = waf_builder.get_rules() assert len(rules) == 1 assert type(rules[0]) == aws_wafv2.CfnWebACL.RuleProperty + + +def test_waf_v2_restricted_uri_string_rule(waf_builder): + waf_builder.add_restricted_uri_string_rule( + name="RestrictAccessToURIFoo", + priority=0, + restricted_uri_string="Foo", + allowed_addresses={"IPV4": ["1.1.1.1/32"], "IPV6": []} + ) + waf_builder.build() + rules = waf_builder.get_rules() + # basically testing that we block with AND (check for path, NOT ( OR ( in allowed ipv4 or allowed ipv6 sets))) + assert type(rules[0]) == aws_wafv2.CfnWebACL.RuleProperty + assert type(rules[0].statement.and_statement.statements[0].byte_match_statement) == aws_wafv2.CfnWebACL.ByteMatchStatementProperty + assert type(rules[0].statement.and_statement.statements[1].not_statement.statement.or_statement.statements[0].ip_set_reference_statement) ==aws_wafv2.CfnWebACL.IPSetReferenceStatementProperty + assert type(rules[0].statement.and_statement.statements[1].not_statement.statement.or_statement.statements[1].ip_set_reference_statement) ==aws_wafv2.CfnWebACL.IPSetReferenceStatementProperty