import time import pyautogui import win32gui import win32con import tkinter as tk from tkinter import filedialog from logger_utils import setup_logger # 导入日志配置函数 import datetime from logger_utils import log_info, log_warning, log_error, log_trigger from order_executor import OrderExecutor # 导入 OrderExecutor 类 下单类 import os from threading import Thread import pygetwindow as gw class Autotrading: def __init__(self, master): self.master = master master.title("监控自动交易工具") # 注册日志回调 self.order_executor = OrderExecutor() self.default_buy_price = 0.00 self.default_sell_price = 0.00 self.default_buy_volume = 100 self.default_sell_volume = 100 self.stock_code = "" self.monitoring = False self.monitor_thread = None self.last_file_size = 0 self.monitored_file = "" self.monitoring = False # 添加监控状态标志 self.hwnd = None # 交易窗口的句柄 self.target_window_title = "东方财富终端" # 交易窗口的 # 创建界面组件 self.label = tk.Label(master, text="选择类型:", anchor="w") self.label.pack(anchor="w") # 添加单选按钮组 self.mode_var = tk.StringVar(value="方式1") # 默认选择方式1 self.mode1 = tk.Radiobutton(master, text="监控文件", variable=self.mode_var, font=('微软雅黑', 11), value="方式1") self.mode2 = tk.Radiobutton(master, text="自动推送", variable=self.mode_var, font=('微软雅黑', 11), value="方式2") self.mode1.pack(anchor="w") self.mode2.pack(anchor="w") # 添加ST排除复选框 self.exclude_st_var = tk.BooleanVar(value=True) self.exclude_st_check = tk.Checkbutton( master, text="排除ST个股", variable=self.exclude_st_var, font=('微软雅黑', 11) ) self.exclude_st_check.pack(anchor="w") # 在 __init__ 中添加 self.use_executor_var = tk.BooleanVar(value=True) # 默认启用 executor self.executor_check = tk.Checkbutton( master, text="使用专业下单模块 (OrderExecutor)", variable=self.use_executor_var, font=('微软雅黑', 11) ) self.executor_check.pack(anchor="w") # 新增编辑框 self.entry_label = tk.Label(master, text="输入策略名:", anchor="w") self.entry_label.pack(anchor="w") self.stock_entry = tk.Entry(master, width=25) self.stock_entry.insert(0, "凤随☆") # 设置默认内容 self.stock_entry.pack(anchor="w", padx=5) # 添加文件监控按钮 self.monitor_button = tk.Button( master, text="监控文件", command=self.choose_model, width=15, font=('微软雅黑', 12), # 设置字体和大小 fg='red' # 字体颜色 ) self.monitor_button.pack() # 文件选择按钮 self.file_button = tk.Button( master, text="选择文件", command=self.select_file, font=('微软雅黑', 12), # 设置字体和大小 width=12 ) self.file_button.pack() # 文件显示标签 self.file_label = tk.Label(master, text="未选择文件", font=('微软雅黑', 11)) self.file_label.pack() # 新增窗口状态标签 self.window_status = tk.Label(master, text="窗口状态: 未检测到", fg="red", font=('微软雅黑', 11)) self.window_status.pack() # 添加日志窗口 self.log_frame = tk.Frame(master) self.log_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # 日志文本框,设置字体大小 self.log_text = tk.Text( self.log_frame, height=20, state='disabled', font=('微软雅黑', 12) # 设置字体和大小,可按需调整 ) self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) # 滚动条 self.log_scroll = tk.Scrollbar(self.log_frame, command=self.log_text.yview) self.log_scroll.pack(side=tk.RIGHT, fill=tk.Y) self.log_text.config(yscrollcommand=self.log_scroll.set) # === 设置日志颜色样式(放在这里)=== self.log_text.tag_configure('error', foreground='red') self.log_text.tag_configure('loading', foreground='orange') self.log_text.tag_configure('trigger', foreground='green') self.log_text.tag_configure('default', foreground='blue') self.log_text.tag_configure('info', foreground='blue') self.log_text.tag_configure('warning', foreground='orange') # 注册日志回调 setup_logger(self.gui_log) def gui_log(self, message, level='default'): """日志代理函数,用于将日志信息更新到 GUI""" current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") full_message = f" {message}\n" self.log_text.config(state=tk.NORMAL) self.log_text.insert(tk.END, full_message, level) app.log_text.see(tk.END) app.log_text.config(state=tk.DISABLED) def get_entry_content(self): """获取输入框内容""" content = self.stock_entry.get() if not content.strip(): # 如果内容为空或只有空格 log_error("警告:策略名不能为空!") return None return content.strip() def choose_model(self): """根据单选按钮选择功能""" selected_mode = self.mode_var.get() if selected_mode == "方式1": self.start_file_monitoring() elif selected_mode == "方式2": self.start_auto_push() # 获取窗口句柄 def get_window_handle(self, window_title): """获取指定标题的窗口句柄""" hwnd = win32gui.FindWindow(None, window_title) if not hwnd: self.window_status.config(text=f"窗口状态: {window_title} 未打开", fg="red") raise Exception(f"未找到标题为 '{window_title}' 的窗口") self.window_status.config(text=f"窗口状态: {window_title}", fg="green") return hwnd def get_window_content(self, hwnd): """获取窗口可视内容(截图)""" # 将窗口置顶 win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) win32gui.SetForegroundWindow(hwnd) # 获取窗口位置和尺寸 left, top, right, bottom = win32gui.GetWindowRect(hwnd) width = right - left height = bottom - top # 截图并保存 screenshot = pyautogui.screenshot(region=(left, top, width, height)) return screenshot def capture_window(self): """截图功能""" try: hwnd = self.get_window_handle(self.target_window_title) content = self.get_window_content(hwnd) content.save(f"{self.target_window_title}.png") self.file_label.config(text=f"截图已保存为 {self.target_window_title}.png") except Exception as e: self.file_label.config(text=f"错误: {str(e)}") def start_auto_push(self): """开始自动推送功能""" self.place_order("002183", "买入", 100, 2.00, ) def start_file_monitoring(self): if not self.monitored_file: log_warning("请先选择要监控的文件") return if self.monitoring: self.monitoring = False self.monitor_button.config(text="监控文件") log_info(f"已停止监控文件: {self.monitored_file}") else: try: self.get_window_handle(self.target_window_title) self.monitoring = True self.monitor_button.config(text="停止监控") self.last_file_size = os.path.getsize(self.monitored_file) self.monitor_thread = Thread(target=self.monitor_file_changes) self.monitor_thread.daemon = True self.monitor_thread.start() log_info(f"开始监控文件: {self.monitored_file}") except Exception as e: log_error(f"未找到 {self.target_window_title} 窗口,请先打开该窗口再开始监控。错误信息: {str(e)}") def monitor_file_changes(self): """监控文件变化的后台线程,处理ANSI格式文件""" last_position = os.path.getsize(self.monitored_file) if os.path.exists(self.monitored_file) else 0 last_warning_type = "" # 记录上一条预警类型 # 获取策略名作为预警名 warning_name = self.get_entry_content() log_error(f"当前策略:{warning_name}") while self.monitoring: try: current_size = os.path.getsize(self.monitored_file) if current_size > last_position: # 比较当前文件大小和上次记录的位置 with open(self.monitored_file, 'r', encoding='mbcs') as f: f.seek(last_position) new_lines = f.readlines() last_position = f.tell() if new_lines: for line in new_lines: # 解析每行数据 parts = line.strip().split('\t') if len(parts) >= 6: code = parts[0] # 代码 name = parts[1] # 个股名称 t_time = parts[2] # 预警时间 price = parts[3] # 预警价格 increase = parts[4] # 预警涨幅 code_num = parts[5] # 编码 warning_type = parts[6] if len(parts) > 6 else "" # 预警类型 output = f"预警类型: {warning_type}\n代码: {code}\n个股名称: {name}\n预警时间: {t_time}\n预警价格: {price}\n预警涨幅: {increase}\n编码: {code_num}\n" # 检查是否排除ST个股 if self.exclude_st_var.get() and ('ST' in name or '*ST' in name): log_error(f"排除ST个股: {name}({code})") continue # 判断是否匹配当前策略 if warning_type == warning_name: log_trigger(f"策略触发----{warning_name}\n{output}") # 调用下单函数(示例) # self.place_order(code, name, price, self.default_buy_volume, 'buy') else: log_error(f"非指定策略-{warning_type}") log_info("--------------------------------") except Exception as e: log_error(f"监控出错: {str(e)}") time.sleep(1) time.sleep(0.5) def select_file(self): """文件选择功能""" if self.monitoring: log_warning("请先停止监控再选择文件") return file_path = filedialog.askopenfilename() if file_path: self.file_label.config(text=f"已选择文件: {file_path}") self.monitored_file = file_path log_info(f"已选择监控文件: {file_path}") def get_min_interval(self): """检测系统最小可操作间隔""" # 测试几次点击操作的最小间隔 test_times = 3 start_time = time.time() for _ in range(test_times): pyautogui.click(100, 100) # 在屏幕角落测试点击 end_time = time.time() # 计算平均间隔时间,并乘以安全系数(1.5) return (end_time - start_time) / test_times * 1.5 def place_order(self, code, code_name, price, volume, order_type): """ 下单函数,优先使用 OrderExecutor 下单 :param code: 股票代码 :param code_name: 股票名称 :param price: 下单价格 :param volume: 下单数量 :param order_type: 下单类型,如 'buy' 或 'sell' """ try: pure_code = code[-6:] # 假设输入为 "SH600000" 或 "SZ000001" 等格式 auto_push = 1 # 表示是否自动下单,这里默认开启 log_trigger(f"开始 {order_type} 下单,代码: {code},{code_name},价格: {price},数量: {volume}") self.order_executor.place_order(pure_code, auto_push) log_trigger(f"{order_type} 下单成功,代码: {code},{code_name},价格: {price},数量: {volume}") except Exception as e: log_error(f"{order_type} 下单失败,代码: {code},错误信息: {str(e)}") if __name__ == "__main__": root = tk.Tk() app = Autotrading(root) root.mainloop()