# coding=utf-8 import uiautomator2 as u2 import time import asyncio import logging import sys import os import cv2 import numpy as np import re # 添加项目根目录到 sys.path 以便导入 Util project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if project_root not in sys.path: sys.path.append(project_root) import json from datetime import datetime, timedelta from Util.EasyOcrKit import EasyOcrKit # 初始化 EasyOcrKit ocr_kit = EasyOcrKit() # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("WxUtil") # 目录配置 BASE_DATA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) LOG_DIR = os.path.join(BASE_DATA_DIR, "Logs") OUTPUT_DIR = os.path.join(BASE_DATA_DIR, "Output") TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templates") # 全局调试图片计数器 _debug_counter = 0 # 全局调试模式开关 DEBUG_MODE = False def set_debug_mode(enabled): """设置全局调试模式""" global DEBUG_MODE DEBUG_MODE = enabled if enabled: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) def parse_wechat_time(time_str): """ 解析微信时间字符串为标准化格式 (YYYY-MM-DD HH:MM) 支持: "10:03", "昨天 10:03", "星期三 10:03", "2025年1月1日 10:03" """ try: clean_str = time_str.strip() # 0. 预处理:过滤纯数字(防止电话号码被误识别为时间) # 微信时间戳通常包含中文或冒号,单纯的数字串(如 "18686619970")不是有效时间 if re.match(r'^\d+$', clean_str): logger.warning(f"忽略疑似电话号码/纯数字的时间字符串: '{clean_str}'") return "" now = datetime.now() today = now.date() # 1. HH:mm (当天) # 注意:有时候 OCR 会把冒号识别成其他字符,这里假设是标准的 HH:mm if re.match(r'^\d{1,2}:\d{2}$', clean_str): h, m = map(int, clean_str.split(':')) dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m)) return dt.strftime("%Y-%m-%d %H:%M") # 2. 昨天 HH:mm if "昨天" in clean_str: t_part = clean_str.replace("昨天", "").strip() if re.match(r'^\d{1,2}:\d{2}$', t_part): h, m = map(int, t_part.split(':')) yesterday = today - timedelta(days=1) dt = datetime.combine(yesterday, datetime.min.time().replace(hour=h, minute=m)) return dt.strftime("%Y-%m-%d %H:%M") # 3. 星期X / 周X HH:mm weekdays_map = { "星期一": 0, "星期二": 1, "星期三": 2, "星期四": 3, "星期五": 4, "星期六": 5, "星期日": 6, "周一": 0, "周二": 1, "周三": 2, "周四": 3, "周五": 4, "周六": 5, "周日": 6 } for w_str, w_idx in weekdays_map.items(): if w_str in clean_str: # 提取时间部分 (支持 "周三 10:03" 或 "周三10:03") t_part = clean_str.replace(w_str, "").strip() time_match = re.search(r'(\d{1,2}):(\d{2})', t_part) h, m = 0, 0 if time_match: h, m = map(int, time_match.groups()) current_weekday = now.weekday() # 计算日期回退天数 (mod 7 确保是过去的一周内) delta_days = (current_weekday - w_idx) % 7 # 如果 delta_days 是 0,且当前时间比解析出的时间早,说明是上周的今天 # 微信通常只有在真的“过去”才会显示星期几 if delta_days == 0 and time_match: if now.hour < h or (now.hour == h and now.minute < m): delta_days = 7 elif delta_days == 0 and not time_match: # 只有“周三”没有时间,通常指最近的一个周三(如果今天是周三,可能指上周三) # 但为了简单,如果今天是周三且没时间,我们暂定为今天 pass target_date = today - timedelta(days=delta_days) if time_match: dt = datetime.combine(target_date, datetime.min.time().replace(hour=h, minute=m)) return dt.strftime("%Y-%m-%d %H:%M") else: return target_date.strftime("%Y-%m-%d 00:00") # 4. YYYY年MM月DD日 HH:mm # 简单匹配年月日 match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', clean_str) if match: y, m, d = map(int, match.groups()) # 找时间部分 time_match = re.search(r'(\d{1,2}):(\d{2})', clean_str) if time_match: hh, mm = map(int, time_match.groups()) dt = datetime(y, m, d, hh, mm) return dt.strftime("%Y-%m-%d %H:%M") else: # 只有日期,没有时间 (通常是日期分隔符) # 这种情况下,可能需要给个默认时间?或者就返回日期 return f"{y:04d}-{m:02d}-{d:02d} 00:00" # 5. MM月DD日 HH:mm (跨年但未显示年份?微信通常会显示年份如果跨年) # 处理 "1月26日 10:00" match = re.search(r'(\d{1,2})月(\d{1,2})日', clean_str) if match: m, d = map(int, match.groups()) # 默认当年 y = today.year # 找时间 time_match = re.search(r'(\d{1,2}):(\d{2})', clean_str) if time_match: hh, mm = map(int, time_match.groups()) dt = datetime(y, m, d, hh, mm) # 如果计算出的时间在未来,可能是去年 (比如现在1月,消息是12月) if dt > now: dt = datetime(y - 1, m, d, hh, mm) return dt.strftime("%Y-%m-%d %H:%M") # 兜底:如果是 "下午 5:00" 这种格式 if "下午" in clean_str or "晚上" in clean_str: t_part = re.sub(r'下午|晚上', '', clean_str).strip() if re.match(r'^\d{1,2}:\d{2}$', t_part): h, m = map(int, t_part.split(':')) if h < 12: h += 12 dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m)) return dt.strftime("%Y-%m-%d %H:%M") if "上午" in clean_str: t_part = re.sub(r'上午', '', clean_str).strip() if re.match(r'^\d{1,2}:\d{2}$', t_part): h, m = map(int, t_part.split(':')) dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m)) return dt.strftime("%Y-%m-%d %H:%M") # 解析失败,返回空字符串,避免将无关文本(如电话号码)误认为时间注入到上下文中 return "" except Exception as e: logger.warning(f"时间解析失败 '{time_str}': {e}") return "" def get_next_debug_path(desc="step"): """获取下一个顺序命名的调试图片路径 (debug_N_desc.jpg)""" if not DEBUG_MODE: return None global _debug_counter _debug_counter += 1 filename = f"debug_{_debug_counter}_{desc}.jpg" return os.path.join(OUTPUT_DIR, filename) def clear_directory(dir_path, exclude_files=None): """清理指定目录下的所有文件,支持排除特定文件""" if not os.path.exists(dir_path): os.makedirs(dir_path) return if exclude_files is None: exclude_files = [] import shutil for filename in os.listdir(dir_path): if filename in exclude_files: continue file_path = os.path.join(dir_path, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except PermissionError: # 忽略正在被使用的文件(如当前的日志文件) continue except Exception as e: logger.warning(f"Failed to delete {file_path}. Reason: {e}") def setup_script_environment(): """运行前清理日志和输出目录""" logger.info("清理运行环境: Logs 和 Output 目录...") # 重置调试计数器 global _debug_counter _debug_counter = 0 # 清理所有日志和图片,确保新一轮运行有干净的环境 clear_directory(LOG_DIR) clear_directory(OUTPUT_DIR) def connect_device(): """ 连接设备并返回设备对象,同时打印详细的设备信息 """ try: d = u2.connect() # 强制检查连接是否可用 if not d.info: logger.error("设备连接不可用 (d.info is empty)") return None # 获取可靠的序列号 device_serial = d.serial if hasattr(d, 'serial') else "未知" logger.info(f"设备连接成功: {device_serial}") # 获取并打印详细设备信息 device_info = d.device_info logger.info(f"详细设备信息: 品牌={device_info.get('brand')}, 型号={device_info.get('model')}, SDK={device_info.get('sdk')}") return d except Exception as e: logger.error(f"设备连接失败: {e}") return None def safe_device_click(d, x, y): """ 安全的点击操作,包含简单的异常捕获和重试逻辑 """ try: # 强制转换为原生 int,防止 numpy.int64 导致的 JSON 序列化错误 ix, iy = int(x), int(y) d.click(ix, iy) return True except Exception as e: logger.warning(f"点击操作失败 ({x}, {y}): {e},尝试重新连接并重试...") try: # 尝试重新初始化连接 new_d = u2.connect() ix, iy = int(x), int(y) new_d.click(ix, iy) return True except Exception as e2: logger.error(f"重试点击操作依然失败: {e2}") return False def draw_debug_info(image_path, messages, current_voice_center=None, suffix=""): """ 辅助函数:在截图中绘制当前已知的消息状态 :param image_path: 图片路径 :param messages: 消息列表 :param current_voice_center: 当前正在处理的语音中心坐标 (vx, vy) :param suffix: 保存文件名的后缀 """ try: img = cv2.imread(image_path) if img is None: return for msg in messages: if msg['type'] == 'voice': ax, ay = msg['center'] is_unread = msg.get('is_unread', False) is_converted = msg.get('is_converted', False) # 绘制框 color = (0, 0, 255) if is_unread else (0, 255, 0) cv2.rectangle(img, (ax-30, ay-30), (ax+30, ay+30), color, 2) # 绘制 YES/NO label = "YES" if is_converted else "NO" cv2.putText(img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2) # 如果是当前正在处理的语音,画一个额外的黄圈 if current_voice_center and abs(ax - current_voice_center[0]) < 10 and abs(ay - current_voice_center[1]) < 10: cv2.circle(img, (ax, ay), 40, (0, 255, 255), 3) cv2.putText(img, "PROCESSING", (ax - 60, ay - 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) # 保存覆盖后的图片 cv2.imwrite(image_path, img) logger.info(f"已更新调试标记到截图: {image_path}") except Exception as e: logger.warning(f"绘制调试信息失败: {e}") def _detect_bubble_color(img, bbox): """ 检测文本框区域的背景颜色,用于辅助判断发送者。 :param img: OpenCV 图像 (BGR) :param bbox: OCR 返回的边界框 4个点 :return: "green" (我), "white" (对方), or "unknown" """ if img is None: return "unknown" # 提取 bbox 区域 h, w = img.shape[:2] min_x = max(0, int(min(p[0] for p in bbox))) max_x = min(w, int(max(p[0] for p in bbox))) min_y = max(0, int(min(p[1] for p in bbox))) max_y = min(h, int(max(p[1] for p in bbox))) if max_x <= min_x or max_y <= min_y: return "unknown" roi = img[min_y:max_y, min_x:max_x] # 计算背景颜色 (抗文字干扰) # 文本是黑色的 (0,0,0),会拉低平均值/中位数 # 使用 95% 分位数来获取背景色 (偏亮的部分 - 真正的背景) try: # axis=(0,1) 对 h,w 维度操作,保留 c 维度 # percentile 返回 float,需转 int bg_color = np.percentile(roi, 95, axis=(0, 1)) b, g, r = bg_color except Exception: # Fallback mean_color = cv2.mean(roi)[:3] b, g, r = mean_color # 调试日志:打印颜色值 if DEBUG_MODE: logger.info(f"Color Debug: B={b:.1f}, G={g:.1f}, R={r:.1f} | bbox={bbox}") # 1. 绿色气泡特征 (我) # Light Mode: BGR (101, 225, 152) -> G 显著大于 R 和 B # Dark Mode: BGR (30, 80, 40) -> G 依然显著大于 R 和 B if g > r + 15 and g > b + 15: # 降低差值阈值,适应暗色模式 if g > 50: # 只要不是太暗 return "green" # 2. 白色/浅灰/深灰气泡特征 (对方) # Light Mode: BGR (255, 255, 255) # Dark Mode: BGR (45, 45, 45) if abs(r - g) < 20 and abs(g - b) < 20 and abs(r - b) < 20: # 白色 (Light Mode) if g > 180: return "white" # 深灰 (Dark Mode) if 40 < g < 100: return "white" # 统一归类为"对方"气泡颜色 # 3. 特殊补丁:系统消息 (灰色/极暗) if abs(r - g) < 15 and abs(g - b) < 15: # 系统背景灰 (Light Mode: 130 左右) if 110 < g < 160: return "system_gray" # 极暗背景 (Dark Mode: 30 左右) if g < 40: return "system_gray" return "unknown" def _scan_chat_messages(image_path): """ 内部函数:扫描图片中的微信消息(语音、文本、红点) 返回: (messages_list, debug_image) """ img = cv2.imread(image_path) if img is None: logger.error(f"无法读取图片: {image_path}") return [], None h, w = img.shape[:2] logger.info(f"DEBUG: Image size w={w}, h={h}") # 3. 模板匹配寻找语音图标和红点 audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg") red_point_template = os.path.join(TEMPLATE_DIR, "red_point.jpg") audio_matches = find_all_template_matches(image_path, audio_template, threshold=0.8) red_points = find_all_template_matches(image_path, red_point_template, threshold=0.8) # 4. OCR 识别所有文本 logger.info("正在执行 OCR 识别...") ocr_results = ocr_kit.read_text(image_path) # 4.5 尝试提取聊天标题 (对方昵称) chat_title = "对方" potential_titles = [] for bbox, text, conf in ocr_results: c_y = int((bbox[0][1] + bbox[2][1]) / 2) c_x = int((bbox[0][0] + bbox[2][0]) / 2) # 标题区域通常在顶部 (状态栏下方,消息列表上方) if 60 < c_y < 140: clean = text.strip() # 排除时间、信号、返回按钮等 if re.match(r'^\d{1,2}:\d{2}$', clean): continue if "微信" in clean or "WeChat" in clean: continue if clean in ["<", "返回", "消息", "(", ")"]: continue if re.match(r'^\d+$', clean): continue # 排除纯数字(如未读数) if len(clean) > 0: potential_titles.append((c_x, clean)) if potential_titles: # 优先取最接近水平中心的文本作为标题 potential_titles.sort(key=lambda x: abs(x[0] - w/2)) chat_title = potential_titles[0][1] # 去除可能包含的括号(比如备注名后的群聊人数) chat_title = re.sub(r'\(\d+\)$', '', chat_title).strip() # [Fix] 过滤掉包含 ".." 的动态标题(如“对方正在输入..”),避免哈希失效 if ".." in chat_title: logger.info(f"忽略动态标题: {chat_title}") chat_title = "对方" else: logger.info(f"识别到聊天标题/对方昵称: {chat_title}") # 微信菜单关键字(用于排除干扰) MENU_KEYWORDS = ["听筒播放", "收藏", "背景播放", "删除", "多选", "取消转文字", "转文字", "引用", "提醒"] # 忽略的系统消息内容 IGNORE_CONTENT = ["撤回了一条消息", "打招呼的消息", "拍了拍", "你撤回了一条消息", "引用", "Clear Text", "Switch IME", "Done", "按住说话", "发送"] # 5. 整合所有消息 messages = [] debug_img = img.copy() # 初始化调试图 # 绘制过滤区域边界 (可视化) cv2.line(debug_img, (0, 150), (w, 150), (255, 0, 255), 2) # 顶部线 cv2.line(debug_img, (0, h - 60), (w, h - 60), (255, 0, 255), 2) # 底部线 (放宽到底部 60px) cv2.putText(debug_img, "TOP_FILTER", (10, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1) cv2.putText(debug_img, "BOTTOM_FILTER", (10, h - 70), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1) claimed_ocr_indices = set() # A. 添加语音消息 for ax, ay in audio_matches: # 标记所有找到的语音图标 (用于调试) cv2.circle(debug_img, (ax, ay), 10, (255, 255, 0), -1) # 过滤掉顶部和底部的非聊天区域 if ay < 150 or ay > h - 60: logger.info(f"忽略区域外语音图标: ({ax}, {ay})") cv2.rectangle(debug_img, (ax-35, ay-35), (ax+35, ay+35), (128, 128, 128), 1) cv2.putText(debug_img, "FILTERED", (ax - 40, ay - 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1) continue sender = "对方" if ax < w / 2 else "我" is_unread = False for rx, ry in red_points: # 红点通常在语音图标右侧且 Y 轴相近 if abs(ry - ay) < 50 and rx > ax: is_unread = True break # 改进:判断是否已转文字 is_converted = False converted_trigger_text = "" associated_texts = [] # 存储关联的多行文本 [(y, x, text)] for i, (bbox, text, conf) in enumerate(ocr_results): if i in claimed_ocr_indices: continue c_x = int((bbox[0][0] + bbox[2][0]) / 2) c_y = int((bbox[0][1] + bbox[2][1]) / 2) # 判定逻辑:文本在语音下方且水平偏移不大 # 1. Y轴限制: -50 < dy < 800 (适配多行文本) # 2. X轴限制: abs(dx) < 500 (减少误判,防止关联到屏幕另一侧的消息) # 3. 几何位置强校验 (核心修复) voice_is_left = ax < w / 2 # 获取文本框的左右边界 min_x = min(p[0] for p in bbox) max_x = max(p[0] for p in bbox) if voice_is_left: # 语音在左 (对方): 文本必须也是左对齐 # - min_x 必须靠左 (< 450) [Fix] 放宽阈值,防止长文本或缩进文本被过滤 # - max_x 不能太靠右 (> w - 150),否则可能是"我"的消息 if min_x > 450 or max_x > w - 150: logger.debug(f"忽略文本 '{text[:10]}' (Left Voice): min_x={min_x}, max_x={max_x} 不满足左对齐条件") continue else: # 语音在右 (我): 文本必须也是右对齐 # - max_x 必须靠右 (> w - 300) # - min_x 不能太靠左 (< 100) if max_x < w - 300 or min_x < 100: logger.debug(f"忽略文本 '{text[:10]}' (Right Voice): min_x={min_x}, max_x={max_x} 不满足右对齐条件") continue # [Fix] 放宽 X 轴判定范围 (500 -> 600) 以适应更宽的文本 if -50 < c_y - ay < 800 and abs(c_x - ax) < 600: # 检查中间是否有其他语音图标 has_intermediate_audio = False for other_ax, other_ay in audio_matches: # 只有当中间的语音图标在 [150, h-60] 的有效聊天区域内时,才视为阻断 if ay + 20 < other_ay < c_y - 10: if 150 <= other_ay <= h - 60: has_intermediate_audio = True logger.info(f"语音({ax},{ay}) 被中间有效区域内的语音图标({other_ax},{other_ay}) 阻断,无法关联文本 '{text[:10]}...'") break else: logger.info(f"语音({ax},{ay}) 忽略非聊天区域(Y={other_ay})的语音图标阻断") if has_intermediate_audio: continue # [Fix] 检查中间是否有其他气泡消息阻断 (防止跨消息合并) # 如果遇到一个明确属于另一方的消息气泡,必须停止关联 if c_y > ay + 60: # 稍微放宽 Y 轴,避免误判紧贴的转换文本 bubble_color = _detect_bubble_color(img, bbox) if voice_is_left: # 语音在左 (对方) # 如果遇到绿色气泡 (我),或者是明显的右对齐文本,视为阻断 if bubble_color == "green": logger.info(f"语音({ax},{ay}) 被中间'我'的消息(绿色气泡)阻断: '{text[:10]}...'") break if c_x > w * 0.65: # 右侧明显区域 (short message check) logger.info(f"语音({ax},{ay}) 被中间'我'的消息(右对齐)阻断: '{text[:10]}...'") break else: # 语音在右 (我) # 如果遇到白色气泡 (对方),或者是明显的左对齐文本,视为阻断 if bubble_color == "white": logger.info(f"语音({ax},{ay}) 被中间'对方'的消息(白色气泡)阻断: '{text[:10]}...'") break if c_x < w * 0.35: # 左侧明显区域 logger.info(f"语音({ax},{ay}) 被中间'对方'的消息(左对齐)阻断: '{text[:10]}...'") break clean_text = text.strip() # 判定是否为时间戳 is_timestamp = re.search(r'(\d{1,2}:\d{2})', clean_text) and (len(clean_text) < 15) # 判定是否为纯数字或时长 is_duration = re.search(r'\d{1,2}"?$', clean_text) and len(clean_text) < 6 # 判定是否为系统消息 is_ignored = any(k in clean_text for k in IGNORE_CONTENT) # 噪音判定 (例如 "少3"") is_noise = "少" in clean_text and len(clean_text) < 8 and re.search(r'\d', clean_text) if not is_duration and not is_timestamp and clean_text not in MENU_KEYWORDS and not is_ignored and not is_noise: is_converted = True associated_texts.append((c_y, c_x, clean_text)) claimed_ocr_indices.add(i) # 不再 break,继续寻找后续文本行 else: # 这些文本虽然不作为内容,但它们属于语音消息的附属信息,标记为已处理 claimed_ocr_indices.add(i) if is_timestamp: logger.info(f"语音({ax},{ay}) 忽略下方时间戳文本: '{clean_text}'") elif is_duration: logger.info(f"语音({ax},{ay}) 忽略时长文本: '{clean_text}'") elif is_noise: logger.info(f"语音({ax},{ay}) 忽略噪音文本: '{clean_text}'") elif is_ignored: logger.info(f"语音({ax},{ay}) 忽略系统消息文本: '{clean_text}'") else: logger.info(f"语音({ax},{ay}) 忽略其他文本(可能是菜单): '{clean_text}'") # 整合所有关联文本 if associated_texts: # 按 Y 轴排序,如果 Y 接近则按 X 轴排序 associated_texts.sort(key=lambda x: (x[0], x[1])) converted_trigger_text = "".join([t[2] for t in associated_texts]) # 去除已知噪音 noise_patterns = ["42IIhK+-语音输入粘贴#", "语音输入粘贴"] for np in noise_patterns: converted_trigger_text = converted_trigger_text.replace(np, "") converted_trigger_text = converted_trigger_text.strip() logger.info(f"语音({ax},{ay}) 判定为已转换,最终合并文本: '{converted_trigger_text}'") if is_converted: logger.info(f"语音消息 ({ax}, {ay}) 已有转换文字: '{converted_trigger_text}',跳过") # 绘图反馈 color = (0, 0, 255) if is_unread else (0, 255, 0) cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2) label = "YES" if is_converted else "NO" cv2.putText(debug_img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2) messages.append({ "type": "voice", "sender": sender, "center": (ax, ay), "y": ay, "is_unread": is_unread, "is_converted": is_converted, "content": converted_trigger_text if is_converted else None }) # B. 添加文本消息 for i, (bbox, text, conf) in enumerate(ocr_results): if i in claimed_ocr_indices: continue c_x = int((bbox[0][0] + bbox[2][0]) / 2) c_y = int((bbox[0][1] + bbox[2][1]) / 2) if c_y < 150 or c_y > h - 60: continue # 判定发送者 (增强版: 几何 + 颜色) # 1. 尝试通过背景颜色判定 (最准确) sender_color = _detect_bubble_color(img, bbox) sender = "unknown" if sender_color == "green": sender = "我" elif sender_color == "white": sender = "对方" elif sender_color == "system_gray": # 灰底文字通常是时间戳或系统消息,由系统发送,几何上居中 sender = "system" # 2. 几何特征强制修正 (Double Check) # 假设头像+边距约占 15% 宽度 edge_margin = w * 0.15 min_x = min(p[0] for p in bbox) max_x = max(p[0] for p in bbox) # 规则 A: 如果这一行极其靠右 (超过 85% 宽度),那肯定是"我" # 即使颜色判成了白色 (比如光照问题),也得纠正回来 if max_x > w - edge_margin: if sender == "对方": logger.warning(f"Sender detected as '对方' by color but geometry says '我' (max_x={max_x} > {w-edge_margin}). Correcting to '我'.") sender = "我" # 规则 B: 如果这一行极其靠左 (小于 35% 宽度),且不靠右,那肯定是"对方" # 扩大判定范围,防止因为 OCR 稍微缩进导致判定失效 # 注意:如果颜色明确为"我"(绿色),则跳过此规则,因为"我"的长消息也可能靠左 elif min_x < w * 0.35 and max_x < w * 0.75: # 修正:max_x 阈值从 0.85 降低到 0.75 if sender == "我": logger.info(f"Geometry says '对方' (min_x={min_x} < {w*0.35}) but Color is '我' (Green). Trusting Color.") elif sender == "system": # 即使颜色是系统灰,但如果位置极其靠左,也可能是对方的某种特殊气泡 pass else: sender = "对方" # 规则 C: 如果颜色是 unknown,且不在极端位置,使用中心点兜底 if sender == "unknown": c_x = int((min_x + max_x) / 2) # 简单中心判断 if c_x < w / 2: sender = "对方" else: sender = "我" # 规则 D: 强几何中心校验 (Final Geometry Verdict) # 仅对短消息使用强几何校验 (宽度 < 70% 屏幕宽度) # 长消息通常铺满屏幕,中心点在中间,容易受字体渲染影响导致误判,应信任颜色检测结果 box_width = max_x - min_x if box_width < w * 0.7: # 如果中心点明显在左半屏 ( < 45% ),判定为"对方" if c_x < w * 0.45: # [Fix] 如果颜色明确是绿色,说明是"我"的左对齐文本(长文换行),不应被几何规则强制改为"对方" if sender == "我" and sender_color == "green": logger.info(f"Geometry says '对方' (center={c_x} < {w*0.45}) but Color is 'green'. Keeping '我'.") elif sender == "system": # 系统消息允许居中或偏左 pass else: if sender == "我": logger.warning(f"Sender detected as '我' by color but center is left ({c_x} < {w*0.45}). Correcting to '对方'.") sender = "对方" # 如果中心点明显在右半屏 ( > 55% ),判定为"我" elif c_x > w * 0.55: if sender == "对方": logger.warning(f"Sender detected as '对方' by color but center is right ({c_x} > {w*0.55}). Correcting to '我'.") elif sender == "system": pass else: sender = "我" else: logger.info(f"Message in middle zone ({w*0.45} < {c_x} < {w*0.55}), trusting color detection: {sender}") else: logger.info(f"Wide message (width={box_width} > {w*0.7}), skipping geometry check, trusting color: {sender}") time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)' # 优先判断是否为独立的时间戳 (行短且符合时间格式) if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)): # 进一步校验是否真的是时间 (通过 parse_wechat_time 尝试解析,或者简单正则) # 这里我们假设短行的符合 time_pattern 的都是时间标记 logger.info(f"识别到时间戳/日期: '{text}'") messages.append({ "type": "timestamp", "content": text.strip(), "y": c_y, "center": (c_x, c_y) }) continue clean_text = text.strip() if re.match(r'^.?[0-9]{1,2}"?$', clean_text): logger.info(f"忽略疑似时长文本: '{clean_text}'") continue # 噪音判定 (例如 "少3"") if "少" in clean_text and len(clean_text) < 8 and re.search(r'\d', clean_text): logger.info(f"忽略噪音文本: '{clean_text}'") continue if clean_text in MENU_KEYWORDS: logger.info(f"忽略菜单关键词: '{clean_text}'") continue if any(k in clean_text for k in IGNORE_CONTENT): logger.info(f"忽略系统消息内容: '{clean_text}'") continue messages.append({ "type": "text", "sender": sender, "content": text.strip(), "center": (c_x, c_y), "y": c_y }) # 6. 排序 messages.sort(key=lambda x: x['y']) # 7. 注入时间戳 current_time_str = None # 过滤掉 timestamp 类型的消息,将其作为属性注入到后续消息中 final_messages_with_time = [] for msg in messages: if msg['type'] == 'timestamp': # 更新当前时间上下文 parsed_time = parse_wechat_time(msg['content']) current_time_str = parsed_time logger.info(f"更新时间上下文: {msg['content']} -> {parsed_time}") else: # 只有语音和文本消息需要注入时间 if current_time_str: msg['time_display'] = current_time_str else: # 如果上方没有时间戳,尝试默认使用当天日期 (或者保持 None) # 对于首屏最上面的消息,可能没有时间戳 pass final_messages_with_time.append(msg) return final_messages_with_time, debug_img, chat_title async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", process_strategy="ALL", restore_processed_voice=True): """ 全面采用 CV + OCR 识别微信聊天截图中的最后一条消息 :param process_strategy: 语音处理策略 (ALL/UNREAD/LAST) :param restore_processed_voice: 是否在转文字后还原(隐藏文字)。默认为 True。 设为 False 可防止最后一条消息在无内容时陷入"转文字->还原->空内容"的死循环。 注意:此函数现在包含一个循环,如果发现需要转文字的语音,会逐个处理并重新截图。 """ try: d = device if device else connect_device() if not d: return [], None current_image_path = image_path current_output_path = output_path final_messages = [] loop_count = 0 MAX_LOOPS = 10 # 增加循环次数上限,适应 ALL 策略 # 统计计数器 total_voices_count = 0 convert_opened_count = 0 convert_closed_count = 0 # 记录本次会话已处理过的语音 Y 坐标集合 processed_y_coords = set() # 记录 Peek-and-Restore 过程中抓取到的语音内容 {y_coord: content} captured_voice_contents = {} # 初始化异步任务列表 analyze_chat_image._ocr_tasks = [] while loop_count < MAX_LOOPS: loop_count += 1 logger.info(f"--- 分析循环 第 {loop_count} 次 ---") # 1. 扫描当前屏幕 messages, debug_img, chat_title = _scan_chat_messages(current_image_path) if messages is None: # 读取失败 return [], None # 更新消息发送者名称 (将 "对方" 替换为 实际标题) if chat_title and chat_title != "对方": for m in messages: if m['sender'] == "对方": m['sender'] = chat_title # 保存当前状态的调试图 if current_output_path and DEBUG_MODE: cv2.imwrite(current_output_path, debug_img) logger.info(f"调试图已保存: {current_output_path}") # 2. 筛选需要处理的语音 all_voices = [m for m in messages if m['type'] == 'voice'] all_voices.sort(key=lambda x: x['y']) # 从上到下 # 更新统计 (取当前扫描到的数量) total_voices_count = len(all_voices) # Helper: 检查是否已处理 def is_processed(y_coord): for py in processed_y_coords: if abs(y_coord - py) < 20: # 20px 容差 return True return False target_voices = [] if process_strategy == "ALL": # ALL 策略:处理所有未被记录处理过的、且未转换的语音 target_voices = [m for m in all_voices if not m.get('is_converted') and not is_processed(m['y'])] logger.info(f"策略(ALL): 发现 {len(target_voices)} 条未转换待处理语音") elif process_strategy == "UNREAD": # UNREAD 策略:只处理未读且未转换且未处理过的 target_voices = [m for m in all_voices if m.get('is_unread') and not m.get('is_converted') and not is_processed(m['y'])] logger.info(f"策略(UNREAD): 发现 {len(target_voices)} 条未读待处理语音") elif process_strategy == "LAST": # LAST 策略:只处理最后一条未转换的 unconverted = [m for m in all_voices if not m.get('is_converted')] if unconverted: last_voice = unconverted[-1] if not is_processed(last_voice['y']): target_voices = [last_voice] logger.info(f"策略(LAST): 仅关注最后一条未转换语音") # 如果没有需要处理的语音,或者我们已经达到了策略要求,退出循环 if not target_voices: logger.info("当前屏幕无待处理语音,分析结束") final_messages = messages break # 3. 处理第一条目标语音 # 注意:只处理第一条,因为处理后界面会变动(展开文字),坐标会失效 target = target_voices[0] vx, vy = int(target['center'][0]), int(target['center'][1]) # 标记为已处理 processed_y_coords.add(target['y']) logger.info(f"准备处理语音 ({vx}, {vy})...") # 高亮正在处理的语音并保存更新后的调试图 if DEBUG_MODE: draw_debug_info(current_output_path, messages, current_voice_center=(vx, vy)) # 执行操作:长按 -> 转文字 logger.info(f"正在长按语音消息 ({vx}, {vy})...") d.long_click(int(vx), int(vy), 1.0) # 确保坐标为原生 int # 轮询寻找“转文字”按钮 logger.info("正在快速寻找'转文字'按钮...") zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg") btn_pos = None poll_start = time.time() while time.time() - poll_start < 3.0: # 最多等 3 秒 menu_shot = get_next_debug_path("step_long_press_poll") if menu_shot: d.screenshot(menu_shot) btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7) else: # 调试模式关闭时,直接在内存中匹配 (使用 format='opencv' 提高效率) btn_pos = find_template_match(d.screenshot(format='opencv'), zhuan_template, threshold=0.7) if btn_pos: break time.sleep(0.2) # 快速轮询 if btn_pos: btn_x, btn_y = int(btn_pos[0]), int(btn_pos[1]) logger.info(f"✅ 找到'转文字'按钮: ({btn_x}, {btn_y}),点击中...") safe_device_click(d, btn_x, btn_y) convert_opened_count += 1 logger.info("等待语音转文字完成...") time.sleep(3.0) # 缩短等待时间 (原5.0s) # --- Peek-and-Restore 逻辑 (异步优化版) --- # 1. 截图 (但不立即 OCR,而是丢给异步任务) peek_shot = get_next_debug_path("step_peek_content") if not peek_shot: # 如果不是调试模式,我们需要一个临时路径供 OCR 任务使用 peek_shot = os.path.join(OUTPUT_DIR, f"temp_peek_{int(time.time())}.jpg") d.screenshot(peek_shot) logger.info(f"已获取截图,启动异步OCR任务以提取内容...") async def _async_ocr_task(img_path, target_y): """内部异步任务:在线程池中运行 OCR""" try: loop = asyncio.get_running_loop() # 在默认执行器(线程池)中运行耗时的 _scan_chat_messages logger.info(f"🚀 [Async OCR] 开始分析截图 {os.path.basename(img_path)} (目标 Y={target_y})") msgs, _, _ = await loop.run_in_executor(None, _scan_chat_messages, img_path) found = None # 收集所有可能是该语音消息转换出的文本 all_found_texts = [] for pm in msgs: if pm['type'] == 'voice' and pm.get('is_converted'): # 容差稍微放大,因为转文字展开后 Y 坐标会变 if abs(pm['y'] - target_y) < 150: # 进一步放宽容差 content = pm.get('content', '').strip() if content: all_found_texts.append((pm['y'], content)) if all_found_texts: # 按 Y 轴排序,确保多行文本顺序正确 all_found_texts.sort(key=lambda x: x[0]) found = " ".join([t[1] for t in all_found_texts]) logger.info(f"✨ [Async OCR] 在 Y={target_y} 附近找到转换文字: {found}") if not found: logger.warning(f"⚠️ [Async OCR] 未能在 Y={target_y} 附近找到已转换文字") return target_y, found except Exception as e: logger.error(f"❌ [Async OCR] 任务执行失败: {e}") return target_y, None # 创建并保存任务 task = asyncio.create_task(_async_ocr_task(peek_shot, vy)) # 我们需要一个列表来保存任务,这里临时利用 list if not hasattr(analyze_chat_image, "_ocr_tasks"): analyze_chat_image._ocr_tasks = [] analyze_chat_image._ocr_tasks.append(task) # 2. 还原状态 (取消转文字) # 注意:由于 OCR 还没出结果,我们无法精确定位展开后的文字位置 # 但通常点击原语音气泡位置 (vx, vy) 也能触发菜单 if restore_processed_voice: logger.info("准备还原状态 (取消转文字)...") d.long_click(int(vx), int(vy), 1.0) # 确保坐标为原生 int logger.info("正在快速寻找'隐藏文字'按钮...") cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg") cancel_btn = None poll_start = time.time() while time.time() - poll_start < 3.0: restore_menu_shot = get_next_debug_path("step_restore_poll") if restore_menu_shot: d.screenshot(restore_menu_shot) cancel_btn = find_template_match(restore_menu_shot, cancel_template, threshold=0.7) else: # 调试模式关闭时,直接在内存中匹配 (使用 format='opencv' 提高效率) cancel_btn = find_template_match(d.screenshot(format='opencv'), cancel_template, threshold=0.7) if cancel_btn: break time.sleep(0.2) if cancel_btn: cx, cy = int(cancel_btn[0]), int(cancel_btn[1]) logger.info(f"✅ 找到'隐藏文字'按钮: ({cx}, {cy}),点击还原...") safe_device_click(d, cx, cy) convert_closed_count += 1 time.sleep(2.0) # 等待收起动画 else: logger.warning("❌ 未找到'隐藏文字'按钮,无法还原状态!(后续可能导致重复处理)") # 3. 准备下一次循环 if len(target_voices) == 1: logger.info("✅ 当前屏幕所有目标语音已处理完毕,无需再次全屏扫描。") final_messages = messages # 使用本轮初始扫描的消息列表 break # 重新截图,因为界面可能微调,或者只是恢复了 next_screenshot = get_next_debug_path("step_restored") d.screenshot(next_screenshot) current_image_path = next_screenshot current_output_path = get_next_debug_path("flag_restored") continue else: logger.info("⏩ [配置] 跳过还原状态步骤 (保持文字展开)。") # 即使不还原,我们也不建议继续处理下一条,因为界面已经大幅变动(展开了文字)。 # 除非我们重新截图并重新定位。 # 但在这里,如果 restore_processed_voice=False,通常意味着我们只关心最后一条(LAST策略),或者我们接受界面变动。 # 为了安全起见,如果不还原,我们最好终止循环(假设只处理这一条,或者下一轮主循环再处理其他的) # 否则后续的 target_voices 坐标全都不准了。 logger.info("🛑 因不还原状态,终止本轮多语音处理循环,等待下一次主监控循环。") final_messages = messages # 这里的 messages 其实是展开前的,但没关系,我们的内容通过 captured_voice_contents 注入 break else: logger.warning("❌ 未找到'转文字'按钮,可能是已转换或误判") # 即使失败,也已记录在 processed_y_coords 中,避免死循环 # 继续尝试下一条语音 logger.info("跳过当前语音,继续扫描...") continue # 循环结束后,等待所有异步 OCR 任务完成 if hasattr(analyze_chat_image, "_ocr_tasks") and analyze_chat_image._ocr_tasks: logger.info(f"等待 {len(analyze_chat_image._ocr_tasks)} 个异步 OCR 任务完成...") results = await asyncio.gather(*analyze_chat_image._ocr_tasks) for y, content in results: if content: captured_voice_contents[y] = content logger.info(f"✅ [Async OCR] 异步获取到语音内容 (y={y}): {content}") # 清空任务列表 analyze_chat_image._ocr_tasks = [] # 循环结束,返回最后一次分析的结果 if not final_messages: # 如果循环因为 max_loops 退出,确保有结果 final_messages = messages # 注入 peek 到的内容 if captured_voice_contents: logger.info(f"正在注入 {len(captured_voice_contents)} 条已还原的语音内容...") for m in final_messages: if m['type'] == 'voice' and (not m.get('content') or m.get('content').strip() == ""): for py, content in captured_voice_contents.items(): # 注入时的容差也要放大,因为 final_messages 的 Y 可能和点击时的 vy 略有差异 if abs(m['y'] - py) < 100: m['content'] = content m['is_converted'] = True # 标记为逻辑上已转换 logger.info(f" -> 注入内容到 Y={m['y']} (原 py={py}): {content[:20]}...") break # 构造返回值 dialogue_log = [] # 使用 debug_img 的尺寸,如果 debug_img 未定义(极端情况),默认 1080x1920 if 'debug_img' in locals() and debug_img is not None: # [User Requested] 几何兜底 Y 轴应为 0.88 (避开底部导航条) input_field_coordinates = (debug_img.shape[1] // 2, int(debug_img.shape[0] * 0.88)) else: # 尝试读取 current_image_path try: tmp_img = cv2.imread(current_image_path) input_field_coordinates = (tmp_img.shape[1] // 2, int(tmp_img.shape[0] * 0.88)) except: input_field_coordinates = (540, 1690) # 1920 * 0.88 # 找出最后一条消息 last_msg = None if final_messages: final_messages.sort(key=lambda x: x['y']) last_msg = final_messages[-1] # 转换为 dialogue_log 格式 (简单转换,具体业务逻辑在调用方处理) # 注意:T2 需要的是上下文列表 pass # 实际上 T2 使用的是 LLM 上下文构建,这里不需要转换成特定 dict 结构, # 但为了兼容旧接口,我们还是返回 messages 列表给调用者处理, # 或者在这里处理成 (role, content) 列表? # 原代码似乎没有做太多转换,而是直接返回 messages 列表? # 仔细看原代码:analyze_chat_image 并没有返回 messages 列表! # 它返回 dialogue_log, input_pos # 原代码 lines 339-340: dialogue_log = [] # 可以在最后统一生成 # 统一生成 dialogue_log for msg in final_messages: # 尝试注入异步获取的语音内容 if msg['type'] == 'voice': # 模糊匹配 Y 坐标 (增大容差到 100,应对界面滚动) # 优先检查 content 是否为空或为 placeholder if not msg.get('content') or msg.get('content').strip() == "": for y_key, content in captured_voice_contents.items(): if abs(msg['y'] - y_key) < 100: msg['is_converted'] = True msg['content'] = content logger.info(f"✅ [注入] 成功将异步语音内容 '{content}' 注入到 Y={msg['y']} 的消息中") break # 无论是否有内容,都加入 dialogue_log if msg['type'] == 'text': if msg.get('content'): # 文本消息没内容通常是识别错误,可以丢弃 dialogue_log.append(msg) elif msg['type'] == 'voice': # 语音消息即使没内容也保留,交给上层处理 dialogue_log.append(msg) logger.info(f"📊 [统计] 语音总数: {total_voices_count}, 打开转文字次数: {convert_opened_count}, 关闭转文字次数: {convert_closed_count}") return dialogue_log, input_field_coordinates except Exception as e: logger.error(f"分析过程发生异常: {e}", exc_info=True) return [], (540, 1690) def clean_screenshots_dir(): """清理截图目录""" if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) return for f in os.listdir(OUTPUT_DIR): if f.lower().endswith(('.jpg', '.png', '.jpeg')): try: os.remove(os.path.join(OUTPUT_DIR, f)) except Exception as e: logger.warning(f"Failed to delete {f}: {e}") def is_in_chat_interface(d): """ 检查是否在微信聊天界面 """ try: # 1. 底部语音/键盘切换按钮 if d(description="切换到语音").exists or d(description="切换到键盘").exists: return True # 2. 底部输入框 if d(className="android.widget.EditText").exists: return True # 3. 底部“按住说话”按钮 if d(text="按住说话").exists: return True # 4. 右上角更多按钮 if d(description="聊天信息").exists: return True except Exception as e: logger.warning(f"is_in_chat_interface check failed: {e}") return False def find_input_box_center(image_path): """ 寻找输入框中心坐标 (兜底策略) 优先使用几何特征 (底部 88% 处) """ try: if not os.path.exists(image_path): return (540, 2100), None img = cv2.imread(image_path) if img is None: return (540, 2100), None h, w = img.shape[:2] # 策略:直接返回屏幕底部 88% 处的中心点 center_x = int(w * 0.5) center_y = int(h * 0.88) logger.info(f"find_input_box_center fallback: ({center_x}, {center_y})") return (center_x, center_y), None except Exception as e: logger.error(f"find_input_box_center error: {e}") return (540, 2100), None def find_template_match(screen_input, template_path, threshold=0.8): """ 使用 OpenCV 模板匹配寻找按钮中心坐标 :param screen_input: 可以是文件路径 (str) 或 OpenCV 图像 (numpy.ndarray) :param template_path: 模板文件路径 :param threshold: 匹配阈值 """ try: if not os.path.exists(template_path): logger.error(f"Template file not found: {template_path}") return None # 处理输入图像 if isinstance(screen_input, str): img = cv2.imread(screen_input) elif isinstance(screen_input, np.ndarray): img = screen_input else: # 尝试处理 PIL Image (uiautomator2 默认返回) try: img = cv2.cvtColor(np.array(screen_input), cv2.COLOR_RGB2BGR) except Exception: logger.error(f"Invalid screen_input type: {type(screen_input)}") return None template = cv2.imread(template_path) if img is None or template is None: return None h, w = template.shape[:2] res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED) min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res) if max_val >= threshold: center_x = max_loc[0] + w // 2 center_y = max_loc[1] + h // 2 logger.info(f"Template matched! Score: {max_val:.2f}, Center: ({center_x}, {center_y})") return (center_x, center_y) logger.info(f"Template not matched. Max score: {max_val:.2f}") return None except Exception as e: logger.error(f"Template matching failed: {e}") return None def find_all_template_matches(screen_path, template_path, threshold=0.8): """ 使用 OpenCV 模板匹配寻找**所有**符合条件的坐标 """ try: if not os.path.exists(template_path): logger.error(f"Template file not found: {template_path}") return [] img = cv2.imread(screen_path) template = cv2.imread(template_path) if img is None or template is None: return [] h, w = template.shape[:2] res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED) # 记录最大匹配度,方便调试阈值 min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res) logger.info(f"模板匹配 {os.path.basename(template_path)}: 最大相似度 = {max_val:.4f} (阈值={threshold})") # 找到所有大于阈值的点 loc = np.where(res >= threshold) points = [] for pt in zip(*loc[::-1]): # Switch collumns and rows center_x = pt[0] + w // 2 center_y = pt[1] + h // 2 points.append((center_x, center_y)) # 简单的去重(非极大值抑制的简化版,合并相近的点) # 这里假设红点不会重叠,暂时直接返回,或者做一个简单的聚类 # 实际应用中,matchTemplate 对同一个目标周围可能会有多个连续的匹配点 # 我们需要合并它们 unique_points = [] for p in points: is_close = False for up in unique_points: if abs(p[0] - up[0]) < 10 and abs(p[1] - up[1]) < 10: is_close = True break if not is_close: unique_points.append(p) if unique_points: logger.info(f"Found {len(unique_points)} matches for {os.path.basename(template_path)}") return unique_points except Exception as e: logger.error(f"find_all_template_matches failed: {e}") return [] def perform_input_action(d, center_point, text, auto_send=True, debug_prefix=None): """ 执行输入操作 :param debug_prefix: 如果提供,将在关键步骤保存截图,如 {debug_prefix}_before_mode.jpg """ try: def save_debug_shot(name): if debug_prefix: shot_path = os.path.join(OUTPUT_DIR, f"{debug_prefix}_{name}.jpg") d.screenshot(shot_path) logger.info(f"保存中间过程截图: {shot_path}") # --- 新增逻辑:确保处于文字输入模式 --- logger.info("正在检查输入模式...") save_debug_shot("1_check_mode") # 优先使用 uiautomator2 的属性检测(比图像识别更稳) # 1. 检查是否有 "切换到键盘" 按钮(说明当前是语音模式) voice_mode_btn = d(description="切换到键盘") if voice_mode_btn.exists: logger.info("检测到语音模式 (UI树: '切换到键盘'),点击切换...") voice_mode_btn.click() time.sleep(1.0) # 等待 UI 切换 # 2. 检查是否有 "切换到语音" 按钮(说明当前是文字模式) # 这一步不是必须的,但可以用来确认状态 # text_mode_btn = d(description="切换到语音") # if text_mode_btn.exists: # logger.info("当前已是文字模式 (UI树: '切换到语音')") # 3. 如果 UI 树检测失败,尝试图像兜底 if not voice_mode_btn.exists: tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg") d.screenshot(tmp_check_shot) wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg") # 检查是否存在 '切换到文字' 图标 wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8) if wen_zi_pos: logger.info(f"检测到语音模式 (图像: 找到切换文字图标: {wen_zi_pos}),点击切换...") d.click(wen_zi_pos[0], wen_zi_pos[1]) time.sleep(1.0) # 清理临时文件 if os.path.exists(tmp_check_shot): try: os.remove(tmp_check_shot) except: pass # --- 新增逻辑结束 --- save_debug_shot("2_after_mode") # 1. 尝试找到原生输入框并输入 # 增加多种查找方式 edit_text = d(className="android.widget.EditText") if not edit_text.exists: # 尝试通过 resourceId 查找 (微信常见ID) edit_text = d(resourceId="com.tencent.mm:id/b4a") # 1.2 [User Request] 尝试使用 input_text.jpg 模板寻找输入框 if not edit_text.exists: input_template_path = os.path.join(TEMPLATE_DIR, "input_text.jpg") if os.path.exists(input_template_path): # 截图用于匹配 tmp_input_search = os.path.join(OUTPUT_DIR, "temp_input_search.jpg") d.screenshot(tmp_input_search) logger.info(f"正在尝试使用模板 {input_template_path} 寻找输入框...") # [User Request] 降低阈值到 0.6 input_pos = find_template_match(tmp_input_search, input_template_path, threshold=0.6) if input_pos: logger.info(f"✅ [Template] 通过 input_text.jpg 找到输入框: {input_pos}") save_debug_shot("3_input_box_found") # 绘制调试图 (蓝框) try: debug_img = cv2.imread(tmp_input_search) if debug_img is not None: # 读取模板获取宽高 tmpl = cv2.imread(input_template_path) if tmpl is not None: th, tw = tmpl.shape[:2] cx, cy = input_pos top_left = (cx - tw//2, cy - th//2) bottom_right = (cx + tw//2, cy + th//2) # 蓝色框 BGR=(255, 0, 0) cv2.rectangle(debug_img, top_left, bottom_right, (255, 0, 0), 3) cv2.putText(debug_img, "MATCH: input_text.jpg", (top_left[0], top_left[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2) debug_save_path = os.path.join(OUTPUT_DIR, "debug_input_box_match.jpg") cv2.imwrite(debug_save_path, debug_img) logger.info(f"已保存输入框匹配调试图(蓝框): {debug_save_path}") except Exception as e: logger.warning(f"绘制输入框调试图失败: {e}") # 更新点击坐标 center_point = input_pos else: logger.info(f"❌ [Template] input_text.jpg 未匹配到输入框") # 1.5 如果找不到原生输入框,尝试通过“切换到语音”按钮定位 Y 轴 # 输入框通常与左侧的“切换到语音”按钮垂直居中对齐 if not edit_text.exists: try: # 确保在文字模式下,左侧会有“切换到语音”按钮 # 有时候可能是 "切换到键盘" (如果状态判断出错),都尝试一下作为锚点 anchor_btn = d(description="切换到语音") if not anchor_btn.exists: anchor_btn = d(description="切换到键盘") if anchor_btn.exists: # 获取按钮中心 Y 坐标 bounds = anchor_btn.info['bounds'] anchor_y = (bounds['top'] + bounds['bottom']) // 2 # 获取屏幕宽度 w, h = d.window_size() # 更新中心点:X居中,Y与按钮对齐 center_point = (w // 2, anchor_y) logger.info(f"通过'切换到语音'按钮修正输入框坐标: {center_point}") except Exception as e: logger.warning(f"尝试修正坐标失败: {e}") input_success = False if edit_text.exists: logger.info("Found native EditText, using set_text") try: edit_text.click() time.sleep(0.5) edit_text.set_text(text) input_success = True except Exception as e: logger.warning(f"Native input failed: {e}") # 2. 如果原生输入失败,使用坐标点击 + 粘贴/输入 if not input_success: cx, cy = center_point logger.info(f"Using coordinate input: {center_point}") d.click(cx, cy) time.sleep(1.0) try: d.send_keys(text) except Exception: logger.warning("send_keys failed, trying set_clipboard") d.set_clipboard(text) d.click(cx, cy) time.sleep(0.5) # 尝试粘贴 d.press("paste") save_debug_shot("4_after_input") time.sleep(1.0) # 3. 发送 if auto_send: # 优先使用模板匹配寻找“发送”按钮 logger.info("尝试使用模板匹配寻找'发送'按钮...") tmp_screen = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_send_check.jpg") d.screenshot(tmp_screen) # 使用相对路径 template_path = os.path.join(TEMPLATE_DIR, "send.jpg") send_btn_pos = find_template_match(tmp_screen, template_path, threshold=0.7) # 稍微降低阈值以提高召回 if send_btn_pos: logger.info(f"通过模板匹配找到发送按钮: {send_btn_pos}, 点击...") d.click(send_btn_pos[0], send_btn_pos[1]) else: logger.warning("模板匹配未找到发送按钮,尝试原生控件查找...") if d(text="发送").exists: d(text="发送").click() logger.info("Clicked '发送'") else: d.press("enter") logger.info("Pressed Enter") save_debug_shot("5_after_send") # 清理临时文件 if os.path.exists(tmp_screen): try: os.remove(tmp_screen) except: pass return True except Exception as e: logger.error(f"perform_input_action error: {e}") return False def perform_voice_input(d, duration=3.0, debug_prefix=None): """ 执行语音输入操作 (长按说话) 1. 检查是否在语音模式 (寻找 press_say.jpg) 2. 如果不在,尝试点击 keyboard.jpg 或 audio_reply.jpg 切换 3. 长按 press_say.jpg,并进行计时记录 """ try: def save_debug_shot(name): if debug_prefix: shot_path = os.path.join(OUTPUT_DIR, f"{debug_prefix}_{name}.jpg") d.screenshot(shot_path) logger.info(f"保存中间过程截图: {shot_path}") save_debug_shot("voice_1_check") # 模板路径 press_say_template = os.path.join(TEMPLATE_DIR, "press_say.jpg") keyboard_template = os.path.join(TEMPLATE_DIR, "keyboard.jpg") audio_reply_template = os.path.join(TEMPLATE_DIR, "audio_reply.jpg") # 1. 检查当前模式 tmp_screen = os.path.join(OUTPUT_DIR, "temp_voice_check.jpg") d.screenshot(tmp_screen) press_say_pos = find_template_match(tmp_screen, press_say_template, threshold=0.8) if press_say_pos: logger.info(">>> [状态] 当前已是语音模式 (找到 '按住说话' 按钮)") need_switch = False else: logger.info(">>> [状态] 当前可能是键盘模式 (未找到 '按住说话' 按钮)") need_switch = True if need_switch: logger.info(">>> [切换] 需要进行模式切换...") # 2. 尝试点击键盘图标或音频图标切换模式 switch_pos = find_template_match(tmp_screen, keyboard_template, threshold=0.8) if not switch_pos: switch_pos = find_template_match(tmp_screen, audio_reply_template, threshold=0.8) if switch_pos: logger.info(f">>> [切换] 找到切换按钮 {switch_pos},正在点击切换...") d.click(switch_pos[0], switch_pos[1]) time.sleep(1.5) # 稍微增加等待时间确保切换完成 # 验证切换是否成功 d.screenshot(tmp_screen) press_say_pos = find_template_match(tmp_screen, press_say_template, threshold=0.8) if press_say_pos: logger.info(">>> [切换] 成功完成切换,进入语音模式") else: logger.error(">>> [切换] 切换操作已执行,但仍未找到 '按住说话' 按钮,切换可能失败") else: logger.warning(">>> [切换] 未找到切换按钮 (keyboard.jpg/audio_reply.jpg),无法切换") # 3. 执行长按 if press_say_pos: x, y = press_say_pos logger.info(f">>> [发送] 开始按住发送语音按钮 ({x}, {y})") save_debug_shot("voice_2_before_hold") # 开始计时长按 d.touch.down(x, y) start_time = time.time() last_second = 0 while time.time() - start_time < duration: elapsed = int(time.time() - start_time) + 1 if elapsed > last_second and elapsed <= duration: logger.info(f">>> [计时] {elapsed}") last_second = elapsed time.sleep(0.1) d.touch.up(x, y) logger.info(f">>> [完成] 已完成指定时长 ({duration}s) 的按住按钮发送语音") save_debug_shot("voice_3_after_hold") if os.path.exists(tmp_screen): try: os.remove(tmp_screen) except: pass return True else: logger.error(">>> [失败] 最终未能定位到发送语音按钮") if os.path.exists(tmp_screen): try: os.remove(tmp_screen) except: pass return False except Exception as e: logger.error(f"perform_voice_input error: {e}") return False def switch_to_keyboard_mode(d): """ 强制切换到键盘/文本模式 """ try: logger.info(">>> [模式] 尝试切换到键盘模式...") # 1. 尝试 UI 树 voice_mode_btn = d(description="切换到键盘") if voice_mode_btn.exists: logger.info(">>> [模式] 检测到语音模式按钮,点击切换到键盘...") voice_mode_btn.click() time.sleep(1.0) return True # 2. 尝试图像匹配 (wen_zi_input.jpg) tmp_screen = os.path.join(OUTPUT_DIR, "temp_switch_kb.jpg") d.screenshot(tmp_screen) wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg") wen_zi_pos = find_template_match(tmp_screen, wen_zi_template, threshold=0.8) if wen_zi_pos: logger.info(f">>> [模式] 找到切换文字图标 {wen_zi_pos},点击切换...") d.click(wen_zi_pos[0], wen_zi_pos[1]) time.sleep(1.0) return True logger.info(">>> [模式] 当前可能已经是键盘模式,或未找到切换按钮") return False except Exception as e: logger.error(f"switch_to_keyboard_mode error: {e}") return False def check_is_chat_interface(screenshot_path): """ 检查当前是否在聊天界面 通过匹配 'audio_reply.jpg' (语音图标) 或 'keyboard.jpg' (键盘图标) 来判断 """ audio_reply_template = os.path.join(TEMPLATE_DIR, "audio_reply.jpg") keyboard_template = os.path.join(TEMPLATE_DIR, "keyboard.jpg") # 检查语音图标 if match_template_center(screenshot_path, audio_reply_template, threshold=0.8): logger.info("✅ 检测到语音回复图标,确认处于聊天界面") return True # 检查键盘图标 if match_template_center(screenshot_path, keyboard_template, threshold=0.8): logger.info("✅ 检测到键盘输入图标,确认处于聊天界面") return True logger.warning("⚠️ 未检测到聊天界面特征图标,当前可能不在聊天页面") return False def match_template_center(image_input, template_path, threshold=0.8): """ 使用 OpenCV 模板匹配寻找目标图片中心坐标 :param image_input: 可以是文件路径 (str) 或 OpenCV 图像 (numpy.ndarray) """ try: if not os.path.exists(template_path): logger.error(f"Template not found: {template_path}") return None # 处理输入图像 if isinstance(image_input, str): if not os.path.exists(image_input): logger.error(f"Image file not found: {image_input}") return None img = cv2.imread(image_input) elif isinstance(image_input, np.ndarray): img = image_input else: # 尝试处理 PIL Image try: img = cv2.cvtColor(np.array(image_input), cv2.COLOR_RGB2BGR) except Exception: logger.error(f"Invalid image_input type: {type(image_input)}") return None template = cv2.imread(template_path) if img is None or template is None: logger.error("Failed to read image or template") return None # 转换为灰度图进行匹配 img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY) # 模板匹配 result = cv2.matchTemplate(img_gray, template_gray, cv2.TM_CCOEFF_NORMED) min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) if max_val >= threshold: h, w = template_gray.shape top_left = max_loc center_x = int(top_left[0] + w / 2) center_y = int(top_left[1] + h / 2) logger.info(f"Template matched with confidence {max_val:.2f} at ({center_x}, {center_y})") return (center_x, center_y) else: logger.warning(f"Template match failed. Max confidence: {max_val:.2f} < Threshold: {threshold}") return None except Exception as e: logger.error(f"match_template_center error: {e}") return None