DEV Community

vast cow

SSDP Proxy to Help DiXiM Play Discover DIGA

Overview

This SSDP proxy is a helper tool designed to make it easier for DiXiM Play to discover DIGA devices.

DiXiM Play uses a mechanism called SSDP (Simple Service Discovery Protocol) to search for recording devices and media servers on the network. However, depending on the network configuration and device response behavior, DiXiM Play may sometimes fail to discover a DIGA device correctly.

This proxy receives discovery requests from DiXiM Play, retransmits them to actual devices such as DIGA, and then returns the responses it receives back to DiXiM Play.
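
To make the mechanism concrete, the following minimal sketch builds the kind of M-SEARCH request an SSDP client multicasts to 239.255.255.250:1900 and parses a typical response. The helper names and the sample response (including its LOCATION and USN values) are illustrative assumptions, not captured DiXiM Play or DIGA traffic.

```python
SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 1900


def build_msearch(st: str, mx: int = 3) -> bytes:
    """Build an SSDP M-SEARCH request for the given search target (ST)."""
    lines = [
        "M-SEARCH * HTTP/1.1",
        f"HOST: {SSDP_ADDR}:{SSDP_PORT}",
        'MAN: "ssdp:discover"',
        f"MX: {mx}",
        f"ST: {st}",
        "",
        "",
    ]
    return "\r\n".join(lines).encode("ascii")


def parse_headers(packet: bytes) -> dict[str, str]:
    """Return the header fields of an SSDP message with upper-cased names."""
    text = packet.decode("utf-8", errors="replace")
    headers = {}
    for line in text.split("\r\n")[1:]:
        if not line:
            break  # blank line ends the header section
        name, sep, value = line.partition(":")
        if sep:
            headers[name.strip().upper()] = value.strip()
    return headers


# Hypothetical response; the address, port, path, and UUID are made up.
SAMPLE_RESPONSE = (
    b"HTTP/1.1 200 OK\r\n"
    b"CACHE-CONTROL: max-age=1800\r\n"
    b"EXT:\r\n"
    b"LOCATION: http://192.168.40.10:60606/dd.xml\r\n"
    b"ST: urn:schemas-upnp-org:device:MediaServer:1\r\n"
    b"USN: uuid:00000000-0000-0000-0000-000000000000"
    b"::urn:schemas-upnp-org:device:MediaServer:1\r\n"
    b"\r\n"
)

if __name__ == "__main__":
    print(build_msearch("urn:schemas-upnp-org:device:MediaServer:1").decode())
    print(parse_headers(SAMPLE_RESPONSE)["LOCATION"])
```

The ST header names what the client is looking for, and a responding device echoes a matching ST together with a USN that identifies it. The proxy relies on both fields when relaying responses.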

Purpose

The purpose of this tool is to bridge discovery traffic when requests from DiXiM Play do not reach DIGA, or when DIGA's responses do not make it back to DiXiM Play.

It is particularly useful in situations such as:

  • DIGA does not appear in the DiXiM Play device list.
  • DIGA is on the same network but is not detected.
  • You want to forward discovery requests arriving over IPv6 as IPv4 SSDP searches toward DIGA.
  • You want to more reliably discover devices that advertise themselves as MediaServer devices.

Usage

This SSDP proxy is intended to run continuously on the network.

It is typically deployed on a device such as a Raspberry Pi connected to the same network as the DIGA. Once started, the proxy receives discovery requests from DiXiM Play, re-sends them as IPv4 SSDP multicast searches that DIGA can answer, and relays the answers back to the requesting client.

The basic workflow is as follows:

  1. DiXiM Play searches for media servers on the network.
  2. The SSDP proxy receives the discovery request.
  3. The proxy sends the request toward DIGA as an IPv4 SSDP search.
  4. DIGA returns a response.
  5. The proxy forwards the response back to DiXiM Play.
  6. DIGA becomes discoverable within DiXiM Play.
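
One way to check this workflow from another machine on the same network is a small test client that plays the role of DiXiM Play in steps 1 and 5: it multicasts an M-SEARCH and prints whatever comes back. This is a rough sketch; the function names are hypothetical, it searches over IPv4 only, and it does not reproduce DiXiM Play's actual behavior.

```python
import socket
import time

SSDP_GROUP = ("239.255.255.250", 1900)
MEDIA_SERVER_ST = "urn:schemas-upnp-org:device:MediaServer:1"


def msearch_payload(st: str, mx: int) -> bytes:
    """Build the M-SEARCH request that is multicast to the SSDP group."""
    return (
        "M-SEARCH * HTTP/1.1\r\n"
        f"HOST: {SSDP_GROUP[0]}:{SSDP_GROUP[1]}\r\n"
        'MAN: "ssdp:discover"\r\n'
        f"MX: {mx}\r\n"
        f"ST: {st}\r\n"
        "\r\n"
    ).encode("ascii")


def discover(st: str = MEDIA_SERVER_ST, mx: int = 2, wait: float = 4.0):
    """Send one M-SEARCH and return (sender address, first line) pairs."""
    results = []
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.sendto(msearch_payload(st, mx), SSDP_GROUP)
        deadline = time.monotonic() + wait
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            sock.settimeout(remaining)
            try:
                data, addr = sock.recvfrom(65507)
            except socket.timeout:
                break
            lines = data.splitlines()
            first = lines[0].decode("ascii", "replace") if lines else ""
            results.append((addr, first))
    return results


if __name__ == "__main__":
    for addr, status in discover():
        print(addr, status)
```

With the proxy running, relayed responses arrive from the proxy's own address and port 1900 (192.168.40.2 in the example configuration used in this post), which distinguishes them from responses DIGA sends directly.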

Main Configuration Items

When using this tool, you should verify several settings according to your environment.

Network Interface

Specify the network interface to use.

For example, if you are using a wired Ethernet connection, specify eth0. On a Raspberry Pi connected via Ethernet, eth0 is usually the correct name. In the script this is the IFACE_NAME setting.
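
If you are unsure which name to use, the following sketch lists the interfaces the operating system knows about and checks whether a given name resolves. It uses the same socket.if_nametoindex call the script makes at startup; the helper function names are hypothetical.

```python
import socket


def list_interfaces() -> list[tuple[int, str]]:
    """Return (index, name) pairs for the interfaces known to the OS."""
    return socket.if_nameindex()


def interface_exists(name: str) -> bool:
    """Check whether an interface name such as "eth0" can be resolved."""
    try:
        socket.if_nametoindex(name)
    except OSError:
        return False
    return True


if __name__ == "__main__":
    for index, name in list_interfaces():
        print(index, name)
    print("eth0 present:", interface_exists("eth0"))
```

If the configured name does not resolve, the script fails at startup with an OSError, so checking first gives a clearer diagnosis.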

IPv4 Address

Configure the IPv4 address of the device running the proxy.

For example, if the Raspberry Pi's address is 192.168.40.2, set that address here. It must be the address assigned to the interface above, and it must be on the same network as the DIGA. In the script this is the INTERFACE_IPV4 setting.
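
A quick way to confirm that the proxy and the DIGA share a subnet is sketched below with the standard ipaddress module. The /24 prefix length and the DIGA address 192.168.40.10 are assumptions chosen for illustration; substitute the values from your own network.

```python
import ipaddress


def same_subnet(proxy_ip: str, device_ip: str, prefix_len: int = 24) -> bool:
    """Return True if both IPv4 addresses fall inside the proxy's subnet."""
    network = ipaddress.ip_interface(f"{proxy_ip}/{prefix_len}").network
    return ipaddress.ip_address(device_ip) in network


if __name__ == "__main__":
    # 192.168.40.10 is a hypothetical DIGA address.
    print(same_subnet("192.168.40.2", "192.168.40.10"))
```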

Response Timeout

Configure how long the proxy waits for responses from devices such as DIGA.

If the timeout is too short, responses may be missed. If it is too long, the proxy keeps sessions open longer than necessary. A few seconds is usually sufficient; the script defaults to 5 seconds (SESSION_TTL_SECONDS).
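
In SSDP, the MX header of an M-SEARCH tells devices they may delay their response by up to that many seconds, so a timeout shorter than MX can cut off valid responses. The sketch below derives a timeout from MX plus a margin. The function name, the one-second margin, and the clamp to 1 through 5 seconds are assumptions for illustration, not values taken from the tool.

```python
def suggested_timeout(mx_header: str, margin: float = 1.0, default_mx: int = 3) -> float:
    """Derive a response timeout in seconds from an M-SEARCH MX header value."""
    try:
        mx = int(mx_header.strip())
    except ValueError:
        # Missing or malformed MX: fall back to a default (assumed value).
        mx = default_mx
    # Assumption: keep MX within 1..5 seconds so a bad value cannot
    # produce a zero or very long wait.
    mx = max(1, min(mx, 5))
    return mx + margin


if __name__ == "__main__":
    print(suggested_timeout("3"))
```

With the MX value of 3 that the script falls back to, this gives 4 seconds, which sits inside the script's 5-second default.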

Features

This SSDP proxy includes several mechanisms to improve DIGA discovery from DiXiM Play.

Supports Both IPv4 and IPv6 Discovery Requests

The proxy can receive discovery requests from DiXiM Play over either IPv4 or IPv6.

Received requests are then retransmitted as IPv4 SSDP discovery requests, which are more likely to reach DIGA devices successfully.
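
As an illustration of the IPv6-to-IPv4 step, an M-SEARCH sent to the IPv6 link-local SSDP group typically carries HOST: [FF02::C]:1900, while the IPv4 group is 239.255.255.250:1900. The sketch below rewrites only that header and leaves the rest of the request intact. It is a simplified stand-in with a hypothetical function name; the script at the end of this post instead rebuilds the whole request from the parsed headers.

```python
IPV4_SSDP_HOST = "239.255.255.250:1900"


def rewrite_host_for_ipv4(request: bytes) -> bytes:
    """Replace the HOST header of an SSDP request with the IPv4 group."""
    head, sep, body = request.partition(b"\r\n\r\n")
    out = []
    for line in head.split(b"\r\n"):
        name, colon, _ = line.partition(b":")
        if colon and name.strip().upper() == b"HOST":
            line = b"HOST: " + IPV4_SSDP_HOST.encode("ascii")
        out.append(line)
    return b"\r\n".join(out) + sep + body


if __name__ == "__main__":
    ipv6_request = (
        b"M-SEARCH * HTTP/1.1\r\n"
        b"HOST: [FF02::C]:1900\r\n"
        b'MAN: "ssdp:discover"\r\n'
        b"MX: 3\r\n"
        b"ST: urn:schemas-upnp-org:device:MediaServer:1\r\n"
        b"\r\n"
    )
    print(rewrite_host_for_ipv4(ipv6_request).decode())
```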

Can Search for MediaServer:2 Alongside MediaServer:1

When DiXiM Play searches for MediaServer:1, the proxy can also search for MediaServer:2. This behavior is controlled by the EXPAND_MEDIASERVER_1_TO_2 setting, which is enabled by default in the script.

This helps reduce cases where devices are missed due to differences in device types or response behavior.
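
The expansion amounts to a small lookup from the requested search target to the set of targets actually sent upstream. The sketch below expresses it as a table so that further mappings could be added; the table form and the names are illustrative assumptions, and the script at the end of this post hard-codes the single MediaServer:1 to MediaServer:2 case.

```python
MEDIASERVER_1 = "urn:schemas-upnp-org:device:MediaServer:1"
MEDIASERVER_2 = "urn:schemas-upnp-org:device:MediaServer:2"

# Hypothetical expansion table: requested ST -> extra STs to search for.
EXTRA_TARGETS = {MEDIASERVER_1: {MEDIASERVER_2}}


def upstream_targets(requested_st: str, expand: bool = True) -> set[str]:
    """Return every search target to send upstream for one client request."""
    requested_st = requested_st.strip()
    targets = {requested_st}
    if expand:
        targets |= EXTRA_TARGETS.get(requested_st, set())
    return targets


def accepts_response(response_st: str, requested_st: str) -> bool:
    """A response is relayed if its ST is one of the targets searched for."""
    if requested_st.strip() == "ssdp:all":
        return True
    return response_st.strip() in upstream_targets(requested_st)


if __name__ == "__main__":
    print(sorted(upstream_targets(MEDIASERVER_1)))
```

In the script, responses are relayed as received, so a client that asked for MediaServer:1 may be handed a response whose ST is MediaServer:2.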

Suppresses Duplicate Responses

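If the same device returns multiple similar responses, the proxy forwards only the first one to each client and suppresses repeats with the same ST and USN for a short period (3 seconds by default, set by RESPONSE_DEDUP_SECONDS).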

This prevents DiXiM Play from receiving the same response repeatedly.
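
The suppression can be pictured as a table of recently relayed responses, each with an expiry time. The following sketch keys entries on the client address, ST, and USN, as the script does; the class name and the injectable clock (added here so the behavior can be exercised without waiting) are assumptions for illustration.

```python
import time


class ResponseDeduplicator:
    """Remember recently relayed responses for a fixed window of seconds."""

    def __init__(self, window: float = 3.0, clock=time.monotonic):
        self.window = window
        self.clock = clock
        self._expires: dict[tuple, float] = {}

    def should_forward(self, client, st: str, usn: str) -> bool:
        now = self.clock()
        # Drop entries whose suppression window has passed.
        self._expires = {k: t for k, t in self._expires.items() if t > now}
        key = (client, st, usn)
        if key in self._expires:
            return False
        self._expires[key] = now + self.window
        return True


if __name__ == "__main__":
    dedup = ResponseDeduplicator()
    client = ("192.168.40.5", 50000)  # hypothetical client address
    print(dedup.should_forward(client, "st", "usn"))  # first response
    print(dedup.should_forward(client, "st", "usn"))  # immediate repeat
```

Because the client address is part of the key, two clients searching at the same time each still receive their own copy of a response.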

Notes

This tool is intended solely to assist SSDP-based device discovery. It does not directly play recorded content from DIGA, nor does it implement DLNA functionality itself.

In addition, some network environments may restrict multicast traffic. In such cases, you may need to review the configuration of routers, switches, firewalls, and operating systems involved in the network.

Summary

This SSDP proxy is a simple helper utility designed to assist device discovery when DiXiM Play cannot find a DIGA device.

By receiving discovery requests from DiXiM Play, re-sending them to DIGA as IPv4 SSDP searches, and relaying the responses back, it improves the likelihood that DIGA will be detected successfully.

If DIGA exists on the network but cannot be found by DiXiM Play, using this proxy may provide a practical workaround.

#!/usr/bin/env python3
import asyncio
import logging
import socket
import struct
import time
from dataclasses import dataclass, field
from typing import Any


# ============================================================
# Configuration
# ============================================================

SSDP_PORT = 1900

MCAST_GRP_V4 = "239.255.255.250"
MCAST_GRP_V6 = "ff02::c"

IFACE_NAME = "eth0"

# Raspberry Pi eth0 IPv4 address
INTERFACE_IPV4 = "192.168.40.2"

# Source IPv4 address for upstream M-SEARCH
# Normally this can be the same as INTERFACE_IPV4.
UPSTREAM_SOURCE_IPV4 = INTERFACE_IPV4

# Number of seconds to wait for responses from upstream
SESSION_TTL_SECONDS = 5.0

# When MediaServer:1 is received, also search for MediaServer:2
EXPAND_MEDIASERVER_1_TO_2 = True

# Suppress duplicate forwarding of SSDP responses
RESPONSE_DEDUP_SECONDS = 3.0

# IPv4 multicast TTL
IPV4_MCAST_TTL = 4

# IPv6 multicast hop limit
IPV6_MCAST_HOPS = 10


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)


# ============================================================
# Data structures
# ============================================================

@dataclass
class ClientSession:
    client_family: str              # "IPv4" or "IPv6"
    client_addr: Any                # IPv4: (ip, port), IPv6: (ip, port, flowinfo, scopeid)
    requested_st: str
    upstream_sts: set[str]
    expires_at: float
    created_at: float = field(default_factory=time.monotonic)


# ============================================================
# SSDP parse/build utility
# ============================================================

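def parse_ssdp_packet(data: bytes) -> dict[str, str]:
    """
    Parse an SSDP message into a dict keyed by upper-cased header names.

    The start line (request line or status line) is stored under "_request_line".
    """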
    text = data.decode("utf-8", errors="replace")
    lines = text.replace("\r\n", "\n").split("\n")

    headers: dict[str, str] = {
        "_request_line": lines[0].strip() if lines else "",
    }

    for line in lines[1:]:
        line = line.strip()
        if not line:
            break

        if ":" not in line:
            continue

        k, v = line.split(":", 1)
        headers[k.strip().upper()] = v.strip()

    return headers


def get_method(headers: dict[str, str]) -> str:
    request_line = headers.get("_request_line", "")
    if not request_line:
        return ""

    return request_line.split(" ", 1)[0].upper()


def is_msearch_discover(headers: dict[str, str]) -> bool:
    method = get_method(headers)
    if method != "M-SEARCH":
        return False

    man = headers.get("MAN", "")
    if "ssdp:discover" not in man.lower():
        return False

    if not headers.get("ST", ""):
        return False

    return True


def normalize_st(st: str) -> str:
    return st.strip()


def expanded_upstream_sts(requested_st: str) -> set[str]:
    requested_st = normalize_st(requested_st)

    sts = {requested_st}

    if EXPAND_MEDIASERVER_1_TO_2:
        if requested_st == "urn:schemas-upnp-org:device:MediaServer:1":
            sts.add("urn:schemas-upnp-org:device:MediaServer:2")

    return sts


def response_matches_session(response_st: str, session: ClientSession) -> bool:
    response_st = normalize_st(response_st)

    if session.requested_st == "ssdp:all":
        return True

    return response_st in session.upstream_sts


def build_ipv4_msearch(original_headers: dict[str, str], upstream_st: str) -> bytes:
    """
    Rebuild an M-SEARCH received from a client (e.g. DiXiM Play) for IPv4 multicast.

    Even if the request came from IPv6, rewrite HOST to 239.255.255.250:1900.
    Replace ST with upstream_st.
    Prefer the original MX and MAN values.
    """
    mx = original_headers.get("MX", "3").strip() or "3"
    man = original_headers.get("MAN", '"ssdp:discover"').strip() or '"ssdp:discover"'

    payload = (
        "M-SEARCH * HTTP/1.1\r\n"
        f"HOST: {MCAST_GRP_V4}:{SSDP_PORT}\r\n"
        f"MAN: {man}\r\n"
        f"MX: {mx}\r\n"
        f"ST: {upstream_st}\r\n"
        "\r\n"
    )

    return payload.encode("ascii")


def first_line(data: bytes) -> bytes:
    # Return the first line of a packet for logging, or b"" if it is empty.
    lines = data.splitlines()
    return lines[0] if lines else b""


# ============================================================
# SSDP Proxy core
# ============================================================

class SSDPProxy:
    def __init__(
        self,
        upstream_sock_v4: socket.socket,
        client_reply_sock_v4: socket.socket,
        client_reply_sock_v6: socket.socket,
    ):
        self.upstream_sock_v4 = upstream_sock_v4
        self.client_reply_sock_v4 = client_reply_sock_v4
        self.client_reply_sock_v6 = client_reply_sock_v6

        self.sessions: list[ClientSession] = []
        self.recent_responses: dict[tuple[Any, str, str], float] = {}

    def cleanup(self) -> None:
        now = time.monotonic()

        self.sessions = [
            s for s in self.sessions
            if s.expires_at > now
        ]

        self.recent_responses = {
            k: expires_at
            for k, expires_at in self.recent_responses.items()
            if expires_at > now
        }

    def handle_client_msearch(
        self,
        family: str,
        data: bytes,
        addr: Any,
    ) -> None:
        self.cleanup()

        headers = parse_ssdp_packet(data)

        if not is_msearch_discover(headers):
            # Non M-SEARCH UDP packets are expected on SSDP multicast sockets.
            # Keep this at DEBUG so the default INFO log level stays quiet.
            logging.debug(
                "[%s client] ignored non-M-SEARCH/discover UDP from %r first_line=%r",
                family,
                addr,
                first_line(data),
            )
            return

        requested_st = normalize_st(headers["ST"])

        now = time.monotonic()

        upstream_sts = expanded_upstream_sts(requested_st)

        session = ClientSession(
            client_family=family,
            client_addr=addr,
            requested_st=requested_st,
            upstream_sts=upstream_sts,
            expires_at=now + SESSION_TTL_SECONDS,
        )
        self.sessions.append(session)

        logging.info(
            "[%s client] M-SEARCH from %r ST=%r -> upstream_sts=%r",
            family,
            addr,
            requested_st,
            sorted(upstream_sts),
        )

        for upstream_st in sorted(upstream_sts):
            payload = build_ipv4_msearch(headers, upstream_st)
            try:
                sent = self.upstream_sock_v4.sendto(
                    payload,
                    (MCAST_GRP_V4, SSDP_PORT),
                )
                logging.info(
                    "[upstream IPv4] sent M-SEARCH ST=%r bytes=%d for client=%r",
                    upstream_st,
                    sent,
                    addr,
                )
            except OSError as e:
                logging.exception(
                    "[upstream IPv4] failed to send M-SEARCH ST=%r for client=%r error=%s",
                    upstream_st,
                    addr,
                    e,
                )

    def handle_upstream_response(
        self,
        data: bytes,
        addr: tuple[str, int],
    ) -> None:
        self.cleanup()

        headers = parse_ssdp_packet(data)
        request_line = headers.get("_request_line", "")

        if not request_line.startswith("HTTP/"):
            logging.info(
                "[upstream IPv4] ignored non-response from %r first_line=%r",
                addr,
                first_line(data),
            )
            return

        response_st = normalize_st(headers.get("ST", ""))
        usn = headers.get("USN", "")

        if not response_st:
            logging.info(
                "[upstream IPv4] ignored response from %r reason=missing ST first_line=%r",
                addr,
                first_line(data),
            )
            return

        matched_sessions = [
            s for s in self.sessions
            if response_matches_session(response_st, s)
        ]

        if not matched_sessions:
            logging.info(
                "[upstream IPv4] response from %r ST=%r USN=%r has no active client session",
                addr,
                response_st,
                usn,
            )
            return

        logging.info(
            "[upstream IPv4] response from %r ST=%r USN=%r matched_sessions=%d",
            addr,
            response_st,
            usn,
            len(matched_sessions),
        )

        now = time.monotonic()

        for session in matched_sessions:
            dedup_key = (
                session.client_addr,
                response_st,
                usn,
            )

            if dedup_key in self.recent_responses:
                logging.info(
                    "[relay] duplicate response suppressed client=%r ST=%r USN=%r",
                    session.client_addr,
                    response_st,
                    usn,
                )
                continue

            self.recent_responses[dedup_key] = now + RESPONSE_DEDUP_SECONDS

            try:
                if session.client_family == "IPv4":
                    sent = self.client_reply_sock_v4.sendto(
                        data,
                        session.client_addr,
                    )
                elif session.client_family == "IPv6":
                    sent = self.client_reply_sock_v6.sendto(
                        data,
                        session.client_addr,
                    )
                else:
                    logging.warning(
                        "[relay] unknown client family=%r client=%r",
                        session.client_family,
                        session.client_addr,
                    )
                    continue

                logging.info(
                    "[relay] forwarded upstream response to [%s] %r ST=%r USN=%r bytes=%d",
                    session.client_family,
                    session.client_addr,
                    response_st,
                    usn,
                    sent,
                )

            except OSError as e:
                logging.exception(
                    "[relay] failed to forward to [%s] %r ST=%r USN=%r error=%s",
                    session.client_family,
                    session.client_addr,
                    response_st,
                    usn,
                    e,
                )


# ============================================================
# asyncio protocols
# ============================================================

class ClientMSearchProtocol(asyncio.DatagramProtocol):
    def __init__(self, proxy: SSDPProxy, family: str):
        self.proxy = proxy
        self.family = family

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        logging.info("[%s client] listener ready", self.family)

    def datagram_received(self, data: bytes, addr: Any) -> None:
        self.proxy.handle_client_msearch(self.family, data, addr)


class UpstreamResponseProtocol(asyncio.DatagramProtocol):
    def __init__(self, proxy: SSDPProxy):
        self.proxy = proxy

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        logging.info("[upstream IPv4] response listener ready")

    def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
        self.proxy.handle_upstream_response(data, addr)


# ============================================================
# Socket creation
# ============================================================

def make_client_listen_socket_v4() -> socket.socket:
    """
    Receive IPv4 SSDP multicast M-SEARCH from a client (e.g. DiXiM Play).
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except OSError:
        pass

    sock.bind(("0.0.0.0", SSDP_PORT))

    mreq = socket.inet_aton(MCAST_GRP_V4) + socket.inet_aton(INTERFACE_IPV4)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    sock.setblocking(False)
    return sock


def make_client_listen_socket_v6(ifindex: int) -> socket.socket:
    """
    Receive IPv6 SSDP multicast M-SEARCH from a client (e.g. DiXiM Play).
    """
    sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except OSError:
        pass

    try:
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
    except OSError:
        pass

    sock.bind(("::", SSDP_PORT))

    group_bin = socket.inet_pton(socket.AF_INET6, MCAST_GRP_V6)
    mreq = group_bin + struct.pack("@I", ifindex)
    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

    sock.setblocking(False)
    return sock


def make_upstream_socket_v4() -> socket.socket:
    """
    Socket used to send IPv4 M-SEARCH to the actual device group.
    Responses return to this socket's ephemeral port.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # Send multicast from eth0's IPv4 address.
    sock.setsockopt(
        socket.IPPROTO_IP,
        socket.IP_MULTICAST_IF,
        socket.inet_aton(INTERFACE_IPV4),
    )

    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, IPV4_MCAST_TTL)

    # Disable multicast loopback so the client listener on port 1900 does not
    # receive this process's own M-SEARCH and forward it upstream again.
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)

    # Bind to port 0 to allocate a temporary port for upstream responses.
    sock.bind((UPSTREAM_SOURCE_IPV4, 0))

    sock.setblocking(False)
    return sock


def make_client_reply_socket_v4() -> socket.socket:
    """
    Socket used to forward responses to an IPv4 client.
    Bind to port 1900 because the source port should be 1900.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except OSError:
        pass

    sock.bind((INTERFACE_IPV4, SSDP_PORT))
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, 1)

    return sock


def make_client_reply_socket_v6(ifindex: int) -> socket.socket:
    """
    Socket used to forward responses to an IPv6 client.
    Bind to [::]:1900 because the source port should be 1900.
    """
    sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except OSError:
        pass

    try:
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
    except OSError:
        pass

    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, ifindex)
    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, IPV6_MCAST_HOPS)

    sock.bind(("::", SSDP_PORT))

    return sock


# ============================================================
# main
# ============================================================

async def main() -> None:
    loop = asyncio.get_running_loop()

    ifindex = socket.if_nametoindex(IFACE_NAME)

    logging.info("interface %s index=%d ipv4=%s", IFACE_NAME, ifindex, INTERFACE_IPV4)

    client_listen_v4 = make_client_listen_socket_v4()
    client_listen_v6 = make_client_listen_socket_v6(ifindex)

    upstream_v4 = make_upstream_socket_v4()

    client_reply_v4 = make_client_reply_socket_v4()
    client_reply_v6 = make_client_reply_socket_v6(ifindex)

    logging.info(
        "[upstream IPv4] source=%r multicast_target=%s:%d",
        upstream_v4.getsockname(),
        MCAST_GRP_V4,
        SSDP_PORT,
    )

    proxy = SSDPProxy(
        upstream_sock_v4=upstream_v4,
        client_reply_sock_v4=client_reply_v4,
        client_reply_sock_v6=client_reply_v6,
    )

    await loop.create_datagram_endpoint(
        lambda: ClientMSearchProtocol(proxy, "IPv4"),
        sock=client_listen_v4,
    )

    logging.info(
        "[IPv4 client] listening on 0.0.0.0:%d joined %s on %s",
        SSDP_PORT,
        MCAST_GRP_V4,
        INTERFACE_IPV4,
    )

    await loop.create_datagram_endpoint(
        lambda: ClientMSearchProtocol(proxy, "IPv6"),
        sock=client_listen_v6,
    )

    logging.info(
        "[IPv6 client] listening on [::]:%d joined [%s%%%s]",
        SSDP_PORT,
        MCAST_GRP_V6,
        IFACE_NAME,
    )

    await loop.create_datagram_endpoint(
        lambda: UpstreamResponseProtocol(proxy),
        sock=upstream_v4,
    )

    await asyncio.Event().wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("stopped")