"""Export per-operator charging-station data to styled Excel workbooks.

For each operator the script fetches the current station rows through
``YltAnalyticsModel`` and writes one workbook containing a summary sheet plus
one detail sheet per station.
"""

import asyncio
import json
import logging
import os
import re
import sys
import zipfile
from datetime import datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Make the project root importable so the project-local packages below resolve
# when this file is run as a script.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.append(project_root)

from Util import Win32Patch

# The patch is applied before the remaining project imports, as in the
# original statement order.
Win32Patch.patch()

from Config.Config import DB_URL
from Model.YltAnalyticsModel import YltAnalyticsModel

try:
    from openpyxl import Workbook
    from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
    from openpyxl.utils import get_column_letter
except Exception as e:
    raise RuntimeError("缺少 openpyxl 依赖,请先安装:pip install openpyxl") from e

# --- Style definitions ---
# Colours
HEADER_FILL_COLOR = "4472C4"  # blue (table header background)
HEADER_FONT_COLOR = "FFFFFF"  # white (table header text)
LABEL_FILL_COLOR = "D9E1F2"  # light blue (key-label background)
SECTION_FILL_COLOR = "DDEBF7"  # very light blue (section title background)
BORDER_COLOR = "BFBFBF"  # inner border colour
OUTER_BORDER_COLOR = "4472C4"  # outer border colour

# Fonts
FONT_NAME = "微软雅黑"
TITLE_FONT = Font(name=FONT_NAME, size=20, bold=True, color="4472C4")  # main title
SUBTITLE_FONT = Font(name=FONT_NAME, size=12, bold=True, color="203764")  # section title
HEADER_FONT = Font(name=FONT_NAME, size=11, bold=True, color=HEADER_FONT_COLOR)
LABEL_FONT = Font(name=FONT_NAME, size=10, bold=True, color="333333")  # key labels
DATA_FONT = Font(name=FONT_NAME, size=10)

# Borders
THIN_SIDE = Side(style="thin", color=BORDER_COLOR)
MEDIUM_SIDE = Side(style="medium", color=OUTER_BORDER_COLOR)
BORDER_ALL_THIN = Border(left=THIN_SIDE, right=THIN_SIDE, top=THIN_SIDE, bottom=THIN_SIDE)

# Fills
HEADER_FILL = PatternFill("solid", fgColor=HEADER_FILL_COLOR)
LABEL_FILL = PatternFill("solid", fgColor=LABEL_FILL_COLOR)
SECTION_FILL = PatternFill("solid", fgColor=SECTION_FILL_COLOR)

INVALID_SHEET_CHARS_RE = re.compile(r"[:\\/?*\[\]]")

# Maximum sheet-title length enforced by this module.
_MAX_SHEET_TITLE_LEN = 31
# Number of columns used by every station detail sheet.
_STATION_SHEET_COLS = 6
# Price-related keys that are forward-filled from the previous schedule entry.
_SCHEDULE_FILL_KEYS = (
    "price_kwh",
    "electric_fee_kwh",
    "service_fee_kwh",
    "price",
    "electric_fee",
    "service_fee",
)


def _safe_excel_sheet_title(raw_title: str, existing_titles: Set[str]) -> str:
    """Return a unique sheet title derived from *raw_title*.

    Invalid characters are replaced with ``_``, line breaks with spaces, and
    the result is limited to 31 characters. An empty title becomes
    ``"未命名场站"``. If the title is already in *existing_titles*, a numeric
    suffix (``_2``, ``_3``, ...) is appended. The returned title is added to
    *existing_titles*.
    """
    title = (raw_title or "").strip()
    if not title:
        title = "未命名场站"
    title = INVALID_SHEET_CHARS_RE.sub("_", title)
    title = title.replace("\n", " ").replace("\r", " ").strip()
    if len(title) > _MAX_SHEET_TITLE_LEN:
        title = title[:_MAX_SHEET_TITLE_LEN].rstrip()
    if not title:
        title = "未命名场站"

    if title not in existing_titles:
        existing_titles.add(title)
        return title

    base = title[:28].rstrip() if len(title) > 28 else title
    idx = 2
    while True:
        suffix = f"_{idx}"
        # Shrink the base rather than truncating the finished candidate:
        # cutting the candidate would chop digits off the suffix once idx
        # reaches 100, so every candidate would collide and the loop would
        # never terminate.
        candidate = base[: _MAX_SHEET_TITLE_LEN - len(suffix)].rstrip() + suffix
        if candidate not in existing_titles:
            existing_titles.add(candidate)
            return candidate
        idx += 1


def _to_float(value):
    """Convert ``Decimal`` to ``float``; return any other value unchanged."""
    if isinstance(value, Decimal):
        return float(value)
    return value


def _parse_schedule(schedule_json: Any) -> List[Dict[str, Any]]:
    """Normalise a schedule payload into a list.

    Accepts an already-decoded list or dict, or JSON text as ``str``,
    ``bytes`` or ``bytearray``. A dict is wrapped in a single-element list.
    Falsy input, unsupported types, invalid JSON and JSON that decodes to
    neither a list nor a dict all yield ``[]``.
    """
    if not schedule_json:
        return []
    if isinstance(schedule_json, list):
        return schedule_json
    if isinstance(schedule_json, dict):
        return [schedule_json]
    if isinstance(schedule_json, (bytes, bytearray)):
        # errors="ignore" drops undecodable bytes instead of raising.
        schedule_json = schedule_json.decode("utf-8", errors="ignore")
    if not isinstance(schedule_json, str):
        return []

    try:
        data = json.loads(schedule_json)
    except (ValueError, RecursionError):
        return []
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return [data]
    return []


def _apply_table_header_style(ws, row_idx: int, col_start: int, col_end: int):
    """Style cells ``col_start..col_end`` of *row_idx* as a table header row."""
    alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    # Header row uses medium top/bottom borders and thin side borders.
    border = Border(left=THIN_SIDE, right=THIN_SIDE, top=MEDIUM_SIDE, bottom=MEDIUM_SIDE)
    for col in range(col_start, col_end + 1):
        cell = ws.cell(row=row_idx, column=col)
        cell.fill = HEADER_FILL
        cell.font = HEADER_FONT
        cell.alignment = alignment
        cell.border = border
    ws.row_dimensions[row_idx].height = 28


def _apply_table_cell_style(
    ws,
    row_idx: int,
    col_start: int,
    col_end: int,
    center_cols: Optional[List[int]] = None,
):
    """Style cells ``col_start..col_end`` of *row_idx* as table data cells.

    Columns listed in *center_cols* are centred horizontally; all others are
    left-aligned.
    """
    centered = Alignment(horizontal="center", vertical="center", wrap_text=True)
    left_aligned = Alignment(horizontal="left", vertical="center", wrap_text=True)
    for col in range(col_start, col_end + 1):
        cell = ws.cell(row=row_idx, column=col)
        cell.font = DATA_FONT
        cell.alignment = centered if (center_cols and col in center_cols) else left_aligned
        cell.border = BORDER_ALL_THIN
    ws.row_dimensions[row_idx].height = 22


def _style_info_key_value(ws, row, col_key, col_val, merge_val_cols=0):
    """Style one key/value pair in the info section.

    The key cell at *col_key* gets the label style; the value cell at
    *col_val* gets the data style. When *merge_val_cols* > 0 the value cell is
    merged with that many columns to its right.
    """
    key_cell = ws.cell(row=row, column=col_key)
    key_cell.font = LABEL_FONT
    key_cell.alignment = Alignment(horizontal="right", vertical="center")
    key_cell.fill = LABEL_FILL
    key_cell.border = BORDER_ALL_THIN

    val_cell = ws.cell(row=row, column=col_val)
    val_cell.font = DATA_FONT
    val_cell.alignment = Alignment(horizontal="left", vertical="center", wrap_text=True)
    val_cell.border = BORDER_ALL_THIN

    if merge_val_cols > 0:
        ws.merge_cells(
            start_row=row,
            start_column=col_val,
            end_row=row,
            end_column=col_val + merge_val_cols,
        )
        # Each cell covered by the merge gets the border individually.
        for offset in range(1, merge_val_cols + 1):
            ws.cell(row=row, column=col_val + offset).border = BORDER_ALL_THIN


def _set_column_widths(ws, widths: Dict[int, float]):
    """Set column widths from a ``{1-based column index: width}`` mapping."""
    for col_idx, width in widths.items():
        ws.column_dimensions[get_column_letter(col_idx)].width = width


class DorisExcelExporter:
    """Thin wrapper around ``YltAnalyticsModel`` used by the export routine."""

    def __init__(self, db_url: Optional[str] = None):
        # db_url is accepted for interface compatibility but is not used here.
        self.model = YltAnalyticsModel()

    async def init(self):
        """No-op; kept so callers can pair ``init()`` with ``close()``."""
        pass

    async def close(self):
        """No-op; kept so callers can pair ``init()`` with ``close()``."""
        pass

    async def fetch_current_station_rows(self, operator: str) -> List[Dict[str, Any]]:
        """Return the model's current station rows for *operator*."""
        return await self.model.fetch_current_station_rows(operator)


def _first_present(item: Dict[str, Any], *keys: str) -> Any:
    """Return the value of the first of *keys* that exists in *item*, else None.

    Only key presence is tested: a key that is present with a ``None`` value
    wins over later keys.
    """
    for key in keys:
        if key in item:
            return item[key]
    return None


def _schedule_item_start(item: Dict[str, Any]) -> Any:
    """Return the first truthy start-time field of a schedule entry."""
    return item.get("start") or item.get("begin") or item.get("start_time")


def extract_hourly_prices_from_schedule(schedule_json: Any) -> List[Optional[float]]:
    """Build a 24-slot hourly price series from a schedule payload.

    Each dict entry contributes its ``price_kwh`` (or ``price``) to the slot
    given by the hour part of its start time (``"HH:MM"``); entries without a
    parsable start time use their list position instead. Slots that receive no
    price stay ``None``. Entries that are not dicts, have no price, have a
    non-numeric price, or map outside ``0..23`` are skipped.
    """
    series: List[Optional[float]] = [None] * 24
    for idx, item in enumerate(_parse_schedule(schedule_json)):
        if not isinstance(item, dict):
            continue
        price = _to_float(_first_present(item, "price_kwh", "price"))
        if price is None:
            continue
        try:
            price_value = float(price)
        except (TypeError, ValueError):
            # A non-numeric price is treated like any other unusable entry.
            continue

        hour = None
        start = _schedule_item_start(item)
        if isinstance(start, str) and ":" in start:
            try:
                hour = int(start.split(":", 1)[0])
            except ValueError:
                hour = None
        if hour is None:
            hour = idx
        if 0 <= hour < 24:
            series[hour] = price_value
    return series


def _forward_fill_schedule(items: List[Any]) -> List[Any]:
    """Return a copy of *items* with missing price fields filled from the previous entry.

    Dict entries are shallow-copied first so the caller's data is not mutated.
    Filling cascades: a value filled into one entry can fill the next one.
    """
    filled = [dict(item) if isinstance(item, dict) else item for item in items]
    for prev, curr in zip(filled, filled[1:]):
        if not isinstance(curr, dict) or not isinstance(prev, dict):
            continue
        for key in _SCHEDULE_FILL_KEYS:
            if curr.get(key) is None and prev.get(key) is not None:
                curr[key] = prev[key]
    return filled


def _with_hourly_times(items: List[Any]) -> List[Any]:
    """Add ``start``/``end`` to a 24-entry schedule that carries no start times.

    Any other schedule is returned unchanged. The hour is the entry's position
    in the list; non-dict entries are dropped from the result in that case.
    """
    if len(items) != 24:
        return items
    if any(_schedule_item_start(item) for item in items if isinstance(item, dict)):
        return items
    return [
        {**item, "start": f"{hour:02d}:00", "end": f"{hour + 1:02d}:00"}
        for hour, item in enumerate(items)
        if isinstance(item, dict)
    ]


def _merge_schedule_periods(items: List[Any]) -> List[Dict[str, Any]]:
    """Collapse consecutive entries with identical price, fees and remark.

    Returns dicts with keys ``start``, ``end``, ``price``, ``elec``, ``serv``
    and ``remark``. A merged period keeps the first entry's start and takes
    the last entry's end. Non-dict entries are ignored.
    """
    merged: List[Dict[str, Any]] = []
    current: Optional[Dict[str, Any]] = None
    for item in items:
        if not isinstance(item, dict):
            continue
        period = {
            "start": _schedule_item_start(item) or "",
            "end": item.get("end") or item.get("finish") or item.get("end_time") or "",
            "price": _to_float(_first_present(item, "price_kwh", "price")),
            "elec": _to_float(_first_present(item, "electric_fee_kwh", "electric_fee", "elec_price")),
            "serv": _to_float(_first_present(item, "service_fee_kwh", "service_fee", "service_price")),
            "remark": item.get("type") or item.get("tag") or item.get("period") or "",
        }
        if current is None:
            current = period
            continue
        same_tariff = all(current[key] == period[key] for key in ("price", "elec", "serv", "remark"))
        if same_tariff:
            current["end"] = period["end"]
        else:
            merged.append(current)
            current = period
    if current is not None:
        merged.append(current)
    return merged


def _write_section_title(ws, row_idx: int, text: str):
    """Write a section title in column 1 of *row_idx*, merged across all sheet columns."""
    cell = ws.cell(row=row_idx, column=1, value=text)
    cell.font = SUBTITLE_FONT
    cell.fill = SECTION_FILL
    cell.border = Border(bottom=THIN_SIDE)
    ws.merge_cells(
        start_row=row_idx,
        start_column=1,
        end_row=row_idx,
        end_column=_STATION_SHEET_COLS,
    )
    # Fill and border are applied to every cell covered by the merge.
    for col in range(2, _STATION_SHEET_COLS + 1):
        covered = ws.cell(row=row_idx, column=col)
        covered.fill = SECTION_FILL
        covered.border = Border(bottom=THIN_SIDE)


def _write_table_header(ws, row_idx: int, headers: List[str]):
    """Write *headers* into *row_idx* starting at column 1 and style the row."""
    for col, text in enumerate(headers, start=1):
        ws.cell(row=row_idx, column=col, value=text)
    _apply_table_header_style(ws, row_idx, 1, len(headers))


def _build_station_sheet(ws, row: Dict[str, Any]):
    """Fill worksheet *ws* with the detail view of one station *row*.

    Layout: title (row 1), key/value info (rows 3-6), price schedule table
    (from row 8), then the charging-pile table two rows below the schedule.
    """
    station_name = row.get("station_name") or ""
    address = row.get("address") or ""
    coord_x = row.get("coord_x")
    coord_y = row.get("coord_y")
    total_guns = row.get("total_guns")
    free_guns = row.get("free_guns")
    current_price = _to_float(row.get("current_price"))
    pro_price = _to_float(row.get("pro_price"))
    parking_info = row.get("parking_info")
    distance = row.get("distance")
    status_update_time = row.get("status_update_time")

    schedule_items = _forward_fill_schedule(_parse_schedule(row.get("schedule_json")))
    # The pile list goes through the same parser as the schedule.
    piles_detail = _parse_schedule(row.get("piles_detail_json"))

    # --- Title ---
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=_STATION_SHEET_COLS)
    ws.row_dimensions[1].height = 40
    title_cell = ws.cell(row=1, column=1, value=station_name or "场站")
    title_cell.font = TITLE_FONT
    title_cell.alignment = Alignment(horizontal="center", vertical="center")
    for col in range(1, _STATION_SHEET_COLS + 1):
        ws.cell(row=1, column=col).border = Border(bottom=MEDIUM_SIDE)

    ws.sheet_properties.tabColor = "4472C4"

    # --- Info section ---
    # Row 3: address, value merged across columns 2-6.
    _style_info_key_value(ws, 3, 1, 2, merge_val_cols=4)
    ws.cell(row=3, column=1, value="位置")
    ws.cell(row=3, column=2, value=address)

    # Row 4: coordinates (columns 2-3) and distance (columns 5-6).
    coord_text = f"{coord_x}, {coord_y}" if coord_x is not None else ""
    _style_info_key_value(ws, 4, 1, 2, merge_val_cols=1)
    ws.cell(row=4, column=1, value="经纬度")
    ws.cell(row=4, column=2, value=coord_text)

    _style_info_key_value(ws, 4, 4, 5, merge_val_cols=1)
    ws.cell(row=4, column=4, value="距离")
    ws.cell(row=4, column=5, value=distance)

    # Row 5: free guns, total guns, parking info.
    _style_info_key_value(ws, 5, 1, 2)
    ws.cell(row=5, column=1, value="空闲枪数")
    ws.cell(row=5, column=2, value=free_guns)

    _style_info_key_value(ws, 5, 3, 4)
    ws.cell(row=5, column=3, value="总枪数")
    ws.cell(row=5, column=4, value=total_guns)

    _style_info_key_value(ws, 5, 5, 6)
    ws.cell(row=5, column=5, value="停车信息")
    ws.cell(row=5, column=6, value=parking_info)

    # Row 6: current price, PRO member price, status update time.
    _style_info_key_value(ws, 6, 1, 2)
    ws.cell(row=6, column=1, value="当前价格")
    ws.cell(row=6, column=2, value=current_price)

    _style_info_key_value(ws, 6, 3, 4)
    ws.cell(row=6, column=3, value="PRO会员价")
    ws.cell(row=6, column=4, value=pro_price)

    _style_info_key_value(ws, 6, 5, 6)
    ws.cell(row=6, column=5, value="更新时间")
    ws.cell(row=6, column=6, value=status_update_time)

    for info_row in range(3, 7):
        ws.row_dimensions[info_row].height = 24

    # --- Price schedule table ---
    schedule_start_row = 8
    _write_section_title(ws, schedule_start_row, "价格时段表")

    header_row = schedule_start_row + 1
    headers = ["开始", "结束", "电价(元/kWh)", "电费(元/kWh)", "服务费(元/kWh)", "备注"]
    _write_table_header(ws, header_row, headers)

    row_idx = header_row + 1
    for period in _merge_schedule_periods(_with_hourly_times(schedule_items)):
        ws.cell(row=row_idx, column=1, value=period["start"])
        ws.cell(row=row_idx, column=2, value=period["end"])
        ws.cell(row=row_idx, column=3, value=period["price"])
        ws.cell(row=row_idx, column=4, value=period["elec"])
        ws.cell(row=row_idx, column=5, value=period["serv"])
        ws.cell(row=row_idx, column=6, value=period["remark"])
        _apply_table_cell_style(ws, row_idx, 1, len(headers), center_cols=[1, 2, 3, 4, 5])
        row_idx += 1

    # --- Charging pile table ---
    piles_start_row = row_idx + 2
    _write_section_title(ws, piles_start_row, "充电桩详情")

    pile_header_row = piles_start_row + 1
    pile_headers = ["编号", "功率", "类型", "状态", "价格(元/kWh)", "备注"]
    _write_table_header(ws, pile_header_row, pile_headers)

    row_idx = pile_header_row + 1
    if piles_detail:
        for pile in piles_detail:
            if not isinstance(pile, dict):
                continue
            ws.cell(row=row_idx, column=1, value=pile.get("pile_no") or pile.get("id") or "")
            ws.cell(row=row_idx, column=2, value=pile.get("power") or "")
            ws.cell(row=row_idx, column=3, value=pile.get("type") or "")
            ws.cell(row=row_idx, column=4, value=pile.get("status_text") or pile.get("status") or "")
            ws.cell(row=row_idx, column=5, value=_to_float(pile.get("price")))
            ws.cell(row=row_idx, column=6, value=pile.get("remark") or "")
            _apply_table_cell_style(ws, row_idx, 1, len(pile_headers), center_cols=[1, 2, 3, 4, 5])
            row_idx += 1
    else:
        # Single merged placeholder row when there is no pile data.
        ws.cell(row=row_idx, column=1, value="暂无详情数据")
        ws.merge_cells(
            start_row=row_idx,
            start_column=1,
            end_row=row_idx,
            end_column=_STATION_SHEET_COLS,
        )
        _apply_table_cell_style(ws, row_idx, 1, len(pile_headers))

    ws.freeze_panes = ws["A9"]
    _set_column_widths(ws, {1: 20, 2: 20, 3: 16, 4: 16, 5: 16, 6: 25})


def _build_summary_sheet(ws, rows: List[Dict[str, Any]], operator: str):
    """Fill *ws* with the overview table: one line per station in *rows*."""
    ws.title = "场站总览"
    ws.sheet_properties.tabColor = "4472C4"

    # Row 1: main title merged across the ten table columns.
    title_cell = ws.cell(row=1, column=1, value=f"{operator}充电场站数据总览")
    title_cell.font = TITLE_FONT
    title_cell.alignment = Alignment(horizontal="center", vertical="center")
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=10)
    ws.row_dimensions[1].height = 40

    # Row 2: column headers.
    headers = [
        "场站名称",
        "地址",
        "距离",
        "经度",
        "纬度",
        "总枪数",
        "空闲枪数",
        "停车信息",
        "当前价格(元/kWh)",
        "状态更新时间",
    ]
    _write_table_header(ws, 2, headers)

    # The auto-filter covers the header row plus every data row.
    ws.auto_filter.ref = f"A2:{get_column_letter(len(headers))}{len(rows) + 2}"

    for row_idx, row in enumerate(rows, start=3):
        ws.cell(row=row_idx, column=1, value=row.get("station_name"))
        ws.cell(row=row_idx, column=2, value=row.get("address"))
        ws.cell(row=row_idx, column=3, value=row.get("distance"))
        ws.cell(row=row_idx, column=4, value=row.get("coord_x"))
        ws.cell(row=row_idx, column=5, value=row.get("coord_y"))
        ws.cell(row=row_idx, column=6, value=row.get("total_guns"))
        ws.cell(row=row_idx, column=7, value=row.get("free_guns"))
        ws.cell(row=row_idx, column=8, value=row.get("parking_info"))
        ws.cell(row=row_idx, column=9, value=_to_float(row.get("current_price")))
        ws.cell(row=row_idx, column=10, value=row.get("status_update_time"))
        _apply_table_cell_style(ws, row_idx, 1, len(headers), center_cols=[3, 4, 5, 6, 7, 9, 10])

    _set_column_widths(ws, {1: 35, 2: 45, 3: 12, 4: 14, 5: 14, 6: 12, 7: 12, 8: 35, 9: 20, 10: 22})
    ws.freeze_panes = ws["A3"]


async def export_excel(operator: str, output_path: str, *, limit: Optional[int] = None) -> str:
    """Export the stations of *operator* to an Excel workbook at *output_path*.

    Args:
        operator: Operator name passed to the data model.
        output_path: Destination ``.xlsx`` path.
        limit: If given, only the first ``max(0, int(limit))`` rows are exported.

    Returns:
        *output_path*. When no rows are available, the workbook contains a
        single sheet with a "no data" message.
    """
    exporter = DorisExcelExporter(db_url=DB_URL)
    await exporter.init()
    try:
        rows = await exporter.fetch_current_station_rows(operator)
    finally:
        await exporter.close()

    if limit is not None:
        rows = rows[: max(0, int(limit))]

    wb = Workbook()

    if not rows:
        ws = wb.active
        ws.title = "无数据"
        ws.cell(row=1, column=1, value=f"未查询到 {operator} 的任何场站数据")
        wb.save(output_path)
        return output_path

    # 1. Overview sheet (reuses the workbook's default sheet).
    _build_summary_sheet(wb.active, rows, operator)

    # 2. One detail sheet per station; the overview title is reserved up front.
    existing_titles: Set[str] = {"场站总览"}
    for row in rows:
        sheet_title = _safe_excel_sheet_title(str(row.get("station_name") or ""), existing_titles)
        _build_station_sheet(wb.create_sheet(title=sheet_title), row)

    wb.save(output_path)
    return output_path


async def main():
    """Export one workbook per operator into a timestamped ``Excel/<ts>`` directory.

    A failure for one operator is logged with its traceback and does not stop
    the remaining exports.
    """
    limit = None
    operators = ["新电途", "特来电", "驿来特", "艾特吉易充"]

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    export_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Excel", ts)
    os.makedirs(export_dir, exist_ok=True)

    logger.info("开始批量导出 Excel...")
    logger.info("输出目录: %s", export_dir)

    for operator in operators:
        output_path = os.path.join(export_dir, f"{operator}_{ts}.xlsx")

        logger.info("--- 正在导出 %s ---", operator)
        if limit is not None:
            logger.info("限制导出记录数: %s", limit)

        try:
            final_path = await export_excel(operator, output_path, limit=limit)
        except Exception as e:
            # Top-level boundary: keep going with the next operator, but keep
            # the traceback in the log.
            logger.exception("%s 导出失败: %s", operator, e)
        else:
            logger.info("%s 导出完成: %s", operator, final_path)

    logger.info("所有导出任务已结束。")


if __name__ == "__main__":
    asyncio.run(main())