import os import tkinter as tk from tkinter import ttk, messagebox, filedialog import tushare as ts import threading import time import datetime # 新增导入 import re class StockMonitor: def __init__(self, master): self.master = master self.master.title("股票价格监控系统") ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d') self.pro = ts.pro_api() self.time_interval = 10 # 监控间隔时间(秒) self.auto_push_var = tk.IntVar(value=0) # 自动下单开关 self.create_widgets() self.monitor_active = True self.start_monitor() # 构建界面 def create_widgets(self): # 创建Treeview容器框架 tree_frame = ttk.Frame(self.master) tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True) style = ttk.Style() style.configure('Larger.TButton', font=('微软雅黑', 12)) # 创建Treeview self.tree = ttk.Treeview( tree_frame, columns=('code', 'name', 'price', 'target', 'status', 'alert_time'), show='headings', height=20 # 设置默认显示行数 ) # self.tree.tag_configure('triggered', background='red') # 新增样式配置 self.tree.tag_configure('triggered', foreground='red', font=('Verdana', 12, 'bold')) # 修改样式配置 # 配置列属性 columns = [ ('code', '代码', 80), ('name', '名称', 80), ('price', '当前价', 80), ('target', '目标价', 80), ('status', '状态', 60), ('alert_time', '预警时间', 180) ] for col_id, col_text, col_width in columns: self.tree.heading(col_id, text=col_text) self.tree.column(col_id, width=col_width, anchor=tk.CENTER) # 创建滚动条 vsb = ttk.Scrollbar( tree_frame, orient="vertical", command=self.tree.yview ) self.tree.configure(yscrollcommand=vsb.set) # 布局组件 self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) vsb.pack(side=tk.RIGHT, fill=tk.Y) # 创建控制按钮框架 control_frame = ttk.Frame(self.master) control_frame.pack(pady=10) # 文件选择按钮(原有代码保持不变) self.btn_load = ttk.Button( control_frame, text="选择监控文件", command=self.select_file, style='Larger.TButton' # 应用样式 ) # self.btn_load.pack(pady=10) self.btn_load.pack(side=tk.LEFT, padx=5) # 显式指定侧边排列 # 添加复选框 self.auto_push_cb = ttk.Checkbutton( control_frame, text="自动下单", variable=self.auto_push_var ) self.auto_push_cb.pack(side=tk.LEFT, padx=5) def select_file(self): filepath = filedialog.askopenfilename( title="选择监控列表文件", filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")] ) if filepath: # 验证文件名是否为纯日期格式 filename = os.path.basename(filepath) if not re.match(r'^\d{8}\.txt$', filename): messagebox.showerror("错误", "文件名必须为8位数字日期格式(如:20250401.txt)") return trade_date = filename.split('.')[0] self.load_stocks(filepath, trade_date) def load_stocks(self, filepath, trade_date): try: self.tree.delete(*self.tree.get_children()) with open(filepath, 'r') as f: # 读取标题行并验证列 headers = next(f).strip().split(',') required_cols = ['条件选股', '代码'] for col in required_cols: if col not in headers: raise KeyError(f"缺少必要列:{col}") # 获取列索引 name_col = headers.index('条件选股') code_col = headers.index('代码') # price_col = headers.index('成交') # 批量获取历史数据 codes = [] for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(',') if len(parts) <= max(name_col, code_col): continue code = parts[code_col].strip() code = f"{code:0>6}" # 标准化为6位代码 codes.append(self.format_code(code)) # 获取历史收盘价 df = self.pro.daily( ts_code=','.join(codes), trade_date=trade_date ) if df.empty: raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确") # 创建收盘价映射表 close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()} # 回到文件开头并处理数据 f.seek(0) next(f) # 跳过标题行 for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(',') if len(parts) <= max(name_col, code_col): raise ValueError(f"数据行字段不足:{line}") name = parts[name_col].strip() code = parts[code_col].strip() # 确保代码保持字符串格式,保留前导零 code = f"{code:0>6}" # 确保代码是6位,不足前面补零 f_code = self.format_code(code) # current_price = parts[price_col].strip() # 获取收盘价 if f_code not in close_prices: print(f"跳过 {code},未找到收盘价数据") continue close_price = close_prices[f_code] # 计算目标价 target = round(float(close_price) * 1.049, 2) self.add_stock(code, name, target) # 在加载成功后更新标题 formatted_date = f"{trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}" # 格式化为YYYY-MM-DD self.master.title(f"股票价格监控系统 - {formatted_date}") # 新增标题设置 messagebox.showinfo("成功", f"加载文件成功:{filepath}") except Exception as e: messagebox.showerror("错误", f"加载文件失败:{str(e)}") def add_stock(self, code, name, target): """添加股票到监控列表""" codes = self.format_code(str(code)) self.tree.insert('', 'end', values=(codes, name, '-', target, '监控中', '')) def format_code(self, code): # 为股票代码添加后缀 code = f"{code:0>6}" # 确保代码是6位,不足前面补零 if code.startswith(("6", "9")): f_code = f"{code}.SH" elif code.startswith(("0", "2", "3")): f_code = f"{code}.SZ" else: print(f"未知的股票代码格式: {code}") return None return f_code def update_prices(self): codes = [self.tree.item(item)['values'][0] for item in self.tree.get_children()] codes = [self.format_code(str(code)) for code in codes] required_columns = ['TS_CODE', 'LOW', 'PRICE'] try: batch_size = 40 for i in range(0, len(codes), batch_size): batch = codes[i:i+batch_size] print(f"更新批次:{batch}") df = ts.realtime_quote(ts_code=','.join(batch)) # print("所有列名:", df.columns.tolist()) # print(df[required_columns]) for _, row in df.iterrows(): code = row['TS_CODE'] # print(code) # current_price = row['close'] current_price = row['PRICE'] # print(current_price) item = self.find_tree_item(code) if item: self.tree.set(item, 'price', f"{current_price:.2f}") target = float(self.tree.item(item)['values'][3]) # print(current_price) # print(target) if current_price >= target and self.tree.item(item)['values'][4] != '已触发': now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.master.after(0, self.tree.set, item, 'alert_time', now) # 自动下单 if self.auto_push_var.get(): self.autopush(item) self.tree.set(item, 'status', '已触发') self.tree.item(item, tags=('triggered',)) messagebox.showwarning( "价格提醒", f"{code} 已达到目标价!当前价:{current_price}" ) except Exception as e: print(f"更新失败:{str(e)}") def find_tree_item(self, code): for item in self.tree.get_children(): if self.tree.item(item)['values'][0] == code: return item return None def start_monitor(self): def monitor_loop(): while self.monitor_active: self.update_prices() time.sleep(self.time_interval) threading.Thread(target=monitor_loop, daemon=True).start() def on_closing(self): self.monitor_active = False self.master.destroy() # 扩展自动下单: def autopush(self, item): code = self.tree.item(item)['values'][0] current_price = float(self.tree.item(item)['values'][2]) print(code) confirm = messagebox.askyesno( "确认下单", f"确认为 {code} 在当前价 {current_price} 自动买入吗?" ) if confirm: # 调用真实交易接口示例: # result = trade_api.sell(code, current_price) # ... 处理下单结果 ... messagebox.showinfo("下单成功", "已执行自动买入操作") if __name__ == "__main__": root = tk.Tk() app = StockMonitor(root) root.protocol("WM_DELETE_WINDOW", app.on_closing) root.mainloop()