Feature/fr batch image processing (#29)
* add ! processing case if face regions for image are not detected

* chg ! move face_recognition settings to constance

* chg ! tests

* chg ! get some parameters for fr from proto file

* chg ! refactor DuplicationDetector, add NMS

* chg ! optimize find duplicates, tests

* chg ! tests

* add ! batch processing

* chg ! tests
vitali-yanushchyk-valor authored Jun 10, 2024
1 parent c84dd61 commit af9234c
Showing 9 changed files with 369 additions and 182 deletions.
15 changes: 13 additions & 2 deletions src/hope_dedup_engine/apps/faces/celery_tasks.py
@@ -9,9 +9,20 @@
@shared_task(bind=True, soft_time_limit=0.5 * 60 * 60, time_limit=1 * 60 * 60)
@task_lifecycle(name="Deduplicate", ttl=1 * 60 * 60)
# TODO: Use DeduplicationSet objects as input to deduplication pipeline
def deduplicate(self, filename: str):
def deduplicate(self, filenames: tuple[str], ignore_pairs: tuple[tuple[str, str]] = tuple()) -> tuple[tuple[str]]:
"""
Deduplicate a set of filenames, ignoring any specified pairs of filenames.
Args:
filenames (tuple[str]): A tuple of filenames to process.
ignore_pairs (tuple[tuple[str, str]]): A tuple of tuples, where each inner tuple contains
a pair of filenames to be ignored in the duplication check.
Returns:
tuple[tuple[str]]: A tuple of tuples, where each inner tuple represents a group of duplicates.
"""
try:
dd = DuplicationDetector(filename)
dd = DuplicationDetector(filenames, ignore_pairs)
return dd.find_duplicates()
except Exception as e:
self.update_state(state=states.FAILURE, meta={"exc_message": str(e), "traceback": traceback.format_exc()})
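For orientation, a minimal sketch of how the batched task might be invoked from application code (not part of this diff; the filenames, timeout, and result handling are assumptions):

    # Hypothetical invocation of the batched task.
    from hope_dedup_engine.apps.faces.celery_tasks import deduplicate

    filenames = ("person_a.jpg", "person_b.jpg", "person_c.jpg")
    ignore_pairs = (("person_a.jpg", "person_c.jpg"),)  # pairs excluded from the duplicate check

    async_result = deduplicate.delay(filenames, ignore_pairs)  # enqueue on the Celery broker
    duplicate_groups = async_result.get(timeout=60)  # e.g. (("person_a.jpg", "person_b.jpg"),)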
19 changes: 14 additions & 5 deletions src/hope_dedup_engine/apps/faces/utils/celery_utils.py
@@ -1,3 +1,4 @@
import hashlib
import logging
from functools import wraps

@@ -8,16 +9,17 @@
redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)


def task_lifecycle(name: str, ttl: int):
def decorator(func):
def task_lifecycle(name: str, ttl: int) -> callable:
def decorator(func) -> callable:
@wraps(func)
def wrapper(self, *args, **kwargs):
def wrapper(self, *args, **kwargs) -> any:
logger = logging.getLogger(func.__module__)
logger.info(f"{name} task started")
result = None

filename: str = args[0] if args else kwargs.get("filename")
lock_name: str = f"{name}_{filename}"
filenames = args[0] if args else kwargs.get("filenames")
ignore_pairs = args[1] if len(args) > 1 else kwargs.get("ignore_pairs")
lock_name: str = f"{name}_{_get_hash(filenames, ignore_pairs)}"
if not _acquire_lock(lock_name, ttl):
logger.info(f"Task {name} with brocker lock {lock_name} is already running.")
return None
@@ -43,3 +45,10 @@ def _acquire_lock(lock_name: str, ttl: int = 1 * 60 * 60) -> bool:

def _release_lock(lock_name: str) -> None:
redis_client.delete(lock_name)


def _get_hash(filenames: tuple[str], ignore_pairs: tuple[tuple[str, str]]) -> str:
fn_str: str = ",".join(sorted(filenames))
ip_sorted = sorted((min(item1, item2), max(item1, item2)) for item1, item2 in ignore_pairs)
ip_str = ",".join(f"{item1},{item2}" for item1, item2 in ip_sorted)
return hashlib.sha256(f"{fn_str}{ip_str}".encode()).hexdigest()
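A small usage sketch of _get_hash (illustrative only; the module builds a Redis client from Django settings at import time, so this assumes the project's settings are configured): the lock name is insensitive to ordering, so resubmitting the same batch maps to the same broker lock.

    from hope_dedup_engine.apps.faces.utils.celery_utils import _get_hash

    # Same batch, different ordering of filenames and of the names inside a pair.
    assert _get_hash(("b.jpg", "a.jpg"), (("y.jpg", "x.jpg"),)) == _get_hash(
        ("a.jpg", "b.jpg"), (("x.jpg", "y.jpg"),)
    )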
184 changes: 123 additions & 61 deletions src/hope_dedup_engine/apps/faces/utils/duplication_detector.py
@@ -1,6 +1,8 @@
import logging
import os
import re
from collections import defaultdict
from dataclasses import dataclass

from django.conf import settings

@@ -17,14 +19,27 @@ class DuplicationDetector:
A class to detect and process duplicate faces in images.
"""

def __init__(self, filename: str) -> None:
@dataclass(frozen=True, slots=True)
class BlobFromImageConfig:
shape: dict[str, int]
scale_factor: float
mean_values: tuple[float, float, float]

@dataclass(frozen=True, slots=True)
class FaceEncodingsConfig:
num_jitters: int
model: str

logger: logging.Logger = logging.getLogger(__name__)

def __init__(self, filenames: tuple[str], ignore_pairs: tuple[tuple[str, str]] = tuple()) -> None:
"""
Initialize the DuplicationDetector with the given filename.
Initialize the DuplicationDetector with the given filenames.
Args:
filename (str): The filename of the image to process.
filenames (tuple[str]): The filenames of the images to process.
ignore_pairs (tuple[tuple[str, str]]): The pairs of filenames to ignore.
"""
self.logger: logging.Logger = logging.getLogger(__name__)
self.storages: dict[str, CV2DNNStorage | HDEAzureStorage | HOPEAzureStorage] = {
"images": HOPEAzureStorage(),
"cv2dnn": CV2DNNStorage(settings.CV2DNN_PATH),
@@ -35,23 +50,28 @@ def __init__(self, filename: str) -> None:
if not self.storages.get("cv2dnn").exists(file):
raise FileNotFoundError(f"File {file} does not exist in storage.")

self.shape: dict[str, int] = self._get_shape()
self.net: cv2.dnn_Net = self._set_net(self.storages.get("cv2dnn"))

self.filename: str = filename
self.encodings_filename: str = f"{self.filename}.npy"
self.scale_factor: float = config.BLOB_FROM_IMAGE_SCALE_FACTOR
self.mean_values: tuple[float, float, float] = tuple(map(float, config.BLOB_FROM_IMAGE_MEAN_VALUES.split(", ")))
# self.mean_values: config.BLOB_FROM_IMAGE_MEAN_VALUES
self.filenames: tuple[str] = filenames
self.ignore_set: set[tuple[str, str]] = self._get_pairs_to_ignore(ignore_pairs)

self.blob_from_image_cfg = self.BlobFromImageConfig(
shape=self._get_shape(),
scale_factor=config.BLOB_FROM_IMAGE_SCALE_FACTOR,
mean_values=(
tuple(map(float, config.BLOB_FROM_IMAGE_MEAN_VALUES.split(", ")))
if isinstance(config.BLOB_FROM_IMAGE_MEAN_VALUES, str)
else config.BLOB_FROM_IMAGE_MEAN_VALUES
),
)
self.face_detection_confidence: float = config.FACE_DETECTION_CONFIDENCE
self.face_encodings_model: str = config.FACE_ENCODINGS_MODEL
self.face_encodings_num_jitters: int = config.FACE_ENCODINGS_NUM_JITTERS
self.distance_threshold: float = config.FACE_DISTANCE_THRESHOLD
self.nms_threshold: float = config.NMS_THRESHOLD
self.face_encodings_cfg = self.FaceEncodingsConfig(
num_jitters=config.FACE_ENCODINGS_NUM_JITTERS,
model=config.FACE_ENCODINGS_MODEL,
)

@property
def has_encodings(self) -> bool:
return self.storages["encoded"].exists(self.encodings_filename)
self.nms_threshold: float = config.NMS_THRESHOLD

def _set_net(self, storage: CV2DNNStorage) -> cv2.dnn_Net:
net = cv2.dnn.readNetFromCaffe(
@@ -75,20 +95,44 @@ def _get_shape(self) -> dict[str, int]:
else:
raise ValueError("Could not find input_shape in prototxt file.")

def _get_face_detections_dnn(self) -> list[tuple[int, int, int, int]]:
def _get_pairs_to_ignore(self, ignore: tuple[tuple[str, str]]) -> set[tuple[str, str]]:
ignore = tuple(tuple(pair) for pair in ignore)
if not ignore:
return set()
if all(
isinstance(pair, tuple) and len(pair) == 2 and all(isinstance(item, str) and item for item in pair)
for pair in ignore
):
return {(item1, item2) for item1, item2 in ignore} | {(item2, item1) for item1, item2 in ignore}
elif len(ignore) == 2 and all(isinstance(item, str) for item in ignore):
return {(ignore[0], ignore[1]), (ignore[1], ignore[0])}
else:
raise ValueError(
"Invalid format for 'ignore'. Expected tuple of tuples each containing exactly two strings."
)
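To illustrate the normalization above, a standalone re-implementation of the symmetric expansion (a sketch, not code from this commit): every pair is mirrored so later membership checks succeed in either direction.

    def _mirror_pairs(ignore: tuple[tuple[str, str], ...]) -> set[tuple[str, str]]:
        # Mirror each (a, b) pair into both (a, b) and (b, a), as _get_pairs_to_ignore does.
        return {(a, b) for a, b in ignore} | {(b, a) for a, b in ignore}

    assert _mirror_pairs((("a.jpg", "b.jpg"),)) == {("a.jpg", "b.jpg"), ("b.jpg", "a.jpg")}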

def _encodings_filename(self, filename: str) -> str:
return f"{filename}.npy"

def _has_encodings(self, filename: str) -> bool:
return self.storages["encoded"].exists(self._encodings_filename(filename))

def _get_face_detections_dnn(self, filename: str) -> list[tuple[int, int, int, int]]:
face_regions: list[tuple[int, int, int, int]] = []
try:
with self.storages["images"].open(self.filename, "rb") as img_file:
with self.storages["images"].open(filename, "rb") as img_file:
img_array = np.frombuffer(img_file.read(), dtype=np.uint8)
# Decode image from binary buffer to 3D numpy array (height, width, channels in BGR color space)
image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
(h, w) = image.shape[:2]
# Create a blob (4D tensor) from the image
blob = cv2.dnn.blobFromImage(
image=cv2.resize(image, dsize=(self.shape["height"], self.shape["width"])),
size=(self.shape["height"], self.shape["width"]),
scalefactor=self.scale_factor,
mean=self.mean_values,
image=cv2.resize(
image, dsize=(self.blob_from_image_cfg.shape["height"], self.blob_from_image_cfg.shape["width"])
),
size=(self.blob_from_image_cfg.shape["height"], self.blob_from_image_cfg.shape["width"]),
scalefactor=self.blob_from_image_cfg.scale_factor,
mean=self.blob_from_image_cfg.mean_values,
)
self.net.setInput(blob)
# Forward pass to get output with shape (1, 1, N, 7),
@@ -111,7 +155,7 @@ def _get_face_detections_dnn(self) -> list[tuple[int, int, int, int]]:
for i in indices:
face_regions.append(tuple(boxes[i]))
except Exception as e:
self.logger.exception("Error processing face detection for image %s", self.filename)
self.logger.exception("Error processing face detection for image %s", filename)
raise e
return face_regions
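The hunk above ends at the loop over indices; the NMS call itself sits outside the shown context. As a hedged sketch of the usual OpenCV pattern (box format, variable names, and thresholds are assumptions, not taken from this commit):

    import cv2

    # boxes and confidences are parallel lists built from the DNN detections;
    # cv2.dnn.NMSBoxes expects rectangles as (x, y, width, height).
    # Arguments: (bboxes, scores, score_threshold, nms_threshold).
    indices = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)
    face_regions = [tuple(boxes[int(i)]) for i in indices]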

@@ -120,74 +164,92 @@ def _load_encodings_all(self) -> dict[str, list[np.ndarray]]:
try:
_, files = self.storages["encoded"].listdir("")
for file in files:
if file.endswith(".npy"):
if self._has_encodings(filename := os.path.splitext(file)[0]):
with self.storages["encoded"].open(file, "rb") as f:
data[os.path.splitext(file)[0]] = np.load(f, allow_pickle=False)
data[filename] = np.load(f, allow_pickle=False)
except Exception as e:
self.logger.exception("Error loading encodings.")
raise e
return data

def _encode_face(self) -> None:
def _encode_face(self, filename: str) -> None:
try:
with self.storages["images"].open(self.filename, "rb") as img_file:
with self.storages["images"].open(filename, "rb") as img_file:
image = face_recognition.load_image_file(img_file)
encodings: list = []
face_regions = self._get_face_detections_dnn()
face_regions = self._get_face_detections_dnn(filename)
if not face_regions:
self.logger.error("No face regions detected in image %s", self.filename)
self.logger.error("No face regions detected in image %s", filename)
else:
for region in face_regions:
if isinstance(region, (list, tuple)) and len(region) == 4:
top, right, bottom, left = region
# Compute the face encodings for the face regions in the image
face_encodings = face_recognition.face_encodings(
image,
[(top, right, bottom, left)],
num_jitters=self.face_encodings_num_jitters,
model=self.face_encodings_model,
num_jitters=self.face_encodings_cfg.num_jitters,
model=self.face_encodings_cfg.model,
)
encodings.extend(face_encodings)
else:
self.logger.error("Invalid face region.")
with self.storages["encoded"].open(self.encodings_filename, "wb") as f:
self.logger.error("Invalid face region %s", region)
with self.storages["encoded"].open(self._encodings_filename(filename), "wb") as f:
np.save(f, encodings)
except Exception as e:
self.logger.exception("Error processing face encodings for image %s", self.filename)
self.logger.exception("Error processing face encodings for image %s", filename)
raise e

def find_duplicates(self) -> tuple[str]:
def _get_duplicated_groups(self, checked: set[tuple[str, str, float]]) -> tuple[tuple[str]]:
# Dictionary to store connections between paths where distances are less than the threshold
groups = []
connections = defaultdict(set)
for path1, path2, dist in checked:
if dist < self.distance_threshold:
connections[path1].add(path2)
connections[path2].add(path1)
# Iterate over each path and form groups
for path, neighbors in connections.items():
# Check if the path has already been included in any group
if not any(path in group for group in groups):
new_group = {path}
queue = list(neighbors)
# Try to expand the group ensuring each new path is duplicated to all in the group
while queue:
neighbor = queue.pop(0)
if neighbor not in new_group and all(neighbor in connections[member] for member in new_group):
new_group.add(neighbor)
# Add neighbors of the current neighbor, excluding those already in the group
queue.extend([n for n in connections[neighbor] if n not in new_group])
# Add the newly formed group to the list of groups
groups.append(new_group)
return tuple(map(tuple, groups))
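A worked example of the grouping rule (illustrative values with an assumed threshold of 0.5, not output from the repository's tests):

    checked = {
        ("a.jpg", "b.jpg", 0.30),  # below threshold -> connected
        ("b.jpg", "c.jpg", 0.40),  # below threshold -> connected
        ("a.jpg", "c.jpg", 0.90),  # above threshold -> not connected
    }
    # connections: a -> {b}, b -> {a, c}, c -> {b}
    # Seeding from "a" grows {"a", "b"}; "c" is rejected because it is not connected
    # to every current member (there is no a-c connection). Seeding from "c" grows
    # {"c", "b"}, so the chain a-b-c yields two overlapping pairs rather than one
    # merged cluster.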

def find_duplicates(self) -> tuple[tuple[str]]:
"""
Find and return a list of duplicate images based on face encodings.
Returns:
tuple[str]: A tuple of filenames of duplicate images.
tuple[tuple[str]]: A tuple of filenames of duplicate images.
"""
duplicated_images: set[str] = set()
path1 = self.filename
try:
if not self.has_encodings:
self._encode_face()
for filename in self.filenames:
if not self._has_encodings(filename):
self._encode_face(filename)
encodings_all = self._load_encodings_all()
encodings1 = encodings_all[path1]

checked_pairs = set()
for path2, encodings2 in encodings_all.items():
if path1 != path2:
for encoding1 in encodings1:
for encoding2 in encodings2:
if (path1, path2, tuple(encoding1), tuple(encoding2)) in checked_pairs:
continue

distance = face_recognition.face_distance([encoding1], encoding2)
if distance < self.distance_threshold:
duplicated_images.update([path1, path2])
break

checked_pairs.add((path1, path2, tuple(encoding1), tuple(encoding2)))
if path2 in duplicated_images:
break
return tuple(duplicated_images)

checked = set()
for path1, encodings1 in encodings_all.items():
for path2, encodings2 in encodings_all.items():
if path1 < path2 and (path1, path2) not in self.ignore_set:
min_distance = float("inf")
for encoding1 in encodings1:
if (
current_min := min(face_recognition.face_distance(encodings2, encoding1))
) < min_distance:
min_distance = current_min
checked.add((path1, path2, min_distance))

return self._get_duplicated_groups(checked)
except Exception as e:
self.logger.exception("Error finding duplicates for image %s", path1)
self.logger.exception("Error finding duplicates for images %s", self.filenames)
raise e
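For reference, the pairwise comparison relies on face_recognition.face_distance, which measures one candidate encoding against a whole list in a single call; a minimal sketch with synthetic arrays (stand-ins, not real encodings):

    import numpy as np
    import face_recognition

    known = [np.zeros(128), np.ones(128) * 0.1]  # stand-ins for stored 128-d encodings
    candidate = np.ones(128) * 0.05

    distances = face_recognition.face_distance(known, candidate)  # ndarray of shape (2,)
    min_distance = float(distances.min())  # compared against the distance threshold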
2 changes: 1 addition & 1 deletion src/hope_dedup_engine/apps/faces/validators.py
@@ -21,4 +21,4 @@ def to_python(self, value):
def prepare_value(self, value):
if isinstance(value, tuple):
return ", ".join(map(str, value))
return value
return super().prepare_value(value)
16 changes: 15 additions & 1 deletion tests/faces/faces_const.py
@@ -1,7 +1,21 @@
from typing import Final

FILENAME: Final[str] = "test_file.jpg"
FILENAMES: Final[list[str]] = ["test_file.jpg", "test_file2.jpg"]
FILENAME_ENCODED_FORMAT: Final[str] = "{}.npy"
FILENAMES: Final[list[str]] = ["test_file.jpg", "test_file2.jpg", "test_file3.jpg"]
IGNORE_PAIRS: Final[list[tuple[str, str]]] = [
("ignore_file.jpg", "ignore_file2.jpg"),
("ignore_file4.jpg", "ignore_file3.jpg"),
]

CELERY_TASK_NAME: Final[str] = "Deduplicate"
CELERY_TASK_TTL: Final[int] = 1 * 60 * 60
CELERY_TASK_DELAYS: Final[dict[str, int]] = {
"SoftTimeLimitExceeded": 5 * 60 * 60,
"TimeLimitExceeded": 10 * 60 * 60,
"CustomException": 0,
}

DEPLOY_PROTO_CONTENT: Final[str] = "input_shape { dim: 1 dim: 3 dim: 300 dim: 300 }"
DEPLOY_PROTO_SHAPE: Final[dict[str, int]] = {"batch_size": 1, "channels": 3, "height": 300, "width": 300}
FACE_REGIONS_INVALID: Final[list[list[tuple[int, int, int, int]]]] = [[], [(0, 0, 10)]]
16 changes: 13 additions & 3 deletions tests/faces/fixtures/celery_tasks.py
@@ -1,13 +1,16 @@
from unittest.mock import patch

import pytest
from freezegun import freeze_time

import docker
from docker import from_env

from ..faces_const import FILENAMES


@pytest.fixture(scope="session")
def docker_client():
client = docker.from_env()
client = from_env()
yield client
client.close()

@@ -19,8 +22,15 @@ def mock_redis_client():


@pytest.fixture
def mock_duplication_detector():
def mock_dd_find():
with patch(
"hope_dedup_engine.apps.faces.utils.duplication_detector.DuplicationDetector.find_duplicates"
) as mock_find:
mock_find.return_value = (FILENAMES[:2],) # Assuming the first two are duplicates based on mock data
yield mock_find


@pytest.fixture
def time_control():
with freeze_time("2024-01-01") as frozen_time:
yield frozen_time
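As an example of how the renamed mock_dd_find fixture might be consumed (a sketch only; the commit's actual tests are not shown here, and the direct task call, the mock_redis_client behaviour, and the extra patch on DuplicationDetector.__init__ are assumptions):

    from unittest.mock import patch

    from hope_dedup_engine.apps.faces.celery_tasks import deduplicate

    from .faces_const import FILENAMES, IGNORE_PAIRS


    def test_deduplicate_returns_mocked_groups(mock_redis_client, mock_dd_find):
        # Bypass the detector's storage setup so only the mocked find_duplicates runs.
        with patch(
            "hope_dedup_engine.apps.faces.celery_tasks.DuplicationDetector.__init__",
            return_value=None,
        ):
            result = deduplicate(FILENAMES, IGNORE_PAIRS)
        assert result == (FILENAMES[:2],)
        mock_dd_find.assert_called_once()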