DEV Community

Naveen Karasu
Naveen Karasu

Posted on

Day 9/90: Network Scanning with Sockets -- Python Security

Day 9/90: Network Scanning with Sockets -- Python Security

90 Day Python Security Scripting Challenge

Today I dug into Python's socket module to build network scanning tools from scratch. Understanding socket-level scanning is essential before relying on wrapper tools.

Threaded Port Scanner with Service Mapping

A practical scanner needs speed and context. This version scans ports in parallel and maps open ports to known service names.

import socket
import concurrent.futures

COMMON_SERVICES = {
    21: "FTP", 22: "SSH", 25: "SMTP", 53: "DNS",
    80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS",
    993: "IMAPS", 995: "POP3S", 3306: "MySQL",
    5432: "PostgreSQL", 6379: "Redis", 8080: "HTTP-Alt",
}

def probe_port(host, port, timeout=1.5):
    """Probe a single port and return service info."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(timeout)
    try:
        if s.connect_ex((host, port)) == 0:
            svc = COMMON_SERVICES.get(port, "unknown")
            return {"port": port, "state": "open", "service": svc}
    except socket.error:
        pass
    finally:
        s.close()
    return None

def scan_host(host, ports, max_threads=60):
    """Parallel scan with service identification."""
    findings = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as ex:
        jobs = {ex.submit(probe_port, host, p): p for p in ports}
        for job in concurrent.futures.as_completed(jobs):
            result = job.result()
            if result:
                findings.append(result)
                print(f"  {result['port']}/tcp {result['service']}")
    return sorted(findings, key=lambda x: x["port"])

results = scan_host("10.0.0.5", range(1, 1025))
print(f"\n{len(results)} open ports found")
Enter fullscreen mode Exit fullscreen mode

The service mapping dictionary provides instant context for each open port. In a real assessment, you would expand this with a full IANA port-to-service database.

Socket-Level Banner Analysis

Banners tell you exactly what software version is running. This function grabs banners and extracts version numbers with regex.

import socket
import re

def extract_version(banner_text):
    """Pull version strings from service banners."""
    patterns = [
        r"(OpenSSH[_\s][\d.]+)",
        r"(Apache/[\d.]+)",
        r"(nginx/[\d.]+)",
        r"(MySQL[\s][\d.]+)",
        r"(\d+\.\d+\.\d+)",
    ]
    for pat in patterns:
        match = re.search(pat, banner_text)
        if match:
            return match.group(1)
    return None

def fingerprint_service(host, port, timeout=3):
    """Connect, grab banner, and extract version."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        sock.connect((host, port))
        if port in (80, 8080, 8443):
            sock.sendall(b"HEAD / HTTP/1.0\r\nHost: target\r\n\r\n")
        banner = sock.recv(1024).decode(errors="replace")
        version = extract_version(banner)
        return {
            "port": port,
            "banner": banner.strip()[:120],
            "version": version,
        }
    except (socket.timeout, OSError):
        return {"port": port, "banner": None, "version": None}
    finally:
        sock.close()

for p in [22, 80, 3306]:
    info = fingerprint_service("10.0.0.5", p)
    if info["version"]:
        print(f"  Port {p}: {info['version']}")
Enter fullscreen mode Exit fullscreen mode

Version strings are the first thing you check against CVE databases. An outdated OpenSSH or Apache version immediately flags a finding in your assessment report.

Handling Connection States

Different connection outcomes tell you different things about the target's network configuration.

import socket
import errno

def classify_port(host, port, timeout=2):
    """Classify port state based on socket error codes."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        code = sock.connect_ex((host, port))
        if code == 0:
            return "open"
        elif code == errno.ECONNREFUSED:
            return "closed"
        elif code == errno.ETIMEDOUT:
            return "filtered"
        else:
            return f"unknown (errno {code})"
    except socket.timeout:
        return "filtered"
    except OSError as e:
        return f"error: {e.strerror}"
    finally:
        sock.close()

for port in [22, 80, 443, 445, 8080]:
    state = classify_port("10.0.0.5", port)
    print(f"  {port}/tcp -> {state}")
Enter fullscreen mode Exit fullscreen mode

Distinguishing closed from filtered is critical for understanding the target's firewall configuration. Closed ports prove the host is alive but not listening. Filtered ports suggest a firewall is silently dropping traffic.

Key Takeaways

Socket-level scanning gives you complete control over probe behavior. Threading makes it practical for real assessments. Banner analysis turns open ports into actionable vulnerability data.


Day 9/90 of the 90 Day Python Security Scripting Challenge

Keep all security scripts in a dedicated venv with pinned dependencies. Run bandit against your own code regularly -- security tools should be held to the same standard as the systems they protect.

Top comments (0)