DEV Community

Sven Varkel


Async/await MongoDB in Python

Intro

By now almost everyone knows, or has at least heard of, the "async/await" paradigm in JavaScript/Node.js. Async processing seems to be a lot less common in Python. Python can feel like an "old school" language compared to JavaScript, but it is clear and concise, and developers can be really productive in it. It is also quite similar to JS with its somewhat loose-but-not-so-loose typing, variable assignment etc.
One big difference is syntax: in JS, whitespace in the code (almost) doesn't matter, while in Python indentation is significant.

However, I'll try to keep this post short and focus on one specific task:

- how to use MongoDB asynchronously in Python

Async/await - why bother at all?

The first and foremost reason for me is concurrency. With async processing a developer can run several I/O-bound operations concurrently without too much hassle and without tinkering with threads or processes. It has its limits and risks, but for simple cases it does the job. Async/await comes in extra handy when a lot of I/O is involved, be it disk reads/writes or database access. Here we are talking about accessing a database, namely MongoDB.
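
To make the concurrency point concrete, here is a minimal sketch that uses only the standard library. `fake_io` is a hypothetical stand-in for a database call or a disk read (it just sleeps), and `asyncio.gather` keeps two of them in flight on a single thread, so the shorter one finishes first even though it was started second.

```python
import asyncio


async def fake_io(name: str, delay: float, log: list) -> str:
    # Hypothetical stand-in for an I/O call (database query, disk read, ...).
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # hands control back to the event loop while "waiting"
    log.append(f"{name} end")
    return name


async def main():
    log = []
    # Both coroutines are in flight at the same time on one thread.
    results = await asyncio.gather(
        fake_io("slow", 0.05, log),
        fake_io("fast", 0.01, log),
    )
    return results, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

`gather` returns the results in the order the coroutines were passed in, not in the order they finished; the log shows the actual interleaving.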

MongoDB and Python

MongoDB itself feels very "javascriptish", so to say. If you're a seasoned Node.js developer like me, it feels very comfortable and cozy. The MongoDB shell uses the same language that your code runs in: JavaScript. You can copy-paste code and objects between your IDE/project and the MongoDB shell or a MongoDB IDE, and so on.
For Python-only developers it may be a bit hard to switch between language contexts, but it's doable. My brain still has 2 halves, but they are no longer called "left half" and "right half" but "JS half" and "Python half" ;) Half a brain still does the job, haaa:)

There's a good old Python API for MongoDB called PyMongo. It gets the job done and then some, but where's the fun in all that old school serial, non-async processing?

Motor to the rescue

There's a really good async driver API for MongoDB: Motor

Here are some examples.

Connecting to database

from motor.motor_asyncio import AsyncIOMotorClient

uri = "mongodb://dev:dev@localhost:27017/mydatabase?authSource=admin"
client = AsyncIOMotorClient(uri)
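
The URI above embeds the credentials `dev:dev` directly. If a real password contains characters such as "@", ":" or "/", they need to be percent-escaped before they go into the URI. Below is a small sketch of one way to do that with the standard library; `build_mongo_uri` is a hypothetical helper of mine, not part of Motor or PyMongo, and its defaults simply mirror the example URI.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user, password, host="localhost", port=27017,
                    database="mydatabase", auth_source="admin"):
    # Hypothetical helper, not part of Motor or PyMongo.
    # Percent-escape the credentials so reserved characters in them
    # cannot be mistaken for URI separators.
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?authSource={auth_source}"
    )
```

The resulting string can be passed to `AsyncIOMotorClient` in place of the hard-coded `uri`.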

The same with some connection args

from motor.motor_asyncio import AsyncIOMotorClient

uri = "mongodb://dev:dev@localhost:27017/?authSource=admin"
connection_args = {
    "zlibCompressionLevel": 7,
    "compressors": "zlib"
}
client = AsyncIOMotorClient(uri, **connection_args)


Getting database and collection instance from client:

db = client.get_database("mydatabase")
collection = db.get_collection("mycollection")

Most common database operations

Find

async def find():
    """
    This method finds items from MongoDB collection and
    processes these by using another asynchronous method
    :return: 
    """
    collection = db.get_collection("mycollection")

    filter_ = {
        "someField": "someValue"
    }
    projection_ = {
        "_id": False  # don't return the _id
    }
    cursor = collection.find(filter=filter_, projection=projection_)

    # it gets interesting here now. Iterate over the cursor asynchronously
    async for item in cursor:
        await do_something_in_an_async_worker_method(item)


async def find_cursor_to_list():
    """
    This method finds items from MongoDB collection and
    asynchronously converts cursor to a list with items 
    :return: 
    """
    collection = db.get_collection("mycollection")

    filter_ = {
        "someField": "someValue"
    }
    projection_ = {
        "_id": False  # don't return the _id
    }
    cursor = collection.find(filter=filter_, projection=projection_)
    # Convert the cursor to a list of items right away.
    # NB! Dangerous with large result sets
    items = await cursor.to_list(length=500)
    return items
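
The `find()` example calls `do_something_in_an_async_worker_method` without defining it. To try the `async for` pattern without a running database, here is a sketch under a few assumptions: `fake_cursor` is a hypothetical async generator standing in for the Motor cursor, the worker body is a placeholder, and `process_all` is a name I made up for the loop.

```python
import asyncio


async def fake_cursor(items):
    # Hypothetical stand-in for a Motor cursor: an async generator that
    # yields one document at a time.
    for item in items:
        await asyncio.sleep(0)  # pretend to wait for the next document
        yield item


async def do_something_in_an_async_worker_method(item: dict) -> dict:
    # Placeholder worker; a real one might call an API or write a file.
    await asyncio.sleep(0)
    return {**item, "processed": True}


async def process_all(cursor) -> list:
    processed = []
    # Same shape as the loop in find(): iterate, then await the worker per item.
    async for item in cursor:
        processed.append(await do_something_in_an_async_worker_method(item))
    return processed
```

Note that this loop awaits each worker before asking the cursor for the next item, so the items themselves are handled one after another; the benefit is that other tasks on the event loop can run during every wait.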


Update / upsert

from datetime import datetime


async def update():
    """
    This method updates data in MongoDB asynchronously
    :return: 
    """
    collection = db.get_collection("mycollection")

    filter_ = {
        "someField": "someValue"
    }
    data = {
        "someNewField": "aNewValue"
    }

    update_ = {
        "$set": data,
        "$currentDate": {
            "updatedAt": True  # set field updatedAt to current date automagically. Good practice ;)
        },
        "$setOnInsert": {
            "createdAt": datetime.utcnow()
            # set field createdAt to current date automagically ONLY IF it's a new record
        }

    }
    # if upsert=True and record is not found then it is created
    await collection.update_one(filter_, update_, upsert=True)
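
The update document is plain data, so it can be built and checked without touching the database. The sketch below factors it into a hypothetical helper, `build_upsert_update` (my name, not a Motor or PyMongo API). Two things in it are my additions rather than the article's: it uses a timezone-aware `datetime.now(timezone.utc)`, since `datetime.utcnow()` is deprecated as of Python 3.12, and it refuses data that already contains the timestamp fields, because MongoDB rejects an update that touches the same field from two operators.

```python
from datetime import datetime, timezone
from typing import Optional


def build_upsert_update(data: dict, now: Optional[datetime] = None) -> dict:
    # Hypothetical helper: assembles the same update document as update() above.
    clash = {"createdAt", "updatedAt"} & data.keys()
    if clash:
        # The same path in $set and in $currentDate/$setOnInsert would conflict.
        raise ValueError(f"data must not contain {sorted(clash)}")
    if now is None:
        now = datetime.now(timezone.utc)  # timezone-aware "now" in UTC
    return {
        "$set": data,
        "$currentDate": {"updatedAt": True},  # server clock, set on every write
        "$setOnInsert": {"createdAt": now},   # client clock, only when inserting
    }
```

With this in place, the call in `update()` could read `await collection.update_one(filter_, build_upsert_update(data), upsert=True)`.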

Aggregation

It's also possible to run an aggregation pipeline with Motor.

async def aggregate():
    """
    This method runs an async aggregation
    :return: 
    """
    collection = db.get_collection("mycollection")

    pipeline = [
        {
            "$match": {
                "foo": "bar"
            }
        },
        {
            "$group": {
                "_id": None,
                "total": {"$sum": 1}
            }
        },
        {
            # after $group only "_id" and "total" remain, so sort on one of those
            "$sort": {"total": -1}
        }
    ]
    # Mind the allowDiskUse argument: it lets memory-hungry stages such as
    # $sort and $group spill to temporary files on disk for bigger result sets
    cursor = collection.aggregate(pipeline=pipeline, allowDiskUse=True)
    results = await cursor.to_list(length=1000)
    return results
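
If the `$match` and `$group` stages look cryptic, it may help to see roughly what they compute, written as plain Python over a list of dicts. This is only an illustration: `count_matching` is a hypothetical function, it runs in your process rather than on the server, and it ignores MongoDB details such as equality matches against array fields.

```python
def count_matching(docs, field="foo", value="bar"):
    # Plain-Python picture of {"$match": {field: value}} followed by
    # {"$group": {"_id": None, "total": {"$sum": 1}}}.
    # Simplification: MongoDB's equality match also matches arrays that
    # contain the value; this sketch does not.
    total = sum(1 for doc in docs if doc.get(field) == value)
    # $group emits no document at all when nothing reaches it.
    return [{"_id": None, "total": total}] if total else []
```

So for a collection with two documents where `foo` is `"bar"`, the aggregation returns a single document with `total` set to 2.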


Bulk writes

Motor can be used for bulk writes, which improve performance when storing many items from big lists of results of some kind of processing.

Here's an example:

from pymongo import UpdateOne
from datetime import datetime


async def bulk_save(very_big_list: list = list()):
    """

    :param very_big_list: List with a lot of items. Or an item generator ... ? 
    :return: 
    """
    collection = db.get_collection("mycollection")

    # this defines batch size that will be added to database at once
    BATCH_SIZE = 500

    updates = list()

    for item in very_big_list:
        filter_ = {
            "foo": item.get("foo")
        }
        update_ = {
            "$set": item,
            "$currentDate": {
                "updatedAt": True
            },
            "$setOnInsert": {
                "createdAt": datetime.utcnow()
            }
        }
        u = UpdateOne(filter=filter_, update=update_, upsert=True)
        updates.append(u)

        # if list of updates is filled up to BATCH_SIZE push data to database
        if len(updates) >= BATCH_SIZE:
            await collection.bulk_write(updates, ordered=False)
            # re-initialize list of updates
            updates = list()

    # add all remaining items to database
    if len(updates) > 0:
        await collection.bulk_write(updates, ordered=False)
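
The batching logic in `bulk_save` can also be pulled out into a small generator, which answers the docstring's "Or an item generator ... ?" question: it works on any iterable, not just lists. A minimal sketch, assuming a hypothetical helper named `chunked`:

```python
from itertools import islice
from typing import Iterable, Iterator, List


def chunked(items: Iterable, size: int) -> Iterator[List]:
    # Hypothetical helper: yield lists of at most `size` items from any
    # iterable (list, generator, ...), without loading it all at once.
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

With it, the write loop could shrink to `for batch in chunked(operations, BATCH_SIZE): await collection.bulk_write(batch, ordered=False)`, where `operations` is an iterable of `UpdateOne` objects, and the separate "remaining items" step is no longer needed.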


Summary

Since Python is "synchronous" by default and not an event-loop-based language, it may take a while to get accustomed to the async/await pattern in Python.
In this article I gave an overview of how to access MongoDB asynchronously in Python, with examples of connecting to the database with the Motor client library, querying and updating data, running aggregations and doing bulk updates.

Thanks for reading!
