import os import sys import subprocess import tkinter as tk from tkinter import ttk, messagebox, filedialog import tushare as ts import threading import time import datetime # 新增导入 import re class StockMonitor: def __init__(self, master): self.master = master self.master.title("股票价格监控系统") ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d') self.pro = ts.pro_api() self.time_interval = 10 # 监控间隔时间(秒) self.auto_push_var = tk.IntVar(value=0) # 自动下单开关 self.create_widgets() self.monitor_active = True self.start_monitor() self.trigger_count = 0 self.previous_trading_status = None self.start_status_update() # 构建界面 def create_widgets(self): # 创建Treeview容器框架 tree_frame = ttk.Frame(self.master) tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True) # tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True, side=tk.LEFT) # 设置字体和样式 style = ttk.Style() style.configure('Larger.TButton', font=('微软雅黑', 12)) # style.configure('Log.TText', font=('微软雅黑', 12)) # # 创建日志输出框 log_frame = ttk.Frame(self.master) log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, padx=5, pady=5) # self.log_label = ttk.Label(log_frame, text="日志输出:", font=('微软雅黑', 12)) self.log_text = tk.Text(log_frame, wrap=tk.WORD, height=10, state=tk.DISABLED, font=('微软雅黑', 11), fg='blue') scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) self.log_text.config(yscrollcommand=scrollbar.set) # 创建Treeview self.tree = ttk.Treeview( tree_frame, columns=('code', 'name', 'open_zf', 'lt_pan', 'price', 'target', 'status', 'alert_time'), show='headings', height=20 # 设置默认显示行数 ) # 文字新增样式配置 self.tree.tag_configure('triggered', foreground='red', font=('Verdana', 12, 'bold')) # 修改样式配置 self.tree.tag_configure('wrong_buy', foreground='green', font=('Verdana', 11, 'bold')) # 修改样式配置 # 配置列属性 columns = [ ('code', '代码', 80), ('name', '名称', 80), ('open_zf', '开盘涨幅', 80), ('lt_pan', '流通盘', 80), ('price', '当前价', 80), ('target', '目标价', 80), ('status', '状态', 60), ('alert_time', '预警时间', 200) ] for col_id, col_text, col_width in columns: self.tree.heading(col_id, text=col_text) self.tree.column(col_id, width=col_width, anchor=tk.CENTER) # 创建滚动条 vsb = ttk.Scrollbar( tree_frame, orient="vertical", command=self.tree.yview ) self.tree.configure(yscrollcommand=vsb.set) # 布局组件 self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) vsb.pack(side=tk.RIGHT, fill=tk.Y) # 创建控制按钮框架 control_frame = ttk.Frame(self.master) control_frame.pack(pady=20) # 文件选择按钮(原有代码保持不变) self.btn_load = ttk.Button( control_frame, text="选择监控文件", command=self.select_file, style='Larger.TButton', # 应用样式 width=18 # 新增宽度设置 ) self.btn_load.pack(side=tk.LEFT, padx=(5, 15)) # 显式指定侧边排列 # 文件选择按钮(原有代码保持不变) self.btn_moniter = ttk.Button( control_frame, text="开始监控", command=self.select_file, style='Larger.TButton', # 应用样式 width=15, # 新增宽度设置, state=tk.DISABLED # 初始禁用 ) self.btn_moniter.pack(side=tk.LEFT, padx=(10, 15)) # 显式指定侧边排列 # 复选框-自动下单 self.auto_push_cb = ttk.Checkbutton( control_frame, text="自动下单", variable=self.auto_push_var, state=tk.DISABLED ) self.auto_push_cb.pack(side=tk.LEFT, padx=5) # 显示触发股票数 status_frame = ttk.Frame(self.master) status_frame.pack(pady=5, fill=tk.X) # 创建数量显示Label self.trigger_count_label = ttk.Label( status_frame, text="已触发股票数量:0", font=('微软雅黑', 14), foreground='red' ) self.trigger_count_label.pack(padx=10, pady=5, side=tk.LEFT, anchor=tk.W) # 开盘时间显示 self.trading_status_label = ttk.Label( status_frame, text="当前状态:非开盘时间", font=('微软雅黑', 14), foreground='red' ) self.trading_status_label.pack(padx=10, pady=5, side=tk.RIGHT, anchor=tk.E) # 在control_frame内添加定时监控组件 timing_frame = ttk.Frame(control_frame) timing_frame.pack(side=tk.LEFT, padx=(5, 15)) # 时间输入框 timing_label = ttk.Label(timing_frame, text="定时时间:") timing_label.pack(side=tk.LEFT) self.timing_time = tk.StringVar() self.timing_entry = ttk.Entry(timing_frame, textvariable=self.timing_time, width=8) self.timing_entry.pack(side=tk.LEFT) # 定时监控复选框 self.timing_enabled = tk.BooleanVar() self.timing_checkbox = ttk.Checkbutton( control_frame, text="定时监控", variable=self.timing_enabled ) self.timing_checkbox.pack(side=tk.LEFT, padx=5) def append_log(self, message): current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 格式化到秒 self.log_text.config(state=tk.NORMAL) self.log_text.insert(tk.END, f"[{current_time}] {message}\n") self.log_text.see(tk.END) self.log_text.config(state=tk.DISABLED) # 选中文件读取 def select_file(self): filepath = filedialog.askopenfilename( title="选择监控列表文件", filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")] ) if filepath: # 验证文件名是否为纯日期格式 filename = os.path.basename(filepath) if not re.match(r'^\d{8}\.txt$', filename): messagebox.showerror("错误", "文件名必须为8位数字日期格式(如:20250401.txt)") return trade_date = filename.split('.')[0] self.load_stocks(filepath, trade_date) def load_stocks(self, filepath, trade_date): try: self.tree.delete(*self.tree.get_children()) with open(filepath, 'r') as f: # 读取标题行并验证列 headers = next(f).strip().split(',') required_cols = ['条件选股', '代码'] for col in required_cols: if col not in headers: raise KeyError(f"缺少必要列:{col}") # 获取列索引 name_col = headers.index('条件选股') code_col = headers.index('代码') # price_col = headers.index('成交') # 批量获取历史数据 codes = [] for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(',') if len(parts) <= max(name_col, code_col): continue code = parts[code_col].strip() code = f"{code:0>6}" # 标准化为6位代码 codes.append(self.format_code(code)) # 获取历史收盘价 df = self.pro.daily( ts_code=','.join(codes), trade_date=trade_date ) if df.empty: raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确") # 创建收盘价映射表 close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()} # 回到文件开头并处理数据 f.seek(0) next(f) # 跳过标题行 for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split(',') if len(parts) <= max(name_col, code_col): raise ValueError(f"数据行字段不足:{line}") name = parts[name_col].strip() code = parts[code_col].strip() # 确保代码保持字符串格式,保留前导零 code = f"{code:0>6}" # 确保代码是6位,不足前面补零 f_code = self.format_code(code) # 获取收盘价 if f_code not in close_prices: print(f"跳过 {code},未找到收盘价数据") continue close_price = close_prices[f_code] # 计算目标价 target = round(float(close_price) * 1.049, 2) self.add_stock(code, name, target) # 在加载成功后更新标题 formatted_date = f"{trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}" # 格式化为YYYY-MM-DD self.master.title(f"股票价格监控系统 - {formatted_date}") # 新增标题设置 self.append_log(f"加载文件成功:{filepath}") except Exception as e: self.append_log(f"加载文件失败:{str(e)}") # messagebox.showerror("错误", f"加载文件失败:{str(e)}") def add_stock(self, code, name, target): """添加股票到监控列表""" codes = self.format_code(str(code)) self.tree.insert('', 'end', values=(codes, name, '-', '-', '-', target, '监控中', '')) # 检查是否为交易日 def is_trading_time(self): now = datetime.datetime.now() current_time = now.time() weekday = now.weekday() # 0-4是周一到周五 return ( weekday < 5 and # 仅工作日 ( (datetime.time(9, 30) <= current_time <= datetime.time(11, 30)) or (datetime.time(13, 0) <= current_time <= datetime.time(15, 0)) ) ) def update_trading_status(self): current_status = "开盘中" if self.is_trading_time() else "非开盘时间" color = "green" if current_status == "开盘中" else "red" # 状态变更时才记录日志 if current_status != self.previous_trading_status: self.append_log(f"系统状态更新:{current_status}") self.previous_trading_status = current_status # 变更UI self.master.after(0, lambda s=current_status, c=color: self.trading_status_label.config(text=f"当前状态:{s}", foreground=c)) # 监控状态 def start_status_update(self): def _loop(): while self.monitor_active: self.update_trading_status() time.sleep(1) # 每秒检查一次 threading.Thread(target=_loop, daemon=True).start() def validate_time(self, time_str): try: datetime.datetime.strptime(time_str, "%H:%M") return True except ValueError: return False def on_timing_change(self, *args): if self.timing_enabled.get(): time_str = self.timing_time.get().strip() if not self.validate_time(time_str): messagebox.showerror("错误", "时间格式应为 HH:MM(如:09:30)") self.timing_enabled.set(False) return self.start_monitor_at_time(time_str) else: self.monitor_active = True self.start_monitor() def start_monitor_at_time(self, target_time): now = datetime.datetime.now() target = datetime.datetime.strptime(target_time, "%H:%M") target = now.replace( hour=target.hour, minute=target.minute, second=0, microsecond=0 ) if target < now: target += datetime.timedelta(days=1) delta = target - now threading.Timer(delta.total_seconds(), self.start_monitor).start() # 格式化代码 def format_code(self, code): # 为股票代码添加后缀 code = f"{code:0>6}" # 确保代码是6位,不足前面补零 if code.startswith(("6", "9")): f_code = f"{code}.SH" elif code.startswith(("0", "2", "3")): f_code = f"{code}.SZ" else: print(f"未知的股票代码格式: {code}") return None return f_code def play_audio(self, file_path): try: if not os.path.exists(file_path): raise FileNotFoundError(f"文件路径不存在: {file_path}") if sys.platform == "win32": os.startfile(file_path) elif sys.platform == "darwin": subprocess.run(["open", file_path], check=True) else: subprocess.run(["xdg-open", file_path], check=True) except FileNotFoundError as e: print(f"错误: {e}") except subprocess.CalledProcessError as e: print(f"播放失败: {e}") except Exception as e: print(f"未知错误: {e}") def update_prices(self): codes = [self.tree.item(item)['values'][0] for item in self.tree.get_children()] codes = [self.format_code(str(code)) for code in codes] required_columns = ['TS_CODE', 'LOW', 'PRICE'] # 判断是否开盘时间 if not self.is_trading_time(): return try: batch_size = 40 for i in range(0, len(codes), batch_size): batch = codes[i:i+batch_size] self.append_log("行情实时数据更新...") # print(f"更新批次:{batch}") df = ts.realtime_quote(ts_code=','.join(batch)) for _, row in df.iterrows(): code = row['TS_CODE'] current_price = row['PRICE'] # 计算开盘涨幅 / (OPEN-PRE_CLOSE)/PRE_CLOSE*100 open_price = row['OPEN'] pre_close = row['PRE_CLOSE'] high_price = row['HIGH'] open_zf = round((float(open_price) - float(pre_close)) / float(pre_close) * 100, 2) # 计算流通盘 lt_pan = row['VOLUME'] / 100000000 # 转换为亿 item = self.find_tree_item(code) if item: self.tree.set(item, 'price', f"{current_price:.2f}") target = float(self.tree.item(item)['values'][5]) # 设置开盘涨幅/ (OPEN-PRE_CLOSE)* self.tree.set(item, 'open_zf', f"{open_zf:.2f}%") self.tree.set(item, 'lt_pan', f"{lt_pan:.2f}亿") if current_price >= target and self.tree.item(item)['values'][6] != '已触发': now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.master.after(0, self.tree.set, item, 'alert_time', now) # 自动下单 if self.auto_push_var.get(): self.autopush(item) self.tree.set(item, 'status', '已触发') self.tree.item(item, tags=('triggered',)) self.append_log(f"{code} 已触发!当前价:{current_price}") # 使用系统命令播放音频(Windows) self.play_audio("example.mp3") # 错买的判断 if current_price < target and high_price >= target and self.tree.item(item)['values'][6] != '已触发': self.tree.set(item, 'status', '错买') self.tree.item(item, tags=('wrong_buy',)) except Exception as e: print(f"更新失败:{str(e)}") # 在遍历完所有股票后统计数量 triggered_count = 0 for item in self.tree.get_children(): if self.tree.item(item)['values'][6] == '已触发' or self.tree.item(item)['values'][6] == '错买': triggered_count += 1 # 正确写法: self.master.after(0, lambda: self.trigger_count_label.config(text=f"已触发股票数量:{triggered_count}")) def find_tree_item(self, code): for item in self.tree.get_children(): if self.tree.item(item)['values'][0] == code: return item return None def start_monitor(self): if self.timing_enabled.get(): self.monitor_active = True else: self.monitor_active = True # 非定时模式直接启动 def monitor_loop(): while self.monitor_active: self.update_prices() time.sleep(self.time_interval) threading.Thread(target=monitor_loop, daemon=True).start() def on_closing(self): self.monitor_active = False self.master.destroy() # 扩展自动下单---------------------------------------------------------- def autopush(self, item): code = self.tree.item(item)['values'][0] current_price = float(self.tree.item(item)['values'][2]) print(code) confirm = messagebox.askyesno( "确认下单", f"确认为 {code} 在当前价 {current_price} 自动买入吗?" ) if confirm: # 调用真实交易接口示例: # result = trade_api.sell(code, current_price) # ... 处理下单结果 ... messagebox.showinfo("下单成功", "已执行自动买入操作") if __name__ == "__main__": root = tk.Tk() app = StockMonitor(root) root.protocol("WM_DELETE_WINDOW", app.on_closing) root.mainloop()