Skip to content

smtp_batch: add feature for grouping and templating #2610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
22 changes: 22 additions & 0 deletions docs/user/bots.md
Original file line number Diff line number Diff line change
Expand Up @@ -5097,6 +5097,28 @@ [email protected],[email protected];[email protected]
[email protected], Mary <[email protected]>; John <[email protected]>
```

**`additional_grouping_keys`**

(optional, list) By-default events are grouped by the E-Mail-Address into buckets. For each bucket one E-Mail is sent. You may add more fields to group-by here to make potentially more buckets.
Side-effect: Every field that is included in the group-by is ensured to be unique for all events in the bucket and may thus be used for templating.
Note: The keys listed here refer to the keys in the events (in contrast to the CSV column names).
Default: `[]`

**`templating`**

(optional, dict) Defines which strings should be processed by jinja2 templating. For templating only keys which are unique for the complete bucket are available. This always includes the destination address (`source.abuse_contact`) and all keys of `additional_grouping_keys` which are present in the bucket. There is one additional key `current_time` available which holds a `datetime.datetime` object of the current (local) time.
Note: The keys available for templating refer to the keys defined for the events (in contrast to the CSV column names). Still the keys get transformed: each `'.'` gets replaced to `_` in order to make referencing the key in jinja2 easier.
Default: `{subject: False, body: False, attachment: False}`

**`allowed_fieldnames`**

(optional, list) Lists the fields which are included in the csv file. Every element should be also included in `fieldnames_translation` to avoid crashes.

**`fieldnames_translation`**

(optional, dict) Maps each the name of each field listed in `allowed_fieldnames` to a different name to be used in the csv header.
**Warning:** The Bot will crash on sending in case a fieldname is present in an event and in `allowed_fieldnames` but not in `fieldnames_translation`.

**`attachment_name`**

(optional, string) Attachment file name for the outgoing messages. May contain date formatting like this `%Y-%m-%d`. Example: "events_%Y-%m-%d" will appear as "events_2022-12-01.zip". Defaults to "intelmq_%Y-%m-%d".
Expand Down
135 changes: 103 additions & 32 deletions intelmq/bots/outputs/smtp_batch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
import sys
from tempfile import NamedTemporaryFile
import time
from typing import Any, Optional
from typing import Any, Iterable, Optional, Dict, List
import zipfile
from base64 import b64decode
from collections import OrderedDict
from io import StringIO
from hashlib import sha256

from redis.exceptions import TimeoutError

Expand All @@ -25,17 +26,37 @@
except ImportError:
Envelope = None

try:
import jinja2
jinja_env = jinja2.Environment()
except ImportError:
jinja2 = None


def hash_arbitrary(value: Any) -> bytes:
value_bytes = None
if isinstance(value, str):
value_bytes = value.encode("utf-8")
elif isinstance(value, int):
value_bytes = bytes(value)
else:
value_bytes = json.dumps(value, sort_keys=True).encode("utf-8")
return sha256(value_bytes).digest()


@dataclass
class Mail:
key: str
to: str
path: str
count: int
template_data: Dict[str, Any]


class SMTPBatchOutputBot(Bot):
# configurable parameters
additional_grouping_keys: Optional[list] = [] # refers to the event directly
templating: Optional[Dict[str, bool]] = {'subject': False, 'body': False, 'attachment': False}
alternative_mails: Optional[str] = None
bcc: Optional[list] = None
email_from: str = ""
Expand Down Expand Up @@ -75,7 +96,20 @@ def process(self):
if "source.abuse_contact" in message:
field = message["source.abuse_contact"]
for mail in (field if isinstance(field, list) else [field]):
self.cache.redis.rpush(f"{self.key}{mail}", message.to_json())
# - Each event goes into one bucket (equivalent to group-by)
# - The id of each bucket is calculated by hashing all the keys that should be grouped for
# - Hashing ensures the redis-key does not grow indefinitely.
# - In order to avoid collisions, each value is hashed before
# appending to the input for the redis-key-hash
# (could also be solved by special separator which would need
# to be escaped or prepending the length of the value).
h = sha256()
h.update(sha256(mail.encode("utf-8")).digest())
for i in self.additional_grouping_keys:
if i not in message:
continue
h.update(hash_arbitrary(message[i]))
self.cache.redis.rpush(f"{self.key}{h.hexdigest()}", message.to_json())

self.acknowledge_message()

Expand All @@ -90,6 +124,8 @@ def set_cache(self):
def init(self):
if Envelope is None:
raise MissingDependencyError('envelope', '>=2.0.0')
if jinja2 is None:
self.logger.warning("No jinja2 installed. Thus, the templating is deactivated.")
self.set_cache()
self.key = f"{self._Bot__bot_id}:"

Expand Down Expand Up @@ -213,7 +249,7 @@ def set_tester(self, force=True):
print("\nWhat e-mail should I use?")
self.testing_to = input()

def send_mails_to_tester(self, mails):
def send_mails_to_tester(self, mails: List[Mail]):
"""
These mails are going to tester's address. Then prints out their count.
:param mails: list
Expand All @@ -222,7 +258,7 @@ def send_mails_to_tester(self, mails):
count = sum([1 for mail in mails if self.build_mail(mail, send=True, override_to=self.testing_to)])
print(f"{count}× mail sent to: {self.testing_to}\n")

def prepare_mails(self):
def prepare_mails(self) -> Iterable[Mail]:
""" Generates Mail objects """

for mail_record in self.cache.redis.keys(f"{self.key}*")[slice(self.limit_results)]:
Expand Down Expand Up @@ -254,7 +290,11 @@ def prepare_mails(self):
# TODO: worthy to generate on the fly https://github.com/certtools/intelmq/pull/2253#discussion_r1172779620
fieldnames = set()
rows_output = []
src_abuse_contact = None
for row in lines:
# obtain this field only once as it is the same for all lines here
if not src_abuse_contact:
src_abuse_contact = row["source.abuse_contact"]
try:
if threshold and row["time.observation"][:19] < threshold.isoformat()[:19]:
continue
Expand Down Expand Up @@ -283,31 +323,45 @@ def prepare_mails(self):
dict_writer.writerow(dict(zip(ordered_fieldnames, ordered_fieldnames)))
dict_writer.writerows(rows_output)

email_to = str(mail_record[len(self.key):], encoding="utf-8")
count = len(rows_output)
if not count:
path = None
else:
filename = f'{time.strftime("%y%m%d")}_{count}_events'
path = NamedTemporaryFile().name

with zipfile.ZipFile(path, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
try:
zf.writestr(filename + ".csv", output.getvalue())
except Exception:
self.logger.error("Error: Cannot zip mail: %r", mail_record)
continue
if not count or count == 0:
# send no mail if no events are present
continue

# collect all data which must be the same for all events of the
# bucket and thus can be used for templating
template_keys = ['source.abuse_contact']
# only collect if templating is enabled (save the memory otherwise)+
if jinja2 and self.templating and any(self.templating.values()):
template_keys.extend(self.additional_grouping_keys)

template_data = {
k.replace(".", "_"): lines[0][k]
for k in template_keys
if k in lines[0]
}

email_to = template_data["source_abuse_contact"]
filename = f'{time.strftime("%y%m%d")}_{count}_events'
path = NamedTemporaryFile().name

with zipfile.ZipFile(path, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
try:
zf.writestr(filename + ".csv", output.getvalue())
except Exception:
self.logger.error("Error: Cannot zip mail: %r", mail_record)
continue

if email_to in self.alternative_mail:
print(f"Alternative: instead of {email_to} we use {self.alternative_mail[email_to]}")
email_to = self.alternative_mail[email_to]
if email_to in self.alternative_mail:
print(f"Alternative: instead of {email_to} we use {self.alternative_mail[email_to]}")
email_to = self.alternative_mail[email_to]

mail = Mail(mail_record, email_to, path, count)
mail = Mail(mail_record, email_to, path, count, template_data)
# build_mail only used to output metadata of the mail -> send=False -> return None
self.build_mail(mail, send=False)
if count:
yield mail
yield mail

def build_mail(self, mail, send=False, override_to=None):
def build_mail(self, mail: Mail, send=False, override_to=None):
""" creates a MIME message
:param mail: Mail object
:param send: True to send through SMTP, False for just printing the information
Expand All @@ -322,15 +376,32 @@ def build_mail(self, mail, send=False, override_to=None):
intended_to = None
email_to = mail.to
email_from = self.email_from

template_data = mail.template_data

text = self.mail_contents
try:
subject = time.strftime(self.subject)
except ValueError:
subject = self.subject
try:
attachment_name = time.strftime(self.attachment_name)
except ValueError:
attachment_name = self.attachment_name
if jinja2 and self.templating and self.templating.get('body', False):
jinja_tmpl = jinja_env.from_string(text)
text = jinja_tmpl.render(current_time=datetime.datetime.now(), **template_data)

if jinja2 and self.templating and self.templating.get('subject', False):
jinja_tmpl = jinja_env.from_string(self.subject)
subject = jinja_tmpl.render(current_time=datetime.datetime.now(), **template_data)
else:
try:
subject = time.strftime(self.subject)
except ValueError:
subject = self.subject

if jinja2 and self.templating and self.templating.get('attachment', False):
jinja_tmpl = jinja_env.from_string(self.attachment_name)
attachment_name = jinja_tmpl.render(current_time=datetime.datetime.now(), **template_data)
else:
try:
attachment_name = time.strftime(self.attachment_name)
except ValueError:
attachment_name = self.attachment_name

if intended_to:
subject += f" (intended for {intended_to})"
else:
Expand Down
Loading
Loading