Files
aiData/WeiXin/WxUtil.py
HuangHai 4868198143 'commit'
2026-01-26 09:50:09 +08:00

664 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import uiautomator2 as u2
import time
import logging
import sys
import os
import cv2
import numpy as np
import re
# 添加项目根目录到 sys.path 以便导入 Util
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
sys.path.append(project_root)
import json
from Util.VLMKit import VLMKit
from Util.EasyOcrKit import EasyOcrKit
# Create the shared VLMKit and EasyOcrKit instances once, at import time
vlm_kit = VLMKit()
ocr_kit = EasyOcrKit()
# Configure root logging and the module-level logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("WxUtil")
# Directory layout: BASE_DATA_DIR is the parent of the directory holding this
# file; Logs/ and Output/ live under it, Templates/ sits next to this file
BASE_DATA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DATA_DIR, "Logs")
OUTPUT_DIR = os.path.join(BASE_DATA_DIR, "Output")
TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templates")
def clear_directory(dir_path):
    """Remove every entry inside ``dir_path``, creating the directory if missing.

    Regular files and symlinks are unlinked; sub-directories are removed
    recursively. A failure on one entry is logged as a warning and does not
    stop the sweep.

    Args:
        dir_path: Path of the directory to empty.
    """
    import shutil

    if not os.path.exists(dir_path):
        # exist_ok avoids a crash if something else creates the directory
        # between the existence check above and this call.
        os.makedirs(dir_path, exist_ok=True)
        return

    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        try:
            # Symlinks are unlinked (never followed), even if they point to a directory.
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            logger.warning(f"Failed to delete {file_path}. Reason: {e}")
def setup_script_environment():
    """Empty the Logs and Output directories before a run."""
    logger.info("清理运行环境: Logs 和 Output 目录...")
    for stale_dir in (LOG_DIR, OUTPUT_DIR):
        clear_directory(stale_dir)
def connect_device():
    """Connect to a device through uiautomator2 and log its identity.

    Returns:
        The connected device object, or ``None`` if connecting or reading
        the device information raised an exception.
    """
    try:
        d = u2.connect()
        # Fall back to a placeholder when the device object exposes no serial.
        device_serial = getattr(d, 'serial', "未知")
        logger.info(f"设备连接成功: {device_serial}")
        # Log brand / model / SDK as reported by the device.
        device_info = d.device_info
        logger.info(f"详细设备信息: 品牌={device_info.get('brand')}, 型号={device_info.get('model')}, SDK={device_info.get('sdk')}")
    except Exception as e:
        logger.error(f"设备连接失败: {e}")
        return None
    return d
def _denormalize_tree(data, width, height):
    """Convert [0-1000] normalised points to pixel coordinates, in place.

    Walks nested dicts/lists and rewrites any two-element list stored under
    the keys ``center``, ``input_box`` or ``coordinates``. A point whose
    components are not plain numbers is left untouched (with a warning) so
    that one malformed entry cannot abort the walk half-way and leave the
    result in a mix of normalised and pixel coordinates.
    """
    if isinstance(data, dict):
        for key, value in data.items():
            if key in ("center", "input_box", "coordinates") and isinstance(value, list) and len(value) == 2:
                is_numeric = all(
                    isinstance(c, (int, float)) and not isinstance(c, bool)
                    for c in value
                )
                if is_numeric:
                    # Re-assigning an existing key is safe while iterating items().
                    data[key] = [int(value[0] / 1000 * width), int(value[1] / 1000 * height)]
                else:
                    logger.warning(f"Skipping non-numeric point under '{key}': {value}")
            elif isinstance(value, (dict, list)):
                _denormalize_tree(value, width, height)
    elif isinstance(data, list):
        for item in data:
            _denormalize_tree(item, width, height)


async def get_vlm_json(image_path, prompt):
    """Run a VLM prompt against an image and return the parsed JSON result.

    The response from ``vlm_kit.analyze_image`` is passed through
    ``vlm_kit.extract_json`` and ``json.loads``. Points found under the keys
    ``center``, ``input_box`` and ``coordinates`` are then scaled from the
    [0-1000] normalised range to pixels, using the image's own dimensions.

    Args:
        image_path: Path of the image to analyse; also opened with PIL to
            read its width and height.
        prompt: Prompt text forwarded to the VLM.

    Returns:
        The parsed JSON data, or ``None`` if the VLM call or JSON parsing
        failed. If only the coordinate conversion fails, the parsed data is
        still returned and a warning is logged.
    """
    try:
        response = await vlm_kit.analyze_image(image_path, prompt)
        json_str = vlm_kit.extract_json(response)
        result_data = json.loads(json_str)
        # Coordinate conversion is best-effort: a failure here must not
        # discard the analysis itself.
        try:
            from PIL import Image
            with Image.open(image_path) as img:
                width, height = img.size
            _denormalize_tree(result_data, width, height)
        except Exception as e:
            logger.warning(f"坐标反归一化失败: {e},将使用原始坐标")
        return result_data
    except Exception as e:
        logger.error(f"VLM Analysis Failed: {e}", exc_info=True)
        return None
async def get_vlm_analysis(image_path):
    """Ask the VLM to describe a WeChat chat screenshot and return the parsed JSON.

    The prompt requests ``is_chat_interface``, ``input_box`` and a
    ``messages`` list, with points expressed in [0-1000] normalised
    coordinates. After parsing, ``input_box`` and each message's ``center``
    (and legacy ``coordinates``) are scaled to pixels using the image size.

    Args:
        image_path: Path of the screenshot to analyse.

    Returns:
        The parsed JSON data (a dict, or a bare list of messages in the
        legacy format), or ``None`` if the VLM call or JSON parsing failed.
        If only the coordinate conversion fails, the parsed data is returned
        and a warning is logged.
    """
    logger.info(f"正在使用 VLM 分析图片: {image_path}")
    # Prompt text is kept flush-left so the string sent to the model carries
    # no indentation.
    # NOTE(review): the "sender" enum below lists an empty string as its
    # second option while the rules above name that sender “我” - confirm
    # whether a character was lost from the literal.
    prompt = """
请分析这张微信聊天截图,提取所有对话消息。
【核心规则 - 优先级最高】
1. 🚀 **从下往上扫描**:必须确保屏幕最底部的消息被识别。很多时候最底部的消息是最重要的。
2. 🔴 **未读红点 (Unread)**:极度关注语音气泡右上角的红点。如果有红点,`is_unread` 必须为 true。
3. 📦 **完整性**:识别图中【所有】可见的消息气泡,包括文本消息、语音消息、系统提示(如“昨天 10:36”、“你撤回了一条消息”
【消息类型判别】
- **发送者 (Sender)**:左侧头像为“对方”(Other),右侧头像为“我”(Me)。
- **语音 (Voice)**
- 气泡内只有时长(如 5")和声波图标。
- **重点**:如果语音气泡右侧有灰色的“转文字”字样或红点,且下方没有对应的文本翻译气泡,说明它【尚未转换】。
- `status` 判断:只有当语音气泡【正下方】紧跟着一个相同发送者的文本气泡(内容是翻译结果),`status` 才为 "converted"。否则为 "unconverted"
- **文本 (Text)**:气泡内包含具体的文字内容。
【坐标系统】
- 使用 [0-1000] 归一化坐标。返回气泡的几何中心点 `center`。
- 识别底部输入框的位置 `input_box`。
【输出格式】
请返回纯 JSON 格式:
{
"is_chat_interface": true,
"input_box": [x, y],
"messages": [
{
"type": "voice" | "text" | "system",
"sender": "对方" | "" | "系统",
"status": "converted" | "unconverted",
"is_unread": true | false,
"center": [x, y],
"content": "消息内容或时长"
},
...
]
}
"""
    try:
        response = await vlm_kit.analyze_image(image_path, prompt)
        logger.info(f"VLM Raw Response: {response}")  # raw response, for debugging
        json_str = vlm_kit.extract_json(response)
        result_data = json.loads(json_str)
        # Coordinate conversion is best-effort: on failure the raw
        # (normalised) coordinates are returned.
        try:
            from PIL import Image
            with Image.open(image_path) as img:
                width, height = img.size

            def denormalize(point):
                """Scale one [0-1000] point to pixels; pass anything else through."""
                if not point or len(point) != 2:
                    return point
                return [int(point[0] / 1000 * width), int(point[1] / 1000 * height)]

            if isinstance(result_data, list):
                # Legacy format: the response is the message list itself.
                messages = result_data
            else:
                if result_data.get("input_box"):
                    result_data["input_box"] = denormalize(result_data["input_box"])
                messages = result_data.get("messages")

            for msg in messages or []:
                if msg.get("center"):
                    msg["center"] = denormalize(msg["center"])
                if msg.get("coordinates"):  # legacy field name
                    msg["coordinates"] = denormalize(msg["coordinates"])
        except Exception as e:
            logger.warning(f"坐标反归一化失败: {e},将使用原始坐标")
        return result_data
    except Exception as e:
        logger.error(f"VLM Analysis Failed: {e}", exc_info=True)
        return None
def _parse_voice_duration(content, default=10):
    """Return the seconds encoded in a voice label such as ``5"``; ``default`` if unclear.

    Only labels containing exactly one run of digits are trusted.
    """
    text = "" if content is None else str(content)
    found = re.fullmatch(r"\D*(\d+)\D*", text)
    return int(found.group(1)) if found else default


def _trigger_voice_to_text(d, voice, image_path):
    """Long-press a voice bubble and tap the OCR-located "转文字" menu item.

    Returns True when the menu item was found and tapped (after waiting for
    the estimated conversion time), False when it was not found.
    """
    vx, vy = voice['coordinates']
    logger.info(f"长按语音消息: ({vx}, {vy})")
    d.long_click(vx, vy, 1.5)
    time.sleep(1.0)

    # Screenshot the context menu and look for the convert entry with OCR.
    menu_shot_path = os.path.join(os.path.dirname(image_path), "temp_menu_shot.jpg")
    d.screenshot(menu_shot_path)
    ocr_results = ocr_kit.read_text(menu_shot_path)

    convert_btn_center = None
    for bbox, text, conf in ocr_results:
        if "转文字" in text or "转换为文字" in text:
            # bbox is [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]; corners 0 and 2
            # are used as opposite corners to get the centre.
            c_x = int((bbox[0][0] + bbox[2][0]) / 2)
            c_y = int((bbox[0][1] + bbox[2][1]) / 2)
            convert_btn_center = (c_x, c_y)
            break

    if convert_btn_center is None:
        logger.warning("OCR 未找到 '转文字' 菜单项")
        # Tap beside the bubble to dismiss the menu so it does not cover the chat.
        d.click(vx + 200, vy)
        return False

    logger.info(f"OCR 找到 '转文字' 按钮: {convert_btn_center}")
    d.click(convert_btn_center[0], convert_btn_center[1])
    # Wait proportionally to the voice length (duration / 5, at least 2 s).
    duration = _parse_voice_duration(voice.get('content', '0'))
    wait_seconds = max(2, duration / 5.0)
    logger.info(f"语音时长 {duration}s预计等待转换 {wait_seconds:.1f}s...")
    time.sleep(wait_seconds)
    return True


async def analyze_chat_image(image_path, output_path, device=None, target_name="对方"):
    """Analyse a WeChat chat screenshot with the VLM and trigger voice-to-text.

    Builds a dialogue log from the messages returned by ``get_vlm_analysis``.
    If unconverted voice messages are present, one is selected (the last
    unread one, otherwise the last one), long-pressed on the device, and
    converted through the "转文字" context-menu entry located by OCR.

    NOTE: the device calls and ``time.sleep`` waits are blocking and run
    directly inside this coroutine.

    Args:
        image_path: Path of the screenshot to analyse; the temporary menu
            screenshot is written next to it.
        output_path: Not used by this function.
        device: Device object to operate on; when falsy, ``connect_device()``
            is called.
        target_name: Not used by this function.

    Returns:
        ``(dialogue_log, input_box)`` on success, where ``input_box`` is the
        ``input_box`` value of the analysis (``None`` for the legacy list
        format); ``(None, None)`` when the VLM reports the screen is not a
        chat interface; ``([], None)`` when the analysis is empty, has an
        unexpected shape, or an exception occurs.
    """
    try:
        result_data = await get_vlm_analysis(image_path)
        if not result_data:
            return [], None

        # Check for the legacy bare-list format first: a list has no .get(),
        # so testing is_chat_interface first made this branch unreachable.
        if isinstance(result_data, list):
            messages = result_data
            input_field_coordinates = None
        elif isinstance(result_data, dict):
            if not result_data.get("is_chat_interface", False):
                logger.warning("VLM 判断当前不是微信聊天界面")
                return None, None
            messages = result_data.get("messages") or []
            input_field_coordinates = result_data.get("input_box")
        else:
            logger.error(f"解析 VLM 结果失败: unexpected result type {type(result_data).__name__}")
            return [], None

        dialogue_log = []
        unconverted_voices = []
        for msg in messages:
            if not isinstance(msg, dict):
                continue  # ignore malformed entries instead of failing the whole analysis
            sender = msg.get('sender', '未知')
            msg_type = msg.get('type', 'other')
            content = msg.get('content', '')
            coords = msg.get('center')
            status = msg.get('status', 'unconverted')
            is_unread = msg.get('is_unread', False)
            is_converted = (status == "converted")
            unread_mark = "[未读]" if is_unread else ""

            if msg_type == 'voice':
                if is_converted:
                    dialogue_log.append(f"{sender}: {unread_mark}[语音] {content} (已转换)")
                else:
                    dialogue_log.append(f"{sender}: {unread_mark}[语音] (待转换)")
                    # Only queue voices with a usable centre point; a missing
                    # one would otherwise cause a long-press at (0, 0).
                    if isinstance(coords, (list, tuple)) and len(coords) == 2:
                        msg['coordinates'] = coords
                        unconverted_voices.append(msg)
            elif msg_type == 'text':
                dialogue_log.append(f"{sender}: {content}")
            logger.info(f"VLM 识别: {sender} [{msg_type}] {content} (Converted: {is_converted}, Unread: {is_unread})")

        if not unconverted_voices:
            return dialogue_log, input_field_coordinates

        # Priority: the last unread voice, otherwise the last unconverted one.
        unread_voices = [v for v in unconverted_voices if v.get('is_unread')]
        if unread_voices:
            logger.info(f"发现 {len(unread_voices)} 条未读语音消息,优先处理最后一条...")
            voice_to_process = unread_voices[-1]
        else:
            logger.info(f"发现 {len(unconverted_voices)} 条未转换语音消息,处理最后一条...")
            voice_to_process = unconverted_voices[-1]

        d = device if device else connect_device()
        if d is None:
            # connect_device() returns None on failure; keep the log that was
            # already built instead of crashing and returning nothing.
            logger.error("Device unavailable, skipping voice-to-text conversion")
            return dialogue_log, input_field_coordinates

        if _trigger_voice_to_text(d, voice_to_process, image_path):
            # Give the UI a moment to refresh after the conversion.
            time.sleep(1.0)
            # The log reflects the pre-conversion screen; flag that a
            # conversion is in progress.
            dialogue_log.append("系统: [正在转换语音...]")
        return dialogue_log, input_field_coordinates
    except Exception as e:
        logger.error(f"VLM 分析失败: {e}", exc_info=True)
        return [], None
def clean_screenshots_dir():
    """Delete .jpg/.png/.jpeg files from the output directory, creating it if absent."""
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
        return

    image_suffixes = ('.jpg', '.png', '.jpeg')
    for f in os.listdir(OUTPUT_DIR):
        if not f.lower().endswith(image_suffixes):
            continue
        try:
            os.remove(os.path.join(OUTPUT_DIR, f))
        except Exception as e:
            # Best-effort: report the failure and carry on with the next file.
            logger.warning(f"Failed to delete {f}: {e}")
def is_in_chat_interface(d):
    """Report whether the device currently shows a WeChat chat screen.

    Probes a fixed list of UI selectors in order and returns ``True`` on the
    first one that exists. Returns ``False`` when none exists or when a
    probe raises (the error is logged as a warning).
    """
    chat_markers = (
        {"description": "切换到语音"},            # bottom voice/keyboard toggle
        {"description": "切换到键盘"},
        {"className": "android.widget.EditText"},  # bottom input box
        {"text": "按住说话"},                      # "hold to talk" button
        {"description": "聊天信息"},               # top-right chat-info button
    )
    try:
        for selector in chat_markers:
            if d(**selector).exists:
                return True
    except Exception as e:
        logger.warning(f"is_in_chat_interface check failed: {e}")
    return False
def find_input_box_center(image_path):
    """Estimate the input-box centre of a screenshot (fallback strategy).

    Uses a purely geometric guess: horizontally centred, at 88% of the image
    height.

    Returns:
        ``((x, y), None)``. When the image is missing, unreadable, or an
        error occurs, the point is the fixed default ``(540, 2100)``.
    """
    default_result = ((540, 2100), None)
    try:
        # Only read the file if it is there; a missing or unreadable image
        # both end in the same default.
        img = cv2.imread(image_path) if os.path.exists(image_path) else None
        if img is None:
            return default_result

        h, w = img.shape[:2]
        center_x, center_y = int(w * 0.5), int(h * 0.88)
        logger.info(f"find_input_box_center fallback: ({center_x}, {center_y})")
        return (center_x, center_y), None
    except Exception as e:
        logger.error(f"find_input_box_center error: {e}")
        return default_result
def find_template_match(screen_path, template_path, threshold=0.8):
    """Locate a template in a screenshot with OpenCV and return its centre.

    Args:
        screen_path: Path of the screenshot to search.
        template_path: Path of the template image.
        threshold: Minimum ``TM_CCOEFF_NORMED`` score to accept the match.

    Returns:
        ``(center_x, center_y)`` of the best match, or ``None`` when the
        template file is missing, either image cannot be read, the best
        score is below ``threshold``, or an error occurs.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return None

        screen = cv2.imread(screen_path)
        template = cv2.imread(template_path)
        if screen is None or template is None:
            return None

        h, w = template.shape[:2]
        scores = cv2.matchTemplate(screen, template, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, max_loc = cv2.minMaxLoc(scores)

        if not max_val >= threshold:
            logger.info(f"Template not matched. Max score: {max_val:.2f}")
            return None

        # max_loc is the top-left corner of the best match; shift to its centre.
        center_x = max_loc[0] + w // 2
        center_y = max_loc[1] + h // 2
        logger.info(f"Template matched! Score: {max_val:.2f}, Center: ({center_x}, {center_y})")
        return (center_x, center_y)
    except Exception as e:
        logger.error(f"Template matching failed: {e}")
        return None
def find_all_template_matches(screen_path, template_path, threshold=0.8):
    """Find the centres of all template matches in a screenshot.

    Every location whose ``TM_CCOEFF_NORMED`` score reaches ``threshold`` is
    a candidate. Candidates lying within 10 px (on both axes) of an
    already-accepted point are dropped, because a single target usually
    yields a cluster of adjacent hits.

    Args:
        screen_path: Path of the screenshot to search.
        template_path: Path of the template image.
        threshold: Minimum score for a location to count as a match.

    Returns:
        A list of ``(center_x, center_y)`` tuples of plain Python ints;
        empty when nothing matched, a file is missing or unreadable, or an
        error occurs.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return []
        img = cv2.imread(screen_path)
        template = cv2.imread(template_path)
        if img is None or template is None:
            return []

        h, w = template.shape[:2]
        res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED)

        # np.where returns (rows, cols), i.e. (y, x). tolist() converts the
        # numpy integers to Python ints so the coordinates can be passed on
        # (and JSON-serialised) like any other point in this module.
        rows, cols = np.where(res >= threshold)
        half_w, half_h = w // 2, h // 2

        unique_points = []
        for x, y in zip(cols.tolist(), rows.tolist()):
            p = (x + half_w, y + half_h)
            # Simplified non-maximum suppression: keep the first point of
            # each cluster and skip its near neighbours.
            near_existing = any(
                abs(p[0] - up[0]) < 10 and abs(p[1] - up[1]) < 10
                for up in unique_points
            )
            if not near_existing:
                unique_points.append(p)

        if unique_points:
            logger.info(f"Found {len(unique_points)} matches for {os.path.basename(template_path)}")
        return unique_points
    except Exception as e:
        logger.error(f"find_all_template_matches failed: {e}")
        return []
def _discard_temp_file(path):
    """Best-effort removal of a temporary screenshot."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # nothing was written, nothing to clean up
    except OSError as e:
        logger.warning(f"Failed to remove temp file {path}: {e}")


def _ensure_text_input_mode(d):
    """Switch the chat bar to text mode (or focus the text area) via template matching."""
    logger.info("正在检查输入模式...")
    tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg")
    try:
        d.screenshot(tmp_check_shot)
        wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")
        input_text_template = os.path.join(TEMPLATE_DIR, "input_text.jpg")
        # wen_zi_input.jpg is assumed to be the "keyboard" icon, which is
        # shown while the bar is in voice mode.
        wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8)
        if wen_zi_pos:
            logger.info(f"检测到语音模式 (找到切换文字图标: {wen_zi_pos}),点击切换...")
            d.click(wen_zi_pos[0], wen_zi_pos[1])
            time.sleep(1.0)  # let the UI switch modes
            return
        # No toggle icon: assume text mode and try to tap the text-area marker.
        logger.info("未检测到语音模式切换图标,尝试寻找文字输入区域...")
        input_text_pos = find_template_match(tmp_check_shot, input_text_template, threshold=0.8)
        if input_text_pos:
            logger.info(f"找到文字输入区域标识 (input_text.jpg): {input_text_pos},点击激活...")
            d.click(input_text_pos[0], input_text_pos[1])
            time.sleep(0.5)
        else:
            logger.info("未找到特定的输入区域标识,将使用默认坐标或控件查找。")
    finally:
        # Remove the screenshot even when matching or clicking raised.
        _discard_temp_file(tmp_check_shot)


def _enter_text(d, center_point, text):
    """Type ``text`` through the native EditText, falling back to coordinates."""
    edit_text = d(className="android.widget.EditText")
    if edit_text.exists:
        logger.info("Found native EditText, using set_text")
        try:
            edit_text.click()
            time.sleep(0.5)
            edit_text.set_text(text)
            return
        except Exception as e:
            logger.warning(f"Native input failed: {e}")

    # Fallback: tap the given point, then type or paste.
    cx, cy = center_point
    logger.info(f"Using coordinate input: {center_point}")
    d.click(cx, cy)
    time.sleep(1.0)
    try:
        d.send_keys(text)
    except Exception:
        logger.warning("send_keys failed, trying set_clipboard")
        d.set_clipboard(text)
        d.click(cx, cy)
        time.sleep(0.5)
        # "paste" is not among the key names uiautomator2 documents for
        # press(); send the Android key code instead (KEYCODE_PASTE = 279).
        d.press(279)


def _tap_send(d):
    """Tap the send button: template match first, then widget lookup, then Enter."""
    logger.info("尝试使用模板匹配寻找'发送'按钮...")
    tmp_screen = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_send_check.jpg")
    try:
        d.screenshot(tmp_screen)
        template_path = os.path.join(TEMPLATE_DIR, "send.jpg")
        # Slightly lower threshold than elsewhere to improve recall.
        send_btn_pos = find_template_match(tmp_screen, template_path, threshold=0.7)
        if send_btn_pos:
            logger.info(f"通过模板匹配找到发送按钮: {send_btn_pos}, 点击...")
            d.click(send_btn_pos[0], send_btn_pos[1])
            return
        logger.warning("模板匹配未找到发送按钮,尝试原生控件查找...")
        if d(text="发送").exists:
            d(text="发送").click()
            logger.info("Clicked '发送'")
        else:
            d.press("enter")
            logger.info("Pressed Enter")
    finally:
        _discard_temp_file(tmp_screen)


def perform_input_action(d, center_point, text, auto_send=True):
    """Type ``text`` into the chat input and optionally send it.

    Steps: make sure the bar is in text-input mode, enter the text (native
    EditText first, coordinate tap plus typing/pasting as fallback), then,
    if requested, press the send button.

    Args:
        d: Device object used for screenshots, taps and key presses.
        center_point: ``(x, y)`` of the input box, used only by the
            coordinate fallback.
        text: Text to enter.
        auto_send: When true, send the message after typing.

    Returns:
        ``True`` when all steps completed, ``False`` if any step raised
        (the error is logged).
    """
    try:
        _ensure_text_input_mode(d)
        _enter_text(d, center_point, text)
        time.sleep(1.0)
        if auto_send:
            _tap_send(d)
        return True
    except Exception as e:
        logger.error(f"perform_input_action error: {e}")
        return False
def match_template_center(image_path, template_path, threshold=0.8):
    """Locate a template in an image (greyscale matching) and return its centre.

    Args:
        image_path: Path of the image to search.
        template_path: Path of the template image.
        threshold: Minimum ``TM_CCOEFF_NORMED`` score to accept the match.

    Returns:
        ``(center_x, center_y)`` of the best match, or ``None`` when a file
        is missing or unreadable, the best score is below ``threshold``, or
        an error occurs.
    """
    try:
        if not (os.path.exists(image_path) and os.path.exists(template_path)):
            logger.error(f"Image or template not found: {image_path}, {template_path}")
            return None

        img = cv2.imread(image_path)
        template = cv2.imread(template_path)
        if img is None or template is None:
            logger.error("Failed to read image or template")
            return None

        # Match on greyscale versions of both images.
        img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
        result = cv2.matchTemplate(img_gray, template_gray, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, max_loc = cv2.minMaxLoc(result)

        if not max_val >= threshold:
            logger.warning(f"Template match failed. Max confidence: {max_val:.2f} < Threshold: {threshold}")
            return None

        # max_loc is the top-left corner of the best match; shift to its centre.
        h, w = template_gray.shape
        center_x = int(max_loc[0] + w / 2)
        center_y = int(max_loc[1] + h / 2)
        logger.info(f"Template matched with confidence {max_val:.2f} at ({center_x}, {center_y})")
        return (center_x, center_y)
    except Exception as e:
        logger.error(f"match_template_center error: {e}")
        return None