forked from ixnet/readwise_telegram_bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
235 lines (182 loc) · 6.67 KB
/
utils.py
File metadata and controls
235 lines (182 loc) · 6.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import logging
import re
import urllib
import urllib.request

import extruct
import requests
import textdistance as td
import tldextract
from bs4 import BeautifulSoup
from cleantext import clean
from telegram import *
# Module-level logger named after this module (instead of the root logger),
# so records are attributable and per-module log levels can be configured.
_logger = logging.getLogger(__name__)
def extract_url_metadata(url: str, timeout: int = 5) -> dict:
    """
    Fetch *url* and extract page metadata via extruct + BeautifulSoup.

    Args:
        url: Absolute URL to fetch.
        timeout: Request timeout in seconds.

    Returns:
        dict with keys:
            - title: page title (from <title>, or JSON-LD headline/name fallback)
            - og_title: Open Graph title
            - description: Open Graph description
            - image: Open Graph image URL
            - all_metadata: raw extruct output
        On any fetch/parse failure the dict is returned with empty values
        (best-effort: callers treat missing metadata as "no title").
    """
    result = {
        "title": "",
        "og_title": "",
        "description": "",
        "image": "",
        "all_metadata": {},
    }
    try:
        response = requests.get(
            url,
            timeout=timeout,
            headers={'User-Agent': 'Mozilla/5.0 (compatible; ReadwiseTelegramBot/1.0)'},
            verify=True,
        )
        response.raise_for_status()

        # Structured metadata (opengraph, json-ld, microdata, ...).
        metadata = extruct.extract(response.text, base_url=url)
        result["all_metadata"] = metadata

        # Primary title source: the <title> tag.
        soup = BeautifulSoup(response.content, 'html.parser')
        if soup.title and soup.title.string:
            result["title"] = soup.title.string.strip()

        # Open Graph properties, when present.
        if metadata.get('opengraph'):
            og = metadata['opengraph'][0]  # first OG object
            if og.get('og:title'):
                result["og_title"] = og['og:title']
            if og.get('og:description'):
                result["description"] = og['og:description']
            if og.get('og:image'):
                result["image"] = og['og:image']

        # JSON-LD headline/name as a fallback ONLY when <title> yielded
        # nothing — the original unconditionally overwrote the <title>
        # result here despite being labeled a fallback.
        if not result["title"] and metadata.get('json-ld'):
            for item in metadata['json-ld']:
                if isinstance(item, dict):
                    if item.get('headline'):
                        result["title"] = item['headline']
                        break
                    if item.get('name'):
                        result["title"] = item['name']
                        break
    except Exception as err:
        # Best-effort extraction: log instead of silently swallowing
        # (the original `except (requests.RequestException, Exception)`
        # collapsed to a bare Exception catch with `pass`).
        _logger.warning("extract_url_metadata(%s) failed: %s", url, err)
    return result
def _find_js_redirect(r):
    """
    Scan an HTTP response body for a JavaScript redirect of the form
    ``window.location.href = "<url>";`` and return the target URL.

    Args:
        r: An open response object whose ``read()`` returns the body bytes.

    Returns:
        The redirect target URL, or None when no redirect is present or the
        body cannot be read/decoded.
    """
    try:
        content = r.read().decode()
        # BUG FIX: the original pattern used `http[s*]` — a character class
        # requiring an 's' or '*' after "http" — so plain http:// targets
        # never matched. `https?` matches both schemes. The dots in
        # `window.location.href` are now escaped so e.g. "windowXlocation"
        # cannot match.
        match = re.search(
            r'''^\s*window\.location\.href\s*=\s*["'](https?://.*)["'].*;''',
            content,
            re.IGNORECASE | re.MULTILINE,
        )
        if match:
            return match.group(1)
        return None
    except Exception as err:  # narrowed from BaseException
        _logger.warning(err)
        return None
async def _parse_url(url):
    """
    Resolve *url*: follow HTTP redirects and, when present, a JavaScript
    ``window.location.href`` redirect, returning the final URL.

    A missing scheme is defaulted to ``http://``. On any failure the
    (scheme-prefixed) input is returned unchanged, so callers always get a
    usable string back.

    NOTE(review): urllib.request.urlopen is a blocking call inside an async
    function — it stalls the event loop for up to the 5s timeout; consider
    running it in an executor.
    """
    text = "http://" + url if "://" not in url else url
    try:
        req = urllib.request.Request(text)
        req.add_header(
            'User-Agent',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:90.0) Gecko/20100101 Firefox/90.0')
        r = urllib.request.urlopen(req, timeout=5)
        # Some pages redirect via JS instead of an HTTP 3xx; prefer that
        # target when found.
        js_redirect = _find_js_redirect(r)
        if js_redirect:
            return js_redirect
        return r.url
    except Exception as err:
        # Narrowed from BaseException: never swallow KeyboardInterrupt /
        # SystemExit; fall back to the scheme-prefixed input.
        _logger.warning(err)
        return text
async def parse_urls(message: Message) -> list[str]:
    """
    Collect URLs from a Telegram message.

    Gathers both embedded text links (MessageEntity.TEXT_LINK, whose target
    lives on the entity's ``url`` attribute) and plain-text URLs
    (MessageEntity.URL, whose target is the entity's text).

    Returns:
        De-duplicated list of URLs (order not guaranteed); an empty list
        when the message's entities cannot be parsed.
    """
    try:
        urls = []
        entities = message.parse_entities([MessageEntity.TEXT_LINK])
        if entities:
            urls.extend(entity.url for entity in entities.keys() if entity.url)
        entities = message.parse_entities([MessageEntity.URL])
        if entities:
            urls.extend(entities.values())
        # De-duplicate before returning. The original performed this after
        # the return statements, where it was unreachable dead code.
        return list(set(urls))
    except (TypeError, AttributeError) as err:
        _logger.warning(err)
        return []
def _is_unsupported(url: str):
    """Return True when *url* points at a site the bot cannot process."""
    blocked_fragments = (
        'truefriend.com',
        'github.com/unchartedsky/pdfthis',
        'samsungpop.com/streamdocs',
    )
    # Substring match, same as the original loop-and-return version.
    return any(fragment in url for fragment in blocked_fragments)
async def filter_valid_urls(urls: list[str]):
    """
    Resolve each URL and keep only those the bot can process.

    Per URL: resolve redirects via ``_parse_url``, drop unsupported sites,
    rewrite Naver properties to their mobile/print variants, require a
    registered domain, and for t.me links require a concrete message path
    (``https://t.me/<chat>/<id>``), not a bare channel link.

    Returns:
        De-duplicated list of allowed URLs (order not guaranteed).
    """
    allowed = []
    for url in urls:
        link = await _parse_url(url)
        if not link:
            continue
        # BUG FIX: the original prepended the scheme to the *unresolved*
        # `url` ("http://" + url) while testing the *resolved* `link`,
        # silently discarding the redirect result whenever the resolved
        # link lacked a scheme. Use `link` on both sides.
        url = "http://" + link if "://" not in link else link
        if _is_unsupported(url):
            continue
        # Naver pages are rewritten to their mobile/print variants.
        url = url.replace('://blog.naver.com', '://m.blog.naver.com')
        url = url.replace('://cafe.naver.com', '://m.cafe.naver.com')
        url = url.replace('://post.naver.com', '://m.post.naver.com')
        url = re.sub(r'(https://n\.news\.naver\.com/article/)(\d{3}/\d{9})', r'\1print/\2', url)
        r = tldextract.extract(url)
        if not r:
            continue
        if not r.registered_domain:
            continue
        if url.startswith('https://t.me'):
            # Keep only links that address a specific message.
            pattern = r"https:\/\/t\.me\/.+\/\d+"
            if not re.search(pattern, url):
                continue
        allowed.append(url)
    return list(set(allowed))
async def is_empty_text(text: str, urls: list[str], entities: tuple[MessageEntity]):
    """
    Decide whether *text* carries no content beyond the linked pages' titles.

    The entity spans are stripped out of the text, the remainder is cleaned
    (emoji, punctuation, currency symbols, line breaks removed), and what is
    left is compared against each URL's page title. Returns True when the
    residue is empty, is contained in a title, or is >70% similar to one
    (normalized Levenshtein over equal-length prefixes); otherwise False.
    """
    # Strip entity spans back-to-front so earlier offsets remain valid.
    # NOTE(review): the extra +1 also drops the character immediately after
    # each entity (presumably a separating space) — confirm this is intended.
    for e in reversed(entities):
        text = text[:e.offset] + text[(e.offset + e.length + 1):]
    text = clean(text, to_ascii=False, no_emoji=True, no_line_breaks=True, no_punct=True, no_currency_symbols=True)
    if len(text) == 0:
        return True
    for url in urls:
        try:
            # Resolve redirects first, then fetch the page's metadata.
            parsed_url = await _parse_url(url)
            metadata = extract_url_metadata(parsed_url)
            title = metadata["og_title"] or metadata["title"]  # Prefer OG title
        except Exception:
            continue  # best-effort: skip URLs whose metadata cannot be fetched
        if not title:
            continue
        # Normalize the title the same way as the text before comparing.
        title = clean(title, to_ascii=False, no_emoji=True, no_line_breaks=True, no_punct=True,
                      no_currency_symbols=True)
        if not title:
            continue
        if text in title:
            return True
        # NOTE: find a better approach later
        # Truncate both strings to equal length so the similarity score is
        # not dominated by the length difference.
        if len(text) < len(title):
            title = title[:len(text)]
        else:
            text = text[:len(title)]
        similarity = td.levenshtein.normalized_similarity(text, title)
        if similarity > 0.7:
            return True
    return False
def get_tags(msg: Message):
    """
    Return the hashtag strings (without the leading '#') found in a message.

    Text entities are consulted first; caption entities serve as the
    fallback. An empty list is returned when neither contains hashtags.
    """
    found = (msg.parse_entities([MessageEntity.HASHTAG])
             or msg.parse_caption_entities([MessageEntity.HASHTAG]))
    if not found:
        return []
    return [value.replace('#', '') for value in found.values()]