特征项 | 参数值 |
---|---|
文件格式 | CSV |
字段分隔符 | , |
文本包围符 | " |
表结构 | 已预先创建 |
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
DTS工具 | 可视化操作,功能全面 | 资源消耗大 | 中小规模数据迁移 |
dmfldr工具 | 高性能,资源占用低 | 需要编写控制脚本 | 大规模数据迁移 |
选择dmfldr工具,原因:
迁移思路:先测试功能,单个表功能导入正常后编写批量装载脚本
dmfldr使用手册中记载了命令行直接执行或者ctl文件的方式,考虑到是自定义分隔符以及包围符,为避免命令过长容易出错的情况,使用ctl进行配置
配置文件AMS_CAR_TRANS_SERL_BAK.ctl:
INFILE '/data/DM/AMS_CAR_TRANS_SERL_BAK.csv'
INTO TABLE xyk.AMS_CAR_TRANS_SERL_BAK
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
创建完成后执行命令
dmfldr userid=$DB_USER/$DB_PASS@$DB_HOST:$DB_PORT control=’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.ctl’
log=’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.log’
此时报错
参数解析错误 /
而命令中control的开头就是/符号,所以怀疑是’未识别到导致的参数解析出错,将命令改为
dmfldr userid=$DB_USER/$DB_PASS@$DB_HOST:$DB_PORT control=’home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.ctl’
log=’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.log’
此时执行时会报错:
参数解析错误 h
验证猜想,于是加上转义符:
dmfldr userid=$DB_USER/$DB_PASS@$DB_HOST:$DB_PORT
control=\’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.ctl\’
log=\’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.log\’
此时成功解析到了ctl文件,但是报错
控制文件语法分析出错 ENCLOSED
先尝试删除配置文件中的OPTIONALLY ENCLOSED BY ‘”’后语法解析通过,但是装载过程中报错字符截断
通过查询表结构以及数据文件,发现数据实际长度小于表结构中规定的字段长度
怀疑是去掉OPTIONALLY ENCLOSED BY之后导致装载工具将“”也识别为数据了,将数据文件中的字符串剪短后装载成功,通过查询表中数据发现确实将””识别为了数据
重新查看手册,发现配置文件打错了,将ENCLOSED改为ENCLOSE,装载成功,数据也正确
在验证了一个ctl的可用性之后,就考虑708张表如何实现批量操作,思考后决定编写shell脚本,为每张csv文件生成ctl文件后批量调用装载工具进行装载
脚本如下:
#!/bin/bash
# DM8 CSV批量导入脚本
# 作者:孙煜程
# 日期:2025-08-05
# 用途:将/home/dmdba/xykdatafile目录下的CSV文件导入到同名表中
# ------------------------- 配置区域 -------------------------
# 数据库连接信息
DB_USER=""
DB_PASS=""
DB_HOST=""
DB_PORT=""
# 数据文件目录
DATA_DIR="/home/dmdba/xykdatafile"
# 日志目录
LOG_DIR="/home/dmdba/import_logs"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
# 控制文件模板(将根据实际表结构调整)
CTL_TEMPLATE="LOAD DATA
INFILE '%filename%'
BADFILE '%badfile%'
REPLACE INTO TABLE %tablename%
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSE BY '\"'
"
# ------------------------- 函数定义 -------------------------
# 创建日志目录
create_log_dir() {
if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"
chown dmdba:dinstall "$LOG_DIR"
echo "[INFO] 已创建日志目录: $LOG_DIR"
fi
}
# 获取表列名
get_table_columns() {
local table_name=$1
echo "COL1, COL2, COL3" # 这里应该替换为实际获取列名的SQL
# 真实实现示例(需要先安装dmfldr和disql):
# columns=$(echo "SELECT LISTAGG(COLUMN_NAME, ', ') WITHIN GROUP (ORDER BY COLUMN_ID)
# FROM USER_TAB_COLUMNS WHERE TABLE_NAME = UPPER('$table_name');" | disql -s $DB_USER/$DB_PASS@$DB_HOST:$DB_PORT | grep -v "^$" | tail -1)
# echo "$columns"
}
# 生成控制文件
generate_control_file() {
local csv_file=$1
local table_name=$(basename "$csv_file" .dat)
local base_name=$(basename "$csv_file")
local bad_file="$LOG_DIR/${table_name}_${TIMESTAMP}.bad"
local ctl_file="$LOG_DIR/${table_name}_${TIMESTAMP}.ctl"
# 获取表列名(实际使用时请取消注释下面的行)
# columns=$(get_table_columns "$table_name")
columns="COL1, COL2, COL3" # 临时占位符,请替换
# 生成控制文件内容
echo "$CTL_TEMPLATE" | \
sed "s|%filename%|$csv_file|g; \
s|%tablename%|$table_name|g; \
s|%badfile%|$bad_file|g; \" > "$ctl_file"
echo "$ctl_file"
}
# 执行数据导入
execute_import() {
local ctl_file=$1
local table_name=$(basename "$ctl_file" "_${TIMESTAMP}.ctl")
local log_file="$LOG_DIR/${table_name}_${TIMESTAMP}.log"
echo "[INFO] 开始导入表: $table_name ..."
echo " 控制文件: $ctl_file"
echo " 日志文件: $log_file"
# 执行dmfldr导入
dmfldr userid=$DB_USER/$DB_PASS@$DB_HOST:$DB_PORT \
control="$ctl_file" \
log="$log_file"
# 检查数据错误行数
error_count=$(grep "行由于数据错误没有加载" "$log_file" | awk '{print $1}')
if [ -z "$error_count" ]; then
echo "[SUCCESS] 表 $table_name 导入完成,日志中未找到数据错误记录"
return 0
elif [ "$error_count" -eq 0 ]; then
echo "[SUCCESS] 表 $table_name 导入完成,0 行数据错误"
return 0
else
echo "[ERROR] 表 $table_name 导入发现 $error_count 行数据错误"
return 1
fi
}
# ------------------------- 主程序 -------------------------
echo "=============================================="
echo " DM8 CSV批量导入脚本启动"
echo " 数据目录: $DATA_DIR"
echo " 开始时间: $(date)"
echo "=============================================="
# 创建日志目录
create_log_dir
# 计数器
TOTAL_FILES=0
SUCCESS_COUNT=0
FAILED_COUNT=0
# 遍历数据目录下的CSV文件
for csv_file in "$DATA_DIR"/*.dat; do
if [ -f "$csv_file" ]; then
((TOTAL_FILES++))
# 生成控制文件
ctl_file=$(generate_control_file "$csv_file")
# 执行导入
if execute_import "$ctl_file"; then
((SUCCESS_COUNT++))
else
((FAILED_COUNT++))
fi
# 小延迟,避免系统负载过高
sleep 1
fi
done
# 输出汇总报告
echo "=============================================="
echo " 导入完成报告"
echo " 总文件数: $TOTAL_FILES"
echo " 成功导入: $SUCCESS_COUNT"
echo " 失败导入: $FAILED_COUNT"
echo " 日志目录: $LOG_DIR"
echo " 结束时间: $(date)"
echo "=============================================="
exit 0
https://eco.dameng.com
文章
阅读量
获赞