Files
aiData/Apps/XinDianTu/Service.py
HuangHai a34083cd47 'commit'
2026-01-17 08:49:42 +08:00

581 lines
23 KiB
Python

import hashlib
import os
import re
import sys
import uuid
from datetime import datetime
# Put the project root (three directory levels above this file) on sys.path so the
# absolute `Apps.` / `DbKit.` / `Config.` / `Model.` imports that follow can resolve.
project_root = os.path.abspath(__file__)
for _ in range(3):
    project_root = os.path.dirname(project_root)
if project_root not in sys.path:
    sys.path.append(project_root)
from Apps.XinDianTu.Kit import setup_logger
from Apps.XinDianTu.ReadImageKit import ReadImageKit
from DbKit.Db import Db
from Config.Config import DB_URL, PRICE_FLATTEN_TO_24H_GLOBAL
from Model.StationProfile import StationProfile
from Model.StationStatus import StationStatus
from Model.StationPriceSchedule import StationPriceSchedule
from Apps.XinDianTu.Config.Setting import PRICE_FLATTEN_TO_24H
# Module-level logger used by the XinDianTuService methods below; built by the
# project's setup_logger helper (its handlers/format are defined in Apps.XinDianTu.Kit).
logger = setup_logger("Service")
class XinDianTuService:
    """Persist 新电途 (XinDianTu) charging-station data extracted by ReadImageKit.

    Every ``process_*`` / ``record_*`` coroutine asks ReadImageKit to parse an image
    or URL, then writes the result through three models: StationProfile (name,
    address), StationStatus (pile counts, prices) and StationPriceSchedule.
    Stations are keyed by ``station_hash``, the MD5 hex digest of the list-page name.
    """

    def __init__(self):
        """Create the Db wrapper for DB_URL and one instance of each model helper."""
        self.db = Db(db_url=DB_URL)
        self.station_profile_model = StationProfile()
        self.station_status_model = StationStatus()
        self.station_price_schedule_model = StationPriceSchedule()
        # Operator name written on every profile row saved by this service.
        self.operator = "新电途"

    def generate_id(self):
        """Return a fresh UUID4 string, used as the ``id`` of each saved row."""
        return str(uuid.uuid4())

    def get_hash(self, s: str) -> str:
        """Return the MD5 hex digest of *s* encoded as UTF-8 (the station key)."""
        return hashlib.md5(s.encode('utf-8')).hexdigest()

    def _to_float(self, value) -> float:
        """Return the first number found in *value* as a float.

        ints/floats are converted directly; anything else is stringified and the
        first signed decimal is extracted (``"0.85元/度"`` -> ``0.85``). ``None``,
        or text containing no digits, yields ``0.0``.
        """
        if value is None:
            return 0.0
        if isinstance(value, (int, float)):
            return float(value)
        try:
            text = str(value)
        except Exception:
            # Best effort: an object whose __str__ fails is treated as "no number".
            return 0.0
        # The pattern always ends in digits, so float() on a match cannot fail.
        match = re.search(r"[-+]?\d*\.?\d+", text)
        return float(match.group()) if match else 0.0

    def _optional_float(self, value):
        """Like ``_to_float`` but return ``None`` for ``None`` or ``""`` instead of ``0.0``."""
        if value is None or value == "":
            return None
        return self._to_float(value)

    @staticmethod
    def _pile_counts(pile):
        """Return ``(total, free)`` as ints from one pile dict, or ``None`` if unusable.

        Both values are converted before anything is returned, so a pile whose
        ``free`` is malformed can never contribute a dangling ``total``.
        """
        if not isinstance(pile, dict):
            return None
        try:
            return int(pile.get("total", 0)), int(pile.get("free", 0))
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _pile_entry(pile_no, pile_type, free, total, remark) -> dict:
        """Build one aggregated row for ``piles_detail_json``.

        ``power`` is always empty: the parsed data carries no power information.
        """
        return {
            "pile_no": pile_no,
            "type": pile_type,
            "power": "",
            "status_text": f"空闲{free}/总{total}",
            "remark": remark,
        }

    @classmethod
    def _parse_pile_list(cls, piles, remark) -> tuple:
        """Sum a list of pile dicts into ``(total, free, rows)``; unusable entries are skipped.

        Row numbers (``G1``, ``G2``, ...) follow list positions, so skipped entries
        leave gaps in the numbering.
        """
        total, free, rows = 0, 0, []
        for idx, pile in enumerate(piles):
            counts = cls._pile_counts(pile)
            if counts is None:
                continue
            pile_total, pile_free = counts
            total += pile_total
            free += pile_free
            rows.append(cls._pile_entry(f"G{idx + 1}", pile.get("type", "未知"), pile_free, pile_total, remark))
        return total, free, rows

    @classmethod
    def _parse_piles_vl(cls, piles_data) -> tuple:
        """Parse a VL-mode ``piles`` field into ``(total, free, rows)``.

        A list is summed per entry. Any other value is treated as text: when it
        contains at least two integers, the last two become free/total (the smaller
        one is free). One row holding the raw text is always emitted for that case.
        """
        if isinstance(piles_data, list):
            return cls._parse_pile_list(piles_data, "AI聚合数据")
        total, free = 0, 0
        piles_str = str(piles_data) if piles_data else "0/0"
        nums = re.findall(r"\d+", piles_str)
        if len(nums) >= 2:
            last_two = int(nums[-2]), int(nums[-1])
            free, total = min(last_two), max(last_two)
        # Same keys as every other piles_detail_json row (including "power").
        rows = [{
            "pile_no": "-",
            "type": "未知",
            "power": "",
            "status_text": piles_str,
            "remark": "原始识别文本",
        }]
        return total, free, rows

    @classmethod
    def _parse_piles_hybrid(cls, piles_data) -> tuple:
        """Parse a hybrid-mode ``piles`` field into ``(total, free, rows)``.

        Accepts a list of pile dicts or a legacy ``"free/total"`` string; anything
        else yields ``(0, 0, [])``.
        """
        if isinstance(piles_data, list):
            return cls._parse_pile_list(piles_data, "列表页VL")
        total, free, rows = 0, 0, []
        if isinstance(piles_data, str) and "/" in piles_data:
            # Parse both halves before assigning, so an unreadable total cannot
            # leave `free` set against total=0.
            head, tail = piles_data.split("/")[:2]
            free_match = re.search(r"\d+", head)
            total_match = re.search(r"\d+", tail)
            if free_match and total_match:
                free, total = int(free_match.group()), int(total_match.group())
                rows.append(cls._pile_entry("G1", "未知", free, total, "列表页VL(兼容)"))
        return total, free, rows

    @classmethod
    def _count_piles(cls, piles_info) -> tuple:
        """Return ``(total, free)`` from a list of pile dicts or from a single pile dict."""
        if isinstance(piles_info, list):
            total, free, _ = cls._parse_pile_list(piles_info, "")
            return total, free
        if isinstance(piles_info, dict):
            counts = cls._pile_counts(piles_info)
            if counts is not None:
                return counts
        return 0, 0

    @classmethod
    def _parse_piles_record(cls, piles_data) -> tuple:
        """Parse the ``piles`` field used by ``record_station_status`` into ``(total, free, rows)``.

        Lists are summed per dict. Strings are read as ``"type:free/total"`` groups
        (ASCII or full-width colon), else plain ``"free/total"`` groups, else the
        first two integers found as free and total.
        """
        if isinstance(piles_data, list):
            return cls._parse_pile_list(piles_data, "列表页VL")
        total, free, rows = 0, 0, []
        if not isinstance(piles_data, str):
            return total, free, rows
        groups = re.findall(r"(\w+)[:：](\d+)/(\d+)", piles_data)
        if not groups:
            groups = [("未知", f, t) for f, t in re.findall(r"(\d+)/(\d+)", piles_data)]
        for idx, (pile_type, f, t) in enumerate(groups):
            f_int, t_int = int(f), int(t)
            free += f_int
            total += t_int
            rows.append(cls._pile_entry(f"G{idx + 1}", pile_type, f_int, t_int, "列表页VL(兼容)"))
        if not rows:
            # Last resort: take the first two integers anywhere in the text.
            nums = re.findall(r"(\d+)", piles_data)
            if len(nums) >= 2:
                free, total = int(nums[0]), int(nums[1])
                rows.append(cls._pile_entry("G1", "未知", free, total, "列表页VL兜底"))
        return total, free, rows

    async def _save_profile(self, session, station_hash, station_name, now, **extra) -> str:
        """Save one StationProfile row for this operator and return its generated id.

        Extra keyword arguments (e.g. ``address``) are passed to
        ``station_profile_model.save`` unchanged.
        """
        profile_id = self.generate_id()
        await self.station_profile_model.save(
            session=session,
            id=profile_id,
            station_hash=station_hash,
            operator=self.operator,
            station_name=station_name,
            valid_start_time=now,
            **extra,
        )
        return profile_id

    async def init_db(self):
        """Initialise the underlying Db wrapper."""
        await self.db.init_db()

    async def close_db(self):
        """Close the underlying Db wrapper."""
        await self.db.close()

    async def process_station_list_vl(self, vl_image_url_or_path, json_metadata, device_info=None, max_count=None) -> list:
        """Parse a station list in VL mode and save a profile and status row per station.

        When *max_count* is given, once the result list reaches that length every
        further input station is replaced by a ``None`` placeholder and not saved.
        Saved station dicts are annotated in place with ``station_hash``,
        ``profile_id``, ``valid_start_time``, ``status_id``, ``total_piles``,
        ``free_piles`` and ``current_price``.
        """
        station_list = await ReadImageKit.parse_vl_image(vl_image_url_or_path, json_metadata, device_info=device_info)
        if not station_list:
            return []
        processed_stations = []
        async with await self.db.get_session() as session:
            for station in station_list:
                if max_count is not None and len(processed_stations) >= max_count:
                    # Placeholder keeps the slot; nothing is written to the database.
                    processed_stations.append(None)
                    continue
                name = station.get("station_name")
                if not name:
                    # NOTE(review): nameless stations are dropped WITHOUT a placeholder, so
                    # later positions no longer line up with station_list — confirm callers
                    # do not rely on strict index alignment.
                    continue
                station_hash = self.get_hash(name)
                now = datetime.now()
                station["station_hash"] = station_hash
                station["profile_id"] = await self._save_profile(session, station_hash, name, now)
                station["valid_start_time"] = now.isoformat()

                total, free, standardized_piles = self._parse_piles_vl(station.get("piles"))
                current_price = self._to_float(station.get("price", ""))
                status_id = self.generate_id()
                await self.station_status_model.save(
                    session=session,
                    id=status_id,
                    station_hash=station_hash,
                    total_piles=total,
                    free_piles=free,
                    piles_detail_json=standardized_piles,
                    current_price=current_price,
                    # Numeric, like process_station_list (None when missing/empty).
                    pro_price=self._optional_float(station.get("pro_price")),
                    parking_info=station.get("parking"),
                    distance=station.get("distance"),
                    valid_start_time=now,
                )
                station["status_id"] = status_id
                station["total_piles"] = total
                station["free_piles"] = free
                station["current_price"] = current_price
                processed_stations.append(station)
            await session.commit()
        return processed_stations

    async def process_station_list_hybrid(self, local_image_path: str, uploader, cdn_domain) -> list:
        """Parse a station list in hybrid mode (image slicing + VL) and persist it.

        Nameless stations are skipped. Every other station gets a profile and a
        status row and is returned, annotated in place with the generated ids and
        the parsed pile counts / price.
        """
        station_list = await ReadImageKit.parse_hybrid_image(local_image_path, uploader, cdn_domain)
        if not station_list:
            return []
        processed_stations = []
        async with await self.db.get_session() as session:
            for station in station_list:
                name = station.get("station_name")
                if not name:
                    continue
                station_hash = self.get_hash(name)
                now = datetime.now()
                station["station_hash"] = station_hash
                station["profile_id"] = await self._save_profile(session, station_hash, name, now)
                station["valid_start_time"] = now.isoformat()

                total, free, standardized_piles = self._parse_piles_hybrid(station.get("piles"))
                current_price = self._to_float(station.get("price", ""))
                status_id = self.generate_id()
                await self.station_status_model.save(
                    session=session,
                    id=status_id,
                    station_hash=station_hash,
                    total_piles=total,
                    free_piles=free,
                    piles_detail_json=standardized_piles or None,
                    current_price=current_price,
                    parking_info=station.get("parking"),
                    valid_start_time=now,
                )
                station["status_id"] = status_id
                station["total_piles"] = total
                station["free_piles"] = free
                station["current_price"] = current_price
                processed_stations.append(station)
            await session.commit()
        return processed_stations

    async def process_station_list(self, list_url: str, device_info: dict = None) -> list:
        """Parse a first-level station list image URL and persist each valid station.

        Stations without a name, or whose summed pile total is <= 0 (treated as a
        misread), are skipped before anything is written. Prices stay ``None`` when
        missing or empty instead of defaulting to ``0.0``.
        """
        station_list = await ReadImageKit.parse_first_level_image_url(list_url, device_info=device_info)
        if not station_list:
            return []
        processed_stations = []
        async with await self.db.get_session() as session:
            for station in station_list:
                name = station.get("station_name")
                if not name:
                    continue
                piles_info = station.get("piles", [])
                total, free = self._count_piles(piles_info)
                # Filter before saving anything, so suspected misreads leave no profile row.
                if total <= 0:
                    logger.warning(f"跳过场站 {name}: 总枪数为 0 (疑似识别错误或数据无效)")
                    continue

                station_hash = self.get_hash(name)
                now = datetime.now()
                station["station_hash"] = station_hash
                station["profile_id"] = await self._save_profile(
                    session, station_hash, name, now, address=station.get("address")
                )
                station["valid_start_time"] = now.isoformat()

                current_price = self._optional_float(station.get("price"))
                pro_price = self._optional_float(station.get("pro_price"))
                status_id = self.generate_id()
                await self.station_status_model.save(
                    session=session,
                    id=status_id,
                    station_hash=station_hash,
                    total_piles=total,
                    free_piles=free,
                    piles_detail_json=piles_info,
                    current_price=current_price,
                    pro_price=pro_price,
                    valid_start_time=now,
                )
                station["status_id"] = status_id
                station["total_piles"] = total
                station["free_piles"] = free
                station["current_price"] = current_price
                processed_stations.append(station)
            await session.commit()
        return processed_stations

    async def process_station_address(self, station_name: str, detail_url: str, device_info: dict = None) -> dict:
        """Parse a station detail page for its address/full name and store them on the profile.

        The hash is derived from the original list-page *station_name* so it matches
        rows written by the list-processing methods; the stored station_name becomes
        the full name when a different one is found. If no address is found nothing
        is written. Returns the parsed data (``{}`` when the parser returned nothing).
        """
        logger.info(f"\n[Service] Processing Address for '{station_name}' from: {detail_url}")
        # A falsy parser result is handled like an empty one instead of crashing on .get().
        address_data = (await ReadImageKit.parse_address(station_name, detail_url, device_info=device_info)) or {}
        address = address_data.get("address")
        if address:
            address = address.strip()

        final_name = station_name
        full_name = address_data.get("full_station_name")
        if full_name:
            full_name = full_name.strip()
        # Only switch to the full name when it is present and actually differs.
        if full_name and full_name != station_name:
            logger.info(f" - Found Full Name: {full_name} (Original: {station_name})")
            final_name = full_name

        if not address:
            logger.warning(f" - Address is empty (likely incomplete screenshot). Skipping profile update. Data: {address_data}")
            return address_data

        logger.info(f" - Found Address: {address}")
        station_hash = self.get_hash(station_name)
        async with await self.db.get_session() as session:
            updated = await self.station_profile_model.backfill_address_if_missing(
                session=session,
                station_hash=station_hash,
                station_name=final_name,
                address=address,
            )
            if updated > 0:
                logger.info(f" - Backfilled address and full name into {updated} profile record(s).")
            else:
                await self._save_profile(
                    session,
                    station_hash,
                    final_name,
                    datetime.now(),
                    address=address,
                    coord_x=None,
                    coord_y=None,
                )
                logger.info(" - Inserted/Updated profile record with full name.")
            await session.commit()
        return address_data

    async def record_station_status(self, station_info: dict):
        """Save a profile and a status row for one station scraped from a list page.

        ``piles`` may be a list of pile dicts or text such as ``"快:2/10"`` / ``"2/10"``.
        ``price`` defaults to ``0.0`` when missing; ``pro_price`` is stored as ``None``.
        Parking comes from ``parking_info``, falling back to ``parking`` when that key is absent.
        """
        name = station_info.get("station_name")
        if not name:
            return
        station_hash = self.get_hash(name)
        now = datetime.now()
        total, free, standardized_piles = self._parse_piles_record(station_info.get("piles", []))
        current_price = self._to_float(station_info.get("price"))

        async with await self.db.get_session() as session:
            await self._save_profile(session, station_hash, name, now)
            status_id = self.generate_id()
            await self.station_status_model.save(
                session=session,
                id=status_id,
                station_hash=station_hash,
                total_piles=total,
                free_piles=free,
                piles_detail_json=standardized_piles,
                current_price=current_price,
                pro_price=None,  # the list page does not expose the PRO price
                parking_info=station_info.get("parking_info", station_info.get("parking")),
                distance=station_info.get("distance"),
                valid_start_time=now,
            )
            await session.commit()
        logger.info(f" - Station status recorded for: {name} (Piles: {free}/{total}, Price: {current_price})")

    async def process_price_detail_data(self, station_name, hourly_schedule) -> bool:
        """Save an already-parsed price schedule; return False if either input is empty.

        When flattening is enabled (PRICE_FLATTEN_TO_24H_GLOBAL or PRICE_FLATTEN_TO_24H)
        and the schedule is a list, it is expanded via ReadImageKit.hourly_full_day;
        an empty expansion falls back to the schedule as given.
        """
        if not station_name or not hourly_schedule:
            return False
        station_hash = self.get_hash(station_name)
        now = datetime.now()
        schedule_to_save = hourly_schedule
        if (PRICE_FLATTEN_TO_24H_GLOBAL or PRICE_FLATTEN_TO_24H) and isinstance(hourly_schedule, list):
            schedule_to_save = ReadImageKit.hourly_full_day(hourly_schedule) or hourly_schedule
        async with await self.db.get_session() as session:
            await self.station_price_schedule_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                schedule_json=schedule_to_save,
                valid_start_time=now,
            )
            await session.commit()
        return True

    async def process_price_schedule(self, station_name: str, schedule_urls: list, device_info: dict = None) -> bool:
        """Parse price-schedule images, expand them with hourly_full_day and save the result.

        Returns True when a schedule was saved, False when parsing produced nothing
        (previously this always returned None; callers ignoring the result are unaffected).
        """
        logger.info(f"\n[Service] Processing Price Schedule for '{station_name}'")
        rows = await ReadImageKit.parse_price_schedule_multi(station_name, schedule_urls, device_info=device_info)
        hourly_schedule = ReadImageKit.hourly_full_day(rows)
        if not hourly_schedule:
            logger.info(" - Failed to parse schedule.")
            return False
        logger.info(f" - Got {len(hourly_schedule)} hourly records.")
        station_hash = self.get_hash(station_name)
        now = datetime.now()
        schedule_id = self.generate_id()
        async with await self.db.get_session() as session:
            await self.station_price_schedule_model.save(
                session=session,
                id=schedule_id,
                station_hash=station_hash,
                schedule_json=hourly_schedule,
                valid_start_time=now,
            )
            await session.commit()
        logger.info(" - Schedule record saved.")
        return True