# -*- coding: utf-8 -*- """ 交易记录查看器 - Flask Web应用 提供交易数据的可视化和分析 """ from flask import Flask, render_template, jsonify import pandas as pd import os from datetime import datetime import json app = Flask(__name__) # 交易记录文件路径 TRADES_DIR = 'trades_records' def load_trade_data(strategy_name): """加载交易记录数据""" file_path = os.path.join(TRADES_DIR, f'{strategy_name}_trades.csv') if not os.path.exists(file_path): return None df = pd.read_csv(file_path) df['date'] = pd.to_datetime(df['date']) return df def calculate_strategy_metrics(df): """计算策略关键指标""" if df is None or df.empty: return {} # 过滤出有盈亏数据的交易(卖出交易) sell_trades = df[df['action'] == 'SELL'].copy() if sell_trades.empty: return { 'total_trades': 0, 'total_pnl': 0, 'total_return': 0, 'win_rate': 0, 'avg_win': 0, 'avg_loss': 0, 'max_win': 0, 'max_loss': 0, 'profit_factor': 0 } # 【关键修复】正确计算总收益和收益率 initial_capital = 100000 # 使用最终资金余额计算(如果有balance列) if 'balance' in df.columns and df['balance'].notna().any(): final_balance = df['balance'].iloc[-1] total_pnl = final_balance - initial_capital total_return = (final_balance / initial_capital - 1) * 100 else: # 降级方案:使用pnl累加 total_pnl = sell_trades['pnl'].sum() total_return = (total_pnl / initial_capital) * 100 # 基本统计 total_trades = len(sell_trades) # 盈亏交易 winning_trades = sell_trades[sell_trades['pnl'] > 0] losing_trades = sell_trades[sell_trades['pnl'] < 0] win_count = len(winning_trades) loss_count = len(losing_trades) win_rate = (win_count / total_trades * 100) if total_trades > 0 else 0 # 【修复】平均盈亏:亏损应该保持负值,取绝对值仅用于显示 avg_win = winning_trades['pnl'].mean() if not winning_trades.empty else 0 avg_loss = losing_trades['pnl'].mean() if not losing_trades.empty else 0 # 保持负值 # 最大盈亏 max_win = winning_trades['pnl'].max() if not winning_trades.empty else 0 max_loss = abs(losing_trades['pnl'].min()) if not losing_trades.empty else 0 # 盈亏比 total_wins = winning_trades['pnl'].sum() if not winning_trades.empty else 0 total_losses = abs(losing_trades['pnl'].sum()) if not losing_trades.empty else 0 profit_factor = (total_wins / total_losses) if total_losses > 0 else 0 return { 'total_trades': total_trades, 'total_pnl': round(total_pnl, 2), 'total_return': round(total_return, 2), 'win_rate': round(win_rate, 2), 'win_count': win_count, 'loss_count': loss_count, 'avg_win': round(avg_win, 2), 'avg_loss': round(abs(avg_loss), 2), # 返回时取绝对值方便显示 'max_win': round(max_win, 2), 'max_loss': round(max_loss, 2), 'profit_factor': round(profit_factor, 2) } def calculate_equity_curve(df): """计算权益曲线""" if df is None or df.empty: return [] initial_capital = 100000 # 【关键修复】优先使用balance列(交易后实际资金) if 'balance' in df.columns and df['balance'].notna().any(): equity_curve = [{'date': '起始', 'equity': initial_capital}] for _, row in df.iterrows(): if pd.notna(row['balance']): equity_curve.append({ 'date': row['date'].strftime('%Y-%m-%d'), 'equity': round(row['balance'], 2) }) return equity_curve else: # 降级方案:使用pnl累加 sell_trades = df[df['action'] == 'SELL'].copy() sell_trades = sell_trades.sort_values('date') equity = initial_capital equity_curve = [{'date': '起始', 'equity': initial_capital}] for _, trade in sell_trades.iterrows(): equity += trade['pnl'] equity_curve.append({ 'date': trade['date'].strftime('%Y-%m-%d'), 'equity': round(equity, 2) }) return equity_curve def calculate_monthly_returns(df): """计算月度收益""" if df is None or df.empty: return [] sell_trades = df[df['action'] == 'SELL'].copy() sell_trades['year'] = sell_trades['date'].dt.year sell_trades['month'] = sell_trades['date'].dt.month monthly = sell_trades.groupby(['year', 'month'])['pnl'].sum().reset_index() monthly['year_month'] = monthly['year'].astype(str) + '-' + monthly['month'].astype(str).str.zfill(2) result = [] for _, row in monthly.iterrows(): result.append({ 'year_month': row['year_month'], 'pnl': round(row['pnl'], 2) }) return result def get_stock_performance(df): """获取各股票表现""" if df is None or df.empty: return [] sell_trades = df[df['action'] == 'SELL'].copy() stock_perf = sell_trades.groupby('ts_code').agg({ 'pnl': ['sum', 'count', 'mean'], 'pnl_pct': 'sum' }).reset_index() stock_perf.columns = ['ts_code', 'total_pnl', 'trade_count', 'avg_pnl', 'total_return'] result = [] for _, row in stock_perf.iterrows(): win_trades = sell_trades[(sell_trades['ts_code'] == row['ts_code']) & (sell_trades['pnl'] > 0)] total_trades = sell_trades[sell_trades['ts_code'] == row['ts_code']] win_rate = len(win_trades) / len(total_trades) * 100 if len(total_trades) > 0 else 0 result.append({ 'ts_code': row['ts_code'], 'total_pnl': round(row['total_pnl'], 2), 'trade_count': int(row['trade_count']), 'avg_pnl': round(row['avg_pnl'], 2), 'total_return': round(row['total_return'] * 100, 2), 'win_rate': round(win_rate, 2) }) return sorted(result, key=lambda x: x['total_pnl'], reverse=True) @app.route('/') def index(): """主页面""" strategies = [] if os.path.exists(TRADES_DIR): for file in os.listdir(TRADES_DIR): if file.endswith('_trades.csv'): strategy_name = file.replace('_trades.csv', '') strategies.append(strategy_name) return render_template('trade_viewer.html', strategies=strategies) @app.route('/api/strategy/') def get_strategy_data(strategy_name): """获取策略数据API""" df = load_trade_data(strategy_name) if df is None: return jsonify({'error': '未找到数据'}), 404 # 计算各类指标 metrics = calculate_strategy_metrics(df) equity_curve = calculate_equity_curve(df) monthly_returns = calculate_monthly_returns(df) stock_performance = get_stock_performance(df) # 交易明细 trades_list = [] for _, row in df.iterrows(): trades_list.append({ 'date': row['date'].strftime('%Y-%m-%d'), 'ts_code': row['ts_code'], 'action': row['action'], 'price': round(row['price'], 2), 'shares': int(row['shares']), 'commission': round(row['commission'], 2), 'reason': row['reason'], 'pnl': round(row['pnl'], 2) if pd.notna(row['pnl']) else None, 'pnl_pct': round(row['pnl_pct'] * 100, 2) if pd.notna(row['pnl_pct']) else None, 'balance': round(row['balance'], 2) if 'balance' in row and pd.notna(row['balance']) else None }) return jsonify({ 'metrics': metrics, 'equity_curve': equity_curve, 'monthly_returns': monthly_returns, 'stock_performance': stock_performance, 'trades': trades_list }) if __name__ == '__main__': app.run(debug=True, port=5001, host='0.0.0.0')