193 lines
6.3 KiB
Python
193 lines
6.3 KiB
Python
"""课时核对表:UI 展示辅助(主月份快照、格式化)"""
|
||
from __future__ import annotations
|
||
|
||
from typing import Optional
|
||
|
||
from sqlalchemy import func
|
||
|
||
from models import db, MonthlySnapshot, StudentAccount
|
||
|
||
|
||
def list_period_stats() -> list[dict]:
|
||
"""各月导入条数(用于月份切换)"""
|
||
rows = db.session.query(
|
||
MonthlySnapshot.year,
|
||
MonthlySnapshot.month,
|
||
func.count(MonthlySnapshot.id),
|
||
).group_by(
|
||
MonthlySnapshot.year, MonthlySnapshot.month,
|
||
).order_by(
|
||
MonthlySnapshot.year.desc(),
|
||
MonthlySnapshot.month.desc(),
|
||
).all()
|
||
return [
|
||
{'year': int(y), 'month': int(m), 'count': int(c)}
|
||
for y, m, c in rows
|
||
]
|
||
|
||
|
||
def get_latest_period(min_records: int = 20) -> tuple[Optional[int], Optional[int]]:
|
||
"""
|
||
主展示月份:在条数>=min_records 的月份中取最新年月。
|
||
避免测试月(如 2026-05 仅 1 条)被按年月排序误选为默认月。
|
||
"""
|
||
rows = (
|
||
db.session.query(
|
||
MonthlySnapshot.year,
|
||
MonthlySnapshot.month,
|
||
func.count(MonthlySnapshot.id),
|
||
)
|
||
.group_by(MonthlySnapshot.year, MonthlySnapshot.month)
|
||
.having(func.count(MonthlySnapshot.id) >= min_records)
|
||
.order_by(
|
||
MonthlySnapshot.year.desc(),
|
||
MonthlySnapshot.month.desc(),
|
||
)
|
||
.all()
|
||
)
|
||
if rows:
|
||
y, m, _ = rows[0]
|
||
return int(y), int(m)
|
||
row2 = (
|
||
db.session.query(MonthlySnapshot.year, MonthlySnapshot.month)
|
||
.order_by(MonthlySnapshot.year.desc(), MonthlySnapshot.month.desc())
|
||
.first()
|
||
)
|
||
return (int(row2[0]), int(row2[1])) if row2 else (None, None)
|
||
|
||
|
||
def attach_latest_snapshots(
|
||
accounts: list[StudentAccount],
|
||
year: Optional[int] = None,
|
||
month: Optional[int] = None,
|
||
) -> dict[int, MonthlySnapshot]:
|
||
"""指定月份(默认主月份)下每个账户的快照"""
|
||
if not accounts:
|
||
return {}
|
||
if year is None or month is None:
|
||
year, month = get_latest_period()
|
||
if not year or not month:
|
||
return {}
|
||
ids = [a.id for a in accounts]
|
||
snaps = MonthlySnapshot.query.filter(
|
||
MonthlySnapshot.account_id.in_(ids),
|
||
MonthlySnapshot.year == year,
|
||
MonthlySnapshot.month == month,
|
||
).all()
|
||
return {s.account_id: s for s in snaps}
|
||
|
||
|
||
def snapshot_lessons_display(snap: Optional[MonthlySnapshot], account: StudentAccount) -> dict:
|
||
"""剩余课时 / 余额 / 单价(来自 Excel 月末列)"""
|
||
if snap:
|
||
end_normal = snap.end_lessons
|
||
end_gift = snap.end_gift_lessons
|
||
end_total = snap.end_total_lessons
|
||
end_balance = snap.end_balance
|
||
# 历史误映射:当月列曾写入 prev_*,月末展示优先 end_*,否则回退 prev_*
|
||
if end_normal is None and snap.prev_lessons is not None:
|
||
end_normal = snap.prev_lessons
|
||
if end_total is None and snap.prev_total_lessons is not None:
|
||
end_total = snap.prev_total_lessons
|
||
if end_balance is None and snap.prev_balance is not None:
|
||
end_balance = snap.prev_balance
|
||
if end_gift is None and end_total is not None and end_normal is not None:
|
||
end_gift = end_total - end_normal
|
||
if end_gift is None and snap.prev_gift_lessons is not None and snap.end_lessons is None:
|
||
end_gift = snap.prev_gift_lessons
|
||
if end_total is None:
|
||
parts = [float(end_normal or 0), float(end_gift or 0)]
|
||
if any(parts):
|
||
end_total = sum(parts)
|
||
unit = snap.unit_price
|
||
if (unit is None or float(unit or 0) == 0) and snap.prev_balance and snap.prev_lessons:
|
||
try:
|
||
unit = float(snap.prev_balance) / float(snap.prev_lessons)
|
||
except (TypeError, ValueError, ZeroDivisionError):
|
||
unit = account.unit_price
|
||
elif unit is None:
|
||
unit = account.unit_price
|
||
return {
|
||
'end_lessons': end_normal,
|
||
'end_gift_lessons': end_gift,
|
||
'end_total_lessons': end_total,
|
||
'end_balance': end_balance,
|
||
'unit_price': unit,
|
||
'period': snap.period_label,
|
||
}
|
||
return {
|
||
'end_lessons': account.normal_hours,
|
||
'end_gift_lessons': account.gifted_hours,
|
||
'end_total_lessons': account.total_hours,
|
||
'end_balance': None,
|
||
'unit_price': account.unit_price,
|
||
'period': None,
|
||
}
|
||
|
||
|
||
def fmt_money(val) -> str:
|
||
if val is None:
|
||
return '-'
|
||
return f'¥{float(val):,.2f}'
|
||
|
||
|
||
def fmt_hours(val) -> str:
|
||
if val is None:
|
||
return '-'
|
||
v = float(val)
|
||
if v == int(v):
|
||
return str(int(v))
|
||
return f'{v:.2f}'
|
||
|
||
|
||
def fmt_price(val) -> str:
|
||
if val is None:
|
||
return '-'
|
||
v = float(val)
|
||
if v == 0:
|
||
return '0.00'
|
||
return f'{v:.2f}'
|
||
|
||
|
||
def summarize_students_ledger(
|
||
student_ids: list[int],
|
||
year: Optional[int] = None,
|
||
month: Optional[int] = None,
|
||
) -> dict[int, dict]:
|
||
"""按学员汇总主月份 Excel 快照:总剩余课时、总余额、课程数"""
|
||
if not student_ids:
|
||
return {}
|
||
if year is None or month is None:
|
||
year, month = get_latest_period()
|
||
if not year or not month:
|
||
return {}
|
||
|
||
rows = (
|
||
db.session.query(MonthlySnapshot, StudentAccount)
|
||
.join(StudentAccount, MonthlySnapshot.account_id == StudentAccount.id)
|
||
.filter(
|
||
MonthlySnapshot.year == year,
|
||
MonthlySnapshot.month == month,
|
||
StudentAccount.student_id.in_(student_ids),
|
||
)
|
||
.all()
|
||
)
|
||
out: dict[int, dict] = {}
|
||
for snap, acc in rows:
|
||
disp = snapshot_lessons_display(snap, acc)
|
||
sid = acc.student_id
|
||
bucket = out.setdefault(sid, {
|
||
'total_lessons': 0.0,
|
||
'total_balance': 0.0,
|
||
'course_count': 0,
|
||
'period': disp.get('period'),
|
||
})
|
||
th = disp.get('end_total_lessons')
|
||
if th is not None:
|
||
bucket['total_lessons'] += float(th)
|
||
tb = disp.get('end_balance')
|
||
if tb is not None:
|
||
bucket['total_balance'] += float(tb)
|
||
bucket['course_count'] += 1
|
||
return out
|