Files
aiData/Apps/YeLiTe/Crawler.py
HuangHai 6655e0cc29 'commit'
2026-01-18 18:59:17 +08:00

430 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import uuid
import os
import sys
import json
import time
from datetime import datetime
from Apps.YeLiTe.Kit import take_screenshot, clean_station_name, get_image_content_md5, detect_price_info_container_cv, setup_logger, get_name_md5
from Apps.YeLiTe.ReadImageKit import ReadImageKit
from Apps.YeLiTe.FirstPageKit import run_ocr_rect
from Apps.YeLiTe.Service import YeLiTeService
from Apps.YeLiTe.Config.Setting import (
SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL, MAX_STATIONS_COUNT,
WAIT_DETAIL_PAGE_LOAD, WAIT_BACK_TO_LIST, TEST_CLEAR_REDIS,
SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO,
FIRST_RUN_ONLY_ONE_STATION
)
from Util.RedisKit import RedisKit
from Core.BaseCrawler import BaseCrawler
import uiautomator2 as u2
# Project root handling: walk three directory levels up from this file and
# make that directory importable.
# NOTE(review): this runs *after* the project imports at the top of the file,
# so it cannot help resolve them; it only affects imports executed later
# (e.g. the function-level import inside YeLiTeCrawler.open_app).
project_root = os.path.abspath(__file__)
for _level in range(3):
    project_root = os.path.dirname(project_root)
if project_root not in sys.path:
    sys.path.append(project_root)

# Module-wide logger shared by the crawler and the task runner below.
logger = setup_logger("YeLiTeCrawler")
class YeLiTeCrawler(BaseCrawler):
    """Crawler for charging stations listed in the YeLiTe mini program.

    Drives an Android device through uiautomator2. It OCRs the station list
    and opens the detail page of every station not yet marked in Redis
    (key ``crawled:ylt:<cleaned name>``). It then captures the price list
    screenshots and hands them to a background task, which extracts the
    prices and stores them through ``self.service``.
    """

    def __init__(self, service=None):
        super().__init__(service or YeLiTeService())
        self.redis_kit = RedisKit()
        self.read_image_kit = ReadImageKit()

    async def start(self):
        """BaseCrawler entry point: run the whole task through module-level ``main``."""
        await main(self.service)

    async def open_app(self):
        """Open the YeLiTe mini program via ``Opener.open_mini_program``."""
        from Apps.YeLiTe import Opener
        return await Opener.open_mini_program()

    async def crawl_list(self):
        """Connect to the default uiautomator2 device and crawl the station list."""
        d = u2.connect()
        return await self.crawl_list_logic(d)

    async def crawl_detail(self, station_info):
        """Required by BaseCrawler; detail pages are handled inside ``crawl_list_logic``."""
        return None

    async def clean_redis_data(self):
        """Delete all ``crawled:ylt:*`` markers when ``TEST_CLEAR_REDIS`` is enabled."""
        if TEST_CLEAR_REDIS:
            logger.info("清理 Redis 中的场站处理记录...")
            pattern = "crawled:ylt:*"
            keys = await self.redis_kit.keys(pattern)
            if keys:
                await self.redis_kit.delete(*keys)

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _remove_file(path):
        """Delete ``path`` if it is set and exists (used for temporary screenshots)."""
        if path and os.path.exists(path):
            os.remove(path)

    def _screen_md5(self, d, filename, cropped=False):
        """Take a screenshot, hash it, delete the file and return the hash.

        With ``cropped=True`` the SAFE_EXCLUDE_RATIO / BOTTOM_SAFE_EXCLUDE_RATIO
        ratios are passed to the hash, as done for list/detail page
        comparisons (presumably excluding volatile top/bottom bands).
        """
        path = take_screenshot(d, filename)
        try:
            if cropped:
                return get_image_content_md5(
                    path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
                )
            return get_image_content_md5(path)
        finally:
            self._remove_file(path)

    @staticmethod
    def _parse_station(st):
        """Extract ``(name, point, distance, piles)`` from one list-OCR entry.

        Returns None when the entry is not a dict, or lacks a name, a
        ``rect`` with at least 4 values, or a numeric click point with at
        least 2 values. The caller skips such entries.
        """
        if not isinstance(st, dict):
            return None
        name = st.get("station_name") or st.get("name")
        rect = st.get("rect") or []
        click_point = st.get("click_point") or st.get("click") or []
        if not name or not isinstance(rect, (list, tuple)) or len(rect) < 4:
            return None
        if not isinstance(click_point, (list, tuple)) or len(click_point) < 2:
            return None
        try:
            point = [int(click_point[0]), int(click_point[1])]
        except (TypeError, ValueError, OverflowError):
            return None
        distance = st.get("distance_text") or st.get("distance")
        piles = st.get("busy_list") or []
        return name, point, distance, piles

    @staticmethod
    def _safe_int(value):
        """Return ``int(value)``, or 0 when value is None or not convertible."""
        if value is None:
            return 0
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    @classmethod
    def _summarize_piles(cls, piles):
        """Sum the ``total`` / ``idle`` counts of the pile entries from the list OCR.

        Returns ``(total, free, piles)``. It returns ``(None, None, None)``
        when ``piles`` is not a list or the summed total is not positive.
        Non-dict entries and non-numeric counts are ignored.
        """
        if not isinstance(piles, list):
            return None, None, None
        entries = [p for p in piles if isinstance(p, dict)]
        total_sum = sum(cls._safe_int(p.get("total")) for p in entries)
        free_sum = sum(cls._safe_int(p.get("idle")) for p in entries)
        if total_sum > 0:
            return total_sum, free_sum, piles
        return None, None, None

    async def _open_station_detail(self, d, name, point, file_tag):
        """Tap a station and check whether the screen changed.

        If the first tap does not change the (cropped) screen hash, it retries
        once 20px lower.

        Returns:
            ``(entered, detail_shot, detail_md5)``: whether the screen changed,
            the path of the last screenshot taken after tapping (the caller
            owns it), and its cropped hash.
        """
        before_click_path = take_screenshot(d, f"before_{file_tag}")
        before_md5 = get_image_content_md5(
            before_click_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
        )
        await asyncio.sleep(0.5)
        d.shell(f"input tap {int(point[0])} {int(point[1])}")
        await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)

        detail_shot = take_screenshot(d, f"detail_{file_tag}_{int(time.time())}")
        after_md5 = get_image_content_md5(
            detail_shot, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
        )
        # The pre-click image was only needed for the comparison.
        self._remove_file(before_click_path)

        if before_md5 == after_md5:
            logger.warning(f"首次点击 {name} 未跳转,尝试稍微偏移位置重试...")
            # Retry once, 20px below the OCR click point.
            offset_y = point[1] + 20
            d.shell(f"input tap {int(point[0])} {int(offset_y)}")
            await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
            self._remove_file(detail_shot)
            detail_shot = take_screenshot(d, f"detail_{file_tag}_{int(time.time())}")
            after_md5 = get_image_content_md5(
                detail_shot, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
            )
        return before_md5 != after_md5, detail_shot, after_md5

    async def _read_basic_info(self, detail_shot):
        """Best-effort read of ``(address, parking_info)`` from the detail screenshot.

        Either value is None when missing or empty. Recognition errors are
        logged as warnings and do not abort the station.
        """
        address = None
        parking = None
        try:
            basic_info = await self.read_image_kit.analyze_detail_basic_info(detail_shot)
            if isinstance(basic_info, dict):
                address = basic_info.get("address") or None
                parking = basic_info.get("parking_info") or None
        except Exception as e:
            logger.warning(f"详情页基础信息识别失败: {e}")
        return address, parking

    async def _capture_price_pages(self, d, w, h, file_tag, detail_shot,
                                   max_scroll_up=10, max_pages=10):
        """Collect the screenshots that contain the station's price list.

        If a price entry button is detected, it opens the price list,
        scrolls to the top, screenshots page by page until the screen stops
        changing (at most ``max_pages``), then taps near the top of the
        screen to close the list. In that case ``detail_shot`` is deleted.
        Otherwise it returns ``[detail_shot]``.

        Returns:
            List of screenshot paths for the background price analysis.
        """
        dqdf_pos = detect_price_info_container_cv(detail_shot)
        if not dqdf_pos:
            logger.info("未发现价格入口按钮 (阶段性电价/当前电费),直接分析当前页")
            return [detail_shot]

        logger.info(f"发现价格入口按钮 (阶段性电价/当前电费) {dqdf_pos},点击进入...")
        d.click(dqdf_pos[0], dqdf_pos[1])
        await asyncio.sleep(0.5)
        self._remove_file(detail_shot)

        scroll_x = int(w * 0.5)
        scroll_top_y = int(h * 0.5)
        scroll_bottom_y = int(h * 0.8)

        # Fast downward swipes scroll the list up; stop once the screen no
        # longer changes, i.e. the top of the list is reached.
        logger.info("正在向上滚动价格列表到顶部 (快速多次滚动以尽快看到 00:00)...")
        for i in range(max_scroll_up):
            before_scroll_md5 = self._screen_md5(d, f"scroll_up_{i}")
            d.swipe(scroll_x, scroll_top_y, scroll_x, scroll_bottom_y, 0.2)
            await asyncio.sleep(0.3)
            after_scroll_md5 = self._screen_md5(d, f"scroll_up_after_{i}")
            if before_scroll_md5 == after_scroll_md5:
                logger.info(f"价格列表已到达顶部 (滚动次数: {i})")
                break

        # Keep one screenshot per page while scrolling down until the bottom.
        logger.info("正在从顶部开始向下逐页截图...")
        detail_shots = []
        for p_idx in range(1, max_pages + 1):
            p_shot = take_screenshot(d, f"detail_price_{file_tag}_{int(time.time())}_{p_idx}")
            before_dn_md5 = get_image_content_md5(p_shot)
            d.swipe(scroll_x, scroll_bottom_y, scroll_x, scroll_top_y, 0.2)
            await asyncio.sleep(0.3)
            after_dn_md5 = self._screen_md5(d, f"check_dn_{p_idx}")
            detail_shots.append(p_shot)
            if before_dn_md5 == after_dn_md5:
                logger.info(f"价格列表已到达底部 (共抓取页数: {p_idx})")
                break

        # Close the time-of-use price list by tapping blank space near the top.
        logger.info("点击屏幕上部空白处以关闭定价列表...")
        d.click(w * 0.5, h * 0.1)
        await asyncio.sleep(1.0)
        return detail_shots

    async def _return_to_list(self, d, detail_md5):
        """Press back, and press it a second time if still on the detail page.

        The device is considered still on the detail page when the cropped
        hash after one back press equals ``detail_md5``.
        """
        d.press("back")
        await asyncio.sleep(WAIT_BACK_TO_LIST)
        after_back_md5 = self._screen_md5(d, "check_back", cropped=True)
        if after_back_md5 == detail_md5:
            logger.info("似乎还在二级页面,尝试再次返回...")
            d.press("back")
            await asyncio.sleep(WAIT_BACK_TO_LIST)

    @staticmethod
    def _time_sort_key(value):
        """Sort key that orders ``H:MM`` / ``HH:MM`` strings chronologically.

        Also accepts the full-width colon. Unparseable values sort after all
        valid times, by their text.
        """
        text = "" if value is None else str(value).strip().replace("：", ":")
        hours, sep, rest = text.partition(":")
        minutes = rest[:2]
        if sep and hours.isdecimal() and minutes.isdecimal():
            return (0, int(hours) * 60 + int(minutes), text)
        return (1, 0, text)

    @classmethod
    def _merge_price_periods(cls, all_prices):
        """De-duplicate price entries by ``start-end`` and sort them by start time.

        Entries that are not dicts, or whose ``start``/``end`` is missing,
        None or blank, are dropped. The first occurrence of each period wins.
        """
        unique_prices = []
        seen_periods = set()
        for p in all_prices:
            if not isinstance(p, dict):
                continue
            raw_start = p.get('start')
            raw_end = p.get('end')
            start = '' if raw_start is None else str(raw_start).strip()
            end = '' if raw_end is None else str(raw_end).strip()
            if not start or not end:
                continue
            key = f"{start}-{end}"
            if key in seen_periods:
                continue
            seen_periods.add(key)
            unique_prices.append(p)
        unique_prices.sort(key=lambda x: cls._time_sort_key(x.get('start')))
        return unique_prices

    # ------------------------------------------------------------------
    # Main crawl loop
    # ------------------------------------------------------------------

    async def crawl_list_logic(self, d):
        """Crawl the station list on device ``d`` page by page.

        Each iteration does the following:
        - screenshots the list;
        - stops when the cropped screen hash no longer changes (bottom reached);
        - OCRs the stations;
        - opens every station not yet marked in Redis and queues a
          background price analysis for it.

        Crawling also stops after ``max_to_crawl`` stations, or after 5
        consecutive pages that yield no new station. Every background task
        is awaited before returning, including on early exit or error.

        Returns:
            Number of stations whose detail page was opened and queued.
        """
        w, h = d.window_size()
        max_to_crawl = 1 if FIRST_RUN_ONLY_ONE_STATION else MAX_STATIONS_COUNT
        processed_count = 0
        # Consecutive pages (empty OCR or only already-seen stations) without new data.
        no_new_data_count = 0
        last_md5 = None
        background_tasks = []
        try:
            while processed_count < max_to_crawl:
                screenshot_path = take_screenshot(d, f"list_{int(time.time())}.jpg")
                curr_md5 = get_image_content_md5(
                    screenshot_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
                )
                if last_md5 == curr_md5:
                    logger.info("内容无变化,判定已到底部")
                    self._remove_file(screenshot_path)
                    break
                last_md5 = curr_md5
                try:
                    stations_page = await run_ocr_rect(screenshot_path)
                finally:
                    # The list screenshot is only needed for OCR.
                    self._remove_file(screenshot_path)

                if not stations_page:
                    no_new_data_count += 1
                    if no_new_data_count >= 5:
                        logger.info("连续 5 页无新数据,停止")
                        break
                    d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
                    await asyncio.sleep(WAIT_AFTER_SCROLL)
                    continue

                new_stations_in_page = 0
                stop_requested = False
                for st in stations_page:
                    if processed_count >= max_to_crawl:
                        break
                    parsed = self._parse_station(st)
                    if parsed is None:
                        continue
                    name, point, distance, piles = parsed

                    redis_key = f"crawled:ylt:{clean_station_name(name)}"
                    if await self.redis_kit.get_data(redis_key):
                        logger.info(f"场站 {name} 已处理过,跳过")
                        continue

                    current_idx = processed_count + 1
                    remaining = max_to_crawl - current_idx
                    logger.info(f"--- [进度: {current_idx}/{max_to_crawl}, 剩余: {remaining}] 发现新场站: {name} (坐标: {point}, 距离: {distance}) ---")
                    file_tag = get_name_md5(name)

                    entered, detail_shot, detail_md5 = await self._open_station_detail(d, name, point, file_tag)
                    if not entered:
                        logger.warning(f"点击场站 {name} 后页面似乎未跳转,跳过返回操作")
                        self._remove_file(detail_shot)
                        # Mark briefly anyway so the same station is not retried immediately.
                        await self.redis_kit.set_data(redis_key, "1", expire=3600)
                        continue

                    logger.info(f"成功进入详情页: {name}")
                    total_piles, free_piles, piles_detail = self._summarize_piles(piles)
                    address_detail, parking_info = await self._read_basic_info(detail_shot)
                    detail_shots = await self._capture_price_pages(d, w, h, file_tag, detail_shot)

                    # Price recognition is slow, so it runs in the background while
                    # the device moves on to the next station.
                    task = asyncio.create_task(
                        self.analyze_detail_background(
                            name,
                            detail_shots,
                            address=address_detail,
                            distance=distance,
                            total_piles=total_piles,
                            free_piles=free_piles,
                            piles_detail=piles_detail,
                            parking_info=parking_info,
                        )
                    )
                    background_tasks.append(task)
                    processed_count += 1
                    new_stations_in_page += 1
                    await self.redis_kit.set_data(redis_key, "1", expire=86400 * 7)

                    await self._return_to_list(d, detail_md5)

                    if FIRST_RUN_ONLY_ONE_STATION:
                        logger.info("已完成首个场站的全流程采集,根据配置退出驿来特爬取任务。")
                        # Leave both loops but still wait for the queued analysis below.
                        stop_requested = True
                        break

                if stop_requested:
                    break

                if new_stations_in_page == 0:
                    no_new_data_count += 1
                    if no_new_data_count >= 5:
                        logger.info("连续 5 页均无新场站,停止")
                        break
                else:
                    no_new_data_count = 0

                # Scroll to the next page of the list.
                d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
                await asyncio.sleep(WAIT_AFTER_SCROLL)
        finally:
            # Never leave analysis tasks pending; otherwise they may be cancelled
            # when the event loop shuts down and their data is lost.
            if background_tasks:
                logger.info(f"等待 {len(background_tasks)} 个后台分析任务完成...")
                await asyncio.gather(*background_tasks, return_exceptions=True)
        return processed_count

    async def analyze_detail_background(
        self,
        station_name,
        image_paths,
        address=None,
        distance=None,
        total_piles=None,
        free_piles=None,
        piles_detail=None,
        parking_info=None,
    ):
        """Extract prices from one or more detail screenshots and store them.

        Prices from all images are merged, de-duplicated by period and sorted
        by start time. The result is then passed, with the other station
        fields, to ``service.process_price_detail_data``. Errors are logged
        and not raised, because this runs as a fire-and-forget task.

        Screenshots are intentionally kept on disk for now, to help debug
        VLM recognition failures.
        """
        try:
            if isinstance(image_paths, str):
                image_paths = [image_paths]
            logger.info(f"开始后台分析场站: {station_name} (图片数: {len(image_paths)})")
            all_prices = []
            for img_path in image_paths:
                prices = await self.read_image_kit.analyze_detail_price(img_path)
                if prices:
                    all_prices.extend(prices)

            unique_prices = self._merge_price_periods(all_prices)
            if unique_prices:
                await self.service.process_price_detail_data(
                    station_name,
                    unique_prices,
                    address=address,
                    distance=distance,
                    total_piles=total_piles,
                    free_piles=free_piles,
                    piles_detail=piles_detail,
                    parking_info=parking_info,
                )
                logger.info(f"场站 {station_name} 价格分析完成并入库 (记录数: {len(unique_prices)}, 地址: {address}, 距离: {distance})")
            else:
                logger.warning(f"场站 {station_name} 未识别到价格信息")
        except Exception as e:
            logger.error(f"后台分析 {station_name} 失败: {e}")
async def main(service=None):
    """Run one complete YeLiTe crawl task and record it in the task log.

    Args:
        service: Service used for DB access and task logging. When omitted,
            a ``YeLiTeService`` is created here. Only that internally created
            instance is closed on exit; a caller-supplied service stays open.

    Crawl errors are caught, logged and recorded through
    ``service.log_task_end`` with status ``"failed"``; they are not re-raised.
    """
    owns_service = service is None
    if owns_service:
        service = YeLiTeService()
    await service.init_db()
    try:
        crawler = YeLiTeCrawler(service)
        d = u2.connect()
        # Clear the Redis station markers (acts only when TEST_CLEAR_REDIS is set).
        await crawler.clean_redis_data()

        # Record the task start in the task log.
        task_id = str(uuid.uuid4())
        await service.log_task_start(task_id)
        total_count = 0
        status = "success"
        error_msg = None
        try:
            total_count = await crawler.crawl_list_logic(d)
        except Exception as e:
            logger.error(f"主流程执行失败: {e}")
            status = "failed"
            error_msg = str(e)
        finally:
            await service.log_task_end(task_id, total_count, status, error_msg)
    finally:
        # Close only the DB connection opened here; a caller-supplied service
        # (e.g. from YeLiTeCrawler.start) remains the caller's responsibility.
        if owns_service:
            await service.close_db()
async def get_image_md5_async(path):
    """Awaitable wrapper returning ``get_image_content_md5(path)``.

    The hash is requested without crop ratios, the same way the price-list
    scroll checks call it.
    NOTE(review): the hash is computed synchronously on the event loop thread.
    """
    return get_image_content_md5(path)
if __name__ == "__main__":
    # Script entry point: run one crawl task with an internally created service.
    asyncio.run(main(service=None))