This commit is contained in:
HuangHai
2026-01-14 09:56:04 +08:00
parent eed2b6f152
commit ee91bf76d2
14 changed files with 233 additions and 67 deletions

View File

@@ -10,8 +10,8 @@ DEBUG_BOX_COLOR = (0, 255, 0)
DEBUG_BOX_THICKNESS = 3
# 等待时间配置 (秒)
WAIT_DETAIL_PAGE_LOAD = 2.0
WAIT_BACK_TO_LIST = 1.0
WAIT_DETAIL_PAGE_LOAD = 3.5
WAIT_BACK_TO_LIST = 1.5
WAIT_AFTER_SCROLL = 2.5
# 坐标计算与安全防护

View File

@@ -1,10 +1,14 @@
# coding=utf-8
import asyncio
import logging
import asyncio
import os
import sys
import time
from Apps.TeLaiDian.Kit import take_screenshot, get_image_content_md5, clean_station_name, setup_logger
import cv2
from Apps.TeLaiDian.Kit import (
take_screenshot, get_image_content_md5, clean_station_name,
setup_logger, detect_price_click_point_cv, read_image, save_image
)
from Apps.TeLaiDian.ReadImageKit import ReadImageKit
from Apps.TeLaiDian.Service import TeLaiDianService
from Apps.TeLaiDian.Config.Setting import (
@@ -120,37 +124,92 @@ class TeLaiDianCrawler(BaseCrawler):
d.swipe_ext("up", scale=DETAIL_SCROLL_DISTANCE_RATIO)
await asyncio.sleep(1.5)
# 3. 点击“价格信息”按钮 (jgxx.jpg)
template_path = os.path.join(project_root, "Apps", "TeLaiDian", "Template", "jgxx.jpg")
logger.info(f"尝试点击价格详情按钮: {template_path}")
# 3. 点击“价格信息”区域 (识别橘红色价格 P0)
price_button_screen = take_screenshot(d, f"tld_before_price_click_{int(time.time())}.jpg")
logger.info("正在通过 CV 寻找橘红色价格区域 (P0)...")
click_point = detect_price_click_point_cv(price_button_screen)
# 调试:生成点击点标注图
if click_point:
debug_flag_path = price_button_screen.replace(".jpg", "_click_debug.jpg")
img_debug = read_image(price_button_screen)
if img_debug is not None:
cv2.circle(img_debug, (click_point[0], click_point[1]), 20, (0, 0, 255), -1) # 红色大圆点
cv2.line(img_debug, (click_point[0]-40, click_point[1]), (click_point[0]+40, click_point[1]), (255, 255, 255), 3)
cv2.line(img_debug, (click_point[0], click_point[1]-40), (click_point[0], click_point[1]+40), (255, 255, 255), 3)
save_image(debug_flag_path, img_debug)
logger.info(f"点击点调试图已保存: {debug_flag_path}")
try:
# 使用 uiautomator2 的图像识别点击
match = d.image.match(template_path)
if match:
logger.info(f"找到价格按钮,坐标: {match['point']}")
d.image.click(template_path)
if click_point:
logger.info(f"CV 成功定位价格区域,点击坐标: {click_point}")
d.click(click_point[0], click_point[1])
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
else:
logger.warning("未找到价格按钮模板,尝试备选方案:直接点击屏幕下方区域")
# 备选方案:如果模板匹配失败,尝试点击屏幕中下方
w, h = d.window_size()
d.click(w // 2, int(h * 0.8))
logger.warning("CV 未能定位价格区域,尝试模板匹配兜底...")
template_path = os.path.join(project_root, "Apps", "TeLaiDian", "Template", "jgxx.jpg")
match = d.image.match(template_path)
if match:
d.image.click(template_path)
else:
logger.warning("模板匹配也失败,执行坐标兜底...")
w, h = d.window_size()
d.click(w // 2, int(h * 0.45)) # 滑动后价格通常在屏幕中上部
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
except Exception as e:
logger.error(f"点击价格按钮失败: {e}")
logger.error(f"点击价格区域失败: {e}")
finally:
if os.path.exists(price_button_screen): os.remove(price_button_screen)
# 4. 截图并分析价格表
price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}.jpg")
prices = await self.read_image_kit.analyze_detail_price(price_screen_path)
# 4. 循环滑动抓取完整分时电价
all_prices = []
last_price_md5 = None
price_page_count = 0
max_price_pages = 3 # 分时电价通常不会超过3页
logger.info("开始循环滑动抓取完整分时电价...")
while price_page_count < max_price_pages:
price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}.jpg")
# 校验页面是否发生滚动变化
curr_md5 = get_image_content_md5(price_screen_path, top_ratio=0.2, bottom_ratio=0.2)
if curr_md5 == last_price_md5:
logger.info("价格页面内容无变化,判定已触底")
if os.path.exists(price_screen_path): os.remove(price_screen_path)
break
last_price_md5 = curr_md5
logger.info(f"正在分析价格详情页第 {price_page_count + 1} 页: {price_screen_path}")
page_prices = await self.read_image_kit.analyze_detail_price(price_screen_path)
if page_prices:
# 简单去重:根据时段合并
for p in page_prices:
if p not in all_prices:
all_prices.append(p)
# 向上滑动一点点,继续抓取
d.swipe_ext("up", scale=0.6)
await asyncio.sleep(1.5)
price_page_count += 1
# 清理临时截图
if os.path.exists(price_screen_path): os.remove(price_screen_path)
# 5. 保存数据
if prices:
if all_prices:
station_name_clean = clean_station_name(station_name)
logger.info(f"场站 {station_name_clean} 提取到 {len(prices)} 条价格信息,准备保存...")
await self.service.save_station_data(station_name_clean, address, prices)
# 对价格按时间排序
try:
all_prices.sort(key=lambda x: x.get('start', '00:00'))
except:
pass
logger.info(f"✅ 场站 {station_name_clean} 共提取到 {len(all_prices)} 条价格信息,准备保存...")
await self.service.save_station_data(station_name_clean, address, all_prices)
else:
logger.warning(f"未能{price_screen_path} 提取到价格信息")
logger.warning(f"未能提取到任何价格信息,请检查页面识别逻辑")
# 清理临时截图
for p in [first_screen_path, price_screen_path]:

View File

@@ -93,6 +93,64 @@ def save_image(path, img):
logger.error(f"Error saving image {path}: {e}")
return False
def detect_price_click_point_cv(image_path):
    """
    Locate the orange-red price area on a detail-page screenshot.

    Two passes are tried in order:

    1. An HSV colour filter (hue 0-20, saturation/value >= 150) whose mask
       is dilated with a 50x20 rectangle so neighbouring digits merge into
       a single blob.
    2. Only if pass 1 yields nothing: an adaptive threshold on the
       grayscale image followed by a morphological close, keeping blobs
       whose size falls in a fixed pixel window.

    Among the surviving bounding boxes the one with the smallest x is
    chosen (the first such box wins on ties).

    :param image_path: path handed to ``read_image``.
        NOTE(review): the colour conversions assume ``read_image`` yields
        a BGR array - confirm against its implementation.
    :return: ``[center_x, center_y]`` of the chosen box, or ``None`` when
        the image cannot be read or no box passes the filters.
    """
    image = read_image(image_path)
    if image is None:
        return None

    img_h, img_w = image.shape[:2]

    # Pass 1: isolate orange-red pixels in HSV space.
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    orange_mask = cv2.inRange(
        hsv_image,
        np.array([0, 150, 150]),
        np.array([20, 255, 255]),
    )

    # Dilate so that separate digits of one price join into one contour.
    merge_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (50, 20))
    merged_mask = cv2.dilate(orange_mask, merge_kernel)
    outlines, _ = cv2.findContours(
        merged_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )

    candidates = []
    for outline in outlines:
        left, top, box_w, box_h = cv2.boundingRect(outline)
        # Keep boxes below y=200 and above 80% of the height, wider than
        # 100 px but narrower than 60% of the screen, taller than 30 px.
        in_vertical_band = 200 < top < img_h * 0.8
        width_ok = 100 < box_w < img_w * 0.6
        if in_vertical_band and width_ok and box_h > 30:
            candidates.append([left, top, left + box_w, top + box_h])

    if not candidates:
        # Pass 2: colour filter found nothing, fall back to shape-based
        # detection on a binarised grayscale image.
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        binary = cv2.adaptiveThreshold(
            gray_image,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            11,
            2,
        )
        close_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (30, 10))
        closed_mask = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, close_kernel)
        outlines, _ = cv2.findContours(
            closed_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        for outline in outlines:
            left, top, box_w, box_h = cv2.boundingRect(outline)
            in_vertical_band = 200 < top < img_h * 0.6
            if in_vertical_band and 150 < box_w < 300 and 50 < box_h < 150:
                candidates.append([left, top, left + box_w, top + box_h])

        if not candidates:
            return None

    # Leftmost box wins; min() returns the first minimal element, which
    # matches taking index 0 after a stable sort on the same key.
    x1, y1, x2, y2 = min(candidates, key=lambda box: box[0])
    return [(x1 + x2) // 2, (y1 + y2) // 2]
def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
"""
使用计算机图形学 (OpenCV) 检测列表中的场站卡片。

View File

@@ -22,27 +22,43 @@ class ReadImageKit:
async def analyze_detail_price(self, image_path):
"""
分析详情页截图,提取电价信息
分析详情页截图,提取电价信息包括优惠价、PLUS价和挂牌价
"""
prompt = """
分析这张充电站详情页截图,提取**电价时段表**。
请仔细寻找包含“时段”、“电价”、“服务费”或“总价”的表格或列表。
分析这张充电站价格详情页截图,提取**分时电价表**。
对于每个时段,请识别并提取以下所有价格信息(如果存在):
1. 优惠价 (通常是红色或加粗的大字,作为默认 price)
2. PLUS会员价 (标有 "PLUS" 标签的价格)
3. 挂牌价 (标有 "挂牌价" 标签的价格)
4. 电费 (Base electricity price)
5. 服务费 (Service fee)
请提取每个时段的:
1. 开始时间 (HH:MM)
2. 结束时间 (HH:MM)
3. 总电价 (元/度,包含电费和服务费)
- start: 开始时间 (HH:MM)
- end: 结束时间 (HH:MM)
- price: 优惠价 (元/度)
- plus_price: PLUS会员价 (元/度)
- market_price: 挂牌价 (元/度)
- elec_price: 电费 (元/度)
- service_price: 服务费 (元/度)
输出格式为 JSON 数组:
[
{
"start": "00:00",
"end": "08:00",
"price": 1.23
"start": "16:00",
"end": "21:00",
"price": 1.3435,
"plus_price": 1.3035,
"market_price": 1.4435,
"elec_price": 0.9435,
"service_price": 0.4000
},
...
]
如果无法识别任何价格信息,请返回空数组 []。
注意:
- 如果某个字段缺失,请设为 null。
- 确保 price 包含电费和服务费的总和。
- 如果无法识别任何价格信息,请返回空数组 []。
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)

View File

@@ -52,18 +52,38 @@ class TeLaiDianService:
now = datetime.now()
# 将价格转换为 24 小时的 schedule 格式 (0-23)
hourly_schedule = [0.0] * 24
# 每个小时存储一个包含多重价格的字典以记录优惠价、PLUS价和挂牌价
hourly_schedule = [None] * 24
for p in prices:
try:
start_hour = int(p['start'].split(':')[0])
end_hour = int(p['end'].split(':')[0])
price = float(p['price'])
start_parts = p['start'].split(':')
end_parts = p['end'].split(':')
start_hour = int(start_parts[0])
end_hour = int(end_parts[0])
# 处理跨天的情况(如 23:00 - 01:00
# 处理 00:00 作为结束时间的情况 (表示 24:00)
if end_hour == 0 and (int(end_parts[1]) == 0 if len(end_parts) > 1 else True):
if start_hour != 0:
end_hour = 24
# 提取各项价格
price_data = {
"price": float(p.get('price')) if p.get('price') is not None else 0.0,
"plus_price": float(p.get('plus_price')) if p.get('plus_price') is not None else None,
"market_price": float(p.get('market_price')) if p.get('market_price') is not None else None,
"elec_price": float(p.get('elec_price')) if p.get('elec_price') is not None else None,
"service_price": float(p.get('service_price')) if p.get('service_price') is not None else None
}
# 填充对应的小时槽位
curr = start_hour
while curr != end_hour:
hourly_schedule[curr] = price
curr = (curr + 1) % 24
# 如果是跨天的,比如 23:00 - 01:00
if end_hour < start_hour:
end_hour += 24
while curr < end_hour:
hourly_schedule[curr % 24] = price_data
curr += 1
except Exception as e:
logger.error(f"解析价格时段失败: {p}, error: {e}")
@@ -80,7 +100,7 @@ class TeLaiDianService:
valid_start_time=now
)
# 2. 保存价格
# 2. 保存价格计划
schedule_id = self.generate_id()
await self.station_price_schedule_model.save(
session=session,
@@ -89,6 +109,24 @@ class TeLaiDianService:
schedule_json=hourly_schedule,
valid_start_time=now
)
# 3. 保存当前状态快照 (包含当前小时的价格)
current_hour = now.hour
current_price_info = hourly_schedule[current_hour] or {}
status_id = self.generate_id()
await self.station_status_model.save(
session=session,
id=status_id,
station_hash=station_hash,
total_piles=None, # 特来电暂时没抓取总桩数
free_piles=None,
piles_detail_json=None,
current_price=current_price_info.get('price'),
pro_price=current_price_info.get('plus_price'),
market_price=current_price_info.get('market_price'),
valid_start_time=now
)
await session.commit()
logger.info(f"成功保存场站数据: {station_name}")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.7 KiB

After

Width:  |  Height:  |  Size: 13 KiB

View File

@@ -6,7 +6,7 @@ class StationStatus:
def __init__(self):
pass
async def save(self, session, id, station_hash, total_piles, free_piles, piles_detail_json, current_price, pro_price=None, parking_info=None, distance=None, valid_start_time=None):
async def save(self, session, id, station_hash, total_piles, free_piles, piles_detail_json, current_price, pro_price=None, market_price=None, parking_info=None, distance=None, valid_start_time=None):
if valid_start_time is None:
valid_start_time = datetime.now()
@@ -20,7 +20,7 @@ class StationStatus:
# 1. Check current record
select_sql = """
SELECT total_piles, free_piles, piles_detail_json, current_price, pro_price, parking_info, distance
SELECT total_piles, free_piles, piles_detail_json, current_price, pro_price, market_price, parking_info, distance
FROM t_station_status_scd
WHERE station_hash = :station_hash AND is_current = 1
"""
@@ -29,42 +29,35 @@ class StationStatus:
current_row = result.fetchone()
except Exception as e:
# Check if it's a "column not found" error
if "Unknown column 'parking_info'" in str(e) or "no such column: parking_info" in str(e):
# Handle schema evolution if needed, or just proceed assuming None for parking_info comparison
if "Unknown column 'market_price'" in str(e) or "no such column: market_price" in str(e):
current_row = None
# Or re-raise if we want to fail hard, but let's try to be robust
# For now, if column missing, we might fail on INSERT later anyway.
# So re-raising or logging might be better.
# But since I cannot easily alter table here, I will proceed with code update
# and assume user/I run the ALTER script.
else:
raise e
raise e
if current_row:
# Check if changed
# Note: current_row values might be Decimal, need to convert for comparison
row_total = current_row.total_piles
row_free = current_row.free_piles
row_json = current_row.piles_detail_json
row_price = current_row.current_price
row_pro_price = current_row.pro_price
row_parking = getattr(current_row, 'parking_info', None) # Safely get if column exists
row_market_price = getattr(current_row, 'market_price', None)
row_parking = getattr(current_row, 'parking_info', None)
row_distance = getattr(current_row, 'distance', None)
# Normalize row_json for comparison (handle key order differences)
# Normalize row_json for comparison
if row_json:
try:
if isinstance(row_json, str):
row_json_obj = json.loads(row_json)
row_json = json.dumps(row_json_obj, ensure_ascii=False, sort_keys=True)
except Exception:
pass # Keep original if parse fails
pass
# Convert price to float if it is Decimal, for comparison
if row_price is not None:
row_price = float(row_price)
if row_pro_price is not None:
row_pro_price = float(row_pro_price)
# Convert prices for comparison
if row_price is not None: row_price = float(row_price)
if row_pro_price is not None: row_pro_price = float(row_pro_price)
if row_market_price is not None: row_market_price = float(row_market_price)
# Simple comparison
is_same = (
@@ -72,13 +65,13 @@ class StationStatus:
row_free == free_piles and
row_price == current_price and
row_pro_price == pro_price and
row_market_price == market_price and
row_json == piles_json_str and
row_parking == parking_info and
row_distance == distance
)
if is_same:
# No change, skip insert
return
# Expire old record
@@ -95,9 +88,9 @@ class StationStatus:
# 2. Insert new record
sql = """
INSERT INTO t_station_status_scd
(id, station_hash, total_piles, free_piles, piles_detail_json, current_price, pro_price, parking_info, distance, valid_start_time, is_current)
(id, station_hash, total_piles, free_piles, piles_detail_json, current_price, pro_price, market_price, parking_info, distance, valid_start_time, is_current)
VALUES
(:id, :station_hash, :total_piles, :free_piles, :piles_detail_json, :current_price, :pro_price, :parking_info, :distance, :valid_start_time, 1)
(:id, :station_hash, :total_piles, :free_piles, :piles_detail_json, :current_price, :pro_price, :market_price, :parking_info, :distance, :valid_start_time, 1)
"""
await session.execute(text(sql), {
"id": id,
@@ -107,6 +100,7 @@ class StationStatus:
"piles_detail_json": piles_json_str,
"current_price": current_price,
"pro_price": pro_price,
"market_price": market_price,
"parking_info": parking_info,
"distance": distance,
"valid_start_time": valid_start_time

View File

@@ -49,8 +49,9 @@ CREATE TABLE IF NOT EXISTS t_station_status_scd (
`total_piles` INT COMMENT '总桩数',
`free_piles` INT COMMENT '空闲桩数',
`piles_detail_json` TEXT COMMENT '详细桩信息 (JSON格式)',
`current_price` DECIMAL(10, 4) COMMENT '当前价格快照',
`pro_price` DECIMAL(10, 4) COMMENT 'PRO会员专享价快照',
`current_price` DECIMAL(10, 4) COMMENT '当前价格快照 (优惠价)',
`pro_price` DECIMAL(10, 4) COMMENT 'PRO/PLUS会员专享价快照',
`market_price` DECIMAL(10, 4) COMMENT '挂牌价快照',
`parking_info` VARCHAR(500) COMMENT '停车收费信息',
`distance` VARCHAR(50) COMMENT '距离信息 (例如 5.3km)',
`valid_end_time` DATETIME NOT NULL DEFAULT '9999-12-31 23:59:59' COMMENT '记录生效结束时间',