Files
aiData/WeiXin/WxUtil.py
HuangHai bf485d10f1 'commit'
2026-01-25 12:52:52 +08:00

361 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import uiautomator2 as u2
import time
import logging
import sys
import os
import cv2
import numpy as np
import re
# Make the parent of this file's directory importable so that the ``Util``
# package (imported just below) can be resolved.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)
import json
from Util.VLMKit import VLMKit
from Util.EasyOcrKit import EasyOcrKit
# Module-level VLMKit / EasyOcrKit instances shared by the helpers in this file.
vlm_kit = VLMKit()
ocr_kit = EasyOcrKit()
# Configure root logging at import time and create this module's logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("WxUtil")
async def get_vlm_analysis(image_path):
    """Run the VLM on a chat screenshot and return the parsed JSON payload.

    The prompt asks the model for ``input_box`` and per-message ``center``
    points in a [0-1000] normalised coordinate system. After parsing, those
    points are rescaled to pixel coordinates using the image's real size.

    Args:
        image_path: Path of the screenshot to analyse.

    Returns:
        The decoded JSON value (normally a dict with ``is_chat_interface``,
        ``input_box`` and ``messages``; a bare list of messages is also
        tolerated), or ``None`` when the VLM call or JSON decoding fails.
        If the image size cannot be read the payload is returned with its
        original, still-normalised coordinates.
    """
    logger.info(f"正在使用 VLM 分析图片: {image_path}")
    # Build the prompt
    prompt = """
请分析这张微信聊天截图。
【核心任务】
识别图中的【语音消息气泡】和【文本消息气泡】。
【重要判别规则】
1. 🔊 **语音消息 (Voice)**
- **视觉特征**
- **高度**:固定(单行)。
- **宽度**随时长1"~60")变化。
- **极短 (1"-2")**:气泡非常短,形状接近一个小正方形。
- **极长 (60")**:气泡很长,宽度接近屏幕的一半。
- **内容**:气泡内**只有一个**表示时长的数字(如 `8"`)和一个声波图标。
- **绝对排除**:凡是包含汉字、长句子的气泡,**统统不是**语音消息。
2. 📝 **文本消息 (Text)**
- **视觉特征**:气泡内包含汉字、标点符号、表情等文本内容。
【坐标系统】
**必须使用 [0-1000] 的归一化坐标系。**
- 左上角为 [0, 0],右下角为 [1000, 1000]。
- 请返回气泡的**几何中心点**的归一化坐标。
【输出格式】
请返回纯 JSON 格式:
{
"is_chat_interface": true,
"input_box": [x, y],
"messages": [
{
"type": "voice",
"status": "converted" | "unconverted",
"center": [x, y],
"content": "8\""
},
{
"type": "text",
"center": [x, y],
"content": "这里是文本内容"
}
]
}
注意:
1. 坐标 `center` 和 `input_box` 必须是 [0-1000] 的归一化坐标。
2. `status` 判断:如果语音气泡的正下方紧挨着一条文本消息(通常是转换出的文字),则为 `converted`,否则为 `unconverted`。
3. 请按从上到下的顺序输出所有消息。
"""
    try:
        # Call the VLM and decode its answer.
        # NOTE(review): extract_json presumably strips any non-JSON wrapping
        # from the model reply — confirm against VLMKit.
        response = await vlm_kit.analyze_image(image_path, prompt)
        json_str = vlm_kit.extract_json(response)
        result_data = json.loads(json_str)
    except Exception as e:
        logger.error(f"VLM Analysis Failed: {e}", exc_info=True)
        return None

    # Read the image size needed to de-normalise the coordinates.
    try:
        from PIL import Image
        with Image.open(image_path) as img:
            width, height = img.size
    except Exception as e:
        logger.warning(f"坐标反归一化失败: {e},将使用原始坐标")
        return result_data

    def denormalize(point):
        """Scale one [0-1000] ``[x, y]`` pair to pixels; leave malformed input untouched."""
        if not isinstance(point, (list, tuple)) or len(point) != 2:
            return point
        try:
            return [int(point[0] / 1000 * width), int(point[1] / 1000 * height)]
        except (TypeError, ValueError, OverflowError) as e:
            # A single bad point must not stop the remaining points from
            # being converted, otherwise the result mixes both systems.
            logger.warning(f"坐标反归一化失败: {e},将使用原始坐标")
            return point

    if isinstance(result_data, dict):
        # De-normalise input_box
        if result_data.get("input_box"):
            result_data["input_box"] = denormalize(result_data["input_box"])
        messages = result_data.get("messages")
    elif isinstance(result_data, list):
        # Legacy shape: the payload itself is the message list.
        messages = result_data
    else:
        messages = None

    # De-normalise messages
    if isinstance(messages, list):
        for msg in messages:
            if not isinstance(msg, dict):
                continue
            # "coordinates" is the legacy name of "center"
            for key in ("center", "coordinates"):
                if msg.get(key):
                    msg[key] = denormalize(msg[key])
    return result_data
def _locate_convert_button(ocr_results):
    """Return the centre (x, y) of the first OCR hit for the voice-to-text menu item, else None."""
    for bbox, text, conf in ocr_results:
        if "转文字" in text or "转换为文字" in text:
            # bbox is [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]; the centre is the
            # midpoint of corners 0 and 2.
            c_x = int((bbox[0][0] + bbox[2][0]) / 2)
            c_y = int((bbox[0][1] + bbox[2][1]) / 2)
            return (c_x, c_y)
    return None


def _voice_wait_seconds(content):
    """Return (duration, wait_seconds) for a voice bubble label such as ``8"``.

    The first run of digits in the label is the duration; 10 is used when no
    number can be found. The wait is duration / 5 with a floor of 2 seconds.
    """
    found = re.search(r"\d+", str(content if content is not None else ""))
    duration = int(found.group()) if found else 10  # default value
    return duration, max(2, duration / 5.0)


async def analyze_chat_image(image_path, output_path, device=None, target_name="对方"):
    """Analyse a WeChat chat screenshot with the VLM and trigger voice-to-text if needed.

    The VLM result supplies the message list and the input-box position. When
    unconverted voice messages are present, only the last one is handled: it
    is long-pressed, the popup menu is screenshotted and OCR'd, and the
    voice-to-text entry is tapped.

    Args:
        image_path: Screenshot to analyse; the temporary menu screenshot is
            written to the same directory.
        output_path: Not used by this function.
        device: Connected uiautomator2 device; a new connection is opened
            with ``u2.connect()`` when omitted.
        target_name: Not used by this function.

    Returns:
        A 2-tuple ``(result, input_box)``:
        * ``([], None)`` when the VLM returned nothing usable or an error occurred;
        * ``(None, None)`` when the VLM says the screen is not a chat;
        * ``("VOICE_CONVERTING", input_box)`` when a conversion was started;
        * ``(dialogue_log, input_box)`` otherwise, where ``dialogue_log`` is a
          list of ``"sender: content"`` strings.
    """
    try:
        result_data = await get_vlm_analysis(image_path)
        if not result_data:
            return [], None

        # Check for the legacy list format first: calling .get on a list
        # would raise and make that format impossible to handle.
        if isinstance(result_data, list):
            messages = result_data
            input_field_coordinates = None
        elif isinstance(result_data, dict):
            # Make sure this really is a chat screen
            if not result_data.get("is_chat_interface", False):
                logger.warning("VLM 判断当前不是微信聊天界面")
                return None, None
            messages = result_data.get("messages") or []
            input_field_coordinates = result_data.get("input_box")
        else:
            logger.error(f"解析 VLM 结果失败: unexpected type {type(result_data).__name__}")
            return [], None

        dialogue_log = []
        unconverted_voices = []
        # Walk through the recognised messages
        for msg in messages:
            if not isinstance(msg, dict):
                continue
            sender = msg.get('sender', '未知')
            msg_type = msg.get('type', 'other')
            content = msg.get('content', '')
            coords = msg.get('center')
            status = msg.get('status', 'unconverted')
            is_converted = (status == "converted")
            # Record the dialogue log
            if msg_type == 'voice':
                if is_converted:
                    dialogue_log.append(f"{sender}: [语音] {content} (已转换)")
                else:
                    dialogue_log.append(f"{sender}: [语音] (待转换)")
                    if isinstance(coords, (list, tuple)) and len(coords) == 2:
                        # Expose center as "coordinates" for the tap logic below
                        msg['coordinates'] = coords
                        unconverted_voices.append(msg)
                    else:
                        # Without a usable centre there is nothing safe to tap.
                        logger.warning(f"语音消息缺少有效坐标,已跳过: {coords}")
            elif msg_type == 'text':
                dialogue_log.append(f"{sender}: {content}")
            logger.info(f"VLM 识别: {sender} [{msg_type}] {content} (Converted: {is_converted})")

        # Handle unconverted voice messages
        if unconverted_voices:
            logger.info(f"发现 {len(unconverted_voices)} 条未转换的语音消息,将仅处理最后一条...")
            # Only the last voice message is processed
            voice = unconverted_voices[-1]
            # Use the supplied device or open a new connection
            d = device if device else u2.connect()
            vx, vy = voice['coordinates']
            logger.info(f"长按语音消息: ({vx}, {vy})")
            d.long_click(vx, vy, 1.5)
            time.sleep(1.0)
            # Locate the voice-to-text menu entry via OCR on a fresh screenshot
            menu_shot_path = os.path.join(os.path.dirname(image_path), "temp_menu_shot.jpg")
            d.screenshot(menu_shot_path)
            ocr_results = ocr_kit.read_text(menu_shot_path)
            convert_btn_center = _locate_convert_button(ocr_results)
            if convert_btn_center:
                logger.info(f"OCR 找到 '转文字' 按钮: {convert_btn_center}")
                d.click(convert_btn_center[0], convert_btn_center[1])
                # Dynamic wait: a 60s clip takes roughly 10s to convert
                # (about 1/6), so /5 leaves a small safety margin.
                duration, wait_seconds = _voice_wait_seconds(voice.get('content', '0'))
                logger.info(f"语音时长 {duration}s预计等待转换 {wait_seconds:.1f}s...")
                time.sleep(wait_seconds)
                # Give the UI a little extra time to refresh after conversion
                time.sleep(1.0)
                return "VOICE_CONVERTING", input_field_coordinates
            logger.warning("OCR 未找到 '转文字' 菜单项")
            # Tap beside the bubble to dismiss the menu so it does not cover the chat
            d.click(vx + 200, vy)
        return dialogue_log, input_field_coordinates
    except Exception as e:
        logger.error(f"VLM 分析失败: {e}", exc_info=True)
        return [], None
def clean_screenshots_dir():
    """Remove image files from the ``Screenshots`` folder beside this module.

    If the folder does not exist it is created and nothing else is done.
    Otherwise every entry whose name ends in .jpg, .png or .jpeg
    (case-insensitive) is deleted; a failed deletion is logged and skipped.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    target_dir = os.path.join(module_dir, "Screenshots")

    # First run: just create the folder.
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
        return

    image_suffixes = ('.jpg', '.png', '.jpeg')
    for entry in os.listdir(target_dir):
        if not entry.lower().endswith(image_suffixes):
            continue
        try:
            os.remove(os.path.join(target_dir, entry))
        except Exception as err:
            logger.warning(f"Failed to delete {entry}: {err}")
def is_in_chat_interface(d):
    """Return True if any chat-screen marker element exists on device ``d``.

    The selectors are probed in order and probing stops at the first one
    that exists. Any exception raised while probing is logged and treated
    as "not in chat".
    """
    chat_markers = (
        {"description": "切换到语音"},          # voice/keyboard toggle (voice)
        {"description": "切换到键盘"},          # voice/keyboard toggle (keyboard)
        {"className": "android.widget.EditText"},  # bottom input box
        {"text": "按住说话"},                   # hold-to-talk button
        {"description": "聊天信息"},            # top-right "more" button
    )
    try:
        # any() over a generator keeps the original short-circuit order.
        return any(d(**selector).exists for selector in chat_markers)
    except Exception as err:
        logger.warning(f"is_in_chat_interface check failed: {err}")
        return False
def find_input_box_center(image_path):
    """Estimate the input-box centre from screenshot geometry (fallback strategy).

    The point is the horizontal middle of the image at 88% of its height.
    When the file is missing, cannot be decoded, or anything else fails,
    the fixed point (540, 2100) is returned instead.

    Returns:
        A 2-tuple ``((x, y), None)``; the second element is always ``None``.
    """
    default_point = (540, 2100)
    try:
        # A missing file and an undecodable file are handled the same way.
        screenshot = cv2.imread(image_path) if os.path.exists(image_path) else None
        if screenshot is None:
            return default_point, None

        img_h, img_w = screenshot.shape[:2]
        # Strategy: take the centre of the screen at 88% of its height.
        point = (int(img_w * 0.5), int(img_h * 0.88))
        logger.info(f"find_input_box_center fallback: ({point[0]}, {point[1]})")
        return point, None
    except Exception as err:
        logger.error(f"find_input_box_center error: {err}")
        return default_point, None
def perform_input_action(d, center_point, text, auto_send=True):
    """Type ``text`` into the chat input box and optionally send it.

    Strategy:
      1. If a native ``android.widget.EditText`` exists, click it and use
         ``set_text``.
      2. Otherwise (or if that fails) tap ``center_point`` and use
         ``send_keys``; if that raises, put the text on the clipboard and
         issue a paste key event.
      3. When ``auto_send`` is true, click the "发送" button if present,
         else press Enter.

    Args:
        d: Connected uiautomator2 device.
        center_point: ``(x, y)`` of the input box, used by the coordinate fallback.
        text: Text to enter.
        auto_send: Whether to send the message after typing.

    Returns:
        None. Any exception is logged and swallowed.
    """
    # Android KeyEvent.KEYCODE_PASTE. The string "paste" is not among the key
    # names documented for uiautomator2's press(), so the numeric keycode is
    # passed instead.
    keycode_paste = 279
    try:
        # 1. Try the native input box first
        edit_text = d(className="android.widget.EditText")
        input_success = False
        if edit_text.exists:
            logger.info("Found native EditText, using set_text")
            try:
                edit_text.click()
                time.sleep(0.5)
                edit_text.set_text(text)
                input_success = True
            except Exception as e:
                logger.warning(f"Native input failed: {e}")
        # 2. Native input unavailable/failed: tap the coordinates, then type or paste
        if not input_success:
            cx, cy = center_point
            logger.info(f"Using coordinate input: {center_point}")
            d.click(cx, cy)
            time.sleep(1.0)
            try:
                d.send_keys(text)
            except Exception:
                logger.warning("send_keys failed, trying set_clipboard")
                d.set_clipboard(text)
                d.click(cx, cy)
                time.sleep(0.5)
                # Paste the clipboard content into the focused field
                d.press(keycode_paste)
        time.sleep(1.0)
        # 3. Send
        if auto_send:
            send_button = d(text="发送")
            if send_button.exists:
                send_button.click()
                logger.info("Clicked '发送'")
            else:
                d.press("enter")
                logger.info("Pressed Enter")
    except Exception as e:
        logger.error(f"perform_input_action error: {e}")