Files
aiData/WeiXin/WxUtil.py
HuangHai a662c33ecf 'commit'
2026-01-26 10:50:11 +08:00

576 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import uiautomator2 as u2
import time
import logging
import sys
import os
import cv2
import numpy as np
import re
# Add the project root (the parent of this file's directory) to sys.path so
# that the ``Util`` package imported below can be resolved.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)
import json
from Util.EasyOcrKit import EasyOcrKit
# Initialise the shared EasyOcrKit instance (created once at import time).
ocr_kit = EasyOcrKit()
# Configure logging for the whole process and create this module's logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("WxUtil")
# Directory configuration.
# BASE_DATA_DIR is the parent of the directory that contains this file.
BASE_DATA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Logs/ and Output/ live directly under BASE_DATA_DIR.
LOG_DIR = os.path.join(BASE_DATA_DIR, "Logs")
OUTPUT_DIR = os.path.join(BASE_DATA_DIR, "Output")
# Template images are looked up in a Templates/ folder next to this file.
TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templates")
def clear_directory(dir_path):
    """
    Empty ``dir_path``, creating the directory when it does not exist yet.

    Regular files and symlinks are unlinked and sub-directories are removed
    recursively. A failure on one entry is logged as a warning and the
    remaining entries are still processed.
    """
    import shutil

    if os.path.exists(dir_path):
        for entry_name in os.listdir(dir_path):
            target = os.path.join(dir_path, entry_name)
            try:
                unlinkable = os.path.isfile(target) or os.path.islink(target)
                if unlinkable:
                    os.unlink(target)
                elif os.path.isdir(target):
                    shutil.rmtree(target)
            except Exception as e:
                logger.warning(f"Failed to delete {target}. Reason: {e}")
    else:
        # Nothing to clean: just make sure the directory is there afterwards.
        os.makedirs(dir_path)
def setup_script_environment():
    """
    Reset the run environment by emptying the Logs and Output directories.

    Both directories are handed to clear_directory(), Logs first.
    """
    logger.info("清理运行环境: Logs 和 Output 目录...")
    for stale_dir in (LOG_DIR, OUTPUT_DIR):
        clear_directory(stale_dir)
def connect_device():
    """
    Connect to a device through uiautomator2 and log its details.

    Returns the connected device object, or None when connecting or reading
    the device information raises.
    """
    try:
        device = u2.connect()
        # Fall back to a placeholder when the object exposes no serial.
        serial = getattr(device, 'serial', "未知")
        logger.info(f"设备连接成功: {serial}")
        # Fetch and log the detailed device information.
        info = device.device_info
        brand, model, sdk = (info.get(field) for field in ('brand', 'model', 'sdk'))
        logger.info(f"详细设备信息: 品牌={brand}, 型号={model}, SDK={sdk}")
    except Exception as e:
        logger.error(f"设备连接失败: {e}")
        return None
    return device
async def analyze_chat_image(image_path, output_path, device=None, target_name="对方"):
    """
    Identify the last message in a WeChat chat screenshot using CV + OCR only
    (a VLM is no longer used).

    Voice icons and red dots are located with template matching and all text
    is read with OCR; every hit inside the chat area becomes one message
    record. Voice messages flagged as unread and not yet converted are then
    driven through the on-device "convert to text" menu so that the resulting
    transcript can be read with a second OCR pass.

    Args:
        image_path: Path of the chat screenshot to analyse.
        output_path: Where the annotated debug image is written; falsy to skip.
        device: Device object to drive; when falsy, connect_device() is called.
        target_name: NOTE(review): not referenced anywhere in this body - the
            sender label is hard-coded instead. Confirm whether it was meant
            to replace that label.

    Returns:
        A tuple ``(dialogue_log, input_field_coordinates)``. ``dialogue_log``
        holds a single "sender: content" string for the bottom-most message and
        ``input_field_coordinates`` is ``(w // 2, int(h * 0.9))`` computed from
        the screenshot size. ``([], None)`` is returned when no device is
        available, the image cannot be read, no message is found, or any
        exception is raised (the exception is logged, not propagated).

    NOTE(review): declared ``async`` but the body contains no ``await``; the
    time.sleep() calls and the device calls run synchronously.
    """
    try:
        # 1. Initialisation: use the supplied device or connect to one.
        d = device if device else connect_device()
        if not d:
            return [], None
        # 2. Read the image.
        img = cv2.imread(image_path)
        if img is None:
            logger.error(f"无法读取图片: {image_path}")
            return [], None
        h, w = img.shape[:2]
        # 3. Template matching to find voice icons and red dots.
        audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg")
        red_point_template = os.path.join(TEMPLATE_DIR, "red_point.jpg")
        audio_matches = find_all_template_matches(image_path, audio_template, threshold=0.8)
        red_points = find_all_template_matches(image_path, red_point_template, threshold=0.8)
        # 4. OCR all text in the screenshot.
        logger.info("正在执行 OCR 识别...")
        ocr_results = ocr_kit.read_text(image_path)
        # 5. Merge everything into one list of message records.
        messages = []
        debug_img = img.copy()  # canvas for the debug annotations
        # A. Add voice messages.
        for ax, ay in audio_matches:
            # Skip the non-chat areas at the top and bottom
            # (empirical values: 150 px at the top, 250 px at the bottom).
            if ay < 150 or ay > h - 250:
                continue
            # Icons in the left half are labelled as the other party; icons in
            # the right half get an empty sender.
            sender = "对方" if ax < w / 2 else ""
            is_unread = False
            for rx, ry in red_points:
                # The red dot is normally to the right of the voice icon at a similar Y.
                if abs(ry - ay) < 50 and rx > ax:
                    is_unread = True
                    break
            # Box colour by read state: red (BGR) for unread, green for read.
            color = (0, 0, 255) if is_unread else (0, 255, 0)
            cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
            # --- Added: decide whether the voice message is already converted to text ---
            is_converted = False
            for bbox, text, conf in ocr_results:
                # Centre of the OCR box, taken from its first and third corner points.
                c_x = int((bbox[0][0] + bbox[2][0]) / 2)
                c_y = int((bbox[0][1] + bbox[2][1]) / 2)
                # Converted text usually sits 30-300 px below the voice icon at a
                # similar horizontal position.
                if 30 < c_y - ay < 300 and abs(c_x - ax) < 200:
                    is_converted = True
                    break
            label = "YES" if is_converted else "NO"
            # Draw YES or NO to the right of the box.
            cv2.putText(debug_img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
            # --- End ---
            messages.append({
                "type": "voice",
                "sender": sender,
                "center": (ax, ay),
                "y": ay,
                "is_unread": is_unread,
                "is_converted": is_converted
            })
        # B. Add text messages.
        # Simple strategy: drop text that is clearly system time, the input box or the top title.
        for bbox, text, conf in ocr_results:
            c_x = int((bbox[0][0] + bbox[2][0]) / 2)
            c_y = int((bbox[0][1] + bbox[2][1]) / 2)
            # Region filter (same top/bottom margins as for voice icons).
            if c_y < 150 or c_y > h - 250:
                continue
            # Intended to drop single characters (possibly text next to an avatar,
            # or noise) and certain system words.
            # NOTE(review): as written, ``len(text) < 1`` only matches an empty
            # string, so single characters are kept - confirm the intent.
            if len(text) < 1 and "昨天" not in text and "今天" not in text:
                continue
            sender = "对方" if c_x < w / 2 else ""
            messages.append({
                "type": "text",
                "sender": sender,
                "content": text,
                "center": (c_x, c_y),
                "y": c_y
            })
        # 6. Sort and pick the last message.
        if not messages:
            logger.warning("未发现任何消息")
            if output_path:
                cv2.imwrite(output_path, debug_img)
            return [], None
        # Sort top-to-bottom by Y coordinate; the last entry is the bottom-most message.
        messages.sort(key=lambda x: x['y'])
        last_msg = messages[-1]
        if output_path:
            cv2.imwrite(output_path, debug_img)
            logger.info(f"调试图已保存: {output_path}")
        dialogue_log = []
        input_field_coordinates = (w // 2, int(h * 0.9))  # default input-box position
        # 7. Automatically process every "red box + NO" voice message
        #    (unread and not yet converted).
        unconverted_voices = [m for m in messages if m['type'] == 'voice' and m.get('is_unread') and not m.get('is_converted')]
        if unconverted_voices:
            logger.info(f"发现 {len(unconverted_voices)} 条未转换的未读语音,开始处理...")
            for v_msg in unconverted_voices:
                vx, vy = int(v_msg['center'][0]), int(v_msg['center'][1])
                logger.info(f"--- 正在处理语音消息 ({vx}, {vy}) ---")
                # A. Long-press the voice message.
                logger.info(f"正在长按语音消息 ({vx}, {vy})...")
                d.long_click(vx, vy, 1.5)
                time.sleep(1.5)
                # B. Take a screenshot and look for the "convert to text" button.
                menu_shot = os.path.join(OUTPUT_DIR, f"voice_menu_{vy}.jpg")
                d.screenshot(menu_shot)
                zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg")
                # Threshold lowered to 0.7 to raise the match success rate.
                btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7)
                if btn_pos:
                    btn_x, btn_y = int(btn_pos[0]), int(btn_pos[1])
                    logger.info(f"✅ 找到'转文字'按钮: ({btn_x}, {btn_y}),点击中...")
                    d.click(btn_x, btn_y)
                    # Wait for the conversion to finish (depends on the voice length;
                    # 3-5 seconds is usually enough).
                    logger.info("等待语音转文字完成...")
                    time.sleep(5.0)
                    # C. Screenshot again and OCR it to get the converted text.
                    after_convert_shot = os.path.join(OUTPUT_DIR, f"after_auto_{vy}.jpg")
                    d.screenshot(after_convert_shot)
                    convert_ocr = ocr_kit.read_text(after_convert_shot)
                    # Extract the converted text: take the first text block found
                    # below the voice icon.
                    converted_text = ""
                    for c_bbox, c_text, c_conf in convert_ocr:
                        cc_x = (c_bbox[0][0] + c_bbox[2][0]) / 2
                        cc_y = (c_bbox[0][1] + c_bbox[2][1]) / 2
                        # Converted text usually sits 30-300 px below the voice icon at a
                        # similar horizontal position.
                        if 30 < cc_y - vy < 300 and abs(cc_x - vx) < 250:
                            converted_text = c_text
                            break
                    if converted_text:
                        logger.info(f"✨ OCR 识别成功!")
                        print(f"\n[语音转文字结果]: {converted_text}\n")
                        # Sync the transcript back onto the message record.
                        v_msg['content'] = converted_text
                        v_msg['is_converted'] = True
                        # If this message is also the last message, update the content
                        # that dialogue_log needs.
                        if v_msg == last_msg:
                            last_msg['content'] = converted_text
                    else:
                        logger.warning("❌ OCR 未能提取到转换后的文字内容")
                    # D. Long-press again and tap "cancel convert to text" to restore the UI.
                    logger.info("正在恢复界面状态 (点击'取消转文字')...")
                    d.long_click(vx, vy, 1.5)
                    time.sleep(1.0)
                    cancel_shot = os.path.join(OUTPUT_DIR, f"cancel_menu_{vy}.jpg")
                    d.screenshot(cancel_shot)
                    cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg")
                    cancel_btn = find_template_match(cancel_shot, cancel_template, threshold=0.7)
                    if cancel_btn:
                        c_btn_x, c_btn_y = int(cancel_btn[0]), int(cancel_btn[1])
                        d.click(c_btn_x, c_btn_y)
                        logger.info(f"✅ 已点击'取消转文字' ({c_btn_x}, {c_btn_y}),界面已恢复")
                    else:
                        # Fallback: tap the blank area right of the voice icon to try to
                        # close the menu.
                        logger.warning("⚠️ 未找到'取消转文字'按钮,尝试点击空白处关闭菜单")
                        d.click(vx + 300, vy)
                else:
                    logger.warning("❌ 未能找到'转文字'按钮,可能长按失败或模板不匹配")
                    # Try tapping a blank area to leave the menu.
                    d.click(vx + 300, vy)
        # 8. Build the dialogue log (feedback for the last message only).
        # NOTE(review): dialogue_log was already initialised to [] above; it is
        # reset here before being filled.
        dialogue_log = []
        if last_msg['type'] == 'voice':
            # Prefer the transcript obtained above; otherwise use a voice placeholder.
            content = last_msg.get('content') or "[语音]"
            dialogue_log.append(f"{last_msg['sender']}: {content}")
        else:
            dialogue_log.append(f"{last_msg['sender']}: {last_msg['content']}")
        return dialogue_log, input_field_coordinates
    except Exception as e:
        logger.error(f"analyze_chat_image 失败: {e}", exc_info=True)
        return [], None
def clean_screenshots_dir():
    """
    Remove screenshot images from the output directory.

    Creates OUTPUT_DIR when it is missing; otherwise deletes every entry whose
    name ends in .jpg, .png or .jpeg (case-insensitive). A failed deletion is
    logged as a warning and the remaining files are still processed.
    """
    image_suffixes = ('.jpg', '.png', '.jpeg')
    if os.path.exists(OUTPUT_DIR):
        screenshots = [
            name for name in os.listdir(OUTPUT_DIR)
            if name.lower().endswith(image_suffixes)
        ]
        for name in screenshots:
            try:
                os.remove(os.path.join(OUTPUT_DIR, name))
            except Exception as e:
                logger.warning(f"Failed to delete {name}: {e}")
    else:
        os.makedirs(OUTPUT_DIR)
def is_in_chat_interface(d):
    """
    Check whether the device is currently showing a WeChat chat screen.

    The selectors below are probed in order and the first one that exists
    makes the function return True. If a probe raises, a warning is logged
    and False is returned.
    """
    chat_markers = (
        {"description": "切换到语音"},             # bottom voice/keyboard toggle (voice)
        {"description": "切换到键盘"},             # bottom voice/keyboard toggle (keyboard)
        {"className": "android.widget.EditText"},  # bottom input box
        {"text": "按住说话"},                      # bottom "hold to talk" button
        {"description": "聊天信息"},               # top-right "more" button
    )
    try:
        for selector in chat_markers:
            if d(**selector).exists:
                return True
    except Exception as e:
        logger.warning(f"is_in_chat_interface check failed: {e}")
    return False
def find_input_box_center(image_path):
    """
    Estimate the centre of the chat input box (fallback strategy).

    Purely geometric: returns the point at 50% of the screenshot width and
    88% of its height. When the file is missing, cannot be decoded, or any
    error occurs, the fixed point (540, 2100) is returned instead.

    Returns a ``(point, None)`` tuple; the second element is always None.
    """
    fallback = ((540, 2100), None)
    try:
        img = cv2.imread(image_path) if os.path.exists(image_path) else None
        if img is None:
            return fallback
        height, width = img.shape[:2]
        # Strategy: horizontal middle of the screen, 88% of the way down.
        center_x, center_y = int(width * 0.5), int(height * 0.88)
        logger.info(f"find_input_box_center fallback: ({center_x}, {center_y})")
        return (center_x, center_y), None
    except Exception as e:
        logger.error(f"find_input_box_center error: {e}")
        return fallback
def find_template_match(screen_path, template_path, threshold=0.8):
    """
    Locate the best occurrence of a template image inside a screenshot.

    Runs OpenCV template matching (TM_CCOEFF_NORMED) on the colour images and
    returns the centre ``(x, y)`` of the best match when its score reaches
    ``threshold``. Returns None when the template file is missing, either
    image cannot be read, the score is too low, or an error occurs.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return None
        screen = cv2.imread(screen_path)
        patch = cv2.imread(template_path)
        if screen is None or patch is None:
            return None
        patch_h, patch_w = patch.shape[:2]
        scores = cv2.matchTemplate(screen, patch, cv2.TM_CCOEFF_NORMED)
        # Only the maximum and its location are needed.
        _, best_score, _, best_top_left = cv2.minMaxLoc(scores)
        if best_score >= threshold:
            # minMaxLoc reports the top-left corner; shift by half the template size.
            center_x = best_top_left[0] + patch_w // 2
            center_y = best_top_left[1] + patch_h // 2
            logger.info(f"Template matched! Score: {best_score:.2f}, Center: ({center_x}, {center_y})")
            return (center_x, center_y)
        logger.info(f"Template not matched. Max score: {best_score:.2f}")
        return None
    except Exception as e:
        logger.error(f"Template matching failed: {e}")
        return None
def find_all_template_matches(screen_path, template_path, threshold=0.8):
    """
    Locate every occurrence of a template image inside a screenshot.

    Runs OpenCV template matching (TM_CCOEFF_NORMED) and collects the centre
    of every position whose score reaches ``threshold``. Because one target
    usually yields a cluster of adjacent hits, a hit is dropped when it lies
    within 10 px (on both axes) of a hit that was already kept.

    Returns a list of ``(x, y)`` centres; an empty list when the template
    file is missing, either image cannot be read, or an error occurs.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return []
        screen = cv2.imread(screen_path)
        patch = cv2.imread(template_path)
        if screen is None or patch is None:
            return []
        patch_h, patch_w = patch.shape[:2]
        scores = cv2.matchTemplate(screen, patch, cv2.TM_CCOEFF_NORMED)
        # All positions at or above the threshold (rows are y, columns are x).
        hit_rows, hit_cols = np.where(scores >= threshold)
        candidates = [
            (col + patch_w // 2, row + patch_h // 2)
            for row, col in zip(hit_rows, hit_cols)
        ]
        # Simplified non-maximum suppression: keep the first hit of each
        # cluster and discard later hits that fall close to a kept one.
        kept = []
        for cand in candidates:
            near_existing = any(
                abs(cand[0] - prev[0]) < 10 and abs(cand[1] - prev[1]) < 10
                for prev in kept
            )
            if not near_existing:
                kept.append(cand)
        if kept:
            logger.info(f"Found {len(kept)} matches for {os.path.basename(template_path)}")
        return kept
    except Exception as e:
        logger.error(f"find_all_template_matches failed: {e}")
        return []
def perform_input_action(d, center_point, text, auto_send=True):
    """
    Type ``text`` into the chat input box and optionally send it.

    Steps, in order:
      1. Make sure the chat is in text-input mode (switch away from voice mode
         or activate the input area, based on template matching).
      2. Enter the text, preferring the native EditText and falling back to a
         coordinate tap plus key input / clipboard paste.
      3. When ``auto_send`` is true, press the send button.

    Args:
        d: Connected uiautomator2 device object.
        center_point: ``(x, y)`` tapped to focus the input box when the native
            EditText cannot be used.
        text: Text to enter.
        auto_send: Whether to send the message after typing it.

    Returns:
        True when all steps ran without raising, False otherwise (the error is
        logged, not propagated).
    """
    try:
        _ensure_text_input_mode(d)
        _enter_text(d, center_point, text)
        # Give the UI a moment to react to the new text before looking for "send".
        time.sleep(1.0)
        if auto_send:
            _tap_send_button(d)
        return True
    except Exception as e:
        logger.error(f"perform_input_action error: {e}")
        return False


def _remove_temp_file(path):
    """Delete a temporary file; a missing file is fine, other failures are logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"Failed to remove temp file {path}: {e}")


def _ensure_text_input_mode(d):
    """Switch the chat to text-input mode, using template matching on a screenshot."""
    logger.info("正在检查输入模式...")
    # The screenshot is saved into OUTPUT_DIR, so it has to exist first.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg")
    wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")
    input_text_template = os.path.join(TEMPLATE_DIR, "input_text.jpg")
    try:
        d.screenshot(tmp_check_shot)
        # 1. A "switch to text" icon means the chat is currently in voice mode.
        #    Assumes wen_zi_input.jpg is the keyboard icon.
        wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8)
        if wen_zi_pos:
            logger.info(f"检测到语音模式 (找到切换文字图标: {wen_zi_pos}),点击切换...")
            d.click(wen_zi_pos[0], wen_zi_pos[1])
            time.sleep(1.0)  # wait for the UI to switch
            return
        # 2. No switch icon: assume text mode and try to tap the input-area marker.
        logger.info("未检测到语音模式切换图标,尝试寻找文字输入区域...")
        input_text_pos = find_template_match(tmp_check_shot, input_text_template, threshold=0.8)
        if input_text_pos:
            logger.info(f"找到文字输入区域标识 (input_text.jpg): {input_text_pos},点击激活...")
            d.click(input_text_pos[0], input_text_pos[1])
            time.sleep(0.5)
        else:
            logger.info("未找到特定的输入区域标识,将使用默认坐标或控件查找。")
    finally:
        # Clean up the temporary screenshot even when a step above raised.
        _remove_temp_file(tmp_check_shot)


def _enter_text(d, center_point, text):
    """Enter ``text`` via the native EditText, else via coordinate tap and key input."""
    # 1. Try the native input box first.
    edit_text = d(className="android.widget.EditText")
    if edit_text.exists:
        logger.info("Found native EditText, using set_text")
        try:
            edit_text.click()
            time.sleep(0.5)
            edit_text.set_text(text)
            return
        except Exception as e:
            logger.warning(f"Native input failed: {e}")
    # 2. Native input unavailable or failed: tap the coordinates and type/paste.
    cx, cy = center_point
    logger.info(f"Using coordinate input: {center_point}")
    d.click(cx, cy)
    time.sleep(1.0)
    try:
        d.send_keys(text)
    except Exception:
        logger.warning("send_keys failed, trying set_clipboard")
        d.set_clipboard(text)
        d.click(cx, cy)
        time.sleep(0.5)
        # "paste" is not one of the key names uiautomator2 documents for
        # press(); send the Android key code KEYCODE_PASTE instead.
        keycode_paste = 279
        d.press(keycode_paste)


def _tap_send_button(d):
    """Press "send": template match first, then the native control, then Enter."""
    logger.info("尝试使用模板匹配寻找'发送'按钮...")
    tmp_screen = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_send_check.jpg")
    template_path = os.path.join(TEMPLATE_DIR, "send.jpg")
    try:
        d.screenshot(tmp_screen)
        # Slightly lower threshold to improve recall.
        send_btn_pos = find_template_match(tmp_screen, template_path, threshold=0.7)
        if send_btn_pos:
            logger.info(f"通过模板匹配找到发送按钮: {send_btn_pos}, 点击...")
            d.click(send_btn_pos[0], send_btn_pos[1])
            return
        logger.warning("模板匹配未找到发送按钮,尝试原生控件查找...")
        if d(text="发送").exists:
            d(text="发送").click()
            logger.info("Clicked '发送'")
        else:
            d.press("enter")
            logger.info("Pressed Enter")
    finally:
        # Clean up the temporary screenshot even when a step above raised.
        _remove_temp_file(tmp_screen)
def match_template_center(image_path, template_path, threshold=0.8):
    """
    Find the centre of a template inside an image using grayscale matching.

    Both images are converted to grayscale and compared with OpenCV template
    matching (TM_CCOEFF_NORMED). Returns the centre ``(x, y)`` of the best
    match when its score reaches ``threshold``; returns None when a file is
    missing, an image cannot be read, the score is too low, or an error occurs.
    """
    try:
        inputs_present = os.path.exists(image_path) and os.path.exists(template_path)
        if not inputs_present:
            logger.error(f"Image or template not found: {image_path}, {template_path}")
            return None
        scene = cv2.imread(image_path)
        patch = cv2.imread(template_path)
        if scene is None or patch is None:
            logger.error("Failed to read image or template")
            return None
        # Match on grayscale versions of both images.
        scene_gray = cv2.cvtColor(scene, cv2.COLOR_BGR2GRAY)
        patch_gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
        scores = cv2.matchTemplate(scene_gray, patch_gray, cv2.TM_CCOEFF_NORMED)
        _, best_score, _, best_top_left = cv2.minMaxLoc(scores)
        if best_score >= threshold:
            patch_h, patch_w = patch_gray.shape
            # Shift the top-left corner of the match by half the template size.
            center_x = int(best_top_left[0] + patch_w / 2)
            center_y = int(best_top_left[1] + patch_h / 2)
            logger.info(f"Template matched with confidence {best_score:.2f} at ({center_x}, {center_y})")
            return (center_x, center_y)
        logger.warning(f"Template match failed. Max confidence: {best_score:.2f} < Threshold: {threshold}")
        return None
    except Exception as e:
        logger.error(f"match_template_center error: {e}")
        return None