190 lines
7.0 KiB
Python
190 lines
7.0 KiB
Python
"""
全系统 Bug 审查脚本(对应 全系统Bug审查指令.md)
用法: python scripts/run_bug_audit.py
"""
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
# Directory holding this script; the usage line in the module docstring
# runs it as scripts/run_bug_audit.py, so its parent is the project root.
_SCRIPT_DIR = Path(__file__).resolve().parent
ROOT = _SCRIPT_DIR.parent

# Put the project root first on the import path so the project modules
# imported inside the audit functions resolve when run as a script.
sys.path[:0] = [str(ROOT)]

# Result accumulators: fail() appends to ISSUES, pass_() appends to OK,
# and main() prints both.
ISSUES, OK = [], []
|
||
|
||
|
||
def fail(level, code, msg):
    """Record one audit finding in the module-level ``ISSUES`` list.

    Args:
        level: Severity marker stored with the finding (callers in this
            file pass an emoji such as '🔴', '🟠' or '🟡').
        code: Short identifier of the check, e.g. ``'BR-03'``.
        msg: Human-readable description of the finding.
    """
    finding = dict(level=level, code=code, msg=msg)
    ISSUES.append(finding)
|
||
|
||
|
||
def pass_(code, msg):
    """Record one successful check in the module-level ``OK`` list.

    Args:
        code: Short identifier of the check, e.g. ``'DB-01'``.
        msg: Human-readable description of what was verified.
    """
    passed = dict(code=code, msg=msg)
    OK.append(passed)
|
||
|
||
|
||
def audit_routes():
    """Check that sensitive endpoints appear to carry a permission decorator.

    The Flask app is built with ``create_app()`` and its URL map is walked.
    For every registered endpoint listed in ``sensitive``:

    1. The view function's ``__wrapped__`` chain is followed and the
       ``__name__`` of each link collected.  If any name contains
       ``permission_required`` or ``decorated`` the endpoint passes.
    2. Otherwise the view's source file is read and the text from 400
       characters before to 50 characters after the literal
       ``endpoint='<name>'`` is searched for ``permission_required``.

    Results are recorded through ``pass_()`` / ``fail()`` under the code
    ``PERM-<endpoint>``.  Each endpoint is reported once, even when several
    URL rules map to it.  Sensitive endpoints that are not registered at
    all produce no record.
    """
    import inspect

    # The imported name is unused; the import is kept because importing the
    # package may be relied on for its side effects -- TODO confirm.
    from routes import register_all_routes  # noqa: F401
    from app import create_app

    app = create_app()

    # endpoint name -> permission code.  Only the keys are consulted below.
    sensitive = {
        'refund_add': 'refund_add',
        'consumption_add': 'consumption_add',
        'consumption_batch': 'consumption_add',
        'recharge_add': 'recharge_add',
        'transfer_add': 'transfer_add',
        'keshibiao_import': 'keshibiao_import',
    }
    wrapper_hints = ('permission_required', 'decorated')
    reported = set()

    with app.app_context():
        for rule in app.url_map.iter_rules():
            ep = rule.endpoint
            # Several rules can share one endpoint; audit it only once so
            # the totals printed by main() are not inflated.
            if ep not in sensitive or ep in reported:
                continue
            reported.add(ep)

            vf = app.view_functions.get(ep)
            if vf is None:
                continue
            code = f'PERM-{ep}'

            # Step 1: names along the decorator chain (simplified check
            # based on functools.wraps-style ``__wrapped__`` links).
            names = []
            fn = vf
            while fn is not None:
                names.append(str(getattr(fn, '__name__', '')))
                fn = getattr(fn, '__wrapped__', None)
            if any(hint in name for name in names for hint in wrapper_hints):
                pass_(code, f'{ep} 权限装饰器存在')
                continue

            # Step 2: fall back to scanning the source file of the view.
            try:
                text = Path(inspect.getfile(vf)).read_text(encoding='utf-8')
            except (TypeError, OSError, ValueError):
                # No file for this object, file unreadable, or not UTF-8:
                # treated like "marker not found" below.
                text = ''

            idx = text.find(f"endpoint='{ep}'")
            if idx == -1:
                # NOTE(review): the check is inconclusive here (source
                # unreadable or endpoint literal absent) yet is logged as a
                # pass, as it always was -- confirm this is intended.
                pass_(code, f'{ep} 已检查')
            elif 'permission_required' in text[max(0, idx - 400):idx + 50]:
                pass_(code, f'{ep} 已配置权限')
            else:
                fail('🟡', code, f'路由 {ep} 可能缺少 permission_required')
|
||
|
||
|
||
def audit_business_rules():
    """Run text-based checks against the business-rule and route sources.

    Every check records exactly one result through ``pass_()`` or
    ``fail()``:

    * BR-01: the source of ``business_rules.deduct_hours`` mentions both
      ``gifted_hours`` and ``normal_hours``.
    * BR-02: the source of ``business_rules.calc_transfer_to_hours`` does
      not mention ``consumed_normal``.
    * BR-03: ``routes/transfer.py`` does not contain both
      ``consumed_normal`` and ``+ transfer_hours``.
    * BR-04: when ``routes/refund.py`` mentions ``promo_clawback_hours``
      more than twice and also ``apply_refund``, no single line combines
      ``account.gifted_hours`` with ``promo_clawback``.
    * BR-05: ``business_rules`` exposes ``propagate_snapshot_chain``.
    * BR-06: ``business_rules`` exposes ``calc_consumed_hours_value``.

    Raises:
        Whatever importing ``business_rules``, ``inspect.getsource`` or
        reading the two route files raises; nothing is caught here.
    """
    import inspect

    import business_rules as r

    deduct_src = inspect.getsource(r.deduct_hours)
    if 'gifted_hours' in deduct_src and 'normal_hours' in deduct_src:
        pass_('BR-01', 'deduct_hours 含正课/赠课分支')
    else:
        # Previously a failed BR-01 check left no trace in the report.
        fail('🟠', 'BR-01', 'deduct_hours 缺少正课/赠课分支')

    if 'consumed_normal' in inspect.getsource(r.calc_transfer_to_hours):
        fail('🔴', 'BR-02', '转课换算仍引用 consumed_normal')
    else:
        # Previously a successful BR-02 check left no trace in the report.
        pass_('BR-02', '转课换算未引用 consumed_normal')

    routes_dir = ROOT / 'routes'

    tsrc = (routes_dir / 'transfer.py').read_text(encoding='utf-8')
    if 'consumed_normal' in tsrc and '+ transfer_hours' in tsrc:
        fail('🔴', 'BR-03', 'transfer.py 仍将转课计入 consumed_normal')
    else:
        pass_('BR-03', '转课不污染 consumed_normal')

    rsrc = (routes_dir / 'refund.py').read_text(encoding='utf-8')
    if rsrc.count('promo_clawback_hours') > 2 and 'apply_refund' in rsrc:
        # The route layer should not deduct gifted hours a second time.
        duplicated = any(
            'account.gifted_hours' in line and 'promo_clawback' in line
            for line in rsrc.splitlines()
        )
        if duplicated:
            fail('🔴', 'BR-04', 'refund.py 可能重复扣赠课')
        else:
            pass_('BR-04', '退费赠课扣回仅在 apply_refund')
    else:
        pass_('BR-04', '退费路由无重复扣赠课')

    if hasattr(r, 'propagate_snapshot_chain'):
        pass_('BR-05', '已实现快照链式 propagate')
    else:
        fail('🟠', 'BR-05', '缺少 propagate_snapshot_chain')

    if hasattr(r, 'calc_consumed_hours_value'):
        pass_('BR-06', '消课金额按记录时点单价核算')
    else:
        fail('🟠', 'BR-06', '退费仍用单一单价×consumed_normal')
|
||
|
||
|
||
def audit_db_models():
    """Verify that the expected columns exist on three model tables.

    For each check the column names of ``Model.__table__.columns`` are
    collected and the required name looked up.  A present column is
    recorded with ``pass_()``, a missing one with ``fail()`` at level '🟠':

    * DB-01: ``ConsumptionRecord`` has ``unit_price_at_consume``.
    * DB-02: ``RechargeRecord`` has ``gift_clawed_back``.
    * DB-03: ``StudentAccount`` has ``unit_price``.

    Raises:
        ImportError: if ``models`` or one of the three classes is missing.
    """
    from models import ConsumptionRecord, RechargeRecord, StudentAccount

    # (check code, model class, required column, pass message, fail message)
    checks = (
        (
            'DB-01',
            ConsumptionRecord,
            'unit_price_at_consume',
            'ConsumptionRecord.unit_price_at_consume 存在',
            '缺少 unit_price_at_consume 列',
        ),
        (
            'DB-02',
            RechargeRecord,
            'gift_clawed_back',
            'gift_clawed_back 存在',
            '缺少 gift_clawed_back',
        ),
        (
            'DB-03',
            StudentAccount,
            'unit_price',
            'student_accounts.unit_price 存在',
            # Previously a missing column was silently ignored for DB-03.
            '缺少 student_accounts.unit_price 列',
        ),
    )
    for code, model, column, ok_msg, missing_msg in checks:
        column_names = {col.name for col in model.__table__.columns}
        if column in column_names:
            pass_(code, ok_msg)
        else:
            fail('🟠', code, missing_msg)
|
||
|
||
|
||
def audit_chain_snapshots():
    """Check that consecutive monthly snapshots of an account join up.

    Up to 500 ``MonthlySnapshot`` rows are loaded and grouped by
    ``account_id``.  Within each group the rows are sorted by
    ``(year, month)``; for every neighbouring pair that is exactly one
    calendar month apart, the earlier row's ``end_lessons`` / ``end_balance``
    are compared with the later row's ``prev_lessons`` / ``prev_balance``.
    A pair of values counts as a break when both are non-``None`` and
    differ by more than 0.05.

    Records ``fail('🟠', 'CHAIN-01', ...)`` with the number of breaks, or
    ``pass_('CHAIN-01', ...)`` when there are none.
    """
    from app import create_app
    from models import MonthlySnapshot

    tolerance = 0.05
    flask_app = create_app()
    with flask_app.app_context():
        grouped = {}
        for snap in MonthlySnapshot.query.limit(500).all():
            grouped.setdefault(snap.account_id, []).append(snap)

        mismatches = 0
        for history in grouped.values():
            ordered = sorted(history, key=lambda s: (s.year, s.month))
            for earlier, later in zip(ordered, ordered[1:]):
                # Calendar month that directly follows ``earlier``.
                rolls_over = earlier.month == 12
                expected_next = (
                    earlier.year + (1 if rolls_over else 0),
                    1 if rolls_over else earlier.month + 1,
                )
                if (later.year, later.month) != expected_next:
                    # Gap in the sampled rows: nothing to compare.
                    continue

                # (closing value of earlier month, opening value of later)
                pairs = (
                    (earlier.end_lessons, later.prev_lessons),
                    (earlier.end_balance, later.prev_balance),
                )
                for closing, opening in pairs:
                    if closing is None or opening is None:
                        continue
                    if abs(float(closing) - float(opening)) > tolerance:
                        mismatches += 1

        if not mismatches:
            pass_('CHAIN-01', '抽样快照链衔接正常')
        else:
            fail('🟠', 'CHAIN-01', f'数据库中有 {mismatches} 处月初≠上月末(源表或需 repair)')
|
||
|
||
|
||
def main():
    """Run every audit, print the report and return the exit status.

    Each audit runs inside its own ``try`` block so that an exception in
    one (for example a failed project import) is recorded as a finding and
    the remaining audits and the report still run.

    Returns:
        int: ``0`` when ``ISSUES`` is empty after all audits, else ``1``.
    """
    print('=== 教培管理系统 Bug 审查 ===\n')

    # (audit function, level / code / message prefix used if it raises)
    audits = (
        (audit_business_rules, '🟠', 'BR-AUDIT', '业务规则审查异常'),
        (audit_db_models, '🟠', 'DB-AUDIT', '模型审查异常'),
        (audit_routes, '🟡', 'ROUTE-AUDIT', '路由审查异常'),
        (audit_chain_snapshots, '🟠', 'CHAIN-AUDIT', '链式审查异常'),
    )
    for run_audit, level, code, label in audits:
        try:
            run_audit()
        except Exception as e:
            # Top-level boundary: turn the crash into a reported finding.
            fail(level, code, f'{label}: {e}')

    for item in OK:
        print(f'  [OK] {item["code"]}: {item["msg"]}')
    print()

    if not ISSUES:
        print('未发现新问题。')
        return 0

    for item in ISSUES:
        print(f'  [{item["level"]}] {item["code"]}: {item["msg"]}')
    print(f'\n合计: {len(ISSUES)} 项待关注, {len(OK)} 项通过')
    return 1
|
||
|
||
|
||
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
|