This commit is contained in:
HuangHai
2026-01-31 17:07:57 +08:00
parent a4b6ebfed5
commit fe4d593771
4 changed files with 270 additions and 147 deletions

View File

@@ -1,13 +1,13 @@
# coding=utf-8
import os
import sys
import time
import logging
import asyncio
import hashlib
import json
import logging
import os
import re
import sys
import threading
import numpy as np
import time
import cv2
@@ -17,7 +17,6 @@ if project_root not in sys.path:
sys.path.append(project_root)
from WeiXin import WxUtil
from WeiXin.WxUtil import perform_input_action
from Util.LlmUtil import get_llm_response
from Util import Win32Patch
from Util.AlyTtsKit import QwenTTSManager
@@ -58,8 +57,12 @@ class ChatMonitorAudioBot:
"""
大张老师自动巡课系统 (语音版)
"""
def __init__(self):
def __init__(self, debug_mode=False):
self.device = None
self.debug_mode = debug_mode
# 同步设置 WxUtil 的调试模式
WxUtil.set_debug_mode(debug_mode)
self.screenshot_path = os.path.join(WxUtil.OUTPUT_DIR, "AudioMonitor_live_shot.jpg")
self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "AudioMonitor_debug_view.jpg")
self.dialogue_log = []
@@ -78,13 +81,24 @@ class ChatMonitorAudioBot:
"你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
"你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
"你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
"【关于作文评价的回复逻辑】:\n"
"**仅当且仅当**家长明确询问孩子今天的作文情况(例如:“孩子今天写得怎么样?”、“今天作文有进步吗?”)时,才触发此逻辑。触发时,请随机生成具体、专业的反馈。反馈应包含:\n"
"1. 孩子今天在写作中遇到的具体小问题(如修辞手法运用不当、开头略显生硬等);\n"
"2. 值得表扬的亮点(如观察细致、某个句子写得很传神等);\n"
"3. 对字迹的评价(如字迹工整、卷面整洁等);\n"
"4. 针对性的课外阅读建议(推荐具体类型的书目)。\n"
"此类特定回复字数可放宽,控制在 150 字以内,确保语音回复时长在 30 秒内。\n"
"【关于通用咨询的回复逻辑】:\n"
"如果家长是咨询学校信息、地址、课程时间、年级设置,或者是刚加好友打招呼,请务必保持礼貌、知性、亲切,直接回答相关问题。严禁在这些情况下提及孩子的作文表现或虚构作文反馈。\n"
"【严格约束】:\n"
"1. 绝对禁止发散!绝对禁止幻觉!\n"
"2. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
"3. 仅针对家长明确表达的内容进行回复。\n"
"4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
"5. 回复必须简练,字数严格控制在 50 字以内!\n"
"6. 对方问什么就答什么。例如问‘学校叫什么’,就只回答‘少惠林’,不要回复地址和电话!\n"
"1. 绝对禁止输出任何括号内的内容(如动作、神态描述、心理活动等)!例如不要输出‘(微笑)’或‘(亲切地注视)’。\n"
"2. 你的回复是直接用于语音播放的,请只输出你想说的话,不要包含任何舞台指导文字。\n"
"3. 绝对禁止发散!绝对禁止幻觉!\n"
"4. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
"5. 仅针对家长明确表达的内容进行回复。\n"
"6. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
"7. 普通咨询回复字数严格控制在 50 字以内,专业作文评价回复控制在 150 字以内!\n"
"8. 对方问什么就答什么。例如问‘学校叫什么’,就只回答‘少惠林’,不要回复地址和电话!\n"
"如果涉及到校区信息,必须且只能使用以下真实数据:\n"
"- 单位/学校名称:长春市少惠林作文素养培养中心(简称:少惠林)\n"
"- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
@@ -114,15 +128,25 @@ class ChatMonitorAudioBot:
f"【最后一条待回复消息】:\n{last_message_text}\n\n"
"【任务要求】:\n"
"请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n"
"参考上下文对话内容,确保回复逻辑连贯\n"
"严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
"字数严格控制在 50 字以内。直接输出回复正文。"
"1. **意图识别**:判断家长是否在询问“孩子今天的作文写得怎么样”\n"
"2. **条件回复**\n"
" - 如果是询问作文,请严格按【关于作文评价的回复逻辑】生成 150 字以内的详细专业反馈。\n"
" - 如果是咨询学校地址、课程、年级、打招呼等通用问题,请按【关于通用咨询的回复逻辑】简洁回答,严禁提到任何关于孩子作文的内容,字数控制在 50 字以内。\n"
"3. **格式要求**:严禁发散,直接输出回复正文,不包含任何括号内的动作描述。"
)
full_response = ""
async for chunk in get_llm_response(prompt, stream=False):
full_response += chunk
return full_response.strip().strip('"').strip('').strip('')
reply = full_response.strip().strip('"').strip('').strip('')
# [User Requested] 过滤掉括号内容 (如动作描述),防止 TTS 播放
# 支持中英文括号: (),
reply = re.sub(r'\(.*?\)', '', reply)
reply = re.sub(r'.*?', '', reply)
return reply.strip()
def step_1_prepare_env(self):
"""步骤1: 环境准备"""
@@ -262,6 +286,13 @@ class ChatMonitorAudioBot:
while True:
try:
self.device.screenshot(self.screenshot_path)
# [User Requested] 检查是否在聊天界面
if not WxUtil.check_is_chat_interface(self.screenshot_path):
logger.info("当前不在聊天界面,跳过本次循环,等待中...")
await asyncio.sleep(self.check_interval)
continue
current_screen_hash = self.get_image_hash(self.screenshot_path)
if current_screen_hash == self.last_screen_hash:
@@ -286,68 +317,75 @@ class ChatMonitorAudioBot:
self.dialogue_log = dialogue_log
self.input_pos = input_pos
last_msg = dialogue_log[-1]
# 过滤出对方发送的消息
other_msgs = [m for m in dialogue_log if m.get('sender') != ""]
if not other_msgs:
logger.info("当前没有对方发送的消息")
await asyncio.sleep(self.check_interval)
continue
last_msg = other_msgs[-1]
current_msg_hash = self.get_stable_message_hash(last_msg)
sender = last_msg.get('sender', '')
is_processed = current_msg_hash in self.processed_hashes
if is_processed and current_msg_hash != self.last_processed_msg_hash:
self.last_processed_msg_hash = current_msg_hash
if is_processed:
if current_msg_hash != self.last_processed_msg_hash:
self.last_processed_msg_hash = current_msg_hash
await asyncio.sleep(self.check_interval)
continue
if not is_processed and current_msg_hash != self.last_processed_msg_hash:
if sender != "":
logger.info(f"💡 发现新消息 [{last_msg.get('type')}]: {last_msg.get('content')}")
msg_shot_path = os.path.join(WxUtil.OUTPUT_DIR, f"NewMsg_{int(time.time())}.jpg")
self.device.screenshot(msg_shot_path)
logger.info(f"💡 发现新消息 [{last_msg.get('type')}]: {last_msg.get('content')}")
msg_shot_path = os.path.join(WxUtil.OUTPUT_DIR, f"NewMsg_{int(time.time())}.jpg")
self.device.screenshot(msg_shot_path)
context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in dialogue_log[:-1]])
last_content = last_msg.get('content') or ""
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.info("检测到未成功转换的语音消息,尝试强制重试 OCR 转换...")
dialogue_log_retry, _ = await WxUtil.analyze_chat_image(
self.screenshot_path,
self.debug_view_path,
device=self.device,
process_strategy="LAST",
restore_processed_voice=False
)
if dialogue_log_retry:
self.dialogue_log = dialogue_log_retry
last_msg = dialogue_log_retry[-1]
context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in dialogue_log if m != last_msg])
last_content = last_msg.get('content') or ""
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.info("检测到未成功转换的语音消息,尝试强制重试 OCR 转换...")
dialogue_log_retry, _ = await WxUtil.analyze_chat_image(
self.screenshot_path,
self.debug_view_path,
device=self.device,
process_strategy="LAST",
restore_processed_voice=False
)
if dialogue_log_retry:
retry_other_msgs = [m for m in dialogue_log_retry if m.get('sender') != ""]
if retry_other_msgs:
last_msg = retry_other_msgs[-1]
last_content = last_msg.get('content') or ""
current_msg_hash = self.get_stable_message_hash(last_msg)
if current_msg_hash in self.processed_hashes:
self.last_processed_msg_hash = current_msg_hash
continue
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.warning("语音消息内容为空,暂不生成回复")
await asyncio.sleep(self.check_interval)
continue
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.warning("语音消息内容为空,暂不生成回复")
await asyncio.sleep(self.check_interval)
continue
reply = await self.get_reply(last_content, context_text)
reply = await self.get_reply(last_content, context_text)
if reply:
logger.info(f"LLM 建议回复: {reply}")
if reply:
logger.info(f"LLM 建议回复: {reply}")
# 发送语音回复
success = await self.send_voice_reply(reply)
if success:
logger.info(">>> 语音回复发送成功 <<<")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
else:
logger.error("语音回复动作执行失败")
else:
logger.info("LLM 认为无需回复")
# 发送语音回复
success = await self.send_voice_reply(reply)
if success:
logger.info(">>> 语音回复发送成功 <<<")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
else:
logger.error("语音回复动作执行失败")
else:
self.last_processed_msg_hash = current_msg_hash
logger.info("LLM 认为无需回复")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
await asyncio.sleep(self.check_interval)
@@ -356,7 +394,8 @@ class ChatMonitorAudioBot:
await asyncio.sleep(self.check_interval)
async def run_main():
bot = ChatMonitorAudioBot()
# 默认关闭调试模式以提高响应速度,如需调试可设为 True
bot = ChatMonitorAudioBot(debug_mode=False)
await bot.run()
if __name__ == "__main__":

View File

@@ -56,8 +56,12 @@ class ChatMonitorBot:
"""
大张老师自动巡课系统 (CV版)
"""
def __init__(self):
def __init__(self, debug_mode=False):
self.device = None
self.debug_mode = debug_mode
# 同步设置 WxUtil 的调试模式
WxUtil.set_debug_mode(debug_mode)
self.screenshot_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_live_shot.jpg")
self.debug_view_path = os.path.join(WxUtil.OUTPUT_DIR, "T2_ChatMonitor_debug_view.jpg")
self.dialogue_log = []
@@ -74,13 +78,23 @@ class ChatMonitorBot:
"你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
"你不仅是一位作文教学专家,更是一位心思细腻、能与家长共情的教育智者。"
"你的回复风格应该是:温柔、知性、亲切,就像一位邻家大姐姐在聊天。"
"【关于作文评价的回复逻辑】:\n"
"**仅当且仅当**家长明确询问孩子今天的作文情况(例如:“孩子今天写得怎么样?”、“今天作文有进步吗?”)时,才触发此逻辑。触发时,请随机生成具体、专业的反馈。反馈应包含:\n"
"1. 孩子今天在写作中遇到的具体小问题(如修辞手法运用不当、开头略显生硬等);\n"
"2. 值得表表扬的亮点(如观察细致、某个句子写得很传神等);\n"
"3. 对字迹的评价(如字迹工整、卷面整洁等);\n"
"4. 针对性的课外阅读建议(推荐具体类型的书目)。\n"
"此类特定回复字数可适当增加,控制在 150 字以内。\n"
"【关于通用咨询的回复逻辑】:\n"
"如果家长是咨询学校信息、地址、课程时间、年级设置,或者是刚加好友打招呼,请务必保持礼貌、知性、亲切,直接回答相关问题。严禁在这些情况下提及孩子的作文表现或虚构作文反馈。\n"
"【严格约束】:\n"
"1. 绝对禁止发散!绝对禁止幻觉!\n"
"2. 知道什么就说什么,不要乱讲话,不要自己编造内容\n"
"3. 仅针对家长明确表达的内容进行回复。\n"
"4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词\n"
"5. 回复必须简练,字数严格控制在 50 字以内!\n"
"6. 对方问什么就答什么。例如问‘学校叫什么’,就只回答‘少惠林’,不要回复地址和电话\n"
"1. 绝对禁止输出任何括号内的内容(如动作、神态描述等)!例如不要输出‘(微笑)’。\n"
"2. 绝对禁止发散!绝对禁止幻觉\n"
"3. 知道什么就说什么,不要乱讲话,不要自己编造内容!\n"
"4. 仅针对家长明确表达的内容进行回复\n"
"5. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
"6. 普通咨询回复字数严格控制在 50 字以内,专业作文评价回复控制在 150 字以内\n"
"7. 对方问什么就答什么。例如问‘学校叫什么’,就只回答‘少惠林’,不要回复地址和电话!\n"
"如果涉及到校区信息,必须且只能使用以下真实数据:\n"
"- 单位/学校名称:长春市少惠林作文素养培养中心(简称:少惠林)\n"
"- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
@@ -114,9 +128,11 @@ class ChatMonitorBot:
f"【最后一条待回复消息】:\n{last_message_text}\n\n"
"【任务要求】:\n"
"请作为大张老师回复家长。**必须且只能针对最后一条消息进行回复!**\n"
"参考上下文对话内容,确保回复逻辑连贯\n"
"严禁发散,严禁编造家长没说过的情况。如果不清楚家长的意图,就温柔询问。\n"
"字数严格控制在 50 字以内。直接输出回复正文。"
"1. **意图识别**:判断家长是否在询问“孩子今天的作文写得怎么样”\n"
"2. **条件回复**\n"
" - 如果是询问作文,请严格按【关于作文评价的回复逻辑】生成 150 字以内的详细专业反馈。\n"
" - 如果是咨询学校地址、课程、年级、打招呼等通用问题,请按【关于通用咨询的回复逻辑】简洁回答,严禁提到任何关于孩子作文的内容,字数控制在 50 字以内。\n"
"3. **格式要求**:严禁发散,直接输出回复正文,不包含任何括号内的动作描述。"
)
full_response = ""
@@ -213,6 +229,13 @@ class ChatMonitorBot:
try:
# A. 截图并计算哈希
self.device.screenshot(self.screenshot_path)
# [User Requested] 检查是否在聊天界面
if not WxUtil.check_is_chat_interface(self.screenshot_path):
logger.info("当前不在聊天界面,跳过本次循环,等待中...")
await asyncio.sleep(self.check_interval)
continue
current_screen_hash = self.get_image_hash(self.screenshot_path)
# B. 如果屏幕无变化,则跳过识别
@@ -243,86 +266,92 @@ class ChatMonitorBot:
self.input_pos = input_pos
# D. 提取最新消息并检查是否需要回复
last_msg = dialogue_log[-1]
# 过滤出对方发送的消息
other_msgs = [m for m in dialogue_log if m.get('sender') != ""]
if not other_msgs:
logger.info("当前没有对方发送的消息")
await asyncio.sleep(self.check_interval)
continue
last_msg = other_msgs[-1]
current_msg_hash = self.get_stable_message_hash(last_msg)
sender = last_msg.get('sender', '')
# 检查该消息是否已经处理过 (通过内容哈希)
is_processed = current_msg_hash in self.processed_hashes
if is_processed and current_msg_hash != self.last_processed_msg_hash:
self.last_processed_msg_hash = current_msg_hash
if is_processed:
if current_msg_hash != self.last_processed_msg_hash:
self.last_processed_msg_hash = current_msg_hash
await asyncio.sleep(self.check_interval)
continue
if not is_processed and current_msg_hash != self.last_processed_msg_hash:
if sender != "":
logger.info(f"💡 发现新消息 [{last_msg.get('type')}]: {last_msg.get('content')}")
# 记录发现新消息的现场截图
msg_shot_path = os.path.join(WxUtil.OUTPUT_DIR, f"NewMsg_{int(time.time())}.jpg")
self.device.screenshot(msg_shot_path)
logger.info(f"已保存新消息现场截图: {msg_shot_path}")
logger.info(f"💡 发现新消息 [{last_msg.get('type')}]: {last_msg.get('content')}")
# 记录发现新消息的现场截图
msg_shot_path = os.path.join(WxUtil.OUTPUT_DIR, f"NewMsg_{int(time.time())}.jpg")
self.device.screenshot(msg_shot_path)
logger.info(f"已保存新消息现场截图: {msg_shot_path}")
# 获取上下文文本
context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in dialogue_log[:-1]])
last_content = last_msg.get('content') or ""
# 兜底逻辑:语音消息若无文字内容,尝试强制触发重试
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.info("检测到未成功转换的语音消息,尝试强制重试 OCR 转换...")
dialogue_log_retry, _ = await WxUtil.analyze_chat_image(
self.screenshot_path,
self.debug_view_path,
device=self.device,
process_strategy="LAST",
restore_processed_voice=False
)
if dialogue_log_retry:
self.dialogue_log = dialogue_log_retry
last_msg = dialogue_log_retry[-1]
# 获取上下文文本
context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in dialogue_log if m != last_msg])
last_content = last_msg.get('content') or ""
# 兜底逻辑:语音消息若无文字内容,尝试强制触发重试
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.info("检测到未成功转换的语音消息,尝试强制重试 OCR 转换...")
dialogue_log_retry, _ = await WxUtil.analyze_chat_image(
self.screenshot_path,
self.debug_view_path,
device=self.device,
process_strategy="LAST",
restore_processed_voice=False
)
if dialogue_log_retry:
retry_other_msgs = [m for m in dialogue_log_retry if m.get('sender') != ""]
if retry_other_msgs:
last_msg = retry_other_msgs[-1]
last_content = last_msg.get('content') or ""
current_msg_hash = self.get_stable_message_hash(last_msg)
if current_msg_hash in self.processed_hashes:
self.last_processed_msg_hash = current_msg_hash
continue
# 语音消息若重试后仍无内容,暂不回复
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.warning("语音消息内容为空,暂不生成回复")
await asyncio.sleep(self.check_interval)
continue
if last_msg.get('type') == 'voice' and not last_content.strip():
logger.warning("语音消息内容为空,暂不生成回复")
await asyncio.sleep(self.check_interval)
continue
# E. 生成回复
reply = await self.get_reply(last_content, context_text)
if reply:
logger.info(f"LLM 建议回复: {reply}")
if self.input_pos:
# 确定输入框位置
target_pos = self.input_pos[0] if isinstance(self.input_pos, (list, tuple)) and len(self.input_pos) == 2 else self.input_pos
# 执行输入和发送动作,并保存过程截图
success = perform_input_action(
self.device,
target_pos,
reply,
auto_send=True,
debug_prefix=f"Reply_{int(time.time())}"
)
if success:
logger.info(">>> 回复发送成功 <<<")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
else:
logger.error("回复动作执行失败")
# E. 生成回复
reply = await self.get_reply(last_content, context_text)
if reply:
logger.info(f"LLM 建议回复: {reply}")
if self.input_pos:
# 确定输入框位置
target_pos = self.input_pos[0] if isinstance(self.input_pos, (list, tuple)) and len(self.input_pos) == 2 else self.input_pos
# 执行输入和发送动作,并保存过程截图
success = perform_input_action(
self.device,
target_pos,
reply,
auto_send=True,
debug_prefix=f"Reply_{int(time.time())}"
)
if success:
logger.info(">>> 回复发送成功 <<<")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
else:
logger.error("无法定位输入框坐标,放弃本次回复")
logger.error("回复动作执行失败")
else:
logger.info("LLM 认为无需回复")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
logger.error("无法定位输入框坐标,放弃本次回复")
else:
self.last_processed_msg_hash = current_msg_hash
logger.info("LLM 认为无需回复")
self._record_processed_hash(last_msg, current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
await asyncio.sleep(self.check_interval)
@@ -334,7 +363,8 @@ async def run_main():
"""
运行自动巡课机器人
"""
bot = ChatMonitorBot()
# 默认关闭调试模式以提高响应速度,如需调试可设为 True
bot = ChatMonitorBot(debug_mode=False)
await bot.run()
if __name__ == "__main__":

View File

@@ -833,7 +833,8 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
logger.info(f"准备处理语音 ({vx}, {vy})...")
# 高亮正在处理的语音并保存更新后的调试图
draw_debug_info(current_output_path, messages, current_voice_center=(vx, vy))
if DEBUG_MODE:
draw_debug_info(current_output_path, messages, current_voice_center=(vx, vy))
# 执行操作:长按 -> 转文字
logger.info(f"正在长按语音消息 ({vx}, {vy})...")
@@ -851,8 +852,8 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
d.screenshot(menu_shot)
btn_pos = find_template_match(menu_shot, zhuan_template, threshold=0.7)
else:
# 调试模式关闭时,直接在内存中匹配
btn_pos = find_template_match(d.screenshot(), zhuan_template, threshold=0.7)
# 调试模式关闭时,直接在内存中匹配 (使用 format='opencv' 提高效率)
btn_pos = find_template_match(d.screenshot(format='opencv'), zhuan_template, threshold=0.7)
if btn_pos:
break
@@ -936,7 +937,8 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
d.screenshot(restore_menu_shot)
cancel_btn = find_template_match(restore_menu_shot, cancel_template, threshold=0.7)
else:
cancel_btn = find_template_match(d.screenshot(), cancel_template, threshold=0.7)
# 调试模式关闭时,直接在内存中匹配 (使用 format='opencv' 提高效率)
cancel_btn = find_template_match(d.screenshot(format='opencv'), cancel_template, threshold=0.7)
if cancel_btn:
break
@@ -1134,16 +1136,31 @@ def find_input_box_center(image_path):
logger.error(f"find_input_box_center error: {e}")
return (540, 2100), None
def find_template_match(screen_path, template_path, threshold=0.8):
def find_template_match(screen_input, template_path, threshold=0.8):
"""
使用 OpenCV 模板匹配寻找按钮中心坐标
:param screen_input: 可以是文件路径 (str) 或 OpenCV 图像 (numpy.ndarray)
:param template_path: 模板文件路径
:param threshold: 匹配阈值
"""
try:
if not os.path.exists(template_path):
logger.error(f"Template file not found: {template_path}")
return None
img = cv2.imread(screen_path)
# 处理输入图像
if isinstance(screen_input, str):
img = cv2.imread(screen_input)
elif isinstance(screen_input, np.ndarray):
img = screen_input
else:
# 尝试处理 PIL Image (uiautomator2 默认返回)
try:
img = cv2.cvtColor(np.array(screen_input), cv2.COLOR_RGB2BGR)
except Exception:
logger.error(f"Invalid screen_input type: {type(screen_input)}")
return None
template = cv2.imread(template_path)
if img is None or template is None:
return None
@@ -1545,16 +1562,53 @@ def switch_to_keyboard_mode(d):
logger.error(f"switch_to_keyboard_mode error: {e}")
return False
def match_template_center(image_path, template_path, threshold=0.8):
def check_is_chat_interface(screenshot_path):
"""
检查当前是否在聊天界面
通过匹配 'audio_reply.jpg' (语音图标) 或 'keyboard.jpg' (键盘图标) 来判断
"""
audio_reply_template = os.path.join(TEMPLATE_DIR, "audio_reply.jpg")
keyboard_template = os.path.join(TEMPLATE_DIR, "keyboard.jpg")
# 检查语音图标
if match_template_center(screenshot_path, audio_reply_template, threshold=0.8):
logger.info("✅ 检测到语音回复图标,确认处于聊天界面")
return True
# 检查键盘图标
if match_template_center(screenshot_path, keyboard_template, threshold=0.8):
logger.info("✅ 检测到键盘输入图标,确认处于聊天界面")
return True
logger.warning("⚠️ 未检测到聊天界面特征图标,当前可能不在聊天页面")
return False
def match_template_center(image_input, template_path, threshold=0.8):
"""
使用 OpenCV 模板匹配寻找目标图片中心坐标
:param image_input: 可以是文件路径 (str) 或 OpenCV 图像 (numpy.ndarray)
"""
try:
if not os.path.exists(image_path) or not os.path.exists(template_path):
logger.error(f"Image or template not found: {image_path}, {template_path}")
if not os.path.exists(template_path):
logger.error(f"Template not found: {template_path}")
return None
img = cv2.imread(image_path)
# 处理输入图像
if isinstance(image_input, str):
if not os.path.exists(image_input):
logger.error(f"Image file not found: {image_input}")
return None
img = cv2.imread(image_input)
elif isinstance(image_input, np.ndarray):
img = image_input
else:
# 尝试处理 PIL Image
try:
img = cv2.cvtColor(np.array(image_input), cv2.COLOR_RGB2BGR)
except Exception:
logger.error(f"Invalid image_input type: {type(image_input)}")
return None
template = cv2.imread(template_path)
if img is None or template is None: