This Python tool automates data collection from the BUFFALO WSR-3200AX4S router’s web admin interface using an async aiohttp client. It loads credentials and the base URL from a TOML config, performs a login flow, fetches operational data, and then logs out, while validating responses to detect abnormal pages or unexpected content types.
A key step is extracting an `httoken` embedded in the base64 data URI of an `<img title="spacer">` element, then using that token to request `/cgi/cgi_info.js?_tn=...`. Every HTTP response is saved to disk with a timestamped filename plus a companion JSON file containing request/response headers and body metadata, for reproducibility and debugging.
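The fixed `[78:]` slice in the extractor implies a layout assumption: 22 characters of `data:image/gif;base64,` prefix followed by 56 base64 characters (i.e. 42 bytes of GIF data, a multiple of 3, so the slice stays aligned to base64 groups), after which the token's own base64 begins. A minimal sketch of that assumption, with an invented token and a zero-filled stand-in for the GIF bytes:

```python
import base64

gif_stub = b"\x00" * 42     # stand-in for the assumed 42-byte GIF prefix
token = "0123456789abcdef"  # made-up token value
src = "data:image/gif;base64," + base64.b64encode(gif_stub + token.encode()).decode()

# 22 prefix chars + 56 base64 chars (42 bytes / 3 * 4) = offset 78
assert base64.b64decode(src[78:]).decode() == token
```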
- Async GET/POST with per-request `Referer` management
- MD5-hashes the password for form submission
- Verifies HTML `<title>` expectations and the JavaScript MIME type
- Parses `addCfg(...)` entries to extract WAN DHCP DNS tokens (see the sketch after this list)
- Produces an audit-friendly response archive under `outputs/`
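For context, an `addCfg(...)` line in `cgi_info.js` looks roughly like the invented sample below (`192.0.2.x` are documentation addresses), and the parser's trick is that the argument list is valid JSON once wrapped in brackets:

```python
import json

# Invented sample line in the style the parser expects:
line = 'addCfg("wan_dhcp_dns", "ARC_WAN_0_IP4_DNS", "192.0.2.1 192.0.2.2");'
args_str = line[len("addCfg("):line.find(");")]
args = json.loads("[" + args_str + "]")  # ['wan_dhcp_dns', 'ARC_WAN_0_IP4_DNS', '192.0.2.1 192.0.2.2']
tokens = args[2].split()                 # ['192.0.2.1', '192.0.2.2']
```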
config.toml
[client]
base_url = "http://192.168.1.1:80"
name = "admin"
password = "pass"
main.py
import asyncio
import base64
import hashlib
import json
import logging
import os
import re
from dataclasses import dataclass, replace
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode, urljoin, urlparse
import aiohttp
from lxml import html as lxml_html
# TOML loader: Python 3.11+ -> tomllib, else -> tomli
try:
    import tomllib  # type: ignore
except Exception:  # pragma: no cover
    tomllib = None  # type: ignore
    import tomli  # type: ignore
# ----------------------------
# Configuration
# ----------------------------
@dataclass(frozen=True)
class Config:
    base_url: str = "http://192.168.1.1:80"
    name: str = "admin"
    password: str = "pass"
    cookie: str = "lang=8; mobile=false; url="  # As specified (only "url=")

    @staticmethod
    def _load_toml(path: Path) -> Dict[str, Any]:
        if not path.exists():
            raise FileNotFoundError(f"TOML config file not found: {path}")
        data = path.read_bytes()
        if tomllib is not None:
            return tomllib.loads(data.decode("utf-8"))
        return tomli.loads(data.decode("utf-8"))

    @classmethod
    def from_toml(cls, path: Path) -> "Config":
        """
        Load base_url/name/password from TOML:
            [client]
            base_url = "..."
            name = "..."
            password = "..."
        """
        raw = cls._load_toml(path)
        client = raw.get("client", {})
        if not isinstance(client, dict):
            raise ValueError("Invalid TOML format: [client] must be a table")
        # Optional overrides with defaults
        base_url = client.get("base_url", cls.base_url)
        name = client.get("name", cls.name)
        password = client.get("password", cls.password)
        # Basic validation
        if not isinstance(base_url, str) or not base_url:
            raise ValueError("TOML: client.base_url must be a non-empty string")
        if not isinstance(name, str) or not name:
            raise ValueError("TOML: client.name must be a non-empty string")
        if not isinstance(password, str) or not password:
            raise ValueError("TOML: client.password must be a non-empty string")
        return replace(cls(), base_url=base_url, name=name, password=password)
# ----------------------------
# Logging
# ----------------------------
def setup_logging() -> None:
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
    )

logger = logging.getLogger("client")
# ----------------------------
# HTML helpers
# ----------------------------
def extract_title(html_text: str) -> Optional[str]:
    """
    Extract <title>...</title> text from HTML.
    Returns None if not found or parsing fails.
    """
    try:
        doc = lxml_html.fromstring(html_text)
        titles = doc.xpath("//title")
        if not titles:
            return None
        title_text = titles[0].text_content()
        return title_text.strip() if title_text is not None else ""
    except Exception:
        return None

def assert_title_equals(html_text: str, expected: str, *, context: str) -> None:
    """
    Validate that the HTML <title> equals the expected string.
    """
    log = logging.getLogger("validator")
    title = extract_title(html_text)
    if title != expected:
        log.error(
            "Validation failed (%s): unexpected <title>. expected=%r actual=%r",
            context,
            expected,
            title,
        )
        raise ValueError(
            f"Validation failed ({context}): unexpected <title> (expected={expected!r}, actual={title!r})"
        )
    log.info("Validation passed (%s): <title> matched %r", context, expected)

def assert_title_absent(html_text: str, *, context: str) -> None:
    """
    Validate that the HTML contains no <title> tag.
    """
    log = logging.getLogger("validator")
    title = extract_title(html_text)
    if title is not None:
        log.error("Validation failed (%s): <title> tag must be absent but was present: %r", context, title)
        raise ValueError(f"Validation failed ({context}): <title> tag must be absent but was present")
    log.info("Validation passed (%s): <title> tag is absent", context)

def normalize_content_type(content_type: str) -> str:
    """
    Normalize a Content-Type by removing parameters and lowercasing.
    Example: 'application/x-javascript; charset=UTF-8' -> 'application/x-javascript'
    """
    return (content_type or "").split(";", 1)[0].strip().lower()

def assert_content_type_equals(content_type: str, expected: str, *, context: str) -> None:
    """
    Validate that the Content-Type matches the expected value (ignoring parameters).
    """
    log = logging.getLogger("validator")
    actual_norm = normalize_content_type(content_type)
    expected_norm = normalize_content_type(expected)
    if actual_norm != expected_norm:
        log.error(
            "Validation failed (%s): unexpected Content-Type. expected=%r actual=%r",
            context,
            expected_norm,
            actual_norm,
        )
        raise ValueError(
            f"Validation failed ({context}): unexpected Content-Type (expected={expected_norm!r}, actual={actual_norm!r})"
        )
    log.info("Validation passed (%s): Content-Type matched %r", context, expected_norm)
# ----------------------------
# httoken extraction
# ----------------------------
def get_httoken(html_text: str) -> str:
    """
    Input: HTML text
    Find:  <img title="spacer" src="data:image/gif;base64,..." border="0">
    Return: base64.b64decode(datauri[78:]).decode()
    """
    log = logging.getLogger("get_httoken")
    doc = lxml_html.fromstring(html_text)
    imgs = doc.xpath("//img[@title='spacer']")
    log.debug("Found %d img elements with title='spacer'", len(imgs))
    if not imgs:
        raise ValueError("No <img> tag with title='spacer' was found")
    target = None
    for img in imgs:
        src = img.get("src") or ""
        if src.startswith("data:image/gif;base64,"):
            target = img
            break
    if target is None:
        target = imgs[0]
    datauri = target.get("src") or ""
    log.debug("Selected img src length=%d, head=%r", len(datauri), datauri[:120])
    # As specified: try datauri[78:] first
    try:
        token = base64.b64decode(datauri[78:]).decode()
        log.debug("Decoded token length=%d (using datauri[78:])", len(token))
        return token
    except Exception as e:
        log.exception("Decode failed with datauri[78:]: %s", e)
    # Debug fallback (out of spec, but helpful for diagnosis)
    prefix = "data:image/gif;base64,"
    if datauri.startswith(prefix):
        token = base64.b64decode(datauri[len(prefix):]).decode()
        log.debug("Decoded token length=%d (fallback: strip data URI prefix)", len(token))
        return token
    raise ValueError("Failed to decode httoken")

def md5_hex(s: str) -> str:
    return hashlib.md5(s.encode()).hexdigest()
# ----------------------------
# Utility: safe filename
# ----------------------------
_SAFE_RE = re.compile(r"[^A-Za-z0-9._-]+")
def safe_filename(s: str, max_len: int = 120) -> str:
    s = _SAFE_RE.sub("_", s).strip("_")
    return (s[:max_len] if len(s) > max_len else s) or "root"
# ----------------------------
# Reproduce request body (form submit)
# ----------------------------
def build_request_body_bytes(data: Optional[Dict[str, Any]]) -> bytes:
    """
    Reproduce the encoded body that aiohttp would generate (for saving/debugging).
    Current behavior assumes a form (dict) and produces application/x-www-form-urlencoded.
    """
    if not data:
        return b""
    return urlencode(data, doseq=True).encode("utf-8")
# ----------------------------
# Save response (body + meta JSON)
# ----------------------------
class ResponseSaver:
    def __init__(self, out_dir: Path) -> None:
        self.out_dir = out_dir
        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.seq = 0
        self.log = logging.getLogger("saver")

    def _make_stem(self, method: str, url: str, status: int) -> str:
        self.seq += 1
        ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        p = urlparse(url)
        path_part = safe_filename(p.path.lstrip("/") or "root")
        query_part = safe_filename(p.query) if p.query else ""
        if query_part:
            path_part = f"{path_part}__q_{query_part}"
        return f"{ts}_{self.seq:04d}_{method.upper()}_{status}_{path_part}"

    def _guess_ext(self, content_type: str) -> str:
        ct = (content_type or "").lower()
        if "text/html" in ct:
            return ".html"
        if "application/json" in ct or ct.endswith("+json"):
            return ".json"
        if ct.startswith("text/"):
            return ".txt"
        return ".bin"

    def save(
        self,
        *,
        method: str,
        url: str,
        status: int,
        reason: str,
        content_type: str,
        # request
        request_headers: Any,
        request_body: bytes,
        request_data: Optional[Dict[str, Any]],
        # response
        response_headers: Any,
        response_raw_headers: Any,
        response_body: bytes,
    ) -> Dict[str, Path]:
        stem = self._make_stem(method, url, status)
        body_ext = self._guess_ext(content_type)
        body_path = self.out_dir / f"{stem}{body_ext}"
        meta_path = self.out_dir / f"{stem}.headers.json"
        body_path.write_bytes(response_body)

        req_headers_map = dict(request_headers)
        req_headers_list = [[k, v] for k, v in req_headers_map.items()]
        req_body_text: Optional[str] = None
        if request_body:
            try:
                req_body_text = request_body.decode("utf-8")
            except Exception:
                req_body_text = None
        req_body_b64: Optional[str] = None
        if request_body:
            req_body_b64 = base64.b64encode(request_body).decode("ascii")

        resp_raw_list = []
        try:
            for k, v in response_raw_headers:
                resp_raw_list.append([k.decode(errors="replace"), v.decode(errors="replace")])
        except Exception:
            resp_raw_list = []
        set_cookie_headers: List[str] = []
        try:
            set_cookie_headers = response_headers.getall("Set-Cookie", [])
        except Exception:
            pass

        meta = {
            "saved_at": datetime.now().isoformat(),
            "method": method.upper(),
            "url": url,
            "request": {
                "headers_map": req_headers_map,
                "headers_list": req_headers_list,
                "body_bytes": len(request_body),
                "body_text_utf8": req_body_text,
                "body_base64": req_body_b64,
                "form_data": request_data or None,
            },
            "response": {
                "status": status,
                "reason": reason,
                "content_type": content_type,
                "headers_map": dict(response_headers),
                "headers_raw": resp_raw_list,
                "set_cookie": set_cookie_headers,
                "body_file": str(body_path.name),
                "body_bytes": len(response_body),
            },
        }
        meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
        self.log.info("Saved response body: %s", body_path)
        self.log.info("Saved meta json: %s", meta_path)
        return {"body": body_path, "meta_json": meta_path}
# ----------------------------
# Parse cgi_info.js for addCfg(...) calls
# ----------------------------
def _iter_addcfg_args_str(js_text: str) -> List[str]:
    """
    Scan JavaScript text and extract the raw argument string inside addCfg(...).
    Algorithm (as specified):
      - Find the literal substring 'addCfg('.
      - After that position, find the first occurrence of either:
          1) ');' or
          2) ')\\n'
        (No need to consider occurrences inside quotes.)
      - Use that ')' position as the end of the addCfg(...) call.
      - Extract args_str as the substring between '(' and ')'.
    Returns:
        A list of args_str strings (raw, not bracketed).
    """
    results: List[str] = []
    needle = "addCfg("
    i = 0
    while True:
        start = js_text.find(needle, i)
        if start < 0:
            break
        args_start = start + len(needle)
        end1 = js_text.find(");", args_start)   # points at ')' when found
        end2 = js_text.find(")\n", args_start)  # points at ')' when found
        ends = [e for e in (end1, end2) if e >= 0]
        if not ends:
            # No terminator found; stop scanning to avoid an infinite loop.
            break
        end = min(ends)
        args_str = js_text[args_start:end]
        results.append(args_str)
        # Advance the scanning position past the ')' we used.
        i = end + 1
    return results
def extract_wan_dhcp_dns_tokens(cgi_info_js_text: str) -> List[List[str]]:
    """
    Find addCfg(...) calls and parse the args as a JSON list using:
        args = json.loads("[" + args_str + "]")
    Then:
      - Find entries where:
          args[0] == "wan_dhcp_dns"
          args[1] == "ARC_WAN_0_IP4_DNS"
      - For each match, split args[2] by whitespace and log the tokens.
    Returns:
        A list of token lists (one per match).
    """
    log = logging.getLogger("cgi_info_parser")
    args_str_list = _iter_addcfg_args_str(cgi_info_js_text)
    log.info("Found %d addCfg(...) call(s) by scanning", len(args_str_list))
    all_tokens: List[List[str]] = []
    for idx, args_str in enumerate(args_str_list, 1):
        try:
            args = json.loads("[" + args_str + "]")
        except Exception as e:
            log.debug("Skipping addCfg call #%d: JSON parse failed: %s; args_str head=%r", idx, e, args_str[:120])
            continue
        if not isinstance(args, list) or len(args) < 3:
            continue
        if args[0] != "wan_dhcp_dns" or args[1] != "ARC_WAN_0_IP4_DNS":
            continue
        if not isinstance(args[2], str):
            continue
        tokens = [t for t in args[2].split() if t]
        log.info("Match %d: args[0]=%r args[1]=%r args[2]=%r", idx, args[0], args[1], args[2])
        log.info("Match %d tokens (%d): %s", idx, len(tokens), tokens)
        all_tokens.append(tokens)
    log.info(
        'Matched %d call(s) where args[0]=="wan_dhcp_dns" and args[1]=="ARC_WAN_0_IP4_DNS"',
        len(all_tokens),
    )
    return all_tokens
# ----------------------------
# HTTP client (Referer management)
# ----------------------------
class HttpClient:
    def __init__(self, session: aiohttp.ClientSession, saver: ResponseSaver) -> None:
        self.session = session
        self.saver = saver
        self.last_url: Optional[str] = None
        self.log = logging.getLogger("request")

    async def request_text(
        self,
        method: str,
        url: str,
        *,
        data: Optional[Dict[str, Any]] = None,
    ) -> str:
        text, _content_type, _status = await self.request_text_meta(method, url, data=data)
        return text

    async def request_text_meta(
        self,
        method: str,
        url: str,
        *,
        data: Optional[Dict[str, Any]] = None,
    ) -> Tuple[str, str, int]:
        """
        Same behavior as request_text, but also returns (text, content_type, status).
        """
        method_u = method.upper()
        if data is not None:
            self.log.debug("%s %s data=%s", method_u, url, data)
        else:
            self.log.debug("%s %s", method_u, url)
        request_body_bytes = build_request_body_bytes(data)
        # Per-request header: Referer is the previous URL (omitted for the first request)
        req_headers_override: Dict[str, str] = {}
        if self.last_url:
            req_headers_override["Referer"] = self.last_url
        # POST: explicitly set the Content-Type and send urlencoded bytes
        req_data_to_send: Any = None
        if method_u == "POST":
            req_headers_override["Content-Type"] = "application/x-www-form-urlencoded"
            req_data_to_send = request_body_bytes
        async with self.session.request(
            method=method_u,
            url=url,
            data=req_data_to_send,
            headers=req_headers_override if req_headers_override else None,
        ) as resp:
            sent_req_headers = resp.request_info.headers
            response_body = await resp.read()
            content_type = resp.headers.get("Content-Type", "")
            self.saver.save(
                method=method_u,
                url=url,
                status=resp.status,
                reason=resp.reason or "",
                content_type=content_type,
                request_headers=sent_req_headers,
                request_body=request_body_bytes,
                request_data=data,
                response_headers=resp.headers,
                response_raw_headers=resp.raw_headers,
                response_body=response_body,
            )
            self.log.debug("%s %s -> %s, bytes=%d", method_u, url, resp.status, len(response_body))
            charset = resp.charset or "utf-8"
            text = response_body.decode(charset, errors="replace")
            self.log.debug("Response head: %r", text[:300])
            # Update the Referer for the next request
            self.last_url = url
            resp.raise_for_status()
            return text, content_type, resp.status
# ----------------------------
# Flow implementation
# ----------------------------
async def login(client: HttpClient, cfg: Config) -> None:
    log = logging.getLogger("login")
    login_html_url = urljoin(cfg.base_url + "/", "login.html")
    html_text = await client.request_text("GET", login_html_url)
    httoken = get_httoken(html_text)
    pws = md5_hex(cfg.password)
    login_cgi_url = urljoin(cfg.base_url + "/", "login.cgi")
    form = {
        "name": cfg.name,
        "pws": pws,
        "url": "/",
        "mobile": "0",
        "httoken": httoken,
    }
    log.info("Logging in as name=%s", cfg.name)
    login_resp_html, _ct, _st = await client.request_text_meta("POST", login_cgi_url, data=form)
    # Rule: if the login.cgi <title> is not "BUFFALO AirStation", treat it as not logged in.
    assert_title_equals(login_resp_html, "BUFFALO AirStation", context="login.cgi response title check")
    log.info("Login POST completed")
async def fetch_info(client: HttpClient, cfg: Config) -> None:
    """
    Behavior:
      - Fetch /info.html and extract the httoken.
      - Rule: if info.html contains a <title>, treat it as abnormal.
      - Fetch /cgi/cgi_info.js?_tn={httoken} and save it (request_text already saves).
      - Rule: if the cgi_info.js Content-Type is not application/x-javascript, treat it as abnormal.
      - Additionally:
          - Scan for addCfg(...) calls.
          - Parse args via json.loads("[" + args_str + "]").
          - If args[0]=="wan_dhcp_dns" and args[1]=="ARC_WAN_0_IP4_DNS",
            split args[2] by whitespace and log the resulting tokens.
    """
    log = logging.getLogger("fetch_info")
    info_url = urljoin(cfg.base_url + "/", "info.html")
    html_text = await client.request_text("GET", info_url)
    # Rule: info.html must NOT contain <title>
    assert_title_absent(html_text, context="info.html title absence check")
    httoken = get_httoken(html_text)
    log.info("Extracted httoken from info.html: %r", httoken)
    cgi_info_url = urljoin(cfg.base_url + "/", f"cgi/cgi_info.js?_tn={httoken}")
    cgi_info_js_text, cgi_ct, _st = await client.request_text_meta("GET", cgi_info_url)
    # Rule: the cgi_info.js Content-Type must be application/x-javascript
    assert_content_type_equals(cgi_ct, "application/x-javascript", context="cgi_info.js Content-Type check")
    extract_wan_dhcp_dns_tokens(cgi_info_js_text)
async def logout(client: HttpClient, cfg: Config) -> None:
    log = logging.getLogger("logout")
    logout_html_url = urljoin(cfg.base_url + "/", "logout.html")
    html_text = await client.request_text("GET", logout_html_url)
    httoken = get_httoken(html_text)
    pws = md5_hex(cfg.password)
    logout_cgi_url = urljoin(cfg.base_url + "/", "logout.cgi")
    form = {
        "name": cfg.name,
        "pws": pws,
        "url": "/",
        "mobile": "0",
        "httoken": httoken,
    }
    log.info("Logging out as name=%s", cfg.name)
    logout_resp_html, _ct, _st = await client.request_text_meta("POST", logout_cgi_url, data=form)
    # Rule: the logout.cgi <title> must be "LOGIN"
    assert_title_equals(logout_resp_html, "LOGIN", context="logout.cgi response title check")
    log.info("Logout POST completed")
# ----------------------------
# main
# ----------------------------
async def main() -> None:
    setup_logging()
    # The config path can be overridden with the CONFIG_PATH environment variable.
    # The default is ./config.toml.
    config_path = Path(os.environ.get("CONFIG_PATH", "config.toml"))
    cfg = Config.from_toml(config_path)

    run_ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    base_out = Path(os.environ.get("OUT_DIR", ".")) / "outputs"
    responses_dir = base_out / f"responses_{run_ts}"
    saver = ResponseSaver(responses_dir)

    jar = aiohttp.CookieJar(unsafe=True)
    headers = {
        "User-Agent": "aiohttp-client/1.0",
        "Cookie": cfg.cookie,
    }
    timeout = aiohttp.ClientTimeout(total=60)
    logger.info("Loaded config from: %s", config_path)
    logger.info("Base URL: %s", cfg.base_url)
    logger.info("Responses will be saved under: %s", responses_dir)

    async with aiohttp.ClientSession(cookie_jar=jar, headers=headers, timeout=timeout) as session:
        client = HttpClient(session, saver)
        logged_in = False
        try:
            await login(client, cfg)
            logged_in = True
            await fetch_info(client, cfg)
        finally:
            if logged_in:
                try:
                    await logout(client, cfg)
                except Exception:
                    logger.exception("Logout failed (ignored)")
    logger.info("Done")

if __name__ == "__main__":
    asyncio.run(main())
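Run it with `python main.py`; the `CONFIG_PATH` and `OUT_DIR` environment variables override the defaults read in `main()`. Because every body is archived, the `addCfg` parser can also be re-run offline against saved responses; a sketch, assuming the listing above is importable as `main`:

```python
from pathlib import Path
from main import extract_wan_dhcp_dns_tokens

# Re-parse archived cgi_info.js bodies (saved with a .bin extension, since
# application/x-javascript matches none of the saver's known types):
for body in Path("outputs").rglob("*cgi_info*"):
    if body.name.endswith(".headers.json"):
        continue  # skip the companion metadata files
    print(body.name, extract_wan_dhcp_dns_tokens(body.read_text(encoding="utf-8")))
```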