DEV Community

Choon-Siang Lai
Choon-Siang Lai

Posted on • Originally published at kitfucoda.Medium on

Telegram Chatbot’s Evolution: Decoupling Parallel Python’s Shared State for Clarity

After building the intro animation last week,,) I finally took the time to make a video based on the article on jQuery as promised. In the future, articles that are accompanied by a video will all have a play button emoji ▶️ added to the title. While video is still made irregularly, we are going to meet here more regularly. This week, let’s revisit our chatbot project discussed not too long ago. It mostly works fine, but there’s one thing I want to improve on — removing the hardcoded global queue.

How to write an AsyncIO Telegram bot in Python


Illustration from copilot for the topic

The Global Headache: When a Simple Assignment Becomes a Core Problem

Spreading workload to run in parallel is a hard problem, especially when it comes to keeping them synchronized. Previously in our chatbot, we split the bot and webhook to run in 2 different processes, managed by the ProcessPoolExecutor. The whole architecture mimics my experimental bot named BigMeow..) The project is definitely flawed, as it is a culmination of my learning on AsyncIO without a formal introduction to the topic.

GitHub - coolsilon/bigmeow_bot

Not too long ago I was going through a job interview. As part of the process, I was asked to work on a take-home assignment. It was unimaginative as ever, though I took the opportunity to implement an experimental background batch processing module. The whole design, again, is similar to the chatbot, so it works as expected.


Photo by Pascal van de Vendel on Unsplash

When I finally started to write tests to validate my work, that was when my nightmare began.

Though I hacked through the tests, there were multiple minor issues left unresolved after I submitted my work. The hardcoded queue in the settings module severely limits the flexibility and hacks like patches needed to be applied in tests. Despite rejecting the offer, the frustration lingered on my mind for quite a while. How can I do better to avoid spending endless hours fixing tests? Most of my code is rather testable, but there must be something I am missing. During the weekend, I finally took some time going through the code to prepare a new article, and a revelation came and it clicked.

What if I stop hard-coding the queue and pass it into each process explicitly, like the exit event?

With some help from my LLM chatbot, I managed to remove the hardcoded queue. Then I proceeded to pass it into the child processes explicitly, for both the background processing module, and the FastAPI web application. We would talk more about it, including test preparation, in the coming week. For now, let’s shift the focus back to our chatbot.

Exposing the Entanglement: Diving into the Original Architecture


Photo by Josh Redd on Unsplash

Let’s briefly review the current setup, as I try to recall why it was written this way. By understanding the original design decision, we can then start working out a solution, addressing the inflexibility that came with the hard-coded problem.

The reason for having a settings module was basically I needed a place to store some configuration globals. Over time, the list of configurable options piles up. Seeing how convenient it is, I eventually set up a queue to allow the web application process to pass incoming messages from the webhook endpoint to the process hosting the Telegram bot. At the time, it was just a simple queue I created from multiprocessing.Queue. Nothing fancy, just a boring process-safe queue that can be safely shared by multiple processes.

On the other hand, in the main process, we created a multiprocessing.Manager, and created an Event object with it. The created Event object is also process-safe, i.e. can be shared by multiple processes. That event object is then passed explicitly to each child process, to be listened to for a cue to exit.

The multiprocessing.Manager here acts almost like a coordinator, making sure processes can communicate effectively with each other whenever needed. Unfortunately, this means messages would be passing through an additional layer before hitting the target. Still the overhead is worth it, when it comes to queues. There is a risk of non-managed queue getting stuck in a shut down, especially when it is not empty. The Queue object created with the multiprocessing.Manager would handle it gracefully.

Delving deeper into multiprocessing.Manager isn’t the point of the article, and I am clearly not capable of doing it as of now. Just understand it is beneficial in orchestrating synchronization between processes, and we are going to utilize it in our Telegram bot.

From Global to Explicit: Refactoring with Centralized State

Knowing the limitation is one thing; next we are going to fix it. As we tackle the hard-coding problem, we will see how it becomes more flexible, while not breaking the inter-process communication.


Photo by U. Storsberg on Unsplash

Since we created a multiprocessing.Manager already to create an Event object, let’s extend it to also create the queue. Move the telegram_queue to our main module

import signal
import multiprocessing
from functools import partial

def shutdown_handler(
    _signum, _frame, exit_event: threading.Event
) -> None:
    exit_event.set()

manager = multiprocessing.Manager()
exit_event = manager.Event()
telegram_queue = manager.Queue() # MOVED telegram_queue here

for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
    signal.signal(
        s, partial(shutdown_handler, exit_event=exit_event)
    )

with ProcessPoolExecutor() as executor:
    executor.submit(process_run, tg_run, exit_event)
    executor.submit(process_run, web_run, exit_event)
Enter fullscreen mode Exit fullscreen mode

They can be passed into the child processes as-is, but I prefer to group them into an object (for reference, BigMeow now has 11 synchronization primitives, namely queues, locks, and events). Let’s create a simple dataclass for this:

from dataclasses import dataclass
from threading import Event
from queue import Queue

@dataclass
class SyncStore:
    exit_event: Event
    telegram_queue: Queue
Enter fullscreen mode Exit fullscreen mode

Notice multiprocessing.Manager creates proxies of the synchronization primitives. They often have the same programming interface as existing objects, hence the type annotation specifies threading.Event and queue.Queue. Next we instantiate an object by

import multiprocessing

manager = multiprocessing.Manager()
sync_store = SyncStore(
    manager.Event(),
    manager.Queue()
)
Enter fullscreen mode Exit fullscreen mode

After that, we adapt the changes in our main module, remember to also pass the new shiny sync_store into our child processes in the executor.submit call

for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
    signal.signal(
        s, partial(shutdown_handler, exit_event=sync_store.exit_event)
    )

with ProcessPoolExecutor() as executor:
    executor.submit(process_run, tg_run, sync_store)
    executor.submit(process_run, web_run, sync_store)
Enter fullscreen mode Exit fullscreen mode

There is more work for the FastAPI web application module, so we are doing it last, For now, we first modify the function signature of run() in our Telegram bot, and update the exit_event.wait() call accordingly

async def run(sync_store: SyncStore) -> None:
    async with application:
        ...

        await asyncio.to_thread(sync_store.exit_event.wait)

        ...
Enter fullscreen mode Exit fullscreen mode

Previously, the coroutine function message_consume implicitly read from settings.telegram_queue. As it is now made available to run(), we can proceed to pass it as an argument. Also there’s no reason why the bot’s application object to be global, let’s move everything into the function:

# remove this line
# application = ApplicationBuilder().token("Your telegram token").build()

async def run(sync_store: SyncStore) -> None:
    application = ApplicationBuilder().token("Your telegram token").build()
    async with application:
        await setup(application) # pass the application object explicitly
        # Starting the bot
        await application.start()
        # Receive messages (pass the object explicitly)
        asyncio.create_task(loop_queue(message_consume, application, sync_store.telegram_queue))
        # Wait for exit signal
        await asyncio.to_thread(sync_store.exit_event.wait)
        # Stopping the bot
        await application.stop()
Enter fullscreen mode Exit fullscreen mode

Both loop_queue and message_consume would need to adapt to the change

from typing import Any
from telegram.ext import Application
from queue import Queue

# Take additional args to be passed to the coroutine function
async def loop_queue(coro_func: Callable[[], Awaitable[None]], *args: Any) -> None:
    while True:
        await coro_func(*args)

async def message_consume(application: Application, queue: Queue) -> None:
    with suppress(queue.Empty):
        await application.update_queue.put(
            Update.de_json(
                # no more consuming from settings.telegram_queue
                await asyncio.to_thread(queue.get, timeout=10),
                application.bot,
            )
        )
Enter fullscreen mode Exit fullscreen mode

Finally, applying the changes to the FastAPI web application module. Like the Telegram bot module, we first revise the run() function signature

async def run(sync_store: SyncStore) -> None:
    ...
Enter fullscreen mode Exit fullscreen mode

Then, we will have to register our store to theapp.state..) Also update the uvicorn config accordingly as app is modified in our function.

async def run(sync_store: SyncStore) -> None:
    app.state.sync_store = sync_store

    server = uvicorn.Server(
        uvicorn.Config(
            app, # we made changes to global app, so specify explicitly
            host="0.0.0.0",
            port=80,
            log_level="info",
        )
    )

    ...
Enter fullscreen mode Exit fullscreen mode

Usually this is done in the lifespan, where we yield an object, that can be later accessed through request.state in the handler functions, as shown below:

from fastapi import FastAPI, Request

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield {'foo', 'bar'} # yield the object here

app = FastAPI(lifespan=lifespan)

@app.get('/')
async def index(request: Request):
    print(request.state.foo) # 'bar'
Enter fullscreen mode Exit fullscreen mode

Unfortunately, in our case app is instantiated outside of run() and it has no knowledge of sync_store at the time. Considering now we just store it at app.state directly, we could still define a lifespan handler, to check for the presence of the store, at startup:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # do a check when the application starts up to ensure sync_store is available
    if not (hasattr("sync_store", app.state) and isinstance(app.state.sync_store, SyncStore)):
        raise RuntimeError("SyncStore object is not found")

    yield

app = FastAPI(lifespan=lifespan)
Enter fullscreen mode Exit fullscreen mode

Lastly, update the webhook, to use the queue from sync_store, via request.app.state.sync_store.telegram_queue

@app.post("/webhook/telegram", include_in_schema=False)
async def telegram_webhook(
    request: Request,
    x_telegram_bot_api_secret_token: Annotated[
        str, Header(pattern=settings.TELEGRAM_SECRET_TOKEN, strict=True)
    ],
) -> None:
    asyncio.create_task(
        asyncio.to_thread(
            # change how the queue is accessed
            request.app.state.sync_store.telegram_queue.put,
            await request.json()
        )
    )
Enter fullscreen mode Exit fullscreen mode

Although the refactoring looks rather significant, the core logic underneath is unaffected. Apart from changing how the queue is accessed in both the Telegram and the FastAPI web application, there’s no change on how it is done. Data is still being produced by the web application via .put(), and consumed by the .get() in the bot.

Beyond the Fix: Testability, Scalability, and What’s Next

So back to the original objective of this refactoring — remove the hard-coded telegram_queue. Fortunately, we have done so successfully, though the process was somewhat involved. The biggest advantage of passing them explicitly; is how it improves testability. No more patching and replacing globals are needed, just pass the test queue to the function, or properly populate the app.state while creating FastAPI’s TestClient fixture.

Testing would be covered next week.

We also explored grouping all the synchronization primitives into one object. When the number of child processes increases, the number of such objects would accumulate inevitably. Even if we cherry-pick, ensuring only the relevant ones go into each child process, some process would end up taking too many arguments. Thus, creating an object containing them can be a good workaround to the problem.

Other than the rather involved gymnastics of moving things around, there’s no change to the underlying workflow. Still it brings consistency compared to the original implementation. Now all synchronization primitives are all defined in the same place, all managed by the multiprocessing.Manager, and are introduced to each child process explicitly as arguments. Additionally, the usage of multiprocessing.Manager makes it easy to scale the application as the manager can run as a server.


Photo by Sharon Co Images on Unsplash

This is it, the journey started from a very unassuming take-home assignment and in the end became a valuable lesson. On a high level it isn’t quite significant, as it doesn’t bring much optimization (if not slower due to the change to the queue managed by multiprocessing.Manager). Even so, by applying what I learned to my chatbot, it immediately became relevant as the bot is still running out there serving its purpose. Turning the experience into this article; is also how I reciprocate to the community.

Hmm, that was the third time my LLM Editor suggested I should use a semicolon..)

Anyway, that’s all I have for this week, thanks for reading, and I shall write again, next week.

Just so you know, while an AI assistant helped me refine this article’s clarity and structure, the voice, all the technical content, and every line of code presented here are entirely mine. This piece reflects my personal experiences and insights. If you’re interested in job opportunities or collaborating on projects, please feel free to reach out via LinkedIn or connect with me right here on Medium.


Top comments (0)