Files
aiData/WeiXin/T7_AutoChatMonitor.py
HuangHai 1f6a48004b 'commit'
2026-01-25 18:17:37 +08:00

495 lines
24 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import asyncio
import logging
import os
import sys
import time
from datetime import datetime
import uiautomator2 as u2
# 添加项目根目录到 sys.path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
sys.path.append(project_root)
from Util import Win32Patch
from WeiXin.WxUtil import perform_input_action, clean_screenshots_dir, find_template_match, find_all_template_matches
from Util.LlmUtil import get_llm_response
from Util.EasyOcrKit import EasyOcrKit
# 配置日志
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Logs")
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 设置 logger
logger = logging.getLogger("T6_AutoChatMonitor")
logger.setLevel(logging.INFO)
# 清除现有的 handlers防止重复打印或配置冲突
if logger.hasHandlers():
logger.handlers.clear()
# 创建 FileHandler
log_file_path = os.path.join(log_dir, "T6_AutoChatMonitor.log")
file_handler = logging.FileHandler(log_file_path, encoding='utf-8', mode='w')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
# 创建 StreamHandler
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(stream_handler)
# 防止日志传播到 root logger避免重复输出
logger.propagate = False
# 打印日志文件位置,方便确认
logger.info(f"日志文件路径: {log_file_path}")
# 配置参数
CHECK_INTERVAL = 5 # 检查频率 (秒)
class ChatBot:
def __init__(self):
self.d = u2.connect()
self.last_message_text = ""
self.last_processed_msg = None # 记录上一条已处理/回复过的对方消息内容
self.screenshot_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Screenshots")
if not os.path.exists(self.screenshot_dir):
os.makedirs(self.screenshot_dir)
self.ocr_kit = EasyOcrKit(gpu=True)
self.is_first_run = True # 首次运行标志
self.persona = (
"你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
"你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
"你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
"【严格约束】:\n"
"1. 绝对禁止发散!绝对禁止幻觉!\n"
"2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
"3. 仅针对家长明确表达的内容进行回复。\n"
"4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
"5. 回复必须简练,字数严格控制在 50 字以内!\n"
"如果涉及到校区信息,必须且只能使用以下真实数据:\n"
"- 单位:长春市少惠林作文素养培养中心\n"
"- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
"- 联系人小张老师电话18686619970\n"
"- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
)
async def get_reply(self, history_text):
prompt = (
f"【教师人设】:{self.persona}\n\n"
f"【近期聊天记录】:\n{history_text}\n\n"
"【任务要求】:\n"
"请作为大张老师回复家长。**必须且只能针对聊天记录中的最后一条消息进行回复!**\n"
"之前的聊天记录仅供参考上下文,如果之前的问题已经回答过,绝对不要重复回答。\n"
"严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
"字数严格控制在 50 字以内。直接输出回复正文。"
)
full_response = ""
async for chunk in get_llm_response(prompt, stream=False):
full_response += chunk
return full_response.strip().strip('"').strip('').strip('')
async def process_single_voice(self, voice_msg, next_msg=None, input_box_y=None):
"""
处理单个语音消息的完整流程:
长按 -> CV找转文字 -> 点击 -> 等待 -> 截图OCR -> 长按 -> CV找取消转文字 -> 点击
返回: 转换后的文本内容 (如果没有转换成功,返回 None)
"""
vx, vy = voice_msg['coordinates']
content = voice_msg.get('content', '0"')
logger.info(f"🎤 开始处理语音消息: {content}, 坐标: ({vx}, {vy})")
try:
# 1. 长按语音消息
logger.info("👆 正在长按语音消息...")
self.d.long_click(vx, vy, 0.6)
logger.info("✅ 长按完成,等待菜单...")
time.sleep(0.3)
# 2. CV 模板匹配寻找 "转文字" 按钮
menu_shot_path = os.path.join(self.screenshot_dir, "t6_menu_shot_convert.jpg")
logger.info(f"📸 截取菜单图: {menu_shot_path}")
self.d.screenshot(menu_shot_path)
convert_template = r"d:\dsWork\aiData\WeiXin\Templates\zhun_wen_zi.jpg"
logger.info(f"🔍 寻找模板: {convert_template}")
convert_btn = find_template_match(menu_shot_path, convert_template, threshold=0.6)
if not convert_btn:
logger.warning("❌ CV 未找到 '转文字' 按钮,尝试小范围 OCR 兜底...")
# 尝试在该区域进行 OCR 识别,寻找 "转文字" 三个字
ocr_results_menu = self.ocr_kit.read_text(menu_shot_path)
for bbox, text, conf in ocr_results_menu:
if "转文字" in text or "转文" in text or "文字" in text:
cx = (bbox[0][0] + bbox[2][0]) / 2
cy = (bbox[0][1] + bbox[2][1]) / 2
convert_btn = (cx, cy)
logger.info(f"✅ OCR 兜底找到 '转文字' 按钮: {convert_btn}")
break
if not convert_btn:
logger.warning("❌ CV 和 OCR 均未找到 '转文字' 按钮,取消操作。")
# 点击屏幕中心区域的空白处关闭菜单,避免点到顶部返回键
self.d.click(500, 500)
return None
logger.info(f"✅ 最终找到 '转文字' 按钮坐标: {convert_btn}")
self.d.click(convert_btn[0], convert_btn[1])
# 3. 动态等待转换
duration_str = content.replace('"', '').strip()
try:
duration = int(duration_str)
except:
duration = 10
wait_seconds = max(2, duration / 5.0)
logger.info(f"⏳ 语音时长 {duration}s等待转换 {wait_seconds:.1f}s...")
time.sleep(wait_seconds)
# 4. 截图并 OCR 识别内容
ocr_shot_path = os.path.join(self.screenshot_dir, "t6_ocr_shot.jpg")
logger.info(f"📸 截取 OCR 识别图: {ocr_shot_path}")
self.d.screenshot(ocr_shot_path)
# OCR 识别
# 策略:识别整个屏幕,但只提取位于当前语音消息下方,且在下一条消息(如果有)上方的内容
logger.info("📖 开始 OCR 识别...")
ocr_results = self.ocr_kit.read_text(ocr_shot_path)
logger.info(f"✅ OCR 识别完成,获取 {len(ocr_results)} 个文本块")
except Exception as e:
logger.error(f"❌ process_single_voice 发生异常: {e}", exc_info=True)
return None
# 按 Y 坐标排序,确保从上往下处理
ocr_results.sort(key=lambda x: (x[0][0][1] + x[0][2][1]) / 2)
extracted_text = []
# 准备下一条消息的内容片段作为停止条件
next_msg_snippet = None
if next_msg and next_msg.get("type") == "text":
c = next_msg.get("content", "").strip()
if c:
next_msg_snippet = c[:8] # 取前8个字符作为指纹
for bbox, text, conf in ocr_results:
# bbox center y
c_y = (bbox[0][1] + bbox[2][1]) / 2
# 1. 过滤掉当前语音气泡及以上的内容
# 语音气泡中心是 vy底部大概在 vy + 30 左右
if c_y <= vy + 25:
continue
# 2. 如果有输入框坐标,过滤掉输入框以下的内容
if input_box_y and c_y >= input_box_y - 30:
continue
# 3. 如果遇到下一条消息的内容,停止读取
if next_msg_snippet and next_msg_snippet in text:
logger.info(f"🛑 遇到下一条消息内容 '{text}',停止 OCR 录入。")
break
# 4. 如果下一条是语音,尝试通过时长文本判断停止
if next_msg and next_msg.get("type") == "voice":
v_dur = next_msg.get("content", "").strip()
# 语音时长通常比较短,且包含 " 符号
if v_dur and v_dur in text and len(text) < 10:
logger.info(f"🛑 遇到下一条语音时长 '{text}',停止 OCR 录入。")
break
# 5. 安全兜底如果距离当前语音气泡太远超过600像素停止
# 这可以防止读取到屏幕底部无关的内容
if c_y > vy + 600:
break
extracted_text.append(text)
full_text = " ".join(extracted_text)
logger.info(f"📝 OCR 识别结果: {full_text}")
# 5. 再次长按语音消息 (为了取消转换)
# 注意:转换出文字后,界面可能会发生位移。
# 但通常语音气泡的相对位置(如果是最后一条)可能变化不大,或者我们假设用户不滑动
# 更稳妥的是:重新识别一次语音气泡位置?
# 用户说:"这样原来什么样,识别完就是什么样",意味着我们要恢复原状。
# 我们假设点击原来的位置还能点到语音气泡(如果它没被顶上去太多)
# 或者,我们可以点击转换出来的文字区域?
# 让我们尝试点击原来的坐标。
self.d.long_click(vx, vy, 0.6)
time.sleep(0.3)
# 6. CV 模板匹配寻找 "取消转文字" 按钮
menu_shot_path_cancel = os.path.join(self.screenshot_dir, "t6_menu_shot_cancel.jpg")
self.d.screenshot(menu_shot_path_cancel)
cancel_template = r"d:\dsWork\aiData\WeiXin\Templates\cancel_zhuan_wen_zi.jpg"
cancel_btn = find_template_match(menu_shot_path_cancel, cancel_template, threshold=0.6)
if cancel_btn:
logger.info(f"✅ CV 找到 '取消转文字' 按钮: {cancel_btn}")
self.d.click(cancel_btn[0], cancel_btn[1])
else:
logger.warning("❌ CV 未找到 '取消转文字' 按钮,点击中心区域关闭菜单。")
self.d.click(500, 500)
return full_text
async def run(self):
logger.info("🚀 大张老师自动巡课系统启动...")
# 0. 清除旧截图
clean_screenshots_dir()
last_screen_md5 = None
while True:
try:
logger.info("🔍 正在扫描当前界面内容...")
# 1. 截图
tmp_shot = os.path.join(self.screenshot_dir, "t6_monitor_temp.jpg")
logger.info(f"📸 正在截取屏幕... ({datetime.now().strftime('%H:%M:%S')})")
self.d.screenshot(tmp_shot)
# 计算 MD5 并去重
import hashlib
with open(tmp_shot, 'rb') as f:
current_md5 = hashlib.md5(f.read()).hexdigest()
if last_screen_md5 and current_md5 == last_screen_md5:
logger.info("😴 屏幕内容未变,跳过本次循环。")
await asyncio.sleep(CHECK_INTERVAL)
continue
last_screen_md5 = current_md5
# 2. 本地视觉分析 (替代 VLM)
logger.info("<EFBFBD> 正在进行本地视觉扫描...")
# A. 寻找语音图标 (audio.jpg) 和 红点 (red_point.jpg)
audio_template = r"d:\dsWork\aiData\WeiXin\Templates\audio.jpg"
red_point_template = r"d:\dsWork\aiData\WeiXin\Templates\red_point.jpg"
audio_matches = find_all_template_matches(tmp_shot, audio_template, threshold=0.8)
red_points = find_all_template_matches(tmp_shot, red_point_template, threshold=0.8)
# B. 本地 OCR 识别全文以构建上下文
ocr_results = self.ocr_kit.read_text(tmp_shot)
# 按 Y 坐标排序
ocr_results.sort(key=lambda x: (x[0][0][1] + x[0][2][1]) / 2)
dialogue_log = []
voice_messages = []
# 准备可视化调试图
import cv2
import numpy as np
debug_img = cv2.imread(tmp_shot)
# 记录已匹配到语音图标的 OCR 块索引
matched_ocr_indices = set()
# 先处理语音图标匹配
for ax, ay in audio_matches:
# 排除顶部标题栏(0-300)和底部输入区(1800+)
if ay < 300 or ay > 1800:
logger.info(f"⏭️ 忽略区域外语音图标: ({ax}, {ay})")
continue
sender = "对方" if ax < 500 else ""
logger.info(f"🎙️ 发现语音图标: x={ax}, y={ay}, 发送者={sender}")
is_unread = False
if red_points:
for rx, ry in red_points:
# 红点通常在语音图标右侧,且 Y 轴相近
if abs(ry - ay) < 50 and rx > ax:
is_unread = True
# 绘制红点
cv2.circle(debug_img, (int(rx), int(ry)), 12, (0, 0, 255), -1)
break
# 寻找附近的时长文字 (OCR)
duration_text = "语音"
for idx, (bbox, text, conf) in enumerate(ocr_results):
c_x = (bbox[0][0] + bbox[2][0]) / 2
c_y = (bbox[0][1] + bbox[2][1]) / 2
if abs(c_y - ay) < 40 and abs(c_x - ax) < 300:
if '"' in text or text.isdigit():
duration_text = text
matched_ocr_indices.add(idx)
break
# 计算点击坐标:直接点击语音图标中心
click_x, click_y = ax, ay
# 绘制视觉反馈
# 1. 语音图标用绿框
cv2.rectangle(debug_img, (int(ax-30), int(ay-30)), (int(ax+30), int(ay+30)), (0, 255, 0), 3)
# 2. 点击位置用红点 (用户偏好)
cv2.circle(debug_img, (int(click_x), int(click_y)), 15, (0, 0, 255), -1)
v_msg = {
"type": "voice",
"content": duration_text,
"coordinates": [click_x, click_y],
"sender": sender,
"is_unread": is_unread
}
if sender == "对方":
voice_messages.append(v_msg)
dialogue_log.append({
"y": ay,
"text": f"{sender}: [语音] {duration_text}",
"is_voice": True,
"id": f"voice_{ax}_{ay}"
})
# 处理剩余的 OCR 文字块 (普通文本)
for idx, (bbox, text, conf) in enumerate(ocr_results):
if idx in matched_ocr_indices: continue
x_min, x_max = bbox[0][0], bbox[2][0]
y_min, y_max = bbox[0][1], bbox[2][1]
c_x, c_y = (x_min + x_max) / 2, (y_min + y_max) / 2
if c_y < 300 or c_y > 1800: continue
if x_min < 250 and x_max < 700:
sender, color = "对方", (0, 255, 0)
elif x_max > 800 and x_min > 300:
sender, color = "", (255, 0, 0)
else:
sender, color = "系统", (128, 128, 128)
if sender != "系统":
logger.info(f"💬 发现文本消息: x={c_x}, y={c_y}, 发送者={sender}, 内容={text}")
cv2.rectangle(debug_img, (int(x_min), int(y_min)), (int(x_max), int(y_max)), color, 1)
dialogue_log.append({
"y": c_y,
"text": f"{sender}: {text}",
"is_voice": False
})
# 按 Y 轴重新排序整个对话日志
dialogue_log.sort(key=lambda x: x['y'])
# 保存调试图
debug_shot_path = os.path.join(self.screenshot_dir, "t6_debug_view.jpg")
cv2.imwrite(debug_shot_path, debug_img)
logger.info(f"🎨 已保存视觉调试图: {debug_shot_path}")
# C. 寻找输入框 (CV 模板匹配)
input_template = r"d:\dsWork\aiData\WeiXin\Templates\input_box.jpg" # 假设有这个模板
input_center = find_template_match(tmp_shot, input_template, threshold=0.6)
if not input_center:
# 几何兜底:屏幕底部 88% 处
from PIL import Image
with Image.open(tmp_shot) as img:
w, h = img.size
input_center = [w // 2, int(h * 0.88)]
logger.info(f"<EFBFBD> 使用几何兜底输入框坐标: {input_center}")
# 4. 语音处理逻辑
processed_voice_content = None
input_y = input_center[1] if input_center else None
# 只有未读的才处理
for v_msg in voice_messages:
if v_msg.get("is_unread") or self.is_first_run:
logger.info(f"🔴 发现未读/待处理语音: {v_msg['content']}")
# 找到 OCR 结果中的下一条作为边界
idx = -1
# 这里简化逻辑,直接处理
text = await self.process_single_voice(v_msg, None, input_y)
if text:
# 更新 log 中的内容
for item in dialogue_log:
if item.get("is_voice") and f"[语音] {v_msg['content']}" in item["text"]:
item["text"] = item["text"].replace("[语音]", f"[语音转文字: {text}]")
break
self.is_first_run = False
# 5. LLM 回复逻辑
final_dialogue_texts = [item['text'] for item in dialogue_log]
history_text = "\n".join(final_dialogue_texts)
# 判断是否需要回复:
# 核心规则:只有当最后一条消息是“对方”说的,且内容未处理过,才回复。
should_reply = False
current_last_content = ""
if dialogue_log:
last_item = dialogue_log[-1]
last_log = last_item["text"]
# 检查最后一条消息的发送者
if last_log.startswith("对方"):
parts = last_log.split(":", 1)
if len(parts) > 1:
current_last_content = parts[1].strip()
else:
current_last_content = last_log
if current_last_content != self.last_processed_msg:
logger.info(f"💡 发现新消息,准备回复。内容: {current_last_content}")
should_reply = True
else:
# logger.info(f"⚪ 消息已回复过,跳过: {current_last_content}")
should_reply = False
else:
# logger.info(f"⚪ 最后一条消息是我发送的,无需回复。")
should_reply = False
# 如果最后一条是我发的,重置 last_processed_msg
self.last_processed_msg = None
if should_reply:
logger.info("🤖 准备调用 LLM 生成回复...")
# 立即更新状态,防止在回复生成期间(如果耗时)重复触发
self.last_processed_msg = current_last_content
reply = await self.get_reply(history_text)
if reply:
logger.info(f"💡 LLM 回复: {reply}")
if input_center:
# 输入并发送
perform_input_action(self.d, input_center, reply)
# 发送后,为了防止下一轮 OCR 识别到自己的回复片段并误判为对方消息
# 我们把 last_processed_msg 设置为一个特殊的占位符,直到下一次真正识别到对方的新消息
# 或者更简单:在下一轮循环开始前稍微多等一下,让消息气泡完全显示
time.sleep(1)
# 将最后处理的消息内容标记为已处理,防止 LLM 回复逻辑在下一轮立即触发
# 注意:这里的 current_last_content 是对方的最后一条
else:
logger.warning("⚠️ LLM 未生成有效回复。")
# 休眠
await asyncio.sleep(CHECK_INTERVAL)
except Exception as e:
logger.error(f"❌ 主循环发生错误: {e}", exc_info=True)
await asyncio.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
Win32Patch.patch()
bot = ChatBot()
try:
asyncio.run(bot.run())
except KeyboardInterrupt:
logger.info("🛑 用户手动停止程序。")
except Exception as e:
logger.error(f"❌ 程序异常退出: {e}", exc_info=True)