import csv
import datetime
import logging
import time
from pathlib import Path

import pandas as pd
import tushare as ts

from log_style import setup_logger


# -------------------------- Configuration --------------------------
class Config:
    """Configuration class that centralizes all settings."""
    # Logging
    LOG_NAME = 'market_data_check'
    LOG_FILE = 'market_data_check.log'
    LOG_LEVEL = logging.INFO
    LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
    LOG_CONSOLE = True
    LOG_DIR = '.'
    LOG_BACKUP_COUNT = 3

    # Tushare accounts
    TUSHARE_TOKENS = [
        '9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d'  # primary account
    ]

    # API request rate limit
    MAX_REQUESTS_PER_MINUTE = 500

    # Data directory
    DATA_DIR = Path(r'D:\gp_data\day')

    # Trading-calendar range
    TRADE_CALENDAR_START_YEARS = 2  # past 2 years
    TRADE_CALENDAR_END_MONTHS = 1   # next 1 month

    # Output file
    OUTPUT_FILE = Path('market_data_check_result.csv')

    # Default options
    DEFAULT_ONLINE_CHECK = False


# Configure logging
logger = setup_logger(
    name=Config.LOG_NAME,
    log_file=Config.LOG_FILE,
    level=Config.LOG_LEVEL,
    log_format=Config.LOG_FORMAT,
    console=Config.LOG_CONSOLE,
    log_dir=Config.LOG_DIR,
    backup_count=Config.LOG_BACKUP_COUNT
)
logger.propagate = False  # Do not propagate to parent loggers; prevents duplicate output


class TushareManager:
    """Manages Tushare API access: token rotation and request-rate control."""

    def __init__(self, tokens):
        self.tokens = tokens
        self.current_token_index = 0
        self.last_request_time = time.time()
        self.request_count = 0

    def get_pro_api(self):
        """Return a Tushare pro API instance, rotating through the configured tokens."""
        # Simple round-robin over the token list
        token = self.tokens[self.current_token_index]
        self.current_token_index = (self.current_token_index + 1) % len(self.tokens)
        return ts.pro_api(token)

    def control_request_rate(self):
        """Throttle requests so the per-minute API limit is not exceeded."""
        current_time = time.time()
        time_since_last_request = current_time - self.last_request_time

        # Reset the counter once more than a minute has passed since the last request
        if time_since_last_request > 60:
            self.request_count = 0
            self.last_request_time = current_time

        # If the request count has reached the limit, wait until the window rolls over
        if self.request_count >= Config.MAX_REQUESTS_PER_MINUTE:
            wait_time = 60 - time_since_last_request + 1
            logger.info(f"Request rate too high, waiting {wait_time:.1f} seconds")
            time.sleep(wait_time)
            self.request_count = 0
            self.last_request_time = time.time()

        # Simple pacing: at most one request every 0.1 seconds
        if time_since_last_request < 0.1:
            time.sleep(0.1 - time_since_last_request)

        self.request_count += 1
        self.last_request_time = time.time()  # record the actual send time, not the stale timestamp taken before any sleep


# Create the Tushare manager instance
tushare_manager = TushareManager(Config.TUSHARE_TOKENS)

# Module-level cache for the trading calendar
trade_calendar_cache = None
trade_calendar_dates = None  # set of open trading dates, for fast lookups
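
# Illustrative usage sketch (comments only, not executed): every Tushare call in this
# module follows the same pattern -- obtain a pro handle, throttle, then query.
# The ts_code and date below are hypothetical.
#
#   pro = tushare_manager.get_pro_api()
#   tushare_manager.control_request_rate()
#   df = pro.daily(ts_code='000001.SZ', trade_date='20250102')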


def get_trade_calendar():
    """
    Fetch a wide date range of the trading calendar once and cache it in memory.
    :return: trading-calendar DataFrame, or None on failure
    """
    global trade_calendar_cache, trade_calendar_dates

    # Return the cached calendar if it is already loaded
    if trade_calendar_cache is not None:
        return trade_calendar_cache

    try:
        # Date range: N years back to M months ahead
        today = datetime.datetime.now()
        start_date = (today - datetime.timedelta(days=365 * Config.TRADE_CALENDAR_START_YEARS)).strftime('%Y%m%d')
        end_date = (today + datetime.timedelta(days=30 * Config.TRADE_CALENDAR_END_MONTHS)).strftime('%Y%m%d')

        pro = tushare_manager.get_pro_api()
        tushare_manager.control_request_rate()

        df = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date)
        if df is None or df.empty:
            logger.warning(f"No trading calendar returned for {start_date} to {end_date}")
            return None

        # Cache the result
        trade_calendar_cache = df

        # Also build a set of open dates for fast membership checks
        trade_calendar_dates = set(df[df['is_open'] == 1]['cal_date'].tolist())

        logger.info(f"Trading calendar fetched and cached: {start_date} to {end_date}")
        return df
    except Exception as e:
        logger.error(f"Failed to fetch trading calendar: {str(e)}")
        return None
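
# For reference (hypothetical rows): the calendar DataFrame is expected to carry at
# least the 'cal_date' and 'is_open' columns used above and below, e.g.
#
#   exchange  cal_date  is_open
#   SSE       20250102        1
#   SSE       20250104        0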


def is_trading_day(check_date):
    """
    Check whether the given date is a trading day.
    :param check_date: date to check, formatted YYYYMMDD
    :return: True if it is a trading day, False if not, None if the lookup failed
    """
    global trade_calendar_dates

    # Load the trading calendar first if it has not been cached yet
    if trade_calendar_dates is None:
        calendar_df = get_trade_calendar()
        if calendar_df is None:
            return None

    # Fast membership check against the cached set
    return check_date in trade_calendar_dates


def get_latest_trade_date(file_path):
    """
    Read the latest trade date from a txt data file.
    Note: data files are saved in descending date order, so the latest trade date
    is on the first data row (right after the header).

    :param file_path: path to the file
    :return: latest trade date string such as '20251204', or None if the file is empty
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            # Optimization: read only the first few lines instead of the whole file
            header_line = f.readline().strip()
            if not header_line:
                logger.warning(f"File {file_path} has no content")
                return None

            # Parse the header to locate the trade_date column
            headers = header_line.split('\t')
            if 'trade_date' not in headers:
                logger.warning(f"File {file_path} is missing the trade_date column")
                return None
            trade_date_idx = headers.index('trade_date')

            # Skip blank lines and take the first valid data row (the latest date)
            for line in f:
                line = line.strip()
                if line:  # first non-empty line
                    columns = line.split('\t')
                    if len(columns) <= trade_date_idx:  # make sure the row has enough columns
                        logger.warning(f"File {file_path} has a malformed data row")
                        return None
                    return columns[trade_date_idx]

            # No valid data row was found
            logger.warning(f"File {file_path} contains no valid data rows")
            return None
    except Exception as e:
        logger.error(f"Error reading file {file_path}: {str(e)}")
        return None
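
# Illustrative only: the layout get_latest_trade_date() expects in a *_daily_data.txt
# file -- tab-separated, header first, rows sorted with the newest trade_date on top.
# The figures below are hypothetical sample data.
#
#   ts_code    trade_date  open   high   low    close  vol
#   688800.SH  20251204    12.30  12.55  12.10  12.40  150230
#   688800.SH  20251203    12.10  12.35  11.98  12.28  142117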


def calculate_trading_days_diff(start_date, end_date):
    """
    Count the trading days between two dates.
    :param start_date: start date, formatted YYYYMMDD
    :param end_date: end date, formatted YYYYMMDD
    :return: number of trading days, or None if the calculation failed
    """
    try:
        # Make sure the dates are ordered, swapping them if necessary
        if start_date > end_date:
            start_date, end_date = end_date, start_date

        # Use the cached trading calendar
        calendar_df = get_trade_calendar()

        if calendar_df is None:
            # If the calendar is unavailable, fall back to the plain calendar-day difference
            start = datetime.datetime.strptime(start_date, '%Y%m%d')
            end = datetime.datetime.strptime(end_date, '%Y%m%d')
            days_diff = (end - start).days
            logger.warning(f"Trading calendar unavailable, approximating with calendar days: {days_diff}")
            return days_diff

        # Filter the trading days that fall within the requested range
        mask = (calendar_df['cal_date'] >= start_date) & (calendar_df['cal_date'] <= end_date) & (calendar_df['is_open'] == 1)
        trading_days_count = calendar_df[mask]['cal_date'].count()

        return trading_days_count
    except Exception as e:
        logger.error(f"Failed to calculate trading-day difference: {str(e)}")
        return None
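
# Note on the counting convention (hypothetical example): the mask above includes both
# endpoints, so for a week in which 20250106 through 20250110 are all open days,
# calculate_trading_days_diff('20250106', '20250110') would return 5, not 4.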


def check_online_data_exists(ts_code, trade_date):
    """
    Check whether online data exists for a stock on a given date.
    :param ts_code: stock code, e.g. '688800.SH'
    :param trade_date: trade date, formatted YYYYMMDD
    :return: True if data exists, False if not, None if the query failed
    """
    try:
        pro = tushare_manager.get_pro_api()
        tushare_manager.control_request_rate()

        # Query the daily data for the given date
        df = pro.daily(ts_code=ts_code, trade_date=trade_date)

        if df is None or df.empty:
            logger.info(f"No online data found for {ts_code} on {trade_date}")
            return False
        else:
            logger.info(f"Online data found for {ts_code} on {trade_date}")
            return True
    except Exception as e:
        logger.error(f"Online data query failed for {ts_code} {trade_date}: {str(e)}")
        return None


def get_suspend_info(ts_code, start_date, end_date):
    """
    Fetch suspension records for a stock.
    :param ts_code: stock code, e.g. '688800.SH'
    :param start_date: start date, formatted YYYYMMDD
    :param end_date: end date, formatted YYYYMMDD
    :return: suspension DataFrame, or None if the query failed or returned nothing
    """
    try:
        pro = tushare_manager.get_pro_api()
        tushare_manager.control_request_rate()

        # Query suspension records within the date range
        df = pro.suspend_d(ts_code=ts_code, start_date=start_date, end_date=end_date)

        if df is None or df.empty:
            logger.info(f"No suspension records for {ts_code} between {start_date} and {end_date}")
            return None
        else:
            logger.info(f"Found {len(df)} suspension record(s) for {ts_code} between {start_date} and {end_date}")
            return df
    except Exception as e:
        logger.error(f"Suspension query failed for {ts_code} {start_date}-{end_date}: {str(e)}")
        return None


def check_stock_suspended(ts_code, check_date):
    """
    Check whether a stock is suspended on the given date.
    :param ts_code: stock code, e.g. '688800.SH'
    :param check_date: date to check, formatted YYYYMMDD
    :return: (is_suspended, suspend_dates, earliest_suspend_start) -
             is_suspended: whether the stock is suspended on check_date,
             suspend_dates: the suspension dates/ranges found,
             earliest_suspend_start: the earliest suspension start date
    """
    try:
        # Widen the query window by one month in each direction to capture the full suspension record
        check_dt = datetime.datetime.strptime(check_date, '%Y%m%d')
        start_date = (check_dt - datetime.timedelta(days=30)).strftime('%Y%m%d')
        end_date = (check_dt + datetime.timedelta(days=30)).strftime('%Y%m%d')

        # Fetch the suspension records
        suspend_df = get_suspend_info(ts_code, start_date, end_date)

        if suspend_df is None:
            return False, "", None  # no suspension records found; assume not suspended

        suspend_dates_list = []
        is_suspended = False
        earliest_suspend_start = None  # earliest suspension start date

        # Check whether the given date falls inside any suspension period
        for _, row in suspend_df.iterrows():
            # Single-day suspension format (trade_date plus suspend_type)
            if 'trade_date' in row and 'suspend_type' in row:
                trade_date = row['trade_date']
                suspend_type = row['suspend_type']

                # suspend_type 'S' means the stock was suspended on that day
                if suspend_type == 'S':
                    # Track the earliest suspension date (minimum value)
                    if earliest_suspend_start is None or trade_date < earliest_suspend_start:
                        earliest_suspend_start = trade_date

                    # Record the suspension date
                    suspend_dates_list.append(trade_date)

                    # Is the date being checked itself a suspension day?
                    if trade_date == check_date:
                        logger.info(f"Stock {ts_code} is suspended on {check_date}")
                        is_suspended = True
                continue

            # Traditional suspension format (suspend_date plus resume_date)
            if 'suspend_date' in row and 'resume_date' in row:
                suspend_start = row['suspend_date']
                suspend_end = row['resume_date']

                # Track the earliest suspension start date (minimum value)
                if earliest_suspend_start is None or suspend_start < earliest_suspend_start:
                    earliest_suspend_start = suspend_start

                # A resume date of None or 00000000 means the stock has not resumed trading yet
                if not suspend_end or suspend_end == '00000000':
                    suspend_end = end_date  # use the query end date

                # Record the suspension range
                suspend_dates_list.append(f"{suspend_start}-{suspend_end}")

                # Does the check date fall inside the suspension period?
                if suspend_start <= check_date <= suspend_end:
                    logger.info(f"Stock {ts_code} is suspended on {check_date} ({suspend_start} to {suspend_end})")
                    is_suspended = True
            elif 'ts_code' in row and 'trade_date' in row:
                # Other single-day suspension formats
                trade_date = row['trade_date']
                if earliest_suspend_start is None or trade_date < earliest_suspend_start:
                    earliest_suspend_start = trade_date

                suspend_dates_list.append(trade_date)
                logger.info(f"Stock {ts_code} is suspended on {trade_date}")

                if trade_date == check_date:
                    is_suspended = True
            else:
                logger.warning(f"Suspension record is missing required fields: {row}")

        # Join the suspension dates/ranges into a single string
        suspend_dates = ", ".join(suspend_dates_list)

        if not is_suspended:
            logger.info(f"Stock {ts_code} is not suspended on {check_date}")

        return is_suspended, suspend_dates, earliest_suspend_start
    except Exception as e:
        logger.error(f"Suspension check failed for {ts_code} {check_date}: {str(e)}")
        return None, "", None
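
# For reference, the two record shapes check_stock_suspended() handles (hypothetical values):
#
#   Single-day format:  ts_code=688800.SH  trade_date=20251203   suspend_type=S
#   Range format:       ts_code=688800.SH  suspend_date=20251201  resume_date=20251210
#
# Records missing all of these fields are logged and skipped.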


def update_stock_data(ts_code):
    """
    Refresh the daily market data for the given stock.
    :param ts_code: stock code, e.g. '688800.SH'
    :return: True if the update succeeded, False otherwise
    """
    try:
        logger.info(f"Starting data update for stock {ts_code}")

        # Build the output file path
        output_file = Config.DATA_DIR / f"{ts_code}_daily_data.txt"

        # Get a Tushare API instance
        pro = tushare_manager.get_pro_api()

        # Check whether a data file already exists
        if output_file.exists():
            # Read the existing data to find the latest trade date
            try:
                # Read the file the same way update_tushare_totxt.py does
                df = pd.read_csv(output_file, sep='\t', encoding='utf-8')

                if not df.empty and 'trade_date' in df.columns:
                    # Keep trade_date as a string so it deduplicates and sorts cleanly against the API data below
                    df['trade_date'] = df['trade_date'].astype(str)

                    # Latest trade date on record
                    latest_date = df['trade_date'].max()

                    # Start fetching from the next calendar day (avoid re-downloading the same day)
                    latest_dt = datetime.datetime.strptime(str(latest_date), '%Y%m%d')
                    next_dt = latest_dt + datetime.timedelta(days=1)
                    next_date = next_dt.strftime('%Y%m%d')

                    logger.info(f"Stock {ts_code} latest date on file: {latest_date}, fetching data from {next_date} onwards")

                    # Throttle the request rate
                    tushare_manager.control_request_rate()

                    # Fetch data after the latest recorded date
                    new_df = pro.daily(ts_code=ts_code, start_date=next_date)

                    if new_df is not None and not new_df.empty:
                        logger.info(f"Fetched {len(new_df)} new row(s) for {ts_code}")

                        # Merge the existing data with the new data
                        combined_df = pd.concat([df, new_df], ignore_index=True)

                        # Drop duplicates to avoid repeated rows
                        combined_df = combined_df.drop_duplicates(subset=['trade_date', 'ts_code'], keep='last')

                        # Sort by trade date descending so the latest trading day comes first
                        combined_df = combined_df.sort_values('trade_date', ascending=False)

                        # Save the merged data
                        combined_df.to_csv(output_file, index=False, sep='\t', encoding='utf-8')
                        logger.info(f"Market data for stock {ts_code} updated successfully")
                        return True
                    else:
                        logger.info(f"No new data returned for stock {ts_code}")
                        return True
                else:
                    logger.warning(f"File {output_file} looks malformed, re-fetching the full history")
            except Exception as e:
                logger.error(f"Failed to read file {output_file}: {str(e)}")

        # The file is missing or unreadable: fetch the full history
        logger.info(f"Fetching the full daily history for stock {ts_code}")

        # Throttle the request rate
        tushare_manager.control_request_rate()

        # Fetch everything
        full_df = pro.daily(ts_code=ts_code)

        if full_df is not None and not full_df.empty:
            # Sort by trade date descending so the latest trading day comes first
            full_df = full_df.sort_values('trade_date', ascending=False)

            # Save the data
            full_df.to_csv(output_file, index=False, sep='\t', encoding='utf-8')
            logger.info(f"Market data for stock {ts_code} fetched and saved successfully")
            return True
        else:
            logger.warning(f"Could not fetch market data for stock {ts_code}")
            return False
    except Exception as e:
        logger.error(f"Failed to update data for stock {ts_code}: {str(e)}")
        return False


def check_market_data(online_check=Config.DEFAULT_ONLINE_CHECK):
    """
    Check the completeness of every daily-data file.

    Args:
        online_check: whether to also verify against online data, defaults to False
    """
    # Data directory
    data_dir = Config.DATA_DIR

    # Current time
    now = datetime.datetime.now()
    today = now.strftime('%Y%m%d')

    # Time-of-day rule: before 16:00 check against the previous day, otherwise check today
    if now.hour < 16:
        # Use the previous day
        yesterday = now - datetime.timedelta(days=1)
        check_date = yesterday.strftime('%Y%m%d')
        logger.info(f"Current time {now.strftime('%Y-%m-%d %H:%M:%S')} is before 16:00, check date adjusted to the previous day: {check_date}")
    else:
        check_date = today
    logger.info(f"Starting market-data completeness check, check date: {check_date}")

    # Cache the trading calendar up front so every later step can use it
    logger.info("Loading trading-calendar cache...")
    calendar_df = get_trade_calendar()
    if calendar_df is None:
        logger.warning("Failed to load the trading calendar; some features may be affected")
    else:
        logger.info(f"Trading calendar cached, {len(calendar_df)} record(s)")
        # Verify that the check date is a trading day
        if is_trading_day(check_date):
            logger.info(f"Check date {check_date} is a trading day")
        else:
            logger.warning(f"Check date {check_date} is not a trading day; most files may appear not updated")

    # Collect all txt files
    all_files = list(data_dir.glob('*.txt'))
    total = len(all_files)
    completed = 0
    logger.info(f"Checking {total} data file(s)...")

    # Record the start time
    start_time = datetime.datetime.now()

    # Files with incomplete data
    incomplete_files = []

    # Walk every txt file in the directory
    for file_path in all_files:
        file_name = file_path.name

        # Extract the stock code from the file name (e.g. 688800.SH_daily_data.txt -> 688800.SH)
        ts_code = file_name.split('_')[0]

        # Latest trade date in the file
        latest_date = get_latest_trade_date(file_path)

        if latest_date is None:
            incomplete_files.append({
                'file_name': file_name,
                'ts_code': ts_code,
                'latest_date': 'N/A',
                'trading_days_diff': 'N/A',
                'online_data_exists': 'N/A',
                'status': 'file content error',
                'is_suspended': 'N/A',
                'suspend_dates': 'N/A'
            })
        elif latest_date != check_date:
            # Trading-day difference
            trading_days_diff = calculate_trading_days_diff(latest_date, check_date)

            # Optional online check
            online_data_exists = None
            if online_check:
                online_data_exists = check_online_data_exists(ts_code, check_date)

            status = 'data incomplete'
            if online_check and online_data_exists:
                status += ', online data updated'

            # Collect incomplete stocks first; the suspension check happens afterwards
            incomplete_files.append({
                'file_name': file_name,
                'ts_code': ts_code,
                'latest_date': latest_date,
                'trading_days_diff': trading_days_diff or 'N/A',
                'online_data_exists': 'yes' if online_data_exists else 'no' if online_data_exists is False else 'not checked',
                'status': status,
                'is_suspended': 'N/A',  # placeholder, filled in later
                'suspend_dates': 'N/A'  # placeholder, filled in later
            })
        # Update the progress counters
        completed += 1
        progress = (completed / total) * 100
        elapsed = (datetime.datetime.now() - start_time).total_seconds()

        # Progress bar
        print(f"\rProgress: [{'#' * int(progress / 2)}{' ' * (50 - int(progress / 2))}] {progress:.1f}% | done: {completed}/{total} | elapsed: {elapsed:.1f}s", end='', flush=True)

    # Debug info
    logger.info(f"Incomplete files collected: {len(incomplete_files)}")
    if incomplete_files:
        logger.info(f"First 5 incomplete examples: {[f['ts_code'] for f in incomplete_files[:5]]}")

    # Finish the progress-bar line
    print()

    # Run a unified suspension check over the stocks collected as incomplete
    logger.info(f"Starting suspension check for {len(incomplete_files)} stock(s) with incomplete data")

    # New list holding the records that survive the suspension check
    final_incomplete_files = []

    for file_info in incomplete_files:
        ts_code = file_info['ts_code']
        latest_date = file_info['latest_date']

        # Files with content errors go straight to the final list
        if file_info['status'] == 'file content error':
            final_incomplete_files.append(file_info)
            continue

        # Suspension check
        is_suspended, suspend_dates, earliest_suspend_start = check_stock_suspended(ts_code, check_date)

        # Update the record
        file_info['is_suspended'] = 'yes' if is_suspended else 'no' if is_suspended is not None else 'check failed'
        file_info['suspend_dates'] = suspend_dates if suspend_dates else 'none'

        if is_suspended is True:
            logger.info(f"Stock {ts_code} is currently suspended")

            if earliest_suspend_start is not None:
                # Is the latest date on file at or after the suspension start date?
                if latest_date >= earliest_suspend_start:
                    logger.info(f"Stock {ts_code}: latest date on file {latest_date} >= suspension start {earliest_suspend_start}, data is complete, not reporting")
                    continue  # skip this stock in the report
                else:
                    logger.info(f"Stock {ts_code}: latest date on file {latest_date} < suspension start {earliest_suspend_start}, updating data")
                    # Bring the data up to date
                    update_success = update_stock_data(ts_code)
                    if update_success:
                        # Re-read the file to pick up the new latest date
                        file_path = Config.DATA_DIR / f"{ts_code}_daily_data.txt"
                        updated_latest_date = get_latest_trade_date(file_path)
                        if updated_latest_date:
                            file_info['latest_date'] = updated_latest_date
                            logger.info(f"Stock {ts_code} data updated, latest date: {updated_latest_date}")
                    # Still report this stock, since its data had to be updated
        elif is_suspended is None:
            logger.warning(f"Suspension check failed for stock {ts_code}, reporting it anyway")

        # Not suspended, or the suspension check failed: add to the final list
        final_incomplete_files.append(file_info)

    # Replace incomplete_files with the post-suspension-check list
    incomplete_files = final_incomplete_files
    logger.info(f"Suspension check complete, {len(incomplete_files)} incomplete stock(s) left to report")

    # Write the results to a CSV file
    output_file = Config.OUTPUT_FILE
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['file_name', 'ts_code', 'latest_date', 'trading_days_diff', 'online_data_exists', 'status', 'is_suspended', 'suspend_dates']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        for file_info in incomplete_files:
            writer.writerow(file_info)

    logger.info(f"Check finished, {total} file(s) examined")
    logger.info(f"Found {len(incomplete_files)} data file(s) that are not up to date")
    logger.info(f"Results written to: {output_file}")

    # Print a summary
    print(f"\n=== Market data check results ===")
    print(f"Check date: {check_date}")
    print(f"Files checked: {total}")
    print(f"Files not up to date: {len(incomplete_files)}")
    print(f"Online check: {'enabled' if online_check else 'disabled'}")
    print(f"Results saved to: {output_file}")

    if incomplete_files:
        print(f"\nFiles not up to date:")
        print(f"{'file name':<30} {'ts_code':<15} {'latest date':<12} {'days diff':<12} {'online data':<12} {'status':<20}")
        print("-" * 100)
        for file_info in incomplete_files:
            print(f"{file_info['file_name']:<30} {file_info['ts_code']:<15} {file_info['latest_date']:<12} "
                  f"{str(file_info['trading_days_diff']):<12} {file_info['online_data_exists']:<12} {file_info['status']:<20}")


if __name__ == "__main__":
    # Online checking is off by default
    check_market_data()
    # To enable the online check, call it like this instead:
    # check_market_data(online_check=True)
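
    # Optional, minimal sketch (not part of the original flow) for inspecting the
    # result CSV afterwards; assumes the file written above exists:
    # results = pd.read_csv(Config.OUTPUT_FILE, encoding='utf-8')
    # print(results[['ts_code', 'latest_date', 'status']].to_string(index=False))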