This commit is contained in:
HuangHai
2026-01-13 20:12:38 +08:00
parent 099f8fb148
commit c1effd1249
6 changed files with 154 additions and 65 deletions

View File

@@ -1,19 +1,32 @@
# 采集配置
SCROLL_DISTANCE_RATIO = 0.6
# 滑动距离比例 (0.1 ~ 0.9),数值越大滑动幅度越大
SCROLL_DISTANCE_RATIO = 0.5
# 最大采集场站数量,达到此次数后停止采集
MAX_STATIONS_COUNT = 100
# 场站去重过期时间(秒),在此时间内重复出现的场站不会再次点击进入详情页
REDIS_STATION_EXPIRE = 120
# 数据库数据保留时长超过此时长的历史数据is_current=0将被删除
DATA_RETENTION_DAYS = 365
# 测试配置
# [Testing] 是否在启动时清除 Redis 中的已爬取记录 (用于快速重复测试)
TEST_CLEAR_REDIS = False
# 等待时间配置 (秒)
WAIT_DETAIL_PAGE_LOAD = 3.0
WAIT_BACK_TO_LIST = 1.5
WAIT_AFTER_SCROLL = 2.5
# 点击进入详情页后等待加载的时间
WAIT_DETAIL_PAGE_LOAD = 2.0
# 从详情页返回列表页后等待页面刷新的时间
WAIT_BACK_TO_LIST = 0.8
# 执行滑动操作后等待页面内容加载和稳定的时间
WAIT_AFTER_SCROLL = 2.0
# 坐标计算与安全防护
# 屏幕顶部安全排除比例 (0.0~1.0),此比例区域内不进行点击(避开状态栏、顶部菜单、横幅广告等)
SAFE_EXCLUDE_RATIO = 0.20
BOTTOM_SAFE_EXCLUDE_RATIO = 0.1
# 屏幕底部安全排除比例 (0.0~1.0),此比例区域内不进行点击(避开底部导航栏、功能按钮等)
BOTTOM_SAFE_EXCLUDE_RATIO = 0.12
# 默认回退屏幕宽度,当无法自动获取设备信息时使用
FALLBACK_WIDTH = 1080
# 默认回退屏幕高度,当无法自动获取设备信息时使用
FALLBACK_HEIGHT = 2400

View File

@@ -24,11 +24,28 @@ from Apps.AiTeJiYiChong.Config.Setting import (
MAX_STATIONS_COUNT, REDIS_STATION_EXPIRE,
WAIT_AFTER_SCROLL,
SAFE_EXCLUDE_RATIO,
BOTTOM_SAFE_EXCLUDE_RATIO
BOTTOM_SAFE_EXCLUDE_RATIO,
TEST_CLEAR_REDIS
)
logger = logging.getLogger("AiTeJiYiChongCrawler")
async def clean_redis_data(redis_kit):
"""
[Testing] 清除 Redis 中的已爬取记录
"""
if not TEST_CLEAR_REDIS:
return
logger.warning("!!! [Testing] 正在清除 Redis 中的艾特吉易充已爬取记录...")
keys = await redis_kit.keys("crawled:aite:*")
if keys:
for key in keys:
await redis_kit.delete(key)
logger.info(f"已清除 {len(keys)} 条记录。")
else:
logger.info("Redis 中没有找到相关记录。")
async def is_already_crawled(redis_kit, station_name):
"""
检查场站是否已经爬取过,仅按名称去重
@@ -60,12 +77,13 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
background_tasks = []
last_list_md5 = None
no_new_data_count = 0
total_processed_count = 0
total_encountered_count = 0
total_new_processed_count = 0
scroll_count = 0
while total_processed_count < max_stations_count:
while total_encountered_count < max_stations_count:
scroll_count += 1
logger.info(f"正在处理第 {scroll_count} 次滚动 (已采集: {total_processed_count}/{max_stations_count})...")
logger.info(f"正在处理第 {scroll_count} 次滚动 (已遇到: {total_encountered_count}/{max_stations_count}, 新采集: {total_new_processed_count})...")
# 1. 拍摄截图
image_uuid = str(uuid.uuid4())
@@ -101,17 +119,17 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
logger.info("正在调用 VL 模型识别场站信息 (整页识别)...")
# 优先使用带绿框的图片进行识别
img_to_process = vl_img_path if os.path.exists(vl_img_path) else screenshot_path
stations = await service.process_station_list_vl(img_to_process, json_metadata, device_info=device_info)
stations = await service.process_station_list_vl(img_to_process, json_metadata, device_info=device_info, max_count=max_stations_count - total_encountered_count)
logger.info(f"本页识别到 {len(stations)} 个有效场站")
ad_top_y_norm = 0.78 # 默认点击边界
# 5. 遍历处理本页所有场站
new_stations_processed = 0
new_stations_processed_in_page = 0
if json_metadata.get("cards") and stations:
for card_idx, card in enumerate(json_metadata["cards"]):
# 检查是否已达到最大采集数量
if total_processed_count >= max_stations_count:
if total_encountered_count >= max_stations_count:
break
# 检查索引是否越界 (VL 模型可能返回的数组长度不一致)
@@ -120,22 +138,27 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
station_name = station.get("station_name")
if not station_name: continue
# 只要是有效的场站,就计入已遇到数量
total_encountered_count += 1
# 检查 Redis 去重
if await is_already_crawled(redis_kit, station_name):
logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。")
logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。({total_encountered_count}/{max_stations_count})")
continue
# 检查是否被遮挡或太靠近底部
card_bottom = card["bounds_norm"]["bottom"]
if card_bottom > ad_top_y_norm:
logger.warning(f"场站 '{station_name}' 被遮挡或太靠近底部 (Bottom {card_bottom:.2f} > {ad_top_y_norm}),留待下次滚动处理。")
# 还没处理,不能算作已遇到
total_encountered_count -= 1
continue
# 正常处理新场站
click_x, click_y = card["click_point"]
logger.info(f">>> 发现新场站 '{station_name}',开始处理... ({total_processed_count + 1}/{max_stations_count})")
new_stations_processed += 1
total_processed_count += 1
logger.info(f">>> 发现新场站 '{station_name}',开始处理... ({total_encountered_count}/{max_stations_count})")
new_stations_processed_in_page += 1
total_new_processed_count += 1
d.click(click_x, click_y)
# 等待二级页面加载
@@ -161,28 +184,53 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
background_tasks.append(task_detail)
# 4. 寻找分时价格入口并进入三级页面
# 优先使用 VL 模型精准识别,避免误判导致退回微信
res = await ReadImageKit.find_time_price_button_coordinate(detail_path, device_info=device_info)
pos = (res.get("uia_center_x"), res.get("uia_center_y")) if res.get("uia_center_x") else None
# 【优化】梯级识别策略:优先模板匹配 (timePrice.jpg),失败则降级为 VL 识别
template_time_price = os.path.join(os.path.dirname(__file__), "Template", "timePrice.jpg")
# 记录点击前的页面特征,用于验证是否成功进入三级页面
before_click_md5 = Kit.get_image_content_md5(detail_path)
entered_price_page = False
if pos and pos[0] is not None and pos[1] is not None:
ex, ey = pos
logger.info(f"找到 '分时价格' 按钮: ({ex}, {ey})")
click_pos = None
try:
# 1. 尝试模板匹配
if os.path.exists(template_time_price):
match_res = d.image.match(template_time_price)
if match_res:
# 兼容 uiautomator2 不同版本的返回结果 (Match对象或dict)
if hasattr(match_res, 'point') and match_res.point:
click_pos = match_res.point
elif isinstance(match_res, dict):
if 'point' in match_res and match_res['point']:
click_pos = match_res['point']
elif 'x' in match_res and 'y' in match_res:
click_pos = (match_res['x'], match_res['y'])
elif isinstance(match_res, (list, tuple)) and len(match_res) >= 2:
click_pos = (match_res[0], match_res[1])
if click_pos:
logger.info(f"通过 timePrice.jpg 成功找到!坐标: {click_pos}")
# 记录点击前的页面特征,用于验证是否成功进入三级页面
before_click_md5 = Kit.get_image_content_md5(detail_path)
d.click(ex, ey)
await asyncio.sleep(2)
# 检查页面是否真的变化了
after_click_path = take_screenshot(d, f"check_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
after_click_md5 = Kit.get_image_content_md5(after_click_path)
if before_click_md5 != after_click_md5:
entered_price_page = True
# 抓取三级页面图片
# 如果模板匹配未成功解析坐标,或者匹配直接失败,则降级到 VL
if not click_pos:
logger.info("模板匹配未找到或解析失败,降级使用 VL 识别...")
res = await ReadImageKit.find_time_price_button_coordinate(detail_path, device_info=device_info)
if res.get("uia_center_x"):
click_pos = (res.get("uia_center_x"), res.get("uia_center_y"))
logger.info(f"VL 识别成功找到 '分时价格' 按钮: {click_pos}")
if click_pos:
d.click(click_pos[0], click_pos[1])
await asyncio.sleep(2)
# 检查页面是否真的变化了
after_click_path = take_screenshot(d, f"check_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
after_click_md5 = Kit.get_image_content_md5(after_click_path)
if before_click_md5 != after_click_md5:
entered_price_page = True
# 抓取三级页面图片
price_screenshots = []
price_screenshots.append(after_click_path)
@@ -238,21 +286,21 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
await redis_kit.set_data(full_name_key, "1", expire=REDIS_STATION_EXPIRE)
# 如果内层循环已达上限,外层循环也应退出,避免不必要的滑动
if total_processed_count >= max_stations_count:
if total_encountered_count >= max_stations_count:
break
# 在每一页结束时,清理已完成的任务
# 清理已完成的后台任务
done_tasks = [t for t in background_tasks if t.done()]
for t in done_tasks:
if t.exception():
logger.error(f"后台任务执行异常: {t.exception()}")
logger.error(f"后台任务异常: {t.exception()}")
background_tasks.remove(t)
if stations and new_stations_processed == 0:
if stations and new_stations_processed_in_page == 0:
no_new_data_count += 1
logger.info(f"本页所有场站均已处理过,连续 {no_new_data_count} 页无新数据。")
if no_new_data_count >= 3:
logger.info("连续 3 页无新数据,判定为已到底或重复循环,提前结束。")
if no_new_data_count >= 5:
logger.info("连续 5 页无新数据,判定为已到底,提前结束。")
break
else:
no_new_data_count = 0
@@ -269,8 +317,8 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
logger.info(f"正在等待剩余 {len(background_tasks)} 个后台分析任务完成...")
await asyncio.gather(*background_tasks, return_exceptions=True)
logger.info(f"采集任务完成,共采集 {total_processed_count}场站")
return total_processed_count
logger.info(f"采集任务完成,共遇到 {total_encountered_count} 个场站,新处理 {total_new_processed_count} 个。")
return total_new_processed_count
async def analyze_prices_background(service, station_name, image_paths):
"""
@@ -311,19 +359,16 @@ async def main(service=None, do_cleanup=True):
"""
爬虫主入口
"""
if do_cleanup:
Kit.clear_temp_dir()
should_close_service = False
if service is None:
service = AiTeJiYiChongService()
should_close_service = True
await service.init_db()
redis_kit = RedisKit()
d = u2.connect()
# 任务日志记录
from datetime import datetime
task_id = str(uuid.uuid4())
start_time = datetime.now()
operator = "艾特吉易充"
@@ -344,7 +389,20 @@ async def main(service=None, do_cleanup=True):
error_msg = None
try:
total_count = await get_station_list(d, service)
if do_cleanup:
# 清理临时目录
Kit.clear_temp_dir()
# [Testing] 如果配置为测试模式,启动时清除 Redis 记录
await clean_redis_data(redis_kit)
total_count = await get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT)
if total_count:
logger.info("场站列表采集完成。")
else:
logger.warning("未采集到任何场站信息。")
return True
except Exception as e:
logger.error(f"爬取过程中出现异常: {e}")

View File

@@ -144,7 +144,7 @@ class AiTeJiYiChongService:
logger.info(f"场站详情处理完成: {name}")
return detail
async def process_station_list_vl(self, image_path, json_metadata, device_info=None) -> list:
async def process_station_list_vl(self, image_path, json_metadata, device_info=None, max_count=None) -> list:
"""
基于 VL 模式处理场站列表 (整页识别)
"""
@@ -159,6 +159,12 @@ class AiTeJiYiChongService:
processed_stations = []
async with await self.db.get_session() as session:
for station in station_list:
# 如果指定了最大数量且已达到,则停止保存
if max_count is not None and len(processed_stations) >= max_count:
# 填充空位以保持索引对齐,但不保存到数据库
processed_stations.append(None)
continue
name = station.get("station_name")
if not name:
continue
@@ -167,7 +173,7 @@ class AiTeJiYiChongService:
now = datetime.now()
station["station_hash"] = station_hash
# 1. 保存 Profile
# 1. 保存 Profile (使用 db_retry 装饰器的方法,如果 Db 类已更新)
profile_id = self.generate_id()
await self.station_profile_model.save(
session=session,

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 KiB

View File

@@ -155,12 +155,13 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
background_tasks = []
last_list_md5 = None
no_new_data_count = 0
total_processed_count = 0
total_encountered_count = 0
total_new_processed_count = 0
scroll_count = 0
while total_processed_count < max_stations_count:
while total_encountered_count < max_stations_count:
scroll_count += 1
logger.info(f"正在处理第 {scroll_count} 次滚动 (已采集: {total_processed_count}/{max_stations_count})...")
logger.info(f"正在处理第 {scroll_count} 次滚动 (已遇到: {total_encountered_count}/{max_stations_count}, 新采集: {total_new_processed_count})...")
# 1. 拍摄截图
image_uuid = str(uuid.uuid4())
@@ -205,16 +206,16 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
# 4. 【优化】直接使用本地路径调用 VL 模型识别,避免等待上传
logger.info("正在调用 VL 模型识别场站信息...")
stations = await service.process_station_list_vl(vl_img_path, json_metadata, device_info=device_info)
stations = await service.process_station_list_vl(vl_img_path, json_metadata, device_info=device_info, max_count=max_stations_count - total_encountered_count)
logger.info(f"本页识别到 {len(stations)} 个场站")
# 5. 匹配几何卡片与 VL 识别结果 (XinDianTu 的 parse_vl_image 已经按顺序返回了)
# 这里的 stations 列表长度应该与 json_metadata["cards"] 对应
new_stations_processed = 0
new_stations_processed_in_page = 0
if json_metadata.get("cards") and stations:
for idx, card in enumerate(json_metadata["cards"]):
# 检查是否已达到最大采集数量
if total_processed_count >= max_stations_count:
if total_encountered_count >= max_stations_count:
break
# 检查索引是否越界 (VL 模型可能返回的数组长度不一致)
@@ -223,9 +224,12 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
station_name = st.get("station_name")
if not station_name: continue
# 只要是有效的场站,就计入已遇到数量
total_encountered_count += 1
# 检查 Redis 去重
if await is_already_crawled(redis_kit, station_name):
logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。")
logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。({total_encountered_count}/{max_stations_count})")
continue
# 【优化】检查是否被遮挡或太靠近底部
@@ -233,12 +237,14 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
card_bottom = card["bounds_norm"]["bottom"]
if card_bottom > ad_top_y_norm:
logger.warning(f"场站 '{station_name}' 被遮挡或太靠近底部 (Bottom {card_bottom:.2f} > {ad_top_y_norm}),留待下次滚动处理。")
# 还没处理,不能算作已遇到(因为下次还会遇到)
total_encountered_count -= 1
continue
# 正常处理新场站
logger.info(f">>> 发现新场站 '{station_name}',开始处理... ({total_processed_count + 1}/{max_stations_count})")
new_stations_processed += 1
total_processed_count += 1
logger.info(f">>> 发现新场站 '{station_name}',开始处理... ({total_encountered_count}/{max_stations_count})")
new_stations_processed_in_page += 1
total_new_processed_count += 1
click_x, click_y = card["click_point"]
logger.info(f"准备处理场站: {station_name}, 点击坐标: ({click_x}, {click_y})")
@@ -397,7 +403,7 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
await redis_kit.set_data(f"crawled:xdt:{cleaned}", "1", expire=REDIS_STATION_EXPIRE)
# 如果内层循环已达上限,外层循环也应退出,避免不必要的滑动
if total_processed_count >= max_stations_count:
if total_encountered_count >= max_stations_count:
break
# 清理已完成的后台任务
@@ -407,7 +413,7 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
logger.error(f"后台任务异常: {t.exception()}")
background_tasks.remove(t)
if stations and new_stations_processed == 0:
if stations and new_stations_processed_in_page == 0:
no_new_data_count += 1
logger.info(f"本页所有场站均已处理过,连续 {no_new_data_count} 页无新数据。")
# 【优化】由于滑动步长已调大 (0.5),连续多页重复的可能性降低
@@ -429,8 +435,8 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
logger.info(f"正在等待剩余 {len(background_tasks)} 个后台任务完成...")
await asyncio.gather(*background_tasks, return_exceptions=True)
logger.info("采集任务完成")
return total_processed_count
logger.info(f"采集任务完成,共遇到 {total_encountered_count} 个场站,新处理 {total_new_processed_count} 个。")
return total_new_processed_count
def clean_images_dir():

View File

@@ -61,7 +61,7 @@ class XinDianTuService:
async def close_db(self):
await self.db.close()
async def process_station_list_vl(self, vl_image_url_or_path, json_metadata, device_info=None) -> list:
async def process_station_list_vl(self, vl_image_url_or_path, json_metadata, device_info=None, max_count=None) -> list:
"""
基于 VL 模式处理场站列表
"""
@@ -72,6 +72,12 @@ class XinDianTuService:
processed_stations = []
async with await self.db.get_session() as session:
for station in station_list:
# 如果指定了最大数量且已达到,则停止保存
if max_count is not None and len(processed_stations) >= max_count:
# 填充空位以保持索引对齐,但不保存到数据库
processed_stations.append(None)
continue
name = station.get("station_name")
if not name:
continue