Introduction
This is the first part of a two-part series that takes a deep dive into Python's concurrent.futures.Future: what it does, how it does it, and why it exists. We will explore all of it and dig into CPython's implementation along the way.
In this part we will talk about what concurrent.futures.Future is and how it works under the hood, and by the end we will have our own implementation of a Future class.
Why concurrent.futures?
As of Python 3.13, the concurrent package contains only one module, futures. The intent of this module is, as the Python docs say, to
"provide a high-level interface for asynchronously executing callables".
This module primarily contains two things:
- Executors
- Futures
Let's say I want to fetch results from a list of APIs concurrently. This is what a typical piece of code using ThreadPoolExecutor looks like:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 5)]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(requests.get, url) for url in urls]
    for future in as_completed(futures):
        res = future.result()
        print(f"{res.url} - {res.status_code} - {res.json()['title']}")
There are different types of executors in Python, but one thing that is consistent across them is their use of futures to send back the results of execution.
At a high level, executors manage how and where your callables run. Because execution happens asynchronously, the result is not available immediately. Instead of returning a result right away, an executor returns a promise of an eventual result or error, represented by an object called a Future.
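To make the "promise" idea concrete, here is a minimal sketch using only the standard library. The slow_square function is a hypothetical stand-in for a slow call such as an HTTP request; submit hands back a Future straight away, and result() is the point where we actually wait.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def slow_square(x):
    # Hypothetical stand-in for slow work such as a network call.
    time.sleep(0.2)
    return x * x


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7)  # returns a Future right away
    is_future = isinstance(future, Future)
    value = future.result()                   # blocks until the value is ready
    finished = future.done()                  # True once result() has returned

print(is_future, value, finished)
```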
Let's talk about the future!
Let's try to build an intuition for what a future should look like. We know that an executor will execute a callable, so it should have a method that takes a callable (at the very least; it can of course take other arguments as well) and, once we submit the job for execution, returns a future. Because we are submitting the callable for execution, let's say executors will have a submit method.
Because we will get future objects back, there must be a Future class:
class Future:
    ...
Once we submit the callable, some processing happens in the background. When the callable finishes executing, one of two things may happen:
- It returns a value
- It raises an exception
We need two methods to record these outcomes. Let's say those methods are:
class Future:
    def set_exception(self, exception):
        ...

    def set_result(self, result):
        ...
Once one of these values (exception or result) is set, the caller should also be able to read it back. Let's say those methods are:
class Future:
    def set_exception(self, exception):
        ...

    def set_result(self, result):
        ...

    def result(self):
        ...

    def exception(self):
        ...
But how will I know whether the future's value has been set? Or whether it is still pending execution? Or whether the corresponding callable is actually running in the executor? Let's add a state attribute and some methods to check this state:
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'

class Future:
    def __init__(self):
        self._state = PENDING

    def set_exception(self, exception):
        ...

    def set_result(self, result):
        ...

    def result(self):
        ...

    def exception(self):
        ...

    def running(self):
        ...

    def pending(self):
        ...

    def done(self):
        ...

    def set_running(self):
        ...
OK, now we should be able to use this class for a basic Future implementation. But there is one state that is not so intuitive: the cancelled state. Think of it this way: since a future represents a pending result, it's possible that the execution of the callable was cancelled before it ever ran. So we need one more state, CANCELLED, along with methods to request and check it:
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'
CANCELLED = 'CANCELLED'

class Future:
    def __init__(self):
        self._state = PENDING

    def set_exception(self, exception):
        ...

    def set_result(self, result):
        ...

    def result(self):
        ...

    def exception(self):
        ...

    def running(self):
        ...

    def pending(self):
        ...

    def done(self):
        ...

    def cancelled(self):
        ...

    def set_running(self):
        ...

    def cancel(self):
        ...
But that's just signatures?
Yes. Now let's fill in these methods one by one.
Get States
Let's get the obvious stuff out of the way first: the state methods. Their implementation is simple: if the self._state attribute matches the state the method checks for, return True, else False. Remember that a cancelled future also counts as "done", because the corresponding callable has reached the end of its lifecycle:
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'
CANCELLED = 'CANCELLED'

class Future:
    def __init__(self):
        self._state = PENDING

    def running(self):
        return self._state == RUNNING

    def pending(self):
        return self._state == PENDING

    def done(self):
        return self._state == FINISHED or self._state == CANCELLED

    def cancelled(self):
        return self._state == CANCELLED
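For comparison, the standard library's Future treats cancellation as "done" in the same way. This is a small sketch against concurrent.futures.Future rather than our own class; note that the standard library class has no pending() method, so only running(), done() and cancelled() are checked here.

```python
from concurrent.futures import Future

future = Future()  # a freshly created future starts out pending

before = (future.running(), future.done(), future.cancelled())
future.cancel()    # a pending future can be cancelled
after = (future.running(), future.done(), future.cancelled())

print(before)
print(after)
```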
Set Exception
Setting an exception should be straightforward: unless the future is already completed, we store the exception and mark the future finished.
class InvalidStateError(Exception):
    """Raise this exception if we are trying to do something
    but it's not allowed because of a state mismatch"""

class Future:
    def __init__(self):
        self._exception = None

    def set_exception(self, exception):
        if self._state in {CANCELLED, FINISHED}:
            raise InvalidStateError
        self._exception = exception
        self._state = FINISHED
Get Exception
Getting the exception should be straightforward as well: if the future is cancelled we raise an error, and if it is finished we just return the stored exception.
class Future:
    def exception(self):
        if self._state == CANCELLED:
            raise InvalidStateError
        elif self._state == FINISHED:
            return self._exception
Set Result
This is also pretty straightforward and almost the same as setting an exception: as long as the future isn't cancelled or finished, store the result and mark the future finished.
class InvalidStateError(Exception):
    """Raise this exception if we are trying to do something
    but it's not allowed because of a state mismatch"""

class Future:
    def __init__(self):
        self._result = None

    def set_result(self, result):
        if self._state in {CANCELLED, FINISHED}:
            raise InvalidStateError
        self._result = result
        self._state = FINISHED
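The "set only once" rule can be observed on the standard library's Future too. The sketch below assumes Python 3.8 or newer, where concurrent.futures exposes InvalidStateError and set_result raises it on a future that is already done.

```python
from concurrent.futures import Future, InvalidStateError

future = Future()
future.set_result("first")

try:
    future.set_result("second")  # the future is already finished
    rejected = False
except InvalidStateError:
    rejected = True

print(rejected, future.result())
```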
Get Result
This gets a little tricky. First, same as when getting the exception, if the future is cancelled we can't return a result. But if it is finished, it's possible that the callable raised an exception. If it did, we should re-raise that exception; otherwise we can return the result.
class Future:
    def result(self):
        if self._state == CANCELLED:
            raise InvalidStateError
        elif self._state == FINISHED:
            if self._exception:
                raise self._exception
            else:
                return self._result
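The same re-raise behaviour is visible on the standard library's Future, which makes for a quick sanity check of the design. In this sketch the ValueError is an arbitrary example exception.

```python
from concurrent.futures import Future

failed = Future()
failed.set_exception(ValueError("boom"))

stored = failed.exception()  # returns the exception object without raising

try:
    failed.result()          # re-raises the stored exception
    reraised = None
except ValueError as exc:
    reraised = exc

print(repr(stored), reraised is stored)
```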
Set cancelled
If a future is still pending execution we can mark it cancelled, but if it's already running or finished we can't cancel it anymore.
class Future:
    def cancel(self):
        if self._state in {RUNNING, FINISHED}:
            return False
        if self._state == CANCELLED:
            return True
        self._state = CANCELLED
        return True
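Here is a sketch of the same rule seen through a real ThreadPoolExecutor. The blocker function and the two Event objects are hypothetical helpers used only to hold the first task in the running state; with a single worker, the second task stays queued and can still be cancelled.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()  # set once the first task is actually running
release = threading.Event()  # lets the running task finish


def blocker():
    started.set()
    release.wait()
    return "done"


with ThreadPoolExecutor(max_workers=1) as executor:
    running_future = executor.submit(blocker)
    queued_future = executor.submit(blocker)    # waits behind the first task
    started.wait()

    cancel_running = running_future.cancel()    # already running
    cancel_queued = queued_future.cancel()      # still pending
    cancel_again = queued_future.cancel()       # already cancelled
    release.set()

print(cancel_running, cancel_queued, cancel_again)
```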
Set Running
A future can only move to the running state if it is pending; from any other state the transition is invalid.
class Future:
    def set_running(self):
        if self._state == PENDING:
            self._state = RUNNING
            return
        raise InvalidStateError
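As a point of comparison, the closest counterpart in the standard library is Future.set_running_or_notify_cancel(), which the docs say is meant for executor implementations and unit tests. Instead of raising for a cancelled future, it returns False so the executor knows to skip the work. A minimal sketch:

```python
from concurrent.futures import Future

fresh = Future()
can_run = fresh.set_running_or_notify_cancel()  # pending -> running
is_running = fresh.running()

cancelled = Future()
cancelled.cancel()
can_run_cancelled = cancelled.set_running_or_notify_cancel()  # nothing to run

print(can_run, is_running, can_run_cancelled)
```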
Put it all together
Let's put it all together, and there we have our basic implementation of a Future object that an Executor can use to communicate what's going on to the caller:
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'
CANCELLED = 'CANCELLED'

class InvalidStateError(Exception):
    """Raise this exception if we are trying to do something
    but it's not allowed because of a state mismatch"""

class Future:
    def __init__(self):
        self._state = PENDING
        self._result = None
        self._exception = None

    def running(self):
        return self._state == RUNNING

    def pending(self):
        return self._state == PENDING

    def done(self):
        return self._state == FINISHED or self._state == CANCELLED

    def cancelled(self):
        return self._state == CANCELLED

    def set_exception(self, exception):
        if self._state in {CANCELLED, FINISHED}:
            raise InvalidStateError
        self._exception = exception
        self._state = FINISHED

    def exception(self):
        if self._state == CANCELLED:
            raise InvalidStateError
        elif self._state == FINISHED:
            return self._exception

    def set_result(self, result):
        if self._state in {CANCELLED, FINISHED}:
            raise InvalidStateError
        self._result = result
        self._state = FINISHED

    def result(self):
        if self._state == CANCELLED:
            raise InvalidStateError
        elif self._state == FINISHED:
            if self._exception:
                raise self._exception
            else:
                return self._result

    def cancel(self):
        if self._state in {RUNNING, FINISHED}:
            return False
        if self._state == CANCELLED:
            return True
        self._state = CANCELLED
        return True

    def set_running(self):
        if self._state == PENDING:
            self._state = RUNNING
            return
        raise InvalidStateError
Use the future in an Executor
Let’s implement a very basic executor that runs callables on background threads using the threading module and connects the results back to the Future.
import threading

class Executor:
    def submit(self, fn, *args, **kwargs):
        future = Future()

        def run():
            try:
                future.set_running()
                result = fn(*args, **kwargs)
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)

        t = threading.Thread(target=run)
        t.start()
        return future
Now we can use it like this -
import time

def i_sleep():
    time.sleep(5)
    return 100

executor = Executor()
future = executor.submit(i_sleep)
print(future.result())
We expect it to print 100, but hold on, it prints None instead!
Why is that happening?
At the time we call print(future.result()), the future is not finished yet: i_sleep() sleeps for 5 seconds, and our main thread does not wait for the background thread to complete. So when future.result() is called, self._state is most likely still RUNNING, and our method doesn't return anything in that case, so None is printed.
So how do we solve it?
We need to wait for the result if the future is still running. There are several threading synchronization primitives we could use, like threading.Event or threading.Condition. Even busy-waiting works, but that wastes CPU cycles. Let's do this with threading.Condition, as that is what CPython's implementation also uses internally.
A quick note on Conditions: a Condition is a synchronization primitive. Entering a with condition: block acquires the underlying lock; condition.wait() releases that lock and sleeps until another thread calls condition.notify() or condition.notify_all(), then re-acquires the lock before returning.
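The wait/notify handshake described above can be sketched in isolation before wiring it into the Future class. The box dictionary and the producer and wait_for_value functions are hypothetical names used only for this illustration.

```python
import threading

condition = threading.Condition()
box = {"ready": False, "value": None}  # hypothetical shared state


def producer():
    with condition:                # acquire the lock
        box["value"] = 100
        box["ready"] = True
        condition.notify_all()     # wake up every waiting thread


def wait_for_value():
    with condition:
        # Loop on the predicate: wait() can return before the state we
        # care about is reached, so always re-check it after waking up.
        while not box["ready"]:
            condition.wait()       # releases the lock while sleeping
        return box["value"]


worker = threading.Thread(target=producer)
worker.start()
received = wait_for_value()
worker.join()
print(received)
```

Checking the flag inside a while loop also covers the case where the producer finishes before the waiter ever calls wait(): the flag is already set, so the waiter never sleeps.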
For synchronization we need to take care of these things:
- Whichever method we call, it must acquire the lock first to avoid race conditions.
- Whenever we finish setting a state, we need to notify the waiting code that it can proceed.
- If we are trying to return the result, we should wait until the state is a final one, i.e. CANCELLED or FINISHED.
Let's put this into the class's implementation:
import threading

PENDING = "PENDING"
RUNNING = "RUNNING"
FINISHED = "FINISHED"
CANCELLED = "CANCELLED"

class InvalidStateError(Exception):
    """Raised if an operation is attempted in an invalid state."""

class Future:
    def __init__(self):
        self._state = PENDING
        self._result = None
        self._exception = None
        self._condition = threading.Condition()

    def running(self):
        with self._condition:
            return self._state == RUNNING

    def pending(self):
        with self._condition:
            return self._state == PENDING

    def done(self):
        with self._condition:
            return self._state in {FINISHED, CANCELLED}

    def cancelled(self):
        with self._condition:
            return self._state == CANCELLED

    def set_exception(self, exception):
        with self._condition:
            if self._state in {CANCELLED, FINISHED}:
                raise InvalidStateError
            self._exception = exception
            self._state = FINISHED
            self._condition.notify_all()

    def exception(self):
        with self._condition:
            if self._state == CANCELLED:
                raise InvalidStateError
            elif self._state == FINISHED:
                return self._exception

    def set_result(self, result):
        with self._condition:
            if self._state in {CANCELLED, FINISHED}:
                raise InvalidStateError
            self._result = result
            self._state = FINISHED
            self._condition.notify_all()

    def result(self):
        with self._condition:
            while self._state not in {FINISHED, CANCELLED}:
                self._condition.wait()
            if self._state == CANCELLED:
                raise InvalidStateError
            if self._exception:
                raise self._exception
            return self._result

    def cancel(self):
        with self._condition:
            if self._state in {RUNNING, FINISHED}:
                return False
            if self._state == CANCELLED:
                return True
            self._state = CANCELLED
            self._condition.notify_all()
            return True

    def set_running(self):
        with self._condition:
            if self._state == PENDING:
                self._state = RUNNING
                return
            raise InvalidStateError
Now when we use this with our Executor, it works and prints 100 instead of None.
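For a quick end-to-end check, here is a trimmed-down, self-contained sketch of the same idea. MiniFuture and the module-level submit function are hypothetical, simplified stand-ins for the Future and Executor classes (no cancellation, no state constants), and the sleep is shortened to 0.2 seconds so it runs quickly.

```python
import threading
import time


class MiniFuture:
    """Simplified stand-in: only the result/exception hand-off."""

    def __init__(self):
        self._done = False
        self._result = None
        self._exception = None
        self._condition = threading.Condition()

    def set_result(self, result):
        with self._condition:
            self._result = result
            self._done = True
            self._condition.notify_all()

    def set_exception(self, exception):
        with self._condition:
            self._exception = exception
            self._done = True
            self._condition.notify_all()

    def result(self):
        with self._condition:
            while not self._done:          # block until a value is set
                self._condition.wait()
            if self._exception is not None:
                raise self._exception
            return self._result


def submit(fn, *args, **kwargs):
    future = MiniFuture()

    def run():
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)

    threading.Thread(target=run).start()
    return future


def i_sleep():
    time.sleep(0.2)
    return 100


start = time.monotonic()
value = submit(i_sleep).result()           # waits for the background thread
elapsed = time.monotonic() - start
print(value)
```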
But I saw the CPython implementation and ...
OK, I know you looked at CPython's Future class implementation and feel somewhat cheated.
You are probably wondering why Condition is preferred over Event in the CPython implementation, what self._done_callbacks is doing, what this new state CANCELLED_AND_NOTIFIED is, what add_done_callback means, why the same code is written twice in the result and exception methods, and so on!
But I think we can agree that the core of our Future implementation and CPython's looks about the same.
We will explore all of it and uncover the mysteries of CPython's Future implementation in the second part.
Thanks for reading !
Disclaimer: This is a personal blog. The views and opinions expressed here are only those of the author and do not represent those of any organization or any individual with whom the author may be associated, professionally or personally.