Python 异步编程实战

2026-06-22 · 6 阅读 · 241字
Python

Python 异步编程实战

从同步到异步

传统的同步 I/O 模型中,线程在等待 I/O 操作时被阻塞,导致资源浪费。异步编程允许在等待 I/O 时切换到其他任务,大幅提升并发能力。

asyncio 基础

Python 3.4 引入 asyncio 标准库,提供了异步 I/O 框架。

协程

使用 async def 定义协程,await 等待协程执行完成:

import asyncio

async def fetch_data():
    print("开始获取数据...")
    await asyncio.sleep(1)
    print("数据获取完成")
    return {"data": "example"}

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

并发执行多个协程

async def main():
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(fetch_data())
    results = await asyncio.gather(task1, task2)
    print(results)

awaitable 对象

三种 awaitable 对象:

  1. 协程(coroutine)
  2. Task:使用 asyncio.create_task() 创建
  3. Future:底层 awaitable,通常不直接使用

异步上下文管理器

class AsyncResource:
    async def __aenter__(self):
        print("获取资源")
        await asyncio.sleep(0.5)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源")
        await asyncio.sleep(0.3)

async def main():
    async with AsyncResource() as res:
        print("使用资源中...")

实际应用场景

HTTP 请求

使用 aiohttp 发送并发 HTTP 请求:

import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

异步数据库操作

使用 asyncpg 连接 PostgreSQL:

import asyncpg

async def query_db():
    conn = await asyncpg.connect(user='user', password='pass',
                                  database='db', host='localhost')
    rows = await conn.fetch('SELECT * FROM users')
    await conn.close()
    return rows

常见陷阱

阻塞事件循环

在协程中调用阻塞的同步函数会阻塞事件循环。使用 loop.run_in_executor() 将阻塞操作交给线程池:

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, time.sleep, 1)

忘记 await

协程被创建为 Task 但不 await 它,会导致协程永远不会执行。

过多的并发

使用信号量控制并发度:

semaphore = asyncio.Semaphore(10)

async def limited_fetch(url):
    async with semaphore:
        return await fetch_url(url)