您现在的位置是:网站首页 > 博客日记 >

异步迭代器

作者:YXN-python 阅读量:23 发布日期:2024-12-19

特点

  • 延迟计算:返回的值是异步获取的,可以通过 `await` 等待其完成。
  • 使用 `async` 和 `await`:异步迭代器通常需要配合 `async` 和 `await` 关键字来使用。
  • 支持异步 for 循环:你可以使用 `async for` 来迭代异步迭代器,这样就能在异步环境中逐个获取值。

基本使用

创建异步迭代器

一个简单的异步迭代器通常需要实现__aiter__() 和__anext__() 方法。

import asyncio

class AsyncCounter:
    def __init__(self, start, end):
        self.current = start
        self.end = end

    # 初始化异步迭代器
    def __aiter__(self):
        return self

    # 异步返回下一个值
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        self.current += 1
        await asyncio.sleep(1)  # 模拟异步任务
        return self.current


# 使用异步迭代器
async def main():
    async for number in AsyncCounter(1, 5):
        print(number)

# 运行异步任务
asyncio.run(main())

应用场景

1. 处理大规模异步数据流

在一些场景中,我们可能需要从外部资源(比如数据库、文件或者远程API)异步地读取大量数据。异步迭代器可以帮助我们分批次、延迟加载这些数据,从而避免内存溢出和提高效率。

例如,异步地读取一个大型文件:

import aiofiles
import asyncio


class AsyncFileIterator:
    def __init__(self, file_path):
        self.file_path = file_path
        self.file = None

    def __aiter__(self):
        return self

    # __anext__ 是异步方法,负责读取文件中的一行
    async def __anext__(self):
        if self.file is None:  # 在第一次迭代时打开文件
            self.file = await aiofiles.open(self.file_path, mode='r')

        line = await self.file.readline()
        if not line:
            await self.file.close()  # 文件读取完毕,关闭文件
            raise StopAsyncIteration  # 文件读取完毕,停止迭代

        return line.strip()  # 返回去掉换行符的行


# 使用异步文件迭代器
async def main():
    async for line in AsyncFileIterator('codebook_min.txt'):
        print(line)


# 运行异步任务
asyncio.run(main())

当然,也可以:

import asyncio
import aiofiles

async def read_large_file(file_path):
    async with aiofiles.open(file_path, mode='r') as file:
        async for line in file:
            yield line.strip()  # 使用 yield 返回每一行的内容

async def process_file():
    async for line in read_large_file("codebook_min.txt"):
        print(line)  # 处理每一行内容

# 运行异步事件循环
asyncio.run(process_file())

2. 处理异步请求和响应

异步迭代器也能应用在异步请求和响应的场景中,比如从网络API异步地获取数据。你可以将数据请求分批次进行迭代,避免同步阻塞。

import asyncio

import aiohttp

class AsyncAPIClient:
    def __init__(self, base_url, max_requests=5):
        self.base_url = base_url
        self.max_requests = max_requests  # 最大请求次数
        self.request_count = 0  # 请求计数器
        self.session = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        # 如果达到请求次数限制,抛出 StopAsyncIteration
        if self.request_count >= self.max_requests:
            raise StopAsyncIteration

        if not self.session:
            self.session = aiohttp.ClientSession()

        async with self.session.get(self.base_url) as response:
            data = await response.json()
            self.request_count += 1  # 增加请求计数
            if not data:
                await self.session.close()
                raise StopAsyncIteration
            return data

    async def close(self):
        if self.session:
            await self.session.close()

# 假设我们有一个异步API客户端
async def fetch_data():
    client = AsyncAPIClient('https://yixiuna.top/ip', max_requests=3)  # 限制最多请求3次
    async for data in client:
        print(data)
    await client.close()  # 关闭客户端会话

# 异步请求数据
asyncio.run(fetch_data())

YXN-python

2024-12-19