DEV Community

ByteLedger
ByteLedger

Posted on

Minimal Python scanner

#!/usr/bin/env python3
import argparse, os, re, uuid, datetime
from email import policy
from email.parser import BytesParser
from email.generator import BytesGenerator
from email.message import EmailMessage
from email.headerregistry import Address
from io import BytesIO

# ---------- helpers ----------

def new_guid():
    return str(uuid.uuid4())

def new_message_id(domain="example.test"):
    # You can use your org domain if desired
    from email.utils import make_msgid
    return make_msgid(domain=domain)  # returns like <...@domain>

def new_boundary(tag="MESSAGE_ID"):
    # Boundary visible pattern similar to your sample (customize as needed)
    token = uuid.uuid4().hex[:10]
    return f"__{tag}__{token}"

def now_compact_ts():
    # e.g., 20250911104117810 (yyyyMMddHHmmssfff)
    dt = datetime.datetime.utcnow()
    return dt.strftime("%Y%m%d%H%M%S") + f"{int(dt.microsecond/1000):03d}"

def get_boundary_from_header(msg):
    # Works on multipart messages
    # email.message has get_boundary(); fallback to regex on header if needed
    b = msg.get_boundary()
    if b:
        return b
    cth = msg.get("Content-Type", "")
    m = re.search(r'boundary="?(.*?)"?(\s*;|$)', cth, re.I)
    return m.group(1) if m else None

def set_boundary(msg: EmailMessage, boundary: str):
    # Only relevant for multipart containers
    if msg.is_multipart():
        msg.set_boundary(boundary)

def replace_text_payload(part, replace_map, keep_encoding=True):
    """
    Decode text-ish payload, apply replacements, and re-encode according to original part headers.
    keep_encoding=True will retain original Content-Transfer-Encoding (e.g., base64/quoted-printable).
    """
    orig_cte = (part.get("Content-Transfer-Encoding") or "").lower()
    charset = part.get_content_charset() or "utf-8"
    raw = part.get_payload(decode=True) or b""
    try:
        text = raw.decode(charset, errors="replace")
    except Exception:
        text = raw.decode("utf-8", errors="replace")
    for old, new in replace_map.items():
        if old:
            text = text.replace(old, new)
    # Re-set payload
    if keep_encoding and orig_cte in ("base64", "quoted-printable"):
        # set_payload with string + set_charset lets library re-encode
        part.set_payload(text, charset=charset)
        # The library will choose q-p by default for 8-bit; force base64 if originally base64:
        if orig_cte == "base64":
            part["Content-Transfer-Encoding"] = "base64"
    else:
        part.set_payload(text, charset=charset)

def rename_attachment_headers(part, new_filename):
    # Update filename in both headers (Content-Disposition and Content-Type's "name=")
    disp = part.get("Content-Disposition")
    if disp:
        part.set_param("filename", new_filename, header="Content-Disposition")
    ctype = part.get_content_type()
    # We need to re-apply name parameter on Content-Type header
    part.set_param("name", new_filename, header="Content-Type")

def walk_leaf_parts(msg):
    for p in msg.walk():
        if not p.is_multipart():
            yield p

def collect_tokens(msg):
    """
    Collects tokens we intend to replace everywhere:
    - message-id (without < > also useful)
    - conversation id (custom header)
    - boundary string
    - attachment filenames
    Returns dict with keys for mapping.
    """
    tokens = {}
    mid = (msg.get("Message-ID") or "").strip()
    tokens["Message-ID"] = mid
    # Unbracketed variant
    if mid.startswith("<") and mid.endswith(">"):
        tokens["Message-ID-unbracketed"] = mid[1:-1]
    conv = (msg.get("X-Header-ConversationID") or "").strip()
    tokens["ConversationID"] = conv
    bnd = get_boundary_from_header(msg) or ""
    tokens["Boundary"] = bnd

    # any filenames present on parts
    filenames = []
    for p in walk_leaf_parts(msg):
        fn = p.get_filename()
        if fn:
            filenames.append(fn)
    tokens["Filenames"] = filenames
    return tokens

def write_eml(msg, path):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    bio = BytesIO()
    # Ensure CRLF line endings
    BytesGenerator(bio, policy=policy.SMTP).flatten(msg)
    with open(path, "wb") as f:
        f.write(bio.getvalue())

# ---------- core generation ----------

def generate_variant_from_template(template_bytes: bytes, domain_for_msgid="example.test"):
    # Parse template
    msg = BytesParser(policy=policy.default).parsebytes(template_bytes)

    # Collect old tokens
    toks = collect_tokens(msg)

    # Build new tokens
    new_conv = new_guid()
    new_mid = new_message_id(domain_for_msgid)     # includes <...>
    new_mid_unbr = new_mid[1:-1]                   # without angle brackets
    new_bnd = new_boundary("MESSAGE_ID")
    new_ts = now_compact_ts()

    # Build replacement map (textual content + XML/HTML body)
    replace_map = {}
    if toks.get("ConversationID"):
        replace_map[toks["ConversationID"]] = new_conv
    if toks.get("Message-ID"):
        replace_map[toks["Message-ID"]] = new_mid  # in case body contains bracketed mid
    if toks.get("Message-ID-unbracketed"):
        replace_map[toks["Message-ID-unbracketed"]] = new_mid_unbr
    if toks.get("Boundary"):
        replace_map[toks["Boundary"]] = new_bnd

    # Also map filenames (if they embed conv id or timestamp string). We’ll replace exact old names later when we rename.
    old_filenames = toks.get("Filenames", [])

    # --- Update top-level headers ---
    if toks.get("ConversationID"):
        msg.replace_header("X-Header-ConversationID", new_conv)
    if msg.get("Message-ID"):
        msg.replace_header("Message-ID", new_mid)
    # Boundary (top-level multipart)
    set_boundary(msg, new_bnd)

    # --- Update parts ---
    for part in walk_leaf_parts(msg):
        ctype = (part.get_content_type() or "").lower()
        orig_cte = (part.get("Content-Transfer-Encoding") or "").lower()

        # If an attachment has a filename that embeds the old conv id / timestamp, build a new one
        fn = part.get_filename()
        if fn:
            new_fn = fn
            # Replace conv id if present
            if toks.get("ConversationID") and toks["ConversationID"] in new_fn:
                new_fn = new_fn.replace(toks["ConversationID"], new_conv)
            # Replace timestamp-like token inside filename (simple heuristic for (YYYYMMDD...))
            m = re.search(r"\(\d{14,20}\)", new_fn)
            if m:
                new_fn = new_fn.replace(m.group(0), f"({new_ts})")
            # Apply rename
            if new_fn != fn:
                rename_attachment_headers(part, new_fn)

        # For text/html, text/plain, application/xml, text/xml → decode → replace → re-encode
        if ctype.startswith("text/") or ctype in ("application/xml", "text/xml", "application/json"):
            replace_text_payload(part, replace_map, keep_encoding=True)
        else:
            # Binary: do nothing to payload, but boundary replacements in headers are already handled.
            pass

    # If there are nested multiparts, update their boundaries too
    for p in msg.walk():
        if p.is_multipart() and p is not msg:
            # Give child multiparts their own unique boundary so no collisions
            set_boundary(p, new_boundary("PART"))

    return msg, {
        "new_message_id": new_mid,
        "new_conversation_id": new_conv,
        "new_boundary": new_bnd
    }

# ---------- CLI ----------

def main():
    ap = argparse.ArgumentParser(description="Duplicate a sample EML into many unique EMLs.")
    ap.add_argument("--input", required=True, help="Path to the source sample .eml (raw).")
    ap.add_argument("--count", type=int, required=True, help="How many output files to generate.")
    ap.add_argument("--out", required=True, help="Output folder.")
    ap.add_argument("--domain", default="example.test", help="Domain to use for new Message-IDs.")
    args = ap.parse_args()

    with open(args.input, "rb") as f:
        template_bytes = f.read()

    os.makedirs(args.out, exist_ok=True)
    manifest_path = os.path.join(args.out, "manifest.csv")
    with open(manifest_path, "w", encoding="utf-8", newline="") as mf:
        mf.write("filename,new_message_id,new_conversation_id,new_boundary\n")

        for i in range(args.count):
            msg, meta = generate_variant_from_template(template_bytes, domain_for_msgid=args.domain)

            # Decide output filename: use conversation id + timestamp for uniqueness
            ts = now_compact_ts()
            out_name = f"{meta['new_conversation_id']}_{ts}.eml"
            out_path = os.path.join(args.out, out_name)

            write_eml(msg, out_path)
            mf.write(f"{out_name},{meta['new_message_id']},{meta['new_conversation_id']},{meta['new_boundary']}\n")

    print(f"✅ Generated {args.count} EML(s) into: {args.out}")
    print(f"   Manifest: {manifest_path}")

if __name__ == "__main__":
    main()

Enter fullscreen mode Exit fullscreen mode

python gen_eml_batch.py --input sample.eml --count 100 --out out/batch_0100
Parses the original (non-decoded) .eml.

Generates per-file new ConversationID (GUID), new Message-ID, and new MIME boundary.

Renames attachment filenames if they embed the old ConversationID and/or a timestamp in parentheses.

Replaces tokens inside HTML/XML/text parts (so duplicates won’t be flagged as identical).

Keeps binary attachments unchanged, preserving base64.

Writes output with CRLF line endings & a manifest.csv.

# add 
from email import encoders

def replace_text_payload(part, replace_map, keep_encoding=True):
    """
    Decode text payload, apply replacements, and re-encode.
    If the original CTE was base64, re-apply base64 WITHOUT creating duplicate
    'Content-Transfer-Encoding' headers.
    """
    orig_cte = (part.get("Content-Transfer-Encoding") or "").lower()
    charset = part.get_content_charset() or "utf-8"

    raw = part.get_payload(decode=True) or b""
    try:
        text = raw.decode(charset, errors="replace")
    except Exception:
        text = raw.decode("utf-8", errors="replace")

    # do replacements
    for old, new in replace_map.items():
        if old:
            text = text.replace(old, new)

    # 1) set_payload first (this may add a CTE like 'quoted-printable' or '8bit')
    part.set_payload(text, charset=charset)

    # 2) now remove any CTE header that set_payload just created
    if part["Content-Transfer-Encoding"]:
        del part["Content-Transfer-Encoding"]

    # 3) re-apply original encoding if it was base64
    if keep_encoding and orig_cte == "base64":
        encoders.encode_base64(part)
    # else leave it with no CTE; the library will choose a suitable one on output

Enter fullscreen mode Exit fullscreen mode

Top comments (0)