'commit'
|
Before Width: | Height: | Size: 38 KiB |
|
Before Width: | Height: | Size: 38 KiB |
|
Before Width: | Height: | Size: 57 KiB |
BIN
WeiXin/Screenshots/t4_temp_history_check.jpg
Normal file
|
After Width: | Height: | Size: 51 KiB |
|
Before Width: | Height: | Size: 37 KiB |
|
Before Width: | Height: | Size: 43 KiB After Width: | Height: | Size: 56 KiB |
@@ -53,7 +53,8 @@ async def get_history(target_name="对方"):
|
||||
# 调用 WxUtil 中的分析函数
|
||||
dialogue_log, input_box = await analyze_chat_image(save_path, analyzed_path, device=d, target_name=target_name)
|
||||
|
||||
if dialogue_log == "VOICE_CONVERTING":
|
||||
# 检查是否正在转换
|
||||
if isinstance(dialogue_log, list) and any("[正在转换语音...]" in str(msg) for msg in dialogue_log):
|
||||
logger.info("检测到语音正在转文字,T2 任务暂停。")
|
||||
return
|
||||
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
# coding=utf-8
|
||||
import time
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
|
||||
from WeiXin import WxUtil
|
||||
from WeiXin.WxUtil import find_input_box_center
|
||||
|
||||
# 配置日志
|
||||
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Logs")
|
||||
if not os.path.exists(log_dir):
|
||||
os.makedirs(log_dir)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler(os.path.join(log_dir, "T3_MarkInputBox.log"), encoding='utf-8'),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger("T3_MarkInputBox")
|
||||
|
||||
def mark_input_box():
|
||||
logger.info("开始执行 T3: 标识输入框位置...")
|
||||
|
||||
d = WxUtil.connect_device()
|
||||
if not d:
|
||||
return
|
||||
|
||||
screenshot_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Screenshots")
|
||||
if os.path.exists(screenshot_dir):
|
||||
# 清空目录
|
||||
logger.info(f"正在清空截图目录: {screenshot_dir}")
|
||||
for f in os.listdir(screenshot_dir):
|
||||
file_path = os.path.join(screenshot_dir, f)
|
||||
try:
|
||||
if os.path.isfile(file_path):
|
||||
os.unlink(file_path)
|
||||
except Exception as e:
|
||||
logger.warning(f"删除文件失败 {file_path}: {e}")
|
||||
else:
|
||||
os.makedirs(screenshot_dir)
|
||||
|
||||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"T3_input_{timestamp}.jpg"
|
||||
save_path = os.path.join(screenshot_dir, filename)
|
||||
|
||||
try:
|
||||
d.screenshot(save_path)
|
||||
logger.info(f"截图已保存: {save_path}")
|
||||
|
||||
# 调用 WxUtil 识别输入框
|
||||
center_point, rect_box = find_input_box_center(save_path)
|
||||
|
||||
if center_point and rect_box:
|
||||
# 读取图片进行绘制
|
||||
img_data = np.fromfile(save_path, dtype=np.uint8)
|
||||
img = cv2.imdecode(img_data, cv2.IMREAD_COLOR)
|
||||
|
||||
x, y, w, h = rect_box
|
||||
cx, cy = center_point
|
||||
|
||||
# 要求:绿色框,模块点击点处标识小红点
|
||||
# 绿色框 (Green): BGR (0, 255, 0)
|
||||
cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 5)
|
||||
|
||||
# 小红点 (Red): BGR (0, 0, 255)
|
||||
# 半径 10, 实心
|
||||
cv2.circle(img, (cx, cy), 10, (0, 0, 255), -1)
|
||||
|
||||
output_filename = f"T3_input_{timestamp}_marked.jpg"
|
||||
output_path = os.path.join(screenshot_dir, output_filename)
|
||||
|
||||
# 保存
|
||||
ext = os.path.splitext(output_path)[1]
|
||||
cv2.imencode(ext, img)[1].tofile(output_path)
|
||||
logger.info(f"✅ T3 执行完成: 标记图片已保存至 {output_path}")
|
||||
logger.info(f"输入框区域: {rect_box}, 点击中心点: {center_point}")
|
||||
|
||||
else:
|
||||
logger.warning("❌ T3 执行结果: 未能识别到输入框。")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ T3 执行失败: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
mark_input_box()
|
||||
@@ -11,7 +11,7 @@ if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
|
||||
from WeiXin import WxUtil
|
||||
from WeiXin.WxUtil import find_input_box_center, perform_input_action, analyze_chat_image, clean_screenshots_dir, is_in_chat_interface
|
||||
from WeiXin.WxUtil import find_input_box_center, perform_input_action, analyze_chat_image, clean_screenshots_dir
|
||||
from Util.LlmUtil import get_llm_response
|
||||
|
||||
# 配置日志
|
||||
@@ -41,33 +41,41 @@ async def generate_and_input():
|
||||
if not d:
|
||||
return
|
||||
|
||||
# 检查界面状态
|
||||
if not is_in_chat_interface(d):
|
||||
logger.error("🚫 当前不在微信聊天界面,任务终止")
|
||||
return
|
||||
|
||||
# 2. 截图并识别对话历史
|
||||
screenshot_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Screenshots")
|
||||
if not os.path.exists(screenshot_dir):
|
||||
os.makedirs(screenshot_dir)
|
||||
|
||||
tmp_shot = os.path.join(screenshot_dir, "t4_temp_history_check.jpg")
|
||||
d.screenshot(tmp_shot)
|
||||
|
||||
analyzed_shot = os.path.join(screenshot_dir, "t4_temp_history_analyzed.jpg")
|
||||
|
||||
d.screenshot(tmp_shot)
|
||||
dialogue_log, input_box = await analyze_chat_image(tmp_shot, analyzed_shot, device=d)
|
||||
|
||||
# 语音转文字处理
|
||||
if dialogue_log == "VOICE_CONVERTING":
|
||||
# 检查是否包含正在转换的标识
|
||||
is_converting = any("[正在转换语音...]" in str(msg) for msg in dialogue_log) if isinstance(dialogue_log, list) else False
|
||||
|
||||
if is_converting:
|
||||
logger.info("检测到语音正在转文字,等待 3 秒后重新截图分析...")
|
||||
await asyncio.sleep(3)
|
||||
d.screenshot(tmp_shot)
|
||||
dialogue_log, input_box = await analyze_chat_image(tmp_shot, analyzed_shot, device=d)
|
||||
|
||||
|
||||
# 无论第二次结果如何,我们都继续执行,不再跳过
|
||||
if any("[正在转换语音...]" in str(msg) for msg in dialogue_log) if isinstance(dialogue_log, list) else False:
|
||||
logger.warning("语音转换时间较长,将尝试根据当前已有内容生成回复。")
|
||||
|
||||
history_text = ""
|
||||
if dialogue_log:
|
||||
history_text = "\n".join(dialogue_log)
|
||||
logger.info(f"提取到对话历史:\n{history_text}")
|
||||
if dialogue_log and isinstance(dialogue_log, list):
|
||||
# 过滤掉系统的转换提示语,避免干扰 LLM
|
||||
filtered_log = [msg for msg in dialogue_log if "[正在转换语音...]" not in str(msg)]
|
||||
history_text = "\n".join(filtered_log)
|
||||
logger.info("提取到对话历史: ")
|
||||
for msg in filtered_log:
|
||||
logger.info(msg)
|
||||
elif dialogue_log:
|
||||
history_text = str(dialogue_log)
|
||||
logger.info(f"提取到对话历史: {history_text}")
|
||||
else:
|
||||
logger.warning("未提取到对话历史")
|
||||
history_text = "(无对话历史)"
|
||||
|
||||
@@ -315,7 +315,9 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
|
||||
if should_trigger_convert:
|
||||
# 转换完成后稍微多等一下,确保 UI 刷新
|
||||
time.sleep(1.0)
|
||||
return "VOICE_CONVERTING", input_field_coordinates
|
||||
# 即使触发了转换,我们也返回当前的对话日志,但在日志末尾注明正在转换
|
||||
dialogue_log.append("系统: [正在转换语音...]")
|
||||
return dialogue_log, input_field_coordinates
|
||||
|
||||
return dialogue_log, input_field_coordinates
|
||||
|
||||
@@ -568,9 +570,12 @@ def perform_input_action(d, center_point, text, auto_send=True):
|
||||
os.remove(tmp_screen)
|
||||
except:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"perform_input_action error: {e}")
|
||||
return False
|
||||
|
||||
def match_template_center(image_path, template_path, threshold=0.8):
|
||||
"""
|
||||
|
||||