diff --git a/WeiXin/ChatMonitorAll_ReplyAudio.py b/WeiXin/ChatMonitorAll_ReplyAudio.py new file mode 100644 index 0000000..f3a0d14 --- /dev/null +++ b/WeiXin/ChatMonitorAll_ReplyAudio.py @@ -0,0 +1,373 @@ +# coding=utf-8 +import os +import sys +import time +import logging +import asyncio +import hashlib +import json +import threading +import numpy as np + +import cv2 + +# 添加项目根目录到 sys.path +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if project_root not in sys.path: + sys.path.append(project_root) + +from WeiXin import WxUtil +from WeiXin.WxUtil import perform_input_action +from Util.LlmUtil import get_llm_response +from Util import Win32Patch +from Util.AlyTtsKit import QwenTTSManager + +# 配置日志 +log_dir = WxUtil.LOG_DIR +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +log_file_path = os.path.join(log_dir, "ChatMonitor_Audio.log") + +# 设置 logger +logger = logging.getLogger("ChatMonitor_Audio") +logger.setLevel(logging.INFO) + +if logger.hasHandlers(): + logger.handlers.clear() + +file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w') +file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) +logger.addHandler(file_handler) + +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) +logger.addHandler(stream_handler) + +logger.propagate = False +logger.info(f"🚀 日志文件路径: {os.path.abspath(log_file_path)}") + +# 同时将 WxUtil 的日志也输出到同一个文件 +wx_logger = logging.getLogger("WxUtil") +wx_logger.propagate = False +if not any(isinstance(h, logging.FileHandler) and os.path.abspath(h.baseFilename) == os.path.abspath(log_file_path) for h in wx_logger.handlers): + wx_logger.addHandler(file_handler) + wx_logger.addHandler(stream_handler) + +class ChatMonitorAudioBot: + """ + 大张老师自动巡课系统 (语音版) + """ + def __init__(self): + self.device = None + self.screenshot_path = os.path.join(WxUtil.OUTPUT_DIR, "AudioMonitor_live_shot.jpg") + self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "AudioMonitor_debug_view.jpg") + self.dialogue_log = [] + self.input_pos = None + self.last_screen_hash = None + self.last_processed_msg_hash = None + self.processed_hashes = set() + self.processed_meta = set() + self.check_interval = 3 # 检查频率 (秒) + + # TTS 配置 + self.voice_id = "qwen-tts-vc-guanyu-voice-20260131160431051-8e51" + self.tts_manager = None + + self.persona = ( + "你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师,名叫‘大张老师’。你目前在‘长春市少惠林作文素养培养中心’工作。" + "你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。" + "你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。" + "【严格约束】:\n" + "1. 绝对禁止发散!绝对禁止幻觉!\n" + "2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n" + "3. 仅针对家长明确表达的内容进行回复。\n" + "4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n" + "5. 回复必须简练,字数严格控制在 50 字以内!\n" + "6. 对方问什么就答什么。例如问‘学校叫什么’,就只回答‘少惠林’,不要回复地址和电话!\n" + "如果涉及到校区信息,必须且只能使用以下真实数据:\n" + "- 单位/学校名称:长春市少惠林作文素养培养中心(简称:少惠林)\n" + "- 地址:南环城路与临河街交汇,TOUCH12街3楼325号\n" + "- 联系人:小张老师(电话:18686619970)\n" + "- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n" + ) + + def _record_processed_hash(self, msg, msg_hash): + """记录已处理的消息哈希和元数据 (仅内存)""" + self.processed_hashes.add(msg_hash) + if msg: + meta = (msg.get("sender", ""), msg.get("time_display", ""), msg.get("type", "")) + self.processed_meta.add(meta) + + if len(self.processed_hashes) > 100: + temp = list(self.processed_hashes)[-100:] + self.processed_hashes = set(temp) + + if len(self.processed_meta) > 100: + temp_meta = list(self.processed_meta)[-100:] + self.processed_meta = set(temp_meta) + + async def get_reply(self, last_message_text, context_text=""): + prompt = ( + f"【教师人设】:{self.persona}\n\n" + f"【上下文对话内容】:\n{context_text}\n\n" + f"【最后一条待回复消息】:\n{last_message_text}\n\n" + "【任务要求】:\n" + "请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n" + "参考上下文对话内容,确保回复逻辑连贯。\n" + "严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n" + "字数严格控制在 50 字以内。直接输出回复正文。" + ) + + full_response = "" + async for chunk in get_llm_response(prompt, stream=False): + full_response += chunk + return full_response.strip().strip('"').strip('“').strip('”') + + def step_1_prepare_env(self): + """步骤1: 环境准备""" + logger.info("--- [Step 1] 环境准备 ---") + WxUtil.setup_script_environment() + try: + self.tts_manager = QwenTTSManager() + logger.info("✅ TTS 引擎初始化成功") + except Exception as e: + logger.error(f"❌ TTS 引擎初始化失败: {e}") + return False + return True + + def step_2_connect_device(self): + """步骤2: 连接设备""" + logger.info("--- [Step 2] 连接设备 ---") + self.device = WxUtil.connect_device() + if not self.device: + logger.error("❌ 设备连接失败,请检查手机是否连接且开启了调试模式") + return False + return True + + def get_image_hash(self, file_path): + """计算图片的 MD5 哈希值 (忽略顶部 150 像素的状态栏)""" + if not os.path.exists(file_path): + return None + try: + img = cv2.imread(file_path) + if img is None: + with open(file_path, "rb") as f: + return hashlib.md5(f.read()).hexdigest() + h, w = img.shape[:2] + if h > 150: + cropped_img = img[150:h, 0:w] + else: + cropped_img = img + return hashlib.md5(cropped_img.tobytes()).hexdigest() + except Exception as e: + logger.error(f"计算哈希出错: {e}, 回退到文件哈希") + with open(file_path, "rb") as f: + return hashlib.md5(f.read()).hexdigest() + + def get_stable_message_hash(self, msg): + """计算消息的稳定哈希值""" + if not msg: + return "" + stable_data = { + "sender": msg.get("sender", ""), + "content": msg.get("content") or "", + "time_display": msg.get("time_display", ""), + "type": msg.get("type", "") + } + msg_str = json.dumps(stable_data, sort_keys=True, ensure_ascii=False) + return hashlib.md5(msg_str.encode('utf-8')).hexdigest() + + def _tts_worker(self, text, finished_event, start_event): + """TTS 播音工作线程""" + try: + logger.info(f"[TTS] 启动语音合成,音色ID: {self.voice_id}") + self.tts_manager.start_synthesis(self.voice_id, [text], wait_finished=False, buffer_seconds=1.0) + self.tts_manager.wait_for_playback_start() + start_event.set() + if self.tts_manager.callback: + self.tts_manager.callback.wait_for_finished() + logger.info("[TTS] 语音播放完成") + except Exception as e: + logger.error(f"[TTS] 播放过程中发生异常: {e}") + finally: + finished_event.set() + + async def send_voice_reply(self, reply_text): + """发送语音回复""" + logger.info(f"🎤 准备发送语音回复: {reply_text}") + + # 1. 确保处于语音输入模式 + tmp_screen = os.path.join(WxUtil.OUTPUT_DIR, "audio_reply_check.jpg") + self.device.screenshot(tmp_screen) + press_say_template = os.path.join(WxUtil.TEMPLATE_DIR, "press_say.jpg") + pos = WxUtil.match_template_center(tmp_screen, press_say_template, threshold=0.8) + + if not pos: + logger.info(">>> 未发现 '按住说话' 按钮,尝试切换模式...") + audio_reply_template = os.path.join(WxUtil.TEMPLATE_DIR, "audio_reply.jpg") + switch_pos = WxUtil.match_template_center(tmp_screen, audio_reply_template, threshold=0.8) + if switch_pos: + self.device.click(switch_pos[0], switch_pos[1]) + await asyncio.sleep(1.5) + self.device.screenshot(tmp_screen) + pos = WxUtil.match_template_center(tmp_screen, press_say_template, threshold=0.8) + + if not pos: + logger.error("❌ 无法定位到 '按住说话' 按钮") + return False + + # 2. 启动 TTS 线程 + finished_event = threading.Event() + playback_start_event = threading.Event() + tts_thread = threading.Thread(target=self._tts_worker, args=(reply_text, finished_event, playback_start_event)) + + # 3. 执行录音动作 + logger.info(">>> 立即按住发送语音按钮...") + self.device.touch.down(pos[0], pos[1]) + + logger.info(">>> 等待 2 秒确保微信进入录音状态...") + await asyncio.sleep(2.0) + + logger.info(">>> 启动 TTS 播音...") + tts_thread.start() + + # 等待播放开始 + playback_start_event.wait(timeout=10) + + # 等待播放结束 + start_time = time.time() + while not finished_event.is_set(): + await asyncio.sleep(0.1) + if time.time() - start_time > 60: + logger.warning("录音超时,强制结束") + break + + # 4. 释放按钮 + self.device.touch.up(pos[0], pos[1]) + logger.info(">>> 录音结束,语音已发送") + + tts_thread.join() + return True + + async def run(self): + """主运行循环""" + logger.info("🚀 正在启动 ChatMonitor_Audio (Voice-Reply)...") + + if not self.step_1_prepare_env(): return + if not self.step_2_connect_device(): return + + logger.info("🚀 启动完成,进入实时监控阶段...") + + while True: + try: + self.device.screenshot(self.screenshot_path) + current_screen_hash = self.get_image_hash(self.screenshot_path) + + if current_screen_hash == self.last_screen_hash: + await asyncio.sleep(self.check_interval) + continue + + self.last_screen_hash = current_screen_hash + logger.info("📸 屏幕发生变化,正在分析...") + + dialogue_log, input_pos = await WxUtil.analyze_chat_image( + self.screenshot_path, + self.debug_view_path, + device=self.device, + process_strategy="UNREAD", + restore_processed_voice=False + ) + + if not dialogue_log: + await asyncio.sleep(self.check_interval) + continue + + self.dialogue_log = dialogue_log + self.input_pos = input_pos + + last_msg = dialogue_log[-1] + current_msg_hash = self.get_stable_message_hash(last_msg) + sender = last_msg.get('sender', '') + + is_processed = current_msg_hash in self.processed_hashes + + if is_processed and current_msg_hash != self.last_processed_msg_hash: + self.last_processed_msg_hash = current_msg_hash + + if not is_processed and current_msg_hash != self.last_processed_msg_hash: + if sender != "我": + logger.info(f"💡 发现新消息 [{last_msg.get('type')}]: {last_msg.get('content')}") + + msg_shot_path = os.path.join(WxUtil.OUTPUT_DIR, f"NewMsg_{int(time.time())}.jpg") + self.device.screenshot(msg_shot_path) + + context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in dialogue_log[:-1]]) + last_content = last_msg.get('content') or "" + + if last_msg.get('type') == 'voice' and not last_content.strip(): + logger.info("检测到未成功转换的语音消息,尝试强制重试 OCR 转换...") + dialogue_log_retry, _ = await WxUtil.analyze_chat_image( + self.screenshot_path, + self.debug_view_path, + device=self.device, + process_strategy="LAST", + restore_processed_voice=False + ) + if dialogue_log_retry: + self.dialogue_log = dialogue_log_retry + last_msg = dialogue_log_retry[-1] + last_content = last_msg.get('content') or "" + current_msg_hash = self.get_stable_message_hash(last_msg) + if current_msg_hash in self.processed_hashes: + self.last_processed_msg_hash = current_msg_hash + continue + + if last_msg.get('type') == 'voice' and not last_content.strip(): + logger.warning("语音消息内容为空,暂不生成回复") + await asyncio.sleep(self.check_interval) + continue + + reply = await self.get_reply(last_content, context_text) + + if reply: + logger.info(f"LLM 建议回复: {reply}") + + # 如果触发回复的消息是语音,先点击语音条以清理状态 + if last_msg.get('type') == 'voice': + try: + cx, cy = last_msg.get('center', (0, 0)) + WxUtil.safe_device_click(self.device, cx, cy) + await asyncio.sleep(1.5) + except Exception as e: + logger.warning(f"清理语音状态失败: {e}") + + # 发送语音回复 + success = await self.send_voice_reply(reply) + + if success: + logger.info(">>> 语音回复发送成功 <<<") + self._record_processed_hash(last_msg, current_msg_hash) + self.last_processed_msg_hash = current_msg_hash + else: + logger.error("语音回复动作执行失败") + else: + logger.info("LLM 认为无需回复") + self._record_processed_hash(last_msg, current_msg_hash) + self.last_processed_msg_hash = current_msg_hash + else: + self.last_processed_msg_hash = current_msg_hash + + await asyncio.sleep(self.check_interval) + + except Exception as e: + logger.error(f"Error in monitoring loop: {e}", exc_info=True) + await asyncio.sleep(self.check_interval) + +async def run_main(): + bot = ChatMonitorAudioBot() + await bot.run() + +if __name__ == "__main__": + Win32Patch.patch() + asyncio.run(run_main()) diff --git a/WeiXin/ChatMonitorAll.py b/WeiXin/ChatMonitorAll_ReplyTxt.py similarity index 100% rename from WeiXin/ChatMonitorAll.py rename to WeiXin/ChatMonitorAll_ReplyTxt.py diff --git a/WeiXin/T7_TTS_VoiceReply.py b/WeiXin/T7_TTS_VoiceReply.py index 71caf8f..596925c 100644 --- a/WeiXin/T7_TTS_VoiceReply.py +++ b/WeiXin/T7_TTS_VoiceReply.py @@ -21,14 +21,20 @@ MY_VOICE_ID = "qwen-tts-vc-guanyu-voice-20260131160431051-8e51" REPLY_TEXT = "我是少惠林的大张老师,您的孩子几年级了?我们周六周日上班,您可以带孩子过来试听一下。" # 设置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler(sys.stdout), - logging.FileHandler(os.path.join(WxUtil.LOG_DIR, "T7_TTS_VoiceReply.log"), mode='w', encoding='utf-8') - ] -) +# 强制重新配置日志,确保输出到文件 +root_logger = logging.getLogger() +for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + +file_handler = logging.FileHandler(os.path.join(WxUtil.LOG_DIR, "T7_TTS_VoiceReply.log"), mode='w', encoding='utf-8') +stream_handler = logging.StreamHandler(sys.stdout) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +file_handler.setFormatter(formatter) +stream_handler.setFormatter(formatter) +root_logger.addHandler(file_handler) +root_logger.addHandler(stream_handler) +root_logger.setLevel(logging.INFO) + logger = logging.getLogger("T7_TTS") def tts_worker(tts_manager, voice_id, text, finished_event, start_event): @@ -53,7 +59,7 @@ def tts_worker(tts_manager, voice_id, text, finished_event, start_event): finished_event.set() def run_t7_task(): - logger.info("开始执行 T7: 集成 TTS 克隆引擎与微信语音回复 (优化连贯性版本)") + logger.info("开始执行 T7: 集成 TTS 克隆引擎与微信语音回复 (优化延迟版)") # 1. 连接设备 d = WxUtil.connect_device() @@ -63,16 +69,13 @@ def run_t7_task(): # 2. 初始化 TTS 管理器 try: - # 增加缓冲区到 1.0 秒以解决断续问题 tts_manager = QwenTTSManager() - # 这里虽然 init 没传参,但我们可以在 start_synthesis 逻辑里或者修改 AlyTtsKit 让它支持传参 except Exception as e: logger.error(f"初始化 TTS 失败: {e}") return # 3. 准备微信环境:确保处于语音输入模式 logger.info("步骤1: 准备微信环境...") - # 先尝试寻找 "按住说话" 按钮,如果找不到则尝试切换 tmp_screen = os.path.join(WxUtil.OUTPUT_DIR, "t7_check_mode.jpg") d.screenshot(tmp_screen) press_say_template = os.path.join(WxUtil.TEMPLATE_DIR, "press_say.jpg") @@ -80,7 +83,6 @@ def run_t7_task(): if not pos: logger.info(">>> [状态] 未发现 '按住说话' 按钮,尝试切换模式...") - # 尝试点击键盘图标切换回语音模式 audio_reply_template = os.path.join(WxUtil.TEMPLATE_DIR, "audio_reply.jpg") switch_pos = WxUtil.match_template_center(tmp_screen, audio_reply_template, threshold=0.8) if switch_pos: @@ -96,25 +98,28 @@ def run_t7_task(): logger.info(f">>> [定位] 成功定位到语音按钮中心: {pos}") - # 4. 同步执行:开始长按 + 启动 TTS + # 4. 同步执行:先按住 -> 等待2秒 -> 播放 finished_event = threading.Event() playback_start_event = threading.Event() tts_thread = threading.Thread(target=tts_worker, args=(tts_manager, MY_VOICE_ID, REPLY_TEXT, finished_event, playback_start_event)) - logger.info("步骤2: 开始同步录音与播放 (使用 Jitter Buffer 优化)...") + logger.info("步骤2: 开始同步录音与播放 (先按住 2 秒再播放)...") - # 启动 TTS 线程 + # A. 先按住按钮 + logger.info(">>> [发送] 1. 立即按住发送语音按钮...") + d.touch.down(pos[0], pos[1]) + + # B. 明确等待 2 秒(解决最前面语音丢失问题) + logger.info(">>> [等待] 2. 录音已启动,等待 2 秒确保微信进入录音状态...") + time.sleep(2.0) + + # C. 启动 TTS 线程(开始合成并播放) + logger.info(">>> [播放] 3. 启动 TTS 播音...") tts_thread.start() - # 等待 TTS 真正开始发出声音 - logger.info(">>> [等待] 等待音频缓冲区填充并开始播放...") - if not playback_start_event.wait(timeout=10): - logger.error("等待音频播放超时") - return - - # 按住按钮 - logger.info(">>> [发送] 检测到音频播放开始,立即按住发送语音按钮...") - d.touch.down(pos[0], pos[1]) + # D. 等待音频真正开始播放(用于日志同步) + if playback_start_event.wait(timeout=10): + logger.info(">>> [同步] 检测到音频已开始从扬声器输出") # 模拟计时 start_time = time.time() @@ -122,17 +127,17 @@ def run_t7_task(): while not finished_event.is_set(): elapsed = int(time.time() - start_time) if elapsed > last_second: - logger.info(f">>> [计时] {elapsed}") + logger.info(f">>> [录音中] {elapsed}s") last_second = elapsed time.sleep(0.1) - if elapsed > 30: - logger.warning("录音时间超过 30 秒,强制结束") + if elapsed > 40: # 调大超时时间 + logger.warning("录音时间过长,强制结束") break - # 释放按钮 + # E. 释放按钮 d.touch.up(pos[0], pos[1]) - total_duration = time.time() - start_time - logger.info(f">>> [完成] 录音完成,总时长约 {total_duration:.2f}s") + total_duration = time.time() - start_time + 2.0 # 加上最开始等待的2秒 + logger.info(f">>> [完成] 录音结束,微信录音总时长约 {total_duration:.2f}s") tts_thread.join()