# -*- coding: utf-8 -*- """ 绩效分析模块 计算各种回测绩效指标,如收益率、夏普比率、最大回撤等 """ import pandas as pd import numpy as np import logging from scipy import stats class PerformanceAnalyzer: """ 绩效分析器 提供全面的回测结果绩效分析功能 """ def __init__(self): """ 初始化绩效分析器 """ self.logger = logging.getLogger('performance_analyzer') # 设置无风险利率(年化) self.risk_free_rate = 0.02 # 假设无风险利率为2% def calculate_metrics(self, portfolio): """ 计算绩效指标 Args: portfolio: 投资组合DataFrame,包含收益率等数据 Returns: dict: 包含各种绩效指标的字典 """ self.logger.info('开始计算绩效指标...') metrics = {} try: # 计算基本收益率指标 metrics['total_return'] = self._calculate_total_return(portfolio) metrics['annual_return'] = self._calculate_annual_return(portfolio) metrics['daily_returns'] = portfolio['return'].tolist() metrics['cumulative_returns'] = portfolio['cumulative_return'].tolist() # 计算风险指标 metrics['volatility'] = self._calculate_volatility(portfolio) metrics['annual_volatility'] = self._calculate_annual_volatility(portfolio) metrics['max_drawdown'] = self._calculate_max_drawdown(portfolio) metrics['max_drawdown_duration'] = self._calculate_max_drawdown_duration(portfolio) # 计算风险调整收益指标 metrics['sharpe_ratio'] = self._calculate_sharpe_ratio(portfolio) metrics['sortino_ratio'] = self._calculate_sortino_ratio(portfolio) metrics['calmar_ratio'] = self._calculate_calmar_ratio(portfolio) # 计算交易相关指标(如果有交易记录) if 'signal' in portfolio.columns: trade_metrics = self._calculate_trade_metrics(portfolio) metrics.update(trade_metrics) # 计算其他统计指标 metrics['skewness'] = self._calculate_skewness(portfolio) metrics['kurtosis'] = self._calculate_kurtosis(portfolio) metrics['beta'] = self._calculate_beta(portfolio) # 需要基准数据 self.logger.info('绩效指标计算完成') except Exception as e: self.logger.error(f'计算绩效指标时出错: {str(e)}') # 至少返回基本指标 metrics['total_return'] = self._calculate_total_return(portfolio) metrics['max_drawdown'] = self._calculate_max_drawdown(portfolio) return metrics def _calculate_total_return(self, portfolio): """ 计算总收益率 Args: portfolio: 投资组合数据 Returns: float: 总收益率 """ initial_value = portfolio['total'].iloc[0] final_value = portfolio['total'].iloc[-1] return (final_value - initial_value) / initial_value def _calculate_annual_return(self, portfolio): """ 计算年化收益率 Args: portfolio: 投资组合数据 Returns: float: 年化收益率 """ total_return = self._calculate_total_return(portfolio) # 计算交易日数量 trading_days = len(portfolio) # 假设一年252个交易日 annual_factor = 252.0 / trading_days return (1 + total_return) ** annual_factor - 1 def _calculate_volatility(self, portfolio): """ 计算收益率波动率 Args: portfolio: 投资组合数据 Returns: float: 波动率 """ return portfolio['return'].std() def _calculate_annual_volatility(self, portfolio): """ 计算年化波动率 Args: portfolio: 投资组合数据 Returns: float: 年化波动率 """ daily_vol = self._calculate_volatility(portfolio) return daily_vol * np.sqrt(252) # 年化 def _calculate_max_drawdown(self, portfolio): """ 计算最大回撤 Args: portfolio: 投资组合数据 Returns: float: 最大回撤(负值) """ # 计算累计最大值 cumulative_max = portfolio['total'].cummax() # 计算回撤 drawdown = (portfolio['total'] - cumulative_max) / cumulative_max # 返回最大回撤 return drawdown.min() def _calculate_max_drawdown_duration(self, portfolio): """ 计算最大回撤持续时间 Args: portfolio: 投资组合数据 Returns: int: 最大回撤持续天数 """ cumulative_max = portfolio['total'].cummax() drawdown = (portfolio['total'] - cumulative_max) / cumulative_max # 找出回撤期间 in_drawdown = False current_duration = 0 max_duration = 0 for i, dd in enumerate(drawdown): if dd < 0 and not in_drawdown: in_drawdown = True current_duration = 1 elif dd < 0 and in_drawdown: current_duration += 1 max_duration = max(max_duration, current_duration) elif dd == 0 and in_drawdown: in_drawdown = False current_duration = 0 return max_duration def _calculate_sharpe_ratio(self, portfolio): """ 计算夏普比率 Args: portfolio: 投资组合数据 Returns: float: 夏普比率 """ # 计算超额收益 excess_returns = portfolio['return'] - (self.risk_free_rate / 252) # 计算夏普比率 if excess_returns.std() == 0: return 0 sharpe = excess_returns.mean() / excess_returns.std() * np.sqrt(252) return sharpe def _calculate_sortino_ratio(self, portfolio): """ 计算索提诺比率(只考虑下行风险) Args: portfolio: 投资组合数据 Returns: float: 索提诺比率 """ # 计算超额收益 excess_returns = portfolio['return'] - (self.risk_free_rate / 252) # 计算下行风险(仅考虑负收益) downside_returns = excess_returns[excess_returns < 0] if len(downside_returns) == 0: return float('inf') downside_risk = np.sqrt((downside_returns ** 2).mean()) if downside_risk == 0: return 0 sortino = excess_returns.mean() / downside_risk * np.sqrt(252) return sortino def _calculate_calmar_ratio(self, portfolio): """ 计算卡尔马比率(年化收益/最大回撤) Args: portfolio: 投资组合数据 Returns: float: 卡尔马比率 """ annual_return = self._calculate_annual_return(portfolio) max_drawdown = abs(self._calculate_max_drawdown(portfolio)) if max_drawdown == 0: return float('inf') return annual_return / max_drawdown def _calculate_trade_metrics(self, portfolio): """ 计算交易相关指标 Args: portfolio: 包含信号的投资组合数据 Returns: dict: 交易指标 """ # 找出买入和卖出信号 buy_signals = portfolio[portfolio['signal'] == 1] sell_signals = portfolio[portfolio['signal'] == -1] # 计算交易次数 total_trades = len(buy_signals) + len(sell_signals) # 计算胜率(简化版本,实际应基于完整交易) win_rate = 0 avg_return_per_trade = 0 if total_trades > 0: # 计算每次交易的收益(简化计算) trade_returns = [] position = 0 entry_price = None for date, row in portfolio.iterrows(): if row['signal'] == 1 and position == 0: entry_price = row['price'] position = 1 elif row['signal'] == -1 and position == 1: exit_price = row['price'] trade_return = (exit_price - entry_price) / entry_price trade_returns.append(trade_return) position = 0 if trade_returns: winning_trades = sum(1 for r in trade_returns if r > 0) win_rate = winning_trades / len(trade_returns) avg_return_per_trade = np.mean(trade_returns) return { 'total_trades': total_trades, 'buy_signals': len(buy_signals), 'sell_signals': len(sell_signals), 'win_rate': win_rate, 'avg_return_per_trade': avg_return_per_trade } def _calculate_skewness(self, portfolio): """ 计算收益率偏度 Args: portfolio: 投资组合数据 Returns: float: 偏度 """ return stats.skew(portfolio['return'].dropna()) def _calculate_kurtosis(self, portfolio): """ 计算收益率峰度 Args: portfolio: 投资组合数据 Returns: float: 峰度 """ # 使用Fisher峰度(正态分布的峰度为0) return stats.kurtosis(portfolio['return'].dropna(), fisher=True) def _calculate_beta(self, portfolio, benchmark_returns=None): """ 计算贝塔系数 Args: portfolio: 投资组合数据 benchmark_returns: 基准收益率序列(如果不提供则返回None) Returns: float: 贝塔系数 """ if benchmark_returns is None: # 如果没有基准数据,尝试使用自身作为基准(简化处理) # 实际应用中应该传入市场基准数据 return 1.0 # 确保两个序列长度相同 min_len = min(len(portfolio['return']), len(benchmark_returns)) if min_len < 2: return 1.0 # 计算协方差和方差 cov_matrix = np.cov(portfolio['return'][-min_len:], benchmark_returns[-min_len:]) beta = cov_matrix[0, 1] / cov_matrix[1, 1] if cov_matrix[1, 1] != 0 else 1.0 return beta def calculate_rolling_metrics(self, portfolio, window=20): """ 计算滚动绩效指标 Args: portfolio: 投资组合数据 window: 滚动窗口大小 Returns: pandas.DataFrame: 滚动绩效指标 """ rolling_metrics = pd.DataFrame(index=portfolio.index) # 计算滚动收益率 rolling_metrics['rolling_return'] = portfolio['return'].rolling(window=window).mean() # 计算滚动波动率 rolling_metrics['rolling_volatility'] = portfolio['return'].rolling(window=window).std() # 计算滚动夏普比率(简化版) excess_returns = portfolio['return'] - (self.risk_free_rate / 252) rolling_metrics['rolling_sharpe'] = ( excess_returns.rolling(window=window).mean() / excess_returns.rolling(window=window).std() * np.sqrt(252) ) return rolling_metrics def generate_performance_report(self, metrics, portfolio, output_file=None): """ 生成绩效报告 Args: metrics: 绩效指标字典 portfolio: 投资组合数据 output_file: 输出文件路径 Returns: str: 绩效报告内容 """ # 生成报告内容 report = [] report.append('=' * 60) report.append('回测绩效报告') report.append('=' * 60) # 基本信息 report.append('\n1. 基本信息') report.append('-' * 30) report.append(f"回测期间: {portfolio.index[0].strftime('%Y-%m-%d')} 至 {portfolio.index[-1].strftime('%Y-%m-%d')}") report.append(f"交易天数: {len(portfolio)} 天") report.append(f"初始资金: {portfolio['total'].iloc[0]:,.2f}") report.append(f"最终资金: {portfolio['total'].iloc[-1]:,.2f}") # 收益率指标 report.append('\n2. 收益率指标') report.append('-' * 30) report.append(f"总收益率: {metrics.get('total_return', 0) * 100:.2f}%") report.append(f"年化收益率: {metrics.get('annual_return', 0) * 100:.2f}%") # 风险指标 report.append('\n3. 风险指标') report.append('-' * 30) report.append(f"年化波动率: {metrics.get('annual_volatility', 0) * 100:.2f}%") report.append(f"最大回撤: {metrics.get('max_drawdown', 0) * 100:.2f}%") report.append(f"最大回撤持续天数: {metrics.get('max_drawdown_duration', 0)} 天") # 风险调整收益 report.append('\n4. 风险调整收益') report.append('-' * 30) report.append(f"夏普比率: {metrics.get('sharpe_ratio', 0):.4f}") report.append(f"索提诺比率: {metrics.get('sortino_ratio', 0):.4f}") report.append(f"卡尔马比率: {metrics.get('calmar_ratio', 0):.4f}") # 交易统计 report.append('\n5. 交易统计') report.append('-' * 30) report.append(f"总交易次数: {metrics.get('total_trades', 0)}") report.append(f"买入信号: {metrics.get('buy_signals', 0)}") report.append(f"卖出信号: {metrics.get('sell_signals', 0)}") report.append(f"胜率: {metrics.get('win_rate', 0) * 100:.2f}%") report.append(f"平均每笔收益: {metrics.get('avg_return_per_trade', 0) * 100:.2f}%") # 统计特征 report.append('\n6. 统计特征') report.append('-' * 30) report.append(f"收益率偏度: {metrics.get('skewness', 0):.4f}") report.append(f"收益率峰度: {metrics.get('kurtosis', 0):.4f}") report.append(f"贝塔系数: {metrics.get('beta', 0):.4f}") report.append('\n' + '=' * 60) report.append('报告生成完毕') report.append('=' * 60) report_content = '\n'.join(report) # 保存报告到文件 if output_file: try: with open(output_file, 'w', encoding='utf-8') as f: f.write(report_content) self.logger.info(f'绩效报告已保存到: {output_file}') except Exception as e: self.logger.error(f'保存绩效报告时出错: {str(e)}') return report_content