Files
aiData/Apps/AiTeJiYiChong/Service.py
HuangHai 219cd5c220 'commit'
2026-01-18 20:11:51 +08:00

302 lines
11 KiB
Python

# coding=utf-8
import hashlib
import logging
import os
import sys
import uuid
from datetime import datetime
# Put the directory three levels above this file on sys.path so that the
# absolute imports that follow (Apps, DbKit, Config, Model) can be resolved.
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
    sys.path.append(project_root)
from Apps.AiTeJiYiChong.ReadImageKit import ReadImageKit
from DbKit.Db import Db
from Config.Config import DB_URL, PRICE_FLATTEN_TO_24H_GLOBAL
from Model.StationProfile import StationProfile
from Model.StationStatus import StationStatus
from Model.StationPriceSchedule import StationPriceSchedule
from Apps.AiTeJiYiChong.Config.Setting import PRICE_FLATTEN_TO_24H
from Apps.AiTeJiYiChong.Kit import setup_logger
# Module-level logger created by the app's setup_logger helper; the service
# methods below use it for their completion messages.
logger = setup_logger("AiTeJiYiChongService")
class AiTeJiYiChongService:
    """Persist station data parsed from AiTeJiYiChong app screenshots.

    The service takes the structures returned by ``ReadImageKit`` and saves
    them through the ``StationProfile``, ``StationStatus`` and
    ``StationPriceSchedule`` models. Every station is keyed by the MD5 hash
    of its name (see :meth:`get_hash`).
    """

    def __init__(self):
        self.db = Db(db_url=DB_URL)
        self.station_profile_model = StationProfile()
        self.station_status_model = StationStatus()
        self.station_price_schedule_model = StationPriceSchedule()
        # Operator name written on every profile row saved by this service.
        self.operator = "艾特吉易充"

    def generate_id(self):
        """Return a new random UUID4 as a string, used as a row id."""
        return str(uuid.uuid4())

    def get_hash(self, s: str) -> str:
        """Return the hexadecimal MD5 digest of ``s`` encoded as UTF-8."""
        return hashlib.md5(s.encode('utf-8')).hexdigest()

    def _to_float(self, value) -> float:
        """Extract the first number contained in ``value`` as a float.

        ``None`` and values without any digits give ``0.0``. Ints and floats
        are converted directly. Anything else is stringified and the first
        (optionally signed) decimal number found in it is parsed, which
        drops surrounding non-numeric characters such as currency units.
        """
        if value is None:
            return 0.0
        if isinstance(value, (int, float)):
            return float(value)
        import re
        # First numeric run, with optional sign and decimal point.
        match = re.search(r"[-+]?\d*\.?\d+", str(value))
        if match is None:
            return 0.0
        try:
            return float(match.group())
        except (TypeError, ValueError):
            return 0.0

    async def init_db(self):
        """Initialise the underlying database object."""
        await self.db.init_db()

    async def close_db(self):
        """Close the underlying database object."""
        await self.db.close()

    def _prepare_schedule(self, schedule):
        """Return the schedule to store, flattened when flattening is enabled.

        When either flatten flag is set and ``schedule`` is a list, it is
        passed through ``ReadImageKit.expand_schedule_to_24h``; a non-empty
        result replaces the input. Otherwise ``schedule`` is returned as is.
        """
        flatten_enabled = PRICE_FLATTEN_TO_24H_GLOBAL or PRICE_FLATTEN_TO_24H
        if flatten_enabled and isinstance(schedule, list):
            rows = ReadImageKit.expand_schedule_to_24h(schedule)
            if rows:
                return rows
        return schedule

    async def _save_price_schedule(self, station_name, schedule):
        """Save one price-schedule row for ``station_name``; return what was stored."""
        station_hash = self.get_hash(station_name)
        now = datetime.now()
        schedule_to_save = self._prepare_schedule(schedule)
        async with await self.db.get_session() as session:
            await self.station_price_schedule_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                schedule_json=schedule_to_save,
                valid_start_time=now,
            )
            await session.commit()
        return schedule_to_save

    async def process_price_detail_data(self, station_name, hourly_schedule) -> bool:
        """Save an already-parsed price schedule for a station.

        Returns ``False`` without touching the database when either argument
        is empty, ``True`` once the schedule row has been committed.
        """
        if not station_name or not hourly_schedule:
            return False
        await self._save_price_schedule(station_name, hourly_schedule)
        return True

    async def process_price_detail(self, image_path, station_name) -> list:
        """Process a screenshot of the third-level price detail page.

        The prices are read from the image with
        ``ReadImageKit.get_price_detail_from_image`` and saved as a price
        schedule row. Returns the schedule that was stored, or ``None`` when
        ``station_name`` is empty or no prices could be read.
        """
        # Without a name there is nothing to key the row on; bail out before
        # spending an image-recognition call (same guard as the sibling
        # process_price_detail_data).
        if not station_name:
            return None
        prices = await ReadImageKit.get_price_detail_from_image(image_path)
        if not prices:
            return None
        schedule_to_save = await self._save_price_schedule(station_name, prices)
        logger.info(f"三级页面价格详情处理完成: {station_name}, 共 {len(schedule_to_save)} 条记录")
        return schedule_to_save

    @staticmethod
    def _summarize_piles(piles_data, remark):
        """Aggregate a raw ``piles`` list into totals and standardized rows.

        Returns ``(total, free, rows)``. Entries that are not dict-like or
        whose ``total``/``free`` cannot be converted to ``int`` are skipped.
        ``pile_no`` is derived from the entry's position in the raw list, so
        skipped entries leave gaps in the numbering. Anything other than a
        list yields ``(0, 0, [])``.
        """
        total, free, rows = 0, 0, []
        if not isinstance(piles_data, list):
            return total, free, rows
        for idx, pile in enumerate(piles_data):
            try:
                pile_total = int(pile.get("total", 0))
                pile_free = int(pile.get("free", 0))
                pile_type = pile.get("type", "未知")
            except (AttributeError, TypeError, ValueError, OverflowError):
                # Malformed entry: ignore it rather than failing the station.
                continue
            total += pile_total
            free += pile_free
            rows.append({
                "pile_no": f"G{idx+1}",
                "type": pile_type,
                "power": "",
                "status_text": f"空闲{pile_free}/总{pile_total}",
                "remark": remark,
            })
        return total, free, rows

    async def record_station_status(self, station_info: dict):
        """Record station status captured from the list page (OCR + LLM).

        Saves a profile row and a status row (pile busy/free counts, price,
        parking, distance) in one session. Does nothing when
        ``station_info`` is not a non-empty dict or carries no station name.
        """
        if not station_info or not isinstance(station_info, dict):
            return
        name = station_info.get("station_name") or station_info.get("name")
        if not name:
            return
        station_hash = self.get_hash(name)
        now = datetime.now()
        async with await self.db.get_session() as session:
            await self.station_profile_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                operator=self.operator,
                station_name=name,
                valid_start_time=now
            )
            status_id = self.generate_id()
            total, free, standardized_piles = self._summarize_piles(
                station_info.get("piles"), "列表页OCR"
            )
            await self.station_status_model.save(
                session=session,
                id=status_id,
                station_hash=station_hash,
                total_piles=total,
                free_piles=free,
                piles_detail_json=standardized_piles if standardized_piles else None,
                current_price=self._to_float(station_info.get("price")),
                parking_info=station_info.get("parking"),
                distance=station_info.get("distance") or station_info.get("distance_text"),
                valid_start_time=now
            )
            await session.commit()

    async def process_station_detail(self, image_path, station_name=None) -> dict:
        """Process a screenshot of the station detail page.

        The detail is read with ``ReadImageKit.get_station_detail_from_image``
        and its address is saved on a profile row. ``station_name`` takes
        precedence over the name found in the image. Returns the parsed
        detail, or ``None`` when nothing was parsed or no name is available.
        """
        detail = await ReadImageKit.get_station_detail_from_image(image_path)
        if not detail:
            return None
        name = station_name or detail.get("station_name")
        if not name:
            return None
        station_hash = self.get_hash(name)
        now = datetime.now()
        async with await self.db.get_session() as session:
            # 1. Save a profile row carrying the address information.
            await self.station_profile_model.save(
                session=session,
                id=self.generate_id(),
                station_hash=station_hash,
                operator=self.operator,
                station_name=name,
                address=detail.get("address"),
                valid_start_time=now
            )
            await session.commit()
        logger.info(f"场站详情处理完成: {name}")
        return detail

    @staticmethod
    def _vl_image_path(image_path):
        """Return the ``_vl.jpg`` variant of a ``.jpg`` path.

        Only the trailing ``.jpg`` suffix is rewritten, so a ``.jpg``
        occurring inside a directory name is left alone. Paths that do not
        end in ``.jpg`` are returned unchanged.
        """
        suffix = ".jpg"
        if image_path.endswith(suffix):
            return image_path[:-len(suffix)] + "_vl.jpg"
        return image_path

    async def process_station_list_vl(self, image_path, json_metadata, device_info=None, max_count=None) -> list:
        """Process a station list screenshot in VL mode (whole-page recognition).

        Each recognised station gets a profile row and a status row, all
        committed together at the end. The station dicts are annotated in
        place with ``station_hash``, ``profile_id``, ``valid_start_time`` and
        ``status_id`` and returned in a list. Once ``max_count`` entries have
        been collected, every further station contributes ``None`` instead
        and is not saved. Returns ``[]`` when nothing was recognised.
        """
        # Prefer the "_vl.jpg" image (the one with green boxes, per the
        # original comment) when it exists on disk.
        vl_img_path = self._vl_image_path(image_path)
        img_to_process = vl_img_path if os.path.exists(vl_img_path) else image_path
        station_list = await ReadImageKit.parse_vl_image(img_to_process, json_metadata, device_info=device_info)
        if not station_list:
            return []
        processed_stations = []
        async with await self.db.get_session() as session:
            for station in station_list:
                # Limit reached: stop saving, but keep appending placeholders
                # so positions stay aligned (intent stated by the original).
                # NOTE(review): entries without a name are skipped below
                # without a placeholder, so alignment is only approximate.
                if max_count is not None and len(processed_stations) >= max_count:
                    processed_stations.append(None)
                    continue
                # Skip malformed (non-dict) and nameless entries instead of
                # letting one bad item abort the whole batch.
                name = station.get("station_name") if isinstance(station, dict) else None
                if not name:
                    continue
                station_hash = self.get_hash(name)
                now = datetime.now()
                station["station_hash"] = station_hash
                # 1. Save the profile (original note: relies on the
                #    db_retry-decorated method, if the Db class has it).
                profile_id = self.generate_id()
                await self.station_profile_model.save(
                    session=session,
                    id=profile_id,
                    station_hash=station_hash,
                    operator=self.operator,
                    station_name=name,
                    valid_start_time=now
                )
                station["profile_id"] = profile_id
                station["valid_start_time"] = now.isoformat()
                # 2. Save the status (parsed price and charging piles).
                status_id = self.generate_id()
                total, free, standardized_piles = self._summarize_piles(
                    station.get("piles"), "列表页VL"
                )
                await self.station_status_model.save(
                    session=session,
                    id=status_id,
                    station_hash=station_hash,
                    total_piles=total,
                    free_piles=free,
                    piles_detail_json=standardized_piles if standardized_piles else None,
                    current_price=self._to_float(station.get("price")),
                    parking_info=station.get("parking", ""),
                    distance=station.get("distance", ""),
                    valid_start_time=now
                )
                station["status_id"] = status_id
                processed_stations.append(station)
            await session.commit()
        return processed_stations