This commit is contained in:
HuangHai
2026-01-14 10:16:28 +08:00
parent 46a53d6781
commit 6fba055a0f
3 changed files with 163 additions and 93 deletions

View File

@@ -118,117 +118,159 @@ class TeLaiDianCrawler(BaseCrawler):
address = basic_info.get("address") or station_info.get("address")
logger.info(f"详情页基础信息识别完成: {station_name} | {address}")
# 2. 向上滑动以露出价格按钮
logger.info(f"执行滑动操作以显示价格按钮 (距离比例: {DETAIL_SCROLL_DISTANCE_RATIO})...")
# 从屏幕中间向上滑动
d.swipe_ext("up", scale=DETAIL_SCROLL_DISTANCE_RATIO)
await asyncio.sleep(1.5)
# 3. 点击“价格信息”区域 (识别橘红色价格 P0)
price_button_screen = take_screenshot(d, f"tld_before_price_click_{int(time.time())}.jpg")
logger.info("正在通过 CV 寻找橘红色价格区域 (P0)...")
# 2. 小步快跑寻找价格入口 (结合 CV 和 VLM)
found_entrance = False
entrance_point = None
max_search_steps = 4
click_point = detect_price_click_point_cv(price_button_screen)
logger.info(f"开始“小步快跑”策略寻找价格入口,最多尝试 {max_search_steps} 次小幅度滑动...")
# 调试:生成点击点标注图
if click_point:
debug_flag_path = price_button_screen.replace(".jpg", "_click_debug.jpg")
img_debug = read_image(price_button_screen)
if img_debug is not None:
cv2.circle(img_debug, (click_point[0], click_point[1]), 20, (0, 0, 255), -1) # 红色大圆点
cv2.line(img_debug, (click_point[0]-40, click_point[1]), (click_point[0]+40, click_point[1]), (255, 255, 255), 3)
cv2.line(img_debug, (click_point[0], click_point[1]-40), (click_point[0], click_point[1]+40), (255, 255, 255), 3)
save_image(debug_flag_path, img_debug)
logger.info(f"点击点调试图已保存: {debug_flag_path}")
try:
if click_point:
logger.info(f"CV 成功定位价格区域,点击坐标: {click_point}")
d.click(click_point[0], click_point[1])
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
for step in range(max_search_steps):
current_screen = take_screenshot(d, f"tld_search_price_step_{step}.jpg")
logger.info(f"--- 寻找入口 第 {step+1} 步 ---")
# 优先使用 CV 快速识别橘红色价格 P0
logger.info("尝试 CV 识别价格卡片 (P0)...")
cv_point = detect_price_click_point_cv(current_screen)
if cv_point:
logger.info(f"✅ CV 在第 {step+1} 步成功定位入口: {cv_point}")
entrance_point = cv_point
found_entrance = True
else:
logger.warning("CV 未能定位价格区域,尝试模板匹配兜底...")
template_path = os.path.join(project_root, "Apps", "TeLaiDian", "Template", "jgxx.jpg")
match = d.image.match(template_path)
if match:
d.image.click(template_path)
# CV 没找到,使用 VLM 进行深度语义检查
logger.info("CV 未找到,启动 VLM 深度语义识别...")
vlm_res = await self.read_image_kit.find_price_entrance_vlm(current_screen)
if vlm_res.get("found"):
norm_point = vlm_res.get("point") # [x, y] in 0-1000
if norm_point and len(norm_point) == 2:
w, h = d.window_size()
entrance_point = [int(norm_point[0] * w / 1000), int(norm_point[1] * h / 1000)]
logger.info(f"✅ VLM 在第 {step+1} 步成功定位入口: {entrance_point} ({vlm_res.get('reason')})")
found_entrance = True
else:
logger.warning("模板匹配也失败,执行坐标兜底...")
w, h = d.window_size()
d.click(w // 2, int(h * 0.45)) # 滑动后价格通常在屏幕中上部
await asyncio.sleep(2.0) # 等待页面加载及小程序可能的自动滚动
logger.info(f"{step+1} 步未发现入口: {vlm_res.get('reason', '未知原因')}")
# 如果找到入口,进行标注并点击
if found_entrance and entrance_point:
debug_flag_path = current_screen.replace(".jpg", "_entrance_found.jpg")
img_debug = read_image(current_screen)
if img_debug is not None:
cv2.circle(img_debug, (entrance_point[0], entrance_point[1]), 25, (0, 255, 0), 5) # 绿色大圆圈
save_image(debug_flag_path, img_debug)
logger.info(f"入口位置标注图已保存: {debug_flag_path}")
# 额外处理:小程序可能会自动滚动到当前时段,我们需要手动滚回顶部以抓取完整数据
logger.info("向上滚动 2 次,确保回到 00:00 时段顶部...")
for _ in range(2):
d.swipe_ext("down", scale=0.9) # 向下滑动 = 页面向上滚动
await asyncio.sleep(0.8)
logger.info(f"正在点击价格入口: {entrance_point}")
d.click(entrance_point[0], entrance_point[1])
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
# 清理临时截图
if os.path.exists(current_screen): os.remove(current_screen)
break
# 没找到,小步向上滚动
if step < max_search_steps - 1:
scroll_scale = 0.35
logger.info(f"未发现入口,执行小幅度向上滑动 (scale={scroll_scale})...")
d.swipe_ext("up", scale=scroll_scale)
await asyncio.sleep(1.2)
# 清理临时截图
if os.path.exists(current_screen): os.remove(current_screen)
if not found_entrance:
logger.warning("“小步快跑”策略未能找到价格入口,尝试坐标兜底...")
w, h = d.window_size()
d.click(w // 2, int(h * 0.45))
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
# 3. 进入分时电价页面后的处理
try:
# 1. 延长等待时间,等待小程序自动定位到当前时段的滚动完成
logger.info("已点击进入价格详情,等待小程序自动滚动定位完成 (4秒)...")
await asyncio.sleep(4.0)
# 2. 回到 00:00 原点:要看到上面的内容,需要“向下拉动”页面(即向上滚动列表)
logger.info("执行向下拉动,尝试回到 00:00 时段顶部...")
for i in range(3):
# swipe_ext("down") 是手指从上往下划,动作是“向下”,结果是页面“向上”滚动
d.swipe_ext("down", scale=0.8)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"点击价格区域失败: {e}")
finally:
if os.path.exists(price_button_screen): os.remove(price_button_screen)
logger.error(f"处理分时电价页面初始状态失败: {e}")
# 4. 循环滑动抓取完整分时电价
all_prices = []
last_price_md5 = None
price_page_count = 0
max_price_pages = 3 # 分时电价通常不会超过3页
max_price_pages = 4 # 增加到4页确保覆盖 00:00-24:00
screenshot_tasks = [] # 用于异步分析图片的任务列表
temp_screenshots = [] # 记录临时文件以便后续清理
logger.info("开始循环截图并异步抓取完整分时电价...")
logger.info("开始循环截图UI操作优先后台并行分析...")
while price_page_count < max_price_pages:
price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}_{price_page_count}.jpg")
# 校验页面是否发生滚动变化
curr_md5 = get_image_content_md5(price_screen_path, top_ratio=0.2, bottom_ratio=0.2)
if curr_md5 == last_price_md5:
logger.info("价格页面内容无变化,判定已触底")
if os.path.exists(price_screen_path): os.remove(price_screen_path)
break
last_price_md5 = curr_md5
temp_screenshots.append(price_screen_path)
try:
while price_page_count < max_price_pages:
price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}_{price_page_count}.jpg")
# 校验页面是否发生滚动变化
curr_md5 = get_image_content_md5(price_screen_path, top_ratio=0.2, bottom_ratio=0.2)
if curr_md5 == last_price_md5:
logger.info("价格页面内容无变化,判定已触底")
if os.path.exists(price_screen_path): os.remove(price_screen_path)
break
last_price_md5 = curr_md5
temp_screenshots.append(price_screen_path)
logger.info(f"已截取价格详情页第 {price_page_count + 1} 页: {price_screen_path}加入异步分析队列")
# 创建异步任务,但不立即 await
task = self.read_image_kit.analyze_detail_price(price_screen_path)
screenshot_tasks.append(task)
# 向上滑动,继续抓取下一屏
logger.info("向上滑动,准备截取下一屏价格...")
d.swipe_ext("up", scale=0.8)
await asyncio.sleep(1.2)
price_page_count += 1
logger.info(f"已截取价格详情页第 {price_page_count + 1} 页: {price_screen_path}启动后台异步分析")
# 使用 asyncio.create_task 立即在后台开始执行分析
task = asyncio.create_task(self.read_image_kit.analyze_detail_price(price_screen_path))
screenshot_tasks.append(task)
# 向上滚动列表(手指向上划),看后面的时段
logger.info("向上滚动列表,准备截取下一屏价格...")
d.swipe_ext("up", scale=0.8)
await asyncio.sleep(1.0)
price_page_count += 1
# 异步等待所有图片识别任务完成
if screenshot_tasks:
logger.info(f"正在异步分析 {len(screenshot_tasks)} 张价格截图...")
results = await asyncio.gather(*screenshot_tasks)
# 等待所有后台分析任务完成
if screenshot_tasks:
logger.info(f"UI 操作已完成,等待 {len(screenshot_tasks)} 个后台分析任务结束...")
results = await asyncio.gather(*screenshot_tasks, return_exceptions=True)
for res in results:
if isinstance(res, Exception):
logger.error(f"后台分析任务出错: {res}")
continue
if res:
# 深度去重:根据时段 (start, end) 合并
for p in res:
is_duplicate = False
for existing in all_prices:
if p.get('start') == existing.get('start') and p.get('end') == existing.get('end'):
is_duplicate = True
# 字段补全逻辑
for key in ['price', 'plus_price', 'market_price', 'elec_price', 'service_price']:
if p.get(key) is not None and (existing.get(key) is None or existing.get(key) == 0):
existing[key] = p[key]
break
if not is_duplicate:
all_prices.append(p)
except Exception as e:
logger.error(f"抓取价格详情过程中发生异常: {e}")
finally:
# 无论是否异常,都要确保清理未完成的任务,避免 "never awaited" 警告
for task in screenshot_tasks:
if not task.done():
task.cancel()
for page_prices in results:
if page_prices:
# 深度去重:根据时段 (start, end) 合并
for p in page_prices:
is_duplicate = False
for existing in all_prices:
if p.get('start') == existing.get('start') and p.get('end') == existing.get('end'):
is_duplicate = True
# 如果已有条目信息不全,则更新它(比如之前没识别出 PLUS 价格)
for key in ['price', 'plus_price', 'market_price', 'elec_price', 'service_price']:
if p.get(key) is not None and existing.get(key) is None:
existing[key] = p[key]
break
if not is_duplicate:
all_prices.append(p)
# 清理所有临时截图
for path in temp_screenshots:
if os.path.exists(path):
try:
os.remove(path)
except:
pass
# 清理所有临时截图
for path in temp_screenshots:
if os.path.exists(path):
try:
os.remove(path)
except:
pass
# 5. 保存数据
if all_prices:

View File

@@ -20,6 +20,34 @@ class ReadImageKit:
def __init__(self):
self.vlm = VLMKit()
async def find_price_entrance_vlm(self, image_path):
"""
使用 VLM 在详情页寻找价格入口1.1556元/度 的卡片或价格信息按钮)
"""
prompt = """
分析这张充电站详情页截图,找到进入“分时电价详情”的点击入口。
入口特征:
1. 包含价格数字的卡片,例如 "1.1556元/度"
2. 或者标有 "价格信息""电价详情" 字样的按钮。
请判断该入口是否存在,并给出其中心坐标。
输出格式为 JSON
{
"found": true/false,
"reason": "为什么认为这是入口",
"point": [x, y], // 归一化坐标 [0-1000],例如 [500, 600] 代表屏幕中心偏下
"type": "price_card" / "button"
}
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
json_str = self.vlm.extract_json(res_text)
data = json.loads(json_str)
return data
except Exception as e:
logger.error(f"VLM 寻找价格入口失败: {e}")
return {"found": False}
async def analyze_detail_price(self, image_path):
"""
分析详情页截图提取电价信息包括优惠价、PLUS价和挂牌价