DEV Community

Ashish Shukla for Python Discipline @EPAM India


Futures: A deep dive into Python's concurrent.futures.Future - Part 1 (Let's make our own Future)

Introduction

This is the first part of a two-part blog series that takes a deep dive into Python's concurrent.futures.Future: what it does, how it does it, and why it exists. Along the way we will dig into CPython's implementation.

In this part we will look at what a concurrent.futures.Future is and what it really is under the hood, and by the end we will have written our own Future class.

Why concurrent.futures?

As of Python 3.13, the concurrent package contains only one module, futures. Its purpose, as the Python docs put it, is to

"provide a high-level interface for asynchronously executing callables".

This module primarily contains two things:

  1. Executors
  2. Futures

Say we want to fetch results from a list of APIs concurrently. A typical piece of code using a ThreadPoolExecutor looks like this:

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 5)]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(requests.get, url) for url in urls]
    for future in as_completed(futures):
        res = future.result()
        print(f"{res.url} - {res.status_code} - {res.json()['title']}")
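To try the same pattern without network access or the third-party requests package, here is a minimal sketch that swaps requests.get for a hypothetical fake_fetch function that just sleeps for a given delay. It also illustrates that as_completed yields futures in the order they finish, not the order they were submitted.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(name, delay):
    # Hypothetical stand-in for requests.get: sleep, then return a payload.
    time.sleep(delay)
    return f"{name} done after {delay}s"

jobs = [("slow", 0.3), ("fast", 0.1), ("medium", 0.2)]
finished_order = []

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(fake_fetch, name, delay) for name, delay in jobs]
    # as_completed yields each future as soon as it finishes.
    for future in as_completed(futures):
        message = future.result()
        finished_order.append(message.split()[0])
        print(message)
```

On a typical run this prints the fast, medium and slow messages in that order, although the exact order ultimately depends on thread scheduling.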

Python has different types of executors, but one thing consistent across all of them is that they use futures to send back the results of execution.

At a high level, executors manage how and where your callables run. Because execution happens asynchronously, the result is not available immediately. So instead of returning a result, an executor returns a promise of a result (or an error), represented by an object called a Future.
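A quick way to see this with the standard library: the sketch below (the gate event and the square_when_allowed function are hypothetical, for illustration only) submits a task that blocks until we release it, so we can observe that submit() hands back a Future before the work is done.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor

gate = threading.Event()

def square_when_allowed(x):
    # Block until the main thread opens the gate, so the task is
    # guaranteed to still be in flight when we inspect its future.
    gate.wait()
    return x * x

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(square_when_allowed, 7)
    # submit() has already returned, even though the work is not finished.
    is_future = isinstance(future, Future)
    done_before = future.done()
    gate.set()               # let the worker thread finish
    value = future.result()  # blocks until the result is available
    done_after = future.done()

print(is_future, done_before, value, done_after)  # True False 49 True
```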

Let's talk about the future!

Let's build an intuition for what a future should look like. We know that an executor executes a callable, so it needs a method that takes a callable (and, optionally, other arguments to pass to it) and, once the job is submitted, returns a future. Since we are submitting the callable for execution, let's call this method submit.

Since submit hands back future objects, there must be a Future class:

class Future:
    ...

Once we submit the callable, some processing happens in the background. When the callable finishes, one of two things happens:

  1. It returns a value
  2. It raises an exception
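With the standard library's executors, both outcomes end up on the future. In this sketch, parse_port is a hypothetical callable that succeeds for one input and raises ValueError for the other; exception() returns the stored exception (or None), and result() re-raises it in the caller's thread.

```python
from concurrent.futures import ThreadPoolExecutor

def parse_port(text):
    # Hypothetical callable: int() raises ValueError for non-numeric input.
    return int(text)

with ThreadPoolExecutor() as executor:
    ok = executor.submit(parse_port, "8080")
    bad = executor.submit(parse_port, "http")
# Leaving the with-block waits for both tasks to finish.

print(ok.result())            # 8080
print(ok.exception())         # None: the callable returned normally
print(repr(bad.exception()))  # the ValueError raised inside the worker
try:
    bad.result()              # result() re-raises the stored exception
except ValueError as exc:
    print("re-raised:", exc)
```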

We need two methods to record these outcomes; let's call them:

class Future:
    def set_exception(self, exception):
        ...

    def set_result(self, result):
        ...

Once one of these values (the exception or the result) is set, the caller should be able to retrieve it. Let's add getter methods for them:

class Future:
    def set_exception(self, exception):
        ...

    def set_result(self, result):
        ...

    def result(self):
        ...

    def exception(self):
        ...
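The standard library's concurrent.futures.Future has methods with these same names, and you can drive one by hand, which is roughly what an executor does internally. The Python docs note that set_result() and set_exception() are meant for Executor implementations and unit tests, so treat this as an illustration rather than everyday usage.

```python
from concurrent.futures import Future

# Play the executor's role manually: create futures and record outcomes.
succeeded = Future()
succeeded.set_result(42)

failed = Future()
failed.set_exception(KeyError("missing"))

print(succeeded.result())        # 42
print(succeeded.exception())     # None
print(type(failed.exception()))  # <class 'KeyError'>
```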

But how do we know whether the future's value has been set, whether it is still pending execution, or whether the corresponding callable is currently running in the executor? Let's add a state attribute and some methods to check it:

PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'

class Future:
    def __init__(self):
        self._state = PENDING

    def set_exception(self, exception):
        ...

    def set_result(self, result):
        ...

    def result(self):
        ...

    def exception(self):
        ...

    def running(self):
        ...

    def pending(self):
        ...

    def done(self):
        ...

    def set_running(self):
        ...

OK, now we have the skeleton of a basic Future. But there is one state that is less intuitive: the cancelled state. A future represents a result that is not available yet, so it is possible that the callable's execution gets cancelled before it ever runs. So we need one more state, CANCELLED:

PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'
CANCELLED = 'CANCELLED'

class Future:
    def __init__(self):
        self._state = PENDING

    def set_exception(self, exception):
        ...

    def set_result(self, result):
        ...

    def result(self):
        ...

    def exception(self):
        ...

    def running(self):
        ...

    def pending(self):
        ...

    def done(self):
        ...

    def cancelled(self):
        ...

    def set_running(self):
        ...

    def cancel(self):
        ...
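It can help to write down which state changes the finished class will allow. The table below is a hypothetical summary (TRANSITIONS, can_transition and is_done are illustrative names, not part of the Future class) of the rules that the methods in the following sections implement: a pending future can start running, finish directly, or be cancelled; a running future can only finish; FINISHED and CANCELLED are final.

```python
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'
CANCELLED = 'CANCELLED'

# Allowed state changes, mirroring the rules of the methods:
#   set_running():                PENDING -> RUNNING
#   set_result()/set_exception(): PENDING or RUNNING -> FINISHED
#   cancel():                     PENDING -> CANCELLED
TRANSITIONS = {
    PENDING: {RUNNING, FINISHED, CANCELLED},
    RUNNING: {FINISHED},
    FINISHED: set(),   # final state
    CANCELLED: set(),  # final state
}

def can_transition(current, new):
    return new in TRANSITIONS[current]

def is_done(state):
    # A future is "done" once it reaches a state with no way out.
    return not TRANSITIONS[state]

print(can_transition(PENDING, CANCELLED))      # True
print(can_transition(RUNNING, CANCELLED))      # False
print([s for s in TRANSITIONS if is_done(s)])  # ['FINISHED', 'CANCELLED']
```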

But that's just signatures?

Yes. Now let's fill in these methods one by one.

Get States

Let's get the obvious part out of the way first: the state methods. Each one returns True if self._state matches the state the method checks for, and False otherwise. Remember that a cancelled future also counts as "done": it has reached a final state, and its callable will never run.

PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'
CANCELLED = 'CANCELLED'

class Future:
    def __init__(self):
        self._state = PENDING

    def running(self):
        return self._state == RUNNING

    def pending(self):
        return self._state == PENDING

    def done(self):
        return self._state == FINISHED or self._state == CANCELLED

    def cancelled(self):
        return self._state == CANCELLED

Set Exception

Setting an exception is straightforward: unless the future is already done (cancelled or finished), we store the exception and mark the future FINISHED.

class InvalidStateError(Exception):
    """Raise this exception if we are trying to do something
    but it's not allowed because of a state mismatch."""

class Future:
    def __init__(self):
        self._exception = None

    def set_exception(self, exception):
        if self._state in {CANCELLED, FINISHED}:
            raise InvalidStateError
        self._exception = exception
        self._state = FINISHED
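The standard library applies the same kind of guard. Since Python 3.8, concurrent.futures has its own InvalidStateError, which set_result() and set_exception() raise when the future is already done. A quick check against the stdlib Future (not our class):

```python
from concurrent.futures import Future, InvalidStateError

future = Future()
future.set_exception(RuntimeError("first failure"))

try:
    # The future is already finished, so a second outcome is rejected.
    future.set_exception(RuntimeError("second failure"))
except InvalidStateError as exc:
    rejected = True
    print("rejected:", exc)
else:
    rejected = False

print(future.exception())  # first failure
```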

Get Exception

Getting the exception is straightforward as well: if the future is cancelled we raise InvalidStateError, and if it is finished we return the stored exception (None if the callable succeeded).

class Future:
    def exception(self):
        if self._state == CANCELLED:
            raise InvalidStateError
        elif self._state == FINISHED:
            return self._exception

Set Result

Setting the result is almost the same as setting an exception: as long as the future isn't cancelled or finished, we store the result and mark it FINISHED.

class InvalidStateError(Exception):
    """Raise this exception if we are trying to do something
    but it's not allowed because of a state mismatch."""

class Future:
    def __init__(self):
        self._result = None

    def set_result(self, result):
        if self._state in {CANCELLED, FINISHED}:
            raise InvalidStateError
        self._result = result
        self._state = FINISHED

Get Result

This is a little trickier. As with getting the exception, if the future is cancelled we can't return a result. If it is finished, the callable may have raised an exception; if so, we re-raise it, otherwise we return the result:

class Future:
    def result(self):
        if self._state == CANCELLED:
            raise InvalidStateError
        elif self._state == FINISHED:
            if self._exception is not None:
                raise self._exception
            else:
                return self._result
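When checking whether an exception was stored, comparing against None (if self._exception is not None:) is safer than a plain truthiness check (if self._exception:). Exception instances are normally truthy, but a subclass that defines __len__ (or __bool__) can be falsy, and a truthiness check would then silently skip the re-raise. BatchError and the two helper functions below are hypothetical, purely to demonstrate the pitfall:

```python
class BatchError(Exception):
    # Hypothetical exception that also reports how many items failed.
    def __init__(self, failed_items):
        super().__init__(f"{len(failed_items)} items failed")
        self.failed_items = failed_items

    def __len__(self):
        return len(self.failed_items)

def result_with_truthiness(exception, result):
    if exception:  # falsy when len(exception) == 0
        raise exception
    return result

def result_with_none_check(exception, result):
    if exception is not None:  # only "no exception stored" skips the raise
        raise exception
    return result

stored = BatchError([])  # an exception object with zero failed items
print(bool(stored))                          # False: __len__ returns 0
print(result_with_truthiness(stored, None))  # None: the error is swallowed
try:
    result_with_none_check(stored, None)
except BatchError:
    print("re-raised as expected")
```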

Set cancelled

While a future is still pending we can mark it cancelled, but once it is running or finished it can no longer be cancelled. Cancelling a future that is already cancelled simply returns True:

class Future:
    def cancel(self):
        if self._state in {RUNNING, FINISHED}:
            return False

        if self._state == CANCELLED:
            return True

        self._state = CANCELLED
        return True
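For comparison, here is how the standard library's Future behaves. Its cancel() follows the same rules, with one difference from our class worth knowing: calling result() on a cancelled stdlib future raises concurrent.futures.CancelledError rather than an InvalidStateError.

```python
from concurrent.futures import CancelledError, Future

pending = Future()
first_cancel = pending.cancel()   # True: it never started
second_cancel = pending.cancel()  # True: cancelling again is harmless
print(first_cancel, second_cancel, pending.cancelled(), pending.done())
# True True True True

running = Future()
running.set_running_or_notify_cancel()  # move it to the RUNNING state
running_cancel = running.cancel()       # False: too late to cancel
print(running_cancel)  # False

try:
    pending.result()
    raised_cancelled = False
except CancelledError:
    raised_cancelled = True  # stdlib raises CancelledError here
print(raised_cancelled)  # True
```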

Set Running

A future can only move to the RUNNING state from PENDING; in any other state the transition is invalid.

class Future:
    def set_running(self):
        if self._state == PENDING:
            self._state = RUNNING
            return

        raise InvalidStateError
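The standard library's closest counterpart to set_running() is set_running_or_notify_cancel(). Instead of raising for a cancelled future, it returns False so the executor knows to skip the call; it returns True after moving a pending future to RUNNING (and, per the docs, it raises RuntimeError if the future is already running or done). A short sketch of the two normal cases:

```python
from concurrent.futures import Future

fresh = Future()
fresh_started = fresh.set_running_or_notify_cancel()
print(fresh_started, fresh.running())  # True True

abandoned = Future()
abandoned.cancel()
abandoned_started = abandoned.set_running_or_notify_cancel()
print(abandoned_started)  # False: an executor would skip calling the function
```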

Put it all together

Putting it all together gives us a basic Future that an executor can use to tell the caller what is going on:

PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'
CANCELLED = 'CANCELLED'

class InvalidStateError(Exception):
    """Raise this exception if we are trying to do something
    but it's not allowed because of a state mismatch."""

class Future:
    def __init__(self):
        self._state = PENDING
        self._result = None
        self._exception = None

    def running(self):
        return self._state == RUNNING

    def pending(self):
        return self._state == PENDING

    def done(self):
        return self._state == FINISHED or self._state == CANCELLED

    def cancelled(self):
        return self._state == CANCELLED

    def set_exception(self, exception):
        if self._state in {CANCELLED, FINISHED}:
            raise InvalidStateError
        self._exception = exception
        self._state = FINISHED

    def exception(self):
        if self._state == CANCELLED:
            raise InvalidStateError
        elif self._state == FINISHED:
            return self._exception

    def set_result(self, result):
        if self._state in {CANCELLED, FINISHED}:
            raise InvalidStateError
        self._result = result
        self._state = FINISHED

    def result(self):
        if self._state == CANCELLED:
            raise InvalidStateError
        elif self._state == FINISHED:
            if self._exception is not None:
                raise self._exception
            else:
                return self._result

    def cancel(self):
        if self._state in {RUNNING, FINISHED}:
            return False

        if self._state == CANCELLED:
            return True

        self._state = CANCELLED
        return True

    def set_running(self):
        if self._state == PENDING:
            self._state = RUNNING
            return

        raise InvalidStateError

Use the future in an Executor

Let's implement a very basic executor that runs each callable on its own background thread, using the threading module, and reports the outcome back through the Future.

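import threading

class Executor:
    def submit(self, fn, *args, **kwargs):
        future = Future()

        def run():
            # If the future was cancelled before this thread got going,
            # set_running() raises and we skip calling fn entirely.
            try:
                future.set_running()
            except InvalidStateError:
                return
            try:
                result = fn(*args, **kwargs)
            except Exception as e:
                # Record the failure so result() can re-raise it later.
                future.set_exception(e)
            else:
                future.set_result(result)

        t = threading.Thread(target=run)
        t.start()
        return future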

Now we can use it like this:

import time
def i_sleep():
    time.sleep(5)
    return 100

executor = Executor()
future = executor.submit(i_sleep)

print(future.result())

We expect it to print 100, but it prints None instead!

Why is that happening?

When we call print(future.result()), the future is most likely not finished yet: i_sleep() sleeps for 5 seconds, and our main thread does not wait for the background thread to complete. So when future.result() runs, self._state is still RUNNING (or even PENDING, if the thread has not called set_running() yet). Our result() method has no branch for those states, so it falls through and implicitly returns None, which is what gets printed.

So how do we solve it?

If the future is not done yet, result() has to wait. We could use one of several threading synchronization primitives, such as threading.Event or threading.Condition; even busy-waiting would work, but it wastes CPU cycles. Let's use threading.Condition, since that is also what CPython's implementation uses internally.
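For comparison, the Event-based option could look like the sketch below. EventFuture and slow_answer are hypothetical names; the class is stripped down to set_result() and result() only. A threading.Event is set once the result is stored, and result() blocks on it instead of spinning in a busy-wait loop.

```python
import threading
import time

class EventFuture:
    """Hypothetical minimal future: one result, waited on via an Event."""

    def __init__(self):
        self._result = None
        self._finished = threading.Event()

    def set_result(self, result):
        self._result = result
        self._finished.set()  # wake every thread blocked in wait()

    def result(self):
        self._finished.wait()  # sleeps until set() has been called
        return self._result

def slow_answer(future):
    time.sleep(0.2)
    future.set_result(100)

future = EventFuture()
threading.Thread(target=slow_answer, args=(future,)).start()
print(future.result())  # 100, printed after roughly 0.2 seconds
```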

A quick note on conditions: a threading.Condition is a synchronization primitive built around a lock. Entering "with condition:" acquires the lock; condition.wait() releases the lock and sleeps until another thread calls condition.notify() or condition.notify_all(), and then reacquires the lock before returning.
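Here is a small standalone illustration of that behavior. The shared box dictionary and the producer function are hypothetical. The consumer waits inside a while loop that re-checks its condition, which is the pattern the Python docs recommend, because being woken up does not by itself guarantee that the awaited state has been reached.

```python
import threading
import time

condition = threading.Condition()
box = {"ready": False, "value": None}

def producer():
    time.sleep(0.1)
    with condition:             # acquire the underlying lock
        box["value"] = 100
        box["ready"] = True
        condition.notify_all()  # wake every waiting thread

threading.Thread(target=producer).start()

with condition:                 # acquire the lock
    while not box["ready"]:     # re-check after every wakeup
        condition.wait()        # release the lock and sleep
    # wait() reacquired the lock before returning, so reading is safe
    received = box["value"]

print(received)  # 100
```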

For synchronization, we need to take care of three things:

  1. Every method that reads or writes the state must acquire the lock first, to avoid race conditions.
  2. Whenever we move the future into a final state, we must notify the waiting threads that they can proceed.
  3. Before returning the result, we must wait until the state is a final one, i.e. CANCELLED or FINISHED.

Let's put this into the class's implementation:

import threading

PENDING = "PENDING"
RUNNING = "RUNNING"
FINISHED = "FINISHED"
CANCELLED = "CANCELLED"


class InvalidStateError(Exception):
    """Raised if an operation is attempted in an invalid state."""


class Future:
    def __init__(self):
        self._state = PENDING
        self._result = None
        self._exception = None
        self._condition = threading.Condition()

    def running(self):
        with self._condition:
            return self._state == RUNNING

    def pending(self):
        with self._condition:
            return self._state == PENDING

    def done(self):
        with self._condition:
            return self._state in {FINISHED, CANCELLED}

    def cancelled(self):
        with self._condition:
            return self._state == CANCELLED

    def set_exception(self, exception):
        with self._condition:
            if self._state in {CANCELLED, FINISHED}:
                raise InvalidStateError
            self._exception = exception
            self._state = FINISHED
            self._condition.notify_all()

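    def exception(self):
        with self._condition:
            # Like result(), wait until the future reaches a final state.
            while self._state not in {FINISHED, CANCELLED}:
                self._condition.wait()
            if self._state == CANCELLED:
                raise InvalidStateError
            return self._exception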

    def set_result(self, result):
        with self._condition:
            if self._state in {CANCELLED, FINISHED}:
                raise InvalidStateError
            self._result = result
            self._state = FINISHED
            self._condition.notify_all()

    def result(self):
        with self._condition:
            while self._state not in {FINISHED, CANCELLED}:
                self._condition.wait()
            if self._state == CANCELLED:
                raise InvalidStateError
            if self._exception is not None:
                raise self._exception
            return self._result

    def cancel(self):
        with self._condition:
            if self._state in {RUNNING, FINISHED}:
                return False
            if self._state == CANCELLED:
                return True
            self._state = CANCELLED
            self._condition.notify_all()
            return True

    def set_running(self):
        with self._condition:
            if self._state == PENDING:
                self._state = RUNNING
                return
            raise InvalidStateError

Now, when we use this Future with our Executor, result() blocks until the background thread finishes, and the example prints 100 instead of None.

But I saw the CPython implementation and ...

OK, I know you've looked at CPython's Future implementation and feel somewhat cheated.

You are probably wondering why CPython's implementation prefers Condition over Event, what self._done_callbacks is doing, what the new CANCELLED_AND_NOTIFIED state is, what it means to add_done_callback, why the same code appears twice in the result and exception methods, and so on!

But I think we can agree that the core of our implementation looks much the same as CPython's.

We will explore all of it and uncover the mysteries of CPython's future implementation in the second part.

Thanks for reading!

Disclaimer: This is a personal blog. The views and opinions expressed here are only those of the author and do not represent those of any organization or any individual with whom the author may be associated, professionally or personally.
