python异常重试装饰器
作者:YXN-python 阅读量:67 发布日期:2024-07-11
1、retry装饰器
不支持异步函数
from retrying import retry
@retry(stop_max_attempt_number=3)
def my_function():
print("执行函数")
raise ValueError("这是一个错误")
# 调用函数
my_function()
2、自己写一个
支持异步函数
import asyncio
import inspect
import time
def retry_on_exception(retries, exception=Exception, delay=1):
"""
异常重试装饰器
:param exception: 需要捕获的异常类型
:param retries: 重试次数
:param delay: 重试间隔时间
:return:
"""
def decorator(func):
if inspect.iscoroutinefunction(func):
# 异步函数的重试逻辑
async def wrapper(*args, **kwargs):
nonlocal retries
while retries > 1:
try:
return await func(*args, **kwargs)
except exception as e:
print(f"捕获到异常:{e},将在{delay}秒后重试...")
await asyncio.sleep(delay)
retries -= 1
return await func(*args, **kwargs)
else:
# 同步函数的重试逻辑
def wrapper(*args, **kwargs):
nonlocal retries
while retries > 1:
try:
return func(*args, **kwargs)
except exception as e:
print(f"捕获到异常:{e},将在{delay}秒后重试...")
time.sleep(delay)
retries -= 1
return func(*args, **kwargs)
return wrapper
return decorator
# 使用装饰器
@retry_on_exception(retries=3)
def my_function():
raise ValueError("这是一个错误")
# 调用函数
my_function()
YXN-python
2024-07-11