-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
106 lines (83 loc) · 3.33 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from peewee import SqliteDatabase, Model, CharField, BlobField, MySQLDatabase
from telegram.ext import ContextTypes
from jdatetime import datetime
import os
db = SqliteDatabase("resumes.db")
# db = MySQLDatabase('miliatio_telegrambot', user='miliatio_telegrambot', password='Kjkszpj123!', host='localhost', port=3306, charset='utf8mb4')
class LongBlobField(BlobField):
field_type = 'LONGBLOB'
class Resume(Model):
name = CharField()
phone = CharField()
department = CharField()
specialization = CharField()
resume = LongBlobField()
year = CharField()
month = CharField()
day = CharField()
class Meta:
database = db
def create_tables():
with db:
db.create_tables([Resume])
def insert_resume(context: ContextTypes.DEFAULT_TYPE):
with db:
Resume.create(
name=context.user_data["name"],
phone=context.user_data["phone"],
department=context.user_data["department"],
specialization=context.user_data["specialization"],
resume=context.user_data["resume"],
year=datetime.now().year,
month=datetime.now().month,
day=datetime.now().day,
)
def get_resumes_single_date(context: ContextTypes.DEFAULT_TYPE) -> list:
resumes = []
with db:
if context.user_data["department_or_specialization"] == "department":
resume_query = Resume.select().where(
Resume.department == context.user_data["department"],
Resume.year == context.user_data["year"],
Resume.month == context.user_data["month"],
Resume.day == context.user_data["day"],
)
else:
resume_query = Resume.select().where(
Resume.specialization == context.user_data["specialization"],
Resume.year == context.user_data["year"],
Resume.month == context.user_data["month"],
Resume.day == context.user_data["day"],
)
for resume in resume_query:
resumes.append(resume.__dict__["__data__"])
return resumes
def insert_resume_with_pdf(context: ContextTypes.DEFAULT_TYPE, pdf_file_path: str):
if not os.path.exists(pdf_file_path):
raise FileNotFoundError(f"The file {pdf_file_path} does not exist.")
with open(pdf_file_path, "rb") as file:
pdf_content = file.read()
with db:
Resume.create(
name=context.user_data["name"],
phone=context.user_data["phone"],
department=context.user_data["department"],
specialization=context.user_data["specialization"],
resume=pdf_content,
year=datetime.now().year,
month=datetime.now().month,
day=datetime.now().day,
)
def save_pdf_from_blob(resume_id, output_file_path):
resume = Resume.get(Resume.id == resume_id)
pdf_content = resume.resume
with open(output_file_path, "wb") as pdf_file:
pdf_file.write(pdf_content)
def delete_resume(resume_id):
with db:
Resume.delete().where(Resume.id == resume_id).execute()
class DemoContext:
def __init__(self, user_data):
self.user_data = user_data
if __name__ == "__main__":
create_tables()