Masui Masanori

[Go] Try Pion/WebRTC with SSE

Intro

This time, I will try Pion/WebRTC.

Because Pion already has good examples, I will create a sample based on SFU-WebSocket of example-webrtc-applications.

I will change these points:

  • Use SSE(Server-Sent Events) for signaling
  • Start connecting manually
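
As a rough sketch of what "SSE for signaling" means on the wire (this is not code from the sample project): an SSE event is plain text where every payload line is prefixed with "data: " and a blank line ends the event. Because SSE only flows from the server to the client, the client has to send its answers and candidates back with ordinary HTTP requests. The helper name formatSSE is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSSE encodes one payload as a Server-Sent Events message.
// Each line of the payload gets its own "data: " field, and a blank
// line terminates the event. (formatSSE is a hypothetical helper name.)
func formatSSE(payload string) string {
	var b strings.Builder
	for _, line := range strings.Split(payload, "\n") {
		b.WriteString("data: ")
		b.WriteString(line)
		b.WriteString("\n")
	}
	b.WriteString("\n")
	return b.String()
}

func main() {
	// A signaling message serialized as JSON fits on a single line.
	fmt.Print(formatSSE(`{"event":"offer","data":"..."}`))
}
```

A JSON message never contains a raw line break, so in practice one signaling message becomes one "data: " line followed by an empty line.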

I will add the WebRTC functions to the sample project from my last post.

I also referred to this post (especially for the client side).

Environments

  • Go ver.go1.18.2 windows/amd64
  • Node.js ver.18.1.0

Connecting with WebRTC SFU

When I tried WebRTC last time, the server-side application was used only for signaling.
After signaling, the clients connected directly to each other.

This time, the clients will connect only to the server-side application.
After connecting, each client will send its video and audio tracks to the server-side application.
The server-side application will then send the other clients' tracks to each client as remote tracks.
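
To make the forwarding rule concrete, here is a toy model with no WebRTC involved. The types packet and sfu are hypothetical and only illustrate the idea: whatever one client sends is delivered to every other client, but never back to the sender.

```go
package main

import "fmt"

// packet is a stand-in for an RTP packet. A real SFU forwards
// the bytes without decoding the media.
type packet struct {
	from    string
	payload string
}

// sfu is a toy model of a Selective Forwarding Unit.
type sfu struct {
	inbox map[string][]packet // packets delivered to each client, keyed by client name
}

func newSFU(clients ...string) *sfu {
	s := &sfu{inbox: map[string][]packet{}}
	for _, c := range clients {
		s.inbox[c] = nil
	}
	return s
}

// forward delivers p to every client except its sender.
func (s *sfu) forward(p packet) {
	for name := range s.inbox {
		if name == p.from {
			continue
		}
		s.inbox[name] = append(s.inbox[name], p)
	}
}

func main() {
	s := newSFU("alice", "bob", "carol")
	s.forward(packet{from: "alice", payload: "video frame"})
	fmt.Println(len(s.inbox["alice"]), len(s.inbox["bob"]), len(s.inbox["carol"])) // 0 1 1
}
```

With n clients, a full mesh needs n(n-1)/2 peer connections in total, while this layout needs only n, one per client.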

Samples

This time, I published the sample project on GitHub.

Client-side

The connection process starts from the server-side application, so the client side only needs to handle offer and ICE candidate events.
I will also create an RTCPeerConnection at startup, right after getting the local MediaStream.
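
Since the server is always the offerer, each side only walks through two signaling-state transitions per negotiation. The following is a simplified model that I wrote only for explanation: the names state, action and next are hypothetical, and it covers just the four transitions used in this sample (the real state machine also has provisional answers and rollback).

```go
package main

import (
	"errors"
	"fmt"
)

type state string

const (
	stable          state = "stable"
	haveLocalOffer  state = "have-local-offer"
	haveRemoteOffer state = "have-remote-offer"
)

type action string

const (
	setLocalOffer   action = "SetLocal(offer)"
	setRemoteOffer  action = "SetRemote(offer)"
	setLocalAnswer  action = "SetLocal(answer)"
	setRemoteAnswer action = "SetRemote(answer)"
)

// next returns the state after applying a, or an error for any
// transition that this simplified model does not know.
func next(s state, a action) (state, error) {
	switch {
	case s == stable && a == setLocalOffer: // offerer (the server here)
		return haveLocalOffer, nil
	case s == haveLocalOffer && a == setRemoteAnswer:
		return stable, nil
	case s == stable && a == setRemoteOffer: // answerer (the browser here)
		return haveRemoteOffer, nil
	case s == haveRemoteOffer && a == setLocalAnswer:
		return stable, nil
	}
	return s, errors.New("invalid transition: " + string(s) + " -> " + string(a))
}

func main() {
	// Server side: create an offer, then receive the client's answer.
	s, _ := next(stable, setLocalOffer)
	s, _ = next(s, setRemoteAnswer)
	fmt.Println(s) // stable

	// Applying a second offer before the answer arrives is rejected in this model.
	s, _ = next(stable, setLocalOffer)
	_, err := next(s, setLocalOffer)
	fmt.Println(err != nil) // true
}
```

The second case in main corresponds to the kind of error that the server-side retry loop later in this post has to tolerate.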

main.page.ts



...
function handleReceivedMessage(value: string) {
    const message = JSON.parse(value);
    if(!checkIsClientMessage(message)) {
        return;
    }
    switch(message.event) {
        case "text":
            view.addReceivedText({ user: message.userName, message: message.data });
            break;
        case "offer":
            // "data" holds a JSON string, so it must be parsed again.
            webrtc.handleOffer(JSON.parse(message.data));
            break;
        case "candidate":
            webrtc.handleCandidate(JSON.parse(message.data));
            break;
    }
}
function sendAnswer(data: RTCSessionDescriptionInit) {
    if(!hasAnyTexts(userName)) {
        return;
    }
    sse.sendMessage({userName, event: "answer", data: JSON.stringify(data)});
}
function sendCandidate(data: RTCIceCandidate) {
    if(!hasAnyTexts(userName)) {
        return;
    }
    sse.sendMessage({userName, event: "candidate", data: JSON.stringify(data)});
}
function checkIsClientMessage(value: any): value is ClientMessage {
    // All messages from the server-side application are sent as "ClientMessage".
    // The "in" operator throws on primitives, so reject non-objects first.
    if(value == null || typeof value !== "object") {
        return false;
    }
    if(("event" in value &&
        typeof value["event"] === "string") === false) {
        return false;
    }
    if(("data" in value &&
        typeof value["data"] === "string") === false) {
        return false;
    }
    return true;
}
init();
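
The messages above are encoded twice: the envelope is JSON, and its "data" field is a JSON string that has to be parsed again. A minimal Go sketch of the same decoding follows. The JSON tags on ClientMessage are assumptions that I inferred from the client-side code, and sdp is a hypothetical stand-in with the { type, sdp } shape of RTCSessionDescriptionInit.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ClientMessage mirrors the envelope used in this post.
// The JSON tags are assumptions inferred from the client-side code.
type ClientMessage struct {
	UserName string `json:"userName"`
	Event    string `json:"event"`
	Data     string `json:"data"`
}

// sdp is a hypothetical stand-in for a session description payload.
type sdp struct {
	Type string `json:"type"`
	SDP  string `json:"sdp"`
}

// decodeOffer unwraps the envelope, then parses the JSON string inside Data.
func decodeOffer(raw []byte) (sdp, error) {
	var m ClientMessage
	if err := json.Unmarshal(raw, &m); err != nil {
		return sdp{}, err
	}
	if m.Event != "offer" {
		return sdp{}, errors.New("unexpected event: " + m.Event)
	}
	var d sdp
	err := json.Unmarshal([]byte(m.Data), &d)
	return d, err
}

func main() {
	// Encode the inner payload first, then put it into the envelope as a string.
	inner, _ := json.Marshal(sdp{Type: "offer", SDP: "v=0"})
	outer, _ := json.Marshal(ClientMessage{UserName: "alice", Event: "offer", Data: string(inner)})
	fmt.Println(string(outer))

	d, err := decodeOffer(outer)
	fmt.Println(d.Type, d.SDP, err) // offer v=0 <nil>
}
```

Keeping "data" as a string lets one envelope type carry text messages, offers, answers and candidates without a union type on the Go side.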


webrtc.controller.ts




export class WebRtcController {
    private webcamStream: MediaStream|null = null;
    private peerConnection: RTCPeerConnection|null = null;
    private answerSentEvent: ((data: RTCSessionDescriptionInit) => void)|null = null;
    private candidateSentEvent: ((data: RTCIceCandidate) => void)|null = null;
    public init() {
        const localVideo = document.getElementById("local_video") as HTMLVideoElement;
        localVideo.addEventListener("canplay", () => {
            // keep the aspect ratio of the webcam video
            const width = 320;
            const height = localVideo.videoHeight / (localVideo.videoWidth/width);

            localVideo.setAttribute("width", width.toString());
            localVideo.setAttribute("height", height.toString());
        }, false);
        navigator.mediaDevices.getUserMedia({ video: true, audio: true })
            .then(stream => {
                localVideo.srcObject = stream;
                localVideo.play();
                this.webcamStream = stream;
                // create a RTCPeerConnection after getting local MediaStream
                this.connect();
            })
            .catch(err => console.error(`An error occurred: ${err}`));
    }
    ...
    /** handle received offer and send answer */
    public handleOffer(data: RTCSessionDescription|null|undefined) {
        if(this.peerConnection == null ||
            data == null) {
            return;
        }
        this.peerConnection.setRemoteDescription(data);
        this.peerConnection.createAnswer()
            .then(answer => {
                if(this.peerConnection != null) {
                    this.peerConnection.setLocalDescription(answer);
                }
                if(this.answerSentEvent != null) {
                    this.answerSentEvent(answer);
                }
            });
    }
    /** add ICE Candidate */
    public handleCandidate(data: RTCIceCandidate|null|undefined) {
        if(this.peerConnection == null ||
            data == null) {
            return;
        }
        this.peerConnection.addIceCandidate(data);
    }
    private connect() {
        if(this.webcamStream == null) {
            return;
        }
        this.peerConnection = new RTCPeerConnection({
            iceServers: [{
                urls: "stun:stun.l.google.com:19302", // A STUN server
            }]
        });
        // Add remote video tracks as new video elements.
        this.peerConnection.ontrack = (ev) => {
            if (ev.track.kind === "audio" ||
                ev.streams[0] == null) {
                return;
            }

            const remoteVideo = document.createElement("video");
            remoteVideo.srcObject = ev.streams[0];
            remoteVideo.autoplay = true;
            remoteVideo.controls = true;
            const videoArea = document.getElementById("remote_video_area") as HTMLElement;
            videoArea.appendChild(remoteVideo);
            ev.track.onmute = () => {
                remoteVideo.play();
            };
            // Remove the video element when the remote track is removed.
            ev.streams[0].onremovetrack = () => {
                if (remoteVideo.parentNode) {
                    remoteVideo.parentNode.removeChild(remoteVideo);
                }
            };
        };
        // Send the local video and audio tracks to the server-side application.
        this.webcamStream.getTracks().forEach(track => {
            if(this.peerConnection == null ||
                this.webcamStream == null) {
                return;
            }
            this.peerConnection.addTrack(track, this.webcamStream)
        });
        this.peerConnection.onicecandidate = ev => {
            if (ev.candidate == null ||
                this.candidateSentEvent == null) {
                return;
            }
            this.candidateSentEvent(ev.candidate);
        };
    }

}


Server-side

sseClient.go



package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"

    "github.com/pion/webrtc/v3"
)

type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    userName              string
    w                     http.ResponseWriter
}

func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
    return &SSEClient{
        candidateFound:        make(chan *webrtc.ICECandidate),
        changeConnectionState: make(chan webrtc.PeerConnectionState),
        addTrack:              make(chan *webrtc.TrackRemote),
        userName:              userName,
        w:                     w,
    }
}

func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
    userName, err := getParam(r, "user")
    if err != nil {
        log.Println(err.Error())
        fmt.Fprint(w, "The parameters have no names")
        return
    }
    newClient := newSSEClient(userName, w)
    ps, err := NewPeerConnectionState(newClient)
    if err != nil {
        log.Println(err.Error())
        fmt.Fprint(w, "Failed connection")
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    hub.register <- ps

    // For pushing data to clients, I call "flusher.Flush()"
    flusher, _ := w.(http.Flusher)
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
    }()
    for {
        // handle PeerConnection events and close SSE event.
        select {
        case candidate := <-newClient.candidateFound:
            jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
            if err != nil {
                log.Println(err.Error())
                return
            }
            fmt.Fprintf(w, "data: %s\n\n", jsonValue)
            flusher.Flush()
        case track := <-newClient.addTrack:
            hub.addTrack <- track
        case connectionState := <-newClient.changeConnectionState:
            switch connectionState {
            case webrtc.PeerConnectionStateFailed:
                return
            case webrtc.PeerConnectionStateClosed:
                return
            }
        case <-r.Context().Done():
            // when "es.close()" is called, this loop operation will be ended.
            return
        }
    }
}

func sendSSEMessage(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
    w.Header().Set("Content-Type", "application/json")
    body, err := ioutil.ReadAll(r.Body)
    if err != nil {
        log.Println(err.Error())
        j, _ := json.Marshal(GetFailed("Failed reading values from body"))
        w.Write(j)
        return
    }
    message := &ClientMessage{}
    // "message" is already a pointer, so it is passed as is.
    err = json.Unmarshal(body, message)
    if err != nil {
        log.Println(err.Error())
        j, _ := json.Marshal(GetFailed("Failed converting to ClientMessage"))
        w.Write(j)
        return
    }
    w.WriteHeader(200)
    hub.broadcast <- *message
    data, _ := json.Marshal(GetSucceeded())
    w.Write(data)
}
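
The SSE part of the handler above can be tried without WebRTC. The following is a minimal sketch using only the standard library: streamEvents and demo are hypothetical names, and the "/sse?user=alice" URL is an assumption for the test request. Unlike the listing above, it also checks whether the ResponseWriter really supports http.Flusher before using it.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamEvents is a hypothetical handler: it writes every message received
// on events as one SSE event and returns when the channel is closed or the
// client disconnects.
func streamEvents(events <-chan string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		for {
			select {
			case msg, open := <-events:
				if !open {
					return
				}
				fmt.Fprintf(w, "data: %s\n\n", msg)
				flusher.Flush() // push the event to the client immediately
			case <-r.Context().Done():
				return
			}
		}
	}
}

// demo runs the handler against an in-memory recorder and returns the body.
func demo() string {
	events := make(chan string, 1)
	events <- `{"event":"text","data":"hello"}`
	close(events)
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/sse?user=alice", nil)
	streamEvents(events)(rec, req)
	return rec.Body.String()
}

func main() {
	fmt.Print(demo())
}
```

httptest.ResponseRecorder implements http.Flusher, so the same handler code path runs in a test as in a real server.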


peerConnectionState.go




package main

import (
    "github.com/pion/webrtc/v3"
)

type PeerConnectionState struct {
    peerConnection *webrtc.PeerConnection
    client         *SSEClient
}

func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
    peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{
        ICEServers: []webrtc.ICEServer{
            {
                URLs: []string{
                    "stun:stun.l.google.com:19302",
                },
            },
        },
    })
    if err != nil {
        return nil, err
    }
    // Accept one video track and one audio track from the client.
    for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
        if _, err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
            Direction: webrtc.RTPTransceiverDirectionRecvonly,
        }); err != nil {
            return nil, err
        }
    }
    // Add event handlers.
    peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
        if i == nil {
            return
        }
        client.candidateFound <- i
    })
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // avoid panic after closing channel
        if p == webrtc.PeerConnectionStateClosed {
            _, ok := <-client.changeConnectionState
            if ok {
                client.changeConnectionState <- p
            }
            return
        }
        client.changeConnectionState <- p
    })
    peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
        client.addTrack <- t
    })

    return &PeerConnectionState{
        peerConnection: peerConnection,
        client:         client,
    }, nil
}
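
The "avoid panic after closing channel" comment above points at a general problem: a callback may try to send after the receiving side has closed the channel. One common alternative, shown here only as a sketch and not as what the sample project does, is to never close the data channel and to signal shutdown through a separate done channel. The names notifier, send and stop are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier delivers values to a single consumer and can be stopped at any
// time. The events channel is never closed; shutdown is signalled by
// closing done, which only stop does.
type notifier struct {
	events chan string
	done   chan struct{}
	once   sync.Once
}

func newNotifier() *notifier {
	return &notifier{events: make(chan string), done: make(chan struct{})}
}

// send reports whether v was delivered. After stop it returns false
// instead of blocking or panicking.
func (n *notifier) send(v string) bool {
	select {
	case <-n.done: // already stopped: do not even try to send
		return false
	default:
	}
	select {
	case n.events <- v:
		return true
	case <-n.done:
		return false
	}
}

// stop is safe to call more than once.
func (n *notifier) stop() { n.once.Do(func() { close(n.done) }) }

func main() {
	n := newNotifier()
	received := make(chan string)
	go func() { received <- <-n.events }()

	fmt.Println(n.send("connected"), <-received) // true connected
	n.stop()
	fmt.Println(n.send("closed")) // false
}
```

With this shape, an event handler can call send from any goroutine, and the SSE handler only needs to call stop when it returns.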


sseHub.go




package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/pion/rtcp"
    "github.com/pion/webrtc/v3"
)

type SSEHub struct {
    clients     map[*PeerConnectionState]bool
    broadcast   chan ClientMessage
    register    chan *PeerConnectionState
    unregister  chan *PeerConnectionState
    trackLocals map[string]*webrtc.TrackLocalStaticRTP
    addTrack    chan *webrtc.TrackRemote
}

func newSSEHub() *SSEHub {
    return &SSEHub{
        clients:     make(map[*PeerConnectionState]bool),
        broadcast:   make(chan ClientMessage),
        register:    make(chan *PeerConnectionState),
        unregister:  make(chan *PeerConnectionState),
        trackLocals: map[string]*webrtc.TrackLocalStaticRTP{},
        addTrack:    make(chan *webrtc.TrackRemote),
    }
}
func (h *SSEHub) run() {
    // Call dispatchKeyFrame every 3 seconds.
    go func() {
        for range time.NewTicker(time.Second * 3).C {
            dispatchKeyFrame(h)
        }
    }()
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
            signalPeerConnections(h)
        case client := <-h.unregister:
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                signalPeerConnections(h)
            }
        case track := <-h.addTrack:
            // Create a local track with the same codec, ID and stream ID as the received track.
            trackLocal, err := webrtc.NewTrackLocalStaticRTP(track.Codec().RTPCodecCapability,
                track.ID(), track.StreamID())
            if err != nil {
                log.Println(err.Error())
                return
            }
            h.trackLocals[track.ID()] = trackLocal
            signalPeerConnections(h)
            go updateTrackValue(h, track)
        case message := <-h.broadcast:
            handleReceivedMessage(h, message)
        }
    }
}

// Copy received RTP packets to the local track until reading or writing fails.
func updateTrackValue(h *SSEHub, track *webrtc.TrackRemote) {
    defer func() {
        delete(h.trackLocals, track.ID())
        signalPeerConnections(h)
    }()

    buf := make([]byte, 1500)

    for {
        i, _, err := track.Read(buf)
        if err != nil {
            return
        }
        if _, err = h.trackLocals[track.ID()].Write(buf[:i]); err != nil {
            return
        }
    }
}

func handleReceivedMessage(h *SSEHub, message ClientMessage) {
    switch message.Event {
    case TextEvent:
        // Text messages are sent to all clients.
        m, _ := json.Marshal(message)
        jsonText := string(m)

        for client := range h.clients {
            flusher, _ := client.client.w.(http.Flusher)

            fmt.Fprintf(client.client.w, "data: %s\n\n", jsonText)
            flusher.Flush()
        }
    case CandidateEvent:
        candidate := webrtc.ICECandidateInit{}
        if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
            log.Println(err)
            return
        }
        // Candidates are only added to the sender's PeerConnection.
        for pc := range h.clients {
            if pc.client.userName == message.UserName {
                if err := pc.peerConnection.AddICECandidate(candidate); err != nil {
                    log.Println(err)
                    return
                }
            }
        }
    case AnswerEvent:
        answer := webrtc.SessionDescription{}
        if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
            log.Println(err)
            return
        }
        // Answers are only set to the sender's PeerConnection.
        for pc := range h.clients {
            if pc.client.userName == message.UserName {
                if err := pc.peerConnection.SetRemoteDescription(answer); err != nil {
                    log.Println(err)
                    return
                }
            }
        }
    }
}
func signalPeerConnections(h *SSEHub) {
    defer func() {
        dispatchKeyFrame(h)
    }()
    for syncAttempt := 0; ; syncAttempt++ {
        if syncAttempt == 25 {
            // Release the lock and attempt a sync in 3 seconds. We might be blocking a RemoveTrack or AddTrack
            go func() {
                time.Sleep(time.Second * 3)
                signalPeerConnections(h)
            }()
            return
        }
        // For ignoring errors like below, execute attemptSync multiple times.
        // InvalidModificationError: invalid proposed signaling state transition: have-local-offer->SetLocal(offer)->have-local-offer
        if !attemptSync(h) {
            break
        }
    }
}

// Share received tracks to all connected peers.
// It returns true when the sync must be tried again.
func attemptSync(h *SSEHub) bool {
    for ps := range h.clients {
        if ps.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
            delete(h.clients, ps)
            // We modified the map, start from the beginning
            return true
        }
        existingSenders := map[string]bool{}

        // Remove the senders whose tracks no longer exist.
        for _, sender := range ps.peerConnection.GetSenders() {
            if sender.Track() == nil {
                continue
            }
            existingSenders[sender.Track().ID()] = true

            if _, ok := h.trackLocals[sender.Track().ID()]; !ok {
                if err := ps.peerConnection.RemoveTrack(sender); err != nil {
                    return true
                }
            }
        }
        // Mark the client's own tracks so that they aren't sent back to it.
        for _, receiver := range ps.peerConnection.GetReceivers() {
            if receiver.Track() == nil {
                continue
            }
            existingSenders[receiver.Track().ID()] = true
        }
        // Add the tracks that haven't been sent to this client yet.
        for trackID := range h.trackLocals {
            if _, ok := existingSenders[trackID]; !ok {
                if _, err := ps.peerConnection.AddTrack(h.trackLocals[trackID]); err != nil {
                    return true
                }
            }
        }

        offer, err := ps.peerConnection.CreateOffer(nil)
        if err != nil {
            return true
        }
        messageJSON, err := NewOfferMessageJSON(ps.client.userName, offer)
        if err != nil {
            return true
        }

        if err = ps.peerConnection.SetLocalDescription(offer); err != nil {
            return true
        }
        flusher, _ := ps.client.w.(http.Flusher)

        fmt.Fprintf(ps.client.w, "data: %s\n\n", messageJSON)
    <span class="n">flusher</span><span class="o">.</span><span class="n">Flush</span><span class="p">()</span>
<span class="p">}</span>
<span class="k">return</span> <span class="no">false</span>
Enter fullscreen mode Exit fullscreen mode

}
func dispatchKeyFrame(h *SSEHub) {
    for ps := range h.clients {
        for _, receiver := range ps.peerConnection.GetReceivers() {
            if receiver.Track() == nil {
                continue
            }

            _ = ps.peerConnection.WriteRTCP([]rtcp.Packet{
                &rtcp.PictureLossIndication{
                    MediaSSRC: uint32(receiver.Track().SSRC()),
                },
            })
        }
    }
}





Channels

I created channels in SSEClient and SSEHub.
At first, I tried adding some channels to SSEClient so that SSEHub could send messages through them.

But when I did that, the application hung when I sent text messages after connecting WebRTC.
I think the cause was a circular reference, so I removed these channels and now send messages directly from SSEHub.

Resources
