This commit is contained in:
HuangHai
2026-01-12 22:04:41 +08:00
parent 231f52d3ee
commit de62305a28
11 changed files with 302 additions and 189 deletions

View File

@@ -69,6 +69,17 @@ async def get_station_list(d, service, max_scrolls=MAX_SCROLLS):
image_uuid = str(uuid.uuid4())
screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR)
# 1.5 检查是否存在广告 (VL)
logger.info("检查是否存在广告弹窗...")
ad_res = await ReadImageKit.detect_ad_popup(screenshot_path, device_info=device_info)
if ad_res:
logger.info(f"检测到广告弹窗,准备关闭: {ad_res}")
d.click(ad_res['x'], ad_res['y'])
await asyncio.sleep(1.5)
# 重新截图
if os.path.exists(screenshot_path): os.remove(screenshot_path)
screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR)
# 检查是否已经滚动到底部 (排除状态栏后,内容与上次一致)
current_md5 = Kit.get_image_content_md5(
screenshot_path,

View File

@@ -78,6 +78,76 @@ class ReadImageKit:
"严格返回纯JSON格式。"
)
@classmethod
async def detect_ad_popup(cls, image_path: str, device_info=None):
    """
    Detect a popup / floating ad with the VL model and locate its close button.

    :param image_path: path of the screenshot to analyse
    :param device_info: optional dict read for "displayWidth" / "displayHeight";
        when falsy, cls._FALLBACK_DEVICE_INFO is used instead
    :return: {"x": int, "y": int} pixel coordinates of the close button, or
        None when the file is missing, no ad is reported, the model answer is
        malformed, or any error occurs (errors are logged, never raised)
    """
    if not os.path.exists(image_path):
        return None

    # Build the prompt
    prompt = (
        "请仔细检查这张图片中是否存在**弹窗广告**或**悬浮广告**。\n"
        "广告可能有以下几种形式:\n"
        "1. **屏幕中央的大型弹窗广告**:通常遮挡了页面内容,内容多为优惠券、活动推广等。\n"
        "2. **屏幕左侧或右侧的悬浮小广告**:例如带有'活动''红包''优惠'字样的小图标。\n"
        "3. **底部的横幅广告**:带有明显的关闭按钮。\n\n"
        "如果发现了上述任何一种广告,请务必找到该广告的**关闭按钮**(通常是一个'X'符号,或者'关闭'字样,或者位于广告下方/右上角的圆圈叉号)。\n"
        "请返回该关闭按钮的**中心坐标**。\n\n"
        "请以纯 JSON 格式输出:\n"
        "{\n"
        " \"has_ad\": true/false,\n"
        " \"close_point\": [x, y] // 0-1000 归一化坐标\n"
        "}\n"
        "如果没有广告,请返回 `{\"has_ad\": false}`。\n"
        "只输出 JSON不要包含任何其他文字。"
    )

    try:
        # Read inside the try block so an unreadable file is reported as
        # "no ad" (None) like every other failure instead of raising.
        with open(image_path, "rb") as image_file:
            encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
        img_input = {"url": f"data:image/jpeg;base64,{encoded_image}"}

        resp = await cls._client.chat.completions.create(
            model=VL_MODEL_NAME,  # the default VL model is sufficient; an ad-specific one could be used
            timeout=30,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": img_input},
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        content = resp.choices[0].message.content.strip()
        raw_json = cls._extract_json(content)
        result = json.loads(raw_json)

        if not isinstance(result, dict) or not result.get("has_ad"):
            return None

        norm_point = result.get("close_point")
        if not isinstance(norm_point, (list, tuple)) or len(norm_point) != 2:
            return None

        if not device_info:
            device_info = cls._FALLBACK_DEVICE_INFO
        w = device_info.get("displayWidth", FALLBACK_WIDTH)
        h = device_info.get("displayHeight", FALLBACK_HEIGHT)

        # The prompt asks for 0-1000 normalized coordinates; clamp so a sloppy
        # model answer can never produce a click outside the screen.
        norm_x = min(max(float(norm_point[0]), 0.0), 1000.0)
        norm_y = min(max(float(norm_point[1]), 0.0), 1000.0)

        # A normalized value of 1000 would map to x == w, one pixel past the
        # last valid column/row, so cap at (size - 1).
        x = min(int(norm_x / 1000 * w), max(w - 1, 0))
        y = min(int(norm_y / 1000 * h), max(h - 1, 0))
        return {"x": x, "y": y}
    except Exception as e:
        logger.error(f"Failed to detect ad popup: {e}")
        return None
@staticmethod
def _extract_json(text: str) -> str:
if not text:
@@ -509,6 +579,10 @@ class ReadImageKit:
"""
使用 VL 模型定位详情页中的 '分时价格' 按钮
"""
if device_info is None:
logger.warning("未提供动态设备信息,使用通用回退配置。")
device_info = cls._FALLBACK_DEVICE_INFO
if not os.path.exists(image_path):
return {}
@@ -517,12 +591,14 @@ class ReadImageKit:
img_input = {"url": f"data:image/jpeg;base64,{encoded}"}
prompt = (
"识别图片中 '分时价格''查看分时电价' 按钮的中心坐标\n"
"该按钮通常位于充电站详情页,可能是一个带有价格图标或箭头的小型矩形区域。\n"
"请直接以 JSON 格式输出该按钮的中心点坐标:\n"
"{\"uia_center_x\": 500, \"uia_center_y\": 500}\n"
"注意:坐标系为 0-1000 归一化坐标,其中 (0,0) 为左上角,(1000,1000) 为右下角。\n"
"只输出 JSON不要有任何解释文字。"
"找到图片中 '分时价格''查看分时电价' 按钮区域(通常在详情页,可能是一个带有价格图标或箭头的小型矩形区域)\n"
"返回格式示例:\n"
"{\n"
' "bounds": {"x1": 100, "y1": 200, "x2": 300, "y2": 400}, \n'
' "bounds_norm": {"left": 0.1, "top": 0.2, "right": 0.3, "bottom": 0.4}\n'
"}\n"
"注意bounds应使用0-1000的归一化坐标空间。\n"
"如果未找到返回空JSON {}"
)
try:
@@ -533,26 +609,32 @@ class ReadImageKit:
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": img_input}
{"type": "image_url", "image_url": img_input},
{"type": "text", "text": json.dumps(device_info, ensure_ascii=False)},
{"type": "text", "text": prompt}
]
}
],
max_tokens=100,
max_tokens=200,
temperature=0.01
)
content = resp.choices[0].message.content.strip()
raw = cls._extract_json(content)
res = json.loads(raw)
# 转换 0-1000 坐标到实际像素坐标
if res.get("uia_center_x") is not None and device_info:
w = device_info.get("width", FALLBACK_WIDTH)
h = device_info.get("height", FALLBACK_HEIGHT)
res["uia_center_x"] = int(res["uia_center_x"] * w / 1000)
res["uia_center_y"] = int(res["uia_center_y"] * h / 1000)
try:
data = json.loads(raw)
if isinstance(data, dict) and (data.get("bounds") or data.get("bounds_norm")):
data = cls._add_center(data, device_info)
# 只返回中心坐标
return {
"uia_center_x": data.get("uia_center_x"),
"uia_center_y": data.get("uia_center_y")
}
return {}
except Exception as e:
logger.error(f"Error parsing JSON: {e}")
return {}
return res
except Exception as e:
logger.error(f"VL find_time_price_button_coordinate error: {e}")
return {}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.3 KiB

View File

@@ -163,6 +163,17 @@ async def get_station_list(d, service, uploader, max_scrolls=MAX_SCROLLS):
# 1. 拍摄截图
image_uuid = str(uuid.uuid4())
screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR)
# 1.5 检查是否存在广告 (VL)
logger.info("检查是否存在广告弹窗...")
ad_res = await ReadImageKit.detect_ad_popup(screenshot_path, device_info=device_info)
if ad_res:
logger.info(f"检测到广告弹窗,准备关闭: {ad_res}")
d.click(ad_res['x'], ad_res['y'])
await asyncio.sleep(1.5)
# 重新截图
if os.path.exists(screenshot_path): os.remove(screenshot_path)
screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR)
# 检查是否已经滚动到底部 (排除状态栏后,内容与上次一致)
current_md5 = Kit.get_image_content_md5(

View File

@@ -2,7 +2,7 @@ import logging
import os
import cv2
import numpy as np
import time
import time
import hashlib
from Apps.XinDianTu.Config.Setting import BOTTOM_SAFE_EXCLUDE_RATIO
from Config.Config import TEMP_IMAGE_DIR
@@ -96,82 +96,7 @@ def clear_temp_dir(save_dir=TEMP_IMAGE_DIR):
logger.error(f"无法删除文件 {file_path}: {e}")
def click_image_template(d, template_path, timeout=5.0, threshold=0.8):
    """
    Find a template image on screen with OpenCV template matching and click it.

    Screenshots are taken repeatedly until the timeout expires; each one is
    searched at 11 template scales between 0.5x and 1.5x.

    :param d: uiautomator2 device object
    :param template_path: path of the template image
    :param timeout: timeout in seconds
    :param threshold: match threshold (0.0 - 1.0)
    :return: True if a match was found and clicked, otherwise False
    """
    if not os.path.exists(template_path):
        logger.info(f"Template file not found: {template_path}")
        return False

    template = read_image(template_path)
    if template is None:
        logger.info(f"Failed to load template: {template_path}")
        return False

    t_h, t_w = template.shape[:2]
    start_time = time.time()
    best_val_overall = 0.0

    while time.time() - start_time < timeout:
        # Temporary screenshot (fixed name, overwritten on every attempt)
        temp_uuid = "temp_click_check"
        screenshot_path = take_screenshot(d, temp_uuid, save_dir=TEMP_IMAGE_DIR)
        target = read_image(screenshot_path)
        if target is None:
            time.sleep(0.5)
            continue

        # Multi-scale matching: scale factors from 0.5 to 1.5 in steps of 0.1
        found = None
        for scale in np.linspace(0.5, 1.5, 11):
            r_w, r_h = int(t_w * scale), int(t_h * scale)

            # cv2.resize rejects a zero-sized target, which happens for tiny
            # templates at small scales.
            if r_w < 1 or r_h < 1:
                continue

            # Skip scales where the template would be larger than the screenshot
            if r_h > target.shape[0] or r_w > target.shape[1]:
                continue

            resized_template = cv2.resize(template, (r_w, r_h))
            result = cv2.matchTemplate(target, resized_template, cv2.TM_CCOEFF_NORMED)
            _, max_val, _, max_loc = cv2.minMaxLoc(result)

            if max_val > best_val_overall:
                best_val_overall = max_val

            if max_val >= threshold:
                found = (max_val, max_loc, r_w, r_h)
                break  # first scale that meets the threshold wins for this screenshot

        # Best-effort cleanup of the temporary screenshot
        try:
            os.remove(screenshot_path)
        except OSError as e:
            logger.debug(f"Could not remove temporary screenshot {screenshot_path}: {e}")

        if found:
            max_val, max_loc, r_w, r_h = found
            # Click the centre of the matched rectangle
            center_x = max_loc[0] + r_w // 2
            center_y = max_loc[1] + r_h // 2
            logger.info(f"Found image at ({center_x}, {center_y}) with confidence {max_val:.2f}")
            d.click(center_x, center_y)
            return True

        time.sleep(1.0)

    logger.info(f"Image not found after {timeout}s (Best confidence: {best_val_overall:.2f})")
    return False
def is_background_dimmed(image_path, threshold=80):
@@ -711,29 +636,29 @@ def get_row_stats(gray):
return row_means, row_stds, edge_means
def clean_station_name(name):
    """
    Normalize a station name for comparison.

    Strips surrounding whitespace and any trailing truncation marker: ASCII
    dots, the Unicode ellipsis character, and whitespace mixed in between
    them (e.g. "abc. ." or "abc…").

    :param name: raw station name; any falsy value (None, "") yields ""
    :return: the cleaned name
    """
    if not name:
        return ""
    cleaned = name.strip()
    # Peel truncation markers one at a time, re-stripping whitespace so that
    # interleaved forms such as ". ." are removed completely.
    while cleaned and cleaned[-1] in ".\u2026":
        cleaned = cleaned[:-1].rstrip()
    return cleaned
def get_file_md5(path):
    """Return the hex MD5 digest of the file at *path*, or "" if the path does not exist."""
    if os.path.exists(path):
        digest = hashlib.md5()
        with open(path, "rb") as stream:
            # Feed the file in 4 KiB blocks so it is never fully loaded into memory
            while True:
                block = stream.read(4096)
                if not block:
                    break
                digest.update(block)
        return digest.hexdigest()
    return ""
def clean_station_name(name):
    """
    Normalize a station name for comparison.

    Strips surrounding whitespace and any trailing truncation marker: ASCII
    dots, the Unicode ellipsis character, and whitespace mixed in between
    them (e.g. "abc. ." or "abc…").

    :param name: raw station name; any falsy value (None, "") yields ""
    :return: the cleaned name
    """
    if not name:
        return ""
    cleaned = name.strip()
    # Peel truncation markers one at a time, re-stripping whitespace so that
    # interleaved forms such as ". ." are removed completely.
    while cleaned and cleaned[-1] in ".\u2026":
        cleaned = cleaned[:-1].rstrip()
    return cleaned
def get_file_md5(path):
    """Return the hex MD5 digest of the file at *path*, or "" if the path does not exist."""
    if os.path.exists(path):
        digest = hashlib.md5()
        with open(path, "rb") as stream:
            # Feed the file in 4 KiB blocks so it is never fully loaded into memory
            while True:
                block = stream.read(4096)
                if not block:
                    break
                digest.update(block)
        return digest.hexdigest()
    return ""
def crop_cards_from_image(img_path, output_dir=None, save_debug=True):
"""
从图片中裁剪场站卡片

View File

@@ -7,8 +7,7 @@ import uuid
import uiautomator2 as u2
from Apps.XinDianTu.Kit import take_screenshot, detect_black_agree_button, detect_any_ad_close, \
detect_bottom_close_circle, is_background_dimmed, click_image_template
from Apps.XinDianTu.Kit import take_screenshot
from Apps.XinDianTu.ReadImageKit import ReadImageKit
from Config.Config import TEMP_IMAGE_DIR
@@ -25,9 +24,9 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__))
async def check_and_close_ad(d):
"""
检测并关闭广告弹窗(优先使用计算机图形学方案,节省成本和时间
检测并关闭广告弹窗(仅使用 VL 视觉模型
"""
logger.info("开始检测广告弹窗...")
logger.info("开始检测广告弹窗 (VL方案)...")
# 1. 拍摄截图
t1 = time.time()
@@ -37,59 +36,7 @@ async def check_and_close_ad(d):
screenshot_path = take_screenshot(d, image_uuid, save_dir=save_dir)
logger.info(f"Step [广告检测截图] 耗时: {time.time() - t1:.4f}s")
# 1.1 特征预检:检测背景蒙板
is_dimmed = is_background_dimmed(screenshot_path)
if is_dimmed:
logger.info("检测到背景变暗(蒙板),极大概率存在广告弹窗。")
# 2. 图形学检测方案 (Local CV)
t_cv = time.time()
# 方案 A: 检测黑色的"同意"按钮 (隐私协议)
# 这一步不需要模板,直接通过颜色和形状特征检测
agree_pos = detect_black_agree_button(screenshot_path, debug_dir=save_dir)
if agree_pos:
x, y = agree_pos
logger.info(f"通过图形学算法检测到隐私协议'同意'按钮: ({x}, {y})")
d.click(x, y)
logger.info(f"Step [图形学检测-同意按钮] 耗时: {time.time() - t_cv:.4f}s")
# 清理截图
if os.path.exists(screenshot_path):
os.remove(screenshot_path)
return True
# 方案 B: 多模板匹配检测 "关闭(X)" 按钮 (支持多种广告样式)
template_dir = os.path.join(BASE_DIR, "Templates")
if os.path.exists(template_dir):
logger.info(f"正在尝试多模板匹配关闭按钮: {template_dir}")
# 如果检测到蒙板,可以适当调低匹配阈值
match_threshold = 0.6 if is_dimmed else 0.7
close_pos = detect_ad_close_x_with_threshold(screenshot_path, template_dir, save_dir, match_threshold)
if close_pos:
x, y = close_pos
logger.info(f"通过多模板匹配检测到广告关闭按钮: ({x}, {y})")
d.click(x, y)
logger.info(f"Step [图形学检测-关闭按钮] 耗时: {time.time() - t_cv:.4f}s")
if os.path.exists(screenshot_path):
os.remove(screenshot_path)
return True
# 方案 C: 底部圆形关闭按钮检测 (无模板,基于几何特征)
# 针对插屏广告底部中间的圆形关闭按钮
circle_pos = detect_bottom_close_circle(screenshot_path, debug_dir=save_dir)
if circle_pos:
x, y = circle_pos
logger.info(f"通过几何特征检测到底部圆形关闭按钮: ({x}, {y})")
d.click(x, y)
logger.info(f"Step [图形学检测-底部圆形按钮] 耗时: {time.time() - t_cv:.4f}s")
if os.path.exists(screenshot_path):
os.remove(screenshot_path)
return True
# 3. 视觉大模型检测方案 (VL Model) - 作为兜底
logger.info("本地图形学检测未发现广告,尝试调用视觉大模型检测...")
# 2. 视觉大模型检测方案 (VL Model)
try:
window_size = d.window_size()
device_info = {
@@ -98,9 +45,10 @@ async def check_and_close_ad(d):
"productName": d.info.get('productName', 'unknown')
}
ad_result = await ReadImageKit.detect_ad(screenshot_path, device_info=device_info)
if ad_result.get("has_ad") and ad_result.get("uia_center_x") is not None:
x, y = ad_result["uia_center_x"], ad_result["uia_center_y"]
# 使用最新的 detect_ad_popup 方法
ad_result = await ReadImageKit.detect_ad_popup(screenshot_path, device_info=device_info)
if ad_result:
x, y = ad_result["x"], ad_result["y"]
logger.info(f"通过视觉大模型检测到广告关闭按钮: ({x}, {y})")
d.click(x, y)
if os.path.exists(screenshot_path):
@@ -109,7 +57,7 @@ async def check_and_close_ad(d):
except Exception as e:
logger.error(f"视觉大模型广告检测异常: {e}")
logger.info(f"本地图形学和 VL 检测完成,未发现已知广告。")
logger.info(f"VL 检测完成,未发现已知广告。")
# 清理本地截图
if os.path.exists(screenshot_path):
@@ -117,18 +65,6 @@ async def check_and_close_ad(d):
return False
def detect_ad_close_x_with_threshold(screenshot_path, template_dir, debug_dir, threshold):
    """
    Multi-template matching with a caller-supplied threshold.

    Every file in *template_dir* named "ad_close*.jpg" is tried in sorted
    (deterministic) order; the first position reported by detect_ad_close_x
    is returned.

    :param screenshot_path: screenshot to search in
    :param template_dir: directory containing the close-button templates
    :param debug_dir: passed through to detect_ad_close_x as debug_dir
    :param threshold: passed through to detect_ad_close_x as threshold
    :return: the first truthy result of detect_ad_close_x, or None
    """
    # os.listdir order is arbitrary; sort so the same template always wins
    # when several of them match.
    template_names = sorted(
        filename
        for filename in os.listdir(template_dir)
        if filename.startswith("ad_close") and filename.endswith(".jpg")
    )
    if not template_names:
        return None

    # Imported lazily (once, not per iteration) and only when there is at
    # least one template to try.
    from Apps.XinDianTu.Kit import detect_ad_close_x

    for filename in template_names:
        template_path = os.path.join(template_dir, filename)
        pos = detect_ad_close_x(screenshot_path, template_path, debug_dir=debug_dir, threshold=threshold)
        if pos:
            return pos
    return None
async def open_mini_program():

View File

@@ -74,6 +74,76 @@ class ReadImageKit:
"严格返回纯JSON格式。"
)
@classmethod
async def detect_ad_popup(cls, image_path: str, device_info=None):
    """
    Detect a popup / floating ad with the VL model and locate its close button.

    :param image_path: path of the screenshot to analyse
    :param device_info: optional dict read for "displayWidth" / "displayHeight";
        when falsy, cls._FALLBACK_DEVICE_INFO is used instead
    :return: {"x": int, "y": int} pixel coordinates of the close button, or
        None when the file is missing, no ad is reported, the model answer is
        malformed, or any error occurs (errors are logged, never raised)
    """
    if not os.path.exists(image_path):
        return None

    # Build the prompt
    prompt = (
        "请仔细检查这张图片中是否存在**弹窗广告**或**悬浮广告**。\n"
        "广告可能有以下几种形式:\n"
        "1. **屏幕中央的大型弹窗广告**:通常遮挡了页面内容,内容多为优惠券、活动推广等。\n"
        "2. **屏幕左侧或右侧的悬浮小广告**:例如一只兔子形状的图标、红包图标、或者带有'活动'字样的小图标。\n"
        "3. **底部的横幅广告**:带有明显的关闭按钮。\n\n"
        "如果发现了上述任何一种广告,请务必找到该广告的**关闭按钮**(通常是一个'X'符号,或者'关闭'字样,或者位于广告下方/右上角的圆圈叉号)。\n"
        "请返回该关闭按钮的**中心坐标**。\n\n"
        "请以纯 JSON 格式输出:\n"
        "{\n"
        " \"has_ad\": true/false,\n"
        " \"close_point\": [x, y] // 0-1000 归一化坐标\n"
        "}\n"
        "如果没有广告,请返回 `{\"has_ad\": false}`。\n"
        "只输出 JSON不要包含任何其他文字。"
    )

    try:
        # Read inside the try block so an unreadable file is reported as
        # "no ad" (None) like every other failure instead of raising.
        with open(image_path, "rb") as image_file:
            encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
        img_input = {"url": f"data:image/jpeg;base64,{encoded_image}"}

        resp = await cls._client.chat.completions.create(
            model=VL_MODEL_NAME,  # the default VL model is sufficient; an ad-specific one could be used
            timeout=30,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": img_input},
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        content = resp.choices[0].message.content.strip()
        raw_json = cls._extract_json(content)
        result = json.loads(raw_json)

        if not isinstance(result, dict) or not result.get("has_ad"):
            return None

        norm_point = result.get("close_point")
        if not isinstance(norm_point, (list, tuple)) or len(norm_point) != 2:
            return None

        if not device_info:
            device_info = cls._FALLBACK_DEVICE_INFO
        w = device_info.get("displayWidth", FALLBACK_WIDTH)
        h = device_info.get("displayHeight", FALLBACK_HEIGHT)

        # The prompt asks for 0-1000 normalized coordinates; clamp so a sloppy
        # model answer can never produce a click outside the screen.
        norm_x = min(max(float(norm_point[0]), 0.0), 1000.0)
        norm_y = min(max(float(norm_point[1]), 0.0), 1000.0)

        # A normalized value of 1000 would map to x == w, one pixel past the
        # last valid column/row, so cap at (size - 1).
        x = min(int(norm_x / 1000 * w), max(w - 1, 0))
        y = min(int(norm_y / 1000 * h), max(h - 1, 0))
        return {"x": x, "y": y}
    except Exception as e:
        logger.error(f"Failed to detect ad popup: {e}")
        return None
@staticmethod
def _extract_json(text: str) -> str:
if not text:

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.7 KiB

View File

@@ -0,0 +1,46 @@
import os
import sys
# 将项目根目录添加到 sys.path
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from Apps.XinDianTu import Kit
def verify(
    target_path=r"d:\dsWork\aiData\Output\Screenshot_20260112_215026.jpg",
    template_path=r"d:\dsWork\aiData\Apps\XinDianTu\Templates\TuZi.jpg",
):
    """
    Manually check that the rabbit template can be located in a screenshot.

    Prints the match found by Kit.find_template_match (threshold 0.7) and a
    proposed click point for the ad's close button, or a diagnostic message
    when a file is missing or no match is found.

    :param target_path: screenshot to search in
    :param template_path: template image to look for
    :return: None; results are only printed
    """
    if not os.path.exists(target_path):
        print(f"Target not found: {target_path}")
        return

    if not os.path.exists(template_path):
        print(f"Template not found: {template_path}")
        return

    match = Kit.find_template_match(target_path, template_path, threshold=0.7)
    if match:
        cx, cy, w, h, val = match
        print(f"SUCCESS: Found rabbit at ({cx}, {cy}) with size {w}x{h}, confidence {val:.2f}")

        # The close button is assumed to sit just below the matched image:
        # start at the match centre, go down half the match height to reach
        # its bottom edge, then add a 20px margin. This offset is a visual
        # estimate, not a measured value.
        close_x = cx
        close_y = cy + h // 2 + 20
        print(f"Proposed close button click at: ({close_x}, {close_y})")
    else:
        print("FAILED: Rabbit not found in the screenshot.")


if __name__ == "__main__":
    verify()

View File

@@ -0,0 +1,32 @@
import asyncio
import os
import sys
# 将项目根目录添加到 sys.path
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from Apps.XinDianTu.ReadImageKit import ReadImageKit
async def verify_vl_ad_check():
    """
    Manually run the VL-based ad detection against a saved screenshot.

    Prints the pixel coordinates of the detected close button, "No ad
    detected." when nothing is reported, or a diagnostic message when the
    screenshot file is missing.
    """
    target_path = r"d:\dsWork\aiData\Output\Screenshot_20260112_215026.jpg"

    screenshot_missing = not os.path.exists(target_path)
    if screenshot_missing:
        print(f"Target not found: {target_path}")
        return

    print("Checking for ads using VL model...")

    # Fixed screen dimensions stand in for a real device in this script
    detection = await ReadImageKit.detect_ad_popup(
        target_path,
        device_info={"displayWidth": 1080, "displayHeight": 2244},
    )

    message = (
        f"SUCCESS: Ad detected! Close button at ({detection['x']}, {detection['y']})"
        if detection
        else "No ad detected."
    )
    print(message)


if __name__ == "__main__":
    asyncio.run(verify_vl_ad_check())