Files
aiData/WeiXin/T2_ChatMonitor.py
HuangHai 37b5d5c431 'commit'
2026-01-26 20:07:58 +08:00

376 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import os
import sys
import logging
import asyncio
import hashlib
import json
import numpy as np
import cv2
# Make the project root (the parent of this script's directory) importable,
# so the project-local imports that follow can be resolved.
_script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(_script_dir)
if project_root not in sys.path:
    sys.path.append(project_root)
from WeiXin import WxUtil
from WeiXin.WxUtil import perform_input_action
from Util.LlmUtil import get_llm_response
from Util import Win32Patch
# Logging configuration: everything goes to <LOG_DIR>/T2_ChatMonitor.log and
# to the console.
log_dir = WxUtil.LOG_DIR
# exist_ok=True replaces the racy "check, then create" sequence.
os.makedirs(log_dir, exist_ok=True)
log_file_path = os.path.join(log_dir, "T2_ChatMonitor.log")

# One formatter shared by the file and console handlers.
_log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Set up this script's own logger.
logger = logging.getLogger("T2_ChatMonitor")
logger.setLevel(logging.INFO)
if logger.hasHandlers():
    # Drop handlers left over from an earlier configuration of this logger.
    logger.handlers.clear()

# mode='w' truncates the log file each time this module is executed.
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w')
file_handler.setFormatter(_log_formatter)
logger.addHandler(file_handler)

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(_log_formatter)
logger.addHandler(stream_handler)

# Do not forward records to ancestor loggers.
logger.propagate = False
logger.info(f"🚀 日志文件路径: {os.path.abspath(log_file_path)}")

# Route the "WxUtil" logger into the same file and console as well.
wx_logger = logging.getLogger("WxUtil")
# Per the original author's note, WxUtil calls basicConfig, so propagation
# would produce duplicate lines.
wx_logger.propagate = False
_wx_already_logs_to_file = any(
    isinstance(h, logging.FileHandler)
    and os.path.abspath(h.baseFilename) == os.path.abspath(log_file_path)
    for h in wx_logger.handlers
)
if not _wx_already_logs_to_file:
    wx_logger.addHandler(file_handler)
    # NOTE(review): the nesting of this line was lost in the source that was
    # reviewed; it is kept inside the guard so a re-executed module does not
    # attach a second console handler. Behaviour on first import is the same
    # either way.
    wx_logger.addHandler(stream_handler)
class ChatMonitorBot:
    """
    Automatic chat-monitoring bot for "Teacher Da Zhang" (computer-vision edition).

    The bot screenshots the connected device, has WxUtil turn the screenshot
    into a list of message dicts, and, when the last message comes from the
    other party, asks the LLM for a short reply and types it into the chat
    input box.
    """

    # Sender values that mean "this message was sent by the bot's own account".
    # NOTE(review): the original code only tested for the empty string while its
    # comment said that anything other than "我" is the other party; both
    # markers are accepted here so the bot never answers its own messages.
    # Confirm against what WxUtil actually emits.
    _SELF_SENDERS = ("", "我")

    def __init__(self):
        """Initialise paths, conversation state and the LLM persona text."""
        self.device = None
        self.screenshot_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_live_shot.jpg")
        self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_debug_view.jpg")
        self.dialogue_log = []
        self.input_pos = None
        self.last_screen_hash = None
        self.last_processed_msg_hash = None
        self.check_interval = 3  # polling interval in seconds
        # NOTE(review): the address / contact lines below appear to be missing
        # separators (possibly lost as "ambiguous Unicode" characters); verify
        # against the original file before relying on this text.
        self.persona = (
            "你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
            "你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
            "你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
            "【严格约束】:\n"
            "1. 绝对禁止发散!绝对禁止幻觉!\n"
            "2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
            "3. 仅针对家长明确表达的内容进行回复。\n"
            "4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
            "5. 回复必须简练,字数严格控制在 50 字以内!\n"
            "如果涉及到校区信息,必须且只能使用以下真实数据:\n"
            "- 单位:长春市少惠林作文素养培养中心\n"
            "- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
            "- 联系人小张老师电话18686619970\n"
            "- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
        )

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _clean_reply(text):
        """Trim whitespace and any wrapping straight or curly double quotes."""
        # The original chained strip('') calls, which strip nothing; the curly
        # quotes are spelled as escapes so they cannot be lost again.
        return text.strip().strip('"\u201c\u201d').strip()

    @staticmethod
    def _numpy_serializer(obj):
        """json.dumps ``default`` hook converting numpy scalars/arrays to builtins."""
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        raise TypeError(f"Type {type(obj)} not serializable")

    @classmethod
    def _message_hash(cls, msg):
        """Return the MD5 hex digest of a message dict's canonical JSON form."""
        msg_str = json.dumps(msg, sort_keys=True, ensure_ascii=False, default=cls._numpy_serializer)
        return hashlib.md5(msg_str.encode('utf-8')).hexdigest()

    @classmethod
    def _is_from_other_party(cls, msg):
        """Return True when the message's sender is not one of the self markers."""
        return msg.get('sender', '') not in cls._SELF_SENDERS

    @staticmethod
    def _format_context(messages):
        """Render messages as newline-separated "sender: content" lines."""
        return "\n".join(f"{m.get('sender')}: {m.get('content')}" for m in messages)

    @staticmethod
    def _file_md5(file_path):
        """Return the MD5 hex digest of the raw bytes of ``file_path``."""
        with open(file_path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()

    # ------------------------------------------------------------------
    # LLM
    # ------------------------------------------------------------------
    async def get_reply(self, last_message_text, context_text=""):
        """
        Ask the LLM for a reply to the last message.

        :param last_message_text: content of the message to answer.
        :param context_text: earlier conversation, already formatted as text.
        :return: the reply with surrounding whitespace and wrapping double
                 quotes removed; may be an empty string.
        """
        prompt = (
            f"【教师人设】:{self.persona}\n\n"
            f"【上下文对话内容】:\n{context_text}\n\n"
            f"【最后一条待回复消息】:\n{last_message_text}\n\n"
            "【任务要求】:\n"
            "请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n"
            "参考上下文对话内容,确保回复逻辑连贯。\n"
            "严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
            "字数严格控制在 50 字以内。直接输出回复正文。"
        )
        chunks = [chunk async for chunk in get_llm_response(prompt, stream=False)]
        return self._clean_reply("".join(chunks))

    # ------------------------------------------------------------------
    # Setup steps
    # ------------------------------------------------------------------
    def step_1_prepare_env(self):
        """Step 1: prepare the script environment via WxUtil. Always returns True."""
        logger.info("--- [Step 1] 环境准备 ---")
        WxUtil.setup_script_environment()
        return True

    def step_2_connect_device(self):
        """Step 2: connect to the device. Returns False when no device was obtained."""
        logger.info("--- [Step 2] 连接设备 ---")
        self.device = WxUtil.connect_device()
        if not self.device:
            logger.error("❌ 设备连接失败,请检查手机是否连接且开启了调试模式")
            return False
        return True

    def get_image_hash(self, file_path, crop_top=150):
        """
        Return an MD5 hex digest identifying the visible content of a screenshot.

        The top ``crop_top`` pixel rows (status bar / clock) are dropped before
        hashing so that they do not register as a screen change. Images that
        are not taller than ``crop_top`` are hashed whole.

        :param file_path: path of the image file.
        :param crop_top: number of rows to ignore at the top (default 150).
        :return: hex digest, or None when the file does not exist. If the
                 image cannot be decoded, or any error occurs while hashing
                 pixels, the digest of the raw file bytes is returned instead.
        """
        if not os.path.exists(file_path):
            return None
        try:
            img = cv2.imread(file_path)
            if img is None:
                # OpenCV could not decode the file: fall back to the file hash.
                return self._file_md5(file_path)
            if img.shape[0] > crop_top:
                img = img[crop_top:]
            return hashlib.md5(img.tobytes()).hexdigest()
        except Exception as e:
            logger.error(f"计算哈希出错: {e}, 回退到文件哈希")
            return self._file_md5(file_path)

    # ------------------------------------------------------------------
    # First screen
    # ------------------------------------------------------------------
    def _log_dialogue(self):
        """Log every message of the current dialogue log in a readable layout."""
        logger.info("\n" + "="*50)
        logger.info("【测试模式】最终提取的对话记录:")
        for msg in self.dialogue_log:
            sender = msg.get('sender', '未知')
            content = msg.get('content', '')
            msg_type = "语音" if msg.get('type') == 'voice' else "文字"
            logger.info(f"说话人: {sender}")
            logger.info(f"消息类型: {msg_type}")
            logger.info(f"消息内容: {content}")
            logger.info("-" * 20)
        logger.info("="*50 + "\n")

    async def _log_dialogue_summary(self):
        """Ask the LLM to summarise the current dialogue log and log the result."""
        logger.info("🤖 正在请求 LLM 生成对话摘要...")
        history_lines = []
        for msg in self.dialogue_log:
            sender = msg.get('sender', '未知')
            content = msg.get('content', '')
            type_str = "[语音]" if msg.get('type') == 'voice' else "[文字]"
            history_lines.append(f"{sender}{type_str}: {content}\n")
        chat_history_text = "".join(history_lines)
        prompt = (
            "请根据以下微信对话记录,总结归纳双方交流的主要信息点。\n"
            "要求:\n"
            "1. 简明扼要,分点列出。\n"
            "2. 明确指出双方达成的一致或待解决的问题。\n"
            "3. 忽略无关的寒暄。\n\n"
            f"对话记录:\n{chat_history_text}"
        )
        try:
            chunks = [chunk async for chunk in get_llm_response(prompt, stream=True)]
            full_response = "".join(chunks)
            logger.info("\n" + "="*20 + " 对话摘要 (LLM) " + "="*20)
            logger.info(full_response)
            logger.info("="*55 + "\n")
        except Exception as e:
            # The summary is informational only; a failure must not stop the bot.
            logger.error(f"LLM 摘要生成失败: {e}")

    async def _reply_to_first_screen(self, last_msg):
        """Reply to the last first-screen message when the other party sent it."""
        sender = last_msg.get('sender', '')
        if not self._is_from_other_party(last_msg):
            logger.info("⚪ [首屏] 最后一条是自己发的,无需回复")
            return
        logger.info(f"💡 [首屏] 最后一条消息来自 '{sender}',尝试生成回复...")
        context_text = self._format_context(self.dialogue_log[:-1])
        reply = await self.get_reply(last_msg.get('content', ''), context_text)
        if not reply:
            logger.info("⚪ [首屏] LLM 认为无需回复")
            return
        logger.info(f"🤖 [首屏] LLM 建议回复: {reply}")
        if not self.input_pos:
            logger.warning("❌ [首屏] 未找到输入框位置,无法发送")
            return
        logger.info("⚡ [首屏] 执行自动回复...")
        perform_input_action(self.device, self.input_pos, reply)
        # Mark this message as answered so the monitor loop does not reply again.
        self.last_processed_msg_hash = self._message_hash(last_msg)
        logger.info("✅ [首屏] 回复已发送")

    async def _process_first_screen(self):
        """Run the full first-screen recognition and seed the monitoring state."""
        import shutil

        logger.info("🔍 [首次运行] 正在进行全量识别,获取对话上下文...")
        self.dialogue_log, self.input_pos, enter_path, flag_path = await WxUtil.get_first_screen(self.device)

        # Copy the returned images to the "live" paths used by the monitor loop.
        if enter_path and os.path.exists(enter_path):
            shutil.copy(enter_path, self.screenshot_path)
        if flag_path and os.path.exists(flag_path):
            shutil.copy(flag_path, self.debug_view_path)
            logger.info(f"📸 已保存识别标记图: {flag_path}")

        if not self.dialogue_log:
            logger.warning("⚠️ 首次运行未识别到有效对话")
            return

        logger.info(f"✅ 首次运行识别完成,获取到 {len(self.dialogue_log)} 条消息上下文")
        self._log_dialogue()
        await self._log_dialogue_summary()

        last_msg = self.dialogue_log[-1]
        await self._reply_to_first_screen(last_msg)

        # Even when no reply was sent, remember the last message so the
        # monitor loop does not process it a second time.
        if not self.last_processed_msg_hash:
            self.last_processed_msg_hash = self._message_hash(last_msg)
        self.last_screen_hash = self.get_image_hash(self.screenshot_path)

    # ------------------------------------------------------------------
    # Monitoring loop
    # ------------------------------------------------------------------
    async def _monitor_once(self):
        """Perform one polling pass: screenshot, analyse, and reply if needed."""
        # A. Screenshot and hash; skip the expensive analysis if nothing changed.
        self.device.screenshot(self.screenshot_path)
        current_screen_hash = self.get_image_hash(self.screenshot_path)
        if current_screen_hash == self.last_screen_hash:
            return
        self.last_screen_hash = current_screen_hash
        logger.info("📸 屏幕发生变化,正在分析...")

        # B. Analyse the new screenshot.
        dialogue_log, input_pos = await WxUtil.analyze_chat_image(
            self.screenshot_path,
            self.debug_view_path,
            device=self.device,
            process_strategy="UNREAD",  # monitoring phase: only unread voice messages
        )
        if not dialogue_log:
            logger.info("😴 未识别到有效消息")
            return
        logger.info(f"📊 当前识别到 {len(dialogue_log)} 条消息,最后一条: {dialogue_log[-1]}")
        self.dialogue_log = dialogue_log
        self.input_pos = input_pos

        # C. Only the last message matters; skip it if it was already handled.
        last_msg = dialogue_log[-1]
        current_msg_hash = self._message_hash(last_msg)
        if current_msg_hash == self.last_processed_msg_hash:
            return

        if not self._is_from_other_party(last_msg):
            logger.info(f"⚪ [监控] 最后一条消息是自己发的,跳过回复: {last_msg}")
            self.last_processed_msg_hash = current_msg_hash
            return

        # D. New message from the other party: keep evidence, then reply.
        event_shot = WxUtil.get_next_debug_path("event_new_msg")
        self.device.screenshot(event_shot)
        logger.info(f"💡 [监控] 发现新消息: {last_msg},保存现场截图: {event_shot}")

        context_text = self._format_context(dialogue_log[:-1])
        reply = await self.get_reply(last_msg.get('content', ''), context_text)
        if not reply:
            logger.warning("⚠️ [监控] LLM 未生成有效回复")
            return
        logger.info(f"🤖 [监控] LLM 建议回复: {reply}")
        if not self.input_pos:
            logger.warning("❌ [监控] 未找到输入框位置,无法发送")
            return

        logger.info("⚡ [监控] 执行自动回复...")
        perform_input_action(self.device, self.input_pos, reply)
        reply_sent_shot = WxUtil.get_next_debug_path("event_reply_sent")
        self.device.screenshot(reply_sent_shot)
        logger.info(f"✅ [监控] 回复已发送,保存发送后截图: {reply_sent_shot}")
        # Only mark the message as processed once the reply has really been sent.
        self.last_processed_msg_hash = current_msg_hash

    async def run(self):
        """
        Main entry point: prepare, process the first screen, then poll forever.

        Returns early (None) if environment preparation or device connection
        fails; otherwise it loops until the task is cancelled or the process
        is interrupted.
        """
        logger.info("🚀 大张老师自动巡课系统启动 (T2 增强版)...")

        # 1. Environment and device.
        if not self.step_1_prepare_env():
            return
        if not self.step_2_connect_device():
            return

        # 2. First run: full recognition to obtain the conversation context.
        await self._process_first_screen()

        # 3. Real-time monitoring.
        logger.info("🔄 进入实时监控阶段...")
        while True:
            try:
                await self._monitor_once()
            except Exception as e:
                # Keep the bot alive; log with traceback and try again next tick.
                logger.error(f"❌ 循环中发生错误: {e}", exc_info=True)
            await asyncio.sleep(self.check_interval)
async def run_main():
    """Create a ChatMonitorBot and await its ``run()`` coroutine."""
    await ChatMonitorBot().run()
if __name__ == "__main__":
    # Apply the Win32 patch before the event loop is started.
    Win32Patch.patch()
    try:
        # Drive the bot until it returns or the process is interrupted.
        asyncio.run(run_main())
    except KeyboardInterrupt:
        logger.info("🛑 用户手动停止程序。")
    except Exception as exc:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"❌ 程序异常退出: {exc}")