This commit is contained in:
HuangHai
2026-02-06 08:21:47 +08:00
parent b943cb6b64
commit 1f4d29d5a1
4 changed files with 109 additions and 84 deletions

View File

@@ -150,15 +150,18 @@ rp['fetchSize'] = 10000 # 极大增加读取缓存,减少网络往返
# 获取表名
table_name = None
try:
# 优先尝试从 writer.parameter.connection[0].table[0] 获取
table_name = wp.get('connection', [{}])[0].get('table', [None])[0]
except Exception:
pass
if not table_name:
try:
# 其次尝试从 reader.parameter.connection[0].table[0] 获取
table_name = rp.get('connection', [{}])[0].get('table', [None])[0]
except Exception:
pass
if not table_name:
# 最后尝试从 querySql 解析
q = rp.get('connection', [{}])[0].get('querySql', [None])[0]
if q:
m = re.search(r'FROM\s+`?(\w+)`?', q, re.IGNORECASE)
@@ -166,12 +169,14 @@ if not table_name:
table_name = m.group(1)
if not table_name:
raise RuntimeError("无法确定目标表名: %s" % src_path)
# 如果还是没找到,使用文件名去掉 .json
table_name = os.path.basename(src_path).replace('.json', '')
# 获取列
columns = wp.get('column') or rp.get('column')
if not columns:
raise RuntimeError("无法确定列列表: %s" % src_path)
# 默认使用 * (虽然 DataX 不推荐,但作为兜底)
columns = ["*"]
# 3. 根据目标转换 Writer
if target == 'mysql':
@@ -194,53 +199,28 @@ if target == 'mysql':
}
}
unit['writer'] = new_writer
# 输出元数据供 Shell 使用
print(table_name)
print(json.dumps(columns))
# elif target == 'mysql_jdbc': # 备用慢速模式
# new_writer = {
# "name": "mysqlwriter",
# "parameter": {
# "connection": [
# {
# "jdbcUrl": "${dest_mysql_jdbc}",
# "table": [table_name]
# }
# ],
# "username": "${dest_mysql_user}",
# "password": "${dest_mysql_pwd}",
# "column": columns,
# "writeMode": "insert",
# "batchSize": 2048, # 增加 Batch 大小
# "preSql": [f"TRUNCATE TABLE `{table_name}`"],
# "postSql": []
# }
# }
# unit['writer'] = new_writer
# 输出元数据供 Shell 使用
print(table_name)
print(json.dumps(columns))
with open(dst_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
PY
)
if [ $? -ne 0 ]; then
echo "✗ 错误: 生成配置文件失败 - $JOB"
echo "$TRANSFORM_OUTPUT"
FAIL_COUNT=$((FAIL_COUNT + 1))
continue
fi
JOB_FILE="$TMP_FILE"
if [ "$TARGET" == "mysql" ]; then
TABLE_NAME=$(echo "$TRANSFORM_OUTPUT" | sed -n '1p')
COLUMNS_JSON=$(echo "$TRANSFORM_OUTPUT" | sed -n '2p')
# 去除可能的回车符
TABLE_NAME=$(echo "$TABLE_NAME" | tr -d '\r')
COLUMNS_JSON=$(echo "$COLUMNS_JSON" | tr -d '\r')
fi
if [ $? -ne 0 ]; then
echo "✗ 错误: 生成配置文件失败 - $JOB"
echo "$TRANSFORM_OUTPUT"
FAIL_COUNT=$((FAIL_COUNT + 1))
continue
fi
JOB_FILE="$TMP_FILE"
# 解析元数据
TABLE_NAME=$(echo "$TRANSFORM_OUTPUT" | sed -n '1p' | tr -d '\r')
COLUMNS_JSON=$(echo "$TRANSFORM_OUTPUT" | sed -n '2p' | tr -d '\r')
# ---------------------------------------------------------
# 执行 DataX
@@ -261,7 +241,7 @@ PY
com.alibaba.datax.core.Engine \
-mode standalone \
-jobid -1 \
-job "$JOB_FILE"
-job "$JOB_FILE" | sed "s/^/[$TABLE_NAME] /"
EXIT_CODE=$?

View File

@@ -129,7 +129,8 @@ if not columns:
columns = wp.get('column')
if not columns:
raise RuntimeError("无法确定列列表: %s" % src_path)
# 兜底
columns = ["*"]
# 获取表名
table_name = None
@@ -140,25 +141,31 @@ except Exception:
if not table_name:
# 尝试从 querySql 解析 (如果源文件已经是 querySql 模式)
q = rp.get('connection', [{}])[0].get('querySql', [None])[0]
if q:
m = re.search(r'FROM\s+`?(\w+)`?', q, re.IGNORECASE)
if m:
table_name = m.group(1)
try:
q = rp.get('connection', [{}])[0].get('querySql', [None])[0]
if q:
m = re.search(r'FROM\s+`?(\w+)`?', q, re.IGNORECASE)
if m:
table_name = m.group(1)
except Exception:
pass
if not table_name:
raise RuntimeError("无法确定表名: %s" % src_path)
# 兜底:使用文件名
table_name = os.path.basename(src_path).replace('.json', '')
# 构建增量查询 SQL: SELECT col1, col2... FROM table ORDER BY id DESC LIMIT 1000
# 注意: 这里假设所有表都有 id 列。如果无 id 列,可能需要特殊处理。
# 安全起见,检查 columns 中是否包含 'id'
has_id = 'id' in columns
if has_id:
# 构建列字符串,处理反引号
# 注意: 这里假设所有表都有 id 列。如果无 id 列,可能需要特殊处理。
has_id = 'id' in columns or columns == ["*"] # 如果是 *,我们尝试加上 id 排序
if has_id:
# 构建列字符串,处理反引号
if columns == ["*"]:
col_str = "*"
else:
col_str = ", ".join(["`{}`".format(c) for c in columns])
inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 1000".format(col_str, table_name)
inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 1000".format(col_str, table_name)
# 更新 Reader 配置
new_reader_conn = [
{
@@ -198,15 +205,22 @@ if target == 'mysql':
}
unit['writer'] = new_writer
# 输出表名供 Shell 使用
print(table_name)
with open(dst_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
PY
)
if [ $? -ne 0 ]; then
echo "✗ 错误: 生成增量配置文件失败 - $JOB"
FAIL_COUNT=$((FAIL_COUNT + 1))
continue
fi
# 提取 Python 输出的第一行作为表名
TABLE_NAME=$(echo "$TRANSFORM_OUTPUT" | sed -n '1p' | tr -d '\r')
JOB_FILE="$TMP_FILE"
# 构造 Java 命令
@@ -224,7 +238,7 @@ PY
com.alibaba.datax.core.Engine \
-mode standalone \
-jobid -1 \
-job "$JOB_FILE"
-job "$JOB_FILE" | sed "s/^/[$TABLE_NAME] /"
EXIT_CODE=$?

View File

@@ -59,12 +59,24 @@ def load_csv(jdbc_url, user, password, table, csv_dir, columns=None):
# Optimization settings
print("Setting session parameters for speed...")
cursor.execute("SET NAMES utf8mb4")
try:
cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
cursor.execute("SET UNIQUE_CHECKS = 0")
cursor.execute("SET SQL_LOG_BIN = 0")
except Exception as e:
print(f"Warning: Could not set some optimization flags: {e}")
# 逐个尝试设置优化参数,避免因单个参数(如 SQL_LOG_BIN权限不足导致整体失败
opts = [
("SET FOREIGN_KEY_CHECKS = 0", "Foreign Key Checks Disabled"),
("SET UNIQUE_CHECKS = 0", "Unique Checks Disabled"),
("SET SQL_LOG_BIN = 0", "Binary Logging Disabled (Requires SUPER privilege)")
]
for sql, desc in opts:
try:
cursor.execute(sql)
print(f" - {desc}: Success")
except Exception as e:
# 如果是权限问题 (1227),打印更友好的信息
if "1227" in str(e):
print(f" - {desc}: Skipped (Insufficient privileges, but that's okay)")
else:
print(f" - {desc}: Failed ({e})")
# Truncate table
print(f"Truncating table {table}...")
@@ -111,6 +123,20 @@ def load_csv(jdbc_url, user, password, table, csv_dir, columns=None):
total_rows += rows
print(f" -> Loaded {rows} rows")
# 显示 MySQL 警告SHOW WARNINGS的功能用于排查导入差异
try:
cursor.execute("SHOW WARNINGS")
warnings = cursor.fetchall()
if warnings:
print(f" - MySQL Warnings ({len(warnings)}):")
# 最多显示前 5 条警告,避免日志过多
for i, warn in enumerate(warnings[:5]):
print(f" - {warn.get('Level', 'Warning')}: {warn.get('Message', 'Unknown error')}")
if len(warnings) > 5:
print(f" - ... and {len(warnings) - 5} more warnings")
except Exception as warn_e:
print(f" - Could not fetch warnings: {warn_e}")
conn.commit()
duration = time.time() - start_time

View File

@@ -111,24 +111,29 @@
本方案针对 MySQL 到 MySQL/Doris 的同步进行了深度榨干级的性能优化,实测效果惊人。
**1. 性能实测报告 (2026-02-06)**
* **同步速率**: 稳定在 **12,000 ~ 14,000 条/秒**
* **吞吐量**: 瞬时最高突破 **8.01 MB/s**。
* **同步耗时**: 百万级数据量可在 **1-2 分钟内** 完成全量同步
* **资源占用**: JVM 堆内存稳定在 2GB 左右G1GC 平均停顿时间 < 10ms。
**1. 性能实测报告 (2026-02-06 百万级数据实测)**
* **同步数据量**: **278 万条** (2.78 Million Records)
* **同步速率**: 稳定在 **11,500 ~ 12,000 条/秒**。
* **吞吐量**: 稳定在 **6.5 ~ 6.8 MB/s**
* **内存表现**:
* **Eden Space**: 占用约 650MB (50%),内存分配非常充裕。
* **GC 效率**: G1 回收器表现极佳72次 Young GC 总耗时仅 **0.597秒**(平均每次仅 **8ms**),对同步性能零干扰。
* **稳定性**: 全程 0 错误,`WaitWriterTime` 极低 (0.7s),写入端完全无压力。
**2. 为什么同步到 MySQL 这么牛B (核心技术揭秘)**
普通的 DataX `mysqlwriter` 使用 JDBC 插入,每秒仅几百条。本方案通过 **"降维打击"** 实现了质的飞跃:
**2. 核心技术优化揭秘**
* **消除 SSL 握手开销**:
* 在 JDBC 连接中强制指定 `useSSL=false`。
* **效果**: 既消除了日志中的安全警告,也减少了每次连接建立时的加密握手时间,在高频同步场景下提升显著。
* **Reader 侧吞吐量暴增**:
* **高并发**: 将 `channel` 提升至 **8**,并行抽取能力翻倍。
* **大缓存**: 将 `fetchSize` 提升至 **10,000**。这是解决 `WaitReaderTime` 的核心,极大减少了 DataX 与源数据库之间的交互次数。
* **CSV Load 模式 (全量核心)**:
* **原理**: 放弃传统的 `INSERT` 语句,先由 DataX 将数据以极速流式写入本地 CSV 文件
* **导入**: 调用 MySQL 引擎最底层的 `LOAD DATA LOCAL INFILE` 命令。这是 MySQL 官方最快的导入方式,绕过了 SQL 解析和预处理,直接进行二进制级的数据灌入
* **效果**: 性能比 JDBC 模式提升了 **30 倍以上**。
* **Reader 侧多线程并发**:
* 配置 `channel=4` 并行读取,配合 `fetchSize=4096` 批量抓取,彻底消除了源端读取等待。
* **增量高频 Upsert**:
* 利用 Unique Key 模型,每 5 秒进行一次 1000 条的精准“覆盖更新”,在保证实时性的同时,对数据库几乎零负载。
* 通过 Python 脚本自动将 DataX 转换为 `txtfilewriter` 生成 CSV。
* 用 MySQL 官方最快的 `LOAD DATA LOCAL INFILE` 导入,比传统 JDBC `INSERT` 快 **30倍** 以上
* **JDBC Batch 优化 (增量/JDBC 模式)**:
* 启用了 `rewriteBatchedStatements=true`,将多条 `REPLACE` 语句合并为单条执行,大幅降低网络往返。
**3. 优化参数清单**
* **Reader**: `fetchSize=4096`, `querySql` 动态生成
* **Writer**: `batchSize=2048` (JDBC), `LOAD DATA` (Full).
* **JVM**: `-Xms2g -Xmx2g -XX:+UseG1GC -XX:MaxMetaspaceSize=256m`.
**3. 最终优化参数清单**
* **Reader**: `channel=8`, `fetchSize=10000`, `useSSL=false`
* **Writer**: `batchSize=2048`, `rewriteBatchedStatements=true`, `LOAD DATA` (全量)。
* **JVM**: `-server -Xms2g -Xmx2g -XX:+UseG1GC -XX:MaxMetaspaceSize=256m`