新增尾盘数据统计程序-完整可运行

This commit is contained in:
2025-02-13 12:42:21 +08:00
parent 0f65d8a33f
commit 6b8e1978dc
2 changed files with 139 additions and 51 deletions

View File

@@ -1,4 +1,6 @@
import os
from numba import types # 正确导入
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
@@ -58,35 +60,67 @@ def calculate_technical_indicators(close, high, low, volume,
@jit(nopython=True)
def generate_trading_signals(close, open_, high, low, volume,
                             macd, signal, atr,
                             threshold, volatility_window):
    """Flag the bars that satisfy every entry condition of the strategy.

    All array arguments are indexed in parallel by bar position.

    Args:
        close, open_, high, low: per-bar price arrays.
        volume: per-bar traded volume array.
        macd, signal: MACD line and its signal line, one value per bar.
        atr: average-true-range values, one per bar.
        threshold: upper bound for the lowest-low / highest-high ratio
            measured over the volatility window.
        volatility_window: number of bars (current bar included) used for
            the lowest-low / highest-high window.

    Returns:
        Boolean array of ``len(close)``; ``True`` where all conditions hold.
        The first three bars are never flagged.
    """
    n = len(close)
    signals = np.zeros(n, dtype=np.bool_)

    # Starting at bar 3 keeps i - 1 and every look-back slice below valid.
    for i in range(3, n):
        # Skip bars whose prices would put a non-positive value in one of the
        # denominators used below (open, low, previous close).
        if open_[i] <= 0 or low[i] <= 0 or close[i - 1] <= 0:
            continue

        # ---------- candlestick shape ----------
        is_red = close[i] > open_[i]
        # Condition 1: bullish bar whose high exceeds 1.005x the prior close.
        cond1 = is_red and (high[i] / close[i - 1] > 1.005)

        # Condition 2: the body is larger than both shadows.
        upper_shadow = high[i] - max(close[i], open_[i])
        lower_shadow = min(close[i], open_[i]) - low[i]
        body_size = abs(close[i] - open_[i])
        cond2 = (body_size > upper_shadow) and (body_size > lower_shadow)

        # Condition 3: high/low ratio below 1.12 and high/open above 1.036.
        cond3 = (high[i] / low[i] < 1.12) and (high[i] / open_[i] > 1.036)

        # Condition 4: exclude limit-up bars (close below 1.10x prior close).
        cond4 = close[i] < close[i - 1] * 1.10

        # ---------- technical indicators ----------
        # Condition 5: ATR above 80% of its mean over the last five bars.
        cond5 = atr[i] > np.mean(atr[max(0, i - 4):i + 1]) * 0.8

        # Condition 6: MACD-minus-signal gap exceeds 1.2x the previous gap.
        # NOTE(review): when the previous gap is negative the 1.2 factor
        # lowers the bar instead of raising it - confirm this is intended.
        cond6 = (macd[i] - signal[i]) > (macd[i - 1] - signal[i - 1]) * 1.2

        # ---------- volatility ----------
        window_start = max(0, i - volatility_window + 1)
        hhv = np.max(high[window_start:i + 1])
        if hhv <= 0:
            # A non-positive highest high cannot serve as a denominator.
            cond7 = False
        else:
            llv = np.min(low[window_start:i + 1])
            cond7 = (llv / hhv) < threshold

        # ---------- volume ----------
        # Condition 8: volume below the mean of up to ten preceding bars.
        vol_cond1 = volume[i] < np.mean(volume[max(0, i - 10):i])

        # Condition 9: volume below 3.5x the minimum of the window that ends
        # at bar i - 2 (the slice stops before i - 1).
        vol_window = volume[max(0, i - 20):i - 1]
        if len(vol_window) == 0:
            vol_cond2 = False
        else:
            vol_cond2 = volume[i] < np.min(vol_window) * 3.5

        signals[i] = (cond1 and cond2 and cond3 and cond4 and cond5
                      and cond6 and cond7 and vol_cond1 and vol_cond2)

    return signals
@@ -131,6 +165,9 @@ def load_index_data(index_path):
# index_data.rename(columns={date_col: 'trade_date'}, inplace=True)
index_data.sort_values(date_col, inplace=True)
# 过滤2022年及以后的数据
index_data = index_data[index_data[date_col] >= pd.Timestamp('2022-01-01')]
logging.info(
f"指数数据加载成功,时间范围: {index_data['trade_date'].min().date()}{index_data['trade_date'].max().date()}")
return index_data
@@ -140,11 +177,27 @@ def load_index_data(index_path):
def process_stock_file(file_path, index_data):
"""处理单个股票文件"""
try:
# 加载并预处理数据
df = pd.read_csv(file_path, sep='\t',
usecols=['trade_date', 'open', 'high', 'low', 'close', 'vol'])
# 增加列存在性检查
required_cols = ['trade_date', 'open', 'high', 'low', 'close', 'vol']
df = pd.read_csv(file_path, sep='\t', usecols=required_cols)
# 严格数据过滤(新增多条件校验)
df = df[
(df['open'] > 0) &
(df['close'] > 0) &
(df['high'] > 0) &
(df['low'] > 0) &
(df['high'] >= df['low']) & # 确保最高价>=最低价
(df['close'] >= df['low']) & # 确保收盘价>=最低价
(df['close'] <= df['high']) & # 确保收盘价<=最高价
(df['vol'] > 0) # 新增成交量校验
]
# 如果过滤后无数据则跳过
if df.empty:
logging.warning(f"文件 {os.path.basename(file_path)} 无有效数据")
return None
df = df.rename(columns={'vol': 'volume'})
df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d', errors='coerce')
df = df.dropna(subset=['trade_date']).sort_values('trade_date')
@@ -154,6 +207,9 @@ def process_stock_file(file_path, index_data):
end_date = index_data['trade_date'].max()
df = df[(df['trade_date'] >= start_date) & (df['trade_date'] <= end_date)]
# 过滤2022年及以后的数据
df = df[df['trade_date'] >= pd.Timestamp('2022-01-01')]
# if len(df) < StrategyConfig.MIN_TRADE_DAYS:
# return None
@@ -169,19 +225,27 @@ def process_stock_file(file_path, index_data):
volatility_window=VOLATILITY_WINDOW
)
# # 获取市场状态
# market_condition = get_market_condition(index_data)
# threshold = StrategyConfig.THRESHOLDS[market_condition]
# 获取市场状态和对应阈值
market_condition = get_market_condition(index_data)
threshold_map = {
'bull': BULL_THRESHOLD,
'bear': BEAR_THRESHOLD,
'neutral': NEUTRAL_THRESHOLD
}
threshold = threshold_map.get(market_condition, NEUTRAL_THRESHOLD)
# 生成信号
# 生成信号时使用动态阈值
signals = generate_trading_signals(
close, df['open'].values, high, low, volume,
macd, signal, atr, threshold=BULL_THRESHOLD,
macd, signal, atr,
threshold=threshold, # 使用动态阈值
volatility_window=VOLATILITY_WINDOW
)
df['signal'] = signals
return os.path.basename(file_path).split('_')[0], df
# 获取股票代码
stock_code = os.path.basename(file_path).split('_')[0]
return stock_code, df
except Exception as e:
logging.error(f"处理文件 {os.path.basename(file_path)} 失败: {str(e)}")
@@ -193,31 +257,54 @@ def backtest_strategy(all_data, index_data):
"""执行动态持仓周期回测"""
results = []
def backtest_strategy(all_data, index_data):
results = []
for stock_code, data in all_data.items():
if data is None or 'signal' not in data.columns:
continue
try:
signals = data[data['signal']]
for idx in signals.index:
# 动态获取市场状态
current_date = data.iloc[idx]['trade_date']
market_condition = get_market_condition(index_data)
holding_days = HOLDING_DAYS_MAP.get(market_condition, 2)
# 获取关键价格数据
entry = data.iloc[idx]
next_day_data = data.iloc[idx + 1] if idx + 1 < len(data) else None
# 计算退出时间
exit_idx = idx + holding_days + 1 # 包含买入当天
if exit_idx >= len(data):
# 严格校验数据有效性(新增校验点)
if next_day_data is None or \
entry['close'] <= 0 or \
next_day_data['open'] <= 0:
continue
# 计算收益
entry_price = data.loc[idx, 'close']
exit_prices = data.iloc[idx + 1:exit_idx]['close']
# 动态持仓计算
current_date = entry['trade_date']
historical_index = index_data[index_data['trade_date'] <= current_date]
market_condition = get_market_condition(historical_index)
holding_days = HOLDING_DAYS_MAP.get(market_condition, 2)
# 计算收益区间(新增边界校验)
exit_idx = idx + holding_days + 1
if exit_idx >= len(data):
exit_idx = len(data) - 1
exit_data = data.iloc[idx + 1:exit_idx + 1]
# 计算收益率增加try-except保护
try:
entry_price = entry['close']
exit_prices = exit_data['close']
max_profit = (exit_prices.max() - entry_price) / entry_price
max_loss = (exit_prices.min() - entry_price) / entry_price
final_return = (exit_prices.iloc[-1] - entry_price) / entry_price
# 格式化保留小数点后4位
final_return = round(final_return, 4)
max_profit = round(max_profit, 4)
max_loss = round(max_loss, 4)
except ZeroDivisionError:
logging.error(f"零除错误 @ {stock_code} {current_date}")
continue
results.append({
'code': stock_code,
'date': current_date.strftime('%Y-%m-%d'),
@@ -227,8 +314,9 @@ def backtest_strategy(all_data, index_data):
'max_profit': max_profit,
'max_loss': max_loss
})
return pd.DataFrame(results) if results else pd.DataFrame()
except Exception as e:
logging.error(f"处理股票 {stock_code} 时出错: {str(e)}")
return pd.DataFrame(results)
def analyze_results(results_df):
@@ -304,7 +392,7 @@ if __name__ == "__main__":
for future in tqdm(as_completed(futures), total=len(futures)):
result = future.result()
if result:
code, data, _ = result
code, data = result
all_data[code] = data
if not all_data: