If you're a JavaScript developer diving into Python, you've probably wondered: "Where are the Promises?" Python's approach to asynchronous programming is different, but just as powerful. This comprehensive guide will show you how to translate every JavaScript Promise pattern into Python's asyncio equivalents.
The Fundamental Difference
JavaScript uses Promises as objects representing future values, while Python uses coroutines with the async/await syntax directly. Here's the basic comparison:
JavaScript Promise
/**
 * Simulate an asynchronous fetch.
 *
 * @returns {Promise<string>} Promise fulfilled with "Data fetched!" after one second.
 */
function fetchData() {
  const DELAY_MS = 1000;
  return new Promise((resolve) => {
    // setTimeout forwards its extra argument to the callback.
    setTimeout(resolve, DELAY_MS, "Data fetched!");
  });
}
// Log the value once the promise returned by fetchData() fulfills.
fetchData().then(result => console.log(result));
Python Coroutine
import asyncio
async def fetch_data():
    """Simulate a one-second asynchronous fetch and return its payload."""
    delay_seconds = 1
    await asyncio.sleep(delay_seconds)
    payload = "Data fetched!"
    return payload
async def main():
    """Await fetch_data() and print the value it returns."""
    print(await fetch_data())


# Start an event loop and run main() to completion.
asyncio.run(main())
Promise.all() → asyncio.gather()
Use case: Run multiple asynchronous operations concurrently and wait for all of them to complete.
JavaScript Promise.all()
/**
 * Fetch the users, posts, and comments endpoints concurrently.
 *
 * @returns {Promise<Array>} Parsed JSON bodies, in the same order as the URLs.
 * @throws Rethrows the first request or parse error after logging it.
 */
async function fetchMultipleData() {
  const urls = [
    'https://api.example.com/users',
    'https://api.example.com/posts',
    'https://api.example.com/comments'
  ];
  const toJson = (response) => response.json();
  // Start every request before awaiting any of them.
  const inFlight = urls.map((url) => fetch(url).then(toJson));

  try {
    const bodies = await Promise.all(inFlight);
    console.log('All data fetched:', bodies);
    return bodies;
  } catch (error) {
    console.error('One or more requests failed:', error);
    throw error;
  }
}
Python asyncio.gather()
import asyncio
import aiohttp
async def fetch_data(session, url):
    """GET ``url`` through ``session`` and return the decoded JSON body."""
    async with session.get(url) as response:
        payload = await response.json()
    return payload
async def fetch_multiple_data():
    """Fetch the three example endpoints concurrently.

    Returns:
        A list of decoded JSON bodies, in the same order as the URLs.

    Raises:
        Exception: the first error raised by any request, re-raised after
            it has been printed.
    """
    urls = [
        'https://api.example.com/users',
        'https://api.example.com/posts',
        'https://api.example.com/comments'
    ]
    async with aiohttp.ClientSession() as session:
        # Only the gather call can fail, so it is the only thing in the try.
        try:
            results = await asyncio.gather(
                *[fetch_data(session, url) for url in urls]
            )
        except Exception as error:
            print('One or more requests failed:', error)
            # Bare raise re-raises the active exception unchanged.
            raise
        print('All data fetched:', results)
        return results
# Alternative with error handling per task
async def fetch_multiple_data_safe():
    """Fetch the three example endpoints concurrently, tolerating failures.

    Returns:
        A list with one entry per URL, in request order: the decoded JSON
        body on success, or the exception object that request raised.
    """
    urls = [
        'https://api.example.com/users',
        'https://api.example.com/posts',
        'https://api.example.com/comments'
    ]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_data(session, url) for url in urls],
            return_exceptions=True  # Don't fail on first exception
        )
    # Process results and exceptions.
    # Check BaseException rather than Exception: gather can also return
    # asyncio.CancelledError, which does not derive from Exception and
    # would otherwise reach len() and raise TypeError.
    for i, result in enumerate(results):
        if isinstance(result, BaseException):
            print(f'Request {i} failed: {result}')
        else:
            print(f'Request {i} succeeded: {len(result)} items')
    return results
Key Differences:
- JavaScript: Promise.all([...promises])
- Python: asyncio.gather(*coroutines) (note the unpacking with *)
- Error Handling: Python's return_exceptions=True lets you handle individual failures
Promise.race() → asyncio.wait() with FIRST_COMPLETED
Use case: Return the result of whichever operation completes first.
JavaScript Promise.race()
/**
 * Race a slow result, a fast result, and a timeout rejection.
 *
 * @returns {Promise<string>} The value of whichever promise settles first.
 * @throws Rethrows the rejection if the first promise to settle rejects.
 */
async function raceExample() {
  const resolveAfter = (ms, value) =>
    new Promise((resolve) => setTimeout(resolve, ms, value));
  const rejectAfter = (ms, message) =>
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error(message)), ms)
    );

  const contenders = [
    resolveAfter(3000, 'Slow result'),
    resolveAfter(1000, 'Fast result'),
    rejectAfter(2000, 'Timeout')
  ];

  try {
    const winner = await Promise.race(contenders);
    console.log('First result:', winner); // "Fast result"
    return winner;
  } catch (error) {
    console.error('First to complete was an error:', error);
    throw error;
  }
}
Python asyncio.wait() with FIRST_COMPLETED
async def slow_task():
    """Sleep for three seconds, then return the slow contender's result."""
    delay_seconds = 3
    await asyncio.sleep(delay_seconds)
    return 'Slow result'
async def fast_task():
    """Sleep for one second, then return the fast contender's result."""
    delay_seconds = 1
    await asyncio.sleep(delay_seconds)
    return 'Fast result'
async def timeout_task():
    """Sleep for two seconds, then fail with a 'Timeout' exception."""
    delay_seconds = 2
    await asyncio.sleep(delay_seconds)
    raise Exception('Timeout')
async def race_example():
    """Run three tasks and return the outcome of whichever finishes first.

    Returns:
        The result of the first task to complete.

    Raises:
        Exception: if the first task to complete failed; the error is
            printed and then re-raised.
    """
    tasks = [
        asyncio.create_task(slow_task()),
        asyncio.create_task(fast_task()),
        asyncio.create_task(timeout_task())
    ]
    try:
        done, _pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )
        # Get the first completed result
        first_task = next(iter(done))
        result = first_task.result()  # This will raise if the task failed
        print('First result:', result)  # "Fast result"
        return result
    except Exception as error:
        print('First to complete was an error:', error)
        raise
    finally:
        # Cancel whatever is still running on every exit path, including
        # cancellation of this coroutine itself (CancelledError is not an
        # Exception, so the except block above does not see it).
        for task in tasks:
            if not task.done():
                task.cancel()
# Alternative: Using asyncio.wait_for() for timeout patterns
async def race_with_timeout():
    """Await slow_task() with a two-second limit.

    Returns the task's result, or prints a message and re-raises
    asyncio.TimeoutError when the limit is exceeded.
    """
    time_limit = 2.0
    try:
        return await asyncio.wait_for(slow_task(), timeout=time_limit)
    except asyncio.TimeoutError:
        print("Operation timed out")
        raise
Promise.allSettled() → asyncio.wait() with ALL_COMPLETED
Use case: Wait for all operations to complete, regardless of success or failure.
JavaScript Promise.allSettled()
/**
 * Wait for a mix of fulfilled, rejected, and delayed promises to settle.
 *
 * @returns {Promise<Array>} One {status, value|reason} record per promise.
 */
async function allSettledExample() {
  const promises = [
    Promise.resolve('Success 1'),
    Promise.reject(new Error('Error 1')),
    Promise.resolve('Success 2'),
    new Promise((resolve) => {
      setTimeout(resolve, 1000, 'Delayed success');
    })
  ];

  const outcomes = await Promise.allSettled(promises);
  for (const [index, outcome] of outcomes.entries()) {
    if (outcome.status === 'fulfilled') {
      console.log(`Promise ${index} succeeded:`, outcome.value);
    } else {
      console.log(`Promise ${index} failed:`, outcome.reason);
    }
  }
  return outcomes;
}
Python asyncio.wait() with ALL_COMPLETED
async def success_task_1():
    """Complete immediately with the first success value."""
    outcome = 'Success 1'
    return outcome
async def error_task():
    """Fail immediately with an 'Error 1' exception."""
    failure = Exception('Error 1')
    raise failure
async def success_task_2():
    """Complete immediately with the second success value."""
    outcome = 'Success 2'
    return outcome
async def delayed_task():
    """Sleep for one second, then return the delayed success value."""
    delay_seconds = 1
    await asyncio.sleep(delay_seconds)
    return 'Delayed success'
async def all_settled_example():
    """Run four tasks to completion and report each outcome.

    Returns:
        One dict per task, in creation order: either
        {'status': 'fulfilled', 'value': ...} or
        {'status': 'rejected', 'reason': <error text>}.
    """
    factories = (success_task_1, error_task, success_task_2, delayed_task)
    tasks = [asyncio.create_task(factory()) for factory in factories]
    # Block until every task has finished, whether it succeeded or failed.
    await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

    results = []
    for index, task in enumerate(tasks):
        try:
            value = task.result()
            print(f'Task {index} succeeded: {value}')
            results.append({'status': 'fulfilled', 'value': value})
        except Exception as error:
            print(f'Task {index} failed: {error}')
            results.append({'status': 'rejected', 'reason': str(error)})
    return results
# Python 3.11+ TaskGroup approach (Recommended)
async def all_settled_with_task_group():
    """Run four tasks in a TaskGroup and report each outcome.

    Every task is wrapped so that it returns an outcome record instead of
    raising; one failure therefore does not cancel the rest of the group.

    Returns:
        One dict per task, in creation order, with 'status', 'name', and
        either 'value' (fulfilled) or 'reason' (rejected).
    """
    async def safe_task(task_func, task_name):
        # Convert success or failure into a record; never raises Exception.
        try:
            result = await task_func()
        except Exception as error:
            return {'status': 'rejected', 'reason': str(error), 'name': task_name}
        return {'status': 'fulfilled', 'value': result, 'name': task_name}

    named_funcs = [
        (success_task_1, 'task1'),
        (error_task, 'task2'),
        (success_task_2, 'task3'),
        (delayed_task, 'task4'),
    ]
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(safe_task(task_func, task_name))
            for task_func, task_name in named_funcs
        ]
    # Leaving the TaskGroup block means every task has finished.
    results = [task.result() for task in tasks]
    for result in results:
        if result['status'] == 'fulfilled':
            print(f"{result['name']} succeeded: {result['value']}")
        else:
            print(f"{result['name']} failed: {result['reason']}")
    return results
Promise.any() → Custom Implementation
Use case: Return the first successful result, or fail if all operations fail.
JavaScript Promise.any()
/**
 * Resolve with the first promise that fulfills, ignoring rejections.
 *
 * @returns {Promise<string>} The first fulfilled value.
 * @throws {AggregateError} Rethrown after logging if every promise rejects.
 */
async function anyExample() {
  const failWith = (message) => Promise.reject(new Error(message));
  const candidates = [
    failWith('Error 1'),
    failWith('Error 2'),
    new Promise((resolve) => {
      setTimeout(resolve, 1000, 'Success!');
    }),
    failWith('Error 3')
  ];

  try {
    const firstSuccess = await Promise.any(candidates);
    console.log('First success:', firstSuccess); // "Success!"
    return firstSuccess;
  } catch (aggregateError) {
    console.error('All promises failed:', aggregateError.errors);
    throw aggregateError;
  }
}
Python Custom Implementation
class AllFailedError(Exception):
    """Raised when every operation failed; ``errors`` holds the causes."""

    def __init__(self, errors):
        message = f"All operations failed: {errors}"
        super().__init__(message)
        self.errors = errors
async def any_successful(*coroutines):
    """Python equivalent of Promise.any().

    Runs the coroutines concurrently and returns the result of the first
    one that succeeds; the others are cancelled.

    Args:
        *coroutines: coroutine objects to run concurrently.

    Returns:
        The result of the first coroutine to finish without raising.

    Raises:
        ValueError: if no coroutines are given.
        AllFailedError: if every coroutine raised; ``errors`` lists the
            exceptions in completion order.
    """
    if not coroutines:
        raise ValueError("At least one coroutine required")

    tasks = [asyncio.create_task(coro) for coro in coroutines]
    pending = set(tasks)
    exceptions = []
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                try:
                    return task.result()
                except Exception as error:
                    exceptions.append(error)
        # All tasks failed
        raise AllFailedError(exceptions)
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
        # Await every task so cancellations actually finish and no stored
        # exception is left unretrieved (which asyncio would warn about).
        await asyncio.gather(*tasks, return_exceptions=True)
# Usage example
async def failing_task_1():
    """Sleep for 0.5 seconds, then fail with 'Error 1'."""
    delay_seconds = 0.5
    await asyncio.sleep(delay_seconds)
    raise Exception('Error 1')
async def failing_task_2():
    """Sleep for 0.3 seconds, then fail with 'Error 2'."""
    delay_seconds = 0.3
    await asyncio.sleep(delay_seconds)
    raise Exception('Error 2')
async def success_task():
    """Sleep for one second, then return 'Success!'."""
    delay_seconds = 1
    await asyncio.sleep(delay_seconds)
    return 'Success!'
async def failing_task_3():
    """Sleep for 0.8 seconds, then fail with 'Error 3'."""
    delay_seconds = 0.8
    await asyncio.sleep(delay_seconds)
    raise Exception('Error 3')
async def any_example():
    """Run three failing tasks and one succeeding task through any_successful().

    Returns:
        The first successful result.

    Raises:
        AllFailedError: re-raised after printing its errors if every task
            failed.
    """
    # Only the any_successful() call can raise AllFailedError, so it is the
    # only statement inside the try.
    try:
        result = await any_successful(
            failing_task_1(),
            failing_task_2(),
            success_task(),
            failing_task_3()
        )
    except AllFailedError as error:
        print('All operations failed:', error.errors)
        # Bare raise re-raises the active exception unchanged.
        raise
    print('First success:', result)  # "Success!"
    return result
Modern Python: TaskGroup (Python 3.11+)
Python 3.11 introduced TaskGroup, which provides a more structured way to handle concurrent tasks:
# TaskGroup automatically handles cleanup and error propagation
async def modern_concurrent_example():
    """Fetch the three example endpoints concurrently inside a TaskGroup.

    Returns:
        A list of the three fetch results, in the same order as the URLs.
    """
    urls = [
        'https://api.example.com/users',
        'https://api.example.com/posts',
        'https://api.example.com/comments',
    ]
    # fetch_data(session, url) needs an open session as its first argument.
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_data(session, url)) for url in urls]
    # All tasks completed successfully if we reach here
    return [task.result() for task in tasks]
# TaskGroup with exception handling
async def modern_with_error_handling():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(success_task_1())
task2 = tg.create_task(error_task()) # This will cause TaskGroup to fail
task3 = tg.create_task(success_task_2())
# Won't reach here if any task fails
return [task1.result(), task2.result(), task3.result()]
except* Exception as eg: # Exception groups (Python 3.11+)
for error in eg.exceptions:
print(f"Task failed: {error}")
raise
Performance Comparison
Here's a practical example showing the performance benefits of concurrent execution:
import time
import asyncio
import aiohttp
async def time_comparison():
    """Time three one-second requests run sequentially, then concurrently.

    Prints both durations and the resulting speedup factor.
    """
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1'
    ]

    # perf_counter() is the clock intended for measuring elapsed time;
    # time.time() follows the wall clock, which can be adjusted mid-run.

    # Sequential execution
    start_time = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        results_sequential = []
        for url in urls:
            async with session.get(url) as response:
                results_sequential.append(await response.json())
    sequential_time = time.perf_counter() - start_time

    # Concurrent execution with asyncio.gather()
    start_time = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as response:
                return await response.json()

        results_concurrent = await asyncio.gather(*[fetch(url) for url in urls])
    concurrent_time = time.perf_counter() - start_time

    print(f"Sequential execution: {sequential_time:.2f} seconds")
    print(f"Concurrent execution: {concurrent_time:.2f} seconds")
    print(f"Speedup: {sequential_time/concurrent_time:.2f}x")
# Expected output:
# Sequential execution: 3.15 seconds
# Concurrent execution: 1.12 seconds
# Speedup: 2.81x
Quick Reference Cheat Sheet
| JavaScript | Python | Use Case |
|---|---|---|
| `Promise.all(promises)` | `asyncio.gather(*coroutines)` | Wait for all, fail fast |
| `Promise.allSettled(promises)` | `asyncio.wait(tasks, return_when=ALL_COMPLETED)` | Wait for all, collect results/errors |
| `Promise.race(promises)` | `asyncio.wait(tasks, return_when=FIRST_COMPLETED)` | Return first to complete |
| `Promise.any(promises)` | Custom `any_successful()` function | Return first success |
| `new Promise((resolve, reject) => ...)` | `async def coroutine(): ...` | Create async operation |
| `promise.then().catch()` | `try: await coroutine() except:` | Handle results/errors |
| `Promise.resolve(value)` | `return value` (in async function) | Return immediate value |
| `Promise.reject(error)` | `raise error` (in async function) | Return immediate error |
Best Practices
- Use asyncio.gather() for the most common concurrent patterns - it's the closest equivalent to Promise.all()
- Prefer TaskGroup (Python 3.11+) for better error handling and resource cleanup
- Use return_exceptions=True with gather() when you want to handle individual failures
- Always cancel pending tasks when using asyncio.wait() to prevent resource leaks
- Use aiohttp instead of requests for HTTP operations in async code
- Consider asyncio.wait_for() for simple timeout scenarios instead of hand-rolling a race against a timer task
Conclusion
While Python doesn't have Promises, its asyncio patterns are equally powerful and often more explicit about error handling and resource management. The key is understanding that Python's async/await syntax works directly with coroutines, eliminating the need for Promise wrapper objects.
Whether you're migrating from JavaScript or just learning async patterns, these equivalents will help you write efficient, concurrent Python code that scales beautifully.
Top comments (0)