This commit is contained in:
HuangHai
2026-01-18 10:07:17 +08:00
parent bea73da605
commit 02a230bafc
7 changed files with 240 additions and 227 deletions

View File

@@ -1,7 +1,7 @@
# 采集配置
SCROLL_DISTANCE_RATIO = 0.5
MAX_STATIONS_COUNT = 20
MAX_STATIONS_COUNT = 1
FIRST_RUN_ONLY_ONE_STATION = True
REDIS_STATION_EXPIRE = 120
DATA_RETENTION_DAYS = 365

View File

@@ -18,6 +18,7 @@ import uiautomator2 as u2
from Core.BaseCrawler import BaseCrawler
from Apps.XinDianTu import Kit
from Apps.XinDianTu.Kit import take_screenshot
from Apps.XinDianTu.FirstPageKit import run_ocr_rect
from Util.ObsUtil import ObsUploader
from Util.RedisKit import RedisKit
import cv2
@@ -175,248 +176,189 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
logger.info(f"列表页截图已完成: {screenshot_path}")
# 2. 执行本地图形学分析,生成 _vl.jpg 和 .json
logger.info("正在执行图形学切片分析...")
Kit.crop_cards_from_image(screenshot_path)
json_path = screenshot_path.replace(".jpg", ".json")
vl_img_path = screenshot_path.replace(".jpg", "_vl.jpg")
logger.info("正在使用 OCR+LLM 分析场站列表...")
try:
stations_page = await run_ocr_rect(screenshot_path)
except Exception as e:
logger.error(f"OCR+LLM 识别列表页失败: {e}")
stations_page = []
if not os.path.exists(json_path) or not os.path.exists(vl_img_path):
logger.warning("未识别到任何卡片,跳过当前页")
if not stations_page:
logger.warning("OCR+LLM 未识别到任何场站,跳过当前页")
continue
with open(json_path, 'r', encoding='utf-8') as f:
json_metadata = json.load(f)
# 3. 【优化】后台异步上传 _vl.jpg 供以后参考,不再等待上传
vl_object_key = f"{OBS_TMP_PREFIX}/{image_uuid}_vl.jpg"
loop = asyncio.get_running_loop()
loop.run_in_executor(None, uploader.upload_file, vl_object_key, vl_img_path)
# 4. 【优化】直接使用本地路径调用 VL 模型识别,避免等待上传
logger.info("正在调用 VL 模型识别场站信息...")
stations = await service.process_station_list_vl(vl_img_path, json_metadata, device_info=device_info, max_count=max_stations_count - total_new_processed_count)
logger.info(f"本页识别到 {len(stations)} 个场站")
# 5. 匹配几何卡片与 VL 识别结果 (XinDianTu 的 parse_vl_image 已经按顺序返回了)
# 这里的 stations 列表长度应该与 json_metadata["cards"] 对应
new_stations_processed_in_page = 0
if json_metadata.get("cards") and stations:
for idx, card in enumerate(json_metadata["cards"]):
# 首屏策略:只处理第一个场站,其余留待滚动后在安全窗口内处理
if FIRST_RUN_ONLY_ONE_STATION and scroll_count == 1 and idx > 0:
logger.info("首屏仅处理第一个场站,跳过当前卡片。")
continue
for idx, st in enumerate(stations_page):
station_name = st.get("station_name") or st.get("name")
if not station_name:
continue
# 检查是否已达到最大采集数量(按新采集的场站数量限制)
if total_new_processed_count >= max_stations_count:
break
if FIRST_RUN_ONLY_ONE_STATION and scroll_count == 1 and idx > 0:
logger.info("首屏仅处理第一个场站,跳过当前卡片。")
continue
# 检查索引是否越界 (VL 模型可能返回的数组长度不一致)
if idx < len(stations) and stations[idx]:
st = stations[idx]
station_name = st.get("station_name")
if not station_name: continue
if total_new_processed_count >= max_stations_count:
break
# 只要是有效的场站,就计入已遇到数量
total_encountered_count += 1
total_encountered_count += 1
# 检查 Redis 去重
if await is_already_crawled(redis_kit, station_name):
logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。({total_encountered_count}/{max_stations_count})")
if await is_already_crawled(redis_kit, station_name):
logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。({total_encountered_count}/{max_stations_count})")
continue
rect = st.get("rect") or []
if not isinstance(rect, (list, tuple)) or len(rect) < 4:
logger.warning(f"场站 '{station_name}' 缺少有效矩形信息,跳过。")
continue
x1, y1, x2, y2 = rect[0], rect[1], rect[2], rect[3]
card_bottom = float(y2) / float(h) if h else 1.0
if card_bottom > ad_top_y_norm:
logger.warning(f"场站 '{station_name}' 被遮挡或太靠近底部 (Bottom {card_bottom:.2f} > {ad_top_y_norm}),留待下次滚动处理。")
total_encountered_count -= 1
continue
click_point = st.get("click_point") or st.get("click") or []
click_x = None
click_y = None
if isinstance(click_point, (list, tuple)) and len(click_point) >= 2:
click_x, click_y = click_point[0], click_point[1]
if click_x is None or click_y is None:
click_x = int((x1 + x2) / 2)
click_y = int((y1 + y2) / 2)
if click_x is None or click_y is None:
logger.warning(f"场站 '{station_name}' 缺少有效点击坐标,跳过。")
continue
busy_list = st.get("busy_list") or []
piles = []
if isinstance(busy_list, list):
for bi in busy_list:
if not isinstance(bi, dict):
continue
# 【优化】检查是否被遮挡或太靠近底部
# card["bounds_norm"] 是 {left, top, right, bottom}
card_bottom = card["bounds_norm"]["bottom"]
if card_bottom > ad_top_y_norm:
logger.warning(f"场站 '{station_name}' 被遮挡或太靠近底部 (Bottom {card_bottom:.2f} > {ad_top_y_norm}),留待下次滚动处理。")
# 还没处理,不能算作已遇到(因为下次还会遇到)
total_encountered_count -= 1
continue
# 正常处理新场站
current_idx = total_new_processed_count + 1
remaining = max_stations_count - current_idx
logger.info(f"--- [进度: {current_idx}/{max_stations_count}, 剩余: {remaining}] 发现新场站 '{station_name}',开始处理... ---")
new_stations_processed_in_page += 1
total_new_processed_count += 1
click_x = st.get("uia_center_x")
click_y = st.get("uia_center_y")
if click_x is None or click_y is None:
cp = card.get("click_point") or (None, None)
if isinstance(cp, (list, tuple)) and len(cp) >= 2:
click_x, click_y = cp[0], cp[1]
if click_x is None or click_y is None:
logger.warning(f"场站 '{station_name}' 缺少有效点击坐标,跳过。")
continue
logger.info(f"准备处理场站: {station_name}, 点击坐标: ({click_x}, {click_y})")
d.click(int(click_x), int(click_y))
# 等待二级页面加载
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
should_back_to_list = True
# 截取二级页面图
detail_uuid = str(uuid.uuid4())
detail_path = take_screenshot(d, detail_uuid, save_dir=TEMP_IMAGE_DIR)
# 【新增】二级页面广告检测 (如:免费停车提示)
# 【优化】不再在详情页检查广告,节省时间和 Token
# detail_ad_res = await ReadImageKit.detect_ad_popup(detail_path, device_info=device_info)
# if detail_ad_res:
# ... (代码已移除)
# 【优化】后台解析详情页地址,直接传本地路径,避免等待上传
logger.info(f"已启动后台分析详情页: {station_name}")
task_addr = asyncio.create_task(service.process_station_address(station_name, detail_path, device_info=device_info))
background_tasks.append(task_addr)
# 【优化】后台异步上传详情页截图供以后参考
detail_object_key = f"{OBS_TMP_PREFIX}/{detail_uuid}.jpg"
loop = asyncio.get_running_loop()
loop.run_in_executor(None, uploader.upload_file, detail_object_key, detail_path)
# --- 抓取价格时段信息 ---
# 【优化】使用 OCR 识别“全部时段”按钮,替代之前的模板匹配 (qbsd.jpg)
# 记录点击前的页面特征,用于验证是否成功进入三级页面
before_click_md5 = Kit.get_image_content_md5(detail_path)
entered_price_page = False
click_pos = None
mode = bi.get("mode") or bi.get("type") or "未知"
idle = bi.get("idle")
total = bi.get("total")
try:
# 1. 尝试 OCR 识别
logger.info("正在使用 OCR 识别 '全部时段' 按钮...")
ocr_res = await ReadImageKit.find_price_entrance_ocr(detail_path)
if ocr_res.get("found"):
p = ocr_res["point"]
# 归一化坐标转像素坐标
w, h = d.window_size()
click_pos = (int(p[0] * w / 1000), int(p[1] * h / 1000))
logger.info(f"OCR 成功找到 '全部时段' 按钮: {click_pos}")
# 2. 如果 OCR 未找到,则降级到 VL
if not click_pos:
logger.info("OCR 未找到,降级使用 VL 识别...")
res = await ReadImageKit.find_all_time_button_coordinate(detail_path, device_info=device_info)
if res.get("uia_center_x"):
click_pos = (res.get("uia_center_x"), res.get("uia_center_y"))
logger.info(f"VL 识别成功找到 '全部时段' 按钮: {click_pos}")
idle_val = int(idle) if idle is not None else 0
except Exception:
idle_val = 0
try:
total_val = int(total) if total is not None else 0
except Exception:
total_val = 0
piles.append(
{
"type": mode,
"total": total_val,
"free": idle_val,
}
)
if click_pos:
d.click(click_pos[0], click_pos[1])
await asyncio.sleep(0.5)
# 检查页面是否真的变化了
after_click_path = take_screenshot(d, f"check_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
after_click_md5 = Kit.get_image_content_md5(after_click_path)
if before_click_md5 != after_click_md5:
entered_price_page = True
price_screenshots = []
if piles:
try:
await service.record_station_status(
{
"station_name": station_name,
"piles": piles,
}
)
except Exception as e:
logger.error(f"记录列表页状态失败 ({station_name}): {e}")
w, h = d.window_size()
scroll_x = int(w * 0.5)
scroll_top_y = int(h * 0.6)
scroll_bottom_y = int(h * 0.85)
logger.info("从当前时段开始向下浏览价格列表并逐页截图...")
max_scroll_up = 10
for p_idx in range(1, max_scroll_up + 1):
p_uuid = f"{hashlib.md5(station_name.encode('utf-8')).hexdigest()}_p_{p_idx}"
p_path = take_screenshot(d, p_uuid, save_dir=TEMP_IMAGE_DIR)
price_screenshots.append(p_path)
before_up_md5 = Kit.get_image_content_md5(p_path)
d.swipe(scroll_x, scroll_bottom_y, scroll_x, scroll_top_y, 0.2)
await asyncio.sleep(0.3)
check_up_path = take_screenshot(d, f"check_up_{p_idx}", save_dir=TEMP_IMAGE_DIR)
after_up_md5 = Kit.get_image_content_md5(check_up_path)
if os.path.exists(check_up_path): os.remove(check_up_path)
if before_up_md5 == after_up_md5:
logger.info(f"价格列表已到达顶部 (共抓取页数: {p_idx})")
break
# 【异步优化】后台处理价格
logger.info(f"已启动后台分析价格页: {station_name}")
task_price = asyncio.create_task(analyze_prices_background(service, station_name, price_screenshots, device_info=device_info))
background_tasks.append(task_price)
current_idx = total_new_processed_count + 1
remaining = max_stations_count - current_idx
logger.info(f"--- [进度: {current_idx}/{max_stations_count}, 剩余: {remaining}] 发现新场站 '{station_name}',开始处理... ---")
new_stations_processed_in_page += 1
total_new_processed_count += 1
# 从三级页面返回二级页面
d.press("back")
await asyncio.sleep(0.5)
logger.info(f"准备处理场站: {station_name}, 点击坐标: ({click_x}, {click_y})")
# 检查返回后的页面状态,防止多重返回导致退出到搜索页
check_back_path = take_screenshot(d, f"check_back_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
# 1. 与列表页对比 (使用列表页的排除比例)
check_list_md5 = Kit.get_image_content_md5(
check_back_path,
top_ratio=SAFE_EXCLUDE_RATIO,
bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
)
# 2. 与详情页对比 (使用默认排除比例,与 before_click_md5 一致)
check_detail_md5 = Kit.get_image_content_md5(check_back_path)
if check_list_md5 == current_md5:
logger.info("检测到已直接返回列表页,跳过后续返回操作。")
should_back_to_list = False
elif check_detail_md5 == before_click_md5:
logger.info("检测到返回详情页,需执行第二次返回。")
should_back_to_list = True
else:
logger.warning(f"返回后状态未知 (ListMD5:{check_list_md5[:8]} vs {current_md5[:8]}, DetailMD5:{check_detail_md5[:8]} vs {before_click_md5[:8]})。")
logger.info("再次尝试识别按钮以确认当前页面状态...")
# 优先尝试 OCR 确认
ocr_check = await ReadImageKit.find_price_entrance_ocr(check_back_path)
if ocr_check.get("found"):
logger.info("OCR 确认当前仍处于详情页,执行返回。")
should_back_to_list = True
else:
# OCR 未发现,则用 VL 兜底确认
check_res = await ReadImageKit.find_all_time_button_coordinate(check_back_path, device_info=device_info)
if check_res and check_res.get("uia_center_x"):
logger.info("VL 确认当前仍处于详情页,执行返回。")
should_back_to_list = True
else:
logger.info("无法确认当前页面状态,判定已回到列表页(或非详情页),停止回退以防误退。")
should_back_to_list = False
# 使用完毕后再清理检查用的截图
if os.path.exists(check_back_path): os.remove(check_back_path)
d.click(int(click_x), int(click_y))
else:
logger.warning(f"点击 '{station_name}''全部时段' 按钮后页面无明显变化,跳过三级页面处理。")
# 清理检查用的截图
if os.path.exists(after_click_path): os.remove(after_click_path)
else:
logger.info(f"场站 {station_name} 经过模板匹配与 VL 识别均未发现 '全部时段' 按钮。")
except Exception as e:
logger.error(f"处理 '全部时段' 按钮点击流程异常: {e}")
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
should_back_to_list = True
# 从二级页面返回 (仅当确实需要返回时)
if should_back_to_list:
d.press("back")
logger.info(f"等待 {WAIT_BACK_TO_LIST + 1} 秒返回列表...")
await asyncio.sleep(WAIT_BACK_TO_LIST + 1)
detail_uuid = str(uuid.uuid4())
detail_path = take_screenshot(d, detail_uuid, save_dir=TEMP_IMAGE_DIR)
# 记录 Redis 去重 (仅按名称去重)
cleaned = Kit.clean_station_name(station_name)
await redis_kit.set_data(f"crawled:xdt:{cleaned}", "1", expire=REDIS_STATION_EXPIRE)
logger.info(f"已启动后台分析详情页: {station_name}")
task_addr = asyncio.create_task(service.process_station_address(station_name, detail_path, device_info=device_info))
background_tasks.append(task_addr)
detail_object_key = f"{OBS_TMP_PREFIX}/{detail_uuid}.jpg"
loop = asyncio.get_running_loop()
loop.run_in_executor(None, uploader.upload_file, detail_object_key, detail_path)
before_click_md5 = Kit.get_image_content_md5(detail_path)
click_pos = None
try:
logger.info("正在使用 OCR 识别 '全部时段' 按钮...")
ocr_res = await ReadImageKit.find_price_entrance_ocr(detail_path)
if ocr_res.get("found"):
p = ocr_res["point"]
w, h = d.window_size()
click_pos = (int(p[0] * w / 1000), int(p[1] * h / 1000))
logger.info(f"OCR 成功找到 '全部时段' 按钮: {click_pos}")
if not click_pos:
logger.info("OCR 未找到,降级使用 VL 识别...")
res = await ReadImageKit.find_all_time_button_coordinate(detail_path, device_info=device_info)
if res.get("uia_center_x"):
click_pos = (res.get("uia_center_x"), res.get("uia_center_y"))
logger.info(f"VL 识别成功找到 '全部时段' 按钮: {click_pos}")
if click_pos:
d.click(click_pos[0], click_pos[1])
await asyncio.sleep(0.5)
w, h = d.window_size()
scroll_x = int(w * 0.5)
scroll_top_y = int(h * 0.6)
scroll_bottom_y = int(h * 0.85)
logger.info("从当前时段开始向下浏览价格列表并逐页截图...")
max_scroll_up = 10
price_screenshots = []
for p_idx in range(1, max_scroll_up + 1):
p_uuid = f"{hashlib.md5(station_name.encode('utf-8')).hexdigest()}_p_{p_idx}"
p_path = take_screenshot(d, p_uuid, save_dir=TEMP_IMAGE_DIR)
price_screenshots.append(p_path)
before_up_md5 = Kit.get_image_content_md5(p_path)
d.swipe(scroll_x, scroll_bottom_y, scroll_x, scroll_top_y, 0.2)
await asyncio.sleep(0.3)
check_up_path = take_screenshot(d, f"check_up_{p_idx}", save_dir=TEMP_IMAGE_DIR)
after_up_md5 = Kit.get_image_content_md5(check_up_path)
if os.path.exists(check_up_path):
os.remove(check_up_path)
if before_up_md5 == after_up_md5:
logger.info(f"价格列表已到达顶部 (共抓取页数: {p_idx})")
break
logger.info(f"已启动后台分析价格页: {station_name}")
task_price = asyncio.create_task(
analyze_prices_background(service, station_name, price_screenshots, device_info=device_info)
)
background_tasks.append(task_price)
else:
logger.info(f"场站 {station_name} 经过模板匹配与 VL 识别均未发现 '全部时段' 按钮。")
except Exception as e:
logger.error(f"处理 '全部时段' 按钮点击流程异常: {e}")
if should_back_to_list:
d.press("back")
logger.info(f"等待 {WAIT_BACK_TO_LIST + 1} 秒返回列表...")
await asyncio.sleep(WAIT_BACK_TO_LIST + 1)
cleaned = Kit.clean_station_name(station_name)
await redis_kit.set_data(f"crawled:xdt:{cleaned}", "1", expire=REDIS_STATION_EXPIRE)
# 如果内层循环已达上限,外层循环也应退出,避免不必要的滑动
if total_encountered_count >= max_stations_count:
@@ -429,7 +371,7 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
logger.error(f"后台任务异常: {t.exception()}")
background_tasks.remove(t)
if stations and new_stations_processed_in_page == 0:
if stations_page and new_stations_processed_in_page == 0:
no_new_data_count += 1
logger.info(f"本页所有场站均已处理过,连续 {no_new_data_count} 页无新数据。")
# 【优化】由于滑动步长已调大 (0.5),连续多页重复的可能性降低

View File

@@ -521,6 +521,8 @@ async def run_ocr_rect(image_path, log_path=None):
f.write(line + "\n")
log_detail(f"已写入详细日志到: {final_log_path}")
return results
async def run_batch_in_dir(base_dir, log_path=None):
files = sorted(os.listdir(base_dir))