Files
real_view/自动下单PYautogui测试.py
2025-07-28 12:15:45 +08:00

320 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import pyautogui
import win32gui
import win32con
import tkinter as tk
from tkinter import filedialog
from logger_utils import setup_logger # 导入日志配置函数
import datetime
from logger_utils import log_info, log_warning, log_error, log_trigger
from order_executor import OrderExecutor # 导入 OrderExecutor 类 下单类
import os
from threading import Thread
import pygetwindow as gw
class Autotrading:
def __init__(self, master):
self.master = master
master.title("监控自动交易工具")
# 注册日志回调
self.order_executor = OrderExecutor()
self.default_buy_price = 0.00
self.default_sell_price = 0.00
self.default_buy_volume = 100
self.default_sell_volume = 100
self.stock_code = ""
self.monitoring = False
self.monitor_thread = None
self.last_file_size = 0
self.monitored_file = ""
self.monitoring = False # 添加监控状态标志
self.hwnd = None # 交易窗口的句柄
self.target_window_title = "东方财富终端" # 交易窗口的
# 创建界面组件
self.label = tk.Label(master, text="选择类型:", anchor="w")
self.label.pack(anchor="w")
# 添加单选按钮组
self.mode_var = tk.StringVar(value="方式1") # 默认选择方式1
self.mode1 = tk.Radiobutton(master, text="监控文件", variable=self.mode_var, font=('微软雅黑', 11), value="方式1")
self.mode2 = tk.Radiobutton(master, text="自动推送", variable=self.mode_var, font=('微软雅黑', 11), value="方式2")
self.mode1.pack(anchor="w")
self.mode2.pack(anchor="w")
# 添加ST排除复选框
self.exclude_st_var = tk.BooleanVar(value=True)
self.exclude_st_check = tk.Checkbutton(
master,
text="排除ST个股",
variable=self.exclude_st_var,
font=('微软雅黑', 11)
)
self.exclude_st_check.pack(anchor="w")
# 在 __init__ 中添加
self.use_executor_var = tk.BooleanVar(value=True) # 默认启用 executor
self.executor_check = tk.Checkbutton(
master,
text="使用专业下单模块 (OrderExecutor)",
variable=self.use_executor_var,
font=('微软雅黑', 11)
)
self.executor_check.pack(anchor="w")
# 新增编辑框
self.entry_label = tk.Label(master, text="输入策略名:", anchor="w")
self.entry_label.pack(anchor="w")
self.stock_entry = tk.Entry(master, width=25)
self.stock_entry.insert(0, "凤随☆") # 设置默认内容
self.stock_entry.pack(anchor="w", padx=5)
# 添加文件监控按钮
self.monitor_button = tk.Button(
master,
text="监控文件",
command=self.choose_model,
width=15,
font=('微软雅黑', 12), # 设置字体和大小
fg='red' # 字体颜色
)
self.monitor_button.pack()
# 文件选择按钮
self.file_button = tk.Button(
master,
text="选择文件",
command=self.select_file,
font=('微软雅黑', 12), # 设置字体和大小
width=12
)
self.file_button.pack()
# 文件显示标签
self.file_label = tk.Label(master, text="未选择文件", font=('微软雅黑', 11))
self.file_label.pack()
# 新增窗口状态标签
self.window_status = tk.Label(master, text="窗口状态: 未检测到", fg="red", font=('微软雅黑', 11))
self.window_status.pack()
# 添加日志窗口
self.log_frame = tk.Frame(master)
self.log_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# 日志文本框,设置字体大小
self.log_text = tk.Text(
self.log_frame,
height=20,
state='disabled',
font=('微软雅黑', 12) # 设置字体和大小,可按需调整
)
self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# 滚动条
self.log_scroll = tk.Scrollbar(self.log_frame, command=self.log_text.yview)
self.log_scroll.pack(side=tk.RIGHT, fill=tk.Y)
self.log_text.config(yscrollcommand=self.log_scroll.set)
# === 设置日志颜色样式(放在这里)===
self.log_text.tag_configure('error', foreground='red')
self.log_text.tag_configure('loading', foreground='orange')
self.log_text.tag_configure('trigger', foreground='green')
self.log_text.tag_configure('default', foreground='blue')
self.log_text.tag_configure('info', foreground='blue')
self.log_text.tag_configure('warning', foreground='orange')
# 注册日志回调
setup_logger(self.gui_log)
def gui_log(self, message, level='default'):
"""日志代理函数,用于将日志信息更新到 GUI"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
full_message = f" {message}\n"
self.log_text.config(state=tk.NORMAL)
self.log_text.insert(tk.END, full_message, level)
app.log_text.see(tk.END)
app.log_text.config(state=tk.DISABLED)
def get_entry_content(self):
"""获取输入框内容"""
content = self.stock_entry.get()
if not content.strip(): # 如果内容为空或只有空格
log_error("警告:策略名不能为空!")
return None
return content.strip()
def choose_model(self):
"""根据单选按钮选择功能"""
selected_mode = self.mode_var.get()
if selected_mode == "方式1":
self.start_file_monitoring()
elif selected_mode == "方式2":
self.start_auto_push()
# 获取窗口句柄
def get_window_handle(self, window_title):
"""获取指定标题的窗口句柄"""
hwnd = win32gui.FindWindow(None, window_title)
if not hwnd:
self.window_status.config(text=f"窗口状态: {window_title} 未打开", fg="red")
raise Exception(f"未找到标题为 '{window_title}' 的窗口")
self.window_status.config(text=f"窗口状态: {window_title}", fg="green")
return hwnd
def get_window_content(self, hwnd):
"""获取窗口可视内容(截图)"""
# 将窗口置顶
win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(hwnd)
# 获取窗口位置和尺寸
left, top, right, bottom = win32gui.GetWindowRect(hwnd)
width = right - left
height = bottom - top
# 截图并保存
screenshot = pyautogui.screenshot(region=(left, top, width, height))
return screenshot
def capture_window(self):
"""截图功能"""
try:
hwnd = self.get_window_handle(self.target_window_title)
content = self.get_window_content(hwnd)
content.save(f"{self.target_window_title}.png")
self.file_label.config(text=f"截图已保存为 {self.target_window_title}.png")
except Exception as e:
self.file_label.config(text=f"错误: {str(e)}")
def start_auto_push(self):
"""开始自动推送功能"""
self.place_order("002183", "买入", 100, 2.00, )
def start_file_monitoring(self):
if not self.monitored_file:
log_warning("请先选择要监控的文件")
return
if self.monitoring:
self.monitoring = False
self.monitor_button.config(text="监控文件")
log_info(f"已停止监控文件: {self.monitored_file}")
else:
try:
self.get_window_handle(self.target_window_title)
self.monitoring = True
self.monitor_button.config(text="停止监控")
self.last_file_size = os.path.getsize(self.monitored_file)
self.monitor_thread = Thread(target=self.monitor_file_changes)
self.monitor_thread.daemon = True
self.monitor_thread.start()
log_info(f"开始监控文件: {self.monitored_file}")
except Exception as e:
log_error(f"未找到 {self.target_window_title} 窗口,请先打开该窗口再开始监控。错误信息: {str(e)}")
def monitor_file_changes(self):
"""监控文件变化的后台线程处理ANSI格式文件"""
last_position = os.path.getsize(self.monitored_file) if os.path.exists(self.monitored_file) else 0
last_warning_type = "" # 记录上一条预警类型
# 获取策略名作为预警名
warning_name = self.get_entry_content()
log_error(f"当前策略:{warning_name}")
while self.monitoring:
try:
current_size = os.path.getsize(self.monitored_file)
if current_size > last_position: # 比较当前文件大小和上次记录的位置
with open(self.monitored_file, 'r', encoding='mbcs') as f:
f.seek(last_position)
new_lines = f.readlines()
last_position = f.tell()
if new_lines:
for line in new_lines:
# 解析每行数据
parts = line.strip().split('\t')
if len(parts) >= 6:
code = parts[0] # 代码
name = parts[1] # 个股名称
t_time = parts[2] # 预警时间
price = parts[3] # 预警价格
increase = parts[4] # 预警涨幅
code_num = parts[5] # 编码
warning_type = parts[6] if len(parts) > 6 else "" # 预警类型
output = f"预警类型: {warning_type}\n代码: {code}\n个股名称: {name}\n预警时间: {t_time}\n预警价格: {price}\n预警涨幅: {increase}\n编码: {code_num}\n"
# 检查是否排除ST个股
if self.exclude_st_var.get() and ('ST' in name or '*ST' in name):
log_error(f"排除ST个股: {name}({code})")
continue
# 判断是否匹配当前策略
if warning_type == warning_name:
log_trigger(f"策略触发----{warning_name}\n{output}")
# 调用下单函数(示例)
# self.place_order(code, name, price, self.default_buy_volume, 'buy')
else:
log_error(f"非指定策略-{warning_type}")
log_info("--------------------------------")
except Exception as e:
log_error(f"监控出错: {str(e)}")
time.sleep(1)
time.sleep(0.5)
def select_file(self):
"""文件选择功能"""
if self.monitoring:
log_warning("请先停止监控再选择文件")
return
file_path = filedialog.askopenfilename()
if file_path:
self.file_label.config(text=f"已选择文件: {file_path}")
self.monitored_file = file_path
log_info(f"已选择监控文件: {file_path}")
def get_min_interval(self):
"""检测系统最小可操作间隔"""
# 测试几次点击操作的最小间隔
test_times = 3
start_time = time.time()
for _ in range(test_times):
pyautogui.click(100, 100) # 在屏幕角落测试点击
end_time = time.time()
# 计算平均间隔时间,并乘以安全系数(1.5)
return (end_time - start_time) / test_times * 1.5
def place_order(self, code, code_name, price, volume, order_type):
"""
下单函数,优先使用 OrderExecutor 下单
:param code: 股票代码
:param code_name: 股票名称
:param price: 下单价格
:param volume: 下单数量
:param order_type: 下单类型,如 'buy''sell'
"""
try:
pure_code = code[-6:] # 假设输入为 "SH600000" 或 "SZ000001" 等格式
auto_push = 1 # 表示是否自动下单,这里默认开启
log_trigger(f"开始 {order_type} 下单,代码: {code}{code_name},价格: {price},数量: {volume}")
self.order_executor.place_order(pure_code, auto_push)
log_trigger(f"{order_type} 下单成功,代码: {code}{code_name},价格: {price},数量: {volume}")
except Exception as e:
log_error(f"{order_type} 下单失败,代码: {code},错误信息: {str(e)}")
if __name__ == "__main__":
root = tk.Tk()
app = Autotrading(root)
root.mainloop()