Overview
This SSDP proxy is a helper tool designed to make it easier for DiXiM Play to discover DIGA devices.
DiXiM Play uses a mechanism called SSDP (Simple Service Discovery Protocol) to search for recording devices and media servers on the network. However, depending on the network configuration and device response behavior, DiXiM Play may sometimes fail to discover a DIGA device correctly.
This proxy receives discovery requests from DiXiM Play, retransmits them to actual devices such as DIGA, and then returns the responses it receives back to DiXiM Play.
Purpose
The purpose of this tool is to bridge communication when device discovery requests do not properly reach between DiXiM Play and DIGA.
It is particularly useful in situations such as:
- DIGA does not appear in the DiXiM Play device list.
- DIGA is on the same network but is not detected.
- You want to forward discovery requests arriving over IPv6 as IPv4 SSDP searches toward DIGA.
- You want to more reliably discover devices that advertise themselves as MediaServer devices.
Usage
This SSDP proxy is intended to run continuously on the network.
It is typically deployed on a device such as a Raspberry Pi connected to the same network as the DIGA. Once started, the proxy receives discovery requests from DiXiM Play, converts them into a format that DIGA can respond to, and forwards them appropriately.
The basic workflow is as follows:
- DiXiM Play searches for media servers on the network.
- The SSDP proxy receives the discovery request.
- The proxy sends the request toward DIGA as an IPv4 SSDP search.
- DIGA returns a response.
- The proxy forwards the response back to DiXiM Play.
- DIGA becomes discoverable within DiXiM Play.
Main Configuration Items
When using this tool, you should verify several settings according to your environment.
Network Interface
Specify the network interface to use.
For example, if you are using a wired Ethernet connection, specify eth0. On a Raspberry Pi connected via Ethernet, this setting is often sufficient.
IPv4 Address
Configure the IPv4 address of the device running the proxy.
For example, if the Raspberry Pi's address is 192.168.40.2, set that address here. This address must be reachable on the same network as the DIGA.
Response Timeout
Configure how long the proxy waits for responses from devices such as DIGA.
If the timeout is too short, responses may be missed. If it is too long, unnecessary waiting may occur. A few seconds is usually sufficient.
Features
This SSDP proxy includes several mechanisms to improve DIGA discovery from DiXiM Play.
Supports Both IPv4 and IPv6 Discovery Requests
The proxy can receive discovery requests from DiXiM Play over either IPv4 or IPv6.
Received requests are then retransmitted as IPv4 SSDP discovery requests, which are more likely to reach DIGA devices successfully.
Can Search for MediaServer:2 Alongside MediaServer:1
When DiXiM Play searches for MediaServer:1, the proxy can optionally search for MediaServer:2 as well.
This helps reduce cases where devices are missed due to differences in device types or response behavior.
Suppresses Duplicate Responses
If the same device returns multiple similar responses, duplicate responses received within a short period are suppressed.
This prevents DiXiM Play from receiving the same response repeatedly.
Notes
This tool is intended solely to assist SSDP-based device discovery. It does not directly play recorded content from DIGA, nor does it implement DLNA functionality itself.
In addition, some network environments may restrict multicast traffic. In such cases, you may need to review the configuration of routers, switches, firewalls, and operating systems involved in the network.
Summary
This SSDP proxy is a simple helper utility designed to assist device discovery when DiXiM Play cannot find a DIGA device.
By receiving discovery requests from DiXiM Play, forwarding them appropriately to DIGA, and relaying the responses back, it improves the likelihood that DIGA will be detected successfully.
If DIGA exists on the network but cannot be found by DiXiM Play, using this proxy may provide a practical workaround.
#!/usr/bin/env python3
import asyncio
import logging
import socket
import struct
import time
from dataclasses import dataclass, field
from typing import Any
# ============================================================
# Configuration
# ============================================================
SSDP_PORT = 1900
MCAST_GRP_V4 = "239.255.255.250"
MCAST_GRP_V6 = "ff02::c"
IFACE_NAME = "eth0"
# Raspberry Pi eth0 IPv4 address
INTERFACE_IPV4 = "192.168.40.2"
# Source IPv4 address for upstream M-SEARCH
# Normally this can be the same as INTERFACE_IPV4.
UPSTREAM_SOURCE_IPV4 = INTERFACE_IPV4
# Number of seconds to wait for responses from upstream
SESSION_TTL_SECONDS = 5.0
# When MediaServer:1 is received, also search for MediaServer:2
EXPAND_MEDIASERVER_1_TO_2 = True
# Suppress duplicate forwarding of SSDP responses
RESPONSE_DEDUP_SECONDS = 3.0
# IPv4 multicast TTL
IPV4_MCAST_TTL = 4
# IPv6 multicast hop limit
IPV6_MCAST_HOPS = 10
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
# ============================================================
# Data structures
# ============================================================
@dataclass
class ClientSession:
client_family: str # "IPv4" or "IPv6"
client_addr: Any # IPv4: (ip, port), IPv6: (ip, port, flowinfo, scopeid)
requested_st: str
upstream_sts: set[str]
expires_at: float
created_at: float = field(default_factory=time.monotonic)
# ============================================================
# SSDP parse/build utility
# ============================================================
def parse_ssdp_packet(data: bytes) -> dict[str, str]:
text = data.decode("utf-8", errors="replace")
lines = text.replace("\r\n", "\n").split("\n")
headers: dict[str, str] = {
"_request_line": lines[0].strip() if lines else "",
}
for line in lines[1:]:
line = line.strip()
if not line:
break
if ":" not in line:
continue
k, v = line.split(":", 1)
headers[k.strip().upper()] = v.strip()
return headers
def get_method(headers: dict[str, str]) -> str:
request_line = headers.get("_request_line", "")
if not request_line:
return ""
return request_line.split(" ", 1)[0].upper()
def is_msearch_discover(headers: dict[str, str]) -> bool:
method = get_method(headers)
if method != "M-SEARCH":
return False
man = headers.get("MAN", "")
if "ssdp:discover" not in man.lower():
return False
if not headers.get("ST", ""):
return False
return True
def normalize_st(st: str) -> str:
return st.strip()
def expanded_upstream_sts(requested_st: str) -> set[str]:
requested_st = normalize_st(requested_st)
sts = {requested_st}
if EXPAND_MEDIASERVER_1_TO_2:
if requested_st == "urn:schemas-upnp-org:device:MediaServer:1":
sts.add("urn:schemas-upnp-org:device:MediaServer:2")
return sts
def response_matches_session(response_st: str, session: ClientSession) -> bool:
response_st = normalize_st(response_st)
if session.requested_st == "ssdp:all":
return True
return response_st in session.upstream_sts
def build_ipv4_msearch(original_headers: dict[str, str], upstream_st: str) -> bytes:
"""
Rebuild an M-SEARCH received from Host A for IPv4 multicast.
Even if the request came from IPv6, rewrite HOST to 239.255.255.250:1900.
Replace ST with upstream_st.
Prefer the original MX and MAN values.
"""
mx = original_headers.get("MX", "3").strip() or "3"
man = original_headers.get("MAN", '"ssdp:discover"').strip() or '"ssdp:discover"'
payload = (
"M-SEARCH * HTTP/1.1\r\n"
f"HOST: {MCAST_GRP_V4}:{SSDP_PORT}\r\n"
f"MAN: {man}\r\n"
f"MX: {mx}\r\n"
f"ST: {upstream_st}\r\n"
"\r\n"
)
return payload.encode("ascii")
def first_line(data: bytes) -> bytes:
return data.splitlines()[0] if data.splitlines() else b""
# ============================================================
# SSDP Proxy core
# ============================================================
class SSDPProxy:
def __init__(
self,
upstream_sock_v4: socket.socket,
client_reply_sock_v4: socket.socket,
client_reply_sock_v6: socket.socket,
):
self.upstream_sock_v4 = upstream_sock_v4
self.client_reply_sock_v4 = client_reply_sock_v4
self.client_reply_sock_v6 = client_reply_sock_v6
self.sessions: list[ClientSession] = []
self.recent_responses: dict[tuple[Any, str, str], float] = {}
def cleanup(self) -> None:
now = time.monotonic()
self.sessions = [
s for s in self.sessions
if s.expires_at > now
]
self.recent_responses = {
k: expires_at
for k, expires_at in self.recent_responses.items()
if expires_at > now
}
def handle_client_msearch(
self,
family: str,
data: bytes,
addr: Any,
) -> None:
self.cleanup()
headers = parse_ssdp_packet(data)
if not is_msearch_discover(headers):
# Non M-SEARCH UDP packets are expected on SSDP multicast sockets.
# Keep this at DEBUG so the default INFO log level stays quiet.
logging.debug(
"[%s client] ignored non-M-SEARCH/discover UDP from %r first_line=%r",
family,
addr,
first_line(data),
)
return
requested_st = normalize_st(headers["ST"])
now = time.monotonic()
upstream_sts = expanded_upstream_sts(requested_st)
session = ClientSession(
client_family=family,
client_addr=addr,
requested_st=requested_st,
upstream_sts=upstream_sts,
expires_at=now + SESSION_TTL_SECONDS,
)
self.sessions.append(session)
logging.info(
"[%s client] M-SEARCH from %r ST=%r -> upstream_sts=%r",
family,
addr,
requested_st,
sorted(upstream_sts),
)
for upstream_st in sorted(upstream_sts):
payload = build_ipv4_msearch(headers, upstream_st)
try:
sent = self.upstream_sock_v4.sendto(
payload,
(MCAST_GRP_V4, SSDP_PORT),
)
logging.info(
"[upstream IPv4] sent M-SEARCH ST=%r bytes=%d for client=%r",
upstream_st,
sent,
addr,
)
except OSError as e:
logging.exception(
"[upstream IPv4] failed to send M-SEARCH ST=%r for client=%r error=%s",
upstream_st,
addr,
e,
)
def handle_upstream_response(
self,
data: bytes,
addr: tuple[str, int],
) -> None:
self.cleanup()
headers = parse_ssdp_packet(data)
request_line = headers.get("_request_line", "")
if not request_line.startswith("HTTP/"):
logging.info(
"[upstream IPv4] ignored non-response from %r first_line=%r",
addr,
first_line(data),
)
return
response_st = normalize_st(headers.get("ST", ""))
usn = headers.get("USN", "")
if not response_st:
logging.info(
"[upstream IPv4] ignored response from %r reason=missing ST first_line=%r",
addr,
first_line(data),
)
return
matched_sessions = [
s for s in self.sessions
if response_matches_session(response_st, s)
]
if not matched_sessions:
logging.info(
"[upstream IPv4] response from %r ST=%r USN=%r has no active client session",
addr,
response_st,
usn,
)
return
logging.info(
"[upstream IPv4] response from %r ST=%r USN=%r matched_sessions=%d",
addr,
response_st,
usn,
len(matched_sessions),
)
now = time.monotonic()
for session in matched_sessions:
dedup_key = (
session.client_addr,
response_st,
usn,
)
if dedup_key in self.recent_responses:
logging.info(
"[relay] duplicate response suppressed client=%r ST=%r USN=%r",
session.client_addr,
response_st,
usn,
)
continue
self.recent_responses[dedup_key] = now + RESPONSE_DEDUP_SECONDS
try:
if session.client_family == "IPv4":
sent = self.client_reply_sock_v4.sendto(
data,
session.client_addr,
)
elif session.client_family == "IPv6":
sent = self.client_reply_sock_v6.sendto(
data,
session.client_addr,
)
else:
logging.warning(
"[relay] unknown client family=%r client=%r",
session.client_family,
session.client_addr,
)
continue
logging.info(
"[relay] forwarded upstream response to [%s] %r ST=%r USN=%r bytes=%d",
session.client_family,
session.client_addr,
response_st,
usn,
sent,
)
except OSError as e:
logging.exception(
"[relay] failed to forward to [%s] %r ST=%r USN=%r error=%s",
session.client_family,
session.client_addr,
response_st,
usn,
e,
)
# ============================================================
# asyncio protocols
# ============================================================
class ClientMSearchProtocol(asyncio.DatagramProtocol):
def __init__(self, proxy: SSDPProxy, family: str):
self.proxy = proxy
self.family = family
def connection_made(self, transport: asyncio.BaseTransport) -> None:
logging.info("[%s client] listener ready", self.family)
def datagram_received(self, data: bytes, addr: Any) -> None:
self.proxy.handle_client_msearch(self.family, data, addr)
class UpstreamResponseProtocol(asyncio.DatagramProtocol):
def __init__(self, proxy: SSDPProxy):
self.proxy = proxy
def connection_made(self, transport: asyncio.BaseTransport) -> None:
logging.info("[upstream IPv4] response listener ready")
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
self.proxy.handle_upstream_response(data, addr)
# ============================================================
# Socket creation
# ============================================================
def make_client_listen_socket_v4() -> socket.socket:
"""
Receive IPv4 SSDP multicast M-SEARCH from Host A.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except OSError:
pass
sock.bind(("0.0.0.0", SSDP_PORT))
mreq = socket.inet_aton(MCAST_GRP_V4) + socket.inet_aton(INTERFACE_IPV4)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
sock.setblocking(False)
return sock
def make_client_listen_socket_v6(ifindex: int) -> socket.socket:
"""
Receive IPv6 SSDP multicast M-SEARCH from Host A.
"""
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except OSError:
pass
try:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
except OSError:
pass
sock.bind(("::", SSDP_PORT))
group_bin = socket.inet_pton(socket.AF_INET6, MCAST_GRP_V6)
mreq = group_bin + struct.pack("@I", ifindex)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
sock.setblocking(False)
return sock
def make_upstream_socket_v4() -> socket.socket:
"""
Socket used to send IPv4 M-SEARCH to the actual device group.
Responses return to this socket's ephemeral port.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Send multicast from eth0's IPv4 address.
sock.setsockopt(
socket.IPPROTO_IP,
socket.IP_MULTICAST_IF,
socket.inet_aton(INTERFACE_IPV4),
)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, IPV4_MCAST_TTL)
# Even if the client listener receives multicast sent by this process,
# the source port is the upstream ephemeral port, so it has no practical
# downstream impact. However, disable unnecessary loopback.
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
# Bind to port 0 to allocate a temporary port for upstream responses.
sock.bind((UPSTREAM_SOURCE_IPV4, 0))
sock.setblocking(False)
return sock
def make_client_reply_socket_v4() -> socket.socket:
"""
Socket used to forward responses to IPv4 Host A.
Bind to port 1900 because the source port should be 1900.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except OSError:
pass
sock.bind((INTERFACE_IPV4, SSDP_PORT))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, 1)
return sock
def make_client_reply_socket_v6(ifindex: int) -> socket.socket:
"""
Socket used to forward responses to IPv6 Host A.
Bind to [::]:1900 because the source port should be 1900.
"""
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except OSError:
pass
try:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
except OSError:
pass
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, ifindex)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, IPV6_MCAST_HOPS)
sock.bind(("::", SSDP_PORT))
return sock
# ============================================================
# main
# ============================================================
async def main() -> None:
loop = asyncio.get_running_loop()
ifindex = socket.if_nametoindex(IFACE_NAME)
logging.info("interface %s index=%d ipv4=%s", IFACE_NAME, ifindex, INTERFACE_IPV4)
client_listen_v4 = make_client_listen_socket_v4()
client_listen_v6 = make_client_listen_socket_v6(ifindex)
upstream_v4 = make_upstream_socket_v4()
client_reply_v4 = make_client_reply_socket_v4()
client_reply_v6 = make_client_reply_socket_v6(ifindex)
logging.info(
"[upstream IPv4] source=%r multicast_target=%s:%d",
upstream_v4.getsockname(),
MCAST_GRP_V4,
SSDP_PORT,
)
proxy = SSDPProxy(
upstream_sock_v4=upstream_v4,
client_reply_sock_v4=client_reply_v4,
client_reply_sock_v6=client_reply_v6,
)
await loop.create_datagram_endpoint(
lambda: ClientMSearchProtocol(proxy, "IPv4"),
sock=client_listen_v4,
)
logging.info(
"[IPv4 client] listening on 0.0.0.0:%d joined %s on %s",
SSDP_PORT,
MCAST_GRP_V4,
INTERFACE_IPV4,
)
await loop.create_datagram_endpoint(
lambda: ClientMSearchProtocol(proxy, "IPv6"),
sock=client_listen_v6,
)
logging.info(
"[IPv6 client] listening on [::]:%d joined [%s%%%s]",
SSDP_PORT,
MCAST_GRP_V6,
IFACE_NAME,
)
await loop.create_datagram_endpoint(
lambda: UpstreamResponseProtocol(proxy),
sock=upstream_v4,
)
await asyncio.Event().wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("stopped")
Top comments (0)