diff --git a/DataX/run_jobs_full.sh b/DataX/run_jobs_full.sh index c5b11b5..09fd1db 100644 --- a/DataX/run_jobs_full.sh +++ b/DataX/run_jobs_full.sh @@ -4,13 +4,23 @@ export PATH=$PATH:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin [ -f ~/.bash_profile ] && . ~/.bash_profile [ -f ~/.profile ] && . ~/.profile -TARGET=${1:-doris} # 默认为 doris +TARGET=$1 + +if [ -z "$TARGET" ]; then + echo "用法: $0 " + echo "请指定同步目标:" + echo " doris - 全量同步到 Doris (默认端口 9030)" + echo " mysql - 全量同步到 MySQL (CSV Load 模式,高性能)" + # echo " mysql_jdbc - 全量同步到 MySQL (JDBC Batch 模式,速度较慢)" + exit 1 +fi DATAX_HOME="/usr/local/datax" SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" CONF_DIR="$SCRIPT_DIR/json" # 更新为统一目录 BIN_DIR="$DATAX_HOME/bin" DATAX_PY="$BIN_DIR/datax.py" +TOOL_DIR="$SCRIPT_DIR/tool" JOBS=( "t_equipment_charge_order.json" @@ -38,9 +48,14 @@ if [ "$TARGET" == "doris" ]; then DEST_PARAMS="-Ddest_user=root -Ddest_pwd=DsideaL147258369 -Ddest_load_url=10.10.14.204:8030 -Ddest_jdbc=jdbc:mysql://10.10.14.204:9030/yltcharge" PARAMS="$SRC_PARAMS $DEST_PARAMS" elif [ "$TARGET" == "mysql" ]; then - echo "模式: MySQL 全量同步 (10.10.14.210:22066)" - DEST_PARAMS="-Ddest_mysql_user=ylt -Ddest_mysql_pwd=Ycharge666 -Ddest_mysql_jdbc=jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&rewriteBatchedStatements=true&autoReconnect=true&failOverReadOnly=false" + echo "模式: MySQL 全量同步 (CSV Load)" + # 注意: CSV Load 模式下 DataX 负责写文件,Python 负责 Load。JDBC 参数用于 Python 连接。 + DEST_PARAMS="-Ddest_mysql_user=ylt -Ddest_mysql_pwd=Ycharge666 -Ddest_mysql_jdbc=jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowLoadLocalInfile=true" PARAMS="$SRC_PARAMS $DEST_PARAMS" +# elif [ "$TARGET" == "mysql_jdbc" ]; then +# echo "模式: MySQL 全量同步 (JDBC Batch)" +# DEST_PARAMS="-Ddest_mysql_user=ylt -Ddest_mysql_pwd=Ycharge666 
-Ddest_mysql_jdbc=jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&rewriteBatchedStatements=true&autoReconnect=true&failOverReadOnly=false" +# PARAMS="$SRC_PARAMS $DEST_PARAMS" else echo "错误: 未知目标 '$TARGET'。请使用 'doris' 或 'mysql'。" exit 1 @@ -64,6 +79,11 @@ FAIL_COUNT=0 TOTAL=${#JOBS[@]} CURRENT=0 +# 清理旧的 CSV 临时目录 +if [ "$TARGET" == "mysql" ]; then + rm -rf "$SCRIPT_DIR/datax_tmp_csv" +fi + for JOB in "${JOBS[@]}"; do CURRENT=$((CURRENT + 1)) echo "任务 [$CURRENT/$TOTAL] $JOB" @@ -77,24 +97,56 @@ for JOB in "${JOBS[@]}"; do START_TIME=$(date +%s) JOB_FILE="$JOB" + + # 变量用于存储 mysql_load 模式下的元数据 + TABLE_NAME="" + COLUMNS_JSON="" + CSV_DIR="" - # 如果是 MySQL,需要生成临时配置文件修改 Writer + # --------------------------------------------------------- + # 配置转换逻辑 + # --------------------------------------------------------- + + TMP_FILE="/tmp/datax_optimized_${TARGET}_${JOB}" + + # 传递目标类型和 CSV 输出目录给 Python + export DATAX_TARGET="$TARGET" if [ "$TARGET" == "mysql" ]; then - TMP_FILE="/tmp/datax_mysql_full_${JOB}" - /usr/bin/python - <<'PY' "$JOB" "$TMP_FILE" -import json, sys, re + CSV_DIR="$SCRIPT_DIR/datax_tmp_csv/${JOB%.json}" + export DATAX_CSV_DIR="$CSV_DIR" + fi + + # 执行 Python 转换脚本并捕获输出 + TRANSFORM_OUTPUT=$(/usr/bin/python - <<'PY' "$JOB" "$TMP_FILE" +import json, sys, re, os + src_path = sys.argv[1] dst_path = sys.argv[2] +target = os.environ.get('DATAX_TARGET', 'doris') +csv_dir = os.environ.get('DATAX_CSV_DIR', '') with open(src_path, 'r', encoding='utf-8') as f: data = json.load(f) +# 1. 基础性能优化 (设置并发和内存限制) +if 'setting' not in data['job']: + data['job']['setting'] = {} +data['job']['setting']['speed'] = { + "channel": 4, # 提高并发到 4,加速 Reader 读取 +} +data['job']['setting']['errorLimit'] = { + "record": 0 +} + unit = data['job']['content'][0] reader = unit.get('reader', {}) writer = unit.get('writer', {}) rp = reader.get('parameter', {}) wp = writer.get('parameter', {}) +# 2. 
Reader 优化 (FetchSize) +rp['fetchSize'] = 4096 # 进一步增加读取缓存,减少网络往返 + # 获取表名 table_name = None try: @@ -121,41 +173,84 @@ columns = wp.get('column') or rp.get('column') if not columns: raise RuntimeError("无法确定列列表: %s" % src_path) -new_writer = { - "name": "mysqlwriter", - "parameter": { - "connection": [ - { - "jdbcUrl": "${dest_mysql_jdbc}", - "table": [table_name] - } - ], - "username": "${dest_mysql_user}", - "password": "${dest_mysql_pwd}", - "column": columns, - "writeMode": "insert", - "batchSize": 2000, - "preSql": [f"TRUNCATE TABLE `{table_name}`"], - "postSql": [] +# 3. 根据目标转换 Writer +if target == 'mysql': + if not os.path.exists(csv_dir): + os.makedirs(csv_dir) + + new_writer = { + "name": "txtfilewriter", + "parameter": { + "path": csv_dir, + "fileName": table_name, + "writeMode": "truncate", + "fileFormat": "csv", + "separator": ",", + "quoteChar": "\"", + "escapeChar": "\\", + "nullFormat": "\\N", + "header": [], + "column": columns + } } -} + unit['writer'] = new_writer + + # 输出元数据供 Shell 使用 + print(table_name) + print(json.dumps(columns)) -unit['writer'] = new_writer +# elif target == 'mysql_jdbc': # 备用慢速模式 +# new_writer = { +# "name": "mysqlwriter", +# "parameter": { +# "connection": [ +# { +# "jdbcUrl": "${dest_mysql_jdbc}", +# "table": [table_name] +# } +# ], +# "username": "${dest_mysql_user}", +# "password": "${dest_mysql_pwd}", +# "column": columns, +# "writeMode": "insert", +# "batchSize": 2048, # 增加 Batch 大小 +# "preSql": [f"TRUNCATE TABLE `{table_name}`"], +# "postSql": [] +# } +# } +# unit['writer'] = new_writer with open(dst_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) PY + ) + if [ $? 
-ne 0 ]; then - echo "✗ 错误: 生成 MySQL Writer 配置失败 - $JOB" + echo "✗ 错误: 生成配置文件失败 - $JOB" + echo "$TRANSFORM_OUTPUT" FAIL_COUNT=$((FAIL_COUNT + 1)) continue fi + JOB_FILE="$TMP_FILE" + + if [ "$TARGET" == "mysql" ]; then + TABLE_NAME=$(echo "$TRANSFORM_OUTPUT" | sed -n '1p') + COLUMNS_JSON=$(echo "$TRANSFORM_OUTPUT" | sed -n '2p') + # 去除可能的回车符 + TABLE_NAME=$(echo "$TABLE_NAME" | tr -d '\r') + COLUMNS_JSON=$(echo "$COLUMNS_JSON" | tr -d '\r') + fi fi + # --------------------------------------------------------- + # 执行 DataX + # --------------------------------------------------------- + # 构造 Java 命令 CLASS_PATH="$DATAX_HOME/lib/*:$DATAX_HOME/conf:." - JVM_OPTS="-server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError" + # 优化 JVM 参数: 增大内存,使用 G1 回收器提高吞吐,增加 Metaspace 空间 + JVM_OPTS="-server -Xms2g -Xmx2g -XX:MaxMetaspaceSize=256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+HeapDumpOnOutOfMemoryError" java $JVM_OPTS -classpath "$CLASS_PATH" \ -Dfile.encoding=UTF-8 \ @@ -171,6 +266,40 @@ PY EXIT_CODE=$? + # --------------------------------------------------------- + # 后置处理: mysql 模式下执行导入 + # --------------------------------------------------------- + + if [ $EXIT_CODE -eq 0 ] && [ "$TARGET" == "mysql" ]; then + echo "DataX 导出完成,开始执行 MySQL Load Data..." + + # 提取连接参数 + # 注意: 这里简化处理,直接写死或从 PARAMS 中解析有点麻烦 + # 我们直接复用脚本顶部的变量,但要注意这些变量包含 -D 前缀 + # 所以最好直接传递硬编码的值或者重新定义变量 + + # 解析参数 (简单暴力去除 -Ddest_mysql_... 前缀) + # 这里为了稳健,我们直接使用 Python 脚本解析 JDBC URL + + # 定义 DB 连接信息 (与 DEST_PARAMS 保持一致) + DB_USER="ylt" + DB_PWD="Ycharge666" + DB_JDBC="jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowLoadLocalInfile=true" + + python "$TOOL_DIR/LoadCsvToMysql.py" \ + "$DB_JDBC" \ + "$DB_USER" \ + "$DB_PWD" \ + "$TABLE_NAME" \ + "$CSV_DIR" \ + "$COLUMNS_JSON" + + EXIT_CODE=$? 
+ + # 清理临时 CSV + rm -rf "$CSV_DIR" + fi + END_TIME=$(date +%s) DURATION=$((END_TIME - START_TIME)) diff --git a/DataX/run_jobs_inc.sh b/DataX/run_jobs_inc.sh index bb6834f..f4e1bef 100644 --- a/DataX/run_jobs_inc.sh +++ b/DataX/run_jobs_inc.sh @@ -4,7 +4,15 @@ export PATH=$PATH:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin [ -f ~/.bash_profile ] && . ~/.bash_profile [ -f ~/.profile ] && . ~/.profile -TARGET=${1:-doris} # 默认为 doris +TARGET=$1 + +if [ -z "$TARGET" ]; then + echo "用法: $0 " + echo "请指定同步目标:" + echo " doris - 增量同步到 Doris" + echo " mysql - 增量同步到 MySQL" + exit 1 +fi DATAX_HOME="/usr/local/datax" SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" @@ -94,12 +102,22 @@ target = os.environ.get('DATAX_TARGET', 'doris') with open(src_path, 'r', encoding='utf-8') as f: data = json.load(f) +# 0. 基础性能优化 (设置并发和内存限制) +if 'setting' not in data['job']: + data['job']['setting'] = {} +data['job']['setting']['speed'] = { + "channel": 4, # 提高并发到 4 +} + unit = data['job']['content'][0] reader = unit.get('reader', {}) writer = unit.get('writer', {}) rp = reader.get('parameter', {}) wp = writer.get('parameter', {}) +# 0.1 Reader 优化 (FetchSize) +rp['fetchSize'] = 4096 # 进一步增加读取缓存,减少网络往返 + # --------------------------------------------------------- # 1. Reader 转换: 从 table 模式转换为 querySql 模式 (增量逻辑) # --------------------------------------------------------- @@ -131,15 +149,15 @@ if not table_name: if not table_name: raise RuntimeError("无法确定表名: %s" % src_path) -# 构建增量查询 SQL: SELECT col1, col2... FROM table ORDER BY id DESC LIMIT 10000 -# 注意: 这里假设所有表都有 id 列。如果无 id 列,可能需要特殊处理。 -# 安全起见,检查 columns 中是否包含 'id' -has_id = 'id' in columns - -if has_id: - # 构建列字符串,处理反引号 - col_str = ", ".join(["`{}`".format(c) for c in columns]) - inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 10000".format(col_str, table_name) +# 构建增量查询 SQL: SELECT col1, col2... 
FROM table ORDER BY id DESC LIMIT 1000 + # 注意: 这里假设所有表都有 id 列。如果无 id 列,可能需要特殊处理。 + # 安全起见,检查 columns 中是否包含 'id' + has_id = 'id' in columns + + if has_id: + # 构建列字符串,处理反引号 + col_str = ", ".join(["`{}`".format(c) for c in columns]) + inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 1000".format(col_str, table_name) # 更新 Reader 配置 new_reader_conn = [ @@ -173,7 +191,7 @@ if target == 'mysql': "password": "${dest_mysql_pwd}", "column": columns, "writeMode": "replace", # 增量同步使用 replace 模式 - "batchSize": 2000, + "batchSize": 2048, "preSql": [], "postSql": [] } @@ -193,7 +211,8 @@ PY # 构造 Java 命令 CLASS_PATH="$DATAX_HOME/lib/*:$DATAX_HOME/conf:." - JVM_OPTS="-server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError" + # 优化 JVM 参数: 增大内存,使用 G1 回收器提高吞吐,增加 Metaspace 空间 + JVM_OPTS="-server -Xms2g -Xmx2g -XX:MaxMetaspaceSize=256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+HeapDumpOnOutOfMemoryError" java $JVM_OPTS -classpath "$CLASS_PATH" \ -Dfile.encoding=UTF-8 \ diff --git a/DataX/start_cron_inc.sh b/DataX/start_cron_inc.sh index 59ca3c4..2c99363 100644 --- a/DataX/start_cron_inc.sh +++ b/DataX/start_cron_inc.sh @@ -1,5 +1,13 @@ #!/bin/bash -TARGET=${1:-doris} # 默认为 doris +TARGET=$1 + +if [ -z "$TARGET" ]; then + echo "用法: $0 " + echo "请指定同步目标:" + echo " doris - 启动 Doris 增量同步定时任务" + echo " mysql - 启动 MySQL 增量同步定时任务" + exit 1 +fi if [ "$TARGET" != "doris" ] && [ "$TARGET" != "mysql" ]; then echo "错误: 请指定目标 (doris 或 mysql)" @@ -29,7 +37,8 @@ fi chmod +x "$SCRIPT_PATH" # 构造 cron 命令,注意转义和参数传递 -CRON_CMD="*/10 * * * * /bin/bash $SCRIPT_PATH $TARGET >> $LOG_FILE 2>&1" +# 修改为每 5 秒运行一次 (通过循环实现,因为 crontab 最小粒度是分钟) +CRON_CMD="* * * * * for i in {1..12}; do /bin/bash $SCRIPT_PATH $TARGET >> $LOG_FILE 2>&1; sleep 5; done" # 查找已存在的任务 (匹配脚本路径和目标参数) EXISTING_JOB=$(crontab -l 2>/dev/null | grep "$SCRIPT_PATH $TARGET") diff --git a/DataX/stop_cron_inc.sh b/DataX/stop_cron_inc.sh index eec61d9..b32fc76 100644 --- a/DataX/stop_cron_inc.sh +++ b/DataX/stop_cron_inc.sh @@ -1,5 +1,13 @@ #!/bin/bash 
"""Bulk-load DataX CSV exports into a MySQL table with LOAD DATA LOCAL INFILE.

The companion shell script exports each table with DataX's ``txtfilewriter``
(CSV, ``,`` separator, ``"`` quote char, ``\\`` escape char, ``\\N`` for NULL)
and then calls this script to truncate the target table and load the files.

Usage:
    python LoadCsvToMysql.py <jdbc_url> <user> <password> <table> <csv_dir> [columns_json]
"""
import sys
import os
import glob
import json
import time
from urllib.parse import urlparse, parse_qs

JDBC_PREFIX = "jdbc:mysql://"
DEFAULT_MYSQL_PORT = 3306
USAGE = (
    "Usage: python LoadCsvToMysql.py "
    "<jdbc_url> <user> <password> <table> <csv_dir> [columns_json]"
)

# Best-effort session flags that speed up a bulk load.  Each one is applied
# independently so that a flag the account is not allowed to change does not
# prevent the remaining ones from being set.
_SESSION_TUNING = (
    "SET FOREIGN_KEY_CHECKS = 0",
    "SET UNIQUE_CHECKS = 0",
    "SET SQL_LOG_BIN = 0",
)


def _split_host_port(address):
    """Split ``host[:port]`` (or ``[ipv6][:port]``) into ``(host, port)``.

    Raises:
        ValueError: if the port is not an integer or the address is malformed.
    """
    if address.startswith("["):
        # Bracketed IPv6 literal, e.g. "[::1]:3306".
        host, closing, rest = address[1:].partition("]")
        if not closing or (rest and not rest.startswith(":")):
            raise ValueError(f"Malformed IPv6 address in JDBC URL: {address!r}")
        has_port = bool(rest)
        port_text = rest[1:]
    else:
        host, separator, port_text = address.partition(":")
        has_port = bool(separator)

    if not has_port:
        return host, DEFAULT_MYSQL_PORT
    try:
        return host, int(port_text)
    except ValueError:
        raise ValueError(f"Invalid port in JDBC URL: {port_text!r}") from None


def parse_jdbc_url(url):
    """Split a MySQL JDBC URL into ``(host, port, database)``.

    Accepts ``jdbc:mysql://host[:port][/db][?params]``; the ``jdbc:mysql://``
    prefix is optional.  The port defaults to 3306 and the database to ``""``
    when absent.  Query parameters are ignored.

    Raises:
        ValueError: if the port is not an integer or the address is malformed.
    """
    if url.startswith(JDBC_PREFIX):
        url = url[len(JDBC_PREFIX):]

    # Drop the query string first so that a URL without a database path
    # ("host:3306?useSSL=false") does not leak parameters into the port.
    location = url.split("?", 1)[0]
    address, _, db = location.partition("/")
    host, port = _split_host_port(address)
    return host, port, db


def _quote_identifier(name):
    """Return *name* as a backtick-quoted MySQL identifier."""
    return "`" + str(name).replace("`", "``") + "`"


def _build_load_sql(table, columns=None):
    """Build the LOAD DATA statement with a ``%s`` placeholder for the file.

    The field/line options mirror the txtfilewriter settings used for the
    export.  Literal ``%`` characters in identifiers are doubled because the
    statement is executed with driver-side parameter substitution.
    """
    clauses = [
        f"INTO TABLE {_quote_identifier(table)}",
        "CHARACTER SET utf8mb4",
        "FIELDS TERMINATED BY ','",
        "OPTIONALLY ENCLOSED BY '\"'",
        "ESCAPED BY '\\\\'",
        "LINES TERMINATED BY '\\n'",
    ]
    if columns:
        clauses.append("(" + ", ".join(_quote_identifier(c) for c in columns) + ")")
    options = "\n".join(clauses).replace("%", "%%")
    return "LOAD DATA LOCAL INFILE %s\n" + options


def _list_csv_files(csv_dir):
    """Return the regular, non-hidden files directly inside *csv_dir*, sorted."""
    # glob.escape keeps characters such as "[" in the directory name literal.
    pattern = os.path.join(glob.escape(csv_dir), "*")
    return sorted(path for path in glob.glob(pattern) if os.path.isfile(path))


def load_csv(jdbc_url, user, password, table, csv_dir, columns=None):
    """Truncate *table* and load every file found in *csv_dir* into it.

    Args:
        jdbc_url: MySQL JDBC URL; only host, port and database are used.
        user: MySQL user name.
        password: MySQL password.
        table: Target table; it is truncated before loading.
        csv_dir: Directory containing the exported CSV files.
        columns: Optional list of column names matching the CSV field order.
            When omitted, the table's own column order is used by MySQL.

    The process exits with status 1 when the directory is missing, the
    connection fails, or any statement fails.  An existing but empty
    directory still truncates the table and returns normally.
    """
    host, port, db = parse_jdbc_url(jdbc_url)

    # Never touch the table when the export directory itself is missing:
    # that indicates a failed export, not an empty source table.
    if not os.path.isdir(csv_dir):
        print(f"CSV directory not found: {csv_dir}")
        sys.exit(1)

    # Imported here so URL parsing and argument validation work on hosts
    # where the driver is not installed.
    import pymysql

    print(f"Connecting to MySQL {host}:{port}/{db} as {user}...")

    try:
        conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=db,
            local_infile=True,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
    except Exception as e:
        print(f"Connection failed: {e}")
        sys.exit(1)

    try:
        with conn.cursor() as cursor:
            print("Setting session parameters for speed...")
            cursor.execute("SET NAMES utf8mb4")
            for statement in _SESSION_TUNING:
                try:
                    cursor.execute(statement)
                except pymysql.MySQLError as e:
                    print(f"Warning: Could not set some optimization flags: {e}")

            print(f"Truncating table {table}...")
            cursor.execute(f"TRUNCATE TABLE {_quote_identifier(table)}")

            files = _list_csv_files(csv_dir)
            if not files:
                print(f"No files found in {csv_dir}")
                return

            sql = _build_load_sql(table, columns)
            total_rows = 0
            start_time = time.time()

            for file_path in files:
                # MySQL expects forward slashes, also on Windows clients.
                file_path = os.path.abspath(file_path).replace('\\', '/')
                print(f"Loading file: {file_path}")

                # The path is passed as a parameter so quotes in it are escaped.
                cursor.execute(sql, (file_path,))
                rows = cursor.rowcount
                total_rows += rows
                print(f" -> Loaded {rows} rows")

            conn.commit()

            duration = time.time() - start_time
            print(f"Total loaded: {total_rows} rows in {duration:.2f}s ({total_rows/duration if duration > 0 else 0:.2f} rows/s)")

    except Exception as e:
        # Top-level boundary of the script: report and signal failure to the
        # calling shell script through the exit status.
        print(f"Error during load: {e}")
        sys.exit(1)
    finally:
        conn.close()


def main(argv=None):
    """Command-line entry point; *argv* defaults to ``sys.argv[1:]``."""
    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) < 5:
        print(USAGE)
        sys.exit(1)

    jdbc_url, user, password, table, csv_dir = args[:5]

    columns = None
    if len(args) > 5:
        try:
            columns = json.loads(args[5])
        except ValueError:
            # Best effort: fall back to the table's own column order.
            print("Warning: Could not parse columns JSON")

    load_csv(jdbc_url, user, password, table, csv_dir, columns)


if __name__ == "__main__":
    main()