diff --git a/requirements.txt b/requirements.txt
index e518254..260a72a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,3 +7,4 @@ pywin32>=311
loguru>=0.7.3
debugpy>=1.8.20
PyQt5-stubs>=5.15.6.0
+requests>=2.33.1
diff --git a/src/plugin_manager/main_window.py b/src/plugin_manager/main_window.py
index 8e0f8a8..86e6c4d 100644
--- a/src/plugin_manager/main_window.py
+++ b/src/plugin_manager/main_window.py
@@ -49,6 +49,7 @@
from .settings_manager import SettingsManager
from plugin_sdk.plugin_base import PluginLifecycle, WindowMode, LogLevel
from plugin_sdk.control_auth import ControlAuthorizationManager
+from plugin_sdk.config_types import OtherInfoBase
from .app_paths import get_data_dir
if TYPE_CHECKING:
@@ -65,7 +66,7 @@
class DetachedPluginWindow(QDialog):
"""
弹出的独立插件窗口
-
+
特性:
- 关闭时自动将 widget 嵌回主窗口标签页
- 标题栏显示"📎 嵌回"提示
@@ -88,9 +89,9 @@ def __init__(self, plugin_name: str, widget: QWidget, parent=None):
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
-
- # 顶部提示栏
+ # 顶部提示栏
hint_bar = QLabel(self.tr("📎 关闭此窗口可自动嵌回到标签页"))
+ hint_bar.setFixedHeight(25)
hint_bar.setAlignment(Qt.AlignCenter)
hint_bar.setStyleSheet("""
QLabel {
@@ -102,13 +103,12 @@ def __init__(self, plugin_name: str, widget: QWidget, parent=None):
}
""")
layout.addWidget(hint_bar)
-
# 将 widget 从旧父窗口转移到新窗口,并确保可见
widget.setParent(self)
layout.addWidget(widget)
widget.setVisible(True)
widget.show()
-
+
def start_drag(self, global_pos: QPoint) -> None:
"""启动拖拽模式(从外部调用)"""
self._dragging = True
@@ -116,18 +116,18 @@ def start_drag(self, global_pos: QPoint) -> None:
self._drag_offset = global_pos - self.pos()
# 捕获鼠标
self.grabMouse()
-
+
def mousePressEvent(self, event) -> None:
if event.button() == Qt.LeftButton:
self._dragging = True
self._drag_offset = event.globalPos() - self.pos()
super().mousePressEvent(event)
-
+
def mouseMoveEvent(self, event) -> None:
if self._dragging and event.buttons() & Qt.LeftButton:
self.move(event.globalPos() - self._drag_offset)
super().mouseMoveEvent(event)
-
+
def mouseReleaseEvent(self, event) -> None:
if event.button() == Qt.LeftButton:
self._dragging = False
@@ -159,8 +159,9 @@ def mouseMoveEvent(self, event: QMouseEvent) -> None:
if (
self._drag_start_pos is not None
and (event.buttons() & Qt.LeftButton)
- ):
- distance = (event.globalPos() - self._drag_start_pos).manhattanLength()
+ ): # type: ignore
+ distance = (event.globalPos() -
+ self._drag_start_pos).manhattanLength()
if distance > QApplication.startDragDistance():
idx = self.tabAt(self.mapFromGlobal(self._drag_start_pos))
if idx >= 0:
@@ -293,7 +294,7 @@ def _attach_tab(self, name: str) -> None:
# 从窗口取出 widget
window.layout().removeWidget(widget)
- widget.setParent(None)
+ widget.setParent(None) # type: ignore
window.deleteLater()
del self._detached_windows[name]
@@ -322,7 +323,7 @@ def _on_tab_close_requested(self, index: int) -> None:
# 隐藏并移除标签页,不销毁 widget(保留插件数据)
self.removeTab(index)
widget.hide()
- widget.setParent(None)
+ widget.setParent(None) # type: ignore
self.tab_close_requested.emit(name)
@@ -367,7 +368,8 @@ def set_status(
"""更新连接状态显示"""
if connected:
self._status_label.setText(self.tr("● 已连接"))
- self._status_label.setStyleSheet("color: green; font-weight: bold;")
+ self._status_label.setStyleSheet(
+ "color: green; font-weight: bold;")
if reconnect_count > 0:
self._reconnect_label.setText(
self.tr("(重连 {n} 次").format(n=reconnect_count)
@@ -398,7 +400,7 @@ def __init__(
self,
plugin_name: str,
state: PluginState,
- other_info: "OtherInfoBase | None" = None,
+ other_info: OtherInfoBase | None = None,
parent=None,
):
super().__init__(parent)
@@ -430,8 +432,8 @@ def __init__(
form2 = QFormLayout(grp2)
self._combo_mode = QComboBox()
- for mode in WindowMode._values():
- label = WindowMode.LABELS.get(mode, mode)
+ for mode in WindowMode:
+ label = WindowMode.LABELS().get(mode, mode)
self._combo_mode.addItem(label, mode)
# WindowMode 继承自 str,直接 str() 转换
idx = self._combo_mode.findData(str(state.window_mode))
@@ -447,8 +449,8 @@ def __init__(
form3 = QFormLayout(grp3)
self._combo_loglevel = QComboBox()
- for level in LogLevel._values():
- label = LogLevel.LABELS.get(level, level)
+ for level in LogLevel:
+ label = LogLevel.LABELS().get(level, level)
self._combo_loglevel.addItem(label, level)
# LogLevel 继承自 str,直接 str() 转换
_lvl_idx = self._combo_loglevel.findData(str(state.log_level).upper())
@@ -506,63 +508,67 @@ def apply_config(self) -> None:
class BasicSettingsDialog(QDialog):
"""
插件管理器基础设置对话框
-
+
包含日志等级等基础配置。
"""
-
+
# 设置变更信号
settings_changed = pyqtSignal()
-
+
def __init__(self, settings_manager: "SettingsManager", parent=None) -> None:
super().__init__(parent)
self._settings_manager = settings_manager
self.setWindowTitle("基础设置")
self.setMinimumWidth(400)
self._setup_ui()
-
+
def _setup_ui(self) -> None:
layout = QVBoxLayout(self)
-
+
# ── 主进程日志设置组 ──
main_log_group = QGroupBox("主进程文件日志")
main_log_layout = QFormLayout(main_log_group)
-
+
self._file_log_level_combo = QComboBox()
self._file_log_level_combo.addItems(SettingsManager.LOG_LEVELS)
- index = self._file_log_level_combo.findText(self._settings_manager.file_log_level)
+ index = self._file_log_level_combo.findText(
+ self._settings_manager.file_log_level)
if index >= 0:
self._file_log_level_combo.setCurrentIndex(index)
-
+
file_log_label = QLabel("日志等级")
file_log_label.setToolTip("主进程日志文件的记录等级")
main_log_layout.addRow(file_log_label, self._file_log_level_combo)
-
+
layout.addWidget(main_log_group)
-
+
# ── 日志查看器设置组 ──
viewer_group = QGroupBox("日志查看器")
viewer_layout = QFormLayout(viewer_group)
-
+
self._viewer_log_level_combo = QComboBox()
self._viewer_log_level_combo.addItems(SettingsManager.LOG_LEVELS)
- index = self._viewer_log_level_combo.findText(self._settings_manager.viewer_log_level)
+ index = self._viewer_log_level_combo.findText(
+ self._settings_manager.viewer_log_level)
if index >= 0:
self._viewer_log_level_combo.setCurrentIndex(index)
-
+
viewer_log_label = QLabel("日志等级")
viewer_log_label.setToolTip("日志查看器显示的日志等级")
viewer_layout.addRow(viewer_log_label, self._viewer_log_level_combo)
-
+
self._auto_scroll_cb = QCheckBox()
- self._auto_scroll_cb.setChecked(self._settings_manager.viewer_auto_scroll)
+ self._auto_scroll_cb.setChecked(
+ self._settings_manager.viewer_auto_scroll)
viewer_layout.addRow("自动滚动", self._auto_scroll_cb)
-
+
self._show_source_cb = QCheckBox()
- self._show_source_cb.setChecked(self._settings_manager.viewer_show_source)
+ self._show_source_cb.setChecked(
+ self._settings_manager.viewer_show_source)
viewer_layout.addRow("显示来源", self._show_source_cb)
-
+
layout.addWidget(viewer_group)
-
+
# 按钮盒
button_box = QDialogButtonBox(
QDialogButtonBox.Ok | QDialogButtonBox.Cancel
@@ -570,35 +576,37 @@ def _setup_ui(self) -> None:
button_box.accepted.connect(self._on_accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
-
+
def _on_accept(self) -> None:
"""保存设置"""
changed = False
-
+
# 主进程文件日志等级
new_file_level = self._file_log_level_combo.currentText()
if new_file_level != self._settings_manager.file_log_level:
- self._settings_manager.set_file_log_level(new_file_level) # type: ignore
+ self._settings_manager.set_file_log_level(
+ new_file_level) # type: ignore
changed = True
-
+
# 日志查看器等级
new_viewer_level = self._viewer_log_level_combo.currentText()
if new_viewer_level != self._settings_manager.viewer_log_level:
- self._settings_manager.set_viewer_log_level(new_viewer_level) # type: ignore
+ self._settings_manager.set_viewer_log_level(
+ new_viewer_level) # type: ignore
changed = True
-
+
# 自动滚动
new_auto_scroll = self._auto_scroll_cb.isChecked()
if new_auto_scroll != self._settings_manager.viewer_auto_scroll:
self._settings_manager.set_viewer_auto_scroll(new_auto_scroll)
changed = True
-
+
# 显示来源
new_show_source = self._show_source_cb.isChecked()
if new_show_source != self._settings_manager.viewer_show_source:
self._settings_manager.set_viewer_show_source(new_show_source)
changed = True
-
+
if changed:
self.settings_changed.emit()
self.accept()
@@ -611,16 +619,16 @@ def _on_accept(self) -> None:
class LogViewerDialog(QDialog):
"""
日志查看对话框(非模态)
-
+
通过 loguru sink 实时显示日志。
"""
-
+
# 日志信号: time_str, level, source, message
_log_signal = pyqtSignal(str, str, str, str)
-
+
# 支持的日志等级
LOG_LEVELS = ["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
-
+
def __init__(
self,
plugin_names: list[str],
@@ -643,64 +651,64 @@ def __init__(
self.setWindowTitle("日志查看")
self.setMinimumSize(900, 600)
self.setWindowFlags(self.windowFlags() | Qt.WindowMaximizeButtonHint)
-
+
self._plugin_names = plugin_names
self._current_log = initial_log
self._current_level = initial_level
self._auto_scroll_default = auto_scroll
self._show_source_default = show_source
self._sink_id: int | None = None
-
+
self._setup_ui()
self._log_signal.connect(self._append_log_line)
self._attach_sink(initial_log)
-
+
def _setup_ui(self) -> None:
layout = QVBoxLayout(self)
-
+
# 顶部:日志选择
top_layout = QHBoxLayout()
-
+
top_layout.addWidget(QLabel("日志源:"))
-
+
self._log_combo = QComboBox()
self._log_combo.addItem("主进程", "main")
for name in self._plugin_names:
self._log_combo.addItem(f"插件: {name}", name)
self._log_combo.currentIndexChanged.connect(self._on_log_changed)
top_layout.addWidget(self._log_combo)
-
+
top_layout.addSpacing(20)
-
+
# 日志等级
top_layout.addWidget(QLabel("等级:"))
-
+
self._level_combo = QComboBox()
self._level_combo.addItems(self.LOG_LEVELS)
self._level_combo.setCurrentText(self._current_level)
self._level_combo.currentIndexChanged.connect(self._on_level_changed)
top_layout.addWidget(self._level_combo)
-
+
top_layout.addSpacing(20)
-
+
# 自动滚动
self._auto_scroll_cb = QCheckBox("自动滚动")
self._auto_scroll_cb.setChecked(self._auto_scroll_default)
top_layout.addWidget(self._auto_scroll_cb)
-
+
# 显示来源
self._show_source_cb = QCheckBox("显示来源")
self._show_source_cb.setChecked(self._show_source_default)
top_layout.addWidget(self._show_source_cb)
-
+
# 清空按钮
clear_btn = QPushButton("清空")
clear_btn.clicked.connect(self._clear_log)
top_layout.addWidget(clear_btn)
-
+
top_layout.addStretch()
layout.addLayout(top_layout)
-
+
# 中部:日志内容
self._log_view = QPlainTextEdit()
self._log_view.setReadOnly(True)
@@ -714,12 +722,12 @@ def _setup_ui(self) -> None:
}
""")
layout.addWidget(self._log_view)
-
+
# 底部:按钮
button_box = QDialogButtonBox(QDialogButtonBox.Close)
button_box.rejected.connect(self.close)
layout.addWidget(button_box)
-
+
def _attach_sink(self, log_name: str) -> None:
"""添加 loguru sink"""
# 移除旧 sink
@@ -729,36 +737,37 @@ def _attach_sink(self, log_name: str) -> None:
except ValueError:
pass
self._sink_id = None
-
+
self._current_log = log_name
-
+
# 清空显示
self._log_view.clear()
-
+
# 保存信号引用(闭包需要)
log_signal = self._log_signal
-
+
# 根据日志源设置过滤器
if log_name == "main":
# 主进程日志:排除插件日志
- def filter_func(record):
- return "plugin" not in record["extra"]
+ def filter_func(record): # type: ignore
+ return True
else:
# 插件日志:只显示该插件
def filter_func(record, pn=log_name):
return record["extra"].get("plugin") == pn
-
+
# sink 函数
def sink_write(message):
record = message.record
time_obj = record["time"]
- time_str = time_obj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 去掉最后3位微秒
+ time_str = time_obj.strftime(
+ "%Y-%m-%d %H:%M:%S.%f")[:-3] # 去掉最后3位微秒
level = record["level"].name
# 来源信息: name:function:line
source = f"{record['name']}:{record['function']}:{record['line']}"
text = str(message)
log_signal.emit(time_str, level, source, text)
-
+
# 添加 sink
self._sink_id = loguru.logger.add(
sink_write,
@@ -766,44 +775,49 @@ def sink_write(message):
filter=filter_func,
format="{message}",
)
- logger.debug(f"Log viewer sink attached: {log_name}, level={self._current_level}, sink_id={self._sink_id}")
-
+ logger.debug(
+ f"Log viewer sink attached: {log_name}, level={self._current_level}, sink_id={self._sink_id}")
+
def _append_log_line(self, time_str: str, level: str, source: str, message: str) -> None:
"""追加一行日志"""
if self._show_source_cb.isChecked():
line = f"{time_str} | {level:<7} | {source} | {message}"
else:
line = f"{time_str} | {level:<7} | {message}"
-
+
# 简单的颜色标记
if "ERROR" in level or "CRITICAL" in level:
- self._log_view.appendHtml(f'{line}')
+ self._log_view.appendHtml(
+ f'{line}')
elif "WARNING" in level:
- self._log_view.appendHtml(f'{line}')
+ self._log_view.appendHtml(
+ f'{line}')
elif "DEBUG" in level or "TRACE" in level:
- self._log_view.appendHtml(f'{line}')
+ self._log_view.appendHtml(
+ f'{line}')
elif "INFO" in level:
- self._log_view.appendHtml(f'{line}')
+ self._log_view.appendHtml(
+ f'{line}')
else:
self._log_view.appendPlainText(line)
-
+
if self._auto_scroll_cb.isChecked():
self._log_view.ensureCursorVisible()
-
+
def _on_log_changed(self, index: int) -> None:
"""日志源切换"""
log_name = self._log_combo.itemData(index)
self._attach_sink(log_name)
-
+
def _on_level_changed(self, index: int) -> None:
"""日志等级切换"""
self._current_level = self._level_combo.currentText()
self._attach_sink(self._current_log)
-
+
def _clear_log(self) -> None:
"""清空日志显示"""
self._log_view.clear()
-
+
def closeEvent(self, event) -> None:
"""关闭时移除 sink"""
if self._sink_id is not None:
@@ -813,7 +827,7 @@ def closeEvent(self, event) -> None:
pass
self._sink_id = None
super().closeEvent(event)
-
+
def show_log(self, log_name: str) -> None:
"""切换到指定日志"""
# 如果日志源不在列表中,添加它
@@ -821,10 +835,10 @@ def show_log(self, log_name: str) -> None:
if index < 0 and log_name != "main":
self._log_combo.addItem(f"插件: {log_name}", log_name)
index = self._log_combo.count() - 1
-
+
if index >= 0:
self._log_combo.setCurrentIndex(index)
-
+
def update_settings(self, level: str, auto_scroll: bool, show_source: bool) -> None:
"""更新设置"""
# 更新日志等级
@@ -832,10 +846,10 @@ def update_settings(self, level: str, auto_scroll: bool, show_source: bool) -> N
self._current_level = level
self._level_combo.setCurrentText(level)
self._attach_sink(self._current_log)
-
+
# 更新自动滚动
self._auto_scroll_cb.setChecked(auto_scroll)
-
+
# 更新显示来源
self._show_source_cb.setChecked(show_source)
@@ -847,11 +861,11 @@ def update_settings(self, level: str, auto_scroll: bool, show_source: bool) -> N
class ControlAuthorizationDialog(QDialog):
"""
控制授权配置对话框
-
+
管理插件对控制命令的使用权限。
只显示声明了需要该控制权限的插件。
"""
-
+
def __init__(
self,
plugin_controls: dict[str, list[type]],
@@ -865,17 +879,17 @@ def __init__(
super().__init__(parent)
self._plugin_controls = plugin_controls
self._auth_manager = ControlAuthorizationManager.instance()
-
+
self.setWindowTitle(self.tr("控制授权配置"))
self.setMinimumWidth(500)
self.setMinimumHeight(300)
-
+
self._setup_ui()
self._load_authorizations()
-
+
def _setup_ui(self) -> None:
layout = QVBoxLayout(self)
-
+
# 说明文字
info_label = QLabel(self.tr(
"每个控制命令只能授权给一个插件。\n"
@@ -884,7 +898,7 @@ def _setup_ui(self) -> None:
))
info_label.setStyleSheet("color: gray; padding: 8px;")
layout.addWidget(info_label)
-
+
# 表格
self._table = QTableWidget()
self._table.setColumnCount(3)
@@ -900,7 +914,7 @@ def _setup_ui(self) -> None:
self._table.setEditTriggers(QTableWidget.NoEditTriggers)
self._table.setAlternatingRowColors(True)
layout.addWidget(self._table)
-
+
# 按钮
btns = QDialogButtonBox(
QDialogButtonBox.Ok | QDialogButtonBox.Cancel
@@ -908,26 +922,26 @@ def _setup_ui(self) -> None:
btns.accepted.connect(self._on_accept)
btns.rejected.connect(self.reject)
layout.addWidget(btns)
-
+
def _load_authorizations(self) -> None:
"""加载授权状态到表格"""
control_types = self._auth_manager.get_all_control_types()
status = self._auth_manager.get_authorization_status()
-
+
self._table.setRowCount(len(control_types))
self._combos: list[QComboBox] = []
-
+
for row, cmd_type in enumerate(control_types):
try:
tag = self._auth_manager._get_tag(cmd_type)
except ValueError:
continue
-
+
# 控制命令名称
name_item = QTableWidgetItem(cmd_type.__name__)
name_item.setData(Qt.UserRole, cmd_type) # 存储类型
self._table.setItem(row, 0, name_item)
-
+
# 找出声明了该控制权限的插件
eligible_plugins = [
plugin_name
@@ -937,34 +951,35 @@ def _load_authorizations(self) -> None:
for c in controls
)
]
-
+
# 插件下拉框
combo = QComboBox()
combo.addItem(self.tr("未授权"), None) # index 0 = 未授权
-
+
if eligible_plugins:
for plugin_name in sorted(eligible_plugins):
combo.addItem(plugin_name, plugin_name)
else:
# 没有插件声明需要该权限,禁用下拉框
combo.setEnabled(False)
-
+
# 设置当前值
current_plugin = status.get(tag)
if current_plugin and current_plugin in eligible_plugins:
idx = combo.findData(current_plugin)
if idx >= 0:
combo.setCurrentIndex(idx)
-
+
self._table.setCellWidget(row, 1, combo)
self._combos.append(combo)
-
+
# 状态显示
self._update_status(row, current_plugin, eligible_plugins)
-
+
# 连接下拉框变化
- combo.currentIndexChanged.connect(lambda _, r=row: self._on_combo_changed(r))
-
+ combo.currentIndexChanged.connect(
+ lambda _, r=row: self._on_combo_changed(r))
+
def _is_same_command_type(self, type1: type, type2: type) -> bool:
"""判断两个命令类型是否相同(通过 tag)"""
try:
@@ -973,7 +988,7 @@ def _is_same_command_type(self, type1: type, type2: type) -> bool:
return tag1 == tag2
except ValueError:
return type1 is type2
-
+
def _update_status(
self,
row: int,
@@ -991,27 +1006,28 @@ def _update_status(
status_item = QTableWidgetItem(self.tr("○ 未授权"))
status_item.setForeground(QColor("#ff9800"))
self._table.setItem(row, 2, status_item)
-
+
def _on_combo_changed(self, row: int) -> None:
"""下拉框变化时更新状态"""
- combo = self._table.cellWidget(row, 1)
+ combo: QComboBox = self._table.cellWidget(row, 1) # type: ignore
plugin_name = combo.currentData()
self._update_status(row, plugin_name, []) # 简化,不重新计算 eligible_plugins
-
+
def _on_accept(self) -> None:
"""确定按钮:保存授权"""
for row in range(self._table.rowCount()):
- name_item = self._table.item(row, 0)
- combo = self._table.cellWidget(row, 1)
-
+ name_item: QTableWidgetItem = self._table.item(
+ row, 0) # type: ignore
+ combo: QComboBox = self._table.cellWidget(row, 1) # type: ignore
+
cmd_type = name_item.data(Qt.UserRole)
plugin_name = combo.currentData()
-
+
if plugin_name is None:
self._auth_manager.revoke(cmd_type)
else:
self._auth_manager.authorize(cmd_type, plugin_name)
-
+
self._auth_manager.save()
self.accept()
@@ -1031,9 +1047,10 @@ def __init__(self, plugin_manager: PluginManager, parent=None):
self._manager = plugin_manager
# 状态持久化
- self._state_mgr = PluginStateManager(get_data_dir() / "plugin_states.json")
+ self._state_mgr = PluginStateManager(
+ get_data_dir() / "plugin_states.json")
self._state_mgr.load()
-
+
# 设置管理
self._settings_mgr = SettingsManager(get_data_dir())
@@ -1064,36 +1081,36 @@ def _setup_ui(self) -> None:
"""构建界面"""
# ── 菜单栏 ──
menubar = self.menuBar()
-
+
# 选项菜单
options_menu = menubar.addMenu(self.tr("选项"))
-
+
# 设置子菜单
settings_menu = options_menu.addMenu(self.tr("设置"))
-
+
# 基础设置动作
act_basic_settings = settings_menu.addAction(self.tr("基础设置..."))
act_basic_settings.triggered.connect(self._open_basic_settings_dialog)
-
+
# 控制授权动作
act_control_auth = settings_menu.addAction(self.tr("控制授权..."))
act_control_auth.triggered.connect(self._open_control_auth_dialog)
-
+
settings_menu.addSeparator()
-
+
# 调试动作
self._debug_act = settings_menu.addAction(self.tr("启动调试"))
self._debug_act.triggered.connect(self._start_debug)
-
+
options_menu.addSeparator()
-
+
# 插件开发指南动作
act_dev_guide = options_menu.addAction(self.tr("插件开发指南"))
act_dev_guide.triggered.connect(self._open_dev_guide)
-
+
# ── 查看菜单 ──
view_menu = menubar.addMenu(self.tr("查看"))
-
+
# 日志查看动作
act_log_viewer = view_menu.addAction(self.tr("日志查看"))
act_log_viewer.triggered.connect(lambda: self._open_log_viewer())
@@ -1225,7 +1242,7 @@ def _really_quit(self) -> None:
self._tray_icon.hide()
self._state_mgr.save()
self._manager.stop()
- QApplication.instance().quit()
+ QApplication.instance().quit() # type: ignore
def _open_global_settings(self) -> None:
"""打开全局设置(目前显示控制授权对话框)"""
@@ -1299,7 +1316,8 @@ def _open_dev_guide(self) -> None:
def _connect_signals(self) -> None:
self._refresh_btn.clicked.connect(self._refresh_plugin_list)
self._list.itemDoubleClicked.connect(self._on_list_double_clicked)
- self._list.customContextMenuRequested.connect(self._on_list_context_menu)
+ self._list.customContextMenuRequested.connect(
+ self._on_list_context_menu)
self.connection_changed.connect(self._on_conn_changed)
# 控制授权按钮
@@ -1328,12 +1346,12 @@ def _open_log_viewer(self, initial_log: str = "main") -> None:
"""打开日志查看对话框(非模态)"""
# 获取所有插件名称
plugin_names = list(self._manager.plugins.keys())
-
+
# 获取日志查看器设置
viewer_level = self._settings_mgr.viewer_log_level
auto_scroll = self._settings_mgr.viewer_auto_scroll
show_source = self._settings_mgr.viewer_show_source
-
+
# 创建或复用对话框
if not hasattr(self, "_log_viewer_dlg") or self._log_viewer_dlg is None:
self._log_viewer_dlg = LogViewerDialog(
@@ -1341,9 +1359,10 @@ def _open_log_viewer(self, initial_log: str = "main") -> None:
)
else:
# 更新设置并切换到指定日志
- self._log_viewer_dlg.update_settings(viewer_level, auto_scroll, show_source)
+ self._log_viewer_dlg.update_settings(
+ viewer_level, auto_scroll, show_source)
self._log_viewer_dlg.show_log(initial_log)
-
+
self._log_viewer_dlg.show()
self._log_viewer_dlg.raise_()
self._log_viewer_dlg.activateWindow()
@@ -1352,13 +1371,13 @@ def _open_control_auth_dialog(self) -> None:
"""打开控制授权配置对话框"""
# 获取插件声明需要的控制权限
plugin_controls: dict[str, list[type]] = {}
-
+
for p in self._manager.plugins.values():
if p.lifecycle == PluginLifecycle.READY:
required = p.info.required_controls or []
if required:
plugin_controls[p.name] = required
-
+
dialog = ControlAuthorizationDialog(plugin_controls, self)
dialog.exec_()
@@ -1398,11 +1417,11 @@ def _start_debug(self) -> None:
# 解决 PyInstaller 打包后子进程找不到 Python/debugpy 的问题
debugpy.listen(("0.0.0.0", 5678), in_process_debug_adapter=True)
PluginManagerWindow._debug_active = True
-
+
# 启动后禁用菜单项,不可重复启动
self._debug_act.setText(self.tr("调试已启动"))
self._debug_act.setEnabled(False)
-
+
self.statusBar().showMessage(self.tr("调试服务已在端口 5678 启动,等待 VS Code 连接。重启插件管理器可关闭调试。"))
logger.info("Debug server started on port 5678")
except ImportError as e:
@@ -1411,7 +1430,8 @@ def _start_debug(self) -> None:
f"debugpy import failed:\n{e}",
)
except Exception as e:
- QMessageBox.warning(self, "Debug", f"Failed to start debugger:\n{e}")
+ QMessageBox.warning(
+ self, "Debug", f"Failed to start debugger:\n{e}")
# ── 插件列表 ────────────────────────────────────────
@@ -1475,7 +1495,7 @@ def _refresh_plugin_list(self) -> None:
# 恢复选中项
if current_name:
for i in range(lst.count()):
- if lst.item(i).data(Qt.UserRole) == current_name:
+ if lst.item(i).data(Qt.UserRole) == current_name: # type: ignore
lst.setCurrentRow(i)
break
@@ -1517,7 +1537,8 @@ def _on_list_context_menu(self, pos) -> None:
act_enable = menu.addAction("✅ " + self.tr("启用"))
act_disable = menu.addAction("❌ " + self.tr("禁用"))
act_enable.setEnabled(can_control and not plugin.is_enabled)
- act_disable.setEnabled(lc == PluginLifecycle.READY and plugin.is_enabled)
+ act_disable.setEnabled(
+ lc == PluginLifecycle.READY and plugin.is_enabled)
act_enable.triggered.connect(lambda: self._toggle_plugin(name, True))
act_disable.triggered.connect(lambda: self._toggle_plugin(name, False))
@@ -1525,11 +1546,15 @@ def _on_list_context_menu(self, pos) -> None:
# 插件详情(子菜单,只读)
detail_menu = QMenu("ℹ️ " + self.tr("插件详情"), self)
- detail_menu.addAction(self.tr("名称: {name}").format(name=plugin.name)).setEnabled(False)
- detail_menu.addAction(self.tr("版本: {v}").format(v=plugin.info.version)).setEnabled(False)
- detail_menu.addAction(self.tr("作者: {a}").format(a=plugin.info.author or '-')).setEnabled(False)
+ detail_menu.addAction(self.tr("名称: {name}").format(
+ name=plugin.name)).setEnabled(False)
+ detail_menu.addAction(self.tr("版本: {v}").format(
+ v=plugin.info.version)).setEnabled(False)
+ detail_menu.addAction(self.tr("作者: {a}").format(
+ a=plugin.info.author or '-')).setEnabled(False)
desc = plugin.info.description or self.tr("暂无描述")
- detail_menu.addAction(self.tr("描述: {d}").format(d=desc)).setEnabled(False)
+ detail_menu.addAction(
+ self.tr("描述: {d}").format(d=desc)).setEnabled(False)
menu.addMenu(detail_menu)
menu.addSeparator()
@@ -1543,9 +1568,11 @@ def _on_list_context_menu(self, pos) -> None:
act_close = menu.addAction("🚫 " + self.tr("关闭窗口"))
# 窗口操作只有就绪状态才可用
- can_open = lc == PluginLifecycle.READY and (has_closed or (not has_tab and plugin.widget is not None))
+ can_open = lc == PluginLifecycle.READY and (
+ has_closed or (not has_tab and plugin.widget is not None))
act_open.setEnabled(can_open)
- act_close.setEnabled(lc == PluginLifecycle.READY and (has_tab or has_detached))
+ act_close.setEnabled(
+ lc == PluginLifecycle.READY and (has_tab or has_detached))
# 打开日志文件
act_log = menu.addAction("📋 " + self.tr("打开日志"))
@@ -1558,7 +1585,8 @@ def _on_list_context_menu(self, pos) -> None:
# 设置
act_settings = menu.addAction("⚙️ " + self.tr("设置..."))
- act_settings.triggered.connect(lambda: self._open_plugin_settings(name))
+ act_settings.triggered.connect(
+ lambda: self._open_plugin_settings(name))
menu.exec_(self._list.viewport().mapToGlobal(pos))
@@ -1613,7 +1641,7 @@ def _close_plugin_window(self, name: str) -> None:
widget = item.widget()
if widget is not None:
lay.removeWidget(widget)
- widget.setParent(None)
+ widget.setParent(None) # type: ignore
window.blockSignals(True) # 防止 closeEvent 二次触发
window.close()
window.deleteLater()
@@ -1625,7 +1653,7 @@ def _close_plugin_window(self, name: str) -> None:
widget = t.widget(i)
t.removeTab(i)
widget.hide()
- widget.setParent(None)
+ widget.setParent(None) # type: ignore
break
self._closed_plugins.add(name)
@@ -1652,7 +1680,7 @@ def _open_plugin_settings(self, name: str) -> None:
# 应用插件自定义配置
if other_info:
dlg.apply_config()
- plugin.save_config()
+ plugin.save_config() # type: ignore
# 立即应用启用/禁用
if current.enabled != new_state.enabled:
@@ -1661,7 +1689,7 @@ def _open_plugin_settings(self, name: str) -> None:
else:
self._close_plugin_window(name)
self._manager.disable_plugin(name)
-
+
# 立即应用窗口模式变化
if current.window_mode != new_state.window_mode:
t = self._tab_widget
@@ -1686,7 +1714,7 @@ def _open_plugin_settings(self, name: str) -> None:
self._closed_plugins.discard(name)
if name in t._detached_windows:
t._attach_tab(name)
-
+
# 立即应用日志级别
if plugin and current.log_level != new_state.log_level:
plugin.set_log_level(new_state.log_level)
@@ -1797,5 +1825,3 @@ def closeEvent(self, event) -> None:
event.accept()
else:
event.ignore()
-
-
diff --git a/src/plugin_sdk/plugin_base.py b/src/plugin_sdk/plugin_base.py
index 3fbb1c7..1048e4e 100644
--- a/src/plugin_sdk/plugin_base.py
+++ b/src/plugin_sdk/plugin_base.py
@@ -26,16 +26,14 @@
from abc import abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass, field
-from enum import Enum
+from enum import Enum, StrEnum
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Generic, Type, TypeVar, cast
-
+from .config_types import OtherInfoBase
_E = TypeVar("_E", bound="BaseEvent")
_T = TypeVar("_T") # 用于服务获取方法的泛型
if TYPE_CHECKING:
- from .config_types import OtherInfoBase
from plugin_manager.logging_setup import LogConfig
-if TYPE_CHECKING:
from PyQt5.QtGui import QIcon
@@ -89,22 +87,24 @@ def make_plugin_icon(
return QIcon(pix)
-class WindowMode(str):
+class WindowMode(StrEnum):
"""窗口加载方式枚举"""
TAB = "tab" # 标签页内加载
DETACHED = "detached" # 独立窗口加载
CLOSED = "closed" # 不自动加载
@classmethod
- def _values(cls) -> list[str]:
- return [cls.TAB, cls.DETACHED, cls.CLOSED]
+ def _values(cls):
+ return [level.value for level in cls]
+
+ @classmethod
+ def LABELS(cls):
- # 用于 QComboBox 的显示标签映射
- LABELS = {
- TAB: "标签页内",
- DETACHED: "独立窗口",
- CLOSED: "不自动加载",
- }
+ return {
+ cls.TAB: "标签页内",
+ cls.DETACHED: "独立窗口",
+ cls.CLOSED: "不自动加载",
+ }
class _ServiceProxy:
@@ -146,7 +146,7 @@ class PluginLifecycle(str, Enum):
STOPPED = "STOPPED" # 已停止
-class LogLevel(str):
+class LogLevel(StrEnum):
"""日志级别枚举"""
TRACE = "TRACE"
DEBUG = "DEBUG"
@@ -155,17 +155,18 @@ class LogLevel(str):
ERROR = "ERROR"
@classmethod
- def _values(cls) -> list[str]:
- return [cls.TRACE, cls.DEBUG, cls.INFO, cls.WARNING, cls.ERROR]
+ def _values(cls):
+ return [level.value for level in cls]
- # 用于 QComboBox 的显示标签(中文友好)
- LABELS = {
- TRACE: "TRACE (最详细)",
- DEBUG: "DEBUG",
- INFO: "INFO (常规)",
- WARNING: "WARNING",
- ERROR: "ERROR (仅错误)",
- }
+ @classmethod
+ def LABELS(cls):
+ return {
+ cls.TRACE: "TRACE (最详细)",
+ cls.DEBUG: "DEBUG",
+ cls.INFO: "INFO (常规)",
+ cls.WARNING: "WARNING",
+ cls.ERROR: "ERROR (仅错误)",
+ }
@dataclass
@@ -179,8 +180,8 @@ class PluginInfo(Generic[ConfigT]):
enabled: bool = True # 是否启用
priority: int = 100 # 优先级(数值越小越先执行)
show_window: bool = True # 初始化时是否显示窗口
- window_mode: WindowMode = cast(WindowMode, "tab") # 窗口加载方式
- log_level: LogLevel = cast(LogLevel, "DEBUG") # 默认日志级别
+ window_mode: WindowMode = WindowMode.TAB # 窗口加载方式
+ log_level: LogLevel = LogLevel.INFO # 默认日志级别
icon: QIcon | None = None # 插件图标,None 使用默认蓝色问号
log_config: "LogConfig | None" = None # 日志轮转配置,None 使用全局默认值
# 插件自定义配置类(继承自 OtherInfoBase)
@@ -361,7 +362,7 @@ def set_log_level(self, level: LogLevel | str) -> None:
if isinstance(level, str):
level = LogLevel(level.upper())
self._log_level = level
- set_plugin_log_level(self._log_sink_id, level)
+ set_plugin_log_level(self._log_sink_id, level.value)
self.logger.debug(f"Log level changed to {level}")
@property
@@ -581,7 +582,10 @@ def shutdown(self) -> None:
self._registered_protocols.clear()
if self._widget:
- self._widget.deleteLater()
+ try:
+ self._widget.deleteLater()
+ except RuntimeError:
+ pass
self._widget = None
# 清空队列残留事件
diff --git a/src/plugins/history/columns_dialog.py b/src/plugins/history/columns_dialog.py
new file mode 100644
index 0000000..c45b026
--- /dev/null
+++ b/src/plugins/history/columns_dialog.py
@@ -0,0 +1,76 @@
+"""
+列显示设置对话框
+"""
+
+from __future__ import annotations
+
+from PyQt5.QtCore import QCoreApplication
+from PyQt5.QtWidgets import (
+ QWidget,
+ QVBoxLayout,
+ QHBoxLayout,
+ QPushButton,
+ QScrollArea,
+ QCheckBox,
+ QDialog,
+)
+
+_translate = QCoreApplication.translate
+
+
+class ColumnsDialog(QDialog):
+ """列显示设置对话框"""
+
+ def __init__(self, headers: list[str], show_fields: list[str], parent=None):
+ super().__init__(parent)
+ self.setWindowTitle(_translate("Form", "列设置"))
+ self.resize(300, 500)
+ self._headers = headers
+
+ layout = QVBoxLayout(self)
+
+ # 全选/取消全选
+ select_layout = QHBoxLayout()
+ self.select_all_btn = QPushButton(_translate("Form", "全选"))
+ self.deselect_all_btn = QPushButton(_translate("Form", "取消全选"))
+ select_layout.addWidget(self.select_all_btn)
+ select_layout.addWidget(self.deselect_all_btn)
+ layout.addLayout(select_layout)
+
+ # 勾选列表
+ scroll = QScrollArea(self)
+ scroll.setWidgetResizable(True)
+ scroll_widget = QWidget()
+ scroll_layout = QVBoxLayout(scroll_widget)
+ scroll_layout.setContentsMargins(4, 4, 4, 4)
+
+ self.checks: dict[str, QCheckBox] = {}
+ for field in headers:
+ cb = QCheckBox(field)
+ cb.setChecked(field in show_fields)
+ scroll_layout.addWidget(cb)
+ self.checks[field] = cb
+
+ scroll_layout.addStretch()
+ scroll.setWidget(scroll_widget)
+ layout.addWidget(scroll)
+
+ # 确定按钮
+ self.ok_button = QPushButton(_translate("Form", "确定"))
+ layout.addWidget(self.ok_button)
+
+ self.select_all_btn.clicked.connect(self._select_all)
+ self.deselect_all_btn.clicked.connect(self._deselect_all)
+ self.ok_button.clicked.connect(self.accept)
+
+ def _select_all(self):
+ for cb in self.checks.values():
+ cb.setChecked(True)
+
+ def _deselect_all(self):
+ for cb in self.checks.values():
+ cb.setChecked(False)
+
+ def get_show_fields(self) -> list[str]:
+ """获取当前勾选的字段集合"""
+ return [field for field, cb in self.checks.items() if cb.isChecked()]
diff --git a/src/plugins/history/delegates.py b/src/plugins/history/delegates.py
new file mode 100644
index 0000000..e478545
--- /dev/null
+++ b/src/plugins/history/delegates.py
@@ -0,0 +1,280 @@
+"""
+表格代理组件
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from PyQt5.QtCore import Qt, QModelIndex
+from PyQt5.QtGui import QPalette
+from PyQt5.QtWidgets import (
+ QComboBox,
+ QLineEdit,
+ QSpinBox,
+ QDoubleSpinBox,
+ QDateTimeEdit,
+ QStyledItemDelegate,
+ QStyle,
+ QApplication,
+)
+
+from shared_types.widgets import EditableComboBox
+
+from .models import HistoryData, CompareSymbol
+
+
+class ComboBoxDelegate(QStyledItemDelegate):
+ """通用的 ComboBox 代理"""
+
+ def __init__(self, items: list[str], parent=None):
+ super().__init__(parent)
+ self._items = items
+
+ def createEditor(self, parent, option, index):
+ editor = QComboBox(parent)
+ editor.addItems(self._items)
+ return editor
+
+ def setEditorData(self, editor: QComboBox, index):
+ value = index.model().data(index, Qt.EditRole)
+ if value:
+ idx = editor.findText(value)
+ if idx >= 0:
+ editor.setCurrentIndex(idx)
+
+ def setModelData(self, editor: QComboBox, model, index):
+ model.setData(index, editor.currentText(), Qt.EditRole)
+
+ def updateEditorGeometry(self, editor, option, index):
+ editor.setGeometry(option.rect)
+
+
+class EditableComboBoxDelegate(QStyledItemDelegate):
+ """可编辑的 ComboBox 代理(带补全)"""
+
+ def __init__(self, items: list[str], parent=None):
+ super().__init__(parent)
+ self._items = items
+
+ def createEditor(self, parent, option, index):
+ editor = EditableComboBox(self._items, parent)
+ return editor
+
+ def setEditorData(self, editor: EditableComboBox, index):
+ value = index.model().data(index, Qt.EditRole)
+ if value:
+ editor.setCurrentText(value)
+
+ def setModelData(self, editor: EditableComboBox, model, index):
+ model.setData(index, editor.currentText(), Qt.EditRole)
+
+ def updateEditorGeometry(self, editor, option, index):
+ editor.setGeometry(option.rect)
+
+
+class FilterValueDelegate(QStyledItemDelegate):
+ """值列的智能代理,根据同行字段类型动态决定编辑器"""
+
+ COL_FIELD = 1 # FilterModel.COL_FIELD
+ COL_COMPARE = 2 # FilterModel.COL_COMPARE
+
+ def __init__(self, float_decimals: int = 2, parent=None):
+ super().__init__(parent)
+ self._float_decimals = float_decimals
+ self._editor_widgets = [] # 缓存创建的编辑器widget
+
+ def paint(self, painter, option, index):
+ """根据字段类型绘制单元格"""
+ # 检查选中状态
+ is_selected = option.state & QStyle.State_Selected
+
+ if is_selected: # type: ignore
+ # 选中时绘制背景
+ painter.fillRect(option.rect, option.palette.highlight())
+ # 使用高亮文本颜色
+ text_role = QPalette.HighlightedText
+ else:
+ text_role = QPalette.WindowText
+
+ field_value, _, _ = self._get_field_info(index)
+ raw_value = index.data(Qt.EditRole)
+
+ if field_value is None or raw_value is None:
+ super().paint(painter, option, index)
+ return
+
+ display_text = str(raw_value)
+
+ if isinstance(field_value, datetime):
+ # 日期类型显示为可读格式
+ try:
+ ts = int(raw_value)
+ if ts > 1e15: # 微秒
+ ts = ts / 1_000_000
+ elif ts > 1e12: # 毫秒
+ ts = ts / 1_000
+ dt = datetime.fromtimestamp(ts)
+ display_text = dt.strftime("%Y-%m-%d %H:%M:%S")
+ except (ValueError, TypeError, OSError):
+ display_text = raw_value
+ elif isinstance(field_value, float):
+ # 浮点数显示带小数位
+ try:
+ display_text = f"{float(raw_value):.{self._float_decimals}f}"
+ except (ValueError, TypeError):
+ display_text = raw_value
+
+ # 使用 QStyle 绘制文本
+ style = QApplication.style()
+ style.drawItemText(
+ painter,
+ option.rect,
+ Qt.AlignCenter | Qt.AlignVCenter, # type: ignore
+ option.palette,
+ True,
+ display_text,
+ text_role
+ )
+
+ def _get_field_info(self, index: QModelIndex) -> tuple:
+ """获取同行的字段信息和比较符"""
+ model = index.model()
+ row = index.row()
+
+ # 获取字段名
+ field_index = model.index(row, self.COL_FIELD)
+ field_name = model.data(field_index, Qt.EditRole)
+
+ # 获取比较符
+ compare_index = model.index(row, self.COL_COMPARE)
+ compare_text = model.data(compare_index, Qt.EditRole)
+
+ if not field_name:
+ return None, None, None
+
+ try:
+ field_value = HistoryData.get_field_value(field_name)
+ except (KeyError, IndexError):
+ return None, None, None
+
+ compare = None
+ if compare_text:
+ try:
+ compare = CompareSymbol.from_display_name(compare_text)
+ except ValueError:
+ pass
+
+ return field_value, compare, field_name
+
+ def _create_editor_by_type(self, parent, field_value, compare, field_name):
+ """根据字段类型创建编辑器"""
+ from shared_types.enums import BaseDiaPlayEnum
+
+ # 如果是包含/不包含比较符,使用 LineEdit
+ if compare.value in (CompareSymbol.Contains, CompareSymbol.NotContains):
+ return QLineEdit(parent)
+
+ if isinstance(field_value, BaseDiaPlayEnum):
+ editor = QComboBox(parent)
+ editor.addItems([e.display_name for e in field_value.__class__])
+ return editor
+ elif isinstance(field_value, int):
+ return QSpinBox(parent)
+ elif isinstance(field_value, float):
+ editor = QDoubleSpinBox(parent)
+ editor.setDecimals(self._float_decimals)
+ editor.setRange(-1e15, 1e15)
+ return editor
+ elif isinstance(field_value, datetime):
+ editor = QDateTimeEdit(parent)
+ editor.setDisplayFormat("yyyy-MM-dd HH:mm:ss")
+ editor.setCalendarPopup(True)
+ return editor
+ else:
+ return QLineEdit(parent)
+
+ def createEditor(self, parent, option, index):
+ field_value, compare, field_name = self._get_field_info(index)
+ if field_value is None:
+ return QLineEdit(parent)
+ return self._create_editor_by_type(parent, field_value, compare, field_name)
+
+ def setEditorData(self, editor, index):
+ field_value, compare, field_name = self._get_field_info(index)
+ if field_value is None:
+ return
+
+ raw_value = index.model().data(index, Qt.EditRole)
+
+ from shared_types.enums import BaseDiaPlayEnum
+ if isinstance(field_value, BaseDiaPlayEnum) and isinstance(editor, QComboBox):
+ if raw_value:
+ idx = editor.findText(raw_value)
+ if idx >= 0:
+ editor.setCurrentIndex(idx)
+ elif isinstance(field_value, int) and isinstance(editor, QSpinBox):
+ try:
+ editor.setValue(int(raw_value) if raw_value else 0)
+ except (ValueError, TypeError):
+ editor.setValue(0)
+ elif isinstance(field_value, float) and isinstance(editor, QDoubleSpinBox):
+ try:
+ editor.setValue(float(raw_value) if raw_value else 0.0)
+ except (ValueError, TypeError):
+ editor.setValue(0.0)
+ elif isinstance(field_value, datetime) and isinstance(editor, QDateTimeEdit):
+ try:
+ if raw_value:
+ # raw_value 可能是 int/float 时间戳,或字符串形式的时间戳
+ if isinstance(raw_value, (int, float)):
+ dt = datetime.fromtimestamp(raw_value / 1_000_000)
+ else:
+ # 先尝试作为字符串时间戳解析
+ try:
+ ts = int(raw_value)
+ if ts > 1e15: # 微秒
+ ts = ts / 1_000_000
+ elif ts > 1e12: # 毫秒
+ ts = ts / 1_000
+ dt = datetime.fromtimestamp(ts)
+ except (ValueError, TypeError):
+ # 再尝试解析日期时间字符串
+ for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
+ try:
+ dt = datetime.strptime(raw_value, fmt)
+ break
+ except ValueError:
+ continue
+ else:
+ dt = datetime.now()
+ else:
+ dt = datetime.now()
+ editor.setDateTime(dt)
+ except (ValueError, TypeError):
+ editor.setDateTime(datetime.now())
+ else:
+ if isinstance(editor, QLineEdit):
+ editor.setText(raw_value or "")
+
+ def setModelData(self, editor, model, index):
+ field_value, compare, field_name = self._get_field_info(index)
+ if field_value is None:
+ if isinstance(editor, QLineEdit):
+ model.setData(index, editor.text(), Qt.EditRole)
+ return
+
+ from shared_types.enums import BaseDiaPlayEnum
+ if isinstance(field_value, BaseDiaPlayEnum) and isinstance(editor, QComboBox):
+ model.setData(index, editor.currentText(), Qt.EditRole)
+ elif isinstance(field_value, (int, float)) and isinstance(editor, (QSpinBox, QDoubleSpinBox)):
+ model.setData(index, str(editor.value()), Qt.EditRole)
+ elif isinstance(field_value, datetime) and isinstance(editor, QDateTimeEdit):
+ ts = int(editor.dateTime().toPyDateTime().timestamp() * 1_000_000)
+ model.setData(index, str(ts), Qt.EditRole)
+ else:
+ if isinstance(editor, QLineEdit):
+ model.setData(index, editor.text(), Qt.EditRole)
+
+ def updateEditorGeometry(self, editor, option, index):
+ editor.setGeometry(option.rect)
diff --git a/src/plugins/history/filter_dialog.py b/src/plugins/history/filter_dialog.py
new file mode 100644
index 0000000..3e3ac81
--- /dev/null
+++ b/src/plugins/history/filter_dialog.py
@@ -0,0 +1,362 @@
+"""
+过滤条件对话框
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import cast
+
+from PyQt5.QtCore import Qt, QCoreApplication, QTimer
+from PyQt5.QtWidgets import (
+ QVBoxLayout,
+ QMenu,
+ QTableView,
+ QMessageBox,
+ QSizePolicy,
+ QHeaderView,
+ QDialog,
+)
+
+from .delegates import ComboBoxDelegate, EditableComboBoxDelegate, FilterValueDelegate
+from .models import HistoryData, LogicSymbol, CompareSymbol
+from .table_views import AutoEditTableView, FilterModel
+
+_translate = QCoreApplication.translate
+
+
+class FilterDialog(QDialog):
+ """过滤条件对话框"""
+
+ def __init__(self, float_decimals: int = 2, parent=None):
+ super().__init__(parent)
+ self.setWindowTitle(_translate("Form", "过滤条件"))
+ self.resize(700, 300)
+ self._float_decimals = float_decimals
+
+ layout = QVBoxLayout(self)
+
+ self.table = AutoEditTableView()
+ self.table.setModel(FilterModel(self))
+ self.table.horizontalHeader().setDefaultAlignment(Qt.AlignCenter)
+ self.table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
+ self.table.setContextMenuPolicy(Qt.CustomContextMenu)
+ self.table.customContextMenuRequested.connect(self.show_context_menu)
+ self.table.setSelectionBehavior(QTableView.SelectItems)
+ self.table.setSelectionMode(QTableView.ExtendedSelection)
+ self.table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
+ # 设置所有列自动进入编辑
+ self.table.setAutoEditColumns(
+ list(range(FilterModel.COL_LBRACKET, FilterModel.COL_LOGIC + 1)))
+ # 禁用双击编辑
+ self.table.setEditTriggers(QTableView.NoEditTriggers)
+ layout.addWidget(self.table)
+
+ self._setup_delegates()
+ self._connect_field_change_signal()
+
+ def _connect_field_change_signal(self):
+ """当字段列改变时,更新值列的默认值"""
+ self.table.model().dataChanged.connect(self._on_field_changed)
+
+ def _on_field_changed(self, topLeft, bottomRight, roles):
+ """字段列或比较符列改变时触发"""
+ if Qt.EditRole not in roles:
+ return
+ model = self.table.model()
+ for row in range(topLeft.row(), bottomRight.row() + 1):
+ for col in range(topLeft.column(), bottomRight.column() + 1):
+ if col == FilterModel.COL_FIELD:
+ self._update_value_default(row)
+ # 字段或比较符变化时,关闭值列的编辑器,下次打开会使用正确的编辑器类型
+ if col in (FilterModel.COL_FIELD, FilterModel.COL_COMPARE):
+ value_index = model.index(row, FilterModel.COL_VALUE)
+ self._close_value_editor(value_index)
+
+ def _close_value_editor(self, index):
+ """关闭值列的编辑器(针对非持久编辑器)"""
+ # 检查值列是否正在编辑
+ if self.table.state() == QTableView.EditingState and self.table.currentIndex() == index:
+ # 临时阻止自动编辑
+ self.table.suppress_auto_edit = True
+ # 先移到其他列,关闭当前编辑器
+ row = index.row()
+ model = self.table.model()
+ self.table.setCurrentIndex(
+ model.index(row, FilterModel.COL_LBRACKET))
+ # 恢复自动编辑
+ self.table.suppress_auto_edit = False
+ # 延迟移回值列,让编辑器重新创建
+ QTimer.singleShot(
+ 50, lambda idx=index: self.table.setCurrentIndex(idx))
+
+ def _update_value_default(self, row: int):
+ """更新指定行的值列默认值"""
+ model = self.table.model()
+ field_name = model.data(model.index(row, FilterModel.COL_FIELD))
+
+ if not field_name:
+ return
+
+ field_value = HistoryData.get_field_value(field_name)
+ new_default = self._get_default_value(field_value)
+
+ # 获取当前值(使用 EditRole 获取原始数据)
+ current_value = model.data(model.index(
+ row, FilterModel.COL_VALUE), Qt.EditRole)
+
+ # 始终更新为新类型的默认值
+ model.setData(
+ model.index(row, FilterModel.COL_VALUE),
+ new_default,
+ Qt.EditRole
+ )
+
+ def _get_default_value(self, field_value) -> str:
+ """获取字段类型的默认值"""
+ from shared_types.enums import BaseDiaPlayEnum
+
+ if field_value is None:
+ return ""
+ elif isinstance(field_value, BaseDiaPlayEnum):
+ # 枚举类型返回第一个选项
+ return field_value.__class__.display_names()[0]
+ elif isinstance(field_value, datetime):
+ # 日期类型返回当前时间戳(微秒)
+ ts = int(datetime.now().timestamp() * 1_000_000)
+ return str(ts)
+ elif isinstance(field_value, float):
+ return "0.0"
+ elif isinstance(field_value, int):
+ return "0"
+ else:
+ return ""
+
+ def _setup_delegates(self):
+ """设置列代理"""
+ # 左括号
+ self.table.setItemDelegateForColumn(
+ FilterModel.COL_LBRACKET,
+ ComboBoxDelegate(["", "(", "(("], self)
+ )
+ # 字段
+ self.table.setItemDelegateForColumn(
+ FilterModel.COL_FIELD,
+ EditableComboBoxDelegate(HistoryData.fields(), self)
+ )
+ # 比较符
+ self.table.setItemDelegateForColumn(
+ FilterModel.COL_COMPARE,
+ ComboBoxDelegate(CompareSymbol.display_names(), self)
+ )
+ # 值列 - 使用智能代理
+ self.table.setItemDelegateForColumn(
+ FilterModel.COL_VALUE,
+ FilterValueDelegate(self._float_decimals, self)
+ )
+ # 右括号
+ self.table.setItemDelegateForColumn(
+ FilterModel.COL_RBRACKET,
+ ComboBoxDelegate(["", ")", "))"], self)
+ )
+ # 逻辑符
+ self.table.setItemDelegateForColumn(
+ FilterModel.COL_LOGIC,
+ ComboBoxDelegate(LogicSymbol.display_names(), self)
+ )
+
+ def set_float_decimals(self, decimals: int) -> None:
+ """动态设置小数位数"""
+ self._float_decimals = decimals
+ # 更新值列代理
+ self.table.setItemDelegateForColumn(
+ FilterModel.COL_VALUE,
+ FilterValueDelegate(self._float_decimals, self)
+ )
+
+ def show_context_menu(self, pos):
+ menu = QMenu(self)
+ menu.addAction(_translate("Form", "添加"), self.add_row)
+ menu.addAction(
+ _translate("Form", "插入"), lambda: self.insert_row(
+ self.table.currentIndex().row())
+ )
+ menu.addAction(_translate("Form", "删除"), self.del_row)
+ menu.exec_(self.table.mapToGlobal(pos))
+
+ def add_row(self):
+ self.insert_row(self.table.model().rowCount())
+
+ def del_row(self):
+ # 获取所有选中的行
+ selected_rows = set(index.row()
+ for index in self.table.selectionModel().selectedIndexes())
+ if not selected_rows:
+ return
+ # 取消行选中
+ self.table.clearSelection()
+ # 从后往前删除,避免索引变化
+ for row in sorted(selected_rows, reverse=True):
+ self.table.removeRow(row)
+
+ def insert_row(self, row: int):
+ if row < 0:
+ row = 0
+ model = self.table.model()
+ self.table.insertRow(row)
+ # 设置默认值
+ model.setData(model.index(row, FilterModel.COL_FIELD),
+ HistoryData.fields()[0])
+ model.setData(model.index(row, FilterModel.COL_COMPARE),
+ CompareSymbol.display_names()[0])
+ model.setData(model.index(row, FilterModel.COL_LOGIC),
+ LogicSymbol.display_names()[0])
+ # 不需要手动设置值,代理会根据字段类型自动处理
+
+ def gen_filter_str(self):
+ model = cast(FilterModel, self.table.model())
+ filter_str = ""
+ left_count = 0
+ right_count = 0
+ for row in range(model.rowCount()):
+ data = model.get_row_data(row)
+ field_value_type = model.get_field_value_type(row)
+
+ left_bracket = data["left_bracket"] or ""
+ field = data["field"] or ""
+ compare_text = data["compare"] or ""
+ value = data["value"] or ""
+ right_bracket = data["right_bracket"] or ""
+ logic_text = data["logic"] or ""
+
+ if not field or not compare_text:
+ continue
+
+ compare = CompareSymbol.from_display_name(compare_text)
+ logic = LogicSymbol.from_display_name(logic_text).to_sql
+
+ if left_bracket == "(":
+ left_count += 1
+ elif left_bracket == "((":
+ left_count += 2
+ if right_bracket == ")":
+ right_count += 1
+ elif right_bracket == "))":
+ right_count += 2
+
+ if right_count > left_count:
+ QMessageBox.warning(
+ self, "错误", f"第{row}行 右括号数量大于左括号数量,请检查"
+ )
+ return None
+
+ # 处理值
+ from shared_types.enums import BaseDiaPlayEnum
+ if isinstance(field_value_type, BaseDiaPlayEnum) and compare.value not in (CompareSymbol.Contains, CompareSymbol.NotContains):
+ enum_cls = field_value_type.__class__
+ for e in enum_cls:
+ if e.display_name == value:
+ value = str(e.value)
+ break
+ elif compare.value in (CompareSymbol.Contains, CompareSymbol.NotContains):
+ if isinstance(field_value_type, (int, float)):
+ values = value.split(",")
+ for v in values:
+ if not v.replace("-", "").replace(".", "").isdigit():
+ QMessageBox.warning(
+ self, "错误", f"第{row}行 {v} 不是数字"
+ )
+ return None
+ value = ",".join(v for v in values)
+ elif isinstance(field_value_type, datetime):
+ values = value.split(",")
+ parsed_values = []
+ for v in values:
+ v = v.strip()
+ if not v:
+ continue
+ try:
+ # 尝试解析为时间戳(微秒)
+ ts = int(float(v))
+ parsed_values.append(str(ts))
+ except ValueError:
+ # 尝试解析为日期时间字符串
+ try:
+ for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
+ try:
+ dt = datetime.strptime(v, fmt)
+ parsed_values.append(
+ str(int(dt.timestamp() * 1_000_000)))
+ break
+ except ValueError:
+ continue
+ else:
+ raise ValueError(f"无法解析日期: {v}")
+ except ValueError as e:
+ QMessageBox.warning(
+ self, "错误", f"第{row}行 {v} 不是合法的日期时间"
+ )
+ return None
+ value = ",".join(parsed_values) if parsed_values else ""
+ elif isinstance(field_value_type, BaseDiaPlayEnum):
+ enum_cls = field_value_type.__class__
+ values = value.split(",")
+ parsed_values = []
+ for v in values:
+ v = v.strip()
+ if not v:
+ continue
+ for e in enum_cls:
+ if e.display_name == v:
+ parsed_values.append(str(e.value))
+ break
+ else:
+ QMessageBox.warning(
+ self, "错误", f"第{row}行 {v} 不是合法的枚举选项"
+ )
+ return None
+ value = ",".join(parsed_values) if parsed_values else ""
+
+ else:
+ value = ",".join(
+ f"'{v}'" for v in value.split(",") if v.strip())
+ value = f"({value})" if value else "()"
+ elif isinstance(field_value_type, datetime) and value:
+ try:
+ # 可能是时间戳字符串
+ ts = int(float(value))
+ value = str(ts)
+ except ValueError:
+ # 尝试解析日期时间字符串
+ try:
+ for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
+ try:
+ dt = datetime.strptime(value, fmt)
+ value = str(int(dt.timestamp() * 1_000_000))
+ break
+ except ValueError:
+ continue
+ else:
+ QMessageBox.warning(
+ self, "错误", f"第{row}行 {value} 不是合法的日期时间"
+ )
+ return None
+ except ValueError:
+ QMessageBox.warning(
+ self, "错误", f"第{row}行 {value} 不是合法的日期时间"
+ )
+ return None
+ elif value and not value.startswith("'"):
+ value = f"'{value}'"
+
+ is_last = row == model.rowCount() - 1
+ filter_str += (
+ f" {left_bracket} {field} {compare.to_sql} {value} {right_bracket} "
+ )
+ if not is_last:
+ filter_str += logic
+
+ if left_count != right_count:
+ QMessageBox.warning(self, "错误", "左括号数量和右括号数量不匹配,请检查")
+ return None
+ return filter_str
diff --git a/src/plugins/history/history_table.py b/src/plugins/history/history_table.py
new file mode 100644
index 0000000..d1c665e
--- /dev/null
+++ b/src/plugins/history/history_table.py
@@ -0,0 +1,176 @@
+"""
+历史记录表格
+"""
+
+from __future__ import annotations
+
+from ast import List
+import sqlite3
+import subprocess
+import sys
+from pathlib import Path
+
+from PyQt5.QtCore import Qt, QCoreApplication, pyqtSignal
+from PyQt5.QtGui import QCloseEvent as _QCloseEvent
+from PyQt5.QtWidgets import (
+ QWidget,
+ QVBoxLayout,
+ QMenu,
+ QTableView,
+ QMessageBox,
+ QFileDialog,
+ QHeaderView,
+)
+
+from plugin_manager.app_paths import get_executable_dir
+
+from .models import HistoryData
+from .table_model import HistoryTableModel
+
+_translate = QCoreApplication.translate
+
+
+class HistoryTable(QWidget):
+ """历史记录表格"""
+
+ # 信号:列显示配置变化 (show_fields_json)
+ show_fields_changed = pyqtSignal(str)
+
+ HEADERS = [
+ "replay_id",
+ "game_board_state",
+ "rtime",
+ "left",
+ "right",
+ "double",
+ "left_s",
+ "right_s",
+ "double_s",
+ "level",
+ "cl",
+ "cl_s",
+ "ce",
+ "ce_s",
+ "rce",
+ "lce",
+ "dce",
+ "bbbv",
+ "bbbv_solved",
+ "bbbv_s",
+ "flag",
+ "path",
+ "etime",
+ "start_time",
+ "end_time",
+ "mode",
+ "software",
+ "player_identifier",
+ "race_identifier",
+ "uniqueness_identifier",
+ "stnb",
+ "corr",
+ "thrp",
+ "ioe",
+ "is_official",
+ "is_fair",
+ "op",
+ "isl",
+ "pluck",
+ ]
+
+ def __init__(self, show_fields: list[str], db_path: Path, parent=None):
+ super().__init__(parent)
+ self._db_path = db_path
+ layout = QVBoxLayout(self)
+ self.table = QTableView(self)
+ layout.addWidget(self.table)
+ self.setLayout(layout)
+
+ self.table.setEditTriggers(QTableView.NoEditTriggers)
+ self.table.setContextMenuPolicy(Qt.CustomContextMenu)
+ self.table.customContextMenuRequested.connect(self.show_context_menu)
+ self.showFields: list[str] = show_fields
+ self.headers = self.HEADERS
+
+ self.model = HistoryTableModel([], self.headers, self.showFields, self)
+ self.table.setModel(self.model)
+ self.table.horizontalHeader().setDefaultAlignment(Qt.AlignCenter)
+ self.table.setSelectionBehavior(QTableView.SelectRows)
+ self.table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
+
+ def load(self, data: list[HistoryData]):
+ self.model.update_data(data)
+
+ def refresh(self):
+ parent_widget = self.parent()
+ if hasattr(parent_widget, "load_data"):
+ parent_widget.load_data() # type: ignore
+
+ def show_context_menu(self, pos):
+ menu = QMenu(self)
+ menu.addAction(_translate("Form", "播放"), self.play_row)
+ menu.addAction(_translate("Form", "导出"), self.export_row)
+ menu.addAction(_translate("Form", "刷新"), self.refresh)
+ menu.exec_(self.table.mapToGlobal(pos))
+
+ def _get_current_replay_id(self) -> int | None:
+ row_idx = self.table.currentIndex().row()
+ if row_idx < 0:
+ return None
+ visible = self.model._visible_headers
+ if "replay_id" in visible:
+ col = visible.index("replay_id")
+ rid = self.model.data(self.model.index(row_idx, col), Qt.UserRole)
+ return rid # type: ignore
+ return getattr(self.model._data[row_idx], "replay_id", None)
+
+ def _read_raw_data(self, replay_id: int) -> bytes | None:
+ conn = sqlite3.connect(self._db_path)
+ try:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT raw_data FROM history WHERE replay_id = ?", (
+ replay_id,)
+ )
+ row = cursor.fetchone()
+ return row[0] if row else None
+ finally:
+ conn.close()
+
+ def save_evf(self, evf_path: str):
+ replay_id = self._get_current_replay_id()
+ if replay_id is None:
+ return
+ raw_data = self._read_raw_data(replay_id)
+ if raw_data is None:
+ return
+ with open(evf_path, "wb") as f:
+ f.write(raw_data)
+
+ def play_row(self):
+ exec_dir = get_executable_dir()
+ temp_filename = exec_dir / "tmp.evf"
+ self.save_evf(str(temp_filename))
+
+ exe = exec_dir / "metaminesweeper.exe"
+ main_py = exec_dir / "src" / "main.py"
+
+ if main_py.exists():
+ subprocess.Popen(
+ [sys.executable, str(main_py), str(temp_filename)])
+ elif exe.exists():
+ subprocess.Popen([str(exe), str(temp_filename)])
+ else:
+ QMessageBox.warning(
+ self, "错误", "找不到主程序 (main.py 或 metaminesweeper.exe)"
+ )
+
+ def export_row(self):
+ file_path, _ = QFileDialog.getSaveFileName(
+ self,
+ _translate("Form", "导出evf文件"),
+ str(get_executable_dir()),
+ "evf文件 (*.evf)",
+ )
+ if file_path:
+ self.save_evf(file_path)
diff --git a/src/plugins/history/main_widget.py b/src/plugins/history/main_widget.py
new file mode 100644
index 0000000..83ff100
--- /dev/null
+++ b/src/plugins/history/main_widget.py
@@ -0,0 +1,383 @@
+"""
+历史记录插件主界面
+"""
+
+from __future__ import annotations
+
+import json
+import math
+import sqlite3
+from datetime import datetime
+from pathlib import Path
+from typing import cast
+
+from PyQt5.QtCore import Qt, QCoreApplication, pyqtSignal
+from PyQt5.QtGui import QCloseEvent as _QCloseEvent
+from PyQt5.QtWidgets import (
+ QWidget,
+ QVBoxLayout,
+ QHBoxLayout,
+ QPushButton,
+ QSpacerItem,
+ QSizePolicy,
+ QLabel,
+ QComboBox,
+ QSpinBox,
+ QMessageBox,
+)
+
+from .columns_dialog import ColumnsDialog
+from .filter_dialog import FilterDialog
+from .history_table import HistoryTable
+from .models import HistoryData, CompareSymbol, LogicSymbol
+from .table_views import SortModel
+from .sort_dialog import SortDialog
+from .table_views import FilterModel
+
+_translate = QCoreApplication.translate
+
+
+class HistoryMainWidget(QWidget):
+ """历史记录插件的主界面(作为插件的 widget 返回)"""
+
+ # 信号:排序和过滤状态变化 (filter_json, sort_json)
+ filter_sort_state_changed = pyqtSignal(str, str)
+ # 信号:列显示配置变化 (show_fields_json)
+ show_fields_changed = pyqtSignal(str)
+
+ def __init__(
+ self,
+ db_path: Path,
+ config_path: Path,
+ float_decimals: int = 2,
+ page_size: str = "50",
+ parent=None,
+ ):
+ super().__init__(parent)
+ self._db_path = db_path
+ self._config_path = config_path
+
+ self.setWindowTitle(_translate("Form", "历史记录"))
+ self.resize(800, 600)
+
+ layout = QVBoxLayout(self)
+
+ # 查询按钮 + 过滤/排序按钮
+ btn_layout = QHBoxLayout()
+ self.query_button = QPushButton(_translate("Form", "查询"))
+ self.filter_button = QPushButton(_translate("Form", "过滤"))
+ self.sort_button = QPushButton(_translate("Form", "排序"))
+ self.columns_button = QPushButton(_translate("Form", "列设置"))
+ btn_layout.addWidget(self.query_button)
+ btn_layout.addWidget(self.filter_button)
+ btn_layout.addWidget(self.sort_button)
+ btn_layout.addWidget(self.columns_button)
+ btn_layout.addItem(
+ QSpacerItem(10, 10, QSizePolicy.Expanding, QSizePolicy.Minimum)
+ )
+
+ # 过滤和排序对话框
+ self.filter_dialog = FilterDialog(float_decimals, self)
+ self.sort_dialog = SortDialog(self)
+
+ # 当前过滤/排序条件显示
+ self.filter_label = QLabel("")
+ self.filter_label.setWordWrap(True)
+ self.sort_label = QLabel("")
+ self.sort_label.setWordWrap(True)
+
+ # 表格
+ self.table = HistoryTable(self._get_show_fields(), db_path, self)
+
+ # 列设置对话框(需要在 table 创建之后)
+ self.columns_dialog = ColumnsDialog(
+ HistoryTable.HEADERS, self.table.showFields, self)
+
+ # 分页
+ limit_layout = QHBoxLayout()
+ self.previous_button = QPushButton(_translate("Form", "上一页"))
+ self.page_spin = QSpinBox()
+ self.page_spin.setMinimum(1)
+ self.page_spin.setValue(1)
+ self.next_button = QPushButton(_translate("Form", "下一页"))
+ self.one_page_combo = QComboBox()
+ self.one_page_combo.addItems(
+ ["10", "20", "50", "100", "200", "500", "1000"])
+ # 设置默认每页条数
+ idx = self.one_page_combo.findText(page_size)
+ if idx >= 0:
+ self.one_page_combo.setCurrentIndex(idx)
+
+ self.limit_label = QLabel("")
+ limit_layout.addItem(
+ QSpacerItem(10, 10, QSizePolicy.Expanding, QSizePolicy.Minimum)
+ )
+ limit_layout.addWidget(self.limit_label)
+ limit_layout.addWidget(self.previous_button)
+ limit_layout.addWidget(self.page_spin)
+ limit_layout.addWidget(self.next_button)
+ limit_layout.addWidget(self.one_page_combo)
+
+ layout.addLayout(btn_layout)
+ layout.addWidget(self.filter_label)
+ layout.addWidget(self.sort_label)
+ layout.addWidget(self.table)
+ layout.addLayout(limit_layout)
+ self.setLayout(layout)
+
+ self._connect_signals()
+ self.load_data()
+
+ def set_filter_sort_state(self, filter_json: str, sort_json: str) -> None:
+ """设置排序和过滤状态(由插件调用)"""
+ try:
+ filter_rows = json.loads(filter_json)
+ if filter_rows:
+ self._set_filter_rows(filter_rows)
+ except (json.JSONDecodeError, TypeError):
+ pass
+
+ try:
+ sort_rows = json.loads(sort_json)
+ if sort_rows:
+ self._set_sort_rows(sort_rows)
+ except (json.JSONDecodeError, TypeError):
+ pass
+
+ # 恢复后触发一次查询
+ self._on_query()
+
+ def _connect_signals(self):
+ self.query_button.clicked.connect(self._on_query)
+ self.filter_button.clicked.connect(self.filter_dialog.show)
+ self.sort_button.clicked.connect(self.sort_dialog.show)
+ self.columns_button.clicked.connect(self.columns_dialog.show)
+ self.filter_dialog.finished.connect(lambda: self._on_query())
+ self.sort_dialog.finished.connect(lambda: self._on_query())
+ self.columns_dialog.finished.connect(self._on_columns_changed)
+ self.previous_button.clicked.connect(
+ lambda: self.page_spin.setValue(self.page_spin.value() - 1)
+ )
+ self.next_button.clicked.connect(
+ lambda: self.page_spin.setValue(self.page_spin.value() + 1)
+ )
+ self.one_page_combo.currentTextChanged.connect(self.load_data)
+ self.page_spin.valueChanged.connect(self.load_data)
+ self.table.show_fields_changed.connect(self.show_fields_changed)
+
+ def _on_query(self):
+ if self.page_spin.value() > 1:
+ self.page_spin.setValue(1)
+ else:
+ self.load_data()
+
+ def _get_limit_str(self):
+ per_page = int(self.one_page_combo.currentText())
+ offset = (self.page_spin.value() - 1) * per_page
+ return f" LIMIT {per_page} OFFSET {offset}"
+
+ def _get_show_fields(self) -> list[str]:
+ if not self._config_path.exists():
+ return list(HistoryData.fields())
+ with open(self._config_path, "r") as f:
+ return list(json.load(f))
+
+ def _on_columns_changed(self):
+ """列设置对话框关闭后应用更改"""
+ new_fields = self.columns_dialog.get_show_fields()
+ self.table.showFields = new_fields
+ self.table.model.update_show_fields(new_fields)
+ self.show_fields_changed.emit(json.dumps(
+ list(new_fields), ensure_ascii=False))
+
+ def load_data(self):
+ if not self._db_path.exists():
+ QMessageBox.warning(self, "错误", "历史记录数据库不存在")
+ return
+
+ try:
+ conn = sqlite3.connect(self._db_path)
+ conn.row_factory = sqlite3.Row
+ cursor = conn.cursor()
+ filter_str = self.filter_dialog.gen_filter_str()
+ order_str = self.sort_dialog.gen_order_str()
+ sql = "SELECT *, COUNT(*) OVER() AS total_count FROM history"
+ if filter_str:
+ sql += " WHERE " + filter_str
+ elif filter_str is None:
+ return
+ sql += order_str
+ sql += self._get_limit_str()
+ cursor.execute(sql)
+ datas = cursor.fetchall()
+
+ if not datas:
+ self.page_spin.setMaximum(1)
+ self.limit_label.setText("共0行,0页")
+ else:
+ per_page = int(self.one_page_combo.currentText())
+ total = datas[0]["total_count"]
+ max_page = math.ceil(total / per_page)
+ self.page_spin.setMaximum(max_page)
+ self.limit_label.setText(f"共{total}行,{max_page}页")
+
+ history_data = [HistoryData.from_dict(dict(d)) for d in datas]
+ conn.close()
+ except sqlite3.Error as e:
+ QMessageBox.warning(self, "错误", f"加载历史记录失败: {e}")
+ return
+
+ self.table.load(history_data)
+
+ # 保存当前的排序和过滤状态
+ self._save_filter_sort_state(filter_str, order_str)
+
+ def _format_filter_display(self, filter_rows: list[dict]) -> str:
+ """将过滤行数据格式化为易读字符串"""
+ if not filter_rows:
+ return ""
+ parts = []
+ for row_data in filter_rows:
+ left_bracket = row_data.get("left_bracket") or ""
+ field = row_data.get("field") or ""
+ compare_text = row_data.get("compare") or ""
+ value = row_data.get("value") or ""
+ right_bracket = row_data.get("right_bracket") or ""
+ logic_text = row_data.get("logic") or ""
+
+ if not field or not compare_text:
+ continue
+
+ # 格式化值:日期时间戳转为可读格式
+ try:
+ field_value = HistoryData.get_field_value(field)
+ if isinstance(field_value, datetime) and value:
+ try:
+ ts = int(float(value))
+ if ts > 1e15:
+ ts = ts // 1_000_000
+ elif ts > 1e12:
+ ts = ts // 1_000
+ value = datetime.fromtimestamp(
+ ts).strftime("%Y-%m-%d %H:%M:%S")
+ except (ValueError, TypeError, OSError):
+ pass
+ except (KeyError, IndexError):
+ pass
+
+ part = f"{left_bracket}{field} {compare_text} {value}{right_bracket}"
+ parts.append((part, logic_text))
+
+ if not parts:
+ return ""
+
+ result = ""
+ for i, (part, logic) in enumerate(parts):
+ result += part
+ if i < len(parts) - 1 and logic:
+ result += f" {logic} "
+ return result
+
+ def _format_sort_display(self, sort_rows: list[dict]) -> str:
+ """将排序行数据格式化为易读字符串"""
+ if not sort_rows:
+ return ""
+ parts = []
+ for row_data in sort_rows:
+ field = row_data.get("field") or ""
+ order = row_data.get("order") or ""
+ if not field:
+ continue
+ parts.append(f"{field} {order}")
+ return ", ".join(parts)
+
+ def _get_filter_rows(self) -> list[dict]:
+ """获取过滤表格的所有行数据"""
+ model = cast(FilterModel, self.filter_dialog.table.model())
+ rows = []
+ for row in range(model.rowCount()):
+ rows.append(model.get_row_data(row))
+ return rows
+
+ def _get_sort_rows(self) -> list[dict]:
+ """获取排序表格的所有行数据"""
+ model = cast(SortModel, self.sort_dialog.sort_table.model())
+ rows = []
+ for row in range(model.rowCount()):
+ rows.append(model.get_row_data(row))
+ return rows
+
+ def _set_filter_rows(self, rows: list[dict]) -> None:
+ """恢复过滤表格的行数据"""
+ model = self.filter_dialog.table.model()
+ model.removeRows(0, model.rowCount())
+ for row_data in rows:
+ row = model.rowCount()
+ model.insertRow(row)
+ model.setData(model.index(row, FilterModel.COL_LBRACKET),
+ row_data.get("left_bracket"))
+ model.setData(model.index(row, FilterModel.COL_FIELD),
+ row_data.get("field"))
+ model.setData(model.index(row, FilterModel.COL_COMPARE),
+ row_data.get("compare"))
+ model.setData(model.index(row, FilterModel.COL_VALUE),
+ row_data.get("value"), Qt.EditRole)
+ model.setData(model.index(row, FilterModel.COL_RBRACKET),
+ row_data.get("right_bracket"))
+ model.setData(model.index(row, FilterModel.COL_LOGIC),
+ row_data.get("logic"))
+
+ def _set_sort_rows(self, rows: list[dict]) -> None:
+ """恢复排序表格的行数据"""
+ model = self.sort_dialog.sort_table.model()
+ model.removeRows(0, model.rowCount())
+ for row_data in rows:
+ row = model.rowCount()
+ model.insertRow(row)
+ model.setData(model.index(row, SortModel.COL_FIELD),
+ row_data.get("field"))
+ model.setData(model.index(row, SortModel.COL_ORDER),
+ row_data.get("order"))
+
+ def _save_filter_sort_state(self, filter_str: str = "", order_str: str = "") -> None:
+ """发射排序和过滤状态变化信号"""
+ filter_rows = self._get_filter_rows()
+ sort_rows = self._get_sort_rows()
+ self.filter_sort_state_changed.emit(
+ json.dumps(filter_rows, ensure_ascii=False), json.dumps(sort_rows, ensure_ascii=False))
+
+ # 更新过滤条件标签(易读格式)
+ filter_display = self._format_filter_display(filter_rows)
+ if filter_display:
+ self.filter_label.setText(f"过滤: {filter_display}")
+ else:
+ self.filter_label.setText("过滤: 无")
+
+ # 更新排序条件标签(易读格式)
+ sort_display = self._format_sort_display(sort_rows)
+ if sort_display:
+ self.sort_label.setText(f"排序: {sort_display}")
+ else:
+ self.sort_label.setText("排序: 无")
+
+ def closeEvent(self, event: _QCloseEvent):
+ """关闭事件"""
+ super().closeEvent(event)
+
+ def set_float_decimals(self, decimals: int) -> None:
+ """动态设置小数位数"""
+ self.filter_dialog.set_float_decimals(decimals)
+
+ def restore_show_fields(self, show_fields_json: str) -> None:
+ """恢复列显示配置"""
+ try:
+ fields = json.loads(show_fields_json)
+ if not fields:
+ fields = self.table.HEADERS
+ self.table.showFields = list(fields)
+ self.table.model.update_show_fields(self.table.showFields)
+ # 同步更新列设置对话框的勾选状态
+ for field, cb in self.columns_dialog.checks.items():
+ cb.setChecked(field in self.table.showFields)
+ except (json.JSONDecodeError, TypeError):
+ pass
diff --git a/src/plugins/history/sort_dialog.py b/src/plugins/history/sort_dialog.py
new file mode 100644
index 0000000..5b7d8b1
--- /dev/null
+++ b/src/plugins/history/sort_dialog.py
@@ -0,0 +1,131 @@
+"""
+排序条件对话框
+"""
+
+from __future__ import annotations
+
+from PyQt5.QtCore import Qt, QCoreApplication
+from PyQt5.QtWidgets import (
+ QVBoxLayout,
+ QMenu,
+ QTableView,
+ QSizePolicy,
+ QHeaderView,
+ QDialog,
+)
+
+from .delegates import ComboBoxDelegate, EditableComboBoxDelegate
+from .models import HistoryData
+from .table_views import AutoEditTableView, SortModel
+
+_translate = QCoreApplication.translate
+
+
+class SortDialog(QDialog):
+ """排序条件对话框"""
+
+ def __init__(self, parent=None):
+ super().__init__(parent)
+ self.setWindowTitle(_translate("Form", "排序条件"))
+ self.resize(400, 300)
+
+ layout = QVBoxLayout(self)
+
+ self.sort_table = AutoEditTableView()
+ self.sort_table.setModel(SortModel(self))
+ self.sort_table.horizontalHeader().setDefaultAlignment(Qt.AlignCenter)
+ self.sort_table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
+ self.sort_table.setContextMenuPolicy(Qt.CustomContextMenu)
+ self.sort_table.customContextMenuRequested.connect(
+ self.show_sort_context_menu)
+ self.sort_table.setSelectionBehavior(QTableView.SelectItems)
+ self.sort_table.setSelectionMode(QTableView.ExtendedSelection)
+ self.sort_table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
+ # 设置所有列自动进入编辑
+ self.sort_table.setAutoEditColumns(
+ [SortModel.COL_FIELD, SortModel.COL_ORDER])
+ # 禁用双击编辑
+ self.sort_table.setEditTriggers(QTableView.NoEditTriggers)
+ layout.addWidget(self.sort_table)
+
+ self._setup_delegates()
+
+ def _setup_delegates(self):
+ """设置列代理"""
+ self.sort_table.setItemDelegateForColumn(
+ SortModel.COL_FIELD,
+ EditableComboBoxDelegate(HistoryData.fields(), self)
+ )
+ self.sort_table.setItemDelegateForColumn(
+ SortModel.COL_ORDER,
+ ComboBoxDelegate(
+ [_translate("Form", "升序"), _translate("Form", "降序")], self
+ )
+ )
+
+ def show_sort_context_menu(self, pos):
+ """排序列表右键菜单"""
+ menu = QMenu(self)
+ menu.addAction(_translate("Form", "添加"), self._add_sort_row)
+ menu.addAction(_translate("Form", "插入"), self._insert_sort_row)
+ menu.addAction(_translate("Form", "删除"), self._del_sort_row)
+ menu.exec_(self.sort_table.mapToGlobal(pos))
+
+ def _add_sort_row(self):
+ """添加排序行"""
+ model = self.sort_table.model()
+ row = model.rowCount()
+ model.insertRow(row)
+ model.setData(model.index(row, SortModel.COL_FIELD),
+ HistoryData.fields()[0])
+ model.setData(model.index(row, SortModel.COL_ORDER),
+ _translate("Form", "升序"))
+
+ def _add_sort_row_at(self, row: int, field: str | None = None, order: str | None = None):
+ """在指定位置添加排序行"""
+ model = self.sort_table.model()
+ if field is None:
+ field = HistoryData.fields()[0]
+ if order is None:
+ order = _translate("Form", "升序")
+
+ self.sort_table.insertRow(row)
+ model.setData(model.index(row, SortModel.COL_FIELD), field)
+ model.setData(model.index(row, SortModel.COL_ORDER), order)
+
+ def _insert_sort_row(self):
+ """在当前行前插入排序行"""
+ current_row = self.sort_table.currentIndex().row()
+ self._add_sort_row_at(current_row if current_row >= 0 else 0)
+
+ def _del_sort_row(self):
+ """删除选中的排序行"""
+ selected_rows = set(
+ index.row() for index in self.sort_table.selectionModel().selectedIndexes())
+ if not selected_rows:
+ return
+ for row in sorted(selected_rows, reverse=True):
+ self.sort_table.removeRow(row)
+
+ def _del_sort_row_at(self, row: int):
+ """删除指定行的排序"""
+ if row >= 0:
+ self.sort_table.removeRow(row)
+
+ def gen_order_str(self) -> str:
+ """生成排序 SQL 语句"""
+ model = self.sort_table.model()
+ if model.rowCount() == 0:
+ return ""
+
+ orders = []
+ for row in range(model.rowCount()):
+ field = model.data(model.index(row, SortModel.COL_FIELD))
+ order_text = model.data(model.index(row, SortModel.COL_ORDER))
+ order_sql = "ASC" if order_text == _translate(
+ "Form", "升序") else "DESC"
+ orders.append(f"{field} {order_sql}")
+
+ if orders:
+ return " ORDER BY " + ", ".join(orders)
+ return ""
diff --git a/src/plugins/history/table_model.py b/src/plugins/history/table_model.py
index 2ac2190..1ae9d1c 100644
--- a/src/plugins/history/table_model.py
+++ b/src/plugins/history/table_model.py
@@ -17,7 +17,7 @@ def __init__(
self,
data: list[HistoryData],
headers: list[str],
- show_fields: set[str],
+ show_fields: list[str],
parent=None,
):
super().__init__(parent)
@@ -73,7 +73,7 @@ def update_data(self, data: list[HistoryData]):
self._data = data
self.endResetModel()
- def update_show_fields(self, show_fields: set[str]):
+ def update_show_fields(self, show_fields: list[str]):
self.beginResetModel()
self._show_fields = show_fields
self._visible_headers = [h for h in self._headers if h in show_fields]
diff --git a/src/plugins/history/table_views.py b/src/plugins/history/table_views.py
new file mode 100644
index 0000000..ee140a6
--- /dev/null
+++ b/src/plugins/history/table_views.py
@@ -0,0 +1,209 @@
+"""
+自定义表格视图和模型
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from PyQt5.QtCore import Qt, QModelIndex, QTimer
+from PyQt5.QtGui import QStandardItemModel
+from PyQt5.QtWidgets import QTableView
+
+from .models import HistoryData
+
+
+class AutoEditTableView(QTableView):
+ """自动进入编辑状态的 TableView"""
+
+ def __init__(self, parent=None):
+ super().__init__(parent)
+ self._auto_edit_columns = set() # 需要自动编辑的列
+ self.suppress_auto_edit = False # 标记是否阻止自动编辑
+ self.clicked.connect(self._on_click_auto_edit)
+ # 垂直表头点击选中整行
+ self.verticalHeader().sectionClicked.connect(self._on_header_select_row)
+ self.verticalHeader().setMaximumWidth(24)
+ self.verticalHeader().setMinimumWidth(24)
+
+ def mousePressEvent(self, event):
+ """鼠标按下事件:右键点击时阻止自动编辑"""
+ # 右键点击,阻止自动编辑
+ if event.button() == Qt.RightButton:
+ self.suppress_auto_edit = True
+ super().mousePressEvent(event)
+ self.reset_suppress_flag()
+
+ def reset_suppress_flag(self):
+ """重置阻止自动编辑标志"""
+ self.suppress_auto_edit = False
+
+ def setAutoEditColumn(self, column: int):
+ """设置指定列自动进入编辑"""
+ self._auto_edit_columns.add(column)
+
+ def setAutoEditColumns(self, columns: list[int]):
+ """设置多个列自动进入编辑"""
+ self._auto_edit_columns.update(columns)
+
+ def _on_header_select_row(self, section: int):
+ """通过垂直表头点击选中整行时,清除单元格焦点"""
+ self.selectRow(section)
+ # 清除单元格焦点,让焦点回到 TableView 本身
+ self.setFocus()
+
+ def _on_click_auto_edit(self, index: QModelIndex):
+ """鼠标点击时自动进入编辑"""
+ # 如果 index 无效,不做任何操作
+ if not index.isValid():
+ return
+
+ # 右键点击后不自动编辑
+ if self.suppress_auto_edit:
+ return
+
+ # 如果模型没有行,不自动编辑
+ model = self.model()
+ if model is None or model.rowCount() == 0:
+ return
+
+ if index.column() in self._auto_edit_columns:
+ if self.state() != QTableView.EditingState:
+ self.edit(index)
+ return
+
+ # 如果不是自动编辑列,不进入编辑
+ self.selectionModel().setCurrentIndex(index, self.selectionModel().NoUpdate)
+
+ def currentChanged(self, current: QModelIndex, previous: QModelIndex):
+ """当焦点单元格改变时,自动进入编辑状态(表头选择整行时不触发)"""
+ super().currentChanged(current, previous)
+
+ # 如果没有有效的单元格,不自动进入编辑
+ if not current.isValid():
+ return
+
+ # 右键点击或表头选择整行,不自动进入编辑
+ if self.suppress_auto_edit:
+ return
+
+ if current.column() in self._auto_edit_columns:
+ QTimer.singleShot(0, lambda: self._try_edit(current))
+
+ def _try_edit(self, index: QModelIndex):
+ """尝试进入编辑状态"""
+ if self.state() != QTableView.EditingState:
+ self.edit(index)
+
+ def addRow(self):
+ """添加一行"""
+ self.suppress_auto_edit = True
+ model = self.model()
+ if model is None:
+ return
+
+ model.insertRow(model.rowCount())
+ self.suppress_auto_edit = False
+
+ def insertRow(self, row: int):
+ """插入一行"""
+ self.suppress_auto_edit = True
+ model = self.model()
+ if model is None:
+ return
+
+ model.insertRow(row)
+ self.suppress_auto_edit = False
+
+ def removeRow(self, row: int):
+ """删除一行"""
+ self.suppress_auto_edit = True
+ model = self.model()
+ if model is None:
+ return
+
+ model.removeRow(row)
+ self.suppress_auto_edit = False
+
+
+class FilterModel(QStandardItemModel):
+ """过滤表格模型"""
+
+ COL_LBRACKET, COL_FIELD, COL_COMPARE, COL_VALUE, COL_RBRACKET, COL_LOGIC = range(
+ 6)
+
+ def __init__(self, parent=None):
+ super().__init__(0, 6, parent)
+ self.setHorizontalHeaderLabels(
+ ["左括号", "字段", "比较符", "值", "右括号", "逻辑符"]
+ )
+
+ def data(self, index, role=Qt.DisplayRole):
+ """重写 data 方法,格式化某些列的显示"""
+ if not index.isValid():
+ return None
+
+ if role == Qt.DisplayRole:
+ column = index.column()
+ raw_data = super().data(index, Qt.EditRole)
+
+ # 值列需要格式化显示
+ if column == self.COL_VALUE:
+ if not raw_data:
+ return ""
+
+ # 获取同行字段名来确定类型
+ field_index = self.index(index.row(), self.COL_FIELD)
+ field_name = super().data(field_index, Qt.EditRole)
+
+ if field_name:
+ field_value = HistoryData.get_field_value(field_name)
+ if isinstance(field_value, datetime):
+ # 尝试解析时间戳并格式化为可读格式
+ try:
+ ts = int(raw_data)
+ if ts > 1e15: # 微秒
+ ts = ts / 1_000_000
+ elif ts > 1e12: # 毫秒
+ ts = ts / 1_000
+ dt = datetime.fromtimestamp(ts)
+ return dt.strftime("%Y-%m-%d %H:%M:%S")
+ except (ValueError, TypeError, OSError):
+ return raw_data
+
+ return raw_data
+
+ return super().data(index, role)
+
+ def get_row_data(self, row: int) -> dict:
+ """获取指定行的所有数据"""
+ return {
+ "left_bracket": self.data(self.index(row, self.COL_LBRACKET)),
+ "field": self.data(self.index(row, self.COL_FIELD)),
+ "compare": self.data(self.index(row, self.COL_COMPARE)),
+ "value": self.data(self.index(row, self.COL_VALUE)),
+ "right_bracket": self.data(self.index(row, self.COL_RBRACKET)),
+ "logic": self.data(self.index(row, self.COL_LOGIC)),
+ }
+
+ def get_field_value_type(self, row: int):
+ """获取指定行字段的原始值类型"""
+ field_name = str(self.data(self.index(row, self.COL_FIELD)))
+ return HistoryData.get_field_value(field_name)
+
+
+class SortModel(QStandardItemModel):
+ """排序表格模型"""
+
+ COL_FIELD, COL_ORDER = range(2)
+
+ def __init__(self, parent=None):
+ super().__init__(0, 2, parent)
+ self.setHorizontalHeaderLabels(["排序字段", "升序/降序"])
+
+ def get_row_data(self, row: int) -> dict:
+ """获取指定行的所有数据"""
+ return {
+ "field": self.data(self.index(row, self.COL_FIELD)),
+ "order": self.data(self.index(row, self.COL_ORDER)),
+ }
diff --git a/src/plugins/history/widgets.py b/src/plugins/history/widgets.py
index f3fd771..3fa4570 100644
--- a/src/plugins/history/widgets.py
+++ b/src/plugins/history/widgets.py
@@ -1,1316 +1,25 @@
"""
-UI 组件
+UI 组件(兼容性转发,实际类已拆分到各子模块)
"""
-from __future__ import annotations
-
-import json
-import math
-import sqlite3
-import subprocess
-import sys
-from datetime import datetime
-from pathlib import Path
-from typing import Any, cast
-
-from PyQt5.QtCore import QEvent, Qt, QCoreApplication, QModelIndex, QTimer, pyqtSignal
-from PyQt5.QtGui import QCloseEvent as _QCloseEvent, QStandardItemModel, QStandardItem, QPalette
-from PyQt5.QtWidgets import (
- QAbstractItemView,
- QWidget,
- QVBoxLayout,
- QMenu,
- QAction,
- QTableView,
- QMessageBox,
- QFileDialog,
- QComboBox,
- QLineEdit,
- QSpinBox,
- QDoubleSpinBox,
- QDateTimeEdit,
- QHBoxLayout,
- QPushButton,
- QSpacerItem,
- QSplitter,
- QSizePolicy,
- QLabel,
- QHeaderView,
- QStyledItemDelegate,
- QStyle,
- QApplication,
-)
-
-from plugin_manager.app_paths import get_executable_dir
-
-from shared_types.widgets import EditableComboBox
-
-from .models import HistoryData, LogicSymbol, CompareSymbol
-from .table_model import HistoryTableModel
-
-_translate = QCoreApplication.translate
-
-
-class ComboBoxDelegate(QStyledItemDelegate):
- """通用的 ComboBox 代理"""
-
- def __init__(self, items: list[str], parent=None):
- super().__init__(parent)
- self._items = items
-
- def createEditor(self, parent, option, index):
- editor = QComboBox(parent)
- editor.addItems(self._items)
- return editor
-
- def setEditorData(self, editor: QComboBox, index):
- value = index.model().data(index, Qt.EditRole)
- if value:
- idx = editor.findText(value)
- if idx >= 0:
- editor.setCurrentIndex(idx)
-
- def setModelData(self, editor: QComboBox, model, index):
- model.setData(index, editor.currentText(), Qt.EditRole)
-
- def updateEditorGeometry(self, editor, option, index):
- editor.setGeometry(option.rect)
-
-
-class EditableComboBoxDelegate(QStyledItemDelegate):
- """可编辑的 ComboBox 代理(带补全)"""
-
- def __init__(self, items: list[str], parent=None):
- super().__init__(parent)
- self._items = items
-
- def createEditor(self, parent, option, index):
- editor = EditableComboBox(self._items, parent)
- return editor
-
- def setEditorData(self, editor: EditableComboBox, index):
- value = index.model().data(index, Qt.EditRole)
- if value:
- editor.setCurrentText(value)
-
- def setModelData(self, editor: EditableComboBox, model, index):
- model.setData(index, editor.currentText(), Qt.EditRole)
-
- def updateEditorGeometry(self, editor, option, index):
- editor.setGeometry(option.rect)
-
-
-class FilterValueDelegate(QStyledItemDelegate):
- """值列的智能代理,根据同行字段类型动态决定编辑器"""
-
- COL_FIELD = 1 # FilterModel.COL_FIELD
- COL_COMPARE = 2 # FilterModel.COL_COMPARE
-
- def __init__(self, float_decimals: int = 2, parent=None):
- super().__init__(parent)
- self._float_decimals = float_decimals
- self._editor_widgets = [] # 缓存创建的编辑器widget
-
- def paint(self, painter, option, index):
- """根据字段类型绘制单元格"""
- # 检查选中状态
- is_selected = option.state & QStyle.State_Selected
-
- if is_selected: # type: ignore
- # 选中时绘制背景
- painter.fillRect(option.rect, option.palette.highlight())
- # 使用高亮文本颜色
- text_role = QPalette.HighlightedText
- else:
- text_role = QPalette.WindowText
-
- field_value, _, _ = self._get_field_info(index)
- raw_value = index.data(Qt.EditRole)
-
- if field_value is None or raw_value is None:
- super().paint(painter, option, index)
- return
-
- display_text = str(raw_value)
-
- if isinstance(field_value, datetime):
- # 日期类型显示为可读格式
- try:
- ts = int(raw_value)
- if ts > 1e15: # 微秒
- ts = ts / 1_000_000
- elif ts > 1e12: # 毫秒
- ts = ts / 1_000
- dt = datetime.fromtimestamp(ts)
- display_text = dt.strftime("%Y-%m-%d %H:%M:%S")
- except (ValueError, TypeError, OSError):
- display_text = raw_value
- elif isinstance(field_value, float):
- # 浮点数显示带小数位
- try:
- display_text = f"{float(raw_value):.{self._float_decimals}f}"
- except (ValueError, TypeError):
- display_text = raw_value
-
- # 使用 QStyle 绘制文本
- style = QApplication.style()
- style.drawItemText(
- painter,
- option.rect,
- Qt.AlignCenter | Qt.AlignVCenter, # type: ignore
- option.palette,
- True,
- display_text,
- text_role
- )
-
- def _get_field_info(self, index: QModelIndex) -> tuple:
- """获取同行的字段信息和比较符"""
- model = index.model()
- row = index.row()
-
- # 获取字段名
- field_index = model.index(row, self.COL_FIELD)
- field_name = model.data(field_index, Qt.EditRole)
-
- # 获取比较符
- compare_index = model.index(row, self.COL_COMPARE)
- compare_text = model.data(compare_index, Qt.EditRole)
-
- if not field_name:
- return None, None, None
-
- try:
- field_value = HistoryData.get_field_value(field_name)
- except (KeyError, IndexError):
- return None, None, None
-
- compare = None
- if compare_text:
- try:
- compare = CompareSymbol.from_display_name(compare_text)
- except ValueError:
- pass
-
- return field_value, compare, field_name
-
- def _create_editor_by_type(self, parent, field_value, compare, field_name):
- """根据字段类型创建编辑器"""
- from shared_types.enums import BaseDiaPlayEnum
-
- # 如果是包含/不包含比较符,使用 LineEdit
- if compare and compare in (CompareSymbol.Contains, CompareSymbol.NotContains):
- return QLineEdit(parent)
-
- if isinstance(field_value, BaseDiaPlayEnum):
- editor = QComboBox(parent)
- editor.addItems([e.display_name for e in field_value.__class__])
- return editor
- elif isinstance(field_value, int):
- return QSpinBox(parent)
- elif isinstance(field_value, float):
- editor = QDoubleSpinBox(parent)
- editor.setDecimals(self._float_decimals)
- editor.setRange(-1e15, 1e15)
- return editor
- elif isinstance(field_value, datetime):
- editor = QDateTimeEdit(parent)
- editor.setDisplayFormat("yyyy-MM-dd HH:mm:ss")
- editor.setCalendarPopup(True)
- return editor
- else:
- return QLineEdit(parent)
-
- def createEditor(self, parent, option, index):
- field_value, compare, field_name = self._get_field_info(index)
- if field_value is None:
- return QLineEdit(parent)
- return self._create_editor_by_type(parent, field_value, compare, field_name)
-
- def setEditorData(self, editor, index):
- field_value, compare, field_name = self._get_field_info(index)
- if field_value is None:
- return
-
- raw_value = index.model().data(index, Qt.EditRole)
-
- from shared_types.enums import BaseDiaPlayEnum
- if isinstance(field_value, BaseDiaPlayEnum) and isinstance(editor, QComboBox):
- if raw_value:
- idx = editor.findText(raw_value)
- if idx >= 0:
- editor.setCurrentIndex(idx)
- elif isinstance(field_value, int) and isinstance(editor, QSpinBox):
- try:
- editor.setValue(int(raw_value) if raw_value else 0)
- except (ValueError, TypeError):
- editor.setValue(0)
- elif isinstance(field_value, float) and isinstance(editor, QDoubleSpinBox):
- try:
- editor.setValue(float(raw_value) if raw_value else 0.0)
- except (ValueError, TypeError):
- editor.setValue(0.0)
- elif isinstance(field_value, datetime) and isinstance(editor, QDateTimeEdit):
- try:
- if raw_value:
- # raw_value 可能是 int/float 时间戳,或字符串形式的时间戳
- if isinstance(raw_value, (int, float)):
- dt = datetime.fromtimestamp(raw_value / 1_000_000)
- else:
- # 先尝试作为字符串时间戳解析
- try:
- ts = int(raw_value)
- if ts > 1e15: # 微秒
- ts = ts / 1_000_000
- elif ts > 1e12: # 毫秒
- ts = ts / 1_000
- dt = datetime.fromtimestamp(ts)
- except (ValueError, TypeError):
- # 再尝试解析日期时间字符串
- for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
- try:
- dt = datetime.strptime(raw_value, fmt)
- break
- except ValueError:
- continue
- else:
- dt = datetime.now()
- else:
- dt = datetime.now()
- editor.setDateTime(dt)
- except (ValueError, TypeError):
- editor.setDateTime(datetime.now())
- else:
- if isinstance(editor, QLineEdit):
- editor.setText(raw_value or "")
-
- def setModelData(self, editor, model, index):
- field_value, compare, field_name = self._get_field_info(index)
- if field_value is None:
- if isinstance(editor, QLineEdit):
- model.setData(index, editor.text(), Qt.EditRole)
- return
-
- from shared_types.enums import BaseDiaPlayEnum
- if isinstance(field_value, BaseDiaPlayEnum) and isinstance(editor, QComboBox):
- model.setData(index, editor.currentText(), Qt.EditRole)
- elif isinstance(field_value, (int, float)) and isinstance(editor, (QSpinBox, QDoubleSpinBox)):
- model.setData(index, str(editor.value()), Qt.EditRole)
- elif isinstance(field_value, datetime) and isinstance(editor, QDateTimeEdit):
- ts = int(editor.dateTime().toPyDateTime().timestamp() * 1_000_000)
- model.setData(index, str(ts), Qt.EditRole)
- else:
- if isinstance(editor, QLineEdit):
- model.setData(index, editor.text(), Qt.EditRole)
-
- def updateEditorGeometry(self, editor, option, index):
- editor.setGeometry(option.rect)
-
-
-class AutoEditTableView(QTableView):
- """自动进入编辑状态的 TableView"""
-
- def __init__(self, parent=None):
- super().__init__(parent)
- self._auto_edit_columns = set() # 需要自动编辑的列
- self.suppress_auto_edit = False # 标记是否阻止自动编辑
- self.clicked.connect(self._on_click_auto_edit)
- # 垂直表头点击选中整行
- self.verticalHeader().sectionClicked.connect(self._on_header_select_row)
- self.verticalHeader().setMaximumWidth(24)
- self.verticalHeader().setMinimumWidth(24)
-
- def mousePressEvent(self, event):
- """鼠标按下事件:右键点击时阻止自动编辑"""
- # 右键点击,阻止自动编辑
- if event.button() == Qt.RightButton:
- self.suppress_auto_edit = True
- super().mousePressEvent(event)
- self.reset_suppress_flag()
-
- def reset_suppress_flag(self):
- """重置阻止自动编辑标志"""
- self.suppress_auto_edit = False
-
- def setAutoEditColumn(self, column: int):
- """设置指定列自动进入编辑"""
- self._auto_edit_columns.add(column)
-
- def setAutoEditColumns(self, columns: list[int]):
- """设置多个列自动进入编辑"""
- self._auto_edit_columns.update(columns)
-
- def _on_header_select_row(self, section: int):
- """通过垂直表头点击选中整行时,清除单元格焦点"""
- self.selectRow(section)
- # 清除单元格焦点,让焦点回到 TableView 本身
- self.setFocus()
-
- def _on_click_auto_edit(self, index: QModelIndex):
- """鼠标点击时自动进入编辑"""
- # 如果 index 无效,不做任何操作
- if not index.isValid():
- return
-
- # 右键点击后不自动编辑
- if self.suppress_auto_edit:
- return
-
- # 如果模型没有行,不自动编辑
- model = self.model()
- if model is None or model.rowCount() == 0:
- return
-
- if index.column() in self._auto_edit_columns:
- if self.state() != QTableView.EditingState:
- self.edit(index)
- return
-
- # 如果不是自动编辑列,不进入编辑
- self.selectionModel().setCurrentIndex(index, self.selectionModel().NoUpdate)
-
- def currentChanged(self, current: QModelIndex, previous: QModelIndex):
- """当焦点单元格改变时,自动进入编辑状态(表头选择整行时不触发)"""
- super().currentChanged(current, previous)
-
- # 如果没有有效的单元格,不自动进入编辑
- if not current.isValid():
- return
-
- # 右键点击或表头选择整行,不自动进入编辑
- if self.suppress_auto_edit:
- return
-
- if current.column() in self._auto_edit_columns:
- QTimer.singleShot(0, lambda: self._try_edit(current))
-
- def _try_edit(self, index: QModelIndex):
- """尝试进入编辑状态"""
- if self.state() != QTableView.EditingState:
- self.edit(index)
-
- def addRow(self):
- """添加一行"""
- self.suppress_auto_edit = True
- model = self.model()
- if model is None:
- return
-
- model.insertRow(model.rowCount())
- self.suppress_auto_edit = False
-
- def insertRow(self, row: int):
- """插入一行"""
- self.suppress_auto_edit = True
- model = self.model()
- if model is None:
- return
-
- model.insertRow(row)
- self.suppress_auto_edit = False
-
- def removeRow(self, row: int):
- """删除一行"""
- self.suppress_auto_edit = True
- model = self.model()
- if model is None:
- return
-
- model.removeRow(row)
- self.suppress_auto_edit = False
-
-
-class FilterModel(QStandardItemModel):
- """过滤表格模型"""
-
- COL_LBRACKET, COL_FIELD, COL_COMPARE, COL_VALUE, COL_RBRACKET, COL_LOGIC = range(
- 6)
-
- def __init__(self, parent=None):
- super().__init__(0, 6, parent)
- self.setHorizontalHeaderLabels(
- ["左括号", "字段", "比较符", "值", "右括号", "逻辑符"]
- )
-
- def data(self, index, role=Qt.DisplayRole):
- """重写 data 方法,格式化某些列的显示"""
- if not index.isValid():
- return None
-
- if role == Qt.DisplayRole:
- column = index.column()
- raw_data = super().data(index, Qt.EditRole)
-
- # 值列需要格式化显示
- if column == self.COL_VALUE:
- if not raw_data:
- return ""
-
- # 获取同行字段名来确定类型
- field_index = self.index(index.row(), self.COL_FIELD)
- field_name = super().data(field_index, Qt.EditRole)
-
- if field_name:
- field_value = HistoryData.get_field_value(field_name)
- if isinstance(field_value, datetime):
- # 尝试解析时间戳并格式化为可读格式
- try:
- ts = int(raw_data)
- if ts > 1e15: # 微秒
- ts = ts / 1_000_000
- elif ts > 1e12: # 毫秒
- ts = ts / 1_000
- dt = datetime.fromtimestamp(ts)
- return dt.strftime("%Y-%m-%d %H:%M:%S")
- except (ValueError, TypeError, OSError):
- return raw_data
-
- return raw_data
-
- return super().data(index, role)
-
- def get_row_data(self, row: int) -> dict:
- """获取指定行的所有数据"""
- return {
- "left_bracket": self.data(self.index(row, self.COL_LBRACKET)),
- "field": self.data(self.index(row, self.COL_FIELD)),
- "compare": self.data(self.index(row, self.COL_COMPARE)),
- "value": self.data(self.index(row, self.COL_VALUE)),
- "right_bracket": self.data(self.index(row, self.COL_RBRACKET)),
- "logic": self.data(self.index(row, self.COL_LOGIC)),
- }
-
- def get_field_value_type(self, row: int):
- """获取指定行字段的原始值类型"""
- field_name = str(self.data(self.index(row, self.COL_FIELD)))
- return HistoryData.get_field_value(field_name)
-
-
-class SortModel(QStandardItemModel):
- """排序表格模型"""
-
- COL_FIELD, COL_ORDER = range(2)
-
- def __init__(self, parent=None):
- super().__init__(0, 2, parent)
- self.setHorizontalHeaderLabels(["排序字段", "升序/降序"])
-
- def get_row_data(self, row: int) -> dict:
- """获取指定行的所有数据"""
- return {
- "field": self.data(self.index(row, self.COL_FIELD)),
- "order": self.data(self.index(row, self.COL_ORDER)),
- }
-
-
-class FilterWidget(QWidget):
- """筛选条件控件"""
-
- def __init__(self, float_decimals: int = 2, parent=None):
- super().__init__(parent)
- self._float_decimals = float_decimals
-
- # 使用 QSplitter 实现横向分割
- splitter = QSplitter(Qt.Horizontal)
- splitter.setChildrenCollapsible(False)
-
- # 左侧:过滤表格
- filter_widget = QWidget()
- filter_layout = QVBoxLayout(filter_widget)
- filter_layout.setContentsMargins(0, 0, 0, 0)
- self.table = AutoEditTableView()
- self.table.setModel(FilterModel(self))
- self.table.horizontalHeader().setDefaultAlignment(Qt.AlignCenter)
- self.table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
- self.table.setContextMenuPolicy(Qt.CustomContextMenu)
- self.table.customContextMenuRequested.connect(self.show_context_menu)
- self.table.setSelectionBehavior(QTableView.SelectItems)
- self.table.setSelectionMode(QTableView.ExtendedSelection)
- self.table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Minimum)
- # 设置所有列自动进入编辑
- self.table.setAutoEditColumns(
- list(range(FilterModel.COL_LBRACKET, FilterModel.COL_LOGIC + 1)))
- # 禁用双击编辑
- self.table.setEditTriggers(QTableView.NoEditTriggers)
- filter_layout.addWidget(self.table)
- splitter.addWidget(filter_widget)
-
- # 右侧:排序表格
- sort_widget = QWidget()
- sort_layout = QVBoxLayout(sort_widget)
- sort_layout.setContentsMargins(0, 0, 0, 0)
- sort_layout.setSpacing(0)
-
- self.sort_table = AutoEditTableView()
- self.sort_table.setModel(SortModel(self))
- self.sort_table.horizontalHeader().setDefaultAlignment(Qt.AlignCenter)
- self.sort_table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
- self.sort_table.setContextMenuPolicy(Qt.CustomContextMenu)
- self.sort_table.customContextMenuRequested.connect(
- self.show_sort_context_menu)
- self.sort_table.setSelectionBehavior(QTableView.SelectItems)
- self.sort_table.setSelectionMode(QTableView.ExtendedSelection)
- self.sort_table.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Minimum)
- # 设置所有列自动进入编辑
- self.sort_table.setAutoEditColumns(
- [SortModel.COL_FIELD, SortModel.COL_ORDER])
- # 禁用双击编辑
- self.sort_table.setEditTriggers(QTableView.NoEditTriggers)
- sort_layout.addWidget(self.sort_table)
- splitter.addWidget(sort_widget)
-
- # 设置初始比例
- splitter.setStretchFactor(0, 3)
- splitter.setStretchFactor(1, 1)
- splitter.setMinimumSize(0, 0)
-
- # 主布局
- main_layout = QVBoxLayout(self)
- main_layout.setContentsMargins(0, 0, 0, 0)
- main_layout.addWidget(splitter)
- self.setLayout(main_layout)
-
- self._setup_delegates()
- self._connect_field_change_signal()
-
- def _connect_field_change_signal(self):
- """当字段列改变时,更新值列的默认值"""
- self.table.model().dataChanged.connect(self._on_field_changed)
-
- def _on_field_changed(self, topLeft, bottomRight, roles):
- """字段列改变时触发"""
- if Qt.EditRole in roles and topLeft.column() == FilterModel.COL_FIELD:
- for row in range(topLeft.row(), bottomRight.row() + 1):
- self._update_value_default(row)
-
- def _update_value_default(self, row: int):
- """更新指定行的值列默认值"""
- model = self.table.model()
- field_name = model.data(model.index(row, FilterModel.COL_FIELD))
-
- if not field_name:
- return
-
- field_value = HistoryData.get_field_value(field_name)
- new_default = self._get_default_value(field_value)
-
- # 获取当前值(使用 EditRole 获取原始数据)
- current_value = model.data(model.index(
- row, FilterModel.COL_VALUE), Qt.EditRole)
-
- # 始终更新为新类型的默认值
- model.setData(
- model.index(row, FilterModel.COL_VALUE),
- new_default,
- Qt.EditRole
- )
-
- def _get_default_value(self, field_value) -> str:
- """获取字段类型的默认值"""
- from shared_types.enums import BaseDiaPlayEnum
-
- if field_value is None:
- return ""
- elif isinstance(field_value, BaseDiaPlayEnum):
- # 枚举类型返回第一个选项
- return field_value.__class__.display_names()[0]
- elif isinstance(field_value, datetime):
- # 日期类型返回当前时间戳(微秒)
- ts = int(datetime.now().timestamp() * 1_000_000)
- return str(ts)
- elif isinstance(field_value, float):
- return "0.0"
- elif isinstance(field_value, int):
- return "0"
- else:
- return ""
-
- def _setup_delegates(self):
- """设置列代理"""
- # 左括号
- self.table.setItemDelegateForColumn(
- FilterModel.COL_LBRACKET,
- ComboBoxDelegate(["", "(", "(("], self)
- )
- # 字段
- self.table.setItemDelegateForColumn(
- FilterModel.COL_FIELD,
- EditableComboBoxDelegate(HistoryData.fields(), self)
- )
- # 比较符
- self.table.setItemDelegateForColumn(
- FilterModel.COL_COMPARE,
- ComboBoxDelegate(CompareSymbol.display_names(), self)
- )
- # 值列 - 使用智能代理
- self.table.setItemDelegateForColumn(
- FilterModel.COL_VALUE,
- FilterValueDelegate(self._float_decimals, self)
- )
- # 右括号
- self.table.setItemDelegateForColumn(
- FilterModel.COL_RBRACKET,
- ComboBoxDelegate(["", ")", "))"], self)
- )
- # 逻辑符
- self.table.setItemDelegateForColumn(
- FilterModel.COL_LOGIC,
- ComboBoxDelegate(LogicSymbol.display_names(), self)
- )
-
- # 排序表格代理
- self.sort_table.setItemDelegateForColumn(
- SortModel.COL_FIELD,
- EditableComboBoxDelegate(HistoryData.fields(), self)
- )
- self.sort_table.setItemDelegateForColumn(
- SortModel.COL_ORDER,
- ComboBoxDelegate(
- [_translate("Form", "升序"), _translate("Form", "降序")], self
- )
- )
-
- def show_sort_context_menu(self, pos):
- """排序列表右键菜单"""
- menu = QMenu(self)
- menu.addAction(_translate("Form", "添加"), self._add_sort_row)
- menu.addAction(_translate("Form", "插入"), self._insert_sort_row)
- menu.addAction(_translate("Form", "删除"), self._del_sort_row)
- menu.exec_(self.sort_table.mapToGlobal(pos))
-
- def _add_sort_row(self):
- """添加排序行"""
- model = self.sort_table.model()
- row = model.rowCount()
- model.insertRow(row)
- model.setData(model.index(row, SortModel.COL_FIELD),
- HistoryData.fields()[0])
- model.setData(model.index(row, SortModel.COL_ORDER),
- _translate("Form", "升序"))
-
- def _add_sort_row_at(self, row: int, field: str | None = None, order: str | None = None):
- """在指定位置添加排序行"""
- model = self.sort_table.model()
- if field is None:
- field = HistoryData.fields()[0]
- if order is None:
- order = _translate("Form", "升序")
-
- self.sort_table.insertRow(row)
- model.setData(model.index(row, SortModel.COL_FIELD), field)
- model.setData(model.index(row, SortModel.COL_ORDER), order)
-
- def _insert_sort_row(self):
- """在当前行前插入排序行"""
- current_row = self.sort_table.currentIndex().row()
- self._add_sort_row_at(current_row if current_row >= 0 else 0)
-
- def _del_sort_row(self):
- """删除选中的排序行"""
- selected_rows = set(
- index.row() for index in self.sort_table.selectionModel().selectedIndexes())
- if not selected_rows:
- return
- for row in sorted(selected_rows, reverse=True):
- self.sort_table.removeRow(row)
-
- def _del_sort_row_at(self, row: int):
- """删除指定行的排序"""
- if row >= 0:
- self.sort_table.removeRow(row)
-
- def gen_order_str(self) -> str:
- """生成排序 SQL 语句"""
- model = self.sort_table.model()
- if model.rowCount() == 0:
- return ""
-
- orders = []
- for row in range(model.rowCount()):
- field = model.data(model.index(row, SortModel.COL_FIELD))
- order_text = model.data(model.index(row, SortModel.COL_ORDER))
- order_sql = "ASC" if order_text == _translate(
- "Form", "升序") else "DESC"
- orders.append(f"{field} {order_sql}")
-
- if orders:
- return " ORDER BY " + ", ".join(orders)
- return ""
-
- def set_float_decimals(self, decimals: int) -> None:
- """动态设置小数位数"""
- self._float_decimals = decimals
- # 更新值列代理
- self.table.setItemDelegateForColumn(
- FilterModel.COL_VALUE,
- FilterValueDelegate(self._float_decimals, self)
- )
-
- def show_context_menu(self, pos):
- menu = QMenu(self)
- menu.addAction(_translate("Form", "添加"), self.add_row)
- menu.addAction(
- _translate("Form", "插入"), lambda: self.insert_row(
- self.table.currentIndex().row())
- )
- menu.addAction(_translate("Form", "删除"), self.del_row)
- menu.exec_(self.table.mapToGlobal(pos))
-
- def add_row(self):
- self.insert_row(self.table.model().rowCount())
-
- def del_row(self):
- # 获取所有选中的行
- selected_rows = set(index.row()
- for index in self.table.selectionModel().selectedIndexes())
- if not selected_rows:
- return
- # 取消行选中
- self.table.clearSelection()
- # 从后往前删除,避免索引变化
- for row in sorted(selected_rows, reverse=True):
- self.table.removeRow(row)
-
- def insert_row(self, row: int):
- if row < 0:
- row = 0
- model = self.table.model()
- self.table.insertRow(row)
- # 设置默认值
- model.setData(model.index(row, FilterModel.COL_FIELD),
- HistoryData.fields()[0])
- model.setData(model.index(row, FilterModel.COL_COMPARE),
- CompareSymbol.display_names()[0])
- model.setData(model.index(row, FilterModel.COL_LOGIC),
- LogicSymbol.display_names()[0])
- # 不需要手动设置值,代理会根据字段类型自动处理
-
- def gen_filter_str(self):
- model = cast(FilterModel, self.table.model())
- filter_str = ""
- left_count = 0
- right_count = 0
- for row in range(model.rowCount()):
- data = model.get_row_data(row)
- field_value_type = model.get_field_value_type(row)
-
- left_bracket = data["left_bracket"] or ""
- field = data["field"] or ""
- compare_text = data["compare"] or ""
- value = data["value"] or ""
- right_bracket = data["right_bracket"] or ""
- logic_text = data["logic"] or ""
-
- if not field or not compare_text:
- continue
-
- compare = CompareSymbol.from_display_name(compare_text)
- logic = LogicSymbol.from_display_name(logic_text).to_sql
-
- if left_bracket == "(":
- left_count += 1
- elif left_bracket == "((":
- left_count += 2
- if right_bracket == ")":
- right_count += 1
- elif right_bracket == "))":
- right_count += 2
-
- if right_count > left_count:
- QMessageBox.warning(
- self, "错误", f"第{row}行 右括号数量大于左括号数量,请检查"
- )
- return None
-
- # 处理值
- from shared_types.enums import BaseDiaPlayEnum
- if isinstance(field_value_type, BaseDiaPlayEnum):
- enum_cls = field_value_type.__class__
- for e in enum_cls:
- if e.display_name == value:
- value = str(e.value)
- break
- elif compare in (CompareSymbol.Contains, CompareSymbol.NotContains):
- if isinstance(field_value_type, (int, float)):
- values = value.split(",")
- for v in values:
- if not v.replace("-", "").replace(".", "").isdigit():
- QMessageBox.warning(
- self, "错误", f"第{row}行 {v} 不是数字"
- )
- return None
- value = ",".join(v for v in values)
- elif isinstance(field_value_type, datetime):
- values = value.split(",")
- parsed_values = []
- for v in values:
- v = v.strip()
- if not v:
- continue
- try:
- # 尝试解析为时间戳(微秒)
- ts = int(float(v))
- parsed_values.append(str(ts))
- except ValueError:
- # 尝试解析为日期时间字符串
- try:
- for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
- try:
- dt = datetime.strptime(v, fmt)
- parsed_values.append(
- str(int(dt.timestamp() * 1_000_000)))
- break
- except ValueError:
- continue
- else:
- raise ValueError(f"无法解析日期: {v}")
- except ValueError as e:
- QMessageBox.warning(
- self, "错误", f"第{row}行 {v} 不是合法的日期时间"
- )
- return None
- value = ",".join(parsed_values) if parsed_values else ""
- else:
- value = ",".join(
- f"'{v}'" for v in value.split(",") if v.strip())
- value = f"({value})" if value else "()"
- elif isinstance(field_value_type, datetime) and value:
- try:
- # 可能是时间戳字符串
- ts = int(float(value))
- value = str(ts)
- except ValueError:
- # 尝试解析日期时间字符串
- try:
- for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
- try:
- dt = datetime.strptime(value, fmt)
- value = str(int(dt.timestamp() * 1_000_000))
- break
- except ValueError:
- continue
- else:
- QMessageBox.warning(
- self, "错误", f"第{row}行 {value} 不是合法的日期时间"
- )
- return None
- except ValueError:
- QMessageBox.warning(
- self, "错误", f"第{row}行 {value} 不是合法的日期时间"
- )
- return None
- elif value and not value.startswith("'"):
- value = f"'{value}'"
-
- is_last = row == model.rowCount() - 1
- filter_str += (
- f" {left_bracket} {field} {compare.to_sql} {value} {right_bracket} "
- )
- if not is_last:
- filter_str += logic
-
- if left_count != right_count:
- QMessageBox.warning(self, "错误", "左括号数量和右括号数量不匹配,请检查")
- return None
- return filter_str
-
-
-class HistoryTable(QWidget):
- """历史记录表格"""
-
- # 信号:列显示配置变化 (show_fields_json)
- show_fields_changed = pyqtSignal(str)
-
- HEADERS = [
- "replay_id",
- "game_board_state",
- "rtime",
- "left",
- "right",
- "double",
- "left_s",
- "right_s",
- "double_s",
- "level",
- "cl",
- "cl_s",
- "ce",
- "ce_s",
- "rce",
- "lce",
- "dce",
- "bbbv",
- "bbbv_solved",
- "bbbv_s",
- "flag",
- "path",
- "etime",
- "start_time",
- "end_time",
- "mode",
- "software",
- "player_identifier",
- "race_identifier",
- "uniqueness_identifier",
- "stnb",
- "corr",
- "thrp",
- "ioe",
- "is_official",
- "is_fair",
- "op",
- "isl",
- "pluck",
- ]
-
- def __init__(self, show_fields: set[str], db_path: Path, parent=None):
- super().__init__(parent)
- self._db_path = db_path
- layout = QVBoxLayout(self)
- self.table = QTableView(self)
- layout.addWidget(self.table)
- self.setLayout(layout)
-
- self.table.setEditTriggers(QTableView.NoEditTriggers)
- self.table.setContextMenuPolicy(Qt.CustomContextMenu)
- self.table.customContextMenuRequested.connect(self.show_context_menu)
- self.showFields: set[str] = show_fields
- self.headers = self.HEADERS
-
- self.model = HistoryTableModel([], self.headers, self.showFields, self)
- self.table.setModel(self.model)
- self.table.horizontalHeader().setDefaultAlignment(Qt.AlignCenter)
- self.table.setSelectionBehavior(QTableView.SelectRows)
- self.table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
-
- def load(self, data: list[HistoryData]):
- self.model.update_data(data)
-
- def refresh(self):
- parent_widget = self.parent()
- if hasattr(parent_widget, "load_data"):
- parent_widget.load_data() # type: ignore
-
- def show_context_menu(self, pos):
- menu = QMenu(self)
- menu.addAction(_translate("Form", "播放"), self.play_row)
- menu.addAction(_translate("Form", "导出"), self.export_row)
- menu.addAction(_translate("Form", "刷新"), self.refresh)
- submenu = QMenu(_translate("Form", "显示字段"), self)
- for field in self.headers:
- action = QAction(field, self)
- action.setCheckable(True)
- action.setChecked(field in self.showFields)
- action.triggered.connect(
- lambda checked, a=action: self._on_toggle_field(a))
- submenu.addAction(action)
- menu.addMenu(submenu)
- menu.exec_(self.table.mapToGlobal(pos))
-
- def _on_toggle_field(self, action: QAction):
- name = action.text()
- if action.isChecked():
- self.showFields.add(name)
- else:
- self.showFields.discard(name)
- self.model.update_show_fields(self.showFields)
- self.show_fields_changed.emit(json.dumps(
- list(self.showFields), ensure_ascii=False))
-
- def _get_current_replay_id(self) -> int | None:
- row_idx = self.table.currentIndex().row()
- if row_idx < 0:
- return None
- visible = self.model._visible_headers
- if "replay_id" in visible:
- col = visible.index("replay_id")
- rid = self.model.data(self.model.index(row_idx, col), Qt.UserRole)
- return rid # type: ignore
- return getattr(self.model._data[row_idx], "replay_id", None)
-
- def _read_raw_data(self, replay_id: int) -> bytes | None:
- conn = sqlite3.connect(self._db_path)
- try:
- cursor = conn.cursor()
- cursor.execute(
- "SELECT raw_data FROM history WHERE replay_id = ?", (
- replay_id,)
- )
- row = cursor.fetchone()
- return row[0] if row else None
- finally:
- conn.close()
-
- def save_evf(self, evf_path: str):
- replay_id = self._get_current_replay_id()
- if replay_id is None:
- return
- raw_data = self._read_raw_data(replay_id)
- if raw_data is None:
- return
- with open(evf_path, "wb") as f:
- f.write(raw_data)
-
- def play_row(self):
- exec_dir = get_executable_dir()
- temp_filename = exec_dir / "tmp.evf"
- self.save_evf(str(temp_filename))
-
- exe = exec_dir / "metaminesweeper.exe"
- main_py = exec_dir / "main.py"
-
- if main_py.exists():
- subprocess.Popen(
- [sys.executable, str(main_py), str(temp_filename)])
- elif exe.exists():
- subprocess.Popen([str(exe), str(temp_filename)])
- else:
- QMessageBox.warning(
- self, "错误", "找不到主程序 (main.py 或 metaminesweeper.exe)"
- )
-
- def export_row(self):
- file_path, _ = QFileDialog.getSaveFileName(
- self,
- _translate("Form", "导出evf文件"),
- str(get_executable_dir()),
- "evf文件 (*.evf)",
- )
- if file_path:
- self.save_evf(file_path)
-
-
-class HistoryMainWidget(QWidget):
- """历史记录插件的主界面(作为插件的 widget 返回)"""
-
- # 信号:排序和过滤状态变化 (filter_json, sort_json)
- filter_sort_state_changed = pyqtSignal(str, str)
- # 信号:列显示配置变化 (show_fields_json)
- show_fields_changed = pyqtSignal(str)
-
- def __init__(
- self,
- db_path: Path,
- config_path: Path,
- float_decimals: int = 2,
- page_size: str = "50",
- parent=None,
- ):
- super().__init__(parent)
- self._db_path = db_path
- self._config_path = config_path
-
- self.setWindowTitle(_translate("Form", "历史记录"))
- self.resize(800, 600)
-
- layout = QVBoxLayout(self)
-
- # 查询按钮
- btn_layout = QHBoxLayout()
- self.query_button = QPushButton(_translate("Form", "查询"))
- btn_layout.addWidget(self.query_button)
- btn_layout.addItem(
- QSpacerItem(10, 10, QSizePolicy.Expanding, QSizePolicy.Minimum)
- )
-
- # 筛选 + 表格
- self.filter_widget = FilterWidget(float_decimals, self)
- self.table = HistoryTable(self._get_show_fields(), db_path, self)
-
- # 分页
- limit_layout = QHBoxLayout()
- self.previous_button = QPushButton(_translate("Form", "上一页"))
- self.page_spin = QSpinBox()
- self.page_spin.setMinimum(1)
- self.page_spin.setValue(1)
- self.next_button = QPushButton(_translate("Form", "下一页"))
- self.one_page_combo = QComboBox()
- self.one_page_combo.addItems(
- ["10", "20", "50", "100", "200", "500", "1000"])
- # 设置默认每页条数
- idx = self.one_page_combo.findText(page_size)
- if idx >= 0:
- self.one_page_combo.setCurrentIndex(idx)
-
- self.limit_label = QLabel("")
- limit_layout.addItem(
- QSpacerItem(10, 10, QSizePolicy.Expanding, QSizePolicy.Minimum)
- )
- limit_layout.addWidget(self.limit_label)
- limit_layout.addWidget(self.previous_button)
- limit_layout.addWidget(self.page_spin)
- limit_layout.addWidget(self.next_button)
- limit_layout.addWidget(self.one_page_combo)
-
- layout.addLayout(btn_layout)
- layout.addWidget(self.filter_widget)
- layout.addWidget(self.table)
- layout.addLayout(limit_layout)
- self.setLayout(layout)
-
- self._connect_signals()
- self.load_data()
-
- def set_filter_sort_state(self, filter_json: str, sort_json: str) -> None:
- """设置排序和过滤状态(由插件调用)"""
- try:
- filter_rows = json.loads(filter_json)
- if filter_rows:
- self._set_filter_rows(filter_rows)
- except (json.JSONDecodeError, TypeError):
- pass
-
- try:
- sort_rows = json.loads(sort_json)
- if sort_rows:
- self._set_sort_rows(sort_rows)
- except (json.JSONDecodeError, TypeError):
- pass
-
- # 恢复后触发一次查询
- self._on_query()
-
- def _connect_signals(self):
- self.query_button.clicked.connect(self._on_query)
- self.previous_button.clicked.connect(
- lambda: self.page_spin.setValue(self.page_spin.value() - 1)
- )
- self.next_button.clicked.connect(
- lambda: self.page_spin.setValue(self.page_spin.value() + 1)
- )
- self.one_page_combo.currentTextChanged.connect(self.load_data)
- self.page_spin.valueChanged.connect(self.load_data)
- self.table.show_fields_changed.connect(self.show_fields_changed)
-
- def _on_query(self):
- if self.page_spin.value() > 1:
- self.page_spin.setValue(1)
- else:
- self.load_data()
-
- def _get_limit_str(self):
- per_page = int(self.one_page_combo.currentText())
- offset = (self.page_spin.value() - 1) * per_page
- return f" LIMIT {per_page} OFFSET {offset}"
-
- def _get_show_fields(self) -> set[str]:
- if not self._config_path.exists():
- return set(HistoryData.fields())
- with open(self._config_path, "r") as f:
- return set(json.load(f))
-
- def load_data(self):
- if not self._db_path.exists():
- QMessageBox.warning(self, "错误", "历史记录数据库不存在")
- return
-
- try:
- conn = sqlite3.connect(self._db_path)
- conn.row_factory = sqlite3.Row
- cursor = conn.cursor()
- filter_str = self.filter_widget.gen_filter_str()
- order_str = self.filter_widget.gen_order_str()
- sql = "SELECT *, COUNT(*) OVER() AS total_count FROM history"
- if filter_str:
- sql += " WHERE " + filter_str
- elif filter_str is None:
- return
- sql += order_str
- sql += self._get_limit_str()
- cursor.execute(sql)
- datas = cursor.fetchall()
-
- if not datas:
- self.page_spin.setMaximum(1)
- self.limit_label.setText("共0行,0页")
- else:
- per_page = int(self.one_page_combo.currentText())
- total = datas[0]["total_count"]
- max_page = math.ceil(total / per_page)
- self.page_spin.setMaximum(max_page)
- self.limit_label.setText(f"共{total}行,{max_page}页")
-
- history_data = [HistoryData.from_dict(dict(d)) for d in datas]
- conn.close()
- except sqlite3.Error as e:
- QMessageBox.warning(self, "错误", f"加载历史记录失败: {e}")
- return
-
- self.table.load(history_data)
-
- # 保存当前的排序和过滤状态
- self._save_filter_sort_state()
-
- def _get_filter_rows(self) -> list[dict]:
- """获取过滤表格的所有行数据"""
- model = cast(FilterModel, self.filter_widget.table.model())
- rows = []
- for row in range(model.rowCount()):
- rows.append(model.get_row_data(row))
- return rows
-
- def _get_sort_rows(self) -> list[dict]:
- """获取排序表格的所有行数据"""
- model = cast(SortModel, self.filter_widget.sort_table.model())
- rows = []
- for row in range(model.rowCount()):
- rows.append(model.get_row_data(row))
- return rows
-
- def _set_filter_rows(self, rows: list[dict]) -> None:
- """恢复过滤表格的行数据"""
- model = self.filter_widget.table.model()
- model.removeRows(0, model.rowCount())
- for row_data in rows:
- row = model.rowCount()
- model.insertRow(row)
- model.setData(model.index(row, FilterModel.COL_LBRACKET),
- row_data.get("left_bracket"))
- model.setData(model.index(row, FilterModel.COL_FIELD),
- row_data.get("field"))
- model.setData(model.index(row, FilterModel.COL_COMPARE),
- row_data.get("compare"))
- model.setData(model.index(row, FilterModel.COL_VALUE),
- row_data.get("value"), Qt.EditRole)
- model.setData(model.index(row, FilterModel.COL_RBRACKET),
- row_data.get("right_bracket"))
- model.setData(model.index(row, FilterModel.COL_LOGIC),
- row_data.get("logic"))
-
- def _set_sort_rows(self, rows: list[dict]) -> None:
- """恢复排序表格的行数据"""
- model = self.filter_widget.sort_table.model()
- model.removeRows(0, model.rowCount())
- for row_data in rows:
- row = model.rowCount()
- model.insertRow(row)
- model.setData(model.index(row, SortModel.COL_FIELD),
- row_data.get("field"))
- model.setData(model.index(row, SortModel.COL_ORDER),
- row_data.get("order"))
-
- def _save_filter_sort_state(self) -> None:
- """发射排序和过滤状态变化信号"""
- filter_rows = self._get_filter_rows()
- sort_rows = self._get_sort_rows()
- self.filter_sort_state_changed.emit(
- json.dumps(filter_rows, ensure_ascii=False), json.dumps(sort_rows, ensure_ascii=False))
-
- def closeEvent(self, event: _QCloseEvent):
- """关闭事件"""
- super().closeEvent(event)
-
- def set_float_decimals(self, decimals: int) -> None:
- """动态设置小数位数"""
- self.filter_widget.set_float_decimals(decimals)
-
- def restore_show_fields(self, show_fields_json: str) -> None:
- """恢复列显示配置"""
- try:
- fields = json.loads(show_fields_json)
- if not fields:
- fields = self.table.HEADERS
- self.table.showFields = set(fields)
- self.table.model.update_show_fields(self.table.showFields)
- except (json.JSONDecodeError, TypeError):
- pass
+from .columns_dialog import ColumnsDialog
+from .delegates import ComboBoxDelegate, EditableComboBoxDelegate, FilterValueDelegate
+from .filter_dialog import FilterDialog
+from .history_table import HistoryTable
+from .main_widget import HistoryMainWidget
+from .sort_dialog import SortDialog
+from .table_views import AutoEditTableView, FilterModel, SortModel
+
+__all__ = [
+ "ComboBoxDelegate",
+ "EditableComboBoxDelegate",
+ "FilterValueDelegate",
+ "AutoEditTableView",
+ "FilterModel",
+ "SortModel",
+ "FilterDialog",
+ "SortDialog",
+ "ColumnsDialog",
+ "HistoryTable",
+ "HistoryMainWidget",
+]