import sys import os import time import datetime import chardet import csv import pandas as pd import tushare as ts import threading import socket from PySide6.QtCore import Qt, QModelIndex from PySide6.QtCore import QTimer, QThread, Signal, QObject, QPropertyAnimation, QEasingCurve from PySide6.QtWidgets import ( QApplication, QMainWindow, QTableView, QPushButton, QVBoxLayout, QHBoxLayout, QWidget, QLabel, QFileDialog, QTextEdit, QCheckBox, QLineEdit, QMessageBox, QSplitter, QMenuBar, QMenu, QHeaderView, QDialog ) # 正确导入方式 from PySide6.QtWidgets import QApplication, QTableView, QHeaderView, QStyledItemDelegate from PySide6.QtGui import QAction, QFont, QTextCharFormat, QColor, QIcon, QStandardItemModel, QStandardItem, QPalette from PySide6.QtGui import QTextCursor, QBrush from logger_utils_new import log_info, log_warning, log_error, log_trigger, log_debug, setup_logger, LOG_STYLES from StatusStyleDelegate import StatusStyleDelegate from order_executor import OrderExecutor from log_style_manager import LogStyleManager from typing import List # 设置 Tushare Token # ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d') pro = ts.pro_api() from quote_manager import QuoteManager, DataSource class PriceUpdateWorker(QThread): data_ready = Signal(dict) update_finished = Signal() def __init__(self, stock_codes, parent=None): super().__init__(parent) self.stock_codes = stock_codes # 引用行情数据 self.quote_manager = QuoteManager() self.quote_manager.set_data_source(DataSource.TUSHARE) # 添加超时时间(秒) self.quote_manager.set_retry_policy(max_retries=3, retry_interval=2) def run(self): try: all_results = self.quote_manager.get_realtime_quotes(self.stock_codes) self.data_ready.emit(all_results) except Exception as e: log_error(f"行情获取失败: {str(e)}") finally: self.update_finished.emit() class FetchShareCapitalWorker(QThread): data_ready = Signal(dict) def run(self): try: log_info("获取流通盘数据...") retry_count = 3 # 设置最大重试次数 for attempt in range(retry_count): try: dc_df = ts.realtime_list(src='dc') if dc_df is None or dc_df.empty: log_warning(f"第 {attempt + 1} 次获取流通盘失败,数据为空,准备重试...") time.sleep(2) # 等待 2 秒后重试 continue float_mv_dict = { row['TS_CODE']: row['FLOAT_MV'] / 1e8 for _, row in dc_df.iterrows() } log_info(f"流通盘缓存已更新,共 {len(float_mv_dict)} 条记录") self.data_ready.emit(float_mv_dict) return # 成功退出循环 except Exception as e: log_error(f"第 {attempt + 1} 次获取流通盘失败: {str(e)}") time.sleep(2) log_error("多次尝试获取流通盘失败,请检查网络连接、Token 权限或 Tushare 接口状态。") self.data_ready.emit({}) # 发送空数据以表示失败 except Exception as e: log_error(f"获取流通盘数据时发生异常: {str(e)}") self.data_ready.emit({}) from typing import List # 自定义模型以支持按业务优先级排序 class CustomSortModel(QStandardItemModel): def sort(self, column, order=Qt.SortOrder.AscendingOrder): if column != 7: return super().sort(column, order) status_priority = {"已触发": 0, "错买": 1, "监控中": 2} def get_sort_key(item): if item is None: return 3 role = item.data(Qt.ItemDataRole.UserRole) if role is not None: return role text = item.text() return status_priority.get(text, 3) # 保存当前所有行 all_rows = [self.takeRow(0) for _ in range(self.rowCount())] # 按照状态列排序 all_rows.sort(key=lambda row_items: get_sort_key(row_items[7]), reverse=(order == Qt.SortOrder.DescendingOrder)) # 清空并重新添加 self.removeRows(0, self.rowCount()) for row_items in all_rows: self.appendRow(row_items) self.layoutChanged.emit() class MainWindow(QMainWindow): log_signal = Signal(str, str) # message, level OUT_PATH = r"D:\gp_data" ping_updated_signal = Signal(float) # 新增:用于线程安全地传递延迟值 def __init__(self): super().__init__() self.setWindowTitle("股票价格监控系统") self.resize(1200, 800) self.capital_threshold = 60 # 流通盘阈值60亿 self.open_zf_threshold_up = 3.0 # 开盘涨幅阈值3% self.open_zf_threshold_down = -2.0 # 开盘跌幅阈值2% # 初始化 delegate self.delegate = StatusStyleDelegate(self) # 初始化股票计数器标签 self.total_stocks_label = QLabel("总数: 0") self.triggered_stocks_label = QLabel("触发: 0") self.total_stocks_label.setObjectName("total_stocks_label") self.triggered_stocks_label.setObjectName("triggered_stocks_label") self.trading_status_label = QLabel("当前状态:非交易时间") self.trading_status_label.setObjectName("trading_status_label") self.data_refresh_label = QLabel("数据状态:") self.data_refresh_label.setObjectName("data_refresh_label") self.ping_updated_signal.connect(self.update_ping_ui) # 连接信号到 UI 更新函数 self.ping_label = QLabel("实时行情延迟: --ms") self.ping_label.setObjectName("ping_label") self.ping_light = self.create_status_light() self.ping_light.setObjectName("ping_light") # 避免重复创建 self.price_update_worker = None # 用于保存当前线程实例 self.fetch_worker = None # 用于保存流通盘数据获取线程实例 self.current_all_results = {} # 缓存最新行情数据 self.calculated_open_pct = set() # 记录已计算开盘涨幅的股票 # 初始化模型 self.standard_model = CustomSortModel() self.standard_model.setHorizontalHeaderLabels([ "代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间" ]) # 表格视图 self.table_view = QTableView() self.table_view.setModel(self.standard_model) self.table_view.setSortingEnabled(True) # 启用排序 self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Fixed) # 设置每列宽度 widths = [140, 120, 100, 100, 80, 80, 80, 80, 220] for col_index, width in enumerate(widths): self.table_view.horizontalHeader().resizeSection(col_index, width) # 设置委托(确保 self.status_delegate 已在 __init__ 中初始化) # 修改前: # self.status_delegate = StatusStyleDelegate() # 修改后: self.status_delegate = StatusStyleDelegate(parent=self, table_view=self.table_view) for col in range(8): # 对前8列应用 self.table_view.setItemDelegateForColumn(col, self.status_delegate) # 新增大流通盘颜色样式 self.large_cap_delegate = StatusStyleDelegate(parent=self, table_view=self.table_view) self.table_view.setItemDelegateForColumn(3, self.large_cap_delegate) # 第4列为流通盘列 # 添加缺失的delegate初始化 self.delegate = StatusStyleDelegate(self, table_view=self.table_view) # 在 init_ui() 之前或之后都可以 self.load_stylesheet() # 加载UI布局 self.init_ui() # 初始化日志样式管理器 self.log_style_manager = LogStyleManager(self.log_box) self.log_style_manager.set_global_font(font_family="微软雅黑", font_size=12) self.log_style_manager.set_letter_spacing(spacing=1) # 设置字间距 self.log_style_manager.set_line_height(line_height=120) # 设置行高为 1.2 倍 # 设置日志区域段落格式 # self.setup_log_paragraph_format() # 其它初始化 # font = self.log_box.font() # font.setLetterSpacing(QFont.SpacingType.AbsoluteSpacing, 1) # 设置每个字符之间固定间距为 1 像素 # self.log_box.setFont(font) # 初始化变量 self.order_executor = OrderExecutor() # 初始化下单执行器 self.data_rows = [] self.stock_codes = [] self.auto_push_var = False setup_logger(self.append_log) # 初始化日志文件路径(关键修复点) if not hasattr(self, 'log_file'): self.log_file = os.path.join(self.OUT_PATH, f"monitor_{datetime.datetime.now().strftime('%Y%m%d')}.log") # 启动定时器 self.timer = QTimer() self.timer.timeout.connect(self.update_prices) self.timer.start(5000) # 启动状态更新线程 self.start_status_update() self.start_ping_update() # 启动 Ping 更新 # print(type(self.standard_model)) # 应该输出 def on_share_capital_data_ready(self, float_share_data): # 在主线程中更新UI self.float_share_cache = float_share_data self.update_share_capital_ui(float_share_data) def on_fetch_finished(self): # 重新启用控件 self.btn_load.setEnabled(True) self.btn_save.setEnabled(True) self.checkbox_debug.setEnabled(True) self.fetch_worker = None def load_stylesheet(self): qss_file = os.path.join(self.OUT_PATH, "style.qss") if os.path.exists(qss_file): with open(qss_file, 'r', encoding='utf-8') as f: self.setStyleSheet(f.read()) log_info(f"样式文件已加载: {qss_file}") else: log_error(f"样式文件不存在: {qss_file}") def add_test_data(self): """添加测试数据""" statuses = ["监控中", "错买", "已触发"] for i in range(10): code = f"{i + 1:06d}.SZ" name = f"测试股票{i + 1}" status = statuses[i % 3] items = [ QStandardItem(code), QStandardItem(name), QStandardItem("0.00%"), QStandardItem("10.00亿"), QStandardItem("15.00"), QStandardItem("16.00"), QStandardItem("6.67%"), QStandardItem(status), QStandardItem(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) ] # 设置排序角色 if status == "已触发": items[7].setData(0, Qt.ItemDataRole.UserRole) elif status == "错买": items[7].setData(1, Qt.ItemDataRole.UserRole) elif status == "监控中": items[7].setData(2, Qt.ItemDataRole.UserRole) self.standard_model.appendRow(items) def start_price_update_thread(self): if self.price_update_worker is not None and self.price_update_worker.isRunning(): return # 避免重复启动线程 self.price_update_worker = PriceUpdateWorker(self.stock_codes, self) self.price_update_worker.data_ready.connect(self.on_data_ready) self.price_update_worker.update_finished.connect(self.on_update_finished) self.price_update_worker.start() def init_ui(self): # 主窗口控件与布局 main_widget = QWidget() layout = QVBoxLayout() # 状态栏 status_layout = QHBoxLayout() self.status_light = self.create_status_light() self.update_light = self.create_status_light() # self.ping_label = QLabel("实时行情延迟: --ms") # self.ping_light = self.create_status_light() # 新增的小灯 status_layout.addWidget(self.trading_status_label) status_layout.addWidget(self.status_light) status_layout.addStretch() # 在 update_light 前添加文字标签 status_layout.addWidget(self.data_refresh_label) status_layout.addWidget(self.update_light) # 新增网络延迟标签和小灯 status_layout.addSpacing(20) # 添加间距 # 状态栏部分 status_layout.addWidget(self.ping_label) status_layout.addWidget(self.ping_light) layout.addLayout(status_layout) # 表格视图 self.table_view = QTableView() self.standard_model.setHorizontalHeaderLabels([ "代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间" ]) self.table_view.setModel(self.standard_model) # 设置表头行为(先设 Stretch,再设固定宽度) self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.Fixed) for col_index, width in enumerate([140, 120, 100, 100, 80, 80, 100, 80, 220]): self.table_view.horizontalHeader().resizeSection(col_index, width) # 设置委托(确保 self.status_delegate 已在 __init__ 中初始化) self.table_view.setItemDelegateForColumn(7, self.status_delegate) self.table_view.setSortingEnabled(True) # 双击事件绑定 self.table_view.doubleClicked.connect(self.place_order) layout.addWidget(self.table_view) # 控制面板 control_layout = QHBoxLayout() button_font = QFont() button_font.setPointSize(8) self.btn_load = QPushButton("选择文件") self.btn_save = QPushButton("保存结果") self.checkbox_auto = QCheckBox("自动下单") self.checkbox_debug = QCheckBox("调试模式") self.checkbox_topmost = QCheckBox("窗口置顶") # 设置对象名 self.btn_load.setObjectName("btn_load") self.btn_save.setObjectName("btn_save") for btn in [self.btn_load, self.btn_save]: btn.setObjectName(btn.text().replace(" ", "_")) btn.setFont(button_font) for cb in [self.checkbox_auto, self.checkbox_debug, self.checkbox_topmost]: cb.setFont(button_font) cb.setObjectName(cb.text()) # 连接事件 self.checkbox_topmost.stateChanged.connect(self.toggle_always_on_top) self.btn_load.clicked.connect(self.select_file) self.btn_save.clicked.connect(self.export_to_excel) control_layout.addWidget(self.btn_load) control_layout.addWidget(self.btn_save) control_layout.addWidget(self.checkbox_auto) control_layout.addWidget(self.checkbox_topmost) control_layout.addWidget(self.checkbox_debug) # 添加股票计数器标签到控制布局 control_layout.addWidget(self.total_stocks_label) control_layout.addWidget(self.triggered_stocks_label) # self.total_stocks_label.setStyleSheet("color: green; font-size: 14pt;") # self.triggered_stocks_label.setStyleSheet("color: blue; font-size: 14pt;") layout.addLayout(control_layout) # 日志区域 self.log_box = QTextEdit() self.log_box.setReadOnly(True) self.log_box.setFixedHeight(250) # 设置固定高度为 250 像素 layout.addWidget(self.log_box) # 设置主窗口布局 main_widget.setLayout(layout) self.setCentralWidget(main_widget) # 确保样式表加载 # self.load_stylesheet() # self.update_ping_ui(123.45) def start_ping_update(self): def ping_loop(): while True: try: # log_info("开始测速...") delay = self.ping_tushare_server() # log_info(f"获取到延迟: {delay:.2f}ms") self.ping_updated_signal.emit(delay) # 使用信号通知主线程 except Exception as e: log_error(f"Ping 失败: {str(e)}") time.sleep(60) # 每分钟更新一次 # log_info("启动后台 Ping 线程...") threading.Thread(target=ping_loop, daemon=True).start() def ping_tushare_server(self): """ 使用 Tushare 实时行情接口测试 API 响应速度 返回三次请求的平均延迟(单位:毫秒) """ delays = [] for _ in range(3): # 进行3次尝试 start = time.time() try: # 使用一个轻量级的 API 请求进行测试 df = ts.realtime_quote(ts_code='000001.SZ') # log_debug("API 调用成功,返回结果:") # log_debug(str(df)) # 打印原始返回结果,便于调试 if df is not None and not df.empty: end = time.time() delay = (end - start) * 1000 # 转换为毫秒 delays.append(delay) # log_info(f"单次延迟: {delay:.2f}ms") else: log_warning("API 返回空数据") except Exception as e: log_warning(f"API 请求失败: {str(e)}") continue if delays: avg_delay = round(sum(delays) / len(delays), 2) # log_info(f"平均延迟: {avg_delay:.2f}ms") return avg_delay # 返回平均延迟 else: log_error("无法成功调用 Tushare API,未获取到任何有效延迟数据") raise Exception("无法成功调用 Tushare API") def update_ping_ui(self, delay): if not hasattr(self, 'ping_label') or not hasattr(self, 'ping_light'): log_warning("ping_label 或 ping_light 尚未初始化") return try: self.ping_label.setText(f"实时行情延迟: {delay:.2f}ms") # log_debug(f"更新网络延迟标签为: {delay:.2f}ms") self.flash_ping_light(delay) except Exception as e: log_error(f"更新延迟UI失败: {str(e)}") self.ping_light.setStyleSheet("background-color: gray; border-radius: 10px;") def flash_ping_light(self, delay): light = self.ping_light if delay < 100: # 如果延迟小于100ms,显示绿色 color = "#00FF00" elif delay < 300: # 如果延迟在100ms到300ms之间,显示黄色 color = "yellow" else: # 如果延迟大于300ms,显示红色 color = "red" # log_debug(f"设置小灯颜色为: {color}") light.setStyleSheet(f"background-color: {color}; border-radius: 10px;") anim = QPropertyAnimation(light, b"geometry") anim.setDuration(300) anim.setKeyValueAt(0, light.geometry()) anim.setKeyValueAt(0.5, light.geometry().adjusted(-3, -3, 3, 3)) anim.setKeyValueAt(1, light.geometry()) anim.setLoopCount(2) anim.setEasingCurve(QEasingCurve.Type.OutBounce) anim.start() # QTimer.singleShot(600, lambda: light.setStyleSheet("background-color: gray; border-radius: 10px;")) # 0.6秒后恢复灰色 def create_status_light(self): light = QLabel() light.setFixedSize(20, 20) light.setStyleSheet("background-color: gray; border-radius: 10px;") return light def place_order(self, code, auto_flag): """ 下单接口,根据股票代码执行下单操作 :param code: 股票代码 (str) :param auto_flag: 是否自动下单 (int: 0/1) """ log_info(f"开始处理下单请求,股票代码: {code},自动模式: {auto_flag}") try: self.order_executor.place_order(code, auto_flag) except Exception as e: log_error(f"下单过程中发生异常: {e}") def update_stock_counters(self): total = self.standard_model.rowCount() triggered = 0 for i in range(total): status_item = self.standard_model.item(i, 7) # 假设状态是第8列(索引从0开始) if status_item and status_item.text() == "已触发" or status_item.text() == "错买": triggered += 1 self.total_stocks_label.setText(f"总数: {triggered}/{total}") self.triggered_stocks_label.setText(f"触发: {triggered}") def load_stylesheet(self): qss_file = os.path.join(self.OUT_PATH, "style.qss") log_info(f"加载样式文件路径: {qss_file}") if os.path.exists(qss_file): with open(qss_file, 'r', encoding='utf-8') as f: self.setStyleSheet(f.read()) else: log_error(f"样式文件不存在:{qss_file}") def dragEnterEvent(self, event): if event.mimeData().hasUrls(): url = event.mimeData().urls()[0] if url.toString().lower().endswith('.txt'): event.acceptProposedAction() def dropEvent(self, event): urls = event.mimeData().urls() if urls: file_path = urls[0].toLocalFile() today = datetime.datetime.now().strftime("%Y%m%d") trade_date = os.path.basename(file_path).split('.')[0] if trade_date == today: log_error("不能加载今日文件,请选择历史日期文件。") else: self.load_stocks(file_path, trade_date) def select_file(self): file_path, _ = QFileDialog.getOpenFileName(self, "选择监控文件", self.OUT_PATH +"\history", "文本文件 (*.txt)") if file_path: trade_date = os.path.basename(file_path).split('.')[0] today = datetime.datetime.now().strftime("%Y%m%d") if trade_date == today: log_error("不能加载今日文件,请选择历史日期文件。") return self.load_stocks(file_path, trade_date) def load_stocks(self, file_path, trade_date): try: with open(file_path, 'rb') as f: raw_data = f.read(10000) encoding = chardet.detect(raw_data)['encoding'] or 'utf-8' with open(file_path, 'r', encoding=encoding, errors='ignore') as f: reader = csv.reader(f) headers = next(reader) required_cols = ['条件选股', '代码'] for col in required_cols: if col not in headers: raise KeyError(f"缺少必要列:{col}") name_col = headers.index('条件选股') code_col = headers.index('代码') codes = [] self.standard_model.removeRows(0, self.standard_model.rowCount()) for line in reader: if not line or len(line) <= max(name_col, code_col): continue name = line[name_col].strip() code = line[code_col].strip().zfill(6) formatted_code = self.format_code(code) codes.append(formatted_code) row_items = [ QStandardItem(formatted_code), QStandardItem(name), QStandardItem("-"), QStandardItem("0"), QStandardItem("-"), QStandardItem("-"), QStandardItem("-"), QStandardItem("监控中"), QStandardItem("") ] self.standard_model.appendRow(row_items) df = pro.daily(ts_code=','.join(codes), trade_date=trade_date) close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()} for i in range(self.standard_model.rowCount()): code_item = self.standard_model.item(i, 0) code = code_item.text() target_price = round(close_prices.get(code, 0) * 1.049, 2) self.standard_model.setItem(i, 5, QStandardItem(str(target_price))) self.stock_codes = [self.format_code(code.zfill(6)) for code in codes] # threading.Thread(target=self.fetch_share_capital_data, daemon=True).start() # 禁用相关控件以防止用户交互 self.btn_load.setEnabled(False) self.btn_save.setEnabled(False) self.checkbox_debug.setEnabled(False) # 启动后台线程获取流通盘数据 self.fetch_worker = FetchShareCapitalWorker() self.fetch_worker.data_ready.connect(self.on_share_capital_data_ready) self.fetch_worker.finished.connect(self.on_fetch_finished) self.fetch_worker.start() # 更新股票计数器 self.update_stock_counters() except Exception as e: log_error(f"加载失败: {str(e)}") # 重新启用控件 # self.btn_load.setEnabled(True) # self.btn_save.setEnabled(True) # self.checkbox_debug.setEnabled(True) def format_code(self, code): # 如果已包含市场后缀(.SH 或 .SZ),则不再处理 if '.' in code: return code code = code.zfill(6) if code.startswith(("6", "9")): return f"{code}.SH" elif code.startswith(("0", "2", "3")): return f"{code}.SZ" else: return code def fetch_share_capital_data(self): log_info("获取流通盘数据...") retry_count = 3 # 设置最大重试次数 for attempt in range(retry_count): try: dc_df = ts.realtime_list(src='dc') # print(dc_df) if dc_df is None or dc_df.empty: log_warning(f"第 {attempt + 1} 次获取流通盘失败,数据为空,准备重试...") time.sleep(2) # 等待 2 秒后重试 continue float_mv_dict = { row['TS_CODE']: row['FLOAT_MV'] / 1e8 for _, row in dc_df.iterrows() } self.float_share_cache = float_mv_dict log_info(f"流通盘缓存已更新,共 {len(float_mv_dict)} 条记录") # self.update_share_capital_ui() return float_mv_dict # 返回数据而不是直接更新UI except Exception as e: log_error(f"第 {attempt + 1} 次获取流通盘失败: {str(e)}") time.sleep(2) log_error("多次尝试获取流通盘失败,请检查网络连接、Token 权限或 Tushare 接口状态。") def update_share_capital_ui(self, float_share_data): try: for i in range(self.standard_model.rowCount()): code = self.standard_model.item(i, 0).text() lt_pan = self.float_share_cache.get(code, 0) # 判断流通盘是否大于100亿 if lt_pan < self.capital_threshold: item = QStandardItem(f"{lt_pan:.2f}亿") item.setBackground(QBrush(QColor("yellow"))) item.setForeground(QColor("red")) self.standard_model.setItem(i, 3, item) else: self.standard_model.setItem(i, 3, QStandardItem(f"{lt_pan:.2f}亿")) except Exception as e: log_error(f"更新流通盘数据失败: {str(e)}") def on_data_ready(self, all_results): self.current_all_results = all_results def on_update_finished(self): now = datetime.datetime.now() is_open = self.is_trading_time() for i in range(self.standard_model.rowCount()): code_item = self.standard_model.item(i, 0) code = code_item.text() quote = self.current_all_results.get(code) if quote is None or quote.empty: continue # 记录原始值用于比较 original_price = self.standard_model.item(i, 4).text() original_profit = self.standard_model.item(i, 6).text() current_price = float(quote['PRICE']) target_price = float(self.standard_model.item(i, 5).text()) # 计算获利百分比 profit_pct = round((current_price - target_price) / target_price * 100, 2) # 新增这行 # 当前价更新时闪烁 if original_price != f"{current_price:.2f}": self.standard_model.setItem(i, 4, QStandardItem(f"{current_price:.2f}")) self.status_delegate.flash_cell(i, 4) # 触发闪烁 # 获利%更新时闪烁 if original_profit != f"{profit_pct:.2f}%": self.standard_model.setItem(i, 6, QStandardItem(f"{profit_pct:.2f}%")) self.status_delegate.flash_cell(i, 6) # 触发闪烁 profit_item = QStandardItem(f"{profit_pct:.2f}%") # 根据获利百分比设置字体颜色 if profit_pct > 0: profit_item.setForeground(QColor("red")) elif profit_pct < 0: profit_item.setForeground(QColor("green")) self.standard_model.setItem(i, 6, profit_item) self.status_delegate.flash_cell(i, 6) # 触发闪烁 # # 判断获利是否大于0 # if profit_pct > 0: # profit_item = self.standard_model.item(i, 6) # profit_item.setForeground(QColor("red")) # # else: # profit_item = self.standard_model.item(i, 6) # profit_item.setForeground(QColor("green")) # 开盘涨幅逻辑:仅当开盘后第一次计算 开盘涨幅更新时闪烁 if is_open and code not in self.calculated_open_pct: try: open_price = float(quote['OPEN']) pre_close = float(quote['PRE_CLOSE']) open_zf = round((open_price - pre_close) / pre_close * 100, 2) # print(open_zf) open_zf_item = QStandardItem(f"{open_zf:.2f}%") # self.standard_model.setItem(i, 2, QStandardItem(f"{open_zf:.2f}%")) # 如果开盘涨幅小于等于阈值,标记为黄色 if self.open_zf_threshold_down < open_zf <= self.open_zf_threshold_up: open_zf_item.setBackground(QBrush(QColor("yellow"))) open_zf_item.setForeground(QColor("red")) self.standard_model.setItem(i, 2, open_zf_item) self.status_delegate.flash_cell(i, 2) # 触发闪烁 self.calculated_open_pct.add(code) # print(open_zf) # 新增判断:开盘涨幅大于 3%,标记为“错买” # if open_zf > self.open_zf_threshold: # profit_item = self.standard_model.item(i, 2) # profit_item.setForeground(QColor("red")) # profit_item.setBackground(QBrush(QColor("yellow"))) except Exception as e: log_error(f"计算开盘涨幅失败: {str(e)}") # 状态判断和更新 status_item = self.standard_model.item(i, 7) current_status = status_item.text() if status_item else "监控中" # 初始化new_status为当前状态 new_status = current_status # 添加这行初始化new_status # ... 状态判断逻辑 ... if new_status != current_status: # ... 设置新状态代码 ... self.standard_model.setItem(i, 7, status_item) self.status_delegate.flash_cell(i, 7) # 触发闪烁 high_price = float(quote['HIGH']) target_price = float(self.standard_model.item(i, 5).text()) if current_price >= target_price: new_status = "已触发" elif high_price >= target_price: new_status = "错买" else: new_status = "监控中" if new_status != current_status: status_item = QStandardItem(new_status) if new_status == "已触发": status_item.setData(0, Qt.ItemDataRole.UserRole) elif new_status == "错买": status_item.setData(1, Qt.ItemDataRole.UserRole) elif new_status == "监控中": status_item.setData(2, Qt.ItemDataRole.UserRole) self.standard_model.setItem(i, 7, status_item) self.delegate.flash_cell(i, 7) # 添加状态列闪烁 log_trigger(f"股票 {code} 状态更新: {current_status} -> {new_status}") time_item = self.standard_model.item(i, 8) if new_status in ["已触发", "错买"] and (time_item is None or not time_item.text()): time_item = QStandardItem(now.strftime("%Y-%m-%d %H:%M:%S")) self.standard_model.setItem(i, 8, time_item) log_trigger(f"{code} 状态已更新为 {new_status}") # 排序触发 self.table_view.sortByColumn(7, Qt.SortOrder.AscendingOrder) # 更新计数器 self.update_stock_counters() # 闪烁灯效果 self.flash_update_light() def update_prices(self): if not self.is_trading_time() and not self.checkbox_debug.isChecked(): return self.start_price_update_thread() def flash_update_light(self): light = self.update_light light.setStyleSheet("background-color: #00FF00; border-radius: 10px;") anim = QPropertyAnimation(light, b"geometry") anim.setDuration(300) anim.setKeyValueAt(0, light.geometry()) anim.setKeyValueAt(0.5, light.geometry().adjusted(-3, -3, 3, 3)) anim.setKeyValueAt(1, light.geometry()) anim.setLoopCount(2) anim.setEasingCurve(QEasingCurve.Type.OutBounce) anim.start() QTimer.singleShot(600, lambda: light.setStyleSheet("background-color: gray; border-radius: 10px;")) def place_order(self, index=None): if not index or not index.isValid(): return code = self.standard_model.item(index.row(), 0).text() pure_code = code.split('.')[0] # log_info(f"选中股票代码: {pure_code}") self.order_executor.place_order(pure_code, self.checkbox_auto.isChecked()) # 自动按状态排序 self.table_view.sortByColumn(7, Qt.SortOrder.AscendingOrder) # 第7列为“状态”列 def toggle_always_on_top(self, state): if state == 2: self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowStaysOnTopHint) log_info("启用窗口置顶") else: self.setWindowFlags(self.windowFlags() & ~Qt.WindowType.WindowStaysOnTopHint) log_info("取消窗口置顶") self.show() # 必须调用 show() 才会生效 def export_to_excel(self): try: if not self.standard_model.rowCount(): log_warning("没有可导出的数据!") self.show_warning("没有数据可以导出。") return today = datetime.datetime.now().strftime("%Y%m%d") filename = os.path.join(self.OUT_PATH, f"{today}_监控结果.xlsx") # 准备数据 data = [] for i in range(self.standard_model.rowCount()): row = [self.standard_model.item(i, j).text() for j in range(self.standard_model.columnCount())] data.append(row) df = pd.DataFrame(data, columns=[ "代码", "名称", "开盘涨幅", "流通盘", "当前价", "目标价", "获利%", "状态", "预警时间" ]) # 写入 Excel df.to_excel(filename, index=False) log_info(f"数据已保存至: {filename}") self.show_info("导出成功", f"数据已保存到: {filename}") except Exception as e: log_error(f"导出失败: {str(e)}") self.show_error(f"导出Excel数据时发生错误:{str(e)}") def is_trading_time(self): now = datetime.datetime.now().time() weekday = datetime.datetime.now().weekday() return ( weekday < 5 and( (datetime.time(9, 30) <= now <= datetime.time(11, 30)) or (datetime.time(13, 0) <= now <= datetime.time(15, 0)) ) ) def sort_table_by_status(self): model = self.standard_model rows = [] for i in range(model.rowCount()): status_item = model.item(i, 7) if not status_item: continue status = status_item.text() row_data = [] fonts = [] bg_colors = [] fg_colors = [] for j in range(model.columnCount()): item = model.item(i, j) if item: row_data.append(item.text()) fonts.append(item.font()) bg_colors.append(item.background().color()) fg_colors.append(item.foreground().color()) else: row_data.append("") fonts.append(QFont()) bg_colors.append(QColor('white')) fg_colors.append(QColor('black')) rows.append((status, row_data, fonts, bg_colors, fg_colors)) # 按照状态排序:触发 > 错买 > 监控中 sorted_rows = sorted(rows, key=lambda x: {'已触发': 0, '错买': 1, '监控中': 2}.get(x[0], 3)) model.removeRows(0, model.rowCount()) for _, row_data, fonts, bg_colors, fg_colors in sorted_rows: items = [] for j in range(len(row_data)): item = QStandardItem(row_data[j]) item.setFont(fonts[j]) item.setBackground(QColor(bg_colors[j])) item.setForeground(QColor(fg_colors[j])) items.append(item) model.appendRow(items) def start_status_update(self): def check_status(): while True: is_open = self.is_trading_time() status_text = "开盘中" if is_open else "收盘时间" color = "lime" if is_open else "red" self.trading_status_label.setText(f"当前状态:{status_text}") self.status_light.setStyleSheet(f"background-color: {color}; border-radius: 10px;") time.sleep(10) threading.Thread(target=check_status, daemon=True).start() def append_log(self, full_message, log_type='default'): """ 接收全局日志并显示在 UI 中 :param full_message: 带时间戳的完整日志信息 (str) :param log_type: 日志类型 (str),如 info/warning/error/trigger 等 """ self.log_style_manager.insert_log(full_message, log_type) # 写入文件 with open(self.log_file, "a", encoding="utf-8") as f: f.write(full_message + "\n") def show_error(self, msg): QMessageBox.critical(self, "错误", msg) def show_warning(self, msg): QMessageBox.warning(self, "警告", msg) def show_info(self, title, msg): QMessageBox.information(self, title, msg) def closeEvent(self, event): reply = QMessageBox.question( self, '退出', "确定要退出吗?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) if reply == QMessageBox.Yes: event.accept() else: event.ignore() if __name__ == "__main__": app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec())