Files
real_view/短线速逆_历史.py
2025-07-28 12:15:45 +08:00

820 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import tkinter as tk
from tkinter import ttk
from tkinter import messagebox # 👈 新增这一行
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import pandas as pd
import os
import logging
import tushare as ts
import datetime
import glob
from functools import lru_cache
import threading # 👈 新增这一行
from order_executor import OrderExecutor
plt.rcParams['font.sans-serif'] = ['SimHei'] # 设置中文字体
plt.rcParams['axes.unicode_minus'] = False # 解决负号 '-' 显示为方块的问题
@lru_cache(maxsize=100)
class StockAnalysisApp:
def __init__(self, root):
self.root = root
self.root.title("短线速逆-股票数据分析")
self.root.geometry("1000x1000")
self.days_var = tk.StringVar(value="1") # 默认显示1天
self.count_days = 3 # 用于计算的天数
self.days_for_limit_up = 5 # 公开参数,判断几天内是否有涨停
self.trade_cal_cache = {} # 新增交易日历缓存字典
# 初始化配置
self.setup_config()
self.setup_ui()
# 新增:初始化 OrderExecutor 实例
self.order_executor = OrderExecutor()
def setup_config(self):
"""初始化配置"""
# 初始化 Matplotlib 字体设置
# plt.rcParams['font.sans-serif'] = ['SimHei']
# plt.rcParams['axes.unicode_minus'] = False
# 设置 Tushare API Token
ts.set_token('9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d')
self.pro = ts.pro_api()
# 数据目录
self.day_data_dir = r"D:\gp_data\day"
self.history_data_dir = r"D:\gp_data\history"
self.huice_data_dir = r"D:\gp_data\huice"
# 初始化变量
self.year_var = tk.StringVar()
self.month_var = tk.StringVar()
self.date_var = tk.StringVar()
self.offline_mode = tk.BooleanVar(value=False)
self.selected_board = tk.StringVar(value="all")
self.days_var = tk.StringVar(value="1")
self.TARGET_RATIO = 1.049 # 新增配置项
self.MIN_LIMIT_UP_COUNT = 2
def setup_ui(self):
"""设置用户界面"""
self.choose_frame = ttk.Frame(self.root)
self.choose_frame.grid(row=0, column=0, rowspan=1, padx=10, pady=10, sticky="ew")
self.root.grid_columnconfigure(0, weight=1)
# 年份选择框
current_year = datetime.datetime.now().year
years = [str(current_year - i) for i in range(3)]
# 创建蓝色大字体样式
style = ttk.Style()
style.configure('Blue.TLabel', font=('微软雅黑', 12), foreground='blue')
style.configure('Blue.TButton', font=('微软雅黑', 12), foreground='blue')
style.configure('Red.TButton', font=('微软雅黑', 14), foreground='red')
style.configure('Blue.TCombobox', font=('微软雅黑', 14), foreground='red')
style.map('Blue.TCombobox',
fieldbackground=[('readonly', 'white')],
foreground=[('readonly', 'blue')],
selectbackground=[('readonly', 'white')],
selectforeground=[('readonly', 'blue')])
ttk.Label(self.choose_frame, text="选择年份:", style='Blue.TLabel').grid(row=0, column=0, padx=5, pady=10, sticky="w")
self.year_var.set(years[0])
year_menu = ttk.Combobox(self.choose_frame, textvariable=self.year_var, values=years, width=8, style='Blue.TCombobox')
year_menu.grid(row=0, column=1, padx=1, pady=5, sticky="w")
year_menu.bind("<<ComboboxSelected>>", lambda _: self.update_months())
# 月份选择框
ttk.Label(self.choose_frame, text="选择月份:", style='Blue.TLabel').grid(row=0, column=2, padx=2, pady=10)
month_menu = ttk.Combobox(self.choose_frame, textvariable=self.month_var, width=10, state='readonly', style='Blue.TCombobox')
month_menu.grid(row=0, column=3, padx=2, pady=10)
month_menu.bind("<<ComboboxSelected>>", lambda _: (self.update_dates(), self.plot_monthly_data()))
# 日期选择框
ttk.Label(self.choose_frame, text="选择日期:", style='Blue.TLabel').grid(row=0, column=4, padx=10, pady=10)
date_menu = ttk.Combobox(self.choose_frame, textvariable=self.date_var, width=10, state='readonly', style='Blue.TCombobox')
date_menu.grid(row=0, column=5, padx=10, pady=10)
# date_menu.bind("<<ComboboxSelected>>", lambda _: (self.update_plot()))
# 增加读取按钮
load_btn = ttk.Button(self.choose_frame, text="读取数据", style='Blue.TButton', command=self.load_data)
load_btn.grid(row=0, column=6, padx=10, pady=10)
# 增加导出按钮
export_btn = ttk.Button(self.choose_frame, text="导出Excel", style='Red.TButton', command=self.export_to_excel)
export_btn.grid(row=0, column=7, padx=10, pady=10)
self.set_frame = ttk.Frame(self.root)
self.set_frame.grid(row=1, column=0, rowspan=1, padx=10, pady=10, sticky="ew")
# 显示触发股票数
self.trigger_count_label = ttk.Label(
self.set_frame,
text="已触发股票数量0",
font=('微软雅黑', 14),
foreground='red'
)
self.trigger_count_label.grid(row=0, column=1, padx=10, pady=10)
# self.trigger_count_label.pack(padx=10, pady=5, side=tk.LEFT, anchor=tk.W)
# 窗口置顶复选框
self.topmost_var = tk.BooleanVar()
self.topmost_cb = ttk.Checkbutton(
self.set_frame,
text="窗口置顶",
variable=self.topmost_var,
command=self.toggle_topmost
)
self.topmost_cb.grid(row=0, column=2, padx=10, pady=10)
# 增加Treeview显示
style = ttk.Style()
style.configure('Treeview', rowheight=20) # 设置行高
self.tree_frame = ttk.Frame(self.root)
self.tree_frame.grid(row=2, column=0, rowspan=1, padx=10, pady=10, sticky="ew")
# 增加图表显示
self.setup_treeview()
# 增加导出按钮
export_btn = ttk.Button(self.set_frame, text="测试", command=self.showdate)
export_btn.grid(row=0, column=0, padx=10, pady=10)
# 新增:创业板筛选复选框
self.gem_only_var = tk.BooleanVar()
self.gem_only_cb = ttk.Checkbutton(
self.set_frame,
text="仅显示创业板",
variable=self.gem_only_var,
command=self.toggle_gem_filter
)
self.gem_only_cb.grid(row=0, column=4, padx=10, pady=10)
# 创建图表区域
self.chart_frame = ttk.Frame(self.tree_frame)
self.chart_frame.grid(row=2, column=0, rowspan=5, padx=10, pady=10)
self.fig, self.ax = plt.subplots(figsize=(6, 4))
self.canvas = FigureCanvasTkAgg(self.fig, master=self.chart_frame)
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
# 在 setup_ui 方法中找到放置按钮的位置,并添加如下代码:
plot_btn = ttk.Button(self.set_frame, text="生成折线图", command=self.plot_monthly_data)
plot_btn.grid(row=0, column=3, padx=10, pady=10)
def setup_treeview(self):
"""设置Treeview组件"""
# 配置列属性
self.base_columns = [
('code', '代码', 80),
('name', '名称', 80),
('open_zf', '开盘涨幅', 80),
('day_zf', '当日涨幅', 80), # 新增当日涨幅列
('day_zz', '当日振幅', 80), # 新增当日振幅列
('lt_pan', '流通盘', 80),
('target', '目标价', 80),
('status', '状态', 60),
# ('limit_up', '涨停标记', 60), # 新增涨停列
]
# # 添加动态获利天数列
days = int(self.count_days)
for day in range(1, days+1):
self.base_columns.append((f'profit_{day}', f'T{day}获利%', 60))
self.tree = ttk.Treeview(
self.tree_frame,
columns=[col[0] for col in self.base_columns],
show='headings',
height=20 # 设置默认显示行数
)
# 文字新增样式配置
self.tree.tag_configure('triggered', foreground='red', font=('Verdana', 12, 'bold')) # 修改样式配置
self.tree.tag_configure('wrong_buy', foreground='green', font=('Verdana', 11, 'bold')) # 修改样式配置
self.tree.grid(row=0, column=0, sticky="nsew") # 修改为 nsew 让控件向四个方向扩展
for col_id, col_text, col_width in self.base_columns:
self.tree.heading(col_id, text=col_text,
command=lambda c=col_id: self.treeview_sort_column(self.tree, c, False))
self.tree.heading(col_id, text=col_text) # 设置列标题
self.tree.column(col_id, width=col_width, anchor=tk.CENTER)
scrollbar = ttk.Scrollbar(self.tree_frame, orient="vertical", command=self.tree.yview)
scrollbar.grid(row=0, column=1, sticky="ns")
self.tree.configure(yscrollcommand=scrollbar.set)
# 添加以下代码让 tree_frame 的列可以扩展
self.tree_frame.grid_columnconfigure(0, weight=1)
# 新增:绑定双击事件
self.tree.bind("<Double-1>", self.on_tree_double_click)
def toggle_gem_filter(self):
"""切换仅显示创业板状态时重新加载数据"""
self.load_data()
def get_pure_code(self, code_with_suffix):
"""去掉股票代码的 .SH 或 .SZ 后缀"""
if isinstance(code_with_suffix, str):
if '.' in code_with_suffix:
return code_with_suffix.split('.')[0]
return code_with_suffix
# 新增:处理 TreeView 双击事件
def on_tree_double_click(self, event):
"""处理 TreeView 的双击事件"""
item = self.tree.selection() # 获取选中的行
if item:
values = self.tree.item(item)['values'] # 获取该行的值
if values:
code = values[0] # 提取股票代码
print(code)
codes = self.get_pure_code(code)
self.place_order(codes,0) # 调用下单方法
# 新增:下单方法
def place_order(self, code, tag=0):
"""下单方法"""
try:
# 调用 OrderExecutor 的下单功能
self.order_executor.place_order(code, auto_push=tag) # auto_push=1 表示自动下单
print("下单成功", f"已成功下单股票代码: {code}")
except Exception as e:
print("下单失败", f"下单失败: {str(e)}")
def treeview_sort_column(self, col, reverse):
"""点击表头排序功能"""
# 获取当前所有行数据
l = [(self.tree.set(k, col), k) for k in self.tree.get_children('')]
# 特殊处理状态列排序
if col == 'status':
# 修改优先级顺序,已触发排最前,错买其次,监控中最后
priority_order = {'已触发': 0, '错买': 1, '监控中': 2}
l.sort(key=lambda t: priority_order.get(t[0], 3), reverse=reverse)
elif col == 'limit_up':
# 处理涨停列排序,'是' 排前面
l.sort(key=lambda t: (t[0] != ''), reverse=reverse)
else:
# 尝试转换为数字排序
try:
l.sort(key=lambda t: float(t[0].replace('%', '')), reverse=reverse)
except (ValueError, AttributeError):
l.sort(reverse=reverse)
# 重新排列Treeview中的行
for index, (val, k) in enumerate(l):
self.tree.move(k, '', index)
# 反转排序顺序
self.tree.heading(col, command=lambda: self.treeview_sort_column(col, not reverse))
def export_to_excel(self):
"""将Treeview数据导出到Excel文件"""
try:
# 检查Treeview是否为空
if not self.tree.get_children():
tk.messagebox.showwarning("导出警告", "当前没有可导出的数据!")
return
# 检查并创建目录
if not os.path.exists(self.huice_data_dir):
os.makedirs(self.huice_data_dir)
# 获取当前日期作为默认文件名
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.huice_data_dir}\短线速逆回测数据_{now}.xlsx"
# 获取Treeview所有数据
data = []
columns = self.tree["columns"]
data.append(columns) # 添加表头
for item in self.tree.get_children():
values = [self.tree.set(item, col) for col in columns]
data.append(values)
# 创建DataFrame并保存为Excel
df = pd.DataFrame(data[1:], columns=data[0])
df.to_excel(filename, index=False)
# 提示导出成功
tk.messagebox.showinfo("导出成功", f"数据已成功导出到: {filename}")
except Exception as e:
tk.messagebox.showerror("导出错误", f"导出失败: {str(e)}")
def format_code(self, code):
# 为股票代码添加后缀
code = f"{code:0>6}" # 确保代码是6位不足前面补零
if code.startswith(("6", "9")):
f_code = f"{code}.SH"
elif code.startswith(("0", "2", "3")):
f_code = f"{code}.SZ"
else:
print(f"未知的股票代码格式: {code}")
return None
return f_code
def toggle_topmost(self):
"""切换窗口置顶状态"""
self.root.attributes('-topmost', self.topmost_var.get())
if self.topmost_var.get():
status = "窗口已置顶"
else:
status = "取消窗口置顶"
# 获取当前时间
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] {status}")
# 读取历史数据内容, 写入 treeview
def load_data(self):
"""加载数据"""
self.trigger_count_label.config(text="数据加载中...")
year = self.year_var.get()
month = self.month_var.get().zfill(2) # 补零
date = self.date_var.get().zfill(2) # 补零
trade_date = f"{year}{month}{date}" # 格式化日期
file_name = f"{year}{month}{date}.txt"
# 修改原路径拼接方式
file_path = os.path.join(self.history_data_dir, f"{year}{month}{date}.txt")
# 检查文件是否存在
if not os.path.exists(file_path):
tk.messagebox.showwarning("文件不存在", f"未找到文件: {file_path}")
return
# 清空Treeview
for item in self.tree.get_children():
self.tree.delete(item)
try:
with open(file_path, 'r') as f:
# 读取标题行并验证列
headers = next(f).strip().split(',')
required_cols = ['条件选股', '代码']
for col in required_cols:
if col not in headers:
raise KeyError(f"缺少必要列:{col}")
# 获取列索引
name_col = headers.index('条件选股')
code_col = headers.index('代码')
# 批量获取历史数据
codes = []
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split(',')
if len(parts) <= max(name_col, code_col):
continue
code = parts[code_col].strip()
# 确保代码是字符串格式
if isinstance(code, float): # 如果code是浮点数
code = str(int(code)) if code.is_integer() else str(code)
code = f"{code:0>6}" # 标准化为6位代码
# 判断是否仅显示创业板
if self.gem_only_var.get():
if code.startswith("30"): # 创业板股票代码以 "30" 开头
codes.append(self.format_code(code))
else:
codes.append(self.format_code(code))
days = int(self.count_days) + 1 # 增加天数
# print(trade_date)
trade_dates = [None] * days # 初始化列表,用于存储未来的交易日
for day in range(0, days):
#print(day)
trade_dates[day] = self.get_next_trade_date(trade_date, day)
print(trade_dates)
# 获取历史收盘价--------------------------
import time
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
try:
df_all = self.pro.daily(
ts_code=','.join(codes),
start_date=min(trade_date, *trade_dates),
end_date=max(trade_date, *trade_dates)
)
break
except Exception as e:
if attempt == MAX_RETRIES - 1:
raise
time.sleep(2 ** attempt) # 指数退避
# print(df_all)
if df_all.empty:
raise ValueError("未找到历史数据,请确认:\n1.日期是否为交易日\n2.股票代码是否正确")
# 创建价格映射表包含T0和T1的价格数据
price_map = {}
for _, row in df_all.iterrows():
ts_code = row['ts_code']
trade_date = row['trade_date']
# 添加各天的数据
for day in range(1, days + 1):
if trade_date == trade_dates[day - 1]:
price_map.setdefault(ts_code, {})[f'T{day}_open'] = row['open']
price_map.setdefault(ts_code, {})[f'T{day}_close'] = row['close']
price_map.setdefault(ts_code, {})[f'T{day}_high'] = row['high']
price_map.setdefault(ts_code, {})[f'T{day}_low'] = row['low'] # 添加最低价
price_map.setdefault(ts_code, {})[f'T{day}_pct_chg'] = row['pct_chg'] # 当日涨幅
price_map.setdefault(ts_code, {})[f'T{day}_pre_close'] = row['pre_close'] # 前一日收盘价
# price_map.setdefault(ts_code, {})[f'T{day}_limit'] = row['limit_status'] # 假设 limit_status 表示涨停状态
print(price_map)
# 回到文件开头并处理数据
f.seek(0)
next(f) # 跳过标题行
for line in f:
line = line.strip()
# 计算涨停标记
if not line or line.startswith('#'):
continue
parts = line.split(',')
if len(parts) <= max(name_col, code_col):
raise ValueError(f"数据行字段不足:{line}")
name = parts[name_col].strip()
code = parts[code_col].strip()
# 确保代码保持字符串格式,保留前导零
code = f"{code:0>6}" # 确保代码是6位不足前面补零
f_code = self.format_code(code)
if not f_code: # 添加检查,如果格式不正确则跳过
print(f"跳过 {code},股票代码格式不正确")
continue
if f_code not in price_map or 'T1_close' not in price_map[f_code]:
print(f"跳过 {code},未找到收盘价数据")
continue
# 计算重要数据
T1_close = price_map[f_code]['T1_close']
T2_open = price_map[f_code]['T2_open']
T2_high = price_map[f_code]['T2_high']
T2_close = price_map[f_code]['T2_close']
# 计算目标价
target_pic = round(float(T1_close) * 1.049, 2)
kp_zf = round(float(T2_open) / float(T1_close) - 1, 4) * 100
status = self.check_status(target_pic, T2_high, T2_close)
lt_pan = 0 # 初始值,实际值会在异步线程中更新
profit_values = []
for day in range(2, days + 1):
if f'T{day}_close' in price_map.get(f_code, {}):
if day == 2:
day_close = price_map[f_code][f'T{day}_close']
else:
day_close = price_map[f_code][f'T{day}_high']
profit_pct = round((float(day_close) - float(target_pic)) / float(target_pic) * 100, 2)
profit_values.append(f"{profit_pct:.2f}%")
else:
profit_values.append("-")
limit_up_flag = False
for day in range(1, self.days_for_limit_up + 1):
limit_key = f'T{day}_limit'
if limit_key in price_map.get(f_code, {}) and price_map[f_code][limit_key] == 'U':
limit_up_flag = True
break
# 从缓存获取流通盘数据
self.add_stock(code, name, kp_zf, target_pic, lt_pan, status, profit_values, price_map)
# 总数
total_stocks = len(self.tree.get_children())
status_col = self.tree['columns'].index('status')
triggered_count = 0
for item in self.tree.get_children():
status = self.tree.item(item)['values'][status_col]
if status == '触发' or status == '错买':
triggered_count += 1
# 正确写法:
self.choose_frame.after(0, lambda: self.trigger_count_label.config(text=f"已触发股票数量:{triggered_count}/{total_stocks}"))
except Exception as e:
print(f"加载文件失败:{str(e)}", 'error')
self.trigger_count_label.config(text="加载失败")
def plot_monthly_data(self):
"""绘制所选月份每日股票数量的折线图"""
selected_year = self.year_var.get()
selected_month = self.month_var.get().zfill(2)
if not selected_year or not selected_month:
print("请选择年份和月份")
return
# 构建日期前缀 (格式: YYYYMM)
date_prefix = f"{selected_year}{selected_month}"
# 统计每天的数据量
daily_counts = {}
for filename in os.listdir(self.history_data_dir):
if filename.startswith(date_prefix) and filename.endswith('.txt'):
try:
# 提取日期部分 (格式: YYYYMMDD.txt)
day = filename[6:8] # 获取DD部分
date_key = f"{selected_year}-{selected_month}-{day}"
file_path = os.path.join(self.history_data_dir, filename)
# 统计文件行数(不包括标题行)
with open(file_path, 'r') as f:
lines = f.readlines()
count = len(lines) - 1 # 减去标题行
daily_counts[date_key] = count
except Exception as e:
print(f"处理文件 {filename} 时出错: {e}")
continue
if not daily_counts:
print("没有数据可供绘图")
return
# 清除之前的图表内容
self.ax.clear()
# 按照日期排序并提取天数作为x轴标签
sorted_full_dates = sorted(daily_counts.keys())
# 只保留“日”作为x轴标签
x_labels = [date.split('-')[2] for date in sorted_full_dates]
counts = [daily_counts[date] for date in sorted_full_dates]
# 使用整数索引作为x轴之后设置自定义标签
x_indices = list(range(len(sorted_full_dates)))
# 绘制折线图
self.ax.plot(x_indices, counts, marker='o', linestyle='-', color='blue')
self.ax.set_title(f'{selected_year}-{selected_month} 每日股票数量统计')
self.ax.set_xlabel('日期(日)')
self.ax.set_ylabel('股票数量')
self.ax.grid(True)
# 设置x轴刻度和标签
self.ax.set_xticks(x_indices)
self.ax.set_xticklabels(x_labels, rotation=45, ha='right')
# 更新画布
self.canvas.draw()
def check_status(self, target_pic, high_pic, close_pic):
"""检查股票状态"""
if high_pic > target_pic and close_pic >= target_pic:
return '触发'
if high_pic >= target_pic and close_pic < target_pic:
return '错买'
return '未触发'
def add_stock(self, code, name, open_zf, target, lt_pan=0, status='计算中', profit_values=None, price_map=None):
"""添加股票到监控列表"""
codes = self.format_code(str(code))
# 构建values列表包含基础列和多天获利数据
# 获取T1收盘价
if price_map is not None and codes in price_map and 'T1_close' in price_map[codes]:
# print(codes)
t1_close = price_map[codes]['T1_close']
t1_open = price_map[codes]['T1_open']
t1_pct_chg = price_map[codes]['T1_pct_chg']
t1_pre_close = price_map[codes]['T1_pre_close']
else:
t1_close = 0
print(f"跳过 {code}未找到T1收盘价数据")
# 计算S点日涨幅T2_open与T1_close的涨跌幅
if price_map is not None and codes in price_map and 'T1_open' in price_map[codes] and t1_close > 0:
day_zf = round(float(t1_pct_chg), 2)
else:
day_zf = 0
# 计算当日振幅T1_high与T1_low的差值相对于T0_close的百分比
if (price_map is not None and codes in price_map and
'T1_high' in price_map[codes] and 'T1_low' in price_map[codes] and t1_close > 0):
day_zz = round(
(float(price_map[codes]['T1_high']) - float(price_map[codes]['T1_low'])) / float(t1_pre_close) * 100, 2)
else:
day_zz = 0
values = [
codes,
name,
f"{open_zf:.2f}%",
f"{day_zf:.2f}%", # 当日涨幅
f"{day_zz:.2f}%", # 当日振幅
f"{lt_pan:.2f}",
target,
status
]
# 添加获利数据
if profit_values:
values.extend(profit_values)
else:
# 如果没有提供profit_values添加默认值
values.extend(["-"] * self.count_days)
item = self.tree.insert('', 'end', values=values)
# 根据状态设置标签样式
if status == '触发':
self.tree.item(item, tags=('triggered',))
elif status == '错买':
self.tree.item(item, tags=('wrong_buy',))
# 以下是原有函数改为类方法
def treeview_sort_column(self, tv, col, reverse):
"""对 Treeview 的列进行排序"""
data = [(tv.set(k, col), k) for k in tv.get_children("")]
try:
data.sort(key=lambda t: float(t[0].rstrip("%")), reverse=reverse)
except ValueError:
data.sort(reverse=reverse)
for index, (val, k) in enumerate(data):
tv.move(k, "", index)
tv.heading(col, command=lambda: self.treeview_sort_column(tv, col, not reverse))
def get_next_trade_date(self, date, n=1) -> str:
"""获取n个交易日后的日期"""
try:
# 标准化输入日期格式
clean_date = date.replace("-", "")
if len(clean_date) != 8:
raise ValueError("日期格式应为YYYYMMDD")
# 检查缓存中是否已有该日期的交易日历
if clean_date not in self.trade_cal_cache:
# 从API获取交易日历(获取前后30天的数据以确保覆盖)
start_date = (datetime.datetime.strptime(clean_date, "%Y%m%d") -
datetime.timedelta(days=30)).strftime("%Y%m%d")
end_date = (datetime.datetime.strptime(clean_date, "%Y%m%d") +
datetime.timedelta(days=30)).strftime("%Y%m%d")
trade_cal = self.pro.trade_cal(exchange='', start_date=start_date, end_date=end_date)
# 只保留交易日
trade_dates = trade_cal[trade_cal['is_open'] == 1]['cal_date'].tolist()
self.trade_cal_cache[clean_date] = sorted(trade_dates)
trade_dates = self.trade_cal_cache[clean_date]
current_index = trade_dates.index(clean_date)
if current_index + n >= len(trade_dates):
# 如果超出范围,返回计算的工作日日期
future_date = (datetime.datetime.strptime(clean_date, "%Y%m%d") +
datetime.timedelta(days=n)).strftime("%Y%m%d")
return future_date
return trade_dates[current_index + n]
except Exception as e:
print(f"获取交易日历时出错: {e}")
# 如果API查询失败返回计算的工作日日期
try:
future_date = (datetime.datetime.strptime(clean_date, "%Y%m%d") +
datetime.timedelta(days=n)).strftime("%Y%m%d")
return future_date
except:
return None
def showdate(self):
next_trade_date = self.get_next_trade_date("20250516", 1) # 获取20240101的下一个交易日
print(f"下一个交易日是: {next_trade_date}")
print(self.trade_cal_cache)
def update_months(self):
"""更新月份"""
selected_year = self.year_var.get()
months = set()
# 遍历历史数据目录
for filename in os.listdir(self.history_data_dir):
if filename.startswith(selected_year) and filename.endswith('.txt'):
try:
# 提取月份部分 (格式: YYYYMMDD.txt)
month = filename[4:6] # 获取MM部分
months.add(month)
except (IndexError, ValueError):
continue
# 将月份排序并更新下拉菜单
sorted_months = sorted(months)
month_menu = self.choose_frame.children['!combobox2'] # 获取月份下拉框
month_menu['values'] = sorted_months
# 如果有月份数据,默认选择第一个
if sorted_months:
self.month_var.set(sorted_months[0])
self.update_dates() # 自动更新日期
def update_dates(self):
"""更新日期"""
selected_year = self.year_var.get()
selected_month = self.month_var.get().zfill(2)
if not 1 <= int(selected_month) <= 12:
tk.messagebox.showerror("错误", "无效的月份")
return
dates = []
# 构建日期前缀 (格式: YYYYMM)
date_prefix = f"{selected_year}{selected_month}"
# 遍历历史数据目录
for filename in os.listdir(self.history_data_dir):
if filename.startswith(date_prefix) and filename.endswith('.txt'):
try:
# 提取日期部分 (格式: YYYYMMDD.txt)
day = filename[6:8] # 获取DD部分
dates.append(day)
except (IndexError, ValueError):
continue
# 将日期排序并更新下拉菜单
sorted_dates = sorted(dates)
date_menu = self.choose_frame.children['!combobox3'] # 获取日期下拉框
date_menu['values'] = sorted_dates
# 如果有日期数据,默认选择第一个
if sorted_dates:
self.date_var.set(sorted_dates[0])
class DataDownloaderApp:
def __init__(self):
# self.downloader = DataDownloader()
self.root = tk.Tk()
self.running_threads = []
self.setup_ui()
def setup_ui(self):
# 添加UI设置代码
self.root.title("数据下载器")
# 创建复选框
self.update_codes_var = tk.BooleanVar()
self.update_index_var = tk.BooleanVar()
self.update_stocks_var = tk.BooleanVar()
tk.Checkbutton(self.root, text="更新股票代码表", variable=self.update_codes_var).pack(anchor=tk.W)
tk.Checkbutton(self.root, text="更新指数数据", variable=self.update_index_var).pack(anchor=tk.W)
tk.Checkbutton(self.root, text="更新个股数据", variable=self.update_stocks_var).pack(anchor=tk.W)
# 创建按钮
tk.Button(self.root, text="开始更新", command=self.on_update_button_click).pack(pady=10)
# 创建状态标签
self.codes_completion_label = tk.Label(self.root, text="")
self.codes_completion_label.pack()
self.index_completion_label = tk.Label(self.root, text="")
self.index_completion_label.pack()
self.stocks_completion_label = tk.Label(self.root, text="")
self.stocks_completion_label.pack()
def on_update_button_click(self):
# 添加按钮点击事件处理代码
if self.update_codes_var.get():
thread = threading.Thread(target=self.update_codes)
thread.start()
self.running_threads.append(thread)
if self.update_index_var.get():
thread = threading.Thread(target=self.update_index)
thread.start()
self.running_threads.append(thread)
if self.update_stocks_var.get():
thread = threading.Thread(target=self.update_stocks)
thread.start()
self.running_threads.append(thread)
def update_codes(self):
# 添加更新股票代码表的代码
self.codes_completion_label.config(text="正在更新股票代码表...")
self.downloader.update_codes()
self.codes_completion_label.config(text="股票代码表更新完成!")
def update_index(self):
# 添加更新指数数据的代码
self.index_completion_label.config(text="正在更新指数数据...")
self.downloader.update_index()
self.index_completion_label.config(text="指数数据更新完成!")
def update_stocks(self):
# 添加更新个股数据的代码
self.stocks_completion_label.config(text="正在更新个股数据...")
self.downloader.update_stocks()
self.stocks_completion_label.config(text="个股数据更新完成!")
# 运行应用
if __name__ == "__main__":
root = tk.Tk()
app = StockAnalysisApp(root)
root.mainloop()