Files
aiData/Tools/T6_Export.py

545 lines
22 KiB
Python
Raw Permalink Normal View History

2026-01-15 21:52:14 +08:00
import asyncio
import json
import logging
import os
import re
import sys
import zipfile
from datetime import datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set
2026-01-12 07:49:18 +08:00
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
2026-01-15 10:06:08 +08:00
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
2026-01-12 07:49:18 +08:00
if project_root not in sys.path:
sys.path.append(project_root)
from Util import Win32Patch
Win32Patch.patch()
from Config.Config import DB_URL
2026-01-21 08:41:47 +08:00
from Model.YltAnalyticsModel import YltAnalyticsModel
2026-01-12 07:49:18 +08:00
try:
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
except Exception as e:
raise RuntimeError("缺少 openpyxl 依赖请先安装pip install openpyxl") from e
# --- 样式定义 ---
# 颜色
HEADER_FILL_COLOR = "4472C4" # 专业蓝 (表头背景)
HEADER_FONT_COLOR = "FFFFFF" # 白色 (表头文字)
LABEL_FILL_COLOR = "D9E1F2" # 浅蓝 (Key标签背景更柔和)
SECTION_FILL_COLOR = "DDEBF7" # 极浅蓝 (小节标题背景)
BORDER_COLOR = "BFBFBF" # 边框颜色
OUTER_BORDER_COLOR = "4472C4" # 外边框颜色
# 字体
FONT_NAME = "微软雅黑"
TITLE_FONT = Font(name=FONT_NAME, size=20, bold=True, color="4472C4") # 主标题加大
SUBTITLE_FONT = Font(name=FONT_NAME, size=12, bold=True, color="203764") # 小节标题加深色
HEADER_FONT = Font(name=FONT_NAME, size=11, bold=True, color=HEADER_FONT_COLOR)
LABEL_FONT = Font(name=FONT_NAME, size=10, bold=True, color="333333") # 标签文字深灰
DATA_FONT = Font(name=FONT_NAME, size=10)
# 边框
THIN_SIDE = Side(style="thin", color=BORDER_COLOR)
MEDIUM_SIDE = Side(style="medium", color=OUTER_BORDER_COLOR)
BORDER_ALL_THIN = Border(left=THIN_SIDE, right=THIN_SIDE, top=THIN_SIDE, bottom=THIN_SIDE)
# 填充
HEADER_FILL = PatternFill("solid", fgColor=HEADER_FILL_COLOR)
LABEL_FILL = PatternFill("solid", fgColor=LABEL_FILL_COLOR)
SECTION_FILL = PatternFill("solid", fgColor=SECTION_FILL_COLOR)
INVALID_SHEET_CHARS_RE = re.compile(r"[:\\/?*\[\]]")
def _safe_excel_sheet_title(raw_title: str, existing_titles: Set[str]) -> str:
title = (raw_title or "").strip()
if not title:
title = "未命名场站"
title = INVALID_SHEET_CHARS_RE.sub("_", title)
title = title.replace("\n", " ").replace("\r", " ").strip()
if len(title) > 31:
title = title[:31].rstrip()
if not title:
title = "未命名场站"
if title not in existing_titles:
existing_titles.add(title)
return title
base = title[:28].rstrip() if len(title) > 28 else title
idx = 2
while True:
candidate = f"{base}_{idx}"
if len(candidate) > 31:
candidate = candidate[:31]
if candidate not in existing_titles:
existing_titles.add(candidate)
return candidate
idx += 1
def _to_float(value):
if value is None:
return None
if isinstance(value, Decimal):
return float(value)
return value
def _parse_schedule(schedule_json: Any) -> List[Dict[str, Any]]:
if not schedule_json:
return []
if isinstance(schedule_json, (dict, list)):
return schedule_json if isinstance(schedule_json, list) else [schedule_json]
if isinstance(schedule_json, (bytes, bytearray)):
try:
schedule_json = schedule_json.decode("utf-8", errors="ignore")
except Exception:
schedule_json = str(schedule_json)
if not isinstance(schedule_json, str):
return []
try:
data = json.loads(schedule_json)
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
return []
except Exception:
return []
def _apply_table_header_style(ws, row_idx: int, col_start: int, col_end: int):
alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
# Header row with top/bottom medium border
border = Border(left=THIN_SIDE, right=THIN_SIDE, top=MEDIUM_SIDE, bottom=MEDIUM_SIDE)
for col in range(col_start, col_end + 1):
cell = ws.cell(row=row_idx, column=col)
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = alignment
cell.border = border
ws.row_dimensions[row_idx].height = 28 # Increased header height
def _apply_table_cell_style(ws, row_idx: int, col_start: int, col_end: int, center_cols: List[int] = None):
for col in range(col_start, col_end + 1):
cell = ws.cell(row=row_idx, column=col)
cell.font = DATA_FONT
is_center = center_cols and col in center_cols
cell.alignment = Alignment(horizontal="center" if is_center else "left", vertical="center", wrap_text=True)
cell.border = BORDER_ALL_THIN
ws.row_dimensions[row_idx].height = 22 # Comfortable row height
def _style_info_key_value(ws, row, col_key, col_val, merge_val_cols=0):
# Key Style
c_key = ws.cell(row=row, column=col_key)
c_key.font = LABEL_FONT
c_key.alignment = Alignment(horizontal="right", vertical="center")
c_key.fill = LABEL_FILL
c_key.border = BORDER_ALL_THIN
# Value Style
c_val = ws.cell(row=row, column=col_val)
c_val.font = DATA_FONT
c_val.alignment = Alignment(horizontal="left", vertical="center", wrap_text=True)
c_val.border = BORDER_ALL_THIN
if merge_val_cols > 0:
ws.merge_cells(start_row=row, start_column=col_val, end_row=row, end_column=col_val + merge_val_cols)
# Apply border to merged cells (required for openpyxl)
for c_offset in range(1, merge_val_cols + 1):
c_merged = ws.cell(row=row, column=col_val + c_offset)
c_merged.border = BORDER_ALL_THIN
def _set_column_widths(ws, widths: dict[int, float]):
for col_idx, width in widths.items():
ws.column_dimensions[get_column_letter(col_idx)].width = width
2026-01-19 13:56:47 +08:00
class DorisExcelExporter:
2026-01-21 08:41:47 +08:00
def __init__(self, db_url: str = None):
self.model = YltAnalyticsModel()
async def init(self):
pass # Model handles initialization if needed
async def close(self):
pass # Model handles closing if needed
2026-01-19 13:56:47 +08:00
async def fetch_current_station_rows(self, operator: str) -> List[Dict[str, Any]]:
2026-01-21 08:41:47 +08:00
return await self.model.fetch_current_station_rows(operator)
2026-01-19 13:56:47 +08:00
def extract_hourly_prices_from_schedule(schedule_json: Any) -> List[Optional[float]]:
items = _parse_schedule(schedule_json)
series: List[Optional[float]] = [None] * 24
if not items:
return series
for idx, item in enumerate(items):
if not isinstance(item, dict):
continue
price = _to_float(item.get("price_kwh") if "price_kwh" in item else item.get("price"))
if price is None:
continue
start = item.get("start") or item.get("begin") or item.get("start_time")
h = None
if isinstance(start, str) and ":" in start:
try:
h = int(start.split(":", 1)[0])
except Exception:
h = None
if h is None:
h = idx
if 0 <= h < 24:
series[h] = float(price)
return series
2026-01-12 07:49:18 +08:00
def _build_station_sheet(ws, row: Dict[str, Any]):
station_name = row.get("station_name") or ""
address = row.get("address") or ""
coord_x = row.get("coord_x")
coord_y = row.get("coord_y")
total_guns = row.get("total_guns")
free_guns = row.get("free_guns")
current_price = _to_float(row.get("current_price"))
pro_price = _to_float(row.get("pro_price"))
parking_info = row.get("parking_info")
distance = row.get("distance")
status_update_time = row.get("status_update_time")
schedule_update_time = row.get("schedule_update_time")
schedule_items = _parse_schedule(row.get("schedule_json"))
piles_detail = _parse_schedule(row.get("piles_detail_json")) # Reuse the same parser
for i in range(1, len(schedule_items)):
curr = schedule_items[i]
prev = schedule_items[i - 1]
if not isinstance(curr, dict) or not isinstance(prev, dict):
continue
for k in ("price_kwh", "electric_fee_kwh", "service_fee_kwh", "price", "electric_fee", "service_fee"):
if curr.get(k) is None and prev.get(k) is not None:
curr[k] = prev[k]
ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=6)
ws.row_dimensions[1].height = 40 # Increased title height
title_cell = ws.cell(row=1, column=1, value=station_name or "场站")
title_cell.font = TITLE_FONT
title_cell.alignment = Alignment(horizontal="center", vertical="center")
# Title border (Optional: bottom medium)
for c in range(1, 7):
ws.cell(row=1, column=c).border = Border(bottom=MEDIUM_SIDE)
ws.sheet_properties.tabColor = "4472C4" # Tab color matches theme
# Layout using _style_info_key_value
# Row 3: 位置 | Address
_style_info_key_value(ws, 3, 1, 2, merge_val_cols=4)
ws.cell(row=3, column=1, value="位置")
ws.cell(row=3, column=2, value=address)
# Row 4: 经纬度 | Coords | 距离 | Distance
coord_text = f"{coord_x}, {coord_y}" if coord_x is not None else ""
_style_info_key_value(ws, 4, 1, 2, merge_val_cols=1) # 经纬度 occupies col 2-3
ws.cell(row=4, column=1, value="经纬度")
ws.cell(row=4, column=2, value=coord_text)
_style_info_key_value(ws, 4, 4, 5, merge_val_cols=1) # 距离 occupies col 5-6
ws.cell(row=4, column=4, value="距离")
ws.cell(row=4, column=5, value=distance)
# Row 5: 空闲 | val | 总枪 | val | 停车 | val (Parking info needs more space, let's adjust)
# Original: Free(1,2), Total(3,4), Parking(5,6) - Parking info is long usually.
# New Layout: Free(1,2), Total(3,4), Parking(5,6) is fine if col 6 is wide.
_style_info_key_value(ws, 5, 1, 2)
ws.cell(row=5, column=1, value="空闲枪数")
ws.cell(row=5, column=2, value=free_guns)
_style_info_key_value(ws, 5, 3, 4)
ws.cell(row=5, column=3, value="总枪数")
ws.cell(row=5, column=4, value=total_guns)
_style_info_key_value(ws, 5, 5, 6)
ws.cell(row=5, column=5, value="停车信息")
ws.cell(row=5, column=6, value=parking_info)
# Row 6: Price | val | Pro Price | val | Update Time | val
_style_info_key_value(ws, 6, 1, 2)
ws.cell(row=6, column=1, value="当前价格")
ws.cell(row=6, column=2, value=current_price)
_style_info_key_value(ws, 6, 3, 4)
ws.cell(row=6, column=3, value="PRO会员价")
ws.cell(row=6, column=4, value=pro_price)
_style_info_key_value(ws, 6, 5, 6)
ws.cell(row=6, column=5, value="更新时间")
ws.cell(row=6, column=6, value=status_update_time)
# Adjust row heights for info section
for r in range(3, 7):
ws.row_dimensions[r].height = 24
# 价格表部分
schedule_start_row = 8
# Section Title with style
sect_cell = ws.cell(row=schedule_start_row, column=1, value="价格时段表")
sect_cell.font = SUBTITLE_FONT
sect_cell.fill = SECTION_FILL
sect_cell.border = Border(bottom=THIN_SIDE)
ws.merge_cells(start_row=schedule_start_row, start_column=1, end_row=schedule_start_row, end_column=6)
# Apply fill to merged cells
for c in range(2, 7):
ws.cell(row=schedule_start_row, column=c).fill = SECTION_FILL
ws.cell(row=schedule_start_row, column=c).border = Border(bottom=THIN_SIDE)
header_row = schedule_start_row + 1
headers = ["开始", "结束", "电价(元/kWh)", "电费(元/kWh)", "服务费(元/kWh)", "备注"]
for i, h in enumerate(headers, start=1):
ws.cell(row=header_row, column=i, value=h)
_apply_table_header_style(ws, header_row, 1, len(headers))
row_idx = header_row + 1
2026-01-15 10:14:44 +08:00
# 预处理 schedule_items如果只有 24 个且没有 start/end补充时间
processed_items = []
if len(schedule_items) == 24:
has_time = any(item.get("start") or item.get("begin") or item.get("start_time") for item in schedule_items if isinstance(item, dict))
if not has_time:
for i, item in enumerate(schedule_items):
if not isinstance(item, dict): continue
new_item = item.copy()
new_item["start"] = f"{i:02d}:00"
new_item["end"] = f"{i+1:02d}:00"
processed_items.append(new_item)
else:
processed_items = schedule_items
else:
processed_items = schedule_items
# 进一步优化:合并连续相同价格的时段
merged_items = []
if processed_items:
current_group = None
for item in processed_items:
if not isinstance(item, dict): continue
# 提取关键价格信息用于比较
price = _to_float(item.get("price_kwh") if "price_kwh" in item else item.get("price"))
elec = _to_float(item.get("electric_fee_kwh") if "electric_fee_kwh" in item else item.get("electric_fee") if "electric_fee" in item else item.get("elec_price"))
serv = _to_float(item.get("service_fee_kwh") if "service_fee_kwh" in item else item.get("service_fee") if "service_fee" in item else item.get("service_price"))
remark = item.get("type") or item.get("tag") or item.get("period") or ""
start = item.get("start") or item.get("begin") or item.get("start_time") or ""
end = item.get("end") or item.get("finish") or item.get("end_time") or ""
if current_group is None:
current_group = {
"start": start,
"end": end,
"price": price,
"elec": elec,
"serv": serv,
"remark": remark
}
else:
# 如果价格、电费、服务费、备注都相同,则合并
if (current_group["price"] == price and
current_group["elec"] == elec and
current_group["serv"] == serv and
current_group["remark"] == remark):
current_group["end"] = end
else:
merged_items.append(current_group)
current_group = {
"start": start,
"end": end,
"price": price,
"elec": elec,
"serv": serv,
"remark": remark
}
if current_group:
merged_items.append(current_group)
for item in merged_items:
ws.cell(row=row_idx, column=1, value=item["start"])
ws.cell(row=row_idx, column=2, value=item["end"])
ws.cell(row=row_idx, column=3, value=item["price"])
ws.cell(row=row_idx, column=4, value=item["elec"])
ws.cell(row=row_idx, column=5, value=item["serv"])
ws.cell(row=row_idx, column=6, value=item["remark"])
2026-01-12 07:49:18 +08:00
_apply_table_cell_style(ws, row_idx, 1, len(headers), center_cols=[1, 2, 3, 4, 5])
row_idx += 1
# 充电桩详情部分
piles_start_row = row_idx + 2
# Section Title with style
sect_cell = ws.cell(row=piles_start_row, column=1, value="充电桩详情")
sect_cell.font = SUBTITLE_FONT
sect_cell.fill = SECTION_FILL
sect_cell.border = Border(bottom=THIN_SIDE)
ws.merge_cells(start_row=piles_start_row, start_column=1, end_row=piles_start_row, end_column=6)
# Apply fill to merged cells
for c in range(2, 7):
ws.cell(row=piles_start_row, column=c).fill = SECTION_FILL
ws.cell(row=piles_start_row, column=c).border = Border(bottom=THIN_SIDE)
pile_header_row = piles_start_row + 1
pile_headers = ["编号", "功率", "类型", "状态", "价格(元/kWh)", "备注"]
for i, h in enumerate(pile_headers, start=1):
ws.cell(row=pile_header_row, column=i, value=h)
_apply_table_header_style(ws, pile_header_row, 1, len(pile_headers))
row_idx = pile_header_row + 1
if piles_detail:
for pile in piles_detail:
if not isinstance(pile, dict): continue
ws.cell(row=row_idx, column=1, value=pile.get("pile_no") or pile.get("id") or "")
ws.cell(row=row_idx, column=2, value=pile.get("power") or "")
ws.cell(row=row_idx, column=3, value=pile.get("type") or "")
ws.cell(row=row_idx, column=4, value=pile.get("status_text") or pile.get("status") or "")
ws.cell(row=row_idx, column=5, value=_to_float(pile.get("price")))
ws.cell(row=row_idx, column=6, value=pile.get("remark") or "")
_apply_table_cell_style(ws, row_idx, 1, len(pile_headers), center_cols=[1, 2, 3, 4, 5])
row_idx += 1
else:
ws.cell(row=row_idx, column=1, value="暂无详情数据")
ws.merge_cells(start_row=row_idx, start_column=1, end_row=row_idx, end_column=6)
_apply_table_cell_style(ws, row_idx, 1, len(pile_headers))
row_idx += 1
ws.freeze_panes = ws["A9"]
_set_column_widths(ws, {1: 20, 2: 20, 3: 16, 4: 16, 5: 16, 6: 25})
2026-01-15 10:06:08 +08:00
def _build_summary_sheet(ws, rows: List[Dict[str, Any]], operator: str):
2026-01-12 07:49:18 +08:00
ws.title = "场站总览"
ws.sheet_properties.tabColor = "4472C4" # Tab color matches theme
# Row 1: Main Title
2026-01-15 10:06:08 +08:00
title_cell = ws.cell(row=1, column=1, value=f"{operator}充电场站数据总览")
2026-01-12 07:49:18 +08:00
title_cell.font = TITLE_FONT
title_cell.alignment = Alignment(horizontal="center", vertical="center")
ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=10)
ws.row_dimensions[1].height = 40
# Row 2: Headers
headers = ["场站名称", "地址", "距离", "经度", "纬度", "总枪数", "空闲枪数", "停车信息", "当前价格(元/kWh)", "状态更新时间"]
for i, h in enumerate(headers, start=1):
ws.cell(row=2, column=i, value=h)
_apply_table_header_style(ws, 2, 1, len(headers))
# Enable AutoFilter
ws.auto_filter.ref = f"A2:{get_column_letter(len(headers))}{len(rows) + 2}"
row_idx = 3
for row in rows:
ws.cell(row=row_idx, column=1, value=row.get("station_name"))
ws.cell(row=row_idx, column=2, value=row.get("address"))
ws.cell(row=row_idx, column=3, value=row.get("distance"))
ws.cell(row=row_idx, column=4, value=row.get("coord_x"))
ws.cell(row=row_idx, column=5, value=row.get("coord_y"))
ws.cell(row=row_idx, column=6, value=row.get("total_guns"))
ws.cell(row=row_idx, column=7, value=row.get("free_guns"))
ws.cell(row=row_idx, column=8, value=row.get("parking_info"))
ws.cell(row=row_idx, column=9, value=_to_float(row.get("current_price")))
ws.cell(row=row_idx, column=10, value=row.get("status_update_time"))
_apply_table_cell_style(ws, row_idx, 1, len(headers), center_cols=[3, 4, 5, 6, 7, 9, 10])
row_idx += 1
_set_column_widths(ws, {1: 35, 2: 45, 3: 12, 4: 14, 5: 14, 6: 12, 7: 12, 8: 35, 9: 20, 10: 22})
ws.freeze_panes = ws["A3"]
2026-01-15 10:06:08 +08:00
async def export_excel(operator: str, output_path: str, *, limit: Optional[int] = None) -> str:
2026-01-12 07:49:18 +08:00
exporter = DorisExcelExporter(db_url=DB_URL)
await exporter.init()
try:
2026-01-15 10:06:08 +08:00
rows = await exporter.fetch_current_station_rows(operator)
2026-01-12 07:49:18 +08:00
finally:
await exporter.close()
if limit is not None:
rows = rows[: max(0, int(limit))]
wb = Workbook()
if not rows:
ws = wb.active
ws.title = "无数据"
2026-01-15 10:06:08 +08:00
ws.cell(row=1, column=1, value=f"未查询到 {operator} 的任何场站数据")
2026-01-12 07:49:18 +08:00
wb.save(output_path)
return output_path
# 1. 创建总览页
summary_ws = wb.active
2026-01-15 10:06:08 +08:00
_build_summary_sheet(summary_ws, rows, operator)
2026-01-12 07:49:18 +08:00
# 2. 为每个场站创建独立页
existing_titles: Set[str] = {"场站总览"}
for row in rows:
sheet_title = _safe_excel_sheet_title(str(row.get("station_name") or ""), existing_titles)
ws = wb.create_sheet(title=sheet_title)
_build_station_sheet(ws, row)
wb.save(output_path)
return output_path
async def main():
limit = None
2026-01-15 10:06:08 +08:00
operators = ["新电途", "特来电", "驿来特", "艾特吉易充"]
2026-01-15 21:52:14 +08:00
excel_paths: List[str] = []
2026-01-15 10:06:08 +08:00
2026-01-12 07:49:18 +08:00
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
2026-01-15 10:06:08 +08:00
export_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Excel", ts)
if not os.path.exists(export_dir):
os.makedirs(export_dir, exist_ok=True)
2026-01-12 07:49:18 +08:00
2026-01-15 10:06:08 +08:00
logger.info(f"开始批量导出 Excel...")
logger.info(f"输出目录: {export_dir}")
2026-01-12 07:49:18 +08:00
2026-01-15 10:06:08 +08:00
for operator in operators:
output_filename = f"{operator}_{ts}.xlsx"
output_path = os.path.join(export_dir, output_filename)
2026-01-15 21:52:14 +08:00
logger.info(f"--- 正在导出 {operator} ---")
2026-01-15 10:06:08 +08:00
if limit is not None:
logger.info(f"限制导出记录数: {limit}")
try:
final_path = await export_excel(operator, output_path, limit=limit)
logger.info(f"{operator} 导出完成: {final_path}")
2026-01-15 21:52:14 +08:00
excel_paths.append(final_path)
2026-01-15 10:06:08 +08:00
except Exception as e:
logger.error(f"{operator} 导出失败: {e}")
logger.info("所有导出任务已结束。")
2026-01-12 07:49:18 +08:00
if __name__ == "__main__":
asyncio.run(main())