Files
aiData/DataX/run_jobs_inc.sh
HuangHai 1f4d29d5a1 'commit'
2026-02-06 08:21:47 +08:00

261 lines
8.3 KiB
Bash
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/bin/bash
export PATH=$PATH:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin
[ -f /etc/profile ] && . /etc/profile
[ -f ~/.bash_profile ] && . ~/.bash_profile
[ -f ~/.profile ] && . ~/.profile
# The first positional argument selects the sync destination.
# When it is missing, print the usage text and exit with status 1.
TARGET=$1
[ -n "$TARGET" ] || {
  printf '%s\n' \
    "用法: $0 <target>" \
    "请指定同步目标:" \
    " doris - 增量同步到 Doris" \
    " mysql - 增量同步到 MySQL"
  exit 1
}
# Static configuration: DataX installation paths, the directory holding the
# job templates, and the ordered list of job files to run.

# DataX installation layout.
DATAX_HOME="/usr/local/datax"
BIN_DIR="${DATAX_HOME}/bin"
DATAX_PY="${BIN_DIR}/datax.py"

# Absolute directory of this script; job templates live in its json/ subfolder
# (updated to the unified directory).
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
CONF_DIR="${SCRIPT_DIR}/json"

# One job template per table; the file name is "<table>.json".
JOB_TABLES=(
  t_equipment_charge_order
  t_equipment_charge_order_detail
  t_account_recharge
  t_account_water
  t_car
  t_company
  t_company_info_value
  t_connector
  t_equipment
  t_station
  t_ext_hurry_quit
  t_time_day
  t_user
  t_user_account
  t_user_upload_fault
)
# Append the .json suffix to every table name, preserving order.
JOBS=("${JOB_TABLES[@]/%/.json}")
unset JOB_TABLES
# Connection parameters handed to the DataX JVM as -D system properties.
#
# Every value may be overridden from the environment (DATAX_SRC_*,
# DATAX_DORIS_*, DATAX_MYSQL_*). The literals are kept only as
# backward-compatible defaults.
# NOTE(review): credentials should not live in the script; supply them via
# the environment and remove the defaults once deployments are migrated.
# NOTE: PARAMS is later split on whitespace to build the java command line,
# so none of these values may contain spaces.

# Common source-side parameters.
SRC_PARAMS="-Dsrc_user=${DATAX_SRC_USER:-ylt}"
SRC_PARAMS+=" -Dsrc_pwd=${DATAX_SRC_PWD:-Ycharge666}"
SRC_PARAMS+=" -Dsrc_jdbc=${DATAX_SRC_JDBC:-jdbc:mysql://rm-bp1ux6tuk49er80t9xo.mysql.rds.aliyuncs.com:3306/yltcharge?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai}"

# Destination-side parameters depend on the requested target.
case "$TARGET" in
  doris)
    echo "模式: Doris 增量同步"
    DEST_PARAMS="-Ddest_user=${DATAX_DORIS_USER:-root}"
    DEST_PARAMS+=" -Ddest_pwd=${DATAX_DORIS_PWD:-DsideaL147258369}"
    DEST_PARAMS+=" -Ddest_load_url=${DATAX_DORIS_LOAD_URL:-10.10.14.204:8030}"
    DEST_PARAMS+=" -Ddest_jdbc=${DATAX_DORIS_JDBC:-jdbc:mysql://10.10.14.204:9030/yltcharge?useSSL=false}"
    ;;
  mysql)
    echo "模式: MySQL 增量同步 (10.10.14.210:22066)"
    DEST_PARAMS="-Ddest_mysql_user=${DATAX_MYSQL_USER:-ylt}"
    DEST_PARAMS+=" -Ddest_mysql_pwd=${DATAX_MYSQL_PWD:-Ycharge666}"
    DEST_PARAMS+=" -Ddest_mysql_jdbc=${DATAX_MYSQL_JDBC:-jdbc:mysql://10.10.14.210:22066/yltcharge?useUnicode=true&characterEncoding=UTF-8&useSSL=false&rewriteBatchedStatements=true&autoReconnect=true&failOverReadOnly=false}"
    ;;
  *)
    # Anything other than the two supported targets is a usage error.
    echo "错误: 未知目标 '$TARGET'。请使用 'doris' 或 'mysql'。"
    exit 1
    ;;
esac
PARAMS="$SRC_PARAMS $DEST_PARAMS"
# Record the start time (human-readable and epoch seconds), print the run
# banner, switch into the job-template directory and reset the counters.
SCRIPT_START_TIME=$(date '+%Y-%m-%d %H:%M:%S')
SCRIPT_START_TIMESTAMP=$(date +%s)

printf '%s\n' \
  "=====================================" \
  "DataX 增量同步脚本 (Target: $TARGET)" \
  "=====================================" \
  "配置文件目录: $CONF_DIR" \
  "任务数量: ${#JOBS[@]}" \
  "脚本开始时间: $SCRIPT_START_TIME" \
  "====================================="

# Job file names are resolved relative to CONF_DIR from here on.
if ! cd "$CONF_DIR"; then
  echo "错误: 无法进入配置文件目录 $CONF_DIR"
  exit 1
fi

# Progress and result counters used while iterating over JOBS.
TOTAL=${#JOBS[@]}
CURRENT=0
SUCCESS_COUNT=0
FAIL_COUNT=0
# ---------------------------------------------------------------------------
# Main loop: for every entry in JOBS, derive an incremental job file from the
# template found in the current directory, run it through the DataX engine
# and keep success/failure tallies.
#
# Globals read:    JOBS, TARGET, PARAMS, DATAX_HOME, TOTAL, PYTHON_BIN (env)
# Globals written: CURRENT, SUCCESS_COUNT, FAIL_COUNT, START_TIME, END_TIME,
#                  DURATION, TMP_FILE, JOB_FILE, TRANSFORM_OUTPUT, TABLE_NAME,
#                  EXIT_CODE, CLASS_PATH, JVM_OPTS, DATAX_TARGET (exported)
# ---------------------------------------------------------------------------

# Interpreter that runs the embedded config rewriter; override with PYTHON_BIN.
# The rewriter calls open(..., encoding=...), which is Python 3 syntax, so the
# default path must resolve to a Python 3 interpreter.
PYTHON_BIN="${PYTHON_BIN:-/usr/bin/python}"

# Loop-invariant java settings. The "*" is kept literal on purpose: the JVM
# expands classpath wildcards itself.
CLASS_PATH="$DATAX_HOME/lib/*:$DATAX_HOME/conf:."
# JVM tuning: fixed 2g heap, G1 collector, capped Metaspace, heap dump on OOM.
JVM_OPTS=(
  -server -Xms2g -Xmx2g
  -XX:MaxMetaspaceSize=256m
  -XX:+UseG1GC -XX:MaxGCPauseMillis=200
  -XX:+HeapDumpOnOutOfMemoryError
)

#######################################
# Rewrite a DataX job template into an incremental job file.
# The reader is switched to querySql mode (latest 1000 rows by id) and, when
# DATAX_TARGET is "mysql", the writer is replaced by a mysqlwriter.
# Globals:   PYTHON_BIN (read), DATAX_TARGET (read by the Python code)
# Arguments: $1 - source job template, $2 - destination job file
# Outputs:   the table name as the only stdout line; warnings on stderr
# Returns:   exit status of the Python interpreter
#######################################
generate_inc_config() {
  "$PYTHON_BIN" - "$1" "$2" <<'PY'
import json, sys, os, re

src_path = sys.argv[1]
dst_path = sys.argv[2]
target = os.environ.get('DATAX_TARGET', 'doris')

with open(src_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# 0. Basic performance tuning (channel concurrency).
if 'setting' not in data['job']:
    data['job']['setting'] = {}
# NOTE(review): the speed block is applied unconditionally; confirm this
# matches the intent of the original (its indentation was not recoverable).
data['job']['setting']['speed'] = {
    "channel": 8,  # raise concurrency to 8
}

unit = data['job']['content'][0]
reader = unit.get('reader', {})
writer = unit.get('writer', {})
rp = reader.get('parameter', {})
wp = writer.get('parameter', {})

# 0.1 Reader tuning: larger fetch size to cut network round trips.
rp['fetchSize'] = 10000

# ---------------------------------------------------------
# 1. Reader: convert table mode to querySql mode (incremental logic)
# ---------------------------------------------------------
# Column list: reader first, then writer, finally "*".
columns = rp.get('column')
if not columns:
    columns = wp.get('column')
if not columns:
    columns = ["*"]

# Table name: reader connection first.
table_name = None
try:
    table_name = rp.get('connection', [{}])[0].get('table', [None])[0]
except Exception:
    pass

if not table_name:
    # The template may already be in querySql mode; parse the FROM clause.
    try:
        q = rp.get('connection', [{}])[0].get('querySql', [None])[0]
        if q:
            m = re.search(r'FROM\s+`?(\w+)`?', q, re.IGNORECASE)
            if m:
                table_name = m.group(1)
    except Exception:
        pass

if not table_name:
    # Last resort: derive the table name from the file name.
    table_name = os.path.basename(src_path).replace('.json', '')

# Incremental query: SELECT cols FROM table ORDER BY id DESC LIMIT 1000.
# This assumes the table has an id column; with "*" we still try to sort by id.
has_id = 'id' in columns or columns == ["*"]
if has_id:
    if columns == ["*"]:
        col_str = "*"
    else:
        col_str = ", ".join(["`{}`".format(c) for c in columns])
    inc_sql = "SELECT {} FROM `{}` ORDER BY id DESC LIMIT 1000".format(col_str, table_name)
    # Replace the reader connection with a querySql one.
    new_reader_conn = [
        {
            "jdbcUrl": rp.get('connection', [{}])[0].get('jdbcUrl', ["${src_jdbc}"]),
            "querySql": [inc_sql]
        }
    ]
    rp['connection'] = new_reader_conn
    # table/where are dropped because they conflict with querySql mode.
    if 'table' in rp:
        del rp['table']
    if 'where' in rp:
        del rp['where']
else:
    # Warnings go to stderr: stdout is reserved for the table name that the
    # calling shell reads from the first output line.
    print("警告: 表 %s 没有 id 列,无法使用 ORDER BY id DESC 策略,保持原样" % table_name, file=sys.stderr)

# ---------------------------------------------------------
# 2. Writer: when the target is MySQL, switch to mysqlwriter
# ---------------------------------------------------------
if target == 'mysql':
    new_writer = {
        "name": "mysqlwriter",
        "parameter": {
            "connection": [
                {
                    "jdbcUrl": "${dest_mysql_jdbc}",
                    "table": [table_name]
                }
            ],
            "username": "${dest_mysql_user}",
            "password": "${dest_mysql_pwd}",
            "column": columns,
            "writeMode": "replace",  # incremental sync uses replace mode
            "batchSize": 2048,
            "preSql": [],
            "postSql": []
        }
    }
    unit['writer'] = new_writer

# Emit the table name for the shell caller.
print(table_name)

with open(dst_path, 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=4)
PY
}

#######################################
# Run one job file through the DataX engine, prefixing every output line
# with "[<table>] ".
# Globals:   PARAMS, JVM_OPTS, CLASS_PATH, DATAX_HOME (all read)
# Arguments: $1 - job file, $2 - table name used as the log prefix
# Outputs:   prefixed engine output on stdout
# Returns:   java's exit status; sed's status if java succeeded
#######################################
run_datax() {
  local job_file=$1 table_name=$2
  local -a param_args=()
  # Split PARAMS on whitespace WITHOUT pathname expansion: the JDBC URLs
  # contain "?" which an unquoted $PARAMS would treat as a glob character.
  read -r -a param_args <<<"$PARAMS"

  java "${JVM_OPTS[@]}" -classpath "$CLASS_PATH" \
    -Dfile.encoding=UTF-8 \
    -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener \
    -Djava.security.egd=file:///dev/urandom \
    -Ddatax.home="$DATAX_HOME" \
    -Dlogback.configurationFile="$DATAX_HOME/conf/logback.xml" \
    "${param_args[@]}" \
    com.alibaba.datax.core.Engine \
    -mode standalone \
    -jobid -1 \
    -job "$job_file" \
    | sed "s/^/[$table_name] /"

  # $? would only reflect sed; report the engine's own status first.
  local -a statuses=("${PIPESTATUS[@]}")
  if [ "${statuses[0]}" -ne 0 ]; then
    return "${statuses[0]}"
  fi
  return "${statuses[1]}"
}

for JOB in "${JOBS[@]}"; do
  CURRENT=$((CURRENT + 1))
  echo "任务 [$CURRENT/$TOTAL] $JOB"
  echo "----------------------------------------"

  if [ ! -f "$JOB" ]; then
    echo "✗ 错误: 文件不存在 - $JOB"
    FAIL_COUNT=$((FAIL_COUNT + 1))
    continue
  fi

  START_TIME=$(date +%s)

  # Temporary job file: reader rewritten to querySql (incremental logic) and
  # writer adjusted for the target.
  # NOTE(review): predictable name under /tmp, never cleaned up; consider
  # mktemp plus a cleanup trap if nothing relies on inspecting these files.
  TMP_FILE="/tmp/datax_inc_${TARGET}_${JOB}"

  # Tell the Python rewriter which target is being synced.
  export DATAX_TARGET="$TARGET"

  if ! TRANSFORM_OUTPUT=$(generate_inc_config "$JOB" "$TMP_FILE"); then
    echo "✗ 错误: 生成增量配置文件失败 - $JOB"
    FAIL_COUNT=$((FAIL_COUNT + 1))
    continue
  fi

  # The first stdout line of the rewriter is the table name; drop any CR.
  TABLE_NAME=${TRANSFORM_OUTPUT%%$'\n'*}
  TABLE_NAME=${TABLE_NAME//$'\r'/}
  JOB_FILE="$TMP_FILE"

  run_datax "$JOB_FILE" "$TABLE_NAME"
  EXIT_CODE=$?

  END_TIME=$(date +%s)
  DURATION=$((END_TIME - START_TIME))
  if [ "$EXIT_CODE" -eq 0 ]; then
    echo "✓ 成功: $JOB 用时 ${DURATION}s"
    SUCCESS_COUNT=$((SUCCESS_COUNT + 1))
  else
    echo "✗ 失败: $JOB 用时 ${DURATION}s (exit=$EXIT_CODE)"
    FAIL_COUNT=$((FAIL_COUNT + 1))
  fi
  echo "----------------------------------------"
done
# Final report: print the tallies and total wall-clock time, then exit 0 only
# when no job failed (1 otherwise).
SCRIPT_END_TIMESTAMP=$(date +%s)
TOTAL_TIME=$((SCRIPT_END_TIMESTAMP - SCRIPT_START_TIMESTAMP))
echo "完成: 成功 $SUCCESS_COUNT, 失败 $FAIL_COUNT, 总耗时 ${TOTAL_TIME}s"
if [ "$FAIL_COUNT" -eq 0 ]; then
  exit 0
fi
exit 1