858 lines
28 KiB
Python
858 lines
28 KiB
Python
"""课时核对表 Excel 解析:多工作表、多表头区块、列映射、上课日期解析"""
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from dataclasses import dataclass, field
|
||
from datetime import date
|
||
from pathlib import Path
|
||
from typing import Any, Optional
|
||
|
||
from openpyxl import load_workbook
|
||
|
||
# File-name shapes that carry the workbook year and month, tried in order:
#   1. "课时核对表_YYYY_N月份"  (underscore / whitespace separated)
#   2. "课时核对表(YYYY年N月" or "课时核对表(YYYY.N月"  (optional ASCII or
#      full-width opening parenthesis; the full-width one is spelled \uff08 so
#      the character class really contains two different characters)
#   3. any "YYYY年N月" / "YYYY.N月" fragment, e.g. "达尔琳2026年1月份"
# Group 1 is always the year and group 2 the month.
FILE_MONTH_PATTERNS = [
    re.compile(r'课时核对表[_\s]*(\d{4})[_\s]*(\d{1,2})\s*月', re.IGNORECASE),
    re.compile(r'课时核对表[(\uff08]?(\d{4})[.\s年]*(\d{1,2})\s*月', re.IGNORECASE),
    re.compile(r'(\d{4})[.\s年]*(\d{1,2})\s*月', re.IGNORECASE),
]
|
||
# Class-register cell: "M.D" optionally followed by a parenthesised remark,
# e.g. "3.8", "1.13(补2次)", "1.24(补2节)". Both ASCII and full-width
# parentheses (\uff08 / \uff09) are accepted.
# Groups: 1 = month, 2 = day, 3 = remark. Group 4 is the remark of the legacy
# ASCII-only alternative; it is kept so the group count callers rely on
# (``m.group(3) or m.group(4)``) does not change.
CLASS_CELL_RE = re.compile(
    r'^(\d{1,2})\.(\d{1,2})(?:[(\uff08]([^)\uff09]+)[)\uff09]|\(([^)]+)\))?$',
    re.IGNORECASE,
)
# "FormalName(Nickname)" with ASCII or full-width parentheses.
NAME_RE = re.compile(r'^(.+?)[(\uff08](.+?)[)\uff09]$')
# First cell of a payment-statement row: "YYYY-MM-DD HH:" timestamp prefix.
PAYMENT_ROW_RE = re.compile(r'^\d{4}-\d{2}-\d{2}\s+\d{2}:')
# A cell made only of 15 or more digits (e.g. a transaction / order number).
LONG_ID_RE = re.compile(r'^\d{15,}$')
|
||
|
||
# Maps a normalised (lower-cased, whitespace-free) course label to the
# canonical course code. Labels that are missing from this table are used
# unchanged by normalize_course_code().
COURSE_ALIASES = {
    # canonical codes map to themselves
    'scratch': 'scratch',
    'python': 'python',
    'c++': 'c++',
    'cpp': 'c++',
    'wedo': 'wedo',
    'lab': 'lab',
    'stem': 'stem',
    'code': 'code',
    # class-size labels are kept as their own codes
    '大班': '大班',
    '中班': '中班',
    '小班': '小班',
    # Chinese / mixed-language aliases
    '编程': 'python',
    'python编程': 'python',
    'scratch编程': 'scratch',
    '乐高': 'wedo',
    'wedo乐高': 'wedo',
    # "待定" (to be decided) is filed under lab
    '待定': 'lab',
    # free and trial lessons
    '免费': 'free',
    'trial': 'trial',
    '试学': 'trial',
}
|
||
|
||
# 仅跳过「转账/缴费流水」类工作表(不跳过「试学明细」等含学员数据的子表)
|
||
|
||
# Column-header keyword -> logical field name. _map_header() walks this list
# in order and uses substring matching, so earlier (more specific) entries win
# over the shorter keywords further down. Field names starting with "_" are
# markers, not numeric fields.
HEADER_RULES: list[tuple[str, str]] = [
    # Headers that name a month literally: "12月" entries map to the prev_*
    # (opening) fields and "1月" entries to the end_* / consumed_* fields.
    ('12月份剩余课时', 'prev_lessons'),
    ('12月剩余总课时', 'prev_total_lessons'),
    ('12月末剩余金额', 'prev_balance'),
    ('12月末累计已上总课时', 'prev_cumulative_consumed'),
    ('12月末剩余课时', 'prev_lessons'),
    ('1月份剩余课时', 'end_lessons'),
    ('1月剩余总课时', 'end_total_lessons'),
    ('1月末剩余金额', 'end_balance'),
    ('1月末累计已上总课时', 'end_cumulative_consumed'),
    ('1月份已上金额', 'consumed_amount'),
    ('1月份已上课时', 'consumed_lessons'),
    ('1月已上课时', 'consumed_lessons'),
    # Class-register columns (per-lesson date cells).
    ('1月份课时登记', '_class_register'),
    ('1月份上课情况', '_class_register'),
    ('1月已上课时登记', '_class_register'),
    # Identity columns.
    ('序号', 'seq_no'),
    ('学员姓名', 'student_name'),
    ('学员名字', 'student_name'),
    ('课程级别', 'course_level'),
    ('课程', 'course_level'),
    ('单价/节', 'unit_price'),
    # Refunds and fees ("退费" alone must come after the longer 退费… keywords).
    ('退费课时', 'refund_lessons'),
    ('退赠送课时', 'refund_gift_lessons'),
    ('退费金额', 'refund_amount'),
    ('账号费', 'account_fee'),
    ('退费', 'refund_amount'),
    # Opening balances (generic wording without a month number).
    ('赠送课时', 'prev_gift_lessons'),
    ('上月剩余总课时', 'prev_total_lessons'),
    ('剩余总课时', 'prev_total_lessons'),
    ('上月剩余课时', 'prev_lessons'),
    ('月份剩余课时', 'prev_lessons'),
    ('剩余课时', 'prev_lessons'),
    ('单价', 'unit_price'),
    ('上月末剩余金额', 'prev_balance'),
    ('月末剩余金额', 'prev_balance'),
    ('剩余金额', 'prev_balance'),
    # New sign-ups and renewals.
    ('新签金额', 'new_signup_amount'),
    ('新签课时', 'new_signup_lessons'),
    ('新签赠送', 'new_signup_gift_lessons'),
    ('续费金额', 'renewal_amount'),
    ('续费课时', 'renewal_lessons'),
    ('续费赠送', 'renewal_gift_lessons'),
    # Consumption during the month.
    ('本月已上课时', 'consumed_lessons'),
    ('已上课时', 'consumed_lessons'),
    ('本月已上金额', 'consumed_amount'),
    ('已上金额', 'consumed_amount'),
    # Closing balances worded with "本月" (this month).
    ('本月剩余总课时', 'end_total_lessons'),
    ('本月剩余课时', 'end_lessons'),
    ('本月末剩余金额', 'end_balance'),
    ('总课时', 'total_hours'),
    ('备注', 'notes'),
    # Payment columns (used by the trial-payment layout).
    ('缴费金额', 'new_signup_amount'),
    ('交费金额', 'new_signup_amount'),
    ('缴费时间', '_pay_time'),
]
|
||
|
||
|
||
def parse_filename_month(path: Path) -> Optional[tuple[int, int]]:
    """Extract ``(year, month)`` from the workbook file name.

    The patterns in ``FILE_MONTH_PATTERNS`` are tried in order against the
    file stem. A match whose month is outside 1..12 is ignored (the next
    pattern is tried) so that callers never receive an impossible month and
    can fall back to another source such as the sheet title.

    Returns ``None`` when no pattern yields a valid year/month pair.
    """
    stem = Path(path).stem
    for pat in FILE_MONTH_PATTERNS:
        m = pat.search(stem)
        if not m:
            continue
        year, month = int(m.group(1)), int(m.group(2))
        if 1 <= month <= 12:
            return year, month
    return None
|
||
|
||
|
||
def parse_title_month(title: str) -> Optional[tuple[int, int]]:
    """Extract ``(year, month)`` from a sheet title such as "2026年1月课时明细".

    Every "YYYY年M月" occurrence is inspected in order and the first one whose
    month lies in 1..12 is returned; an out-of-range month (e.g. "13月") is
    not accepted. Returns ``None`` for an empty title or when nothing valid
    is found.
    """
    if not title:
        return None
    for m in re.finditer(r'(\d{4})\s*年\s*(\d{1,2})\s*月', str(title)):
        year, month = int(m.group(1)), int(m.group(2))
        if 1 <= month <= 12:
            return year, month
    return None
|
||
|
||
|
||
def split_student_name(full: str) -> tuple[str, str, str]:
    """Return ``(full name, formal name, nickname)`` for a roster name cell.

    The cell is stripped and line breaks are removed. When it has the
    "Formal(Nick)" shape recognised by ``NAME_RE`` the two parts are returned
    separately; otherwise the formal name equals the full name and the
    nickname is empty. An empty cell yields three empty strings.
    """
    cleaned = (full or '').strip().replace('\n', '')
    if not cleaned:
        return '', '', ''
    matched = NAME_RE.match(cleaned)
    if matched is None:
        return cleaned, cleaned, ''
    formal = matched.group(1).strip()
    nickname = matched.group(2).strip()
    return cleaned, formal, nickname
|
||
|
||
|
||
def normalize_course_code(raw: str) -> str:
    """Normalise a course label to its canonical course code.

    The label is stripped, lower-cased, and cleared of line breaks and
    spaces; a full-width plus sign (U+FF0B) is folded to ASCII ``+`` so that
    "C++" typed with a Chinese input method still resolves to ``c++``.
    Placeholder values ("-", "—", "/", "无", "nan") and empty input return
    ``''``. Labels not present in ``COURSE_ALIASES`` are returned in their
    normalised form.
    """
    s = (raw or '').strip().lower().replace('\n', '')
    # \uff0b is the full-width "+"; written as an escape so the replacement
    # cannot silently degrade into a no-op.
    s = s.replace('\uff0b', '+').replace(' ', '')
    if not s or s in ('-', '—', '/', '无', 'nan'):
        return ''
    return COURSE_ALIASES.get(s, s)
|
||
|
||
|
||
def infer_course_from_schedule(text: str) -> str:
    """Guess a course code from free-form schedule / class text.

    The text is lower-cased and searched for known course keywords in a fixed
    priority order (python, scratch, wedo, c++/cpp, lab). Returns ``''`` when
    no keyword is present.
    """
    lowered = (text or '').lower()
    # (keywords, course code) pairs, highest priority first.
    priority_table = (
        (('python',), 'python'),
        (('scratch',), 'scratch'),
        (('wedo',), 'wedo'),
        (('c++', 'cpp'), 'c++'),
        (('lab',), 'lab'),
    )
    for keywords, code in priority_table:
        if any(word in lowered for word in keywords):
            return code
    return ''
|
||
|
||
|
||
def sheet_default_course(sheet_name: str) -> str:
    """Return the course code implied by the sheet name.

    Only sheets whose name contains "lab" (case-insensitive) have a default
    course (``'lab'``); every other sheet returns ``''``.
    """
    return 'lab' if 'lab' in (sheet_name or '').lower() else ''
|
||
|
||
|
||
def should_skip_sheet(sheet_name: str) -> bool:
    """Tell whether a sheet is a pure payment / transfer statement.

    Spaces in the name are ignored. A sheet is skipped when its name contains
    "缴费流水", or contains "转账" together with "明细" or "月转账", or ends with
    "转账". Names such as "3月转账" are covered by the "月转账" check. All other
    sheets (code / stem / lab / free / trial-detail ...) are kept.
    """
    compact = (sheet_name or '').replace(' ', '')
    if '缴费流水' in compact:
        return True
    if '转账' not in compact:
        return False
    return '明细' in compact or '月转账' in compact or compact.endswith('转账')
|
||
|
||
|
||
def is_real_workbook_file(path: Path, min_size: int = 50_000) -> bool:
    """Tell whether *path* is a full monthly workbook worth importing.

    A path qualifies when it is an existing regular ``.xlsx`` file of at least
    *min_size* bytes; the size floor filters out small sample / test sheets.
    Office owner lock files (names starting with ``~$``) never qualify.
    Missing or unreadable paths return ``False`` instead of raising.
    """
    path = Path(path)
    if path.suffix.lower() != '.xlsx':
        return False
    if path.name.startswith('~$'):
        # Temporary lock file Excel creates next to an open workbook.
        return False
    try:
        return path.is_file() and path.stat().st_size >= min_size
    except OSError:
        # The file vanished or became unreadable between the two calls.
        return False
|
||
|
||
|
||
def _safe_float(val: Any) -> Optional[float]:
|
||
if val is None or val == '':
|
||
return None
|
||
if isinstance(val, (int, float)):
|
||
return float(val)
|
||
s = str(val).strip().replace(',', '').replace('\n', '')
|
||
if not s or s in ('-', '—', '/', '无', '待定', 'nan'):
|
||
return None
|
||
try:
|
||
return float(s)
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def _row_text(row: tuple) -> str:
|
||
return ''.join(str(c or '') for c in (row or ()))
|
||
|
||
|
||
def _has_student_col_header(text: str) -> bool:
|
||
"""标准表为「学员姓名」,免费课表为「学员名字」(字可能为不同 Unicode 码位)"""
|
||
if '学员' not in text:
|
||
return False
|
||
return '姓名' in text or '名字' in text or bool(
|
||
re.search(r'学员.{0,2}名', text)
|
||
)
|
||
|
||
|
||
def _find_title_in_row(row: tuple) -> str:
|
||
for cell in row or ():
|
||
if cell is None:
|
||
continue
|
||
s = str(cell).strip()
|
||
if s and ('课时明细' in s or '课时核对' in s or re.search(r'\d{4}\s*年', s)):
|
||
return s
|
||
return ''
|
||
|
||
|
||
def is_standard_header_row(row: tuple) -> bool:
    """Tell whether *row* is the header of a standard ledger block.

    Requires a student-name header plus a course column; "课程级别" is covered
    by the "课程" substring test.
    """
    joined = _row_text(row)
    if not _has_student_col_header(joined):
        return False
    return '课程' in joined
|
||
|
||
|
||
def is_free_class_header_row(row: tuple) -> bool:
    """Tell whether *row* is the header of a free-class block.

    Requires a student-name header plus either a "班级" (class) or a "总课时"
    (total lessons) column.
    """
    joined = _row_text(row)
    if not _has_student_col_header(joined):
        return False
    return '班级' in joined or '总课时' in joined
|
||
|
||
|
||
def is_tracking_header_row(row: tuple) -> bool:
    """Tell whether *row* is the header of a cumulative-consumption block.

    Requires a student-name header and both "累计已上" and "总课时" in the row.
    """
    joined = _row_text(row)
    if not _has_student_col_header(joined):
        return False
    return '累计已上' in joined and '总课时' in joined
|
||
|
||
|
||
def is_trial_payment_header_row(row: tuple) -> bool:
    """Tell whether *row* is the header of a trial-payment block.

    Requires a student-name header plus a payment column worded "缴费" or
    "交费".
    """
    joined = _row_text(row)
    if not _has_student_col_header(joined):
        return False
    return '缴费' in joined or '交费' in joined
|
||
|
||
|
||
def is_summary_row(row: tuple) -> bool:
    """Tell whether *row* is a totals line.

    True when any cell text contains "合计" or the concatenated row text
    starts with "总计".
    """
    joined = _row_text(row)
    if '合计' in joined:
        return True
    return joined.strip().startswith('总计')
|
||
|
||
|
||
def is_payment_data_row(row: tuple) -> bool:
    """Tell whether *row* belongs to a payment statement, not a student ledger.

    A row is a payment row when
      * its text mentions "微信支付", "已交款" or "网络银行"; or
      * the first cell starts with a "YYYY-MM-DD HH:" timestamp and the second
        cell is empty text or a long digit-only id; or
      * the first non-empty cell among columns 1..3 is a long digit-only id or
        one of the markers "DEL" / "XF".
    """
    if not row:
        return False

    joined = _row_text(row)
    if any(marker in joined for marker in ('微信支付', '已交款', '网络银行')):
        return True

    leading = str(row[0] or '').strip()
    if PAYMENT_ROW_RE.match(leading):
        # NOTE(review): a second cell holding None is rendered as the text
        # 'None' here, so it does not count as empty — confirm whether an
        # empty second cell was meant to qualify.
        following = str(row[1] if len(row) > 1 else '').strip()
        if LONG_ID_RE.match(following) or following == '':
            return True

    # First non-blank cell among the three columns after the first one.
    candidate = next(
        (str(cell).strip() for cell in row[1:4] if cell and str(cell).strip()),
        '',
    )
    if LONG_ID_RE.match(candidate):
        return True
    return candidate in ('DEL', 'XF')
|
||
|
||
|
||
def _header_month_num(h: str) -> Optional[int]:
|
||
"""表头中的月份数字,如 4月份剩余课时 → 4"""
|
||
m = re.search(r'(\d{1,2})\s*月', h)
|
||
return int(m.group(1)) if m else None
|
||
|
||
|
||
def _prev_month_num(book_month: int) -> int:
|
||
return 12 if book_month == 1 else book_month - 1
|
||
|
||
|
||
def _map_header_by_month(h: str, book_month: int) -> Optional[str]:
    """Map a month-numbered header to a field, relative to the workbook month.

    A header naming the workbook's own month maps to ``end_*`` /
    ``consumed_*`` fields; one naming the previous month maps to ``prev_*``
    fields. Headers without a month number, naming any other month, or not
    matching a known wording return ``None``.
    """
    header_month = _header_month_num(h)
    if header_month is None:
        return None

    previous_month = _prev_month_num(book_month)
    if header_month == book_month:
        prefix = 'end'
    elif header_month == previous_month:
        prefix = 'prev'
    else:
        return None

    # Wordings shared by both months; the order of the checks matters.
    if '剩余总课时' in h or ('剩余总' in h and '课时' in h):
        return f'{prefix}_total_lessons'
    if '月末' in h and ('剩余金额' in h or '余额' in h):
        return f'{prefix}_balance'
    if '剩余课时' in h:
        return f'{prefix}_lessons'

    # Consumption columns only exist for the workbook's own month.
    if prefix == 'end':
        if '已上课时' in h:
            return 'consumed_lessons'
        if '已上金额' in h:
            return 'consumed_amount'

    if '累计已上' in h:
        return f'{prefix}_cumulative_consumed'
    return None
|
||
|
||
|
||
def _map_header(cell_val: Any, book_year: Optional[int] = None, book_month: Optional[int] = None) -> Optional[str]:
    """Map one header cell to a logical field name, or ``None`` if unknown.

    The header text is normalised (stripped, line breaks and spaces removed).
    When *book_month* is known, month-numbered headers are resolved first via
    ``_map_header_by_month``. Otherwise, or when that gives nothing, the
    first acceptable ``HEADER_RULES`` keyword contained in the normalised
    text wins.

    Keywords are matched against the normalised text, so a header wrapped
    over two lines ("学员\\n姓名") or padded with spaces is still recognised.

    *book_year* is accepted for call-site symmetry and is not used here.
    """
    if cell_val is None:
        return None
    h = str(cell_val).strip().replace('\n', '').replace(' ', '')
    if not h:
        return None

    if book_month is not None:
        by_month = _map_header_by_month(h, book_month)
        if by_month:
            return by_month

    # Fallback rules below literally treat "1月" as the current month and
    # "12月" / "上月" as the previous one.
    starts_with_january = h.startswith('1月')
    prev_keys = ('prev_lessons', 'prev_total_lessons', 'prev_balance', 'prev_gift_lessons')
    end_keys = ('end_lessons', 'end_total_lessons', 'end_balance')

    for keyword, field_key in HEADER_RULES:
        if keyword not in h:
            continue
        if field_key.startswith('_'):
            # Marker fields (_class_register, _pay_time) need no further checks.
            return field_key
        if field_key in prev_keys:
            # A "this month" / January header cannot be an opening balance.
            if '本月' in h or starts_with_january:
                continue
        elif field_key in end_keys:
            # A December / "last month" header cannot be a closing balance.
            if '12月' in h or '上月' in h:
                continue
            # A bare "剩余…" header without a current-month marker is only
            # accepted as a closing value for the "月末" balance wording.
            if '剩余' in h and '本月' not in h and not starts_with_january:
                if field_key != 'end_balance' or '月末' not in h:
                    continue
        return field_key
    return None
|
||
|
||
|
||
def _lessons_from_class_suffix(suffix: str, default_lessons: float) -> float:
|
||
if not suffix:
|
||
return default_lessons
|
||
m = re.search(r'(\d+(?:\.\d+)?)\s*课时', suffix)
|
||
if m:
|
||
return float(m.group(1))
|
||
m = re.search(r'补?(\d+)\s*次', suffix)
|
||
if m:
|
||
return float(m.group(1)) * default_lessons
|
||
m = re.search(r'(\d+)\s*节', suffix)
|
||
if m:
|
||
return float(m.group(1)) * default_lessons
|
||
return default_lessons
|
||
|
||
|
||
def parse_class_cell(
    text: str, year: int, month: int, default_lessons: float = 2.0,
) -> Optional[tuple[date, float, str]]:
    """Parse a class-register cell into ``(class_date, lessons, raw_text)``.

    The cell must look like "M.D" with an optional parenthesised remark
    (see ``CLASS_CELL_RE``); spaces are ignored. The lesson count comes from
    the remark, falling back to *default_lessons*. The month written in the
    cell is used when it is 1..12, otherwise the workbook *month*.

    Year handling: the workbook *year* is used, except when the cell's month
    lies more than six months after the workbook month (e.g. a "12.28" entry
    in a January workbook). Such a date would be far in the future, so it is
    attributed to the previous year.

    Returns ``None`` for empty or unrecognised cells and impossible dates.
    """
    if not text:
        return None
    raw = str(text).strip()
    if not raw:
        return None

    m = CLASS_CELL_RE.match(raw.replace(' ', ''))
    if not m:
        return None

    d_month = int(m.group(1))
    d_day = int(m.group(2))
    suffix = m.group(3) or m.group(4) or ''
    lessons = _lessons_from_class_suffix(suffix, default_lessons)

    use_month = d_month if 1 <= d_month <= 12 else month
    # A class attended "more than half a year ahead" of the workbook month is
    # really a late-year class of the previous year.
    use_year = year - 1 if use_month - month > 6 else year

    try:
        class_date = date(use_year, use_month, d_day)
    except ValueError:
        return None
    return class_date, lessons, raw
|
||
|
||
|
||
def _build_col_map(
    header_row: tuple,
    book_year: Optional[int] = None,
    book_month: Optional[int] = None,
) -> tuple[dict[int, str], list[int]]:
    """Derive the column layout of a block from its header row.

    Returns ``(col_map, class_cols)`` where ``col_map`` maps a column index
    to its logical field name and ``class_cols`` lists the column indexes
    that hold class-register cells.

    Class-register columns are collected in three steps:
      1. columns mapped to ``_class_register``, plus unmapped columns whose
         header is blank, mentions "上课" / "情况" / "登记", or itself looks
         like a class cell;
      2. every still-unclaimed column between the first closing-balance
         column (or the row start) and the notes column (or the row end);
      3. only if nothing was found so far: every unmapped column after the
         notes column (or after the last mapped column).
    """
    col_map: dict[int, str] = {}
    class_cols: list[int] = []
    register_words = ('上课', '情况', '登记')

    # Step 1: classify each header cell.
    for idx, cell in enumerate(header_row):
        field_key = _map_header(cell, book_year, book_month)
        if field_key == '_class_register':
            class_cols.append(idx)
            continue
        if field_key:
            col_map[idx] = field_key
            continue
        raw_text = str(cell or '')
        compact = raw_text.strip().replace(' ', '')
        if (
            not compact
            or any(word in raw_text for word in register_words)
            or CLASS_CELL_RE.match(compact)
        ):
            class_cols.append(idx)

    # Step 2: sweep the span between the closing-balance and notes columns.
    closing_keys = ('end_balance', 'end_total_lessons', 'end_lessons')
    notes_idx = next((i for i, key in col_map.items() if key == 'notes'), None)
    end_idx = next((i for i, key in col_map.items() if key in closing_keys), None)
    sweep_from = 0 if end_idx is None else end_idx + 1
    sweep_to = len(header_row) if notes_idx is None else notes_idx
    for idx in range(sweep_from, sweep_to):
        if idx in col_map or idx in class_cols:
            continue
        class_cols.append(idx)

    if class_cols:
        return col_map, class_cols

    # Step 3: last resort, take the trailing unmapped columns.
    if notes_idx is not None:
        tail_start = notes_idx + 1
    elif col_map:
        tail_start = max(col_map) + 1
    else:
        tail_start = 0
    class_cols.extend(
        idx for idx in range(tail_start, len(header_row)) if idx not in col_map
    )
    return col_map, class_cols
|
||
|
||
|
||
def _parse_standard_row(
    data_row: tuple,
    col_map: dict[int, str],
    class_cols: list[int],
    year: int,
    month: int,
    default_lessons: float,
    section_course: str,
    sheet_name: str,
) -> Optional[ParsedRow]:
    """Parse one data row of a standard (or tracking) block.

    Returns ``None`` when the block has no student-name column, the name cell
    is empty, or the name is a long digit-only id or a "DEL" / "XF" marker.

    The course code comes from the course column, then *section_course*, then
    the sheet default. Numeric columns are stored in ``fields`` when they
    hold a number. The notes text is "缴费时间:<time>" (when a pay-time column
    has a value) followed by the notes column, joined with ";". This no
    longer depends on which of the two columns comes first.

    If only cumulative-consumption columns are present, the month's consumed
    lessons are derived as ``end - prev``.
    """
    name_idx = next((i for i, k in col_map.items() if k == 'student_name'), None)
    if name_idx is None or name_idx >= len(data_row):
        return None
    name_val = data_row[name_idx]
    if not name_val or not str(name_val).strip():
        return None
    name_str = str(name_val).strip().replace('\n', '')
    if LONG_ID_RE.match(name_str) or name_str in ('DEL', 'XF'):
        return None

    full, display, nick = split_student_name(str(name_val))

    course_idx = next((i for i, k in col_map.items() if k == 'course_level'), None)
    course_raw = ''
    if course_idx is not None and course_idx < len(data_row):
        course_raw = str(data_row[course_idx] or '')
    course_code = (
        normalize_course_code(course_raw)
        or section_course
        or sheet_default_course(sheet_name)
    )

    fields: dict[str, Optional[float]] = {}
    pay_times: list[str] = []
    note_text = ''
    for idx, key in col_map.items():
        if idx >= len(data_row):
            continue
        val = data_row[idx]
        if key in ('student_name', 'course_level', 'seq_no'):
            continue
        if key == '_pay_time':
            pay_t = str(val or '').strip()
            if pay_t:
                pay_times.append(pay_t)
            continue
        if key == 'notes':
            note_text = str(val or '').strip()
            continue
        fv = _safe_float(val)
        if fv is not None:
            fields[key] = fv

    # Pay time first, then the free-text notes, independent of column order.
    note_parts = [f'缴费时间:{pay_t}' for pay_t in pay_times]
    if note_text:
        note_parts.append(note_text)
    notes = ';'.join(note_parts)

    # Cumulative-consumption (lab) sheets: derive this month's consumption
    # from the difference, but only when it was not given explicitly.
    prev_c = fields.get('prev_cumulative_consumed')
    end_c = fields.get('end_cumulative_consumed')
    if fields.get('consumed_lessons') is None and prev_c is not None and end_c is not None:
        fields['consumed_lessons'] = end_c - prev_c

    seq_no = None
    seq_idx = next((i for i, k in col_map.items() if k == 'seq_no'), None)
    if seq_idx is not None and seq_idx < len(data_row):
        seq_raw = data_row[seq_idx]
        if seq_raw not in (None, ''):
            try:
                seq_no = int(float(seq_raw))
            except (TypeError, ValueError, OverflowError):
                # Non-numeric, NaN or infinite sequence cell: leave unset.
                seq_no = None

    sessions = []
    for cidx in class_cols:
        if cidx >= len(data_row):
            continue
        cell = data_row[cidx]
        if cell is None or str(cell).strip() == '':
            continue
        parsed_cell = parse_class_cell(str(cell), year, month, default_lessons)
        if parsed_cell:
            sessions.append(parsed_cell)

    return ParsedRow(
        seq_no=seq_no,
        student_name=full,
        display_name=display,
        nickname=nick,
        course_code=course_code,
        fields=fields,
        class_sessions=sessions,
        notes=notes,
        source_sheet=sheet_name,
    )
|
||
|
||
|
||
def _parse_free_class_row(
    data_row: tuple,
    class_cols: list[int],
    year: int,
    month: int,
    default_lessons: float,
    section_course: str,
    sheet_name: str,
) -> tuple[Optional[ParsedRow], str]:
    """Parse one data row of a free-class block.

    Columns are positional: class/schedule | name | total lessons |
    previous-month remaining | consumed this month | remaining this month |
    register cells...

    Returns ``(row, section_course)``. When the first cell names a course
    (see ``infer_course_from_schedule``) it becomes the section course for
    this and following rows, so the possibly updated value is handed back
    even when the row itself is rejected (``row`` is ``None``).
    """
    schedule = str(data_row[0] or '').strip() if len(data_row) > 0 else ''
    detected = infer_course_from_schedule(schedule) if schedule else ''
    section_course = detected or section_course

    name_val = data_row[1] if len(data_row) > 1 else None
    if not name_val or not str(name_val).strip():
        return None, section_course
    name_str = str(name_val).strip()
    # A repeated header line or a long digit-only id is not a student.
    if '姓名' in name_str or LONG_ID_RE.match(name_str):
        return None, section_course

    full, display, nick = split_student_name(name_str)

    # Fixed column positions of the numeric fields.
    positional_fields = (
        (2, 'total_hours'),
        (3, 'prev_lessons'),
        (4, 'consumed_lessons'),
        (5, 'end_lessons'),
    )
    fields: dict[str, Optional[float]] = {}
    for position, key in positional_fields:
        if len(data_row) <= position:
            continue
        number = _safe_float(data_row[position])
        if number is not None:
            fields[key] = number

    # The last cell is taken as the remark. NOTE(review): on rows without a
    # dedicated remark column this is whatever the last cell holds — confirm.
    notes = str(data_row[-1] or '').strip() if data_row else ''
    if notes in ('免费', '备注'):
        notes = ''

    sessions = []
    for cidx in class_cols:
        if cidx >= len(data_row):
            continue
        cell = data_row[cidx]
        if cell is None or str(cell).strip() == '':
            continue
        session = parse_class_cell(str(cell), year, month, default_lessons)
        if session:
            sessions.append(session)

    parsed = ParsedRow(
        student_name=full,
        display_name=display,
        nickname=nick,
        course_code=section_course or 'free',
        fields=fields,
        class_sessions=sessions,
        notes=notes or '免费课',
        source_sheet=sheet_name,
    )
    return parsed, section_course
|
||
|
||
|
||
def _parse_trial_payment_row(
    data_row: tuple,
    col_map: dict[int, str],
    year: int,
    month: int,
    sheet_name: str,
) -> Optional[ParsedRow]:
    """Parse one data row of a trial-payment block.

    Returns ``None`` when there is no usable student name (empty, a header or
    totals word, digits only, or a long id) or when the row has no non-zero
    payment amount (``new_signup_amount``).

    The notes start with "缴费时间:<time>" when a pay-time column has a value,
    otherwise with "试学缴费"; any remark from the notes column is appended
    after ";". The result does not depend on the order of the pay-time and
    notes columns.

    *year* and *month* are accepted for call-site symmetry and are not used.
    The course code of the returned row is always ``'trial'``.
    """
    name_idx = next((i for i, k in col_map.items() if k == 'student_name'), None)
    if name_idx is None or name_idx >= len(data_row):
        return None
    name_val = data_row[name_idx]
    if not name_val or not str(name_val).strip():
        return None
    name_str = str(name_val).strip()
    if '姓名' in name_str or '合计' in name_str:
        return None
    if name_str.isdigit() or LONG_ID_RE.match(name_str.replace(' ', '')):
        return None

    fields: dict[str, Optional[float]] = {}
    pay_time = ''
    remarks: list[str] = []
    for idx, key in col_map.items():
        if idx >= len(data_row):
            continue
        val = data_row[idx]
        if key in ('student_name', 'seq_no'):
            continue
        if key == '_pay_time':
            pay_t = str(val or '').strip()
            if pay_t:
                pay_time = pay_t
            continue
        if key == 'notes':
            extra = str(val or '').strip()
            if extra:
                remarks.append(extra)
            continue
        fv = _safe_float(val)
        if fv is not None:
            fields[key] = fv

    if not fields.get('new_signup_amount'):
        return None

    full, display, nick = split_student_name(name_str)
    lead = f'缴费时间:{pay_time}' if pay_time else '试学缴费'
    notes = ';'.join([lead, *remarks])

    return ParsedRow(
        student_name=full,
        display_name=display,
        nickname=nick,
        course_code='trial',
        fields=fields,
        notes=notes,
        source_sheet=sheet_name,
    )
|
||
|
||
|
||
def parse_sheet_rows(
    rows: list[tuple],
    year: int,
    month: int,
    sheet_name: str,
    default_lessons: float = 2.0,
) -> list[ParsedRow]:
    """Parse all rows of one sheet, which may hold several header blocks.

    Blank rows, totals rows and payment-statement rows are ignored. Each
    header row switches the active layout (standard / tracking / free /
    trial) and rebuilds the column map; the rows that follow are parsed with
    the matching row parser until the next header. Rows before the first
    header are dropped. Only rows with a student name (and, except for the
    trial layout, a course code) are returned.
    """
    results: list[ParsedRow] = []
    col_map: Optional[dict[int, str]] = None
    class_cols: list[int] = []
    layout = 'standard'  # standard | free | tracking | trial
    section_course = sheet_default_course(sheet_name)

    for row in rows:
        is_blank = not row or all(c is None or str(c).strip() == '' for c in row)
        if is_blank or is_summary_row(row) or is_payment_data_row(row):
            continue

        # Header rows: standard wins over free-class, which wins over trial.
        header_layout = None
        if is_standard_header_row(row):
            header_layout = 'tracking' if is_tracking_header_row(row) else 'standard'
        elif is_free_class_header_row(row):
            header_layout = 'free'
        elif is_trial_payment_header_row(row):
            header_layout = 'trial'

        if header_layout is not None:
            col_map, class_cols = _build_col_map(row, year, month)
            if header_layout == 'free' and not class_cols:
                # Free-class sheets keep register cells from the 7th column on.
                class_cols = list(range(6, len(row)))
            layout = header_layout
            continue

        # Data rows.
        if layout == 'trial':
            record = _parse_trial_payment_row(
                row, col_map or {}, year, month, sheet_name,
            )
            if record and record.student_name:
                results.append(record)
        elif layout == 'free':
            record, section_course = _parse_free_class_row(
                row, class_cols, year, month, default_lessons, section_course, sheet_name,
            )
            if record and record.student_name and record.course_code:
                results.append(record)
        elif col_map:
            record = _parse_standard_row(
                row, col_map, class_cols, year, month, default_lessons, section_course, sheet_name,
            )
            if record and record.student_name and record.course_code:
                results.append(record)

    return results
|
||
|
||
|
||
@dataclass
class ParsedRow:
    """One student ledger line extracted from a worksheet."""

    # Value of the sequence-number column, when present and numeric.
    seq_no: Optional[int] = None
    # Name cell as written (stripped, line breaks removed).
    student_name: str = ''
    # Formal name: the part before the parenthesis, or the full name.
    display_name: str = ''
    # Nickname: the part inside the parenthesis, or ''.
    nickname: str = ''
    # Canonical course code (see COURSE_ALIASES), 'free' or 'trial'.
    course_code: str = ''
    # Numeric columns keyed by logical field name (see HEADER_RULES).
    fields: dict[str, Optional[float]] = field(default_factory=dict)
    # (class_date, lessons, raw_cell_text) tuples from the class-register cells.
    class_sessions: list[tuple[date, float, str]] = field(default_factory=list)
    # Free-text remark assembled by the row parser.
    notes: str = ''
    # Name of the worksheet the row came from.
    source_sheet: str = ''
|
||
|
||
|
||
@dataclass
class ParsedWorkbook:
    """Result of parsing one workbook file."""

    # Path of the parsed file.
    path: Path
    # Year and month the workbook covers (from the file name or a sheet title).
    year: int
    month: int
    # Comma-joined names of the sheets that produced rows ('' if none did).
    sheet_name: str
    # First sheet title found in the workbook ('' if none).
    title: str
    # All parsed rows, across sheets.
    rows: list[ParsedRow] = field(default_factory=list)
    # Names of the sheets that produced at least one row.
    sheets_parsed: list[str] = field(default_factory=list)
    # Names of the sheets skipped as payment / transfer statements.
    sheets_skipped: list[str] = field(default_factory=list)
|
||
|
||
|
||
def parse_workbook(path: Path, default_lessons: float = 2.0) -> ParsedWorkbook:
    """Parse every relevant sheet of a lesson-ledger workbook.

    The year and month come from the file name or, failing that, from the
    first sheet title that carries them. Payment / transfer statement sheets
    are skipped and reported in ``sheets_skipped``; empty sheets are ignored.

    The sheets are read first and parsed afterwards, so a year/month found
    only in a later sheet's title also applies to the sheets before it. The
    workbook is closed even when reading fails.

    Raises:
        ValueError: if neither the file name nor any sheet title yields a
            year and month.
    """
    path = Path(path)
    ym = parse_filename_month(path)
    title = ''
    sheets_skipped: list[str] = []
    # (sheet name, materialised rows) of every sheet that will be parsed.
    pending: list[tuple[str, list[tuple]]] = []

    wb = load_workbook(path, read_only=True, data_only=True)
    try:
        for sheet_name in wb.sheetnames:
            if should_skip_sheet(sheet_name):
                sheets_skipped.append(sheet_name)
                continue
            rows = list(wb[sheet_name].iter_rows(values_only=True))
            if not rows:
                continue

            # The title sits in the first row, or the second if the first has none.
            sheet_title = _find_title_in_row(rows[0])
            if not sheet_title and len(rows) > 1:
                sheet_title = _find_title_in_row(rows[1])
            if sheet_title and not title:
                title = sheet_title
            if not ym and sheet_title:
                ym = parse_title_month(sheet_title)

            pending.append((sheet_name, rows))
    finally:
        # Read-only workbooks keep the file open until closed explicitly.
        wb.close()

    if not ym:
        raise ValueError(f'无法从文件名或标题解析年月: {path.name}')
    year, month = ym

    all_rows: list[ParsedRow] = []
    sheets_parsed: list[str] = []
    for sheet_name, rows in pending:
        sheet_rows = parse_sheet_rows(rows, year, month, sheet_name, default_lessons)
        if sheet_rows:
            sheets_parsed.append(sheet_name)
            all_rows.extend(sheet_rows)

    return ParsedWorkbook(
        path=path,
        year=year,
        month=month,
        sheet_name=','.join(sheets_parsed) if sheets_parsed else '',
        title=title,
        rows=all_rows,
        sheets_parsed=sheets_parsed,
        sheets_skipped=sheets_skipped,
    )
|
||
|
||
|
||
def validate_balance(row: ParsedRow, tolerance: float = 1.0) -> list[str]:
    """Check the month-end balance equation of one row.

    Expected closing balance = opening balance + (new sign-up + renewal
    amounts) - (consumed + refund amounts + account fee). Missing amounts
    count as zero. Nothing is checked when the opening or closing balance is
    absent.

    Returns a list with one warning message when the difference exceeds
    *tolerance*, otherwise an empty list.
    """
    values = row.fields
    opening = values.get('prev_balance')
    closing = values.get('end_balance')
    if opening is None or closing is None:
        return []

    def _total(keys):
        # Sum of the given fields, treating missing / None values as 0.
        return sum(values.get(key) or 0 for key in keys)

    inflow = _total(('new_signup_amount', 'renewal_amount'))
    outflow = _total(('consumed_amount', 'refund_amount', 'account_fee'))
    expected = opening + inflow - outflow
    gap = abs(expected - closing)

    if gap > tolerance:
        return [
            f'{row.student_name}/{row.course_code}: 余额不平 '
            f'期望≈{expected:.2f} 实际={closing:.2f} 差={gap:.2f}'
        ]
    return []
|
||
|
||
|
||
def batch_validate_rows(
    rows: list[ParsedRow],
    *,
    balance_tolerance: float = 1.0,
    lessons_tolerance: float = 0.05,
) -> list[str]:
    """Validate every row and return all warnings at once (nothing is logged).

    For each row the balance warnings come first, followed by the lesson
    warnings; rows keep their input order.
    """
    checks = (
        (validate_balance, balance_tolerance),
        (validate_lessons, lessons_tolerance),
    )
    return [
        message
        for row in rows
        for check, limit in checks
        for message in check(row, tolerance=limit)
    ]
|
||
|
||
|
||
def validate_lessons(row: ParsedRow, tolerance: float = 0.05) -> list[str]:
    """Check the remaining-lessons equation of one row.

    Expected remaining = previous remaining + (new sign-up + renewal lessons)
    - (consumed + refunded lessons). Missing values count as zero. When the
    consumed lessons are negative, only their absolute value is subtracted
    and nothing is added. Nothing is checked when the previous or the
    remaining lessons are absent.

    Returns a list with one warning message when the difference exceeds
    *tolerance*, otherwise an empty list.
    """
    values = row.fields
    opening = values.get('prev_lessons')
    closing = values.get('end_lessons')
    if opening is None or closing is None:
        return []

    gained = sum(values.get(key) or 0 for key in ('new_signup_lessons', 'renewal_lessons'))
    spent = sum(values.get(key) or 0 for key in ('consumed_lessons', 'refund_lessons'))

    consumed = values.get('consumed_lessons') or 0
    if consumed < 0:
        # Negative consumption overrides both totals.
        spent = abs(consumed)
        gained = 0

    expected = opening + gained - spent
    if abs(expected - closing) > tolerance:
        return [
            f'{row.student_name}/{row.course_code}: 课时不平 '
            f'期望≈{expected:.2f} 实际={closing:.2f}'
        ]
    return []
|