243 lines
10 KiB
Python
243 lines
10 KiB
Python
"""OCZ策略(One Candle Zone突破回踩策略)。
|
||
|
||
策略逻辑:
|
||
1. 寻找关键阻力位(最近N日最高价)
|
||
2. 等待大阳线突破(实体占比>B%,涨幅>R%,放量>V1倍)
|
||
3. 首次回踩到阻力位(±TOL容错)且缩量时买入
|
||
4. 持有期间:
|
||
- 止损:跌破阻力位立即卖出
|
||
- 止盈:达到盈亏比1:2的目标价位立即卖出
|
||
- 或持有指定天数后卖出
|
||
|
||
特殊风控说明:
|
||
- 止损位 = 突破的阻力位(买入时记录)
|
||
- 止盈位 = 买入价 + (买入价 - 止损位) * 2
|
||
- 例如:阻力位100元,买入价105元,则止损100元(-5元),止盈115元(+10元)
|
||
|
||
参数说明:
|
||
- N: 阻力回望长度(默认30日)
|
||
- B: 实体占比门槛(默认60%)
|
||
- V1: 放量倍数(默认1.5倍)
|
||
- TOL: 回踩容错百分比(默认1.5%)
|
||
- R: 收盘涨幅门槛(默认6%)
|
||
- hold_days: 持有天数(默认5天)
|
||
- volatility_min: 最小波动率(默认2.5%)
|
||
- volatility_max: 最大波动率(默认8.0%)
|
||
"""
|
||
from typing import Dict, List, Optional
|
||
|
||
import pandas as pd
|
||
|
||
from strategies.base_strategy import BaseStrategy
|
||
from utils.logger import setup_logger
|
||
|
||
logger = setup_logger(__name__)
|
||
|
||
|
||
class OczStrategy(BaseStrategy):
|
||
"""OCZ突破回踩策略。"""
|
||
|
||
def __init__(
|
||
self,
|
||
initial_cash: float,
|
||
N: int = 30, # 阻力回望长度
|
||
B: float = 60.0, # 实体占比门槛(%)
|
||
V1: float = 1.5, # 放量倍数
|
||
TOL: float = 1.5, # 回踩容错(%)
|
||
R: float = 4.0, # 收盘涨幅门槛(%)
|
||
hold_days: int = 5, # 持有天数
|
||
volatility_min: float = 2.5, # 最小波动率(%)
|
||
volatility_max: float = 8.0, # 最大波动率(%)
|
||
max_positions: int = 2,
|
||
position_pct_per_stock: float = 0.2,
|
||
stop_loss: Optional[object] = None,
|
||
take_profit: Optional[object] = None,
|
||
position_sizer: Optional[object] = None,
|
||
date_index_dict: Optional[Dict[str, Dict[str, int]]] = None,
|
||
buy_signal_index: Optional[Dict[str, List[str]]] = None,
|
||
):
|
||
"""初始化OCZ策略。"""
|
||
super().__init__(
|
||
initial_cash=initial_cash,
|
||
max_positions=max_positions,
|
||
stop_loss=stop_loss,
|
||
take_profit=take_profit,
|
||
position_sizer=position_sizer,
|
||
date_index_dict=date_index_dict,
|
||
buy_signal_index=buy_signal_index,
|
||
)
|
||
self.N = int(N)
|
||
self.B = float(B)
|
||
self.V1 = float(V1)
|
||
self.TOL = float(TOL)
|
||
self.R = float(R)
|
||
self.hold_days = int(hold_days)
|
||
self.volatility_min = float(volatility_min)
|
||
self.volatility_max = float(volatility_max)
|
||
self.position_pct_per_stock = float(position_pct_per_stock)
|
||
|
||
# 记录每只股票的突破信息(用于特殊止损止盈)
|
||
# 格式: {ts_code: {"resistance": 阻力位, "buy_price": 买入价, "stop_loss": 止损价, "take_profit": 止盈价}}
|
||
self.position_risk_info: Dict[str, Dict[str, float]] = {}
|
||
|
||
logger.info(
|
||
f"OczStrategy 初始化参数: N={self.N}, B={self.B}, V1={self.V1}, "
|
||
f"TOL={self.TOL}, R={self.R}, hold_days={self.hold_days}, "
|
||
f"max_positions={max_positions}, position_pct_per_stock={self.position_pct_per_stock}"
|
||
)
|
||
|
||
def on_bar(self, current_date: str, data_dict: Dict[str, pd.DataFrame]) -> None:
|
||
"""每个交易日的交易决策逻辑。"""
|
||
# 1. 先处理卖出逻辑(止损/止盈/持有到期)
|
||
for ts_code in list(self.positions.keys()):
|
||
df = data_dict.get(ts_code)
|
||
if df is None or df.empty:
|
||
continue
|
||
|
||
# 使用预计算的日期索引
|
||
date_index = self.date_index_dict.get(ts_code)
|
||
if date_index is None or current_date not in date_index:
|
||
continue
|
||
|
||
idx = date_index[current_date]
|
||
row = df.iloc[idx]
|
||
close = float(row["close"])
|
||
low = float(row["low"]) # 用于检查是否跌破阻力位
|
||
|
||
pos = self.positions[ts_code]
|
||
risk_info = self.position_risk_info.get(ts_code, {})
|
||
|
||
should_sell = False
|
||
sell_reason = ""
|
||
|
||
# 检查特殊止损:跌破阻力位
|
||
if risk_info and "stop_loss" in risk_info:
|
||
stop_loss_price = risk_info["stop_loss"]
|
||
if low <= stop_loss_price: # 最低价跌破止损位
|
||
should_sell = True
|
||
sell_reason = f"跌破阻力位止损({stop_loss_price:.2f})"
|
||
|
||
# 检查特殊止盈:达到盈亏比1:2目标
|
||
if not should_sell and risk_info and "take_profit" in risk_info:
|
||
take_profit_price = risk_info["take_profit"]
|
||
if close >= take_profit_price: # 收盘价达到止盈位
|
||
should_sell = True
|
||
sell_reason = f"达到止盈位({take_profit_price:.2f})"
|
||
|
||
# 检查持有到期
|
||
if not should_sell and pos.days_held >= self.hold_days:
|
||
should_sell = True
|
||
sell_reason = f"持有到期({self.hold_days}天)"
|
||
|
||
# 执行卖出
|
||
if should_sell:
|
||
quantity = pos.quantity
|
||
if quantity > 0:
|
||
self.sell(ts_code, close, quantity)
|
||
# 清理风控信息
|
||
if ts_code in self.position_risk_info:
|
||
del self.position_risk_info[ts_code]
|
||
# logger.info(f"{current_date} 卖出 {ts_code} 原因: {sell_reason}") # 已移除:交易日志
|
||
|
||
# 2. 处理买入逻辑(回踩信号)
|
||
if len(self.positions) >= self.max_positions:
|
||
return
|
||
|
||
# 优化:如果现金过少(不足初始资金1%),直接返回
|
||
if self.cash < self.initial_cash * 0.01:
|
||
return
|
||
|
||
# 使用买入信号索引获取今天有信号的股票列表
|
||
signal_stocks = self.buy_signal_index.get(current_date, [])
|
||
if not signal_stocks:
|
||
return # 今天没有任何买入信号
|
||
|
||
candidates = [] # 候选股票列表:[(ts_code, price), ...]
|
||
|
||
# 只遍历有买入信号的股票
|
||
for ts_code in signal_stocks:
|
||
if ts_code in self.positions:
|
||
continue
|
||
|
||
df = data_dict.get(ts_code)
|
||
if df is None or df.empty:
|
||
continue
|
||
|
||
# 使用预计算的日期索引
|
||
date_index = self.date_index_dict.get(ts_code)
|
||
if date_index is None or current_date not in date_index:
|
||
continue
|
||
|
||
idx = date_index[current_date]
|
||
|
||
# 检查是否有足够的历史数据
|
||
if idx < self.N + 2:
|
||
continue
|
||
|
||
row = df.iloc[idx]
|
||
|
||
# 确认买入信号存在并获取阻力位
|
||
if "pullback_signal" in df.columns and row["pullback_signal"]:
|
||
price = float(row["close"])
|
||
# 获取突破的阻力位(从预计算的resistance列中读取)
|
||
resistance = float(row["resistance"]) if "resistance" in df.columns else price * 0.95
|
||
candidates.append((ts_code, price, resistance))
|
||
|
||
if not candidates:
|
||
return
|
||
|
||
# 按价格排序(可选,这里简单按顺序买入)
|
||
remain_slots = self.max_positions - len(self.positions)
|
||
selected = candidates[:remain_slots]
|
||
|
||
# 依次买入选中的股票
|
||
for item in selected:
|
||
ts_code, price, resistance = item # 解包包含阻力位的元组
|
||
# 使用仓位管理器计算买入数量(如果有的话)
|
||
if self.position_sizer is not None:
|
||
df = data_dict.get(ts_code)
|
||
quantity = self.position_sizer.calc_shares(
|
||
ts_code=ts_code,
|
||
cash=self.cash,
|
||
price=price,
|
||
remain_slots=remain_slots,
|
||
df=df,
|
||
)
|
||
else:
|
||
# 默认:按配置的比例分配仓位
|
||
target_cash = self.initial_cash * self.position_pct_per_stock
|
||
actual_cash = min(target_cash, self.cash)
|
||
|
||
# 计算能买多少股(向下取整到100的整数倍)
|
||
quantity = int(actual_cash // price)
|
||
quantity = (quantity // 100) * 100
|
||
|
||
if quantity <= 0:
|
||
continue
|
||
|
||
# 买入前记录当前现金和仓位
|
||
before_cash = self.cash
|
||
before_positions = len(self.positions)
|
||
|
||
self.buy(ts_code, price, quantity)
|
||
|
||
# 买入成功(现金减少且仓位增加)
|
||
if self.cash < before_cash and len(self.positions) > before_positions:
|
||
# 计算并记录止损止盈价位
|
||
stop_loss_price = resistance # 止损位 = 阻力位
|
||
risk_distance = price - resistance # 风险距离 = 买入价 - 阻力位
|
||
take_profit_price = price + risk_distance * 2 # 止盈位 = 买入价 + 风险距离 * 2 (盈亏比1:2)
|
||
|
||
# 保存风控信息
|
||
self.position_risk_info[ts_code] = {
|
||
"resistance": resistance,
|
||
"buy_price": price,
|
||
"stop_loss": stop_loss_price,
|
||
"take_profit": take_profit_price,
|
||
}
|
||
|
||
# logger.info(
|
||
# f"{current_date} 买入 {ts_code} 价格:{price:.2f} 阻力位:{resistance:.2f} "
|
||
# f"止损:{stop_loss_price:.2f}(-{risk_distance:.2f}) 止盈:{take_profit_price:.2f}(+{risk_distance*2:.2f})"
|
||
# ) # 已移除:交易日志
|