""" 个股监控 - 监控个股状态 """ import tushare as ts import time import logging from typing import Dict, List, Optional from quote_manager import QuoteManager # 新增导入 from logger_utils_new import setup_logger, LOG_STYLES # 新增导入 # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()] ) logger = logging.getLogger(__name__) # 替换原有的日志配置 # 修改日志配置部分 import logging from logger_utils_new import setup_logger # 确保logger被正确初始化 logger = setup_logger('strategy_monitor') # 使用logger_utils_new中的setup_logger # 如果setup_logger不可用,使用基本的日志配置 if logger is None: logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('strategy_monitor') class StockMonitor: def __init__(self, token: str): """初始化监控器""" self.quote_manager = QuoteManager() ts.set_token(token) self.pro = ts.pro_api() # 监控配置 self.monitor_interval = 5 self.stock_list = self._load_and_filter_stocks(r"D:\gp_data\history\20250714.txt") # 策略参数 self.buy_threshold = 1.02 self.sell_threshold = 0.98 self.min_volume = 100000 def _load_and_filter_stocks(self, file_path: str) -> List[str]: """从文件加载股票列表并过滤ST和科创板股票""" try: with open(file_path, 'r', encoding='utf-8') as f: stocks = [line.strip() for line in f if line.strip()] # 过滤ST和科创板股票 filtered_stocks = [] for stock in stocks: # 排除ST/*ST股票 if stock.startswith(('ST', '*ST')): continue # 排除科创板股票(代码以688开头) if stock.split('.')[0].startswith('688'): continue filtered_stocks.append(stock) logger.info(f"加载并过滤后股票数量: {len(filtered_stocks)}") return filtered_stocks except Exception as e: if logger: # 添加检查 logger.error(f"加载股票列表失败: {str(e)}") else: print(f"加载股票列表失败: {str(e)}") # 后备方案 return [ "600519.SH", "000858.SZ", "601318.SH", "600036.SH", "000333.SZ" ] def get_realtime_quotes(self) -> Optional[Dict]: """使用quote_manager获取实时行情数据""" try: # 使用get_realtime_quotes方法替代get_quote quotes_data = self.quote_manager.get_realtime_quotes(self.stock_list) if not quotes_data: return None # 转换数据格式以保持兼容 quotes = {} for code, row in quotes_data.items(): # 确保row是字典类型 if not isinstance(row, dict): row = row.to_dict() if hasattr(row, 'to_dict') else {} quotes[code] = { 'price': float(row.get('PRICE', 0)), 'avg_price': (float(row.get('OPEN', 0)) + float(row.get('PRE_CLOSE', 0))) / 2, 'volume': float(row.get('VOLUME', 0)) } return quotes except Exception as e: logger.error(f"获取行情失败: {str(e)}") return None def analyze_signals(self, quotes: Dict) -> List[Dict]: """分析股票信号""" signals = [] for code, data in quotes.items(): try: current_price = float(data['price']) avg_price = float(data['avg_price']) volume = float(data['volume']) signal = 'HOLD' if current_price > avg_price * self.buy_threshold and volume > self.min_volume: signal = 'BUY' elif current_price < avg_price * self.sell_threshold and volume > self.min_volume / 2: signal = 'SELL' signals.append({ 'code': code, 'price': current_price, 'avg_price': avg_price, 'volume': volume, 'signal': signal }) except Exception as e: logger.error(f"分析股票{code}时出错: {str(e)}") return signals def start_monitoring(self): """开始监控""" logger.info("启动股票监控系统...") logger.info(f"监控股票: {', '.join(self.stock_list)}") try: while True: quotes = self.get_realtime_quotes() if quotes: signals = self.analyze_signals(quotes) for signal in signals: if signal['signal'] != 'HOLD': logger.info( f"信号: {signal['code']} | " f"价格: {signal['price']:.2f} | " f"均线: {signal['avg_price']:.2f} | " f"成交量: {signal['volume']} | " f"操作: {signal['signal']}" ) time.sleep(self.monitor_interval) except KeyboardInterrupt: logger.info("监控已停止") except Exception as e: logger.error(f"监控异常终止: {str(e)}") if __name__ == "__main__": monitor = StockMonitor("你的Tushare_token") monitor.start_monitoring()