
Rain Leander for Temporal

Posted on • Originally published at temporal.io

Temporal Python 1.0.0 – A Durable, Distributed Asyncio Event Loop

We are excited about the GA release of the Temporal Python SDK. Python is now a fully supported workflow language
in Temporal, and our use of native asyncio constructs makes it a
perfect fit for Python developers looking to write durable workflows.

Here are some links to read more about Temporal Python:

Like essentially all of Temporal, Temporal Python is MIT licensed, and open source contributions are very welcome. Join
us in the #python-sdk channel in Temporal Slack or ask questions in the
Community Forum.

Most of the details about how to use the SDK are in the aforementioned links, so this post will only give a high-level
refresher of what Temporal and Temporal Python are. Then we will dive into some of the details of how Temporal Python
leverages asyncio and other Temporal Python features.

Intro to Temporal

Temporal is a workflow system that lets developers write workflows in code. Workflows run on any number of workers.
Code in a workflow translates to events in Temporal, which means the workflow code can be replayed against those events on
different workers as needed, in an Event Sourcing approach. Workflow code must therefore be deterministic so that it can
be safely rerun.

Workflows can call out to activities, which also run on workers and are general purpose functions that can do anything.

Clients can signal, query, cancel, and/or terminate workflows.

Since workflows routinely wait on timers, async tasks, managed coroutines, cancellations, etc., workflows are modeled
well as Python asyncio event loops.

Intro to Temporal Python

To give a quick walkthrough of Temporal Python, we'll implement a simplified form of one-click buying where a purchase
is started and then, unless cancelled, will be performed in 10 seconds.

Implementing an Activity

First, let's create a simple activity that does an HTTP POST of a purchase via aiohttp:

from dataclasses import asdict, dataclass

import aiohttp
from temporalio import activity
from temporalio.exceptions import ApplicationError

@dataclass
class Purchase:
    item_id: str
    user_id: str

@activity.defn
async def do_purchase(purchase: Purchase) -> None:
    async with aiohttp.ClientSession() as sess:
        async with sess.post("https://api.example.com/purchase", json=asdict(purchase)) as resp:
            # We don't want to retry client failure
            if resp.status >= 400 and resp.status < 500:
                raise ApplicationError(f"Status: {resp.status}", await resp.json(), non_retryable=True)
            # Otherwise, fail on bad status which will be inherently retried
            resp.raise_for_status()

Implementing a Workflow

Now we want to execute that activity from a workflow after 10 seconds unless we receive a cancel:

import asyncio
from datetime import timedelta
from enum import IntEnum
from typing import Optional
from temporalio import workflow

# Import our activity, but pass it through the sandbox
with workflow.unsafe.imports_passed_through():
    from .my_activities import Purchase, do_purchase

class PurchaseStatus(IntEnum):
    PENDING = 1
    CONFIRMED = 2
    CANCELLED = 3
    COMPLETED = 4

@workflow.defn
class OneClickBuyWorkflow:
    def __init__(self) -> None:
        self.status = PurchaseStatus.PENDING
        self.purchase: Optional[Purchase] = None

    @workflow.run
    async def run(self, purchase: Purchase) -> PurchaseStatus:
        self.purchase = self.purchase or purchase

        # Give user 10 seconds to cancel or update before we send it through
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            self.status = PurchaseStatus.CANCELLED
            return self.status

        # Update status, purchase, and update status again
        self.status = PurchaseStatus.CONFIRMED
        await workflow.execute_activity(
            do_purchase,
            self.purchase,
            start_to_close_timeout=timedelta(minutes=1),
            cancellation_type=workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
        )
        self.status = PurchaseStatus.COMPLETED
        return self.status

    @workflow.signal
    def update_purchase(self, purchase: Purchase) -> None:
        self.purchase = purchase

    @workflow.query
    def current_status(self) -> PurchaseStatus:
        return self.status

See the asyncio.sleep in there? That's no normal local-process sleep; that's a durable timer backed by Temporal. See
the "Temporal Workflows are Asyncio Event Loops" section later.

Running a Worker

Workflows and activities are run in workers like so:

from temporalio.client import Client
from temporalio.worker import Worker
from .my_workflows import OneClickBuyWorkflow
from .my_activities import do_purchase

# Create and run a worker on a task queue for the workflow and activity
worker = Worker(
    await Client.connect("my.temporal.host:7233"),
    task_queue="my-task-queue",
    workflows=[OneClickBuyWorkflow],
    activities=[do_purchase],
)
await worker.run()

Using a Client

Now we can start the workflow, send it a signal, check its status, etc.:

from temporalio.client import Client
from .my_activities import Purchase
from .my_workflows import OneClickBuyWorkflow, PurchaseStatus

# Create the client
client = await Client.connect("my.temporal.host:7233")

# Start a workflow
handle = await client.start_workflow(
    OneClickBuyWorkflow.run,
    Purchase(item_id="item1", user_id="user1"),
    id="user1-purchase1",
    task_queue="my-task-queue",
)

# We can cancel it if we want
await handle.cancel()

# We can query its status, even if the workflow is complete
status = await handle.query(OneClickBuyWorkflow.current_status)
assert status == PurchaseStatus.CANCELLED

# We can do many other things with the client like sending a signal to update
# the purchase, wait for workflow completion, terminate the workflow, etc.

This only scratches the surface of what can be done with Temporal Python. See the
Python SDK project for more details.

Now that we've given a brief overview of Temporal Python, let's discuss how it leverages asyncio.

Temporal Workflows are Asyncio Event Loops

Here we'll describe asyncio and how Temporal leverages it.

Asyncio Behind the Scenes

When running the following in a Python async def function, what happens?

await asyncio.sleep(10)

It sleeps for 10 seconds. But asyncio.sleep is not
like a time.sleep — it does not block a thread. Instead, it is
more like a JS setTimeout (or a Go time.Sleep or a
Rust Tokio sleep, etc.) in that it instructs the underlying scheduler or event loop to yield to other work and only
resume after that time has passed.

In Python as of this writing, here's what sleep(delay, result=None) looks like:

loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
                    futures._set_result_unless_cancelled,
                    future, result)
try:
    return await future
finally:
    h.cancel()

So it obtains the current event loop, creates a future on it, then tells the event loop via
call_later to invoke the future
result setter after a certain amount of time. How call_later is implemented is up to the event loop. The default event
loop just puts a timer on a heap of scheduled things to be checked against event loop "time" on each single event loop
iteration.
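
The same pattern can be reproduced with only public loop APIs. The following is a minimal sketch, not the standard library's code: my_sleep and set_result are hypothetical names, and the inner function stands in for the private helper shown above.

```python
import asyncio


async def my_sleep(delay: float, result=None):
    """Hypothetical re-creation of asyncio.sleep using only public loop APIs."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def set_result() -> None:
        # Mirror the "unless cancelled" guard of the private stdlib helper
        if not future.cancelled():
            future.set_result(result)

    # Ask the running loop to invoke the setter after `delay` seconds of loop time
    handle = loop.call_later(delay, set_result)
    try:
        return await future
    finally:
        # Drop the timer if we were cancelled before it fired
        handle.cancel()


async def main() -> str:
    return await my_sleep(0.05, "done")


sleep_result = asyncio.run(main())
```

Because the only loop-specific piece is call_later, any event loop that implements call_later differently changes what "sleeping" means without the calling code changing at all.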

Asyncio event loops are simply classes that implement
asyncio.AbstractEventLoop. In most
normal cases, developers will use
asyncio.run(my_async_func()) or similar which
will create the default event loop for that system and
run_until_complete.
run_until_complete is implemented by default as
run_forever with
stop invoked when the task is done.
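
To make that relationship concrete, here is a rough sketch of the same idea written by hand on a fresh default loop. The helper name run_until_done is hypothetical, and the real run_until_complete handles more edge cases than this.

```python
import asyncio


async def my_async_func() -> int:
    await asyncio.sleep(0.01)
    return 42


def run_until_done(coro):
    """Hypothetical helper approximating run_until_complete on a fresh loop."""
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(coro)
        # Ask the loop to stop as soon as the task finishes
        task.add_done_callback(lambda _task: loop.stop())
        loop.run_forever()
        return task.result()
    finally:
        loop.close()


answer = run_until_done(my_async_func())
```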

So how is run_forever implemented? Exactly as one might expect for an event loop — with a loop:

while True:
    self._run_once()
    if self._stopping:
        break

This repeatedly calls _run_once, which processes everything that is ready to be processed (e.g. futures that
have become ready, timers that have passed their time, etc.) and then waits to be woken up by the underlying system to
run another iteration.

So, an event loop is just a triggered "loop" that executes all ready tasks until all are yielded and then waits to be
triggered again.

Temporal's Asyncio Event Loop

In all Temporal SDK languages, workflows work similarly. When triggered, they continually run all coroutines individually
and cooperatively until they are all yielded waiting on updates from the Temporal server. Then we send the
since-collected commands of that most recent iteration to the server. Once the server sends us events back (e.g. a
completed timer, a completed activity, a received signal, etc.), we populate yielded values and trigger the workflow to
do the same processing again.

So, a Temporal workflow is just a triggered "loop" that executes all ready tasks until all are yielded and then waits to
be triggered again. Just like an event loop. Therefore it feels very natural to develop Temporal workflow coroutines as
asyncio coroutines.
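
As a rough illustration of that triggered loop, the toy below uses plain generators instead of asyncio. It is a sketch under heavy simplification, not the SDK's implementation: ToyWorkflowLoop, its methods, and the command strings are all hypothetical.

```python
from typing import Dict, Generator, List


class ToyWorkflowLoop:
    """Hypothetical toy: runs ready coroutines until every one is waiting."""

    def __init__(self) -> None:
        self._ready: List[tuple] = []                # (generator, value to send in)
        self._waiting: Dict[str, Generator] = {}     # issued command -> blocked generator

    def start(self, gen: Generator) -> None:
        self._ready.append((gen, None))

    def resolve(self, command: str, value) -> None:
        """Simulate a server event completing a previously issued command."""
        self._ready.append((self._waiting.pop(command), value))

    def run_once(self) -> List[str]:
        """Run one iteration and return the commands collected during it."""
        commands: List[str] = []
        while self._ready:
            gen, value = self._ready.pop(0)
            try:
                command = gen.send(value)
            except StopIteration:
                continue  # this coroutine finished
            self._waiting[command] = gen
            commands.append(command)
        return commands


log: List[str] = []


def purchase_flow():
    yield "timer:10s"
    log.append("timer fired")
    result = yield "activity:do_purchase"
    log.append(f"activity returned {result}")


loop = ToyWorkflowLoop()
loop.start(purchase_flow())
first = loop.run_once()                       # the flow yields its timer command
loop.resolve("timer:10s", None)
second = loop.run_once()                      # the flow yields its activity command
loop.resolve("activity:do_purchase", "ok")
third = loop.run_once()                       # the flow completes, nothing left to send
```

Each run_once call plays the role of one trigger: it returns the commands gathered in that iteration, and nothing else happens until resolve feeds a result back in.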

We have created our own implementation of asyncio.AbstractEventLoop that is set as the currently running loop for Temporal
workflows. We don't even need to implement run_until_complete or run_forever, because we only ever want to
run one iteration of the event loop at a time.

Durability

When we sleep or start a task in normal Python asyncio code and the process crashes, our state and ability to pick up
where we left off are lost. Temporal workflows, however, are built to be durable and resumable. Since workflow code is
deterministic and all of these constructs are durable, a worker/process crash is no problem. Temporal is built to replay
existing events/code to get to where it left off. Another worker can easily resume the workflow with no data or
functionality loss. Once a workflow is accepted by Temporal, Temporal ensures that it runs to completion.
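
The replay idea can be sketched in a few lines of plain Python. This is a toy model only, assuming a history that is just a list of step results; ReplayContext, step, and order_flow are hypothetical names and bear no relation to the SDK's internals.

```python
from typing import Callable, List


class ReplayContext:
    """Hypothetical toy: replays recorded step results before running new steps."""

    def __init__(self, history: List[str]) -> None:
        self.history = list(history)     # results recorded by earlier runs
        self.executed: List[str] = []    # steps that actually ran in this process
        self._position = 0

    def step(self, name: str, fn: Callable[[], str]) -> str:
        if self._position < len(self.history):
            # Replay: reuse the recorded result instead of redoing the work
            result = self.history[self._position]
        else:
            result = fn()
            self.executed.append(name)
            self.history.append(result)
        self._position += 1
        return result


def order_flow(ctx: ReplayContext) -> str:
    # Deterministic: the same steps are requested in the same order on every run
    reserved = ctx.step("reserve", lambda: "reserved")
    charged = ctx.step("charge", lambda: "charged")
    return f"{reserved}+{charged}"


# The first worker "crashes" after recording only the first step
crashed = ReplayContext([])
crashed.step("reserve", lambda: "reserved")

# A second worker replays the recorded history and continues from there
resumed = ReplayContext(crashed.history)
flow_result = order_flow(resumed)
```

The second worker re-runs order_flow from the top, but only the "charge" step does real work. This only holds because the flow asks for the same steps in the same order every time, which is why determinism matters.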

Timers

Asyncio timers are built as Temporal timers. For example, inside a workflow if we run:

await asyncio.sleep(10)

We are actually starting a Temporal durable timer. The timer will be visible in the UI. This is the same for everything
that ends up calling call_later. So
if we have:

await asyncio.wait_for(workflow.execute_child_workflow(MyChildWorkflow.run), timeout=10)

We have started a child workflow and a 10-second timer which behaves just like Python does. That means if the 10-second
timeout occurs before the completion of the child workflow, this will cancel the child workflow and raise a
TimeoutError.
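
For reference, this is the plain asyncio behavior being mirrored. The sketch uses an ordinary coroutine (the hypothetical slow_child) in place of a child workflow and runs on the default event loop, not Temporal's.

```python
import asyncio

cancelled_children = []


async def slow_child() -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        # wait_for cancels the awaited work when the timeout elapses
        cancelled_children.append("slow_child")
        raise


async def main() -> str:
    try:
        return await asyncio.wait_for(slow_child(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"


outcome = asyncio.run(main())
```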

We can even get the current loop time via:

asyncio.get_running_loop().time()

This is effectively the same as workflow.now() but shows that the event loop is at a deterministic point in time. We can
even use a time-skipping test workflow environment and control the time manually.

Tasks

Creating a task on the event loop is the proper way to start a Temporal coroutine in a workflow. So we can easily have
something like:

async with asyncio.TaskGroup() as tg:
    for activity_param in range(20):
        tg.create_task(
            workflow.execute_activity(
                my_activity,
                activity_param,
                start_to_close_timeout=timedelta(minutes=5),
            ),
        )

This uses the new TaskGroup functionality in
Python 3.11 to start 20 activities and wait for them all to complete.

In fact Temporal Python was developed before task groups were made available, but since we leverage native asyncio
constructs, newer features automatically work.

Activity handles and child workflow handles are actually implemented as extensions of asyncio tasks, so a started
activity or a started child workflow can be used as a task. For example, this is acceptable:

handle = workflow.start_activity(my_activity, start_to_close_timeout=timedelta(minutes=5))
handle.set_name("my-task-name-for-stack-traces")
workflow.logger.info("Result: %s", await handle)

Cancellation

Luckily for us, Python cancellation and Temporal cancellation work almost exactly the same. An asyncio cancellation and
a Temporal cancellation are merely requests that raise errors in the underlying code, but can be caught and ignored.
This means if we cancel a workflow from a client, Temporal will relay that to the workflow as
task cancellation which causes an
asyncio.CancelledError to be raised.
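
The "request that can be caught" behavior is easy to see in plain asyncio. The sketch below runs on the default event loop with a hypothetical worker coroutine. In a workflow, the cancel would come from a client rather than from task.cancel().

```python
import asyncio

events = []


async def worker() -> str:
    try:
        await asyncio.sleep(10)
        return "completed"
    except asyncio.CancelledError:
        # Cancellation is only a request: we can record it and return normally
        events.append("cancel requested")
        return "cancelled gracefully"


async def main() -> str:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and reach its sleep
    task.cancel()
    return await task


final_status = asyncio.run(main())
```

This is the same shape as the try/except around asyncio.sleep(10) in OneClickBuyWorkflow above.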

We can even use shielding to ensure
something like a really important activity cannot be cancelled, for example:

await asyncio.shield(workflow.execute_activity(
    do_not_cancel_me,
    start_to_close_timeout=timedelta(minutes=5),
))

Note that this does not stop the outer await from raising a cancelled error which, if uncaught, will cancel the
workflow and thereby cancel the activity anyway. See the
Python shielding docs on how to
ignore cancellation altogether.
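
A minimal sketch of that pattern in plain asyncio follows, using the default event loop and hypothetical coroutine names. The caller catches the cancelled error raised at the shield and then waits for the still-running inner task.

```python
import asyncio

completed = []


async def important_work() -> str:
    await asyncio.sleep(0.05)
    completed.append("important_work")
    return "saved"


async def caller(inner: "asyncio.Task[str]") -> str:
    try:
        return await asyncio.shield(inner)
    except asyncio.CancelledError:
        # Only the outer await was cancelled; the shielded task keeps running,
        # so ignore the request and wait for the task to finish
        return await inner


async def main() -> str:
    inner = asyncio.create_task(important_work())
    outer = asyncio.create_task(caller(inner))
    await asyncio.sleep(0)  # let both tasks start
    outer.cancel()
    return await outer


shield_result = asyncio.run(main())
```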

This means task cancellation also works. Say we want to be able to cancel activities as a group after 3 seconds:

# asyncio.gather returns a future (not a coroutine), so it can be cancelled
# directly; cancelling it cancels every activity it is waiting on
multiple_activities = asyncio.gather(
    workflow.execute_activity(my_activity1, start_to_close_timeout=timedelta(minutes=5)),
    workflow.execute_activity(my_activity2, start_to_close_timeout=timedelta(minutes=5)),
)
await asyncio.sleep(3)
multiple_activities.cancel()
await multiple_activities

This actually issues durable cancellation requests to the activities wherever they may be running if they have started.

Synchronization Primitives

We can also use most deterministic asyncio synchronization primitives. For example, it is very common to use
asyncio.Queue or
asyncio.Event. There are often workflows written
like:

@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.should_proceed = asyncio.Event()

    @workflow.run
    async def run(self) -> None:
        workflow.logger.info("Waiting...")
        await self.should_proceed.wait()
        workflow.logger.info("Completing!")

    @workflow.signal
    def proceed(self) -> None:
        self.should_proceed.set()

Then a client may signal the workflow to proceed.

Wait Condition

Since we control each event loop iteration, we can do more advanced things than normal Python code. For example, we
offer a wait_condition that does not return until a callback is true. We invoke that callback on each iteration. So
the same above signal example could be written as:

@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.should_proceed = False

    @workflow.run
    async def run(self) -> None:
        workflow.logger.info("Waiting...")
        await workflow.wait_condition(lambda: self.should_proceed)
        workflow.logger.info("Completing!")

    @workflow.signal
    def proceed(self) -> None:
        self.should_proceed = True

This type of feature isn't normally available in Python asyncio loops because there is not an easy way to run a callback
every iteration of the event loop.
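
For comparison, the closest standard-library equivalent is probably asyncio.Condition.wait_for, which re-checks its predicate only when someone explicitly notifies the condition. The sketch below shows that extra bookkeeping on the default event loop. The Gate class is hypothetical.

```python
import asyncio
from typing import List


class Gate:
    """Hypothetical helper: a flag that waiters can block on in plain asyncio."""

    def __init__(self) -> None:
        self.should_proceed = False
        self._changed = asyncio.Condition()

    async def wait_until_open(self) -> None:
        async with self._changed:
            # The predicate is re-evaluated only after a notify, not on every loop iteration
            await self._changed.wait_for(lambda: self.should_proceed)

    async def open(self) -> None:
        async with self._changed:
            self.should_proceed = True
            self._changed.notify_all()  # forgetting this would leave waiters stuck


async def main() -> List[str]:
    gate = Gate()
    order: List[str] = []

    async def waiter() -> None:
        order.append("waiting")
        await gate.wait_until_open()
        order.append("completing")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter block on the gate
    await gate.open()
    await task
    return order


order = asyncio.run(main())
```

With workflow.wait_condition there is no notify step: the signal handler simply sets the attribute.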

Limitations

Only deterministic asyncio constructs are implemented. So everything relating to subprocesses, disk/network IO, etc.
will fail if invoked.

Currently, only relative-time-based functionality is implemented with the Temporal event loop. Therefore using
call_at will fail. Unfortunately the
newer high-level asyncio.timeout is implemented
via call_at even though it takes a relative time, so it cannot be used yet either. It is possible we may one day
remove this limitation and allow absolute time relative to workflow time, but it can lead to confusing code for a user.

Other Temporal Python Features

While this post focused on how Temporal workflows are durable asyncio event loops, there are some other interesting
aspects about Temporal Python worth noting.

Fully Typed

The entire library is fully typed with the latest Python type hinting capabilities. The library was designed with typing
in mind and many calls were developed as generics to help developers catch mistakes. For example, given a workflow:

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, param: int) -> None:
        ...

MyPy (or other type checkers) would report an error if we did:

await my_client.execute_workflow(MyWorkflow.run, "some param", id="id", task_queue="tq")

This fails because the workflow takes an integer argument, not a string. This type safety is prevalent
throughout the library. See the API documentation for all of the types and overloads.

Multiple Activity Styles

We have only shown async def activities, which are the most common and recommended way to develop activities. But many
Python use cases require blocking (non-async) calls, and making those calls in an async context blocks the event loop.

Developers can use
run_in_executor from their
async activity if they'd like (and this is a very common approach), but we also support non-async activities. For
threaded activities, a worker can be given a
ThreadPoolExecutor to run activities.
We even support multiprocess activities with a
ProcessPoolExecutor. Extra effort was
made to support activity heartbeating and cancellation of activities across threads and even processes. See the
repository documentation for more info.
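
As a sketch of the run_in_executor approach, the coroutine below offloads a blocking call to the loop's default thread pool. It is plain asyncio with hypothetical names. In a real worker, lookup_price would be the body of an async def activity.

```python
import asyncio
import time
from typing import List


def blocking_lookup(item_id: str) -> str:
    # Stand-in for a blocking library call (e.g. a synchronous HTTP or DB client)
    time.sleep(0.05)
    return f"price-for-{item_id}"


async def lookup_price(item_id: str) -> str:
    loop = asyncio.get_running_loop()
    # Passing None uses the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_lookup, item_id)


async def main() -> List[str]:
    # Both lookups run in threads, so the event loop stays free while they block
    return await asyncio.gather(lookup_price("item1"), lookup_price("item2"))


prices = asyncio.run(main())
```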

Workflow Sandboxing

All workflows run in a sandbox by default. For each workflow
run, the sandbox essentially re-imports the file the workflow is in to ensure no global state pollution. It also
proxies known non-deterministic standard library calls to prevent accidents such as accessing the disk or using random
inside a workflow. Most non-standard-library imports should be marked as pass through at import, e.g.:

with workflow.unsafe.imports_passed_through():
    import pydantic

This keeps them from being re-imported, which saves memory and improves performance. The sandbox is not foolproof or secure. There
are some known caveats and the sandbox can be disabled per workflow or for the entire worker. See the repository
documentation for more details.

There will be a future blog post on how the sandbox works in detail and how we built it.

Rust Core

Both this Python SDK and the TypeScript SDK (and the upcoming
Ruby and .NET SDKs) are backed by
the same Rust core. Unlike many "SDKs", the Temporal SDKs are not just simple
smart clients. Rather, they are entire complex state machines. In Temporal, users run the workers and all workflow and
activity code happens on the worker, so advanced machinations are needed to ensure these run properly.

In Python we use PyO3 and PyO3 Asyncio
with some custom Rust bridge code to make this work with the Rust core. This means code not only runs fast, but
automatically incorporates state machine fixes and improvements as the core is improved.

Also see the post Why Rust powers Temporal's new Core SDK.

Replaying

One of the most powerful yet often overlooked features of Temporal is the ability to replay workflow code using
historical runs. We can fetch workflow histories from the Temporal server and run them on our local workflow code using
a replayer.

Replaying past workflows on newer workflow code can help catch incompatible/non-deterministic changes and other
unexpected bugs before deployment.

Also, replaying a past workflow history on local workflow code allows us to debug that code. Simply disable the deadlock
detector on the replayer via debug_mode=True, and we can put breakpoints in our workflow code and step through a
workflow exactly how it originally ran.

Test Environments

Temporal Python offers two test server implementations to make testing easy.

A full Temporal server running locally on SQLite can be started via
await temporalio.testing.WorkflowEnvironment.start_local(). This downloads an executable into a temporary location if
not already present and starts it rather quickly. This has all features a standard Temporal server has since it is a
full, single-binary Temporal server. The UI can even be enabled.

We also provide a time-skipping server implementation.
await temporalio.testing.WorkflowEnvironment.start_time_skipping() starts a server that can skip time to the next
workflow event. Using this, time can also be manually skipped. This is the perfect solution for testing workflows that
sleep for a long time or in cases where we want to check how a timeout may affect the system. Like the full Temporal
server, this one also downloads a binary lazily on first call then executes. It is actually written in Java as part of
our Java SDK and compiled natively via GraalVM.

Wrap-up

So that's a basic introduction to Temporal Python, how it tightly integrates with asyncio, and some of its other
features. There are many more features not covered in this post.

Check out the repository and give it a try! Also, join us on the
#python-sdk channel in Temporal Slack, the Community Forum,
on Twitter, etc. Our community has meetups,
workshops, conferences, and more. In particular, our
YouTube Channel has lots of good content from past presentations.
