Files
drl_2/xuexiao/keshi_display.py
user9994793890 ee860ce0ae Initial commit
2026-05-29 10:28:07 +08:00

193 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""课时核对表UI 展示辅助(主月份快照、格式化)"""
from __future__ import annotations
from typing import Optional
from sqlalchemy import func
from models import db, MonthlySnapshot, StudentAccount
def list_period_stats() -> list[dict]:
    """Return the number of imported snapshots for each (year, month).

    Used to populate the month switcher. Periods are ordered newest first
    (year descending, then month descending). Each entry is a dict with the
    integer keys ``year``, ``month`` and ``count``.
    """
    record_count = func.count(MonthlySnapshot.id)
    query = (
        db.session.query(MonthlySnapshot.year, MonthlySnapshot.month, record_count)
        .group_by(MonthlySnapshot.year, MonthlySnapshot.month)
        .order_by(MonthlySnapshot.year.desc(), MonthlySnapshot.month.desc())
    )

    stats = []
    for year, month, count in query.all():
        # Coerce to plain ints so the result does not depend on the DB driver's types.
        stats.append({'year': int(year), 'month': int(month), 'count': int(count)})
    return stats
def get_latest_period(min_records: int = 20) -> tuple[Optional[int], Optional[int]]:
    """Pick the primary display period as a ``(year, month)`` pair.

    The newest year/month whose snapshot count is at least ``min_records`` is
    preferred. This keeps a sparsely populated test month (e.g. 2026-05 with a
    single record) from being chosen as the default merely because it sorts
    last by year/month.

    If no period reaches the threshold, the newest period present in the
    table is returned instead. ``(None, None)`` is returned when the table
    has no snapshots at all.
    """
    # Only the newest qualifying period is needed, so let the database stop
    # after one row instead of fetching every group.
    qualified = (
        db.session.query(
            MonthlySnapshot.year,
            MonthlySnapshot.month,
            func.count(MonthlySnapshot.id),
        )
        .group_by(MonthlySnapshot.year, MonthlySnapshot.month)
        .having(func.count(MonthlySnapshot.id) >= min_records)
        .order_by(
            MonthlySnapshot.year.desc(),
            MonthlySnapshot.month.desc(),
        )
        .first()
    )
    if qualified is not None:
        return int(qualified[0]), int(qualified[1])

    # Fallback: no period is populated enough, take the newest one regardless.
    newest = (
        db.session.query(MonthlySnapshot.year, MonthlySnapshot.month)
        .order_by(MonthlySnapshot.year.desc(), MonthlySnapshot.month.desc())
        .first()
    )
    if newest is None:
        return None, None
    return int(newest[0]), int(newest[1])
def attach_latest_snapshots(
    accounts: list[StudentAccount],
    year: Optional[int] = None,
    month: Optional[int] = None,
) -> dict[int, MonthlySnapshot]:
    """Map each account id to its snapshot for the requested period.

    When ``year`` or ``month`` is omitted, the period chosen by
    ``get_latest_period()`` is used. An empty dict is returned when
    ``accounts`` is empty or no usable period can be determined. Accounts
    without a snapshot in the period are simply absent from the result.
    """
    if not accounts:
        return {}

    if year is None or month is None:
        year, month = get_latest_period()
    if not (year and month):
        return {}

    account_ids = [account.id for account in accounts]
    query = MonthlySnapshot.query.filter(
        MonthlySnapshot.account_id.in_(account_ids),
        MonthlySnapshot.year == year,
        MonthlySnapshot.month == month,
    )

    by_account = {}
    for snapshot in query.all():
        by_account[snapshot.account_id] = snapshot
    return by_account
def snapshot_lessons_display(snap: Optional[MonthlySnapshot], account: StudentAccount) -> dict:
    """Build the remaining-lessons / balance / unit-price values shown in the UI.

    With a snapshot, values come from its month-end (``end_*``) columns, with
    fallbacks to the ``prev_*`` columns and, for the unit price, to the
    account. Without a snapshot, the account's own hour totals are used and
    the balance and period are ``None``.

    Returns a dict with the keys ``end_lessons``, ``end_gift_lessons``,
    ``end_total_lessons``, ``end_balance``, ``unit_price`` and ``period``.
    """
    if not snap:
        return {
            'end_lessons': account.normal_hours,
            'end_gift_lessons': account.gifted_hours,
            'end_total_lessons': account.total_hours,
            'end_balance': None,
            'unit_price': account.unit_price,
            'period': None,
        }

    # Historical mis-mapping: current-month columns were once written into
    # prev_*. Month-end display prefers end_* and falls back to prev_*.
    lessons = snap.end_lessons if snap.end_lessons is not None else snap.prev_lessons
    total = (
        snap.end_total_lessons
        if snap.end_total_lessons is not None
        else snap.prev_total_lessons
    )
    balance = snap.end_balance if snap.end_balance is not None else snap.prev_balance

    gift = snap.end_gift_lessons
    if gift is None:
        if total is not None and lessons is not None:
            # Gift lessons are whatever the total holds beyond the normal lessons.
            gift = total - lessons
        elif snap.prev_gift_lessons is not None and snap.end_lessons is None:
            gift = snap.prev_gift_lessons

    if total is None:
        normal_part = float(lessons or 0)
        gift_part = float(gift or 0)
        # Only synthesise a total when at least one component is non-zero.
        if normal_part or gift_part:
            total = normal_part + gift_part

    price = snap.unit_price
    price_missing = price is None or float(price or 0) == 0
    if price_missing and snap.prev_balance and snap.prev_lessons:
        # Derive the unit price from balance / lessons when the snapshot has none.
        try:
            price = float(snap.prev_balance) / float(snap.prev_lessons)
        except (TypeError, ValueError, ZeroDivisionError):
            price = account.unit_price
    elif price is None:
        price = account.unit_price

    return {
        'end_lessons': lessons,
        'end_gift_lessons': gift,
        'end_total_lessons': total,
        'end_balance': balance,
        'unit_price': price,
        'period': snap.period_label,
    }
def fmt_money(val) -> str:
    """Format an amount as yen-sign currency with thousands separators and 2 decimals.

    ``None`` is rendered as ``'-'``.
    """
    if val is not None:
        amount = float(val)
        return f'¥{amount:,.2f}'
    return '-'
def fmt_hours(val) -> str:
    """Format a lesson-hour count for display.

    ``None`` is rendered as ``'-'``. Whole numbers are shown without a
    decimal part (``3.0`` -> ``'3'``); anything else is shown with two
    decimals (``2.5`` -> ``'2.50'``).

    Non-finite values (NaN, +/-inf) are formatted as text rather than
    raising, which matches how ``fmt_money`` and ``fmt_price`` treat them.
    """
    if val is None:
        return '-'
    v = float(val)
    # float.is_integer() is False for NaN/inf, whereas int(v) would raise
    # ValueError/OverflowError on those inputs.
    if v.is_integer():
        return str(int(v))
    return f'{v:.2f}'
def fmt_price(val) -> str:
    """Format a unit price with two decimals; ``None`` is rendered as ``'-'``.

    Any zero value (including negative zero) is rendered as ``'0.00'``.
    """
    if val is None:
        return '-'
    price = float(val)
    # The explicit zero case keeps -0.0 from being rendered as '-0.00'.
    return '0.00' if price == 0 else f'{price:.2f}'
def summarize_students_ledger(
    student_ids: list[int],
    year: Optional[int] = None,
    month: Optional[int] = None,
) -> dict[int, dict]:
    """Aggregate one period's snapshots per student.

    When ``year`` or ``month`` is omitted, the period chosen by
    ``get_latest_period()`` is used. An empty dict is returned when
    ``student_ids`` is empty or no usable period can be determined.

    The result maps student id to a dict with:
      - ``total_lessons``: sum of the displayed total remaining lessons
      - ``total_balance``: sum of the displayed balances
      - ``course_count``: number of account snapshots found for the student
      - ``period``: period label of the first snapshot seen for the student
    """
    if not student_ids:
        return {}

    if year is None or month is None:
        year, month = get_latest_period()
    if not (year and month):
        return {}

    joined = db.session.query(MonthlySnapshot, StudentAccount).join(
        StudentAccount, MonthlySnapshot.account_id == StudentAccount.id
    )
    scoped = joined.filter(
        MonthlySnapshot.year == year,
        MonthlySnapshot.month == month,
        StudentAccount.student_id.in_(student_ids),
    )

    summary: dict[int, dict] = {}
    for snapshot, account in scoped.all():
        shown = snapshot_lessons_display(snapshot, account)
        student_id = account.student_id

        if student_id not in summary:
            summary[student_id] = {
                'total_lessons': 0.0,
                'total_balance': 0.0,
                'course_count': 0,
                'period': shown.get('period'),
            }
        entry = summary[student_id]

        # Missing values are skipped so they do not affect the sums.
        lessons = shown.get('end_total_lessons')
        if lessons is not None:
            entry['total_lessons'] += float(lessons)

        balance = shown.get('end_balance')
        if balance is not None:
            entry['total_balance'] += float(balance)

        entry['course_count'] += 1

    return summary