409 lines
16 KiB
Python
409 lines
16 KiB
Python
# coding=utf-8
|
||
import asyncio
|
||
import hashlib
|
||
import logging
|
||
import os
|
||
import sys
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
# Resolve the project root by walking three path components up from this
# file's absolute path, then make it importable so the absolute project
# imports further down (DbKit, Config, Model, Apps) can be resolved.
project_root = os.path.abspath(
    os.path.join(os.path.abspath(__file__), os.pardir, os.pardir, os.pardir)
)
if sys.path.count(project_root) == 0:
    sys.path.append(project_root)
|
||
|
||
from DbKit.Db import Db
|
||
try:
    from Config.Config import DB_URL, PRICE_FLATTEN_TO_24H_GLOBAL
except ModuleNotFoundError:
    # Fallback: the `Config` package could not be imported, so load
    # <project_root>/Config/Config.py directly from its file path.
    import importlib.util

    _config_path_cfg = os.path.join(project_root, "Config", "Config.py")
    _spec_cfg = importlib.util.spec_from_file_location("project_config_fallback", _config_path_cfg)
    # Validate explicitly instead of with `assert`, which is removed under
    # `python -O` and previously ran only after the spec had been used.
    if _spec_cfg is None or _spec_cfg.loader is None:
        raise ImportError(f"Cannot load project config from {_config_path_cfg}")
    _cfg_mod = importlib.util.module_from_spec(_spec_cfg)
    _spec_cfg.loader.exec_module(_cfg_mod)
    DB_URL = _cfg_mod.DB_URL
    PRICE_FLATTEN_TO_24H_GLOBAL = _cfg_mod.PRICE_FLATTEN_TO_24H_GLOBAL
|
||
from Model.StationProfile import StationProfile
|
||
from Model.StationStatus import StationStatus
|
||
from Model.StationPriceSchedule import StationPriceSchedule
|
||
from Apps.TeLaiDian.Kit import setup_logger
|
||
from Apps.TeLaiDian.Config.Setting import PRICE_FLATTEN_TO_24H
|
||
|
||
logger = setup_logger("TeLaiDianService")
|
||
|
||
class TeLaiDianService:
    """Persist TeLaiDian charging-station data through the project models.

    Every public ``save_*`` / ``process_*`` coroutine opens one session from
    ``self.db``, writes its rows via the model objects' ``save`` coroutines
    and commits once at the end.
    """

    def __init__(self):
        self.db = Db(db_url=DB_URL)
        self.station_profile_model = StationProfile()
        self.station_status_model = StationStatus()
        self.station_price_schedule_model = StationPriceSchedule()
        self.operator = "特来电"

    def generate_id(self):
        """Return a fresh random UUID4 as a string."""
        return str(uuid.uuid4())

    def get_hash(self, station_name: str, address: str = None) -> str:
        """Return the MD5 hex digest of ``station_name`` + ``address``.

        A missing/empty ``address`` contributes an empty string, so
        ``get_hash(name)`` and ``get_hash(name, "")`` are identical.
        """
        s = f"{station_name}{address or ''}"
        return hashlib.md5(s.encode('utf-8')).hexdigest()

    async def init_db(self):
        """Initialise the underlying database helper."""
        await self.db.init_db()

    async def close_db(self):
        """Close the underlying database helper."""
        await self.db.close()

    # ------------------------------------------------------------------
    # Pure helpers (no I/O) shared by the save methods
    # ------------------------------------------------------------------

    @staticmethod
    def _hour_span(entry, single_entry=False):
        """Return ``(start_hour, end_hour)`` for one price entry.

        ``end_hour`` is exclusive and may exceed 24 for ranges that wrap past
        midnight. Missing ``start``/``end`` default to ``"00:00"``. Only the
        hour part before the first ``:`` is used.
        """
        start_hour = int((entry.get("start") or "00:00").split(":")[0])
        end_hour = int((entry.get("end") or "00:00").split(":")[0])

        # An end of 00:00 means "until midnight" (24:00 is already 24).
        if end_hour == 0:
            end_hour = 24
        # A lone entry whose start equals its end is treated as a full day.
        if start_hour == end_hour and single_entry:
            end_hour = start_hour + 24
        # Range wraps past midnight, e.g. 23:00 - 01:00.
        if end_hour < start_hour:
            end_hour += 24
        return start_hour, end_hour

    @staticmethod
    def _optional_float(value):
        """Return ``float(value)``, or ``None`` when value is ``None`` or 0."""
        return float(value) if value not in (None, 0) else None

    @staticmethod
    def _resolve_price(entry):
        """Return the effective price of an entry as a float.

        Order of preference: ``price``; else ``elec_price + service_price``
        when either is positive; else ``plus_price``; else ``market_price``;
        else ``0.0``.
        """
        price_val = entry.get("price")
        if price_val is None or price_val == 0:
            try:
                elec = float(entry.get("elec_price", 0) or 0)
                service = float(entry.get("service_price", 0) or 0)
            except (TypeError, ValueError):
                # Unparseable components: fall through to the next fallback.
                elec = service = 0.0
            if elec > 0 or service > 0:
                price_val = elec + service

        if price_val is None or price_val == 0:
            price_val = entry.get("plus_price") or entry.get("market_price") or 0.0
        return float(price_val)

    @staticmethod
    def _fill_gaps(schedule):
        """Fill ``None`` slots of a 24-slot schedule in place.

        Slots before the first populated hour copy the first populated slot;
        later gaps copy the closest preceding populated slot. Each copy gets
        its own ``start``/``end`` for the hour it occupies. Does nothing when
        no slot is populated.
        """
        last_valid = next((slot for slot in schedule if slot), None)
        if last_valid is None:
            return
        for hour, slot in enumerate(schedule):
            if slot is None:
                filled = last_valid.copy()
                filled["start"] = f"{hour:02d}:00"
                filled["end"] = f"{(hour + 1):02d}:00"
                schedule[hour] = filled
            else:
                last_valid = slot

    @classmethod
    def _build_hourly_schedule(cls, prices):
        """Expand price entries into a 24-slot list, one dict per hour.

        Each slot holds ``start``, ``end``, ``price``, ``plus_price``,
        ``market_price``, ``elec_price`` and ``service_price``. Entries that
        cannot be parsed are logged and skipped (best effort). The result is
        ``[None] * 24`` when nothing could be parsed.
        """
        schedule = [None] * 24
        entries = list(prices or [])
        single_entry = len(entries) == 1

        for entry in entries:
            try:
                start_hour, end_hour = cls._hour_span(entry, single_entry)
                price_data = {
                    "price": cls._resolve_price(entry),
                    "plus_price": cls._optional_float(entry.get("plus_price")),
                    "market_price": cls._optional_float(entry.get("market_price")),
                    "elec_price": cls._optional_float(entry.get("elec_price")),
                    "service_price": cls._optional_float(entry.get("service_price")),
                }
                for hour in range(start_hour, end_hour):
                    hour_idx = hour % 24
                    # Each hour gets its own dict with an explicit time range.
                    schedule[hour_idx] = {
                        "start": f"{hour_idx:02d}:00",
                        "end": f"{(hour_idx + 1):02d}:00",
                        **price_data,
                    }
            except Exception as exc:  # deliberate best effort: skip bad entries
                logger.error(f"解析价格时段失败: {entry}, error: {exc}")

        # Safety net for incomplete or misaligned time ranges.
        cls._fill_gaps(schedule)
        return schedule

    @classmethod
    def _price_info_for_hour(cls, schedule, hour):
        """Return the price dict that applies at ``hour`` (0-23).

        * dict schedule: returned as is.
        * list whose entries carry ``start``/``end``: the first entry whose
          range covers ``hour``; ``{}`` when none does.
        * list without any time ranges: positional lookup ``schedule[hour]``.
        * anything else: ``{}``.
        """
        if isinstance(schedule, dict):
            return schedule
        if not isinstance(schedule, list):
            return {}

        single_entry = len(schedule) == 1
        has_ranges = False
        for entry in schedule:
            if not isinstance(entry, dict):
                continue
            if not (entry.get("start") or entry.get("end")):
                continue
            try:
                start_hour, end_hour = cls._hour_span(entry, single_entry)
            except (AttributeError, TypeError, ValueError):
                continue
            has_ranges = True
            if any(h % 24 == hour for h in range(start_hour, end_hour)):
                return entry

        if not has_ranges and 0 <= hour < len(schedule):
            entry = schedule[hour]
            return entry if isinstance(entry, dict) else {}
        return {}

    async def _save_profile(self, session, station_hash, station_name, address, now):
        """Write one station-profile row with a fresh id (no commit)."""
        await self.station_profile_model.save(
            session=session,
            id=self.generate_id(),
            station_hash=station_hash,
            operator=self.operator,
            station_name=station_name,
            address=address,
            valid_start_time=now,
        )

    # ------------------------------------------------------------------
    # Public persistence API
    # ------------------------------------------------------------------

    async def save_station_profile_only(self, station_name, address, distance=None):
        """Save the station profile and, when ``distance`` is truthy, a
        status row that carries only the distance.

        Returns ``False`` without touching the DB when ``station_name`` is
        empty, otherwise ``True`` after the commit.
        """
        if not station_name:
            return False

        station_hash = self.get_hash(station_name, address)
        now = datetime.now()

        async with await self.db.get_session() as session:
            # 1. Save the profile.
            await self._save_profile(session, station_hash, station_name, address, now)

            # 2. With distance info, also save a status row so it is recorded.
            if distance:
                await self.station_status_model.save(
                    session=session,
                    id=self.generate_id(),
                    station_hash=station_hash,
                    total_piles=None,
                    free_piles=None,
                    piles_detail_json=None,
                    current_price=None,
                    distance=distance,
                    valid_start_time=now,
                )

            await session.commit()

        logger.info(f"仅保存场站基础信息: {station_name} | 地址: {address} | 距离: {distance}")
        return True

    async def save_station_data(self, station_name, address, prices, total_piles=None, free_piles=None, piles_detail=None, parking_info=None, distance=None):
        """Save full station data: profile, 24-hour price schedule and the
        current status snapshot (including the current hour's price).

        ``prices`` is an iterable of dicts with optional ``start``/``end``
        (``"HH:MM"``) and price fields; ``None`` is treated as empty.
        Returns ``False`` when ``station_name`` is empty, else ``True``.
        """
        if not station_name:
            return False

        now = datetime.now()
        station_hash = self.get_hash(station_name, address)

        hourly_schedule = self._build_hourly_schedule(prices)
        if not any(hourly_schedule):
            logger.warning(f"场站 {station_name} 未能生成有效的 24 小时价格表")

        current_price_info = hourly_schedule[now.hour] or {}

        async with await self.db.get_session() as session:
            # 1. Save the profile.
            await self._save_profile(session, station_hash, station_name, address, now)

            # 2. Save the price schedule.
            await self.station_price_schedule_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                schedule_json=hourly_schedule,
                valid_start_time=now,
            )

            # 3. Save the status snapshot with the current hour's price.
            await self.station_status_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                total_piles=total_piles,
                free_piles=free_piles,
                piles_detail_json=piles_detail,
                parking_info=parking_info,
                distance=distance,
                current_price=current_price_info.get('price'),
                pro_price=current_price_info.get('plus_price'),
                market_price=current_price_info.get('market_price'),
                valid_start_time=now,
            )
            await session.commit()

        slot_count = sum(1 for slot in hourly_schedule if slot)
        logger.info(f"成功保存场站数据: {station_name} (桩数: {total_piles}, 空闲: {free_piles}, 距离: {distance}, 价格计划条数: {slot_count})")
        # Log a 24-hour price overview to ease debugging.
        summary = [
            f"{hour:02d}h:{slot.get('price')}"
            for hour, slot in enumerate(hourly_schedule)
            if slot
        ]
        logger.info(f"24小时价格概览: {' | '.join(summary)}")
        return True

    async def process_price_detail_data(self, station_name, hourly_schedule, total_piles=None, free_piles=None, piles_detail=None, parking_info=None, distance=None, address=None):
        """Save a price schedule plus a status snapshot for a station.

        When either flatten flag (``PRICE_FLATTEN_TO_24H_GLOBAL`` or
        ``PRICE_FLATTEN_TO_24H``) is set and ``hourly_schedule`` is a list, it
        is expanded to 24 hourly slots; if that yields nothing the schedule is
        stored as received. ``address`` is optional and only feeds the station
        hash; omitting it hashes the name alone.

        Returns ``False`` when ``station_name`` or ``hourly_schedule`` is
        empty, else ``True``.
        """
        if not station_name or not hourly_schedule:
            return False

        now = datetime.now()
        station_hash = self.get_hash(station_name, address)

        schedule_to_save = hourly_schedule
        use_flatten = (PRICE_FLATTEN_TO_24H_GLOBAL or PRICE_FLATTEN_TO_24H) and isinstance(hourly_schedule, list)
        if use_flatten:
            flattened = self._build_hourly_schedule(hourly_schedule)
            if any(flattened):
                schedule_to_save = flattened

        current_price_info = self._price_info_for_hour(schedule_to_save, now.hour)

        async with await self.db.get_session() as session:
            await self.station_price_schedule_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                schedule_json=schedule_to_save,
                valid_start_time=now,
            )

            await self.station_status_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                total_piles=total_piles,
                free_piles=free_piles,
                piles_detail_json=piles_detail,
                parking_info=parking_info,
                distance=distance,
                current_price=current_price_info.get("price"),
                pro_price=current_price_info.get("plus_price"),
                market_price=current_price_info.get("market_price"),
                valid_start_time=now,
            )
            await session.commit()

        return True

    async def save_station_profile_and_status(self, station_name, address, total_piles=None, free_piles=None, piles_detail=None, parking_info=None, distance=None):
        """Save the station profile and a status snapshot without a price.

        Returns ``False`` when ``station_name`` is empty, else ``True``.
        """
        if not station_name:
            return False

        now = datetime.now()
        station_hash = self.get_hash(station_name, address)

        async with await self.db.get_session() as session:
            await self._save_profile(session, station_hash, station_name, address, now)
            await self.station_status_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                total_piles=total_piles,
                free_piles=free_piles,
                piles_detail_json=piles_detail,
                parking_info=parking_info,
                distance=distance,
                current_price=None,
                valid_start_time=now,
            )
            await session.commit()
        return True
|
||
|
||
async def _test_save_profile():
    """Manual smoke test: save one sample station profile and print the result.

    The database helper is initialised first and always closed afterwards,
    even when the save raises.
    """
    svc = TeLaiDianService()
    await svc.init_db()
    try:
        result = await svc.save_station_profile_only(
            station_name="测试场站-AI调试",
            address="测试地址-杭州滨江",
            distance="1.2km",
        )
        print("save_station_profile_only_return:", result)
    finally:
        await svc.close_db()


# Run the smoke test only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(_test_save_profile())
|