Skip to content

Commit

Permalink
Version: 0.2
Browse files (browse the repository at this point in the history)
Change: Keep image links
Bugfix: Remove all invalid HTML elements
Bugfix: Additional smaller changes
Review: Code improvements
  • Loading branch information
awalon committed Jul 5, 2022
1 parent c2098eb commit b4ae948
Showing 1 changed file with 72 additions and 54 deletions.
126 changes: 72 additions & 54 deletions mailToTelegramForwarder.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -46,7 +46,7 @@
"""

__appname__ = "Mail to Telegram Forwarder"
__version__ = "0.1.5"
__version__ = "0.2.0"
__author__ = "Awalon (https://github.com/awalon)"

with warnings.catch_warnings(record=True) as w:
Expand All @@ -63,9 +63,6 @@


class Tool:
def __init__(self,):
pass

@staticmethod
def binary_to_string(value, **kwargs) -> str:
encoding = kwargs.get('encoding')
Expand All @@ -75,7 +72,7 @@ def binary_to_string(value, **kwargs) -> str:
try:
return str(bytes.decode(value, encoding=encoding, errors='replace'))
except UnicodeDecodeError as decode_error:
logging.error("Can not decode value: '", value, "' reason: ", decode_error.reason)
logging.error("Can not decode value: '%s' reason: %s" % (value, decode_error.reason))
return ' ###decoder-error:%s### ' % decode_error.reason
else:
return str(value)
Expand Down Expand Up @@ -104,7 +101,7 @@ def build_error_message(message):


class Config:
config = None
config_parser = None

imap_user = None
imap_password = None
Expand Down Expand Up @@ -135,8 +132,8 @@ def __init__(self, cmd_args):
telegram config and configuration which controls behaviour of this script .
"""
try:
self.config = configparser.ConfigParser()
files = self.config.read(cmd_args.config)
self.config_parser = configparser.ConfigParser()
files = self.config_parser.read(cmd_args.config)
if len(files) == 0:
logging.critical("Error parsing config file: File '%s' not found!" % cmd_args.config)
sys.exit(2)
Expand Down Expand Up @@ -182,21 +179,18 @@ def __init__(self, cmd_args):
def get_config(self, section, key, default=None, value_type=None):
value = default
try:
if self.config.has_section(section):
if self.config.has_option(section, key):
if value_type is not None:
# get value based on type of default value
if value_type is int:
value = self.config.getint(section, key)
elif value_type is float:
value = self.config.getfloat(section, key)
elif value_type is bool:
value = self.config.getboolean(section, key)
else:
value = self.config.get(section, key)
if self.config_parser.has_section(section):
if self.config_parser.has_option(section, key):
# get value based on type of default value
if value_type is int:
value = self.config_parser.getint(section, key)
elif value_type is float:
value = self.config_parser.getfloat(section, key)
elif value_type is bool:
value = self.config_parser.getboolean(section, key)
else:
# use string as default
value = self.config.get(section, key)
value = self.config_parser.get(section, key)
else:
# raise exception as both sections are mandatory sections (Mail + Telegram)
logging.warning("Get config value error for '%s'.'%s' (default: '%s'): Missing section '%s'."
Expand Down Expand Up @@ -276,7 +270,7 @@ class MailData:


class TelegramBot:
config: Config = None
config: Config

def __init__(self, config):
self.config = config
Expand All @@ -299,12 +293,15 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str:
# <pre>pre-formatted fixed-width code block</pre>
# <pre><code class="language-python">pre-formatted fixed-width code block
# written in the Python programming language</code></pre>
# span elements only supported as spoiler elements
# tg_msg = re.sub(r'<\s*span\b', '<span class="tg-spoiler"', tg_msg,
# flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE))

tg_body = message
tg_msg = ''
try:
# extract HTML body to get payload from mail
tg_body = re.sub('.*<body[^>]*>(?P<body>.*)</body>.*$', '\g<body>', tg_body,
tg_body = re.sub(r'.*<body[^>]*>(?P<body>.*)</body>.*$', '\g<body>', tg_body,
flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE))

# remove control chars
Expand All @@ -315,31 +312,40 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str:

# handle inline images
image_seen = {}
for match in re.finditer(r'(?P<img><\s*img\s+[^>]*?\s*src\s*=\s*"cid:(?P<cid>[^"]*)"[^>]*?/?\s*>)',
for match in re.finditer(r'(?P<img><\s*img\s+[^>]*?\s*src\s*=\s*"(?P<src>(?P<proto>(cid|https?):/*(?P<cid>[^"]*)))"[^>]*?/?\s*>)',
tg_body, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE)):
img = match.group('img')
cid = match.group('cid')
if cid == '' or cid in image_seen:
continue
image_seen[cid] = True
proto = match.group('proto')

alt = re.sub(r'^.*?((title|alt)\s*=\s*"(?P<alt>[^"]+)")?.*?$', '\g<alt>',
# extract alt or title value
alt = re.sub(r'^.*?((title|alt)\s*=\s*"(?P<alt>[^"]+)")?.*$', '\g<alt>',
img, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE))
if cid in images:
# add image reference
tg_body = tg_body.replace(img, '${file:%s}' % cid)
# extract alt/title attributes of img elements
images[cid].alt = alt

if 'http' in proto:
# web link
src = match.group('src')
tg_body = tg_body.replace(img, "${img-link:%s|%s}" % (src, alt))
else:
# no file found, use alt text
tg_body = tg_body.replace(img, alt)
# attached/embedded image
cid = match.group('cid')
if cid == '' or cid in image_seen:
continue
image_seen[cid] = True

if cid in images:
# add image reference
tg_body = tg_body.replace(img, '${file:%s}' % cid)
# extract alt/title attributes of img elements
images[cid].alt = alt
else:
# no file found, use alt text
tg_body = tg_body.replace(img, alt)

# use alt text for all images without cid (embedded image)
tg_body = re.sub(r'<\s*img\s+[^>]*?((title|alt)\s*=\s*"(?P<alt>[^"]+)")?[^>]*?/?\s*>', '\g<alt>',
tg_body, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE))

# remove multiple line breaks and spaces (regular Browser logic)
tg_body = re.sub(r'[\r\n]', '', tg_body)
tg_body = re.sub(r'\s\s+', ' ', tg_body).strip()

# remove attributes from elements but href of "a"- elements
Expand All @@ -350,9 +356,6 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str:
tg_msg = re.sub(r'<\s*(?P<elem>script|style)\s*>.*?</\s*(?P=elem)\s*>',
'', tg_msg, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE))

# preserve NBSPs
tg_msg = re.sub(r'&nbsp;', ' ', tg_msg, flags=re.IGNORECASE)

# translate paragraphs and line breaks (block elements)
tg_msg = re.sub(r'</?\s*(?P<elem>(p|div|table|h\d+))\s*>', '\n', tg_msg,
flags=(re.MULTILINE | re.IGNORECASE))
Expand All @@ -364,10 +367,12 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str:
tg_msg = re.sub(r'</\s*li\s*>([^<]*</\s*[ou]l\s*>)?', '\n', tg_msg, flags=(re.MULTILINE | re.IGNORECASE))

# remove unsupported tags
regex_filter_elem = re.compile('<\s*(?!/?(bold|strong|i|em|u|ins|s|strike|del|b|a|code|pre))\s*[^>]*?>',
flags=re.MULTILINE)
# https://core.telegram.org/api/entities
regex_filter_elem = re.compile(
r'<\s*(?!/?\s*(?P<elem>bold|strong|i|em|u|ins|s|strike|del|b|a|code|pre)\b)[^>]*>',
flags=(re.MULTILINE | re.IGNORECASE))
tg_msg = re.sub(regex_filter_elem, ' ', tg_msg)
tg_msg = re.sub(r'</?\s*(img|span)\s*[^>]*>', '', tg_msg, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE))
#tg_msg = re.sub(r'</?\s*(img|span)\s*[^>]*>', '', tg_msg, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE))

# remove empty links
tg_msg = re.sub(r'<\s*a\s*>(?P<link>[^<]*)</\s*a\s*>', '\g<link> ', tg_msg,
Expand All @@ -380,6 +385,12 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str:
# remove empty elements
tg_msg = re.sub(r'<\s*\w\s*>\s*</\s*\w\s*>', ' ', tg_msg, flags=(re.DOTALL | re.MULTILINE))

# remove multiple line breaks
tg_msg = re.sub(r'\s*[\r\n](\s*[\r\n])+', "\n", tg_msg)

# preserve NBSPs
tg_msg = re.sub(r'&nbsp;', ' ', tg_msg, flags=re.IGNORECASE)

except Exception as ex:
logging.critical(Tool.build_error_message(ex))

Expand Down Expand Up @@ -430,14 +441,23 @@ def send_message(self, mails: [MailData]):
)
photo_size: [telegram.PhotoSize] = doc_message.photo
image.tg_id = photo_size[0].file_id
# image.tg_id = doc_message.caption

message = message.replace(
'${file:%s}' % image.id,
'🖼 %s' % title
)
image_no += 1

# write image links
for img_link in re.finditer(r'(\${img-link:(?P<src>[^\|]*)\|(?P<alt>[^}]*)})', message,
flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE)):
src = img_link.group('src')
alt = img_link.group('alt')
message = message.replace(
img_link.groups()[0],
'<a href="%s">🖼 %s</a>' % (src, alt)
)

tg_message = bot.send_message(chat_id=self.config.tg_forward_to_chat_id,
parse_mode=parser,
text=message,
Expand Down Expand Up @@ -481,11 +501,10 @@ def send_message(self, mails: [MailData]):
disable_web_page_preview=False)
finally:
pass
pass

except Exception as send_mail_error:
error_msgs = [Tool.binary_to_string(arg) for arg in send_mail_error.args]
msg = "Failed to send Telegram message (UID: %s) to '%s': %s"\
msg = "Failed to send Telegram message (UID: %s) to '%s': %s" \
% (mail.uid, str(self.config.tg_forward_to_chat_id), ', '.join(error_msgs))
logging.critical(Tool.build_error_message(msg))
try:
Expand All @@ -496,7 +515,6 @@ def send_message(self, mails: [MailData]):
disable_web_page_preview=False)
finally:
pass
pass

except telegram.TelegramError as tg_error:
logging.critical(Tool.build_error_message("Failed to send Telegram message: %s" % tg_error.message))
Expand All @@ -511,8 +529,8 @@ def send_message(self, mails: [MailData]):


class Mail:
mailbox: imaplib2.IMAP4_SSL = None
config: Config = None
mailbox: imaplib2.IMAP4_SSL
config: Config
last_uid: str = ''

previous_error = None
Expand Down Expand Up @@ -736,8 +754,8 @@ def parse_mail(self, uid, mail) -> (MailData, None):
content = re.sub(r'(?P<a></a>(\s*&gt;)?)\s*', '\g<a>\n\n', content, flags=re.MULTILINE)

# remove spaces and line breaks on start and end (enhanced strip)
content = re.sub(r'^[\s\n]*', '', content)
content = re.sub(r'[\s\n]*$', '', content)
content = re.sub(r'^\s*', '', content)
content = re.sub(r'\s*$', '', content)

max_len = self.config.imap_max_length
content_len = len(content)
Expand Down Expand Up @@ -832,9 +850,9 @@ def search_mails(self) -> [MailData]:
# build IMAP search string
search_string = self.config.imap_search
if not search_string:
"(UID " + str(self.last_uid) + ":*)"
search_string = "(UID %s:* UNSEEN)" % str(self.last_uid)
else:
search_string = re.sub(r'\${lastUID}', str(self.last_uid), search_string)
search_string = re.sub(r'\${lastUID}', str(self.last_uid), search_string, flags=re.IGNORECASE)

if re.match(r'.*\bUID\b\s*:.*', search_string) and self.last_uid == '':
# empty mailbox
Expand Down Expand Up @@ -881,7 +899,7 @@ def search_mails(self) -> [MailData]:
try:
rv, data = self.mailbox.uid('fetch', num, '(RFC822)')
if rv != 'OK':
logging.error("ERROR getting message", num)
logging.error("ERROR getting message: %s" % num)
return

msg_raw = data[0][1]
Expand Down

0 comments on commit b4ae948

Please sign in to comment.