Skip to content

Commit

Permalink
Add: include when a dependency is gated in the dependency graph
Browse files Browse the repository at this point in the history
  • Loading branch information
NiklasHargarter committed Dec 31, 2024
1 parent d8d8fdd commit ae12633
Showing 1 changed file with 103 additions and 38 deletions.
141 changes: 103 additions & 38 deletions troubadix/standalone_plugins/dependency_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,27 @@
from troubadix.helper.helper import is_enterprise_folder
from troubadix.helper.patterns import _get_special_script_tag_pattern

EXTENSIONS = (".nasl",)
DEPENDENCY_REGEX = r"script_dependencies\((.*?)\);"
EXTENSIONS = (".nasl",) # not sure if inc files can also have dependencies
DEPENDENCY_PATTERN = _get_special_script_tag_pattern(
"dependencies", flags=re.DOTALL | re.MULTILINE
)
IF_BLOCK_PATTERN = re.compile(
r'if\s*\(FEED_NAME\s*==\s*"GSF"\s*\|\|\s*FEED_NAME\s*==\s*"GEF"\s*\|\|\s*FEED_NAME\s*==\s*"SCM"\)\s*'
r"(?:\{[^}]*\}\s*|[^\{;]*;)"
) # Matches specific if blocks used to gate code to run only for enterprise feeds


@dataclass
class Script:
name: str
path: Path
feed: str
dependencies: list[str]
ungated_dependencies: list[str] # not in a enterprise gate
gated_dependencies: list[str] # inside a enterprise gate

@property
def dependencies(self) -> list[str]:
return self.ungated_dependencies + self.gated_dependencies


def directory_type(string: str) -> Path:
Expand Down Expand Up @@ -59,6 +67,7 @@ def parse_args() -> Namespace:
return parser.parse_args()


# Usefull? Or is full only ever used and can therfore be removed?
def get_feed(root, feed) -> list[Script]:
match feed:
case "21.04":
Expand All @@ -83,12 +92,24 @@ def get_scripts(directory) -> list[Script]:
root_path = Path(root)
for file in files:
if file.endswith(EXTENSIONS):
path = root_path / file
relative_path = path.relative_to(directory)
path = root_path / file # absolute path for file access
relative_path = path.relative_to(
directory
) # relative path to \nasl will be used a identifier
name = str(relative_path)
feed = determine_feed(relative_path)
dependencies = extract_dependencies(path)
scripts.append(Script(name, path, feed, dependencies))
ungated_dependencies, gated_dependencies = extract_dependencies(
path
)
scripts.append(
Script(
name,
path,
feed,
ungated_dependencies,
gated_dependencies,
)
)
return scripts


Expand All @@ -100,38 +121,46 @@ def determine_feed(script_relative_path: Path) -> str:
return "community"


# works but not used, skips gsf folder
# could be used to only determine a scripts feed
# by only fetching from enterprise folder in a seperate call
def community_files(directory):
enterprise_dir = directory / "gsf"
for root, dirs, files in os.walk(directory):
root_path = Path(root)
# durch edit in place wird gsf folder ausgelassen
dirs[:] = [d for d in dirs if root_path / d != enterprise_dir]
for file in files:
if file.endswith(EXTENSIONS):
yield root_path / file
def split_dependencies(value: str) -> list[str]:
"""
removes blank lines, strips comments, cleans dependencies,
splits them by commas, and excludes empty strings.
"""
return [
dep
for line in value.splitlines()
if line.strip() # Ignore blank or whitespace-only lines
# ignore comment, clean line of unwanted chars, split by ','
for dep in re.sub(r'[\'"\s]', "", line.split("#", 1)[0]).split(",")
if dep # Include only non-empty
]


def extract_dependencies(file_path: Path) -> list[str]:
deps = []
def extract_dependencies(file_path: Path) -> tuple[list[str], list[str]]:
ungated_deps = []
gated_deps = []

try:
with file_path.open("r", encoding=CURRENT_ENCODING) as file:
content = file.read()

matches = DEPENDENCY_PATTERN.finditer(content)
for match in matches:
for line in match.group("value").splitlines():
subject = line[: line.index("#")] if "#" in line else line
_dependencies = re.sub(r'[\'"\s]', "", subject).split(",")
deps.extend([dep for dep in _dependencies if dep != ""])
if_blocks = [
(m.start(), m.end()) for m in IF_BLOCK_PATTERN.finditer(content)
]

for match in DEPENDENCY_PATTERN.finditer(content):
start, end = match.span()
is_gated = any(
start >= block_start and end <= block_end
for block_start, block_end in if_blocks
)
dependencies = split_dependencies(match.group("value"))
(gated_deps if is_gated else ungated_deps).extend(dependencies)

except Exception as e:
logging.error(f"Error processing {file_path}: {e}")

return deps
return (ungated_deps, gated_deps)


def create_graph(scripts: list[Script]):
Expand All @@ -141,8 +170,10 @@ def create_graph(scripts: list[Script]):
for script in scripts:
# explicit add incase the script has no dependencies
graph.add_node(script.name, feed=script.feed)
for dep in script.dependencies:
graph.add_edge(script.name, dep)
for dep in script.ungated_dependencies:
graph.add_edge(script.name, dep, is_gated=False)
for dep in script.gated_dependencies:
graph.add_edge(script.name, dep, is_gated=True)
return graph


Expand All @@ -164,9 +195,9 @@ def check_duplicates(scripts: list[Script]):

def check_missing_dependencies(scripts: list[Script], graph: nx.DiGraph) -> int:
"""
checks if any scripts that are depended on are missing from the list of scripts
also logs the scripts dependending on the missing script
Checks if any scripts that are depended on are missing from
the list of scripts created from the local file system,
logs the scripts dependending on the missing script
"""
dependencies = {dep for script in scripts for dep in script.dependencies}
script_names = {script.name for script in scripts}
Expand Down Expand Up @@ -199,16 +230,50 @@ def check_cycles(graph) -> int:

def cross_feed_dependencies(graph):
"""
checks if scripts in community depend on scripts in enterprise folders
creates a list of script and dep for scripts
in community feed that depend on scripts in enterprise folders
"""
cross_feed_dependencies = [
return [
(u, v)
for u, v in graph.edges
if graph.nodes[u]["feed"] == "community"
and graph.nodes[v].get("feed", "unknown") == "enterprise"
]
for u, v in cross_feed_dependencies:
logging.info(f"cross-feed-dependency: {u} depends on {v}")


def ungated_cross_feed_dependencies(graph):
"""
Checks if scripts in the community feed have dependencies to enterprise scripts,
but are not contained within a gate.
"""
cross_feed_dependencies = [
(u, v)
for u, v, is_gated in graph.edges.data("is_gated")
if graph.nodes[u]["feed"] == "community"
and graph.nodes[v].get("feed", "unknown") == "enterprise"
and not is_gated
]

return cross_feed_dependencies


def check_cross_feed_dependecies(graph):
cfd = cross_feed_dependencies(graph)
logging.info(f" {len(cfd)} cross-feed-dependencies were found:")
for u, v in cfd:
logging.warning(f"ungated cross-feed-dependency: {u} depends on {v}")

ungated_cfd = ungated_cross_feed_dependencies(graph)
logging.info(
f" {len(ungated_cfd)} ungated cross-feed-dependencies were found:"
)
for u, v in ungated_cfd:
logging.error(f"ungated cross-feed-dependency: {u} depends on {v}")

if ungated_cfd:
return 1
else:
return 0


def main():
Expand All @@ -229,7 +294,7 @@ def main():
check_duplicates(scripts)
failed += check_missing_dependencies(scripts, graph)
failed += check_cycles(graph)
cross_feed_dependencies(graph)
failed += check_cross_feed_dependecies(graph)

return failed

Expand Down

0 comments on commit ae12633

Please sign in to comment.