Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.4.13: 跟进网页端的正则表达式,完善【多图合并pdf】插件的一些细节。 #187

Merged
merged 10 commits into from
Jan 4, 2024
2 changes: 1 addition & 1 deletion src/jmcomic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# 被依赖方 <--- 使用方
# config <--- entity <--- toolkit <--- client <--- option <--- downloader

__version__ = '2.4.12'
__version__ = '2.4.13'

from .api import *
from .jm_plugin import *
Expand Down
2 changes: 1 addition & 1 deletion src/jmcomic/jm_client_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def transfer_to(self,
JmImageTool.save_resp_img(
self,
path,
need_convert=suffix_not_equal(img_url, path),
need_convert=suffix_not_equal(img_url[:img_url.find("?")], path),
)
else:
# 解密图片并保存文件
Expand Down
23 changes: 14 additions & 9 deletions src/jmcomic/jm_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,16 @@ def decide_image_suffix(self, image: JmImageDetail) -> str:
# 非动图,以配置为先
return self.download.image.suffix or image.img_file_suffix

def decide_image_save_dir(self, photo) -> str:
def decide_image_save_dir(self, photo, ensure_exists=True) -> str:
# 使用 self.dir_rule 决定 save_dir
save_dir = self.dir_rule.decide_image_save_dir(
photo.from_album,
photo
)

mkdir_if_not_exists(save_dir)
if ensure_exists:
mkdir_if_not_exists(save_dir)

return save_dir
Comment on lines +245 to 255
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addition of the ensure_exists parameter to decide_image_save_dir method is a good enhancement for ensuring the existence of the directory before saving files. However, the mkdir_if_not_exists function is called without any error handling. If directory creation fails, it could cause unhandled exceptions during runtime.

- mkdir_if_not_exists(save_dir)
+ try:
+     mkdir_if_not_exists(save_dir)
+ except OSError as e:
+     # Handle exception, possibly log it, and decide on further action
+     # (e.g., re-raise, return a default value, etc.)

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def decide_image_save_dir(self, photo, ensure_exists=True) -> str:
# 使用 self.dir_rule 决定 save_dir
save_dir = self.dir_rule.decide_image_save_dir(
photo.from_album,
photo
)
mkdir_if_not_exists(save_dir)
if ensure_exists:
mkdir_if_not_exists(save_dir)
return save_dir
def decide_image_save_dir(self, photo, ensure_exists=True) -> str:
# 使用 self.dir_rule 决定 save_dir
save_dir = self.dir_rule.decide_image_save_dir(
photo.from_album,
photo
)
if ensure_exists:
try:
mkdir_if_not_exists(save_dir)
except OSError as e:
# Handle exception, possibly log it, and decide on further action
# (e.g., re-raise, return a default value, etc.)
return save_dir


def decide_image_filepath(self, image: JmImageDetail, consider_custom_suffix=True) -> str:
Expand Down Expand Up @@ -504,7 +506,7 @@ def call_all_plugin(self, group: str, safe=True, **extra):

plugin_registry = JmModuleConfig.REGISTRY_PLUGIN
for pinfo in plugin_list:
key, kwargs = pinfo['plugin'], pinfo['kwargs']
key, kwargs = pinfo['plugin'], pinfo.get('kwargs', None) # kwargs为None
plugin_class: Optional[Type[JmOptionPlugin]] = plugin_registry.get(key, None)

ExceptionTool.require_true(plugin_class is not None, f'[{group}] 未注册的plugin: {key}')
Expand All @@ -517,7 +519,7 @@ def call_all_plugin(self, group: str, safe=True, **extra):
else:
raise e

def invoke_plugin(self, plugin_class, kwargs: Any, extra: dict, pinfo: dict):
def invoke_plugin(self, plugin_class, kwargs: Optional[Dict], extra: dict, pinfo: dict):
# 检查插件的参数类型
kwargs = self.fix_kwargs(kwargs)
# 把插件的配置数据kwargs和附加数据extra合并,extra会覆盖kwargs
Expand Down Expand Up @@ -591,15 +593,18 @@ def handle_plugin_jmcomic_exception(self, e, pinfo: dict, kwargs: dict, plugin):
raise e

# noinspection PyMethodMayBeStatic
def fix_kwargs(self, kwargs) -> Dict[str, Any]:
def fix_kwargs(self, kwargs: Optional[Dict]) -> Dict[str, Any]:
"""
kwargs将来要传给方法参数,这要求kwargs的key是str类型,
该方法检查kwargs的key的类型,如果不是str,尝试转为str,不行则抛异常。
"""
ExceptionTool.require_true(
isinstance(kwargs, dict),
f'插件的kwargs参数必须为dict类型,而不能是类型: {type(kwargs)}'
)
if kwargs is None:
kwargs = {}
else:
ExceptionTool.require_true(
isinstance(kwargs, dict),
f'插件的kwargs参数必须为dict类型,而不能是类型: {type(kwargs)}'
)

kwargs: dict
new_kwargs: Dict[str, Any] = {}
Comment on lines +596 to 610
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix_kwargs method now initializes kwargs to an empty dictionary if it is None. This change ensures that the method can handle None values gracefully. The type check for kwargs being a dictionary is also a good practice to avoid runtime type errors. However, the method could be simplified by using a default value for kwargs in the method signature.

- def fix_kwargs(self, kwargs: Optional[Dict]) -> Dict[str, Any]:
+ def fix_kwargs(self, kwargs: Optional[Dict] = {}) -> Dict[str, Any]:
- if kwargs is None:
-     kwargs = {}

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def fix_kwargs(self, kwargs: Optional[Dict]) -> Dict[str, Any]:
"""
kwargs将来要传给方法参数这要求kwargs的key是str类型
该方法检查kwargs的key的类型如果不是str尝试转为str不行则抛异常
"""
ExceptionTool.require_true(
isinstance(kwargs, dict),
f'插件的kwargs参数必须为dict类型,而不能是类型: {type(kwargs)}'
)
if kwargs is None:
kwargs = {}
else:
ExceptionTool.require_true(
isinstance(kwargs, dict),
f'插件的kwargs参数必须为dict类型,而不能是类型: {type(kwargs)}'
)
kwargs: dict
new_kwargs: Dict[str, Any] = {}
def fix_kwargs(self, kwargs: Optional[Dict] = {}) -> Dict[str, Any]:
"""
kwargs将来要传给方法参数这要求kwargs的key是str类型
该方法检查kwargs的key的类型如果不是str尝试转为str不行则抛异常
"""
else:
ExceptionTool.require_true(
isinstance(kwargs, dict),
f'插件的kwargs参数必须为dict类型,而不能是类型: {type(kwargs)}'
)
kwargs: dict
new_kwargs: Dict[str, Any] = {}

Expand Down
141 changes: 99 additions & 42 deletions src/jmcomic/jm_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class JmOptionPlugin:
def __init__(self, option: JmOption):
self.option = option
self.log_enable = True
self.delete_original_file = False

def invoke(self, **kwargs) -> None:
"""
Expand Down Expand Up @@ -66,6 +67,33 @@ def warning_lib_not_install(self, lib: str):
import warnings
warnings.warn(msg)

def execute_deletion(self, paths: List[str]):
"""
删除文件和文件夹
:param paths: 路径列表
"""
if self.delete_original_file is not True:
return

for p in paths:
if file_not_exists(p):
continue

if os.path.isdir(p):
os.rmdir(p)
self.log(f'删除文件夹: {p}', 'remove')
else:
os.remove(p)
self.log(f'删除原文件: {p}', 'remove')

# noinspection PyMethodMayBeStatic
def execute_cmd(self, cmd):
"""
执行shell命令,这里采用简单的实现
:param cmd: shell命令
"""
return os.system(cmd)


class JmLoginPlugin(JmOptionPlugin):
"""
Expand Down Expand Up @@ -342,12 +370,16 @@ def do_zip(self, source_dir, zip_path, all_filepath, msg):
return self.unified_path(source_dir)

def after_zip(self, dir_zip_dict: Dict[str, Optional[str]]):
# 是否要删除所有原文件
if self.delete_original_file is True:
self.delete_all_files_and_empty_dir(
all_downloaded=self.downloader.all_downloaded,
dir_list=list(dir_zip_dict.keys())
)
# 删除所有原文件
dirs = sorted(dir_zip_dict.keys(), reverse=True)
image_paths = [
path
for photo_dict in self.downloader.all_downloaded.values()
for image_list in photo_dict.values()
for path, image in image_list
]
self.execute_deletion(image_paths)
self.execute_deletion(dirs)

# noinspection PyMethodMayBeStatic
def get_zip_path(self, album, photo, filename_rule, suffix, zip_dir):
Expand All @@ -361,28 +393,6 @@ def get_zip_path(self, album, photo, filename_rule, suffix, zip_dir):
filename + fix_suffix(suffix),
)

# noinspection PyMethodMayBeStatic
def delete_all_files_and_empty_dir(self, all_downloaded: dict, dir_list: List[str]):
"""
删除所有文件和文件夹
"""
import os
for photo_dict in all_downloaded.values():
for image_list in photo_dict.values():
for f, _ in image_list:
# check not exist
if file_not_exists(f):
continue

os.remove(f)
self.log(f'删除原文件: {f}', 'remove')

for d in sorted(dir_list, reverse=True):
# check exist
if file_exists(d):
os.rmdir(d)
self.log(f'删除文件夹: {d}', 'remove')


class ClientProxyPlugin(JmOptionPlugin):
plugin_key = 'client_proxy'
Expand Down Expand Up @@ -581,10 +591,7 @@ def main(self):
else:
self.zip_with_password()

# 删除导出的原文件
if self.delete_original_file is True:
for f in self.files:
os.remove(f)
self.execute_deletion(self.files)

def handle_folder(self, fid: str, fname: str):
self.log(f'【收藏夹: {fname}, fid: {fid}】开始获取数据')
Expand Down Expand Up @@ -646,35 +653,77 @@ def zip_with_password(self):
os.chdir(self.save_dir)
cmd = f'7z a "{self.zip_filepath}" "{self.save_dir}" -p{self.zip_password} -mhe=on'
self.require_true(
0 == os.system(cmd),
0 == self.execute_cmd(cmd),
'加密压缩文件失败'
)


class ConvertJpgToPdfPlugin(JmOptionPlugin):
plugin_key = 'j2p'

def check_image_suffix_is_valid(self, std_suffix):
"""
检查option配置的图片后缀转换,目前限制使用Magick时只能搭配jpg
暂不探究Magick是否支持更多图片格式
"""
cur_suffix: Optional[str] = self.option.download.image.suffix

ExceptionTool.require_true(
cur_suffix is not None and cur_suffix.endswith(std_suffix),
'请把图片的后缀转换配置为jpg,不然无法使用Magick!'
f'(当前配置是[{cur_suffix}])\n'
f'配置模板如下: \n'
f'```\n'
f'download:\n'
f' image:\n'
f' suffix: {std_suffix} # 当前配置是{cur_suffix}\n'
f'```'
)
Comment on lines +664 to +681
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check_image_suffix_is_valid method throws an exception if the image suffix is not set to .jpg. This is a restrictive design choice that may not be flexible for future requirements. Consider if this restriction is necessary or if it could be made more flexible.


def invoke(self,
photo: JmPhotoDetail,
downloader=None,
pdf_dir=None,
filename_rule='Pid',
quality=100,
delete_original_file=False,
overwrite_cmd=None,
overwrite_jpg=None,
**kwargs,
):
self.delete_original_file = delete_original_file

# 检查图片后缀配置
suffix = overwrite_jpg or '.jpg'
self.check_image_suffix_is_valid(suffix)

# 处理文件夹配置
filename = DirRule.apply_rule_directly(None, photo, filename_rule)
photo_dir = self.option.decide_image_save_dir(photo)

# 处理生成的pdf文件的路径
if pdf_dir is None:
pdf_dir = photo_dir
else:
pdf_dir = fix_filepath(pdf_dir, True)
mkdir_if_not_exists(pdf_dir)

pdf_filepath = f'{pdf_dir}{filename}.pdf'

def get_cmd(suffix='.jpg'):
return f'magick convert -quality {quality} "{photo_dir}*{suffix}" "{pdf_filepath}"'
pdf_filepath = os.path.join(pdf_dir, f'{filename}.pdf')

# 生成命令
def generate_cmd():
return (
overwrite_cmd or
'magick convert -quality {quality} "{photo_dir}*{suffix}" "{pdf_filepath}"'
).format(
quality=quality,
photo_dir=photo_dir,
suffix=suffix,
pdf_filepath=pdf_filepath,
)

cmd = get_cmd()
self.log(f'execute command: {cmd}')
cmd = generate_cmd()
self.log(f'Execute Command: [{cmd}]')
code = self.execute_cmd(cmd)

self.require_true(
Expand All @@ -686,6 +735,14 @@ def get_cmd(suffix='.jpg'):

self.log(f'Convert Successfully: JM{photo.id} → {pdf_filepath}')

# noinspection PyMethodMayBeStatic
def execute_cmd(self, cmd):
return os.system(cmd)
if downloader is not None:
from .jm_downloader import JmDownloader
downloader: JmDownloader

paths = [
path
for path, image in downloader.all_downloaded[photo.from_album][photo]
]

paths.append(self.option.decide_image_save_dir(photo, ensure_exists=False))
self.execute_deletion(paths)
26 changes: 22 additions & 4 deletions src/jmcomic/jm_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class JmcomicText:
# 點擊喜歡
pattern_html_album_likes = compile(r'<span id="albim_likes_\d+">(.*?)</span>')
# 觀看
pattern_html_album_views = compile(r'<span>(.*?)</span> (次觀看|观看次数)')
pattern_html_album_views = compile(r'<span>(.*?)</span>\n<span>(次觀看|观看次数)</span>')
# 評論(div)
pattern_html_album_comment_count = compile(r'<div class="badge"[^>]*?id="total_video_comments">(\d+)</div>'), 0

Expand Down Expand Up @@ -266,19 +266,37 @@ def add(w=None):
ret.append(w)
char_list.clear()

def find_right_pair(left_pair, i):
stack = [left_pair]
j = i + 1

while j < length and len(stack) != 0:
c = title[j]
if c in bracket_map:
stack.append(c)
elif c == bracket_map[stack[-1]]:
stack.pop()

j += 1

if len(stack) == 0:
return j
else:
return -1
Comment on lines +269 to +285
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The find_right_pair function has been introduced to locate the correct closing character for a given opening character. This function uses a stack to match pairs of characters, which is a common and effective algorithm for this purpose. However, there is no error handling if the closing character is not found (i.e., find_right_pair returns -1). It would be beneficial to add error handling to provide more informative feedback in such cases.

def find_right_pair(left_pair, i):
    stack = [left_pair]
    j = i + 1

    while j < length and len(stack) != 0:
        c = title[j]
        if c in bracket_map:
            stack.append(c)
        elif c == bracket_map[stack[-1]]:
            stack.pop()

        j += 1

    if len(stack) == 0:
        return j
+   else:
+       raise ValueError(f"Unmatched character {left_pair} at position {i}")

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def find_right_pair(left_pair, i):
stack = [left_pair]
j = i + 1
while j < length and len(stack) != 0:
c = title[j]
if c in bracket_map:
stack.append(c)
elif c == bracket_map[stack[-1]]:
stack.pop()
j += 1
if len(stack) == 0:
return j
else:
return -1
def find_right_pair(left_pair, i):
stack = [left_pair]
j = i + 1
while j < length and len(stack) != 0:
c = title[j]
if c in bracket_map:
stack.append(c)
elif c == bracket_map[stack[-1]]:
stack.pop()
j += 1
if len(stack) == 0:
return j
else:
raise ValueError(f"Unmatched character {left_pair} at position {i}")


while i < length:
c = title[i]

if c in bracket_map:
# 上一个单词结束
add()
# 定位右括号
j = title.find(bracket_map[c], i)
j = find_right_pair(c, i)
ExceptionTool.require_true(j != -1, f'未闭合的 {c}{bracket_map[c]}: {title[i:]}')
# 整个括号的单词结束
add(title[i:j + 1])
add(title[i:j])
# 移动指针
i = j + 1
i = j
else:
char_list.append(c)
i += 1
Expand Down
2 changes: 1 addition & 1 deletion usage/workflow_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def main():

def get_option():
# 读取 option 配置文件
option = create_option('../assets/option/option_workflow_download.yml')
option = create_option(os.path.abspath(os.path.join(__file__, '../../assets/option/option_workflow_download.yml')))

# 支持工作流覆盖配置文件的配置
cover_option_config(option)
Expand Down
5 changes: 5 additions & 0 deletions usage/workflow_export_favorites.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@


def prepare_actions_input_and_secrets():
"""
本函数替代对配置文件中的 ${} 的解析函数
目的是为了支持:当没有配置环境变量时,可以找另一个环境变量来用
"""

def env(match: Match) -> str:
name = match[1]
value = os.getenv(name, '')
Expand Down