Files
2026-01-17 21:21:30 +08:00

218 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
# 作 者84028
# 时 间2024/2/28 21:00
# tsdpsdk
"""
import logging
import os
import time
from concurrent.futures.thread import ThreadPoolExecutor
from functools import wraps
from typing import Optional, Callable
from google.protobuf import json_format
from tushare.subs.ht_subs.covert import datatype_map, convert_ts_model
import sys
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Absolute path of the directory that contains this file.
com_path = os.path.dirname(os.path.abspath(__file__))

# Put this directory and the "com" sub-directories on the import path, in the
# same order as before (presumably where the vendor SDK modules live -- verify).
sys.path.extend([
    com_path,
    f'{com_path}/com',
    f"{com_path}/com/model",
    f"{com_path}/com/interface",
    f"{com_path}/com/libs",
])

# Shared executor (default worker count) on which user callbacks are submitted.
thread_pool = ThreadPoolExecutor()
class InsightSubscribe(object):
    """Log in to the gateway behind ``get_interface()`` and route records to user callbacks.

    Two usage modes are offered, both as decorators:

    * ``subscribe_register`` collects live subscriptions; they are sent to the
      gateway when ``run`` is called.
    * ``register_playback`` requests historical playback immediately when the
      decorator is applied.

    NOTE(review): ``get_interface``, ``mdc_gateway_interface``,
    ``OnRecvMarkertData``, ``MDSubscribe`` and ``MDPlayback`` are not imported
    in the visible part of this module -- confirm they are in scope.
    """

    # Translation table that deletes the separators accepted in timestamps.
    _TIMESTAMP_SEPARATORS = str.maketrans('', '', '-: ')

    def __init__(self, user='', password='', ip='221.131.138.171', port=9362):
        """Store the connection settings, initialise the gateway interface and log in.

        Parameters
        ----------
        user, password : credentials forwarded to the gateway login call
        ip, port : gateway address
        """
        self.user = user
        self.password = password
        self.ip = ip
        self.port = port

        # Create the registration state before touching the gateway so the
        # instance is fully formed by the time login() runs.
        self.subscribe_id_elements = set()
        self.subscribe_callbacks = set()
        self.subscribe_types = set()
        self.subscribe_codes = set()

        open_trace = False
        open_file_log = False
        open_cout_log = False
        get_interface().init(open_trace, open_file_log, open_cout_log)
        self.login()

    @staticmethod
    def _compact_timestamp(value):
        """Return *value* with every '-', ':' and space removed."""
        return value.translate(InsightSubscribe._TIMESTAMP_SEPARATORS)

    @staticmethod
    def _log_task_error(future):
        """Log the exception, if any, raised by a callback that ran on the thread pool."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(str(error), exc_info=error)

    def login(self):
        """Log in to the gateway with the stored address and credentials."""
        istoken = False
        certfolder = f"{com_path}/com/cert"
        backup_list = mdc_gateway_interface.BackupList()
        get_interface().login(self.ip, self.port, self.user, self.password, istoken, certfolder, backup_list)

    def subscribe_start(self):
        """Install the dispatching callback and subscribe to every registered element."""
        interface = get_interface()
        callback = OnRecvMarkertData()
        callback.OnMarketData = self.subscribe_callback
        interface.setCallBack(callback)
        interface.subscribeById(MDSubscribe.COVERAGE, list(self.subscribe_id_elements))

    def subscribe_callback(self, record):
        """Pass *record* to every callback registered through ``subscribe_register``."""
        for cb in self.subscribe_callbacks:
            cb(record)

    def subscribe_register(self, datatypes=None, codes=None):
        """Return a decorator that registers a function for live market data.

        The decorated function is submitted to the shared thread pool as
        ``func(ts_record, inst_data)`` for each record whose type is one of
        *datatypes* and which contains one of *codes*.

        Parameters
        ----------
        datatypes : iterable of str (or a single str), case-insensitive keys of ``datatype_map``
        codes : iterable of str (or a single str), security codes to subscribe to

        Raises
        ------
        ValueError
            If *datatypes* or *codes* is empty, or a data type is unknown.
        """
        if not datatypes or not codes:
            raise ValueError("订阅内容不能为空!")
        # A bare string would otherwise be iterated character by character.
        if isinstance(datatypes, str):
            datatypes = [datatypes]
        if isinstance(codes, str):
            codes = [codes]

        market_data_type_list = []
        for datatype in datatypes:
            datatype = datatype.upper()
            if datatype not in datatype_map:
                raise ValueError(f'非法的数据据类型 {datatype}')
            market_data_type_list.append(datatype_map[datatype])

        # Materialise once: safe for one-shot iterables and O(1) lookups per record.
        code_list = list(codes)
        wanted_codes = frozenset(code_list)
        for code in code_list:
            id_element = mdc_gateway_interface.SubscribeByIdElement(code, market_data_type_list)
            self.subscribe_id_elements.add(id_element)

        def decorator(func):
            def callback_func(record):
                # Cheap type filter first, so unrelated records are never converted.
                if record.marketDataType not in market_data_type_list:
                    return
                inst_data = json_format.MessageToDict(record)
                # Keep the record only if some section carries a wanted code.
                matched = any(
                    isinstance(section, dict)
                    and 'HTSCSecurityID' in section
                    and section['HTSCSecurityID'] in wanted_codes
                    for section in inst_data.values()
                )
                if not matched:
                    return
                try:
                    ts_record = convert_ts_model(inst_data)
                    future = thread_pool.submit(func, ts_record, inst_data)
                except Exception as ee:
                    logger.error(str(ee), exc_info=True)
                else:
                    # Without this, an exception inside func would vanish with the Future.
                    future.add_done_callback(self._log_task_error)

            self.subscribe_callbacks.add(callback_func)

            @wraps(func)
            def inner(*args, **kwargs):
                """ should receive a message-value parameter """
                return func(*args, **kwargs)
            return inner
        return decorator

    @staticmethod
    def register_playback(datatype='', codes=None, start_date='20211103090000', end_date='20211103150000'):
        """Return a decorator that requests playback and feeds it to the decorated function.

        Applying the decorator installs the playback callback on the gateway
        interface and issues the playback request straight away. Each played
        back record is submitted to the shared thread pool as
        ``func(ts_record, inst_data)``.

        Parameters
        ----------
        datatype : str, case-insensitive key of ``datatype_map``
        codes : list of str (or a single str), security codes to play back
        start_date, end_date : str; '-', ':' and spaces are removed before use

        Raises
        ------
        ValueError
            If *codes* is empty or *datatype* is unknown.
        """
        if not codes:
            raise ValueError("订阅内容不能为空!")
        if isinstance(codes, str):
            codes = [codes]
        # Always normalise, so a value containing only spaces is cleaned as well.
        start_date = InsightSubscribe._compact_timestamp(start_date)
        end_date = InsightSubscribe._compact_timestamp(end_date)
        datatype = datatype.upper()
        if datatype not in datatype_map:
            raise ValueError(f'非法的数据据类型 {datatype}')

        def decorator(func):
            def on_func(record):
                try:
                    for inst in record.marketDataStream.marketDataList.marketDatas:
                        inst_data = json_format.MessageToDict(inst)
                        ts_record = convert_ts_model(inst_data)
                        future = thread_pool.submit(func, ts_record, inst_data)
                        # Surface exceptions raised inside func on the worker thread.
                        future.add_done_callback(InsightSubscribe._log_task_error)
                except Exception as ee:
                    logger.error(str(ee), exc_info=True)

            callback = OnRecvMarkertData()
            callback.OnPlaybackPayload = on_func
            interface = get_interface()
            interface.setCallBack(callback)
            interface.playCallback(codes, datatype_map[datatype], MDPlayback.NO_EXRIGHTS, start_date, end_date)

            @wraps(func)
            def inner(*args, **kwargs):
                """ should receive a message-value parameter """
                return func(*args, **kwargs)
            return inner
        return decorator

    def run(self, stop_when: Optional[Callable] = None):
        """Start any registered subscriptions and block until told to stop.

        Parameters
        ----------
        stop_when : optional callable, polled every 5 seconds; the loop ends
            when it returns a truthy value. When it is not callable, lines are
            read from standard input until 'stop' is entered.

        Returns
        -------
        None
        """
        if self.subscribe_callbacks:
            self.subscribe_start()

        if callable(stop_when):
            while True:
                time.sleep(5)
                if stop_when():
                    return

        print("input 'stop' to exit >>>")
        # Keep reading until the user really types 'stop'; any other input is ignored.
        while True:
            line = input()
            if line.strip() == 'stop':
                print("sync: input-->>" + str(line) + ",then exit this sync.")
                return
def demo1(username, password):
    """Example: subscribe to live TICK data for 000001.SZ and print each record.

    :param username: account name passed to ``InsightSubscribe``
    :param password: account password passed to ``InsightSubscribe``
    """
    app = InsightSubscribe(user=username, password=password)

    @app.subscribe_register(datatypes=['TICK'], codes=["000001.SZ"])
    def print_subscribe_message(ts_record=None, ht_record_dict=None, *args, **kwargs):
        """
        Handle one record for the subscribed datatypes and codes.

        Data types listed by the original author: TICK, TRANSACTION, ORDER,
        1MIN, 5MIN, 15MIN, 30MIN, 60MIN, 1DAY, 15SECOND

        :param ts_record: converted record; for field descriptions see
            tushare.subs.model.min and tushare.subs.model.tick
        :param ht_record_dict: dict holding the record in the Huatai data format
        :return: None
        """
        print(ts_record, ht_record_dict)
        logger.info('用户定义业务代码输出 print_message(%s)', str(ts_record))

    # Blocks on console input; the prompt asks the user to type 'stop' to exit.
    app.run()
def demo2(username, password):
    """Example: play back TICK data for 000001.SZ on 2021-09-24 and print each record.

    :param username: account name passed to ``InsightSubscribe``
    :param password: account password passed to ``InsightSubscribe``
    """
    app = InsightSubscribe(user=username, password=password)

    # Timestamps are 14 digits (yyyyMMddHHmmss), like the defaults of
    # register_playback; the previous start value carried two extra digits.
    @app.register_playback(datatype='TICK', codes=["000001.SZ"], start_date='20210924130000', end_date='20210924150000')
    def print_playback_message(ts_record=None, ht_record_dict=None, *args, **kwargs):
        """
        Handle one played-back record for the requested datatype and codes.

        Data types listed by the original author: TICK, TRANSACTION, ORDER,
        1MIN, 5MIN, 15MIN, 30MIN, 60MIN, 1DAY, 15SECOND

        :param ts_record: converted record; for field descriptions see
            tushare.subs.model.min and tushare.subs.model.tick
        :param ht_record_dict: dict holding the record in the Huatai data format
        :return: None
        """
        print(ts_record, ht_record_dict)
        logger.info('用户定义业务代码输出 print_message(%s)', str(ts_record))

    # Keeps the process alive while playback records arrive; run() waits on console input.
    app.run()