import asyncio import os import sys import logging # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 将项目根目录添加到系统路径,以便能够导入 DbKit 和 Config 等模块 project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if project_root not in sys.path: sys.path.append(project_root) from Util import Win32Patch # 应用 Windows 平台的异步策略补丁 Win32Patch.patch() from DbKit.Db import Db from Config.Config import DB_URL async def init_tables(): """ 初始化数据库表结构:读取 SQL 文件并逐条执行。 """ logger.info("正在初始化数据库连接...") db = Db(db_url=DB_URL) await db.init_db() logger.info("正在读取 SQL 文件...") # SQL 文件路径:相对于当前脚本所在目录 (DbKit/) sql_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'Sql/doris_ddl.sql') logger.info(f"使用 SQL 文件: {sql_path}") with open(sql_path, 'r', encoding='utf-8') as f: sql_content = f.read() # 使用分号分割 SQL 语句,并过滤掉空行和空白语句 statements = [s.strip() for s in sql_content.split(';') if s.strip()] logger.info(f"共找到 {len(statements)} 条 SQL 语句。") async with await db.get_session() as session: for i, stmt in enumerate(statements): logger.info(f"正在执行第 {i + 1} 条语句...") # 使用 SQLAlchemy 的 text() 函数执行原生 SQL from sqlalchemy import text try: await session.execute(text(stmt)) logger.info(f"第 {i + 1} 条语句执行成功。") except Exception as e: logger.error(f"执行第 {i + 1} 条语句时出错: {e}") # 虽然大多数数据库的 DDL 语句是自动提交的,但显式提交是一个好习惯 await session.commit() logger.info("数据库表初始化完成。") # 优雅地关闭数据库引擎并释放连接 await db.engine.dispose() if __name__ == "__main__": # 再次确保补丁已应用 Win32Patch.patch() # 运行异步初始化函数 asyncio.run(init_tables())