您现在的位置是:网站首页 > 博客日记 >

详谈python异步编程

作者:YXN-python 阅读量:232 发布日期:2025-02-28

理解并发和并行

要理解asyncio运行原理,首先我们绕不开的概念就是要理解并发和并行,很多人可能经常听说这2个概念,但可能实际用的时候还比较模糊,今天我们就把这2个概念讲明白:

1. 并发:

  • 并发是指系统可以同时管理多个任务,但这些任务并不真正同时运行
  • 在 Python 的 asyncio 中,所有任务运行在单个线程和单个 CPU 核心上,通过事件循环在任务之间快速切换,当某个任务遇到 I/O(例如等待文件读写或网络请求)时,其他任务可以继续执行。

如果看完上面的不太明白,可以想一下小学教的时间利用,假设你要做两件事,煮饭和煮菜,分别使用电饭锅和燃气灶,电饭锅和燃气灶可以同时做饭和做菜,但是你是在淘米后再做的菜,是在饭在电饭锅中煮时候的间隙时间,这就是并发。

2. 并行:

  • 并行指真正的同时执行多个任务,比如利用多线程或多进程让多个任务同时在多个 CPU 核心上运行。

同样我在讲个例子,如果有2个人,一个人去做饭,一个人去做菜,这就是并行,因为2个人同时做了不同的事情。

 

提问环节

asyncio是并发还是并行?

是并发。因为asyncio 使用的是单线程事件循环,虽然它能通过任务间的切换实现“同时运行”的效果,但本质上同一时刻只有一个任务在运行asyncio 不会使用多个线程或多个 CPU 核心,因此它不是并行的

 

为什么asyncio只能使用单 CPU 核心?我主机有8核,为什么不能使用?

因为asyncio 本质上是一种 协程 的实现,依赖单线程的 事件循环,即使你的主机有 8 核,asyncio 依然只能使用 一个线程,因此只能运行在一个 CPU 核心上。

 

为什么协程 就只能在一个CPU核心上运行?

理解了asyncio是协程的实现,但协程为什么就只能在一个cpu核心上运行呢?要回答这个问题,就不得不说协程的本质了。

1. 协程的本质

协程是一种轻量级的用户态线程,它依赖 事件循环 来管理任务的调度和切换,所有协程都在同一个线程中运行。

  • 单线程事件循环: 协程是基于单线程的机制,所有任务的执行都由同一个事件循环调度,因此,事件循环只能运行在一个 CPU 核心上,协程的运行也被限制在这个核心中。
  • 非抢占式调度: 协程的调度是 非抢占式 的,即任务之间的切换是由代码中的 await、yield 等显式控制的,只有当任务主动交出控制权时,事件循环才会切换到其他协程运行。

这意味着协程只能依赖单线程,无法直接利用多核 CPU 的并行计算能力。

 

为什么协程不支持多核?

因为协程的目标是简单性和效率,所以协程的目标是为I/O密集型任务 提供高效的并发处理,而不是多核并行计算,一句话解答了所有疑惑,因为协程压根就不是为多核进行设计的,人家就是为I/O任务准备的。

 

异步编程的关键组件

1、协程(Coroutines)

  • 定义:使用 async def 定义的函数。
  • 特性:可以暂停和恢复执行,使用 await 关键字。

示例:简单协程

import asyncio

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

 

2、事件循环(Event Loop)

更多参考文章:Python异步编程:事件循环解析

  • 作用:管理和调度协程的执行。
  • 获取方式:使用 asyncio.get_event_loop()asyncio.run() 自动管理。

 

3、任务(Tasks)

Task用于并发调度协程, 通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行, 除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() ensure_future() 函数。不建议手动实例化 Task 对象。

本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。

注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

  • 定义:协程的封装,允许并发执行。
  • 创建方式:asyncio.create_task(coroutine)asyncio.ensure_future(coroutine)

示例:创建任务

async def main():
    task1 = asyncio.create_task(greet("Alice"))
    task2 = asyncio.create_task(greet("Bob"))
    await task1
    await task2

 

4、Future 对象

Future对象是一个相对更偏向底层的可对象,通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪.(Task Futrue的子类)Future为我们提供了异步编程中的 最终结果 的处理(Task类也具备状态处理的功能)

  • 定义:表示一个将来会完成的操作。
  • 用途:协程之间的通信和结果传递。
  • 创建方式:asyncio.Future()

示例:使用 Future

# 这里的fut是一个asyncio.Future对象,用于存储结果
async def set_future(fut, delay, value):
    await asyncio.sleep(delay)
    fut.set_result(value) # 设置Future的结果,通知等待它的协程任务继续执行,调用这个函数,状态就从pending变成done,等待它的所有协程会被事件循环恢复执行

async def main():
    fut = asyncio.Future() # 创建一个Future对象,用于表示一个尚未完成的操作,初始状态下,fut是”挂起“pending状态
    asyncio.create_task(set_future(fut, 2, "Future Result"))# 启动一个并发任务,运行set_future协程,任务将在2秒后调用set_result设置结果
    result = await fut # 挂起当前协程main,直到fut的结果被设置
    print(result)

 

关键语法:async 和 await

async

  • 用途:定义一个协程函数。

await

  • 用途:等待一个协程或可等待对象完成。
  • 限制:只能在协程内部使用。

示例:

async def my_coroutine():
    await asyncio.sleep(1)

 

asyncio.run() 的作用

  • 功能:
    • 创建并运行事件循环。
    • 执行顶层协程。
    • 关闭事件循环。
  • 语法:asyncio.run(main())

示例:

async def main():
    print("Hello, Async World!")

asyncio.run(main())

 

超时与屏蔽取消

asyncio.wait_for()asyncio.shield()

import asyncio

async def eternity():
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    try:
        # 等待 eternity(),但最多等 1 秒
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

    # asyncio.shield() 可以保护一个任务不被取消
    task = asyncio.create_task(eternity())
    try:
        # 即使 wait_for 超时并抛出异常,被 shield 包裹的任务也会继续在后台运行
        await asyncio.wait_for(asyncio.shield(task), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timeout, but the task is still running!")
        # 可以在这里选择继续等待 task 或做其他处理
        # await task

asyncio.run(main())

 

等待多个任务完成

wait() 比 gather() 更底层,它允许你指定等待策略(return_when),并返回两个集合:已完成的任务和未完成的任务。

import asyncio

async def foo():
    await asyncio.sleep(1)
    return 42

async def main():
    tasks = {asyncio.create_task(foo()) for _ in range(5)}
    # 等待所有任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    # 也可以等待第一个完成:return_when=asyncio.FIRST_COMPLETED
    for task in done:
        print(f"Task result: {task.result()}")

asyncio.run(main())

 

Python 3.11+ 的重要新特性

任务组和异常组

参考:Python中的异步任务组asyncio.TaskGroup

 

超时控制

asyncio.timeout(delay): 提供了一个更清晰、更Pythonic的方式来为代码块设置超时。

作用:创建一个异步上下文管理器,在内部的代码块执行时间超过 delay 秒时,会引发 asyncio.TimeoutError

优势:asyncio.wait_for() 更易于管理局部的、精细化的超时控制。

async def main():
    try:
        async with asyncio.timeout(1.0): # 进入一个超时上下文
            await asyncio.sleep(10)
    except TimeoutError:
        print("操作超时")

 

Python 3.12 的新增方法与特性

新的运行器: asyncio.Runner()

提供了一个更灵活、可重用的方式来运行异步程序,是 asyncio.run() 的底层实现。

作用:通过一个类来管理事件循环的生命周期。允许在同一个 Runner 实例上多次运行异步函数,并可以控制循环的最终关闭策略。

方法:

runner = asyncio.Runner()

runner.run(coro): 运行一个协程(类似于 asyncio.run())。

runner.close(): 显式关闭运行器及其管理的事件循环。

优势:适用于需要多次运行异步代码且希望避免每次创建新循环开销的场景(如测试框架、交互式环境)。

示例:

with asyncio.Runner() as runner:
    runner.run(coro1())
    runner.run(coro2()) # 使用同一个事件循环
# 退出 with 块后,runner 自动关闭

 

简易服务器创建: asyncio.start_server() 的显著增强

asyncio.start_server() 现在返回一个功能丰富得多的 Server 对象。

新增方法与属性(在返回的 Server 对象上):

  • server.get_loop(): 获取服务器所使用的事件循环。
  • server.is_serving(): 如果服务器正在运行并接受新连接,则返回 True
  • server.sockets: 一个包含服务器正在监听的套接字列表。
  • server.start_serving(): 启动服务器但不等待 serve_forever 任务完成。这允许你在启动后执行其他操作。
  • server.serve_forever(): 运行服务器直到被 stop() 中断。
  • server.stop(): 停止接受新连接,但不影响现有连接。
  • server.wait_closed(): 等待服务器完全关闭(即所有现有连接都已处理完毕)。

优势:提供了对服务器生命周期更精细的控制,例如可以实现平滑重启(先停止接受新连接,等待旧连接处理完再关闭)。

 

响应式任务取消: asyncio.Task.uncancel()

一个相当底层的API,主要用于实现屏蔽取消的复杂逻辑。

作用:递减任务的“被请求取消”计数。如果一个任务在取消过程中又决定继续运行(例如,在 except CancelledError 中恢复了状态),它可以调用此方法来抵消之前的取消请求,从而避免自己被真正取消。

示例:

async def resilient_task():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("收到取消请求,但我打算忽略它!")
        # 做一些清理和恢复工作...
        task = asyncio.current_task()
        task.uncancel() # 抵消取消计数,任务将继续运行
        # 通常这里会重新抛出 CancelledError 如果不想真正取消的话

 

调试与 introspection 增强

asyncio.Task.get_name()asyncio.Task.set_name(): 3.8 就引入了任务命名,但在 3.12 中相关API更加成熟和完善,便于调试和日志记录。

 

3.11+跟新总结与推荐用法

版本 核心新增 推荐使用场景
3.11 TaskGroup, timeout(), ExceptionGroup 所有新的异步代码。使用 TaskGroup 替代 gather 来管理任务组,使用 timeout() 进行精细的超时控制。
3.12 Runner, 增强的 Server API, uncancel() 需要复用事件循环时用 Runner;构建网络服务器时使用新的 Server 方法;实现高级取消逻辑时用 uncancel

 

uvloop:提升性能

Python标准库中提供了asyncio模块,用于支持基于协程的异步编程。uvloopasyncio 中的事件循环的替代方案,替换后可以使得asyncio性能提高。事实上,uvloop要比nodejs、gevent等其他python异步框架至少要快2倍,性能可以比肩Go语言。

在项目中想要使用uvloop替换asyncio的事件循环也非常简单,只要在代码中这么做就行。

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 
# 编写asyncio的代码
 
# 内部的事件循环自动化会变为uvloop
asyncio.run(...)

 

事件循环

参考文章:Python异步编程:事件循环解析

 

异步生成器

更多参考这篇文章:Python异步生成器

import asyncio

# 定义异步生成器
async def async_counter():
    for i in range(5):
        print(f"准备生成 {i}")
        await asyncio.sleep(1)  # 模拟I/O操作
        yield i
        print(f"已生成 {i}")

# 使用异步生成器
async def main():
    async for number in async_counter():
        print(f"收到: {number}")

# 运行事件循环
asyncio.run(main())

 

异步迭代器

更多参考这篇文章:异步迭代器

import asyncio

class AsyncCounter:
    def __init__(self, start, end):
        self.current = start
        self.end = end

    # 初始化异步迭代器
    def __aiter__(self):
        return self

    # 异步返回下一个值
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        self.current += 1
        await asyncio.sleep(1)  # 模拟异步任务
        return self.current


# 使用异步迭代器
async def main():
    async for number in AsyncCounter(1, 5):
        print(number)

# 运行异步任务
asyncio.run(main())

 

Python异步上下文:实现连接池管理

更多参考这篇文章:Python异步上下文:实现连接池管理

class AsyncContextManager:
    async def __aenter__(self):
        print("异步进入上下文")
        await asyncio.sleep(0.1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("异步退出上下文")
        await asyncio.sleep(0.1)

async def example():
    async with AsyncContextManager() as cm:
        print("在异步上下文中执行操作")

asyncio.run(example())

 

异步IO操作

跳转:Python异步IO、文件、网络、数据库

 

使用案例

异步Redis

当通过python去操作redis时,链接、设置值、获取值 这些都涉及网络IO请求,使用asycio异步的方式可以在IO等待时去做一些其他任务,从而提升性能。

import asyncio
import aioredis
 
 
async def execute(address, password):
    print("开始执行", address)
 
    # 网络IO操作:先去连接 47.93.4.197:6379,遇到IO则自动切换任务,去连接47.93.4.198:6379
    redis = await aioredis.create_redis_pool(address, password=password)
 
    # 网络IO操作:遇到IO会自动切换任务
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
 
    # 网络IO操作:遇到IO会自动切换任务
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
 
    redis.close()
    # 网络IO操作:遇到IO会自动切换任务
    await redis.wait_closed()
 
    print("结束", address)
 
 
task_list = [
    execute('redis://47.93.4.197:6379', "root!2345"),
    execute('redis://47.93.4.198:6379', "root!2345")
]
 
asyncio.run(asyncio.wait(task_list))

异步MySQL

当通过python去操作MySQL时,连接、执行SQL、关闭都涉及网络IO请求,使用asycio异步的方式可以在IO等待时去做一些其他任务,从而提升性能。

import asyncio
import aiomysql
 
 
async def execute(host, password):
    print("开始", host)
    # 网络IO操作:先去连接 47.93.40.197,遇到IO则自动切换任务,去连接47.93.40.198:6379
    conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
 
    # 网络IO操作:遇到IO会自动切换任务
    cur = await conn.cursor()
 
    # 网络IO操作:遇到IO会自动切换任务
    await cur.execute("SELECT Host,User FROM user")
 
    # 网络IO操作:遇到IO会自动切换任务
    result = await cur.fetchall()
    print(result)
 
    # 网络IO操作:遇到IO会自动切换任务
    await cur.close()
    conn.close()
    print("结束", host)
 
 
task_list = [
    execute('47.93.40.197', "root!2345"),
    execute('47.93.40.197', "root!2345")
]
 
asyncio.run(asyncio.wait(task_list))

FastAPI框架

FastAPI是一款用于构建API的高性能web框架,框架基于Python3.6+type hints搭建。

接下里的异步示例以FastAPIuvicorn来讲解(uvicorn是一个支持异步的asgi)。

import asyncio
 
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
 
app = FastAPI()
 
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)
 
 
@app.get("/")
def index():
    """ 普通操作接口 """
    return {"message": "Hello World"}
 
 
@app.get("/red")
async def red():
    """ 异步操作接口 """
    
    print("请求来了")
 
    await asyncio.sleep(3)
    # 连接池获取一个连接
    conn = await REDIS_POOL.acquire()
    redis = Redis(conn)
 
    # 设置值
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
 
    # 读取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
 
    # 连接归还连接池
    REDIS_POOL.release(conn)
 
    return result
 
 
if __name__ == '__main__':
    uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")

 

 

小贴士:避免异步编程中的坑

 

1. 滥用 await

问题:

在每个异步任务中频繁使用 await,可能会导致性能下降,因为每次 await 都会暂停当前协程并切换到事件循环。

如果多个任务之间没有依赖关系,最好使用 asyncio.gatherasyncio.create_task 来并发运行任务,而不是逐个 await

示例:

# 错误示例:任务逐个执行
async def process_tasks():
    await task1()
    await task2()
    await task3()

# 正确示例:任务并发运行
async def process_tasks():
    await asyncio.gather(task1(), task2(), task3())

解决方法:

如果任务之间没有依赖关系,尽量使用并发工具(如 asyncio.gather)来提高效率。

 

2. 未正确关闭异步资源

问题:

在异步编程中,你通常会使用异步资源(如数据库连接、HTTP 客户端会话等)。

如果没有正确关闭这些资源,可能会导致资源泄漏或连接池耗尽。

比如,使用 aiohttp.ClientSession 时,如果忘记关闭会话,可能会出现未释放的连接问题。

示例:

import aiohttp

async def fetch_data():
    session = aiohttp.ClientSession()  # 创建会话
    async with session.get("https://example.com") as response:
        data = await response.text()
    # 会话未关闭,可能导致资源泄漏

解决方法:

使用 async with 来管理资源,确保资源在使用结束后正确释放。

async def fetch_data():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            data = await response.text()

 

3. 忽视异常处理

问题:

异步任务中抛出的异常可能不会立即显现,尤其是使用 asyncio.gathercreate_task 时,异常可能被悄悄吞掉,导致程序行为异常或数据丢失。

如果不主动捕获异常,可能会错过重要的错误信息。

示例:

async def task_with_error():
    raise ValueError("任务失败")

async def main():
    await asyncio.gather(task_with_error(), another_task())

解决方法:

使用 try...except 捕获异常,并记录日志或采取补救措施。

async def main():
    try:
        await asyncio.gather(task_with_error(), another_task())
    except Exception as e:
        print(f"捕获到异常: {e}")

如果使用 create_task,可以为任务添加回调函数来处理异常:

async def task_with_error():
    raise ValueError("任务失败")

def handle_task_exception(task):
    try:
        task.result()
    except Exception as e:
        print(f"捕获到任务异常: {e}")

async def main():
    task = asyncio.create_task(task_with_error())
    task.add_done_callback(handle_task_exception)

 

4. 误解 asyncio.gather 的行为

问题:

asyncio.gather 会运行所有任务并收集它们的结果。如果其中一个任务失败,它会立即抛出异常并取消其他任务。

如果你希望即使某些任务失败,也能继续运行其他任务,需要特别处理。

解决方法:

使用 return_exceptions=True 参数,确保所有任务都能运行完毕,即使某些任务失败:

async def task1():
    return 1

async def task2():
    raise ValueError("任务失败")

async def main():
    results = await asyncio.gather(task1(), task2(), return_exceptions=True)
    print(results)  # 输出: [1, ValueError('任务失败')]

 

5. 事件循环嵌套

问题:

如果你在一个已经运行的事件循环中调用 asyncio.run,会导致 RuntimeError: This event loop is already running 错误。

这种情况通常发生在交互式环境(如 Jupyter Notebook)或某些异步框架中。

解决方法:

1、重构代码以避免多次启动事件循环

将异步操作封装为协程并通过 `await` 调用,确保整个程序仅在入口处调用一次 `asyncio.run`。

asyncio.run(async_task())
# 改为
await async_task()  # 直接 await 协程,而不是启动新循环

2、检查事件循环状态并动态处理

在调用 asyncio.run 前检查是否有运行中的事件循环,若有则直接运行协程或安排任务。

注意:在同步上下文中直接创建任务可能无法立即执行,需确保事件循环在运行。

async def async_task():
    print("执行异步任务")

def run_async_task():
    try:
        loop = asyncio.get_running_loop()
        # 已有循环运行,安排任务
        loop.create_task(async_task())
    except RuntimeError:
        # 无循环运行,启动新循环
        asyncio.run(async_task())

3、使用 nest_asyncio 库允许嵌套事件循环(不推荐生产环境使用)。

通过 nest_asyncio 修补事件循环以支持嵌套运行,适用于 Jupyter 等环境。

pip install nest_asyncio

import nest_asyncio
nest_asyncio.apply()

 

6. 任务未正确取消

问题:

异步任务可能需要被取消(例如超时或用户中断),但如果没有正确处理取消逻辑,可能会导致资源泄漏或程序挂起。

解决方法:

使用 asyncio.TimeoutErrorcancel 方法正确取消任务。

async def long_running_task():
    try:
        await asyncio.sleep(10)  # 模拟长时间运行的任务
    except asyncio.CancelledError:
        print("任务被取消")
        raise

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(1)
    task.cancel()  # 取消任务
    await task  # 等待任务完成

 

7. 滥用线程池

问题:

异步编程主要用于 I/O 密集型任务,但有时需要处理 CPU 密集型任务(如图像处理或复杂计算)或在协程中调用阻塞性的同步函数(如 time.sleep(), requests.get())会阻塞整个事件循环,破坏并发性。必须将它们放到单独的线程中运行。

Python 3.9+ 推荐方式:asyncio.to_thread()

使用 ThreadPoolExecutor 时也要小心,线程池的大小需要合理配置,否则可能导致性能问题。

解决方法:

使用 asyncio.to_thread 或显式的线程池来运行 CPU 密集型任务。

import asyncio

def cpu_intensive_task():
    return sum(range(10**6))

async def main():
    result = await asyncio.to_thread(cpu_intensive_task)
    print(result)
asyncio.run(main())

底层方式:loop.run_in_executor()

# 在协程内部使用
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, cpu_intensive_task) # None 表示使用默认的线程池执行器

 

8. 未正确处理协程生命周期

问题:

如果一个协程被创建但没有被 await 或添加到事件循环中,它将永远不会执行。

这种情况通常发生在忘记 await 或错误地创建了协程但没有运行它。

解决方法:

确保所有协程都被正确 await 或通过 asyncio.create_task 添加到事件循环中。

async def my_task():
    print("任务开始")
    await asyncio.sleep(1)
    print("任务结束")

# 错误:协程未运行
my_task()

# 正确:使用 await 或 create_task
await my_task()
asyncio.create_task(my_task())

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

YXN-python

2025-02-28