Files
aiData/Apps/XinDianTu/Kit.py
HuangHai 24380767a4 'commit'
2026-01-17 10:52:13 +08:00

983 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import cv2
import numpy as np
import time
import hashlib
from Apps.XinDianTu.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO
from Config.Config import TEMP_IMAGE_DIR
# Configure root logging at import time (INFO level) and create this module's logger.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
def read_image(path):
    """Load an image from disk, returning None on any failure.

    The file is read into a byte buffer with ``np.fromfile`` and decoded with
    ``cv2.imdecode`` instead of being opened by path, which is what lets
    paths containing Chinese characters work.

    :param path: Image file path.
    :return: The decoded image (``cv2.IMREAD_UNCHANGED``, so the channel
        layout is whatever the file holds), or None when the path is empty,
        missing, zero bytes long, or reading/decoding raises.
    """
    try:
        readable = bool(path) and os.path.exists(path) and os.path.getsize(path) > 0
        if not readable:
            return None
        raw_bytes = np.fromfile(path, dtype=np.uint8)
        return cv2.imdecode(raw_bytes, cv2.IMREAD_UNCHANGED)
    except Exception as e:
        # Best-effort loader: log the problem and report "no image".
        logger.error(f"读取图片失败 {path}: {e}")
        return None
def get_image_content_md5(file_path, top_ratio=0.1, bottom_ratio=0.1):
    """Return an MD5 hex digest of the image's central band.

    The top ``top_ratio`` and bottom ``bottom_ratio`` of the rows are cropped
    away (intended to exclude the status bar and navigation bar) before
    hashing.

    :param file_path: Image file path.
    :param top_ratio: Fraction of the height to drop from the top.
    :param bottom_ratio: Fraction of the height to drop from the bottom.
    :return: Hex digest string, or None if the image cannot be read.
    """
    image = read_image(file_path)
    if image is None:
        return None

    height, _ = image.shape[:2]
    row_start = int(height * top_ratio)
    row_end = int(height * (1 - bottom_ratio))
    core = image[row_start:row_end, :]

    # Hash the JPEG-encoded crop; fall back to the raw pixel bytes if
    # encoding reports failure.
    encoded_ok, jpeg_buf = cv2.imencode(".jpg", core)
    payload = jpeg_buf.tobytes() if encoded_ok else core.tobytes()
    return hashlib.md5(payload).hexdigest()
def save_image(path, img):
    """Encode ``img`` and write it to ``path``; return True on success.

    The image is encoded in memory with ``cv2.imencode`` and written with
    ``ndarray.tofile``, which is what lets paths containing Chinese
    characters work. The parent directory is created if missing.

    :param path: Destination file path. Its extension selects the encoder;
        ``.jpg`` is used when the path has no extension.
    :param img: Image array, or None.
    :return: True if the file was written; False if ``img`` is None,
        encoding failed, or any exception was raised.
    """
    try:
        if img is None:
            return False

        parent = os.path.dirname(path)
        if parent and not os.path.exists(parent):
            os.makedirs(parent, exist_ok=True)

        suffix = os.path.splitext(path)[1] or ".jpg"
        ok, buffer = cv2.imencode(suffix, img)
        if not ok:
            return False
        buffer.tofile(path)
        return True
    except Exception as e:
        logger.error(f"保存图片失败 {path}: {e}")
        return False
# Screenshot helper
def take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR):
    """Capture a screenshot via ``d.screenshot`` and return the saved path.

    :param d: Object exposing ``screenshot(path)`` (presumably a device
        automation handle; verify against the caller).
    :param image_uuid: Base name (without extension) for the saved file.
    :param save_dir: Target directory; created if it does not exist.
    :return: ``"{save_dir}/{image_uuid}.jpg"``.
    """
    os.makedirs(save_dir, exist_ok=True)
    target = f"{save_dir}/{image_uuid}.jpg"
    d.screenshot(target)
    return target
def clear_temp_dir(save_dir=TEMP_IMAGE_DIR):
    """Delete every file and sub-directory directly inside ``save_dir``.

    The directory itself is kept. Nothing happens if it does not exist.
    A failure on one entry is logged and the remaining entries are still
    processed.

    :param save_dir: Directory to empty.
    """
    import shutil

    if not os.path.exists(save_dir):
        return
    logger.info(f"正在清空临时目录: {save_dir}")
    for entry in os.listdir(save_dir):
        entry_path = os.path.join(save_dir, entry)
        try:
            if os.path.isfile(entry_path):
                os.remove(entry_path)
                continue
            if os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
        except Exception as e:
            logger.error(f"无法删除文件 {entry_path}: {e}")
def is_background_dimmed(image_path, threshold=80):
    """Detect whether the screen is covered by a dark overlay mask.

    The mean grayscale brightness is measured on four border strips (top,
    bottom, left, right), each 5% of the corresponding dimension, and the
    four means are averaged. A low average suggests a dimming mask.

    :param image_path: Screenshot path.
    :param threshold: Brightness cut-off (0-255); averages below it count
        as dimmed.
    :return: True when the average border brightness is below ``threshold``;
        False when the file is missing or unreadable.
    """
    if not os.path.exists(image_path):
        return False
    frame = read_image(image_path)
    if frame is None:
        return False

    rows, cols = frame.shape[:2]
    luminance = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    band_h = int(rows * 0.05)
    band_w = int(cols * 0.05)
    strips = (
        luminance[0:band_h, :],            # top
        luminance[rows - band_h:rows, :],  # bottom
        luminance[:, 0:band_w],            # left
        luminance[:, cols - band_w:cols],  # right
    )

    avg_brightness = sum(np.mean(strip) for strip in strips) / 4
    logger.info(f"Background Dimmed Check: Avg Brightness = {avg_brightness:.2f} (Threshold: {threshold})")
    return avg_brightness < threshold
from Util.EasyOcrKit import get_easyocr_reader
# Accessor for the EasyOCR reader (the original note describes it as a preloaded singleton).
def get_ocr_reader():
    """Return the reader produced by ``get_easyocr_reader`` with ``gpu=True``."""
    reader = get_easyocr_reader(gpu=True)
    return reader
def detect_price_info_container_cv(image_path):
    """Locate the price entry text ("全部时段") on a detail page via OCR.

    :param image_path: Screenshot path.
    :return: The value of ``reader.get_normalized_rect(quad, w, h)`` for the
        first OCR result that contains a keyword with confidence >= 0.5
        (documented by the original author as ``[x1, y1, x2, y2]`` normalised
        to 0-1000), or None if the image is unreadable, nothing matches, or
        OCR raises.
    """
    frame = read_image(image_path)
    if frame is None:
        return None

    h, w = frame.shape[:2]
    keywords = ['全部时段']
    try:
        reader = get_ocr_reader()
        for quad, text, prob in reader.read_text(frame):
            keyword_hit = any(kw in text for kw in keywords)
            if not keyword_hit or not (prob >= 0.5):
                continue
            res = reader.get_normalized_rect(quad, w, h)
            logger.info(f"[OCR识别] 找到文本: '{text}', 置信度: {prob:.4f}, 归一化坐标: {res}")
            return res
    except Exception as e:
        logger.error(f"OCR 识别发生异常: {e}")
    return None
def detect_rabbit_ad_close(image_path, debug_dir=None):
    """
    Detect the close button of the "新电兔AI" ad using image processing.

    The button is expected to be a dark circle with a white "X" inside.
    The search is limited to the left 25% of the width and the 60%-90%
    band of the height.

    :param image_path: Screenshot path.
    :param debug_dir: Directory for the debug image; nothing is saved when
        falsy. When set, ``debug_rabbit_ad_cv.jpg`` is written with a red
        dot on the chosen point.
    :return: A two-element list ``[norm_x, norm_y]`` giving the button
        centre as fractions (0-1) of image width and height, or None if the
        file is missing or unreadable, no candidate is found, or the best
        candidate scores below 850.
    """
    if not os.path.exists(image_path):
        return None
    img = read_image(image_path)
    if img is None:
        return None
    h, w = img.shape[:2]
    # ROI: left side, lower-middle part (x: 0-25%, y: 60-90%)
    roi_x1, roi_x2 = 0, int(w * 0.25)
    roi_y1, roi_y2 = int(h * 0.6), int(h * 0.9)
    roi = img[roi_y1:roi_y2, roi_x1:roi_x2]
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    candidates = []
    # Try several thresholds to cope with different brightness conditions
    for threshold_val in [40, 60, 80, 100]:
        # Inverted binary threshold: dark pixels become foreground
        _, thresh = cv2.threshold(gray, threshold_val, 255, cv2.THRESH_BINARY_INV)
        # Find contours
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        for cnt in contours:
            area = cv2.contourArea(cnt)
            # Circularity check (4*pi*area / perimeter^2; 1.0 for a perfect circle)
            perimeter = cv2.arcLength(cnt, True)
            if perimeter == 0: continue
            circularity = 4 * np.pi * (area / (perimeter * perimeter))
            # The close button is usually small (area between 100 and 4000 px)
            if 100 < area < 4000 and circularity > 0.4:
                # Bounding box of this candidate (ROI coordinates)
                x, y, w_cnt, h_cnt = cv2.boundingRect(cnt)
                # Look for a bright 'X' inside the dark circle
                padding = 2
                inner_roi = gray[max(0, y-padding):min(roi.shape[0], y+h_cnt+padding),
                                 max(0, x-padding):min(roi.shape[1], x+w_cnt+padding)]
                # Find bright objects (the X)
                _, inner_thresh = cv2.threshold(inner_roi, 180, 255, cv2.THRESH_BINARY)
                inner_contours, _ = cv2.findContours(inner_thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
                has_x = False
                for i_cnt in inner_contours:
                    i_area = cv2.contourArea(i_cnt)
                    # The X should be much smaller than the circle
                    if 10 < i_area < area * 0.5:
                        has_x = True
                        break
                if has_x or circularity > 0.7:
                    M = cv2.moments(cnt)
                    if M["m00"] != 0:
                        # Contour centroid, shifted back to full-image coordinates
                        cX = int(M["m10"] / M["m00"]) + roi_x1
                        cY = int(M["m01"] / M["m00"]) + roi_y1
                        norm_x = cX / w
                        norm_y = cY / h
                        # Avoid duplicates: skip centres within 15 px of an existing
                        # candidate. The first threshold that finds a spot wins,
                        # including its has_x flag.
                        if not any(abs(cX - c[0]) < 15 and abs(cY - c[1]) < 15 for c in candidates):
                            candidates.append((cX, cY, area, norm_x, norm_y, has_x))
    if not candidates:
        return None
    # Scoring logic
    def score_candidate(c):
        # c = (cx, cy, area, nx, ny, has_x)
        has_x = c[5]
        # Base score: large bonus when an X was found
        score = 1000 if has_x else 0
        # Distance score: closer to the expected position (0.094, 0.830) is better
        dist = np.sqrt((c[3] - 0.094)**2 + (c[4] - 0.830)**2)
        score -= dist * 2000  # normalised distances are small, so the weight is large
        # Area score: ideal area is between 500 and 1500
        if 500 < c[2] < 1500: score += 200
        return score
    candidates.sort(key=score_candidate, reverse=True)
    best = candidates[0]
    best_score = score_candidate(best)
    logger.info(f"CV detected rabbit ad close button at Norm({best[3]:.3f}, {best[4]:.3f}) with score {best_score:.2f}")
    # Reject low scores (below 850) as likely false positives. A candidate
    # without has_x can score at most 200, so this cut-off effectively
    # requires an X to have been detected.
    if best_score < 850:
        logger.info(f"Score {best_score:.2f} is below threshold 850, ignoring candidate.")
        return None
    if debug_dir:
        os.makedirs(debug_dir, exist_ok=True)
        debug_img = img.copy()
        cv2.circle(debug_img, (best[0], best[1]), 10, (0, 0, 255), -1)  # red dot marks the result
        save_image(os.path.join(debug_dir, "debug_rabbit_ad_cv.jpg"), debug_img)
    return [best[3], best[4]]
def setup_logger(name, log_file=None, clear_old_log=False):
    """Configure logging to both the console and a file, and return a logger.

    A parent logger named after the supplier code (the name of the directory
    containing this file) owns the handlers. Child loggers
    ``"<supplier_code>.<name>"`` inherit them. ``propagate`` is disabled on
    the parent so records are not also emitted by the root logger.

    Handlers are attached only when the parent has none. A later call with a
    different ``log_file`` therefore keeps the existing handlers, unless
    ``clear_old_log`` removed them first.

    :param name: Logger name. Passing the supplier code returns the parent.
    :param log_file: Log file path. Defaults to
        ``<three levels above this file>/Logs/<supplier_code>.log``. Its
        directory is created when missing.
    :param clear_old_log: When True and the log file exists, close and detach
        the parent's handlers and delete the file before re-attaching.
    :return: The parent logger or the requested child logger.
    """
    # 1. Supplier code = name of the directory holding this file.
    supplier_code = os.path.basename(os.path.dirname(os.path.abspath(__file__)))

    # 2. Fetch and configure the parent logger.
    parent_logger = logging.getLogger(supplier_code)
    parent_logger.setLevel(logging.INFO)
    parent_logger.propagate = False  # avoid duplicate output via the root logger

    if log_file is None:
        # Project root is three directory levels above this file.
        root_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
        log_dir = os.path.join(root_dir, "Logs")
        # exist_ok avoids the exists-then-create race of the earlier check.
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"{supplier_code}.log")

    if clear_old_log and os.path.exists(log_file):
        try:
            # Handlers must be closed before the file can be removed.
            for handler in parent_logger.handlers[:]:
                handler.close()
                parent_logger.removeHandler(handler)
            os.remove(log_file)
        except Exception as e:
            # print, not logging: the handlers may have just been detached.
            print(f"无法清空旧日志文件 {log_file}: {e}")

    if not parent_logger.handlers:
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        parent_logger.addHandler(console_handler)

        # File handler. Create the target directory first so that a
        # caller-supplied path in a new directory does not make FileHandler
        # raise FileNotFoundError.
        target_dir = os.path.dirname(log_file)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(formatter)
        parent_logger.addHandler(file_handler)

    # 3. Return the parent itself or a child logger.
    if name == supplier_code:
        return parent_logger
    return logging.getLogger(f"{supplier_code}.{name}")
def detect_black_agree_button(image_path, debug_dir=None):
    """Find the black "agree" button by colour and shape.

    Looks for a large, wide, dark region between 40% and 80% of the image
    height: area at least 1% of the image, bounding-box aspect ratio within
    [2, 10], and width at least half the image width. The largest region
    that qualifies is chosen.

    :param image_path: Screenshot path.
    :param debug_dir: Directory for the debug image; nothing is saved when
        falsy. When set, ``debug_agree_btn.jpg`` is written with the box
        and centre drawn.
    :return: ``(center_x, center_y)`` in pixels, or None when the file is
        missing or unreadable, or no region qualifies.
    """
    if not os.path.exists(image_path):
        return None
    frame = read_image(image_path)
    if frame is None:
        return None
    frame_h, frame_w = frame.shape[:2]

    # "Black" is selected in HSV by a low V channel (V <= 40), any hue/saturation.
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    dark_lo = np.array([0, 0, 0])
    dark_hi = np.array([180, 255, 40])
    dark_mask = cv2.inRange(hsv, dark_lo, dark_hi)

    # Keep only the 40%-80% vertical band of the mask.
    band_top = int(frame_h * 0.4)
    band_bottom = int(frame_h * 0.8)
    band_mask = np.zeros_like(dark_mask)
    band_mask[band_top:band_bottom, :] = dark_mask[band_top:band_bottom, :]

    # Close then open with a 5x5 kernel: join broken regions, drop specks.
    kernel = np.ones((5, 5), np.uint8)
    band_mask = cv2.morphologyEx(band_mask, cv2.MORPH_CLOSE, kernel)
    band_mask = cv2.morphologyEx(band_mask, cv2.MORPH_OPEN, kernel)

    contours, _ = cv2.findContours(band_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    min_area = frame_w * frame_h * 0.01
    min_width = frame_w * 0.5
    winner = None
    winner_area = 0
    for contour in contours:
        area = cv2.contourArea(contour)
        _, _, box_w, box_h = cv2.boundingRect(contour)
        ratio = float(box_w) / box_h

        too_small = area < min_area
        bad_shape = ratio < 2.0 or ratio > 10.0
        too_narrow = box_w < min_width
        if too_small or bad_shape or too_narrow:
            continue

        if area > winner_area:
            winner, winner_area = contour, area

    if winner is None:
        return None

    x, y, cw, ch = cv2.boundingRect(winner)
    center_x = x + cw // 2
    center_y = y + ch // 2
    logger.info(f"Found Black Agree Button at ({center_x}, {center_y}), Size: {cw}x{ch}")

    if debug_dir:
        os.makedirs(debug_dir, exist_ok=True)
        annotated = frame.copy()
        cv2.rectangle(annotated, (x, y), (x + cw, y + ch), (0, 0, 255), 2)
        cv2.circle(annotated, (center_x, center_y), 5, (0, 255, 0), -1)
        save_image(os.path.join(debug_dir, "debug_agree_btn.jpg"), annotated)

    return (center_x, center_y)
def find_expand_button_position(image_path, debug_dir=None, debug_filename_prefix=None):
    """Locate the "全部时段" (all time slots) row from geometric features.

    A qualifying row has almost no Canny edges in the left 35% and right 35%
    of the width but does have edges in the central 30%. Consecutive
    qualifying rows (breaks of up to 5 px tolerated) form bands. The first
    band from the top that is 20-150 px tall and whose middle lies between
    30% and 90% of the image height is selected.

    :param image_path: Screenshot path.
    :param debug_dir: Directory for an annotated debug image; nothing is
        saved when falsy.
    :param debug_filename_prefix: Prefix for the debug file name (characters
        invalid in file names are replaced by ``_``); a timestamp is used
        when falsy.
    :return: ``(x, y)`` pixel coordinates (x is the horizontal centre of the
        image, y the middle of the band), or None if the file is missing or
        unreadable, or no band qualifies.
    """
    if not os.path.exists(image_path):
        return None
    frame = read_image(image_path)
    if frame is None:
        return None
    h, w = frame.shape[:2]

    edge_map = cv2.Canny(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), 50, 150)

    col_left = int(w * 0.35)
    col_right = int(w * 0.65)

    # Edge pixels are 255 in the Canny output, so sum / 255 is a per-row count.
    left_count = np.sum(edge_map[:, :col_left], axis=1) / 255.0
    right_count = np.sum(edge_map[:, col_right:], axis=1) / 255.0
    center_count = np.sum(edge_map[:, col_left:col_right], axis=1) / 255.0

    max_side_noise = 2    # tolerate up to 2 stray edge pixels on each side
    min_center_edges = 5  # require at least 5 edge pixels in the middle
    row_mask = (
        (left_count <= max_side_noise)
        & (right_count <= max_side_noise)
        & (center_count >= min_center_edges)
    )

    row_ids = np.where(row_mask)[0]
    if len(row_ids) == 0:
        return None

    # Group qualifying rows into bands; a jump of more than 5 rows starts a new band.
    bands = []
    band_first = band_last = row_ids[0]
    for row in row_ids[1:]:
        if row > band_last + 5:
            bands.append((band_first, band_last))
            band_first = row
        band_last = row
    bands.append((band_first, band_last))

    # Take the topmost band with plausible height and vertical position.
    chosen = None
    for top, bottom in bands:
        span = bottom - top
        middle = (top + bottom) // 2
        if 20 <= span <= 150 and h * 0.3 <= middle <= h * 0.9:
            chosen = (top, bottom)
            break
    if chosen is None:
        return None

    start, end = chosen
    center_y = (start + end) // 2
    center_x = w // 2

    if debug_dir:
        os.makedirs(debug_dir, exist_ok=True)
        annotated = frame.copy()
        # Green box around the detected band, red dot on the returned point.
        cv2.rectangle(annotated, (0, start), (w, end), (0, 255, 0), 2)
        cv2.circle(annotated, (center_x, center_y), 10, (0, 0, 255), -1)
        if debug_filename_prefix:
            # Sanitize the file name: replace characters invalid in paths.
            import re
            safe_prefix = re.sub(r'[\\/*?:"<>|]', '_', str(debug_filename_prefix))
            debug_name = f"{safe_prefix}_flag_expand.jpg"
        else:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            debug_name = f"{timestamp}_flag_expand.jpg"
        debug_path = os.path.join(debug_dir, debug_name)
        save_image(debug_path, annotated)
        logger.info(f"Saved debug image to {debug_path}")

    return (int(center_x), int(center_y))
def get_row_stats(gray, strip_w=100, edge_w=20):
    """Compute per-row statistics of a grayscale image.

    :param gray: 2-D grayscale image array (rows x columns).
    :param strip_w: Width in pixels of the vertical strip, centred on the
        image, used to detect content.
    :param edge_w: Width in pixels of the strip at the left edge, used to
        detect the background/margin.
    :return: Tuple ``(row_means, row_stds, edge_means)`` of 1-D arrays with
        one value per row: mean and standard deviation of the central
        strip, and mean of the left-edge strip.
    """
    h, w = gray.shape

    # Central strip (content detection). Clamp to the image bounds: a
    # negative start index would wrap round and sample columns from the
    # right-hand side instead of the centre on images narrower than
    # ``strip_w``.
    center_x = w // 2
    half_w = strip_w // 2
    strip_left = max(0, center_x - half_w)
    strip_right = min(w, center_x + half_w)
    center_strip = gray[:, strip_left:strip_right]

    # Left-edge strip (background / margin detection).
    edge_strip = gray[:, 0:edge_w]

    row_means = np.mean(center_strip, axis=1)
    row_stds = np.std(center_strip, axis=1)
    edge_means = np.mean(edge_strip, axis=1)
    return row_means, row_stds, edge_means
def clean_station_name(name):
    """Normalise a station name for comparison.

    Surrounding whitespace is stripped, then any run of trailing ``.``
    characters (typically a truncation marker), then whitespace again.

    :param name: Raw station name; may be None or empty.
    :return: The cleaned name, or ``""`` for a falsy input.
    """
    if not name:
        return ""
    return name.strip().rstrip(".").strip()
def get_file_md5(path):
    """Return the MD5 hex digest of a file's contents.

    The file is read in 4096-byte chunks.

    :param path: File path.
    :return: Hex digest string, or ``""`` if the path does not exist.
    """
    if not os.path.exists(path):
        return ""
    digest = hashlib.md5()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
def crop_cards_from_image(img_path, output_dir=None, save_debug=True):
    """
    Locate station cards in a list screenshot and record their geometry.

    Despite the name, no per-card crops are written. The function scans the
    image row by row for card regions, unifies their left/right bounds,
    filters outliers, and then writes:

    * ``{stem}_vl{ext}``   - the image with a green box per card (always)
    * ``{stem}_flag{ext}`` - green boxes plus a red click dot (only when
      ``save_debug`` is true)
    * ``{stem}.json``      - card rectangles, normalised bounds and click
      points (always)

    :param img_path: Image path.
    :param output_dir: Output directory; defaults to the directory of
        ``img_path``.
    :param save_debug: Whether to save the ``_flag`` debug image. It does
        not affect the ``_vl`` image or the JSON file.
    :return: List with one tuple per card:
        ``(None, (click_x, click_y), (x1, draw_y1, x2, draw_y2))``.
        Empty list if the file is missing or cannot be loaded.
    """
    logger.info(f"Processing: {img_path}")
    if not os.path.exists(img_path):
        logger.info(f"Error: File not found {img_path}")
        return []
    img = read_image(img_path)
    if img is None:
        logger.info(f"Error: Failed to load image {img_path}")
        return []
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    h, w = gray.shape
    row_means, row_stds, edge_means = get_row_stats(gray)
    # Parameters
    # Gray range of the page background (about 242 according to the
    # author's Analyze2.py output)
    BG_GRAY_MIN = 230
    BG_GRAY_MAX = 250
    # Gap detection: the central strip is also a uniform gray.
    # The true background is about 242; the off-white pseudo-background
    # inside cards is about 246-248, so GAP_MEAN_MAX is kept low enough to
    # tell a real gap from "dirty white".
    GAP_MEAN_MIN = 235
    GAP_MEAN_MAX = 244
    GAP_STD_MAX = 10.0
    segments = []
    is_in_card = False
    start_y = 0
    # Raw per-row classification, kept for the gap-filling pass below
    row_is_card = []
    for y in range(h):
        rm = row_means[y]
        rs = row_stds[y]
        em = edge_means[y]
        # 1. Decide whether this row may belong to a card
        # Condition A: the left edge is background gray (there is a margin,
        # which excludes full-width headers/footers)
        has_gray_margin = (BG_GRAY_MIN <= em <= BG_GRAY_MAX)
        # Condition B: the centre is NOT uniform background gray (it has
        # content or a white card body). A gap row has its mean inside the
        # gap range and a small standard deviation.
        is_gap = (GAP_MEAN_MIN <= rm <= GAP_MEAN_MAX) and (rs < GAP_STD_MAX)
        is_card_row = has_gray_margin and not is_gap
        row_is_card.append(is_card_row)
    # Simple 1-D morphological closing: fill small cracks inside a card.
    # A run of False rows with True rows directly above and below, and no
    # longer than MAX_GAP_FILL, is treated as card.
    # The limit was reduced from 10 to 3 so that a real gap between two
    # cards is not filled (which would glue the cards together).
    MAX_GAP_FILL = 3
    cleaned_row_is_card = row_is_card[:]
    # Find runs of consecutive False rows
    i = 0
    while i < h:
        if not cleaned_row_is_card[i]:
            # Found a gap start
            gap_start = i
            while i < h and not cleaned_row_is_card[i]:
                i += 1
            gap_end = i
            gap_len = gap_end - gap_start
            # Check context
            prev_is_card = (gap_start > 0) and cleaned_row_is_card[gap_start - 1]
            next_is_card = (gap_end < h) and cleaned_row_is_card[gap_end]
            if prev_is_card and next_is_card and gap_len <= MAX_GAP_FILL:
                # Fill the gap
                for k in range(gap_start, gap_end):
                    cleaned_row_is_card[k] = True
        else:
            i += 1
    # Build segments [start_y, end_y) from cleaned_row_is_card
    is_in_card = False
    start_y = 0
    for y in range(h):
        if cleaned_row_is_card[y]:
            if not is_in_card:
                is_in_card = True
                start_y = y
        else:
            if is_in_card:
                is_in_card = False
                end_y = y
                height = end_y - start_y
                # Drop regions that are too short (limit relaxed from 150 to 100)
                if height > 100:
                    segments.append((start_y, end_y))
    # Handle a segment still open at the last row
    if is_in_card:
        end_y = h
        height = end_y - start_y
        if height > 100:
            segments.append((start_y, end_y))
    # Bottom filter: drop segments that reach the bottom of the image,
    # which are usually the navigation bar or a truncated card
    BOTTOM_MARGIN = 50  # a segment ending within this many px of the bottom counts as touching it
    valid_segments_scan = []
    for y1, y2 in segments:
        if y2 >= h - BOTTOM_MARGIN:
            logger.info(f" Dropping segment Y={y1}-{y2} because it touches the bottom (H={h}).")
            continue
        valid_segments_scan.append((y1, y2))
    segments = valid_segments_scan
    logger.info(f" Found {len(segments)} potential segments based on vertical scan.")
    # 1.5 Group Segments by Large Gaps (to separate Header / List / Footer)
    # The filter bar usually creates a large gap (e.g. > 60px).
    # NOTE(review): `groups` is only used for the log line below; every
    # segment is processed regardless of its group (target_group = segments).
    SECTION_GAP_MIN = 60
    groups = []
    if segments:
        current_group = [segments[0]]
        for i in range(1, len(segments)):
            prev_end = segments[i - 1][1]
            curr_start = segments[i][0]
            gap = curr_start - prev_end
            if gap > SECTION_GAP_MIN:
                groups.append(current_group)
                current_group = []
            current_group.append(segments[i])
        groups.append(current_group)
    logger.info(f" Found {len(groups)} segment groups.")
    target_group = segments
    logger.info(f" Selected group with {len(target_group)} segments.")
    # 2. Determine the left/right bounds of the cards (width refinement).
    # Collect a suggested bound from every segment, then unify them.
    candidate_x1 = []
    candidate_x2 = []
    temp_valid_segments = []
    for y1, y2 in target_group:
        # Analyse a few rows around the vertical middle of the segment
        mid_y = (y1 + y2) // 2
        sample_h = min(10, y2 - y1)
        sample_rows = gray[mid_y - sample_h // 2: mid_y + sample_h // 2, :]
        col_means = np.mean(sample_rows, axis=0)
        # First non-background column scanning left to right
        x1 = 0
        for x in range(w // 2):
            if not (BG_GRAY_MIN <= col_means[x] <= BG_GRAY_MAX):
                x1 = x
                break
        # First non-background column scanning right to left
        x2 = w
        for x in range(w - 1, w // 2, -1):
            if not (BG_GRAY_MIN <= col_means[x] <= BG_GRAY_MAX):
                x2 = x
                break
        if x1 >= x2 or (x2 - x1) < w * 0.5:
            # The width of this segment is implausible, so it does not
            # vote on the unified width; the segment itself is still kept.
            logger.warning(f" Warning: Segment {y1}-{y2} has weird width {x2 - x1}.")
            pass
        else:
            candidate_x1.append(x1)
            candidate_x2.append(x2)
        temp_valid_segments.append((y1, y2))
    # Split over-long segments internally so two cards are not merged into one.
    # NOTE(review): a row satisfying is_gap_line below also satisfies the
    # first-pass gap test, and only gaps of <= MAX_GAP_FILL (3) rows are
    # filled into segments, so with SPLIT_GAP_MIN = 8 this pass appears
    # unable to find a split point under the current constants - confirm.
    refined_segments = []
    SPLIT_GAP_MIN = 8
    SPLIT_MARGIN = 6
    GAP_STD_STRICT = max(0.0, GAP_STD_MAX - 3.0)
    for y1, y2 in temp_valid_segments:
        split_points = []
        run_len = 0
        run_start = None
        for yy in range(y1 + SPLIT_MARGIN, y2 - SPLIT_MARGIN):
            rm = row_means[yy]
            rs = row_stds[yy]
            em = edge_means[yy]
            is_gap_line = (GAP_MEAN_MIN <= rm <= GAP_MEAN_MAX) and (rs < GAP_STD_STRICT) and (BG_GRAY_MIN <= em <= BG_GRAY_MAX)
            if is_gap_line:
                if run_len == 0:
                    run_start = yy
                run_len += 1
            else:
                # A run of gap lines just ended; split at its midpoint if long enough
                if run_len >= SPLIT_GAP_MIN:
                    sp = (run_start + yy) // 2
                    split_points.append(sp)
                run_len = 0
                run_start = None
        # A run still open at the end of the scanned range
        if run_len >= SPLIT_GAP_MIN and run_start is not None:
            sp = (run_start + (y2 - SPLIT_MARGIN)) // 2
            split_points.append(sp)
        if split_points:
            prev = y1
            for sp in split_points:
                if sp - prev > 100:
                    refined_segments.append((prev, sp))
                    prev = sp
            if y2 - prev > 100:
                refined_segments.append((prev, y2))
        else:
            refined_segments.append((y1, y2))
    temp_valid_segments = refined_segments
    # Compute the unified width
    if not candidate_x1:
        logger.info(" No valid width detected. Using default.")
        final_x1 = 0
        final_x2 = w
    else:
        # Use the median to suppress noise; cards are normally aligned, so
        # the per-segment x1 values should be almost identical.
        final_x1 = int(np.median(candidate_x1))
        final_x2 = int(np.median(candidate_x2))
    # Add a little padding, clamped to the image bounds
    final_x1 = max(0, final_x1 - 5)
    final_x2 = min(w, final_x2 + 5)
    logger.info(f" Unified Width: X={final_x1}-{final_x2}, W={final_x2 - final_x1}")
    # 3. Height filtering: drop segments much shorter than the median
    final_cards = []
    if not temp_valid_segments:
        logger.info(" No segments found.")
    else:
        heights = [y2 - y1 for y1, y2 in temp_valid_segments]
        if not heights:
            logger.info(" No heights to calculate.")
        else:
            max_h = max(heights)  # NOTE(review): computed but not used below
            median_h = np.median(heights)
            threshold_h = median_h * 0.80
            for (y1, y2), card_h in zip(temp_valid_segments, heights):
                if card_h < threshold_h:
                    logger.info(
                        f" Filtering out segment Y={y1}-{y2} (H={card_h}) because it's too short (Threshold={threshold_h:.1f}).")
                else:
                    final_cards.append((y1, y2, final_x1, final_x2))
                    logger.info(f" Card: Y={y1}-{y2}, X={final_x1}-{final_x2}, H={card_h}")
    # 4. Save results
    if output_dir is None:
        output_dir = os.path.dirname(img_path)
    base_name = os.path.basename(img_path)
    stem, ext = os.path.splitext(base_name)
    # Image used for the annotated debug output
    debug_img = img.copy()
    results = []
    # JSON payload
    json_data = {
        "image": base_name,
        "width": w,
        "height": h,
        "cards": []
    }
    # Top safe-area filter: drop cards whose centre lies above
    # h * SAFE_EXCLUDE_RATIO
    filtered_cards = []
    for y1, y2, x1, x2 in final_cards:
        center_y = (y1 + y2) / 2.0
        if center_y < h * SAFE_EXCLUDE_RATIO:
            logger.info(
                f" Dropping card Y={y1}-{y2} (center={center_y:.1f}) "
                f"because it is within top safe area ({SAFE_EXCLUDE_RATIO*100:.0f}%)."
            )
            continue
        filtered_cards.append((y1, y2, x1, x2))
    final_cards = filtered_cards
    # Image for _vl output (boxes only, no red dots)
    vl_img = img.copy()
    logger.info(f" Step [2.1/VL] 准备在 VL 图片上绘制 {len(final_cards)} 个场站的绿色方框...")
    for idx, (y1, y2, x1, x2) in enumerate(final_cards):
        # Extend the top edge slightly upwards so the title area is not cut off
        PAD_TOP = 5
        draw_y1 = max(0, y1 - PAD_TOP)
        draw_y2 = y2
        # Click point near the top-left of the card (avoids buttons at the
        # bottom of the card): 15% across, 20% down
        w_card = x2 - x1
        h_card = draw_y2 - draw_y1
        click_x = int(x1 + w_card * 0.15)
        click_y = int(draw_y1 + h_card * 0.20)
        # Per-card crop files are no longer written; only metadata is recorded.
        # Red dot on the debug image (filled circle, radius 10, BGR red)
        cv2.circle(debug_img, (click_x, click_y), 10, (0, 0, 255), -1)
        # The green box is required: per the author's note, the downstream
        # vision model relies on it to identify the card region
        cv2.rectangle(debug_img, (x1, draw_y1), (x2, draw_y2), (0, 255, 0), 2)
        # The _vl image gets the green box only
        cv2.rectangle(vl_img, (x1, draw_y1), (x2, draw_y2), (0, 255, 0), 2)
        # Collect JSON data
        card_info = {
            "id": idx + 1,
            "rect": [x1, draw_y1, x2, draw_y2],
            "bounds_norm": {
                "left": x1 / w,
                "top": draw_y1 / h,
                "right": x2 / w,
                "bottom": draw_y2 / h
            },
            "click_point": [click_x, click_y]
        }
        json_data["cards"].append(card_info)
        # Record the region for the caller.
        # Format: (None, (click_x, click_y), (x1, draw_y1, x2, draw_y2))
        results.append((None, (click_x, click_y), (x1, draw_y1, x2, draw_y2)))
    # Save the annotated debug image (_flag)
    if save_debug:
        flag_out_path = os.path.join(output_dir, f"{stem}_flag{ext}")
        save_image(flag_out_path, debug_img)
        logger.info(f" Saved Debug Image: {flag_out_path}")
    # Save the _vl image
    vl_out_path = os.path.join(output_dir, f"{stem}_vl{ext}")
    save_image(vl_out_path, vl_img)
    logger.info(f" Step [2.2/VL] 已保存带有绿色方框的图片: {vl_out_path}")
    # Save the .json metadata
    import json
    json_out_path = os.path.join(output_dir, f"{stem}.json")
    with open(json_out_path, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, ensure_ascii=False, indent=4)
    logger.info(f" Step [2.3/JSON] 已保存场站坐标元数据: {json_out_path}")
    return results