Files
aiData/WeiXin/ChatMonitorAll_ReplyAudio.py
HuangHai 00375a80b2 'commit'
2026-01-31 17:29:57 +08:00

427 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import asyncio
import hashlib
import json
import logging
import os
import re
import sys
import threading
import time
from collections import deque

import cv2
# Put the project root (the parent of this script's directory) on sys.path so
# the project-local `WeiXin` and `Util` packages imported below can be found
# when this file is executed directly as a script.
project_root = os.path.abspath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir)
)
if project_root not in sys.path:
    sys.path.append(project_root)
from WeiXin import WxUtil
from Util.LlmUtil import get_llm_response
from Util import Win32Patch
from Util.AlyTtsKit import QwenTTSManager
# Logging: this script's logger writes both to ChatMonitor_Audio.log under
# WxUtil.LOG_DIR (truncated on every start, mode='w') and to the console,
# using one shared record format.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
log_dir = WxUtil.LOG_DIR
# exist_ok=True replaces the racy "os.path.exists, then os.makedirs" pair.
os.makedirs(log_dir, exist_ok=True)
log_file_path = os.path.join(log_dir, "ChatMonitor_Audio.log")
_log_formatter = logging.Formatter(_LOG_FORMAT)

logger = logging.getLogger("ChatMonitor_Audio")
logger.setLevel(logging.INFO)
# Drop any handlers attached earlier (e.g. on re-import) so output is not duplicated.
logger.handlers.clear()
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w')
file_handler.setFormatter(_log_formatter)
logger.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(_log_formatter)
logger.addHandler(stream_handler)
# Do not also hand records to ancestor (root) handlers.
logger.propagate = False
logger.info(f"🚀 日志文件路径: {os.path.abspath(log_file_path)}")

# Route the "WxUtil" logger into the same file and console handlers, unless a
# FileHandler for this exact log file is already attached to it.
# NOTE(review): wx_logger's level is not set here; if WxUtil leaves it at
# NOTSET its effective level comes from the root logger (WARNING by default),
# so its INFO records may be dropped — confirm against WxUtil.
wx_logger = logging.getLogger("WxUtil")
wx_logger.propagate = False
_log_file_abspath = os.path.abspath(log_file_path)
if not any(
    isinstance(h, logging.FileHandler) and os.path.abspath(h.baseFilename) == _log_file_abspath
    for h in wx_logger.handlers
):
    wx_logger.addHandler(file_handler)
    wx_logger.addHandler(stream_handler)
class ChatMonitorAudioBot:
    """Voice-reply WeChat chat monitor ("大张老师自动巡课系统", voice edition).

    Repeatedly screenshots the connected phone and looks for unread voice
    messages from the other party. It asks the LLM for a reply in the
    "大张老师" persona, then sends that reply back as a WeChat voice message.
    Sending works by holding the "按住说话" (press-to-talk) button while the
    TTS engine plays the synthesized reply.
    """

    # Sender value that marks a message as our own (see _pick_unread_voice).
    # NOTE(review): own messages are identified by sender == ""; confirm this
    # matches what WxUtil.analyze_chat_image emits for self-sent messages.
    SELF_SENDER = ""
    # Maximum number of processed hashes / metadata tuples kept in memory.
    MAX_PROCESSED_HISTORY = 100
    # Stage directions in ASCII "(...)" or full-width "（...）" parentheses;
    # removed so TTS never reads them aloud.
    _PAREN_RE = re.compile(r'\(.*?\)|（.*?）')
    # Whitespace directly after a kinship term (e.g. "糖豆爸爸 您好").
    _KINSHIP_SPACE_RE = re.compile(r'(爸爸|妈妈|爷爷|奶奶|姥姥|姥爷)\s+')
    # Quote characters stripped from both ends of the raw LLM reply.
    _REPLY_QUOTES = '"“”'

    def __init__(self, debug_mode=False):
        """Create the bot; no device or TTS connection is made until run().

        Args:
            debug_mode: Also forwarded to WxUtil.set_debug_mode. When True the
                module logger is lowered to DEBUG so the debug-only messages
                below are actually emitted.
        """
        self.device = None
        self.debug_mode = debug_mode
        # Keep WxUtil's debug mode in sync with ours.
        WxUtil.set_debug_mode(debug_mode)
        if debug_mode:
            # The module logger is configured at INFO; without this every
            # debug_mode-only logger.debug() call would be filtered out.
            logger.setLevel(logging.DEBUG)
        self.screenshot_path = os.path.join(WxUtil.OUTPUT_DIR, "AudioMonitor_live_shot.jpg")
        self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "AudioMonitor_debug_view.jpg")
        self.dialogue_log = []
        self.input_pos = None
        self.last_screen_hash = None
        self.last_processed_msg_hash = None
        self.processed_hashes = set()
        self.processed_meta = set()
        # Insertion order of the two sets above, so eviction drops the oldest.
        self._hash_history = deque()
        self._meta_history = deque()
        self.check_interval = 3  # polling interval (seconds)
        # TTS configuration
        self.voice_id = "qwen-tts-vc-guanyu-voice-20260131160431051-8e51"
        self.tts_manager = None
        self.persona = (
            "你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
            "你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
            "你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
            "【关于作文评价的回复逻辑】:\n"
            "**仅当且仅当**家长明确询问孩子今天的作文情况(例如:“孩子今天写得怎么样?”、“今天作文有进步吗?”)时,才触发此逻辑。触发时,请随机生成具体、专业的反馈。反馈应包含:\n"
            "1. 孩子今天在写作中遇到的具体小问题(如修辞手法运用不当、开头略显生硬等);\n"
            "2. 值得表扬的亮点(如观察细致、某个句子写得很传神等);\n"
            "3. 对字迹的评价(如字迹工整、卷面整洁等);\n"
            "4. 针对性的课外阅读建议(推荐具体类型的书目)。\n"
            "此类特定回复字数可放宽,控制在 150 字以内,确保语音回复时长在 30 秒内。\n"
            "【关于通用咨询的回复逻辑】:\n"
            "如果家长是咨询学校信息、地址、课程时间、年级设置,或者是刚加好友打招呼,请务必保持礼貌、知性、亲切,直接回答相关问题。严禁在这些情况下提及孩子的作文表现或虚构作文反馈。\n"
            "【严格约束】:\n"
            "1. 绝对禁止输出任何括号内的内容(如动作、神态描述、心理活动等)!例如不要输出‘(微笑)’或‘(亲切地注视)’。\n"
            "2. 你的回复是直接用于语音播放的,请只输出你想说的话,不要包含任何舞台指导文字。\n"
            "3. 绝对禁止发散!绝对禁止幻觉!\n"
            "4. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
            "5. 仅针对家长明确表达的内容进行回复。\n"
            "6. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
            "7. 普通咨询回复字数严格控制在 50 字以内,专业作文评价回复控制在 150 字以内!\n"
            "8. 说话要有人情味,不要回答得太生硬或太简短。例如当家长问‘学校叫什么名字’时,不要只说‘少惠林’,而应该说‘我们的学校名字叫少惠林。’或者‘咱们这儿叫少惠林。’,这样才显得亲切、有礼貌。\n"
            "9. 对方问什么就答什么。回答要完整、体面,但不要过度发散地址和电话(除非被问到)。\n"
            "如果涉及到校区信息,必须且只能使用以下真实数据:\n"
            "- 单位/学校名称:长春市少惠林作文素养培养中心(简称:少惠林)\n"
            "- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
            "- 联系人小张老师电话18686619970\n"
            "- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
        )

    # ------------------------------------------------------------------
    # Processed-message bookkeeping
    # ------------------------------------------------------------------
    def _record_processed_hash(self, msg, msg_hash):
        """Remember a handled message (in memory only) so it is not answered twice.

        Records the stable hash and, when ``msg`` is given, its
        (sender, time_display, type) tuple. Each collection keeps at most
        MAX_PROCESSED_HISTORY entries and evicts the oldest first. (The previous
        ``list(set)[-100:]`` trim dropped arbitrary entries, because sets are
        unordered.)
        """
        self._remember(self.processed_hashes, self._hash_history, msg_hash)
        if msg:
            meta = (msg.get("sender", ""), msg.get("time_display", ""), msg.get("type", ""))
            self._remember(self.processed_meta, self._meta_history, meta)

    def _remember(self, seen, history, item):
        """Add ``item`` to the bounded FIFO set formed by ``seen`` and its ``history`` deque."""
        if item in seen:
            return
        seen.add(item)
        history.append(item)
        while len(history) > self.MAX_PROCESSED_HISTORY:
            seen.discard(history.popleft())

    # ------------------------------------------------------------------
    # LLM reply
    # ------------------------------------------------------------------
    async def get_reply(self, last_message_text, context_text=""):
        """Ask the LLM for a persona reply to the latest parent message.

        Args:
            last_message_text: The message to answer.
            context_text: Preceding on-screen dialogue, one line per message.

        Returns:
            The reply cleaned for TTS (see _clean_reply). It may be "" if the
            model produced only filtered content.
        """
        prompt = (
            f"【教师人设】:{self.persona}\n\n"
            f"【上下文对话内容】:\n{context_text}\n\n"
            f"【最后一条待回复消息】:\n{last_message_text}\n\n"
            "【任务要求】:\n"
            "请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n"
            "1. **意图识别**:判断家长是否在询问“孩子今天的作文写得怎么样”。\n"
            "2. **条件回复**\n"
            " - 如果是询问作文,请严格按【关于作文评价的回复逻辑】生成 150 字以内的详细专业反馈。\n"
            " - 如果是咨询学校地址、课程、年级、打招呼等通用问题,请按【关于通用咨询的回复逻辑】简洁回答,严禁提到任何关于孩子作文的内容,字数控制在 50 字以内。\n"
            "3. **格式要求**:严禁发散,直接输出回复正文,不包含任何括号内的动作描述。\n"
            "4. **口语化处理**:由于回复将用于语音播放,请确保称呼自然。如果称呼家长,请直接使用‘糖豆爸爸’、‘糖豆妈妈’等格式,不要在中间加标点符号,也不要使用‘某某的爸爸’这种书面语,直接称呼‘糖豆爸爸’会更亲切自然。"
        )
        chunks = [chunk async for chunk in get_llm_response(prompt, stream=False)]
        return self._clean_reply("".join(chunks))

    @classmethod
    def _clean_reply(cls, raw):
        """Normalize raw LLM output for TTS playback.

        - Strips surrounding whitespace and ASCII/curly double quotes. The
          original ``.strip('')`` calls were no-ops.
        - [User Requested] Removes ASCII and full-width parenthesised stage
          directions. The original full-width pattern ``.*?`` only ever
          matched empty strings.
        - Removes whitespace right after a kinship term so "糖豆爸爸 ..." is
          read without an odd pause.
        """
        reply = raw.strip().strip(cls._REPLY_QUOTES)
        reply = cls._PAREN_RE.sub('', reply)
        reply = cls._KINSHIP_SPACE_RE.sub(r'\1', reply)
        return reply.strip()

    # ------------------------------------------------------------------
    # Setup steps
    # ------------------------------------------------------------------
    def step_1_prepare_env(self):
        """Step 1: prepare the script environment and create the TTS manager.

        Returns False (after logging) if QwenTTSManager() raises, else True.
        """
        logger.info("--- [Step 1] 环境准备 ---")
        WxUtil.setup_script_environment()
        try:
            self.tts_manager = QwenTTSManager()
            logger.info("✅ TTS 引擎初始化成功")
        except Exception as e:
            logger.error(f"❌ TTS 引擎初始化失败: {e}")
            return False
        return True

    def step_2_connect_device(self):
        """Step 2: connect to the phone via WxUtil.connect_device; False if that yields nothing."""
        logger.info("--- [Step 2] 连接设备 ---")
        self.device = WxUtil.connect_device()
        if not self.device:
            logger.error("❌ 设备连接失败,请检查手机是否连接且开启了调试模式")
            return False
        return True

    # ------------------------------------------------------------------
    # Hashing helpers
    # ------------------------------------------------------------------
    def get_image_hash(self, file_path, crop_top=150):
        """Return an MD5 hex digest of the screenshot at ``file_path`` for change detection.

        The top ``crop_top`` pixel rows (the status bar, default 150) are
        ignored. If OpenCV cannot decode the image or raises, the raw file
        bytes are hashed instead.

        Returns:
            The hex digest, or None if the file does not exist.
        """
        if not os.path.exists(file_path):
            return None
        try:
            img = cv2.imread(file_path)
            if img is None:
                return self._file_md5(file_path)
            if img.shape[0] > crop_top:
                img = img[crop_top:]
            return hashlib.md5(img.tobytes()).hexdigest()
        except Exception as e:
            logger.error(f"计算哈希出错: {e}, 回退到文件哈希")
            return self._file_md5(file_path)

    @staticmethod
    def _file_md5(file_path):
        """MD5 hex digest of the file's raw bytes."""
        with open(file_path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()

    def get_stable_message_hash(self, msg):
        """Hash a message by sender, content, time_display and type; "" for a falsy msg.

        A missing or None content hashes the same as "".
        """
        if not msg:
            return ""
        stable_data = {
            "sender": msg.get("sender", ""),
            "content": msg.get("content") or "",
            "time_display": msg.get("time_display", ""),
            "type": msg.get("type", ""),
        }
        msg_str = json.dumps(stable_data, sort_keys=True, ensure_ascii=False)
        return hashlib.md5(msg_str.encode('utf-8')).hexdigest()

    # ------------------------------------------------------------------
    # Voice reply
    # ------------------------------------------------------------------
    def _tts_worker(self, text, finished_event, start_event):
        """Thread target: synthesize ``text`` with ``self.voice_id`` and play it.

        Sets ``start_event`` once the manager reports playback has started.
        Always sets ``finished_event`` on exit, whether playback succeeded or
        failed; exceptions are logged, not raised.
        """
        try:
            logger.info(f"[TTS] 启动语音合成音色ID: {self.voice_id}")
            self.tts_manager.start_synthesis(self.voice_id, [text], wait_finished=False, buffer_seconds=0.3)
            self.tts_manager.wait_for_playback_start()
            start_event.set()
            if self.tts_manager.callback:
                self.tts_manager.callback.wait_for_finished()
            logger.info("[TTS] 语音播放完成")
        except Exception as e:
            logger.error(f"[TTS] 播放过程中发生异常: {e}")
        finally:
            finished_event.set()

    async def _locate_press_to_talk(self):
        """Return the screen position of the "按住说话" button, or None.

        If the button is not visible, taps the "audio_reply" toggle (when
        found), waits 1.5 s and looks again.
        """
        tmp_screen = os.path.join(WxUtil.OUTPUT_DIR, "audio_reply_check.jpg")
        press_say_template = os.path.join(WxUtil.TEMPLATE_DIR, "press_say.jpg")
        self.device.screenshot(tmp_screen)
        pos = WxUtil.match_template_center(tmp_screen, press_say_template, threshold=0.8)
        if pos:
            return pos
        logger.info(">>> 未发现 '按住说话' 按钮,尝试切换模式...")
        audio_reply_template = os.path.join(WxUtil.TEMPLATE_DIR, "audio_reply.jpg")
        switch_pos = WxUtil.match_template_center(tmp_screen, audio_reply_template, threshold=0.8)
        if not switch_pos:
            return None
        self.device.click(switch_pos[0], switch_pos[1])
        await asyncio.sleep(1.5)
        self.device.screenshot(tmp_screen)
        return WxUtil.match_template_center(tmp_screen, press_say_template, threshold=0.8)

    @staticmethod
    async def _wait_for_any(*events, timeout, poll_interval=0.1):
        """Poll until any of ``events`` is set or ``timeout`` seconds pass.

        Uses asyncio.sleep, so the event loop is never blocked.

        Returns:
            True if an event was set, False on timeout.
        """
        deadline = time.monotonic() + timeout
        while not any(event.is_set() for event in events):
            if time.monotonic() >= deadline:
                return False
            await asyncio.sleep(poll_interval)
        return True

    async def send_voice_reply(self, reply_text, max_record_seconds=60):
        """Send ``reply_text`` as a WeChat voice message.

        Holds the press-to-talk button, plays the text through TTS on a worker
        thread, and releases the button once playback finishes or
        ``max_record_seconds`` elapse. The button is always released, even if
        an error or cancellation occurs while it is held.

        Args:
            reply_text: Text to synthesize and send.
            max_record_seconds: Longest time the button stays held after
                playback starts (or after the 10 s start wait gives up).

        Returns:
            False if the press-to-talk button could not be located, else True.
        """
        logger.info(f"🎤 准备发送语音回复: {reply_text}")
        # 1. Make sure the chat is in voice-input mode.
        pos = await self._locate_press_to_talk()
        if not pos:
            logger.error("❌ 无法定位到 '按住说话' 按钮")
            return False

        # 2. Prepare the TTS thread. daemon=True so a hung TTS call cannot keep
        #    the process alive on shutdown.
        finished_event = threading.Event()
        playback_start_event = threading.Event()
        tts_thread = threading.Thread(
            target=self._tts_worker,
            args=(reply_text, finished_event, playback_start_event),
            daemon=True,
        )

        # 3. Record: hold the button while TTS plays.
        logger.info(">>> 立即按住发送语音按钮...")
        self.device.touch.down(pos[0], pos[1])
        try:
            logger.info(">>> 等待 0.5 秒确保微信进入录音状态...")
            await asyncio.sleep(0.5)
            logger.info(">>> 启动 TTS 播音...")
            tts_thread.start()
            # Wait without blocking the loop for playback to start. Stop early
            # if the worker already finished, e.g. because synthesis failed.
            await self._wait_for_any(playback_start_event, finished_event, timeout=10)
            if not await self._wait_for_any(finished_event, timeout=max_record_seconds):
                logger.warning("录音超时,强制结束")
        finally:
            # 4. Release the button.
            self.device.touch.up(pos[0], pos[1])
        logger.info(">>> 录音结束,语音已发送")
        # Bounded join: after a timeout the worker may still be playing.
        tts_thread.join(timeout=5)
        if tts_thread.is_alive():
            logger.warning("[TTS] 播音线程未在 5 秒内结束,不再等待")
        return True

    # ------------------------------------------------------------------
    # Monitoring loop
    # ------------------------------------------------------------------
    async def run(self):
        """Set up, then run monitoring passes every ``check_interval`` seconds forever.

        An exception raised by a pass is logged with its traceback and the
        loop continues. Every pass, including the error path, is followed by
        exactly one sleep.
        """
        logger.info("🚀 正在启动 ChatMonitor_Audio (Voice-Reply)...")
        if not self.step_1_prepare_env():
            return
        if not self.step_2_connect_device():
            return
        logger.info("🚀 启动完成,进入实时监控阶段...")
        while True:
            try:
                await self._poll_once()
            except Exception as e:
                logger.error(f"Error in monitoring loop: {e}", exc_info=True)
            await asyncio.sleep(self.check_interval)

    async def _poll_once(self):
        """One monitoring pass: screenshot, change detection, analysis, reply.

        Returns early whenever there is nothing to do.
        """
        self.device.screenshot(self.screenshot_path)
        # [User Requested] only act while a chat screen is showing.
        if not WxUtil.check_is_chat_interface(self.screenshot_path):
            logger.info("当前不在聊天界面,跳过本次循环,等待中...")
            return
        current_screen_hash = self.get_image_hash(self.screenshot_path)
        if current_screen_hash == self.last_screen_hash:
            return
        self.last_screen_hash = current_screen_hash
        logger.info("📸 屏幕发生变化,正在分析...")
        dialogue_log, input_pos = await WxUtil.analyze_chat_image(
            self.screenshot_path,
            self.debug_view_path,
            device=self.device,
            process_strategy="UNREAD",
            restore_processed_voice=False,
        )
        if not dialogue_log:
            return
        self.dialogue_log = dialogue_log
        self.input_pos = input_pos

        last_msg = self._pick_unread_voice(dialogue_log)
        if last_msg is None:
            return
        current_msg_hash = self.get_stable_message_hash(last_msg)
        if current_msg_hash in self.processed_hashes:
            self.last_processed_msg_hash = current_msg_hash
            return
        if current_msg_hash == self.last_processed_msg_hash:
            return
        await self._handle_new_voice(last_msg, current_msg_hash, dialogue_log)

    def _pick_unread_voice(self, dialogue_log):
        """Return the newest unread voice message from the other party, or None.

        [Optimization] Only voice messages flagged ``is_unread`` (red dot)
        whose sender is not SELF_SENDER qualify. As a second safeguard, nothing
        is returned when the newest text/voice message is our own.
        """
        unread_voices = [
            m for m in dialogue_log
            if m.get('type') == 'voice' and m.get('is_unread') and m.get('sender') != self.SELF_SENDER
        ]
        if not unread_voices:
            if self.debug_mode:
                logger.debug("当前屏幕无带红点的未读语音消息,跳过分析")
            return None
        chat_msgs = [m for m in dialogue_log if m.get('type') in ('text', 'voice')]
        if chat_msgs and chat_msgs[-1].get('sender') == self.SELF_SENDER:
            if self.debug_mode:
                logger.debug("最后一条消息是''发送的,跳过回复")
            return None
        return unread_voices[-1]

    @staticmethod
    def _build_context(dialogue_log, exclude_msg):
        """Render the dialogue as "[time ]sender: content" lines for the LLM prompt.

        ``exclude_msg`` (the message being answered, compared by value) is left
        out. A missing sender or content renders as "" rather than "None".
        """
        lines = []
        for m in dialogue_log:
            if m == exclude_msg:
                continue
            time_display = m.get('time_display')
            prefix = f"{time_display} " if time_display else ""
            lines.append(f"{prefix}{m.get('sender') or ''}: {m.get('content') or ''}")
        return "\n".join(lines)

    async def _retry_voice_transcription(self):
        """Re-analyze the screen with the "LAST" strategy to recover an empty transcription.

        Returns:
            The newest message not from SELF_SENDER in the retried analysis,
            or None.
        """
        logger.info("检测到未成功转换的语音消息,尝试强制重试 OCR 转换...")
        dialogue_log_retry, _ = await WxUtil.analyze_chat_image(
            self.screenshot_path,
            self.debug_view_path,
            device=self.device,
            process_strategy="LAST",
            restore_processed_voice=False,
        )
        if not dialogue_log_retry:
            return None
        retry_other_msgs = [m for m in dialogue_log_retry if m.get('sender') != self.SELF_SENDER]
        return retry_other_msgs[-1] if retry_other_msgs else None

    async def _handle_new_voice(self, last_msg, current_msg_hash, dialogue_log):
        """Generate and send a reply for a new, not-yet-processed voice message.

        The message is marked processed only after a successful send, or when
        the LLM returns an empty reply. On a failed send it is left unmarked.
        """
        logger.info(f"💡 发现新消息 [{last_msg.get('type')}]: {last_msg.get('content')}")
        # Keep a timestamped screenshot for every newly detected message.
        msg_shot_path = os.path.join(WxUtil.OUTPUT_DIR, f"NewMsg_{int(time.time())}.jpg")
        self.device.screenshot(msg_shot_path)
        context_text = self._build_context(dialogue_log, last_msg)

        last_content = last_msg.get('content') or ""
        if last_msg.get('type') == 'voice' and not last_content.strip():
            retried_msg = await self._retry_voice_transcription()
            if retried_msg is not None:
                last_msg = retried_msg
                last_content = last_msg.get('content') or ""
                current_msg_hash = self.get_stable_message_hash(last_msg)
                if current_msg_hash in self.processed_hashes:
                    self.last_processed_msg_hash = current_msg_hash
                    return
        if last_msg.get('type') == 'voice' and not last_content.strip():
            logger.warning("语音消息内容为空,暂不生成回复")
            return

        reply = await self.get_reply(last_content, context_text)
        if not reply:
            logger.info("LLM 认为无需回复")
            self._record_processed_hash(last_msg, current_msg_hash)
            self.last_processed_msg_hash = current_msg_hash
            return
        logger.info(f"LLM 建议回复: {reply}")
        if await self.send_voice_reply(reply):
            logger.info(">>> 语音回复发送成功 <<<")
            self._record_processed_hash(last_msg, current_msg_hash)
            self.last_processed_msg_hash = current_msg_hash
        else:
            logger.error("语音回复动作执行失败")
async def run_main():
    """Entry coroutine: run the voice-reply monitor with debug mode off.

    Debug mode stays off by default for faster response; construct the bot
    with debug_mode=True when troubleshooting.
    """
    await ChatMonitorAudioBot(debug_mode=False).run()


if __name__ == "__main__":
    # Apply the project's Util.Win32Patch before the event loop is created.
    Win32Patch.patch()
    main_coro = run_main()
    asyncio.run(main_coro)