471 lines
19 KiB
Python
471 lines
19 KiB
Python
import os
|
||
import sys
|
||
import subprocess
|
||
import tkinter as tk
|
||
from tkinter import ttk, messagebox, filedialog
|
||
import tushare as ts
|
||
import threading
|
||
import time
|
||
import datetime # 新增导入
|
||
import re
|
||
|
||
class StockMonitor:
|
||
def __init__(self, master):
|
||
self.master = master
|
||
self.master.title("股票价格监控系统")
|
||
ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d')
|
||
self.pro = ts.pro_api()
|
||
self.time_interval = 10 # 监控间隔时间(秒)
|
||
self.auto_push_var = tk.IntVar(value=0) # 自动下单开关
|
||
self.create_widgets()
|
||
self.monitor_active = True
|
||
self.start_monitor()
|
||
self.trigger_count = 0
|
||
self.previous_trading_status = None
|
||
self.start_status_update()
|
||
|
||
|
||
# 构建界面
|
||
def create_widgets(self):
|
||
# 创建Treeview容器框架
|
||
tree_frame = ttk.Frame(self.master)
|
||
tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
|
||
# tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True, side=tk.LEFT)
|
||
# 设置字体和样式
|
||
style = ttk.Style()
|
||
style.configure('Larger.TButton', font=('微软雅黑', 12))
|
||
# style.configure('Log.TText', font=('微软雅黑', 12))
|
||
|
||
# # 创建日志输出框
|
||
log_frame = ttk.Frame(self.master)
|
||
log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, padx=5, pady=5)
|
||
|
||
# self.log_label = ttk.Label(log_frame, text="日志输出:", font=('微软雅黑', 12))
|
||
self.log_text = tk.Text(log_frame, wrap=tk.WORD, height=10, state=tk.DISABLED, font=('微软雅黑', 11), fg='blue')
|
||
scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
|
||
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
|
||
self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
||
self.log_text.config(yscrollcommand=scrollbar.set)
|
||
|
||
# 创建Treeview
|
||
self.tree = ttk.Treeview(
|
||
tree_frame,
|
||
columns=('code', 'name', 'open_zf', 'lt_pan', 'price', 'target', 'status', 'alert_time'),
|
||
show='headings',
|
||
height=20 # 设置默认显示行数
|
||
)
|
||
# 文字新增样式配置
|
||
self.tree.tag_configure('triggered', foreground='red', font=('Verdana', 12, 'bold')) # 修改样式配置
|
||
self.tree.tag_configure('wrong_buy', foreground='green', font=('Verdana', 11, 'bold')) # 修改样式配置
|
||
|
||
# 配置列属性
|
||
columns = [
|
||
('code', '代码', 80),
|
||
('name', '名称', 80),
|
||
('open_zf', '开盘涨幅', 80),
|
||
('lt_pan', '流通盘', 80),
|
||
('price', '当前价', 80),
|
||
('target', '目标价', 80),
|
||
('status', '状态', 60),
|
||
('alert_time', '预警时间', 200)
|
||
]
|
||
|
||
for col_id, col_text, col_width in columns:
|
||
self.tree.heading(col_id, text=col_text)
|
||
self.tree.column(col_id, width=col_width, anchor=tk.CENTER)
|
||
|
||
# 创建滚动条
|
||
vsb = ttk.Scrollbar(
|
||
tree_frame,
|
||
orient="vertical",
|
||
command=self.tree.yview
|
||
)
|
||
self.tree.configure(yscrollcommand=vsb.set)
|
||
|
||
# 布局组件
|
||
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
||
vsb.pack(side=tk.RIGHT, fill=tk.Y)
|
||
|
||
# 创建控制按钮框架
|
||
control_frame = ttk.Frame(self.master)
|
||
control_frame.pack(pady=20)
|
||
|
||
# 文件选择按钮(原有代码保持不变)
|
||
self.btn_load = ttk.Button(
|
||
control_frame,
|
||
text="选择监控文件",
|
||
command=self.select_file,
|
||
style='Larger.TButton', # 应用样式
|
||
width=18 # 新增宽度设置
|
||
)
|
||
self.btn_load.pack(side=tk.LEFT, padx=(5, 15)) # 显式指定侧边排列
|
||
|
||
# 文件选择按钮(原有代码保持不变)
|
||
self.btn_moniter = ttk.Button(
|
||
control_frame,
|
||
text="开始监控",
|
||
command=self.select_file,
|
||
style='Larger.TButton', # 应用样式
|
||
width=15, # 新增宽度设置,
|
||
state=tk.DISABLED # 初始禁用
|
||
)
|
||
self.btn_moniter.pack(side=tk.LEFT, padx=(10, 15)) # 显式指定侧边排列
|
||
|
||
# 复选框-自动下单
|
||
self.auto_push_cb = ttk.Checkbutton(
|
||
control_frame,
|
||
text="自动下单",
|
||
variable=self.auto_push_var,
|
||
state=tk.DISABLED
|
||
)
|
||
self.auto_push_cb.pack(side=tk.LEFT, padx=5)
|
||
|
||
# 显示触发股票数
|
||
status_frame = ttk.Frame(self.master)
|
||
status_frame.pack(pady=5, fill=tk.X)
|
||
# 创建数量显示Label
|
||
self.trigger_count_label = ttk.Label(
|
||
status_frame,
|
||
text="已触发股票数量:0",
|
||
font=('微软雅黑', 14),
|
||
foreground='red'
|
||
)
|
||
self.trigger_count_label.pack(padx=10, pady=5, side=tk.LEFT, anchor=tk.W)
|
||
|
||
# 开盘时间显示
|
||
self.trading_status_label = ttk.Label(
|
||
status_frame,
|
||
text="当前状态:非开盘时间",
|
||
font=('微软雅黑', 14),
|
||
foreground='red'
|
||
)
|
||
self.trading_status_label.pack(padx=10, pady=5, side=tk.RIGHT, anchor=tk.E)
|
||
|
||
# 在control_frame内添加定时监控组件
|
||
timing_frame = ttk.Frame(control_frame)
|
||
timing_frame.pack(side=tk.LEFT, padx=(5, 15))
|
||
|
||
# 时间输入框
|
||
timing_label = ttk.Label(timing_frame, text="定时时间:")
|
||
timing_label.pack(side=tk.LEFT)
|
||
self.timing_time = tk.StringVar()
|
||
self.timing_entry = ttk.Entry(timing_frame, textvariable=self.timing_time, width=8)
|
||
self.timing_entry.pack(side=tk.LEFT)
|
||
|
||
# 定时监控复选框
|
||
self.timing_enabled = tk.BooleanVar()
|
||
self.timing_checkbox = ttk.Checkbutton(
|
||
control_frame,
|
||
text="定时监控",
|
||
variable=self.timing_enabled
|
||
)
|
||
self.timing_checkbox.pack(side=tk.LEFT, padx=5)
|
||
|
||
def append_log(self, message):
|
||
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 格式化到秒
|
||
self.log_text.config(state=tk.NORMAL)
|
||
self.log_text.insert(tk.END, f"[{current_time}] {message}\n")
|
||
self.log_text.see(tk.END)
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
# 选中文件读取
|
||
def select_file(self):
|
||
filepath = filedialog.askopenfilename(
|
||
title="选择监控列表文件",
|
||
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")]
|
||
)
|
||
if filepath:
|
||
# 验证文件名是否为纯日期格式
|
||
filename = os.path.basename(filepath)
|
||
if not re.match(r'^\d{8}\.txt$', filename):
|
||
messagebox.showerror("错误", "文件名必须为8位数字日期格式(如:20250401.txt)")
|
||
return
|
||
|
||
trade_date = filename.split('.')[0]
|
||
self.load_stocks(filepath, trade_date)
|
||
|
||
def load_stocks(self, filepath, trade_date):
|
||
try:
|
||
self.tree.delete(*self.tree.get_children())
|
||
with open(filepath, 'r') as f:
|
||
# 读取标题行并验证列
|
||
headers = next(f).strip().split(',')
|
||
required_cols = ['条件选股', '代码']
|
||
for col in required_cols:
|
||
if col not in headers:
|
||
raise KeyError(f"缺少必要列:{col}")
|
||
# 获取列索引
|
||
name_col = headers.index('条件选股')
|
||
code_col = headers.index('代码')
|
||
# price_col = headers.index('成交')
|
||
|
||
# 批量获取历史数据
|
||
codes = []
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith('#'):
|
||
continue
|
||
parts = line.split(',')
|
||
if len(parts) <= max(name_col, code_col):
|
||
continue
|
||
code = parts[code_col].strip()
|
||
code = f"{code:0>6}" # 标准化为6位代码
|
||
codes.append(self.format_code(code))
|
||
|
||
# 获取历史收盘价
|
||
df = self.pro.daily(
|
||
ts_code=','.join(codes),
|
||
trade_date=trade_date
|
||
)
|
||
if df.empty:
|
||
raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确")
|
||
|
||
# 创建收盘价映射表
|
||
close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()}
|
||
|
||
# 回到文件开头并处理数据
|
||
f.seek(0)
|
||
next(f) # 跳过标题行
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith('#'):
|
||
continue
|
||
parts = line.split(',')
|
||
if len(parts) <= max(name_col, code_col):
|
||
raise ValueError(f"数据行字段不足:{line}")
|
||
name = parts[name_col].strip()
|
||
code = parts[code_col].strip()
|
||
# 确保代码保持字符串格式,保留前导零
|
||
code = f"{code:0>6}" # 确保代码是6位,不足前面补零
|
||
f_code = self.format_code(code)
|
||
|
||
# 获取收盘价
|
||
if f_code not in close_prices:
|
||
print(f"跳过 {code},未找到收盘价数据")
|
||
continue
|
||
|
||
close_price = close_prices[f_code]
|
||
# 计算目标价
|
||
target = round(float(close_price) * 1.049, 2)
|
||
self.add_stock(code, name, target)
|
||
|
||
# 在加载成功后更新标题
|
||
formatted_date = f"{trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}" # 格式化为YYYY-MM-DD
|
||
self.master.title(f"股票价格监控系统 - {formatted_date}") # 新增标题设置
|
||
self.append_log(f"加载文件成功:{filepath}")
|
||
except Exception as e:
|
||
self.append_log(f"加载文件失败:{str(e)}")
|
||
# messagebox.showerror("错误", f"加载文件失败:{str(e)}")
|
||
|
||
def add_stock(self, code, name, target):
|
||
"""添加股票到监控列表"""
|
||
codes = self.format_code(str(code))
|
||
self.tree.insert('', 'end', values=(codes, name, '-', '-', '-', target, '监控中', ''))
|
||
|
||
# 检查是否为交易日
|
||
def is_trading_time(self):
|
||
now = datetime.datetime.now()
|
||
current_time = now.time()
|
||
weekday = now.weekday() # 0-4是周一到周五
|
||
return (
|
||
weekday < 5 and # 仅工作日
|
||
(
|
||
(datetime.time(9, 30) <= current_time <= datetime.time(11, 30)) or
|
||
(datetime.time(13, 0) <= current_time <= datetime.time(15, 0))
|
||
)
|
||
)
|
||
|
||
def update_trading_status(self):
|
||
current_status = "开盘中" if self.is_trading_time() else "非开盘时间"
|
||
color = "green" if current_status == "开盘中" else "red"
|
||
# 状态变更时才记录日志
|
||
if current_status != self.previous_trading_status:
|
||
self.append_log(f"系统状态更新:{current_status}")
|
||
self.previous_trading_status = current_status
|
||
# 变更UI
|
||
self.master.after(0, lambda s=current_status, c=color:
|
||
self.trading_status_label.config(text=f"当前状态:{s}", foreground=c))
|
||
|
||
# 监控状态
|
||
def start_status_update(self):
|
||
def _loop():
|
||
while self.monitor_active:
|
||
self.update_trading_status()
|
||
time.sleep(1) # 每秒检查一次
|
||
|
||
threading.Thread(target=_loop, daemon=True).start()
|
||
|
||
def validate_time(self, time_str):
|
||
try:
|
||
datetime.datetime.strptime(time_str, "%H:%M")
|
||
return True
|
||
except ValueError:
|
||
return False
|
||
|
||
def on_timing_change(self, *args):
|
||
if self.timing_enabled.get():
|
||
time_str = self.timing_time.get().strip()
|
||
if not self.validate_time(time_str):
|
||
messagebox.showerror("错误", "时间格式应为 HH:MM(如:09:30)")
|
||
self.timing_enabled.set(False)
|
||
return
|
||
self.start_monitor_at_time(time_str)
|
||
else:
|
||
self.monitor_active = True
|
||
self.start_monitor()
|
||
|
||
def start_monitor_at_time(self, target_time):
|
||
now = datetime.datetime.now()
|
||
target = datetime.datetime.strptime(target_time, "%H:%M")
|
||
target = now.replace(
|
||
hour=target.hour, minute=target.minute, second=0, microsecond=0
|
||
)
|
||
if target < now:
|
||
target += datetime.timedelta(days=1)
|
||
delta = target - now
|
||
threading.Timer(delta.total_seconds(), self.start_monitor).start()
|
||
|
||
# 格式化代码
|
||
def format_code(self, code):
|
||
# 为股票代码添加后缀
|
||
code = f"{code:0>6}" # 确保代码是6位,不足前面补零
|
||
if code.startswith(("6", "9")):
|
||
f_code = f"{code}.SH"
|
||
elif code.startswith(("0", "2", "3")):
|
||
f_code = f"{code}.SZ"
|
||
else:
|
||
print(f"未知的股票代码格式: {code}")
|
||
return None
|
||
return f_code
|
||
|
||
def play_audio(self, file_path):
|
||
try:
|
||
if not os.path.exists(file_path):
|
||
raise FileNotFoundError(f"文件路径不存在: {file_path}")
|
||
|
||
if sys.platform == "win32":
|
||
os.startfile(file_path)
|
||
elif sys.platform == "darwin":
|
||
subprocess.run(["open", file_path], check=True)
|
||
else:
|
||
subprocess.run(["xdg-open", file_path], check=True)
|
||
except FileNotFoundError as e:
|
||
print(f"错误: {e}")
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"播放失败: {e}")
|
||
except Exception as e:
|
||
print(f"未知错误: {e}")
|
||
|
||
def update_prices(self):
|
||
codes = [self.tree.item(item)['values'][0] for item in self.tree.get_children()]
|
||
codes = [self.format_code(str(code)) for code in codes]
|
||
required_columns = ['TS_CODE', 'LOW', 'PRICE']
|
||
|
||
# 判断是否开盘时间
|
||
if not self.is_trading_time():
|
||
return
|
||
|
||
try:
|
||
batch_size = 40
|
||
for i in range(0, len(codes), batch_size):
|
||
batch = codes[i:i+batch_size]
|
||
self.append_log("行情实时数据更新...")
|
||
# print(f"更新批次:{batch}")
|
||
df = ts.realtime_quote(ts_code=','.join(batch))
|
||
for _, row in df.iterrows():
|
||
code = row['TS_CODE']
|
||
current_price = row['PRICE']
|
||
# 计算开盘涨幅 / (OPEN-PRE_CLOSE)/PRE_CLOSE*100
|
||
open_price = row['OPEN']
|
||
pre_close = row['PRE_CLOSE']
|
||
high_price = row['HIGH']
|
||
open_zf = round((float(open_price) - float(pre_close)) / float(pre_close) * 100, 2)
|
||
|
||
# 计算流通盘
|
||
lt_pan = row['VOLUME'] / 100000000 # 转换为亿
|
||
|
||
item = self.find_tree_item(code)
|
||
if item:
|
||
self.tree.set(item, 'price', f"{current_price:.2f}")
|
||
target = float(self.tree.item(item)['values'][5])
|
||
# 设置开盘涨幅/ (OPEN-PRE_CLOSE)*
|
||
self.tree.set(item, 'open_zf', f"{open_zf:.2f}%")
|
||
self.tree.set(item, 'lt_pan', f"{lt_pan:.2f}亿")
|
||
|
||
if current_price >= target and self.tree.item(item)['values'][6] != '已触发':
|
||
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
self.master.after(0, self.tree.set, item, 'alert_time', now)
|
||
|
||
# 自动下单
|
||
if self.auto_push_var.get():
|
||
self.autopush(item)
|
||
|
||
self.tree.set(item, 'status', '已触发')
|
||
self.tree.item(item, tags=('triggered',))
|
||
self.append_log(f"{code} 已触发!当前价:{current_price}")
|
||
|
||
# 使用系统命令播放音频(Windows)
|
||
self.play_audio("example.mp3")
|
||
|
||
# 错买的判断
|
||
if current_price < target and high_price >= target and self.tree.item(item)['values'][6] != '已触发':
|
||
self.tree.set(item, 'status', '错买')
|
||
self.tree.item(item, tags=('wrong_buy',))
|
||
|
||
except Exception as e:
|
||
print(f"更新失败:{str(e)}")
|
||
# 在遍历完所有股票后统计数量
|
||
triggered_count = 0
|
||
for item in self.tree.get_children():
|
||
if self.tree.item(item)['values'][6] == '已触发' or self.tree.item(item)['values'][6] == '错买':
|
||
triggered_count += 1
|
||
|
||
# 正确写法:
|
||
self.master.after(0, lambda: self.trigger_count_label.config(text=f"已触发股票数量:{triggered_count}"))
|
||
|
||
def find_tree_item(self, code):
|
||
for item in self.tree.get_children():
|
||
if self.tree.item(item)['values'][0] == code:
|
||
return item
|
||
return None
|
||
|
||
def start_monitor(self):
|
||
if self.timing_enabled.get():
|
||
self.monitor_active = True
|
||
else:
|
||
self.monitor_active = True # 非定时模式直接启动
|
||
|
||
def monitor_loop():
|
||
while self.monitor_active:
|
||
self.update_prices()
|
||
time.sleep(self.time_interval)
|
||
threading.Thread(target=monitor_loop, daemon=True).start()
|
||
|
||
def on_closing(self):
|
||
self.monitor_active = False
|
||
self.master.destroy()
|
||
|
||
# 扩展自动下单----------------------------------------------------------
|
||
def autopush(self, item):
|
||
code = self.tree.item(item)['values'][0]
|
||
current_price = float(self.tree.item(item)['values'][2])
|
||
print(code)
|
||
confirm = messagebox.askyesno(
|
||
"确认下单",
|
||
f"确认为 {code} 在当前价 {current_price} 自动买入吗?"
|
||
)
|
||
if confirm:
|
||
# 调用真实交易接口示例:
|
||
# result = trade_api.sell(code, current_price)
|
||
# ... 处理下单结果 ...
|
||
messagebox.showinfo("下单成功", "已执行自动买入操作")
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
root = tk.Tk()
|
||
app = StockMonitor(root)
|
||
root.protocol("WM_DELETE_WINDOW", app.on_closing)
|
||
root.mainloop()
|
||
|