import os import csv import datetime import logging from pathlib import Path from log_style import setup_logger import pandas as pd import tushare as ts import time from functools import lru_cache # 配置日志 logger = setup_logger( name='market_data_check', log_file='market_data_check.log', level=logging.INFO, log_format='%(asctime)s - %(levelname)s - %(message)s', console=True, log_dir='.', backup_count=3 ) logger.propagate = False # 避免日志消息向上传递到父记录器,防止重复输出 # Tushare配置 TUSHARE_TOKENS = [ '9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d' # 主账户 ] # API请求频率控制 MAX_REQUESTS_PER_MINUTE = 500 class TushareManager: """Tushare API管理类,处理账户轮询和请求频率控制""" def __init__(self, tokens): self.tokens = tokens self.current_token_index = 0 self.last_request_time = time.time() self.request_count = 0 def get_pro_api(self): """获取Tushare API实例,自动处理账户轮询""" # 简单的账户轮询 token = self.tokens[self.current_token_index] self.current_token_index = (self.current_token_index + 1) % len(self.tokens) return ts.pro_api(token) def control_request_rate(self): """控制请求频率,确保不超过API限制""" current_time = time.time() time_since_last_request = current_time - self.last_request_time # 如果超过1分钟,重置计数器 if time_since_last_request > 60: self.request_count = 0 self.last_request_time = current_time # 如果请求次数超过限制,等待 if self.request_count >= MAX_REQUESTS_PER_MINUTE: wait_time = 60 - time_since_last_request + 1 logger.info(f"请求频率过高,等待 {wait_time:.1f} 秒") time.sleep(wait_time) self.request_count = 0 self.last_request_time = time.time() # 简单的速率限制(每0.1秒一个请求) if time_since_last_request < 0.1: time.sleep(0.1 - time_since_last_request) self.request_count += 1 self.last_request_time = current_time # 创建Tushare管理器实例 tushare_manager = TushareManager(TUSHARE_TOKENS) # 全局变量,用于缓存交易日历 trade_calendar_cache = None trade_calendar_dates = None def get_trade_calendar(): """ 一次性获取较大范围的交易日历并缓存到内存 :return: 交易日历DataFrame """ global trade_calendar_cache, trade_calendar_dates # 如果已经缓存,直接返回 if trade_calendar_cache is not None: return trade_calendar_cache try: # 计算日期范围:过去2年到未来1个月 today = datetime.datetime.now() start_date = (today - datetime.timedelta(days=730)).strftime('%Y%m%d') # 过去2年 end_date = (today + datetime.timedelta(days=30)).strftime('%Y%m%d') # 未来1个月 pro = tushare_manager.get_pro_api() tushare_manager.control_request_rate() df = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date) if df is None or df.empty: logger.warning(f"未获取到{start_date}至{end_date}的交易日历") return None # 缓存结果 trade_calendar_cache = df # 同时创建一个日期集合,方便快速查询 trade_calendar_dates = set(df[df['is_open'] == 1]['cal_date'].tolist()) logger.info(f"成功获取并缓存交易日历: {start_date}至{end_date}") return df except Exception as e: logger.error(f"获取交易日历失败: {str(e)}") return None def get_latest_trade_date(file_path): """ 从txt文件中获取最新的交易日期 :param file_path: 文件路径 :return: 最新交易日期字符串,如'20251204',如果文件为空返回None """ try: with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() if len(lines) < 2: # 至少需要有表头和一行数据 logger.warning(f"文件 {file_path} 内容不足") return None # 第二行是第一行数据(最新的交易日期) first_data_line = lines[1].strip() if not first_data_line: logger.warning(f"文件 {file_path} 数据行为空") return None # 按制表符分割 columns = first_data_line.split('\t') if len(columns) < 2: # 至少需要有ts_code和trade_date logger.warning(f"文件 {file_path} 数据格式不正确") return None return columns[1] except Exception as e: logger.error(f"读取文件 {file_path} 时出错: {str(e)}") return None def calculate_trading_days_diff(start_date, end_date): """ 计算两个日期之间的交易日数量 :param start_date: 开始日期,格式YYYYMMDD :param end_date: 结束日期,格式YYYYMMDD :return: 交易日数量,如果计算失败返回None """ try: # 确保日期格式正确 start = datetime.datetime.strptime(start_date, '%Y%m%d') end = datetime.datetime.strptime(end_date, '%Y%m%d') # 如果开始日期大于结束日期,交换 if start > end: start, end = end, start start_date, end_date = end_date, start_date # 获取缓存的交易日历 calendar_df = get_trade_calendar() if calendar_df is None: # 如果获取交易日历失败,使用简单的日期差作为近似值 days_diff = (end - start).days logger.warning(f"无法获取交易日历,使用自然日差近似:{days_diff}天") return days_diff # 筛选出指定日期范围内的交易日 mask = (calendar_df['cal_date'] >= start_date) & (calendar_df['cal_date'] <= end_date) & (calendar_df['is_open'] == 1) trading_days_count = calendar_df[mask]['cal_date'].count() return trading_days_count except Exception as e: logger.error(f"计算交易日差失败: {str(e)}") return None def check_online_data_exists(ts_code, trade_date): """ 检查在线数据是否存在 :param ts_code: 股票代码,如'688800.SH' :param trade_date: 交易日期,格式YYYYMMDD :return: True表示数据存在,False表示不存在,None表示查询失败 """ try: pro = tushare_manager.get_pro_api() tushare_manager.control_request_rate() # 查询指定日期的交易数据 df = pro.daily(ts_code=ts_code, trade_date=trade_date) if df is None or df.empty: logger.info(f"在线数据中未找到 {ts_code} {trade_date} 的交易数据") return False else: logger.info(f"在线数据中找到 {ts_code} {trade_date} 的交易数据") return True except Exception as e: logger.error(f"查询在线数据失败 {ts_code} {trade_date}: {str(e)}") return None def check_market_data(online_check=False): """ 检查所有行情数据文件的完整性 Args: online_check: 是否进行在线数据检查,默认False """ # 设置数据目录 data_dir = Path(r'D:\gp_data\day') # 获取当天日期(格式:YYYYMMDD) today = datetime.datetime.now().strftime('%Y%m%d') logger.info(f"开始检查行情数据完整性,当前日期:{today}") # 获取所有txt文件列表 all_files = list(data_dir.glob('*.txt')) total = len(all_files) completed = 0 # 记录开始时间 start_time = datetime.datetime.now() # 存储不完整的数据文件 incomplete_files = [] # 遍历目录下的所有txt文件 for file_path in all_files: file_name = file_path.name # 从文件名中提取股票代码(如:688800.SH_daily_data.txt -> 688800.SH) ts_code = file_name.split('_')[0] # 获取最新交易日期 latest_date = get_latest_trade_date(file_path) if latest_date is None: incomplete_files.append({ 'file_name': file_name, 'ts_code': ts_code, 'latest_date': 'N/A', 'trading_days_diff': 'N/A', 'online_data_exists': 'N/A', 'status': '文件内容异常' }) elif latest_date != today: # 计算交易日差 trading_days_diff = calculate_trading_days_diff(latest_date, today) # 检查在线数据是否存在 online_data_exists = None if online_check: online_data_exists = check_online_data_exists(ts_code, today) status = '数据不完整' if online_check and online_data_exists: status += ',在线数据已更新' incomplete_files.append({ 'file_name': file_name, 'ts_code': ts_code, 'latest_date': latest_date, 'trading_days_diff': trading_days_diff or 'N/A', 'online_data_exists': '是' if online_data_exists else '否' if online_data_exists is False else '未检查', 'status': status }) # 移除单个文件的完整日志 # 更新进度 completed += 1 progress = (completed / total) * 100 elapsed = (datetime.datetime.now() - start_time).total_seconds() # 显示进度条 print(f"\r进度: [{'#' * int(progress / 2)}{' ' * (50 - int(progress / 2))}] {progress:.1f}% | 已完成: {completed}/{total} | 耗时: {elapsed:.1f}s", end='', flush=True) # 进度条完成后换行 print() # 输出结果到CSV文件 output_file = Path('market_data_check_result.csv') with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: fieldnames = ['file_name', 'ts_code', 'latest_date', 'trading_days_diff', 'online_data_exists', 'status'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for file_info in incomplete_files: writer.writerow(file_info) total_files = len(list(data_dir.glob('*.txt'))) logger.info(f"检查完成,共检查 {total_files} 个文件") logger.info(f"发现 {len(incomplete_files)} 个未更新到最新的数据文件") logger.info(f"检查结果已输出到:{output_file}") # 打印总结 print(f"\n=== 行情数据检查结果 ===") print(f"检查日期:{today}") print(f"检查文件总数:{total_files}") print(f"未更新到最新的文件数:{len(incomplete_files)}") print(f"在线检查功能:{'开启' if online_check else '关闭'}") print(f"检查结果已保存到:{output_file}") if incomplete_files: print(f"\n未更新到最新的文件列表:") print(f"{'文件名称':<30} {'股票代码':<15} {'最新日期':<12} {'交易日差':<12} {'在线数据':<12} {'状态':<20}") print("-" * 100) for file_info in incomplete_files: print(f"{file_info['file_name']:<30} {file_info['ts_code']:<15} {file_info['latest_date']:<12} " f"{str(file_info['trading_days_diff']):<12} {file_info['online_data_exists']:<12} {file_info['status']:<20}") if __name__ == "__main__": # 默认关闭在线检查功能 check_market_data() # 如果需要开启在线检查功能,可以使用以下方式 # check_market_data(online_check=True)