python支持同步、异步的函数调用限制装饰器
作者:YXN-python 阅读量:9 发布日期:2025-06-11
这个装饰器确保被装饰的函数在同一时间只能执行一次,其他调用会被立即拒绝(可自定义拒绝回调和返回值),支持同步/异步函数且线程安全。
应用场景
- 按钮点击限制(GUI应用)
- API端点限流
- 表单提交限制,防止重复提交
核心功能
- 严格单次调用限制:确保函数在执行完成前只能被调用一次,后续调用会被立即拒绝
- 线程安全:使用 threading.Lock() 保证多线程环境下的安全性
- 同步/异步兼容:自动识别被装饰函数是同步还是异步,无需手动指定
回调机制:
- 成功回调 (on_success):函数执行成功后触发
- 拒绝回调 (on_rejected):函数被拒绝执行时触发
- 自定义拒绝返回值:可指定被拒绝时返回的默认值
import inspect
import threading
import time
from typing import Callable, Optional, Any
def single_call(
timeout: float = 0,
on_success: Optional[Callable[..., Any]] = None,
on_rejected: Optional[Callable[..., Any]] = None,
reject_result: Any = None,
):
"""
函数调用限制装饰器
参数:
timeout: 两次允许执行的最小时间间隔(秒),0表示只限制执行期间
timeout = 0: 仅限制函数不能同时执行(前一次执行完成后才能开始下一次)
timeout > 0: 即使前一次执行已完成,仍需等待设定的时间间隔(如2秒)后才允许再次执行
on_success: 成功执行后的回调函数
on_rejected: 被拒绝执行时的回调函数
reject_result: 被拒绝时返回的默认值
"""
def decorator(func):
is_running = False
last_run_time = 0
lock = threading.Lock()
if inspect.iscoroutinefunction(func):
async def async_wrapped(*args, **kwargs):
nonlocal is_running, last_run_time
current_time = time.time()
with lock:
if is_running or (current_time - last_run_time < timeout):
if on_rejected:
try:
if inspect.iscoroutinefunction(on_rejected):
await on_rejected(*args, **kwargs)
else:
on_rejected(*args, **kwargs)
except Exception as e:
print(f"拒绝回调错误: {e}")
return reject_result
is_running = True
last_run_time = current_time
try:
result = await func(*args, **kwargs)
if on_success:
with lock:
try:
if inspect.iscoroutinefunction(on_success):
await on_success(*args, **kwargs)
else:
on_success(*args, **kwargs)
except Exception as e:
print(f"成功回调错误: {e}")
return result
finally:
with lock:
is_running = False
return async_wrapped
else:
def sync_wrapped(*args, **kwargs):
nonlocal is_running, last_run_time
current_time = time.time()
with lock:
if is_running or (current_time - last_run_time < timeout):
if on_rejected:
try:
on_rejected(*args, **kwargs)
except Exception as e:
print(f"拒绝回调错误: {e}")
return reject_result
is_running = True
last_run_time = current_time
try:
result = func(*args, **kwargs)
if on_success:
with lock:
try:
on_success(*args, **kwargs)
except Exception as e:
print(f"成功回调错误: {e}")
return result
finally:
with lock:
is_running = False
return sync_wrapped
return decorator
多线程环境下的同步函数调用示例
import threading
import time
def success_handler(num):
print(f"线程 {num} 执行完毕")
def rejected_callback(num):
print(f'线程 {num} 被拒绝了')
@single_call(on_rejected=rejected_callback, on_success=success_handler, timeout=1)
def worker(num):
print(f'线程 {num} 开始')
time.sleep(2)
print(f'线程 {num} 结束')
threads = []
for i in range(4):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
threading.Thread(target=worker, args=(4,)).start()
"""
线程 0 开始
线程 1 被拒绝了
线程 2 被拒绝了
线程 3 被拒绝了
线程 0 结束
线程 4 开始
线程 4 结束
"""
异步函数在多线程环境中的调用示例
import asyncio
import threading
async def async_success_callback(num):
print(f"线程 {num} 操作成功")
async def async_reject_callback(num):
print(f"线程 {num} 被拒绝")
@single_call(
on_success=async_success_callback,
on_rejected=async_reject_callback
)
async def async_operation(task_id):
print(f"开始异步 {task_id}")
await asyncio.sleep(1)
print(f"结束异步 {task_id}")
def thread_worker(task_id):
print(f"线程 {task_id} 启动")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(async_operation(task_id))
loop.close()
threads = []
for i in range(3):
t = threading.Thread(target=thread_worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
threading.Thread(target=thread_worker, args=(100,)).start()
"""
线程 0 启动
线程 1 启动
线程 2 启动
开始异步 1
线程 0 被拒绝
线程 2 被拒绝
结束异步 1
线程 1 操作成功
线程 100 启动
开始异步 100
结束异步 100
线程 100 操作成功
"""
YXN-python
2025-06-11