Files
aiData/WeiXin/WxUtil.py
HuangHai 00375a80b2 'commit'
2026-01-31 17:29:57 +08:00

1656 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import uiautomator2 as u2
import time
import asyncio
import logging
import sys
import os
import cv2
import numpy as np
import re
# Add the project root directory to sys.path so that the Util package can be imported.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)
import json
from datetime import datetime, timedelta
from Util.EasyOcrKit import EasyOcrKit
# Create the shared EasyOcrKit instance used by the scanning functions.
ocr_kit = EasyOcrKit()
# Logging configuration.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("WxUtil")
# Directory configuration: Logs/ and Output/ live one level above this file's
# directory, Templates/ lives next to this file.
BASE_DATA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DATA_DIR, "Logs")
OUTPUT_DIR = os.path.join(BASE_DATA_DIR, "Output")
TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templates")
# Global counter used to number debug images sequentially.
_debug_counter = 0
# Global debug-mode switch.
DEBUG_MODE = False
def set_debug_mode(enabled):
    """Switch the global debug flag and align the module logger's level with it.

    A truthy ``enabled`` selects DEBUG logging, anything else selects INFO.
    """
    global DEBUG_MODE
    DEBUG_MODE = enabled
    logger.setLevel(logging.DEBUG if enabled else logging.INFO)
# Chinese day-period markers that may precede a 12-hour clock value.
_PERIOD_MARKERS = ("凌晨", "早上", "上午", "中午", "下午", "晚上")


def _to_24_hour(hour, marker):
    """Convert a 12-hour clock value to 24-hour form given a period marker (or None)."""
    if marker in ("下午", "晚上") and hour < 12:
        return hour + 12
    if marker == "中午" and hour < 11:
        # "中午1:30" means 13:30, while "中午11:50" / "中午12:10" stay unchanged.
        return hour + 12
    if marker == "凌晨" and hour == 12:
        return 0
    return hour


def parse_wechat_time(time_str):
    """
    Parse a WeChat time label into the normalised form ``YYYY-MM-DD HH:MM``.

    Supported shapes (each may carry a period marker such as 上午/下午/晚上,
    which is applied to the hour in every branch):
      * ``10:03``                 -> today
      * ``昨天 10:03``             -> yesterday
      * ``星期三 10:03`` / ``周三``  -> most recent such weekday (midnight if no clock)
      * ``2025年1月1日 10:03``      -> explicit date (midnight if no clock)
      * ``1月26日 10:00``           -> current year, rolled back one year if that
                                      would lie in the future (midnight if no clock)

    :param time_str: raw text of the time label (typically OCR output)
    :return: the formatted timestamp, or "" when the text is not a recognisable
             time (pure digit strings, invalid dates, non-string input, ...)
    """
    try:
        clean_str = time_str.strip()
        # Pure digit strings (e.g. phone numbers) are never valid WeChat time labels.
        if re.match(r'^\d+$', clean_str):
            logger.warning(f"忽略疑似电话号码/纯数字的时间字符串: '{clean_str}'")
            return ""

        now = datetime.now()
        today = now.date()
        out_format = "%Y-%m-%d %H:%M"

        # Strip the period marker once so every branch sees a plain clock value.
        marker = next((p for p in _PERIOD_MARKERS if p in clean_str), None)
        body = clean_str.replace(marker, "").strip() if marker else clean_str

        def clock_of(text, exact):
            """Return a datetime.time parsed from *text*, or None if no HH:mm is present."""
            pattern = r'^(\d{1,2}):(\d{2})$' if exact else r'(\d{1,2}):(\d{2})'
            found = re.search(pattern, text)
            if not found:
                return None
            hour, minute = map(int, found.groups())
            # An out-of-range hour/minute raises ValueError, handled by the outer except.
            return datetime.min.time().replace(hour=_to_24_hour(hour, marker), minute=minute)

        # 1. HH:mm (today), optionally with a period marker.
        clock = clock_of(body, exact=True)
        if clock is not None:
            return datetime.combine(today, clock).strftime(out_format)

        # 2. 昨天 HH:mm
        if "昨天" in body:
            clock = clock_of(body.replace("昨天", "").strip(), exact=True)
            if clock is not None:
                yesterday = today - timedelta(days=1)
                return datetime.combine(yesterday, clock).strftime(out_format)

        # 3. 星期X / 周X [HH:mm]
        weekdays_map = {
            "星期一": 0, "星期二": 1, "星期三": 2, "星期四": 3, "星期五": 4, "星期六": 5, "星期日": 6,
            "周一": 0, "周二": 1, "周三": 2, "周四": 3, "周五": 4, "周六": 5, "周日": 6
        }
        for w_str, w_idx in weekdays_map.items():
            if w_str not in body:
                continue
            clock = clock_of(body.replace(w_str, "").strip(), exact=False)
            # Modulo 7 keeps the result within the past week.
            delta_days = (now.weekday() - w_idx) % 7
            # Same weekday but a clock later than now: the label refers to last week.
            if delta_days == 0 and clock is not None and (now.hour, now.minute) < (clock.hour, clock.minute):
                delta_days = 7
            target_date = today - timedelta(days=delta_days)
            if clock is None:
                return target_date.strftime("%Y-%m-%d 00:00")
            return datetime.combine(target_date, clock).strftime(out_format)

        # 4. YYYY年MM月DD日 [HH:mm]
        match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', body)
        if match:
            y, mo, d = map(int, match.groups())
            clock = clock_of(body, exact=False)
            if clock is None:
                # Date separator without a clock; datetime() also validates the date.
                return datetime(y, mo, d).strftime(out_format)
            return datetime(y, mo, d, clock.hour, clock.minute).strftime(out_format)

        # 5. MM月DD日 [HH:mm] (year omitted: assume the current year)
        match = re.search(r'(\d{1,2})月(\d{1,2})日', body)
        if match:
            mo, d = map(int, match.groups())
            clock = clock_of(body, exact=False)
            hh, mm = (clock.hour, clock.minute) if clock is not None else (0, 0)
            dt = datetime(today.year, mo, d, hh, mm)
            # A date in the future must belong to the previous year
            # (e.g. a December message read in January).
            if dt > now:
                dt = datetime(today.year - 1, mo, d, hh, mm)
            return dt.strftime(out_format)

        # Unparseable: return "" so unrelated text is never injected as a time.
        return ""
    except Exception as e:
        logger.warning(f"时间解析失败 '{time_str}': {e}")
        return ""
def get_next_debug_path(desc="step"):
    """Return the next sequentially numbered debug image path (debug_N_desc.jpg).

    The path lies inside OUTPUT_DIR. When debug mode is off the counter is left
    untouched and None is returned.
    """
    global _debug_counter
    if DEBUG_MODE:
        _debug_counter += 1
        return os.path.join(OUTPUT_DIR, f"debug_{_debug_counter}_{desc}.jpg")
    return None
def clear_directory(dir_path, exclude_files=None):
    """Empty ``dir_path``, keeping any entry whose name is listed in ``exclude_files``.

    A missing directory is created and left empty. Entries that cannot be
    removed because of a PermissionError (e.g. a log file currently in use) are
    skipped silently; other failures are logged as warnings.
    """
    import shutil

    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
        return

    keep = [] if exclude_files is None else exclude_files
    for entry in os.listdir(dir_path):
        if entry in keep:
            continue
        target = os.path.join(dir_path, entry)
        try:
            if os.path.isdir(target) and not os.path.islink(target):
                # Real directory: remove the whole tree.
                shutil.rmtree(target)
            elif os.path.isfile(target) or os.path.islink(target):
                # Regular file or any symlink (including links to directories).
                os.unlink(target)
        except PermissionError:
            # The file is in use (for instance the active log file); leave it.
            continue
        except Exception as e:
            logger.warning(f"Failed to delete {target}. Reason: {e}")
def setup_script_environment():
    """Prepare a clean run: reset the debug image counter and empty Logs/ and Output/."""
    global _debug_counter
    logger.info("清理运行环境: Logs 和 Output 目录...")
    # Debug images are numbered from 1 again for the new run.
    _debug_counter = 0
    # Wipe every previous log and image so the new run starts from a clean state.
    for directory in (LOG_DIR, OUTPUT_DIR):
        clear_directory(directory)
def connect_device():
    """
    Connect to the device through uiautomator2 and log its details.

    :return: the connected device object, or None when the connection fails or
             the device reports an empty ``info``
    """
    try:
        device = u2.connect()
        # An empty info result means the connection is not actually usable.
        if not device.info:
            logger.error("设备连接不可用 (d.info is empty)")
            return None
        # Not every device object exposes ``serial``; fall back to a placeholder.
        serial = getattr(device, 'serial', "未知")
        logger.info(f"设备连接成功: {serial}")
        details = device.device_info
        logger.info(f"详细设备信息: 品牌={details.get('brand')}, 型号={details.get('model')}, SDK={details.get('sdk')}")
        return device
    except Exception as e:
        logger.error(f"设备连接失败: {e}")
        return None
def safe_device_click(d, x, y):
    """
    Click at (x, y), retrying once on a freshly created connection if the first attempt fails.

    Coordinates are converted to built-in ``int`` first, because numpy integer
    types cannot be JSON-serialised by the device client.

    :return: True if either attempt succeeded, False if both failed
    """
    try:
        d.click(int(x), int(y))
    except Exception as first_error:
        logger.warning(f"点击操作失败 ({x}, {y}): {first_error},尝试重新连接并重试...")
        try:
            # Re-initialise the connection and repeat the click on the new handle.
            fresh_device = u2.connect()
            fresh_device.click(int(x), int(y))
        except Exception as second_error:
            logger.error(f"重试点击操作依然失败: {second_error}")
            return False
    return True
def draw_debug_info(image_path, messages, current_voice_center=None, suffix=""):
    """
    Overlay the known state of each voice message onto a screenshot, in place.

    :param image_path: path of the image to annotate; the file is overwritten
    :param messages: message dicts; only entries whose ``type`` is ``voice`` are drawn
    :param current_voice_center: (vx, vy) of the voice message being processed, if any
    :param suffix: accepted for compatibility; not used by this function
    """
    try:
        canvas = cv2.imread(image_path)
        if canvas is None:
            return

        for entry in messages:
            if entry['type'] != 'voice':
                continue
            px, py = entry['center']
            unread = entry.get('is_unread', False)
            converted = entry.get('is_converted', False)

            # Red box for unread, green box for read.
            box_color = (0, 0, 255) if unread else (0, 255, 0)
            cv2.rectangle(canvas, (px-30, py-30), (px+30, py+30), box_color, 2)

            # YES/NO tells whether the voice message already has transcribed text.
            cv2.putText(canvas, "YES" if converted else "NO", (px + 40, py + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, box_color, 2)

            # The voice message currently being processed gets an extra yellow ring.
            if current_voice_center and abs(px - current_voice_center[0]) < 10 and abs(py - current_voice_center[1]) < 10:
                cv2.circle(canvas, (px, py), 40, (0, 255, 255), 3)
                cv2.putText(canvas, "PROCESSING", (px - 60, py - 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

        # Save the annotated image over the original file.
        cv2.imwrite(image_path, canvas)
        logger.info(f"已更新调试标记到截图: {image_path}")
    except Exception as e:
        logger.warning(f"绘制调试信息失败: {e}")
def _detect_bubble_color(img, bbox):
    """
    Classify the background colour behind an OCR text box.

    :param img: OpenCV image (BGR)
    :param bbox: bounding box points as returned by OCR; each point is indexed as (x, y)
    :return: "green", "white" (light or dark-grey neutral bubble), "system_gray"
             (mid-grey or very dark neutral background) or "unknown"
    """
    if img is None:
        return "unknown"

    # Clamp the box to the image and cut out the region of interest.
    img_h, img_w = img.shape[:2]
    xs = [p[0] for p in bbox]
    ys = [p[1] for p in bbox]
    left = max(0, int(min(xs)))
    right = min(img_w, int(max(xs)))
    top = max(0, int(min(ys)))
    bottom = min(img_h, int(max(ys)))
    if right <= left or bottom <= top:
        return "unknown"
    region = img[top:bottom, left:right]

    # Dark text pixels drag the mean/median down, so the 95th percentile per
    # channel is used as the background colour estimate.
    try:
        b, g, r = np.percentile(region, 95, axis=(0, 1))
    except Exception:
        # Fallback: plain channel means.
        b, g, r = cv2.mean(region)[:3]

    if DEBUG_MODE:
        logger.info(f"Color Debug: B={b:.1f}, G={g:.1f}, R={r:.1f} | bbox={bbox}")

    # 1. Green bubble: G clearly dominates R and B (a 15-level gap also covers
    #    dark mode) and is not too dark.
    green_dominant = g > r + 15 and g > b + 15
    if green_dominant and g > 50:
        return "green"

    # 2. Neutral bubble: white in light mode, dark grey in dark mode.
    neutral_wide = abs(r - g) < 20 and abs(g - b) < 20 and abs(r - b) < 20
    if neutral_wide and (g > 180 or 40 < g < 100):
        return "white"

    # 3. System background: mid grey in light mode, very dark in dark mode.
    neutral_tight = abs(r - g) < 15 and abs(g - b) < 15
    if neutral_tight and (110 < g < 160 or g < 40):
        return "system_gray"

    return "unknown"
def _scan_chat_messages(image_path):
    """
    Scan a WeChat chat screenshot for voice messages, text messages and unread dots.

    Voice icons and red dots are located by template matching; all text comes
    from OCR. OCR lines judged to belong to a voice message (its transcription,
    duration, nearby timestamp, menu text) are claimed by that voice message;
    the remaining lines become text messages or timestamp markers. Timestamp
    markers are not returned: their parsed value is attached as
    ``time_display`` to the messages that follow them.

    :param image_path: path of the screenshot to analyse
    :return: ``(messages, debug_image, chat_title)``; ``([], None, "对方")`` when
             the image cannot be read
    """
    img = cv2.imread(image_path)
    if img is None:
        logger.error(f"无法读取图片: {image_path}")
        # Callers unpack three values, so the failure result must be a 3-tuple too.
        return [], None, "对方"
    h, w = img.shape[:2]
    logger.info(f"DEBUG: Image size w={w}, h={h}")

    # 3. Template matching for voice icons and unread red dots.
    audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg")
    red_point_template = os.path.join(TEMPLATE_DIR, "red_point.jpg")
    audio_matches = find_all_template_matches(image_path, audio_template, threshold=0.8)
    red_points = find_all_template_matches(image_path, red_point_template, threshold=0.8)

    # 4. OCR over the whole screenshot.
    logger.info("正在执行 OCR 识别...")
    ocr_results = ocr_kit.read_text(image_path)

    # 4.5 Try to extract the chat title (the other party's nickname).
    chat_title = "对方"
    potential_titles = []
    for bbox, text, _conf in ocr_results:
        c_y = int((bbox[0][1] + bbox[2][1]) / 2)
        c_x = int((bbox[0][0] + bbox[2][0]) / 2)
        # The title band is assumed to sit between y=60 and y=140.
        if 60 < c_y < 140:
            clean = text.strip()
            # Skip clock, app name, back button and similar chrome.
            if re.match(r'^\d{1,2}:\d{2}$', clean): continue
            if "微信" in clean or "WeChat" in clean: continue
            if clean in ["<", "返回", "消息", "(", ")"]: continue
            if re.match(r'^\d+$', clean): continue  # pure digits (e.g. unread count)
            if len(clean) > 0:
                potential_titles.append((c_x, clean))
    if potential_titles:
        # Prefer the candidate closest to the horizontal centre.
        potential_titles.sort(key=lambda x: abs(x[0] - w/2))
        chat_title = potential_titles[0][1]
        # Drop a trailing "(N)" such as a group member count.
        chat_title = re.sub(r'\(\d+\)$', '', chat_title).strip()
        # Dynamic titles containing ".." (e.g. "typing..") are ignored.
        # NOTE(review): nesting of this check under `if potential_titles` was
        # reconstructed; it only affects which log line is emitted.
        if ".." in chat_title:
            logger.info(f"忽略动态标题: {chat_title}")
            chat_title = "对方"
        else:
            logger.info(f"识别到聊天标题/对方昵称: {chat_title}")

    # WeChat context-menu keywords (excluded as interference).
    MENU_KEYWORDS = ["听筒播放", "收藏", "背景播放", "删除", "多选", "取消转文字", "转文字", "引用", "提醒"]
    # System message fragments to ignore.
    IGNORE_CONTENT = ["撤回了一条消息", "打招呼的消息", "拍了拍", "你撤回了一条消息", "引用", "Clear Text", "Switch IME", "Done", "按住说话", "发送"]

    # 5. Assemble all messages.
    messages = []
    debug_img = img.copy()
    # Visualise the chat-area filter boundaries (top 150px, bottom 60px).
    cv2.line(debug_img, (0, 150), (w, 150), (255, 0, 255), 2)
    cv2.line(debug_img, (0, h - 60), (w, h - 60), (255, 0, 255), 2)
    cv2.putText(debug_img, "TOP_FILTER", (10, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
    cv2.putText(debug_img, "BOTTOM_FILTER", (10, h - 70), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)

    claimed_ocr_indices = set()

    # A. Voice messages.
    for ax, ay in audio_matches:
        # Mark every matched voice icon (debugging aid).
        cv2.circle(debug_img, (ax, ay), 10, (255, 255, 0), -1)
        # Discard icons outside the chat area.
        if ay < 150 or ay > h - 60:
            logger.info(f"忽略区域外语音图标: ({ax}, {ay})")
            cv2.rectangle(debug_img, (ax-35, ay-35), (ax+35, ay+35), (128, 128, 128), 1)
            cv2.putText(debug_img, "FILTERED", (ax - 40, ay - 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1)
            continue

        # Icons in the left half belong to the other party.
        voice_is_left = ax < w / 2
        # NOTE(review): the label for the local user is an empty string in this
        # source; confirm that is intended (a character may have been lost).
        sender = "对方" if voice_is_left else ""

        is_unread = False
        for rx, ry in red_points:
            # A red dot right of the icon at a similar height marks it unread.
            if abs(ry - ay) < 50 and rx > ax:
                is_unread = True
                break

        # Determine whether the voice message already shows transcribed text.
        is_converted = False
        converted_trigger_text = ""
        associated_texts = []  # (y, x, text) for each associated OCR line
        for i, (bbox, text, _conf) in enumerate(ocr_results):
            if i in claimed_ocr_indices: continue
            c_x = int((bbox[0][0] + bbox[2][0]) / 2)
            c_y = int((bbox[0][1] + bbox[2][1]) / 2)
            min_x = min(p[0] for p in bbox)
            max_x = max(p[0] for p in bbox)

            # Geometric alignment check: the text must sit on the voice's side.
            if voice_is_left:
                # Left voice: text must start on the left and must not reach the right edge.
                if min_x > 450 or max_x > w - 150:
                    logger.debug(f"忽略文本 '{text[:10]}' (Left Voice): min_x={min_x}, max_x={max_x} 不满足左对齐条件")
                    continue
            else:
                # Right voice: text must end on the right and must not start at the left edge.
                if max_x < w - 300 or min_x < 100:
                    logger.debug(f"忽略文本 '{text[:10]}' (Right Voice): min_x={min_x}, max_x={max_x} 不满足右对齐条件")
                    continue

            # The text must lie at most 50px above / 800px below the icon and within 600px horizontally.
            if -50 < c_y - ay < 800 and abs(c_x - ax) < 600:
                # Another voice icon between this icon and the text blocks the association.
                has_intermediate_audio = False
                for other_ax, other_ay in audio_matches:
                    if ay + 20 < other_ay < c_y - 10:
                        # Only icons inside the valid chat area count as blockers.
                        if 150 <= other_ay <= h - 60:
                            has_intermediate_audio = True
                            logger.info(f"语音({ax},{ay}) 被中间有效区域内的语音图标({other_ax},{other_ay}) 阻断,无法关联文本 '{text[:10]}...'")
                            break
                        else:
                            logger.info(f"语音({ax},{ay}) 忽略非聊天区域(Y={other_ay})的语音图标阻断")
                if has_intermediate_audio:
                    continue

                # A bubble that clearly belongs to the opposite side ends the search,
                # preventing text from a later message being merged in.
                if c_y > ay + 60:  # leave room for text directly under the voice bubble
                    bubble_color = _detect_bubble_color(img, bbox)
                    if voice_is_left:
                        if bubble_color == "green":
                            logger.info(f"语音({ax},{ay}) 被中间''的消息(绿色气泡)阻断: '{text[:10]}...'")
                            break
                        if c_x > w * 0.65:
                            logger.info(f"语音({ax},{ay}) 被中间''的消息(右对齐)阻断: '{text[:10]}...'")
                            break
                    else:
                        if bubble_color == "white":
                            logger.info(f"语音({ax},{ay}) 被中间'对方'的消息(白色气泡)阻断: '{text[:10]}...'")
                            break
                        if c_x < w * 0.35:
                            logger.info(f"语音({ax},{ay}) 被中间'对方'的消息(左对齐)阻断: '{text[:10]}...'")
                            break

                clean_text = text.strip()
                # Short text containing HH:mm is treated as a timestamp.
                is_timestamp = re.search(r'(\d{1,2}:\d{2})', clean_text) and (len(clean_text) < 15)
                # Short text ending in one or two digits (optionally with ") is a duration.
                is_duration = re.search(r'\d{1,2}"?$', clean_text) and len(clean_text) < 6
                is_ignored = any(k in clean_text for k in IGNORE_CONTENT)
                # NOTE(review): the substring tested here is empty in this source, so
                # this reduces to "short text containing a digit"; confirm the literal.
                is_noise = "" in clean_text and len(clean_text) < 8 and re.search(r'\d', clean_text)

                if not is_duration and not is_timestamp and clean_text not in MENU_KEYWORDS and not is_ignored and not is_noise:
                    is_converted = True
                    associated_texts.append((c_y, c_x, clean_text))
                    claimed_ocr_indices.add(i)
                    # No break: keep collecting further lines of the transcription.
                else:
                    # Auxiliary text of the voice message: claim it so it is not
                    # emitted as a text message later.
                    claimed_ocr_indices.add(i)
                    if is_timestamp:
                        logger.info(f"语音({ax},{ay}) 忽略下方时间戳文本: '{clean_text}'")
                    elif is_duration:
                        logger.info(f"语音({ax},{ay}) 忽略时长文本: '{clean_text}'")
                    elif is_noise:
                        logger.info(f"语音({ax},{ay}) 忽略噪音文本: '{clean_text}'")
                    elif is_ignored:
                        logger.info(f"语音({ax},{ay}) 忽略系统消息文本: '{clean_text}'")
                    else:
                        logger.info(f"语音({ax},{ay}) 忽略其他文本(可能是菜单): '{clean_text}'")

        # Merge the associated lines in reading order (by Y, then X).
        if associated_texts:
            associated_texts.sort(key=lambda x: (x[0], x[1]))
            converted_trigger_text = "".join([t[2] for t in associated_texts])
            # Strip known OCR noise. The loop variable must not be called `np`:
            # that would shadow the numpy alias inside this function.
            noise_patterns = ["42IIhK+-语音输入粘贴#", "语音输入粘贴"]
            for pattern in noise_patterns:
                converted_trigger_text = converted_trigger_text.replace(pattern, "")
            converted_trigger_text = converted_trigger_text.strip()
            logger.info(f"语音({ax},{ay}) 判定为已转换,最终合并文本: '{converted_trigger_text}'")

        if is_converted:
            logger.info(f"语音消息 ({ax}, {ay}) 已有转换文字: '{converted_trigger_text}',跳过")

        # Visual feedback on the debug image.
        color = (0, 0, 255) if is_unread else (0, 255, 0)
        cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
        label = "YES" if is_converted else "NO"
        cv2.putText(debug_img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

        messages.append({
            "type": "voice",
            "sender": sender,
            "center": (ax, ay),
            "y": ay,
            "is_unread": is_unread,
            "is_converted": is_converted,
            "content": converted_trigger_text if is_converted else None
        })

    # B. Text messages (every OCR line not claimed by a voice message).
    for i, (bbox, text, _conf) in enumerate(ocr_results):
        if i in claimed_ocr_indices: continue
        c_x = int((bbox[0][0] + bbox[2][0]) / 2)
        c_y = int((bbox[0][1] + bbox[2][1]) / 2)
        if c_y < 150 or c_y > h - 60:
            continue

        # Sender detection, step 1: bubble background colour.
        sender_color = _detect_bubble_color(img, bbox)
        sender = "unknown"
        if sender_color == "green":
            sender = ""
        elif sender_color == "white":
            sender = "对方"
        elif sender_color == "system_gray":
            # Grey background: usually a timestamp or system message.
            sender = "system"

        # Step 2: geometric correction. Avatar plus margin is assumed to take ~15% of the width.
        edge_margin = w * 0.15
        min_x = min(p[0] for p in bbox)
        max_x = max(p[0] for p in bbox)

        # Rule A: a line reaching past 85% of the width belongs to the local user,
        # even if the colour said otherwise.
        # NOTE(review): the assignment is applied to the whole branch, following the
        # original comment; the original nesting could not be recovered.
        if max_x > w - edge_margin:
            if sender == "对方":
                logger.warning(f"Sender detected as '对方' by color but geometry says '' (max_x={max_x} > {w-edge_margin}). Correcting to ''.")
            sender = ""
        # Rule B: a line starting within the left 35% and ending before 75% belongs
        # to the other party, unless colour already identified the local user or
        # a system message.
        elif min_x < w * 0.35 and max_x < w * 0.75:
            if sender == "":
                logger.info(f"Geometry says '对方' (min_x={min_x} < {w*0.35}) but Color is '' (Green). Trusting Color.")
            elif sender == "system":
                pass
            else:
                sender = "对方"

        # Rule C: colour unknown and no extreme position -> decide by box centre.
        if sender == "unknown":
            c_x = int((min_x + max_x) / 2)
            if c_x < w / 2: sender = "对方"
            else: sender = ""

        # Rule D: final centre-based check, only for boxes narrower than 70% of the
        # screen. Wide lines have a near-central centre, so colour is trusted there.
        box_width = max_x - min_x
        if box_width < w * 0.7:
            if c_x < w * 0.45:
                # A green bubble with a left centre is a wrapped line of the local
                # user's own long message; keep it.
                if sender == "" and sender_color == "green":
                    logger.info(f"Geometry says '对方' (center={c_x} < {w*0.45}) but Color is 'green'. Keeping ''.")
                elif sender == "system":
                    pass
                else:
                    if sender == "":
                        logger.warning(f"Sender detected as '' by color but center is left ({c_x} < {w*0.45}). Correcting to '对方'.")
                    sender = "对方"
            elif c_x > w * 0.55:
                if sender == "对方":
                    # NOTE(review): this branch logs a correction but leaves the sender
                    # unchanged, unlike the left-side branch. Kept as in the original;
                    # confirm whether the assignment is intentionally omitted.
                    logger.warning(f"Sender detected as '对方' by color but center is right ({c_x} > {w*0.55}). Correcting to ''.")
                elif sender == "system":
                    pass
                else:
                    sender = ""
            else:
                logger.info(f"Message in middle zone ({w*0.45} < {c_x} < {w*0.55}), trusting color detection: {sender}")
        else:
            logger.info(f"Wide message (width={box_width} > {w*0.7}), skipping geometry check, trusting color: {sender}")

        time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)'
        # Short lines that look like a time are treated as standalone timestamp markers.
        if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)):
            logger.info(f"识别到时间戳/日期: '{text}'")
            messages.append({
                "type": "timestamp",
                "content": text.strip(),
                "y": c_y,
                "center": (c_x, c_y)
            })
            continue

        clean_text = text.strip()
        if re.match(r'^.?[0-9]{1,2}"?$', clean_text):
            logger.info(f"忽略疑似时长文本: '{clean_text}'")
            continue
        # NOTE(review): same empty-substring noise test as in the voice section.
        if "" in clean_text and len(clean_text) < 8 and re.search(r'\d', clean_text):
            logger.info(f"忽略噪音文本: '{clean_text}'")
            continue
        if clean_text in MENU_KEYWORDS:
            logger.info(f"忽略菜单关键词: '{clean_text}'")
            continue
        if any(k in clean_text for k in IGNORE_CONTENT):
            logger.info(f"忽略系统消息内容: '{clean_text}'")
            continue

        messages.append({
            "type": "text",
            "sender": sender,
            "content": text.strip(),
            "center": (c_x, c_y),
            "y": c_y
        })

    # 6. Sort top to bottom.
    messages.sort(key=lambda x: x['y'])

    # 7. Inject timestamps: timestamp entries are dropped and their parsed value
    #    is attached to every following voice/text message.
    current_time_str = None
    final_messages_with_time = []
    for msg in messages:
        if msg['type'] == 'timestamp':
            parsed_time = parse_wechat_time(msg['content'])
            current_time_str = parsed_time
            logger.info(f"更新时间上下文: {msg['content']} -> {parsed_time}")
        else:
            # Messages above the first timestamp on screen get no time_display.
            if current_time_str:
                msg['time_display'] = current_time_str
            final_messages_with_time.append(msg)

    return final_messages_with_time, debug_img, chat_title
def _poll_for_template(d, template_path, debug_desc, timeout=3.0, interval=0.2):
    """Screenshot the device repeatedly until the template matches or ``timeout`` seconds pass.

    In debug mode each screenshot is saved to a numbered debug path; otherwise
    the match runs on an in-memory image. Returns the matched centre or None.
    """
    started = time.time()
    while time.time() - started < timeout:
        shot_path = get_next_debug_path(debug_desc)
        if shot_path:
            d.screenshot(shot_path)
            position = find_template_match(shot_path, template_path, threshold=0.7)
        else:
            # Debug mode off: match directly in memory (format='opencv').
            position = find_template_match(d.screenshot(format='opencv'), template_path, threshold=0.7)
        if position:
            return position
        time.sleep(interval)
    return None


async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", process_strategy="ALL", restore_processed_voice=True):
    """
    Analyse a WeChat chat screenshot with CV + OCR and transcribe voice messages on demand.

    The function loops: scan the current screenshot, pick the first voice
    message selected by the strategy, long-press it, tap the transcribe
    button, capture the result, optionally collapse the text again, then take
    a new screenshot and repeat (at most 10 rounds). Transcribed text is read
    from the captured screenshots by OCR tasks that are awaited after the loop
    and injected into the returned messages.

    :param image_path: path of the initial screenshot
    :param output_path: where the annotated debug image is written (debug mode only)
    :param device: connected device; a new connection is made when omitted
    :param target_name: accepted for compatibility; not used by this function
    :param process_strategy: "ALL" (every unconverted voice), "UNREAD" (unread and
        unconverted) or "LAST" (only the last unconverted voice)
    :param restore_processed_voice: collapse the transcription again after reading
        it. With False the loop stops after the first processed voice because the
        expanded text invalidates the remaining coordinates.
    :return: ``(dialogue_log, input_field_coordinates)``; ``([], None)`` when no
        device is available or the screenshot cannot be read; ``([], (540, 1690))``
        on an unexpected exception
    """
    # Tasks are kept in a local list so that concurrent or repeated calls cannot
    # interfere with each other.
    ocr_tasks = []
    try:
        d = device if device else connect_device()
        if not d:
            return [], None

        current_image_path = image_path
        current_output_path = output_path
        final_messages = []
        loop_count = 0
        MAX_LOOPS = 10

        # Statistics.
        total_voices_count = 0
        convert_opened_count = 0
        convert_closed_count = 0
        # Y coordinates of the voice messages already handled in this call.
        processed_y_coords = set()
        # Transcriptions captured during peek-and-restore: {y_coord: content}.
        captured_voice_contents = {}

        def is_processed(y_coord):
            """True when a voice within 20px of y_coord has already been handled."""
            return any(abs(y_coord - py) < 20 for py in processed_y_coords)

        async def _async_ocr_task(img_path, target_y):
            """Run the screenshot scan in the default executor and return (target_y, text or None)."""
            try:
                loop = asyncio.get_running_loop()
                logger.info(f"🚀 [Async OCR] 开始分析截图 {os.path.basename(img_path)} (目标 Y={target_y})")
                msgs, _, _ = await loop.run_in_executor(None, _scan_chat_messages, img_path)
                found = None
                # Collect every converted voice message close to the target position.
                all_found_texts = []
                for pm in msgs:
                    if pm['type'] == 'voice' and pm.get('is_converted'):
                        # Generous tolerance: expanding the text shifts the Y position.
                        if abs(pm['y'] - target_y) < 150:
                            content = pm.get('content', '').strip()
                            if content:
                                all_found_texts.append((pm['y'], content))
                if all_found_texts:
                    all_found_texts.sort(key=lambda x: x[0])
                    found = " ".join([t[1] for t in all_found_texts])
                    logger.info(f"✨ [Async OCR] 在 Y={target_y} 附近找到转换文字: {found}")
                if not found:
                    logger.warning(f"⚠️ [Async OCR] 未能在 Y={target_y} 附近找到已转换文字")
                return target_y, found
            except Exception as e:
                logger.error(f"❌ [Async OCR] 任务执行失败: {e}")
                return target_y, None

        zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg")
        cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg")

        # NOTE(review): the loop uses blocking time.sleep() and blocking device calls
        # inside a coroutine, so the OCR tasks created below only start running once
        # this coroutine awaits (at gather). Left unchanged to keep device access
        # strictly sequential; confirm before switching to asyncio.sleep().
        while loop_count < MAX_LOOPS:
            loop_count += 1
            logger.info(f"--- 分析循环 第 {loop_count} 次 ---")

            # 1. Scan the current screen.
            messages, debug_img, chat_title = _scan_chat_messages(current_image_path)
            if messages is None or debug_img is None:  # image could not be read
                return [], None

            # Replace the generic "对方" sender with the recognised chat title.
            if chat_title and chat_title != "对方":
                for m in messages:
                    if m['sender'] == "对方":
                        m['sender'] = chat_title

            # Save the debug image for the current state.
            if current_output_path and DEBUG_MODE:
                cv2.imwrite(current_output_path, debug_img)
                logger.info(f"调试图已保存: {current_output_path}")

            # 2. Select the voice messages to process, top to bottom.
            all_voices = [m for m in messages if m['type'] == 'voice']
            all_voices.sort(key=lambda x: x['y'])
            total_voices_count = len(all_voices)

            target_voices = []
            if process_strategy == "ALL":
                target_voices = [m for m in all_voices if not m.get('is_converted') and not is_processed(m['y'])]
                logger.info(f"策略(ALL): 发现 {len(target_voices)} 条未转换待处理语音")
            elif process_strategy == "UNREAD":
                target_voices = [m for m in all_voices if m.get('is_unread') and not m.get('is_converted') and not is_processed(m['y'])]
                logger.info(f"策略(UNREAD): 发现 {len(target_voices)} 条未读待处理语音")
            elif process_strategy == "LAST":
                unconverted = [m for m in all_voices if not m.get('is_converted')]
                if unconverted:
                    last_voice = unconverted[-1]
                    if not is_processed(last_voice['y']):
                        target_voices = [last_voice]
                        logger.info(f"策略(LAST): 仅关注最后一条未转换语音")

            if not target_voices:
                logger.info("当前屏幕无待处理语音,分析结束")
                final_messages = messages
                break

            # 3. Handle only the first target: expanding its text moves the others.
            target = target_voices[0]
            vx, vy = int(target['center'][0]), int(target['center'][1])
            # Recorded up front so a failure cannot cause an endless retry.
            processed_y_coords.add(target['y'])
            logger.info(f"准备处理语音 ({vx}, {vy})...")

            if DEBUG_MODE:
                draw_debug_info(current_output_path, messages, current_voice_center=(vx, vy))

            # Long-press the voice bubble, then look for the transcribe button.
            logger.info(f"正在长按语音消息 ({vx}, {vy})...")
            d.long_click(int(vx), int(vy), 1.0)

            logger.info("正在快速寻找'转文字'按钮...")
            btn_pos = _poll_for_template(d, zhuan_template, "step_long_press_poll")

            if not btn_pos:
                logger.warning("❌ 未找到'转文字'按钮,可能是已转换或误判")
                logger.info("跳过当前语音,继续扫描...")
                continue

            btn_x, btn_y = int(btn_pos[0]), int(btn_pos[1])
            logger.info(f"✅ 找到'转文字'按钮: ({btn_x}, {btn_y}),点击中...")
            safe_device_click(d, btn_x, btn_y)
            convert_opened_count += 1
            logger.info("等待语音转文字完成...")
            time.sleep(3.0)

            # --- Peek-and-restore ---
            # 1. Capture the expanded state; OCR of this image is deferred to a task.
            peek_shot = get_next_debug_path("step_peek_content")
            if not peek_shot:
                # Outside debug mode the OCR task still needs a file to read.
                peek_shot = os.path.join(OUTPUT_DIR, f"temp_peek_{int(time.time())}.jpg")
            d.screenshot(peek_shot)
            logger.info(f"已获取截图启动异步OCR任务以提取内容...")
            ocr_tasks.append(asyncio.create_task(_async_ocr_task(peek_shot, vy)))

            if not restore_processed_voice:
                logger.info("⏩ [配置] 跳过还原状态步骤 (保持文字展开)。")
                # The expanded text invalidates the remaining coordinates, so stop
                # here and let the next monitoring round handle the rest.
                logger.info("🛑 因不还原状态,终止本轮多语音处理循环,等待下一次主监控循环。")
                final_messages = messages  # pre-expansion list; content is injected below
                break

            # 2. Restore: long-press the original bubble position and hide the text.
            logger.info("准备还原状态 (取消转文字)...")
            d.long_click(int(vx), int(vy), 1.0)
            logger.info("正在快速寻找'隐藏文字'按钮...")
            cancel_btn = _poll_for_template(d, cancel_template, "step_restore_poll")
            if cancel_btn:
                cx, cy = int(cancel_btn[0]), int(cancel_btn[1])
                logger.info(f"✅ 找到'隐藏文字'按钮: ({cx}, {cy}),点击还原...")
                safe_device_click(d, cx, cy)
                convert_closed_count += 1
                time.sleep(2.0)  # wait for the collapse animation
            else:
                logger.warning("❌ 未找到'隐藏文字'按钮,无法还原状态!(后续可能导致重复处理)")

            # 3. Prepare the next round.
            if len(target_voices) == 1:
                logger.info("✅ 当前屏幕所有目标语音已处理完毕,无需再次全屏扫描。")
                final_messages = messages  # list from this round's initial scan
                break

            # Re-capture the screen. Outside debug mode get_next_debug_path() returns
            # None, which would leave the next scan without an image, so fall back
            # to a temporary file exactly as done for the peek screenshot.
            next_screenshot = get_next_debug_path("step_restored")
            if not next_screenshot:
                next_screenshot = os.path.join(OUTPUT_DIR, f"temp_restored_{int(time.time())}_{loop_count}.jpg")
            d.screenshot(next_screenshot)
            current_image_path = next_screenshot
            current_output_path = get_next_debug_path("flag_restored")

        # Collect the results of all OCR tasks.
        if ocr_tasks:
            logger.info(f"等待 {len(ocr_tasks)} 个异步 OCR 任务完成...")
            results = await asyncio.gather(*ocr_tasks)
            for y, content in results:
                if content:
                    captured_voice_contents[y] = content
                    logger.info(f"✅ [Async OCR] 异步获取到语音内容 (y={y}): {content}")

        # If the loop ended by reaching MAX_LOOPS, use the last scan.
        if not final_messages:
            final_messages = messages

        # Inject the captured transcriptions into voice messages that have no content.
        if captured_voice_contents:
            logger.info(f"正在注入 {len(captured_voice_contents)} 条已还原的语音内容...")
            for m in final_messages:
                if m['type'] == 'voice' and (not m.get('content') or m.get('content').strip() == ""):
                    for py, content in captured_voice_contents.items():
                        # 100px tolerance: the message Y may differ slightly from the click Y.
                        if abs(m['y'] - py) < 100:
                            m['content'] = content
                            m['is_converted'] = True  # logically converted
                            logger.info(f"  -> 注入内容到 Y={m['y']} (原 py={py}): {content[:20]}...")
                            break

        # Geometric fallback for the input field: horizontal centre, 88% of the height.
        input_field_coordinates = (debug_img.shape[1] // 2, int(debug_img.shape[0] * 0.88))

        # Build dialogue_log in top-to-bottom order. Text messages without content
        # are dropped; voice messages are kept even when empty.
        final_messages.sort(key=lambda x: x['y'])
        dialogue_log = []
        for msg in final_messages:
            if msg['type'] == 'text':
                if msg.get('content'):
                    dialogue_log.append(msg)
            elif msg['type'] == 'voice':
                dialogue_log.append(msg)

        logger.info(f"📊 [统计] 语音总数: {total_voices_count}, 打开转文字次数: {convert_opened_count}, 关闭转文字次数: {convert_closed_count}")
        return dialogue_log, input_field_coordinates
    except Exception as e:
        # Do not leave scheduled OCR tasks dangling when the analysis aborts.
        for pending in ocr_tasks:
            if not pending.done():
                pending.cancel()
        logger.error(f"分析过程发生异常: {e}", exc_info=True)
        return [], (540, 1690)
def clean_screenshots_dir():
    """Delete every .jpg/.png/.jpeg file directly inside OUTPUT_DIR, creating the directory if missing."""
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
        return

    image_names = [name for name in os.listdir(OUTPUT_DIR)
                   if name.lower().endswith(('.jpg', '.png', '.jpeg'))]
    for f in image_names:
        try:
            os.remove(os.path.join(OUTPUT_DIR, f))
        except Exception as e:
            # Best effort: report the failure and continue with the remaining files.
            logger.warning(f"Failed to delete {f}: {e}")
def is_in_chat_interface(d):
    """
    Report whether the device currently shows a WeChat chat screen.

    The selectors are probed in order and the first one that exists settles the
    answer. Any error during probing is logged and treated as "not in chat".
    """
    probes = (
        {"description": "切换到语音"},             # 1. voice/keyboard toggle at the bottom
        {"description": "切换到键盘"},
        {"className": "android.widget.EditText"},  # 2. input box
        {"text": "按住说话"},                      # 3. hold-to-talk button
        {"description": "聊天信息"},               # 4. "more" button in the top-right corner
    )
    try:
        if any(d(**selector).exists for selector in probes):
            return True
    except Exception as e:
        logger.warning(f"is_in_chat_interface check failed: {e}")
    return False
def find_input_box_center(image_path):
    """
    Estimate the input box centre (fallback strategy).

    Purely geometric: the horizontal centre of the image at 88% of its height.

    :return: ``((x, y), None)``; ``((540, 2100), None)`` when the image is
             missing, unreadable or an error occurs
    """
    try:
        img = cv2.imread(image_path) if os.path.exists(image_path) else None
        if img is None:
            return (540, 2100), None

        height, width = img.shape[:2]
        center = (int(width * 0.5), int(height * 0.88))
        logger.info(f"find_input_box_center fallback: ({center[0]}, {center[1]})")
        return center, None
    except Exception as e:
        logger.error(f"find_input_box_center error: {e}")
        return (540, 2100), None
def find_template_match(screen_input, template_path, threshold=0.8):
    """Locate the best match of a template image on a screen image.

    Matching is done on the colour images with OpenCV's normalised
    correlation coefficient (``cv2.TM_CCOEFF_NORMED``).

    :param screen_input: file path (str), OpenCV image (numpy.ndarray), or any
        other object convertible with ``np.array`` and interpreted as RGB
        (e.g. a PIL image).
    :param template_path: path of the template image file.
    :param threshold: minimum score for the best match to count.
    :return: ``(center_x, center_y)`` of the best match, or None when the
        template file is missing, an image cannot be loaded, the score is
        below ``threshold``, or any error occurs.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return None

        # Normalise the screen argument into a BGR ndarray.
        if isinstance(screen_input, str):
            screen = cv2.imread(screen_input)
        elif isinstance(screen_input, np.ndarray):
            screen = screen_input
        else:
            try:
                screen = cv2.cvtColor(np.array(screen_input), cv2.COLOR_RGB2BGR)
            except Exception:
                logger.error(f"Invalid screen_input type: {type(screen_input)}")
                return None

        needle = cv2.imread(template_path)
        if screen is None or needle is None:
            return None

        scores = cv2.matchTemplate(screen, needle, cv2.TM_CCOEFF_NORMED)
        _, best_score, _, best_loc = cv2.minMaxLoc(scores)
        if best_score >= threshold:
            needle_h, needle_w = needle.shape[:2]
            # best_loc is the top-left corner of the match; shift to centre.
            center = (best_loc[0] + needle_w // 2, best_loc[1] + needle_h // 2)
            logger.info(f"Template matched! Score: {best_score:.2f}, Center: ({center[0]}, {center[1]})")
            return center

        logger.info(f"Template not matched. Max score: {best_score:.2f}")
        return None
    except Exception as e:
        logger.error(f"Template matching failed: {e}")
        return None
def find_all_template_matches(screen_path, template_path, threshold=0.8):
    """Find every occurrence of a template on a screenshot.

    Uses ``cv2.matchTemplate`` with ``cv2.TM_CCOEFF_NORMED``. A single target
    usually produces a cluster of adjacent above-threshold positions, so
    candidates are merged greedily: a candidate is dropped when it lies within
    10 px (on both axes) of a point that was already kept.

    :param screen_path: path of the screenshot file.
    :param template_path: path of the template image file.
    :param threshold: minimum score for a position to count as a match.
    :return: list of ``(center_x, center_y)`` tuples of plain Python ints, in
        row-major scan order of the kept candidates; an empty list when the
        template file is missing, an image cannot be loaded, nothing matches,
        or any error occurs.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template file not found: {template_path}")
            return []

        img = cv2.imread(screen_path)
        template = cv2.imread(template_path)
        if img is None or template is None:
            return []

        h, w = template.shape[:2]
        res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED)

        # Log the best score to make threshold tuning easier.
        _, max_val, _, _ = cv2.minMaxLoc(res)
        logger.info(f"模板匹配 {os.path.basename(template_path)}: 最大相似度 = {max_val:.4f} (阈值={threshold})")

        # np.where yields (rows, cols), i.e. (y, x), in row-major order.
        rows, cols = np.where(res >= threshold)

        unique_points = []
        # tolist() converts numpy.int64 to plain int: NumPy integers are not
        # JSON-serialisable, so they must not leak to callers that forward
        # the coordinates (e.g. to a device click or a JSON log).
        for x, y in zip(cols.tolist(), rows.tolist()):
            center_x = x + w // 2
            center_y = y + h // 2
            is_close = any(
                abs(center_x - ux) < 10 and abs(center_y - uy) < 10
                for ux, uy in unique_points
            )
            if not is_close:
                unique_points.append((center_x, center_y))

        if unique_points:
            logger.info(f"Found {len(unique_points)} matches for {os.path.basename(template_path)}")
        return unique_points
    except Exception as e:
        logger.error(f"find_all_template_matches failed: {e}")
        return []
def perform_input_action(d, center_point, text, auto_send=True, debug_prefix=None):
    """Type ``text`` into the WeChat chat input box and optionally send it.

    Steps:
      1. Make sure the chat bar is in text mode: click the "切换到键盘"
         button if the UI tree has it, otherwise look for the
         ``wen_zi_input.jpg`` icon on a screenshot.
      2. Locate the input box: native ``EditText``, then a WeChat resource
         id, then the ``input_text.jpg`` template, then vertical alignment
         with the voice/keyboard toggle button.
      3. Enter the text with ``set_text`` on the native widget, falling back
         to tapping the coordinates and ``send_keys`` / clipboard paste.
      4. When ``auto_send`` is true, press the send button (template match,
         then ``text="发送"``, then the Enter key).

    :param d: uiautomator2 device object.
    :param center_point: ``(x, y)`` tap position used when no native input
        widget is found; may be refined by step 2.
    :param text: text to enter.
    :param auto_send: send the message after typing when true.
    :param debug_prefix: if given, step screenshots are saved to OUTPUT_DIR
        as ``{debug_prefix}_{step}.jpg``.
    :return: True when the whole sequence ran without raising, False when an
        exception occurred (it is logged). True does not prove delivery.
    """

    def save_debug_shot(name):
        # Step screenshots are only taken when the caller asked for them.
        if debug_prefix:
            shot_path = os.path.join(OUTPUT_DIR, f"{debug_prefix}_{name}.jpg")
            d.screenshot(shot_path)
            logger.info(f"保存中间过程截图: {shot_path}")

    def remove_temp(path):
        # Best-effort clean-up: a missing or locked temp file is not an error.
        try:
            os.remove(path)
        except OSError:
            pass

    try:
        # --- 1. Ensure text-input mode -----------------------------------
        logger.info("正在检查输入模式...")
        save_debug_shot("1_check_mode")

        # The UI tree is preferred over image recognition. A "switch to
        # keyboard" button means the bar is currently in voice mode.
        voice_mode_btn = d(description="切换到键盘")
        if voice_mode_btn.exists:
            logger.info("检测到语音模式 (UI树: '切换到键盘'),点击切换...")
            voice_mode_btn.click()
            time.sleep(1.0)  # wait for the UI to switch
        else:
            # Image fallback, only when the UI tree did not show the button.
            # (`.exists` is a live query, so re-testing it after the click
            # above would run this fallback after every successful switch.)
            tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg")
            try:
                d.screenshot(tmp_check_shot)
                wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")
                wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8)
                if wen_zi_pos:
                    logger.info(f"检测到语音模式 (图像: 找到切换文字图标: {wen_zi_pos}),点击切换...")
                    d.click(wen_zi_pos[0], wen_zi_pos[1])
                    time.sleep(1.0)
            finally:
                remove_temp(tmp_check_shot)

        save_debug_shot("2_after_mode")

        # --- 2. Locate the input box -------------------------------------
        edit_text = d(className="android.widget.EditText")
        if not edit_text.exists:
            # Resource id seen in WeChat; presumably version-specific.
            edit_text = d(resourceId="com.tencent.mm:id/b4a")

        # 2a. Template search for the input box (threshold lowered to 0.6).
        if not edit_text.exists:
            input_template_path = os.path.join(TEMPLATE_DIR, "input_text.jpg")
            if os.path.exists(input_template_path):
                tmp_input_search = os.path.join(OUTPUT_DIR, "temp_input_search.jpg")
                try:
                    d.screenshot(tmp_input_search)
                    logger.info(f"正在尝试使用模板 {input_template_path} 寻找输入框...")
                    input_pos = find_template_match(tmp_input_search, input_template_path, threshold=0.6)
                    if input_pos:
                        logger.info(f"✅ [Template] 通过 input_text.jpg 找到输入框: {input_pos}")
                        save_debug_shot("3_input_box_found")

                        # Draw the matched region (blue box) for debugging.
                        try:
                            debug_img = cv2.imread(tmp_input_search)
                            if debug_img is not None:
                                # The template is read only for its size.
                                tmpl = cv2.imread(input_template_path)
                                if tmpl is not None:
                                    th, tw = tmpl.shape[:2]
                                    cx, cy = input_pos
                                    top_left = (cx - tw // 2, cy - th // 2)
                                    bottom_right = (cx + tw // 2, cy + th // 2)
                                    # Blue in BGR order is (255, 0, 0).
                                    cv2.rectangle(debug_img, top_left, bottom_right, (255, 0, 0), 3)
                                    cv2.putText(debug_img, "MATCH: input_text.jpg", (top_left[0], top_left[1] - 10),
                                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)
                                    debug_save_path = os.path.join(OUTPUT_DIR, "debug_input_box_match.jpg")
                                    cv2.imwrite(debug_save_path, debug_img)
                                    logger.info(f"已保存输入框匹配调试图(蓝框): {debug_save_path}")
                        except Exception as e:
                            logger.warning(f"绘制输入框调试图失败: {e}")

                        # Tap the matched position from now on.
                        center_point = input_pos
                    else:
                        logger.info("❌ [Template] input_text.jpg 未匹配到输入框")
                finally:
                    # The search screenshot is only a scratch file.
                    remove_temp(tmp_input_search)

        # 2b. Without a native widget, align Y with the toggle button on the
        # left of the bar. This overrides a template hit from 2a when the
        # button is present.
        if not edit_text.exists:
            try:
                anchor_btn = d(description="切换到语音")
                if not anchor_btn.exists:
                    # Also accept the keyboard toggle in case the mode
                    # detection above was wrong.
                    anchor_btn = d(description="切换到键盘")
                if anchor_btn.exists:
                    bounds = anchor_btn.info['bounds']
                    anchor_y = (bounds['top'] + bounds['bottom']) // 2
                    w, h = d.window_size()
                    # X: screen centre, Y: aligned with the button.
                    center_point = (w // 2, anchor_y)
                    logger.info(f"通过'切换到语音'按钮修正输入框坐标: {center_point}")
            except Exception as e:
                logger.warning(f"尝试修正坐标失败: {e}")

        # --- 3. Enter the text -------------------------------------------
        input_success = False
        if edit_text.exists:
            logger.info("Found native EditText, using set_text")
            try:
                edit_text.click()
                time.sleep(0.5)
                edit_text.set_text(text)
                input_success = True
            except Exception as e:
                logger.warning(f"Native input failed: {e}")

        # Coordinate tap + typing/paste when the native path was unavailable.
        if not input_success:
            cx, cy = center_point
            logger.info(f"Using coordinate input: {center_point}")
            d.click(cx, cy)
            time.sleep(1.0)
            try:
                d.send_keys(text)
            except Exception:
                logger.warning("send_keys failed, trying set_clipboard")
                d.set_clipboard(text)
                d.click(cx, cy)
                time.sleep(0.5)
                # "paste" is not one of the key names documented for
                # uiautomator2's press(); use Android's KEYCODE_PASTE (279).
                d.press(279)

        save_debug_shot("4_after_input")
        time.sleep(1.0)

        # --- 4. Send -----------------------------------------------------
        if auto_send:
            logger.info("尝试使用模板匹配寻找'发送'按钮...")
            tmp_screen = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_send_check.jpg")
            try:
                d.screenshot(tmp_screen)
                template_path = os.path.join(TEMPLATE_DIR, "send.jpg")
                # Slightly lower threshold to improve recall.
                send_btn_pos = find_template_match(tmp_screen, template_path, threshold=0.7)
                if send_btn_pos:
                    logger.info(f"通过模板匹配找到发送按钮: {send_btn_pos}, 点击...")
                    d.click(send_btn_pos[0], send_btn_pos[1])
                else:
                    logger.warning("模板匹配未找到发送按钮,尝试原生控件查找...")
                    if d(text="发送").exists:
                        d(text="发送").click()
                        logger.info("Clicked '发送'")
                    else:
                        d.press("enter")
                        logger.info("Pressed Enter")
                save_debug_shot("5_after_send")
            finally:
                remove_temp(tmp_screen)

        return True
    except Exception as e:
        logger.error(f"perform_input_action error: {e}")
        return False
def perform_voice_input(d, duration=3.0, debug_prefix=None):
    """Record a voice message by long-pressing the "hold to talk" button.

    Steps:
      1. Look for ``press_say.jpg`` on a screenshot to see whether the chat
         bar is already in voice mode.
      2. If not, click ``keyboard.jpg`` or ``audio_reply.jpg`` to switch and
         look for ``press_say.jpg`` again.
      3. Hold the button for ``duration`` seconds, logging a per-second
         counter, then release.

    :param d: uiautomator2 device object.
    :param duration: hold time in seconds.
    :param debug_prefix: if given, step screenshots are saved to OUTPUT_DIR
        as ``{debug_prefix}_{step}.jpg``.
    :return: True when the hold was performed, False when the button could
        not be located or an exception occurred (it is logged).
    """

    def save_debug_shot(name):
        # Step screenshots are only taken when the caller asked for them.
        if debug_prefix:
            shot_path = os.path.join(OUTPUT_DIR, f"{debug_prefix}_{name}.jpg")
            d.screenshot(shot_path)
            logger.info(f"保存中间过程截图: {shot_path}")

    tmp_screen = os.path.join(OUTPUT_DIR, "temp_voice_check.jpg")
    try:
        save_debug_shot("voice_1_check")

        press_say_template = os.path.join(TEMPLATE_DIR, "press_say.jpg")
        keyboard_template = os.path.join(TEMPLATE_DIR, "keyboard.jpg")
        audio_reply_template = os.path.join(TEMPLATE_DIR, "audio_reply.jpg")

        # 1. Detect the current mode.
        d.screenshot(tmp_screen)
        press_say_pos = find_template_match(tmp_screen, press_say_template, threshold=0.8)
        if press_say_pos:
            logger.info(">>> [状态] 当前已是语音模式 (找到 '按住说话' 按钮)")
        else:
            logger.info(">>> [状态] 当前可能是键盘模式 (未找到 '按住说话' 按钮)")
            logger.info(">>> [切换] 需要进行模式切换...")

            # 2. Click the keyboard icon or the audio icon to switch mode.
            switch_pos = find_template_match(tmp_screen, keyboard_template, threshold=0.8)
            if not switch_pos:
                switch_pos = find_template_match(tmp_screen, audio_reply_template, threshold=0.8)

            if switch_pos:
                logger.info(f">>> [切换] 找到切换按钮 {switch_pos},正在点击切换...")
                d.click(switch_pos[0], switch_pos[1])
                time.sleep(1.5)  # give the UI time to finish switching

                # Verify that the switch worked.
                d.screenshot(tmp_screen)
                press_say_pos = find_template_match(tmp_screen, press_say_template, threshold=0.8)
                if press_say_pos:
                    logger.info(">>> [切换] 成功完成切换,进入语音模式")
                else:
                    logger.error(">>> [切换] 切换操作已执行,但仍未找到 '按住说话' 按钮,切换可能失败")
            else:
                logger.warning(">>> [切换] 未找到切换按钮 (keyboard.jpg/audio_reply.jpg),无法切换")

        if not press_say_pos:
            logger.error(">>> [失败] 最终未能定位到发送语音按钮")
            return False

        # 3. Long press.
        x, y = press_say_pos
        logger.info(f">>> [发送] 开始按住发送语音按钮 ({x}, {y})")
        save_debug_shot("voice_2_before_hold")

        d.touch.down(x, y)
        try:
            # monotonic() is immune to wall-clock adjustments during the hold.
            start_time = time.monotonic()
            last_second = 0
            while time.monotonic() - start_time < duration:
                elapsed = int(time.monotonic() - start_time) + 1
                if elapsed > last_second and elapsed <= duration:
                    logger.info(f">>> [计时] {elapsed}")
                    last_second = elapsed
                time.sleep(0.1)
        finally:
            # Always release, otherwise an error mid-hold leaves the finger
            # "stuck" on the screen.
            d.touch.up(x, y)

        logger.info(f">>> [完成] 已完成指定时长 ({duration}s) 的按住按钮发送语音")
        save_debug_shot("voice_3_after_hold")
        return True
    except Exception as e:
        logger.error(f"perform_voice_input error: {e}")
        return False
    finally:
        # Best-effort removal of the scratch screenshot on every exit path.
        try:
            os.remove(tmp_screen)
        except OSError:
            pass
def switch_to_keyboard_mode(d):
    """Force the chat bar into keyboard/text mode.

    First looks for the "切换到键盘" button in the UI tree; if it is absent,
    searches a screenshot for the ``wen_zi_input.jpg`` icon.

    :param d: uiautomator2 device object.
    :return: True when a switch control was found and clicked; False when
        none was found (the bar may already be in keyboard mode) or an
        exception occurred (it is logged).
    """
    try:
        logger.info(">>> [模式] 尝试切换到键盘模式...")

        # 1. UI tree.
        voice_mode_btn = d(description="切换到键盘")
        if voice_mode_btn.exists:
            logger.info(">>> [模式] 检测到语音模式按钮,点击切换到键盘...")
            voice_mode_btn.click()
            time.sleep(1.0)
            return True

        # 2. Image matching (wen_zi_input.jpg).
        tmp_screen = os.path.join(OUTPUT_DIR, "temp_switch_kb.jpg")
        try:
            d.screenshot(tmp_screen)
            wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")
            wen_zi_pos = find_template_match(tmp_screen, wen_zi_template, threshold=0.8)
        finally:
            # The screenshot is a scratch file; do not leave it in OUTPUT_DIR.
            try:
                os.remove(tmp_screen)
            except OSError:
                pass

        if wen_zi_pos:
            logger.info(f">>> [模式] 找到切换文字图标 {wen_zi_pos},点击切换...")
            d.click(wen_zi_pos[0], wen_zi_pos[1])
            time.sleep(1.0)
            return True

        logger.info(">>> [模式] 当前可能已经是键盘模式,或未找到切换按钮")
        return False
    except Exception as e:
        logger.error(f"switch_to_keyboard_mode error: {e}")
        return False
def check_is_chat_interface(screenshot_path):
    """Decide from a screenshot whether the chat screen is showing.

    The screenshot is searched for the voice icon (``audio_reply.jpg``) and
    then for the keyboard icon (``keyboard.jpg``); either one counts.

    :param screenshot_path: screenshot to inspect, passed through to
        ``match_template_center``.
    :return: True when one of the icons is matched at threshold 0.8,
        otherwise False (a warning is logged).
    """
    # (template file, message logged on a hit), checked in this order.
    probes = (
        ("audio_reply.jpg", "✅ 检测到语音回复图标,确认处于聊天界面"),
        ("keyboard.jpg", "✅ 检测到键盘输入图标,确认处于聊天界面"),
    )
    for template_name, hit_message in probes:
        template_path = os.path.join(TEMPLATE_DIR, template_name)
        if match_template_center(screenshot_path, template_path, threshold=0.8):
            logger.info(hit_message)
            return True

    logger.warning("⚠️ 未检测到聊天界面特征图标,当前可能不在聊天页面")
    return False
def match_template_center(image_input, template_path, threshold=0.8):
    """Return the centre of the best template match on an image.

    Both images are converted to greyscale and compared with
    ``cv2.TM_CCOEFF_NORMED``.

    :param image_input: file path (str), OpenCV image (numpy.ndarray), or any
        other object convertible with ``np.array`` and interpreted as RGB
        (e.g. a PIL image).
    :param template_path: path of the template image file.
    :param threshold: minimum score for the best match to count.
    :return: ``(center_x, center_y)`` of the best match, or None when a file
        is missing, an image cannot be loaded, the score is below
        ``threshold``, or any error occurs.
    """
    try:
        if not os.path.exists(template_path):
            logger.error(f"Template not found: {template_path}")
            return None

        # Normalise the image argument into a BGR ndarray.
        if isinstance(image_input, np.ndarray):
            haystack = image_input
        elif isinstance(image_input, str):
            if not os.path.exists(image_input):
                logger.error(f"Image file not found: {image_input}")
                return None
            haystack = cv2.imread(image_input)
        else:
            try:
                haystack = cv2.cvtColor(np.array(image_input), cv2.COLOR_RGB2BGR)
            except Exception:
                logger.error(f"Invalid image_input type: {type(image_input)}")
                return None

        needle = cv2.imread(template_path)
        if haystack is None or needle is None:
            logger.error("Failed to read image or template")
            return None

        # Match on greyscale versions of both images.
        haystack_gray = cv2.cvtColor(haystack, cv2.COLOR_BGR2GRAY)
        needle_gray = cv2.cvtColor(needle, cv2.COLOR_BGR2GRAY)
        scores = cv2.matchTemplate(haystack_gray, needle_gray, cv2.TM_CCOEFF_NORMED)
        _, best_score, _, best_loc = cv2.minMaxLoc(scores)

        if best_score >= threshold:
            needle_h, needle_w = needle_gray.shape
            # best_loc is the top-left corner of the match; shift to centre.
            x = int(best_loc[0] + needle_w / 2)
            y = int(best_loc[1] + needle_h / 2)
            logger.info(f"Template matched with confidence {best_score:.2f} at ({x}, {y})")
            return (x, y)

        logger.warning(f"Template match failed. Max confidence: {best_score:.2f} < Threshold: {threshold}")
        return None
    except Exception as e:
        logger.error(f"match_template_center error: {e}")
        return None