This commit is contained in:
HuangHai
2026-01-14 13:14:05 +08:00
parent 11e1091c6a
commit 9e610b1906
6 changed files with 300 additions and 120 deletions

View File

@@ -93,6 +93,48 @@ def save_image(path, img):
logger.error(f"Error saving image {path}: {e}")
return False
def detect_list_price_blocks_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
    """Find red/orange price blocks in the lower part of a list screenshot.

    The image is cropped vertically to the band between ``top_ratio`` and
    ``1 - bottom_ratio`` of its height. A "red" mask is built from BGR
    channel thresholds; when that mask has fewer than 50 pixels, an HSV
    range mask is used instead. The mask is closed with a 30x10 rectangle,
    and the bounding boxes of the external contours are filtered by size
    and vertical position.

    Args:
        image_path: Path passed to ``read_image``.
        top_ratio: Fraction of the image height excluded at the top.
        bottom_ratio: Fraction of the image height excluded at the bottom.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes in full-image pixel coordinates,
        sorted by ``y1``. Empty when the image cannot be read, the cropped
        band is empty, or nothing passes the filters.
    """
    img = read_image(image_path)
    if img is None:
        return []

    h, w = img.shape[:2]
    top_limit = int(h * top_ratio)
    bottom_limit = int(h * (1 - bottom_ratio))
    roi_bgr = img[top_limit:bottom_limit, :, :]
    if roi_bgr.size == 0:
        return []

    # cv2.split returns unsigned planes; subtracting them directly wraps
    # around whenever g or b exceeds r, so e.g. r=200, g=255 would yield a
    # large positive "difference" and be misclassified as red. Widen to a
    # signed type before taking differences.
    b, g, r = (plane.astype(np.int32) for plane in cv2.split(roi_bgr))
    mask_bgr = (r > 190) & (g > 80) & (r - g > 25) & (r - b > 25)
    mask_bgr = mask_bgr.astype(np.uint8) * 255

    if cv2.countNonZero(mask_bgr) < 50:
        # Too few pixels matched the BGR rule: fall back to two HSV hue
        # bands (0-20 and 160-180) with minimum saturation/value.
        hsv = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2HSV)
        lower_red1 = np.array([0, 100, 120])
        upper_red1 = np.array([20, 255, 255])
        lower_red2 = np.array([160, 100, 120])
        upper_red2 = np.array([180, 255, 255])
        mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
        mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
        mask = cv2.bitwise_or(mask1, mask2)
    else:
        mask = mask_bgr

    # Wide, short kernel: merges horizontally adjacent marks into one blob.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (30, 10))
    closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    blocks = []
    for cnt in contours:
        x, y, cw, ch = cv2.boundingRect(cnt)
        # Contours are in ROI coordinates; shift y back to the full image.
        y_global = y + top_limit
        if ch < 30 or ch > 140:
            continue
        if cw < 80 or cw > int(w * 0.8):
            continue
        center_y = y_global + ch // 2
        # Keep only blocks centred between 60% of the height and the
        # bottom exclusion line.
        if center_y < int(h * 0.6) or center_y > bottom_limit:
            continue
        blocks.append([x, y_global, x + cw, y_global + ch])

    blocks.sort(key=lambda blk: blk[1])
    return blocks
def detect_price_click_point_cv(image_path):
"""
使用 HSV 颜色过滤定位详情页的橘红色价格区域,返回最左侧区域的中心点击点
@@ -162,7 +204,28 @@ def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
return []
h, w = img.shape[:2]
price_blocks = detect_list_price_blocks_cv(image_path, top_ratio=top_ratio, bottom_ratio=bottom_ratio)
cards = []
if price_blocks:
from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO
top_limit = int(h * SAFE_EXCLUDE_RATIO)
bottom_limit = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO))
for b in price_blocks:
px1, py1, px2, py2 = b
center_y = (py1 + py2) // 2
card_height = MIN_CARD_HEIGHT
card_top = max(top_limit, center_y - int(card_height * 0.7))
card_bottom = card_top + card_height
if card_bottom > bottom_limit:
card_bottom = bottom_limit
card_top = max(top_limit, card_bottom - card_height)
card_left = 0
card_right = w
cards.append([card_left, card_top, card_right, card_bottom])
cards.sort(key=lambda b: b[1])
return cards
# 转换为灰度图
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
@@ -205,7 +268,7 @@ def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
ok_width = cw >= min_card_width
ok_height = ch > MIN_CARD_HEIGHT * 0.8
ok_vertical = center_y >= int(h * 0.58) and y > top_limit and y + ch < bottom_limit
ok_vertical = center_y >= int(h * 0.58) and y > top_limit and center_y < bottom_limit
ok_shape = (len(approx) >= 6 or extent > 0.85)
ok_color = green_ratio < 0.25
@@ -222,24 +285,43 @@ def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
# 按 Y 轴排序
temp_boxes.sort(key=lambda b: b[1])
# 再次过滤和去重
# 再次过滤和去重,并根据高度拆分为多张卡片
for i, box in enumerate(temp_boxes):
x, y, cw, ch = box
# 检查是否与已有的框重叠
# 检查是否与已有的框重叠(按顶部 y 去重)
is_duplicate = False
for v in cards:
if abs(y - v[1]) < 100: # 增加去重间距
if abs(y - v[1]) < 100:
is_duplicate = True
break
if not is_duplicate:
padding = 2
cards.append([
max(0, x + padding),
max(top_limit, y + padding),
min(w, x + cw - padding),
min(bottom_limit, y + ch - padding)
])
if is_duplicate:
continue
padding = 2
x1 = max(0, x + padding)
x2 = min(w, x + cw - padding)
y1 = max(top_limit, y + padding)
y2 = min(bottom_limit, y + ch - padding)
if y2 <= y1:
continue
effective_h = y2 - y1
estimated_count = max(1, int(round(effective_h / float(MIN_CARD_HEIGHT))))
if estimated_count <= 1:
cards.append([x1, y1, x2, y2])
else:
seg_h = effective_h / float(estimated_count)
for k in range(estimated_count):
sy1 = int(round(y1 + k * seg_h))
sy2 = int(round(y1 + (k + 1) * seg_h))
if sy2 <= sy1:
continue
if sy2 - sy1 < MIN_CARD_HEIGHT * 0.6:
continue
cards.append([x1, sy1, x2, sy2])
return cards

View File

@@ -11,7 +11,7 @@ if project_root not in sys.path:
from Util.VLMKit import VLMKit
from Apps.TeLaiDian.Kit import draw_rectangles, detect_cards_cv, setup_logger, read_image, detect_wide_rounded_card_cv
from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO
from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO, MIN_CARD_HEIGHT
# 初始化日志
logger = setup_logger("ReadImageKit")
@@ -242,36 +242,30 @@ class ReadImageKit:
"""
分析场站列表页图片,提取场站位置和基本信息
"""
cv_bboxes = detect_cards_cv(image_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
prompt = """
分析这张充电站列表截图,提取所有真实的充电站卡片。
if cv_bboxes:
img = read_image(image_path)
h, w = img.shape[:2] if img is not None else (2400, 1080)
y_threshold = h * 0.58
# 在最终绘制阶段再生成可视化,避免早期绘制产生误导
prompt = f"""
图片下半部分可能包含多个充电站卡片,请按从上到下识别这些卡片的名称与地址。
输出格式为 JSON 数组。
每个对象包含:
- "name": 场站名称
- "address": 场站地址
- "is_valid": true/false (是否为真实的场站卡片)
"""
else:
prompt = """
分析这张充电站列表截图,提取所有充电站卡片信息。
**重要限制**
1. 忽略页面上半部分(坐标 y < 500的所有内容包括顶部的搜索框、广告 Banner 和“PLUS会员/我的卡券”等图标。
2. 仅提取下方重复出现的、包含“充电站”名称的矩形卡片。
输出格式为 JSON 数组,每个对象包含:
- "name": 场站名称
- "address": 场站地址
- "point": 场站卡片的中心点击坐标 [x, y]
- "bbox": 场站卡片的边界框 [x1, y1, x2, y2]
"""
要求:
1. 忽略页面上半部分(如顶部导航栏、搜索框、广告 Banner、筛选标签等
2. 仅识别下半部分一条条“充电站卡片”,每张卡片通常包含:场站名称、评分、最近充电时间、距离、价格、快/慢空闲数量等。
3. 不要把同一张卡片拆成多块;每条场站只对应一个矩形框。
对于每张卡片,请输出:
- name: 场站名称
- address: 场站地址(如果无法确定可置为 null
- point: 卡片中心点击坐标 [x, y],使用归一化坐标 [0-1000]0 表示最左/最上1000 表示最右/最下)
- bbox: 卡片外接矩形边界 [x1, y1, x2, y2],同样使用归一化坐标 [0-1000]
以 JSON 数组形式输出,例如:
[
{
"name": "某某充电站",
"address": "某某路 100 号",
"point": [500, 750],
"bbox": [50, 600, 950, 820]
}
]
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
@@ -279,82 +273,79 @@ class ReadImageKit:
vlm_results = json.loads(json_str)
final_stations = []
if cv_bboxes and isinstance(vlm_results, list):
img = read_image(image_path)
h, w = img.shape[:2] if img is not None else (2400, 1080)
y_threshold = h * 0.58
bottom_threshold = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO))
use_bboxes = []
for b in cv_bboxes:
cy = (b[1] + b[3]) // 2
if cy >= y_threshold:
use_bboxes.append(b)
for res in vlm_results:
if not res:
vlm_list = vlm_results if isinstance(vlm_results, list) else []
img = read_image(image_path)
h, w = img.shape[:2] if img is not None else (2400, 1080)
y_threshold = h * SAFE_EXCLUDE_RATIO
bottom_threshold = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO))
def to_pixel(v, max_len):
if v is None:
return None
try:
fv = float(v)
except:
return None
if fv <= 1000.0:
return int(fv * max_len / 1000.0)
return int(fv)
for res in vlm_list:
if not res:
continue
p = res.get("point")
b = res.get("bbox")
actual_p = None
actual_bbox = None
if p and len(p) == 2:
px = to_pixel(p[0], w)
py = to_pixel(p[1], h)
if px is not None and py is not None:
actual_p = [px, py]
if b and len(b) == 4:
x1 = to_pixel(b[0], w)
y1 = to_pixel(b[1], h)
x2 = to_pixel(b[2], w)
y2 = to_pixel(b[3], h)
if None not in (x1, y1, x2, y2):
actual_bbox = [max(0, int(x1)), max(0, int(y1)), min(w, int(x2)), min(h, int(y2))]
if actual_p is None and actual_bbox:
actual_p = [(actual_bbox[0] + actual_bbox[2]) // 2, (actual_bbox[1] + actual_bbox[3]) // 2]
if actual_p is None and actual_bbox is None:
continue
if actual_p and (actual_p[1] < y_threshold or actual_p[1] > bottom_threshold):
logger.warning(f"过滤掉可能的误点击点 (y={actual_p[1]}): {res.get('name')}")
continue
if actual_bbox is None and actual_p:
half_w = int(w * 0.4)
half_h = max(MIN_CARD_HEIGHT // 2, 90)
x1 = max(0, actual_p[0] - half_w)
x2 = min(w, actual_p[0] + half_w)
y1 = max(0, actual_p[1] - half_h)
y2 = min(h, actual_p[1] + half_h)
actual_bbox = [x1, y1, x2, y2]
if actual_bbox is not None:
bx1, by1, bx2, by2 = actual_bbox
current_h = by2 - by1
if current_h < MIN_CARD_HEIGHT * 0.8 or by2 > bottom_threshold:
continue
p = res.get("point")
actual_p = None
if p and len(p) == 2:
actual_p = p
if p[0] <= 1000 and p[1] <= 1000:
actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)]
# 顶部与底部安全过滤
if actual_p and (actual_p[1] < y_threshold or actual_p[1] > bottom_threshold):
logger.warning(f"过滤掉可能的误点击点 (y={actual_p[1]} < {y_threshold}): {res.get('name')}")
continue
matched_bbox = None
if actual_p:
for b in use_bboxes:
if b[0] <= actual_p[0] <= b[2] and b[1] <= actual_p[1] <= b[3]:
matched_bbox = b
break
# 如果没有匹配到 CV 框,则根据点击点生成一个虚拟框
if matched_bbox is None and actual_p is not None:
half_w = int(w * 0.4)
half_h = 90
x1 = max(0, actual_p[0] - half_w)
x2 = min(w, actual_p[0] + half_w)
y1 = max(0, actual_p[1] - half_h)
y2 = min(h, actual_p[1] + half_h)
matched_bbox = [x1, y1, x2, y2]
# 如果既没有 point 也没有匹配框,跳过
if actual_p is None and matched_bbox is None:
continue
if actual_p is None and matched_bbox:
actual_p = [(matched_bbox[0] + matched_bbox[2]) // 2, (matched_bbox[1] + matched_bbox[3]) // 2]
final_stations.append({
"name": res.get("name"),
"address": res.get("address"),
"point": actual_p,
"bbox": matched_bbox
})
elif not cv_bboxes:
vlm_list = vlm_results if isinstance(vlm_results, list) else []
w, h = 0, 0
for res in vlm_list:
p = res.get("point")
if p and len(p) == 2:
# 1. 坐标转换逻辑
actual_p = p
if p[0] <= 1000 and p[1] <= 1000:
if w == 0:
img = read_image(image_path)
if img is not None:
h, w = img.shape[:2]
else:
w, h = 1080, 2400 # 兜底
actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)]
# 2. 坐标安全过滤:忽略页面上半部分的误点击(通常是 Logo 或广告)
# 即使 VLM 没听话,我们也在这里硬性过滤
# 根据主屏幕截图,顶部广告区和功能图标区占用了约 55% 的空间
y_threshold = h * 0.58 if h > 0 else 1400
if actual_p[1] < y_threshold:
logger.warning(f"过滤掉可能的误点击点 (y={actual_p[1]} < {y_threshold}): {res.get('name')}")
continue
res["point"] = actual_p
final_stations.append(res)
final_stations.append({
"name": res.get("name"),
"address": res.get("address"),
"point": actual_p,
"bbox": actual_bbox
})
if final_stations:
final_stations.sort(key=lambda s: (s.get("point")[1] if s.get("point") else (s.get("bbox")[1] if s.get("bbox") else 0)))
# 保存诊断图片 (_vl.jpg, _flag.jpg)
if final_stations:

View File

@@ -30,11 +30,25 @@ def test_cv_detection(image_path):
for i, box in enumerate(bboxes):
print(f" 卡片 {i+1}: {box}")
# 2. 生成 _vl.jpg (仅绿框)
# 1.5 检测红色价格块并打印
try:
price_blocks = Kit.detect_list_price_blocks_cv(image_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
except AttributeError:
price_blocks = []
print(f"检测到 {len(price_blocks)} 个红色价格块")
for i, pb in enumerate(price_blocks):
print(f" 价格块 {i+1}: {pb}")
# 2. 生成 _vl.jpg (绿框 + 蓝点标记红色价格行)
vl_path = image_path.replace(".jpg", "_vl.jpg")
img_vl = Kit.read_image(image_path)
for box in bboxes:
cv2.rectangle(img_vl, (box[0], box[1]), (box[2], box[3]), (0, 255, 0), 3)
for pb in price_blocks:
px1, py1, px2, py2 = pb
pcx = (px1 + px2) // 2
pcy = (py1 + py2) // 2
cv2.circle(img_vl, (pcx, pcy), 10, (255, 0, 0), -1)
Kit.save_image(vl_path, img_vl)
print(f"已生成 VLM 标注图: {vl_path}")
@@ -59,5 +73,5 @@ def test_cv_detection(image_path):
print(f"已生成人工核对图: {flag_path}")
if __name__ == "__main__":
target_image = r"d:\dsWork\aiData\Output\tld_list_1768347471.jpg"
target_image = r"d:\dsWork\aiData\Output\tld_list_1768359492.jpg"
test_cv_detection(target_image)

93
debug_cv.py Normal file
View File

@@ -0,0 +1,93 @@
import sys
import os
import cv2
import numpy as np
sys.path.append(os.getcwd())
from Apps.TeLaiDian.Kit import detect_cards_cv as real_detect_cards_cv
from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO
def read_image(path):
    """Load an image file into an array, or return None if that fails.

    Returns None when ``path`` is falsy, does not exist, points to an empty
    file, or when reading/decoding raises (the error is printed). The file
    bytes are read with ``np.fromfile`` and decoded with ``cv2.imdecode``
    using the "unchanged" flag, so the channel layout stored in the file is
    kept as-is.
    """
    if not path:
        return None
    if not os.path.exists(path):
        return None

    try:
        raw = np.fromfile(path, dtype=np.uint8)
        # An empty buffer cannot be decoded; report it as "no image".
        return cv2.imdecode(raw, cv2.IMREAD_UNCHANGED) if raw.size else None
    except Exception as e:
        print(f"Error reading image {path}: {e}")
        return None
def detect_cards_cv(image_path, top_ratio=None, bottom_ratio=None):
    """Print diagnostics for each wide contour found in a list screenshot.

    This is a debugging aid: it thresholds the image, closes it with a wide
    horizontal kernel, and for every external contour wider than half the
    image prints its geometry plus the width/height/vertical/colour checks.

    Args:
        image_path: Path passed to ``read_image``.
        top_ratio: Fraction of the height excluded at the top; defaults to
            ``SAFE_EXCLUDE_RATIO``.
        bottom_ratio: Fraction of the height excluded at the bottom;
            defaults to ``BOTTOM_SAFE_EXCLUDE_RATIO``.

    Returns:
        ``[]`` when the image cannot be read; otherwise ``None`` (results
        are only printed).
    """
    if top_ratio is None:
        top_ratio = SAFE_EXCLUDE_RATIO
    if bottom_ratio is None:
        bottom_ratio = BOTTOM_SAFE_EXCLUDE_RATIO
    MIN_CARD_HEIGHT = 150  # Assuming default from Setting

    img = read_image(image_path)
    if img is None:
        print("Image not found or invalid")
        return []

    h, w = img.shape[:2]
    print(f"Image Size: {w}x{h}")

    # Convert to grayscale
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Restrict the vertical detection range
    top_limit = int(h * top_ratio)
    bottom_limit = int(h * (1 - bottom_ratio))
    print(f"CV limits: top={top_limit}, bottom={bottom_limit}, threshold_y={int(h * 0.58)}")

    # Adaptive threshold
    thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2)

    # Morphological close with a wide, 3-pixel-high kernel
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 4, 3))
    closed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)

    # Find contours
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    min_card_width = int(w * 0.8)

    # The HSV image and the green bounds do not depend on the contour, so
    # build them once instead of re-converting the whole image per contour.
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    green_lower = np.array([35, 80, 80])
    green_upper = np.array([85, 255, 255])

    for cnt in contours:
        x, y, cw, ch = cv2.boundingRect(cnt)
        center_y = y + ch // 2
        rect_area = cw * ch
        cnt_area = cv2.contourArea(cnt)
        extent = cnt_area / rect_area if rect_area > 0 else 0

        ok_width = cw >= min_card_width
        ok_height = ch > MIN_CARD_HEIGHT * 0.8
        # NOTE(review): confirm this still matches the bottom-edge test used
        # by Apps.TeLaiDian.Kit.detect_cards_cv; the two may have drifted.
        ok_vertical = center_y >= int(h * 0.58) and y > top_limit and y + ch < bottom_limit

        # Share of "green" pixels inside the bounding box
        roi = hsv[max(0, y):min(h, y + ch), max(0, x):min(w, x + cw)]
        if roi.size > 0:
            green_mask = cv2.inRange(roi, green_lower, green_upper)
            green_ratio = float(cv2.countNonZero(green_mask)) / (roi.shape[0] * roi.shape[1])
        else:
            green_ratio = 0.0
        ok_color = green_ratio < 0.25

        if cw > w * 0.5:  # Only print large enough boxes
            print(f"Box: y={y}, h={ch}, w={cw}, center_y={center_y}, extent={extent:.2f}, green={green_ratio:.2f}")
            print(f" Checks: width={ok_width}, height={ok_height}, vertical={ok_vertical}, color={ok_color}")
# Script driver: analyse the original screenshot when it is present on disk,
# otherwise fall back to the annotated "_flag" copy.
original_path = r"d:\dsWork\aiData\Output\tld_list_1768359492.jpg"
image_path = r"d:\dsWork\aiData\Output\tld_list_1768359492_flag.jpg"

if not os.path.exists(original_path):
    print(f"Original image not found, trying flag: {image_path}")
    detect_cards_cv(image_path)
else:
    print(f"Testing original image: {original_path}")
    detect_cards_cv(original_path)