This commit is contained in:
HuangHai
2026-01-12 16:12:36 +08:00
parent 8606709510
commit 45303dfcb2
6 changed files with 166 additions and 65 deletions

View File

@@ -28,6 +28,33 @@ from Apps.AiTeJiYiChong.Config.Setting import (
logger = logging.getLogger("AiTeJiYiChongCrawler")
async def is_already_crawled(redis_kit, station_name):
"""
检查场站是否已经爬取过,支持前缀模糊匹配处理截断名称
"""
cleaned_name = Kit.clean_station_name(station_name)
if not cleaned_name:
return False
# 1. 检查精确匹配
redis_key = f"crawled:aite:{cleaned_name}"
if await redis_kit.get_data(redis_key):
return True
# 2. 如果名称被截断(原名包含 . ),尝试前缀匹配
# 逻辑:在 Redis 中查找是否有以当前清理后的名称开头的已完成场站
# 为了效率,我们配合 Redis 的 keys 模式(小规模数据下可用)
if ".." in station_name or station_name.endswith("."):
# 获取所有已爬取的 key 进行内存前缀比对,或者利用 Redis 模糊查询
# 这里采用一种更高效的方式:在详情页保存时,额外保存一个前缀索引
prefix_key = f"crawled:aite:prefix:{cleaned_name[:6]}" # 取前6位作为索引前缀
stored_names = await redis_kit.get_data(prefix_key)
if stored_names and isinstance(stored_names, list):
for full_name in stored_names:
if full_name.startswith(cleaned_name):
return True
return False
async def get_station_list(d, service, max_scrolls=MAX_SCROLLS):
"""
获取场站列表并处理翻页
@@ -41,9 +68,16 @@ async def get_station_list(d, service, max_scrolls=MAX_SCROLLS):
device_info['height'] = h
logger.info(f"开始爬取列表,设备: {device_info.get('productName')} | 分辨率: {w}x{h}")
# 用于追踪后台分析任务
background_tasks = []
for i in range(max_scrolls + 1):
logger.info(f"正在处理第 {i + 1} 页...")
# ... (前序步骤保持不变)
# (在此处截断,以便后续替换循环体内的逻辑)
# 1. 拍摄截图
image_uuid = str(uuid.uuid4())
@@ -85,10 +119,9 @@ async def get_station_list(d, service, max_scrolls=MAX_SCROLLS):
logger.warning(f"{card_idx + 1} 个几何卡片未匹配到场站名称,跳过。")
continue
# 检查 Redis 去重
redis_key = f"crawled:aite:{station_name}"
if await redis_kit.get_data(redis_key):
logger.info(f"场站 {station_name} 已处理,跳过。")
# 检查 Redis 去重 (使用新的模糊匹配逻辑)
if await is_already_crawled(redis_kit, station_name):
logger.info(f"场站 {station_name} 匹配到已处理记录,跳过。")
continue
click_x, click_y = card["click_point"]
@@ -102,9 +135,18 @@ async def get_station_list(d, service, max_scrolls=MAX_SCROLLS):
detail_uuid = f"detail_{station_name}_{image_uuid}"
detail_path = take_screenshot(d, detail_uuid, save_dir=TEMP_IMAGE_DIR)
# 5. 调用详情页处理逻辑 (二级页面:提取地址)
logger.info(f"正在解析详情页基础数据: {detail_path}")
detail_data = await service.process_station_detail(detail_path, station_name=station_name)
# 【异步优化】后台解析详情页基础数据,不阻塞 UI 流程
logger.info(f"已启动后台分析详情页: {station_name}")
async def detail_task_wrapper(p, name):
try:
await service.process_station_detail(p, station_name=name)
if os.path.exists(p): os.remove(p)
except Exception as ex:
logger.error(f"详情页分析异常 ({name}): {ex}")
task_detail = asyncio.create_task(detail_task_wrapper(detail_path, station_name))
background_tasks.append(task_detail)
# 6. 寻找 timePrice.jpg 图标并进入三级页面 (分时价格页)
time_price_template = os.path.join(os.path.dirname(__file__), "BiaoShi", "timePrice.jpg")
@@ -114,72 +156,108 @@ async def get_station_list(d, service, max_scrolls=MAX_SCROLLS):
cx, cy, conf = coords
logger.info(f"找到分时价格图标,进入三级页面...")
d.click(cx, cy)
await asyncio.sleep(3)
await asyncio.sleep(2) # 等待加载
# 取三级页面并处理滑动价格
# 取三级页面图片(包括可能的滑动)
price_screenshots = []
price_detail_uuid = f"price_detail_{station_name}_{image_uuid}"
price_detail_path = take_screenshot(d, price_detail_uuid, save_dir=TEMP_IMAGE_DIR)
price_path = take_screenshot(d, price_detail_uuid, save_dir=TEMP_IMAGE_DIR)
price_screenshots.append(price_path)
all_prices = []
# 初始识别
prices = await ReadImageKit.get_price_detail_from_image(price_detail_path)
if prices: all_prices.extend(prices)
last_md5 = Kit.get_file_md5(price_detail_path)
# 滑动逻辑
for scroll_idx in range(3):
logger.info(f"执行第 {scroll_idx + 1} 次滑动以抓取更多价格...")
d.swipe(w // 2, int(h * 0.7), w // 2, int(h * 0.3), duration=0.5)
await asyncio.sleep(2)
# 如果需要滑动,先完成所有截图动作
last_md5 = Kit.get_file_md5(price_path)
for scroll_idx in range(2): # 减少滑动次数以加快速度通常1-2次足够
d.swipe(w // 2, int(h * 0.7), w // 2, int(h * 0.3), duration=0.4)
await asyncio.sleep(1)
scroll_path = take_screenshot(d, f"price_scroll_{scroll_idx}_{station_name}", save_dir=TEMP_IMAGE_DIR)
current_md5 = Kit.get_file_md5(scroll_path)
if current_md5 == last_md5:
logger.info("检测到屏幕未发生变化(已滑到底部),停止滑动识别。")
break
last_md5 = current_md5
new_prices = await ReadImageKit.get_price_detail_from_image(scroll_path)
if new_prices:
for np_item in new_prices:
if not any(np_i.get("start") == np_item.get("start") for np_i in all_prices):
all_prices.append(np_item)
else: break
curr_md5 = Kit.get_file_md5(scroll_path)
if curr_md5 == last_md5: break
price_screenshots.append(scroll_path)
last_md5 = curr_md5
if all_prices:
all_prices.sort(key=lambda x: x.get("start", "00:00"))
hourly_schedule = ReadImageKit.expand_schedule_to_24h(all_prices)
await service.process_price_detail_data(station_name, hourly_schedule)
logger.info(f"场站 {station_name} 三级页面价格处理完成。")
# 【异步优化】后台处理所有价格图片,不阻塞 UI 返回
logger.info(f"已启动后台分析价格页: {station_name},共 {len(price_screenshots)} 张图")
task_price = asyncio.create_task(analyze_prices_background(service, station_name, price_screenshots))
background_tasks.append(task_price)
# 从三级页面返回二级页面
logger.info("从三级页面返回二级页面...")
# 立即从三级页面返回
d.press("back")
await asyncio.sleep(1.5)
await asyncio.sleep(0.5)
else:
logger.warning(f"场站 {station_name} 未找到分时价格入口。")
# 从二级页面返回列表页
logger.info("从二级页面返回列表页...")
d.press("back")
await asyncio.sleep(1.5)
await asyncio.sleep(0.8)
# 记录 Redis
await redis_kit.set_data(redis_key, "1", expire=REDIS_STATION_EXPIRE)
# 记录 Redis (保存全名并建立前缀索引)
full_name_key = f"crawled:aite:{Kit.clean_station_name(station_name)}"
await redis_kit.set_data(full_name_key, "1", expire=REDIS_STATION_EXPIRE)
# 建立前缀索引 (取前6位方便模糊查找)
prefix_idx_key = f"crawled:aite:prefix:{Kit.clean_station_name(station_name)[:6]}"
existing_names = await redis_kit.get_data(prefix_idx_key) or []
if station_name not in existing_names:
existing_names.append(station_name)
await redis_kit.set_data(prefix_idx_key, existing_names, expire=REDIS_STATION_EXPIRE)
# 5. 翻页滑动 (如果没进入二级页面)
# 在每一页结束时,清理已完成的任务,防止内存堆积
done_tasks = [t for t in background_tasks if t.done()]
for t in done_tasks:
if t.exception():
logger.error(f"后台任务执行异常: {t.exception()}")
background_tasks.remove(t)
# 7. 列表页翻页滑动
logger.info("执行翻页滑动...")
start_x, start_y = w // 2, int(h * 0.8)
end_x, end_y = w // 2, int(h * (0.8 - SCROLL_DISTANCE_RATIO))
d.swipe(start_x, start_y, end_x, end_y, duration=0.5)
await asyncio.sleep(WAIT_AFTER_SCROLL)
# 最终等待所有后台分析任务完成
if background_tasks:
logger.info(f"正在等待剩余 {len(background_tasks)} 个后台分析任务完成...")
await asyncio.gather(*background_tasks, return_exceptions=True)
logger.info("达到最大翻页次数,爬取结束。")
return True
async def analyze_prices_background(service, station_name, image_paths):
"""
后台异步处理价格图片分析
"""
try:
all_prices = []
for path in image_paths:
prices = await ReadImageKit.get_price_detail_from_image(path)
if prices and isinstance(prices, list):
for p in prices:
if not isinstance(p, dict): continue
if not any(isinstance(np_i, dict) and np_i.get("start") == p.get("start") for np_i in all_prices):
all_prices.append(p)
if all_prices:
all_prices = [p for p in all_prices if isinstance(p, dict)]
all_prices.sort(key=lambda x: x.get("start", "00:00"))
hourly_schedule = ReadImageKit.expand_schedule_to_24h(all_prices)
await service.process_price_detail_data(station_name, hourly_schedule)
logger.info(f"后台处理完成: {station_name} 的分时价格已保存。")
# 处理完成后清理图片,释放磁盘空间
for path in image_paths:
try:
if os.path.exists(path):
os.remove(path)
logger.debug(f"已删除已处理的图片: {path}")
except Exception as e:
logger.warning(f"删除图片失败: {path}, {e}")
except Exception as e:
logger.error(f"分析价格后台任务异常 ({station_name}): {e}")
return True
async def main(service=None, do_cleanup=True):
"""
爬虫主入口

View File

@@ -12,6 +12,19 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(level
logger = logging.getLogger(__name__)
def clean_station_name(name):
"""
清理场站名称,去掉末尾的省略号和空格
"""
if not name:
return ""
# 去掉末尾的 ... 或 ……
name = name.strip()
while name.endswith(".") or name.endswith(""):
name = name[:-1]
return name.strip()
def get_file_md5(file_path):
"""计算文件的 MD5 值"""
if not os.path.exists(file_path):
@@ -25,10 +38,17 @@ def get_file_md5(file_path):
def read_image(path):
"""读取图片,支持中文路径"""
if not path or not os.path.exists(path):
return None
try:
return cv2.imdecode(np.fromfile(path, dtype=np.uint8), -1)
# 使用 np.fromfile 解决中文路径问题
data = np.fromfile(path, dtype=np.uint8)
if data.size == 0:
return None
img = cv2.imdecode(data, -1)
return img
except Exception as e:
logger.info(f"Error reading image {path}: {e}")
logger.error(f"Error reading image {path}: {e}")
return None

View File

@@ -7,7 +7,7 @@ import json
import aiohttp
import logging
import base64
from openai import OpenAI, BadRequestError
from openai import AsyncOpenAI, BadRequestError
from Config.Config import (
ALY_LLM_API_KEY, VL_MODEL_NAME, VL_MODEL_NAME_AD
)
@@ -20,7 +20,7 @@ logger = logging.getLogger(__name__)
class ReadImageKit:
_client = OpenAI(
_client = AsyncOpenAI(
api_key=ALY_LLM_API_KEY,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
@@ -199,8 +199,7 @@ class ReadImageKit:
encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
try:
response = await asyncio.to_thread(
cls._client.chat.completions.create,
response = await cls._client.chat.completions.create(
model=VL_MODEL_NAME,
messages=[
{
@@ -226,14 +225,20 @@ class ReadImageKit:
# 兼容性处理:如果返回的是对象且包含列表字段,提取列表
if isinstance(data, dict):
for key in ["price_list", "prices", "schedule", "data"]:
for key in ["price_list", "prices", "schedule", "data", "items"]:
if key in data and isinstance(data[key], list):
return data[key]
# 如果本身就是个包含列表的字典,也可能需要根据实际情况调整
# 这里假设 prompt 引导它返回一个包含列表的对象或直接是列表
if "items" in data: return data["items"]
# 如果字典本身就是一个价格项(包含 start 和 price将其包装成列表
if "start" in data and ("price" in data or "total_price" in data):
return [data]
return data
# 如果是列表,直接返回
if isinstance(data, list):
return data
# 兜底:确保返回的是列表
return [data] if data else []
except Exception as e:
logger.error(f"Error calling VL model for price detail: {e}")
@@ -252,8 +257,7 @@ class ReadImageKit:
encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
try:
response = await asyncio.to_thread(
cls._client.chat.completions.create,
response = await cls._client.chat.completions.create(
model=VL_MODEL_NAME,
messages=[
{
@@ -300,8 +304,7 @@ class ReadImageKit:
encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
try:
response = await asyncio.to_thread(
cls._client.chat.completions.create,
response = await cls._client.chat.completions.create(
model=VL_MODEL_NAME,
messages=[
{