1430 lines
66 KiB
Python
1430 lines
66 KiB
Python
# coding=utf-8
|
||
import uiautomator2 as u2
|
||
import time
|
||
import asyncio
|
||
import logging
|
||
import sys
|
||
import os
|
||
import cv2
|
||
import numpy as np
|
||
import re
|
||
|
||
# Add the project root (the parent of this file's directory) to sys.path so
# that the project-local `Util` package imported below can be resolved.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)
|
||
|
||
import json
|
||
from datetime import datetime, timedelta
|
||
from Util.EasyOcrKit import EasyOcrKit
|
||
|
||
# Module-wide EasyOcrKit instance, created once at import time.
ocr_kit = EasyOcrKit()

# Logging configuration: INFO level, timestamped records, logger named "WxUtil".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("WxUtil")

# Directory layout. Logs/ and Output/ live next to this file's parent
# directory; Templates/ lives next to this file.
BASE_DATA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DATA_DIR, "Logs")
OUTPUT_DIR = os.path.join(BASE_DATA_DIR, "Output")
TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templates")

# Global counter used to number debug images sequentially.
_debug_counter = 0
|
||
|
||
def parse_wechat_time(time_str):
    """Normalise a WeChat timestamp label to ``YYYY-MM-DD HH:MM``.

    Supported shapes (each may carry a 12-hour period marker such as
    "上午" / "下午" / "晚上"):

    * ``"10:03"``                  -> today
    * ``"昨天 10:03"``             -> yesterday
    * ``"星期三 10:03"`` / ``"周三"`` -> most recent such weekday
      (00:00 when no clock is present)
    * ``"2025年1月1日 10:03"``      -> explicit date (00:00 when no clock)
    * ``"1月26日 10:00"``           -> current year, or the previous year
      when that would lie in the future (00:00 when no clock)

    :param time_str: raw label text, typically produced by OCR.
    :return: the normalised string, or ``""`` when the text is not a
        recognisable timestamp (pure digit strings, invalid dates/clocks,
        unrelated text). Parsing errors are logged and never raised.
    """
    try:
        clean_str = time_str.strip()

        # Digit-only strings (e.g. phone numbers) are never timestamps:
        # real labels contain a colon or Chinese date characters.
        if re.match(r'^\d+$', clean_str):
            logger.warning(f"忽略疑似电话号码/纯数字的时间字符串: '{clean_str}'")
            return ""

        now = datetime.now()
        today = now.date()

        # Period markers are detected once and stripped, so that every date
        # form below (not only the bare "下午 5:00" form) gets the same
        # 12-hour -> 24-hour conversion.
        is_pm = "下午" in clean_str or "晚上" in clean_str
        is_before_dawn = "凌晨" in clean_str
        bare = re.sub(r'凌晨|早上|上午|中午|下午|晚上', '', clean_str).strip()

        def find_clock(text):
            """Return (hour, minute) in 24-hour form, or None if absent."""
            found = re.search(r'(\d{1,2}):(\d{2})', text)
            if not found:
                return None
            hour, minute = map(int, found.groups())
            if is_pm and hour < 12:
                hour += 12
            elif is_before_dawn and hour == 12:
                hour = 0
            return hour, minute

        def render(day, hour=0, minute=0):
            # datetime() validates the fields; an invalid value raises
            # ValueError, which the outer handler turns into "".
            moment = datetime(day.year, day.month, day.day, hour, minute)
            return moment.strftime("%Y-%m-%d %H:%M")

        strict_clock = r'^\d{1,2}:\d{2}$'

        # 1. Clock only -> today.
        if re.match(strict_clock, bare):
            return render(today, *find_clock(bare))

        # 2. "昨天 HH:mm" -> yesterday. Anything else containing "昨天"
        #    falls through to the remaining forms.
        if "昨天" in bare:
            t_part = bare.replace("昨天", "").strip()
            if re.match(strict_clock, t_part):
                return render(today - timedelta(days=1), *find_clock(t_part))

        # 3. Weekday name, with or without a clock ("周三 10:03", "周三10:03").
        weekdays_map = {
            "星期一": 0, "星期二": 1, "星期三": 2, "星期四": 3, "星期五": 4, "星期六": 5, "星期日": 6,
            "周一": 0, "周二": 1, "周三": 2, "周四": 3, "周五": 4, "周六": 5, "周日": 6,
        }
        for w_str, w_idx in weekdays_map.items():
            if w_str not in bare:
                continue
            clock = find_clock(bare.replace(w_str, "").strip())

            # Days to step back; the modulo keeps it within the past week.
            delta_days = (now.weekday() - w_idx) % 7
            # Same weekday as today but a clock later than now: the label
            # must refer to last week. Without a clock, today is assumed.
            if delta_days == 0 and clock and (now.hour, now.minute) < clock:
                delta_days = 7

            target_date = today - timedelta(days=delta_days)
            if clock:
                return render(target_date, *clock)
            return target_date.strftime("%Y-%m-%d 00:00")

        # 4. Full date "YYYY年M月D日", optional clock (date-only labels are
        #    usually day separators and map to 00:00).
        match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', bare)
        if match:
            year, month, day = map(int, match.groups())
            hour, minute = find_clock(bare) or (0, 0)
            return render(datetime(year, month, day), hour, minute)

        # 5. "M月D日" without a year: assume the current year, and fall back
        #    to the previous year when that would be in the future
        #    (e.g. it is January and the message is from December).
        match = re.search(r'(\d{1,2})月(\d{1,2})日', bare)
        if match:
            month, day = map(int, match.groups())
            hour, minute = find_clock(bare) or (0, 0)
            dt = datetime(today.year, month, day, hour, minute)
            if dt > now:
                dt = datetime(today.year - 1, month, day, hour, minute)
            return dt.strftime("%Y-%m-%d %H:%M")

        # Not a timestamp: return "" so unrelated text is never injected
        # into the time context.
        return ""
    except Exception as e:
        logger.warning(f"时间解析失败 '{time_str}': {e}")
        return ""
|
||
|
||
def get_next_debug_path(desc="step"):
    """Return the next sequentially numbered debug image path.

    Increments the module-level ``_debug_counter`` and builds
    ``debug_<N>_<desc>.jpg`` inside ``OUTPUT_DIR``.

    :param desc: short label embedded in the file name.
    :return: path of the debug image to write.
    """
    global _debug_counter
    _debug_counter = _debug_counter + 1
    return os.path.join(OUTPUT_DIR, f"debug_{_debug_counter}_{desc}.jpg")
|
||
|
||
def clear_directory(dir_path, exclude_files=None):
    """Delete everything directly inside *dir_path*, except excluded names.

    A missing directory is created and left empty. Files and symlinks are
    unlinked, sub-directories are removed recursively. A failure on one
    entry is logged and does not stop the clean-up.

    :param dir_path: directory to empty.
    :param exclude_files: entry names (not paths) to keep; ``None`` keeps
        nothing.
    """
    import shutil

    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
        return

    kept_names = [] if exclude_files is None else exclude_files

    for entry in os.listdir(dir_path):
        if entry in kept_names:
            continue

        target = os.path.join(dir_path, entry)
        try:
            if os.path.isfile(target) or os.path.islink(target):
                os.unlink(target)
            elif os.path.isdir(target):
                shutil.rmtree(target)
        except Exception as e:
            logger.warning(f"Failed to delete {target}. Reason: {e}")
|
||
|
||
def setup_script_environment():
    """Prepare a fresh run: reset the debug counter and empty Logs/Output.

    The log files "T2_ChatMonitor.log" and "WxUtil.log" are kept because
    they may be in use by the current run.
    """
    global _debug_counter

    logger.info("清理运行环境: Logs 和 Output 目录...")
    # Restart debug image numbering.
    _debug_counter = 0

    cleanup_plan = (
        (LOG_DIR, ["T2_ChatMonitor.log", "WxUtil.log"]),
        (OUTPUT_DIR, None),
    )
    for directory, keep in cleanup_plan:
        clear_directory(directory, exclude_files=keep)
|
||
|
||
def connect_device():
    """Connect to the device via uiautomator2 and log its details.

    :return: the connected device object, or ``None`` when the connection
        fails or the device reports empty ``info``.
    """
    try:
        device = u2.connect()

        # An empty info payload means the connection is not usable.
        if not device.info:
            logger.error("设备连接不可用 (d.info is empty)")
            return None

        device_serial = getattr(device, 'serial', "未知")
        logger.info(f"设备连接成功: {device_serial}")

        # Log brand / model / SDK level.
        device_info = device.device_info
        logger.info(f"详细设备信息: 品牌={device_info.get('brand')}, 型号={device_info.get('model')}, SDK={device_info.get('sdk')}")
        return device
    except Exception as e:
        logger.error(f"设备连接失败: {e}")
        return None
|
||
|
||
def safe_device_click(d, x, y):
    """Click at (x, y), retrying once on a freshly opened connection.

    :param d: device object exposing ``click(x, y)``.
    :param x: horizontal screen coordinate.
    :param y: vertical screen coordinate.
    :return: ``True`` when either attempt succeeds, ``False`` otherwise.
        The retry uses a new ``u2.connect()`` handle; the caller's *d* is
        not replaced.
    """
    try:
        d.click(x, y)
    except Exception as e:
        logger.warning(f"点击操作失败 ({x}, {y}): {e},尝试重新连接并重试...")
    else:
        return True

    # First attempt failed: reconnect and try exactly once more.
    try:
        u2.connect().click(x, y)
    except Exception as e2:
        logger.error(f"重试点击操作依然失败: {e2}")
        return False
    return True
|
||
|
||
def draw_debug_info(image_path, messages, current_voice_center=None, suffix=""):
    """Overlay the known voice-message state onto a screenshot, in place.

    Each voice message gets a box (red when unread, green otherwise) and a
    "YES"/"NO" label for its converted state. The voice currently being
    processed is additionally circled in yellow. The image at *image_path*
    is overwritten. Failures are logged, never raised.

    :param image_path: screenshot to annotate and overwrite.
    :param messages: message dicts; only ``type == 'voice'`` entries are drawn.
    :param current_voice_center: ``(x, y)`` of the voice being processed.
    :param suffix: accepted for compatibility; not used by this function.
    """
    try:
        canvas = cv2.imread(image_path)
        if canvas is None:
            return

        for entry in messages:
            if entry['type'] != 'voice':
                continue

            ax, ay = entry['center']
            unread = entry.get('is_unread', False)
            converted = entry.get('is_converted', False)

            # Bounding box: red = unread, green = read.
            color = (0, 0, 255) if unread else (0, 255, 0)
            cv2.rectangle(canvas, (ax-30, ay-30), (ax+30, ay+30), color, 2)

            # Converted-state label.
            cv2.putText(canvas, "YES" if converted else "NO", (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

            # Extra yellow ring around the voice currently being handled
            # (matched within 10 px on both axes).
            if current_voice_center and abs(ax - current_voice_center[0]) < 10 and abs(ay - current_voice_center[1]) < 10:
                cv2.circle(canvas, (ax, ay), 40, (0, 255, 255), 3)
                cv2.putText(canvas, "PROCESSING", (ax - 60, ay - 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

        # Overwrite the original screenshot with the annotated one.
        cv2.imwrite(image_path, canvas)
        logger.info(f"已更新调试标记到截图: {image_path}")
    except Exception as e:
        logger.warning(f"绘制调试信息失败: {e}")
|
||
|
||
def _detect_bubble_color(img, bbox):
|
||
"""
|
||
检测文本框区域的背景颜色,用于辅助判断发送者。
|
||
:param img: OpenCV 图像 (BGR)
|
||
:param bbox: OCR 返回的边界框 4个点
|
||
:return: "green" (我), "white" (对方), or "unknown"
|
||
"""
|
||
if img is None: return "unknown"
|
||
|
||
# 提取 bbox 区域
|
||
h, w = img.shape[:2]
|
||
min_x = max(0, int(min(p[0] for p in bbox)))
|
||
max_x = min(w, int(max(p[0] for p in bbox)))
|
||
min_y = max(0, int(min(p[1] for p in bbox)))
|
||
max_y = min(h, int(max(p[1] for p in bbox)))
|
||
|
||
if max_x <= min_x or max_y <= min_y:
|
||
return "unknown"
|
||
|
||
roi = img[min_y:max_y, min_x:max_x]
|
||
|
||
# 计算背景颜色 (抗文字干扰)
|
||
# 文本是黑色的 (0,0,0),会拉低平均值/中位数
|
||
# 使用 95% 分位数来获取背景色 (偏亮的部分 - 真正的背景)
|
||
try:
|
||
# axis=(0,1) 对 h,w 维度操作,保留 c 维度
|
||
# percentile 返回 float,需转 int
|
||
bg_color = np.percentile(roi, 95, axis=(0, 1))
|
||
b, g, r = bg_color
|
||
except Exception:
|
||
# Fallback
|
||
mean_color = cv2.mean(roi)[:3]
|
||
b, g, r = mean_color
|
||
|
||
# 调试日志:打印颜色值
|
||
logger.info(f"Color Debug: B={b:.1f}, G={g:.1f}, R={r:.1f} | bbox={bbox}")
|
||
|
||
# 绿色气泡特征 (Light Mode):
|
||
# R: 152, G: 225, B: 101 (BGR: 101, 225, 152)
|
||
# G 显著大于 R 和 B
|
||
# 提高阈值以区分白色/灰色背景的噪声 (White: 255, 255, 255)
|
||
if g > r + 30 and g > b + 30 and g > 100:
|
||
return "green"
|
||
|
||
# 白色气泡特征:
|
||
# R, G, B 都很高且接近
|
||
# 考虑黑色文字的影响,如果是中位数,应该很高 (>200)
|
||
# 放宽对灰色的容忍度 (Dark Mode 可能偏灰)
|
||
if abs(r - g) < 30 and abs(g - b) < 30 and abs(r - b) < 30:
|
||
# 且亮度不能太低 (太低可能是黑色背景或深色物体)
|
||
if g > 150:
|
||
return "white"
|
||
|
||
# 特殊补丁:如果 B, G, R 都很接近且在 130 左右,可能是微信的背景灰 (通常用于时间戳或系统消息)
|
||
if 110 < r < 160 and 110 < g < 160 and 110 < b < 160:
|
||
if abs(r - g) < 15 and abs(g - b) < 15:
|
||
return "system_gray"
|
||
|
||
return "unknown"
|
||
|
||
def _scan_chat_messages(image_path):
    """Scan a WeChat chat screenshot for voice, text and timestamp items.

    Pipeline: template-match voice icons and red unread dots, OCR the whole
    image, guess the chat title from the top bar, attach OCR lines to the
    voice bubble they belong to (converted text), classify the remaining
    lines as text messages or timestamps, then fold timestamps into a
    ``time_display`` attribute on the messages that follow them.

    :param image_path: path of the screenshot to analyse.
    :return: ``(messages, debug_image, chat_title)``. ``messages`` is sorted
        top-to-bottom and contains only ``voice`` / ``text`` dicts.
        When the image cannot be read the result is ``([], None, "对方")``,
        so callers can always unpack three values.
    """
    img = cv2.imread(image_path)
    if img is None:
        logger.error(f"无法读取图片: {image_path}")
        # Same arity as the success path: callers unpack three values.
        return [], None, "对方"
    h, w = img.shape[:2]
    logger.info(f"DEBUG: Image size w={w}, h={h}")

    # Template matching for voice icons and unread red dots.
    audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg")
    red_point_template = os.path.join(TEMPLATE_DIR, "red_point.jpg")

    audio_matches = find_all_template_matches(image_path, audio_template, threshold=0.8)
    red_points = find_all_template_matches(image_path, red_point_template, threshold=0.8)

    # OCR over the whole screenshot.
    logger.info("正在执行 OCR 识别...")
    ocr_results = ocr_kit.read_text(image_path)

    # Chat title (peer nickname): text in the top bar closest to the
    # horizontal centre.
    chat_title = "对方"
    potential_titles = []
    for bbox, text, conf in ocr_results:
        c_y = int((bbox[0][1] + bbox[2][1]) / 2)
        c_x = int((bbox[0][0] + bbox[2][0]) / 2)
        # Title band: below the status bar, above the message list.
        if 60 < c_y < 140:
            clean = text.strip()
            # Skip clock, app name, back button and unread counters.
            if re.match(r'^\d{1,2}:\d{2}$', clean): continue
            if "微信" in clean or "WeChat" in clean: continue
            if clean in ["<", "返回", "消息", "(", ")"]: continue
            if re.match(r'^\d+$', clean): continue
            if len(clean) > 0:
                potential_titles.append((c_x, clean))

    if potential_titles:
        potential_titles.sort(key=lambda x: abs(x[0] - w/2))
        chat_title = potential_titles[0][1]
        # Drop a trailing "(N)" member count.
        chat_title = re.sub(r'\(\d+\)$', '', chat_title).strip()
        logger.info(f"识别到聊天标题/对方昵称: {chat_title}")

    # Context-menu labels that must never be treated as message content.
    MENU_KEYWORDS = ["听筒播放", "收藏", "背景播放", "删除", "多选", "取消转文字", "转文字", "引用", "提醒"]
    # System notices / IME labels to ignore.
    IGNORE_CONTENT = ["撤回了一条消息", "打招呼的消息", "拍了拍", "你撤回了一条消息", "引用", "Clear Text", "Switch IME", "Done", "按住说话", "发送"]

    messages = []
    debug_img = img.copy()

    # Visualise the chat-area limits (top 150 px and bottom 60 px are ignored).
    cv2.line(debug_img, (0, 150), (w, 150), (255, 0, 255), 2)
    cv2.line(debug_img, (0, h - 60), (w, h - 60), (255, 0, 255), 2)
    cv2.putText(debug_img, "TOP_FILTER", (10, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
    cv2.putText(debug_img, "BOTTOM_FILTER", (10, h - 70), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)

    # OCR lines already consumed by a voice message.
    claimed_ocr_indices = set()

    # ------------------------------------------------------------------
    # A. Voice messages
    # ------------------------------------------------------------------
    for ax, ay in audio_matches:
        # Mark every detected icon on the debug image.
        cv2.circle(debug_img, (ax, ay), 10, (255, 255, 0), -1)

        # Icons outside the chat area are not messages.
        if ay < 150 or ay > h - 60:
            logger.info(f"忽略区域外语音图标: ({ax}, {ay})")
            cv2.rectangle(debug_img, (ax-35, ay-35), (ax+35, ay+35), (128, 128, 128), 1)
            cv2.putText(debug_img, "FILTERED", (ax - 40, ay - 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1)
            continue

        sender = "对方" if ax < w / 2 else "我"
        is_unread = False
        for rx, ry in red_points:
            # The red dot sits to the right of the icon on roughly the same row.
            if abs(ry - ay) < 50 and rx > ax:
                is_unread = True
                break

        # Collect OCR lines that look like this voice's converted text.
        is_converted = False
        converted_trigger_text = ""
        associated_texts = []  # (y, x, text) for every attached line

        for i, (bbox, text, conf) in enumerate(ocr_results):
            if i in claimed_ocr_indices: continue

            c_x = int((bbox[0][0] + bbox[2][0]) / 2)
            c_y = int((bbox[0][1] + bbox[2][1]) / 2)

            voice_is_left = ax < w / 2

            # Horizontal extent of the text box.
            min_x = min(p[0] for p in bbox)
            max_x = max(p[0] for p in bbox)

            if voice_is_left:
                # Peer's voice: text must be left-aligned and must not
                # reach into the right margin.
                if min_x > 450 or max_x > w - 150:
                    logger.debug(f"忽略文本 '{text[:10]}' (Left Voice): min_x={min_x}, max_x={max_x} 不满足左对齐条件")
                    continue
            else:
                # Own voice: text must be right-aligned and must not start
                # in the left margin.
                if max_x < w - 300 or min_x < 100:
                    logger.debug(f"忽略文本 '{text[:10]}' (Right Voice): min_x={min_x}, max_x={max_x} 不满足右对齐条件")
                    continue

            # The line must sit at or below the icon (multi-line text
            # allowed) and not too far away horizontally.
            if -50 < c_y - ay < 800 and abs(c_x - ax) < 600:
                # Another voice icon between this icon and the line means
                # the line belongs to that other voice.
                has_intermediate_audio = False
                for other_ax, other_ay in audio_matches:
                    if ay + 20 < other_ay < c_y - 10:
                        # Only icons inside the chat area count as blockers.
                        if 150 <= other_ay <= h - 60:
                            has_intermediate_audio = True
                            logger.info(f"语音({ax},{ay}) 被中间有效区域内的语音图标({other_ax},{other_ay}) 阻断,无法关联文本 '{text[:10]}...'")
                            break
                        else:
                            logger.info(f"语音({ax},{ay}) 忽略非聊天区域(Y={other_ay})的语音图标阻断")

                if has_intermediate_audio:
                    continue

                # A bubble that clearly belongs to the other party ends the
                # association for this voice altogether (break, not
                # continue), preventing merges across messages.
                if c_y > ay + 60:  # slack so tightly attached text is not misjudged
                    bubble_color = _detect_bubble_color(img, bbox)

                    if voice_is_left:
                        if bubble_color == "green":
                            logger.info(f"语音({ax},{ay}) 被中间'我'的消息(绿色气泡)阻断: '{text[:10]}...'")
                            break
                        if c_x > w * 0.65:
                            logger.info(f"语音({ax},{ay}) 被中间'我'的消息(右对齐)阻断: '{text[:10]}...'")
                            break
                    else:
                        if bubble_color == "white":
                            logger.info(f"语音({ax},{ay}) 被中间'对方'的消息(白色气泡)阻断: '{text[:10]}...'")
                            break
                        if c_x < w * 0.35:
                            logger.info(f"语音({ax},{ay}) 被中间'对方'的消息(左对齐)阻断: '{text[:10]}...'")
                            break

                clean_text = text.strip()
                # Short line containing H:MM -> timestamp.
                is_timestamp = re.search(r'(\d{1,2}:\d{2})', clean_text) and (len(clean_text) < 15)
                # Short line ending in digits (optionally a quote mark) -> duration.
                is_duration = re.search(r'\d{1,2}"?$', clean_text) and len(clean_text) < 6
                # System notice.
                is_ignored = any(k in clean_text for k in IGNORE_CONTENT)
                # Known OCR noise such as '少3"'.
                is_noise = "少" in clean_text and len(clean_text) < 8 and re.search(r'\d', clean_text)

                if not is_duration and not is_timestamp and clean_text not in MENU_KEYWORDS and not is_ignored and not is_noise:
                    is_converted = True
                    associated_texts.append((c_y, c_x, clean_text))
                    claimed_ocr_indices.add(i)
                    # No break: keep collecting further lines.
                else:
                    # Not content, but still belongs to this voice bubble:
                    # mark it as consumed.
                    claimed_ocr_indices.add(i)

                    if is_timestamp:
                        logger.info(f"语音({ax},{ay}) 忽略下方时间戳文本: '{clean_text}'")
                    elif is_duration:
                        logger.info(f"语音({ax},{ay}) 忽略时长文本: '{clean_text}'")
                    elif is_noise:
                        logger.info(f"语音({ax},{ay}) 忽略噪音文本: '{clean_text}'")
                    elif is_ignored:
                        logger.info(f"语音({ax},{ay}) 忽略系统消息文本: '{clean_text}'")
                    else:
                        logger.info(f"语音({ax},{ay}) 忽略其他文本(可能是菜单): '{clean_text}'")

        # Merge the attached lines top-to-bottom, then left-to-right.
        if associated_texts:
            associated_texts.sort(key=lambda x: (x[0], x[1]))
            converted_trigger_text = "".join([t[2] for t in associated_texts])

            # Strip known noise fragments. The loop variable is deliberately
            # not called `np`, which would shadow the numpy alias.
            noise_patterns = ["42IIhK+-语音输入粘贴#", "语音输入粘贴"]
            for noise in noise_patterns:
                converted_trigger_text = converted_trigger_text.replace(noise, "")
            converted_trigger_text = converted_trigger_text.strip()

            logger.info(f"语音({ax},{ay}) 判定为已转换,最终合并文本: '{converted_trigger_text}'")

        if is_converted:
            logger.info(f"语音消息 ({ax}, {ay}) 已有转换文字: '{converted_trigger_text}',跳过")

        # Debug overlay: red = unread, green = read; YES/NO = converted.
        color = (0, 0, 255) if is_unread else (0, 255, 0)
        cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
        label = "YES" if is_converted else "NO"
        cv2.putText(debug_img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

        messages.append({
            "type": "voice",
            "sender": sender,
            "center": (ax, ay),
            "y": ay,
            "is_unread": is_unread,
            "is_converted": is_converted,
            "content": converted_trigger_text if is_converted else None
        })

    # ------------------------------------------------------------------
    # B. Text messages and timestamps
    # ------------------------------------------------------------------
    for i, (bbox, text, conf) in enumerate(ocr_results):
        if i in claimed_ocr_indices: continue
        c_x = int((bbox[0][0] + bbox[2][0]) / 2)
        c_y = int((bbox[0][1] + bbox[2][1]) / 2)

        if c_y < 150 or c_y > h - 60:
            continue

        # Sender detection: bubble colour first, geometry as a cross-check.
        sender_color = _detect_bubble_color(img, bbox)

        sender = "unknown"
        if sender_color == "green":
            sender = "我"
        elif sender_color == "white":
            sender = "对方"
        elif sender_color == "system_gray":
            # Grey-backed text is usually a timestamp or system notice.
            sender = "system"

        # Avatar + margin are assumed to take about 15% of the width.
        edge_margin = w * 0.15
        min_x = min(p[0] for p in bbox)
        max_x = max(p[0] for p in bbox)

        # Rule A: a line reaching into the right 15% is "我", even when the
        # colour check said white.
        # NOTE(review): nesting reconstructed from the rule's comment; the
        # assignment is applied whenever the geometric condition holds.
        if max_x > w - edge_margin:
            if sender == "对方":
                logger.warning(f"Sender detected as '对方' by color but geometry says '我' (max_x={max_x} > {w-edge_margin}). Correcting to '我'.")
            sender = "我"

        # Rule B: a line starting in the left 35% and ending before 75% is
        # "对方", unless colour clearly said green (own long messages can
        # also start on the left) or system grey.
        elif min_x < w * 0.35 and max_x < w * 0.75:
            if sender == "我":
                logger.info(f"Geometry says '对方' (min_x={min_x} < {w*0.35}) but Color is '我' (Green). Trusting Color.")
            elif sender == "system":
                pass
            else:
                sender = "对方"

        # Rule C: colour unknown and not at an extreme -> decide by the
        # box centre. c_x is recomputed from the box extent here.
        if sender == "unknown":
            c_x = int((min_x + max_x) / 2)
            if c_x < w / 2: sender = "对方"
            else: sender = "我"

        # Rule D: strong centre check, only for short lines (< 70% width).
        # Wide lines span the screen, so their centre says little and the
        # colour result is trusted.
        box_width = max_x - min_x
        if box_width < w * 0.7:
            if c_x < w * 0.45:
                # Clearly left half -> "对方", except green bubbles (own
                # wrapped text) and system text.
                if sender == "我" and sender_color == "green":
                    logger.info(f"Geometry says '对方' (center={c_x} < {w*0.45}) but Color is 'green'. Keeping '我'.")
                elif sender == "system":
                    pass
                else:
                    if sender == "我":
                        logger.warning(f"Sender detected as '我' by color but center is left ({c_x} < {w*0.45}). Correcting to '对方'.")
                    sender = "对方"
            elif c_x > w * 0.55:
                # NOTE(review): this branch logs a correction for "对方"
                # but does not reassign `sender`; behaviour kept as-is.
                if sender == "对方":
                    logger.warning(f"Sender detected as '对方' by color but center is right ({c_x} > {w*0.55}). Correcting to '我'.")
                elif sender == "system":
                    pass
                else:
                    sender = "我"
            else:
                logger.info(f"Message in middle zone ({w*0.45} < {c_x} < {w*0.55}), trusting color detection: {sender}")
        else:
            logger.info(f"Wide message (width={box_width} > {w*0.7}), skipping geometry check, trusting color: {sender}")

        time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)'
        # Short lines that look like a date/time are treated as standalone
        # timestamps; no further validation is done here.
        if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)):
            logger.info(f"识别到时间戳/日期: '{text}'")
            messages.append({
                "type": "timestamp",
                "content": text.strip(),
                "y": c_y,
                "center": (c_x, c_y)
            })
            continue

        clean_text = text.strip()
        if re.match(r'^.?[0-9]{1,2}"?$', clean_text):
            logger.info(f"忽略疑似时长文本: '{clean_text}'")
            continue

        # Known OCR noise such as '少3"'.
        if "少" in clean_text and len(clean_text) < 8 and re.search(r'\d', clean_text):
            logger.info(f"忽略噪音文本: '{clean_text}'")
            continue

        if clean_text in MENU_KEYWORDS:
            logger.info(f"忽略菜单关键词: '{clean_text}'")
            continue
        if any(k in clean_text for k in IGNORE_CONTENT):
            logger.info(f"忽略系统消息内容: '{clean_text}'")
            continue

        messages.append({
            "type": "text",
            "sender": sender,
            "content": text.strip(),
            "center": (c_x, c_y),
            "y": c_y
        })

    # Sort top-to-bottom.
    messages.sort(key=lambda x: x['y'])

    # Fold timestamps into the messages below them: timestamp entries are
    # dropped from the result and become `time_display` on later messages.
    current_time_str = None
    final_messages_with_time = []

    for msg in messages:
        if msg['type'] == 'timestamp':
            parsed_time = parse_wechat_time(msg['content'])
            current_time_str = parsed_time
            logger.info(f"更新时间上下文: {msg['content']} -> {parsed_time}")
        else:
            # Messages above the first timestamp (or after an unparsable
            # one) get no time_display.
            if current_time_str:
                msg['time_display'] = current_time_str
            final_messages_with_time.append(msg)

    return final_messages_with_time, debug_img, chat_title
|
||
|
||
async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", process_strategy="ALL", restore_processed_voice=True):
    """Analyse a WeChat chat screenshot with CV + OCR, transcribing voices.

    Runs a bounded loop: scan the current screenshot, pick the voice
    messages selected by *process_strategy*, long-press the first one, tap
    "转文字", screenshot the result ("peek"), optionally tap "隐藏文字" to
    restore the UI, then re-screenshot and repeat. Only one voice is handled
    per iteration because expanding text shifts every coordinate. Peek
    screenshots are OCR'd in executor threads and awaited after the loop;
    the text found is injected into the matching voice messages.

    :param image_path: initial screenshot to analyse.
    :param output_path: where the annotated debug image is written (falsy
        to skip).
    :param device: connected device; when falsy ``connect_device()`` is used.
    :param target_name: accepted for compatibility; not used by this function.
    :param process_strategy: ``"ALL"`` (every unconverted voice),
        ``"UNREAD"`` (unread and unconverted) or ``"LAST"`` (only the last
        unconverted voice). Any other value processes nothing.
    :param restore_processed_voice: collapse the text again after peeking.
        When ``False`` the loop stops after the first voice, because the
        layout has changed and remaining coordinates are stale.
    :return: ``(dialogue_log, input_field_coordinates)``. ``dialogue_log``
        holds text messages with content plus all voice messages. Returns
        ``([], None)`` when no device is available or the screenshot cannot
        be read, and ``([], (540, 1690))`` on any unexpected error.
    """
    try:
        d = device if device else connect_device()
        if not d:
            return [], None

        current_image_path = image_path
        current_output_path = output_path

        final_messages = []
        # Pre-initialised so the code after the loop never probes locals().
        messages = []
        debug_img = None
        loop_count = 0
        MAX_LOOPS = 10  # upper bound, sized for the ALL strategy

        # Statistics.
        total_voices_count = 0
        convert_opened_count = 0
        convert_closed_count = 0

        # Y coordinates of voices already handled in this call.
        processed_y_coords = set()
        # Text captured during peek-and-restore: {tap_y: content}.
        captured_voice_contents = {}

        # Pending OCR tasks for this call only. Kept local rather than on
        # the function object so overlapping calls cannot touch each
        # other's tasks.
        ocr_tasks = []

        def is_processed(y_coord):
            """True when a voice within 20 px of y_coord was already handled."""
            return any(abs(y_coord - py) < 20 for py in processed_y_coords)

        async def _async_ocr_task(img_path, target_y):
            """Scan a peek screenshot in the default executor and return
            ``(target_y, text_or_None)`` for the voice near target_y."""
            try:
                loop = asyncio.get_running_loop()
                logger.info(f"🚀 [Async OCR] 开始分析截图 {os.path.basename(img_path)} (目标 Y={target_y})")
                msgs, _, _ = await loop.run_in_executor(None, _scan_chat_messages, img_path)

                found = None
                # Converted voices near the tap position; the tolerance is
                # wide because expanding the text shifts the layout.
                all_found_texts = []
                for pm in msgs:
                    if pm['type'] == 'voice' and pm.get('is_converted'):
                        if abs(pm['y'] - target_y) < 150:
                            content = pm.get('content', '').strip()
                            if content:
                                all_found_texts.append((pm['y'], content))

                if all_found_texts:
                    # Top-to-bottom so multi-part text keeps its order.
                    all_found_texts.sort(key=lambda x: x[0])
                    found = " ".join([t[1] for t in all_found_texts])
                    logger.info(f"✨ [Async OCR] 在 Y={target_y} 附近找到转换文字: {found}")

                if not found:
                    logger.warning(f"⚠️ [Async OCR] 未能在 Y={target_y} 附近找到已转换文字")
                return target_y, found
            except Exception as e:
                logger.error(f"❌ [Async OCR] 任务执行失败: {e}")
                return target_y, None

        while loop_count < MAX_LOOPS:
            loop_count += 1
            logger.info(f"--- 分析循环 第 {loop_count} 次 ---")

            # 1. Scan the current screenshot.
            messages, debug_img, chat_title = _scan_chat_messages(current_image_path)
            if debug_img is None:  # the screenshot could not be read
                return [], None

            # Replace the generic "对方" sender with the detected chat title.
            if chat_title and chat_title != "对方":
                for m in messages:
                    if m['sender'] == "对方":
                        m['sender'] = chat_title

            # Save the debug image for the current state.
            if current_output_path:
                cv2.imwrite(current_output_path, debug_img)
                logger.info(f"调试图已保存: {current_output_path}")

            # 2. Select the voices to process, top-to-bottom.
            all_voices = [m for m in messages if m['type'] == 'voice']
            all_voices.sort(key=lambda x: x['y'])

            # Statistic reflects the most recent scan.
            total_voices_count = len(all_voices)

            target_voices = []
            if process_strategy == "ALL":
                target_voices = [m for m in all_voices if not m.get('is_converted') and not is_processed(m['y'])]
                logger.info(f"策略(ALL): 发现 {len(target_voices)} 条未转换待处理语音")
            elif process_strategy == "UNREAD":
                target_voices = [m for m in all_voices if m.get('is_unread') and not m.get('is_converted') and not is_processed(m['y'])]
                logger.info(f"策略(UNREAD): 发现 {len(target_voices)} 条未读待处理语音")
            elif process_strategy == "LAST":
                unconverted = [m for m in all_voices if not m.get('is_converted')]
                if unconverted:
                    last_voice = unconverted[-1]
                    if not is_processed(last_voice['y']):
                        target_voices = [last_voice]
                logger.info(f"策略(LAST): 仅关注最后一条未转换语音")

            # Nothing left to do on this screen.
            if not target_voices:
                logger.info("当前屏幕无待处理语音,分析结束")
                final_messages = messages
                break

            # 3. Handle only the first target: expanding its text shifts
            #    the layout and invalidates the other coordinates.
            target = target_voices[0]
            vx, vy = int(target['center'][0]), int(target['center'][1])

            # Record it up front so a failure cannot cause an endless retry.
            processed_y_coords.add(target['y'])

            logger.info(f"准备处理语音 ({vx}, {vy})...")

            # Highlight the voice being processed on the debug image.
            draw_debug_info(current_output_path, messages, current_voice_center=(vx, vy))

            # Long-press to open the context menu.
            logger.info(f"正在长按语音消息 ({vx}, {vy})...")
            d.long_click(vx, vy, 1.0)

            # Poll (up to 3 s) for the "转文字" button.
            # NOTE(review): the time.sleep calls below block the event loop,
            # so OCR tasks created earlier cannot start until the first
            # await (the gather after the loop). Left unchanged because
            # yielding here would change how device steps and OCR overlap.
            logger.info("正在快速寻找'转文字'按钮...")
            zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg")
            btn_pos = None

            poll_start = time.time()
            while time.time() - poll_start < 3.0:
                menu_shot = get_next_debug_path("step_long_press_poll")
                d.screenshot(menu_shot)
                btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7)
                if btn_pos:
                    break
                time.sleep(0.2)

            if not btn_pos:
                logger.warning("❌ 未找到'转文字'按钮,可能是已转换或误判")
                # Already recorded in processed_y_coords, so the next
                # iteration moves on to the next voice.
                logger.info("跳过当前语音,继续扫描...")
                continue

            btn_x, btn_y = int(btn_pos[0]), int(btn_pos[1])
            logger.info(f"✅ 找到'转文字'按钮: ({btn_x}, {btn_y}),点击中...")
            safe_device_click(d, btn_x, btn_y)
            convert_opened_count += 1

            logger.info("等待语音转文字完成...")
            time.sleep(3.0)

            # --- Peek-and-restore ---
            # Screenshot now, OCR later in a background task.
            peek_shot = get_next_debug_path("step_peek_content")
            d.screenshot(peek_shot)
            logger.info(f"已截图 {peek_shot},启动异步OCR任务以提取内容...")
            ocr_tasks.append(asyncio.create_task(_async_ocr_task(peek_shot, vy)))

            if not restore_processed_voice:
                logger.info("⏩ [配置] 跳过还原状态步骤 (保持文字展开)。")
                # The text stays expanded, so every remaining coordinate is
                # stale: stop here and let the next monitoring cycle handle
                # the rest. `messages` predates the expansion; the content
                # is injected from captured_voice_contents below.
                logger.info("🛑 因不还原状态,终止本轮多语音处理循环,等待下一次主监控循环。")
                final_messages = messages
                break

            # Restore: the OCR result is not available yet, so the original
            # bubble position is long-pressed blindly to reopen the menu.
            logger.info("准备还原状态 (取消转文字)...")
            d.long_click(vx, vy, 1.0)

            logger.info("正在快速寻找'隐藏文字'按钮...")
            cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg")
            cancel_btn = None

            poll_start = time.time()
            while time.time() - poll_start < 3.0:
                restore_menu_shot = get_next_debug_path("step_restore_poll")
                d.screenshot(restore_menu_shot)
                cancel_btn = find_template_match(restore_menu_shot, cancel_template, threshold=0.7)
                if cancel_btn:
                    break
                time.sleep(0.2)

            if cancel_btn:
                cx, cy = int(cancel_btn[0]), int(cancel_btn[1])
                logger.info(f"✅ 找到'隐藏文字'按钮: ({cx}, {cy}),点击还原...")
                safe_device_click(d, cx, cy)
                convert_closed_count += 1
                time.sleep(2.0)  # wait for the collapse animation
            else:
                logger.warning("❌ 未找到'隐藏文字'按钮,无法还原状态!(后续可能导致重复处理)")

            # That was the only target on this screen: no rescan needed.
            if len(target_voices) == 1:
                logger.info("✅ 当前屏幕所有目标语音已处理完毕,无需再次全屏扫描。")
                final_messages = messages  # the scan from the start of this iteration
                break

            # More targets remain: take a fresh screenshot for the next pass.
            next_screenshot = get_next_debug_path("step_restored")
            d.screenshot(next_screenshot)

            current_image_path = next_screenshot
            current_output_path = get_next_debug_path("flag_restored")

        # Collect the results of all background OCR tasks.
        if ocr_tasks:
            logger.info(f"等待 {len(ocr_tasks)} 个异步 OCR 任务完成...")
            results = await asyncio.gather(*ocr_tasks)
            for y, content in results:
                if content:
                    captured_voice_contents[y] = content
                    logger.info(f"✅ [Async OCR] 异步获取到语音内容 (y={y}): {content}")

        # The loop ran out of iterations without choosing a final list:
        # fall back to the last scan.
        if not final_messages:
            final_messages = messages

        # Inject the peeked text into voices that still have no content.
        if captured_voice_contents:
            logger.info(f"正在注入 {len(captured_voice_contents)} 条已还原的语音内容...")
            for m in final_messages:
                if m['type'] == 'voice' and (not m.get('content') or m.get('content').strip() == ""):
                    for py, content in captured_voice_contents.items():
                        # Wide tolerance: the message's Y can differ
                        # slightly from the tap position.
                        if abs(m['y'] - py) < 100:
                            m['content'] = content
                            m['is_converted'] = True  # logically converted
                            logger.info(f" -> 注入内容到 Y={m['y']} (原 py={py}): {content[:20]}...")
                            break

        # Input field position: horizontal centre, 88% of the image height.
        if debug_img is not None:
            input_field_coordinates = (debug_img.shape[1] // 2, int(debug_img.shape[0] * 0.88))
        else:
            try:
                tmp_img = cv2.imread(current_image_path)
                input_field_coordinates = (tmp_img.shape[1] // 2, int(tmp_img.shape[0] * 0.88))
            except Exception:
                input_field_coordinates = (540, 1690)  # 1920 * 0.88

        if final_messages:
            final_messages.sort(key=lambda x: x['y'])

        # Build the dialogue log: text messages need content (empty text is
        # normally an OCR miss); voice messages are always kept so the
        # caller can decide what to do with them.
        dialogue_log = []
        for msg in final_messages:
            if msg['type'] == 'text':
                if msg.get('content'):
                    dialogue_log.append(msg)
            elif msg['type'] == 'voice':
                dialogue_log.append(msg)

        logger.info(f"📊 [统计] 语音总数: {total_voices_count}, 打开转文字次数: {convert_opened_count}, 关闭转文字次数: {convert_closed_count}")
        return dialogue_log, input_field_coordinates

    except Exception as e:
        logger.error(f"分析过程发生异常: {e}", exc_info=True)
        return [], (540, 1690)
|
||
|
||
|
||
def clean_screenshots_dir():
    """Remove image files from the screenshot output directory.

    When ``OUTPUT_DIR`` does not exist yet it is created and there is nothing
    to delete. Otherwise every entry whose name ends in ``.jpg``, ``.png`` or
    ``.jpeg`` (case-insensitive) is removed. A removal that fails is logged as
    a warning and the sweep continues with the next entry.
    """
    image_suffixes = ('.jpg', '.png', '.jpeg')

    if os.path.exists(OUTPUT_DIR):
        image_names = [
            name for name in os.listdir(OUTPUT_DIR)
            if name.lower().endswith(image_suffixes)
        ]
        for name in image_names:
            target = os.path.join(OUTPUT_DIR, name)
            try:
                os.remove(target)
            except Exception as err:
                logger.warning(f"Failed to delete {name}: {err}")
    else:
        os.makedirs(OUTPUT_DIR)
|
||
def is_in_chat_interface(d):
    """Report whether the device currently shows a WeChat chat screen.

    :param d: object that is called with selector keyword arguments and
        returns something exposing ``.exists``.
    :return: ``True`` as soon as one of the chat-screen markers exists,
        ``False`` when none does or when a lookup raises (the failure is
        logged as a warning).
    """
    # Probed in this order; the first marker that exists settles the answer.
    chat_markers = (
        {"description": "切换到语音"},            # voice/keyboard toggle (text mode)
        {"description": "切换到键盘"},            # voice/keyboard toggle (voice mode)
        {"className": "android.widget.EditText"},  # bottom input box
        {"text": "按住说话"},                     # "hold to talk" button
        {"description": "聊天信息"},              # "chat info" button, top right
    )

    try:
        for marker in chat_markers:
            if d(**marker).exists:
                return True
    except Exception as e:
        logger.warning(f"is_in_chat_interface check failed: {e}")

    return False
|
||
def find_input_box_center(image_path):
    """Estimate the input box centre from screenshot geometry (fallback).

    The estimate is the horizontal middle of the image at 88% of its height.

    :param image_path: path of the screenshot to measure.
    :return: ``((x, y), None)``. When the file is missing, cannot be decoded,
        or any error occurs, the fixed point ``(540, 2100)`` is returned
        instead of a measured one.
    """
    default_result = ((540, 2100), None)

    try:
        # Only attempt to decode when the file is actually there.
        screenshot = cv2.imread(image_path) if os.path.exists(image_path) else None
        if screenshot is None:
            return default_result

        height, width = screenshot.shape[:2]
        center = (int(width * 0.5), int(height * 0.88))

        logger.info(f"find_input_box_center fallback: ({center[0]}, {center[1]})")
        return center, None

    except Exception as e:
        logger.error(f"find_input_box_center error: {e}")
        return default_result
|
||
def find_template_match(screen_path, template_path, threshold=0.8):
    """Locate the best match of a template image inside a screenshot.

    Matching is done with ``cv2.TM_CCOEFF_NORMED`` on the images as loaded by
    ``cv2.imread`` (no grayscale conversion).

    :param screen_path: path of the screenshot to search in.
    :param template_path: path of the template image to look for.
    :param threshold: minimum score the best match must reach.
    :return: ``(center_x, center_y)`` of the best match, or ``None`` when the
        template file is missing, an image cannot be read, the score is below
        ``threshold``, or an error occurs (errors are logged).
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return None

        screen = cv2.imread(screen_path)
        needle = cv2.imread(template_path)
        if screen is None or needle is None:
            return None

        needle_h, needle_w = needle.shape[:2]
        scores = cv2.matchTemplate(screen, needle, cv2.TM_CCOEFF_NORMED)
        _, best_score, _, best_loc = cv2.minMaxLoc(scores)

        if best_score >= threshold:
            # best_loc is the top-left corner of the match; shift to its centre.
            center = (best_loc[0] + needle_w // 2, best_loc[1] + needle_h // 2)
            logger.info(f"Template matched! Score: {best_score:.2f}, Center: ({center[0]}, {center[1]})")
            return center

        logger.info(f"Template not matched. Max score: {best_score:.2f}")
        return None
    except Exception as e:
        logger.error(f"Template matching failed: {e}")
        return None
|
||
def find_all_template_matches(screen_path, template_path, threshold=0.8):
    """Locate every match of a template image inside a screenshot.

    :param screen_path: path of the screenshot to search in.
    :param template_path: path of the template image to look for.
    :param threshold: minimum ``cv2.TM_CCOEFF_NORMED`` score for a position to
        count as a match.
    :return: list of ``(center_x, center_y)`` points, one per distinct match;
        an empty list when the template file is missing, an image cannot be
        read, nothing reaches ``threshold``, or an error occurs.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return []

        screen = cv2.imread(screen_path)
        needle = cv2.imread(template_path)
        if screen is None or needle is None:
            return []

        needle_h, needle_w = needle.shape[:2]
        scores = cv2.matchTemplate(screen, needle, cv2.TM_CCOEFF_NORMED)

        # Log the peak score so the threshold can be tuned from the logs.
        _, best_score, _, _ = cv2.minMaxLoc(scores)
        logger.info(f"模板匹配 {os.path.basename(template_path)}: 最大相似度 = {best_score:.4f} (阈值={threshold})")

        # np.where yields (rows, cols); points are built as (x, y) centres.
        rows, cols = np.where(scores >= threshold)
        half_w, half_h = needle_w // 2, needle_h // 2
        candidates = [(col + half_w, row + half_h) for col, row in zip(cols, rows)]

        # One target produces a cluster of adjacent hits. Keep the first hit of
        # each cluster and drop any later hit within 10px of a kept one on
        # both axes (a simplified non-maximum suppression).
        merged = []
        for cand in candidates:
            near_existing = any(
                abs(cand[0] - kept[0]) < 10 and abs(cand[1] - kept[1]) < 10
                for kept in merged
            )
            if not near_existing:
                merged.append(cand)

        if merged:
            logger.info(f"Found {len(merged)} matches for {os.path.basename(template_path)}")

        return merged

    except Exception as e:
        logger.error(f"find_all_template_matches failed: {e}")
        return []
|
||
# android.view.KeyEvent.KEYCODE_PASTE. uiautomator2's press() has no "paste"
# key name, so the paste fallback has to send the raw key code.
_KEYCODE_PASTE = 279


def _discard_temp_file(path):
    """Best-effort removal of a temporary screenshot; a missing file is fine."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning(f"Failed to remove temp file {path}: {exc}")


def _save_input_match_overlay(screen_path, template_path, match_center):
    """Save a debug copy of the screenshot with the matched input box outlined.

    Writes ``debug_input_box_match.jpg`` into ``OUTPUT_DIR``. Any failure is
    logged as a warning and otherwise ignored, because the overlay is only a
    debugging aid.
    """
    try:
        debug_img = cv2.imread(screen_path)
        if debug_img is None:
            return
        # The template is re-read only to learn the size of the matched area.
        tmpl = cv2.imread(template_path)
        if tmpl is None:
            return

        th, tw = tmpl.shape[:2]
        cx, cy = match_center
        top_left = (cx - tw // 2, cy - th // 2)
        bottom_right = (cx + tw // 2, cy + th // 2)

        # Blue frame, BGR = (255, 0, 0)
        cv2.rectangle(debug_img, top_left, bottom_right, (255, 0, 0), 3)
        cv2.putText(debug_img, "MATCH: input_text.jpg", (top_left[0], top_left[1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

        debug_save_path = os.path.join(OUTPUT_DIR, "debug_input_box_match.jpg")
        cv2.imwrite(debug_save_path, debug_img)
        logger.info(f"已保存输入框匹配调试图(蓝框): {debug_save_path}")
    except Exception as e:
        logger.warning(f"绘制输入框调试图失败: {e}")


def perform_input_action(d, center_point, text, auto_send=True, debug_prefix=None):
    """Type ``text`` into the chat input box and optionally send it.

    Steps, in order:

    1. Make sure the chat is in text-input mode (UI tree first, then the
       ``wen_zi_input.jpg`` template).
    2. Find the input box: native ``EditText``, then a known resource id.
       If neither exists, ``center_point`` is refined by the
       ``input_text.jpg`` template and then by the Y position of the
       voice/keyboard toggle button.
    3. Enter the text through the native widget, or by tapping
       ``center_point`` and sending keys (clipboard paste as last resort).
    4. When ``auto_send`` is true, tap the send button (``send.jpg``
       template, then the "发送" text selector, then the Enter key).

    :param d: uiautomator2 device object.
    :param center_point: ``(x, y)`` fallback tap position for the input box.
    :param text: text to enter.
    :param auto_send: send the message after entering it.
    :param debug_prefix: if given, screenshots of the key steps are saved to
        ``OUTPUT_DIR`` as ``{debug_prefix}_{step}.jpg``.
    :return: ``True`` when the sequence completed, ``False`` when any step
        raised (the error is logged with its traceback).
    """
    try:
        def save_debug_shot(name):
            if debug_prefix:
                shot_path = os.path.join(OUTPUT_DIR, f"{debug_prefix}_{name}.jpg")
                d.screenshot(shot_path)
                logger.info(f"保存中间过程截图: {shot_path}")

        # --- Make sure the chat is in text-input mode ---
        logger.info("正在检查输入模式...")
        save_debug_shot("1_check_mode")

        # Prefer the UI tree over image recognition. A "切换到键盘" button
        # means the chat is currently in voice mode.
        voice_mode_btn = d(description="切换到键盘")
        if voice_mode_btn.exists:
            logger.info("检测到语音模式 (UI树: '切换到键盘'),点击切换...")
            voice_mode_btn.click()
            time.sleep(1.0)  # wait for the UI to switch

        # Image fallback when the UI tree shows no voice-mode button.
        # NOTE(review): the selector is re-queried after the click above, so
        # this also runs after a successful UI-tree switch - confirm whether
        # that double check is intended.
        if not voice_mode_btn.exists:
            tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg")
            try:
                d.screenshot(tmp_check_shot)
                wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")

                # Look for the "switch to text" icon.
                wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8)
                if wen_zi_pos:
                    logger.info(f"检测到语音模式 (图像: 找到切换文字图标: {wen_zi_pos}),点击切换...")
                    d.click(wen_zi_pos[0], wen_zi_pos[1])
                    time.sleep(1.0)
            finally:
                # Remove the temp screenshot even when a step above raised.
                _discard_temp_file(tmp_check_shot)

        save_debug_shot("2_after_mode")

        # --- 1. Find the native input box ---
        edit_text = d(className="android.widget.EditText")
        if not edit_text.exists:
            # Fall back to a resource id commonly used by WeChat.
            edit_text = d(resourceId="com.tencent.mm:id/b4a")

        # 1.2 No native widget: look for the input box with input_text.jpg.
        if not edit_text.exists:
            input_template_path = os.path.join(TEMPLATE_DIR, "input_text.jpg")
            if os.path.exists(input_template_path):
                tmp_input_search = os.path.join(OUTPUT_DIR, "temp_input_search.jpg")
                try:
                    d.screenshot(tmp_input_search)

                    logger.info(f"正在尝试使用模板 {input_template_path} 寻找输入框...")
                    # Threshold deliberately lowered to 0.6 for this template.
                    input_pos = find_template_match(tmp_input_search, input_template_path, threshold=0.6)

                    if input_pos:
                        logger.info(f"✅ [Template] 通过 input_text.jpg 找到输入框: {input_pos}")
                        save_debug_shot("3_input_box_found")
                        _save_input_match_overlay(tmp_input_search, input_template_path, input_pos)
                        # Tap where the template matched.
                        center_point = input_pos
                    else:
                        logger.info("❌ [Template] input_text.jpg 未匹配到输入框")
                finally:
                    # Previously this temp screenshot was left behind.
                    _discard_temp_file(tmp_input_search)

        # 1.5 Still no native widget: align Y with the voice/keyboard toggle,
        # which sits on the same row as the input box. This overrides the
        # template result from 1.2 whenever the toggle button is found.
        if not edit_text.exists:
            try:
                anchor_btn = d(description="切换到语音")
                if not anchor_btn.exists:
                    # Mode detection may have been wrong; try the other label.
                    anchor_btn = d(description="切换到键盘")

                if anchor_btn.exists:
                    bounds = anchor_btn.info['bounds']
                    anchor_y = (bounds['top'] + bounds['bottom']) // 2

                    w, _ = d.window_size()

                    # X centred on screen, Y aligned with the button.
                    center_point = (w // 2, anchor_y)
                    logger.info(f"通过'切换到语音'按钮修正输入框坐标: {center_point}")
            except Exception as e:
                logger.warning(f"尝试修正坐标失败: {e}")

        input_success = False

        if edit_text.exists:
            logger.info("Found native EditText, using set_text")
            try:
                edit_text.click()
                time.sleep(0.5)
                edit_text.set_text(text)
                input_success = True
            except Exception as e:
                logger.warning(f"Native input failed: {e}")

        # --- 2. Native input unavailable or failed: tap coordinates and type ---
        if not input_success:
            cx, cy = center_point
            logger.info(f"Using coordinate input: {center_point}")
            d.click(cx, cy)
            time.sleep(1.0)

            try:
                d.send_keys(text)
            except Exception:
                logger.warning("send_keys failed, trying set_clipboard")
                d.set_clipboard(text)
                d.click(cx, cy)
                time.sleep(0.5)
                # Paste via key code; "paste" is not a key name press() knows.
                d.press(_KEYCODE_PASTE)

        save_debug_shot("4_after_input")
        time.sleep(1.0)

        # --- 3. Send ---
        if auto_send:
            # Template matching first, native selector second, Enter key last.
            logger.info("尝试使用模板匹配寻找'发送'按钮...")
            tmp_screen = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_send_check.jpg")
            try:
                d.screenshot(tmp_screen)

                template_path = os.path.join(TEMPLATE_DIR, "send.jpg")
                # Slightly lower threshold to improve recall.
                send_btn_pos = find_template_match(tmp_screen, template_path, threshold=0.7)

                if send_btn_pos:
                    logger.info(f"通过模板匹配找到发送按钮: {send_btn_pos}, 点击...")
                    d.click(send_btn_pos[0], send_btn_pos[1])
                else:
                    logger.warning("模板匹配未找到发送按钮,尝试原生控件查找...")
                    send_btn = d(text="发送")
                    if send_btn.exists:
                        send_btn.click()
                        logger.info("Clicked '发送'")
                    else:
                        d.press("enter")
                        logger.info("Pressed Enter")

                save_debug_shot("5_after_send")
            finally:
                _discard_temp_file(tmp_screen)

        return True

    except Exception as e:
        logger.error(f"perform_input_action error: {e}", exc_info=True)
        return False
|
||
def match_template_center(image_path, template_path, threshold=0.8):
    """Locate the centre of the best grayscale template match in an image.

    Both images are converted to grayscale before matching with
    ``cv2.TM_CCOEFF_NORMED``.

    :param image_path: path of the image to search in.
    :param template_path: path of the template image to look for.
    :param threshold: minimum score the best match must reach.
    :return: ``(center_x, center_y)`` of the best match, or ``None`` when a
        file is missing or unreadable, the score is below ``threshold``, or
        an error occurs (each case is logged).
    """
    try:
        if not (os.path.exists(image_path) and os.path.exists(template_path)):
            logger.error(f"Image or template not found: {image_path}, {template_path}")
            return None

        scene = cv2.imread(image_path)
        needle = cv2.imread(template_path)
        if scene is None or needle is None:
            logger.error("Failed to read image or template")
            return None

        # Match on grayscale versions of both images.
        gray_scene = cv2.cvtColor(scene, cv2.COLOR_BGR2GRAY)
        gray_needle = cv2.cvtColor(needle, cv2.COLOR_BGR2GRAY)

        score_map = cv2.matchTemplate(gray_scene, gray_needle, cv2.TM_CCOEFF_NORMED)
        _, peak_score, _, peak_loc = cv2.minMaxLoc(score_map)

        if peak_score >= threshold:
            needle_h, needle_w = gray_needle.shape
            # peak_loc is the top-left corner of the match; shift to its centre.
            center = (int(peak_loc[0] + needle_w / 2), int(peak_loc[1] + needle_h / 2))
            logger.info(f"Template matched with confidence {peak_score:.2f} at ({center[0]}, {center[1]})")
            return center

        logger.warning(f"Template match failed. Max confidence: {peak_score:.2f} < Threshold: {threshold}")
        return None

    except Exception as e:
        logger.error(f"match_template_center error: {e}")
        return None
|
||
|
||
|
||
|