异步迭代器
作者:YXN-python 阅读量:23 发布日期:2024-12-19
特点
- 延迟计算:返回的值是异步获取的,可以通过 `await` 等待其完成。
- 使用 `async` 和 `await`:异步迭代器通常需要配合 `async` 和 `await` 关键字来使用。
- 支持异步 for 循环:你可以使用 `async for` 来迭代异步迭代器,这样就能在异步环境中逐个获取值。
基本使用
创建异步迭代器
一个简单的异步迭代器通常需要实现__aiter__() 和__anext__() 方法。
import asyncio
class AsyncCounter:
def __init__(self, start, end):
self.current = start
self.end = end
# 初始化异步迭代器
def __aiter__(self):
return self
# 异步返回下一个值
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
self.current += 1
await asyncio.sleep(1) # 模拟异步任务
return self.current
# 使用异步迭代器
async def main():
async for number in AsyncCounter(1, 5):
print(number)
# 运行异步任务
asyncio.run(main())
应用场景
1. 处理大规模异步数据流
在一些场景中,我们可能需要从外部资源(比如数据库、文件或者远程API)异步地读取大量数据。异步迭代器可以帮助我们分批次、延迟加载这些数据,从而避免内存溢出和提高效率。
例如,异步地读取一个大型文件:
import aiofiles
import asyncio
class AsyncFileIterator:
def __init__(self, file_path):
self.file_path = file_path
self.file = None
def __aiter__(self):
return self
# __anext__ 是异步方法,负责读取文件中的一行
async def __anext__(self):
if self.file is None: # 在第一次迭代时打开文件
self.file = await aiofiles.open(self.file_path, mode='r')
line = await self.file.readline()
if not line:
await self.file.close() # 文件读取完毕,关闭文件
raise StopAsyncIteration # 文件读取完毕,停止迭代
return line.strip() # 返回去掉换行符的行
# 使用异步文件迭代器
async def main():
async for line in AsyncFileIterator('codebook_min.txt'):
print(line)
# 运行异步任务
asyncio.run(main())
当然,也可以:
import asyncio
import aiofiles
async def read_large_file(file_path):
async with aiofiles.open(file_path, mode='r') as file:
async for line in file:
yield line.strip() # 使用 yield 返回每一行的内容
async def process_file():
async for line in read_large_file("codebook_min.txt"):
print(line) # 处理每一行内容
# 运行异步事件循环
asyncio.run(process_file())
2. 处理异步请求和响应
异步迭代器也能应用在异步请求和响应的场景中,比如从网络API异步地获取数据。你可以将数据请求分批次进行迭代,避免同步阻塞。
import asyncio
import aiohttp
class AsyncAPIClient:
def __init__(self, base_url, max_requests=5):
self.base_url = base_url
self.max_requests = max_requests # 最大请求次数
self.request_count = 0 # 请求计数器
self.session = None
def __aiter__(self):
return self
async def __anext__(self):
# 如果达到请求次数限制,抛出 StopAsyncIteration
if self.request_count >= self.max_requests:
raise StopAsyncIteration
if not self.session:
self.session = aiohttp.ClientSession()
async with self.session.get(self.base_url) as response:
data = await response.json()
self.request_count += 1 # 增加请求计数
if not data:
await self.session.close()
raise StopAsyncIteration
return data
async def close(self):
if self.session:
await self.session.close()
# 假设我们有一个异步API客户端
async def fetch_data():
client = AsyncAPIClient('https://yixiuna.top/ip', max_requests=3) # 限制最多请求3次
async for data in client:
print(data)
await client.close() # 关闭客户端会话
# 异步请求数据
asyncio.run(fetch_data())
YXN-python
2024-12-19