Files
drl_2/xuexiao/keshi_stats.py
user9994793890 ee860ce0ae Initial commit
2026-05-29 10:28:07 +08:00

252 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""基于课时核对表 MonthlySnapshot 的统计汇总Excel 导入数据)"""
from __future__ import annotations
from typing import Optional
from sqlalchemy import func
from models import db, MonthlySnapshot, StudentAccount, Course, ClassRecord
from keshi_display import get_latest_period, list_period_stats
def _recharge_expr():
    """Build the SQL expression for the recharge amount of a snapshot row.

    The expression is ``new_signup_amount + renewal_amount``. Each operand is
    wrapped in ``COALESCE(..., 0)`` so that a NULL column does not turn the
    whole sum into NULL.

    Returns:
        A SQLAlchemy column expression usable inside ``func.sum`` or a query.
    """
    signup_part = func.coalesce(MonthlySnapshot.new_signup_amount, 0)
    renewal_part = func.coalesce(MonthlySnapshot.renewal_amount, 0)
    return signup_part + renewal_part
def _snapshot_query(year: Optional[int] = None, month: Optional[int] = None):
    """Return a ``MonthlySnapshot`` query narrowed to the given period.

    Args:
        year: When not ``None``, only snapshots with this year are kept.
        month: When not ``None``, only snapshots with this month are kept.

    Returns:
        The (possibly unfiltered) query object; nothing is executed here.
    """
    query = db.session.query(MonthlySnapshot)
    # Each criterion is optional; a ``None`` value means "do not filter".
    criteria = (
        (MonthlySnapshot.year, year),
        (MonthlySnapshot.month, month),
    )
    for column, wanted in criteria:
        if wanted is not None:
            query = query.filter(column == wanted)
    return query
def period_overview(year: int, month: int) -> dict:
    """Return headline totals for a single month of snapshots.

    Args:
        year: Snapshot year to report on.
        month: Snapshot month to report on.

    Returns:
        A dict with these keys:

        * ``recharge`` - sum of new-signup plus renewal amounts.
        * ``consumed_hours`` - sum of ``consumed_lessons``.
        * ``consumed_amount`` - sum of ``consumed_amount``.
        * ``refund`` - sum of ``refund_amount``.
        * ``end_balance`` - sum of ``end_balance``.
        * ``new_signups`` - number of distinct ``StudentAccount.student_id``
          values whose snapshot has a positive new-signup amount or
          new-signup lesson count.
        * ``active_accounts`` - number of snapshot rows in the period.

        Monetary/hour figures are floats, the two counts are ints. Every
        value is 0 when the period has no snapshots.
    """
    base = _snapshot_query(year, month)

    def _total(expression):
        # SUM over zero rows yields NULL; COALESCE keeps the total numeric.
        return func.coalesce(func.sum(expression), 0)

    # All plain sums share the same filter, so fetch them with one SELECT
    # instead of one database round trip per figure. An aggregate query
    # without GROUP BY always produces exactly one row.
    recharge, consumed_h, consumed_amt, refund, end_balance = base.with_entities(
        _total(_recharge_expr()),
        _total(MonthlySnapshot.consumed_lessons),
        _total(MonthlySnapshot.consumed_amount),
        _total(MonthlySnapshot.refund_amount),
        _total(MonthlySnapshot.end_balance),
    ).one()

    # Counted per student rather than per snapshot row, so a student with
    # several matching accounts is only counted once.
    new_signups = (
        db.session.query(func.count(func.distinct(StudentAccount.student_id)))
        .select_from(MonthlySnapshot)
        .join(StudentAccount, MonthlySnapshot.account_id == StudentAccount.id)
        .filter(
            MonthlySnapshot.year == year,
            MonthlySnapshot.month == month,
            db.or_(
                MonthlySnapshot.new_signup_amount > 0,
                MonthlySnapshot.new_signup_lessons > 0,
            ),
        )
        .scalar()
        or 0
    )
    active_accounts = base.count()

    return {
        'recharge': float(recharge or 0),
        'consumed_hours': float(consumed_h or 0),
        'consumed_amount': float(consumed_amt or 0),
        'refund': float(refund or 0),
        'end_balance': float(end_balance or 0),
        'new_signups': int(new_signups),
        'active_accounts': int(active_accounts),
    }
def year_overview(year: int) -> dict:
    """Return yearly totals, summed over every snapshot of ``year``.

    Args:
        year: Snapshot year to report on.

    Returns:
        A dict with these keys:

        * ``recharge`` - sum of new-signup plus renewal amounts.
        * ``consumed_hours`` - sum of ``consumed_lessons``.
        * ``refund`` - sum of ``refund_amount``.
        * ``new_signups`` - number of distinct ``StudentAccount.student_id``
          values having, in any month of the year, a snapshot with a positive
          new-signup amount or new-signup lesson count.

        Sums are floats and the count is an int; all are 0 when the year has
        no snapshots.
    """
    base = _snapshot_query(year, None)

    def _total(expression):
        # SUM over zero rows yields NULL; COALESCE keeps the total numeric.
        return func.coalesce(func.sum(expression), 0)

    # One SELECT for all sums instead of a separate round trip for each.
    recharge, consumed_h, refund = base.with_entities(
        _total(_recharge_expr()),
        _total(MonthlySnapshot.consumed_lessons),
        _total(MonthlySnapshot.refund_amount),
    ).one()

    # DISTINCT on the student id: a student flagged in several months (or on
    # several accounts) is counted once for the year.
    new_signups = (
        db.session.query(func.count(func.distinct(StudentAccount.student_id)))
        .select_from(MonthlySnapshot)
        .join(StudentAccount, MonthlySnapshot.account_id == StudentAccount.id)
        .filter(
            MonthlySnapshot.year == year,
            db.or_(
                MonthlySnapshot.new_signup_amount > 0,
                MonthlySnapshot.new_signup_lessons > 0,
            ),
        )
        .scalar()
        or 0
    )

    return {
        'recharge': float(recharge or 0),
        'consumed_hours': float(consumed_h or 0),
        'refund': float(refund or 0),
        'new_signups': int(new_signups),
    }
def cumulative_finance() -> dict:
    """Return all-time financial totals over every stored snapshot.

    Returns:
        A dict of floats:

        * ``total_recharge`` - sum of new-signup plus renewal amounts.
        * ``total_refund`` - sum of ``refund_amount``.
        * ``net_income`` - ``total_recharge - total_refund``.
        * ``total_consumed_amount`` - sum of ``consumed_amount``.

        All values are 0.0 when there are no snapshots.
    """
    # The three totals scan the same table with no filter, so compute them in
    # a single SELECT rather than three separate full-table aggregates.
    recharge, refund, consumed_amt = db.session.query(
        func.coalesce(func.sum(_recharge_expr()), 0),
        func.coalesce(func.sum(MonthlySnapshot.refund_amount), 0),
        func.coalesce(func.sum(MonthlySnapshot.consumed_amount), 0),
    ).one()

    total_recharge = float(recharge or 0)
    total_refund = float(refund or 0)
    return {
        'total_recharge': total_recharge,
        'total_refund': total_refund,
        'net_income': total_recharge - total_refund,
        'total_consumed_amount': float(consumed_amt or 0),
    }
def course_stats_for_period(year: int, month: int) -> list[dict]:
    """Return per-course totals for one month, highest recharge first.

    Snapshots are joined to their account and the account's course, then
    grouped by course.

    Args:
        year: Snapshot year to report on.
        month: Snapshot month to report on.

    Returns:
        One dict per course with keys ``name``, ``code`` (the course code, or
        the course name when the code is empty), ``recharge``,
        ``consumed_hours``, ``consumed_amount``, ``refund`` and
        ``end_balance`` (all floats). Courses whose five numeric figures are
        all zero are omitted.
    """
    rows = (
        db.session.query(
            Course.name,
            Course.course_code,
            func.coalesce(func.sum(_recharge_expr()), 0),
            func.coalesce(func.sum(MonthlySnapshot.consumed_lessons), 0),
            func.coalesce(func.sum(MonthlySnapshot.consumed_amount), 0),
            func.coalesce(func.sum(MonthlySnapshot.refund_amount), 0),
            func.coalesce(func.sum(MonthlySnapshot.end_balance), 0),
        )
        .select_from(MonthlySnapshot)
        .join(StudentAccount, MonthlySnapshot.account_id == StudentAccount.id)
        .join(Course, StudentAccount.course_id == Course.id)
        .filter(MonthlySnapshot.year == year, MonthlySnapshot.month == month)
        .group_by(Course.id, Course.name, Course.course_code)
        .order_by(func.sum(_recharge_expr()).desc())
        .all()
    )

    numeric_keys = (
        'recharge',
        'consumed_hours',
        'consumed_amount',
        'refund',
        'end_balance',
    )
    stats = []
    for name, code, r, h, a, rf, bal in rows:
        entry = {
            'name': name,
            'code': code or name,
            'recharge': float(r or 0),
            'consumed_hours': float(h or 0),
            'consumed_amount': float(a or 0),
            'refund': float(rf or 0),
            'end_balance': float(bal or 0),
        }
        # Keep the course if ANY figure is non-zero. consumed_amount is part
        # of the check so that a course with only a consumed amount is not
        # dropped, which would make the per-course rows add up to less than
        # the period-wide consumed amount.
        if any(entry[key] for key in numeric_keys):
            stats.append(entry)
    return stats
def refund_breakdown(year: int, month: int) -> list[dict]:
    """Summarise one month's refunds per course, largest amount first.

    Only snapshots with ``refund_amount > 0`` are considered.

    Args:
        year: Snapshot year to report on.
        month: Snapshot month to report on.

    Returns:
        One dict per course with keys ``name``, ``count`` (number of matching
        snapshot rows, int), ``amount`` (summed refund amount, float) and
        ``lessons`` (summed refunded lessons, float).
    """
    selected_columns = (
        Course.name,
        func.count(MonthlySnapshot.id),
        func.coalesce(func.sum(MonthlySnapshot.refund_amount), 0),
        func.coalesce(func.sum(MonthlySnapshot.refund_lessons), 0),
    )
    query = (
        db.session.query(*selected_columns)
        .select_from(MonthlySnapshot)
        .join(StudentAccount, MonthlySnapshot.account_id == StudentAccount.id)
        .join(Course, StudentAccount.course_id == Course.id)
        # Successive filter() calls are combined with AND.
        .filter(MonthlySnapshot.year == year)
        .filter(MonthlySnapshot.month == month)
        .filter(MonthlySnapshot.refund_amount > 0)
        .group_by(Course.id, Course.name)
        .order_by(func.sum(MonthlySnapshot.refund_amount).desc())
    )

    breakdown = []
    for course_name, record_count, refund_amount, refund_lessons in query.all():
        breakdown.append({
            'name': course_name,
            'count': int(record_count),
            'amount': float(refund_amount or 0),
            'lessons': float(refund_lessons or 0),
        })
    return breakdown
def monthly_trend_from_snapshots() -> list[dict]:
    """Return a chronological recharge / consumption / refund trend.

    The periods come from ``list_period_stats()``; for each one the totals
    are taken from ``period_overview``.

    Returns:
        A list ordered by (year, month). Each item has ``month``
        (``"YYYY-MM"``), ``label`` (the month number as a string),
        ``recharge``, ``consumption`` (consumed lesson hours), ``refund`` and
        ``count`` (copied from the period entry). Empty when there are no
        periods.
    """
    periods = list_period_stats()
    if not periods:
        return []

    def _trend_point(entry):
        # One chart point: the period's identity plus its overview totals.
        overview = period_overview(entry['year'], entry['month'])
        return {
            'month': f"{entry['year']}-{entry['month']:02d}",
            'label': f"{entry['month']}",
            'recharge': overview['recharge'],
            'consumption': overview['consumed_hours'],
            'refund': overview['refund'],
            'count': entry['count'],
        }

    chronological = sorted(
        periods,
        key=lambda entry: (entry['year'], entry['month']),
    )
    return [_trend_point(entry) for entry in chronological]
def class_activity_for_period(
    year: int,
    month: int,
    limit: Optional[int] = 15,
) -> list[dict]:
    """Summarise one month's class records per course, busiest first.

    Class records are joined to their account and the account's course, then
    grouped by course and ordered by number of records (descending).

    Args:
        year: Class-record year to report on.
        month: Class-record month to report on.
        limit: Maximum number of courses to return. Defaults to 15, the
            previously hard-coded cap; pass ``None`` for no cap.

    Returns:
        One dict per course with keys ``name``, ``sessions`` (number of class
        records, int) and ``lessons`` (summed ``lessons_consumed``, float).
    """
    query = (
        db.session.query(
            Course.name,
            func.count(ClassRecord.id),
            func.coalesce(func.sum(ClassRecord.lessons_consumed), 0),
        )
        .select_from(ClassRecord)
        .join(StudentAccount, ClassRecord.account_id == StudentAccount.id)
        .join(Course, StudentAccount.course_id == Course.id)
        .filter(ClassRecord.year == year, ClassRecord.month == month)
        .group_by(Course.id, Course.name)
        # Course.id breaks ties between equally busy courses so that the
        # rows surviving the LIMIT are the same on every call.
        .order_by(func.count(ClassRecord.id).desc(), Course.id)
    )
    if limit is not None:
        query = query.limit(limit)

    return [
        {'name': name, 'sessions': int(cnt), 'lessons': float(less or 0)}
        for name, cnt, less in query.all()
    ]
def resolve_report_period(
    year: Optional[int] = None,
    month: Optional[int] = None,
) -> tuple[int, int]:
    """Decide which (year, month) a report should cover.

    Resolution order:

    1. The explicit ``year`` and ``month`` when both are truthy.
    2. The period returned by ``get_latest_period()`` when both of its
       parts are truthy.
    3. The newest (year, month) found in ``MonthlySnapshot``.
    4. Today's year and month.

    Args:
        year: Requested year, or ``None``/0 to auto-detect.
        month: Requested month, or ``None``/0 to auto-detect.

    Returns:
        A ``(year, month)`` tuple.
    """
    if year and month:
        return int(year), int(month)

    latest_year, latest_month = get_latest_period()
    if latest_year and latest_month:
        return latest_year, latest_month

    newest = (
        db.session.query(MonthlySnapshot.year, MonthlySnapshot.month)
        .order_by(MonthlySnapshot.year.desc(), MonthlySnapshot.month.desc())
        .first()
    )
    if not newest:
        # Nothing imported yet: fall back to the current calendar month.
        from datetime import date

        today = date.today()
        return today.year, today.month

    newest_year, newest_month = newest[0], newest[1]
    return int(newest_year), int(newest_month)