820 lines
35 KiB
Python
820 lines
35 KiB
Python
import tkinter as tk
|
||
from tkinter import ttk
|
||
from tkinter import messagebox # 👈 新增这一行
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
||
import pandas as pd
|
||
import os
|
||
import logging
|
||
import tushare as ts
|
||
import datetime
|
||
import glob
|
||
from functools import lru_cache
|
||
import threading # 👈 新增这一行
|
||
from order_executor import OrderExecutor
|
||
|
||
plt.rcParams['font.sans-serif'] = ['SimHei'] # 设置中文字体
|
||
plt.rcParams['axes.unicode_minus'] = False # 解决负号 '-' 显示为方块的问题
|
||
|
||
@lru_cache(maxsize=100)
|
||
|
||
class StockAnalysisApp:
|
||
def __init__(self, root):
|
||
self.root = root
|
||
self.root.title("短线速逆-股票数据分析")
|
||
self.root.geometry("1000x1000")
|
||
self.days_var = tk.StringVar(value="1") # 默认显示1天
|
||
self.count_days = 3 # 用于计算的天数
|
||
self.days_for_limit_up = 5 # 公开参数,判断几天内是否有涨停
|
||
self.trade_cal_cache = {} # 新增交易日历缓存字典
|
||
# 初始化配置
|
||
self.setup_config()
|
||
self.setup_ui()
|
||
|
||
# 新增:初始化 OrderExecutor 实例
|
||
self.order_executor = OrderExecutor()
|
||
|
||
def setup_config(self):
|
||
"""初始化配置"""
|
||
# 初始化 Matplotlib 字体设置
|
||
# plt.rcParams['font.sans-serif'] = ['SimHei']
|
||
# plt.rcParams['axes.unicode_minus'] = False
|
||
|
||
# 设置 Tushare API Token
|
||
ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d')
|
||
self.pro = ts.pro_api()
|
||
|
||
# 数据目录
|
||
self.day_data_dir = r"D:\gp_data\day"
|
||
self.history_data_dir = r"D:\gp_data\history"
|
||
self.huice_data_dir = r"D:\gp_data\huice"
|
||
|
||
# 初始化变量
|
||
self.year_var = tk.StringVar()
|
||
self.month_var = tk.StringVar()
|
||
self.date_var = tk.StringVar()
|
||
self.offline_mode = tk.BooleanVar(value=False)
|
||
self.selected_board = tk.StringVar(value="all")
|
||
self.days_var = tk.StringVar(value="1")
|
||
self.TARGET_RATIO = 1.049 # 新增配置项
|
||
self.MIN_LIMIT_UP_COUNT = 2
|
||
|
||
def setup_ui(self):
|
||
"""设置用户界面"""
|
||
self.choose_frame = ttk.Frame(self.root)
|
||
self.choose_frame.grid(row=0, column=0, rowspan=1, padx=10, pady=10, sticky="ew")
|
||
self.root.grid_columnconfigure(0, weight=1)
|
||
# 年份选择框
|
||
current_year = datetime.datetime.now().year
|
||
years = [str(current_year - i) for i in range(3)]
|
||
|
||
# 创建蓝色大字体样式
|
||
style = ttk.Style()
|
||
style.configure('Blue.TLabel', font=('微软雅黑', 12), foreground='blue')
|
||
style.configure('Blue.TButton', font=('微软雅黑', 12), foreground='blue')
|
||
style.configure('Red.TButton', font=('微软雅黑', 14), foreground='red')
|
||
style.configure('Blue.TCombobox', font=('微软雅黑', 14), foreground='red')
|
||
style.map('Blue.TCombobox',
|
||
fieldbackground=[('readonly', 'white')],
|
||
foreground=[('readonly', 'blue')],
|
||
selectbackground=[('readonly', 'white')],
|
||
selectforeground=[('readonly', 'blue')])
|
||
|
||
ttk.Label(self.choose_frame, text="选择年份:", style='Blue.TLabel').grid(row=0, column=0, padx=5, pady=10, sticky="w")
|
||
|
||
self.year_var.set(years[0])
|
||
year_menu = ttk.Combobox(self.choose_frame, textvariable=self.year_var, values=years, width=8, style='Blue.TCombobox')
|
||
year_menu.grid(row=0, column=1, padx=1, pady=5, sticky="w")
|
||
year_menu.bind("<<ComboboxSelected>>", lambda _: self.update_months())
|
||
|
||
# 月份选择框
|
||
ttk.Label(self.choose_frame, text="选择月份:", style='Blue.TLabel').grid(row=0, column=2, padx=2, pady=10)
|
||
month_menu = ttk.Combobox(self.choose_frame, textvariable=self.month_var, width=10, state='readonly', style='Blue.TCombobox')
|
||
month_menu.grid(row=0, column=3, padx=2, pady=10)
|
||
month_menu.bind("<<ComboboxSelected>>", lambda _: (self.update_dates(), self.plot_monthly_data()))
|
||
|
||
# 日期选择框
|
||
ttk.Label(self.choose_frame, text="选择日期:", style='Blue.TLabel').grid(row=0, column=4, padx=10, pady=10)
|
||
date_menu = ttk.Combobox(self.choose_frame, textvariable=self.date_var, width=10, state='readonly', style='Blue.TCombobox')
|
||
date_menu.grid(row=0, column=5, padx=10, pady=10)
|
||
# date_menu.bind("<<ComboboxSelected>>", lambda _: (self.update_plot()))
|
||
|
||
# 增加读取按钮
|
||
load_btn = ttk.Button(self.choose_frame, text="读取数据", style='Blue.TButton', command=self.load_data)
|
||
load_btn.grid(row=0, column=6, padx=10, pady=10)
|
||
|
||
# 增加导出按钮
|
||
export_btn = ttk.Button(self.choose_frame, text="导出Excel", style='Red.TButton', command=self.export_to_excel)
|
||
export_btn.grid(row=0, column=7, padx=10, pady=10)
|
||
|
||
self.set_frame = ttk.Frame(self.root)
|
||
self.set_frame.grid(row=1, column=0, rowspan=1, padx=10, pady=10, sticky="ew")
|
||
|
||
# 显示触发股票数
|
||
self.trigger_count_label = ttk.Label(
|
||
self.set_frame,
|
||
text="已触发股票数量:0",
|
||
font=('微软雅黑', 14),
|
||
foreground='red'
|
||
)
|
||
self.trigger_count_label.grid(row=0, column=1, padx=10, pady=10)
|
||
# self.trigger_count_label.pack(padx=10, pady=5, side=tk.LEFT, anchor=tk.W)
|
||
|
||
# 窗口置顶复选框
|
||
self.topmost_var = tk.BooleanVar()
|
||
self.topmost_cb = ttk.Checkbutton(
|
||
self.set_frame,
|
||
text="窗口置顶",
|
||
variable=self.topmost_var,
|
||
command=self.toggle_topmost
|
||
)
|
||
self.topmost_cb.grid(row=0, column=2, padx=10, pady=10)
|
||
|
||
# 增加Treeview显示
|
||
style = ttk.Style()
|
||
style.configure('Treeview', rowheight=20) # 设置行高
|
||
|
||
self.tree_frame = ttk.Frame(self.root)
|
||
self.tree_frame.grid(row=2, column=0, rowspan=1, padx=10, pady=10, sticky="ew")
|
||
# 增加图表显示
|
||
self.setup_treeview()
|
||
|
||
# 增加导出按钮
|
||
export_btn = ttk.Button(self.set_frame, text="测试", command=self.showdate)
|
||
export_btn.grid(row=0, column=0, padx=10, pady=10)
|
||
|
||
# 新增:创业板筛选复选框
|
||
self.gem_only_var = tk.BooleanVar()
|
||
self.gem_only_cb = ttk.Checkbutton(
|
||
self.set_frame,
|
||
text="仅显示创业板",
|
||
variable=self.gem_only_var,
|
||
command=self.toggle_gem_filter
|
||
)
|
||
self.gem_only_cb.grid(row=0, column=4, padx=10, pady=10)
|
||
|
||
# 创建图表区域
|
||
self.chart_frame = ttk.Frame(self.tree_frame)
|
||
self.chart_frame.grid(row=2, column=0, rowspan=5, padx=10, pady=10)
|
||
self.fig, self.ax = plt.subplots(figsize=(6, 4))
|
||
self.canvas = FigureCanvasTkAgg(self.fig, master=self.chart_frame)
|
||
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
|
||
|
||
# 在 setup_ui 方法中找到放置按钮的位置,并添加如下代码:
|
||
plot_btn = ttk.Button(self.set_frame, text="生成折线图", command=self.plot_monthly_data)
|
||
plot_btn.grid(row=0, column=3, padx=10, pady=10)
|
||
|
||
def setup_treeview(self):
|
||
"""设置Treeview组件"""
|
||
# 配置列属性
|
||
self.base_columns = [
|
||
('code', '代码', 80),
|
||
('name', '名称', 80),
|
||
('open_zf', '开盘涨幅', 80),
|
||
('day_zf', '当日涨幅', 80), # 新增当日涨幅列
|
||
('day_zz', '当日振幅', 80), # 新增当日振幅列
|
||
('lt_pan', '流通盘', 80),
|
||
('target', '目标价', 80),
|
||
('status', '状态', 60),
|
||
# ('limit_up', '涨停标记', 60), # 新增涨停列
|
||
]
|
||
# # 添加动态获利天数列
|
||
days = int(self.count_days)
|
||
for day in range(1, days+1):
|
||
self.base_columns.append((f'profit_{day}', f'T{day}获利%', 60))
|
||
|
||
self.tree = ttk.Treeview(
|
||
self.tree_frame,
|
||
columns=[col[0] for col in self.base_columns],
|
||
show='headings',
|
||
height=20 # 设置默认显示行数
|
||
)
|
||
# 文字新增样式配置
|
||
self.tree.tag_configure('triggered', foreground='red', font=('Verdana', 12, 'bold')) # 修改样式配置
|
||
self.tree.tag_configure('wrong_buy', foreground='green', font=('Verdana', 11, 'bold')) # 修改样式配置
|
||
|
||
self.tree.grid(row=0, column=0, sticky="nsew") # 修改为 nsew 让控件向四个方向扩展
|
||
|
||
for col_id, col_text, col_width in self.base_columns:
|
||
self.tree.heading(col_id, text=col_text,
|
||
command=lambda c=col_id: self.treeview_sort_column(self.tree, c, False))
|
||
self.tree.heading(col_id, text=col_text) # 设置列标题
|
||
self.tree.column(col_id, width=col_width, anchor=tk.CENTER)
|
||
|
||
scrollbar = ttk.Scrollbar(self.tree_frame, orient="vertical", command=self.tree.yview)
|
||
scrollbar.grid(row=0, column=1, sticky="ns")
|
||
self.tree.configure(yscrollcommand=scrollbar.set)
|
||
|
||
# 添加以下代码让 tree_frame 的列可以扩展
|
||
self.tree_frame.grid_columnconfigure(0, weight=1)
|
||
|
||
# 新增:绑定双击事件
|
||
self.tree.bind("<Double-1>", self.on_tree_double_click)
|
||
|
||
def toggle_gem_filter(self):
|
||
"""切换仅显示创业板状态时重新加载数据"""
|
||
self.load_data()
|
||
|
||
def get_pure_code(self, code_with_suffix):
|
||
"""去掉股票代码的 .SH 或 .SZ 后缀"""
|
||
if isinstance(code_with_suffix, str):
|
||
if '.' in code_with_suffix:
|
||
return code_with_suffix.split('.')[0]
|
||
return code_with_suffix
|
||
|
||
# 新增:处理 TreeView 双击事件
|
||
def on_tree_double_click(self, event):
|
||
"""处理 TreeView 的双击事件"""
|
||
item = self.tree.selection() # 获取选中的行
|
||
if item:
|
||
values = self.tree.item(item)['values'] # 获取该行的值
|
||
if values:
|
||
code = values[0] # 提取股票代码
|
||
print(code)
|
||
codes = self.get_pure_code(code)
|
||
self.place_order(codes,0) # 调用下单方法
|
||
|
||
# 新增:下单方法
|
||
def place_order(self, code, tag=0):
|
||
"""下单方法"""
|
||
try:
|
||
# 调用 OrderExecutor 的下单功能
|
||
self.order_executor.place_order(code, auto_push=tag) # auto_push=1 表示自动下单
|
||
print("下单成功", f"已成功下单股票代码: {code}")
|
||
except Exception as e:
|
||
print("下单失败", f"下单失败: {str(e)}")
|
||
|
||
def treeview_sort_column(self, col, reverse):
|
||
"""点击表头排序功能"""
|
||
# 获取当前所有行数据
|
||
l = [(self.tree.set(k, col), k) for k in self.tree.get_children('')]
|
||
|
||
# 特殊处理状态列排序
|
||
if col == 'status':
|
||
# 修改优先级顺序,已触发排最前,错买其次,监控中最后
|
||
priority_order = {'已触发': 0, '错买': 1, '监控中': 2}
|
||
l.sort(key=lambda t: priority_order.get(t[0], 3), reverse=reverse)
|
||
elif col == 'limit_up':
|
||
# 处理涨停列排序,'是' 排前面
|
||
l.sort(key=lambda t: (t[0] != '是'), reverse=reverse)
|
||
else:
|
||
# 尝试转换为数字排序
|
||
try:
|
||
l.sort(key=lambda t: float(t[0].replace('%', '')), reverse=reverse)
|
||
except (ValueError, AttributeError):
|
||
l.sort(reverse=reverse)
|
||
|
||
# 重新排列Treeview中的行
|
||
for index, (val, k) in enumerate(l):
|
||
self.tree.move(k, '', index)
|
||
|
||
# 反转排序顺序
|
||
self.tree.heading(col, command=lambda: self.treeview_sort_column(col, not reverse))
|
||
|
||
def export_to_excel(self):
|
||
"""将Treeview数据导出到Excel文件"""
|
||
try:
|
||
# 检查Treeview是否为空
|
||
if not self.tree.get_children():
|
||
tk.messagebox.showwarning("导出警告", "当前没有可导出的数据!")
|
||
return
|
||
|
||
# 检查并创建目录
|
||
if not os.path.exists(self.huice_data_dir):
|
||
os.makedirs(self.huice_data_dir)
|
||
|
||
# 获取当前日期作为默认文件名
|
||
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
filename = f"{self.huice_data_dir}\短线速逆回测数据_{now}.xlsx"
|
||
|
||
# 获取Treeview所有数据
|
||
data = []
|
||
columns = self.tree["columns"]
|
||
data.append(columns) # 添加表头
|
||
|
||
for item in self.tree.get_children():
|
||
values = [self.tree.set(item, col) for col in columns]
|
||
data.append(values)
|
||
|
||
# 创建DataFrame并保存为Excel
|
||
df = pd.DataFrame(data[1:], columns=data[0])
|
||
df.to_excel(filename, index=False)
|
||
|
||
# 提示导出成功
|
||
tk.messagebox.showinfo("导出成功", f"数据已成功导出到: {filename}")
|
||
except Exception as e:
|
||
tk.messagebox.showerror("导出错误", f"导出失败: {str(e)}")
|
||
|
||
def format_code(self, code):
|
||
# 为股票代码添加后缀
|
||
code = f"{code:0>6}" # 确保代码是6位,不足前面补零
|
||
if code.startswith(("6", "9")):
|
||
f_code = f"{code}.SH"
|
||
elif code.startswith(("0", "2", "3")):
|
||
f_code = f"{code}.SZ"
|
||
else:
|
||
print(f"未知的股票代码格式: {code}")
|
||
return None
|
||
return f_code
|
||
|
||
def toggle_topmost(self):
|
||
"""切换窗口置顶状态"""
|
||
self.root.attributes('-topmost', self.topmost_var.get())
|
||
if self.topmost_var.get():
|
||
status = "窗口已置顶"
|
||
else:
|
||
status = "取消窗口置顶"
|
||
|
||
# 获取当前时间
|
||
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
print(f"[{current_time}] {status}")
|
||
|
||
|
||
# 读取历史数据内容, 写入 treeview
|
||
def load_data(self):
|
||
"""加载数据"""
|
||
self.trigger_count_label.config(text="数据加载中...")
|
||
year = self.year_var.get()
|
||
month = self.month_var.get().zfill(2) # 补零
|
||
date = self.date_var.get().zfill(2) # 补零
|
||
trade_date = f"{year}{month}{date}" # 格式化日期
|
||
file_name = f"{year}{month}{date}.txt"
|
||
# 修改原路径拼接方式
|
||
file_path = os.path.join(self.history_data_dir, f"{year}{month}{date}.txt")
|
||
|
||
# 检查文件是否存在
|
||
if not os.path.exists(file_path):
|
||
tk.messagebox.showwarning("文件不存在", f"未找到文件: {file_path}")
|
||
return
|
||
|
||
# 清空Treeview
|
||
for item in self.tree.get_children():
|
||
self.tree.delete(item)
|
||
|
||
try:
|
||
with open(file_path, 'r') as f:
|
||
# 读取标题行并验证列
|
||
headers = next(f).strip().split(',')
|
||
required_cols = ['条件选股', '代码']
|
||
for col in required_cols:
|
||
if col not in headers:
|
||
raise KeyError(f"缺少必要列:{col}")
|
||
# 获取列索引
|
||
name_col = headers.index('条件选股')
|
||
code_col = headers.index('代码')
|
||
|
||
# 批量获取历史数据
|
||
codes = []
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith('#'):
|
||
continue
|
||
parts = line.split(',')
|
||
if len(parts) <= max(name_col, code_col):
|
||
continue
|
||
code = parts[code_col].strip()
|
||
# 确保代码是字符串格式
|
||
if isinstance(code, float): # 如果code是浮点数
|
||
code = str(int(code)) if code.is_integer() else str(code)
|
||
code = f"{code:0>6}" # 标准化为6位代码
|
||
|
||
# 判断是否仅显示创业板
|
||
if self.gem_only_var.get():
|
||
if code.startswith("30"): # 创业板股票代码以 "30" 开头
|
||
codes.append(self.format_code(code))
|
||
else:
|
||
codes.append(self.format_code(code))
|
||
|
||
days = int(self.count_days) + 1 # 增加天数
|
||
# print(trade_date)
|
||
trade_dates = [None] * days # 初始化列表,用于存储未来的交易日
|
||
for day in range(0, days):
|
||
#print(day)
|
||
trade_dates[day] = self.get_next_trade_date(trade_date, day)
|
||
print(trade_dates)
|
||
# 获取历史收盘价--------------------------
|
||
import time
|
||
MAX_RETRIES = 3
|
||
|
||
for attempt in range(MAX_RETRIES):
|
||
try:
|
||
df_all = self.pro.daily(
|
||
ts_code=','.join(codes),
|
||
start_date=min(trade_date, *trade_dates),
|
||
end_date=max(trade_date, *trade_dates)
|
||
)
|
||
break
|
||
except Exception as e:
|
||
if attempt == MAX_RETRIES - 1:
|
||
raise
|
||
time.sleep(2 ** attempt) # 指数退避
|
||
# print(df_all)
|
||
|
||
if df_all.empty:
|
||
raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确")
|
||
|
||
# 创建价格映射表(包含T0和T1的价格数据)
|
||
price_map = {}
|
||
|
||
for _, row in df_all.iterrows():
|
||
ts_code = row['ts_code']
|
||
trade_date = row['trade_date']
|
||
|
||
# 添加各天的数据
|
||
for day in range(1, days + 1):
|
||
if trade_date == trade_dates[day - 1]:
|
||
price_map.setdefault(ts_code, {})[f'T{day}_open'] = row['open']
|
||
price_map.setdefault(ts_code, {})[f'T{day}_close'] = row['close']
|
||
price_map.setdefault(ts_code, {})[f'T{day}_high'] = row['high']
|
||
price_map.setdefault(ts_code, {})[f'T{day}_low'] = row['low'] # 添加最低价
|
||
price_map.setdefault(ts_code, {})[f'T{day}_pct_chg'] = row['pct_chg'] # 当日涨幅
|
||
price_map.setdefault(ts_code, {})[f'T{day}_pre_close'] = row['pre_close'] # 前一日收盘价
|
||
# price_map.setdefault(ts_code, {})[f'T{day}_limit'] = row['limit_status'] # 假设 limit_status 表示涨停状态
|
||
print(price_map)
|
||
# 回到文件开头并处理数据
|
||
f.seek(0)
|
||
next(f) # 跳过标题行
|
||
for line in f:
|
||
line = line.strip()
|
||
# 计算涨停标记
|
||
if not line or line.startswith('#'):
|
||
continue
|
||
parts = line.split(',')
|
||
if len(parts) <= max(name_col, code_col):
|
||
raise ValueError(f"数据行字段不足:{line}")
|
||
name = parts[name_col].strip()
|
||
code = parts[code_col].strip()
|
||
|
||
# 确保代码保持字符串格式,保留前导零
|
||
code = f"{code:0>6}" # 确保代码是6位,不足前面补零
|
||
f_code = self.format_code(code)
|
||
|
||
if not f_code: # 添加检查,如果格式不正确则跳过
|
||
print(f"跳过 {code},股票代码格式不正确")
|
||
continue
|
||
|
||
if f_code not in price_map or 'T1_close' not in price_map[f_code]:
|
||
print(f"跳过 {code},未找到收盘价数据")
|
||
continue
|
||
|
||
# 计算重要数据
|
||
T1_close = price_map[f_code]['T1_close']
|
||
T2_open = price_map[f_code]['T2_open']
|
||
T2_high = price_map[f_code]['T2_high']
|
||
T2_close = price_map[f_code]['T2_close']
|
||
# 计算目标价
|
||
target_pic = round(float(T1_close) * 1.049, 2)
|
||
kp_zf = round(float(T2_open) / float(T1_close) - 1, 4) * 100
|
||
status = self.check_status(target_pic, T2_high, T2_close)
|
||
|
||
lt_pan = 0 # 初始值,实际值会在异步线程中更新
|
||
|
||
profit_values = []
|
||
for day in range(2, days + 1):
|
||
if f'T{day}_close' in price_map.get(f_code, {}):
|
||
if day == 2:
|
||
day_close = price_map[f_code][f'T{day}_close']
|
||
else:
|
||
day_close = price_map[f_code][f'T{day}_high']
|
||
profit_pct = round((float(day_close) - float(target_pic)) / float(target_pic) * 100, 2)
|
||
profit_values.append(f"{profit_pct:.2f}%")
|
||
else:
|
||
profit_values.append("-")
|
||
limit_up_flag = False
|
||
for day in range(1, self.days_for_limit_up + 1):
|
||
limit_key = f'T{day}_limit'
|
||
if limit_key in price_map.get(f_code, {}) and price_map[f_code][limit_key] == 'U':
|
||
limit_up_flag = True
|
||
break
|
||
# 从缓存获取流通盘数据
|
||
self.add_stock(code, name, kp_zf, target_pic, lt_pan, status, profit_values, price_map)
|
||
|
||
# 总数
|
||
total_stocks = len(self.tree.get_children())
|
||
status_col = self.tree['columns'].index('status')
|
||
triggered_count = 0
|
||
for item in self.tree.get_children():
|
||
status = self.tree.item(item)['values'][status_col]
|
||
if status == '触发' or status == '错买':
|
||
triggered_count += 1
|
||
|
||
# 正确写法:
|
||
self.choose_frame.after(0, lambda: self.trigger_count_label.config(text=f"已触发股票数量:{triggered_count}/{total_stocks}"))
|
||
|
||
except Exception as e:
|
||
print(f"加载文件失败:{str(e)}", 'error')
|
||
self.trigger_count_label.config(text="加载失败")
|
||
|
||
def plot_monthly_data(self):
|
||
"""绘制所选月份每日股票数量的折线图"""
|
||
selected_year = self.year_var.get()
|
||
selected_month = self.month_var.get().zfill(2)
|
||
|
||
if not selected_year or not selected_month:
|
||
print("请选择年份和月份")
|
||
return
|
||
|
||
# 构建日期前缀 (格式: YYYYMM)
|
||
date_prefix = f"{selected_year}{selected_month}"
|
||
|
||
# 统计每天的数据量
|
||
daily_counts = {}
|
||
for filename in os.listdir(self.history_data_dir):
|
||
if filename.startswith(date_prefix) and filename.endswith('.txt'):
|
||
try:
|
||
# 提取日期部分 (格式: YYYYMMDD.txt)
|
||
day = filename[6:8] # 获取DD部分
|
||
date_key = f"{selected_year}-{selected_month}-{day}"
|
||
file_path = os.path.join(self.history_data_dir, filename)
|
||
|
||
# 统计文件行数(不包括标题行)
|
||
with open(file_path, 'r') as f:
|
||
lines = f.readlines()
|
||
count = len(lines) - 1 # 减去标题行
|
||
daily_counts[date_key] = count
|
||
except Exception as e:
|
||
print(f"处理文件 {filename} 时出错: {e}")
|
||
continue
|
||
|
||
if not daily_counts:
|
||
print("没有数据可供绘图")
|
||
return
|
||
|
||
# 清除之前的图表内容
|
||
self.ax.clear()
|
||
|
||
# 按照日期排序,并提取天数作为x轴标签
|
||
sorted_full_dates = sorted(daily_counts.keys())
|
||
|
||
# 只保留“日”作为x轴标签
|
||
x_labels = [date.split('-')[2] for date in sorted_full_dates]
|
||
counts = [daily_counts[date] for date in sorted_full_dates]
|
||
|
||
# 使用整数索引作为x轴,之后设置自定义标签
|
||
x_indices = list(range(len(sorted_full_dates)))
|
||
|
||
# 绘制折线图
|
||
self.ax.plot(x_indices, counts, marker='o', linestyle='-', color='blue')
|
||
self.ax.set_title(f'{selected_year}-{selected_month} 每日股票数量统计')
|
||
self.ax.set_xlabel('日期(日)')
|
||
self.ax.set_ylabel('股票数量')
|
||
self.ax.grid(True)
|
||
|
||
# 设置x轴刻度和标签
|
||
self.ax.set_xticks(x_indices)
|
||
self.ax.set_xticklabels(x_labels, rotation=45, ha='right')
|
||
|
||
# 更新画布
|
||
self.canvas.draw()
|
||
|
||
def check_status(self, target_pic, high_pic, close_pic):
|
||
"""检查股票状态"""
|
||
if high_pic > target_pic and close_pic >= target_pic:
|
||
return '触发'
|
||
if high_pic >= target_pic and close_pic < target_pic:
|
||
return '错买'
|
||
return '未触发'
|
||
|
||
def add_stock(self, code, name, open_zf, target, lt_pan=0, status='计算中', profit_values=None, price_map=None):
|
||
"""添加股票到监控列表"""
|
||
codes = self.format_code(str(code))
|
||
# 构建values列表,包含基础列和多天获利数据
|
||
# 获取T1收盘价
|
||
if price_map is not None and codes in price_map and 'T1_close' in price_map[codes]:
|
||
# print(codes)
|
||
t1_close = price_map[codes]['T1_close']
|
||
t1_open = price_map[codes]['T1_open']
|
||
t1_pct_chg = price_map[codes]['T1_pct_chg']
|
||
t1_pre_close = price_map[codes]['T1_pre_close']
|
||
|
||
else:
|
||
t1_close = 0
|
||
print(f"跳过 {code},未找到T1收盘价数据")
|
||
|
||
# 计算S点日涨幅(T2_open与T1_close的涨跌幅)
|
||
if price_map is not None and codes in price_map and 'T1_open' in price_map[codes] and t1_close > 0:
|
||
day_zf = round(float(t1_pct_chg), 2)
|
||
else:
|
||
day_zf = 0
|
||
|
||
# 计算当日振幅(T1_high与T1_low的差值相对于T0_close的百分比)
|
||
if (price_map is not None and codes in price_map and
|
||
'T1_high' in price_map[codes] and 'T1_low' in price_map[codes] and t1_close > 0):
|
||
day_zz = round(
|
||
(float(price_map[codes]['T1_high']) - float(price_map[codes]['T1_low'])) / float(t1_pre_close) * 100, 2)
|
||
else:
|
||
day_zz = 0
|
||
|
||
values = [
|
||
codes,
|
||
name,
|
||
f"{open_zf:.2f}%",
|
||
f"{day_zf:.2f}%", # 当日涨幅
|
||
f"{day_zz:.2f}%", # 当日振幅
|
||
f"{lt_pan:.2f}",
|
||
target,
|
||
status
|
||
]
|
||
# 添加获利数据
|
||
if profit_values:
|
||
values.extend(profit_values)
|
||
else:
|
||
# 如果没有提供profit_values,添加默认值
|
||
values.extend(["-"] * self.count_days)
|
||
|
||
item = self.tree.insert('', 'end', values=values)
|
||
# 根据状态设置标签样式
|
||
if status == '触发':
|
||
self.tree.item(item, tags=('triggered',))
|
||
elif status == '错买':
|
||
self.tree.item(item, tags=('wrong_buy',))
|
||
|
||
|
||
|
||
# 以下是原有函数改为类方法
|
||
def treeview_sort_column(self, tv, col, reverse):
|
||
"""对 Treeview 的列进行排序"""
|
||
data = [(tv.set(k, col), k) for k in tv.get_children("")]
|
||
try:
|
||
data.sort(key=lambda t: float(t[0].rstrip("%")), reverse=reverse)
|
||
except ValueError:
|
||
data.sort(reverse=reverse)
|
||
|
||
for index, (val, k) in enumerate(data):
|
||
tv.move(k, "", index)
|
||
|
||
tv.heading(col, command=lambda: self.treeview_sort_column(tv, col, not reverse))
|
||
|
||
def get_next_trade_date(self, date, n=1) -> str:
|
||
"""获取n个交易日后的日期"""
|
||
try:
|
||
# 标准化输入日期格式
|
||
clean_date = date.replace("-", "")
|
||
if len(clean_date) != 8:
|
||
raise ValueError("日期格式应为YYYYMMDD")
|
||
|
||
# 检查缓存中是否已有该日期的交易日历
|
||
if clean_date not in self.trade_cal_cache:
|
||
# 从API获取交易日历(获取前后30天的数据以确保覆盖)
|
||
start_date = (datetime.datetime.strptime(clean_date, "%Y%m%d") -
|
||
datetime.timedelta(days=30)).strftime("%Y%m%d")
|
||
end_date = (datetime.datetime.strptime(clean_date, "%Y%m%d") +
|
||
datetime.timedelta(days=30)).strftime("%Y%m%d")
|
||
|
||
trade_cal = self.pro.trade_cal(exchange='', start_date=start_date, end_date=end_date)
|
||
# 只保留交易日
|
||
trade_dates = trade_cal[trade_cal['is_open'] == 1]['cal_date'].tolist()
|
||
self.trade_cal_cache[clean_date] = sorted(trade_dates)
|
||
|
||
trade_dates = self.trade_cal_cache[clean_date]
|
||
current_index = trade_dates.index(clean_date)
|
||
|
||
if current_index + n >= len(trade_dates):
|
||
# 如果超出范围,返回计算的工作日日期
|
||
future_date = (datetime.datetime.strptime(clean_date, "%Y%m%d") +
|
||
datetime.timedelta(days=n)).strftime("%Y%m%d")
|
||
return future_date
|
||
|
||
return trade_dates[current_index + n]
|
||
|
||
except Exception as e:
|
||
print(f"获取交易日历时出错: {e}")
|
||
# 如果API查询失败,返回计算的工作日日期
|
||
try:
|
||
future_date = (datetime.datetime.strptime(clean_date, "%Y%m%d") +
|
||
datetime.timedelta(days=n)).strftime("%Y%m%d")
|
||
return future_date
|
||
except:
|
||
return None
|
||
|
||
def showdate(self):
|
||
next_trade_date = self.get_next_trade_date("20250516", 1) # 获取20240101的下一个交易日
|
||
print(f"下一个交易日是: {next_trade_date}")
|
||
print(self.trade_cal_cache)
|
||
|
||
def update_months(self):
|
||
"""更新月份"""
|
||
selected_year = self.year_var.get()
|
||
months = set()
|
||
# 遍历历史数据目录
|
||
for filename in os.listdir(self.history_data_dir):
|
||
if filename.startswith(selected_year) and filename.endswith('.txt'):
|
||
try:
|
||
# 提取月份部分 (格式: YYYYMMDD.txt)
|
||
month = filename[4:6] # 获取MM部分
|
||
months.add(month)
|
||
except (IndexError, ValueError):
|
||
continue
|
||
|
||
# 将月份排序并更新下拉菜单
|
||
sorted_months = sorted(months)
|
||
month_menu = self.choose_frame.children['!combobox2'] # 获取月份下拉框
|
||
month_menu['values'] = sorted_months
|
||
|
||
# 如果有月份数据,默认选择第一个
|
||
if sorted_months:
|
||
self.month_var.set(sorted_months[0])
|
||
self.update_dates() # 自动更新日期
|
||
|
||
def update_dates(self):
|
||
"""更新日期"""
|
||
selected_year = self.year_var.get()
|
||
selected_month = self.month_var.get().zfill(2)
|
||
if not 1 <= int(selected_month) <= 12:
|
||
tk.messagebox.showerror("错误", "无效的月份")
|
||
return
|
||
dates = []
|
||
# 构建日期前缀 (格式: YYYYMM)
|
||
date_prefix = f"{selected_year}{selected_month}"
|
||
# 遍历历史数据目录
|
||
for filename in os.listdir(self.history_data_dir):
|
||
if filename.startswith(date_prefix) and filename.endswith('.txt'):
|
||
try:
|
||
# 提取日期部分 (格式: YYYYMMDD.txt)
|
||
day = filename[6:8] # 获取DD部分
|
||
dates.append(day)
|
||
except (IndexError, ValueError):
|
||
continue
|
||
|
||
# 将日期排序并更新下拉菜单
|
||
sorted_dates = sorted(dates)
|
||
date_menu = self.choose_frame.children['!combobox3'] # 获取日期下拉框
|
||
date_menu['values'] = sorted_dates
|
||
|
||
# 如果有日期数据,默认选择第一个
|
||
if sorted_dates:
|
||
self.date_var.set(sorted_dates[0])
|
||
|
||
|
||
class DataDownloaderApp:
|
||
def __init__(self):
|
||
# self.downloader = DataDownloader()
|
||
self.root = tk.Tk()
|
||
self.running_threads = []
|
||
self.setup_ui()
|
||
|
||
def setup_ui(self):
|
||
# 添加UI设置代码
|
||
self.root.title("数据下载器")
|
||
|
||
# 创建复选框
|
||
self.update_codes_var = tk.BooleanVar()
|
||
self.update_index_var = tk.BooleanVar()
|
||
self.update_stocks_var = tk.BooleanVar()
|
||
|
||
tk.Checkbutton(self.root, text="更新股票代码表", variable=self.update_codes_var).pack(anchor=tk.W)
|
||
tk.Checkbutton(self.root, text="更新指数数据", variable=self.update_index_var).pack(anchor=tk.W)
|
||
tk.Checkbutton(self.root, text="更新个股数据", variable=self.update_stocks_var).pack(anchor=tk.W)
|
||
|
||
# 创建按钮
|
||
tk.Button(self.root, text="开始更新", command=self.on_update_button_click).pack(pady=10)
|
||
|
||
# 创建状态标签
|
||
self.codes_completion_label = tk.Label(self.root, text="")
|
||
self.codes_completion_label.pack()
|
||
self.index_completion_label = tk.Label(self.root, text="")
|
||
self.index_completion_label.pack()
|
||
self.stocks_completion_label = tk.Label(self.root, text="")
|
||
self.stocks_completion_label.pack()
|
||
|
||
def on_update_button_click(self):
|
||
# 添加按钮点击事件处理代码
|
||
if self.update_codes_var.get():
|
||
thread = threading.Thread(target=self.update_codes)
|
||
thread.start()
|
||
self.running_threads.append(thread)
|
||
|
||
if self.update_index_var.get():
|
||
thread = threading.Thread(target=self.update_index)
|
||
thread.start()
|
||
self.running_threads.append(thread)
|
||
|
||
if self.update_stocks_var.get():
|
||
thread = threading.Thread(target=self.update_stocks)
|
||
thread.start()
|
||
self.running_threads.append(thread)
|
||
|
||
def update_codes(self):
|
||
# 添加更新股票代码表的代码
|
||
self.codes_completion_label.config(text="正在更新股票代码表...")
|
||
self.downloader.update_codes()
|
||
self.codes_completion_label.config(text="股票代码表更新完成!")
|
||
|
||
def update_index(self):
|
||
# 添加更新指数数据的代码
|
||
self.index_completion_label.config(text="正在更新指数数据...")
|
||
self.downloader.update_index()
|
||
self.index_completion_label.config(text="指数数据更新完成!")
|
||
|
||
def update_stocks(self):
|
||
# 添加更新个股数据的代码
|
||
self.stocks_completion_label.config(text="正在更新个股数据...")
|
||
self.downloader.update_stocks()
|
||
self.stocks_completion_label.config(text="个股数据更新完成!")
|
||
|
||
|
||
# 运行应用
|
||
if __name__ == "__main__":
|
||
root = tk.Tk()
|
||
app = StockAnalysisApp(root)
|
||
root.mainloop() |