DEV Community

ByteLedger
ByteLedger

Posted on

python mail format parser

emlgen/
__init__.py
cli.py # argparse CLI
io_utils.py # read/write EML, CRLF-safe
ids.py # new GUID/Message-ID/boundary/timestamp
transform.py # token map, replacements, text injection

emlgen/io_utils.py

from email import policy
from email.parser import BytesParser
from email.generator import BytesGenerator
from io import BytesIO
import os

def read_eml(path: str):
    """Parse the EML file at *path* and return the resulting message object.

    The file is opened in binary mode and parsed by a ``BytesParser``
    configured with ``policy.default``.
    """
    parser = BytesParser(policy=policy.default)
    with open(path, "rb") as handle:
        message = parser.parse(handle)
    return message

def write_eml(msg, path: str):
    """Serialize *msg* to *path* as an EML file with CRLF line endings.

    Any missing parent directories of *path* are created first. The message
    is flattened into memory with ``policy.SMTP`` before the file is opened,
    so a serialization error does not leave a partial file behind.

    Args:
        msg: The email message object to serialize.
        path: Destination file path; may be a bare filename.
    """
    parent = os.path.dirname(path)
    # dirname() is "" for a bare filename and os.makedirs("") raises
    # FileNotFoundError, so only create a directory when one is named.
    if parent:
        os.makedirs(parent, exist_ok=True)
    bio = BytesIO()
    BytesGenerator(bio, policy=policy.SMTP).flatten(msg)  # CRLF
    with open(path, "wb") as f:
        f.write(bio.getvalue())

Enter fullscreen mode Exit fullscreen mode

emlgen/ids.py

import uuid, datetime
from email.utils import make_msgid

def new_guid():
    """Return a newly generated random UUID (``uuid.uuid4``) as a string."""
    fresh = uuid.uuid4()
    return str(fresh)

def new_message_id(domain="example.test"):
    """Return a new Message-ID for *domain*, angle brackets included.

    The value comes straight from ``email.utils.make_msgid``.
    """
    msgid = make_msgid(domain=domain)
    return msgid

def boundary(tag="MESSAGE_ID"):
    """Build a boundary string such as ``__MESSAGE_ID__0a1b2c3d4e``.

    The suffix is the first ten hex digits of a fresh ``uuid.uuid4()``.
    """
    suffix = uuid.uuid4().hex[:10]
    return "__{}__{}".format(tag, suffix)

def ts_compact():
    """Return the current UTC time as a 17-digit ``YYYYMMDDHHMMSSmmm`` string.

    The last three digits are milliseconds. An aware UTC ``now()`` is used
    because ``datetime.utcnow()`` is deprecated and emits a
    ``DeprecationWarning`` on Python 3.12+; the formatted value is the same.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    # Integer division avoids a float round-trip when truncating to ms.
    return now.strftime("%Y%m%d%H%M%S") + f"{now.microsecond // 1000:03d}"

Enter fullscreen mode Exit fullscreen mode

emlgen/transform.py

import re
from email import encoders

def get_boundary(msg):
    """Return the MIME boundary declared by *msg*, or ``None`` if absent.

    ``msg.get_boundary()`` is tried first. If it yields nothing, the raw
    ``Content-Type`` header is scanned as a fallback, which rescues headers
    the parameter parser cannot split (for example a missing ``;`` before
    ``boundary=``).

    Both quoted (``boundary="abc"``) and unquoted (``boundary=abc``) forms are
    recognised by the fallback; the previous pattern required a closing quote
    and therefore never matched unquoted values.
    """
    declared = msg.get_boundary()
    if declared:
        return declared
    header = str(msg.get("Content-Type", ""))
    m = re.search(r'boundary\s*=\s*(?:"([^"]*)"|([^\s;"]+))', header, re.I)
    if not m:
        return None
    # Group 1 is the quoted form, group 2 the bare-token form.
    return m.group(1) if m.group(1) is not None else m.group(2)

def set_boundary(mp, val):
    """Set the boundary of *mp* to *val*; do nothing for non-multipart parts."""
    if not mp.is_multipart():
        return
    mp.set_boundary(val)

def walk_leaf_parts(msg):
    """Yield every non-multipart part of *msg*, in ``msg.walk()`` order."""
    yield from (node for node in msg.walk() if not node.is_multipart())

def collect_tokens(msg):
    """Collect the identifying tokens of *msg* into a dict.

    Keys:
        ``Message-ID``: the stripped Message-ID header ("" if missing).
        ``Message-ID-unbr``: the same value without the surrounding ``<>``,
            or "" when the header is not bracketed.
        ``ConversationID``: the stripped ``X-Header-ConversationID`` header.
        ``Boundary``: the result of ``get_boundary(msg)`` ("" if none).
        ``Filenames``: filenames of all leaf parts that have one.
    """
    message_id = (msg.get("Message-ID") or "").strip()
    is_bracketed = message_id.startswith("<") and message_id.endswith(">")
    conversation_id = (msg.get("X-Header-ConversationID") or "").strip()
    boundary_value = get_boundary(msg) or ""

    filenames = []
    for leaf in walk_leaf_parts(msg):
        leaf_name = leaf.get_filename()
        if leaf_name:
            filenames.append(leaf_name)

    return {
        "Message-ID": message_id,
        "Message-ID-unbr": message_id[1:-1] if is_bracketed else "",
        "ConversationID": conversation_id,
        "Boundary": boundary_value,
        "Filenames": filenames,
    }

def rename_attachment_headers(part, new_filename):
    """Point the filename parameters of *part* at *new_filename*.

    ``filename`` on ``Content-Disposition`` is updated only when that header
    is present; ``name`` on ``Content-Type`` is always set.
    """
    updates = [("name", "Content-Type")]
    if part.get("Content-Disposition"):
        updates.insert(0, ("filename", "Content-Disposition"))
    for param, header in updates:
        part.set_param(param, new_filename, header=header)

def replace_text_payload(part, replace_map, keep_b64=True):
    """Apply literal string replacements to the decoded body of *part*.

    The body is decoded with the part's declared charset (UTF-8 when none is
    declared or the declared one is unknown to Python), every non-empty key
    of *replace_map* is replaced by its value, and the result is stored back.

    Args:
        part: A non-multipart message part; modified in place.
        replace_map: Mapping of old substring -> new substring. Empty keys
            are skipped.
        keep_b64: When true, a part that was base64-encoded is re-encoded as
            base64. Every other part is stored un-encoded and labelled
            ``7bit`` or ``8bit`` to match its content.

    Raises:
        UnicodeEncodeError: Only if the text cannot be encoded even as UTF-8
            (e.g. a replacement value contains a lone surrogate).
    """
    orig_cte = (part.get("Content-Transfer-Encoding") or "").lower()
    charset = part.get_content_charset() or "utf-8"
    raw = part.get_payload(decode=True) or b""
    try:
        text = raw.decode(charset, errors="replace")
    except (LookupError, ValueError):
        # Unknown/invalid charset label: fall back to UTF-8 and remember it,
        # otherwise re-encoding below would fail on the same label.
        charset = "utf-8"
        text = raw.decode(charset, errors="replace")

    for old, new in replace_map.items():
        if old:
            text = text.replace(old, new)

    try:
        data = text.encode(charset)
    except UnicodeEncodeError:
        # The new text (or U+FFFD from the lenient decode) does not fit the
        # declared charset; switch the part to UTF-8 rather than lose data.
        charset = "utf-8"
        data = text.encode(charset)

    # Store raw bytes and pick the transfer encoding explicitly. Calling
    # set_payload(text, charset=...) on a part without a
    # Content-Transfer-Encoding header makes the library encode the body
    # itself (base64 for UTF-8) and add the header; deleting that header
    # afterwards would leave an encoded body that nothing decodes.
    del part["Content-Transfer-Encoding"]  # no-op when the header is absent
    part.set_payload(data)
    part.set_param("charset", charset)
    if keep_b64 and orig_cte == "base64":
        encoders.encode_base64(part)
    else:
        encoders.encode_7or8bit(part)

def _decode_text_part(part):
    """Return ``(text, charset)`` for *part*, decoding leniently."""
    charset = part.get_content_charset() or "utf-8"
    raw = part.get_payload(decode=True) or b""
    try:
        return raw.decode(charset, errors="replace"), charset
    except (LookupError, ValueError):
        # Unknown/invalid charset label: treat the bytes as UTF-8.
        return raw.decode("utf-8", errors="replace"), "utf-8"


def _store_base64_text(part, text, charset):
    """Store *text* in *part* as a base64 body with a matching CTE header."""
    try:
        data = text.encode(charset)
    except UnicodeEncodeError:
        # Injected text does not fit the declared charset; switch to UTF-8.
        charset = "utf-8"
        data = text.encode(charset)
    # Drop any old header and store raw bytes first, so encode_base64()
    # encodes exactly once and the header always matches the body.
    del part["Content-Transfer-Encoding"]  # no-op when the header is absent
    part.set_payload(data)
    part.set_param("charset", charset)
    encoders.encode_base64(part)


def inject_text(msg, text, where="html-end", once_marker="<!--emlgen-injected-->"):
    """
    Adds `text` into text/html or text/plain part.
    where: 'html-top' | 'html-end' | 'text-top' | 'text-end'
    Adds a marker to avoid duplicate injections on re-runs.

    Only leaf parts whose content type matches the ``html``/``text`` prefix
    of `where` are touched; a part that already contains `once_marker`, or
    any call with an unrecognised `where`, leaves the message unchanged.
    For 'html-end' the text goes before the last ``</body>`` tag if there is
    one, otherwise at the end. `text` is inserted literally (backslashes are
    not interpreted). Modified parts are stored base64-encoded.
    """
    targets = {
        "html-top": "text/html",
        "html-end": "text/html",
        "text-top": "text/plain",
        "text-end": "text/plain",
    }
    wanted = targets.get(where)
    if wanted is None:
        return

    for part in msg.walk():
        if part.is_multipart():
            continue
        if (part.get_content_type() or "").lower() != wanted:
            continue

        body, charset = _decode_text_part(part)
        if once_marker in body:
            continue

        if where == "html-top":
            body = once_marker + text + body
        elif where == "html-end":
            closers = list(re.finditer(r"</body\s*>", body, re.I))
            if closers:
                # Slice instead of re.sub so `text` is never parsed as a
                # replacement template and only one insertion happens.
                cut = closers[-1].start()
                body = body[:cut] + text + "\n" + once_marker + body[cut:]
            else:
                body = body + "\n" + text + "\n" + once_marker
        elif where == "text-top":
            body = once_marker + "\n" + text + "\n\n" + body
        else:  # "text-end"
            body = body + "\n\n" + text + "\n" + once_marker

        _store_base64_text(part, body, charset)

Enter fullscreen mode Exit fullscreen mode

emlgen/cli.py

import argparse, os, re
from .io_utils import read_eml, write_eml
from .ids import new_guid, new_message_id, boundary as new_boundary, ts_compact
from .transform import collect_tokens, set_boundary, walk_leaf_parts, \
                        rename_attachment_headers, replace_text_payload, inject_text

def build_replace_map(old_tokens, new_mid, new_mid_unbr, new_conv, new_bnd):
    """Map each non-empty old token value to its replacement.

    Entries are added in the order ConversationID, Message-ID,
    Message-ID-unbr, Boundary; tokens that are missing or empty in
    *old_tokens* are left out.
    """
    pairs = (
        ("ConversationID", new_conv),
        ("Message-ID", new_mid),
        ("Message-ID-unbr", new_mid_unbr),
        ("Boundary", new_bnd),
    )
    mapping = {}
    for token_name, replacement in pairs:
        old_value = old_tokens.get(token_name)
        if old_value:
            mapping[old_value] = replacement
    return mapping

def generate_one(template_msg, domain, add_text=None, add_where="html-end"):
    """Produce one message derived from *template_msg* with fresh identifiers.

    A copy of the template gets a new conversation ID, Message-ID and
    top-level boundary. Attachment filenames that contain the old
    conversation ID or a parenthesised 14-20 digit stamp are renamed, and the
    old tokens are replaced inside textual parts. The template itself is not
    modified, so it can be reused for any number of calls.

    Args:
        template_msg: Parsed source message.
        domain: Domain used for the new Message-ID.
        add_text: Optional text to inject into a body part.
        add_where: Injection position, passed through to ``inject_text``.

    Returns:
        ``(msg, meta)`` where ``meta`` has the keys ``conv``, ``mid`` and
        ``bnd`` holding the newly generated values.
    """
    import copy  # local import: this function is the only user

    # Standard-library message objects have no clone(); without a real copy
    # every call would mutate the caller's shared template in place.
    if hasattr(template_msg, "clone"):
        msg = template_msg.clone()
    else:
        msg = copy.deepcopy(template_msg)

    toks = collect_tokens(msg)
    new_conv = new_guid()
    new_mid = new_message_id(domain)
    new_mid_unbr = new_mid[1:-1]
    new_bnd = new_boundary("MESSAGE_ID")
    new_ts = ts_compact()

    # headers
    if toks.get("ConversationID"):
        msg.replace_header("X-Header-ConversationID", new_conv)
    if msg.get("Message-ID"):
        msg.replace_header("Message-ID", new_mid)
    set_boundary(msg, new_bnd)

    # parts
    rmap = build_replace_map(toks, new_mid, new_mid_unbr, new_conv, new_bnd)
    for p in walk_leaf_parts(msg):
        ctype = (p.get_content_type() or "").lower()
        fn = p.get_filename()
        if fn:
            nf = fn
            if toks.get("ConversationID") and toks["ConversationID"] in nf:
                nf = nf.replace(toks["ConversationID"], new_conv)
            # A parenthesised run of 14-20 digits is treated as a timestamp.
            m = re.search(r"\(\d{14,20}\)", nf)
            if m:
                nf = nf.replace(m.group(0), f"({new_ts})")
            if nf != fn:
                rename_attachment_headers(p, nf)
        if ctype.startswith("text/") or ctype in ("application/xml", "application/json"):
            replace_text_payload(p, rmap, keep_b64=True)

    # optional injection
    if add_text:
        inject_text(msg, add_text, where=add_where)

    return msg, {"conv": new_conv, "mid": new_mid, "bnd": new_bnd}

def main():
    """Command-line entry point: generate ``--count`` EML files from one input.

    Parses the arguments, reads the source message, writes each generated
    message into the ``--out`` directory and records one row per file in
    ``manifest.csv`` there.
    """
    parser = argparse.ArgumentParser(prog="emlgen", description="EML batch generator")
    parser.add_argument("--input", required=True, help="source .eml")
    parser.add_argument("--count", required=True, type=int)
    parser.add_argument("--out", required=True)
    parser.add_argument("--domain", default="example.test")
    parser.add_argument("--add-text", default=None, help="Text to inject into email body")
    parser.add_argument(
        "--add-where",
        default="html-end",
        choices=["html-top", "html-end", "text-top", "text-end"],
    )
    opts = parser.parse_args()

    template = read_eml(opts.input)

    out_dir = opts.out
    os.makedirs(out_dir, exist_ok=True)
    manifest_path = os.path.join(out_dir, "manifest.csv")
    with open(manifest_path, "w", encoding="utf-8") as manifest:
        manifest.write("filename,conversation_id,message_id,boundary\n")
        for _ in range(opts.count):
            generated, info = generate_one(template, opts.domain, opts.add_text, opts.add_where)
            filename = f"{info['conv']}_{ts_compact()}.eml"
            write_eml(generated, os.path.join(out_dir, filename))
            row = f"{filename},{info['conv']},{info['mid']},{info['bnd']}\n"
            manifest.write(row)
    print(f"Done. Output: {opts.out}\nManifest: {manifest_path}")

# Run the CLI only when this module is executed directly (for example via
# ``python -m emlgen.cli``), not when it is imported.
if __name__ == "__main__":
    main()

Enter fullscreen mode Exit fullscreen mode
HTML body (append before </body>)

python -m emlgen.cli --input sample.eml --count 50 --out out/batch_0050 \
  --add-text "<p style='color:#888'>[TEST DATA] Load-run 2025-09-24</p>" \
  --add-where html-end


TEXT body (append at end)

python -m emlgen.cli --input sample.eml --count 50 --out out/batch_0050_txt \
  --add-text "[TEST DATA] Load-run 2025-09-24" \
  --add-where text-end
Enter fullscreen mode Exit fullscreen mode

Top comments (0)