581 lines
23 KiB
Python
581 lines
23 KiB
Python
import hashlib
|
|
import os
|
|
import re
|
|
import sys
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
# Resolve the directory three levels above this file and make sure it is on
# sys.path. NOTE(review): presumably this is what lets the project-absolute
# imports that follow (Apps.*, DbKit.*, Config.*, Model.*) resolve - confirm.
project_root = os.path.abspath(__file__)
project_root = os.path.dirname(project_root)  # directory containing this file
project_root = os.path.dirname(project_root)  # its parent
project_root = os.path.dirname(project_root)  # the grandparent
if project_root not in sys.path:
    sys.path.append(project_root)
|
|
|
|
from Apps.XinDianTu.Kit import setup_logger
|
|
from Apps.XinDianTu.ReadImageKit import ReadImageKit
|
|
from DbKit.Db import Db
|
|
from Config.Config import DB_URL, PRICE_FLATTEN_TO_24H_GLOBAL
|
|
from Model.StationProfile import StationProfile
|
|
from Model.StationStatus import StationStatus
|
|
from Model.StationPriceSchedule import StationPriceSchedule
|
|
from Apps.XinDianTu.Config.Setting import PRICE_FLATTEN_TO_24H
|
|
|
|
# Module-level logger, created by the project's setup_logger helper under the name "Service".
logger = setup_logger("Service")
|
|
|
|
class XinDianTuService:
|
|
def __init__(self):
    """Create the database handle, the three model helpers and the operator label."""
    # Database access object built from the configured connection URL.
    self.db = Db(db_url=DB_URL)

    # Model helpers; this service calls their save() methods with a session.
    self.station_profile_model = StationProfile()
    self.station_status_model = StationStatus()
    self.station_price_schedule_model = StationPriceSchedule()

    # Operator name passed along with every station profile this service saves.
    self.operator = "新电途"
|
|
|
|
def generate_id(self):
    """Return a freshly generated random UUID (version 4) as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
|
|
|
|
def get_hash(self, s: str) -> str:
    """Return the hexadecimal MD5 digest of ``s`` after UTF-8 encoding."""
    digest = hashlib.md5()
    digest.update(s.encode('utf-8'))
    return digest.hexdigest()
|
|
|
|
def _to_float(self, value) -> float:
    """
    Extract the first number contained in ``value`` and return it as a float.

    Args:
        value: ``None``, a number, or any object whose ``str()`` form may
            contain a number surrounded by other characters (e.g. "1.25元/度").

    Returns:
        float: ``0.0`` for ``None`` or when no number can be found; the value
        itself converted to float for ``int``/``float`` input; otherwise the
        first signed integer or decimal found in the text.
    """
    if value is None:
        return 0.0
    if isinstance(value, (int, float)):
        return float(value)

    try:
        # First run of digits with an optional sign and decimal point.
        match = re.search(r"[-+]?\d*\.?\d+", str(value))
        return float(match.group()) if match else 0.0
    except (TypeError, ValueError):
        # Best effort: anything that cannot be rendered or converted counts as 0.
        return 0.0
|
|
|
|
async def init_db(self):
    """Await the ``init_db`` coroutine of the underlying database object."""
    database = self.db
    await database.init_db()
|
|
|
|
async def close_db(self):
    """Await the ``close`` coroutine of the underlying database object."""
    database = self.db
    await database.close()
|
|
|
|
async def process_station_list_vl(self, vl_image_url_or_path, json_metadata, device_info=None, max_count=None) -> list:
    """
    Parse a station-list image in VL mode and persist every recognised station.

    For each station returned by ``ReadImageKit.parse_vl_image`` that has a
    ``station_name``, one profile record and one status record are saved
    through a shared session, which is committed once after the loop. Each
    station dict is enriched in place with ``station_hash``, ``profile_id``,
    ``valid_start_time``, ``status_id``, ``total_piles``, ``free_piles`` and
    ``current_price``.

    Args:
        vl_image_url_or_path: Image location, forwarded unchanged to the parser.
        json_metadata: Extra metadata, forwarded unchanged to the parser.
        device_info: Optional device description, forwarded to the parser.
        max_count: Optional cap. Once the result list has reached this length,
            every remaining parsed station contributes a ``None`` placeholder
            and is not saved.

    Returns:
        list: The enriched station dicts (plus any ``None`` placeholders), or
        an empty list when the parser returned nothing.
    """
    station_list = await ReadImageKit.parse_vl_image(vl_image_url_or_path, json_metadata, device_info=device_info)
    if not station_list:
        return []

    processed_stations = []
    async with await self.db.get_session() as session:
        for station in station_list:
            # Past the cap: keep a placeholder so positions stay aligned with
            # the parsed list, but do not write anything to the database.
            if max_count is not None and len(processed_stations) >= max_count:
                processed_stations.append(None)
                continue

            name = station.get("station_name")
            if not name:
                continue

            # NOTE: a name-keyword filter (STATION_NAME_IGNORE_KEYWORDS) used to
            # run here; it was disabled in the original code and stays disabled.

            station_hash = self.get_hash(name)
            now = datetime.now()
            station["station_hash"] = station_hash

            # 1. Save the profile.
            profile_id = self.generate_id()
            await self.station_profile_model.save(
                session=session,
                id=profile_id,
                station_hash=station_hash,
                operator=self.operator,
                station_name=name,
                valid_start_time=now
            )
            station["profile_id"] = profile_id
            station["valid_start_time"] = now.isoformat()

            # 2. Save the status (pile counts and price).
            status_id = self.generate_id()
            total, free, standardized_piles = self._summarize_vl_piles(station.get("piles"))
            current_price = self._to_float(station.get("price", ""))

            await self.station_status_model.save(
                session=session,
                id=status_id,
                station_hash=station_hash,
                total_piles=total,
                free_piles=free,
                piles_detail_json=standardized_piles,
                current_price=current_price,
                pro_price=station.get("pro_price"),
                parking_info=station.get("parking"),
                distance=station.get("distance"),
                valid_start_time=now
            )
            station["status_id"] = status_id
            station["total_piles"] = total
            station["free_piles"] = free
            station["current_price"] = current_price

            processed_stations.append(station)

        await session.commit()
    return processed_stations


def _summarize_vl_piles(self, piles_data):
    """Return ``(total, free, detail_rows)`` computed from a VL-mode ``piles`` value."""
    total, free = 0, 0
    detail_rows = []

    if isinstance(piles_data, list):
        # Structured list: sum the counts and build one detail row per entry.
        for idx, pile in enumerate(piles_data):
            if not isinstance(pile, dict):
                logger.warning(f"Skipping non-dict pile entry: {pile!r}")
                continue
            try:
                pile_total = int(pile.get("total", 0))
                pile_free = int(pile.get("free", 0))
            except (TypeError, ValueError):
                logger.warning(f"Skipping pile entry with non-numeric counts: {pile!r}")
                continue
            total += pile_total
            free += pile_free
            detail_rows.append({
                "pile_no": f"G{idx+1}",
                "type": pile.get("type", "未知"),
                "power": "",  # no power information available
                "status_text": f"空闲{pile_free}/总{pile_total}",
                "remark": "AI聚合数据"
            })
        return total, free, detail_rows

    # Fallback: not a list (possibly a string). Take the last two numbers in
    # the text; the smaller is treated as free, the larger as total.
    piles_str = str(piles_data) if piles_data else "0/0"
    nums = re.findall(r"\d+", piles_str)
    if len(nums) >= 2:
        free, total = sorted((int(nums[-2]), int(nums[-1])))

    # Always keep one row carrying the raw text. "power" is included so the
    # row has the same keys as the rows built from a structured list.
    detail_rows.append({
        "pile_no": "-",
        "type": "未知",
        "power": "",
        "status_text": piles_str,
        "remark": "原始识别文本"
    })
    return total, free, detail_rows
|
|
|
|
async def process_station_list_hybrid(self, local_image_path: str, uploader, cdn_domain) -> list:
    """
    Parse a station-list image in hybrid mode and persist every recognised station.

    For each station returned by ``ReadImageKit.parse_hybrid_image`` that has
    a ``station_name``, one profile record and one status record are saved
    through a shared session, which is committed once after the loop. Each
    station dict is enriched in place with ``station_hash``, ``profile_id``,
    ``valid_start_time``, ``status_id``, ``total_piles``, ``free_piles`` and
    ``current_price``.

    Args:
        local_image_path: Path of the image, forwarded to the parser.
        uploader: Forwarded unchanged to the parser.
        cdn_domain: Forwarded unchanged to the parser.

    Returns:
        list: The enriched station dicts, or an empty list when the parser
        returned nothing.
    """
    station_list = await ReadImageKit.parse_hybrid_image(local_image_path, uploader, cdn_domain)
    if not station_list:
        return []

    processed_stations = []
    async with await self.db.get_session() as session:
        for station in station_list:
            name = station.get("station_name")
            if not name:
                continue

            station_hash = self.get_hash(name)
            now = datetime.now()
            station["station_hash"] = station_hash

            # 1. Save the profile.
            profile_id = self.generate_id()
            await self.station_profile_model.save(
                session=session,
                id=profile_id,
                station_hash=station_hash,
                operator=self.operator,
                station_name=name,
                valid_start_time=now
            )
            station["profile_id"] = profile_id
            station["valid_start_time"] = now.isoformat()

            # 2. Save the status (pile counts and price).
            status_id = self.generate_id()
            total, free, standardized_piles = self._summarize_hybrid_piles(station.get("piles"))
            current_price = self._to_float(station.get("price", ""))

            await self.station_status_model.save(
                session=session,
                id=status_id,
                station_hash=station_hash,
                total_piles=total,
                free_piles=free,
                # An empty detail list is stored as None rather than [].
                piles_detail_json=standardized_piles if standardized_piles else None,
                current_price=current_price,
                parking_info=station.get("parking"),
                valid_start_time=now
            )
            station["status_id"] = status_id
            station["total_piles"] = total
            station["free_piles"] = free
            station["current_price"] = current_price

            processed_stations.append(station)

        await session.commit()
    return processed_stations


def _summarize_hybrid_piles(self, piles_data):
    """Return ``(total, free, detail_rows)`` computed from a hybrid-mode ``piles`` value."""
    total, free = 0, 0
    detail_rows = []

    if isinstance(piles_data, list):
        # Structured list: sum the counts and build one detail row per entry.
        for idx, pile in enumerate(piles_data):
            if not isinstance(pile, dict):
                logger.warning(f"Skipping non-dict pile entry: {pile!r}")
                continue
            try:
                pile_total = int(pile.get("total", 0))
                pile_free = int(pile.get("free", 0))
            except (TypeError, ValueError):
                logger.warning(f"Skipping pile entry with non-numeric counts: {pile!r}")
                continue
            total += pile_total
            free += pile_free
            detail_rows.append({
                "pile_no": f"G{idx+1}",
                "type": pile.get("type", "未知"),
                "power": "",
                "status_text": f"空闲{pile_free}/总{pile_total}",
                "remark": "列表页VL"
            })
    elif isinstance(piles_data, str) and "/" in piles_data:
        # Legacy "free/total" text. Both numbers must be present before either
        # count is taken, so a half-parsed value can never leave free > 0
        # alongside total == 0.
        parts = piles_data.split("/")
        free_match = re.search(r"\d+", parts[0])
        total_match = re.search(r"\d+", parts[1])
        if free_match and total_match:
            free = int(free_match.group())
            total = int(total_match.group())
            detail_rows.append({
                "pile_no": "G1",
                "type": "未知",
                "power": "",
                "status_text": f"空闲{free}/总{total}",
                "remark": "列表页VL(兼容)"
            })

    return total, free, detail_rows
|
|
|
|
async def process_station_list(self, list_url: str, device_info: dict = None) -> list:
    """
    Parse a first-level station-list image and persist the stations it contains.

    Every parsed station with a ``station_name`` gets a profile record. A
    status record is saved only when the summed pile total is positive;
    otherwise the station is logged and left out of the result. The session
    is committed once after the loop.

    Args:
        list_url: Image URL, forwarded to ``ReadImageKit.parse_first_level_image_url``.
        device_info: Optional device description, forwarded to the parser.

    Returns:
        list: Station dicts enriched in place with ``station_hash``,
        ``profile_id``, ``valid_start_time``, ``status_id``, ``total_piles``,
        ``free_piles`` and ``current_price``; empty when nothing was parsed.
    """
    parsed_stations = await ReadImageKit.parse_first_level_image_url(list_url, device_info=device_info)
    if not parsed_stations:
        return []

    saved_stations = []
    async with await self.db.get_session() as session:
        for entry in parsed_stations:
            station_name = entry.get("station_name")
            if not station_name:
                continue

            digest = self.get_hash(station_name)
            timestamp = datetime.now()
            entry["station_hash"] = digest
            station_address = entry.get("address")

            # Profile record.
            new_profile_id = self.generate_id()
            profile_fields = {
                "session": session,
                "id": new_profile_id,
                "station_hash": digest,
                "operator": self.operator,
                "station_name": station_name,
                "address": station_address,
                "valid_start_time": timestamp,
            }
            await self.station_profile_model.save(**profile_fields)
            entry["profile_id"] = new_profile_id
            entry["valid_start_time"] = timestamp.isoformat()

            new_status_id = self.generate_id()
            raw_piles = entry.get("piles", [])

            # Sum the pile counts from either a list of dicts or a single dict.
            pile_total = 0
            pile_free = 0
            if isinstance(raw_piles, list):
                for item in raw_piles:
                    if not isinstance(item, dict):
                        continue
                    try:
                        pile_total += int(item.get("total", 0))
                        pile_free += int(item.get("free", 0))
                    except (ValueError, TypeError):
                        continue
            elif isinstance(raw_piles, dict):
                try:
                    pile_total = int(raw_piles.get("total", 0))
                    pile_free = int(raw_piles.get("free", 0))
                except (ValueError, TypeError):
                    pile_total = 0
                    pile_free = 0

            # Drop entries whose pile total is zero (treated as bad data).
            # NOTE(review): the profile above has already been saved by this
            # point - confirm that is intended for skipped stations.
            if pile_total <= 0:
                logger.warning(f"跳过场站 {station_name}: 总枪数为 0 (疑似识别错误或数据无效)")
                continue

            # Prices stay None when the field is missing or an empty string.
            raw_price = entry.get("price")
            if raw_price is not None and raw_price != "":
                current_price = self._to_float(raw_price)
            else:
                current_price = None

            raw_pro_price = entry.get("pro_price")
            if raw_pro_price is not None and raw_pro_price != "":
                pro_price = self._to_float(raw_pro_price)
            else:
                pro_price = None

            # Status record.
            status_fields = {
                "session": session,
                "id": new_status_id,
                "station_hash": digest,
                "total_piles": pile_total,
                "free_piles": pile_free,
                "piles_detail_json": raw_piles,
                "current_price": current_price,
                "pro_price": pro_price,
                "valid_start_time": timestamp,
            }
            await self.station_status_model.save(**status_fields)
            entry["status_id"] = new_status_id
            entry["total_piles"] = pile_total
            entry["free_piles"] = pile_free
            entry["current_price"] = current_price

            saved_stations.append(entry)

        await session.commit()

    return saved_stations
|
|
|
|
async def process_station_address(self, station_name: str, detail_url: str, device_info: dict = None) -> dict:
    """
    Parse a station's detail image for its address and store it on the profile.

    When an address is found, existing profile records for the station are
    backfilled through ``backfill_address_if_missing``; if that reports no
    updated record, a new profile record is saved instead. The hash is always
    computed from ``station_name`` (the list-page name), even when the parser
    also supplies a different full name.

    Args:
        station_name: Station name as shown on the list page.
        detail_url: Image URL, forwarded to ``ReadImageKit.parse_address``.
        device_info: Optional device description, forwarded to the parser.

    Returns:
        dict: The parser's result, or an empty dict when the parser did not
        return a dict at all.
    """
    logger.info(f"\n[Service] Processing Address for '{station_name}' from: {detail_url}")

    address_data = await ReadImageKit.parse_address(station_name, detail_url, device_info=device_info)
    if not isinstance(address_data, dict):
        # Nothing usable came back; report it instead of failing on .get().
        logger.warning(f" - Address parser returned no usable data. Skipping profile update. Data: {address_data}")
        return {}

    # Only string values are usable; anything else is treated as missing.
    address = address_data.get("address")
    address = address.strip() if isinstance(address, str) else None

    # Prefer the full station name when one exists and differs from the original.
    final_name = station_name
    full_name = address_data.get("full_station_name")
    if isinstance(full_name, str):
        full_name = full_name.strip()
        if full_name and full_name != station_name:
            logger.info(f" - Found Full Name: {full_name} (Original: {station_name})")
            final_name = full_name

    if not address:
        logger.warning(f" - Address is empty (likely incomplete screenshot). Skipping profile update. Data: {address_data}")
        return address_data

    logger.info(f" - Found Address: {address}")
    # The hash must use the original (list-page) name so it matches the
    # records written by the list-processing methods.
    station_hash = self.get_hash(station_name)

    async with await self.db.get_session() as session:
        updated = await self.station_profile_model.backfill_address_if_missing(
            session=session,
            station_hash=station_hash,
            station_name=final_name,  # update to the full name
            address=address
        )
        if updated > 0:
            logger.info(f" - Backfilled address and full name into {updated} profile record(s).")
        else:
            now = datetime.now()
            profile_id = self.generate_id()
            await self.station_profile_model.save(
                session=session,
                id=profile_id,
                station_hash=station_hash,
                operator=self.operator,
                station_name=final_name,  # use the full name
                address=address,
                coord_x=None,
                coord_y=None,
                valid_start_time=now
            )
            logger.info(" - Inserted/Updated profile record with full name.")
        await session.commit()

    return address_data
|
|
|
|
async def record_station_status(self, station_info: dict):
    """
    Record one station's status (piles, price, ...) as captured on the list page.

    A profile record and a status record are saved in a single session and
    committed together. Nothing happens when ``station_info`` has no
    ``station_name``.

    Args:
        station_info: Station dict; the keys read here are ``station_name``,
            ``piles``, ``price``, ``parking_info`` and ``distance``.

    Returns:
        None
    """
    name = station_info.get("station_name")
    if not name:
        return

    station_hash = self.get_hash(name)
    now = datetime.now()

    # Parsing is pure computation, so it is done before the session is opened.
    total, free, standardized_piles = self._summarize_status_piles(station_info.get("piles", []))
    current_price = self._to_float(station_info.get("price"))

    async with await self.db.get_session() as session:
        # 1. Save the profile (basic information).
        profile_id = self.generate_id()
        await self.station_profile_model.save(
            session=session,
            id=profile_id,
            station_hash=station_hash,
            operator=self.operator,
            station_name=name,
            valid_start_time=now
        )

        # 2. Save the status.
        status_id = self.generate_id()
        await self.station_status_model.save(
            session=session,
            id=status_id,
            station_hash=station_hash,
            total_piles=total,
            free_piles=free,
            piles_detail_json=standardized_piles,
            current_price=current_price,
            pro_price=None,  # the PRO price is not parsed from the list page
            parking_info=station_info.get("parking_info"),
            distance=station_info.get("distance"),
            valid_start_time=now
        )
        await session.commit()
        logger.info(f" - Station status recorded for: {name} (Piles: {free}/{total}, Price: {current_price})")


def _summarize_status_piles(self, piles_data):
    """Return ``(total, free, detail_rows)`` from a ``piles`` value that is a list or a string."""
    total, free = 0, 0
    detail_rows = []

    def add_row(pile_type, row_free, row_total, remark):
        # Rows are numbered G1, G2, ... in the order they are added.
        detail_rows.append({
            "pile_no": f"G{len(detail_rows) + 1}",
            "type": pile_type,
            "power": "",
            "status_text": f"空闲{row_free}/总{row_total}",
            "remark": remark
        })

    if isinstance(piles_data, list):
        # Mode 1: structured list. Numbering follows the position in the input
        # list, so skipped entries leave gaps; rows are therefore built here
        # rather than through add_row().
        for idx, pile in enumerate(piles_data):
            if not isinstance(pile, dict):
                continue
            try:
                pile_total = int(pile.get("total", 0))
                pile_free = int(pile.get("free", 0))
            except (TypeError, ValueError):
                logger.warning(f"Skipping pile entry with non-numeric counts: {pile!r}")
                continue
            total += pile_total
            free += pile_free
            detail_rows.append({
                "pile_no": f"G{idx+1}",
                "type": pile.get("type", "未知"),
                "power": "",
                "status_text": f"空闲{pile_free}/总{pile_total}",
                "remark": "列表页VL"
            })
    elif isinstance(piles_data, str):
        # Mode 2: text with a type label, e.g. "快:2/10".
        typed_matches = re.findall(r"(\w+):(\d+)/(\d+)", piles_data)
        if typed_matches:
            for pile_type, free_text, total_text in typed_matches:
                free += int(free_text)
                total += int(total_text)
                add_row(pile_type, int(free_text), int(total_text), "列表页VL(兼容)")
        else:
            # Mode 3: no type label, only "2/10".
            for free_text, total_text in re.findall(r"(\d+)/(\d+)", piles_data):
                free += int(free_text)
                total += int(total_text)
                add_row("未知", int(free_text), int(total_text), "列表页VL(兼容)")

        if not detail_rows:
            # Last resort: take the first two numbers as free and total.
            nums = re.findall(r"(\d+)", piles_data)
            if len(nums) >= 2:
                free = int(nums[0])
                total = int(nums[1])
                add_row("未知", free, total, "列表页VL兜底")

    return total, free, detail_rows
|
|
|
|
async def process_price_detail_data(self, station_name, hourly_schedule) -> bool:
    """
    Persist a price schedule for a station.

    When either flatten switch (``PRICE_FLATTEN_TO_24H_GLOBAL`` or
    ``PRICE_FLATTEN_TO_24H``) is truthy and the schedule is a list, it is
    passed through ``ReadImageKit.hourly_full_day`` first. The converted rows
    are stored if that call yields something truthy; otherwise the schedule
    is stored as given.

    Args:
        station_name: Station name; its hash identifies the saved record.
        hourly_schedule: Schedule data to store.

    Returns:
        bool: ``False`` when either argument is empty, ``True`` after the
        record has been saved and committed.
    """
    if not (station_name and hourly_schedule):
        return False

    station_hash = self.get_hash(station_name)
    now = datetime.now()

    payload = hourly_schedule
    flatten_enabled = PRICE_FLATTEN_TO_24H_GLOBAL or PRICE_FLATTEN_TO_24H
    if flatten_enabled and isinstance(hourly_schedule, list):
        flattened = ReadImageKit.hourly_full_day(hourly_schedule)
        if flattened:
            payload = flattened

    async with await self.db.get_session() as session:
        schedule_id = self.generate_id()
        record_fields = {
            "session": session,
            "id": schedule_id,
            "station_hash": station_hash,
            "schedule_json": payload,
            "valid_start_time": now,
        }
        await self.station_price_schedule_model.save(**record_fields)
        await session.commit()
    return True
|
|
|
|
async def process_price_schedule(self, station_name: str, schedule_urls: list, device_info: dict = None):
    """
    Parse price-schedule images for a station and save the hourly schedule.

    The rows returned by ``ReadImageKit.parse_price_schedule_multi`` are
    passed through ``ReadImageKit.hourly_full_day``. A truthy result is saved
    as one price-schedule record; otherwise only a log line is written.

    Args:
        station_name: Station name; its hash identifies the saved record.
        schedule_urls: Image URLs, forwarded to the parser.
        device_info: Optional device description, forwarded to the parser.

    Returns:
        None
    """
    logger.info(f"\n[Service] Processing Price Schedule for '{station_name}'")

    parsed_rows = await ReadImageKit.parse_price_schedule_multi(station_name, schedule_urls, device_info=device_info)
    full_day = ReadImageKit.hourly_full_day(parsed_rows)

    # Guard clause: nothing to store.
    if not full_day:
        logger.info(" - Failed to parse schedule.")
        return

    logger.info(f" - Got {len(full_day)} hourly records.")
    station_hash = self.get_hash(station_name)
    now = datetime.now()
    schedule_id = self.generate_id()

    async with await self.db.get_session() as session:
        record_fields = {
            "session": session,
            "id": schedule_id,
            "station_hash": station_hash,
            "schedule_json": full_day,
            "valid_start_time": now,
        }
        await self.station_price_schedule_model.save(**record_fields)
        await session.commit()
        logger.info(" - Schedule record saved.")
|