"""OCZ策略(One Candle Zone突破回踩策略)。 策略逻辑: 1. 寻找关键阻力位(最近N日最高价) 2. 等待大阳线突破(实体占比>B%,涨幅>R%,放量>V1倍) 3. 首次回踩到阻力位(±TOL容错)且缩量时买入 4. 持有期间: - 止损:跌破阻力位立即卖出 - 止盈:达到盈亏比1:2的目标价位立即卖出 - 或持有指定天数后卖出 特殊风控说明: - 止损位 = 突破的阻力位(买入时记录) - 止盈位 = 买入价 + (买入价 - 止损位) * 2 - 例如:阻力位100元,买入价105元,则止损100元(-5元),止盈115元(+10元) 参数说明: - N: 阻力回望长度(默认30日) - B: 实体占比门槛(默认60%) - V1: 放量倍数(默认1.5倍) - TOL: 回踩容错百分比(默认1.5%) - R: 收盘涨幅门槛(默认6%) - hold_days: 持有天数(默认5天) - volatility_min: 最小波动率(默认2.5%) - volatility_max: 最大波动率(默认8.0%) """ from typing import Dict, List, Optional import pandas as pd from strategies.base_strategy import BaseStrategy from utils.logger import setup_logger logger = setup_logger(__name__) class OczStrategy(BaseStrategy): """OCZ突破回踩策略。""" def __init__( self, initial_cash: float, N: int = 30, # 阻力回望长度 B: float = 60.0, # 实体占比门槛(%) V1: float = 1.5, # 放量倍数 TOL: float = 1.5, # 回踩容错(%) R: float = 4.0, # 收盘涨幅门槛(%) hold_days: int = 5, # 持有天数 volatility_min: float = 2.5, # 最小波动率(%) volatility_max: float = 8.0, # 最大波动率(%) max_positions: int = 2, position_pct_per_stock: float = 0.2, stop_loss: Optional[object] = None, take_profit: Optional[object] = None, position_sizer: Optional[object] = None, date_index_dict: Optional[Dict[str, Dict[str, int]]] = None, buy_signal_index: Optional[Dict[str, List[str]]] = None, ): """初始化OCZ策略。""" super().__init__( initial_cash=initial_cash, max_positions=max_positions, stop_loss=stop_loss, take_profit=take_profit, position_sizer=position_sizer, date_index_dict=date_index_dict, buy_signal_index=buy_signal_index, ) self.N = int(N) self.B = float(B) self.V1 = float(V1) self.TOL = float(TOL) self.R = float(R) self.hold_days = int(hold_days) self.volatility_min = float(volatility_min) self.volatility_max = float(volatility_max) self.position_pct_per_stock = float(position_pct_per_stock) # 记录每只股票的突破信息(用于特殊止损止盈) # 格式: {ts_code: {"resistance": 阻力位, "buy_price": 买入价, "stop_loss": 止损价, "take_profit": 止盈价}} self.position_risk_info: Dict[str, Dict[str, float]] = {} logger.info( f"OczStrategy 初始化参数: N={self.N}, B={self.B}, V1={self.V1}, " f"TOL={self.TOL}, R={self.R}, hold_days={self.hold_days}, " f"max_positions={max_positions}, position_pct_per_stock={self.position_pct_per_stock}" ) def on_bar(self, current_date: str, data_dict: Dict[str, pd.DataFrame]) -> None: """每个交易日的交易决策逻辑。""" # 1. 先处理卖出逻辑(止损/止盈/持有到期) for ts_code in list(self.positions.keys()): df = data_dict.get(ts_code) if df is None or df.empty: continue # 使用预计算的日期索引 date_index = self.date_index_dict.get(ts_code) if date_index is None or current_date not in date_index: continue idx = date_index[current_date] row = df.iloc[idx] close = float(row["close"]) low = float(row["low"]) # 用于检查是否跌破阻力位 pos = self.positions[ts_code] risk_info = self.position_risk_info.get(ts_code, {}) should_sell = False sell_reason = "" # 检查特殊止损:跌破阻力位 if risk_info and "stop_loss" in risk_info: stop_loss_price = risk_info["stop_loss"] if low <= stop_loss_price: # 最低价跌破止损位 should_sell = True sell_reason = f"跌破阻力位止损({stop_loss_price:.2f})" # 检查特殊止盈:达到盈亏比1:2目标 if not should_sell and risk_info and "take_profit" in risk_info: take_profit_price = risk_info["take_profit"] if close >= take_profit_price: # 收盘价达到止盈位 should_sell = True sell_reason = f"达到止盈位({take_profit_price:.2f})" # 检查持有到期 if not should_sell and pos.days_held >= self.hold_days: should_sell = True sell_reason = f"持有到期({self.hold_days}天)" # 执行卖出 if should_sell: quantity = pos.quantity if quantity > 0: self.sell(ts_code, close, quantity) # 清理风控信息 if ts_code in self.position_risk_info: del self.position_risk_info[ts_code] # logger.info(f"{current_date} 卖出 {ts_code} 原因: {sell_reason}") # 已移除:交易日志 # 2. 处理买入逻辑(回踩信号) if len(self.positions) >= self.max_positions: return # 优化:如果现金过少(不足初始资金1%),直接返回 if self.cash < self.initial_cash * 0.01: return # 使用买入信号索引获取今天有信号的股票列表 signal_stocks = self.buy_signal_index.get(current_date, []) if not signal_stocks: return # 今天没有任何买入信号 candidates = [] # 候选股票列表:[(ts_code, price), ...] # 只遍历有买入信号的股票 for ts_code in signal_stocks: if ts_code in self.positions: continue df = data_dict.get(ts_code) if df is None or df.empty: continue # 使用预计算的日期索引 date_index = self.date_index_dict.get(ts_code) if date_index is None or current_date not in date_index: continue idx = date_index[current_date] # 检查是否有足够的历史数据 if idx < self.N + 2: continue row = df.iloc[idx] # 确认买入信号存在并获取阻力位 if "pullback_signal" in df.columns and row["pullback_signal"]: price = float(row["close"]) # 获取突破的阻力位(从预计算的resistance列中读取) resistance = float(row["resistance"]) if "resistance" in df.columns else price * 0.95 candidates.append((ts_code, price, resistance)) if not candidates: return # 按价格排序(可选,这里简单按顺序买入) remain_slots = self.max_positions - len(self.positions) selected = candidates[:remain_slots] # 依次买入选中的股票 for item in selected: ts_code, price, resistance = item # 解包包含阻力位的元组 # 使用仓位管理器计算买入数量(如果有的话) if self.position_sizer is not None: df = data_dict.get(ts_code) quantity = self.position_sizer.calc_shares( ts_code=ts_code, cash=self.cash, price=price, remain_slots=remain_slots, df=df, ) else: # 默认:按配置的比例分配仓位 target_cash = self.initial_cash * self.position_pct_per_stock actual_cash = min(target_cash, self.cash) # 计算能买多少股(向下取整到100的整数倍) quantity = int(actual_cash // price) quantity = (quantity // 100) * 100 if quantity <= 0: continue # 买入前记录当前现金和仓位 before_cash = self.cash before_positions = len(self.positions) self.buy(ts_code, price, quantity) # 买入成功(现金减少且仓位增加) if self.cash < before_cash and len(self.positions) > before_positions: # 计算并记录止损止盈价位 stop_loss_price = resistance # 止损位 = 阻力位 risk_distance = price - resistance # 风险距离 = 买入价 - 阻力位 take_profit_price = price + risk_distance * 2 # 止盈位 = 买入价 + 风险距离 * 2 (盈亏比1:2) # 保存风控信息 self.position_risk_info[ts_code] = { "resistance": resistance, "buy_price": price, "stop_loss": stop_loss_price, "take_profit": take_profit_price, } # logger.info( # f"{current_date} 买入 {ts_code} 价格:{price:.2f} 阻力位:{resistance:.2f} " # f"止损:{stop_loss_price:.2f}(-{risk_distance:.2f}) 止盈:{take_profit_price:.2f}(+{risk_distance*2:.2f})" # ) # 已移除:交易日志