This commit is contained in:
HuangHai
2026-01-14 07:19:39 +08:00
parent a3347b9340
commit 4cc6bc06d1
15 changed files with 689 additions and 0 deletions

View File

@@ -0,0 +1,19 @@
# coding=utf-8
# 采集配置
SCROLL_DISTANCE_RATIO = 0.5
MAX_STATIONS_COUNT = 100
# 调试绘图配置
DRAW_DEBUG_BOXES = True
DEBUG_BOX_COLOR = (0, 255, 0)
DEBUG_BOX_THICKNESS = 3
# 等待时间配置 (秒)
WAIT_DETAIL_PAGE_LOAD = 2.0
WAIT_BACK_TO_LIST = 1.0
WAIT_AFTER_SCROLL = 2.5
# 坐标计算与安全防护
SAFE_EXCLUDE_RATIO = 0.15
BOTTOM_SAFE_EXCLUDE_RATIO = 0.10

View File

@@ -0,0 +1 @@
# coding=utf-8

127
Apps/TelaiDian/Crawler.py Normal file
View File

@@ -0,0 +1,127 @@
# coding=utf-8
import asyncio
import logging
import os
import sys
import time
from Apps.TelaiDian.Kit import take_screenshot, get_image_content_md5, clean_station_name
from Apps.TelaiDian.ReadImageKit import ReadImageKit
from Apps.TelaiDian.Service import TelaiDianService
from Apps.TelaiDian.Config.Setting import (
SCROLL_DISTANCE_RATIO, WAIT_AFTER_SCROLL, MAX_STATIONS_COUNT,
SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO, WAIT_DETAIL_PAGE_LOAD,
WAIT_BACK_TO_LIST
)
from Core.BaseCrawler import BaseCrawler
import uiautomator2 as u2
# 项目根目录处理
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("TelaiDianCrawler")
class TelaiDianCrawler(BaseCrawler):
def __init__(self, service=None):
super().__init__(service or TelaiDianService())
self.read_image_kit = ReadImageKit()
async def start(self):
"""
实现 BaseCrawler 的启动入口
"""
success = await self.open_app()
if not success:
logger.error("打开特来电小程序失败。")
return
d = u2.connect()
await self.crawl_list_logic(d)
async def open_app(self):
"""
打开特来电小程序
"""
from Apps.TelaiDian import Opener
return await Opener.open_mini_program()
async def crawl_list_logic(self, d):
processed_count = 0
last_md5 = None
while processed_count < MAX_STATIONS_COUNT:
# 1. 截图并分析
screenshot_path = take_screenshot(d, f"tld_list_{int(time.time())}.jpg")
# 检测是否滚动到底部
curr_md5 = get_image_content_md5(screenshot_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
if last_md5 == curr_md5:
logger.info("内容无变化,判定已到底部")
if os.path.exists(screenshot_path): os.remove(screenshot_path)
break
last_md5 = curr_md5
stations = await self.read_image_kit.analyze_station_list(screenshot_path)
if not stations:
logger.info("本页未检测到场站,尝试滑动...")
d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
await asyncio.sleep(WAIT_AFTER_SCROLL)
continue
for station in stations:
if processed_count >= MAX_STATIONS_COUNT:
break
name = station.get("name")
point = station.get("point")
if not name or not point:
continue
logger.info(f"处理场站: {name}")
# 点击进入详情
d.click(point[0], point[1])
await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD)
# 爬取详情
await self.crawl_detail_logic(d, station)
# 返回列表
d.press("back")
await asyncio.sleep(WAIT_BACK_TO_LIST)
processed_count += 1
# 滑动到下一页
d.swipe_ext("up", scale=SCROLL_DISTANCE_RATIO)
await asyncio.sleep(WAIT_AFTER_SCROLL)
if os.path.exists(screenshot_path): os.remove(screenshot_path)
async def crawl_detail_logic(self, d, station_info):
"""
在详情页提取价格和状态信息
"""
screenshot_path = take_screenshot(d, f"tld_detail_{int(time.time())}.jpg")
# 1. 提取价格
prices = await self.read_image_kit.analyze_detail_price(screenshot_path)
# 2. 保存数据
if prices:
station_name = clean_station_name(station_info.get("name"))
address = station_info.get("address")
logger.info(f"场站 {station_name} 提取到 {len(prices)} 条价格信息,准备保存...")
await self.service.save_station_data(station_name, address, prices)
if os.path.exists(screenshot_path): os.remove(screenshot_path)
async def main(service=None):
crawler = TelaiDianCrawler(service=service)
await crawler.start()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1 @@
供应商名称:特来电

Binary file not shown.

After

Width:  |  Height:  |  Size: 132 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 144 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 103 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 82 KiB

194
Apps/TelaiDian/Kit.py Normal file
View File

@@ -0,0 +1,194 @@
# coding=utf-8
import os
import logging
import hashlib
import numpy as np
import cv2
logger = logging.getLogger(__name__)
def clean_station_name(name):
"""
清理场站名称,去除特殊字符和距离信息
"""
if not name:
return ""
import re
# 移除常见的括号备注
name = re.sub(r'\(.*?\)', '', name)
name = re.sub(r'.*?', '', name)
return name.strip()
def take_screenshot(d, filename, save_dir=None):
"""
获取屏幕截图并保存
"""
if not save_dir:
from Config.Config import TEMP_IMAGE_DIR
save_dir = TEMP_IMAGE_DIR
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# 确保文件名有后缀
if not filename.endswith(".jpg") and not filename.endswith(".png"):
filename = f"{filename}.jpg"
full_path = os.path.join(save_dir, filename)
d.screenshot(full_path)
return full_path
def get_file_md5(file_path):
"""计算文件的 MD5 值"""
if not os.path.exists(file_path):
return None
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def get_image_content_md5(file_path, top_ratio=0.1, bottom_ratio=0.1):
"""
计算图片核心内容的 MD5 值(排除状态栏和导航栏)
"""
img = read_image(file_path)
if img is None:
return None
h, w = img.shape[:2]
top = int(h * top_ratio)
bottom = int(h * (1 - bottom_ratio))
# 裁剪中间部分
content = img[top:bottom, :]
# 直接使用原始像素数组计算 MD5避免压缩损失
return hashlib.md5(content.tobytes()).hexdigest()
def read_image(path):
"""读取图片,支持中文路径"""
if not path or not os.path.exists(path):
return None
try:
data = np.fromfile(path, dtype=np.uint8)
if data.size == 0:
return None
img = cv2.imdecode(data, -1)
return img
except Exception as e:
logger.error(f"Error reading image {path}: {e}")
return None
def save_image(path, img):
"""保存图片,支持中文路径"""
try:
ext = os.path.splitext(path)[1]
if not ext:
ext = ".jpg"
cv2.imencode(ext, img)[1].tofile(path)
return True
except Exception as e:
logger.error(f"Error saving image {path}: {e}")
return False
def detect_cards_cv(image_path, top_ratio=0.15, bottom_ratio=0.1):
"""
使用计算机图形学 (OpenCV) 检测列表中的场站卡片
"""
img = read_image(image_path)
if img is None:
return []
h, w = img.shape[:2]
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blurred, 10, 50)
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cards = []
min_card_width = int(w * 0.8)
min_card_height = 150
top_limit = int(h * top_ratio)
bottom_limit = int(h * (1 - bottom_ratio))
valid_contours = []
for cnt in contours:
x, y, cw, ch = cv2.boundingRect(cnt)
if (cw > min_card_width and
ch > min_card_height and
y > top_limit and
y + ch < bottom_limit):
center_y = y + ch // 2
is_duplicate = False
for vx, vy, vcw, vch in valid_contours:
v_center_y = vy + vch // 2
if abs(center_y - v_center_y) < 50:
is_duplicate = True
break
if not is_duplicate:
valid_contours.append((x, y, cw, ch))
padding = 5
cards.append([
max(0, x + padding),
max(0, y + padding),
max(0, x + cw - padding),
max(0, y + ch - padding)
])
cards.sort(key=lambda c: c[1])
return cards
def draw_rectangles(image_path, points, output_path=None):
"""
使用 OpenCV 在图片上绘制矩形框
"""
try:
from Apps.TelaiDian.Config.Setting import DRAW_DEBUG_BOXES
# 这里暂时写死颜色和粗细,或者从 Setting 读取
DEBUG_BOX_COLOR = (0, 255, 0)
DEBUG_BOX_THICKNESS = 3
# if not DRAW_DEBUG_BOXES: # 暂时强制开启以方便调试
# return image_path
img = read_image(image_path)
if img is None:
return image_path
for p in points:
if len(p) == 4:
cv2.rectangle(img, (int(p[0]), int(p[1])), (int(p[2]), int(p[3])), DEBUG_BOX_COLOR, DEBUG_BOX_THICKNESS)
elif len(p) == 2:
center = (int(p[0]), int(p[1]))
cv2.circle(img, center, 10, DEBUG_BOX_COLOR, -1)
save_path = output_path if output_path else image_path
save_image(save_path, img)
return save_path
except Exception as e:
logger.error(f"绘制矩形框失败: {e}")
return image_path
def clear_temp_dir(save_dir=None):
"""清空临时目录中的所有文件"""
if save_dir is None:
from Config.Config import TEMP_IMAGE_DIR
save_dir = TEMP_IMAGE_DIR
if not os.path.exists(save_dir):
return
for f in os.listdir(save_dir):
file_path = os.path.join(save_dir, f)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
logger.error(f"Error deleting file {file_path}: {e}")

50
Apps/TelaiDian/Opener.py Normal file
View File

@@ -0,0 +1,50 @@
# coding=utf-8
import asyncio
import logging
import os
import time
import uiautomator2 as u2
from Apps.TelaiDian.Kit import take_screenshot
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("OpenTelaiDian")
async def open_mini_program():
"""
进入微信小程序: 特来电
"""
d = u2.connect()
logger.info("执行进入小程序: 特来电")
# 1. 启动微信
logger.info("启动微信...")
d.app_start("com.tencent.mm", stop=True)
await asyncio.sleep(5)
# 2. 点击搜索按钮 (坐标适配大部分微信版本)
logger.info("点击搜索按钮...")
w, h = d.window_size()
d.click(int(w * 0.84), int(h * 0.08))
await asyncio.sleep(2)
# 3. 输入搜索内容
logger.info("输入搜索内容: 特来电")
d.send_keys("特来电")
await asyncio.sleep(3)
# 4. 点击搜索结果中的第一个小程序
logger.info("点击搜索结果中的小程序...")
# 这里的坐标可能需要根据实际情况调整,先参考 YeLiTe 的实现
d.click(int(w * 0.5), int(h * 0.18))
logger.info("等待小程序加载...")
await asyncio.sleep(10)
logger.info("已到达小程序页面,停止执行。")
return True
if __name__ == "__main__":
asyncio.run(open_mini_program())

View File

@@ -0,0 +1,131 @@
# coding=utf-8
import logging
import os
import sys
import json
# Ensure sys path includes root for imports if not already
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from Util.VLMKit import VLMKit
from Apps.TelaiDian.Kit import draw_rectangles, detect_cards_cv
from Apps.TelaiDian.Config.Setting import SAFE_EXCLUDE_RATIO, BOTTOM_SAFE_EXCLUDE_RATIO
logger = logging.getLogger(__name__)
class ReadImageKit:
def __init__(self):
self.vlm = VLMKit()
async def analyze_detail_price(self, image_path):
"""
分析详情页截图,提取电价信息
"""
prompt = """
分析这张充电站详情页截图,提取**电价时段表**。
请仔细寻找包含“时段”、“电价”、“服务费”或“总价”的表格或列表。
请提取每个时段的:
1. 开始时间 (HH:MM)
2. 结束时间 (HH:MM)
3. 总电价 (元/度,包含电费和服务费)
输出格式为 JSON 数组:
[
{
"start": "00:00",
"end": "08:00",
"price": 1.23
},
...
]
如果无法识别任何价格信息,请返回空数组 []。
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
logger.info(f"VLM Price Analysis Result for {os.path.basename(image_path)}: {res_text[:200]}...")
json_str = self.vlm.extract_json(res_text)
prices = json.loads(json_str)
normalized_prices = []
if isinstance(prices, list):
for p in prices:
new_p = p.copy()
if 'time_range' in p and ('start' not in p or 'end' not in p):
tr = p['time_range'].replace('~', '-').replace(' ', '')
parts = tr.split('-')
if len(parts) >= 2:
new_p['start'] = parts[0]
new_p['end'] = parts[1]
if 'price' not in p:
if 'total_price' in p:
new_p['price'] = p['total_price']
elif 'elec_price' in p and 'service_price' in p:
try:
new_p['price'] = float(p['elec_price']) + float(p['service_price'])
except:
pass
normalized_prices.append(new_p)
return normalized_prices
return []
except Exception as e:
logger.error(f"分析电价详情失败: {e}")
return []
async def analyze_station_list(self, image_path):
"""
分析场站列表页图片,提取场站位置和基本信息
"""
cv_bboxes = detect_cards_cv(image_path, top_ratio=SAFE_EXCLUDE_RATIO, bottom_ratio=BOTTOM_SAFE_EXCLUDE_RATIO)
if cv_bboxes:
draw_rectangles(image_path, cv_bboxes)
prompt = f"""
图片中已经用绿色矩形框标记了 {len(cv_bboxes)} 个可能的充电站卡片。
请按从上到下的顺序,识别每个绿色框内的场站信息。
输出格式为 JSON 数组,长度必须为 {len(cv_bboxes)}
每个对象包含:
- "name": 场站名称
- "address": 场站地址
- "is_valid": true/false (是否为真实的场站卡片)
"""
else:
prompt = """
分析这张充电站列表截图,提取所有充电站卡片信息。
忽略顶部的筛选栏,仅提取下方重复出现的场站卡片。
输出格式为 JSON 数组,每个对象包含:
- "name": 场站名称
- "address": 场站地址
- "point": 场站卡片的中心点击坐标 [x, y]
- "bbox": 场站卡片的边界框 [x1, y1, x2, y2]
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
json_str = self.vlm.extract_json(res_text)
vlm_results = json.loads(json_str)
final_stations = []
if cv_bboxes and isinstance(vlm_results, list):
for i, res in enumerate(vlm_results):
if i < len(cv_bboxes):
bbox = cv_bboxes[i]
if res and (res.get("is_valid") is True or (res.get("name") and res.get("is_valid") is not False)):
final_stations.append({
"name": res.get("name"),
"address": res.get("address"),
"point": [(bbox[0] + bbox[2]) // 2, (bbox[1] + bbox[3]) // 2],
"bbox": bbox
})
elif not cv_bboxes:
final_stations = vlm_results if isinstance(vlm_results, list) else []
return final_stations
except Exception as e:
logger.error(f"分析列表页失败: {e}")
return []

95
Apps/TelaiDian/Service.py Normal file
View File

@@ -0,0 +1,95 @@
# coding=utf-8
import hashlib
import logging
import os
import sys
import uuid
from datetime import datetime
# Ensure sys path includes root for imports if not already
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from DbKit.Db import Db
from Config.Config import DB_URL
from Model.StationProfile import StationProfile
from Model.StationStatus import StationStatus
from Model.StationPriceSchedule import StationPriceSchedule
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class TelaiDianService:
def __init__(self):
self.db = Db(db_url=DB_URL)
self.station_profile_model = StationProfile()
self.station_status_model = StationStatus()
self.station_price_schedule_model = StationPriceSchedule()
self.operator = "特来电"
def generate_id(self):
return str(uuid.uuid4())
def get_hash(self, s: str) -> str:
return hashlib.md5(s.encode('utf-8')).hexdigest()
async def init_db(self):
await self.db.init_db()
async def close_db(self):
await self.db.close()
async def save_station_data(self, station_name, address, prices):
"""
保存场站及其价格数据到数据库
"""
if not prices:
return False
station_hash = self.get_hash(station_name)
now = datetime.now()
# 将价格转换为 24 小时的 schedule 格式 (0-23)
hourly_schedule = [0.0] * 24
for p in prices:
try:
start_hour = int(p['start'].split(':')[0])
end_hour = int(p['end'].split(':')[0])
price = float(p['price'])
# 处理跨天的情况(如 23:00 - 01:00
curr = start_hour
while curr != end_hour:
hourly_schedule[curr] = price
curr = (curr + 1) % 24
except Exception as e:
logger.error(f"解析价格时段失败: {p}, error: {e}")
async with await self.db.get_session() as session:
# 1. 保存 Profile
profile_id = self.generate_id()
await self.station_profile_model.save(
session=session,
id=profile_id,
station_hash=station_hash,
operator=self.operator,
station_name=station_name,
address=address,
valid_start_time=now
)
# 2. 保存价格
schedule_id = self.generate_id()
await self.station_price_schedule_model.save(
session=session,
id=schedule_id,
station_hash=station_hash,
schedule_json=hourly_schedule,
valid_start_time=now
)
await session.commit()
logger.info(f"成功保存场站数据: {station_name}")
return True

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.7 KiB

View File

71
T4_TelaiDian.py Normal file
View File

@@ -0,0 +1,71 @@
# coding=utf-8
import sys
import os
import asyncio
import logging
# 将项目根目录添加到 sys.path
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.append(project_root)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("T4_TelaiDian")
try:
from Apps.TelaiDian.Service import TelaiDianService
from Apps.TelaiDian import Opener, Crawler, Kit
except KeyboardInterrupt:
logger.info("\n🛑 用户在初始化阶段手动停止了程序。")
sys.exit(0)
except Exception as e:
logger.error(f"❌ 初始化导入失败: {e}")
sys.exit(1)
async def run_process():
logger.info("=== 开始全流程任务 (特来电): 打开小程序 -> 爬取数据 ===")
# 步骤 0: 初始化基础服务
logger.info(">>> 步骤 0: 初始化基础服务 (数据库连接)...")
service = TelaiDianService()
await service.init_db()
try:
# 启动前清空临时目录
Kit.clear_temp_dir()
# 步骤 1: 启动小程序
logger.info(">>> 步骤 1: 启动 特来电 小程序...")
success = await Opener.open_mini_program()
if not success:
logger.error("❌ 无法成功打开小程序,任务终止。")
return
logger.info("✅ 小程序启动成功,等待 5 秒确保界面稳定...")
await asyncio.sleep(5)
# 步骤 2: 执行爬取任务
logger.info(">>> 步骤 2: 开始执行场站爬取任务...")
# 调用 Crawler 的 main并传入 service
await Crawler.main(service=service)
logger.info("✅ 爬取任务完成!")
except Exception as e:
logger.error(f"❌ 运行异常: {e}")
finally:
if service:
await service.close_db()
logger.info("=== 全流程任务结束 ===")
if __name__ == "__main__":
try:
asyncio.run(run_process())
except KeyboardInterrupt:
logger.info("\n🛑 用户手动停止了程序 (Ctrl+C)。")
except Exception as e:
logger.exception(f"主程序崩溃: {e}")