DEV Community

sangarshanan
sangarshanan

Posted on

Realtime channels with FastAPI + Broadcaster

Websockets are awesome, just learnt that before websockets people were polling (eww). Well actually polling helps when you know the exact time interval of your refresh But we are gonna be the cool realtime streamers.

Websockets allow full-duplex, bidirectional connections between a client and a server over the web with a single TCP connection (A protocol for sending and receiving packets of data across IPs in a reliable way, simply by acknowledgement)

We are gonna be using Fastapi and Starlette to define Websocket endpoint and Broadcaster to publish messages to this websocket. Others can just subscribe to the websocket endpoint to receive the published messages in real time

So this is what my API looks like, there is just one endpoint to publish messages to a channel (lebowski). Now the goal is to have a websocket that hoomans/robots can subscribe to follow the updates to lebowski in real time.



import json
from fastapi import FastAPI
from pydantic import BaseModel

class Publish(BaseModel):
    channel: str = "lebowski"
    message: str

app = FastAPI()

@app.post("/push")
async def push_message(publish: Publish):
    return Publish(channel =publish.channel, 
    message =json.dumps(publish.message))


Enter fullscreen mode Exit fullscreen mode

We are gonna use broadcaster and Starlette to define a websocket endpoint

With starlette we can use WebSocketEndpoint and use it to create a WebsocketRoute, WebSocketEndpoint has three overridable methods for handling specific ASGI websocket message types: on_connect, on_receive, on_disconnect.



class Echo(WebSocketEndpoint):
    encoding = "text"
    async def on_connect(self, websocket):
        await websocket.accept()
    async def on_receive(self, websocket, data):
        await websocket.send_text(f"Message text was: {data}")
    async def on_disconnect(self, websocket, close_code):
        pass

routes = [WebSocketRoute("/ws", Echo)]


Enter fullscreen mode Exit fullscreen mode

This is a simple echo websocket that send back what it receives. But I am not worrying about all this I am using broadcaster. With broadcaster we can define a simple broadcasting API onto a number of different backend services like Redis PUB/SUB, Kafka, Postgres LISTEN/NOTIFY or an in-memory one.



from starlette.concurrency import run_until_first_complete
from starlette.routing import WebSocketRoute

from broadcaster import Broadcast

broadcast = Broadcast("postgresql://postgres@localhost/test")

async def events_ws(websocket):
    await websocket.accept()
    await run_until_first_complete(
        (events_ws_receiver, {"websocket": websocket}),
        (events_ws_sender, {"websocket": websocket}),
    )


async def events_ws_receiver(websocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="events", message=message)


async def events_ws_sender(websocket):
    async with broadcast.subscribe(channel="events") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)
routes = [
    WebSocketRoute("/events", events_ws, name="events_ws"),
]


Enter fullscreen mode Exit fullscreen mode

We have defined two async functions to receive and publish messages and passed it to a starlette WebSocketRoute. Used Postgres as a backend for the broadcaster.

Now that we have defined a websocket route with broadcaster, lets just add it FastAPI and seal the deal



from fastapi import FastAPI

app = FastAPI(
    routes=routes, 
    on_startup=[broadcast.connect],
    on_shutdown=[broadcast.disconnect],
)

@app.post("/push")
async def push_message(publish: Publish):
    await broadcast.publish(publish.channel, 
    json.dumps(publish.message))
    return Publish(channel =publish.channel, 
    message =json.dumps(publish.message))


Enter fullscreen mode Exit fullscreen mode

I have added the websocket route to the FastAPI app and am publishing to the channel on every call to the API.

Now to test things I am writing a dummy subscriber to listen to the broadcast (With the API is running on port 1234)



import json
import asyncio
import websockets
from websockets.exceptions import ConnectionClosed

async def connect(uri):
    async with websockets.connect(uri) as websocket:
        print("Connected..")
        while True:
            message = await websocket.recv()
            action = json.loads(message)
            print(action)

async def hello():
    uri = "ws://localhost:1234/events"
    try:
        await connect(uri)
    except ConnectionClosed:
        await asyncio.sleep(3)
        print("Not able to connect.. Retying in 3 seconds")
        await connect(uri)

asyncio.get_event_loop().run_until_complete(hello())


Enter fullscreen mode Exit fullscreen mode

That's it :) In action below 👇

alt text

Ok lastly, FastAPI is awesome and I have been using a lot lately. It's powerful, easy to learn and the async community powering the whole ecosystem makes me wanna cry happy tears 😭

:wq

Top comments (5)

Collapse
 
bendog profile image
Benjamin.dog

I have to admit I've read through this a few times, and I've used FastAPI a bunch, and I have no idea how this is meant to work, it seems like you have four separate python files, maybe five? Which aren't importing from each other?
This might be why people are asking for a repo to try and make sense of how the code fits together?

Collapse
 
praveen_26 profile image
Praveen Patidar

Hi Guys,
Is this broadcast will not notify on websocket when changes are made in Postgres when another API trigerred which make changes in Postgres DB and but Broadcast will not publish in that API.

Thanks In advance

Collapse
 
perigk profile image
Periklis Gkolias

Hi there @sangarshan,

Very nice introduction, thank you. Do you happen to have the code in a GitHub repo or so?

Collapse
 
sangarshanan profile image
sangarshanan

Thank you : ) this was arbitrary series of experiments so I did not push the code anywhere but rather used the gists as the part of the writeup

Collapse
 
__b3895ff0361 profile image
Артем Астахов

for those that understood nothing: just check the official broadcast docs. most of author's writings are from there, only it really works

pypi.org/project/broadcaster/