Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve minimize performance #427

Merged
merged 1 commit into from
Aug 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions policy_sentry/writing/minimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,59 +21,56 @@
* For managed policies: You can add up to 10 managed policies to a user, role, or group.
* The size of each managed policy cannot exceed 6,144 characters.
"""
from __future__ import annotations

import logging
import functools

# from policyuniverse.expander_minimizer import _get_prefixes_for_action

logger = logging.getLogger(__name__)


# Borrowed from policyuniverse to reduce size
# https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/expander_minimizer.py#L45
@functools.lru_cache(maxsize=1024)
def _get_prefixes_for_action(action):
def _get_prefixes_for_action(action: str) -> list[str]:
"""
:param action: iam:cat
:return: [ "iam:", "iam:c", "iam:ca", "iam:cat" ]
"""
(technology, permission) = action.split(":")
retval = ["{}:".format(technology)]
phrase = ""
for char in permission:
newphrase = "{}{}".format(phrase, char)
retval.append("{}:{}".format(technology, newphrase))
phrase = newphrase
technology, permission = action.split(":")
retval = [f"{technology}:{permission[:i]}" for i in range(len(permission) + 1)]

return retval


# Adapted version of policyuniverse's _get_denied_prefixes_from_desired, here:
# https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/expander_minimizer.py#L101
def get_denied_prefixes_from_desired(
desired_actions, all_actions
): # pylint: disable=missing-function-docstring
desired_actions: list[str], all_actions: set[str]
) -> set[str]: # pylint: disable=missing-function-docstring
"""
Adapted version of policyuniverse's _get_denied_prefixes_from_desired, here: https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/expander_minimizer.py#L101
"""
denied_actions = all_actions.difference(desired_actions)
denied_prefixes = set()
for denied_action in denied_actions:
for denied_prefix in _get_prefixes_for_action(denied_action):
denied_prefixes.add(denied_prefix)
denied_prefixes = {
denied_prefix
for denied_action in denied_actions
for denied_prefix in _get_prefixes_for_action(denied_action)
}

return denied_prefixes


# Adapted version of policyuniverse's _check_permission_length. We are commenting out the skipping prefix message
# https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/expander_minimizer.py#L111
def check_min_permission_length(
permission, minchars=None
): # pylint: disable=missing-function-docstring
permission: str, minchars: int | None = None
) -> bool: # pylint: disable=missing-function-docstring
"""
Adapted version of policyuniverse's _check_permission_length. We are commenting out the skipping prefix message
https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/expander_minimizer.py#L111
"""
if minchars and len(permission) < int(minchars) and permission != "":
if minchars and permission and len(permission) < int(minchars):
# print(
# "Skipping prefix {} because length of {}".format(
# permission, len(permission)
Expand All @@ -87,8 +84,8 @@ def check_min_permission_length(
# This is a condensed version of policyuniverse's minimize_statement_actions, changed for our purposes.
# https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/expander_minimizer.py#L123
def minimize_statement_actions(
desired_actions, all_actions, minchars=None
): # pylint: disable=missing-function-docstring
desired_actions: list[str], all_actions: set[str], minchars: int | None = None
) -> list[str]: # pylint: disable=missing-function-docstring
"""
This is a condensed version of policyuniverse's minimize_statement_actions, changed for our purposes.
https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/expander_minimizer.py#L123
Expand All @@ -110,16 +107,16 @@ def minimize_statement_actions(
continue
# If the action name is not empty
if prefix not in denied_prefixes:
if permission != "":
if permission:
if prefix not in desired_actions:
prefix = "{}*".format(prefix)
prefix = f"{prefix}*"
minimized_actions.add(prefix)
found_prefix = True
break

if not found_prefix:
logger.debug(
"Could not suitable prefix. Defaulting to %s".format(prefixes[-1])
f"Could not find suitable prefix. Defaulting to {prefixes[-1]}"
)
minimized_actions.add(prefixes[-1])
# sort the actions
Expand Down
34 changes: 33 additions & 1 deletion test/writing/test_minimize.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import unittest
import json
from policy_sentry.writing.sid_group import SidGroup
from policy_sentry.writing.minimize import minimize_statement_actions
from policy_sentry.writing.minimize import (
minimize_statement_actions,
check_min_permission_length,
)
from policy_sentry.querying.all import get_all_actions
from policy_sentry.util.policy_files import get_sid_names_from_policy, get_statement_from_policy_using_sid

Expand Down Expand Up @@ -45,6 +48,35 @@ def test_minimize_statement_actions_funky_case(self):
sorted(desired_result),
)

def test_minimize_statement_actions_with_min_chars(self):
actions_to_minimize = [
"ec2:AuthorizeSecurityGroupEgress",
"ec2:AuthorizeSecurityGroupIngress",
]
desired_result = ["ec2:authorizesecurity*"]
all_actions = get_all_actions(lowercase=True)
minchars = 17
self.maxDiff = None
self.assertListEqual(
sorted(
minimize_statement_actions(actions_to_minimize, all_actions, minchars)
),
sorted(desired_result),
)

def test_minimize_statement_actions_with_wildcard(self):
actions_to_minimize = ["ec2:*"]
desired_result = ["ec2:*"]
all_actions = get_all_actions(lowercase=True)
minchars = None
self.maxDiff = None
self.assertListEqual(
sorted(
minimize_statement_actions(actions_to_minimize, all_actions, minchars)
),
sorted(desired_result),
)

def test_minimize_rw_same_one(self):
cfg = {
"mode": "crud",
Expand Down