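"""Manual test script for the YeLiTe first-page (station list) OCR flow.

The first command-line argument selects the mode:

    (none)                    scroll-capture mode (connects to a device)
    batch                     batch-process the directory containing this script
    image <path>              run OCR on a single image
    <file>.jpg/.jpeg/.png     shorthand for image mode
    check                     offline regression over the Output directory
"""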
import asyncio
|
||
import os
|
||
import sys
|
||
import time
|
||
import shutil
|
||
|
||
import uiautomator2 as u2
|
||
import cv2
|
||
|
||
|
||
# Walk three directory levels up from this file to reach the project root,
# then make sure it is importable so the `Apps.*` imports below resolve.
project_root = os.path.abspath(__file__)
for _level in range(3):
    project_root = os.path.dirname(project_root)

if project_root not in sys.path:
    sys.path.append(project_root)
|
||
|
||
|
||
from Apps.YeLiTe.Kit import setup_logger, take_screenshot
|
||
from Apps.YeLiTe.Config.Setting import SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL
|
||
from Apps.YeLiTe.FirstPageKit import run_ocr_rect, run_batch_in_dir
|
||
|
||
|
||
# Screenshots / OCR artifacts and log files both live under the project root.
OUTPUT_DIR = os.path.join(project_root, "Output")
LOGS_DIR = os.path.join(project_root, "Logs")
for _required_dir in (OUTPUT_DIR, LOGS_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# Single log file shared by this script and the OCR helpers it calls
# (it is passed to them as `log_path`).
LOG_FILE_PATH = os.path.join(LOGS_DIR, "T_YeLiTe_FirstPageScrollTest.log")

logger = setup_logger(
    "YeLiTe.FirstPageScrollTest",
    log_file=LOG_FILE_PATH,
    clear_old_log=True,
)
|
||
|
||
|
||
async def capture_list_page(d, base_dir: str, index: int):
    """Screenshot the current list page and run the OCR rectangle pass on it.

    Args:
        d: Device handle forwarded to ``take_screenshot``.
        base_dir: Directory the screenshot is saved into.
        index: 1-based page number; used in the file name and as a log prefix.

    Any exception raised by ``run_ocr_rect`` is logged with its traceback and
    swallowed, so a failed page does not abort the caller's loop.
    """
    # The Unix timestamp keeps file names unique across runs.
    screenshot_name = f"ylt_firstpage_{index}_{int(time.time())}"
    screenshot_path = take_screenshot(d, screenshot_name, save_dir=base_dir)
    logger.info(f"[{index}] 列表截图路径: {screenshot_path}")

    try:
        await run_ocr_rect(screenshot_path, log_path=LOG_FILE_PATH)
        logger.info(f"[{index}] 已根据 OCR+LLM 算法生成首屏绿框调试图。")
    except Exception as exc:
        logger.exception(f"[{index}] 运行 OCR 绿框测试失败: {exc}")
|
||
|
||
|
||
def _reset_directory(directory: str) -> None:
    """Delete every entry inside *directory*, then ensure the directory exists."""
    if os.path.exists(directory):
        for entry in os.listdir(directory):
            entry_path = os.path.join(directory, entry)
            # The symlink check comes first so links are unlinked rather than
            # followed into (and emptied by) rmtree.
            if os.path.isfile(entry_path) or os.path.islink(entry_path):
                os.remove(entry_path)
            elif os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
    os.makedirs(directory, exist_ok=True)


async def run_scroll_capture(page_count: int = 10):
    """Capture and OCR successive pages of the station list on a device.

    The operator must already have the station list open on the device.
    The function connects through ``u2.connect()``, wipes ``OUTPUT_DIR``,
    then for each page takes a screenshot, runs the OCR pass on it, and
    swipes up to reach the next page.

    Args:
        page_count: Number of pages to capture. Defaults to 10. A value of
            zero or less captures nothing (the output directory is still
            reset).
    """
    logger.info("请先手动打开微信并进入驿来特小程序的场站列表第一页。")

    d = u2.connect()
    w, h = d.window_size()
    # Build a fresh dict instead of mutating whatever object `d.info` returns.
    device_info = {**(d.info or {}), "width": w, "height": h}

    logger.info(f"当前设备: {device_info.get('productName')} | 分辨率: {w}x{h}")

    base_dir = OUTPUT_DIR
    _reset_directory(base_dir)

    for i in range(1, page_count + 1):
        await capture_list_page(d, base_dir, i)
        # No swipe after the last page: nothing is captured afterwards.
        if i < page_count:
            logger.info(f"[{i}] 向上滑动,scale={SCROLL_DISTANCE_RATIO},等待 {WAIT_AFTER_SCROLL} 秒后继续截图。")
            d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
            await asyncio.sleep(WAIT_AFTER_SCROLL)
|
||
|
||
|
||
async def run_batch_local():
    """Hand the directory containing this script to ``run_batch_in_dir``.

    The shared log file path is forwarded as ``log_path``.
    """
    script_dir, _script_name = os.path.split(os.path.abspath(__file__))
    logger.info(f"批量处理目录中的 JPG 文件: {script_dir}")
    await run_batch_in_dir(script_dir, log_path=LOG_FILE_PATH)
|
||
|
||
|
||
async def run_single_image(image_path: str):
    """Run the OCR rectangle pass on one image file.

    Args:
        image_path: Path to the image. A relative path is resolved against
            the current working directory. An empty or missing value, or a
            path that does not exist, is logged as an error and the function
            returns without running OCR.
    """
    if not image_path:
        logger.error("单图模式下需要提供图片路径。")
        return

    resolved_path = (
        image_path if os.path.isabs(image_path) else os.path.abspath(image_path)
    )

    if os.path.exists(resolved_path):
        logger.info(f"单图 OCR 测试,图片路径: {resolved_path}")
        await run_ocr_rect(resolved_path, log_path=LOG_FILE_PATH)
    else:
        logger.error(f"指定的图片不存在: {resolved_path}")
|
||
|
||
|
||
def _list_regression_images(base_dir: str) -> list:
    """Return sorted paths of the ``.jpg`` files in *base_dir* to be checked.

    Files ending in ``_ocr_rect.jpg`` are skipped (presumably debug images
    written by an earlier OCR run -- confirm against ``run_ocr_rect``).
    Matching is case-insensitive.
    """
    return [
        os.path.join(base_dir, name)
        for name in sorted(os.listdir(base_dir))
        if name.lower().endswith(".jpg")
        and not name.lower().endswith("_ocr_rect.jpg")
    ]


def _first_station_problem(st, w, h):
    """Return the error message for the first failed check on *st*, or None.

    Checks, in order: ``rect`` is a list/tuple with at least four items;
    the rectangle is non-empty and inside a ``w`` x ``h`` image;
    ``busy_list`` is non-empty; and, when a click point (``click_point`` or
    ``click``) with at least two items is present, it lies inside the
    rectangle.
    """
    rect = st.get("rect") or []
    if not isinstance(rect, (list, tuple)) or len(rect) < 4:
        return f"场站缺少有效矩形信息: {st}"

    x1, y1, x2, y2 = rect[:4]
    if not (0 <= x1 < x2 <= w and 0 <= y1 < y2 <= h):
        return f"矩形越界: {rect}, 图片尺寸=({w},{h})"

    if not (st.get("busy_list") or []):
        return f"场站忙闲信息为空: {st}"

    click_point = st.get("click_point") or st.get("click") or []
    if isinstance(click_point, (list, tuple)) and len(click_point) >= 2:
        cx, cy = click_point[:2]
        if not (x1 <= cx <= x2 and y1 <= cy <= y2):
            return f"点击点不在矩形内部: click={click_point}, rect={rect}"

    return None


async def run_output_regression():
    """Re-run OCR on the screenshots in ``OUTPUT_DIR`` and validate the results.

    An image passes when ``run_ocr_rect`` returns at least one station and
    every station passes the structural checks. Unreadable images and images
    with no stations count as failures. If the directory is missing or
    contains no candidate images, an error is logged and the function
    returns without raising.

    Raises:
        AssertionError: If at least one candidate image did not pass.
    """
    base_dir = OUTPUT_DIR
    # isdir rather than exists: a stray file at this path is reported here
    # instead of crashing in os.listdir.
    if not os.path.isdir(base_dir):
        logger.error(f"离线回归目录不存在: {base_dir}")
        return

    jpg_files = _list_regression_images(base_dir)
    if not jpg_files:
        logger.error("离线回归目录中不存在待检测的原始 JPG 截图。")
        return

    total = len(jpg_files)
    ok_count = 0
    for path in jpg_files:
        img = cv2.imread(path)
        if img is None:
            logger.error(f"无法读取图片: {path}")
            continue
        h, w = img.shape[:2]

        logger.info(f"开始离线校验: {path}")
        stations = await run_ocr_rect(path, log_path=LOG_FILE_PATH)
        if not stations:
            logger.error(f"离线校验失败: {path} 未识别到任何场站。")
            continue

        # Stop at the first failing station; one bad station fails the page.
        problem = None
        for st in stations:
            problem = _first_station_problem(st, w, h)
            if problem:
                break

        if problem:
            logger.error(problem)
            logger.error(f"离线校验未通过: {path}")
        else:
            ok_count += 1
            logger.info(f"离线校验通过: {path}")

    if ok_count != total:
        raise AssertionError(f"离线首屏 OCR+LLM 校验未全部通过: {ok_count}/{total}")
    logger.info(f"离线首屏 OCR+LLM 校验全部通过: {ok_count}/{total}")
|
||
|
||
|
||
def _purge_stale_logs():
    """Remove leftover ``T_YeLiTe_*`` entries from ``LOGS_DIR``.

    The file at ``LOG_FILE_PATH`` is skipped: the module-level logger has
    already been configured with it (with ``clear_old_log=True``), and
    deleting it here would remove the file the current run logs to.
    """
    if not os.path.exists(LOGS_DIR):
        os.makedirs(LOGS_DIR, exist_ok=True)
        return

    active_log_name = os.path.basename(LOG_FILE_PATH)
    for name in os.listdir(LOGS_DIR):
        if not name.startswith("T_YeLiTe_") or name == active_log_name:
            continue
        path = os.path.join(LOGS_DIR, name)
        if os.path.isfile(path) or os.path.islink(path):
            try:
                os.remove(path)
            except PermissionError as exc:
                # Best-effort cleanup: report the file but keep going.
                logger.warning(f"无法删除旧日志文件: {path} ({exc})")
        elif os.path.isdir(path):
            shutil.rmtree(path, ignore_errors=True)


def _parse_cli_args(argv):
    """Translate ``argv`` into a ``(mode, image_path)`` pair.

    ``mode`` is one of ``"scroll"`` (default), ``"batch"``, ``"image"`` or
    ``"check"``. The keyword is matched case-insensitively. A first argument
    ending in .jpg/.jpeg/.png selects image mode with that path. Any other
    value falls back to scroll mode.
    """
    if len(argv) < 2:
        return "scroll", None

    first = argv[1]
    keyword = first.lower()
    if keyword in ("batch", "check"):
        return keyword, None
    if keyword == "image":
        # The path is optional here; run_single_image reports a missing one.
        return "image", (argv[2] if len(argv) >= 3 else None)
    if keyword.endswith((".jpg", ".jpeg", ".png")):
        return "image", first
    return "scroll", None


if __name__ == "__main__":
    _purge_stale_logs()
    mode, image_path = _parse_cli_args(sys.argv)

    try:
        if mode == "batch":
            asyncio.run(run_batch_local())
        elif mode == "image":
            asyncio.run(run_single_image(image_path))
        elif mode == "check":
            asyncio.run(run_output_regression())
        else:
            asyncio.run(run_scroll_capture())
    except KeyboardInterrupt:
        logger.info("用户手动中断测试脚本。")
    except Exception as e:
        logger.exception(f"测试脚本运行异常: {e}")
        # Exit non-zero so a failed run (including the AssertionError raised
        # by check mode) is visible to whatever invoked the script.
        sys.exit(1)
|