diff --git a/requirements.txt b/requirements.txt index 550d49f..e518254 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ zmq>=0.0.0 pywin32>=311 loguru>=0.7.3 debugpy>=1.8.20 +PyQt5-stubs>=5.15.6.0 diff --git a/src/lib_zmq_plugins/client/zmq_client.py b/src/lib_zmq_plugins/client/zmq_client.py index ded70e5..9ddbd64 100644 --- a/src/lib_zmq_plugins/client/zmq_client.py +++ b/src/lib_zmq_plugins/client/zmq_client.py @@ -104,31 +104,6 @@ def connect(self) -> None: # 启动后立即发一次心跳作为探测 self._probe_connection() - @property - def is_connected(self) -> bool: - """当前连接状态""" - return self._is_connected - - @property - def reconnect_count(self) -> int: - """重连次数""" - return self._reconnect_count - - @property - def endpoint(self) -> str: - """当前端点地址""" - return self._endpoint - - @property - def pub_endpoint(self) -> str: - """PUB 端点地址""" - return self._pub_endpoint - - @property - def ctrl_endpoint(self) -> str: - """CTRL 端点地址""" - return self._ctrl_endpoint - def disconnect(self) -> None: self._stopped.set() if self._thread and self._thread.is_alive(): @@ -145,6 +120,8 @@ def _create_sockets(self) -> None: return self._sub_socket = self._ctx.socket(zmq.SUB) self._dealer_socket = self._ctx.socket(zmq.DEALER) + if self._dealer_socket is None: + return self._dealer_socket.setsockopt_string(zmq.IDENTITY, uuid.uuid4().hex) # 启用 ZMQ 原生心跳,自动检测服务端断开 @@ -154,7 +131,8 @@ def _create_sockets(self) -> None: self._dealer_socket.setsockopt(zmq.HEARTBEAT_IVL, 5000) self._dealer_socket.setsockopt(zmq.HEARTBEAT_TIMEOUT, 5000) self._dealer_socket.setsockopt(zmq.HEARTBEAT_TTL, 10000) - + if self._sub_socket is None: + return self._sub_socket.connect(self._pub_endpoint) self._dealer_socket.connect(self._ctrl_endpoint) @@ -202,7 +180,8 @@ def _request_snapshot(self, topic: str) -> None: try: self._dealer_socket.send(payload) except zmq.ZMQError: - self._log.warning("Failed to send sync request for topic: %s", topic) + self._log.warning( + "Failed to send sync request for topic: %s", topic) self._sync_topics.pop(rid, None) # ── 指令发送(可在任意线程调用) ── @@ -231,7 +210,7 @@ def request( def _poll_loop(self) -> None: while not self._stopped.is_set(): try: - events = self._poller.poll(timeout=200) + events = self._poller.poll(timeout=200) # type: ignore except zmq.ZMQError: self._handle_reconnect(0.1) continue @@ -275,7 +254,7 @@ def _probe_connection(self) -> bool: def _handle_sub_message(self) -> None: try: - msg = self._sub_socket.recv_multipart(zmq.NOBLOCK) + msg = self._sub_socket.recv_multipart(zmq.NOBLOCK) # type: ignore except zmq.Again: return if len(msg) < 2: @@ -293,7 +272,8 @@ def _handle_sub_message(self) -> None: def _handle_dealer_message(self) -> None: try: - msg = self._dealer_socket.recv_multipart(zmq.NOBLOCK) + msg = self._dealer_socket.recv_multipart( # type: ignore + zmq.NOBLOCK) except zmq.Again: return if len(msg) < 2: @@ -312,7 +292,8 @@ def _handle_dealer_message(self) -> None: snapshot = self._serializer.decode_event(resp.data) self._notify_subscribers(topic, snapshot) except Exception: - self._log.warning("Failed to decode snapshot", exc_info=True) + self._log.warning( + "Failed to decode snapshot", exc_info=True) else: self._sync_topics.pop(resp.request_id, None) diff --git a/src/lib_zmq_plugins/serializer.py b/src/lib_zmq_plugins/serializer.py index e66a8ab..f11ec93 100644 --- a/src/lib_zmq_plugins/serializer.py +++ b/src/lib_zmq_plugins/serializer.py @@ -29,7 +29,7 @@ def _make_union(types: list[type]) -> type: """将类型列表转为 Union 类型,供 msgspec 多态反序列化使用""" if len(types) == 1: return types[0] - return Union[tuple(types)] + return Union[tuple(types)] # type: ignore class Serializer: diff --git a/src/lib_zmq_plugins/server/zmq_server.py b/src/lib_zmq_plugins/server/zmq_server.py index 1b89858..3051e53 100644 --- a/src/lib_zmq_plugins/server/zmq_server.py +++ b/src/lib_zmq_plugins/server/zmq_server.py @@ -46,7 +46,8 @@ def __init__(self, endpoint: str, log_handler: LogHandler | None = None) -> None self._pub_endpoint, self._ctrl_endpoint = _derive_endpoints(endpoint) self._serializer = Serializer() self._serializer.register_command_types(SyncCommand) - self._handlers: dict[str, Callable[[BaseCommand], CommandResponse | None]] = {} + self._handlers: dict[str, Callable[[ + BaseCommand], CommandResponse | None]] = {} self._snapshot_providers: dict[str, Callable[[], BaseEvent]] = {} self._log: LogHandler = log_handler or NullHandler() @@ -89,7 +90,8 @@ def start(self) -> None: self._ctx = zmq.Context() self._pub_socket = self._ctx.socket(zmq.PUB) self._router_socket = self._ctx.socket(zmq.ROUTER) - + if self._pub_socket is None or self._router_socket is None: + raise RuntimeError("Failed to create socket") # 启用 ZMQ 原生心跳,与客户端匹配 # HEARTBEAT_IVL: 每 5 秒发送心跳 # HEARTBEAT_TIMEOUT: 5 秒内没收到回复视为断连 @@ -108,7 +110,8 @@ def start(self) -> None: self._thread = threading.Thread(target=self._poll_loop, daemon=True) self._thread.start() - self._log.info("Server started: pub=%s, ctrl=%s", self._pub_endpoint, self._ctrl_endpoint) + self._log.info("Server started: pub=%s, ctrl=%s", + self._pub_endpoint, self._ctrl_endpoint) def stop(self) -> None: self._stopped.set() @@ -141,14 +144,15 @@ def publish(self, topic: type[BaseEvent], event: BaseEvent) -> None: def _poll_loop(self) -> None: while not self._stopped.is_set(): try: - events = self._poller.poll(timeout=100) + events = self._poller.poll(timeout=100) # type: ignore except zmq.ZMQError: break for socket, _ in events: if socket is self._router_socket: try: - msg = self._router_socket.recv_multipart(zmq.NOBLOCK) + msg = self._router_socket.recv_multipart( # type: ignore + zmq.NOBLOCK) except zmq.Again: continue if len(msg) < 2: @@ -158,7 +162,8 @@ def _poll_loop(self) -> None: try: cmd = self._serializer.decode_command(payload) except Exception: - self._log.warning("Failed to decode command", exc_info=True) + self._log.warning( + "Failed to decode command", exc_info=True) continue self._dispatch(client_id, cmd) @@ -167,11 +172,12 @@ def _dispatch(self, client_id: bytes, cmd: BaseCommand) -> None: if isinstance(tag, type): tag = tag.__name__ tag = str(tag) - - self._log.info("[Server] 收到命令: tag=%s, request_id=%s", tag, cmd.request_id) + + self._log.info("[Server] 收到命令: tag=%s, request_id=%s", + tag, cmd.request_id) if tag == "__sync__": - self._handle_sync(client_id, cmd) + self._handle_sync(client_id, cmd) # type: ignore return handler = self._handlers.get(tag) @@ -181,7 +187,8 @@ def _dispatch(self, client_id: bytes, cmd: BaseCommand) -> None: try: result = handler(cmd) - self._log.info("[Server] handler 执行完成: tag=%s, result=%s", tag, result) + self._log.info( + "[Server] handler 执行完成: tag=%s, result=%s", tag, result) except Exception as e: self._log.error("Handler error for %s: %s", tag, e, exc_info=True) if cmd.request_id: @@ -211,7 +218,8 @@ def _handle_sync(self, client_id: bytes, cmd: SyncCommand) -> None: request_id=cmd.request_id, success=True, data=payload ) except Exception as e: - self._log.error("Snapshot provider error for %s: %s", topic, e, exc_info=True) + self._log.error( + "Snapshot provider error for %s: %s", topic, e, exc_info=True) resp = CommandResponse( request_id=cmd.request_id, success=False, error=str(e) ) @@ -225,4 +233,5 @@ def _send_to_client(self, client_id: bytes, resp: CommandResponse) -> None: [client_id, b"", self._serializer.encode_response(resp)] ) except zmq.ZMQError: - self._log.warning("Failed to send response to client", exc_info=True) + self._log.warning( + "Failed to send response to client", exc_info=True) diff --git a/src/mineSweeperGUI.py b/src/mineSweeperGUI.py index ce6ac5a..ea56818 100644 --- a/src/mineSweeperGUI.py +++ b/src/mineSweeperGUI.py @@ -278,7 +278,7 @@ def game_state(self, game_state: str): current_status=state_map.get(game_state, 0), ) GameServerBridge.instance().send_event(event) - self._send_board_update_event() + self._send_board_update_event() @property def row(self): diff --git a/src/plugin_manager/config_widget.py b/src/plugin_manager/config_widget.py index 3b4e6a4..b7568bb 100644 --- a/src/plugin_manager/config_widget.py +++ b/src/plugin_manager/config_widget.py @@ -67,6 +67,10 @@ def _setup_ui(self) -> None: return for name, config_field in fields.items(): + # 跳过不可见的配置项(用于插件存储私有数据) + if not config_field.visible: + continue + # 使用 config_field 自己的 create_widget 方法 widget = config_field.create_widget() diff --git a/src/plugin_manager/plugin_loader.py b/src/plugin_manager/plugin_loader.py index 752b0dc..eb923a9 100644 --- a/src/plugin_manager/plugin_loader.py +++ b/src/plugin_manager/plugin_loader.py @@ -28,7 +28,7 @@ class PluginLoader: - 实例化插件类 """ - def __init__(self, plugin_dirs: list[str | Path] | None = None): + def __init__(self, plugin_dirs: list[Path] | None = None): self._plugin_dirs: list[Path] = [] self._added_paths: set[Path] = set() # 已添加到 sys.path 的目录 if plugin_dirs: diff --git a/src/plugin_manager/plugin_manager.py b/src/plugin_manager/plugin_manager.py index 68726b3..e73ae5c 100644 --- a/src/plugin_manager/plugin_manager.py +++ b/src/plugin_manager/plugin_manager.py @@ -6,6 +6,8 @@ """ from __future__ import annotations +import atexit +import signal import threading from pathlib import Path from typing import TYPE_CHECKING, Any @@ -48,18 +50,21 @@ def critical(self, msg: str, /, *args: object, **kwargs: object) -> None: class PluginManager: """ 插件管理器 - + 核心设计: - 所有插件共享一个 ZMQClient - 事件通过 EventDispatcher 内部分发给各插件 - 支持动态加载插件 - 拥有独立的 PyQt 主窗口 """ - + + # 共享的 Manager 实例(用于信号处理器和 atexit 回调) + _instance: "PluginManager | None" = None + def __init__( self, endpoint: str, - plugin_dirs: list[str | Path] | None = None, + plugin_dirs: list[Path] | None = None, log_handler: LogHandler | None = None, ): self._endpoint = endpoint @@ -81,155 +86,195 @@ def __init__( ) self._dispatcher = EventDispatcher() self._loader = PluginLoader(plugin_dirs) - + # 插件管理 self._plugins: dict[str, BasePlugin] = {} self._plugins_lock = threading.RLock() - + # 主窗口 self._main_window = None self._app = None - + # 注册类型 self._client.register_event_types(*EVENT_TYPES) self._client.register_command_types(*COMMAND_TYPES) - + self._started = False - + self._shutdown_requested = False + + # 注册自身为共享实例 + PluginManager._instance = self + + # 注册信号处理和 atexit + self._register_shutdown_handlers() + # ═══════════════════════════════════════════════════════════════════ # 属性 # ═══════════════════════════════════════════════════════════════════ - + @property def client(self) -> ZMQClient: return self._client - + @property def dispatcher(self) -> EventDispatcher: return self._dispatcher - + @property def plugins(self) -> dict[str, BasePlugin]: return self._plugins.copy() - + @property def main_window(self): return self._main_window - + @property def is_connected(self) -> bool: """当前连接状态""" return self._client.is_connected - + @property def reconnect_count(self) -> int: """重连次数""" return self._client.reconnect_count - + @property def connection_endpoint(self) -> str: """连接端点地址""" return self._endpoint - + # ═══════════════════════════════════════════════════════════════════ # 生命周期 # ═══════════════════════════════════════════════════════════════════ - + + def _register_shutdown_handlers(self) -> None: + """注册信号处理和 atexit 回调,确保优雅退出""" + + def handle_signal(signum: int, frame) -> None: + sig_name = signal.Signals(signum).name + logger.info(f"收到信号 {sig_name},开始优雅关闭...") + self._graceful_shutdown() + + # 注册 SIGTERM 信号处理(父进程 terminate() 会发送此信号) + signal.signal(signal.SIGTERM, handle_signal) + + # 注册 SIGINT 信号处理(Ctrl+C) + signal.signal(signal.SIGINT, handle_signal) + + # 注册 atexit 回调作为兜底 + atexit.register(self._graceful_shutdown) + + def _graceful_shutdown(self) -> None: + """优雅关闭:调用所有插件的 shutdown 方法,然后清理资源""" + if self._shutdown_requested: + return + self._shutdown_requested = True + + logger.info("开始优雅关闭...") + + # 1. 关闭所有插件(让插件保存状态等) + self._shutdown_plugins() + + # 2. 断开 ZMQ 连接 + try: + self._client.disconnect() + except Exception as e: + logger.error(f"断开 ZMQ 连接失败: {e}") + + # 3. 清理事件分发器 + self._dispatcher.clear() + + self._started = False + logger.info("优雅关闭完成") + def start(self) -> None: """启动插件管理器(后台模式,无界面)""" if self._started: return - + self._load_plugins() self._client.connect() self._setup_zmq_subscriptions() self._initialize_plugins() self._validate_control_authorizations() - + self._started = True logger.info("Plugin manager started") - + def _validate_control_authorizations(self) -> None: """验证控制授权配置,清除无效插件的授权""" from plugin_sdk.control_auth import ControlAuthorizationManager - + auth_manager = ControlAuthorizationManager.instance() - + # 获取已加载的插件名称 loaded_plugins = set(self._plugins.keys()) - + # 验证并清除无效授权 removed = auth_manager.validate_authorizations(loaded_plugins) - + if removed: logger.warning(f"控制授权已清除(插件未加载): {removed}") - + # 保存更新后的配置 auth_manager.save() - + def stop(self) -> None: - """停止插件管理器""" - if not self._started: - return - - self._shutdown_plugins() - self._client.disconnect() - self._dispatcher.clear() - - self._started = False - logger.info("Plugin manager stopped") - - def start_with_gui(self, app: QApplication = None, *, show_main_window: bool = True) -> None: + """停止插件管理器(调用优雅关闭)""" + self._graceful_shutdown() + + def start_with_gui(self, app: QApplication = None, *, show_main_window: bool = True) -> None: # type: ignore """ 启动插件管理器并显示主界面 - + Args: app: QApplication 实例,如果不提供则创建新的 show_main_window: 是否显示主窗口(False 时仅在托盘运行) """ from PyQt5.QtWidgets import QApplication from .main_window import PluginManagerWindow - + # 创建或使用现有的 QApplication if app is None: app = QApplication.instance() if app is None: app = QApplication([]) - + self._app = app - + # 启动核心功能 self.start() - + # 创建主窗口(始终创建以支持托盘图标) self._main_window = PluginManagerWindow(self) self._main_window.setWindowTitle(f"插件管理器 - {self._endpoint}") if show_main_window: self._main_window.show() - - logger.info(f"Plugin manager started with GUI (window={show_main_window})") - + + logger.info( + f"Plugin manager started with GUI (window={show_main_window})") + def exec_gui(self, *, show_main_window: bool = True) -> int: """ 启动 GUI 事件循环 - + Args: show_main_window: 是否显示主窗口(False 时仅在托盘运行) - + Returns: 退出代码 """ if self._app is None: self.start_with_gui(show_main_window=show_main_window) - - result = self._app.exec_() + + result = self._app.exec_() # type: ignore self.stop() return result - + # ═══════════════════════════════════════════════════════════════════ # ZMQ 订阅(使用事件类) # ═══════════════════════════════════════════════════════════════════ - + def _setup_zmq_subscriptions(self) -> None: """设置 ZMQ 事件订阅""" for event_type in EVENT_TYPES: @@ -239,25 +284,25 @@ def _setup_zmq_subscriptions(self) -> None: event_type, lambda event, t=tag: self._dispatcher.dispatch(t, event), ) - + # ═══════════════════════════════════════════════════════════════════ # 插件管理 # ═══════════════════════════════════════════════════════════════════ - + def add_plugin_dir(self, path: str | Path) -> None: self._loader.add_plugin_dir(path) - + def _load_plugins(self) -> None: plugins = self._loader.load_all() - + with self._plugins_lock: for plugin in plugins: self._plugins[plugin.name] = plugin plugin.set_client(self._client) plugin.set_event_dispatcher(self._dispatcher) - + logger.info(f"Loaded {len(plugins)} plugin(s)") - + def _initialize_plugins(self) -> None: with self._plugins_lock: for plugin in self._plugins.values(): @@ -266,8 +311,9 @@ def _initialize_plugins(self) -> None: plugin.initialize() logger.info(f"Initialized plugin: {plugin.name}") except Exception as e: - logger.error(f"Failed to initialize plugin {plugin.name}: {e}", exc_info=True) - + logger.error( + f"Failed to initialize plugin {plugin.name}: {e}", exc_info=True) + def _shutdown_plugins(self) -> None: with self._plugins_lock: for plugin in self._plugins.values(): @@ -275,29 +321,30 @@ def _shutdown_plugins(self) -> None: plugin.shutdown() logger.info(f"Shutdown plugin: {plugin.name}") except Exception as e: - logger.error(f"Failed to shutdown plugin {plugin.name}: {e}", exc_info=True) - + logger.error( + f"Failed to shutdown plugin {plugin.name}: {e}", exc_info=True) + def get_plugin(self, name: str) -> BasePlugin | None: return self._plugins.get(name) - + def enable_plugin(self, name: str) -> bool: plugin = self._plugins.get(name) if plugin: plugin.enable() return True return False - + def disable_plugin(self, name: str) -> bool: plugin = self._plugins.get(name) if plugin: plugin.disable() return True return False - + # ═══════════════════════════════════════════════════════════════════ # 界面管理 # ═══════════════════════════════════════════════════════════════════ - + def get_plugin_widgets(self) -> dict[str, QWidget]: widgets = {} with self._plugins_lock: @@ -305,21 +352,21 @@ def get_plugin_widgets(self) -> dict[str, QWidget]: if plugin.widget: widgets[name] = plugin.widget return widgets - + # ═══════════════════════════════════════════════════════════════════ # ZMQ 回调 # ═══════════════════════════════════════════════════════════════════ - + def _on_connected(self) -> None: logger.info("Connected to main process") if self._main_window: self._main_window.set_connected(True) - + def _on_disconnected(self) -> None: logger.warning("Disconnected from main process") if self._main_window: self._main_window.set_connected(False) - + def __repr__(self) -> str: return f"" @@ -332,18 +379,18 @@ def run_plugin_manager_process( ) -> int: """ 在独立进程中运行插件管理器 - + Args: endpoint: ZMQ Server 地址 plugin_dirs: 插件目录列表 with_gui: 是否显示界面(False 为完全无界面后台模式) show_main_window: 是否显示主窗口(False 时 GUI 仅显示托盘图标) - + Returns: 退出代码 """ manager = PluginManager( - endpoint=endpoint, plugin_dirs=plugin_dirs, log_handler=_LogHandler() + endpoint=endpoint, plugin_dirs=plugin_dirs, log_handler=_LogHandler() # type: ignore ) try: diff --git a/src/plugin_sdk/config_types/base_config.py b/src/plugin_sdk/config_types/base_config.py index 3d4ed68..1c541e2 100644 --- a/src/plugin_sdk/config_types/base_config.py +++ b/src/plugin_sdk/config_types/base_config.py @@ -11,7 +11,7 @@ from typing import Any, Callable, ClassVar, Generic, TypeVar from PyQt5.QtWidgets import QWidget -from PyQt5.QtCore import pyqtSignal, QObject +from PyQt5.QtCore import pyqtBoundSignal, pyqtSignal, QObject T = TypeVar("T") @@ -49,7 +49,7 @@ def __init__( widget: QWidget, getter: Callable[[], Any], setter: Callable[[Any], None], - signal: QObject, + signal: pyqtBoundSignal, parent: QWidget | None = None, ): super().__init__(parent) @@ -89,6 +89,7 @@ class BaseConfig(ABC, Generic[T]): label: 显示标签 description: tooltip 提示 validator: 自定义验证函数 + visible: 是否在 UI 中展示(默认 True,设为 False 可隐藏) 类属性: widget_type: UI 控件类型标识,由工厂使用 @@ -98,6 +99,7 @@ class BaseConfig(ABC, Generic[T]): label: str = "" description: str = "" validator: Callable[[T], bool] | None = None + visible: bool = True # 是否在 UI 中展示 # 类变量:用于 UI 工厂识别 widget_type: ClassVar[str] = "base" diff --git a/src/plugin_sdk/plugin_base.py b/src/plugin_sdk/plugin_base.py index ab2c89e..9eefbe1 100644 --- a/src/plugin_sdk/plugin_base.py +++ b/src/plugin_sdk/plugin_base.py @@ -10,8 +10,11 @@ """ from __future__ import annotations +from pathlib import Path + + from .service_registry import ServiceNotFoundError -from lib_zmq_plugins.shared.base import BaseEvent, get_event_tag +from lib_zmq_plugins.shared.base import BaseEvent, CommandResponse, get_event_tag from PyQt5.QtGui import QIcon, QPixmap, QPainter, QPen, QColor, QBrush, QFont from PyQt5.QtCore import Qt, QThread, QObject, pyqtSignal, pyqtSlot @@ -29,7 +32,7 @@ if TYPE_CHECKING: from .config_types import OtherInfoBase - + from plugin_manager.logging_setup import LogConfig if TYPE_CHECKING: from PyQt5.QtGui import QIcon @@ -77,8 +80,8 @@ def make_plugin_icon( p.setBrush(Qt.NoBrush) # type: ignore[attr-defined] font = QFont("Segoe UI Emoji", int(size * 0.44), QFont.Bold) p.setFont(font) - p.drawText(pix.rect(), Qt.AlignCenter | Qt.AlignVCenter, - symbol) # type: ignore[attr-defined] + p.drawText(pix.rect(), Qt.AlignCenter | Qt.AlignVCenter, # type: ignore[attr-defined] + symbol) p.end() return QIcon(pix) @@ -249,7 +252,7 @@ def __init__(self, info: PluginInfo): # 连接 gui_call 信号到槽(QueuedConnection 跨线程安全) self.gui_call.connect( - self._on_gui_call, Qt.ConnectionType.QueuedConnection) + self._on_gui_call, Qt.ConnectionType.QueuedConnection) # type: ignore # 每个插件拥有独立的 loguru logger(日志写入 plugins/.log) from plugin_manager.logging_setup import get_plugin_logger diff --git a/src/plugin_sdk/server_bridge.py b/src/plugin_sdk/server_bridge.py index 720ead3..0347120 100644 --- a/src/plugin_sdk/server_bridge.py +++ b/src/plugin_sdk/server_bridge.py @@ -7,13 +7,13 @@ # 初始化 bridge = GameServerBridge.instance() - + # 注册指令处理器(在 start 之前) bridge.register_handler(NewGameCommand, my_handler) - + # 启动服务 bridge.start() - + # 发送事件 bridge.send_event(event) """ @@ -44,24 +44,25 @@ class GameServerBridge(QObject): """ 游戏服务端桥接器(全局单例) - + 只负责 ZMQ 通信层封装,不绑定任何业务逻辑。 指令处理器由外部注册。 - + 处理器自动在主线程中执行(通过信号槽机制)。 """ - + # 内部信号:用于调度到主线程 - _execute_signal = pyqtSignal(object, object, object) # (handler, cmd, future_or_none) - + # (handler, cmd, future_or_none) + _execute_signal = pyqtSignal(object, object, object) + _instance: GameServerBridge | None = None - + def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = QObject.__new__(cls) cls._instance._initialized = False return cls._instance - + @classmethod def instance( cls, @@ -70,11 +71,11 @@ def instance( ) -> GameServerBridge: """ 获取全局单例 - + Args: endpoint: ZMQ端点地址 log_handler: 日志处理器 - + Returns: GameServerBridge 实例 """ @@ -99,22 +100,23 @@ def __init__( self._endpoint = endpoint self._server = ZMQServer(endpoint=endpoint, log_handler=log_handler) - + # 保存主线程引用 self._main_thread = threading.main_thread() - + # 保存处理器 self._handlers: dict[str, Callable] = {} - + # 连接内部信号 - self._execute_signal.connect(self._on_execute, Qt.QueuedConnection) + self._execute_signal.connect( + self._on_execute, Qt.QueuedConnection) # type: ignore # 注册类型 self._server.register_event_types(*EVENT_TYPES) self._server.register_command_types(*COMMAND_TYPES) - + self._initialized = True - + @property def endpoint(self) -> str: return self._endpoint @@ -123,7 +125,7 @@ def endpoint(self) -> str: def server(self) -> ZMQServer: """获取底层 ZMQ Server 实例""" return self._server - + def _on_execute( self, handler: Callable, @@ -147,29 +149,29 @@ def register_handler( ) -> None: """ 注册指令处理器 - + 处理器会自动在主线程中执行。 - + Args: command_type: 指令类型 handler: 处理函数,接收指令,返回响应 - + Usage:: - + def handle_new_game(cmd: NewGameCommand) -> CommandResponse: # 处理逻辑(cmd 类型被正确推断) return CommandResponse(request_id=cmd.request_id, success=True) - + bridge.register_handler(NewGameCommand, handle_new_game) """ # 获取 tag tag = command_type.__struct_config__.tag str_tag = str(tag) logger.info(f"注册处理器: tag={tag}, command_type={command_type.__name__}") - + # 保存 handler self._handlers[str_tag] = handler - + # 注册到 server def wrapped_handler(cmd: _C) -> CommandResponse | None: if cmd.request_id: @@ -181,8 +183,9 @@ def wrapped_handler(cmd: _C) -> CommandResponse | None: # 异步命令:不等待结果 self._execute_signal.emit(handler, cmd, None) return None - - self._server.register_handler(command_type, wrapped_handler) + + self._server.register_handler( + command_type, wrapped_handler) # type: ignore logger.info(f"处理器已注册: tag={tag}") def start(self) -> None: @@ -194,11 +197,11 @@ def stop(self) -> None: """停止服务""" self._server.stop() logger.info("Game server bridge stopped") - + def send_event(self, event: BaseEvent) -> None: """ 发送事件到客户端 - + Args: event: 事件对象 """ diff --git a/src/plugins/hello_world.py b/src/plugins/hello_world.py deleted file mode 100644 index eebeeaf..0000000 --- a/src/plugins/hello_world.py +++ /dev/null @@ -1,360 +0,0 @@ -""" -Hello World 示例插件 - -演示基本的事件订阅、pyqtSignal 跨线程 GUI 更新。 -""" -from __future__ import annotations - -from typing import Any - -from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QTextEdit, QDial -from PyQt5.QtCore import pyqtSignal, Qt - -from plugin_sdk import ( - BasePlugin, - PluginInfo, - make_plugin_icon, - WindowMode, - OtherInfoBase, - BoolConfig, - IntConfig, - FloatConfig, - ChoiceConfig, - TextConfig, - ColorConfig, - FileConfig, - PathConfig, - LongTextConfig, - RangeConfig, - BaseConfig, - ConfigWidgetBase, - ConfigWidgetWrapper, -) -from shared_types.events import VideoSaveEvent - - -# ═══════════════════════════════════════════════════════════════════ -# 自定义配置类型示例 -# ═══════════════════════════════════════════════════════════════════ - - -class DialConfig(BaseConfig[int]): - """ - 自定义配置类型: 旋钮控件 → QDial - - 演示如何继承 BaseConfig 创建自定义配置类型 - """ - widget_type = "dial" - - def __init__( - self, - default: int = 50, - label: str = "", - min_value: int = 0, - max_value: int = 100, - notch_step: int = 10, - **kwargs, - ): - super().__init__(default, label, **kwargs) - self.min_value = min_value - self.max_value = max_value - self.notch_step = notch_step - - def create_widget(self) -> ConfigWidgetBase: - """创建 QDial 控件""" - widget = QDial() - widget.setRange(self.min_value, self.max_value) - widget.setValue(int(self.default)) - widget.setNotchesVisible(True) - widget.setSingleStep(self.notch_step) - widget.setPageStep(self.notch_step * 2) - widget.setMinimumSize(60, 60) - widget.setMaximumSize(100, 100) - - if self.description: - widget.setToolTip(self.description) - - return ConfigWidgetWrapper(widget, widget.value, widget.setValue, widget.valueChanged) - - def to_storage(self, value: int) -> int: - return int(value) - - def from_storage(self, data: Any) -> int: - try: - return int(data) - except (ValueError, TypeError): - return int(self.default) - - -# ═══════════════════════════════════════════════════════════════════ -# 插件配置 - 包含所有配置类型示例 -# ═══════════════════════════════════════════════════════════════════ - - -class HelloPluginConfig(OtherInfoBase): - """ - Hello World 插件配置 - - 包含所有支持的配置类型示例 - """ - - # ── BoolConfig: 布尔值 → QCheckBox ──────────────── - enable_auto_log = BoolConfig( - default=True, - label="自动记录日志", - description="收到事件时自动记录到日志", - ) - show_timestamp = BoolConfig( - default=True, - label="显示时间戳", - description="在日志中显示时间戳", - ) - - # ── IntConfig: 整数 → QSpinBox ──────────────────── - max_log_lines = IntConfig( - default=100, - label="最大日志行数", - description="保留的最大日志行数", - min_value=10, - max_value=1000, - step=10, - ) - refresh_interval = IntConfig( - default=5, - label="刷新间隔(秒)", - description="自动刷新的间隔时间", - min_value=1, - max_value=60, - ) - - # ── FloatConfig: 浮点数 → QDoubleSpinBox ────────── - min_rtime_filter = FloatConfig( - default=0.0, - label="最小时间筛选(秒)", - description="只记录大于此时间的游戏", - min_value=0.0, - max_value=999.0, - decimals=2, - ) - zoom_factor = FloatConfig( - default=1.0, - label="缩放因子", - description="UI 缩放比例", - min_value=0.5, - max_value=2.0, - step=0.1, - decimals=1, - ) - - # ── ChoiceConfig: 选择 → QComboBox ──────────────── - log_level = ChoiceConfig( - default="INFO", - label="日志级别", - description="日志显示级别", - choices=[ - ("DEBUG", "DEBUG"), - ("INFO", "INFO"), - ("WARNING", "WARNING"), - ("ERROR", "ERROR"), - ], - ) - display_mode = ChoiceConfig( - default="compact", - label="显示模式", - choices=[ - ("compact", "紧凑"), - ("detailed", "详细"), - ("minimal", "极简"), - ], - ) - - # ── TextConfig: 文本 → QLineEdit ────────────────── - player_name = TextConfig( - default="", - label="玩家名称", - placeholder="输入玩家名称...", - ) - api_token = TextConfig( - default="", - label="API Token", - description="用于远程同步的认证令牌", - password=True, - placeholder="输入密钥...", - ) - - # ── ColorConfig: 颜色 → 颜色选择按钮 ────────────── - theme_color = ColorConfig( - default="#4CAF50", - label="主题颜色", - description="插件的主题颜色", - ) - highlight_color = ColorConfig( - default="#FF5722", - label="高亮颜色", - description="重要信息的高亮颜色", - ) - - # ── FileConfig: 文件 → 文件选择器 ──────────────── - export_file = FileConfig( - default="", - label="导出文件", - description="日志导出文件路径", - filter="Text Files (*.txt);;JSON Files (*.json)", - save_mode=True, - ) - import_file = FileConfig( - default="", - label="导入文件", - description="导入配置文件", - filter="JSON Files (*.json)", - ) - - # ── PathConfig: 目录 → 目录选择器 ──────────────── - log_directory = PathConfig( - default="", - label="日志目录", - description="日志文件保存目录", - ) - cache_directory = PathConfig( - default="", - label="缓存目录", - description="临时缓存文件目录", - ) - - # ── LongTextConfig: 多行文本 → QTextEdit ──────── - welcome_message = LongTextConfig( - default="欢迎使用 Hello World 插件!", - label="欢迎消息", - placeholder="输入欢迎消息...", - max_height=80, - ) - custom_script = LongTextConfig( - default="", - label="自定义脚本", - description="自定义处理脚本(Python 代码)", - placeholder="# 在此输入 Python 代码...", - max_height=120, - ) - - # ── RangeConfig: 数值范围 → 两个 QSpinBox ────── - rtime_range = RangeConfig( - default=(0, 300), - label="时间范围(秒)", - description="只记录此时间范围内的游戏", - min_value=0, - max_value=999, - ) - bbbv_range = RangeConfig( - default=(0, 999), - label="3BV 范围", - description="只记录此 3BV 范围内的游戏", - min_value=0, - max_value=9999, - ) - - # ── 自定义配置类型: DialConfig → QDial ───────────── - volume = DialConfig( - default=50, - label="音量", - description="自定义旋钮控件示例", - min_value=0, - max_value=100, - notch_step=10, - ) - sensitivity = DialConfig( - default=5, - label="灵敏度", - description="响应灵敏度设置", - min_value=1, - max_value=10, - notch_step=1, - ) - - -# ═══════════════════════════════════════════════════════════════════ -# UI 组件 -# ═══════════════════════════════════════════════════════════════════ - - -class HelloWidget(QWidget): - """简单的 UI 界面""" - - _update_signal = pyqtSignal(str) - - def __init__(self, parent=None): - super().__init__(parent) - self._count = 0 - - layout = QVBoxLayout(self) - - self._title = QLabel("Hello World Plugin") - self._title.setStyleSheet("font-size: 18px; font-weight: bold; padding: 10px;") - layout.addWidget(self._title) - - self._info = QLabel("Waiting for game data...") - layout.addWidget(self._info) - - self._log = QTextEdit() - self._log.setReadOnly(True) - layout.addWidget(self._log) - - self._update_signal.connect(self._append_log) - - def _append_log(self, text: str): - """Slot: executed on main thread""" - self._log.append(text) - self._count += 1 - self._info.setText(f"Received {self._count} record(s)") - - -# ═══════════════════════════════════════════════════════════════════ -# 插件主体 -# ═══════════════════════════════════════════════════════════════════ - - -class HelloPlugin(BasePlugin): - - @classmethod - def plugin_info(cls) -> PluginInfo: - return PluginInfo( - name="hello_world", - version="1.0.0", - author="Example", - description="Hello World - demonstrates event subscription and pyqtSignal GUI update", - icon=make_plugin_icon("#4CAF50", "H", 64), - window_mode=WindowMode.TAB, - other_info=HelloPluginConfig, - ) - - def _setup_subscriptions(self) -> None: - self.subscribe(VideoSaveEvent, self._on_video_save) - - def _create_widget(self) -> QWidget: - self._widget = HelloWidget() - return self._widget - - def on_initialized(self) -> None: - self.logger.info("HelloPlugin initialized") - if self.other_info: - self.logger.info(f"配置: {self.other_info.to_dict()}") - # 连接配置变化信号 - self.config_changed.connect(self._on_config_changed) - - def _on_config_changed(self, name: str, value) -> None: - """配置变化时的回调""" - self.logger.info(f"配置变化: {name} = {value}") - - def on_shutdown(self) -> None: - self.logger.info("HelloPlugin shutting down") - - def _on_video_save(self, event: VideoSaveEvent): - self.logger.info( - f"Game: time={event.rtime}s, level={event.level}, " - f"3BV={event.bbbv}, L={event.left} R={event.right}" - ) - info_text = ( - f"[{event.rtime:.2f}s] {event.level} | " - f"3BV={event.bbbv} | L={event.left} R={event.right}" - ) - # pyqtSignal emit -> auto QueuedConnection cross-thread to main thread - self._widget._update_signal.emit(info_text) \ No newline at end of file diff --git a/src/plugins/history/plugin.py b/src/plugins/history/plugin.py index 0445704..ca082a8 100644 --- a/src/plugins/history/plugin.py +++ b/src/plugins/history/plugin.py @@ -15,7 +15,7 @@ from plugin_sdk import ( BasePlugin, PluginInfo, make_plugin_icon, WindowMode, - OtherInfoBase, IntConfig, + OtherInfoBase, IntConfig, TextConfig, ChoiceConfig, ) from shared_types.events import VideoSaveEvent from plugins.services.history import HistoryService, GameRecord @@ -25,7 +25,7 @@ class HistoryConfig(OtherInfoBase): """历史记录插件配置""" - + float_decimals = IntConfig( default=2, label="小数位数", @@ -34,6 +34,39 @@ class HistoryConfig(OtherInfoBase): max_value=10, ) + # 隐藏字段:保存排序和过滤状态 + saved_filter = TextConfig( + default="[]", + label="保存的过滤条件", + visible=False, + ) + + saved_sort = TextConfig( + default="[]", + label="保存的排序条件", + visible=False, + ) + + saved_show_fields = TextConfig( + default="[]", + label="保存的列显示配置", + visible=False, + ) + + page_size = ChoiceConfig( + default="50", + label="每页条数", + choices=[ + ("10", "10"), + ("20", "20"), + ("50", "50"), + ("100", "100"), + ("200", "200"), + ("500", "500"), + ("1000", "1000"), + ], + ) + class HistoryPlugin(BasePlugin): """ @@ -44,6 +77,7 @@ class HistoryPlugin(BasePlugin): - 服务:提供 HistoryService 接口供其他插件查询历史记录 """ video_save_over = pyqtSignal() + _widget: HistoryMainWidget @classmethod def plugin_info(cls) -> PluginInfo: @@ -53,7 +87,7 @@ def plugin_info(cls) -> PluginInfo: author="ljzloser", version="1.0.0", icon=make_plugin_icon("#7b1fa2", "\N{SCROLL}"), - window_mode=WindowMode.TAB, + window_mode=WindowMode.TAB, # type: ignore other_info=HistoryConfig, ) @@ -66,16 +100,50 @@ def _setup_subscriptions(self) -> None: def _create_widget(self) -> QWidget: db_path = self.data_dir / "history.db" config_path = self.data_dir / "history_show_fields.json" - - # 获取配置中的小数位数 + + # 获取配置中的小数位数和每页条数 float_decimals = 2 + page_size = "50" if self.other_info: float_decimals = self.other_info.float_decimals - - self._widget = HistoryMainWidget(db_path, config_path, float_decimals) + page_size = self.other_info.page_size + + self._widget = HistoryMainWidget( + db_path, config_path, float_decimals, page_size) + + # 连接排序和过滤状态变化信号 + self._widget.filter_sort_state_changed.connect( + self._on_filter_sort_state_changed) + + # 连接列显示配置变化信号 + self._widget.show_fields_changed.connect( + self._on_show_fields_changed) + + # 恢复保存的排序和过滤状态 + if self.other_info: + self._widget.set_filter_sort_state( + self.other_info.saved_filter, + self.other_info.saved_sort + ) + # 恢复保存的列显示配置 + self._widget.restore_show_fields(self.other_info.saved_show_fields) + self.video_save_over.connect(self._widget.query_button.click) return self._widget + def _on_filter_sort_state_changed(self, filter_json: str, sort_json: str) -> None: + """保存排序和过滤状态""" + if self.other_info: + self.other_info.saved_filter = filter_json + self.other_info.saved_sort = sort_json + self.save_config() + + def _on_show_fields_changed(self, show_fields_json: str) -> None: + """保存列显示配置""" + if self.other_info: + self.other_info.saved_show_fields = show_fields_json + self.save_config() + def on_initialized(self) -> None: self._init_db() self.register_service(self, protocol=HistoryService) @@ -268,5 +336,5 @@ def delete_record(self, record_id: int) -> bool: conn.close() def _on_config_changed(self, name: str, value: Any) -> None: - if name == "float_decimals" and hasattr(self, '_widget'): + if name == "float_decimals": self._widget.set_float_decimals(value) diff --git a/src/plugins/history/widgets.py b/src/plugins/history/widgets.py index c5e4693..f3fd771 100644 --- a/src/plugins/history/widgets.py +++ b/src/plugins/history/widgets.py @@ -11,14 +11,14 @@ import sys from datetime import datetime from pathlib import Path -from typing import Any +from typing import Any, cast -from PyQt5.QtCore import Qt, QCoreApplication -from PyQt5.QtGui import QCloseEvent as _QCloseEvent +from PyQt5.QtCore import QEvent, Qt, QCoreApplication, QModelIndex, QTimer, pyqtSignal +from PyQt5.QtGui import QCloseEvent as _QCloseEvent, QStandardItemModel, QStandardItem, QPalette from PyQt5.QtWidgets import ( + QAbstractItemView, QWidget, QVBoxLayout, - QTableWidget, QMenu, QAction, QTableView, @@ -32,168 +32,780 @@ QHBoxLayout, QPushButton, QSpacerItem, + QSplitter, QSizePolicy, QLabel, QHeaderView, + QStyledItemDelegate, + QStyle, + QApplication, ) from plugin_manager.app_paths import get_executable_dir +from shared_types.widgets import EditableComboBox + from .models import HistoryData, LogicSymbol, CompareSymbol from .table_model import HistoryTableModel _translate = QCoreApplication.translate -class FilterWidget(QWidget): - """筛选条件控件""" +class ComboBoxDelegate(QStyledItemDelegate): + """通用的 ComboBox 代理""" + + def __init__(self, items: list[str], parent=None): + super().__init__(parent) + self._items = items + + def createEditor(self, parent, option, index): + editor = QComboBox(parent) + editor.addItems(self._items) + return editor + + def setEditorData(self, editor: QComboBox, index): + value = index.model().data(index, Qt.EditRole) + if value: + idx = editor.findText(value) + if idx >= 0: + editor.setCurrentIndex(idx) + + def setModelData(self, editor: QComboBox, model, index): + model.setData(index, editor.currentText(), Qt.EditRole) + + def updateEditorGeometry(self, editor, option, index): + editor.setGeometry(option.rect) + + +class EditableComboBoxDelegate(QStyledItemDelegate): + """可编辑的 ComboBox 代理(带补全)""" + + def __init__(self, items: list[str], parent=None): + super().__init__(parent) + self._items = items + + def createEditor(self, parent, option, index): + editor = EditableComboBox(self._items, parent) + return editor + + def setEditorData(self, editor: EditableComboBox, index): + value = index.model().data(index, Qt.EditRole) + if value: + editor.setCurrentText(value) + + def setModelData(self, editor: EditableComboBox, model, index): + model.setData(index, editor.currentText(), Qt.EditRole) + + def updateEditorGeometry(self, editor, option, index): + editor.setGeometry(option.rect) + + +class FilterValueDelegate(QStyledItemDelegate): + """值列的智能代理,根据同行字段类型动态决定编辑器""" + + COL_FIELD = 1 # FilterModel.COL_FIELD + COL_COMPARE = 2 # FilterModel.COL_COMPARE def __init__(self, float_decimals: int = 2, parent=None): super().__init__(parent) self._float_decimals = float_decimals - vbox = QVBoxLayout(self) - self.table = QTableWidget(self) - self.table.setColumnCount(6) - self.table.setHorizontalHeaderLabels( + self._editor_widgets = [] # 缓存创建的编辑器widget + + def paint(self, painter, option, index): + """根据字段类型绘制单元格""" + # 检查选中状态 + is_selected = option.state & QStyle.State_Selected + + if is_selected: # type: ignore + # 选中时绘制背景 + painter.fillRect(option.rect, option.palette.highlight()) + # 使用高亮文本颜色 + text_role = QPalette.HighlightedText + else: + text_role = QPalette.WindowText + + field_value, _, _ = self._get_field_info(index) + raw_value = index.data(Qt.EditRole) + + if field_value is None or raw_value is None: + super().paint(painter, option, index) + return + + display_text = str(raw_value) + + if isinstance(field_value, datetime): + # 日期类型显示为可读格式 + try: + ts = int(raw_value) + if ts > 1e15: # 微秒 + ts = ts / 1_000_000 + elif ts > 1e12: # 毫秒 + ts = ts / 1_000 + dt = datetime.fromtimestamp(ts) + display_text = dt.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, TypeError, OSError): + display_text = raw_value + elif isinstance(field_value, float): + # 浮点数显示带小数位 + try: + display_text = f"{float(raw_value):.{self._float_decimals}f}" + except (ValueError, TypeError): + display_text = raw_value + + # 使用 QStyle 绘制文本 + style = QApplication.style() + style.drawItemText( + painter, + option.rect, + Qt.AlignCenter | Qt.AlignVCenter, # type: ignore + option.palette, + True, + display_text, + text_role + ) + + def _get_field_info(self, index: QModelIndex) -> tuple: + """获取同行的字段信息和比较符""" + model = index.model() + row = index.row() + + # 获取字段名 + field_index = model.index(row, self.COL_FIELD) + field_name = model.data(field_index, Qt.EditRole) + + # 获取比较符 + compare_index = model.index(row, self.COL_COMPARE) + compare_text = model.data(compare_index, Qt.EditRole) + + if not field_name: + return None, None, None + + try: + field_value = HistoryData.get_field_value(field_name) + except (KeyError, IndexError): + return None, None, None + + compare = None + if compare_text: + try: + compare = CompareSymbol.from_display_name(compare_text) + except ValueError: + pass + + return field_value, compare, field_name + + def _create_editor_by_type(self, parent, field_value, compare, field_name): + """根据字段类型创建编辑器""" + from shared_types.enums import BaseDiaPlayEnum + + # 如果是包含/不包含比较符,使用 LineEdit + if compare and compare in (CompareSymbol.Contains, CompareSymbol.NotContains): + return QLineEdit(parent) + + if isinstance(field_value, BaseDiaPlayEnum): + editor = QComboBox(parent) + editor.addItems([e.display_name for e in field_value.__class__]) + return editor + elif isinstance(field_value, int): + return QSpinBox(parent) + elif isinstance(field_value, float): + editor = QDoubleSpinBox(parent) + editor.setDecimals(self._float_decimals) + editor.setRange(-1e15, 1e15) + return editor + elif isinstance(field_value, datetime): + editor = QDateTimeEdit(parent) + editor.setDisplayFormat("yyyy-MM-dd HH:mm:ss") + editor.setCalendarPopup(True) + return editor + else: + return QLineEdit(parent) + + def createEditor(self, parent, option, index): + field_value, compare, field_name = self._get_field_info(index) + if field_value is None: + return QLineEdit(parent) + return self._create_editor_by_type(parent, field_value, compare, field_name) + + def setEditorData(self, editor, index): + field_value, compare, field_name = self._get_field_info(index) + if field_value is None: + return + + raw_value = index.model().data(index, Qt.EditRole) + + from shared_types.enums import BaseDiaPlayEnum + if isinstance(field_value, BaseDiaPlayEnum) and isinstance(editor, QComboBox): + if raw_value: + idx = editor.findText(raw_value) + if idx >= 0: + editor.setCurrentIndex(idx) + elif isinstance(field_value, int) and isinstance(editor, QSpinBox): + try: + editor.setValue(int(raw_value) if raw_value else 0) + except (ValueError, TypeError): + editor.setValue(0) + elif isinstance(field_value, float) and isinstance(editor, QDoubleSpinBox): + try: + editor.setValue(float(raw_value) if raw_value else 0.0) + except (ValueError, TypeError): + editor.setValue(0.0) + elif isinstance(field_value, datetime) and isinstance(editor, QDateTimeEdit): + try: + if raw_value: + # raw_value 可能是 int/float 时间戳,或字符串形式的时间戳 + if isinstance(raw_value, (int, float)): + dt = datetime.fromtimestamp(raw_value / 1_000_000) + else: + # 先尝试作为字符串时间戳解析 + try: + ts = int(raw_value) + if ts > 1e15: # 微秒 + ts = ts / 1_000_000 + elif ts > 1e12: # 毫秒 + ts = ts / 1_000 + dt = datetime.fromtimestamp(ts) + except (ValueError, TypeError): + # 再尝试解析日期时间字符串 + for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"): + try: + dt = datetime.strptime(raw_value, fmt) + break + except ValueError: + continue + else: + dt = datetime.now() + else: + dt = datetime.now() + editor.setDateTime(dt) + except (ValueError, TypeError): + editor.setDateTime(datetime.now()) + else: + if isinstance(editor, QLineEdit): + editor.setText(raw_value or "") + + def setModelData(self, editor, model, index): + field_value, compare, field_name = self._get_field_info(index) + if field_value is None: + if isinstance(editor, QLineEdit): + model.setData(index, editor.text(), Qt.EditRole) + return + + from shared_types.enums import BaseDiaPlayEnum + if isinstance(field_value, BaseDiaPlayEnum) and isinstance(editor, QComboBox): + model.setData(index, editor.currentText(), Qt.EditRole) + elif isinstance(field_value, (int, float)) and isinstance(editor, (QSpinBox, QDoubleSpinBox)): + model.setData(index, str(editor.value()), Qt.EditRole) + elif isinstance(field_value, datetime) and isinstance(editor, QDateTimeEdit): + ts = int(editor.dateTime().toPyDateTime().timestamp() * 1_000_000) + model.setData(index, str(ts), Qt.EditRole) + else: + if isinstance(editor, QLineEdit): + model.setData(index, editor.text(), Qt.EditRole) + + def updateEditorGeometry(self, editor, option, index): + editor.setGeometry(option.rect) + + +class AutoEditTableView(QTableView): + """自动进入编辑状态的 TableView""" + + def __init__(self, parent=None): + super().__init__(parent) + self._auto_edit_columns = set() # 需要自动编辑的列 + self.suppress_auto_edit = False # 标记是否阻止自动编辑 + self.clicked.connect(self._on_click_auto_edit) + # 垂直表头点击选中整行 + self.verticalHeader().sectionClicked.connect(self._on_header_select_row) + self.verticalHeader().setMaximumWidth(24) + self.verticalHeader().setMinimumWidth(24) + + def mousePressEvent(self, event): + """鼠标按下事件:右键点击时阻止自动编辑""" + # 右键点击,阻止自动编辑 + if event.button() == Qt.RightButton: + self.suppress_auto_edit = True + super().mousePressEvent(event) + self.reset_suppress_flag() + + def reset_suppress_flag(self): + """重置阻止自动编辑标志""" + self.suppress_auto_edit = False + + def setAutoEditColumn(self, column: int): + """设置指定列自动进入编辑""" + self._auto_edit_columns.add(column) + + def setAutoEditColumns(self, columns: list[int]): + """设置多个列自动进入编辑""" + self._auto_edit_columns.update(columns) + + def _on_header_select_row(self, section: int): + """通过垂直表头点击选中整行时,清除单元格焦点""" + self.selectRow(section) + # 清除单元格焦点,让焦点回到 TableView 本身 + self.setFocus() + + def _on_click_auto_edit(self, index: QModelIndex): + """鼠标点击时自动进入编辑""" + # 如果 index 无效,不做任何操作 + if not index.isValid(): + return + + # 右键点击后不自动编辑 + if self.suppress_auto_edit: + return + + # 如果模型没有行,不自动编辑 + model = self.model() + if model is None or model.rowCount() == 0: + return + + if index.column() in self._auto_edit_columns: + if self.state() != QTableView.EditingState: + self.edit(index) + return + + # 如果不是自动编辑列,不进入编辑 + self.selectionModel().setCurrentIndex(index, self.selectionModel().NoUpdate) + + def currentChanged(self, current: QModelIndex, previous: QModelIndex): + """当焦点单元格改变时,自动进入编辑状态(表头选择整行时不触发)""" + super().currentChanged(current, previous) + + # 如果没有有效的单元格,不自动进入编辑 + if not current.isValid(): + return + + # 右键点击或表头选择整行,不自动进入编辑 + if self.suppress_auto_edit: + return + + if current.column() in self._auto_edit_columns: + QTimer.singleShot(0, lambda: self._try_edit(current)) + + def _try_edit(self, index: QModelIndex): + """尝试进入编辑状态""" + if self.state() != QTableView.EditingState: + self.edit(index) + + def addRow(self): + """添加一行""" + self.suppress_auto_edit = True + model = self.model() + if model is None: + return + + model.insertRow(model.rowCount()) + self.suppress_auto_edit = False + + def insertRow(self, row: int): + """插入一行""" + self.suppress_auto_edit = True + model = self.model() + if model is None: + return + + model.insertRow(row) + self.suppress_auto_edit = False + + def removeRow(self, row: int): + """删除一行""" + self.suppress_auto_edit = True + model = self.model() + if model is None: + return + + model.removeRow(row) + self.suppress_auto_edit = False + + +class FilterModel(QStandardItemModel): + """过滤表格模型""" + + COL_LBRACKET, COL_FIELD, COL_COMPARE, COL_VALUE, COL_RBRACKET, COL_LOGIC = range( + 6) + + def __init__(self, parent=None): + super().__init__(0, 6, parent) + self.setHorizontalHeaderLabels( ["左括号", "字段", "比较符", "值", "右括号", "逻辑符"] ) + + def data(self, index, role=Qt.DisplayRole): + """重写 data 方法,格式化某些列的显示""" + if not index.isValid(): + return None + + if role == Qt.DisplayRole: + column = index.column() + raw_data = super().data(index, Qt.EditRole) + + # 值列需要格式化显示 + if column == self.COL_VALUE: + if not raw_data: + return "" + + # 获取同行字段名来确定类型 + field_index = self.index(index.row(), self.COL_FIELD) + field_name = super().data(field_index, Qt.EditRole) + + if field_name: + field_value = HistoryData.get_field_value(field_name) + if isinstance(field_value, datetime): + # 尝试解析时间戳并格式化为可读格式 + try: + ts = int(raw_data) + if ts > 1e15: # 微秒 + ts = ts / 1_000_000 + elif ts > 1e12: # 毫秒 + ts = ts / 1_000 + dt = datetime.fromtimestamp(ts) + return dt.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, TypeError, OSError): + return raw_data + + return raw_data + + return super().data(index, role) + + def get_row_data(self, row: int) -> dict: + """获取指定行的所有数据""" + return { + "left_bracket": self.data(self.index(row, self.COL_LBRACKET)), + "field": self.data(self.index(row, self.COL_FIELD)), + "compare": self.data(self.index(row, self.COL_COMPARE)), + "value": self.data(self.index(row, self.COL_VALUE)), + "right_bracket": self.data(self.index(row, self.COL_RBRACKET)), + "logic": self.data(self.index(row, self.COL_LOGIC)), + } + + def get_field_value_type(self, row: int): + """获取指定行字段的原始值类型""" + field_name = str(self.data(self.index(row, self.COL_FIELD))) + return HistoryData.get_field_value(field_name) + + +class SortModel(QStandardItemModel): + """排序表格模型""" + + COL_FIELD, COL_ORDER = range(2) + + def __init__(self, parent=None): + super().__init__(0, 2, parent) + self.setHorizontalHeaderLabels(["排序字段", "升序/降序"]) + + def get_row_data(self, row: int) -> dict: + """获取指定行的所有数据""" + return { + "field": self.data(self.index(row, self.COL_FIELD)), + "order": self.data(self.index(row, self.COL_ORDER)), + } + + +class FilterWidget(QWidget): + """筛选条件控件""" + + def __init__(self, float_decimals: int = 2, parent=None): + super().__init__(parent) + self._float_decimals = float_decimals + + # 使用 QSplitter 实现横向分割 + splitter = QSplitter(Qt.Horizontal) + splitter.setChildrenCollapsible(False) + + # 左侧:过滤表格 + filter_widget = QWidget() + filter_layout = QVBoxLayout(filter_widget) + filter_layout.setContentsMargins(0, 0, 0, 0) + self.table = AutoEditTableView() + self.table.setModel(FilterModel(self)) self.table.horizontalHeader().setDefaultAlignment(Qt.AlignCenter) + self.table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive) self.table.setContextMenuPolicy(Qt.CustomContextMenu) self.table.customContextMenuRequested.connect(self.show_context_menu) - self.table.setSelectionBehavior(QTableView.SelectRows) - self.table.setSelectionMode(QTableView.SingleSelection) - vbox.addWidget(self.table) - self.setLayout(vbox) - + self.table.setSelectionBehavior(QTableView.SelectItems) + self.table.setSelectionMode(QTableView.ExtendedSelection) + self.table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Minimum) + # 设置所有列自动进入编辑 + self.table.setAutoEditColumns( + list(range(FilterModel.COL_LBRACKET, FilterModel.COL_LOGIC + 1))) + # 禁用双击编辑 + self.table.setEditTriggers(QTableView.NoEditTriggers) + filter_layout.addWidget(self.table) + splitter.addWidget(filter_widget) + + # 右侧:排序表格 + sort_widget = QWidget() + sort_layout = QVBoxLayout(sort_widget) + sort_layout.setContentsMargins(0, 0, 0, 0) + sort_layout.setSpacing(0) + + self.sort_table = AutoEditTableView() + self.sort_table.setModel(SortModel(self)) + self.sort_table.horizontalHeader().setDefaultAlignment(Qt.AlignCenter) + self.sort_table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive) + self.sort_table.setContextMenuPolicy(Qt.CustomContextMenu) + self.sort_table.customContextMenuRequested.connect( + self.show_sort_context_menu) + self.sort_table.setSelectionBehavior(QTableView.SelectItems) + self.sort_table.setSelectionMode(QTableView.ExtendedSelection) + self.sort_table.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Minimum) + # 设置所有列自动进入编辑 + self.sort_table.setAutoEditColumns( + [SortModel.COL_FIELD, SortModel.COL_ORDER]) + # 禁用双击编辑 + self.sort_table.setEditTriggers(QTableView.NoEditTriggers) + sort_layout.addWidget(self.sort_table) + splitter.addWidget(sort_widget) + + # 设置初始比例 + splitter.setStretchFactor(0, 3) + splitter.setStretchFactor(1, 1) + splitter.setMinimumSize(0, 0) + + # 主布局 + main_layout = QVBoxLayout(self) + main_layout.setContentsMargins(0, 0, 0, 0) + main_layout.addWidget(splitter) + self.setLayout(main_layout) + + self._setup_delegates() + self._connect_field_change_signal() + + def _connect_field_change_signal(self): + """当字段列改变时,更新值列的默认值""" + self.table.model().dataChanged.connect(self._on_field_changed) + + def _on_field_changed(self, topLeft, bottomRight, roles): + """字段列改变时触发""" + if Qt.EditRole in roles and topLeft.column() == FilterModel.COL_FIELD: + for row in range(topLeft.row(), bottomRight.row() + 1): + self._update_value_default(row) + + def _update_value_default(self, row: int): + """更新指定行的值列默认值""" + model = self.table.model() + field_name = model.data(model.index(row, FilterModel.COL_FIELD)) + + if not field_name: + return + + field_value = HistoryData.get_field_value(field_name) + new_default = self._get_default_value(field_value) + + # 获取当前值(使用 EditRole 获取原始数据) + current_value = model.data(model.index( + row, FilterModel.COL_VALUE), Qt.EditRole) + + # 始终更新为新类型的默认值 + model.setData( + model.index(row, FilterModel.COL_VALUE), + new_default, + Qt.EditRole + ) + + def _get_default_value(self, field_value) -> str: + """获取字段类型的默认值""" + from shared_types.enums import BaseDiaPlayEnum + + if field_value is None: + return "" + elif isinstance(field_value, BaseDiaPlayEnum): + # 枚举类型返回第一个选项 + return field_value.__class__.display_names()[0] + elif isinstance(field_value, datetime): + # 日期类型返回当前时间戳(微秒) + ts = int(datetime.now().timestamp() * 1_000_000) + return str(ts) + elif isinstance(field_value, float): + return "0.0" + elif isinstance(field_value, int): + return "0" + else: + return "" + + def _setup_delegates(self): + """设置列代理""" + # 左括号 + self.table.setItemDelegateForColumn( + FilterModel.COL_LBRACKET, + ComboBoxDelegate(["", "(", "(("], self) + ) + # 字段 + self.table.setItemDelegateForColumn( + FilterModel.COL_FIELD, + EditableComboBoxDelegate(HistoryData.fields(), self) + ) + # 比较符 + self.table.setItemDelegateForColumn( + FilterModel.COL_COMPARE, + ComboBoxDelegate(CompareSymbol.display_names(), self) + ) + # 值列 - 使用智能代理 + self.table.setItemDelegateForColumn( + FilterModel.COL_VALUE, + FilterValueDelegate(self._float_decimals, self) + ) + # 右括号 + self.table.setItemDelegateForColumn( + FilterModel.COL_RBRACKET, + ComboBoxDelegate(["", ")", "))"], self) + ) + # 逻辑符 + self.table.setItemDelegateForColumn( + FilterModel.COL_LOGIC, + ComboBoxDelegate(LogicSymbol.display_names(), self) + ) + + # 排序表格代理 + self.sort_table.setItemDelegateForColumn( + SortModel.COL_FIELD, + EditableComboBoxDelegate(HistoryData.fields(), self) + ) + self.sort_table.setItemDelegateForColumn( + SortModel.COL_ORDER, + ComboBoxDelegate( + [_translate("Form", "升序"), _translate("Form", "降序")], self + ) + ) + + def show_sort_context_menu(self, pos): + """排序列表右键菜单""" + menu = QMenu(self) + menu.addAction(_translate("Form", "添加"), self._add_sort_row) + menu.addAction(_translate("Form", "插入"), self._insert_sort_row) + menu.addAction(_translate("Form", "删除"), self._del_sort_row) + menu.exec_(self.sort_table.mapToGlobal(pos)) + + def _add_sort_row(self): + """添加排序行""" + model = self.sort_table.model() + row = model.rowCount() + model.insertRow(row) + model.setData(model.index(row, SortModel.COL_FIELD), + HistoryData.fields()[0]) + model.setData(model.index(row, SortModel.COL_ORDER), + _translate("Form", "升序")) + + def _add_sort_row_at(self, row: int, field: str | None = None, order: str | None = None): + """在指定位置添加排序行""" + model = self.sort_table.model() + if field is None: + field = HistoryData.fields()[0] + if order is None: + order = _translate("Form", "升序") + + self.sort_table.insertRow(row) + model.setData(model.index(row, SortModel.COL_FIELD), field) + model.setData(model.index(row, SortModel.COL_ORDER), order) + + def _insert_sort_row(self): + """在当前行前插入排序行""" + current_row = self.sort_table.currentIndex().row() + self._add_sort_row_at(current_row if current_row >= 0 else 0) + + def _del_sort_row(self): + """删除选中的排序行""" + selected_rows = set( + index.row() for index in self.sort_table.selectionModel().selectedIndexes()) + if not selected_rows: + return + for row in sorted(selected_rows, reverse=True): + self.sort_table.removeRow(row) + + def _del_sort_row_at(self, row: int): + """删除指定行的排序""" + if row >= 0: + self.sort_table.removeRow(row) + + def gen_order_str(self) -> str: + """生成排序 SQL 语句""" + model = self.sort_table.model() + if model.rowCount() == 0: + return "" + + orders = [] + for row in range(model.rowCount()): + field = model.data(model.index(row, SortModel.COL_FIELD)) + order_text = model.data(model.index(row, SortModel.COL_ORDER)) + order_sql = "ASC" if order_text == _translate( + "Form", "升序") else "DESC" + orders.append(f"{field} {order_sql}") + + if orders: + return " ORDER BY " + ", ".join(orders) + return "" + def set_float_decimals(self, decimals: int) -> None: """动态设置小数位数""" self._float_decimals = decimals + # 更新值列代理 + self.table.setItemDelegateForColumn( + FilterModel.COL_VALUE, + FilterValueDelegate(self._float_decimals, self) + ) def show_context_menu(self, pos): menu = QMenu(self) menu.addAction(_translate("Form", "添加"), self.add_row) - menu.addAction(_translate("Form", "删除"), self.del_row) menu.addAction( _translate("Form", "插入"), lambda: self.insert_row( - self.table.currentRow()) + self.table.currentIndex().row()) ) + menu.addAction(_translate("Form", "删除"), self.del_row) menu.exec_(self.table.mapToGlobal(pos)) - def _build_left_bracket(self): - w = QComboBox(self) - w.addItems(["", "(", "(("]) - return w - - def _build_field(self): - w = QComboBox(self) - w.addItems(HistoryData.fields()) - w.currentIndexChanged.connect(self.on_field_changed) - return w - - def _build_compare(self): - w = QComboBox(self) - w.addItems(CompareSymbol.display_names()) - w.currentIndexChanged.connect(self.on_compare_changed) - return w - - def _build_right_bracket(self): - w = QComboBox(self) - w.addItems(["", ")", "))"]) - return w - - def _build_logic(self): - w = QComboBox(self) - w.addItems(LogicSymbol.display_names()) - return w - - def on_field_changed(self, index): - combo: QComboBox = self.sender() - item_index = self.table.indexAt(combo.pos()) - if not item_index.isValid(): - return - row = item_index.row() - field_name = combo.currentText() - compare_w: QComboBox = self.table.cellWidget(row, 2) - compare = CompareSymbol.from_display_name(compare_w.currentText()) - field_cls = HistoryData.get_field_value(field_name) - self.table.setCellWidget( - row, 3, self._build_value_widget(compare, field_cls)) - - def on_compare_changed(self, index): - combo: QComboBox = self.sender() - item_index = self.table.indexAt(combo.pos()) - if not item_index.isValid(): - return - row = item_index.row() - field_w: QComboBox = self.table.cellWidget(row, 1) - field_name = field_w.currentText() - compare = CompareSymbol.from_display_name(combo.currentText()) - field_cls = HistoryData.get_field_value(field_name) - self.table.setCellWidget( - row, 3, self._build_value_widget(compare, field_cls)) - - def _build_value_widget(self, compare: CompareSymbol, field_value: Any): - from shared_types.enums import BaseDiaPlayEnum - - if compare not in (CompareSymbol.Contains, CompareSymbol.NotContains): - if isinstance(field_value, BaseDiaPlayEnum): - w = QComboBox(self) - # 获取该枚举类的所有成员的 display_name - enum_cls = field_value.__class__ - w.addItems([e.display_name for e in enum_cls]) - return w - elif isinstance(field_value, int): - return QSpinBox(self) - elif isinstance(field_value, float): - w = QDoubleSpinBox(self) - w.setDecimals(self._float_decimals) - return w - elif isinstance(field_value, str): - return QLineEdit(self) - elif isinstance(field_value, datetime): - w = QDateTimeEdit(self) - w.setDateTime(datetime.now()) # 默认当前时间 - return w - return QLineEdit(self) - def add_row(self): - self.insert_row(self.table.rowCount()) + self.insert_row(self.table.model().rowCount()) def del_row(self): - self.table.removeRow(self.table.currentRow()) + # 获取所有选中的行 + selected_rows = set(index.row() + for index in self.table.selectionModel().selectedIndexes()) + if not selected_rows: + return + # 取消行选中 + self.table.clearSelection() + # 从后往前删除,避免索引变化 + for row in sorted(selected_rows, reverse=True): + self.table.removeRow(row) def insert_row(self, row: int): + if row < 0: + row = 0 + model = self.table.model() self.table.insertRow(row) - field_w = self._build_field() - compare_w = self._build_compare() - compare = CompareSymbol.from_display_name(compare_w.currentText()) - field_value = HistoryData.get_field_value(field_w.currentText()) - self.table.setCellWidget(row, 0, self._build_left_bracket()) - self.table.setCellWidget(row, 1, field_w) - self.table.setCellWidget(row, 2, compare_w) - self.table.setCellWidget( - row, 3, self._build_value_widget(compare, field_value)) - self.table.setCellWidget(row, 4, self._build_right_bracket()) - self.table.setCellWidget(row, 5, self._build_logic()) + # 设置默认值 + model.setData(model.index(row, FilterModel.COL_FIELD), + HistoryData.fields()[0]) + model.setData(model.index(row, FilterModel.COL_COMPARE), + CompareSymbol.display_names()[0]) + model.setData(model.index(row, FilterModel.COL_LOGIC), + LogicSymbol.display_names()[0]) + # 不需要手动设置值,代理会根据字段类型自动处理 def gen_filter_str(self): + model = cast(FilterModel, self.table.model()) filter_str = "" left_count = 0 right_count = 0 - for row in range(self.table.rowCount()): - left_bracket_w = self.table.cellWidget(row, 0) - field_w = self.table.cellWidget(row, 1) - compare_w = self.table.cellWidget(row, 2) - value_w = self.table.cellWidget(row, 3) - right_bracket_w = self.table.cellWidget(row, 4) - logic_w = self.table.cellWidget(row, 5) - - left_bracket = left_bracket_w.currentText() - field = field_w.currentText() - field_init_value = HistoryData.get_field_value(field) - compare = CompareSymbol.from_display_name(compare_w.currentText()) - right_bracket = right_bracket_w.currentText() - logic = LogicSymbol.from_display_name(logic_w.currentText()).to_sql + for row in range(model.rowCount()): + data = model.get_row_data(row) + field_value_type = model.get_field_value_type(row) + + left_bracket = data["left_bracket"] or "" + field = data["field"] or "" + compare_text = data["compare"] or "" + value = data["value"] or "" + right_bracket = data["right_bracket"] or "" + logic_text = data["logic"] or "" + + if not field or not compare_text: + continue + + compare = CompareSymbol.from_display_name(compare_text) + logic = LogicSymbol.from_display_name(logic_text).to_sql if left_bracket == "(": left_count += 1 @@ -210,73 +822,87 @@ def gen_filter_str(self): ) return None - # 获取值 + # 处理值 from shared_types.enums import BaseDiaPlayEnum - if isinstance(value_w, QComboBox): - # 如果字段是 Enum 类型,需要获取对应的枚举值 - if isinstance(field_init_value, BaseDiaPlayEnum): - enum_cls = field_init_value.__class__ - display_name = value_w.currentText() - # 找到对应的枚举成员 - for e in enum_cls: - if e.display_name == display_name: - value = str(e.value) - break - else: - value = value_w.currentText() - else: - value = value_w.currentText() - elif isinstance(value_w, QDateTimeEdit): - value = int( - value_w.dateTime().toPyDateTime().timestamp() * 1_000_000) - elif isinstance(value_w, QSpinBox): - value = str(value_w.value()) - elif isinstance(value_w, QDoubleSpinBox): - value = str(value_w.value()) - elif isinstance(value_w, QLineEdit): - if compare in (CompareSymbol.Contains, CompareSymbol.NotContains): - if isinstance(field_init_value, (int, float)): - values = value_w.text().split(",") - for v in values: - if not v.replace("-", "").isdigit(): - QMessageBox.warning( - self, "错误", f"第{row}行 {v} 不是数字" - ) - return None - value = ",".join(v for v in values) - elif isinstance(field_init_value, datetime): - values = value_w.text().split(",") - for v in values: + if isinstance(field_value_type, BaseDiaPlayEnum): + enum_cls = field_value_type.__class__ + for e in enum_cls: + if e.display_name == value: + value = str(e.value) + break + elif compare in (CompareSymbol.Contains, CompareSymbol.NotContains): + if isinstance(field_value_type, (int, float)): + values = value.split(",") + for v in values: + if not v.replace("-", "").replace(".", "").isdigit(): + QMessageBox.warning( + self, "错误", f"第{row}行 {v} 不是数字" + ) + return None + value = ",".join(v for v in values) + elif isinstance(field_value_type, datetime): + values = value.split(",") + parsed_values = [] + for v in values: + v = v.strip() + if not v: + continue + try: + # 尝试解析为时间戳(微秒) + ts = int(float(v)) + parsed_values.append(str(ts)) + except ValueError: + # 尝试解析为日期时间字符串 try: - datetime.strptime(v, "%Y-%m-%d %H:%M:%S") - except ValueError: + for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"): + try: + dt = datetime.strptime(v, fmt) + parsed_values.append( + str(int(dt.timestamp() * 1_000_000))) + break + except ValueError: + continue + else: + raise ValueError(f"无法解析日期: {v}") + except ValueError as e: QMessageBox.warning( self, "错误", f"第{row}行 {v} 不是合法的日期时间" ) return None - values = [ - int( - datetime.strptime( - v, "%Y-%m-%d %H:%M:%S").timestamp() - * 1_000_000 - ) - for v in values - ] - value = ",".join(str(v) for v in values) - else: - value = ",".join( - f"'{v}'" for v in value_w.text().split(",")) - value = f"({value})" + value = ",".join(parsed_values) if parsed_values else "" else: - value = f"'{value_w.text()}'" - else: - value = str( - getattr(value_w, "value", value_w.text()) - if hasattr(value_w, "value") - else "" - ) - - is_last = row == self.table.rowCount() - 1 + value = ",".join( + f"'{v}'" for v in value.split(",") if v.strip()) + value = f"({value})" if value else "()" + elif isinstance(field_value_type, datetime) and value: + try: + # 可能是时间戳字符串 + ts = int(float(value)) + value = str(ts) + except ValueError: + # 尝试解析日期时间字符串 + try: + for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"): + try: + dt = datetime.strptime(value, fmt) + value = str(int(dt.timestamp() * 1_000_000)) + break + except ValueError: + continue + else: + QMessageBox.warning( + self, "错误", f"第{row}行 {value} 不是合法的日期时间" + ) + return None + except ValueError: + QMessageBox.warning( + self, "错误", f"第{row}行 {value} 不是合法的日期时间" + ) + return None + elif value and not value.startswith("'"): + value = f"'{value}'" + + is_last = row == model.rowCount() - 1 filter_str += ( f" {left_bracket} {field} {compare.to_sql} {value} {right_bracket} " ) @@ -292,6 +918,9 @@ def gen_filter_str(self): class HistoryTable(QWidget): """历史记录表格""" + # 信号:列显示配置变化 (show_fields_json) + show_fields_changed = pyqtSignal(str) + HEADERS = [ "replay_id", "game_board_state", @@ -360,7 +989,7 @@ def load(self, data: list[HistoryData]): def refresh(self): parent_widget = self.parent() if hasattr(parent_widget, "load_data"): - parent_widget.load_data() + parent_widget.load_data() # type: ignore def show_context_menu(self, pos): menu = QMenu(self) @@ -385,6 +1014,8 @@ def _on_toggle_field(self, action: QAction): else: self.showFields.discard(name) self.model.update_show_fields(self.showFields) + self.show_fields_changed.emit(json.dumps( + list(self.showFields), ensure_ascii=False)) def _get_current_replay_id(self) -> int | None: row_idx = self.table.currentIndex().row() @@ -394,7 +1025,7 @@ def _get_current_replay_id(self) -> int | None: if "replay_id" in visible: col = visible.index("replay_id") rid = self.model.data(self.model.index(row_idx, col), Qt.UserRole) - return rid + return rid # type: ignore return getattr(self.model._data[row_idx], "replay_id", None) def _read_raw_data(self, replay_id: int) -> bytes | None: @@ -452,11 +1083,17 @@ def export_row(self): class HistoryMainWidget(QWidget): """历史记录插件的主界面(作为插件的 widget 返回)""" + # 信号:排序和过滤状态变化 (filter_json, sort_json) + filter_sort_state_changed = pyqtSignal(str, str) + # 信号:列显示配置变化 (show_fields_json) + show_fields_changed = pyqtSignal(str) + def __init__( self, db_path: Path, config_path: Path, float_decimals: int = 2, + page_size: str = "50", parent=None, ): super().__init__(parent) @@ -490,6 +1127,10 @@ def __init__( self.one_page_combo = QComboBox() self.one_page_combo.addItems( ["10", "20", "50", "100", "200", "500", "1000"]) + # 设置默认每页条数 + idx = self.one_page_combo.findText(page_size) + if idx >= 0: + self.one_page_combo.setCurrentIndex(idx) self.limit_label = QLabel("") limit_layout.addItem( @@ -510,6 +1151,25 @@ def __init__( self._connect_signals() self.load_data() + def set_filter_sort_state(self, filter_json: str, sort_json: str) -> None: + """设置排序和过滤状态(由插件调用)""" + try: + filter_rows = json.loads(filter_json) + if filter_rows: + self._set_filter_rows(filter_rows) + except (json.JSONDecodeError, TypeError): + pass + + try: + sort_rows = json.loads(sort_json) + if sort_rows: + self._set_sort_rows(sort_rows) + except (json.JSONDecodeError, TypeError): + pass + + # 恢复后触发一次查询 + self._on_query() + def _connect_signals(self): self.query_button.clicked.connect(self._on_query) self.previous_button.clicked.connect( @@ -520,6 +1180,7 @@ def _connect_signals(self): ) self.one_page_combo.currentTextChanged.connect(self.load_data) self.page_spin.valueChanged.connect(self.load_data) + self.table.show_fields_changed.connect(self.show_fields_changed) def _on_query(self): if self.page_spin.value() > 1: @@ -548,11 +1209,13 @@ def load_data(self): conn.row_factory = sqlite3.Row cursor = conn.cursor() filter_str = self.filter_widget.gen_filter_str() + order_str = self.filter_widget.gen_order_str() sql = "SELECT *, COUNT(*) OVER() AS total_count FROM history" if filter_str: sql += " WHERE " + filter_str elif filter_str is None: return + sql += order_str sql += self._get_limit_str() cursor.execute(sql) datas = cursor.fetchall() @@ -575,12 +1238,79 @@ def load_data(self): self.table.load(history_data) + # 保存当前的排序和过滤状态 + self._save_filter_sort_state() + + def _get_filter_rows(self) -> list[dict]: + """获取过滤表格的所有行数据""" + model = cast(FilterModel, self.filter_widget.table.model()) + rows = [] + for row in range(model.rowCount()): + rows.append(model.get_row_data(row)) + return rows + + def _get_sort_rows(self) -> list[dict]: + """获取排序表格的所有行数据""" + model = cast(SortModel, self.filter_widget.sort_table.model()) + rows = [] + for row in range(model.rowCount()): + rows.append(model.get_row_data(row)) + return rows + + def _set_filter_rows(self, rows: list[dict]) -> None: + """恢复过滤表格的行数据""" + model = self.filter_widget.table.model() + model.removeRows(0, model.rowCount()) + for row_data in rows: + row = model.rowCount() + model.insertRow(row) + model.setData(model.index(row, FilterModel.COL_LBRACKET), + row_data.get("left_bracket")) + model.setData(model.index(row, FilterModel.COL_FIELD), + row_data.get("field")) + model.setData(model.index(row, FilterModel.COL_COMPARE), + row_data.get("compare")) + model.setData(model.index(row, FilterModel.COL_VALUE), + row_data.get("value"), Qt.EditRole) + model.setData(model.index(row, FilterModel.COL_RBRACKET), + row_data.get("right_bracket")) + model.setData(model.index(row, FilterModel.COL_LOGIC), + row_data.get("logic")) + + def _set_sort_rows(self, rows: list[dict]) -> None: + """恢复排序表格的行数据""" + model = self.filter_widget.sort_table.model() + model.removeRows(0, model.rowCount()) + for row_data in rows: + row = model.rowCount() + model.insertRow(row) + model.setData(model.index(row, SortModel.COL_FIELD), + row_data.get("field")) + model.setData(model.index(row, SortModel.COL_ORDER), + row_data.get("order")) + + def _save_filter_sort_state(self) -> None: + """发射排序和过滤状态变化信号""" + filter_rows = self._get_filter_rows() + sort_rows = self._get_sort_rows() + self.filter_sort_state_changed.emit( + json.dumps(filter_rows, ensure_ascii=False), json.dumps(sort_rows, ensure_ascii=False)) + def closeEvent(self, event: _QCloseEvent): - """关闭时保存列显示配置""" - with open(self._config_path, "w") as f: - json.dump(list(self.table.showFields), f) + """关闭事件""" super().closeEvent(event) - + def set_float_decimals(self, decimals: int) -> None: """动态设置小数位数""" self.filter_widget.set_float_decimals(decimals) + + def restore_show_fields(self, show_fields_json: str) -> None: + """恢复列显示配置""" + try: + fields = json.loads(show_fields_json) + if not fields: + fields = self.table.HEADERS + self.table.showFields = set(fields) + self.table.model.update_show_fields(self.table.showFields) + except (json.JSONDecodeError, TypeError): + pass diff --git a/src/plugins/llm_minesweeper_controller/api_client.py b/src/plugins/llm_minesweeper_controller/api_client.py index 35c51af..157f3f5 100644 --- a/src/plugins/llm_minesweeper_controller/api_client.py +++ b/src/plugins/llm_minesweeper_controller/api_client.py @@ -16,17 +16,17 @@ class ChatResponse: status_code: int = 0 raw_data: Optional[Dict[str, Any]] = None error: Optional[str] = None - + # 解析后的内容 content: Optional[str] = None # 文本内容 tool_calls: Optional[List[Dict[str, Any]]] = None # tool_calls 列表 finish_reason: Optional[str] = None - + @property def has_tool_calls(self) -> bool: """是否有 tool_calls""" return bool(self.tool_calls) - + @property def has_content(self) -> bool: """是否有文本内容""" @@ -57,8 +57,9 @@ def chat( self, messages: List[Dict[str, Any]], tools: Optional[List[Dict[str, Any]]] = None, - temperature: float = 0.3, + temperature: float = 0.2, max_tokens: Optional[int] = None, + reasoning_effort: Optional[str] = None, ) -> ChatResponse: """ 发送聊天请求 (OpenAI /v1/chat/completions) @@ -68,6 +69,7 @@ def chat( tools: OpenAI tools格式的函数定义列表 temperature: 温度参数 max_tokens: 最大token数 + reasoning_effort: 深度思考强度 ("low"/"medium"/"high"),None表示关闭 Returns: ChatResponse @@ -83,6 +85,9 @@ def chat( if max_tokens is not None: payload["max_tokens"] = max_tokens + if reasoning_effort: + payload["reasoning_effort"] = reasoning_effort + if tools: payload["tools"] = tools payload["tool_choice"] = "auto" @@ -136,16 +141,19 @@ def _parse_success_response(self, response_data: Dict[str, Any]) -> ChatResponse try: choice = response_data.get("choices", [{}])[0] message = choice.get("message", {}) - + # 提取文本内容 content = message.get("content") - + # 提取 tool_calls tool_calls = message.get("tool_calls") - + # 提取 finish_reason finish_reason = choice.get("finish_reason") - + + # 打印思考内容(如果存在) + thinking = message.get("thinking") or message.get("reasoning") + return ChatResponse( success=True, status_code=200, @@ -166,11 +174,11 @@ def _parse_success_response(self, response_data: Dict[str, Any]) -> ChatResponse def build_tool_result_message(tool_call_id: str, result: Any) -> Dict[str, Any]: """ 构建 tool 结果消息 - + Args: tool_call_id: tool_call 的 ID result: 函数执行结果(会被 JSON 序列化) - + Returns: 可直接追加到 messages 的消息字典 """ @@ -178,7 +186,7 @@ def build_tool_result_message(tool_call_id: str, result: Any) -> Dict[str, Any]: content = result else: content = json.dumps(result, ensure_ascii=False) - + return { "role": "tool", "tool_call_id": tool_call_id, @@ -192,11 +200,11 @@ def build_assistant_tool_message( ) -> Dict[str, Any]: """ 构建 assistant 的 tool_calls 消息(用于多轮对话时保存历史) - + Args: content: 文本内容(可能为 None) tool_calls: tool_calls 列表 - + Returns: 可追加到 messages 的消息字典 """ @@ -214,4 +222,4 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.close() \ No newline at end of file + self.close() diff --git a/src/plugins/llm_minesweeper_controller/config.py b/src/plugins/llm_minesweeper_controller/config.py index 9c38df2..fec7bd2 100644 --- a/src/plugins/llm_minesweeper_controller/config.py +++ b/src/plugins/llm_minesweeper_controller/config.py @@ -3,7 +3,7 @@ """ from __future__ import annotations -from plugin_sdk import OtherInfoBase, BoolConfig, IntConfig, TextConfig, LongTextConfig, ChoiceConfig +from plugin_sdk import OtherInfoBase, BoolConfig, IntConfig, TextConfig, ChoiceConfig class LlmMinesweeperControllerConfig(OtherInfoBase): @@ -47,77 +47,30 @@ class LlmMinesweeperControllerConfig(OtherInfoBase): label="自动执行LLM操作(否则需确认)", ) - # 提示词设置 - system_prompt = LongTextConfig( - default="""你是一个扫雷AI。你只能通过调用工具来操作游戏。 - -# 绝对规则 -每次回复只允许调用1个工具函数。 -所有推理在你的"内部思考"中完成,不要输出到回复里。 - -# 推理策略 -- 基础:数字=周围未揭开数 → 全是雷;数字=周围旗子数 → 全安全 -- 进阶:用相邻数字的差值与非共享未知格做减法约束 -- 盲猜(仅无逻辑解时):选长连续边界中段,绝对禁止猜边角 - -# 决策树(每次操作前必须按此顺序检查!) - -## 检查1:能标旗吗?→ 右键 `"right"` -``` -对于每个数字N: - 未揭开格子数 = N ? - → 是:这N个格子必定是雷 → `click_cell(..., "right")` 标旗 - → 继续检查其他数字 -``` -**标旗不会死!遇到确定的雷必须标旗!** - -## 标旗的修正 -如果发现之前标错了旗(数字逻辑矛盾),可以再次 `click_cell(..., "right")` 取消标旗。 -右键点击已标旗(F)的格子 = 取消标旗。 - -## 检查2:能中键吗?→ 中键 `"middle"` -``` -对于每个数字N: - 周围已标旗数 = N ? - → 是:周围剩余格子全部安全 → `click_cell(..., "middle")` 批量揭开 - → 继续检查其他数字 -``` -**这是最常用的批量揭开操作,必须优先使用!** - -## 检查3:能左键吗?→ 左键 `"left"` -``` -不属于以上两种情况,但确定安全? -→ 是:`click_cell(..., "left")` 揭开(通常是数字0) -``` - -## 强制规则 -- 检查顺序:标旗 → 中键 → 左键,**不能跳过** - -# 游戏状态判断 -- 棋盘出现 M → 调用 start_new_game -- cells 为空 → 调用 start_new_game -- 否则 → 分析推理后调用 click_cell 或 get_local_board - -# 操作流程 -1. 先调用 get_board_state 获取全局 -2. 若有确定操作,直接调用 click_cell -3. 若需要细节,调用 get_local_board(radius=4) -4. 循环直到胜利 + temperature = IntConfig( + default=30, + label="温度参数(0-100)", + ) -# 棋盘与状态定义 -- 格子状态:`-1`(未揭开)、`0-8`(周围雷数)、`F`(已标旗)、`M`(踩到的红雷)、`m`(未踩的白雷) -- 游戏状态容错:若棋盘出现 `M` 必为失败;若非雷格全揭开必为胜利;以棋盘实际画面为准。 + max_history_messages = IntConfig( + default=20, + label="上下文上限", + description="超过此值时触发压缩", + ) -# 可用工具 -- `get_board_state()`:获取全局视图。 -- `get_local_board(col, row, radius=4)`:获取局部细节,返回(2*radius+1)x(2*radius+1)的区域。 -- `click_cell(col, row, button)`:执行操作,button可为 `"left"`(揭开)、`"right"`(标旗) 或 `"middle"`(快速揭开周围格子)。 -- `start_new_game(difficulty)`:开始新游戏,difficulty为 `"easy"`(8x8)、`"medium"`(16x16) 或 `"hard"`(16x30)。 -""", - label="系统提示词", + min_history_messages = IntConfig( + default=5, + label="上下文下限", + description="压缩后保留的最少消息数", ) - temperature = IntConfig( - default=30, - label="温度参数(0-100)", + deep_thinking = ChoiceConfig( + default="medium", + label="深度思考", + choices=[ + ("off", "关闭"), + ("low", "低"), + ("medium", "中"), + ("high", "高"), + ], ) diff --git a/src/plugins/llm_minesweeper_controller/plugin.py b/src/plugins/llm_minesweeper_controller/plugin.py index be38856..4fcc62a 100644 --- a/src/plugins/llm_minesweeper_controller/plugin.py +++ b/src/plugins/llm_minesweeper_controller/plugin.py @@ -4,6 +4,7 @@ from __future__ import annotations from ctypes import cast +import hashlib import json from typing import Dict, Any, Optional, List @@ -21,46 +22,435 @@ from .function_registry import FunctionRegistry +# 系统提示词(写死在代码中) +SYSTEM_PROMPT = """你是一个扫雷AI。你只能通过调用工具来操作游戏。 + +# 棋盘格式(最重要!) +- 棋盘是 `cells[row][col]` 二维数组 +- `row` 是行索引(0开始,从上到下) +- `col` 是列索引(0开始,从左到右) +- **只能左键点击 `cells[row][col] == -1` 的未揭开格子!** +- 已揭开的格子(值为 0-8)不能再次点击! +- 值为 `F` 的已标旗格子也不能点击! + +# 游戏阶段判断(根据 game_status 字段!) + +## 阶段1:开局(game_status=ready) +- 棋盘全部是 `-1`(未揭开) +- **必须直接执行第一次点击!** 不要反复查询! +- 选择中间区域的格子,如 `click_cell(row=rows//2, col=cols//2, button="left")` + +## 阶段2:正常游戏(game_status=playing) +按照下方决策树操作。 + +## 阶段3:游戏结束 +- game_status=win → 调用 start_new_game 开始新游戏 +- game_status=fail → 调用 start_new_game 开始新游戏 + +# 决策树(阶段2时使用,按此顺序检查!) + +## 检查1:能标旗吗?→ 右键 `"right"` +``` +对于每个数字N: + 周围 `-1` 格子数 = N ? + → 是:这N个格子必定是雷 → `click_cell(row=X, col=Y, button="right")` 标旗 +``` +**标旗点击的是 `-1` 未揭开格子!右键只会标旗,不会揭开格子!** +- 右键点击 `-1` 格子 = 标旗(F) +- 右键点击已标旗(F)格子 = 取消标旗 + +## 检查2:能中键吗?→ 中键 `"middle"` +``` +对于每个数字N: + 周围已标旗数 = N ? + → 是:周围剩余 `-1` 格子全部安全 → `click_cell(row=X, col=Y, button="middle")` 批量揭开 +``` +**中键点击的是数字格子(N),不是 `-1` 格子!** + +## 检查3:能左键吗?→ 左键 `"left"` +``` +不属于以上两种情况,但确定安全? +→ 是:`click_cell(row=X, col=Y, button="left")` 揭开周围的 `-1` 格子 +``` +**左键点击的是 `-1` 未揭开格子!左键只能点击 `-1` 格子来揭开!** + +## 强制规则 +- 点击 `-1` 格子:只能左键(揭开)或右键(标旗) +- 点击数字格子:只能中键(批量揭开周围) +- 检查顺序:标旗 → 中键 → 左键,**不能跳过** + +# 棋盘与状态定义 +- 格子状态:`-1`(未揭开)、`0-8`(周围雷数)、`F`(已标旗)、`M`(踩到的红雷)、`m`(未踩的白雷) +- game_status: `ready`(准备), `playing`(游戏中), `win`(胜利), `fail`(失败) + +# 可用工具 +- `get_board_state()`:获取全局视图,返回 `cells[row][col]` 格式。 +- `click_cell(row, col, button)`:执行操作,**row和col必须是未被揭开的 `-1` 格子**。 + - button `"left"` 揭开格子 + - button `"right"` 标旗/取消标旗 + - button `"middle"` 快速揭开周围格子 +- `start_new_game(difficulty)`:开始新游戏,`difficulty`为 `"easy"`、`"medium"` 或 `"hard"`。 +""" + + +class ExecutionSummary: + """执行摘要,用于压缩历史上下文""" + + def __init__(self): + self.actions: List[Dict[str, Any]] = [] # 记录执行的操作详情 + self.queries: int = 0 + self.clicks: int = 0 + self.flags: int = 0 + self.unflags: int = 0 + self.middles: int = 0 + self.games_started: int = 0 + self.last_game_status: str = "" + + def add_action(self, func_name: str, args: Dict, result: str): + """添加一个操作记录""" + action = {"func": func_name, "args": args, + "result_preview": self._shorten_result(result)} + self.actions.append(action) + + if func_name == "get_board_state": + self.queries += 1 + elif func_name == "click_cell": + button = args.get("button", "") + if button == "left": + self.clicks += 1 + elif button == "right": + # 检查是否取消标旗 + result_lower = result.lower() + if "取消" in result or "unflag" in result_lower: + self.unflags += 1 + else: + self.flags += 1 + elif button == "middle": + self.middles += 1 + elif func_name == "start_new_game": + self.games_started += 1 + self.last_game_status = args.get("difficulty", "") + + @staticmethod + def _shorten_result(result: str, max_len: int = 50) -> str: + """缩短结果文本""" + if not result: + return "" + result = result.strip() + if len(result) <= max_len: + return result + return result[:max_len] + "..." + + def to_summary_text(self) -> str: + """生成压缩摘要文本""" + lines = ["[历史执行摘要]"] + + # 统计信息 + stats = [] + if self.clicks > 0: + stats.append(f"左键{self.clicks}次") + if self.flags > 0: + stats.append(f"标旗{self.flags}次") + if self.unflags > 0: + stats.append(f"取消标旗{self.unflags}次") + if self.middles > 0: + stats.append(f"中键{self.middles}次") + if self.queries > 0: + stats.append(f"查询{self.queries}次") + if self.games_started > 0: + stats.append(f"新游戏{self.games_started}次") + + if stats: + lines.append(f"执行统计: {', '.join(stats)}") + + # 最近的操作记录(简化为统计格式) + click_actions = [a for a in self.actions if a.get( + "func") == "click_cell"] + if click_actions: + recent = click_actions[-5:] # 只保留最近5个 + lines.append(f"最近操作({len(click_actions)}个点击):") + for a in recent: + args = a["args"] + button = args.get("button", "") + col, row = args.get("col"), args.get("row") + btn_name = {"left": "左", "right": "右", + "middle": "中"}.get(button, button) + lines.append(f" - {btn_name}键({col},{row})") + + if self.last_game_status: + lines.append(f"最后游戏: {self.last_game_status}") + + return "\n".join(lines) + + class LLMWorker(QThread): """LLM 工作线程""" log_signal = pyqtSignal(str) chat_signal = pyqtSignal(str, str) # role, text finished_signal = pyqtSignal(bool, str) # success, message + summary_signal = pyqtSignal(str) # 上下文摘要更新 def __init__(self, client: LLMClient, registry: FunctionRegistry, - messages: List[Dict[str, Any]]): + messages: List[Dict[str, Any]], max_history: int = 20, + min_history: int = 5, + config: Optional["LlmMinesweeperControllerConfig"] = None): super().__init__() self.client = client self.registry = registry self.messages = messages + self.max_history = max_history # 上限:超过此值触发压缩 + self.min_history = min_history # 下限:压缩后保留的最少消息数 + self.config = config self._stop_flag = False + self._execution_summary: ExecutionSummary | None = None # 压缩摘要 def stop(self) -> None: """请求停止工作线程""" self._stop_flag = True self.requestInterruption() + def _emit_summary_update(self) -> None: + """发送摘要更新信号到 UI""" + if self._execution_summary: + summary_text = self._execution_summary.to_summary_text() + self.summary_signal.emit(summary_text) + else: + self.summary_signal.emit("") + + def _trim_history(self) -> None: + """压缩历史消息,只保留下限数量的最近消息""" + if self.max_history <= 0: + return + + # 移除旧的压缩摘要 + self.messages[:] = [m for m in self.messages + if not (m.get("role") == "user" and "[上下文压缩]" in (m.get("content") or ""))] + + # 分离消息 + system_msgs = [m for m in self.messages if m.get("role") == "system"] + other_msgs = [m for m in self.messages if m.get("role") != "system"] + + # 如果超过上限,压缩旧消息到下限 + if len(other_msgs) > self.max_history: + old_msgs = other_msgs[:-self.min_history] # 保留下限数量的消息 + self._compress_history(old_msgs) + + # 移除旧的压缩摘要(压缩后又插入了) + self.messages[:] = [m for m in self.messages + if not (m.get("role") == "user" and "[上下文压缩]" in (m.get("content") or ""))] + + # 重新分离并截取到下限 + system_msgs = [m for m in self.messages if m.get("role") == "system"] + other_msgs = [m for m in self.messages if m.get("role") != "system"] + + if len(other_msgs) > self.min_history: + other_msgs = other_msgs[-self.min_history:] + + # 构建最终消息列表 + result = system_msgs[:] + + # 插入压缩摘要(如果有) + if self._execution_summary and (self._execution_summary.actions or + self._execution_summary.queries > 0 or self._execution_summary.clicks > 0): + summary_text = self._execution_summary.to_summary_text() + result.append({ + "role": "user", + "content": f"[上下文压缩] 以下是之前的执行摘要:\n{summary_text}" + }) + + # 添加最近的消息(最多到下限) + result.extend(other_msgs) + self.messages[:] = result + + # 发送摘要更新信号到 UI + self._emit_summary_update() + + def _clean_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """清理消息列表,移除无效值,确保API兼容""" + clean = [] + for i, msg in enumerate(messages): + role = msg.get("role") + if not role: + continue + + clean_msg: Dict[str, Any] = {"role": role} + + # 处理 content - 只能是字符串或None,不能是其他类型 + content = msg.get("content") + if content is not None and isinstance(content, str): + clean_msg["content"] = content + + # 处理 tool_calls - 确保格式正确 + tool_calls = msg.get("tool_calls") + if tool_calls and isinstance(tool_calls, list): + clean_tcs = [] + for j, tc in enumerate(tool_calls): + if not tc: + continue + func = tc.get("function") or {} + args = func.get("arguments", "{}") + # arguments 必须是字符串 + if not isinstance(args, str): + args = json.dumps(args, ensure_ascii=False) + + clean_tc: Dict[str, Any] = { + "id": str(tc.get("id") or f"call_{i}_{j}"), + "type": "function", + "function": { + "name": str(func.get("name") or ""), + "arguments": args + } + } + clean_tcs.append(clean_tc) + + if clean_tcs: + clean_msg["tool_calls"] = clean_tcs + + # 处理 tool 角色的 tool_call_id + if role == "tool": + tool_call_id = msg.get("tool_call_id") + if tool_call_id: + clean_msg["tool_call_id"] = str(tool_call_id) + # tool 消息必须有 content + tool_content = msg.get("content") + if tool_content is not None: + clean_msg["content"] = str(tool_content) + + clean.append(clean_msg) + return clean + + def _compress_history(self, old_msgs: List[Dict[str, Any]]) -> None: + """压缩历史消息:将旧的消息压缩成摘要""" + # old_msgs 是即将被压缩的旧消息 + + if not old_msgs: + return + + # 统计旧消息中的操作 - 按顺序配对 assistant tool_calls 和 tool 结果 + summary = self._execution_summary or ExecutionSummary() + + # 按顺序遍历消息,模拟函数调用流程 + pending_actions: List[Dict] = [] # 待匹配结果的操作 + + for msg in old_msgs: + if msg.get("role") == "assistant" and "tool_calls" in msg: + for tc in msg.get("tool_calls", []): + func_data = tc.get("function", {}) + func_name = func_data.get("name", "") + try: + args_str = func_data.get("arguments", "{}") + args = json.loads(args_str) if isinstance( + args_str, str) else args_str + except json.JSONDecodeError: + args = {} + pending_actions.append( + {"func": func_name, "args": args, "result": ""}) + + elif msg.get("role") == "tool": + # 匹配到 pending_actions 中的操作 + content = msg.get("content", "") or "" + if pending_actions: + action = pending_actions.pop(0) + summary.add_action(action["func"], action["args"], content) + + # 如果还有未匹配的操作(异常情况),也加入摘要 + for action in pending_actions: + summary.add_action( + action["func"], action["args"], action["result"]) + + # 构建压缩摘要消息 + if summary.actions or summary.queries > 0 or summary.clicks > 0: + self._execution_summary = summary + summary_text = summary.to_summary_text() + self.log_signal.emit(f"📦 上下文压缩: 合并了 {len(old_msgs)} 条旧消息") + + # 移除之前的压缩摘要消息(避免累积) + self.messages[:] = [m for m in self.messages + if not (m.get("role") == "user" and "[上下文压缩]" in (m.get("content") or ""))] + + # 在消息列表开头添加压缩摘要(作为 user 消息) + insert_idx = len( + [m for m in self.messages if m.get("role") == "system"]) + self.messages.insert(insert_idx, { + "role": "user", + "content": f"[上下文压缩] 以下是之前的执行摘要:\n{summary_text}" + }) + def run(self): - """执行多轮对话循环(无上限)""" + """执行多轮对话循环(有历史消息上限)""" try: tools = self.registry.get_tools_schema() round_num = 0 + # 连续查询棋盘次数(不含click操作时) + consecutive_query_count = 0 + MAX_CONSECUTIVE_QUERIES = 3 # 连续3次查询后强制决策 + + # 记录最近点击的棋盘状态,用于检测重复点击 + last_click_board_hash = "" + consecutive_no_change_count = 0 + MAX_CONSECUTIVE_NO_CHANGE = 3 # 连续3次点击后棋盘无变化则警告 + while True: # 检查停止标志 if self._stop_flag or self.isInterruptionRequested(): self.finished_signal.emit(False, "用户请求停止") return + # 只在超过上限时才裁剪历史消息 + current_msg_count = len( + [m for m in self.messages if m.get("role") != "system"]) + if current_msg_count > self.max_history: + self._trim_history() + self.log_signal.emit( + f"📦 上下文压缩: {current_msg_count} -> {len([m for m in self.messages if m.get('role') != 'system'])} 条 (上限: {self.max_history}, 下限: {self.min_history})") + + self.log_signal.emit( + f"当前历史消息数: {current_msg_count} (上限: {self.max_history}, 下限: {self.min_history})") + round_num += 1 self.log_signal.emit(f"=== 第 {round_num} 轮对话 ===") - # 调用 LLM + # 如果连续查询次数过多,添加强制决策提示 + if consecutive_query_count >= MAX_CONSECUTIVE_QUERIES: + force_decision_prompt = ( + f"[系统] 你已连续查询棋盘 {consecutive_query_count} 次但未执行任何操作!" + "现在必须基于已有信息做出决策:要么执行 click_cell 操作,要么调用 start_new_game。" + "不要继续查询棋盘状态!" + ) + self.messages.append( + {"role": "user", "content": force_decision_prompt}) + self.log_signal.emit("⚠️ 强制决策:连续查询次数过多,要求AI必须执行操作") + consecutive_query_count = 0 # 重置计数 + + # 如果连续点击但棋盘无变化,添加强制决策提示 + if consecutive_no_change_count >= MAX_CONSECUTIVE_NO_CHANGE: + force_decision_prompt = ( + f"[系统] 警告!你已连续 {consecutive_no_change_count} 次执行点击操作,但棋盘状态没有变化!" + "可能的原因:1) 点击了已揭开的格子 2) 点击了边界外 3) 游戏已结束。" + "请先调用 get_board_state 检查当前状态,再决定下一步操作。" + "如果游戏已结束(win/fail),必须调用 start_new_game 开始新游戏!" + ) + self.messages.append( + {"role": "user", "content": force_decision_prompt}) + self.log_signal.emit( + f"⚠️ 强制检查:连续{consecutive_no_change_count}次点击无效果,要求检查棋盘状态") + consecutive_no_change_count = 0 # 重置计数 + + # 调用 LLM(清理消息中的无效值) + clean_messages = self._clean_messages(self.messages) + + # reasoning_effort 参数 + reasoning_effort = None + if self.config.deep_thinking != "off": + reasoning_effort = self.config.deep_thinking + response: ChatResponse = self.client.chat( - messages=self.messages, + messages=clean_messages, tools=tools, - temperature=0.3, + temperature=0.2, + reasoning_effort=reasoning_effort, ) if not response.success: @@ -68,21 +458,22 @@ def run(self): False, f"API 调用失败: {response.error}") return - # 显示 LLM 文本回复 - if response.has_content: - self.chat_signal.emit("assistant", response.content) - # 检查是否需要调用工具 if response.has_tool_calls: - # 将 assistant 消息加入历史 - self.messages.append( - LLMClient.build_assistant_tool_message( - response.content, - response.tool_calls - ) - ) + # 构建 assistant 消息,合并 content 和 tool_calls + assistant_msg: Dict[str, Any] = { + "role": "assistant", + "tool_calls": response.tool_calls + } + if response.has_content: + content = response.content or "" + self.chat_signal.emit("assistant", content) + assistant_msg["content"] = content + + self.messages.append(assistant_msg) # 处理每个 tool_call + has_action = False # 本轮是否有实际操作 for tool_call in response.tool_calls: tool_call_id = tool_call.get("id", "") func_data = tool_call.get("function", {}) @@ -110,6 +501,46 @@ def run(self): tool_call_id, result) self.messages.append(tool_msg) + # 记录到执行摘要(用于上下文压缩) + if self._execution_summary is None: + self._execution_summary = ExecutionSummary() + self._execution_summary.add_action( + func_name, func_args, tool_msg.get("content", "")) + # 实时更新摘要显示 + self._emit_summary_update() + + # 记录是否有实际操作(click_cell 或 start_new_game) + if func_name in ("click_cell", "start_new_game"): + has_action = True + + # 检查棋盘是否真的发生了变化 + if func_name == "click_cell": + current_board = self.registry.execute_function( + "get_board_state", {}) + if current_board and "cells" in current_board: + board_str = json.dumps( + current_board["cells"], ensure_ascii=False) + current_hash = hashlib.md5( + board_str.encode()).hexdigest() + + if current_hash == last_click_board_hash: + consecutive_no_change_count += 1 + self.log_signal.emit( + f"⚠️ 棋盘无变化! 连续无变化次数: {consecutive_no_change_count}/{MAX_CONSECUTIVE_NO_CHANGE}") + else: + consecutive_no_change_count = 0 + last_click_board_hash = current_hash + self.log_signal.emit("✓ 棋盘已更新") + + # 根据是否有实际操作更新连续查询计数 + if has_action: + consecutive_query_count = 0 + self.log_signal.emit("✓ 检测到实际操作,重置查询计数") + else: + consecutive_query_count += 1 + self.log_signal.emit( + f"⚠️ 无实际操作,连续查询计数: {consecutive_query_count}/{MAX_CONSECUTIVE_QUERIES}") + # 继续下一轮对话(让 LLM 处理工具结果) continue @@ -138,10 +569,6 @@ def plugin_info(cls) -> PluginInfo: required_controls=[NewGameCommand, MouseClickCommand], ) - @property - def other_info(self) -> LlmMinesweeperControllerConfig: - return super().other_info # type: ignore - def _setup_subscriptions(self) -> None: self.subscribe(BoardUpdateEvent, self._on_board_update) self.subscribe(GameStatusChangeEvent, self._on_game_status_change) @@ -163,6 +590,7 @@ def on_initialized(self) -> None: # 设置 UI 回调 self._widget.set_test_button_callback(self._test_connection) self._widget.set_analyze_callback(self._start_analysis) + self._widget.set_stop_button_callback(self._stop_analysis) # 监听配置变化 self.config_changed.connect(self._on_config_changed) @@ -176,6 +604,9 @@ def on_initialized(self) -> None: # 当前工作线程 self._worker: Optional[LLMWorker] = None + # 用户主动停止标志 + self._user_stopped: bool = False + def _log_control_auth_status(self) -> None: """检查控制权限""" has_new_game = self.has_control_auth(NewGameCommand) @@ -187,6 +618,8 @@ def _log_control_auth_status(self) -> None: def _init_llm_client(self) -> None: """初始化 LLM 客户端""" + if self.other_info is None: + return api_key = self.other_info.api_key base_url = self.other_info.api_base_url model = self.other_info.model_name @@ -241,16 +674,16 @@ def start_new_game(difficulty: str = None) -> Dict[str, Any]: def get_board_state() -> Dict[str, Any]: return self._get_current_board_state() - @registry.register( - description="获取局部棋盘区域,返回以(col,row)为中心的局部格子,radius自己决定(建议3-5)", - param_descriptions={ - "col": "中心列索引 (从 0 开始)", - "row": "中心行索引 (从 0 开始)", - "radius": "半径,自己决定大小,默认3,返回(2*radius+1)x(2*radius+1)的区域", - } - ) - def get_local_board(col: int, row: int, radius: int = 3) -> Dict[str, Any]: - return self._get_local_board(col, row, radius) + # @registry.register( + # description="获取局部棋盘区域,返回以(col,row)为中心的局部格子,radius自己决定(建议3-5)", + # param_descriptions={ + # "col": "中心列索引 (从 0 开始)", + # "row": "中心行索引 (从 0 开始)", + # "radius": "半径,自己决定大小,默认3,返回(2*radius+1)x(2*radius+1)的区域", + # } + # ) + # def get_local_board(col: int, row: int, radius: int = 3) -> Dict[str, Any]: + # return self._get_local_board(col, row, radius) def on_control_auth_changed(self, cmd_type, granted: bool) -> None: """控制权限变更回调""" @@ -307,16 +740,28 @@ def _on_game_status_change(self, event: GameStatusChangeEvent) -> None: self._widget.log_message(f"游戏状态变化: {last_name} -> {current_name}") - # 记录当前游戏状态 + # 始终更新游戏状态 self._game_status = event.current_status - # 如果游戏结束(胜利或失败),更新状态显示 + # 游戏结束时(win/fail -> ready/playing),清空 worker 的历史上下文 + if event.last_status in (3, 4) and event.current_status in (1, 2): + if self._worker: + # 保留系统消息,只清空其他消息 + self._worker.messages[:] = [ + m for m in self._worker.messages if m.get("role") == "system"] + # 重置执行摘要 + self._worker._execution_summary = None + # 通知 UI 摘要已清空 + self._widget.update_summary("") + self._widget.log_message("已清空历史上下文") + + # 更新状态显示 if event.current_status == 3: self._widget.update_status("游戏胜利!") elif event.current_status == 4: self._widget.update_status("游戏失败!") - elif event.current_status == 1: - self._widget.update_status("游戏准备中...") + elif event.current_status == 2: + self._widget.update_status("游戏中...") # ═══════════════════════════════════════════════════════════════ # LLM 对话流程 @@ -364,24 +809,43 @@ def _start_analysis(self) -> None: self._widget.log_message("已有分析任务在运行") return + # 重置用户停止标志 + self._user_stopped = False + # 构建初始消息 messages = self._build_initial_messages() # 创建并启动工作线程 + max_history = self.other_info.max_history_messages + min_history = self.other_info.min_history_messages self._worker = LLMWorker( client=self.llm_client, registry=self.function_registry, messages=messages, + max_history=max_history, + min_history=min_history, + config=self.other_info, ) # 连接信号 self._worker.log_signal.connect(self._widget.log_message) self._worker.chat_signal.connect(self._widget.add_chat_message) self._worker.finished_signal.connect(self._on_analysis_finished) + self._worker.summary_signal.connect(self._widget.update_summary) self._widget.set_buttons_enabled(False) self._worker.start() + def _stop_analysis(self) -> None: + """停止 LLM 分析""" + if self._worker and self._worker.isRunning(): + self.logger.info("用户请求停止 LLM 分析") + self._widget.log_message("正在停止分析...") + self._user_stopped = True + self._worker.stop() + else: + self._widget.log_message("没有正在运行的分析任务") + def _on_analysis_finished(self, success: bool, message: str) -> None: """分析完成回调""" if self._widget is None: @@ -394,6 +858,11 @@ def _on_analysis_finished(self, success: bool, message: str) -> None: self._widget.update_status("分析中断") self._widget.log_message(message) + # 如果是用户主动停止,不再自动继续分析 + if self._user_stopped: + self._widget.log_message("用户已停止,不再自动继续") + return + # 无论成功还是失败,如果游戏状态是进行中,自动继续分析 # 防止AI没有进行任何函数调用就结束 if self._game_status == 2: # playing @@ -405,10 +874,8 @@ def _build_initial_messages(self) -> List[Dict[str, Any]]: """构建初始消息列表""" messages = [] - # 系统提示词 - system_prompt = self.other_info.system_prompt - if system_prompt: - messages.append({"role": "system", "content": system_prompt}) + # 系统提示词(写死在代码中) + messages.append({"role": "system", "content": SYSTEM_PROMPT}) # 当前棋盘状态 board_state = self._get_current_board_state() diff --git a/src/plugins/llm_minesweeper_controller/widgets.py b/src/plugins/llm_minesweeper_controller/widgets.py index 3cf4f0e..0417180 100644 --- a/src/plugins/llm_minesweeper_controller/widgets.py +++ b/src/plugins/llm_minesweeper_controller/widgets.py @@ -17,6 +17,8 @@ class LlmMinesweeperControllerWidget(QWidget): _chat_signal = pyqtSignal(str, str) # role, text _status_signal = pyqtSignal(str) _enable_buttons_signal = pyqtSignal(bool) + _summary_signal = pyqtSignal(str) + _stop_signal = pyqtSignal() def __init__(self, parent=None): super().__init__(parent) @@ -31,9 +33,24 @@ def __init__(self, parent=None): status_group.setLayout(status_layout) layout.addWidget(status_group) - # 对话显示区 - chat_group = QGroupBox("LLM对话") - chat_layout = QVBoxLayout() + # 上下文摘要 + 对话区(水平布局) + main_splitter = QSplitter(Qt.Horizontal) + + # 上下文摘要(左侧) + self._summary_text = QTextEdit() + self._summary_text.setReadOnly(True) + self._summary_text.setMaximumWidth(280) + self._summary_text.setMinimumWidth(200) + self._summary_text.setStyleSheet(""" + QTextEdit { + font-family: Consolas, 'Microsoft YaHei', monospace; + font-size: 11px; + color: #0078d4; + background-color: #f0f6ff; + } + """) + + # 对话显示区(右侧,宽) self._chat_text = QTextEdit() self._chat_text.setReadOnly(True) self._chat_text.setStyleSheet(""" @@ -42,16 +59,20 @@ def __init__(self, parent=None): font-size: 13px; } """) - chat_layout.addWidget(self._chat_text) - chat_group.setLayout(chat_layout) - layout.addWidget(chat_group, stretch=1) + + main_splitter.addWidget(self._summary_text) + main_splitter.addWidget(self._chat_text) + main_splitter.setStretchFactor(0, 0) # 摘要不拉伸 + main_splitter.setStretchFactor(1, 1) # 对话拉伸 + + layout.addWidget(main_splitter, stretch=1) # 日志显示区 log_group = QGroupBox("日志") log_layout = QVBoxLayout() self._log_text = QTextEdit() self._log_text.setReadOnly(True) - self._log_text.setMaximumHeight(120) + self._log_text.setMaximumHeight(80) self._log_text.setStyleSheet("font-size: 11px; color: #666;") log_layout.addWidget(self._log_text) log_group.setLayout(log_layout) @@ -61,10 +82,14 @@ def __init__(self, parent=None): button_layout = QHBoxLayout() self._analyze_button = QPushButton("🤖 分析并操作") self._analyze_button.setStyleSheet("padding: 6px; font-size: 14px;") + self._stop_button = QPushButton("⏹ 停止") + self._stop_button.setStyleSheet("padding: 6px; font-size: 14px;") + self._stop_button.setEnabled(False) # 默认禁用,有任务时才启用 self._test_button = QPushButton("🔗 测试连接") self._clear_chat_button = QPushButton("🗑 清除对话") self._clear_log_button = QPushButton("🗑 清除日志") button_layout.addWidget(self._analyze_button) + button_layout.addWidget(self._stop_button) button_layout.addWidget(self._test_button) button_layout.addWidget(self._clear_chat_button) button_layout.addWidget(self._clear_log_button) @@ -75,6 +100,8 @@ def __init__(self, parent=None): self._chat_signal.connect(self._on_chat) self._status_signal.connect(self._on_status) self._enable_buttons_signal.connect(self._on_enable_buttons) + self._summary_signal.connect(self._on_summary) + self._stop_signal.connect(self._on_stop_clicked) # 按钮事件 self._clear_log_button.clicked.connect(self._clear_log) @@ -100,7 +127,8 @@ def _on_chat(self, role: str, text: str) -> None: ) # 滚动到底部 scrollbar = self._chat_text.verticalScrollBar() - scrollbar.setValue(scrollbar.maximum()) + if scrollbar: + scrollbar.setValue(scrollbar.maximum()) def _on_status(self, text: str) -> None: self._status_label.setText(text) @@ -108,6 +136,11 @@ def _on_status(self, text: str) -> None: def _on_enable_buttons(self, enabled: bool) -> None: self._analyze_button.setEnabled(enabled) self._test_button.setEnabled(enabled) + self._stop_button.setEnabled(not enabled) # 停止按钮与主按钮相反 + + def _on_stop_clicked(self) -> None: + """停止按钮点击处理""" + self._stop_signal.emit() def _clear_log(self) -> None: self._log_text.clear() @@ -115,6 +148,12 @@ def _clear_log(self) -> None: def _clear_chat(self) -> None: self._chat_text.clear() + def _on_summary(self, text: str) -> None: + """更新上下文摘要显示""" + self._summary_text.setPlainText(text) + # 滚动到顶部 + self._summary_text.moveCursor(self._summary_text.textCursor().Start) + # ── 线程安全的公开方法(由插件调用) ── def log_message(self, text: str) -> None: @@ -129,8 +168,14 @@ def update_status(self, text: str) -> None: def set_buttons_enabled(self, enabled: bool) -> None: self._enable_buttons_signal.emit(enabled) + def update_summary(self, text: str) -> None: + self._summary_signal.emit(text) + def set_analyze_callback(self, callback) -> None: self._analyze_button.clicked.connect(callback) def set_test_button_callback(self, callback) -> None: self._test_button.clicked.connect(callback) + + def set_stop_button_callback(self, callback) -> None: + self._stop_button.clicked.connect(callback) diff --git a/src/plugins/stats_panel.py b/src/plugins/stats_panel.py deleted file mode 100644 index e9d5bf7..0000000 --- a/src/plugins/stats_panel.py +++ /dev/null @@ -1,196 +0,0 @@ -""" -实时游戏统计面板 - -通过 HistoryService 获取历史记录进行统计分析。 -收到 VideoSaveEvent 时触发刷新,不直接使用 event 数据。 -""" -from __future__ import annotations - -from collections import defaultdict - -from PyQt5.QtWidgets import ( - QWidget, QVBoxLayout, QHBoxLayout, QLabel, QTableWidget, - QTableWidgetItem, QGroupBox, QHeaderView, -) -from PyQt5.QtCore import pyqtSignal - -from plugin_sdk import BasePlugin, PluginInfo, make_plugin_icon, WindowMode -from shared_types.events import VideoSaveEvent -from plugins.services.history import HistoryService, GameRecord - - -class StatsPanel(QWidget): - """统计面板 UI""" - - _signal_refresh = pyqtSignal() - - def __init__(self, parent=None): - super().__init__(parent) - self._total_games = 0 - self._best_time = float('inf') - self._stats_by_level = defaultdict( - lambda: {"count": 0, "best_time": float('inf')}) - - self._setup_ui() - self._signal_refresh.connect(self._do_refresh) - - def _setup_ui(self): - main_layout = QVBoxLayout(self) - - cards_layout = QHBoxLayout() - - self._lbl_total = self._make_stat_card("Total", "0", "#1976D2") - self._lbl_best = self._make_stat_card("Best", "--", "#F57C00") - - for card in [self._lbl_total, self._lbl_best]: - cards_layout.addWidget(card) - - main_layout.addLayout(cards_layout) - - group = QGroupBox("Recent Games") - group_layout = QVBoxLayout(group) - - self._table = QTableWidget() - self._table.setColumnCount(4) - self._table.setHorizontalHeaderLabels( - ["Level", "Time(s)", "3BV", "Clicks"]) - self._table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) - self._table.setAlternatingRowColors(True) - self._table.setSelectionBehavior(QTableWidget.SelectRows) - group_layout.addWidget(self._table) - - main_layout.addWidget(group) - - @staticmethod - def _make_stat_card(title: str, value: str, color: str) -> QWidget: - card = QWidget() - card.setStyleSheet( - f"background: {color}; border-radius: 8px; padding: 8px;") - layout = QVBoxLayout(card) - layout.setContentsMargins(12, 8, 12, 8) - - lbl_title = QLabel(title) - lbl_title.setStyleSheet( - "color: rgba(255,255,255,0.8); font-size: 12px;") - lbl_value = QLabel(value) - lbl_value.setStyleSheet( - "color: white; font-size: 24px; font-weight: bold;") - - layout.addWidget(lbl_title) - layout.addWidget(lbl_value) - return card - - def _do_refresh(self): - """刷新显示(由插件调用)""" - # 更新总数显示 - self._lbl_total.findChild(QLabel).setText(str(self._total_games)) - - # 更新最佳时间 - if self._best_time < float('inf'): - self._lbl_best.findChild(QLabel).setText(f"{self._best_time:.2f}") - - def clear_table(self): - """清空表格""" - self._table.setRowCount(0) - - def add_record(self, level: int, rtime: float, bbbv: int, left: int, right: int): - """添加一条记录到表格""" - row = self._table.rowCount() - self._table.insertRow(row) - self._table.setItem(row, 0, QTableWidgetItem(str(level))) - self._table.setItem(row, 1, QTableWidgetItem(f"{rtime:.2f}")) - self._table.setItem(row, 2, QTableWidgetItem(str(bbbv))) - ops = left + right - self._table.setItem(row, 3, QTableWidgetItem(str(ops))) - - def update_stats(self, total: int, best_time: float): - """更新统计数据""" - self._total_games = total - self._best_time = best_time - - -class StatsPlugin(BasePlugin): - """实时游戏统计插件 - - 数据来源:仅依赖 HistoryService - - 初始化时加载历史统计 - - 收到 VideoSaveEvent 时触发刷新(重新查询历史) - """ - - @classmethod - def plugin_info(cls) -> PluginInfo: - return PluginInfo( - name="stats_panel", - version="1.0.0", - author="Example", - description="Real-time game statistics panel (via HistoryService)", - icon=make_plugin_icon("#E91E63", "S", 64), - window_mode=WindowMode.TAB, - ) - - def _setup_subscriptions(self) -> None: - self.subscribe(VideoSaveEvent, self._on_video_save) - - def _create_widget(self) -> QWidget: - self._panel = StatsPanel() - return self._panel - - def on_initialized(self) -> None: - # 检查 HistoryService 是否可用 - history = self.wait_for_service(HistoryService, 10) - if history is not None: - self.logger.info("HistoryService 已连接") - self._load_history_stats() - else: - self.logger.warning("HistoryService 不可用,统计面板将无法工作") - - def _load_history_stats(self) -> None: - """从 HistoryService 加载历史统计(在服务提供者线程执行)""" - try: - # 获取服务代理对象(IDE 友好) - history = self.get_service_proxy(HistoryService) - - # 直接调用方法(IDE 完整补全) - total = history.get_record_count() - self.logger.info(f"历史记录总数: {total}") - - # 清空表格 - self._panel.clear_table() - - # 获取最近记录 - records = history.query_records(100, 0, None) - - # 计算最佳时间 - best_time = float('inf') - for r in records: - if r.rtime > 0 and r.rtime < best_time: - best_time = r.rtime - - # 更新统计 - self._panel.update_stats(total, best_time) - - # 添加最近记录到表格(显示最近 20 条) - for r in records[:20]: - self._panel.add_record( - level=r.level, - rtime=r.rtime, - bbbv=r.bbbv, - left=r.left, - right=r.right, - ) - - # 触发 UI 刷新 - self._panel._signal_refresh.emit() - - except Exception as e: - self.logger.warning(f"加载历史统计失败: {e}") - - def on_shutdown(self) -> None: - self.logger.info("StatsPlugin shutting down") - - def _on_video_save(self, event: VideoSaveEvent): - """收到录像保存事件,触发重新加载历史统计""" - self.logger.info(f"Video saved, refreshing stats...") - - # 不直接使用 event 数据,而是重新查询 HistoryService - self._load_history_stats() diff --git a/src/plugins/test_control_a.py b/src/plugins/test_control_a.py deleted file mode 100644 index aed4bbf..0000000 --- a/src/plugins/test_control_a.py +++ /dev/null @@ -1,108 +0,0 @@ -""" -测试控制插件 A - -声明需要 NewGameCommand 控制权限 -""" -from __future__ import annotations - -from PyQt5.QtWidgets import QMessageBox, QWidget, QVBoxLayout, QLabel, QPushButton -from PyQt5.QtCore import pyqtSignal - -from plugin_sdk import BasePlugin, PluginInfo, make_plugin_icon, WindowMode -from shared_types.commands import NewGameCommand -from shared_types.events import VideoSaveEvent - - -class TestControlWidgetA(QWidget): - """测试插件 A 的界面""" - - def __init__(self, parent=None): - super().__init__(parent) - - layout = QVBoxLayout(self) - - self._title = QLabel("测试控制插件 A") - self._title.setStyleSheet("font-size: 16px; font-weight: bold;") - layout.addWidget(self._title) - - self._status = QLabel("状态: 等待初始化...") - layout.addWidget(self._status) - - self._btn = QPushButton("开始新游戏 (16x30x99)") - self._btn.setEnabled(False) - layout.addWidget(self._btn) - - self._btn.clicked.connect(self._on_click) - - def set_has_permission(self, has: bool) -> None: - if has: - self._status.setText("状态: ✅ 已获得 NewGameCommand 权限") - self._btn.setEnabled(True) - else: - self._status.setText("状态: ❌ 未获得 NewGameCommand 权限") - self._btn.setEnabled(False) - - def _on_click(self) -> None: - # 由插件连接 - pass - - -class TestControlPluginA(BasePlugin): - """测试控制插件 A""" - - @classmethod - def plugin_info(cls) -> PluginInfo: - return PluginInfo( - name="test_control_a", - version="1.0.0", - author="Test", - description="测试控制权限 A - NewGameCommand", - icon=make_plugin_icon("#e91e63", "A"), - window_mode=WindowMode.TAB, - required_controls=[NewGameCommand], # 声明需要的控制权限 - ) - - def _setup_subscriptions(self) -> None: - self.subscribe(VideoSaveEvent, self._on_video_save) - - def _create_widget(self) -> QWidget: - self._widget = TestControlWidgetA() - self._widget._btn.clicked.connect(self._on_new_game_click) - return self._widget - - def on_initialized(self) -> None: - self.logger.info("TestControlPluginA 初始化") - - # 检查是否有控制权限 - has_auth = self.has_control_auth(NewGameCommand) - self.logger.info(f"NewGameCommand 权限: {has_auth}") - - # 更新界面 - self.run_on_gui(self._widget.set_has_permission, has_auth) - - def on_control_auth_changed(self, command_type: type, granted: bool) -> None: - """权限变更回调""" - if command_type == NewGameCommand: - self.logger.info(f"NewGameCommand 权限变更: {granted}") - self.run_on_gui(self._widget.set_has_permission, granted) - - def _on_new_game_click(self) -> None: - if self.has_control_auth(NewGameCommand): - self.logger.info("发送 NewGameCommand") - result = self.request(NewGameCommand(rows=16, cols=30, mines=99)) - if result is not None: - QMessageBox.information( - self.widget, "NewGameCommand 响应", f"请求 ID: {result.request_id}, 成功: {result.success}") - else: - self.logger.warning("没有 NewGameCommand 权限") - - def _on_video_save(self, event: VideoSaveEvent) -> None: - self.logger.info(f"收到游戏结束事件: {event.rtime}s") - - def on_shutdown(self) -> None: - self.logger.info("TestControlPluginA 关闭") - - def on_control_auth_changed(self, command_type: type, granted: bool) -> None: - # if command_type == NewGameCommand: - # self.run_on_gui(self._widget.set_has_permission, granted) - pass diff --git a/src/plugins/test_control_b.py b/src/plugins/test_control_b.py deleted file mode 100644 index 7b5c168..0000000 --- a/src/plugins/test_control_b.py +++ /dev/null @@ -1,127 +0,0 @@ -""" -测试控制插件 B - -声明需要 NewGameCommand 控制权限 -""" -from __future__ import annotations - -from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QPushButton, QSpinBox -from PyQt5.QtCore import pyqtSignal, Qt - -from plugin_sdk import BasePlugin, PluginInfo, make_plugin_icon, WindowMode -from shared_types.commands import NewGameCommand -from shared_types.events import VideoSaveEvent - - -class TestControlWidgetB(QWidget): - """测试插件 B 的界面""" - - def __init__(self, parent=None): - super().__init__(parent) - - layout = QVBoxLayout(self) - - self._title = QLabel("测试控制插件 B") - self._title.setStyleSheet("font-size: 16px; font-weight: bold;") - layout.addWidget(self._title) - - self._status = QLabel("状态: 等待初始化...") - layout.addWidget(self._status) - - # 游戏参数输入 - self._rows_spin = QSpinBox() - self._rows_spin.setRange(1, 100) - self._rows_spin.setValue(16) - layout.addWidget(QLabel("行数:")) - layout.addWidget(self._rows_spin) - - self._cols_spin = QSpinBox() - self._cols_spin.setRange(1, 100) - self._cols_spin.setValue(30) - layout.addWidget(QLabel("列数:")) - layout.addWidget(self._cols_spin) - - self._mines_spin = QSpinBox() - self._mines_spin.setRange(1, 999) - self._mines_spin.setValue(99) - layout.addWidget(QLabel("雷数:")) - layout.addWidget(self._mines_spin) - - self._btn = QPushButton("开始新游戏") - self._btn.setEnabled(False) - layout.addWidget(self._btn) - - self._btn.clicked.connect(self._on_click) - - def set_has_permission(self, has: bool) -> None: - if has: - self._status.setText("状态: ✅ 已获得 NewGameCommand 权限") - self._btn.setEnabled(True) - else: - self._status.setText("状态: ❌ 未获得 NewGameCommand 权限") - self._btn.setEnabled(False) - - def get_params(self) -> tuple[int, int, int]: - return ( - self._rows_spin.value(), - self._cols_spin.value(), - self._mines_spin.value(), - ) - - def _on_click(self) -> None: - # 由插件连接 - pass - - -class TestControlPluginB(BasePlugin): - """测试控制插件 B""" - - @classmethod - def plugin_info(cls) -> PluginInfo: - return PluginInfo( - name="test_control_b", - version="1.0.0", - author="Test", - description="测试控制权限 B - NewGameCommand", - icon=make_plugin_icon("#2196f3", "B"), - window_mode=WindowMode.TAB, - required_controls=[NewGameCommand], # 声明需要的控制权限 - ) - - def _setup_subscriptions(self) -> None: - self.subscribe(VideoSaveEvent, self._on_video_save) - - def _create_widget(self) -> QWidget: - self._widget = TestControlWidgetB() - self._widget._btn.clicked.connect(self._on_new_game_click) - return self._widget - - def on_initialized(self) -> None: - self.logger.info("TestControlPluginB 初始化") - - # 检查是否有控制权限 - has_auth = self.has_control_auth(NewGameCommand) - self.logger.info(f"NewGameCommand 权限: {has_auth}") - - # 更新界面 - self.run_on_gui(self._widget.set_has_permission, has_auth) - - def on_control_auth_changed(self, command_type: type, granted: bool) -> None: - """权限变更回调""" - if command_type == NewGameCommand: - self.logger.info(f"NewGameCommand 权限变更: {granted}") - self.run_on_gui(self._widget.set_has_permission, granted) - - def _on_new_game_click(self) -> None: - rows, cols, mines = self._widget.get_params() - if self.has_control_auth(NewGameCommand): - self.logger.info(f"发送 NewGameCommand: {rows}x{cols}x{mines}") - self.send_command(NewGameCommand(rows=rows, cols=cols, mines=mines)) - else: - self.logger.warning("没有 NewGameCommand 权限") - - def _on_video_save(self, event: VideoSaveEvent) -> None: - self.logger.info(f"收到游戏结束事件: {event.rtime}s") - - def on_shutdown(self) -> None: - self.logger.info("TestControlPluginB 关闭") \ No newline at end of file diff --git a/src/shared_types/events.py b/src/shared_types/events.py index 79b4ac4..fa03989 100644 --- a/src/shared_types/events.py +++ b/src/shared_types/events.py @@ -13,7 +13,7 @@ class BoardUpdateEvent(BaseEvent, tag="board_update"): """ 棋盘更新事件 - 每次棋盘状态变化时发送 - + Attributes: rows: 行数 cols: 列数 @@ -37,7 +37,7 @@ class BoardUpdateEvent(BaseEvent, tag="board_update"): class GameStatusChangeEvent(BaseEvent, tag="game_status_change"): """ 游戏状态变化事件 - + Attributes: last_status: 上一个游戏状态 current_status: 当前游戏状态 @@ -51,6 +51,13 @@ class ContextChangeEvent(BaseEvent, tag="context_change"): pass +class ButtonClickEvent(BaseEvent, tag="button_click"): + """按钮点击事件""" + col = 0 + row = 0 + button = 0 + + class VideoSaveEvent(BaseEvent, tag="video_save"): """录像保存事件""" @@ -98,5 +105,6 @@ class VideoSaveEvent(BaseEvent, tag="video_save"): EVENT_TYPES = [ BoardUpdateEvent, GameStatusChangeEvent, + ButtonClickEvent, VideoSaveEvent, -] \ No newline at end of file +] diff --git a/src/shared_types/widgets/__init__.py b/src/shared_types/widgets/__init__.py new file mode 100644 index 0000000..4594118 --- /dev/null +++ b/src/shared_types/widgets/__init__.py @@ -0,0 +1,10 @@ +""" +共享控件模块 + +存放项目中共用的自定义控件 +""" +from .editable_combo_box import EditableComboBox + +__all__ = [ + "EditableComboBox", +] diff --git a/src/shared_types/widgets/editable_combo_box.py b/src/shared_types/widgets/editable_combo_box.py new file mode 100644 index 0000000..d9a213b --- /dev/null +++ b/src/shared_types/widgets/editable_combo_box.py @@ -0,0 +1,77 @@ +""" +可编辑的组合框控件 + +支持补全但不允许新增,焦点移开后验证输入 +""" + +from PyQt5.QtCore import Qt +from PyQt5.QtWidgets import QComboBox, QCompleter + + +class EditableComboBox(QComboBox): + """可编辑的组合框,支持补全但不允许新增,焦点移开后验证输入""" + + def __init__( + self, + items: list[str], + parent=None, + default_index: int = 0, + case_sensitive: bool = False, + filter_mode: str = "contains", + ): + """ + Args: + items: 下拉选项列表 + parent: 父控件 + default_index: 默认选中的索引 + case_sensitive: 补全时是否大小写敏感 + filter_mode: 补全过滤模式,"contains" 包含匹配,"startswith" 前缀匹配 + """ + super().__init__(parent) + self._items = list(items) + self._default_index = default_index + + self.setEditable(True) + self.addItems(items) + if items and 0 <= default_index < len(items): + self.setCurrentIndex(default_index) + + # 设置补全器 + completer = QCompleter(items, self) + completer.setCaseSensitivity( + Qt.CaseSensitive if case_sensitive else Qt.CaseInsensitive + ) + completer.setFilterMode( + Qt.MatchStartsWith if filter_mode == "startswith" else Qt.MatchContains + ) + completer.setCompletionMode(QCompleter.PopupCompletion) # 改为弹出式,更高效 + self.setCompleter(completer) + + # 焦点移开时验证输入 + self.lineEdit().editingFinished.connect(self._validate_input) + + def _validate_input(self): + """验证输入,如果不在有效列表中则恢复默认值""" + current_text = self.currentText() + if current_text not in self._items: + # 恢复为默认值 + if self._items: + self.setCurrentIndex(self._default_index) + else: + self.setCurrentText("") + + def setItems(self, items: list[str]): + """动态更新选项列表""" + old_text = self.currentText() + self._items = list(items) + self.clear() + self.addItems(items) + # 尝试恢复之前的选中项 + if old_text in items: + self.setCurrentText(old_text) + elif items and 0 <= self._default_index < len(items): + self.setCurrentIndex(self._default_index) + + def currentData(self) -> str: + """返回当前选中的值(始终返回有效值)""" + return self.currentText()