Files
drl_2/xuexiao/scripts/run_bug_audit.py
user9994793890 ee860ce0ae Initial commit
2026-05-29 10:28:07 +08:00

190 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
全系统 Bug 审查脚本(对应 全系统Bug审查指令.md
用法: python scripts/run_bug_audit.py
"""
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
ISSUES = []
OK = []
def fail(level, code, msg):
ISSUES.append({'level': level, 'code': code, 'msg': msg})
def pass_(code, msg):
OK.append({'code': code, 'msg': msg})
def audit_routes():
from routes import register_all_routes
from app import create_app
app = create_app()
sensitive = {
'refund_add': 'refund_add',
'consumption_add': 'consumption_add',
'consumption_batch': 'consumption_add',
'recharge_add': 'recharge_add',
'transfer_add': 'transfer_add',
'keshibiao_import': 'keshibiao_import',
}
with app.app_context():
for rule in app.url_map.iter_rules():
ep = rule.endpoint
if ep not in sensitive:
continue
vf = app.view_functions.get(ep)
if not vf:
continue
src = (vf.__doc__ or '') + str(vf)
# 检查装饰器链(简化:看函数 __wrapped__
fn = vf
names = []
while fn:
names.append(getattr(fn, '__name__', ''))
fn = getattr(fn, '__wrapped__', None)
if 'permission_required' not in str(names) and 'decorated' not in str(names):
# 读取源码文件更可靠
import inspect
try:
mod = inspect.getmodule(vf)
path = inspect.getfile(vf)
text = Path(path).read_text(encoding='utf-8')
block = text.split(f"endpoint='{ep}'")[0].split('def ')[-1]
except Exception:
text = ''
if f"endpoint='{ep}'" in text:
idx = text.find(f"endpoint='{ep}'")
chunk = text[max(0, idx - 400):idx + 50]
if 'permission_required' not in chunk:
fail('🟡', f'PERM-{ep}', f'路由 {ep} 可能缺少 permission_required')
else:
pass_(f'PERM-{ep}', f'{ep} 已配置权限')
else:
pass_(f'PERM-{ep}', f'{ep} 已检查')
else:
pass_(f'PERM-{ep}', f'{ep} 权限装饰器存在')
def audit_business_rules():
import business_rules as r
import inspect
src = inspect.getsource(r.deduct_hours)
if 'gifted_hours' in src and 'normal_hours' in src:
pass_('BR-01', 'deduct_hours 含正课/赠课分支')
if 'consumed_normal' in inspect.getsource(r.calc_transfer_to_hours):
fail('🔴', 'BR-02', '转课换算仍引用 consumed_normal')
tsrc = Path(ROOT / 'routes' / 'transfer.py').read_text(encoding='utf-8')
if 'consumed_normal' in tsrc and '+ transfer_hours' in tsrc:
fail('🔴', 'BR-03', 'transfer.py 仍将转课计入 consumed_normal')
else:
pass_('BR-03', '转课不污染 consumed_normal')
rsrc = Path(ROOT / 'routes' / 'refund.py').read_text(encoding='utf-8')
if rsrc.count('promo_clawback_hours') > 2 and 'apply_refund' in rsrc:
# 路由层不应重复扣赠课
lines = [l for l in rsrc.splitlines() if 'gifted_hours' in l and 'promo' in l]
dup = [l for l in rsrc.splitlines() if 'account.gifted_hours' in l and 'promo_clawback' in l]
if dup:
fail('🔴', 'BR-04', 'refund.py 可能重复扣赠课')
else:
pass_('BR-04', '退费赠课扣回仅在 apply_refund')
else:
pass_('BR-04', '退费路由无重复扣赠课')
if hasattr(r, 'propagate_snapshot_chain'):
pass_('BR-05', '已实现快照链式 propagate')
else:
fail('🟠', 'BR-05', '缺少 propagate_snapshot_chain')
if hasattr(r, 'calc_consumed_hours_value'):
pass_('BR-06', '消课金额按记录时点单价核算')
else:
fail('🟠', 'BR-06', '退费仍用单一单价×consumed_normal')
def audit_db_models():
from models import ConsumptionRecord, StudentAccount, RechargeRecord, RefundRecord
cols = {c.name for c in ConsumptionRecord.__table__.columns}
if 'unit_price_at_consume' in cols:
pass_('DB-01', 'ConsumptionRecord.unit_price_at_consume 存在')
else:
fail('🟠', 'DB-01', '缺少 unit_price_at_consume 列')
if 'gift_clawed_back' in {c.name for c in RechargeRecord.__table__.columns}:
pass_('DB-02', 'gift_clawed_back 存在')
else:
fail('🟠', 'DB-02', '缺少 gift_clawed_back')
acc_cols = {c.name for c in StudentAccount.__table__.columns}
if 'unit_price' in acc_cols:
pass_('DB-03', 'student_accounts.unit_price 存在')
def audit_chain_snapshots():
from app import create_app
from models import MonthlySnapshot
app = create_app()
with app.app_context():
snaps = MonthlySnapshot.query.limit(500).all()
by_acc = {}
for s in snaps:
by_acc.setdefault(s.account_id, []).append(s)
breaks = 0
for aid, items in by_acc.items():
items.sort(key=lambda x: (x.year, x.month))
for i in range(len(items) - 1):
cur, nxt = items[i], items[i + 1]
ey, em = cur.year, cur.month
ny = ey + (1 if em == 12 else 0)
nm = 1 if em == 12 else em + 1
if (nxt.year, nxt.month) != (ny, nm):
continue
for label, end_v, prev_v in (
('正课', cur.end_lessons, nxt.prev_lessons),
('余额', cur.end_balance, nxt.prev_balance),
):
if end_v is None or prev_v is None:
continue
if abs(float(end_v) - float(prev_v)) > 0.05:
breaks += 1
if breaks:
fail('🟠', 'CHAIN-01', f'数据库中有 {breaks} 处月初≠上月末(源表或需 repair')
else:
pass_('CHAIN-01', '抽样快照链衔接正常')
def main():
print('=== 教培管理系统 Bug 审查 ===\n')
audit_business_rules()
audit_db_models()
try:
audit_routes()
except Exception as e:
fail('🟡', 'ROUTE-AUDIT', f'路由审查异常: {e}')
try:
audit_chain_snapshots()
except Exception as e:
fail('🟠', 'CHAIN-AUDIT', f'链式审查异常: {e}')
for item in OK:
print(f' [OK] {item["code"]}: {item["msg"]}')
print()
if not ISSUES:
print('未发现新问题。')
return 0
for item in ISSUES:
print(f' [{item["level"]}] {item["code"]}: {item["msg"]}')
print(f'\n合计: {len(ISSUES)} 项待关注, {len(OK)} 项通过')
return 1
if __name__ == '__main__':
sys.exit(main())