diff --git a/WeiXin/T2_ChatMonitor.py b/WeiXin/T2_ChatMonitor.py index cc0e2ed..dc32c34 100644 --- a/WeiXin/T2_ChatMonitor.py +++ b/WeiXin/T2_ChatMonitor.py @@ -179,16 +179,14 @@ class ChatMonitorBot: logger.info("\n" + "="*50) logger.info("【测试模式】最终提取的对话记录:") for msg in self.dialogue_log: - # 格式化输出:[发送者] 内容 (类型) sender = msg.get('sender', '未知') content = msg.get('content', '') - msg_type = "语音" if msg.get('type') == 'voice' else "文字" + time_str = msg.get('time_display', '') - # 按照用户要求的格式输出 - logger.info(f"说话人: {sender}") - logger.info(f"消息类型: {msg_type}") - logger.info(f"消息内容: {content}") - logger.info("-" * 20) + # 按照用户要求的格式输出: 2026-01-26 10:03 糖豆爸爸 : 老师您好! + log_prefix = f"{time_str} " if time_str else "" + log_line = f"{log_prefix}{sender} : {content}" + logger.info(log_line) logger.info("="*50 + "\n") # --- LLM 总结 --- @@ -198,7 +196,9 @@ class ChatMonitorBot: sender = msg.get('sender', '未知') content = msg.get('content', '') type_str = "[语音]" if msg.get('type') == 'voice' else "[文字]" - chat_history_text += f"{sender}{type_str}: {content}\n" + time_str = msg.get('time_display', '') + time_prefix = f"[{time_str}] " if time_str else "" + chat_history_text += f"{time_prefix}{sender}{type_str}: {content}\n" prompt = ( "请根据以下微信对话记录,总结归纳双方交流的主要信息点。\n" @@ -232,7 +232,7 @@ class ChatMonitorBot: logger.info(f"💡 [首屏] 最后一条消息来自 '{sender}',尝试生成回复...") # 构建上下文 - context_text = "\n".join([f"{m.get('sender')}: {m.get('content')}" for m in self.dialogue_log[:-1]]) + context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in self.dialogue_log[:-1]]) last_content = last_msg.get('content', '') reply = await self.get_reply(last_content, context_text) @@ -324,8 +324,31 @@ class ChatMonitorBot: logger.info(f"💡 [监控] 发现新消息: {last_msg},保存现场截图: {event_shot}") # 获取上下文文本 (格式化为 Sender: Content) - context_text = "\n".join([f"{m.get('sender')}: {m.get('content')}" for m in dialogue_log[:-1]]) - last_content = last_msg.get('content', '') + context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in dialogue_log[:-1]]) + last_content = last_msg.get('content') or "" + + # 兜底逻辑:如果最后一条是语音且内容为空(可能因无红点未被 UNREAD 策略处理),尝试强制转换 + if last_msg.get('type') == 'voice' and not last_content.strip(): + logger.info("⚠️ [监控] 最后一条语音消息未获取到内容(可能已读无红点),尝试强制转换...") + # 强制使用 LAST 策略重试 + dialogue_log_retry, _ = await WxUtil.analyze_chat_image( + self.screenshot_path, + self.debug_view_path, + device=self.device, + process_strategy="LAST" + ) + if dialogue_log_retry: + # 更新引用 + self.dialogue_log = dialogue_log_retry + dialogue_log = dialogue_log_retry + last_msg = dialogue_log[-1] + last_content = last_msg.get('content') or "" + logger.info(f"🔄 [重试] 强制转换后内容: {last_content}") + + # 重新构建 msg_str 和 hash,确保下次循环不会因为内容变化而再次触发(虽然这里已经处理了) + # 但实际上这里是在处理当前事件,更新 hash 是为了避免重复处理 + msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer) + current_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest() # 生成回复 reply = await self.get_reply(last_content, context_text) diff --git a/WeiXin/WxUtil.py b/WeiXin/WxUtil.py index ce1c05d..54ef86d 100644 --- a/WeiXin/WxUtil.py +++ b/WeiXin/WxUtil.py @@ -15,7 +15,7 @@ if project_root not in sys.path: sys.path.append(project_root) import json -from datetime import datetime +from datetime import datetime, timedelta from Util.EasyOcrKit import EasyOcrKit # 初始化 EasyOcrKit @@ -34,6 +34,107 @@ TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templat # 全局调试图片计数器 _debug_counter = 0 +def parse_wechat_time(time_str): + """ + 解析微信时间字符串为标准化格式 (YYYY-MM-DD HH:MM) + 支持: "10:03", "昨天 10:03", "星期三 10:03", "2025年1月1日 10:03" + """ + try: + now = datetime.now() + today = now.date() + clean_str = time_str.strip() + + # 1. HH:mm (当天) + # 注意:有时候 OCR 会把冒号识别成其他字符,这里假设是标准的 HH:mm + if re.match(r'^\d{1,2}:\d{2}$', clean_str): + h, m = map(int, clean_str.split(':')) + dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m)) + return dt.strftime("%Y-%m-%d %H:%M") + + # 2. 昨天 HH:mm + if "昨天" in clean_str: + t_part = clean_str.replace("昨天", "").strip() + if re.match(r'^\d{1,2}:\d{2}$', t_part): + h, m = map(int, t_part.split(':')) + yesterday = today - timedelta(days=1) + dt = datetime.combine(yesterday, datetime.min.time().replace(hour=h, minute=m)) + return dt.strftime("%Y-%m-%d %H:%M") + + # 3. 星期X HH:mm + weekdays = {"星期一": 0, "星期二": 1, "星期三": 2, "星期四": 3, "星期五": 4, "星期六": 5, "星期日": 6} + for w_str, w_idx in weekdays.items(): + if w_str in clean_str: + t_part = clean_str.replace(w_str, "").strip() + if re.match(r'^\d{1,2}:\d{2}$', t_part): + h, m = map(int, t_part.split(':')) + current_weekday = now.weekday() + # 计算日期回退天数 (mod 7 确保是过去的一周内) + delta_days = (current_weekday - w_idx) % 7 + # 如果 delta_days 是 0 且当前时间比消息时间早 (不可能发生,除非穿越),说明是今天 + # 但通常"星期X"不显示今天,今天显示 HH:mm + # 如果 delta_days == 0,可能是上周的今天?微信通常显示 "上周X"? + # 简单起见,认为是今天或过去7天内的那天 + if delta_days == 0 and datetime.now().time() < datetime.min.time().replace(hour=h, minute=m): + delta_days = 7 # 上周 + + target_date = today - timedelta(days=delta_days) + dt = datetime.combine(target_date, datetime.min.time().replace(hour=h, minute=m)) + return dt.strftime("%Y-%m-%d %H:%M") + + # 4. YYYY年MM月DD日 HH:mm + # 简单匹配年月日 + match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', clean_str) + if match: + y, m, d = map(int, match.groups()) + # 找时间部分 + time_match = re.search(r'(\d{1,2}):(\d{2})', clean_str) + if time_match: + hh, mm = map(int, time_match.groups()) + dt = datetime(y, m, d, hh, mm) + return dt.strftime("%Y-%m-%d %H:%M") + else: + # 只有日期,没有时间 (通常是日期分隔符) + # 这种情况下,可能需要给个默认时间?或者就返回日期 + return f"{y:04d}-{m:02d}-{d:02d} 00:00" + + # 5. MM月DD日 HH:mm (跨年但未显示年份?微信通常会显示年份如果跨年) + # 处理 "1月26日 10:00" + match = re.search(r'(\d{1,2})月(\d{1,2})日', clean_str) + if match: + m, d = map(int, match.groups()) + # 默认当年 + y = today.year + # 找时间 + time_match = re.search(r'(\d{1,2}):(\d{2})', clean_str) + if time_match: + hh, mm = map(int, time_match.groups()) + dt = datetime(y, m, d, hh, mm) + # 如果计算出的时间在未来,可能是去年 (比如现在1月,消息是12月) + if dt > now: + dt = datetime(y - 1, m, d, hh, mm) + return dt.strftime("%Y-%m-%d %H:%M") + + # 兜底:如果是 "下午 5:00" 这种格式 + if "下午" in clean_str or "晚上" in clean_str: + t_part = re.sub(r'下午|晚上', '', clean_str).strip() + if re.match(r'^\d{1,2}:\d{2}$', t_part): + h, m = map(int, t_part.split(':')) + if h < 12: h += 12 + dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m)) + return dt.strftime("%Y-%m-%d %H:%M") + + if "上午" in clean_str: + t_part = re.sub(r'上午', '', clean_str).strip() + if re.match(r'^\d{1,2}:\d{2}$', t_part): + h, m = map(int, t_part.split(':')) + dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m)) + return dt.strftime("%Y-%m-%d %H:%M") + + return clean_str # 解析失败,返回原串 + except Exception as e: + logger.warning(f"时间解析失败 '{time_str}': {e}") + return time_str + def get_next_debug_path(desc="step"): """获取下一个顺序命名的调试图片路径 (debug_N_desc.jpg)""" global _debug_counter @@ -247,10 +348,30 @@ def _scan_chat_messages(image_path): c_x = int((bbox[0][0] + bbox[2][0]) / 2) c_y = int((bbox[0][1] + bbox[2][1]) / 2) - # 判定逻辑:文本在语音下方且水平偏移不大 (放宽 Y 轴限制以包含侧边的时长文本) - # 2025-01-26: 增加 X 轴范围到 900 以适配超长语音条的右侧时长/文本 - # 增加 Y 轴范围到 800 以适配多行转文字内容 - if -50 < c_y - ay < 800 and abs(c_x - ax) < 900: + # 判定逻辑:文本在语音下方且水平偏移不大 + # 1. Y轴限制: -50 < dy < 800 (适配多行文本) + # 2. X轴限制: abs(dx) < 500 (减少误判,防止关联到屏幕另一侧的消息) + # 3. 几何位置强校验 (核心修复) + voice_is_left = ax < w / 2 + + # 获取文本框的左右边界 + min_x = min(p[0] for p in bbox) + max_x = max(p[0] for p in bbox) + + if voice_is_left: + # 语音在左 (对方): 文本必须也是左对齐 + # - min_x 必须靠左 (< 300) + # - max_x 不能太靠右 (> w - 150),否则可能是"我"的消息 + if min_x > 300 or max_x > w - 150: + continue + else: + # 语音在右 (我): 文本必须也是右对齐 + # - max_x 必须靠右 (> w - 300) + # - min_x 不能太靠左 (< 100) + if max_x < w - 300 or min_x < 100: + continue + + if -50 < c_y - ay < 800 and abs(c_x - ax) < 500: # 检查中间是否有其他语音图标 has_intermediate_audio = False for other_ax, other_ay in audio_matches: @@ -298,6 +419,13 @@ def _scan_chat_messages(image_path): # 按 Y 轴排序,如果 Y 接近则按 X 轴排序 associated_texts.sort(key=lambda x: (x[0], x[1])) converted_trigger_text = "".join([t[2] for t in associated_texts]) + + # 去除已知噪音 + noise_patterns = ["42IIhK+-语音输入粘贴#", "语音输入粘贴"] + for np in noise_patterns: + converted_trigger_text = converted_trigger_text.replace(np, "") + converted_trigger_text = converted_trigger_text.strip() + logger.info(f"语音({ax},{ay}) 判定为已转换,最终合并文本: '{converted_trigger_text}'") if is_converted: @@ -328,9 +456,36 @@ def _scan_chat_messages(image_path): if c_y < 150 or c_y > h - 100: continue + # 判定发送者 (增强版几何判定,防止 720p 屏幕下的中心点误判) + # 默认使用中心点判定 + sender = "对方" if c_x < w / 2 else "我" + + # 使用边界特征进行修正 + min_x = min(p[0] for p in bbox) + max_x = max(p[0] for p in bbox) + + # 修正阈值:假设头像+边距约占 15% 宽度 + edge_margin = w * 0.15 + + if max_x > w - edge_margin: + # 文本框延伸到了最右侧 -> 肯定是"我" (因为对方的头像在左,文本不会靠右) + sender = "我" + elif min_x < edge_margin: + # 文本框延伸到了最左侧 -> 肯定是"对方" (因为我的头像在右,文本不会靠左) + sender = "对方" + time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)' + # 优先判断是否为独立的时间戳 (行短且符合时间格式) if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)): - logger.info(f"忽略时间戳/日期文本: '{text}'") + # 进一步校验是否真的是时间 (通过 parse_wechat_time 尝试解析,或者简单正则) + # 这里我们假设短行的符合 time_pattern 的都是时间标记 + logger.info(f"识别到时间戳/日期: '{text}'") + messages.append({ + "type": "timestamp", + "content": text.strip(), + "y": c_y, + "center": (c_x, c_y) + }) continue clean_text = text.strip() @@ -350,8 +505,6 @@ def _scan_chat_messages(image_path): logger.info(f"忽略系统消息内容: '{clean_text}'") continue - left_x = bbox[0][0] - sender = "对方" if left_x < w * 0.5 else "我" messages.append({ "type": "text", @@ -363,7 +516,30 @@ def _scan_chat_messages(image_path): # 6. 排序 messages.sort(key=lambda x: x['y']) - return messages, debug_img, chat_title + + # 7. 注入时间戳 + current_time_str = None + + # 过滤掉 timestamp 类型的消息,将其作为属性注入到后续消息中 + final_messages_with_time = [] + + for msg in messages: + if msg['type'] == 'timestamp': + # 更新当前时间上下文 + parsed_time = parse_wechat_time(msg['content']) + current_time_str = parsed_time + logger.info(f"更新时间上下文: {msg['content']} -> {parsed_time}") + else: + # 只有语音和文本消息需要注入时间 + if current_time_str: + msg['time_display'] = current_time_str + else: + # 如果上方没有时间戳,尝试默认使用当天日期 (或者保持 None) + # 对于首屏最上面的消息,可能没有时间戳 + pass + final_messages_with_time.append(msg) + + return final_messages_with_time, debug_img, chat_title async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", process_strategy="ALL"): """ @@ -629,18 +805,21 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name=" for msg in final_messages: # 尝试注入异步获取的语音内容 if msg['type'] == 'voice': - # 模糊匹配 Y 坐标 + # 模糊匹配 Y 坐标 (增大容差到 100,应对界面滚动) for y_key, content in captured_voice_contents.items(): - if abs(msg['y'] - y_key) < 20: + if abs(msg['y'] - y_key) < 100: msg['is_converted'] = True msg['content'] = content logger.info(f"注入语音内容到最终消息列表: {content}") break - # 只添加有内容的文本消息,或已转换且有内容的语音消息 - if msg['type'] == 'text' and msg.get('content'): - dialogue_log.append(msg) - elif msg['type'] == 'voice' and msg.get('is_converted') and msg.get('content'): + # 无论是否有内容,都加入 dialogue_log + # 如果是语音且没内容,T2 会有兜底逻辑去处理 + if msg['type'] == 'text': + if msg.get('content'): # 文本消息没内容通常是识别错误,可以丢弃 + dialogue_log.append(msg) + elif msg['type'] == 'voice': + # 语音消息即使没内容也保留,交给上层处理 dialogue_log.append(msg) logger.info(f"📊 [统计] 语音总数: {total_voices_count}, 打开转文字次数: {convert_opened_count}, 关闭转文字次数: {convert_closed_count}") diff --git a/WeiXin/__pycache__/WxUtil.cpython-310.pyc b/WeiXin/__pycache__/WxUtil.cpython-310.pyc index a2634e7..a30fca3 100644 Binary files a/WeiXin/__pycache__/WxUtil.cpython-310.pyc and b/WeiXin/__pycache__/WxUtil.cpython-310.pyc differ