445 lines
19 KiB
Python
445 lines
19 KiB
Python
# coding=utf-8
|
||
import logging
|
||
import os
|
||
import sys
|
||
import json
|
||
|
||
# Make the project root importable: it is the directory two levels above the
# folder that contains this file. Appended only if it is not already listed.
project_root = os.path.abspath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, os.pardir)
)
if project_root not in sys.path:
    sys.path.append(project_root)
|
||
|
||
from Util.VLMKit import VLMKit
|
||
from Apps.TeLaiDian.Kit import draw_rectangles, setup_logger, read_image
|
||
from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO, MIN_CARD_HEIGHT
|
||
|
||
# 初始化日志
|
||
logger = setup_logger("ReadImageKit")
|
||
|
||
class ReadImageKit:
|
||
def __init__(self):
    """
    Create the kit and the VLM client (``self.vlm``) that the analysis methods call.
    """
    self.vlm = VLMKit()
|
||
|
||
async def find_price_tab_vlm(self, image_path):
    """
    Ask the VLM where the "价格" (price) tab sits in the top tab bar of a detail page.

    The screenshot at ``image_path`` is sent with a prompt requesting a JSON
    object holding ``found``, ``reason`` and ``point`` (0-1000 normalized).
    The decoded object is returned unchanged.

    When a point is reported and the screenshot can be read, the point is
    scaled to pixels and handed to ``draw_rectangles`` together with a
    120x60 px box around it, as a diagnostic aid.

    Any exception (VLM call, JSON decoding, drawing) is logged and
    ``{"found": False}`` is returned instead.
    """
    prompt = """
    分析这张特来电充电站详情页截图,找到顶部标签栏中“价格”两个字所在的点击区域中心。
    要求:
    1. 仅在页面最上方的标签栏里查找,该标签栏通常包含“价格 / 终端 / 电站 / 评论 / 周边”等文字。
    2. 不要选择下面“价格信息”模块中的数字(例如 1.0689 元/度)或其它文本。
    3. 不要选择最顶端系统状态栏或返回按钮等区域。

    输出格式为 JSON:
    {
    "found": true/false,
    "reason": "为什么认为这个位置是顶部“价格”标签",
    "point": [x, y] // 归一化坐标,范围 [0-1000]
    }
    """
    try:
        reply = await self.vlm.analyze_image(image_path, prompt)
        data = json.loads(self.vlm.extract_json(reply))

        # Annotate only when the model both claims a hit and supplies a point.
        point = data.get("point") if data.get("found") else None
        screenshot = read_image(image_path) if point else None
        if screenshot is not None:
            height, width = screenshot.shape[:2]
            # The prompt asks for 0-1000 normalized coordinates; scale to pixels.
            cx = int(point[0] * width / 1000)
            cy = int(point[1] * height / 1000)
            draw_rectangles(
                image_path,
                bboxes=[[cx - 60, cy - 30, cx + 60, cy + 30]],
                click_points=[[cx, cy]],
            )
            logger.info(f"已生成价格标签诊断图片: {image_path.replace('.jpg', '_tab_vl.jpg')}")

        return data
    except Exception as e:
        logger.error(f"VLM 寻找价格标签失败: {e}")
        return {"found": False}
|
||
|
||
async def find_price_entrance_vlm(self, image_path):
    """
    Ask the VLM for the tap target that opens the time-of-use price details.

    The prompt directs the model to the red price figure under "当前价"
    inside the "价格信息" module and away from the top tab bar, the bottom
    floating bar and parking prices. The expected reply is a JSON object
    with ``found``, ``reason``, ``point`` and ``type``; it is returned
    unchanged.

    ``point`` is scaled below as a 0-1000 normalized coordinate, so the
    prompt states that convention explicitly, matching the sibling prompts
    in this class. Previously the prompt left the coordinate system
    unspecified.

    When a point is reported and the screenshot can be read, the pixel
    position and a 120x80 px box around it are handed to
    ``draw_rectangles`` as a diagnostic aid.

    Any exception (VLM call, JSON decoding, drawing) is logged and
    ``{"found": False}`` is returned instead.
    """
    prompt = """
    分析这张特来电充电站详情页截图,找到进入“分时电价详情”的点击入口。
    入口规则:
    1. 只选择“价格信息”模块中“当前价”下方的红色电价数字(例如 1.0689 元/度、1.3435 元/度)。
    2. 排除底部悬浮条或底部操作区中的红色价格(靠近“扫码充电”“立即充电”等按钮的区域)。
    3. 排除“停车参考价”“停车费参考价”等与停车相关的区域。
    4. 禁止选择页面顶部的标签栏,例如“价格 / 终端 / 电站 / 评论 / 周边”这一行中的任何文字或区域。
    5. 如果页面没有“当前价”,才选择用于展示充电价格的按钮,如“价格信息”“电价详情”。

    位置约束(尽量满足):
    - Y 位置位于价格信息模块区域内:明显在顶部标签栏下方、在底部悬浮条上方。
    - X 位置应位于左侧价格列区域(当前价所在列),避免会员价右侧列。

    请判断符合上述规则的价格入口是否存在,并给出其中心坐标。
    输出格式为 JSON:
    {
    "found": true/false,
    "reason": "为什么认为这是入口(说明是否基于当前价红色价格,并确认未选顶部标签栏或底部悬浮条)",
    "point": [x, y], // 归一化坐标,范围 [0-1000]
    "type": "price_card" / "button"
    }
    """
    try:
        res_text = await self.vlm.analyze_image(image_path, prompt)
        json_str = self.vlm.extract_json(res_text)
        data = json.loads(json_str)

        # Save a diagnostic image only when the model reports a usable point.
        if data.get("found") and data.get("point"):
            p = data["point"]
            img = read_image(image_path)
            if img is not None:
                h, w = img.shape[:2]
                # Convert the 0-1000 normalized point to pixel coordinates.
                actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)]
                # Synthetic box around the point, used only for the overlay.
                bbox = [actual_p[0] - 60, actual_p[1] - 40, actual_p[0] + 60, actual_p[1] + 40]
                draw_rectangles(image_path, bboxes=[bbox], click_points=[actual_p])
                logger.info(f"已生成价格入口诊断图片: {image_path.replace('.jpg', '_vl.jpg')}")

        return data
    except Exception as e:
        logger.error(f"VLM 寻找价格入口失败: {e}")
        return {"found": False}
|
||
|
||
async def find_close_button_vlm(self, image_path):
    """
    Ask the VLM whether an ad popup / overlay is showing and where its close button is.

    The expected reply is a JSON object with ``has_ad``, ``reason`` and
    ``close_point`` (0-1000 normalized, or null); it is returned unchanged.
    The prompt forbids the button in the screen's top-right corner, which
    it describes as the WeChat mini-program capsule button.

    When a close point is reported and the screenshot can be read, the
    pixel position and a 100x60 px box around it are handed to
    ``draw_rectangles`` as a diagnostic aid.

    Any exception (VLM call, JSON decoding, drawing) is logged and
    ``{"has_ad": False}`` is returned instead.
    """
    prompt = """
    分析这张截图,判断是否存在弹窗广告或遮罩层。
    如果存在,请找到关闭按钮(通常是圆圈里的 X,或者写着“跳过”、“关闭”的按钮)。

    **重要警告**:
    1. 严禁选择屏幕最右上角的按钮(微信小程序的“胶囊按钮”,包含三个点和圆圈)。
    2. 广告关闭按钮通常在弹窗的边缘,或者是页面中心大弹窗的某个角落。

    输出格式为 JSON:
    {
    "has_ad": true/false,
    "reason": "简单描述发现的弹窗",
    "close_point": [x, y] // 归一化坐标 [0-1000],如果不存在则为 null
    }
    """
    try:
        answer = await self.vlm.analyze_image(image_path, prompt)
        data = json.loads(self.vlm.extract_json(answer))

        # Diagnostics are produced only for a positive detection with a point.
        close_point = data.get("close_point") if data.get("has_ad") else None
        if close_point:
            frame = read_image(image_path)
            if frame is not None:
                rows, cols = frame.shape[:2]
                # Scale the normalized 0-1000 point to pixel coordinates.
                x = int(close_point[0] * cols / 1000)
                y = int(close_point[1] * rows / 1000)
                # Synthetic box around the point, used only for the overlay.
                draw_rectangles(
                    image_path,
                    bboxes=[[x - 50, y - 30, x + 50, y + 30]],
                    click_points=[[x, y]],
                )
                logger.info(f"已生成广告关闭诊断图片: {image_path.replace('.jpg', '_vl.jpg')}")

        return data
    except Exception as e:
        logger.error(f"VLM 寻找关闭按钮失败: {e}")
        return {"has_ad": False}
|
||
|
||
async def check_warm_popup_vlm(self, image_path):
    """
    Detect the "温馨提示 / 下次再说" reminder popup on a detail page.

    The expected reply is a JSON object with ``has_popup``, ``reason`` and
    ``button_point`` (0-1000 normalized, or null); it is returned
    unchanged. The prompt also accepts the wording "下次现说" alongside
    "下次再说".

    When a button point is reported and the screenshot can be read, the
    pixel position and a 160x80 px box around it are handed to
    ``draw_rectangles`` as a diagnostic aid.

    Any exception (VLM call, JSON decoding, drawing) is logged and
    ``{"has_popup": False}`` is returned instead.
    """
    prompt = """
    分析这张特来电充电站详情页截图,判断是否存在带有“下次再说”或“下次现说”文案的温馨提示弹窗。
    要求:
    1. 只关注覆盖在详情页上方的弹窗或遮罩,其上包含“下次再说”“下次现说”等文字按钮。
    2. 不要将正常页面中的列表项、价格卡片、终端状态等区域误判为弹窗。
    3. 如果存在该弹窗,请给出“下次再说”按钮的大致点击中心位置。

    输出格式为 JSON:
    {
    "has_popup": true/false,
    "reason": "为什么认为有或没有温馨提示弹窗",
    "button_point": [x, y] // 归一化坐标,范围 [0-1000],没有则为 null
    }
    """
    try:
        raw = await self.vlm.analyze_image(image_path, prompt)
        data = json.loads(self.vlm.extract_json(raw))

        wants_overlay = data.get("has_popup") and data.get("button_point")
        if wants_overlay:
            target = data["button_point"]
            picture = read_image(image_path)
            if picture is not None:
                pic_h, pic_w = picture.shape[:2]
                # Scale the normalized 0-1000 point to pixel coordinates.
                px = int(target[0] * pic_w / 1000)
                py = int(target[1] * pic_h / 1000)
                outline = [px - 80, py - 40, px + 80, py + 40]
                draw_rectangles(image_path, bboxes=[outline], click_points=[[px, py]])
                logger.info(f"已生成温馨提示弹窗诊断图片: {image_path.replace('.jpg', '_warm_vl.jpg')}")

        return data
    except Exception as e:
        logger.error(f"VLM 检测温馨提示弹窗失败: {e}")
        return {"has_popup": False}
|
||
|
||
async def check_wrong_page_vlm(self, image_path):
    """
    Ask the VLM whether the screenshot is a real charging-station detail page.

    The prompt names marketing pages (newcomer benefits, activity rules,
    my coupons, coupon centre) as wrong pages. The expected reply is a
    JSON object with ``is_detail_page``, ``page_type`` and ``reason``;
    whatever JSON value the reply decodes to is returned unchanged.

    On any exception the error is logged and ``{"is_detail_page": True}``
    is returned.
    """
    prompt = """
    分析这张截图,判断这是否是一个真实的“充电站详情页”。

    **识别准则**:
    1. 真正的“详情页”必须包含:充电站的具体名称、电价列表、终端状态(空闲/占用)等信息。
    2. 如果页面标题是“新人福利专区”、“活动规则”、“我的卡券”、“领券中心”或类似的营销活动页,则判定为错误页面。

    输出格式为 JSON:
    {
    "is_detail_page": true/false,
    "page_type": "detail" / "marketing" / "coupons" / "other",
    "reason": "判断依据"
    }
    """
    try:
        reply = await self.vlm.analyze_image(image_path, prompt)
        return json.loads(self.vlm.extract_json(reply))
    except Exception as e:
        logger.error(f"VLM 检查页面类型失败: {e}")
        # Default to "is a detail page" so a failing check cannot cause an endless loop.
        return {"is_detail_page": True}
|
||
|
||
async def analyze_detail_price(self, image_path):
    """
    Extract the time-of-use price table from a price-detail screenshot.

    The VLM is asked for a JSON array with one object per time slot
    (``start``, ``end``, ``price``, ``plus_price``, ``market_price``,
    ``elec_price``, ``service_price``). Each entry is copied and then
    normalized:

    * ``start`` / ``end`` are taken from a ``time_range`` string such as
      ``"16:00-21:00"`` or ``"16:00~21:00"`` when either is absent or null;
    * ``price`` is taken from ``total_price`` or, failing that, from
      ``elec_price + service_price`` when it is absent or null.

    The prompt tells the model to write ``null`` for missing fields, so a
    null value is treated exactly like a missing key. Array items that are
    not JSON objects are skipped so one stray item cannot discard the rest
    of the table.

    Returns:
        list[dict]: the normalized entries, or ``[]`` when the reply is not
        a JSON array or the VLM call / decoding fails (the error is logged).
    """
    prompt = """
    分析这张充电站价格详情页截图,提取**分时电价表**。
    对于每个时段,请识别并提取以下所有价格信息(如果存在):
    1. 优惠价 (通常是红色或加粗的大字,作为默认 price)
    2. PLUS会员价 (标有 "PLUS" 标签的价格)
    3. 挂牌价 (标有 "挂牌价" 标签的价格)
    4. 电费 (Base electricity price)
    5. 服务费 (Service fee)

    请提取每个时段的:
    - start: 开始时间 (HH:MM)
    - end: 结束时间 (HH:MM)
    - price: 优惠价 (元/度)
    - plus_price: PLUS会员价 (元/度)
    - market_price: 挂牌价 (元/度)
    - elec_price: 电费 (元/度)
    - service_price: 服务费 (元/度)

    输出格式为 JSON 数组:
    [
    {
    "start": "16:00",
    "end": "21:00",
    "price": 1.3435,
    "plus_price": 1.3035,
    "market_price": 1.4435,
    "elec_price": 0.9435,
    "service_price": 0.4000
    },
    ...
    ]
    注意:
    - 如果某个字段缺失,请设为 null。
    - 确保 price 包含电费和服务费的总和。
    - 如果无法识别任何价格信息,请返回空数组 []。
    """
    try:
        res_text = await self.vlm.analyze_image(image_path, prompt)
        logger.info(f"VLM Price Analysis Result for {os.path.basename(image_path)}: {res_text[:200]}...")

        prices = json.loads(self.vlm.extract_json(res_text))
        if not isinstance(prices, list):
            return []

        normalized_prices = []
        for entry in prices:
            if not isinstance(entry, dict):
                # A stray string/number/null in the array must not abort the table.
                continue
            slot = entry.copy()

            # Recover start/end from a combined "HH:MM-HH:MM" / "HH:MM~HH:MM" field.
            time_range = entry.get('time_range')
            needs_times = slot.get('start') is None or slot.get('end') is None
            if needs_times and isinstance(time_range, str):
                parts = time_range.replace('~', '-').replace(' ', '').split('-')
                if len(parts) >= 2:
                    slot['start'] = parts[0]
                    slot['end'] = parts[1]

            # Fill a missing/null price from the total, else from its two components.
            if slot.get('price') is None:
                if entry.get('total_price') is not None:
                    slot['price'] = entry['total_price']
                elif entry.get('elec_price') is not None and entry.get('service_price') is not None:
                    try:
                        slot['price'] = float(entry['elec_price']) + float(entry['service_price'])
                    except (TypeError, ValueError):
                        # Non-numeric components: leave the price as it was.
                        pass

            normalized_prices.append(slot)
        return normalized_prices
    except Exception as e:
        logger.error(f"分析电价详情失败: {e}")
        return []
|
||
|
||
async def analyze_detail_basic_info(self, image_path):
    """
    Extract the station name and address from the first screen of a detail page.

    The VLM is asked for a JSON object with ``name`` and ``address``;
    whatever JSON value the reply decodes to is returned unchanged.

    On any exception the error is logged and ``{}`` is returned.
    """
    prompt = """
    分析这张充电站详情页首屏截图,提取:
    1. 场站名称 (通常在页面中部,大字体)
    2. 详细地址 (通常在名称下方或页面下半部分,伴有地址图标)

    输出格式为 JSON:
    {
    "name": "xxx充电站",
    "address": "xxx省xxx市xxx区xxx路xxx号"
    }
    """
    try:
        reply = await self.vlm.analyze_image(image_path, prompt)
        payload = self.vlm.extract_json(reply)
        info = json.loads(payload)
    except Exception as e:
        logger.error(f"分析详情页基础信息失败: {e}")
        return {}
    return info
|
||
|
||
async def analyze_station_list(self, image_path):
    """
    Extract the station cards shown on a station-list screenshot.

    The VLM is asked for a JSON array of cards (``name``, ``address``,
    ``point``, ``bbox``; coordinates normalized to 0-1000). Each card is
    converted to pixel coordinates and filtered:

    * a missing point is replaced by the centre of the bbox; a missing bbox
      is synthesized around the point (80% of the image width, at least
      180 px / ``MIN_CARD_HEIGHT`` tall, clipped to the image);
    * cards whose point lies above ``h * SAFE_EXCLUDE_RATIO`` or below
      ``h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO)`` are dropped;
    * cards whose box is shorter than 80% of ``MIN_CARD_HEIGHT`` or reaches
      past the bottom threshold are dropped.

    Malformed items (non-object entries, ``point`` / ``bbox`` that are not
    2- / 4-element sequences, non-finite coordinates) are skipped one at a
    time instead of aborting the whole list. A failure while drawing the
    diagnostic overlay is logged and does not discard the parsed stations.

    Returns:
        list[dict]: ``{"name", "address", "point": [x, y],
        "bbox": [x1, y1, x2, y2]}`` in pixels, sorted top to bottom;
        ``[]`` when the VLM call or JSON decoding fails (the error is logged).
    """
    prompt = """
    分析这张充电站列表截图,提取所有真实的充电站卡片。

    要求:
    1. 忽略页面上半部分(如顶部导航栏、搜索框、广告 Banner、筛选标签等)。
    2. 仅识别下半部分一条条“充电站卡片”,每张卡片通常包含:场站名称、评分、最近充电时间、距离、价格、快/慢空闲数量等。
    3. 不要把同一张卡片拆成多块;每条场站只对应一个矩形框。

    对于每张卡片,请输出:
    - name: 场站名称
    - address: 场站地址(如果无法确定可置为 null)
    - point: 卡片中心点击坐标 [x, y],使用归一化坐标 [0-1000](0 表示最左/最上,1000 表示最右/最下)
    - bbox: 卡片外接矩形边界 [x1, y1, x2, y2],同样使用归一化坐标 [0-1000]

    以 JSON 数组形式输出,例如:
    [
    {
    "name": "某某充电站",
    "address": "某某路 100 号",
    "point": [500, 750],
    "bbox": [50, 600, 950, 820]
    }
    ]
    """

    try:
        res_text = await self.vlm.analyze_image(image_path, prompt)
        vlm_results = json.loads(self.vlm.extract_json(res_text))
        vlm_list = vlm_results if isinstance(vlm_results, list) else []

        img = read_image(image_path)
        # Fall back to a 1080x2400 canvas when the screenshot cannot be read.
        h, w = img.shape[:2] if img is not None else (2400, 1080)
        y_threshold = h * SAFE_EXCLUDE_RATIO
        bottom_threshold = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO))

        def to_pixel(v, max_len):
            """Scale a 0-1000 normalized value to pixels; values above 1000 are taken as pixels already."""
            if v is None:
                return None
            try:
                fv = float(v)
                if fv <= 1000.0:
                    return int(fv * max_len / 1000.0)
                return int(fv)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric, NaN or infinite coordinate: treat as missing.
                return None

        final_stations = []
        for res in vlm_list:
            if not isinstance(res, dict):
                continue
            p = res.get("point")
            b = res.get("bbox")
            actual_p = None
            actual_bbox = None

            if isinstance(p, (list, tuple)) and len(p) == 2:
                px = to_pixel(p[0], w)
                py = to_pixel(p[1], h)
                if px is not None and py is not None:
                    actual_p = [px, py]

            if isinstance(b, (list, tuple)) and len(b) == 4:
                corners = (to_pixel(b[0], w), to_pixel(b[1], h), to_pixel(b[2], w), to_pixel(b[3], h))
                if None not in corners:
                    x1, y1, x2, y2 = corners
                    # Clip the box to the image.
                    actual_bbox = [max(0, x1), max(0, y1), min(w, x2), min(h, y2)]

            if actual_p is None:
                if actual_bbox is None:
                    continue
                # No usable point: tap the centre of the box.
                actual_p = [(actual_bbox[0] + actual_bbox[2]) // 2, (actual_bbox[1] + actual_bbox[3]) // 2]

            if actual_p[1] < y_threshold or actual_p[1] > bottom_threshold:
                logger.warning(f"过滤掉可能的误点击点 (y={actual_p[1]}): {res.get('name')}")
                continue

            if actual_bbox is None:
                # No usable box: synthesize one around the point.
                half_w = int(w * 0.4)
                half_h = max(MIN_CARD_HEIGHT // 2, 90)
                actual_bbox = [
                    max(0, actual_p[0] - half_w),
                    max(0, actual_p[1] - half_h),
                    min(w, actual_p[0] + half_w),
                    min(h, actual_p[1] + half_h),
                ]

            # Drop boxes that are too short or that reach past the bottom threshold.
            box_height = actual_bbox[3] - actual_bbox[1]
            if box_height < MIN_CARD_HEIGHT * 0.8 or actual_bbox[3] > bottom_threshold:
                continue

            final_stations.append({
                "name": res.get("name"),
                "address": res.get("address"),
                "point": actual_p,
                "bbox": actual_bbox
            })

        if final_stations:
            # Every kept station has both a point and a bbox; order top to bottom.
            final_stations.sort(key=lambda s: s["point"][1])

            # The diagnostic overlay is best effort: a drawing error must not lose the result.
            try:
                draw_rectangles(
                    image_path,
                    bboxes=[s["bbox"] for s in final_stations],
                    click_points=[s["point"] for s in final_stations],
                )
                logger.info(f"已生成诊断图片: {image_path.replace('.jpg', '_vl.jpg')} 和 _flag.jpg")
            except Exception as draw_err:
                logger.warning(f"生成诊断图片失败: {draw_err}")

        return final_stations
    except Exception as e:
        logger.error(f"分析列表页失败: {e}")
        return []
|