302 lines
11 KiB
Python
302 lines
11 KiB
Python
# coding=utf-8
|
|
import hashlib
import logging
import os
import re
import sys
import uuid
from datetime import datetime
from typing import Optional
|
|
|
|
# Make the project root importable so the absolute "Apps." / "DbKit." / "Config." /
# "Model." imports below resolve even when it is not already on sys.path.
# The root is this file's absolute path with three trailing components stripped.
project_root = os.path.abspath(__file__)
for _ in range(3):
    project_root = os.path.dirname(project_root)
if project_root not in sys.path:
    sys.path.append(project_root)
|
|
|
|
from Apps.AiTeJiYiChong.ReadImageKit import ReadImageKit
|
|
from DbKit.Db import Db
|
|
from Config.Config import DB_URL, PRICE_FLATTEN_TO_24H_GLOBAL
|
|
from Model.StationProfile import StationProfile
|
|
from Model.StationStatus import StationStatus
|
|
from Model.StationPriceSchedule import StationPriceSchedule
|
|
from Apps.AiTeJiYiChong.Config.Setting import PRICE_FLATTEN_TO_24H
|
|
from Apps.AiTeJiYiChong.Kit import setup_logger
|
|
|
|
# Module logger used by AiTeJiYiChongService; built by Apps.AiTeJiYiChong.Kit.setup_logger.
logger = setup_logger("AiTeJiYiChongService")
|
|
|
|
class AiTeJiYiChongService:
    """Persist AiTeJiYiChong station data recognised from app screenshots.

    Public methods take screenshot paths or already-parsed data (via
    ``ReadImageKit``) and save rows through the ``StationProfile``,
    ``StationStatus`` and ``StationPriceSchedule`` models. Every row gets a
    fresh UUID id and a ``valid_start_time`` of ``datetime.now()``. Stations
    are keyed by the MD5 hash of their display name (see :meth:`get_hash`).
    """

    # First signed decimal number in free text, e.g. "约1.25元/度" -> "1.25".
    # Any match contains at least one digit, so float() on it cannot fail.
    _NUMBER_PATTERN = re.compile(r"[-+]?\d*\.?\d+")

    def __init__(self):
        self.db = Db(db_url=DB_URL)
        self.station_profile_model = StationProfile()
        self.station_status_model = StationStatus()
        self.station_price_schedule_model = StationPriceSchedule()
        # Operator name written into every StationProfile saved by this service.
        self.operator = "艾特吉易充"

    # ------------------------------------------------------------------ #
    # Small helpers
    # ------------------------------------------------------------------ #

    def generate_id(self):
        """Return a new random UUID4 string used as a row id."""
        return str(uuid.uuid4())

    def get_hash(self, s: str) -> str:
        """Return the hex MD5 digest of ``s`` (UTF-8 encoded); used as the station key."""
        return hashlib.md5(s.encode('utf-8')).hexdigest()

    def _to_float(self, value) -> float:
        """Best-effort conversion of an OCR/LLM value to ``float``.

        ``int``/``float`` values are converted directly. Anything else is
        stringified and the first signed decimal number in it is returned
        (``"约1.2元/度"`` -> 1.2). ``None``, text without digits, or a value
        whose ``str()`` fails yields 0.0.
        """
        if value is None:
            return 0.0
        if isinstance(value, (int, float)):
            return float(value)
        try:
            match = self._NUMBER_PATTERN.search(str(value))
        except Exception:
            # Best effort: one unprintable value must not abort a whole batch.
            return 0.0
        return float(match.group()) if match else 0.0

    @staticmethod
    def _summarize_piles(piles_data, remark: str):
        """Aggregate per-type pile counts recognised on the station list page.

        Args:
            piles_data: expected to be a list of dicts with ``total``, ``free``
                and optional ``type`` keys; any non-list value yields no piles.
            remark: text stored in each standardized entry's ``remark`` field.

        Returns:
            ``(total, free, standardized_piles)``. Counts are summed over the
            well-formed entries only. Entries that are not dicts, or whose
            counts are not int-convertible, are skipped. ``pile_no`` keeps the
            entry's original 1-based position (``G1``, ``G2``, ...).
        """
        total, free = 0, 0
        standardized_piles = []
        if not isinstance(piles_data, list):
            return total, free, standardized_piles
        for idx, pile in enumerate(piles_data, start=1):
            try:
                pile_total = int(pile.get("total", 0))
                pile_free = int(pile.get("free", 0))
            except (AttributeError, TypeError, ValueError, OverflowError):
                # Malformed OCR/LLM entry (not a dict, or non-numeric counts).
                continue
            total += pile_total
            free += pile_free
            standardized_piles.append({
                "pile_no": f"G{idx}",
                "type": pile.get("type", "未知"),
                "power": "",
                "status_text": f"空闲{pile_free}/总{pile_total}",
                "remark": remark,
            })
        return total, free, standardized_piles

    async def _save_profile(self, session, station_hash: str, station_name, now,
                            **extra_fields) -> str:
        """Save a StationProfile for this operator in ``session`` and return its new id.

        ``extra_fields`` (e.g. ``address``) are forwarded to the model's
        ``save``. The caller commits the session.
        """
        profile_id = self.generate_id()
        await self.station_profile_model.save(
            session=session,
            id=profile_id,
            station_hash=station_hash,
            operator=self.operator,
            station_name=station_name,
            valid_start_time=now,
            **extra_fields,
        )
        return profile_id

    async def _save_status(self, session, station_hash: str, now, *, piles_data,
                           price, parking_info, distance, remark: str) -> str:
        """Save a StationStatus in ``session`` and return its new id.

        Pile counts come from :meth:`_summarize_piles`, and ``price`` goes
        through :meth:`_to_float`. The caller commits the session.
        """
        total, free, standardized_piles = self._summarize_piles(piles_data, remark)
        status_id = self.generate_id()
        await self.station_status_model.save(
            session=session,
            id=status_id,
            station_hash=station_hash,
            total_piles=total,
            free_piles=free,
            piles_detail_json=standardized_piles or None,
            current_price=self._to_float(price),
            parking_info=parking_info,
            distance=distance,
            valid_start_time=now,
        )
        return status_id

    async def _save_price_schedule(self, station_name, schedule):
        """Save ``schedule`` as a StationPriceSchedule for ``station_name`` and commit.

        When 24h flattening is enabled (``PRICE_FLATTEN_TO_24H_GLOBAL`` or the
        app-level ``PRICE_FLATTEN_TO_24H``) and ``schedule`` is a list, it is
        first expanded with ``ReadImageKit.expand_schedule_to_24h``. An empty
        expansion falls back to the original schedule.

        Returns:
            The schedule that was actually saved.
        """
        station_hash = self.get_hash(station_name)
        now = datetime.now()

        schedule_to_save = schedule
        if (PRICE_FLATTEN_TO_24H_GLOBAL or PRICE_FLATTEN_TO_24H) and isinstance(schedule, list):
            rows = ReadImageKit.expand_schedule_to_24h(schedule)
            if rows:
                schedule_to_save = rows

        async with await self.db.get_session() as session:
            await self.station_price_schedule_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                schedule_json=schedule_to_save,
                valid_start_time=now,
            )
            await session.commit()
        return schedule_to_save

    # ------------------------------------------------------------------ #
    # Database lifecycle
    # ------------------------------------------------------------------ #

    async def init_db(self):
        """Delegate to ``self.db.init_db()``."""
        await self.db.init_db()

    async def close_db(self):
        """Delegate to ``self.db.close()``."""
        await self.db.close()

    # ------------------------------------------------------------------ #
    # Public processing entry points
    # ------------------------------------------------------------------ #

    async def process_price_detail_data(self, station_name, hourly_schedule) -> bool:
        """Save an already-parsed price schedule for ``station_name``.

        Returns:
            False (nothing saved) if either argument is empty/None, else True.
        """
        if not station_name or not hourly_schedule:
            return False
        await self._save_price_schedule(station_name, hourly_schedule)
        return True

    async def process_price_detail(self, image_path, station_name) -> Optional[list]:
        """Recognise a third-level price-detail screenshot and save its schedule.

        Returns:
            The saved schedule (24h-flattened when enabled), or None when
            ``station_name`` is empty or nothing was recognised.
        """
        if not station_name:
            # get_hash() needs a real name; don't spend an OCR call we can't save.
            logger.warning(f"价格详情缺少场站名称, 跳过: {image_path}")
            return None

        prices = await ReadImageKit.get_price_detail_from_image(image_path)
        if not prices:
            return None

        schedule_to_save = await self._save_price_schedule(station_name, prices)
        logger.info(f"三级页面价格详情处理完成: {station_name}, 共 {len(schedule_to_save)} 条记录")
        return schedule_to_save

    async def record_station_status(self, station_info: dict):
        """Save profile and status for one station recognised on the list page (OCR+LLM).

        ``station_info`` is read for ``station_name`` (or ``name``) and,
        optionally, ``piles``, ``price``, ``parking`` and ``distance`` /
        ``distance_text``. Empty input, non-dict input or input without a name
        is ignored.
        """
        if not station_info or not isinstance(station_info, dict):
            return

        name = station_info.get("station_name") or station_info.get("name")
        if not name:
            return

        station_hash = self.get_hash(name)
        now = datetime.now()

        async with await self.db.get_session() as session:
            await self._save_profile(session, station_hash, name, now)
            await self._save_status(
                session,
                station_hash,
                now,
                piles_data=station_info.get("piles"),
                price=station_info.get("price"),
                parking_info=station_info.get("parking"),
                distance=station_info.get("distance") or station_info.get("distance_text"),
                remark="列表页OCR",
            )
            await session.commit()

    async def process_station_detail(self, image_path, station_name=None) -> Optional[dict]:
        """Recognise a station-detail screenshot and save the station's address.

        ``station_name`` overrides the name recognised from the image.

        Returns:
            The recognised detail dict, or None if nothing was recognised or no
            station name is available.
        """
        detail = await ReadImageKit.get_station_detail_from_image(image_path)
        if not detail:
            return None

        name = station_name or detail.get("station_name")
        if not name:
            return None

        station_hash = self.get_hash(name)
        now = datetime.now()

        async with await self.db.get_session() as session:
            # Save a profile that carries the address read from the detail page.
            await self._save_profile(session, station_hash, name, now,
                                     address=detail.get("address"))
            await session.commit()

        logger.info(f"场站详情处理完成: {name}")
        return detail

    async def process_station_list_vl(self, image_path, json_metadata, device_info=None, max_count=None) -> list:
        """Recognise a whole station-list page in VL mode and save every station.

        Args:
            image_path: screenshot path. For a ``.jpg`` path, a sibling
                ``<stem>_vl.jpg`` (the copy annotated with green boxes) is used
                instead when it exists.
            json_metadata: passed through to ``ReadImageKit.parse_vl_image``.
            device_info: passed through to ``ReadImageKit.parse_vl_image``.
            max_count: if not None, save at most this many stations.

        Returns:
            One entry per item returned by the parser, in the same order, so
            indices stay aligned. A saved station is returned as its dict,
            enriched with ``station_hash``, ``profile_id``, ``valid_start_time``
            and ``status_id``. A skipped item (not a dict, no name, or beyond
            ``max_count``) is ``None``. Returns ``[]`` when nothing was
            recognised.
        """
        img_to_process = image_path
        if image_path.endswith(".jpg"):
            # Only rewrite the suffix; str.replace would also hit ".jpg" in directory names.
            vl_img_path = image_path[:-len(".jpg")] + "_vl.jpg"
            if os.path.exists(vl_img_path):
                img_to_process = vl_img_path

        station_list = await ReadImageKit.parse_vl_image(img_to_process, json_metadata, device_info=device_info)
        if not station_list:
            return []

        processed_stations = []
        saved_count = 0
        async with await self.db.get_session() as session:
            for station in station_list:
                limit_reached = max_count is not None and saved_count >= max_count
                name = station.get("station_name") if isinstance(station, dict) else None
                if limit_reached or not name:
                    # Placeholder keeps indices aligned with station_list; nothing is saved.
                    processed_stations.append(None)
                    continue

                station_hash = self.get_hash(name)
                now = datetime.now()
                station["station_hash"] = station_hash

                station["profile_id"] = await self._save_profile(session, station_hash, name, now)
                station["valid_start_time"] = now.isoformat()

                station["status_id"] = await self._save_status(
                    session,
                    station_hash,
                    now,
                    piles_data=station.get("piles"),
                    price=station.get("price"),
                    parking_info=station.get("parking", ""),
                    distance=station.get("distance", ""),
                    remark="列表页VL",
                )

                processed_stations.append(station)
                saved_count += 1

            await session.commit()

        return processed_stations
|