Files
real_view/监控个股.py

471 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sys
import subprocess
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import tushare as ts
import threading
import time
import datetime # 新增导入
import re
class StockMonitor:
def __init__(self, master):
self.master = master
self.master.title("股票价格监控系统")
ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d')
self.pro = ts.pro_api()
self.time_interval = 10 # 监控间隔时间(秒)
self.auto_push_var = tk.IntVar(value=0) # 自动下单开关
self.create_widgets()
self.monitor_active = True
self.start_monitor()
self.trigger_count = 0
self.previous_trading_status = None
self.start_status_update()
# 构建界面
def create_widgets(self):
# 创建Treeview容器框架
tree_frame = ttk.Frame(self.master)
tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
# tree_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True, side=tk.LEFT)
# 设置字体和样式
style = ttk.Style()
style.configure('Larger.TButton', font=('微软雅黑', 12))
# style.configure('Log.TText', font=('微软雅黑', 12))
# # 创建日志输出框
log_frame = ttk.Frame(self.master)
log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, padx=5, pady=5)
# self.log_label = ttk.Label(log_frame, text="日志输出:", font=('微软雅黑', 12))
self.log_text = tk.Text(log_frame, wrap=tk.WORD, height=10, state=tk.DISABLED, font=('微软雅黑', 11), fg='blue')
scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self.log_text.config(yscrollcommand=scrollbar.set)
# 创建Treeview
self.tree = ttk.Treeview(
tree_frame,
columns=('code', 'name', 'open_zf', 'lt_pan', 'price', 'target', 'status', 'alert_time'),
show='headings',
height=20 # 设置默认显示行数
)
# 文字新增样式配置
self.tree.tag_configure('triggered', foreground='red', font=('Verdana', 12, 'bold')) # 修改样式配置
self.tree.tag_configure('wrong_buy', foreground='green', font=('Verdana', 11, 'bold')) # 修改样式配置
# 配置列属性
columns = [
('code', '代码', 80),
('name', '名称', 80),
('open_zf', '开盘涨幅', 80),
('lt_pan', '流通盘', 80),
('price', '当前价', 80),
('target', '目标价', 80),
('status', '状态', 60),
('alert_time', '预警时间', 200)
]
for col_id, col_text, col_width in columns:
self.tree.heading(col_id, text=col_text)
self.tree.column(col_id, width=col_width, anchor=tk.CENTER)
# 创建滚动条
vsb = ttk.Scrollbar(
tree_frame,
orient="vertical",
command=self.tree.yview
)
self.tree.configure(yscrollcommand=vsb.set)
# 布局组件
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
vsb.pack(side=tk.RIGHT, fill=tk.Y)
# 创建控制按钮框架
control_frame = ttk.Frame(self.master)
control_frame.pack(pady=20)
# 文件选择按钮(原有代码保持不变)
self.btn_load = ttk.Button(
control_frame,
text="选择监控文件",
command=self.select_file,
style='Larger.TButton', # 应用样式
width=18 # 新增宽度设置
)
self.btn_load.pack(side=tk.LEFT, padx=(5, 15)) # 显式指定侧边排列
# 文件选择按钮(原有代码保持不变)
self.btn_moniter = ttk.Button(
control_frame,
text="开始监控",
command=self.select_file,
style='Larger.TButton', # 应用样式
width=15, # 新增宽度设置,
state=tk.DISABLED # 初始禁用
)
self.btn_moniter.pack(side=tk.LEFT, padx=(10, 15)) # 显式指定侧边排列
# 复选框-自动下单
self.auto_push_cb = ttk.Checkbutton(
control_frame,
text="自动下单",
variable=self.auto_push_var,
state=tk.DISABLED
)
self.auto_push_cb.pack(side=tk.LEFT, padx=5)
# 显示触发股票数
status_frame = ttk.Frame(self.master)
status_frame.pack(pady=5, fill=tk.X)
# 创建数量显示Label
self.trigger_count_label = ttk.Label(
status_frame,
text="已触发股票数量0",
font=('微软雅黑', 14),
foreground='red'
)
self.trigger_count_label.pack(padx=10, pady=5, side=tk.LEFT, anchor=tk.W)
# 开盘时间显示
self.trading_status_label = ttk.Label(
status_frame,
text="当前状态:非开盘时间",
font=('微软雅黑', 14),
foreground='red'
)
self.trading_status_label.pack(padx=10, pady=5, side=tk.RIGHT, anchor=tk.E)
# 在control_frame内添加定时监控组件
timing_frame = ttk.Frame(control_frame)
timing_frame.pack(side=tk.LEFT, padx=(5, 15))
# 时间输入框
timing_label = ttk.Label(timing_frame, text="定时时间:")
timing_label.pack(side=tk.LEFT)
self.timing_time = tk.StringVar()
self.timing_entry = ttk.Entry(timing_frame, textvariable=self.timing_time, width=8)
self.timing_entry.pack(side=tk.LEFT)
# 定时监控复选框
self.timing_enabled = tk.BooleanVar()
self.timing_checkbox = ttk.Checkbutton(
control_frame,
text="定时监控",
variable=self.timing_enabled
)
self.timing_checkbox.pack(side=tk.LEFT, padx=5)
def append_log(self, message):
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 格式化到秒
self.log_text.config(state=tk.NORMAL)
self.log_text.insert(tk.END, f"[{current_time}] {message}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
# 选中文件读取
def select_file(self):
filepath = filedialog.askopenfilename(
title="选择监控列表文件",
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")]
)
if filepath:
# 验证文件名是否为纯日期格式
filename = os.path.basename(filepath)
if not re.match(r'^\d{8}\.txt$', filename):
messagebox.showerror("错误", "文件名必须为8位数字日期格式20250401.txt")
return
trade_date = filename.split('.')[0]
self.load_stocks(filepath, trade_date)
def load_stocks(self, filepath, trade_date):
try:
self.tree.delete(*self.tree.get_children())
with open(filepath, 'r') as f:
# 读取标题行并验证列
headers = next(f).strip().split(',')
required_cols = ['条件选股', '代码']
for col in required_cols:
if col not in headers:
raise KeyError(f"缺少必要列:{col}")
# 获取列索引
name_col = headers.index('条件选股')
code_col = headers.index('代码')
# price_col = headers.index('成交')
# 批量获取历史数据
codes = []
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split(',')
if len(parts) <= max(name_col, code_col):
continue
code = parts[code_col].strip()
code = f"{code:0>6}" # 标准化为6位代码
codes.append(self.format_code(code))
# 获取历史收盘价
df = self.pro.daily(
ts_code=','.join(codes),
trade_date=trade_date
)
if df.empty:
raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确")
# 创建收盘价映射表
close_prices = {row['ts_code']: row['close'] for _, row in df.iterrows()}
# 回到文件开头并处理数据
f.seek(0)
next(f) # 跳过标题行
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split(',')
if len(parts) <= max(name_col, code_col):
raise ValueError(f"数据行字段不足:{line}")
name = parts[name_col].strip()
code = parts[code_col].strip()
# 确保代码保持字符串格式,保留前导零
code = f"{code:0>6}" # 确保代码是6位不足前面补零
f_code = self.format_code(code)
# 获取收盘价
if f_code not in close_prices:
print(f"跳过 {code},未找到收盘价数据")
continue
close_price = close_prices[f_code]
# 计算目标价
target = round(float(close_price) * 1.049, 2)
self.add_stock(code, name, target)
# 在加载成功后更新标题
formatted_date = f"{trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}" # 格式化为YYYY-MM-DD
self.master.title(f"股票价格监控系统 - {formatted_date}") # 新增标题设置
self.append_log(f"加载文件成功:{filepath}")
except Exception as e:
self.append_log(f"加载文件失败:{str(e)}")
# messagebox.showerror("错误", f"加载文件失败:{str(e)}")
def add_stock(self, code, name, target):
"""添加股票到监控列表"""
codes = self.format_code(str(code))
self.tree.insert('', 'end', values=(codes, name, '-', '-', '-', target, '监控中', ''))
# 检查是否为交易日
def is_trading_time(self):
now = datetime.datetime.now()
current_time = now.time()
weekday = now.weekday() # 0-4是周一到周五
return (
weekday < 5 and # 仅工作日
(
(datetime.time(9, 30) <= current_time <= datetime.time(11, 30)) or
(datetime.time(13, 0) <= current_time <= datetime.time(15, 0))
)
)
def update_trading_status(self):
current_status = "开盘中" if self.is_trading_time() else "非开盘时间"
color = "green" if current_status == "开盘中" else "red"
# 状态变更时才记录日志
if current_status != self.previous_trading_status:
self.append_log(f"系统状态更新:{current_status}")
self.previous_trading_status = current_status
# 变更UI
self.master.after(0, lambda s=current_status, c=color:
self.trading_status_label.config(text=f"当前状态:{s}", foreground=c))
# 监控状态
def start_status_update(self):
def _loop():
while self.monitor_active:
self.update_trading_status()
time.sleep(1) # 每秒检查一次
threading.Thread(target=_loop, daemon=True).start()
def validate_time(self, time_str):
try:
datetime.datetime.strptime(time_str, "%H:%M")
return True
except ValueError:
return False
def on_timing_change(self, *args):
if self.timing_enabled.get():
time_str = self.timing_time.get().strip()
if not self.validate_time(time_str):
messagebox.showerror("错误", "时间格式应为 HH:MM09:30")
self.timing_enabled.set(False)
return
self.start_monitor_at_time(time_str)
else:
self.monitor_active = True
self.start_monitor()
def start_monitor_at_time(self, target_time):
now = datetime.datetime.now()
target = datetime.datetime.strptime(target_time, "%H:%M")
target = now.replace(
hour=target.hour, minute=target.minute, second=0, microsecond=0
)
if target < now:
target += datetime.timedelta(days=1)
delta = target - now
threading.Timer(delta.total_seconds(), self.start_monitor).start()
# 格式化代码
def format_code(self, code):
# 为股票代码添加后缀
code = f"{code:0>6}" # 确保代码是6位不足前面补零
if code.startswith(("6", "9")):
f_code = f"{code}.SH"
elif code.startswith(("0", "2", "3")):
f_code = f"{code}.SZ"
else:
print(f"未知的股票代码格式: {code}")
return None
return f_code
def play_audio(self, file_path):
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件路径不存在: {file_path}")
if sys.platform == "win32":
os.startfile(file_path)
elif sys.platform == "darwin":
subprocess.run(["open", file_path], check=True)
else:
subprocess.run(["xdg-open", file_path], check=True)
except FileNotFoundError as e:
print(f"错误: {e}")
except subprocess.CalledProcessError as e:
print(f"播放失败: {e}")
except Exception as e:
print(f"未知错误: {e}")
def update_prices(self):
codes = [self.tree.item(item)['values'][0] for item in self.tree.get_children()]
codes = [self.format_code(str(code)) for code in codes]
required_columns = ['TS_CODE', 'LOW', 'PRICE']
# 判断是否开盘时间
if not self.is_trading_time():
return
try:
batch_size = 40
for i in range(0, len(codes), batch_size):
batch = codes[i:i+batch_size]
self.append_log("行情实时数据更新...")
# print(f"更新批次:{batch}")
df = ts.realtime_quote(ts_code=','.join(batch))
for _, row in df.iterrows():
code = row['TS_CODE']
current_price = row['PRICE']
# 计算开盘涨幅 / (OPEN-PRE_CLOSE)/PRE_CLOSE*100
open_price = row['OPEN']
pre_close = row['PRE_CLOSE']
high_price = row['HIGH']
open_zf = round((float(open_price) - float(pre_close)) / float(pre_close) * 100, 2)
# 计算流通盘
lt_pan = row['VOLUME'] / 100000000 # 转换为亿
item = self.find_tree_item(code)
if item:
self.tree.set(item, 'price', f"{current_price:.2f}")
target = float(self.tree.item(item)['values'][5])
# 设置开盘涨幅/ (OPEN-PRE_CLOSE)*
self.tree.set(item, 'open_zf', f"{open_zf:.2f}%")
self.tree.set(item, 'lt_pan', f"{lt_pan:.2f}亿")
if current_price >= target and self.tree.item(item)['values'][6] != '已触发':
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.master.after(0, self.tree.set, item, 'alert_time', now)
# 自动下单
if self.auto_push_var.get():
self.autopush(item)
self.tree.set(item, 'status', '已触发')
self.tree.item(item, tags=('triggered',))
self.append_log(f"{code} 已触发!当前价:{current_price}")
# 使用系统命令播放音频Windows
self.play_audio("example.mp3")
# 错买的判断
if current_price < target and high_price >= target and self.tree.item(item)['values'][6] != '已触发':
self.tree.set(item, 'status', '错买')
self.tree.item(item, tags=('wrong_buy',))
except Exception as e:
print(f"更新失败:{str(e)}")
# 在遍历完所有股票后统计数量
triggered_count = 0
for item in self.tree.get_children():
if self.tree.item(item)['values'][6] == '已触发' or self.tree.item(item)['values'][6] == '错买':
triggered_count += 1
# 正确写法:
self.master.after(0, lambda: self.trigger_count_label.config(text=f"已触发股票数量:{triggered_count}"))
def find_tree_item(self, code):
for item in self.tree.get_children():
if self.tree.item(item)['values'][0] == code:
return item
return None
def start_monitor(self):
if self.timing_enabled.get():
self.monitor_active = True
else:
self.monitor_active = True # 非定时模式直接启动
def monitor_loop():
while self.monitor_active:
self.update_prices()
time.sleep(self.time_interval)
threading.Thread(target=monitor_loop, daemon=True).start()
def on_closing(self):
self.monitor_active = False
self.master.destroy()
# 扩展自动下单----------------------------------------------------------
def autopush(self, item):
code = self.tree.item(item)['values'][0]
current_price = float(self.tree.item(item)['values'][2])
print(code)
confirm = messagebox.askyesno(
"确认下单",
f"确认为 {code} 在当前价 {current_price} 自动买入吗?"
)
if confirm:
# 调用真实交易接口示例:
# result = trade_api.sell(code, current_price)
# ... 处理下单结果 ...
messagebox.showinfo("下单成功", "已执行自动买入操作")
if __name__ == "__main__":
root = tk.Tk()
app = StockMonitor(root)
root.protocol("WM_DELETE_WINDOW", app.on_closing)
root.mainloop()