This commit is contained in:
HuangHai
2026-01-15 10:47:59 +08:00
parent 9d9b95037f
commit 3e5ddc587c
9 changed files with 136 additions and 43 deletions

View File

@@ -2,7 +2,7 @@
# 采集配置
SCROLL_DISTANCE_RATIO = 0.3
MAX_STATIONS_COUNT = 30
MAX_STATIONS_COUNT = 1
FIRST_RUN_ONLY_ONE_STATION = False
# 场站去重过期时间(秒),在此时间内重复出现的场站不会再次点击进入详情页
REDIS_STATION_EXPIRE = 120

View File

@@ -209,7 +209,7 @@ class TeLaiDianCrawler(BaseCrawler):
logger.info(f"跳过已处理场站 (Redis): {name}")
continue
logger.info(f"处理场站: {name} (坐标: {point})")
logger.info(f"处理场站: {name} (坐标: {point}, 距离: {station.get('distance')})")
# 点击进入详情
d.click(point[0], point[1])
@@ -453,6 +453,14 @@ class TeLaiDianCrawler(BaseCrawler):
continue
if res:
for p in res:
# 标准化时间格式,确保 0:00 -> 00:00
for key in ['start', 'end']:
val = p.get(key)
if val and ':' in val:
parts = val.split(':')
if len(parts) == 2:
p[key] = f"{int(parts[0]):02d}:{int(parts[1]):02d}"
is_duplicate = False
for existing in all_prices:
if p.get('start') == existing.get('start') and p.get('end') == existing.get('end'):
@@ -500,7 +508,7 @@ class TeLaiDianCrawler(BaseCrawler):
station_name_clean = clean_station_name(station_name)
logger.info(f"[详情页] 虽未获取价格,但已获取地址,尝试仅保存基础信息: {station_name_clean} | {address}")
try:
await self.service.save_station_profile_only(station_name_clean, address)
await self.service.save_station_profile_only(station_name_clean, address, distance=distance)
except Exception as e:
logger.error(f"[详情页] 仅保存基础信息失败: {e}")

View File

@@ -274,30 +274,25 @@ class ReadImageKit:
分析场站列表页图片,提取场站位置和基本信息
"""
prompt = """
分析这张充电站列表截图,提取所有真实的充电站卡片。
分析这张充电站列表截图,提取所有真实的充电站卡片信息
要求
1. 忽略页面上半部分(如顶部导航栏、搜索框、广告 Banner、筛选标签等
2. 仅识别下半部分一条条“充电站卡片”,每张卡片通常包含:场站名称、评分、最近充电时间、距离(如 '5.3km''90m')、价格、快/慢空闲数量等。
3. 不要把同一张卡片拆成多块;每条场站只对应一个矩形框
识别规则
1. 必须是卡片形式的充电站信息区域
2. 每一个卡片通常包含以下要素:
- **场站名称 (name)**: 通常位于卡片顶部,字体最大且加粗
- **距离信息 (distance)**: 例如 '5.3km''90m',通常在名称附近或卡片右上角。
- **价格 (price)**: 例如 '0.84',通常以 ¥ 开头。
- **枪数信息**: 如 '快 闲10/12'
对于每张卡片,请输出:
- name: 场站名称
- distance: 距离信息字符串(例如 '5.3km''90m'
- address: 场站地址(如果无法确定可置为 null
- point: 卡片中心点击坐标 [x, y],使用归一化坐标 [0-1000]0 表示最左/最上1000 表示最右/最下)
- bbox: 卡片外接矩形边界 [x1, y1, x2, y2]同样使用归一化坐标 [0-1000]
- distance: 距离信息字符串(例如 '5.3km''90m'。必须精准提取,不要遗漏单位。
- address: 场站地址(如果卡片上有显示则提取,无则为 null
- point: 卡片中心点击坐标 [x, y],使用归一化坐标 [0-1000]
- bbox: 卡片外接矩形边界 [x1, y1, x2, y2],使用归一化坐标 [0-1000]
JSON 数组形式输出,例如:
[
{
"name": "某某充电站",
"distance": "1.2km",
"address": "某某路 100 号",
"point": [500, 750],
"bbox": [50, 600, 950, 820]
}
]
输出格式为 JSON 数组
注意:严禁识别广告位、筛选标签或功能入口。真正的场站卡片通常是一个横跨屏幕的大区域。
"""
try:

View File

@@ -32,7 +32,8 @@ class TeLaiDianService:
def generate_id(self):
return str(uuid.uuid4())
def get_hash(self, s: str) -> str:
def get_hash(self, station_name: str, address: str = None) -> str:
s = f"{station_name}{address or ''}"
return hashlib.md5(s.encode('utf-8')).hexdigest()
async def init_db(self):
@@ -41,14 +42,15 @@ class TeLaiDianService:
async def close_db(self):
await self.db.close()
async def save_station_profile_only(self, station_name, address):
if not station_name or not address:
async def save_station_profile_only(self, station_name, address, distance=None):
if not station_name:
return False
station_hash = self.get_hash(station_name)
station_hash = self.get_hash(station_name, address)
now = datetime.now()
async with await self.db.get_session() as session:
# 1. 保存 Profile
profile_id = self.generate_id()
await self.station_profile_model.save(
session=session,
@@ -59,9 +61,25 @@ class TeLaiDianService:
address=address,
valid_start_time=now
)
# 2. 如果有距离信息,也保存一个状态记录,确保距离被记录下来
if distance:
status_id = self.generate_id()
await self.station_status_model.save(
session=session,
id=status_id,
station_hash=station_hash,
total_piles=None,
free_piles=None,
piles_detail_json=None,
current_price=None,
distance=distance,
valid_start_time=now
)
await session.commit()
logger.info(f"仅保存场站基础信息: {station_name}")
logger.info(f"仅保存场站基础信息: {station_name} | 地址: {address} | 距离: {distance}")
return True
async def save_station_data(self, station_name, address, prices, total_piles=None, free_piles=None, piles_detail=None, parking_info=None, distance=None):
@@ -69,30 +87,59 @@ class TeLaiDianService:
保存场站全量数据名称、地址、24小时价格计划、当前状态
"""
now = datetime.now()
station_hash = hashlib.md5(f"{station_name}{address}".encode('utf-8')).hexdigest()
station_hash = self.get_hash(station_name, address)
# 预处理价格:生成 24 小时的价格映射
hourly_schedule = [None] * 24
# 如果只有一条价格且没有时段,或者是“全天价格统一”的情况
# 我们先检查是否有这种特殊情况
for p in prices:
try:
start_parts = p['start'].split(':')
end_parts = p['end'].split(':')
start_str = p.get('start') or '00:00'
end_str = p.get('end') or '00:00'
start_parts = start_str.split(':')
end_parts = end_str.split(':')
start_hour = int(start_parts[0])
end_hour = int(end_parts[0])
# 处理 00:00 作为结束时间的情况 (表示 24:00)
if end_hour == 0 and (int(end_parts[1]) == 0 if len(end_parts) > 1 else True):
if start_hour != 0:
# 处理 00:00 或 24:00 作为结束时间的情况
if end_hour == 0 or end_hour == 24:
# 如果开始是 0说明是跨越全天或到午夜
if start_hour == 0:
end_hour = 24
else:
# 比如 23:00 - 00:00
end_hour = 24
# 特殊处理:如果 start == end 且只有一条,视为全天
if start_hour == end_hour and len(prices) == 1:
end_hour = start_hour + 24
# 提取各项价格
price_val = p.get('price')
if price_val is None or price_val == 0:
# 尝试从电费和服务费计算
try:
e = float(p.get('elec_price', 0) or 0)
s = float(p.get('service_price', 0) or 0)
if e > 0 or s > 0:
price_val = e + s
except:
pass
# 如果还是没有价格,且有 plus_price 或 market_price尝试回填
if price_val is None or price_val == 0:
price_val = p.get('plus_price') or p.get('market_price') or 0.0
price_data = {
"price": float(p.get('price')) if p.get('price') is not None else 0.0,
"plus_price": float(p.get('plus_price')) if p.get('plus_price') is not None else None,
"market_price": float(p.get('market_price')) if p.get('market_price') is not None else None,
"elec_price": float(p.get('elec_price')) if p.get('elec_price') is not None else None,
"service_price": float(p.get('service_price')) if p.get('service_price') is not None else None
"price": float(price_val) if price_val is not None else 0.0,
"plus_price": float(p.get('plus_price')) if p.get('plus_price') is not None and p.get('plus_price') != 0 else None,
"market_price": float(p.get('market_price')) if p.get('market_price') is not None and p.get('market_price') != 0 else None,
"elec_price": float(p.get('elec_price')) if p.get('elec_price') is not None and p.get('elec_price') != 0 else None,
"service_price": float(p.get('service_price')) if p.get('service_price') is not None and p.get('service_price') != 0 else None
}
# 填充对应的小时槽位
@@ -102,11 +149,39 @@ class TeLaiDianService:
end_hour += 24
while curr < end_hour:
hourly_schedule[curr % 24] = price_data
hour_idx = curr % 24
# 为每个小时创建独立的对象,包含明确的时段信息
hourly_schedule[hour_idx] = {
"start": f"{hour_idx:02d}:00",
"end": f"{(hour_idx + 1):02d}:00",
**price_data
}
curr += 1
except Exception as e:
logger.error(f"解析价格时段失败: {p}, error: {e}")
# 最后兜底:如果有任何价格数据但 hourly_schedule 还有空位,用最近的一个填充
# 这通常处理 VLM 识别不全或者时段没对齐的情况
last_valid = None
# 先找第一个有效的
for item in hourly_schedule:
if item:
last_valid = item
break
if last_valid:
for i in range(24):
if hourly_schedule[i] is None:
# 复制最后一份有效价格,但更新为当前小时的时段
new_item = last_valid.copy()
new_item["start"] = f"{i:02d}:00"
new_item["end"] = f"{(i+1):02d}:00"
hourly_schedule[i] = new_item
else:
last_valid = hourly_schedule[i]
else:
logger.warning(f"场站 {station_name} 未能生成有效的 24 小时价格表")
async with await self.db.get_session() as session:
# 1. 保存 Profile
profile_id = self.generate_id()
@@ -151,5 +226,12 @@ class TeLaiDianService:
)
await session.commit()
logger.info(f"成功保存场站数据: {station_name} (桩数: {total_piles}, 空闲: {free_piles}, 停车费: {parking_info})")
logger.info(f"成功保存场站数据: {station_name} (桩数: {total_piles}, 空闲: {free_piles}, 距离: {distance}, 价格计划条数: {len([x for x in hourly_schedule if x])})")
# 记录 24 小时价格概览,方便调试
summary = []
for i in range(24):
p = hourly_schedule[i]
if p:
summary.append(f"{i:02d}h:{p.get('price')}")
logger.info(f"24小时价格概览: {' | '.join(summary)}")
return True

View File

@@ -28,7 +28,7 @@ except ModuleNotFoundError:
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("T5_ClearHistory")
logger = logging.getLogger("T6_ClearHistory")
# 供应商配置映射
VENDORS = {
@@ -75,23 +75,31 @@ async def clear_vendor_data(db: Db, redis_kit: RedisKit, vendor_info: Dict, mode
# 分批执行删除,防止 SQL 语句过长
batch_size = 500
total_status_deleted = 0
total_price_deleted = 0
for i in range(0, len(station_hashes), batch_size):
batch = station_hashes[i:i + batch_size]
# 1. t_station_status_scd
sql_status = f"DELETE FROM t_station_status_scd WHERE station_hash IN :hashes {where_clause}"
res_status = await session.execute(text(sql_status), {"hashes": batch})
total_status_deleted += res_status.rowcount
# 2. t_station_price_schedule_scd
sql_price = f"DELETE FROM t_station_price_schedule_scd WHERE station_hash IN :hashes {where_clause}"
res_price = await session.execute(text(sql_price), {"hashes": batch})
total_price_deleted += res_price.rowcount
logger.info(f"[{operator}] 已处理场站分批 {i//batch_size + 1},删除状态记录 {res_status.rowcount} 条,价格记录 {res_price.rowcount} 条。")
logger.info(f"[{operator}] 正在处理分批 {i//batch_size + 1}/{ (len(station_hashes)-1)//batch_size + 1}...")
logger.info(f"[{operator}] 表 t_station_status_scd 清理完成,共删除 {total_status_deleted} 条记录。")
logger.info(f"[{operator}] 表 t_station_price_schedule_scd 清理完成,共删除 {total_price_deleted} 条记录。")
# 3. 最后删除主表记录
sql_profile = f"DELETE FROM t_station_profile_scd WHERE operator = :operator {where_clause}"
res_profile = await session.execute(text(sql_profile) , {"operator": operator})
logger.info(f"[{operator}] t_station_profile_scd 清理完成,删除 {res_profile.rowcount} ")
logger.info(f"[{operator}] t_station_profile_scd 清理完成,删除 {res_profile.rowcount} 条记录")
logger.info(f"[{operator}] 数据库记录清理完成。")
except Exception as e: