This commit is contained in:
HuangHai
2026-01-26 16:56:09 +08:00
parent 7514d3119f
commit cbc59c6628
4 changed files with 360 additions and 311 deletions

View File

@@ -1,9 +1,11 @@
# coding=utf-8
import os
import sys
import time
import logging
import asyncio
import hashlib
import cv2
# 添加项目根目录到 sys.path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -11,25 +13,38 @@ if project_root not in sys.path:
sys.path.append(project_root)
from WeiXin import WxUtil
from WeiXin.WxUtil import perform_input_action
from Util.LlmUtil import get_llm_response
from Util import Win32Patch
# 配置日志
log_dir = WxUtil.LOG_DIR
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(log_dir, "T2_ChatMonitor.log"), encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger("T2_ChatMonitor")
log_file_path = os.path.join(log_dir, "T2_ChatMonitor.log")
class CVDebugTask:
# 设置 logger
logger = logging.getLogger("T2_ChatMonitor")
logger.setLevel(logging.INFO)
if logger.hasHandlers():
logger.handlers.clear()
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='a')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(stream_handler)
logger.propagate = False
logger.info(f"🚀 日志文件路径: {os.path.abspath(log_file_path)}")
class ChatMonitorBot:
"""
结构化的 CV 语音调试任务,支持分步执行和单元测试
大张老师自动巡课系统 (CV版)
"""
def __init__(self):
self.device = None
@@ -37,6 +52,43 @@ class CVDebugTask:
self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_debug_view.jpg")
self.dialogue_log = []
self.input_pos = None
self.last_screen_hash = None
self.last_processed_msg_hash = None
self.check_interval = 5 # 检查频率 (秒)
self.persona = (
"你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
"你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
"你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
"【严格约束】:\n"
"1. 绝对禁止发散!绝对禁止幻觉!\n"
"2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
"3. 仅针对家长明确表达的内容进行回复。\n"
"4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
"5. 回复必须简练,字数严格控制在 50 字以内!\n"
"如果涉及到校区信息,必须且只能使用以下真实数据:\n"
"- 单位:长春市少惠林作文素养培养中心\n"
"- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
"- 联系人小张老师电话18686619970\n"
"- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
)
async def get_reply(self, last_message_text, context_text=""):
prompt = (
f"【教师人设】:{self.persona}\n\n"
f"【上下文对话内容】:\n{context_text}\n\n"
f"【最后一条待回复消息】:\n{last_message_text}\n\n"
"【任务要求】:\n"
"请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n"
"参考上下文对话内容,确保回复逻辑连贯。\n"
"严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
"字数严格控制在 50 字以内。直接输出回复正文。"
)
full_response = ""
async for chunk in get_llm_response(prompt, stream=False):
full_response += chunk
return full_response.strip().strip('"').strip('').strip('')
def step_1_prepare_env(self):
"""步骤1: 环境准备"""
@@ -53,81 +105,172 @@ class CVDebugTask:
return False
return True
def step_3_capture_screen(self):
"""步骤3: 捕获屏幕截图"""
logger.info("--- [Step 3] 捕获屏幕截图 ---")
def get_image_hash(self, file_path):
"""计算图片的 MD5 哈希值 (忽略顶部 100 像素的状态栏)"""
if not os.path.exists(file_path):
return None
try:
if not self.device:
logger.error("❌ 未连接设备,无法截图")
return False
self.device.screenshot(self.screenshot_path)
logger.info(f"✅ 截图已保存: {self.screenshot_path}")
return True
# 使用 OpenCV 读取图片
img = cv2.imread(file_path)
if img is None:
# 如果读取失败,回退到文件哈希
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
# 裁剪掉顶部 100 像素 (状态栏/时间)
h, w = img.shape[:2]
if h > 100:
cropped_img = img[100:h, 0:w]
else:
cropped_img = img
# 计算裁剪后数据的哈希
return hashlib.md5(cropped_img.tobytes()).hexdigest()
except Exception as e:
logger.error(f"❌ 截图失败: {e}")
return False
logger.error(f"计算哈希出错: {e}, 回退到文件哈希")
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
async def step_4_analyze_and_process(self, use_existing_image=False):
"""
步骤4: 分析图片并处理语音转换
:param use_existing_image: 是否使用已有的图片进行离线测试
"""
logger.info("--- [Step 4] 分析图片与语音处理 ---")
async def run(self):
"""主运行循环"""
logger.info("🚀 大张老师自动巡课系统启动 (T2 增强版)...")
target_img = self.screenshot_path
if use_existing_image:
if not os.path.exists(target_img):
logger.error(f"❌ 找不到指定的离线图片: {target_img}")
return False
logger.info(f"📂 正在使用离线图片进行测试: {target_img}")
# 1. 环境准备
if not self.step_1_prepare_env(): return
if not self.step_2_connect_device(): return
# 2. 首次运行:识别所有语音并获取上下文
logger.info("🔍 [首次运行] 正在进行全量识别,获取对话上下文...")
# 使用顺序命名
enter_path = WxUtil.get_next_debug_path("enter")
flag_path = WxUtil.get_next_debug_path("flag")
self.device.screenshot(enter_path)
logger.info(f"📸 已保存进入截图: {enter_path}")
# 同时也更新 live_shot
import shutil
shutil.copy(enter_path, self.screenshot_path)
# 调用核心分析逻辑
# 注意即使是离线分析WxUtil 内部也会尝试连接设备以进行长按操作
self.dialogue_log, self.input_pos = await WxUtil.analyze_chat_image(
target_img,
self.debug_view_path,
device=self.device
self.screenshot_path,
flag_path,
device=self.device,
only_process_last_voice=False # 首次运行:全量处理
)
if self.dialogue_log:
logger.info("✅ 任务处理完成,已生成对话日志")
return True
else:
logger.warning("⚠️ 未识别到任何有效的聊天内容")
return False
def step_5_report_results(self):
"""步骤5: 输出最终报告"""
logger.info("--- [Step 5] 结果汇总 ---")
if self.input_pos:
logger.info(f"📍 识别到输入框位置: {self.input_pos}")
# 如果 flag_path 生成了,也复制一份给 debug_view_path
if os.path.exists(flag_path):
shutil.copy(flag_path, self.debug_view_path)
logger.info(f"📸 已保存识别标记图: {flag_path}")
if self.dialogue_log:
logger.info("📋 最终对话内容提取结果已输出到控制台 (见上方横线区域)")
logger.info(f"✅ 首次运行识别完成,获取到 {len(self.dialogue_log)} 条消息上下文")
# 初始化最后处理的消息哈希,避免重复回复第一条
last_msg = self.dialogue_log[-1]
self.last_processed_msg_hash = hashlib.md5(last_msg.encode('utf-8')).hexdigest()
self.last_screen_hash = self.get_image_hash(self.screenshot_path)
else:
logger.warning("❌ 无对话内容输出")
return True
logger.warning("⚠️ 首次运行未识别到有效对话")
async def run_structured_debug():
# 3. 进入循环阶段
logger.info("🔄 进入实时监控阶段...")
while True:
try:
# A. 截图并计算哈希
self.device.screenshot(self.screenshot_path)
current_screen_hash = self.get_image_hash(self.screenshot_path)
# B. 如果屏幕无变化,则跳过识别
if current_screen_hash == self.last_screen_hash:
await asyncio.sleep(self.check_interval)
continue
self.last_screen_hash = current_screen_hash
logger.info("📸 屏幕发生变化,正在分析...")
# C. 分析最新图片
dialogue_log, input_pos = await WxUtil.analyze_chat_image(
self.screenshot_path,
self.debug_view_path,
device=self.device,
only_process_last_voice=True # 循环监控:仅处理最新一条
)
if not dialogue_log:
logger.info("😴 未识别到有效消息")
await asyncio.sleep(self.check_interval)
continue
logger.info(f"📊 当前识别到 {len(dialogue_log)} 条消息,最后一条: {dialogue_log[-1]}")
# 更新当前对话日志(可用于上下文参考)
self.dialogue_log = dialogue_log
self.input_pos = input_pos
# D. 只关注最后一条消息
last_msg = dialogue_log[-1]
current_msg_hash = hashlib.md5(last_msg.encode('utf-8')).hexdigest()
# E. 判断是否需要回复 (对方发送且非重复消息)
if "对方:" in last_msg:
if current_msg_hash != self.last_processed_msg_hash:
event_shot = WxUtil.get_next_debug_path("event_new_msg")
self.device.screenshot(event_shot)
logger.info(f"💡 发现新消息: {last_msg},保存现场截图: {event_shot}")
# 获取上下文文本
context_text = "\n".join(dialogue_log[:-1])
# 生成回复
reply = await self.get_reply(last_msg, context_text)
if reply:
logger.info(f"🤖 LLM 回复: {reply}")
if self.input_pos:
perform_input_action(self.device, self.input_pos, reply)
# 发送后截图留存
reply_sent_shot = WxUtil.get_next_debug_path("event_reply_sent")
self.device.screenshot(reply_sent_shot)
logger.info(f"✅ 回复已发送,保存发送后截图: {reply_sent_shot}")
self.last_processed_msg_hash = current_msg_hash
else:
logger.warning("❌ 未找到输入框位置,无法发送")
else:
logger.warning("⚠️ LLM 未生成有效回复")
else:
# 消息已处理过
pass
else:
# 最后一条是我发送的
if current_msg_hash != self.last_processed_msg_hash:
logger.info(f"⚪ 最后一条消息非对方发送,跳过回复: {last_msg}")
self.last_processed_msg_hash = current_msg_hash
await asyncio.sleep(self.check_interval)
except Exception as e:
logger.error(f"❌ 循环中发生错误: {e}", exc_info=True)
await asyncio.sleep(self.check_interval)
async def run_main():
"""
按步骤运行完整的调试任务
运行自动巡课机器人
"""
task = CVDebugTask()
# 顺序执行各步骤
if not task.step_1_prepare_env(): return
if not task.step_2_connect_device(): return
if not task.step_3_capture_screen(): return
# 执行耗时的分析和处理步骤
success = await task.step_4_analyze_and_process()
if success:
task.step_5_report_results()
logger.info("✨ 调试任务全部顺利完成!")
else:
logger.error("❌ 调试任务在处理阶段失败")
bot = ChatMonitorBot()
await bot.run()
if __name__ == "__main__":
# 运行结构化的调试流程
asyncio.run(run_structured_debug())
# 应用 Win32 补丁
Win32Patch.patch()
try:
# 运行机器人
asyncio.run(run_main())
except KeyboardInterrupt:
logger.info("🛑 用户手动停止程序。")
except Exception as e:
logger.error(f"❌ 程序异常退出: {e}", exc_info=True)

View File

@@ -1,162 +0,0 @@
# coding=utf-8
import asyncio
import logging
import os
import sys
import time
from datetime import datetime
import hashlib
# 添加项目根目录到 sys.path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
sys.path.append(project_root)
from Util import Win32Patch
from WeiXin import WxUtil
from WeiXin.WxUtil import perform_input_action
from Util.LlmUtil import get_llm_response
# 配置日志
log_dir = WxUtil.LOG_DIR
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 设置 logger
logger = logging.getLogger("T5_AutoChatMonitor")
logger.setLevel(logging.INFO)
if logger.hasHandlers():
logger.handlers.clear()
log_file_path = os.path.join(log_dir, "T5_AutoChatMonitor.log")
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(stream_handler)
logger.propagate = False
logger.info(f"日志文件路径: {log_file_path}")
# 配置参数
CHECK_INTERVAL = 5 # 检查频率 (秒)
class ChatBot:
def __init__(self):
# 运行前清理 Logs 和 Output
WxUtil.setup_script_environment()
self.d = WxUtil.connect_device()
if not self.d:
raise Exception("无法连接到设备,任务终止")
self.last_processed_msg_hash = None # 记录最后一条已处理消息的哈希值
self.screenshot_dir = WxUtil.OUTPUT_DIR
self.persona = (
"你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
"你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
"你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
"【严格约束】:\n"
"1. 绝对禁止发散!绝对禁止幻觉!\n"
"2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
"3. 仅针对家长明确表达的内容进行回复。\n"
"4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
"5. 回复必须简练,字数严格控制在 50 字以内!\n"
"如果涉及到校区信息,必须且只能使用以下真实数据:\n"
"- 单位:长春市少惠林作文素养培养中心\n"
"- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
"- 联系人小张老师电话18686619970\n"
"- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
)
async def get_reply(self, last_message_text):
prompt = (
f"【教师人设】:{self.persona}\n\n"
f"【最后一条消息】:\n{last_message_text}\n\n"
"【任务要求】:\n"
"请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n"
"严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
"字数严格控制在 50 字以内。直接输出回复正文。"
)
full_response = ""
async for chunk in get_llm_response(prompt, stream=False):
full_response += chunk
return full_response.strip().strip('"').strip('').strip('')
async def run(self):
logger.info("🚀 大张老师自动巡课系统启动 (CV版)...")
while True:
try:
# 1. 截图并分析
image_path = os.path.join(self.screenshot_dir, "current_screen.jpg")
self.d.screenshot(image_path)
# 使用 WxUtil 的集中式分析逻辑
# 它会自动处理语音转文字,并返回对话列表和输入框坐标
dialogue_log, input_pos = await WxUtil.analyze_chat_image(image_path, self.screenshot_dir, device=self.d)
if not dialogue_log:
logger.info("😴 未发现有效消息,等待下一次轮询。")
await asyncio.sleep(CHECK_INTERVAL)
continue
# 2. 只关注最后一条消息
last_msg = dialogue_log[-1]
logger.info(f"最后一条消息: {last_msg}")
# 计算最后一条消息的哈希值,用于去重
current_msg_hash = hashlib.md5(last_msg.encode('utf-8')).hexdigest()
# 3. 判断是否需要回复
# 规则:最后一条消息由“对方”发送,且不是上一次处理过的消息
if "对方:" in last_msg:
if current_msg_hash != self.last_processed_msg_hash:
logger.info(f"💡 发现新消息,准备生成回复: {last_msg}")
# 生成回复
reply = await self.get_reply(last_msg)
if reply:
logger.info(f"🤖 LLM 回复: {reply}")
# 执行输入和发送
if input_pos:
perform_input_action(self.d, input_pos, reply)
logger.info("✅ 回复已发送")
# 成功发送后更新最后处理的消息哈希
self.last_processed_msg_hash = current_msg_hash
else:
logger.warning("❌ 未找到输入框位置,无法发送回复")
else:
logger.warning("⚠️ LLM 未生成有效回复")
else:
# 消息已处理过,不重复回复
pass
else:
# 最后一条是我发送的或者是系统消息,更新哈希以防之后重复处理(如果之后又变成对方发)
# 或者简单地跳过
if current_msg_hash != self.last_processed_msg_hash:
logger.info(f"⚪ 最后一条消息非对方发送,无需回复: {last_msg}")
self.last_processed_msg_hash = current_msg_hash
# 4. 休眠
await asyncio.sleep(CHECK_INTERVAL)
except Exception as e:
logger.error(f"❌ 主循环发生错误: {e}", exc_info=True)
await asyncio.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
Win32Patch.patch()
bot = ChatBot()
try:
asyncio.run(bot.run())
except KeyboardInterrupt:
logger.info("🛑 用户手动停止程序。")
except Exception as e:
logger.error(f"❌ 程序异常退出: {e}", exc_info=True)

View File

@@ -14,6 +14,7 @@ if project_root not in sys.path:
sys.path.append(project_root)
import json
from datetime import datetime
from Util.EasyOcrKit import EasyOcrKit
# 初始化 EasyOcrKit
@@ -29,14 +30,30 @@ LOG_DIR = os.path.join(BASE_DATA_DIR, "Logs")
OUTPUT_DIR = os.path.join(BASE_DATA_DIR, "Output")
TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Templates")
def clear_directory(dir_path):
"""清理指定目录下的所有文件"""
# 全局调试图片计数器
_debug_counter = 0
def get_next_debug_path(desc="step"):
"""获取下一个顺序命名的调试图片路径 (debug_N_desc.jpg)"""
global _debug_counter
_debug_counter += 1
filename = f"debug_{_debug_counter}_{desc}.jpg"
return os.path.join(OUTPUT_DIR, filename)
def clear_directory(dir_path, exclude_files=None):
"""清理指定目录下的所有文件,支持排除特定文件"""
if not os.path.exists(dir_path):
os.makedirs(dir_path)
return
if exclude_files is None:
exclude_files = []
import shutil
for filename in os.listdir(dir_path):
if filename in exclude_files:
continue
file_path = os.path.join(dir_path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
@@ -49,7 +66,11 @@ def clear_directory(dir_path):
def setup_script_environment():
"""运行前清理日志和输出目录"""
logger.info("清理运行环境: Logs 和 Output 目录...")
clear_directory(LOG_DIR)
# 重置调试计数器
global _debug_counter
_debug_counter = 0
# 排除当前正在使用的日志文件
clear_directory(LOG_DIR, exclude_files=["T2_ChatMonitor.log", "WxUtil.log"])
clear_directory(OUTPUT_DIR)
def connect_device():
@@ -93,10 +114,11 @@ def safe_device_click(d, x, y):
logger.error(f"重试点击操作依然失败: {e2}")
return False
async def analyze_chat_image(image_path, output_path, device=None, target_name="对方"):
async def analyze_chat_image(image_path, output_path, device=None, target_name="对方", only_process_last_voice=False):
"""
全面采用 CV + OCR 识别微信聊天截图中的最后一条消息
不再使用 VLM
:param only_process_last_voice: 如果为 True仅处理转文字屏幕上最后一条未转换的语音消息
"""
try:
# 1. 初始化
@@ -122,14 +144,31 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
logger.info("正在执行 OCR 识别...")
ocr_results = ocr_kit.read_text(image_path)
# 微信菜单关键字(用于排除干扰)
MENU_KEYWORDS = ["听筒播放", "收藏", "背景播放", "删除", "多选", "取消转文字", "转文字", "引用", "提醒"]
# 5. 整合所有消息
messages = []
debug_img = img.copy() # 初始化调试图
# 绘制过滤区域边界 (可视化)
cv2.line(debug_img, (0, 150), (w, 150), (255, 0, 255), 2) # 顶部线
cv2.line(debug_img, (0, h - 100), (w, h - 100), (255, 0, 255), 2) # 底部线 (从 180 改为 100)
cv2.putText(debug_img, "TOP_FILTER", (10, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
cv2.putText(debug_img, "BOTTOM_FILTER", (10, h - 110), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1)
# A. 添加语音消息
for ax, ay in audio_matches:
# 过滤掉顶部和底部的非聊天区域 (经验值: 顶部150, 底部250)
if ay < 150 or ay > h - 250:
# 标记所有找到的语音图标 (用于调试)
cv2.circle(debug_img, (ax, ay), 10, (255, 255, 0), -1) # 青色实心圆表示原始匹配点
# 过滤掉顶部和底部的非聊天区域
# 顶部标题栏通常在 150 像素以内
# 底部输入栏通常在 100 像素以内 (捕捉最底部的文字)
if ay < 150 or ay > h - 100:
logger.info(f"忽略区域外语音图标: ({ax}, {ay})")
cv2.rectangle(debug_img, (ax-35, ay-35), (ax+35, ay+35), (128, 128, 128), 1) # 灰色框表示被过滤
cv2.putText(debug_img, "FILTERED", (ax - 40, ay - 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1)
continue
sender = "对方" if ax < w / 2 else ""
@@ -144,15 +183,58 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
color = (0, 0, 255) if is_unread else (0, 255, 0)
cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
# --- 新增:判断是否已转文字 ---
# --- 改进:判断是否已转文字 ---
is_converted = False
converted_trigger_text = ""
for bbox, text, conf in ocr_results:
c_x = int((bbox[0][0] + bbox[2][0]) / 2)
c_y = int((bbox[0][1] + bbox[2][1]) / 2)
# 转换后的文字通常在语音图标下方 30-300 像素内,且水平位置相近
if 30 < c_y - ay < 300 and abs(c_x - ax) < 200:
is_converted = True
break
# 判定逻辑:
# 1. 转换后的文字通常在语音图标下方 30-600 像素 (增加到 600 以适配长文本)
# 2. 水平位置偏移在 600 像素内 (增加到 600 以适配宽文本泡)
# 3. 关键:确保这两个坐标之间没有其他的语音图标(防止第一个语音“偷”了第二个语音的文字)
if 30 < c_y - ay < 600 and abs(c_x - ax) < 600:
# 检查中间是否有其他语音图标
has_intermediate_audio = False
for other_ax, other_ay in audio_matches:
# 增加 20 像素缓冲区,防止判定到自身或极近的干扰点
if ay + 20 < other_ay < c_y - 10:
has_intermediate_audio = True
logger.info(f"语音({ax},{ay}) 被中间语音图标({other_ax},{other_ay}) 阻断,无法关联文本 '{text[:10]}...'")
break
if has_intermediate_audio:
continue
clean_text = text.strip()
# 判定是否为时间戳 (如 13:49, 09:26)
# 增强:同时支持 "昨天 14:15" 这种格式
is_timestamp = re.search(r'(\d{1,2}:\d{2})', clean_text) and (len(clean_text) < 15)
# 判定是否为纯数字或时长 (如 5", 3", 少3")
# 增强:允许前面有少量杂讯字符,只要结尾是数字或 "
is_duration = re.search(r'\d{1,2}"?$', clean_text) and len(clean_text) < 6
# 排除掉语音时长、时间戳和菜单关键字的干扰
if not is_duration and not is_timestamp and clean_text not in MENU_KEYWORDS:
is_converted = True
converted_trigger_text = clean_text
logger.info(f"语音({ax},{ay}) 判定为已转换,关联到有效文本: '{clean_text}'")
break
else:
if is_timestamp:
logger.info(f"语音({ax},{ay}) 忽略下方时间戳文本: '{clean_text}'")
elif is_duration:
logger.info(f"语音({ax},{ay}) 忽略时长文本: '{clean_text}'")
if is_converted:
logger.info(f"语音消息 ({ax}, {ay}) 已有转换文字: '{converted_trigger_text}',跳过")
# --- 恢复绘图反馈 ---
# 根据已读/未读画框:未读红框,已读绿框
color = (0, 0, 255) if is_unread else (0, 255, 0)
cv2.rectangle(debug_img, (ax-30, ay-30), (ax+30, ay+30), color, 2)
label = "YES" if is_converted else "NO"
# 在框的右侧标注 YES 或 NO
@@ -165,7 +247,8 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
"center": (ax, ay),
"y": ay,
"is_unread": is_unread,
"is_converted": is_converted
"is_converted": is_converted,
"content": None
})
# B. 添加文本消息
@@ -174,30 +257,24 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
c_y = int((bbox[0][1] + bbox[2][1]) / 2)
# 过滤区域 (顶部标题栏和底部输入栏)
# 底部输入栏通常在最后 150 像素左右
if c_y < 150 or c_y > h - 150:
# 底部输入栏通常在 100 像素以内 (捕捉最底部的文字)
if c_y < 150 or c_y > h - 100:
continue
# 过滤掉明显的系统词 (通常是日期或时间)
# 匹配如: "2025年12月28日 11:18", "11:18", "昨天 09:26" 等
# 增加对 OCR 误识别的容错 (如 28811:18)
time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)'
# 如果文本包含这些关键词且长度较短,或者是纯数字/标点组合
if len(text) < 20 and (re.search(time_pattern, text) or re.match(r'^[0-9:\s日年月\-]+$', text)):
continue
# 过滤掉语音时长标识 (如 "5\"", "10\"", "小8\"")
if re.match(r'^.?[0-9]{1,2}"?$', text.strip()):
continue
# 过滤掉“撤回了一条消息”等系统提示
if "撤回了一条消息" in text or "打招呼的消息" in text:
continue
# 排除干扰:语音时长、菜单关键字、系统提示
clean_text = text.strip()
if re.match(r'^.?[0-9]{1,2}"?$', clean_text): continue
if clean_text in MENU_KEYWORDS: continue
if "撤回了一条消息" in text or "打招呼的消息" in text: continue
# 改进发送者判定:查看文本块的左边界
# 对方的消息靠左,我的消息靠右
left_x = bbox[0][0]
sender = "对方" if left_x < w * 0.3 else ""
sender = "对方" if left_x < w * 0.5 else ""
messages.append({
"type": "text",
@@ -229,8 +306,15 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
# 获取所有语音消息(不论已读未读,只要没转换成文字就处理)
unconverted_voices = [m for m in messages if m['type'] == 'voice' and not m.get('is_converted')]
# 按 Y 坐标排序,确保从上到下顺序
unconverted_voices.sort(key=lambda x: x['y'])
if unconverted_voices:
logger.info(f"发现 {len(unconverted_voices)} 条未转换的语音,开始处理...")
if only_process_last_voice:
logger.info(f"策略限制:仅处理最后一条未转换语音 (共发现 {len(unconverted_voices)} 条)")
unconverted_voices = [unconverted_voices[-1]]
else:
logger.info(f"发现 {len(unconverted_voices)} 条未转换的语音,开始全部处理...")
for v_msg in unconverted_voices:
vx, vy = int(v_msg['center'][0]), int(v_msg['center'][1])
@@ -242,7 +326,7 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
time.sleep(1.5)
# B. 截图寻找“转文字”按钮
menu_shot = os.path.join(OUTPUT_DIR, f"voice_menu_{vy}.jpg")
menu_shot = get_next_debug_path("step_long_press")
d.screenshot(menu_shot)
zhuan_template = os.path.join(TEMPLATE_DIR, "zhun_wen_zi.jpg")
@@ -258,7 +342,7 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
time.sleep(5.0)
# C. 再次截图 OCR 获取转换后的文字
after_convert_shot = os.path.join(OUTPUT_DIR, f"after_auto_{vy}.jpg")
after_convert_shot = get_next_debug_path("step_convert_result")
try:
d.screenshot(after_convert_shot)
convert_ocr = ocr_kit.read_text(after_convert_shot)
@@ -266,48 +350,12 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
logger.error(f"截图或 OCR 失败: {e}")
convert_ocr = []
# 提取转换文字(合并多行结果)
text_blocks = []
for c_bbox, c_text, c_conf in convert_ocr:
cc_x = (c_bbox[0][0] + c_bbox[2][0]) / 2
cc_y = (c_bbox[0][1] + c_bbox[2][1]) / 2
# 扩大搜索范围,适应更长的转换结果
# 增加 sender 判断 (通过水平位置判定)
c_left_x = c_bbox[0][0]
c_sender = "对方" if c_left_x < w * 0.3 else ""
if 30 < cc_y - vy < 600 and abs(cc_x - vx) < 400 and c_sender == v_msg['sender']:
text_blocks.append((cc_y, c_text))
# ... (中间提取文字逻辑不变) ...
# 按 Y 坐标排序并合并
text_blocks.sort(key=lambda x: x[0])
converted_text = "".join([t[1] for t in text_blocks])
if converted_text:
logger.info(f"✨ OCR 识别成功: {converted_text}")
v_msg['content'] = converted_text
v_msg['is_converted'] = True
else:
logger.warning("❌ OCR 未能提取到转换后的文字内容")
# D. 长按并点击“取消转文字”恢复界面
try:
logger.info("正在恢复界面状态 (点击'取消转文字')...")
d.long_click(vx, vy, 1.5)
time.sleep(1.0)
cancel_shot = os.path.join(OUTPUT_DIR, f"cancel_menu_{vy}.jpg")
d.screenshot(cancel_shot)
cancel_template = os.path.join(TEMPLATE_DIR, "cancel_zhuan_wen_zi.jpg")
cancel_btn = find_template_match(cancel_shot, cancel_template, threshold=0.7)
if cancel_btn:
c_btn_x, c_btn_y = int(cancel_btn[0]), int(cancel_btn[1])
safe_device_click(d, c_btn_x, c_btn_y)
logger.info(f"✅ 已点击'取消转文字' ({c_btn_x}, {c_btn_y}),界面已恢复")
else:
logger.warning("⚠️ 未找到'取消转文字'按钮,尝试点击空白处关闭菜单")
safe_device_click(d, vx + 300, vy)
except Exception as e:
logger.error(f"恢复界面状态时发生错误: {e}")
# D. (已移除) 不再执行“取消转文字”操作,保留文字以避免重复识别
# 之前此处会执行 long_click -> cancel_template -> safe_device_click
# 为了解决“反复打开”的问题,现在改为保留转出来的文字
logger.info("保留语音转换后的文字,不进行恢复界面操作")
else:
logger.warning("❌ 未能找到'转文字'按钮,点击空白处退出")
safe_device_click(d, vx + 300, vy)
@@ -326,13 +374,26 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
if msg['type'] == 'text':
cx, cy = msg['center']
# 1. 垂直距离在合理范围内 (30 到 600 像素)
# 2. 发送者一致 (确保归属正确)
# 3. 水平偏移在合理范围内 (对于对方cx 应该在左侧对于我cx 应该在右侧)
if 30 < cy - vy < 600 and msg['sender'] == v_msg['sender']:
# 进一步检查水平位置,确保文字在语音图标的大致垂直线上或稍有偏移
if abs(cx - vx) < 400:
# 2. 水平偏移在合理范围内 (增加到 600 像素以适配宽文本泡)
# 3. 关键:确保这两个坐标之间没有其他的语音图标(防止第一个语音“偷”了第二个语音的文字)
v_dist = cy - vy
h_dist = abs(cx - vx)
if 30 < v_dist < 600 and h_dist < 600:
# 检查中间是否有其他语音图标
has_intermediate_audio = False
for other_ax, other_ay in audio_matches:
if vy < other_ay < cy:
has_intermediate_audio = True
break
if has_intermediate_audio:
continue
# 发送者判定
if msg['sender'] == v_msg['sender']:
v_content_blocks.append(msg)
msg['is_voice_part'] = True
logger.info(f"关联成功: 语音({vx}, {vy}) -> 文本('{msg['content']}') [h_dist={h_dist:.1f}, v_dist={v_dist:.1f}]")
# 如果有内容块,按 Y 排序并合并
if v_content_blocks:
@@ -340,6 +401,9 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
combined_content = "".join([m['content'] for m in v_content_blocks])
v_msg['content'] = combined_content
v_msg['is_converted'] = True
else:
if not v_msg.get('content'):
logger.warning(f"语音({vx}, {vy}) 未能关联到任何文本块")
# 2. 收集最终要显示的消息(排除被标记为语音部分的文本)
for msg in messages:
@@ -485,6 +549,10 @@ def find_all_template_matches(screen_path, template_path, threshold=0.8):
h, w = template.shape[:2]
res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED)
# 记录最大匹配度,方便调试阈值
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res)
logger.info(f"模板匹配 {os.path.basename(template_path)}: 最大相似度 = {max_val:.4f} (阈值={threshold})")
# 找到所有大于阈值的点
loc = np.where(res >= threshold)