Masui Masanori

[Go][Pion/WebRTC] Closing chan and adding DataChannel

Intro

This time, I will try to improve how I send data through Go channels between goroutines, and add WebRTC DataChannels.

Examples

[Go] Closing chan

Last time, I handled "webrtc.PeerConnection" events through channels, as shown below.

sseClient.go

...
type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    userName              string
    w                     http.ResponseWriter
}

func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
    return &SSEClient{
        candidateFound:        make(chan *webrtc.ICECandidate),
        changeConnectionState: make(chan webrtc.PeerConnectionState),
        addTrack:              make(chan *webrtc.TrackRemote),
        userName:              userName,
        w:                     w,
    }
}

func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    flusher, _ := w.(http.Flusher)
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
    }()
    for {
        select {
        case candidate := <-newClient.candidateFound:
            jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
            if err != nil {
                return
            }
            fmt.Fprintf(w, "data: %s\n\n", jsonValue)
            flusher.Flush()
        case track := <-newClient.addTrack:
            hub.addTrack <- track
        case connectionState := <-newClient.changeConnectionState:
            switch connectionState {
            case webrtc.PeerConnectionStateFailed:
                return
            case webrtc.PeerConnectionStateClosed:
                return
            }
        case <-r.Context().Done():
            return
        }
    }
}
...

peerConnectionState.go

...
type PeerConnectionState struct {
    peerConnection        *webrtc.PeerConnection
    client                *SSEClient
}
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
        if i == nil {
            return
        }
        client.candidateFound <- i
    })
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // panics here once all the SSEClient channels have been closed.
        client.changeConnectionState <- p
    })
    peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
        client.addTrack <- t
    })

    return &PeerConnectionState{
        peerConnection: peerConnection,
        client:         client,
    }, nil
}

But after all the SSEClient channels were closed, a panic occurred in "peerConnection.OnConnectionStateChange",
because the callback tried to send the "closed" state on a channel that was already closed.
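
The panic can be reproduced without WebRTC at all. The following is a minimal sketch using only the standard library; "sendPanics" is a hypothetical helper written for this illustration and is not part of the original project.

```go
package main

import "fmt"

// sendPanics sends one value on ch and reports whether the send panicked.
// (Hypothetical helper for illustration.)
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		// A send on a closed channel raises a run-time panic that recover can catch.
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	open := make(chan int, 1)
	fmt.Println(sendPanics(open)) // false

	closed := make(chan int, 1)
	close(closed)
	fmt.Println(sendPanics(closed)) // true
}
```

Recovering like this only demonstrates the behavior. In real code it is usually better to make sure nothing sends after the close.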

So I added another channel ("heartbeat") to check whether the channels are closed before sending values.

sseClient.go

...
type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    heartbeat             chan int
    userName              string
    w                     http.ResponseWriter
}
...
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
        close(newClient.heartbeat)
    }()
...
}

peerConnectionState.go

...
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    // must be a buffered channel so that sending the first value does not block.
    heartbeat := make(chan int, 1)
    candidateFound := make(chan *webrtc.ICECandidate)
    changeConnectionState := make(chan webrtc.PeerConnectionState)
    addTrack := make(chan *webrtc.TrackRemote)

    // seed one value so that the first receive in the callback does not block.
    heartbeat <- 1
...
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // "ok" stays true until the channel has been closed and drained.
        _, ok := <-heartbeat
        if ok {
            changeConnectionState <- p
            // write value for next reading.
            heartbeat <- 1
        }
    })
...

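But this change still didn't solve the problem.

Even after I closed all the channels, the "ok" value did not become false. "heartbeat" is a buffered channel, and a receive from a closed buffered channel keeps returning the remaining values with "ok" set to true until the buffer is empty.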
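
A minimal sketch of this receive behavior, using only the standard library ("receiveAfterClose" is a hypothetical name chosen for this example):

```go
package main

import "fmt"

// receiveAfterClose closes a buffered channel that still holds one value
// and returns the "ok" results of the next two receives.
func receiveAfterClose() (first, second bool) {
	heartbeat := make(chan int, 1)
	heartbeat <- 1
	close(heartbeat)

	_, first = <-heartbeat  // the buffered value is still delivered
	_, second = <-heartbeat // the channel is now closed and empty
	return first, second
}

func main() {
	first, second := receiveAfterClose()
	fmt.Println(first, second) // true false
}
```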

peerConnectionState.go

...
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // "ok" value is "true".
        _, ok := <-heartbeat
        if ok {
            // but the channels are closed.
            changeConnectionState <- p
            heartbeat <- 1
        }
    })
...

So I drained all the values from "heartbeat" before closing it.

sseClient.go

...
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    defer func() {
        hub.unregister <- ps
        // drain all buffered values before closing, so that later receives report ok == false.
        for len(newClient.heartbeat) > 0 {
            <-newClient.heartbeat
        }
        close(newClient.heartbeat)
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
    }()
...
}

Finally, I moved the channels that handle WebRTC events into PeerConnectionState,
because those events belong to webrtc.PeerConnection.
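
As an alternative to draining a token channel, a common Go pattern is to never close the data channels and to close a separate "done" channel instead, so that senders can select on it. The following is only a sketch of that pattern under my own assumptions: "eventSink" and its methods are hypothetical names, and the state is a plain string rather than webrtc.PeerConnectionState so that the example runs with the standard library alone.

```go
package main

import (
	"fmt"
	"sync"
)

// eventSink is a hypothetical stand-in for the struct that owns the event channels.
type eventSink struct {
	states chan string   // data channel; never closed
	done   chan struct{} // closed exactly once to signal shutdown
	once   sync.Once
}

func newEventSink() *eventSink {
	return &eventSink{states: make(chan string), done: make(chan struct{})}
}

// send delivers state to a receiver, or gives up once the sink is closed.
// It cannot panic, because states itself is never closed.
func (s *eventSink) send(state string) bool {
	select {
	case s.states <- state:
		return true
	case <-s.done:
		return false
	}
}

// close signals shutdown; calling it more than once is safe.
func (s *eventSink) close() {
	s.once.Do(func() { close(s.done) })
}

func main() {
	sink := newEventSink()
	received := make(chan string)
	go func() { received <- <-sink.states }()

	fmt.Println(sink.send("connected"), <-received) // true connected
	sink.close()
	fmt.Println(sink.send("closed")) // false
}
```

With this shape, a callback such as OnConnectionStateChange could call "send" at any time, including after shutdown, without checking a heartbeat first.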

[WebRTC] Adding WebRTC DataChannel

Next, I will try sending data over a WebRTC DataChannel.

A WebRTC DataChannel can carry both text (string) and binary data (e.g. Uint8Array).
The application can also negotiate the RTCDataChannel connection by itself.

To do this, I have to set "negotiated" in RTCDataChannelInit to true and use the same "id" on both peers when I create RTCDataChannel instances from RTCPeerConnection.

const dc = peerConnection.createDataChannel(label, {
    id,
    negotiated: true,
    ordered: false
});
// receive binary messages as ArrayBuffer and handle them as text.
dc.binaryType = "arraybuffer";
const decoder = new TextDecoder("utf-8");
dc.onmessage = (ev) => {
    const message = decoder.decode(new Uint8Array(ev.data));
    // TODO: handle string value.
};

I must also create them before the first offer/answer exchange is complete.

Now I can distinguish the RTCDataChannels by their IDs.

In Go, I can create a webrtc.DataChannel in the same way.

func newWebRTCDataChannel(label string, id uint16, peerConnection *webrtc.PeerConnection) (*webrtc.DataChannel, error) {
    negotiated := true
    ordered := false
    dc, err := peerConnection.CreateDataChannel(label, &webrtc.DataChannelInit{
        ID:         &id,
        Negotiated: &negotiated,
        Ordered:    &ordered,
    })
    if err != nil {
        return nil, err
    }
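    dc.OnMessage(func(msg webrtc.DataChannelMessage) {
        // msg.Data is a []byte, so convert it to handle it as text.
        message := string(msg.Data)
        // TODO: handle string value.
        _ = message // keeps the snippet compiling until the handler is written.
    })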
    return dc, nil
}
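
Because the payload arrives as raw bytes, it may be worth checking that it really is text before treating it as a string, much like TextDecoder does on the browser side. The following is a small sketch using only the standard library; "decodeText" is a hypothetical helper and not part of Pion.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// decodeText returns data as a string only if it is valid UTF-8.
// (Hypothetical helper for illustration.)
func decodeText(data []byte) (string, bool) {
	if !utf8.Valid(data) {
		return "", false
	}
	return string(data), true
}

func main() {
	text, ok := decodeText([]byte("hello"))
	fmt.Printf("%q %v\n", text, ok) // "hello" true

	text, ok = decodeText([]byte{0xff, 0xfe})
	fmt.Printf("%q %v\n", text, ok) // "" false
}
```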

This time, I just forward the data from one client to the other clients, and the server side does not modify it.

dc.DataChannels[id].Send(msg.Data)
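
Forwarding to every client except the sender could look roughly like the sketch below. It is written under several assumptions of mine: the "sender" interface only mirrors the Send([]byte) error method of webrtc.DataChannel, the peers are keyed by user name, and "fakeChannel" exists only so that the example runs without a real connection.

```go
package main

import "fmt"

// sender is the only part of a DataChannel that this sketch relies on.
type sender interface {
	Send(data []byte) error
}

// broadcast forwards data to every peer except the one it came from
// and returns the names of the peers whose Send failed.
func broadcast(peers map[string]sender, from string, data []byte) []string {
	var failed []string
	for name, p := range peers {
		if name == from {
			continue
		}
		if err := p.Send(data); err != nil {
			failed = append(failed, name)
		}
	}
	return failed
}

// fakeChannel records what it was asked to send; for demonstration only.
type fakeChannel struct{ sent [][]byte }

func (f *fakeChannel) Send(data []byte) error {
	f.sent = append(f.sent, data)
	return nil
}

func main() {
	alice, bob := &fakeChannel{}, &fakeChannel{}
	peers := map[string]sender{"alice": alice, "bob": bob}

	failed := broadcast(peers, "alice", []byte("hello"))
	fmt.Println(len(alice.sent), len(bob.sent), len(failed)) // 0 1 0
}
```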

Another important point is that the DataChannels must be closed before the PeerConnection is closed.
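
The closing order can be sketched as follows. The "closer" interface only mirrors the Close() error method that both webrtc.DataChannel and webrtc.PeerConnection provide, and "closeAll" and "recorder" are hypothetical names used for this illustration.

```go
package main

import "fmt"

// closer is satisfied by anything with a Close() error method.
type closer interface {
	Close() error
}

// closeAll closes every data channel first and the peer connection last.
// It keeps going on failure and returns the first error it saw.
func closeAll(channels []closer, peerConnection closer) error {
	var firstErr error
	for _, dc := range channels {
		if err := dc.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	if err := peerConnection.Close(); err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}

// recorder appends its name to a shared log when closed; for demonstration only.
type recorder struct {
	name string
	log  *[]string
}

func (r recorder) Close() error {
	*r.log = append(*r.log, r.name)
	return nil
}

func main() {
	var order []string
	channels := []closer{recorder{"dc1", &order}, recorder{"dc2", &order}}
	err := closeAll(channels, recorder{"pc", &order})
	fmt.Println(order, err) // [dc1 dc2 pc] <nil>
}
```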
