Files
aiData/Apps/TeLaiDian/Service.py
HuangHai c7cf0db77e 'commit'
2026-01-16 19:15:40 +08:00

409 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import asyncio
import hashlib
import logging
import os
import sys
import uuid
from datetime import datetime
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from DbKit.Db import Db
try:
from Config.Config import DB_URL, PRICE_FLATTEN_TO_24H_GLOBAL
except ModuleNotFoundError:
import importlib.util
_config_path_cfg = os.path.join(project_root, "Config", "Config.py")
_spec_cfg = importlib.util.spec_from_file_location("project_config_fallback", _config_path_cfg)
_cfg_mod = importlib.util.module_from_spec(_spec_cfg)
assert _spec_cfg.loader is not None
_spec_cfg.loader.exec_module(_cfg_mod)
DB_URL = _cfg_mod.DB_URL
PRICE_FLATTEN_TO_24H_GLOBAL = _cfg_mod.PRICE_FLATTEN_TO_24H_GLOBAL
from Model.StationProfile import StationProfile
from Model.StationStatus import StationStatus
from Model.StationPriceSchedule import StationPriceSchedule
from Apps.TeLaiDian.Kit import setup_logger
from Apps.TeLaiDian.Config.Setting import PRICE_FLATTEN_TO_24H
logger = setup_logger("TeLaiDianService")
class TeLaiDianService:
    """Persistence service for TeLaiDian (特来电) charging-station data.

    Each public ``save_*`` / ``process_*`` coroutine opens one DB session,
    writes some combination of a station profile, a price schedule and a
    status snapshot through the corresponding model objects, and commits.
    Rows are tied together by ``station_hash`` (see :meth:`get_hash`).
    """

    def __init__(self):
        self.db = Db(db_url=DB_URL)
        self.station_profile_model = StationProfile()
        self.station_status_model = StationStatus()
        self.station_price_schedule_model = StationPriceSchedule()
        self.operator = "特来电"

    def generate_id(self):
        """Return a fresh random UUID4 string used as a row id."""
        return str(uuid.uuid4())

    def get_hash(self, station_name: str, address: str = None) -> str:
        """Return the MD5 hex digest of ``station_name`` + ``address``.

        A missing/empty address contributes an empty string. The digest is
        used purely as a station identity key, not for security.
        """
        s = f"{station_name}{address or ''}"
        return hashlib.md5(s.encode('utf-8')).hexdigest()

    async def init_db(self):
        """Initialise the underlying database wrapper."""
        await self.db.init_db()

    async def close_db(self):
        """Close the underlying database wrapper."""
        await self.db.close()

    # ------------------------------------------------------------------
    # Price helpers (pure functions, no DB access)
    # ------------------------------------------------------------------
    @staticmethod
    def _optional_price(value):
        """Return ``value`` as a float, or None when it is missing or zero."""
        if value is None or value == 0:
            return None
        return float(value)

    @staticmethod
    def _resolve_price(entry):
        """Return the effective price of one raw price entry as a float.

        Resolution order: explicit ``price``; else ``elec_price`` +
        ``service_price`` when either is positive; else ``plus_price``;
        else ``market_price``; else 0.0.
        """
        price_val = entry.get("price")
        if price_val is None or price_val == 0:
            try:
                elec = float(entry.get("elec_price", 0) or 0)
                service = float(entry.get("service_price", 0) or 0)
            except (TypeError, ValueError):
                # Best effort: unparsable components just mean we fall
                # through to the plus/market price fallback below.
                pass
            else:
                if elec > 0 or service > 0:
                    price_val = elec + service
        if price_val is None or price_val == 0:
            price_val = entry.get("plus_price") or entry.get("market_price") or 0.0
        return float(price_val)

    @staticmethod
    def _build_hourly_schedule(prices):
        """Flatten raw price periods into a 24-slot, per-hour schedule.

        Args:
            prices: iterable of dict-like entries with optional ``start`` /
                ``end`` ("HH:MM" strings) and price fields (``price``,
                ``plus_price``, ``market_price``, ``elec_price``,
                ``service_price``). ``None`` is treated as empty.

        Returns:
            A list of 24 items. Each item is a dict with ``start``, ``end``
            and the five price fields, or ``None`` for every slot when no
            entry could be parsed.

        Entries that fail to parse are logged and skipped. Only the hour
        part of ``start`` / ``end`` is used.
        """
        entries = list(prices or [])
        single_entry = len(entries) == 1
        slots = [None] * 24

        for entry in entries:
            try:
                start_hour = int((entry.get("start") or "00:00").split(":")[0])
                end_hour = int((entry.get("end") or "00:00").split(":")[0])

                # An end of 00:00 or 24:00 means "until midnight".
                if end_hour in (0, 24):
                    end_hour = 24
                # A lone entry whose start equals its end covers the whole day.
                if start_hour == end_hour and single_entry:
                    end_hour = start_hour + 24
                # Periods that wrap past midnight, e.g. 23:00 - 01:00.
                if end_hour < start_hour:
                    end_hour += 24

                price_data = {
                    "price": TeLaiDianService._resolve_price(entry),
                    "plus_price": TeLaiDianService._optional_price(entry.get("plus_price")),
                    "market_price": TeLaiDianService._optional_price(entry.get("market_price")),
                    "elec_price": TeLaiDianService._optional_price(entry.get("elec_price")),
                    "service_price": TeLaiDianService._optional_price(entry.get("service_price")),
                }

                # Every hour gets its own dict carrying explicit slot bounds.
                for hour in range(start_hour, end_hour):
                    hour_idx = hour % 24
                    slots[hour_idx] = {
                        "start": f"{hour_idx:02d}:00",
                        "end": f"{(hour_idx + 1):02d}:00",
                        **price_data,
                    }
            except Exception as e:
                logger.error(f"解析价格时段失败: {entry}, error: {e}")

        # Fallback for incomplete or misaligned periods: leading gaps take
        # the first valid slot, later gaps take the closest preceding one.
        last_valid = next((slot for slot in slots if slot), None)
        if last_valid is not None:
            for i, slot in enumerate(slots):
                if slot is None:
                    filler = last_valid.copy()
                    filler["start"] = f"{i:02d}:00"
                    filler["end"] = f"{(i+1):02d}:00"
                    slots[i] = filler
                else:
                    last_valid = slot
        return slots

    async def _save_profile(self, session, station_hash, station_name, address, now):
        """Write one station-profile row on ``session`` (no commit)."""
        await self.station_profile_model.save(
            session=session,
            id=self.generate_id(),
            station_hash=station_hash,
            operator=self.operator,
            station_name=station_name,
            address=address,
            valid_start_time=now
        )

    # ------------------------------------------------------------------
    # Public persistence API
    # ------------------------------------------------------------------
    async def save_station_profile_only(self, station_name, address, distance=None):
        """Save only the station profile, plus a status row if a distance is given.

        Returns False without touching the DB when ``station_name`` is
        empty, True after a successful commit.
        """
        if not station_name:
            return False
        station_hash = self.get_hash(station_name, address)
        now = datetime.now()
        async with await self.db.get_session() as session:
            # 1. Profile row.
            await self._save_profile(session, station_hash, station_name, address, now)
            # 2. When a distance is known, also write a status row so the
            #    distance is recorded.
            if distance:
                await self.station_status_model.save(
                    session=session,
                    id=self.generate_id(),
                    station_hash=station_hash,
                    total_piles=None,
                    free_piles=None,
                    piles_detail_json=None,
                    current_price=None,
                    distance=distance,
                    valid_start_time=now
                )
            await session.commit()
        logger.info(f"仅保存场站基础信息: {station_name} | 地址: {address} | 距离: {distance}")
        return True

    async def save_station_data(self, station_name, address, prices, total_piles=None, free_piles=None, piles_detail=None, parking_info=None, distance=None):
        """Save the full data set for a station.

        Writes the profile (name, address), the 24-hour price schedule
        derived from ``prices`` and a status snapshot whose price fields are
        taken from the slot of the current hour.

        Returns False without touching the DB when ``station_name`` is
        empty (consistent with the other save methods), True after commit.
        ``prices=None`` is treated as an empty list.
        """
        if not station_name:
            return False
        now = datetime.now()
        station_hash = self.get_hash(station_name, address)

        hourly_schedule = self._build_hourly_schedule(prices)
        if not any(hourly_schedule):
            logger.warning(f"场站 {station_name} 未能生成有效的 24 小时价格表")

        async with await self.db.get_session() as session:
            # 1. Profile row.
            await self._save_profile(session, station_hash, station_name, address, now)
            # 2. Price schedule row.
            await self.station_price_schedule_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                schedule_json=hourly_schedule,
                valid_start_time=now
            )
            # 3. Status snapshot, including the price of the current hour.
            current_price_info = hourly_schedule[now.hour] or {}
            await self.station_status_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                total_piles=total_piles,
                free_piles=free_piles,
                piles_detail_json=piles_detail,
                parking_info=parking_info,
                distance=distance,
                current_price=current_price_info.get('price'),
                pro_price=current_price_info.get('plus_price'),
                market_price=current_price_info.get('market_price'),
                valid_start_time=now
            )
            await session.commit()

        filled = sum(1 for slot in hourly_schedule if slot)
        logger.info(f"成功保存场站数据: {station_name} (桩数: {total_piles}, 空闲: {free_piles}, 距离: {distance}, 价格计划条数: {filled})")
        # Log a compact 24-hour overview to ease debugging.
        summary = [
            f"{i:02d}h:{slot.get('price')}"
            for i, slot in enumerate(hourly_schedule)
            if slot
        ]
        logger.info(f"24小时价格概览: {' | '.join(summary)}")
        return True

    async def process_price_detail_data(self, station_name, hourly_schedule, total_piles=None, free_piles=None, piles_detail=None, parking_info=None, distance=None):
        """Save a price schedule and a status snapshot for a station.

        When either flatten flag (``PRICE_FLATTEN_TO_24H_GLOBAL`` or
        ``PRICE_FLATTEN_TO_24H``) is set and ``hourly_schedule`` is a list,
        it is first flattened to 24 hourly slots with the same rules used by
        :meth:`save_station_data`; if flattening yields nothing, the raw
        value is stored unchanged.

        Returns False when ``station_name`` or ``hourly_schedule`` is empty,
        True after a successful commit.
        """
        if not station_name or not hourly_schedule:
            return False
        now = datetime.now()
        # NOTE(review): hashed without an address, unlike the other save
        # methods — confirm this matches the profile's hash for callers.
        station_hash = self.get_hash(station_name, None)

        schedule_to_save = hourly_schedule
        use_flatten = (PRICE_FLATTEN_TO_24H_GLOBAL or PRICE_FLATTEN_TO_24H) and isinstance(hourly_schedule, list)
        if use_flatten:
            flattened = self._build_hourly_schedule(hourly_schedule)
            if any(flattened):
                schedule_to_save = flattened

        # Pick the price info for the current hour (list) or the schedule
        # itself (dict); anything else yields no price.
        current_price_info = {}
        if isinstance(schedule_to_save, list):
            if 0 <= now.hour < len(schedule_to_save):
                current_price_info = schedule_to_save[now.hour] or {}
        elif isinstance(schedule_to_save, dict):
            current_price_info = schedule_to_save
        if not isinstance(current_price_info, dict):
            # Raw, un-flattened list items are not guaranteed to be dicts.
            current_price_info = {}

        async with await self.db.get_session() as session:
            await self.station_price_schedule_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                schedule_json=schedule_to_save,
                valid_start_time=now,
            )
            await self.station_status_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                total_piles=total_piles,
                free_piles=free_piles,
                piles_detail_json=piles_detail,
                parking_info=parking_info,
                distance=distance,
                current_price=current_price_info.get("price"),
                pro_price=current_price_info.get("plus_price"),
                market_price=current_price_info.get("market_price"),
                valid_start_time=now,
            )
            await session.commit()
        return True

    async def save_station_profile_and_status(self, station_name, address, total_piles=None, free_piles=None, piles_detail=None, parking_info=None, distance=None):
        """Save a station profile and a price-less status snapshot.

        Returns False without touching the DB when ``station_name`` is
        empty, True after a successful commit.
        """
        if not station_name:
            return False
        now = datetime.now()
        station_hash = self.get_hash(station_name, address)
        async with await self.db.get_session() as session:
            await self._save_profile(session, station_hash, station_name, address, now)
            await self.station_status_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                total_piles=total_piles,
                free_piles=free_piles,
                piles_detail_json=piles_detail,
                parking_info=parking_info,
                distance=distance,
                current_price=None,
                valid_start_time=now
            )
            await session.commit()
        return True
async def _test_save_profile():
    """Manual smoke test: save one sample station profile and print the result.

    The database is initialised first and is always closed afterwards, even
    if the save call raises.
    """
    sample_args = (
        "测试场站-AI调试",
        "测试地址-杭州滨江",
        "1.2km",
    )
    svc = TeLaiDianService()
    await svc.init_db()
    try:
        outcome = await svc.save_station_profile_only(*sample_args)
        print("save_station_profile_only_return:", outcome)
    finally:
        await svc.close_db()
if __name__ == "__main__":
    # Script entry point: run the manual smoke test on a fresh event loop.
    smoke_test = _test_save_profile()
    asyncio.run(smoke_test)