From a4e6904fe21ed174ab6a6246a6008853d352309c Mon Sep 17 00:00:00 2001 From: Dale Wahl Date: Thu, 14 Dec 2023 23:51:02 +0100 Subject: [PATCH] update video hasher and create network vis and allow category wall processor to group similar vids --- .../visualisation/image_category_wall.py | 23 +- processors/visualisation/video_hasher.py | 287 ++++++++++++++++-- 2 files changed, 286 insertions(+), 24 deletions(-) diff --git a/processors/visualisation/image_category_wall.py b/processors/visualisation/image_category_wall.py index fc1a763a8..6ff277882 100644 --- a/processors/visualisation/image_category_wall.py +++ b/processors/visualisation/image_category_wall.py @@ -38,6 +38,8 @@ class ImageWallGenerator(BasicProcessor): number_of_ranges = 10 # number of ranges to use for numeric categories + image_datasets = ["image-downloader", "video-hasher-1"] + config = { "image-visuals.max_per_cat": { "type": UserInput.OPTION_TEXT, @@ -62,7 +64,10 @@ def is_compatible_with(cls, module=None, user=None): :param module: Dataset or processor to determine compatibility with """ - return module.type.startswith("image-to-categories") or module.type.startswith("image-downloader") + return module.type.startswith("image-to-categories") or \ + module.type.startswith("image-downloader") or \ + module.type.startswith("video-hasher-1") or \ + module.type.startswith("video-hash-similarity-matrix") @classmethod def get_options(cls, parent_dataset=None, user=None): @@ -123,10 +128,10 @@ def identity_dataset_types(source_dataset): """ Identify dataset types that are compatible with this processor """ - if source_dataset.type.startswith("image-downloader"): + if any([source_dataset.type.startswith(dataset_prefix) for dataset_prefix in ImageWallGenerator.image_datasets]): image_dataset = source_dataset category_dataset = source_dataset.top_parent() - elif source_dataset.get_parent().type.startswith("image-downloader"): + elif any([source_dataset.get_parent().type.startswith(dataset_prefix) for dataset_prefix in ImageWallGenerator.image_datasets]): image_dataset = source_dataset.get_parent() category_dataset = source_dataset else: @@ -147,6 +152,7 @@ def process(self): if image_dataset.num_rows == 0 or category_dataset == 0: self.dataset.finish_with_error("No images/categories available to render to image wall.") return + self.dataset.log(f"Found {image_dataset.type} w/ {image_dataset.num_rows} images and {category_dataset.type} w/ {category_dataset.num_rows} items") category_column = self.parameters.get("category") if category_column is None: @@ -167,7 +173,10 @@ def process(self): staging_area = self.unpack_archive_contents(image_dataset.get_results_path()) # Map post IDs to filenames - if special_case: + if image_dataset.type == "video-hasher-1": + # We know the post ID is the filename.stem as this dataset is derived from the image dataset + filename_map = {filename.stem + ".mp4": filename for filename in staging_area.iterdir()} + elif special_case: # We know the post ID is the filename.stem as this dataset is derived from the image dataset filename_map = {filename.stem: filename for filename in staging_area.iterdir()} else: @@ -176,7 +185,7 @@ def process(self): image_data = json.load(file) filename_map = {post_id: staging_area.joinpath(image.get("filename")) for image in image_data.values() if image.get("success") for post_id in image.get("post_ids")} - + self.dataset.log(filename_map) # Organize posts into categories category_type = None categories = {} @@ -231,6 +240,10 @@ def process(self): raise ProcessorException( f"Mixed category types detected; unable to render image wall (item {i} {post_category})") + if len(categories) == 0: + self.dataset.finish_with_error("No categories found") + return + # Sort collected category results as needed self.dataset.update_status("Sorting categories") if special_case and category_column == "top_categories": diff --git a/processors/visualisation/video_hasher.py b/processors/visualisation/video_hasher.py index c9f290280..5a1dc5e0f 100644 --- a/processors/visualisation/video_hasher.py +++ b/processors/visualisation/video_hasher.py @@ -9,12 +9,14 @@ import shutil import zipfile +import networkx as nx +import numpy as np from videohash import VideoHash from videohash.exceptions import FFmpegNotFound, FFmpegFailedToExtractFrames from backend.lib.processor import BasicProcessor from backend.lib.preset import ProcessorPreset -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, ProcessorException from common.lib.user_input import UserInput from common.config_manager import config @@ -34,6 +36,28 @@ class VideoHasherPreset(ProcessorPreset): description = "Creates video hashes (64 bits/identifiers) to identify near duplicate videos in a dataset based on hash similarity. Uses video only (no audio; see references). This process can take a long time depending on video length, amount, and frames per second." extension = "csv" + @classmethod + def get_options(cls, parent_dataset=None, user=None): + return { + "frame_interval": { + "type": UserInput.OPTION_TEXT, + "help": "Number of frames extracted per second to extract from video", + "tooltip": "The default value is 1 frame per second. For 1 frame per 5 seconds pass 0.2 (1/5). For 5 fps pass 5. For short videos, more frames per second lead to less collision when creating hashes (unsimilar videos being marked as similar), but require more time (2 fps is double the time of 1 fps).", + "coerce_type": float, + "default": 1, + "min": 0, + "max": 5, + }, + "percent": { + "type": UserInput.OPTION_TEXT, + "help": "Percent similar for video hash network", + "tooltip": "A network edge is created between two videos if the hashes representing the collage of frames are at least this percent similar.", + "default": 95, + "min": 0, + "max": 100 + } + } + @classmethod def is_compatible_with(cls, module=None, user=None): """ @@ -57,12 +81,18 @@ def get_processor_pipeline(self): pipeline = [ # first, create colleges (and hashes) with the default settings { - "type": "video-hasher-1" - }, - # then, extract hashes - { - "type": "video-hasher-2" + "type": "video-hasher-1", + "parameters": { + "frame_interval": self.parameters.get("frame_interval", 1), + } }, + # then create hash similarity network + { + "type": "video-hash-network", + "parameters": { + "percent": self.parameters.get("percent", 90), + } + }, ] return pipeline @@ -90,7 +120,7 @@ class VideoHasher(BasicProcessor): type = "video-hasher-1" # job type ID category = "Visual" # category title = "Create Video colleges" # title displayed in UI - description = "Creates video from frames. Can be used to create video hashes to detect similar videos." # description displayed in UI + description = "Creates colleges from video frames. Can be used to create video hashes to detect similar videos." # description displayed in UI extension = "zip" # extension of result file, used internally and in UI options = { @@ -238,34 +268,162 @@ def process(self): self.dataset.update_status(f'Created {num_posts} video hashes and stored video collages') self.write_archive_and_finish(staging_area) +class VideoHashNetwork(BasicProcessor): + """ + Video Hasher Network + + This creates a network graph of the video hashes similarity + """ + type = "video-hash-network" # job type ID + category = "Visual" # category + title = "Create Video hashes network" # title displayed in UI + description = "Creates hashes network to identify duplicate or similar videos." # description displayed in UI + extension = "gexf" # extension of result file, used internally and in UI + + references = [ + "[Video Hash](https://github.com/akamhy/videohash#readme)", + ] + + @classmethod + def get_options(cls, parent_dataset=None, user=None): + return {"percent": { + "type": UserInput.OPTION_TEXT, + "help": "Percent similar", + "default": 90, + "min": 0, + "max": 100 + }} + + @classmethod + def is_compatible_with(cls, module=None, user=None): + """ + Allow on video hasher + """ + return module.type in ["video-hasher-1"] + + def process(self): + """ + + """ + # Extract hash file from archive + with zipfile.ZipFile(self.source_file, "r") as archive_file: + archive_contents = sorted(archive_file.namelist()) + + if "video_hashes.csv" not in archive_contents: + self.dataset.update_status("Unable to obtain hashes from video colleges.", is_final=True) + self.dataset.finish(0) + return + + # Prepare staging area for videos and video tracking + staging_area = self.dataset.get_staging_area() + self.dataset.log('Staging directory location: %s' % staging_area) + # Extract file + archive_file.extract("video_hashes.csv", staging_area) -class VideoHasherTwo(BasicProcessor): + percent_similar = self.parameters.get("percent", 90) / 100 + network = nx.Graph() + + # Calculate similarities + self.dataset.update_status(f"Collecting video hashes for {percent_similar * 100}% similar network") + hashes = [] + identifiers = [] + bit_length = None + with open(staging_area.joinpath("video_hashes.csv"), "r", encoding="utf-8", newline="") as infile: + reader = csv.DictReader(infile) + for row in reader: + video_hash = [int(bit) for bit in row.get('video_hash')[2:]] + video_id = row.get('id') + + # Network + network.add_node(video_id) + + hashes.append(np.array(video_hash)) + identifiers.append(video_id) + + if bit_length is None: + bit_length = len(video_hash) + + self.dataset.update_status(f"Calculating video hash similarities {percent_similar * 100}% similar") + hashes = np.array(hashes) + comparisons = 0 + expected_comparisons = np.math.comb(len(hashes), 2) + for i, current_hash in enumerate(hashes): + # Remove this hash from hashes (as previous calculations have already occured and it is unnecessary to + # compare a hash to itself) + hashes = hashes[1:] + + # Compare current hash + xor_result = np.bitwise_xor(current_hash, hashes) + + # Add comparisons to network + for j, xor_comparison in enumerate(xor_result): + id1 = identifiers[i] + # Node 2 is this iteration plus comparison number PLUS one as the first hash of this set has been + # removed (e.g., very first ID2 is 0+0+1) + id2 = identifiers[i + j + 1] + + # Check if edge exists (it shouldn't!) + edge = (id1, id2) + if edge in network.edges(): + raise ProcessorException('Error in processing hash similarities') + + # Check if xor_comparison is less than requested similarity + # xor compares each bit and returns 0 if a bit is the same and 1 if different + edge_percent_similar = 1 - (xor_comparison.sum() / bit_length) + if edge_percent_similar > percent_similar: + network.add_edge(id1, id2, weight=edge_percent_similar) + + comparisons += 1 + if comparisons % 50000 == 0: + self.dataset.update_status( + "Calculated %i of %i hash similarities" % (comparisons, expected_comparisons)) + self.dataset.update_progress(comparisons / expected_comparisons) + + self.dataset.update_status("Writing network file") + nx.write_gexf(network, self.dataset.get_results_path()) + self.dataset.finish(len(network.nodes)) + + +class VideoHashSimilarities(BasicProcessor): """ - Video Hasher Two + Video Hasher Similarity calculator - This just grabs the output from the previous processor + This creates a network graph of the video hashes similarity """ - type = "video-hasher-2" # job type ID + type = "video-hash-similarity-matrix" # job type ID category = "Visual" # category - title = "Create Video hashes to identify near duplicate videos" # title displayed in UI - description = "Creates hashes from video colleges. These hashes can be compared to identify duplicate or similar videos." # description displayed in UI + title = "Calculates hashes similarities" # title displayed in UI + description = "Creates hashes network to identify duplicate or similar videos." # description displayed in UI extension = "csv" # extension of result file, used internally and in UI references = [ "[Video Hash](https://github.com/akamhy/videohash#readme)", ] + @classmethod + def get_options(cls, parent_dataset=None, user=None): + return {"percent": { + "type": UserInput.OPTION_TEXT, + "help": "Percent similar", + "default": 95, + "min": 0, + "max": 100 + }} + @classmethod def is_compatible_with(cls, module=None, user=None): """ - Allow on videos only + Allow on video hasher """ return module.type in ["video-hasher-1"] def process(self): """ - Copy the video hash csv file from the archive of the parent dataset. + """ + percent_different = (100 - self.parameters.get("percent", 90)) / 100 + + # Extract hash file from archive with zipfile.ZipFile(self.source_file, "r") as archive_file: archive_contents = sorted(archive_file.namelist()) @@ -279,9 +437,100 @@ def process(self): self.dataset.log('Staging directory location: %s' % staging_area) # Extract file archive_file.extract("video_hashes.csv", staging_area) - # Copy file to new dataset results path - shutil.copy(staging_area.joinpath("video_hashes.csv"), self.dataset.get_results_path()) - self.dataset.update_status("Finished") - self.dataset.finish(self.source_dataset.num_rows) + # Read hash file + self.dataset.update_status(f"Collecting video hashes for {self.parameters.get('percent', 90)}% similar network") + hashes = [] + identifiers = {} + bit_length = None + with staging_area.joinpath("video_hashes.csv").open("r", encoding="utf-8", newline="") as infile: + reader = csv.DictReader(infile) + for i, row in enumerate(reader): + video_hash = [int(bit) for bit in row.get('video_hash')[2:]] + video_id = row.get('id') + + hashes.append(np.array(video_hash)) + identifiers[video_id] = i + + if bit_length is None: + bit_length = len(video_hash) + + # Compare each video with rest + self.dataset.update_status(f"Calculating video hash similarities {self.parameters.get('percent', 90)}% similar") + all_video_hashes = np.array(hashes) + similarity_matrix = [] + bits_threshhold = np.ceil(percent_different * bit_length) + self.dataset.log(f"Bits threshold: {bits_threshhold}") + for vid_hash in hashes: + # Compare video hash to all other hashes and check if below threshold + xor_result = np.bitwise_xor(vid_hash, hashes) + similarity_matrix.append([xor_comparison.sum() <= bits_threshhold for xor_comparison in xor_result]) + + self.dataset.update_status(f"Create groups video hash similarities above {self.parameters.get('percent', 90)}% similar") + # These groups can merge and get rather large as similarities can "chain" + # (e.g., A is similar to B, B is similar C, thus A & B & C are similar) + groups = {"no_matches": []} + video_group_key = {} + group_index = 1 + for i, vid in enumerate(all_video_hashes): + create_new_group = False + if sum(similarity_matrix[i]) > 1: + # matches found! identify group + group = [i] + for j in range(len(similarity_matrix[i])): + if similarity_matrix[i][j] == True and j != i: + group.append(j) + + # check if any of the matches are already in a group + group_key_match = set(video_group_key.get(match) for match in group if video_group_key.get(match)) + if len(group_key_match) == 1: + # One group found, add to that group + group_name = group_key_match.pop() + self.dataset.log(f"Adding to group {group_name}: {group}") + groups[group_name] += group + else: + # Either no existing group or groups need to be merged into new group + create_new_group = True + if len(group_key_match) > 1: + self.dataset.log(f"Merging groups to new group for: {group}") # this is not all yet + # Multiple groups found, remove existing groups and add to the new group + for match in group_key_match: + # add to new group + group += groups.pop(match) + # remove duplicates from new group + group = list(set(group)) + else: + # no existing groups found, create new group + self.dataset.log(f"Creating new group for: {group}") + + else: + # no matches found, add to no_matches group + group_name = "no_matches" + group = [i] + groups[group_name] += group + self.dataset.log(f"No matches: {i}") + + if create_new_group: + # Create new group + group_name = "group_" + str(group_index) + groups[group_name] = group + group_index += 1 + + # Update video group keys + [video_group_key.update({video_index: group_name}) for video_index in group] + + # Write new hash file + self.dataset.update_status("Writing new hash file") + with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as outfile: + with staging_area.joinpath("video_hashes.csv").open("r", encoding="utf-8", newline="") as infile: + reader = csv.DictReader(infile) + writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames + ["group_id"]) + writer.writeheader() + for row in reader: + video_id = row.get('id') + group_id = video_group_key.get(identifiers[video_id]) + row.update({"group_id": group_id}) + writer.writerow(row) + + self.dataset.finish(len(video_group_key))