diff --git a/WeiXin/T2_ChatMonitor.py b/WeiXin/T2_ChatMonitor.py new file mode 100644 index 0000000..0cd3e41 --- /dev/null +++ b/WeiXin/T2_ChatMonitor.py @@ -0,0 +1,133 @@ +# coding=utf-8 +import os +import sys +import time +import logging +import asyncio + +# 添加项目根目录到 sys.path +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if project_root not in sys.path: + sys.path.append(project_root) + +from WeiXin import WxUtil + +# 配置日志 +log_dir = WxUtil.LOG_DIR +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(os.path.join(log_dir, "T2_ChatMonitor.log"), encoding='utf-8'), + logging.StreamHandler() + ] +) +logger = logging.getLogger("T2_ChatMonitor") + +class CVDebugTask: + """ + 结构化的 CV 语音调试任务,支持分步执行和单元测试 + """ + def __init__(self): + self.device = None + self.screenshot_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_live_shot.jpg") + self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_debug_view.jpg") + self.dialogue_log = [] + self.input_pos = None + + def step_1_prepare_env(self): + """步骤1: 环境准备""" + logger.info("--- [Step 1] 环境准备 ---") + WxUtil.setup_script_environment() + return True + + def step_2_connect_device(self): + """步骤2: 连接设备""" + logger.info("--- [Step 2] 连接设备 ---") + self.device = WxUtil.connect_device() + if not self.device: + logger.error("❌ 设备连接失败,请检查手机是否连接且开启了调试模式") + return False + return True + + def step_3_capture_screen(self): + """步骤3: 捕获屏幕截图""" + logger.info("--- [Step 3] 捕获屏幕截图 ---") + try: + if not self.device: + logger.error("❌ 未连接设备,无法截图") + return False + self.device.screenshot(self.screenshot_path) + logger.info(f"✅ 截图已保存: {self.screenshot_path}") + return True + except Exception as e: + logger.error(f"❌ 截图失败: {e}") + return False + + async def step_4_analyze_and_process(self, use_existing_image=False): + """ + 步骤4: 分析图片并处理语音转换 + :param use_existing_image: 是否使用已有的图片进行离线测试 + """ + logger.info("--- [Step 4] 分析图片与语音处理 ---") + + target_img = self.screenshot_path + if use_existing_image: + if not os.path.exists(target_img): + logger.error(f"❌ 找不到指定的离线图片: {target_img}") + return False + logger.info(f"📂 正在使用离线图片进行测试: {target_img}") + + # 调用核心分析逻辑 + # 注意:即使是离线分析,WxUtil 内部也会尝试连接设备以进行长按操作 + self.dialogue_log, self.input_pos = await WxUtil.analyze_chat_image( + target_img, + self.debug_view_path, + device=self.device + ) + + if self.dialogue_log: + logger.info("✅ 任务处理完成,已生成对话日志") + return True + else: + logger.warning("⚠️ 未识别到任何有效的聊天内容") + return False + + def step_5_report_results(self): + """步骤5: 输出最终报告""" + logger.info("--- [Step 5] 结果汇总 ---") + if self.input_pos: + logger.info(f"📍 识别到输入框位置: {self.input_pos}") + + if self.dialogue_log: + logger.info("📋 最终对话内容提取结果已输出到控制台 (见上方横线区域)") + else: + logger.warning("❌ 无对话内容输出") + return True + +async def run_structured_debug(): + """ + 按步骤运行完整的调试任务 + """ + task = CVDebugTask() + + # 顺序执行各步骤 + if not task.step_1_prepare_env(): return + if not task.step_2_connect_device(): return + if not task.step_3_capture_screen(): return + + # 执行耗时的分析和处理步骤 + success = await task.step_4_analyze_and_process() + + if success: + task.step_5_report_results() + logger.info("✨ 调试任务全部顺利完成!") + else: + logger.error("❌ 调试任务在处理阶段失败") + +if __name__ == "__main__": + # 运行结构化的调试流程 + asyncio.run(run_structured_debug()) diff --git a/WeiXin/T4_CV_Voice_Debug.py b/WeiXin/T4_CV_Voice_Debug.py deleted file mode 100644 index 3a080cd..0000000 --- a/WeiXin/T4_CV_Voice_Debug.py +++ /dev/null @@ -1,72 +0,0 @@ -# coding=utf-8 -import os -import sys -import time - -import cv2 -import logging - -# 添加项目根目录到 sys.path -project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -if project_root not in sys.path: - sys.path.append(project_root) - -from WeiXin import WxUtil -from WeiXin.WxUtil import find_all_template_matches - -# 配置日志 -log_dir = WxUtil.LOG_DIR -if not os.path.exists(log_dir): - os.makedirs(log_dir) - -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(os.path.join(log_dir, "T4_CV_Voice_Debug.log"), encoding='utf-8'), - logging.StreamHandler() - ] -) -logger = logging.getLogger("T4_CV_Voice_Debug") - - -import asyncio - -async def run_cv_debug(): - # 运行前清理 Logs 和 Output - WxUtil.setup_script_environment() - - # 1. 拍照 (获取当前设备屏幕) - logger.info("📸 正在连接设备并截取屏幕...") - d = WxUtil.connect_device() - if not d: - return - - try: - screenshot_dir = WxUtil.OUTPUT_DIR - image_path = os.path.join(screenshot_dir, "t4_live_shot.jpg") - output_path = os.path.join(screenshot_dir, "T4_debug_view.jpg") - - d.screenshot(image_path) - logger.info(f"✅ 截图已保存: {image_path}") - except Exception as e: - logger.error(f"❌ 拍照失败: {e}") - return - - logger.info(f"🔍 正在调用 WxUtil.analyze_chat_image 分析最后一条消息...") - - # 2. 调用新的分析逻辑 - dialogue_log, input_pos = await WxUtil.analyze_chat_image(image_path, output_path, device=d) - - if dialogue_log: - logger.info("📢 识别到的最后一条消息:") - for line in dialogue_log: - logger.info(f" {line}") - else: - logger.warning("⚠️ 未识别到任何消息") - - if input_pos: - logger.info(f"📍 识别到输入框位置: {input_pos}") - -if __name__ == "__main__": - asyncio.run(run_cv_debug()) diff --git a/WeiXin/WxUtil.py b/WeiXin/WxUtil.py index 4658715..54e5782 100644 --- a/WeiXin/WxUtil.py +++ b/WeiXin/WxUtil.py @@ -58,6 +58,11 @@ def connect_device(): """ try: d = u2.connect() + # 强制检查连接是否可用 + if not d.info: + logger.error("设备连接不可用 (d.info is empty)") + return None + # 获取可靠的序列号 device_serial = d.serial if hasattr(d, 'serial') else "未知" logger.info(f"设备连接成功: {device_serial}") @@ -70,6 +75,24 @@ def connect_device(): logger.error(f"设备连接失败: {e}") return None +def safe_device_click(d, x, y): + """ + 安全的点击操作,包含简单的异常捕获和重试逻辑 + """ + try: + d.click(x, y) + return True + except Exception as e: + logger.warning(f"点击操作失败 ({x}, {y}): {e},尝试重新连接并重试...") + try: + # 尝试重新初始化连接 + new_d = u2.connect() + new_d.click(x, y) + return True + except Exception as e2: + logger.error(f"重试点击操作依然失败: {e2}") + return False + async def analyze_chat_image(image_path, output_path, device=None, target_name="对方"): """ 全面采用 CV + OCR 识别微信聊天截图中的最后一条消息 @@ -146,24 +169,40 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name=" }) # B. 添加文本消息 - # 简单策略:排除掉明显是系统时间、输入框或顶部标题的文字 for bbox, text, conf in ocr_results: c_x = int((bbox[0][0] + bbox[2][0]) / 2) c_y = int((bbox[0][1] + bbox[2][1]) / 2) - # 过滤区域 - if c_y < 150 or c_y > h - 250: + # 过滤区域 (顶部标题栏和底部输入栏) + # 底部输入栏通常在最后 150 像素左右 + if c_y < 150 or c_y > h - 150: continue - # 过滤掉单字(可能是头像旁边的文字或杂质)和某些系统词 - if len(text) < 1 and "昨天" not in text and "今天" not in text: + # 过滤掉明显的系统词 (通常是日期或时间) + # 匹配如: "2025年12月28日 11:18", "11:18", "昨天 09:26" 等 + # 增加对 OCR 误识别的容错 (如 28811:18) + time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)' + # 如果文本包含这些关键词且长度较短,或者是纯数字/标点组合 + if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)): + continue + + # 过滤掉语音时长标识 (如 "5\"", "10\"", "小8\"") + if re.match(r'^.?[0-9]{1,2}"?$', text.strip()): + continue + + # 过滤掉“撤回了一条消息”等系统提示 + if "撤回了一条消息" in text or "打招呼的消息" in text: continue - sender = "对方" if c_x < w / 2 else "我" + # 改进发送者判定:查看文本块的左边界 + # 对方的消息靠左,我的消息靠右 + left_x = bbox[0][0] + sender = "对方" if left_x < w * 0.3 else "我" + messages.append({ "type": "text", "sender": sender, - "content": text, + "content": text.strip(), "center": (c_x, c_y), "y": c_y }) @@ -186,11 +225,12 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name=" dialogue_log = [] input_field_coordinates = (w // 2, int(h * 0.9)) # 默认输入框位置 - # 7. 自动处理所有“红框 + NO”的语音消息 - unconverted_voices = [m for m in messages if m['type'] == 'voice' and m.get('is_unread') and not m.get('is_converted')] + # 7. 自动处理所有尚未转换的语音消息 + # 获取所有语音消息(不论已读未读,只要没转换成文字就处理) + unconverted_voices = [m for m in messages if m['type'] == 'voice' and not m.get('is_converted')] if unconverted_voices: - logger.info(f"发现 {len(unconverted_voices)} 条未转换的未读语音,开始处理...") + logger.info(f"发现 {len(unconverted_voices)} 条未转换的语音,开始处理...") for v_msg in unconverted_voices: vx, vy = int(v_msg['center'][0]), int(v_msg['center'][1]) @@ -206,78 +246,132 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name=" d.screenshot(menu_shot) zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg") - # 降低阈值到 0.7 以增加匹配成功率 btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7) if btn_pos: btn_x, btn_y = int(btn_pos[0]), int(btn_pos[1]) logger.info(f"✅ 找到'转文字'按钮: ({btn_x}, {btn_y}),点击中...") - d.click(btn_x, btn_y) + safe_device_click(d, btn_x, btn_y) - # 等待转换完成 (根据语音长度,通常 3-5 秒足够) + # 等待转换完成 logger.info("等待语音转文字完成...") time.sleep(5.0) # C. 再次截图 OCR 获取转换后的文字 after_convert_shot = os.path.join(OUTPUT_DIR, f"after_auto_{vy}.jpg") - d.screenshot(after_convert_shot) - convert_ocr = ocr_kit.read_text(after_convert_shot) + try: + d.screenshot(after_convert_shot) + convert_ocr = ocr_kit.read_text(after_convert_shot) + except Exception as e: + logger.error(f"截图或 OCR 失败: {e}") + convert_ocr = [] - # 提取转换文字:寻找在语音图标下方的文字块 - converted_text = "" + # 提取转换文字(合并多行结果) + text_blocks = [] for c_bbox, c_text, c_conf in convert_ocr: cc_x = (c_bbox[0][0] + c_bbox[2][0]) / 2 cc_y = (c_bbox[0][1] + c_bbox[2][1]) / 2 - # 转换后的文字通常在语音图标下方 30-300 像素内,且水平位置相近 - if 30 < cc_y - vy < 300 and abs(cc_x - vx) < 250: - converted_text = c_text - break + # 扩大搜索范围,适应更长的转换结果 + # 增加 sender 判断 (通过水平位置判定) + c_left_x = c_bbox[0][0] + c_sender = "对方" if c_left_x < w * 0.3 else "我" + if 30 < cc_y - vy < 600 and abs(cc_x - vx) < 400 and c_sender == v_msg['sender']: + text_blocks.append((cc_y, c_text)) + + # 按 Y 坐标排序并合并 + text_blocks.sort(key=lambda x: x[0]) + converted_text = "".join([t[1] for t in text_blocks]) if converted_text: - logger.info(f"✨ OCR 识别成功!") - print(f"\n[语音转文字结果]: {converted_text}\n") - # 同步到消息对象 + logger.info(f"✨ OCR 识别成功: {converted_text}") v_msg['content'] = converted_text v_msg['is_converted'] = True - # 如果这条消息也是最后一条消息,更新 dialogue_log 需要的内容 - if v_msg == last_msg: - last_msg['content'] = converted_text else: logger.warning("❌ OCR 未能提取到转换后的文字内容") # D. 长按并点击“取消转文字”恢复界面 - logger.info("正在恢复界面状态 (点击'取消转文字')...") - d.long_click(vx, vy, 1.5) - time.sleep(1.0) - cancel_shot = os.path.join(OUTPUT_DIR, f"cancel_menu_{vy}.jpg") - d.screenshot(cancel_shot) - cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg") - cancel_btn = find_template_match(cancel_shot, cancel_template, threshold=0.7) - - if cancel_btn: - c_btn_x, c_btn_y = int(cancel_btn[0]), int(cancel_btn[1]) - d.click(c_btn_x, c_btn_y) - logger.info(f"✅ 已点击'取消转文字' ({c_btn_x}, {c_btn_y}),界面已恢复") - else: - # 兜底:点击语音图标右侧空白处尝试关闭菜单 - logger.warning("⚠️ 未找到'取消转文字'按钮,尝试点击空白处关闭菜单") - d.click(vx + 300, vy) + try: + logger.info("正在恢复界面状态 (点击'取消转文字')...") + d.long_click(vx, vy, 1.5) + time.sleep(1.0) + cancel_shot = os.path.join(OUTPUT_DIR, f"cancel_menu_{vy}.jpg") + d.screenshot(cancel_shot) + cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg") + cancel_btn = find_template_match(cancel_shot, cancel_template, threshold=0.7) + + if cancel_btn: + c_btn_x, c_btn_y = int(cancel_btn[0]), int(cancel_btn[1]) + safe_device_click(d, c_btn_x, c_btn_y) + logger.info(f"✅ 已点击'取消转文字' ({c_btn_x}, {c_btn_y}),界面已恢复") + else: + logger.warning("⚠️ 未找到'取消转文字'按钮,尝试点击空白处关闭菜单") + safe_device_click(d, vx + 300, vy) + except Exception as e: + logger.error(f"恢复界面状态时发生错误: {e}") else: - logger.warning("❌ 未能找到'转文字'按钮,可能长按失败或模板不匹配") - # 尝试点击空白处退出菜单 - d.click(vx + 300, vy) + logger.warning("❌ 未能找到'转文字'按钮,点击空白处退出") + safe_device_click(d, vx + 300, vy) - # 8. 整合对话日志 (仅针对最后一条消息进行反馈) + # 8. 重新排序并生成完整的对话日志 + # 先合并已经处理好的语音消息内容 + # 排除掉转换文字本身产生的 OCR 文本干扰(如果 OCR 识别结果包含在文本消息中,需要过滤) + final_messages = [] + # 1. 识别并归档所有属于语音转换出来的文字 + for v_msg in messages: + if v_msg['type'] == 'voice': + vx, vy = v_msg['center'] + v_content_blocks = [] + # 找出所有在语音图标下方且水平相近的文本块,且发送者一致 + for msg in messages: + if msg['type'] == 'text': + cx, cy = msg['center'] + # 1. 垂直距离在合理范围内 (30 到 600 像素) + # 2. 发送者一致 (确保归属正确) + # 3. 水平偏移在合理范围内 (对于对方,cx 应该在左侧;对于我,cx 应该在右侧) + if 30 < cy - vy < 600 and msg['sender'] == v_msg['sender']: + # 进一步检查水平位置,确保文字在语音图标的大致垂直线上或稍有偏移 + if abs(cx - vx) < 400: + v_content_blocks.append(msg) + msg['is_voice_part'] = True + + # 如果有内容块,按 Y 排序并合并 + if v_content_blocks: + v_content_blocks.sort(key=lambda x: x['y']) + combined_content = "".join([m['content'] for m in v_content_blocks]) + v_msg['content'] = combined_content + v_msg['is_converted'] = True + + # 2. 收集最终要显示的消息(排除被标记为语音部分的文本) + for msg in messages: + if msg['type'] == 'text': + if not msg.get('is_voice_part', False): + final_messages.append(msg) + else: + final_messages.append(msg) + + # 按 Y 坐标排序 + final_messages.sort(key=lambda x: x['y']) + + # 格式化输出到控制台 + print("\n" + "="*50) + print(" --- 微信聊天记录提取结果 ---") + print("="*50) + dialogue_log = [] - if last_msg['type'] == 'voice': - # 优先使用刚才转文字得到的内容 - content = last_msg.get('content') or "[语音]" - dialogue_log.append(f"{last_msg['sender']}: {content}") - else: - dialogue_log.append(f"{last_msg['sender']}: {last_msg['content']}") + for msg in final_messages: + sender = msg['sender'] + content = msg.get('content') or (msg.get('text') if 'text' in msg else "[未识别内容]") + if msg['type'] == 'voice': + content = f"[语音] {content}" + + log_line = f"{sender}: {content}" + dialogue_log.append(log_line) + print(log_line) + + print("="*50 + "\n") return dialogue_log, input_field_coordinates - + except Exception as e: logger.error(f"analyze_chat_image 失败: {e}", exc_info=True) return [], None diff --git a/WeiXin/__pycache__/WxUtil.cpython-310.pyc b/WeiXin/__pycache__/WxUtil.cpython-310.pyc index bc5fda6..2eaf8b8 100644 Binary files a/WeiXin/__pycache__/WxUtil.cpython-310.pyc and b/WeiXin/__pycache__/WxUtil.cpython-310.pyc differ