# coding=utf-8
import asyncio
import logging
import uuid
import os
import sys
import json
import time
import hashlib
from datetime import datetime

from PIL import Image

# Add the project root directory to sys.path so the absolute project imports below
# resolve when this file is executed directly.
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
    sys.path.append(project_root)

import uiautomator2 as u2
from Core.BaseCrawler import BaseCrawler
from Apps.XinDianTu import Kit
from Apps.XinDianTu.Kit import take_screenshot
from Util.ObsUtil import ObsUploader
from Util.RedisKit import RedisKit
import cv2
from Apps.XinDianTu.Service import XinDianTuService
from Apps.XinDianTu.ReadImageKit import ReadImageKit
from Config.Config import (
    OBS_TMP_PREFIX,
    CDN_DOMAIN,
    TEMP_IMAGE_DIR
)
from Apps.XinDianTu.Config.Setting import (
    SCROLL_DISTANCE_RATIO,
    MAX_STATIONS_COUNT,
    REDIS_STATION_EXPIRE,
    WAIT_DETAIL_PAGE_LOAD,
    WAIT_BACK_TO_LIST,
    WAIT_AFTER_SCROLL,
    SAFE_EXCLUDE_RATIO,
    BOTTOM_SAFE_EXCLUDE_RATIO,
    FIRST_RUN_ONLY_ONE_STATION,
)

# --- User configuration ---
# Whether to keep screenshot files (True = keep for later inspection, False = delete after use).
# NOTE(review): this flag is not read anywhere in this file; price screenshots are always
# deleted by analyze_prices_background. Confirm whether it should be honored.
KEEP_SCREENSHOTS = True

# [Testing] Whether to clear the station dedup records in Redis on startup.
# True: clear before every run so the same station can be crawled repeatedly while testing.
# False: production mode; keep the records to avoid re-crawling.
TEST_CLEAR_REDIS = True

# Notes:
# SCROLL_DISTANCE_RATIO controls the swipe distance when paging (configured in
# Apps/XinDianTu/Config/Setting.py). On phones with a lower resolution or a smaller screen
# (e.g. Huawei Mate 20X), if stations get skipped while paging, try lowering it
# (e.g. 0.4 or 0.3): shorter swipes make sure every station is fully shown and recognized.

from Apps.XinDianTu.Kit import setup_logger

logger = setup_logger("StationList")

# Redis key prefix of the per-station dedup records (dedup is by cleaned station name only).
_CRAWLED_KEY_PREFIX = "crawled:xdt:"


class XinDianTuCrawler(BaseCrawler):
    """
    Crawler implementation for the XinDianTu (新电途) mini program.
    """

    def __init__(self, service=None):
        super().__init__(service)
        # Configuration parameters and shared clients.
        self.max_stations_count = MAX_STATIONS_COUNT
        self.uploader = ObsUploader()
        self.redis_kit = RedisKit()

    async def start(self):
        """
        BaseCrawler entry point; delegates to the legacy module-level ``main``.
        """
        await main(self.service)

    async def open_app(self):
        # The actual logic lives in Opener.py; this is only a wrapper placeholder.
        pass

    async def crawl_list(self):
        # The actual logic lives in get_station_list; this is only a wrapper placeholder.
        pass

    async def crawl_detail(self, station_info):
        pass


def _crawled_key(station_name):
    """
    Return the Redis dedup key for ``station_name``, or None when the cleaned name is empty.
    """
    cleaned_name = Kit.clean_station_name(station_name)
    if not cleaned_name:
        return None
    return f"{_CRAWLED_KEY_PREFIX}{cleaned_name}"


def _remove_file_quietly(path):
    """
    Delete ``path`` if it exists; OS errors are logged instead of raised.
    """
    if not path:
        return
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError as e:
        logger.warning(f"删除文件失败 {path}: {e}")


def _upload_in_background(loop, uploader, object_key, file_path):
    """
    Schedule the blocking ``uploader.upload_file`` on the default executor without awaiting it.

    A done-callback logs a failed upload so the error is not silently lost.
    Returns the executor future.
    """
    future = loop.run_in_executor(None, uploader.upload_file, object_key, file_path)

    def _log_failure(fut):
        if fut.cancelled():
            return
        exc = fut.exception()
        if exc is not None:
            logger.warning(f"后台上传失败 {object_key}: {exc}")

    future.add_done_callback(_log_failure)
    return future


def _reap_finished_tasks(background_tasks):
    """
    Remove finished tasks from ``background_tasks`` in place, logging the ones that failed.
    """
    for task in [t for t in background_tasks if t.done()]:
        # Task.exception() raises CancelledError on a cancelled task, so check that first.
        if not task.cancelled() and task.exception():
            logger.error(f"后台任务异常: {task.exception()}")
        background_tasks.remove(task)


async def _scroll_list(d):
    """
    Swipe the station list up by SCROLL_DISTANCE_RATIO and wait WAIT_AFTER_SCROLL seconds.
    """
    # Short swipes avoid skipping stations and move cards out from under the bottom ad smoothly.
    logger.info(f"执行翻页滑动 (步长: {SCROLL_DISTANCE_RATIO})...")
    d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
    await asyncio.sleep(WAIT_AFTER_SCROLL)


def _resolve_click_point(station, card):
    """
    Return the (x, y) tap point for a station card.

    Prefers the VL result's ``uia_center_x``/``uia_center_y`` and falls back to the card's
    geometric ``click_point``. Either coordinate may be None when no usable point exists.
    """
    click_x = station.get("uia_center_x")
    click_y = station.get("uia_center_y")
    if click_x is None or click_y is None:
        cp = card.get("click_point") or (None, None)
        if isinstance(cp, (list, tuple)) and len(cp) >= 2:
            click_x, click_y = cp[0], cp[1]
    return click_x, click_y


async def is_already_crawled(redis_kit, station_name):
    """
    Check whether a station has already been crawled (dedup by cleaned name only).

    Returns False when the cleaned name is empty.
    """
    redis_key = _crawled_key(station_name)
    if not redis_key:
        return False
    return bool(await redis_kit.get_data(redis_key))


async def analyze_prices_background(service, station_name, image_paths, device_info=None):
    """
    Parse a station's time-of-use price screenshots and persist the result.

    Intended to run as a background task. Parsing errors are logged, not raised, and the
    screenshots in ``image_paths`` are deleted afterwards whether or not parsing succeeded.
    Always returns True.
    """
    try:
        # Parse all pages together; the helper merges/deduplicates rows across images.
        all_rows = await ReadImageKit.parse_price_schedule_multi(station_name, image_paths, device_info=device_info)
        if all_rows:
            await service.process_price_detail_data(station_name, all_rows)
            logger.info(f"后台处理完成: {station_name} 的分时价格已保存。")
        else:
            logger.warning(f"后台处理失败: {station_name} 未能解析出价格时段。")
    except Exception as e:
        logger.error(f"分析价格后台任务异常 ({station_name}): {e}")
    finally:
        for path in image_paths:
            _remove_file_quietly(path)
    return True


async def _locate_price_entrance(d, detail_path, device_info):
    """
    Find the pixel coordinates of the '全部时段' (all time periods) button on a detail screenshot.

    Tries OCR first and falls back to the VL model. Returns an (x, y) tuple, or None when
    neither finds the button.
    """
    logger.info("正在使用 OCR 识别 '全部时段' 按钮...")
    ocr_res = await ReadImageKit.find_price_entrance_ocr(detail_path)
    if ocr_res and ocr_res.get("found"):
        p = ocr_res["point"]
        # The OCR point is normalized to a 0..1000 range; convert it to screen pixels.
        w, h = d.window_size()
        click_pos = (int(p[0] * w / 1000), int(p[1] * h / 1000))
        logger.info(f"OCR 成功找到 '全部时段' 按钮: {click_pos}")
        return click_pos

    logger.info("OCR 未找到,降级使用 VL 识别...")
    res = await ReadImageKit.find_all_time_button_coordinate(detail_path, device_info=device_info)
    if res and res.get("uia_center_x"):
        click_pos = (res.get("uia_center_x"), res.get("uia_center_y"))
        logger.info(f"VL 识别成功找到 '全部时段' 按钮: {click_pos}")
        return click_pos
    return None


async def _capture_price_pages(d, station_name, max_pages=10):
    """
    Screenshot the price list page by page, swiping up between shots.

    Stops when a swipe no longer changes the page content, or after ``max_pages`` shots.
    Returns the screenshot paths in capture order.
    """
    price_screenshots = []
    w, h = d.window_size()
    scroll_x = int(w * 0.5)
    scroll_top_y = int(h * 0.6)
    scroll_bottom_y = int(h * 0.85)
    name_digest = hashlib.md5(station_name.encode('utf-8')).hexdigest()

    logger.info("从当前时段开始向下浏览价格列表并逐页截图...")
    for p_idx in range(1, max_pages + 1):
        p_path = take_screenshot(d, f"{name_digest}_p_{p_idx}", save_dir=TEMP_IMAGE_DIR)
        price_screenshots.append(p_path)
        before_up_md5 = Kit.get_image_content_md5(p_path)

        d.swipe(scroll_x, scroll_bottom_y, scroll_x, scroll_top_y, 0.2)
        await asyncio.sleep(0.3)

        # A throwaway screenshot tells us whether the swipe moved the list at all.
        check_up_path = take_screenshot(d, f"check_up_{p_idx}", save_dir=TEMP_IMAGE_DIR)
        after_up_md5 = Kit.get_image_content_md5(check_up_path)
        _remove_file_quietly(check_up_path)
        if before_up_md5 == after_up_md5:
            logger.info(f"价格列表已到达顶部 (共抓取页数: {p_idx})")
            break
    return price_screenshots


async def _needs_second_back(check_back_path, list_md5, detail_md5, device_info):
    """
    Decide whether another "back" press is needed after leaving the price page.

    Compares the screenshot against the list-page fingerprint (with the list exclusion ratios)
    and the detail-page fingerprint (default ratios). If neither matches, it looks for the
    '全部时段' button via OCR, then VL. Returns True when the detail page is (still) shown.
    Returns False when the list is already shown or the state cannot be confirmed; stopping
    avoids backing out of the list page.
    """
    check_list_md5 = Kit.get_image_content_md5(
        check_back_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
    )
    check_detail_md5 = Kit.get_image_content_md5(check_back_path)

    if check_list_md5 == list_md5:
        logger.info("检测到已直接返回列表页,跳过后续返回操作。")
        return False
    if check_detail_md5 == detail_md5:
        logger.info("检测到返回详情页,需执行第二次返回。")
        return True

    logger.warning(f"返回后状态未知 (ListMD5:{check_list_md5[:8]} vs {list_md5[:8]}, DetailMD5:{check_detail_md5[:8]} vs {detail_md5[:8]})。")
    logger.info("再次尝试识别按钮以确认当前页面状态...")
    ocr_check = await ReadImageKit.find_price_entrance_ocr(check_back_path)
    if ocr_check and ocr_check.get("found"):
        logger.info("OCR 确认当前仍处于详情页,执行返回。")
        return True
    # OCR found nothing; use VL as the final check.
    check_res = await ReadImageKit.find_all_time_button_coordinate(check_back_path, device_info=device_info)
    if check_res and check_res.get("uia_center_x"):
        logger.info("VL 确认当前仍处于详情页,执行返回。")
        return True
    logger.info("无法确认当前页面状态,判定已回到列表页(或非详情页),停止回退以防误退。")
    return False


async def _visit_price_page(d, service, station_name, detail_path, detail_uuid,
                            list_md5, device_info, background_tasks):
    """
    Open the '全部时段' price page from the detail page, capture it and navigate back once.

    The price screenshots are analyzed in a background task appended to ``background_tasks``.
    Returns True if the caller must still press "back" to get from the detail page to the list.
    That is the answer when the button is missing, the click did nothing, or any error occurs.
    """
    # Fingerprint of the detail page, used to verify that the click actually opened a new page.
    before_click_md5 = Kit.get_image_content_md5(detail_path)
    try:
        click_pos = await _locate_price_entrance(d, detail_path, device_info)
        if not click_pos:
            logger.info(f"场站 {station_name} 经过 OCR 与 VL 识别均未发现 '全部时段' 按钮。")
            return True

        d.click(click_pos[0], click_pos[1])
        await asyncio.sleep(0.5)

        after_click_path = take_screenshot(d, f"check_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
        try:
            if Kit.get_image_content_md5(after_click_path) == before_click_md5:
                logger.warning(f"点击 '{station_name}' 的 '全部时段' 按钮后页面无明显变化,跳过三级页面处理。")
                return True

            price_screenshots = await _capture_price_pages(d, station_name)
            logger.info(f"已启动后台分析价格页: {station_name}")
            background_tasks.append(asyncio.create_task(
                analyze_prices_background(service, station_name, price_screenshots, device_info=device_info)
            ))

            # Back from the price (third-level) page to the detail page.
            d.press("back")
            await asyncio.sleep(0.5)

            # Inspect where we landed so a second "back" cannot exit past the list page.
            check_back_path = take_screenshot(d, f"check_back_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
            try:
                return await _needs_second_back(check_back_path, list_md5, before_click_md5, device_info)
            finally:
                _remove_file_quietly(check_back_path)
        finally:
            _remove_file_quietly(after_click_path)
    except Exception as e:
        logger.error(f"处理 '全部时段' 按钮点击流程异常: {e}")
        return True


async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS_COUNT):
    """
    Crawl the station list page by page (async-optimized).

    For every screen: take a screenshot and crop station cards locally, then recognize the
    stations with the VL model. Each new, fully visible station gets its detail page opened;
    address and price information are analyzed in background tasks. The station is then
    recorded in Redis, and finally the list is scrolled.

    The crawl stops when one of these happens:
    - ``max_stations_count`` new stations have been processed;
    - the list content stops changing;
    - 5 consecutive pages yield no new station.

    Args:
        d: uiautomator2 device.
        service: XinDianTu service used for VL recognition and persistence.
        uploader: OBS uploader for the reference screenshots.
        max_stations_count: maximum number of NEW stations to process.

    Returns:
        The number of new stations processed.
    """
    redis_kit = RedisKit()
    loop = asyncio.get_running_loop()

    window_size = d.window_size()
    w, h = window_size[0], window_size[1]
    device_info = d.info
    device_info['width'] = w
    device_info['height'] = h
    logger.info(f"开始爬取列表,设备: {device_info.get('productName')} | 分辨率: {w}x{h} | 目标数量: {max_stations_count}")

    background_tasks = []
    last_list_md5 = None
    no_new_data_count = 0
    total_encountered_count = 0
    total_new_processed_count = 0
    scroll_count = 0

    while total_new_processed_count < max_stations_count:
        scroll_count += 1
        logger.info(f"正在处理第 {scroll_count} 次滚动 (已遇到: {total_encountered_count}, 新采集: {total_new_processed_count}/{max_stations_count})...")

        # 1. Screenshot the list page.
        image_uuid = str(uuid.uuid4())
        screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR)

        # Cards whose normalized bottom edge is below this line are left for a later scroll.
        # On the first screen the bottom area is reserved (0.78); afterwards the full height is usable.
        ad_top_y_norm = 0.78 if scroll_count == 1 else 1.0
        logger.info(f"当前列表页广告安全下边界: {ad_top_y_norm:.2f} (scroll_count={scroll_count})")

        # End of list: the core content (status bar / bottom area excluded) equals the previous screen.
        current_md5 = Kit.get_image_content_md5(
            screenshot_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
        )
        if last_list_md5 and current_md5 == last_list_md5:
            logger.info("检测到列表核心内容无变化,已到达底部或加载完成,提前结束。")
            _remove_file_quietly(screenshot_path)
            break
        last_list_md5 = current_md5
        logger.info(f"列表页截图已完成: {screenshot_path}")

        # 2. Local geometric analysis; writes <name>_vl.jpg and <name>.json next to the screenshot.
        logger.info("正在执行图形学切片分析...")
        Kit.crop_cards_from_image(screenshot_path)
        json_path = screenshot_path.replace(".jpg", ".json")
        vl_img_path = screenshot_path.replace(".jpg", "_vl.jpg")
        if not os.path.exists(json_path) or not os.path.exists(vl_img_path):
            logger.warning("未识别到任何卡片,跳过当前页")
            # Scroll before skipping. Otherwise the next screenshot would equal this one and
            # the "no change" check above would end the whole crawl.
            await _scroll_list(d)
            continue

        with open(json_path, 'r', encoding='utf-8') as f:
            json_metadata = json.load(f)

        # 3. Upload _vl.jpg in the background for later reference; don't wait for it.
        _upload_in_background(loop, uploader, f"{OBS_TMP_PREFIX}/{image_uuid}_vl.jpg", vl_img_path)

        # 4. Run the VL model on the local file directly instead of waiting for the upload.
        logger.info("正在调用 VL 模型识别场站信息...")
        stations = await service.process_station_list_vl(
            vl_img_path, json_metadata, device_info=device_info,
            max_count=max_stations_count - total_new_processed_count
        )
        logger.info(f"本页识别到 {len(stations)} 个场站")

        # 5. Pair geometric cards with VL results. The VL parser returns stations in card order,
        # so stations[idx] is expected to correspond to json_metadata["cards"][idx].
        new_stations_processed_in_page = 0
        cards = json_metadata.get("cards")
        if cards and stations:
            for idx, card in enumerate(cards):
                # First-screen policy: only the first station here; the rest are handled after
                # scrolling, inside the safe area.
                if FIRST_RUN_ONLY_ONE_STATION and scroll_count == 1 and idx > 0:
                    logger.info("首屏仅处理第一个场站,跳过当前卡片。")
                    continue

                # The limit counts NEW stations only.
                if total_new_processed_count >= max_stations_count:
                    break

                # The VL model may return fewer (or empty) entries than there are cards.
                if idx >= len(stations) or not stations[idx]:
                    continue
                st = stations[idx]
                station_name = st.get("station_name")
                if not station_name:
                    continue

                # Every valid station counts as encountered.
                total_encountered_count += 1

                if await is_already_crawled(redis_kit, station_name):
                    logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。({total_encountered_count}/{max_stations_count})")
                    continue

                # Skip cards that may be covered by the ad or are too close to the bottom.
                # card["bounds_norm"] holds {left, top, right, bottom}.
                card_bottom = card["bounds_norm"]["bottom"]
                if card_bottom > ad_top_y_norm:
                    logger.warning(f"场站 '{station_name}' 被遮挡或太靠近底部 (Bottom {card_bottom:.2f} > {ad_top_y_norm}),留待下次滚动处理。")
                    # Not processed now and it will be seen again after scrolling: don't count it.
                    total_encountered_count -= 1
                    continue

                click_x, click_y = _resolve_click_point(st, card)
                if click_x is None or click_y is None:
                    logger.warning(f"场站 '{station_name}' 缺少有效点击坐标,跳过。")
                    continue

                # Only count the station once it can actually be opened.
                current_idx = total_new_processed_count + 1
                remaining = max_stations_count - current_idx
                logger.info(f"--- [进度: {current_idx}/{max_stations_count}, 剩余: {remaining}] 发现新场站 '{station_name}',开始处理... ---")
                new_stations_processed_in_page += 1
                total_new_processed_count += 1

                logger.info(f"准备处理场站: {station_name}, 点击坐标: ({click_x}, {click_y})")
                d.click(int(click_x), int(click_y))
                # Wait for the detail (second-level) page to load.
                await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)

                detail_uuid = str(uuid.uuid4())
                detail_path = take_screenshot(d, detail_uuid, save_dir=TEMP_IMAGE_DIR)
                # The detail-page ad-popup check was removed to save time and model tokens.

                # Parse the address in the background straight from the local file.
                logger.info(f"已启动后台分析详情页: {station_name}")
                background_tasks.append(asyncio.create_task(
                    service.process_station_address(station_name, detail_path, device_info=device_info)
                ))

                # Upload the detail screenshot in the background for later reference.
                _upload_in_background(loop, uploader, f"{OBS_TMP_PREFIX}/{detail_uuid}.jpg", detail_path)

                # --- Time-of-use price information ---
                should_back_to_list = await _visit_price_page(
                    d, service, station_name, detail_path, detail_uuid,
                    current_md5, device_info, background_tasks
                )

                # Back from the detail page, only if we are not already on the list.
                if should_back_to_list:
                    d.press("back")
                    logger.info(f"等待 {WAIT_BACK_TO_LIST + 1} 秒返回列表...")
                    await asyncio.sleep(WAIT_BACK_TO_LIST + 1)

                # Record the station for dedup (by cleaned name only); skip empty names.
                redis_key = _crawled_key(station_name)
                if redis_key:
                    await redis_kit.set_data(redis_key, "1", expire=REDIS_STATION_EXPIRE)

        # The target was reached inside the page loop: stop instead of scrolling once more.
        if total_new_processed_count >= max_stations_count:
            break

        _reap_finished_tasks(background_tasks)

        if stations and new_stations_processed_in_page == 0:
            no_new_data_count += 1
            logger.info(f"本页所有场站均已处理过,连续 {no_new_data_count} 页无新数据。")
            if no_new_data_count >= 5:
                logger.info("连续 5 页无新数据,判定为已到底或重复循环,提前结束。")
                break
        else:
            no_new_data_count = 0

        # 6. Next page.
        await _scroll_list(d)

    # Wait for the remaining background tasks and surface their failures.
    if background_tasks:
        logger.info(f"正在等待剩余 {len(background_tasks)} 个后台任务完成...")
        results = await asyncio.gather(*background_tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException):
                logger.error(f"后台任务异常: {result}")

    logger.info(f"采集任务完成,共遇到 {total_encountered_count} 个场站,新处理 {total_new_processed_count} 个。")
    return total_new_processed_count


def clean_images_dir():
    """
    Delete every file in the ``Images`` directory next to this module except the kept samples.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    images_dir = os.path.join(base_dir, "Images")
    if not os.path.exists(images_dir):
        return
    keep_files = {'1.jpg', '2.jpg', '3.jpg', '4.jpg'}
    logger.info(f"正在清理 Images 目录 (保留: {keep_files})...")
    deleted_count = 0
    for filename in os.listdir(images_dir):
        if filename in keep_files:
            continue
        file_path = os.path.join(images_dir, filename)
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
                deleted_count += 1
        except Exception as e:
            logger.warning(f"删除文件失败 {filename}: {e}")
    logger.info(f"清理完成,共删除 {deleted_count} 个文件。")


async def clean_redis_data(redis_kit):
    """
    Clear the station dedup records from Redis when TEST_CLEAR_REDIS is enabled.

    Deletes the ``crawled:xdt:*`` keys that is_already_crawled / get_station_list actually use.
    It also deletes the legacy ``processed_station:*`` keys this function previously targeted.
    """
    if not TEST_CLEAR_REDIS:
        return
    patterns = (f"{_CRAWLED_KEY_PREFIX}*", "processed_station:*")
    logger.info(f"测试模式开启:正在清除 Redis 中的场站处理记录 ({', '.join(patterns)})...")
    found_any = False
    for pattern in patterns:
        keys = await redis_kit.keys(pattern)
        if not keys:
            continue
        found_any = True
        count = await redis_kit.delete(*keys)
        logger.info(f"已清除 {count} 条场站处理记录 ({pattern})")
    if not found_any:
        logger.info("未发现旧的场站处理记录")


async def main(service=None, do_cleanup=True):
    """
    Run one complete XinDianTu crawl and record it in ``t_crawl_task_log``.

    Args:
        service: an existing XinDianTuService. If None, one is created and closed at the end.
        do_cleanup: whether to clean the Images directory and (in test mode) Redis first.

    Returns:
        True if the crawl finished without an exception, False otherwise.
    """
    d = u2.connect()

    should_close_service = False
    if service is None:
        service = XinDianTuService()
        should_close_service = True
    # XinDianTuService needs its database connection initialized.
    await service.init_db()

    uploader = ObsUploader()
    redis_kit = RedisKit()

    # Task log bookkeeping.
    task_id = str(uuid.uuid4())
    start_time = datetime.now()
    operator = "新电途"

    try:
        await service.db.save("t_crawl_task_log", {
            "task_id": task_id,
            "operator": operator,
            "start_time": start_time,
            "status": "running"
        }, "task_id")
    except Exception as e:
        logger.error(f"无法记录任务开始日志: {e}")

    total_count = 0
    status = "success"
    error_msg = None
    try:
        if do_cleanup:
            clean_images_dir()
            # [Testing] clear the Redis dedup records when test mode is enabled.
            await clean_redis_data(redis_kit)

        total_count = await get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS_COUNT)
        if total_count:
            logger.info("场站列表采集完成。")
        else:
            logger.warning("未采集到任何场站信息。")
        return True
    except asyncio.CancelledError:
        # A cancelled/interrupted run must not be logged as a success.
        status = "failed"
        error_msg = "cancelled"
        raise
    except Exception as e:
        logger.exception(f"运行过程中出现异常: {e}")
        status = "failed"
        error_msg = str(e)
        return False
    finally:
        end_time = datetime.now()
        duration = int((end_time - start_time).total_seconds())
        try:
            await service.db.update("t_crawl_task_log", {
                "task_id": task_id,
                "end_time": end_time,
                "duration_seconds": duration,
                "station_count": total_count,
                "status": status,
                "error_msg": error_msg
            }, "task_id")
            logger.info(f"任务执行日志已更新: {operator} | 耗时: {duration}s | 数量: {total_count} | 状态: {status}")
        except Exception as e:
            logger.error(f"无法更新任务执行日志: {e}")

        # Close the DB connection only if this function created the service.
        if should_close_service:
            await service.close_db()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("程序被用户中断.")
    except Exception as e:
        logger.exception(f"程序崩溃: {e}")