Files
aiData/Model/DegreeModel.py

96 lines
3.2 KiB
Python
Raw Permalink Normal View History

2026-01-20 08:09:13 +08:00
import logging

from DbKit.Db import Db
class DegreeModel:
    """Query helper for the ``Degree.*`` named SQL statements.

    Every query method first awaits ``self.db.init_db()`` and then delegates
    to ``self.db.find(sql_identifier[, params])``, returning whatever
    ``find`` returns unchanged.
    """

    # Shared logger; the SQL dumps below are emitted at DEBUG level only.
    _logger = logging.getLogger(__name__)

    def __init__(self, db: "Db | None" = None):
        """Store the database handle, creating a default ``Db()`` if none is given.

        Args:
            db: An existing database object to reuse. Only ``None`` triggers
                creation of a new ``Db``; an explicitly supplied object is
                always kept, even if it evaluates as falsy.
        """
        self.db = db if db is not None else Db()

    @staticmethod
    def _optional_params(**candidates):
        """Return only the truthy keyword arguments, in the order given."""
        return {name: value for name, value in candidates.items() if value}

    async def _log_sql(self, sql_identifier, params):
        """Log the SQL template and its rendered form at DEBUG level.

        The two lookups exist purely for diagnostics, so they are skipped
        entirely when DEBUG logging is disabled.
        """
        if not self._logger.isEnabledFor(logging.DEBUG):
            return
        sql_template = await self.db.sql_loader.get_sql(sql_identifier)
        self._logger.debug("SQL template (%s):\n%s", sql_identifier, sql_template)
        processed_sql = await self.db.get_sql(sql_identifier, params)
        self._logger.debug(
            "Processed SQL with params (%s):\n%s", sql_identifier, processed_sql
        )

    async def get_degree(self, station_names=None, start_time=None, end_time=None):
        """Run the ``Degree.getDegree`` statement.

        Args:
            station_names: Optional station filter; omitted from the query
                parameters when falsy.
            start_time: Optional lower time bound; omitted when falsy.
            end_time: Optional upper time bound; omitted when falsy.

        Returns:
            The result of ``self.db.find`` for the statement.
        """
        await self.db.init_db()
        params = self._optional_params(
            station_names=station_names,
            start_time=start_time,
            end_time=end_time,
        )
        sql_identifier = "Degree.getDegree"
        await self._log_sql(sql_identifier, params)
        return await self.db.find(sql_identifier, params)

    async def getStationList(self):
        """Run the ``Degree.getStationList`` statement (takes no parameters)."""
        await self.db.init_db()
        sql_identifier = "Degree.getStationList"
        return await self.db.find(sql_identifier)

    async def get_trend_change(self, end_time=None):
        """Query the charging-volume change over the most recent three months.

        Args:
            end_time: End of the window as a ``'%Y-%m-%d %H:%M:%S'`` string.
                When falsy, the current local time is used.

        Returns:
            The result of ``self.db.find`` for ``Degree.getTrendChange``.

        Raises:
            ValueError: If ``end_time`` does not match the expected format.
        """
        await self.db.init_db()
        # Imported here, as in the original code, so that loading this module
        # does not itself require dateutil.
        from datetime import datetime
        from dateutil.relativedelta import relativedelta

        if end_time:
            end_dt = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S')
        else:
            end_dt = datetime.now()

        # The three calendar months covered: M-2 (month1), M-1 (month2), M (month3).
        m2_dt = end_dt - relativedelta(months=1)
        m1_dt = end_dt - relativedelta(months=2)

        # The window opens at midnight on the first day of the earliest month.
        window_start = m1_dt.replace(day=1, hour=0, minute=0, second=0)

        params = {
            "start_time": window_start.strftime('%Y-%m-%d %H:%M:%S'),
            "end_time": end_dt.strftime('%Y-%m-%d %H:%M:%S'),
            "month1": m1_dt.strftime('%Y-%m'),
            "month2": m2_dt.strftime('%Y-%m'),
            "month3": end_dt.strftime('%Y-%m'),
        }
        sql_identifier = "Degree.getTrendChange"
        return await self.db.find(sql_identifier, params)

    async def get_company_top10(self, start_time=None, end_time=None):
        """Get the top 10 companies by charging volume, largest first, for a period.

        Args:
            start_time: Optional lower time bound; omitted from the query
                parameters when falsy.
            end_time: Optional upper time bound; omitted when falsy.

        Returns:
            The result of ``self.db.find`` for ``Degree.getCompanyTop10``.
        """
        await self.db.init_db()
        params = self._optional_params(start_time=start_time, end_time=end_time)
        sql_identifier = "Degree.getCompanyTop10"
        await self._log_sql(sql_identifier, params)
        return await self.db.find(sql_identifier, params)

    async def close(self):
        """Shut down the underlying database object via ``self.db.shutdown()``."""
        await self.db.shutdown()