This commit is contained in:
HuangHai
2026-01-15 09:51:16 +08:00
parent fe27bc6884
commit 98e890a693
8 changed files with 3 additions and 449 deletions

View File

@@ -7,7 +7,7 @@ import json
import cv2
from Apps.TeLaiDian.Kit import (
take_screenshot, get_image_content_md5, clean_station_name,
setup_logger, detect_price_click_point_cv, read_image, save_image
setup_logger, read_image, save_image
)
from Apps.TeLaiDian.ReadImageKit import ReadImageKit
from Apps.TeLaiDian.Service import TeLaiDianService
@@ -29,11 +29,6 @@ if project_root not in sys.path:
# 初始化日志
logger = setup_logger("TeLaiDianCrawler")
PRICE_TAB_X_NORM = 220
PRICE_TAB_Y_NORM = 130
PRICE_ENTRY_X_NORM = 230
PRICE_ENTRY_Y_NORM = 380
class TeLaiDianCrawler(BaseCrawler):
def __init__(self, service=None):
super().__init__(service or TeLaiDianService())
@@ -241,7 +236,7 @@ class TeLaiDianCrawler(BaseCrawler):
await self.crawl_detail_logic(d, station)
# 标记为已处理
await self.redis_kit.set_data(redis_key, "1", ex=REDIS_STATION_EXPIRE)
await self.redis_kit.set_data(redis_key, "1", expire=REDIS_STATION_EXPIRE)
d.press("back")
await asyncio.sleep(WAIT_BACK_TO_LIST)
@@ -358,24 +353,6 @@ class TeLaiDianCrawler(BaseCrawler):
price_tab_screen = found_entry["screen"]
p = found_entry["point"]
# 1. 先点击顶部“价格”标签 (确保切到价格页,虽然滚动前可能已经点击,但这里做双保险)
# 先 OCR 找一次标签
tab_data = await self.read_image_kit.find_price_tab_ocr(price_tab_screen)
tab_x, tab_y = None, None
if tab_data.get("found"):
p_tab = tab_data["point"]
tab_x, tab_y = int(p_tab[0] * w / 1000), int(p_tab[1] * h / 1000)
logger.info(f"[详情页] 点击顶部价格标签: ({tab_x}, {tab_y})")
d.click(tab_x, tab_y)
await asyncio.sleep(1.0)
else:
# 按照用户要求:找不到文字时输出日志、截图并停止程序
fail_screen = take_screenshot(d, f"tld_ocr_tab_fail_{int(time.time())}.jpg")
logger.error(f"❌ [OCR失败] 在页面中未找到‘价格’标签文字!")
logger.error(f"❌ [OCR失败] 最终截图已保存至: {fail_screen}")
logger.error("❌ [OCR失败] 程序将停止运行,请检查页面内容或识别逻辑。")
sys.exit(1)
# 2. 点击“全部时段”入口
entry_x = int(p[0] * w / 1000)
entry_y = int(p[1] * h / 1000)

View File

@@ -93,270 +93,6 @@ def save_image(path, img):
logger.error(f"Error saving image {path}: {e}")
return False
def detect_list_price_blocks_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
    """
    Detect reddish price-text blocks in a list screenshot.

    The image is cropped vertically to the band between ``top_ratio`` and
    ``1 - bottom_ratio`` of its height. A BGR threshold builds the primary
    mask; when that mask has fewer than 50 non-zero pixels an HSV red-range
    mask is used instead. The mask is closed with a 30x10 rectangular kernel
    and the external contours are filtered by size and vertical position.

    Args:
        image_path: Path handed to ``read_image``.
        top_ratio: Fraction of the image height skipped at the top.
        bottom_ratio: Fraction of the image height skipped at the bottom.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes in full-image pixel coordinates,
        sorted by ``y1``. Empty when the image cannot be read, the band is
        empty, or no contour passes the filters.
    """
    img = read_image(image_path)
    if img is None:
        return []
    h, w = img.shape[:2]
    top_limit = int(h * top_ratio)
    bottom_limit = int(h * (1 - bottom_ratio))
    roi_bgr = img[top_limit:bottom_limit, :, :]
    if roi_bgr.size == 0:
        return []

    # Widen the channels to a signed type before subtracting. On uint8 data
    # ``r - g`` wraps modulo 256 whenever r < g, which made non-red pixels
    # (e.g. light cyan) satisfy the "red dominates" differences below.
    b, g, r = (channel.astype(np.int16) for channel in cv2.split(roi_bgr))
    mask_bgr = (r > 190) & (g > 80) & (r - g > 25) & (r - b > 25)
    mask_bgr = mask_bgr.astype(np.uint8) * 255

    if cv2.countNonZero(mask_bgr) < 50:
        # Too few hits from the BGR rule: fall back to two HSV red ranges
        # (low hues and high hues).
        hsv = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2HSV)
        lower_red1 = np.array([0, 100, 120])
        upper_red1 = np.array([20, 255, 255])
        lower_red2 = np.array([160, 100, 120])
        upper_red2 = np.array([180, 255, 255])
        mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
        mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
        mask = cv2.bitwise_or(mask1, mask2)
    else:
        mask = mask_bgr

    # Close gaps so neighbouring glyphs merge into one blob.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (30, 10))
    closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    blocks = []
    for cnt in contours:
        x, y, cw, ch = cv2.boundingRect(cnt)
        # Contour coordinates are relative to the cropped band.
        y_global = y + top_limit
        if ch < 30 or ch > 140:
            continue
        if cw < 80 or cw > int(w * 0.8):
            continue
        center_y = y_global + ch // 2
        if center_y < int(h * 0.6) or center_y > bottom_limit:
            continue
        blocks.append([x, y_global, x + cw, y_global + ch])

    blocks.sort(key=lambda box: box[1])
    return blocks
def detect_price_click_point_cv(image_path):
    """
    Locate the orange-red price area of the detail page via HSV filtering and
    return the centre of the left-most matching region as a click point.

    Returns ``[center_x, center_y]`` in pixels, or ``None`` when the image
    cannot be read or no region passes either detection pass.
    """
    img = read_image(image_path)
    if img is None:
        return None
    h, w = img.shape[:2]

    # Pass 1: orange-red HSV mask, dilated so the digits merge into one blob.
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, np.array([0, 150, 150]), np.array([20, 255, 255]))
    dilated = cv2.dilate(mask, cv2.getStructuringElement(cv2.MORPH_RECT, (50, 20)))
    contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    candidates = []
    for contour in contours:
        x, y, cw, ch = cv2.boundingRect(contour)
        # Drop regions that are too wide (possibly banner ads) or too small
        # (noise); the width bounds also exclude small left-side elements
        # such as a navigation back button.
        if 200 < y < h * 0.8 and 100 < cw < w * 0.6 and ch > 30:
            candidates.append([x, y, x + cw, y + ch])

    if not candidates:
        # Pass 2 (fallback): adaptive threshold + closing, then pick contours
        # whose size matches a large price number.
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        thresh = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2
        )
        closed = cv2.morphologyEx(
            thresh, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_RECT, (30, 10))
        )
        contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        for contour in contours:
            x, y, cw, ch = cv2.boundingRect(contour)
            if 200 < y < h * 0.6 and 150 < cw < 300 and 50 < ch < 150:
                candidates.append([x, y, x + cw, y + ch])

    if not candidates:
        return None

    # Left-most region wins; ties keep the earliest detected one.
    left, top, right, bottom = min(candidates, key=lambda box: box[0])
    return [(left + right) // 2, (top + bottom) // 2]
def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12):
    """
    Detect station cards in a list screenshot using OpenCV.

    Strategy:
      1. If ``detect_list_price_blocks_cv`` finds price blocks, synthesize one
         full-width card of ``MIN_CARD_HEIGHT`` around each block, clamped to
         the safe area given by ``SAFE_EXCLUDE_RATIO`` and
         ``BOTTOM_SAFE_EXCLUDE_RATIO``.
      2. Otherwise fall back to contour detection on an adaptive-threshold
         image, keeping wide, tall, non-green, roughly rectangular contours,
         de-duplicating them by top ``y`` and splitting tall boxes into
         several cards.

    Args:
        image_path: Path handed to ``read_image``.
        top_ratio: Fraction of the image height excluded at the top.
        bottom_ratio: Fraction of the image height excluded at the bottom.

    Returns:
        A list of ``[x1, y1, x2, y2]`` pixel boxes; empty when the image
        cannot be read or nothing is detected.
    """
    from Apps.TeLaiDian.Config.Setting import MIN_CARD_HEIGHT
    img = read_image(image_path)
    if img is None:
        return []
    h, w = img.shape[:2]

    price_blocks = detect_list_price_blocks_cv(image_path, top_ratio=top_ratio, bottom_ratio=bottom_ratio)
    cards = []
    if price_blocks:
        from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO
        top_limit = int(h * SAFE_EXCLUDE_RATIO)
        bottom_limit = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO))
        for block in price_blocks:
            _, py1, _, py2 = block
            center_y = (py1 + py2) // 2
            card_height = MIN_CARD_HEIGHT
            # Place the price block 70% of a card height below the card top.
            card_top = max(top_limit, center_y - int(card_height * 0.7))
            card_bottom = card_top + card_height
            if card_bottom > bottom_limit:
                # Shift the card up so it ends at the bottom safe limit.
                card_bottom = bottom_limit
                card_top = max(top_limit, card_bottom - card_height)
            cards.append([0, card_top, w, card_bottom])
        cards.sort(key=lambda box: box[1])
        return cards

    # Fallback: contour-based detection on the grayscale image.
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Restrict the vertical detection range.
    top_limit = int(h * top_ratio)
    bottom_limit = int(h * (1 - bottom_ratio))
    logger.info(f"CV检测卡片: h={h}, w={w}, top_limit={top_limit}, bottom_limit={bottom_limit}")

    thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2)
    # Closing joins broken edges; a wide, flat kernel tolerates card borders
    # that are not continuous.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 4, 3))
    closed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # List cards are usually close to full width.
    min_card_width = int(w * 0.8)
    # The HSV image does not depend on the contour, so convert once here
    # instead of once per contour.
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    green_lower = np.array([35, 80, 80])
    green_upper = np.array([85, 255, 255])

    temp_boxes = []
    for cnt in contours:
        x, y, cw, ch = cv2.boundingRect(cnt)
        center_y = y + ch // 2

        # Share of green pixels in the box, used to exclude green marketing
        # coupon areas.
        roi = hsv[max(0, y):min(h, y + ch), max(0, x):min(w, x + cw)]
        green_mask = cv2.inRange(roi, green_lower, green_upper)
        green_ratio = float(cv2.countNonZero(green_mask)) / (roi.shape[0] * roi.shape[1]) if roi.size > 0 else 0.0

        # Rounded-rectangle test: area fill ratio and polygon vertex count.
        rect_area = cw * ch
        cnt_area = cv2.contourArea(cnt)
        extent = cnt_area / rect_area if rect_area > 0 else 0
        approx = cv2.approxPolyDP(cnt, 0.02 * cv2.arcLength(cnt, True), True)

        ok_width = cw >= min_card_width
        ok_height = ch > MIN_CARD_HEIGHT * 0.8
        ok_vertical = center_y >= int(h * 0.58) and y > top_limit and center_y < bottom_limit
        ok_shape = (len(approx) >= 6 or extent > 0.85)
        ok_color = green_ratio < 0.25

        if ok_width and ok_height and ok_vertical and ok_shape and ok_color:
            logger.info(f"✅ 找到候选卡片: y={y}, h={ch}, w={cw}, extent={extent:.2f}, verts={len(approx)}, green={green_ratio:.2f}")
            temp_boxes.append((x, y, cw, ch))
        elif ok_width:
            logger.debug(f"❌ 排除候选: y={y}, h={ch}, w={cw}, extent={extent:.2f}, verts={len(approx)}, green={green_ratio:.2f} (limit: {top_limit}-{bottom_limit})")

    if not temp_boxes:
        logger.warning(f"⚠️ CV 未能在指定范围 ({top_limit}-{bottom_limit}) 内检测到任何场站卡片")

    # Process candidates top to bottom.
    temp_boxes.sort(key=lambda box: box[1])

    # Filter and de-duplicate again, splitting tall boxes into several cards.
    padding = 2
    for x, y, cw, ch in temp_boxes:
        # Skip a box whose top is within 100 px of an already accepted card.
        if any(abs(y - accepted[1]) < 100 for accepted in cards):
            continue

        x1 = max(0, x + padding)
        x2 = min(w, x + cw - padding)
        y1 = max(top_limit, y + padding)
        y2 = min(bottom_limit, y + ch - padding)
        if y2 <= y1:
            continue

        effective_h = y2 - y1
        estimated_count = max(1, int(round(effective_h / float(MIN_CARD_HEIGHT))))
        if estimated_count <= 1:
            cards.append([x1, y1, x2, y2])
            continue

        # The box spans several cards: cut it into equal-height segments.
        seg_h = effective_h / float(estimated_count)
        for k in range(estimated_count):
            sy1 = int(round(y1 + k * seg_h))
            sy2 = int(round(y1 + (k + 1) * seg_h))
            if sy2 <= sy1:
                continue
            if sy2 - sy1 < MIN_CARD_HEIGHT * 0.6:
                continue
            cards.append([x1, sy1, x2, sy2])
    return cards
def detect_wide_rounded_card_cv(image_path, min_width_ratio=0.8, min_y_ratio=0.5):
    """
    Find wide, roughly rectangular (possibly rounded) contours in the lower
    part of an image.

    Edges come from Canny on a blurred grayscale image, closed with a 15x15
    kernel. A contour is kept when its bounding box is at least
    ``min_width_ratio`` of the image width, its vertical centre is at or below
    ``min_y_ratio`` of the image height, it fills at least 70% of its bounding
    box, and its polygon approximation has 6+ vertices (or exactly 4 with a
    fill ratio above 0.85).

    Returns a list of ``[x1, y1, x2, y2]`` boxes sorted by ``y1``; empty when
    the image cannot be read.
    """
    img = read_image(image_path)
    if img is None:
        return []
    h, w = img.shape[:2]

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(cv2.GaussianBlur(gray, (7, 7), 0), 60, 180)
    closed = cv2.morphologyEx(
        edges, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
    )
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    width_floor = int(w * min_width_ratio)
    center_floor = int(h * min_y_ratio)

    boxes = []
    for contour in contours:
        x, y, cw, ch = cv2.boundingRect(contour)
        if cw < width_floor or y + ch // 2 < center_floor:
            continue
        box_area = cw * ch
        if box_area <= 0:
            continue
        extent = cv2.contourArea(contour) / box_area
        polygon = cv2.approxPolyDP(contour, 0.02 * cv2.arcLength(contour, True), True)
        if extent < 0.7:
            continue
        vertex_count = len(polygon)
        if vertex_count >= 6 or (vertex_count == 4 and extent > 0.85):
            boxes.append([x, y, x + cw, y + ch])

    return sorted(boxes, key=lambda box: box[1])
def draw_rectangles(image_path, bboxes=None, click_points=None):
"""
使用 OpenCV 在图片上绘制矩形框和点击点,生成 _vl.jpg 和 _flag.jpg

View File

@@ -10,6 +10,7 @@ if project_root not in sys.path:
sys.path.append(project_root)
from Util.VLMKit import VLMKit
from Util.EasyOcrKit import get_easyocr_reader
from Apps.TeLaiDian.Kit import draw_rectangles, setup_logger, read_image, detect_price_info_container_cv
from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO, MIN_CARD_HEIGHT
@@ -20,166 +21,6 @@ class ReadImageKit:
def __init__(self):
self.vlm = VLMKit()
async def find_price_tab_vlm(self, image_path):
    """
    Ask the VLM where to tap the "价格" (price) tab in the top tab bar of the
    detail page.

    Returns the parsed JSON dict from the VLM reply (keys requested by the
    prompt: ``found``, ``reason``, ``point`` normalized to 0-1000). When the
    reply reports a point and the screenshot can be read, a diagnostic image
    is drawn via ``draw_rectangles``. Any exception is logged and
    ``{"found": False}`` is returned.
    """
    prompt = """
分析这张特来电充电站详情页截图,找到顶部标签栏中“价格”两个字所在的点击区域中心。
要求:
1. 仅在页面最上方的标签栏里查找,该标签栏通常包含“价格 / 终端 / 电站 / 评论 / 周边”等文字。
2. 不要选择下面“价格信息”模块中的数字(例如 1.0689 元/度)或其它文本。
3. 不要选择最顶端系统状态栏或返回按钮等区域。
输出格式为 JSON
{
"found": true/false,
"reason": "为什么认为这个位置是顶部“价格”标签",
"point": [x, y] // 归一化坐标,范围 [0-1000]
}
"""
    try:
        reply = await self.vlm.analyze_image(image_path, prompt)
        data = json.loads(self.vlm.extract_json(reply))

        if data.get("found") and data.get("point"):
            norm_point = data["point"]
            screenshot = read_image(image_path)
            if screenshot is not None:
                img_h, img_w = screenshot.shape[:2]
                # Normalized 0-1000 coordinates -> pixels.
                click = [int(norm_point[0] * img_w / 1000), int(norm_point[1] * img_h / 1000)]
                box = [click[0] - 60, click[1] - 30, click[0] + 60, click[1] + 30]
                draw_rectangles(image_path, bboxes=[box], click_points=[click])
                logger.info(f"已生成价格标签诊断图片: {image_path.replace('.jpg', '_tab_vl.jpg')}")
        return data
    except Exception as e:
        logger.error(f"VLM 寻找价格标签失败: {e}")
        return {"found": False}
async def find_price_entrance_vlm(self, image_path):
    """
    Use the VLM to find the price entrance on the detail page.

    Target: the area with a large-print price on top (e.g. 1.3435元/度)
    immediately followed by the "全部时段 >" text below it.

    Returns the parsed JSON dict from the VLM reply; on any exception the
    error is logged and ``{"found": False}`` is returned.
    """
    # 1. Try to detect the "价格信息" (price info) container rectangle with CV.
    container_norm = detect_price_info_container_cv(image_path)
    container_desc = ""
    vlm_image_path = image_path
    if container_norm:
        # Container found: draw a green-box diagnostic image.
        img = read_image(image_path)
        if img is not None:
            h, w = img.shape[:2]
            # container_norm is scaled by 1000 here, i.e. treated as
            # normalized 0-1000 coordinates.
            x1 = int(container_norm[0] * w / 1000)
            y1 = int(container_norm[1] * h / 1000)
            x2 = int(container_norm[2] * w / 1000)
            y2 = int(container_norm[3] * h / 1000)
            # Carve out a sub-area on the left (the current-price area).
            # The price info container is usually split evenly: current price
            # on the left, member price on the right.
            current_price_x2 = x1 + (x2 - x1) // 2
            # Draw the green box: the whole container is outlined and a
            # vertical line marks the left half as the target.
            # NOTE(review): if image_path contains no ".jpg", diag_path equals
            # image_path and save_image would write over it — confirm callers
            # always pass a .jpg path.
            diag_path = image_path.replace(".jpg", "_price_box.jpg")
            import cv2
            diag_img = img.copy()
            cv2.rectangle(diag_img, (x1, y1), (x2, y2), (0, 255, 0), 3)
            cv2.line(diag_img, (current_price_x2, y1), (current_price_x2, y2), (0, 255, 0), 2)
            from Apps.TeLaiDian.Kit import save_image
            save_image(diag_path, diag_img)
            logger.info(f"[CV] 已检测到价格容器并保存绿框诊断图: {diag_path}")
            # Send the green-box image to the VLM and describe the box in
            # the prompt.
            vlm_image_path = diag_path
            container_desc = f"\n**视觉辅助**:图中已用【绿色矩形框】标出了“价格信息”区域。该区域被中间竖线分为左右两块。请重点分析【左侧半块】(即“当前价”所在的灰色背景区域)。"
    prompt = f"""
分析这张特来电充电站详情页截图,找到进入“电价详情”的点击入口。
{container_desc}
**核心识别目标**
1. 在页面中上部区域(或绿框内的左侧部分)寻找价格显示模块。
2. 该模块包含**红色、加粗的大字价格数字**例如1.3435元/度)。
3. **绝对必要标识**:在价格数字的正下方,**必须能够清晰看到**文本“全部时段 >”或“全部时段”。
4. 点击目标是“全部时段 >”文字所在的中心位置。
**严格限制与排除**
1. **严禁选择 y > 850 的区域**(即屏幕最底部的 15% 区域)。
2. **严禁选择底部的浮动条**:浮动条通常包含“扫码充电”按钮,严禁返回其坐标。
3. 如果绿框存在,请优先在绿框的【左侧区域】寻找。
输出格式为 JSON
{{
"found": true/false,
"reason": "解释你看到了什么,是否在绿框内找到了‘全部时段 >",
"point": [x, y] // 归一化坐标 [0-1000]
}}
"""
    try:
        res_text = await self.vlm.analyze_image(vlm_image_path, prompt)
        json_str = self.vlm.extract_json(res_text)
        data = json.loads(json_str)
        # Save a diagnostic image on the original screenshot.
        if data.get("found") and data.get("point"):
            p = data["point"]
            img = read_image(image_path)
            if img is not None:
                h, w = img.shape[:2]
                # Normalized 0-1000 point -> pixel coordinates.
                actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)]
                bbox = [actual_p[0]-60, actual_p[1]-40, actual_p[0]+60, actual_p[1]+40]
                draw_rectangles(image_path, bboxes=[bbox], click_points=[actual_p])
                logger.info(f"已生成价格入口诊断图片: {image_path.replace('.jpg', '_vl.jpg')}")
        return data
    except Exception as e:
        # Any failure (VLM call, JSON parsing, drawing) degrades to
        # "not found".
        logger.error(f"VLM 寻找价格入口失败: {e}")
        return {"found": False}
async def find_price_tab_ocr(self, image_path):
    """
    Locate the "价格" (price) tab in the top tab bar of the detail page with
    OCR (the VLM is no longer used for this).

    Only the top 30% of the screenshot is scanned. Returns a dict with
    ``found``, ``reason`` and ``point`` (built from the reader's normalized
    rectangle and scaled by 1000 when drawing) on the first text containing
    "价格" with probability above 0.3; otherwise ``{"found": False}``.
    """
    from Apps.TeLaiDian.Kit import draw_rectangles

    img = read_image(image_path)
    if img is None:
        return {"found": False}
    h, w = img.shape[:2]

    reader = get_easyocr_reader(gpu=True)
    # The crop starts at row 0, so OCR coordinates need no vertical offset.
    top_band = img[0:int(h*0.3), :]

    for quad, text, prob in reader.read_text(top_band):
        if '价格' not in text or not prob > 0.3:
            continue

        rect = reader.get_normalized_rect(quad, w, h)
        center_x = (rect[0] + rect[2]) // 2
        center_y = (rect[1] + rect[3]) // 2

        # Draw the diagnostic image in pixel coordinates.
        click_px = [int(center_x * w / 1000), int(center_y * h / 1000)]
        box_px = [
            int(rect[0] * w / 1000),
            int(rect[1] * h / 1000),
            int(rect[2] * w / 1000),
            int(rect[3] * h / 1000),
        ]
        draw_rectangles(image_path, bboxes=[box_px], click_points=[click_px])
        logger.info(f"[OCR] 已定位顶部价格标签并保存诊断图: {image_path.replace('.jpg', '_tab_vl.jpg')}")
        return {
            "found": True,
            "reason": f"OCR 定位到‘{text}",
            "point": [center_x, center_y]
        }

    logger.warning(f"[OCR] 未能在顶部区域定位到‘价格’标签")
    return {"found": False}
async def find_price_entrance_ocr(self, image_path):
"""
使用 OCR 在详情页寻找价格入口 (全部时段 >)