Coroutines in Python are a powerful tool for writing asynchronous code. They've revolutionized how we handle concurrent operations, making it easier to build scalable and efficient applications. I've spent a lot of time working with coroutines, and I'm excited to share some insights on creating custom asynchronous primitives.
Let's start with the basics. Coroutines are special functions that can be paused and resumed, allowing for cooperative multitasking. They're the foundation of Python's async/await syntax. When you define a coroutine, you're essentially creating a function that can yield control back to the event loop, allowing other tasks to run.
To create a custom awaitable object, you need to implement the await method. This method should return an iterator. Here's a simple example:
class CustomAwaitable:
def __init__(self, value):
self.value = value
def __await__(self):
yield
return self.value
async def use_custom_awaitable():
result = await CustomAwaitable(42)
print(result) # Output: 42
This CustomAwaitable class can be used with the await keyword, just like built-in awaitables. When awaited, it yields control once, then returns its value.
But what if we want to create more complex asynchronous primitives? Let's look at implementing a custom semaphore. Semaphores are used to control access to a shared resource by multiple coroutines:
import asyncio
class CustomSemaphore:
def __init__(self, value=1):
self._value = value
self._waiters = []
async def acquire(self):
while self._value <= 0:
fut = asyncio.get_running_loop().create_future()
self._waiters.append(fut)
await fut
self._value -= 1
def release(self):
self._value += 1
if self._waiters:
asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
self.release()
async def worker(semaphore, num):
async with semaphore:
print(f"Worker {num} acquired the semaphore")
await asyncio.sleep(1)
print(f"Worker {num} released the semaphore")
async def main():
semaphore = CustomSemaphore(2)
tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
await asyncio.gather(*tasks)
asyncio.run(main())
This CustomSemaphore class implements the acquire and release methods, as well as the async context manager protocol (aenter and aexit). It allows a maximum of two coroutines to acquire the semaphore simultaneously.
Now, let's talk about creating efficient event loops. While Python's asyncio provides a robust event loop implementation, there might be cases where you need a custom one. Here's a basic example of a custom event loop:
import time
from collections import deque
class CustomEventLoop:
def __init__(self):
self._ready = deque()
self._stopping = False
def call_soon(self, callback, *args):
self._ready.append((callback, args))
def run_forever(self):
while not self._stopping:
self._run_once()
def _run_once(self):
ntodo = len(self._ready)
for _ in range(ntodo):
callback, args = self._ready.popleft()
callback(*args)
def stop(self):
self._stopping = True
def run_until_complete(self, coro):
def _done_callback(fut):
self.stop()
task = self.create_task(coro)
task.add_done_callback(_done_callback)
self.run_forever()
return task.result()
def create_task(self, coro):
task = Task(coro, self)
self.call_soon(task._step)
return task
class Task:
def __init__(self, coro, loop):
self._coro = coro
self._loop = loop
self._done = False
self._result = None
self._callbacks = []
def _step(self):
try:
if self._done:
return
result = self._coro.send(None)
if isinstance(result, SleepHandle):
result._task = self
self._loop.call_soon(result._wake_up)
else:
self._loop.call_soon(self._step)
except StopIteration as e:
self.set_result(e.value)
def set_result(self, result):
self._result = result
self._done = True
for callback in self._callbacks:
self._loop.call_soon(callback, self)
def add_done_callback(self, callback):
if self._done:
self._loop.call_soon(callback, self)
else:
self._callbacks.append(callback)
def result(self):
if not self._done:
raise RuntimeError('Task is not done')
return self._result
class SleepHandle:
def __init__(self, duration):
self._duration = duration
self._task = None
self._start_time = time.time()
def _wake_up(self):
if time.time() - self._start_time >= self._duration:
self._task._loop.call_soon(self._task._step)
else:
self._task._loop.call_soon(self._wake_up)
async def sleep(duration):
return SleepHandle(duration)
async def example():
print("Start")
await sleep(1)
print("After 1 second")
await sleep(2)
print("After 2 more seconds")
return "Done"
loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)
This custom event loop implements basic functionality like running tasks, handling coroutines, and even a simple sleep function. It's not as feature-rich as Python's built-in event loop, but it demonstrates the core concepts.
One of the challenges in writing asynchronous code is managing task priorities. While Python's asyncio doesn't provide built-in priority queues for tasks, we can implement our own:
import asyncio
import heapq
class PriorityEventLoop(asyncio.AbstractEventLoop):
def __init__(self):
self._ready = []
self._stopping = False
self._clock = 0
def call_at(self, when, callback, *args, context=None):
handle = asyncio.Handle(callback, args, self, context)
heapq.heappush(self._ready, (when, handle))
return handle
def call_later(self, delay, callback, *args, context=None):
return self.call_at(self._clock + delay, callback, *args, context=context)
def call_soon(self, callback, *args, context=None):
return self.call_at(self._clock, callback, *args, context=context)
def time(self):
return self._clock
def stop(self):
self._stopping = True
def is_running(self):
return not self._stopping
def run_forever(self):
while self._ready and not self._stopping:
self._run_once()
def _run_once(self):
if not self._ready:
return
when, handle = heapq.heappop(self._ready)
self._clock = when
handle._run()
def create_task(self, coro):
return asyncio.Task(coro, loop=self)
def run_until_complete(self, future):
asyncio.futures._chain_future(future, self.create_future())
self.run_forever()
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
return future.result()
def create_future(self):
return asyncio.Future(loop=self)
async def low_priority_task():
print("Low priority task started")
await asyncio.sleep(2)
print("Low priority task finished")
async def high_priority_task():
print("High priority task started")
await asyncio.sleep(1)
print("High priority task finished")
async def main():
loop = asyncio.get_event_loop()
loop.call_later(0.1, loop.create_task, low_priority_task())
loop.call_later(0, loop.create_task, high_priority_task())
await asyncio.sleep(3)
asyncio.run(main())
This PriorityEventLoop uses a heap queue to manage tasks based on their scheduled execution time. You can assign priorities by scheduling tasks with different delays.
Handling cancellation gracefully is another important aspect of working with coroutines. Here's an example of how to implement cancellable tasks:
import asyncio
async def cancellable_operation():
try:
print("Operation started")
await asyncio.sleep(5)
print("Operation completed")
except asyncio.CancelledError:
print("Operation was cancelled")
# Perform any necessary cleanup
raise # Re-raise the CancelledError
async def main():
task = asyncio.create_task(cancellable_operation())
await asyncio.sleep(2)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Main: task was cancelled")
asyncio.run(main())
In this example, the cancellable_operation catches the CancelledError, performs any necessary cleanup, and then re-raises the exception. This allows for graceful handling of cancellation while still propagating the cancellation status.
Let's explore implementing custom async iterators. These are useful for creating sequences that can be iterated over asynchronously:
class AsyncRange:
def __init__(self, start, stop, step=1):
self.start = start
self.stop = stop
self.step = step
def __aiter__(self):
return self
async def __anext__(self):
if self.start >= self.stop:
raise StopAsyncIteration
value = self.start
self.start += self.step
await asyncio.sleep(0.1) # Simulate some async work
return value
async def main():
async for i in AsyncRange(0, 5):
print(i)
asyncio.run(main())
This AsyncRange class implements the async iterator protocol, allowing it to be used in async for loops.
Finally, let's look at implementing custom async context managers. These are useful for managing resources that need to be acquired and released asynchronously:
class AsyncResource:
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(1) # Simulate async acquisition
return self
async def __aexit__(self, exc_type, exc, tb):
print("Releasing resource")
await asyncio.sleep(1) # Simulate async release
async def main():
async with AsyncResource() as resource:
print("Using resource")
await asyncio.sleep(1)
asyncio.run(main())
This AsyncResource class implements the aenter and aexit methods, allowing it to be used with the async with statement.
In conclusion, Python's coroutine system provides a powerful foundation for building custom asynchronous primitives. By understanding the underlying mechanisms and protocols, you can create tailored solutions for specific asynchronous challenges, optimize performance in complex concurrent scenarios, and extend Python's async capabilities. Remember, while these custom implementations are great for learning and specific use cases, Python's built-in asyncio library is highly optimized and should be your go-to for most scenarios. Happy coding!
Our Creations
Be sure to check out our creations:
Investor Central | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
Top comments (0)