This commit is contained in:
HuangHai
2026-01-26 20:37:36 +08:00
parent faf5787f39
commit efcef1bade
3 changed files with 228 additions and 26 deletions

View File

@@ -179,16 +179,14 @@ class ChatMonitorBot:
logger.info("\n" + "="*50)
logger.info("【测试模式】最终提取的对话记录:")
for msg in self.dialogue_log:
# 格式化输出:[发送者] 内容 (类型)
sender = msg.get('sender', '未知')
content = msg.get('content', '')
msg_type = "语音" if msg.get('type') == 'voice' else "文字"
time_str = msg.get('time_display', '')
# 按照用户要求的格式输出
logger.info(f"说话人: {sender}")
logger.info(f"消息类型: {msg_type}")
logger.info(f"消息内容: {content}")
logger.info("-" * 20)
# 按照用户要求的格式输出: 2026-01-26 10:03 糖豆爸爸 老师您好!
log_prefix = f"{time_str} " if time_str else ""
log_line = f"{log_prefix}{sender} {content}"
logger.info(log_line)
logger.info("="*50 + "\n")
# --- LLM 总结 ---
@@ -198,7 +196,9 @@ class ChatMonitorBot:
sender = msg.get('sender', '未知')
content = msg.get('content', '')
type_str = "[语音]" if msg.get('type') == 'voice' else "[文字]"
chat_history_text += f"{sender}{type_str}: {content}\n"
time_str = msg.get('time_display', '')
time_prefix = f"[{time_str}] " if time_str else ""
chat_history_text += f"{time_prefix}{sender}{type_str}: {content}\n"
prompt = (
"请根据以下微信对话记录,总结归纳双方交流的主要信息点。\n"
@@ -232,7 +232,7 @@ class ChatMonitorBot:
logger.info(f"💡 [首屏] 最后一条消息来自 '{sender}',尝试生成回复...")
# 构建上下文
context_text = "\n".join([f"{m.get('sender')}: {m.get('content')}" for m in self.dialogue_log[:-1]])
context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in self.dialogue_log[:-1]])
last_content = last_msg.get('content', '')
reply = await self.get_reply(last_content, context_text)
@@ -324,8 +324,31 @@ class ChatMonitorBot:
logger.info(f"💡 [监控] 发现新消息: {last_msg},保存现场截图: {event_shot}")
# 获取上下文文本 (格式化为 Sender: Content)
context_text = "\n".join([f"{m.get('sender')}: {m.get('content')}" for m in dialogue_log[:-1]])
last_content = last_msg.get('content', '')
context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in dialogue_log[:-1]])
last_content = last_msg.get('content') or ""
# 兜底逻辑:如果最后一条是语音且内容为空(可能因无红点未被 UNREAD 策略处理),尝试强制转换
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.info("⚠️ [监控] 最后一条语音消息未获取到内容(可能已读无红点),尝试强制转换...")
# 强制使用 LAST 策略重试
dialogue_log_retry, _ = await WxUtil.analyze_chat_image(
self.screenshot_path,
self.debug_view_path,
device=self.device,
process_strategy="LAST"
)
if dialogue_log_retry:
# 更新引用
self.dialogue_log = dialogue_log_retry
dialogue_log = dialogue_log_retry
last_msg = dialogue_log[-1]
last_content = last_msg.get('content') or ""
logger.info(f"🔄 [重试] 强制转换后内容: {last_content}")
# 重新构建 msg_str 和 hash确保下次循环不会因为内容变化而再次触发虽然这里已经处理了
# 但实际上这里是在处理当前事件,更新 hash 是为了避免重复处理
msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer)
current_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest()
# 生成回复
reply = await self.get_reply(last_content, context_text)

View File

@@ -15,7 +15,7 @@ if project_root not in sys.path:
sys.path.append(project_root)
import json
from datetime import datetime
from datetime import datetime, timedelta
from Util.EasyOcrKit import EasyOcrKit
# 初始化 EasyOcrKit
@@ -34,6 +34,107 @@ TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templat
# 全局调试图片计数器
_debug_counter = 0
def parse_wechat_time(time_str):
"""
解析微信时间字符串为标准化格式 (YYYY-MM-DD HH:MM)
支持: "10:03", "昨天 10:03", "星期三 10:03", "2025年1月1日 10:03"
"""
try:
now = datetime.now()
today = now.date()
clean_str = time_str.strip()
# 1. HH:mm (当天)
# 注意:有时候 OCR 会把冒号识别成其他字符,这里假设是标准的 HH:mm
if re.match(r'^\d{1,2}:\d{2}$', clean_str):
h, m = map(int, clean_str.split(':'))
dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m))
return dt.strftime("%Y-%m-%d %H:%M")
# 2. 昨天 HH:mm
if "昨天" in clean_str:
t_part = clean_str.replace("昨天", "").strip()
if re.match(r'^\d{1,2}:\d{2}$', t_part):
h, m = map(int, t_part.split(':'))
yesterday = today - timedelta(days=1)
dt = datetime.combine(yesterday, datetime.min.time().replace(hour=h, minute=m))
return dt.strftime("%Y-%m-%d %H:%M")
# 3. 星期X HH:mm
weekdays = {"星期一": 0, "星期二": 1, "星期三": 2, "星期四": 3, "星期五": 4, "星期六": 5, "星期日": 6}
for w_str, w_idx in weekdays.items():
if w_str in clean_str:
t_part = clean_str.replace(w_str, "").strip()
if re.match(r'^\d{1,2}:\d{2}$', t_part):
h, m = map(int, t_part.split(':'))
current_weekday = now.weekday()
# 计算日期回退天数 (mod 7 确保是过去的一周内)
delta_days = (current_weekday - w_idx) % 7
# 如果 delta_days 是 0 且当前时间比消息时间早 (不可能发生,除非穿越),说明是今天
# 但通常"星期X"不显示今天,今天显示 HH:mm
# 如果 delta_days == 0可能是上周的今天微信通常显示 "上周X"
# 简单起见认为是今天或过去7天内的那天
if delta_days == 0 and datetime.now().time() < datetime.min.time().replace(hour=h, minute=m):
delta_days = 7 # 上周
target_date = today - timedelta(days=delta_days)
dt = datetime.combine(target_date, datetime.min.time().replace(hour=h, minute=m))
return dt.strftime("%Y-%m-%d %H:%M")
# 4. YYYY年MM月DD日 HH:mm
# 简单匹配年月日
match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', clean_str)
if match:
y, m, d = map(int, match.groups())
# 找时间部分
time_match = re.search(r'(\d{1,2}):(\d{2})', clean_str)
if time_match:
hh, mm = map(int, time_match.groups())
dt = datetime(y, m, d, hh, mm)
return dt.strftime("%Y-%m-%d %H:%M")
else:
# 只有日期,没有时间 (通常是日期分隔符)
# 这种情况下,可能需要给个默认时间?或者就返回日期
return f"{y:04d}-{m:02d}-{d:02d} 00:00"
# 5. MM月DD日 HH:mm (跨年但未显示年份?微信通常会显示年份如果跨年)
# 处理 "1月26日 10:00"
match = re.search(r'(\d{1,2})月(\d{1,2})日', clean_str)
if match:
m, d = map(int, match.groups())
# 默认当年
y = today.year
# 找时间
time_match = re.search(r'(\d{1,2}):(\d{2})', clean_str)
if time_match:
hh, mm = map(int, time_match.groups())
dt = datetime(y, m, d, hh, mm)
# 如果计算出的时间在未来,可能是去年 (比如现在1月消息是12月)
if dt > now:
dt = datetime(y - 1, m, d, hh, mm)
return dt.strftime("%Y-%m-%d %H:%M")
# 兜底:如果是 "下午 5:00" 这种格式
if "下午" in clean_str or "晚上" in clean_str:
t_part = re.sub(r'下午|晚上', '', clean_str).strip()
if re.match(r'^\d{1,2}:\d{2}$', t_part):
h, m = map(int, t_part.split(':'))
if h < 12: h += 12
dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m))
return dt.strftime("%Y-%m-%d %H:%M")
if "上午" in clean_str:
t_part = re.sub(r'上午', '', clean_str).strip()
if re.match(r'^\d{1,2}:\d{2}$', t_part):
h, m = map(int, t_part.split(':'))
dt = datetime.combine(today, datetime.min.time().replace(hour=h, minute=m))
return dt.strftime("%Y-%m-%d %H:%M")
return clean_str # 解析失败,返回原串
except Exception as e:
logger.warning(f"时间解析失败 '{time_str}': {e}")
return time_str
def get_next_debug_path(desc="step"):
"""获取下一个顺序命名的调试图片路径 (debug_N_desc.jpg)"""
global _debug_counter
@@ -247,10 +348,30 @@ def _scan_chat_messages(image_path):
c_x = int((bbox[0][0] + bbox[2][0]) / 2)
c_y = int((bbox[0][1] + bbox[2][1]) / 2)
# 判定逻辑:文本在语音下方且水平偏移不大 (放宽 Y 轴限制以包含侧边的时长文本)
# 2025-01-26: 增加 X 轴范围到 900 适配超长语音条的右侧时长/文本
# 增加 Y 轴范围到 800 以适配多行转文字内容
if -50 < c_y - ay < 800 and abs(c_x - ax) < 900:
# 判定逻辑:文本在语音下方且水平偏移不大
# 1. Y轴限制: -50 < dy < 800 (适配多行文本)
# 2. X轴限制: abs(dx) < 500 (减少误判,防止关联到屏幕另一侧的消息)
# 3. 几何位置强校验 (核心修复)
voice_is_left = ax < w / 2
# 获取文本框的左右边界
min_x = min(p[0] for p in bbox)
max_x = max(p[0] for p in bbox)
if voice_is_left:
# 语音在左 (对方): 文本必须也是左对齐
# - min_x 必须靠左 (< 300)
# - max_x 不能太靠右 (> w - 150),否则可能是"我"的消息
if min_x > 300 or max_x > w - 150:
continue
else:
# 语音在右 (我): 文本必须也是右对齐
# - max_x 必须靠右 (> w - 300)
# - min_x 不能太靠左 (< 100)
if max_x < w - 300 or min_x < 100:
continue
if -50 < c_y - ay < 800 and abs(c_x - ax) < 500:
# 检查中间是否有其他语音图标
has_intermediate_audio = False
for other_ax, other_ay in audio_matches:
@@ -298,6 +419,13 @@ def _scan_chat_messages(image_path):
# 按 Y 轴排序,如果 Y 接近则按 X 轴排序
associated_texts.sort(key=lambda x: (x[0], x[1]))
converted_trigger_text = "".join([t[2] for t in associated_texts])
# 去除已知噪音
noise_patterns = ["42IIhK+-语音输入粘贴#", "语音输入粘贴"]
for np in noise_patterns:
converted_trigger_text = converted_trigger_text.replace(np, "")
converted_trigger_text = converted_trigger_text.strip()
logger.info(f"语音({ax},{ay}) 判定为已转换,最终合并文本: '{converted_trigger_text}'")
if is_converted:
@@ -328,9 +456,36 @@ def _scan_chat_messages(image_path):
if c_y < 150 or c_y > h - 100:
continue
# 判定发送者 (增强版几何判定,防止 720p 屏幕下的中心点误判)
# 默认使用中心点判定
sender = "对方" if c_x < w / 2 else ""
# 使用边界特征进行修正
min_x = min(p[0] for p in bbox)
max_x = max(p[0] for p in bbox)
# 修正阈值:假设头像+边距约占 15% 宽度
edge_margin = w * 0.15
if max_x > w - edge_margin:
# 文本框延伸到了最右侧 -> 肯定是"我" (因为对方的头像在左,文本不会靠右)
sender = ""
elif min_x < edge_margin:
# 文本框延伸到了最左侧 -> 肯定是"对方" (因为我的头像在右,文本不会靠左)
sender = "对方"
time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)'
# 优先判断是否为独立的时间戳 (行短且符合时间格式)
if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)):
logger.info(f"忽略时间戳/日期文本: '{text}'")
# 进一步校验是否真的是时间 (通过 parse_wechat_time 尝试解析,或者简单正则)
# 这里我们假设短行的符合 time_pattern 的都是时间标记
logger.info(f"识别到时间戳/日期: '{text}'")
messages.append({
"type": "timestamp",
"content": text.strip(),
"y": c_y,
"center": (c_x, c_y)
})
continue
clean_text = text.strip()
@@ -350,8 +505,6 @@ def _scan_chat_messages(image_path):
logger.info(f"忽略系统消息内容: '{clean_text}'")
continue
left_x = bbox[0][0]
sender = "对方" if left_x < w * 0.5 else ""
messages.append({
"type": "text",
@@ -363,7 +516,30 @@ def _scan_chat_messages(image_path):
# 6. 排序
messages.sort(key=lambda x: x['y'])
return messages, debug_img, chat_title
# 7. 注入时间戳
current_time_str = None
# 过滤掉 timestamp 类型的消息,将其作为属性注入到后续消息中
final_messages_with_time = []
for msg in messages:
if msg['type'] == 'timestamp':
# 更新当前时间上下文
parsed_time = parse_wechat_time(msg['content'])
current_time_str = parsed_time
logger.info(f"更新时间上下文: {msg['content']} -> {parsed_time}")
else:
# 只有语音和文本消息需要注入时间
if current_time_str:
msg['time_display'] = current_time_str
else:
# 如果上方没有时间戳,尝试默认使用当天日期 (或者保持 None)
# 对于首屏最上面的消息,可能没有时间戳
pass
final_messages_with_time.append(msg)
return final_messages_with_time, debug_img, chat_title
async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", process_strategy="ALL"):
"""
@@ -629,18 +805,21 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
for msg in final_messages:
# 尝试注入异步获取的语音内容
if msg['type'] == 'voice':
# 模糊匹配 Y 坐标
# 模糊匹配 Y 坐标 (增大容差到 100应对界面滚动)
for y_key, content in captured_voice_contents.items():
if abs(msg['y'] - y_key) < 20:
if abs(msg['y'] - y_key) < 100:
msg['is_converted'] = True
msg['content'] = content
logger.info(f"注入语音内容到最终消息列表: {content}")
break
# 只添加有内容的文本消息,或已转换且有内容的语音消息
if msg['type'] == 'text' and msg.get('content'):
dialogue_log.append(msg)
elif msg['type'] == 'voice' and msg.get('is_converted') and msg.get('content'):
# 无论是否有内容,都加入 dialogue_log
# 如果是语音且没内容T2 会有兜底逻辑去处理
if msg['type'] == 'text':
if msg.get('content'): # 文本消息没内容通常是识别错误,可以丢弃
dialogue_log.append(msg)
elif msg['type'] == 'voice':
# 语音消息即使没内容也保留,交给上层处理
dialogue_log.append(msg)
logger.info(f"📊 [统计] 语音总数: {total_voices_count}, 打开转文字次数: {convert_opened_count}, 关闭转文字次数: {convert_closed_count}")