Python异步编程完全指南:从入门到精通
异步编程是现代Python开发中不可或缺的技能。本文将带你从零开始,全面掌握Python异步编程的核心概念和实践技巧。
为什么需要异步编程
同步 vs 异步
同步代码的问题:
import requests
import time
def fetch_all_urls(urls):
    """Download every URL one after another and return the response bodies.

    Each ``requests.get`` call blocks until its response has arrived, so the
    total time is roughly the sum of the individual request times.

    Args:
        urls: Iterable of URL strings to fetch.

    Returns:
        List of response bodies, in the same order as ``urls``.
    """
    results = []
    for url in urls:
        response = requests.get(url)  # blocks until this response arrives
        results.append(response.text)
    return results
# 10 URLs at about 1 second each = about 10 seconds in total
urls = [f"https://api.example.com/data/{i}" for i in range(10)]
start = time.perf_counter()  # monotonic clock: unaffected by system clock adjustments
data = fetch_all_urls(urls)
print(f"耗时: {time.perf_counter() - start:.2f}秒")  # about 10 seconds
异步代码的优势:
import aiohttp
import asyncio
import time
async def fetch_all_urls(urls):
    """Fetch all URLs concurrently over one shared aiohttp session.

    Args:
        urls: Iterable of URL strings to fetch.

    Returns:
        List of response bodies in the same order as ``urls``
        (``asyncio.gather`` preserves argument order).
    """
    async with aiohttp.ClientSession() as session:
        # Build one coroutine per URL, then let gather run them concurrently.
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results
async def fetch_one(session, url):
    """Fetch a single URL with the given session and return its body text.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``text()`` method.
        url: URL string to request.

    Returns:
        The value of ``await response.text()``.
    """
    async with session.get(url) as response:
        return await response.text()
# 10 URLs requested concurrently = about 1 second in total
urls = [f"https://api.example.com/data/{i}" for i in range(10)]
start = time.perf_counter()  # monotonic clock: unaffected by system clock adjustments
data = asyncio.run(fetch_all_urls(urls))
print(f"耗时: {time.perf_counter() - start:.2f}秒")  # about 1 second
核心概念详解
事件循环
事件循环是异步编程的核心:
import asyncio
async def main():
    """Print "Hello", suspend for one second, then print "World"."""
    print("Hello")
    await asyncio.sleep(1)  # hands control back to the event loop while waiting
    print("World")
# Create and drive an event loop explicitly (low-level API).
# asyncio.get_event_loop() is deprecated when no loop is running, so create one.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()  # release the loop's resources even if main() raised
# Or use the simpler high-level entry point
asyncio.run(main())
协程
协程函数是使用 async def 定义的函数;调用它不会立即执行函数体,而是返回一个协程对象:
async def my_coroutine():
    """Example coroutine: suspend for one second, then return a status string."""
    await asyncio.sleep(1)
    return "完成"
# Calling a coroutine function returns a coroutine object; nothing runs yet
coro = my_coroutine()
print(type(coro))  # <class 'coroutine'>
coro.close()  # discard it explicitly to avoid a "never awaited" RuntimeWarning
# It must be awaited or handed to asyncio.run() to actually execute
result = asyncio.run(my_coroutine())
await关键字
await 用于挂起当前协程,等待另一个可等待对象(协程、Task 或 Future)完成:
async def step_one():
    """Suspend for one second, then report that step 1 is done."""
    await asyncio.sleep(1)
    return "步骤1完成"
async def step_two():
    """Suspend for one second, then report that step 2 is done."""
    await asyncio.sleep(1)
    return "步骤2完成"
async def main():
    """Run the two steps one after another and print both results."""
    # Sequential execution: step_two() is not started until step_one() has returned
    result1 = await step_one()
    result2 = await step_two()
    print(result1, result2)
并发控制
asyncio.gather
并发运行多个协程,并按传入顺序返回结果:
async def fetch_data(url, delay=1):
    """Simulate a network request by sleeping, then return a result string.

    Args:
        url: Identifier embedded in the returned string.
        delay: Simulated latency in seconds (defaults to 1).

    Returns:
        A string naming the ``url`` the data came from.
    """
    await asyncio.sleep(delay)  # stands in for real network I/O
    return f"数据来自 {url}"
async def main():
    """Fetch five URLs concurrently with ``asyncio.gather`` and print the results."""
    urls = ["url1", "url2", "url3", "url4", "url5"]
    # Run all requests concurrently; results come back in the order the
    # coroutines were passed in
    results = await asyncio.gather(
        *[fetch_data(url) for url in urls]
    )
    print(results)


asyncio.run(main())
asyncio.wait
更灵活的等待控制:
async def main():
    """Demonstrate the three ``return_when`` modes of ``asyncio.wait``.

    Each mode gets its own fresh batch of tasks: once a batch has been waited
    to completion, waiting on it again returns immediately and shows nothing
    about the other modes.
    """

    def start_tasks():
        # Schedule five simulated requests on the running event loop.
        return [asyncio.create_task(fetch_data(f"url{i}")) for i in range(5)]

    # Wait until every task has finished (the default, ALL_COMPLETED)
    done, pending = await asyncio.wait(start_tasks())

    # Return as soon as the first task finishes
    done, pending = await asyncio.wait(
        start_tasks(),
        return_when=asyncio.FIRST_COMPLETED
    )
    await asyncio.gather(*pending)  # let the unfinished tasks run to completion

    # Return as soon as any task raises; if none raises, wait for all of them
    done, pending = await asyncio.wait(
        start_tasks(),
        return_when=asyncio.FIRST_EXCEPTION
    )
    # Drain whatever is left so no task is abandoned when main() returns
    await asyncio.gather(*pending, return_exceptions=True)
信号量限制并发数
async def fetch_with_limit(urls, max_concurrent=10):
    """Fetch all URLs concurrently with a cap on how many run at once.

    Args:
        urls: Iterable of URLs passed one by one to ``fetch_data``.
        max_concurrent: Maximum number of fetches allowed in flight at a time.

    Returns:
        List of ``fetch_data`` results in the same order as ``urls``.

    Raises:
        ValueError: If ``max_concurrent`` is smaller than 1.
    """
    # Semaphore(0) would never let any fetch start, so reject it up front.
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_fetch(url):
        # Holding the semaphore for the whole fetch enforces the cap.
        async with semaphore:
            return await fetch_data(url)

    return await asyncio.gather(*(limited_fetch(url) for url in urls))
实战案例:异步爬虫
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
class AsyncCrawler:
    """Concurrent page crawler built on aiohttp with a cap on in-flight requests.

    Args:
        max_concurrent: Maximum number of requests allowed in flight at once.
        timeout: Total time budget per request, in seconds.
    """

    def __init__(self, max_concurrent=10, timeout=10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout
        self.session = None  # created by init_session()

    async def init_session(self):
        """Open the shared HTTP session used by every request."""
        # aiohttp expects a ClientTimeout object; passing a bare number as
        # ``timeout=`` is deprecated.
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        )

    async def close_session(self):
        """Close the shared session if one is open."""
        if self.session:
            await self.session.close()
            self.session = None  # lets crawl() be called again on this instance

    async def fetch_page(self, url):
        """Download and parse one page.

        Args:
            url: Address of the page to download.

        Returns:
            The dict produced by ``parse_page``, or ``None`` if the download
            or the parsing raised.
        """
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    html = await response.text()
                    return self.parse_page(html, url)
            except Exception as e:
                # Deliberately best-effort: one failing page is reported and
                # skipped so it cannot abort the whole crawl.
                print(f"错误: {url} - {e}")
                return None

    def parse_page(self, html, url):
        """Extract the page title from ``html``.

        Returns:
            Dict with keys ``'url'`` and ``'title'``; the title falls back to
            a placeholder when the document has no ``<title>`` element.
        """
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.find('title')
        return {
            'url': url,
            'title': title.text if title else '无标题'
        }

    async def crawl(self, urls):
        """Crawl all ``urls`` concurrently.

        Args:
            urls: Iterable of page addresses.

        Returns:
            List of parsed page dicts; pages that failed are left out.
        """
        await self.init_session()
        try:
            tasks = [self.fetch_page(url) for url in urls]
            results = await asyncio.gather(*tasks)
            return [page for page in results if page]
        finally:
            # Always release the session, even if gathering raised.
            await self.close_session()
# Usage example
async def main():
    """Crawl 100 example pages with at most 5 concurrent requests and report the timing."""
    crawler = AsyncCrawler(max_concurrent=5)
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    start = time.perf_counter()  # monotonic clock: unaffected by system clock adjustments
    results = await crawler.crawl(urls)
    print(f"爬取 {len(results)} 页,耗时 {time.perf_counter()-start:.2f}秒")


asyncio.run(main())
异步上下文管理器
class AsyncContextManager:
    """Minimal asynchronous context manager for use with ``async with``.

    Args:
        delay: Seconds to sleep on entry and again on exit, standing in for
            asynchronous setup and teardown work (defaults to 0.5).
    """

    def __init__(self, delay=0.5):
        self.delay = delay

    async def __aenter__(self):
        """Run the (simulated) asynchronous setup and return the manager itself."""
        print("进入上下文")
        await asyncio.sleep(self.delay)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Run the (simulated) asynchronous teardown."""
        print("退出上下文")
        await asyncio.sleep(self.delay)
        return False  # falsy: an exception raised inside the block is not suppressed
async def main():
    """Enter the async context manager, do some work inside it, then exit."""
    async with AsyncContextManager() as manager:
        print("执行操作")


asyncio.run(main())
异步迭代器
python
class AsyncRange:
---
📌 更多精彩内容,关注我的[博客](https://wdsega.github.io),每周更新!
Top comments (0)