Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New filters: Noise, grayscale and oily skin detection #65

Open
wants to merge 7 commits into
base: new_filters
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DPF/filters/data_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pandas as pd
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from tqdm.auto import tqdm

from DPF.dataloaders.dataloader_utils import identical_collate_fn
from DPF.modalities import ModalityName
Expand Down
183 changes: 183 additions & 0 deletions DPF/filters/images/face_focus_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List
import cv2
import torch
from torch.multiprocessing import Pool, set_start_method
# Set the start method to 'spawn' at the beginning of your script
try:
set_start_method('spawn')
except RuntimeError:
pass
from functools import partial
import numpy as np
from scipy.stats import kurtosis
from DPF.types import ModalityToDataMapping
from DPF.utils import read_image_rgb_from_bytes
from .img_filter import ImageFilter
from PIL import Image, UnidentifiedImageError
from retinaface.pre_trained_models import get_model

class FaceFocusFilter(ImageFilter):
def __init__(
self,
threshold: float = 2000.0,
detect_face = True,
workers: int = 1,
batch_size: int = 1,
pbar: bool = True,
device=None,
_pbar_position: int = 0
):
super().__init__(pbar, _pbar_position)
self.threshold = threshold
self.detect_face = detect_face
self.num_workers = workers
self.batch_size = batch_size
if not device:
self.device = 'cuda' if torch.cuda.is_available() else "cpu"
else:
self.device = device
self.face_detector = get_model("resnet50_2020-07-20",
max_size=2048,
device=self.device)
self.face_detector.eval()


@property
def result_columns(self) -> list[str]:
return ["face_focus_measure", "bg_focus_measure", "bbox", "faces_count", "confidence", "face_pass"]

@property
def dataloader_kwargs(self) -> dict[str, Any]:
return {
"num_workers": self.num_workers,
"batch_size": self.batch_size,
"drop_last": False,
}

def preprocess_data(
self,
modality2data: ModalityToDataMapping,
metadata: dict[str, Any]
) -> Any:
key = metadata[self.key_column]
try:
pil_image = read_image_rgb_from_bytes(modality2data['image'])
numpy_image = np.array(pil_image)
opencv_image = cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR)
return key, opencv_image
except (OSError, UnidentifiedImageError, ValueError) as e:
print(f"Error processing image for key {key}: {str(e)}")
return key, None

def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]:
df_batch_labels = self._get_dict_from_schema()

for key, image in batch:
info = self.process_image(image)

if info:
df_batch_labels["face_focus_measure"].append(info["face_focus_measure"])
df_batch_labels["bg_focus_measure"].append(info["bg_focus_measure"])
df_batch_labels["bbox"].append(info["bbox"])
df_batch_labels["faces_count"].append(info["faces_count"])
df_batch_labels["confidence"].append(info["confidence"])
df_batch_labels["face_pass"].append(info["face_pass"])
else:
df_batch_labels["face_focus_measure"].append(0)
df_batch_labels["bg_focus_measure"].append(0)
df_batch_labels["bbox"].append(False)
df_batch_labels["faces_count"].append(0)
df_batch_labels["confidence"].append(0.0)
df_batch_labels["face_pass"].append(False)

df_batch_labels[self.key_column].append(key)

return df_batch_labels

# def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]:
# df_batch_labels = self._get_dict_from_schema()

# # Create a partial function with self.process_image
# process_image_partial = partial(self.process_image)

# # Use multiprocessing to process images in parallel
# with Pool() as pool:
# results = pool.map(process_image_partial, [image for _, image in batch])

# for (key, _), info in zip(batch, results):
# for column in self.result_columns:
# df_batch_labels[column].append(info.get(column, 0 if column in ['face_focus_measure', 'bg_focus_measure', 'faces_count', 'confidence'] else False))
# df_batch_labels[self.key_column].append(key)

# return df_batch_labels

def tenengrad_variance(self, image):
"""
Calculate the Tenengrad variance focus measure for the given image.
"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
gx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
gy = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
gx_squared = np.square(gx)
gy_squared = np.square(gy)
tenengrad_variance = np.mean(gx_squared + gy_squared)
return tenengrad_variance

def process_image(self, image):

# Calculate the focus measure for the entire image
bg_focus_measure = self.tenengrad_variance(image)

if not self.detect_face:
# Check if the face is in focus
in_focus = bg_focus_measure > self.threshold
return {
"face_focus_measure": 0,
"bg_focus_measure": bg_focus_measure,
"bbox": None,
"faces_count": 0,
"confidence":0,
"face_pass": in_focus
}

# Detect faces in the image
faces = self.face_detector.predict_jsons(image)

if faces is None or len(faces) == 0:
return {
"face_focus_measure": 0,
"bg_focus_measure": bg_focus_measure,
"bbox": None,
"faces_count": 0,
"confidence": 0,
"face_pass": False
}

# Get the face with the highest confidence
face = max(faces, key=lambda x: x['score'])

faces = [x for x in faces if x['score'] > 0.5]

bbox = face['bbox']
landmarks = face['landmarks']

# Extract the face region
x1, y1, x2, y2 = map(int, bbox)
face_region = image[y1:y2, x1:x2]

# Calculate the focus measure for the face region
face_focus_measure = self.tenengrad_variance(face_region)

# Check if the face is in focus
in_focus = face_focus_measure > self.threshold

return {
"face_focus_measure": face_focus_measure,
"bg_focus_measure": bg_focus_measure,
"bbox": bbox,
"faces_count": len(faces),
"confidence": face["score"],
"face_pass": (len(faces) == 1) and in_focus and face['score'] > 0.5
}
133 changes: 133 additions & 0 deletions DPF/filters/images/focus_peaking_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import os
from typing import Any
from deepface import DeepFace
import cv2
import numpy as np
from scipy.stats import kurtosis
from DPF.types import ModalityToDataMapping
from DPF.utils import read_image_rgb_from_bytes
from .img_filter import ImageFilter

class FocusFilter(ImageFilter):
"""
Filter for detecting faces and checking if the face is in focus.

Parameters
----------
face_focus_threshold: float = 2000.0
Threshold value for the Tenengrad variance focus measure to determine if the face is in focus.
workers: int = 16
Number of processes to use for reading data and calculating focus scores.
batch_size: int = 64
Batch size for processing images.
pbar: bool = True
Whether to use a progress bar.
"""

def __init__(
self,
threshold: float = 2000.0,
workers: int = 1,
batch_size: int = 1,
pbar: bool = True,
_pbar_position: int = 0,
detect_face = True
):
super().__init__(pbar, _pbar_position)
self.threshold = threshold
self.num_workers = workers
self.batch_size = batch_size
self.detect_face = detect_face

@property
def result_columns(self) -> list[str]:
return ["in_focus", "focus_measure"]

@property
def dataloader_kwargs(self) -> dict[str, Any]:
return {
"num_workers": self.num_workers,
"batch_size": self.batch_size,
"drop_last": False,
}

def preprocess_data(
self,
modality2data: ModalityToDataMapping,
metadata: dict[str, Any]
) -> Any:
key = metadata[self.key_column]
pil_image = read_image_rgb_from_bytes(modality2data['image'])
image = np.array(pil_image)
return key, image

def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]:
df_batch_labels = self._get_dict_from_schema()

for key, image in batch:
face_info = process_image(image, threshold=self.threshold)
if face_info:
df_batch_labels["face_detected"].append(True)
df_batch_labels["face_in_focus"].append(face_info["face_in_focus"])
df_batch_labels["face_focus_measure"].append(face_info["face_focus_measure"])
else:
df_batch_labels["face_detected"].append(False)
df_batch_labels["face_in_focus"].append(False)
df_batch_labels["face_focus_measure"].append(0.0)
df_batch_labels[self.key_column].append(key)

return df_batch_labels

def tenengrad_variance(image):
"""
Calculate the Tenengrad variance focus measure for the given image.
"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
gx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
gy = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
gx_squared = np.square(gx)
gy_squared = np.square(gy)
tenengrad_variance = np.mean(gx_squared + gy_squared)
return tenengrad_variance

def process_image(image, threshold=2000.0):
# Calculate the focus measure for the entire image
focus_measure = tenengrad_variance(image)

if not detect_faces:
return focus_measure

# Detect faces in the image
faces = DeepFace.extract_faces(image,
enforce_detection=False,
detector_backend='retinaface')

# Filter faces based on confidence and presence of both eyes
filtered_faces = [face for face in faces if face['confidence'] > 0.1]
if not filtered_faces:
return None

face = max(filtered_faces, key=lambda x: x['confidence'])

# Check if exactly one face is detected after filtering
if len(filtered_faces) == 1 and face['confidence'] > 0.5:
face['facial_area']['confidence'] = face['confidence']
if face['facial_area']['left_eye'] is not None and face['facial_area']['right_eye'] is not None:



# Extract the face region
x, y, w, h = face['facial_area']['x'], face['facial_area']['y'], face['facial_area']['w'], face['facial_area']['h']
face_region = image[y:y+h, x:x+w]

# Calculate the focus measure for the face region
face_focus_measure = tenengrad_variance(face_region)

# Check if the face is in focus
face_in_focus = face_focus_measure > threshold

# Add the focus information to the face dictionary
face['facial_area']['face_in_focus'] = face_in_focus
face['facial_area']['face_focus_measure'] = face_focus_measure

return face['facial_area']
Loading