
Masui Masanori


[Go][Pion/WebRTC] Closing chan and adding DataChannel

Intro

In this post, I will update how I send data through goroutine channels and add WebRTC DataChannels.

Examples

[Go] Closing chan

Last time, I handled "webrtc.PeerConnection" events through channels like below.

sseClient.go

...
type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    userName              string
    w                     http.ResponseWriter
}

func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
    return &SSEClient{
        candidateFound:        make(chan *webrtc.ICECandidate),
        changeConnectionState: make(chan webrtc.PeerConnectionState),
        addTrack:              make(chan *webrtc.TrackRemote),
        userName:              userName,
        w:                     w,
    }
}

func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    flusher, ok := w.(http.Flusher)
    if !ok {
        // SSE needs a ResponseWriter that supports flushing.
        return
    }
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
    }()
    for {
        select {
        case candidate := <-newClient.candidateFound:
            jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
            if err != nil {
                return
            }
            fmt.Fprintf(w, "data: %s\n\n", jsonValue)
            flusher.Flush()
        case track := <-newClient.addTrack:
            hub.addTrack <- track
        case connectionState := <-newClient.changeConnectionState:
            switch connectionState {
            case webrtc.PeerConnectionStateFailed:
                return
            case webrtc.PeerConnectionStateClosed:
                return
            }
        case <-r.Context().Done():
            return
        }
    }
}
...

peerConnectionState.go

...
type PeerConnectionState struct {
    peerConnection        *webrtc.PeerConnection
    client                *SSEClient
}
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
        if i == nil {
            return
        }
        client.candidateFound <- i
    })
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // This send panics after all SSEClient channels have been closed.
        client.changeConnectionState <- p
    })
    peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
        client.addTrack <- t
    })

    return &PeerConnectionState{
        peerConnection: peerConnection,
        client:         client,
    }, nil
}

But after closing all of the SSEClient channels, a panic occurred in "peerConnection.OnConnectionStateChange", because it tried to send the "closed" state on a channel that had already been closed.
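
This is easy to reproduce outside the project: sending on a closed channel always panics in Go. A minimal standalone example (not part of the application code):

package main

func main() {
    ch := make(chan int)
    close(ch)
    // This send panics with "send on closed channel".
    ch <- 1
}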

So I added another channel and checked whether the channels were still open before sending values.

sseClient.go

...
type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    heartbeat             chan int
    userName              string
    w                     http.ResponseWriter
}
...
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
        close(newClient.heartbeat)
    }()
...
}

peerConnectionState.go

...
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    // must be a "buffered channel" to avoid blocking on setting the first value.
    heartbeat := make(chan int, 1)
    candidateFound := make(chan *webrtc.ICECandidate)
    changeConnectionState := make(chan webrtc.PeerConnectionState)
    addTrack := make(chan *webrtc.TrackRemote)

    // set the first value to avoid blocking on the first read from the channel.
    heartbeat <- 1
...
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // until the channel is closed, "ok" keeps being "true".
        _, ok := <-heartbeat
        if ok {
            changeConnectionState <- p
            // write a value back for the next read.
            heartbeat <- 1
        }
    })
...

But this change still did not solve the problem.

Even though I closed all of the channels, the "ok" value did not change.
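
The cause is how Go treats buffered channels: a receive from a closed channel still returns any values left in the buffer with "ok" set to "true", and only reports "false" once the buffer is empty. A minimal standalone example:

package main

import "fmt"

func main() {
    heartbeat := make(chan int, 1)
    heartbeat <- 1
    close(heartbeat)

    // The buffered value is still delivered after close, so ok is true.
    v, ok := <-heartbeat
    fmt.Println(v, ok) // 1 true

    // Once the buffer is empty, a receive reports ok == false.
    v, ok = <-heartbeat
    fmt.Println(v, ok) // 0 false
}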

peerConnectionState.go

...
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // "ok" value is "true".
        _, ok := <-heartbeat
        if ok {
            // but the channels are closed.
            changeConnectionState <- p
            heartbeat <- 1
        }
    })
...

Thus, I drained all remaining values from "heartbeat" before closing it.

sseClient.go

...
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    defer func() {
        hub.unregister <- ps
        // read all values before closing.
        for i := 0; i < len(newClient.heartbeat); i++ {
            <-newClient.heartbeat
        }
        close(newClient.heartbeat)
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
    }()
...
}

Finally, I moved the channels that handle the WebRTC events into PeerConnectionState, because those events belong to webrtc.PeerConnection.
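
The resulting struct looks roughly like this (a sketch based on the description above, not the exact final code):

type PeerConnectionState struct {
    peerConnection        *webrtc.PeerConnection
    client                *SSEClient
    // The event channels now live next to the PeerConnection that fires
    // the events, so they are created and closed together with it.
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    heartbeat             chan int
}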

[WebRTC] Adding WebRTC DataChannel

Next, I will try sending data through a WebRTC DataChannel.

I can send text (string) and binary data (for example, Uint8Array) through a WebRTC DataChannel.
I can also negotiate the RTCDataChannel connection in the application itself, instead of relying on the default in-band negotiation.

To do this, I have to set "negotiated" in RTCDataChannelInit to true when I create the RTCDataChannel instances from RTCPeerConnection.

const dc = peerConnection.createDataChannel(label, {
    id,
    negotiated: true,
    ordered: false
});
// handling received data as texts.
const decoder = new TextDecoder("utf-8");
dc.onmessage = (ev) => {
    const message = decoder.decode(new Uint8Array(ev.data));
    // TODO: handle string value.
};

I also have to create the channels before the first offer/answer exchange completes.
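
On the Pion side, that ordering could look like this (a minimal sketch with shortened error handling; the function name and label are illustrative, and the signaling part is omitted):

func createOfferWithDataChannel(peerConnection *webrtc.PeerConnection) (*webrtc.SessionDescription, error) {
    // Create the negotiated DataChannel before CreateOffer so that the SDP
    // already contains the SCTP application section.
    id := uint16(0)
    negotiated := true
    if _, err := peerConnection.CreateDataChannel("chat", &webrtc.DataChannelInit{
        ID:         &id,
        Negotiated: &negotiated,
    }); err != nil {
        return nil, err
    }
    offer, err := peerConnection.CreateOffer(nil)
    if err != nil {
        return nil, err
    }
    if err := peerConnection.SetLocalDescription(offer); err != nil {
        return nil, err
    }
    // The local description is then sent to the remote peer for the
    // offer/answer exchange.
    return peerConnection.LocalDescription(), nil
}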

Now I can tell the RTCDataChannels apart by their IDs.

I can create a webrtc.DataChannel in the same way in Go.

func newWebRTCDataChannel(label string, id uint16, peerConnection *webrtc.PeerConnection) (*webrtc.DataChannel, error) {
    negotiated := true
    ordered := false
    dc, err := peerConnection.CreateDataChannel(label, &webrtc.DataChannelInit{
        ID:         &id,
        Negotiated: &negotiated,
        Ordered:    &ordered,
    })
    if err != nil {
        return nil, err
    }
    dc.OnMessage(func(msg webrtc.DataChannelMessage) {
        message := string(msg.Data)
        // TODO: handle string value.
    })
    return dc, nil
}
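For example, two independent channels could be created with different labels and IDs, as long as the browser side creates channels with the same label/ID pairs (a hypothetical usage, not from the original code):

func newDataChannels(peerConnection *webrtc.PeerConnection) (map[uint16]*webrtc.DataChannel, error) {
    channels := map[uint16]*webrtc.DataChannel{}
    // The labels and IDs here are illustrative; they only have to match
    // the ones used by createDataChannel on the client side.
    chat, err := newWebRTCDataChannel("chat", 0, peerConnection)
    if err != nil {
        return nil, err
    }
    channels[0] = chat
    control, err := newWebRTCDataChannel("control", 1, peerConnection)
    if err != nil {
        return nil, err
    }
    channels[1] = control
    return channels, nil
}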

This time, I just forward the data from one client to the other clients and don't modify it on the server side.

dc.DataChannels[id].Send(msg.Data)
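Expanding on the line above, the forwarding on the server could look like this sketch, assuming each client's DataChannels are kept in a map keyed by ID (the map layout is an assumption, not the actual implementation):

func forwardToOthers(sender string, id uint16, msg webrtc.DataChannelMessage,
    clients map[string]map[uint16]*webrtc.DataChannel) {
    for name, channels := range clients {
        if name == sender {
            // Do not echo the message back to the sender.
            continue
        }
        if dc, ok := channels[id]; ok {
            // Forward the raw payload without modifying it.
            dc.Send(msg.Data)
        }
    }
}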

Another important point is that the DataChannels must be closed before the PeerConnection is closed.
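
A minimal sketch of that shutdown order (the function and variable names are illustrative):

func closePeer(dataChannels []*webrtc.DataChannel, peerConnection *webrtc.PeerConnection) error {
    // Close every DataChannel first...
    for _, dc := range dataChannels {
        if err := dc.Close(); err != nil {
            return err
        }
    }
    // ...and only then close the PeerConnection itself.
    if peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
        return peerConnection.Close()
    }
    return nil
}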
