Refactor processing, get rid of strategies.py
We decided that we don't want to implement the strategy abstraction right now.
Moved the search-chunk related functionality into finder.py and deleted strategies.py.
The functions are ordered in call order.
kissgyorgy committed Dec 6, 2021
1 parent c14badc commit 86bc41f
Showing 6 changed files with 203 additions and 140 deletions.
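
For orientation, a minimal sketch of the call order after this refactor, using the signatures introduced in this commit; the input file name is made up, and in the real code process_file() in processing.py drives these calls:

from pathlib import Path

from unblob.finder import search_chunks_by_priority
from unblob.processing import calculate_unknown_chunks, remove_inner_chunks

path = Path("firmware.bin")  # hypothetical input file
file_size = path.stat().st_size

with path.open("rb") as file:
    # YARA search per priority level, then Handler.calculate_chunk() on each match
    valid_chunks = search_chunks_by_priority(path, file, file_size)

# Drop chunks fully contained in bigger ones, then turn the gaps into UnknownChunks
outer_chunks = remove_inner_chunks(valid_chunks)
unknown_chunks = calculate_unknown_chunks(outer_chunks, file_size)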
60 changes: 60 additions & 0 deletions tests/test_finder.py
@@ -0,0 +1,60 @@
from pathlib import Path

from unblob.finder import make_yara_rules, search_yara_patterns
from unblob.models import Handler


class _BaseTestHandler(Handler):
def calculate_chunk(self, *args, **kwargs):
pass

@staticmethod
def make_extract_command(*args, **kwargs):
return []


class TestHandler1(_BaseTestHandler):
NAME = "handler1"
YARA_RULE = r"""
strings:
$magic = { 21 3C }
condition:
$magic
"""


class TestHandler2(_BaseTestHandler):
NAME = "handler2"
YARA_RULE = r"""
strings:
$tar_magic = { 75 73 74 61 72 }
condition:
$tar_magic
"""


def test_make_yara_rules():
rules = make_yara_rules(tuple([TestHandler1, TestHandler2]))
matches = rules.match(data=b"!<        ustar")
assert len(matches) == 2
assert matches[0].strings == [(0, "$magic", b"!<")]
assert matches[1].strings == [(10, "$tar_magic", b"ustar")]


def test_search_yara_patterns(tmp_path: Path):
handler1 = TestHandler1()
handler2 = TestHandler2()
rules = make_yara_rules(tuple([TestHandler1, TestHandler2]))
handler_map = {"handler1": handler1, "handler2": handler2}
test_file = tmp_path / "test_file"
test_file.write_bytes(b"!<        ustar")
results = search_yara_patterns(rules, handler_map, test_file)

assert len(results) == 2
result1, result2 = results

assert result1.handler is handler1
assert result1.match.strings == [(0, "$magic", b"!<")]

assert result2.handler is handler2
assert result2.match.strings == [(10, "$tar_magic", b"ustar")]
2 changes: 1 addition & 1 deletion tests/test_strategies.py → tests/test_processing.py
@@ -3,7 +3,7 @@
import pytest

from unblob.models import UnknownChunk, ValidChunk
from unblob.strategies import calculate_unknown_chunks, remove_inner_chunks
from unblob.processing import calculate_unknown_chunks, remove_inner_chunks


@pytest.mark.parametrize(
3 changes: 3 additions & 0 deletions unblob/extractor.py
@@ -1,3 +1,6 @@
"""
File extraction related functions.
"""
import io
import shlex
import subprocess
83 changes: 77 additions & 6 deletions unblob/finder.py
@@ -1,12 +1,21 @@
"""
Chunk search related functions.
The main "entry point" is search_chunks_by_priority.
"""
import io
from functools import lru_cache
from operator import itemgetter
from pathlib import Path
from typing import Dict, List, Tuple

import yara
from structlog import get_logger

from .handlers import Handler
from .models import YaraMatchResult
from .file_utils import LimitedStartReader
from .handlers import _ALL_MODULES_BY_PRIORITY
from .logging import noformat
from .models import Handler, ValidChunk, YaraMatchResult
from .state import exit_code_var

logger = get_logger()

@@ -19,8 +28,67 @@
"""


def search_chunks_by_priority( # noqa: C901
path: Path, file: io.BufferedReader, file_size: int
) -> List[ValidChunk]:
"""Search all ValidChunks within the file.
Collect all the registered handlers by priority, search for YARA patterns and run
Handler.calculate_chunk() on the found matches.
We don't deal with offsets within already found ValidChunks, and invalid chunks are thrown away.
"""
all_chunks = []

for priority_level, handlers in enumerate(_ALL_MODULES_BY_PRIORITY, start=1):
logger.info("Starting priority level", priority_level=noformat(priority_level))
yara_rules = make_yara_rules(tuple(handlers.values()))
yara_results = search_yara_patterns(yara_rules, handlers, path)

for result in yara_results:
handler, match = result.handler, result.match

sorted_matches = sorted(match.strings, key=itemgetter(0))
for offset, identifier, string_data in sorted_matches:
real_offset = offset + handler.YARA_MATCH_OFFSET

if any(chunk.contains_offset(real_offset) for chunk in all_chunks):
continue

logger.info(
"Calculating chunk for YARA match",
start_offset=offset,
real_offset=real_offset,
identifier=identifier,
)

limited_reader = LimitedStartReader(file, real_offset)
try:
chunk = handler.calculate_chunk(limited_reader, real_offset)
except Exception as exc:
exit_code_var.set(1)
logger.error(
"Unhandled Exception during chunk calculation", exc_info=exc
)
continue

# We found some random bytes this handler couldn't parse
if chunk is None:
continue

if chunk.end_offset > file_size or chunk.start_offset < 0:
exit_code_var.set(1)
logger.error("Chunk overflows file", chunk=chunk)
continue

chunk.handler = handler
logger.info("Found valid chunk", chunk=chunk, handler=handler.NAME)
all_chunks.append(chunk)

return all_chunks


@lru_cache
def _make_yara_rules(handlers: Tuple[Handler, ...]):
def make_yara_rules(handlers: Tuple[Handler, ...]):
"""Make yara.Rule by concatenating all handlers yara rules and compiling them."""
all_yara_rules = "\n".join(
_YARA_RULE_TEMPLATE.format(NAME=h.NAME, YARA_RULE=h.YARA_RULE.strip())
for h in handlers
@@ -30,10 +98,10 @@ def _make_yara_rules(handlers: Tuple[Handler, ...]):
return compiled_rules


def search_chunks(
handlers: Dict[str, Handler], full_path: Path
def search_yara_patterns(
yara_rules: yara.Rule, handlers: Dict[str, Handler], full_path: Path
) -> List[YaraMatchResult]:
yara_rules = _make_yara_rules(tuple(handlers.values()))
"""Search with the compiled YARA rules and identify the handler which defined the rule."""
# YARA uses a memory mapped file internally when given a path
yara_matches: List[yara.Match] = yara_rules.match(str(full_path), timeout=60)

@@ -43,4 +111,7 @@ def search_chunks(
yara_res = YaraMatchResult(handler=handler, match=match)
yara_results.append(yara_res)

if yara_results:
logger.info("Found YARA results", count=noformat(len(yara_results)))

return yara_results
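
For context, a hedged sketch of what the search loop above expects from a concrete handler: a NAME, a YARA_RULE, an optional YARA_MATCH_OFFSET, and a calculate_chunk() that returns a ValidChunk or None. The FOO format, its header layout, and ValidChunk's constructor arguments are all assumptions for illustration only:

from typing import List, Optional

from unblob.models import Handler, ValidChunk


class FooHandler(Handler):
    """Hypothetical handler for a made-up FOO container format."""

    NAME = "foo"
    YARA_RULE = r"""
        strings:
            $foo_magic = { 46 4F 4F 00 }
        condition:
            $foo_magic
    """
    # Added to every YARA match offset before calculate_chunk() is called
    YARA_MATCH_OFFSET = 0

    def calculate_chunk(self, file, start_offset: int) -> Optional[ValidChunk]:
        file.seek(start_offset)
        header = file.read(8)
        if len(header) < 8:
            return None  # random bytes this handler couldn't parse
        size = int.from_bytes(header[4:8], "little")
        # ValidChunk's constructor arguments are assumed here
        return ValidChunk(start_offset=start_offset, end_offset=start_offset + size)

    @staticmethod
    def make_extract_command(*args, **kwargs) -> List[str]:
        return []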
67 changes: 62 additions & 5 deletions unblob/processing.py
@@ -1,14 +1,14 @@
import stat
from operator import attrgetter
from pathlib import Path
from typing import List

from structlog import get_logger

from .extractor import carve_unknown_chunks, extract_valid_chunks, make_extract_dir
from .strategies import (
calculate_unknown_chunks,
remove_inner_chunks,
search_chunks_by_priority,
)
from .finder import search_chunks_by_priority
from .iter_utils import pairwise
from .models import UnknownChunk, ValidChunk

logger = get_logger()

@@ -61,3 +61,60 @@ def process_file(
process_file(
extract_root, new_path, extract_root, max_depth, current_depth + 1
)


def remove_inner_chunks(chunks: List[ValidChunk]) -> List[ValidChunk]:
"""Remove all chunks from the list which are within another bigger chunks."""
if not chunks:
return []

chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True)
outer_chunks = [chunks_by_size[0]]
for chunk in chunks_by_size[1:]:
if not any(outer.contains(chunk) for outer in outer_chunks):
outer_chunks.append(chunk)

outer_count = len(outer_chunks)
removed_count = len(chunks) - outer_count
logger.info(
"Removed inner chunks",
outer_chunk_count=outer_count,
removed_inner_chunk_count=removed_count,
)
return outer_chunks


def calculate_unknown_chunks(
chunks: List[ValidChunk], file_size: int
) -> List[UnknownChunk]:
"""Calculate the empty gaps between chunks."""
if not chunks or file_size == 0:
return []

sorted_by_offset = sorted(chunks, key=attrgetter("start_offset"))

unknown_chunks = []

first = sorted_by_offset[0]
if first.start_offset != 0:
unknown_chunk = UnknownChunk(0, first.start_offset)
unknown_chunks.append(unknown_chunk)

for chunk, next_chunk in pairwise(sorted_by_offset):
diff = next_chunk.start_offset - chunk.end_offset
if diff != 0:
unknown_chunk = UnknownChunk(
start_offset=chunk.end_offset,
end_offset=next_chunk.start_offset,
)
unknown_chunks.append(unknown_chunk)

last = sorted_by_offset[-1]
if last.end_offset < file_size:
unknown_chunk = UnknownChunk(
start_offset=last.end_offset,
end_offset=file_size,
)
unknown_chunks.append(unknown_chunk)

return unknown_chunks
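
A small worked example of how the two helpers above compose; ValidChunk's constructor arguments are assumed here, since its definition is not part of this diff:

from unblob.models import ValidChunk
from unblob.processing import calculate_unknown_chunks, remove_inner_chunks

# Hypothetical chunks found in a 100-byte file (constructor arguments assumed)
chunks = [
    ValidChunk(start_offset=10, end_offset=30),
    ValidChunk(start_offset=15, end_offset=20),  # lies inside the first chunk
    ValidChunk(start_offset=50, end_offset=80),
]

outer = remove_inner_chunks(chunks)  # keeps [10, 30) and [50, 80)
# The gaps [0, 10), [30, 50) and [80, 100) become UnknownChunks
unknown = calculate_unknown_chunks(outer, file_size=100)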