'commit'
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
|
||||
# 采集配置
|
||||
SCROLL_DISTANCE_RATIO = 0.5
|
||||
MAX_STATIONS_COUNT = 100
|
||||
MAX_STATIONS_COUNT = 3
|
||||
FIRST_RUN_ONLY_ONE_STATION = False
|
||||
|
||||
# 调试绘图配置
|
||||
|
||||
@@ -55,38 +55,50 @@ class TeLaiDianCrawler(BaseCrawler):
|
||||
|
||||
async def clear_ads(self, d, max_rounds=3):
|
||||
"""
|
||||
清理页面上的广告弹窗
|
||||
清理页面上的广告弹窗,支持多轮检测
|
||||
"""
|
||||
logger.info(f"开始清理广告弹窗,最多尝试 {max_rounds} 轮...")
|
||||
logger.info(f"--- [广告清理] 开始检测,最多尝试 {max_rounds} 轮 ---")
|
||||
for i in range(max_rounds):
|
||||
ad_screen = take_screenshot(d, f"tld_ad_check_{int(time.time())}.jpg")
|
||||
res = await self.read_image_kit.find_close_button_vlm(ad_screen)
|
||||
round_idx = i + 1
|
||||
logger.info(f"[广告清理] 第 {round_idx} 轮:正在截屏分析...")
|
||||
ad_screen = take_screenshot(d, f"tld_ad_check_r{round_idx}_{int(time.time())}.jpg")
|
||||
|
||||
if res.get("has_ad") and res.get("close_point"):
|
||||
close_point = res.get("close_point")
|
||||
w, h = d.window_size()
|
||||
target_x = int(close_point[0] * w / 1000)
|
||||
target_y = int(close_point[1] * h / 1000)
|
||||
try:
|
||||
res = await self.read_image_kit.find_close_button_vlm(ad_screen)
|
||||
|
||||
# 安全校验:绝对不能点击微信小程序的胶囊按钮区 (右上角)
|
||||
# 胶囊按钮通常在 x > 80% 宽度 且 y < 100 像素(或 8% 高度) 的区域
|
||||
if target_x > w * 0.75 and target_y < 150:
|
||||
logger.warning(f"⚠️ 拒绝点击疑似微信胶囊按钮的区域: ({target_x}, {target_y})")
|
||||
continue
|
||||
if res.get("has_ad") and res.get("close_point"):
|
||||
close_point = res.get("close_point")
|
||||
reason = res.get("reason", "未提供原因")
|
||||
w, h = d.window_size()
|
||||
target_x = int(close_point[0] * w / 1000)
|
||||
target_y = int(close_point[1] * h / 1000)
|
||||
|
||||
logger.info(f"[广告清理] 第 {round_idx} 轮:VLM 发现广告!原因: {reason}")
|
||||
logger.info(f"[广告清理] 计划点击坐标: ({target_x}, {target_y}),归一化坐标: {close_point}")
|
||||
|
||||
# 安全校验:绝对不能点击微信小程序的胶囊按钮区 (右上角)
|
||||
if target_x > w * 0.75 and target_y < 150:
|
||||
logger.warning(f"[广告清理] ⚠️ 拒绝点击疑似微信胶囊按钮的区域: ({target_x}, {target_y}),跳过本轮。")
|
||||
continue
|
||||
|
||||
logger.info(f"第 {i+1} 轮发现广告: {res.get('reason')},点击关闭: ({target_x}, {target_y})")
|
||||
d.click(target_x, target_y)
|
||||
await asyncio.sleep(1.5)
|
||||
else:
|
||||
logger.info(f"第 {i+1} 轮未发现明显广告,清理结束。")
|
||||
logger.info(f"[广告清理] 正在执行点击关闭操作...")
|
||||
d.click(target_x, target_y)
|
||||
# 点击后等待一下,让弹窗消失或下一轮广告弹出
|
||||
await asyncio.sleep(2.0)
|
||||
else:
|
||||
logger.info(f"[广告清理] 第 {round_idx} 轮:未发现广告弹窗。VLM 理由: {res.get('reason', '无')}")
|
||||
if os.path.exists(ad_screen): os.remove(ad_screen)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"[广告清理] 第 {round_idx} 轮检测发生异常: {e}")
|
||||
finally:
|
||||
if os.path.exists(ad_screen): os.remove(ad_screen)
|
||||
break
|
||||
|
||||
if os.path.exists(ad_screen): os.remove(ad_screen)
|
||||
|
||||
logger.info("--- [广告清理] 任务结束 ---")
|
||||
|
||||
async def crawl_list_logic(self, d):
|
||||
# [临时禁用] 广告清理逻辑误触严重,先解决主流程问题
|
||||
# await self.clear_ads(d)
|
||||
# 1. 启动即清理广告 (已根据要求关闭)
|
||||
# await self.clear_ads(d, max_rounds=3)
|
||||
|
||||
# [优化] 向下滚动以刷新/校准地理位置
|
||||
# 使用更加显式的 swipe 方式:从屏幕 30% 划到 80%
|
||||
@@ -282,68 +294,75 @@ class TeLaiDianCrawler(BaseCrawler):
|
||||
|
||||
w, h = d.window_size()
|
||||
|
||||
logger.info("[详情页] 根据用户策略: 多次大幅向上滑动,直到页面基本不再变化")
|
||||
last_md5 = None
|
||||
stable_count = 0
|
||||
max_round = 30
|
||||
final_screen_path = None
|
||||
logger.info("[详情页] 根据用户要求:进入页面后多等会,然后向上滑动一整屏,通过 VLM 查找价格入口")
|
||||
w, h = d.window_size()
|
||||
|
||||
# 1. 增加等待时间,确保页面加载完成
|
||||
logger.info(f"[详情页] 等待 {WAIT_DETAIL_PAGE_LOAD + 2}s 确保页面稳定 (根据要求多等会)...")
|
||||
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD + 2)
|
||||
|
||||
# 2. 向上滑动一部分屏 (从屏幕底部的 85% 划到顶部的 15%)
|
||||
# [优化] 避免滑过头,从全屏改为 0.7 屏幕左右
|
||||
logger.info("[详情页] 执行向上滑动操作 (y: 85% -> 15%)")
|
||||
d.swipe(w // 2, int(h * 0.85), w // 2, int(h * 0.15), duration=0.8)
|
||||
await asyncio.sleep(2.0) # 滑动后再次等待稳定
|
||||
|
||||
from Apps.TeLaiDian.Kit import get_image_content_md5
|
||||
# 3. 截图并识别
|
||||
final_screen_path = take_screenshot(d, f"tld_detail_vlm_input_{int(time.time())}.jpg")
|
||||
logger.info(f"[详情页] 已截取 VLM 识别用图: {final_screen_path}")
|
||||
|
||||
for idx in range(max_round):
|
||||
start_x = int(w * 0.9)
|
||||
start_y = int(h * 0.85)
|
||||
end_y = int(h * 0.25)
|
||||
logger.info(f"[详情页] 第 {idx + 1} 轮大幅向上滑动: ({start_x}, {start_y}) -> ({start_x}, {end_y})")
|
||||
d.swipe(start_x, start_y, start_x, end_y, 0.25)
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
screen_path = take_screenshot(d, f"tld_detail_scan_{int(time.time())}_{idx}.jpg")
|
||||
logger.info(f"[详情页] 第 {idx + 1} 轮滑动后的截图: {screen_path}")
|
||||
curr_md5 = get_image_content_md5(
|
||||
screen_path,
|
||||
top_ratio=SAFE_EXCLUDE_RATIO,
|
||||
bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO,
|
||||
)
|
||||
if last_md5 is not None and curr_md5 == last_md5:
|
||||
stable_count += 1
|
||||
logger.info(f"[详情页] 页面内容连续第 {stable_count} 次无变化,可能已到稳定区域")
|
||||
else:
|
||||
stable_count = 0
|
||||
last_md5 = curr_md5
|
||||
final_screen_path = screen_path
|
||||
|
||||
if stable_count >= 2:
|
||||
logger.info("[详情页] 检测到页面多次无变化,认为已到达顶部固定区域,提前结束扫描。")
|
||||
break
|
||||
|
||||
if not final_screen_path:
|
||||
final_screen_path = take_screenshot(d, f"tld_detail_scan_final_{int(time.time())}.jpg")
|
||||
logger.info(f"[详情页] 扫描流程未生成截图,使用兜底截图: {final_screen_path}")
|
||||
else:
|
||||
logger.info(f"[详情页] 使用最终稳定区域截图作为价格页识别输入: {final_screen_path}")
|
||||
|
||||
logger.info("[详情页] 使用固定归一化坐标点击顶部“价格”标签,并点击左侧当前价数字进入全时段电价")
|
||||
logger.info("[详情页] 优先使用 VLM 识别顶部“价格”标签和价格入口")
|
||||
price_tab_screen = final_screen_path
|
||||
entrance_clicked = False
|
||||
|
||||
try:
|
||||
tab_x = int(PRICE_TAB_X_NORM * w / 1000)
|
||||
tab_y = int(PRICE_TAB_Y_NORM * h / 1000)
|
||||
logger.info(f"[详情页] 固定坐标点击价格标签: 归一化({PRICE_TAB_X_NORM}, {PRICE_TAB_Y_NORM}) -> 像素({tab_x}, {tab_y})")
|
||||
# 1. 尝试使用 VLM 寻找顶部“价格”标签
|
||||
tab_data = await self.read_image_kit.find_price_tab_vlm(final_screen_path)
|
||||
tab_x, tab_y = None, None
|
||||
|
||||
if tab_data.get("found") and tab_data.get("point"):
|
||||
p = tab_data["point"]
|
||||
tab_x = int(p[0] * w / 1000)
|
||||
tab_y = int(p[1] * h / 1000)
|
||||
logger.info(f"[详情页] VLM 成功找到价格标签: 归一化{p} -> 像素({tab_x}, {tab_y})")
|
||||
else:
|
||||
tab_x = int(PRICE_TAB_X_NORM * w / 1000)
|
||||
tab_y = int(PRICE_TAB_Y_NORM * h / 1000)
|
||||
logger.warning(f"[详情页] VLM 未找到价格标签,使用固定坐标兜底: 像素({tab_x}, {tab_y})")
|
||||
|
||||
logger.info(f"[详情页] 正在点击价格标签...")
|
||||
d.click(tab_x, tab_y)
|
||||
await asyncio.sleep(1.0)
|
||||
await asyncio.sleep(2.0) # 点击标签后多等会
|
||||
|
||||
price_tab_screen = take_screenshot(d, f"tld_detail_after_price_tab_{int(time.time())}.jpg")
|
||||
logger.info(f"[详情页] 点击价格标签后的界面截图已保存: {price_tab_screen}")
|
||||
|
||||
entry_x = int(PRICE_ENTRY_X_NORM * w / 1000)
|
||||
entry_y = int(PRICE_ENTRY_Y_NORM * h / 1000)
|
||||
# 2. 尝试使用 VLM 寻找价格入口 (重点识别“全部时段 >”)
|
||||
entry_data = await self.read_image_kit.find_price_entrance_vlm(price_tab_screen)
|
||||
entry_x, entry_y = None, None
|
||||
|
||||
if entry_data.get("found") and entry_data.get("point"):
|
||||
p = entry_data["point"]
|
||||
# 安全校验:严禁点击 y > 850 (85%) 的区域,那是底部浮动条
|
||||
if p[1] > 850:
|
||||
logger.warning(f"[详情页] VLM 虽返回坐标 {p},但 Y 轴过大 (疑似底部浮动条),强制判定为未找到。")
|
||||
entry_x = int(PRICE_ENTRY_X_NORM * w / 1000)
|
||||
entry_y = int(PRICE_ENTRY_Y_NORM * h / 1000)
|
||||
else:
|
||||
entry_x = int(p[0] * w / 1000)
|
||||
entry_y = int(p[1] * h / 1000)
|
||||
logger.info(f"[详情页] VLM 成功找到价格入口 (匹配全部时段标识): 归一化{p} -> 像素({entry_x}, {entry_y}),原因: {entry_data.get('reason')}")
|
||||
else:
|
||||
entry_x = int(PRICE_ENTRY_X_NORM * w / 1000)
|
||||
entry_y = int(PRICE_ENTRY_Y_NORM * h / 1000)
|
||||
logger.warning(f"[详情页] VLM 未找到价格入口,使用固定坐标兜底: 像素({entry_x}, {entry_y})")
|
||||
|
||||
click_x = max(5, min(w - 5, entry_x))
|
||||
click_y = max(5, min(h - 5, entry_y))
|
||||
|
||||
debug_click_path = price_tab_screen.replace(
|
||||
".jpg",
|
||||
f"_click_{tab_x}_{tab_y}_price_{click_x}_{click_y}.jpg"
|
||||
f"_click_tab_{tab_x}_{tab_y}_entry_{click_x}_{click_y}.jpg"
|
||||
)
|
||||
try:
|
||||
img = read_image(price_tab_screen)
|
||||
@@ -351,18 +370,16 @@ class TeLaiDianCrawler(BaseCrawler):
|
||||
cv2.circle(img, (tab_x, tab_y), 20, (0, 0, 255), -1)
|
||||
cv2.circle(img, (click_x, click_y), 20, (0, 0, 255), -1)
|
||||
save_image(debug_click_path, img)
|
||||
logger.info(f"[详情页] 已生成价格标签与当前价入口红点标记图: {debug_click_path}")
|
||||
else:
|
||||
logger.warning(f"[详情页] 加载价格页截图失败,无法绘制诊断红点: {price_tab_screen}")
|
||||
logger.info(f"[详情页] 已生成价格点击诊断红点图: {debug_click_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"[详情页] 生成价格入口点击诊断图片失败: {e}")
|
||||
logger.error(f"[详情页] 生成诊断图片失败: {e}")
|
||||
|
||||
logger.info(f"[详情页] 点击当前价入口: 像素({click_x}, {click_y}),屏幕大小: ({w}, {h})")
|
||||
logger.info(f"[详情页] 正在点击电价入口...")
|
||||
d.click(click_x, click_y)
|
||||
entrance_clicked = True
|
||||
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
|
||||
except Exception as e:
|
||||
logger.error(f"[详情页] 固定坐标点击价格标签或入口失败: {e}")
|
||||
logger.error(f"[详情页] 识别或点击价格入口失败: {e}")
|
||||
|
||||
if entrance_clicked:
|
||||
entered_price_path = take_screenshot(d, f"tld_detail_price_after_enter_{int(time.time())}.jpg")
|
||||
|
||||
@@ -417,6 +417,38 @@ def clear_temp_dir(save_dir=None):
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting file {file_path}: {e}")
|
||||
|
||||
from Util.EasyOcrKit import get_easyocr_reader
|
||||
|
||||
# Accessor for the shared EasyOCR reader (singleton); it is created when first requested, not preloaded at import time
|
||||
def get_ocr_reader():
    """Return the OCR wrapper obtained from ``get_easyocr_reader`` with GPU requested."""
    ocr_reader = get_easyocr_reader(gpu=True)
    return ocr_reader
|
||||
|
||||
def detect_price_info_container_cv(image_path):
    """Locate the "全部时段" text on a detail-page screenshot using OCR.

    Args:
        image_path: Path of the screenshot; it is loaded with ``read_image``.

    Returns:
        The ``[x1, y1, x2, y2]`` list produced by the reader's
        ``get_normalized_rect`` for the matched text, or ``None`` when the
        image cannot be loaded, the text is not found, or OCR raises.
    """
    img = read_image(image_path)
    if img is None:
        return None
    h, w = img.shape[:2]

    try:
        reader = get_ocr_reader()
        found = reader.find_text_position(img, '全部时段')
        if not found:
            return None

        text, quad, prob = found
        res = reader.get_normalized_rect(quad, w, h)
        # Report through the module logger (as the rest of this module does)
        # so the message reaches the configured handlers instead of stdout only.
        logger.info(f"[OCR识别] 找到文本: '{text}', 置信度: {prob:.4f}, 归一化坐标: {res}")
        return res
    except Exception as e:
        # Best-effort detection: any OCR failure is reported and treated as
        # "not found" rather than propagated to the caller.
        logger.error(f"OCR 识别发生异常: {e}")
        return None
|
||||
|
||||
def setup_logger(name, log_file=None, clear_old_log=False):
|
||||
"""
|
||||
配置日志,支持同时输出到控制台和文件。
|
||||
|
||||
@@ -10,7 +10,7 @@ if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
|
||||
from Util.VLMKit import VLMKit
|
||||
from Apps.TeLaiDian.Kit import draw_rectangles, setup_logger, read_image
|
||||
from Apps.TeLaiDian.Kit import draw_rectangles, setup_logger, read_image, detect_price_info_container_cv
|
||||
from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO, MIN_CARD_HEIGHT
|
||||
|
||||
# 初始化日志
|
||||
@@ -60,44 +60,77 @@ class ReadImageKit:
|
||||
|
||||
async def find_price_entrance_vlm(self, image_path):
|
||||
"""
|
||||
使用 VLM 在详情页寻找价格入口(如:1.1556元/度 的卡片或价格信息按钮)
|
||||
使用 VLM 在详情页寻找价格入口
|
||||
重点识别:上方是大字价格(如 1.3435元/度),下方紧跟“全部时段 >”文字的区域
|
||||
"""
|
||||
prompt = """
|
||||
分析这张特来电充电站详情页截图,找到进入“分时电价详情”的点击入口。
|
||||
入口规则:
|
||||
1. 只选择“价格信息”模块中“当前价”下方的红色电价数字(例如 1.0689 元/度、1.3435 元/度)。
|
||||
2. 排除底部悬浮条或底部操作区中的红色价格(靠近“扫码充电”“立即充电”等按钮的区域)。
|
||||
3. 排除“停车参考价”“停车费参考价”等与停车相关的区域。
|
||||
4. 禁止选择页面顶部的标签栏,例如“价格 / 终端 / 电站 / 评论 / 周边”这一行中的任何文字或区域。
|
||||
5. 如果页面没有“当前价”,才选择用于展示充电价格的按钮,如“价格信息”“电价详情”。
|
||||
# 1. 尝试使用 CV 检测“价格信息”容器矩形
|
||||
container_norm = detect_price_info_container_cv(image_path)
|
||||
container_desc = ""
|
||||
vlm_image_path = image_path
|
||||
|
||||
位置约束(尽量满足):
|
||||
- Y 位置位于价格信息模块区域内:明显在顶部标签栏下方、在底部悬浮条上方。
|
||||
- X 位置应位于左侧价格列区域(当前价所在列),避免会员价右侧列。
|
||||
if container_norm:
|
||||
# 如果找到了容器,绘制绿框诊断图
|
||||
img = read_image(image_path)
|
||||
if img is not None:
|
||||
h, w = img.shape[:2]
|
||||
x1 = int(container_norm[0] * w / 1000)
|
||||
y1 = int(container_norm[1] * h / 1000)
|
||||
x2 = int(container_norm[2] * w / 1000)
|
||||
y2 = int(container_norm[3] * h / 1000)
|
||||
|
||||
# 在左侧划分出一个子区域(当前价区域)
|
||||
# 价格信息容器通常左右平分,左侧是当前价,右侧是会员价
|
||||
current_price_x2 = x1 + (x2 - x1) // 2
|
||||
|
||||
# 绘制绿框
|
||||
# 我们把整个容器标绿,并特别说明左侧是目标
|
||||
diag_path = image_path.replace(".jpg", "_price_box.jpg")
|
||||
import cv2
|
||||
diag_img = img.copy()
|
||||
cv2.rectangle(diag_img, (x1, y1), (x2, y2), (0, 255, 0), 3)
|
||||
cv2.line(diag_img, (current_price_x2, y1), (current_price_x2, y2), (0, 255, 0), 2)
|
||||
from Apps.TeLaiDian.Kit import save_image
|
||||
save_image(diag_path, diag_img)
|
||||
logger.info(f"[CV] 已检测到价格容器并保存绿框诊断图: {diag_path}")
|
||||
|
||||
# 更新 VLM 使用的图片为带绿框的图片,或者在提示词中说明
|
||||
vlm_image_path = diag_path
|
||||
container_desc = f"\n**视觉辅助**:图中已用【绿色矩形框】标出了“价格信息”区域。该区域被中间竖线分为左右两块。请重点分析【左侧半块】(即“当前价”所在的灰色背景区域)。"
|
||||
|
||||
prompt = f"""
|
||||
分析这张特来电充电站详情页截图,找到进入“电价详情”的点击入口。
|
||||
{container_desc}
|
||||
|
||||
**核心识别目标**:
|
||||
1. 在页面中上部区域(或绿框内的左侧部分)寻找价格显示模块。
|
||||
2. 该模块包含**红色、加粗的大字价格数字**(例如:1.3435元/度)。
|
||||
3. **绝对必要标识**:在价格数字的正下方,**必须能够清晰看到**文本“全部时段 >”或“全部时段”。
|
||||
4. 点击目标是“全部时段 >”文字所在的中心位置。
|
||||
|
||||
**严格限制与排除**:
|
||||
1. **严禁选择 y > 850 的区域**(即屏幕最底部的 15% 区域)。
|
||||
2. **严禁选择底部的浮动条**:浮动条通常包含“扫码充电”按钮,严禁返回其坐标。
|
||||
3. 如果绿框存在,请优先在绿框的【左侧区域】寻找。
|
||||
|
||||
请判断符合上述规则的价格入口是否存在,并给出其中心坐标。
|
||||
输出格式为 JSON:
|
||||
{
|
||||
{{
|
||||
"found": true/false,
|
||||
"reason": "为什么认为这是入口(说明是否基于当前价红色价格,并确认未选顶部标签栏或底部悬浮条)",
|
||||
"point": [x, y],
|
||||
"type": "price_card" / "button"
|
||||
}
|
||||
"reason": "解释你看到了什么,是否在绿框内找到了‘全部时段 >’",
|
||||
"point": [x, y] // 归一化坐标 [0-1000]
|
||||
}}
|
||||
"""
|
||||
try:
|
||||
res_text = await self.vlm.analyze_image(image_path, prompt)
|
||||
res_text = await self.vlm.analyze_image(vlm_image_path, prompt)
|
||||
json_str = self.vlm.extract_json(res_text)
|
||||
data = json.loads(json_str)
|
||||
|
||||
# 保存诊断图片
|
||||
if data.get("found") and data.get("point"):
|
||||
p = data["point"]
|
||||
# 转换坐标
|
||||
img = read_image(image_path)
|
||||
if img is not None:
|
||||
h, w = img.shape[:2]
|
||||
actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)]
|
||||
# 生成虚拟 bbox
|
||||
bbox = [actual_p[0]-60, actual_p[1]-40, actual_p[0]+60, actual_p[1]+40]
|
||||
draw_rectangles(image_path, bboxes=[bbox], click_points=[actual_p])
|
||||
logger.info(f"已生成价格入口诊断图片: {image_path.replace('.jpg', '_vl.jpg')}")
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,8 +1,7 @@
|
||||
# coding=utf-8
|
||||
import sys
|
||||
import os
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
# 将项目根目录添加到 sys.path
|
||||
project_root = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
71
Tools/Test_EasyOCR_Price.py
Normal file
71
Tools/Test_EasyOCR_Price.py
Normal file
@@ -0,0 +1,71 @@
|
||||
# coding=utf-8
|
||||
# pip install easyocr
|
||||
# python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121 --force-reinstall
|
||||
import os
|
||||
import sys
|
||||
import cv2
|
||||
import numpy as np
|
||||
# 设置项目根目录
|
||||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
|
||||
from Util.EasyOcrKit import get_easyocr_reader
|
||||
|
||||
def test_easyocr_price():
    """Run OCR on a fixed screenshot, mark the "全部时段" hit and save the result.

    Reads ``Output/Screenshot_20260115_083521.jpg`` under ``project_root``,
    searches it for the target text, draws the matched quadrilateral plus a
    label, and writes ``Output/Test_EasyOCR_Result.jpg``. Progress and errors
    are printed; nothing is returned.
    """
    # Input screenshot and annotated output locations.
    test_image_path = os.path.join(project_root, "Output", "Screenshot_20260115_083521.jpg")
    output_image_path = os.path.join(project_root, "Output", "Test_EasyOCR_Result.jpg")

    print(f"--- 开始测试 EasyOCR 识别 ---")
    print(f"输入图片: {test_image_path}")

    if not os.path.exists(test_image_path):
        print(f"错误: 测试图片不存在!")
        return

    # 1. Obtain the OCR wrapper.
    reader = get_easyocr_reader(gpu=True)

    # 2. Load the image.
    img = cv2.imread(test_image_path)
    if img is None:
        print("错误: 无法读取图片。")
        return

    # 3. Search for the target text.
    target = '全部时段'
    found = reader.find_text_position(img, target)

    h, w = img.shape[:2]

    if found:
        text, quad, prob = found
        pts = np.array(quad).astype(int)
        cv2.polylines(img, [pts], isClosed=True, color=(0, 255, 0), thickness=2)

        # Centre of the normalised rectangle returned by get_normalized_rect.
        rect = reader.get_normalized_rect(quad, w, h)
        norm_x = (rect[0] + rect[2]) // 2
        norm_y = (rect[1] + rect[3]) // 2

        print(f'找到“{text}” (目标: {target}) 置信度={prob:.2f}')
        print(f'归一化坐标: [{norm_x}, {norm_y}]')

        # cv2.putText draws Hershey fonts only, so the recognised Chinese text
        # would come out as "?" characters; keep the on-image label ASCII and
        # rely on the console output above for the actual text.
        label_origin = (int(pts[0][0]), int(pts[0][1]) - 10)
        cv2.putText(img, f"OCR Found (conf={prob:.2f})", label_origin,
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    else:
        print(f"未能在图片中找到包含“{target}”的文本。")
        # Dump everything that was recognised to help debugging.
        print("识别到的所有文本:")
        results = reader.read_text(img)
        for (_, text, _) in results:
            print(f"  - {text}")

    # 4. Save the annotated image. cv2.imwrite signals failure through its
    # return value rather than an exception, so check it before claiming success.
    if cv2.imwrite(output_image_path, img):
        print(f"--- 测试完成 ---")
        print(f"结果已保存至: {output_image_path}")
    else:
        print("错误: 结果图保存失败。")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_easyocr_price()
|
||||
67
Tools/Test_PriceBox_Detection.py
Normal file
67
Tools/Test_PriceBox_Detection.py
Normal file
@@ -0,0 +1,67 @@
|
||||
# coding=utf-8
|
||||
import os
|
||||
import sys
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
# 设置项目根目录
|
||||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
|
||||
from Apps.TeLaiDian.Kit import detect_price_info_container_cv, read_image, save_image
|
||||
|
||||
def test_price_box_detection():
    """Visual check for ``detect_price_info_container_cv`` on a fixed screenshot.

    Runs the detector on ``Output/Screenshot_20260115_083521.jpg`` under
    ``project_root``, draws the returned rectangle with a vertical divider at
    its horizontal midpoint, and saves the picture as
    ``Output/Test_PriceBox_Result_V2.jpg``. Each outcome is printed.
    """
    output_dir = os.path.join(project_root, "Output")
    test_image_path = os.path.join(output_dir, "Screenshot_20260115_083521.jpg")
    output_image_path = os.path.join(output_dir, "Test_PriceBox_Result_V2.jpg")

    print(f"--- 开始测试价格容器识别 ---")
    print(f"输入图片: {test_image_path}")

    if not os.path.exists(test_image_path):
        print(f"错误: 测试图片不存在!")
        return

    # Step 1: run the detector under test.
    container_norm = detect_price_info_container_cv(test_image_path)
    if not container_norm:
        print("识别失败: 未能检测到‘价格信息’容器矩形。")
        return
    print(f"识别成功! 归一化坐标: {container_norm}")

    # Step 2: load the picture and scale the 0-1000 coordinates to pixels.
    img = read_image(test_image_path)
    if img is None:
        print("错误: 无法读取图片。")
        return

    h, w = img.shape[:2]
    left, top, right, bottom = (
        int(container_norm[idx] * extent / 1000)
        for idx, extent in enumerate((w, h, w, h))
    )
    green = (0, 255, 0)

    # Outer rectangle.
    cv2.rectangle(img, (left, top), (right, bottom), green, 4)

    # Vertical divider separating the left half from the right half.
    divider_x = left + (right - left) // 2
    cv2.line(img, (divider_x, top), (divider_x, bottom), green, 2)

    # Caption inside the rectangle.
    cv2.putText(img, "Target Area (Left Half)", (left + 10, top + 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1, green, 2)

    # Step 3: write the annotated picture.
    if not save_image(output_image_path, img):
        print("错误: 结果图保存失败。")
        return

    print(f"--- 测试完成 ---")
    print(f"结果已保存至: {output_image_path}")
    print(f"请检查该图片中的绿框是否准确圈定了‘价格信息’板块及其左侧区域。")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_price_box_detection()
|
||||
82
Util/EasyOcrKit.py
Normal file
82
Util/EasyOcrKit.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# coding=utf-8
|
||||
import easyocr
|
||||
import numpy as np
|
||||
import cv2
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class EasyOcrKit:
    """Singleton wrapper around an ``easyocr.Reader``.

    Every construction returns the same instance. The underlying reader is
    created by the first ``__init__`` call that manages to build one; once it
    exists, ``langs`` and ``gpu`` passed to later constructions are ignored.
    """

    _instance = None
    _reader = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(EasyOcrKit, cls).__new__(cls)
        return cls._instance

    def __init__(self, langs=None, gpu=True):
        """Create the EasyOCR reader if it does not exist yet.

        :param langs: recognition languages; ``None`` means ``['ch_sim', 'en']``
        :param gpu: whether to request GPU mode; on failure CPU mode is tried

        Initialisation failures are logged, not raised: the reader then stays
        ``None`` and ``read_text`` returns an empty list.
        """
        if self._reader is not None:
            return

        # Resolve the default here instead of in the signature so no list
        # object is shared between calls.
        langs = ['ch_sim', 'en'] if langs is None else list(langs)

        try:
            self._reader = easyocr.Reader(langs, gpu=gpu)
            logger.info(f"EasyOCR Reader 初始化成功 (gpu={gpu})")
        except Exception as e:
            logger.error(f"EasyOCR Reader 初始化失败: {e}")
            if not gpu:
                return
            # GPU initialisation failed: retry on CPU.
            logger.warning("尝试回退到 CPU 模式...")
            try:
                self._reader = easyocr.Reader(langs, gpu=False)
            except Exception as cpu_err:
                # Treat a failed fallback like any other failed init so the
                # constructor behaves the same for gpu=True and gpu=False.
                logger.error(f"EasyOCR Reader 初始化失败: {cpu_err}")

    def read_text(self, image):
        """Recognise the text in an image.

        :param image: image path or OpenCV image object
        :return: the list returned by the reader's ``readtext``; an empty
                 list when no reader is available
        """
        if self._reader is None:
            return []
        return self._reader.readtext(image)

    def find_text_position(self, image, target_text, threshold=0.5):
        """Find the first recognised entry that contains ``target_text``.

        :param image: image path or OpenCV image object
        :param target_text: text to look for (substring match)
        :param threshold: minimum confidence for a match to be accepted
        :return: ``(found_text, quad, probability)``, or ``None`` if nothing matches
        """
        for quad, text, prob in self.read_text(image):
            if target_text in text and prob >= threshold:
                return text, quad, prob
        return None

    def get_normalized_rect(self, quad, width, height):
        """Convert a quadrilateral to a 0-1000 normalised bounding rectangle.

        :param quad: the four corner points of the text region
        :param width: image width in pixels
        :param height: image height in pixels
        :return: ``[x1, y1, x2, y2]`` as plain ints
        :raises ValueError: if ``width`` or ``height`` is not positive
        """
        if width <= 0 or height <= 0:
            raise ValueError(
                f"width and height must be positive, got {width}x{height}"
            )

        pts = np.array(quad).astype(int)
        x_min, x_max = int(pts[:, 0].min()), int(pts[:, 0].max())
        y_min, y_max = int(pts[:, 1].min()), int(pts[:, 1].max())

        # Lower bounds are clamped at 0 and upper bounds at the image size
        # before scaling.
        return [
            int(max(0, x_min) * 1000 / width),
            int(max(0, y_min) * 1000 / height),
            int(min(width, x_max) * 1000 / width),
            int(min(height, y_max) * 1000 / height),
        ]
|
||||
|
||||
# 便捷函数
|
||||
def get_easyocr_reader(gpu=True):
    """Return the shared ``EasyOcrKit`` instance, passing ``gpu`` to its constructor."""
    kit = EasyOcrKit(gpu=gpu)
    return kit
|
||||
BIN
Util/__pycache__/EasyOcrKit.cpython-310.pyc
Normal file
BIN
Util/__pycache__/EasyOcrKit.cpython-310.pyc
Normal file
Binary file not shown.
Reference in New Issue
Block a user