您现在的位置是:网站首页 > 博客日记 >

python异常重试装饰器

作者:YXN-python 阅读量:67 发布日期:2024-07-11

1、retry装饰器

不支持异步函数

from retrying import retry


@retry(stop_max_attempt_number=3)
def my_function():
    print("执行函数")
    raise ValueError("这是一个错误")

# 调用函数
my_function()

2、自己写一个

支持异步函数

import asyncio
import inspect
import time

def retry_on_exception(retries, exception=Exception, delay=1):
    """
    异常重试装饰器
    :param exception: 需要捕获的异常类型
    :param retries: 重试次数
    :param delay: 重试间隔时间
    :return:
    """
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            # 异步函数的重试逻辑
            async def wrapper(*args, **kwargs):
                nonlocal retries
                while retries > 1:
                    try:
                        return await func(*args, **kwargs)
                    except exception as e:
                        print(f"捕获到异常:{e},将在{delay}秒后重试...")
                        await asyncio.sleep(delay)
                        retries -= 1
                return await func(*args, **kwargs)
        else:
            # 同步函数的重试逻辑
            def wrapper(*args, **kwargs):
                nonlocal retries
                while retries > 1:
                    try:
                        return func(*args, **kwargs)
                    except exception as e:
                        print(f"捕获到异常:{e},将在{delay}秒后重试...")
                        time.sleep(delay)
                        retries -= 1
                return func(*args, **kwargs)

        return wrapper

    return decorator


# 使用装饰器
@retry_on_exception(retries=3)
def my_function():
    raise ValueError("这是一个错误")


# 调用函数
my_function()

YXN-python

2024-07-11