详谈python异步编程
作者:YXN-python 阅读量:102 发布日期:2025-02-28
理解并发和并行
要理解asyncio运行原理,首先我们绕不开的概念就是要理解并发和并行,很多人可能经常听说这2个概念,但可能实际用的时候还比较模糊,今天我们就把这2个概念讲明白:
1. 并发:
- 并发是指系统可以同时管理多个任务,但这些任务并不真正同时运行。
- 在 Python 的 asyncio 中,所有任务运行在单个线程和单个 CPU 核心上,通过事件循环在任务之间快速切换,当某个任务遇到 I/O(例如等待文件读写或网络请求)时,其他任务可以继续执行。
如果看完上面的不太明白,可以想一下小学教的时间利用,假设你要做两件事,煮饭和煮菜,分别使用电饭锅和燃气灶,电饭锅和燃气灶可以同时做饭和做菜,但是你是在淘米后再做的菜,是在饭在电饭锅中煮时候的间隙时间,这就是并发。
2. 并行:
- 并行指真正的同时执行多个任务,比如利用多线程或多进程让多个任务同时在多个 CPU 核心上运行。
同样我在讲个例子,如果有2个人,一个人去做饭,一个人去做菜,这就是并行,因为2个人同时做了不同的事情。
提问环节
asyncio是并发还是并行?
是并发。因为asyncio 使用的是单线程事件循环,虽然它能通过任务间的切换实现“同时运行”的效果,但本质上同一时刻只有一个任务在运行,asyncio 不会使用多个线程或多个 CPU 核心,因此它不是并行的。
为什么asyncio只能使用单 CPU 核心?我主机有8核,为什么不能使用?
因为asyncio 本质上是一种 协程 的实现,依赖单线程的 事件循环,即使你的主机有 8 核,asyncio 依然只能使用 一个线程,因此只能运行在一个 CPU 核心上。
为什么协程 就只能在一个CPU核心上运行?
理解了asyncio是协程的实现,但协程为什么就只能在一个cpu核心上运行呢?要回答这个问题,就不得不说协程的本质了。
1. 协程的本质
协程是一种轻量级的用户态线程,它依赖 事件循环 来管理任务的调度和切换,所有协程都在同一个线程中运行。
- 单线程事件循环: 协程是基于单线程的机制,所有任务的执行都由同一个事件循环调度,因此,事件循环只能运行在一个 CPU 核心上,协程的运行也被限制在这个核心中。
- 非抢占式调度: 协程的调度是 非抢占式 的,即任务之间的切换是由代码中的 await、yield 等显式控制的,只有当任务主动交出控制权时,事件循环才会切换到其他协程运行。
这意味着协程只能依赖单线程,无法直接利用多核 CPU 的并行计算能力。
为什么协程不支持多核?
因为协程的目标是简单性和效率,所以协程的目标是为I/O密集型任务 提供高效的并发处理,而不是多核并行计算,一句话解答了所有疑惑,因为协程压根就不是为多核进行设计的,人家就是为I/O任务准备的。
异步编程的关键组件
1、协程(Coroutines)
- 定义:使用 async def 定义的函数。
- 特性:可以暂停和恢复执行,使用 await 关键字。
示例:简单协程
import asyncio
async def greet(name):
print(f"Hello, {name}!")
await asyncio.sleep(1)
print(f"Goodbye, {name}!")
2、事件循环(Event Loop)
更多参考文章:Python异步编程:事件循环解析
- 作用:管理和调度协程的执行。
- 获取方式:使用 asyncio.get_event_loop() 或 asyncio.run() 自动管理。
3、任务(Tasks)
Task用于并发调度协程, 通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行, 除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。
本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。
注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。
- 定义:协程的封装,允许并发执行。
- 创建方式:asyncio.create_task(coroutine) 或 asyncio.ensure_future(coroutine)
示例:创建任务
async def main():
task1 = asyncio.create_task(greet("Alice"))
task2 = asyncio.create_task(greet("Bob"))
await task1
await task2
4、Future 对象
Future对象是一个相对更偏向底层的可对象,通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪.(Task 是 Futrue的子类)Future为我们提供了异步编程中的 最终结果 的处理(Task类也具备状态处理的功能)
- 定义:表示一个将来会完成的操作。
- 用途:协程之间的通信和结果传递。
- 创建方式:asyncio.Future()
示例:使用 Future
# 这里的fut是一个asyncio.Future对象,用于存储结果
async def set_future(fut, delay, value):
await asyncio.sleep(delay)
fut.set_result(value) # 设置Future的结果,通知等待它的协程任务继续执行,调用这个函数,状态就从pending变成done,等待它的所有协程会被事件循环恢复执行
async def main():
fut = asyncio.Future() # 创建一个Future对象,用于表示一个尚未完成的操作,初始状态下,fut是”挂起“pending状态
asyncio.create_task(set_future(fut, 2, "Future Result"))# 启动一个并发任务,运行set_future协程,任务将在2秒后调用set_result设置结果
result = await fut # 挂起当前协程main,直到fut的结果被设置
print(result)
关键语法:async 和 await
async
- 用途:定义一个协程函数。
await
- 用途:等待一个协程或可等待对象完成。
- 限制:只能在协程内部使用。
示例:
async def my_coroutine():
await asyncio.sleep(1)
asyncio.run() 的作用
- 功能:
-
- 创建并运行事件循环。
- 执行顶层协程。
- 关闭事件循环。
- 语法:asyncio.run(main())
示例:
async def main():
print("Hello, Async World!")
asyncio.run(main())
uvloop:提升性能
Python标准库中提供了asyncio模块,用于支持基于协程的异步编程。uvloop是 asyncio 中的事件循环的替代方案,替换后可以使得asyncio性能提高。事实上,uvloop要比nodejs、gevent等其他python异步框架至少要快2倍,性能可以比肩Go语言。
在项目中想要使用uvloop替换asyncio的事件循环也非常简单,只要在代码中这么做就行。
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写asyncio的代码
# 内部的事件循环自动化会变为uvloop
asyncio.run(...)
事件循环
参考文章:Python异步编程:事件循环解析
异步生成器
更多参考这篇文章:Python异步生成器
import asyncio
# 定义异步生成器
async def async_counter():
for i in range(5):
print(f"准备生成 {i}")
await asyncio.sleep(1) # 模拟I/O操作
yield i
print(f"已生成 {i}")
# 使用异步生成器
async def main():
async for number in async_counter():
print(f"收到: {number}")
# 运行事件循环
asyncio.run(main())
异步迭代器
更多参考这篇文章:异步迭代器
import asyncio
class AsyncCounter:
def __init__(self, start, end):
self.current = start
self.end = end
# 初始化异步迭代器
def __aiter__(self):
return self
# 异步返回下一个值
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
self.current += 1
await asyncio.sleep(1) # 模拟异步任务
return self.current
# 使用异步迭代器
async def main():
async for number in AsyncCounter(1, 5):
print(number)
# 运行异步任务
asyncio.run(main())
Python异步上下文:实现连接池管理
更多参考这篇文章:Python异步上下文:实现连接池管理
class AsyncContextManager:
async def __aenter__(self):
print("异步进入上下文")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("异步退出上下文")
await asyncio.sleep(0.1)
async def example():
async with AsyncContextManager() as cm:
print("在异步上下文中执行操作")
asyncio.run(example())
异步IO操作
跳转:Python异步IO、文件、网络、数据库
使用案例
异步Redis
当通过python去操作redis时,链接、设置值、获取值 这些都涉及网络IO请求,使用asycio异步的方式可以在IO等待时去做一些其他任务,从而提升性能。
import asyncio
import aioredis
async def execute(address, password):
print("开始执行", address)
# 网络IO操作:先去连接 47.93.4.197:6379,遇到IO则自动切换任务,去连接47.93.4.198:6379
redis = await aioredis.create_redis_pool(address, password=password)
# 网络IO操作:遇到IO会自动切换任务
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作:遇到IO会自动切换任务
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 网络IO操作:遇到IO会自动切换任务
await redis.wait_closed()
print("结束", address)
task_list = [
execute('redis://47.93.4.197:6379', "root!2345"),
execute('redis://47.93.4.198:6379', "root!2345")
]
asyncio.run(asyncio.wait(task_list))
异步MySQL
当通过python去操作MySQL时,连接、执行SQL、关闭都涉及网络IO请求,使用asycio异步的方式可以在IO等待时去做一些其他任务,从而提升性能。
import asyncio
import aiomysql
async def execute(host, password):
print("开始", host)
# 网络IO操作:先去连接 47.93.40.197,遇到IO则自动切换任务,去连接47.93.40.198:6379
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
# 网络IO操作:遇到IO会自动切换任务
cur = await conn.cursor()
# 网络IO操作:遇到IO会自动切换任务
await cur.execute("SELECT Host,User FROM user")
# 网络IO操作:遇到IO会自动切换任务
result = await cur.fetchall()
print(result)
# 网络IO操作:遇到IO会自动切换任务
await cur.close()
conn.close()
print("结束", host)
task_list = [
execute('47.93.40.197', "root!2345"),
execute('47.93.40.197', "root!2345")
]
asyncio.run(asyncio.wait(task_list))
FastAPI框架
FastAPI是一款用于构建API的高性能web框架,框架基于Python3.6+的 type hints搭建。
接下里的异步示例以FastAPI和uvicorn来讲解(uvicorn是一个支持异步的asgi)。
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)
@app.get("/")
def index():
""" 普通操作接口 """
return {"message": "Hello World"}
@app.get("/red")
async def red():
""" 异步操作接口 """
print("请求来了")
await asyncio.sleep(3)
# 连接池获取一个连接
conn = await REDIS_POOL.acquire()
redis = Redis(conn)
# 设置值
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 读取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
# 连接归还连接池
REDIS_POOL.release(conn)
return result
if __name__ == '__main__':
uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")
小贴士:避免异步编程中的坑
1. 滥用 await
问题:
在每个异步任务中频繁使用 await,可能会导致性能下降,因为每次 await 都会暂停当前协程并切换到事件循环。
如果多个任务之间没有依赖关系,最好使用 asyncio.gather 或 asyncio.create_task 来并发运行任务,而不是逐个 await。
示例:
# 错误示例:任务逐个执行
async def process_tasks():
await task1()
await task2()
await task3()
# 正确示例:任务并发运行
async def process_tasks():
await asyncio.gather(task1(), task2(), task3())
解决方法:
如果任务之间没有依赖关系,尽量使用并发工具(如 asyncio.gather)来提高效率。
2. 未正确关闭异步资源
问题:
在异步编程中,你通常会使用异步资源(如数据库连接、HTTP 客户端会话等)。
如果没有正确关闭这些资源,可能会导致资源泄漏或连接池耗尽。
比如,使用 aiohttp.ClientSession 时,如果忘记关闭会话,可能会出现未释放的连接问题。
示例:
import aiohttp
async def fetch_data():
session = aiohttp.ClientSession() # 创建会话
async with session.get("https://example.com") as response:
data = await response.text()
# 会话未关闭,可能导致资源泄漏
解决方法:
使用 async with 来管理资源,确保资源在使用结束后正确释放。
async def fetch_data():
async with aiohttp.ClientSession() as session:
async with session.get("https://example.com") as response:
data = await response.text()
3. 忽视异常处理
问题:
异步任务中抛出的异常可能不会立即显现,尤其是使用 asyncio.gather 或 create_task 时,异常可能被悄悄吞掉,导致程序行为异常或数据丢失。
如果不主动捕获异常,可能会错过重要的错误信息。
示例:
async def task_with_error():
raise ValueError("任务失败")
async def main():
await asyncio.gather(task_with_error(), another_task())
解决方法:
使用 try...except 捕获异常,并记录日志或采取补救措施。
async def main():
try:
await asyncio.gather(task_with_error(), another_task())
except Exception as e:
print(f"捕获到异常: {e}")
如果使用 create_task,可以为任务添加回调函数来处理异常:
async def task_with_error():
raise ValueError("任务失败")
def handle_task_exception(task):
try:
task.result()
except Exception as e:
print(f"捕获到任务异常: {e}")
async def main():
task = asyncio.create_task(task_with_error())
task.add_done_callback(handle_task_exception)
4. 误解 asyncio.gather 的行为
问题:
asyncio.gather 会运行所有任务并收集它们的结果。如果其中一个任务失败,它会立即抛出异常并取消其他任务。
如果你希望即使某些任务失败,也能继续运行其他任务,需要特别处理。
解决方法:
使用 return_exceptions=True 参数,确保所有任务都能运行完毕,即使某些任务失败:
async def task1():
return 1
async def task2():
raise ValueError("任务失败")
async def main():
results = await asyncio.gather(task1(), task2(), return_exceptions=True)
print(results) # 输出: [1, ValueError('任务失败')]
5. 事件循环嵌套
问题:
如果你在一个已经运行的事件循环中调用 asyncio.run,会导致 RuntimeError: This event loop is already running 错误。
这种情况通常发生在交互式环境(如 Jupyter Notebook)或某些异步框架中。
解决方法:
1、重构代码以避免多次启动事件循环
将异步操作封装为协程并通过 `await` 调用,确保整个程序仅在入口处调用一次 `asyncio.run`。
asyncio.run(async_task())
# 改为
await async_task() # 直接 await 协程,而不是启动新循环
2、检查事件循环状态并动态处理
在调用 asyncio.run 前检查是否有运行中的事件循环,若有则直接运行协程或安排任务。
注意:在同步上下文中直接创建任务可能无法立即执行,需确保事件循环在运行。
async def async_task():
print("执行异步任务")
def run_async_task():
try:
loop = asyncio.get_running_loop()
# 已有循环运行,安排任务
loop.create_task(async_task())
except RuntimeError:
# 无循环运行,启动新循环
asyncio.run(async_task())
3、使用 nest_asyncio 库允许嵌套事件循环(不推荐生产环境使用)。
通过 nest_asyncio 修补事件循环以支持嵌套运行,适用于 Jupyter 等环境。
pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()
6. 任务未正确取消
问题:
异步任务可能需要被取消(例如超时或用户中断),但如果没有正确处理取消逻辑,可能会导致资源泄漏或程序挂起。
解决方法:
使用 asyncio.TimeoutError 或 cancel 方法正确取消任务。
async def long_running_task():
try:
await asyncio.sleep(10) # 模拟长时间运行的任务
except asyncio.CancelledError:
print("任务被取消")
raise
async def main():
task = asyncio.create_task(long_running_task())
await asyncio.sleep(1)
task.cancel() # 取消任务
await task # 等待任务完成
7. 滥用线程池
问题:
异步编程主要用于 I/O 密集型任务,但有时需要处理 CPU 密集型任务(如图像处理或复杂计算)。如果你直接在协程中运行这些任务,可能会阻塞事件循环。
使用 ThreadPoolExecutor 时也要小心,线程池的大小需要合理配置,否则可能导致性能问题。
解决方法:
使用 asyncio.to_thread 或显式的线程池来运行 CPU 密集型任务。
import asyncio
def cpu_intensive_task():
return sum(range(10**6))
async def main():
result = await asyncio.to_thread(cpu_intensive_task)
print(result)
asyncio.run(main())
8. 未正确处理协程生命周期
问题:
如果一个协程被创建但没有被 await 或添加到事件循环中,它将永远不会执行。
这种情况通常发生在忘记 await 或错误地创建了协程但没有运行它。
解决方法:
确保所有协程都被正确 await 或通过 asyncio.create_task 添加到事件循环中。
async def my_task():
print("任务开始")
await asyncio.sleep(1)
print("任务结束")
# 错误:协程未运行
my_task()
# 正确:使用 await 或 create_task
await my_task()
asyncio.create_task(my_task())
YXN-python
2025-02-28