This commit is contained in:
HuangHai
2026-02-06 07:42:45 +08:00
parent 3a6c19e850
commit e08b7af675
5 changed files with 350 additions and 42 deletions

View File

@@ -4,13 +4,23 @@ export PATH=$PATH:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin
[ -f ~/.bash_profile ] && . ~/.bash_profile
[ -f ~/.profile ] && . ~/.profile
TARGET=${1:-doris} # 默认为 doris
TARGET=$1
if [ -z "$TARGET" ]; then
echo "用法: $0 <target>"
echo "请指定同步目标:"
echo " doris - 全量同步到 Doris (默认端口 9030)"
echo " mysql - 全量同步到 MySQL (CSV Load 模式,高性能)"
# echo " mysql_jdbc - 全量同步到 MySQL (JDBC Batch 模式,速度较慢)"
exit 1
fi
DATAX_HOME="/usr/local/datax"
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
CONF_DIR="$SCRIPT_DIR/json" # 更新为统一目录
BIN_DIR="$DATAX_HOME/bin"
DATAX_PY="$BIN_DIR/datax.py"
TOOL_DIR="$SCRIPT_DIR/tool"
JOBS=(
"t_equipment_charge_order.json"
@@ -38,9 +48,14 @@ if [ "$TARGET" == "doris" ]; then
DEST_PARAMS="-Ddest_user=root -Ddest_pwd=DsideaL147258369 -Ddest_load_url=10.10.14.204:8030 -Ddest_jdbc=jdbc:mysql://10.10.14.204:9030/yltcharge"
PARAMS="$SRC_PARAMS $DEST_PARAMS"
elif [ "$TARGET" == "mysql" ]; then
echo "模式: MySQL 全量同步 (10.10.14.210:22066)"
DEST_PARAMS="-Ddest_mysql_user=ylt -Ddest_mysql_pwd=Ycharge666 -Ddest_mysql_jdbc=jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&rewriteBatchedStatements=true&autoReconnect=true&failOverReadOnly=false"
echo "模式: MySQL 全量同步 (CSV Load)"
# 注意: CSV Load 模式下 DataX 负责写文件Python 负责 Load。JDBC 参数用于 Python 连接。
DEST_PARAMS="-Ddest_mysql_user=ylt -Ddest_mysql_pwd=Ycharge666 -Ddest_mysql_jdbc=jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowLoadLocalInfile=true"
PARAMS="$SRC_PARAMS $DEST_PARAMS"
# elif [ "$TARGET" == "mysql_jdbc" ]; then
# echo "模式: MySQL 全量同步 (JDBC Batch)"
# DEST_PARAMS="-Ddest_mysql_user=ylt -Ddest_mysql_pwd=Ycharge666 -Ddest_mysql_jdbc=jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&rewriteBatchedStatements=true&autoReconnect=true&failOverReadOnly=false"
# PARAMS="$SRC_PARAMS $DEST_PARAMS"
else
echo "错误: 未知目标 '$TARGET'。请使用 'doris' 或 'mysql'。"
exit 1
@@ -64,6 +79,11 @@ FAIL_COUNT=0
TOTAL=${#JOBS[@]}
CURRENT=0
# 清理旧的 CSV 临时目录
if [ "$TARGET" == "mysql" ]; then
rm -rf "$SCRIPT_DIR/datax_tmp_csv"
fi
for JOB in "${JOBS[@]}"; do
CURRENT=$((CURRENT + 1))
echo "任务 [$CURRENT/$TOTAL] $JOB"
@@ -77,24 +97,56 @@ for JOB in "${JOBS[@]}"; do
START_TIME=$(date +%s)
JOB_FILE="$JOB"
# 变量用于存储 mysql_load 模式下的元数据
TABLE_NAME=""
COLUMNS_JSON=""
CSV_DIR=""
# 如果是 MySQL需要生成临时配置文件修改 Writer
# ---------------------------------------------------------
# 配置转换逻辑
# ---------------------------------------------------------
TMP_FILE="/tmp/datax_optimized_${TARGET}_${JOB}"
# 传递目标类型和 CSV 输出目录给 Python
export DATAX_TARGET="$TARGET"
if [ "$TARGET" == "mysql" ]; then
TMP_FILE="/tmp/datax_mysql_full_${JOB}"
/usr/bin/python - <<'PY' "$JOB" "$TMP_FILE"
import json, sys, re
CSV_DIR="$SCRIPT_DIR/datax_tmp_csv/${JOB%.json}"
export DATAX_CSV_DIR="$CSV_DIR"
fi
# 执行 Python 转换脚本并捕获输出
TRANSFORM_OUTPUT=$(/usr/bin/python - <<'PY' "$JOB" "$TMP_FILE"
import json, sys, re, os
src_path = sys.argv[1]
dst_path = sys.argv[2]
target = os.environ.get('DATAX_TARGET', 'doris')
csv_dir = os.environ.get('DATAX_CSV_DIR', '')
with open(src_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 1. 基础性能优化 (设置并发和内存限制)
if 'setting' not in data['job']:
data['job']['setting'] = {}
data['job']['setting']['speed'] = {
"channel": 4, # 提高并发到 4加速 Reader 读取
}
data['job']['setting']['errorLimit'] = {
"record": 0
}
unit = data['job']['content'][0]
reader = unit.get('reader', {})
writer = unit.get('writer', {})
rp = reader.get('parameter', {})
wp = writer.get('parameter', {})
# 2. Reader 优化 (FetchSize)
rp['fetchSize'] = 4096 # 进一步增加读取缓存,减少网络往返
# 获取表名
table_name = None
try:
@@ -121,41 +173,84 @@ columns = wp.get('column') or rp.get('column')
if not columns:
raise RuntimeError("无法确定列列表: %s" % src_path)
new_writer = {
"name": "mysqlwriter",
"parameter": {
"connection": [
{
"jdbcUrl": "${dest_mysql_jdbc}",
"table": [table_name]
}
],
"username": "${dest_mysql_user}",
"password": "${dest_mysql_pwd}",
"column": columns,
"writeMode": "insert",
"batchSize": 2000,
"preSql": [f"TRUNCATE TABLE `{table_name}`"],
"postSql": []
# 3. 根据目标转换 Writer
if target == 'mysql':
if not os.path.exists(csv_dir):
os.makedirs(csv_dir)
new_writer = {
"name": "txtfilewriter",
"parameter": {
"path": csv_dir,
"fileName": table_name,
"writeMode": "truncate",
"fileFormat": "csv",
"separator": ",",
"quoteChar": "\"",
"escapeChar": "\\",
"nullFormat": "\\N",
"header": [],
"column": columns
}
}
}
unit['writer'] = new_writer
# 输出元数据供 Shell 使用
print(table_name)
print(json.dumps(columns))
unit['writer'] = new_writer
# elif target == 'mysql_jdbc': # 备用慢速模式
# new_writer = {
# "name": "mysqlwriter",
# "parameter": {
# "connection": [
# {
# "jdbcUrl": "${dest_mysql_jdbc}",
# "table": [table_name]
# }
# ],
# "username": "${dest_mysql_user}",
# "password": "${dest_mysql_pwd}",
# "column": columns,
# "writeMode": "insert",
# "batchSize": 2048, # 增加 Batch 大小
# "preSql": [f"TRUNCATE TABLE `{table_name}`"],
# "postSql": []
# }
# }
# unit['writer'] = new_writer
with open(dst_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
PY
)
if [ $? -ne 0 ]; then
echo "✗ 错误: 生成 MySQL Writer 配置失败 - $JOB"
echo "✗ 错误: 生成配置文件失败 - $JOB"
echo "$TRANSFORM_OUTPUT"
FAIL_COUNT=$((FAIL_COUNT + 1))
continue
fi
JOB_FILE="$TMP_FILE"
if [ "$TARGET" == "mysql" ]; then
TABLE_NAME=$(echo "$TRANSFORM_OUTPUT" | sed -n '1p')
COLUMNS_JSON=$(echo "$TRANSFORM_OUTPUT" | sed -n '2p')
# 去除可能的回车符
TABLE_NAME=$(echo "$TABLE_NAME" | tr -d '\r')
COLUMNS_JSON=$(echo "$COLUMNS_JSON" | tr -d '\r')
fi
fi
# ---------------------------------------------------------
# 执行 DataX
# ---------------------------------------------------------
# 构造 Java 命令
CLASS_PATH="$DATAX_HOME/lib/*:$DATAX_HOME/conf:."
JVM_OPTS="-server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError"
# 优化 JVM 参数: 增大内存,使用 G1 回收器提高吞吐,增加 Metaspace 空间
JVM_OPTS="-server -Xms2g -Xmx2g -XX:MaxMetaspaceSize=256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+HeapDumpOnOutOfMemoryError"
java $JVM_OPTS -classpath "$CLASS_PATH" \
-Dfile.encoding=UTF-8 \
@@ -171,6 +266,40 @@ PY
EXIT_CODE=$?
# ---------------------------------------------------------
# 后置处理: mysql 模式下执行导入
# ---------------------------------------------------------
if [ $EXIT_CODE -eq 0 ] && [ "$TARGET" == "mysql" ]; then
echo "DataX 导出完成,开始执行 MySQL Load Data..."
# 提取连接参数
# 注意: 这里简化处理,直接写死或从 PARAMS 中解析有点麻烦
# 我们直接复用脚本顶部的变量,但要注意这些变量包含 -D 前缀
# 所以最好直接传递硬编码的值或者重新定义变量
# 解析参数 (简单暴力去除 -Ddest_mysql_... 前缀)
# 这里为了稳健,我们直接使用 Python 脚本解析 JDBC URL
# 定义 DB 连接信息 (与 DEST_PARAMS 保持一致)
DB_USER="ylt"
DB_PWD="Ycharge666"
DB_JDBC="jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowLoadLocalInfile=true"
python "$TOOL_DIR/LoadCsvToMysql.py" \
"$DB_JDBC" \
"$DB_USER" \
"$DB_PWD" \
"$TABLE_NAME" \
"$CSV_DIR" \
"$COLUMNS_JSON"
EXIT_CODE=$?
# 清理临时 CSV
rm -rf "$CSV_DIR"
fi
END_TIME=$(date +%s)
DURATION=$((END_TIME - START_TIME))

View File

@@ -4,7 +4,15 @@ export PATH=$PATH:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin
[ -f ~/.bash_profile ] && . ~/.bash_profile
[ -f ~/.profile ] && . ~/.profile
TARGET=${1:-doris} # 默认为 doris
TARGET=$1
if [ -z "$TARGET" ]; then
echo "用法: $0 <target>"
echo "请指定同步目标:"
echo " doris - 增量同步到 Doris"
echo " mysql - 增量同步到 MySQL"
exit 1
fi
DATAX_HOME="/usr/local/datax"
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
@@ -94,12 +102,22 @@ target = os.environ.get('DATAX_TARGET', 'doris')
with open(src_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 0. 基础性能优化 (设置并发和内存限制)
if 'setting' not in data['job']:
data['job']['setting'] = {}
data['job']['setting']['speed'] = {
"channel": 4, # 提高并发到 4
}
unit = data['job']['content'][0]
reader = unit.get('reader', {})
writer = unit.get('writer', {})
rp = reader.get('parameter', {})
wp = writer.get('parameter', {})
# 0.1 Reader 优化 (FetchSize)
rp['fetchSize'] = 4096 # 进一步增加读取缓存,减少网络往返
# ---------------------------------------------------------
# 1. Reader 转换: 从 table 模式转换为 querySql 模式 (增量逻辑)
# ---------------------------------------------------------
@@ -131,15 +149,15 @@ if not table_name:
if not table_name:
raise RuntimeError("无法确定表名: %s" % src_path)
# 构建增量查询 SQL: SELECT col1, col2... FROM table ORDER BY id DESC LIMIT 10000
# 注意: 这里假设所有表都有 id 列。如果无 id 列,可能需要特殊处理。
# 安全起见,检查 columns 中是否包含 'id'
has_id = 'id' in columns
if has_id:
# 构建列字符串,处理反引号
col_str = ", ".join(["`{}`".format(c) for c in columns])
inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 10000".format(col_str, table_name)
# 构建增量查询 SQL: SELECT col1, col2... FROM table ORDER BY id DESC LIMIT 1000
# 注意: 这里假设所有表都有 id 列。如果无 id 列,可能需要特殊处理。
# 安全起见,检查 columns 中是否包含 'id'
has_id = 'id' in columns
if has_id:
# 构建列字符串,处理反引号
col_str = ", ".join(["`{}`".format(c) for c in columns])
inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 1000".format(col_str, table_name)
# 更新 Reader 配置
new_reader_conn = [
@@ -173,7 +191,7 @@ if target == 'mysql':
"password": "${dest_mysql_pwd}",
"column": columns,
"writeMode": "replace", # 增量同步使用 replace 模式
"batchSize": 2000,
"batchSize": 2048,
"preSql": [],
"postSql": []
}
@@ -193,7 +211,8 @@ PY
# 构造 Java 命令
CLASS_PATH="$DATAX_HOME/lib/*:$DATAX_HOME/conf:."
JVM_OPTS="-server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError"
# 优化 JVM 参数: 增大内存,使用 G1 回收器提高吞吐,增加 Metaspace 空间
JVM_OPTS="-server -Xms2g -Xmx2g -XX:MaxMetaspaceSize=256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+HeapDumpOnOutOfMemoryError"
java $JVM_OPTS -classpath "$CLASS_PATH" \
-Dfile.encoding=UTF-8 \

View File

@@ -1,5 +1,13 @@
#!/bin/bash
TARGET=${1:-doris} # 默认为 doris
TARGET=$1
if [ -z "$TARGET" ]; then
echo "用法: $0 <target>"
echo "请指定同步目标:"
echo " doris - 启动 Doris 增量同步定时任务"
echo " mysql - 启动 MySQL 增量同步定时任务"
exit 1
fi
if [ "$TARGET" != "doris" ] && [ "$TARGET" != "mysql" ]; then
echo "错误: 请指定目标 (doris 或 mysql)"
@@ -29,7 +37,8 @@ fi
chmod +x "$SCRIPT_PATH"
# 构造 cron 命令,注意转义和参数传递
CRON_CMD="*/10 * * * * /bin/bash $SCRIPT_PATH $TARGET >> $LOG_FILE 2>&1"
# 修改为每 5 秒运行一次 (通过循环实现,因为 crontab 最小粒度是分钟)
CRON_CMD="* * * * * for i in {1..12}; do /bin/bash $SCRIPT_PATH $TARGET >> $LOG_FILE 2>&1; sleep 5; done"
# 查找已存在的任务 (匹配脚本路径和目标参数)
EXISTING_JOB=$(crontab -l 2>/dev/null | grep "$SCRIPT_PATH $TARGET")

View File

@@ -1,5 +1,13 @@
#!/bin/bash
TARGET=${1:-doris} # 默认为 doris
TARGET=$1
if [ -z "$TARGET" ]; then
echo "用法: $0 <target>"
echo "请指定同步目标:"
echo " doris - 停止 Doris 增量同步定时任务"
echo " mysql - 停止 MySQL 增量同步定时任务"
exit 1
fi
if [ "$TARGET" != "doris" ] && [ "$TARGET" != "mysql" ]; then
echo "错误: 请指定目标 (doris 或 mysql)"

View File

@@ -0,0 +1,143 @@
import pymysql
import sys
import os
import glob
import json
import time
from urllib.parse import urlparse, parse_qs
def parse_jdbc_url(url):
# jdbc:mysql://host:port/db?params
# remove jdbc:mysql://
if url.startswith("jdbc:mysql://"):
url = url[13:]
# split host:port and db
if "/" in url:
address, remainder = url.split("/", 1)
if "?" in remainder:
db, params = remainder.split("?", 1)
else:
db = remainder
params = ""
else:
address = url
db = ""
params = ""
if ":" in address:
host, port = address.split(":")
port = int(port)
else:
host = address
port = 3306
return host, port, db
def load_csv(jdbc_url, user, password, table, csv_dir, columns=None):
host, port, db = parse_jdbc_url(jdbc_url)
print(f"Connecting to MySQL {host}:{port}/{db} as {user}...")
try:
conn = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
database=db,
local_infile=True,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
except Exception as e:
print(f"Connection failed: {e}")
sys.exit(1)
try:
with conn.cursor() as cursor:
# Optimization settings
print("Setting session parameters for speed...")
cursor.execute("SET NAMES utf8mb4")
try:
cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
cursor.execute("SET UNIQUE_CHECKS = 0")
cursor.execute("SET SQL_LOG_BIN = 0")
except Exception as e:
print(f"Warning: Could not set some optimization flags: {e}")
# Truncate table
print(f"Truncating table {table}...")
cursor.execute(f"TRUNCATE TABLE `{table}`")
# Find files
files = glob.glob(os.path.join(csv_dir, "*"))
if not files:
print(f"No files found in {csv_dir}")
return
total_rows = 0
start_time = time.time()
for file_path in files:
file_path = os.path.abspath(file_path).replace('\\', '/')
print(f"Loading file: {file_path}")
# Build SQL
# Assuming DataX txtfilewriter defaults:
# separator: ,
# quoteChar: "
# escapeChar: \
# nullFormat: \N
col_sql = ""
if columns:
col_list = [f"`{c}`" for c in columns]
col_sql = "(" + ", ".join(col_list) + ")"
sql = f"""
LOAD DATA LOCAL INFILE '{file_path}'
INTO TABLE `{table}`
CHARACTER SET utf8mb4
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
ESCAPED BY '\\\\'
LINES TERMINATED BY '\\n'
{col_sql}
"""
cursor.execute(sql)
rows = cursor.rowcount
total_rows += rows
print(f" -> Loaded {rows} rows")
conn.commit()
duration = time.time() - start_time
print(f"Total loaded: {total_rows} rows in {duration:.2f}s ({total_rows/duration if duration > 0 else 0:.2f} rows/s)")
except Exception as e:
print(f"Error during load: {e}")
sys.exit(1)
finally:
conn.close()
if __name__ == "__main__":
if len(sys.argv) < 6:
print("Usage: python LoadCsvToMysql.py <jdbc_url> <user> <password> <table> <csv_dir> [columns_json]")
sys.exit(1)
jdbc_url = sys.argv[1]
user = sys.argv[2]
password = sys.argv[3]
table = sys.argv[4]
csv_dir = sys.argv[5]
columns = None
if len(sys.argv) > 6:
try:
columns = json.loads(sys.argv[6])
except:
print("Warning: Could not parse columns JSON")
load_csv(jdbc_url, user, password, table, csv_dir, columns)