import os import tkinter as tk from tkinter import ttk, messagebox, filedialog import tushare as ts import threading import time import datetime # 新增导入 import re class StockMonitor: def __init__(self, master): self.master = master self.master.title("股票价格监控系统") ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d') self.pro = ts.pro_api() self.create_widgets() self.monitor_active = True self.start_monitor() def create_widgets(self): # 创建Treeview容器框架 tree_frame = ttk.Frame(self.master) tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True) # 创建Treeview self.tree = ttk.Treeview( tree_frame, columns=('code', 'name', 'price', 'target', 'status', 'alert_time'), show='headings', height=15 # 设置默认显示行数 ) # self.tree.tag_configure('triggered', background='red') # 新增样式配置 self.tree.tag_configure('triggered', foreground='red') # 修改样式配置 # 配置列属性 columns = [ ('code', '代码', 80), ('name', '名称', 80), ('price', '当前价', 80), ('target', '目标价', 80), ('status', '状态', 60), ('alert_time', '预警时间', 100) ] for col_id, col_text, col_width in columns: self.tree.heading(col_id, text=col_text) self.tree.column(col_id, width=col_width, anchor=tk.CENTER) # 创建滚动条 vsb = ttk.Scrollbar( tree_frame, orient="vertical", command=self.tree.yview ) self.tree.configure(yscrollcommand=vsb.set) # 布局组件 self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) vsb.pack(side=tk.RIGHT, fill=tk.Y) # 文件选择按钮(原有代码保持不变) self.btn_load = ttk.Button( self.master, text="选择监控列表文件", command=self.select_file ) self.btn_load.pack(pady=10) def select_file(self): filepath = filedialog.askopenfilename( title="选择监控列表文件", filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")] ) if filepath: # 验证文件名是否为纯日期格式 filename = os.path.basename(filepath) if not re.match(r'^\d{8}\.txt$', filename): messagebox.showerror("错误", "文件名必须为8位数字日期格式(如:20250401.txt)") return trade_date = filename.split('.')[0] self.load_stocks(filepath, trade_date) def load_stocks(self, filepath, trade_date): try: self.tree.delete(*self.tree.get_children()) with open(filepath, 'r') as f: # 读取标题行并验证列 headers = next(f).strip().split(',') required_cols = ['条件选股', '代码'] for col in required_cols: if col not in headers: raise KeyError(f"缺少必要列:{col}") # 获取列索引 name_col = headers.index('条件选股') code_col = headers.index('代码') # price_col = headers.index('成交') # 批量获取历史数据 codes = [] for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(',') if len(parts) <= max(name_col, code_col): continue code = parts[code_col].strip() code = f"{code:0>6}" # 标准化为6位代码 codes.append(self.format_code(code)) # 获取历史收盘价 df = self.pro.daily( ts_code=','.join(codes), trade_date=trade_date ) if df.empty: raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确") # 创建收盘价映射表 close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()} # 回到文件开头并处理数据 f.seek(0) next(f) # 跳过标题行 for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(',') if len(parts) <= max(name_col, code_col): raise ValueError(f"数据行字段不足:{line}") name = parts[name_col].strip() code = parts[code_col].strip() # 确保代码保持字符串格式,保留前导零 code = f"{code:0>6}" # 确保代码是6位,不足前面补零 f_code = self.format_code(code) # current_price = parts[price_col].strip() # 获取收盘价 if f_code not in close_prices: print(f"跳过 {code},未找到收盘价数据") continue close_price = close_prices[f_code] # 计算目标价 target = round(float(close_price) * 1.049, 2) self.add_stock(code, name, target) # 在加载成功后更新标题 formatted_date = f"{trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}" # 格式化为YYYY-MM-DD self.master.title(f"股票价格监控系统 - {formatted_date}") # 新增标题设置 messagebox.showinfo("成功", f"加载文件成功:{filepath}") except Exception as e: messagebox.showerror("错误", f"加载文件失败:{str(e)}") def add_stock(self, code, name, target): """添加股票到监控列表""" codes = self.format_code(str(code)) self.tree.insert('', 'end', values=(codes, name, '-', target, '监控中', '')) def format_code(self, code): # 为股票代码添加后缀 code = f"{code:0>6}" # 确保代码是6位,不足前面补零 if code.startswith(("6", "9")): f_code = f"{code}.SH" elif code.startswith(("0", "2", "3")): f_code = f"{code}.SZ" else: print(f"未知的股票代码格式: {code}") return None return f_code def update_prices(self): codes = [self.tree.item(item)['values'][0] for item in self.tree.get_children()] codes = [self.format_code(str(code)) for code in codes] required_columns = ['TS_CODE', 'LOW', 'PRICE'] try: batch_size = 40 for i in range(0, len(codes), batch_size): batch = codes[i:i+batch_size] # print(f"更新批次:{batch}") df = ts.realtime_quote(ts_code=','.join(batch)) print("所有列名:", df.columns.tolist()) print(df[required_columns]) for _, row in df.iterrows(): code = row['TS_CODE'] print(code) # current_price = row['close'] current_price = row['PRICE'] print(current_price) item = self.find_tree_item(code) if item: self.tree.set(item, 'price', f"{current_price:.2f}") target = float(self.tree.item(item)['values'][3]) print(current_price) print(target) if current_price >= target and self.tree.item(item)['values'][4] != '已触发': now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.master.after(0, self.tree.set, item, 'alert_time', now) self.tree.set(item, 'status', '已触发') self.tree.item(item, tags=('triggered',)) messagebox.showwarning( "价格提醒", f"{code} 已达到目标价!当前价:{current_price}" ) except Exception as e: print(f"更新失败:{str(e)}") def find_tree_item(self, code): for item in self.tree.get_children(): if self.tree.item(item)['values'][0] == code: return item return None def start_monitor(self): def monitor_loop(): while self.monitor_active: self.update_prices() time.sleep(10) threading.Thread(target=monitor_loop, daemon=True).start() def on_closing(self): self.monitor_active = False self.master.destroy() if __name__ == "__main__": root = tk.Tk() app = StockMonitor(root) root.protocol("WM_DELETE_WINDOW", app.on_closing) root.mainloop()