111 lines
4.0 KiB
Python
111 lines
4.0 KiB
Python
"""
|
|
Market quote data management module - supports multiple data sources (Tushare/AKShare)
|
|
"""
|
|
import logging
import os
import threading
import time
from enum import Enum, auto
from typing import List, Dict, Optional  # Optional is included for optional return types

import akshare as ak
import pandas as pd
import tushare as ts

# Configure the Tushare API token once, at import time.
# The TUSHARE_TOKEN environment variable takes precedence (an unset or empty
# variable falls through to the default); the literal is kept only so existing
# deployments that never set the variable keep working.
# NOTE(review): a credential committed to source control should be treated as
# leaked - rotate it and remove this fallback once TUSHARE_TOKEN is set
# everywhere the module runs.
ts.set_token(
    os.environ.get('TUSHARE_TOKEN')
    or '9343e641869058684afeadfcfe7fd6684160852e52e85332a7734c8d'
)


class DataSource(Enum):
    """Identifiers of the quote back ends a caller can select.

    Each member's value is the lowercase string name of the back end.
    """

    TUSHARE = "tushare"  # Tushare API
    AKSHARE = "akshare"  # AKShare library
    LOCAL = "local"  # NOTE(review): presumably locally stored data - confirm


class QuoteManager:
    """Singleton entry point for fetching real-time stock quotes.

    Every ``QuoteManager()`` call returns the same object, so the selected
    data source and the retry policy are shared by all callers in the process.
    """

    _instance = None  # the shared instance, created on first construction
    _lock = threading.Lock()  # guards creation of the shared instance

    def __new__(cls):
        """Return the shared instance, creating and initialising it on first use."""
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                # Publish the instance only after initialisation succeeded so
                # a failing _init_manager cannot leave a half-built singleton.
                instance._init_manager()
                cls._instance = instance
        return cls._instance

    def _init_manager(self):
        """Set the default state of the freshly created singleton."""
        # NOTE(review): _cache and _cache_ttl are initialised here but never
        # read in this class; presumably reserved for quote caching - confirm.
        self._cache = {}
        self._cache_ttl = 60
        self._data_source = DataSource.TUSHARE
        self.max_retries = 3  # default total number of attempts per request
        self.retry_interval = 2  # default pause between attempts, in seconds

    def set_retry_policy(self, max_retries: int, retry_interval: float = 2):
        """Configure how failed fetches are retried.

        Args:
            max_retries: Total number of attempts per request. Values below 1
                are treated as 1 when fetching.
            retry_interval: Seconds to sleep between attempts. Values of 0 or
                less disable the pause.
        """
        self.max_retries = max_retries
        self.retry_interval = retry_interval

    def set_data_source(self, source: "DataSource"):
        """Select the data source used by subsequent quote requests."""
        self._data_source = source

    def get_realtime_quotes(self, codes: List[str]) -> Dict[str, pd.Series]:
        """Fetch real-time quotes for ``codes`` from the active data source.

        Args:
            codes: Security codes to look up.

        Returns:
            A dict mapping each returned code to its quote row. An empty
            ``codes`` list yields an empty dict without contacting the
            provider.

        Raises:
            ValueError: The active data source has no fetcher.
            NotImplementedError: The selected fetcher is not implemented yet.
            Exception: Every attempt failed; the last error is chained as
                ``__cause__``.
        """
        if not codes:
            return {}

        if self._data_source == DataSource.TUSHARE:
            fetch = self._get_tushare_quotes
        elif self._data_source == DataSource.AKSHARE:
            fetch = self._get_akshare_quotes
        else:
            # A configuration error cannot be cured by retrying.
            raise ValueError("不支持的数据源")

        return self._fetch_with_retry(fetch, codes)

    def _fetch_with_retry(self, fetch, codes):
        """Call ``fetch(codes)`` until it succeeds or the attempts run out.

        This is the only retry loop in the class; the per-source fetchers make
        a single attempt each, so the total number of provider calls equals
        the configured policy instead of its square.
        """
        attempts = max(1, self.max_retries)  # always try at least once
        last_error = None
        for attempt in range(1, attempts + 1):
            try:
                return fetch(codes)
            except NotImplementedError:
                # A missing implementation will not appear on a retry.
                raise
            except Exception as error:
                last_error = error
                if attempt < attempts and self.retry_interval > 0:
                    time.sleep(self.retry_interval)
        raise Exception(
            f"获取行情失败(尝试{attempts}次): {last_error}"
        ) from last_error

    def _get_tushare_quotes(self, codes: List[str]) -> Dict[str, pd.Series]:
        """Fetch quotes from Tushare in a single attempt.

        Retrying is left to ``_fetch_with_retry``.

        Raises:
            Exception: Tushare returned no data.
        """
        df = ts.realtime_quote(ts_code=','.join(codes))
        if df is None or df.empty:
            raise Exception("返回数据为空")
        return {row['TS_CODE']: row for _, row in df.iterrows()}

    def _get_akshare_quotes(self, codes: List[str]) -> Dict[str, pd.Series]:
        """Fetch quotes from AKShare (placeholder, not implemented yet)."""
        # TODO: implement the AKShare retrieval logic.
        raise NotImplementedError("AKShare 实现待完成")

    def _convert_akshare_format(self, row) -> Dict:
        """Map an AKShare row (Chinese column names) to the unified field names."""
        return {
            'TS_CODE': row['代码'],
            'PRICE': row['最新价'],
            'OPEN': row['今开'],
            'PRE_CLOSE': row['昨收'],
            'HIGH': row['最高'],
            'LOW': row['最低'],
            'VOLUME': row['成交量'],
        }

    def get_quote(self, code: str) -> Optional[Dict]:
        """Fetch the quote of a single security (legacy-compatible interface).

        Returns:
            A dict with ``price``, ``avg_price`` and ``volume``, or ``None``
            when the code is not in the result or the lookup fails. Failures
            are logged rather than raised.
        """
        try:
            quotes = self.get_realtime_quotes([code])
            if not quotes:
                return None
            row = quotes.get(code)
            if row is None:
                return None
            return {
                'price': row['PRICE'],
                # NOTE(review): "avg_price" is the midpoint of today's open
                # and the previous close, not a traded average - confirm.
                'avg_price': (row['OPEN'] + row['PRE_CLOSE']) / 2,
                'volume': row['VOLUME'],
            }
        except Exception as e:
            # Best-effort interface: report the problem and signal "no quote".
            logging.error(f"获取股票{code}行情失败: {e}")
            return None