DEV Community

Masui Masanori
Masui Masanori

Posted on

3 2

[Go][Pion/WebRTC] Closing chan and adding DataChannel

Intro

This time, I will try improving how I send data through channels between goroutines, and adding WebRTC DataChannels.

Examples

[Go] Closing chan

Last time, I handled "webrtc.PeerConnection" events with channels, as shown below.

sseClient.go

...
// SSEClient holds the state of one SSE client, together with the channels
// through which webrtc.PeerConnection events are delivered to it.
type SSEClient struct {
    // candidateFound carries the ICE candidates found by the PeerConnection.
    candidateFound        chan *webrtc.ICECandidate
    // changeConnectionState carries the PeerConnection's state changes.
    changeConnectionState chan webrtc.PeerConnectionState
    // addTrack carries the remote tracks added to the PeerConnection.
    addTrack              chan *webrtc.TrackRemote
    userName              string
    w                     http.ResponseWriter
}

// newSSEClient returns an SSEClient for userName that writes to w. Each
// PeerConnection event channel is created unbuffered.
func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
    client := new(SSEClient)
    client.userName = userName
    client.w = w
    client.candidateFound = make(chan *webrtc.ICECandidate)
    client.changeConnectionState = make(chan webrtc.PeerConnectionState)
    client.addTrack = make(chan *webrtc.TrackRemote)
    return client
}

// registerSSEClient serves one SSE connection. It forwards the PeerConnection
// events received on newClient's channels until the connection fails or
// closes, or the request context is done, and then runs the deferred cleanup.
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    // NOTE(review): the result of the type assertion is discarded; if w does
    // not implement http.Flusher, flusher is nil and Flush below panics.
    flusher, _ := w.(http.Flusher)
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        // The channels are closed here by their receiver; any PeerConnection
        // callback that sends on them afterwards panics.
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
    }()
    for {
        select {
        case candidate := <-newClient.candidateFound:
            // Encode the candidate as JSON and push it to the client as an SSE
            // "data:" event; stop serving if the encoding fails.
            jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
            if err != nil {
                return
            }
            fmt.Fprintf(w, "data: %s\n\n", jsonValue)
            flusher.Flush()
        case track := <-newClient.addTrack:
            // Hand the new remote track over to the hub.
            hub.addTrack <- track
        case connectionState := <-newClient.changeConnectionState:
            // Stop serving once the PeerConnection has failed or closed; other
            // states are ignored.
            switch connectionState {
            case webrtc.PeerConnectionStateFailed:
                return
            case webrtc.PeerConnectionStateClosed:
                return
            }
        case <-r.Context().Done():
            // The HTTP request was cancelled or finished.
            return
        }
    }
}
...
Enter fullscreen mode Exit fullscreen mode

peerConnectionState.go

...
// PeerConnectionState pairs a webrtc.PeerConnection with the SSEClient that
// receives its events.
type PeerConnectionState struct {
    peerConnection        *webrtc.PeerConnection
    client                *SSEClient
}
// NewPeerConnectionState registers PeerConnection event handlers that forward
// each event to the matching channel of client, and returns the
// PeerConnection together with that client.
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
        // Nil candidates are not forwarded.
        if i == nil {
            return
        }
        client.candidateFound <- i
    })
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // This send panics once the SSEClient channels have been closed.
        client.changeConnectionState <- p
    })
    peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
        client.addTrack <- t
    })

    return &PeerConnectionState{
        peerConnection: peerConnection,
        client:         client,
    }, nil
}
Enter fullscreen mode Exit fullscreen mode

But after closing all the SSEClient channels, a panic occurred in "peerConnection.OnConnectionStateChange",
because it tried to send the "closed" state through an already closed channel.

So I added another channel to check whether the channels are closed before sending values.

sseClient.go

...
// SSEClient holds the state of one SSE client, together with the channels
// through which webrtc.PeerConnection events are delivered to it.
type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    // heartbeat is received from before sending on the other channels, to
    // detect whether the channels have been closed.
    heartbeat             chan int
    userName              string
    w                     http.ResponseWriter
}
...
// registerSSEClient serves one SSE connection; in this revision the deferred
// cleanup also closes the heartbeat channel.
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
        // NOTE(review): if heartbeat is buffered and still holds a value when
        // it is closed, the next receive still reports ok == true.
        close(newClient.heartbeat)
    }()
...
}
Enter fullscreen mode Exit fullscreen mode

peerConnectionState.go

...
// NewPeerConnectionState, in this revision, guards the state-change send with
// a value taken from the heartbeat channel.
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    // heartbeat must be a buffered channel so that storing the first value
    // does not block.
    heartbeat := make(chan int, 1)
    candidateFound := make(chan *webrtc.ICECandidate)
    changeConnectionState := make(chan webrtc.PeerConnectionState)
    addTrack := make(chan *webrtc.TrackRemote)

    // Store the first value so that the first receive in the callback does
    // not block.
    heartbeat <- 1
...
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // "ok" is true whenever a value is received; it only becomes false
        // once the channel is closed and its buffer is empty.
        _, ok := <-heartbeat
        if ok {
            changeConnectionState <- p
            // Put the value back for the next receive.
            heartbeat <- 1
        }
    })
...
Enter fullscreen mode Exit fullscreen mode

But this change still didn't solve the problem.

Although I closed all the channels, the "ok" value didn't change: a closed buffered channel still delivers the values left in its buffer, with "ok" set to true.

peerConnectionState.go

...
// NewPeerConnectionState (excerpt): shows why the heartbeat check alone does
// not prevent the panic.
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // "ok" is still true here, because a value was received from heartbeat.
        _, ok := <-heartbeat
        if ok {
            // But the channels are already closed, so this send panics.
            changeConnectionState <- p
            heartbeat <- 1
        }
    })
...
Enter fullscreen mode Exit fullscreen mode

Thus, I now read all the values left in "heartbeat" before closing it.

sseClient.go

...
// registerSSEClient serves one SSE connection; in this revision the deferred
// cleanup empties heartbeat before closing the channels, and closes the
// PeerConnection last.
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    defer func() {
        hub.unregister <- ps
        // Read the values left in heartbeat before closing it, so that
        // receives after the close report ok == false.
        // NOTE(review): len() shrinks while i grows, so this loop empties only
        // part of a buffer holding more than one value — confirm that the
        // channel's capacity is 1.
        for i := 0; i < len(newClient.heartbeat); i++ {
            <-newClient.heartbeat
        }
        close(newClient.heartbeat)
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
    }()
...
}
Enter fullscreen mode Exit fullscreen mode

Finally, I moved the channels that handle WebRTC events to PeerConnectionState,
because the events belong to webrtc.PeerConnection.

[WebRTC] Adding WebRTC DataChannel

From now, I will try sending data by WebRTC DataChannel.

I can send text (strings) and binary data (e.g. Uint8Array) through a WebRTC DataChannel.
The application itself can also negotiate the RTCDataChannel's connection.

To do this, I have to set the "negotiated" property of RTCDataChannelInit to true when I create RTCDataChannel instances from the RTCPeerConnection.

// Create a data channel that is negotiated by the application: both peers
// must create it themselves with the same id.
const dc = peerConnection.createDataChannel(label, {
    id,
    negotiated: true,
    ordered: false
});
// Receive binary messages as ArrayBuffer. With the default binaryType ("blob"),
// ev.data would be a Blob, which the Uint8Array constructor cannot read.
dc.binaryType = "arraybuffer";
// handling received data as texts.
const decoder = new TextDecoder("utf-8");
dc.onmessage = (ev) => {
    // Text messages already arrive as strings; only binary payloads need decoding.
    const message = (typeof ev.data === "string")
        ? ev.data
        : decoder.decode(new Uint8Array(ev.data));
    // TODO: handle string value.
};
Enter fullscreen mode Exit fullscreen mode

I must also create them before the first offer/answer exchange is complete.

Now I can distinguish the RTCDataChannels by their IDs.

I can create webrtc.DataChannel in the same way in Go.

// newWebRTCDataChannel creates a DataChannel named label on peerConnection.
// The channel is created with Negotiated set to true, the given id, and
// Ordered set to false, and a message handler is registered on it.
// It returns the new channel, or the error reported by CreateDataChannel.
func newWebRTCDataChannel(label string, id uint16, peerConnection *webrtc.PeerConnection) (*webrtc.DataChannel, error) {
    negotiated := true
    ordered := false
    dc, err := peerConnection.CreateDataChannel(label, &webrtc.DataChannelInit{
        ID:         &id,
        Negotiated: &negotiated,
        Ordered:    &ordered,
    })
    if err != nil {
        return nil, err
    }
    dc.OnMessage(func(msg webrtc.DataChannelMessage) {
        // The payload is converted to a string and discarded for now; binding
        // it to an unused variable would be a compile error in Go.
        // TODO: handle string value.
        _ = string(msg.Data)
    })
    return dc, nil
}
Enter fullscreen mode Exit fullscreen mode

This time, I just forward data from one client to the other clients, and I don't modify it on the server side.

// Forward the received payload as-is to the DataChannel stored under id.
// NOTE(review): any error returned by Send is not checked here — confirm.
dc.DataChannels[id].Send(msg.Data)
Enter fullscreen mode Exit fullscreen mode

Another important point is that the DataChannels must be closed before the PeerConnection is closed.

Heroku

Build apps, not infrastructure.

Dealing with servers, hardware, and infrastructure can take up your valuable time. Discover the benefits of Heroku, the PaaS of choice for developers since 2007.

Visit Site

Top comments (0)

Heroku

Build apps, not infrastructure.

Dealing with servers, hardware, and infrastructure can take up your valuable time. Discover the benefits of Heroku, the PaaS of choice for developers since 2007.

Visit Site

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay