308 lines
14 KiB
Python
308 lines
14 KiB
Python
# coding=utf-8
|
||
import os
|
||
import sys
|
||
import logging
|
||
import asyncio
|
||
import hashlib
|
||
import json
|
||
import numpy as np
|
||
|
||
import cv2
|
||
|
||
# 添加项目根目录到 sys.path
|
||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
if project_root not in sys.path:
|
||
sys.path.append(project_root)
|
||
|
||
from WeiXin import WxUtil
|
||
from WeiXin.WxUtil import perform_input_action
|
||
from Util.LlmUtil import get_llm_response
|
||
from Util import Win32Patch
|
||
|
||
# 配置日志
|
||
log_dir = WxUtil.LOG_DIR
|
||
if not os.path.exists(log_dir):
|
||
os.makedirs(log_dir)
|
||
|
||
log_file_path = os.path.join(log_dir, "T2_ChatMonitor.log")
|
||
|
||
# 设置 logger
|
||
logger = logging.getLogger("T2_ChatMonitor")
|
||
logger.setLevel(logging.INFO)
|
||
|
||
if logger.hasHandlers():
|
||
logger.handlers.clear()
|
||
|
||
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w')
|
||
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
|
||
logger.addHandler(file_handler)
|
||
|
||
stream_handler = logging.StreamHandler()
|
||
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
|
||
logger.addHandler(stream_handler)
|
||
|
||
logger.propagate = False
|
||
logger.info(f"🚀 日志文件路径: {os.path.abspath(log_file_path)}")
|
||
|
||
# 同时将 WxUtil 的日志也输出到同一个文件
|
||
wx_logger = logging.getLogger("WxUtil")
|
||
wx_logger.propagate = False # 防止日志向上传递导致重复 (因为 WxUtil 中调用了 basicConfig)
|
||
if not any(isinstance(h, logging.FileHandler) and os.path.abspath(h.baseFilename) == os.path.abspath(log_file_path) for h in wx_logger.handlers):
|
||
wx_logger.addHandler(file_handler)
|
||
wx_logger.addHandler(stream_handler) # 确保 WxUtil 也输出到控制台
|
||
|
||
class ChatMonitorBot:
|
||
"""
|
||
大张老师自动巡课系统 (CV版)
|
||
"""
|
||
def __init__(self):
|
||
self.device = None
|
||
self.screenshot_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_live_shot.jpg")
|
||
self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_debug_view.jpg")
|
||
self.dialogue_log = []
|
||
self.input_pos = None
|
||
self.last_screen_hash = None
|
||
self.last_processed_msg_hash = None
|
||
self.check_interval = 5 # 检查频率 (秒)
|
||
|
||
self.persona = (
|
||
"你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师,名叫‘大张老师’。你目前在‘长春市少惠林作文素养培养中心’工作。"
|
||
"你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
|
||
"你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
|
||
"【严格约束】:\n"
|
||
"1. 绝对禁止发散!绝对禁止幻觉!\n"
|
||
"2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
|
||
"3. 仅针对家长明确表达的内容进行回复。\n"
|
||
"4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
|
||
"5. 回复必须简练,字数严格控制在 50 字以内!\n"
|
||
"如果涉及到校区信息,必须且只能使用以下真实数据:\n"
|
||
"- 单位:长春市少惠林作文素养培养中心\n"
|
||
"- 地址:南环城路与临河街交汇,TOUCH12街3楼325号\n"
|
||
"- 联系人:小张老师(电话:18686619970)\n"
|
||
"- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
|
||
)
|
||
|
||
async def get_reply(self, last_message_text, context_text=""):
|
||
prompt = (
|
||
f"【教师人设】:{self.persona}\n\n"
|
||
f"【上下文对话内容】:\n{context_text}\n\n"
|
||
f"【最后一条待回复消息】:\n{last_message_text}\n\n"
|
||
"【任务要求】:\n"
|
||
"请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n"
|
||
"参考上下文对话内容,确保回复逻辑连贯。\n"
|
||
"严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
|
||
"字数严格控制在 50 字以内。直接输出回复正文。"
|
||
)
|
||
|
||
full_response = ""
|
||
async for chunk in get_llm_response(prompt, stream=False):
|
||
full_response += chunk
|
||
return full_response.strip().strip('"').strip('“').strip('”')
|
||
|
||
def step_1_prepare_env(self):
|
||
"""步骤1: 环境准备"""
|
||
logger.info("--- [Step 1] 环境准备 ---")
|
||
WxUtil.setup_script_environment()
|
||
return True
|
||
|
||
def step_2_connect_device(self):
|
||
"""步骤2: 连接设备"""
|
||
logger.info("--- [Step 2] 连接设备 ---")
|
||
self.device = WxUtil.connect_device()
|
||
if not self.device:
|
||
logger.error("❌ 设备连接失败,请检查手机是否连接且开启了调试模式")
|
||
return False
|
||
return True
|
||
|
||
def get_image_hash(self, file_path):
|
||
"""计算图片的 MD5 哈希值 (忽略顶部 100 像素的状态栏)"""
|
||
if not os.path.exists(file_path):
|
||
return None
|
||
try:
|
||
# 使用 OpenCV 读取图片
|
||
img = cv2.imread(file_path)
|
||
if img is None:
|
||
# 如果读取失败,回退到文件哈希
|
||
with open(file_path, "rb") as f:
|
||
return hashlib.md5(f.read()).hexdigest()
|
||
|
||
# 裁剪掉顶部 150 像素 (状态栏/时间)
|
||
h, w = img.shape[:2]
|
||
if h > 150:
|
||
cropped_img = img[150:h, 0:w]
|
||
else:
|
||
cropped_img = img
|
||
|
||
# 计算裁剪后数据的哈希
|
||
return hashlib.md5(cropped_img.tobytes()).hexdigest()
|
||
except Exception as e:
|
||
logger.error(f"计算哈希出错: {e}, 回退到文件哈希")
|
||
with open(file_path, "rb") as f:
|
||
return hashlib.md5(f.read()).hexdigest()
|
||
|
||
async def run(self):
|
||
"""主运行循环"""
|
||
logger.info("🚀 大张老师自动巡课系统启动 (T2 增强版)...")
|
||
|
||
# 定义 JSON 序列化辅助函数
|
||
def numpy_serializer(obj):
|
||
if isinstance(obj, np.integer):
|
||
return int(obj)
|
||
if isinstance(obj, np.floating):
|
||
return float(obj)
|
||
if isinstance(obj, np.ndarray):
|
||
return obj.tolist()
|
||
raise TypeError(f"Type {type(obj)} not serializable")
|
||
|
||
# 1. 环境准备
|
||
if not self.step_1_prepare_env(): return
|
||
if not self.step_2_connect_device(): return
|
||
|
||
# 2. 首次运行:识别所有语音并获取上下文
|
||
logger.info("🔍 [首次运行] 正在进行全量识别,获取对话上下文...")
|
||
|
||
# 调用封装好的 get_first_screen
|
||
self.dialogue_log, self.input_pos, enter_path, flag_path = await WxUtil.get_first_screen(self.device)
|
||
|
||
# 更新 live paths (用于后续监控逻辑的引用)
|
||
import shutil
|
||
if enter_path and os.path.exists(enter_path):
|
||
shutil.copy(enter_path, self.screenshot_path)
|
||
|
||
if flag_path and os.path.exists(flag_path):
|
||
shutil.copy(flag_path, self.debug_view_path)
|
||
logger.info(f"📸 已保存识别标记图: {flag_path}")
|
||
|
||
if self.dialogue_log:
|
||
logger.info(f"✅ 首次运行识别完成,获取到 {len(self.dialogue_log)} 条消息上下文")
|
||
logger.info("\n" + "="*50)
|
||
logger.info("【测试模式】最终提取的对话记录:")
|
||
for msg in self.dialogue_log:
|
||
# 格式化输出:[发送者] 内容 (类型)
|
||
sender = msg.get('sender', '未知')
|
||
content = msg.get('content', '')
|
||
msg_type = "语音" if msg.get('type') == 'voice' else "文字"
|
||
|
||
# 按照用户要求的格式输出
|
||
logger.info(f"说话人: {sender}")
|
||
logger.info(f"消息类型: {msg_type}")
|
||
logger.info(f"消息内容: {content}")
|
||
logger.info("-" * 20)
|
||
logger.info("="*50 + "\n")
|
||
|
||
# 初始化最后处理的消息哈希,避免重复回复第一条
|
||
last_msg = self.dialogue_log[-1]
|
||
# last_msg 是字典,需要转字符串再 encode
|
||
msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer)
|
||
self.last_processed_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest()
|
||
self.last_screen_hash = self.get_image_hash(self.screenshot_path)
|
||
else:
|
||
logger.warning("⚠️ 首次运行未识别到有效对话")
|
||
|
||
logger.info("🛑 测试结束:已完成所有语音的转换与读取。停止进入监控循环。")
|
||
return # 测试模式:直接退出,不进入监控循环
|
||
|
||
# 3. 进入循环阶段
|
||
logger.info("🔄 进入实时监控阶段...")
|
||
while True:
|
||
try:
|
||
# A. 截图并计算哈希
|
||
self.device.screenshot(self.screenshot_path)
|
||
current_screen_hash = self.get_image_hash(self.screenshot_path)
|
||
|
||
# B. 如果屏幕无变化,则跳过识别
|
||
if current_screen_hash == self.last_screen_hash:
|
||
await asyncio.sleep(self.check_interval)
|
||
continue
|
||
|
||
self.last_screen_hash = current_screen_hash
|
||
logger.info("📸 屏幕发生变化,正在分析...")
|
||
|
||
# C. 分析最新图片
|
||
dialogue_log, input_pos = await WxUtil.analyze_chat_image(
|
||
self.screenshot_path,
|
||
self.debug_view_path,
|
||
device=self.device,
|
||
process_strategy="UNREAD" # 监控阶段:只处理带红点的新语音
|
||
)
|
||
|
||
if not dialogue_log:
|
||
logger.info("😴 未识别到有效消息")
|
||
await asyncio.sleep(self.check_interval)
|
||
continue
|
||
|
||
logger.info(f"📊 当前识别到 {len(dialogue_log)} 条消息,最后一条: {dialogue_log[-1]}")
|
||
|
||
# 更新当前对话日志(可用于上下文参考)
|
||
self.dialogue_log = dialogue_log
|
||
self.input_pos = input_pos
|
||
|
||
# D. 只关注最后一条消息
|
||
last_msg = dialogue_log[-1]
|
||
# last_msg 是字典,需要序列化
|
||
msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer)
|
||
current_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest()
|
||
|
||
# E. 判断是否需要回复 (对方发送且非重复消息)
|
||
sender = last_msg.get('sender', '')
|
||
if sender == "对方":
|
||
if current_msg_hash != self.last_processed_msg_hash:
|
||
event_shot = WxUtil.get_next_debug_path("event_new_msg")
|
||
self.device.screenshot(event_shot)
|
||
logger.info(f"💡 发现新消息: {last_msg},保存现场截图: {event_shot}")
|
||
|
||
# 获取上下文文本
|
||
context_text = "\n".join(dialogue_log[:-1])
|
||
|
||
# 生成回复
|
||
reply = await self.get_reply(last_msg, context_text)
|
||
|
||
if reply:
|
||
logger.info(f"🤖 LLM 回复: {reply}")
|
||
if self.input_pos:
|
||
perform_input_action(self.device, self.input_pos, reply)
|
||
|
||
# 发送后截图留存
|
||
reply_sent_shot = WxUtil.get_next_debug_path("event_reply_sent")
|
||
self.device.screenshot(reply_sent_shot)
|
||
logger.info(f"✅ 回复已发送,保存发送后截图: {reply_sent_shot}")
|
||
|
||
self.last_processed_msg_hash = current_msg_hash
|
||
else:
|
||
logger.warning("❌ 未找到输入框位置,无法发送")
|
||
else:
|
||
logger.warning("⚠️ LLM 未生成有效回复")
|
||
else:
|
||
# 消息已处理过
|
||
pass
|
||
else:
|
||
# 最后一条是我发送的
|
||
if current_msg_hash != self.last_processed_msg_hash:
|
||
logger.info(f"⚪ 最后一条消息非对方发送,跳过回复: {last_msg}")
|
||
self.last_processed_msg_hash = current_msg_hash
|
||
|
||
await asyncio.sleep(self.check_interval)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 循环中发生错误: {e}", exc_info=True)
|
||
await asyncio.sleep(self.check_interval)
|
||
|
||
async def run_main():
|
||
"""
|
||
运行自动巡课机器人
|
||
"""
|
||
bot = ChatMonitorBot()
|
||
await bot.run()
|
||
|
||
if __name__ == "__main__":
|
||
# 应用 Win32 补丁
|
||
Win32Patch.patch()
|
||
|
||
try:
|
||
# 运行机器人
|
||
asyncio.run(run_main())
|
||
except KeyboardInterrupt:
|
||
logger.info("🛑 用户手动停止程序。")
|
||
except Exception as e:
|
||
logger.error(f"❌ 程序异常退出: {e}", exc_info=True)
|