emlgen/
    __init__.py
    cli.py        # argparse CLI
    io_utils.py   # read/write EML, CRLF-safe
    ids.py        # new GUID/Message-ID/boundary/timestamp
    transform.py  # token map, replacements, text injection
emlgen/io_utils.py
from email import policy
from email.parser import BytesParser
from email.generator import BytesGenerator
from io import BytesIO
import os
def read_eml(path: str):
    """Parse the .eml file at *path* and return the resulting message object.

    The file is read in binary mode and parsed by ``BytesParser`` configured
    with ``policy.default``.
    """
    parser = BytesParser(policy=policy.default)
    with open(path, "rb") as handle:
        message = parser.parse(handle)
    return message
def write_eml(msg, path: str):
    """Serialize *msg* to the file at *path* with CRLF line endings.

    Args:
        msg: The message object to flatten.
        path: Destination file. Its parent directory is created when the
            path names one; a bare filename is written to the current
            working directory.
    """
    parent = os.path.dirname(path)
    # dirname() is "" for a bare filename and os.makedirs("") raises
    # FileNotFoundError, so only create a directory when one is named.
    if parent:
        os.makedirs(parent, exist_ok=True)
    # Flatten into memory before opening the target, so the file is only
    # opened once serialization has succeeded.
    bio = BytesIO()
    BytesGenerator(bio, policy=policy.SMTP).flatten(msg)  # policy.SMTP -> CRLF
    with open(path, "wb") as f:
        f.write(bio.getvalue())
emlgen/ids.py
import uuid, datetime
from email.utils import make_msgid
def new_guid():
    """Return a newly generated random UUID (``uuid.uuid4``) as a string."""
    fresh = uuid.uuid4()
    return str(fresh)
def new_message_id(domain="example.test"):
    """Return a new Message-ID for *domain*, angle brackets included.

    The value comes straight from ``email.utils.make_msgid``.
    """
    message_id = make_msgid(domain=domain)
    return message_id
def boundary(tag="MESSAGE_ID"):
    """Return a boundary string of the form ``__<tag>__<10 hex chars>``.

    Example shape: ``__MESSAGE_ID__abc123def0``. The suffix is the first ten
    hex digits of a fresh ``uuid.uuid4()``.
    """
    suffix = uuid.uuid4().hex[:10]
    return "__{}__{}".format(tag, suffix)
def ts_compact():
    """Return the current UTC time as a 17-digit ``YYYYmmddHHMMSSmmm`` string.

    The last three digits are milliseconds, zero-padded.
    """
    # datetime.utcnow() is deprecated (Python 3.12+); an aware "now" in UTC
    # formats to exactly the same digits.
    now = datetime.datetime.now(datetime.timezone.utc)
    millis = now.microsecond // 1000  # integer division, no float round-trip
    return now.strftime("%Y%m%d%H%M%S") + f"{millis:03d}"
emlgen/transform.py
import re
from email import encoders
def get_boundary(msg):
    """Return the MIME boundary declared on *msg*, or ``None`` if there is none.

    ``msg.get_boundary()`` is tried first. When it yields nothing, the raw
    Content-Type header text is scanned for a ``boundary=`` parameter as a
    fallback.

    Args:
        msg: A message object exposing ``get_boundary()`` and ``get()``.

    Returns:
        The boundary string, or ``None`` when neither lookup finds one.
    """
    declared = msg.get_boundary()
    if declared:
        return declared
    header_text = str(msg.get("Content-Type", ""))
    # Accept both boundary="quoted value" and boundary=bare-token. The earlier
    # pattern required a closing quote, so unquoted values never matched.
    found = re.search(
        r'\bboundary\s*=\s*(?:"([^"]*)"|([^\s;"]+))', header_text, re.I
    )
    if not found:
        return None
    quoted, bare = found.group(1), found.group(2)
    return quoted if quoted is not None else bare
def set_boundary(mp, val):
    """Set the boundary of *mp* to *val*; do nothing if *mp* is not multipart."""
    if not mp.is_multipart():
        return
    mp.set_boundary(val)
def walk_leaf_parts(msg):
    """Yield each non-multipart part of *msg*, in ``msg.walk()`` order."""
    yield from (part for part in msg.walk() if not part.is_multipart())
def collect_tokens(msg):
    """Collect the identifying tokens currently present in *msg*.

    Returns a dict with these keys:
        "Message-ID":      stripped Message-ID header, or "" if absent.
        "Message-ID-unbr": the same value without its surrounding ``<`` ``>``,
                           or "" when the header is not bracketed.
        "ConversationID":  stripped X-Header-ConversationID header, or "".
        "Boundary":        result of ``get_boundary(msg)``, or "".
        "Filenames":       non-empty ``get_filename()`` values of the leaf parts.
    """
    message_id = (msg.get("Message-ID") or "").strip()
    bracketed = message_id.startswith("<") and message_id.endswith(">")

    attachment_names = []
    for leaf in walk_leaf_parts(msg):
        name = leaf.get_filename()
        if name:
            attachment_names.append(name)

    return {
        "Message-ID": message_id,
        "Message-ID-unbr": message_id[1:-1] if bracketed else "",
        "ConversationID": (msg.get("X-Header-ConversationID") or "").strip(),
        "Boundary": get_boundary(msg) or "",
        "Filenames": attachment_names,
    }
def rename_attachment_headers(part, new_filename):
    """Point the filename-bearing header parameters of *part* at *new_filename*.

    The ``filename`` parameter of Content-Disposition is updated only when
    that header is present and non-empty; the ``name`` parameter of
    Content-Type is always set.
    """
    has_disposition = bool(part.get("Content-Disposition"))
    if has_disposition:
        part.set_param("filename", new_filename, header="Content-Disposition")
    part.set_param("name", new_filename, header="Content-Type")
def replace_text_payload(part, replace_map, keep_b64=True):
    """Apply the substitutions in *replace_map* to the decoded text of *part*.

    Args:
        part: A non-multipart message part holding text.
        replace_map: Mapping of old substring -> new substring. Entries are
            applied in the mapping's iteration order; empty keys are skipped.
        keep_b64: When true and the part was base64-encoded on entry, the
            rewritten body is base64-encoded again.

    After the call the part always carries a Content-Transfer-Encoding header
    that matches its body: ``base64`` in the case above, otherwise ``7bit`` or
    ``8bit`` depending on the content.
    """
    orig_cte = (part.get("Content-Transfer-Encoding") or "").lower()
    charset = part.get_content_charset() or "utf-8"
    raw = part.get_payload(decode=True) or b""
    try:
        text = raw.decode(charset, errors="replace")
    except LookupError:
        # Unknown charset label: read AND re-write as UTF-8, so the declared
        # charset matches the bytes stored below.
        charset = "utf-8"
        text = raw.decode(charset, errors="replace")

    for old, new in replace_map.items():
        if old:
            text = text.replace(old, new)

    # Drop the stale encoding header first (deleting a missing header is a
    # no-op) and store the body as already-encoded bytes. Going through
    # set_payload(text, charset=...) would silently base64/QP-encode the body
    # of a part that had no header and add a header of its own, which must
    # not then be deleted or re-encoded on top.
    del part["Content-Transfer-Encoding"]
    part.set_payload(text.encode(charset, errors="replace"))
    part.set_param("charset", charset)

    if keep_b64 and orig_cte == "base64":
        encoders.encode_base64(part)
    else:
        # Declare the body as-is: 7bit for pure ASCII, 8bit otherwise.
        encoders.encode_7or8bit(part)
def _decode_text_part(part):
    """Return ``(text, charset)`` for *part*, falling back to UTF-8 when the charset label is unknown."""
    raw = part.get_payload(decode=True) or b""
    charset = part.get_content_charset() or "utf-8"
    try:
        return raw.decode(charset, errors="replace"), charset
    except LookupError:
        return raw.decode("utf-8", errors="replace"), "utf-8"


def _insert_html(html, text, position, once_marker):
    """Return *html* with *text* and the marker added at the top, or before the last ``</body>``."""
    if position == "top":
        return once_marker + text + html
    last_close = None
    for last_close in re.finditer(r"</body\s*>", html, re.I):
        pass  # keep only the final match
    if last_close is None:
        return html + "\n" + text + "\n" + once_marker
    cut = last_close.start()
    # Plain slicing: *text* is inserted literally, so backslashes or "\1" in
    # it are never interpreted as regex replacement escapes.
    return html[:cut] + text + "\n" + once_marker + html[cut:]


def _insert_plain(body, text, position, once_marker):
    """Return plain-text *body* with *text* and the marker added at the top or the end."""
    if position == "top":
        return once_marker + "\n" + text + "\n\n" + body
    return body + "\n\n" + text + "\n" + once_marker


def _store_as_base64(part, body, charset):
    """Replace the payload of *part* with *body*, base64-encoded exactly once."""
    # Remove the old header and store raw bytes first. Otherwise the email
    # package may encode the body itself and encode_base64 would then wrap
    # already-encoded data a second time.
    del part["Content-Transfer-Encoding"]
    part.set_payload(body.encode(charset, errors="replace"))
    part.set_param("charset", charset)
    encoders.encode_base64(part)


def inject_text(msg, text, where="html-end", once_marker="<!--emlgen-injected-->"):
    """
    Adds `text` into text/html or text/plain part.

    where: 'html-top' | 'html-end' | 'text-top' | 'text-end'
        'html-*' touches only text/html parts, 'text-*' only text/plain
        parts; any other value leaves the message unchanged.
        'html-end' inserts before the last </body> if present, else appends.

    Adds a marker to avoid duplicate injections on re-runs: a part whose
    text already contains `once_marker` is skipped. Modified parts are
    stored base64-encoded.
    """
    family, _, position = where.partition("-")
    wanted_ctype = {"html": "text/html", "text": "text/plain"}.get(family)
    if wanted_ctype is None or position not in ("top", "end"):
        return

    for part in msg.walk():
        if part.is_multipart():
            continue
        if (part.get_content_type() or "").lower() != wanted_ctype:
            continue  # leave parts of the other type byte-for-byte untouched
        body, charset = _decode_text_part(part)
        if once_marker in body:
            continue
        if wanted_ctype == "text/html":
            body = _insert_html(body, text, position, once_marker)
        else:
            body = _insert_plain(body, text, position, once_marker)
        _store_as_base64(part, body, charset)
emlgen/cli.py
import argparse, os, re
from .io_utils import read_eml, write_eml
from .ids import new_guid, new_message_id, boundary as new_boundary, ts_compact
from .transform import collect_tokens, set_boundary, walk_leaf_parts, \
rename_attachment_headers, replace_text_payload, inject_text
def build_replace_map(old_tokens, new_mid, new_mid_unbr, new_conv, new_bnd):
    """Map each non-empty old token value to its replacement.

    Entries are inserted in a fixed order: conversation id, bracketed
    Message-ID, unbracketed Message-ID, boundary. Tokens that are missing or
    empty in *old_tokens* contribute no entry.
    """
    substitutions = (
        ("ConversationID", new_conv),
        ("Message-ID", new_mid),
        ("Message-ID-unbr", new_mid_unbr),
        ("Boundary", new_bnd),
    )
    mapping = {}
    for token_name, fresh_value in substitutions:
        stale_value = old_tokens.get(token_name)
        if stale_value:
            mapping[stale_value] = fresh_value
    return mapping
def generate_one(template_msg, domain, add_text=None, add_where="html-end"):
    """Produce one message from *template_msg* with fresh identifiers.

    Args:
        template_msg: The parsed template message. It is not modified; all
            changes are made to a copy.
        domain: Domain used for the new Message-ID.
        add_text: Optional text to inject into the body (see ``inject_text``).
        add_where: Placement passed through to ``inject_text``.

    Returns:
        A ``(msg, meta)`` tuple where ``meta`` holds the new conversation id
        (``"conv"``), Message-ID (``"mid"``) and boundary (``"bnd"``).
    """
    import copy  # function-scope so this edit does not depend on the module's import block

    # Message objects have no clone() of their own, so without an explicit
    # copy every call would rewrite the caller's template in place.
    if hasattr(template_msg, "clone"):
        msg = template_msg.clone()
    else:
        msg = copy.deepcopy(template_msg)

    toks = collect_tokens(msg)
    new_conv = new_guid()
    new_mid = new_message_id(domain)
    new_mid_unbr = new_mid[1:-1]
    new_bnd = new_boundary("MESSAGE_ID")
    new_ts = ts_compact()

    # headers
    old_conv = toks.get("ConversationID")
    if old_conv:
        msg.replace_header("X-Header-ConversationID", new_conv)
    if msg.get("Message-ID"):
        msg.replace_header("Message-ID", new_mid)
    set_boundary(msg, new_bnd)

    # parts
    rmap = build_replace_map(toks, new_mid, new_mid_unbr, new_conv, new_bnd)
    for p in walk_leaf_parts(msg):
        ctype = (p.get_content_type() or "").lower()
        fn = p.get_filename()
        if fn:
            nf = fn
            if old_conv and old_conv in nf:
                nf = nf.replace(old_conv, new_conv)
            # A parenthesised run of 14-20 digits in the name is treated as
            # an embedded timestamp and swapped for the new one.
            m = re.search(r"\(\d{14,20}\)", nf)
            if m:
                nf = nf.replace(m.group(0), f"({new_ts})")
            if nf != fn:
                rename_attachment_headers(p, nf)
        # "text/xml" is already covered by the text/ prefix test.
        if ctype.startswith("text/") or ctype in ("application/xml", "application/json"):
            replace_text_payload(p, rmap, keep_b64=True)

    # optional injection
    if add_text:
        inject_text(msg, add_text, where=add_where)

    return msg, {"conv": new_conv, "mid": new_mid, "bnd": new_bnd}
def main():
    """Command-line entry point: generate ``--count`` messages from one template.

    Reads the ``--input`` .eml, writes each generated message into ``--out``
    and records it in ``manifest.csv`` in the same directory (columns:
    filename, conversation_id, message_id, boundary).
    """
    import csv  # function-scope so this edit does not depend on the module's import block

    ap = argparse.ArgumentParser(prog="emlgen", description="EML batch generator")
    ap.add_argument("--input", required=True, help="source .eml")
    ap.add_argument("--count", required=True, type=int)
    ap.add_argument("--out", required=True)
    ap.add_argument("--domain", default="example.test")
    ap.add_argument("--add-text", default=None, help="Text to inject into email body")
    ap.add_argument("--add-where", default="html-end",
                    choices=["html-top", "html-end", "text-top", "text-end"])
    args = ap.parse_args()

    tmpl = read_eml(args.input)
    os.makedirs(args.out, exist_ok=True)
    mpath = os.path.join(args.out, "manifest.csv")
    with open(mpath, "w", encoding="utf-8") as mf:
        # csv.writer quotes any field containing a comma or quote, which the
        # previous hand-built lines did not. lineterminator="\n" together
        # with the default text-mode newline handling keeps the line endings
        # the same as before.
        manifest = csv.writer(mf, lineterminator="\n")
        manifest.writerow(["filename", "conversation_id", "message_id", "boundary"])
        for _ in range(args.count):
            msg, meta = generate_one(tmpl, args.domain, args.add_text, args.add_where)
            name = f"{meta['conv']}_{ts_compact()}.eml"
            write_eml(msg, os.path.join(args.out, name))
            manifest.writerow([name, meta["conv"], meta["mid"], meta["bnd"]])
    print(f"Done. Output: {args.out}\nManifest: {mpath}")


if __name__ == "__main__":
    main()
HTML body (append before </body>)
python -m emlgen.cli --input sample.eml --count 50 --out out/batch_0050 \
--add-text "<p style='color:#888'>[TEST DATA] Load-run 2025-09-24</p>" \
--add-where html-end
TEXT body (append at end)
python -m emlgen.cli --input sample.eml --count 50 --out out/batch_0050_txt \
--add-text "[TEST DATA] Load-run 2025-09-24" \
--add-where text-end
Top comments (0)