This commit is contained in:
HuangHai
2026-01-13 20:36:20 +08:00
parent 9c03442035
commit b3a2bd4816
9 changed files with 893 additions and 41 deletions

View File

@@ -232,50 +232,50 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
if before_click_md5 != after_click_md5:
entered_price_page = True
# 抓取三级页面图片
price_screenshots = []
price_screenshots.append(after_click_path)
# 滑动处理
last_price_md5 = after_click_md5
for scroll_idx in range(2):
d.swipe(w // 2, int(h * 0.7), w // 2, int(h * 0.3), duration=0.4)
await asyncio.sleep(1)
scroll_path = take_screenshot(d, f"price_scroll_{scroll_idx}_{station_name}", save_dir=TEMP_IMAGE_DIR)
curr_price_md5 = Kit.get_image_content_md5(
scroll_path,
top_ratio=SAFE_EXCLUDE_RATIO,
price_screenshots = []
price_screenshots.append(after_click_path)
# 滑动处理
last_price_md5 = after_click_md5
for scroll_idx in range(2):
d.swipe(w // 2, int(h * 0.7), w // 2, int(h * 0.3), duration=0.4)
await asyncio.sleep(1)
scroll_path = take_screenshot(d, f"price_scroll_{scroll_idx}_{station_name}", save_dir=TEMP_IMAGE_DIR)
curr_price_md5 = Kit.get_image_content_md5(
scroll_path,
top_ratio=SAFE_EXCLUDE_RATIO,
bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
)
if curr_price_md5 == last_price_md5: break
price_screenshots.append(scroll_path)
last_price_md5 = curr_price_md5
# 后台处理价格图片
logger.info(f"已启动后台分析价格页: {station_name},共 {len(price_screenshots)} 张图")
task_price = asyncio.create_task(analyze_prices_background(service, station_name, price_screenshots))
background_tasks.append(task_price)
# 立即返回
d.press("back")
await asyncio.sleep(0.5)
# 检查返回后的页面状态
check_back_path = take_screenshot(d, f"check_back_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
check_list_md5 = Kit.get_image_content_md5(
check_back_path,
top_ratio=SAFE_EXCLUDE_RATIO,
bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
)
if curr_price_md5 == last_price_md5: break
price_screenshots.append(scroll_path)
last_price_md5 = curr_price_md5
check_detail_md5 = Kit.get_image_content_md5(check_back_path)
if os.path.exists(check_back_path): os.remove(check_back_path)
# 后台处理价格图片
logger.info(f"已启动后台分析价格页: {station_name},共 {len(price_screenshots)} 张图")
task_price = asyncio.create_task(analyze_prices_background(service, station_name, price_screenshots))
background_tasks.append(task_price)
# 立即返回
d.press("back")
await asyncio.sleep(0.5)
# 检查返回后的页面状态
check_back_path = take_screenshot(d, f"check_back_{detail_uuid}", save_dir=TEMP_IMAGE_DIR)
check_list_md5 = Kit.get_image_content_md5(
check_back_path,
top_ratio=SAFE_EXCLUDE_RATIO,
bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO
)
check_detail_md5 = Kit.get_image_content_md5(check_back_path)
if os.path.exists(check_back_path): os.remove(check_back_path)
if check_list_md5 == current_md5:
logger.info("检测到已直接返回列表页,跳过后续返回操作。")
should_back_to_list = False
elif check_detail_md5 == before_click_md5:
logger.info("检测到返回详情页,需执行第二次返回。")
if check_list_md5 == current_md5:
logger.info("检测到已直接返回列表页,跳过后续返回操作。")
should_back_to_list = False
elif check_detail_md5 == before_click_md5:
logger.info("检测到返回详情页,需执行第二次返回。")
except Exception as e:
logger.error(f"处理三级页面详情异常 ({station_name}): {e}")

View File

@@ -0,0 +1,38 @@
# 采集配置
# 滑动距离比例 (0.1 ~ 0.9),数值越大滑动幅度越大
SCROLL_DISTANCE_RATIO = 0.5
# 最大采集场站数量,达到此次数后停止采集
MAX_STATIONS_COUNT = 100
# 场站去重过期时间(秒),在此时间内重复出现的场站不会再次点击进入详情页
REDIS_STATION_EXPIRE = 120
# 数据库数据保留时长超过此时长的历史数据is_current=0将被删除
DATA_RETENTION_DAYS = 365
# 测试配置
# [Testing] 是否在启动时清除 Redis 中的已爬取记录 (用于快速重复测试)
TEST_CLEAR_REDIS = False
# 调试绘图配置
# [Debug] 是否在图片上绘制识别出的绿色矩形框 (计算机图形学可视化)
DRAW_DEBUG_BOXES = True
DEBUG_BOX_COLOR = (0, 255, 0) # BGR 格式的绿色
DEBUG_BOX_THICKNESS = 3 # 线条粗细
# 等待时间配置 (秒)
# 点击进入详情页后等待加载的时间
WAIT_DETAIL_PAGE_LOAD = 2.0
# 从详情页返回列表页后等待页面刷新的时间
WAIT_BACK_TO_LIST = 0.8
# 执行滑动操作后等待页面内容加载和稳定的时间
WAIT_AFTER_SCROLL = 2.0
# 坐标计算与安全防护
# 屏幕顶部安全排除比例 (0.0~1.0),此比例区域内不进行点击(避开状态栏、顶部菜单、横幅广告等)
SAFE_EXCLUDE_RATIO = 0.15
# 屏幕底部安全排除比例 (0.0~1.0),此比例区域内不进行点击(避开底部导航栏、功能按钮等)
BOTTOM_SAFE_EXCLUDE_RATIO = 0.10
# 默认回退屏幕宽度,当无法自动获取设备信息时使用
FALLBACK_WIDTH = 1080
# 默认回退屏幕高度,当无法自动获取设备信息时使用
FALLBACK_HEIGHT = 2400

190
Apps/YiLaiTe/Crawler.py Normal file
View File

@@ -0,0 +1,190 @@
import asyncio
import logging
import uuid
import os
import sys
import json
import time
from datetime import datetime
from Apps.YiLaiTe.Kit import take_screenshot, clean_station_name, get_image_content_md5
from Apps.YiLaiTe.ReadImageKit import ReadImageKit
from Apps.YiLaiTe.Service import YiLaiTeService
from Apps.YiLaiTe.Config.Setting import (
SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL, MAX_STATIONS_COUNT,
WAIT_DETAIL_PAGE_LOAD, WAIT_BACK_TO_LIST, TEST_CLEAR_REDIS,
SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO
)
from Util.RedisKit import RedisKit
from Core.BaseCrawler import BaseCrawler
import uiautomator2 as u2
# 项目根目录处理
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("YiLaiTeCrawler")
class YiLaiTeCrawler(BaseCrawler):
def __init__(self, service=None):
super().__init__(service or YiLaiTeService())
self.read_image_kit = ReadImageKit()
self.redis_kit = RedisKit()
async def start(self):
"""
启动入口
"""
await main(self.service)
async def clean_redis_data(self):
"""
清除测试用的 Redis 记录
"""
if TEST_CLEAR_REDIS:
logger.info("清理 Redis 中的场站处理记录...")
pattern = "crawled:ylt:*"
keys = await self.redis_kit.client.keys(pattern)
if keys:
await self.redis_kit.client.delete(*keys)
async def crawl_list_logic(self, d):
processed_count = 0
no_new_data_count = 0
last_md5 = None
background_tasks = []
while processed_count < MAX_STATIONS_COUNT:
# 1. 截图并分析
screenshot_path = take_screenshot(d, f"list_{int(time.time())}.jpg")
# 检测是否滚动到底部
curr_md5 = get_image_content_md5(screenshot_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
if last_md5 == curr_md5:
logger.info("内容无变化,判定已到底部")
if os.path.exists(screenshot_path): os.remove(screenshot_path)
break
last_md5 = curr_md5
stations = await self.read_image_kit.analyze_station_list(screenshot_path)
if not stations:
no_new_data_count += 1
if no_new_data_count >= 5:
logger.info("连续 5 页无新数据,停止")
break
# 滑动
d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
await asyncio.sleep(WAIT_AFTER_SCROLL)
continue
no_new_data_count = 0
new_stations_in_page = 0
for station in stations:
if processed_count >= MAX_STATIONS_COUNT:
break
name = station.get('name')
point = station.get('point')
if not name or not point:
continue
# 去重
redis_key = f"crawled:ylt:{clean_station_name(name)}"
if await self.redis_kit.get_data(redis_key):
continue
# 点击进入
logger.info(f">>> 发现新场站: {name} 坐标: {point}")
d.click(point[0], point[1])
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
# 分析详情页 (采用异步后台模式)
detail_shot = take_screenshot(d, f"detail_{clean_station_name(name)}_{int(time.time())}.jpg")
# 启动后台任务处理详情页
task = asyncio.create_task(self.analyze_detail_background(name, detail_shot))
background_tasks.append(task)
processed_count += 1
new_stations_in_page += 1
await self.redis_kit.set_data(redis_key, "1", ex=86400*7)
# 返回
d.press("back")
await asyncio.sleep(WAIT_BACK_TO_LIST)
if new_stations_in_page == 0 and stations:
no_new_data_count += 1
if no_new_data_count >= 5:
logger.info("连续 5 页均无新场站,停止")
break
else:
no_new_data_count = 0
# 滑动翻页
d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
await asyncio.sleep(WAIT_AFTER_SCROLL)
# 等待后台任务完成
if background_tasks:
logger.info(f"等待 {len(background_tasks)} 个后台分析任务完成...")
await asyncio.gather(*background_tasks, return_exceptions=True)
return processed_count
async def analyze_detail_background(self, station_name, image_path):
"""
后台异步分析详情页
"""
try:
logger.info(f"开始后台分析场站: {station_name}")
prices = await self.read_image_kit.analyze_detail_price(image_path)
if prices:
await self.service.process_price_detail_data(station_name, prices)
logger.info(f"场站 {station_name} 价格分析完成并入库")
else:
logger.warning(f"场站 {station_name} 未识别到价格信息")
except Exception as e:
logger.error(f"后台分析 {station_name} 失败: {e}")
finally:
if os.path.exists(image_path):
os.remove(image_path)
async def main(service=None):
if service is None:
service = YiLaiTeService()
await service.init_db()
crawler = YiLaiTeCrawler(service)
d = u2.connect()
# 清理 Redis
await crawler.clean_redis_data()
# 记录任务日志
task_id = str(uuid.uuid4())
await service.log_task_start(task_id)
total_count = 0
status = "success"
error_msg = None
try:
total_count = await crawler.crawl_list_logic(d)
except Exception as e:
logger.error(f"主流程执行失败: {e}")
status = "failed"
error_msg = str(e)
finally:
await service.log_task_end(task_id, total_count, status, error_msg)
# 如果是内部初始化的 service则关闭
if service and not isinstance(service, YiLaiTeService):
await service.close_db()
async def get_image_md5_async(path):
return get_image_md5(path)
if __name__ == "__main__":
asyncio.run(main())

156
Apps/YiLaiTe/Kit.py Normal file
View File

@@ -0,0 +1,156 @@
# coding=utf-8
import os
import time
import logging
logger = logging.getLogger(__name__)
def clean_station_name(name):
"""
清理场站名称,去除特殊字符和距离信息
"""
if not name:
return ""
# 驿来特可能特有的清理逻辑
import re
# 移除常见的括号备注
name = re.sub(r'\(.*?\)', '', name)
name = re.sub(r'.*?', '', name)
return name.strip()
def take_screenshot(d, filename, path=None):
"""
获取屏幕截图并保存
"""
if not path:
from Config.Config import TEMP_IMAGE_DIR
path = TEMP_IMAGE_DIR
if not os.path.exists(path):
os.makedirs(path)
full_path = os.path.join(path, filename)
d.screenshot(full_path)
return full_path
import hashlib
import numpy as np
import cv2
def get_file_md5(file_path):
"""计算文件的 MD5 值"""
if not os.path.exists(file_path):
return None
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def get_image_content_md5(file_path, top_ratio=0.1, bottom_ratio=0.1):
"""
计算图片核心内容的 MD5 值(排除状态栏和导航栏)
"""
img = read_image(file_path)
if img is None:
return None
h, w = img.shape[:2]
top = int(h * top_ratio)
bottom = int(h * (1 - bottom_ratio))
# 裁剪中间部分
content = img[top:bottom, :]
# 将图片数据转换为字节流计算 MD5
success, encoded_img = cv2.imencode(".jpg", content)
if success:
return hashlib.md5(encoded_img.tobytes()).hexdigest()
return hashlib.md5(content.tobytes()).hexdigest()
def read_image(path):
"""读取图片,支持中文路径"""
if not path or not os.path.exists(path):
return None
try:
data = np.fromfile(path, dtype=np.uint8)
if data.size == 0:
return None
img = cv2.imdecode(data, -1)
return img
except Exception as e:
logger.error(f"Error reading image {path}: {e}")
return None
def save_image(path, img):
"""保存图片,支持中文路径"""
try:
ext = os.path.splitext(path)[1]
if not ext:
ext = ".jpg"
cv2.imencode(ext, img)[1].tofile(path)
return True
except Exception as e:
logger.error(f"Error saving image {path}: {e}")
return False
def clear_temp_dir(save_dir=None):
"""清空临时目录中的所有文件"""
if save_dir is None:
from Config.Config import TEMP_IMAGE_DIR
save_dir = TEMP_IMAGE_DIR
if not os.path.exists(save_dir):
return
for f in os.listdir(save_dir):
file_path = os.path.join(save_dir, f)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
logger.error(f"Error deleting file {file_path}: {e}")
def get_image_md5(image_path):
"""
计算图片的 MD5 值
"""
import hashlib
if not os.path.exists(image_path):
return ""
with open(image_path, 'rb') as f:
data = f.read()
return hashlib.md5(data).hexdigest()
def draw_rectangles(image_path, points, output_path=None):
"""
使用 OpenCV 在图片上绘制矩形框 (计算机图形学绘制)
:param image_path: 原始图片路径
:param points: 坐标点列表,格式如 [[x1, y1, x2, y2], ...] 或 [(x, y), ...]
:param output_path: 输出图片路径,如果不指定则覆盖原图
"""
try:
import cv2
import numpy as np
from Config.Setting import DRAW_DEBUG_BOXES, DEBUG_BOX_COLOR, DEBUG_BOX_THICKNESS
if not DRAW_DEBUG_BOXES:
return image_path
img = cv2.imread(image_path)
if img is None:
return image_path
for p in points:
if len(p) == 4: # 矩形 [x1, y1, x2, y2]
cv2.rectangle(img, (int(p[0]), int(p[1])), (int(p[2]), int(p[3])), DEBUG_BOX_COLOR, DEBUG_BOX_THICKNESS)
elif len(p) == 2: # 点 (x, y),画一个小圆圈或正方形表示点击位
center = (int(p[0]), int(p[1]))
cv2.circle(img, center, 10, DEBUG_BOX_COLOR, -1)
save_path = output_path if output_path else image_path
cv2.imwrite(save_path, img)
return save_path
except Exception as e:
logger.error(f"绘制矩形框失败: {e}")
return image_path

85
Apps/YiLaiTe/Opener.py Normal file
View File

@@ -0,0 +1,85 @@
# coding=utf-8
import asyncio
import logging
import os
import time
import uiautomator2 as u2
import uuid
from Apps.YiLaiTe.Kit import take_screenshot
from Apps.YiLaiTe.ReadImageKit import ReadImageKit
from Config.Config import TEMP_IMAGE_DIR
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("OpenYiLaiTe")
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
async def check_and_close_ad(d):
"""
检测并关闭广告弹窗
"""
max_loops = 2
w, h = d.window_size()
device_info = {'displayWidth': w, 'displayHeight': h}
for loop_idx in range(max_loops):
logger.info(f"开始第 {loop_idx + 1} 轮广告检测...")
image_uuid = f"ylt_ad_check_{int(time.time())}"
screenshot_path = take_screenshot(d, image_uuid, save_dir=TEMP_IMAGE_DIR)
ad_result = await ReadImageKit.detect_ad_popup(screenshot_path, device_info=device_info)
if ad_result:
x, y = ad_result['x'], ad_result['y']
ad_type = ad_result.get("ad_type", "unknown")
logger.info(f"检测到广告 [{ad_type}],坐标: ({x}, {y}),执行关闭操作...")
d.click(x, y)
await asyncio.sleep(2.0)
if os.path.exists(screenshot_path): os.remove(screenshot_path)
else:
logger.info("未检测到广告弹窗。")
if os.path.exists(screenshot_path): os.remove(screenshot_path)
break
return True
async def open_mini_program():
"""
进入微信小程序: 驿来特
"""
d = u2.connect()
logger.info("执行进入小程序: 驿来特")
# 1. 启动微信
logger.info("启动微信...")
d.app_start("com.tencent.mm", stop=True)
await asyncio.sleep(5)
# 2. 点击搜索按钮 (坐标适配大部分微信版本)
logger.info("点击搜索按钮...")
w, h = d.window_size()
d.click(int(w * 0.84), int(h * 0.08))
await asyncio.sleep(2)
# 3. 输入搜索内容
logger.info("输入搜索内容: 驿来特")
d.send_keys("驿来特")
await asyncio.sleep(3)
# 4. 点击搜索结果中的第一个小程序
logger.info("点击搜索结果中的小程序...")
d.click(int(w * 0.5), int(h * 0.18))
logger.info("等待小程序加载...")
await asyncio.sleep(10)
# 5. 广告检测
await check_and_close_ad(d)
return True
if __name__ == "__main__":
asyncio.run(open_mini_program())

View File

@@ -0,0 +1,114 @@
# coding=utf-8
import logging
import os
import sys
# Ensure sys path includes root for imports if not already
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from Util.VLMKit import VLMKit
import json
import re
from Apps.YiLaiTe.Kit import draw_rectangles
from Apps.YiLaiTe.Config.Setting import DRAW_DEBUG_BOXES
logger = logging.getLogger(__name__)
class ReadImageKit:
def __init__(self):
self.vlm = VLMKit()
@classmethod
async def detect_ad_popup(cls, image_path, device_info=None):
"""
检测图片中是否存在广告弹窗,并返回关闭按钮坐标
"""
vlm = VLMKit()
prompt = """
请仔细检查这张图片中是否存在**弹窗广告**或**悬浮广告**。
广告可能有以下几种形式:
1. 屏幕中央的大型弹窗广告:通常遮挡了页面内容,内容多为优惠券、活动推广等。
2. 悬浮广告:通常在侧边或角落。
3. 底部横幅广告。
请返回关闭按钮的中心坐标。
请以纯 JSON 格式输出:
{
"has_ad": true/false,
"ad_type": "center" | "bottom" | "side" | "other",
"close_point": [x, y] // 绝对像素坐标
}
如果没有广告,请返回 {"has_ad": false}。
"""
try:
res_text = await vlm.analyze_image(image_path, prompt)
json_str = vlm.extract_json(res_text)
res = json.loads(json_str)
if res.get("has_ad") and res.get("close_point"):
p = res["close_point"]
return {"x": p[0], "y": p[1], "ad_type": res.get("ad_type")}
return None
except Exception as e:
logger.error(f"广告检测失败: {e}")
return None
async def analyze_station_list(self, image_path):
"""
分析场站列表页图片,提取场站位置和基本信息
"""
prompt = """
分析这张充电站列表截图,提取所有充电站卡片信息。
输出格式为 JSON 数组,每个对象包含:
- "name": 场站名称
- "point": 场站卡片的中心点击坐标 [x, y]
- "bbox": 场站卡片的边界框 [x1, y1, x2, y2]
注意:
1. 仅提取明显的场站列表卡片。
2. 坐标请以像素为单位。
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
# 使用 VLMKit 的提取方法
json_str = self.vlm.extract_json(res_text)
stations = json.loads(json_str)
if isinstance(stations, list):
# 调试绘图
if DRAW_DEBUG_BOXES:
bboxes = [s['bbox'] for s in stations if 'bbox' in s]
points = [s['point'] for s in stations if 'point' in s]
draw_rectangles(image_path, bboxes + points)
return stations
return []
except Exception as e:
logger.error(f"VLM 分析列表页失败: {e}")
return []
async def analyze_detail_price(self, image_path):
"""
分析详情页或三级价格页图片,提取分时电价
"""
prompt = """
分析这张充电站价格详情截图,提取完整的分时价格信息。
输出格式为 JSON 数组,每个对象包含:
- "time_range": 时间段 (例如 "00:00-08:00")
- "total_price": 总价 (电费+服务费)
- "elec_price": 电费 (如果能看到)
- "service_price": 服务费 (如果能看到)
如果没有看到分时段价格,请尝试寻找“价格详情”或“分时电价”按钮的坐标。
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
json_str = self.vlm.extract_json(res_text)
return json.loads(json_str)
except Exception as e:
logger.error(f"VLM 分析价格页失败: {e}")
return []

132
Apps/YiLaiTe/Service.py Normal file
View File

@@ -0,0 +1,132 @@
# coding=utf-8
import hashlib
import logging
import os
import sys
import uuid
from datetime import datetime
# Ensure sys path includes root for imports if not already
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from DbKit.Db import Db
from Config.Config import DB_URL
from Model.StationProfile import StationProfile
from Model.StationStatus import StationStatus
from Model.StationPriceSchedule import StationPriceSchedule
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class YiLaiTeService:
def __init__(self):
self.db = Db(db_url=DB_URL)
self.station_profile_model = StationProfile()
self.station_status_model = StationStatus()
self.station_price_schedule_model = StationPriceSchedule()
self.operator = "驿来特"
def generate_id(self):
return str(uuid.uuid4())
def get_hash(self, s: str) -> str:
return hashlib.md5(s.encode('utf-8')).hexdigest()
def _to_float(self, value) -> float:
"""
从字符串中提取浮点数,过滤掉非数字字符
"""
if value is None:
return 0.0
if isinstance(value, (int, float)):
return float(value)
import re
try:
# 提取第一个匹配的数字部分(包含小数点)
match = re.search(r"[-+]?\d*\.?\d+", str(value))
if match:
return float(match.group())
return 0.0
except:
return 0.0
async def init_db(self):
await self.db.init_db()
async def close_db(self):
await self.db.close()
async def log_task_start(self, task_id):
"""记录任务开始"""
from datetime import datetime
try:
await self.db.save("t_crawl_task_log", {
"task_id": task_id,
"operator": self.operator,
"start_time": datetime.now(),
"status": "running"
}, "task_id")
except Exception as e:
logger.error(f"记录任务开始日志失败: {e}")
async def log_task_end(self, task_id, count, status, error_msg=None):
"""记录任务结束"""
from datetime import datetime
try:
end_time = datetime.now()
# 假设 start_time 已经在数据库中,我们可以通过 task_id 更新
# 这里的 duration 计算可以在 SQL 中做,或者先查出来再算
await self.db.update("t_crawl_task_log", {
"task_id": task_id,
"end_time": end_time,
"station_count": count,
"status": status,
"error_msg": error_msg
}, "task_id")
except Exception as e:
logger.error(f"更新任务结束日志失败: {e}")
async def process_price_detail_data(self, station_name, hourly_schedule) -> bool:
"""
直接保存已处理好的小时段价格数据
"""
if not station_name or not hourly_schedule:
return False
station_hash = self.get_hash(station_name)
now = datetime.now()
async with await self.db.get_session() as session:
# 1. 保存 Profile (如果不存在)
profile_id = self.generate_id()
await self.station_profile_model.save(
session=session,
id=profile_id,
station_hash=station_hash,
operator=self.operator,
station_name=station_name,
valid_start_time=now
)
# 2. 保存价格
schedule_id = self.generate_id()
await self.station_price_schedule_model.save(
session=session,
id=schedule_id,
station_hash=station_hash,
schedule_json=hourly_schedule,
valid_start_time=now
)
await session.commit()
return True
async def process_station_list_vl(self, image_path, json_metadata, device_info=None, max_count=None) -> list:
"""
基于 VL 模式处理场站列表 (整页识别)
"""
# 驿来特目前逻辑待实现,先占位
return []

71
T3_YiLaiTe.py Normal file
View File

@@ -0,0 +1,71 @@
# coding=utf-8
import sys
import os
import asyncio
import logging
# 将项目根目录添加到 sys.path
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.append(project_root)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("T3_YiLaiTe")
try:
from Apps.YiLaiTe.Service import YiLaiTeService
from Apps.YiLaiTe import Opener, Crawler, Kit
except KeyboardInterrupt:
logger.info("\n🛑 用户在初始化阶段手动停止了程序。")
sys.exit(0)
except Exception as e:
logger.error(f"❌ 初始化导入失败: {e}")
sys.exit(1)
async def run_process():
logger.info("=== 开始全流程任务 (驿来特): 打开小程序 -> 爬取数据 ===")
# 步骤 0: 初始化基础服务
logger.info(">>> 步骤 0: 初始化基础服务 (数据库连接)...")
service = YiLaiTeService()
await service.init_db()
try:
# 启动前清空临时目录
Kit.clear_temp_dir()
# 步骤 1: 启动小程序
logger.info(">>> 步骤 1: 启动 驿来特 小程序...")
success = await Opener.open_mini_program()
if not success:
logger.error("❌ 无法成功打开小程序,任务终止。")
return
logger.info("✅ 小程序启动成功,等待 5 秒确保界面稳定...")
await asyncio.sleep(5)
# 步骤 2: 执行爬取任务
logger.info(">>> 步骤 2: 开始执行场站爬取任务...")
# 调用 Crawler 的 main并传入 service
await Crawler.main(service=service)
logger.info("✅ 爬取任务完成!")
except Exception as e:
logger.error(f"❌ 运行异常: {e}")
finally:
if service:
await service.close_db()
logger.info("=== 全流程任务结束 ===")
if __name__ == "__main__":
try:
asyncio.run(run_process())
except KeyboardInterrupt:
logger.info("\n🛑 用户手动停止了程序 (Ctrl+C)。")
except Exception as e:
logger.exception(f"主程序崩溃: {e}")

66
Util/VLMKit.py Normal file
View File

@@ -0,0 +1,66 @@
# coding=utf-8
import os
import base64
import json
import logging
from openai import AsyncOpenAI
from Config.Config import ALY_LLM_API_KEY, VL_MODEL_NAME
logger = logging.getLogger(__name__)
class VLMKit:
def __init__(self, api_key=None, base_url=None, model=None):
self.api_key = api_key or ALY_LLM_API_KEY
self.base_url = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
self.model = model or VL_MODEL_NAME
self.client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)
async def analyze_image(self, image_path, prompt, max_tokens=2000, temperature=0.01):
"""
分析单张本地图片并返回模型响应文本
"""
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image not found: {image_path}")
with open(image_path, "rb") as image_file:
encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"},
},
],
}
],
max_tokens=max_tokens,
temperature=temperature
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"VLMKit analyze_image error: {e}")
raise e
def extract_json(self, text):
"""
从文本中提取 JSON 部分
"""
import re
# 优先匹配 markdown 格式的 json 块
json_match = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL)
if json_match:
return json_match.group(1)
# 其次匹配数组或对象
json_match = re.search(r'(\[.*\]|\{.*\})', text, re.DOTALL)
if json_match:
return json_match.group(1)
return text