#!/usr/bin/env python3
import argparse, os, re, uuid, datetime
from email import policy
from email.parser import BytesParser
from email.generator import BytesGenerator
from email.message import EmailMessage
from email.headerregistry import Address
from io import BytesIO
# ---------- helpers ----------
def new_guid():
    """Return a freshly generated random UUID (version 4) as a canonical string."""
    fresh = uuid.uuid4()
    return str(fresh)
def new_message_id(domain="example.test"):
    """Create a unique Message-ID whose right-hand side is ``domain``.

    The value is produced by ``email.utils.make_msgid`` and includes the
    surrounding angle brackets (``<...@domain>``). Pass your organisation's
    domain if desired.
    """
    from email.utils import make_msgid as _make_msgid

    message_id = _make_msgid(domain=domain)
    return message_id
def new_boundary(tag="MESSAGE_ID"):
    """Build a MIME boundary string of the form ``__<tag>__<10 hex chars>``.

    The hex suffix is the first ten characters of a random UUID, so each call
    yields a different boundary. Customise ``tag`` to change the visible prefix.
    """
    suffix = uuid.uuid4().hex[:10]
    return "__{}__{}".format(tag, suffix)
def now_compact_ts():
    """Return the current UTC time as a 17-digit ``yyyyMMddHHmmssfff`` string.

    Example: ``20250911104117810`` (the last three digits are milliseconds).
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in UTC
    # carries the same wall-clock fields, so the formatted output is unchanged.
    now = datetime.datetime.now(datetime.timezone.utc)
    millis = now.microsecond // 1000
    return now.strftime("%Y%m%d%H%M%S") + f"{millis:03d}"
def get_boundary_from_header(msg):
    """Return the MIME boundary declared on ``msg``, or ``None`` if there is none.

    The message's own ``get_boundary()`` is tried first; when it yields nothing,
    the raw ``Content-Type`` header value is searched with a regular expression
    as a fallback.
    """
    boundary = msg.get_boundary()
    if not boundary:
        header_value = msg.get("Content-Type", "")
        found = re.search(r'boundary="?(.*?)"?(\s*;|$)', header_value, re.I)
        boundary = found.group(1) if found else None
    return boundary
def set_boundary(msg: EmailMessage, boundary: str):
    """Set ``boundary`` on ``msg`` if, and only if, it is a multipart container.

    Args:
        msg: The message or sub-part to update in place.
        boundary: The new boundary string for the ``Content-Type`` header.

    Non-multipart messages are left untouched.
    """
    # is_multipart() is also true for message/* containers (e.g. message/rfc822),
    # whose payload is a list of sub-messages; those must not be given a
    # boundary parameter, so the content maintype is checked as well.
    if msg.is_multipart() and msg.get_content_maintype() == "multipart":
        msg.set_boundary(boundary)
def replace_text_payload(part, replace_map, keep_encoding=True):
    """Decode a text-like part, apply string replacements, and store it re-encoded.

    Args:
        part: A non-multipart message part; it is modified in place.
        replace_map: Mapping of ``old -> new`` substrings. Empty ``old`` keys are
            skipped. Replacements are applied in the mapping's iteration order.
        keep_encoding: When true and the part was ``base64`` or
            ``quoted-printable``, the new payload is encoded the same way.
            Otherwise the payload is set via ``set_payload(text, charset=...)``.
    """
    from email import encoders  # local import: not among the file's top-level imports

    orig_cte = (part.get("Content-Transfer-Encoding") or "").strip().lower()
    charset = part.get_content_charset() or "utf-8"
    raw = part.get_payload(decode=True) or b""
    try:
        text = raw.decode(charset, errors="replace")
    except LookupError:
        # Unknown charset label: fall back to UTF-8 and keep using it below so
        # the re-encoding step does not fail on the same bad label.
        charset = "utf-8"
        text = raw.decode(charset, errors="replace")

    for old, new in replace_map.items():
        if old:
            text = text.replace(old, new)

    was_encoded = orig_cte in ("base64", "quoted-printable")
    if was_encoded:
        # The old header describes the old payload. Leaving it in place makes
        # set_payload(..., charset=...) skip encoding, and assigning the header
        # again would create a duplicate (or raise under policy.default).
        del part["Content-Transfer-Encoding"]

    if keep_encoding and was_encoded:
        part.set_param("charset", charset)
        # Store the raw bytes first; the encoders below read them back,
        # encode them, and add exactly one Content-Transfer-Encoding header.
        part.set_payload(text.encode(charset, errors="replace"))
        if orig_cte == "base64":
            encoders.encode_base64(part)
        else:
            encoders.encode_quopri(part)
    else:
        part.set_payload(text, charset=charset)
def rename_attachment_headers(part, new_filename):
    """Point a part's filename parameters at ``new_filename``.

    Args:
        part: The message part to modify in place.
        new_filename: The replacement file name.

    ``filename=`` on ``Content-Disposition`` is updated only when that header is
    present; ``name=`` on ``Content-Type`` is always set.
    """
    if part.get("Content-Disposition"):
        # replace=True rewrites the header where it is instead of moving it
        # to the end of the header list.
        part.set_param("filename", new_filename, header="Content-Disposition", replace=True)
    # In-place replacement needs an existing header (it raises KeyError
    # otherwise), so fall back to the default behaviour when it is absent.
    part.set_param(
        "name",
        new_filename,
        header="Content-Type",
        replace="Content-Type" in part,
    )
def walk_leaf_parts(msg):
    """Yield each part reached by ``msg.walk()`` that is not itself a container."""
    for candidate in msg.walk():
        if candidate.is_multipart():
            continue
        yield candidate
def collect_tokens(msg):
    """Gather the identifying values of ``msg`` that are to be replaced.

    Returns a dict with these keys:
      - ``"Message-ID"``: the stripped Message-ID header ("" if absent).
      - ``"Message-ID-unbracketed"``: the same value without ``<`` ``>``;
        present only when the Message-ID is wrapped in angle brackets.
      - ``"ConversationID"``: the stripped ``X-Header-ConversationID`` header
        ("" if absent).
      - ``"Boundary"``: the message's boundary string ("" if none).
      - ``"Filenames"``: list of the non-empty filenames found on leaf parts.
    """
    message_id = (msg.get("Message-ID") or "").strip()
    collected = {"Message-ID": message_id}
    if message_id.startswith("<") and message_id.endswith(">"):
        collected["Message-ID-unbracketed"] = message_id[1:-1]

    collected["ConversationID"] = (msg.get("X-Header-ConversationID") or "").strip()
    collected["Boundary"] = get_boundary_from_header(msg) or ""

    # Filenames declared on any leaf part, in walk order.
    declared = (leaf.get_filename() for leaf in walk_leaf_parts(msg))
    collected["Filenames"] = [name for name in declared if name]
    return collected
def write_eml(msg, path):
    """Serialize ``msg`` to ``path`` using the SMTP policy (CRLF line endings).

    Args:
        msg: The message object to flatten.
        path: Destination file path. Missing parent directories are created.
    """
    parent = os.path.dirname(path)
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create directories when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    # Flatten into memory first so a serialization error cannot leave a
    # half-written file on disk.
    buffer = BytesIO()
    BytesGenerator(buffer, policy=policy.SMTP).flatten(msg)
    with open(path, "wb") as fh:
        fh.write(buffer.getvalue())
# ---------- core generation ----------
def generate_variant_from_template(template_bytes: bytes, domain_for_msgid="example.test"):
    """Parse a template EML and return a copy with fresh identifying values.

    Args:
        template_bytes: Raw bytes of the template message.
        domain_for_msgid: Domain used for the new Message-ID.

    Returns:
        A ``(msg, meta)`` tuple: the modified message object and a dict with
        the keys ``new_message_id``, ``new_conversation_id`` and ``new_boundary``.
    """
    msg = BytesParser(policy=policy.default).parsebytes(template_bytes)
    toks = collect_tokens(msg)
    old_conv = toks.get("ConversationID")

    # Fresh values for this variant.
    new_conv = new_guid()
    new_mid = new_message_id(domain_for_msgid)  # bracketed: <...>
    new_mid_unbr = new_mid[1:-1]
    new_bnd = new_boundary("MESSAGE_ID")
    new_ts = now_compact_ts()

    # old -> new substitutions applied inside text bodies; insertion order is
    # the order in which replacements are performed.
    replace_map = {}
    for token_key, fresh_value in (
        ("ConversationID", new_conv),
        ("Message-ID", new_mid),
        ("Message-ID-unbracketed", new_mid_unbr),
        ("Boundary", new_bnd),
    ):
        stale_value = toks.get(token_key)
        if stale_value:
            replace_map[stale_value] = fresh_value

    # Top-level headers and boundary.
    if old_conv:
        msg.replace_header("X-Header-ConversationID", new_conv)
    if msg.get("Message-ID"):
        msg.replace_header("Message-ID", new_mid)
    set_boundary(msg, new_bnd)

    for part in walk_leaf_parts(msg):
        ctype = (part.get_content_type() or "").lower()

        # Rename the attachment when its name embeds the old conversation id
        # or a parenthesised 14-20 digit run (treated as a timestamp).
        fn = part.get_filename()
        if fn:
            new_fn = fn
            if old_conv:
                new_fn = new_fn.replace(old_conv, new_conv)
            stamp = re.search(r"\(\d{14,20}\)", new_fn)
            if stamp:
                new_fn = new_fn.replace(stamp.group(0), f"({new_ts})")
            if new_fn != fn:
                rename_attachment_headers(part, new_fn)

        # Text-like bodies get the token substitutions; everything else is
        # left as it is.
        is_textual = ctype.startswith("text/") or ctype in (
            "application/xml",
            "text/xml",
            "application/json",
        )
        if is_textual:
            replace_text_payload(part, replace_map, keep_encoding=True)

    # Nested multipart containers each get their own fresh boundary.
    nested = [p for p in msg.walk() if p.is_multipart() and p is not msg]
    for container in nested:
        set_boundary(container, new_boundary("PART"))

    meta = {
        "new_message_id": new_mid,
        "new_conversation_id": new_conv,
        "new_boundary": new_bnd,
    }
    return msg, meta
# ---------- CLI ----------
def main():
    """Command-line entry point: write ``--count`` variants of ``--input`` into ``--out``.

    Alongside the generated ``.eml`` files a ``manifest.csv`` is written in the
    output folder, with one row per generated file.
    """
    parser = argparse.ArgumentParser(description="Duplicate a sample EML into many unique EMLs.")
    parser.add_argument("--input", required=True, help="Path to the source sample .eml (raw).")
    parser.add_argument("--count", type=int, required=True, help="How many output files to generate.")
    parser.add_argument("--out", required=True, help="Output folder.")
    parser.add_argument("--domain", default="example.test", help="Domain to use for new Message-IDs.")
    opts = parser.parse_args()

    with open(opts.input, "rb") as src:
        template_bytes = src.read()

    os.makedirs(opts.out, exist_ok=True)
    manifest_path = os.path.join(opts.out, "manifest.csv")
    with open(manifest_path, "w", encoding="utf-8", newline="") as manifest:
        manifest.write("filename,new_message_id,new_conversation_id,new_boundary\n")
        for _ in range(opts.count):
            msg, meta = generate_variant_from_template(template_bytes, domain_for_msgid=opts.domain)
            # Output name: new conversation id plus a timestamp.
            stamp = now_compact_ts()
            out_name = f"{meta['new_conversation_id']}_{stamp}.eml"
            write_eml(msg, os.path.join(opts.out, out_name))
            manifest.write(f"{out_name},{meta['new_message_id']},{meta['new_conversation_id']},{meta['new_boundary']}\n")

    print(f"✅ Generated {opts.count} EML(s) into: {opts.out}")
    print(f" Manifest: {manifest_path}")
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
Example usage: python gen_eml_batch.py --input sample.eml --count 100 --out out/batch_0100
Parses the original .eml from its raw bytes (the file is not decoded beforehand).
Generates per-file new ConversationID (GUID), new Message-ID, and new MIME boundary.
Renames attachment filenames if they embed the old ConversationID and/or a timestamp in parentheses.
Replaces tokens inside HTML/XML/text parts (so duplicates won’t be flagged as identical).
Keeps binary attachments unchanged, preserving base64.
Writes output with CRLF line endings & a manifest.csv.
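# Addition: import used by the revised replace_text_payload defined below, which supersedes the earlier definition of the same name.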
from email import encoders
def replace_text_payload(part, replace_map, keep_encoding=True):
    """Decode a text part, apply string replacements, and store the result.

    Args:
        part: A non-multipart message part; it is modified in place.
        replace_map: Mapping of ``old -> new`` substrings. Empty ``old`` keys are
            skipped. Replacements are applied in the mapping's iteration order.
        keep_encoding: When true and the original Content-Transfer-Encoding was
            ``base64``, the new payload is base64-encoded again, with a single
            ``Content-Transfer-Encoding`` header.

    In every other case the part is left with its raw payload and no
    Content-Transfer-Encoding header, so an encoding can be chosen at
    serialization time.
    """
    orig_cte = (part.get("Content-Transfer-Encoding") or "").strip().lower()
    charset = part.get_content_charset() or "utf-8"
    raw = part.get_payload(decode=True) or b""
    try:
        text = raw.decode(charset, errors="replace")
    except LookupError:
        # Unknown charset label: fall back to UTF-8 and declare it below.
        charset = "utf-8"
        text = raw.decode(charset, errors="replace")

    for old, new in replace_map.items():
        if old:
            text = text.replace(old, new)

    # Remove the old header BEFORE storing the payload. Doing it afterwards is
    # unsafe: for a part that had no such header, set_payload(..., charset=...)
    # encodes the body and adds the header, and deleting that header then
    # leaves an encoded body that is no longer labelled as encoded.
    del part["Content-Transfer-Encoding"]
    part.set_param("charset", charset)
    # Bytes are stored as-is (no implicit transfer encoding is applied).
    part.set_payload(text.encode(charset, errors="replace"))

    if keep_encoding and orig_cte == "base64":
        # Encodes the stored bytes and adds the base64 header exactly once.
        encoders.encode_base64(part)
Top comments (0)