修正了检查行情出现的各种问题。修正了交易日期的问题,

This commit is contained in:
2026-01-17 23:36:37 +08:00
parent 6e9f6a139a
commit 017bdf2b47
14 changed files with 677 additions and 370 deletions

View File

@@ -154,7 +154,41 @@ def is_trading_day(check_date):
return check_date in trade_calendar_dates
def get_previous_trading_day(target_date):
"""
获取指定日期之前的最近一个交易日
:param target_date: 指定日期格式YYYYMMDD
:return: 最近的前一个交易日格式YYYYMMDD如果查询失败返回None
"""
try:
# 确保交易日历已加载
calendar_df = get_trade_calendar()
if calendar_df is None:
logger.error("无法获取交易日历,无法计算前一个交易日")
return None
# 将目标日期转换为datetime对象
target_dt = datetime.datetime.strptime(target_date, '%Y%m%d')
# 筛选出目标日期之前的所有交易日
trading_days = calendar_df[(calendar_df['is_open'] == 1) & (calendar_df['cal_date'] < target_date)]
if trading_days.empty:
logger.warning(f"在目标日期 {target_date} 之前未找到交易日")
return None
# 按日期降序排序,取第一个(最近的)
previous_trading_day = trading_days.sort_values('cal_date', ascending=False).iloc[0]['cal_date']
logger.info(f"目标日期 {target_date} 的前一个交易日是 {previous_trading_day}")
return previous_trading_day
except Exception as e:
logger.error(f"获取前一个交易日失败: {str(e)}")
return None
def get_latest_trade_date(file_path):
"""
从txt文件中获取最新的交易日期
注意:现在数据文件按日期降序保存,最新的交易日期在文件第一行数据(跳过表头)
@@ -289,6 +323,13 @@ def check_stock_suspended(ts_code, check_date):
start_date = (check_dt - datetime.timedelta(days=30)).strftime('%Y%m%d')
end_date = (check_dt + datetime.timedelta(days=30)).strftime('%Y%m%d')
# 计算今天的日期格式YYYYMMDD
today = datetime.datetime.now().strftime('%Y%m%d')
# 确保结束日期不超过今天
if end_date > today:
end_date = today
# 获取停牌信息
suspend_df = get_suspend_info(ts_code, start_date, end_date)
@@ -488,7 +529,14 @@ def check_market_data(online_check=Config.DEFAULT_ONLINE_CHECK):
if is_trading_day(check_date):
logger.info(f"检查日期 {check_date} 是交易日")
else:
logger.warning(f"检查日期 {check_date} 不是交易日,可能所有文件都未更新")
logger.warning(f"检查日期 {check_date} 不是交易日,正在寻找最近的前一个交易日...")
# 寻找最近的前一个交易日
previous_trading_day = get_previous_trading_day(check_date)
if previous_trading_day:
check_date = previous_trading_day
logger.info(f"已将检查日期调整为:{check_date}")
else:
logger.error("无法找到合适的检查日期,将使用原检查日期")
# 获取所有txt文件列表
all_files = list(data_dir.glob('*.txt'))
@@ -618,12 +666,23 @@ def check_market_data(online_check=Config.DEFAULT_ONLINE_CHECK):
# 输出结果到CSV文件
output_file = Config.OUTPUT_FILE
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['file_name', 'ts_code', 'latest_date', 'trading_days_diff', 'online_data_exists', 'status', 'is_suspended', 'suspend_dates']
fieldnames = ['文件名称', '股票代码', '最新日期', '交易日差', '在线数据', '状态', '是否停牌', '停牌日期']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for file_info in incomplete_files:
writer.writerow(file_info)
# 构建中文标题对应的字典
chinese_file_info = {
'文件名称': file_info['file_name'],
'股票代码': file_info['ts_code'],
'最新日期': file_info['latest_date'],
'交易日差': file_info['trading_days_diff'],
'在线数据': file_info['online_data_exists'],
'状态': file_info['status'],
'是否停牌': file_info['is_suspended'],
'停牌日期': file_info['suspend_dates']
}
writer.writerow(chinese_file_info)
logger.info(f"检查完成,共检查 {total} 个文件")
logger.info(f"发现 {len(incomplete_files)} 个未更新到最新的数据文件")