430 lines
18 KiB
Python
430 lines
18 KiB
Python
import asyncio
|
||
import uuid
|
||
import os
|
||
import sys
|
||
import json
|
||
import time
|
||
from datetime import datetime
|
||
from Apps.YeLiTe.Kit import take_screenshot, clean_station_name, get_image_content_md5, detect_price_info_container_cv, setup_logger, get_name_md5
|
||
from Apps.YeLiTe.ReadImageKit import ReadImageKit
|
||
from Apps.YeLiTe.FirstPageKit import run_ocr_rect
|
||
from Apps.YeLiTe.Service import YeLiTeService
|
||
from Apps.YeLiTe.Config.Setting import (
|
||
SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL, MAX_STATIONS_COUNT,
|
||
WAIT_DETAIL_PAGE_LOAD, WAIT_BACK_TO_LIST, TEST_CLEAR_REDIS,
|
||
SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO,
|
||
FIRST_RUN_ONLY_ONE_STATION
|
||
)
|
||
from Util.RedisKit import RedisKit
|
||
from Core.BaseCrawler import BaseCrawler
|
||
import uiautomator2 as u2
|
||
|
||
# Project-root handling: make the directory three levels above this file
# importable.
# NOTE(review): this runs after the project imports at the top of the file, so
# it cannot help those imports resolve; it only affects imports executed later
# (e.g. the function-level `from Apps.YeLiTe import Opener`).
project_root = os.path.abspath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, os.pardir)
)
if project_root not in sys.path:
    sys.path.append(project_root)

logger = setup_logger("YeLiTeCrawler")
|
||
|
||
class YeLiTeCrawler(BaseCrawler):
    """Crawler for the YeLiTe (驿来特) mini program.

    Drives an Android device through uiautomator2. It screenshots the station
    list and OCRs the visible stations. For each station without a Redis
    "crawled" marker, it taps into the station and captures the detail and
    price screens. It then queues a background task that recognises the
    prices and stores them through ``self.service``.
    """

    def __init__(self, service=None):
        # Fall back to a fresh YeLiTeService when no service is injected.
        super().__init__(service or YeLiTeService())
        self.redis_kit = RedisKit()
        self.read_image_kit = ReadImageKit()

    async def start(self):
        """BaseCrawler entry point: run the module-level ``main`` with this crawler's service."""
        await main(self.service)

    async def open_app(self):
        """Open the YeLiTe mini program and return the opener's result."""
        from Apps.YeLiTe import Opener

        return await Opener.open_mini_program()

    async def crawl_list(self):
        """Connect to the default uiautomator2 device and crawl the station list.

        Returns:
            Whatever ``crawl_list_logic`` returns (number of stations processed).
        """
        d = u2.connect()
        return await self.crawl_list_logic(d)

    async def crawl_detail(self, station_info):
        """Required by BaseCrawler; a no-op here.

        Detail pages are visited inline by ``crawl_list_logic``.
        """

    async def clean_redis_data(self):
        """Delete every ``crawled:ylt:*`` marker when TEST_CLEAR_REDIS is enabled."""
        if not TEST_CLEAR_REDIS:
            return
        logger.info("清理 Redis 中的场站处理记录...")
        keys = await self.redis_kit.keys("crawled:ylt:*")
        if keys:
            await self.redis_kit.delete(*keys)

    async def crawl_list_logic(self, d):
        """Crawl the station list on device ``d`` and analyse every new station.

        Pages through the list and opens each station whose Redis marker is
        absent. For each one it captures the detail/price screenshots and
        schedules ``analyze_detail_background`` for them.

        All scheduled analysis tasks are awaited before this method returns,
        and also before it re-raises an ordinary exception. This covers the
        FIRST_RUN_ONLY_ONE_STATION stop and errors halfway through the crawl,
        so stations already captured still get stored. If this coroutine is
        cancelled, the pending analysis tasks are cancelled too instead of
        being waited for.

        Args:
            d: A connected uiautomator2 device.

        Returns:
            The number of stations whose detail page was opened and queued
            for analysis.
        """
        w, h = d.window_size()
        max_to_crawl = 1 if FIRST_RUN_ONLY_ONE_STATION else MAX_STATIONS_COUNT
        background_tasks = []
        try:
            return await self._crawl_pages(d, w, h, max_to_crawl, background_tasks)
        except asyncio.CancelledError:
            for task in background_tasks:
                task.cancel()
            raise
        finally:
            if background_tasks:
                logger.info(f"等待 {len(background_tasks)} 个后台分析任务完成...")
                await asyncio.gather(*background_tasks, return_exceptions=True)

    async def _crawl_pages(self, d, w, h, max_to_crawl, background_tasks):
        """Run the list-page loop until the quota, the list end or a stop rule is hit.

        New analysis tasks are appended to ``background_tasks``.
        Returns the number of stations opened and queued.
        """
        processed_count = 0
        no_new_data_count = 0
        last_md5 = None

        while processed_count < max_to_crawl:
            screenshot_path = take_screenshot(d, f"list_{int(time.time())}.jpg")
            # Hashed with the configured top/bottom exclusion ratios; an
            # identical hash after a scroll means the list stopped moving.
            curr_md5 = get_image_content_md5(
                screenshot_path,
                top_ratio=SAFE_EXCLUDE_RATIO,
                bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO,
            )
            if last_md5 == curr_md5:
                logger.info("内容无变化,判定已到底部")
                self._remove_file(screenshot_path)
                break
            last_md5 = curr_md5

            try:
                stations_page = await run_ocr_rect(screenshot_path)
            finally:
                # The list screenshot is only needed for OCR, so delete it on
                # every path (not only when the end of the list is reached).
                self._remove_file(screenshot_path)

            if not stations_page:
                no_new_data_count += 1
                if no_new_data_count >= 5:
                    logger.info("连续 5 页无新数据,停止")
                    break
                d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
                await asyncio.sleep(WAIT_AFTER_SCROLL)
                continue

            # NOTE(review): this reset runs for every page with OCR results, so
            # the "5 pages without new stations" stop below can never reach 5
            # by itself; the list end is detected by the unchanged-hash check.
            # Left unchanged: counting already-crawled pages would stop re-runs
            # (markers last 7 days) before they reach stations further down.
            no_new_data_count = 0
            new_stations_in_page = 0
            stop_after_first = False

            for st in stations_page:
                if processed_count >= max_to_crawl:
                    break
                entered = await self._process_station(
                    d, st, w, h, processed_count + 1, max_to_crawl, background_tasks
                )
                if not entered:
                    continue
                processed_count += 1
                new_stations_in_page += 1
                if FIRST_RUN_ONLY_ONE_STATION:
                    logger.info("已完成首个场站的全流程采集,根据配置退出驿来特爬取任务。")
                    stop_after_first = True
                    break

            if stop_after_first:
                # Stop without scrolling further. The caller still awaits the
                # queued analysis task, so the captured station is stored.
                break

            if new_stations_in_page == 0:
                no_new_data_count += 1
                if no_new_data_count >= 5:
                    logger.info("连续 5 页均无新场站,停止")
                    break
            else:
                no_new_data_count = 0

            # Scroll to the next page of the list.
            d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
            await asyncio.sleep(WAIT_AFTER_SCROLL)

        return processed_count

    async def _process_station(self, d, st, w, h, current_idx, max_to_crawl, background_tasks):
        """Open one OCR'd station, capture its screens and queue its analysis.

        Args:
            d: uiautomator2 device.
            st: One OCR result dict. The keys read are station_name/name, rect,
                click_point/click, distance_text/distance and busy_list.
            w: Device window width.
            h: Device window height.
            current_idx: 1-based index this station would get, for progress logs.
            max_to_crawl: Station quota, for progress logs.
            background_tasks: List the new analysis task is appended to.

        Returns:
            True when the detail page was opened and analysis was queued.
            False when the entry was invalid, already crawled, or the tap did
            not navigate.
        """
        name = st.get("station_name") or st.get("name")
        rect = st.get("rect") or []
        click_point = st.get("click_point") or st.get("click") or []
        distance = st.get("distance_text") or st.get("distance")
        piles = st.get("busy_list") or []

        if not name or not isinstance(rect, (list, tuple)) or len(rect) < 4:
            return False
        if not isinstance(click_point, (list, tuple)) or len(click_point) < 2:
            return False

        point = [int(click_point[0]), int(click_point[1])]

        redis_key = f"crawled:ylt:{clean_station_name(name)}"
        if await self.redis_kit.get_data(redis_key):
            logger.info(f"场站 {name} 已处理过,跳过")
            return False

        remaining = max_to_crawl - current_idx
        logger.info(f"--- [进度: {current_idx}/{max_to_crawl}, 剩余: {remaining}] 发现新场站: {name} (坐标: {point}, 距离: {distance}) ---")

        file_tag = get_name_md5(name)
        detail_shot, detail_md5, entered = await self._open_station_detail(d, name, point, file_tag)

        if not entered:
            logger.warning(f"点击场站 {name} 后页面似乎未跳转,跳过返回操作")
            self._remove_file(detail_shot)
            # Mark it anyway (short TTL) so the same station is not retried immediately.
            await self.redis_kit.set_data(redis_key, "1", expire=3600)
            return False

        logger.info(f"成功进入详情页: {name}")

        total_piles, free_piles, piles_detail = self._summarize_piles(piles)
        address_detail, parking_info = await self._read_basic_info(detail_shot)
        detail_shots = await self._capture_price_pages(d, w, h, detail_shot, file_tag)

        # Price recognition is slow, so it runs in the background while the
        # crawler moves on; the task reference is kept until it is awaited.
        task = asyncio.create_task(
            self.analyze_detail_background(
                name,
                detail_shots,
                address=address_detail,
                distance=distance,
                total_piles=total_piles,
                free_piles=free_piles,
                piles_detail=piles_detail,
                parking_info=parking_info,
            )
        )
        background_tasks.append(task)

        await self.redis_kit.set_data(redis_key, "1", expire=86400 * 7)

        await self._return_to_list(d, detail_md5)
        return True

    async def _open_station_detail(self, d, name, point, file_tag):
        """Tap a station entry and detect whether the screen changed.

        If the first tap leaves the screen hash unchanged, it retries once
        20px lower.

        Returns:
            ``(detail_shot, detail_md5, entered)``. ``detail_shot`` is the
            path of the latest post-tap screenshot (kept on disk) and
            ``detail_md5`` is its hash. ``entered`` is True when that hash
            differs from the pre-tap one.
        """
        before_click_path = take_screenshot(d, f"before_{file_tag}")
        before_md5 = get_image_content_md5(
            before_click_path,
            top_ratio=SAFE_EXCLUDE_RATIO,
            bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO,
        )

        await asyncio.sleep(0.5)
        d.shell(f"input tap {point[0]} {point[1]}")
        await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)

        detail_shot = take_screenshot(d, f"detail_{file_tag}_{int(time.time())}")
        after_md5 = get_image_content_md5(
            detail_shot,
            top_ratio=SAFE_EXCLUDE_RATIO,
            bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO,
        )
        # The pre-tap screenshot was only needed for the comparison.
        self._remove_file(before_click_path)

        if before_md5 == after_md5:
            logger.warning(f"首次点击 {name} 未跳转,尝试稍微偏移位置重试...")
            # Retry once, 20px below the OCR click point.
            d.shell(f"input tap {point[0]} {point[1] + 20}")
            await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)

            self._remove_file(detail_shot)
            detail_shot = take_screenshot(d, f"detail_{file_tag}_{int(time.time())}")
            after_md5 = get_image_content_md5(
                detail_shot,
                top_ratio=SAFE_EXCLUDE_RATIO,
                bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO,
            )

        return detail_shot, after_md5, before_md5 != after_md5

    async def _read_basic_info(self, detail_shot):
        """Recognise address and parking info on the detail screenshot.

        Returns:
            ``(address, parking_info)``. Each is None when missing or empty,
            or when recognition raises; the error is logged, not propagated.
        """
        address_detail = None
        parking_info = None
        try:
            basic_info = await self.read_image_kit.analyze_detail_basic_info(detail_shot)
            if isinstance(basic_info, dict):
                address_detail = basic_info.get("address") or None
                parking_info = basic_info.get("parking_info") or None
        except Exception as e:
            # Best effort: prices are still worth capturing without these fields.
            logger.warning(f"详情页基础信息识别失败: {e}")
        return address_detail, parking_info

    async def _capture_price_pages(self, d, w, h, detail_shot, file_tag):
        """Open the time-of-use price list (if found) and screenshot it page by page.

        The price-entry button is located with
        ``detect_price_info_container_cv`` on ``detail_shot``. When it is
        found, ``detail_shot`` is deleted. The list is first scrolled to the
        top, then captured downwards (at most 10 pages each way), and finally
        closed by tapping near the top of the screen.

        Returns:
            Screenshot paths to analyse: the price pages, or ``[detail_shot]``
            when no price entry was detected.
        """
        dqdf_pos = detect_price_info_container_cv(detail_shot)
        if not dqdf_pos:
            logger.info("未发现价格入口按钮 (阶段性电价/当前电费),直接分析当前页")
            return [detail_shot]

        logger.info(f"发现价格入口按钮 (阶段性电价/当前电费) {dqdf_pos},点击进入...")
        d.click(dqdf_pos[0], dqdf_pos[1])
        await asyncio.sleep(0.5)
        self._remove_file(detail_shot)

        scroll_x = int(w * 0.5)
        scroll_top_y = int(h * 0.5)
        scroll_bottom_y = int(h * 0.8)

        # Swipe downwards (content moves up the list) until the hash stops changing.
        logger.info("正在向上滚动价格列表到顶部 (快速多次滚动以尽快看到 00:00)...")
        max_scroll_up_to_top = 10
        for i in range(max_scroll_up_to_top):
            before_scroll_path = take_screenshot(d, f"scroll_up_{i}")
            before_scroll_md5 = get_image_content_md5(before_scroll_path)

            d.swipe(scroll_x, scroll_top_y, scroll_x, scroll_bottom_y, 0.2)
            await asyncio.sleep(0.3)

            after_scroll_path = take_screenshot(d, f"scroll_up_after_{i}")
            after_scroll_md5 = get_image_content_md5(after_scroll_path)

            self._remove_file(before_scroll_path)
            self._remove_file(after_scroll_path)

            if before_scroll_md5 == after_scroll_md5:
                logger.info(f"价格列表已到达顶部 (滚动次数: {i})")
                break

        # Capture one screenshot per page while swiping upwards; the captured
        # pages are kept for background analysis.
        logger.info("正在从顶部开始向下逐页截图...")
        detail_shots = []
        max_scroll_down_pages = 10
        for p_idx in range(1, max_scroll_down_pages + 1):
            p_shot = take_screenshot(d, f"detail_price_{file_tag}_{int(time.time())}_{p_idx}")
            before_dn_md5 = get_image_content_md5(p_shot)

            d.swipe(scroll_x, scroll_bottom_y, scroll_x, scroll_top_y, 0.2)
            await asyncio.sleep(0.3)

            check_dn_shot = take_screenshot(d, f"check_dn_{p_idx}")
            after_dn_md5 = get_image_content_md5(check_dn_shot)
            self._remove_file(check_dn_shot)

            detail_shots.append(p_shot)

            if before_dn_md5 == after_dn_md5:
                logger.info(f"价格列表已到达底部 (共抓取页数: {p_idx})")
                break

        # Close the price list by tapping blank space near the top of the screen.
        logger.info("点击屏幕上部空白处以关闭定价列表...")
        d.click(w * 0.5, h * 0.1)
        await asyncio.sleep(1.0)
        return detail_shots

    async def _return_to_list(self, d, detail_md5):
        """Press back; if the screen still hashes like the detail page, press back once more."""
        d.press("back")
        await asyncio.sleep(WAIT_BACK_TO_LIST)

        after_back_shot = take_screenshot(d, "check_back")
        after_back_md5 = get_image_content_md5(
            after_back_shot,
            top_ratio=SAFE_EXCLUDE_RATIO,
            bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO,
        )
        self._remove_file(after_back_shot)

        if after_back_md5 == detail_md5:
            logger.info("似乎还在二级页面,尝试再次返回...")
            d.press("back")
            await asyncio.sleep(WAIT_BACK_TO_LIST)

    @staticmethod
    def _summarize_piles(piles):
        """Sum the ``total`` / ``idle`` counts of the OCR'd pile groups.

        Entries that are not dicts, and counts that ``int()`` cannot convert,
        are ignored.

        Returns:
            ``(total_piles, free_piles, piles_detail)``. All three are None
            unless ``piles`` is a list whose summed total is positive.
        """
        if not isinstance(piles, list):
            return None, None, None
        total_sum = 0
        free_sum = 0
        for group in piles:
            if not isinstance(group, dict):
                continue
            total_sum += YeLiTeCrawler._int_or_zero(group.get("total"))
            free_sum += YeLiTeCrawler._int_or_zero(group.get("idle"))
        if total_sum > 0:
            return total_sum, free_sum, piles
        return None, None, None

    @staticmethod
    def _int_or_zero(value):
        """Return ``int(value)``, or 0 when ``value`` is None or not convertible."""
        if value is None:
            return 0
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _remove_file(path):
        """Delete ``path`` if it is a non-empty path that exists."""
        if path and os.path.exists(path):
            os.remove(path)

    async def analyze_detail_background(
        self,
        station_name,
        image_paths,
        address=None,
        distance=None,
        total_piles=None,
        free_piles=None,
        piles_detail=None,
        parking_info=None,
    ):
        """Recognise prices on a station's screenshots and persist them.

        Every image in ``image_paths`` (a single path or a list of paths) is
        run through ``read_image_kit.analyze_detail_price``. The combined
        periods are de-duplicated and sorted, then passed together with the
        station metadata to ``service.process_price_detail_data``.

        Failures are logged and swallowed, because this runs as a background
        task.
        """
        try:
            if isinstance(image_paths, str):
                image_paths = [image_paths]

            logger.info(f"开始后台分析场站: {station_name} (图片数: {len(image_paths)})")

            all_prices = []
            for img_path in image_paths:
                prices = await self.read_image_kit.analyze_detail_price(img_path)
                if prices:
                    all_prices.extend(prices)

            unique_prices = self._dedupe_price_periods(all_prices)

            if unique_prices:
                await self.service.process_price_detail_data(
                    station_name,
                    unique_prices,
                    address=address,
                    distance=distance,
                    total_piles=total_piles,
                    free_piles=free_piles,
                    piles_detail=piles_detail,
                    parking_info=parking_info,
                )
                logger.info(f"场站 {station_name} 价格分析完成并入库 (记录数: {len(unique_prices)}, 地址: {address}, 距离: {distance})")
            else:
                logger.warning(f"场站 {station_name} 未识别到价格信息")
        except Exception as e:
            logger.error(f"后台分析 {station_name} 失败: {e}")
        # Screenshots are intentionally NOT deleted here while debugging VLM
        # recognition failures; delete image_paths once that is no longer needed.

    @classmethod
    def _dedupe_price_periods(cls, prices):
        """Keep the first entry per (start, end) period, sorted by start time.

        Entries that are not dicts, or whose start/end is missing, None or
        blank, are dropped. The original dicts are returned unchanged.
        """
        keyed = []
        seen_periods = set()
        for entry in prices:
            if not isinstance(entry, dict):
                continue
            start = cls._clean_time_text(entry.get("start"))
            end = cls._clean_time_text(entry.get("end"))
            if not start or not end:
                continue
            period = f"{start}-{end}"
            if period in seen_periods:
                continue
            seen_periods.add(period)
            keyed.append((cls._start_sort_key(start), entry))
        # Sort on the key only (stable), so the dicts are never compared.
        keyed.sort(key=lambda item: item[0])
        return [entry for _, entry in keyed]

    @staticmethod
    def _clean_time_text(value):
        """Return ``value`` as a stripped string, treating None as empty."""
        return "" if value is None else str(value).strip()

    @staticmethod
    def _start_sort_key(start):
        """Sort key placing 'H:MM' / 'HH:MM' start times in chronological order.

        Values that do not parse as hours:minutes sort after every parsed
        time, in plain string order.
        """
        hours, sep, rest = start.partition(":")
        minutes = rest[:2]
        if sep and hours.isdigit() and minutes.isdigit():
            return (0, int(hours), int(minutes), start)
        return (1, 0, 0, start)
|
||
|
||
async def main(service=None):
    """Run one complete YeLiTe crawl and record it as a task.

    Initialises the service's DB and connects to the default uiautomator2
    device. It then clears Redis markers when TEST_CLEAR_REDIS is set and
    runs ``crawl_list_logic``. Start and end are logged via
    ``service.log_task_start`` and ``service.log_task_end``. Crawl errors are
    logged and recorded as a "failed" task rather than propagated.

    Args:
        service: Optional service to use. When omitted, a YeLiTeService is
            created here, and closed here once the run ends. A caller-supplied
            service is left open for the caller to manage.
    """
    owns_service = service is None
    if owns_service:
        service = YeLiTeService()

    await service.init_db()
    try:
        crawler = YeLiTeCrawler(service)
        d = u2.connect()

        # Clear Redis dedup markers (only acts when TEST_CLEAR_REDIS is set).
        await crawler.clean_redis_data()

        # Record the task run.
        task_id = str(uuid.uuid4())
        await service.log_task_start(task_id)

        total_count = 0
        status = "success"
        error_msg = None
        try:
            total_count = await crawler.crawl_list_logic(d)
        except Exception as e:
            logger.error(f"主流程执行失败: {e}")
            status = "failed"
            error_msg = str(e)
        finally:
            await service.log_task_end(task_id, total_count, status, error_msg)
    finally:
        # Close only the service created by this function. The previous
        # `not isinstance(service, YeLiTeService)` test closed foreign
        # caller-supplied services and never the internally created one.
        if owns_service:
            # NOTE(review): looked up defensively; confirm YeLiTeService implements close_db.
            close_db = getattr(service, "close_db", None)
            if close_db is not None:
                await close_db()
|
||
|
||
async def get_image_md5_async(path):
    """Return the content MD5 of the image at ``path`` (coroutine wrapper).

    NOTE(review): the hashing itself runs synchronously on the event-loop thread.
    """
    # `get_image_md5` is neither defined nor imported in this module, so the
    # old call raised NameError. Use the imported content-MD5 helper with only
    # the path, as the price-list scroll checks in this module do.
    return get_image_content_md5(path)
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run one crawl with a service created inside main().
    asyncio.run(main(service=None))
|