"""基准指数加载与收益计算模块。 专门负责基准指数(如上证指数 000001.SH)的行情读取和收益率计算。 与策略完全解耦,仅用于最后图表对比。 """ from __future__ import annotations from pathlib import Path from typing import Optional, Tuple import pandas as pd from utils.logger import setup_logger logger = setup_logger(__name__) def load_benchmark( benchmark_file: Path, start_date: str, end_date: str, ) -> pd.DataFrame: """加载基准指数行情数据。 参数: benchmark_file: 基准指数数据文件路径 start_date: 开始日期(YYYYMMDD) end_date: 结束日期(YYYYMMDD) 返回: pd.DataFrame: 包含 ['trade_date', 'close'] 的数据框 """ if not benchmark_file.exists(): logger.warning(f"基准指数文件不存在: {benchmark_file}") return pd.DataFrame() try: # 读取基准指数数据 df = pd.read_csv( benchmark_file, sep="\t", encoding="utf-8", dtype=str, ) # 检查必要列 if "trade_date" not in df.columns or "close" not in df.columns: logger.error(f"基准指数文件缺少必要列: {benchmark_file}") return pd.DataFrame() # 转换数据类型 df["trade_date"] = df["trade_date"].astype(str) df["close"] = pd.to_numeric(df["close"], errors="coerce") # 过滤日期区间 if start_date: df = df[df["trade_date"] >= start_date] if end_date: df = df[df["trade_date"] <= end_date] # 排序并重置索引 df = df.sort_values("trade_date").reset_index(drop=True) # 只保留需要的列 df = df[["trade_date", "close"]] logger.info(f"基准指数数据加载成功: {len(df)} 条记录") return df except Exception as e: logger.error(f"加载基准指数数据失败: {e}") return pd.DataFrame() def calc_benchmark_return( benchmark_df: pd.DataFrame, calendar: list = None, ) -> Tuple[pd.DataFrame, dict]: """计算基准指数的收益率曲线。 参数: benchmark_df: 基准指数数据(包含 trade_date, close) calendar: 交易日历(可选,用于对齐) 返回: (equity_curve, stats): - equity_curve: 归一化收益曲线 DataFrame ['trade_date', 'benchmark_value'] - stats: 统计信息字典 """ if benchmark_df.empty: logger.warning("基准指数数据为空,无法计算收益") return pd.DataFrame(), {} df = benchmark_df.copy() # 如果提供了交易日历,只保留日历中的日期 if calendar: df = df[df["trade_date"].isin(calendar)] if df.empty: logger.warning("基准指数数据与交易日历无交集") return pd.DataFrame(), {} # 计算归一化收益(以初始值为1) initial_close = df["close"].iloc[0] df["benchmark_value"] = df["close"] / initial_close # 计算累计收益率 final_value = df["benchmark_value"].iloc[-1] cum_return = final_value - 1.0 # 计算年化收益率 n_days = len(df) years = n_days / 252.0 if years > 0: ann_return = (final_value ** (1.0 / years)) - 1.0 else: ann_return = 0.0 # 计算最大回撤 cummax = df["benchmark_value"].cummax() drawdown = df["benchmark_value"] / cummax - 1.0 max_drawdown = float(drawdown.min()) # 计算波动率(年化) df["daily_return"] = df["close"].pct_change() volatility = df["daily_return"].std() * (252 ** 0.5) # 计算夏普比率(假设无风险利率为0) avg_return = df["daily_return"].mean() std_return = df["daily_return"].std() if std_return > 0: sharpe = (avg_return * 252) / (std_return * (252 ** 0.5)) else: sharpe = 0.0 stats = { "cum_return": cum_return, "ann_return": ann_return, "max_drawdown": max_drawdown, "volatility": volatility, "sharpe": sharpe, "trading_days": n_days, "years": years, } # 只返回需要的列 result_df = df[["trade_date", "benchmark_value"]].copy() logger.info("基准指数收益计算完成:") logger.info(f" 累计收益: {cum_return*100:+.2f}%") logger.info(f" 年化收益: {ann_return*100:+.2f}%") logger.info(f" 最大回撤: {max_drawdown*100:.2f}%") logger.info(f" 夏普比率: {sharpe:.4f}") return result_df, stats def merge_with_strategy( equity_df: pd.DataFrame, benchmark_df: pd.DataFrame, initial_cash: float = 1_000_000, ) -> pd.DataFrame: """将策略收益与基准收益合并到一个DataFrame。 参数: equity_df: 策略资金曲线(包含 trade_date, total_asset) benchmark_df: 基准收益曲线(包含 trade_date, benchmark_value) initial_cash: 初始资金(用于归一化) 返回: pd.DataFrame: 合并后的数据框 ['trade_date', 'strategy_value', 'benchmark_value'] """ if equity_df.empty or benchmark_df.empty: logger.warning("策略或基准数据为空,无法合并") return pd.DataFrame() # 归一化策略收益(以初始资金为1) strategy_df = equity_df.copy() strategy_df["strategy_value"] = strategy_df["total_asset"] / initial_cash # 合并两个数据框 merged = pd.merge( strategy_df[["trade_date", "strategy_value"]], benchmark_df[["trade_date", "benchmark_value"]], on="trade_date", how="inner", ) logger.info(f"策略与基准数据合并完成,共 {len(merged)} 个交易日") return merged