Files
real_view/strategy_monitor.py
2025-07-28 12:15:45 +08:00

166 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
个股监控 - 监控个股状态
"""
import tushare as ts
import time
import logging
from typing import Dict, List, Optional
from quote_manager import QuoteManager # 新增导入
from logger_utils_new import setup_logger, LOG_STYLES # 新增导入
# Configure the root logger once so that records reach the console.
# logging.basicConfig() does nothing when the root logger already has
# handlers, so a single call is sufficient.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)

# Prefer the project logger from logger_utils_new; if setup_logger() returns
# None, fall back to a standard-library logger with the same name, which
# propagates to the root handler configured above.
logger = setup_logger('strategy_monitor')
if logger is None:
    logger = logging.getLogger('strategy_monitor')
class StockMonitor:
    """Poll real-time quotes for a list of stocks and log BUY/SELL signals.

    Attributes:
        quote_manager: ``QuoteManager`` instance used to fetch quotes.
        pro: Tushare pro API handle created from the supplied token.
        monitor_interval: Seconds to sleep between polling rounds.
        stock_list: Codes to monitor, loaded from the stock file.
        buy_threshold: BUY when price > reference price * this factor.
        sell_threshold: SELL when price < reference price * this factor.
        min_volume: Minimum volume for BUY; SELL requires half of it.
    """

    # Codes monitored when the stock file cannot be read.
    DEFAULT_STOCKS = (
        "600519.SH",
        "000858.SZ",
        "601318.SH",
        "600036.SH",
        "000333.SZ",
    )

    # Stock file used when the caller does not supply one.
    DEFAULT_STOCK_FILE = r"D:\gp_data\history\20250714.txt"

    def __init__(self, token: str, stock_file: str = DEFAULT_STOCK_FILE):
        """Initialise the monitor.

        Args:
            token: Tushare API token.
            stock_file: Path of a UTF-8 text file with one stock per line.
        """
        self.quote_manager = QuoteManager()
        ts.set_token(token)
        self.pro = ts.pro_api()

        # Monitoring configuration.
        self.monitor_interval = 5
        self.stock_list = self._load_and_filter_stocks(stock_file)

        # Strategy parameters.
        self.buy_threshold = 1.02
        self.sell_threshold = 0.98
        self.min_volume = 100000

    @staticmethod
    def _is_excluded(stock: str) -> bool:
        """Return True for entries that must not be monitored."""
        # NOTE(review): the fallback entries are exchange codes such as
        # "600519.SH", which never start with "ST"; this check only has an
        # effect if the file lists stock names - confirm the file format.
        if stock.startswith(('ST', '*ST')):
            return True
        # Codes whose numeric part starts with 688 are excluded.
        return stock.split('.')[0].startswith('688')

    def _load_and_filter_stocks(self, file_path: str) -> List[str]:
        """Load the stock list from ``file_path`` and drop excluded entries.

        Blank lines are ignored. If the file cannot be read or decoded, the
        error is logged and ``DEFAULT_STOCKS`` is returned instead.

        Args:
            file_path: Path of a UTF-8 text file with one stock per line.

        Returns:
            The filtered list of stocks, or the default list on read failure.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                stocks = [line.strip() for line in f if line.strip()]
        except (OSError, UnicodeError) as e:
            logger.error(f"加载股票列表失败: {str(e)}")
            return list(self.DEFAULT_STOCKS)

        filtered_stocks = [s for s in stocks if not self._is_excluded(s)]
        logger.info(f"加载并过滤后股票数量: {len(filtered_stocks)}")
        return filtered_stocks

    @staticmethod
    def _normalize_row(row) -> Dict:
        """Convert one raw quote row into the internal quote dict.

        Raises:
            TypeError, ValueError: If a field cannot be converted to float.
        """
        if not isinstance(row, dict):
            # Rows that offer to_dict() are converted; anything else is
            # treated as an empty row, so every field defaults to 0.
            row = row.to_dict() if hasattr(row, 'to_dict') else {}
        return {
            'price': float(row.get('PRICE', 0)),
            # Reference price: midpoint of OPEN and PRE_CLOSE (this is not a
            # moving average, although the signal log labels it "均线").
            'avg_price': (float(row.get('OPEN', 0)) + float(row.get('PRE_CLOSE', 0))) / 2,
            'volume': float(row.get('VOLUME', 0)),
        }

    def get_realtime_quotes(self) -> Optional[Dict]:
        """Fetch real-time quotes for ``stock_list`` via the quote manager.

        Rows that cannot be parsed are logged and skipped, so one malformed
        row does not discard the whole batch.

        Returns:
            A dict mapping code to ``{'price', 'avg_price', 'volume'}``, or
            ``None`` if the fetch failed or produced no usable rows.
        """
        try:
            quotes_data = self.quote_manager.get_realtime_quotes(self.stock_list)
            if not quotes_data:
                return None

            quotes = {}
            for code, row in quotes_data.items():
                try:
                    quotes[code] = self._normalize_row(row)
                except (TypeError, ValueError) as e:
                    logger.error(f"解析股票{code}行情失败: {str(e)}")
            return quotes or None
        except Exception as e:
            logger.error(f"获取行情失败: {str(e)}")
            return None

    def analyze_signals(self, quotes: Dict) -> List[Dict]:
        """Derive a BUY/SELL/HOLD signal for every quote.

        A quote whose price or reference price is not positive is treated as
        missing data and always yields HOLD; otherwise a zero reference price
        would make any positive price look like a BUY.

        Args:
            quotes: Mapping of code to a dict with 'price', 'avg_price' and
                'volume'.

        Returns:
            One dict per successfully analysed quote with the keys 'code',
            'price', 'avg_price', 'volume' and 'signal'. Quotes that raise
            during analysis are logged and omitted.
        """
        signals = []
        for code, data in quotes.items():
            try:
                current_price = float(data['price'])
                avg_price = float(data['avg_price'])
                volume = float(data['volume'])

                signal = 'HOLD'
                if current_price <= 0 or avg_price <= 0:
                    # Missing or invalid price data: never trade on it.
                    pass
                elif current_price > avg_price * self.buy_threshold and volume > self.min_volume:
                    signal = 'BUY'
                elif current_price < avg_price * self.sell_threshold and volume > self.min_volume / 2:
                    signal = 'SELL'

                signals.append({
                    'code': code,
                    'price': current_price,
                    'avg_price': avg_price,
                    'volume': volume,
                    'signal': signal,
                })
            except Exception as e:
                logger.error(f"分析股票{code}时出错: {str(e)}")
        return signals

    @staticmethod
    def _log_signal(signal: Dict) -> None:
        """Write one actionable (non-HOLD) signal to the log."""
        logger.info(
            f"信号: {signal['code']} | "
            f"价格: {signal['price']:.2f} | "
            f"均线: {signal['avg_price']:.2f} | "
            f"成交量: {signal['volume']} | "
            f"操作: {signal['signal']}"
        )

    def start_monitoring(self):
        """Poll quotes forever and log every BUY/SELL signal.

        Runs until interrupted with Ctrl+C. Any other exception is logged
        and ends the loop.
        """
        logger.info("启动股票监控系统...")
        logger.info(f"监控股票: {', '.join(self.stock_list)}")
        try:
            while True:
                quotes = self.get_realtime_quotes()
                if quotes:
                    for signal in self.analyze_signals(quotes):
                        if signal['signal'] != 'HOLD':
                            self._log_signal(signal)
                # Sleep on every round, including failed fetches, so that a
                # quote outage does not turn into a busy loop.
                time.sleep(self.monitor_interval)
        except KeyboardInterrupt:
            logger.info("监控已停止")
        except Exception as e:
            logger.error(f"监控异常终止: {str(e)}")
if __name__ == "__main__":
    import os

    # Take the Tushare token from the TUSHARE_TOKEN environment variable so
    # that a real credential never has to be written into this file; the
    # placeholder is used only when the variable is not set.
    token = os.environ.get("TUSHARE_TOKEN", "你的Tushare_token")
    monitor = StockMonitor(token)
    monitor.start_monitoring()