This commit is contained in:
HuangHai
2026-01-14 10:58:11 +08:00
parent 7549a5f038
commit 11e1091c6a
5 changed files with 163 additions and 77 deletions

View File

@@ -151,11 +151,11 @@ def detect_price_click_point_cv(image_path):
return [center_x, center_y]
def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
"""
使用计算机图形学 (OpenCV) 检测列表中的场站卡片。
"""
from Apps.TeLaiDian.Config.Setting import MIN_CARD_HEIGHT
def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
"""
使用计算机图形学 (OpenCV) 检测列表中的场站卡片。
"""
from Apps.TeLaiDian.Config.Setting import MIN_CARD_HEIGHT
img = read_image(image_path)
if img is None:
@@ -183,24 +183,38 @@ def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cards = []
# 降低最小宽度要求,有些卡片边缘可能没完全闭合
min_card_width = int(w * 0.6)
# 更严格的最小宽度要求:列表卡片通常接近整宽
min_card_width = int(w * 0.8)
temp_boxes = []
for cnt in contours:
x, y, cw, ch = cv2.boundingRect(cnt)
center_y = y + ch // 2
# 增加最小高度验证 MIN_CARD_HEIGHT
if (cw > min_card_width and
ch > MIN_CARD_HEIGHT * 0.8 and # 稍微放宽高度限制
y > top_limit and
y + ch < bottom_limit):
logger.info(f"✅ 找到候选卡片: y={y}, h={ch}, w={cw}")
# 绿色营销券区域排除
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
roi = hsv[max(0,y):min(h,y+ch), max(0,x):min(w,x+cw)]
green_mask = cv2.inRange(roi, np.array([35, 80, 80]), np.array([85, 255, 255]))
green_ratio = float(cv2.countNonZero(green_mask)) / (roi.shape[0]*roi.shape[1]) if roi.size > 0 else 0.0
# 圆角矩形判断:面积占比与顶点数量
rect_area = cw * ch
cnt_area = cv2.contourArea(cnt)
extent = cnt_area / rect_area if rect_area > 0 else 0
approx = cv2.approxPolyDP(cnt, 0.02 * cv2.arcLength(cnt, True), True)
ok_width = cw >= min_card_width
ok_height = ch > MIN_CARD_HEIGHT * 0.8
ok_vertical = center_y >= int(h * 0.58) and y > top_limit and y + ch < bottom_limit
ok_shape = (len(approx) >= 6 or extent > 0.85)
ok_color = green_ratio < 0.25
if ok_width and ok_height and ok_vertical and ok_shape and ok_color:
logger.info(f"✅ 找到候选卡片: y={y}, h={ch}, w={cw}, extent={extent:.2f}, verts={len(approx)}, green={green_ratio:.2f}")
temp_boxes.append((x, y, cw, ch))
else:
if cw > min_card_width:
logger.debug(f"❌ 排除候选(不符条件): y={y}, h={ch}, w={cw} (limit: {top_limit}-{bottom_limit}, min_h: {MIN_CARD_HEIGHT*0.8})")
if ok_width:
logger.debug(f"❌ 排除候选: y={y}, h={ch}, w={cw}, extent={extent:.2f}, verts={len(approx)}, green={green_ratio:.2f} (limit: {top_limit}-{bottom_limit})")
if not temp_boxes:
logger.warning(f"⚠️ CV 未能在指定范围 ({top_limit}-{bottom_limit}) 内检测到任何场站卡片")
@@ -226,46 +240,46 @@ def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
min(w, x + cw - padding),
min(bottom_limit, y + ch - padding)
])
return cards
def detect_wide_rounded_card_cv(image_path, min_width_ratio=0.8, min_y_ratio=0.5):
    """
    Find wide, card-like contours located in the lower part of an image.

    The image is converted to grayscale, blurred, run through Canny edge
    detection and morphologically closed; the external contours of the result
    are then filtered by bounding-box width, vertical position, fill ratio and
    polygon vertex count.

    Args:
        image_path: Path handed to ``read_image``.
        min_width_ratio: Minimum bounding-box width, as a fraction of the
            image width.
        min_y_ratio: Minimum vertical center of the bounding box, as a
            fraction of the image height.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes ordered by ``y1`` ascending.
        Empty when ``read_image`` returns ``None`` or nothing qualifies.
    """
    image = read_image(image_path)
    if image is None:
        return []

    img_h, img_w = image.shape[:2]

    # Edge map: grayscale -> Gaussian blur -> Canny -> morphological close.
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.GaussianBlur(grayscale, (7, 7), 0)
    edge_map = cv2.Canny(smoothed, 60, 180)
    closing_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
    sealed = cv2.morphologyEx(edge_map, cv2.MORPH_CLOSE, closing_kernel)
    outlines, _ = cv2.findContours(sealed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    boxes = []
    for outline in outlines:
        left, top, box_w, box_h = cv2.boundingRect(outline)
        mid_y = top + box_h // 2

        # Reject boxes that are too narrow or whose center sits too high.
        if box_w < int(img_w * min_width_ratio) or mid_y < int(img_h * min_y_ratio):
            continue

        bounding_area = box_w * box_h
        outline_area = cv2.contourArea(outline)
        if bounding_area <= 0:
            continue

        # Fill ratio: how much of the bounding box the contour actually covers.
        fill_ratio = outline_area / bounding_area
        polygon = cv2.approxPolyDP(outline, 0.02 * cv2.arcLength(outline, True), True)
        if fill_ratio < 0.7:
            continue

        vertex_count = len(polygon)
        many_vertices = vertex_count >= 6
        tight_quad = vertex_count == 4 and fill_ratio > 0.85
        if many_vertices or tight_quad:
            boxes.append([left, top, left + box_w, top + box_h])

    return sorted(boxes, key=lambda box: box[1])
def draw_rectangles(image_path, bboxes=None, click_points=None):
"""
使用 OpenCV 在图片上绘制矩形框和点击点,生成 _vl.jpg 和 _flag.jpg
- _vl.jpg: 仅包含矩形框,供视觉模型参考
- _flag.jpg: 包含矩形框和点击点,供人工调试
return cards
def detect_wide_rounded_card_cv(image_path, min_width_ratio=0.8, min_y_ratio=0.5):
    """
    Find wide, card-like contours located in the lower part of an image.

    The image is converted to grayscale, blurred, run through Canny edge
    detection and morphologically closed; the external contours of the result
    are then filtered by bounding-box width, vertical position, fill ratio and
    polygon vertex count.

    Args:
        image_path: Path handed to ``read_image``.
        min_width_ratio: Minimum bounding-box width, as a fraction of the
            image width.
        min_y_ratio: Minimum vertical center of the bounding box, as a
            fraction of the image height.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes ordered by ``y1`` ascending.
        Empty when ``read_image`` returns ``None`` or nothing qualifies.
    """
    image = read_image(image_path)
    if image is None:
        return []

    img_h, img_w = image.shape[:2]

    # Edge map: grayscale -> Gaussian blur -> Canny -> morphological close.
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.GaussianBlur(grayscale, (7, 7), 0)
    edge_map = cv2.Canny(smoothed, 60, 180)
    closing_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
    sealed = cv2.morphologyEx(edge_map, cv2.MORPH_CLOSE, closing_kernel)
    outlines, _ = cv2.findContours(sealed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    boxes = []
    for outline in outlines:
        left, top, box_w, box_h = cv2.boundingRect(outline)
        mid_y = top + box_h // 2

        # Reject boxes that are too narrow or whose center sits too high.
        if box_w < int(img_w * min_width_ratio) or mid_y < int(img_h * min_y_ratio):
            continue

        bounding_area = box_w * box_h
        outline_area = cv2.contourArea(outline)
        if bounding_area <= 0:
            continue

        # Fill ratio: how much of the bounding box the contour actually covers.
        fill_ratio = outline_area / bounding_area
        polygon = cv2.approxPolyDP(outline, 0.02 * cv2.arcLength(outline, True), True)
        if fill_ratio < 0.7:
            continue

        vertex_count = len(polygon)
        many_vertices = vertex_count >= 6
        tight_quad = vertex_count == 4 and fill_ratio > 0.85
        if many_vertices or tight_quad:
            boxes.append([left, top, left + box_w, top + box_h])

    return sorted(boxes, key=lambda box: box[1])
def draw_rectangles(image_path, bboxes=None, click_points=None):
"""
使用 OpenCV 在图片上绘制矩形框和点击点,生成 _vl.jpg 和 _flag.jpg
- _vl.jpg: 仅包含矩形框,供视觉模型参考
- _flag.jpg: 包含矩形框和点击点,供人工调试
"""
try:
DEBUG_BOX_COLOR = (0, 255, 0) # 绿色矩形

View File

@@ -245,12 +245,14 @@ class ReadImageKit:
cv_bboxes = detect_cards_cv(image_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
if cv_bboxes:
draw_rectangles(image_path, bboxes=cv_bboxes)
img = read_image(image_path)
h, w = img.shape[:2] if img is not None else (2400, 1080)
y_threshold = h * 0.58
# 在最终绘制阶段再生成可视化,避免早期绘制产生误导
prompt = f"""
图片中已经用绿色矩形框标记了 {len(cv_bboxes)} 个可能的充电站卡片
请按从上到下的顺序,识别每个绿色框内的场站信息。
图片下半部分可能包含多个充电站卡片,请按从上到下识别这些卡片的名称与地址
输出格式为 JSON 数组,长度必须为 {len(cv_bboxes)}
输出格式为 JSON 数组。
每个对象包含:
- "name": 场站名称
- "address": 场站地址
@@ -281,23 +283,51 @@ class ReadImageKit:
img = read_image(image_path)
h, w = img.shape[:2] if img is not None else (2400, 1080)
y_threshold = h * 0.58
for i, res in enumerate(vlm_results):
if i < len(cv_bboxes):
bbox = cv_bboxes[i]
# 物理坐标硬过滤:即使 CV 识别到了y 坐标在 58% 以上的也统统干掉
center_y = (bbox[1] + bbox[3]) // 2
if center_y < y_threshold:
logger.warning(f"CV 误报过滤 (y={center_y} < {y_threshold}): 可能是顶部营销卡片")
continue
if res and (res.get("is_valid") is True or (res.get("name") and res.get("is_valid") is not False)):
final_stations.append({
"name": res.get("name"),
"address": res.get("address"),
"point": [(bbox[0] + bbox[2]) // 2, (bbox[1] + bbox[3]) // 2],
"bbox": bbox
})
bottom_threshold = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO))
use_bboxes = []
for b in cv_bboxes:
cy = (b[1] + b[3]) // 2
if cy >= y_threshold:
use_bboxes.append(b)
for res in vlm_results:
if not res:
continue
p = res.get("point")
actual_p = None
if p and len(p) == 2:
actual_p = p
if p[0] <= 1000 and p[1] <= 1000:
actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)]
# 顶部与底部安全过滤
if actual_p and (actual_p[1] < y_threshold or actual_p[1] > bottom_threshold):
logger.warning(f"过滤掉可能的误点击点 (y={actual_p[1]} < {y_threshold}): {res.get('name')}")
continue
matched_bbox = None
if actual_p:
for b in use_bboxes:
if b[0] <= actual_p[0] <= b[2] and b[1] <= actual_p[1] <= b[3]:
matched_bbox = b
break
# 如果没有匹配到 CV 框,则根据点击点生成一个虚拟框
if matched_bbox is None and actual_p is not None:
half_w = int(w * 0.4)
half_h = 90
x1 = max(0, actual_p[0] - half_w)
x2 = min(w, actual_p[0] + half_w)
y1 = max(0, actual_p[1] - half_h)
y2 = min(h, actual_p[1] + half_h)
matched_bbox = [x1, y1, x2, y2]
# 如果既没有 point 也没有匹配框,跳过
if actual_p is None and matched_bbox is None:
continue
if actual_p is None and matched_bbox:
actual_p = [(matched_bbox[0] + matched_bbox[2]) // 2, (matched_bbox[1] + matched_bbox[3]) // 2]
final_stations.append({
"name": res.get("name"),
"address": res.get("address"),
"point": actual_p,
"bbox": matched_bbox
})
elif not cv_bboxes:
vlm_list = vlm_results if isinstance(vlm_results, list) else []
w, h = 0, 0

42
T4_TeLaiDian_Simple.py Normal file
View File

@@ -0,0 +1,42 @@
# coding=utf-8
import sys
import os
import asyncio
import time
import logging
import uiautomator2 as u2
# Put this script's own directory on sys.path before the project imports
# below, so the ``Apps`` package resolves when the file is run directly.
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
    sys.path.append(project_root)

from Apps.TeLaiDian import Kit
from Apps.TeLaiDian.ReadImageKit import ReadImageKit

# Script-wide logger, created through the project's Kit helper.
logger = Kit.setup_logger("T4_TeLaiDian_Simple", clear_old_log=True)
async def run_simple():
    """
    Run the minimal station-list flow once: refresh, screenshot, analyze, log.

    Clears the temp directory, connects to a device through uiautomator2,
    swipes downward to trigger a refresh, takes a screenshot of the list page
    and passes it to ``ReadImageKit.analyze_station_list``. The station count
    and at most the first ten results are written to the log.
    """
    Kit.clear_temp_dir()

    device = u2.connect()
    width, height = device.window_size()
    logger.info(f"开始简单流程,当前窗口: {width}x{height}")

    # Swipe down the vertical center line, from 30% to 80% of the screen height.
    logger.info("执行显式下拉刷新以校准位置")
    mid_x = width // 2
    start_y = int(height * 0.3)
    end_y = int(height * 0.8)
    device.swipe(mid_x, start_y, mid_x, end_y, duration=0.5)
    await asyncio.sleep(2.5)

    # Timestamped file name keeps successive runs from overwriting each other.
    screenshot_name = f"tld_list_{int(time.time())}.jpg"
    screenshot_path = Kit.take_screenshot(device, screenshot_name)
    logger.info(f"列表页截图: {screenshot_path}")

    reader = ReadImageKit()
    stations = await reader.analyze_station_list(screenshot_path)
    logger.info(f"识别到场站数量: {len(stations)}")

    # Only the first ten entries are listed, to keep the log short.
    for position, station in enumerate(stations[:10], start=1):
        name = station.get('name')
        point = station.get('point')
        bbox = station.get('bbox')
        logger.info(f"[{position}] {name} | point={point} | bbox={bbox}")

    logger.info("简单流程结束")
if __name__ == "__main__":
    # Script entry point: drive the coroutine to completion. Ctrl+C is logged
    # as a user interruption instead of surfacing as a traceback.
    try:
        asyncio.run(run_simple())
    except KeyboardInterrupt:
        logger.info("用户中断")