Files
drl_2/xuexiao/import_excel.py
user9994793890 c954532f52 feat: 导入Excel数据并完成系统测试验证
Coze-Commit-Type: user
Coze-User-ID: 3722323274763196
Coze-Conversation-ID: 5260473
2026-05-29 10:55:05 +08:00

638 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Excel数据导入SQLite脚本
导入达尔琳基础数据和课时核对表数据
"""
import os
import sys
import re
from datetime import datetime, date
from decimal import Decimal
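# Select the SQLite backend through the DB_TYPE environment variable.
# NOTE(review): this is assigned before `config`/`models` are imported,
# presumably because they read DB_TYPE at import time - confirm before
# reordering the imports.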
os.environ['DB_TYPE'] = 'sqlite'
import openpyxl
from flask import Flask
from config import Config
from models import db, Role, User, Teacher, Student, Course, Class_, ClassStudent, StudentAccount, RechargeRecord
def parse_date(value, default_year=2026):
    """Convert a spreadsheet cell value into a ``datetime.date``.

    Supported inputs:

    * ``None`` -> ``None``
    * ``datetime`` / ``date`` objects -> the corresponding ``date``
    * strings starting with ``YYYY-MM-DD`` (anything after the first ten
      characters, e.g. a time part, is ignored)
    * strings of the form ``M.D`` such as ``"4.5"`` (month 4, day 5); the
      year is taken from ``default_year``
    * strings made only of digits, interpreted as an Excel date serial

    Args:
        value: Raw cell value.
        default_year: Year used for the year-less ``M.D`` form. Defaults to
            2026, the value that was previously hard-coded.

    Returns:
        A ``date``, or ``None`` when the value is of an unsupported type or
        does not describe a valid calendar date.
    """
    if value is None:
        return None
    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    if not isinstance(value, str):
        return None

    text = value.strip()
    try:
        if re.match(r'^\d{4}-\d{2}-\d{2}', text):
            return datetime.strptime(text[:10], '%Y-%m-%d').date()

        short = re.match(r'^(\d{1,2})\.(\d{1,2})$', text)
        if short:
            return date(default_year, int(short.group(1)), int(short.group(2)))

        if re.match(r'^\d+$', text):
            # Excel serial: same arithmetic as "1900-01-01 ordinal + n - 2",
            # i.e. day 0 is 1899-12-30.
            return date.fromordinal(date(1899, 12, 30).toordinal() + int(text))
    except (ValueError, OverflowError):
        # Well-formed pattern but impossible date (e.g. "13.45") or a serial
        # outside the supported date range.
        return None
    return None
def parse_number(value):
    """Convert a spreadsheet cell value into a number.

    Args:
        value: Raw cell value (``None``, a number, or text).

    Returns:
        ``float(value)`` for ints, floats and numeric strings; ``0`` for
        ``None``, empty strings, the placeholder text ``'系统合并'``,
        non-numeric strings and any other type.
    """
    if value is None:
        return 0
    if isinstance(value, (int, float)):
        return float(value)
    if not isinstance(value, str):
        return 0

    text = value.strip()
    if text in ('', '系统合并'):
        return 0
    try:
        return float(text)
    except ValueError:
        # Non-numeric text counts as "no value"; only the conversion error
        # is swallowed, not arbitrary exceptions.
        return 0
def clean_name(name):
    """Strip bracketed notes from a student name.

    Both full-width (U+FF08 / U+FF09) and ASCII parentheses are handled, so
    a name followed by a bracketed nickname is reduced to the bare name.

    Args:
        name: Raw cell value; ``None`` is allowed, other values are passed
            through ``str()``.

    Returns:
        The name without bracketed parts and without surrounding
        whitespace; ``''`` when ``name`` is ``None``.
    """
    if name is None:
        return ''
    name = str(name).strip()
    # The full-width parentheses are written as \uXXXX escapes so the
    # pattern cannot silently degrade to a no-op if the literal characters
    # are lost or normalised by an editor or viewer.
    name = re.sub(r'\uff08.*?\uff09', '', name)
    name = re.sub(r'\(.*?\)', '', name)
    return name.strip()
def extract_nickname(full_name):
    """Return the nickname written in brackets after a student name.

    Full-width parentheses (U+FF08 / U+FF09) are tried first, then ASCII
    parentheses.

    Args:
        full_name: Raw cell value; ``None`` is allowed, other values are
            passed through ``str()``.

    Returns:
        The stripped text inside the first bracket pair, or ``''`` when the
        value is ``None`` or contains no non-empty bracket pair.
    """
    if full_name is None:
        return ''
    full_name = str(full_name).strip()
    # \uXXXX escapes keep the full-width brackets explicit; without them the
    # pattern would be a bare capture group that matches the first character
    # of every name.
    for pattern in (r'\uff08(.+?)\uff09', r'\((.+?)\)'):
        match = re.search(pattern, full_name)
        if match:
            return match.group(1).strip()
    return ''
def get_gender_from_remark(remark):
    """Infer a gender label from a free-text remark.

    NOTE(review): in this copy of the file every string literal in this
    function is empty. As written, ``'' in str(remark)`` is true for any
    string, so a truthy remark returns ``''`` from the first branch and a
    falsy remark returns ``''`` from the final ``return`` - the function
    always yields ``''``. Presumably the literals were gender markers that
    were lost when the file was extracted; restore them from the original
    source before relying on this function.
    """
    # First marker check (literal appears empty here - TODO confirm).
    if remark and '' in str(remark):
        return ''
    # Second marker check (literal appears empty here - TODO confirm).
    elif remark and '' in str(remark):
        return ''
    # No marker found, or no remark at all.
    return ''
def create_app():
    """Build the Flask application used by the import script.

    The application is configured from ``Config`` and the shared ``db``
    extension is bound to it.

    Returns:
        The configured ``Flask`` instance.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(Config)
    db.init_app(flask_app)
    return flask_app
def init_database(app):
    """Create all tables and seed the default roles and admin user.

    Existing rows are left untouched: each role and the ``admin`` user are
    only inserted when no row with the same name / username exists.

    Args:
        app: Flask application whose context is used for database access.
    """
    with app.app_context():
        db.create_all()

        # Default roles.
        if not Role.query.filter_by(name='管理员').first():
            db.session.add(Role(name='管理员', description='系统管理员'))

        super_role = Role.query.filter_by(name='超级管理员').first()
        if super_role is None:
            super_role = Role(name='超级管理员', description='超级管理员', permissions='all')
            db.session.add(super_role)

        # Default administrator account.
        if not User.query.filter_by(username='admin').first():
            # Flush explicitly so a freshly added role has its primary key,
            # instead of re-querying it and relying on implicit autoflush.
            db.session.flush()
            admin_user = User(
                username='admin',
                real_name='管理员',
                role_id=super_role.id,
                phone='13800000000'
            )
            # NOTE(review): hard-coded default credential; it should be
            # changed after the first login or supplied via configuration.
            admin_user.set_password('admin123')
            db.session.add(admin_user)

        db.session.commit()
        print("数据库初始化完成")
def import_basic_data(app):
    """Import students, lesson accounts and recharge history from Excel.

    Reads sheet ``基础信息`` of ``assets/达尔琳基础数据.xlsx`` starting at
    row 3 and, for every row that has a student name in column B:

    * creates the ``Student`` if no student with that cleaned name exists
      (or fills in a missing nickname on an existing one);
    * creates a ``StudentAccount`` for the student and the row's course if
      none exists yet;
    * adds one ``RechargeRecord`` for the new-sign payment and for each of
      the three renewals whose amount is greater than zero (skipped when no
      ``admin`` user exists).

    The default courses are created only when missing, so running the
    import again does not duplicate them.

    NOTE(review): recharge records are added on every run; re-running the
    import duplicates them. Confirm whether that is acceptable.

    Args:
        app: Flask application whose context is used for database access.
    """
    print("\n" + "=" * 60)
    print("开始导入基础数据...")
    print("=" * 60)

    # (level key, display name) of the built-in courses.
    default_courses = (
        ('scratch', 'Scratch图形编程'),
        ('python', 'Python编程'),
        ('c++', 'C++编程'),
        ('lab', 'Lab编程'),
        ('大班', 'STEM大班'),
        ('中班', 'STEM中班'),
        ('小班', 'STEM小班'),
    )
    # (record key, zero-based column index) of every numeric column read.
    numeric_columns = (
        ('new_sign_amount', 11), ('new_sign_hours', 12),
        ('new_sign_gift', 13), ('new_sign_price', 14),
        ('renewal1_amount', 16), ('renewal1_hours', 17),
        ('renewal1_gift', 18), ('renewal1_price', 19),
        ('renewal2_amount', 20), ('renewal2_hours', 21),
        ('renewal2_gift', 22), ('renewal2_price', 23),
        ('renewal3_amount', 24), ('renewal3_hours', 25),
        ('renewal3_gift', 26), ('renewal3_price', 27),
        ('remain_hours', 30), ('gift_hours', 31), ('remain_total', 32),
        ('unit_price', 33), ('remain_amount', 34),
    )
    remark_column = 35
    # (record key prefix, remark stored on the recharge record).
    recharge_kinds = (
        ('new_sign', '新签'),
        ('renewal1', '续费一'),
        ('renewal2', '续费二'),
        ('renewal3', '续费三'),
    )

    with app.app_context():
        excel_path = 'assets/达尔琳基础数据.xlsx'
        wb = openpyxl.load_workbook(excel_path, data_only=True)
        ws = wb['基础信息']

        # Level key -> Course. Existing courses are reused so a second run
        # does not insert the catalogue again.
        course_map = {}
        created_courses = 0
        for level, course_name in default_courses:
            course = Course.query.filter_by(name=course_name, level=level).first()
            if course is None:
                course = Course(name=course_name, level=level)
                db.session.add(course)
                created_courses += 1
            course_map[level] = course
        db.session.commit()
        print(f"已创建 {created_courses} 个默认课程")

        # Pass 1: read the sheet into plain dicts.
        students_data = []
        for row in ws.iter_rows(min_row=3, values_only=True):
            # Pad short rows so every column index used below is valid.
            if len(row) <= remark_column:
                row = tuple(row) + (None,) * (remark_column + 1 - len(row))
            if not row[1]:  # column B: student name
                continue
            student_name = clean_name(row[1])
            if not student_name:
                continue

            remark = str(row[remark_column]) if row[remark_column] else ''
            record = {
                'name': student_name,
                'nickname': extract_nickname(str(row[1])),
                'class_name': str(row[2]).strip() if row[2] else '',
                'course_level': str(row[3]).strip() if row[3] else '',
                'nature': str(row[4]).strip() if row[4] else '自费',
                'gender': get_gender_from_remark(remark),
                'remark': remark,
            }
            for key, col in numeric_columns:
                record[key] = parse_number(row[col])
            students_data.append(record)
        print(f"从基础数据表读取了 {len(students_data)} 条学员记录")

        # Pass 2: write students, accounts and recharge records.
        operator = User.query.filter_by(username='admin').first()
        student_map = {}  # name -> Student, newly created students only
        account_count = 0
        for data in students_data:
            student = Student.query.filter_by(name=data['name']).first()
            if not student:
                student = Student(
                    name=data['name'],
                    nickname=data['nickname'],
                    gender=data['gender'],
                    remark=data['remark'],
                    status=1
                )
                db.session.add(student)
                db.session.flush()  # assigns student.id
                student_map[data['name']] = student
            elif data['nickname'] and not student.nickname:
                student.nickname = data['nickname']

            course = course_map.get(data['course_level'])
            if not course:
                if data['course_level']:
                    # Make the silent fallback visible to the operator.
                    print(f" 警告: 未知课程级别 '{data['course_level']}' ({data['name']}), 使用默认课程 scratch")
                course = course_map.get('scratch')

            account = StudentAccount.query.filter_by(
                student_id=student.id,
                course_id=course.id
            ).first()
            if not account:
                account_fields = dict(
                    student_id=student.id,
                    course_id=course.id,
                    normal_hours=data['remain_hours'],
                    gifted_hours=data['gift_hours'],
                    consumed_normal=0,
                    consumed_gifted=0,
                    cumulative_amount=data['remain_amount'],
                    account_status='active'
                )
                # Price columns are only set when the sheet has a unit price.
                if data['unit_price'] > 0:
                    account_fields['unit_price'] = data['unit_price']
                    account_fields['original_price_per_lesson'] = data['unit_price']
                db.session.add(StudentAccount(**account_fields))
                account_count += 1

            # Recharge records need an operator; without one they are skipped.
            if not operator:
                continue
            for prefix, label in recharge_kinds:
                if data[f'{prefix}_amount'] > 0:
                    db.session.add(RechargeRecord(
                        student_id=student.id,
                        course_id=course.id,
                        amount=data[f'{prefix}_amount'],
                        normal_hours=data[f'{prefix}_hours'],
                        gifted_hours=data[f'{prefix}_gift'],
                        operator_id=operator.id,
                        remark=label
                    ))

        db.session.commit()
        print(f"导入完成: {len(student_map)} 名学生, {account_count} 个课时账户")
        print(f"充值记录已添加到各学生账户")
# Bracketed annotation inside an attendance cell, e.g. "4.19(补)", written
# with ASCII or full-width (U+FF08 / U+FF09) parentheses.
_ATTENDANCE_NOTE_RE = re.compile(r'\(.*?\)|\uff08.*?\uff09')
# Leading "M.D" of a free-lesson attendance cell.
_FREE_DATE_PREFIX_RE = re.compile(r'^(\d{1,2})\.(\d{1,2})')
# Year assumed for year-less "M.D" cells on the free-lesson sheet.
_ATTENDANCE_YEAR = 2026
# Sheets that hold transfer / roster data instead of attendance.
_SKIPPED_SHEETS = ('研学名单', '4月份转账', '校园通转账明细')
# Per-sheet layout. `columns` are the zero-based attendance columns;
# `course_id` is used when the student has no account (or always, when
# `use_account` is False); `prefix_dates` selects the looser "M.D" prefix
# parsing of the free-lesson sheet.
# NOTE(review): the numeric course ids assume the insertion order of the
# default courses (1=scratch, 2=python, 4=lab) - confirm, in particular the
# id 2 used for the STEM sheet.
_CONSUMPTION_SHEETS = {
    # Columns of 'code学龄后': A no., B student, C course level,
    # D-H March balance, I-N new-sign / renewal, O hours taken in April,
    # P-W April amounts and balances, X.. attendance dates, AA remark.
    'code学龄后': dict(columns=range(23, 31), course_id=1, use_account=True,
                    consume_type='normal', prefix_dates=False),
    'stem学龄前': dict(columns=range(22, 31), course_id=2, use_account=True,
                    consume_type='normal', prefix_dates=False),
    '学龄后lab': dict(columns=range(15, 26), course_id=4, use_account=True,
                   consume_type='normal', prefix_dates=False),
    '免费': dict(columns=range(6, 12), course_id=1, use_account=False,
               consume_type='gift', prefix_dates=True),
}


def _attendance_date(cell, prefix_dates):
    """Return the date stored in one attendance cell, or None."""
    if isinstance(cell, datetime):
        return cell.date()
    if isinstance(cell, date):
        return cell
    if not isinstance(cell, str):
        return None
    text = _ATTENDANCE_NOTE_RE.sub('', cell).strip()
    try:
        if not prefix_dates:
            return parse_date(text)
        match = _FREE_DATE_PREFIX_RE.match(text)
        if match is None:
            return None
        return date(_ATTENDANCE_YEAR, int(match.group(1)), int(match.group(2)))
    except ValueError:
        # Looks like a date but is not a valid one (e.g. "13.45").
        return None


def _attendance_dates(row, layout):
    """Collect every parseable attendance date of one sheet row."""
    found = []
    for col_idx in layout['columns']:
        if col_idx < len(row) and row[col_idx]:
            parsed = _attendance_date(row[col_idx], layout['prefix_dates'])
            if parsed:
                found.append(parsed)
    return found


def import_consumption_data(app):
    """Import lesson-consumption records from the April check workbook.

    Every sheet of ``assets/课时核对表_2026_4月份.xlsx`` except the transfer
    and roster sheets is scanned from row 3. For each row whose student
    (column B, cleaned with ``clean_name``) exists in the database, one
    ``ConsumptionRecord`` of one lesson hour is created per attendance date
    found in the sheet's attendance columns. Rows on sheets without a known
    layout only produce the "student not found" warning.

    Paid sheets book a ``normal`` hour against the course of the student's
    first account (falling back to a fixed course id); the ``免费`` sheet
    books a ``gift`` hour against course id 1.

    NOTE(review): one hour is booked per attendance date; the "hours taken"
    column of the sheet is not cross-checked.

    Args:
        app: Flask application whose context is used for database access.

    Raises:
        RuntimeError: If the ``admin`` user that is recorded as operator
            does not exist.
    """
    print("\n" + "=" * 60)
    print("开始导入课时核对表数据...")
    print("=" * 60)
    with app.app_context():
        from models import ConsumptionRecord

        excel_path = 'assets/课时核对表_2026_4月份.xlsx'
        wb = openpyxl.load_workbook(excel_path, data_only=True)

        # Resolve the operator once instead of once per record, and fail
        # with a clear message instead of an AttributeError on None.
        operator = User.query.filter_by(username='admin').first()
        if operator is None:
            raise RuntimeError("admin user not found; run init_database() first")
        operator_id = operator.id

        total_consumption = 0
        for sheet_name in wb.sheetnames:
            if sheet_name in _SKIPPED_SHEETS:
                continue
            ws = wb[sheet_name]
            print(f"\n处理 Sheet: {sheet_name}")
            layout = _CONSUMPTION_SHEETS.get(sheet_name)

            for row in ws.iter_rows(min_row=3, values_only=True):
                if len(row) < 2 or not row[1]:  # column B: student name
                    continue
                student_name = clean_name(row[1])
                if not student_name:
                    continue
                student = Student.query.filter_by(name=student_name).first()
                if not student:
                    print(f" 警告: 未找到学生 '{student_name}'")
                    continue
                if layout is None:
                    continue

                dates = _attendance_dates(row, layout)
                if not dates:
                    continue

                if layout['use_account'] and student.accounts:
                    course_id = student.accounts[0].course_id
                else:
                    course_id = layout['course_id']
                is_gift = layout['consume_type'] == 'gift'

                for consume_date in dates:
                    db.session.add(ConsumptionRecord(
                        student_id=student.id,
                        course_id=course_id,
                        hours_consumed=1,  # one lesson hour per attendance
                        consume_type=layout['consume_type'],
                        normal_consumed=0 if is_gift else 1,
                        gifted_consumed=1 if is_gift else 0,
                        operator_id=operator_id,
                        consume_date=consume_date,
                        remark=f'{sheet_name}导入'
                    ))
                    total_consumption += 1

        db.session.commit()
        print(f"\n消课记录导入完成: {total_consumption} 条记录")
def verify_data(app):
    """Print summary statistics of the imported data.

    Prints the row counts of students, accounts, recharge records, courses
    and consumption records, the number of accounts per course, and the
    accounts of the first five students.

    Args:
        app: Flask application whose context is used for database access.
    """
    print("\n" + "=" * 60)
    print("数据完整性验证...")
    print("=" * 60)
    with app.app_context():
        # Imported up front; the former `'ConsumptionRecord' in dir()` probe
        # was always false and its result was overwritten anyway.
        from models import ConsumptionRecord

        student_count = Student.query.count()
        account_count = StudentAccount.query.count()
        recharge_count = RechargeRecord.query.count()
        course_count = Course.query.count()
        consumption_count = ConsumptionRecord.query.count()

        print("\n数据统计:")
        print(f" - 学生总数: {student_count}")
        print(f" - 课时账户: {account_count}")
        print(f" - 充值记录: {recharge_count}")
        print(f" - 课程数量: {course_count}")
        print(f" - 消课记录: {consumption_count}")

        # Accounts per course.
        print("\n按课程统计:")
        for course in Course.query.all():
            count = StudentAccount.query.filter_by(course_id=course.id).count()
            print(f" - {course.name}: {count} 名学生")

        # A few sample students with their accounts.
        print("\n部分学生数据示例:")
        for student in Student.query.limit(5).all():
            print(f" - {student.name} (ID:{student.id})")
            for account in student.accounts:
                print(f" 课程: {account.course.name}, 正课: {account.normal_hours}, 赠课: {account.gifted_hours}, 单价: {account.unit_price}")
def main():
    """Run the whole import pipeline.

    Creates the Flask app, then in order: initialises the database, imports
    the basic data, imports the consumption data and prints the
    verification summary.
    """
    banner = "=" * 60
    print(banner)
    print("达尔琳数据导入工具")
    print(banner)

    app = create_app()
    # Each step takes the app; the order matters because later steps read
    # rows written by earlier ones.
    pipeline = (
        init_database,
        import_basic_data,
        import_consumption_data,
        verify_data,
    )
    for step in pipeline:
        step(app)

    print("\n" + banner)
    print("导入完成!")
    print(banner)
# Run the import only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()