您现在的位置是:网站首页 > 博客日记 >

一个简单的应用密钥激活系统

作者:YXN-python 阅读量:54 发布日期:2024-12-28

1. 功能概述

用户购买软件后,获得唯一的激活密钥。

用户输入密钥后,系统验证有效性并激活软件。

2. 核心流程

生成密钥

密钥格式:KEY-XXXXX-XXXXXXXX(16位,字母+数字混合)。

通过算法加密绑定用户设备信息(如MAC等)。

验证密钥

请求服务器API,检查密钥是否有效且未过期。

激活状态存储

本地存储:写入注册表或配置文件(加密存储)。

服务器记录:标记密钥为已使用,绑定设备信息防重复激活。

安装依赖库(先将下面的依赖列表保存为 requirements.txt)

pip install -r requirements.txt

pycryptodome==3.22.0
requests==2.32.3
aiomysql==0.2.0
fastapi==0.115.6
uvicorn==0.33.0
cryptography==44.0.2

下面的代码比较乱,使用前请自行整理。

服务端

import asyncio
from datetime import datetime
import json
from typing import List, Dict, Union
from pydantic import BaseModel

from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
import aiomysql
from cryptography.fernet import Fernet

from config import FERNET_KEY, DB_CONFIG
from test_data import generate_key_code

app = FastAPI()

# Encryption setup: one shared Fernet cipher built from the configured key
fernet = Fernet(FERNET_KEY)


async def get_db() -> aiomysql.Pool:
    """Create and return a new aiomysql connection pool built from ``DB_CONFIG``.

    A fresh pool is created on every call; the callers in this file use the
    result as an async context manager.
    """
    pool = await aiomysql.create_pool(**DB_CONFIG)
    return pool


class User(BaseModel):
    """Row of the ``users`` table as loaded by ``get_user_by_username``."""
    id: int
    username: str
    # Despite the name this is a Fernet token: verify_password() decrypts it.
    password_hash: str
    # Free-text state; the admin client prompts for 正常 or 锁定.
    status: str


class ActivationKey(BaseModel):
    """Row of the ``activation_keys`` table as used by the API handlers.

    ``type`` selects the binding policy checked by the login handlers:
    1 = any machine / any IP, 2 = any machine / single IP,
    3 = single machine / any IP, 4 = single machine / single IP.
    """
    id: int
    user_id: int
    app_name: str
    key_code: str
    type: int
    status: str
    # The login handlers read ``key.last_used_fingerprint``; without this field
    # the column was dropped by pydantic and the access raised AttributeError.
    # Note that ``model_dump()`` output now includes it as well.
    last_used_fingerprint: Union[str, None] = None
    last_used_ip: Union[str, None] = None


def is_admin():
    """Report whether the caller is an administrator.

    Placeholder: it unconditionally returns ``True``, so no real check happens.
    """
    # NOTE(review): every caller is treated as an admin until real logic exists.
    return True


# 管理员权限验证
def get_admin_user():
    """FastAPI dependency that guards the admin endpoints.

    Raises:
        HTTPException: 403 when ``is_admin()`` reports a non-admin caller.
    """
    if is_admin():
        return
    raise HTTPException(status_code=403, detail="无权限访问")


@app.post("/api/create_key", dependencies=[Depends(get_admin_user)])
async def create_key(request: Request):
    """Create a new activation key (admin endpoint).

    The request body is a JSON string holding a Fernet token; the decrypted
    JSON object must contain ``user_id``, ``type`` and ``app_name``.

    Returns:
        ``{"success": True, "key_code": <new code>}``.

    Raises:
        HTTPException: 500 carrying ``str(exc)`` for any failure.
    """
    try:
        token = await request.json()
        payload = json.loads(fernet.decrypt(token.encode()).decode())
        owner_id, kind, application = payload['user_id'], payload['type'], payload['app_name']

        # Obtain the code from generate_key_code() before touching the database.
        new_code = generate_key_code()
        insert_sql = "INSERT INTO activation_keys (user_id, app_name, key_code, type, status) VALUES (%s, %s, %s, %s, '激活')"

        async with await get_db() as pool, pool.acquire() as conn, conn.cursor() as cur:
            await cur.execute(insert_sql, (owner_id, application, new_code, kind))
        return {"success": True, "key_code": new_code}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.get("/api/list_keys", dependencies=[Depends(get_admin_user)])
async def list_keys(request: Request):
    """List every activation key joined with its owning user (admin endpoint).

    Returns:
        ``{"keys": [...]}`` where each entry is a dict row with ``username``,
        ``user_status``, ``app_name``, ``key_code``, ``fingerprint`` (是/否),
        ``key_status``, ``type_text`` and ``id``.

    Raises:
        HTTPException: 500 carrying ``str(exc)`` for any failure.
    """
    try:
        async with await get_db() as pool:
            async with pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    listing_sql = """
                        SELECT 
                            u.username,
                            u.status AS user_status,
                            ak.app_name,
                            ak.key_code,
                            IF(ak.last_used_fingerprint is null, '否','是') as fingerprint,
                            ak.status AS key_status,
                            CASE 
                                WHEN ak.type = 1 THEN '多机器多IP'
                                WHEN ak.type = 2 THEN '多机器单IP'
                                WHEN ak.type = 3 THEN '单机器多IP'
                                WHEN ak.type = 4 THEN '单机器单IP'
                            END AS type_text,
                            ak.id
                        FROM 
                            activation_keys ak
                        JOIN 
                            users u ON ak.user_id = u.id;
                    """
                    await cur.execute(listing_sql)
                    rows = await cur.fetchall()
        return {"keys": rows}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/api/update_key", dependencies=[Depends(get_admin_user)])
async def update_key(request: Request):
    """Update the status and/or type of one activation key (admin endpoint).

    The request body is a JSON string holding a Fernet token; the decrypted
    JSON object must contain ``id`` and may contain ``status`` and ``type``.
    Falsy values for the optional fields are ignored.

    Returns:
        ``{"success": True}``.

    Raises:
        HTTPException: 400 when neither ``status`` nor ``type`` is supplied,
            500 carrying ``str(e)`` for any other failure.
    """
    try:
        encrypted_data = await request.json()
        data = json.loads(fernet.decrypt(encrypted_data.encode()).decode())

        key_id = data['id']

        # Collect only the columns the caller actually supplied.
        changes = {}
        if data.get('status'):
            changes["status"] = data['status']
        if data.get('type'):
            changes["type"] = data['type']

        # With nothing to set the statement used to become
        # "UPDATE activation_keys SET WHERE id = %s", a SQL syntax error.
        if not changes:
            raise HTTPException(status_code=400, detail="没有需要更新的字段")

        # Column names come from the fixed literals above, never from the
        # request, so interpolating them is safe; values stay parameterized.
        assignments = ", ".join(f"{column} = %s" for column in changes)
        query = f"UPDATE activation_keys SET {assignments} WHERE id = %s"
        params = (*changes.values(), key_id)

        async with await get_db() as pool:
            async with pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(query, params)
        return {"success": True}
    except HTTPException:
        # Let the deliberate 400 through instead of rewrapping it as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/delete_key", dependencies=[Depends(get_admin_user)])
async def delete_key(request: Request):
    """Delete one activation key by id (admin endpoint).

    The request body is a JSON string holding a Fernet token; the decrypted
    JSON object must contain ``id``.

    Returns:
        ``{"success": True}``.

    Raises:
        HTTPException: 500 carrying ``str(exc)`` for any failure.
    """
    try:
        token = await request.json()
        payload = json.loads(fernet.decrypt(token.encode()).decode())
        target_id = payload['id']

        async with await get_db() as pool, pool.acquire() as conn, conn.cursor() as cur:
            await cur.execute("DELETE FROM activation_keys WHERE id = %s", (target_id,))
        return {"success": True}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.get("/api/list_users", dependencies=[Depends(get_admin_user)])
async def list_users():
    """List users together with their activation-key counts (admin endpoint).

    Only users that own at least one key appear, because the query inner-joins
    ``activation_keys`` with ``users``.

    Returns:
        ``{"users": [...]}`` where each entry is a dict row with ``id``,
        ``username``, ``status``, ``total`` (all keys) and ``ycount`` (keys
        whose status is not 锁定).

    Raises:
        HTTPException: 500 carrying ``str(exc)`` for any failure.
    """
    try:
        async with await get_db() as pool:
            async with pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    summary_sql = """
                        SELECT
                            u.id,
                            u.username,
                            u.status,
                            count(ak.key_code) as `total`,
                            sum(if(ak.status='锁定', 0, 1)) as ycount
                        FROM
                            activation_keys ak
                            JOIN users u ON ak.user_id = u.id
                        GROUP BY
                            u.id, u.username, u.status
                    """
                    await cur.execute(summary_sql)
                    rows = await cur.fetchall()
        return {"users": rows}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/api/update_user", dependencies=[Depends(get_admin_user)])
async def update_user(request: Request):
    """Set the status of one user (admin endpoint).

    The request body is a JSON string holding a Fernet token; the decrypted
    JSON object must contain ``user_id`` and ``status``.

    Returns:
        ``{"success": True}``.

    Raises:
        HTTPException: 500 carrying ``str(exc)`` for any failure.
    """
    try:
        token = await request.json()
        payload = json.loads(fernet.decrypt(token.encode()).decode())
        target_id, status_value = payload['user_id'], payload['status']

        statement = "UPDATE users SET status = %s WHERE id = %s"
        async with await get_db() as pool, pool.acquire() as conn, conn.cursor() as cur:
            await cur.execute(statement, (status_value, target_id))
        return {"success": True}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/api/get_keys")
async def get_keys(request: Request):
    """Return the unlocked activation keys of a user for one application.

    The request body is a JSON string holding a Fernet token; the decrypted
    JSON object must contain ``app``, ``user`` and ``password``.

    Returns:
        ``{"keys": [...]}`` on success. Failures are reported as
        ``{"error": <message>}`` with status 401 (bad credentials),
        404 (no keys) or 500 (anything else).
    """
    try:
        encrypted_data = await request.json()
        data_str = fernet.decrypt(encrypted_data.encode()).decode()
        data = json.loads(data_str)
        app_name = data['app']
        username = data['user']
        password = data['password']
        user = await get_user_by_username(username)
        if not user or not verify_password(password, user.password_hash):
            raise HTTPException(status_code=401, detail="账号或密码错误")
        # NOTE(review): user.status is not consulted here, so a user marked
        # 锁定 can still list keys - confirm whether that is intended.

        keys = await get_activation_keys(user.id, app_name)

        if not keys:
            raise HTTPException(status_code=404, detail="未找到激活码")

        return {"keys": [key.model_dump() for key in keys]}

    except HTTPException as e:
        # The broad handler below used to swallow these and answer 500 with a
        # "401: ..." message; keep the intended status and the bare detail.
        return JSONResponse({"error": e.detail}, status_code=e.status_code)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)


@app.post("/api/login")
async def login(request: Request):
    """Validate an activation key for this device/IP and record the use.

    The request body is a JSON string holding a Fernet token; the decrypted
    JSON object must contain ``key`` and ``fingerprint``. The client IP is
    taken from the connection (``127.0.0.1`` when unavailable).

    Returns:
        ``{"success": True}`` after the last-used fingerprint/IP/time were
        stored. Failures are reported as ``{"error": <message>}`` with status
        403 (invalid, locked or mismatching key), 400 (unknown key type) or
        500 (anything else).
    """
    try:
        encrypted_data = await request.json()
        data_str = fernet.decrypt(encrypted_data.encode()).decode()
        data = json.loads(data_str)

        key_code = data['key']
        fingerprint = data['fingerprint']
        client_ip = request.client.host if request.client else "127.0.0.1"

        # Look the key up and reject unknown or locked ones.
        key = await get_activation_key_by_code(key_code)

        if not key or key.status == "锁定":
            raise HTTPException(status_code=403, detail="激活码无效或已锁定")

        # Enforce the binding policy; a check only applies once a previous
        # fingerprint/IP has been recorded.
        if key.type == 1:  # any machine, any IP
            pass
        elif key.type == 2:  # any machine, single IP
            if key.last_used_ip and key.last_used_ip != client_ip:
                raise HTTPException(status_code=403, detail="IP地址不匹配")
        elif key.type == 3:  # single machine, any IP
            if key.last_used_fingerprint and key.last_used_fingerprint != fingerprint:
                raise HTTPException(status_code=403, detail="设备指纹不匹配")
        elif key.type == 4:  # single machine, single IP
            if (key.last_used_fingerprint and key.last_used_fingerprint != fingerprint) or \
                    (key.last_used_ip and key.last_used_ip != client_ip):
                raise HTTPException(status_code=403, detail="设备或IP地址不匹配")
        else:
            return JSONResponse({"error": "数据异常!"}, status_code=400)

        # Record this use.
        await update_key_usage(key.id, fingerprint, client_ip)

        return {"success": True}

    except HTTPException as e:
        # The broad handler below used to turn these 403s into 500 responses
        # with a "403: ..." message; keep the intended status and bare detail.
        return JSONResponse({"error": e.detail}, status_code=e.status_code)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)


@app.post("/api/check_activation")
async def check_activation(request: Request):
    """Check that an activation key is still valid for this device/IP.

    Applies the same checks as the login endpoint but does not update the
    usage record. The request body is a JSON string holding a Fernet token;
    the decrypted JSON object must contain ``key`` and ``fingerprint``.

    Returns:
        ``{"success": True}`` when the key passes. Failures are reported as
        ``{"error": <message>}`` with status 403 (invalid, locked or
        mismatching key), 400 (unknown key type) or 500 (anything else).
    """
    try:
        encrypted_data = await request.json()
        data_str = fernet.decrypt(encrypted_data.encode()).decode()
        data = json.loads(data_str)

        key_code = data['key']
        fingerprint = data['fingerprint']
        client_ip = request.client.host if request.client else "127.0.0.1"

        # Look the key up and reject unknown or locked ones.
        key = await get_activation_key_by_code(key_code)
        if not key or key.status == "锁定":
            raise HTTPException(status_code=403, detail="激活码无效或已锁定")

        # Binding policy, identical to the login endpoint.
        if key.type == 1:  # any machine, any IP
            pass
        elif key.type == 2:  # any machine, single IP
            if key.last_used_ip and key.last_used_ip != client_ip:
                raise HTTPException(status_code=403, detail="IP地址不匹配")
        elif key.type == 3:  # single machine, any IP
            if key.last_used_fingerprint and key.last_used_fingerprint != fingerprint:
                raise HTTPException(status_code=403, detail="设备指纹不匹配")
        elif key.type == 4:  # single machine, single IP
            if (key.last_used_fingerprint and key.last_used_fingerprint != fingerprint) or \
                    (key.last_used_ip and key.last_used_ip != client_ip):
                raise HTTPException(status_code=403, detail="设备或IP地址不匹配")
        else:
            # An unknown type used to fall through to success here while the
            # login endpoint rejected it; reject it the same way.
            return JSONResponse({"error": "数据异常!"}, status_code=400)

        return {"success": True}

    except HTTPException as e:
        # The broad handler below used to turn these 403s into 500 responses
        # with a "403: ..." message; keep the intended status and bare detail.
        return JSONResponse({"error": e.detail}, status_code=e.status_code)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)


async def get_user_by_username(username: str) -> User:
    """Fetch one user by username.

    Returns:
        A ``User`` built from the fetched row, or ``None`` when no row matches
        (despite the ``User`` annotation).
    """
    lookup_sql = "SELECT * FROM users WHERE username = %s"
    async with await get_db() as pool, pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute(lookup_sql, (username,))
        row = await cur.fetchone()
        if not row:
            return None
        return User(**row)


async def get_activation_keys(user_id: int, app_name: str) -> List[ActivationKey]:
    """Fetch the keys of ``user_id`` for ``app_name`` whose status is not 锁定.

    Returns:
        A list of ``ActivationKey`` models, empty when nothing matches.
    """
    select_sql = "SELECT * FROM activation_keys WHERE user_id = %s AND app_name = %s AND status != '锁定'"
    async with await get_db() as pool, pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute(select_sql, (user_id, app_name))
        rows = await cur.fetchall()
        found = []
        for row in rows:
            found.append(ActivationKey(**row))
        return found


async def get_activation_key_by_code(key_code: str) -> ActivationKey:
    """Fetch one activation key by its code.

    Returns:
        An ``ActivationKey`` built from the fetched row, or ``None`` when no
        row matches (despite the ``ActivationKey`` annotation).
    """
    select_sql = "SELECT * FROM activation_keys WHERE key_code = %s"
    async with await get_db() as pool, pool.acquire() as conn, \
            conn.cursor(aiomysql.DictCursor) as cur:
        await cur.execute(select_sql, (key_code,))
        row = await cur.fetchone()
        if not row:
            return None
        return ActivationKey(**row)


async def update_key_usage(key_id: int, fingerprint: str, ip: str):
    """Store the fingerprint, IP and current local time as the key's last use."""
    update_sql = "UPDATE activation_keys SET last_used_fingerprint = %s, last_used_ip = %s, last_used_time = %s WHERE id = %s"
    async with await get_db() as pool, pool.acquire() as conn, conn.cursor() as cur:
        used_at = datetime.now()
        await cur.execute(update_sql, (fingerprint, ip, used_at, key_id))


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a submitted password against the stored Fernet token.

    Args:
        plain_password: Password supplied by the client.
        hashed_password: Stored value; despite the name it is a Fernet token
            that is decrypted here, not a one-way hash.

    Returns:
        ``True`` when the decrypted stored password equals ``plain_password``.

    Raises:
        Whatever ``fernet.decrypt`` raises for a token it cannot decrypt.
    """
    import hmac

    password = fernet.decrypt(hashed_password).decode()
    # Constant-time comparison so response timing does not leak how much of
    # the password matched; bytes are used because compare_digest only accepts
    # ASCII-only str arguments.
    return hmac.compare_digest(plain_password.encode(), password.encode())


# 启动服务
if __name__ == "__main__":
    import uvicorn

    # Listen on every interface, port 8000; the TLS options stay disabled.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        # "ssl_keyfile": SSL_KEY_FILE,  # private key file
        # "ssl_certfile": SSL_CERT_FILE,  # certificate file
    }
    uvicorn.run(app, **server_options)

 

客户端

import tkinter as tk
from tkinter import messagebox
import json
import hashlib
import uuid
import os
import threading
import platform
import requests
from cryptography.fernet import Fernet

from config import FERNET_KEY, ACTIVATION_FILE, SERVER_URL, APP_NAME


# Shared Fernet cipher used to encrypt the request payloads sent to the server
fernet = Fernet(FERNET_KEY)


class ActivationApp:
    """Tkinter client that activates the application against the key server.

    On start-up it loads a locally saved activation code, re-validates it with
    the server and then shows either the main window or the activation form.
    While the main window is open the key is re-checked every five minutes.
    """

    def __init__(self, root):
        """Initialise state and open the first window.

        Args:
            root: The ``tk.Tk`` root. It is hidden and only owns the
                ``Toplevel`` windows created by this class.
        """
        self.root = root
        self.root.withdraw()  # hide the root window
        self.root.geometry("300x150")
        self.center_window(self.root)
        self.fingerprint = self.get_machine_fingerprint()
        self.current_key_code = None
        self.load_activation_file()
        self.polling_task = None  # id of the pending ``after`` polling callback

        if self.current_key_code and self.validate_saved_key(self.current_key_code):
            self.show_main_window(self.current_key_code)
        else:
            self.show_activation_window()

    def show_activation_window(self):
        """Show the account/password activation form."""
        self.activation_window = tk.Toplevel(self.root)
        self.activation_window.title("激活界面")
        self.activation_window.geometry("300x150")
        self.activation_window.resizable(False, False)
        self.center_window(self.activation_window)
        # Closing the form exits the whole program.
        self.activation_window.protocol("WM_DELETE_WINDOW", self.root.quit)

        self.username_var = tk.StringVar()
        self.password_var = tk.StringVar()

        tk.Label(self.activation_window, text="账号:").grid(row=0, column=0, padx=10, pady=5)
        tk.Entry(self.activation_window, textvariable=self.username_var).grid(row=0, column=1, padx=10, pady=5)

        tk.Label(self.activation_window, text="密码:").grid(row=1, column=0, padx=10, pady=5)
        tk.Entry(self.activation_window, show="*", textvariable=self.password_var).grid(row=1, column=1, padx=10,
                                                                                        pady=5)

        register_btn = tk.Button(self.activation_window, text="去注册", command=self.open_register_page)
        activate_btn = tk.Button(self.activation_window, text="激活", command=self.activate)

        register_btn.grid(row=2, column=0, padx=5, pady=5, sticky="ew")
        activate_btn.grid(row=2, column=1, padx=5, pady=5, sticky="ew")

        # Give both columns the same weight so the buttons share the width.
        self.activation_window.grid_columnconfigure(0, weight=1)
        self.activation_window.grid_columnconfigure(1, weight=1)

    def show_key_selection(self, keys):
        """Show a list of activation keys and let the user pick one.

        Args:
            keys: Iterable of dicts with at least ``key_code`` and ``type``.
        """
        selection_win = tk.Toplevel(self.root)
        selection_win.title("选择激活码")
        selection_win.resizable(False, False)
        selection_win.geometry("400x250")
        self.center_window(selection_win)

        scrollbar = tk.Scrollbar(selection_win, orient=tk.VERTICAL, width=20)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        listbox = tk.Listbox(selection_win, width=50, height=10, yscrollcommand=scrollbar.set)
        for key in keys:
            type_desc = self.get_type_desc(key["type"])
            # select_key() splits this text on " - " to recover the code.
            listbox.insert(tk.END, f"{key['key_code']} - {type_desc}")
        listbox.pack(padx=10, pady=10)
        scrollbar.config(command=listbox.yview)
        select_btn = tk.Button(
            selection_win,
            text="选择",
            command=lambda: self.select_key(listbox, selection_win)
        )
        select_btn.pack(pady=5)

    def show_main_window(self, key_code):
        """Show the main window and start periodic validation of ``key_code``."""
        self.main_window = tk.Toplevel(self.root)
        self.main_window.title("主界面")
        self.main_window.geometry("800x600")
        self.center_window(self.main_window, 800, 600)
        self.main_window.protocol("WM_DELETE_WINDOW", self.on_close_main_window)

        # Menu bar; both entries currently run the same logout action.
        menu_bar = tk.Menu(self.main_window)
        about_menu = tk.Menu(menu_bar, tearoff=0)
        about_menu.add_command(label="更改激活码", command=self.logout)
        about_menu.add_command(label="退出激活码", command=self.logout)
        menu_bar.add_cascade(label="关于", menu=about_menu)
        self.main_window.config(menu=menu_bar)

        self.start_polling(self.main_window, key_code)

    @staticmethod
    def center_window(window, width=None, height=None):
        """Centre ``window`` on the screen.

        Args:
            window: Tk window to position.
            width: Width in pixels; defaults to the window's current width.
            height: Height in pixels; defaults to the window's current height.
        """
        window.update_idletasks()  # make sure the size has been computed

        if width is None:
            width = window.winfo_width()
        if height is None:
            height = window.winfo_height()

        screen_width = window.winfo_screenwidth()
        screen_height = window.winfo_screenheight()

        x = int((screen_width - width) / 2)
        y = int((screen_height - height) / 2)

        window.geometry(f"{width}x{height}+{x}+{y}")

    def get_machine_fingerprint(self):
        """Return a SHA-256 hex digest identifying this machine.

        The digest covers ``uuid.getnode()``, ``platform.node()`` and
        ``platform.processor()`` concatenated in that order.
        """
        components = [
            str(uuid.getnode()),
            platform.node(),
            platform.processor()
        ]
        return hashlib.sha256("".join(components).encode()).hexdigest()

    def load_activation_file(self):
        """Load the saved activation code into ``self.current_key_code``.

        The file holds the code on line one and the fingerprint on line two.
        The code is kept only when the stored fingerprint equals this
        machine's; an unreadable or truncated file yields ``None``. A missing
        file leaves the attribute untouched.
        """
        if not os.path.exists(ACTIVATION_FILE):
            return
        try:
            with open(ACTIVATION_FILE, "r", encoding="utf-8") as f:
                lines = f.readlines()
            saved_key = lines[0].strip()
            saved_fingerprint = lines[1].strip()
        except (OSError, IndexError, UnicodeError):
            # Was a bare ``except:``; only file/format problems are expected.
            self.current_key_code = None
            return

        self.current_key_code = saved_key if saved_fingerprint == self.fingerprint else None

    def validate_saved_key(self, key_code):
        """Ask the server whether ``key_code`` is still valid for this machine.

        Returns:
            The ``success`` value of the server's reply, or ``False`` when the
            request fails or the reply cannot be interpreted.
        """
        try:
            data = {
                "key": key_code,
                "fingerprint": self.fingerprint
            }
            encrypted_data = fernet.encrypt(json.dumps(data).encode()).decode()
            response = requests.post(f"{SERVER_URL}check_activation", json=encrypted_data, timeout=5)
            return response.json().get("success", False)
        except (requests.exceptions.RequestException, ValueError, AttributeError):
            # Was a bare ``except:``; network errors and malformed replies both
            # mean "not validated".
            return False

    def activate(self):
        """Fetch the user's keys with the entered credentials and log in.

        A single key is used directly; otherwise the selection window opens.
        """
        username = self.username_var.get()
        password = self.password_var.get()
        if not all([username, password]):
            messagebox.showerror("错误", "账号和密码不能为空")
            return

        data = {
            "app": APP_NAME,
            "user": username,
            "password": password
        }
        encrypted_data = fernet.encrypt(json.dumps(data).encode()).decode()

        try:
            response = requests.post(f"{SERVER_URL}get_keys", json=encrypted_data, timeout=5)
            if response.status_code == 200:
                keys = response.json()["keys"]
                if len(keys) == 1:
                    self.login_with_key(keys[0]["key_code"], self.activation_window)
                else:
                    self.show_key_selection(keys)
            else:
                messagebox.showerror("错误", response.json().get("error", "未知错误"))
        except requests.exceptions.RequestException:
            messagebox.showerror("错误", '网络错误!')

    def get_type_desc(self, type_code):
        """Return the display text for a key type code (1-4)."""
        return {
            1: "多机器多IP",
            2: "多机器单IP",
            3: "单机器多IP",
            4: "单机器单IP"
        }.get(type_code, "未知类型")

    def select_key(self, listbox, parent_window):
        """Log in with the key currently selected in ``listbox``."""
        selected = listbox.curselection()
        if not selected:
            messagebox.showerror("错误", "未选择激活码")
            return

        selected_key_str = listbox.get(selected[0])
        key_code = selected_key_str.split(" - ")[0]

        # Run the login on a background thread.
        # NOTE(review): login_with_key() also touches Tk widgets; confirm that
        # doing so from a worker thread is safe on the target Tcl/Tk build.
        threading.Thread(
            target=self.login_with_key,
            args=(key_code, parent_window),
            daemon=True
        ).start()

    def login_with_key(self, key_code, parent_window):
        """Log in with ``key_code``; on success save it and open the main window.

        Args:
            key_code: Activation code to log in with.
            parent_window: Window to destroy after a successful login.
        """
        try:
            data = {
                "key": key_code,
                "fingerprint": self.fingerprint
            }
            encrypted_data = fernet.encrypt(json.dumps(data).encode()).decode()
            response = requests.post(f"{SERVER_URL}login", json=encrypted_data, timeout=10)
            result = response.json()  # parse once instead of once per branch

            if result.get("success"):
                self.current_key_code = key_code
                self.save_activation_file(key_code)

                # Close every activation-related window.
                self.activation_window.destroy()
                parent_window.destroy()

                self.show_main_window(key_code)
            else:
                # A reply without "error" used to display the text "None".
                messagebox.showerror("登录失败", result.get("error", "未知错误"))
        except requests.exceptions.RequestException:
            messagebox.showerror("错误", '网络错误!')
        except Exception:
            messagebox.showerror("错误", '未知错误!')

    def save_activation_file(self, key_code):
        """Write the activation code and this machine's fingerprint to disk."""
        with open(ACTIVATION_FILE, "w", encoding="utf-8") as f:
            f.write(f"{key_code}\n")
            f.write(f"{self.fingerprint}\n")

    def start_polling(self, window, key_code):
        """Validate ``key_code`` now and then every five minutes.

        A failed validation or any error closes the main window.
        """

        def check_valid():
            try:
                data = {
                    "key": key_code,
                    "fingerprint": self.fingerprint
                }
                encrypted_data = fernet.encrypt(json.dumps(data).encode()).decode()
                response = requests.post(f"{SERVER_URL}check_activation", json=encrypted_data, timeout=5)

                if not response.json().get("success"):
                    messagebox.showerror("验证失败", "激活码已失效,请重新激活")
                    self.on_close_main_window()
                else:
                    # Remember the task id so it can be cancelled on close.
                    self.polling_task = window.after(300000, check_valid)
            except Exception:
                messagebox.showerror("错误", '未知错误!')
                self.on_close_main_window()

        check_valid()  # run the first check immediately

    def on_close_main_window(self, show_activation=False):
        """Tear down the main window.

        Args:
            show_activation: When true, return to the activation form;
                otherwise quit the application.
        """
        # Cancel the pending polling callback, if any.
        if self.polling_task:
            self.main_window.after_cancel(self.polling_task)
            self.polling_task = None

        self.main_window.destroy()
        if show_activation:
            self.show_activation_window()
        else:
            self.root.quit()

    def logout(self):
        """After confirmation, forget the saved key and return to activation."""
        if messagebox.askokcancel("确认退出", "确定要退出激活码吗?"):
            self.clear_activation_file()
            self.on_close_main_window(show_activation=True)

    def clear_activation_file(self):
        """Delete the activation file if it exists."""
        if os.path.exists(ACTIVATION_FILE):
            os.remove(ACTIVATION_FILE)

    def open_register_page(self):
        """Open the registration page in the default web browser."""
        import webbrowser
        webbrowser.open("https://your-registration-page.com")


if __name__ == "__main__":
    # Script entry point: build the Tk root, hand it to the app and run the event loop.
    root = tk.Tk()
    app = ActivationApp(root)
    root.mainloop()

 

管理端

import json
from tkinter import messagebox, ttk, simpledialog
import tkinter as tk
import requests

from client import SERVER_URL
from server import fernet


class AdminWindow:
    """Tkinter admin console for activation keys and users.

    Three notebook tabs are built: create a key, manage keys, manage users.
    Request bodies are Fernet-encrypted JSON strings posted to ``SERVER_URL``.
    """

    # Seconds to wait for the server before a request is abandoned, so a dead
    # server cannot freeze the UI indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self, root):
        """Build the window and its three tabs on ``root``."""
        self.root = root
        self.root.title("激活码管理")
        self.root.geometry("600x400")

        self.tab_control = ttk.Notebook(root)

        self.create_key_tab()
        self.manage_keys_tab()
        self.user_management_tab()

        self.tab_control.pack(expand=1, fill='both')

    @staticmethod
    def _validate_key_form(user_id, key_type, app_name, key_types):
        """Return an error message for an incomplete form, or ``None`` if valid.

        ``key_type`` must be one of the labels in ``key_types``; the initial
        '请选择:' placeholder therefore counts as "not selected".
        """
        if not user_id.strip() or not app_name.strip() or key_type not in key_types:
            return "请填写完整信息"
        return None

    def create_key_tab(self):
        """Build the "create activation key" tab."""
        tab = ttk.Frame(self.tab_control)
        self.tab_control.add(tab, text='生成激活码')

        # Owner of the new key
        user_id_var = tk.StringVar()
        tk.Label(tab, text="用户ID:").grid(row=0, column=0)
        tk.Entry(tab, textvariable=user_id_var).grid(row=0, column=1)

        # Key type: display label -> numeric code sent to the server
        key_types = {
            "多机器多IP": 1,
            "多机器单IP": 2,
            "单机器多IP": 3,
            "单机器单IP": 4
        }
        key_type_var = tk.StringVar()
        key_type_var.set('请选择:')
        tk.Label(tab, text="类型:").grid(row=1, column=0)
        tk.OptionMenu(tab, key_type_var, *key_types.keys()).grid(row=1, column=1)

        # Application name
        app_name_var = tk.StringVar()
        app_name_var.set("test_app")
        tk.Label(tab, text="应用名称:").grid(row=2, column=0)
        tk.Entry(tab, textvariable=app_name_var).grid(row=2, column=1)

        def generate_key():
            """Validate the form, request a new key and show the result."""
            user_id = user_id_var.get()
            key_type = key_type_var.get()
            app_name = app_name_var.get()

            # The placeholder used to pass the old truthiness check and a
            # request with ``type`` = None was sent.
            problem = self._validate_key_form(user_id, key_type, app_name, key_types)
            if problem:
                messagebox.showerror("错误", problem)
                return

            try:
                response = requests.post(
                    f"{SERVER_URL}create_key",
                    json=fernet.encrypt(json.dumps({
                        "user_id": user_id,
                        "type": key_types[key_type],
                        "app_name": app_name
                    }).encode()).decode(),
                    timeout=self.REQUEST_TIMEOUT
                )
                body = response.json()
                if "key_code" not in body:
                    # Server-side failures carry "detail"; show that instead
                    # of the bare KeyError text "'key_code'".
                    messagebox.showerror("错误", str(body.get("detail", "未知错误")))
                    return
                messagebox.showinfo("成功", f"生成的激活码:{body['key_code']}")
            except Exception as e:
                messagebox.showerror("错误", str(e))

        tk.Button(tab, text="生成激活码", command=generate_key).grid(row=3, columnspan=2)

    def manage_keys_tab(self):
        """Build the activation-key management tab (list, update, delete)."""
        tab = ttk.Frame(self.tab_control)
        self.tab_control.add(tab, text='激活码管理')

        # Key list.
        # NOTE(review): headings are addressed by display index ("#i"), so each
        # title lands one column to the right of its id in `columns`, and rows
        # carry 7 values for 8 columns - confirm this layout is intended.
        columns = ('#', '用户名', '用户状态', '应用名', '激活码', '是否绑定', '类型', '激活码状态')
        col_width = [30, 40, 100, 30, 50, 50, 30]  # column widths
        yscroll1 = tk.Scrollbar(tab, orient=tk.VERTICAL)
        self.keys_tree = ttk.Treeview(tab, height=10, yscrollcommand=yscroll1.set, columns=columns, show="headings")
        yscroll1.config(command=self.keys_tree.yview)
        yscroll1.pack(side=tk.RIGHT, fill=tk.Y)
        for i, column in enumerate(columns):
            self.keys_tree.heading(f"#{i}", text=column, anchor=tk.CENTER)
            self.keys_tree.column(column=column, width=col_width[i - 1], stretch=True)
        self.keys_tree.pack(fill="both", expand=True, pady=10)

        def refresh_keys():
            """Reload the key list from the server."""
            try:
                response = requests.get(f"{SERVER_URL}list_keys", timeout=self.REQUEST_TIMEOUT)
                keys = response.json()["keys"]
                self.keys_tree.delete(*self.keys_tree.get_children())
                for key in keys:
                    item = self.keys_tree.insert("", "end", values=(
                        key['username'], key['user_status'], key['app_name'],
                        key['key_code'], key['fingerprint'], key['type_text'], key['key_status']
                    ))
                    # Keep the database id in the item's tag for later actions.
                    self.keys_tree.item(item, tags=(str(key['id']),))
            except Exception as e:
                messagebox.showerror("错误", str(e))

        def update_key():
            """Prompt for a new status and send it for the selected key."""
            selected = self.keys_tree.selection()
            if selected:
                key_id = int(self.keys_tree.item(selected[0], 'tags')[0])
                new_status = simpledialog.askstring("修改状态", "输入新状态(激活/锁定)")
                if new_status:
                    try:
                        requests.post(
                            f"{SERVER_URL}update_key",
                            json=fernet.encrypt(json.dumps({
                                "id": key_id,
                                "status": new_status
                            }).encode()).decode(),
                            timeout=self.REQUEST_TIMEOUT
                        )
                        refresh_keys()
                    except Exception as e:
                        messagebox.showerror("错误", str(e))

        def delete_key():
            """Delete the selected key after confirmation."""
            selected = self.keys_tree.selection()
            if selected:
                # The title previously contained a Kangxi-radical look-alike
                # character (U+2F70) instead of 示.
                if not messagebox.askokcancel('提示', '确定要删除吗?'):
                    return
                key_id = int(self.keys_tree.item(selected[0], 'tags')[0])
                try:
                    requests.post(
                        f"{SERVER_URL}delete_key",
                        json=fernet.encrypt(json.dumps({"id": key_id}).encode()).decode(),
                        timeout=self.REQUEST_TIMEOUT
                    )
                    refresh_keys()
                except Exception as e:
                    messagebox.showerror("错误", str(e))

        tk.Button(tab, text="刷新列表", command=refresh_keys).\
            pack(side="left", expand=True, fill="x", padx=5, pady=(5, 10))
        tk.Button(tab, text="修改状态", command=update_key).\
            pack(side="left", expand=True, fill="x", padx=5, pady=(5, 10))
        tk.Button(tab, text="删除激活码", command=delete_key).\
            pack(side="left", expand=True, fill="x", padx=5, pady=(5, 10))

    def user_management_tab(self):
        """Build the user management tab (list users, change status)."""
        tab = ttk.Frame(self.tab_control)
        self.tab_control.add(tab, text='用户管理')

        # User list; headings are addressed by display index as in the key tab.
        columns2 = ("#", "用户名", "用户状态", '激活码数', '有效激活码数')
        col_width = [100, 150, 100, 100]
        yscroll2 = tk.Scrollbar(tab, orient=tk.VERTICAL)
        self.users_tree = ttk.Treeview(tab, columns=columns2, height=10, yscrollcommand=yscroll2.set, show="headings")
        yscroll2.config(command=self.users_tree.yview)
        yscroll2.pack(side=tk.RIGHT, fill=tk.Y)
        for i, column in enumerate(columns2):
            self.users_tree.heading(f"#{i}", text=column, anchor=tk.CENTER)
            self.users_tree.column(column=column, width=col_width[i - 1], stretch=True)
        self.users_tree.pack(fill="both", expand=True, pady=10)

        def refresh_users():
            """Reload the user list from the server."""
            try:
                response = requests.get(f"{SERVER_URL}list_users", timeout=self.REQUEST_TIMEOUT)
                users = response.json()["users"]
                self.users_tree.delete(*self.users_tree.get_children())
                for user in users:
                    item = self.users_tree.insert("", "end", values=(
                        user['username'], user['status'], user['total'], user['ycount']
                    ))
                    # Keep the database id in the item's tag for later actions.
                    self.users_tree.item(item, tags=(str(user['id']),))
            except Exception as e:
                messagebox.showerror("错误", str(e))

        def update_user_status():
            """Prompt for a new status and send it for the selected user."""
            selected = self.users_tree.selection()
            if selected:
                user_id = int(self.users_tree.item(selected[0], 'tags')[0])
                new_status = simpledialog.askstring("修改用户状态", "输入新状态(正常/锁定)")
                if new_status:
                    try:
                        requests.post(
                            f"{SERVER_URL}update_user",
                            json=fernet.encrypt(json.dumps({
                                "user_id": user_id,
                                "status": new_status
                            }).encode()).decode(),
                            timeout=self.REQUEST_TIMEOUT
                        )
                        refresh_users()
                    except Exception as e:
                        messagebox.showerror("错误", str(e))

        tk.Button(tab, text="修改用户状态", command=update_user_status). \
            pack(side="left", expand=True, fill="x", padx=5, pady=(5, 10))
        tk.Button(tab, text="刷新用户列表", command=refresh_users). \
            pack(side="left", expand=True, fill="x", padx=5, pady=(5, 10))


if __name__ == "__main__":
    # Script entry point: build the Tk root, attach the admin window and run the event loop.
    root = tk.Tk()
    app = AdminWindow(root)
    root.mainloop()

 

生成测试数据

import hashlib
import uuid
import asyncio
import random

import aiomysql
from cryptography.fernet import Fernet  # 生成加密激活码

# Database settings (same as the server's). The whole mapping is forwarded as
# keyword arguments to aiomysql.create_pool().
DB_CONFIG = dict(
    host='localhost',
    user='root',
    password='root',
    db='key',
    autocommit=True,
    minsize=1,
    maxsize=5,
)

# Encryption key (same as the server's) and the Fernet cipher built from it.
FERNET_KEY = b'NieREysz8S3G-KTXWMV8l81sY5hWIJ-5r_cuAwWubfE='
fernet = Fernet(FERNET_KEY)


async def create_test_data(app_nam: str = None, user_info: tuple = None):
    """Empty the ``activation_keys`` and ``users`` tables and insert test rows.

    Args:
        app_nam: Application name stored on every generated activation key.
        user_info: Iterable of ``(username, password, status)`` tuples, one per
            user to create. When omitted or empty, two default users are
            created: ``user1`` (正常) and ``user2`` (锁定).

    Activation keys are generated for the first user that was created.
    """
    default_users = (("user1", "1", "正常"), ("user2", "1", "锁定"))
    users = tuple(user_info) if user_info else default_users

    async with await aiomysql.create_pool(**DB_CONFIG) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Clear existing rows; keys are deleted first because
                # activation_keys.user_id references users.id.
                await cur.execute("DELETE FROM activation_keys")
                await cur.execute("DELETE FROM users")

                # Create the test users.
                for user in users:
                    await create_user(cur, *user)

                # Generate keys for the first user actually created, instead of
                # assuming a user literally named "user1" is in the list.
                first_user_id = await get_user_id(cur, users[0][0])
                await create_activation_keys(cur, first_user_id, app_nam)

                print("测试数据创建完成!")


async def create_user(cur, username, password, status):
    """Insert one row into ``users`` and print a confirmation line.

    The stored ``password_hash`` is the Fernet-encrypted password, which is
    reversible. The original author marks this as a simplification; a real
    deployment should use a password hash such as bcrypt.

    Args:
        cur: Async DB cursor with an awaitable ``execute``.
        username: Value for the ``username`` column.
        password: Plain-text password to encrypt and store.
        status: Value for the ``status`` column.
    """
    token = fernet.encrypt(password.encode())
    row = (username, token.decode(), status)
    insert_sql = "INSERT INTO users (username, password_hash, status) VALUES (%s, %s, %s)"
    await cur.execute(insert_sql, row)
    print(f"创建用户:{username}")


async def get_user_id(cur, username):
    """Return the ``id`` of the ``users`` row whose username is *username*.

    Args:
        cur: Async DB cursor with awaitable ``execute`` and ``fetchone``.
        username: Exact username to look up.

    Returns:
        The first column of the matching row (the user's ``id``).

    Raises:
        LookupError: If no row matches *username*.
    """
    await cur.execute("SELECT id FROM users WHERE username = %s", (username,))
    row = await cur.fetchone()
    if row is None:
        # Without this guard the failure is an opaque
        # "'NoneType' object is not subscriptable" TypeError.
        raise LookupError(f"user not found: {username!r}")
    return row[0]


async def create_activation_keys(cur, user_id, app_name):
    """Insert one activation key of each type (1-4) for *user_id*.

    Per the ``type`` column comment in the schema, the types are:
    1 multi-machine/multi-IP, 2 multi-machine/single-IP,
    3 single-machine/multi-IP, 4 single-machine/single-IP.
    Every key is inserted with status ``激活`` and the same creation time.

    Args:
        cur: Async DB cursor with an awaitable ``execute``.
        user_id: ``users.id`` of the key owner.
        app_name: Application name stored on each key.
    """
    # Local import: datetime is not among this script's top-level imports.
    from datetime import datetime

    # The statement names six columns, so it needs six parameters; the value
    # for creation_time was previously missing, which made every execute fail.
    created_at = datetime.now()
    for key_type in (1, 2, 3, 4):
        key_code = generate_key_code()
        await cur.execute(
            "INSERT INTO activation_keys (user_id, app_name, key_code, type, status, creation_time) VALUES (%s, %s, %s, %s, %s, %s)",
            (user_id, app_name, key_code, key_type, "激活", created_at)
        )
        print(f"创建激活码:{key_code} (类型{key_type})")


def generate_key_code() -> str:
    """Generate a random activation code.

    Format: ``KEY-`` followed by 4 decimal digits (1000-9999) and 8 uppercase
    hexadecimal characters, 16 characters in total, e.g. ``KEY-4589114ADCAD``.

    Uniqueness is only probabilistic; the UNIQUE index on
    ``activation_keys.key_code`` is what actually rejects a duplicate.
    """
    # Local import: secrets is not among this script's top-level imports.
    # It replaces random.randint because the code acts as a licence credential.
    import secrets

    digits = secrets.randbelow(9000) + 1000  # uniform over 1000..9999
    digest = hashlib.new('md5', str(uuid.uuid4()).encode('utf-8')).hexdigest()
    return f"KEY-{digits}{digest[:8].upper()}"


if __name__ == "__main__":
    # Seed data for "test_app": user1..user4 with status 正常, user5 with 锁定,
    # every one of them with password "1".
    seed_users = tuple(
        (f"user{n}", "1", "锁定" if n == 5 else "正常")
        for n in range(1, 6)
    )
    asyncio.run(create_test_data("test_app", seed_users))

 

配置文件

以上代码是写在一起、相互串联引用的;实际项目中应当拆分为各自独立的模块。

# Secret key
FERNET_KEY = b'NieREysz8S3G-KTXWMV8l81sY5hWIJ-5r_cuAwWubfE='

# Server address
SERVER_URL = "http://localhost:8000/api/"

# Application name
APP_NAME = "test_app"

# Database configuration
DB_CONFIG = dict(
    host='localhost',
    user='root',
    password='root',
    db='key',
    autocommit=True,
    minsize=1,
    maxsize=5,
)

# Plain-text file path
ACTIVATION_FILE = "ACTIVATION"

# SSL certificate and private-key file paths
SSL_CERT_FILE, SSL_KEY_FILE = "./server.crt", "./server.key"

 

数据库

我这里没有添加激活码过期时间,实际应用中肯定是需要的

-- Schema and seed data for the activation-key service.
--
-- Statement order matters because activation_keys.user_id has a foreign key
-- to users.id: the child table is dropped first, the parent table is created
-- and populated first.

DROP TABLE IF EXISTS `activation_keys`;
DROP TABLE IF EXISTS `users`;

-- Accounts that own activation keys.
-- last_login_ip is 45 characters wide so that an IPv6 address fits.
CREATE TABLE `users`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `password_hash` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `status` enum('正常','锁定') CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT '正常',
  `created_at` datetime NULL DEFAULT CURRENT_TIMESTAMP,
  `last_login` datetime NULL DEFAULT NULL,
  `last_login_ip` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `username`(`username` ASC) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

-- One row per activation key.
-- creation_time is the column the test-data script's INSERT writes to and the
-- timestamp carried by the seed rows below; last_used_ip is 45 characters
-- wide so that an IPv6 address fits.
CREATE TABLE `activation_keys`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL COMMENT '用户id',
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'app名',
  `key_code` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '激活码',
  `status` enum('未激活','激活','锁定') CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '未激活' COMMENT '状态',
  `type` int(11) NOT NULL COMMENT '类型;[1,2,3,4];多机器多IP: 1, 多机器单IP: 2,单机器多IP: 3,单机器单IP: 4',
  `last_used_fingerprint` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '指纹',
  `last_used_ip` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '最后登录ip',
  `last_used_time` datetime NULL DEFAULT NULL COMMENT '最后登录时间',
  `creation_time` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `key_code`(`key_code` ASC) USING BTREE,
  INDEX `user_id`(`user_id` ASC) USING BTREE,
  CONSTRAINT `activation_keys_ibfk_1` FOREIGN KEY (`user_id`) REFERENCES `users` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;


-- Seed users (inserted before the keys that reference them).
INSERT INTO `users` VALUES (1, 'user1', 'gAAAAABn5l9H8qWH7KrcOXR4h62TmwD6PHRdB1wp0lA4dZo0j7CUhNySFSuf56EgOR1TpFpWS4GEOFJ0JBcmD7NtfPPsnXmbvg==', '正常', '2025-03-28 16:35:19', NULL, NULL);
INSERT INTO `users` VALUES (2, 'user2', 'gAAAAABn5l9HJKyVasZ2PJQerwFwe3LYN4cNNYarovnhFncOXr5M0PRTqCgGdHOhhoiSRAwUMURCaCphBOBp8YtKoEQqhtKn5w==', '正常', '2025-03-28 16:35:19', NULL, NULL);
INSERT INTO `users` VALUES (3, 'user3', 'gAAAAABn5l9H-i50eRe7hj4L0DeUHf35t4bq_n4Xog7_Y5OrhaM9Sh_Aczz_N7p6azCBuNE6ey8fFWaNPn9Xq86YxS08mlWeLQ==', '正常', '2025-03-28 16:35:19', NULL, NULL);
INSERT INTO `users` VALUES (4, 'user4', 'gAAAAABn5l9Hzegxt_3nDUe0VsE-gfUaeNX2d2PRAoMVBkyj6GJC9KYNgAGfLUxV8oPEFxBWRLgMBingoxOs2yol3Q7dgmMDNQ==', '正常', '2025-03-28 16:35:19', NULL, NULL);
INSERT INTO `users` VALUES (5, 'user5', 'gAAAAABn5l9HzJBW3dgKAbQ4dac8q9re_I2vjGDqj3QHUzwhDjs5LjYkS7PDUZwwnDgAk1Rvzf8kDg6aJFYpN8rEHv7HEHRIsA==', '锁定', '2025-03-28 16:35:19', NULL, NULL);

-- Seed keys. Columns are listed explicitly so the rows cannot drift out of
-- step with the table definition. The original dump carried one more trailing
-- NULL per row, for a column that the table definition does not declare; it
-- is omitted here.
INSERT INTO `activation_keys` (`id`, `user_id`, `app_name`, `key_code`, `status`, `type`, `last_used_fingerprint`, `last_used_ip`, `last_used_time`, `creation_time`) VALUES (1, 1, 'test_app', 'KEY-4589114ADCAD', '激活', 1, NULL, NULL, NULL, '2025-03-28 16:35:19');
INSERT INTO `activation_keys` (`id`, `user_id`, `app_name`, `key_code`, `status`, `type`, `last_used_fingerprint`, `last_used_ip`, `last_used_time`, `creation_time`) VALUES (2, 1, 'test_app', 'KEY-694030F9AB25', '激活', 2, NULL, NULL, NULL, '2025-03-28 16:35:19');
INSERT INTO `activation_keys` (`id`, `user_id`, `app_name`, `key_code`, `status`, `type`, `last_used_fingerprint`, `last_used_ip`, `last_used_time`, `creation_time`) VALUES (3, 1, 'test_app', 'KEY-2245774406B2', '激活', 3, NULL, NULL, NULL, '2025-03-28 16:35:19');
INSERT INTO `activation_keys` (`id`, `user_id`, `app_name`, `key_code`, `status`, `type`, `last_used_fingerprint`, `last_used_ip`, `last_used_time`, `creation_time`) VALUES (4, 1, 'test_app', 'KEY-289219C94AD9', '激活', 4, NULL, NULL, NULL, '2025-03-28 16:35:19');
INSERT INTO `activation_keys` (`id`, `user_id`, `app_name`, `key_code`, `status`, `type`, `last_used_fingerprint`, `last_used_ip`, `last_used_time`, `creation_time`) VALUES (5, 2, 'test_app', 'KEY-89866730F732', '激活', 1, NULL, NULL, NULL, '2025-03-28 16:35:19');
INSERT INTO `activation_keys` (`id`, `user_id`, `app_name`, `key_code`, `status`, `type`, `last_used_fingerprint`, `last_used_ip`, `last_used_time`, `creation_time`) VALUES (6, 2, 'test_app', 'KEY-80079478EC59', '激活', 2, NULL, NULL, NULL, '2025-03-28 16:35:19');
INSERT INTO `activation_keys` (`id`, `user_id`, `app_name`, `key_code`, `status`, `type`, `last_used_fingerprint`, `last_used_ip`, `last_used_time`, `creation_time`) VALUES (7, 2, 'test_app', 'KEY-2027C0F250E0', '激活', 3, NULL, NULL, NULL, '2025-03-28 16:35:19');
INSERT INTO `activation_keys` (`id`, `user_id`, `app_name`, `key_code`, `status`, `type`, `last_used_fingerprint`, `last_used_ip`, `last_used_time`, `creation_time`) VALUES (8, 2, 'test_app', 'KEY-8778C8E997C2', '激活', 4, NULL, NULL, NULL, '2025-03-28 16:35:19');

 

 

YXN-python

2024-12-28