Files
aiData/Apps/XinDianTu/Crawler.py
HuangHai 78f116ab84 'commit'
2026-01-16 19:30:31 +08:00

601 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import asyncio
import logging
import uuid
import os
import sys
import json
import time
import hashlib
from PIL import Image
# 将项目根目录添加到 sys.path
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
import uiautomator2 as u2
from Core.BaseCrawler import BaseCrawler
from Apps.XinDianTu import Kit
from Apps.XinDianTu.Kit import take_screenshot
from Util.ObsUtil import ObsUploader
from Util.RedisKit import RedisKit
import cv2
from Apps.XinDianTu.Service import XinDianTuService
from Apps.XinDianTu.ReadImageKit import ReadImageKit
from Config.Config import (
OBS_TMP_PREFIX, CDN_DOMAIN, TEMP_IMAGE_DIR
)
from Apps.XinDianTu.Config.Setting import (
SCROLL_DISTANCE_RATIO,
MAX_STATIONS_COUNT, REDIS_STATION_EXPIRE,
WAIT_DETAIL_PAGE_LOAD, WAIT_BACK_TO_LIST, WAIT_AFTER_SCROLL,
SAFE_EXCLUDE_RATIO,
BOTTOM_SAFE_EXCLUDE_RATIO
)
# --- 用户配置区域 ---
# 是否保留截图文件True=保留备查False=随用随删)
KEEP_SCREENSHOTS = True
# [Testing] 是否在启动时清除 Redis 中的场站处理记录
# True: 每次运行前清除记录,方便反复测试同一个场站
# False: 生产模式,保留记录以避免重复爬取
TEST_CLEAR_REDIS = True
# 配置说明:
# SCROLL_DISTANCE_RATIO 控制翻页时的滑动距离(在 Config.py 中修改)。
# 对于华为 Mate 20x 等分辨率较低或屏幕较小的手机,如果发现翻页时跳过了某些场站,
# 请尝试减小 SCROLL_DISTANCE_RATIO例如设置为 0.4 或 0.3)。
# 这样每次滑动的距离变短,可以确保所有场站都能被完整显示并识别。
# 配置日志输出
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger("StationList")
# 强制设置所有相关模块的日志级别为 INFO防止被第三方库干扰
logging.getLogger("OpenXinDianTu").setLevel(logging.INFO)
logging.getLogger("FullProcess").setLevel(logging.INFO)
class XinDianTuCrawler(BaseCrawler):
"""
新电途小程序爬虫实现
"""
def __init__(self, service=None):
super().__init__(service)
# 初始化配置参数
self.max_stations_count = MAX_STATIONS_COUNT
self.uploader = ObsUploader()
self.redis_kit = RedisKit()
async def start(self):
"""
实现 BaseCrawler 的启动入口
"""
# 兼容旧逻辑,直接调用 main
await main(self.service)
async def open_app(self):
# 实际逻辑在 Opener.py此处可作为封装层
pass
async def crawl_list(self):
# 实际逻辑在 get_station_list此处可作为封装层
pass
async def crawl_detail(self, station_info):
pass
async def is_already_crawled(redis_kit, station_name):
"""
检查场站是否已经爬取过,仅按名称去重
"""
cleaned_name = Kit.clean_station_name(station_name)
if not cleaned_name:
return False
redis_key = f"crawled:xdt:{cleaned_name}"
if await redis_kit.get_data(redis_key):
return True
return False
async def analyze_prices_background(service, station_name, image_paths, device_info=None):
"""
后台异步处理价格图片分析
"""
try:
# 使用多图解析并去重
all_rows = await ReadImageKit.parse_price_schedule_multi(station_name, image_paths, device_info=device_info)
if all_rows:
await service.process_price_detail_data(station_name, all_rows)
logger.info(f"后台处理完成: {station_name} 的分时价格已保存。")
else:
logger.warning(f"后台处理失败: {station_name} 未能解析出价格时段。")
# 处理完成后清理图片
for path in image_paths:
try:
if os.path.exists(path):
os.remove(path)
except:
pass
except Exception as e:
logger.error(f"分析价格后台任务异常 ({station_name}): {e}")
return True
async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS_COUNT):
"""
获取场站列表并处理翻页 (异步优化版)
"""
redis_kit = RedisKit()
window_size = d.window_size()
w, h = window_size[0], window_size[1]
device_info = d.info
device_info['width'] = w
device_info['height'] = h
logger.info(f"开始爬取列表,设备: {device_info.get('productName')} | 分辨率: {w}x{h} | 目标数量: {max_stations_count}")
background_tasks = []
last_list_md5 = None
no_new_data_count = 0
total_encountered_count = 0
total_new_processed_count = 0
scroll_count = 0
while total_encountered_count < max_stations_count:
scroll_count += 1
logger.info(f"正在处理第 {scroll_count} 次滚动 (已遇到: {total_encountered_count}/{max_stations_count}, 新采集: {total_new_processed_count})...")
# 1. 拍摄截图
image_uuid = str(uuid.uuid4())
screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR)
# 【优化】不再在每页滚动时检查兔子广告,仅在 Opener 进入时检查一次
# 如果后续发现有其它非兔子广告弹出,可在此处恢复非兔子广告的检测逻辑
ad_top_y_norm = 0.78 # 默认的点击边界 (0.78)
# 检查是否已经滚动到底部 (排除状态栏后,内容与上次一致)
current_md5 = Kit.get_image_content_md5(
screenshot_path,
top_ratio=SAFE_EXCLUDE_RATIO,
bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
)
if last_list_md5 and current_md5 == last_list_md5:
logger.info("检测到列表核心内容无变化,已到达底部或加载完成,提前结束。")
if os.path.exists(screenshot_path): os.remove(screenshot_path)
break
last_list_md5 = current_md5
logger.info(f"列表页截图已完成: {screenshot_path}")
# 2. 执行本地图形学分析,生成 _vl.jpg 和 .json
logger.info("正在执行图形学切片分析...")
Kit.crop_cards_from_image(screenshot_path)
json_path = screenshot_path.replace(".jpg", ".json")
vl_img_path = screenshot_path.replace(".jpg", "_vl.jpg")
if not os.path.exists(json_path) or not os.path.exists(vl_img_path):
logger.warning("未识别到任何卡片,跳过当前页")
continue
with open(json_path, 'r', encoding='utf-8') as f:
json_metadata = json.load(f)
# 3. 【优化】后台异步上传 _vl.jpg 供以后参考,不再等待上传
vl_object_key = f"{OBS_TMP_PREFIX}/{image_uuid}_vl.jpg"
loop = asyncio.get_running_loop()
loop.run_in_executor(None, uploader.upload_file, vl_object_key, vl_img_path)
# 4. 【优化】直接使用本地路径调用 VL 模型识别,避免等待上传
logger.info("正在调用 VL 模型识别场站信息...")
stations = await service.process_station_list_vl(vl_img_path, json_metadata, device_info=device_info, max_count=max_stations_count - total_encountered_count)
logger.info(f"本页识别到 {len(stations)} 个场站")
# 5. 匹配几何卡片与 VL 识别结果 (XinDianTu 的 parse_vl_image 已经按顺序返回了)
# 这里的 stations 列表长度应该与 json_metadata["cards"] 对应
new_stations_processed_in_page = 0
if json_metadata.get("cards") and stations:
for idx, card in enumerate(json_metadata["cards"]):
# 检查是否已达到最大采集数量
if total_encountered_count >= max_stations_count:
break
# 检查索引是否越界 (VL 模型可能返回的数组长度不一致)
if idx < len(stations) and stations[idx]:
st = stations[idx]
station_name = st.get("station_name")
if not station_name: continue
# 只要是有效的场站,就计入已遇到数量
total_encountered_count += 1
# 检查 Redis 去重
if await is_already_crawled(redis_kit, station_name):
logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。({total_encountered_count}/{max_stations_count})")
continue
# 【优化】检查是否被遮挡或太靠近底部
# card["bounds_norm"] 是 {left, top, right, bottom}
card_bottom = card["bounds_norm"]["bottom"]
if card_bottom > ad_top_y_norm:
logger.warning(f"场站 '{station_name}' 被遮挡或太靠近底部 (Bottom {card_bottom:.2f} > {ad_top_y_norm}),留待下次滚动处理。")
# 还没处理,不能算作已遇到(因为下次还会遇到)
total_encountered_count -= 1
continue
# 正常处理新场站
current_idx = total_encountered_count
remaining = max_stations_count - current_idx
logger.info(f"--- [进度: {current_idx}/{max_stations_count}, 剩余: {remaining}] 发现新场站 '{station_name}',开始处理... ---")
new_stations_processed_in_page += 1
total_new_processed_count += 1
click_x, click_y = card["click_point"]
logger.info(f"准备处理场站: {station_name}, 点击坐标: ({click_x}, {click_y})")
d.click(int(click_x), int(click_y))
# 等待二级页面加载
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
should_back_to_list = True
# 截取二级页面图
detail_uuid = str(uuid.uuid4())
detail_path = take_screenshot(d, detail_uuid, save_dir=TEMP_IMAGE_DIR)
# 【新增】二级页面广告检测 (如:免费停车提示)
# 【优化】不再在详情页检查广告,节省时间和 Token
# detail_ad_res = await ReadImageKit.detect_ad_popup(detail_path, device_info=device_info)
# if detail_ad_res:
# ... (代码已移除)
# 【优化】后台解析详情页地址,直接传本地路径,避免等待上传
logger.info(f"已启动后台分析详情页: {station_name}")
task_addr = asyncio.create_task(service.process_station_address(station_name, detail_path, device_info=device_info))
background_tasks.append(task_addr)
# 【优化】后台异步上传详情页截图供以后参考
detail_object_key = f"{OBS_TMP_PREFIX}/{detail_uuid}.jpg"
loop = asyncio.get_running_loop()
loop.run_in_executor(None, uploader.upload_file, detail_object_key, detail_path)
# --- 抓取价格时段信息 ---
# 【优化】使用 OCR 识别“全部时段”按钮,替代之前的模板匹配 (qbsd.jpg)
# 记录点击前的页面特征,用于验证是否成功进入三级页面
before_click_md5 = Kit.get_image_content_md5(detail_path)
entered_price_page = False
click_pos = None
try:
# 1. 尝试 OCR 识别
logger.info("正在使用 OCR 识别 '全部时段' 按钮...")
ocr_res = await ReadImageKit.find_price_entrance_ocr(detail_path)
if ocr_res.get("found"):
p = ocr_res["point"]
# 归一化坐标转像素坐标
w, h = d.window_size()
click_pos = (int(p[0] * w / 1000), int(p[1] * h / 1000))
logger.info(f"OCR 成功找到 '全部时段' 按钮: {click_pos}")
# 2. 如果 OCR 未找到,则降级到 VL
if not click_pos:
logger.info("OCR 未找到,降级使用 VL 识别...")
res = await ReadImageKit.find_all_time_button_coordinate(detail_path, device_info=device_info)
if res.get("uia_center_x"):
click_pos = (res.get("uia_center_x"), res.get("uia_center_y"))
logger.info(f"VL 识别成功找到 '全部时段' 按钮: {click_pos}")
if click_pos:
d.click(click_pos[0], click_pos[1])
await asyncio.sleep(1.5)
# 检查页面是否真的变化了
after_click_path = take_screenshot(d, f"check_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
after_click_md5 = Kit.get_image_content_md5(after_click_path)
if before_click_md5 != after_click_md5:
entered_price_page = True
price_screenshots = []
# 1. 向下滚动到底 (根据用户反馈只有不断向下滚动才能看到00点的)
logger.info("正在向下滚动价格列表到底部 (快速多次滚动以尽快看到 00:00)...")
max_scroll_down = 10
for i in range(max_scroll_down):
before_scroll_path = take_screenshot(d, f"scroll_dn_{i}", save_dir=TEMP_IMAGE_DIR)
before_scroll_md5 = Kit.get_image_content_md5(before_scroll_path)
d.swipe_ext("up", scale=0.8)
await asyncio.sleep(0.5)
after_scroll_path = take_screenshot(d, f"scroll_dn_after_{i}", save_dir=TEMP_IMAGE_DIR)
after_scroll_md5 = Kit.get_image_content_md5(after_scroll_path)
# 清理临时截图
if os.path.exists(before_scroll_path): os.remove(before_scroll_path)
if os.path.exists(after_scroll_path): os.remove(after_scroll_path)
if before_scroll_md5 == after_scroll_md5:
logger.info(f"价格列表已到达底部 (滚动次数: {i})")
break
# 2. 向上滚动并逐页截图 (从底向上抓取)
logger.info("正在向上滚动价格列表并逐页截图...")
max_scroll_up = 10
for p_idx in range(1, max_scroll_up + 1):
# 截图当前页
p_uuid = f"{hashlib.md5(station_name.encode('utf-8')).hexdigest()}_p_{p_idx}"
p_path = take_screenshot(d, p_uuid, save_dir=TEMP_IMAGE_DIR)
price_screenshots.append(p_path)
# 检查是否还能向上滚动
before_up_md5 = Kit.get_image_content_md5(p_path)
d.swipe_ext("down", scale=0.85)
await asyncio.sleep(0.5)
# 检查是否还有新内容
check_up_path = take_screenshot(d, f"check_up_{p_idx}", save_dir=TEMP_IMAGE_DIR)
after_up_md5 = Kit.get_image_content_md5(check_up_path)
if os.path.exists(check_up_path): os.remove(check_up_path)
if before_up_md5 == after_up_md5:
logger.info(f"价格列表已到达顶部 (共抓取页数: {p_idx})")
break
# 【异步优化】后台处理价格
logger.info(f"已启动后台分析价格页: {station_name}")
task_price = asyncio.create_task(analyze_prices_background(service, station_name, price_screenshots, device_info=device_info))
background_tasks.append(task_price)
# 从三级页面返回二级页面
d.press("back")
await asyncio.sleep(0.5)
# 检查返回后的页面状态,防止多重返回导致退出到搜索页
check_back_path = take_screenshot(d, f"check_back_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
# 1. 与列表页对比 (使用列表页的排除比例)
check_list_md5 = Kit.get_image_content_md5(
check_back_path,
top_ratio=SAFE_EXCLUDE_RATIO,
bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
)
# 2. 与详情页对比 (使用默认排除比例,与 before_click_md5 一致)
check_detail_md5 = Kit.get_image_content_md5(check_back_path)
if check_list_md5 == current_md5:
logger.info("检测到已直接返回列表页,跳过后续返回操作。")
should_back_to_list = False
elif check_detail_md5 == before_click_md5:
logger.info("检测到返回详情页,需执行第二次返回。")
should_back_to_list = True
else:
logger.warning(f"返回后状态未知 (ListMD5:{check_list_md5[:8]} vs {current_md5[:8]}, DetailMD5:{check_detail_md5[:8]} vs {before_click_md5[:8]})。")
logger.info("再次尝试识别按钮以确认当前页面状态...")
# 优先尝试 OCR 确认
ocr_check = await ReadImageKit.find_price_entrance_ocr(check_back_path)
if ocr_check.get("found"):
logger.info("OCR 确认当前仍处于详情页,执行返回。")
should_back_to_list = True
else:
# OCR 未发现,则用 VL 兜底确认
check_res = await ReadImageKit.find_all_time_button_coordinate(check_back_path, device_info=device_info)
if check_res and check_res.get("uia_center_x"):
logger.info("VL 确认当前仍处于详情页,执行返回。")
should_back_to_list = True
else:
logger.info("无法确认当前页面状态,判定已回到列表页(或非详情页),停止回退以防误退。")
should_back_to_list = False
# 使用完毕后再清理检查用的截图
if os.path.exists(check_back_path): os.remove(check_back_path)
else:
logger.warning(f"点击 '{station_name}''全部时段' 按钮后页面无明显变化,跳过三级页面处理。")
# 清理检查用的截图
if os.path.exists(after_click_path): os.remove(after_click_path)
else:
logger.info(f"场站 {station_name} 经过模板匹配与 VL 识别均未发现 '全部时段' 按钮。")
except Exception as e:
logger.error(f"处理 '全部时段' 按钮点击流程异常: {e}")
# 从二级页面返回 (仅当确实需要返回时)
if should_back_to_list:
d.press("back")
logger.info(f"等待 {WAIT_BACK_TO_LIST + 1} 秒返回列表...")
await asyncio.sleep(WAIT_BACK_TO_LIST + 1)
# 记录 Redis 去重 (仅按名称去重)
cleaned = Kit.clean_station_name(station_name)
await redis_kit.set_data(f"crawled:xdt:{cleaned}", "1", expire=REDIS_STATION_EXPIRE)
# 如果内层循环已达上限,外层循环也应退出,避免不必要的滑动
if total_encountered_count >= max_stations_count:
break
# 清理已完成的后台任务
done_tasks = [t for t in background_tasks if t.done()]
for t in done_tasks:
if t.exception():
logger.error(f"后台任务异常: {t.exception()}")
background_tasks.remove(t)
if stations and new_stations_processed_in_page == 0:
no_new_data_count += 1
logger.info(f"本页所有场站均已处理过,连续 {no_new_data_count} 页无新数据。")
# 【优化】由于滑动步长已调大 (0.5),连续多页重复的可能性降低
if no_new_data_count >= 5:
logger.info("连续 5 页无新数据,判定为已到底或重复循环,提前结束。")
break
else:
no_new_data_count = 0
# 6. 翻页
# 【优化】使用“小步快跑”策略,减小滑动距离以避免跳过场站,并能更平滑地躲避广告
# 步长已在 Setting.py 中统一配置为 SCROLL_DISTANCE_RATIO
logger.info(f"执行翻页滑动 (步长: {SCROLL_DISTANCE_RATIO})...")
d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
await asyncio.sleep(WAIT_AFTER_SCROLL)
# 等待所有后台任务完成
if background_tasks:
logger.info(f"正在等待剩余 {len(background_tasks)} 个后台任务完成...")
await asyncio.gather(*background_tasks, return_exceptions=True)
logger.info(f"采集任务完成,共遇到 {total_encountered_count} 个场站,新处理 {total_new_processed_count} 个。")
return total_new_processed_count
def clean_images_dir():
"""清理 Images 目录下除保留文件外的所有文件"""
base_dir = os.path.dirname(os.path.abspath(__file__))
images_dir = os.path.join(base_dir, "Images")
if not os.path.exists(images_dir):
return
keep_files = {'1.jpg', '2.jpg', '3.jpg', '4.jpg'}
logger.info(f"正在清理 Images 目录 (保留: {keep_files})...")
deleted_count = 0
for filename in os.listdir(images_dir):
if filename in keep_files:
continue
file_path = os.path.join(images_dir, filename)
try:
if os.path.isfile(file_path):
os.remove(file_path)
deleted_count += 1
except Exception as e:
logger.warning(f"删除文件失败 {filename}: {e}")
logger.info(f"清理完成,共删除 {deleted_count} 个文件。")
async def clean_redis_data(redis_kit):
"""清除 Redis 中的场站处理记录"""
if TEST_CLEAR_REDIS:
logger.info("测试模式开启:正在清除 Redis 中的场站处理记录 (processed_station:*)...")
keys = await redis_kit.keys("processed_station:*")
if keys:
count = await redis_kit.delete(*keys)
logger.info(f"已清除 {count} 条场站处理记录")
else:
logger.info("未发现旧的场站处理记录")
async def main(service=None, do_cleanup=True):
# 连接设备
d = u2.connect()
# 初始化服务
should_close_service = False
if service is None:
service = XinDianTuService()
should_close_service = True
# 初始化数据库连接XinDianTuService 需要)
await service.init_db()
uploader = ObsUploader()
redis_kit = RedisKit()
# 任务日志记录
from datetime import datetime
task_id = str(uuid.uuid4())
start_time = datetime.now()
operator = "新电途"
# 记录任务开始
try:
await service.db.save("t_crawl_task_log", {
"task_id": task_id,
"operator": operator,
"start_time": start_time,
"status": "running"
}, "task_id")
except Exception as e:
logger.error(f"无法记录任务开始日志: {e}")
total_count = 0
status = "success"
error_msg = None
try:
if do_cleanup:
# 清理图片目录
clean_images_dir()
# [Testing] 如果配置为测试模式,启动时清除 Redis 记录
await clean_redis_data(redis_kit)
# 清理过期数据
# await service.cleanup_old_data()
# 获取场站列表
total_count = await get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS_COUNT)
if total_count:
logger.info("场站列表采集完成。")
else:
logger.warning("未采集到任何场站信息。")
return True
except Exception as e:
logger.exception(f"运行过程中出现异常: {e}")
status = "failed"
error_msg = str(e)
return False
finally:
# 记录任务结束
end_time = datetime.now()
duration = int((end_time - start_time).total_seconds())
try:
await service.db.update("t_crawl_task_log", {
"task_id": task_id,
"end_time": end_time,
"duration_seconds": duration,
"station_count": total_count,
"status": status,
"error_msg": error_msg
}, "task_id")
logger.info(f"任务执行日志已更新: {operator} | 耗时: {duration}s | 数量: {total_count} | 状态: {status}")
except Exception as e:
logger.error(f"无法更新任务执行日志: {e}")
# 关闭数据库连接
if should_close_service:
await service.close_db()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("程序被用户中断.")
except Exception as e:
logger.exception(f"程序崩溃: {e}")