refactor(market_data): 优化交易日和停牌状态检查逻辑

- 精简导入,移除未使用的functools.lru_cache依赖
- 新增is_trading_day函数,基于缓存交易日期集合快速判断交易日
- 优化get_latest_trade_date函数,避免整文件读取,改为逐行读取首条有效数据
- 简化calculate_trading_days_diff日期处理,改用字符串直接比较日期顺序
- 重构check_stock_suspended函数,统一处理单日和区间停牌数据格式并改进日志记录
- 统一输出字段命名,替换中文字段为英文is_suspended和suspend_dates
- 规范日志和打印输出,修正检查文件数和检查日期显示
- 优化输出CSV字段顺序和内容,保证数据一致性和清晰度
This commit is contained in:
2025-12-17 11:48:24 +08:00
parent 9b8096cc77
commit 28537e9260
3 changed files with 90 additions and 65 deletions

View File

@@ -1,4 +1,3 @@
import os
import csv
import datetime
import logging
@@ -7,7 +6,6 @@ from log_style import setup_logger
import pandas as pd
import tushare as ts
import time
from functools import lru_cache
# -------------------------- 配置参数区域 --------------------------
class Config:
@@ -99,7 +97,7 @@ tushare_manager = TushareManager(Config.TUSHARE_TOKENS)
# 全局变量,用于缓存交易日历
trade_calendar_cache = None
trade_calendar_dates = None
trade_calendar_dates = None # 交易日期集合,用于快速查询
def get_trade_calendar():
"""
@@ -138,6 +136,24 @@ def get_trade_calendar():
logger.error(f"获取交易日历失败: {str(e)}")
return None
def is_trading_day(check_date):
"""
检查指定日期是否为交易日
:param check_date: 检查日期格式YYYYMMDD
:return: True表示是交易日False表示不是None表示查询失败
"""
global trade_calendar_dates
# 如果还没有缓存交易日历,先获取
if trade_calendar_dates is None:
calendar_df = get_trade_calendar()
if calendar_df is None:
return None
# 使用集合快速查询
return check_date in trade_calendar_dates
def get_latest_trade_date(file_path):
"""
从txt文件中获取最新的交易日期
@@ -148,13 +164,13 @@ def get_latest_trade_date(file_path):
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
if len(lines) < 2: # 至少需要有表头和一行数据
# 优化:只读取前几行,不读取整个文件
header_line = f.readline().strip()
if not header_line:
logger.warning(f"文件 {file_path} 内容不足")
return None
# 解析表头找到trade_date列的索引
header_line = lines[0].strip()
headers = header_line.split('\t')
if 'trade_date' not in headers:
logger.warning(f"文件 {file_path} 缺少trade_date列")
@@ -162,14 +178,15 @@ def get_latest_trade_date(file_path):
trade_date_idx = headers.index('trade_date')
# 跳过空行,从文件开头查找第一行有效数据(最新日期)
for i in range(1, len(lines)): # 从第二行开始查找,跳过表头行(0)
line = lines[i].strip()
for line in f:
line = line.strip()
if line: # 找到非空行
columns = line.split('\t')
if len(columns) <= trade_date_idx: # 确保有足够的列
logger.warning(f"文件 {file_path} 数据格式不正确")
return None
return columns[trade_date_idx]
# 如果没有找到有效数据行
logger.warning(f"文件 {file_path} 无有效数据行")
return None
@@ -185,13 +202,8 @@ def calculate_trading_days_diff(start_date, end_date):
:return: 交易日数量如果计算失败返回None
"""
try:
# 确保日期格式正确
start = datetime.datetime.strptime(start_date, '%Y%m%d')
end = datetime.datetime.strptime(end_date, '%Y%m%d')
# 如果开始日期大于结束日期,交换
if start > end:
start, end = end, start
# 确保日期格式正确并交换(如果需要)
if start_date > end_date:
start_date, end_date = end_date, start_date
# 获取缓存的交易日历
@@ -199,6 +211,8 @@ def calculate_trading_days_diff(start_date, end_date):
if calendar_df is None:
# 如果获取交易日历失败,使用简单的日期差作为近似值
start = datetime.datetime.strptime(start_date, '%Y%m%d')
end = datetime.datetime.strptime(end_date, '%Y%m%d')
days_diff = (end - start).days
logger.warning(f"无法获取交易日历,使用自然日差近似:{days_diff}")
return days_diff
@@ -287,17 +301,17 @@ def check_stock_suspended(ts_code, check_date):
# 检查指定日期是否在任何停牌期间内
for _, row in suspend_df.iterrows():
# 检查是否是单日停牌数据格式包含trade_date和suspend_type
# 处理单日停牌数据格式包含trade_date和suspend_type
if 'trade_date' in row and 'suspend_type' in row:
trade_date = row['trade_date']
suspend_type = row['suspend_type']
# 更新最近的停牌日期
if latest_suspend_start is None or trade_date > latest_suspend_start:
latest_suspend_start = trade_date
# 如果suspend_type为'S',表示该日停牌
if suspend_type == 'S':
# 更新最近的停牌日期(仅在实际停牌时更新)
if latest_suspend_start is None or trade_date > latest_suspend_start:
latest_suspend_start = trade_date
# 保存停牌日期
suspend_dates_list.append(trade_date)
@@ -308,38 +322,38 @@ def check_stock_suspended(ts_code, check_date):
continue
# 处理传统的停牌数据格式包含suspend_date和resume_date
if 'suspend_date' not in row or 'resume_date' not in row:
# 检查是否是有效数据可能是API返回的其他格式
if 'ts_code' in row and 'trade_date' in row:
# 这是有效的单日停牌数据,只是字段名称不同
trade_date = row['trade_date']
logger.info(f"股票 {ts_code}{trade_date} 处于停牌状态")
if trade_date == check_date:
is_suspended = True
suspend_dates_list.append(trade_date)
latest_suspend_start = trade_date
else:
logger.warning(f"停牌数据缺少必要字段: {row}")
continue
if 'suspend_date' in row and 'resume_date' in row:
suspend_start = row['suspend_date']
suspend_end = row['resume_date']
suspend_start = row['suspend_date']
suspend_end = row['resume_date']
# 更新最近的停牌开始日期
if latest_suspend_start is None or suspend_start > latest_suspend_start:
latest_suspend_start = suspend_start
# 如果恢复日期为None或00000000表示尚未复牌
if not suspend_end or suspend_end == '00000000':
suspend_end = end_date # 使用查询结束日期
# 保存停牌日期范围
suspend_dates_list.append(f"{suspend_start}-{suspend_end}")
# 检查日期是否在停牌期间内
if suspend_start <= check_date <= suspend_end:
logger.info(f"股票 {ts_code}{check_date} 处于停牌状态({suspend_start}{suspend_end}")
is_suspended = True
# 更新最近的停牌开始日期
if latest_suspend_start is None or suspend_start > latest_suspend_start:
latest_suspend_start = suspend_start
# 如果恢复日期为None或00000000表示尚未复牌
if not suspend_end or suspend_end == '00000000':
suspend_end = end_date # 使用查询结束日期
# 保存停牌日期范围
suspend_dates_list.append(f"{suspend_start}-{suspend_end}")
# 检查日期是否在停牌期间内
if suspend_start <= check_date <= suspend_end:
logger.info(f"股票 {ts_code}{check_date} 处于停牌状态({suspend_start}{suspend_end}")
is_suspended = True
elif 'ts_code' in row and 'trade_date' in row:
# 处理其他格式的单日停牌数据
trade_date = row['trade_date']
if latest_suspend_start is None or trade_date > latest_suspend_start:
latest_suspend_start = trade_date
suspend_dates_list.append(trade_date)
logger.info(f"股票 {ts_code}{trade_date} 处于停牌状态")
if trade_date == check_date:
is_suspended = True
else:
logger.warning(f"停牌数据缺少必要字段: {row}")
# 合并停牌日期范围
suspend_dates = ", ".join(suspend_dates_list)
@@ -492,8 +506,8 @@ def check_market_data(online_check=Config.DEFAULT_ONLINE_CHECK):
'trading_days_diff': 'N/A',
'online_data_exists': 'N/A',
'status': '文件内容异常',
'是否在停牌状态': 'N/A',
'停牌的日期': 'N/A'
'is_suspended': 'N/A',
'suspend_dates': 'N/A'
})
elif latest_date != check_date:
# 计算交易日差
@@ -515,11 +529,10 @@ def check_market_data(online_check=Config.DEFAULT_ONLINE_CHECK):
'latest_date': latest_date,
'trading_days_diff': trading_days_diff or 'N/A',
'online_data_exists': '' if online_data_exists else '' if online_data_exists is False else '未检查',
'status': status
})
# 移除单个文件的完整日志
'status': status,
'is_suspended': 'N/A', # 先设为N/A后续会更新
'suspend_dates': 'N/A' # 先设为N/A后续会更新
})
# 更新进度
completed += 1
progress = (completed / total) * 100
@@ -555,8 +568,8 @@ def check_market_data(online_check=Config.DEFAULT_ONLINE_CHECK):
is_suspended, suspend_dates, latest_suspend_start = check_stock_suspended(ts_code, check_date)
# 更新文件信息
file_info['是否在停牌状态'] = '' if is_suspended else '' if is_suspended is not None else '检查失败'
file_info['停牌的日期'] = suspend_dates if suspend_dates else ''
file_info['is_suspended'] = '' if is_suspended else '' if is_suspended is not None else '检查失败'
file_info['suspend_dates'] = suspend_dates if suspend_dates else ''
if is_suspended is True:
logger.info(f"股票 {ts_code} 当前处于停牌状态")
@@ -586,22 +599,21 @@ def check_market_data(online_check=Config.DEFAULT_ONLINE_CHECK):
# 输出结果到CSV文件
output_file = Config.OUTPUT_FILE
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['file_name', 'ts_code', 'latest_date', 'trading_days_diff', 'online_data_exists', 'status', '是否在停牌状态', '停牌的日期']
fieldnames = ['file_name', 'ts_code', 'latest_date', 'trading_days_diff', 'online_data_exists', 'status', 'is_suspended', 'suspend_dates']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for file_info in incomplete_files:
writer.writerow(file_info)
total_files = len(list(data_dir.glob('*.txt')))
logger.info(f"检查完成,共检查 {total_files} 个文件")
logger.info(f"检查完成,共检查 {total} 个文件")
logger.info(f"发现 {len(incomplete_files)} 个未更新到最新的数据文件")
logger.info(f"检查结果已输出到:{output_file}")
# 打印总结
print(f"\n=== 行情数据检查结果 ===")
print(f"检查日期:{today}")
print(f"检查文件总数:{total_files}")
print(f"检查日期:{check_date}")
print(f"检查文件总数:{total}")
print(f"未更新到最新的文件数:{len(incomplete_files)}")
print(f"在线检查功能:{'开启' if online_check else '关闭'}")
print(f"检查结果已保存到:{output_file}")