import os import pandas as pd import numpy as np from datetime import datetime, timedelta # 回测配置 DATA_PATH = r'D:\gp_data\day' # 日线数据存放路径 START_DATE = '20150101' # 回测开始日期 END_DATE = '20231231' # 回测结束日期 STOP_LOSS = 0.03 # 止损比例(3%) COMMISSION = 0.0003 # 单边交易佣金(万分之三) # 策略信号计算函数 def calculate_signal(df): """计算土信号""" df = df.copy() # 计算基础参数 df['X_1'] = df['low'].rolling(10, min_periods=1).min() # 10日最低价 df['X_2'] = df['high'].rolling(25, min_periods=1).max() # 25日最高价 df['X_5'] = 3.5 # 固定阈值 # 标准化波动率(X_6) df['X_6'] = ((df['close'] - df['X_1']) / (df['X_2'] - df['X_1']) * 4) df['X_6'] = df['X_6'].ewm(span=4, adjust=False).mean().shift(1) # 4日EMA并滞后1日 # 波动率过滤(X_7) df['cross_flag'] = (df['X_6'] > df['X_5']).astype(int) # 当前是否突破 df['cross_flag_shift'] = df['cross_flag'].shift(1) # 前一日状态 df['cross_event'] = (df['cross_flag'] > df['cross_flag_shift']).astype(int) # 上穿事件 df['X_7'] = df['cross_event'].rolling(5).max().replace(1, 0).shift(1) # 过去5日有上穿则为0 # 动量计算(X_43) df['X_10'] = (df['close'].shift(1) - df['close'].shift(2)) / df['close'].shift(2) * 100 df['X_43'] = df['X_10'].rolling(2).sum() * df['X_7'] # 复合指标(简化版X_39) ema_open_12 = df['open'].ewm(span=12, adjust=False).mean() df['X_15'] = df['X_7'] * (df['open'] - ema_open_12) / ema_open_12 * 200 # 斜率计算 def calc_slope(series): if len(series) < 2: return np.nan return np.polyfit([0, 1], series.values, 1)[0] df['X_41'] = df['X_15'].rolling(2).apply(calc_slope, raw=False) df['X_42'] = df['X_6'].rolling(2).apply(calc_slope, raw=False) # 生成最终信号 df['signal'] = np.where( (df['X_43'] > 8) & (~df['ts_code'].str.startswith(('313', '314', '315'))) & # 排除ST (df['vol'] > 0) & # 排除零成交量 (df['X_7'] == 1), # 波动率过滤 (df['X_41'] * 0.02 - df['X_42'] * df['X_7']) * df['X_7'], np.nan ) return df # 回测引擎核心逻辑 def backtest_strategy(): all_trades = [] # 遍历所有股票文件 for filename in os.listdir(DATA_PATH): if not filename.endswith('_daily_data.txt'): continue # 读取数据 code = filename.split('_')[0] df = pd.read_csv( os.path.join(DATA_PATH, filename), sep='\t', parse_dates=['trade_date'], dtype={'ts_code': str} ) # 过滤日期范围 df = df[(df['trade_date'] >= pd.to_datetime(START_DATE)) & (df['trade_date'] <= pd.to_datetime(END_DATE))] if df.empty: continue # 按日期排序并计算信号 df = df.sort_values('trade_date') df = calculate_signal(df) # 提取有效信号 signals = df[df['signal'].notna()] # 逐笔交易处理 for idx, row in signals.iterrows(): buy_date = row['trade_date'] buy_price = row['open'] # 获取次日数据 next_day = df[df['trade_date'] > buy_date].head(1) if next_day.empty: continue # 无次日数据(如停牌) sell_date = next_day['trade_date'].values[0] sell_high = next_day['high'].values[0] sell_low = next_day['low'].values[0] # 计算止损价 stop_price = buy_price * (1 - STOP_LOSS) # 确定卖出价格 if sell_low < stop_price: sell_price = stop_price # 触发止损 else: sell_price = sell_high # 按次日最高价卖出 # 计算收益率(扣除双边佣金) ret = (sell_price / buy_price - 1) - 2 * COMMISSION success = 1 if ret > 0 else 0 # 记录交易 all_trades.append({ 'code': code, 'buy_date': buy_date.strftime('%Y%m%d'), 'sell_date': sell_date.strftime('%Y%m%d'), 'buy_price': buy_price, 'sell_price': sell_price, 'return': ret, 'success': success, 'hold_days': 1 }) # 绩效分析 if not all_trades: print("未产生任何交易信号") return results = pd.DataFrame(all_trades) # 关键指标计算 total_trades = len(results) win_rate = results['success'].mean() avg_return = results['return'].mean() annualized_return = avg_return * 240 # 假设年240个交易日 # 输出结果 print(f"回测期间:{START_DATE} 至 {END_DATE}") print(f"总交易次数:{total_trades}") print(f"胜率:{win_rate:.2%}") print(f"单次平均收益率:{avg_return:.2%}") print(f"年化收益率(估算):{annualized_return:.2%}") # 保存详细结果 results.to_csv('trading_records.csv', index=False) print("详细交易记录已保存至 trading_records.csv") if __name__ == '__main__': backtest_strategy()