512 lines
23 KiB
Python
512 lines
23 KiB
Python
# coding=utf-8
|
||
import asyncio
|
||
import logging
|
||
import uuid
|
||
import os
|
||
import sys
|
||
import time
|
||
from datetime import datetime
|
||
from PIL import Image
|
||
|
||
# Make the repository root (three directory levels above this file) importable,
# so the absolute `Apps.*`, `Util.*` and `Config.*` imports below resolve.
project_root = os.path.abspath(__file__)
for _ in range(3):
    project_root = os.path.dirname(project_root)
if project_root not in sys.path:
    sys.path.append(project_root)
import uiautomator2 as u2
|
||
from Apps.AiTeJiYiChong import Kit
|
||
from Apps.AiTeJiYiChong.Kit import take_screenshot, setup_logger, get_name_md5
|
||
from Apps.AiTeJiYiChong.ReadImageKit import ReadImageKit
|
||
from Apps.AiTeJiYiChong.FirstPageKit import run_ocr_rect
|
||
from Util.RedisKit import RedisKit
|
||
from Apps.AiTeJiYiChong.Service import AiTeJiYiChongService
|
||
from Config.Config import TEMP_IMAGE_DIR
|
||
from Apps.AiTeJiYiChong.Config.Setting import (
|
||
SCROLL_DISTANCE_RATIO,
|
||
MAX_STATIONS_COUNT, REDIS_STATION_EXPIRE,
|
||
WAIT_DETAIL_PAGE_LOAD,
|
||
WAIT_AFTER_SCROLL,
|
||
SAFE_EXCLUDE_RATIO,
|
||
BOTTOM_SAFE_EXCLUDE_RATIO,
|
||
TEST_CLEAR_REDIS,
|
||
FIRST_RUN_ONLY_ONE_STATION
|
||
)
|
||
|
||
# Module-wide logger for this crawler, created via Kit.setup_logger.
logger = setup_logger(
    "AiTeJiYiChongCrawler",
)
async def clean_redis_data(redis_kit):
    """[Testing] Delete every ``crawled:aite:*`` de-duplication record from Redis.

    This is a no-op unless ``TEST_CLEAR_REDIS`` is enabled in the settings.

    Args:
        redis_kit: RedisKit instance providing async ``keys`` and ``delete``.
    """
    if TEST_CLEAR_REDIS:
        logger.warning("!!! [Testing] 正在清除 Redis 中的艾特吉易充已爬取记录...")
        matched_keys = await redis_kit.keys("crawled:aite:*")
        if not matched_keys:
            logger.info("Redis 中没有找到相关记录。")
            return
        for redis_key in matched_keys:
            await redis_kit.delete(redis_key)
        logger.info(f"已清除 {len(matched_keys)} 条记录。")
async def is_already_crawled(redis_kit, station_name):
    """Return True when ``station_name`` already has a crawl record in Redis.

    De-duplication is by cleaned station name only. A name that cleans to an
    empty value is never reported as crawled.

    Args:
        redis_kit: RedisKit instance providing async ``get_data``.
        station_name: raw station name as recognised from the list page.
    """
    normalized_name = Kit.clean_station_name(station_name)
    if normalized_name:
        record = await redis_kit.get_data(f"crawled:aite:{normalized_name}")
        return bool(record)
    return False
def _safe_remove(path):
    """Delete a temporary file if it exists; failures are logged, never raised."""
    if not path:
        return
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError as e:
        logger.warning(f"删除临时文件失败: {path}, {e}")


def _to_int(value):
    """Best-effort ``int()``: ``None`` or an unconvertible value becomes 0."""
    if value is None:
        return 0
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0


def _parse_piles(busy_list):
    """Turn a station card's OCR ``busy_list`` into pile-status dicts.

    Each usable entry is a dict with ``mode`` (or ``type``), ``idle`` and
    ``total``. Non-dict entries are skipped, and unparsable counts become 0.

    Returns:
        A list of ``{"type", "total", "free"}`` dicts (possibly empty).
    """
    piles = []
    if not isinstance(busy_list, list):
        return piles
    for item in busy_list:
        if not isinstance(item, dict):
            continue
        piles.append(
            {
                "type": item.get("mode") or item.get("type") or "未知",
                "total": _to_int(item.get("total")),
                "free": _to_int(item.get("idle")),
            }
        )
    return piles


def _has_valid_rect(station):
    """Return True when an OCR entry is a dict whose ``rect`` has at least 4 items."""
    if not isinstance(station, dict):
        return False
    rect = station.get("rect") or []
    return isinstance(rect, (list, tuple)) and len(rect) >= 4


def _extract_match_point(match_res):
    """Normalise a ``d.image.match`` result into an ``(x, y)`` point, or ``None``.

    uiautomator2 versions return different shapes: an object with ``.point``,
    a dict with ``point`` or ``x``/``y`` keys, or a plain sequence.
    """
    if not match_res:
        return None
    point = getattr(match_res, "point", None)
    if point:
        return point
    if isinstance(match_res, dict):
        if match_res.get("point"):
            return match_res["point"]
        if "x" in match_res and "y" in match_res:
            return (match_res["x"], match_res["y"])
        return None
    if isinstance(match_res, (list, tuple)) and len(match_res) >= 2:
        return (match_res[0], match_res[1])
    return None


async def _analyze_detail_background(service, image_path, station_name):
    """Analyse one detail-page screenshot in the background; errors are logged, not raised."""
    try:
        await service.process_station_detail(image_path, station_name=station_name)
    except Exception as ex:
        logger.error(f"详情页分析异常 ({station_name}): {ex}")


async def _capture_price_pages(d, station_name, max_scroll_up=10, max_pages=10):
    """Scroll the price list to its top, then screenshot it page by page downwards.

    Returns:
        The saved screenshot paths, top page first. The caller owns these files.
        If capturing fails part-way, the files saved so far are removed and the
        error is re-raised.
    """
    logger.info("正在向上滚动价格列表到顶部 (快速多次滚动以尽快看到 00:00)...")
    for i in range(max_scroll_up):
        before_path = take_screenshot(d, f"aite_price_up_{i}", save_dir=TEMP_IMAGE_DIR)
        before_md5 = Kit.get_image_content_md5(before_path)
        d.swipe_ext("down", scale=0.85)
        await asyncio.sleep(0.3)
        after_path = take_screenshot(d, f"aite_price_up_after_{i}", save_dir=TEMP_IMAGE_DIR)
        after_md5 = Kit.get_image_content_md5(after_path)
        _safe_remove(before_path)
        _safe_remove(after_path)
        # An unchanged screen after a swipe means we cannot scroll any further.
        if before_md5 == after_md5:
            logger.info(f"价格列表已到达顶部 (滚动次数: {i})")
            break

    logger.info("正在从顶部开始向下逐页截图...")
    name_md5 = get_name_md5(station_name)
    screenshots = []
    try:
        for page_idx in range(1, max_pages + 1):
            page_path = take_screenshot(d, f"price_scroll_{page_idx}_{name_md5}", save_dir=TEMP_IMAGE_DIR)
            screenshots.append(page_path)
            before_md5 = Kit.get_image_content_md5(page_path)
            d.swipe_ext("up", scale=0.8)
            await asyncio.sleep(0.3)
            check_path = take_screenshot(d, f"check_dn_{page_idx}", save_dir=TEMP_IMAGE_DIR)
            after_md5 = Kit.get_image_content_md5(check_path)
            _safe_remove(check_path)
            if before_md5 == after_md5:
                logger.info(f"价格列表已到达底部 (共抓取页数: {page_idx})")
                break
    except Exception:
        for path in screenshots:
            _safe_remove(path)
        raise
    return screenshots


async def _crawl_price_page(d, service, station_name, detail_path, detail_uuid,
                            device_info, bottom_safe_y, list_md5, background_tasks):
    """Open '分时价格' from a station detail page, capture it, and press back once.

    The button is located by template matching (``Template/timePrice.jpg``)
    first, falling back to VL recognition on ``detail_path``. Taps at or below
    ``bottom_safe_y`` are refused, to avoid hitting the bottom scan/navigation
    buttons. Captured pages are handed to ``analyze_prices_background`` as a
    task appended to ``background_tasks``. Errors are logged, not raised.

    Returns:
        True if the caller must still press back once to reach the list page.
        False if the list page (content fingerprint ``list_md5``) is already
        showing.
    """
    template_time_price = os.path.join(os.path.dirname(__file__), "Template", "timePrice.jpg")
    should_back_to_list = True
    try:
        # Fingerprint of the detail page, used to check whether taps navigate.
        before_click_md5 = Kit.get_image_content_md5(detail_path)

        click_pos = None
        if os.path.exists(template_time_price):
            try:
                click_pos = _extract_match_point(d.image.match(template_time_price))
            except Exception as e:
                # A template-matching failure must not prevent the VL fallback.
                logger.warning(f"模板匹配异常,降级使用 VL 识别: {e}")
            if click_pos:
                logger.info(f"通过 timePrice.jpg 成功找到!坐标: {click_pos}")

        if not click_pos:
            logger.info("模板匹配未找到或解析失败,降级使用 VL 识别...")
            res = await ReadImageKit.find_time_price_button_coordinate(detail_path, device_info=device_info)
            if res and res.get("uia_center_x"):
                click_pos = (res.get("uia_center_x"), res.get("uia_center_y"))
                logger.info(f"VL 识别成功找到 '分时价格' 按钮: {click_pos}")

        if not click_pos:
            logger.warning(f"未找到'分时价格'按钮,跳过价格页: {station_name}")
            return should_back_to_list

        click_x, click_y = int(click_pos[0]), int(click_pos[1])
        if click_y >= bottom_safe_y:
            logger.warning(f"'分时价格'按钮识别坐标 ({click_x}, {click_y}) 落在底部安全区域 (>= {bottom_safe_y}),为避免误触底部扫码/导航按钮,本次不点击。")
            return should_back_to_list

        d.click(click_x, click_y)
        await asyncio.sleep(0.5)

        # Verify that the tap actually left the detail page.
        after_click_path = take_screenshot(d, f"check_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
        try:
            after_click_md5 = Kit.get_image_content_md5(after_click_path)
        finally:
            _safe_remove(after_click_path)
        if after_click_md5 == before_click_md5:
            logger.warning(f"点击'分时价格'后页面未变化,未进入价格页: {station_name}")
            return should_back_to_list

        price_screenshots = await _capture_price_pages(d, station_name)
        logger.info(f"已启动后台分析价格页: {station_name},共 {len(price_screenshots)} 张图")
        background_tasks.append(
            asyncio.create_task(analyze_prices_background(service, station_name, price_screenshots))
        )

        # Leave the price page right away; the analysis continues in the background.
        d.press("back")
        await asyncio.sleep(0.5)

        # Work out where that back press landed (list page or detail page).
        check_back_path = take_screenshot(d, f"check_back_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
        try:
            check_list_md5 = Kit.get_image_content_md5(
                check_back_path,
                top_ratio=SAFE_EXCLUDE_RATIO,
                bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO,
            )
            check_detail_md5 = Kit.get_image_content_md5(check_back_path)
        finally:
            _safe_remove(check_back_path)

        if check_list_md5 == list_md5:
            logger.info("检测到已直接返回列表页,跳过后续返回操作。")
            should_back_to_list = False
        elif check_detail_md5 == before_click_md5:
            logger.info("检测到返回详情页,需执行第二次返回。")
    except Exception as e:
        logger.error(f"处理三级页面详情异常 ({station_name}): {e}")
    return should_back_to_list


async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
    """Crawl the station list page by page and process every new station.

    For each page, the function:
      1. Screenshots the list.
      2. Stops if the content (top and bottom bands excluded) is unchanged
         since the previous page.
      3. Recognises the station cards with OCR+LLM.
      4. For each card not yet recorded in Redis:
         - records the list-page pile status;
         - opens the detail page and starts background detail analysis;
         - captures the '分时价格' pages;
         - navigates back and marks the station as crawled in Redis.
      5. Swipes to the next page.

    The crawl stops when any of these holds:
      - ``max_stations_count`` new stations have been processed;
      - the number of stations encountered reaches ``max_stations_count``;
      - the list stops changing;
      - 3 consecutive pages yield no new station;
      - the first station is done and ``FIRST_RUN_ONLY_ONE_STATION`` is set.

    Pending background analysis tasks are always awaited before leaving, even
    if the crawl loop raises.

    Args:
        d: connected uiautomator2 device.
        service: persistence service (``AiTeJiYiChongService`` when called
            from ``main``).
        max_stations_count: target number of newly processed stations.

    Returns:
        The number of newly processed stations.
    """
    redis_kit = RedisKit()
    window_size = d.window_size()
    w, h = window_size[0], window_size[1]

    device_info = d.info
    device_info['width'] = w
    device_info['height'] = h

    logger.info(f"开始爬取列表,设备: {device_info.get('productName')} | 分辨率: {w}x{h} | 目标数量: {max_stations_count}")

    # Cards reaching below this line (bottom ads / navigation area) are left
    # for the next scroll; the same line also guards the price-button tap.
    ad_top_y = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO))
    max_empty_pages = 3

    background_tasks = []
    last_list_md5 = None
    no_new_data_count = 0
    total_encountered_count = 0
    total_new_processed_count = 0
    scroll_count = 0

    try:
        while total_new_processed_count < max_stations_count:
            scroll_count += 1
            logger.info(f"正在处理第 {scroll_count} 次滚动 (已遇到: {total_encountered_count}, 新采集: {total_new_processed_count}/{max_stations_count})...")

            # 1. Screenshot the list and detect the end of the list.
            image_uuid = str(uuid.uuid4())
            screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR)
            current_md5 = Kit.get_image_content_md5(
                screenshot_path,
                top_ratio=SAFE_EXCLUDE_RATIO,
                bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO,
            )
            if last_list_md5 and current_md5 == last_list_md5:
                logger.info("检测到列表核心内容无变化,已到达底部或加载完成,提前结束。")
                _safe_remove(screenshot_path)
                break
            last_list_md5 = current_md5
            logger.info(f"列表页截图已完成: {screenshot_path}")

            # 2. Recognise the station cards on this page.
            logger.info("正在使用 OCR+LLM 识别场站列表...")
            try:
                stations_page = await run_ocr_rect(screenshot_path)
            except Exception as e:
                logger.error(f"OCR+LLM 识别列表页异常: {e}")
                stations_page = None
            finally:
                # The list screenshot is only needed for OCR.
                _safe_remove(screenshot_path)

            if not stations_page:
                # Do not `continue` without scrolling: the next capture would be
                # identical and trip the end-of-list check above, aborting the
                # whole crawl. Treat the page as empty and scroll past it.
                logger.warning("OCR+LLM 未识别到任何场站,跳过当前页")
                stations_page = []

            # Process cards from top to bottom.
            stations_sorted = sorted(
                (st for st in stations_page if _has_valid_rect(st)),
                key=lambda s: s["rect"][1],
            )

            # 3. Handle every station card on this page.
            new_stations_processed_in_page = 0
            stop_crawl = False
            for st in stations_sorted:
                if total_new_processed_count >= max_stations_count:
                    break

                station_name = st.get("station_name") or st.get("name")
                rect = st["rect"]
                click_point = st.get("click_point") or st.get("click") or []
                if not station_name:
                    continue
                if not isinstance(click_point, (list, tuple)) or len(click_point) < 2:
                    continue

                x1, y1, x2, y2 = rect[0], rect[1], rect[2], rect[3]
                try:
                    rect_ok = 0 <= x1 < x2 <= w and 0 <= y1 < y2 <= h
                except TypeError:
                    rect_ok = False
                if not rect_ok:
                    logger.warning(f"场站 '{station_name}' 的矩形越界,跳过: rect={rect}, image=({w},{h})")
                    continue

                try:
                    click_x, click_y = int(click_point[0]), int(click_point[1])
                except (TypeError, ValueError):
                    logger.warning(f"场站 '{station_name}' 点击点无法解析,跳过: click={click_point}")
                    continue
                if not (x1 <= click_x <= x2 and y1 <= click_y <= y2):
                    logger.warning(f"场站 '{station_name}' 点击点不在矩形内部,跳过: click={click_point}, rect={rect}")
                    continue

                if await is_already_crawled(redis_kit, station_name):
                    total_encountered_count += 1
                    logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。({total_encountered_count}/{max_stations_count})")
                    continue

                if y2 > ad_top_y:
                    # Not counted as encountered: it will be seen again after scrolling.
                    logger.info(f"场站 '{station_name}' 底部靠近屏幕底部 (y2={y2}, ad_top_y={ad_top_y}),留待下次滚动处理。")
                    continue

                total_encountered_count += 1

                piles = _parse_piles(st.get("busy_list") or [])
                if piles:
                    try:
                        await service.record_station_status(
                            {
                                "station_name": station_name,
                                "piles": piles,
                                "distance": st.get("distance_text"),
                            }
                        )
                    except Exception as e:
                        logger.error(f"记录列表页状态失败 ({station_name}): {e}")

                current_idx = total_new_processed_count + 1
                remaining = max_stations_count - current_idx
                logger.info(f"--- [进度: {current_idx}/{max_stations_count}, 剩余: {remaining}] 发现新场站 '{station_name}',开始处理... ---")
                new_stations_processed_in_page += 1
                total_new_processed_count += 1

                # Open the detail page and start its analysis in the background.
                d.click(click_x, click_y)
                await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)

                detail_uuid = f"detail_{get_name_md5(station_name)}_{image_uuid}"
                detail_path = take_screenshot(d, detail_uuid, save_dir=TEMP_IMAGE_DIR)
                logger.info(f"已启动后台分析详情页: {station_name}")
                detail_task = asyncio.create_task(
                    _analyze_detail_background(service, detail_path, station_name)
                )
                background_tasks.append(detail_task)

                # 4. Capture the time-of-use price page. The detail screenshot is
                # deleted only after both this step (the VL fallback may re-read
                # it) and the background detail analysis are finished with it.
                try:
                    should_back_to_list = await _crawl_price_page(
                        d, service, station_name, detail_path, detail_uuid,
                        device_info, ad_top_y, current_md5, background_tasks,
                    )
                finally:
                    detail_task.add_done_callback(lambda _t, p=detail_path: _safe_remove(p))

                # Return from the detail page to the list page.
                if should_back_to_list:
                    d.press("back")
                    await asyncio.sleep(0.8)

                full_name_key = f"crawled:aite:{Kit.clean_station_name(station_name)}"
                await redis_kit.set_data(full_name_key, "1", expire=REDIS_STATION_EXPIRE)

                if FIRST_RUN_ONLY_ONE_STATION:
                    logger.info("已完成首个场站的全流程采集,根据配置退出艾特吉易充爬取任务。")
                    # A bare `break` would only leave this page loop.
                    stop_crawl = True
                    break

            if stop_crawl:
                break

            if total_encountered_count >= max_stations_count:
                logger.info(f"已遇到 {total_encountered_count} 个场站,达到目标数量,结束滚动。")
                break

            # Reap finished background tasks.
            for task in [t for t in background_tasks if t.done()]:
                if not task.cancelled() and task.exception():
                    logger.error(f"后台任务异常: {task.exception()}")
                background_tasks.remove(task)

            if new_stations_processed_in_page == 0:
                no_new_data_count += 1
                logger.info(f"本页所有场站均已处理过或被过滤,连续 {no_new_data_count} 页无新数据。")
            else:
                no_new_data_count = 0

            if no_new_data_count >= max_empty_pages:
                logger.info("连续 3 页无有效场站或无新数据,判定列表已经到底,提前结束滚动。")
                break

            logger.info("执行翻页滑动...")
            start_x, start_y = w // 2, int(h * 0.8)
            end_x, end_y = w // 2, int(h * (0.8 - SCROLL_DISTANCE_RATIO))
            d.swipe(start_x, start_y, end_x, end_y, duration=0.5)
            await asyncio.sleep(WAIT_AFTER_SCROLL)
    finally:
        # Always let the background analysis finish, even if the crawl failed.
        if background_tasks:
            logger.info(f"正在等待剩余 {len(background_tasks)} 个后台分析任务完成...")
            await asyncio.gather(*background_tasks, return_exceptions=True)

    logger.info(f"采集任务完成,共遇到 {total_encountered_count} 个场站,新处理 {total_new_processed_count} 个。")
    return total_new_processed_count
def _price_start_sort_key(entry):
    """Sort key that orders a price segment by its ``start`` time.

    ``"H:MM"``, ``"HH:MM"`` and ``"HH:MM:SS"`` (also with a full-width colon,
    as OCR may produce) are parsed into minutes, so "8:00" sorts before "10:00".
    A missing or empty start counts as 00:00. Unparsable values sort after all
    parsable ones, ordered by their text, instead of raising TypeError.
    """
    text = str(entry.get("start") or "00:00").strip().replace("：", ":")
    parts = text.split(":")
    if len(parts) >= 2:
        try:
            return (0, int(parts[0]) * 60 + int(parts[1]), text)
        except ValueError:
            pass
    return (1, 0, text)


async def analyze_prices_background(service, station_name, image_paths):
    """Extract time-of-use prices from price-page screenshots and persist them.

    Segments from all screenshots are merged, de-duplicated on their ``start``
    value (first occurrence wins), sorted by start time and passed to
    ``service.process_price_detail_data``. A screenshot that fails recognition
    is logged and skipped. Every screenshot in ``image_paths`` is deleted
    afterwards, whether or not the analysis succeeded.

    Args:
        service: persistence service exposing ``process_price_detail_data``.
        station_name: station the screenshots belong to.
        image_paths: screenshot file paths, top page first.

    Returns:
        Always True; errors are logged rather than raised.
    """
    try:
        all_prices = []
        seen_starts = []  # a list, not a set: LLM output may hold unhashable values
        for path in image_paths:
            try:
                prices = await ReadImageKit.get_price_detail_from_image(path)
            except Exception as e:
                logger.error(f"识别价格图片失败 ({station_name}): {path}, {e}")
                continue
            if not isinstance(prices, list):
                continue
            for entry in prices:
                if not isinstance(entry, dict):
                    continue
                start = entry.get("start")
                if start in seen_starts:
                    continue
                seen_starts.append(start)
                all_prices.append(entry)

        if all_prices:
            all_prices.sort(key=_price_start_sort_key)
            await service.process_price_detail_data(station_name, all_prices)
            logger.info(f"后台处理完成: {station_name} 的分时价格已保存。")
        else:
            logger.warning(f"未能从价格截图中识别出分时价格: {station_name}")
    except Exception as e:
        logger.error(f"分析价格后台任务异常 ({station_name}): {e}")
    finally:
        # Free disk space whatever happened above.
        for path in image_paths or []:
            try:
                if os.path.exists(path):
                    os.remove(path)
                    logger.debug(f"已删除已处理的图片: {path}")
            except Exception as e:
                logger.warning(f"删除图片失败: {path}, {e}")
    return True
async def main(service=None, do_cleanup=True):
    """Crawler entry point: run one full crawl and record it in ``t_crawl_task_log``.

    Args:
        service: an existing service to reuse. When ``None``, a new
            ``AiTeJiYiChongService`` is created, its DB initialised, and closed
            again before returning.
        do_cleanup: when true, clear the temporary image directory first.

    Returns:
        True if the crawl finished without an unhandled exception, else False.
        If the crawl is cancelled or interrupted, the task log records status
        "failed" and the exception propagates.
    """
    should_close_service = False
    if service is None:
        service = AiTeJiYiChongService()
        should_close_service = True
        await service.init_db()

    redis_kit = RedisKit()

    # Task log bookkeeping.
    task_id = str(uuid.uuid4())
    start_time = datetime.now()
    operator = "艾特吉易充"

    try:
        await service.db.save("t_crawl_task_log", {
            "task_id": task_id,
            "operator": operator,
            "start_time": start_time,
            "status": "running"
        }, "task_id")
    except Exception as e:
        logger.error(f"无法记录任务开始日志: {e}")

    total_count = 0
    # Pessimistic defaults: this is what gets logged if a BaseException
    # (cancellation, Ctrl+C) escapes the try block below.
    status = "failed"
    error_msg = "任务被中断或取消"

    try:
        # Connect inside the try so a device failure is logged and the
        # service is still closed.
        d = u2.connect()

        if do_cleanup:
            Kit.clear_temp_dir()

        # [Testing] Clear the Redis crawl records when test mode is configured.
        await clean_redis_data(redis_kit)

        total_count = await get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT)

        if total_count:
            logger.info("场站列表采集完成。")
        else:
            logger.warning("未采集到任何场站信息。")

        status = "success"
        error_msg = None
        return True
    except Exception as e:
        logger.error(f"爬取过程中出现异常: {e}")
        status = "failed"
        error_msg = str(e)
        return False
    finally:
        end_time = datetime.now()
        duration = int((end_time - start_time).total_seconds())
        try:
            await service.db.update("t_crawl_task_log", {
                "task_id": task_id,
                "end_time": end_time,
                "duration_seconds": duration,
                "station_count": total_count,
                "status": status,
                "error_msg": error_msg
            }, "task_id")
            logger.info(f"任务执行日志已更新: {operator} | 耗时: {duration}s | 数量: {total_count} | 状态: {status}")
        except Exception as e:
            logger.error(f"无法更新任务执行日志: {e}")

        # Close only a service we created; a close failure must not mask the result.
        if should_close_service:
            try:
                await service.close_db()
            except Exception as e:
                logger.error(f"关闭数据库连接失败: {e}")
if __name__ == "__main__":
    # Standalone run: main() creates, initialises and closes its own service.
    asyncio.run(main(service=None, do_cleanup=True))