Files
drl_2/xuexiao/etl/keshi_parser.py
user9994793890 ee860ce0ae Initial commit
2026-05-29 10:28:07 +08:00

858 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""课时核对表 Excel 解析:多工作表、多表头区块、列映射、上课日期解析"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import date
from pathlib import Path
from typing import Any, Optional
from openpyxl import load_workbook
# 文件名课时核对表_YYYY_N月份.xlsx / 达尔琳2026年1月份
FILE_MONTH_PATTERNS = [
re.compile(r'课时核对表[_\s]*(\d{4})[_\s]*(\d{1,2})\s*月', re.IGNORECASE),
re.compile(r'课时核对表[(]?(\d{4})[.\s年]*(\d{1,2})\s*月', re.IGNORECASE),
re.compile(r'(\d{4})[.\s年]*(\d{1,2})\s*月', re.IGNORECASE),
]
# 上课情况3.8 / 1.13补2次/ 1.24(补2节)
CLASS_CELL_RE = re.compile(
r'^(\d{1,2})\.(\d{1,2})(?:[(]([^)]+)[)]|\(([^)]+)\))?$',
re.IGNORECASE,
)
NAME_RE = re.compile(r'^(.+?)[(](.+?)[)]$')
PAYMENT_ROW_RE = re.compile(r'^\d{4}-\d{2}-\d{2}\s+\d{2}:')
LONG_ID_RE = re.compile(r'^\d{15,}$')
COURSE_ALIASES = {
'scratch': 'scratch',
'python': 'python',
'c++': 'c++',
'cpp': 'c++',
'wedo': 'wedo',
'lab': 'lab',
'stem': 'stem',
'code': 'code',
'大班': '大班',
'中班': '中班',
'小班': '小班',
'编程': 'python',
'python编程': 'python',
'scratch编程': 'scratch',
'乐高': 'wedo',
'wedo乐高': 'wedo',
'待定': 'lab',
'免费': 'free',
'trial': 'trial',
'试学': 'trial',
}
# 仅跳过「转账/缴费流水」类工作表(不跳过「试学明细」等含学员数据的子表)
# 列标题 → 逻辑字段(靠前者优先匹配)
HEADER_RULES: list[tuple[str, str]] = [
('12月份剩余课时', 'prev_lessons'),
('12月剩余总课时', 'prev_total_lessons'),
('12月末剩余金额', 'prev_balance'),
('12月末累计已上总课时', 'prev_cumulative_consumed'),
('12月末剩余课时', 'prev_lessons'),
('1月份剩余课时', 'end_lessons'),
('1月剩余总课时', 'end_total_lessons'),
('1月末剩余金额', 'end_balance'),
('1月末累计已上总课时', 'end_cumulative_consumed'),
('1月份已上金额', 'consumed_amount'),
('1月份已上课时', 'consumed_lessons'),
('1月已上课时', 'consumed_lessons'),
('1月份课时登记', '_class_register'),
('1月份上课情况', '_class_register'),
('1月已上课时登记', '_class_register'),
('序号', 'seq_no'),
('学员姓名', 'student_name'),
('学员名字', 'student_name'),
('课程级别', 'course_level'),
('课程', 'course_level'),
('单价/节', 'unit_price'),
('退费课时', 'refund_lessons'),
('退赠送课时', 'refund_gift_lessons'),
('退费金额', 'refund_amount'),
('账号费', 'account_fee'),
('退费', 'refund_amount'),
('赠送课时', 'prev_gift_lessons'),
('上月剩余总课时', 'prev_total_lessons'),
('剩余总课时', 'prev_total_lessons'),
('上月剩余课时', 'prev_lessons'),
('月份剩余课时', 'prev_lessons'),
('剩余课时', 'prev_lessons'),
('单价', 'unit_price'),
('上月末剩余金额', 'prev_balance'),
('月末剩余金额', 'prev_balance'),
('剩余金额', 'prev_balance'),
('新签金额', 'new_signup_amount'),
('新签课时', 'new_signup_lessons'),
('新签赠送', 'new_signup_gift_lessons'),
('续费金额', 'renewal_amount'),
('续费课时', 'renewal_lessons'),
('续费赠送', 'renewal_gift_lessons'),
('本月已上课时', 'consumed_lessons'),
('已上课时', 'consumed_lessons'),
('本月已上金额', 'consumed_amount'),
('已上金额', 'consumed_amount'),
('本月剩余总课时', 'end_total_lessons'),
('本月剩余课时', 'end_lessons'),
('本月末剩余金额', 'end_balance'),
('总课时', 'total_hours'),
('备注', 'notes'),
('缴费金额', 'new_signup_amount'),
('交费金额', 'new_signup_amount'),
('缴费时间', '_pay_time'),
]
def parse_filename_month(path: Path) -> Optional[tuple[int, int]]:
stem = path.stem
for pat in FILE_MONTH_PATTERNS:
m = pat.search(stem)
if m:
return int(m.group(1)), int(m.group(2))
return None
def parse_title_month(title: str) -> Optional[tuple[int, int]]:
if not title:
return None
m = re.search(r'(\d{4})\s*年\s*(\d{1,2})\s*月', str(title))
if m:
return int(m.group(1)), int(m.group(2))
return None
def split_student_name(full: str) -> tuple[str, str, str]:
"""返回 (全名, 正式名, 小名)"""
full = (full or '').strip().replace('\n', '')
if not full:
return '', '', ''
m = NAME_RE.match(full)
if m:
display = m.group(1).strip()
nick = m.group(2).strip()
return full, display, nick
return full, full, ''
def normalize_course_code(raw: str) -> str:
s = (raw or '').strip().lower().replace('\n', '')
s = s.replace('', '+').replace(' ', '')
if not s or s in ('-', '', '/', '', 'nan'):
return ''
return COURSE_ALIASES.get(s, s)
def infer_course_from_schedule(text: str) -> str:
t = (text or '').lower()
if 'python' in t:
return 'python'
if 'scratch' in t:
return 'scratch'
if 'wedo' in t:
return 'wedo'
if 'c++' in t or 'cpp' in t:
return 'c++'
if 'lab' in t:
return 'lab'
return ''
def sheet_default_course(sheet_name: str) -> str:
sn = (sheet_name or '').lower()
if 'lab' in sn:
return 'lab'
return ''
def should_skip_sheet(sheet_name: str) -> bool:
"""跳过纯转账流水表;保留 code/stem/lab/免费/试学明细等业务子表"""
sn = (sheet_name or '').replace(' ', '')
if '缴费流水' in sn:
return True
if '转账' in sn and ('明细' in sn or '月转账' in sn or sn.endswith('转账')):
return True
if re.search(r'\d+月转账', sn):
return True
return False
def is_real_workbook_file(path: Path, min_size: int = 50_000) -> bool:
"""排除体积过小的示例/测试表,只导入完整月度核对表"""
return path.suffix.lower() == '.xlsx' and path.stat().st_size >= min_size
def _safe_float(val: Any) -> Optional[float]:
if val is None or val == '':
return None
if isinstance(val, (int, float)):
return float(val)
s = str(val).strip().replace(',', '').replace('\n', '')
if not s or s in ('-', '', '/', '', '待定', 'nan'):
return None
try:
return float(s)
except ValueError:
return None
def _row_text(row: tuple) -> str:
return ''.join(str(c or '') for c in (row or ()))
def _has_student_col_header(text: str) -> bool:
"""标准表为「学员姓名」,免费课表为「学员名字」(字可能为不同 Unicode 码位)"""
if '学员' not in text:
return False
return '姓名' in text or '名字' in text or bool(
re.search(r'学员.{0,2}名', text)
)
def _find_title_in_row(row: tuple) -> str:
for cell in row or ():
if cell is None:
continue
s = str(cell).strip()
if s and ('课时明细' in s or '课时核对' in s or re.search(r'\d{4}\s*年', s)):
return s
return ''
def is_standard_header_row(row: tuple) -> bool:
text = _row_text(row)
return _has_student_col_header(text) and ('课程' in text or '课程级别' in text)
def is_free_class_header_row(row: tuple) -> bool:
text = _row_text(row)
return _has_student_col_header(text) and ('班级' in text or '总课时' in text)
def is_tracking_header_row(row: tuple) -> bool:
text = _row_text(row)
return _has_student_col_header(text) and '累计已上' in text and '总课时' in text
def is_trial_payment_header_row(row: tuple) -> bool:
text = _row_text(row)
return _has_student_col_header(text) and ('缴费' in text or '交费' in text)
def is_summary_row(row: tuple) -> bool:
text = _row_text(row)
return '合计' in text or text.strip().startswith('总计')
def is_payment_data_row(row: tuple) -> bool:
if not row:
return False
text = _row_text(row)
if '微信支付' in text or '已交款' in text or '网络银行' in text:
return True
first = str(row[0] or '').strip()
if PAYMENT_ROW_RE.match(first):
second = str(row[1] if len(row) > 1 else '').strip()
if LONG_ID_RE.match(second) or second == '':
return True
name_cell = ''
for cell in row[1:4]:
if cell and str(cell).strip():
name_cell = str(cell).strip()
break
if LONG_ID_RE.match(name_cell):
return True
if name_cell in ('DEL', 'XF'):
return True
return False
def _header_month_num(h: str) -> Optional[int]:
"""表头中的月份数字,如 4月份剩余课时 → 4"""
m = re.search(r'(\d{1,2})\s*月', h)
return int(m.group(1)) if m else None
def _prev_month_num(book_month: int) -> int:
return 12 if book_month == 1 else book_month - 1
def _map_header_by_month(h: str, book_month: int) -> Optional[str]:
"""按表头月份与核对表月份对齐上月→prev_*当月→end_* / consumed_*"""
hm = _header_month_num(h)
if hm is None:
return None
prev_m = _prev_month_num(book_month)
if hm == book_month:
if '剩余总课时' in h or ('剩余总' in h and '课时' in h):
return 'end_total_lessons'
if '月末' in h and ('剩余金额' in h or '余额' in h):
return 'end_balance'
if '剩余课时' in h:
return 'end_lessons'
if '已上课时' in h:
return 'consumed_lessons'
if '已上金额' in h:
return 'consumed_amount'
if '累计已上' in h:
return 'end_cumulative_consumed'
if hm == prev_m:
if '剩余总课时' in h or ('剩余总' in h and '课时' in h):
return 'prev_total_lessons'
if '月末' in h and ('剩余金额' in h or '余额' in h):
return 'prev_balance'
if '剩余课时' in h:
return 'prev_lessons'
if '累计已上' in h:
return 'prev_cumulative_consumed'
return None
def _map_header(cell_val: Any, book_year: Optional[int] = None, book_month: Optional[int] = None) -> Optional[str]:
if cell_val is None:
return None
h = str(cell_val).strip().replace('\n', '').replace(' ', '')
if not h:
return None
if book_month is not None:
by_month = _map_header_by_month(h, book_month)
if by_month:
return by_month
for keyword, field_key in HEADER_RULES:
if keyword not in str(cell_val):
continue
if field_key.startswith('_') or field_key == '_pay_time':
return field_key
if field_key in ('prev_lessons', 'prev_total_lessons', 'prev_balance', 'prev_gift_lessons'):
if '本月' in h or re.search(r'^1月|^1月份', h):
continue
if field_key == 'prev_balance' and re.search(r'^1月', h):
continue
if field_key in ('end_lessons', 'end_total_lessons', 'end_balance'):
if '12月' in h or '上月' in h:
continue
if '剩余' in h and '本月' not in h and not re.search(r'^1月|^1月份', h):
if field_key != 'end_balance' or '月末' not in h:
continue
if field_key == 'prev_balance' and re.search(r'^1月', h):
continue
if field_key == 'end_balance' and '12月' in h:
continue
if field_key == 'prev_lessons' and re.search(r'^1月', h) and '12' not in h[:3]:
if '剩余' in h:
continue
return field_key
return None
def _lessons_from_class_suffix(suffix: str, default_lessons: float) -> float:
if not suffix:
return default_lessons
m = re.search(r'(\d+(?:\.\d+)?)\s*课时', suffix)
if m:
return float(m.group(1))
m = re.search(r'补?(\d+)\s*次', suffix)
if m:
return float(m.group(1)) * default_lessons
m = re.search(r'(\d+)\s*节', suffix)
if m:
return float(m.group(1)) * default_lessons
return default_lessons
def parse_class_cell(text: str, year: int, month: int, default_lessons: float = 2.0):
"""解析上课情况单元格 → (class_date, lessons, raw_text)"""
if not text:
return None
raw = str(text).strip()
if not raw:
return None
compact = raw.replace(' ', '')
m = CLASS_CELL_RE.match(compact)
if not m:
return None
d_month = int(m.group(1))
d_day = int(m.group(2))
suffix = m.group(3) or m.group(4) or ''
lessons = _lessons_from_class_suffix(suffix, default_lessons)
use_month = month
if 1 <= d_month <= 12:
use_month = d_month
try:
class_date = date(year, use_month, d_day)
except ValueError:
return None
return class_date, lessons, raw
def _build_col_map(
header_row: tuple,
book_year: Optional[int] = None,
book_month: Optional[int] = None,
) -> tuple[dict[int, str], list[int]]:
col_map: dict[int, str] = {}
class_cols: list[int] = []
for idx, cell in enumerate(header_row):
field_key = _map_header(cell, book_year, book_month)
if field_key == '_class_register':
class_cols.append(idx)
elif field_key:
col_map[idx] = field_key
else:
h = str(cell or '').strip().replace(' ', '')
if (
not h
or '上课' in str(cell or '')
or '情况' in str(cell or '')
or '登记' in str(cell or '')
or CLASS_CELL_RE.match(h)
):
class_cols.append(idx)
notes_idx = next((i for i, k in col_map.items() if k == 'notes'), None)
end_idx = next(
(i for i, k in col_map.items() if k in ('end_balance', 'end_total_lessons', 'end_lessons')),
None,
)
scan_start = (end_idx + 1) if end_idx is not None else 0
scan_end = notes_idx if notes_idx is not None else len(header_row)
for idx in range(scan_start, scan_end):
if idx not in col_map and idx not in class_cols:
class_cols.append(idx)
if not class_cols:
mapped = set(col_map.keys())
start = (notes_idx + 1) if notes_idx is not None else (max(mapped) + 1 if mapped else 0)
for idx in range(start, len(header_row)):
if idx not in col_map:
class_cols.append(idx)
return col_map, class_cols
def _parse_standard_row(
data_row: tuple,
col_map: dict[int, str],
class_cols: list[int],
year: int,
month: int,
default_lessons: float,
section_course: str,
sheet_name: str,
) -> Optional[ParsedRow]:
name_idx = next((i for i, k in col_map.items() if k == 'student_name'), None)
if name_idx is None:
return None
name_val = data_row[name_idx] if name_idx < len(data_row) else None
if not name_val or not str(name_val).strip():
return None
name_str = str(name_val).strip().replace('\n', '')
if LONG_ID_RE.match(name_str) or name_str in ('DEL', 'XF'):
return None
full, display, nick = split_student_name(str(name_val))
course_idx = next((i for i, k in col_map.items() if k == 'course_level'), None)
course_raw = ''
if course_idx is not None and course_idx < len(data_row):
course_raw = str(data_row[course_idx] or '')
course_code = normalize_course_code(course_raw)
if not course_code:
course_code = section_course or sheet_default_course(sheet_name)
fields: dict[str, Optional[float]] = {}
notes = ''
for idx, key in col_map.items():
if idx >= len(data_row):
continue
val = data_row[idx]
if key in ('student_name', 'course_level', 'seq_no', '_pay_time'):
if key == '_pay_time':
pay_t = str(val or '').strip()
if pay_t:
notes = f'缴费时间:{pay_t}' + (f'{notes}' if notes else '')
continue
if key == 'notes':
notes = str(val or '').strip()
continue
if key == 'total_hours':
th = _safe_float(val)
if th is not None:
fields['total_hours'] = th
continue
fv = _safe_float(val)
if fv is not None:
fields[key] = fv
# lab 累计课消表:用累计差推算本月消耗(仅当未提供 consumed_lessons
if 'prev_cumulative_consumed' in fields or 'end_cumulative_consumed' in fields:
prev_c = fields.get('prev_cumulative_consumed')
end_c = fields.get('end_cumulative_consumed')
if fields.get('consumed_lessons') is None and prev_c is not None and end_c is not None:
fields['consumed_lessons'] = end_c - prev_c
seq_no = None
seq_idx = next((i for i, k in col_map.items() if k == 'seq_no'), None)
if seq_idx is not None and seq_idx < len(data_row):
try:
if data_row[seq_idx] not in (None, ''):
seq_no = int(float(data_row[seq_idx]))
except (TypeError, ValueError):
seq_no = None
sessions = []
for cidx in class_cols:
if cidx >= len(data_row):
continue
cell = data_row[cidx]
if cell is None or str(cell).strip() == '':
continue
parsed_cell = parse_class_cell(str(cell), year, month, default_lessons)
if parsed_cell:
sessions.append(parsed_cell)
return ParsedRow(
seq_no=seq_no,
student_name=full,
display_name=display,
nickname=nick,
course_code=course_code,
fields=fields,
class_sessions=sessions,
notes=notes,
source_sheet=sheet_name,
)
def _parse_free_class_row(
data_row: tuple,
class_cols: list[int],
year: int,
month: int,
default_lessons: float,
section_course: str,
sheet_name: str,
) -> tuple[Optional[ParsedRow], str]:
"""免费课表:班级|姓名|总课时|12月末剩余|1月已上|1月剩余|登记…"""
schedule = str(data_row[0] or '').strip() if len(data_row) > 0 else ''
new_section = infer_course_from_schedule(schedule) if schedule else ''
if new_section:
section_course = new_section
name_val = data_row[1] if len(data_row) > 1 else None
if not name_val or not str(name_val).strip():
return None, section_course
name_str = str(name_val).strip()
if '姓名' in name_str or LONG_ID_RE.match(name_str):
return None, section_course
full, display, nick = split_student_name(name_str)
course_code = section_course or 'free'
fields: dict[str, Optional[float]] = {}
if len(data_row) > 2:
v = _safe_float(data_row[2])
if v is not None:
fields['total_hours'] = v
if len(data_row) > 3:
v = _safe_float(data_row[3])
if v is not None:
fields['prev_lessons'] = v
if len(data_row) > 4:
v = _safe_float(data_row[4])
if v is not None:
fields['consumed_lessons'] = v
if len(data_row) > 5:
v = _safe_float(data_row[5])
if v is not None:
fields['end_lessons'] = v
notes = str(data_row[-1] or '').strip() if data_row else ''
if notes in ('免费', '备注'):
notes = ''
sessions = []
for cidx in class_cols:
if cidx >= len(data_row):
continue
cell = data_row[cidx]
if cell is None or str(cell).strip() == '':
continue
parsed_cell = parse_class_cell(str(cell), year, month, default_lessons)
if parsed_cell:
sessions.append(parsed_cell)
row = ParsedRow(
student_name=full,
display_name=display,
nickname=nick,
course_code=course_code,
fields=fields,
class_sessions=sessions,
notes=notes or '免费课',
source_sheet=sheet_name,
)
return row, section_course
def _parse_trial_payment_row(
data_row: tuple,
col_map: dict[int, str],
year: int,
month: int,
sheet_name: str,
) -> Optional[ParsedRow]:
name_idx = next((i for i, k in col_map.items() if k == 'student_name'), None)
if name_idx is None:
return None
name_val = data_row[name_idx] if name_idx < len(data_row) else None
if not name_val or not str(name_val).strip():
return None
name_str = str(name_val).strip()
if '姓名' in name_str or '合计' in name_str:
return None
full, display, nick = split_student_name(name_str)
fields: dict[str, Optional[float]] = {}
notes = '试学缴费'
for idx, key in col_map.items():
if idx >= len(data_row):
continue
val = data_row[idx]
if key == 'student_name':
continue
if key == '_pay_time':
pay_t = str(val or '').strip()
if pay_t:
notes = f'缴费时间:{pay_t}'
continue
if key == 'notes':
extra = str(val or '').strip()
if extra:
notes = f'{notes}{extra}' if notes else extra
continue
if key == 'seq_no':
continue
fv = _safe_float(val)
if fv is not None:
fields[key] = fv
if not fields.get('new_signup_amount'):
return None
if name_str.isdigit() or LONG_ID_RE.match(name_str.replace(' ', '')):
return None
return ParsedRow(
student_name=full,
display_name=display,
nickname=nick,
course_code='trial',
fields=fields,
notes=notes,
source_sheet=sheet_name,
)
def parse_sheet_rows(
rows: list[tuple],
year: int,
month: int,
sheet_name: str,
default_lessons: float = 2.0,
) -> list[ParsedRow]:
parsed_rows: list[ParsedRow] = []
col_map: Optional[dict[int, str]] = None
class_cols: list[int] = []
layout = 'standard' # standard | free | tracking | trial
section_course = sheet_default_course(sheet_name)
for row in rows:
if not row or all(c is None or str(c).strip() == '' for c in row):
continue
if is_summary_row(row):
continue
if is_payment_data_row(row):
continue
if is_standard_header_row(row):
col_map, class_cols = _build_col_map(row, year, month)
layout = 'tracking' if is_tracking_header_row(row) else 'standard'
continue
if is_free_class_header_row(row) and not is_standard_header_row(row):
col_map, class_cols = _build_col_map(row, year, month)
if not class_cols:
class_cols = list(range(6, len(row)))
layout = 'free'
continue
if is_trial_payment_header_row(row):
col_map, class_cols = _build_col_map(row, year, month)
layout = 'trial'
continue
if layout == 'trial':
parsed = _parse_trial_payment_row(
row, col_map or {}, year, month, sheet_name,
)
if parsed and parsed.student_name:
parsed_rows.append(parsed)
continue
if layout == 'free':
result, section_course = _parse_free_class_row(
row, class_cols, year, month, default_lessons, section_course, sheet_name,
)
if result and result.student_name and result.course_code:
parsed_rows.append(result)
continue
if not col_map:
continue
parsed = _parse_standard_row(
row, col_map, class_cols, year, month, default_lessons, section_course, sheet_name,
)
if parsed and parsed.student_name and parsed.course_code:
parsed_rows.append(parsed)
return parsed_rows
@dataclass
class ParsedRow:
seq_no: Optional[int] = None
student_name: str = ''
display_name: str = ''
nickname: str = ''
course_code: str = ''
fields: dict[str, Optional[float]] = field(default_factory=dict)
class_sessions: list[tuple[date, float, str]] = field(default_factory=list)
notes: str = ''
source_sheet: str = ''
@dataclass
class ParsedWorkbook:
path: Path
year: int
month: int
sheet_name: str
title: str
rows: list[ParsedRow] = field(default_factory=list)
sheets_parsed: list[str] = field(default_factory=list)
sheets_skipped: list[str] = field(default_factory=list)
def parse_workbook(path: Path, default_lessons: float = 2.0) -> ParsedWorkbook:
path = Path(path)
ym = parse_filename_month(path)
wb = load_workbook(path, read_only=True, data_only=True)
all_rows: list[ParsedRow] = []
sheets_parsed: list[str] = []
sheets_skipped: list[str] = []
title = ''
for sheet_name in wb.sheetnames:
if should_skip_sheet(sheet_name):
sheets_skipped.append(sheet_name)
continue
ws = wb[sheet_name]
rows = list(ws.iter_rows(values_only=True))
if not rows:
continue
sheet_title = _find_title_in_row(rows[0])
if not sheet_title and len(rows) > 1:
sheet_title = _find_title_in_row(rows[1])
if sheet_title and not title:
title = sheet_title
if not ym and sheet_title:
ym = parse_title_month(sheet_title)
if not ym:
continue
year, month = ym
sheet_rows = parse_sheet_rows(rows, year, month, sheet_name, default_lessons)
if sheet_rows:
sheets_parsed.append(sheet_name)
all_rows.extend(sheet_rows)
wb.close()
if not ym:
raise ValueError(f'无法从文件名或标题解析年月: {path.name}')
year, month = ym
return ParsedWorkbook(
path=path,
year=year,
month=month,
sheet_name=','.join(sheets_parsed) if sheets_parsed else '',
title=title,
rows=all_rows,
sheets_parsed=sheets_parsed,
sheets_skipped=sheets_skipped,
)
def validate_balance(row: ParsedRow, tolerance: float = 1.0) -> list[str]:
"""校验月末余额公式,返回 warning 消息列表"""
f = row.fields
warnings = []
prev_bal = f.get('prev_balance')
end_bal = f.get('end_balance')
if prev_bal is None or end_bal is None:
return warnings
income = sum(f.get(k) or 0 for k in (
'new_signup_amount', 'renewal_amount',
))
outgo = sum(f.get(k) or 0 for k in (
'consumed_amount', 'refund_amount', 'account_fee',
))
expected = prev_bal + income - outgo
diff = abs(expected - end_bal)
if diff > tolerance:
warnings.append(
f'{row.student_name}/{row.course_code}: 余额不平 '
f'期望≈{expected:.2f} 实际={end_bal:.2f} 差={diff:.2f}'
)
return warnings
def batch_validate_rows(
rows: list[ParsedRow],
*,
balance_tolerance: float = 1.0,
lessons_tolerance: float = 0.05,
) -> list[str]:
"""批量校验所有行,一次性返回全部警告(不打印日志)"""
warnings: list[str] = []
for row in rows:
warnings.extend(validate_balance(row, tolerance=balance_tolerance))
warnings.extend(validate_lessons(row, tolerance=lessons_tolerance))
return warnings
def validate_lessons(row: ParsedRow, tolerance: float = 0.05) -> list[str]:
f = row.fields
warnings = []
prev = f.get('prev_lessons')
end = f.get('end_lessons')
if prev is None or end is None:
return warnings
add = sum(f.get(k) or 0 for k in ('new_signup_lessons', 'renewal_lessons'))
sub = sum(f.get(k) or 0 for k in ('consumed_lessons', 'refund_lessons'))
consumed = f.get('consumed_lessons') or 0
if consumed < 0:
sub = abs(consumed)
add = 0
expected = prev + add - sub
if abs(expected - end) > tolerance:
warnings.append(
f'{row.student_name}/{row.course_code}: 课时不平 '
f'期望≈{expected:.2f} 实际={end:.2f}'
)
return warnings