注册
dmfldr导入csv文件至DM8
技术分享/ 文章详情 /

dmfldr导入csv文件至DM8

孙煜程 2025/08/15 62 0 0

银行信创项目测试数据迁移实践

一、项目背景

1.1 迁移需求

  • 项目类型:银行信创项目测试环境搭建
  • 数据来源:东方通系统导出的CSV文件
  • 目标环境:达梦数据库(DM8)
  • 数据规模
    • 测试数据:708张表,总计30GB
    • 生产数据:预计500GB(后续迁移)

1.2 数据特征

特征项 参数值
文件格式 CSV
字段分隔符 ,
文本包围符 "
表结构 已预先创建

二、技术方案选型

2.1 可选方案对比

方案 优点 缺点 适用场景
DTS工具 可视化操作,功能全面 资源消耗大 中小规模数据迁移
dmfldr工具 高性能,资源占用低 需要编写控制脚本 大规模数据迁移

2.2 最终选择

选择dmfldr工具,原因:

  1. 测试环境云桌面内存资源有限
  2. 需要处理的数据量较大(30GB)
  3. 生产环境需要迁移500GB数据,需要高性能方案

三、实施过程

3.1 单表验证阶段

3.1.1 控制文件配置

迁移思路:先测试功能,单个表功能导入正常后编写批量装载脚本
dmfldr使用手册中记载了命令行直接执行或者ctl文件的方式,考虑到是自定义分隔符以及包围符,为避免命令过长容易出错的情况,使用ctl进行配置

配置文件AMS_CAR_TRANS_SERL_BAK.ctl:

INFILE '/data/DM/AMS_CAR_TRANS_SERL_BAK.csv'
INTO TABLE xyk.AMS_CAR_TRANS_SERL_BAK
FIELDS TERMINATED BY ',' 
OPTIONALLY ENCLOSED BY '"'

3.1.2 问题排查

创建完成后执行命令

dmfldr userid=$DB_USER/$DB_PASS@$DB_HOST:$DB_PORT  control=’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.ctl’
log=’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.log’

此时报错

参数解析错误 /

而命令中control的开头就是/符号,所以怀疑是’未识别到导致的参数解析出错,将命令改为

dmfldr userid=$DB_USER/$DB_PASS@$DB_HOST:$DB_PORT  control=’home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.ctl’
log=’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.log’

此时执行时会报错:

参数解析错误 h

验证猜想,于是加上转义符:

dmfldr userid=$DB_USER/$DB_PASS@$DB_HOST:$DB_PORT  
control=\’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.ctl\’
log=\’/home/dmdba/import_log/AMS_CAR_TRANS_SERL_BAK.log\’

此时成功解析到了ctl文件,但是报错

控制文件语法分析出错 ENCLOSED

先尝试删除配置文件中的OPTIONALLY ENCLOSED BY ‘”’后语法解析通过,但是装载过程中报错字符截断
image.png

通过查询表结构以及数据文件,发现数据实际长度小于表结构中规定的字段长度
image.png
image.png
怀疑是去掉OPTIONALLY ENCLOSED BY之后导致装载工具将“”也识别为数据了,将数据文件中的字符串剪短后装载成功,通过查询表中数据发现确实将””识别为了数据

重新查看手册,发现配置文件打错了,将ENCLOSED改为ENCLOSE,装载成功,数据也正确
image.png

3.2 批量导入脚本编写

在验证了一个ctl的可用性之后,就考虑708张表如何实现批量操作,思考后决定编写shell脚本,为每张csv文件生成ctl文件后批量调用装载工具进行装载
脚本如下:

#!/bin/bash
# DM8 CSV批量导入脚本
# 作者:孙煜程
# 日期:2025-08-05
# 用途:将/home/dmdba/xykdatafile目录下的CSV文件导入到同名表中

# ------------------------- 配置区域 -------------------------
# 数据库连接信息
DB_USER=""
DB_PASS=""
DB_HOST=""
DB_PORT=""

# 数据文件目录
DATA_DIR="/home/dmdba/xykdatafile"

# 日志目录
LOG_DIR="/home/dmdba/import_logs"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)

# 控制文件模板(将根据实际表结构调整)
CTL_TEMPLATE="LOAD DATA
INFILE '%filename%'
BADFILE '%badfile%'
REPLACE INTO TABLE %tablename%
FIELDS TERMINATED BY ',' 
OPTIONALLY ENCLOSE BY '\"'
"

# ------------------------- 函数定义 -------------------------
# 创建日志目录
create_log_dir() {
  if [ ! -d "$LOG_DIR" ]; then
    mkdir -p "$LOG_DIR"
    chown dmdba:dinstall "$LOG_DIR"
    echo "[INFO] 已创建日志目录: $LOG_DIR"
  fi
}

# 获取表列名
get_table_columns() {
  local table_name=$1
  echo "COL1, COL2, COL3"  # 这里应该替换为实际获取列名的SQL
  # 真实实现示例(需要先安装dmfldr和disql):
  # columns=$(echo "SELECT LISTAGG(COLUMN_NAME, ', ') WITHIN GROUP (ORDER BY COLUMN_ID) 
  #           FROM USER_TAB_COLUMNS WHERE TABLE_NAME = UPPER('$table_name');" | disql -s $DB_USER/$DB_PASS@$DB_HOST:$DB_PORT | grep -v "^$" | tail -1)
  # echo "$columns"
}

# 生成控制文件
generate_control_file() {
  local csv_file=$1
  local table_name=$(basename "$csv_file" .dat)
  local base_name=$(basename "$csv_file")
  
  local bad_file="$LOG_DIR/${table_name}_${TIMESTAMP}.bad"
  local ctl_file="$LOG_DIR/${table_name}_${TIMESTAMP}.ctl"
  
  # 获取表列名(实际使用时请取消注释下面的行)
  # columns=$(get_table_columns "$table_name")
  columns="COL1, COL2, COL3"  # 临时占位符,请替换
  
  # 生成控制文件内容
  echo "$CTL_TEMPLATE" | \
    sed "s|%filename%|$csv_file|g; \
         s|%tablename%|$table_name|g; \
         s|%badfile%|$bad_file|g; \" > "$ctl_file"
  
  echo "$ctl_file"
}

# 执行数据导入
execute_import() {
  local ctl_file=$1
  local table_name=$(basename "$ctl_file" "_${TIMESTAMP}.ctl")
  local log_file="$LOG_DIR/${table_name}_${TIMESTAMP}.log"
  
  echo "[INFO] 开始导入表: $table_name ..."
  echo "       控制文件: $ctl_file"
  echo "       日志文件: $log_file"
  
  # 执行dmfldr导入
  dmfldr userid=$DB_USER/$DB_PASS@$DB_HOST:$DB_PORT \
         control="$ctl_file" \
         log="$log_file"
  
 # 检查数据错误行数
  error_count=$(grep "行由于数据错误没有加载" "$log_file" | awk '{print $1}')
  
  if [ -z "$error_count" ]; then
    echo "[SUCCESS] 表 $table_name 导入完成,日志中未找到数据错误记录"
    return 0
  elif [ "$error_count" -eq 0 ]; then
    echo "[SUCCESS] 表 $table_name 导入完成,0 行数据错误"
    return 0
  else
    echo "[ERROR] 表 $table_name 导入发现 $error_count 行数据错误"
    return 1
  fi
}

# ------------------------- 主程序 -------------------------
echo "=============================================="
echo " DM8 CSV批量导入脚本启动"
echo " 数据目录: $DATA_DIR"
echo " 开始时间: $(date)"
echo "=============================================="

# 创建日志目录
create_log_dir

# 计数器
TOTAL_FILES=0
SUCCESS_COUNT=0
FAILED_COUNT=0

# 遍历数据目录下的CSV文件
for csv_file in "$DATA_DIR"/*.dat; do
  if [ -f "$csv_file" ]; then
    ((TOTAL_FILES++))
    
    # 生成控制文件
    ctl_file=$(generate_control_file "$csv_file")
    
    # 执行导入
    if execute_import "$ctl_file"; then
      ((SUCCESS_COUNT++))
    else
      ((FAILED_COUNT++))
    fi
    
    # 小延迟,避免系统负载过高
    sleep 1
  fi
done

# 输出汇总报告
echo "=============================================="
echo " 导入完成报告"
echo " 总文件数: $TOTAL_FILES"
echo " 成功导入: $SUCCESS_COUNT"
echo " 失败导入: $FAILED_COUNT"
echo " 日志目录: $LOG_DIR"
echo " 结束时间: $(date)"
echo "=============================================="

exit 0

https://eco.dameng.com

评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服