This commit is contained in:
HuangHai
2026-01-25 13:42:17 +08:00
parent 07c65bed47
commit f9c9181b73
16 changed files with 386 additions and 47 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 200 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 91 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 84 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 124 KiB

View File

@@ -18,7 +18,7 @@ from Util.EasyOcrKit import EasyOcrKit
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("T6_Debug")
logger = logging.getLogger("T5_VLM_Voice_Debug")
async def main():
logger.info("🚀 T6 VLM 语音坐标调试工具启动...")

View File

@@ -7,13 +7,16 @@ import os
import asyncio
from datetime import datetime
from Util import Win32Patch
# 添加项目根目录到 sys.path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
sys.path.append(project_root)
from WeiXin.WxUtil import find_input_box_center, perform_input_action, analyze_chat_image, clean_screenshots_dir, is_in_chat_interface
from WeiXin.WxUtil import find_input_box_center, perform_input_action, get_vlm_analysis, clean_screenshots_dir, is_in_chat_interface, get_vlm_json
from Util.LlmUtil import get_llm_response
from Util.EasyOcrKit import EasyOcrKit
# 配置日志
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Logs")
@@ -24,7 +27,7 @@ logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(log_dir, "T5_AutoChatMonitor.log"), encoding='utf-8', mode='w'),
logging.FileHandler(os.path.join(log_dir, "T6_AutoChatMonitor.log"), encoding='utf-8', mode='w'),
logging.StreamHandler()
]
)
@@ -44,6 +47,10 @@ class ChatBot:
self.screenshot_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Screenshots")
if not os.path.exists(self.screenshot_dir):
os.makedirs(self.screenshot_dir)
self.ocr_kit = EasyOcrKit()
self.is_first_run = True # 首次运行标志
self.persona = (
"你是一名1999年毕业、拥有27年一线教学经验的小学高级女教师名叫大张老师。你目前在长春市少惠林作文素养培养中心工作。"
@@ -86,6 +93,93 @@ class ChatBot:
full_response += chunk
return full_response.strip().strip('"').strip('').strip('')
async def process_single_voice(self, voice_msg):
"""
处理单个语音消息的完整流程:
长按 -> VLM找转文字 -> 点击 -> 等待 -> 截图OCR -> 长按 -> VLM找取消转文字 -> 点击
返回: 转换后的文本内容 (如果没有转换成功,返回 None)
"""
vx, vy = voice_msg['coordinates']
content = voice_msg.get('content', '0"')
logger.info(f"🎤 开始处理语音消息: {content}, 坐标: ({vx}, {vy})")
# 1. 长按语音消息
self.d.long_click(vx, vy, 1.5)
time.sleep(1.0)
# 2. CV 模板匹配寻找 "转文字" 按钮
menu_shot_path = os.path.join(self.screenshot_dir, "t6_menu_shot_convert.jpg")
self.d.screenshot(menu_shot_path)
convert_template = r"d:\dsWork\aiData\WeiXin\Templates\zhun_wen_zi.jpg"
convert_btn = find_template_match(menu_shot_path, convert_template, threshold=0.7)
if not convert_btn:
logger.warning("❌ CV 未找到 '转文字' 按钮,取消操作。")
self.d.click(vx + 200, vy) # 点击空白处关闭菜单
return None
logger.info(f"✅ CV 找到 '转文字' 按钮: {convert_btn}")
self.d.click(convert_btn[0], convert_btn[1])
# 3. 动态等待转换
duration_str = content.replace('"', '').strip()
try:
duration = int(duration_str)
except:
duration = 10
wait_seconds = max(2, duration / 5.0)
logger.info(f"⏳ 语音时长 {duration}s等待转换 {wait_seconds:.1f}s...")
time.sleep(wait_seconds)
# 4. 截图并 OCR 识别内容
ocr_shot_path = os.path.join(self.screenshot_dir, "t6_ocr_shot.jpg")
self.d.screenshot(ocr_shot_path)
# OCR 识别
# 策略:识别整个屏幕,找到位于语音消息下方的文字
# 简单起见,我们假设转换出的文字在语音气泡下方 y > vy
ocr_results = self.ocr_kit.read_text(ocr_shot_path)
extracted_text = []
for bbox, text, conf in ocr_results:
# bbox center y
c_y = (bbox[0][1] + bbox[2][1]) / 2
# 过滤条件:在语音气泡下方,且距离不太远 (例如 300 像素内)
if c_y > vy and c_y < vy + 400:
extracted_text.append(text)
full_text = " ".join(extracted_text)
logger.info(f"📝 OCR 识别结果: {full_text}")
# 5. 再次长按语音消息 (为了取消转换)
# 注意:转换出文字后,界面可能会发生位移。
# 但通常语音气泡的相对位置(如果是最后一条)可能变化不大,或者我们假设用户不滑动
# 更稳妥的是:重新识别一次语音气泡位置?
# 用户说:"这样原来什么样,识别完就是什么样",意味着我们要恢复原状。
# 我们假设点击原来的位置还能点到语音气泡(如果它没被顶上去太多)
# 或者,我们可以点击转换出来的文字区域?
# 让我们尝试点击原来的坐标。
self.d.long_click(vx, vy, 1.5)
time.sleep(1.0)
# 6. CV 模板匹配寻找 "取消转文字" 按钮
menu_shot_path_cancel = os.path.join(self.screenshot_dir, "t6_menu_shot_cancel.jpg")
self.d.screenshot(menu_shot_path_cancel)
cancel_template = r"d:\dsWork\aiData\WeiXin\Templates\cancel_zhuan_wen_zi.jpg"
cancel_btn = find_template_match(menu_shot_path_cancel, cancel_template, threshold=0.7)
if cancel_btn:
logger.info(f"✅ CV 找到 '取消转文字' 按钮: {cancel_btn}")
self.d.click(cancel_btn[0], cancel_btn[1])
else:
logger.warning("❌ CV 未找到 '取消转文字' 按钮,尝试点击空白处关闭菜单。")
self.d.click(vx + 200, vy)
return full_text
async def run(self):
logger.info("🚀 大张老师自动巡课系统启动...")
@@ -94,42 +188,144 @@ class ChatBot:
while True:
try:
# 1. 检查是否在微信聊天界面 (改为通过 VLM 识别结果判断,不再使用 UI 检查)
# if not is_in_chat_interface(self.d):
# logger.warning("⚠️ 当前不在微信聊天界面,等待下一次扫描...")
# await asyncio.sleep(CHECK_INTERVAL)
# continue
logger.info("🔍 正在扫描当前界面内容...")
# 1. 截图并分析
tmp_shot = os.path.join(self.screenshot_dir, "t5_monitor_temp.jpg")
analyzed_shot = os.path.join(self.screenshot_dir, "t5_monitor_analyzed.jpg")
# 1. 截图
tmp_shot = os.path.join(self.screenshot_dir, "t6_monitor_temp.jpg")
logger.info(f"📸 正在截取屏幕... ({datetime.now().strftime('%H:%M:%S')})")
self.d.screenshot(tmp_shot)
logger.info("🎨 正在分析聊天界面内容 (检测头像与对话)...")
# analyze_chat_image 现在会返回 None, None 如果不是聊天界面
dialogue_log, input_center = await analyze_chat_image(tmp_shot, analyzed_shot, device=self.d)
if dialogue_log is None:
logger.warning("⚠️ VLM 判断当前不在微信聊天界面,或无法识别。")
await asyncio.sleep(CHECK_INTERVAL)
continue
# 2. VLM 分析
logger.info("🧠 正在调用 VLM 分析图片...")
result_data = await get_vlm_analysis(tmp_shot)
# 语音转文字处理
if dialogue_log == "VOICE_CONVERTING":
logger.info("🎙️ 检测到语音消息,已触发转文字,等待处理完成 (5秒)...")
await asyncio.sleep(5)
continue
if not dialogue_log:
logger.info("⏳ 界面分析完成,未发现有效对话内容,继续监控...")
if not result_data:
logger.warning("⚠️ VLM 分析返回为空,跳过本次循环。")
await asyncio.sleep(CHECK_INTERVAL)
continue
# 3. 解析数据构建 dialogue_log
messages = result_data.get("messages", [])
input_center = result_data.get("input_box")
dialogue_log = []
voice_messages = [] # 存储所有语音消息
for i, msg in enumerate(messages):
# 简单的发送者判断
sender_val = msg.get("sender", "对方")
if sender_val in ["Me", ""]:
sender_name = ""
else:
sender_name = "对方"
msg_type = msg.get("type", "text")
content = msg.get("content", "")
# status = msg.get("status", "unconverted") # 不再依赖 status
is_unread = msg.get("is_unread", False)
if msg_type == "voice":
coords = msg.get("center") or msg.get("coordinates")
if coords:
msg["coordinates"] = coords
voice_messages.append(msg)
# 在日志中暂时标记为 [语音],稍后如果处理了会更新
# 但为了日志完整性,我们这里先占位
# 实际上,我们需要知道这个语音的内容才能放入 context
# 如果没有内容,只能放 [语音]
# 只有被处理过的语音,我们才能获取内容。
# 对于历史语音,如果我们不处理(非首次运行且无红点),我们无法知道内容。
# 所以这里只能 append 占位符。
dialogue_log.append(f"{sender_name}: [语音] {content}")
else:
dialogue_log.append(f"{sender_name}: {content}")
logger.info(f"📑 界面扫描完成,当前对话历史共 {len(dialogue_log)}")
# 4. 语音处理逻辑
processed_voice_content = None
if self.is_first_run:
logger.info("🌟 首次运行:处理屏幕上所有语音消息...")
for v_msg in voice_messages:
# 无论是否未读,都处理
text = await self.process_single_voice(v_msg)
if text:
# 更新日志中的内容 (这比较复杂,因为 log 是 append 的)
# 简单起见,我们只记录最后一条处理的内容用于回复判断
# 但为了上下文准确,应该更新 dialogue_log
# 这里简化处理:如果是最后一条,我们记录下来
if v_msg == voice_messages[-1]:
processed_voice_content = text
self.is_first_run = False # 标记首次运行结束
else:
# 后续监控:只处理最后一条,且必须是未读 (is_unread=True)
if voice_messages:
last_voice = voice_messages[-1]
if last_voice.get("is_unread", False):
logger.info("🔴 发现未读语音消息 (最后一条),正在处理...")
processed_voice_content = await self.process_single_voice(last_voice)
else:
logger.info("⚪ 最后一条语音消息已读,跳过处理。")
# 5. LLM 回复逻辑
# 只有当有新的语音被处理并识别出文字,或者有新的文本消息时才回复
# 这里简化:如果 processed_voice_content 存在,说明我们刚刚处理了一个语音,需要回复
# 或者,我们可以检查是否是最后一条消息是对方发的
# 重新构建 history_text如果有处理出的语音文本替换掉最后的 [语音]
if processed_voice_content:
# 找到最后一条包含 [语音] 的日志并替换
for i in range(len(dialogue_log) - 1, -1, -1):
if "[语音]" in dialogue_log[i]:
dialogue_log[i] = dialogue_log[i].replace("[语音]", f"[语音转文字: {processed_voice_content}]")
break
history_text = "\n".join(dialogue_log)
# 判断是否需要回复:
# 核心规则:只有当最后一条消息是“对方”说的,才回复。如果是“我”说的,则不回复。
should_reply = False
if dialogue_log:
last_log = dialogue_log[-1]
# 检查最后一条消息的发送者
if last_log.startswith("对方"):
logger.info(f"💡 最后一条消息是对方发送,准备回复。内容: {last_log}")
should_reply = True
else:
logger.info(f"⚪ 最后一条消息是我发送的,无需回复。内容: {last_log}")
should_reply = False
if should_reply:
logger.info("🤖 准备调用 LLM 生成回复...")
reply = await self.get_reply(history_text)
logger.info(f"💡 LLM 回复: {reply}")
if reply and input_center:
# 输入并发送
await perform_input_action(self.d, input_center, reply)
# 记录回复时间
self.last_interaction_time = time.time()
# 休眠
await asyncio.sleep(CHECK_INTERVAL)
except Exception as e:
logger.error(f"❌ 主循环发生错误: {e}", exc_info=True)
await asyncio.sleep(CHECK_INTERVAL)
# 5. 常规回复逻辑
if not dialogue_log:
logger.info("⏳ 未发现有效对话内容,继续监控...")
await asyncio.sleep(CHECK_INTERVAL)
continue
# 2. 检查是否有新消息
current_last_msg = dialogue_log[-1]
logger.info(f"💬 当前最后一条消息: {current_last_msg}")
@@ -137,10 +333,10 @@ class ChatBot:
# 判断逻辑:如果最后一条消息是“对方”发的,且与上次不同,则回复
if "对方:" in current_last_msg and current_last_msg != self.last_message_text:
# 关键检查:如果包含 "(待转换)",说明语音还没转文字,绝对不能回复
# 关键检查:如果包含 "(待转换)",说明语音还没转文字 (或者OCR失败),跳过
if "(待转换)" in current_last_msg:
logger.info(f"🚫 检测到未转换的语音消息,跳过回复生成,等待转文字... ({current_last_msg})")
await asyncio.sleep(2) # 稍作等待
await asyncio.sleep(2)
continue
logger.info(f"📩 检测到新消息: {current_last_msg}")
@@ -156,15 +352,13 @@ class ChatBot:
center_point, _ = find_input_box_center(tmp_shot)
logger.info(f"📍 使用 CV 识别的输入框坐标: {center_point}")
# 即使 CV 没找到坐标,也尝试执行,因为 perform_input_action 内部有原生控件识别
perform_input_action(self.d, center_point, reply, auto_send=True)
self.last_message_text = f"我: {reply}" # 更新状态,避免重复回复自己
self.last_message_text = f"我: {reply}" # 更新状态
self.last_interaction_time = time.time()
self.proactive_count = 0 # 重置主动询问计数
self.proactive_count = 0
# 3. 检查是否需要主动询问 (用户长时间不响应)
# 3. 检查是否需要主动询问
elif "我:" in current_last_msg:
# 如果最后一条是我发的,检查距离现在的时间
elapsed = time.time() - self.last_interaction_time
if elapsed > SILENCE_THRESHOLD and self.proactive_count < MAX_PROACTIVE_PROMPTS:
logger.info(f"⏳ 用户长时间未响应 ({int(elapsed)}s),准备主动询问...")
@@ -177,13 +371,12 @@ class ChatBot:
else:
center_point, _ = find_input_box_center(tmp_shot)
# 同上,解耦 CV 坐标
perform_input_action(self.d, center_point, proactive_reply, auto_send=True)
self.proactive_count += 1
self.last_interaction_time = time.time() # 更新时间,避免连续询问
self.last_interaction_time = time.time()
self.last_message_text = f"我: {proactive_reply}"
# 更新最后一条消息记录(仅用于对比)
# 更新最后一条消息记录
if "对方:" in current_last_msg:
self.last_message_text = current_last_msg
@@ -193,5 +386,6 @@ class ChatBot:
await asyncio.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
Win32Patch.patch()
bot = ChatBot()
asyncio.run(bot.run())

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.1 KiB

BIN
WeiXin/Templates/send.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.9 KiB

View File

@@ -26,6 +26,50 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level
logger = logging.getLogger("WxUtil")
async def get_vlm_json(image_path, prompt):
"""
通用 VLM 分析函数,返回 JSON 数据 (自动处理归一化坐标的反归一化)
"""
try:
# 调用 VLM
response = await vlm_kit.analyze_image(image_path, prompt)
json_str = vlm_kit.extract_json(response)
result_data = json.loads(json_str)
# 获取图片尺寸进行坐标反归一化
try:
from PIL import Image
with Image.open(image_path) as img:
width, height = img.size
# 定义反归一化函数
def denormalize(point):
if not point or len(point) != 2:
return point
return [int(point[0] / 1000 * width), int(point[1] / 1000 * height)]
# 递归遍历字典进行反归一化 (仅针对常见坐标字段 center, input_box)
def recursive_denormalize(data):
if isinstance(data, dict):
for key, value in data.items():
if key in ["center", "input_box", "coordinates"] and isinstance(value, list) and len(value) == 2:
data[key] = denormalize(value)
elif isinstance(value, (dict, list)):
recursive_denormalize(value)
elif isinstance(data, list):
for item in data:
recursive_denormalize(item)
recursive_denormalize(result_data)
except Exception as e:
logger.warning(f"坐标反归一化失败: {e},将使用原始坐标")
return result_data
except Exception as e:
logger.error(f"VLM Analysis Failed: {e}", exc_info=True)
return None
async def get_vlm_analysis(image_path):
"""
仅调用 VLM 分析图片,返回原始 JSON 数据 (dict)
@@ -37,10 +81,14 @@ async def get_vlm_analysis(image_path):
请分析这张微信聊天截图。
【核心任务】
识别图中的【语音消息气泡】和【文本消息气泡】。
识别图中的【语音消息气泡】和【文本消息气泡】,并区分【发送者】
【重要判别规则】
1. 🔊 **语音消息 (Voice)**
1. 👤 **发送者 (Sender)**
- **对方 (Other)**:气泡在屏幕**左侧**,通常为白色或灰色,头像在左边。
- **我 (Me)**:气泡在屏幕**右侧**,通常为绿色,头像在右边。
2. 🔊 **语音消息 (Voice)**
- **视觉特征**
- **高度**:固定(单行)。
- **宽度**随时长1"~60")变化。
@@ -49,9 +97,12 @@ async def get_vlm_analysis(image_path):
- **内容**:气泡内**只有一个**表示时长的数字(如 `8"`)和一个声波图标。
- **绝对排除**:凡是包含汉字、长句子的气泡,**统统不是**语音消息。
2. 📝 **文本消息 (Text)**
3. 📝 **文本消息 (Text)**
- **视觉特征**:气泡内包含汉字、标点符号、表情等文本内容。
4. 🔴 **未读状态 (Unread)**
- **特征**:语音气泡右上角或附近有一个明显的**小红点**。
【坐标系统】
**必须使用 [0-1000] 的归一化坐标系。**
- 左上角为 [0, 0],右下角为 [1000, 1000]。
@@ -65,12 +116,15 @@ async def get_vlm_analysis(image_path):
"messages": [
{
"type": "voice",
"sender": "对方" | "",
"status": "converted" | "unconverted",
"is_unread": true | false,
"center": [x, y],
"content": "8\""
},
{
"type": "text",
"sender": "对方" | "",
"center": [x, y],
"content": "这里是文本内容"
}
@@ -79,7 +133,8 @@ async def get_vlm_analysis(image_path):
注意:
1. 坐标 `center` 和 `input_box` 必须是 [0-1000] 的归一化坐标。
2. `status` 判断:如果语音气泡的正下方紧挨着一条文本消息(通常是转换出的文字),则为 `converted`,否则为 `unconverted`。
3. 请按从上到下的顺序输出所有消息。
3. `is_unread` 判断:如果有红点则为 true否则为 false (仅针对语音消息)
4. 请按从上到下的顺序输出所有消息。
"""
try:
@@ -307,6 +362,36 @@ def find_input_box_center(image_path):
logger.error(f"find_input_box_center error: {e}")
return (540, 2100), None
def find_template_match(screen_path, template_path, threshold=0.8):
"""
使用 OpenCV 模板匹配寻找按钮中心坐标
"""
try:
if not os.path.exists(template_path):
logger.error(f"Template file not found: {template_path}")
return None
img = cv2.imread(screen_path)
template = cv2.imread(template_path)
if img is None or template is None:
return None
h, w = template.shape[:2]
res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res)
if max_val >= threshold:
center_x = max_loc[0] + w // 2
center_y = max_loc[1] + h // 2
logger.info(f"Template matched! Score: {max_val:.2f}, Center: ({center_x}, {center_y})")
return (center_x, center_y)
logger.info(f"Template not matched. Max score: {max_val:.2f}")
return None
except Exception as e:
logger.error(f"Template matching failed: {e}")
return None
def perform_input_action(d, center_point, text, auto_send=True):
"""
执行输入操作
@@ -347,14 +432,74 @@ def perform_input_action(d, center_point, text, auto_send=True):
# 3. 发送
if auto_send:
if d(text="发送").exists:
d(text="发送").click()
logger.info("Clicked '发送'")
# 优先使用模板匹配寻找“发送”按钮
logger.info("尝试使用模板匹配寻找'发送'按钮...")
tmp_screen = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp_send_check.jpg")
d.screenshot(tmp_screen)
# 使用绝对路径
template_path = r"d:\dsWork\aiData\WeiXin\Templates\send.jpg"
send_btn_pos = find_template_match(tmp_screen, template_path, threshold=0.7) # 稍微降低阈值以提高召回
if send_btn_pos:
logger.info(f"通过模板匹配找到发送按钮: {send_btn_pos}, 点击...")
d.click(send_btn_pos[0], send_btn_pos[1])
else:
d.press("enter")
logger.info("Pressed Enter")
logger.warning("模板匹配未找到发送按钮,尝试原生控件查找...")
if d(text="发送").exists:
d(text="发送").click()
logger.info("Clicked '发送'")
else:
d.press("enter")
logger.info("Pressed Enter")
# 清理临时文件
if os.path.exists(tmp_screen):
try:
os.remove(tmp_screen)
except:
pass
except Exception as e:
logger.error(f"perform_input_action error: {e}")
def match_template_center(image_path, template_path, threshold=0.8):
"""
使用 OpenCV 模板匹配寻找目标图片中心坐标
"""
try:
if not os.path.exists(image_path) or not os.path.exists(template_path):
logger.error(f"Image or template not found: {image_path}, {template_path}")
return None
img = cv2.imread(image_path)
template = cv2.imread(template_path)
if img is None or template is None:
logger.error("Failed to read image or template")
return None
# 转换为灰度图进行匹配
img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
# 模板匹配
result = cv2.matchTemplate(img_gray, template_gray, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
if max_val >= threshold:
h, w = template_gray.shape
top_left = max_loc
center_x = int(top_left[0] + w / 2)
center_y = int(top_left[1] + h / 2)
logger.info(f"Template matched with confidence {max_val:.2f} at ({center_x}, {center_y})")
return (center_x, center_y)
else:
logger.warning(f"Template match failed. Max confidence: {max_val:.2f} < Threshold: {threshold}")
return None
except Exception as e:
logger.error(f"match_template_center error: {e}")
return None

Binary file not shown.