This commit is contained in:
HuangHai
2026-01-28 07:31:28 +08:00
parent efcef1bade
commit d2fc0cd7c7
13 changed files with 336 additions and 271 deletions

View File

@@ -2,7 +2,7 @@
# 采集配置
SCROLL_DISTANCE_RATIO = 0.3
MAX_STATIONS_COUNT = 20
MAX_STATIONS_COUNT = 50
FIRST_RUN_ONLY_ONE_STATION = False
REDIS_STATION_EXPIRE = 120

View File

@@ -34,7 +34,6 @@ NON_STATION_KEYWORDS = [
"充电券",
"电信积分兑换",
"确认",
"广告",
"距离/区域",
"综合排序",
"偏好",
@@ -47,6 +46,7 @@ NON_STATION_KEYWORDS = [
"输入",
"商城",
"推荐",
"新客专享",
]
def _load_image(path):
@@ -140,11 +140,17 @@ async def run_ocr_rect(image_path, log_path=None):
status = "drop"
reasons.append("bottom_safe_zone")
if status == "keep" and txt:
for kw in NON_STATION_KEYWORDS:
if kw and kw in txt:
status = "drop"
reasons.append("non_station_keyword")
break
# Prevent filter bar items from being treated as stations
if txt.strip() in ["充电站", "快充", "慢充", "超快充", "广告"]:
status = "drop"
reasons.append("exact_filter_keyword")
if status == "keep":
for kw in NON_STATION_KEYWORDS:
if kw and kw in txt:
status = "drop"
reasons.append("non_station_keyword")
break
log_detail(
f"OCR[{idx + 1}] text={repr(text)} prob={prob:.3f} "

View File

@@ -1,7 +1,7 @@
# 采集配置
SCROLL_DISTANCE_RATIO = 0.5
MAX_STATIONS_COUNT = 20
MAX_STATIONS_COUNT = 50
FIRST_RUN_ONLY_ONE_STATION = True
REDIS_STATION_EXPIRE = 120
DATA_RETENTION_DAYS = 365

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

View File

@@ -63,7 +63,8 @@ class ChatMonitorBot:
self.input_pos = None
self.last_screen_hash = None
self.last_processed_msg_hash = None
self.first_run = True # 标记是否为首次运行
# [User Requested] 移除持久化存储,只在内存中记录,重启即忘
self.processed_hashes = set()
self.check_interval = 3 # 检查频率 (秒)
self.persona = (
@@ -76,13 +77,23 @@ class ChatMonitorBot:
"3. 仅针对家长明确表达的内容进行回复。\n"
"4. 严禁使用列表格式。严禁使用‘首先、其次’等逻辑词。\n"
"5. 回复必须简练,字数严格控制在 50 字以内!\n"
"6. 对方问什么就答什么。例如问‘学校叫什么’,就只回答‘少惠林’,不要回复地址和电话!\n"
"如果涉及到校区信息,必须且只能使用以下真实数据:\n"
"- 单位:长春市少惠林作文素养培养中心\n"
"- 单位/学校名称:长春市少惠林作文素养培养中心(简称:少惠林)\n"
"- 地址南环城路与临河街交汇TOUCH12街3楼325号\n"
"- 联系人小张老师电话18686619970\n"
"- 每学期开学招收小学三年级至六年级,初中七年级的学生入学,其它年段不招生。\n"
)
def _record_processed_hash(self, msg_hash):
    """Remember a handled message hash in memory, keeping only the newest 100.

    ``self.processed_hashes`` stays a plain set so membership checks elsewhere
    keep working. Because a set has no ordering, insertion order is tracked in
    a companion deque (``self._processed_hash_order``) so that eviction really
    drops the *oldest* hashes instead of arbitrary ones.

    :param msg_hash: hash string of the message that has just been handled.
    """
    # Local import keeps this method self-contained within the file.
    from collections import deque

    max_kept = 100

    order = getattr(self, "_processed_hash_order", None)
    # Create the order queue lazily, and rebuild it if the set was replaced or
    # modified behind our back (the original insertion order is unknown then).
    if order is None or len(order) != len(self.processed_hashes):
        order = deque(self.processed_hashes)
        self._processed_hash_order = order

    if msg_hash in self.processed_hashes:
        return  # already recorded; nothing to do

    self.processed_hashes.add(msg_hash)
    order.append(msg_hash)

    # Evict from the old end until we are back within the cap.
    while len(order) > max_kept:
        self.processed_hashes.discard(order.popleft())
async def get_reply(self, last_message_text, context_text=""):
prompt = (
f"【教师人设】:{self.persona}\n\n"
@@ -141,9 +152,30 @@ class ChatMonitorBot:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
def get_stable_message_hash(self, msg):
    """Return an MD5 hex digest identifying a message by its stable fields.

    Only ``sender``, ``content``, ``time_display`` and ``type`` take part in
    the hash; every other key of ``msg`` (e.g. coordinates) is ignored so the
    same message keeps the same hash across scans.

    A missing key and an explicit ``None`` are both normalised to ``""`` for
    all four fields, so they cannot produce different hashes for what is the
    same message.

    :param msg: message dict, or a falsy value.
    :return: hex digest string, or ``""`` when ``msg`` is empty/None.
    """
    if not msg:
        return ""

    stable_fields = ("sender", "content", "time_display", "type")
    stable_data = {field: msg.get(field) or "" for field in stable_fields}

    # sort_keys makes the serialisation independent of dict ordering.
    msg_str = json.dumps(stable_data, sort_keys=True, ensure_ascii=False)
    return hashlib.md5(msg_str.encode("utf-8")).hexdigest()
async def run(self):
"""主运行循环"""
logger.info("🚀 大张老师自动巡课系统启动 (T2 增强版)...")
"""
主运行循环
"""
logger.info("🚀 正在启动 T2_ChatMonitor (Auto-Reply)...")
# 定义 JSON 序列化辅助函数
def numpy_serializer(obj):
@@ -159,118 +191,11 @@ class ChatMonitorBot:
if not self.step_1_prepare_env(): return
if not self.step_2_connect_device(): return
# 2. 首次运行:识别所有语音并获取上下文
logger.info("🔍 [首次运行] 正在进行全量识别,获取对话上下文...")
# [User Requested] 移除首屏概念,直接进入监控循环
# 以前说过什么都不管了,只关注最后一条
logger.info("🚀 启动完成,直接进入实时监控阶段...")
# 调用封装好的 get_first_screen
self.dialogue_log, self.input_pos, enter_path, flag_path = await WxUtil.get_first_screen(self.device)
# 更新 live paths (用于后续监控逻辑的引用)
import shutil
if enter_path and os.path.exists(enter_path):
shutil.copy(enter_path, self.screenshot_path)
if flag_path and os.path.exists(flag_path):
shutil.copy(flag_path, self.debug_view_path)
logger.info(f"📸 已保存识别标记图: {flag_path}")
if self.dialogue_log:
logger.info(f"✅ 首次运行识别完成,获取到 {len(self.dialogue_log)} 条消息上下文")
logger.info("\n" + "="*50)
logger.info("【测试模式】最终提取的对话记录:")
for msg in self.dialogue_log:
sender = msg.get('sender', '未知')
content = msg.get('content', '')
time_str = msg.get('time_display', '')
# 按照用户要求的格式输出: 2026-01-26 10:03 糖豆爸爸 老师您好!
log_prefix = f"{time_str} " if time_str else ""
log_line = f"{log_prefix}{sender} {content}"
logger.info(log_line)
logger.info("="*50 + "\n")
# --- LLM 总结 ---
logger.info("🤖 正在请求 LLM 生成对话摘要...")
chat_history_text = ""
for msg in self.dialogue_log:
sender = msg.get('sender', '未知')
content = msg.get('content', '')
type_str = "[语音]" if msg.get('type') == 'voice' else "[文字]"
time_str = msg.get('time_display', '')
time_prefix = f"[{time_str}] " if time_str else ""
chat_history_text += f"{time_prefix}{sender}{type_str}: {content}\n"
prompt = (
"请根据以下微信对话记录,总结归纳双方交流的主要信息点。\n"
"要求:\n"
"1. 简明扼要,分点列出。\n"
"2. 明确指出双方达成的一致或待解决的问题。\n"
"3. 忽略无关的寒暄。\n\n"
f"对话记录:\n{chat_history_text}"
)
try:
full_response = ""
async for chunk in get_llm_response(prompt, stream=True):
full_response += chunk
logger.info("\n" + "="*20 + " 对话摘要 (LLM) " + "="*20)
logger.info(full_response)
logger.info("="*55 + "\n")
except Exception as e:
logger.error(f"LLM 摘要生成失败: {e}")
# 初始化最后处理的消息哈希,避免重复回复第一条
last_msg = self.dialogue_log[-1]
# --- 初始回复逻辑 (Added) ---
# 如果最后一条是对方发的消息,说明可能需要回复
sender = last_msg.get('sender', '')
# 判断逻辑:只要不是"我",就认为是对方 (可能是 "对方", "糖豆爸爸" 等)
if sender != "":
logger.info(f"💡 [首屏] 最后一条消息来自 '{sender}',尝试生成回复...")
# 构建上下文
context_text = "\n".join([f"{m.get('time_display', '') + ' ' if m.get('time_display') else ''}{m.get('sender')}: {m.get('content')}" for m in self.dialogue_log[:-1]])
last_content = last_msg.get('content', '')
reply = await self.get_reply(last_content, context_text)
if reply:
logger.info(f"🤖 [首屏] LLM 建议回复: {reply}")
# 检查输入框位置
if self.input_pos:
logger.info(f"⚡ [首屏] 执行自动回复...")
perform_input_action(self.device, self.input_pos, reply)
# 发送后更新 hash避免进入循环后重复回复
# 发送后,界面会变,但我们需要标记当前这条已经回过了
msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer)
self.last_processed_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest()
logger.info("✅ [首屏] 回复已发送")
else:
logger.warning("❌ [首屏] 未找到输入框位置,无法发送")
else:
logger.info("⚪ [首屏] LLM 认为无需回复")
else:
logger.info("⚪ [首屏] 最后一条是自己发的,无需回复")
# 更新 Hash (如果刚才没发回复,也需要记录当前最后一条,防止循环里重复处理)
if not self.last_processed_msg_hash:
msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer)
self.last_processed_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest()
self.last_screen_hash = self.get_image_hash(self.screenshot_path)
else:
logger.warning("⚠️ 首次运行未识别到有效对话")
# logger.info("🛑 测试结束:已完成所有语音的转换与读取。停止进入监控循环。")
# return # 测试模式:直接退出,不进入监控循环
# 3. 进入循环阶段
logger.info("🔄 进入实时监控阶段...")
while True:
try:
# A. 截图并计算哈希
@@ -306,19 +231,22 @@ class ChatMonitorBot:
# D. 只关注最后一条消息
last_msg = dialogue_log[-1]
# last_msg 是字典,需要序列化
msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer)
current_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest()
# 计算稳定哈希(忽略坐标变化)
current_msg_hash = self.get_stable_message_hash(last_msg)
# E. 判断是否需要回复 (对方发送且非重复消息)
sender = last_msg.get('sender', '')
if sender != "":
if self.first_run:
# 首次运行时,记录最后一条消息的哈希但不回复,防止重启后重复回复历史消息
logger.info(f"🚦 [启动] 首次扫描,忽略已存在的最后一条消息: {last_msg}")
self.last_processed_msg_hash = current_msg_hash
self.first_run = False
elif current_msg_hash != self.last_processed_msg_hash:
# Check if hash is already processed (in-memory only)
is_processed = current_msg_hash in self.processed_hashes
# Log only if it changed from last *in-memory* check to avoid spam
if is_processed and current_msg_hash != self.last_processed_msg_hash:
# logger.info(f"🚫 [监控] 消息哈希已存在于历史记录中,跳过回复 (Hash: {current_msg_hash})")
self.last_processed_msg_hash = current_msg_hash
if not is_processed and current_msg_hash != self.last_processed_msg_hash:
if sender != "":
event_shot = WxUtil.get_next_debug_path("event_new_msg")
self.device.screenshot(event_shot)
logger.info(f"💡 [监控] 发现新消息: {last_msg},保存现场截图: {event_shot}")
@@ -345,10 +273,14 @@ class ChatMonitorBot:
last_content = last_msg.get('content') or ""
logger.info(f"🔄 [重试] 强制转换后内容: {last_content}")
# 重新构建 msg_str 和 hash确保下次循环不会因为内容变化而再次触发虽然这里已经处理了
# 但实际上这里是在处理当前事件,更新 hash 是为了避免重复处理
msg_str = json.dumps(last_msg, sort_keys=True, ensure_ascii=False, default=numpy_serializer)
current_msg_hash = hashlib.md5(msg_str.encode('utf-8')).hexdigest()
# 重新构建哈希
current_msg_hash = self.get_stable_message_hash(last_msg)
# 再次检查是否已处理 (因为内容变了,哈希变了)
if current_msg_hash in self.processed_hashes:
logger.info(f"🚫 [重试] 转换后发现该消息已处理,跳过。")
self.last_processed_msg_hash = current_msg_hash
# 跳过本次循环的剩余部分
continue
# 生成回复
reply = await self.get_reply(last_content, context_text)
@@ -357,35 +289,35 @@ class ChatMonitorBot:
logger.info(f"🤖 [监控] LLM 建议回复: {reply}")
if self.input_pos:
logger.info(f"⚡ [监控] 执行自动回复...")
perform_input_action(self.device, self.input_pos, reply)
# input_pos 是 ((x,y), box) 格式,取第一个元素坐标点
target_pos = self.input_pos[0] if isinstance(self.input_pos, (list, tuple)) and len(self.input_pos) == 2 and isinstance(self.input_pos[0], (list, tuple)) else self.input_pos
# 简单兼容处理:如果 input_pos[0] 是 tuple/list 且 input_pos[1] 是 None/box则取 input_pos[0]
if isinstance(self.input_pos, (list, tuple)) and len(self.input_pos) == 2 and isinstance(self.input_pos[0], (list, tuple)):
target_pos = self.input_pos[0]
perform_input_action(self.device, target_pos, reply)
# 发送后截图留存
reply_sent_shot = WxUtil.get_next_debug_path("event_reply_sent")
self.device.screenshot(reply_sent_shot)
logger.info(f"✅ [监控] 回复已发送,保存发送后截图: {reply_sent_shot}")
self._record_processed_hash(current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
else:
logger.warning(" [监控] 未找到输入框位置,无法发送")
logger.error("❌ 未找到输入框位置,无法发送回复")
else:
logger.warning("⚠️ [监控] LLM 未生成有效回复")
logger.info(" [监控] LLM 认为无需回复")
self._record_processed_hash(current_msg_hash)
self.last_processed_msg_hash = current_msg_hash
else:
# 消息已处理
pass
else:
# 最后一条是我发送的
if self.first_run:
logger.info(f"🚦 [启动] 首次扫描,最后一条是自己发的,标记为已处理: {last_msg}")
# 是我发的消息,更新哈希,不再处理
self.last_processed_msg_hash = current_msg_hash
self.first_run = False
elif current_msg_hash != self.last_processed_msg_hash:
logger.info(f"⚪ [监控] 最后一条消息是自己发的,跳过回复: {last_msg}")
self.last_processed_msg_hash = current_msg_hash
await asyncio.sleep(self.check_interval)
except Exception as e:
logger.error(f"❌ 循环中发生错误: {e}", exc_info=True)
logger.error(f"Error in monitoring loop: {e}", exc_info=True)
await asyncio.sleep(self.check_interval)
async def run_main():
@@ -398,11 +330,4 @@ async def run_main():
if __name__ == "__main__":
# 应用 Win32 补丁
Win32Patch.patch()
try:
# 运行机器人
asyncio.run(run_main())
except KeyboardInterrupt:
logger.info("🛑 用户手动停止程序。")
except Exception as e:
logger.error(f"❌ 程序异常退出: {e}", exc_info=True)
asyncio.run(run_main())

View File

@@ -253,6 +253,61 @@ def draw_debug_info(image_path, messages, current_voice_center=None, suffix=""):
except Exception as e:
logger.warning(f"绘制调试信息失败: {e}")
def _detect_bubble_color(img, bbox):
    """Classify the background colour behind an OCR text box.

    The result is used as a hint for who sent the message.

    :param img: OpenCV image in BGR channel order (may be None).
    :param bbox: OCR bounding box as a sequence of (x, y) points.
    :return: "green" (own bubble), "white" (other party's bubble) or "unknown".
    """
    # Nothing to sample: no image, or no box points at all.
    if img is None or bbox is None or len(bbox) == 0:
        return "unknown"

    # Axis-aligned bounds of the box, clamped to the image.
    h, w = img.shape[:2]
    xs = [p[0] for p in bbox]
    ys = [p[1] for p in bbox]
    min_x = max(0, int(min(xs)))
    max_x = min(w, int(max(xs)))
    min_y = max(0, int(min(ys)))
    max_y = min(h, int(max(ys)))

    if max_x <= min_x or max_y <= min_y:
        return "unknown"

    roi = img[min_y:max_y, min_x:max_x]

    # Dark glyphs drag the mean/median down, so the 95th percentile per channel
    # is used as the background estimate (the bright part of the box).
    try:
        # Only the first three channels are colour; this keeps BGRA images on
        # the percentile path instead of dropping to the mean fallback.
        sample = roi[..., :3] if roi.ndim == 3 else roi
        b, g, r = np.percentile(sample, 95, axis=(0, 1))
    except Exception:
        # Deliberate best-effort fallback (e.g. non 3-channel input).
        b, g, r = cv2.mean(roi)[:3]

    # Debug trace of the sampled colour.
    logger.info(f"Color Debug: B={b:.1f}, G={g:.1f}, R={r:.1f} | bbox={bbox}")

    # Green bubble (light mode is roughly BGR 101, 225, 152): G clearly
    # dominates both R and B; the margin separates it from white/grey noise.
    if g > r + 30 and g > b + 30 and g > 100:
        return "green"

    # White bubble: the three channels are close to each other (tolerant of
    # greyish tones) and bright enough not to be a dark background.
    if abs(r - g) < 30 and abs(g - b) < 30 and abs(r - b) < 30:
        if g > 150:
            return "white"

    return "unknown"
def _scan_chat_messages(image_path):
"""
内部函数:扫描图片中的微信消息(语音、文本、红点)
@@ -263,6 +318,7 @@ def _scan_chat_messages(image_path):
logger.error(f"无法读取图片: {image_path}")
return [], None
h, w = img.shape[:2]
logger.info(f"DEBUG: Image size w={w}, h={h}")
# 3. 模板匹配寻找语音图标和红点
audio_template = os.path.join(TEMPLATE_DIR, "audio.jpg")
@@ -383,6 +439,29 @@ def _scan_chat_messages(image_path):
if has_intermediate_audio:
continue
# [Fix] 检查中间是否有其他气泡消息阻断 (防止跨消息合并)
# 如果遇到一个明确属于另一方的消息气泡,必须停止关联
if c_y > ay + 60: # 稍微放宽 Y 轴,避免误判紧贴的转换文本
bubble_color = _detect_bubble_color(img, bbox)
if voice_is_left: # 语音在左 (对方)
# 如果遇到绿色气泡 (我),或者是明显的右对齐文本,视为阻断
if bubble_color == "green":
logger.info(f"语音({ax},{ay}) 被中间''的消息(绿色气泡)阻断: '{text[:10]}...'")
break
if c_x > w * 0.65: # 右侧明显区域 (short message check)
logger.info(f"语音({ax},{ay}) 被中间''的消息(右对齐)阻断: '{text[:10]}...'")
break
else: # 语音在右 (我)
# 如果遇到白色气泡 (对方),或者是明显的左对齐文本,视为阻断
if bubble_color == "white":
logger.info(f"语音({ax},{ay}) 被中间'对方'的消息(白色气泡)阻断: '{text[:10]}...'")
break
if c_x < w * 0.35: # 左侧明显区域
logger.info(f"语音({ax},{ay}) 被中间'对方'的消息(左对齐)阻断: '{text[:10]}...'")
break
clean_text = text.strip()
# 判定是否为时间戳
is_timestamp = re.search(r'(\d{1,2}:\d{2})', clean_text) and (len(clean_text) < 15)
@@ -456,23 +535,68 @@ def _scan_chat_messages(image_path):
if c_y < 150 or c_y > h - 100:
continue
# 判定发送者 (增强版几何判定,防止 720p 屏幕下的中心点误判)
# 默认使用中心点判定
sender = "对方" if c_x < w / 2 else ""
# 判定发送者 (增强版: 几何 + 颜色)
# 1. 尝试通过背景颜色判定 (最准确)
sender_color = _detect_bubble_color(img, bbox)
# 使用边界特征进行修正
sender = "unknown"
if sender_color == "green":
sender = ""
elif sender_color == "white":
sender = "对方"
# 2. 几何特征强制修正 (Double Check)
# 假设头像+边距约占 15% 宽度
edge_margin = w * 0.15
min_x = min(p[0] for p in bbox)
max_x = max(p[0] for p in bbox)
# 修正阈值:假设头像+边距约占 15% 宽度
edge_margin = w * 0.15
if max_x > w - edge_margin:
# 文本框延伸到了最右侧 -> 肯定是"我" (因为对方的头像在左,文本不会靠右)
# 规则 A: 如果这一行极其靠右 (超过 85% 宽度),那肯定是"我"
# 即使颜色判成了白色 (比如光照问题),也得纠正回来
if max_x > w - edge_margin:
if sender == "对方":
logger.warning(f"Sender detected as '对方' by color but geometry says '' (max_x={max_x} > {w-edge_margin}). Correcting to ''.")
sender = ""
elif min_x < edge_margin:
# 文本框延伸到了最左侧 -> 肯定是"对方" (因为我的头像在右,文本不会靠左)
sender = "对方"
# 规则 B: 如果这一行极其靠左 (小于 35% 宽度),且不靠右,那肯定是"对方"
# 扩大判定范围,防止因为 OCR 稍微缩进导致判定失效
# 注意:如果颜色明确为"我"(绿色),则跳过此规则,因为"我"的长消息也可能靠左
elif min_x < w * 0.35 and max_x < w * 0.85:
if sender == "":
logger.info(f"Geometry says '对方' (min_x={min_x} < {w*0.35}) but Color is '' (Green). Trusting Color.")
else:
sender = "对方"
# 规则 C: 如果颜色是 unknown且不在极端位置使用中心点兜底
if sender == "unknown":
c_x = int((min_x + max_x) / 2)
# 简单中心判断
if c_x < w / 2: sender = "对方"
else: sender = ""
# 规则 D: 强几何中心校验 (Final Geometry Verdict)
# 仅对短消息使用强几何校验 (宽度 < 70% 屏幕宽度)
# 长消息通常铺满屏幕,中心点在中间,容易受字体渲染影响导致误判,应信任颜色检测结果
box_width = max_x - min_x
if box_width < w * 0.7:
# 如果中心点明显在左半屏 ( < 45% ),判定为"对方"
if c_x < w * 0.45:
# [Fix] 如果颜色明确是绿色,说明是"我"的左对齐文本(长文换行),不应被几何规则强制改为"对方"
if sender == "" and sender_color == "green":
logger.info(f"Geometry says '对方' (center={c_x} < {w*0.45}) but Color is 'green'. Keeping ''.")
else:
if sender == "":
logger.warning(f"Sender detected as '' by color but center is left ({c_x} < {w*0.45}). Correcting to '对方'.")
sender = "对方"
# 如果中心点明显在右半屏 ( > 55% ),判定为"我"
elif c_x > w * 0.55:
if sender == "对方":
logger.warning(f"Sender detected as '对方' by color but center is right ({c_x} > {w*0.55}). Correcting to ''.")
sender = ""
else:
logger.info(f"Message in middle zone ({w*0.45} < {c_x} < {w*0.55}), trusting color detection: {sender}")
else:
logger.info(f"Wide message (width={box_width} > {w*0.7}), skipping geometry check, trusting color: {sender}")
time_pattern = r'(\d{4}年|\d{1,2}月|\d{1,2}日|\d{1,2}:\d{2}|昨天|今天|星期|上午|下午|晚上)'
# 优先判断是否为独立的时间戳 (行短且符合时间格式)
@@ -775,14 +899,15 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
dialogue_log = []
# 使用 debug_img 的尺寸,如果 debug_img 未定义(极端情况),默认 1080x1920
if 'debug_img' in locals() and debug_img is not None:
input_field_coordinates = (debug_img.shape[1] // 2, int(debug_img.shape[0] * 0.9))
# [User Requested] 几何兜底 Y 轴应为 0.88 (避开底部导航条)
input_field_coordinates = (debug_img.shape[1] // 2, int(debug_img.shape[0] * 0.88))
else:
# 尝试读取 current_image_path
try:
tmp_img = cv2.imread(current_image_path)
input_field_coordinates = (tmp_img.shape[1] // 2, int(tmp_img.shape[0] * 0.9))
input_field_coordinates = (tmp_img.shape[1] // 2, int(tmp_img.shape[0] * 0.88))
except:
input_field_coordinates = (540, 1728)
input_field_coordinates = (540, 1690) # 1920 * 0.88
# 找出最后一条消息
last_msg = None
@@ -827,7 +952,7 @@ async def analyze_chat_image(image_path, output_path, device=None, target_name="
except Exception as e:
logger.error(f"分析过程发生异常: {e}", exc_info=True)
return [], None
return [], (540, 1690)
def clean_screenshots_dir():
@@ -982,41 +1107,119 @@ def perform_input_action(d, center_point, text, auto_send=True):
try:
# --- 新增逻辑:确保处于文字输入模式 ---
logger.info("正在检查输入模式...")
tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg")
d.screenshot(tmp_check_shot)
wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")
input_text_template = os.path.join(TEMPLATE_DIR, "input_text.jpg")
# 1. 检查是否存在 '切换到文字' 图标 (表示当前是语音模式)
# 注意:这里假设 wen_zi_input.jpg 是那个“键盘”图标
wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8)
if wen_zi_pos:
logger.info(f"检测到语音模式 (找到切换文字图标: {wen_zi_pos}),点击切换...")
d.click(wen_zi_pos[0], wen_zi_pos[1])
# 优先使用 uiautomator2 的属性检测(比图像识别更稳)
# 1. 检查是否有 "切换到键盘" 按钮(说明当前是语音模式)
voice_mode_btn = d(description="切换到键盘")
if voice_mode_btn.exists:
logger.info("检测到语音模式 (UI树: '切换到键盘'),点击切换...")
voice_mode_btn.click()
time.sleep(1.0) # 等待 UI 切换
else:
# 2. 如果没找到切换图标,假设是文字模式,尝试点击输入区域标识
logger.info("未检测到语音模式切换图标,尝试寻找文字输入区域...")
input_text_pos = find_template_match(tmp_check_shot, input_text_template, threshold=0.8)
if input_text_pos:
logger.info(f"找到文字输入区域标识 (input_text.jpg): {input_text_pos},点击激活...")
d.click(input_text_pos[0], input_text_pos[1])
time.sleep(0.5)
else:
logger.info("未找到特定的输入区域标识,将使用默认坐标或控件查找。")
# 清理临时文件
if os.path.exists(tmp_check_shot):
try:
os.remove(tmp_check_shot)
except:
pass
# 2. 检查是否有 "切换到语音" 按钮(说明当前是文字模式)
# 这一步不是必须的,但可以用来确认状态
# text_mode_btn = d(description="切换到语音")
# if text_mode_btn.exists:
# logger.info("当前已是文字模式 (UI树: '切换到语音')")
# 3. 如果 UI 树检测失败,尝试图像兜底
if not voice_mode_btn.exists:
tmp_check_shot = os.path.join(OUTPUT_DIR, "temp_input_check.jpg")
d.screenshot(tmp_check_shot)
wen_zi_template = os.path.join(TEMPLATE_DIR, "wen_zi_input.jpg")
# 检查是否存在 '切换到文字' 图标
wen_zi_pos = find_template_match(tmp_check_shot, wen_zi_template, threshold=0.8)
if wen_zi_pos:
logger.info(f"检测到语音模式 (图像: 找到切换文字图标: {wen_zi_pos}),点击切换...")
d.click(wen_zi_pos[0], wen_zi_pos[1])
time.sleep(1.0)
# 清理临时文件
if os.path.exists(tmp_check_shot):
try:
os.remove(tmp_check_shot)
except:
pass
# --- 新增逻辑结束 ---
# 1. 尝试找到原生输入框并输入
# 增加多种查找方式
edit_text = d(className="android.widget.EditText")
if not edit_text.exists:
# 尝试通过 resourceId 查找 (微信常见ID)
edit_text = d(resourceId="com.tencent.mm:id/b4a")
# 1.2 [User Request] 尝试使用 input_text.jpg 模板寻找输入框
if not edit_text.exists:
input_template_path = os.path.join(TEMPLATE_DIR, "input_text.jpg")
if os.path.exists(input_template_path):
# 截图用于匹配
tmp_input_search = os.path.join(OUTPUT_DIR, "temp_input_search.jpg")
d.screenshot(tmp_input_search)
logger.info(f"正在尝试使用模板 {input_template_path} 寻找输入框...")
# [User Request] 降低阈值到 0.6
input_pos = find_template_match(tmp_input_search, input_template_path, threshold=0.6)
if input_pos:
logger.info(f"✅ [Template] 通过 input_text.jpg 找到输入框: {input_pos}")
# 绘制调试图 (蓝框)
try:
debug_img = cv2.imread(tmp_input_search)
if debug_img is not None:
# 读取模板获取宽高
tmpl = cv2.imread(input_template_path)
if tmpl is not None:
th, tw = tmpl.shape[:2]
cx, cy = input_pos
top_left = (cx - tw//2, cy - th//2)
bottom_right = (cx + tw//2, cy + th//2)
# 蓝色框 BGR=(255, 0, 0)
cv2.rectangle(debug_img, top_left, bottom_right, (255, 0, 0), 3)
cv2.putText(debug_img, "MATCH: input_text.jpg", (top_left[0], top_left[1]-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)
debug_save_path = os.path.join(OUTPUT_DIR, "debug_input_box_match.jpg")
cv2.imwrite(debug_save_path, debug_img)
logger.info(f"已保存输入框匹配调试图(蓝框): {debug_save_path}")
except Exception as e:
logger.warning(f"绘制输入框调试图失败: {e}")
# 更新点击坐标
center_point = input_pos
else:
logger.info(f"❌ [Template] input_text.jpg 未匹配到输入框")
# 1.5 如果找不到原生输入框,尝试通过“切换到语音”按钮定位 Y 轴
# 输入框通常与左侧的“切换到语音”按钮垂直居中对齐
if not edit_text.exists:
try:
# 确保在文字模式下,左侧会有“切换到语音”按钮
# 有时候可能是 "切换到键盘" (如果状态判断出错),都尝试一下作为锚点
anchor_btn = d(description="切换到语音")
if not anchor_btn.exists:
anchor_btn = d(description="切换到键盘")
if anchor_btn.exists:
# 获取按钮中心 Y 坐标
bounds = anchor_btn.info['bounds']
anchor_y = (bounds['top'] + bounds['bottom']) // 2
# 获取屏幕宽度
w, h = d.window_size()
# 更新中心点X居中Y与按钮对齐
center_point = (w // 2, anchor_y)
logger.info(f"通过'切换到语音'按钮修正输入框坐标: {center_point}")
except Exception as e:
logger.warning(f"尝试修正坐标失败: {e}")
input_success = False
if edit_text.exists:
@@ -1126,37 +1329,3 @@ def match_template_center(image_path, template_path, threshold=0.8):
async def get_first_screen(device=None):
    """Capture and analyse the chat screen as it looks right after entering.

    Steps: take a screenshot, run the full analysis on it with
    ``process_strategy="ALL"``, and hand back both the analysis result and the
    image paths involved.

    :param device: connected device; when falsy, ``connect_device()`` is tried.
    :return: ``(dialogue_log, input_pos, enter_path, flag_path)``; on a failed
        device connection ``([], None, None, None)``.
    """
    logger.info("🔍 [get_first_screen] 正在进行首屏全量识别...")

    # Fall back to opening a connection ourselves when none was supplied.
    device = device or connect_device()
    if not device:
        logger.error("设备连接失败,无法获取首屏")
        return [], None, None, None

    # Step 1: screenshot of the screen as entered.
    enter_path = get_next_debug_path("enter")
    device.screenshot(enter_path)
    logger.info(f"📸 已保存进入截图: {enter_path}")

    # Step 2: full recognition, writing the annotated image to flag_path.
    flag_path = get_next_debug_path("flag")
    analysis = await analyze_chat_image(
        enter_path,
        flag_path,
        device=device,
        process_strategy="ALL",
    )
    dialogue_log, input_pos = analysis

    return dialogue_log, input_pos, enter_path, flag_path

View File

@@ -1,35 +0,0 @@
import asyncio
import os
import sys
# Add current directory to path so we can import Util
sys.path.append(os.getcwd())
from Util.RedisKit import redisKit
async def clear_cache():
    """Delete the cached SQL-template keys from Redis.

    Uses ``redisKit.delete_data`` when the helper exists; otherwise falls back
    to a raw ``delete`` on a connection obtained from ``redisKit``. The helper
    is looked up explicitly instead of catching ``AttributeError`` around the
    call, so an ``AttributeError`` raised *inside* ``delete_data`` is no longer
    mistaken for "helper missing" and silently rerouted.
    """
    print("Connecting to Redis...")
    # NOTE(review): per the original comment, redisKit sets up its pool on the
    # first operation, so no explicit connect step is made here.

    keys_to_delete = [
        "sql_templates:templates_loaded",
        "sql_templates:all_templates",
        "sql_templates:loaded_files",
        "sql_templates:template_map",
    ]

    # Resolve the optional helper once rather than probing on every key.
    delete_data = getattr(redisKit, "delete_data", None)

    for key in keys_to_delete:
        print(f"Deleting key: {key}")
        if delete_data is not None:
            await delete_data(key)
        else:
            # Raw fallback: run the connection's delete in a worker thread.
            conn = await redisKit.get_connection()
            await asyncio.to_thread(conn.delete, key)

    print("SQL template cache cleared successfully.")
# Script entry point: run the async cache-clearing routine via asyncio.run.
if __name__ == "__main__":
asyncio.run(clear_cache())

Binary file not shown.