From 50af4102dd7a5eb21360fab593dc96778803487f Mon Sep 17 00:00:00 2001 From: MarcelMB Date: Wed, 29 Jan 2025 16:17:09 -0800 Subject: [PATCH 1/9] changed gradient method to start with 1st frame --- mio/process/video.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/mio/process/video.py b/mio/process/video.py index dc408ea..9f0b59d 100644 --- a/mio/process/video.py +++ b/mio/process/video.py @@ -160,12 +160,28 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: if input_frame is None: return None - if self.noise_patch_config.enable and self.previous_frame is not None: - broken, noise_patch = self.noise_detect_helper.detect_frame_with_noisy_buffer( + if self.noise_patch_config.enable: + if self.noise_patch_config.method == "gradient": + # For the gradient method, analyze the first frame immediately + broken, noise_patch = self.noise_detect_helper.detect_frame_with_noisy_buffer( input_frame, - self.previous_frame if self.noise_patch_config.method == "mean_error" else None, + None, # No previous frame needed for gradient method self.noise_patch_config, ) + else: + # For the mean method, wait until the second frame to start analysis + if self.previous_frame is not None: + broken, noise_patch = self.noise_detect_helper.detect_frame_with_noisy_buffer( + input_frame, + self.previous_frame, + self.noise_patch_config, + ) + else: + # If it's the first frame and the method is mean_error, just store the frame + self.previous_frame = input_frame + self.append_output_frame(input_frame) + return input_frame + # Handle noisy frames if not broken: @@ -181,10 +197,8 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: self.dropped_frame_indices.append(index) return None - elif self.noise_patch_config.enable and self.previous_frame is None: - self.previous_frame = input_frame - self.append_output_frame(input_frame) - return input_frame + self.append_output_frame(input_frame) + return input_frame @property def noise_patch_named_video(self) -> NamedVideo: From 63e591a35ed8546bc6728776386b98018ec7e3ec Mon Sep 17 00:00:00 2001 From: MarcelMB Date: Wed, 29 Jan 2025 16:30:45 -0800 Subject: [PATCH 2/9] linting commit --- mio/process/video.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/mio/process/video.py b/mio/process/video.py index 9f0b59d..74b8734 100644 --- a/mio/process/video.py +++ b/mio/process/video.py @@ -162,12 +162,12 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: if self.noise_patch_config.enable: if self.noise_patch_config.method == "gradient": - # For the gradient method, analyze the first frame immediately + # For the gradient method, analyze the first frame immediately broken, noise_patch = self.noise_detect_helper.detect_frame_with_noisy_buffer( - input_frame, - None, # No previous frame needed for gradient method - self.noise_patch_config, - ) + input_frame, + None, # No previous frame needed for gradient method + self.noise_patch_config, + ) else: # For the mean method, wait until the second frame to start analysis if self.previous_frame is not None: @@ -175,13 +175,12 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: input_frame, self.previous_frame, self.noise_patch_config, - ) + ) else: # If it's the first frame and the method is mean_error, just store the frame self.previous_frame = input_frame self.append_output_frame(input_frame) return input_frame - # Handle noisy frames if not broken: From 91f989436d4de7fafbb24c823f4ebba86e223011 Mon Sep 17 00:00:00 2001 From: MarcelMB Date: Fri, 31 Jan 2025 14:47:20 -0800 Subject: [PATCH 3/9] black frame method integrated with gradient method, updates test --- mio/data/config/process/denoise_example.yml | 8 ++ mio/models/process.py | 18 ++- mio/process/frame_helper.py | 118 ++++++++++++++------ tests/test_process/test_frame_helper.py | 1 + 4 files changed, 112 insertions(+), 33 deletions(-) diff --git a/mio/data/config/process/denoise_example.yml b/mio/data/config/process/denoise_example.yml index ee89467..fba0dbc 100644 --- a/mio/data/config/process/denoise_example.yml +++ b/mio/data/config/process/denoise_example.yml @@ -3,6 +3,14 @@ mio_model: mio.models.process.DenoiseConfig mio_version: 0.6.1 noise_patch: enable: true + method: gradient #gradient or mean_error or black_pixels + additional_methods: + - black_pixels + threshold: 20 #for first method + black_pixel_consecutive_threshold: 5 # If 5 consecutive pixels in a row are black, discard the frame + black_pixel_value_threshold: 16 # Any pixel value ≤ 16 is considered "black" + buffer_size: 5032 + buffer_split: 8 method: gradient # gradient or mean_error threshold: 20 device_config_id: wireless-200px diff --git a/mio/models/process.py b/mio/models/process.py index f311817..d364ab7 100644 --- a/mio/models/process.py +++ b/mio/models/process.py @@ -2,7 +2,7 @@ Module for preprocessing data. """ -from typing import Literal, Optional +from typing import Optional from pydantic import BaseModel, Field @@ -55,6 +55,10 @@ class NoisePatchConfig(BaseModel): "gradient: Detection based on the gradient of the frame row." "mean_error: Detection based on the mean error with the same row of the previous frame.", ) + additional_methods: Optional[list[str]] = Field( + default=list, + description="Additional noise detection methods to apply after the primary method.", + ) threshold: float = Field( default=20, description="Threshold for detecting noise." @@ -66,6 +70,18 @@ class NoisePatchConfig(BaseModel): description="ID of the stream device configuration used for aquiring the video." "This is used in the mean_error method to compare frames" " in the units of data transfer buffers.", + black_pixel_consecutive_threshold: int = Field( + default=5, + description="Number of consecutive black pixels required to classify a row as noisy.", + ) + black_pixel_value_threshold: int = Field( + default=20, + description="Pixel intensity value below which a pixel is considered 'black'.", + ) + buffer_size: int = Field( + default=5032, + description="Size of the buffers composing the image." + "This premises that the noisy area will appear in units of buffer_size.", ) buffer_split: int = Field( default=1, diff --git a/mio/process/frame_helper.py b/mio/process/frame_helper.py index 7b83488..5910633 100644 --- a/mio/process/frame_helper.py +++ b/mio/process/frame_helper.py @@ -56,44 +56,49 @@ def detect_frame_with_noisy_buffer( ) logger.debug(f"Buffer size: {px_per_buffer}") - if config.method == "mean_error": - if previous_frame is None: - raise ValueError("mean_error requires a previous frame to compare against") - return self._detect_with_mean_error( - current_frame=current_frame, - previous_frame=previous_frame, - config=config, - ) + methods = [config.method] + if hasattr(config, "additional_methods") and isinstance(config.additional_methods, list): + methods.extend(config.additional_methods) - elif config.method == "gradient": - return self._detect_with_gradient(current_frame, config) - else: - logger.error(f"Unsupported noise detection method: {config.method}") - raise ValueError(f"Unsupported noise detection method: {config.method}") + logger.debug(f"Applying noise detection methods: {methods}") - def _get_buffer_shape( - self, frame_width: int, frame_height: int, px_per_buffer: int - ) -> list[int]: - """ - Get the shape of each buffer in a frame. + noisy_flag = False + combined_mask = np.zeros_like(current_frame, dtype=np.uint8) - Parameters: - frame_width (int): The width of the frame. - frame_height (int): The height of the frame. - px_per_buffer (int): The number of pixels per buffer. + for method in methods: + if method == "mean_error": + if previous_frame is None: + raise ValueError("mean_error requires a previous frame to compare against") - Returns: - list[int]: The shape of each buffer in the frame. - """ - buffer_shape = [] + serialized_current = current_frame.flatten().astype(np.int16) + serialized_previous = previous_frame.flatten().astype(np.int16) + + split_size = config.buffer_size // config.buffer_split + 1 + split_previous = self.split_by_length(serialized_previous, split_size) + split_current = self.split_by_length(serialized_current, split_size) - pixel_index = 0 - while pixel_index < frame_width * frame_height: - buffer_shape.append(int(pixel_index)) - pixel_index += px_per_buffer - logger.debug(f"Split shape: {buffer_shape}") + noisy, mask = self._detect_with_mean_error(split_current, split_previous, config) - return buffer_shape + elif method == "gradient": + noisy, mask = self._detect_with_gradient(current_frame, config) + + elif method == "black_pixels": + noisy, mask = self._detect_black_pixels(current_frame, config) + + else: + logger.error(f"Unsupported noise detection method: {method}") + continue # Skip unknown methods + + if noisy: + logger.info(f"Frame detected as noisy using method: {method}") + else: + logger.debug(f"Frame passed as clean using method: {method}") + + # Combine results + noisy_flag = noisy_flag or noisy + combined_mask = np.maximum(combined_mask, mask) + + return noisy_flag, combined_mask def _detect_with_mean_error( self, @@ -174,6 +179,55 @@ def _detect_with_gradient( return frame_is_noisy, noisy_mask + def _detect_black_pixels( + self, + current_frame: np.ndarray, + config: NoisePatchConfig, + ) -> Tuple[bool, np.ndarray]: + """ + Detect black-out noise by checking for black pixels (value 0) over rows of pixels. + + Returns: + Tuple[bool, np.ndarray]: A boolean indicating if the frame is corrupted and noise mask. + """ + height, width = current_frame.shape + noisy_mask = np.zeros_like(current_frame, dtype=np.uint8) + + # Read values from YAML config + consecutive_threshold = ( + config.black_pixel_consecutive_threshold + ) # How many consecutive pixels must be black + black_pixel_value_threshold = ( + config.black_pixel_value_threshold + ) # Max pixel value considered "black" + + logger.debug(f"Using black pixel threshold: <= {black_pixel_value_threshold}") + logger.debug(f"Consecutive black pixel threshold: {consecutive_threshold}") + + frame_is_noisy = False # Track if frame should be discarded + + for y in range(height): + row = current_frame[y, :] # Extract row + consecutive_count = 0 # Counter for consecutive black pixels + + for x in range(width): + if row[x] <= black_pixel_value_threshold: # Check if pixel is "black" + consecutive_count += 1 + else: + consecutive_count = 0 # Reset if a non-black pixel is found + + # If we exceed the allowed threshold of consecutive black pixels, discard the frame + if consecutive_count >= consecutive_threshold: + logger.debug( + f"Frame noisy due to {consecutive_count} consecutive black pixels " + f"in row {y}." + ) + noisy_mask[y, :] = 1 # Mark row as noisy + frame_is_noisy = True + break # No need to check further in this row + + return frame_is_noisy, noisy_mask + class FrequencyMaskHelper: """ diff --git a/tests/test_process/test_frame_helper.py b/tests/test_process/test_frame_helper.py index 9b6e276..2c240fd 100644 --- a/tests/test_process/test_frame_helper.py +++ b/tests/test_process/test_frame_helper.py @@ -49,6 +49,7 @@ class NoiseGroundTruth(BaseModel): "noise_detection_method,noise_category", [ ("gradient", GroundTruthCategory.check_pattern), + ("gradient", GroundTruthCategory.blacked_out), ("mean_error", GroundTruthCategory.check_pattern), ], ) From 018318683df0749abb02d8c9e968a37220a3b205 Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Thu, 6 Feb 2025 12:33:45 +0900 Subject: [PATCH 4/9] Fix and revive tests for mean error-based detection --- mio/models/process.py | 3 ++- mio/process/frame_helper.py | 35 ++++++++++++++++++------- tests/test_process/test_frame_helper.py | 2 +- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/mio/models/process.py b/mio/models/process.py index d364ab7..56f0188 100644 --- a/mio/models/process.py +++ b/mio/models/process.py @@ -2,7 +2,7 @@ Module for preprocessing data. """ -from typing import Optional +from typing import Literal, Optional from pydantic import BaseModel, Field @@ -70,6 +70,7 @@ class NoisePatchConfig(BaseModel): description="ID of the stream device configuration used for aquiring the video." "This is used in the mean_error method to compare frames" " in the units of data transfer buffers.", + ) black_pixel_consecutive_threshold: int = Field( default=5, description="Number of consecutive black pixels required to classify a row as noisy.", diff --git a/mio/process/frame_helper.py b/mio/process/frame_helper.py index 5910633..f621691 100644 --- a/mio/process/frame_helper.py +++ b/mio/process/frame_helper.py @@ -69,15 +69,9 @@ def detect_frame_with_noisy_buffer( if method == "mean_error": if previous_frame is None: raise ValueError("mean_error requires a previous frame to compare against") - - serialized_current = current_frame.flatten().astype(np.int16) - serialized_previous = previous_frame.flatten().astype(np.int16) - - split_size = config.buffer_size // config.buffer_split + 1 - split_previous = self.split_by_length(serialized_previous, split_size) - split_current = self.split_by_length(serialized_current, split_size) - - noisy, mask = self._detect_with_mean_error(split_current, split_previous, config) + return self._detect_with_mean_error( + current_frame=current_frame, previous_frame=previous_frame, config=config + ) elif method == "gradient": noisy, mask = self._detect_with_gradient(current_frame, config) @@ -100,6 +94,29 @@ def detect_frame_with_noisy_buffer( return noisy_flag, combined_mask + def _get_buffer_shape( + self, frame_width: int, frame_height: int, px_per_buffer: int + ) -> list[int]: + """ + Get the shape of each buffer in a frame. + + Parameters: + frame_width (int): The width of the frame. + frame_height (int): The height of the frame. + px_per_buffer (int): The number of pixels per buffer. + + Returns: + list[int]: The shape of each buffer in the frame. + """ + buffer_shape = [] + + pixel_index = 0 + while pixel_index < frame_width * frame_height: + buffer_shape.append(int(pixel_index)) + pixel_index += px_per_buffer + logger.debug(f"Split shape: {buffer_shape}") + return buffer_shape + def _detect_with_mean_error( self, current_frame: np.ndarray, diff --git a/tests/test_process/test_frame_helper.py b/tests/test_process/test_frame_helper.py index 2c240fd..8e5ae3a 100644 --- a/tests/test_process/test_frame_helper.py +++ b/tests/test_process/test_frame_helper.py @@ -53,7 +53,7 @@ class NoiseGroundTruth(BaseModel): ("mean_error", GroundTruthCategory.check_pattern), ], ) -def test_noise_detection_contrast(video, ground_truth, noise_detection_method, noise_category): +def test_noisy_frame_detection(video, ground_truth, noise_detection_method, noise_category): """ Contrast method of noise detection should correctly label frames corrupted by speckled noise From ec232baf9cd716ce7f63bc56ed797eb646633b69 Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Thu, 6 Feb 2025 13:21:08 +0900 Subject: [PATCH 5/9] Make Helpers subclass of SingleFrameHelper --- mio/process/frame_helper.py | 114 +++++++++++++++++++++++++++--------- mio/process/video.py | 16 ++--- 2 files changed, 92 insertions(+), 38 deletions(-) diff --git a/mio/process/frame_helper.py b/mio/process/frame_helper.py index f621691..939cc3b 100644 --- a/mio/process/frame_helper.py +++ b/mio/process/frame_helper.py @@ -3,6 +3,7 @@ It should be organized in a different way to make it more readable and maintainable. """ +from abc import abstractmethod from typing import Optional, Tuple import cv2 @@ -14,6 +15,34 @@ logger = init_logger("frame_helper") +class SingleFrameHelper: + """ + Helper class for single frame operations. + """ + + def __init__(self): + """ + Initialize the SingleFrameHelper object. + + Returns: + SingleFrameHelper: A SingleFrameHelper object. + """ + pass + + @abstractmethod + def process_frame(self, frame: np.ndarray) -> np.ndarray: + """ + Process a single frame. + + Parameters: + frame (np.ndarray): The frame to process. + + Returns: + np.ndarray: The processed frame. + """ + pass + + class NoiseDetectionHelper: """ Helper class for noise detection and frame processing. @@ -246,84 +275,113 @@ def _detect_black_pixels( return frame_is_noisy, noisy_mask -class FrequencyMaskHelper: +class FrequencyMaskHelper(SingleFrameHelper): """ - Helper class for frame operations. + Helper class for frequency masking operations. """ - def __init__(self): + def __init__(self, height: int, width: int, freq_mask_config: FreqencyMaskingConfig): """ - Initialize the FrameProcessor object. - Block size/buffer size will be set by dev config later. + Initialize the FreqMaskHelper object and generate a frequency mask. + + Parameters: + height (int): The height of the image. + width (int): The width of the image. + freq_mask_config (FreqencyMaskingConfig): Configuration for frequency masking + + Returns: + FreqMaskHelper: A FreqMaskHelper object. + """ + self._height = height + self._width = width + self._freq_mask_config = freq_mask_config + self._freq_mask = self._gen_freq_mask() + + @property + def freq_mask(self) -> np.ndarray: + """ + Get the frequency mask. Returns: - FrequencyMaskHelper: A FrequencyMaskHelper object. + np.ndarray: The frequency mask. """ + return self._freq_mask - def apply_freq_mask(self, img: np.ndarray, mask: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + def process_frame(self, img: np.ndarray) -> np.ndarray: """ Perform FFT/IFFT to remove horizontal stripes from a single frame. Parameters: img (np.ndarray): The image to process. - mask (np.ndarray): The frequency mask to apply. Returns: np.ndarray: The filtered image """ f = np.fft.fft2(img) fshift = np.fft.fftshift(f) - magnitude_spectrum = np.log(np.abs(fshift) + 1) # Use log for better visualization - - # Normalize the magnitude spectrum for visualization - magnitude_spectrum = cv2.normalize( - magnitude_spectrum, None, 0, np.iinfo(np.uint8).max, cv2.NORM_MINMAX - ) # Apply mask and inverse FFT - fshift *= mask + fshift *= self.freq_mask f_ishift = np.fft.ifftshift(fshift) img_back = np.fft.ifft2(f_ishift) img_back = np.abs(img_back) - return np.uint8(img_back), np.uint8(magnitude_spectrum) + return np.uint8(img_back) + + def freq_domain(self, img: np.ndarray) -> np.ndarray: + """ + Compute the frequency spectrum of an image. + + Parameters: + img (np.ndarray): The image to process. + + Returns: + np.ndarray: The frequency spectrum of the image. + """ + f = np.fft.fft2(img) + fshift = np.fft.fftshift(f) + magnitude_spectrum = np.log(np.abs(fshift) + 1) + + # Normalize the magnitude spectrum for visualization + magnitude_spectrum = cv2.normalize( + magnitude_spectrum, None, 0, np.iinfo(np.uint8).max, cv2.NORM_MINMAX + ) + + return np.uint8(magnitude_spectrum) - def gen_freq_mask( + def _gen_freq_mask( self, - width: int, - height: int, - freq_mask_config: FreqencyMaskingConfig, ) -> np.ndarray: """ Generate a mask to filter out horizontal and vertical frequencies. A central circular region can be removed to allow low frequencies to pass. """ - crow, ccol = height // 2, width // 2 + crow, ccol = self._height // 2, self._width // 2 # Create an initial mask filled with ones (pass all frequencies) - mask = np.ones((height, width), np.uint8) + mask = np.ones((self._height, self._width), np.uint8) # Zero out a vertical stripe at the frequency center mask[ :, ccol - - freq_mask_config.vertical_BEF_cutoff : ccol - + freq_mask_config.vertical_BEF_cutoff, + - self._freq_mask_config.vertical_BEF_cutoff : ccol + + self._freq_mask_config.vertical_BEF_cutoff, ] = 0 # Zero out a horizontal stripe at the frequency center mask[ crow - - freq_mask_config.horizontal_BEF_cutoff : crow - + freq_mask_config.horizontal_BEF_cutoff, + - self._freq_mask_config.horizontal_BEF_cutoff : crow + + self._freq_mask_config.horizontal_BEF_cutoff, :, ] = 0 # Define spacial low pass filter - y, x = np.ogrid[:height, :width] + y, x = np.ogrid[: self._height, : self._width] center_mask = (x - ccol) ** 2 + ( y - crow - ) ** 2 <= freq_mask_config.spatial_LPF_cutoff_radius**2 + ) ** 2 <= self._freq_mask_config.spatial_LPF_cutoff_radius**2 # Restore the center circular area to allow low frequencies to pass mask[center_mask] = 1 diff --git a/mio/process/video.py b/mio/process/video.py index 74b8734..7c04b75 100644 --- a/mio/process/video.py +++ b/mio/process/video.py @@ -308,8 +308,10 @@ def __init__( freq_mask_config (FreqencyMaskingConfig): The frequency masking configuration. """ super().__init__(name, output_dir) - self.freq_mask_helper = FrequencyMaskHelper() self.freq_mask_config: FreqencyMaskingConfig = freq_mask_config + self.freq_mask_helper = FrequencyMaskHelper( + height=height, width=width, freq_mask_config=freq_mask_config + ) self.freq_domain_frames = [] self._freq_mask: np.ndarray = None self.frame_width: int = width @@ -322,11 +324,7 @@ def freq_mask(self) -> np.ndarray: Get the frequency mask. """ if self._freq_mask is None: - self._freq_mask = self.freq_mask_helper.gen_freq_mask( - freq_mask_config=self.freq_mask_config, - width=self.frame_width, - height=self.frame_height, - ) + self._freq_mask = self.freq_mask_helper.freq_mask return self._freq_mask @property @@ -357,10 +355,8 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: if input_frame is None: return None if self.freq_mask_config.enable: - freq_filtered_frame, frame_freq_domain = self.freq_mask_helper.apply_freq_mask( - img=input_frame, - mask=self.freq_mask, - ) + freq_filtered_frame = self.freq_mask_helper.process_frame(img=input_frame) + frame_freq_domain = self.freq_mask_helper.freq_domain(img=input_frame) self.append_output_frame(freq_filtered_frame) self.freq_domain_frames.append(frame_freq_domain) From 62d319637759b7c9a31ff9bca1a40ee5a18d9910 Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Thu, 6 Feb 2025 13:49:02 +0900 Subject: [PATCH 6/9] Make all detectors subclass --- mio/process/frame_helper.py | 357 +++++++++++++++--------- mio/process/video.py | 28 +- tests/test_process/test_frame_helper.py | 11 +- 3 files changed, 228 insertions(+), 168 deletions(-) diff --git a/mio/process/frame_helper.py b/mio/process/frame_helper.py index 939cc3b..49b1452 100644 --- a/mio/process/frame_helper.py +++ b/mio/process/frame_helper.py @@ -1,6 +1,5 @@ """ This module contains a helper class for frame operations. -It should be organized in a different way to make it more readable and maintainable. """ from abc import abstractmethod @@ -42,118 +41,199 @@ def process_frame(self, frame: np.ndarray) -> np.ndarray: """ pass + @abstractmethod + def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: + """ + Process a single frame and verify if it is valid. -class NoiseDetectionHelper: + Parameters: + frame (np.ndarray): The frame to process. + + Returns: + Tuple[bool, np.ndarray]: A boolean indicating if the frame is noisy + and the processed frame. + """ + pass + +class GradientNoiseDetector(SingleFrameHelper): """ - Helper class for noise detection and frame processing. + Helper class for gradient noise detection. """ - def __init__(self, height: int, width: int): + def __init__(self, noise_patch_config: NoisePatchConfig): """ - Initialize the FrameProcessor object. - Block size/buffer size will be set by dev config later. + Initialize the GradientNoiseDetectionHelper object. + + Parameters: + threshold (float): The threshold for noise detection. Returns: - NoiseDetectionHelper: A NoiseDetectionHelper object + GradientNoiseDetectionHelper: A GradientNoiseDetectionHelper object. + """ + self.config = noise_patch_config + + def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ + Process a single frame and verify if it is valid. - def detect_frame_with_noisy_buffer( + Parameters: + frame (np.ndarray): The frame to process. + + Returns: + Tuple[bool, np.ndarray]: A boolean indicating if the frame is valid + and the processed frame. + """ + noisy, mask = self._detect_with_gradient(frame) + return noisy, mask + + def _detect_with_gradient( self, current_frame: np.ndarray, - previous_frame: Optional[np.ndarray], - config: NoisePatchConfig, ) -> Tuple[bool, np.ndarray]: """ - Unified noise detection method that supports multiple detection algorithms - (mean_error, gradient, etc.). + Detect noise using local contrast (second derivative) in the x-dimension + (along rows, across columns) + + Returns: + Tuple[bool, np.ndarray]: A boolean indicating if the frame is noisy and the noise mask. + """ + noisy_mask = np.zeros_like(current_frame, dtype=np.uint8) + + diff_x = np.diff(current_frame.astype(np.int16), n=2, axis=1) + mean_second_diff = np.abs(diff_x).mean(axis=1) + noisy_mask[mean_second_diff > self.config.threshold, :] = 1 + logger.debug("Row-wise means of second derivative: %s", mean_second_diff) + + # Determine if the frame is noisy (if any rows are marked as noisy) + frame_is_noisy = noisy_mask.any() + + return frame_is_noisy, noisy_mask + +class BlackAreaDetector(SingleFrameHelper): + """ + Helper class for black area detection. + """ + + def __init__(self, noise_patch_config: NoisePatchConfig): + """ + Initialize the BlackAreaDetectionHelper object. Parameters: - current_frame (np.ndarray): The current frame to process. - previous_frame (Optional[np.ndarray]): The previous frame to compare against. - config (NoisePatchConfig): Configuration for noise detection. + threshold (float): The threshold for noise detection. Returns: - Tuple[bool, np.ndarray]: A boolean indicating if the frame is noisy, - and a spatial mask showing noisy regions. - """ - if config.device_config is not None: - px_per_buffer = config.device_config.px_per_buffer - else: - px_per_buffer = 1000 - logger.warning( - f"Device configuration not found. Using default buffer size: {px_per_buffer}" - ) - logger.debug(f"Buffer size: {px_per_buffer}") + BlackAreaDetectionHelper: A BlackAreaDetectionHelper object. + """ + self.config = noise_patch_config - methods = [config.method] - if hasattr(config, "additional_methods") and isinstance(config.additional_methods, list): - methods.extend(config.additional_methods) + def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: + """ + Process a single frame and verify if it is valid. - logger.debug(f"Applying noise detection methods: {methods}") + Parameters: + frame (np.ndarray): The frame to process. - noisy_flag = False - combined_mask = np.zeros_like(current_frame, dtype=np.uint8) - - for method in methods: - if method == "mean_error": - if previous_frame is None: - raise ValueError("mean_error requires a previous frame to compare against") - return self._detect_with_mean_error( - current_frame=current_frame, previous_frame=previous_frame, config=config - ) + Returns: + Tuple[bool, np.ndarray]: A boolean indicating if the frame is valid + and the processed frame. + """ + noisy, mask = self._detect_black_pixels(frame) + return noisy, mask - elif method == "gradient": - noisy, mask = self._detect_with_gradient(current_frame, config) + def _detect_black_pixels( + self, + current_frame: np.ndarray, + ) -> Tuple[bool, np.ndarray]: + """ + Detect black-out noise by checking for black pixels (value 0) over rows of pixels. - elif method == "black_pixels": - noisy, mask = self._detect_black_pixels(current_frame, config) + Returns: + Tuple[bool, np.ndarray]: A boolean indicating if the frame is corrupted and noise mask. + """ + height, width = current_frame.shape + noisy_mask = np.zeros_like(current_frame, dtype=np.uint8) - else: - logger.error(f"Unsupported noise detection method: {method}") - continue # Skip unknown methods + # Read values from YAML config + consecutive_threshold = ( + self.config.black_pixel_consecutive_threshold + ) # How many consecutive pixels must be black + black_pixel_value_threshold = ( + self.config.black_pixel_value_threshold + ) # Max pixel value considered "black" - if noisy: - logger.info(f"Frame detected as noisy using method: {method}") - else: - logger.debug(f"Frame passed as clean using method: {method}") + logger.debug(f"Using black pixel threshold: <= {black_pixel_value_threshold}") + logger.debug(f"Consecutive black pixel threshold: {consecutive_threshold}") - # Combine results - noisy_flag = noisy_flag or noisy - combined_mask = np.maximum(combined_mask, mask) + frame_is_noisy = False # Track if frame should be discarded - return noisy_flag, combined_mask + for y in range(height): + row = current_frame[y, :] # Extract row + consecutive_count = 0 # Counter for consecutive black pixels - def _get_buffer_shape( - self, frame_width: int, frame_height: int, px_per_buffer: int - ) -> list[int]: + for x in range(width): + if row[x] <= black_pixel_value_threshold: # Check if pixel is "black" + consecutive_count += 1 + else: + consecutive_count = 0 # Reset if a non-black pixel is found + + # If we exceed the allowed threshold of consecutive black pixels, discard the frame + if consecutive_count >= consecutive_threshold: + logger.debug( + f"Frame noisy due to {consecutive_count} consecutive black pixels " + f"in row {y}." + ) + noisy_mask[y, :] = 1 # Mark row as noisy + frame_is_noisy = True + break # No need to check further in this row + + return frame_is_noisy, noisy_mask + +class MSENoiseDetector(SingleFrameHelper): + """ + Helper class for mean squared error noise detection. + """ + + def __init__(self, noise_patch_config: NoisePatchConfig): """ - Get the shape of each buffer in a frame. + Initialize the MeanErrorNoiseDetectionHelper object. Parameters: - frame_width (int): The width of the frame. - frame_height (int): The height of the frame. - px_per_buffer (int): The number of pixels per buffer. + threshold (float): The threshold for noise detection. Returns: - list[int]: The shape of each buffer in the frame. + MeanErrorNoiseDetectionHelper: A MeanErrorNoiseDetectionHelper object. """ - buffer_shape = [] + self.config = noise_patch_config + + def register_previous_frame(self, previous_frame: np.ndarray)-> None: + """ + Register the previous frame for mean error calculation. - pixel_index = 0 - while pixel_index < frame_width * frame_height: - buffer_shape.append(int(pixel_index)) - pixel_index += px_per_buffer - logger.debug(f"Split shape: {buffer_shape}") - return buffer_shape + Parameters: + previous_frame (np.ndarray): The previous frame to compare against. + """ + self.previous_frame = previous_frame + + def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: + """ + Process a single frame and verify if it is valid. + + Parameters: + frame (np.ndarray): The frame to process. + Returns: + Tuple[bool, np.ndarray]: A boolean indicating if the frame is valid + and the processed frame. + """ + noisy, mask = self._detect_with_mean_error(frame) + return noisy, mask + def _detect_with_mean_error( self, current_frame: np.ndarray, - previous_frame: np.ndarray, - config: NoisePatchConfig, ) -> Tuple[bool, np.ndarray]: """ - Detect noise using mean error between current and previous buffers. This is deprecated now. + Detect noise using mean error between current and previous frames. Returns: Tuple[bool, np.ndarray]: A boolean indicating if the frame is noisy and the noise mask. @@ -162,9 +242,9 @@ def _detect_with_mean_error( frame_height = current_frame.shape[0] serialized_current = current_frame.flatten().astype(np.int16) - serialized_previous = previous_frame.flatten().astype(np.int16) + serialized_previous = self.previous_frame.flatten().astype(np.int16) buffer_shape = self._get_buffer_shape( - frame_width, frame_height, config.device_config.px_per_buffer + frame_width, frame_height, self.config.device_config.px_per_buffer ) noisy_parts = np.ones_like(serialized_current, np.uint8) @@ -178,7 +258,7 @@ def _detect_with_mean_error( else buffer_shape[buffer_index + 1] ) - comparison_start = buffer_end - config.buffer_split + comparison_start = buffer_end - self.config.buffer_split while comparison_start > buffer_start: mean_error = abs( serialized_current[comparison_start:buffer_end] @@ -187,93 +267,98 @@ def _detect_with_mean_error( logger.debug( f"Mean error for buffer {buffer_index}:" f" pixels {comparison_start}-{buffer_end}: {mean_error}" - f" (threshold: {config.threshold})" + f" (threshold: {self.config.threshold})" ) - if mean_error > config.threshold: + if mean_error > self.config.threshold: noisy_parts[comparison_start:buffer_end] = np.zeros_like( serialized_current[comparison_start:buffer_end], np.uint8 ) any_buffer_has_noise = True break - comparison_start -= config.buffer_split + comparison_start -= self.config.buffer_split noise_patch = noisy_parts.reshape((frame_height, frame_width)) return any_buffer_has_noise, noise_patch - def _detect_with_gradient( - self, - current_frame: np.ndarray, - config: NoisePatchConfig, - ) -> Tuple[bool, np.ndarray]: +class CombinedNoiseDetector(SingleFrameHelper): + """ + Helper class for combined invalid frame detection. + """ + + def __init__(self, noise_patch_config: NoisePatchConfig): """ - Detect noise using local contrast (second derivative) in the x-dimension - (along rows, across columns) + Initialize the FrameProcessor object. + Block size/buffer size will be set by dev config later. Returns: - Tuple[bool, np.ndarray]: A boolean indicating if the frame is noisy and the noise mask. + NoiseDetectionHelper: A NoiseDetectionHelper object """ - noisy_mask = np.zeros_like(current_frame, dtype=np.uint8) - - diff_x = np.diff(current_frame.astype(np.int16), n=2, axis=1) - mean_second_diff = np.abs(diff_x).mean(axis=1) - noisy_mask[mean_second_diff > config.threshold, :] = 1 - logger.debug("Row-wise means of second derivative: %s", mean_second_diff) - - # Determine if the frame is noisy (if any rows are marked as noisy) - frame_is_noisy = noisy_mask.any() - - return frame_is_noisy, noisy_mask - - def _detect_black_pixels( - self, - current_frame: np.ndarray, - config: NoisePatchConfig, - ) -> Tuple[bool, np.ndarray]: + self.config = noise_patch_config + if noise_patch_config.method is None: + raise ValueError("No noise detection methods provided") + self.methods = noise_patch_config.method + + if "mean_error" in self.methods: + self.mse_detector = MSENoiseDetector(noise_patch_config) + if "gradient" in self.methods: + self.gradient_detector = GradientNoiseDetector(noise_patch_config) + if "black_pixels" in self.methods: + self.black_detector = BlackAreaDetector(noise_patch_config) + + def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ - Detect black-out noise by checking for black pixels (value 0) over rows of pixels. + Process a single frame and verify if it is valid. + + Parameters: + frame (np.ndarray): The frame to process. Returns: - Tuple[bool, np.ndarray]: A boolean indicating if the frame is corrupted and noise mask. + Tuple[bool, np.ndarray]: A boolean indicating if the frame is valid + and the processed frame. """ - height, width = current_frame.shape - noisy_mask = np.zeros_like(current_frame, dtype=np.uint8) + noisy_flag = False + combined_noisy_area = np.zeros_like(frame, dtype=np.uint8) - # Read values from YAML config - consecutive_threshold = ( - config.black_pixel_consecutive_threshold - ) # How many consecutive pixels must be black - black_pixel_value_threshold = ( - config.black_pixel_value_threshold - ) # Max pixel value considered "black" + if "mean_error" in self.methods: + noisy, noisy_area = self.mse_detector.process_and_verify_frame(frame) + combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) + noisy_flag = noisy_flag or noisy - logger.debug(f"Using black pixel threshold: <= {black_pixel_value_threshold}") - logger.debug(f"Consecutive black pixel threshold: {consecutive_threshold}") + if "gradient" in self.methods: + noisy, noisy_area = self.gradient_detector.process_and_verify_frame(frame) + combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) + noisy_flag = noisy_flag or noisy - frame_is_noisy = False # Track if frame should be discarded + if "black_pixels" in self.methods: + noisy, noisy_area = self.black_detector.process_and_verify_frame(frame) + combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) + noisy_flag = noisy_flag or noisy - for y in range(height): - row = current_frame[y, :] # Extract row - consecutive_count = 0 # Counter for consecutive black pixels + return noisy_flag, combined_noisy_area - for x in range(width): - if row[x] <= black_pixel_value_threshold: # Check if pixel is "black" - consecutive_count += 1 - else: - consecutive_count = 0 # Reset if a non-black pixel is found + def _get_buffer_shape( + self, frame_width: int, frame_height: int, px_per_buffer: int + ) -> list[int]: + """ + Get the shape of each buffer in a frame. - # If we exceed the allowed threshold of consecutive black pixels, discard the frame - if consecutive_count >= consecutive_threshold: - logger.debug( - f"Frame noisy due to {consecutive_count} consecutive black pixels " - f"in row {y}." - ) - noisy_mask[y, :] = 1 # Mark row as noisy - frame_is_noisy = True - break # No need to check further in this row + Parameters: + frame_width (int): The width of the frame. + frame_height (int): The height of the frame. + px_per_buffer (int): The number of pixels per buffer. - return frame_is_noisy, noisy_mask + Returns: + list[int]: The shape of each buffer in the frame. + """ + buffer_shape = [] + pixel_index = 0 + while pixel_index < frame_width * frame_height: + buffer_shape.append(int(pixel_index)) + pixel_index += px_per_buffer + logger.debug(f"Split shape: {buffer_shape}") + return buffer_shape class FrequencyMaskHelper(SingleFrameHelper): """ diff --git a/mio/process/video.py b/mio/process/video.py index 7c04b75..7b4ead7 100644 --- a/mio/process/video.py +++ b/mio/process/video.py @@ -18,7 +18,7 @@ NoisePatchConfig, ) from mio.plots.video import VideoPlotter -from mio.process.frame_helper import FrequencyMaskHelper, NoiseDetectionHelper, ZStackHelper +from mio.process.frame_helper import CombinedNoiseDetector, FrequencyMaskHelper, ZStackHelper logger = init_logger("video") @@ -130,8 +130,8 @@ def __init__( noise_patch_config (NoisePatchConfig): The noise patch configuration. """ super().__init__(name, output_dir) - self.noise_detect_helper = NoiseDetectionHelper(height=height, width=width) self.noise_patch_config: NoisePatchConfig = noise_patch_config + self.noise_detect_helper = CombinedNoiseDetector(noise_patch_config=noise_patch_config) self.previous_frame: Optional[np.ndarray] = None self.noise_patchs: list[np.ndarray] = [] self.noisy_frames: list[np.ndarray] = [] @@ -143,7 +143,7 @@ def __init__( if noise_patch_config.method == "mean_error": logger.warning( "The mean_error method is unstable and not fully tested yet." - " Use gradient method instead." + " Gradient method is recommended." ) def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: @@ -161,31 +161,11 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: return None if self.noise_patch_config.enable: - if self.noise_patch_config.method == "gradient": - # For the gradient method, analyze the first frame immediately - broken, noise_patch = self.noise_detect_helper.detect_frame_with_noisy_buffer( - input_frame, - None, # No previous frame needed for gradient method - self.noise_patch_config, - ) - else: - # For the mean method, wait until the second frame to start analysis - if self.previous_frame is not None: - broken, noise_patch = self.noise_detect_helper.detect_frame_with_noisy_buffer( - input_frame, - self.previous_frame, - self.noise_patch_config, - ) - else: - # If it's the first frame and the method is mean_error, just store the frame - self.previous_frame = input_frame - self.append_output_frame(input_frame) - return input_frame + broken, noise_patch = self.noise_detect_helper.process_and_verify_frame(input_frame) # Handle noisy frames if not broken: self.append_output_frame(input_frame) - self.previous_frame = input_frame return input_frame else: index = len(self.output_video) + len(self.noise_patchs) diff --git a/tests/test_process/test_frame_helper.py b/tests/test_process/test_frame_helper.py index 8e5ae3a..6c615a1 100644 --- a/tests/test_process/test_frame_helper.py +++ b/tests/test_process/test_frame_helper.py @@ -8,7 +8,7 @@ from pydantic import BaseModel from mio.models.process import DenoiseConfig, NoisePatchConfig -from mio.process.frame_helper import NoiseDetectionHelper +from mio.process.frame_helper import CombinedNoiseDetector from ..conftest import DATA_DIR @@ -82,10 +82,7 @@ def test_noisy_frame_detection(video, ground_truth, noise_detection_method, nois video = cv2.VideoCapture(video) - detector = NoiseDetectionHelper( - height=int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)), - width=int(video.get(cv2.CAP_PROP_FRAME_WIDTH)), - ) + detector = CombinedNoiseDetector(noise_patch_config=config) detected_frames = [] previous_frame = None @@ -98,9 +95,7 @@ def test_noisy_frame_detection(video, ground_truth, noise_detection_method, nois if previous_frame is None: previous_frame = frame - is_noisy, mask = detector.detect_frame_with_noisy_buffer( - current_frame=frame, previous_frame=previous_frame, config=config - ) + is_noisy, mask = detector.process_and_verify_frame(frame=frame) if not is_noisy: previous_frame = frame From 955ac1686970f3e921b3625f6a264e1dbcae34ba Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Thu, 6 Feb 2025 14:35:36 +0900 Subject: [PATCH 7/9] Unify detector interfaces and fix/pass tests --- mio/process/frame_helper.py | 240 ++++++++++-------------- mio/process/video.py | 5 +- mio/process/zstack_helper.py | 65 +++++++ tests/test_process/test_frame_helper.py | 6 +- 4 files changed, 171 insertions(+), 145 deletions(-) create mode 100644 mio/process/zstack_helper.py diff --git a/mio/process/frame_helper.py b/mio/process/frame_helper.py index 49b1452..8b6e359 100644 --- a/mio/process/frame_helper.py +++ b/mio/process/frame_helper.py @@ -3,7 +3,7 @@ """ from abc import abstractmethod -from typing import Optional, Tuple +from typing import Tuple import cv2 import numpy as np @@ -42,19 +42,77 @@ def process_frame(self, frame: np.ndarray) -> np.ndarray: pass @abstractmethod - def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: + def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ - Process a single frame and verify if it is valid. + Find the invalid area in a single frame. Parameters: frame (np.ndarray): The frame to process. Returns: - Tuple[bool, np.ndarray]: A boolean indicating if the frame is noisy - and the processed frame. + Tuple[bool, np.ndarray]: A boolean indicating if the frame is invalid + and the processed frame. """ pass + +class CombinedNoiseDetector(SingleFrameHelper): + """ + Helper class for combined invalid frame detection. + """ + + def __init__(self, noise_patch_config: NoisePatchConfig): + """ + Initialize the FrameProcessor object. + Block size/buffer size will be set by dev config later. + + Returns: + NoiseDetectionHelper: A NoiseDetectionHelper object + """ + self.config = noise_patch_config + if noise_patch_config.method is None: + raise ValueError("No noise detection methods provided") + self.methods = noise_patch_config.method + + if "mean_error" in self.methods: + self.mse_detector = MSENoiseDetector(noise_patch_config) + if "gradient" in self.methods: + self.gradient_detector = GradientNoiseDetector(noise_patch_config) + if "black_area" in self.methods: + self.black_detector = BlackAreaDetector(noise_patch_config) + + def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: + """ + Process a single frame and verify if it is valid. + + Parameters: + frame (np.ndarray): The frame to process. + + Returns: + Tuple[bool, np.ndarray]: A boolean indicating if the frame is valid + and the processed frame. + """ + noisy_flag = False + combined_noisy_area = np.zeros_like(frame, dtype=np.uint8) + + if "mean_error" in self.methods: + noisy, noisy_area = self.mse_detector.find_invalid_area(frame) + combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) + noisy_flag = noisy_flag or noisy + + if "gradient" in self.methods: + noisy, noisy_area = self.gradient_detector.find_invalid_area(frame) + combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) + noisy_flag = noisy_flag or noisy + + if "black_area" in self.methods: + noisy, noisy_area = self.black_detector.find_invalid_area(frame) + combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) + noisy_flag = noisy_flag or noisy + + return noisy_flag, combined_noisy_area + + class GradientNoiseDetector(SingleFrameHelper): """ Helper class for gradient noise detection. @@ -71,8 +129,8 @@ def __init__(self, noise_patch_config: NoisePatchConfig): GradientNoiseDetectionHelper: A GradientNoiseDetectionHelper object. """ self.config = noise_patch_config - - def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: + + def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ Process a single frame and verify if it is valid. @@ -108,7 +166,8 @@ def _detect_with_gradient( frame_is_noisy = noisy_mask.any() return frame_is_noisy, noisy_mask - + + class BlackAreaDetector(SingleFrameHelper): """ Helper class for black area detection. @@ -126,7 +185,7 @@ def __init__(self, noise_patch_config: NoisePatchConfig): """ self.config = noise_patch_config - def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: + def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ Process a single frame and verify if it is valid. @@ -187,7 +246,8 @@ def _detect_black_pixels( break # No need to check further in this row return frame_is_noisy, noisy_mask - + + class MSENoiseDetector(SingleFrameHelper): """ Helper class for mean squared error noise detection. @@ -204,8 +264,9 @@ def __init__(self, noise_patch_config: NoisePatchConfig): MeanErrorNoiseDetectionHelper: A MeanErrorNoiseDetectionHelper object. """ self.config = noise_patch_config - - def register_previous_frame(self, previous_frame: np.ndarray)-> None: + self.previous_frame = None + + def register_previous_frame(self, previous_frame: np.ndarray) -> None: """ Register the previous frame for mean error calculation. @@ -213,8 +274,8 @@ def register_previous_frame(self, previous_frame: np.ndarray)-> None: previous_frame (np.ndarray): The previous frame to compare against. """ self.previous_frame = previous_frame - - def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: + + def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ Process a single frame and verify if it is valid. @@ -225,9 +286,12 @@ def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray] Tuple[bool, np.ndarray]: A boolean indicating if the frame is valid and the processed frame. """ + if self.previous_frame is None: + self.previous_frame = frame + return False, np.zeros_like(frame, dtype=np.uint8) noisy, mask = self._detect_with_mean_error(frame) return noisy, mask - + def _detect_with_mean_error( self, current_frame: np.ndarray, @@ -238,12 +302,15 @@ def _detect_with_mean_error( Returns: Tuple[bool, np.ndarray]: A boolean indicating if the frame is noisy and the noise mask. """ + if self.previous_frame is None: + return False, np.zeros_like(current_frame, dtype=np.uint8) + frame_width = current_frame.shape[1] frame_height = current_frame.shape[0] serialized_current = current_frame.flatten().astype(np.int16) serialized_previous = self.previous_frame.flatten().astype(np.int16) - buffer_shape = self._get_buffer_shape( + buffer_shape = FrameSplitter.get_buffer_shape( frame_width, frame_height, self.config.device_config.px_per_buffer ) @@ -281,84 +348,6 @@ def _detect_with_mean_error( noise_patch = noisy_parts.reshape((frame_height, frame_width)) return any_buffer_has_noise, noise_patch -class CombinedNoiseDetector(SingleFrameHelper): - """ - Helper class for combined invalid frame detection. - """ - - def __init__(self, noise_patch_config: NoisePatchConfig): - """ - Initialize the FrameProcessor object. - Block size/buffer size will be set by dev config later. - - Returns: - NoiseDetectionHelper: A NoiseDetectionHelper object - """ - self.config = noise_patch_config - if noise_patch_config.method is None: - raise ValueError("No noise detection methods provided") - self.methods = noise_patch_config.method - - if "mean_error" in self.methods: - self.mse_detector = MSENoiseDetector(noise_patch_config) - if "gradient" in self.methods: - self.gradient_detector = GradientNoiseDetector(noise_patch_config) - if "black_pixels" in self.methods: - self.black_detector = BlackAreaDetector(noise_patch_config) - - def process_and_verify_frame(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: - """ - Process a single frame and verify if it is valid. - - Parameters: - frame (np.ndarray): The frame to process. - - Returns: - Tuple[bool, np.ndarray]: A boolean indicating if the frame is valid - and the processed frame. - """ - noisy_flag = False - combined_noisy_area = np.zeros_like(frame, dtype=np.uint8) - - if "mean_error" in self.methods: - noisy, noisy_area = self.mse_detector.process_and_verify_frame(frame) - combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) - noisy_flag = noisy_flag or noisy - - if "gradient" in self.methods: - noisy, noisy_area = self.gradient_detector.process_and_verify_frame(frame) - combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) - noisy_flag = noisy_flag or noisy - - if "black_pixels" in self.methods: - noisy, noisy_area = self.black_detector.process_and_verify_frame(frame) - combined_noisy_area = np.maximum(combined_noisy_area, noisy_area) - noisy_flag = noisy_flag or noisy - - return noisy_flag, combined_noisy_area - - def _get_buffer_shape( - self, frame_width: int, frame_height: int, px_per_buffer: int - ) -> list[int]: - """ - Get the shape of each buffer in a frame. - - Parameters: - frame_width (int): The width of the frame. - frame_height (int): The height of the frame. - px_per_buffer (int): The number of pixels per buffer. - - Returns: - list[int]: The shape of each buffer in the frame. - """ - buffer_shape = [] - - pixel_index = 0 - while pixel_index < frame_width * frame_height: - buffer_shape.append(int(pixel_index)) - pixel_index += px_per_buffer - logger.debug(f"Split shape: {buffer_shape}") - return buffer_shape class FrequencyMaskHelper(SingleFrameHelper): """ @@ -474,60 +463,29 @@ def _gen_freq_mask( return mask -class ZStackHelper: +class FrameSplitter: """ - Helper class for Z-stack operations. + Helper class for splitting frames into buffers. + Currently only for getting the buffer shape from pixel count. """ - @staticmethod - def get_minimum_projection(image_list: list[np.ndarray]) -> np.ndarray: - """ - Get the minimum projection of a list of images. - - Parameters: - image_list (list[np.ndarray]): A list of images to project. - - Returns: - np.ndarray: The minimum projection of the images. - """ - stacked_images = np.stack(image_list, axis=0) - min_projection = np.min(stacked_images, axis=0) - return min_projection - - @staticmethod - def normalize_video_stack(image_list: list[np.ndarray]) -> list[np.ndarray]: + def get_buffer_shape(frame_width: int, frame_height: int, px_per_buffer: int) -> list[int]: """ - Normalize a stack of images to 0-255 using max and minimum values of the entire stack. - Return a list of images. + Get the shape of each buffer in a frame. Parameters: - image_list (list[np.ndarray]): A list of images to normalize. + frame_width (int): The width of the frame. + frame_height (int): The height of the frame. + px_per_buffer (int): The number of pixels per buffer. Returns: - list[np.ndarray]: The normalized images as a list. + list[int]: The shape of each buffer in the frame. """ + buffer_shape = [] - # Stack images along a new axis (axis=0) - stacked_images = np.stack(image_list, axis=0) - - # Find the global min and max across the entire stack - global_min = stacked_images.min() - global_max = stacked_images.max() - - range_val = max(global_max - global_min, 1e-5) # Set an epsilon value for stability - - # Normalize each frame using the global min and max - normalized_images = [] - for i in range(stacked_images.shape[0]): - normalized_image = cv2.normalize( - stacked_images[i], - None, - 0, - np.iinfo(np.uint8).max, - cv2.NORM_MINMAX, - dtype=cv2.CV_32F, - ) - normalized_image = (stacked_images[i] - global_min) / range_val * np.iinfo(np.uint8).max - normalized_images.append(normalized_image.astype(np.uint8)) - - return normalized_images + pixel_index = 0 + while pixel_index < frame_width * frame_height: + buffer_shape.append(int(pixel_index)) + pixel_index += px_per_buffer + logger.debug(f"Split shape: {buffer_shape}") + return buffer_shape diff --git a/mio/process/video.py b/mio/process/video.py index 7b4ead7..36d1eaf 100644 --- a/mio/process/video.py +++ b/mio/process/video.py @@ -18,7 +18,8 @@ NoisePatchConfig, ) from mio.plots.video import VideoPlotter -from mio.process.frame_helper import CombinedNoiseDetector, FrequencyMaskHelper, ZStackHelper +from mio.process.frame_helper import CombinedNoiseDetector, FrequencyMaskHelper +from mio.process.zstack_helper import ZStackHelper logger = init_logger("video") @@ -161,7 +162,7 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: return None if self.noise_patch_config.enable: - broken, noise_patch = self.noise_detect_helper.process_and_verify_frame(input_frame) + broken, noise_patch = self.noise_detect_helper.find_invalid_area(input_frame) # Handle noisy frames if not broken: diff --git a/mio/process/zstack_helper.py b/mio/process/zstack_helper.py new file mode 100644 index 0000000..55fa237 --- /dev/null +++ b/mio/process/zstack_helper.py @@ -0,0 +1,65 @@ +""" +Helper module for Z-stack operations. +""" + +import cv2 +import numpy as np + + +class ZStackHelper: + """ + Helper class for Z-stack operations. + """ + + @staticmethod + def get_minimum_projection(image_list: list[np.ndarray]) -> np.ndarray: + """ + Get the minimum projection of a list of images. + + Parameters: + image_list (list[np.ndarray]): A list of images to project. + + Returns: + np.ndarray: The minimum projection of the images. + """ + stacked_images = np.stack(image_list, axis=0) + min_projection = np.min(stacked_images, axis=0) + return min_projection + + @staticmethod + def normalize_video_stack(image_list: list[np.ndarray]) -> list[np.ndarray]: + """ + Normalize a stack of images to 0-255 using max and minimum values of the entire stack. + Return a list of images. + + Parameters: + image_list (list[np.ndarray]): A list of images to normalize. + + Returns: + list[np.ndarray]: The normalized images as a list. + """ + + # Stack images along a new axis (axis=0) + stacked_images = np.stack(image_list, axis=0) + + # Find the global min and max across the entire stack + global_min = stacked_images.min() + global_max = stacked_images.max() + + range_val = max(global_max - global_min, 1e-5) # Set an epsilon value for stability + + # Normalize each frame using the global min and max + normalized_images = [] + for i in range(stacked_images.shape[0]): + normalized_image = cv2.normalize( + stacked_images[i], + None, + 0, + np.iinfo(np.uint8).max, + cv2.NORM_MINMAX, + dtype=cv2.CV_32F, + ) + normalized_image = (stacked_images[i] - global_min) / range_val * np.iinfo(np.uint8).max + normalized_images.append(normalized_image.astype(np.uint8)) + + return normalized_images diff --git a/tests/test_process/test_frame_helper.py b/tests/test_process/test_frame_helper.py index 6c615a1..fea16ed 100644 --- a/tests/test_process/test_frame_helper.py +++ b/tests/test_process/test_frame_helper.py @@ -49,7 +49,7 @@ class NoiseGroundTruth(BaseModel): "noise_detection_method,noise_category", [ ("gradient", GroundTruthCategory.check_pattern), - ("gradient", GroundTruthCategory.blacked_out), + ("black_area", GroundTruthCategory.blacked_out), ("mean_error", GroundTruthCategory.check_pattern), ], ) @@ -68,6 +68,8 @@ def test_noisy_frame_detection(video, ground_truth, noise_detection_method, nois "see https://github.com/Aharoni-Lab/mio/pull/97" ) global_config: DenoiseConfig = DenoiseConfig.from_id("denoise_example_mean_error") + elif noise_detection_method == "black_area": + global_config: DenoiseConfig = DenoiseConfig.from_id("denoise_example") else: raise ValueError("Invalid noise detection method") @@ -95,7 +97,7 @@ def test_noisy_frame_detection(video, ground_truth, noise_detection_method, nois if previous_frame is None: previous_frame = frame - is_noisy, mask = detector.process_and_verify_frame(frame=frame) + is_noisy, mask = detector.find_invalid_area(frame=frame) if not is_noisy: previous_frame = frame From 83abe4e5658eb980aa45f44d4bba462fafca6406 Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Thu, 6 Feb 2025 15:39:51 +0900 Subject: [PATCH 8/9] Change to method list instead of additional methods / docs --- ...reprocessing.md => preprocessing_guide.md} | 2 +- docs/api/process/frame_helper.md | 7 ++ docs/api/process/index.md | 9 +++ docs/api/process/video.md | 7 ++ docs/api/process/zstack_helper.md | 7 ++ docs/index.md | 3 +- mio/data/config/process/denoise_example.yml | 5 +- .../process/denoise_example_mean_error.yml | 2 +- mio/models/process.py | 12 +-- mio/process/frame_helper.py | 80 +++++++------------ mio/process/video.py | 26 ++---- tests/test_process/test_frame_helper.py | 16 ++-- tests/test_process_video.py | 2 +- 13 files changed, 86 insertions(+), 92 deletions(-) rename docs/api/{preprocessing.md => preprocessing_guide.md} (98%) create mode 100644 docs/api/process/frame_helper.md create mode 100644 docs/api/process/index.md create mode 100644 docs/api/process/video.md create mode 100644 docs/api/process/zstack_helper.md diff --git a/docs/api/preprocessing.md b/docs/api/preprocessing_guide.md similarity index 98% rename from docs/api/preprocessing.md rename to docs/api/preprocessing_guide.md index 0992758..4f67fc9 100644 --- a/docs/api/preprocessing.md +++ b/docs/api/preprocessing_guide.md @@ -1,4 +1,4 @@ -# preprocessing video files +# Preprocessing videos ## Filtering of frames with broken buffers (blocks of a frame) diff --git a/docs/api/process/frame_helper.md b/docs/api/process/frame_helper.md new file mode 100644 index 0000000..22442a1 --- /dev/null +++ b/docs/api/process/frame_helper.md @@ -0,0 +1,7 @@ +# Frame helpers + +```{eval-rst} +.. automodule:: mio.process.frame_helper + :members: + :private-members: +``` \ No newline at end of file diff --git a/docs/api/process/index.md b/docs/api/process/index.md new file mode 100644 index 0000000..eb5fd92 --- /dev/null +++ b/docs/api/process/index.md @@ -0,0 +1,9 @@ +# preprocess + +```{toctree} +:caption: Preprocess + +video +frame_helper +zstack_helper +``` \ No newline at end of file diff --git a/docs/api/process/video.md b/docs/api/process/video.md new file mode 100644 index 0000000..1f3d8f8 --- /dev/null +++ b/docs/api/process/video.md @@ -0,0 +1,7 @@ +# Video preprocessor + +```{eval-rst} +.. automodule:: mio.process.video + :members: + :private-members: +``` \ No newline at end of file diff --git a/docs/api/process/zstack_helper.md b/docs/api/process/zstack_helper.md new file mode 100644 index 0000000..4969d87 --- /dev/null +++ b/docs/api/process/zstack_helper.md @@ -0,0 +1,7 @@ +# z-stack helpers + +```{eval-rst} +.. automodule:: mio.process.zstack_helper + :members: + :private-members: +``` \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 6441012..60d2e02 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,6 +9,7 @@ Generic I/O interfaces for miniscopes :) guide/installation guide/config cli/index +api/preprocessing_guide ``` ```{toctree} @@ -31,9 +32,9 @@ api/exceptions api/plots/index api/utils api/stream_daq -api/preprocessing api/bit_operation api/vendor/index +api/process/index ``` ```{toctree} diff --git a/mio/data/config/process/denoise_example.yml b/mio/data/config/process/denoise_example.yml index fba0dbc..e8b789b 100644 --- a/mio/data/config/process/denoise_example.yml +++ b/mio/data/config/process/denoise_example.yml @@ -3,15 +3,12 @@ mio_model: mio.models.process.DenoiseConfig mio_version: 0.6.1 noise_patch: enable: true - method: gradient #gradient or mean_error or black_pixels - additional_methods: - - black_pixels + method: [gradient, black_area] threshold: 20 #for first method black_pixel_consecutive_threshold: 5 # If 5 consecutive pixels in a row are black, discard the frame black_pixel_value_threshold: 16 # Any pixel value ≤ 16 is considered "black" buffer_size: 5032 buffer_split: 8 - method: gradient # gradient or mean_error threshold: 20 device_config_id: wireless-200px comparison_unit: 1000 diff --git a/mio/data/config/process/denoise_example_mean_error.yml b/mio/data/config/process/denoise_example_mean_error.yml index e4f9bad..d88cd97 100644 --- a/mio/data/config/process/denoise_example_mean_error.yml +++ b/mio/data/config/process/denoise_example_mean_error.yml @@ -3,7 +3,7 @@ mio_model: mio.models.process.DenoiseConfig mio_version: 0.6.1 noise_patch: enable: true - method: mean_error + method: [mean_error] threshold: 40 device_config_id: wireless-200px comparison_unit: 1000 diff --git a/mio/models/process.py b/mio/models/process.py index 56f0188..3bb65cd 100644 --- a/mio/models/process.py +++ b/mio/models/process.py @@ -2,7 +2,7 @@ Module for preprocessing data. """ -from typing import Literal, Optional +from typing import List, Literal, Optional from pydantic import BaseModel, Field @@ -48,16 +48,12 @@ class NoisePatchConfig(BaseModel): default=True, description="Enable patch based noise handling.", ) - method: Literal["mean_error", "gradient"] = Field( + method: List[Literal["mean_error", "gradient", "black_area"]] = Field( default="gradient", description="Method for detecting noise." - "The gradient method is the current recommended method." "gradient: Detection based on the gradient of the frame row." - "mean_error: Detection based on the mean error with the same row of the previous frame.", - ) - additional_methods: Optional[list[str]] = Field( - default=list, - description="Additional noise detection methods to apply after the primary method.", + "mean_error: Detection based on the mean error with the same row of the previous frame." + "black_area: Detection based on the number of consecutive black pixels in a row.", ) threshold: float = Field( default=20, diff --git a/mio/process/frame_helper.py b/mio/process/frame_helper.py index 8b6e359..05d8289 100644 --- a/mio/process/frame_helper.py +++ b/mio/process/frame_helper.py @@ -14,17 +14,17 @@ logger = init_logger("frame_helper") -class SingleFrameHelper: +class BaseSingleFrameHelper: """ - Helper class for single frame operations. + Base class for single frame operations. """ def __init__(self): """ - Initialize the SingleFrameHelper object. + Initialize the BaseSingleFrameHelper object. Returns: - SingleFrameHelper: A SingleFrameHelper object. + BaseSingleFrameHelper: A BaseSingleFrameHelper object. """ pass @@ -56,7 +56,7 @@ def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: pass -class CombinedNoiseDetector(SingleFrameHelper): +class InvalidFrameDetector(BaseSingleFrameHelper): """ Helper class for combined invalid frame detection. """ @@ -89,8 +89,7 @@ def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: frame (np.ndarray): The frame to process. Returns: - Tuple[bool, np.ndarray]: A boolean indicating if the frame is valid - and the processed frame. + Tuple[bool, np.ndarray]: A boolean indicating invalid frames and the invalid area. """ noisy_flag = False combined_noisy_area = np.zeros_like(frame, dtype=np.uint8) @@ -113,7 +112,7 @@ def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: return noisy_flag, combined_noisy_area -class GradientNoiseDetector(SingleFrameHelper): +class GradientNoiseDetector(BaseSingleFrameHelper): """ Helper class for gradient noise detection. """ @@ -168,7 +167,7 @@ def _detect_with_gradient( return frame_is_noisy, noisy_mask -class BlackAreaDetector(SingleFrameHelper): +class BlackAreaDetector(BaseSingleFrameHelper): """ Helper class for black area detection. """ @@ -248,7 +247,7 @@ def _detect_black_pixels( return frame_is_noisy, noisy_mask -class MSENoiseDetector(SingleFrameHelper): +class MSENoiseDetector(BaseSingleFrameHelper): """ Helper class for mean squared error noise detection. """ @@ -292,10 +291,7 @@ def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: noisy, mask = self._detect_with_mean_error(frame) return noisy, mask - def _detect_with_mean_error( - self, - current_frame: np.ndarray, - ) -> Tuple[bool, np.ndarray]: + def _detect_with_mean_error(self, current_frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ Detect noise using mean error between current and previous frames. @@ -305,51 +301,35 @@ def _detect_with_mean_error( if self.previous_frame is None: return False, np.zeros_like(current_frame, dtype=np.uint8) - frame_width = current_frame.shape[1] - frame_height = current_frame.shape[0] + current_flat = current_frame.astype(np.int16).flatten() + previous_flat = self.previous_frame.astype(np.int16).flatten() - serialized_current = current_frame.flatten().astype(np.int16) - serialized_previous = self.previous_frame.flatten().astype(np.int16) - buffer_shape = FrameSplitter.get_buffer_shape( - frame_width, frame_height, self.config.device_config.px_per_buffer - ) + buffer_indices = FrameSplitter.get_buffer_shape( + current_frame.shape[1], current_frame.shape[0], self.config.device_config.px_per_buffer + ) + [ + current_frame.size + ] # Ensure final boundary is included - noisy_parts = np.ones_like(serialized_current, np.uint8) - any_buffer_has_noise = False - - for buffer_index in range(len(buffer_shape)): - buffer_start = 0 if buffer_index == 0 else buffer_shape[buffer_index] - buffer_end = ( - frame_width * frame_height - if buffer_index == len(buffer_shape) - 1 - else buffer_shape[buffer_index + 1] - ) - - comparison_start = buffer_end - self.config.buffer_split - while comparison_start > buffer_start: - mean_error = abs( - serialized_current[comparison_start:buffer_end] - - serialized_previous[comparison_start:buffer_end] - ).mean() - logger.debug( - f"Mean error for buffer {buffer_index}:" - f" pixels {comparison_start}-{buffer_end}: {mean_error}" - f" (threshold: {self.config.threshold})" + noisy_mask = np.ones_like(current_flat, dtype=np.uint8) + has_noise = False + + for start_idx, end_idx in zip(buffer_indices[:-1], buffer_indices[1:]): + for sub_start in range( + end_idx - self.config.buffer_split, start_idx, -self.config.buffer_split + ): + mean_error = np.mean( + np.abs(current_flat[sub_start:end_idx] - previous_flat[sub_start:end_idx]) ) if mean_error > self.config.threshold: - noisy_parts[comparison_start:buffer_end] = np.zeros_like( - serialized_current[comparison_start:buffer_end], np.uint8 - ) - any_buffer_has_noise = True + noisy_mask[sub_start:end_idx] = 0 + has_noise = True break - comparison_start -= self.config.buffer_split - noise_patch = noisy_parts.reshape((frame_height, frame_width)) - return any_buffer_has_noise, noise_patch + return has_noise, noisy_mask.reshape(current_frame.shape) -class FrequencyMaskHelper(SingleFrameHelper): +class FrequencyMaskHelper(BaseSingleFrameHelper): """ Helper class for frequency masking operations. """ diff --git a/mio/process/video.py b/mio/process/video.py index 36d1eaf..38d95de 100644 --- a/mio/process/video.py +++ b/mio/process/video.py @@ -18,7 +18,7 @@ NoisePatchConfig, ) from mio.plots.video import VideoPlotter -from mio.process.frame_helper import CombinedNoiseDetector, FrequencyMaskHelper +from mio.process.frame_helper import FrequencyMaskHelper, InvalidFrameDetector from mio.process.zstack_helper import ZStackHelper logger = init_logger("video") @@ -119,8 +119,6 @@ def __init__( self, name: str, noise_patch_config: NoisePatchConfig, - width: int, - height: int, output_dir: Path, ) -> None: """ @@ -132,8 +130,7 @@ def __init__( """ super().__init__(name, output_dir) self.noise_patch_config: NoisePatchConfig = noise_patch_config - self.noise_detect_helper = CombinedNoiseDetector(noise_patch_config=noise_patch_config) - self.previous_frame: Optional[np.ndarray] = None + self.noise_detect_helper = InvalidFrameDetector(noise_patch_config=noise_patch_config) self.noise_patchs: list[np.ndarray] = [] self.noisy_frames: list[np.ndarray] = [] self.diff_frames: list[np.ndarray] = [] @@ -141,10 +138,9 @@ def __init__( self.output_enable: bool = noise_patch_config.output_result - if noise_patch_config.method == "mean_error": + if "mean_error" in noise_patch_config.method: logger.warning( - "The mean_error method is unstable and not fully tested yet." - " Gradient method is recommended." + "The mean_error method is unstable and not fully tested yet." " Use with caution." ) def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: @@ -157,22 +153,21 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: Returns: Optional[np.ndarray]: The processed frame. If the frame is noisy, a None is returned. """ - if input_frame is None: return None if self.noise_patch_config.enable: - broken, noise_patch = self.noise_detect_helper.find_invalid_area(input_frame) + invalid, noisy_area = self.noise_detect_helper.find_invalid_area(input_frame) # Handle noisy frames - if not broken: + if not invalid: self.append_output_frame(input_frame) return input_frame else: index = len(self.output_video) + len(self.noise_patchs) logger.info(f"Dropping frame {index} of original video due to noise.") logger.debug(f"Adding noise patch for frame {index}.") - self.noise_patchs.append((noise_patch * np.iinfo(np.uint8).max).astype(np.uint8)) + self.noise_patchs.append((noisy_area * np.iinfo(np.uint8).max).astype(np.uint8)) self.noisy_frames.append(input_frame) self.dropped_frame_indices.append(index) return None @@ -213,7 +208,6 @@ def export_noise_patch(self) -> None: if self.noise_patch_config.output_noise_patch: logger.info(f"Exporting {self.name} noise patch to {self.output_dir}") - print(f"image shape: {self.noise_patchs[0].shape}, dtype: {self.noise_patchs[0].dtype}") self.noise_patch_named_video.export( output_path=self.output_dir / f"{self.name}", fps=20, @@ -294,7 +288,6 @@ def __init__( height=height, width=width, freq_mask_config=freq_mask_config ) self.freq_domain_frames = [] - self._freq_mask: np.ndarray = None self.frame_width: int = width self.frame_height: int = height self.output_enable: bool = freq_mask_config.output_result @@ -304,9 +297,7 @@ def freq_mask(self) -> np.ndarray: """ Get the frequency mask. """ - if self._freq_mask is None: - self._freq_mask = self.freq_mask_helper.freq_mask - return self._freq_mask + return self.freq_mask_helper.freq_mask @property def freq_mask_named_frame(self) -> NamedFrame: @@ -331,7 +322,6 @@ def process_frame(self, input_frame: np.ndarray) -> Optional[np.ndarray]: Returns: Optional[np.ndarray]: The processed frame. If the input is none, a None is returned. - """ if input_frame is None: return None diff --git a/tests/test_process/test_frame_helper.py b/tests/test_process/test_frame_helper.py index fea16ed..e5df4cb 100644 --- a/tests/test_process/test_frame_helper.py +++ b/tests/test_process/test_frame_helper.py @@ -8,7 +8,7 @@ from pydantic import BaseModel from mio.models.process import DenoiseConfig, NoisePatchConfig -from mio.process.frame_helper import CombinedNoiseDetector +from mio.process.frame_helper import InvalidFrameDetector from ..conftest import DATA_DIR @@ -48,9 +48,9 @@ class NoiseGroundTruth(BaseModel): @pytest.mark.parametrize( "noise_detection_method,noise_category", [ - ("gradient", GroundTruthCategory.check_pattern), - ("black_area", GroundTruthCategory.blacked_out), - ("mean_error", GroundTruthCategory.check_pattern), + (["gradient"], GroundTruthCategory.check_pattern), + (["black_area"], GroundTruthCategory.blacked_out), + (["mean_error"], GroundTruthCategory.check_pattern), ], ) def test_noisy_frame_detection(video, ground_truth, noise_detection_method, noise_category): @@ -58,9 +58,9 @@ def test_noisy_frame_detection(video, ground_truth, noise_detection_method, nois Contrast method of noise detection should correctly label frames corrupted by speckled noise """ - if noise_detection_method == "gradient": + if "gradient" in noise_detection_method: global_config: DenoiseConfig = DenoiseConfig.from_id("denoise_example") - elif noise_detection_method == "mean_error": + elif "mean_error" in noise_detection_method: if "extended" in video: # FIXME: resolve this before merging `feat-preprocess` to `main` pytest.xfail( @@ -68,7 +68,7 @@ def test_noisy_frame_detection(video, ground_truth, noise_detection_method, nois "see https://github.com/Aharoni-Lab/mio/pull/97" ) global_config: DenoiseConfig = DenoiseConfig.from_id("denoise_example_mean_error") - elif noise_detection_method == "black_area": + elif "black_area" in noise_detection_method: global_config: DenoiseConfig = DenoiseConfig.from_id("denoise_example") else: raise ValueError("Invalid noise detection method") @@ -84,7 +84,7 @@ def test_noisy_frame_detection(video, ground_truth, noise_detection_method, nois video = cv2.VideoCapture(video) - detector = CombinedNoiseDetector(noise_patch_config=config) + detector = InvalidFrameDetector(noise_patch_config=config) detected_frames = [] previous_frame = None diff --git a/tests/test_process_video.py b/tests/test_process_video.py index 3f03120..1bb76fd 100644 --- a/tests/test_process_video.py +++ b/tests/test_process_video.py @@ -20,7 +20,7 @@ def test_noise_patch_processor(self): denoise_config.noise_patch.enable = True denoise_config.noise_patch.output_result = True - processor = NoisePatchProcessor("denoise_example", denoise_config.noise_patch, self.width, self.height, self.test_dir) + processor = NoisePatchProcessor("denoise_example", denoise_config.noise_patch, self.test_dir) processed_frame = processor.process_frame(self.test_frame) self.assertIsInstance(processed_frame, np.ndarray) From b23571f88d43d481da84a5741d8bc611bd291de1 Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Thu, 6 Feb 2025 15:57:34 +0900 Subject: [PATCH 9/9] Make each detector config submodels --- mio/data/config/process/denoise_example.yml | 20 ++-- .../process/denoise_example_mean_error.yml | 17 ++- mio/models/process.py | 111 ++++++++++++------ mio/process/frame_helper.py | 30 +++-- 4 files changed, 114 insertions(+), 64 deletions(-) diff --git a/mio/data/config/process/denoise_example.yml b/mio/data/config/process/denoise_example.yml index e8b789b..a1c4d02 100644 --- a/mio/data/config/process/denoise_example.yml +++ b/mio/data/config/process/denoise_example.yml @@ -4,15 +4,17 @@ mio_version: 0.6.1 noise_patch: enable: true method: [gradient, black_area] - threshold: 20 #for first method - black_pixel_consecutive_threshold: 5 # If 5 consecutive pixels in a row are black, discard the frame - black_pixel_value_threshold: 16 # Any pixel value ≤ 16 is considered "black" - buffer_size: 5032 - buffer_split: 8 - threshold: 20 - device_config_id: wireless-200px - comparison_unit: 1000 - diff_multiply: 1 + mean_error_config: + threshold: 40 + device_config_id: wireless-200px + buffer_split: 8 + comparison_unit: 1000 + diff_multiply: 1 + gradient_config: + threshold: 20 + black_area_config: + consecutive_threshold: 5 + value_threshold: 16 output_result: true output_noise_patch: true output_diff: true diff --git a/mio/data/config/process/denoise_example_mean_error.yml b/mio/data/config/process/denoise_example_mean_error.yml index d88cd97..c98b1dd 100644 --- a/mio/data/config/process/denoise_example_mean_error.yml +++ b/mio/data/config/process/denoise_example_mean_error.yml @@ -4,10 +4,17 @@ mio_version: 0.6.1 noise_patch: enable: true method: [mean_error] - threshold: 40 - device_config_id: wireless-200px - comparison_unit: 1000 - diff_multiply: 1 + mean_error_config: + threshold: 40 + device_config_id: wireless-200px + buffer_split: 8 + comparison_unit: 1000 + diff_multiply: 1 + gradient_config: + threshold: 20 + black_area_config: + consecutive_threshold: 5 + value_threshold: 16 output_result: true output_noise_patch: true output_diff: true @@ -32,4 +39,4 @@ interactive_display: display_freq_mask: true end_frame: -1 #-1 means all frames output_result: true -output_dir: user_data/output +output_dir: user_data/output \ No newline at end of file diff --git a/mio/models/process.py b/mio/models/process.py index 3bb65cd..4842b8e 100644 --- a/mio/models/process.py +++ b/mio/models/process.py @@ -38,28 +38,14 @@ class MinimumProjectionConfig(BaseModel): ) -class NoisePatchConfig(BaseModel): +class MSEDetectorConfig(BaseModel): """ - Configuration for patch based noise handling. - This is used to detect noisy areas in each frame and drop the frame if it is noisy. + Configraiton for detecting invalid frames based on mean squared error. """ - enable: bool = Field( - default=True, - description="Enable patch based noise handling.", - ) - method: List[Literal["mean_error", "gradient", "black_area"]] = Field( - default="gradient", - description="Method for detecting noise." - "gradient: Detection based on the gradient of the frame row." - "mean_error: Detection based on the mean error with the same row of the previous frame." - "black_area: Detection based on the number of consecutive black pixels in a row.", - ) threshold: float = Field( - default=20, - description="Threshold for detecting noise." - "This is used together with the method to determine whether a frame is noisy." - "Currently, the value needs to be empirically determined.", + ..., + description="Threshold for detecting invalid frames based on mean squared error.", ) device_config_id: Optional[str] = Field( default=None, @@ -67,14 +53,6 @@ class NoisePatchConfig(BaseModel): "This is used in the mean_error method to compare frames" " in the units of data transfer buffers.", ) - black_pixel_consecutive_threshold: int = Field( - default=5, - description="Number of consecutive black pixels required to classify a row as noisy.", - ) - black_pixel_value_threshold: int = Field( - default=20, - description="Pixel intensity value below which a pixel is considered 'black'.", - ) buffer_size: int = Field( default=5032, description="Size of the buffers composing the image." @@ -90,6 +68,75 @@ class NoisePatchConfig(BaseModel): default=1, description="Multiplier for visualizing the diff between the current and previous frame.", ) + + _device_config: Optional[StreamDevConfig] = None + + @property + def device_config(self) -> StreamDevConfig: + """ + Get the device configuration based on the device_config_id. + This is used in the mean_error method to compare frames in the units of data buffers. + """ + if self._device_config is None: + self._device_config = StreamDevConfig.from_any(self.device_config_id) + return self._device_config + + +class GradientDetectorConfig(BaseModel): + """ + Configraiton for detecting invalid frames based on gradient. + """ + + threshold: float = Field( + ..., + description="Threshold for detecting invalid frames based on gradient.", + ) + + +class BlackAreaDetectorConfig(BaseModel): + """ + Configraiton for detecting invalid frames based on black area. + """ + + consecutive_threshold: int = Field( + default=5, + description="Number of consecutive black pixels required to classify a row as noisy.", + ) + value_threshold: int = Field( + default=0, + description="Pixel intensity value below which a pixel is considered 'black'.", + ) + + +class NoisePatchConfig(BaseModel): + """ + Configuration for patch based noise handling. + This is used to detect noisy areas in each frame and drop the frame if it is noisy. + """ + + enable: bool = Field( + default=True, + description="Enable patch based noise handling.", + ) + method: List[Literal["mean_error", "gradient", "black_area"]] = Field( + default="gradient", + description="Method for detecting noise." + "gradient: Detection based on the gradient of the frame row." + "mean_error: Detection based on the mean error with the same row of the previous frame." + "black_area: Detection based on the number of consecutive black pixels in a row.", + ) + mean_error_config: Optional[MSEDetectorConfig] = Field( + default=None, + description="Configuration for detecting invalid frames based on mean squared error.", + ) + gradient_config: Optional[GradientDetectorConfig] = Field( + default=None, + description="Configuration for detecting invalid frames based on gradient.", + ) + black_area_config: Optional[BlackAreaDetectorConfig] = Field( + default=None, + description="Configuration for detecting invalid frames based on black area.", + ) output_result: bool = Field( default=False, description="Output the output video stream.", @@ -110,18 +157,6 @@ class NoisePatchConfig(BaseModel): description="Output the stack of noisy frames as an independent video stream.", ) - _device_config: Optional[StreamDevConfig] = None - - @property - def device_config(self) -> StreamDevConfig: - """ - Get the device configuration based on the device_config_id. - This is used in the mean_error method to compare frames in the units of data buffers. - """ - if self._device_config is None: - self._device_config = StreamDevConfig.from_any(self.device_config_id) - return self._device_config - class FreqencyMaskingConfig(BaseModel): """ diff --git a/mio/process/frame_helper.py b/mio/process/frame_helper.py index 05d8289..e238f6f 100644 --- a/mio/process/frame_helper.py +++ b/mio/process/frame_helper.py @@ -9,7 +9,13 @@ import numpy as np from mio import init_logger -from mio.models.process import FreqencyMaskingConfig, NoisePatchConfig +from mio.models.process import ( + BlackAreaDetectorConfig, + FreqencyMaskingConfig, + GradientDetectorConfig, + MSEDetectorConfig, + NoisePatchConfig, +) logger = init_logger("frame_helper") @@ -75,11 +81,11 @@ def __init__(self, noise_patch_config: NoisePatchConfig): self.methods = noise_patch_config.method if "mean_error" in self.methods: - self.mse_detector = MSENoiseDetector(noise_patch_config) + self.mse_detector = MSENoiseDetector(noise_patch_config.mean_error_config) if "gradient" in self.methods: - self.gradient_detector = GradientNoiseDetector(noise_patch_config) + self.gradient_detector = GradientNoiseDetector(noise_patch_config.gradient_config) if "black_area" in self.methods: - self.black_detector = BlackAreaDetector(noise_patch_config) + self.black_detector = BlackAreaDetector(noise_patch_config.black_area_config) def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ @@ -117,7 +123,7 @@ class GradientNoiseDetector(BaseSingleFrameHelper): Helper class for gradient noise detection. """ - def __init__(self, noise_patch_config: NoisePatchConfig): + def __init__(self, config: GradientDetectorConfig): """ Initialize the GradientNoiseDetectionHelper object. @@ -127,7 +133,7 @@ def __init__(self, noise_patch_config: NoisePatchConfig): Returns: GradientNoiseDetectionHelper: A GradientNoiseDetectionHelper object. """ - self.config = noise_patch_config + self.config = config def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ @@ -172,7 +178,7 @@ class BlackAreaDetector(BaseSingleFrameHelper): Helper class for black area detection. """ - def __init__(self, noise_patch_config: NoisePatchConfig): + def __init__(self, config: BlackAreaDetectorConfig): """ Initialize the BlackAreaDetectionHelper object. @@ -182,7 +188,7 @@ def __init__(self, noise_patch_config: NoisePatchConfig): Returns: BlackAreaDetectionHelper: A BlackAreaDetectionHelper object. """ - self.config = noise_patch_config + self.config = config def find_invalid_area(self, frame: np.ndarray) -> Tuple[bool, np.ndarray]: """ @@ -213,10 +219,10 @@ def _detect_black_pixels( # Read values from YAML config consecutive_threshold = ( - self.config.black_pixel_consecutive_threshold + self.config.consecutive_threshold ) # How many consecutive pixels must be black black_pixel_value_threshold = ( - self.config.black_pixel_value_threshold + self.config.value_threshold ) # Max pixel value considered "black" logger.debug(f"Using black pixel threshold: <= {black_pixel_value_threshold}") @@ -252,7 +258,7 @@ class MSENoiseDetector(BaseSingleFrameHelper): Helper class for mean squared error noise detection. """ - def __init__(self, noise_patch_config: NoisePatchConfig): + def __init__(self, config: MSEDetectorConfig): """ Initialize the MeanErrorNoiseDetectionHelper object. @@ -262,7 +268,7 @@ def __init__(self, noise_patch_config: NoisePatchConfig): Returns: MeanErrorNoiseDetectionHelper: A MeanErrorNoiseDetectionHelper object. """ - self.config = noise_patch_config + self.config = config self.previous_frame = None def register_previous_frame(self, previous_frame: np.ndarray) -> None: