您现在的位置是:网站首页 > 博客日记 >

python支持同步、异步的函数调用限制装饰器

作者:YXN-python 阅读量:9 发布日期:2025-06-11

这个装饰器确保被装饰的函数在同一时间只能执行一次,其他调用会被立即拒绝(可自定义拒绝回调和返回值),支持同步/异步函数且线程安全。

应用场景

  • 按钮点击限制(GUI应用)
  • API端点限流
  • 表单提交限制,防止重复提交

核心功能

  • 严格单次调用限制:确保函数在执行完成前只能被调用一次,后续调用会被立即拒绝
  • 线程安全:使用 threading.Lock() 保证多线程环境下的安全性
  • 同步/异步兼容:自动识别被装饰函数是同步还是异步,无需手动指定

回调机制:

  • 成功回调 (on_success):函数执行成功后触发
  • 拒绝回调 (on_rejected):函数被拒绝执行时触发
  • 自定义拒绝返回值:可指定被拒绝时返回的默认值
import inspect
import threading
import time
from typing import Callable, Optional, Any


def single_call(
        timeout: float = 0,
        on_success: Optional[Callable[..., Any]] = None,
        on_rejected: Optional[Callable[..., Any]] = None,
        reject_result: Any = None,

):
    """
    函数调用限制装饰器

    参数:
        timeout: 两次允许执行的最小时间间隔(秒),0表示只限制执行期间
            timeout = 0: 仅限制函数不能同时执行(前一次执行完成后才能开始下一次)
            timeout > 0: 即使前一次执行已完成,仍需等待设定的时间间隔(如2秒)后才允许再次执行
        on_success: 成功执行后的回调函数
        on_rejected: 被拒绝执行时的回调函数
        reject_result: 被拒绝时返回的默认值
    """

    def decorator(func):
        is_running = False
        last_run_time = 0
        lock = threading.Lock()

        if inspect.iscoroutinefunction(func):
            async def async_wrapped(*args, **kwargs):
                nonlocal is_running, last_run_time

                current_time = time.time()
                with lock:
                    if is_running or (current_time - last_run_time < timeout):
                        if on_rejected:
                            try:
                                if inspect.iscoroutinefunction(on_rejected):
                                    await on_rejected(*args, **kwargs)
                                else:
                                    on_rejected(*args, **kwargs)
                            except Exception as e:
                                print(f"拒绝回调错误: {e}")
                        return reject_result

                    is_running = True
                    last_run_time = current_time

                try:
                    result = await func(*args, **kwargs)
                    if on_success:
                        with lock:
                            try:
                                if inspect.iscoroutinefunction(on_success):
                                    await on_success(*args, **kwargs)
                                else:
                                    on_success(*args, **kwargs)
                            except Exception as e:
                                print(f"成功回调错误: {e}")
                    return result
                finally:
                    with lock:
                        is_running = False

            return async_wrapped
        else:
            def sync_wrapped(*args, **kwargs):
                nonlocal is_running, last_run_time

                current_time = time.time()
                with lock:
                    if is_running or (current_time - last_run_time < timeout):
                        if on_rejected:
                            try:
                                on_rejected(*args, **kwargs)
                            except Exception as e:
                                print(f"拒绝回调错误: {e}")
                        return reject_result

                    is_running = True
                    last_run_time = current_time

                try:
                    result = func(*args, **kwargs)
                    if on_success:
                        with lock:
                            try:
                                on_success(*args, **kwargs)
                            except Exception as e:
                                print(f"成功回调错误: {e}")
                    return result
                finally:
                    with lock:
                        is_running = False

            return sync_wrapped

    return decorator

多线程环境下的同步函数调用示例

import threading
import time


def success_handler(num):
    print(f"线程 {num} 执行完毕")


def rejected_callback(num):
    print(f'线程 {num} 被拒绝了')


@single_call(on_rejected=rejected_callback, on_success=success_handler, timeout=1)
def worker(num):
    print(f'线程 {num} 开始')
    time.sleep(2)
    print(f'线程 {num} 结束')


threads = []
for i in range(4):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

threading.Thread(target=worker, args=(4,)).start()

"""
线程 0 开始
线程 1 被拒绝了
线程 2 被拒绝了
线程 3 被拒绝了
线程 0 结束
线程 4 开始
线程 4 结束
"""

异步函数在多线程环境中的调用示例

import asyncio
import threading

async def async_success_callback(num):
    print(f"线程 {num} 操作成功")


async def async_reject_callback(num):
    print(f"线程 {num} 被拒绝")


@single_call(
    on_success=async_success_callback,
    on_rejected=async_reject_callback
)
async def async_operation(task_id):
    print(f"开始异步 {task_id}")
    await asyncio.sleep(1)
    print(f"结束异步 {task_id}")


def thread_worker(task_id):
    print(f"线程 {task_id} 启动")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    result = loop.run_until_complete(async_operation(task_id))
    loop.close()


threads = []
for i in range(3):
    t = threading.Thread(target=thread_worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

threading.Thread(target=thread_worker, args=(100,)).start()

"""
线程 0 启动
线程 1 启动
线程 2 启动
开始异步 1
线程 0 被拒绝
线程 2 被拒绝
结束异步 1
线程 1 操作成功
线程 100 启动
开始异步 100
结束异步 100
线程 100 操作成功
"""

 

 

YXN-python

2025-06-11