import base64 import copy import email import os import re class Email: """ An email class used for testing. This class targets a specific serial of emails for PYTEST_CURRENT_TEST. As emails are sent out with util/sendmail, the serial number increases, starting at 1. Email content sent out by aurweb is always base64-encoded. Email.parse() decodes that for us and puts it into Email.body. Example: # Get the {test_suite}_{test_function}.1.txt email. email = Email(1).parse() print(email.body) print(email.headers) """ TEST_DIR = "test-emails" def __init__(self, serial: int = 1): self.serial = serial self.content = self._get() @staticmethod def email_prefix(suite: bool = False) -> str: """ Get the email prefix. We find the email prefix by reducing PYTEST_CURRENT_TEST to either {test_suite}_{test_function}. If `suite` is set, we reduce it to {test_suite} only. :param suite: Reduce PYTEST_CURRENT_TEST to {test_suite} :return: Email prefix with '/', '.', ',', and ':' chars replaced by '_' """ value = os.environ.get("PYTEST_CURRENT_TEST", "email").split(" ")[0] if suite: value = value.split(":")[0] return re.sub(r'(\/|\.|,|:)', "_", value) @staticmethod def count() -> int: """ Count the current number of emails sent from the test. This function is **only** supported inside of pytest functions. Do not use it elsewhere as data races will occur. :return: Number of emails sent by the current test """ files = os.listdir(Email.TEST_DIR) prefix = Email.email_prefix() expr = "^" + prefix + r"\.\d+\.txt$" subset = filter(lambda e: re.match(expr, e), files) return len(list(subset)) def _email_path(self) -> str: filename = self.email_prefix() + f".{self.serial}.txt" return os.path.join(Email.TEST_DIR, filename) def _get(self) -> str: """ Get this email's content by reading its file. :return: Email content """ path = self._email_path() with open(path) as f: return f.read() def parse(self) -> "Email": """ Parse this email and base64-decode the body. This function populates Email.message, Email.headers and Email.body. Additionally, after parsing, we write over our email file with self.glue()'d content (base64-decoded). This is done for ease of inspection by users. :return: self """ self.message = email.message_from_string(self.content) self.headers = dict(self.message) # aurweb email notifications always have base64 encoded content. # Decode it here so self.body is human readable. self.body = base64.b64decode(self.message.get_payload()).decode() path = self._email_path() with open(path, "w") as f: f.write(self.glue()) return self def glue(self) -> str: """ Glue parsed content back into a complete email document, but base64-decoded this time. :return: Email document as a string """ headers = copy.copy(self.headers) del headers["Content-Transfer-Encoding"] output = [] for k, v in headers.items(): output.append(f"{k}: {v}") output.append("") output.append(self.body) return "\n".join(output)