DEV Community

Wassaf Shahzasd


Building Real-Time Communication: Harnessing WebRTC with FastAPI Part 3 - Wrapping Everything Up

Welcome to the last part of my series, in which we build a Google Meet clone using FastAPI and WebRTC. If you haven't read the previous article, you can read it here.

🚀 In the previous tutorial

Our project directory looked like this:
📁
|-- 🗋 main.py
|-- 🗋 requirements.txt
|-- 📁 static
|-- |-- home.css
|-- |-- home.js
|-- 📁 templates
|-- |-- main.html
|-- |-- home.html
|-- 🗋 .env

Create a new file called manager.py at the same level as main.py. In it we will create a WebSocket manager that lets users join, leave and send messages in a chat room.

from fastapi.websockets import WebSocket


class SignalManager:
    """Tracks the WebSocket connections of a single room."""

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    @property
    def is_empty(self) -> bool:
        return len(self.active_connections) == 0

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: dict, websocket: WebSocket):
        # Send a JSON message to a single connection.
        await websocket.send_json(message)

    async def broadcast(self, message: dict, websocket: WebSocket):
        # Send a JSON message to everyone in the room except the sender.
        for connection in self.active_connections:
            if connection != websocket:
                await connection.send_json(message)

The SignalManager class accepts WebSocket connections and sends broadcast and personal messages. The active_connections list keeps track of every open connection in the room. The broadcast method sends a message to every WebSocket in the room except the one that sent it.

👨‍💼 Chat Room Manager Class

Since there can be multiple rooms, each with multiple connections, create a MeetingManager class just below the SignalManager class.

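class MeetingManager:
    """Maps each room id to the SignalManager holding its connections."""

    def __init__(self) -> None:
        self.rooms: dict[str, SignalManager] = {}

    async def join(self, room_id: str, websocket: WebSocket):
        # Create the room on first use, then add the connection to it.
        if room_id not in self.rooms:
            self.rooms[room_id] = SignalManager()
        await self.rooms[room_id].connect(websocket)
        # Let the users already in the room know that someone joined.
        await self.rooms[room_id].broadcast({"type": "USER_JOIN"}, websocket)

    def leave(self, room_id: str, websocket: WebSocket):
        self.rooms[room_id].disconnect(websocket)
        # Delete empty rooms so they don't accumulate in memory.
        if self.rooms[room_id].is_empty:
            del self.rooms[room_id]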


This class manages the chat rooms and lets users join and leave a room.
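If you want to unit-test the front end without running FastAPI, a stand-in with the same semantics as SignalManager and MeetingManager can help. The following is a minimal in-memory sketch in JavaScript; the class name InMemoryMeetingManager and the fake sockets (plain objects with a send method) are hypothetical and not part of the tutorial's code. It mirrors the behavior described above: joining announces USER_JOIN to everyone else, broadcast skips the sender, and empty rooms are deleted.

```javascript
// Hypothetical in-memory stand-in for the server's MeetingManager and
// SignalManager, for testing client code without FastAPI.
// A "socket" is any object with a send(message) method.
class InMemoryMeetingManager {
  constructor() {
    // room id -> Set of sockets (plays the role of MeetingManager.rooms)
    this.rooms = new Map();
  }

  join(roomId, socket) {
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }
    this.rooms.get(roomId).add(socket);
    // Like the Python version, announce the newcomer to everyone else.
    this.broadcast(roomId, { type: "USER_JOIN" }, socket);
  }

  leave(roomId, socket) {
    const room = this.rooms.get(roomId);
    if (!room) return; // unknown room (the Python version would raise KeyError)
    room.delete(socket);
    // Delete empty rooms, mirroring MeetingManager.leave.
    if (room.size === 0) {
      this.rooms.delete(roomId);
    }
  }

  broadcast(roomId, message, sender) {
    const room = this.rooms.get(roomId);
    if (!room) return;
    for (const socket of room) {
      // Skip the sender, just like SignalManager.broadcast.
      if (socket !== sender) {
        socket.send(message);
      }
    }
  }
}

// Usage: fake sockets that record every message they receive.
const makeFakeSocket = () => {
  const inbox = [];
  return { inbox, send: (message) => inbox.push(message) };
};

const alice = makeFakeSocket();
const bob = makeFakeSocket();
const meetings = new InMemoryMeetingManager();
meetings.join("demo", alice); // room is empty, so nobody is notified
meetings.join("demo", bob); // alice receives USER_JOIN
meetings.broadcast("demo", { type: "OFFER" }, bob); // only alice receives it
```

Note that the first user to join a room is never told about anyone, which is why the existing user, not the newcomer, reacts to USER_JOIN later in index.js.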

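One point of interest is scalability. The rooms live in the memory of a single server process, so this setup only works when the app runs as one process. To scale WebSockets across multiple workers or servers, you need a shared message broker such as Redis to relay messages between instances.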

🔌 Plugging everything together

Open main.py, import the MeetingManager class from manager.py and create a manager instance.

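from fastapi import WebSocket, WebSocketDisconnect

from manager import MeetingManager

# The WebSocket endpoint below refers to this instance as meeting_manager.
meeting_manager = MeetingManager()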

Let's create a WebSocket endpoint that lets a client join a room.

@app.websocket("/ws/{room_id}")
async def connect_websocket(websocket: WebSocket, room_id: str):
    # Add this connection to the room (creating the room if needed).
    await meeting_manager.join(room_id, websocket)
    try:
        while True:
            # Relay every signaling message to the other peers in the room.
            data = await websocket.receive_json()
            await meeting_manager.rooms[room_id].broadcast(data, websocket)
    except WebSocketDisconnect:
        meeting_manager.leave(room_id, websocket)

The endpoint above adds the WebSocket to the room and then loops until the client disconnects, broadcasting every JSON message it receives to the other peers in the room. When the client disconnects, it is removed from the room.
As explained in the previous part, we use WebSockets as a signaling server to exchange both SDP offers/answers and ICE candidates.
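Because the server only relays frames, the message format is entirely up to the clients. The tutorial uses four message types: USER_JOIN (sent by the server), OFFER and ANSWER (carrying an SDP description in a message field) and candidate (carrying an ICE candidate in a candidate field). A small pair of helpers can keep that format in one place. encodeSignal and decodeSignal below are hypothetical names, and the SDP string is a placeholder. Since the server calls receive_json(), every frame a client sends should be valid JSON.

```javascript
// Message types used by this tutorial's signaling protocol.
// USER_JOIN is sent by the server; the others are relayed between peers.
const SIGNAL_TYPES = new Set(["USER_JOIN", "OFFER", "ANSWER", "candidate"]);

// Hypothetical helper: build the JSON text a client sends over the socket.
function encodeSignal(type, payload = {}) {
  if (!SIGNAL_TYPES.has(type)) {
    throw new Error(`Unknown signal type: ${type}`);
  }
  return JSON.stringify({ type, ...payload });
}

// Hypothetical helper: parse an incoming frame. Returns null for text that
// is not JSON or does not carry one of the known types.
function decodeSignal(text) {
  let data;
  try {
    data = JSON.parse(text);
  } catch (err) {
    return null;
  }
  if (data === null || typeof data !== "object" || !SIGNAL_TYPES.has(data.type)) {
    return null;
  }
  return data;
}

// Usage (the SDP string is a placeholder, not a real session description).
const frame = encodeSignal("OFFER", {
  message: { type: "offer", sdp: "placeholder-sdp" },
});
const parsed = decodeSignal(frame);
console.log(parsed.type, parsed.message.type);
```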

Let's add the route for the video page.

@app.get("/room/{roomName}")
def get_video(request: Request, roomName:str):
    return templates.TemplateResponse(request=request, name="index.html")

This is the URL that home.js redirects to after you enter the meeting room name. Don't worry about index.html; we will create it next.

πŸ“Business on the front, Party on the back

Now let's create index.html in the templates folder. This HTML file renders the video elements.

{% extends "main.html" %}
{% block script  %}
    <link rel='stylesheet' type='text/css' media='screen' href="{{ url_for('static', path='/index.css') }}">
    <script src="{{ url_for('static', path='/index.js') }}"></script>
{% endblock %}
{% block content %}
    <div id="videos">
        <video class="video-player" id="user-1" autoplay></video>
        <video class="video-player" id="user-2" autoplay></video>
    </div>
{% endblock %}

For now we assume that only two users will join the call; later on you can build on this to support calls with more participants.

πŸ† Moving on to the Hard stuff.

Create an index.js file inside the static folder. Our project directory now looks like this:
📁
|-- 🗋 main.py
|-- 🗋 requirements.txt
|-- 📁 static
|-- |-- home.css
|-- |-- home.js
|-- |-- index.js
|-- 📁 templates
|-- |-- main.html
|-- |-- home.html
|-- |-- index.html
|-- 🗋 .env

First, we initialize some variables that will be used later on.

let localStream;
let remoteStream;
let peerConnection;
let socket;
let makingOffer = false;
let polite = false

Now let's create a connect function that connects the WebSocket to the chat room, so that we can use our FastAPI server as the signaling server.

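let connect = async (callback) => {
  // The page URL is /room/{roomName}, so the room name is the third segment
  // ("/room/abc".split("/") gives ["", "room", "abc"]).
  let roomName = window.location.pathname.split("/")[2];
  socket = new WebSocket(`ws://localhost:8000/ws/${roomName}`);
  socket.onopen = async (_) => {
    await callback();
  };

  socket.onmessage = handleMessage;
};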

This function takes a callback, which is called once the socket has connected. We also assign a helper function called handleMessage to the onmessage event. Don't worry, we will define that function later on.
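The connect function hardcodes ws://localhost:8000, which only works in local development. Below is a sketch of deriving both the room name and the WebSocket URL from the page location instead. roomNameFromPath and signalingUrl are hypothetical helpers, and signalingUrl accepts any object with protocol, host and pathname fields (such as window.location), so it can be tried outside the browser.

```javascript
// Hypothetical helper: "/room/my-meeting" -> "my-meeting".
function roomNameFromPath(pathname) {
  // Drop empty segments so a leading or trailing slash doesn't matter.
  const parts = pathname.split("/").filter(Boolean);
  return decodeURIComponent(parts[parts.length - 1] || "");
}

// Hypothetical helper: build the signaling URL from a location-like object.
function signalingUrl(loc) {
  // Use wss:// when the page itself was served over https://.
  const scheme = loc.protocol === "https:" ? "wss:" : "ws:";
  const room = encodeURIComponent(roomNameFromPath(loc.pathname));
  return `${scheme}//${loc.host}/ws/${room}`;
}

// In the browser:
//   socket = new WebSocket(signalingUrl(window.location));
```

Using the page's own host means the same index.js works unchanged on localhost and behind a real domain with TLS.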

Now let's create an init function, which runs first once the page has loaded.

let init = async () => {
  // Ask for the camera only; audio is disabled in this tutorial.
  localStream = await navigator.mediaDevices.getUserMedia({
    video: true,
    audio: false,
  });
  document.getElementById("user-1").srcObject = localStream;
  await connect(createStreams);
};

This function does two things.

  • It gets the user's media stream and assigns it to the user-1 video element defined in index.html.
  • It calls the connect function with createStreams as the callback.
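getUserMedia() rejects if the user denies camera access or no camera is available, and in init that rejection would go unhandled. Here is a sketch of a wrapper that reports the common failure cases instead. getLocalStream and MEDIA_ERRORS are hypothetical names, and mediaDevices is passed in (navigator.mediaDevices in the page) so the function can be exercised with a fake object.

```javascript
// Human-readable messages for DOMException names that getUserMedia
// commonly rejects with.
const MEDIA_ERRORS = {
  NotAllowedError: "Camera permission was denied.",
  NotFoundError: "No camera was found.",
  NotReadableError: "The camera could not be started (it may be in use).",
};

// Hypothetical wrapper: resolves to { stream, error } instead of rejecting.
// Pass navigator.mediaDevices in the browser, or a fake object in tests.
async function getLocalStream(mediaDevices, constraints = { video: true, audio: false }) {
  try {
    const stream = await mediaDevices.getUserMedia(constraints);
    return { stream, error: null };
  } catch (err) {
    const error = MEDIA_ERRORS[err.name] || `Could not open the camera (${err.name}).`;
    return { stream: null, error };
  }
}

// In init you could then stop early instead of crashing:
//   const { stream, error } = await getLocalStream(navigator.mediaDevices);
//   if (error) { alert(error); return; }
//   localStream = stream;
```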

📼 Creating Video Streams

Let's create a function that creates the peerConnection object and the remote media stream. The full function looks like this:


let createStreams = async () => {
  peerConnection = new RTCPeerConnection(config);
  remoteStream = new MediaStream();

  localStream.getTracks().forEach((track) => {
    peerConnection.addTrack(track, localStream);
  });

  // Called once for every track the remote peer sends us.
  peerConnection.ontrack = (event) => {
    console.log("adding track");
    event.streams[0]
      .getTracks()
      .forEach((track) => remoteStream.addTrack(track));
  };

  peerConnection.onicecandidate = async (event) => {
    if (event.candidate) {
      socket.send(
        JSON.stringify({ type: "candidate", candidate: event.candidate })
      );
    }
  };
  peerConnection.onnegotiationneeded = async () => {
    try {
      makingOffer = true;
      await peerConnection.setLocalDescription();
      socket.send(
        JSON.stringify({
          type: "OFFER",
          message: peerConnection.localDescription,
        })
      );
    } catch (err) {
      console.error(err);
    } finally {
      makingOffer = false;
    }
  };

  document.getElementById("user-2").srcObject = remoteStream;
};

This function first creates a peerConnection object from the provided config and initializes an empty remoteStream.
The config object accepts various options, but the one we care about is iceServers: the STUN servers that help each peer discover its public address so that a peer-to-peer connection can be established.
Our config object looks like this:

const config = {
  iceServers: [
    {
      urls: [
        "stun:stun.l.google.com:19302",
        "stun:stun1.l.google.com:19302",
        "stun:stun2.l.google.com:19302",
      ],
    },
  ],
};
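STUN servers are enough when both peers can reach each other directly, but behind restrictive NATs or firewalls the connection may fail unless a TURN server relays the media. As a sketch, here is a hypothetical buildIceConfig helper that starts from the STUN servers above and optionally appends a TURN entry. The TURN URL, username and credential are placeholders for your own TURN deployment, not values from this tutorial.

```javascript
// Hypothetical helper: the tutorial's STUN servers plus an optional TURN
// server. `turn` is a placeholder shape: { urls, username, credential }.
function buildIceConfig(turn = null) {
  const iceServers = [
    {
      urls: [
        "stun:stun.l.google.com:19302",
        "stun:stun1.l.google.com:19302",
        "stun:stun2.l.google.com:19302",
      ],
    },
  ];
  if (turn) {
    // TURN entries in an RTCIceServer carry a username and credential.
    iceServers.push({
      urls: turn.urls,
      username: turn.username,
      credential: turn.credential,
    });
  }
  return { iceServers };
}

// In the browser (placeholder TURN details, replace with your own):
//   peerConnection = new RTCPeerConnection(buildIceConfig({
//     urls: "turn:turn.example.com:3478",
//     username: "user",
//     credential: "secret",
//   }));
```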


Next, we get our local tracks and add them to the peerConnection so that they are available to the other peer. (The full function is shown above; these snippets just make it easier to follow.)

  localStream.getTracks().forEach((track) => {
    peerConnection.addTrack(track, localStream);
  });


In the next part of the function, we attach a callback to the ontrack event, which fires once for each track the remote peer sends, and add those tracks to remoteStream.

peerConnection.ontrack = (event) => {
    console.log("adding track");
    event.streams[0]
      .getTracks()
      .forEach((track) => remoteStream.addTrack(track));
  };

Next, we create an event handler that trickles ICE candidates to the other peer as they are discovered, using the socket to send them.

  peerConnection.onicecandidate = async (event) => {
    if (event.candidate) {
      socket.send(
        JSON.stringify({ type: "candidate", candidate: event.candidate })
      );
    }
  };

We will discuss peerConnection.onnegotiationneeded in the next section, so let's skip it for now. Finally, we attach the remote stream to the user-2 video element.

🤝 The Perfect Negotiation Pattern

To exchange the SDP offers and answers, we will implement the perfect negotiation pattern as described in the following article. The gist is that there are two types of peers. A polite peer, in case of an offer collision, drops its own offer and accepts the other peer's offer. An impolite peer, in case of a collision, ignores the other peer's offer and keeps its own.
How you decide which peer is polite is up to you. In this tutorial, a user who is already in the call is always polite, and the user who joins is impolite.
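The core decision of the pattern fits in a few lines. Here is a sketch of it as a pure function, shouldIgnoreOffer (a hypothetical name), whose inputs mirror the variables handlePerfectNegotiation uses later: the incoming description's type, the makingOffer flag, the connection's signalingState and whether this peer is polite.

```javascript
// The perfect negotiation decision as a pure function, so it can be
// reasoned about without a real RTCPeerConnection.
function shouldIgnoreOffer({ descriptionType, makingOffer, signalingState, polite }) {
  // A collision: an offer arrives while we are making our own offer
  // or are otherwise not in the "stable" signaling state.
  const offerCollision =
    descriptionType === "offer" && (makingOffer || signalingState !== "stable");
  // Only the impolite peer ignores colliding offers; the polite peer yields.
  return !polite && offerCollision;
}

// Both peers send offers at the same time:
const impoliteIgnores = shouldIgnoreOffer({
  descriptionType: "offer", makingOffer: true, signalingState: "have-local-offer", polite: false,
});
const politeIgnores = shouldIgnoreOffer({
  descriptionType: "offer", makingOffer: true, signalingState: "have-local-offer", polite: true,
});
```

Exactly one side keeps its offer (the impolite newcomer) and the other answers it, so the collision resolves without either peer having to retry.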

Let's go back to the createStreams function and look at the onnegotiationneeded event handler. This event is triggered whenever a change requires the session to be (re)negotiated, for example:

  • When you call addTrack() or removeTrack()
  • When you call addTransceiver() or change a transceiver's direction
  • When you create the first data channel with createDataChannel()
  • When you request an ICE restart with restartIce()
 peerConnection.onnegotiationneeded = async () => {
    try {
      makingOffer = true;
      await peerConnection.setLocalDescription();
      socket.send(
        JSON.stringify({
          type: "OFFER",
          message: peerConnection.localDescription,
        })
      );
    } catch (err) {
      console.error(err);
    } finally {
      makingOffer = false;
    }
  };

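Calling setLocalDescription() without arguments makes the browser create an offer automatically; we then send that local description to the other peer in a message of type OFFER. The makingOffer flag stays true while this is in progress so that offer collisions can be detected.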

πŸͺ Handling the messages

If we go back to the connect function, we see that we attached an event handler called handleMessage to the onmessage event. This function handles the ICE candidates and the SDP offers and answers we receive from our custom signaling server.

let handleMessage = async ({ data }) => {
  data = JSON.parse(data);
  if (data["type"] == "USER_JOIN") {
    polite = true
    createStreams();
  }
  if (data["type"] === "OFFER") {
    console.log("received offer")
    handlePerfectNegotiation(data)
  }
  if (data["type"] === "ANSWER") {
    console.log("received answer")
    handlePerfectNegotiation(data)
  }
  if(data["type"] === "candidate") {
    handleIceCandidate(data)
  }
};

This is a relatively simple function. When a new user joins, the user who is already in the call sets polite to true (it is false by default) and calls createStreams() again, creating a fresh peer connection for the newcomer.
For ANSWER and OFFER messages, we call the handlePerfectNegotiation function, the last piece of the puzzle. So let's go over it.

let handlePerfectNegotiation = async ({ message }) => {
  try {
    if (message) {
      const offerCollision =
        message.type === "offer" &&
        (makingOffer || peerConnection.signalingState !== "stable");

      // Declared here so the function doesn't rely on an implicit global.
      const ignoreOffer = !polite && offerCollision;
      if (ignoreOffer) {
        return;
      }

      await peerConnection.setRemoteDescription(message);
      if (message.type === "offer") {
        await peerConnection.setLocalDescription();
        socket.send(JSON.stringify({
          type: "ANSWER",
          message: peerConnection.localDescription,
        }));
      }
    }
  } catch (err) {
    console.error(err);
  }
};

When an incoming offer collides with one the impolite peer is making, the impolite peer ignores the incoming offer and keeps its own. The polite peer instead accepts the impolite peer's offer: calling setRemoteDescription() with an offer while its own offer is pending rolls that local offer back, and it then replies with an ANSWER.
Now all we need to do is handle the ICE candidates, and we are good to go.

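let handleIceCandidate = async ({ candidate }) => {
  // Candidates that arrive before the remote description is set are dropped.
  if (peerConnection && peerConnection.remoteDescription) {
    try {
      await peerConnection.addIceCandidate(candidate);
    } catch (err) {
      console.error(err);
    }
  }
};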
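The handleIceCandidate function above only adds a candidate when peerConnection.remoteDescription is already set, so candidates that arrive earlier (for example while setRemoteDescription() is still in progress) are lost. One common remedy is to buffer them and add them once the remote description is in place. The sketch below assumes a hypothetical createCandidateBuffer helper. Because createStreams() replaces peerConnection, you would create a new buffer alongside each new connection and call flush() right after setRemoteDescription() resolves in handlePerfectNegotiation.

```javascript
// Hypothetical helper: buffers ICE candidates until the remote description
// is set, then adds them in arrival order.
function createCandidateBuffer(peerConnection) {
  const pending = [];
  return {
    // Call for every incoming "candidate" message.
    async add(candidate) {
      if (peerConnection.remoteDescription) {
        await peerConnection.addIceCandidate(candidate);
      } else {
        pending.push(candidate);
      }
    },
    // Call right after setRemoteDescription() resolves.
    async flush() {
      while (pending.length > 0) {
        await peerConnection.addIceCandidate(pending.shift());
      }
    },
    get size() {
      return pending.length;
    },
  };
}
```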

Finally, we tie everything together by running init once the page has loaded:

document.addEventListener(
  "DOMContentLoaded",
  async function () {
    await init();
  },
  false
);

And boom, we are done 🥳🥳🥳🥳🥳🥳🥳

Thank you for following along, and please share your feedback if you have any.

Follow Best Practices

In this article we have skipped some best practices. In a production environment, the WebSocket URL would not be hardcoded, and when a peer disconnects we would remove its streams.
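For the second point, here is a sketch of what removing the streams could look like. tearDownRemote is a hypothetical helper that stops the remote tracks, closes the peer connection and clears the video element. The server in this tutorial does not announce departures, so you would either have MeetingManager.leave broadcast a message of your own (for example a hypothetical USER_LEAVE type) or watch the peer connection's connectionstatechange event for the "disconnected" or "failed" states.

```javascript
// Hypothetical cleanup for when the other peer leaves. Arguments are passed
// explicitly so the function does not depend on index.js globals.
function tearDownRemote({ peerConnection, remoteStream, remoteVideo }) {
  if (remoteStream) {
    // Stop receiving every remote track.
    remoteStream.getTracks().forEach((track) => track.stop());
  }
  if (peerConnection) {
    peerConnection.close();
  }
  if (remoteVideo) {
    // Detach the stream so the <video> element goes blank.
    remoteVideo.srcObject = null;
  }
}

// In the browser:
//   peerConnection.onconnectionstatechange = () => {
//     if (["disconnected", "failed"].includes(peerConnection.connectionState)) {
//       tearDownRemote({
//         peerConnection,
//         remoteStream,
//         remoteVideo: document.getElementById("user-2"),
//       });
//     }
//   };
```

Note that "disconnected" can be transient on flaky networks, so a production app might wait briefly before tearing down.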

