This commit is contained in:
HuangHai
2026-01-14 13:27:06 +08:00
parent 9e610b1906
commit a83375e295
3 changed files with 70 additions and 81 deletions

View File

@@ -3,6 +3,7 @@
# Crawl configuration
# Passed as the `scale` argument of the "up" swipe that advances the station list to the next page.
SCROLL_DISTANCE_RATIO = 0.5
# NOTE(review): presumably an upper bound on stations processed per run — usage is not visible here; confirm.
MAX_STATIONS_COUNT = 100
# When True, the crawler logs a message, removes the list screenshot and returns
# right after the first station has been processed.
FIRST_RUN_ONLY_ONE_STATION = True
# Debug drawing configuration
DRAW_DEBUG_BOXES = True

View File

@@ -14,7 +14,7 @@ from Apps.TeLaiDian.Service import TeLaiDianService
from Apps.TeLaiDian.Config.Setting import (
SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL, MAX_STATIONS_COUNT,
SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO, WAIT_DETAIL_PAGE_LOAD,
WAIT_BACK_TO_LIST, DETAIL_SCROLL_DISTANCE_RATIO
WAIT_BACK_TO_LIST, DETAIL_SCROLL_DISTANCE_RATIO, FIRST_RUN_ONLY_ONE_STATION
)
from Core.BaseCrawler import BaseCrawler
import uiautomator2 as u2
@@ -170,6 +170,11 @@ class TeLaiDianCrawler(BaseCrawler):
# 返回列表后也检查一下是否有新广告弹出
await self.clear_ads(d, max_rounds=1)
processed_count += 1
if FIRST_RUN_ONLY_ONE_STATION:
logger.info("已完成首个场站的全流程采集,根据配置退出爬取任务。")
if os.path.exists(screenshot_path):
os.remove(screenshot_path)
return
# 滑动到下一页
d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
@@ -191,96 +196,80 @@ class TeLaiDianCrawler(BaseCrawler):
"""
在详情页提取价格和状态信息
"""
# 1. 识别第一屏的基础信息 (名称、精确地址)
# 1. 截取第一屏的基础信息图(名称、精确地址),并异步识别
first_screen_path = take_screenshot(d, f"tld_detail_basic_{int(time.time())}.jpg")
basic_info = await self.read_image_kit.analyze_detail_basic_info(first_screen_path)
station_name = basic_info.get("name") or station_info.get("name")
address = basic_info.get("address") or station_info.get("address")
logger.info(f"详情页基础信息识别完成: {station_name} | {address}")
station_name = station_info.get("name")
address = station_info.get("address")
logger.info(f"已截取详情页首屏截图,启动异步基础信息识别任务: {station_name} | {address}")
# 2. 小步快跑寻找价格入口 (结合 CV 和 VLM)
found_entrance = False
async def analyze_basic_info_background(image_path, fallback_name, fallback_address):
    """Analyze the detail page's first-screen screenshot and log the result.

    Awaits ``self.read_image_kit.analyze_detail_basic_info`` on ``image_path``
    and logs the recognized name/address, falling back to the supplied values
    when a field is missing or empty. The screenshot is always deleted
    afterwards, whether or not the analysis succeeded.

    Args:
        image_path: Path of the screenshot to analyze and then delete.
        fallback_name: Name to log when the analysis yields no "name".
        fallback_address: Address to log when the analysis yields no "address".

    Returns:
        None. The recognized values are only logged, not returned.
    """
    try:
        basic_info = await self.read_image_kit.analyze_detail_basic_info(image_path)
        name2 = basic_info.get("name") or fallback_name
        addr2 = basic_info.get("address") or fallback_address
        logger.info(f"[异步] 详情页基础信息识别完成: {name2} | {addr2}")
    except Exception as ex:
        # Best-effort background work: a failed analysis must not break the crawl.
        logger.error(f"[异步] 分析详情页基础信息失败: {ex}")
    finally:
        # EAFP cleanup: remove directly instead of exists()+remove() to avoid a
        # check-then-act race, and only swallow filesystem errors rather than
        # every exception (a bare except would also hide cancellation/interrupts).
        try:
            os.remove(image_path)
        except FileNotFoundError:
            # Already gone — nothing to clean up.
            pass
        except OSError as ex:
            logger.warning(f"[异步] 清理详情页首屏截图失败: {image_path} ({ex})")
asyncio.create_task(analyze_basic_info_background(first_screen_path, station_name, address))
# 2. 一次向上滑动,确保当前底部文字完全顶到上沿之外
w, h = d.window_size()
logger.info("执行一次向上滑动,将当前底部文字完全推至屏幕上沿之外...")
d.swipe(w * 0.5, h * 0.75, w * 0.5, h * 0.25, 0.8)
await asyncio.sleep(1.5)
# 3. 使用模板 jgxx.jpg 匹配“价格信息 当前价”入口并点击
template_jgxx = os.path.join(os.path.dirname(__file__), "Template", "jgxx.jpg")
entrance_point = None
max_search_steps = 4
logger.info(f"开始“小步快跑”策略寻找价格入口,最多尝试 {max_search_steps} 次小幅度滑动...")
for step in range(max_search_steps):
current_screen = take_screenshot(d, f"tld_search_price_step_{step}.jpg")
logger.info(f"--- 寻找入口 第 {step+1} 步 ---")
# 优先使用 CV 快速识别橘红色价格 P0
logger.info("尝试 CV 识别价格卡片 (P0)...")
cv_point = detect_price_click_point_cv(current_screen)
if cv_point:
logger.info(f"✅ CV 在第 {step+1} 步成功定位入口: {cv_point}")
entrance_point = cv_point
found_entrance = True
else:
# CV 没找到,使用 VLM 进行深度语义检查
logger.info("CV 未找到,启动 VLM 深度语义识别...")
vlm_res = await self.read_image_kit.find_price_entrance_vlm(current_screen)
if vlm_res.get("found"):
norm_point = vlm_res.get("point") # [x, y] in 0-1000
if norm_point and len(norm_point) == 2:
w, h = d.window_size()
entrance_point = [int(norm_point[0] * w / 1000), int(norm_point[1] * h / 1000)]
logger.info(f"✅ VLM 在第 {step+1} 步成功定位入口: {entrance_point} ({vlm_res.get('reason')})")
found_entrance = True
else:
logger.info(f"{step+1} 步未发现入口: {vlm_res.get('reason', '未知原因')}")
# 如果找到入口,进行标注并点击
if found_entrance and entrance_point:
debug_flag_path = current_screen.replace(".jpg", "_entrance_found.jpg")
img_debug = read_image(current_screen)
if img_debug is not None:
cv2.circle(img_debug, (entrance_point[0], entrance_point[1]), 25, (0, 255, 0), 5) # 绿色大圆圈
save_image(debug_flag_path, img_debug)
logger.info(f"入口位置标注图已保存: {debug_flag_path}")
logger.info(f"正在点击价格入口: {entrance_point}")
d.click(entrance_point[0], entrance_point[1])
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
# 清理临时截图
if os.path.exists(current_screen): os.remove(current_screen)
break
# 没找到,小步向上滚动
if step < max_search_steps - 1:
scroll_scale = 0.35
logger.info(f"未发现入口,执行小幅度向上滑动 (scale={scroll_scale})...")
d.swipe_ext("up", scale=scroll_scale)
await asyncio.sleep(1.2)
# 清理临时截图
if os.path.exists(current_screen): os.remove(current_screen)
if os.path.exists(template_jgxx):
logger.info(f"使用模板匹配价格入口: {template_jgxx}")
try:
match_res = d.image.match(template_jgxx)
except Exception as e:
match_res = None
logger.error(f"模板匹配价格入口失败: {e}")
if not found_entrance:
logger.warning("“小步快跑”策略未能找到价格入口,尝试坐标兜底...")
w, h = d.window_size()
if match_res:
if hasattr(match_res, "point") and match_res.point:
entrance_point = match_res.point
elif isinstance(match_res, dict):
if "point" in match_res and match_res["point"]:
entrance_point = match_res["point"]
elif "x" in match_res and "y" in match_res:
entrance_point = (match_res["x"], match_res["y"])
elif isinstance(match_res, (list, tuple)) and len(match_res) >= 2:
entrance_point = (match_res[0], match_res[1])
if entrance_point:
logger.info(f"通过 jgxx.jpg 成功找到价格入口,点击坐标: {entrance_point}")
d.click(int(entrance_point[0]), int(entrance_point[1]))
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
else:
logger.warning("未能通过 jgxx.jpg 找到价格入口,使用坐标兜底点击页面中部偏下位置。")
d.click(w // 2, int(h * 0.45))
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
# 3. 进入分时电价页面后的处理
# 4. 进入分时电价页面后的处理
try:
# 1. 延长等待时间,等待小程序自动定位到当前时段的滚动完成
logger.info("已点击进入价格详情,等待小程序自动滚动定位完成 (4秒)...")
await asyncio.sleep(4.0)
# 2. 回到 00:00 原点:要看到上面的内容,需要“向下拉动”页面(即向上滚动列表)
logger.info("执行向下拉动,尝试回到 00:00 时段顶部...")
for i in range(3):
# swipe_ext("down") 是手指从上往下划,动作是“向下”,结果是页面“向上”滚动
d.swipe_ext("down", scale=0.8)
await asyncio.sleep(0.5)
logger.info("执行两次大幅向下拉动,确保回到 00:00 时段顶部...")
for _ in range(2):
d.swipe_ext("down", scale=0.9)
await asyncio.sleep(1.0)
except Exception as e:
logger.error(f"处理分时电价页面初始状态失败: {e}")
# 4. 循环滑动抓取完整分时电价
# 5. 循环滑动抓取完整分时电价
all_prices = []
last_price_md5 = None
price_page_count = 0
@@ -353,7 +342,7 @@ class TeLaiDianCrawler(BaseCrawler):
except:
pass
# 5. 保存数据
# 6. 保存数据
if all_prices:
station_name_clean = clean_station_name(station_name)
# 对价格按时间排序
@@ -365,10 +354,9 @@ class TeLaiDianCrawler(BaseCrawler):
await self.service.save_station_data(station_name_clean, address, all_prices)
else:
logger.warning(f"❌ 未能提取到任何价格信息,请检查页面识别逻辑")
# 清理临时截图
for p in [first_screen_path, price_screen_path]:
if os.path.exists(p): os.remove(p)
# 清理价格页临时截图(详情页首屏截图已在异步任务中清理)
if os.path.exists(price_screen_path):
os.remove(price_screen_path)
async def crawl_list(self):
"""