-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathReadDict.py
77 lines (70 loc) · 2.19 KB
/
ReadDict.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# %% 导入包
from PyQt5.QtCore import QThread, pyqtSignal
from queue import Queue
# %% 定义一个类,读取外部字典
class ReadDict(QThread):
"""
生产者类
"""
# 定义一些类变量
password_num = 0
# 定义一些信号
producing_password = pyqtSignal(str)
producing_password_num = pyqtSignal(int)
def __init__(self, name: str, queue: Queue, dict_path: str, batch_size: int):
"""
构造方法\n
:param name: 线程名称
:param queue: 队列
:param dict_path: 外部字典文件路径字符串
:param batch_size: 批量处理的密码数量
"""
super(ReadDict, self).__init__()
self.setObjectName(name)
self.queue = queue
self.dict_path = dict_path
self.batch_size = batch_size
self.stop_flag = False
def run(self):
"""
运行线程\n
:return:
"""
passwords = []
self.stop_flag = False
for password in self.generate_passwords():
if self.stop_flag:
return None
# 发射信号,当前处理的密码及序号
self.emit_signal(password)
# 将要批量写入的密码组成元组
passwords.append(password)
if len(passwords) == self.batch_size:
self.queue.put(tuple(passwords))
passwords.clear()
self.queue.put(tuple(passwords))
# 生成完全部密码之后要释放毒丸
self.queue.put(tuple())
def stop(self):
"""
终止线程\n
:return:
"""
self.stop_flag = True
def generate_passwords(self) -> [str]:
"""
从文件枚举密码\n
:return: 枚举密码的生成器
"""
with open(self.dict_path) as f:
for password in f:
yield password.strip()
def emit_signal(self, password: str):
"""
向外发射信号\n
:param password: 密码字符串
:return:
"""
self.producing_password.emit(password)
ReadDict.password_num = ReadDict.password_num + 1
self.producing_password_num.emit(ReadDict.password_num)