"""持仓规模优化模块。 支持多种仓位管理方法: - equal_weight: 等权分配(默认) - kelly: Kelly公式(需要历史胜率和盈亏比) - volatility_target: 波动率目标(根据股票波动率调整仓位) 使用方法: position_sizer = PositionSizing(method="equal_weight", max_positions=2) # 在策略中使用 if position_sizer.can_open(ts_code, cash, price): shares = position_sizer.calc_shares(ts_code, cash, price, df) """ from __future__ import annotations from typing import Dict, Optional import pandas as pd from utils.logger import setup_logger logger = setup_logger(__name__) class PositionSizing: """持仓规模优化管理器。 支持多种方法: - equal_weight: 等权分配 - kelly: Kelly公式 - volatility_target: 波动率目标 """ def __init__( self, method: str = "equal_weight", max_positions: int = 2, kelly_risk_free: float = 0.03, kelly_max_fraction: float = 0.25, volatility_target: float = 0.15, volatility_window: int = 20, ): """初始化持仓规模管理器。 参数: method: 仓位管理方法 ("equal_weight", "kelly", "volatility_target") max_positions: 最大持仓数 kelly_risk_free: Kelly公式的无风险利率 kelly_max_fraction: Kelly公式的最大仓位比例(防止过度杠杆) volatility_target: 目标波动率(年化) volatility_window: 计算波动率的窗口期 """ self.method = method self.max_positions = max_positions self.kelly_risk_free = kelly_risk_free self.kelly_max_fraction = kelly_max_fraction self.volatility_target = volatility_target self.volatility_window = volatility_window # 记录每只股票的历史交易统计(用于Kelly公式) self.trade_stats: Dict[str, Dict] = {} def can_open(self, ts_code: str, cash: float, price: float, current_positions: int) -> bool: """判断是否可以开新仓位。 参数: ts_code: 股票代码 cash: 当前可用现金 price: 当前价格 current_positions: 当前持仓数量 返回: bool: 是否可以开仓 """ # 检查仓位是否已满 if current_positions >= self.max_positions: return False # 检查资金是否足够买入至少 100 股 min_cost = price * 100 if cash < min_cost: return False return True def calc_shares( self, ts_code: str, cash: float, price: float, remain_slots: int, df: Optional[pd.DataFrame] = None, ) -> int: """计算应该买入的股数。 参数: ts_code: 股票代码 cash: 当前可用现金 price: 当前价格 remain_slots: 剩余可用仓位数 df: 股票历史数据(用于计算波动率等指标) 返回: int: 买入股数(已取整到100的整数倍) """ if self.method == "equal_weight": return self._equal_weight_shares(cash, price, remain_slots) elif self.method == "kelly": return self._kelly_shares(ts_code, cash, price, remain_slots) elif self.method == "volatility_target": return self._volatility_target_shares(ts_code, cash, price, remain_slots, df) else: logger.warning(f"未知的仓位管理方法: {self.method},使用等权分配") return self._equal_weight_shares(cash, price, remain_slots) def _equal_weight_shares(self, cash: float, price: float, remain_slots: int) -> int: """等权分配:平均分配现金到剩余仓位。 例如:现金100万,剩余2个仓位,每个仓位分配50万 """ if remain_slots <= 0: return 0 cash_per_stock = cash / remain_slots shares = int(cash_per_stock // price) # A股规则:向下取整到100的整数倍 shares = (shares // 100) * 100 return shares def _kelly_shares(self, ts_code: str, cash: float, price: float, remain_slots: int) -> int: """Kelly公式:根据历史胜率和盈亏比计算最优仓位。 Kelly% = (胜率 * 盈亏比 - 败率) / 盈亏比 注意:需要积累一定的交易历史才能准确计算。 如果没有历史数据,回退到等权分配。 """ stats = self.trade_stats.get(ts_code) if stats is None or stats.get("total_trades", 0) < 10: # 交易次数不足,使用等权分配 logger.debug(f"{ts_code} 历史交易不足,使用等权分配") return self._equal_weight_shares(cash, price, remain_slots) win_rate = stats.get("win_rate", 0.5) avg_win = stats.get("avg_win", 0.05) avg_loss = stats.get("avg_loss", 0.05) if avg_loss <= 0: avg_loss = 0.01 # 避免除零 profit_loss_ratio = avg_win / avg_loss kelly_fraction = (win_rate * profit_loss_ratio - (1 - win_rate)) / profit_loss_ratio # Kelly公式可能给出负值或过大值,需要限制 kelly_fraction = max(0, min(kelly_fraction, self.kelly_max_fraction)) # 考虑剩余仓位数,分配资金 cash_per_stock = cash / remain_slots kelly_cash = cash_per_stock * kelly_fraction shares = int(kelly_cash // price) shares = (shares // 100) * 100 logger.debug( f"{ts_code} Kelly仓位: {kelly_fraction*100:.2f}%, " f"胜率={win_rate*100:.1f}%, 盈亏比={profit_loss_ratio:.2f}" ) return shares def _volatility_target_shares( self, ts_code: str, cash: float, price: float, remain_slots: int, df: Optional[pd.DataFrame], ) -> int: """波动率目标:根据股票波动率调整仓位。 波动率高的股票减小仓位,波动率低的股票增大仓位。 目标:使每个仓位的波动率贡献接近目标波动率。 """ if df is None or df.empty: logger.debug(f"{ts_code} 缺少历史数据,使用等权分配") return self._equal_weight_shares(cash, price, remain_slots) # 计算历史波动率(日收益率标准差 * sqrt(252)) if "close" not in df.columns or len(df) < self.volatility_window: logger.debug(f"{ts_code} 数据不足,使用等权分配") return self._equal_weight_shares(cash, price, remain_slots) returns = df["close"].pct_change().dropna() if len(returns) < self.volatility_window: logger.debug(f"{ts_code} 数据不足,使用等权分配") return self._equal_weight_shares(cash, price, remain_slots) # 使用最近 volatility_window 天的数据计算波动率 recent_volatility = returns.tail(self.volatility_window).std() * (252 ** 0.5) if recent_volatility <= 0: logger.debug(f"{ts_code} 波动率为0,使用等权分配") return self._equal_weight_shares(cash, price, remain_slots) # 仓位调整因子 = 目标波动率 / 实际波动率 volatility_factor = self.volatility_target / recent_volatility volatility_factor = max(0.5, min(volatility_factor, 2.0)) # 限制在 [0.5, 2.0] # 基础等权分配 * 波动率因子 base_shares = self._equal_weight_shares(cash, price, remain_slots) adjusted_shares = int(base_shares * volatility_factor) adjusted_shares = (adjusted_shares // 100) * 100 logger.debug( f"{ts_code} 波动率调整: 实际={recent_volatility*100:.2f}%, " f"目标={self.volatility_target*100:.2f}%, 因子={volatility_factor:.2f}" ) return adjusted_shares def update_trade_stats(self, ts_code: str, profit_pct: float) -> None: """更新交易统计(用于Kelly公式)。 参数: ts_code: 股票代码 profit_pct: 本次交易盈亏比例(正数为盈利,负数为亏损) """ if ts_code not in self.trade_stats: self.trade_stats[ts_code] = { "total_trades": 0, "wins": 0, "losses": 0, "total_win": 0.0, "total_loss": 0.0, } stats = self.trade_stats[ts_code] stats["total_trades"] += 1 if profit_pct > 0: stats["wins"] += 1 stats["total_win"] += profit_pct else: stats["losses"] += 1 stats["total_loss"] += abs(profit_pct) # 计算平均值 stats["win_rate"] = stats["wins"] / stats["total_trades"] stats["avg_win"] = stats["total_win"] / stats["wins"] if stats["wins"] > 0 else 0.05 stats["avg_loss"] = stats["total_loss"] / stats["losses"] if stats["losses"] > 0 else 0.05 def get_stats(self, ts_code: str) -> Optional[Dict]: """获取某只股票的交易统计。""" return self.trade_stats.get(ts_code) def clear_stats(self) -> None: """清空所有交易统计。""" self.trade_stats.clear()