Files
drl_2/xuexiao/business_rules.py
user9994793890 ee860ce0ae Initial commit
2026-05-29 10:28:07 +08:00

551 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
教培管理系统 · 业务规则引擎
与《教培管理系统_规则模块需求文档.md》保持一致
"""
from datetime import date
from decimal import Decimal, ROUND_DOWN
from models import (
db, RechargeActivity, RechargeRecord, StudentAccount, Course,
ConsumptionRecord, MonthlySnapshot,
)
from sqlalchemy import func
# ========== 规则常量 ==========
# Material fee charged per counted quarter when a refund is calculated.
MATERIAL_FEE_PER_QUARTER = 250
# Not referenced in the visible code; presumably the remaining-hours level that
# triggers a low-balance warning — TODO confirm against the caller.
LOW_HOURS_WARNING_THRESHOLD = 5
# Longest allowed suspension span in days (three 365-day years, leap days ignored).
MAX_STOP_DAYS = 365 * 3
# Lower bound applied to a normalised, positive consumption amount.
MIN_CONSUMPTION_UNIT = 1
# Switch: the cumulative-recharge threshold check only runs when this is True.
RECHARGE_CUMULATIVE = False
# Not referenced in the visible code; presumably switches for multi-trigger and
# group-discount recharge promotions — TODO confirm.
RECHARGE_MULTI_TRIGGER = False
RECHARGE_GROUP_DISCOUNT = False
# Value reported by transfer_gift_allowed().
GIFT_CAN_TRANSFER = False
# ========== 账户工具 ==========
def account_effective_unit_price(account, course=None):
    """Resolve the per-hour price to use for a student account.

    Sources are tried in order and the first usable one wins:

    1. the positive ``unit_price`` stored on the account;
    2. ``cumulative_amount`` divided by every normal hour the account has held
       (remaining plus consumed), when both figures are positive;
    3. ``course.price_per_hour`` when a course is given and the value is truthy.

    Returns ``0.0`` when none of the sources applies.
    """
    stored_price = account.unit_price
    if stored_price and float(stored_price) > 0:
        return float(stored_price)

    hours_ever_held = (
        float(account.normal_hours or 0) + float(account.consumed_normal or 0)
    )
    if hours_ever_held > 0:
        paid_total = float(account.cumulative_amount or 0)
        if paid_total > 0:
            return paid_total / hours_ever_held

    if course and course.price_per_hour:
        return float(course.price_per_hour)
    return 0.0
def update_account_unit_price(account):
    """Recompute ``account.unit_price`` in place after a balance change.

    When the account has held any normal hours (remaining plus consumed) and
    has a positive ``cumulative_amount``, the price becomes
    amount / hours rounded to 4 decimals. When both hour counters are zero or
    negative, the price is reset to 0. In every other case the stored price is
    left untouched.
    """
    remaining = float(account.normal_hours or 0)
    used = float(account.consumed_normal or 0)
    hours_ever_held = remaining + used

    if hours_ever_held > 0 and float(account.cumulative_amount or 0) > 0:
        weighted = float(account.cumulative_amount) / hours_ever_held
        account.unit_price = round(weighted, 4)
        return

    if remaining <= 0 and used <= 0:
        account.unit_price = 0
def lock_student_account(student_id, course_id):
    """Fetch the account row for a student/course pair, preferring a row lock.

    The query is first issued through ``with_for_update()`` so concurrent
    deductions against the same account are serialised. If that attempt raises,
    the same query is retried without the lock so callers still receive a row.

    The fallback is deliberate best-effort behaviour, but it is now logged with
    the traceback: an unlocked read removes the concurrency protection, and a
    silent downgrade would hide that from operators.

    Returns:
        The first matching ``StudentAccount``, or ``None`` when there is none.
    """
    import logging

    q = StudentAccount.query.filter_by(student_id=student_id, course_id=course_id)
    try:
        return q.with_for_update().first()
    except Exception:
        logging.getLogger(__name__).warning(
            'Row lock unavailable for student_id=%s course_id=%s; '
            'falling back to an unlocked read',
            student_id, course_id, exc_info=True,
        )
        return q.first()
def account_snapshot_dict(account):
    """Return the account's hour and money counters as a plain dict of floats.

    A falsy ``account`` yields an empty dict. Missing or falsy counters are
    reported as ``0.0``.
    """
    if not account:
        return {}
    exported = (
        'normal_hours',
        'gifted_hours',
        'consumed_normal',
        'consumed_gifted',
        'cumulative_amount',
        'unit_price',
    )
    return {name: float(getattr(account, name) or 0) for name in exported}
def _next_year_month(year, month):
if month >= 12:
return year + 1, 1
return year, month + 1
def validate_account_non_negative(account):
    """Clamp the account's balances at zero and bump its version counter.

    ``normal_hours``, ``gifted_hours`` and ``cumulative_amount`` are each
    replaced by ``max(0, value)`` (missing values count as 0), then ``version``
    is incremented by one, starting from 0 when unset.
    """
    for balance_name in ('normal_hours', 'gifted_hours', 'cumulative_amount'):
        current = float(getattr(account, balance_name) or 0)
        setattr(account, balance_name, max(0, current))
    account.version = int(account.version or 0) + 1
def propagate_snapshot_chain(account_id, from_year, from_month, tolerance=0.05):
    """Carry month-end figures forward into the following months' opening figures.

    Starting at ``(from_year, from_month)``, every non-``None`` ``end_*`` value
    of a snapshot is copied onto the matching ``prev_*`` field of the next
    calendar month's snapshot. The walk continues month by month and stops at
    the first month that has no snapshot. Nothing is committed here.

    ``tolerance`` is accepted but not used by this function.

    Returns:
        The number of month-to-month links that were processed.
    """
    ordered = (
        MonthlySnapshot.query
        .filter_by(account_id=account_id)
        .order_by(MonthlySnapshot.year, MonthlySnapshot.month)
        .all()
    )
    if not ordered:
        return 0

    lookup = {(snap.year, snap.month): snap for snap in ordered}
    carried_fields = (
        ('end_lessons', 'prev_lessons'),
        ('end_gift_lessons', 'prev_gift_lessons'),
        ('end_total_lessons', 'prev_total_lessons'),
        ('end_balance', 'prev_balance'),
    )

    links = 0
    period = (from_year, from_month)
    while period in lookup:
        following = _next_year_month(*period)
        if following not in lookup:
            break
        source, target = lookup[period], lookup[following]
        for end_name, prev_name in carried_fields:
            closing = getattr(source, end_name)
            if closing is not None:
                setattr(target, prev_name, closing)
        links += 1
        period = following
    return links
def sync_month_snapshot_from_account(account, year, month):
    """Write the account's current balances into one month's snapshot.

    Does nothing when no snapshot exists for ``(account.id, year, month)``.
    Otherwise the month-end lesson counts are overwritten from the account,
    and, when the effective unit price is positive, the snapshot's unit price
    and month-end balance (normal hours x price, 2 decimals) are updated too.
    The change is then propagated to the following months' opening figures.
    """
    snapshot = (
        MonthlySnapshot.query
        .filter_by(account_id=account.id, year=year, month=month)
        .first()
    )
    if not snapshot:
        return

    normal_left = float(account.normal_hours or 0)
    gifted_left = float(account.gifted_hours or 0)
    snapshot.end_lessons = normal_left
    snapshot.end_gift_lessons = gifted_left
    snapshot.end_total_lessons = normal_left + gifted_left

    price = account_effective_unit_price(account)
    if price > 0:
        snapshot.unit_price = price
        snapshot.end_balance = round(normal_left * price, 2)

    propagate_snapshot_chain(account.id, year, month)
def sync_account_snapshots(account, as_of_date=None):
    """Sync the snapshot of the month an operation belongs to.

    The month is taken from ``as_of_date``; today's date is used when it is
    omitted or falsy.
    """
    effective = as_of_date if as_of_date else date.today()
    sync_month_snapshot_from_account(account, effective.year, effective.month)
def sync_current_month_snapshot(account):
    """Sync the snapshot of today's month for ``account``."""
    today = date.today()
    sync_account_snapshots(account, today)
def calc_consumed_hours_value(student_id, course_id, fallback_unit):
    """Sum the money value of the normal hours a student has consumed.

    Each non-trial consumption record is valued at the unit price stored on
    that record, so later price changes do not alter past consumption. A
    record with no positive stored price is valued at ``fallback_unit``.

    If the records add up to nothing, the value falls back to the account's
    ``consumed_normal`` counter multiplied by ``fallback_unit``.

    Returns:
        The total, rounded to 2 decimals.
    """
    rows = ConsumptionRecord.query.filter(
        ConsumptionRecord.student_id == student_id,
        ConsumptionRecord.course_id == course_id,
        ConsumptionRecord.is_trial == 0,
    ).all()

    consumed_value = 0.0
    for row in rows:
        recorded = getattr(row, 'unit_price_at_consume', None)
        price = float(recorded) if recorded else 0
        if price <= 0:
            price = fallback_unit
        consumed_value += float(row.normal_consumed or 0) * price

    if consumed_value <= 0:
        account = StudentAccount.query.filter_by(
            student_id=student_id, course_id=course_id,
        ).first()
        if account:
            consumed_value = float(account.consumed_normal or 0) * fallback_unit

    return round(consumed_value, 2)
def repair_account_snapshot_chain(account_id):
    """Re-link every pair of consecutive monthly snapshots of one account.

    For each snapshot directly followed by the next calendar month's snapshot,
    the non-``None`` ``end_*`` values are copied onto the follower's ``prev_*``
    fields. Pairs separated by a gap are skipped. Nothing is committed here.

    Returns:
        The number of adjacent pairs that were processed.
    """
    ordered = (
        MonthlySnapshot.query
        .filter_by(account_id=account_id)
        .order_by(MonthlySnapshot.year, MonthlySnapshot.month)
        .all()
    )
    carried_fields = (
        ('end_lessons', 'prev_lessons'),
        ('end_gift_lessons', 'prev_gift_lessons'),
        ('end_total_lessons', 'prev_total_lessons'),
        ('end_balance', 'prev_balance'),
    )

    fixed = 0
    for earlier, later in zip(ordered, ordered[1:]):
        expected = _next_year_month(earlier.year, earlier.month)
        if (later.year, later.month) != expected:
            continue
        for end_name, prev_name in carried_fields:
            closing = getattr(earlier, end_name)
            if closing is not None:
                setattr(later, prev_name, closing)
        fixed += 1
    return fixed
def repair_all_snapshot_chains(tolerance=0.05):
    """Repair the snapshot chain of every account that has snapshots.

    Intended for ETL and maintenance runs. ``tolerance`` is kept only for
    signature compatibility and is discarded.

    Returns:
        The value returned by ``repair_snapshot_chains_for_accounts``.
    """
    del tolerance  # kept in the signature for compatibility only
    rows = db.session.query(MonthlySnapshot.account_id).distinct().all()
    return repair_snapshot_chains_for_accounts([row[0] for row in rows])
def repair_snapshot_chains_for_accounts(account_ids) -> int:
    """Repair the snapshot chains of the given accounts in one pass.

    All snapshots of the requested accounts are loaded with a single query
    instead of one query per account. Within each account, every snapshot
    directly followed by the next calendar month's snapshot has its
    non-``None`` ``end_*`` values copied onto the follower's ``prev_*`` fields.
    The session is committed when at least one pair was processed.

    Returns:
        The number of adjacent pairs processed; 0 for an empty id list.
    """
    if not account_ids:
        return 0

    unique_ids = list(set(map(int, account_ids)))
    rows = (
        MonthlySnapshot.query
        .filter(MonthlySnapshot.account_id.in_(unique_ids))
        .order_by(
            MonthlySnapshot.account_id,
            MonthlySnapshot.year,
            MonthlySnapshot.month,
        )
        .all()
    )

    grouped = {}
    for snap in rows:
        if snap.account_id not in grouped:
            grouped[snap.account_id] = []
        grouped[snap.account_id].append(snap)

    carried_fields = (
        ('end_lessons', 'prev_lessons'),
        ('end_gift_lessons', 'prev_gift_lessons'),
        ('end_total_lessons', 'prev_total_lessons'),
        ('end_balance', 'prev_balance'),
    )

    fixed = 0
    for chain in grouped.values():
        for earlier, later in zip(chain, chain[1:]):
            expected = _next_year_month(earlier.year, earlier.month)
            if (later.year, later.month) != expected:
                continue
            for end_name, prev_name in carried_fields:
                closing = getattr(earlier, end_name)
                if closing is not None:
                    setattr(later, prev_name, closing)
            fixed += 1

    if fixed:
        db.session.commit()
    return fixed
# ========== 充值 ==========
def calc_normal_hours_from_amount(amount, price_per_hour):
    """Return how many whole normal hours ``amount`` buys at ``price_per_hour``.

    The quotient is truncated to whole hours and never drops below zero.
    ``0`` is returned when the price is missing, zero or negative.

    The division is done in ``Decimal`` rather than binary floating point:
    a float quotient can land just below an exact multiple (``0.3 / 0.1`` is
    ``2.9999999999999996``), and truncating it would cost the student an hour
    that was paid for in full.
    """
    if not price_per_hour or float(price_per_hour) <= 0:
        return 0
    # float() first keeps the accepted input types (and the errors raised for
    # bad ones) unchanged; str() of a float is its shortest round-trip form.
    paid = Decimal(str(float(amount)))
    price = Decimal(str(float(price_per_hour)))
    # Decimal floor division truncates toward zero, matching int() on a float.
    return max(int(paid // price), 0)
def match_recharge_activities(student_id, course_id, amount):
    """Work out the gifted hours a recharge of ``amount`` earns.

    Enabled activities (``status == 1``) whose positive trigger amount does not
    exceed ``amount`` are examined from the highest trigger downwards. An
    activity is skipped when it is time-limited and today falls outside its
    start/end dates. It is also skipped when it is cumulative, the
    ``RECHARGE_CUMULATIVE`` switch is on, and past recharges plus this one
    still fall short of the trigger. Gifted hours of the remaining activities
    are added up until one that cannot be stacked has been counted.

    Returns:
        ``(total_gifted_hours, primary_activity_id)``, where the id is that of
        the first activity counted. ``(0.0, None)`` when ``amount`` is not
        positive.
    """
    today = date.today()
    amount = float(amount)
    if amount <= 0:
        return 0.0, None

    candidates = (
        RechargeActivity.query
        .filter(
            RechargeActivity.trigger_amount > 0,
            RechargeActivity.trigger_amount <= amount,
            RechargeActivity.status == 1,
        )
        .order_by(RechargeActivity.trigger_amount.desc())
        .all()
    )

    gifted_sum = 0.0
    first_matched_id = None
    for act in candidates:
        if float(act.trigger_amount) > amount:
            continue

        if act.is_time_limited and (
            (act.start_date and act.start_date > today)
            or (act.end_date and act.end_date < today)
        ):
            continue

        if act.is_cumulative and RECHARGE_CUMULATIVE:
            paid_so_far = db.session.query(
                func.coalesce(func.sum(RechargeRecord.amount), 0)
            ).filter(
                RechargeRecord.student_id == student_id,
                RechargeRecord.course_id == course_id,
            ).scalar()
            if float(paid_so_far) + amount < float(act.trigger_amount):
                continue

        gifted_sum += float(act.gifted_hours or 0)
        if first_matched_id is None:
            first_matched_id = act.id
        if not act.can_stack:
            break

    return gifted_sum, first_matched_id
def apply_recharge_account(account, amount, normal_hours, gifted_hours, is_new_account=False):
    """Credit a recharge to an account and refresh its derived data.

    Adds the purchased and gifted hours and the paid amount to the account,
    marks both hour pools as permanent, then recomputes the unit price and
    syncs the current month's snapshot. The cumulative start date is set to
    today for a new account or when it has not been set yet.
    """
    needs_start_date = is_new_account or not account.cumulative_start_date
    if needs_start_date:
        account.cumulative_start_date = date.today()

    normal_before = float(account.normal_hours or 0)
    account.normal_hours = normal_before + normal_hours
    gifted_before = float(account.gifted_hours or 0)
    account.gifted_hours = gifted_before + gifted_hours
    paid_before = float(account.cumulative_amount or 0)
    account.cumulative_amount = paid_before + float(amount)

    account.normal_validity = 'permanent'
    account.gifted_validity = 'permanent'

    update_account_unit_price(account)
    sync_current_month_snapshot(account)
# ========== 消课 ==========
def normalize_consumption_hours(hours, *, allow_zero=False):
    """Convert a requested consumption amount to whole hours.

    The value is truncated toward zero; anything that cannot be parsed counts
    as 0. A positive result is returned as an int no smaller than
    ``MIN_CONSUMPTION_UNIT``.

    A result of zero or less is returned as 0 when ``allow_zero`` is true (the
    trial-lesson case) and rejected otherwise.

    Raises:
        ValueError: the result is not positive and ``allow_zero`` is false.
    """
    try:
        truncated = Decimal(str(hours)).quantize(Decimal('1'), rounding=ROUND_DOWN)
        whole = int(truncated)
    except (ValueError, TypeError, ArithmeticError):
        whole = 0

    if whole > 0:
        return max(whole, MIN_CONSUMPTION_UNIT)
    if allow_zero:
        return 0
    raise ValueError('消课课时必须为正整数')
def deduct_hours(account, hours_to_deduct):
    """Deduct whole hours from an account, normal hours first.

    If the normal balance alone covers the request, only normal hours are
    used. Otherwise the whole-number part of the normal balance is used, the
    rest comes from gifted hours, and the normal balance is set to 0.
    Afterwards the unit price is refreshed and balances are clamped at zero.

    NOTE(review): in the second case any fractional part of the normal balance
    is dropped without being added to ``consumed_normal`` — confirm that
    balances are always whole numbers.

    Returns:
        ``(normal_consumed, gifted_consumed, source)`` where ``source`` is
        ``'normal'`` or ``'mixed'``.

    Raises:
        ValueError: the combined balance is smaller than the request, or the
            request is rejected by ``normalize_consumption_hours``.
    """
    hours_to_deduct = normalize_consumption_hours(hours_to_deduct)
    normal_left = float(account.normal_hours or 0)
    gifted_left = float(account.gifted_hours or 0)

    if normal_left + gifted_left < hours_to_deduct:
        raise ValueError(
            f'课时不足!剩余正课{int(normal_left)}课时,赠课{int(gifted_left)}课时'
        )

    if normal_left >= hours_to_deduct:
        from_normal, from_gifted, source = hours_to_deduct, 0, 'normal'
        account.normal_hours = normal_left - hours_to_deduct
        account.consumed_normal = float(account.consumed_normal or 0) + hours_to_deduct
    else:
        from_normal = int(normal_left)
        from_gifted = hours_to_deduct - from_normal
        source = 'mixed'
        account.normal_hours = 0
        account.gifted_hours = gifted_left - from_gifted
        account.consumed_normal = float(account.consumed_normal or 0) + from_normal
        account.consumed_gifted = float(account.consumed_gifted or 0) + from_gifted

    update_account_unit_price(account)
    validate_account_non_negative(account)
    return from_normal, from_gifted, source
# ========== 退费 ==========
def _count_quarters(student_id, course_id):
    """Count the distinct calendar quarters in which the student attended.

    Only non-trial consumption records with ``hours_consumed > 0`` and a
    consume date are considered. The count drives the material fee.
    """
    rows = ConsumptionRecord.query.filter(
        ConsumptionRecord.student_id == student_id,
        ConsumptionRecord.course_id == course_id,
        ConsumptionRecord.is_trial == 0,
        ConsumptionRecord.hours_consumed > 0,
    ).all()
    attended = {
        (row.consume_date.year, (row.consume_date.month - 1) // 3 + 1)
        for row in rows
        if row.consume_date
    }
    return len(attended)
def _promo_clawback_gift_hours(student_id, course_id, remaining_recharge_amount):
    """Total the promotional gifted hours that a refund would forfeit.

    A recharge record counts when it has not been clawed back before, is tied
    to an activity that still exists, granted gifted hours, and that
    activity's trigger amount is above ``remaining_recharge_amount``.

    Returns:
        The sum of gifted hours over the qualifying records, as a float.
    """
    history = (
        RechargeRecord.query
        .filter_by(student_id=student_id, course_id=course_id)
        .order_by(RechargeRecord.created_at.desc())
        .all()
    )

    forfeited = 0.0
    for rec in history:
        if getattr(rec, 'gift_clawed_back', 0):
            continue
        has_promo_gift = rec.activity_id and float(rec.gifted_hours or 0) > 0
        if not has_promo_gift:
            continue
        promo = db.session.get(RechargeActivity, rec.activity_id)
        if not promo:
            continue
        if remaining_recharge_amount < float(promo.trigger_amount):
            forfeited += float(rec.gifted_hours or 0)
    return forfeited
def _mark_promo_gift_clawed_back(student_id, course_id, hours_to_claw):
    """Flag recharge records as clawed back until the given hours are covered.

    Records are visited newest first. Each unflagged record that granted
    gifted hours is flagged and its hours are subtracted from the amount still
    to cover; the walk stops once that amount reaches zero. Nothing happens
    when ``hours_to_claw`` is not positive.

    NOTE(review): unlike ``_promo_clawback_gift_hours``, records are not
    filtered by activity here — confirm that this is intended.
    """
    outstanding = float(hours_to_claw)
    if outstanding <= 0:
        return

    history = (
        RechargeRecord.query
        .filter_by(student_id=student_id, course_id=course_id)
        .order_by(RechargeRecord.created_at.desc())
        .all()
    )
    for rec in history:
        if outstanding <= 0:
            break
        if getattr(rec, 'gift_clawed_back', 0):
            continue
        granted = float(rec.gifted_hours or 0)
        if granted <= 0:
            continue
        rec.gift_clawed_back = 1
        outstanding -= granted
def calculate_refund(student_id, course_id, partial_amount=None):
    """Compute the refund breakdown for a student's course account.

    The full refund is the cumulative recharge amount minus three deductions,
    floored at zero and rounded to 2 decimals:

    * the value of the hours already consumed;
    * the material fee (attended quarters x ``MATERIAL_FEE_PER_QUARTER``);
    * the value of the remaining gifted hours at the effective unit price.

    When ``partial_amount`` is given and is below the full refund, the refund
    is partial and equals that amount (never negative). The promotional gifted
    hours to claw back are derived from the recharge amount that remains after
    the refund. Nothing is modified here; see ``apply_refund``.

    Returns:
        A dict with the refund amount, each deduction, the remaining hours,
        the unit price used, the clawback hours and the partial flag. Every
        figure is zero when the account does not exist.
    """
    account = StudentAccount.query.filter_by(
        student_id=student_id, course_id=course_id,
    ).first()
    if not account:
        empty = dict.fromkeys(
            (
                'refund_amount', 'consumed_hours_value', 'material_fee',
                'deduct_gifted_amount', 'quarters', 'total_deduct',
                'remaining_normal', 'remaining_gifted', 'price_per_hour',
                'promo_clawback_hours', 'is_partial', 'full_refund_amount',
            ),
            0,
        )
        empty['is_partial'] = False
        return empty

    course = db.session.get(Course, course_id)
    unit_price = account_effective_unit_price(account, course)
    total_recharged = float(account.cumulative_amount or 0)

    consumed_value = calc_consumed_hours_value(student_id, course_id, unit_price)
    quarters = _count_quarters(student_id, course_id)
    material_fee = quarters * MATERIAL_FEE_PER_QUARTER
    gifted_left = float(account.gifted_hours or 0)
    gifted_value = gifted_left * unit_price

    refundable = total_recharged - consumed_value - material_fee - gifted_value
    full_refund = round(max(0, refundable), 2)

    is_partial = partial_amount is not None and float(partial_amount) < full_refund
    if is_partial:
        refund_amount = min(float(partial_amount), full_refund)
    else:
        refund_amount = full_refund
    refund_amount = round(max(refund_amount, 0), 2)

    clawback_hours = _promo_clawback_gift_hours(
        student_id, course_id, total_recharged - refund_amount,
    )
    total_deduct = round(consumed_value + material_fee + gifted_value, 2)

    return {
        'refund_amount': refund_amount,
        'consumed_hours_value': round(consumed_value, 2),
        'material_fee': material_fee,
        'deduct_gifted_amount': round(gifted_value, 2),
        'quarters': quarters,
        'total_deduct': total_deduct,
        'remaining_normal': int(float(account.normal_hours or 0)),
        'remaining_gifted': int(gifted_left),
        'price_per_hour': unit_price,
        'promo_clawback_hours': int(clawback_hours),
        'is_partial': is_partial,
        'full_refund_amount': full_refund,
    }
def apply_refund(account, calc, student_id=None, course_id=None):
    """Apply a computed refund to the account.

    ``calc`` is the dict produced by ``calculate_refund``. The promotional
    clawback is carried out here and nowhere else.

    Partial refund: the cumulative amount is reduced by the refund, the normal
    hours are scaled down by the refunded share of that amount (truncated to
    whole hours), the clawback hours are removed from the gifted balance and
    flagged on the recharge records, and the unit price is recomputed.

    Full refund: every balance and the unit price are zeroed, the account
    status becomes ``'refunded'``, and every recharge record that granted
    gifted hours is flagged as clawed back.

    Both paths finish by clamping the balances and syncing the snapshot.
    """
    student_id = student_id or account.student_id
    course_id = course_id or account.course_id
    claw = calc.get('promo_clawback_hours', 0) or 0

    if calc.get('is_partial'):
        paid_before = max(float(account.cumulative_amount or 1), 0.01)
        refunded_share = calc['refund_amount'] / paid_before
        account.cumulative_amount = max(
            0, float(account.cumulative_amount or 0) - calc['refund_amount'],
        )
        normal_before = float(account.normal_hours or 0)
        kept_share = 1 - min(refunded_share, 1)
        account.normal_hours = max(0, int(normal_before * kept_share))
        if claw > 0:
            account.gifted_hours = max(0, float(account.gifted_hours or 0) - claw)
            _mark_promo_gift_clawed_back(student_id, course_id, claw)
        update_account_unit_price(account)
    else:
        account.normal_hours = 0
        account.gifted_hours = 0
        account.cumulative_amount = 0
        account.unit_price = 0
        account.account_status = 'refunded'
        if claw > 0:
            _mark_promo_gift_clawed_back(student_id, course_id, claw)
        all_recharges = RechargeRecord.query.filter_by(
            student_id=student_id, course_id=course_id,
        ).all()
        for rec in all_recharges:
            if float(rec.gifted_hours or 0) > 0:
                rec.gift_clawed_back = 1

    validate_account_non_negative(account)
    sync_account_snapshots(account)
def manual_adjust_account(account, normal_delta=0, gifted_delta=0, reason=''):
    """Apply an administrator's manual change to the hour balances.

    Each delta may be positive or negative. The change is rejected before
    anything is written if either resulting balance would be negative.
    ``reason`` is accepted but not used by this function.

    Returns:
        The account's counters after the change, as produced by
        ``account_snapshot_dict``.

    Raises:
        ValueError: a resulting balance would be negative.
    """
    new_normal = float(account.normal_hours or 0) + float(normal_delta)
    new_gifted = float(account.gifted_hours or 0) + float(gifted_delta)
    if any(balance < 0 for balance in (new_normal, new_gifted)):
        raise ValueError('调整后课时不能为负数')

    account.normal_hours = new_normal
    account.gifted_hours = new_gifted

    update_account_unit_price(account)
    validate_account_non_negative(account)
    sync_account_snapshots(account)
    return account_snapshot_dict(account)
# ========== 转课 / 转赠 ==========
def calc_transfer_to_hours(normal_hours, from_unit_price, to_unit_price):
    """Convert remaining normal hours into hours of the target course.

    The hours are valued at ``from_unit_price`` and that amount is divided by
    ``to_unit_price``; the result is truncated to whole hours and never drops
    below zero. ``normal_hours`` is truncated to an int first.

    ``0`` is returned when the target price is missing, zero or negative.

    The arithmetic is done in ``Decimal`` rather than binary floating point:
    a float quotient can land just below an exact multiple (``0.3 / 0.1`` is
    ``2.9999999999999996``), and truncating it would drop an hour the student
    is entitled to.
    """
    normal_hours = int(normal_hours)
    # A missing target price is treated like a non-positive one instead of
    # failing on the comparison.
    if not to_unit_price or float(to_unit_price) <= 0:
        return 0
    # float() first keeps the accepted input types unchanged; str() of a float
    # is its shortest round-trip form.
    source_value = Decimal(normal_hours) * Decimal(str(float(from_unit_price)))
    target_price = Decimal(str(float(to_unit_price)))
    # Decimal floor division truncates toward zero, matching int() on a float.
    return max(int(source_value // target_price), 0)
def transfer_gift_allowed():
    """Report whether gifted hours may be transferred (the ``GIFT_CAN_TRANSFER`` switch)."""
    allowed = GIFT_CAN_TRANSFER
    return allowed
# ========== 停课 ==========
def validate_stop_period(start_date, end_date):
    """Check a requested suspension period.

    Both dates are required, the end may not precede the start, and the span
    may not exceed ``MAX_STOP_DAYS`` days.

    Returns:
        ``(True, '')`` when the period is acceptable, otherwise
        ``(False, message)`` with a user-facing reason.
    """
    if not (start_date and end_date):
        return False, '请填写起止日期'
    if end_date < start_date:
        return False, '结束日期不能早于开始日期'
    span_days = (end_date - start_date).days
    if span_days > MAX_STOP_DAYS:
        return False, '停课保号最长3年'
    return True, ''
def recharge_activity_defaults():
    """Return a fresh dict of default field values for a recharge activity.

    Every optional feature is switched off except stacking (``can_stack`` is
    1); the audience is ``'all'``. A new dict is built on every call, so
    callers may modify the result freely.
    """
    return dict(
        is_cumulative=0,
        cumulative_window=0,
        cumulative_clear=0,
        allow_multi_trigger=0,
        is_time_limited=0,
        target_audience='all',
        can_stack=1,
        gift_cap=0,
        group_discount=0,
        group_min_people=0,
        group_extra_hours=0,
        referral_reward=0,
        referral_reward_hours=0,
        gift_type='课时',
    )