Commit
v0.1.5 update
FutureUniant committed Sep 17, 2024
1 parent 0e03fc5 commit 82fbeba
Showing 18 changed files with 781 additions and 88 deletions.
Binary file modified app/db/config.db
76 changes: 49 additions & 27 deletions app/src/algorithm/base/modnet/modnet.py
@@ -66,10 +66,13 @@ def _download(self):
def matting(self, input_data, output_data):

video = input_data["video_path"]
- background = input_data["image_path"]
fps = input_data["fps"]
result_type = input_data["result_type"]

background_type = input_data["background_type"]
background = input_data["background"]
align_type = input_data["align"]

result = output_data["video_path"]

self.modnet.eval()
@@ -96,18 +99,36 @@ def matting(self, input_data, output_data):
rh = rh - rh % 32
rw = rw - rw % 32

if result_type == "compose":
# background_np = cv2.imread(background)
if result_type != "compose":
print("MattingModel's result_type must be compose.")
exit()

if background_type == "image":
background_np = cv2.imdecode(np.fromfile(background, dtype=np.uint8), cv2.IMREAD_COLOR)
background_np = cv2.cvtColor(background_np, cv2.COLOR_BGR2RGB)
background_np = cv2.resize(background_np, (rw, rh), interpolation=cv2.INTER_AREA)
else:
background_video = cv2.VideoCapture(background)
if background_video.isOpened():
background_ret, background_frame = background_video.read()
background_frame = cv2.resize(background_frame, (rw, rh), interpolation=cv2.INTER_AREA)
background_frame = cv2.cvtColor(background_frame, cv2.COLOR_BGR2RGB)
else:
background_ret = False
if not background_ret:
print('Failed to read the background video: {0}'.format(background))
exit()
if align_type == "align":
background_num_frame = int(background_video.get(cv2.CAP_PROP_FRAME_COUNT))
interval_align = 1 if background_num_frame > num_frame else background_num_frame / num_frame

# video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(result, fourcc, fps, (w, h))

self.logger.write_log(f"follow:2:1:{num_frame}:0")
print('Start matting...')
background_count = 1
with tqdm(range(num_frame)) as t:
for c in t:
frame_np = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
@@ -129,37 +150,38 @@ def matting(self, input_data, output_data):
elif result_type == "matte":
view_np = matte_np * np.full(frame_np.shape, 255.0)
else:
- view_np = matte_np * frame_np + (1 - matte_np) * background_np
if background_type == "image":
view_np = matte_np * frame_np + (1 - matte_np) * background_np
else:
view_np = matte_np * frame_np + (1 - matte_np) * background_frame
if align_type == "align":
if int(interval_align * c) >= background_count:
background_count += 1
background_ret, temp_background_frame = background_video.read()
if background_ret:
background_frame = temp_background_frame
background_frame = cv2.resize(background_frame, (rw, rh), interpolation=cv2.INTER_AREA)
background_frame = cv2.cvtColor(background_frame, cv2.COLOR_BGR2RGB)

else:
background_ret, background_frame = background_video.read()
if not background_ret:
background_video.release()
background_video = cv2.VideoCapture(background)
background_ret, background_frame = background_video.read()
background_frame = cv2.resize(background_frame, (rw, rh), interpolation=cv2.INTER_AREA)
background_frame = cv2.cvtColor(background_frame, cv2.COLOR_BGR2RGB)
view_np = cv2.cvtColor(view_np.astype(np.uint8), cv2.COLOR_RGB2BGR)
view_np = cv2.resize(view_np, (w, h))
video_writer.write(view_np)

rval, frame = vc.read()
- c += 1
- self.logger.write_log(f"follow:2:1:{num_frame}:{c}")
self.logger.write_log(f"follow:2:1:{num_frame}:{c + 1}")

video_writer.release()
vc.release()
if background_type != "image" and background_video.isOpened():
background_video.release()
print('Save the result video to {0}'.format(result))
return result


if __name__ == '__main__':

input_datas = {
"config": {
"device": "cuda",
"model-type": "webcam",
"result_type": "foreground", # foreground/matte
"fps": 30,
},
"input": {
"video_path": r"F:\demo\抠图\测试视频.mp4",
"image_path": r"F:\demo\audio\emoti\temp.txt"
},
"output": {
"video_path": r"F:\demo\抠图\测试视频-mat.mp4",
}

}
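The __main__ block above still carries the pre-0.1.5 input schema (image_path, result_type "foreground"). As a hedged sketch only, inputs matching the keys the updated matting() now reads would look like this; every path and value below is a placeholder:

input_data = {
    "video_path": "input.mp4",
    "fps": 30,
    "result_type": "compose",       # matting() now exits for any other value
    "background_type": "video",     # "image" selects a still background
    "background": "background.mp4",
    "align": "align",               # "align" steps the background video to
                                    # span the clip; other values loop it
}
output_data = {"video_path": "output.mp4"}
# result = MattingModel(config, logger).matting(input_data, output_data)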


@@ -16,45 +16,59 @@ def change_background(input_data):
video = VideoFileClip(video_path)
fps = video.fps
vw, vh = video.size
input_data["input"]["fps"] = fps

result_type = input_data["input"]["result_type"]
if result_type == "compose":
- image_path = input_data["input"]["image_path"]
- temp_image_path = os.path.join(os.path.dirname(image_path), f"temp.{image_path.split('.')[-1]}")
- reszie_type = input_data["config"]["resize"]
input_data["input"]["fps"] = fps

- image = Image.open(image_path)
- iw, ih = image.size
# save background after resize
background_type = input_data["input"]["background_type"]
background_path = input_data["input"]["background"]
temp_background_path = os.path.join(os.path.dirname(background_path), f"temp.{background_path.split('.')[-1]}")
if background_type == "image":
background_file = Image.open(background_path)
iw, ih = background_file.size
ratio = max(vw / iw, vh / ih)
- iw, ih = math.ceil(ratio * iw), math.ceil(ratio * ih)
- image = image.resize((iw, ih))
bw, bh = math.ceil(ratio * iw), math.ceil(ratio * ih)
else:
background_file = VideoFileClip(background_path)
bvw, bvh = background_file.size
background_file.close()
ratio = max(vw / bvw, vh / bvh)
bw, bh = math.ceil(ratio * bvw), math.ceil(ratio * bvh)
resize_type = input_data["input"]["resize"]

if resize_type == "resize":
bw, bh = vw, vh
left, top = 0, 0
elif resize_type == "center":
left, top = int(0.5 * (bw - vw)), int(0.5 * (bh - vh))
elif resize_type == "left-top":
left, top = 0, 0
- if reszie_type == "resize":
- image = image.resize((vw, vh))
- elif reszie_type == "center":
- left, top = int(0.5 * (iw - vw)), int(0.5 * (ih - vh))
- elif reszie_type == "left-top":
- left, top = 0, 0
- elif reszie_type == "left-down":
- left, top = 0, ih - vh
- elif reszie_type == "right-top":
- left, top = iw - vw, 0
- elif reszie_type == "right-down":
- left, top = iw - vw, ih - vh
- elif reszie_type == "top-center":
- left, top = int(0.5 * (iw - vw)), 0
- elif reszie_type == "down-center":
- left, top = int(0.5 * (iw - vw)), ih - vh
- elif reszie_type == "left-center":
- left, top = 0, int(0.5 * (ih - vh))
- elif reszie_type == "right-center":
- left, top = iw - vw, int(0.5 * (ih - vh))
elif resize_type == "left-down":
left, top = 0, bh - vh
elif resize_type == "right-top":
left, top = bw - vw, 0
elif resize_type == "right-down":
left, top = bw - vw, bh - vh
elif resize_type == "top-center":
left, top = int(0.5 * (bw - vw)), 0
elif resize_type == "down-center":
left, top = int(0.5 * (bw - vw)), bh - vh
elif resize_type == "left-center":
left, top = 0, int(0.5 * (bh - vh))
else:
# resize_type == "right-center"
left, top = bw - vw, int(0.5 * (bh - vh))

- image = image.crop((left, top, left + vw, top + vh))
- image.save(temp_image_path)
- input_data["input"]["image_path"] = temp_image_path
if background_type == "image":
background_file = background_file.resize((bw, bh))
background_file = background_file.crop((left, top, left + vw, top + vh))
background_file.save(temp_background_path)
else:
background_file = VideoFileClip(background_path, target_resolution=(bh, bw))
background_file = background_file.crop(x1=left, y1=top, x2=left + vw, y2=top + vh)
background_file.write_videofile(temp_background_path)

input_data["input"]["background"] = temp_background_path
mat_model = MattingModel(input_data["config"], logger)
mat_path = mat_model.matting(input_data["input"], input_data["output"])
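The resize_type ladder above is just choosing a crop origin inside the ratio-scaled background. A compact equivalent, an editor's sketch rather than code from the commit, makes the per-axis anchors explicit:

def crop_origin(resize_type, bw, bh, vw, vh):
    # Sketch only. (bw, bh): ratio-scaled background, (vw, vh): video size.
    if resize_type == "resize":  # background is simply resized to the video
        return 0, 0
    x = {"left": 0, "center": int(0.5 * (bw - vw)), "right": bw - vw}
    y = {"top": 0, "center": int(0.5 * (bh - vh)), "down": bh - vh}
    anchors = {
        "center": ("center", "center"),
        "left-top": ("left", "top"), "left-down": ("left", "down"),
        "right-top": ("right", "top"), "right-down": ("right", "down"),
        "top-center": ("center", "top"), "down-center": ("center", "down"),
        "left-center": ("left", "center"), "right-center": ("right", "center"),
    }
    h_anchor, v_anchor = anchors[resize_type]
    return x[h_anchor], y[v_anchor]

Both branches then crop the same window, (left, top, left + vw, top + vh), so image and video backgrounds end up aligned identically.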

Empty file.
@@ -0,0 +1,169 @@
import os
import shutil

import cv2
import numpy as np
from PIL import Image, ImageFont, ImageDraw
from app.src.utils.logger import Logger
from moviepy.editor import VideoFileClip, ImageSequenceClip

from app.src.algorithm.base.sam2.video_predictor import VideoPredictor


class LocalModel:
def __init__(self, config, logger):
self.logger = logger
self.config = config
self.size = config["input"]["size"]
self.lama_model = None
self.sam2_video_model = None
self.sam2_image_model = None
self.point_diameter = int(min(self.size) * 0.05)

def initial(self):
self.logger.write_log("interval:1:1:1:0:Video Initial")
if self.sam2_video_model is None:
self.sam2_video_model = VideoPredictor(self.config["config"]["sam2"], self.logger)
self.sam2_video_model.set_video(self.config["input"]["video_frame_path"])
self.logger.write_log("interval:1:1:1:0:Video Initial")

def reset(self):
self.sam2_video_model.reset()

def add_point(self, point, label, ann_frame_idx):
if self.sam2_video_model is None:
self.sam2_video_model = VideoPredictor(self.config["config"]["sam2"], self.logger)
self.sam2_video_model.set_video(self.config["input"]["video_frame_path"])
out_obj_ids, out_mask_logits = self.sam2_video_model.add_point(
point,
label,
ann_frame_idx
)
return out_obj_ids, out_mask_logits

def show_segment_frame(self, mask_logits, ann_frame_idx):
frame_path = os.path.join(self.config["input"]["video_frame_path"], self.sam2_video_model.frame_names[ann_frame_idx])
frame = Image.open(frame_path)
mask_color = self.config["input"]["mask_color"]
mask_color = Image.new("RGBA", (self.size[0], self.size[1]), mask_color)
mask_color = np.array(mask_color)
mask_logits = (mask_logits[0] > 0.0).cpu().numpy()
mask_logits = mask_logits.reshape((self.size[1], self.size[0], -1))
mask_rgba = Image.fromarray(mask_color * mask_logits)

show_frame = Image.alpha_composite(frame.convert('RGBA'), mask_rgba).convert("RGB")
return show_frame

def _get_mask(self, mask_image, mask_logit, size):
w, h = size
mask_logit = mask_logit.reshape(h, w, 1) * 255
mask_image = np.logical_or(mask_image, mask_logit)
return mask_image

def processing(self, process_type, output_path):
self.logger.write_log("interval:3:1:1:0")
if self.sam2_video_model is None:
self.sam2_video_model = VideoPredictor(self.config["config"]["sam2"], self.logger)
self.sam2_video_model.set_video(self.config["input"]["video_frame_path"])
video_segments = self.sam2_video_model.propagate_video()
self.logger.write_log("interval:3:1:1:1")

process_temp_dir = self.config["output"]["process_temp_dir"]
image_paths = list()
process_num = len(video_segments)
self.logger.write_log(f"follow:3:2:{process_num}:0")
for out_frame_idx, item in video_segments.items():
mask_image = np.zeros((self.size[1], self.size[0], 1))
for out_obj_id, out_mask in item.items():
mask_image = self._get_mask(mask_image, out_mask, self.size)

frame_path = os.path.join(self.config["input"]["video_frame_path"], self.sam2_video_model.frame_names[out_frame_idx])
name, _ = self.sam2_video_model.frame_names[out_frame_idx].rsplit(".", 1)
temp_image_path = os.path.join(process_temp_dir, f"{name}.png")

frame = Image.open(frame_path)
gray_frame = frame.convert("L").convert("RGB")
color_frame = frame
if process_type == "gray":
background = np.array(color_frame)
foreground = np.array(gray_frame)
else:
background = np.array(gray_frame)
foreground = np.array(color_frame)

if len(item.items()) > 0:
output_image = mask_image * foreground + (1 - mask_image) * background
Image.fromarray(np.uint8(output_image)).save(temp_image_path)
else:
Image.fromarray(background).save(temp_image_path)
self.logger.write_log(f"follow:3:2:{process_num}:{out_frame_idx+1}")
image_paths.append(temp_image_path)
self.logger.write_log(f"follow:3:2:{process_num}:{process_num}")

self.logger.write_log(f"interval:3:3:1:0")
video_path = self.config["input"]["video_path"]
video = VideoFileClip(video_path)
fps = video.fps
output_video = ImageSequenceClip(image_paths, fps=fps)
output_video = output_video.set_audio(video.audio)
output_video.write_videofile(output_path)
shutil.rmtree(process_temp_dir, ignore_errors=True)
video.close()
self.logger.write_log(f"interval:3:3:1:1")


def video_optimize_local_processing(input_data, local_model=None):

timestamp = input_data["input"]["timestamp"]
log_path = input_data["input"]["log_path"]
logger = Logger(log_path, timestamp)
if local_model is None:
local_model = LocalModel(input_data, logger)
else:
config = local_model.config
config.update(input_data)
local_model.config = config

opt_type = input_data["type"]
if opt_type == "add":
# Add a point
prompt = input_data["input"]["prompt"]
_, out_mask_logits = local_model.add_point(
prompt["data"],
prompt["value"],
input_data["input"]["ann_frame_idx"],
)
show_frame = local_model.show_segment_frame(out_mask_logits, input_data["input"]["ann_frame_idx"])
show_frame.save(input_data["output"]["show_temp_image"])
elif opt_type == "remove":
# Remove a point
prompts = input_data["input"]["prompts"]
ann_frame_idx = input_data["input"]["ann_frame_idx"]
local_model.reset()
out_mask_logits = None
for frame_id, frame_prompts in prompts.items():
if frame_id == ann_frame_idx:
for prompt in frame_prompts:
if prompt["type"] == "point":
_, out_mask_logits = local_model.add_point(
np.array(prompt["data"]),
np.array([prompt["value"]]),
frame_id,
)
else:
for prompt in frame_prompts:
if prompt["type"] == "point":
_, _ = local_model.add_point(
np.array(prompt["data"]),
np.array([prompt["value"]]),
frame_id,
)
if out_mask_logits is not None:
show_frame = local_model.show_segment_frame(out_mask_logits, input_data["input"]["ann_frame_idx"])
show_frame.save(input_data["output"]["show_temp_image"])
elif opt_type == "initial":
local_model.initial()
else:
local_model.processing(input_data["input"]["process_type"], input_data["output"]["video_path"])
return local_model
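A hedged end-to-end sketch of how this entry point appears meant to be driven, threading one LocalModel through the initial, add, and processing stages; every path, size, and prompt value below is a placeholder and the sam2 config is elided:

base = {
    "config": {"sam2": {}},  # placeholder: real SAM2 predictor config
    "input": {
        "timestamp": "20240917",
        "log_path": "logs",
        "size": (1280, 720),              # (width, height) of the frames
        "video_frame_path": "frames",     # directory of extracted frames
        "video_path": "input.mp4",
        "mask_color": (255, 0, 0, 128),   # RGBA used by show_segment_frame
    },
    "output": {
        "show_temp_image": "preview.png",
        "process_temp_dir": "temp",
        "video_path": "output.mp4",
    },
}

# 1. Load the extracted frames into the SAM2 video predictor.
model = video_optimize_local_processing({**base, "type": "initial"})

# 2. Add one positive point prompt on frame 0; a preview overlay is saved.
add = {**base, "type": "add",
       "input": {**base["input"], "ann_frame_idx": 0,
                 "prompt": {"data": [[600, 350]], "value": [1]}}}
model = video_optimize_local_processing(add, model)

# 3. Any other type runs processing; "gray" renders the selected object
#    in grayscale and keeps the rest of the frame in color.
proc = {**base, "type": "process",
        "input": {**base["input"], "process_type": "gray"}}
video_optimize_local_processing(proc, model)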

