This commit is contained in:
HuangHai
2026-01-13 20:54:49 +08:00
parent 6a11893d07
commit ddb63403b7
7 changed files with 168 additions and 45 deletions

View File

@@ -115,25 +115,40 @@ class YiLaiTeCrawler(BaseCrawler):
if await self.redis_kit.get_data(redis_key):
continue
# 点击进入前截图,用于对比是否成功进入二级页
before_click_path = take_screenshot(d, f"before_{clean_station_name(name)}")
before_md5 = get_image_content_md5(before_click_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
# 点击进入
logger.info(f">>> 发现新场站: {name} 坐标: {point}")
d.click(point[0], point[1])
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
# 分析详情页 (采用异步后台模式)
detail_shot = take_screenshot(d, f"detail_{clean_station_name(name)}_{int(time.time())}.jpg")
# 启动后台任务处理详情页
task = asyncio.create_task(self.analyze_detail_background(name, detail_shot))
background_tasks.append(task)
processed_count += 1
new_stations_in_page += 1
await self.redis_kit.set_data(redis_key, "1", expire=86400*7)
# 返回
d.press("back")
await asyncio.sleep(WAIT_BACK_TO_LIST)
detail_shot = take_screenshot(d, f"detail_{clean_station_name(name)}_{int(time.time())}")
after_md5 = get_image_content_md5(detail_shot, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
# 清理临时对比图
if os.path.exists(before_click_path): os.remove(before_click_path)
if before_md5 != after_md5:
logger.info(f"成功进入详情页: {name}")
# 启动后台任务处理详情页
task = asyncio.create_task(self.analyze_detail_background(name, detail_shot))
background_tasks.append(task)
processed_count += 1
new_stations_in_page += 1
await self.redis_kit.set_data(redis_key, "1", expire=86400*7)
# 返回
d.press("back")
await asyncio.sleep(WAIT_BACK_TO_LIST)
else:
logger.warning(f"点击场站 {name} 后页面似乎未跳转,跳过返回操作")
if os.path.exists(detail_shot): os.remove(detail_shot)
# 即使没进去,也记录一下,避免短时间内重复尝试
await self.redis_kit.set_data(redis_key, "1", expire=3600)
if new_stations_in_page == 0 and stations:
no_new_data_count += 1

View File

@@ -98,6 +98,96 @@ def save_image(path, img):
logger.error(f"Error saving image {path}: {e}")
return False
def detect_cards_cv(image_path, top_ratio=0.15, bottom_ratio=0.1):
"""
使用计算机图形学 (OpenCV) 检测列表中的场站卡片
方案:以 nav.jpg 为锚点,向上/下探测白色卡片边界
"""
img = read_image(image_path)
if img is None:
return []
h, w = img.shape[:2]
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 1. 模板匹配查找锚点 (nav.jpg)
template_path = os.path.join(os.path.dirname(__file__), "Template", "nav.jpg")
anchors = []
if os.path.exists(template_path):
template = read_image(template_path)
if template is not None:
res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED)
threshold = 0.8
loc = np.where(res >= threshold)
# 去重记录锚点中心
t_h, t_w = template.shape[:2]
matched_points = []
for pt in zip(*loc[::-1]): # (x, y)
is_duplicate = False
for mx, my in matched_points:
if abs(mx - pt[0]) < 50 and abs(my - pt[1]) < 50:
is_duplicate = True
break
if not is_duplicate:
matched_points.append(pt)
anchors.append((pt[0] + t_w // 2, pt[1] + t_h // 2))
logger.info(f"通过模板匹配找到 {len(anchors)} 个锚点")
# 2. 基于锚点向上/下扩张探测边界
# 计算每一行的平均亮度,用于探测背景
row_means = np.mean(gray[:, int(w*0.2):int(w*0.8)], axis=1)
cards = []
for ax, ay in anchors:
# 向上探测:直到亮度低于 245 (进入灰色背景)
y1 = ay
while y1 > h * top_ratio:
if row_means[int(y1)] < 245:
break
y1 -= 1
# 向下探测:直到亮度低于 245
y2 = ay
while y2 < h * (1 - bottom_ratio):
if row_means[int(y2)] < 245:
break
y2 += 1
# 验证卡片高度是否合理
if y2 - y1 > 150:
# 给 y 坐标增加一点 padding
cards.append([int(w * 0.05), int(y1 + 5), int(w * 0.95), int(y2 - 5)])
# 3. 兜底方案:如果模板匹配没找到,使用之前的投影法
if not cards:
logger.warning("模板匹配未找到卡片,尝试投影法兜底...")
_, binary = cv2.threshold(gray, 250, 255, cv2.THRESH_BINARY)
top = int(h * top_ratio)
bottom = int(h * (1 - bottom_ratio))
roi_binary = binary[:, int(w*0.2):int(w*0.8)]
row_white_counts = np.sum(roi_binary == 255, axis=1)
threshold_count = (int(w*0.8) - int(w*0.2)) * 0.9
is_card_row = row_white_counts > threshold_count
start_y = None
for y in range(top, bottom):
if is_card_row[y]:
if start_y is None: start_y = y
else:
if start_y is not None:
if y - start_y > 150:
cards.append([int(w * 0.05), start_y + 10, int(w * 0.95), y - 10])
start_y = None
if start_y is not None and bottom - start_y > 150:
cards.append([int(w * 0.05), start_y + 10, int(w * 0.95), bottom - 10])
# 4. 排序
cards.sort(key=lambda c: c[1])
logger.info(f"最终检测到 {len(cards)} 个场站卡片区域")
return cards
def clear_temp_dir(save_dir=None):
"""清空临时目录中的所有文件"""
if save_dir is None:
@@ -141,7 +231,7 @@ def draw_rectangles(image_path, points, output_path=None):
if not DRAW_DEBUG_BOXES:
return image_path
img = cv2.imread(image_path)
img = read_image(image_path)
if img is None:
return image_path
@@ -153,7 +243,7 @@ def draw_rectangles(image_path, points, output_path=None):
cv2.circle(img, center, 10, DEBUG_BOX_COLOR, -1)
save_path = output_path if output_path else image_path
cv2.imwrite(save_path, img)
save_image(save_path, img)
return save_path
except Exception as e:
logger.error(f"绘制矩形框失败: {e}")

View File

@@ -12,8 +12,8 @@ from Util.VLMKit import VLMKit
import json
import re
from Apps.YeLiTe.Kit import draw_rectangles
from Apps.YeLiTe.Config.Setting import DRAW_DEBUG_BOXES
from Apps.YeLiTe.Kit import draw_rectangles, detect_cards_cv
from Apps.YeLiTe.Config.Setting import DRAW_DEBUG_BOXES, SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO
logger = logging.getLogger(__name__)
@@ -57,43 +57,61 @@ class ReadImageKit:
async def analyze_station_list(self, image_path):
"""
分析场站列表页图片,提取场站位置和基本信息
分析场站列表页图片,提取场站位置和基本信息 (Hybrid 模式: CV 检测边界 + VLM 识别文字)
"""
prompt = """
分析这张充电站列表截图,提取所有充电站卡片信息。
# 1. 使用图形学检测卡片边界
cv_bboxes = detect_cards_cv(image_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
**重要过滤规则:**
1. 忽略顶部的筛选栏、排序方式区域(包含“排序方式”、“离我最近”、“地图模式”、“停车收费”、“限时免费”等按钮的区域)。
2. 仅提取下方重复出现的场站信息卡片。
3. 场站卡片通常包含:场站名称(如“驿来特...”、价格如“¥1.33/度”)、快/慢充桩数量、距离等。
输出格式为 JSON 数组,每个对象包含:
- "name": 场站名称
- "point": 场站卡片的中心点击坐标 [x, y]
- "bbox": 场站卡片的边界框 [x1, y1, x2, y2]
注意:
1. 仅提取明显的场站列表卡片。
2. 坐标请以像素为单位。
"""
if cv_bboxes:
# 在图片上绘制绿色框,方便 VLM 对应
draw_rectangles(image_path, cv_bboxes)
prompt = f"""
图片中已经用绿色矩形框标记了 {len(cv_bboxes)} 个可能的充电站卡片。
请按从上到下的顺序,识别每个绿色框内的场站信息。
输出格式为 JSON 数组,长度必须为 {len(cv_bboxes)}
如果某个框内不是有效的场站卡片,请在对应的数组位置返回 null。
每个对象包含:
- "name": 场站名称 (仅提取名称文字)
- "is_valid": true/false (是否为真实的场站卡片)
注意:仅提取名称,不要包含距离或价格。
"""
else:
# 如果 CV 没检测到,退回到纯 VLM 模式
prompt = """
分析这张充电站列表截图,提取所有充电站卡片信息。
忽略顶部的筛选栏,仅提取下方重复出现的场站卡片。
输出格式为 JSON 数组,每个对象包含:
- "name": 场站名称
- "point": 场站卡片的中心点击坐标 [x, y]
- "bbox": 场站卡片的边界框 [x1, y1, x2, y2]
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
# 使用 VLMKit 的提取方法
json_str = self.vlm.extract_json(res_text)
stations = json.loads(json_str)
vlm_results = json.loads(json_str)
if isinstance(stations, list):
# 调试绘图
if DRAW_DEBUG_BOXES:
bboxes = [s['bbox'] for s in stations if 'bbox' in s]
points = [s['point'] for s in stations if 'point' in s]
draw_rectangles(image_path, bboxes + points)
final_stations = []
if cv_bboxes and isinstance(vlm_results, list):
# 将 VLM 识别到的名称与 CV 的坐标匹配
for i, res in enumerate(vlm_results):
if i < len(cv_bboxes) and res and res.get("is_valid"):
bbox = cv_bboxes[i]
final_stations.append({
"name": res.get("name"),
"point": [(bbox[0] + bbox[2]) // 2, (bbox[1] + bbox[3]) // 2],
"bbox": bbox
})
elif not cv_bboxes:
final_stations = vlm_results if isinstance(vlm_results, list) else []
return stations
return []
return final_stations
except Exception as e:
logger.error(f"VLM 分析列表页失败: {e}")
logger.error(f"Hybrid 分析列表页失败: {e}")
return []
async def analyze_detail_price(self, image_path):

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.9 KiB