256 lines
9.4 KiB
Python
256 lines
9.4 KiB
Python
"""回测结果Web查看器 - Flask后端服务。
|
||
|
||
提供API接口读取results目录下的CSV文件,供前端展示。
|
||
"""
|
||
from flask import Flask, jsonify, render_template, send_from_directory
|
||
from pathlib import Path
|
||
import pandas as pd
|
||
import json
|
||
from datetime import datetime
|
||
|
||
# Flask application; static assets and templates live next to this module.
app = Flask(__name__, template_folder='templates', static_folder='static')

# Locations of the result files, all resolved relative to this module's folder.
RESULTS_DIR = Path(__file__).parent.joinpath("results")
OPTIMIZATION_DIR = RESULTS_DIR.joinpath("optimization")
TRADES_DIR = RESULTS_DIR.joinpath("trades")
EQUITY_DIR = RESULTS_DIR.joinpath("equity")
|
||
|
||
|
||
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
||
|
||
|
||
@app.route('/api/optimization/list')
def list_optimization_results():
    """List every ``grid_search_*.csv`` file in the optimization directory.

    Returns:
        A JSON response. On success ``{"success": True, "files": [...]}`` with
        the files in reverse filename order; each entry carries ``filename``,
        ``timestamp`` (the stem without the ``grid_search_`` text),
        ``time_display``, ``num_results`` (CSV row count) and ``size`` (bytes).
        On any failure ``{"success": False, "error": <message>}``.
    """
    try:
        file_info = []

        for file in sorted(OPTIMIZATION_DIR.glob("grid_search_*.csv"), reverse=True):
            # The timestamp is whatever remains of the stem once the prefix is gone.
            timestamp_str = file.stem.replace("grid_search_", "")
            try:
                timestamp = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
                time_display = timestamp.strftime("%Y-%m-%d %H:%M:%S")
            except ValueError:
                # Name does not follow the YYYYmmdd_HHMMSS pattern: show it raw.
                time_display = timestamp_str

            # The file is parsed only to report how many result rows it holds.
            df = pd.read_csv(file)
            file_info.append({
                "filename": file.name,
                "timestamp": timestamp_str,
                "time_display": time_display,
                "num_results": len(df),
                "size": file.stat().st_size,
            })

        return jsonify({"success": True, "files": file_info})
    except Exception as e:
        # Top-level boundary of the endpoint: report the failure to the client.
        return jsonify({"success": False, "error": str(e)})
|
||
|
||
|
||
@app.route('/api/optimization/<filename>')
def get_optimization_result(filename):
    """Return the rows and summary statistics of one optimization result CSV.

    Args:
        filename: Name of a file inside the optimization directory, taken
            from the URL.

    Returns:
        A JSON response. On success it contains ``data`` (one dict per CSV
        row, missing cells as ``null``), ``stats`` (``total_combinations``
        plus ``best_sharpe`` / ``best_return`` / ``worst_drawdown``, each
        ``null`` when its column is absent or has no values) and ``columns``.
        Otherwise ``{"success": False, "error": <message>}``.
    """
    try:
        # Only a bare file name is accepted, so the request cannot address
        # anything outside the optimization directory via path separators.
        if Path(filename).name != filename:
            return jsonify({"success": False, "error": "文件不存在"})

        file_path = OPTIMIZATION_DIR / filename
        if not file_path.is_file():
            return jsonify({"success": False, "error": "文件不存在"})

        df = pd.read_csv(file_path)

        # NaN would be serialized as the bare token NaN, which is not valid
        # JSON; convert missing cells to None (-> null) before building rows.
        data = df.astype(object).where(df.notna(), None).to_dict(orient='records')

        def column_extreme(column, reducer):
            """Reduce *column* with *reducer*; None if absent or all-missing."""
            if column not in df.columns:
                return None
            value = reducer(df[column])
            return None if pd.isna(value) else float(value)

        stats = {
            "total_combinations": len(df),
            "best_sharpe": column_extreme('sharpe', pd.Series.max),
            "best_return": column_extreme('total_return', pd.Series.max),
            "worst_drawdown": column_extreme('max_drawdown', pd.Series.min),
        }

        return jsonify({
            "success": True,
            "data": data,
            "stats": stats,
            "columns": list(df.columns),
        })
    except Exception as e:
        # Top-level boundary of the endpoint: report the failure to the client.
        return jsonify({"success": False, "error": str(e)})
|
||
|
||
|
||
@app.route('/api/trades/list')
def list_trade_results():
    """List every trade-record CSV in the trades directory with a summary.

    File stems are split on ``_``: the first two pieces are treated as the
    timestamp and the remainder as the strategy/parameter description.

    Returns:
        A JSON response. On success ``{"success": True, "files": [...]}`` in
        reverse filename order; each entry carries ``filename``,
        ``timestamp``, ``time_display``, ``strategy_params``,
        ``total_trades``, ``win_rate``, ``total_profit`` and ``size``.
        On any failure ``{"success": False, "error": <message>}``.
    """
    try:
        file_info = []

        for file in sorted(TRADES_DIR.glob("*.csv"), reverse=True):
            parts = file.stem.split("_")
            # Joining a slice (instead of indexing parts[1]) keeps a stem with
            # no underscore from raising IndexError and failing the whole list.
            timestamp_str = "_".join(parts[:2])
            try:
                timestamp = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
                time_display = timestamp.strftime("%Y-%m-%d %H:%M:%S")
            except ValueError:
                # Name does not follow the YYYYmmdd_HHMMSS pattern: show it raw.
                time_display = timestamp_str

            strategy_params = "_".join(parts[2:])

            df = pd.read_csv(file)

            # Both columns are optional; a file without them reports zeros.
            total_trades = len(df)
            win_trades = df['is_win'].sum() if 'is_win' in df.columns else 0
            total_profit = df['profit_amount'].sum() if 'profit_amount' in df.columns else 0

            file_info.append({
                "filename": file.name,
                "timestamp": timestamp_str,
                "time_display": time_display,
                "strategy_params": strategy_params,
                "total_trades": total_trades,
                "win_rate": f"{win_trades/total_trades*100:.2f}%" if total_trades > 0 else "0%",
                "total_profit": f"{total_profit:.2f}",
                "size": file.stat().st_size,
            })

        return jsonify({"success": True, "files": file_info})
    except Exception as e:
        # Top-level boundary of the endpoint: report the failure to the client.
        return jsonify({"success": False, "error": str(e)})
|
||
|
||
|
||
@app.route('/api/trades/<filename>')
def get_trade_result(filename):
    """Return the rows and win/loss statistics of one trade-record CSV.

    Args:
        filename: Name of a file inside the trades directory, taken from
            the URL.

    Returns:
        A JSON response. On success it contains ``data`` (one dict per CSV
        row, missing cells as ``null``), ``stats`` and ``columns``. Profit
        statistics are ``0`` when the ``profit_amount`` column is absent or
        there are no rows to aggregate. Otherwise
        ``{"success": False, "error": <message>}``.
    """
    try:
        # Only a bare file name is accepted, so the request cannot address
        # anything outside the trades directory via path separators.
        if Path(filename).name != filename:
            return jsonify({"success": False, "error": "文件不存在"})

        file_path = TRADES_DIR / filename
        if not file_path.is_file():
            return jsonify({"success": False, "error": "文件不存在"})

        df = pd.read_csv(file_path)

        # NaN would be serialized as the bare token NaN, which is not valid
        # JSON; convert missing cells to None (-> null) before building rows.
        data = df.astype(object).where(df.notna(), None).to_dict(orient='records')

        total_trades = len(df)
        has_outcome = 'is_win' in df.columns
        has_profit = 'profit_amount' in df.columns

        # Element-wise comparison on a Series, hence ``==`` rather than ``is``.
        win_trades = df[df['is_win'] == True] if has_outcome else pd.DataFrame()  # noqa: E712
        loss_trades = df[df['is_win'] == False] if has_outcome else pd.DataFrame()  # noqa: E712

        def profit_stat(frame, reducer):
            """Reduce ``profit_amount`` of *frame*; 0 when nothing to reduce."""
            if not has_profit or frame.empty:
                return 0
            value = reducer(frame['profit_amount'])
            return 0 if pd.isna(value) else float(value)

        stats = {
            "total_trades": total_trades,
            "win_count": len(win_trades),
            "loss_count": len(loss_trades),
            "win_rate": f"{len(win_trades)/total_trades*100:.2f}%" if total_trades > 0 else "0%",
            "total_profit": profit_stat(df, pd.Series.sum),
            "avg_profit": profit_stat(df, pd.Series.mean),
            "max_profit": profit_stat(df, pd.Series.max),
            "max_loss": profit_stat(df, pd.Series.min),
            "avg_win": profit_stat(win_trades, pd.Series.mean),
            "avg_loss": profit_stat(loss_trades, pd.Series.mean),
        }

        return jsonify({
            "success": True,
            "data": data,
            "stats": stats,
            "columns": list(df.columns),
        })
    except Exception as e:
        # Top-level boundary of the endpoint: report the failure to the client.
        return jsonify({"success": False, "error": str(e)})
|
||
|
||
|
||
@app.route('/api/equity/list')
def list_equity_results():
    """List every equity-curve CSV in the equity directory with a summary.

    Returns:
        A JSON response. On success ``{"success": True, "files": [...]}`` in
        filename order; each entry carries ``filename``, ``strategy`` (the
        stem without ``_equity``), ``initial_asset``, ``final_asset``,
        ``total_return`` (formatted percentage), ``num_days`` (row count)
        and ``size``. A file without usable ``total_asset`` values reports
        zeros. On any failure ``{"success": False, "error": <message>}``.
    """
    try:
        file_info = []

        for file in sorted(EQUITY_DIR.glob("*.csv")):
            df = pd.read_csv(file)

            # One file lacking the column must not fail the whole listing.
            if 'total_asset' in df.columns and len(df) > 0:
                initial_asset = df['total_asset'].iloc[0]
                final_asset = df['total_asset'].iloc[-1]
            else:
                initial_asset = final_asset = 0

            # A missing value would be serialized as NaN, which is not valid JSON.
            if pd.isna(initial_asset):
                initial_asset = 0
            if pd.isna(final_asset):
                final_asset = 0

            if initial_asset > 0:
                total_return = (final_asset - initial_asset) / initial_asset
            else:
                total_return = 0

            file_info.append({
                "filename": file.name,
                "strategy": file.stem.replace("_equity", ""),
                "initial_asset": float(initial_asset),
                "final_asset": float(final_asset),
                "total_return": f"{total_return*100:.2f}%",
                "num_days": len(df),
                "size": file.stat().st_size,
            })

        return jsonify({"success": True, "files": file_info})
    except Exception as e:
        # Top-level boundary of the endpoint: report the failure to the client.
        return jsonify({"success": False, "error": str(e)})
|
||
|
||
|
||
@app.route('/api/equity/<filename>')
def get_equity_result(filename):
    """Return the rows and summary statistics of one equity-curve CSV.

    Args:
        filename: Name of a file inside the equity directory, taken from
            the URL.

    Returns:
        A JSON response. On success it contains ``data`` (one dict per CSV
        row, missing cells as ``null``), ``stats`` (initial/final/max/min
        asset, ``total_return`` and ``max_drawdown`` as fractions, and
        ``num_days``) and ``columns``. Statistics that cannot be computed,
        e.g. for a file with no rows, are reported as ``0``. Otherwise,
        including when the ``total_asset`` column is missing,
        ``{"success": False, "error": <message>}``.
    """
    try:
        # Only a bare file name is accepted, so the request cannot address
        # anything outside the equity directory via path separators.
        if Path(filename).name != filename:
            return jsonify({"success": False, "error": "文件不存在"})

        file_path = EQUITY_DIR / filename
        if not file_path.is_file():
            return jsonify({"success": False, "error": "文件不存在"})

        df = pd.read_csv(file_path)

        # Every row is returned. NaN would be serialized as the bare token
        # NaN, which is not valid JSON, so missing cells become None (-> null).
        data = df.astype(object).where(df.notna(), None).to_dict(orient='records')

        def number(value):
            """Convert *value* to float, mapping a missing value to 0.0."""
            return 0.0 if pd.isna(value) else float(value)

        assets = df['total_asset']
        has_rows = len(df) > 0
        initial_asset = number(assets.iloc[0]) if has_rows else 0.0
        final_asset = number(assets.iloc[-1]) if has_rows else 0.0
        max_asset = number(assets.max()) if has_rows else 0.0
        min_asset = number(assets.min()) if has_rows else 0.0

        # Max drawdown: the most negative relative drop from the running peak.
        # With no rows the reduction yields NaN, which number() maps to 0.0.
        running_peak = assets.cummax()
        drawdown = (assets - running_peak) / running_peak
        max_drawdown = number(drawdown.min())

        if initial_asset > 0:
            total_return = (final_asset - initial_asset) / initial_asset
        else:
            total_return = 0

        stats = {
            "initial_asset": initial_asset,
            "final_asset": final_asset,
            "total_return": total_return,
            "max_asset": max_asset,
            "min_asset": min_asset,
            "max_drawdown": max_drawdown,
            "num_days": len(df),
        }

        return jsonify({
            "success": True,
            "data": data,
            "stats": stats,
            "columns": list(df.columns),
        })
    except Exception as e:
        # Top-level boundary of the endpoint: report the failure to the client.
        return jsonify({"success": False, "error": str(e)})
|
||
|
||
|
||
if __name__ == '__main__':
    import os

    # The server listens on every interface (0.0.0.0). Flask's debug mode
    # turns on the Werkzeug interactive debugger, which can execute arbitrary
    # code, so it is enabled only on explicit request via FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}

    print("=" * 60)
    print("回测结果Web查看器")
    print("=" * 60)
    print("正在启动Flask服务...")
    print(f"结果目录: {RESULTS_DIR}")
    print("请在浏览器打开: http://localhost:5000")
    print("=" * 60)
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)
|