Files
aiData/YLT_Analytics_Api.py
HuangHai a8ad244ecf 'commit'
2026-01-18 15:43:02 +08:00

223 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import math
import asyncio
from typing import List, Optional
import os
import sys
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.append(project_root)
from DbKit.Db import Db
from Config.Config import DB_URL
from Util.LlmUtil import get_llm_response
class StationBase(BaseModel):
station_hash: str
operator: str
station_name: str
address: Optional[str]
coord_x: Optional[float]
coord_y: Optional[float]
current_price: Optional[float]
class CompetitorStation(BaseModel):
station_hash: str
operator: str
station_name: str
distance_km: float
current_price: Optional[float]
class GeoCompetitionResponse(BaseModel):
base_station: StationBase
competitors: List[CompetitorStation]
ylt_price: Optional[float]
min_competitor_price: Optional[float]
max_competitor_price: Optional[float]
cheaper_count: int
same_count: int
more_expensive_count: int
class GeoCompetitionSummary(BaseModel):
summary: str
app = FastAPI(title="YLT Analytics API")
db = Db(db_url=DB_URL)
async def init_db():
await db.init_db()
async def close_db():
await db.close()
@app.on_event("startup")
async def on_startup():
await init_db()
@app.on_event("shutdown")
async def on_shutdown():
await close_db()
def haversine_km(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
r = 6371.0
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
d_phi = math.radians(lat2 - lat1)
d_lambda = math.radians(lon2 - lon1)
a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return r * c
async def fetch_current_stations() -> List[dict]:
sql = """
SELECT
p.station_hash,
p.operator,
p.station_name,
p.address,
p.coord_x,
p.coord_y,
s.current_price
FROM t_station_profile_scd p
LEFT JOIN t_station_status_scd s
ON p.station_hash = s.station_hash AND s.is_current = 1
WHERE p.is_current = 1
"""
return await db.find(sql, {})
async def build_geo_competition(station_hash: str, radius_km: float = 3.0) -> GeoCompetitionResponse:
rows = await fetch_current_stations()
if not rows:
raise HTTPException(status_code=404, detail="no station data")
base_row = None
for r in rows:
if r.get("station_hash") == station_hash and r.get("operator") == "驿来特":
base_row = r
break
if base_row is None:
raise HTTPException(status_code=404, detail="base station not found for 驿来特")
base_lon = base_row.get("coord_x")
base_lat = base_row.get("coord_y")
if base_lon is None or base_lat is None:
raise HTTPException(status_code=400, detail="base station has no coordinates")
competitors: List[CompetitorStation] = []
ylt_price = base_row.get("current_price")
cheaper = 0
same = 0
more_expensive = 0
min_price = None
max_price = None
for r in rows:
if r.get("operator") == "驿来特":
continue
lon = r.get("coord_x")
lat = r.get("coord_y")
if lon is None or lat is None:
continue
dist = haversine_km(base_lon, base_lat, lon, lat)
if dist > radius_km:
continue
price = r.get("current_price")
competitors.append(
CompetitorStation(
station_hash=r.get("station_hash"),
operator=r.get("operator"),
station_name=r.get("station_name"),
distance_km=round(dist, 3),
current_price=price,
)
)
if price is not None:
if min_price is None or price < min_price:
min_price = price
if max_price is None or price > max_price:
max_price = price
if ylt_price is not None:
if price < ylt_price:
cheaper += 1
elif price > ylt_price:
more_expensive += 1
else:
same += 1
base_station = StationBase(
station_hash=base_row.get("station_hash"),
operator=base_row.get("operator"),
station_name=base_row.get("station_name"),
address=base_row.get("address"),
coord_x=base_lon,
coord_y=base_lat,
current_price=ylt_price,
)
return GeoCompetitionResponse(
base_station=base_station,
competitors=competitors,
ylt_price=ylt_price,
min_competitor_price=min_price,
max_competitor_price=max_price,
cheaper_count=cheaper,
same_count=same,
more_expensive_count=more_expensive,
)
@app.get("/health")
async def health():
return {"status": "ok"}
@app.get("/api/ylt/geo/competitors/{station_hash}", response_model=GeoCompetitionResponse)
async def get_geo_competitors(station_hash: str):
return await build_geo_competition(station_hash)
@app.get("/api/ylt/geo/competitors/{station_hash}/summary", response_model=GeoCompetitionSummary)
async def get_geo_competitors_summary(station_hash: str):
data = await build_geo_competition(station_hash)
base = data.base_station
total_comp = len(data.competitors)
cheaper = data.cheaper_count
same = data.same_count
more_expensive = data.more_expensive_count
ylt_price = data.ylt_price
min_price = data.min_competitor_price
max_price = data.max_competitor_price
summary_input = {
"station_name": base.station_name,
"operator": base.operator,
"ylt_price": ylt_price,
"competitor_count": total_comp,
"cheaper_count": cheaper,
"same_count": same,
"more_expensive_count": more_expensive,
"min_competitor_price": min_price,
"max_competitor_price": max_price,
}
text = (
"请作为驿来特价格策略分析顾问用简明中文解释当前场站在3公里范围内的价格竞争情况"
"给出可操作的价格调整或产品策略建议控制在300字以内。以下是结构化数据\n"
f"{summary_input}"
)
chunks = []
async for chunk in get_llm_response(text, stream=False, system_prompt="你是驿来特电价和选址策略顾问。"):
chunks.append(chunk)
summary_text = "".join(chunks)
return GeoCompetitionSummary(summary=summary_text)