# Files
# aiData/WeiXin/ChatMonitorAll_ReplyAudio.py
# HuangHai a4b6ebfed5 'commit'
# 2026-01-31 16:45:10 +08:00
#
# 365 lines
# 16 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import os
import sys
import time
import logging
import asyncio
import hashlib
import json
import threading
import numpy as np
import cv2
# Add the project root (the parent of this file's directory) to sys.path so
# that the project-local packages imported right after this block resolve
# when the file is launched directly as a script.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)
from WeiXin import WxUtil
from WeiXin.WxUtil import perform_input_action
from Util.LlmUtil import get_llm_response
from Util import Win32Patch
from Util.AlyTtsKit import QwenTTSManager
# Configure logging: everything goes both to a per-run log file under
# WxUtil.LOG_DIR and to the console.
log_dir = WxUtil.LOG_DIR
# exist_ok avoids the check-then-create race of exists() + makedirs().
os.makedirs(log_dir, exist_ok=True)
log_file_path = os.path.join(log_dir, "ChatMonitor_Audio.log")

# Set up this module's logger.
logger = logging.getLogger("ChatMonitor_Audio")
logger.setLevel(logging.INFO)
# Drop handlers left over from an earlier configuration of this logger so
# records are not emitted twice.
if logger.hasHandlers():
    logger.handlers.clear()

# One formatter instance is shared by both handlers.
_log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# mode='w' truncates the log file each time this module is configured.
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w')
file_handler.setFormatter(_log_formatter)
logger.addHandler(file_handler)

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(_log_formatter)
logger.addHandler(stream_handler)

# Keep records away from the root logger.
logger.propagate = False
logger.info(f"🚀 日志文件路径: {os.path.abspath(log_file_path)}")

# Route the "WxUtil" logger to the same file and console, unless it already
# has a file handler pointing at this log file.
wx_logger = logging.getLogger("WxUtil")
wx_logger.propagate = False
if not any(
    isinstance(h, logging.FileHandler)
    and os.path.abspath(h.baseFilename) == os.path.abspath(log_file_path)
    for h in wx_logger.handlers
):
    wx_logger.addHandler(file_handler)
    wx_logger.addHandler(stream_handler)
class ChatMonitorAudioBot:
"""
大张老师自动巡课系统 (语音版)
"""
def __init__(self):
self.device = None
self.screenshot_path = os.path.join(WxUtil.OUTPUT_DIR, "AudioMonitor_live_shot.jpg")
self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "AudioMonitor_debug_view.jpg")
self.dialogue_log = []
self.input_pos = None
self.last_screen_hash = None
self.last_processed_msg_hash = None
self.processed_hashes = set()
self.processed_meta = set()
self.check_interval = 3 # 检查频率 (秒)
# TTS 配置
self.voice_id = "qwen-tts-vc-guanyu-voice-20260131160431051-8e51"
self.tts_manager = None
self.persona = (
"你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
"你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
"你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
"【严格约束】:\n"
"1. 绝对禁止发散!绝对禁止幻觉!\n"
"2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
"3. 仅针对家长明确表达的内容进行回复。\n"
"4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
"5. 回复必须简练,字数严格控制在 50 字以内!\n"
"6. 对方问什么就答什么。例如问‘学校叫什么’,就只回答‘少惠林’,不要回复地址和电话!\n"
"如果涉及到校区信息,必须且只能使用以下真实数据:\n"
"- 单位/学校名称:长春市少惠林作文素养培养中心(简称:少惠林)\n"
"- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
"- 联系人小张老师电话18686619970\n"
"- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
)
def _record_processed_hash(self, msg, msg_hash):
"""记录已处理的消息哈希和元数据 (仅内存)"""
self.processed_hashes.add(msg_hash)
if msg:
meta = (msg.get("sender", ""), msg.get("time_display", ""), msg.get("type", ""))
self.processed_meta.add(meta)
if len(self.processed_hashes) > 100:
temp = list(self.processed_hashes)[-100:]
self.processed_hashes = set(temp)
if len(self.processed_meta) > 100:
temp_meta = list(self.processed_meta)[-100:]
self.processed_meta = set(temp_meta)
async def get_reply(self, last_message_text, context_text=""):
prompt = (
f"【教师人设】:{self.persona}\n\n"
f"【上下文对话内容】:\n{context_text}\n\n"
f"【最后一条待回复消息】:\n{last_message_text}\n\n"
"【任务要求】:\n"
"请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n"
"参考上下文对话内容,确保回复逻辑连贯。\n"
"严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
"字数严格控制在 50 字以内。直接输出回复正文。"
)
full_response = ""
async for chunk in get_llm_response(prompt, stream=False):
full_response += chunk
return full_response.strip().strip('"').strip('').strip('')
def step_1_prepare_env(self):
"""步骤1: 环境准备"""
logger.info("--- [Step 1] 环境准备 ---")
WxUtil.setup_script_environment()
try:
self.tts_manager = QwenTTSManager()
logger.info("✅ TTS 引擎初始化成功")
except Exception as e:
logger.error(f"❌ TTS 引擎初始化失败: {e}")
return False
return True
def step_2_connect_device(self):
"""步骤2: 连接设备"""
logger.info("--- [Step 2] 连接设备 ---")
self.device = WxUtil.connect_device()
if not self.device:
logger.error("❌ 设备连接失败,请检查手机是否连接且开启了调试模式")
return False
return True
def get_image_hash(self, file_path):
"""计算图片的 MD5 哈希值 (忽略顶部 150 像素的状态栏)"""
if not os.path.exists(file_path):
return None
try:
img = cv2.imread(file_path)
if img is None:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
h, w = img.shape[:2]
if h > 150:
cropped_img = img[150:h, 0:w]
else:
cropped_img = img
return hashlib.md5(cropped_img.tobytes()).hexdigest()
except Exception as e:
logger.error(f"计算哈希出错: {e}, 回退到文件哈希")
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
def get_stable_message_hash(self, msg):
"""计算消息的稳定哈希值"""
if not msg:
return ""
stable_data = {
"sender": msg.get("sender", ""),
"content": msg.get("content") or "",
"time_display": msg.get("time_display", ""),
"type": msg.get("type", "")
}
msg_str = json.dumps(stable_data, sort_keys=True, ensure_ascii=False)
return hashlib.md5(msg_str.encode('utf-8')).hexdigest()
def _tts_worker(self, text, finished_event, start_event):
"""TTS 播音工作线程"""
try:
logger.info(f"[TTS] 启动语音合成音色ID: {self.voice_id}")
self.tts_manager.start_synthesis(self.voice_id, [text], wait_finished=False, buffer_seconds=1.0)
self.tts_manager.wait_for_playback_start()
start_event.set()
if self.tts_manager.callback:
self.tts_manager.callback.wait_for_finished()
logger.info("[TTS] 语音播放完成")
except Exception as e:
logger.error(f"[TTS] 播放过程中发生异常: {e}")
finally:
finished_event.set()
async def send_voice_reply(self, reply_text):
"""发送语音回复"""
logger.info(f"🎤 准备发送语音回复: {reply_text}")
# 1. 确保处于语音输入模式
tmp_screen = os.path.join(WxUtil.OUTPUT_DIR, "audio_reply_check.jpg")
self.device.screenshot(tmp_screen)
press_say_template = os.path.join(WxUtil.TEMPLATE_DIR, "press_say.jpg")
pos = WxUtil.match_template_center(tmp_screen, press_say_template, threshold=0.8)
if not pos:
logger.info(">>> 未发现 '按住说话' 按钮,尝试切换模式...")
audio_reply_template = os.path.join(WxUtil.TEMPLATE_DIR, "audio_reply.jpg")
switch_pos = WxUtil.match_template_center(tmp_screen, audio_reply_template, threshold=0.8)
if switch_pos:
self.device.click(switch_pos[0], switch_pos[1])
await asyncio.sleep(1.5)
self.device.screenshot(tmp_screen)
pos = WxUtil.match_template_center(tmp_screen, press_say_template, threshold=0.8)
if not pos:
logger.error("❌ 无法定位到 '按住说话' 按钮")
return False
# 2. 启动 TTS 线程
finished_event = threading.Event()
playback_start_event = threading.Event()
tts_thread = threading.Thread(target=self._tts_worker, args=(reply_text, finished_event, playback_start_event))
# 3. 执行录音动作
logger.info(">>> 立即按住发送语音按钮...")
self.device.touch.down(pos[0], pos[1])
logger.info(">>> 等待 0.5 秒确保微信进入录音状态...")
await asyncio.sleep(0.5)
logger.info(">>> 启动 TTS 播音...")
tts_thread.start()
# 等待播放开始
playback_start_event.wait(timeout=10)
# 等待播放结束
start_time = time.time()
while not finished_event.is_set():
await asyncio.sleep(0.1)
if time.time() - start_time > 60:
logger.warning("录音超时,强制结束")
break
# 4. 释放按钮
self.device.touch.up(pos[0], pos[1])
logger.info(">>> 录音结束,语音已发送")
tts_thread.join()
return True
async def run(self):
"""主运行循环"""
logger.info("🚀 正在启动 ChatMonitor_Audio (Voice-Reply)...")
if not self.step_1_prepare_env(): return
if not self.step_2_connect_device(): return
logger.info("🚀 启动完成,进入实时监控阶段...")
while True:
try:
self.device.screenshot(self.screenshot_path)
current_screen_hash = self.get_image_hash(self.screenshot_path)
if current_screen_hash == self.last_screen_hash:
await asyncio.sleep(self.check_interval)
continue
self.last_screen_hash = current_screen_hash
logger.info("📸 屏幕发生变化,正在分析...")
dialogue_log, input_pos = await WxUtil.analyze_chat_image(
self.screenshot_path,
self.debug_view_path,
device=self.device,
process_strategy="UNREAD",
restore_processed_voice=False
)
if not dialogue_log:
await asyncio.sleep(self.check_interval)
continue
self.dialogue_log = dialogue_log
self.input_pos = input_pos
last_msg = dialogue_log[-1]
current_msg_hash = self.get_stable_message_hash(last_msg)
sender = last_msg.get('sender', '')
is_processed = current_msg_hash in self.processed_hashes
if is_processed and current_msg_hash != self.last_processed_msg_hash:
self.last_processed_msg_hash = current_msg_hash
if not is_processed and current_msg_hash != self.last_processed_msg_hash:
if sender != "":
logger.info(f"💡 发现新消息 [{last_msg.get('type')}]: {last_msg.get('content')}")
msg_shot_path = os.path.join(WxUtil.OUTPUT_DIR, f"NewMsg_{int(time.time())}.jpg")
self.device.screenshot(msg_shot_path)
context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in dialogue_log[:-1]])
last_content = last_msg.get('content') or ""
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.info("检测到未成功转换的语音消息,尝试强制重试 OCR 转换...")
dialogue_log_retry, _ = await WxUtil.analyze_chat_image(
self.screenshot_path,
self.debug_view_path,
device=self.device,
process_strategy="LAST",
restore_processed_voice=False
)
if dialogue_log_retry:
self.dialogue_log = dialogue_log_retry
last_msg = dialogue_log_retry[-1]
last_content = last_msg.get('content') or ""
current_msg_hash = self.get_stable_message_hash(last_msg)
if current_msg_hash in self.processed_hashes:
self.last_processed_msg_hash = current_msg_hash
continue
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.warning("语音消息内容为空,暂不生成回复")
await asyncio.sleep(self.check_interval)
continue
reply = await self.get_reply(last_content, context_text)
if reply:
logger.info(f"LLM 建议回复: {reply}")
# 发送语音回复
success = await self.send_voice_reply(reply)
if success:
logger.info(">>> 语音回复发送成功 <<<")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
else:
logger.error("语音回复动作执行失败")
else:
logger.info("LLM 认为无需回复")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
else:
self.last_processed_msg_hash = current_msg_hash
await asyncio.sleep(self.check_interval)
except Exception as e:
logger.error(f"Error in monitoring loop: {e}", exc_info=True)
await asyncio.sleep(self.check_interval)
async def run_main():
    """Create a ChatMonitorAudioBot and run its monitoring loop."""
    await ChatMonitorAudioBot().run()
if __name__ == "__main__":
    # Script entry point: apply Win32Patch before the event loop is started,
    # then run the bot until the process is stopped.
    Win32Patch.patch()
    main_coro = run_main()
    asyncio.run(main_coro)