Files
aiData/WeiXin/WxUtil.py
HuangHai 26b22c6e8f 'commit'
2026-01-26 20:05:00 +08:00

984 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import uiautomator2 as u2
import time
import asyncio
import logging
import sys
import os
import cv2
import numpy as np
import re
# Add the project root (the parent of this file's directory) to sys.path so
# that the `Util` package imported below can be resolved.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)
import json
from datetime import datetime
from Util.EasyOcrKit import EasyOcrKit
# Shared EasyOcrKit instance, created once when the module is imported.
ocr_kit = EasyOcrKit()
# Logging configuration (root logger at INFO) and this module's logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("WxUtil")
# Directory layout: Logs/ and Output/ live in the parent of this file's
# directory, Templates/ lives next to this file.
BASE_DATA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DATA_DIR, "Logs")
OUTPUT_DIR = os.path.join(BASE_DATA_DIR, "Output")
TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templates")
# Global counter used to number debug images sequentially.
_debug_counter = 0
def get_next_debug_path(desc="step"):
    """Return the path of the next sequentially numbered debug image.

    Each call bumps the module-level counter and yields
    ``<OUTPUT_DIR>/debug_<N>_<desc>.jpg``.

    :param desc: short label embedded in the file name
    :return: path string inside ``OUTPUT_DIR``
    """
    global _debug_counter
    _debug_counter = _debug_counter + 1
    return os.path.join(OUTPUT_DIR, "debug_{}_{}.jpg".format(_debug_counter, desc))
def clear_directory(dir_path, exclude_files=None):
    """Empty *dir_path*, optionally keeping some entries.

    A missing directory is created and nothing else happens. Otherwise every
    entry whose name is not listed in *exclude_files* is removed: files and
    symlinks are unlinked, sub-directories are deleted recursively. A failure
    on a single entry is logged as a warning and the sweep continues.

    :param dir_path: directory to clean
    :param exclude_files: entry names (not paths) to leave in place
    """
    import shutil

    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
        return

    keep = [] if exclude_files is None else exclude_files
    for entry in os.listdir(dir_path):
        if entry in keep:
            continue
        target = os.path.join(dir_path, entry)
        try:
            file_like = os.path.isfile(target) or os.path.islink(target)
            if file_like:
                os.unlink(target)
            elif os.path.isdir(target):
                shutil.rmtree(target)
        except Exception as e:
            logger.warning(f"Failed to delete {target}. Reason: {e}")
def setup_script_environment():
    """Prepare a clean run: reset the debug counter and empty Logs/Output.

    The log files that are currently in use are kept in the log directory;
    the output directory is emptied completely.
    """
    global _debug_counter
    logger.info("清理运行环境: Logs 和 Output 目录...")
    # Debug image numbering starts over for every run.
    _debug_counter = 0
    active_logs = ["T2_ChatMonitor.log", "WxUtil.log"]
    clear_directory(LOG_DIR, exclude_files=active_logs)
    clear_directory(OUTPUT_DIR)
def connect_device():
    """Connect to the device via uiautomator2 and log its details.

    :return: the connected device object, or ``None`` when the connection
             fails or the device reports empty info
    """
    try:
        device = u2.connect()
        # An empty info payload means the connection is not usable.
        if not device.info:
            logger.error("设备连接不可用 (d.info is empty)")
            return None

        device_serial = getattr(device, 'serial', "未知")
        logger.info(f"设备连接成功: {device_serial}")

        # Log brand / model / SDK for troubleshooting.
        device_info = device.device_info
        logger.info(f"详细设备信息: 品牌={device_info.get('brand')}, 型号={device_info.get('model')}, SDK={device_info.get('sdk')}")
        return device
    except Exception as e:
        logger.error(f"设备连接失败: {e}")
        return None
def safe_device_click(d, x, y):
    """Click at ``(x, y)``, retrying once on a fresh connection if it fails.

    :param d: device object exposing ``click(x, y)``
    :param x: X coordinate
    :param y: Y coordinate
    :return: ``True`` if either attempt succeeded, ``False`` otherwise
    """
    try:
        d.click(x, y)
    except Exception as e:
        logger.warning(f"点击操作失败 ({x}, {y}): {e},尝试重新连接并重试...")
    else:
        return True

    # Second attempt on a newly established connection.
    try:
        u2.connect().click(x, y)
    except Exception as e2:
        logger.error(f"重试点击操作依然失败: {e2}")
        return False
    return True
def draw_debug_info(image_path, messages, current_voice_center=None, suffix=""):
    """Annotate a screenshot with the known state of each voice message.

    The image at *image_path* is overwritten in place. Every voice message
    gets a box (red when unread, green otherwise) and a YES/NO label telling
    whether it has been converted to text. The voice currently being
    processed is additionally circled in yellow. Any failure is logged as a
    warning and swallowed.

    :param image_path: screenshot to annotate
    :param messages: list of message dicts
    :param current_voice_center: ``(vx, vy)`` of the voice being processed
    :param suffix: accepted for compatibility; not used by this function
    """
    try:
        canvas = cv2.imread(image_path)
        if canvas is None:
            return

        for entry in messages:
            if entry['type'] != 'voice':
                continue

            vx, vy = entry['center']
            unread = entry.get('is_unread', False)
            converted = entry.get('is_converted', False)

            # Box colour: red = unread, green = read.
            box_color = (0, 0, 255) if unread else (0, 255, 0)
            cv2.rectangle(canvas, (vx - 30, vy - 30), (vx + 30, vy + 30), box_color, 2)

            # Conversion state label.
            tag = "YES" if converted else "NO"
            cv2.putText(canvas, tag, (vx + 40, vy + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, box_color, 2)

            # Highlight the voice that is being processed right now.
            if current_voice_center:
                dx = abs(vx - current_voice_center[0])
                if dx < 10 and abs(vy - current_voice_center[1]) < 10:
                    cv2.circle(canvas, (vx, vy), 40, (0, 255, 255), 3)
                    cv2.putText(canvas, "PROCESSING", (vx - 60, vy - 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

        # Write the annotated image back over the original.
        cv2.imwrite(image_path, canvas)
        logger.info(f"已更新调试标记到截图: {image_path}")
    except Exception as e:
        logger.warning(f"绘制调试信息失败: {e}")
def _scan_chat_messages(image_path):
    """Scan a WeChat chat screenshot for voice and text messages.

    Voice icons and unread red dots are located with template matching, all
    text is read with OCR, and both are merged into one list sorted from top
    to bottom.

    :param image_path: path of the screenshot to analyse
    :return: a 3-tuple ``(messages, debug_img, chat_title)``.
             ``messages`` is a list of dicts with the keys ``type``
             (``"voice"`` or ``"text"``), ``sender``, ``center``, ``y`` and
             ``content``; voice entries also carry ``is_unread`` and
             ``is_converted``. ``debug_img`` is an annotated copy of the
             screenshot and ``chat_title`` is the title read from the top bar
             (``"对方"`` when none is found). If the image cannot be read the
             result is ``([], None, "对方")`` so that callers can always
             unpack three values.
    """
    img = cv2.imread(image_path)
    if img is None:
        logger.error(f"无法读取图片: {image_path}")
        # Keep the arity identical to the success path: callers unpack three
        # values and would otherwise fail with ValueError.
        return [], None, "对方"
    h, w = img.shape[:2]

    # 3. Template matching for voice icons and unread red dots
    audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg")
    red_point_template = os.path.join(TEMPLATE_DIR, "red_point.jpg")
    audio_matches = find_all_template_matches(image_path, audio_template, threshold=0.8)
    red_points = find_all_template_matches(image_path, red_point_template, threshold=0.8)

    # 4. OCR over the whole screenshot
    logger.info("正在执行 OCR 识别...")
    ocr_results = ocr_kit.read_text(image_path)

    # 4.5 Try to extract the chat title (the peer's nickname)
    chat_title = "对方"
    potential_titles = []
    for bbox, text, conf in ocr_results:
        c_y = int((bbox[0][1] + bbox[2][1]) / 2)
        c_x = int((bbox[0][0] + bbox[2][0]) / 2)
        # The title normally sits near the top (below the status bar, above
        # the message list).
        if 60 < c_y < 140:
            clean = text.strip()
            # Skip clock, signal, back button and similar chrome.
            if re.match(r'^\d{1,2}:\d{2}$', clean): continue
            if "微信" in clean or "WeChat" in clean: continue
            if clean in ["<", "返回", "消息", "(", ")"]: continue
            if re.match(r'^\d+$', clean): continue  # pure digits (e.g. unread count)
            if len(clean) > 0:
                potential_titles.append((c_x, clean))
    if potential_titles:
        # Prefer the text closest to the horizontal centre.
        potential_titles.sort(key=lambda x: abs(x[0] - w/2))
        chat_title = potential_titles[0][1]
        # Drop a trailing "(N)" such as a group member count.
        chat_title = re.sub(r'\(\d+\)$', '', chat_title).strip()
        logger.info(f"识别到聊天标题/对方昵称: {chat_title}")

    # WeChat context-menu keywords (excluded as interference)
    MENU_KEYWORDS = ["听筒播放", "收藏", "背景播放", "删除", "多选", "取消转文字", "转文字", "引用", "提醒"]
    # System message fragments that are ignored
    IGNORE_CONTENT = ["撤回了一条消息", "打招呼的消息", "拍了拍", "你撤回了一条消息", "引用", "Clear Text", "Switch IME", "Done"]

    # 5. Merge everything into one message list
    messages = []
    debug_img = img.copy()  # annotated copy for debugging
    # Visualise the top/bottom filter boundaries.
    cv2.line(debug_img, (0, 150), (w, 150), (255, 0, 255), 2)  # top line
    cv2.line(debug_img, (0, h - 100), (w, h - 100), (255, 0, 255), 2)  # bottom line
    cv2.putText(debug_img, "TOP_FILTER", (10, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
    cv2.putText(debug_img, "BOTTOM_FILTER", (10, h - 110), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)

    # OCR entries already attributed to a voice message; they are skipped
    # when plain text messages are collected later.
    claimed_ocr_indices = set()

    # A. Voice messages
    for ax, ay in audio_matches:
        # Mark every detected voice icon (debugging aid).
        cv2.circle(debug_img, (ax, ay), 10, (255, 255, 0), -1)
        # Drop icons outside the chat area (top bar / bottom input area).
        if ay < 150 or ay > h - 100:
            logger.info(f"忽略区域外语音图标: ({ax}, {ay})")
            cv2.rectangle(debug_img, (ax-35, ay-35), (ax+35, ay+35), (128, 128, 128), 1)
            cv2.putText(debug_img, "FILTERED", (ax - 40, ay - 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1)
            continue

        # Icons on the left half belong to the peer; icons on the right half
        # get an empty sender (presumably the local user's own messages).
        sender = "对方" if ax < w / 2 else ""

        is_unread = False
        for rx, ry in red_points:
            # The red dot sits to the right of the icon at a similar Y.
            if abs(ry - ay) < 50 and rx > ax:
                is_unread = True
                break

        # Decide whether this voice has already been converted to text.
        is_converted = False
        converted_trigger_text = ""
        associated_texts = []  # associated text lines as (y, x, text)
        for i, (bbox, text, conf) in enumerate(ocr_results):
            if i in claimed_ocr_indices: continue
            c_x = int((bbox[0][0] + bbox[2][0]) / 2)
            c_y = int((bbox[0][1] + bbox[2][1]) / 2)
            # Text counts as related when it is below the icon within a
            # generous window: X range 900 to cover the duration/text at the
            # right end of long voice bubbles, Y range 800 to cover
            # multi-line transcriptions.
            if -50 < c_y - ay < 800 and abs(c_x - ax) < 900:
                # Another voice icon between this one and the text blocks
                # the association.
                has_intermediate_audio = False
                for other_ax, other_ay in audio_matches:
                    if ay + 20 < other_ay < c_y - 10:
                        has_intermediate_audio = True
                        logger.info(f"语音({ax},{ay}) 被中间语音图标({other_ax},{other_ay}) 阻断,无法关联文本 '{text[:10]}...'")
                        break
                if has_intermediate_audio:
                    continue

                clean_text = text.strip()
                # Timestamp?
                is_timestamp = re.search(r'(\d{1,2}:\d{2})', clean_text) and (len(clean_text) < 15)
                # Pure number / duration?
                is_duration = re.search(r'\d{1,2}"?$', clean_text) and len(clean_text) < 6
                # System message?
                is_ignored = any(k in clean_text for k in IGNORE_CONTENT)
                # Noise (e.g. a mis-read duration).
                # NOTE(review): the string literal below is empty as seen in
                # this copy of the file, so `"" in clean_text` is always true
                # and any short text containing a digit counts as noise.
                # Possibly a marker character was lost - confirm.
                is_noise = "" in clean_text and len(clean_text) < 8 and re.search(r'\d', clean_text)

                if not is_duration and not is_timestamp and clean_text not in MENU_KEYWORDS and not is_ignored and not is_noise:
                    is_converted = True
                    associated_texts.append((c_y, c_x, clean_text))
                    claimed_ocr_indices.add(i)
                    # No break: keep collecting further text lines.
                else:
                    # Not content, but still an attachment of this voice
                    # message, so mark it as handled.
                    claimed_ocr_indices.add(i)
                    if is_timestamp:
                        logger.info(f"语音({ax},{ay}) 忽略下方时间戳文本: '{clean_text}'")
                    elif is_duration:
                        logger.info(f"语音({ax},{ay}) 忽略时长文本: '{clean_text}'")
                    elif is_noise:
                        logger.info(f"语音({ax},{ay}) 忽略噪音文本: '{clean_text}'")
                    elif is_ignored:
                        logger.info(f"语音({ax},{ay}) 忽略系统消息文本: '{clean_text}'")
                    else:
                        logger.info(f"语音({ax},{ay}) 忽略其他文本(可能是菜单): '{clean_text}'")

        # Merge all associated text lines.
        if associated_texts:
            # Sort by Y first, then by X.
            associated_texts.sort(key=lambda x: (x[0], x[1]))
            converted_trigger_text = "".join([t[2] for t in associated_texts])
            logger.info(f"语音({ax},{ay}) 判定为已转换,最终合并文本: '{converted_trigger_text}'")
        if is_converted:
            logger.info(f"语音消息 ({ax}, {ay}) 已有转换文字: '{converted_trigger_text}',跳过")

        # Visual feedback on the debug image.
        color = (0, 0, 255) if is_unread else (0, 255, 0)
        cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
        label = "YES" if is_converted else "NO"
        cv2.putText(debug_img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

        messages.append({
            "type": "voice",
            "sender": sender,
            "center": (ax, ay),
            "y": ay,
            "is_unread": is_unread,
            "is_converted": is_converted,
            "content": converted_trigger_text if is_converted else None
        })

    # B. Text messages (every OCR entry not claimed by a voice message)
    time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)'
    for i, (bbox, text, conf) in enumerate(ocr_results):
        if i in claimed_ocr_indices: continue
        c_x = int((bbox[0][0] + bbox[2][0]) / 2)
        c_y = int((bbox[0][1] + bbox[2][1]) / 2)
        if c_y < 150 or c_y > h - 100:
            continue
        if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)):
            logger.info(f"忽略时间戳/日期文本: '{text}'")
            continue
        clean_text = text.strip()
        if re.match(r'^.?[0-9]{1,2}"?$', clean_text):
            logger.info(f"忽略疑似时长文本: '{clean_text}'")
            continue
        # Noise check - NOTE(review): same apparently empty literal as in the
        # voice branch above; confirm the intended marker character.
        if "" in clean_text and len(clean_text) < 8 and re.search(r'\d', clean_text):
            logger.info(f"忽略噪音文本: '{clean_text}'")
            continue
        if clean_text in MENU_KEYWORDS:
            logger.info(f"忽略菜单关键词: '{clean_text}'")
            continue
        if any(k in clean_text for k in IGNORE_CONTENT):
            logger.info(f"忽略系统消息内容: '{clean_text}'")
            continue

        # The sender is decided by the left edge of the text box.
        left_x = bbox[0][0]
        sender = "对方" if left_x < w * 0.5 else ""
        messages.append({
            "type": "text",
            "sender": sender,
            "content": text.strip(),
            "center": (c_x, c_y),
            "y": c_y
        })

    # 6. Sort from top to bottom
    messages.sort(key=lambda x: x['y'])
    return messages, debug_img, chat_title
async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", process_strategy="ALL"):
    """
    Analyse a WeChat chat screenshot with CV + OCR and build the dialogue log.

    The function loops: it scans the current screenshot, picks one voice
    message that still needs conversion (according to *process_strategy*),
    long-presses it on the device to convert it to text, takes a "peek"
    screenshot whose OCR is scheduled as an asyncio task, restores the
    original state (hides the text again), takes a fresh screenshot and
    starts over. When nothing is left to process (or MAX_LOOPS is reached)
    the peeked contents are injected into the final message list.

    :param image_path: path of the initial screenshot
    :param output_path: where the annotated debug image of the first scan is written
    :param device: connected device object; when falsy, connect_device() is used
    :param target_name: not referenced in this function body
    :param process_strategy: voice handling strategy (ALL/UNREAD/LAST)
    :return: ``(dialogue_log, input_field_coordinates)``; ``([], None)`` when
             no device is available or any exception occurs

    NOTE(review): the waits below use the blocking ``time.sleep`` inside a
    coroutine, and no ``await`` happens inside the loop, so the OCR tasks
    created with ``asyncio.create_task`` cannot start running before the
    ``asyncio.gather`` after the loop.
    """
    try:
        d = device if device else connect_device()
        if not d:
            return [], None
        current_image_path = image_path
        current_output_path = output_path
        final_messages = []
        loop_count = 0
        MAX_LOOPS = 10  # upper bound on iterations, raised to suit the ALL strategy
        # Statistics counters
        total_voices_count = 0
        convert_opened_count = 0
        convert_closed_count = 0
        # Y coordinates of the voices already handled in this session
        processed_y_coords = set()
        # Voice contents captured during Peek-and-Restore: {y_coord: content}
        captured_voice_contents = {}
        # Async task list, stored as an attribute on the function object.
        # NOTE(review): this is shared state across calls; overlapping calls
        # would reset each other's list.
        analyze_chat_image._ocr_tasks = []
        while loop_count < MAX_LOOPS:
            loop_count += 1
            logger.info(f"--- 分析循环 第 {loop_count} 次 ---")
            # 1. Scan the current screen
            messages, debug_img, chat_title = _scan_chat_messages(current_image_path)
            # NOTE(review): _scan_chat_messages returns a list (possibly
            # empty) for `messages`, so this branch looks unreachable.
            if messages is None:  # read failure
                return [], None
            # Replace the placeholder sender "对方" with the detected title
            if chat_title and chat_title != "对方":
                for m in messages:
                    if m['sender'] == "对方":
                        m['sender'] = chat_title
            # Save the debug image of the current state
            if current_output_path:
                cv2.imwrite(current_output_path, debug_img)
                logger.info(f"调试图已保存: {current_output_path}")
            # 2. Select the voices that need processing
            all_voices = [m for m in messages if m['type'] == 'voice']
            all_voices.sort(key=lambda x: x['y'])  # top to bottom
            # Update statistics (number of voices in the current scan)
            total_voices_count = len(all_voices)
            # Helper: has a voice at this Y already been handled?
            def is_processed(y_coord):
                for py in processed_y_coords:
                    if abs(y_coord - py) < 20:  # 20px tolerance
                        return True
                return False
            target_voices = []
            if process_strategy == "ALL":
                # ALL: every voice that is neither converted nor handled yet
                target_voices = [m for m in all_voices if not m.get('is_converted') and not is_processed(m['y'])]
                logger.info(f"策略(ALL): 发现 {len(target_voices)} 条未转换待处理语音")
            elif process_strategy == "UNREAD":
                # UNREAD: only unread voices that are neither converted nor handled
                target_voices = [m for m in all_voices if m.get('is_unread') and not m.get('is_converted') and not is_processed(m['y'])]
                logger.info(f"策略(UNREAD): 发现 {len(target_voices)} 条未读待处理语音")
            elif process_strategy == "LAST":
                # LAST: only the last unconverted voice
                unconverted = [m for m in all_voices if not m.get('is_converted')]
                if unconverted:
                    last_voice = unconverted[-1]
                    if not is_processed(last_voice['y']):
                        target_voices = [last_voice]
                logger.info(f"策略(LAST): 仅关注最后一条未转换语音")
            # Nothing left to process (or the strategy is satisfied): stop
            if not target_voices:
                logger.info("当前屏幕无待处理语音,分析结束")
                final_messages = messages
                break
            # 3. Handle the first target voice only: once it is expanded the
            # layout shifts and the remaining coordinates become stale.
            target = target_voices[0]
            vx, vy = int(target['center'][0]), int(target['center'][1])
            # Mark as handled
            processed_y_coords.add(target['y'])
            logger.info(f"准备处理语音 ({vx}, {vy})...")
            # Highlight the voice being processed in the saved debug image
            draw_debug_info(current_output_path, messages, current_voice_center=(vx, vy))
            # Action: long press -> "convert to text"
            logger.info(f"正在长按语音消息 ({vx}, {vy})...")
            d.long_click(vx, vy, 1.0)  # shortened press duration
            # Poll for the "convert to text" button
            logger.info("正在快速寻找'转文字'按钮...")
            zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg")
            btn_pos = None
            poll_start = time.time()
            while time.time() - poll_start < 3.0:  # wait at most 3 seconds
                menu_shot = get_next_debug_path("step_long_press_poll")
                d.screenshot(menu_shot)
                btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7)
                if btn_pos:
                    break
                time.sleep(0.2)  # fast polling
            if btn_pos:
                btn_x, btn_y = int(btn_pos[0]), int(btn_pos[1])
                logger.info(f"✅ 找到'转文字'按钮: ({btn_x}, {btn_y}),点击中...")
                safe_device_click(d, btn_x, btn_y)
                convert_opened_count += 1
                logger.info("等待语音转文字完成...")
                time.sleep(3.0)  # shortened wait (was 5.0s)
                # --- Peek-and-Restore (async variant) ---
                # 1. Take the screenshot; OCR is handed to an async task
                # instead of being run right away.
                peek_shot = get_next_debug_path("step_peek_content")
                d.screenshot(peek_shot)
                logger.info("已截图启动异步OCR任务以提取内容...")
                async def _async_ocr_task(img_path, target_y):
                    """Internal async task: run the OCR scan in the thread pool."""
                    loop = asyncio.get_running_loop()
                    # Run the expensive _scan_chat_messages in the default executor
                    msgs, _, _ = await loop.run_in_executor(None, _scan_chat_messages, img_path)
                    found = None
                    for pm in msgs:
                        if pm['type'] == 'voice' and pm.get('is_converted'):
                            # Match the converted voice by Y within 50px
                            if abs(pm['y'] - target_y) < 50:
                                found = pm.get('content')
                                break
                    return target_y, found
                # Create and remember the task
                task = asyncio.create_task(_async_ocr_task(peek_shot, vy))
                # The task list lives on the function object
                if not hasattr(analyze_chat_image, "_ocr_tasks"):
                    analyze_chat_image._ocr_tasks = []
                analyze_chat_image._ocr_tasks.append(task)
                # 2. Restore the state immediately (hide the converted text).
                # OCR has not produced a result yet, so the expanded text
                # cannot be located precisely; long-pressing the original
                # bubble position (vx, vy) usually opens the menu as well.
                logger.info("准备还原状态 (取消转文字)...")
                d.long_click(vx, vy, 1.0)  # blind press on the original coordinates
                logger.info("正在快速寻找'隐藏文字'按钮...")
                cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg")
                cancel_btn = None
                poll_start = time.time()
                while time.time() - poll_start < 3.0:
                    restore_menu_shot = get_next_debug_path("step_restore_poll")
                    d.screenshot(restore_menu_shot)
                    cancel_btn = find_template_match(restore_menu_shot, cancel_template, threshold=0.7)
                    if cancel_btn:
                        break
                    time.sleep(0.2)
                if cancel_btn:
                    cx, cy = int(cancel_btn[0]), int(cancel_btn[1])
                    logger.info(f"✅ 找到'隐藏文字'按钮: ({cx}, {cy}),点击还原...")
                    safe_device_click(d, cx, cy)
                    convert_closed_count += 1
                    time.sleep(2.0)  # wait for the collapse animation
                else:
                    logger.warning("❌ 未找到'隐藏文字'按钮,无法还原状态!(后续可能导致重复处理)")
                # 3. Prepare the next iteration: take a fresh screenshot,
                # the layout may have shifted slightly or just been restored.
                next_screenshot = get_next_debug_path("step_restored")
                d.screenshot(next_screenshot)
                current_image_path = next_screenshot
                current_output_path = get_next_debug_path("flag_restored")
                continue
            else:
                logger.warning("❌ 未找到'转文字'按钮,可能是已转换或误判")
                # Even on failure the voice is already recorded in
                # processed_y_coords, which prevents an endless loop.
                # Move on to the next voice.
                logger.info("跳过当前语音,继续扫描...")
                continue
        # After the loop: wait for all async OCR tasks to finish
        if hasattr(analyze_chat_image, "_ocr_tasks") and analyze_chat_image._ocr_tasks:
            logger.info(f"等待 {len(analyze_chat_image._ocr_tasks)} 个异步 OCR 任务完成...")
            results = await asyncio.gather(*analyze_chat_image._ocr_tasks)
            for y, content in results:
                if content:
                    captured_voice_contents[y] = content
                    logger.info(f"✅ [Async OCR] 异步获取到语音内容 (y={y}): {content}")
            # Clear the task list
            analyze_chat_image._ocr_tasks = []
        # Use the result of the last scan
        if not final_messages:  # loop ended via MAX_LOOPS: make sure there is a result
            final_messages = messages
        # Inject the peeked contents
        if captured_voice_contents:
            logger.info(f"正在注入 {len(captured_voice_contents)} 条已还原的语音内容...")
            for m in final_messages:
                if m['type'] == 'voice' and not m.get('content'):
                    for py, content in captured_voice_contents.items():
                        if abs(m['y'] - py) < 30:  # match on the original Y coordinate
                            m['content'] = content
                            m['is_converted'] = True  # logically converted
                            logger.info(f" -> 注入内容: {content[:10]}...")
                            break
        # Build the return value
        dialogue_log = []
        # Input field position: horizontal centre, 90% of the height of
        # debug_img; if debug_img is unavailable, fall back to reading the
        # current screenshot and finally to (540, 1728).
        if 'debug_img' in locals() and debug_img is not None:
            input_field_coordinates = (debug_img.shape[1] // 2, int(debug_img.shape[0] * 0.9))
        else:
            # Try to read current_image_path
            try:
                tmp_img = cv2.imread(current_image_path)
                input_field_coordinates = (tmp_img.shape[1] // 2, int(tmp_img.shape[0] * 0.9))
            except:
                input_field_coordinates = (540, 1728)
        # Find the last message.
        # NOTE(review): last_msg is assigned but not read afterwards in this function.
        last_msg = None
        if final_messages:
            final_messages.sort(key=lambda x: x['y'])
            last_msg = final_messages[-1]
        # No conversion into a dedicated structure happens here: the message
        # dicts themselves are returned as dialogue_log (together with the
        # input position) and the caller handles the business logic.
        pass
        # Build dialogue_log in one pass
        for msg in final_messages:
            # Try to inject asynchronously captured voice content
            if msg['type'] == 'voice':
                # Fuzzy match on the Y coordinate (20px). Unlike the earlier
                # injection pass this overwrites content that is already set.
                for y_key, content in captured_voice_contents.items():
                    if abs(msg['y'] - y_key) < 20:
                        msg['is_converted'] = True
                        msg['content'] = content
                        logger.info(f"注入语音内容到最终消息列表: {content}")
                        break
            # Keep only text messages with content and converted voice
            # messages with content
            if msg['type'] == 'text' and msg.get('content'):
                dialogue_log.append(msg)
            elif msg['type'] == 'voice' and msg.get('is_converted') and msg.get('content'):
                dialogue_log.append(msg)
        logger.info(f"📊 [统计] 语音总数: {total_voices_count}, 打开转文字次数: {convert_opened_count}, 关闭转文字次数: {convert_closed_count}")
        return dialogue_log, input_field_coordinates
    except Exception as e:
        logger.error(f"分析过程发生异常: {e}", exc_info=True)
        return [], None
def clean_screenshots_dir():
    """Remove image files (.jpg/.png/.jpeg) from the output directory.

    A missing output directory is created and nothing else happens.
    Deletion failures are logged as warnings and do not stop the sweep.
    """
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
        return

    image_suffixes = ('.jpg', '.png', '.jpeg')
    screenshots = [name for name in os.listdir(OUTPUT_DIR) if name.lower().endswith(image_suffixes)]
    for f in screenshots:
        try:
            os.remove(os.path.join(OUTPUT_DIR, f))
        except Exception as e:
            logger.warning(f"Failed to delete {f}: {e}")
def is_in_chat_interface(d):
    """Tell whether the device currently shows a WeChat chat screen.

    The selectors below are probed in order and the first existing one
    settles the answer. Any exception during probing is logged as a warning
    and the function returns ``False``.

    :param d: device object, callable with selector keyword arguments
    :return: ``True`` if any probe element exists, otherwise ``False``
    """
    probes = (
        {"description": "切换到语音"},           # bottom voice/keyboard toggle
        {"description": "切换到键盘"},
        {"className": "android.widget.EditText"},  # bottom input box
        {"text": "按住说话"},                    # "hold to talk" button
        {"description": "聊天信息"},             # "more" button at the top right
    )
    try:
        for selector in probes:
            if d(**selector).exists:
                return True
    except Exception as e:
        logger.warning(f"is_in_chat_interface check failed: {e}")
    return False
def find_input_box_center(image_path):
    """Estimate the centre of the chat input box (fallback strategy).

    Purely geometric: the point at 50% of the width and 88% of the height of
    the screenshot. If the file is missing, unreadable, or anything goes
    wrong, the fixed point ``(540, 2100)`` is used.

    :param image_path: path of a screenshot
    :return: ``((x, y), None)``
    """
    default_result = ((540, 2100), None)
    try:
        if not os.path.exists(image_path):
            return default_result
        screen = cv2.imread(image_path)
        if screen is None:
            return default_result

        height, width = screen.shape[:2]
        # Horizontal centre, 88% down the screen.
        center_x, center_y = int(width * 0.5), int(height * 0.88)
        logger.info(f"find_input_box_center fallback: ({center_x}, {center_y})")
        return (center_x, center_y), None
    except Exception as e:
        logger.error(f"find_input_box_center error: {e}")
        return default_result
def find_template_match(screen_path, template_path, threshold=0.8):
    """Locate the best match of a template in a screenshot.

    Uses OpenCV ``matchTemplate`` with ``TM_CCOEFF_NORMED`` on the images as
    loaded.

    :param screen_path: path of the screenshot
    :param template_path: path of the template image
    :param threshold: minimum score for a match to be accepted
    :return: ``(center_x, center_y)`` of the best match, or ``None`` when the
             template file is missing, an image cannot be read, the score is
             below the threshold, or an error occurs
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return None

        screen = cv2.imread(screen_path)
        needle = cv2.imread(template_path)
        if screen is None or needle is None:
            return None

        needle_h, needle_w = needle.shape[:2]
        scores = cv2.matchTemplate(screen, needle, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, max_loc = cv2.minMaxLoc(scores)

        if not max_val >= threshold:
            logger.info(f"Template not matched. Max score: {max_val:.2f}")
            return None

        # max_loc is the top-left corner; shift by half the template size.
        center_x = max_loc[0] + needle_w // 2
        center_y = max_loc[1] + needle_h // 2
        logger.info(f"Template matched! Score: {max_val:.2f}, Center: ({center_x}, {center_y})")
        return (center_x, center_y)
    except Exception as e:
        logger.error(f"Template matching failed: {e}")
        return None
def find_all_template_matches(screen_path, template_path, threshold=0.8):
    """Locate every match of a template in a screenshot.

    All positions whose ``TM_CCOEFF_NORMED`` score reaches *threshold* are
    collected, then hits that lie within 10px (on both axes) of an already
    kept hit are dropped, because one target usually yields a cluster of
    adjacent matches.

    :param screen_path: path of the screenshot
    :param template_path: path of the template image
    :param threshold: minimum score for a position to count as a match
    :return: list of ``(center_x, center_y)``; empty when nothing matches,
             a file cannot be read, or an error occurs
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return []

        screen = cv2.imread(screen_path)
        needle = cv2.imread(template_path)
        if screen is None or needle is None:
            return []

        needle_h, needle_w = needle.shape[:2]
        scores = cv2.matchTemplate(screen, needle, cv2.TM_CCOEFF_NORMED)

        # Log the best score to help tuning the threshold.
        _, max_val, _, _ = cv2.minMaxLoc(scores)
        logger.info(f"模板匹配 {os.path.basename(template_path)}: 最大相似度 = {max_val:.4f} (阈值={threshold})")

        # np.where yields (rows, cols); convert each hit to its centre point.
        rows, cols = np.where(scores >= threshold)
        centers = [(col + needle_w // 2, row + needle_h // 2) for row, col in zip(rows, cols)]

        # Simplified non-maximum suppression: keep the first hit of every
        # cluster and drop the neighbours.
        unique_points = []
        for px, py in centers:
            duplicate = any(abs(px - kx) < 10 and abs(py - ky) < 10 for kx, ky in unique_points)
            if not duplicate:
                unique_points.append((px, py))

        if unique_points:
            logger.info(f"Found {len(unique_points)} matches for {os.path.basename(template_path)}")
        return unique_points
    except Exception as e:
        logger.error(f"find_all_template_matches failed: {e}")
        return []
def _remove_temp_file(path):
    """Best-effort removal of a temporary screenshot; failures are only logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # Nothing to clean up (e.g. the screenshot was never written).
        pass
    except OSError as e:
        logger.debug(f"Failed to remove temp file {path}: {e}")


def perform_input_action(d, center_point, text, auto_send=True):
    """Type *text* into the chat input box and optionally send it.

    Steps:
      1. Make sure the chat is in text-input mode (switch away from voice
         mode or tap the text input marker, both found by template matching).
      2. Enter the text through the native EditText if available, otherwise
         click *center_point* and use ``send_keys`` / clipboard paste.
      3. If *auto_send* is set, press the send button (template match first,
         then the native "发送" control, finally the Enter key).

    Temporary screenshots are removed even when a device call fails.

    :param d: connected device object
    :param center_point: ``(x, y)`` fallback coordinates of the input box
    :param text: text to enter
    :param auto_send: whether to send the message after typing
    :return: ``True`` on success, ``False`` if any step raised an exception
    """
    try:
        # --- Make sure we are in text-input mode ---
        logger.info("正在检查输入模式...")
        tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg")
        try:
            d.screenshot(tmp_check_shot)
            wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")
            input_text_template = os.path.join(TEMPLATE_DIR, "input_text.jpg")
            # 1. A "switch to text" icon means the chat is in voice mode.
            # (wen_zi_input.jpg is assumed to be the keyboard icon.)
            wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8)
            if wen_zi_pos:
                logger.info(f"检测到语音模式 (找到切换文字图标: {wen_zi_pos}),点击切换...")
                d.click(wen_zi_pos[0], wen_zi_pos[1])
                time.sleep(1.0)  # wait for the UI to switch
            else:
                # 2. No switch icon: assume text mode and try to tap the
                # text input marker.
                logger.info("未检测到语音模式切换图标,尝试寻找文字输入区域...")
                input_text_pos = find_template_match(tmp_check_shot, input_text_template, threshold=0.8)
                if input_text_pos:
                    logger.info(f"找到文字输入区域标识 (input_text.jpg): {input_text_pos},点击激活...")
                    d.click(input_text_pos[0], input_text_pos[1])
                    time.sleep(0.5)
                else:
                    logger.info("未找到特定的输入区域标识,将使用默认坐标或控件查找。")
        finally:
            # Clean up the temporary screenshot on every path.
            _remove_temp_file(tmp_check_shot)

        # 1. Try the native input box first
        edit_text = d(className="android.widget.EditText")
        input_success = False
        if edit_text.exists:
            logger.info("Found native EditText, using set_text")
            try:
                edit_text.click()
                time.sleep(0.5)
                edit_text.set_text(text)
                input_success = True
            except Exception as e:
                logger.warning(f"Native input failed: {e}")

        # 2. Native input failed: click the coordinates and type / paste
        if not input_success:
            cx, cy = center_point
            logger.info(f"Using coordinate input: {center_point}")
            d.click(cx, cy)
            time.sleep(1.0)
            try:
                d.send_keys(text)
            except Exception:
                logger.warning("send_keys failed, trying set_clipboard")
                d.set_clipboard(text)
                d.click(cx, cy)
                time.sleep(0.5)
                # Try to paste.
                # NOTE(review): confirm that "paste" is a key name accepted
                # by the device's press(); otherwise the clipboard content is
                # never inserted.
                d.press("paste")

        time.sleep(1.0)

        # 3. Send
        if auto_send:
            # Prefer template matching to find the send button.
            logger.info("尝试使用模板匹配寻找'发送'按钮...")
            tmp_screen = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_send_check.jpg")
            try:
                d.screenshot(tmp_screen)
                template_path = os.path.join(TEMPLATE_DIR, "send.jpg")
                # Slightly lower threshold to improve recall.
                send_btn_pos = find_template_match(tmp_screen, template_path, threshold=0.7)
                if send_btn_pos:
                    logger.info(f"通过模板匹配找到发送按钮: {send_btn_pos}, 点击...")
                    d.click(send_btn_pos[0], send_btn_pos[1])
                else:
                    logger.warning("模板匹配未找到发送按钮,尝试原生控件查找...")
                    if d(text="发送").exists:
                        d(text="发送").click()
                        logger.info("Clicked '发送'")
                    else:
                        d.press("enter")
                        logger.info("Pressed Enter")
            finally:
                # Clean up the temporary screenshot on every path.
                _remove_temp_file(tmp_screen)
        return True
    except Exception as e:
        logger.error(f"perform_input_action error: {e}")
        return False
def match_template_center(image_path, template_path, threshold=0.8):
    """Locate the best match of a template in an image, using grayscale.

    Both images are converted to grayscale before ``matchTemplate`` with
    ``TM_CCOEFF_NORMED`` is applied.

    :param image_path: path of the image to search in
    :param template_path: path of the template image
    :param threshold: minimum confidence for a match to be accepted
    :return: ``(center_x, center_y)`` of the best match, or ``None`` when a
             file is missing or unreadable, the confidence is below the
             threshold, or an error occurs
    """
    try:
        if not (os.path.exists(image_path) and os.path.exists(template_path)):
            logger.error(f"Image or template not found: {image_path}, {template_path}")
            return None

        img = cv2.imread(image_path)
        template = cv2.imread(template_path)
        if img is None or template is None:
            logger.error("Failed to read image or template")
            return None

        # Match on grayscale versions of both images.
        gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        gray_tpl = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
        scores = cv2.matchTemplate(gray_img, gray_tpl, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, top_left = cv2.minMaxLoc(scores)

        if not max_val >= threshold:
            logger.warning(f"Template match failed. Max confidence: {max_val:.2f} < Threshold: {threshold}")
            return None

        # top_left is the corner of the best match; shift to its centre.
        tpl_h, tpl_w = gray_tpl.shape
        center_x = int(top_left[0] + tpl_w / 2)
        center_y = int(top_left[1] + tpl_h / 2)
        logger.info(f"Template matched with confidence {max_val:.2f} at ({center_x}, {center_y})")
        return (center_x, center_y)
    except Exception as e:
        logger.error(f"match_template_center error: {e}")
        return None
async def get_first_screen(device=None):
    """
    Capture and fully analyse the first screen after entering a chat.

    1. Take a screenshot.
    2. Run the full recognition (strategy=ALL), including the voice-to-text
       Peek-and-Restore pass.
    3. Return the recognition result together with the image paths.

    Returns:
        tuple: (dialogue_log, input_pos, enter_path, flag_path);
        ``([], None, None, None)`` when no device can be connected.
    """
    logger.info("🔍 [get_first_screen] 正在进行首屏全量识别...")

    device = device or connect_device()
    if not device:
        logger.error("设备连接失败,无法获取首屏")
        return [], None, None, None

    # 1. Screenshot of the screen as entered
    enter_path = get_next_debug_path("enter")
    device.screenshot(enter_path)
    logger.info(f"📸 已保存进入截图: {enter_path}")

    # 2. Recognition; the annotated debug image goes to flag_path
    flag_path = get_next_debug_path("flag")
    analysis = await analyze_chat_image(enter_path, flag_path, device=device, process_strategy="ALL")
    dialogue_log, input_pos = analysis
    return dialogue_log, input_pos, enter_path, flag_path