diff --git a/Apps/TeLaiDian/ClearHistory.py b/Apps/TeLaiDian/ClearHistory.py new file mode 100644 index 0000000..d4bd2a8 --- /dev/null +++ b/Apps/TeLaiDian/ClearHistory.py @@ -0,0 +1,85 @@ +import asyncio +import os +import sys +import logging + +# 确保项目根目录在 sys.path 中 +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +if project_root not in sys.path: + sys.path.append(project_root) + +from DbKit.Db import Db +from Util.RedisKit import RedisKit +from sqlalchemy.sql import text +from Config.Config import DB_URL + +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger("ClearAiTeJiYiChong") + +async def main(): + """ + 1. 删除数据库中所有 operator='特来电' 的记录 + 2. 删除 Redis 中所有关于艾特吉易充的缓存信息 + """ + operator = '特来电' + + # 1. 数据库清理 + logger.info(f"开始清理数据库中 operator='{operator}' 的数据...") + db = Db(db_url=DB_URL) + await db.init_db() + + try: + async with db.AsyncSessionLocal() as session: + async with session.begin(): + # 先删除从表记录(通过 station_hash 关联) + # 1. t_station_status_scd + sql_status = """ + DELETE FROM t_station_status_scd + WHERE station_hash IN ( + SELECT station_hash FROM t_station_profile_scd WHERE operator = :operator + ) + """ + logger.info("正在清理 t_station_status_scd...") + result_status = await session.execute(text(sql_status), {"operator": operator}) + logger.info(f"t_station_status_scd 已删除 {result_status.rowcount} 行记录。") + + # 2. t_station_price_schedule_scd + sql_price = """ + DELETE FROM t_station_price_schedule_scd + WHERE station_hash IN ( + SELECT station_hash FROM t_station_profile_scd WHERE operator = :operator + ) + """ + logger.info("正在清理 t_station_price_schedule_scd...") + result_price = await session.execute(text(sql_price), {"operator": operator}) + logger.info(f"t_station_price_schedule_scd 已删除 {result_price.rowcount} 行记录。") + + # 3. 最后删除主表 t_station_profile_scd + sql_profile = "DELETE FROM t_station_profile_scd WHERE operator = :operator" + logger.info("正在清理 t_station_profile_scd...") + result_profile = await session.execute(text(sql_profile), {"operator": operator}) + logger.info(f"t_station_profile_scd 已删除 {result_profile.rowcount} 行记录。") + + logger.info("数据库记录清理完成。") + except Exception as e: + logger.error(f"数据库清理失败: {e}") + + # 2. Redis 清理 + logger.info("开始清理 Redis 中的缓存数据...") + redis_kit = RedisKit() + # 根据特来电爬虫的约定,Redis 去重键的模式为 crawled:tld:* + pattern = "crawled:tld:*" + try: + keys = await redis_kit.keys(pattern) + if keys: + logger.info(f"匹配到 {len(keys)} 个键,正在删除...") + await redis_kit.delete(*keys) + logger.info("Redis 缓存清理完成。") + else: + logger.info(f"未匹配到模式为 '{pattern}' 的键。") + except Exception as e: + logger.error(f"Redis 清理失败: {e}") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/Apps/TeLaiDian/Crawler.py b/Apps/TeLaiDian/Crawler.py index 2b98091..a75cab5 100644 --- a/Apps/TeLaiDian/Crawler.py +++ b/Apps/TeLaiDian/Crawler.py @@ -27,6 +27,11 @@ if project_root not in sys.path: # 初始化日志 logger = setup_logger("TeLaiDianCrawler") +PRICE_TAB_X_NORM = 220 +PRICE_TAB_Y_NORM = 130 +PRICE_ENTRY_X_NORM = 230 +PRICE_ENTRY_Y_NORM = 380 + class TeLaiDianCrawler(BaseCrawler): def __init__(self, service=None): super().__init__(service or TeLaiDianService()) @@ -85,6 +90,61 @@ class TeLaiDianCrawler(BaseCrawler): # [优化] 向下滚动以刷新/校准地理位置 # 使用更加显式的 swipe 方式:从屏幕 30% 划到 80% + popup_screen_path = take_screenshot(d, f"tld_detail_popup_{int(time.time())}.jpg") + logger.info(f"[详情页] 截图用于检测温馨提示弹窗: {popup_screen_path}") + + template_xczs = os.path.join(os.path.dirname(__file__), "Template", "xczs.jpg") + template_point = None + if os.path.exists(template_xczs): + try: + match_res = d.image.match(template_xczs) + if match_res: + if hasattr(match_res, "point") and match_res.point: + template_point = match_res.point + elif isinstance(match_res, dict): + if match_res.get("point"): + template_point = match_res["point"] + elif "x" in match_res and "y" in match_res: + template_point = (match_res["x"], match_res["y"]) + elif isinstance(match_res, (list, tuple)) and len(match_res) >= 2: + template_point = (match_res[0], match_res[1]) + if template_point: + logger.info(f"[详情页] 模板检测到疑似温馨提示弹窗位置: {template_point}") + else: + logger.info("[详情页] 模板未能确定温馨提示弹窗位置") + except Exception as e: + logger.error(f"[详情页] 模板检测温馨提示弹窗失败: {e}") + else: + logger.info(f"[详情页] 温馨提示模板文件不存在: {template_xczs}") + + vlm_popup = {"has_popup": False} + try: + vlm_popup = await self.read_image_kit.check_warm_popup_vlm(popup_screen_path) + except Exception as e: + logger.error(f"[详情页] VLM 检测温馨提示弹窗失败: {e}") + + has_vlm_popup = isinstance(vlm_popup, dict) and vlm_popup.get("has_popup") + if template_point and has_vlm_popup: + click_x = int(template_point[0]) + click_y = int(template_point[1]) + logger.info(f"[详情页] 模板与 VLM 均确认存在温馨提示弹窗,即将点击关闭按钮: ({click_x}, {click_y})") + debug_popup_path = popup_screen_path.replace(".jpg", f"_xczs_click_{click_x}_{click_y}.jpg") + try: + img_popup = read_image(popup_screen_path) + if img_popup is not None: + cv2.circle(img_popup, (click_x, click_y), 20, (0, 0, 255), -1) + save_image(debug_popup_path, img_popup) + logger.info(f"[详情页] 已生成温馨提示弹窗点击诊断图片: {debug_popup_path}") + except Exception as e: + logger.error(f"[详情页] 生成温馨提示弹窗诊断图片失败: {e}") + try: + d.click(click_x, click_y) + await asyncio.sleep(1.5) + except Exception as e: + logger.error(f"[详情页] 点击温馨提示“下次再说”失败: {e}") + else: + logger.info(f"[详情页] 温馨提示弹窗未通过双重确认,模板检测: {bool(template_point)} | VLM 检测: {vlm_popup}") + w, h = d.window_size() logger.info(f"执行显式下拉刷新操作: (x={w//2}, y1={int(h*0.3)} -> y2={int(h*0.8)})") d.swipe(w // 2, int(h * 0.3), w // 2, int(h * 0.8), duration=0.5) @@ -220,319 +280,202 @@ class TeLaiDianCrawler(BaseCrawler): except: pass - template_xczs = os.path.join(os.path.dirname(__file__), "Template", "xczs.jpg") - logger.info("[详情页] 检查是否存在温馨提示弹窗(下次再说)...") - try: - await asyncio.sleep(0.5) - if os.path.exists(template_xczs): - clicked = d.image.click(template_xczs, timeout=3.0) - if clicked: - logger.info("[详情页] 检测到温馨提示弹窗,已点击“下次再说”关闭。") - await asyncio.sleep(1.5) - else: - logger.info("[详情页] 未在当前页面匹配到温馨提示弹窗模板。") - else: - logger.info(f"[详情页] 温馨提示模板文件不存在: {template_xczs}") - except Exception as e: - logger.error(f"[详情页] 通过模板点击关闭温馨提示弹窗失败: {e}") - w, h = d.window_size() - logger.info("[详情页] 先执行一次较大的向上滑动,将详情内容上移一屏左右") - d.swipe(w * 0.5, h * 0.8, w * 0.5, h * 0.3, 0.5) - await asyncio.sleep(1.0) - entrance_point = None - entrance_source = None - max_search_rounds = 5 + logger.info("[详情页] 根据用户策略: 多次大幅向上滑动,直到页面基本不再变化") + last_md5 = None + stable_count = 0 + max_round = 30 + final_screen_path = None - for round_idx in range(max_search_rounds): - if entrance_point: - break - search_screen_path = take_screenshot(d, f"tld_detail_search_{int(time.time())}_{round_idx}.jpg") - logger.info(f"[详情页] 搜索价格入口,第 {round_idx + 1}/{max_search_rounds} 轮: {search_screen_path}") + from Apps.TeLaiDian.Kit import get_image_content_md5 - if not entrance_point: - try: - vlm_res = await self.read_image_kit.find_price_entrance_vlm(search_screen_path) - except Exception as e: - vlm_res = {} - logger.error(f"[详情页] VLM 寻找价格入口失败: {e}") - - point = vlm_res.get("point") if isinstance(vlm_res, dict) else None - if vlm_res.get("found") and point and len(point) >= 2: - vx = int(point[0] * w / 1000) - vy = int(point[1] * h / 1000) - if vy < int(h * SAFE_EXCLUDE_RATIO): - vy = int(h * SAFE_EXCLUDE_RATIO) - if vy > int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO)): - vy = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO)) - entrance_point = (vx, vy) - entrance_source = entrance_source or "vlm" - logger.info(f"[详情页] VLM 成功找到价格入口: {entrance_point},原因: {vlm_res.get('reason')}") - - if entrance_point: - break - - logger.info("[详情页] 本轮未找到价格入口,向上滑动继续搜索...") - d.swipe_ext("up", scale=DETAIL_SCROLL_DISTANCE_RATIO) - await asyncio.sleep(WAIT_AFTER_SCROLL) - - if not entrance_point: - logger.error(f"[详情页] 连续 {max_search_rounds} 轮滚动后仍未找到价格入口,本次流程直接中止。") - raise RuntimeError("TeLaiDian: 价格入口多轮搜索失败") - - before_price_path = take_screenshot(d, f"tld_detail_price_before_{int(time.time())}.jpg") - before_md5 = get_image_content_md5( - before_price_path, - top_ratio=SAFE_EXCLUDE_RATIO, - bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO, - ) - - ex, ey = int(entrance_point[0]), int(entrance_point[1]) - entered_price_page = False - - if entrance_source == "vlm": - click_x = ex - click_y = ey - logger.info(f"[详情页] 使用 VLM 当前价红色价格入口坐标直接点击: ({click_x}, {click_y})") - else: - click_x = ex - offset_y = int(h * 0.1) - click_y = min(h - 10, ey + offset_y) - logger.info(f"[详情页] 使用 {entrance_source or '入口'} 坐标: ({ex}, {ey}),调整后点击坐标: ({click_x}, {click_y}),即将进入电价页") - d.click(click_x, click_y) - await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD) - - after_price_path = take_screenshot(d, f"tld_detail_price_after_{int(time.time())}.jpg") - after_md5 = get_image_content_md5( - after_price_path, - top_ratio=SAFE_EXCLUDE_RATIO, - bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO, - ) - - if before_md5 and after_md5 and before_md5 != after_md5: - entered_price_page = True - logger.info("[电价页] 首次点击后页面内容发生变化,判定已进入电价详情页。") - else: - logger.warning("[电价页] 首次点击后页面内容无明显变化,尝试使用 VLM 兜底寻找价格入口。") - try: - vlm_res = await self.read_image_kit.find_price_entrance_vlm(before_price_path) - except Exception as e: - vlm_res = {} - logger.error(f"[电价页] VLM 寻找价格入口失败: {e}") - - point = vlm_res.get("point") if isinstance(vlm_res, dict) else None - if vlm_res.get("found") and point and len(point) >= 2: - w, h = d.window_size() - vx = int(point[0] * w / 1000) - vy = int(point[1] * h / 1000) - if vy < int(h * SAFE_EXCLUDE_RATIO): - vy = int(h * SAFE_EXCLUDE_RATIO) - if vy > int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO)): - vy = int(h * (1 - BOTTOM_SAFE_EXCLUDE_RATIO)) - logger.info(f"[电价页] 使用 VLM 兜底点击价格入口: ({vx}, {vy}),原因: {vlm_res.get('reason')}") - d.click(vx, vy) - await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD) - - vlm_after_path = take_screenshot(d, f"tld_detail_price_after_vlm_{int(time.time())}.jpg") - vlm_after_md5 = get_image_content_md5( - vlm_after_path, - top_ratio=SAFE_EXCLUDE_RATIO, - bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO, - ) - if before_md5 and vlm_after_md5 and before_md5 != vlm_after_md5: - entered_price_page = True - logger.info("[电价页] VLM 兜底点击后页面内容发生变化,判定已进入电价详情页。") - else: - logger.error(f"[电价页] VLM 未能找到可靠的价格入口: {vlm_res}") - - if not entered_price_page: - logger.error("[电价页] 多轮尝试后页面仍未变化,疑似点击未生效或入口不可用。") - raise RuntimeError("TeLaiDian: 价格入口点击后页面未变化") - - logger.info("[电价页] 已成功进入电价详情页,等待小程序自动滚动定位完成 (4秒)...") - await asyncio.sleep(4.0) - logger.info("[电价页] 额外执行多次向下滑动,让上方时段露出") - for idx in range(3): - logger.info(f"[电价页] 第 {idx+1} 次向下滑动(d.swipe_ext(\"down\", scale=0.9))") - d.swipe_ext("down", scale=0.9) + for idx in range(max_round): + start_x = int(w * 0.9) + start_y = int(h * 0.85) + end_y = int(h * 0.25) + logger.info(f"[详情页] 第 {idx + 1} 轮大幅向上滑动: ({start_x}, {start_y}) -> ({start_x}, {end_y})") + d.swipe(start_x, start_y, start_x, end_y, 0.25) await asyncio.sleep(1.0) - # 4. 进入分时电价页面后的处理 - try: - logger.info("[电价页] 已点击价格入口,等待小程序自动滚动定位完成 (4秒)...") - await asyncio.sleep(4.0) - logger.info("[电价页] 开始执行两次大幅向下拉动,目标是回到 00:00 时段顶部") - for idx in range(2): - logger.info(f"[电价页] 第 {idx+1} 次向下滑动(d.swipe_ext(\"down\", scale=0.9))") - d.swipe_ext("down", scale=0.9) - await asyncio.sleep(1.0) - except Exception as e: - logger.error(f"[电价页] 处理分时电价页面初始状态失败: {e}") + screen_path = take_screenshot(d, f"tld_detail_scan_{int(time.time())}_{idx}.jpg") + logger.info(f"[详情页] 第 {idx + 1} 轮滑动后的截图: {screen_path}") + curr_md5 = get_image_content_md5( + screen_path, + top_ratio=SAFE_EXCLUDE_RATIO, + bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO, + ) + if last_md5 is not None and curr_md5 == last_md5: + stable_count += 1 + logger.info(f"[详情页] 页面内容连续第 {stable_count} 次无变化,可能已到稳定区域") + else: + stable_count = 0 + last_md5 = curr_md5 + final_screen_path = screen_path - # 5. 循环滑动抓取完整分时电价 - all_prices = [] - last_price_md5 = None - price_page_count = 0 - max_price_pages = 4 - screenshot_tasks = [] - temp_screenshots = [] + if stable_count >= 2: + logger.info("[详情页] 检测到页面多次无变化,认为已到达顶部固定区域,提前结束扫描。") + break - logger.info("[电价页] 开始循环截图(UI操作优先,后台并行分析)...") - try: - while price_page_count < max_price_pages: - price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}_{price_page_count}.jpg") - - curr_md5 = get_image_content_md5(price_screen_path, top_ratio=0.2, bottom_ratio=0.2) - if curr_md5 == last_price_md5: - logger.info("价格页面内容无变化,判定已触底") - if os.path.exists(price_screen_path): - os.remove(price_screen_path) - break - last_price_md5 = curr_md5 - temp_screenshots.append(price_screen_path) - - logger.info(f"已截取价格详情页第 {price_page_count + 1} 页: {price_screen_path},启动后台异步分析") - task = asyncio.create_task(self.read_image_kit.analyze_detail_price(price_screen_path)) - screenshot_tasks.append(task) - - logger.info("向上滚动列表,准备截取下一屏价格...") - d.swipe_ext("up", scale=0.8) - await asyncio.sleep(1.0) - price_page_count += 1 - - if screenshot_tasks: - logger.info(f"UI 操作已完成,等待 {len(screenshot_tasks)} 个后台分析任务结束...") - results = await asyncio.gather(*screenshot_tasks, return_exceptions=True) - - for res in results: - if isinstance(res, Exception): - logger.error(f"后台分析任务出错: {res}") - continue - if res: - for p in res: - is_duplicate = False - for existing in all_prices: - if p.get('start') == existing.get('start') and p.get('end') == existing.get('end'): - is_duplicate = True - for key in ['price', 'plus_price', 'market_price', 'elec_price', 'service_price']: - if p.get(key) is not None and (existing.get(key) is None or existing.get(key) == 0): - existing[key] = p[key] - break - if not is_duplicate: - all_prices.append(p) - except Exception as e: - logger.error(f"抓取价格详情过程中发生异常: {e}") - finally: - for task in screenshot_tasks: - if not task.done(): - task.cancel() - - for path in temp_screenshots: - if os.path.exists(path): - try: - os.remove(path) - except: - pass - - if all_prices: - station_name_clean = clean_station_name(station_name) - try: - all_prices.sort(key=lambda x: x.get('start', '00:00')) - except: - pass - logger.info(f"✅ 场站 {station_name_clean} 共提取到 {len(all_prices)} 条价格信息,准备保存...") - await self.service.save_station_data(station_name_clean, address, all_prices) + if not final_screen_path: + final_screen_path = take_screenshot(d, f"tld_detail_scan_final_{int(time.time())}.jpg") + logger.info(f"[详情页] 扫描流程未生成截图,使用兜底截图: {final_screen_path}") else: - logger.warning(f"❌ 未能提取到任何价格信息,请检查页面识别逻辑") - if address: + logger.info(f"[详情页] 使用最终稳定区域截图作为价格页识别输入: {final_screen_path}") + + logger.info("[详情页] 使用固定归一化坐标点击顶部“价格”标签,并点击左侧当前价数字进入全时段电价") + price_tab_screen = final_screen_path + entrance_clicked = False + + try: + tab_x = int(PRICE_TAB_X_NORM * w / 1000) + tab_y = int(PRICE_TAB_Y_NORM * h / 1000) + logger.info(f"[详情页] 固定坐标点击价格标签: 归一化({PRICE_TAB_X_NORM}, {PRICE_TAB_Y_NORM}) -> 像素({tab_x}, {tab_y})") + d.click(tab_x, tab_y) + await asyncio.sleep(1.0) + price_tab_screen = take_screenshot(d, f"tld_detail_after_price_tab_{int(time.time())}.jpg") + logger.info(f"[详情页] 点击价格标签后的界面截图已保存: {price_tab_screen}") + + entry_x = int(PRICE_ENTRY_X_NORM * w / 1000) + entry_y = int(PRICE_ENTRY_Y_NORM * h / 1000) + click_x = max(5, min(w - 5, entry_x)) + click_y = max(5, min(h - 5, entry_y)) + + debug_click_path = price_tab_screen.replace( + ".jpg", + f"_click_{tab_x}_{tab_y}_price_{click_x}_{click_y}.jpg" + ) + try: + img = read_image(price_tab_screen) + if img is not None: + cv2.circle(img, (tab_x, tab_y), 20, (0, 0, 255), -1) + cv2.circle(img, (click_x, click_y), 20, (0, 0, 255), -1) + save_image(debug_click_path, img) + logger.info(f"[详情页] 已生成价格标签与当前价入口红点标记图: {debug_click_path}") + else: + logger.warning(f"[详情页] 加载价格页截图失败,无法绘制诊断红点: {price_tab_screen}") + except Exception as e: + logger.error(f"[详情页] 生成价格入口点击诊断图片失败: {e}") + + logger.info(f"[详情页] 点击当前价入口: 像素({click_x}, {click_y}),屏幕大小: ({w}, {h})") + d.click(click_x, click_y) + entrance_clicked = True + await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD) + except Exception as e: + logger.error(f"[详情页] 固定坐标点击价格标签或入口失败: {e}") + + if entrance_clicked: + entered_price_path = take_screenshot(d, f"tld_detail_price_after_enter_{int(time.time())}.jpg") + logger.info(f"[电价页] 入口点击后的电价页截图已保存: {entered_price_path}") + + await asyncio.sleep(1.0) + + logger.info("[电价页] 先多次向下滚动,使列表回到起始位置") + top_last_md5 = None + top_stable = 0 + max_top_round = 8 + price_top_screen = entered_price_path + + for idx in range(max_top_round): + price_top_screen = take_screenshot(d, f"tld_detail_price_top_{int(time.time())}_{idx}.jpg") + logger.info(f"[电价页] 向下滚动前后的截图: {price_top_screen}") + curr_md5 = get_image_content_md5(price_top_screen, top_ratio=0.2, bottom_ratio=0.2) + if top_last_md5 is not None and curr_md5 == top_last_md5: + top_stable += 1 + logger.info(f"[电价页] 页面内容连续第 {top_stable} 次无变化,可能已到顶部") + if top_stable >= 2: + break + else: + top_stable = 0 + top_last_md5 = curr_md5 + + d.swipe_ext("down", scale=0.8) + await asyncio.sleep(1.0) + + all_prices = [] + last_price_md5 = None + price_page_count = 0 + max_price_pages = 4 + screenshot_tasks = [] + temp_screenshots = [] + price_screen_path = price_top_screen + + try: + while price_page_count < max_price_pages: + price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}_{price_page_count}.jpg") + + curr_md5 = get_image_content_md5(price_screen_path, top_ratio=0.2, bottom_ratio=0.2) + if curr_md5 == last_price_md5: + logger.info("价格页面内容无变化,判定已触底") + if os.path.exists(price_screen_path): + os.remove(price_screen_path) + break + last_price_md5 = curr_md5 + temp_screenshots.append(price_screen_path) + + logger.info(f"已截取价格详情页第 {price_page_count + 1} 页: {price_screen_path},启动后台异步分析") + task = asyncio.create_task(self.read_image_kit.analyze_detail_price(price_screen_path)) + screenshot_tasks.append(task) + + logger.info("向上滚动列表,准备截取下一屏价格...") + d.swipe_ext("up", scale=0.8) + await asyncio.sleep(1.0) + price_page_count += 1 + + if screenshot_tasks: + logger.info(f"UI 操作已完成,等待 {len(screenshot_tasks)} 个后台分析任务结束...") + results = await asyncio.gather(*screenshot_tasks, return_exceptions=True) + + for res in results: + if isinstance(res, Exception): + logger.error(f"后台分析任务出错: {res}") + continue + if res: + for p in res: + is_duplicate = False + for existing in all_prices: + if p.get('start') == existing.get('start') and p.get('end') == existing.get('end'): + is_duplicate = True + for key in ['price', 'plus_price', 'market_price', 'elec_price', 'service_price']: + if p.get(key) is not None and (existing.get(key) is None or existing.get(key) == 0): + existing[key] = p[key] + break + if not is_duplicate: + all_prices.append(p) + except Exception as e: + logger.error(f"抓取价格详情过程中发生异常: {e}") + finally: + for task in screenshot_tasks: + if not task.done(): + task.cancel() + + for path in temp_screenshots: + if os.path.exists(path): + try: + os.remove(path) + except: + pass + + if all_prices: station_name_clean = clean_station_name(station_name) - logger.info(f"[详情页] 虽未获取价格,但已获取地址,尝试仅保存基础信息: {station_name_clean} | {address}") try: - await self.service.save_station_profile_only(station_name_clean, address) - except Exception as e: - logger.error(f"[详情页] 仅保存基础信息失败: {e}") - if os.path.exists(price_screen_path): - os.remove(price_screen_path) - - try: - while price_page_count < max_price_pages: - price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}_{price_page_count}.jpg") - - # 校验页面是否发生滚动变化 - curr_md5 = get_image_content_md5(price_screen_path, top_ratio=0.2, bottom_ratio=0.2) - if curr_md5 == last_price_md5: - logger.info("价格页面内容无变化,判定已触底") - if os.path.exists(price_screen_path): os.remove(price_screen_path) - break - last_price_md5 = curr_md5 - temp_screenshots.append(price_screen_path) - - logger.info(f"已截取价格详情页第 {price_page_count + 1} 页: {price_screen_path},启动后台异步分析") - # 使用 asyncio.create_task 立即在后台开始执行分析 - task = asyncio.create_task(self.read_image_kit.analyze_detail_price(price_screen_path)) - screenshot_tasks.append(task) - - # 向上滚动列表(手指向上划),看后面的时段 - logger.info("向上滚动列表,准备截取下一屏价格...") - d.swipe_ext("up", scale=0.8) - await asyncio.sleep(1.0) - price_page_count += 1 - - # 等待所有后台分析任务完成 - if screenshot_tasks: - logger.info(f"UI 操作已完成,等待 {len(screenshot_tasks)} 个后台分析任务结束...") - results = await asyncio.gather(*screenshot_tasks, return_exceptions=True) - - for res in results: - if isinstance(res, Exception): - logger.error(f"后台分析任务出错: {res}") - continue - if res: - # 深度去重:根据时段 (start, end) 合并 - for p in res: - is_duplicate = False - for existing in all_prices: - if p.get('start') == existing.get('start') and p.get('end') == existing.get('end'): - is_duplicate = True - # 字段补全逻辑 - for key in ['price', 'plus_price', 'market_price', 'elec_price', 'service_price']: - if p.get(key) is not None and (existing.get(key) is None or existing.get(key) == 0): - existing[key] = p[key] - break - if not is_duplicate: - all_prices.append(p) - except Exception as e: - logger.error(f"抓取价格详情过程中发生异常: {e}") - finally: - # 无论是否异常,都要确保清理未完成的任务,避免 "never awaited" 警告 - for task in screenshot_tasks: - if not task.done(): - task.cancel() - - # 清理所有临时截图 - for path in temp_screenshots: - if os.path.exists(path): + all_prices.sort(key=lambda x: x.get('start', '00:00')) + except: + pass + logger.info(f"✅ 场站 {station_name_clean} 共提取到 {len(all_prices)} 条价格信息,准备保存...") + await self.service.save_station_data(station_name_clean, address, all_prices) + else: + logger.warning(f"❌ 未能提取到任何价格信息,请检查页面识别逻辑") + if address: + station_name_clean = clean_station_name(station_name) + logger.info(f"[详情页] 虽未获取价格,但已获取地址,尝试仅保存基础信息: {station_name_clean} | {address}") try: - os.remove(path) - except: - pass + await self.service.save_station_profile_only(station_name_clean, address) + except Exception as e: + logger.error(f"[详情页] 仅保存基础信息失败: {e}") - # 6. 保存数据 - if all_prices: - station_name_clean = clean_station_name(station_name) - # 对价格按时间排序 - try: - all_prices.sort(key=lambda x: x.get('start', '00:00')) - except: - pass - logger.info(f"✅ 场站 {station_name_clean} 共提取到 {len(all_prices)} 条价格信息,准备保存...") - await self.service.save_station_data(station_name_clean, address, all_prices) - else: - logger.warning(f"❌ 未能提取到任何价格信息,请检查页面识别逻辑") - # 清理价格页临时截图(详情页首屏截图已在异步任务中清理) - if os.path.exists(price_screen_path): - os.remove(price_screen_path) + if os.path.exists(price_screen_path): + os.remove(price_screen_path) async def crawl_list(self): """ diff --git a/Apps/TeLaiDian/ReadImageKit.py b/Apps/TeLaiDian/ReadImageKit.py index fcaeb35..586723c 100644 --- a/Apps/TeLaiDian/ReadImageKit.py +++ b/Apps/TeLaiDian/ReadImageKit.py @@ -10,7 +10,7 @@ if project_root not in sys.path: sys.path.append(project_root) from Util.VLMKit import VLMKit -from Apps.TeLaiDian.Kit import draw_rectangles, detect_cards_cv, setup_logger, read_image, detect_wide_rounded_card_cv +from Apps.TeLaiDian.Kit import draw_rectangles, setup_logger, read_image from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO, MIN_CARD_HEIGHT # 初始化日志 @@ -20,41 +20,71 @@ class ReadImageKit: def __init__(self): self.vlm = VLMKit() + async def find_price_tab_vlm(self, image_path): + """ + 使用 VLM 在详情页顶部标签栏中寻找“价格”标签的点击位置 + """ + prompt = """ + 分析这张特来电充电站详情页截图,找到顶部标签栏中“价格”两个字所在的点击区域中心。 + 要求: + 1. 仅在页面最上方的标签栏里查找,该标签栏通常包含“价格 / 终端 / 电站 / 评论 / 周边”等文字。 + 2. 不要选择下面“价格信息”模块中的数字(例如 1.0689 元/度)或其它文本。 + 3. 不要选择最顶端系统状态栏或返回按钮等区域。 + + 输出格式为 JSON: + { + "found": true/false, + "reason": "为什么认为这个位置是顶部“价格”标签", + "point": [x, y] // 归一化坐标,范围 [0-1000] + } + """ + try: + res_text = await self.vlm.analyze_image(image_path, prompt) + json_str = self.vlm.extract_json(res_text) + data = json.loads(json_str) + + if data.get("found") and data.get("point"): + p = data["point"] + img = read_image(image_path) + if img is not None: + h, w = img.shape[:2] + actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)] + bbox = [actual_p[0]-60, actual_p[1]-30, actual_p[0]+60, actual_p[1]+30] + draw_rectangles(image_path, bboxes=[bbox], click_points=[actual_p]) + logger.info(f"已生成价格标签诊断图片: {image_path.replace('.jpg', '_tab_vl.jpg')}") + + return data + except Exception as e: + logger.error(f"VLM 寻找价格标签失败: {e}") + return {"found": False} + async def find_price_entrance_vlm(self, image_path): """ 使用 VLM 在详情页寻找价格入口(如:1.1556元/度 的卡片或价格信息按钮) """ prompt = """ 分析这张特来电充电站详情页截图,找到进入“分时电价详情”的点击入口。 - 入口优先级和特征如下: - 1. 首选:页面中有文字“当前价”,其正下方一行通常是红色的电价数字, - 例如 “1.1317 元/度”、“1.1556 元/度”等,请优先选择这行红色价格数字所在区域。 - 2. 如果页面同时存在“停车参考价”“停车费参考价”等字样,请不要选择这些区域, - 只选择与充电“当前价”直接对应的红色价格数字。 - 3. 如果页面没有“当前价”字样,则可以退而求其次,选择明显用于展示 - 充电价格的卡片或按钮,例如写有“价格信息”“电价详情”的区域。 + 入口规则: + 1. 只选择“价格信息”模块中“当前价”下方的红色电价数字(例如 1.0689 元/度、1.3435 元/度)。 + 2. 排除底部悬浮条或底部操作区中的红色价格(靠近“扫码充电”“立即充电”等按钮的区域)。 + 3. 排除“停车参考价”“停车费参考价”等与停车相关的区域。 + 4. 禁止选择页面顶部的标签栏,例如“价格 / 终端 / 电站 / 评论 / 周边”这一行中的任何文字或区域。 + 5. 如果页面没有“当前价”,才选择用于展示充电价格的按钮,如“价格信息”“电价详情”。 + + 位置约束(尽量满足): + - Y 位置位于价格信息模块区域内:明显在顶部标签栏下方、在底部悬浮条上方。 + - X 位置应位于左侧价格列区域(当前价所在列),避免会员价右侧列。 请判断符合上述规则的价格入口是否存在,并给出其中心坐标。 输出格式为 JSON: { "found": true/false, - "reason": "为什么认为这是入口(说明是否基于当前价红色价格)", - "point": [x, y], // 归一化坐标 [0-1000],例如 [500, 600] 代表屏幕中心偏下 + "reason": "为什么认为这是入口(说明是否基于当前价红色价格,并确认未选顶部标签栏或底部悬浮条)", + "point": [x, y], "type": "price_card" / "button" } """ try: - cv_bboxes = detect_wide_rounded_card_cv(image_path, min_width_ratio=0.8, min_y_ratio=0.5) - if cv_bboxes: - img = read_image(image_path) - if img is not None: - h, w = img.shape[:2] - bx1, by1, bx2, by2 = cv_bboxes[0] - cx = (bx1 + bx2) // 2 - cy = (by1 + by2) // 2 - draw_rectangles(image_path, bboxes=[cv_bboxes[0]], click_points=[[cx, cy]]) - return {"found": True, "reason": "cv", "point": [int(cx * 1000 / w), int(cy * 1000 / h)], "type": "price_card"} - res_text = await self.vlm.analyze_image(image_path, prompt) json_str = self.vlm.extract_json(res_text) data = json.loads(json_str) @@ -68,7 +98,7 @@ class ReadImageKit: h, w = img.shape[:2] actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)] # 生成虚拟 bbox - bbox = [actual_p[0]-50, actual_p[1]-30, actual_p[0]+50, actual_p[1]+30] + bbox = [actual_p[0]-60, actual_p[1]-40, actual_p[0]+60, actual_p[1]+40] draw_rectangles(image_path, bboxes=[bbox], click_points=[actual_p]) logger.info(f"已生成价格入口诊断图片: {image_path.replace('.jpg', '_vl.jpg')}") @@ -119,6 +149,44 @@ class ReadImageKit: logger.error(f"VLM 寻找关闭按钮失败: {e}") return {"has_ad": False} + async def check_warm_popup_vlm(self, image_path): + """ + 检测详情页是否存在“温馨提示/下次再说”弹窗 + """ + prompt = """ + 分析这张特来电充电站详情页截图,判断是否存在带有“下次再说”或“下次现说”文案的温馨提示弹窗。 + 要求: + 1. 只关注覆盖在详情页上方的弹窗或遮罩,其上包含“下次再说”“下次现说”等文字按钮。 + 2. 不要将正常页面中的列表项、价格卡片、终端状态等区域误判为弹窗。 + 3. 如果存在该弹窗,请给出“下次再说”按钮的大致点击中心位置。 + + 输出格式为 JSON: + { + "has_popup": true/false, + "reason": "为什么认为有或没有温馨提示弹窗", + "button_point": [x, y] // 归一化坐标,范围 [0-1000],没有则为 null + } + """ + try: + res_text = await self.vlm.analyze_image(image_path, prompt) + json_str = self.vlm.extract_json(res_text) + data = json.loads(json_str) + + if data.get("has_popup") and data.get("button_point"): + p = data["button_point"] + img = read_image(image_path) + if img is not None: + h, w = img.shape[:2] + actual_p = [int(p[0] * w / 1000), int(p[1] * h / 1000)] + bbox = [actual_p[0]-80, actual_p[1]-40, actual_p[0]+80, actual_p[1]+40] + draw_rectangles(image_path, bboxes=[bbox], click_points=[actual_p]) + logger.info(f"已生成温馨提示弹窗诊断图片: {image_path.replace('.jpg', '_warm_vl.jpg')}") + + return data + except Exception as e: + logger.error(f"VLM 检测温馨提示弹窗失败: {e}") + return {"has_popup": False} + async def check_wrong_page_vlm(self, image_path): """ 检查是否误触进入了错误的页面(如:新人福利、我的卡券、活动页等) diff --git a/Apps/TeLaiDian/Template/qbsd.jpg b/Apps/TeLaiDian/Template/qbsd.jpg index 3a5fcb4..3ea15a6 100644 Binary files a/Apps/TeLaiDian/Template/qbsd.jpg and b/Apps/TeLaiDian/Template/qbsd.jpg differ diff --git a/Apps/TelaiDian/__pycache__/Crawler.cpython-310.pyc b/Apps/TelaiDian/__pycache__/Crawler.cpython-310.pyc index 21a0885..62f6e20 100644 Binary files a/Apps/TelaiDian/__pycache__/Crawler.cpython-310.pyc and b/Apps/TelaiDian/__pycache__/Crawler.cpython-310.pyc differ diff --git a/Apps/TelaiDian/__pycache__/ReadImageKit.cpython-310.pyc b/Apps/TelaiDian/__pycache__/ReadImageKit.cpython-310.pyc index 47993e9..bae86b9 100644 Binary files a/Apps/TelaiDian/__pycache__/ReadImageKit.cpython-310.pyc and b/Apps/TelaiDian/__pycache__/ReadImageKit.cpython-310.pyc differ diff --git a/Config/__pycache__/Config.cpython-310.pyc b/Config/__pycache__/Config.cpython-310.pyc index 9888431..a1690e3 100644 Binary files a/Config/__pycache__/Config.cpython-310.pyc and b/Config/__pycache__/Config.cpython-310.pyc differ diff --git a/T4_TeLaiDian_Simple.py b/T4_TeLaiDian_Simple.py deleted file mode 100644 index ca8bb2b..0000000 --- a/T4_TeLaiDian_Simple.py +++ /dev/null @@ -1,42 +0,0 @@ -# coding=utf-8 -import sys -import os -import asyncio -import time -import logging -import uiautomator2 as u2 - -project_root = os.path.dirname(os.path.abspath(__file__)) -if project_root not in sys.path: - sys.path.append(project_root) - -from Apps.TeLaiDian import Kit -from Apps.TeLaiDian.ReadImageKit import ReadImageKit - -logger = Kit.setup_logger("T4_TeLaiDian_Simple", clear_old_log=True) - -async def run_simple(): - Kit.clear_temp_dir() - d = u2.connect() - w, h = d.window_size() - logger.info(f"开始简单流程,当前窗口: {w}x{h}") - logger.info("执行显式下拉刷新以校准位置") - d.swipe(w // 2, int(h * 0.3), w // 2, int(h * 0.8), duration=0.5) - await asyncio.sleep(2.5) - - screenshot_path = Kit.take_screenshot(d, f"tld_list_{int(time.time())}.jpg") - logger.info(f"列表页截图: {screenshot_path}") - - rik = ReadImageKit() - stations = await rik.analyze_station_list(screenshot_path) - logger.info(f"识别到场站数量: {len(stations)}") - for i, s in enumerate(stations[:10]): - logger.info(f"[{i+1}] {s.get('name')} | point={s.get('point')} | bbox={s.get('bbox')}") - - logger.info("简单流程结束") - -if __name__ == "__main__": - try: - asyncio.run(run_simple()) - except KeyboardInterrupt: - logger.info("用户中断") diff --git a/Tools/T_TeLaiDian_ScrollTest.py b/Tools/T_TeLaiDian_ScrollTest.py new file mode 100644 index 0000000..9b7a2d3 --- /dev/null +++ b/Tools/T_TeLaiDian_ScrollTest.py @@ -0,0 +1,114 @@ +# coding=utf-8 +import asyncio +import os +import sys +import time +import uiautomator2 as u2 +import cv2 + +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if project_root not in sys.path: + sys.path.append(project_root) + +from Apps.TeLaiDian.Kit import setup_logger, take_screenshot, read_image, save_image +from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO + + +logger = setup_logger("TeLaiDianScrollTest", clear_old_log=False) + +# 根据最新红点截图重新估算的顶部“价格”标签归一化坐标(0-1000) +# 这里调成:X ≈ 22% 屏宽,Y ≈ 13% 屏高 +PRICE_TAB_X_NORM = 220 +PRICE_TAB_Y_NORM = 130 + +# 价格信息卡片中左侧“当前价”红色数字的大致归一化坐标(0-1000) +# 估算:X ≈ 23% 屏宽,Y ≈ 38% 屏高 +PRICE_ENTRY_X_NORM = 230 +PRICE_ENTRY_Y_NORM = 380 + + +async def run_scroll_test(): + d = u2.connect() + w, h = d.window_size() + + logger.info("=== 特来电详情页第2页滚动安全性测试开始 ===") + logger.info("请先手动进入某个场站详情页的第2页状态,然后执行本脚本。脚本会尽可能多次大幅向上滑动,直到页面不再变化。") + + first_screen = take_screenshot(d, f"tld_scrolltest_start_{int(time.time())}.jpg") + logger.info(f"[测试] 起始界面截图: {first_screen}") + + last_md5 = None + stable_count = 0 + max_round = 30 + + from Apps.TeLaiDian.Kit import get_image_content_md5 + + for idx in range(max_round): + # 1. 先执行一次“大力向上滑动” + start_x = int(w * 0.9) + start_y = int(h * 0.85) + end_y = int(h * 0.25) + logger.info(f"[测试] 第 {idx + 1} 轮大幅向上滑动: ({start_x}, {start_y}) -> ({start_x}, {end_y})") + d.swipe(start_x, start_y, start_x, end_y, 0.25) + await asyncio.sleep(1.0) + + # 2. 滑动完成后再截图、比较内容是否还在变化 + screen_path = take_screenshot(d, f"tld_scrolltest_{int(time.time())}_{idx}.jpg") + logger.info(f"[测试] 第 {idx + 1} 轮滑动后的截图: {screen_path}") + curr_md5 = get_image_content_md5( + screen_path, + top_ratio=SAFE_EXCLUDE_RATIO, + bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO, + ) + if last_md5 is not None and curr_md5 == last_md5: + stable_count += 1 + logger.info(f"[测试] 页面内容连续第 {stable_count} 次无变化") + else: + stable_count = 0 + last_md5 = curr_md5 + + if stable_count >= 2: + logger.info("[测试] 检测到页面多次无变化,认为已到达顶部固定区域,提前结束测试。") + break + + final_screen = take_screenshot(d, f"tld_scrolltest_end_{int(time.time())}.jpg") + logger.info(f"[测试] 结束时界面截图: {final_screen}") + + # 使用写死的归一化坐标点击顶部“价格”标签 + tab_x = int(PRICE_TAB_X_NORM * w / 1000) + tab_y = int(PRICE_TAB_Y_NORM * h / 1000) + logger.info(f"[测试] 使用固定归一化坐标点击顶部“价格”标签: 归一化({PRICE_TAB_X_NORM}, {PRICE_TAB_Y_NORM}) -> 像素({tab_x}, {tab_y})") + try: + d.click(tab_x, tab_y) + await asyncio.sleep(1.0) + after_tab_screen = take_screenshot(d, f"tld_scrolltest_after_price_tab_{int(time.time())}.jpg") + logger.info(f"[测试] 点击顶部“价格”标签后的界面截图: {after_tab_screen}") + try: + img = read_image(after_tab_screen) + if img is not None: + cv2.circle(img, (tab_x, tab_y), 20, (0, 0, 255), -1) + + entry_x = int(PRICE_ENTRY_X_NORM * w / 1000) + entry_y = int(PRICE_ENTRY_Y_NORM * h / 1000) + cv2.circle(img, (entry_x, entry_y), 20, (0, 0, 255), -1) + + debug_path = after_tab_screen.replace(".jpg", f"_click_{tab_x}_{tab_y}_price_{entry_x}_{entry_y}.jpg") + save_image(debug_path, img) + logger.info(f"[测试] 已在截图上标记价格标签和下方每度价格的红点: {debug_path}") + else: + logger.warning(f"[测试] 加载点击后截图失败,无法绘制红点: {after_tab_screen}") + except Exception as e: + logger.error(f"[测试] 绘制价格标签或下方价格红点失败: {e}") + except Exception as e: + logger.error(f"[测试] 点击顶部“价格”标签失败: {e}") + + logger.info("=== 特来电详情页第2页滚动安全性测试结束 ===") + + +if __name__ == "__main__": + try: + asyncio.run(run_scroll_test()) + except KeyboardInterrupt: + logger.info("用户中断了滚动测试。") + except Exception as e: + logger.exception(f"滚动测试运行异常: {e}") diff --git a/debug_cv.py b/debug_cv.py deleted file mode 100644 index b7a86f3..0000000 --- a/debug_cv.py +++ /dev/null @@ -1,93 +0,0 @@ - -import sys -import os -import cv2 -import numpy as np - -sys.path.append(os.getcwd()) -from Apps.TeLaiDian.Kit import detect_cards_cv as real_detect_cards_cv -from Apps.TeLaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO - -def read_image(path): - if not path or not os.path.exists(path): - return None - try: - data = np.fromfile(path, dtype=np.uint8) - if data.size == 0: - return None - img = cv2.imdecode(data, -1) - return img - except Exception as e: - print(f"Error reading image {path}: {e}") - return None - -def detect_cards_cv(image_path, top_ratio=None, bottom_ratio=None): - if top_ratio is None: - top_ratio = SAFE_EXCLUDE_RATIO - if bottom_ratio is None: - bottom_ratio = BOTTOM_SAFE_EXCLUDE_RATIO - MIN_CARD_HEIGHT = 150 # Assuming default from Setting - - img = read_image(image_path) - if img is None: - print("Image not found or invalid") - return [] - - h, w = img.shape[:2] - print(f"Image Size: {w}x{h}") - - # 转换为灰度图 - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - - # 限制检测范围 - top_limit = int(h * top_ratio) - bottom_limit = int(h * (1 - bottom_ratio)) - print(f"CV limits: top={top_limit}, bottom={bottom_limit}, threshold_y={int(h * 0.58)}") - - # 使用自适应阈值 - thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2) - - # 闭运算 - kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 4, 3)) - closed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) - - # 寻找轮廓 - contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - - min_card_width = int(w * 0.8) - - for cnt in contours: - x, y, cw, ch = cv2.boundingRect(cnt) - center_y = y + ch // 2 - - rect_area = cw * ch - cnt_area = cv2.contourArea(cnt) - extent = cnt_area / rect_area if rect_area > 0 else 0 - approx = cv2.approxPolyDP(cnt, 0.02 * cv2.arcLength(cnt, True), True) - - ok_width = cw >= min_card_width - ok_height = ch > MIN_CARD_HEIGHT * 0.8 - ok_vertical = center_y >= int(h * 0.58) and y > top_limit and y + ch < bottom_limit - - # Check green ratio - hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) - roi = hsv[max(0,y):min(h,y+ch), max(0,x):min(w,x+cw)] - green_mask = cv2.inRange(roi, np.array([35, 80, 80]), np.array([85, 255, 255])) - green_ratio = float(cv2.countNonZero(green_mask)) / (roi.shape[0]*roi.shape[1]) if roi.size > 0 else 0.0 - - ok_color = green_ratio < 0.25 - - if cw > w * 0.5: # Only print large enough boxes - print(f"Box: y={y}, h={ch}, w={cw}, center_y={center_y}, extent={extent:.2f}, green={green_ratio:.2f}") - print(f" Checks: width={ok_width}, height={ok_height}, vertical={ok_vertical}, color={ok_color}") - -image_path = r"d:\dsWork\aiData\Output\tld_list_1768359492_flag.jpg" -# Try the original if flag doesn't exist or is modified -original_path = r"d:\dsWork\aiData\Output\tld_list_1768359492.jpg" - -if os.path.exists(original_path): - print(f"Testing original image: {original_path}") - detect_cards_cv(original_path) -else: - print(f"Original image not found, trying flag: {image_path}") - detect_cards_cv(image_path)