Files
aiData/Test/TestFirstPage/T_AiTeJiYiChong_FirstPageScrollTest.py
HuangHai bdbc6f46d1 'commit'
2026-01-18 12:43:13 +08:00

202 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
import sys
import time
import shutil
import uiautomator2 as u2
import cv2
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from Apps.AiTeJiYiChong.Kit import setup_logger, take_screenshot
from Apps.AiTeJiYiChong.Config.Setting import SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL
from Apps.AiTeJiYiChong.FirstPageKit import run_ocr_rect, run_batch_in_dir
OUTPUT_DIR = os.path.join(project_root, "Output")
LOGS_DIR = os.path.join(project_root, "Logs")
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(LOGS_DIR, exist_ok=True)
LOG_FILE_PATH = os.path.join(LOGS_DIR, "T_AiTeJiYiChong_FirstPageScrollTest.log")
logger = setup_logger(
"AiTeJiYiChong.FirstPageScrollTest", log_file=LOG_FILE_PATH, clear_old_log=True
)
async def capture_list_page(d, base_dir: str, index: int):
ts = int(time.time())
image_uuid = f"aite_firstpage_{index}_{ts}"
list_path = take_screenshot(d, image_uuid, save_dir=base_dir)
logger.info(f"[{index}] 列表截图路径: {list_path}")
try:
await run_ocr_rect(list_path, log_path=LOG_FILE_PATH)
logger.info(f"[{index}] 已根据 OCR+LLM 算法生成首屏绿框调试图。")
except Exception as e:
logger.exception(f"[{index}] 运行 OCR 绿框测试失败: {e}")
async def run_scroll_capture():
logger.info("请先手动打开微信并进入艾特吉易充小程序的场站列表第一页。")
d = u2.connect()
w, h = d.window_size()
device_info = d.info or {}
device_info["width"] = w
device_info["height"] = h
logger.info(f"当前设备: {device_info.get('productName')} | 分辨率: {w}x{h}")
base_dir = OUTPUT_DIR
if os.path.exists(base_dir):
for name in os.listdir(base_dir):
path = os.path.join(base_dir, name)
if os.path.isfile(path) or os.path.islink(path):
os.remove(path)
elif os.path.isdir(path):
shutil.rmtree(path)
os.makedirs(base_dir, exist_ok=True)
for i in range(1, 6):
await capture_list_page(d, base_dir, i)
if i < 5:
logger.info(f"[{i}] 向上滑动scale={SCROLL_DISTANCE_RATIO},等待 {WAIT_AFTER_SCROLL} 秒后继续截图。")
d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
await asyncio.sleep(WAIT_AFTER_SCROLL)
async def run_batch_local():
base_dir = os.path.dirname(os.path.abspath(__file__))
logger.info(f"批量处理目录中的 JPG 文件: {base_dir}")
await run_batch_in_dir(base_dir, log_path=LOG_FILE_PATH)
async def run_single_image(image_path: str):
if not image_path:
logger.error("单图模式下需要提供图片路径。")
return
if not os.path.isabs(image_path):
image_path = os.path.abspath(image_path)
if not os.path.exists(image_path):
logger.error(f"指定的图片不存在: {image_path}")
return
logger.info(f"单图 OCR 测试,图片路径: {image_path}")
await run_ocr_rect(image_path, log_path=LOG_FILE_PATH)
async def run_output_regression():
base_dir = OUTPUT_DIR
if not os.path.exists(base_dir):
logger.error(f"离线回归目录不存在: {base_dir}")
return
files = sorted(os.listdir(base_dir))
jpg_files = []
for name in files:
lower = name.lower()
if not lower.endswith(".jpg"):
continue
if lower.endswith("_ocr_rect.jpg"):
continue
jpg_files.append(os.path.join(base_dir, name))
if not jpg_files:
logger.error("离线回归目录中不存在待检测的原始 JPG 截图。")
return
total = len(jpg_files)
ok_count = 0
for path in jpg_files:
img = cv2.imread(path)
if img is None:
logger.error(f"无法读取图片: {path}")
continue
h, w = img.shape[:2]
logger.info(f"开始离线校验: {path}")
stations = await run_ocr_rect(path, log_path=LOG_FILE_PATH)
if not stations:
logger.error(f"离线校验失败: {path} 未识别到任何场站。")
continue
page_ok = True
for st in stations:
rect = st.get("rect") or []
if not isinstance(rect, (list, tuple)) or len(rect) < 4:
logger.error(f"场站缺少有效矩形信息: {st}")
page_ok = False
break
x1, y1, x2, y2 = rect[0], rect[1], rect[2], rect[3]
if not (0 <= x1 < x2 <= w and 0 <= y1 < y2 <= h):
logger.error(f"矩形越界: {rect}, 图片尺寸=({w},{h})")
page_ok = False
break
busy_list = st.get("busy_list") or []
if not busy_list:
logger.error(f"场站忙闲信息为空: {st}")
page_ok = False
break
click_point = st.get("click_point") or st.get("click") or []
if isinstance(click_point, (list, tuple)) and len(click_point) >= 2:
cx, cy = click_point[0], click_point[1]
if not (x1 <= cx <= x2 and y1 <= cy <= y2):
logger.error(f"点击点不在矩形内部: click={click_point}, rect={rect}")
page_ok = False
break
if page_ok:
ok_count += 1
logger.info(f"离线校验通过: {path}")
else:
logger.error(f"离线校验未通过: {path}")
if ok_count != total:
raise AssertionError(f"离线首屏 OCR+LLM 校验未全部通过: {ok_count}/{total}")
logger.info(f"离线首屏 OCR+LLM 校验全部通过: {ok_count}/{total}")
if __name__ == "__main__":
if os.path.exists(LOGS_DIR):
for name in os.listdir(LOGS_DIR):
if not name.startswith("T_AiTeJiYiChong_"):
continue
path = os.path.join(LOGS_DIR, name)
if os.path.isfile(path) or os.path.islink(path):
try:
os.remove(path)
except PermissionError:
pass
elif os.path.isdir(path):
shutil.rmtree(path, ignore_errors=True)
else:
os.makedirs(LOGS_DIR, exist_ok=True)
mode = "scroll"
image_path = None
if len(sys.argv) >= 2:
arg1 = sys.argv[1]
lower = arg1.lower()
if lower == "batch":
mode = "batch"
elif lower == "image":
mode = "image"
if len(sys.argv) >= 3:
image_path = sys.argv[2]
elif lower == "check":
mode = "check"
else:
if any(lower.endswith(ext) for ext in [".jpg", ".jpeg", ".png"]):
mode = "image"
image_path = arg1
try:
if mode == "batch":
asyncio.run(run_batch_local())
elif mode == "image":
asyncio.run(run_single_image(image_path))
elif mode == "check":
asyncio.run(run_output_regression())
else:
asyncio.run(run_scroll_capture())
except KeyboardInterrupt:
logger.info("用户手动中断测试脚本。")
except Exception as e:
logger.exception(f"测试脚本运行异常: {e}")