Skip to content

Commit

Permalink
Merge pull request #41 from citizensadvice/OPS-5438_waf_rule_to_restr…
Browse files Browse the repository at this point in the history
…ict_paths_to_specific_ips

OPS-5438: feat(waf builder) rule to restrict uri to allowed ips
  • Loading branch information
ruthwells authored Nov 6, 2023
2 parents b723c7e + 9fe9479 commit eaedbac
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 0 deletions.
119 changes: 119 additions & 0 deletions ca_cdk_constructs/edge_services/waf_rule_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,122 @@ def ip_rule_property(
)
),
)

def restricted_uri_string_property(
scope,
name: str,
priority: int,
restricted_uri_string: str,
allowed_addresses: dict[str, list[str]] = {},
count_only: bool = False,
cloud_watch_metrics_enabled: bool = False,
) -> waf.CfnWebACL.RuleProperty:
"""A wrapper that returns an `aws_wafv2.CfnWebACL.RuleProperty` object to be used in a list and passed to the CfnWebACL instance.
:param scope: `self`, scope in which this resource is defined.
:param name: The name of the rule. You can't change the name of a Rule after you create it.
:param priority: If you define more than one Rule in a WebACL , AWS WAF evaluates each request against the Rules in order based on the value of Priority . AWS WAF processes rules with lower priority first. The priorities don't need to be consecutive, but they must all be different.
:param restricted_uri_string: Access to any URL containing this string will be restricted to the IP addresses in allowed_addresses
:param allowed_addresses: A dictionary of strings that specifies zero or more IP addresses or blocks of IP addresses for "IPV4" and "IPV6". Defaults to {}. All addresses must be specified using Classless Inter-Domain Routing (CIDR) notation.
:param count_only: Set to True to only count and not BLOCK on matching requests, defaults to False.
:param cloud_watch_metrics_enabled: Set to True to enable logging via Cloudwatch, defaults to Fasle.
:return: aws_cdk.aws_wafv2.CfnWebACL.RuleProperty.
:example:
```
restricted_uri_string_property(
scope,
name=f"{scope.stack_name}AllowToAccessUriString-helptoclaim",
priority=0,
restricted_uri_string="helptoclaim"
allowed_addresses={"IPV4": ["1.1.1.1/32"], "IPV6": ["2a00:1d40:11a5::111"]},
count_only=True,
)
"""

# Count must be set to {} or None.
if count_only:
count_only = {}
else:
count_only = None

# block unless counting
action = waf.CfnWebACL.RuleActionProperty(
count=count_only, block=waf.CfnWebACL.BlockActionProperty()
)

# keys of addresses dict can only be "IPV4" or "IPV6" - easiest way to
# check is by using set differences
if set(allowed_addresses.keys()) - set(list(["IPV4", "IPV6"])) != set(list([])):
raise AttributeError("keys for addresses dict must only be 'IPV4' or 'IPV6'!")

# Need IPv4 and IPv6 IP sets
ipv4_arn = waf.CfnIPSet(
scope,
f"{name}IpSetIPV4",
addresses=allowed_addresses.get("IPV4", []),
description=f"{name}IpSetIPV4",
ip_address_version="IPV4",
scope="CLOUDFRONT",
).attr_arn

ipv6_arn = waf.CfnIPSet(
scope,
f"{name}IpSetIPV6",
addresses=allowed_addresses.get("IPV6", []),
description=f"{name}IpSetIPV6",
ip_address_version="IPV6",
scope="CLOUDFRONT",
).attr_arn

return waf.CfnWebACL.RuleProperty(
name=f"{name}Rule",
priority=priority,
visibility_config=waf.CfnWebACL.VisibilityConfigProperty(
cloud_watch_metrics_enabled=cloud_watch_metrics_enabled,
metric_name=f"{name}Metric",
sampled_requests_enabled=True,
),
action=action,
statement=waf.CfnWebACL.StatementProperty(
and_statement=waf.CfnWebACL.AndStatementProperty(
# Block if uri contains our string AND ip address NOT in IPv4 OR IPv6 allowed lists.
statements=[
waf.CfnWebACL.StatementProperty(
byte_match_statement=waf.CfnWebACL.ByteMatchStatementProperty(
field_to_match=waf.CfnWebACL.FieldToMatchProperty(
uri_path={}
),
positional_constraint="CONTAINS",
text_transformations=[
waf.CfnWebACL.TextTransformationProperty(
priority=0, type="URL_DECODE"
)
],
search_string=restricted_uri_string,
),
),
waf.CfnWebACL.StatementProperty(
not_statement=waf.CfnWebACL.NotStatementProperty(
statement=waf.CfnWebACL.StatementProperty(
or_statement=waf.CfnWebACL.OrStatementProperty(
statements=[
waf.CfnWebACL.StatementProperty(
ip_set_reference_statement=waf.CfnWebACL.IPSetReferenceStatementProperty(
arn=ipv4_arn
),
),
waf.CfnWebACL.StatementProperty(
ip_set_reference_statement=waf.CfnWebACL.IPSetReferenceStatementProperty(
arn=ipv6_arn
),
),
]
)
)
)
)
]
)
)
)
33 changes: 33 additions & 0 deletions ca_cdk_constructs/edge_services/waf_v2_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ca_cdk_constructs.edge_services.waf_rule_templates import (
managed_rule_group_property,
ip_rule_property,
restricted_uri_string_property,
)


Expand All @@ -22,6 +23,7 @@ class WafV2Builder:
add_custom_rule: Adds a custom rule to the WAFv2 WebACL.
add_managed_rule: Adds a managed rule to the WAFv2 WebACL.
add_ip_rule: Adds an IP rule to the WAFv2 WebACL.
add_restricted_uri_string_rule: Adds a rule to the WAFv2 WebACL that restricts access to specific URIs to specific IP addresses
get_rules: Returns the list of rules added to the WAFv2 WebACL.
build: Builds the WAFv2 WebACL.
Expand Down Expand Up @@ -152,6 +154,37 @@ def add_ip_rule(
)
)

def add_restricted_uri_string_rule(
self,
name: str,
priority: int,
restricted_uri_string: str,
allowed_addresses: dict[str, list[str]] = {},
count_only: Optional[bool] = False,
cloud_watch_metrics_enabled: Optional[bool] = False,
) -> None:
"""
Adds an IP rule to the WAFv2 WebACL.
:param name: The name of the rule.
:param priority: The priority of the rule.
:param restricted_uri_string: Any URI containing this string will be blocked except to allowed_addresses
:param allowed_addresses: The addresses to use. A dictionary of address types to addresses.
:param count_only: Whether to only count the requests. Defaults to False
:param cloud_watch_metrics_enabled: Whether to enable CloudWatch metrics. Defaults to False.
"""
self.rules.append(
restricted_uri_string_property(
self.scope,
name,
priority,
restricted_uri_string,
allowed_addresses,
count_only,
cloud_watch_metrics_enabled,
)
)

def get_rules(self) -> list[waf.CfnWebACL.RuleProperty]:
"""
Returns the list of rules that have been added to the builder.
Expand Down
16 changes: 16 additions & 0 deletions tests/waf/test_waf_v2_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,19 @@ def test_waf_v2_get_rules(waf_builder):
rules = waf_builder.get_rules()
assert len(rules) == 1
assert type(rules[0]) == aws_wafv2.CfnWebACL.RuleProperty


def test_waf_v2_restricted_uri_string_rule(waf_builder):
waf_builder.add_restricted_uri_string_rule(
name="RestrictAccessToURIFoo",
priority=0,
restricted_uri_string="Foo",
allowed_addresses={"IPV4": ["1.1.1.1/32"], "IPV6": []}
)
waf_builder.build()
rules = waf_builder.get_rules()
# basically testing that we block with AND (check for path, NOT ( OR ( in allowed ipv4 or allowed ipv6 sets)))
assert type(rules[0]) == aws_wafv2.CfnWebACL.RuleProperty
assert type(rules[0].statement.and_statement.statements[0].byte_match_statement) == aws_wafv2.CfnWebACL.ByteMatchStatementProperty
assert type(rules[0].statement.and_statement.statements[1].not_statement.statement.or_statement.statements[0].ip_set_reference_statement) ==aws_wafv2.CfnWebACL.IPSetReferenceStatementProperty
assert type(rules[0].statement.and_statement.statements[1].not_statement.statement.or_statement.statements[1].ip_set_reference_statement) ==aws_wafv2.CfnWebACL.IPSetReferenceStatementProperty

0 comments on commit eaedbac

Please sign in to comment.