diff --git a/tests/test_finder.py b/tests/test_finder.py index cec535ba49..369f5c8135 100644 --- a/tests/test_finder.py +++ b/tests/test_finder.py @@ -1,6 +1,6 @@ from pathlib import Path -from unblob.finder import make_yara_rules, search_yara_patterns +from unblob.finder import make_handler_map, make_yara_rules, search_yara_patterns from unblob.models import Handler @@ -58,3 +58,16 @@ def test_search_yara_patterns(tmp_path: Path): assert result2.handler is handler2 assert result2.match.strings == [(10, "$tar_magic", b"ustar")] + + +def test_make_handler_map(): + handler_map = make_handler_map(tuple([TestHandler1, TestHandler2])) + assert isinstance(handler_map["handler1"], TestHandler1) + assert isinstance(handler_map["handler2"], TestHandler2) + + +def test_make_handler_map_instances_are_cached(): + handler_map1 = make_handler_map(tuple([TestHandler1, TestHandler2])) + handler_map2 = make_handler_map(tuple([TestHandler1, TestHandler2])) + assert handler_map1["handler1"] is handler_map2["handler1"] + assert handler_map1["handler2"] is handler_map2["handler2"] diff --git a/tests/test_handlers.py b/tests/test_handlers.py index d7e7c562e0..bd3aa495bf 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -11,10 +11,12 @@ import shlex import subprocess from pathlib import Path +from typing import Type import pytest from unblob import handlers +from unblob.models import Handler from unblob.processing import DEFAULT_DEPTH, process_file TEST_DATA_PATH = Path(__file__).parent / "integration" @@ -68,12 +70,12 @@ def test_all_handlers(input_dir: Path, output_dir: Path, tmp_path: Path): "handler", ( pytest.param(handler, id=handler.NAME) - for handler_map in handlers._ALL_MODULES_BY_PRIORITY - for handler in handler_map.values() + for handlers_in_priority in handlers.ALL_HANDLERS_BY_PRIORITY + for handler in handlers_in_priority ), ) -def test_missing_handlers_integrations_tests(handler): - handler_module_path = Path(inspect.getfile(handler.__class__)) +def test_missing_handlers_integrations_tests(handler: Type[Handler]): + handler_module_path = Path(inspect.getfile(handler)) handler_test_path = handler_module_path.relative_to( HANDLERS_PACKAGE_PATH ).with_suffix("") diff --git a/unblob/finder.py b/unblob/finder.py index e60f1de00c..adcd07e37f 100644 --- a/unblob/finder.py +++ b/unblob/finder.py @@ -6,13 +6,13 @@ from functools import lru_cache from operator import itemgetter from pathlib import Path -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, Type import yara from structlog import get_logger from .file_utils import LimitedStartReader -from .handlers import _ALL_MODULES_BY_PRIORITY +from .handlers import ALL_HANDLERS_BY_PRIORITY from .logging import noformat from .models import Handler, ValidChunk, YaraMatchResult from .state import exit_code_var @@ -38,10 +38,11 @@ def search_chunks_by_priority( # noqa: C901 """ all_chunks = [] - for priority_level, handlers in enumerate(_ALL_MODULES_BY_PRIORITY, start=1): + for priority_level, handler_classes in enumerate(ALL_HANDLERS_BY_PRIORITY, start=1): logger.info("Starting priority level", priority_level=noformat(priority_level)) - yara_rules = make_yara_rules(tuple(handlers.values())) - yara_results = search_yara_patterns(yara_rules, handlers, path) + yara_rules = make_yara_rules(handler_classes) + handler_map = make_handler_map(handler_classes) + yara_results = search_yara_patterns(yara_rules, handler_map, path) for result in yara_results: handler, match = result.handler, result.match @@ -87,7 +88,7 @@ def search_chunks_by_priority( # noqa: C901 @lru_cache -def make_yara_rules(handlers: Tuple[Handler, ...]): +def make_yara_rules(handlers: Tuple[Type[Handler], ...]): """Make yara.Rule by concatenating all handlers yara rules and compiling them.""" all_yara_rules = "\n".join( _YARA_RULE_TEMPLATE.format(NAME=h.NAME, YARA_RULE=h.YARA_RULE.strip()) @@ -98,8 +99,13 @@ def make_yara_rules(handlers: Tuple[Handler, ...]): return compiled_rules +@lru_cache +def make_handler_map(handler_classes: Tuple[Type[Handler], ...]) -> Dict[str, Handler]: + return {h.NAME: h() for h in handler_classes} + + def search_yara_patterns( - yara_rules: yara.Rule, handlers: Dict[str, Handler], full_path: Path + yara_rules: yara.Rule, handler_map: Dict[str, Handler], full_path: Path ) -> List[YaraMatchResult]: """Search with the compiled YARA rules and identify the handler which defined the rule.""" # YARA uses a memory mapped file internally when given a path @@ -107,7 +113,7 @@ def search_yara_patterns( yara_results = [] for match in yara_matches: - handler = handlers[match.rule] + handler = handler_map[match.rule] yara_res = YaraMatchResult(handler=handler, match=match) yara_results.append(yara_res) diff --git a/unblob/handlers/__init__.py b/unblob/handlers/__init__.py index 560ee16376..34ef76961f 100644 --- a/unblob/handlers/__init__.py +++ b/unblob/handlers/__init__.py @@ -1,16 +1,11 @@ -from typing import Dict, List, Type +from typing import List, Tuple, Type from ..models import Handler from .archive import ar, arc, arj, cab, cpio, dmg, rar, sevenzip, tar, zip from .filesystem import cramfs, fat, iso9660, squashfs, ubi - -def _make_handler_map(*handlers: Type[Handler]) -> Dict[str, Handler]: - return {h.NAME: h() for h in handlers} - - -_ALL_MODULES_BY_PRIORITY: List[Dict[str, Handler]] = [ - _make_handler_map( +ALL_HANDLERS_BY_PRIORITY: List[Tuple[Type[Handler], ...]] = [ + ( cramfs.CramFSHandler, fat.FATHandler, squashfs.SquashFSv3Handler, @@ -18,7 +13,7 @@ def _make_handler_map(*handlers: Type[Handler]) -> Dict[str, Handler]: ubi.UBIHandler, ubi.UBIFSHandler, ), - _make_handler_map( + ( ar.ARHandler, arc.ARCHandler, arj.ARJHandler,