Files
aiData/Test/TestFirstPage/T_TeLaiDian_FirstPageScrollTest.py
HuangHai 6688daa446 'commit'
2026-01-18 13:44:51 +08:00

152 lines
5.1 KiB
Python

import asyncio
import os
import sys
import time
import shutil
import uiautomator2 as u2
import cv2
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from Apps.TeLaiDian.Kit import setup_logger, take_screenshot
from Apps.TeLaiDian.Config.Setting import SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL
from Apps.TeLaiDian.FirstPageKit import run_ocr_rect, run_batch_in_dir
OUTPUT_DIR = os.path.join(project_root, "Output")
LOGS_DIR = os.path.join(project_root, "Logs")
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(LOGS_DIR, exist_ok=True)
LOG_FILE_PATH = os.path.join(LOGS_DIR, "T_TeLaiDian_FirstPageScrollTest.log")
logger = setup_logger(
"TeLaiDian.FirstPageScrollTest", log_file=LOG_FILE_PATH, clear_old_log=True
)
async def capture_list_page(d, base_dir: str, index: int):
ts = int(time.time())
image_uuid = f"tld_firstpage_{index}_{ts}"
list_path = take_screenshot(d, image_uuid, save_dir=base_dir)
logger.info(f"[{index}] 列表截图路径: {list_path}")
try:
await run_ocr_rect(list_path, log_path=LOG_FILE_PATH)
logger.info(f"[{index}] 已根据 OCR+LLM 算法生成首屏绿框调试图。")
except Exception as e:
logger.exception(f"[{index}] 运行 OCR 绿框测试失败: {e}")
async def run_capture_sequence(d, base_dir: str, pages: int = 5):
if os.path.exists(base_dir):
shutil.rmtree(base_dir, ignore_errors=True)
os.makedirs(base_dir, exist_ok=True)
for i in range(1, pages + 1):
await capture_list_page(d, base_dir, i)
if i < pages:
w, h = d.window_size()
start_x, start_y = w // 2, int(h * 0.8)
end_x, end_y = start_x, int(h * (1 - SCROLL_DISTANCE_RATIO))
logger.info(f"[{i}] 执行翻页滑动: ({start_x},{start_y}) -> ({end_x},{end_y})")
d.swipe(start_x, start_y, end_x, end_y, 0.3)
await asyncio.sleep(WAIT_AFTER_SCROLL)
async def offline_validate(base_dir: str):
img_files = []
for name in os.listdir(base_dir):
lower = name.lower()
if not lower.endswith(".jpg"):
continue
if "_flag" in lower or "_vl" in lower:
continue
img_files.append(os.path.join(base_dir, name))
img_files.sort()
if not img_files:
logger.info(f"目录下未找到图片: {base_dir}")
return
await run_batch_in_dir(base_dir, log_file=LOG_FILE_PATH)
total = len(img_files)
ok_count = 0
for path in img_files:
logger.info(f"开始离线校验: {path}")
img = cv2.imread(path)
if img is None:
logger.error(f"无法读取图片: {path}")
continue
h, w = img.shape[:2]
log_path = LOG_FILE_PATH
stations = await run_ocr_rect(path, log_path=log_path)
if not stations:
logger.error(f"未识别到任何场站: {path}")
continue
page_ok = True
for st in stations:
rect = st.get("rect") or []
if not isinstance(rect, (list, tuple)) or len(rect) < 4:
logger.error(f"矩形数据不合法: {st}")
page_ok = False
break
x1, y1, x2, y2 = rect
if not (0 <= x1 < x2 <= w and 0 <= y1 < y2 <= h):
logger.error(f"矩形越界: {rect}, 图片尺寸=({w},{h})")
page_ok = False
break
busy_list = st.get("busy_list") or []
if not busy_list:
logger.error(f"场站忙闲信息为空: {st}")
page_ok = False
break
click_point = st.get("click_point") or st.get("click") or []
if isinstance(click_point, (list, tuple)) and len(click_point) >= 2:
cx, cy = click_point[0], click_point[1]
if not (x1 <= cx <= x2 and y1 <= cy <= y2):
logger.error(f"点击点不在矩形内部: click={click_point}, rect={rect}")
page_ok = False
break
if page_ok:
ok_count += 1
logger.info(f"离线校验通过: {path}")
else:
logger.error(f"离线校验未通过: {path}")
if ok_count != total:
raise AssertionError(f"离线首屏 OCR+LLM 校验未全部通过: {ok_count}/{total}")
logger.info(f"离线首屏 OCR+LLM 校验全部通过: {ok_count}/{total}")
async def main():
if os.path.exists(LOGS_DIR):
for name in os.listdir(LOGS_DIR):
if not name.startswith("T_TeLaiDian_"):
continue
path = os.path.join(LOGS_DIR, name)
if os.path.isfile(path) or os.path.islink(path):
try:
os.remove(path)
except PermissionError:
pass
elif os.path.isdir(path):
shutil.rmtree(path, ignore_errors=True)
else:
os.makedirs(LOGS_DIR, exist_ok=True)
d = u2.connect()
base_dir = OUTPUT_DIR
await run_capture_sequence(d, base_dir, pages=3)
await offline_validate(base_dir)
if __name__ == "__main__":
asyncio.run(main())