This commit is contained in:
HuangHai
2026-01-25 10:07:19 +08:00
parent b514bd2812
commit 9071c0aa72
18 changed files with 189 additions and 98 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 138 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 87 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 85 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 83 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 89 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 91 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 86 KiB

View File

@@ -39,7 +39,17 @@ def mark_input_box():
return
screenshot_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Screenshots")
if not os.path.exists(screenshot_dir):
if os.path.exists(screenshot_dir):
# 清空目录
logger.info(f"正在清空截图目录: {screenshot_dir}")
for f in os.listdir(screenshot_dir):
file_path = os.path.join(screenshot_dir, f)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
logger.warning(f"删除文件失败 {file_path}: {e}")
else:
os.makedirs(screenshot_dir)
timestamp = time.strftime("%Y%m%d_%H%M%S")

89
WeiXin/T4_InputLlmText.py Normal file
View File

@@ -0,0 +1,89 @@
# coding=utf-8
import uiautomator2 as u2
import time
import logging
import sys
import os
import asyncio
# Make the project root (the parent of this file's directory) importable so the
# `WeiXin` and `Util` packages resolve when this file is run directly as a script.
project_root = os.path.split(os.path.dirname(os.path.abspath(__file__)))[0]
if project_root not in sys.path:
    sys.path.append(project_root)
from WeiXin.WxUtil import find_input_box_center, perform_input_action
from Util.LlmUtil import get_llm_response
# Logging setup: records go to Logs/T4_InputLlmText.log (next to this file)
# and to a default StreamHandler.
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Logs")
# exist_ok=True makes directory creation idempotent and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, "T4_InputLlmText.log"), encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger("T4_InputLlmText")
async def generate_and_input():
    """Generate a short LLM reply and type/send it into the chat input box.

    Steps:
      1. Connect to the device with ``u2.connect()``.
      2. Ask ``get_llm_response`` for a reply to a fixed prompt and collect
         every chunk it yields.
      3. Save a screenshot to ``Screenshots/t4_temp_input_check.jpg`` (next to
         this file) and pass it to ``find_input_box_center``.
      4. If a center point is returned, call ``perform_input_action`` with
         ``auto_send=True``.

    Returns:
        None. Every outcome is reported through ``logger``; any exception is
        caught and logged with its traceback instead of being propagated.
    """
    logger.info("开始执行 T4: 生成 LLM 回复并输入...")
    try:
        # 1. Connect to the device.
        d = u2.connect()
        logger.info(f"设备连接成功: {d.info.get('serial')}")

        # 2. Ask the LLM for a reply to a canned "previous message".
        prompt = "对方说:'AI 助手我现在可以开始和你聊天了!',请给出一个简短、友好且自然的回复,不超过 20 字。不要包含任何解释性文字。"
        logger.info(f"正在请求 LLM 生成回复, Prompt: {prompt}")
        chunks = []
        async for chunk in get_llm_response(prompt, stream=False):
            chunks.append(chunk)

        # Remove surrounding whitespace, then wrapping quotes, then whitespace
        # that was inside the quotes. The original chained `.strip('')`, which
        # is a no-op (empty character set), so only ASCII `"` was ever removed.
        # NOTE(review): U+201C/U+201D (typographic double quotes) are assumed
        # to be the characters that were intended -- confirm.
        llm_text = "".join(chunks).strip().strip('"\u201c\u201d').strip()
        logger.info(f"LLM 生成的回复内容: {llm_text}")
        if not llm_text:
            logger.error("LLM 生成内容为空,停止执行。")
            return

        # 3. Take a fresh screenshot and locate the input box in it.
        screenshot_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Screenshots")
        os.makedirs(screenshot_dir, exist_ok=True)
        tmp_shot = os.path.join(screenshot_dir, "t4_temp_input_check.jpg")
        d.screenshot(tmp_shot)
        center_point, _ = find_input_box_center(tmp_shot)
        if not center_point:
            logger.error("❌ T4 执行失败:未能识别到输入框位置。")
            return

        cx, cy = center_point
        logger.info(f"识别到输入框中心: ({cx}, {cy})")

        # 4. Hand off to perform_input_action; auto_send=True asks it to send
        #    the message after the text has been entered.
        logger.info(f"正在点击输入框并输入文字并发送: {llm_text}")
        if perform_input_action(d, center_point, llm_text, auto_send=True):
            logger.info("✅ T4 执行完成:文字已成功输入并点击发送。")
        else:
            logger.error("❌ T4 执行失败:输入动作未成功完成。")
    except Exception as e:
        # Top-level boundary of the script: log with traceback, do not re-raise.
        logger.error(f"❌ T4 执行出错: {e}", exc_info=True)


# Script entry point: run the coroutine to completion on a new event loop.
if __name__ == "__main__":
    asyncio.run(generate_and_input())

View File

@@ -57,6 +57,12 @@ def find_input_box_center(image_path):
# 1. 宽度: 屏幕宽度的 50% - 98% (通常输入框很长)
# 2. 高度: 30px - 底部区域的 90%
if width * 0.5 < w < width * 0.98 and 30 < h < bottom_h * 0.9:
# 3. 增加水平居中检查中心点X坐标必须在屏幕水平中心 10% 范围内
# 这能有效排除左右侧的气泡或按钮
cnt_center_x = x + w // 2
if abs(cnt_center_x - width // 2) > width * 0.1:
continue
# 计算中心点 Y 坐标相对于全图
global_y = crop_y_start + y + h // 2
@@ -77,68 +83,38 @@ def find_input_box_center(image_path):
candidates.sort(key=lambda c: c['global_y'], reverse=True)
# 检查最靠下的候选框是否合理
# 正常情况下输入框应该在屏幕底部 92% - 98% 之间
# 如果最靠下的候选框 Y < 92%,说明可能识别错了 (可能是倒数第二条消息)
# 严格限制:必须在屏幕底部 94% - 99% 之间 (有效排除最后一条消息)
for cand in candidates:
if cand['global_y'] > height * 0.92:
if cand['global_y'] > height * 0.94:
best_candidate = cand
break
# 如果没有找到足够靠下的,但有候选框,尝试放宽条件或者使用最靠下的那个
if not best_candidate and candidates:
logger.warning("找到 >92% 高度的输入框,尝试使用最靠下的候选框")
best_candidate = candidates[0]
# 如果没有找到符合 >94% 条件的,视为未找到,直接使用几何兜底
if not best_candidate:
logger.warning(f"找到 {len(candidates)} 个候选框,但没有一个满足 Y > 94% (最高候选 Y={candidates[0]['global_y']/height:.2%})")
if best_candidate:
x, y, w, h = best_candidate['x'], best_candidate['y'], best_candidate['w'], best_candidate['h']
center_x = x + w // 2
center_y = crop_y_start + y + h // 2
logger.info(f"找到输入框(CV-Canny): ({center_x}, {center_y}), 尺寸: {w}x{h}")
logger.info(f"找到输入框(CV-Canny): ({center_x}, {center_y}), 尺寸: {w}x{h}, 位置: {center_y/height:.2%}")
return (center_x, center_y), (x, crop_y_start + y, w, h)
# 策略2: 自适应阈值 (原有逻辑作为备份)
thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV, 11, 2)
contours_thresh, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
candidates_thresh = []
for cnt in contours_thresh:
x, y, w, h = cv2.boundingRect(cnt)
if width * 0.4 < w < width * 0.95 and 20 < h < bottom_h * 0.8:
global_y = crop_y_start + y + h // 2
if global_y > height * 0.99:
continue
candidates_thresh.append({
'x': x, 'y': y, 'w': w, 'h': h,
'global_y': global_y
})
if candidates_thresh:
candidates_thresh.sort(key=lambda c: c['global_y'], reverse=True)
best = candidates_thresh[0]
# 同样应用 > 92% 规则
if best['global_y'] > height * 0.92:
x, y, w, h = best['x'], best['y'], best['w'], best['h']
center_x = x + w // 2
center_y = crop_y_start + y + h // 2
logger.info(f"找到输入框(CV-Adaptive): ({center_x}, {center_y}), 尺寸: {w}x{h}")
return (center_x, center_y), (x, crop_y_start + y, w, h)
# 兜底策略:使用更靠下的默认坐标 (96% - 微信输入框通常很靠下)
# 之前的 94% 可能还是偏高
logger.warning("未找到明显输入框轮廓,使用更靠下的默认坐标 (96.5%)")
default_y = int(height * 0.965)
# 策略2: 如果 CV 失败,使用几何兜底 (屏幕底部中间区域)
logger.warning("CV 识别输入框失败,使用几何兜底策略 (Bottom Middle)")
# 假设输入框位于屏幕底部 97% 处,宽度为屏幕的 90%
fallback_w = int(width * 0.9)
fallback_h = int(bottom_h * 0.4) # 估算高度
fallback_x = (width - fallback_w) // 2
# 估算中心点 Y: 屏幕高度的 97%
center_x = width // 2
center_y = int(height * 0.97)
# 构造假想框
fake_w = int(width * 0.7)
fake_h = int(height * 0.08) # 稍微加高一点,视觉上更像
fake_x = (width - fake_w) // 2
fake_y = default_y - fake_h // 2
# 构造一个虚拟的 rect_box
fallback_y_rel = center_y - crop_y_start - fallback_h // 2
return (center_x, default_y), (fake_x, fake_y, fake_w, fake_h)
return (center_x, center_y), (fallback_x, crop_y_start + fallback_y_rel, fallback_w, fallback_h)
except Exception as e:
logger.error(f"查找输入框失败: {e}")
return None, None
@@ -250,14 +226,20 @@ def take_debug_screenshot(d, step_name):
logger.error(f"截图失败 ({step_name}): {e}")
return None
def perform_input_action(coords, text):
def perform_input_action(d, coords, text, auto_send=True):
"""
点击坐标并输入文本
@param d: uiautomator2 device object
@param coords: (x, y) 坐标
@param text: 输入文本
@param auto_send: 是否自动点击发送,默认为 True
"""
# 优先尝试使用 uiautomator2 的原生控件查找 (更稳健)
native_success = False
try:
d = u2.connect()
# 如果没有传入 d尝试连接
if d is None:
d = u2.connect()
# 查找 EditText 控件
input_elem = d(className="android.widget.EditText")
@@ -300,35 +282,39 @@ def perform_input_action(coords, text):
# 尝试发送回车键
time.sleep(0.5)
d.press("enter")
if auto_send:
d.press("enter")
# 尝试点击发送按钮
try:
if d(text="发送").exists:
d(text="发送").click()
logger.info("已点击 '发送' 按钮 (Native Text)")
take_debug_screenshot(d, "native_04_after_send_click_text")
else:
logger.info("未找到 '发送' 文本控件,尝试图像识别...")
send_btn_coords = find_send_button(d)
if send_btn_coords:
sx, sy = send_btn_coords
d.click(sx, sy)
logger.info(f"已点击 '发送' 按钮 (Image Rec): {sx}, {sy}")
take_debug_screenshot(d, "native_04_after_send_click_image")
if auto_send:
try:
if d(text="发送").exists:
d(text="发送").click()
logger.info("已点击 '发送' 按钮 (Native Text)")
take_debug_screenshot(d, "native_04_after_send_click_text")
else:
width, height = d.window_size()
fallback_x = int(width * 0.9)
fallback_y = int(height * 0.965)
logger.info(f"未识别到发送按钮,尝试盲点右下角: {fallback_x}, {fallback_y}")
d.click(fallback_x, fallback_y)
take_debug_screenshot(d, "native_04_after_send_click_fallback")
except Exception as e:
logger.error(f"点击发送按钮失败: {e}")
logger.info("未找到 '发送' 文本控件,尝试图像识别...")
send_btn_coords = find_send_button(d)
if send_btn_coords:
sx, sy = send_btn_coords
d.click(sx, sy)
logger.info(f"已点击 '发送' 按钮 (Image Rec): {sx}, {sy}")
take_debug_screenshot(d, "native_04_after_send_click_image")
else:
width, height = d.window_size()
fallback_x = int(width * 0.9)
fallback_y = int(height * 0.965)
logger.info(f"未识别到发送按钮,尝试盲点右下角: {fallback_x}, {fallback_y}")
d.click(fallback_x, fallback_y)
take_debug_screenshot(d, "native_04_after_send_click_fallback")
except Exception as e:
logger.error(f"点击发送按钮失败: {e}")
else:
logger.info("auto_send=False, 跳过发送动作")
logger.info("输入完成 (Native)")
native_success = True
return
return True
else:
logger.warning("未找到输入框元素 (Native),转入坐标点击模式...")
@@ -337,7 +323,7 @@ def perform_input_action(coords, text):
logger.warning(f"原生控件操作失败,降级为坐标点击: {e}")
if native_success:
return
return True
# 降级方案:使用坐标点击
if not coords:
@@ -396,36 +382,42 @@ def perform_input_action(coords, text):
take_debug_screenshot(d, "coord_03_after_input_text")
time.sleep(0.5)
d.press("enter")
# 尝试查找发送按钮并点击
try:
if d(text="发送").exists:
d(text="发送").click()
logger.info("已点击 '发送' 按钮 (Native Text)")
take_debug_screenshot(d, "coord_04_after_click_send_native")
else:
logger.info("未找到 '发送' 文本控件,尝试图像识别...")
send_btn_coords = find_send_button(d)
if send_btn_coords:
sx, sy = send_btn_coords
d.click(sx, sy)
logger.info(f"已点击 '发送' 按钮 (Image Rec): {sx}, {sy}")
take_debug_screenshot(d, "coord_04_after_click_send_image")
if auto_send:
d.press("enter")
# 尝试查找发送按钮并点击
try:
if d(text="发送").exists:
d(text="发送").click()
logger.info("已点击 '发送' 按钮 (Native Text)")
take_debug_screenshot(d, "coord_04_after_click_send_native")
else:
width, height = d.window_size()
fallback_x = int(width * 0.9)
fallback_y = int(height * 0.965)
logger.info(f"未识别到发送按钮,尝试盲点右下角: {fallback_x}, {fallback_y}")
d.click(fallback_x, fallback_y)
take_debug_screenshot(d, "coord_04_after_click_send_fallback")
except Exception as e:
logger.error(f"点击发送按钮失败: {e}")
logger.info("未找到 '发送' 文本控件,尝试图像识别...")
send_btn_coords = find_send_button(d)
if send_btn_coords:
sx, sy = send_btn_coords
d.click(sx, sy)
logger.info(f"已点击 '发送' 按钮 (Image Rec): {sx}, {sy}")
take_debug_screenshot(d, "coord_04_after_click_send_image")
else:
width, height = d.window_size()
fallback_x = int(width * 0.9)
fallback_y = int(height * 0.965)
logger.info(f"未识别到发送按钮,尝试盲点右下角: {fallback_x}, {fallback_y}")
d.click(fallback_x, fallback_y)
take_debug_screenshot(d, "coord_04_after_click_send_fallback")
except Exception as e:
logger.error(f"点击发送按钮失败: {e}")
else:
logger.info("auto_send=False, 跳过坐标模式下的发送动作")
logger.info("输入完成 (Coordinate)")
return True
except Exception as e:
logger.error(f"自动化操作失败: {e}")
return False
def analyze_chat_image(image_path, output_path, target_name="对方"):
"""

Binary file not shown.