diff --git a/DataX/run_jobs_full.sh b/DataX/run_jobs_full.sh index ba8b1ec..440a052 100644 --- a/DataX/run_jobs_full.sh +++ b/DataX/run_jobs_full.sh @@ -41,11 +41,11 @@ JOBS=( ) # 公共源端参数 -SRC_PARAMS="-Dsrc_user=ylt -Dsrc_pwd=Ycharge666 -Dsrc_jdbc=jdbc:mysql://rm-bp1ux6tuk49er80t9xo.mysql.rds.aliyuncs.com:3306/yltcharge" +SRC_PARAMS="-Dsrc_user=ylt -Dsrc_pwd=Ycharge666 -Dsrc_jdbc=jdbc:mysql://rm-bp1ux6tuk49er80t9xo.mysql.rds.aliyuncs.com:3306/yltcharge?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai" if [ "$TARGET" == "doris" ]; then echo "模式: Doris 全量同步" - DEST_PARAMS="-Ddest_user=root -Ddest_pwd=DsideaL147258369 -Ddest_load_url=10.10.14.204:8030 -Ddest_jdbc=jdbc:mysql://10.10.14.204:9030/yltcharge" + DEST_PARAMS="-Ddest_user=root -Ddest_pwd=DsideaL147258369 -Ddest_load_url=10.10.14.204:8030 -Ddest_jdbc=jdbc:mysql://10.10.14.204:9030/yltcharge?useSSL=false" PARAMS="$SRC_PARAMS $DEST_PARAMS" elif [ "$TARGET" == "mysql" ]; then echo "模式: MySQL 全量同步 (CSV Load)" @@ -132,7 +132,7 @@ with open(src_path, 'r', encoding='utf-8') as f: if 'setting' not in data['job']: data['job']['setting'] = {} data['job']['setting']['speed'] = { - "channel": 4, # 提高并发到 4,加速 Reader 读取 + "channel": 8, # 提高并发到 8,加速 Reader 读取 } data['job']['setting']['errorLimit'] = { "record": 0 @@ -145,7 +145,7 @@ rp = reader.get('parameter', {}) wp = writer.get('parameter', {}) # 2. Reader 优化 (FetchSize) -rp['fetchSize'] = 4096 # 进一步增加读取缓存,减少网络往返 +rp['fetchSize'] = 10000 # 极大增加读取缓存,减少网络往返 # 获取表名 table_name = None diff --git a/DataX/run_jobs_inc.sh b/DataX/run_jobs_inc.sh index f4e1bef..3755859 100644 --- a/DataX/run_jobs_inc.sh +++ b/DataX/run_jobs_inc.sh @@ -39,11 +39,11 @@ JOBS=( ) # 公共源端参数 -SRC_PARAMS="-Dsrc_user=ylt -Dsrc_pwd=Ycharge666 -Dsrc_jdbc=jdbc:mysql://rm-bp1ux6tuk49er80t9xo.mysql.rds.aliyuncs.com:3306/yltcharge" +SRC_PARAMS="-Dsrc_user=ylt -Dsrc_pwd=Ycharge666 -Dsrc_jdbc=jdbc:mysql://rm-bp1ux6tuk49er80t9xo.mysql.rds.aliyuncs.com:3306/yltcharge?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai" if [ "$TARGET" == "doris" ]; then echo "模式: Doris 增量同步" - DEST_PARAMS="-Ddest_user=root -Ddest_pwd=DsideaL147258369 -Ddest_load_url=10.10.14.204:8030 -Ddest_jdbc=jdbc:mysql://10.10.14.204:9030/yltcharge" + DEST_PARAMS="-Ddest_user=root -Ddest_pwd=DsideaL147258369 -Ddest_load_url=10.10.14.204:8030 -Ddest_jdbc=jdbc:mysql://10.10.14.204:9030/yltcharge?useSSL=false" PARAMS="$SRC_PARAMS $DEST_PARAMS" elif [ "$TARGET" == "mysql" ]; then echo "模式: MySQL 增量同步 (10.10.14.210:22066)" @@ -106,7 +106,7 @@ with open(src_path, 'r', encoding='utf-8') as f: if 'setting' not in data['job']: data['job']['setting'] = {} data['job']['setting']['speed'] = { - "channel": 4, # 提高并发到 4 + "channel": 8, # 提高并发到 8 } unit = data['job']['content'][0] @@ -116,7 +116,7 @@ rp = reader.get('parameter', {}) wp = writer.get('parameter', {}) # 0.1 Reader 优化 (FetchSize) -rp['fetchSize'] = 4096 # 进一步增加读取缓存,减少网络往返 +rp['fetchSize'] = 10000 # 极大增加读取缓存,减少网络往返 # --------------------------------------------------------- # 1. Reader 转换: 从 table 模式转换为 querySql 模式 (增量逻辑) diff --git a/DataX/说明.md b/DataX/说明.md index 5d75860..8f79268 100644 --- a/DataX/说明.md +++ b/DataX/说明.md @@ -15,9 +15,19 @@ #### 二、配置部署 -本方案通过一套脚本支持 **Doris** 和 **MySQL** 的同步,并统一了 JSON 配置目录。 +本方案通过一套脚本支持 **Doris** 和 **MySQL** 的同步,并统一了 JSON 配置目录。 -1. **上传文件** +1. **环境准备 (重要)** + 高性能同步工具依赖 Python 的 MySQL 驱动,请先在服务器执行以下命令安装: + ```shell + # Rocky Linux / CentOS 9 + yum install -y python3-PyMySQL + + # 验证安装 (返回“成功”即表示OK) + python3 -c "import pymysql; print('成功')" + ``` + +2. **上传文件** 请将本地 `d:\dsWork\aiData\DataX\` 目录下的以下内容上传至服务器 `/usr/local/datax/` 根目录下: * 文件夹 `json/` (存放统一的 JSON 模板)