提交新建内容

This commit is contained in:
2025-09-01 12:08:41 +08:00
commit 198ec2dc8f
14 changed files with 1911 additions and 0 deletions

3
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,3 @@
# 默认忽略的文件
/shelf/
/workspace.xml

6
.idea/MarsCodeWorkspaceAppSettings.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="com.codeverse.userSettings.MarscodeWorkspaceAppSettingsState">
<option name="progress" value="1.0" />
</component>
</project>

12
.idea/dx_sl.iml generated Normal file
View File

@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.10" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AhkProjectSettings">
<option name="defaultAhkSdk" value="AutoHotkey v2.0.19" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/dx_sl.iml" filepath="$PROJECT_DIR$/.idea/dx_sl.iml" />
</modules>
</component>
</project>

7
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

113
StatusStyleDelegate.py Normal file
View File

@@ -0,0 +1,113 @@
# StatusStyleDelegate.py
from PySide6.QtWidgets import QStyledItemDelegate
from PySide6.QtGui import QColor, QFont, QBrush
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QPalette
class StatusStyleDelegate(QStyledItemDelegate):
def __init__(self, parent=None, table_view=None):
super().__init__(parent)
self.flash_cycles = {} # 存储闪烁周期: {(row, col): (timer, cycle_count, original_brush)}
# self.flash_duration = 1000 # 闪烁持续时间(毫秒)
# self.updated_cells = set() # 存储需要高亮的单元格 (row, col)
self.table_view = table_view # 保存 QTableView 实例
def flash_cell(self, row, column):
# 如果单元格已经在闪烁,重置计时器
if (row, column) in self.flash_cycles:
timer, _, _ = self.flash_cycles[(row, column)]
timer.stop()
del self.flash_cycles[(row, column)]
# 获取原始背景色
index = self.table_view.model().index(row, column)
original_brush = self.get_original_background(index)
# 创建周期性计时器实现闪烁效果
cycle_count = 0
timer = QTimer()
timer.setInterval(200) # 200ms切换一次颜色
def update_flash():
nonlocal cycle_count
cycle_count += 1
# 闪烁3个周期(6次切换)后停止
if cycle_count > 6:
timer.stop()
if (row, column) in self.flash_cycles:
del self.flash_cycles[(row, column)]
self.table_view.viewport().update()
return
# 切换背景色 (黄色 <-> 原始色)
is_highlight = (cycle_count % 2 == 1)
self.flash_cycles[(row, column)] = (timer, cycle_count, original_brush)
self.table_view.viewport().update()
timer.timeout.connect(update_flash)
self.flash_cycles[(row, column)] = (timer, cycle_count, original_brush)
timer.start()
update_flash() # 立即开始第一个周期
def get_original_background(self, index):
# 获取单元格原始背景色
model = index.model()
status_index = model.index(index.row(), 7)
status = model.data(status_index, Qt.ItemDataRole.DisplayRole)
if status == "已触发":
return QBrush(QColor('lightcoral'))
elif status == "错买":
return QBrush(QColor('lightgreen'))
else:
return QBrush(QColor('white'))
def initStyleOption(self, option, index):
super().initStyleOption(option, index)
model = index.model()
status_index = model.index(index.row(), 7)
status = model.data(status_index, Qt.ItemDataRole.DisplayRole)
if status == "已触发":
option.font = QFont("Arial", 14, QFont.Weight.Bold)
option.palette.setColor(QPalette.ColorRole.Text, QColor('red'))
option.backgroundBrush = QBrush(QColor('lightcoral')) # 背景色
elif status == "错买":
option.font = QFont("Arial", 14, QFont.Weight.Bold)
option.palette.setColor(QPalette.ColorRole.Text, QColor('green')) # 文字颜色
option.backgroundBrush = QBrush(QColor('lightgreen')) # 背景色
elif status == "监控中":
option.font = QFont("Arial", 12, QFont.Weight.Normal)
option.palette.setColor(QPalette.ColorRole.Text, QColor('black'))
option.backgroundBrush = QBrush(QColor('white'))
else:
option.font = QFont("Arial", 12, QFont.Weight.Normal)
option.palette.setColor(QPalette.ColorRole.Text, QColor('black'))
option.backgroundBrush = QBrush(QColor('white'))
# 应用闪烁效果 (黄色 <-> 原始色交替)
row = index.row()
col = index.column()
if (row, col) in self.flash_cycles:
timer, cycle_count, original_brush = self.flash_cycles[(row, col)]
if cycle_count % 2 == 1:
# 使用高对比度黄色 (#FFFF99)
option.backgroundBrush = QBrush(QColor(255, 255, 153))
else:
option.backgroundBrush = original_brush
# 新增对获利列的处理
if index.column() == 6: # 检查是否为获利列
option.font = QFont(option.font().family(), option.font().pointSize(), QFont.Weight.Bold)
profit_text = model.data(index, Qt.ItemDataRole.DisplayRole)
try:
profit_value = float(profit_text.replace('%', '')) # 移除百分号再转换
if profit_value > 0:
option.palette.setColor(QPalette.ColorRole.Text, QColor('red'))
elif profit_value < 0:
option.palette.setColor(QPalette.ColorRole.Text, QColor('green'))
except ValueError:
pass # 如果转换失败,保持默认颜色

99
log_style_manager.py Normal file
View File

@@ -0,0 +1,99 @@
# log_style_manager.py
from PySide6.QtGui import QTextDocument, QTextBlockFormat, QTextCharFormat, QFont, QColor
from PySide6.QtCore import Qt
from PySide6.QtGui import QTextCursor
class LogStyleManager:
"""
日志样式管理器,统一控制 QTextEdit 的字体、颜色、行间距等
"""
def __init__(self, text_edit):
self.text_edit = text_edit
self.default_font = self.text_edit.font()
self.default_style = {
'default': {'color': 'white', 'bold': False},
'info': {'color': 'cyan', 'bold': False},
'warning': {'color': 'yellow', 'bold': False},
'error': {'color': 'red', 'bold': False},
'trigger': {'color': '#28a745', 'bold': False},
}
def set_log_type_mapping(self, mapping=None):
"""
设置日志类型与样式的映射关系
:param mapping: dict键为日志类型'info'),值为对应的样式字典
样式字典支持:
- color: 颜色字符串
- bold: 是否加粗
- background: 背景颜色(可选)
- italic: 是否斜体
"""
default_mapping = {
'default': {'color': 'white', 'bold': False},
'info': {'color': 'cyan', 'bold': False},
'warning': {'color': 'yellow', 'bold': False},
'error': {'color': 'red', 'bold': True}, # 错误信息加粗
'trigger': {'color': '#28a745', 'bold': False},
'debug': {'color': 'gray', 'italic': True} # 新增 debug 类型
}
# 如果传入了新的映射规则,则更新默认样式
if mapping is not None:
self.default_style = mapping
else:
self.default_style = default_mapping
def set_global_font(self, font_family="微软雅黑", font_size=13):
"""设置全局字体"""
self.default_font.setFamily(font_family)
self.default_font.setPointSize(font_size)
self.text_edit.setFont(self.default_font)
def set_letter_spacing(self, spacing=5):
"""设置字间距"""
self.default_font.setLetterSpacing(QFont.SpacingType.AbsoluteSpacing, spacing)
self.text_edit.setFont(self.default_font)
def set_line_height(self, line_height=150):
"""设置段落行高(百分比)"""
doc = self.text_edit.document()
block_format = QTextBlockFormat()
block_format.setLineHeight(float(line_height), 0)
block_format.setTopMargin(0)
block_format.setBottomMargin(0)
cursor = QTextCursor(doc)
cursor.select(QTextCursor.SelectionType.Document) # 修改此处
cursor.mergeBlockFormat(block_format)
cursor.clearSelection()
def apply_log_style(self, log_type='default'):
"""返回适用于当前日志类型的 QTextCharFormat"""
style = self.default_style.get(log_type.lower(), self.default_style['default'])
fmt = QTextCharFormat()
fmt.setForeground(QColor(style['color']))
if style.get('bold', False):
fmt.setFontWeight(QFont.Weight.Bold)
return fmt
def insert_log(self, message, log_type='default'):
"""插入带样式的日志信息"""
fmt = self.apply_log_style(log_type)
cursor = self.text_edit.textCursor()
cursor.movePosition(QTextCursor.End)
cursor.mergeCharFormat(fmt)
cursor.insertText(message + "\n")
self.text_edit.setTextCursor(cursor)
self.text_edit.ensureCursorVisible()
def reset_styles(self):
"""重置所有样式到默认状态"""
self.text_edit.setStyleSheet("")
self.text_edit.setFont(self.default_font)
self.set_line_height(100)

73
logger_utils_new.py Normal file
View File

@@ -0,0 +1,73 @@
# logger_utils.py
import queue
import datetime
from colorama import Fore, Style, init
# 初始化 colorama仅 Windows 需要)
init(autoreset=True)
# 日志等级颜色映射(控制台 + UI
# LOG_COLORS → 控制台日志颜色(命令行)
# LOG_STYLES → UI 日志样式(图形界面)
LOG_COLORS = {
'default': Fore.CYAN,
'info': Fore.CYAN,
'warning': Fore.YELLOW,
'error': Fore.RED,
'trigger': Fore.GREEN,
}
# 【新增】UI 样式映射
LOG_STYLES = {
'default': {'color': 'cyan'},
'info': {'color': 'cyan'},
'warning': {'color': 'yellow'},
'error': {'color': 'red'},
'trigger': {'color': '#28a745'},
}
class Logger:
def __init__(self):
self.log_queue = queue.Queue()
self.append_log_func = None # 主程序的日志回调函数
def set_append_log(self, func):
"""设置主程序中的 append_log 函数"""
self.append_log_func = func
def log(self, message, level='default'):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
full_message = f"[{timestamp}] {message}"
# 控制台带颜色输出
color = LOG_COLORS.get(level.lower(), Fore.WHITE)
print(f"{color}{full_message}{Style.RESET_ALL}")
# 触发UI更新如果已注册
if self.append_log_func:
log_type = level.lower()
style = LOG_STYLES.get(log_type, LOG_STYLES['default'])
# 调用主程序的 append_log 方法,只传递纯文本和日志类型
self.append_log_func(full_message, log_type)
def info(self, message): self.log(message, 'info')
def warning(self, message): self.log(message, 'warning')
def error(self, message): self.log(message, 'error')
def trigger(self, message): self.log(message, 'trigger')
def debug(self, message): self.log(message, 'debug')
# 全局日志实例
global_logger = Logger()
# 便捷方法,方便其他模块直接使用
def setup_logger(append_log_func):
global_logger.set_append_log(append_log_func)
def log_info(msg): global_logger.info(msg)
def log_warning(msg): global_logger.warning(msg)
def log_error(msg): global_logger.error(msg)
def log_trigger(msg): global_logger.trigger(msg)
def log_debug(msg): global_logger.debug(msg)

58
order_executor.ahk Normal file
View File

@@ -0,0 +1,58 @@
#NoEnv
SendMode Input
SetWorkingDir %A_ScriptDir%
DetectHiddenWindows, On
SetTitleMatchMode, 2
; 获取命令行参数
param1 = %1%
if (param1 = "click") {
x := %2%
y := %3%
MouseMove, x, y
Click
} else if (param1 = "image_click") {
; 图像识别点击功能
imagePath := %2%
winTitle := %3%
confidence := %4%
; 设置搜索区域
if (winTitle != "") {
WinGet, hwnd, ID, %winTitle%
if (!hwnd) {
ExitApp, 1 ; 未找到窗口,返回错误
}
WinGetPos, winX, winY, winW, winH, ahk_id %hwnd%
searchX := winX
searchY := winY
searchW := winW
searchH := winH
} else {
; 全屏搜索
searchX := 0
searchY := 0
searchW := A_ScreenWidth
searchH := A_ScreenHeight
}
; 使用ImageSearch进行图像识别
ImageSearch, foundX, foundY, searchX, searchY, searchX+searchW, searchY+searchH, *%confidence% %imagePath%
if (ErrorLevel = 0) {
; 找到图像,计算中心点并点击
centerX := foundX
centerY := foundY
MouseMove, centerX, centerY
Click
ExitApp, 0 ; 成功返回0
} else {
ExitApp, 1 ; 未找到图像,返回错误
}
} else if (param1 = "type") {
Send, %2%
} else if (param1 = "press") {
Send, {%2%}
}
ExitApp

290
order_executor.py Normal file
View File

@@ -0,0 +1,290 @@
import subprocess
import time
import pygetwindow as gw
import win32gui
import win32con
import cv2
import os
import pyautogui
# from logger_utils import log_info, log_warning, log_error, log_trigger
from logger_utils_new import log_info, log_warning, log_error, log_trigger, log_debug, setup_logger, LOG_STYLES
# 移除 pyautogui 导入
class OrderExecutor:
def __init__(self, target_window_titles=None):
if target_window_titles is None:
# 默认支持的行情软件列表
self.target_window_titles = ["东方财富终端", "通达信金融终端", "同花顺"]
else:
self.target_window_titles = target_window_titles
self.current_software = None
self.order_window_title = 'FlashTradeDlgDlg'
self.order_count_window_title = '提示'
self.ahk_script_path = "D:/gp_data/order_executor.ahk"
def detect_trading_software(self):
"""检测当前运行的行情软件"""
for title in self.target_window_titles:
window_list = gw.getWindowsWithTitle(title)
if window_list:
self.current_software = title
log_info(f"检测到行情软件: {title}")
return title
log_warning("未检测到支持的行情软件")
return None
def run_ahk_script(self, command):
"""执行AHK脚本命令"""
try:
# 尝试查找AHK安装路径
ahk_path = None
# 常见安装路径
possible_paths = [
"C:\\Program Files\\AutoHotkey\\v2\\AutoHotkey.exe", # v2版本
"C:\\Program Files\\AutoHotkey\\AutoHotkey.exe",
"AutoHotkey.exe" # 如果在PATH环境变量中
]
for path in possible_paths:
if os.path.exists(path):
ahk_path = path
break
if not ahk_path:
log_error("未找到AutoHotkey安装路径")
return False
subprocess.Popen(
[ahk_path, '/restart', self.ahk_script_path, command],
creationflags=subprocess.CREATE_NO_WINDOW
)
return True
except Exception as e:
log_error(f"执行AHK脚本失败: {e}")
return False
def click_confirm_button(self, window_title, confirm_ratio=(0.8, 0.8), delay=0.3):
"""
修改为使用AHK点击确认按钮
"""
try:
time.sleep(delay)
window_list = gw.getWindowsWithTitle(window_title)
if not window_list:
log_warning(f"未找到行情软件 '{window_title}' 的窗口")
return False
window = window_list[0]
left, top, width, height = window.left, window.top, window.width, window.height
x = left + int(width * confirm_ratio[0])
y = top + int(height * confirm_ratio[1])
# 使用AHK执行点击
command = f"click {x} {y}"
self.run_ahk_script(command)
log_trigger(f"AHK点击 '{window_title}' ,位置: ({x}, {y})")
return True
except Exception as e:
log_error(f"点击确认按钮失败: {e}")
return False
def click_button_by_image(self, image_path, window_title=None,
timeout=10, retry_interval=0.5,
confidence=0.8, move_duration=0.2):
"""
修改为使用AHK进行图像识别点击
"""
log_info(f"开始AHK图像识别: {image_path}")
# 构建AHK命令
command = f"image_click {image_path}"
if window_title:
command += f" {window_title}"
command += f" {confidence}"
# 执行AHK命令
success = self.run_ahk_script(command)
if success:
log_trigger(f"AHK成功识别并点击图像 '{image_path}'")
return True, (0, 0) # AHK不返回坐标用(0,0)占位
else:
log_warning(f"AHK未能识别图像 '{image_path}'")
return False, None
def place_order_tongdaxin(self, pure_code, auto_push):
"""通达信下单方法"""
try:
# 获取所有匹配标题的窗口
window_list = gw.getWindowsWithTitle(self.current_software)
if not window_list:
log_warning(f"未找到标题为 '{self.current_software}' 的窗口")
return False
# 筛选逻辑:优先选择可见且活动的窗口
target_window = None
def is_window_visible(hwnd):
return win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd)
for window in window_list:
hwnd = window._hWnd
if is_window_visible(hwnd) and window.isActive:
target_window = window
break
# 如果没有活动窗口,选择第一个可见窗口
if not target_window:
for window in window_list:
hwnd = window._hWnd
if is_window_visible(hwnd):
target_window = window
break
# 如果都没有,选择第一个匹配的窗口
if not target_window:
target_window = window_list[0]
# 使用筛选后的目标窗口进行操作
hwnd = target_window._hWnd
log_info(f"找到窗口句柄: {hwnd}")
target_window.restore()
target_window.maximize()
target_window.activate()
time.sleep(0.2)
# 通达信操作方式先按F1进入买入界面
self.run_ahk_script("press f1")
time.sleep(0.1)
# 输入代码
self.run_ahk_script(f"type {pure_code}")
self.run_ahk_script("press enter")
time.sleep(0.1)
# 判断是否自动下单
if auto_push == 1:
# 在通达信中直接按F2进行全仓买入
self.run_ahk_script("press f2")
time.sleep(0.1)
self.run_ahk_script("press enter")
time.sleep(0.5)
# 判断是否弹出仓位的框
if self.is_window_exists(self.order_count_window_title, 0.5):
log_error(f"剩余金额不满足购买{pure_code}最低需求。")
except Exception as e:
log_error(f"通达信下单失败: {str(e)}")
return False
def place_order_eastmoney(self, pure_code, auto_push):
"""东方财富下单方法"""
try:
# 获取所有匹配标题的窗口
window_list = gw.getWindowsWithTitle(self.current_software)
if not window_list:
log_warning(f"未找到标题为 '{self.current_software}' 的窗口")
return False
# 筛选逻辑:优先选择可见且活动的窗口
target_window = None
def is_window_visible(hwnd):
return win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd)
for window in window_list:
hwnd = window._hWnd
if is_window_visible(hwnd) and window.isActive:
target_window = window
break
# 如果没有活动窗口,选择第一个可见窗口
if not target_window:
for window in window_list:
hwnd = window._hWnd
if is_window_visible(hwnd):
target_window = window
break
# 如果都没有,选择第一个匹配的窗口
if not target_window:
target_window = window_list[0]
# 使用筛选后的目标窗口进行操作
hwnd = target_window._hWnd
log_info(f"找到窗口句柄: {hwnd}")
target_window.restore()
target_window.maximize()
target_window.activate()
time.sleep(0.2)
# 使用AHK点击中心位置
self.click_confirm_button(self.current_software, (0.5, 0.5), 0.1)
# 使用AHK输入代码
self.run_ahk_script(f"type {pure_code}")
self.run_ahk_script("press enter")
time.sleep(0.1)
# 判断是否自动下单
if auto_push == 1:
self.run_ahk_script("type 21")
self.run_ahk_script("press enter")
time.sleep(0.1)
# 点击全仓按钮
success, pos = self.click_button_by_image(
image_path="../images/full_position.png",
window_title=self.order_window_title,
timeout=10,
retry_interval=0.3,
confidence=0.9
)
if success:
log_info("已点击全仓按钮")
# 判断是否弹出仓位的框
if self.is_window_exists(self.order_count_window_title, 0.5):
log_error(f"剩余金额不满足购买{pure_code}最低需求。")
else:
log_warning("未找到全仓按钮图像")
except Exception as e:
log_error(f"东方财富下单失败: {str(e)}")
return False
def place_order(self, pure_code, auto_push):
"""
根据检测到的行情软件选择相应的下单方法
"""
# 检测当前运行的行情软件
if not self.current_software:
self.detect_trading_software()
if not self.current_software:
log_error("未找到支持的行情软件窗口")
return False
# 根据不同的行情软件调用不同的下单方法
if "通达信" in self.current_software:
return self.place_order_tongdaxin(pure_code, auto_push)
elif "东方财富" in self.current_software:
return self.place_order_eastmoney(pure_code, auto_push)
else:
# 默认使用东方财富的方法
log_warning(f"未针对 {self.current_software} 实现行情软件的特殊操作,使用默认方法")
return self.place_order_eastmoney(pure_code, auto_push)
def is_window_exists(self, window_title, timeout=1.0):
"""
检查窗口是否存在
"""
start_time = time.time()
while time.time() - start_time < timeout:
window_list = gw.getWindowsWithTitle(window_title)
if window_list:
return True
time.sleep(0.1)
return False

248
quote_manager.py Normal file
View File

@@ -0,0 +1,248 @@
"""
行情数据管理模块 - 支持多数据源(Tushare/AKShare)
"""
import pandas as pd
import tushare as ts
import akshare as ak
from typing import List, Dict, Optional # 添加 Optional 导入
import logging
import time
from enum import Enum, auto
import threading
# 添加 Tushare Token 设置
ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d')
# 通用股票排除规则
STOCK_EXCLUSION_RULES = {
'exclude_st': True, # 排除ST/*ST股票
'exclude_b_share': True, # 排除B股
'exclude_star_market': True, # 排除科创板(688开头)
'exclude_gem': False, # 排除创业板(300开头) - 默认不排除
'exclude_bj': True, # 排除北交所股票
'custom_exclusions': [] # 自定义排除列表
}
class DataSource(Enum):
TUSHARE = "tushare"
AKSHARE = "akshare"
LOCAL = "local"
class QuoteManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init_manager()
return cls._instance
def _init_manager(self):
self._cache = {}
self._cache_ttl = 60
self._data_source = DataSource.TUSHARE
self.max_retries = 3 # 默认最大重试次数
self.retry_interval = 2 # 默认重试间隔(秒)
def set_retry_policy(self, max_retries: int, retry_interval: float = 2):
"""设置重试策略"""
self.max_retries = max_retries
self.retry_interval = retry_interval
def set_data_source(self, source: DataSource):
"""设置数据源"""
self._data_source = source
def get_realtime_quotes(self, codes: List[str]) -> Dict[str, pd.DataFrame]:
"""获取实时行情(带重试机制)"""
last_error = None
for attempt in range(self.max_retries):
try:
if self._data_source == DataSource.TUSHARE:
return self._get_tushare_quotes(codes)
elif self._data_source == DataSource.AKSHARE:
return self._get_akshare_quotes(codes)
else:
raise ValueError("不支持的数据源")
except Exception as e:
last_error = e
if attempt < self.max_retries - 1: # 不是最后一次尝试
time.sleep(self.retry_interval)
continue
raise Exception(f"获取行情失败(尝试{self.max_retries}次): {str(last_error)}")
def _get_tushare_quotes(self, codes: List[str], max_retries: int = 3, retry_interval: float = 2) -> Dict[str, pd.DataFrame]:
"""使用 Tushare 获取实时行情(带重试机制)"""
for attempt in range(max_retries):
try:
df = ts.realtime_quote(ts_code=','.join(codes))
if df is None or df.empty:
raise Exception("返回数据为空")
return {row['TS_CODE']: row for _, row in df.iterrows()}
except Exception as e:
if attempt < max_retries - 1:
time.sleep(retry_interval)
continue
raise Exception(f"Tushare 行情获取失败(尝试{max_retries}次): {str(e)}")
def _get_akshare_quotes(self, codes: List[str]) -> Dict[str, pd.DataFrame]:
"""使用 AKShare 获取实时行情"""
# 这里需要实现 AKShare 的获取逻辑
raise NotImplementedError("AKShare 实现待完成")
def _convert_akshare_format(self, row) -> Dict:
"""将AKShare数据格式转换为统一格式"""
return {
'TS_CODE': row['代码'],
'PRICE': row['最新价'],
'OPEN': row['今开'],
'PRE_CLOSE': row['昨收'],
'HIGH': row['最高'],
'LOW': row['最低'],
'VOLUME': row['成交量']
}
# ... existing code ...
def _get_tushare_all_stocks(self) -> List[str]:
"""使用Tushare获取所有A股股票列表"""
try:
pro = ts.pro_api() # 获取Tushare专业版API接口
# 获取所有股票列表
stock_basic = pro.stock_basic(exchange='', list_status='L',
fields='ts_code,symbol,name,area,industry,list_date')
# 根据通用规则过滤股票
filtered_stocks = self._filter_stocks(stock_basic['ts_code'].tolist())
return filtered_stocks
except Exception as e:
logging.error(f"获取股票列表失败: {str(e)}")
return []
def _filter_stocks(self, stock_list: List[str]) -> List[str]:
"""
根据通用规则过滤股票列表
:param stock_list: 原始股票列表
:return: 过滤后的股票列表
"""
filtered_stocks = []
for stock in stock_list:
exclude = False
# 检查是否在自定义排除列表中
if stock in STOCK_EXCLUSION_RULES.get('custom_exclusions', []):
exclude = True
# 检查是否排除ST/*ST股票
if STOCK_EXCLUSION_RULES.get('exclude_st', True):
# 注意这里需要获取股票名称来判断是否为ST股票
# 在实际应用中,您可能需要通过其他方式获取股票名称
pass # ST股票的判断需要额外的数据支持
# 检查是否排除B股
if STOCK_EXCLUSION_RULES.get('exclude_b_share', True) and stock.endswith('.BJ'):
exclude = True
# 检查是否排除科创板股票(688开头)
if STOCK_EXCLUSION_RULES.get('exclude_star_market', True) and stock.startswith('688'):
exclude = True
# 检查是否排除创业板股票(300开头)
if STOCK_EXCLUSION_RULES.get('exclude_gem', False) and stock.startswith('300'):
exclude = True
# 检查是否排除北交所股票
if STOCK_EXCLUSION_RULES.get('exclude_bj', True) and stock.endswith('.BJ'):
exclude = True
# 如果没有被排除,则添加到结果列表中
if not exclude:
filtered_stocks.append(stock)
return filtered_stocks
def set_exclusion_rules(self, rules: Dict):
"""
设置股票排除规则
:param rules: 排除规则字典
"""
global STOCK_EXCLUSION_RULES
STOCK_EXCLUSION_RULES.update(rules)
def get_exclusion_rules(self) -> Dict:
"""
获取当前的股票排除规则
:return: 排除规则字典
"""
global STOCK_EXCLUSION_RULES
return STOCK_EXCLUSION_RULES.copy()
def get_quote(self, code: str) -> Optional[Dict]:
"""获取单个股票行情(兼容旧接口)"""
try:
quotes = self.get_realtime_quotes([code])
if not quotes or code not in quotes:
return None
row = quotes[code]
return {
'price': row['PRICE'],
'avg_price': (row['OPEN'] + row['PRE_CLOSE']) / 2,
'volume': row['VOLUME']
}
except Exception as e:
logging.error(f"获取股票{code}行情失败: {str(e)}")
return None
def get_daily_data(self, codes: List[str], start_date: str = None, end_date: str = None,
max_retries: Optional[int] = None, retry_interval: Optional[float] = None) -> Dict[
str, pd.DataFrame]:
"""获取股票日线数据"""
# 如果没有传入参数,则使用实例的默认值
max_retries = max_retries if max_retries is not None else self.max_retries
retry_interval = retry_interval if retry_interval is not None else self.retry_interval
try:
if self._data_source == DataSource.TUSHARE:
return self._get_tushare_daily_data(codes, start_date, end_date, max_retries, retry_interval)
elif self._data_source == DataSource.AKSHARE:
return self._get_akshare_daily_data(codes, start_date, end_date, max_retries, retry_interval)
else:
raise ValueError("不支持的数据源")
except Exception as e:
logging.error(f"获取日线数据失败: {str(e)}")
return {}
def _get_tushare_daily_data(self, codes: List[str], start_date: str = None, end_date: str = None,
max_retries: int = 3, retry_interval: float = 2) -> Dict[str, pd.DataFrame]:
"""使用Tushare获取日线数据"""
daily_data = {}
pro = ts.pro_api() # 获取Tushare专业版API接口
for code in codes:
for attempt in range(max_retries):
try:
# 如果没有指定日期范围默认获取最近30天的数据
if not end_date:
end_date = pd.Timestamp.now().strftime('%Y%m%d')
if not start_date:
start_date = (pd.Timestamp.now() - pd.Timedelta(days=30)).strftime('%Y%m%d')
df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)
if df is not None and not df.empty:
df = df.sort_values('trade_date')
daily_data[code] = df
break # 成功获取数据,跳出重试循环
except Exception as e:
if attempt < max_retries - 1:
time.sleep(retry_interval)
continue
logging.error(f"获取{code}日线数据失败: {str(e)}")
break
return daily_data
def _get_akshare_daily_data(self, codes: List[str], start_date: str = None, end_date: str = None,
max_retries: int = 3, retry_interval: float = 2) -> Dict[str, pd.DataFrame]:
"""使用AKShare获取日线数据"""
# AKShare实现待完成
raise NotImplementedError("AKShare日线数据获取待实现")

981
stock_monitor_pyside.py Normal file
View File

@@ -0,0 +1,981 @@
import sys
import os
import time
import datetime
import chardet
import csv
import pandas as pd
import tushare as ts
import threading
import socket
from PySide6.QtCore import Qt, QModelIndex
from PySide6.QtCore import QTimer, QThread, Signal, QObject, QPropertyAnimation, QEasingCurve
from PySide6.QtWidgets import (
QApplication, QMainWindow, QTableView, QPushButton,
QVBoxLayout, QHBoxLayout, QWidget, QLabel, QFileDialog,
QTextEdit, QCheckBox, QLineEdit, QMessageBox, QSplitter,
QMenuBar, QMenu, QHeaderView, QDialog
)
# 正确导入方式
from PySide6.QtWidgets import QApplication, QTableView, QHeaderView, QStyledItemDelegate
from PySide6.QtGui import QAction, QFont, QTextCharFormat, QColor, QIcon, QStandardItemModel, QStandardItem, QPalette
from PySide6.QtGui import QTextCursor, QBrush
from logger_utils_new import log_info, log_warning, log_error, log_trigger, log_debug, setup_logger, LOG_STYLES
from StatusStyleDelegate import StatusStyleDelegate
from order_executor import OrderExecutor
from log_style_manager import LogStyleManager
from typing import List
# 设置 Tushare Token
# ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d')
pro = ts.pro_api()
from quote_manager import QuoteManager, DataSource
class PriceUpdateWorker(QThread):
data_ready = Signal(dict)
update_finished = Signal()
def __init__(self, stock_codes, parent=None):
super().__init__(parent)
self.stock_codes = stock_codes
# 引用行情数据
self.quote_manager = QuoteManager()
self.quote_manager.set_data_source(DataSource.TUSHARE)
# 添加超时时间(秒)
self.quote_manager.set_retry_policy(max_retries=3, retry_interval=2)
def run(self):
try:
all_results = self.quote_manager.get_realtime_quotes(self.stock_codes)
self.data_ready.emit(all_results)
except Exception as e:
log_error(f"行情获取失败: {str(e)}")
finally:
self.update_finished.emit()
class FetchShareCapitalWorker(QThread):
data_ready = Signal(dict)
def run(self):
try:
log_info("获取流通盘数据...")
retry_count = 3 # 设置最大重试次数
for attempt in range(retry_count):
try:
dc_df = ts.realtime_list(src='dc')
if dc_df is None or dc_df.empty:
log_warning(f"{attempt + 1} 次获取流通盘失败,数据为空,准备重试...")
time.sleep(2) # 等待 2 秒后重试
continue
float_mv_dict = {
row['TS_CODE']: row['FLOAT_MV'] / 1e8
for _, row in dc_df.iterrows()
}
log_info(f"流通盘缓存已更新,共 {len(float_mv_dict)} 条记录")
self.data_ready.emit(float_mv_dict)
return # 成功退出循环
except Exception as e:
log_error(f"{attempt + 1} 次获取流通盘失败: {str(e)}")
time.sleep(2)
log_error("多次尝试获取流通盘失败请检查网络连接、Token 权限或 Tushare 接口状态。")
self.data_ready.emit({}) # 发送空数据以表示失败
except Exception as e:
log_error(f"获取流通盘数据时发生异常: {str(e)}")
self.data_ready.emit({})
from typing import List
# 自定义模型以支持按业务优先级排序
class CustomSortModel(QStandardItemModel):
def sort(self, column, order=Qt.SortOrder.AscendingOrder):
if column != 7:
return super().sort(column, order)
status_priority = {"已触发": 0, "错买": 1, "监控中": 2}
def get_sort_key(item):
if item is None:
return 3
role = item.data(Qt.ItemDataRole.UserRole)
if role is not None:
return role
text = item.text()
return status_priority.get(text, 3)
# 保存当前所有行
all_rows = [self.takeRow(0) for _ in range(self.rowCount())]
# 按照状态列排序
all_rows.sort(key=lambda row_items: get_sort_key(row_items[7]),
reverse=(order == Qt.SortOrder.DescendingOrder))
# 清空并重新添加
self.removeRows(0, self.rowCount())
for row_items in all_rows:
self.appendRow(row_items)
self.layoutChanged.emit()
class MainWindow(QMainWindow):
log_signal = Signal(str, str) # message, level
OUT_PATH = r"D:\gp_data"
ping_updated_signal = Signal(float) # 新增:用于线程安全地传递延迟值
def __init__(self):
super().__init__()
self.setWindowTitle("股票价格监控系统")
self.resize(1200, 800)
self.capital_threshold = 60 # 流通盘阈值60亿
self.open_zf_threshold_up = 3.0 # 开盘涨幅阈值3%
self.open_zf_threshold_down = -2.0 # 开盘跌幅阈值2%
# 初始化 delegate
self.delegate = StatusStyleDelegate(self)
# 初始化股票计数器标签
self.total_stocks_label = QLabel("总数: 0")
self.triggered_stocks_label = QLabel("触发: 0")
self.total_stocks_label.setObjectName("total_stocks_label")
self.triggered_stocks_label.setObjectName("triggered_stocks_label")
self.trading_status_label = QLabel("当前状态:非交易时间")
self.trading_status_label.setObjectName("trading_status_label")
self.data_refresh_label = QLabel("数据状态:")
self.data_refresh_label.setObjectName("data_refresh_label")
self.ping_updated_signal.connect(self.update_ping_ui) # 连接信号到 UI 更新函数
self.ping_label = QLabel("实时行情延迟: --ms")
self.ping_label.setObjectName("ping_label")
self.ping_light = self.create_status_light()
self.ping_light.setObjectName("ping_light") # 避免重复创建
self.price_update_worker = None # 用于保存当前线程实例
self.fetch_worker = None # 用于保存流通盘数据获取线程实例
self.current_all_results = {} # 缓存最新行情数据
self.calculated_open_pct = set() # 记录已计算开盘涨幅的股票
# 初始化模型
self.standard_model = CustomSortModel()
self.standard_model.setHorizontalHeaderLabels([
"代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间"
])
# 表格视图
self.table_view = QTableView()
self.table_view.setModel(self.standard_model)
self.table_view.setSortingEnabled(True) # 启用排序
self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Fixed)
# 设置每列宽度
widths = [140, 120, 100, 100, 80, 80, 80, 80, 220]
for col_index, width in enumerate(widths):
self.table_view.horizontalHeader().resizeSection(col_index, width)
# 设置委托(确保 self.status_delegate 已在 __init__ 中初始化)
# 修改前:
# self.status_delegate = StatusStyleDelegate()
# 修改后:
self.status_delegate = StatusStyleDelegate(parent=self, table_view=self.table_view)
for col in range(8): # 对前8列应用
self.table_view.setItemDelegateForColumn(col, self.status_delegate)
# 新增大流通盘颜色样式
self.large_cap_delegate = StatusStyleDelegate(parent=self, table_view=self.table_view)
self.table_view.setItemDelegateForColumn(3, self.large_cap_delegate) # 第4列为流通盘列
# 添加缺失的delegate初始化
self.delegate = StatusStyleDelegate(self, table_view=self.table_view)
# 在 init_ui() 之前或之后都可以
self.load_stylesheet()
# 加载UI布局
self.init_ui()
# 初始化日志样式管理器
self.log_style_manager = LogStyleManager(self.log_box)
self.log_style_manager.set_global_font(font_family="微软雅黑", font_size=12)
self.log_style_manager.set_letter_spacing(spacing=1) # 设置字间距
self.log_style_manager.set_line_height(line_height=120) # 设置行高为 1.2 倍
# 设置日志区域段落格式
# self.setup_log_paragraph_format()
# 其它初始化
# font = self.log_box.font()
# font.setLetterSpacing(QFont.SpacingType.AbsoluteSpacing, 1) # 设置每个字符之间固定间距为 1 像素
# self.log_box.setFont(font)
# 初始化变量
self.order_executor = OrderExecutor() # 初始化下单执行器
self.data_rows = []
self.stock_codes = []
self.auto_push_var = False
setup_logger(self.append_log)
# 初始化日志文件路径(关键修复点)
if not hasattr(self, 'log_file'):
self.log_file = os.path.join(self.OUT_PATH, f"monitor_{datetime.datetime.now().strftime('%Y%m%d')}.log")
# 启动定时器
self.timer = QTimer()
self.timer.timeout.connect(self.update_prices)
self.timer.start(5000)
# 启动状态更新线程
self.start_status_update()
self.start_ping_update() # 启动 Ping 更新
# print(type(self.standard_model)) # 应该输出 <class '__main__.CustomSortModel'>
def on_share_capital_data_ready(self, float_share_data):
# 在主线程中更新UI
self.float_share_cache = float_share_data
self.update_share_capital_ui(float_share_data)
def on_fetch_finished(self):
# 重新启用控件
self.btn_load.setEnabled(True)
self.btn_save.setEnabled(True)
self.checkbox_debug.setEnabled(True)
self.fetch_worker = None
def load_stylesheet(self):
qss_file = os.path.join(self.OUT_PATH, "style.qss")
if os.path.exists(qss_file):
with open(qss_file, 'r', encoding='utf-8') as f:
self.setStyleSheet(f.read())
log_info(f"样式文件已加载: {qss_file}")
else:
log_error(f"样式文件不存在: {qss_file}")
def add_test_data(self):
"""添加测试数据"""
statuses = ["监控中", "错买", "已触发"]
for i in range(10):
code = f"{i + 1:06d}.SZ"
name = f"测试股票{i + 1}"
status = statuses[i % 3]
items = [
QStandardItem(code),
QStandardItem(name),
QStandardItem("0.00%"),
QStandardItem("10.00亿"),
QStandardItem("15.00"),
QStandardItem("16.00"),
QStandardItem("6.67%"),
QStandardItem(status),
QStandardItem(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
]
# 设置排序角色
if status == "已触发":
items[7].setData(0, Qt.ItemDataRole.UserRole)
elif status == "错买":
items[7].setData(1, Qt.ItemDataRole.UserRole)
elif status == "监控中":
items[7].setData(2, Qt.ItemDataRole.UserRole)
self.standard_model.appendRow(items)
def start_price_update_thread(self):
if self.price_update_worker is not None and self.price_update_worker.isRunning():
return # 避免重复启动线程
self.price_update_worker = PriceUpdateWorker(self.stock_codes, self)
self.price_update_worker.data_ready.connect(self.on_data_ready)
self.price_update_worker.update_finished.connect(self.on_update_finished)
self.price_update_worker.start()
def init_ui(self):
# 主窗口控件与布局
main_widget = QWidget()
layout = QVBoxLayout()
# 状态栏
status_layout = QHBoxLayout()
self.status_light = self.create_status_light()
self.update_light = self.create_status_light()
# self.ping_label = QLabel("实时行情延迟: --ms")
# self.ping_light = self.create_status_light() # 新增的小灯
status_layout.addWidget(self.trading_status_label)
status_layout.addWidget(self.status_light)
status_layout.addStretch()
# 在 update_light 前添加文字标签
status_layout.addWidget(self.data_refresh_label)
status_layout.addWidget(self.update_light)
# 新增网络延迟标签和小灯
status_layout.addSpacing(20) # 添加间距
# 状态栏部分
status_layout.addWidget(self.ping_label)
status_layout.addWidget(self.ping_light)
layout.addLayout(status_layout)
# 表格视图
self.table_view = QTableView()
self.standard_model.setHorizontalHeaderLabels([
"代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间"
])
self.table_view.setModel(self.standard_model)
# 设置表头行为(先设 Stretch再设固定宽度
self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.Fixed)
for col_index, width in enumerate([140, 120, 100, 100, 80, 80, 100, 80, 220]):
self.table_view.horizontalHeader().resizeSection(col_index, width)
# 设置委托(确保 self.status_delegate 已在 __init__ 中初始化)
self.table_view.setItemDelegateForColumn(7, self.status_delegate)
self.table_view.setSortingEnabled(True)
# 双击事件绑定
self.table_view.doubleClicked.connect(self.place_order)
layout.addWidget(self.table_view)
# 控制面板
control_layout = QHBoxLayout()
button_font = QFont()
button_font.setPointSize(8)
self.btn_load = QPushButton("选择文件")
self.btn_save = QPushButton("保存结果")
self.checkbox_auto = QCheckBox("自动下单")
self.checkbox_debug = QCheckBox("调试模式")
self.checkbox_topmost = QCheckBox("窗口置顶")
# 设置对象名
self.btn_load.setObjectName("btn_load")
self.btn_save.setObjectName("btn_save")
for btn in [self.btn_load, self.btn_save]:
btn.setObjectName(btn.text().replace(" ", "_"))
btn.setFont(button_font)
for cb in [self.checkbox_auto, self.checkbox_debug, self.checkbox_topmost]:
cb.setFont(button_font)
cb.setObjectName(cb.text())
# 连接事件
self.checkbox_topmost.stateChanged.connect(self.toggle_always_on_top)
self.btn_load.clicked.connect(self.select_file)
self.btn_save.clicked.connect(self.export_to_excel)
control_layout.addWidget(self.btn_load)
control_layout.addWidget(self.btn_save)
control_layout.addWidget(self.checkbox_auto)
control_layout.addWidget(self.checkbox_topmost)
control_layout.addWidget(self.checkbox_debug)
# 添加股票计数器标签到控制布局
control_layout.addWidget(self.total_stocks_label)
control_layout.addWidget(self.triggered_stocks_label)
# self.total_stocks_label.setStyleSheet("color: green; font-size: 14pt;")
# self.triggered_stocks_label.setStyleSheet("color: blue; font-size: 14pt;")
layout.addLayout(control_layout)
# 日志区域
self.log_box = QTextEdit()
self.log_box.setReadOnly(True)
self.log_box.setFixedHeight(250) # 设置固定高度为 250 像素
layout.addWidget(self.log_box)
# 设置主窗口布局
main_widget.setLayout(layout)
self.setCentralWidget(main_widget)
# 确保样式表加载
# self.load_stylesheet()
# self.update_ping_ui(123.45)
def start_ping_update(self):
def ping_loop():
while True:
try:
# log_info("开始测速...")
delay = self.ping_tushare_server()
# log_info(f"获取到延迟: {delay:.2f}ms")
self.ping_updated_signal.emit(delay) # 使用信号通知主线程
except Exception as e:
log_error(f"Ping 失败: {str(e)}")
time.sleep(60) # 每分钟更新一次
# log_info("启动后台 Ping 线程...")
threading.Thread(target=ping_loop, daemon=True).start()
def ping_tushare_server(self):
"""
使用 Tushare 实时行情接口测试 API 响应速度
返回三次请求的平均延迟(单位:毫秒)
"""
delays = []
for _ in range(3): # 进行3次尝试
start = time.time()
try:
# 使用一个轻量级的 API 请求进行测试
df = ts.realtime_quote(ts_code='000001.SZ')
# log_debug("API 调用成功,返回结果:")
# log_debug(str(df)) # 打印原始返回结果,便于调试
if df is not None and not df.empty:
end = time.time()
delay = (end - start) * 1000 # 转换为毫秒
delays.append(delay)
# log_info(f"单次延迟: {delay:.2f}ms")
else:
log_warning("API 返回空数据")
except Exception as e:
log_warning(f"API 请求失败: {str(e)}")
continue
if delays:
avg_delay = round(sum(delays) / len(delays), 2)
# log_info(f"平均延迟: {avg_delay:.2f}ms")
return avg_delay # 返回平均延迟
else:
log_error("无法成功调用 Tushare API未获取到任何有效延迟数据")
raise Exception("无法成功调用 Tushare API")
def update_ping_ui(self, delay):
if not hasattr(self, 'ping_label') or not hasattr(self, 'ping_light'):
log_warning("ping_label 或 ping_light 尚未初始化")
return
try:
self.ping_label.setText(f"实时行情延迟: {delay:.2f}ms")
# log_debug(f"更新网络延迟标签为: {delay:.2f}ms")
self.flash_ping_light(delay)
except Exception as e:
log_error(f"更新延迟UI失败: {str(e)}")
self.ping_light.setStyleSheet("background-color: gray; border-radius: 10px;")
def flash_ping_light(self, delay):
light = self.ping_light
if delay < 100: # 如果延迟小于100ms显示绿色
color = "#00FF00"
elif delay < 300: # 如果延迟在100ms到300ms之间显示黄色
color = "yellow"
else: # 如果延迟大于300ms显示红色
color = "red"
# log_debug(f"设置小灯颜色为: {color}")
light.setStyleSheet(f"background-color: {color}; border-radius: 10px;")
anim = QPropertyAnimation(light, b"geometry")
anim.setDuration(300)
anim.setKeyValueAt(0, light.geometry())
anim.setKeyValueAt(0.5, light.geometry().adjusted(-3, -3, 3, 3))
anim.setKeyValueAt(1, light.geometry())
anim.setLoopCount(2)
anim.setEasingCurve(QEasingCurve.Type.OutBounce)
anim.start()
# QTimer.singleShot(600, lambda: light.setStyleSheet("background-color: gray; border-radius: 10px;")) # 0.6秒后恢复灰色
def create_status_light(self):
light = QLabel()
light.setFixedSize(20, 20)
light.setStyleSheet("background-color: gray; border-radius: 10px;")
return light
def place_order(self, code, auto_flag):
"""
下单接口,根据股票代码执行下单操作
:param code: 股票代码 (str)
:param auto_flag: 是否自动下单 (int: 0/1)
"""
log_info(f"开始处理下单请求,股票代码: {code},自动模式: {auto_flag}")
try:
self.order_executor.place_order(code, auto_flag)
except Exception as e:
log_error(f"下单过程中发生异常: {e}")
def update_stock_counters(self):
total = self.standard_model.rowCount()
triggered = 0
for i in range(total):
status_item = self.standard_model.item(i, 7) # 假设状态是第8列索引从0开始
if status_item and status_item.text() == "已触发" or status_item.text() == "错买":
triggered += 1
self.total_stocks_label.setText(f"总数: {triggered}/{total}")
self.triggered_stocks_label.setText(f"触发: {triggered}")
def load_stylesheet(self):
qss_file = os.path.join(self.OUT_PATH, "style.qss")
log_info(f"加载样式文件路径: {qss_file}")
if os.path.exists(qss_file):
with open(qss_file, 'r', encoding='utf-8') as f:
self.setStyleSheet(f.read())
else:
log_error(f"样式文件不存在:{qss_file}")
def dragEnterEvent(self, event):
if event.mimeData().hasUrls():
url = event.mimeData().urls()[0]
if url.toString().lower().endswith('.txt'):
event.acceptProposedAction()
def dropEvent(self, event):
urls = event.mimeData().urls()
if urls:
file_path = urls[0].toLocalFile()
today = datetime.datetime.now().strftime("%Y%m%d")
trade_date = os.path.basename(file_path).split('.')[0]
if trade_date == today:
log_error("不能加载今日文件,请选择历史日期文件。")
else:
self.load_stocks(file_path, trade_date)
def select_file(self):
file_path, _ = QFileDialog.getOpenFileName(self, "选择监控文件", self.OUT_PATH +"\history", "文本文件 (*.txt)")
if file_path:
trade_date = os.path.basename(file_path).split('.')[0]
today = datetime.datetime.now().strftime("%Y%m%d")
if trade_date == today:
log_error("不能加载今日文件,请选择历史日期文件。")
return
self.load_stocks(file_path, trade_date)
def load_stocks(self, file_path, trade_date):
try:
with open(file_path, 'rb') as f:
raw_data = f.read(10000)
encoding = chardet.detect(raw_data)['encoding'] or 'utf-8'
with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
reader = csv.reader(f)
headers = next(reader)
required_cols = ['条件选股', '代码']
for col in required_cols:
if col not in headers:
raise KeyError(f"缺少必要列:{col}")
name_col = headers.index('条件选股')
code_col = headers.index('代码')
codes = []
self.standard_model.removeRows(0, self.standard_model.rowCount())
for line in reader:
if not line or len(line) <= max(name_col, code_col):
continue
name = line[name_col].strip()
code = line[code_col].strip().zfill(6)
formatted_code = self.format_code(code)
codes.append(formatted_code)
row_items = [
QStandardItem(formatted_code),
QStandardItem(name),
QStandardItem("-"),
QStandardItem("0"),
QStandardItem("-"),
QStandardItem("-"),
QStandardItem("-"),
QStandardItem("监控中"),
QStandardItem("")
]
self.standard_model.appendRow(row_items)
df = pro.daily(ts_code=','.join(codes), trade_date=trade_date)
close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()}
for i in range(self.standard_model.rowCount()):
code_item = self.standard_model.item(i, 0)
code = code_item.text()
target_price = round(close_prices.get(code, 0) * 1.049, 2)
self.standard_model.setItem(i, 5, QStandardItem(str(target_price)))
self.stock_codes = [self.format_code(code.zfill(6)) for code in codes]
# threading.Thread(target=self.fetch_share_capital_data, daemon=True).start()
# 禁用相关控件以防止用户交互
self.btn_load.setEnabled(False)
self.btn_save.setEnabled(False)
self.checkbox_debug.setEnabled(False)
# 启动后台线程获取流通盘数据
self.fetch_worker = FetchShareCapitalWorker()
self.fetch_worker.data_ready.connect(self.on_share_capital_data_ready)
self.fetch_worker.finished.connect(self.on_fetch_finished)
self.fetch_worker.start()
# 更新股票计数器
self.update_stock_counters()
except Exception as e:
log_error(f"加载失败: {str(e)}")
# 重新启用控件
# self.btn_load.setEnabled(True)
# self.btn_save.setEnabled(True)
# self.checkbox_debug.setEnabled(True)
def format_code(self, code):
# 如果已包含市场后缀(.SH 或 .SZ则不再处理
if '.' in code:
return code
code = code.zfill(6)
if code.startswith(("6", "9")):
return f"{code}.SH"
elif code.startswith(("0", "2", "3")):
return f"{code}.SZ"
else:
return code
def fetch_share_capital_data(self):
log_info("获取流通盘数据...")
retry_count = 3 # 设置最大重试次数
for attempt in range(retry_count):
try:
dc_df = ts.realtime_list(src='dc')
# print(dc_df)
if dc_df is None or dc_df.empty:
log_warning(f"{attempt + 1} 次获取流通盘失败,数据为空,准备重试...")
time.sleep(2) # 等待 2 秒后重试
continue
float_mv_dict = {
row['TS_CODE']: row['FLOAT_MV'] / 1e8
for _, row in dc_df.iterrows()
}
self.float_share_cache = float_mv_dict
log_info(f"流通盘缓存已更新,共 {len(float_mv_dict)} 条记录")
# self.update_share_capital_ui()
return float_mv_dict # 返回数据而不是直接更新UI
except Exception as e:
log_error(f"{attempt + 1} 次获取流通盘失败: {str(e)}")
time.sleep(2)
log_error("多次尝试获取流通盘失败请检查网络连接、Token 权限或 Tushare 接口状态。")
def update_share_capital_ui(self, float_share_data):
try:
for i in range(self.standard_model.rowCount()):
code = self.standard_model.item(i, 0).text()
lt_pan = self.float_share_cache.get(code, 0)
# 判断流通盘是否大于100亿
if lt_pan < self.capital_threshold:
item = QStandardItem(f"{lt_pan:.2f}亿")
item.setBackground(QBrush(QColor("yellow")))
item.setForeground(QColor("red"))
self.standard_model.setItem(i, 3, item)
else:
self.standard_model.setItem(i, 3, QStandardItem(f"{lt_pan:.2f}亿"))
except Exception as e:
log_error(f"更新流通盘数据失败: {str(e)}")
def on_data_ready(self, all_results):
self.current_all_results = all_results
def on_update_finished(self):
now = datetime.datetime.now()
is_open = self.is_trading_time()
for i in range(self.standard_model.rowCount()):
code_item = self.standard_model.item(i, 0)
code = code_item.text()
quote = self.current_all_results.get(code)
if quote is None or quote.empty:
continue
# 记录原始值用于比较
original_price = self.standard_model.item(i, 4).text()
original_profit = self.standard_model.item(i, 6).text()
current_price = float(quote['PRICE'])
target_price = float(self.standard_model.item(i, 5).text())
# 计算获利百分比
profit_pct = round((current_price - target_price) / target_price * 100, 2) # 新增这行
# 当前价更新时闪烁
if original_price != f"{current_price:.2f}":
self.standard_model.setItem(i, 4, QStandardItem(f"{current_price:.2f}"))
self.status_delegate.flash_cell(i, 4) # 触发闪烁
# 获利%更新时闪烁
if original_profit != f"{profit_pct:.2f}%":
self.standard_model.setItem(i, 6, QStandardItem(f"{profit_pct:.2f}%"))
self.status_delegate.flash_cell(i, 6) # 触发闪烁
profit_item = QStandardItem(f"{profit_pct:.2f}%")
# 根据获利百分比设置字体颜色
if profit_pct > 0:
profit_item.setForeground(QColor("red"))
elif profit_pct < 0:
profit_item.setForeground(QColor("green"))
self.standard_model.setItem(i, 6, profit_item)
self.status_delegate.flash_cell(i, 6) # 触发闪烁
# # 判断获利是否大于0
# if profit_pct > 0:
# profit_item = self.standard_model.item(i, 6)
# profit_item.setForeground(QColor("red"))
#
# else:
# profit_item = self.standard_model.item(i, 6)
# profit_item.setForeground(QColor("green"))
# 开盘涨幅逻辑:仅当开盘后第一次计算 开盘涨幅更新时闪烁
if is_open and code not in self.calculated_open_pct:
try:
open_price = float(quote['OPEN'])
pre_close = float(quote['PRE_CLOSE'])
open_zf = round((open_price - pre_close) / pre_close * 100, 2)
# print(open_zf)
open_zf_item = QStandardItem(f"{open_zf:.2f}%")
# self.standard_model.setItem(i, 2, QStandardItem(f"{open_zf:.2f}%"))
# 如果开盘涨幅小于等于阈值,标记为黄色
if self.open_zf_threshold_down < open_zf <= self.open_zf_threshold_up:
open_zf_item.setBackground(QBrush(QColor("yellow")))
open_zf_item.setForeground(QColor("red"))
self.standard_model.setItem(i, 2, open_zf_item)
self.status_delegate.flash_cell(i, 2) # 触发闪烁
self.calculated_open_pct.add(code)
# print(open_zf)
# 新增判断:开盘涨幅大于 3%,标记为“错买”
# if open_zf > self.open_zf_threshold:
# profit_item = self.standard_model.item(i, 2)
# profit_item.setForeground(QColor("red"))
# profit_item.setBackground(QBrush(QColor("yellow")))
except Exception as e:
log_error(f"计算开盘涨幅失败: {str(e)}")
# 状态判断和更新
status_item = self.standard_model.item(i, 7)
current_status = status_item.text() if status_item else "监控中"
# 初始化new_status为当前状态
new_status = current_status # 添加这行初始化new_status
# ... 状态判断逻辑 ...
if new_status != current_status:
# ... 设置新状态代码 ...
self.standard_model.setItem(i, 7, status_item)
self.status_delegate.flash_cell(i, 7) # 触发闪烁
high_price = float(quote['HIGH'])
target_price = float(self.standard_model.item(i, 5).text())
if current_price >= target_price:
new_status = "已触发"
elif high_price >= target_price:
new_status = "错买"
else:
new_status = "监控中"
if new_status != current_status:
status_item = QStandardItem(new_status)
if new_status == "已触发":
status_item.setData(0, Qt.ItemDataRole.UserRole)
elif new_status == "错买":
status_item.setData(1, Qt.ItemDataRole.UserRole)
elif new_status == "监控中":
status_item.setData(2, Qt.ItemDataRole.UserRole)
self.standard_model.setItem(i, 7, status_item)
self.delegate.flash_cell(i, 7) # 添加状态列闪烁
log_trigger(f"股票 {code} 状态更新: {current_status} -> {new_status}")
time_item = self.standard_model.item(i, 8)
if new_status in ["已触发", "错买"] and (time_item is None or not time_item.text()):
time_item = QStandardItem(now.strftime("%Y-%m-%d %H:%M:%S"))
self.standard_model.setItem(i, 8, time_item)
log_trigger(f"{code} 状态已更新为 {new_status}")
# 排序触发
self.table_view.sortByColumn(7, Qt.SortOrder.AscendingOrder)
# 更新计数器
self.update_stock_counters()
# 闪烁灯效果
self.flash_update_light()
def update_prices(self):
if not self.is_trading_time() and not self.checkbox_debug.isChecked():
return
self.start_price_update_thread()
def flash_update_light(self):
light = self.update_light
light.setStyleSheet("background-color: #00FF00; border-radius: 10px;")
anim = QPropertyAnimation(light, b"geometry")
anim.setDuration(300)
anim.setKeyValueAt(0, light.geometry())
anim.setKeyValueAt(0.5, light.geometry().adjusted(-3, -3, 3, 3))
anim.setKeyValueAt(1, light.geometry())
anim.setLoopCount(2)
anim.setEasingCurve(QEasingCurve.Type.OutBounce)
anim.start()
QTimer.singleShot(600, lambda: light.setStyleSheet("background-color: gray; border-radius: 10px;"))
def place_order(self, index=None):
if not index or not index.isValid():
return
code = self.standard_model.item(index.row(), 0).text()
pure_code = code.split('.')[0]
# log_info(f"选中股票代码: {pure_code}")
self.order_executor.place_order(pure_code, self.checkbox_auto.isChecked())
# 自动按状态排序
self.table_view.sortByColumn(7, Qt.SortOrder.AscendingOrder) # 第7列为“状态”列
def toggle_always_on_top(self, state):
if state == 2:
self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowStaysOnTopHint)
log_info("启用窗口置顶")
else:
self.setWindowFlags(self.windowFlags() & ~Qt.WindowType.WindowStaysOnTopHint)
log_info("取消窗口置顶")
self.show() # 必须调用 show() 才会生效
def export_to_excel(self):
try:
if not self.standard_model.rowCount():
log_warning("没有可导出的数据!")
self.show_warning("没有数据可以导出。")
return
today = datetime.datetime.now().strftime("%Y%m%d")
filename = os.path.join(self.OUT_PATH, f"{today}_监控结果.xlsx")
# 准备数据
data = []
for i in range(self.standard_model.rowCount()):
row = [self.standard_model.item(i, j).text() for j in range(self.standard_model.columnCount())]
data.append(row)
df = pd.DataFrame(data, columns=[
"代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间"
])
# 写入 Excel
df.to_excel(filename, index=False)
log_info(f"数据已保存至: {filename}")
self.show_info("导出成功", f"数据已保存到: {filename}")
except Exception as e:
log_error(f"导出失败: {str(e)}")
self.show_error(f"导出Excel数据时发生错误{str(e)}")
def is_trading_time(self):
now = datetime.datetime.now().time()
weekday = datetime.datetime.now().weekday()
return (
weekday < 5 and(
(datetime.time(9, 30) <= now <= datetime.time(11, 30)) or
(datetime.time(13, 0) <= now <= datetime.time(15, 0))
)
)
def sort_table_by_status(self):
model = self.standard_model
rows = []
for i in range(model.rowCount()):
status_item = model.item(i, 7)
if not status_item:
continue
status = status_item.text()
row_data = []
fonts = []
bg_colors = []
fg_colors = []
for j in range(model.columnCount()):
item = model.item(i, j)
if item:
row_data.append(item.text())
fonts.append(item.font())
bg_colors.append(item.background().color())
fg_colors.append(item.foreground().color())
else:
row_data.append("")
fonts.append(QFont())
bg_colors.append(QColor('white'))
fg_colors.append(QColor('black'))
rows.append((status, row_data, fonts, bg_colors, fg_colors))
# 按照状态排序:触发 > 错买 > 监控中
sorted_rows = sorted(rows, key=lambda x: {'已触发': 0, '错买': 1, '监控中': 2}.get(x[0], 3))
model.removeRows(0, model.rowCount())
for _, row_data, fonts, bg_colors, fg_colors in sorted_rows:
items = []
for j in range(len(row_data)):
item = QStandardItem(row_data[j])
item.setFont(fonts[j])
item.setBackground(QColor(bg_colors[j]))
item.setForeground(QColor(fg_colors[j]))
items.append(item)
model.appendRow(items)
def start_status_update(self):
def check_status():
while True:
is_open = self.is_trading_time()
status_text = "开盘中" if is_open else "收盘时间"
color = "lime" if is_open else "red"
self.trading_status_label.setText(f"当前状态:{status_text}")
self.status_light.setStyleSheet(f"background-color: {color}; border-radius: 10px;")
time.sleep(10)
threading.Thread(target=check_status, daemon=True).start()
def append_log(self, full_message, log_type='default'):
"""
接收全局日志并显示在 UI 中
:param full_message: 带时间戳的完整日志信息 (str)
:param log_type: 日志类型 (str),如 info/warning/error/trigger 等
"""
self.log_style_manager.insert_log(full_message, log_type)
# 写入文件
with open(self.log_file, "a", encoding="utf-8") as f:
f.write(full_message + "\n")
def show_error(self, msg):
QMessageBox.critical(self, "错误", msg)
def show_warning(self, msg):
QMessageBox.warning(self, "警告", msg)
def show_info(self, title, msg):
QMessageBox.information(self, title, msg)
def closeEvent(self, event):
reply = QMessageBox.question(
self, '退出',
"确定要退出吗?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.Yes:
event.accept()
else:
event.ignore()
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec())