修改排序方式和更新内容,

This commit is contained in:
2025-12-06 22:50:00 +08:00
parent d02856fa5c
commit 094d2845f3
9 changed files with 54780 additions and 2668 deletions

View File

@@ -4,6 +4,7 @@ import time
import logging
import threading
import socket
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import tushare as ts
@@ -528,6 +529,9 @@ class DataDownloader:
def save_to_txt(self, data: pd.DataFrame, filename: str) -> bool:
try:
# 按交易日期降序排序,确保最新交易日排在最前面
if 'trade_date' in data.columns:
data = data.sort_values('trade_date', ascending=False)
data.to_csv(filename, index=False, sep='\t', encoding='utf-8')
# logging.info(f"数据已保存到 {filename}")
return True
@@ -538,10 +542,68 @@ class DataDownloader:
def process_stock_code(self, code, progress_queue=None): # 修改参数默认值为None
pro = self.account_manager.get_next_account()
try:
df = self.fetch_data_with_retry(pro.daily, ts_code=code)
if df is not None:
output_file = os.path.join(Config.OUTPUT_DIR, f"{code}_daily_data.txt")
self.save_to_txt(df, output_file)
output_file = os.path.join(Config.OUTPUT_DIR, f"{code}_daily_data.txt")
# 检查是否存在现有数据文件
if os.path.exists(output_file):
# 读取现有数据,获取最新的交易日期
existing_df = self.read_from_txt(output_file)
if existing_df is not None and not existing_df.empty:
# 获取最新交易日期
if 'trade_date' in existing_df.columns:
# 由于read_from_txt会将trade_date转换为datetime格式
# 确保现有数据的trade_date列是datetime格式
if not pd.api.types.is_datetime64_any_dtype(existing_df['trade_date']):
existing_df['trade_date'] = pd.to_datetime(existing_df['trade_date'], format='%Y%m%d')
# 获取最新交易日期
latest_date_dt = existing_df['trade_date'].max()
# 计算下一个交易日的起始日期(避免重复获取同一天数据)
next_date_dt = latest_date_dt + timedelta(days=1)
next_date = next_date_dt.strftime('%Y%m%d')
# 获取最新日期之后的数据
df = self.fetch_data_with_retry(pro.daily, ts_code=code, start_date=next_date)
if df is not None and not df.empty:
# 将新数据的trade_date列转换为datetime格式以便合并
df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')
# 合并现有数据和新数据
combined_df = pd.concat([existing_df, df], ignore_index=True)
# 去重,避免重复数据
combined_df = combined_df.drop_duplicates(subset=['trade_date', 'ts_code'], keep='last')
# 按交易日期降序排序,最新交易日排在最前面
combined_df = combined_df.sort_values('trade_date', ascending=False)
# 将trade_date转换回字符串格式保存
combined_df['trade_date'] = combined_df['trade_date'].dt.strftime('%Y%m%d')
# 保存合并后的数据
self.save_to_txt(combined_df, output_file)
else:
# 如果现有数据没有 trade_date 列,重新获取全部数据
df = self.fetch_data_with_retry(pro.daily, ts_code=code)
if df is not None:
self.save_to_txt(df, output_file)
else:
# 现有数据为空,重新获取全部数据
df = self.fetch_data_with_retry(pro.daily, ts_code=code)
if df is not None:
self.save_to_txt(df, output_file)
else:
# 文件不存在,获取全部数据
df = self.fetch_data_with_retry(pro.daily, ts_code=code)
if df is not None:
self.save_to_txt(df, output_file)
if progress_queue is not None: # 添加判断
progress_queue.put(1)
except (ConnectionError, TimeoutError) as e: