from DbKit.Db import Db class DegreeModel: def __init__(self, db: Db = None): self.db = db or Db() async def get_degree(self, station_names=None, start_time=None, end_time=None): await self.db.init_db() params = {} if station_names: params["station_names"] = station_names if start_time: params["start_time"] = start_time if end_time: params["end_time"] = end_time # 打印 SQL 模板内容 sql_identifier = "Degree.getDegree" sql_template = await self.db.sql_loader.get_sql(sql_identifier) print(f"\n--- SQL Template ({sql_identifier}) ---") print(sql_template) # 打印填充参数后的 SQL processed_sql = await self.db.get_sql(sql_identifier, params) print(f"\n--- Processed SQL with Params ---") print(processed_sql) print("-" * 40) return await self.db.find(sql_identifier, params) async def getStationList(self): await self.db.init_db() sql_identifier = "Degree.getStationList" return await self.db.find(sql_identifier) async def get_trend_change(self, end_time=None): """查询近3个月的充电量变化""" await self.db.init_db() from datetime import datetime from dateutil.relativedelta import relativedelta if end_time: end_dt = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') else: end_dt = datetime.now() # 计算过去三个月的月份字符串 (M, M-1, M-2) m3_dt = end_dt m2_dt = end_dt - relativedelta(months=1) m1_dt = end_dt - relativedelta(months=2) month3 = m3_dt.strftime('%Y-%m') month2 = m2_dt.strftime('%Y-%m') month1 = m1_dt.strftime('%Y-%m') # 统计范围从 m1 月的第一天开始 start_time = m1_dt.replace(day=1, hour=0, minute=0, second=0).strftime('%Y-%m-%d %H:%M:%S') end_time_str = end_dt.strftime('%Y-%m-%d %H:%M:%S') params = { "start_time": start_time, "end_time": end_time_str, "month1": month1, "month2": month2, "month3": month3 } sql_identifier = "Degree.getTrendChange" return await self.db.find(sql_identifier, params) async def get_company_top10(self, start_time=None, end_time=None): """获取指定时间内,按企业充电量由大到小TOP 10""" await self.db.init_db() params = {} if start_time: params["start_time"] = start_time if end_time: params["end_time"] = end_time sql_identifier = "Degree.getCompanyTop10" # 打印调试信息 sql_template = await self.db.sql_loader.get_sql(sql_identifier) print(f"\n--- SQL Template ({sql_identifier}) ---") print(sql_template) processed_sql = await self.db.get_sql(sql_identifier, params) print(f"\n--- Processed SQL with Params ---") print(processed_sql) print("-" * 40) return await self.db.find(sql_identifier, params) async def close(self): await self.db.shutdown()