Files
real_view/监控个股.py
lintaogood a7c6c48d28 提交更新
增加显示合格的个股数量,新增数据列,流通盘,和 开盘涨幅。
2025-04-11 13:09:53 +08:00

306 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import tushare as ts
import threading
import time
import datetime # 新增导入
import re
class StockMonitor:
def __init__(self, master):
self.master = master
self.master.title("股票价格监控系统")
ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d')
self.pro = ts.pro_api()
self.time_interval = 10 # 监控间隔时间(秒)
self.auto_push_var = tk.IntVar(value=0) # 自动下单开关
self.create_widgets()
self.monitor_active = True
self.start_monitor()
self.trigger_count = 0
#self.trigger_count_label = None # 新增Label变量
# 构建界面
def create_widgets(self):
# 创建Treeview容器框架
tree_frame = ttk.Frame(self.master)
tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
style = ttk.Style()
style.configure('Larger.TButton', font=('微软雅黑', 12))
# 创建Treeview
self.tree = ttk.Treeview(
tree_frame,
columns=('code', 'name', 'open_zf', 'lt_pan', 'price', 'target', 'status', 'alert_time'),
show='headings',
height=20 # 设置默认显示行数
)
# 新增样式配置
self.tree.tag_configure('triggered', foreground='red', font=('Verdana', 11, 'bold')) # 修改样式配置
# 配置列属性
columns = [
('code', '代码', 80),
('name', '名称', 80),
('open_zf', '开盘涨幅', 80),
('lt_pan', '流通盘', 80),
('price', '当前价', 80),
('target', '目标价', 80),
('status', '状态', 60),
('alert_time', '预警时间', 200)
]
for col_id, col_text, col_width in columns:
self.tree.heading(col_id, text=col_text)
self.tree.column(col_id, width=col_width, anchor=tk.CENTER)
# 创建滚动条
vsb = ttk.Scrollbar(
tree_frame,
orient="vertical",
command=self.tree.yview
)
self.tree.configure(yscrollcommand=vsb.set)
# 布局组件
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
vsb.pack(side=tk.RIGHT, fill=tk.Y)
# 创建控制按钮框架
control_frame = ttk.Frame(self.master)
control_frame.pack(pady=10)
# 文件选择按钮(原有代码保持不变)
self.btn_load = ttk.Button(
control_frame,
text="选择监控文件",
command=self.select_file,
style='Larger.TButton' # 应用样式
)
# self.btn_load.pack(pady=10)
self.btn_load.pack(side=tk.LEFT, padx=5) # 显式指定侧边排列
# 添加复选框
self.auto_push_cb = ttk.Checkbutton(
control_frame,
text="自动下单",
variable=self.auto_push_var
)
self.auto_push_cb.pack(side=tk.LEFT, padx=5)
status_frame = ttk.Frame(self.master)
status_frame.pack(pady=5, fill=tk.X)
# 创建数量显示Label
self.trigger_count_label = ttk.Label(
status_frame,
text="已触发股票数量0",
font=('微软雅黑', 14),
foreground='red'
)
self.trigger_count_label.pack(padx=10, pady=5, anchor=tk.W)
def select_file(self):
filepath = filedialog.askopenfilename(
title="选择监控列表文件",
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")]
)
if filepath:
# 验证文件名是否为纯日期格式
filename = os.path.basename(filepath)
if not re.match(r'^\d{8}\.txt$', filename):
messagebox.showerror("错误", "文件名必须为8位数字日期格式20250401.txt")
return
trade_date = filename.split('.')[0]
self.load_stocks(filepath, trade_date)
def load_stocks(self, filepath, trade_date):
try:
self.tree.delete(*self.tree.get_children())
with open(filepath, 'r') as f:
# 读取标题行并验证列
headers = next(f).strip().split(',')
required_cols = ['条件选股', '代码']
for col in required_cols:
if col not in headers:
raise KeyError(f"缺少必要列:{col}")
# 获取列索引
name_col = headers.index('条件选股')
code_col = headers.index('代码')
# price_col = headers.index('成交')
# 批量获取历史数据
codes = []
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split(',')
if len(parts) <= max(name_col, code_col):
continue
code = parts[code_col].strip()
code = f"{code:0>6}" # 标准化为6位代码
codes.append(self.format_code(code))
# 获取历史收盘价
df = self.pro.daily(
ts_code=','.join(codes),
trade_date=trade_date
)
if df.empty:
raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确")
# 创建收盘价映射表
close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()}
# 回到文件开头并处理数据
f.seek(0)
next(f) # 跳过标题行
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split(',')
if len(parts) <= max(name_col, code_col):
raise ValueError(f"数据行字段不足:{line}")
name = parts[name_col].strip()
code = parts[code_col].strip()
# 确保代码保持字符串格式,保留前导零
code = f"{code:0>6}" # 确保代码是6位不足前面补零
f_code = self.format_code(code)
# current_price = parts[price_col].strip()
# 获取收盘价
if f_code not in close_prices:
print(f"跳过 {code},未找到收盘价数据")
continue
close_price = close_prices[f_code]
# 计算目标价
target = round(float(close_price) * 1.049, 2)
self.add_stock(code, name, target)
# 在加载成功后更新标题
formatted_date = f"{trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}" # 格式化为YYYY-MM-DD
self.master.title(f"股票价格监控系统 - {formatted_date}") # 新增标题设置
messagebox.showinfo("成功", f"加载文件成功:{filepath}")
except Exception as e:
messagebox.showerror("错误", f"加载文件失败:{str(e)}")
def add_stock(self, code, name, target):
"""添加股票到监控列表"""
codes = self.format_code(str(code))
self.tree.insert('', 'end', values=(codes, name, '-', '-', '-', target, '监控中', ''))
# 格式化代码
def format_code(self, code):
# 为股票代码添加后缀
code = f"{code:0>6}" # 确保代码是6位不足前面补零
if code.startswith(("6", "9")):
f_code = f"{code}.SH"
elif code.startswith(("0", "2", "3")):
f_code = f"{code}.SZ"
else:
print(f"未知的股票代码格式: {code}")
return None
return f_code
def update_prices(self):
codes = [self.tree.item(item)['values'][0] for item in self.tree.get_children()]
codes = [self.format_code(str(code)) for code in codes]
required_columns = ['TS_CODE', 'LOW', 'PRICE']
try:
batch_size = 40
for i in range(0, len(codes), batch_size):
batch = codes[i:i+batch_size]
print(f"更新批次:{batch}")
df = ts.realtime_quote(ts_code=','.join(batch))
for _, row in df.iterrows():
code = row['TS_CODE']
current_price = row['PRICE']
# 计算开盘涨幅 / (OPEN-PRE_CLOSE)/PRE_CLOSE*100
open_price = row['OPEN']
pre_close = row['PRE_CLOSE']
open_zf = round((float(open_price) - float(pre_close)) / float(pre_close) * 100, 2)
# 计算流通盘
lt_pan = row['VOLUME'] / 100000000 # 转换为亿
item = self.find_tree_item(code)
if item:
self.tree.set(item, 'price', f"{current_price:.2f}")
target = float(self.tree.item(item)['values'][5])
# 设置开盘涨幅/ (OPEN-PRE_CLOSE)*
self.tree.set(item, 'open_zf', f"{open_zf:.2f}%")
self.tree.set(item, 'lt_pan', f"{lt_pan:.2f}亿")
# print(current_price)
# print(target)
if current_price >= target and self.tree.item(item)['values'][6] != '已触发':
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.master.after(0, self.tree.set, item, 'alert_time', now)
# 自动下单
if self.auto_push_var.get():
self.autopush(item)
self.tree.set(item, 'status', '已触发')
self.tree.item(item, tags=('triggered',))
messagebox.showwarning(
"价格提醒",
f"{code} 已达到目标价!当前价:{current_price}"
)
# 使用系统命令播放音频Windows
#os.system('start example.mp3') # 替换为你的音频文件路径
except Exception as e:
print(f"更新失败:{str(e)}")
# 在遍历完所有股票后统计数量
triggered_count = 0
for item in self.tree.get_children():
if self.tree.item(item)['values'][6] == '已触发':
triggered_count += 1
# 更新Label显示确保在主线程更新
#self.master.after(0, self.trigger_count_label.config, "text", f"已触发股票数量:{triggered_count}")
# 正确写法:
self.master.after(0, lambda: self.trigger_count_label.config(text=f"已触发股票数量:{triggered_count}"))
def find_tree_item(self, code):
for item in self.tree.get_children():
if self.tree.item(item)['values'][0] == code:
return item
return None
def start_monitor(self):
def monitor_loop():
while self.monitor_active:
self.update_prices()
time.sleep(self.time_interval)
threading.Thread(target=monitor_loop, daemon=True).start()
def on_closing(self):
self.monitor_active = False
self.master.destroy()
# 扩展自动下单----------------------------------------------------------
def autopush(self, item):
code = self.tree.item(item)['values'][0]
current_price = float(self.tree.item(item)['values'][2])
print(code)
confirm = messagebox.askyesno(
"确认下单",
f"确认为 {code} 在当前价 {current_price} 自动买入吗?"
)
if confirm:
# 调用真实交易接口示例:
# result = trade_api.sell(code, current_price)
# ... 处理下单结果 ...
messagebox.showinfo("下单成功", "已执行自动买入操作")
if __name__ == "__main__":
root = tk.Tk()
app = StockMonitor(root)
root.protocol("WM_DELETE_WINDOW", app.on_closing)
root.mainloop()