From 80f3a4f3fa9369a590ba6f2f07609297b1e69221 Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Wed, 15 Jan 2025 19:29:19 +0900 Subject: [PATCH] Make base class for video processing, change noise masking to subclass --- mio/models/frames.py | 2 +- mio/process/video.py | 216 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 184 insertions(+), 34 deletions(-) diff --git a/mio/models/frames.py b/mio/models/frames.py index 8fde38e..bd8cc7c 100644 --- a/mio/models/frames.py +++ b/mio/models/frames.py @@ -3,7 +3,7 @@ """ from pathlib import Path -from typing import List, Optional, TypeVar +from typing import List, Optional, TypeVar, Union import cv2 import numpy as np diff --git a/mio/process/video.py b/mio/process/video.py index 7914be8..c2dde9f 100644 --- a/mio/process/video.py +++ b/mio/process/video.py @@ -208,6 +208,171 @@ def gen_freq_mask( cv2.destroyAllWindows() return mask +class BaseVideoProcessor: + """ + Base class for defining an abstract video processor. + + Attributes: + name (str): The name of the video processor. + output_frames (list): A list of output frames. + named_frame (NamedFrame): A NamedFrame object. + """ + def __init__(self, name: str, width: int, height: int, output_dir: Path): + """ + Initialize the BaseVideoProcessor object. + + Parameters: + name (str): The name of the video processor. + """ + self.name: str = name + self.output_dir: Path = output_dir + self.output_frames: list = [] + self.output_named_frame = None + self.processor = FrameProcessor( + height=height, + width=width, + ) + + def append_frame(self, frame: np.ndarray)-> None: + """ + Append a frame to the output_frames list. + + Parameters: + frame (np.ndarray): The frame to append. + """ + self.output_frames.append(frame) + + def process_frame(self, frame: np.ndarray)-> None: + """ + Process a single frame. This method should be implemented in the subclass. + """ + pass + + def _set_output_named_frame(self)-> None: + """ + Set the named frame object. + """ + self.output_named_frame = NamedFrame(name=self.name, video_frame=self.output_frames) + + def export_video(self)-> None: + """ + Export the video to a file. + """ + self._set_output_named_frame() + self.output_named_frame.export( + output_path=self.output_dir / "output", + fps=20, + suffix=True, + ) + + def get_output_named_frames(self)-> NamedFrame: + """ + Get a NamedFrame object from the output frames. + """ + self._set_output_named_frame() + return self.output_named_frame + +class NoisePatchProcessor(BaseVideoProcessor): + """ + A class to apply noise patching to a video. + """ + def __init__( + self, name: str, + noise_patch_config: NoisePatchConfig, + width: int, + height: int, + output_dir: Path)-> None: + """ + Initialize the NoisePatchProcessor object. + + Parameters: + name (str): The name of the video processor. + noise_patch_config (NoisePatchConfig): The noise patch configuration. + """ + super().__init__(name, width, height, output_dir) + self.noise_patch_config: NoisePatchConfig = noise_patch_config + self.noise_patchs = [] + self.diff_frames = [] + self.noise_patch_named_frame: NamedFrame = None + self.diff_frames_named_frame: NamedFrame = None + + def process_frame( + self, + raw_frame: np.ndarray, + previous_frame: np.ndarray)-> Tuple[np.ndarray, np.ndarray]: + """ + Process a single frame. + + Parameters: + raw_frame (np.ndarray): The raw frame to process. + previous_frame (np.ndarray): The previous frame to compare against. + """ + + if self.noise_patch_config.enable: + patched_frame, noise_patch = self.processor.patch_noisy_buffer( + raw_frame, + previous_frame, + self.noise_patch_config, + ) + self.append_frame(patched_frame) + self.noise_patchs.append(noise_patch * np.iinfo(np.uint8).max) + self.diff_frames.append( + cv2.absdiff(raw_frame, previous_frame) * self.noise_patch_config.diff_multiply) + + return patched_frame, noise_patch + else: + return raw_frame, None + + def _set_noise_patch_named_frame(self)-> None: + """ + Set the NamedFrame object for the noise patch. + """ + self.noise_patch_named_frame = NamedFrame(name="patched_area", video_frame=self.noise_patchs) + + def _set_diff_frames_named_frame(self)-> None: + """ + Set the NamedFrame object for the difference frames. + """ + self.diff_frames_named_frame = NamedFrame( + name=f"diff_{self.noise_patch_config.diff_multiply}x", + video_frame=self.diff_frames, + ) + + def get_noise_patch_named_frame(self)-> NamedFrame: + """ + Get a NamedFrame object for the noise patch. + """ + self._set_noise_patch_named_frame() + return self.noise_patch_named_frame + + def get_diff_frames_named_frame(self)-> NamedFrame: + """ + Get a NamedFrame object for the difference frames. + """ + self._set_diff_frames_named_frame() + return self.diff_frames_named_frame + + def export_noise_patch(self)-> None: + """ + Export the noise patch to a file. + """ + self._set_noise_patch_named_frame() + self.noise_patch_named_frame.export( + output_path=self.output_dir / f"{self.name}", + fps=20, + suffix=True, + ) + + def export_diff_frames(self)-> None: + """ + Export the difference frames to a file. + """ + self._set_diff_frames_named_frame() + self.diff_frames_named_frame.export( + output_path=self.output_dir / f"{self.name}", + fps=20, + suffix=True, + ) class VideoProcessor: """ @@ -229,8 +394,8 @@ def denoise( ) reader = VideoReader(video_path) - pathstem = Path(video_path).stem + output_dir = Path.cwd() / config.output_dir if not output_dir.exists(): output_dir.mkdir(parents=True) @@ -238,10 +403,15 @@ def denoise( # Initialize lists to store frames raw_frames = [] output_frames = [] - if config.noise_patch.enable: - patched_frames = [] - noise_patchs = [] - diff_frames = [] + + noise_patch_processor = NoisePatchProcessor( + output_dir=output_dir, + name="patch", + noise_patch_config=config.noise_patch, + width=reader.width, + height=reader.height, + ) + if config.frequency_masking.enable: freq_domain_frames = [] freq_filtered_frames = [] @@ -252,7 +422,7 @@ def denoise( width=reader.width, ) - if config.noise_patch.enable: + if config.frequency_masking.enable: freq_mask = processor.gen_freq_mask( freq_mask_config=config.frequency_masking, ) @@ -273,19 +443,11 @@ def denoise( previous_frame = raw_frame.copy() logger.debug(f"Processing frame {index}") - patched_frame, noise_patch = processor.patch_noisy_buffer( + patched_frame, noise_patch = noise_patch_processor.process_frame( raw_frame, previous_frame, - config.noise_patch, ) previous_frame = patched_frame - patched_frames.append(patched_frame) - noise_patchs.append(noise_patch * np.iinfo(np.uint8).max) - - if config.noise_patch.output_diff: - diff_frame = cv2.absdiff(raw_frame, previous_frame) - diff_frames.append(diff_frame * config.noise_patch.diff_multiply) - output_frame = patched_frame if config.frequency_masking.enable: @@ -308,24 +470,12 @@ def denoise( raw_video = NamedFrame(name="RAW", video_frame=raw_frames) if config.noise_patch.enable: - patched_video = NamedFrame(name="patched", video_frame=patched_frames) - if config.noise_patch.output_result: - patched_video.export( - output_dir / f"{pathstem}", - suffix=True, - fps=20, - ) + patched_video = noise_patch_processor.get_output_named_frames() + noise_patch_processor.export_video() if config.noise_patch.output_diff: - diff_video = NamedFrame( - name=f"diff_{config.noise_patch.diff_multiply}x", video_frame=diff_frames - ) - diff_video.export( - output_dir / f"{pathstem}", - suffix=True, - fps=20, - ) + noise_patch_processor.export_diff_frames() if config.noise_patch.output_noise_patch: - noise_patch = NamedFrame(name="noise_patch", video_frame=noise_patchs) + noise_patch_processor.export_noise_patch() if config.frequency_masking.output_mask: freq_mask_frame = NamedFrame( name="freq_mask", static_frame=freq_mask * np.iinfo(np.uint8).max @@ -359,8 +509,8 @@ def denoise( if config.interactive_display.enable: videos = [ raw_video, - noise_patch, - patched_video, + noise_patch_processor.get_noise_patch_named_frame(), + noise_patch_processor.get_output_named_frames(), freq_filtered_video, freq_domain_video, min_proj_frame,