From 198ec2dc8f88c6da73387461cbd1b1de0b0f472f Mon Sep 17 00:00:00 2001 From: lintaogood Date: Mon, 1 Sep 2025 12:08:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E6=96=B0=E5=BB=BA=E5=86=85?= =?UTF-8?q?=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/.gitignore | 3 + .idea/MarsCodeWorkspaceAppSettings.xml | 6 + .idea/dx_sl.iml | 12 + .../inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 7 + .idea/modules.xml | 8 + .idea/vcs.xml | 7 + StatusStyleDelegate.py | 113 ++ log_style_manager.py | 99 ++ logger_utils_new.py | 73 ++ order_executor.ahk | 58 ++ order_executor.py | 290 ++++++ quote_manager.py | 248 +++++ stock_monitor_pyside.py | 981 ++++++++++++++++++ 14 files changed, 1911 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/MarsCodeWorkspaceAppSettings.xml create mode 100644 .idea/dx_sl.iml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 StatusStyleDelegate.py create mode 100644 log_style_manager.py create mode 100644 logger_utils_new.py create mode 100644 order_executor.ahk create mode 100644 order_executor.py create mode 100644 quote_manager.py create mode 100644 stock_monitor_pyside.py diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..50d9d22 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml diff --git a/.idea/MarsCodeWorkspaceAppSettings.xml b/.idea/MarsCodeWorkspaceAppSettings.xml new file mode 100644 index 0000000..fb4c0c9 --- /dev/null +++ b/.idea/MarsCodeWorkspaceAppSettings.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/dx_sl.iml b/.idea/dx_sl.iml new file mode 100644 index 0000000..5a6edef --- /dev/null +++ b/.idea/dx_sl.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..5f1aa97 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..3ad9de2 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..6a392ba --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/StatusStyleDelegate.py b/StatusStyleDelegate.py new file mode 100644 index 0000000..ec6f6ce --- /dev/null +++ b/StatusStyleDelegate.py @@ -0,0 +1,113 @@ +# StatusStyleDelegate.py +from PySide6.QtWidgets import QStyledItemDelegate +from PySide6.QtGui import QColor, QFont, QBrush +from PySide6.QtCore import Qt, QTimer +from PySide6.QtGui import QPalette + +class StatusStyleDelegate(QStyledItemDelegate): + def __init__(self, parent=None, table_view=None): + super().__init__(parent) + self.flash_cycles = {} # 存储闪烁周期: {(row, col): (timer, cycle_count, original_brush)} + # self.flash_duration = 1000 # 闪烁持续时间(毫秒) + # self.updated_cells = set() # 存储需要高亮的单元格 (row, col) + self.table_view = table_view # 保存 QTableView 实例 + + def flash_cell(self, row, column): + # 如果单元格已经在闪烁,重置计时器 + if (row, column) in self.flash_cycles: + timer, _, _ = self.flash_cycles[(row, column)] + timer.stop() + del self.flash_cycles[(row, column)] + + # 获取原始背景色 + index = self.table_view.model().index(row, column) + original_brush = self.get_original_background(index) + + # 创建周期性计时器实现闪烁效果 + cycle_count = 0 + timer = QTimer() + timer.setInterval(200) # 200ms切换一次颜色 + + def update_flash(): + nonlocal cycle_count + cycle_count += 1 + # 闪烁3个周期(6次切换)后停止 + if cycle_count > 6: + timer.stop() + if (row, column) in self.flash_cycles: + del self.flash_cycles[(row, column)] + self.table_view.viewport().update() + return + + # 切换背景色 (黄色 <-> 原始色) + is_highlight = (cycle_count % 2 == 1) + self.flash_cycles[(row, column)] = (timer, cycle_count, original_brush) + self.table_view.viewport().update() + + timer.timeout.connect(update_flash) + self.flash_cycles[(row, column)] = (timer, cycle_count, original_brush) + timer.start() + update_flash() # 立即开始第一个周期 + + + def get_original_background(self, index): + # 获取单元格原始背景色 + model = index.model() + status_index = model.index(index.row(), 7) + status = model.data(status_index, Qt.ItemDataRole.DisplayRole) + + if status == "已触发": + return QBrush(QColor('lightcoral')) + elif status == "错买": + return QBrush(QColor('lightgreen')) + else: + return QBrush(QColor('white')) + + def initStyleOption(self, option, index): + super().initStyleOption(option, index) + + model = index.model() + status_index = model.index(index.row(), 7) + status = model.data(status_index, Qt.ItemDataRole.DisplayRole) + + + if status == "已触发": + option.font = QFont("Arial", 14, QFont.Weight.Bold) + option.palette.setColor(QPalette.ColorRole.Text, QColor('red')) + option.backgroundBrush = QBrush(QColor('lightcoral')) # 背景色 + elif status == "错买": + option.font = QFont("Arial", 14, QFont.Weight.Bold) + option.palette.setColor(QPalette.ColorRole.Text, QColor('green')) # 文字颜色 + option.backgroundBrush = QBrush(QColor('lightgreen')) # 背景色 + elif status == "监控中": + option.font = QFont("Arial", 12, QFont.Weight.Normal) + option.palette.setColor(QPalette.ColorRole.Text, QColor('black')) + option.backgroundBrush = QBrush(QColor('white')) + else: + option.font = QFont("Arial", 12, QFont.Weight.Normal) + option.palette.setColor(QPalette.ColorRole.Text, QColor('black')) + option.backgroundBrush = QBrush(QColor('white')) + + # 应用闪烁效果 (黄色 <-> 原始色交替) + row = index.row() + col = index.column() + if (row, col) in self.flash_cycles: + timer, cycle_count, original_brush = self.flash_cycles[(row, col)] + if cycle_count % 2 == 1: + # 使用高对比度黄色 (#FFFF99) + option.backgroundBrush = QBrush(QColor(255, 255, 153)) + else: + option.backgroundBrush = original_brush + + # 新增对获利列的处理 + if index.column() == 6: # 检查是否为获利列 + option.font = QFont(option.font().family(), option.font().pointSize(), QFont.Weight.Bold) + profit_text = model.data(index, Qt.ItemDataRole.DisplayRole) + try: + profit_value = float(profit_text.replace('%', '')) # 移除百分号再转换 + if profit_value > 0: + option.palette.setColor(QPalette.ColorRole.Text, QColor('red')) + elif profit_value < 0: + option.palette.setColor(QPalette.ColorRole.Text, QColor('green')) + except ValueError: + pass # 如果转换失败,保持默认颜色 \ No newline at end of file diff --git a/log_style_manager.py b/log_style_manager.py new file mode 100644 index 0000000..3f1ddfc --- /dev/null +++ b/log_style_manager.py @@ -0,0 +1,99 @@ +# log_style_manager.py + +from PySide6.QtGui import QTextDocument, QTextBlockFormat, QTextCharFormat, QFont, QColor +from PySide6.QtCore import Qt +from PySide6.QtGui import QTextCursor + + +class LogStyleManager: + """ + 日志样式管理器,统一控制 QTextEdit 的字体、颜色、行间距等 + """ + + def __init__(self, text_edit): + self.text_edit = text_edit + self.default_font = self.text_edit.font() + self.default_style = { + 'default': {'color': 'white', 'bold': False}, + 'info': {'color': 'cyan', 'bold': False}, + 'warning': {'color': 'yellow', 'bold': False}, + 'error': {'color': 'red', 'bold': False}, + 'trigger': {'color': '#28a745', 'bold': False}, + } + + def set_log_type_mapping(self, mapping=None): + """ + 设置日志类型与样式的映射关系 + :param mapping: dict,键为日志类型(如 'info'),值为对应的样式字典 + 样式字典支持: + - color: 颜色字符串 + - bold: 是否加粗 + - background: 背景颜色(可选) + - italic: 是否斜体 + """ + default_mapping = { + 'default': {'color': 'white', 'bold': False}, + 'info': {'color': 'cyan', 'bold': False}, + 'warning': {'color': 'yellow', 'bold': False}, + 'error': {'color': 'red', 'bold': True}, # 错误信息加粗 + 'trigger': {'color': '#28a745', 'bold': False}, + 'debug': {'color': 'gray', 'italic': True} # 新增 debug 类型 + } + + # 如果传入了新的映射规则,则更新默认样式 + if mapping is not None: + self.default_style = mapping + else: + self.default_style = default_mapping + + def set_global_font(self, font_family="微软雅黑", font_size=13): + """设置全局字体""" + self.default_font.setFamily(font_family) + self.default_font.setPointSize(font_size) + self.text_edit.setFont(self.default_font) + + def set_letter_spacing(self, spacing=5): + """设置字间距""" + self.default_font.setLetterSpacing(QFont.SpacingType.AbsoluteSpacing, spacing) + self.text_edit.setFont(self.default_font) + + def set_line_height(self, line_height=150): + """设置段落行高(百分比)""" + doc = self.text_edit.document() + block_format = QTextBlockFormat() + + block_format.setLineHeight(float(line_height), 0) + + block_format.setTopMargin(0) + block_format.setBottomMargin(0) + + cursor = QTextCursor(doc) + cursor.select(QTextCursor.SelectionType.Document) # 修改此处 + cursor.mergeBlockFormat(block_format) + cursor.clearSelection() + + def apply_log_style(self, log_type='default'): + """返回适用于当前日志类型的 QTextCharFormat""" + style = self.default_style.get(log_type.lower(), self.default_style['default']) + fmt = QTextCharFormat() + fmt.setForeground(QColor(style['color'])) + if style.get('bold', False): + fmt.setFontWeight(QFont.Weight.Bold) + return fmt + + def insert_log(self, message, log_type='default'): + """插入带样式的日志信息""" + fmt = self.apply_log_style(log_type) + + cursor = self.text_edit.textCursor() + cursor.movePosition(QTextCursor.End) + cursor.mergeCharFormat(fmt) + cursor.insertText(message + "\n") + self.text_edit.setTextCursor(cursor) + self.text_edit.ensureCursorVisible() + + def reset_styles(self): + """重置所有样式到默认状态""" + self.text_edit.setStyleSheet("") + self.text_edit.setFont(self.default_font) + self.set_line_height(100) diff --git a/logger_utils_new.py b/logger_utils_new.py new file mode 100644 index 0000000..404b62a --- /dev/null +++ b/logger_utils_new.py @@ -0,0 +1,73 @@ +# logger_utils.py + +import queue +import datetime +from colorama import Fore, Style, init + +# 初始化 colorama(仅 Windows 需要) +init(autoreset=True) + +# 日志等级颜色映射(控制台 + UI) +# LOG_COLORS → 控制台日志颜色(命令行) +# LOG_STYLES → UI 日志样式(图形界面) +LOG_COLORS = { + 'default': Fore.CYAN, + 'info': Fore.CYAN, + 'warning': Fore.YELLOW, + 'error': Fore.RED, + 'trigger': Fore.GREEN, +} + +# 【新增】UI 样式映射 +LOG_STYLES = { + 'default': {'color': 'cyan'}, + 'info': {'color': 'cyan'}, + 'warning': {'color': 'yellow'}, + 'error': {'color': 'red'}, + 'trigger': {'color': '#28a745'}, +} + +class Logger: + def __init__(self): + self.log_queue = queue.Queue() + self.append_log_func = None # 主程序的日志回调函数 + + def set_append_log(self, func): + """设置主程序中的 append_log 函数""" + self.append_log_func = func + + def log(self, message, level='default'): + timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + full_message = f"[{timestamp}] {message}" + + # 控制台带颜色输出 + color = LOG_COLORS.get(level.lower(), Fore.WHITE) + print(f"{color}{full_message}{Style.RESET_ALL}") + + # 触发UI更新(如果已注册) + if self.append_log_func: + log_type = level.lower() + style = LOG_STYLES.get(log_type, LOG_STYLES['default']) + + # 调用主程序的 append_log 方法,只传递纯文本和日志类型 + self.append_log_func(full_message, log_type) + + def info(self, message): self.log(message, 'info') + def warning(self, message): self.log(message, 'warning') + def error(self, message): self.log(message, 'error') + def trigger(self, message): self.log(message, 'trigger') + def debug(self, message): self.log(message, 'debug') + + +# 全局日志实例 +global_logger = Logger() + +# 便捷方法,方便其他模块直接使用 +def setup_logger(append_log_func): + global_logger.set_append_log(append_log_func) + +def log_info(msg): global_logger.info(msg) +def log_warning(msg): global_logger.warning(msg) +def log_error(msg): global_logger.error(msg) +def log_trigger(msg): global_logger.trigger(msg) +def log_debug(msg): global_logger.debug(msg) diff --git a/order_executor.ahk b/order_executor.ahk new file mode 100644 index 0000000..a6edab8 --- /dev/null +++ b/order_executor.ahk @@ -0,0 +1,58 @@ +#NoEnv +SendMode Input +SetWorkingDir %A_ScriptDir% +DetectHiddenWindows, On +SetTitleMatchMode, 2 + +; 获取命令行参数 +param1 = %1% + +if (param1 = "click") { + x := %2% + y := %3% + MouseMove, x, y + Click +} else if (param1 = "image_click") { + ; 图像识别点击功能 + imagePath := %2% + winTitle := %3% + confidence := %4% + + ; 设置搜索区域 + if (winTitle != "") { + WinGet, hwnd, ID, %winTitle% + if (!hwnd) { + ExitApp, 1 ; 未找到窗口,返回错误 + } + WinGetPos, winX, winY, winW, winH, ahk_id %hwnd% + searchX := winX + searchY := winY + searchW := winW + searchH := winH + } else { + ; 全屏搜索 + searchX := 0 + searchY := 0 + searchW := A_ScreenWidth + searchH := A_ScreenHeight + } + + ; 使用ImageSearch进行图像识别 + ImageSearch, foundX, foundY, searchX, searchY, searchX+searchW, searchY+searchH, *%confidence% %imagePath% + if (ErrorLevel = 0) { + ; 找到图像,计算中心点并点击 + centerX := foundX + centerY := foundY + MouseMove, centerX, centerY + Click + ExitApp, 0 ; 成功,返回0 + } else { + ExitApp, 1 ; 未找到图像,返回错误 + } +} else if (param1 = "type") { + Send, %2% +} else if (param1 = "press") { + Send, {%2%} +} + +ExitApp \ No newline at end of file diff --git a/order_executor.py b/order_executor.py new file mode 100644 index 0000000..4ebd167 --- /dev/null +++ b/order_executor.py @@ -0,0 +1,290 @@ +import subprocess +import time +import pygetwindow as gw +import win32gui +import win32con +import cv2 +import os +import pyautogui +# from logger_utils import log_info, log_warning, log_error, log_trigger +from logger_utils_new import log_info, log_warning, log_error, log_trigger, log_debug, setup_logger, LOG_STYLES + + +# 移除 pyautogui 导入 + + +class OrderExecutor: + def __init__(self, target_window_titles=None): + if target_window_titles is None: + # 默认支持的行情软件列表 + self.target_window_titles = ["东方财富终端", "通达信金融终端", "同花顺"] + else: + self.target_window_titles = target_window_titles + + self.current_software = None + self.order_window_title = 'FlashTradeDlgDlg' + self.order_count_window_title = '提示' + self.ahk_script_path = "D:/gp_data/order_executor.ahk" + + def detect_trading_software(self): + """检测当前运行的行情软件""" + for title in self.target_window_titles: + window_list = gw.getWindowsWithTitle(title) + if window_list: + self.current_software = title + log_info(f"检测到行情软件: {title}") + return title + log_warning("未检测到支持的行情软件") + return None + + def run_ahk_script(self, command): + """执行AHK脚本命令""" + try: + # 尝试查找AHK安装路径 + ahk_path = None + # 常见安装路径 + possible_paths = [ + "C:\\Program Files\\AutoHotkey\\v2\\AutoHotkey.exe", # v2版本 + "C:\\Program Files\\AutoHotkey\\AutoHotkey.exe", + "AutoHotkey.exe" # 如果在PATH环境变量中 + ] + + for path in possible_paths: + if os.path.exists(path): + ahk_path = path + break + + if not ahk_path: + log_error("未找到AutoHotkey安装路径") + return False + + subprocess.Popen( + [ahk_path, '/restart', self.ahk_script_path, command], + creationflags=subprocess.CREATE_NO_WINDOW + ) + return True + except Exception as e: + log_error(f"执行AHK脚本失败: {e}") + return False + + def click_confirm_button(self, window_title, confirm_ratio=(0.8, 0.8), delay=0.3): + """ + 修改为使用AHK点击确认按钮 + """ + try: + time.sleep(delay) + window_list = gw.getWindowsWithTitle(window_title) + if not window_list: + log_warning(f"未找到行情软件 '{window_title}' 的窗口") + return False + + window = window_list[0] + left, top, width, height = window.left, window.top, window.width, window.height + x = left + int(width * confirm_ratio[0]) + y = top + int(height * confirm_ratio[1]) + + # 使用AHK执行点击 + command = f"click {x} {y}" + self.run_ahk_script(command) + log_trigger(f"AHK点击 '{window_title}' ,位置: ({x}, {y})") + return True + + except Exception as e: + log_error(f"点击确认按钮失败: {e}") + return False + + def click_button_by_image(self, image_path, window_title=None, + timeout=10, retry_interval=0.5, + confidence=0.8, move_duration=0.2): + """ + 修改为使用AHK进行图像识别点击 + """ + log_info(f"开始AHK图像识别: {image_path}") + + # 构建AHK命令 + command = f"image_click {image_path}" + if window_title: + command += f" {window_title}" + command += f" {confidence}" + + # 执行AHK命令 + success = self.run_ahk_script(command) + if success: + log_trigger(f"AHK成功识别并点击图像 '{image_path}'") + return True, (0, 0) # AHK不返回坐标,用(0,0)占位 + else: + log_warning(f"AHK未能识别图像 '{image_path}'") + return False, None + + def place_order_tongdaxin(self, pure_code, auto_push): + """通达信下单方法""" + try: + # 获取所有匹配标题的窗口 + window_list = gw.getWindowsWithTitle(self.current_software) + if not window_list: + log_warning(f"未找到标题为 '{self.current_software}' 的窗口") + return False + + # 筛选逻辑:优先选择可见且活动的窗口 + target_window = None + + def is_window_visible(hwnd): + return win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd) + + for window in window_list: + hwnd = window._hWnd + if is_window_visible(hwnd) and window.isActive: + target_window = window + break + # 如果没有活动窗口,选择第一个可见窗口 + if not target_window: + for window in window_list: + hwnd = window._hWnd + if is_window_visible(hwnd): + target_window = window + break + # 如果都没有,选择第一个匹配的窗口 + if not target_window: + target_window = window_list[0] + + # 使用筛选后的目标窗口进行操作 + hwnd = target_window._hWnd + log_info(f"找到窗口句柄: {hwnd}") + target_window.restore() + target_window.maximize() + target_window.activate() + time.sleep(0.2) + + # 通达信操作方式:先按F1进入买入界面 + self.run_ahk_script("press f1") + time.sleep(0.1) + + # 输入代码 + self.run_ahk_script(f"type {pure_code}") + self.run_ahk_script("press enter") + time.sleep(0.1) + + # 判断是否自动下单 + if auto_push == 1: + # 在通达信中直接按F2进行全仓买入 + self.run_ahk_script("press f2") + time.sleep(0.1) + self.run_ahk_script("press enter") + time.sleep(0.5) + + # 判断是否弹出仓位的框 + if self.is_window_exists(self.order_count_window_title, 0.5): + log_error(f"剩余金额不满足购买{pure_code}最低需求。") + + except Exception as e: + log_error(f"通达信下单失败: {str(e)}") + return False + + def place_order_eastmoney(self, pure_code, auto_push): + """东方财富下单方法""" + try: + # 获取所有匹配标题的窗口 + window_list = gw.getWindowsWithTitle(self.current_software) + if not window_list: + log_warning(f"未找到标题为 '{self.current_software}' 的窗口") + return False + + # 筛选逻辑:优先选择可见且活动的窗口 + target_window = None + + def is_window_visible(hwnd): + return win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd) + + for window in window_list: + hwnd = window._hWnd + if is_window_visible(hwnd) and window.isActive: + target_window = window + break + # 如果没有活动窗口,选择第一个可见窗口 + if not target_window: + for window in window_list: + hwnd = window._hWnd + if is_window_visible(hwnd): + target_window = window + break + # 如果都没有,选择第一个匹配的窗口 + if not target_window: + target_window = window_list[0] + + # 使用筛选后的目标窗口进行操作 + hwnd = target_window._hWnd + log_info(f"找到窗口句柄: {hwnd}") + target_window.restore() + target_window.maximize() + target_window.activate() + time.sleep(0.2) + + # 使用AHK点击中心位置 + self.click_confirm_button(self.current_software, (0.5, 0.5), 0.1) + + # 使用AHK输入代码 + self.run_ahk_script(f"type {pure_code}") + self.run_ahk_script("press enter") + time.sleep(0.1) + + # 判断是否自动下单 + if auto_push == 1: + self.run_ahk_script("type 21") + self.run_ahk_script("press enter") + time.sleep(0.1) + + # 点击全仓按钮 + success, pos = self.click_button_by_image( + image_path="../images/full_position.png", + window_title=self.order_window_title, + timeout=10, + retry_interval=0.3, + confidence=0.9 + ) + + if success: + log_info("已点击全仓按钮") + # 判断是否弹出仓位的框 + if self.is_window_exists(self.order_count_window_title, 0.5): + log_error(f"剩余金额不满足购买{pure_code}最低需求。") + else: + log_warning("未找到全仓按钮图像") + + except Exception as e: + log_error(f"东方财富下单失败: {str(e)}") + return False + + def place_order(self, pure_code, auto_push): + """ + 根据检测到的行情软件选择相应的下单方法 + """ + # 检测当前运行的行情软件 + if not self.current_software: + self.detect_trading_software() + + if not self.current_software: + log_error("未找到支持的行情软件窗口") + return False + + # 根据不同的行情软件调用不同的下单方法 + if "通达信" in self.current_software: + return self.place_order_tongdaxin(pure_code, auto_push) + elif "东方财富" in self.current_software: + return self.place_order_eastmoney(pure_code, auto_push) + else: + # 默认使用东方财富的方法 + log_warning(f"未针对 {self.current_software} 实现行情软件的特殊操作,使用默认方法") + return self.place_order_eastmoney(pure_code, auto_push) + + def is_window_exists(self, window_title, timeout=1.0): + """ + 检查窗口是否存在 + """ + start_time = time.time() + while time.time() - start_time < timeout: + window_list = gw.getWindowsWithTitle(window_title) + if window_list: + return True + time.sleep(0.1) + return False + diff --git a/quote_manager.py b/quote_manager.py new file mode 100644 index 0000000..33f61b9 --- /dev/null +++ b/quote_manager.py @@ -0,0 +1,248 @@ +""" +行情数据管理模块 - 支持多数据源(Tushare/AKShare) +""" +import pandas as pd +import tushare as ts +import akshare as ak +from typing import List, Dict, Optional # 添加 Optional 导入 +import logging +import time +from enum import Enum, auto +import threading + +# 添加 Tushare Token 设置 +ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d') + +# 通用股票排除规则 +STOCK_EXCLUSION_RULES = { + 'exclude_st': True, # 排除ST/*ST股票 + 'exclude_b_share': True, # 排除B股 + 'exclude_star_market': True, # 排除科创板(688开头) + 'exclude_gem': False, # 排除创业板(300开头) - 默认不排除 + 'exclude_bj': True, # 排除北交所股票 + 'custom_exclusions': [] # 自定义排除列表 +} + +class DataSource(Enum): + TUSHARE = "tushare" + AKSHARE = "akshare" + LOCAL = "local" + +class QuoteManager: + _instance = None + _lock = threading.Lock() + + def __new__(cls): + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._init_manager() + return cls._instance + + def _init_manager(self): + self._cache = {} + self._cache_ttl = 60 + self._data_source = DataSource.TUSHARE + self.max_retries = 3 # 默认最大重试次数 + self.retry_interval = 2 # 默认重试间隔(秒) + + def set_retry_policy(self, max_retries: int, retry_interval: float = 2): + """设置重试策略""" + self.max_retries = max_retries + self.retry_interval = retry_interval + + def set_data_source(self, source: DataSource): + """设置数据源""" + self._data_source = source + + def get_realtime_quotes(self, codes: List[str]) -> Dict[str, pd.DataFrame]: + """获取实时行情(带重试机制)""" + last_error = None + for attempt in range(self.max_retries): + try: + if self._data_source == DataSource.TUSHARE: + return self._get_tushare_quotes(codes) + elif self._data_source == DataSource.AKSHARE: + return self._get_akshare_quotes(codes) + else: + raise ValueError("不支持的数据源") + except Exception as e: + last_error = e + if attempt < self.max_retries - 1: # 不是最后一次尝试 + time.sleep(self.retry_interval) + continue + raise Exception(f"获取行情失败(尝试{self.max_retries}次): {str(last_error)}") + + def _get_tushare_quotes(self, codes: List[str], max_retries: int = 3, retry_interval: float = 2) -> Dict[str, pd.DataFrame]: + """使用 Tushare 获取实时行情(带重试机制)""" + for attempt in range(max_retries): + try: + df = ts.realtime_quote(ts_code=','.join(codes)) + if df is None or df.empty: + raise Exception("返回数据为空") + return {row['TS_CODE']: row for _, row in df.iterrows()} + except Exception as e: + if attempt < max_retries - 1: + time.sleep(retry_interval) + continue + raise Exception(f"Tushare 行情获取失败(尝试{max_retries}次): {str(e)}") + + def _get_akshare_quotes(self, codes: List[str]) -> Dict[str, pd.DataFrame]: + """使用 AKShare 获取实时行情""" + # 这里需要实现 AKShare 的获取逻辑 + raise NotImplementedError("AKShare 实现待完成") + + def _convert_akshare_format(self, row) -> Dict: + """将AKShare数据格式转换为统一格式""" + return { + 'TS_CODE': row['代码'], + 'PRICE': row['最新价'], + 'OPEN': row['今开'], + 'PRE_CLOSE': row['昨收'], + 'HIGH': row['最高'], + 'LOW': row['最低'], + 'VOLUME': row['成交量'] + } + + # ... existing code ... + def _get_tushare_all_stocks(self) -> List[str]: + """使用Tushare获取所有A股股票列表""" + try: + pro = ts.pro_api() # 获取Tushare专业版API接口 + # 获取所有股票列表 + stock_basic = pro.stock_basic(exchange='', list_status='L', + fields='ts_code,symbol,name,area,industry,list_date') + # 根据通用规则过滤股票 + filtered_stocks = self._filter_stocks(stock_basic['ts_code'].tolist()) + return filtered_stocks + except Exception as e: + logging.error(f"获取股票列表失败: {str(e)}") + return [] + + def _filter_stocks(self, stock_list: List[str]) -> List[str]: + """ + 根据通用规则过滤股票列表 + :param stock_list: 原始股票列表 + :return: 过滤后的股票列表 + """ + filtered_stocks = [] + + for stock in stock_list: + exclude = False + + # 检查是否在自定义排除列表中 + if stock in STOCK_EXCLUSION_RULES.get('custom_exclusions', []): + exclude = True + + # 检查是否排除ST/*ST股票 + if STOCK_EXCLUSION_RULES.get('exclude_st', True): + # 注意:这里需要获取股票名称来判断是否为ST股票 + # 在实际应用中,您可能需要通过其他方式获取股票名称 + pass # ST股票的判断需要额外的数据支持 + + # 检查是否排除B股 + if STOCK_EXCLUSION_RULES.get('exclude_b_share', True) and stock.endswith('.BJ'): + exclude = True + + # 检查是否排除科创板股票(688开头) + if STOCK_EXCLUSION_RULES.get('exclude_star_market', True) and stock.startswith('688'): + exclude = True + + # 检查是否排除创业板股票(300开头) + if STOCK_EXCLUSION_RULES.get('exclude_gem', False) and stock.startswith('300'): + exclude = True + + # 检查是否排除北交所股票 + if STOCK_EXCLUSION_RULES.get('exclude_bj', True) and stock.endswith('.BJ'): + exclude = True + + # 如果没有被排除,则添加到结果列表中 + if not exclude: + filtered_stocks.append(stock) + + return filtered_stocks + + def set_exclusion_rules(self, rules: Dict): + """ + 设置股票排除规则 + :param rules: 排除规则字典 + """ + global STOCK_EXCLUSION_RULES + STOCK_EXCLUSION_RULES.update(rules) + + def get_exclusion_rules(self) -> Dict: + """ + 获取当前的股票排除规则 + :return: 排除规则字典 + """ + global STOCK_EXCLUSION_RULES + return STOCK_EXCLUSION_RULES.copy() + + def get_quote(self, code: str) -> Optional[Dict]: + """获取单个股票行情(兼容旧接口)""" + try: + quotes = self.get_realtime_quotes([code]) + if not quotes or code not in quotes: + return None + row = quotes[code] + return { + 'price': row['PRICE'], + 'avg_price': (row['OPEN'] + row['PRE_CLOSE']) / 2, + 'volume': row['VOLUME'] + } + except Exception as e: + logging.error(f"获取股票{code}行情失败: {str(e)}") + return None + + def get_daily_data(self, codes: List[str], start_date: str = None, end_date: str = None, + max_retries: Optional[int] = None, retry_interval: Optional[float] = None) -> Dict[ + str, pd.DataFrame]: + """获取股票日线数据""" + # 如果没有传入参数,则使用实例的默认值 + max_retries = max_retries if max_retries is not None else self.max_retries + retry_interval = retry_interval if retry_interval is not None else self.retry_interval + + try: + if self._data_source == DataSource.TUSHARE: + return self._get_tushare_daily_data(codes, start_date, end_date, max_retries, retry_interval) + elif self._data_source == DataSource.AKSHARE: + return self._get_akshare_daily_data(codes, start_date, end_date, max_retries, retry_interval) + else: + raise ValueError("不支持的数据源") + except Exception as e: + logging.error(f"获取日线数据失败: {str(e)}") + return {} + + def _get_tushare_daily_data(self, codes: List[str], start_date: str = None, end_date: str = None, + max_retries: int = 3, retry_interval: float = 2) -> Dict[str, pd.DataFrame]: + """使用Tushare获取日线数据""" + daily_data = {} + pro = ts.pro_api() # 获取Tushare专业版API接口 + for code in codes: + for attempt in range(max_retries): + try: + # 如果没有指定日期范围,默认获取最近30天的数据 + if not end_date: + end_date = pd.Timestamp.now().strftime('%Y%m%d') + if not start_date: + start_date = (pd.Timestamp.now() - pd.Timedelta(days=30)).strftime('%Y%m%d') + + df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date) + if df is not None and not df.empty: + df = df.sort_values('trade_date') + daily_data[code] = df + break # 成功获取数据,跳出重试循环 + except Exception as e: + if attempt < max_retries - 1: + time.sleep(retry_interval) + continue + logging.error(f"获取{code}日线数据失败: {str(e)}") + break + return daily_data + + def _get_akshare_daily_data(self, codes: List[str], start_date: str = None, end_date: str = None, + max_retries: int = 3, retry_interval: float = 2) -> Dict[str, pd.DataFrame]: + """使用AKShare获取日线数据""" + # AKShare实现待完成 + raise NotImplementedError("AKShare日线数据获取待实现") + diff --git a/stock_monitor_pyside.py b/stock_monitor_pyside.py new file mode 100644 index 0000000..0ed3d85 --- /dev/null +++ b/stock_monitor_pyside.py @@ -0,0 +1,981 @@ +import sys +import os +import time +import datetime +import chardet +import csv +import pandas as pd +import tushare as ts +import threading +import socket +from PySide6.QtCore import Qt, QModelIndex +from PySide6.QtCore import QTimer, QThread, Signal, QObject, QPropertyAnimation, QEasingCurve +from PySide6.QtWidgets import ( + QApplication, QMainWindow, QTableView, QPushButton, + QVBoxLayout, QHBoxLayout, QWidget, QLabel, QFileDialog, + QTextEdit, QCheckBox, QLineEdit, QMessageBox, QSplitter, + QMenuBar, QMenu, QHeaderView, QDialog +) +# 正确导入方式 +from PySide6.QtWidgets import QApplication, QTableView, QHeaderView, QStyledItemDelegate +from PySide6.QtGui import QAction, QFont, QTextCharFormat, QColor, QIcon, QStandardItemModel, QStandardItem, QPalette +from PySide6.QtGui import QTextCursor, QBrush +from logger_utils_new import log_info, log_warning, log_error, log_trigger, log_debug, setup_logger, LOG_STYLES +from StatusStyleDelegate import StatusStyleDelegate +from order_executor import OrderExecutor +from log_style_manager import LogStyleManager +from typing import List + + +# 设置 Tushare Token +# ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d') +pro = ts.pro_api() + +from quote_manager import QuoteManager, DataSource + +class PriceUpdateWorker(QThread): + data_ready = Signal(dict) + update_finished = Signal() + + def __init__(self, stock_codes, parent=None): + super().__init__(parent) + self.stock_codes = stock_codes + # 引用行情数据 + self.quote_manager = QuoteManager() + self.quote_manager.set_data_source(DataSource.TUSHARE) + # 添加超时时间(秒) + self.quote_manager.set_retry_policy(max_retries=3, retry_interval=2) + + def run(self): + try: + all_results = self.quote_manager.get_realtime_quotes(self.stock_codes) + self.data_ready.emit(all_results) + except Exception as e: + log_error(f"行情获取失败: {str(e)}") + finally: + self.update_finished.emit() + + +class FetchShareCapitalWorker(QThread): + data_ready = Signal(dict) + + def run(self): + try: + log_info("获取流通盘数据...") + retry_count = 3 # 设置最大重试次数 + for attempt in range(retry_count): + try: + dc_df = ts.realtime_list(src='dc') + if dc_df is None or dc_df.empty: + log_warning(f"第 {attempt + 1} 次获取流通盘失败,数据为空,准备重试...") + time.sleep(2) # 等待 2 秒后重试 + continue + + float_mv_dict = { + row['TS_CODE']: row['FLOAT_MV'] / 1e8 + for _, row in dc_df.iterrows() + } + + log_info(f"流通盘缓存已更新,共 {len(float_mv_dict)} 条记录") + self.data_ready.emit(float_mv_dict) + return # 成功退出循环 + + except Exception as e: + log_error(f"第 {attempt + 1} 次获取流通盘失败: {str(e)}") + time.sleep(2) + + log_error("多次尝试获取流通盘失败,请检查网络连接、Token 权限或 Tushare 接口状态。") + self.data_ready.emit({}) # 发送空数据以表示失败 + + except Exception as e: + log_error(f"获取流通盘数据时发生异常: {str(e)}") + self.data_ready.emit({}) + +from typing import List +# 自定义模型以支持按业务优先级排序 +class CustomSortModel(QStandardItemModel): + def sort(self, column, order=Qt.SortOrder.AscendingOrder): + if column != 7: + return super().sort(column, order) + + status_priority = {"已触发": 0, "错买": 1, "监控中": 2} + + def get_sort_key(item): + if item is None: + return 3 + role = item.data(Qt.ItemDataRole.UserRole) + if role is not None: + return role + text = item.text() + return status_priority.get(text, 3) + + # 保存当前所有行 + all_rows = [self.takeRow(0) for _ in range(self.rowCount())] + + # 按照状态列排序 + all_rows.sort(key=lambda row_items: get_sort_key(row_items[7]), + reverse=(order == Qt.SortOrder.DescendingOrder)) + + # 清空并重新添加 + self.removeRows(0, self.rowCount()) + for row_items in all_rows: + self.appendRow(row_items) + + self.layoutChanged.emit() + + +class MainWindow(QMainWindow): + log_signal = Signal(str, str) # message, level + OUT_PATH = r"D:\gp_data" + ping_updated_signal = Signal(float) # 新增:用于线程安全地传递延迟值 + def __init__(self): + super().__init__() + self.setWindowTitle("股票价格监控系统") + self.resize(1200, 800) + self.capital_threshold = 60 # 流通盘阈值60亿 + self.open_zf_threshold_up = 3.0 # 开盘涨幅阈值3% + self.open_zf_threshold_down = -2.0 # 开盘跌幅阈值2% + # 初始化 delegate + self.delegate = StatusStyleDelegate(self) + + # 初始化股票计数器标签 + self.total_stocks_label = QLabel("总数: 0") + self.triggered_stocks_label = QLabel("触发: 0") + self.total_stocks_label.setObjectName("total_stocks_label") + self.triggered_stocks_label.setObjectName("triggered_stocks_label") + self.trading_status_label = QLabel("当前状态:非交易时间") + self.trading_status_label.setObjectName("trading_status_label") + self.data_refresh_label = QLabel("数据状态:") + self.data_refresh_label.setObjectName("data_refresh_label") + + self.ping_updated_signal.connect(self.update_ping_ui) # 连接信号到 UI 更新函数 + + self.ping_label = QLabel("实时行情延迟: --ms") + self.ping_label.setObjectName("ping_label") + self.ping_light = self.create_status_light() + self.ping_light.setObjectName("ping_light") # 避免重复创建 + + self.price_update_worker = None # 用于保存当前线程实例 + self.fetch_worker = None # 用于保存流通盘数据获取线程实例 + self.current_all_results = {} # 缓存最新行情数据 + self.calculated_open_pct = set() # 记录已计算开盘涨幅的股票 + + # 初始化模型 + self.standard_model = CustomSortModel() + self.standard_model.setHorizontalHeaderLabels([ + "代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间" + ]) + + # 表格视图 + self.table_view = QTableView() + self.table_view.setModel(self.standard_model) + self.table_view.setSortingEnabled(True) # 启用排序 + self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Fixed) + + # 设置每列宽度 + widths = [140, 120, 100, 100, 80, 80, 80, 80, 220] + for col_index, width in enumerate(widths): + self.table_view.horizontalHeader().resizeSection(col_index, width) + + # 设置委托(确保 self.status_delegate 已在 __init__ 中初始化) + # 修改前: + # self.status_delegate = StatusStyleDelegate() + # 修改后: + self.status_delegate = StatusStyleDelegate(parent=self, table_view=self.table_view) + for col in range(8): # 对前8列应用 + self.table_view.setItemDelegateForColumn(col, self.status_delegate) + + # 新增大流通盘颜色样式 + self.large_cap_delegate = StatusStyleDelegate(parent=self, table_view=self.table_view) + self.table_view.setItemDelegateForColumn(3, self.large_cap_delegate) # 第4列为流通盘列 + # 添加缺失的delegate初始化 + self.delegate = StatusStyleDelegate(self, table_view=self.table_view) + + # 在 init_ui() 之前或之后都可以 + self.load_stylesheet() + + # 加载UI布局 + self.init_ui() + + # 初始化日志样式管理器 + self.log_style_manager = LogStyleManager(self.log_box) + self.log_style_manager.set_global_font(font_family="微软雅黑", font_size=12) + self.log_style_manager.set_letter_spacing(spacing=1) # 设置字间距 + self.log_style_manager.set_line_height(line_height=120) # 设置行高为 1.2 倍 + + # 设置日志区域段落格式 + # self.setup_log_paragraph_format() + + # 其它初始化 + # font = self.log_box.font() + # font.setLetterSpacing(QFont.SpacingType.AbsoluteSpacing, 1) # 设置每个字符之间固定间距为 1 像素 + # self.log_box.setFont(font) + + # 初始化变量 + self.order_executor = OrderExecutor() # 初始化下单执行器 + self.data_rows = [] + self.stock_codes = [] + self.auto_push_var = False + setup_logger(self.append_log) + + # 初始化日志文件路径(关键修复点) + if not hasattr(self, 'log_file'): + self.log_file = os.path.join(self.OUT_PATH, f"monitor_{datetime.datetime.now().strftime('%Y%m%d')}.log") + + # 启动定时器 + self.timer = QTimer() + self.timer.timeout.connect(self.update_prices) + self.timer.start(5000) + + # 启动状态更新线程 + self.start_status_update() + self.start_ping_update() # 启动 Ping 更新 + # print(type(self.standard_model)) # 应该输出 + + def on_share_capital_data_ready(self, float_share_data): + # 在主线程中更新UI + self.float_share_cache = float_share_data + self.update_share_capital_ui(float_share_data) + + def on_fetch_finished(self): + # 重新启用控件 + self.btn_load.setEnabled(True) + self.btn_save.setEnabled(True) + self.checkbox_debug.setEnabled(True) + self.fetch_worker = None + + def load_stylesheet(self): + qss_file = os.path.join(self.OUT_PATH, "style.qss") + if os.path.exists(qss_file): + with open(qss_file, 'r', encoding='utf-8') as f: + self.setStyleSheet(f.read()) + log_info(f"样式文件已加载: {qss_file}") + else: + log_error(f"样式文件不存在: {qss_file}") + + def add_test_data(self): + """添加测试数据""" + statuses = ["监控中", "错买", "已触发"] + for i in range(10): + code = f"{i + 1:06d}.SZ" + name = f"测试股票{i + 1}" + status = statuses[i % 3] + + items = [ + QStandardItem(code), + QStandardItem(name), + QStandardItem("0.00%"), + QStandardItem("10.00亿"), + QStandardItem("15.00"), + QStandardItem("16.00"), + QStandardItem("6.67%"), + QStandardItem(status), + QStandardItem(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ] + + # 设置排序角色 + if status == "已触发": + items[7].setData(0, Qt.ItemDataRole.UserRole) + elif status == "错买": + items[7].setData(1, Qt.ItemDataRole.UserRole) + elif status == "监控中": + items[7].setData(2, Qt.ItemDataRole.UserRole) + + self.standard_model.appendRow(items) + + def start_price_update_thread(self): + if self.price_update_worker is not None and self.price_update_worker.isRunning(): + return # 避免重复启动线程 + + self.price_update_worker = PriceUpdateWorker(self.stock_codes, self) + self.price_update_worker.data_ready.connect(self.on_data_ready) + self.price_update_worker.update_finished.connect(self.on_update_finished) + self.price_update_worker.start() + + def init_ui(self): + # 主窗口控件与布局 + main_widget = QWidget() + layout = QVBoxLayout() + + # 状态栏 + status_layout = QHBoxLayout() + + self.status_light = self.create_status_light() + self.update_light = self.create_status_light() + + # self.ping_label = QLabel("实时行情延迟: --ms") + # self.ping_light = self.create_status_light() # 新增的小灯 + + status_layout.addWidget(self.trading_status_label) + status_layout.addWidget(self.status_light) + status_layout.addStretch() + + # 在 update_light 前添加文字标签 + status_layout.addWidget(self.data_refresh_label) + status_layout.addWidget(self.update_light) + + # 新增网络延迟标签和小灯 + status_layout.addSpacing(20) # 添加间距 + # 状态栏部分 + status_layout.addWidget(self.ping_label) + status_layout.addWidget(self.ping_light) + + layout.addLayout(status_layout) + + # 表格视图 + self.table_view = QTableView() + self.standard_model.setHorizontalHeaderLabels([ + "代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间" + ]) + self.table_view.setModel(self.standard_model) + + # 设置表头行为(先设 Stretch,再设固定宽度) + self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.Fixed) + for col_index, width in enumerate([140, 120, 100, 100, 80, 80, 100, 80, 220]): + self.table_view.horizontalHeader().resizeSection(col_index, width) + + # 设置委托(确保 self.status_delegate 已在 __init__ 中初始化) + self.table_view.setItemDelegateForColumn(7, self.status_delegate) + self.table_view.setSortingEnabled(True) + + # 双击事件绑定 + self.table_view.doubleClicked.connect(self.place_order) + layout.addWidget(self.table_view) + + # 控制面板 + control_layout = QHBoxLayout() + button_font = QFont() + button_font.setPointSize(8) + + self.btn_load = QPushButton("选择文件") + self.btn_save = QPushButton("保存结果") + self.checkbox_auto = QCheckBox("自动下单") + self.checkbox_debug = QCheckBox("调试模式") + self.checkbox_topmost = QCheckBox("窗口置顶") + + # 设置对象名 + self.btn_load.setObjectName("btn_load") + self.btn_save.setObjectName("btn_save") + for btn in [self.btn_load, self.btn_save]: + btn.setObjectName(btn.text().replace(" ", "_")) + btn.setFont(button_font) + + for cb in [self.checkbox_auto, self.checkbox_debug, self.checkbox_topmost]: + cb.setFont(button_font) + cb.setObjectName(cb.text()) + + # 连接事件 + self.checkbox_topmost.stateChanged.connect(self.toggle_always_on_top) + self.btn_load.clicked.connect(self.select_file) + self.btn_save.clicked.connect(self.export_to_excel) + + control_layout.addWidget(self.btn_load) + control_layout.addWidget(self.btn_save) + control_layout.addWidget(self.checkbox_auto) + control_layout.addWidget(self.checkbox_topmost) + control_layout.addWidget(self.checkbox_debug) + + # 添加股票计数器标签到控制布局 + control_layout.addWidget(self.total_stocks_label) + control_layout.addWidget(self.triggered_stocks_label) + # self.total_stocks_label.setStyleSheet("color: green; font-size: 14pt;") + # self.triggered_stocks_label.setStyleSheet("color: blue; font-size: 14pt;") + + layout.addLayout(control_layout) + + # 日志区域 + self.log_box = QTextEdit() + self.log_box.setReadOnly(True) + self.log_box.setFixedHeight(250) # 设置固定高度为 250 像素 + layout.addWidget(self.log_box) + + # 设置主窗口布局 + main_widget.setLayout(layout) + self.setCentralWidget(main_widget) + + # 确保样式表加载 + # self.load_stylesheet() + # self.update_ping_ui(123.45) + + def start_ping_update(self): + def ping_loop(): + while True: + try: + # log_info("开始测速...") + delay = self.ping_tushare_server() + # log_info(f"获取到延迟: {delay:.2f}ms") + self.ping_updated_signal.emit(delay) # 使用信号通知主线程 + except Exception as e: + log_error(f"Ping 失败: {str(e)}") + time.sleep(60) # 每分钟更新一次 + + # log_info("启动后台 Ping 线程...") + threading.Thread(target=ping_loop, daemon=True).start() + + def ping_tushare_server(self): + """ + 使用 Tushare 实时行情接口测试 API 响应速度 + 返回三次请求的平均延迟(单位:毫秒) + """ + delays = [] + + for _ in range(3): # 进行3次尝试 + start = time.time() + try: + # 使用一个轻量级的 API 请求进行测试 + df = ts.realtime_quote(ts_code='000001.SZ') + # log_debug("API 调用成功,返回结果:") + # log_debug(str(df)) # 打印原始返回结果,便于调试 + if df is not None and not df.empty: + end = time.time() + delay = (end - start) * 1000 # 转换为毫秒 + delays.append(delay) + # log_info(f"单次延迟: {delay:.2f}ms") + else: + log_warning("API 返回空数据") + except Exception as e: + log_warning(f"API 请求失败: {str(e)}") + continue + + if delays: + avg_delay = round(sum(delays) / len(delays), 2) + # log_info(f"平均延迟: {avg_delay:.2f}ms") + return avg_delay # 返回平均延迟 + else: + log_error("无法成功调用 Tushare API,未获取到任何有效延迟数据") + raise Exception("无法成功调用 Tushare API") + + def update_ping_ui(self, delay): + if not hasattr(self, 'ping_label') or not hasattr(self, 'ping_light'): + log_warning("ping_label 或 ping_light 尚未初始化") + return + + try: + self.ping_label.setText(f"实时行情延迟: {delay:.2f}ms") + # log_debug(f"更新网络延迟标签为: {delay:.2f}ms") + self.flash_ping_light(delay) + except Exception as e: + log_error(f"更新延迟UI失败: {str(e)}") + self.ping_light.setStyleSheet("background-color: gray; border-radius: 10px;") + + def flash_ping_light(self, delay): + light = self.ping_light + if delay < 100: # 如果延迟小于100ms,显示绿色 + color = "#00FF00" + elif delay < 300: # 如果延迟在100ms到300ms之间,显示黄色 + color = "yellow" + else: # 如果延迟大于300ms,显示红色 + color = "red" + # log_debug(f"设置小灯颜色为: {color}") + light.setStyleSheet(f"background-color: {color}; border-radius: 10px;") + anim = QPropertyAnimation(light, b"geometry") + anim.setDuration(300) + anim.setKeyValueAt(0, light.geometry()) + anim.setKeyValueAt(0.5, light.geometry().adjusted(-3, -3, 3, 3)) + anim.setKeyValueAt(1, light.geometry()) + anim.setLoopCount(2) + anim.setEasingCurve(QEasingCurve.Type.OutBounce) + anim.start() + # QTimer.singleShot(600, lambda: light.setStyleSheet("background-color: gray; border-radius: 10px;")) # 0.6秒后恢复灰色 + + def create_status_light(self): + light = QLabel() + light.setFixedSize(20, 20) + light.setStyleSheet("background-color: gray; border-radius: 10px;") + return light + + def place_order(self, code, auto_flag): + """ + 下单接口,根据股票代码执行下单操作 + :param code: 股票代码 (str) + :param auto_flag: 是否自动下单 (int: 0/1) + """ + log_info(f"开始处理下单请求,股票代码: {code},自动模式: {auto_flag}") + try: + self.order_executor.place_order(code, auto_flag) + except Exception as e: + log_error(f"下单过程中发生异常: {e}") + + def update_stock_counters(self): + total = self.standard_model.rowCount() + triggered = 0 + for i in range(total): + status_item = self.standard_model.item(i, 7) # 假设状态是第8列(索引从0开始) + if status_item and status_item.text() == "已触发" or status_item.text() == "错买": + triggered += 1 + self.total_stocks_label.setText(f"总数: {triggered}/{total}") + self.triggered_stocks_label.setText(f"触发: {triggered}") + + def load_stylesheet(self): + qss_file = os.path.join(self.OUT_PATH, "style.qss") + log_info(f"加载样式文件路径: {qss_file}") + + if os.path.exists(qss_file): + with open(qss_file, 'r', encoding='utf-8') as f: + self.setStyleSheet(f.read()) + else: + log_error(f"样式文件不存在:{qss_file}") + + def dragEnterEvent(self, event): + if event.mimeData().hasUrls(): + url = event.mimeData().urls()[0] + if url.toString().lower().endswith('.txt'): + event.acceptProposedAction() + + def dropEvent(self, event): + urls = event.mimeData().urls() + if urls: + file_path = urls[0].toLocalFile() + today = datetime.datetime.now().strftime("%Y%m%d") + trade_date = os.path.basename(file_path).split('.')[0] + + if trade_date == today: + log_error("不能加载今日文件,请选择历史日期文件。") + else: + self.load_stocks(file_path, trade_date) + + def select_file(self): + file_path, _ = QFileDialog.getOpenFileName(self, "选择监控文件", self.OUT_PATH +"\history", "文本文件 (*.txt)") + if file_path: + trade_date = os.path.basename(file_path).split('.')[0] + today = datetime.datetime.now().strftime("%Y%m%d") + if trade_date == today: + log_error("不能加载今日文件,请选择历史日期文件。") + return + self.load_stocks(file_path, trade_date) + + def load_stocks(self, file_path, trade_date): + try: + with open(file_path, 'rb') as f: + raw_data = f.read(10000) + encoding = chardet.detect(raw_data)['encoding'] or 'utf-8' + + with open(file_path, 'r', encoding=encoding, errors='ignore') as f: + reader = csv.reader(f) + headers = next(reader) + required_cols = ['条件选股', '代码'] + for col in required_cols: + if col not in headers: + raise KeyError(f"缺少必要列:{col}") + + name_col = headers.index('条件选股') + code_col = headers.index('代码') + + codes = [] + self.standard_model.removeRows(0, self.standard_model.rowCount()) + + for line in reader: + if not line or len(line) <= max(name_col, code_col): + continue + name = line[name_col].strip() + code = line[code_col].strip().zfill(6) + formatted_code = self.format_code(code) + codes.append(formatted_code) + + row_items = [ + QStandardItem(formatted_code), + QStandardItem(name), + QStandardItem("-"), + QStandardItem("0"), + QStandardItem("-"), + QStandardItem("-"), + QStandardItem("-"), + QStandardItem("监控中"), + QStandardItem("") + ] + self.standard_model.appendRow(row_items) + + df = pro.daily(ts_code=','.join(codes), trade_date=trade_date) + close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()} + + for i in range(self.standard_model.rowCount()): + code_item = self.standard_model.item(i, 0) + code = code_item.text() + target_price = round(close_prices.get(code, 0) * 1.049, 2) + self.standard_model.setItem(i, 5, QStandardItem(str(target_price))) + + self.stock_codes = [self.format_code(code.zfill(6)) for code in codes] + # threading.Thread(target=self.fetch_share_capital_data, daemon=True).start() + + # 禁用相关控件以防止用户交互 + self.btn_load.setEnabled(False) + self.btn_save.setEnabled(False) + self.checkbox_debug.setEnabled(False) + + # 启动后台线程获取流通盘数据 + self.fetch_worker = FetchShareCapitalWorker() + self.fetch_worker.data_ready.connect(self.on_share_capital_data_ready) + self.fetch_worker.finished.connect(self.on_fetch_finished) + self.fetch_worker.start() + + # 更新股票计数器 + self.update_stock_counters() + + except Exception as e: + log_error(f"加载失败: {str(e)}") + # 重新启用控件 + # self.btn_load.setEnabled(True) + # self.btn_save.setEnabled(True) + # self.checkbox_debug.setEnabled(True) + + + def format_code(self, code): + # 如果已包含市场后缀(.SH 或 .SZ),则不再处理 + if '.' in code: + return code + code = code.zfill(6) + if code.startswith(("6", "9")): + return f"{code}.SH" + elif code.startswith(("0", "2", "3")): + return f"{code}.SZ" + else: + return code + + def fetch_share_capital_data(self): + log_info("获取流通盘数据...") + retry_count = 3 # 设置最大重试次数 + for attempt in range(retry_count): + try: + dc_df = ts.realtime_list(src='dc') + # print(dc_df) + if dc_df is None or dc_df.empty: + log_warning(f"第 {attempt + 1} 次获取流通盘失败,数据为空,准备重试...") + time.sleep(2) # 等待 2 秒后重试 + continue + + float_mv_dict = { + row['TS_CODE']: row['FLOAT_MV'] / 1e8 + for _, row in dc_df.iterrows() + } + + self.float_share_cache = float_mv_dict + log_info(f"流通盘缓存已更新,共 {len(float_mv_dict)} 条记录") + # self.update_share_capital_ui() + return float_mv_dict # 返回数据而不是直接更新UI + + except Exception as e: + log_error(f"第 {attempt + 1} 次获取流通盘失败: {str(e)}") + time.sleep(2) + + log_error("多次尝试获取流通盘失败,请检查网络连接、Token 权限或 Tushare 接口状态。") + + def update_share_capital_ui(self, float_share_data): + try: + for i in range(self.standard_model.rowCount()): + code = self.standard_model.item(i, 0).text() + lt_pan = self.float_share_cache.get(code, 0) + # 判断流通盘是否大于100亿 + if lt_pan < self.capital_threshold: + item = QStandardItem(f"{lt_pan:.2f}亿") + item.setBackground(QBrush(QColor("yellow"))) + item.setForeground(QColor("red")) + self.standard_model.setItem(i, 3, item) + else: + self.standard_model.setItem(i, 3, QStandardItem(f"{lt_pan:.2f}亿")) + except Exception as e: + log_error(f"更新流通盘数据失败: {str(e)}") + + def on_data_ready(self, all_results): + self.current_all_results = all_results + + def on_update_finished(self): + now = datetime.datetime.now() + is_open = self.is_trading_time() + + for i in range(self.standard_model.rowCount()): + code_item = self.standard_model.item(i, 0) + code = code_item.text() + quote = self.current_all_results.get(code) + + if quote is None or quote.empty: + continue + # 记录原始值用于比较 + original_price = self.standard_model.item(i, 4).text() + original_profit = self.standard_model.item(i, 6).text() + + current_price = float(quote['PRICE']) + target_price = float(self.standard_model.item(i, 5).text()) + # 计算获利百分比 + profit_pct = round((current_price - target_price) / target_price * 100, 2) # 新增这行 + + # 当前价更新时闪烁 + if original_price != f"{current_price:.2f}": + self.standard_model.setItem(i, 4, QStandardItem(f"{current_price:.2f}")) + self.status_delegate.flash_cell(i, 4) # 触发闪烁 + + # 获利%更新时闪烁 + if original_profit != f"{profit_pct:.2f}%": + self.standard_model.setItem(i, 6, QStandardItem(f"{profit_pct:.2f}%")) + self.status_delegate.flash_cell(i, 6) # 触发闪烁 + profit_item = QStandardItem(f"{profit_pct:.2f}%") + # 根据获利百分比设置字体颜色 + if profit_pct > 0: + profit_item.setForeground(QColor("red")) + elif profit_pct < 0: + profit_item.setForeground(QColor("green")) + self.standard_model.setItem(i, 6, profit_item) + self.status_delegate.flash_cell(i, 6) # 触发闪烁 + + + + # # 判断获利是否大于0 + # if profit_pct > 0: + # profit_item = self.standard_model.item(i, 6) + # profit_item.setForeground(QColor("red")) + # + # else: + # profit_item = self.standard_model.item(i, 6) + # profit_item.setForeground(QColor("green")) + + # 开盘涨幅逻辑:仅当开盘后第一次计算 开盘涨幅更新时闪烁 + if is_open and code not in self.calculated_open_pct: + try: + open_price = float(quote['OPEN']) + pre_close = float(quote['PRE_CLOSE']) + open_zf = round((open_price - pre_close) / pre_close * 100, 2) + # print(open_zf) + open_zf_item = QStandardItem(f"{open_zf:.2f}%") + # self.standard_model.setItem(i, 2, QStandardItem(f"{open_zf:.2f}%")) + # 如果开盘涨幅小于等于阈值,标记为黄色 + if self.open_zf_threshold_down < open_zf <= self.open_zf_threshold_up: + open_zf_item.setBackground(QBrush(QColor("yellow"))) + open_zf_item.setForeground(QColor("red")) + + self.standard_model.setItem(i, 2, open_zf_item) + self.status_delegate.flash_cell(i, 2) # 触发闪烁 + self.calculated_open_pct.add(code) + # print(open_zf) + # 新增判断:开盘涨幅大于 3%,标记为“错买” + # if open_zf > self.open_zf_threshold: + # profit_item = self.standard_model.item(i, 2) + # profit_item.setForeground(QColor("red")) + # profit_item.setBackground(QBrush(QColor("yellow"))) + + except Exception as e: + log_error(f"计算开盘涨幅失败: {str(e)}") + + # 状态判断和更新 + status_item = self.standard_model.item(i, 7) + current_status = status_item.text() if status_item else "监控中" + # 初始化new_status为当前状态 + new_status = current_status # 添加这行初始化new_status + # ... 状态判断逻辑 ... + if new_status != current_status: + # ... 设置新状态代码 ... + self.standard_model.setItem(i, 7, status_item) + self.status_delegate.flash_cell(i, 7) # 触发闪烁 + + high_price = float(quote['HIGH']) + target_price = float(self.standard_model.item(i, 5).text()) + + if current_price >= target_price: + new_status = "已触发" + elif high_price >= target_price: + new_status = "错买" + else: + new_status = "监控中" + + if new_status != current_status: + status_item = QStandardItem(new_status) + if new_status == "已触发": + status_item.setData(0, Qt.ItemDataRole.UserRole) + elif new_status == "错买": + status_item.setData(1, Qt.ItemDataRole.UserRole) + elif new_status == "监控中": + status_item.setData(2, Qt.ItemDataRole.UserRole) + self.standard_model.setItem(i, 7, status_item) + self.delegate.flash_cell(i, 7) # 添加状态列闪烁 + log_trigger(f"股票 {code} 状态更新: {current_status} -> {new_status}") + + time_item = self.standard_model.item(i, 8) + if new_status in ["已触发", "错买"] and (time_item is None or not time_item.text()): + time_item = QStandardItem(now.strftime("%Y-%m-%d %H:%M:%S")) + self.standard_model.setItem(i, 8, time_item) + log_trigger(f"{code} 状态已更新为 {new_status}") + + + # 排序触发 + self.table_view.sortByColumn(7, Qt.SortOrder.AscendingOrder) + + # 更新计数器 + self.update_stock_counters() + + # 闪烁灯效果 + self.flash_update_light() + + def update_prices(self): + if not self.is_trading_time() and not self.checkbox_debug.isChecked(): + return + + self.start_price_update_thread() + + def flash_update_light(self): + light = self.update_light + light.setStyleSheet("background-color: #00FF00; border-radius: 10px;") + anim = QPropertyAnimation(light, b"geometry") + anim.setDuration(300) + anim.setKeyValueAt(0, light.geometry()) + anim.setKeyValueAt(0.5, light.geometry().adjusted(-3, -3, 3, 3)) + anim.setKeyValueAt(1, light.geometry()) + anim.setLoopCount(2) + anim.setEasingCurve(QEasingCurve.Type.OutBounce) + anim.start() + QTimer.singleShot(600, lambda: light.setStyleSheet("background-color: gray; border-radius: 10px;")) + + def place_order(self, index=None): + if not index or not index.isValid(): + return + + code = self.standard_model.item(index.row(), 0).text() + pure_code = code.split('.')[0] + # log_info(f"选中股票代码: {pure_code}") + self.order_executor.place_order(pure_code, self.checkbox_auto.isChecked()) + + # 自动按状态排序 + self.table_view.sortByColumn(7, Qt.SortOrder.AscendingOrder) # 第7列为“状态”列 + + def toggle_always_on_top(self, state): + if state == 2: + self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowStaysOnTopHint) + log_info("启用窗口置顶") + else: + self.setWindowFlags(self.windowFlags() & ~Qt.WindowType.WindowStaysOnTopHint) + log_info("取消窗口置顶") + + self.show() # 必须调用 show() 才会生效 + + def export_to_excel(self): + try: + if not self.standard_model.rowCount(): + log_warning("没有可导出的数据!") + self.show_warning("没有数据可以导出。") + return + + today = datetime.datetime.now().strftime("%Y%m%d") + filename = os.path.join(self.OUT_PATH, f"{today}_监控结果.xlsx") + + # 准备数据 + data = [] + for i in range(self.standard_model.rowCount()): + row = [self.standard_model.item(i, j).text() for j in range(self.standard_model.columnCount())] + data.append(row) + + df = pd.DataFrame(data, columns=[ + "代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间" + ]) + + # 写入 Excel + df.to_excel(filename, index=False) + log_info(f"数据已保存至: {filename}") + self.show_info("导出成功", f"数据已保存到: {filename}") + + except Exception as e: + log_error(f"导出失败: {str(e)}") + self.show_error(f"导出Excel数据时发生错误:{str(e)}") + + def is_trading_time(self): + now = datetime.datetime.now().time() + weekday = datetime.datetime.now().weekday() + return ( + weekday < 5 and( + (datetime.time(9, 30) <= now <= datetime.time(11, 30)) or + (datetime.time(13, 0) <= now <= datetime.time(15, 0)) + ) + ) + + def sort_table_by_status(self): + model = self.standard_model + rows = [] + + for i in range(model.rowCount()): + status_item = model.item(i, 7) + if not status_item: + continue + status = status_item.text() + row_data = [] + fonts = [] + bg_colors = [] + fg_colors = [] + + for j in range(model.columnCount()): + item = model.item(i, j) + if item: + row_data.append(item.text()) + fonts.append(item.font()) + bg_colors.append(item.background().color()) + fg_colors.append(item.foreground().color()) + else: + row_data.append("") + fonts.append(QFont()) + bg_colors.append(QColor('white')) + fg_colors.append(QColor('black')) + + rows.append((status, row_data, fonts, bg_colors, fg_colors)) + + # 按照状态排序:触发 > 错买 > 监控中 + sorted_rows = sorted(rows, key=lambda x: {'已触发': 0, '错买': 1, '监控中': 2}.get(x[0], 3)) + + model.removeRows(0, model.rowCount()) + + for _, row_data, fonts, bg_colors, fg_colors in sorted_rows: + items = [] + for j in range(len(row_data)): + item = QStandardItem(row_data[j]) + item.setFont(fonts[j]) + item.setBackground(QColor(bg_colors[j])) + item.setForeground(QColor(fg_colors[j])) + items.append(item) + model.appendRow(items) + + def start_status_update(self): + def check_status(): + while True: + is_open = self.is_trading_time() + status_text = "开盘中" if is_open else "收盘时间" + color = "lime" if is_open else "red" + self.trading_status_label.setText(f"当前状态:{status_text}") + self.status_light.setStyleSheet(f"background-color: {color}; border-radius: 10px;") + time.sleep(10) + + threading.Thread(target=check_status, daemon=True).start() + + + def append_log(self, full_message, log_type='default'): + """ + 接收全局日志并显示在 UI 中 + :param full_message: 带时间戳的完整日志信息 (str) + :param log_type: 日志类型 (str),如 info/warning/error/trigger 等 + """ + self.log_style_manager.insert_log(full_message, log_type) + + # 写入文件 + with open(self.log_file, "a", encoding="utf-8") as f: + f.write(full_message + "\n") + + def show_error(self, msg): + QMessageBox.critical(self, "错误", msg) + + def show_warning(self, msg): + QMessageBox.warning(self, "警告", msg) + + def show_info(self, title, msg): + QMessageBox.information(self, title, msg) + + def closeEvent(self, event): + reply = QMessageBox.question( + self, '退出', + "确定要退出吗?", + QMessageBox.Yes | QMessageBox.No, + QMessageBox.No + ) + if reply == QMessageBox.Yes: + event.accept() + else: + event.ignore() + + +if __name__ == "__main__": + app = QApplication(sys.argv) + window = MainWindow() + window.show() + sys.exit(app.exec()) \ No newline at end of file