您现在的位置是:网站首页 > 博客日记 >

详谈python异步编程

作者:YXN-python 阅读量:102 发布日期:2025-02-28

理解并发和并行

要理解asyncio运行原理,首先我们绕不开的概念就是要理解并发和并行,很多人可能经常听说这2个概念,但可能实际用的时候还比较模糊,今天我们就把这2个概念讲明白:

1. 并发:

  • 并发是指系统可以同时管理多个任务,但这些任务并不真正同时运行
  • 在 Python 的 asyncio 中,所有任务运行在单个线程和单个 CPU 核心上,通过事件循环在任务之间快速切换,当某个任务遇到 I/O(例如等待文件读写或网络请求)时,其他任务可以继续执行。

如果看完上面的不太明白,可以想一下小学教的时间利用,假设你要做两件事,煮饭和煮菜,分别使用电饭锅和燃气灶,电饭锅和燃气灶可以同时做饭和做菜,但是你是在淘米后再做的菜,是在饭在电饭锅中煮时候的间隙时间,这就是并发。

2. 并行:

  • 并行指真正的同时执行多个任务,比如利用多线程或多进程让多个任务同时在多个 CPU 核心上运行。

同样我在讲个例子,如果有2个人,一个人去做饭,一个人去做菜,这就是并行,因为2个人同时做了不同的事情。

 

提问环节

asyncio是并发还是并行?

是并发。因为asyncio 使用的是单线程事件循环,虽然它能通过任务间的切换实现“同时运行”的效果,但本质上同一时刻只有一个任务在运行asyncio 不会使用多个线程或多个 CPU 核心,因此它不是并行的

 

为什么asyncio只能使用单 CPU 核心?我主机有8核,为什么不能使用?

因为asyncio 本质上是一种 协程 的实现,依赖单线程的 事件循环,即使你的主机有 8 核,asyncio 依然只能使用 一个线程,因此只能运行在一个 CPU 核心上。

 

为什么协程 就只能在一个CPU核心上运行?

理解了asyncio是协程的实现,但协程为什么就只能在一个cpu核心上运行呢?要回答这个问题,就不得不说协程的本质了。

1. 协程的本质

协程是一种轻量级的用户态线程,它依赖 事件循环 来管理任务的调度和切换,所有协程都在同一个线程中运行。

  • 单线程事件循环: 协程是基于单线程的机制,所有任务的执行都由同一个事件循环调度,因此,事件循环只能运行在一个 CPU 核心上,协程的运行也被限制在这个核心中。
  • 非抢占式调度: 协程的调度是 非抢占式 的,即任务之间的切换是由代码中的 await、yield 等显式控制的,只有当任务主动交出控制权时,事件循环才会切换到其他协程运行。

这意味着协程只能依赖单线程,无法直接利用多核 CPU 的并行计算能力。

 

为什么协程不支持多核?

因为协程的目标是简单性和效率,所以协程的目标是为I/O密集型任务 提供高效的并发处理,而不是多核并行计算,一句话解答了所有疑惑,因为协程压根就不是为多核进行设计的,人家就是为I/O任务准备的。

 

异步编程的关键组件

1、协程(Coroutines)

  • 定义:使用 async def 定义的函数。
  • 特性:可以暂停和恢复执行,使用 await 关键字。

示例:简单协程

import asyncio

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

2、事件循环(Event Loop)

更多参考文章:Python异步编程:事件循环解析

  • 作用:管理和调度协程的执行。
  • 获取方式:使用 asyncio.get_event_loop()asyncio.run() 自动管理。

3、任务(Tasks)

Task用于并发调度协程, 通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行, 除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() ensure_future() 函数。不建议手动实例化 Task 对象。

本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。

注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

  • 定义:协程的封装,允许并发执行。
  • 创建方式:asyncio.create_task(coroutine)asyncio.ensure_future(coroutine)

示例:创建任务

async def main():
    task1 = asyncio.create_task(greet("Alice"))
    task2 = asyncio.create_task(greet("Bob"))
    await task1
    await task2

4、Future 对象

Future对象是一个相对更偏向底层的可对象,通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪.(Task Futrue的子类)Future为我们提供了异步编程中的 最终结果 的处理(Task类也具备状态处理的功能)

  • 定义:表示一个将来会完成的操作。
  • 用途:协程之间的通信和结果传递。
  • 创建方式:asyncio.Future()

示例:使用 Future

# 这里的fut是一个asyncio.Future对象,用于存储结果
async def set_future(fut, delay, value):
    await asyncio.sleep(delay)
    fut.set_result(value) # 设置Future的结果,通知等待它的协程任务继续执行,调用这个函数,状态就从pending变成done,等待它的所有协程会被事件循环恢复执行

async def main():
    fut = asyncio.Future() # 创建一个Future对象,用于表示一个尚未完成的操作,初始状态下,fut是”挂起“pending状态
    asyncio.create_task(set_future(fut, 2, "Future Result"))# 启动一个并发任务,运行set_future协程,任务将在2秒后调用set_result设置结果
    result = await fut # 挂起当前协程main,直到fut的结果被设置
    print(result)

关键语法:async 和 await

async

  • 用途:定义一个协程函数。

await

  • 用途:等待一个协程或可等待对象完成。
  • 限制:只能在协程内部使用。

示例:

async def my_coroutine():
    await asyncio.sleep(1)

asyncio.run() 的作用

  • 功能:
    • 创建并运行事件循环。
    • 执行顶层协程。
    • 关闭事件循环。
  • 语法:asyncio.run(main())

示例:

async def main():
    print("Hello, Async World!")

asyncio.run(main())

 

uvloop:提升性能

Python标准库中提供了asyncio模块,用于支持基于协程的异步编程。uvloopasyncio 中的事件循环的替代方案,替换后可以使得asyncio性能提高。事实上,uvloop要比nodejs、gevent等其他python异步框架至少要快2倍,性能可以比肩Go语言。

在项目中想要使用uvloop替换asyncio的事件循环也非常简单,只要在代码中这么做就行。

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 
# 编写asyncio的代码
 
# 内部的事件循环自动化会变为uvloop
asyncio.run(...)

 

事件循环

参考文章:Python异步编程:事件循环解析

 

异步生成器

更多参考这篇文章:Python异步生成器

import asyncio

# 定义异步生成器
async def async_counter():
    for i in range(5):
        print(f"准备生成 {i}")
        await asyncio.sleep(1)  # 模拟I/O操作
        yield i
        print(f"已生成 {i}")

# 使用异步生成器
async def main():
    async for number in async_counter():
        print(f"收到: {number}")

# 运行事件循环
asyncio.run(main())

 

异步迭代器

更多参考这篇文章:异步迭代器

import asyncio

class AsyncCounter:
    def __init__(self, start, end):
        self.current = start
        self.end = end

    # 初始化异步迭代器
    def __aiter__(self):
        return self

    # 异步返回下一个值
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        self.current += 1
        await asyncio.sleep(1)  # 模拟异步任务
        return self.current


# 使用异步迭代器
async def main():
    async for number in AsyncCounter(1, 5):
        print(number)

# 运行异步任务
asyncio.run(main())

 

Python异步上下文:实现连接池管理

更多参考这篇文章:Python异步上下文:实现连接池管理

class AsyncContextManager:
    async def __aenter__(self):
        print("异步进入上下文")
        await asyncio.sleep(0.1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("异步退出上下文")
        await asyncio.sleep(0.1)

async def example():
    async with AsyncContextManager() as cm:
        print("在异步上下文中执行操作")

asyncio.run(example())

 

异步IO操作

跳转:Python异步IO、文件、网络、数据库

 

使用案例

异步Redis

当通过python去操作redis时,链接、设置值、获取值 这些都涉及网络IO请求,使用asycio异步的方式可以在IO等待时去做一些其他任务,从而提升性能。

import asyncio
import aioredis
 
 
async def execute(address, password):
    print("开始执行", address)
 
    # 网络IO操作:先去连接 47.93.4.197:6379,遇到IO则自动切换任务,去连接47.93.4.198:6379
    redis = await aioredis.create_redis_pool(address, password=password)
 
    # 网络IO操作:遇到IO会自动切换任务
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
 
    # 网络IO操作:遇到IO会自动切换任务
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
 
    redis.close()
    # 网络IO操作:遇到IO会自动切换任务
    await redis.wait_closed()
 
    print("结束", address)
 
 
task_list = [
    execute('redis://47.93.4.197:6379', "root!2345"),
    execute('redis://47.93.4.198:6379', "root!2345")
]
 
asyncio.run(asyncio.wait(task_list))

异步MySQL

当通过python去操作MySQL时,连接、执行SQL、关闭都涉及网络IO请求,使用asycio异步的方式可以在IO等待时去做一些其他任务,从而提升性能。

import asyncio
import aiomysql
 
 
async def execute(host, password):
    print("开始", host)
    # 网络IO操作:先去连接 47.93.40.197,遇到IO则自动切换任务,去连接47.93.40.198:6379
    conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
 
    # 网络IO操作:遇到IO会自动切换任务
    cur = await conn.cursor()
 
    # 网络IO操作:遇到IO会自动切换任务
    await cur.execute("SELECT Host,User FROM user")
 
    # 网络IO操作:遇到IO会自动切换任务
    result = await cur.fetchall()
    print(result)
 
    # 网络IO操作:遇到IO会自动切换任务
    await cur.close()
    conn.close()
    print("结束", host)
 
 
task_list = [
    execute('47.93.40.197', "root!2345"),
    execute('47.93.40.197', "root!2345")
]
 
asyncio.run(asyncio.wait(task_list))

FastAPI框架

FastAPI是一款用于构建API的高性能web框架,框架基于Python3.6+type hints搭建。

接下里的异步示例以FastAPIuvicorn来讲解(uvicorn是一个支持异步的asgi)。

import asyncio
 
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
 
app = FastAPI()
 
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)
 
 
@app.get("/")
def index():
    """ 普通操作接口 """
    return {"message": "Hello World"}
 
 
@app.get("/red")
async def red():
    """ 异步操作接口 """
    
    print("请求来了")
 
    await asyncio.sleep(3)
    # 连接池获取一个连接
    conn = await REDIS_POOL.acquire()
    redis = Redis(conn)
 
    # 设置值
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
 
    # 读取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
 
    # 连接归还连接池
    REDIS_POOL.release(conn)
 
    return result
 
 
if __name__ == '__main__':
    uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")

 

 

 

 

 

 

 

 

小贴士:避免异步编程中的坑

 

1. 滥用 await

问题:

在每个异步任务中频繁使用 await,可能会导致性能下降,因为每次 await 都会暂停当前协程并切换到事件循环。

如果多个任务之间没有依赖关系,最好使用 asyncio.gatherasyncio.create_task 来并发运行任务,而不是逐个 await

示例:

# 错误示例:任务逐个执行
async def process_tasks():
    await task1()
    await task2()
    await task3()

# 正确示例:任务并发运行
async def process_tasks():
    await asyncio.gather(task1(), task2(), task3())

解决方法:

如果任务之间没有依赖关系,尽量使用并发工具(如 asyncio.gather)来提高效率。

 

2. 未正确关闭异步资源

问题:

在异步编程中,你通常会使用异步资源(如数据库连接、HTTP 客户端会话等)。

如果没有正确关闭这些资源,可能会导致资源泄漏或连接池耗尽。

比如,使用 aiohttp.ClientSession 时,如果忘记关闭会话,可能会出现未释放的连接问题。

示例:

import aiohttp

async def fetch_data():
    session = aiohttp.ClientSession()  # 创建会话
    async with session.get("https://example.com") as response:
        data = await response.text()
    # 会话未关闭,可能导致资源泄漏

解决方法:

使用 async with 来管理资源,确保资源在使用结束后正确释放。

async def fetch_data():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            data = await response.text()

 

3. 忽视异常处理

问题:

异步任务中抛出的异常可能不会立即显现,尤其是使用 asyncio.gathercreate_task 时,异常可能被悄悄吞掉,导致程序行为异常或数据丢失。

如果不主动捕获异常,可能会错过重要的错误信息。

示例:

async def task_with_error():
    raise ValueError("任务失败")

async def main():
    await asyncio.gather(task_with_error(), another_task())

解决方法:

使用 try...except 捕获异常,并记录日志或采取补救措施。

async def main():
    try:
        await asyncio.gather(task_with_error(), another_task())
    except Exception as e:
        print(f"捕获到异常: {e}")

如果使用 create_task,可以为任务添加回调函数来处理异常:

async def task_with_error():
    raise ValueError("任务失败")

def handle_task_exception(task):
    try:
        task.result()
    except Exception as e:
        print(f"捕获到任务异常: {e}")

async def main():
    task = asyncio.create_task(task_with_error())
    task.add_done_callback(handle_task_exception)

 

4. 误解 asyncio.gather 的行为

问题:

asyncio.gather 会运行所有任务并收集它们的结果。如果其中一个任务失败,它会立即抛出异常并取消其他任务。

如果你希望即使某些任务失败,也能继续运行其他任务,需要特别处理。

解决方法:

使用 return_exceptions=True 参数,确保所有任务都能运行完毕,即使某些任务失败:

async def task1():
    return 1

async def task2():
    raise ValueError("任务失败")

async def main():
    results = await asyncio.gather(task1(), task2(), return_exceptions=True)
    print(results)  # 输出: [1, ValueError('任务失败')]

 

5. 事件循环嵌套

问题:

如果你在一个已经运行的事件循环中调用 asyncio.run,会导致 RuntimeError: This event loop is already running 错误。

这种情况通常发生在交互式环境(如 Jupyter Notebook)或某些异步框架中。

解决方法:

1、重构代码以避免多次启动事件循环

将异步操作封装为协程并通过 `await` 调用,确保整个程序仅在入口处调用一次 `asyncio.run`。

asyncio.run(async_task())
# 改为
await async_task()  # 直接 await 协程,而不是启动新循环

2、检查事件循环状态并动态处理

在调用 asyncio.run 前检查是否有运行中的事件循环,若有则直接运行协程或安排任务。

注意:在同步上下文中直接创建任务可能无法立即执行,需确保事件循环在运行。

async def async_task():
    print("执行异步任务")

def run_async_task():
    try:
        loop = asyncio.get_running_loop()
        # 已有循环运行,安排任务
        loop.create_task(async_task())
    except RuntimeError:
        # 无循环运行,启动新循环
        asyncio.run(async_task())

3、使用 nest_asyncio 库允许嵌套事件循环(不推荐生产环境使用)。

通过 nest_asyncio 修补事件循环以支持嵌套运行,适用于 Jupyter 等环境。

pip install nest_asyncio

import nest_asyncio
nest_asyncio.apply()

 

6. 任务未正确取消

问题:

异步任务可能需要被取消(例如超时或用户中断),但如果没有正确处理取消逻辑,可能会导致资源泄漏或程序挂起。

解决方法:

使用 asyncio.TimeoutErrorcancel 方法正确取消任务。

async def long_running_task():
    try:
        await asyncio.sleep(10)  # 模拟长时间运行的任务
    except asyncio.CancelledError:
        print("任务被取消")
        raise

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(1)
    task.cancel()  # 取消任务
    await task  # 等待任务完成

 

7. 滥用线程池

问题:

异步编程主要用于 I/O 密集型任务,但有时需要处理 CPU 密集型任务(如图像处理或复杂计算)。如果你直接在协程中运行这些任务,可能会阻塞事件循环

使用 ThreadPoolExecutor 时也要小心,线程池的大小需要合理配置,否则可能导致性能问题。

解决方法:

使用 asyncio.to_thread 或显式的线程池来运行 CPU 密集型任务。

import asyncio

def cpu_intensive_task():
    return sum(range(10**6))

async def main():
    result = await asyncio.to_thread(cpu_intensive_task)
    print(result)
asyncio.run(main())

 

8. 未正确处理协程生命周期

问题:

如果一个协程被创建但没有被 await 或添加到事件循环中,它将永远不会执行。

这种情况通常发生在忘记 await 或错误地创建了协程但没有运行它。

解决方法:

确保所有协程都被正确 await 或通过 asyncio.create_task 添加到事件循环中。

async def my_task():
    print("任务开始")
    await asyncio.sleep(1)
    print("任务结束")

# 错误:协程未运行
my_task()

# 正确:使用 await 或 create_task
await my_task()
asyncio.create_task(my_task())

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

YXN-python

2025-02-28