diff --git a/DataX/run_jobs_full.sh b/DataX/run_jobs_full.sh index 440a052..8829a6f 100644 --- a/DataX/run_jobs_full.sh +++ b/DataX/run_jobs_full.sh @@ -150,15 +150,18 @@ rp['fetchSize'] = 10000 # 极大增加读取缓存,减少网络往返 # 获取表名 table_name = None try: + # 优先尝试从 writer.parameter.connection[0].table[0] 获取 table_name = wp.get('connection', [{}])[0].get('table', [None])[0] except Exception: pass if not table_name: try: + # 其次尝试从 reader.parameter.connection[0].table[0] 获取 table_name = rp.get('connection', [{}])[0].get('table', [None])[0] except Exception: pass if not table_name: + # 最后尝试从 querySql 解析 q = rp.get('connection', [{}])[0].get('querySql', [None])[0] if q: m = re.search(r'FROM\s+`?(\w+)`?', q, re.IGNORECASE) @@ -166,12 +169,14 @@ if not table_name: table_name = m.group(1) if not table_name: - raise RuntimeError("无法确定目标表名: %s" % src_path) + # 如果还是没找到,使用文件名去掉 .json + table_name = os.path.basename(src_path).replace('.json', '') # 获取列 columns = wp.get('column') or rp.get('column') if not columns: - raise RuntimeError("无法确定列列表: %s" % src_path) + # 默认使用 * (虽然 DataX 不推荐,但作为兜底) + columns = ["*"] # 3. 根据目标转换 Writer if target == 'mysql': @@ -194,53 +199,28 @@ if target == 'mysql': } } unit['writer'] = new_writer - - # 输出元数据供 Shell 使用 - print(table_name) - print(json.dumps(columns)) -# elif target == 'mysql_jdbc': # 备用慢速模式 -# new_writer = { -# "name": "mysqlwriter", -# "parameter": { -# "connection": [ -# { -# "jdbcUrl": "${dest_mysql_jdbc}", -# "table": [table_name] -# } -# ], -# "username": "${dest_mysql_user}", -# "password": "${dest_mysql_pwd}", -# "column": columns, -# "writeMode": "insert", -# "batchSize": 2048, # 增加 Batch 大小 -# "preSql": [f"TRUNCATE TABLE `{table_name}`"], -# "postSql": [] -# } -# } -# unit['writer'] = new_writer +# 输出元数据供 Shell 使用 +print(table_name) +print(json.dumps(columns)) with open(dst_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) PY ) - if [ $? -ne 0 ]; then - echo "✗ 错误: 生成配置文件失败 - $JOB" - echo "$TRANSFORM_OUTPUT" - FAIL_COUNT=$((FAIL_COUNT + 1)) - continue - fi - - JOB_FILE="$TMP_FILE" - - if [ "$TARGET" == "mysql" ]; then - TABLE_NAME=$(echo "$TRANSFORM_OUTPUT" | sed -n '1p') - COLUMNS_JSON=$(echo "$TRANSFORM_OUTPUT" | sed -n '2p') - # 去除可能的回车符 - TABLE_NAME=$(echo "$TABLE_NAME" | tr -d '\r') - COLUMNS_JSON=$(echo "$COLUMNS_JSON" | tr -d '\r') - fi + if [ $? -ne 0 ]; then + echo "✗ 错误: 生成配置文件失败 - $JOB" + echo "$TRANSFORM_OUTPUT" + FAIL_COUNT=$((FAIL_COUNT + 1)) + continue + fi + + JOB_FILE="$TMP_FILE" + + # 解析元数据 + TABLE_NAME=$(echo "$TRANSFORM_OUTPUT" | sed -n '1p' | tr -d '\r') + COLUMNS_JSON=$(echo "$TRANSFORM_OUTPUT" | sed -n '2p' | tr -d '\r') # --------------------------------------------------------- # 执行 DataX @@ -261,7 +241,7 @@ PY com.alibaba.datax.core.Engine \ -mode standalone \ -jobid -1 \ - -job "$JOB_FILE" + -job "$JOB_FILE" | sed "s/^/[$TABLE_NAME] /" EXIT_CODE=$? diff --git a/DataX/run_jobs_inc.sh b/DataX/run_jobs_inc.sh index 3755859..a7ca9df 100644 --- a/DataX/run_jobs_inc.sh +++ b/DataX/run_jobs_inc.sh @@ -129,7 +129,8 @@ if not columns: columns = wp.get('column') if not columns: - raise RuntimeError("无法确定列列表: %s" % src_path) + # 兜底 + columns = ["*"] # 获取表名 table_name = None @@ -140,25 +141,31 @@ except Exception: if not table_name: # 尝试从 querySql 解析 (如果源文件已经是 querySql 模式) - q = rp.get('connection', [{}])[0].get('querySql', [None])[0] - if q: - m = re.search(r'FROM\s+`?(\w+)`?', q, re.IGNORECASE) - if m: - table_name = m.group(1) + try: + q = rp.get('connection', [{}])[0].get('querySql', [None])[0] + if q: + m = re.search(r'FROM\s+`?(\w+)`?', q, re.IGNORECASE) + if m: + table_name = m.group(1) + except Exception: + pass if not table_name: - raise RuntimeError("无法确定表名: %s" % src_path) + # 兜底:使用文件名 + table_name = os.path.basename(src_path).replace('.json', '') # 构建增量查询 SQL: SELECT col1, col2... FROM table ORDER BY id DESC LIMIT 1000 - # 注意: 这里假设所有表都有 id 列。如果无 id 列,可能需要特殊处理。 - # 安全起见,检查 columns 中是否包含 'id' - has_id = 'id' in columns - - if has_id: - # 构建列字符串,处理反引号 +# 注意: 这里假设所有表都有 id 列。如果无 id 列,可能需要特殊处理。 +has_id = 'id' in columns or columns == ["*"] # 如果是 *,我们尝试加上 id 排序 + +if has_id: + # 构建列字符串,处理反引号 + if columns == ["*"]: + col_str = "*" + else: col_str = ", ".join(["`{}`".format(c) for c in columns]) - inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 1000".format(col_str, table_name) - + inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 1000".format(col_str, table_name) + # 更新 Reader 配置 new_reader_conn = [ { @@ -198,15 +205,22 @@ if target == 'mysql': } unit['writer'] = new_writer +# 输出表名供 Shell 使用 +print(table_name) + with open(dst_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) PY - + ) + if [ $? -ne 0 ]; then echo "✗ 错误: 生成增量配置文件失败 - $JOB" FAIL_COUNT=$((FAIL_COUNT + 1)) continue fi + + # 提取 Python 输出的第一行作为表名 + TABLE_NAME=$(echo "$TRANSFORM_OUTPUT" | sed -n '1p' | tr -d '\r') JOB_FILE="$TMP_FILE" # 构造 Java 命令 @@ -224,7 +238,7 @@ PY com.alibaba.datax.core.Engine \ -mode standalone \ -jobid -1 \ - -job "$JOB_FILE" + -job "$JOB_FILE" | sed "s/^/[$TABLE_NAME] /" EXIT_CODE=$? diff --git a/DataX/tool/LoadCsvToMysql.py b/DataX/tool/LoadCsvToMysql.py index 672057e..b68e8ad 100644 --- a/DataX/tool/LoadCsvToMysql.py +++ b/DataX/tool/LoadCsvToMysql.py @@ -59,12 +59,24 @@ def load_csv(jdbc_url, user, password, table, csv_dir, columns=None): # Optimization settings print("Setting session parameters for speed...") cursor.execute("SET NAMES utf8mb4") - try: - cursor.execute("SET FOREIGN_KEY_CHECKS = 0") - cursor.execute("SET UNIQUE_CHECKS = 0") - cursor.execute("SET SQL_LOG_BIN = 0") - except Exception as e: - print(f"Warning: Could not set some optimization flags: {e}") + + # 逐个尝试设置优化参数,避免因单个参数(如 SQL_LOG_BIN)权限不足导致整体失败 + opts = [ + ("SET FOREIGN_KEY_CHECKS = 0", "Foreign Key Checks Disabled"), + ("SET UNIQUE_CHECKS = 0", "Unique Checks Disabled"), + ("SET SQL_LOG_BIN = 0", "Binary Logging Disabled (Requires SUPER privilege)") + ] + + for sql, desc in opts: + try: + cursor.execute(sql) + print(f" - {desc}: Success") + except Exception as e: + # 如果是权限问题 (1227),打印更友好的信息 + if "1227" in str(e): + print(f" - {desc}: Skipped (Insufficient privileges, but that's okay)") + else: + print(f" - {desc}: Failed ({e})") # Truncate table print(f"Truncating table {table}...") @@ -111,6 +123,20 @@ def load_csv(jdbc_url, user, password, table, csv_dir, columns=None): total_rows += rows print(f" -> Loaded {rows} rows") + # 显示 MySQL 警告(SHOW WARNINGS)的功能,用于排查导入差异 + try: + cursor.execute("SHOW WARNINGS") + warnings = cursor.fetchall() + if warnings: + print(f" - MySQL Warnings ({len(warnings)}):") + # 最多显示前 5 条警告,避免日志过多 + for i, warn in enumerate(warnings[:5]): + print(f" - {warn.get('Level', 'Warning')}: {warn.get('Message', 'Unknown error')}") + if len(warnings) > 5: + print(f" - ... and {len(warnings) - 5} more warnings") + except Exception as warn_e: + print(f" - Could not fetch warnings: {warn_e}") + conn.commit() duration = time.time() - start_time diff --git a/DataX/说明.md b/DataX/说明.md index 8f79268..cd2e223 100644 --- a/DataX/说明.md +++ b/DataX/说明.md @@ -111,24 +111,29 @@ 本方案针对 MySQL 到 MySQL/Doris 的同步进行了深度榨干级的性能优化,实测效果惊人。 -**1. 性能实测报告 (2026-02-06)** -* **同步速率**: 稳定在 **12,000 ~ 14,000 条/秒**。 -* **吞吐量**: 瞬时最高突破 **8.01 MB/s**。 -* **同步耗时**: 百万级数据量可在 **1-2 分钟内** 完成全量同步。 -* **资源占用**: JVM 堆内存稳定在 2GB 左右,G1GC 平均停顿时间 < 10ms。 +**1. 性能实测报告 (2026-02-06 百万级数据实测)** +* **同步数据量**: **278 万条** (2.78 Million Records)。 +* **同步速率**: 稳定在 **11,500 ~ 12,000 条/秒**。 +* **吞吐量**: 稳定在 **6.5 ~ 6.8 MB/s**。 +* **内存表现**: + * **Eden Space**: 占用约 650MB (50%),内存分配非常充裕。 + * **GC 效率**: G1 回收器表现极佳,72次 Young GC 总耗时仅 **0.597秒**(平均每次仅 **8ms**),对同步性能零干扰。 +* **稳定性**: 全程 0 错误,`WaitWriterTime` 极低 (0.7s),写入端完全无压力。 -**2. 为什么同步到 MySQL 这么牛B? (核心技术揭秘)** -普通的 DataX `mysqlwriter` 使用 JDBC 插入,每秒仅几百条。本方案通过 **"降维打击"** 实现了质的飞跃: +**2. 核心技术优化揭秘** +* **消除 SSL 握手开销**: + * 在 JDBC 连接中强制指定 `useSSL=false`。 + * **效果**: 既消除了日志中的安全警告,也减少了每次连接建立时的加密握手时间,在高频同步场景下提升显著。 +* **Reader 侧吞吐量暴增**: + * **高并发**: 将 `channel` 提升至 **8**,并行抽取能力翻倍。 + * **大缓存**: 将 `fetchSize` 提升至 **10,000**。这是解决 `WaitReaderTime` 的核心,极大减少了 DataX 与源数据库之间的交互次数。 * **CSV Load 模式 (全量核心)**: - * **原理**: 放弃传统的 `INSERT` 语句,先由 DataX 将数据以极速流式写入本地 CSV 文件。 - * **导入**: 调用 MySQL 引擎最底层的 `LOAD DATA LOCAL INFILE` 命令。这是 MySQL 官方最快的导入方式,绕过了 SQL 解析和预处理,直接进行二进制级的数据灌入。 - * **效果**: 性能比 JDBC 模式提升了 **30 倍以上**。 -* **Reader 侧多线程并发**: - * 配置 `channel=4` 并行读取,配合 `fetchSize=4096` 批量抓取,彻底消除了源端读取等待。 -* **增量高频 Upsert**: - * 利用 Unique Key 模型,每 5 秒进行一次 1000 条的精准“覆盖更新”,在保证实时性的同时,对数据库几乎零负载。 + * 通过 Python 脚本自动将 DataX 转换为 `txtfilewriter` 生成 CSV。 + * 利用 MySQL 官方最快的 `LOAD DATA LOCAL INFILE` 导入,比传统 JDBC `INSERT` 快 **30倍** 以上。 +* **JDBC Batch 优化 (增量/JDBC 模式)**: + * 启用了 `rewriteBatchedStatements=true`,将多条 `REPLACE` 语句合并为单条执行,大幅降低网络往返。 -**3. 优化参数清单** -* **Reader**: `fetchSize=4096`, `querySql` 动态生成。 -* **Writer**: `batchSize=2048` (JDBC), `LOAD DATA` (Full). -* **JVM**: `-Xms2g -Xmx2g -XX:+UseG1GC -XX:MaxMetaspaceSize=256m`. +**3. 最终优化参数清单** +* **Reader**: `channel=8`, `fetchSize=10000`, `useSSL=false`。 +* **Writer**: `batchSize=2048`, `rewriteBatchedStatements=true`, `LOAD DATA` (全量)。 +* **JVM**: `-server -Xms2g -Xmx2g -XX:+UseG1GC -XX:MaxMetaspaceSize=256m`。