From b3a2bd4816a932e072bcfe3978a2dea964ab675b Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Tue, 13 Jan 2026 20:36:20 +0800 Subject: [PATCH] 'commit' --- Apps/AiTeJiYiChong/Crawler.py | 82 +++++++------- Apps/YiLaiTe/Config/Setting.py | 38 +++++++ Apps/YiLaiTe/Crawler.py | 190 +++++++++++++++++++++++++++++++++ Apps/YiLaiTe/Kit.py | 156 +++++++++++++++++++++++++++ Apps/YiLaiTe/Opener.py | 85 +++++++++++++++ Apps/YiLaiTe/ReadImageKit.py | 114 ++++++++++++++++++++ Apps/YiLaiTe/Service.py | 132 +++++++++++++++++++++++ T3_YiLaiTe.py | 71 ++++++++++++ Util/VLMKit.py | 66 ++++++++++++ 9 files changed, 893 insertions(+), 41 deletions(-) create mode 100644 Apps/YiLaiTe/Config/Setting.py create mode 100644 Apps/YiLaiTe/Crawler.py create mode 100644 Apps/YiLaiTe/Kit.py create mode 100644 Apps/YiLaiTe/Opener.py create mode 100644 Apps/YiLaiTe/ReadImageKit.py create mode 100644 Apps/YiLaiTe/Service.py create mode 100644 T3_YiLaiTe.py create mode 100644 Util/VLMKit.py diff --git a/Apps/AiTeJiYiChong/Crawler.py b/Apps/AiTeJiYiChong/Crawler.py index fd2b558..4ece46c 100644 --- a/Apps/AiTeJiYiChong/Crawler.py +++ b/Apps/AiTeJiYiChong/Crawler.py @@ -232,50 +232,50 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT): if before_click_md5 != after_click_md5: entered_price_page = True # 抓取三级页面图片 - price_screenshots = [] - price_screenshots.append(after_click_path) - - # 滑动处理 - last_price_md5 = after_click_md5 - for scroll_idx in range(2): - d.swipe(w // 2, int(h * 0.7), w // 2, int(h * 0.3), duration=0.4) - await asyncio.sleep(1) - scroll_path = take_screenshot(d, f"price_scroll_{scroll_idx}_{station_name}", save_dir=TEMP_IMAGE_DIR) - curr_price_md5 = Kit.get_image_content_md5( - scroll_path, - top_ratio=SAFE_EXCLUDE_RATIO, + price_screenshots = [] + price_screenshots.append(after_click_path) + + # 滑动处理 + last_price_md5 = after_click_md5 + for scroll_idx in range(2): + d.swipe(w // 2, int(h * 0.7), w // 2, int(h * 0.3), duration=0.4) + await asyncio.sleep(1) + scroll_path = take_screenshot(d, f"price_scroll_{scroll_idx}_{station_name}", save_dir=TEMP_IMAGE_DIR) + curr_price_md5 = Kit.get_image_content_md5( + scroll_path, + top_ratio=SAFE_EXCLUDE_RATIO, + bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO + ) + if curr_price_md5 == last_price_md5: break + price_screenshots.append(scroll_path) + last_price_md5 = curr_price_md5 + + # 后台处理价格图片 + logger.info(f"已启动后台分析价格页: {station_name},共 {len(price_screenshots)} 张图") + task_price = asyncio.create_task(analyze_prices_background(service, station_name, price_screenshots)) + background_tasks.append(task_price) + + # 立即返回 + d.press("back") + await asyncio.sleep(0.5) + + # 检查返回后的页面状态 + check_back_path = take_screenshot(d, f"check_back_{detail_uuid}", save_dir=TEMP_IMAGE_DIR) + + check_list_md5 = Kit.get_image_content_md5( + check_back_path, + top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO ) - if curr_price_md5 == last_price_md5: break - price_screenshots.append(scroll_path) - last_price_md5 = curr_price_md5 + check_detail_md5 = Kit.get_image_content_md5(check_back_path) + + if os.path.exists(check_back_path): os.remove(check_back_path) - # 后台处理价格图片 - logger.info(f"已启动后台分析价格页: {station_name},共 {len(price_screenshots)} 张图") - task_price = asyncio.create_task(analyze_prices_background(service, station_name, price_screenshots)) - background_tasks.append(task_price) - - # 立即返回 - d.press("back") - await asyncio.sleep(0.5) - - # 检查返回后的页面状态 - check_back_path = take_screenshot(d, f"check_back_{detail_uuid}", save_dir=TEMP_IMAGE_DIR) - - check_list_md5 = Kit.get_image_content_md5( - check_back_path, - top_ratio=SAFE_EXCLUDE_RATIO, - bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO - ) - check_detail_md5 = Kit.get_image_content_md5(check_back_path) - - if os.path.exists(check_back_path): os.remove(check_back_path) - - if check_list_md5 == current_md5: - logger.info("检测到已直接返回列表页,跳过后续返回操作。") - should_back_to_list = False - elif check_detail_md5 == before_click_md5: - logger.info("检测到返回详情页,需执行第二次返回。") + if check_list_md5 == current_md5: + logger.info("检测到已直接返回列表页,跳过后续返回操作。") + should_back_to_list = False + elif check_detail_md5 == before_click_md5: + logger.info("检测到返回详情页,需执行第二次返回。") except Exception as e: logger.error(f"处理三级页面详情异常 ({station_name}): {e}") diff --git a/Apps/YiLaiTe/Config/Setting.py b/Apps/YiLaiTe/Config/Setting.py new file mode 100644 index 0000000..3e4e79a --- /dev/null +++ b/Apps/YiLaiTe/Config/Setting.py @@ -0,0 +1,38 @@ + +# 采集配置 +# 滑动距离比例 (0.1 ~ 0.9),数值越大滑动幅度越大 +SCROLL_DISTANCE_RATIO = 0.5 +# 最大采集场站数量,达到此次数后停止采集 +MAX_STATIONS_COUNT = 100 +# 场站去重过期时间(秒),在此时间内重复出现的场站不会再次点击进入详情页 +REDIS_STATION_EXPIRE = 120 +# 数据库数据保留时长(天),超过此时长的历史数据(is_current=0)将被删除 +DATA_RETENTION_DAYS = 365 + +# 测试配置 +# [Testing] 是否在启动时清除 Redis 中的已爬取记录 (用于快速重复测试) +TEST_CLEAR_REDIS = False + +# 调试绘图配置 +# [Debug] 是否在图片上绘制识别出的绿色矩形框 (计算机图形学可视化) +DRAW_DEBUG_BOXES = True +DEBUG_BOX_COLOR = (0, 255, 0) # BGR 格式的绿色 +DEBUG_BOX_THICKNESS = 3 # 线条粗细 + +# 等待时间配置 (秒) +# 点击进入详情页后等待加载的时间 +WAIT_DETAIL_PAGE_LOAD = 2.0 +# 从详情页返回列表页后等待页面刷新的时间 +WAIT_BACK_TO_LIST = 0.8 +# 执行滑动操作后等待页面内容加载和稳定的时间 +WAIT_AFTER_SCROLL = 2.0 + +# 坐标计算与安全防护 +# 屏幕顶部安全排除比例 (0.0~1.0),此比例区域内不进行点击(避开状态栏、顶部菜单、横幅广告等) +SAFE_EXCLUDE_RATIO = 0.15 +# 屏幕底部安全排除比例 (0.0~1.0),此比例区域内不进行点击(避开底部导航栏、功能按钮等) +BOTTOM_SAFE_EXCLUDE_RATIO = 0.10 +# 默认回退屏幕宽度,当无法自动获取设备信息时使用 +FALLBACK_WIDTH = 1080 +# 默认回退屏幕高度,当无法自动获取设备信息时使用 +FALLBACK_HEIGHT = 2400 diff --git a/Apps/YiLaiTe/Crawler.py b/Apps/YiLaiTe/Crawler.py new file mode 100644 index 0000000..2d24698 --- /dev/null +++ b/Apps/YiLaiTe/Crawler.py @@ -0,0 +1,190 @@ +import asyncio +import logging +import uuid +import os +import sys +import json +import time +from datetime import datetime +from Apps.YiLaiTe.Kit import take_screenshot, clean_station_name, get_image_content_md5 +from Apps.YiLaiTe.ReadImageKit import ReadImageKit +from Apps.YiLaiTe.Service import YiLaiTeService +from Apps.YiLaiTe.Config.Setting import ( + SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL, MAX_STATIONS_COUNT, + WAIT_DETAIL_PAGE_LOAD, WAIT_BACK_TO_LIST, TEST_CLEAR_REDIS, + SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO +) +from Util.RedisKit import RedisKit +from Core.BaseCrawler import BaseCrawler +import uiautomator2 as u2 + +# 项目根目录处理 +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +if project_root not in sys.path: + sys.path.append(project_root) + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger("YiLaiTeCrawler") + +class YiLaiTeCrawler(BaseCrawler): + def __init__(self, service=None): + super().__init__(service or YiLaiTeService()) + self.read_image_kit = ReadImageKit() + self.redis_kit = RedisKit() + + async def start(self): + """ + 启动入口 + """ + await main(self.service) + + async def clean_redis_data(self): + """ + 清除测试用的 Redis 记录 + """ + if TEST_CLEAR_REDIS: + logger.info("清理 Redis 中的场站处理记录...") + pattern = "crawled:ylt:*" + keys = await self.redis_kit.client.keys(pattern) + if keys: + await self.redis_kit.client.delete(*keys) + + async def crawl_list_logic(self, d): + processed_count = 0 + no_new_data_count = 0 + last_md5 = None + background_tasks = [] + + while processed_count < MAX_STATIONS_COUNT: + # 1. 截图并分析 + screenshot_path = take_screenshot(d, f"list_{int(time.time())}.jpg") + + # 检测是否滚动到底部 + curr_md5 = get_image_content_md5(screenshot_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO) + if last_md5 == curr_md5: + logger.info("内容无变化,判定已到底部") + if os.path.exists(screenshot_path): os.remove(screenshot_path) + break + last_md5 = curr_md5 + + stations = await self.read_image_kit.analyze_station_list(screenshot_path) + if not stations: + no_new_data_count += 1 + if no_new_data_count >= 5: + logger.info("连续 5 页无新数据,停止") + break + # 滑动 + d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO) + await asyncio.sleep(WAIT_AFTER_SCROLL) + continue + + no_new_data_count = 0 + new_stations_in_page = 0 + for station in stations: + if processed_count >= MAX_STATIONS_COUNT: + break + + name = station.get('name') + point = station.get('point') + + if not name or not point: + continue + + # 去重 + redis_key = f"crawled:ylt:{clean_station_name(name)}" + if await self.redis_kit.get_data(redis_key): + continue + + # 点击进入 + logger.info(f">>> 发现新场站: {name} 坐标: {point}") + d.click(point[0], point[1]) + await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD) + + # 分析详情页 (采用异步后台模式) + detail_shot = take_screenshot(d, f"detail_{clean_station_name(name)}_{int(time.time())}.jpg") + + # 启动后台任务处理详情页 + task = asyncio.create_task(self.analyze_detail_background(name, detail_shot)) + background_tasks.append(task) + + processed_count += 1 + new_stations_in_page += 1 + await self.redis_kit.set_data(redis_key, "1", ex=86400*7) + + # 返回 + d.press("back") + await asyncio.sleep(WAIT_BACK_TO_LIST) + + if new_stations_in_page == 0 and stations: + no_new_data_count += 1 + if no_new_data_count >= 5: + logger.info("连续 5 页均无新场站,停止") + break + else: + no_new_data_count = 0 + + # 滑动翻页 + d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO) + await asyncio.sleep(WAIT_AFTER_SCROLL) + + # 等待后台任务完成 + if background_tasks: + logger.info(f"等待 {len(background_tasks)} 个后台分析任务完成...") + await asyncio.gather(*background_tasks, return_exceptions=True) + + return processed_count + + async def analyze_detail_background(self, station_name, image_path): + """ + 后台异步分析详情页 + """ + try: + logger.info(f"开始后台分析场站: {station_name}") + prices = await self.read_image_kit.analyze_detail_price(image_path) + if prices: + await self.service.process_price_detail_data(station_name, prices) + logger.info(f"场站 {station_name} 价格分析完成并入库") + else: + logger.warning(f"场站 {station_name} 未识别到价格信息") + except Exception as e: + logger.error(f"后台分析 {station_name} 失败: {e}") + finally: + if os.path.exists(image_path): + os.remove(image_path) + +async def main(service=None): + if service is None: + service = YiLaiTeService() + await service.init_db() + + crawler = YiLaiTeCrawler(service) + d = u2.connect() + + # 清理 Redis + await crawler.clean_redis_data() + + # 记录任务日志 + task_id = str(uuid.uuid4()) + await service.log_task_start(task_id) + + total_count = 0 + status = "success" + error_msg = None + + try: + total_count = await crawler.crawl_list_logic(d) + except Exception as e: + logger.error(f"主流程执行失败: {e}") + status = "failed" + error_msg = str(e) + finally: + await service.log_task_end(task_id, total_count, status, error_msg) + # 如果是内部初始化的 service,则关闭 + if service and not isinstance(service, YiLaiTeService): + await service.close_db() + +async def get_image_md5_async(path): + return get_image_md5(path) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/Apps/YiLaiTe/Kit.py b/Apps/YiLaiTe/Kit.py new file mode 100644 index 0000000..9269dfb --- /dev/null +++ b/Apps/YiLaiTe/Kit.py @@ -0,0 +1,156 @@ +# coding=utf-8 +import os +import time +import logging + +logger = logging.getLogger(__name__) + +def clean_station_name(name): + """ + 清理场站名称,去除特殊字符和距离信息 + """ + if not name: + return "" + # 驿来特可能特有的清理逻辑 + import re + # 移除常见的括号备注 + name = re.sub(r'\(.*?\)', '', name) + name = re.sub(r'(.*?)', '', name) + return name.strip() + +def take_screenshot(d, filename, path=None): + """ + 获取屏幕截图并保存 + """ + if not path: + from Config.Config import TEMP_IMAGE_DIR + path = TEMP_IMAGE_DIR + + if not os.path.exists(path): + os.makedirs(path) + + full_path = os.path.join(path, filename) + d.screenshot(full_path) + return full_path + +import hashlib +import numpy as np +import cv2 + +def get_file_md5(file_path): + """计算文件的 MD5 值""" + if not os.path.exists(file_path): + return None + hash_md5 = hashlib.md5() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + +def get_image_content_md5(file_path, top_ratio=0.1, bottom_ratio=0.1): + """ + 计算图片核心内容的 MD5 值(排除状态栏和导航栏) + """ + img = read_image(file_path) + if img is None: + return None + + h, w = img.shape[:2] + top = int(h * top_ratio) + bottom = int(h * (1 - bottom_ratio)) + + # 裁剪中间部分 + content = img[top:bottom, :] + + # 将图片数据转换为字节流计算 MD5 + success, encoded_img = cv2.imencode(".jpg", content) + if success: + return hashlib.md5(encoded_img.tobytes()).hexdigest() + return hashlib.md5(content.tobytes()).hexdigest() + +def read_image(path): + """读取图片,支持中文路径""" + if not path or not os.path.exists(path): + return None + try: + data = np.fromfile(path, dtype=np.uint8) + if data.size == 0: + return None + img = cv2.imdecode(data, -1) + return img + except Exception as e: + logger.error(f"Error reading image {path}: {e}") + return None + +def save_image(path, img): + """保存图片,支持中文路径""" + try: + ext = os.path.splitext(path)[1] + if not ext: + ext = ".jpg" + cv2.imencode(ext, img)[1].tofile(path) + return True + except Exception as e: + logger.error(f"Error saving image {path}: {e}") + return False + +def clear_temp_dir(save_dir=None): + """清空临时目录中的所有文件""" + if save_dir is None: + from Config.Config import TEMP_IMAGE_DIR + save_dir = TEMP_IMAGE_DIR + + if not os.path.exists(save_dir): + return + + for f in os.listdir(save_dir): + file_path = os.path.join(save_dir, f) + try: + if os.path.isfile(file_path): + os.remove(file_path) + except Exception as e: + logger.error(f"Error deleting file {file_path}: {e}") + +def get_image_md5(image_path): + """ + 计算图片的 MD5 值 + """ + import hashlib + if not os.path.exists(image_path): + return "" + with open(image_path, 'rb') as f: + data = f.read() + return hashlib.md5(data).hexdigest() + +def draw_rectangles(image_path, points, output_path=None): + """ + 使用 OpenCV 在图片上绘制矩形框 (计算机图形学绘制) + :param image_path: 原始图片路径 + :param points: 坐标点列表,格式如 [[x1, y1, x2, y2], ...] 或 [(x, y), ...] + :param output_path: 输出图片路径,如果不指定则覆盖原图 + """ + try: + import cv2 + import numpy as np + from Config.Setting import DRAW_DEBUG_BOXES, DEBUG_BOX_COLOR, DEBUG_BOX_THICKNESS + + if not DRAW_DEBUG_BOXES: + return image_path + + img = cv2.imread(image_path) + if img is None: + return image_path + + for p in points: + if len(p) == 4: # 矩形 [x1, y1, x2, y2] + cv2.rectangle(img, (int(p[0]), int(p[1])), (int(p[2]), int(p[3])), DEBUG_BOX_COLOR, DEBUG_BOX_THICKNESS) + elif len(p) == 2: # 点 (x, y),画一个小圆圈或正方形表示点击位 + center = (int(p[0]), int(p[1])) + cv2.circle(img, center, 10, DEBUG_BOX_COLOR, -1) + + save_path = output_path if output_path else image_path + cv2.imwrite(save_path, img) + return save_path + except Exception as e: + logger.error(f"绘制矩形框失败: {e}") + return image_path diff --git a/Apps/YiLaiTe/Opener.py b/Apps/YiLaiTe/Opener.py new file mode 100644 index 0000000..deaa671 --- /dev/null +++ b/Apps/YiLaiTe/Opener.py @@ -0,0 +1,85 @@ +# coding=utf-8 +import asyncio +import logging +import os +import time +import uiautomator2 as u2 +import uuid +from Apps.YiLaiTe.Kit import take_screenshot +from Apps.YiLaiTe.ReadImageKit import ReadImageKit +from Config.Config import TEMP_IMAGE_DIR + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("OpenYiLaiTe") + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + +async def check_and_close_ad(d): + """ + 检测并关闭广告弹窗 + """ + max_loops = 2 + w, h = d.window_size() + device_info = {'displayWidth': w, 'displayHeight': h} + + for loop_idx in range(max_loops): + logger.info(f"开始第 {loop_idx + 1} 轮广告检测...") + + image_uuid = f"ylt_ad_check_{int(time.time())}" + screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR) + + ad_result = await ReadImageKit.detect_ad_popup(screenshot_path, device_info=device_info) + + if ad_result: + x, y = ad_result['x'], ad_result['y'] + ad_type = ad_result.get("ad_type", "unknown") + logger.info(f"检测到广告 [{ad_type}],坐标: ({x}, {y}),执行关闭操作...") + d.click(x, y) + await asyncio.sleep(2.0) + if os.path.exists(screenshot_path): os.remove(screenshot_path) + else: + logger.info("未检测到广告弹窗。") + if os.path.exists(screenshot_path): os.remove(screenshot_path) + break + return True + +async def open_mini_program(): + """ + 进入微信小程序: 驿来特 + """ + d = u2.connect() + logger.info("执行进入小程序: 驿来特") + + # 1. 启动微信 + logger.info("启动微信...") + d.app_start("com.tencent.mm", stop=True) + await asyncio.sleep(5) + + # 2. 点击搜索按钮 (坐标适配大部分微信版本) + logger.info("点击搜索按钮...") + w, h = d.window_size() + d.click(int(w * 0.84), int(h * 0.08)) + await asyncio.sleep(2) + + # 3. 输入搜索内容 + logger.info("输入搜索内容: 驿来特") + d.send_keys("驿来特") + await asyncio.sleep(3) + + # 4. 点击搜索结果中的第一个小程序 + logger.info("点击搜索结果中的小程序...") + d.click(int(w * 0.5), int(h * 0.18)) + + logger.info("等待小程序加载...") + await asyncio.sleep(10) + + # 5. 广告检测 + await check_and_close_ad(d) + + return True + +if __name__ == "__main__": + asyncio.run(open_mini_program()) diff --git a/Apps/YiLaiTe/ReadImageKit.py b/Apps/YiLaiTe/ReadImageKit.py new file mode 100644 index 0000000..362c402 --- /dev/null +++ b/Apps/YiLaiTe/ReadImageKit.py @@ -0,0 +1,114 @@ +# coding=utf-8 +import logging +import os +import sys + +# Ensure sys path includes root for imports if not already +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +if project_root not in sys.path: + sys.path.append(project_root) + +from Util.VLMKit import VLMKit + +import json +import re +from Apps.YiLaiTe.Kit import draw_rectangles +from Apps.YiLaiTe.Config.Setting import DRAW_DEBUG_BOXES + +logger = logging.getLogger(__name__) + +class ReadImageKit: + def __init__(self): + self.vlm = VLMKit() + + @classmethod + async def detect_ad_popup(cls, image_path, device_info=None): + """ + 检测图片中是否存在广告弹窗,并返回关闭按钮坐标 + """ + vlm = VLMKit() + prompt = """ + 请仔细检查这张图片中是否存在**弹窗广告**或**悬浮广告**。 + 广告可能有以下几种形式: + 1. 屏幕中央的大型弹窗广告:通常遮挡了页面内容,内容多为优惠券、活动推广等。 + 2. 悬浮广告:通常在侧边或角落。 + 3. 底部横幅广告。 + + 请返回关闭按钮的中心坐标。 + 请以纯 JSON 格式输出: + { + "has_ad": true/false, + "ad_type": "center" | "bottom" | "side" | "other", + "close_point": [x, y] // 绝对像素坐标 + } + 如果没有广告,请返回 {"has_ad": false}。 + """ + try: + res_text = await vlm.analyze_image(image_path, prompt) + json_str = vlm.extract_json(res_text) + res = json.loads(json_str) + if res.get("has_ad") and res.get("close_point"): + p = res["close_point"] + return {"x": p[0], "y": p[1], "ad_type": res.get("ad_type")} + return None + except Exception as e: + logger.error(f"广告检测失败: {e}") + return None + + async def analyze_station_list(self, image_path): + """ + 分析场站列表页图片,提取场站位置和基本信息 + """ + prompt = """ + 分析这张充电站列表截图,提取所有充电站卡片信息。 + 输出格式为 JSON 数组,每个对象包含: + - "name": 场站名称 + - "point": 场站卡片的中心点击坐标 [x, y] + - "bbox": 场站卡片的边界框 [x1, y1, x2, y2] + + 注意: + 1. 仅提取明显的场站列表卡片。 + 2. 坐标请以像素为单位。 + """ + + try: + res_text = await self.vlm.analyze_image(image_path, prompt) + # 使用 VLMKit 的提取方法 + json_str = self.vlm.extract_json(res_text) + stations = json.loads(json_str) + + if isinstance(stations, list): + # 调试绘图 + if DRAW_DEBUG_BOXES: + bboxes = [s['bbox'] for s in stations if 'bbox' in s] + points = [s['point'] for s in stations if 'point' in s] + draw_rectangles(image_path, bboxes + points) + + return stations + return [] + except Exception as e: + logger.error(f"VLM 分析列表页失败: {e}") + return [] + + async def analyze_detail_price(self, image_path): + """ + 分析详情页或三级价格页图片,提取分时电价 + """ + prompt = """ + 分析这张充电站价格详情截图,提取完整的分时价格信息。 + 输出格式为 JSON 数组,每个对象包含: + - "time_range": 时间段 (例如 "00:00-08:00") + - "total_price": 总价 (电费+服务费) + - "elec_price": 电费 (如果能看到) + - "service_price": 服务费 (如果能看到) + + 如果没有看到分时段价格,请尝试寻找“价格详情”或“分时电价”按钮的坐标。 + """ + + try: + res_text = await self.vlm.analyze_image(image_path, prompt) + json_str = self.vlm.extract_json(res_text) + return json.loads(json_str) + except Exception as e: + logger.error(f"VLM 分析价格页失败: {e}") + return [] diff --git a/Apps/YiLaiTe/Service.py b/Apps/YiLaiTe/Service.py new file mode 100644 index 0000000..23baddb --- /dev/null +++ b/Apps/YiLaiTe/Service.py @@ -0,0 +1,132 @@ +# coding=utf-8 +import hashlib +import logging +import os +import sys +import uuid +from datetime import datetime + +# Ensure sys path includes root for imports if not already +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +if project_root not in sys.path: + sys.path.append(project_root) + +from DbKit.Db import Db +from Config.Config import DB_URL +from Model.StationProfile import StationProfile +from Model.StationStatus import StationStatus +from Model.StationPriceSchedule import StationPriceSchedule + +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class YiLaiTeService: + def __init__(self): + self.db = Db(db_url=DB_URL) + self.station_profile_model = StationProfile() + self.station_status_model = StationStatus() + self.station_price_schedule_model = StationPriceSchedule() + self.operator = "驿来特" + + def generate_id(self): + return str(uuid.uuid4()) + + def get_hash(self, s: str) -> str: + return hashlib.md5(s.encode('utf-8')).hexdigest() + + def _to_float(self, value) -> float: + """ + 从字符串中提取浮点数,过滤掉非数字字符 + """ + if value is None: + return 0.0 + if isinstance(value, (int, float)): + return float(value) + + import re + try: + # 提取第一个匹配的数字部分(包含小数点) + match = re.search(r"[-+]?\d*\.?\d+", str(value)) + if match: + return float(match.group()) + return 0.0 + except: + return 0.0 + + async def init_db(self): + await self.db.init_db() + + async def close_db(self): + await self.db.close() + + async def log_task_start(self, task_id): + """记录任务开始""" + from datetime import datetime + try: + await self.db.save("t_crawl_task_log", { + "task_id": task_id, + "operator": self.operator, + "start_time": datetime.now(), + "status": "running" + }, "task_id") + except Exception as e: + logger.error(f"记录任务开始日志失败: {e}") + + async def log_task_end(self, task_id, count, status, error_msg=None): + """记录任务结束""" + from datetime import datetime + try: + end_time = datetime.now() + # 假设 start_time 已经在数据库中,我们可以通过 task_id 更新 + # 这里的 duration 计算可以在 SQL 中做,或者先查出来再算 + await self.db.update("t_crawl_task_log", { + "task_id": task_id, + "end_time": end_time, + "station_count": count, + "status": status, + "error_msg": error_msg + }, "task_id") + except Exception as e: + logger.error(f"更新任务结束日志失败: {e}") + + async def process_price_detail_data(self, station_name, hourly_schedule) -> bool: + """ + 直接保存已处理好的小时段价格数据 + """ + if not station_name or not hourly_schedule: + return False + + station_hash = self.get_hash(station_name) + now = datetime.now() + + async with await self.db.get_session() as session: + # 1. 保存 Profile (如果不存在) + profile_id = self.generate_id() + await self.station_profile_model.save( + session=session, + id=profile_id, + station_hash=station_hash, + operator=self.operator, + station_name=station_name, + valid_start_time=now + ) + + # 2. 保存价格 + schedule_id = self.generate_id() + await self.station_price_schedule_model.save( + session=session, + id=schedule_id, + station_hash=station_hash, + schedule_json=hourly_schedule, + valid_start_time=now + ) + await session.commit() + return True + + async def process_station_list_vl(self, image_path, json_metadata, device_info=None, max_count=None) -> list: + """ + 基于 VL 模式处理场站列表 (整页识别) + """ + # 驿来特目前逻辑待实现,先占位 + return [] diff --git a/T3_YiLaiTe.py b/T3_YiLaiTe.py new file mode 100644 index 0000000..35060bd --- /dev/null +++ b/T3_YiLaiTe.py @@ -0,0 +1,71 @@ +# coding=utf-8 +import sys +import os +import asyncio +import logging + +# 将项目根目录添加到 sys.path +project_root = os.path.dirname(os.path.abspath(__file__)) +if project_root not in sys.path: + sys.path.append(project_root) + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("T3_YiLaiTe") + +try: + from Apps.YiLaiTe.Service import YiLaiTeService + from Apps.YiLaiTe import Opener, Crawler, Kit +except KeyboardInterrupt: + logger.info("\n🛑 用户在初始化阶段手动停止了程序。") + sys.exit(0) +except Exception as e: + logger.error(f"❌ 初始化导入失败: {e}") + sys.exit(1) + +async def run_process(): + logger.info("=== 开始全流程任务 (驿来特): 打开小程序 -> 爬取数据 ===") + + # 步骤 0: 初始化基础服务 + logger.info(">>> 步骤 0: 初始化基础服务 (数据库连接)...") + service = YiLaiTeService() + await service.init_db() + + try: + # 启动前清空临时目录 + Kit.clear_temp_dir() + + # 步骤 1: 启动小程序 + logger.info(">>> 步骤 1: 启动 驿来特 小程序...") + success = await Opener.open_mini_program() + if not success: + logger.error("❌ 无法成功打开小程序,任务终止。") + return + + logger.info("✅ 小程序启动成功,等待 5 秒确保界面稳定...") + await asyncio.sleep(5) + + # 步骤 2: 执行爬取任务 + logger.info(">>> 步骤 2: 开始执行场站爬取任务...") + # 调用 Crawler 的 main,并传入 service + await Crawler.main(service=service) + + logger.info("✅ 爬取任务完成!") + + except Exception as e: + logger.error(f"❌ 运行异常: {e}") + finally: + if service: + await service.close_db() + logger.info("=== 全流程任务结束 ===") + +if __name__ == "__main__": + try: + asyncio.run(run_process()) + except KeyboardInterrupt: + logger.info("\n🛑 用户手动停止了程序 (Ctrl+C)。") + except Exception as e: + logger.exception(f"主程序崩溃: {e}") diff --git a/Util/VLMKit.py b/Util/VLMKit.py new file mode 100644 index 0000000..dfaa861 --- /dev/null +++ b/Util/VLMKit.py @@ -0,0 +1,66 @@ +# coding=utf-8 +import os +import base64 +import json +import logging +from openai import AsyncOpenAI +from Config.Config import ALY_LLM_API_KEY, VL_MODEL_NAME + +logger = logging.getLogger(__name__) + +class VLMKit: + def __init__(self, api_key=None, base_url=None, model=None): + self.api_key = api_key or ALY_LLM_API_KEY + self.base_url = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1" + self.model = model or VL_MODEL_NAME + self.client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url) + + async def analyze_image(self, image_path, prompt, max_tokens=2000, temperature=0.01): + """ + 分析单张本地图片并返回模型响应文本 + """ + if not os.path.exists(image_path): + raise FileNotFoundError(f"Image not found: {image_path}") + + with open(image_path, "rb") as image_file: + encoded_image = base64.b64encode(image_file.read()).decode("utf-8") + + try: + response = await self.client.chat.completions.create( + model=self.model, + messages=[ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}, + }, + ], + } + ], + max_tokens=max_tokens, + temperature=temperature + ) + return response.choices[0].message.content + except Exception as e: + logger.error(f"VLMKit analyze_image error: {e}") + raise e + + def extract_json(self, text): + """ + 从文本中提取 JSON 部分 + """ + import re + # 优先匹配 markdown 格式的 json 块 + json_match = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL) + if json_match: + return json_match.group(1) + + # 其次匹配数组或对象 + json_match = re.search(r'(\[.*\]|\{.*\})', text, re.DOTALL) + if json_match: + return json_match.group(1) + + return text