Files
aiData/WeiXin/WxUtil.py
HuangHai 7514d3119f 'commit'
2026-01-26 13:43:12 +08:00

670 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import uiautomator2 as u2
import time
import logging
import sys
import os
import cv2
import numpy as np
import re
# Put the project root (the parent of this file's directory) on sys.path so
# that the project-local ``Util`` package can be imported below.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)
import json
from Util.EasyOcrKit import EasyOcrKit
# Create the module-wide EasyOcrKit instance; the functions below call
# ocr_kit.read_text() on screenshots.
ocr_kit = EasyOcrKit()
# Configure logging: INFO level, timestamped records, logger named "WxUtil".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("WxUtil")
# Directory configuration.
# BASE_DATA_DIR is the parent of the directory that contains this file.
BASE_DATA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Logs/ and Output/ live directly under BASE_DATA_DIR.
LOG_DIR = os.path.join(BASE_DATA_DIR, "Logs")
OUTPUT_DIR = os.path.join(BASE_DATA_DIR, "Output")
# Template images for OpenCV matching live in Templates/ next to this file.
TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templates")
def clear_directory(dir_path):
    """Empty ``dir_path``, creating it first when it does not exist.

    Regular files and symlinks are unlinked; sub-directories are removed
    recursively. Deletion is best-effort: a failure on one entry is logged
    as a warning and the remaining entries are still processed.

    Args:
        dir_path: Path of the directory to empty.
    """
    import shutil

    if not os.path.exists(dir_path):
        # exist_ok guards against another process creating the directory
        # between the existence check and this call.
        os.makedirs(dir_path, exist_ok=True)
        return

    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except OSError as e:
            logger.warning(f"Failed to delete {file_path}. Reason: {e}")
def setup_script_environment():
    """Empty the Logs and Output directories before a run."""
    logger.info("清理运行环境: Logs 和 Output 目录...")
    for working_dir in (LOG_DIR, OUTPUT_DIR):
        clear_directory(working_dir)
def connect_device():
    """Connect to the default uiautomator2 device and log its details.

    Returns:
        The connected device object, or ``None`` when the connection raises
        or the device reports an empty ``info``.
    """
    try:
        device = u2.connect()

        # Touch ``info`` to verify the connection is actually usable.
        if not device.info:
            logger.error("设备连接不可用 (d.info is empty)")
            return None

        # Fall back to a placeholder when the object exposes no serial.
        device_serial = getattr(device, 'serial', "未知")
        logger.info(f"设备连接成功: {device_serial}")

        # Log brand / model / SDK from the detailed device info.
        details = device.device_info
        brand, model, sdk = (details.get(key) for key in ('brand', 'model', 'sdk'))
        logger.info(f"详细设备信息: 品牌={brand}, 型号={model}, SDK={sdk}")
        return device
    except Exception as e:
        logger.error(f"设备连接失败: {e}")
        return None
def safe_device_click(d, x, y):
    """Click ``(x, y)`` on the device, retrying once over a fresh connection.

    Args:
        d: uiautomator2 device object.
        x: X coordinate of the click.
        y: Y coordinate of the click.

    Returns:
        ``True`` when either the first click or the retry succeeds,
        ``False`` when both attempts raise.
    """
    try:
        d.click(x, y)
        return True
    except Exception as e:
        logger.warning(f"点击操作失败 ({x}, {y}): {e},尝试重新连接并重试...")

    try:
        # Reconnect to the SAME device: a bare u2.connect() may select a
        # different device when several are attached. A missing serial
        # (None) falls back to uiautomator2's default selection.
        retry_device = u2.connect(getattr(d, 'serial', None))
        retry_device.click(x, y)
        return True
    except Exception as e2:
        logger.error(f"重试点击操作依然失败: {e2}")
        return False
# Patterns used to discard OCR fragments that are chat chrome, not messages.
# Compiled once here instead of on every OCR result.
_CHAT_TIME_KEYWORD_RE = re.compile(r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)')
_CHAT_TIME_CHARS_RE = re.compile(r'^[0-9:\s日年月\-]+$')
_CHAT_VOICE_DURATION_RE = re.compile(r'^.?[0-9]{1,2}"?$')


def _ocr_box_center(bbox):
    """Return the (x, y) midpoint between corners 0 and 2 of an OCR box."""
    return (bbox[0][0] + bbox[2][0]) / 2, (bbox[0][1] + bbox[2][1]) / 2


def _chat_sender_by_left_edge(left_x, width):
    """Classify a text block as "对方" when it starts in the left 30% of the image, else ""."""
    return "对方" if left_x < width * 0.3 else ""


def _is_chat_noise_text(text):
    """Return True for OCR text that is a timestamp, voice duration or system notice."""
    # Short fragments containing date/time keywords, or made only of
    # digits and date punctuation (tolerates OCR errors such as "28811:18").
    if len(text) < 20 and (_CHAT_TIME_KEYWORD_RE.search(text) or _CHAT_TIME_CHARS_RE.match(text)):
        return True
    # Voice duration markers such as 5" or 10".
    if _CHAT_VOICE_DURATION_RE.match(text.strip()):
        return True
    # System notices (message recalled / greeting message).
    return "撤回了一条消息" in text or "打招呼的消息" in text


def _collect_voice_messages(audio_matches, red_points, ocr_centers, debug_img, w, h):
    """Build voice-message records from icon matches and annotate ``debug_img`` in place."""
    voices = []
    for ax, ay in audio_matches:
        # Template matching yields NumPy integers; plain ints keep the
        # OpenCV drawing calls and the stored records free of NumPy scalars.
        ax, ay = int(ax), int(ay)
        # Skip the non-chat areas (empirical: top 150 px, bottom 250 px).
        if ay < 150 or ay > h - 250:
            continue
        sender = "对方" if ax < w / 2 else ""
        # An unread marker is a red dot to the right of the icon on roughly the same row.
        is_unread = any(abs(ry - ay) < 50 and rx > ax for rx, ry in red_points)
        # Unread voices get a red box, read ones a green box (BGR colours).
        color = (0, 0, 255) if is_unread else (0, 255, 0)
        cv2.rectangle(debug_img, (ax - 30, ay - 30), (ax + 30, ay + 30), color, 2)
        # A voice counts as already transcribed when some OCR text sits
        # 30-300 px below the icon and within 200 px horizontally.
        is_converted = any(30 < c_y - ay < 300 and abs(c_x - ax) < 200 for c_x, c_y in ocr_centers)
        label = "YES" if is_converted else "NO"
        cv2.putText(debug_img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
        voices.append({
            "type": "voice",
            "sender": sender,
            "center": (ax, ay),
            "y": ay,
            "is_unread": is_unread,
            "is_converted": is_converted,
        })
    return voices


def _collect_text_messages(ocr_results, w, h):
    """Build text-message records from OCR results, dropping chrome and noise."""
    texts = []
    for bbox, text, _conf in ocr_results:
        center_x, center_y = _ocr_box_center(bbox)
        c_x, c_y = int(center_x), int(center_y)
        # Skip the title bar (top 150 px) and the input bar (bottom 150 px).
        if c_y < 150 or c_y > h - 150:
            continue
        if _is_chat_noise_text(text):
            continue
        texts.append({
            "type": "text",
            # The other party's bubbles start near the left edge.
            "sender": _chat_sender_by_left_edge(bbox[0][0], w),
            "content": text.strip(),
            "center": (c_x, c_y),
            "y": c_y,
        })
    return texts


def _transcribe_voice_message(d, v_msg, w):
    """Drive the device to transcribe one voice message and store the text on ``v_msg``.

    Long-presses the voice bubble, taps the "转文字" menu entry, OCRs the
    screen for the transcript, then tries to undo the conversion so the
    screen returns to its previous state.
    """
    vx, vy = int(v_msg['center'][0]), int(v_msg['center'][1])
    logger.info(f"--- 正在处理语音消息 ({vx}, {vy}) ---")

    # A. Long-press the voice bubble to open the context menu.
    logger.info(f"正在长按语音消息 ({vx}, {vy})...")
    d.long_click(vx, vy, 1.5)
    time.sleep(1.5)

    # B. Screenshot the menu and look for the "转文字" button.
    menu_shot = os.path.join(OUTPUT_DIR, f"voice_menu_{vy}.jpg")
    d.screenshot(menu_shot)
    zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg")
    btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7)
    if not btn_pos:
        logger.warning("❌ 未能找到'转文字'按钮,点击空白处退出")
        safe_device_click(d, vx + 300, vy)
        return

    btn_x, btn_y = int(btn_pos[0]), int(btn_pos[1])
    logger.info(f"✅ 找到'转文字'按钮: ({btn_x}, {btn_y}),点击中...")
    safe_device_click(d, btn_x, btn_y)
    # Give the transcription time to finish.
    logger.info("等待语音转文字完成...")
    time.sleep(5.0)

    # C. Screenshot again and OCR the transcript.
    after_convert_shot = os.path.join(OUTPUT_DIR, f"after_auto_{vy}.jpg")
    try:
        d.screenshot(after_convert_shot)
        convert_ocr = ocr_kit.read_text(after_convert_shot)
    except Exception as e:
        logger.error(f"截图或 OCR 失败: {e}")
        convert_ocr = []

    # Keep blocks 30-600 px below the icon, within 400 px horizontally and
    # on the same side as the voice bubble; a transcript may span several lines.
    text_blocks = []
    for c_bbox, c_text, _c_conf in convert_ocr:
        cc_x, cc_y = _ocr_box_center(c_bbox)
        c_sender = _chat_sender_by_left_edge(c_bbox[0][0], w)
        if 30 < cc_y - vy < 600 and abs(cc_x - vx) < 400 and c_sender == v_msg['sender']:
            text_blocks.append((cc_y, c_text))
    text_blocks.sort(key=lambda block: block[0])
    converted_text = "".join(block[1] for block in text_blocks)
    if converted_text:
        logger.info(f"✨ OCR 识别成功: {converted_text}")
        v_msg['content'] = converted_text
        v_msg['is_converted'] = True
    else:
        logger.warning("❌ OCR 未能提取到转换后的文字内容")

    # D. Long-press again and tap "取消转文字" to restore the screen.
    try:
        logger.info("正在恢复界面状态 (点击'取消转文字')...")
        d.long_click(vx, vy, 1.5)
        time.sleep(1.0)
        cancel_shot = os.path.join(OUTPUT_DIR, f"cancel_menu_{vy}.jpg")
        d.screenshot(cancel_shot)
        cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg")
        cancel_btn = find_template_match(cancel_shot, cancel_template, threshold=0.7)
        if cancel_btn:
            c_btn_x, c_btn_y = int(cancel_btn[0]), int(cancel_btn[1])
            safe_device_click(d, c_btn_x, c_btn_y)
            logger.info(f"✅ 已点击'取消转文字' ({c_btn_x}, {c_btn_y}),界面已恢复")
        else:
            logger.warning("⚠️ 未找到'取消转文字'按钮,尝试点击空白处关闭菜单")
            safe_device_click(d, vx + 300, vy)
    except Exception as e:
        logger.error(f"恢复界面状态时发生错误: {e}")


def _merge_voice_transcripts(messages):
    """Fold transcript text blocks into their voice messages; return the list sorted by Y.

    A text block is treated as part of a voice message when it lies
    30-600 px below the voice icon, within 400 px horizontally, and has the
    same sender. Such blocks are marked ``is_voice_part`` and left out of
    the returned list.
    """
    text_messages = [m for m in messages if m['type'] == 'text']
    for v_msg in messages:
        if v_msg['type'] != 'voice':
            continue
        vx, vy = v_msg['center']
        parts = []
        for msg in text_messages:
            cx, cy = msg['center']
            if 30 < cy - vy < 600 and msg['sender'] == v_msg['sender'] and abs(cx - vx) < 400:
                parts.append(msg)
                msg['is_voice_part'] = True
        if parts:
            parts.sort(key=lambda m: m['y'])
            v_msg['content'] = "".join(m['content'] for m in parts)
            v_msg['is_converted'] = True

    final_messages = [
        m for m in messages
        if m['type'] != 'text' or not m.get('is_voice_part', False)
    ]
    final_messages.sort(key=lambda m: m['y'])
    return final_messages


async def analyze_chat_image(image_path, output_path, device=None, target_name="对方"):
    """Extract the dialogue from a WeChat chat screenshot using CV + OCR only.

    Voice icons and unread red dots are located by template matching, text
    by OCR. Voice messages with no transcript below them are transcribed by
    driving the device. The dialogue is printed to the console and returned.

    Note: although declared ``async``, this function only makes blocking
    calls (device I/O, ``time.sleep``), so awaiting it blocks the event loop
    until it finishes.

    Args:
        image_path: Path of the chat screenshot to analyse.
        output_path: Where to write the annotated debug image; falsy to skip.
        device: Already-connected device object. When falsy, a connection is
            made with ``connect_device()``.
        target_name: Unused by this function; kept for interface compatibility.

    Returns:
        ``(dialogue_log, input_field_coordinates)``: a list of
        ``"sender: content"`` lines ordered top to bottom, and the default
        input-box position ``(w // 2, int(h * 0.9))``. Returns ``([], None)``
        when no device is available, the image cannot be read, no message
        is found, or any step raises.
    """
    try:
        # 1. Device.
        d = device if device else connect_device()
        if not d:
            return [], None

        # 2. Load the screenshot.
        img = cv2.imread(image_path)
        if img is None:
            logger.error(f"无法读取图片: {image_path}")
            return [], None
        h, w = img.shape[:2]

        # 3. Template-match voice icons and unread red dots.
        audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg")
        red_point_template = os.path.join(TEMPLATE_DIR, "red_point.jpg")
        audio_matches = find_all_template_matches(image_path, audio_template, threshold=0.8)
        red_points = find_all_template_matches(image_path, red_point_template, threshold=0.8)

        # 4. OCR all text.
        logger.info("正在执行 OCR 识别...")
        ocr_results = ocr_kit.read_text(image_path)
        # Integer centres are computed once and reused for every voice icon.
        ocr_centers = [
            (int(center_x), int(center_y))
            for center_x, center_y in (_ocr_box_center(bbox) for bbox, _text, _conf in ocr_results)
        ]

        # 5. Collect voice messages first, then text messages.
        debug_img = img.copy()
        messages = _collect_voice_messages(audio_matches, red_points, ocr_centers, debug_img, w, h)
        messages.extend(_collect_text_messages(ocr_results, w, h))

        # 6. Nothing found: still save the debug image, then bail out.
        if not messages:
            logger.warning("未发现任何消息")
            if output_path:
                cv2.imwrite(output_path, debug_img)
            return [], None

        # Order top to bottom.
        messages.sort(key=lambda m: m['y'])
        if output_path:
            cv2.imwrite(output_path, debug_img)
            logger.info(f"调试图已保存: {output_path}")

        input_field_coordinates = (w // 2, int(h * 0.9))  # default input-box position

        # 7. Transcribe every voice message that has no transcript yet,
        #    whether it was read or not.
        unconverted_voices = [m for m in messages if m['type'] == 'voice' and not m.get('is_converted')]
        if unconverted_voices:
            logger.info(f"发现 {len(unconverted_voices)} 条未转换的语音,开始处理...")
            # Screenshots taken during transcription are written here.
            os.makedirs(OUTPUT_DIR, exist_ok=True)
            for v_msg in unconverted_voices:
                _transcribe_voice_message(d, v_msg, w)

        # 8. Merge transcript text into voice messages and build the log.
        final_messages = _merge_voice_transcripts(messages)

        print("\n" + "="*50)
        print(" --- 微信聊天记录提取结果 ---")
        print("="*50)
        dialogue_log = []
        for msg in final_messages:
            sender = msg['sender']
            content = msg.get('content') or msg.get('text', "[未识别内容]")
            if msg['type'] == 'voice':
                content = f"[语音] {content}"
            log_line = f"{sender}: {content}"
            dialogue_log.append(log_line)
            print(log_line)
        print("="*50 + "\n")
        return dialogue_log, input_field_coordinates
    except Exception as e:
        logger.error(f"analyze_chat_image 失败: {e}", exc_info=True)
        return [], None
def clean_screenshots_dir():
    """Delete the .jpg/.jpeg/.png files in the output directory.

    Creates the directory and returns when it does not exist yet. A file
    that cannot be removed is logged as a warning and skipped.
    """
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
        return

    image_suffixes = ('.jpg', '.png', '.jpeg')
    screenshots = [
        name for name in os.listdir(OUTPUT_DIR)
        if name.lower().endswith(image_suffixes)
    ]
    for name in screenshots:
        try:
            os.remove(os.path.join(OUTPUT_DIR, name))
        except Exception as e:
            logger.warning(f"Failed to delete {name}: {e}")
def is_in_chat_interface(d):
    """Report whether the device appears to be showing a WeChat chat screen.

    The selectors are tried in order and the first one that exists wins:
    the voice/keyboard toggle, an EditText input box, the hold-to-talk
    button, and the chat-info button.

    Args:
        d: uiautomator2 device object, called with selector keyword arguments.

    Returns:
        ``True`` when any selector exists; ``False`` otherwise, including
        when a lookup raises (the error is logged as a warning).
    """
    chat_selectors = (
        {"description": "切换到语音"},             # bottom voice toggle
        {"description": "切换到键盘"},             # bottom keyboard toggle
        {"className": "android.widget.EditText"},  # bottom input box
        {"text": "按住说话"},                      # hold-to-talk button
        {"description": "聊天信息"},               # chat-info button
    )
    try:
        for selector in chat_selectors:
            if d(**selector).exists:
                return True
    except Exception as e:
        logger.warning(f"is_in_chat_interface check failed: {e}")
    return False
def find_input_box_center(image_path):
    """Estimate the input-box centre from screenshot geometry (fallback strategy).

    The point is the horizontal middle of the image at 88% of its height.

    Args:
        image_path: Path of the screenshot.

    Returns:
        ``((x, y), None)``. When the file is missing, unreadable, or an
        error occurs, the hard-coded default ``((540, 2100), None)`` is
        returned instead.
    """
    default_result = ((540, 2100), None)
    try:
        screenshot = cv2.imread(image_path) if os.path.exists(image_path) else None
        if screenshot is None:
            return default_result

        height, width = screenshot.shape[:2]
        # Strategy: centre of the screen at 88% of its height.
        center = (int(width * 0.5), int(height * 0.88))
        logger.info(f"find_input_box_center fallback: ({center[0]}, {center[1]})")
        return center, None
    except Exception as e:
        logger.error(f"find_input_box_center error: {e}")
        return default_result
def find_template_match(screen_path, template_path, threshold=0.8):
    """Locate the best match of a template image inside a screenshot.

    Uses OpenCV ``matchTemplate`` with ``TM_CCOEFF_NORMED`` on the images
    as loaded by ``cv2.imread``.

    Args:
        screen_path: Path of the screenshot to search in.
        template_path: Path of the template image to search for.
        threshold: Minimum score for the best match to be accepted.

    Returns:
        ``(center_x, center_y)`` of the best match when its score reaches
        ``threshold``; ``None`` when the template file is missing, either
        image cannot be read, the score is too low, or matching raises.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return None

        screen = cv2.imread(screen_path)
        needle = cv2.imread(template_path)
        if screen is None or needle is None:
            return None

        needle_h, needle_w = needle.shape[:2]
        scores = cv2.matchTemplate(screen, needle, cv2.TM_CCOEFF_NORMED)
        _, best_score, _, best_loc = cv2.minMaxLoc(scores)

        if best_score >= threshold:
            # best_loc is the top-left corner of the match; shift to its centre.
            center = (best_loc[0] + needle_w // 2, best_loc[1] + needle_h // 2)
            logger.info(f"Template matched! Score: {best_score:.2f}, Center: ({center[0]}, {center[1]})")
            return center

        logger.info(f"Template not matched. Max score: {best_score:.2f}")
        return None
    except Exception as e:
        logger.error(f"Template matching failed: {e}")
        return None
def find_all_template_matches(screen_path, template_path, threshold=0.8):
    """Locate every match of a template image inside a screenshot.

    Args:
        screen_path: Path of the screenshot to search in.
        template_path: Path of the template image to search for.
        threshold: Minimum ``TM_CCOEFF_NORMED`` score for a position to count.

    Returns:
        A list of ``(center_x, center_y)`` points, one per distinct match.
        Empty when the template file is missing, either image cannot be
        read, nothing reaches the threshold, or matching raises.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return []

        screen = cv2.imread(screen_path)
        needle = cv2.imread(template_path)
        if screen is None or needle is None:
            return []

        needle_h, needle_w = needle.shape[:2]
        scores = cv2.matchTemplate(screen, needle, cv2.TM_CCOEFF_NORMED)

        # np.where yields (rows, cols); a row is a Y offset and a column an X offset.
        rows, cols = np.where(scores >= threshold)
        candidates = [
            (col + needle_w // 2, row + needle_h // 2)
            for row, col in zip(rows, cols)
        ]

        # One target usually produces a cluster of adjacent hits. Keep the
        # first hit of each cluster and drop any later candidate that lies
        # within 10 px of a kept point on both axes.
        unique_points = []
        for cand in candidates:
            near_kept_point = any(
                abs(cand[0] - kept[0]) < 10 and abs(cand[1] - kept[1]) < 10
                for kept in unique_points
            )
            if not near_kept_point:
                unique_points.append(cand)

        if unique_points:
            logger.info(f"Found {len(unique_points)} matches for {os.path.basename(template_path)}")
        return unique_points
    except Exception as e:
        logger.error(f"find_all_template_matches failed: {e}")
        return []
# android.view.KeyEvent.KEYCODE_PASTE. "paste" is not among uiautomator2's
# documented key names, so the paste key is sent by numeric key code.
_ANDROID_KEYCODE_PASTE = 279


def _remove_temp_file(path):
    """Delete a temporary file; a missing file is fine, other failures are logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.debug(f"Failed to remove temp file {path}: {e}")


def perform_input_action(d, center_point, text, auto_send=True):
    """Type ``text`` into the chat input box and optionally send it.

    Steps:
      1. Make sure the chat is in text-input mode, using template matching
         on a fresh screenshot.
      2. Enter the text through the native ``EditText`` when one exists;
         otherwise click ``center_point`` and use ``send_keys``, falling
         back to clipboard + paste.
      3. When ``auto_send`` is true, tap the send button found by template
         matching, then by its text, and finally press Enter.

    Temporary screenshots are removed even when a step raises.

    Args:
        d: uiautomator2 device object.
        center_point: ``(x, y)`` fallback coordinates of the input box.
        text: Text to enter.
        auto_send: Whether to send the message after entering it.

    Returns:
        ``True`` when the sequence completes, ``False`` when any step raises
        (the error is logged).
    """
    try:
        # --- Ensure text-input mode ---
        logger.info("正在检查输入模式...")
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg")
        try:
            d.screenshot(tmp_check_shot)
            wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")
            input_text_template = os.path.join(TEMPLATE_DIR, "input_text.jpg")

            # The switch-to-text icon being visible means voice mode is active.
            # NOTE(review): assumes wen_zi_input.jpg is the "keyboard" icon.
            wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8)
            if wen_zi_pos:
                logger.info(f"检测到语音模式 (找到切换文字图标: {wen_zi_pos}),点击切换...")
                d.click(wen_zi_pos[0], wen_zi_pos[1])
                time.sleep(1.0)  # wait for the UI to switch
            else:
                # No toggle icon: assume text mode and try to focus the input area.
                logger.info("未检测到语音模式切换图标,尝试寻找文字输入区域...")
                input_text_pos = find_template_match(tmp_check_shot, input_text_template, threshold=0.8)
                if input_text_pos:
                    logger.info(f"找到文字输入区域标识 (input_text.jpg): {input_text_pos},点击激活...")
                    d.click(input_text_pos[0], input_text_pos[1])
                    time.sleep(0.5)
                else:
                    logger.info("未找到特定的输入区域标识,将使用默认坐标或控件查找。")
        finally:
            _remove_temp_file(tmp_check_shot)

        # 1. Prefer the native input box.
        edit_text = d(className="android.widget.EditText")
        input_success = False
        if edit_text.exists:
            logger.info("Found native EditText, using set_text")
            try:
                edit_text.click()
                time.sleep(0.5)
                edit_text.set_text(text)
                input_success = True
            except Exception as e:
                logger.warning(f"Native input failed: {e}")

        # 2. Fall back to coordinate click + typing, then clipboard paste.
        if not input_success:
            cx, cy = center_point
            logger.info(f"Using coordinate input: {center_point}")
            d.click(cx, cy)
            time.sleep(1.0)
            try:
                d.send_keys(text)
            except Exception:
                logger.warning("send_keys failed, trying set_clipboard")
                d.set_clipboard(text)
                d.click(cx, cy)
                time.sleep(0.5)
                d.press(_ANDROID_KEYCODE_PASTE)
        time.sleep(1.0)

        # 3. Send.
        if auto_send:
            # Template matching is tried first for the send button.
            logger.info("尝试使用模板匹配寻找'发送'按钮...")
            tmp_screen = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_send_check.jpg")
            try:
                d.screenshot(tmp_screen)
                template_path = os.path.join(TEMPLATE_DIR, "send.jpg")
                # Slightly lower threshold to improve recall.
                send_btn_pos = find_template_match(tmp_screen, template_path, threshold=0.7)
                if send_btn_pos:
                    logger.info(f"通过模板匹配找到发送按钮: {send_btn_pos}, 点击...")
                    d.click(send_btn_pos[0], send_btn_pos[1])
                else:
                    logger.warning("模板匹配未找到发送按钮,尝试原生控件查找...")
                    if d(text="发送").exists:
                        d(text="发送").click()
                        logger.info("Clicked '发送'")
                    else:
                        d.press("enter")
                        logger.info("Pressed Enter")
            finally:
                _remove_temp_file(tmp_screen)
        return True
    except Exception as e:
        logger.error(f"perform_input_action error: {e}")
        return False
def match_template_center(image_path, template_path, threshold=0.8):
    """Locate the best grayscale match of a template inside an image.

    Both images are converted to grayscale before ``matchTemplate`` with
    ``TM_CCOEFF_NORMED`` is applied.

    Args:
        image_path: Path of the image to search in.
        template_path: Path of the template image to search for.
        threshold: Minimum score for the best match to be accepted.

    Returns:
        ``(center_x, center_y)`` of the best match when its score reaches
        ``threshold``; ``None`` when a file is missing or unreadable, the
        score is too low, or matching raises.
    """
    try:
        if not (os.path.exists(image_path) and os.path.exists(template_path)):
            logger.error(f"Image or template not found: {image_path}, {template_path}")
            return None

        haystack = cv2.imread(image_path)
        needle = cv2.imread(template_path)
        if haystack is None or needle is None:
            logger.error("Failed to read image or template")
            return None

        # Match on grayscale versions of both images.
        haystack_gray = cv2.cvtColor(haystack, cv2.COLOR_BGR2GRAY)
        needle_gray = cv2.cvtColor(needle, cv2.COLOR_BGR2GRAY)
        scores = cv2.matchTemplate(haystack_gray, needle_gray, cv2.TM_CCOEFF_NORMED)
        _, best_score, _, best_loc = cv2.minMaxLoc(scores)

        if best_score >= threshold:
            needle_h, needle_w = needle_gray.shape
            # best_loc is the top-left corner of the match; shift to its centre.
            center_x = int(best_loc[0] + needle_w / 2)
            center_y = int(best_loc[1] + needle_h / 2)
            logger.info(f"Template matched with confidence {best_score:.2f} at ({center_x}, {center_y})")
            return (center_x, center_y)

        logger.warning(f"Template match failed. Max confidence: {best_score:.2f} < Threshold: {threshold}")
        return None
    except Exception as e:
        logger.error(f"match_template_center error: {e}")
        return None