This commit is contained in:
HuangHai
2026-01-15 15:41:30 +08:00
parent 5172dbbc49
commit ba46eb15f6
20 changed files with 335 additions and 182 deletions

View File

@@ -238,20 +238,49 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
price_screenshots = []
price_screenshots.append(after_click_path)
# 滑动处理
last_price_md5 = after_click_md5
for scroll_idx in range(2):
d.swipe(w // 2, int(h * 0.7), w // 2, int(h * 0.3), duration=0.4)
await asyncio.sleep(1)
scroll_path = take_screenshot(d, f"price_scroll_{scroll_idx}_{station_name}", save_dir=TEMP_IMAGE_DIR)
curr_price_md5 = Kit.get_image_content_md5(
scroll_path,
top_ratio=SAFE_EXCLUDE_RATIO,
bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
)
if curr_price_md5 == last_price_md5: break
price_screenshots.append(scroll_path)
last_price_md5 = curr_price_md5
# 1. 向下滚动到底 (根据用户反馈只有不断向下滚动才能看到00点的)
logger.info("正在向下滚动价格列表到底部...")
max_scroll_down = 6
for i in range(max_scroll_down):
before_scroll_path = take_screenshot(d, f"scroll_dn_{i}", save_dir=TEMP_IMAGE_DIR)
before_scroll_md5 = Kit.get_image_content_md5(before_scroll_path)
d.swipe_ext("up", scale=0.8) # 向下滚动 = 手势向上
await asyncio.sleep(1.2)
after_scroll_path = take_screenshot(d, f"scroll_dn_after_{i}", save_dir=TEMP_IMAGE_DIR)
after_scroll_md5 = Kit.get_image_content_md5(after_scroll_path)
# 清理临时截图
if os.path.exists(before_scroll_path): os.remove(before_scroll_path)
if os.path.exists(after_scroll_path): os.remove(after_scroll_path)
if before_scroll_md5 == after_scroll_md5:
logger.info(f"价格列表已到达底部 (滚动次数: {i})")
break
# 2. 向上滚动并逐页截图 (从底向上抓取)
logger.info("正在向上滚动价格列表并逐页截图...")
max_scroll_up = 8
for p_idx in range(1, max_scroll_up + 1):
# 截图当前页
p_shot = take_screenshot(d, f"price_scroll_{p_idx}_{station_name}", save_dir=TEMP_IMAGE_DIR)
# 检查是否还能向上滚动
before_up_md5 = Kit.get_image_content_md5(p_shot)
d.swipe_ext("down", scale=0.85) # 向上滚动 = 手势向下
await asyncio.sleep(1.5)
# 检查是否还有新内容
check_up_path = take_screenshot(d, f"check_up_{p_idx}", save_dir=TEMP_IMAGE_DIR)
after_up_md5 = Kit.get_image_content_md5(check_up_path)
if os.path.exists(check_up_path): os.remove(check_up_path)
price_screenshots.append(p_shot)
if before_up_md5 == after_up_md5:
logger.info(f"价格列表已到达顶部 (共抓取页数: {p_idx})")
break
# 后台处理价格图片
logger.info(f"已启动后台分析价格页: {station_name},共 {len(price_screenshots)} 张图")

View File

@@ -382,129 +382,88 @@ class TeLaiDianCrawler(BaseCrawler):
await asyncio.sleep(1.0)
logger.info("[电价页] 先多次向下滚动,使列表回到起始位置")
top_last_md5 = None
top_stable = 0
max_top_round = 8
price_top_screen = entered_price_path
for idx in range(max_top_round):
price_top_screen = take_screenshot(d, f"tld_detail_price_top_{int(time.time())}_{idx}.jpg")
logger.info(f"[电价页] 向下滚动前后的截图: {price_top_screen}")
curr_md5 = get_image_content_md5(price_top_screen, top_ratio=0.2, bottom_ratio=0.2)
if top_last_md5 is not None and curr_md5 == top_last_md5:
top_stable += 1
logger.info(f"[电价页] 页面内容连续第 {top_stable} 次无变化,可能已到顶部")
if top_stable >= 2:
break
else:
top_stable = 0
top_last_md5 = curr_md5
d.swipe_ext("down", scale=0.8)
await asyncio.sleep(1.0)
# 1. 向下滚动到底 (根据用户反馈只有不断向下滚动才能看到00点的)
logger.info("正在向下滚动价格列表到底部...")
max_scroll_down = 6
for i in range(max_scroll_down):
before_scroll_path = take_screenshot(d, f"scroll_dn_{i}.jpg")
before_scroll_md5 = get_image_content_md5(before_scroll_path)
d.swipe_ext("up", scale=0.8) # 向下滚动 = 手势向上
await asyncio.sleep(1.2)
after_scroll_path = take_screenshot(d, f"scroll_dn_after_{i}.jpg")
after_scroll_md5 = get_image_content_md5(after_scroll_path)
# 清理临时截图
if os.path.exists(before_scroll_path): os.remove(before_scroll_path)
if os.path.exists(after_scroll_path): os.remove(after_scroll_path)
if before_scroll_md5 == after_scroll_md5:
logger.info(f"价格列表已到达底部 (滚动次数: {i})")
break
# 2. 向上滚动并逐页截图 (从底向上抓取)
logger.info("正在向上滚动价格列表并逐页截图...")
all_prices = []
last_price_md5 = None
price_page_count = 0
max_price_pages = 4
screenshot_tasks = []
temp_screenshots = []
price_screen_path = price_top_screen
price_screenshots = []
max_scroll_up = 8
for p_idx in range(1, max_scroll_up + 1):
# 截图当前页
p_shot = take_screenshot(d, f"tld_detail_price_{int(time.time())}_{p_idx}.jpg")
# 检查是否还能向上滚动
before_up_md5 = get_image_content_md5(p_shot)
d.swipe_ext("down", scale=0.85) # 向上滚动 = 手势向下
await asyncio.sleep(1.5)
# 检查是否还有新内容
check_up_path = take_screenshot(d, f"check_up_{p_idx}.jpg")
after_up_md5 = get_image_content_md5(check_up_path)
if os.path.exists(check_up_path): os.remove(check_up_path)
price_screenshots.append(p_shot)
if before_up_md5 == after_up_md5:
logger.info(f"价格列表已到达顶部 (共抓取页数: {p_idx})")
break
try:
while price_page_count < max_price_pages:
price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}_{price_page_count}.jpg")
curr_md5 = get_image_content_md5(price_screen_path, top_ratio=0.2, bottom_ratio=0.2)
if curr_md5 == last_price_md5:
logger.info("价格页面内容无变化,判定已触底")
if os.path.exists(price_screen_path):
os.remove(price_screen_path)
break
last_price_md5 = curr_md5
temp_screenshots.append(price_screen_path)
logger.info(f"已截取价格详情页第 {price_page_count + 1} 页: {price_screen_path},启动后台异步分析")
task = asyncio.create_task(self.read_image_kit.analyze_detail_price(price_screen_path))
screenshot_tasks.append(task)
logger.info("向上滚动列表,准备截取下一屏价格...")
d.swipe_ext("up", scale=0.8)
await asyncio.sleep(1.0)
price_page_count += 1
if screenshot_tasks:
logger.info(f"UI 操作已完成,等待 {len(screenshot_tasks)} 个后台分析任务结束...")
results = await asyncio.gather(*screenshot_tasks, return_exceptions=True)
for res in results:
if isinstance(res, Exception):
logger.error(f"后台分析任务出错: {res}")
continue
if res:
for p in res:
# 标准化时间格式,确保 0:00 -> 00:00
for key in ['start', 'end']:
val = p.get(key)
if val and ':' in val:
parts = val.split(':')
if len(parts) == 2:
p[key] = f"{int(parts[0]):02d}:{int(parts[1]):02d}"
is_duplicate = False
for existing in all_prices:
if p.get('start') == existing.get('start') and p.get('end') == existing.get('end'):
is_duplicate = True
for key in ['price', 'plus_price', 'market_price', 'elec_price', 'service_price']:
if p.get(key) is not None and (existing.get(key) is None or existing.get(key) == 0):
existing[key] = p[key]
break
if not is_duplicate:
all_prices.append(p)
# 后台处理价格图片 (TeLaiDian 目前是顺序处理,后续可优化为异步)
for p_shot in price_screenshots:
prices = await self.read_image_kit.analyze_detail_price_info(p_shot)
if prices:
all_prices.extend(prices)
if os.path.exists(p_shot): os.remove(p_shot)
if all_prices:
# 去重并排序
unique_prices = []
seen_periods = set()
for p in all_prices:
key = f"{p.get('start')}-{p.get('end')}"
if key not in seen_periods:
unique_prices.append(p)
seen_periods.add(key)
unique_prices.sort(key=lambda x: x.get("start", "00:00"))
# 补充详情
await self.service.process_price_detail_data(
station_name,
unique_prices,
total_piles=total_piles,
free_piles=free_piles,
piles_detail=piles_detail,
parking_info=parking_info
)
logger.info(f"[详情页] {station_name} 价格信息处理完成,共 {len(unique_prices)} 条时段")
except Exception as e:
logger.error(f"抓取价格详情过程中发生异常: {e}")
logger.error(f"[详情页] 处理价格截图失败: {e}")
finally:
for task in screenshot_tasks:
if not task.done():
task.cancel()
for path in temp_screenshots:
if os.path.exists(path):
try:
os.remove(path)
except:
pass
if all_prices:
station_name_clean = clean_station_name(station_name)
try:
all_prices.sort(key=lambda x: x.get('start', '00:00'))
except:
pass
logger.info(f"✅ 场站 {station_name_clean} 共提取到 {len(all_prices)} 条价格信息,准备保存...")
await self.service.save_station_data(
station_name_clean,
address,
all_prices,
total_piles=total_piles,
free_piles=free_piles,
piles_detail=piles_detail,
parking_info=parking_info,
distance=distance
)
else:
logger.warning(f"❌ 未能提取到任何价格信息,请检查页面识别逻辑")
if address:
station_name_clean = clean_station_name(station_name)
logger.info(f"[详情页] 虽未获取价格,但已获取地址,尝试仅保存基础信息: {station_name_clean} | {address}")
try:
await self.service.save_station_profile_only(station_name_clean, address, distance=distance)
except Exception as e:
logger.error(f"[详情页] 仅保存基础信息失败: {e}")
if os.path.exists(price_screen_path):
os.remove(price_screen_path)
# 清理
for p_shot in price_screenshots:
if os.path.exists(p_shot): os.remove(p_shot)
async def crawl_list(self):
"""

View File

@@ -21,6 +21,9 @@ class ReadImageKit:
def __init__(self):
self.vlm = VLMKit()
async def analyze_detail_price_info(self, image_path):
return await self.analyze_detail_price(image_path)
async def find_price_entrance_ocr(self, image_path):
"""
使用 OCR 在详情页寻找价格入口 (全部时段 > 或 全天价格统一 >)

View File

@@ -235,3 +235,49 @@ class TeLaiDianService:
summary.append(f"{i:02d}h:{p.get('price')}")
logger.info(f"24小时价格概览: {' | '.join(summary)}")
return True
async def process_price_detail_data(self, station_name, hourly_schedule, total_piles=None, free_piles=None, piles_detail=None, parking_info=None, distance=None):
if not station_name or not hourly_schedule:
return False
now = datetime.now()
station_hash = self.get_hash(station_name, None)
async with await self.db.get_session() as session:
schedule_id = self.generate_id()
await self.station_price_schedule_model.save(
session=session,
id=schedule_id,
station_hash=station_hash,
schedule_json=hourly_schedule,
valid_start_time=now
)
current_hour = now.hour
current_price_info = None
if isinstance(hourly_schedule, list) and 0 <= current_hour < len(hourly_schedule):
current_price_info = hourly_schedule[current_hour] or {}
elif isinstance(hourly_schedule, dict):
current_price_info = hourly_schedule
else:
current_price_info = {}
status_id = self.generate_id()
await self.station_status_model.save(
session=session,
id=status_id,
station_hash=station_hash,
total_piles=total_piles,
free_piles=free_piles,
piles_detail_json=piles_detail,
parking_info=parking_info,
distance=distance,
current_price=current_price_info.get('price'),
pro_price=current_price_info.get('plus_price'),
market_price=current_price_info.get('market_price'),
valid_start_time=now
)
await session.commit()
return True

View File

@@ -279,8 +279,7 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
loop.run_in_executor(None, uploader.upload_file, detail_object_key, detail_path)
# --- 抓取价格时段信息 ---
# 【优化】梯级识别策略:优先模板匹配 (qbsd.jpg),失败则降级为 VL 识别
template_qbsd = os.path.join(os.path.dirname(__file__), "Template", "qbsd.jpg")
# 【优化】使用 OCR 识别“全部时段”按钮,替代之前的模板匹配 (qbsd.jpg)
# 记录点击前的页面特征,用于验证是否成功进入三级页面
before_click_md5 = Kit.get_image_content_md5(detail_path)
@@ -289,28 +288,20 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
click_pos = None
try:
# 1. 尝试模板匹配
match_res = d.image.match(template_qbsd)
if match_res:
# 兼容 uiautomator2 不同版本的返回结果 (Match对象或dict)
if hasattr(match_res, 'point') and match_res.point:
click_pos = match_res.point
elif isinstance(match_res, dict):
if 'point' in match_res and match_res['point']:
click_pos = match_res['point']
elif 'x' in match_res and 'y' in match_res:
click_pos = (match_res['x'], match_res['y'])
elif isinstance(match_res, (list, tuple)) and len(match_res) >= 2:
click_pos = (match_res[0], match_res[1])
if click_pos:
logger.info(f"通过 qbsd.jpg 成功找到!坐标: {click_pos}")
else:
logger.warning(f"模板匹配成功但无法解析坐标: {match_res}")
# 1. 尝试 OCR 识别
logger.info("正在使用 OCR 识别 '全部时段' 按钮...")
ocr_res = await ReadImageKit.find_price_entrance_ocr(detail_path)
# 如果模板匹配未成功解析坐标,或者匹配直接失败,则降级到 VL
if ocr_res.get("found"):
p = ocr_res["point"]
# 归一化坐标转像素坐标
w, h = d.window_size()
click_pos = (int(p[0] * w / 1000), int(p[1] * h / 1000))
logger.info(f"OCR 成功找到 '全部时段' 按钮: {click_pos}")
# 2. 如果 OCR 未找到,则降级到 VL
if not click_pos:
logger.info("模板匹配未找到或解析失败,降级使用 VL 识别...")
logger.info("OCR 未找到,降级使用 VL 识别...")
res = await ReadImageKit.find_all_time_button_coordinate(detail_path, device_info=device_info)
if res.get("uia_center_x"):
click_pos = (res.get("uia_center_x"), res.get("uia_center_y"))
@@ -327,13 +318,50 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
if before_click_md5 != after_click_md5:
entered_price_page = True
price_screenshots = []
for p_idx in range(1, 4):
# 1. 向下滚动到底 (根据用户反馈只有不断向下滚动才能看到00点的)
logger.info("正在向下滚动价格列表到底部...")
max_scroll_down = 6
for i in range(max_scroll_down):
before_scroll_path = take_screenshot(d, f"scroll_dn_{i}", save_dir=TEMP_IMAGE_DIR)
before_scroll_md5 = Kit.get_image_content_md5(before_scroll_path)
d.swipe_ext("up", scale=0.8)
await asyncio.sleep(1.2)
after_scroll_path = take_screenshot(d, f"scroll_dn_after_{i}", save_dir=TEMP_IMAGE_DIR)
after_scroll_md5 = Kit.get_image_content_md5(after_scroll_path)
# 清理临时截图
if os.path.exists(before_scroll_path): os.remove(before_scroll_path)
if os.path.exists(after_scroll_path): os.remove(after_scroll_path)
if before_scroll_md5 == after_scroll_md5:
logger.info(f"价格列表已到达底部 (滚动次数: {i})")
break
# 2. 向上滚动并逐页截图 (从底向上抓取)
logger.info("正在向上滚动价格列表并逐页截图...")
max_scroll_up = 8
for p_idx in range(1, max_scroll_up + 1):
# 截图当前页
p_uuid = f"{hashlib.md5(station_name.encode('utf-8')).hexdigest()}_p_{p_idx}"
p_path = take_screenshot(d, p_uuid, save_dir=TEMP_IMAGE_DIR)
price_screenshots.append(p_path)
if p_idx < 3:
d.swipe_ext("up", scale=0.7)
await asyncio.sleep(1.0)
# 检查是否还能向上滚动
before_up_md5 = Kit.get_image_content_md5(p_path)
d.swipe_ext("down", scale=0.85) # 向上滚动 = 手势向下
await asyncio.sleep(1.5)
# 检查是否还有新内容
check_up_path = take_screenshot(d, f"check_up_{p_idx}", save_dir=TEMP_IMAGE_DIR)
after_up_md5 = Kit.get_image_content_md5(check_up_path)
if os.path.exists(check_up_path): os.remove(check_up_path)
if before_up_md5 == after_up_md5:
logger.info(f"价格列表已到达顶部 (共抓取页数: {p_idx})")
break
# 【异步优化】后台处理价格
logger.info(f"已启动后台分析价格页: {station_name}")
@@ -367,12 +395,13 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
logger.warning(f"返回后状态未知 (ListMD5:{check_list_md5[:8]} vs {current_md5[:8]}, DetailMD5:{check_detail_md5[:8]} vs {before_click_md5[:8]})。")
logger.info("再次尝试识别按钮以确认当前页面状态...")
# 优先尝试模板匹配确认
if d.image.match(template_qbsd):
logger.info("模板匹配确认当前仍处于详情页,执行返回。")
# 优先尝试 OCR 确认
ocr_check = await ReadImageKit.find_price_entrance_ocr(check_back_path)
if ocr_check.get("found"):
logger.info("OCR 确认当前仍处于详情页,执行返回。")
should_back_to_list = True
else:
# 模板匹配未发现,则用 VL 兜底确认
# OCR 未发现,则用 VL 兜底确认
check_res = await ReadImageKit.find_all_time_button_coordinate(check_back_path, device_info=device_info)
if check_res and check_res.get("uia_center_x"):
logger.info("VL 确认当前仍处于详情页,执行返回。")

View File

@@ -137,6 +137,43 @@ def is_background_dimmed(image_path, threshold=80):
return avg_brightness < threshold
from Util.EasyOcrKit import get_easyocr_reader
# 预加载 EasyOCR Reader (单例模式)
def get_ocr_reader():
return get_easyocr_reader(gpu=True)
def detect_price_info_container_cv(image_path):
"""
使用 OCR 精准定位详情页中的价格入口文本(“全部时段”)。
返回: [x1, y1, x2, y2] 归一化坐标 (0-1000),如果未找到则返回 None
"""
img = read_image(image_path)
if img is None:
return None
h, w = img.shape[:2]
keywords = ['全部时段']
try:
reader = get_ocr_reader()
# 获取所有识别结果
results = reader.read_text(img)
for (quad, text, prob) in results:
# 检查是否包含关键字
if any(kw in text for kw in keywords) and prob >= 0.5:
# 使用封装后的方法计算归一化矩形
res = reader.get_normalized_rect(quad, w, h)
logger.info(f"[OCR识别] 找到文本: '{text}', 置信度: {prob:.4f}, 归一化坐标: {res}")
return res
except Exception as e:
logger.error(f"OCR 识别发生异常: {e}")
return None
def detect_rabbit_ad_close(image_path, debug_dir=None):
"""
通过图形学算法检测“新电兔AI”广告的关闭按钮

View File

@@ -9,7 +9,7 @@ import logging
import base64
from openai import AsyncOpenAI, BadRequestError
from Apps.XinDianTu.Kit import (
read_image, save_image, detect_rabbit_ad_close
read_image, save_image, detect_rabbit_ad_close, detect_price_info_container_cv
)
from Config.Config import (
ALY_LLM_API_KEY, VL_MODEL_NAME, VL_MODEL_NAME_AD, TEMP_IMAGE_DIR
@@ -77,6 +77,28 @@ class ReadImageKit:
"严格返回纯JSON格式。"
)
@classmethod
async def find_price_entrance_ocr(cls, image_path):
"""
使用 OCR 在详情页寻找价格入口 (全部时段 >)
"""
container_norm = detect_price_info_container_cv(image_path)
if container_norm:
# 找到矩形框 [x1, y1, x2, y2]
x1, y1, x2, y2 = container_norm
# 计算中心点
center_x = (x1 + x2) // 2
center_y = (y1 + y2) // 2
return {
"found": True,
"reason": "OCR 精准定位到价格入口关键字 '全部时段'",
"point": [center_x, center_y]
}
logger.warning(f"⚠️ [OCR识别失败] 在图片中未发现价格入口关键字 (全部时段): {image_path}")
return {"found": False}
@classmethod
async def detect_ad_popup(cls, image_path: str, device_info=None):
"""

View File

View File

@@ -71,6 +71,7 @@ class YiLaiTeCrawler(BaseCrawler):
await self.redis_kit.delete(*keys)
async def crawl_list_logic(self, d):
w, h = d.window_size()
max_to_crawl = 1 if FIRST_RUN_ONLY_ONE_STATION else MAX_STATIONS_COUNT
processed_count = 0
no_new_data_count = 0
@@ -163,35 +164,62 @@ class YiLaiTeCrawler(BaseCrawler):
detail_shots = []
if dqdf_pos:
logger.info(f"发现‘阶段性电价’按钮 {dqdf_pos},点击进入...")
logger.info(f"发现价格入口按钮 (阶段性电价/当前电费) {dqdf_pos},点击进入...")
d.click(dqdf_pos[0], dqdf_pos[1])
await asyncio.sleep(2) # 等待列表加载
# 删除旧的详情页截图
if os.path.exists(detail_shot): os.remove(detail_shot)
# 截图1 (顶部)
shot1 = take_screenshot(d, f"detail_price_{clean_station_name(name)}_{int(time.time())}_1")
detail_shots.append(shot1)
# 1. 向下滚动到底 (根据用户反馈只有不断向下滚动才能看到00点的)
logger.info("正在向下滚动价格列表到底部...")
max_scroll_down = 6
for i in range(max_scroll_down):
before_scroll_path = take_screenshot(d, f"scroll_dn_{i}")
before_scroll_md5 = get_image_content_md5(before_scroll_path)
d.swipe_ext("up", scale=0.8) # 向下滚动 = 手势向上
await asyncio.sleep(1.2)
after_scroll_path = take_screenshot(d, f"scroll_dn_after_{i}")
after_scroll_md5 = get_image_content_md5(after_scroll_path)
# 清理临时截图
if os.path.exists(before_scroll_path): os.remove(before_scroll_path)
if os.path.exists(after_scroll_path): os.remove(after_scroll_path)
if before_scroll_md5 == after_scroll_md5:
logger.info(f"价格列表已到达底部 (滚动次数: {i})")
break
# 向下滚动以获取完整价格表
logger.info("向下滚动以获取更多电价信息(从底部向上滑)...")
# User feedback: 必须放在下方,向上滚动
w, h = d.window_size()
# 调整滑动参数:避开底部导航栏,增加滑动距离,放慢速度
# start_y=0.75, end_y=0.25
d.swipe(w * 0.5, h * 0.75, w * 0.5, h * 0.25, 0.8)
await asyncio.sleep(2.0) # 增加等待时间,确保滚动停止
# 截图2 (底部)
shot2 = take_screenshot(d, f"detail_price_{clean_station_name(name)}_{int(time.time())}_2")
detail_shots.append(shot2)
# 2. 向上滚动并逐页截图 (从底向上抓取)
logger.info("正在向上滚动价格列表并逐页截图...")
max_scroll_up = 8
for p_idx in range(1, max_scroll_up + 1):
# 截图当前页
p_shot = take_screenshot(d, f"detail_price_{clean_station_name(name)}_{int(time.time())}_{p_idx}")
# 检查是否还能向上滚动
before_up_md5 = get_image_content_md5(p_shot)
d.swipe_ext("down", scale=0.85) # 向上滚动 = 手势向下
await asyncio.sleep(1.5)
# 检查是否还有新内容
check_up_shot = take_screenshot(d, f"check_up_{p_idx}")
after_up_md5 = get_image_content_md5(check_up_shot)
if os.path.exists(check_up_shot): os.remove(check_up_shot)
detail_shots.append(p_shot)
if before_up_md5 == after_up_md5:
logger.info(f"价格列表已到达顶部 (共抓取页数: {p_idx})")
break
# 关闭分时段定价列表 (点击屏幕最顶部空白处)
logger.info("点击屏幕上部空白处以关闭定价列表...")
d.click(w * 0.5, h * 0.1)
await asyncio.sleep(1.0)
else:
logger.info("未发现‘阶段性电价’按钮,直接分析当前页")
logger.info("未发现价格入口按钮 (阶段性电价/当前电费),直接分析当前页")
detail_shots.append(detail_shot)
# --------------------------------------------------

View File

@@ -286,7 +286,7 @@ def detect_price_info_container_cv(image_path):
return None
h, w = img.shape[:2]
keywords = ['阶段性电价']
keywords = ['阶段性电价', '当前电费']
try:
reader = get_ocr_reader()
@@ -300,8 +300,8 @@ def detect_price_info_container_cv(image_path):
x_max = np.max(pts[:, 0])
y_max = np.max(pts[:, 1])
center_x = (x_min + x_max) // 2
center_y = (y_min + y_max) // 2
center_x = int((x_min + x_max) // 2)
center_y = int((y_min + y_max) // 2)
logger.info(f"[OCR识别] 找到文本: '{text}', 置信度: {prob:.4f}, 中心坐标: ({center_x}, {center_y})")
return (center_x, center_y)
except Exception as e: