import os import sys import tkinter as tk from tkinter import ttk, messagebox, filedialog import tushare as ts import pandas as pd import threading import time import datetime # 新增导入 import re import win32gui import win32con import pyautogui from playsound import playsound import pygetwindow as gw import queue from threading import Lock from concurrent.futures import ThreadPoolExecutor from order_executor import OrderExecutor from logger_utils import setup_logger from logger_utils import setup_logger, LOG_STYLES # 导入统一样式映射 from logger_utils import log_info, log_warning, log_error, log_trigger, log_debug class StockMonitor: def __init__(self, master): # 设置全局日志回调 self.master = master self.master.title("股票价格监控系统") # # 创建日志输出框 log_frame = ttk.Frame(self.master) log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, padx=5, pady=5) # self.log_label = ttk.Label(log_frame, text="日志输出:", font=('微软雅黑', 12)) self.log_text = tk.Text(log_frame, wrap=tk.WORD, height=10, state=tk.DISABLED, font=('微软雅黑', 11), fg='blue') scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) self.log_text.config(yscrollcommand=scrollbar.set) setup_logger(self.append_log) # 注册主程序的日志回调函数 # 【关键点】自动加载所有日志 tag 样式 for tag, style in LOG_STYLES.items(): self.log_text.tag_configure(tag, **style) # 注册全局日志回调 setup_logger(self.append_log) self.log_queue = queue.Queue() self.start_log_processor() ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d') self.pro = ts.pro_api() self.monitor_lock = Lock() self.timer_lock = Lock() self.thread_pool = ThreadPoolExecutor(max_workers=4) # 新增线程池 self.executor = ThreadPoolExecutor(max_workers=2) # 添加这行 self.float_share_cache = {} # 流通盘缓存 self.monitor_lock = threading.Lock() # 缓存锁 self.error_count = 0 # 初始化错误计数器 self.max_errors = 5 # 最大错误次数 self.pause_on_error = False # 是否暂停刷新 self.time_interval = 5 # 监控间隔时间(秒) self.MIN_OPEN_CHANGE = -2.5 # 最小开盘涨幅(%) self.MAX_OPEN_CHANGE = 3 # 最大开盘涨幅(%) self.out_path = r"D:\gp_data" # 输出目录 self.TARGET_RATIO = 1.049 # 目标涨幅比例 self.limit_up_cache = {} # 新增涨停次数缓存 self.auto_push_var = tk.IntVar(value=0) # 自动下单开关 self.auto_check_var = tk.IntVar(value=0) # 自动检查开关 self.auto_export_var = tk.IntVar(value=1) # 自动检查开关 self.create_widgets() self.monitor_active = True self.timer_active = False # 新增计时器状态标志 self.float_share_cache = {} # 新增流通盘数据缓存 self.float_share_cache_max = 1000 # 添加缓存上限 self.start_monitor() self.trigger_count = 0 self.previous_trading_status = None self.start_status_update() # 添加触发提醒标签 self.alert_label = tk.Label( self.master, text="", font=('微软雅黑', 50, 'bold'), fg='red', bg='yellow', width=20, # 宽度(字符数) height=4 # 高度(行数) ) self.alert_label.place(relx=0.5, rely=0.5, anchor='center') self.alert_label.place_forget() # 初始隐藏 self.flash_count = 0 self.max_flash = 0 # 闪动次数 self.target_window_title = "东方财富终端" # 交易窗口的名称 # 构建界面 def create_widgets(self): # 创建Treeview容器框架 tree_frame = ttk.Frame(self.master) tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True) # 设置字体和样式 style = ttk.Style() style.configure('Larger.TButton', font=('微软雅黑', 12)) # 创建Treeview self.tree = ttk.Treeview( tree_frame, columns=('code', 'name', 'open_zf', 'lt_pan', 'price', 'target', 'profit_pct', 'status', 'alert_time'), show='headings', height=20 # 设置默认显示行数 ) # 配置样式 style = ttk.Style() style.configure('Treeview', rowheight=20) # 设置行高 # 文字新增样式配置 self.tree.tag_configure('triggered', foreground='red', font=('Verdana', 12, 'bold')) # 修改样式配置 self.tree.tag_configure('wrong_buy', foreground='green', font=('Verdana', 11, 'bold')) # 修改样式配置 # 添加双击事件绑定 # 修改绑定方式为使用 lambda 忽略事件参数 self.tree.bind('', lambda e: self.place_order()) # 配置列属性 columns = [ ('code', '代码', 80), ('name', '名称', 80), ('open_zf', '开盘涨幅', 80), ('lt_pan', '流通盘', 80), ('price', '当前价', 80), ('target', '目标价', 80), ('profit_pct', '获利%', 60), # 新增列 ('status', '状态', 60), ('alert_time', '预警时间', 200) ] for col_id, col_text, col_width in columns: self.tree.heading(col_id, text=col_text, command=lambda c=col_id: self.treeview_sort_column(c, False)) self.tree.column(col_id, width=col_width, anchor=tk.CENTER) # 创建滚动条 vsb = ttk.Scrollbar( tree_frame, orient="vertical", command=self.tree.yview ) self.tree.configure(yscrollcommand=vsb.set) # 布局组件 self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) vsb.pack(side=tk.RIGHT, fill=tk.Y) # 创建控制按钮框架 control_frame = ttk.Frame(self.master) control_frame.pack(pady=20) # 文件选择按钮(原有代码保持不变) self.btn_load = ttk.Button( control_frame, text="选择监控文件", command=self.select_file, style='Larger.TButton', # 应用样式 width=18 # 新增宽度设置 ) self.btn_load.pack(side=tk.LEFT, padx=(5, 15)) # 显式指定侧边排列 self.btn_save = ttk.Button( control_frame, text="保存结果", command=self.export_to_excel, style='Larger.TButton', # 应用样式 width=18 # 新增宽度设置 ) self.btn_save.pack(side=tk.LEFT, padx=(5, 15)) # 显式指定侧边排列 # 窗口置顶复选框 self.topmost_var = tk.BooleanVar() self.topmost_cb = ttk.Checkbutton( control_frame, text="窗口置顶", variable=self.topmost_var, command=self.toggle_topmost ) self.topmost_cb.pack(side=tk.LEFT, padx=5) # 复选框-自动保存treeview结果 self.auto_export_cb = ttk.Checkbutton( control_frame, text="自动保存结果", variable=self.auto_export_var, # state=tk.ENABLED ) self.auto_export_cb.pack(side=tk.LEFT, padx=5) # 复选框-自动保存treeview结果 self.auto_check_cb = ttk.Checkbutton( control_frame, text="调试", variable=self.auto_check_var, # state=tk.ENABLED ) self.auto_check_cb.pack(side=tk.LEFT, padx=5) # 复选框-自动下单 self.auto_push_cb = ttk.Checkbutton( control_frame, text="半自动下单", variable=self.auto_push_var, state=tk.NORMAL ) self.auto_push_cb.pack(side=tk.LEFT, padx=5) # 显示触发股票数 status_frame = ttk.Frame(self.master) status_frame.pack(pady=5, fill=tk.X) # 创建数量显示Label self.trigger_count_label = ttk.Label( status_frame, text="已触发股票数量:0", font=('微软雅黑', 14), foreground='red' ) self.trigger_count_label.pack(padx=10, pady=5, side=tk.LEFT, anchor=tk.W) # 在status_frame中添加市场状态显示 self.market_status_label = ttk.Label( status_frame, text="市场状态:-", font=('微软雅黑', 14), foreground='blue' ) self.market_status_label.pack(padx=20, pady=5, side=tk.LEFT, anchor=tk.W) # 开盘时间显示 self.trading_status_label = ttk.Label( status_frame, text="当前状态:非交易时间", font=('微软雅黑', 14), foreground='red' ) self.trading_status_label.pack(padx=10, pady=5, side=tk.RIGHT, anchor=tk.E) # 添加状态灯(Canvas) self.market_status_light = tk.Canvas(status_frame, width=20, height=20, bg='white', highlightthickness=0) self.market_status_light.create_oval(2, 2, 18, 18, fill='gray', tags='light') self.market_status_light.pack(padx=5, side=tk.RIGHT) # 最后更新时间标签 self.last_update_label = ttk.Label( status_frame, text="最后更新: --:--:--", font=('微软雅黑', 14), foreground='gray' ) self.last_update_label.pack(padx=5, side=tk.RIGHT) # 在control_frame内添加定时监控组件 timing_frame = ttk.Frame(control_frame) timing_frame.pack(side=tk.LEFT, padx=(5, 15)) # 时间输入框 timing_label = ttk.Label(timing_frame, text="定时时间:") timing_label.pack(side=tk.LEFT) self.timing_time = tk.StringVar() self.timing_time.set("10:30") # 设置默认值为 9:30 self.timing_entry = ttk.Entry(timing_frame, textvariable=self.timing_time, width=8) self.timing_entry.pack(side=tk.LEFT) # 定时监控复选框 self.timing_enabled = tk.BooleanVar() self.timing_checkbox = ttk.Checkbutton( control_frame, text="定时结束监控", variable=self.timing_enabled, state=tk.NORMAL # state = tk.DISABLED ) self.timing_checkbox.pack(side=tk.LEFT, padx=5) def clean_cache(self): with self.monitor_lock: # 清理超过1000条记录的缓存 if len(self.float_share_cache) > self.float_share_cache_max: oldest_keys = sorted(self.float_share_cache.keys())[:100] for key in oldest_keys: del self.float_share_cache[key] def toggle_topmost(self): """切换窗口置顶状态""" self.master.attributes('-topmost', self.topmost_var.get()) if self.topmost_var.get(): log_info("窗口已置顶") else: log_info("取消窗口置顶") def get_limit_up_count(self, code, days=10): """获取最近days天内涨停次数""" with self.monitor_lock: # 新增锁机制 if code in self.limit_up_cache: return self.limit_up_cache[code] try: end_date = datetime.datetime.now().strftime('%Y%m%d') start_date = (datetime.datetime.now() - datetime.timedelta(days=days)).strftime('%Y%m%d') df = self.pro.daily( ts_code=code, start_date=start_date, end_date=end_date, freq='D' ) if df.empty: return 0 # 涨停条件:当日收盘价等于当日最高价,且涨幅>=9.8% df['is_limit_up'] = (df['close'] == df['high']) & ((df['close'] / df['pre_close'] - 1) >= 0.098) return df['is_limit_up'].sum() except Exception as e: log_error(f"获取涨停次数失败: {str(e)}") return 0 # 从缓存中获取 self.limit_up_cache[code] = count return count def check_buy_conditions(self, code, current_price, open_price, pre_close, high_price): """检查所有买入条件""" open_zf = round((float(open_price) - float(pre_close)) / float(pre_close) * 100, 2) # 基础条件 is_within_open_range = (self.MIN_OPEN_CHANGE <= open_zf <= self.MAX_OPEN_CHANGE) is_reached_target = (float(current_price) >= float(pre_close) * self.TARGET_RATIO) return is_within_open_range and is_reached_target def place_order(self): selection = self.tree.selection() if not selection: log_warning("未选中任何股票") return item = selection[0] code = self.tree.item(item, 'values')[0] pure_code = code.split('.')[0] log_info(f"选中股票代码: {pure_code}") try: auto_flag = self.auto_push_var.get() executor = OrderExecutor() executor.place_order(pure_code, auto_flag) except Exception as e: log_error(f"下单失败: {str(e)}") # treeview 排序 def treeview_sort_column(self, col, reverse): """点击表头排序功能""" # 获取当前所有行数据 l = [(self.tree.set(k, col), k) for k in self.tree.get_children('')] # 特殊处理状态列排序 if col == 'status': # 修改优先级顺序,已触发排最前,错买其次,监控中最后 priority_order = {'已触发': 0, '错买': 1, '监控中': 2} l.sort(key=lambda t: priority_order.get(t[0], 3), reverse=reverse) else: # 尝试转换为数字排序 try: l.sort(key=lambda t: float(t[0].replace('%', '')), reverse=reverse) except (ValueError, AttributeError): l.sort(reverse=reverse) # 重新排列Treeview中的行 for index, (val, k) in enumerate(l): self.tree.move(k, '', index) # 反转排序顺序 self.tree.heading(col, command=lambda: self.treeview_sort_column(col, not reverse)) def _do_append_log(self, info='message', log_type='default'): """实际执行日志插入操作的方法""" self.log_text.config(state=tk.NORMAL) # 规范化 log_type log_type = log_type.lower() valid_tags = ['default', 'info', 'loading', 'warning', 'error', 'trigger', 'debug'] if log_type not in valid_tags: log_type = 'default' self.log_text.insert(tk.END, f"{info}\n", (log_type,)) self.log_text.see(tk.END) self.log_text.config(state=tk.DISABLED) def start_log_processor(self): def log_worker(): while True: record = self.log_queue.get() if record is None: break self._do_append_log(**record) self.log_queue.task_done() threading.Thread(target=log_worker, daemon=True).start() # 输出log def append_log(self, info='message', log_type='default'): self.log_queue.put({'info': info, 'log_type': log_type}) def flash_alert(self, code, price): """显示闪动提醒""" self.alert_label.config(text=f"{code}\n触发! {price}") self.alert_label.place(relx=0.5, rely=0.5, anchor='center') self.flash_count = 0 self.do_flash() def do_flash(self): """执行闪动动画""" if self.flash_count < self.max_flash * 2: # 每次闪动包含显示和隐藏 if self.flash_count % 2 == 0: self.alert_label.place_forget() else: self.alert_label.place(relx=0.5, rely=0.5, anchor='center') self.flash_count += 1 self.master.after(500, self.do_flash) # 每500毫秒切换一次 else: self.alert_label.place_forget() def start_timer(self, message): """启动计时器,每秒输出日志""" self.timer_active = True def timer_loop(): count = 1 while self.timer_active: log_warning(f"{message}...{count}..等待") count += 1 time.sleep(1) threading.Thread(target=timer_loop, daemon=True).start() def stop_timer(self): """停止计时器""" self.timer_active = False # 选中文件读取 def safe_update_tree_item(self, item, values): """线程安全更新treeview条目""" if self.master and hasattr(self.master, '_windowingsystem'): # 更安全的检查 try: self.master.after(0, lambda: self.tree.item(item, values=values) if hasattr(self, 'tree') else None) except Exception as e: print(f"安全更新失败: {e}") # 添加错误打印 def select_file(self): filepath = filedialog.askopenfilename( title="选择监控列表文件", filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")] ) if filepath: # 验证文件名是否为纯日期格式 filename = os.path.basename(filepath) if not re.match(r'^\d{8}\.txt$', filename): messagebox.showerror("错误", "文件名必须为8位数字日期格式(如:20250401.txt)") return trade_date = filename.split('.')[0] self.load_stocks(filepath, trade_date) # 判断是否勾选定时监控 if self.timing_enabled.get(): time_str = self.timing_time.get().strip() if not self.validate_time(time_str): messagebox.showerror("错误", "时间格式应为 HH:MM(如:09:30)") return # 将时间字符串转换为 datetime 对象 target_time = datetime.datetime.strptime(time_str, "%H:%M").time() # 模拟日期,假设为当前日期 target_datetime = datetime.datetime.combine(datetime.datetime.now().date(), target_time) # 判断是否在开盘时间内 if not self.is_trading_time_at(target_datetime): log_trigger(f"定时时间 {time_str} 不在开盘时间中") else: log_trigger(f"定时时间 {time_str} 开始监控") self.start_monitor_at_time(time_str) else: self.start_monitor() def is_trading_time_at(self, target_datetime): """判断指定时间是否为交易日的开盘时间""" current_time = target_datetime.time() weekday = target_datetime.weekday() # 0-4 是周一到周五 return ( weekday < 5 and # 仅工作日 ( (datetime.time(9, 30) <= current_time <= datetime.time(11, 30)) or (datetime.time(13, 0) <= current_time <= datetime.time(15, 0)) ) ) def load_stocks(self, filepath, trade_date): # 判断是否是当日的股票数据,如果是就不操作,因为监控的是昨天或者以前的数据,不能是当天的 today = datetime.datetime.now().strftime("%Y%m%d") if trade_date == today: log_error(f"尝试加载当日股票数据:{filepath}, 请加载昨天或者以前的日期") return try: self.tree.delete(*self.tree.get_children()) with open(filepath, 'r') as f: # 读取标题行并验证列 headers = next(f).strip().split(',') required_cols = ['条件选股', '代码'] for col in required_cols: if col not in headers: raise KeyError(f"缺少必要列:{col}") # 获取列索引 name_col = headers.index('条件选股') code_col = headers.index('代码') # 批量获取历史数据 codes = [] for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(',') if len(parts) <= max(name_col, code_col): continue code = parts[code_col].strip() code = f"{code:0>6}" # 标准化为6位代码 codes.append(self.format_code(code)) # 获取历史收盘价 df = self.pro.daily(ts_code=','.join(codes), trade_date=trade_date) if df.empty: raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确") # 创建收盘价映射表 close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()} # 启动线程获取流通盘数据(只调用一次) self.executor.submit(self.fetch_share_capital_data, codes) # 回到文件开头并处理数据 f.seek(0) next(f) # 跳过标题行 for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(',') if len(parts) <= max(name_col, code_col): raise ValueError(f"数据行字段不足:{line}") name = parts[name_col].strip() code = parts[code_col].strip() code = f"{code:0>6}" # 确保代码是6位,不足前面补零 f_code = self.format_code(code) # 获取收盘价 if f_code not in close_prices: print(f"跳过 {code},未找到收盘价数据") continue close_price = close_prices[f_code] target = round(float(close_price) * 1.049, 2) # 从缓存获取流通盘数据 lt_pan = 0 # 初始值,实际值会在异步线程中更新 self.add_stock(code, name, target, lt_pan) # 格式化显示日期 formatted_date = f"{trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}" self.master.title(f"股票价格监控系统 - {formatted_date}") log_info(f"加载文件成功:{filepath}") except Exception as e: log_error(f"加载文件失败:{str(e)}") # def batch_get_share_capital(self, trade_date, codes=None): # """批量获取流通盘数据""" # self.start_timer('获取流通盘数据') # # def fetch_data(): # try: # dc_df = ts.realtime_list(src='dc') # if dc_df.empty: # raise ValueError("未找到股本数据") # # # 创建代码到流通市值的映射(单位:亿) # float_mv_dict = { # row['TS_CODE']: row['FLOAT_MV'] / 100000000 # for _, row in dc_df.iterrows() # } # # # 停止计时器 # self.stop_timer() # # # 在主线程中更新UI # self.master.after(0, update_ui, float_mv_dict) # # except Exception as e: # log_error(f"批量获取流通盘数据失败:{str(e)},10秒后重试") # self.stop_timer() # self.master.after(10000, lambda: self.batch_get_share_capital(trade_date, codes)) # # def update_ui(float_mv_dict): # # 更新缓存数据 # self.float_share_cache.update(float_mv_dict) # # # 更新UI # for item in self.tree.get_children(): # code = self.tree.item(item, 'values')[0] # lt_pan = self.float_share_cache.get(code, 0) # self.tree.set(item, 'lt_pan', f"{lt_pan:.2f}亿") # log_info("流通盘数据更新完成") # # # 启动线程获取数据 # threading.Thread(target=fetch_data, daemon=True).start() def fetch_share_capital_data(self, codes): """异步获取流通盘数据(运行在子线程)""" try: dc_df = ts.realtime_list(src='dc') if dc_df.empty: raise ValueError("未找到股本数据") float_mv_dict = { row['TS_CODE']: row['FLOAT_MV'] / 100000000 for _, row in dc_df.iterrows() } # 更新缓存 with self.monitor_lock: self.float_share_cache.update(float_mv_dict) # 触发UI更新(必须在主线程) self.master.after(0, self.update_share_capital_ui, codes) except Exception as e: log_error(f"批量获取流通盘数据失败:{str(e)}") # 可选:10秒后重试 self.master.after(10000, lambda: self.fetch_share_capital_data(codes)) def add_stock(self, code, name, target, lt_pan=0): """添加股票到监控列表""" codes = self.format_code(str(code)) self.tree.insert('', 'end', values=( codes, name, '-', f"{lt_pan:.2f}", '-', target, '-', '监控中', '' # 初始显示为'-',等待异步加载 )) # 检查是否为交易日 def is_trading_time(self): now = datetime.datetime.now() current_time = now.time() weekday = now.weekday() # 0-4是周一到周五 return ( weekday < 5 and # 仅工作日 ( (datetime.time(9, 30) <= current_time <= datetime.time(11, 30)) or (datetime.time(13, 0) <= current_time <= datetime.time(15, 0)) ) ) def update_trading_status(self): current_status = "开盘中" if self.is_trading_time() else "收盘时间" color = "green" if current_status == "开盘中" else "red" # 状态变更时才记录日志 if current_status != self.previous_trading_status: log_info(f"系统状态更新:{current_status}") self.previous_trading_status = current_status # 变更UI self.master.after(0, lambda s=current_status, c=color: self.trading_status_label.config(text=f"当前状态:{s}", foreground=c)) def update_share_capital_ui(self, codes): """在主线程中更新 Treeview 的流通盘信息""" try: for item in self.tree.get_children(): code = self.tree.item(item, 'values')[0] lt_pan = self.float_share_cache.get(code, 0) self.tree.set(item, 'lt_pan', f"{lt_pan:.2f}亿") log_info("流通盘数据更新完成") except Exception as e: log_error(f"更新流通盘UI时发生错误:{str(e)}") # 监控状态 def start_status_update(self): def _loop(): while self.monitor_active: self.update_trading_status() time.sleep(1) # 每秒检查一次 threading.Thread(target=_loop, daemon=True).start() def validate_time(self, time_str): try: datetime.datetime.strptime(time_str, "%H:%M") return True except ValueError: return False def on_timing_change(self, *args): if self.timing_enabled.get(): time_str = self.timing_time.get().strip() if not self.validate_time(time_str): messagebox.showerror("错误", "时间格式应为 HH:MM(如:09:30)") self.timing_enabled.set(False) return self.start_monitor_at_time(time_str) else: self.monitor_active = True self.start_monitor() def start_monitor_at_time(self, target_time): now = datetime.datetime.now() target = datetime.datetime.strptime(target_time, "%H:%M") target = now.replace( hour=target.hour, minute=target.minute, second=0, microsecond=0 ) if target < now: target += datetime.timedelta(days=1) delta = target - now threading.Timer(delta.total_seconds(), self.start_monitor).start() # 格式化代码 def format_code(self, code): # 为股票代码添加后缀 code = f"{code:0>6}" # 确保代码是6位,不足前面补零 if code.startswith(("6", "9")): f_code = f"{code}.SH" elif code.startswith(("0", "2", "3")): f_code = f"{code}.SZ" else: print(f"未知的股票代码格式: {code}") return None return f_code def play_audio(self, file_path): try: if not os.path.exists(file_path): raise FileNotFoundError(f"文件路径不存在: {file_path}") # 使用playsound库静默播放 playsound(file_path, block=False) # block=False表示非阻塞播放 except FileNotFoundError as e: log_error(f"音频文件错误: {e}") except Exception as e: log_error(f"播放音频失败: {e}") def fetch_batch_data(self, batch): """并发获取单个批次的行情数据""" try: df = ts.realtime_quote(ts_code=','.join(batch)) if df is None or df.empty: return None return {row['TS_CODE']: row for _, row in df.iterrows()} except Exception as e: log_error(f"获取行情失败(批次): {str(e)}") return None def process_all_results(self, all_results): """批量处理行情数据并准备更新项""" updates = [] triggered_items = [] # 提前获取列索引 status_col = self.tree['columns'].index('status') target_col = self.tree['columns'].index('target') price_col = self.tree['columns'].index('price') profit_pct_col = self.tree['columns'].index('profit_pct') open_zf_col = self.tree['columns'].index('open_zf') alert_time_col = self.tree['columns'].index('alert_time') now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") for code, row in all_results.items(): current_price = float(row['PRICE']) open_price = float(row['OPEN']) pre_close = float(row['PRE_CLOSE']) high_price = float(row['HIGH']) item = self.find_tree_item(code) if not item: continue values = list(self.tree.item(item, 'values')) # 计算开盘涨幅 open_zf = round((open_price - pre_close) / pre_close * 100, 2) values[open_zf_col] = f"{open_zf:.2f}%" values[price_col] = f"{current_price:.2f}" # 计算获利比例 target = float(values[target_col]) profit_pct = round((current_price - target) / target * 100, 2) values[profit_pct_col] = f"{profit_pct:.2f}%" # 判断买入条件 buy_condition = self.check_buy_conditions(code, current_price, open_price, pre_close, high_price) # 正常触发条件 if buy_condition and values[status_col] != '已触发': values[alert_time_col] = now values[status_col] = '已触发' log_trigger(f"{code} 已触发!当前价:{current_price}") triggered_items.append(code) # 自动下单 if self.auto_push_var.get(): self.place_order() # 错买判断 was_above_target = high_price >= target is_current_below = current_price < target if was_above_target and is_current_below and values[status_col] != '错买': values[status_col] = '错买' log_warning(f"{code} 价格回落!错买,当前价:{current_price}") updates.append((item, values)) # 批量更新 UI if updates: self.master.after(0, self.batch_safe_update_tree_items, updates) # 统计并更新状态 if updates: triggered_count = sum(1 for _, vals in updates if vals[status_col] in ('已触发', '错买')) self.master.after(0, lambda: self.trigger_count_label.config( text=f"已触发股票数量:{triggered_count}" )) total_stocks = len(updates) if total_stocks > 0: trigger_percent = (triggered_count / total_stocks) * 100 market_status = self.get_market_status(trigger_percent) self.master.after(0, lambda: self.market_status_label.config( text=f"市场状态:{market_status}", foreground=self.get_market_status_color(market_status) )) # ✅ 新增:在数据更新后重新按状态排序 self.treeview_sort_column('status', False) def batch_safe_update_tree_items(self, items_values_list): """批量安全更新 Treeview 行""" for item, values in items_values_list: self._update_tree_item_safe(item, values) def update_prices(self): if not self.is_trading_time() and not self.auto_check_var.get(): if not getattr(self, '_logged_market_close', False): log_info("当前为收盘时间,停止更新行情数据") self._logged_market_close = True return else: if getattr(self, '_logged_market_close', False): del self._logged_market_close log_info("已进入交易时间或调试模式,恢复行情更新") if self.pause_on_error: log_error("暂停更新,等待恢复") time.sleep(30) self.pause_on_error = False return if not hasattr(self, 'tree') or not self.tree.winfo_exists(): return items = self.tree.get_children() if not items: return codes = [self.tree.item(item)['values'][0] for item in items] formatted_codes = [self.format_code(str(code)) for code in codes if code] if not formatted_codes: log_error("没有可监控的股票代码") return batch_size = 40 try: with ThreadPoolExecutor(max_workers=4) as executor: futures = [] for i in range(0, len(formatted_codes), batch_size): if not self.monitor_active: return batch = formatted_codes[i:i + batch_size] futures.append(executor.submit(self.fetch_batch_data, batch)) all_results = {} for future in futures: result = future.result() if result: all_results.update(result) self.process_all_results(all_results) now = datetime.datetime.now() self.last_update_label.config(text=f"最后更新: {now.strftime('%H:%M:%S')}") self.market_status_light.itemconfig('light', fill='lime') self.master.after(500, lambda: self.market_status_light.itemconfig('light', fill='gray')) except Exception as e: self.error_count += 1 if self.error_count >= self.max_errors: log_error("连续错误过多,暂停更新30秒") self.pause_on_error = True self.error_count = 0 log_error(f"更新价格严重错误: {str(e)}") import traceback traceback.print_exc() def _update_tree_item_safe(self, item, values): """实际更新 Treeview 行的方法""" if not item or not self.tree.exists(item): return try: status_col = self.tree['columns'].index('status') current_status = values[status_col] # 设置 tag if current_status == '已触发': self.tree.item(item, tags=('triggered',)) elif current_status == '错买': self.tree.item(item, tags=('wrong_buy',)) else: self.tree.item(item, tags=()) # 更新整行数据 self.tree.item(item, values=values) except Exception as e: log_error(f"更新 Treeview 条目失败: {str(e)}") def find_tree_item(self, code): for item in self.tree.get_children(): if self.tree.item(item)['values'][0] == code: return item return None def start_monitor(self): if self.timing_enabled.get(): self.monitor_active = True else: self.monitor_active = True # 非定时模式直接启动 self.thread_pool.submit(self.monitor_loop) def monitor_loop(self): while self.monitor_active: self.update_prices() time.sleep(self.time_interval) def get_market_status(self, percent): """根据触发百分比返回市场状态""" if percent == 0: return "极弱" elif percent <= 15: return "弱势" elif percent <= 20: return "中等" else: return "活跃" @staticmethod def get_market_status_color(status): """根据市场状态返回颜色""" return { "极弱": "red", "弱势": "orange", "中等": "blue", "活跃": "green" }.get(status, "black") def export_to_excel(self): """将Treeview数据导出到Excel文件""" try: # 检查Treeview是否为空 if not self.tree.get_children(): tk.messagebox.showwarning("导出警告", "当前没有可导出的数据!") return # 获取当前日期作为默认文件名 now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(self.out_path, f"短线速逆数据_{now}.xlsx") # 获取Treeview所有数据 data = [] columns = self.tree["columns"] data.append(columns) # 添加表头 for item in self.tree.get_children(): values = [self.tree.set(item, col) for col in columns] data.append(values) # 创建DataFrame并保存为Excel df = pd.DataFrame(data[1:], columns=data[0]) df.to_excel(filename, index=False) # 提示导出成功 tk.messagebox.showinfo("导出成功", f"数据已成功导出到: {filename}") except Exception as e: tk.messagebox.showerror("导出错误", f"导出失败: {str(e)}") def on_closing(self): try: self.monitor_active = False # 停止所有后台循环 self.executor.shutdown(wait=False) self.thread_pool.shutdown(wait=False) # 取消所有 pending 的 after 事件 for job in self.master.tk.eval('after info').split(): self.master.after_cancel(job) time.sleep(0.2) # 给线程一点时间退出 if hasattr(self, 'master') and self.master.winfo_exists(): self.master.quit() self.master.destroy() except Exception as e: log_error(f"关闭程序失败: {str(e)}") sys.exit(0) if __name__ == "__main__": try: root = tk.Tk() app = StockMonitor(root) root.protocol("WM_DELETE_WINDOW", app.on_closing) root.mainloop() except Exception as e: print(f"程序发生异常: {str(e)}") import traceback traceback.print_exc() # 打印完整堆栈信息 sys.exit(1)