|
| 1 | +from typing import Optional |
| 2 | +import requests |
| 3 | +import time |
| 4 | +import uuid |
| 5 | + |
| 6 | + |
| 7 | +class Maildrop(object): |
| 8 | + _apikey = "QM8VTHrLR2JloKTJMZ3N6Qa93FVsx8LapKCzEjui" |
| 9 | + _inbox = "https://api.maildrop.cc/v2/mailbox" |
| 10 | + _search_fields = ("sender", "subject", "body",) |
| 11 | + |
| 12 | + def __init__( |
| 13 | + self, |
| 14 | + address: Optional[str] = None, |
| 15 | + message_filter: Optional[str] = None |
| 16 | + ) -> None: |
| 17 | + """Create an instance of the Maildrop.cc client. |
| 18 | +
|
| 19 | + :param address: Optional address to monitor on maildrop. |
| 20 | + Can be a full email with or without domain. |
| 21 | + Defaults to a random uuid. |
| 22 | + Note the address if you plan to re-use it in the future. |
| 23 | + :param message_filter: Optional text to search for in |
| 24 | + sender, subject, and body fields. |
| 25 | + """ |
| 26 | + if address and "@" in address: |
| 27 | + # drop the domain from the email address |
| 28 | + self._address = address.rsplit("@", 1)[0] |
| 29 | + elif address: |
| 30 | + # specifying a specific maildrop.cc email for easy re-use |
| 31 | + self._address = address |
| 32 | + else: |
| 33 | + # generate a completely random (throwaway) maildrop address |
| 34 | + self._address = uuid.uuid4().hex |
| 35 | + |
| 36 | + self._filter = message_filter |
| 37 | + self._seen = set() |
| 38 | + self._emails = list() |
| 39 | + self._client = requests.Session() |
| 40 | + self._client.headers.update({ |
| 41 | + "Content-Type": "application/json", |
| 42 | + "x-api-key": self._apikey |
| 43 | + }) |
| 44 | + print(f"maildrop address set to {self._address}@maildrop.cc") |
| 45 | + |
| 46 | + @property |
| 47 | + def address(self) -> str: |
| 48 | + """Returns the temporary email address being monitored.""" |
| 49 | + return f"{self._address}@maildrop.cc" |
| 50 | + |
| 51 | + @property |
| 52 | + def emails(self) -> list: |
| 53 | + """Returns a list of the emails collected during the monitor loop.""" |
| 54 | + return self._emails |
| 55 | + |
| 56 | + def _get_body(self, email_id: str) -> str: |
| 57 | + """Retrieves the body of an email. |
| 58 | +
|
| 59 | + :param email_id: The unique identifier of the email to retrieve. |
| 60 | + :returns: The body of the requested email. |
| 61 | + :rtype: str |
| 62 | + """ |
| 63 | + resp = self._client.get(f"{self._inbox}/{self._address}/{email_id}") |
| 64 | + if not resp.ok: |
| 65 | + raise RuntimeError( |
| 66 | + f"Issue retrieving specific email ({email_id}) from inbox." |
| 67 | + ) |
| 68 | + return resp.json().get("body", "") |
| 69 | + |
| 70 | + def get_emails(self) -> list: |
| 71 | + """Retrieves all unseen emails.""" |
| 72 | + resp = self._client.get(f"{self._inbox}/{self._address}") |
| 73 | + if not resp.ok: |
| 74 | + raise RuntimeError("Issue retrieving inbox from maildrop.cc.") |
| 75 | + new_emails = [] |
| 76 | + for email in resp.json().get("messages", []): |
| 77 | + email_id = email.get("id") |
| 78 | + if email_id in self._seen: |
| 79 | + continue |
| 80 | + # keep reference of all seen emails to prevent returning duplicates |
| 81 | + self._seen.add(email_id) |
| 82 | + email["body"] = self._get_body(email_id) |
| 83 | + if self._filter: |
| 84 | + if any([ |
| 85 | + self._filter in email.get(field, "") |
| 86 | + for field in self._search_fields |
| 87 | + ]): |
| 88 | + new_emails.append(email) |
| 89 | + else: |
| 90 | + new_emails.append(email) |
| 91 | + self._emails.extend(new_emails) |
| 92 | + return new_emails |
| 93 | + |
| 94 | + def monitor_inbox(self, interval: int = 5) -> None: |
| 95 | + """Check for unseen emails at an interval until told to stop. |
| 96 | +
|
| 97 | + :param interval: Optional number of seconds to wait |
| 98 | + before re-checking for unseen emails. |
| 99 | + Defaults to 5 seconds. |
| 100 | + """ |
| 101 | + print(f"checking inbox every {interval} seconds") |
| 102 | + print("to stop monitoring loop, press & hold ctrl+c") |
| 103 | + while True: |
| 104 | + try: |
| 105 | + emails = self.get_emails() |
| 106 | + if emails: |
| 107 | + print(f"found {len(emails)} new emails") |
| 108 | + for email in emails: |
| 109 | + print("email_id:", email.get("id")) |
| 110 | + print("from:", email.get("sender")) |
| 111 | + print("subject:", email.get("subject")) |
| 112 | + print("body:", email.get("body")) |
| 113 | + print("\n\n===\n\n") |
| 114 | + time.sleep(interval) |
| 115 | + except KeyboardInterrupt: |
| 116 | + print("stopping inbox monitor") |
| 117 | + break |
| 118 | + |
| 119 | + |
| 120 | +if __name__ == "__main__": |
| 121 | + import argparse |
| 122 | + parser = argparse.ArgumentParser() |
| 123 | + parser.add_argument( |
| 124 | + "-a", |
| 125 | + "--address", |
| 126 | + help=( |
| 127 | + "A specific maildrop.cc email address to monitor. " |
| 128 | + "If omitted, a randomly-generated UUID4 is used." |
| 129 | + ), |
| 130 | + default=None |
| 131 | + ) |
| 132 | + parser.add_argument( |
| 133 | + "-f", |
| 134 | + "--filter", |
| 135 | + help=( |
| 136 | + "A string to search for in new emails. " |
| 137 | + "If omitted, all new emails are returned." |
| 138 | + ), |
| 139 | + default=None |
| 140 | + ) |
| 141 | + args = parser.parse_args() |
| 142 | + md = Maildrop(address=args.address, message_filter=args.filter) |
| 143 | + md.monitor_inbox() |
0 commit comments