# Listing metadata left by the code viewer this file was copied from: 501 lines, 24 KiB, Python
# coding=utf-8
import asyncio
import hashlib
import logging
import os
import sys
import time
from datetime import datetime

import uiautomator2 as u2

# Add the project root (the parent of this file's directory) to sys.path so
# that the project-local packages imported below can be resolved.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)

from Util import Win32Patch

from WeiXin.WxUtil import perform_input_action, clean_screenshots_dir, is_in_chat_interface, find_template_match, find_all_template_matches
from Util.LlmUtil import get_llm_response
from Util.EasyOcrKit import EasyOcrKit

# Logging configuration: records go to Logs/T6_AutoChatMonitor.log (truncated
# on every start because of mode='w') and to the console.
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Logs")
# exist_ok=True avoids the check-then-create race of exists() + makedirs().
os.makedirs(log_dir, exist_ok=True)

# Set up the logger
logger = logging.getLogger("T6_AutoChatMonitor")
logger.setLevel(logging.INFO)

# Remove handlers left over from an earlier import so records are not printed
# twice; close them too, otherwise a previous FileHandler keeps the log file open.
for _stale_handler in list(logger.handlers):
    logger.removeHandler(_stale_handler)
    _stale_handler.close()

_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# File handler
log_file_path = os.path.join(log_dir, "T6_AutoChatMonitor.log")
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w')
file_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
logger.addHandler(file_handler)

# Console handler
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
logger.addHandler(stream_handler)

# Do not propagate to the root logger, which would duplicate the output.
logger.propagate = False

# Log where the file lives so it is easy to find.
logger.info(f"日志文件路径: {log_file_path}")

# Configuration
CHECK_INTERVAL = 5  # polling interval in seconds

class ChatBot:
    """Auto-reply bot that watches the chat currently open on a connected Android device.

    ``run`` polls the device every ``CHECK_INTERVAL`` seconds: it takes a
    screenshot, rebuilds the visible conversation from template matches and
    OCR, transcribes the other party's voice messages through the app's
    "转文字" (voice-to-text) menu entry, and sends an LLM-generated reply when
    the newest message on screen comes from the other party.
    """

    # Template images used for CV matching (absolute paths of the deployment machine).
    CONVERT_TEMPLATE = r"d:\dsWork\aiData\WeiXin\Templates\zhun_wen_zi.jpg"
    CANCEL_TEMPLATE = r"d:\dsWork\aiData\WeiXin\Templates\cancel_zhuan_wen_zi.jpg"
    AUDIO_TEMPLATE = r"d:\dsWork\aiData\WeiXin\Templates\audio.jpg"
    RED_POINT_TEMPLATE = r"d:\dsWork\aiData\WeiXin\Templates\red_point.jpg"
    INPUT_BOX_TEMPLATE = r"d:\dsWork\aiData\WeiXin\Templates\input_box.jpg"  # assumes this template exists

    # Screen geometry in pixels (tuned for the target device's resolution).
    CHAT_TOP_Y = 300             # anything above is the title bar
    CHAT_BOTTOM_Y = 1800         # anything below is the input area
    PEER_ICON_MAX_X = 500        # voice icons left of this belong to the other party
    MENU_DISMISS_POINT = (500, 500)  # tapped to close a context menu; away from the top back button

    def __init__(self):
        """Connect to the device and prepare screenshot storage, OCR and the persona prompt."""
        self.d = u2.connect()
        self.last_message_text = ""
        # Content of the other party's last message that has already been answered.
        self.last_processed_msg = None
        self.screenshot_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Screenshots")
        os.makedirs(self.screenshot_dir, exist_ok=True)

        self.ocr_kit = EasyOcrKit(gpu=True)

        # On the first scan every voice message of the other party is transcribed,
        # not only the unread ones.
        self.is_first_run = True

        self.persona = (
            "你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师,名叫‘大张老师’。你目前在‘长春市少惠林作文素养培养中心’工作。"
            "你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
            "你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
            "【严格约束】:\n"
            "1. 绝对禁止发散!绝对禁止幻觉!\n"
            "2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
            "3. 仅针对家长明确表达的内容进行回复。\n"
            "4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
            "5. 回复必须简练,字数严格控制在 50 字以内!\n"
            "如果涉及到校区信息,必须且只能使用以下真实数据:\n"
            "- 单位:长春市少惠林作文素养培养中心\n"
            "- 地址:南环城路与临河街交汇,TOUCH12街3楼325号\n"
            "- 联系人:小张老师(电话:18686619970)\n"
            "- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
        )

    async def get_reply(self, history_text):
        """Ask the LLM for a reply to the last message in ``history_text``.

        Args:
            history_text: the visible conversation, one "sender: text" line per message.

        Returns:
            The reply with surrounding whitespace and wrapping quotes
            (ASCII and curly double quotes) stripped.
        """
        prompt = (
            f"【教师人设】:{self.persona}\n\n"
            f"【近期聊天记录】:\n{history_text}\n\n"
            "【任务要求】:\n"
            "请作为大张老师回复家长。**必须且只能针对聊天记录中的最后一条消息进行回复!**\n"
            "之前的聊天记录仅供参考上下文,如果之前的问题已经回答过,绝对不要重复回答。\n"
            "严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
            "字数严格控制在 50 字以内。直接输出回复正文。"
        )

        chunks = []
        async for chunk in get_llm_response(prompt, stream=False):
            chunks.append(chunk)
        return "".join(chunks).strip().strip('"').strip('“').strip('”')

    @staticmethod
    def _bbox_center(bbox):
        """Return the (x, y) centre of an OCR box, computed from its corner points 0 and 2."""
        return (bbox[0][0] + bbox[2][0]) / 2, (bbox[0][1] + bbox[2][1]) / 2

    @staticmethod
    def _parse_voice_duration(content, default=10):
        """Parse a duration label such as ``5"`` into seconds; ``default`` when it is not a number."""
        try:
            return int(content.replace('"', '').strip())
        except ValueError:
            return default

    @staticmethod
    def _classify_text_sender(x_min, x_max):
        """Classify a text box by its horizontal extent: "对方" (left), "我" (right) or "系统"."""
        if x_min < 250 and x_max < 700:
            return "对方"
        if x_max > 800 and x_min > 300:
            return "我"
        return "系统"

    def _collect_transcript(self, ocr_results, vy, next_msg, input_box_y):
        """Pick the OCR lines that belong to the transcript shown under a voice bubble.

        Lines are visited top to bottom. Everything at or above ``vy + 25`` (the
        bubble itself) and everything at or below ``input_box_y - 30`` is
        skipped; reading stops at the next message (recognised by the first 8
        characters of its text, or by its duration label when it is a voice
        message) or once a line is more than 600 px below the bubble.
        """
        next_text_snippet = None
        next_voice_label = None
        if next_msg and next_msg.get("type") == "text":
            next_content = next_msg.get("content", "").strip()
            if next_content:
                next_text_snippet = next_content[:8]  # first 8 characters act as a fingerprint
        if next_msg and next_msg.get("type") == "voice":
            next_voice_label = next_msg.get("content", "").strip()

        lines = []
        ordered = sorted(ocr_results, key=lambda item: self._bbox_center(item[0])[1])
        for bbox, text, _conf in ordered:
            c_y = self._bbox_center(bbox)[1]

            # 1. Skip the voice bubble itself and everything above it.
            if c_y <= vy + 25:
                continue
            # 2. Skip the input box area when its position is known.
            if input_box_y and c_y >= input_box_y - 30:
                continue
            # 3. Stop at the text of the next message.
            if next_text_snippet and next_text_snippet in text:
                logger.info(f"🛑 遇到下一条消息内容 '{text}',停止 OCR 录入。")
                break
            # 4. Stop at the duration label of the next voice message
            #    (such labels are short, hence the length check).
            if next_voice_label and next_voice_label in text and len(text) < 10:
                logger.info(f"🛑 遇到下一条语音时长 '{text}',停止 OCR 录入。")
                break
            # 5. Safety net: do not read unrelated content far below the bubble.
            if c_y > vy + 600:
                break

            lines.append(text)
        return lines

    async def process_single_voice(self, voice_msg, next_msg=None, input_box_y=None):
        """Transcribe one voice message through the app's menu, then restore the chat.

        Flow: long-press the bubble -> locate and tap "转文字" (template match
        with an OCR fallback) -> wait for the conversion -> screenshot and OCR ->
        long-press again -> tap "取消转文字" so the chat looks as it did before.

        Args:
            voice_msg: dict with ``coordinates`` ([x, y] of the bubble) and
                optionally ``content`` (the duration label, e.g. ``5"``).
            next_msg: optional dict describing the following message; its
                ``type`` and ``content`` mark where the transcript ends.
            input_box_y: optional y coordinate of the input box; OCR text at or
                below ``input_box_y - 30`` is ignored.

        Returns:
            The transcript lines joined with spaces (possibly an empty string),
            or ``None`` when the convert button cannot be found or any step up
            to and including the OCR raises.
        """
        vx, vy = voice_msg['coordinates']
        content = voice_msg.get('content', '0"')
        logger.info(f"🎤 开始处理语音消息: {content}, 坐标: ({vx}, {vy})")

        try:
            # 1. Long-press the voice bubble to open its context menu.
            logger.info("👆 正在长按语音消息...")
            self.d.long_click(vx, vy, 0.6)
            logger.info("✅ 长按完成,等待菜单...")
            await asyncio.sleep(0.3)

            # 2. Find the "转文字" button by template matching.
            menu_shot_path = os.path.join(self.screenshot_dir, "t6_menu_shot_convert.jpg")
            logger.info(f"📸 截取菜单图: {menu_shot_path}")
            self.d.screenshot(menu_shot_path)

            logger.info(f"🔍 寻找模板: {self.CONVERT_TEMPLATE}")
            convert_btn = find_template_match(menu_shot_path, self.CONVERT_TEMPLATE, threshold=0.6)

            if not convert_btn:
                logger.warning("❌ CV 未找到 '转文字' 按钮,尝试小范围 OCR 兜底...")
                # Fallback: OCR the menu screenshot and look for the button caption
                # ("转文字" itself contains both accepted fragments).
                for bbox, text, _conf in self.ocr_kit.read_text(menu_shot_path):
                    if "转文" in text or "文字" in text:
                        convert_btn = self._bbox_center(bbox)
                        logger.info(f"✅ OCR 兜底找到 '转文字' 按钮: {convert_btn}")
                        break

            if not convert_btn:
                logger.warning("❌ CV 和 OCR 均未找到 '转文字' 按钮,取消操作。")
                # Tap a blank spot near the middle to close the menu; this avoids
                # hitting the back button at the top.
                self.d.click(*self.MENU_DISMISS_POINT)
                return None

            logger.info(f"✅ 最终找到 '转文字' 按钮坐标: {convert_btn}")
            self.d.click(convert_btn[0], convert_btn[1])

            # 3. Wait for the conversion; longer voice messages get more time.
            duration = self._parse_voice_duration(content)
            wait_seconds = max(2, duration / 5.0)
            logger.info(f"⏳ 语音时长 {duration}s,等待转换 {wait_seconds:.1f}s...")
            await asyncio.sleep(wait_seconds)

            # 4. Screenshot and OCR the whole screen; the transcript is filtered
            #    out of the result afterwards.
            ocr_shot_path = os.path.join(self.screenshot_dir, "t6_ocr_shot.jpg")
            logger.info(f"📸 截取 OCR 识别图: {ocr_shot_path}")
            self.d.screenshot(ocr_shot_path)

            logger.info("📖 开始 OCR 识别...")
            ocr_results = self.ocr_kit.read_text(ocr_shot_path)
            logger.info(f"✅ OCR 识别完成,获取 {len(ocr_results)} 个文本块")
        except Exception as e:
            logger.error(f"❌ process_single_voice 发生异常: {e}", exc_info=True)
            return None

        full_text = " ".join(self._collect_transcript(ocr_results, vy, next_msg, input_box_y))
        logger.info(f"📝 OCR 识别结果: {full_text}")

        # 5. Long-press the bubble again to undo the conversion. The layout may
        #    have shifted once the transcript appeared; this assumes the original
        #    coordinates still hit the bubble (it has not been pushed up too far
        #    and the user has not scrolled).
        self.d.long_click(vx, vy, 0.6)
        await asyncio.sleep(0.3)

        # 6. Find the "取消转文字" button by template matching.
        menu_shot_path_cancel = os.path.join(self.screenshot_dir, "t6_menu_shot_cancel.jpg")
        self.d.screenshot(menu_shot_path_cancel)
        cancel_btn = find_template_match(menu_shot_path_cancel, self.CANCEL_TEMPLATE, threshold=0.6)

        if cancel_btn:
            logger.info(f"✅ CV 找到 '取消转文字' 按钮: {cancel_btn}")
            self.d.click(cancel_btn[0], cancel_btn[1])
        else:
            logger.warning("❌ CV 未找到 '取消转文字' 按钮,点击中心区域关闭菜单。")
            self.d.click(*self.MENU_DISMISS_POINT)

        return full_text

    def _collect_voice_entries(self, audio_matches, red_points, ocr_results, debug_img):
        """Turn voice-icon matches into messages and dialogue entries.

        Returns ``(voice_messages, log_entries, used_ocr_indices)``:
        ``voice_messages`` holds only the other party's voice messages,
        ``log_entries`` one dialogue entry per voice icon inside the chat area,
        and ``used_ocr_indices`` the OCR blocks consumed as duration labels.
        Markers are drawn onto ``debug_img``.
        """
        import cv2  # third-party; imported lazily like in the original polling loop

        voice_messages, log_entries, used_ocr_indices = [], [], set()
        for ax, ay in audio_matches:
            # Ignore icons in the title bar or the input area.
            if ay < self.CHAT_TOP_Y or ay > self.CHAT_BOTTOM_Y:
                logger.info(f"⏭️ 忽略区域外语音图标: ({ax}, {ay})")
                continue

            sender = "对方" if ax < self.PEER_ICON_MAX_X else "我"
            logger.info(f"🎙️ 发现语音图标: x={ax}, y={ay}, 发送者={sender}")

            # An unread marker (red dot) sits to the right of the icon at a similar height.
            is_unread = False
            for rx, ry in red_points or []:
                if abs(ry - ay) < 50 and rx > ax:
                    is_unread = True
                    cv2.circle(debug_img, (int(rx), int(ry)), 12, (0, 0, 255), -1)
                    break

            # Look for the duration label next to the icon.
            duration_text = "语音"
            for idx, (bbox, text, _conf) in enumerate(ocr_results):
                c_x, c_y = self._bbox_center(bbox)
                near_icon = abs(c_y - ay) < 40 and abs(c_x - ax) < 300
                if near_icon and ('"' in text or text.isdigit()):
                    duration_text = text
                    used_ocr_indices.add(idx)
                    break

            # Visual feedback: green box around the icon, red cross on the
            # click position (the icon centre).
            cv2.rectangle(debug_img, (int(ax - 30), int(ay - 30)), (int(ax + 30), int(ay + 30)), (0, 255, 0), 3)
            cv2.drawMarker(debug_img, (int(ax), int(ay)), (0, 0, 255), cv2.MARKER_CROSS, 35, 3)

            voice_id = f"voice_{ax}_{ay}"
            v_msg = {
                "type": "voice",
                "content": duration_text,
                "coordinates": [ax, ay],
                "sender": sender,
                "is_unread": is_unread,
                # Same id as the dialogue entry, so the transcript can be written
                # back to exactly this entry.
                "id": voice_id,
            }
            if sender == "对方":
                voice_messages.append(v_msg)
            log_entries.append({
                "y": ay,
                "text": f"{sender}: [语音] {duration_text}",
                "is_voice": True,
                "id": voice_id,
            })
        return voice_messages, log_entries, used_ocr_indices

    def _collect_text_entries(self, ocr_results, skip_indices, debug_img):
        """Turn the remaining OCR blocks inside the chat area into dialogue entries.

        Blocks listed in ``skip_indices`` and blocks classified as "系统" are
        dropped. Kept blocks are outlined on ``debug_img``.
        """
        import cv2  # third-party; imported lazily like in the original polling loop

        outline_colors = {"对方": (0, 255, 0), "我": (255, 0, 0)}
        entries = []
        for idx, (bbox, text, _conf) in enumerate(ocr_results):
            if idx in skip_indices:
                continue

            x_min, x_max = bbox[0][0], bbox[2][0]
            y_min, y_max = bbox[0][1], bbox[2][1]
            c_x, c_y = (x_min + x_max) / 2, (y_min + y_max) / 2
            if c_y < self.CHAT_TOP_Y or c_y > self.CHAT_BOTTOM_Y:
                continue

            sender = self._classify_text_sender(x_min, x_max)
            if sender == "系统":
                continue

            logger.info(f"💬 发现文本消息: x={c_x}, y={c_y}, 发送者={sender}, 内容={text}")
            cv2.rectangle(debug_img, (int(x_min), int(y_min)), (int(x_max), int(y_max)), outline_colors[sender], 1)
            entries.append({
                "y": c_y,
                "text": f"{sender}: {text}",
                "is_voice": False,
            })
        return entries

    def _scan_screen(self, shot_path):
        """Rebuild the visible conversation from a screenshot using local CV and OCR.

        Returns ``(dialogue_log, voice_messages)``; ``dialogue_log`` is sorted
        top to bottom. A debug image with the detections is written to
        ``t6_debug_view.jpg`` in the screenshot directory.
        """
        import cv2  # third-party; imported lazily like in the original polling loop

        # NOTE(review): the leading emoji of this message was lost to an encoding
        # error in the original source ("<EFBFBD>"); the eye emoji is a best guess.
        logger.info("👁️ 正在进行本地视觉扫描...")

        # A. Voice icons and unread markers (red dots).
        audio_matches = find_all_template_matches(shot_path, self.AUDIO_TEMPLATE, threshold=0.8)
        red_points = find_all_template_matches(shot_path, self.RED_POINT_TEMPLATE, threshold=0.8)

        # B. OCR of the full screen, ordered top to bottom.
        ocr_results = sorted(
            self.ocr_kit.read_text(shot_path),
            key=lambda item: self._bbox_center(item[0])[1],
        )

        debug_img = cv2.imread(shot_path)

        voice_messages, dialogue_log, used_ocr_indices = self._collect_voice_entries(
            audio_matches, red_points, ocr_results, debug_img
        )
        dialogue_log.extend(self._collect_text_entries(ocr_results, used_ocr_indices, debug_img))
        dialogue_log.sort(key=lambda entry: entry['y'])

        debug_shot_path = os.path.join(self.screenshot_dir, "t6_debug_view.jpg")
        cv2.imwrite(debug_shot_path, debug_img)
        logger.info(f"🎨 已保存视觉调试图: {debug_shot_path}")

        return dialogue_log, voice_messages

    def _locate_input_box(self, shot_path):
        """Return the input box centre: template match, else a point at 88% of the screen height."""
        input_center = find_template_match(shot_path, self.INPUT_BOX_TEMPLATE, threshold=0.6)
        if not input_center:
            from PIL import Image  # only needed for this geometric fallback

            with Image.open(shot_path) as img:
                w, h = img.size
            input_center = [w // 2, int(h * 0.88)]
            # NOTE(review): the leading emoji of this message was lost to an encoding
            # error in the original source ("<EFBFBD>"); the pin emoji is a best guess.
            logger.info(f"📍 使用几何兜底输入框坐标: {input_center}")
        return input_center

    async def _transcribe_voice_messages(self, voice_messages, dialogue_log, input_y):
        """Transcribe unread voice messages (all of them on the first scan) into ``dialogue_log``."""
        for v_msg in voice_messages:
            if not (v_msg.get("is_unread") or self.is_first_run):
                continue
            logger.info(f"🔴 发现未读/待处理语音: {v_msg['content']}")
            # Simplified: no "next message" boundary is passed.
            text = await self.process_single_voice(v_msg, None, input_y)
            if not text:
                continue
            # Write the transcript back to the entry of this very message. Matching
            # on the id (not on the duration label) keeps another voice message with
            # the same duration from being rewritten instead.
            for item in dialogue_log:
                if item.get("id") == v_msg["id"]:
                    item["text"] = item["text"].replace("[语音]", f"[语音转文字: {text}]")
                    break

        self.is_first_run = False

    async def _reply_if_needed(self, dialogue_log, input_center):
        """Send an LLM reply when the newest message is an unanswered one from the other party."""
        if not dialogue_log:
            return

        last_log = dialogue_log[-1]["text"]
        if not last_log.startswith("对方"):
            # The newest message is not from the other party: nothing to answer,
            # and the "already answered" marker is cleared.
            self.last_processed_msg = None
            return

        _sender, colon, rest = last_log.partition(":")
        current_last_content = rest.strip() if colon else last_log
        if current_last_content == self.last_processed_msg:
            return  # already answered

        logger.info(f"💡 发现新消息,准备回复。内容: {current_last_content}")
        logger.info("🤖 准备调用 LLM 生成回复...")
        # Mark as handled right away so a slow LLM call cannot trigger a second reply.
        self.last_processed_msg = current_last_content

        history_text = "\n".join(item['text'] for item in dialogue_log)
        reply = await self.get_reply(history_text)
        if not reply:
            logger.warning("⚠️ LLM 未生成有效回复。")
            return

        logger.info(f"💡 LLM 回复: {reply}")
        if input_center:
            perform_input_action(self.d, input_center, reply)
            # Give the sent bubble time to render completely before the next scan,
            # so OCR does not pick up a fragment of our own reply.
            await asyncio.sleep(1)

    async def run(self):
        """Poll the device forever: scan the chat, transcribe voice messages and reply.

        Each iteration is skipped when the chat screen is not in front or when
        the screenshot is byte-identical to the previous one. Errors inside an
        iteration are logged and the loop continues after ``CHECK_INTERVAL``
        seconds; this coroutine never returns normally.
        """
        logger.info("🚀 大张老师自动巡课系统启动...")

        # 0. Remove old screenshots.
        clean_screenshots_dir()

        last_screen_md5 = None

        while True:
            try:
                # 0.5 Only scan while the chat screen is shown.
                if not is_in_chat_interface(self.d):
                    logger.warning("📵 当前不在聊天界面,跳过扫描...")
                    await asyncio.sleep(CHECK_INTERVAL)
                    continue

                logger.info("🔍 正在扫描当前界面内容...")

                # 1. Screenshot.
                tmp_shot = os.path.join(self.screenshot_dir, "t6_monitor_temp.jpg")
                logger.info(f"📸 正在截取屏幕... ({datetime.now().strftime('%H:%M:%S')})")
                self.d.screenshot(tmp_shot)

                # Skip the iteration when the screenshot did not change.
                with open(tmp_shot, 'rb') as f:
                    current_md5 = hashlib.md5(f.read()).hexdigest()
                if current_md5 == last_screen_md5:
                    logger.info("😴 屏幕内容未变,跳过本次循环。")
                    await asyncio.sleep(CHECK_INTERVAL)
                    continue
                last_screen_md5 = current_md5

                # 2. Local visual analysis (replaces a VLM).
                dialogue_log, voice_messages = self._scan_screen(tmp_shot)

                # 3. Input box position.
                input_center = self._locate_input_box(tmp_shot)
                input_y = input_center[1] if input_center else None

                # 4. Voice messages.
                await self._transcribe_voice_messages(voice_messages, dialogue_log, input_y)

                # 5. LLM reply.
                await self._reply_if_needed(dialogue_log, input_center)

                await asyncio.sleep(CHECK_INTERVAL)

            except Exception as e:
                logger.error(f"❌ 主循环发生错误: {e}", exc_info=True)
                # Forget the screenshot hash so the same, still unprocessed screen
                # is scanned again instead of being skipped as "unchanged".
                last_screen_md5 = None
                await asyncio.sleep(CHECK_INTERVAL)

if __name__ == "__main__":
    # Script entry point: apply the Win32 patch, then run the bot until it is
    # interrupted or fails.
    Win32Patch.patch()
    bot = ChatBot()
    try:
        asyncio.run(bot.run())
    except KeyboardInterrupt:
        logger.info("🛑 用户手动停止程序。")
    except Exception as exc:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"❌ 程序异常退出: {exc}")