Hi DevTo. It's my first post. So be gentle with me :)
The problem
I have been using Beanie ODM and MongoDB in my work for a long time.
It's a great tool, but managing sessions and transactions across the service layer was problematic for me.
My service layers need to know about the session object to pass it in Beanie ODM model methods to bind it with the session.
It looks like this
class SomeService:
def run_business_logic(db_session: AsyncIOMotorClientSession) -> Product:
chocolate = Category(name="Chocolate", description="A preparation of roasted and ground cacao seeds.")
tonybar = Product(name="Tony's", price=5.95, category=chocolate)
await tonybar.save(session=db_session) # set session
return tonybar
important_service = SomeService()
@router.post("")
async def do_some_service_logic(
db_session: AsyncIOMotorClientSession = Depends(get_session),
):
async with await session.start_transaction(): # Start transaction
return await important_service.run_business_logic(
db_session=db_session, # We need to pass the session on the service layer to work with Beanie
)
It's making me feel frustrated, because now I need to pass the session object to my service.
Game changer
But things changed after Beanie released version 2.0, and Beanie moved from Motor in favour of the PyMongo Async client.
So pymongo gives the option to bind all calls to MongoDB with the method .bind()
So basically, pymongo can create a session, and can bind to this session all calls to MongoDB.
Example from the documentation.
async with client.start_session() as s:
async with s.bind():
# session=s is passed implicitly
await client.db.collection.insert_one({"x": 1})
For more details, here is how it works inside the pymongo client:
_SESSION: ContextVar[Optional[AsyncClientSession]] = ContextVar("SESSION", default=None)
class _AsyncBoundSessionContext:
"""Context manager returned by AsyncClientSession.bind() that manages bound state."""
def __init__(self, session: AsyncClientSession, end_session: bool) -> None:
self._session = session
self._session_token: Optional[Token[AsyncClientSession]] = None
self._end_session = end_session
async def __aenter__(self) -> AsyncClientSession:
self._session_token = _SESSION.set(self._session) # type: ignore[assignment]
return self._session
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
if self._session_token:
_SESSION.reset(self._session_token) # type: ignore[arg-type]
self._session_token = None
if self._end_session:
await self._session.end_session()
class AsyncClientSession:
...
def bind(self, end_session: bool = True) -> _AsyncBoundSessionContext:
return _AsyncBoundSessionContext(self, end_session)
So PyMongo will set the session object in _SESSION ContextVar and on any call to the database will check if there is a session, if there is a session, it will use it.
Because PyMongo sits underneath Beanie, this works transparently. I don't need to change Beanie itself at all. Just create a session and call .bind() method, and everything else will be handled by the pymongo client.
Because _SESSION is ContextVar, this means that the session is task-safe for us.
For a transaction, we can use a session that is set in _SESSION context.
Now we can wrap all this logic in Python decorators to reuse it:
from pymongo.asynchronous.client_session import _SESSION, AsyncClientSession
def get_client() -> "AsyncMongoClient":
# Function that returns the current Mongo client
...
def auto_session(**session_kwargs) -> Callable:
""" Decorator that sets session automatically"""
def decorator(f: Callable) -> Callable:
@wraps(f)
async def wrapper(*args, **kwargs):
client = get_client() # Return MongoClientAsync
async with client.start_session(**session_kwargs) as s:
async with s.bind():
return await f(*args, **kwargs)
return wrapper
return decorator
def wrap_in_transaction(**transaction_kwargs) -> Callable:
""" Decorator that sets transaction automatically with bind session"""
def decorator(f: Callable) -> Callable:
@wraps(f)
async def wrapper(*args, **kwargs):
# Take session from context directly from pymongo. asynchronous.client_session
db_session: AsyncClientSession | None = _SESSION.get()
if not db_session:
raise Exception("Session not initialized, start session with using .bind() method")
async with await db_session.start_transaction(**transaction_kwargs):
return await f(*args, **kwargs)
return wrapper
return decorator
And we can use decorators on the endpoint level.
# The Service layer is now clean. No 'session' argument needed!
class SomeService:
async def run_business_logic(self) -> Product:
chocolate = Category(name="Chocolate", description="...")
tonybar = Product(name="Tony's", price=5.95, category=chocolate)
# Beanie finds the session automatically via PyMongo's ContextVar
await tonybar.save()
return tonybar
# The Controller remains clean and focused on HTTP
@router.post("")
@auto_session() # Automatically starts and binds the session
@wrap_in_transaction() # Wraps the execution in a transaction if the service raise error, Mongo will rollback data
async def do_some_service_logic():
return await important_service.run_business_logic()
Now our services do not know anything about the session, which means more clean code for us:
- Sessions are task-safe, each call of the endpoint will get its own session
- We do not bind transactions to a session, we just use it from the context of the pymongo client
- Business logic (Our services) stays focused on the domain, not knowing about how to handle database sessions.
Top comments (0)