This commit is contained in:
HuangHai
2026-01-26 19:39:12 +08:00
parent cbc59c6628
commit 236171e015
4 changed files with 482 additions and 303 deletions

View File

@@ -4,6 +4,8 @@ import sys
import logging
import asyncio
import hashlib
import json
import numpy as np
import cv2
@@ -31,7 +33,7 @@ logger.setLevel(logging.INFO)
if logger.hasHandlers():
logger.handlers.clear()
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='a')
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
@@ -42,6 +44,13 @@ logger.addHandler(stream_handler)
logger.propagate = False
logger.info(f"🚀 日志文件路径: {os.path.abspath(log_file_path)}")
# 同时将 WxUtil 的日志也输出到同一个文件
wx_logger = logging.getLogger("WxUtil")
wx_logger.propagate = False # 防止日志向上传递导致重复 (因为 WxUtil 中调用了 basicConfig)
if not any(isinstance(h, logging.FileHandler) and os.path.abspath(h.baseFilename) == os.path.abspath(log_file_path) for h in wx_logger.handlers):
wx_logger.addHandler(file_handler)
wx_logger.addHandler(stream_handler) # 确保 WxUtil 也输出到控制台
class ChatMonitorBot:
"""
大张老师自动巡课系统 (CV版)
@@ -117,10 +126,10 @@ class ChatMonitorBot:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
# 裁剪掉顶部 100 像素 (状态栏/时间)
# 裁剪掉顶部 150 像素 (状态栏/时间)
h, w = img.shape[:2]
if h > 100:
cropped_img = img[100:h, 0:w]
if h > 150:
cropped_img = img[150:h, 0:w]
else:
cropped_img = img
@@ -142,38 +151,51 @@ class ChatMonitorBot:
# 2. 首次运行:识别所有语音并获取上下文
logger.info("🔍 [首次运行] 正在进行全量识别,获取对话上下文...")
# 使用顺序命名
enter_path = WxUtil.get_next_debug_path("enter")
flag_path = WxUtil.get_next_debug_path("flag")
# 调用封装好的 get_first_screen
self.dialogue_log, self.input_pos, enter_path, flag_path = await WxUtil.get_first_screen(self.device)
self.device.screenshot(enter_path)
logger.info(f"📸 已保存进入截图: {enter_path}")
# 同时也更新 live_shot
# 更新 live paths (用于后续监控逻辑的引用)
import shutil
shutil.copy(enter_path, self.screenshot_path)
self.dialogue_log, self.input_pos = await WxUtil.analyze_chat_image(
self.screenshot_path,
flag_path,
device=self.device,
only_process_last_voice=False # 首次运行:全量处理
)
# 如果 flag_path 生成了,也复制一份给 debug_view_path
if os.path.exists(flag_path):
if enter_path and os.path.exists(enter_path):
shutil.copy(enter_path, self.screenshot_path)
if flag_path and os.path.exists(flag_path):
shutil.copy(flag_path, self.debug_view_path)
logger.info(f"📸 已保存识别标记图: {flag_path}")
if self.dialogue_log:
logger.info(f"✅ 首次运行识别完成,获取到 {len(self.dialogue_log)} 条消息上下文")
logger.info("\n" + "="*50)
logger.info("【测试模式】最终提取的对话记录:")
for msg in self.dialogue_log:
# 格式化输出:[发送者] 内容 (类型)
sender = msg.get('sender', '未知')
content = msg.get('content', '')
msg_type = msg.get('type', 'unknown')
logger.info(f"[{sender}] {content} ({msg_type})")
logger.info("="*50 + "\n")
# 初始化最后处理的消息哈希,避免重复回复第一条
last_msg = self.dialogue_log[-1]
self.last_processed_msg_hash = hashlib.md5(last_msg.encode('utf-8')).hexdigest()
# last_msg 是字典,需要转字符串再 encode
def numpy_serializer(obj):
    """``default`` hook for ``json.dumps`` that unwraps NumPy values.

    ``json`` calls this only for objects it cannot serialize itself.

    :param obj: the value ``json`` could not encode.
    :return: a plain ``int``, ``float``, ``bool`` or (nested) ``list``.
    :raises TypeError: for any other type, as the ``json`` protocol expects.
    """
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    # np.bool_ is neither np.integer nor a Python bool, so it needs its own
    # branch; comparisons on NumPy coordinates yield this type.
    if isinstance(obj, np.bool_):
        return bool(obj)
    raise TypeError(f"Type {type(obj)} not serializable")
msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer)
self.last_processed_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest()
self.last_screen_hash = self.get_image_hash(self.screenshot_path)
else:
logger.warning("⚠️ 首次运行未识别到有效对话")
logger.info("🛑 测试结束:已完成所有语音的转换与读取。停止进入监控循环。")
return # 测试模式:直接退出,不进入监控循环
# 3. 进入循环阶段
logger.info("🔄 进入实时监控阶段...")
while True:
@@ -195,7 +217,7 @@ class ChatMonitorBot:
self.screenshot_path,
self.debug_view_path,
device=self.device,
only_process_last_voice=True # 循环监控:仅处理最新一条
process_strategy="UNREAD" # 监控阶段:只处理带红点的新语音
)
if not dialogue_log:

View File

@@ -114,330 +114,451 @@ def safe_device_click(d, x, y):
logger.error(f"重试点击操作依然失败: {e2}")
return False
async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", only_process_last_voice=False):
def draw_debug_info(image_path, messages, current_voice_center=None, suffix=""):
"""
全面采用 CV + OCR 识别微信聊天截图中的最后一条消息
不再使用 VLM
:param only_process_last_voice: 如果为 True仅处理转文字屏幕上最后一条未转换的语音消息
辅助函数:在截图中绘制当前已知的消息状态
:param image_path: 图片路径
:param messages: 消息列表
:param current_voice_center: 当前正在处理的语音中心坐标 (vx, vy)
:param suffix: 保存文件名的后缀
"""
try:
# 1. 初始化
d = device if device else connect_device()
if not d:
return [], None
# 2. 读取图片
img = cv2.imread(image_path)
if img is None:
logger.error(f"无法读取图片: {image_path}")
return [], None
h, w = img.shape[:2]
if img is None: return
# 3. 模板匹配寻找语音图标和红点
audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg")
red_point_template = os.path.join(TEMPLATE_DIR, "red_point.jpg")
audio_matches = find_all_template_matches(image_path, audio_template, threshold=0.8)
red_points = find_all_template_matches(image_path, red_point_template, threshold=0.8)
# 4. OCR 识别所有文本
logger.info("正在执行 OCR 识别...")
ocr_results = ocr_kit.read_text(image_path)
# 微信菜单关键字(用于排除干扰)
MENU_KEYWORDS = ["听筒播放", "收藏", "背景播放", "删除", "多选", "取消转文字", "转文字", "引用", "提醒"]
# 5. 整合所有消息
messages = []
debug_img = img.copy() # 初始化调试图
# 绘制过滤区域边界 (可视化)
cv2.line(debug_img, (0, 150), (w, 150), (255, 0, 255), 2) # 顶部线
cv2.line(debug_img, (0, h - 100), (w, h - 100), (255, 0, 255), 2) # 底部线 (从 180 改为 100)
cv2.putText(debug_img, "TOP_FILTER", (10, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
cv2.putText(debug_img, "BOTTOM_FILTER", (10, h - 110), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
# A. 添加语音消息
for ax, ay in audio_matches:
# 标记所有找到的语音图标 (用于调试)
cv2.circle(debug_img, (ax, ay), 10, (255, 255, 0), -1) # 青色实心圆表示原始匹配点
# 过滤掉顶部和底部的非聊天区域
# 顶部标题栏通常在 150 像素以内
# 底部输入栏通常在 100 像素以内 (捕捉最底部的文字)
if ay < 150 or ay > h - 100:
logger.info(f"忽略区域外语音图标: ({ax}, {ay})")
cv2.rectangle(debug_img, (ax-35, ay-35), (ax+35, ay+35), (128, 128, 128), 1) # 灰色框表示被过滤
cv2.putText(debug_img, "FILTERED", (ax - 40, ay - 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1)
continue
for msg in messages:
if msg['type'] == 'voice':
ax, ay = msg['center']
is_unread = msg.get('is_unread', False)
is_converted = msg.get('is_converted', False)
sender = "对方" if ax < w / 2 else ""
is_unread = False
for rx, ry in red_points:
# 红点通常在语音图标右侧且 Y 轴相近
if abs(ry - ay) < 50 and rx > ax:
is_unread = True
break
# 根据已读/未读画框:未读红框,已读绿框
color = (0, 0, 255) if is_unread else (0, 255, 0)
cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
# --- 改进:判断是否已转文字 ---
is_converted = False
converted_trigger_text = ""
for bbox, text, conf in ocr_results:
c_x = int((bbox[0][0] + bbox[2][0]) / 2)
c_y = int((bbox[0][1] + bbox[2][1]) / 2)
# 绘制框
color = (0, 0, 255) if is_unread else (0, 255, 0)
cv2.rectangle(img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
# 判定逻辑:
# 1. 转换后的文字通常在语音图标下方 30-600 像素 (增加到 600 以适配长文本)
# 2. 水平位置偏移在 600 像素内 (增加到 600 以适配宽文本泡)
# 3. 关键:确保这两个坐标之间没有其他的语音图标(防止第一个语音“偷”了第二个语音的文字)
if 30 < c_y - ay < 600 and abs(c_x - ax) < 600:
# 检查中间是否有其他语音图标
has_intermediate_audio = False
for other_ax, other_ay in audio_matches:
# 增加 20 像素缓冲区,防止判定到自身或极近的干扰点
if ay + 20 < other_ay < c_y - 10:
has_intermediate_audio = True
logger.info(f"语音({ax},{ay}) 被中间语音图标({other_ax},{other_ay}) 阻断,无法关联文本 '{text[:10]}...'")
break
if has_intermediate_audio:
continue
# 绘制 YES/NO
label = "YES" if is_converted else "NO"
cv2.putText(img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
# 如果是当前正在处理的语音,画一个额外的黄圈
if current_voice_center and abs(ax - current_voice_center[0]) < 10 and abs(ay - current_voice_center[1]) < 10:
cv2.circle(img, (ax, ay), 40, (0, 255, 255), 3)
cv2.putText(img, "PROCESSING", (ax - 60, ay - 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
clean_text = text.strip()
# 判定是否为时间戳 (如 13:49, 09:26)
# 增强:同时支持 "昨天 14:15" 这种格式
is_timestamp = re.search(r'(\d{1,2}:\d{2})', clean_text) and (len(clean_text) < 15)
# 判定是否为纯数字或时长 (如 5", 3", 少3")
# 增强:允许前面有少量杂讯字符,只要结尾是数字或 "
is_duration = re.search(r'\d{1,2}"?$', clean_text) and len(clean_text) < 6
# 排除掉语音时长、时间戳和菜单关键字的干扰
if not is_duration and not is_timestamp and clean_text not in MENU_KEYWORDS:
is_converted = True
converted_trigger_text = clean_text
logger.info(f"语音({ax},{ay}) 判定为已转换,关联到有效文本: '{clean_text}'")
break
else:
if is_timestamp:
logger.info(f"语音({ax},{ay}) 忽略下方时间戳文本: '{clean_text}'")
elif is_duration:
logger.info(f"语音({ax},{ay}) 忽略时长文本: '{clean_text}'")
if is_converted:
logger.info(f"语音消息 ({ax}, {ay}) 已有转换文字: '{converted_trigger_text}',跳过")
# --- 恢复绘图反馈 ---
# 根据已读/未读画框:未读红框,已读绿框
color = (0, 0, 255) if is_unread else (0, 255, 0)
cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
label = "YES" if is_converted else "NO"
# 在框的右侧标注 YES 或 NO
cv2.putText(debug_img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
# --- 结束 ---
# 保存覆盖后的图片
cv2.imwrite(image_path, img)
logger.info(f"已更新调试标记到截图: {image_path}")
except Exception as e:
logger.warning(f"绘制调试信息失败: {e}")
messages.append({
"type": "voice",
"sender": sender,
"center": (ax, ay),
"y": ay,
"is_unread": is_unread,
"is_converted": is_converted,
"content": None
})
def _scan_chat_messages(image_path):
"""
内部函数:扫描图片中的微信消息(语音、文本、红点)
返回: (messages_list, debug_image)
"""
img = cv2.imread(image_path)
if img is None:
logger.error(f"无法读取图片: {image_path}")
return [], None
h, w = img.shape[:2]
# 3. 模板匹配寻找语音图标和红点
audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg")
red_point_template = os.path.join(TEMPLATE_DIR, "red_point.jpg")
audio_matches = find_all_template_matches(image_path, audio_template, threshold=0.8)
red_points = find_all_template_matches(image_path, red_point_template, threshold=0.8)
# 4. OCR 识别所有文本
logger.info("正在执行 OCR 识别...")
ocr_results = ocr_kit.read_text(image_path)
# 微信菜单关键字(用于排除干扰)
MENU_KEYWORDS = ["听筒播放", "收藏", "背景播放", "删除", "多选", "取消转文字", "转文字", "引用", "提醒"]
# 忽略的系统消息内容
IGNORE_CONTENT = ["撤回了一条消息", "打招呼的消息", "拍了拍", "你撤回了一条消息", "引用"]
# 5. 整合所有消息
messages = []
debug_img = img.copy() # 初始化调试图
# 绘制过滤区域边界 (可视化)
cv2.line(debug_img, (0, 150), (w, 150), (255, 0, 255), 2) # 顶部线
cv2.line(debug_img, (0, h - 100), (w, h - 100), (255, 0, 255), 2) # 底部线
cv2.putText(debug_img, "TOP_FILTER", (10, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
cv2.putText(debug_img, "BOTTOM_FILTER", (10, h - 110), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
# A. 添加语音消息
for ax, ay in audio_matches:
# 标记所有找到的语音图标 (用于调试)
cv2.circle(debug_img, (ax, ay), 10, (255, 255, 0), -1)
# 过滤掉顶部和底部的非聊天区域
if ay < 150 or ay > h - 100:
logger.info(f"忽略区域外语音图标: ({ax}, {ay})")
cv2.rectangle(debug_img, (ax-35, ay-35), (ax+35, ay+35), (128, 128, 128), 1)
cv2.putText(debug_img, "FILTERED", (ax - 40, ay - 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1)
continue
# B. 添加文本消息
sender = "对方" if ax < w / 2 else ""
is_unread = False
for rx, ry in red_points:
# 红点通常在语音图标右侧且 Y 轴相近
if abs(ry - ay) < 50 and rx > ax:
is_unread = True
break
# 改进:判断是否已转文字
is_converted = False
converted_trigger_text = ""
for bbox, text, conf in ocr_results:
c_x = int((bbox[0][0] + bbox[2][0]) / 2)
c_y = int((bbox[0][1] + bbox[2][1]) / 2)
# 过滤区域 (顶部标题栏和底部输入栏)
# 底部输入栏通常在 100 像素以内 (捕捉最底部的文字)
if c_y < 150 or c_y > h - 100:
continue
# 过滤掉明显的系统词 (通常是日期或时间)
time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)'
if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)):
continue
# 排除干扰:语音时长、菜单关键字、系统提示
clean_text = text.strip()
if re.match(r'^.?[0-9]{1,2}"?$', clean_text): continue
if clean_text in MENU_KEYWORDS: continue
if "撤回了一条消息" in text or "打招呼的消息" in text: continue
# 判定逻辑:文本在语音下方且水平偏移不大
if 30 < c_y - ay < 600 and abs(c_x - ax) < 600:
# 检查中间是否有其他语音图标
has_intermediate_audio = False
for other_ax, other_ay in audio_matches:
if ay + 20 < other_ay < c_y - 10:
has_intermediate_audio = True
logger.info(f"语音({ax},{ay}) 被中间语音图标({other_ax},{other_ay}) 阻断,无法关联文本 '{text[:10]}...'")
break
# 改进发送者判定:查看文本块的左边界
left_x = bbox[0][0]
sender = "对方" if left_x < w * 0.5 else ""
if has_intermediate_audio:
continue
clean_text = text.strip()
# 判定是否为时间戳
is_timestamp = re.search(r'(\d{1,2}:\d{2})', clean_text) and (len(clean_text) < 15)
# 判定是否为纯数字或时长
is_duration = re.search(r'\d{1,2}"?$', clean_text) and len(clean_text) < 6
# 判定是否为系统消息
is_ignored = any(k in clean_text for k in IGNORE_CONTENT)
if not is_duration and not is_timestamp and clean_text not in MENU_KEYWORDS and not is_ignored:
is_converted = True
# 针对 "少3"" 这种特殊噪点进行过滤,但仍标记为已转换
# 如果包含 "少" 且长度短且包含数字,视为噪点 (例如 "少3"")
if "" in clean_text and len(clean_text) < 6 and re.search(r'\d', clean_text):
logger.info(f"语音({ax},{ay}) 判定为已转换,但内容判定为噪点('{clean_text}'),置为空")
converted_trigger_text = ""
else:
converted_trigger_text = clean_text
logger.info(f"语音({ax},{ay}) 判定为已转换,关联到有效文本: '{clean_text}'")
break
else:
if is_timestamp:
logger.info(f"语音({ax},{ay}) 忽略下方时间戳文本: '{clean_text}'")
elif is_duration:
logger.info(f"语音({ax},{ay}) 忽略时长文本: '{clean_text}'")
elif is_ignored:
logger.info(f"语音({ax},{ay}) 忽略系统消息文本: '{clean_text}'")
if is_converted:
logger.info(f"语音消息 ({ax}, {ay}) 已有转换文字: '{converted_trigger_text}',跳过")
# 绘图反馈
color = (0, 0, 255) if is_unread else (0, 255, 0)
cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
label = "YES" if is_converted else "NO"
cv2.putText(debug_img, label, (ax + 40, ay + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
messages.append({
"type": "voice",
"sender": sender,
"center": (ax, ay),
"y": ay,
"is_unread": is_unread,
"is_converted": is_converted,
"content": converted_trigger_text if is_converted else None
})
# B. 添加文本消息
for bbox, text, conf in ocr_results:
c_x = int((bbox[0][0] + bbox[2][0]) / 2)
c_y = int((bbox[0][1] + bbox[2][1]) / 2)
if c_y < 150 or c_y > h - 100:
continue
time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)'
if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)):
continue
clean_text = text.strip()
if re.match(r'^.?[0-9]{1,2}"?$', clean_text): continue
if clean_text in MENU_KEYWORDS: continue
if any(k in clean_text for k in IGNORE_CONTENT): continue
messages.append({
"type": "text",
"sender": sender,
"content": text.strip(),
"center": (c_x, c_y),
"y": c_y
})
# 6. 排序并找出最后一条消息
if not messages:
logger.warning("未发现任何消息")
if output_path:
cv2.imwrite(output_path, debug_img)
left_x = bbox[0][0]
sender = "对方" if left_x < w * 0.5 else ""
messages.append({
"type": "text",
"sender": sender,
"content": text.strip(),
"center": (c_x, c_y),
"y": c_y
})
# 6. 排序
messages.sort(key=lambda x: x['y'])
return messages, debug_img
async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", process_strategy="ALL"):
"""
全面采用 CV + OCR 识别微信聊天截图中的最后一条消息
:param process_strategy: 语音处理策略 (ALL/UNREAD/LAST)
注意:此函数现在包含一个循环,如果发现需要转文字的语音,会逐个处理并重新截图。
"""
try:
d = device if device else connect_device()
if not d:
return [], None
# 按 Y 坐标从上到下排序
messages.sort(key=lambda x: x['y'])
last_msg = messages[-1]
if output_path:
cv2.imwrite(output_path, debug_img)
logger.info(f"调试图已保存: {output_path}")
dialogue_log = []
input_field_coordinates = (w // 2, int(h * 0.9)) # 默认输入框位置
current_image_path = image_path
current_output_path = output_path
# 7. 自动处理所有尚未转换的语音消息
# 获取所有语音消息(不论已读未读,只要没转换成文字就处理)
unconverted_voices = [m for m in messages if m['type'] == 'voice' and not m.get('is_converted')]
final_messages = []
loop_count = 0
MAX_LOOPS = 10 # 增加循环次数上限,适应 ALL 策略
# 按 Y 坐标排序,确保从上到下顺序
unconverted_voices.sort(key=lambda x: x['y'])
if unconverted_voices:
if only_process_last_voice:
logger.info(f"策略限制:仅处理最后一条未转换语音 (共发现 {len(unconverted_voices)} 条)")
unconverted_voices = [unconverted_voices[-1]]
else:
logger.info(f"发现 {len(unconverted_voices)} 条未转换的语音,开始全部处理...")
# 统计计数器
total_voices_count = 0
convert_opened_count = 0
convert_closed_count = 0
for v_msg in unconverted_voices:
vx, vy = int(v_msg['center'][0]), int(v_msg['center'][1])
logger.info(f"--- 正在处理语音消息 ({vx}, {vy}) ---")
# 记录本次会话已处理过的语音 Y 坐标集合
processed_y_coords = set()
# 记录 Peek-and-Restore 过程中抓取到的语音内容 {y_coord: content}
captured_voice_contents = {}
while loop_count < MAX_LOOPS:
loop_count += 1
logger.info(f"--- 分析循环 第 {loop_count} 次 ---")
# A. 长按语音
# 1. 扫描当前屏幕
messages, debug_img = _scan_chat_messages(current_image_path)
if messages is None: # 读取失败
return [], None
# 保存当前状态的调试图
if current_output_path:
cv2.imwrite(current_output_path, debug_img)
logger.info(f"调试图已保存: {current_output_path}")
# 2. 筛选需要处理的语音
all_voices = [m for m in messages if m['type'] == 'voice']
all_voices.sort(key=lambda x: x['y']) # 从上到下
# 更新统计 (取当前扫描到的数量)
total_voices_count = len(all_voices)
# Helper: 检查是否已处理
def is_processed(y_coord):
    """Tell whether a voice message at ``y_coord`` was already handled.

    Compares against every Y coordinate stored in ``processed_y_coords``
    with a tolerance of 20 px.
    """
    return any(abs(y_coord - seen_y) < 20 for seen_y in processed_y_coords)
target_voices = []
if process_strategy == "ALL":
# ALL 策略:处理所有未被记录处理过的、且未转换的语音
target_voices = [m for m in all_voices if not m.get('is_converted') and not is_processed(m['y'])]
logger.info(f"策略(ALL): 发现 {len(target_voices)} 条未转换待处理语音")
elif process_strategy == "UNREAD":
# UNREAD 策略:只处理未读且未转换且未处理过的
target_voices = [m for m in all_voices if m.get('is_unread') and not m.get('is_converted') and not is_processed(m['y'])]
logger.info(f"策略(UNREAD): 发现 {len(target_voices)} 条未读待处理语音")
elif process_strategy == "LAST":
# LAST 策略:只处理最后一条未转换的
unconverted = [m for m in all_voices if not m.get('is_converted')]
if unconverted:
last_voice = unconverted[-1]
if not is_processed(last_voice['y']):
target_voices = [last_voice]
logger.info(f"策略(LAST): 仅关注最后一条未转换语音")
# 如果没有需要处理的语音,或者我们已经达到了策略要求,退出循环
if not target_voices:
logger.info("当前屏幕无待处理语音,分析结束")
final_messages = messages
break
# 3. 处理第一条目标语音
# 注意:只处理第一条,因为处理后界面会变动(展开文字),坐标会失效
target = target_voices[0]
vx, vy = int(target['center'][0]), int(target['center'][1])
# 标记为已处理
processed_y_coords.add(target['y'])
logger.info(f"准备处理语音 ({vx}, {vy})...")
# 高亮正在处理的语音并保存更新后的调试图
draw_debug_info(current_output_path, messages, current_voice_center=(vx, vy))
# 执行操作:长按 -> 转文字
logger.info(f"正在长按语音消息 ({vx}, {vy})...")
d.long_click(vx, vy, 1.5)
time.sleep(1.5)
d.long_click(vx, vy, 1.0) # 缩短按压时间
# B. 截图寻找“转文字”按钮
menu_shot = get_next_debug_path("step_long_press")
d.screenshot(menu_shot)
# 轮询寻找“转文字”按钮
logger.info("正在快速寻找'转文字'按钮...")
zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg")
btn_pos = None
btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7)
poll_start = time.time()
while time.time() - poll_start < 3.0: # 最多等 3 秒
menu_shot = get_next_debug_path("step_long_press_poll")
d.screenshot(menu_shot)
btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7)
if btn_pos:
break
time.sleep(0.2) # 快速轮询
if btn_pos:
btn_x, btn_y = int(btn_pos[0]), int(btn_pos[1])
logger.info(f"✅ 找到'转文字'按钮: ({btn_x}, {btn_y}),点击中...")
safe_device_click(d, btn_x, btn_y)
convert_opened_count += 1
# 等待转换完成
logger.info("等待语音转文字完成...")
time.sleep(5.0)
time.sleep(3.0) # 缩短等待时间 (原5.0s)
# C. 再次截图 OCR 获取转换后的文字
after_convert_shot = get_next_debug_path("step_convert_result")
try:
d.screenshot(after_convert_shot)
convert_ocr = ocr_kit.read_text(after_convert_shot)
except Exception as e:
logger.error(f"截图或 OCR 失败: {e}")
convert_ocr = []
# --- Peek-and-Restore 逻辑 ---
# ... (中间提取文字逻辑不变) ...
# 1. 截图读取内容
peek_shot = get_next_debug_path("step_peek_content")
d.screenshot(peek_shot)
logger.info("正在读取转换后的语音内容...")
peek_messages, _ = _scan_chat_messages(peek_shot)
# D. (已移除) 不再执行“取消转文字”操作,保留文字以避免重复识别
# 之前此处会执行 long_click -> cancel_template -> safe_device_click
# 为了解决“反复打开”的问题,现在改为保留转出来的文字
logger.info("保留语音转换后的文字,不进行恢复界面操作")
else:
logger.warning("❌ 未能找到'转文字'按钮,点击空白处退出")
safe_device_click(d, vx + 300, vy)
# 8. 重新排序并生成完整的对话日志
# 先合并已经处理好的语音消息内容
# 排除掉转换文字本身产生的 OCR 文本干扰(如果 OCR 识别结果包含在文本消息中,需要过滤)
final_messages = []
# 1. 识别并归档所有属于语音转换出来的文字
for v_msg in messages:
if v_msg['type'] == 'voice':
vx, vy = v_msg['center']
v_content_blocks = []
# 找出所有在语音图标下方且水平相近的文本块,且发送者一致
for msg in messages:
if msg['type'] == 'text':
cx, cy = msg['center']
# 1. 垂直距离在合理范围内 (30 到 600 像素)
# 2. 水平偏移在合理范围内 (增加到 600 像素以适配宽文本泡)
# 3. 关键:确保这两个坐标之间没有其他的语音图标(防止第一个语音“偷”了第二个语音的文字)
v_dist = cy - vy
h_dist = abs(cx - vx)
if 30 < v_dist < 600 and h_dist < 600:
# 检查中间是否有其他语音图标
has_intermediate_audio = False
for other_ax, other_ay in audio_matches:
if vy < other_ay < cy:
has_intermediate_audio = True
break
if has_intermediate_audio:
continue
# 发送者判定
if msg['sender'] == v_msg['sender']:
v_content_blocks.append(msg)
msg['is_voice_part'] = True
logger.info(f"关联成功: 语音({vx}, {vy}) -> 文本('{msg['content']}') [h_dist={h_dist:.1f}, v_dist={v_dist:.1f}]")
# 2. 查找并保存内容
found_content = None
current_voice_in_peek = None
for pm in peek_messages:
if pm['type'] == 'voice' and pm.get('is_converted'):
# 简单匹配Y坐标接近 (容差 50px)
# 注意:如果文字展开,下方元素会被推下去,但当前语音本身的位置变化取决于展开方向
# 通常语音条下方展开文字语音条本身Y坐标变化不大
if abs(pm['y'] - vy) < 50:
found_content = pm.get('content')
current_voice_in_peek = pm
break
# 如果有内容块,按 Y 排序并合并
if v_content_blocks:
v_content_blocks.sort(key=lambda x: x['y'])
combined_content = "".join([m['content'] for m in v_content_blocks])
v_msg['content'] = combined_content
v_msg['is_converted'] = True
if found_content:
logger.info(f"✅ [Peek] 成功抓取语音内容: {found_content}")
captured_voice_contents[target['y']] = found_content
else:
if not v_msg.get('content'):
logger.warning(f"语音({vx}, {vy}) 未能关联到任何文本块")
logger.warning("⚠️ [Peek] 未能抓取到语音内容 (可能识别失败)")
# 2. 收集最终要显示的消息(排除被标记为语音部分的文本)
for msg in messages:
if msg['type'] == 'text':
if not msg.get('is_voice_part', False):
final_messages.append(msg)
# 3. 还原状态 (取消转文字)
logger.info("准备还原状态 (取消转文字)...")
click_x, click_y = vx, vy
if current_voice_in_peek:
click_x, click_y = int(current_voice_in_peek['center'][0]), int(current_voice_in_peek['center'][1])
d.long_click(click_x, click_y, 1.0) # 缩短按压时间
logger.info("正在快速寻找'隐藏文字'按钮...")
cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg")
cancel_btn = None
poll_start = time.time()
while time.time() - poll_start < 3.0:
restore_menu_shot = get_next_debug_path("step_restore_poll")
d.screenshot(restore_menu_shot)
cancel_btn = find_template_match(restore_menu_shot, cancel_template, threshold=0.7)
if cancel_btn:
break
time.sleep(0.2)
if cancel_btn:
cx, cy = int(cancel_btn[0]), int(cancel_btn[1])
logger.info(f"✅ 找到'隐藏文字'按钮: ({cx}, {cy}),点击还原...")
safe_device_click(d, cx, cy)
convert_closed_count += 1
time.sleep(2.0) # 等待收起动画
else:
logger.warning("❌ 未找到'隐藏文字'按钮,无法还原状态!(后续可能导致重复处理)")
# 4. 准备下一次循环
# 重新截图,因为界面可能微调,或者只是恢复了
next_screenshot = get_next_debug_path("step_restored")
d.screenshot(next_screenshot)
current_image_path = next_screenshot
current_output_path = get_next_debug_path("flag_restored")
continue
else:
final_messages.append(msg)
logger.warning("❌ 未找到'转文字'按钮,可能是已转换或误判")
# 即使失败,也已记录在 processed_y_coords 中,避免死循环
# 继续尝试下一条语音
logger.info("跳过当前语音,继续扫描...")
continue
# 按 Y 坐标排序
final_messages.sort(key=lambda x: x['y'])
# 格式化输出到控制台
print("\n" + "="*50)
print(" --- 微信聊天记录提取结果 ---")
print("="*50)
# 循环结束,返回最后一次分析的结果
if not final_messages: # 如果循环因为 max_loops 退出,确保有结果
final_messages = messages
# 注入 peek 到的内容
if captured_voice_contents:
logger.info(f"正在注入 {len(captured_voice_contents)} 条已还原的语音内容...")
for m in final_messages:
if m['type'] == 'voice' and not m.get('content'):
for py, content in captured_voice_contents.items():
if abs(m['y'] - py) < 30: # 匹配原始 Y 坐标
m['content'] = content
m['is_converted'] = True # 标记为逻辑上已转换
logger.info(f" -> 注入内容: {content[:10]}...")
break
# 构造返回值
dialogue_log = []
# 使用 debug_img 的尺寸,如果 debug_img 未定义(极端情况),默认 1080x1920
if 'debug_img' in locals() and debug_img is not None:
input_field_coordinates = (debug_img.shape[1] // 2, int(debug_img.shape[0] * 0.9))
else:
# 尝试读取 current_image_path
try:
tmp_img = cv2.imread(current_image_path)
input_field_coordinates = (tmp_img.shape[1] // 2, int(tmp_img.shape[0] * 0.9))
except:
input_field_coordinates = (540, 1728)
# 找出最后一条消息
last_msg = None
if final_messages:
final_messages.sort(key=lambda x: x['y'])
last_msg = final_messages[-1]
# 转换为 dialogue_log 格式 (简单转换,具体业务逻辑在调用方处理)
# 注意T2 需要的是上下文列表
pass # 实际上 T2 使用的是 LLM 上下文构建,这里不需要转换成特定 dict 结构,
# 但为了兼容旧接口,我们还是返回 messages 列表给调用者处理,
# 或者在这里处理成 (role, content) 列表?
# 原代码似乎没有做太多转换,而是直接返回 messages 列表?
# 仔细看原代码analyze_chat_image 并没有返回 messages 列表!
# 它返回 dialogue_log, input_pos
# 原代码 lines 339-340: dialogue_log = []
# 可以在最后统一生成
# 统一生成 dialogue_log
for msg in final_messages:
sender = msg['sender']
content = msg.get('content') or (msg.get('text') if 'text' in msg else "[未识别内容]")
if msg['type'] == 'voice':
content = f"[语音] {content}"
log_line = f"{sender}: {content}"
dialogue_log.append(log_line)
print(log_line)
print("="*50 + "\n")
# 只添加有内容的文本消息,或已转换且有内容的语音消息
if msg['type'] == 'text' and msg.get('content'):
dialogue_log.append(msg)
elif msg['type'] == 'voice' and msg.get('is_converted') and msg.get('content'):
dialogue_log.append(msg)
logger.info(f"📊 [统计] 语音总数: {total_voices_count}, 打开转文字次数: {convert_opened_count}, 关闭转文字次数: {convert_closed_count}")
return dialogue_log, input_field_coordinates
except Exception as e:
logger.error(f"analyze_chat_image 失败: {e}", exc_info=True)
logger.error(f"分析过程发生异常: {e}", exc_info=True)
return [], None
@@ -735,3 +856,39 @@ def match_template_center(image_path, template_path, threshold=0.8):
return None
async def get_first_screen(device=None):
    """
    Capture and analyze the first screen shown after entering the chat.

    Takes a screenshot, then runs ``analyze_chat_image`` on it with
    ``process_strategy="ALL"``.

    :param device: device handle to use; when falsy, one is obtained from
        ``connect_device()``.
    :return: tuple ``(dialogue_log, input_pos, enter_path, flag_path)``.
        ``enter_path`` is the screenshot file and ``flag_path`` is the
        output path handed to ``analyze_chat_image``. When no device is
        available the result is ``([], None, None, None)``.
    """
    logger.info("🔍 [get_first_screen] 正在进行首屏全量识别...")

    # Fall back to a fresh connection when the caller supplied no device.
    active_device = device or connect_device()
    if not active_device:
        logger.error("设备连接失败,无法获取首屏")
        return [], None, None, None

    # Step 1: screenshot. The "enter" path is requested before the "flag"
    # path, in the same order as the original code.
    enter_path = get_next_debug_path("enter")
    active_device.screenshot(enter_path)
    logger.info(f"📸 已保存进入截图: {enter_path}")

    # Step 2: full recognition of the captured screen.
    flag_path = get_next_debug_path("flag")
    analysis = await analyze_chat_image(
        enter_path,
        flag_path,
        device=active_device,
        process_strategy="ALL",
    )
    dialogue_log, input_pos = analysis

    return dialogue_log, input_pos, enter_path, flag_path