Files
real_view/order_executor.py
2025-07-28 12:15:45 +08:00

173 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import subprocess
import time
import pygetwindow as gw
import win32gui
import win32con
import cv2
import os
from logger_utils import log_info, log_warning, log_error, log_trigger
# 移除 pyautogui 导入
class OrderExecutor:
def __init__(self, target_window_title="东方财富终端"):
self.target_window_title = target_window_title
self.order_window_title = 'FlashTradeDlgDlg'
self.order_count_window_title = '提示'
self.ahk_script_path = "D:/gp_data/order_executor.ahk"
def run_ahk_script(self, command):
"""执行AHK脚本命令"""
try:
# 尝试查找AHK安装路径
ahk_path = None
# 常见安装路径
possible_paths = [
"C:\\Program Files\\AutoHotkey\\v2\\AutoHotkey.exe", # v2版本
"C:\\Program Files\\AutoHotkey\\AutoHotkey.exe",
"AutoHotkey.exe" # 如果在PATH环境变量中
]
for path in possible_paths:
if os.path.exists(path):
ahk_path = path
break
if not ahk_path:
log_error("未找到AutoHotkey安装路径")
return False
subprocess.Popen(
[ahk_path, '/restart', self.ahk_script_path, command],
creationflags=subprocess.CREATE_NO_WINDOW
)
return True
except Exception as e:
log_error(f"执行AHK脚本失败: {e}")
return False
def click_confirm_button(self, window_title, confirm_ratio=(0.8, 0.8), delay=0.3):
"""
修改为使用AHK点击确认按钮
"""
try:
time.sleep(delay)
window_list = gw.getWindowsWithTitle(window_title)
if not window_list:
log_warning(f"未找到行情软件 '{window_title}' 的窗口")
return False
window = window_list[0]
left, top, width, height = window.left, window.top, window.width, window.height
x = left + int(width * confirm_ratio[0])
y = top + int(height * confirm_ratio[1])
# 使用AHK执行点击
command = f"click {x} {y}"
self.run_ahk_script(command)
log_trigger(f"AHK点击 '{window_title}' ,位置: ({x}, {y})")
return True
except Exception as e:
log_error(f"点击确认按钮失败: {e}")
return False
def click_button_by_image(self, image_path, window_title=None,
timeout=10, retry_interval=0.5,
confidence=0.8, move_duration=0.2):
"""
修改为使用AHK进行图像识别点击
"""
log_info(f"开始AHK图像识别: {image_path}")
# 构建AHK命令
command = f"image_click {image_path}"
if window_title:
command += f" {window_title}"
command += f" {confidence}"
# 执行AHK命令
success = self.run_ahk_script(command)
if success:
log_trigger(f"AHK成功识别并点击图像 '{image_path}'")
return True, (0, 0) # AHK不返回坐标用(0,0)占位
else:
log_warning(f"AHK未能识别图像 '{image_path}'")
return False, None
def place_order(self, pure_code, auto_push):
"""
完全使用AHK的下单函数
"""
try:
# 获取所有匹配标题的窗口
window_list = gw.getWindowsWithTitle(self.target_window_title)
if not window_list:
log_warning(f"未找到标题为 '{self.target_window_title}' 的窗口")
return False
# 筛选逻辑:优先选择可见且活动的窗口
target_window = None
def is_window_visible(hwnd):
return win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd)
for window in window_list:
hwnd = window._hWnd
if is_window_visible(hwnd) and window.isActive:
target_window = window
break
# 如果没有活动窗口,选择第一个可见窗口
if not target_window:
for window in window_list:
hwnd = window._hWnd
if is_window_visible(hwnd):
target_window = window
break
# 如果都没有,选择第一个匹配的窗口
if not target_window:
target_window = window_list[0]
# 使用筛选后的目标窗口进行操作
hwnd = target_window._hWnd
log_info(f"找到窗口句柄: {hwnd}")
target_window.restore()
target_window.maximize()
target_window.activate()
time.sleep(0.2)
# 使用AHK点击中心位置
self.click_confirm_button(self.target_window_title, (0.5, 0.5), 0.1)
# 使用AHK输入代码
self.run_ahk_script(f"type {pure_code}")
self.run_ahk_script("press enter")
time.sleep(0.1)
# 判断是否自动下单
if auto_push == 1:
self.run_ahk_script("type 21")
self.run_ahk_script("press enter")
time.sleep(0.1)
# 点击全仓按钮
success, pos = self.click_button_by_image(
image_path="../images/full_position.png",
window_title=self.order_window_title,
timeout=10,
retry_interval=0.3,
confidence=0.9
)
if success:
log_info("已点击全仓按钮")
# 判断是否弹出仓位的框
if self.is_window_exists(self.order_count_window_title, 0.5):
log_error(f"剩余金额不满足购买{pure_code}最低需求。")
else:
log_warning("未找到全仓按钮图像")
except Exception as e:
log_error(f"下单失败: {str(e)}")
return False