Python 异步编程实战
从同步到异步
传统的同步 I/O 模型中,线程在等待 I/O 操作时被阻塞,导致资源浪费。异步编程允许在等待 I/O 时切换到其他任务,大幅提升并发能力。
asyncio 基础
Python 3.4 引入 asyncio 标准库,提供了异步 I/O 框架。
协程
使用 async def 定义协程,await 等待协程执行完成:
import asyncio
async def fetch_data():
print("开始获取数据...")
await asyncio.sleep(1)
print("数据获取完成")
return {"data": "example"}
async def main():
result = await fetch_data()
print(result)
asyncio.run(main())
并发执行多个协程
async def main():
task1 = asyncio.create_task(fetch_data())
task2 = asyncio.create_task(fetch_data())
results = await asyncio.gather(task1, task2)
print(results)
awaitable 对象
三种 awaitable 对象:
- 协程(coroutine)
- Task:使用
asyncio.create_task()创建 - Future:底层 awaitable,通常不直接使用
异步上下文管理器
class AsyncResource:
async def __aenter__(self):
print("获取资源")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
await asyncio.sleep(0.3)
async def main():
async with AsyncResource() as res:
print("使用资源中...")
实际应用场景
HTTP 请求
使用 aiohttp 发送并发 HTTP 请求:
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
异步数据库操作
使用 asyncpg 连接 PostgreSQL:
import asyncpg
async def query_db():
conn = await asyncpg.connect(user='user', password='pass',
database='db', host='localhost')
rows = await conn.fetch('SELECT * FROM users')
await conn.close()
return rows
常见陷阱
阻塞事件循环
在协程中调用阻塞的同步函数会阻塞事件循环。使用 loop.run_in_executor() 将阻塞操作交给线程池:
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, time.sleep, 1)
忘记 await
协程被创建为 Task 但不 await 它,会导致协程永远不会执行。
过多的并发
使用信号量控制并发度:
semaphore = asyncio.Semaphore(10)
async def limited_fetch(url):
async with semaphore:
return await fetch_url(url)