This commit is contained in:
HuangHai
2026-01-20 11:23:44 +08:00
parent bf2680e346
commit a97397a4ed
4 changed files with 590 additions and 0 deletions

View File

@@ -0,0 +1,285 @@
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"user_id",
"company_id",
"station_id",
"equipment_id",
"connector_id",
"user_card_no",
"state",
"operate_state",
"order_no",
"platform_order_no",
"platform_sale_type",
"platform_driver_id",
"platform_phone",
"charging_amt",
"order_type",
"order_time",
"charge_begin_time",
"charge_end_time",
"charge_begin_degree",
"charge_end_degree",
"charge_degree",
"charge_ah",
"charge_times_degree",
"charge_begin_soc",
"charge_end_soc",
"charge_cur_soc",
"charge_duration",
"charge_settle_type",
"charge_unit_price",
"charge_unit_cost",
"charge_unit_service_fee",
"receivable_electric_fee",
"receivable_service_fee",
"receivable_total_fee",
"charge_elecfee_amount",
"charge_elecfee_cost_amount",
"charge_service_amount",
"actual_pay_amount",
"pay_order_no",
"pay_amount",
"pay_method",
"pay_status",
"pay_time",
"user_coupon_id",
"coupon_name",
"coupon_type",
"coupon_amount",
"elecfee_coupon_amount",
"servicefee_coupon_amount",
"invoice_id",
"invoice_fee",
"ticket_status",
"activity_electric_fee",
"activity_service_fee",
"charge_plate_no",
"charge_vin",
"charge_strategy",
"charge_strategy_param",
"operator_id",
"operator_income",
"plat_service_fee",
"charge_discount_type",
"settle_electric_fee",
"settle_service_fee",
"settle_fee",
"settle_method",
"settle_coupon_amount",
"settle_coupon_electric_fee",
"settle_coupon_service_fee",
"settle_status",
"cancel_type",
"cancel_code",
"cancel_msg",
"cancel_time",
"finish_type",
"finish_code",
"finish_msg",
"finish_time",
"boot_mode",
"settlement_type",
"is_parking_coupon",
"control_source",
"exception_flag",
"exception_msg",
"report_time",
"com_service_fee",
"com_electric_fee",
"com_total_fee",
"com_charge_discount_type",
"com_charge_discount",
"has_shallow_report",
"has_deep_report",
"is_exception",
"shallow_report_is_received",
"create_time",
"update_time",
"charge_market_id",
"coupon_total_amount",
"coupon_service_total_amount",
"coupon_elecfee_total_amount",
"activity_name",
"activity_total_fee",
"company_activity_id",
"company_activity_name",
"company_activity_total_fee",
"company_activity_service_fee",
"company_activity_electric_fee",
"subsidy_degree",
"subsidy_fee",
"special_channel_order_source"
],
"connection": [
{
"jdbcUrl": [
"${src_jdbc}"
],
"table": [
"t_equipment_charge_order"
]
}
],
"username": "${src_user}",
"password": "${src_pwd}",
"where": ""
}
},
"writer": {
"name": "doriswriter",
"parameter": {
"loadUrl": [
"${dest_load_url}"
],
"column": [
"id",
"user_id",
"company_id",
"station_id",
"equipment_id",
"connector_id",
"user_card_no",
"state",
"operate_state",
"order_no",
"platform_order_no",
"platform_sale_type",
"platform_driver_id",
"platform_phone",
"charging_amt",
"order_type",
"order_time",
"charge_begin_time",
"charge_end_time",
"charge_begin_degree",
"charge_end_degree",
"charge_degree",
"charge_ah",
"charge_times_degree",
"charge_begin_soc",
"charge_end_soc",
"charge_cur_soc",
"charge_duration",
"charge_settle_type",
"charge_unit_price",
"charge_unit_cost",
"charge_unit_service_fee",
"receivable_electric_fee",
"receivable_service_fee",
"receivable_total_fee",
"charge_elecfee_amount",
"charge_elecfee_cost_amount",
"charge_service_amount",
"actual_pay_amount",
"pay_order_no",
"pay_amount",
"pay_method",
"pay_status",
"pay_time",
"user_coupon_id",
"coupon_name",
"coupon_type",
"coupon_amount",
"elecfee_coupon_amount",
"servicefee_coupon_amount",
"invoice_id",
"invoice_fee",
"ticket_status",
"activity_electric_fee",
"activity_service_fee",
"charge_plate_no",
"charge_vin",
"charge_strategy",
"charge_strategy_param",
"operator_id",
"operator_income",
"plat_service_fee",
"charge_discount_type",
"settle_electric_fee",
"settle_service_fee",
"settle_fee",
"settle_method",
"settle_coupon_amount",
"settle_coupon_electric_fee",
"settle_coupon_service_fee",
"settle_status",
"cancel_type",
"cancel_code",
"cancel_msg",
"cancel_time",
"finish_type",
"finish_code",
"finish_msg",
"finish_time",
"boot_mode",
"settlement_type",
"is_parking_coupon",
"control_source",
"exception_flag",
"exception_msg",
"report_time",
"com_service_fee",
"com_electric_fee",
"com_total_fee",
"com_charge_discount_type",
"com_charge_discount",
"has_shallow_report",
"has_deep_report",
"is_exception",
"shallow_report_is_received",
"create_time",
"update_time",
"charge_market_id",
"coupon_total_amount",
"coupon_service_total_amount",
"coupon_elecfee_total_amount",
"activity_name",
"activity_total_fee",
"company_activity_id",
"company_activity_name",
"company_activity_total_fee",
"company_activity_service_fee",
"company_activity_electric_fee",
"subsidy_degree",
"subsidy_fee",
"special_channel_order_source"
],
"username": "${dest_user}",
"password": "${dest_pwd}",
"postSql": [],
"preSql": [
"TRUNCATE TABLE t_equipment_charge_order"
],
"flushInterval": 30000,
"connection": [
{
"jdbcUrl": "${dest_jdbc}",
"selectedDatabase": "yltcharge",
"table": [
"t_equipment_charge_order"
]
}
],
"loadProps": {
"format": "json",
"strip_outer_array": true
}
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}

View File

@@ -0,0 +1,109 @@
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"order_no",
"charge_degree",
"service_fee",
"electric_fee",
"cost_fee",
"service_fee_unit_price",
"electric_fee_unit_price",
"cost_fee_unit_price",
"activity_service_fee",
"activity_electric_fee",
"charge_startr_time",
"charge_end_time",
"charge_meal_electric_fee",
"charge_meal_service_fee",
"charge_meal_total_fee",
"electric_card_use_dgree",
"check_electric_unit_price",
"check_service_unit_price",
"time_flag",
"create_time",
"update_time",
"detail_more"
],
"connection": [
{
"jdbcUrl": [
"${src_jdbc}"
],
"table": [
"t_equipment_charge_order_detail"
]
}
],
"username": "${src_user}",
"password": "${src_pwd}",
"where": ""
}
},
"writer": {
"name": "doriswriter",
"parameter": {
"loadUrl": [
"${dest_load_url}"
],
"column": [
"id",
"order_no",
"charge_degree",
"service_fee",
"electric_fee",
"cost_fee",
"service_fee_unit_price",
"electric_fee_unit_price",
"cost_fee_unit_price",
"activity_service_fee",
"activity_electric_fee",
"charge_startr_time",
"charge_end_time",
"charge_meal_electric_fee",
"charge_meal_service_fee",
"charge_meal_total_fee",
"electric_card_use_dgree",
"check_electric_unit_price",
"check_service_unit_price",
"time_flag",
"create_time",
"update_time",
"detail_more"
],
"username": "${dest_user}",
"password": "${dest_pwd}",
"postSql": [],
"preSql": [
"TRUNCATE TABLE t_equipment_charge_order_detail"
],
"flushInterval": 30000,
"connection": [
{
"jdbcUrl": "${dest_jdbc}",
"selectedDatabase": "yltcharge",
"table": [
"t_equipment_charge_order_detail"
]
}
],
"loadProps": {
"format": "json",
"strip_outer_array": true
}
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}

View File

@@ -0,0 +1,168 @@
#!/bin/bash
# 配置路径
DATAX_HOME="/usr/local/datax"
CONF_DIR="$DATAX_HOME/conf"
BIN_DIR="$DATAX_HOME/bin"
DATAX_PY="$BIN_DIR/datax.py"
# 明确指定要执行的10个文件按你需要的顺序
JOBS=(
"t_equipment_charge_order.json"
"t_equipment_charge_order_detail.json"
)
# 定义全局参数
PARAMS="-Dsrc_user=ylt -Dsrc_pwd=Ycharge666 -Dsrc_jdbc=jdbc:mysql://rm-bp1ux6tuk49er80t9xo.mysql.rds.aliyuncs.com:3306/yltcharge -Ddest_user=root -Ddest_pwd=DsideaL147258369 -Ddest_load_url=10.10.14.204:8030 -Ddest_jdbc=jdbc:mysql://10.10.14.204:9030/yltcharge"
# 在脚本最开始记录开始时间
SCRIPT_START_TIME=$(date '+%Y-%m-%d %H:%M:%S')
SCRIPT_START_TIMESTAMP=$(date +%s)
echo "DataX顺序执行脚本"
echo "=========================="
echo "DataX路径: $DATAX_PY"
echo "配置文件目录: $CONF_DIR"
echo "任务数量: ${#JOBS[@]}"
echo "脚本开始时间: $SCRIPT_START_TIME"
echo "=========================="
# 切换到配置文件目录
cd "$CONF_DIR" || {
echo "错误: 无法进入配置文件目录 $CONF_DIR"
exit 1
}
echo "当前目录: $(pwd)"
echo
# 执行计数器
SUCCESS_COUNT=0
FAIL_COUNT=0
TOTAL=${#JOBS[@]}
CURRENT=0
# 数组存储每个任务的执行详情
declare -a TASK_RESULTS=()
declare -a TASK_DURATIONS=()
declare -a TASK_STATUSES=()
for JOB in "${JOBS[@]}"; do
CURRENT=$((CURRENT + 1))
echo "任务 [$CURRENT/$TOTAL] $JOB"
echo "任务开始时间: $(date '+%H:%M:%S')"
echo "----------------------------------------"
# 检查文件是否存在
if [ ! -f "$JOB" ]; then
echo "✗ 错误: 文件不存在 - $JOB"
FAIL_COUNT=$((FAIL_COUNT + 1))
TASK_RESULTS+=("$JOB")
TASK_STATUSES+=("文件不存在")
TASK_DURATIONS+=("0")
echo "----------------------------------------"
continue
fi
# 记录开始时间(用于计算耗时)
START_TIME=$(date +%s)
# 执行任务 (添加 -p 参数传递变量)
/usr/bin/python "$DATAX_PY" -p "$PARAMS" "$JOB"
EXIT_CODE=$?
# 计算耗时
END_TIME=$(date +%s)
DURATION=$((END_TIME - START_TIME))
# 保存任务详情
TASK_RESULTS+=("$JOB")
TASK_DURATIONS+=("$DURATION")
# 处理执行结果
if [ $EXIT_CODE -eq 0 ]; then
echo "✓ 成功: $JOB (耗时: ${DURATION}秒)"
SUCCESS_COUNT=$((SUCCESS_COUNT + 1))
TASK_STATUSES+=("成功")
else
echo "✗ 失败: $JOB (耗时: ${DURATION}秒)"
echo "错误代码: $EXIT_CODE"
FAIL_COUNT=$((FAIL_COUNT + 1))
TASK_STATUSES+=("失败")
# 询问是否继续执行后续任务
echo "----------------------------------------"
read -p "任务失败,是否继续执行下一个任务?(y/n): " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
echo "用户选择停止执行"
break
fi
fi
echo "任务结束时间: $(date '+%H:%M:%S')"
echo "----------------------------------------"
echo
# 可选:任务间等待时间(秒)
# sleep 1
done
# 记录脚本结束时间
SCRIPT_END_TIME=$(date '+%Y-%m-%d %H:%M:%S')
SCRIPT_END_TIMESTAMP=$(date +%s)
TOTAL_DURATION=$((SCRIPT_END_TIMESTAMP - SCRIPT_START_TIMESTAMP))
echo "=========================="
echo "执行结果汇总"
echo "=========================="
echo "脚本开始时间: $SCRIPT_START_TIME"
echo "脚本结束时间: $SCRIPT_END_TIME"
echo "脚本总耗时: ${TOTAL_DURATION}"
echo "=========================="
echo "任务执行详情:"
echo "----------------------------------------"
# 输出每个任务的执行结果
for i in "${!TASK_RESULTS[@]}"; do
STATUS_ICON="✓"
if [ "${TASK_STATUSES[$i]}" = "失败" ] || [ "${TASK_STATUSES[$i]}" = "文件不存在" ]; then
STATUS_ICON="✗"
fi
printf "%-40s %-2s %-10s %-8s秒\n" \
"${TASK_RESULTS[$i]}" \
"$STATUS_ICON" \
"${TASK_STATUSES[$i]}" \
"${TASK_DURATIONS[$i]}"
done
echo "----------------------------------------"
echo "汇总统计:"
echo "总任务数: $TOTAL"
echo "成功: $SUCCESS_COUNT"
echo "失败: $FAIL_COUNT"
if [ $TOTAL -gt 0 ]; then
SUCCESS_RATE=$((SUCCESS_COUNT * 100 / TOTAL))
echo "成功率: ${SUCCESS_RATE}%"
fi
# 计算平均耗时
if [ ${#TASK_DURATIONS[@]} -gt 0 ]; then
TOTAL_TASK_TIME=0
VALID_TASKS=0
for duration in "${TASK_DURATIONS[@]}"; do
if [ "$duration" -gt 0 ]; then
TOTAL_TASK_TIME=$((TOTAL_TASK_TIME + duration))
VALID_TASKS=$((VALID_TASKS + 1))
fi
done
if [ $VALID_TASKS -gt 0 ]; then
AVG_DURATION=$((TOTAL_TASK_TIME / VALID_TASKS))
echo "平均任务耗时: ${AVG_DURATION}"
fi
fi
echo "=========================="

View File

@@ -0,0 +1,28 @@
#### 一、下载
https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
#### 二、部署
1、将 `datax.tar.gz` 文件放到`/usr/local/` 目录下
```shell
cd /usr/local
tar zxvf datax.tar.gz
```
2、将配置文件目录中所有`json`文件放到 `/usr/local/datax/conf/` 目录下
注意:将实际情况修改`json`文件中数据库连接信息。
3、将 `run_jobs.sh` 脚本放到 `/usr/local/` 目录下
```shell
chmod +x run_jobs.sh
```
#### 三、运行
```shell
/usr/local/run_jobs.sh
```