Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance Refactoring #128

Merged
merged 23 commits into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.3.3
rev: v0.5.0
hooks:
- id: ruff
- id: ruff-format
95 changes: 41 additions & 54 deletions metrontagger/duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@

import io
from dataclasses import dataclass
from itertools import groupby
from logging import getLogger
from pathlib import Path

import pandas as pd
import questionary
from darkseid.comic import Comic
from imagehash import average_hash
from PIL import Image, UnidentifiedImageError
from tqdm import tqdm

from metrontagger.styles import Styles

LOGGER = getLogger(__name__)


@dataclass
class DuplicateIssue:
Expand Down Expand Up @@ -78,38 +81,31 @@ def _image_hashes(self: Duplicates) -> list[dict[str, any]]:
"""

hashes_lst = []
for item in self._file_lst:
questionary.print("Getting page hashes.", style=Styles.INFO)
for item in tqdm(self._file_lst):
comic = Comic(item)
if not comic.is_writable():
questionary.print(f"'{comic}' is not writable. Skipping...")
LOGGER.error(f"{comic} is not writable.")
continue
questionary.print(
f"Attempting to get page hashes for '{comic}'.",
style=Styles.WARNING,
)
for i in range(comic.get_number_of_pages()):
pages = [comic.get_page(i) for i in range(comic.get_number_of_pages())]
for i, page in enumerate(pages):
try:
with Image.open(io.BytesIO(comic.get_page(i))) as img:
try:
img_hash = average_hash(img)
except OSError:
questionary.print(
f"Unable to get image hash for page {i} of '{comic}'",
style=Styles.ERROR,
)
continue
with Image.open(io.BytesIO(page)) as img:
img_hash = average_hash(img)
image_info = {
"path": str(comic.path),
"index": i,
"hash": str(img_hash),
}
hashes_lst.append(image_info)
except UnidentifiedImageError:
questionary.print(
f"UnidentifiedImageError: Skipping page {i} of '{comic}'",
style=Styles.ERROR,
except (UnidentifiedImageError, OSError) as e:
error_message = (
f"UnidentifiedImageError: Skipping page {i} of '{comic}'"
if isinstance(e, UnidentifiedImageError)
else f"Unable to get image hash for page {i} of '{comic}'"
)
continue
LOGGER.exception("%s", error_message)

return hashes_lst

def _get_page_hashes(self: Duplicates) -> pd.DataFrame:
Expand All @@ -124,38 +120,36 @@ def _get_page_hashes(self: Duplicates) -> pd.DataFrame:

comic_hashes = self._image_hashes()
self._data_frame = pd.DataFrame(comic_hashes)
hashes = self._data_frame["hash"]
return self._data_frame[hashes.isin(hashes[hashes.duplicated()])].sort_values("hash")
return self._data_frame[self._data_frame["hash"].duplicated(keep=False)].sort_values(
"hash"
)

def get_distinct_hashes(self: Duplicates) -> list[str]:
"""Method to get distinct hash values.

This method retrieves page hashes, identifies distinct hash values, and returns a list of unique hash values.


Returns:
list[str]: A list of distinct hash values.
"""

page_hashes = self._get_page_hashes()
return [key for key, _group in groupby(page_hashes["hash"])]
return list(set(page_hashes["hash"]))

def get_comic_info_for_distinct_hash(self: Duplicates, img_hash: str) -> DuplicateIssue:
def get_comic_info_for_distinct_hash(self: Duplicates, img_hash: str) -> DuplicateIssue: # noqa: ARG002
"""Method to retrieve comic information for a distinct hash value.

This method takes a hash value, finds the corresponding comic information in the data frame, and returns a
DuplicateIssue object with the comic's path and page index.


Args:
img_hash: str: The hash value to search for in the data frame.

Returns:
DuplicateIssue: A DuplicateIssue object representing the comic information.
"""

idx = self._data_frame.loc[self._data_frame["hash"] == img_hash].index[0]
row = self._data_frame.iloc[idx]
row = self._data_frame.query("hash == @img_hash").iloc[0]
return DuplicateIssue(row["path"], row["index"])

def get_comic_list_from_hash(self: Duplicates, img_hash: str) -> list[DuplicateIssue]:
Expand All @@ -164,19 +158,16 @@ def get_comic_list_from_hash(self: Duplicates, img_hash: str) -> list[DuplicateI
This method retrieves comic information from the data frame based on the hash value and returns a list of
DuplicateIssue objects.


Args:
img_hash: str: The hash value to search for in the data frame.

Returns:
list[DuplicateIssue]: A list of DuplicateIssue objects representing comics with the specified hash value.
"""

comic_lst = []
for i in self._data_frame.loc[self._data_frame["hash"] == img_hash].index:
row = self._data_frame.iloc[i]
comic_lst.append(DuplicateIssue(row["path"], [row["index"]]))
return comic_lst
filtered_df = self._data_frame[self._data_frame["hash"] == img_hash]
return [
DuplicateIssue(row["path"], [row["index"]]) for _, row in filtered_df.iterrows()
]

@staticmethod
def delete_comic_pages(dups_lst: list[DuplicateIssue]) -> None:
Expand All @@ -185,34 +176,30 @@ def delete_comic_pages(dups_lst: list[DuplicateIssue]) -> None:
This method iterates over a list of DuplicateIssue objects, attempts to remove the specified pages from each
comic, and provides feedback on the success of the operation.


Args:
dups_lst: list[DuplicateIssue]: A list of DuplicateIssue objects representing duplicate pages to be removed.

Returns:
None
"""
results = [
(comic, comic.remove_pages(item.pages_index))
for item in tqdm(dups_lst)
for comic in [Comic(item.path_)]
]

for item in dups_lst:
comic = Comic(item.path_)
if comic.remove_pages(item.pages_index):
questionary.print(
f"Removed duplicate pages from {comic}",
style=Styles.SUCCESS,
)
else:
questionary.print(
f"Failed to remove duplicate pages from {comic}",
style=Styles.WARNING,
)
for comic, success in results:
questionary.print(
f"{'Removed' if success else 'Failed to remove'} duplicate pages from {comic}",
style=Styles.SUCCESS if success else Styles.WARNING,
)

@staticmethod
def show_image(first_comic: DuplicateIssue) -> None:
"""Method to show the user an image from a comic.

This method takes a DuplicateIssue object, retrieves the image data, and displays the image to the user.


Args:
first_comic: DuplicateIssue: The DuplicateIssue object representing the comic to display.

Expand All @@ -224,11 +211,11 @@ def show_image(first_comic: DuplicateIssue) -> None:
# noinspection PyTypeChecker
img_data = comic.get_page(first_comic.pages_index)
try:
image = Image.open(io.BytesIO(img_data))
with io.BytesIO(img_data) as img_io:
image = Image.open(img_io)
image.show()
except UnidentifiedImageError:
questionary.print(
f"Unable to show image from {comic}.",
style=Styles.WARNING,
)
return
image.show()
105 changes: 40 additions & 65 deletions metrontagger/filerenamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,8 @@ def _remove_empty_separators(value: str) -> str:
Returns:
str: The string with empty separators removed.
"""

value = re.sub(r"\(\s*[-:]*\s*\)", "", value)
value = re.sub(r"\[\s*[-:]*\s*]", "", value)
return re.sub(r"\{\s*[-:]*\s*}", "", value)
pattern = r"(\(\s*[-:]*\s*\)|\[\s*[-:]*\s*]|\{\s*[-:]*\s*})"
return re.sub(pattern, "", value)

@staticmethod
def _remove_duplicate_hyphen_underscore(value: str) -> str:
Expand All @@ -189,10 +187,7 @@ def _remove_duplicate_hyphen_underscore(value: str) -> str:
Returns:
str: The string with duplicate hyphens and underscores cleaned up.
"""

value = re.sub(r"[-_]{2,}\s+", "-- ", value)
value = re.sub(r"(\s--)+", " --", value)
return re.sub(r"(\s-)+", " -", value)
return re.sub(r"([-_]){2,}", r"\1", value)

def smart_cleanup_string(self: FileRenamer, new_name: str) -> str:
"""Perform smart cleanup on the provided new name string.
Expand All @@ -207,20 +202,17 @@ def smart_cleanup_string(self: FileRenamer, new_name: str) -> str:
str: The cleaned up string after applying smart cleanup operations.
"""

# remove empty braces,brackets, parentheses
# remove empty braces, brackets, parentheses
new_name = self._remove_empty_separators(new_name)

# remove duplicate spaces
new_name = " ".join(new_name.split())

# remove remove duplicate -, _,
# remove duplicate spaces, duplicate hyphens and underscores, and trailing dashes
new_name = re.sub(r"\s+", " ", new_name) # remove duplicate spaces
new_name = self._remove_duplicate_hyphen_underscore(new_name)
new_name = re.sub(
r"-{1,2}\s*$", "", new_name
) # remove dash or double dash at end of line

# remove dash or double dash at end of line
new_name = re.sub(r"-{1,2}\s*$", "", new_name)

# remove duplicate spaces (again!)
return " ".join(new_name.split())
return new_name.strip()

def determine_name(self: FileRenamer, filename: Path) -> str | None:
"""Determine the new filename based on metadata.
Expand All @@ -240,12 +232,10 @@ def determine_name(self: FileRenamer, filename: Path) -> str | None:
md = self.metadata
new_name = self.template

new_name = self.replace_token(
new_name, md.series.name if md.series is not None else "Unknown", "%series%"
)
new_name = self.replace_token(
new_name, md.series.volume if md.series is not None else 0, "%volume%"
)
series_name = md.series.name if md.series else "Unknown"
series_volume = md.series.volume if md.series else 0
new_name = self.replace_token(new_name, series_name, "%series%")
new_name = self.replace_token(new_name, series_volume, "%volume%")

if md.issue is None:
issue_str = None
Expand All @@ -257,54 +247,39 @@ def determine_name(self: FileRenamer, filename: Path) -> str | None:

new_name = self.replace_token(new_name, md.issue_count, "%issuecount%")
new_name = self.replace_token(
new_name, md.cover_date.year if md.cover_date is not None else "Unknown", "%year%"
new_name, md.cover_date.year if md.cover_date else "Unknown", "%year%"
)
new_name = self.replace_token(
new_name, "Unknown" if md.publisher is None else md.publisher.name, "%publisher%"
)
if md.cover_date is not None:
new_name = self.replace_token(new_name, md.cover_date.month, "%month%")
month_name = None
if md.cover_date is not None and (
md.cover_date.month is not None
and (
(isinstance(md.cover_date.month, str) and str(md.cover_date.month).isdigit())
or isinstance(md.cover_date.month, int)
)
and int(md.cover_date.month) in range(1, 13)
):
date_time = datetime.datetime( # noqa: DTZ001
1970,
int(md.cover_date.month),
1,
0,
0,
)
month_name = date_time.strftime("%B")
new_name = self.replace_token(new_name, month_name, "%month_name%")

new_name = self.replace_token(
new_name,
md.alternate_series,
"%alternateseries%",
)
new_name = self.replace_token(
new_name,
md.alternate_number,
"%alternatenumber%",
)
if md.cover_date:
new_name = self.replace_token(new_name, md.cover_date.month, "%month%")
if (
isinstance(md.cover_date.month, str | int)
and 1 <= int(md.cover_date.month) <= 12 # noqa: PLR2004
):
month_name = datetime.datetime(1970, int(md.cover_date.month), 1).strftime( # noqa: DTZ001
"%B"
)
else:
month_name = None
new_name = self.replace_token(new_name, month_name, "%month_name%")

new_name = self.replace_token(new_name, md.alternate_series, "%alternateseries%")
new_name = self.replace_token(new_name, md.alternate_number, "%alternatenumber%")
new_name = self.replace_token(new_name, md.alternate_count, "%alternatecount%")
new_name = self.replace_token(new_name, md.imprint, "%imprint%")
if md.series is not None:
match md.series.format:
case "Hard Cover":
new_name = self.replace_token(new_name, "HC", "%format%")
case "Trade Paperback":
new_name = self.replace_token(new_name, "TPB", "%format%")
case "Digital Chapters":
new_name = self.replace_token(new_name, "Digital Chapter", "%format%")
case _:
new_name = self.replace_token(new_name, "", "%format%")

if md.series:
format_mapping = {
"Hard Cover": "HC",
"Trade Paperback": "TPB",
"Digital Chapters": "Digital Chapter",
}
format_value = format_mapping.get(md.series.format, "")
new_name = self.replace_token(new_name, format_value, "%format%")

new_name = self.replace_token(new_name, md.age_rating, "%maturityrating%")
new_name = self.replace_token(new_name, md.series_group, "%seriesgroup%")
new_name = self.replace_token(new_name, md.scan_info, "%scaninfo%")
Expand Down
Loading
Loading