This commit is contained in:
HuangHai
2026-01-15 10:16:26 +08:00
parent 23dfa433b0
commit fdd5532b33
8 changed files with 0 additions and 1 deletions

View File

@@ -34,7 +34,6 @@ async def take_screenshot():
return filepath
else:
logger.error("拍照失败,未找到生成的文件。")
return None

View File

@@ -1,180 +0,0 @@
import uiautomator2 as u2
import time
import os
import cv2
import numpy as np
def test_click():
d = u2.connect()
w, h = d.window_size()
print(f"Device size: {w}x{h}")
# 1. Take screenshot
print("Taking screenshot...")
screenshot_path = r"d:\dsWork\aiData\Output\debug_ad_before.jpg"
d.screenshot(screenshot_path)
# 2. Check hierarchy
print("Dumping hierarchy...")
try:
xml = d.dump_hierarchy()
with open(r"d:\dsWork\aiData\Output\hierarchy.xml", "w", encoding="utf-8") as f:
f.write(xml)
print(r"Hierarchy saved to d:\dsWork\aiData\Output\hierarchy.xml")
except Exception as e:
print(f"Failed to dump hierarchy: {e}")
# 3. Visualize the current fixed point (80/1000, 835/1000)
# Norm: 0.08, 0.835
# Previous attempt was: (86, 2004) for 1080x2400
# Let's try to detect the black circle using HSV or thresholding
img = cv2.imread(screenshot_path)
if img is None:
print("Failed to load screenshot")
return
# ROI: Left side, bottom half
# x: 0 - 200, y: 1500 - 2200 (approx for 1080x2400)
roi_x1, roi_x2 = 0, int(w * 0.25)
roi_y1, roi_y2 = int(h * 0.6), int(h * 0.9)
roi = img[roi_y1:roi_y2, roi_x1:roi_x2]
# Convert to grayscale
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
# The close button is a black circle with a white X.
# We look for a dark circle, and then check if it contains something bright.
candidates = []
# 尝试多个阈值以应对不同的亮度环境
for threshold_val in [40, 60, 80, 100]:
_, thresh = cv2.threshold(gray, threshold_val, 255, cv2.THRESH_BINARY_INV)
# Find contours
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
area = cv2.contourArea(cnt)
# 圆形度检查
perimeter = cv2.arcLength(cnt, True)
if perimeter == 0: continue
circularity = 4 * np.pi * (area / (perimeter * perimeter))
# 兔子广告关闭按钮通常很小 (40x40 左右在 1080p 下是 1600 面积)
if 100 < area < 4000 and circularity > 0.4:
# 获取该候选区域的 bounding box
x, y, w_cnt, h_cnt = cv2.boundingRect(cnt)
# 在这个黑色圆内部,检查是否有亮色的 'X'
# 我们可以对这个区域做反向阈值,找亮色物体
padding = 2
inner_roi = gray[max(0, y-padding):min(roi.shape[0], y+h_cnt+padding),
max(0, x-padding):min(roi.shape[1], x+w_cnt+padding)]
# 找亮色物体 (X)
_, inner_thresh = cv2.threshold(inner_roi, 180, 255, cv2.THRESH_BINARY)
inner_contours, _ = cv2.findContours(inner_thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
has_x = False
for i_cnt in inner_contours:
i_area = cv2.contourArea(i_cnt)
# X 应该比圆小很多
if 10 < i_area < area * 0.5:
has_x = True
break
if has_x:
M = cv2.moments(cnt)
if M["m00"] != 0:
cX = int(M["m10"] / M["m00"]) + roi_x1
cY = int(M["m01"] / M["m00"]) + roi_y1
norm_x = int(cX / w * 1000)
norm_y = int(cY / h * 1000)
# 避免重复
if not any(abs(cX - c[0]) < 15 and abs(cY - c[1]) < 15 for c in candidates):
candidates.append((cX, cY, area, norm_x, norm_y, True)) # True means found X
# 保存候选区域图片以便调试
cv2.imwrite(rf"d:\dsWork\aiData\Output\cand_{len(candidates)-1}_roi.jpg", inner_roi)
else:
# 如果没找到 X但圆形度很高也可以作为一个低优先级候选
if circularity > 0.7:
M = cv2.moments(cnt)
if M["m00"] != 0:
cX = int(M["m10"] / M["m00"]) + roi_x1
cY = int(M["m01"] / M["m00"]) + roi_y1
norm_x = int(cX / w * 1000)
norm_y = int(cY / h * 1000)
if not any(abs(cX - c[0]) < 15 and abs(cY - c[1]) < 15 for c in candidates):
candidates.append((cX, cY, area, norm_x, norm_y, False))
cv2.imwrite(rf"d:\dsWork\aiData\Output\cand_{len(candidates)-1}_roi.jpg", inner_roi)
# Save thresh image for debugging
cv2.imwrite(r"d:\dsWork\aiData\Output\debug_thresh.jpg", thresh)
# 评分逻辑
def score_candidate(c):
# c = (cx, cy, area, nx, ny, has_x)
has_x = c[5]
# 基础分:如果有 X大幅加分
score = 1000 if has_x else 0
# 距离分:越靠近预期的 (93, 830) 分越高
dist = np.sqrt((c[3] - 93)**2 + (c[4] - 830)**2)
score -= dist * 2
# 面积分:理想面积在 500-1500 之间
if 500 < c[2] < 1500:
score += 200
return score
candidates.sort(key=score_candidate, reverse=True)
target_x, target_y = int(w * 0.08), int(h * 0.835) # Default fallback
norm_target_x, norm_target_y = 80, 835
if candidates:
print(f"Found {len(candidates)} candidate close buttons via CV:")
for i, (cx, cy, area, nx, ny, has_x) in enumerate(candidates):
score = score_candidate((cx, cy, area, nx, ny, has_x))
print(f" Candidate {i}: ({cx}, {cy}), Area: {area}, Norm: ({nx}, {ny}), HasX: {has_x}, Score: {score:.2f}")
# Pick the best one
best_c = candidates[0]
target_x, target_y = best_c[0], best_c[1]
norm_target_x, norm_target_y = best_c[3], best_c[4]
print(f"Selected CV target: ({target_x}, {target_y}), Norm: ({norm_target_x}, {norm_target_y}), HasX: {best_c[5]}")
# Visualize
viz_img = cv2.imread(screenshot_path)
cv2.circle(viz_img, (target_x, target_y), 3, (0, 0, 255), -1)
debug_path = r"d:\dsWork\aiData\Output\debug_ad_point.jpg"
red_point_path = r"d:\dsWork\aiData\Output\_redpoint.jpg"
cv2.imwrite(debug_path, viz_img)
cv2.imwrite(red_point_path, viz_img)
print(f"Debug images saved to {debug_path} and {red_point_path}")
# 4. Perform click
print(f"Clicking ({target_x}, {target_y}) using single click...")
d.click(target_x, target_y)
print("Waiting 3s for user to observe if the ad is closed...")
time.sleep(3)
# print("Pressing BACK to return to main page...")
# d.press("back")
else:
print("No CV candidates found. Skipping click to avoid accidental background interaction.")
time.sleep(2)
# 5. Take after screenshot
after_path = r"d:\dsWork\aiData\Output\debug_ad_after.jpg"
d.screenshot(after_path)
print(f"After screenshot saved to {after_path}")
# Check if successful (compare image hash or just file size, though file size varies with compression)
# Simple check: If the black button is gone, the area should be brighter?
# Or just let user check.
if __name__ == "__main__":
test_click()

View File

@@ -1,71 +0,0 @@
# coding=utf-8
# pip install easyocr
# python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121 --force-reinstall
import os
import sys
import cv2
import numpy as np
# 设置项目根目录
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
sys.path.append(project_root)
from Util.EasyOcrKit import get_easyocr_reader
def test_easyocr_price():
# 待测试的图片路径
test_image_path = os.path.join(project_root, "Output", "Screenshot_20260115_083521.jpg")
output_image_path = os.path.join(project_root, "Output", "Test_EasyOCR_Result.jpg")
print(f"--- 开始测试 EasyOCR 识别 ---")
print(f"输入图片: {test_image_path}")
if not os.path.exists(test_image_path):
print(f"错误: 测试图片不存在!")
return
# 1. 初始化 OCR (使用封装类)
reader = get_easyocr_reader(gpu=True)
# 2. 读取图片
img = cv2.imread(test_image_path)
if img is None:
print("错误: 无法读取图片。")
return
# 3. 识别
target = '全部时段'
found = reader.find_text_position(img, target)
h, w = img.shape[:2]
if found:
text, quad, prob = found
pts = np.array(quad).astype(int)
cv2.polylines(img, [pts], isClosed=True, color=(0, 255, 0), thickness=2)
# 使用封装后的方法获取归一化中心点 (演示 get_normalized_rect)
rect = reader.get_normalized_rect(quad, w, h)
norm_x = (rect[0] + rect[2]) // 2
norm_y = (rect[1] + rect[3]) // 2
print(f'找到“{text}” (目标: {target}) 置信度={prob:.2f}')
print(f'归一化坐标: [{norm_x}, {norm_y}]')
cv2.putText(img, f"OCR Found: {text}", (pts[0][0], pts[0][1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
else:
print(f"未能在图片中找到包含“{target}”的文本。")
# 打印出所有识别到的文本,方便调试
print("识别到的所有文本:")
results = reader.read_text(img)
for (_, text, _) in results:
print(f" - {text}")
# 4. 保存结果图
cv2.imwrite(output_image_path, img)
print(f"--- 测试完成 ---")
print(f"结果已保存至: {output_image_path}")
if __name__ == "__main__":
test_easyocr_price()

View File

@@ -1,67 +0,0 @@
# coding=utf-8
import os
import sys
import cv2
import numpy as np
# 设置项目根目录
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
sys.path.append(project_root)
from Apps.TeLaiDian.Kit import detect_price_info_container_cv, read_image, save_image
def test_price_box_detection():
# 待测试的图片路径
test_image_path = os.path.join(project_root, "Output", "Screenshot_20260115_083521.jpg")
output_image_path = os.path.join(project_root, "Output", "Test_PriceBox_Result_V2.jpg")
print(f"--- 开始测试价格容器识别 ---")
print(f"输入图片: {test_image_path}")
if not os.path.exists(test_image_path):
print(f"错误: 测试图片不存在!")
return
# 1. 调用刚才实现的 CV 识别函数
container_norm = detect_price_info_container_cv(test_image_path)
if not container_norm:
print("识别失败: 未能检测到‘价格信息’容器矩形。")
return
print(f"识别成功! 归一化坐标: {container_norm}")
# 2. 读取图片并绘制绿框进行可视化验证
img = read_image(test_image_path)
if img is None:
print("错误: 无法读取图片。")
return
h, w = img.shape[:2]
x1 = int(container_norm[0] * w / 1000)
y1 = int(container_norm[1] * h / 1000)
x2 = int(container_norm[2] * w / 1000)
y2 = int(container_norm[3] * h / 1000)
# 绘制外框 (绿框)
cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 4)
# 绘制中间分割线 (区分左侧当前价和右侧会员价)
mid_x = x1 + (x2 - x1) // 2
cv2.line(img, (mid_x, y1), (mid_x, y2), (0, 255, 0), 2)
# 加上文字标注
cv2.putText(img, "Target Area (Left Half)", (x1 + 10, y1 + 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# 3. 保存结果图
if save_image(output_image_path, img):
print(f"--- 测试完成 ---")
print(f"结果已保存至: {output_image_path}")
print(f"请检查该图片中的绿框是否准确圈定了‘价格信息’板块及其左侧区域。")
else:
print("错误: 结果图保存失败。")
if __name__ == "__main__":
test_price_box_detection()

View File

@@ -1,53 +0,0 @@
import asyncio
import sys
import os
# 将项目根目录添加到 python 路径
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)
from Config.Config import DB_URL
from DbKit.Db import Db
from sqlalchemy import text
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def update_schema():
"""
为 t_station_status_scd 表添加 market_price 字段
"""
db = Db(db_url=DB_URL)
await db.init_db()
# 检查字段是否已存在
check_sql = "SHOW COLUMNS FROM t_station_status_scd LIKE 'market_price';"
# 添加字段的 SQL
alter_sql = "ALTER TABLE t_station_status_scd ADD COLUMN market_price DECIMAL(10, 4) COMMENT '挂牌价快照' AFTER pro_price;"
async with await db.get_session() as session:
try:
result = await session.execute(text(check_sql))
column_exists = result.fetchone() is not None
if not column_exists:
logger.info("正在为 t_station_status_scd 添加 market_price 字段...")
await session.execute(text(alter_sql))
await session.commit()
logger.info("字段 market_price 添加成功。")
else:
logger.info("字段 market_price 已存在,跳过。")
except Exception as e:
logger.error(f"执行 ALTER TABLE 时出错: {e}")
await session.rollback()
finally:
await session.close()
await db.engine.dispose()
if __name__ == "__main__":
asyncio.run(update_schema())