注册
DRS 实现 DM8 过程数据同步的 CVT 脚本实战
培训园地/ 文章详情 /

DRS 实现 DM8 过程数据同步的 CVT 脚本实战

爱吃鱼的猫 2026/03/18 259 0 0

DRS 实现 DM8 过程数据同步的 CVT 脚本实战

1. 项目概述与需求分析

1.1 项目背景

        为推进某项目国产数据库适配工作,需完成源端DM8DSC目的端DM8单机的业务数据同步。该场景属于电网级数据同步需求,对数据可追溯性、操作审计能力有强制要求,需实现过程数据的全链路记录。

1.2 核心需求

  1. 同步过程需完整记录数据操作轨迹,包含操作类型(新增/删除)、操作时间、数据入库顺序;
  2. 支持全量数据初始化与增量数据实时同步的协同,确保过程数据的时序一致性;
  3. 同步逻辑需满足政务数据合规要求,保留数据变更前后的完整状态,支持故障回滚与操作审计。

1.3 技术选型

        选用达梦DRS作为数据同步工具,通过CVT自定义脚本实现DML操作重写,核心依赖DM8的归档日志与逻辑日志捕获能力,将所有DELETE/UPDATE操作转换为INSERT操作,通过自定义标识字段实现过程数据的可追溯性。

2. 技术原理与方案设计

2.1 核心原理

  1. 日志捕获机制:源端DM8开启归档日志与逻辑日志后,DRS的CPT(捕获模块)实时捕获所有DML/DDL操作日志,作为同步数据的数据源;
  2. CVT数据转换机制:数据转换是指对源数据库的全量数据和增量数据进行自定义转换,可用于数据迁移、数据同步和数据分发等场景,满足多样化的数据转化需求。该功能支持在源DMDRS服务或目标DMDRS服务中部署,可实时对增量数据进行数据内容转换、数据拆分和函数处理等操作,CVT模块针对每一行记录进行实时转换;
  3. DML操作重写:基于CVT可编程能力,对捕获的DML操作进行二次解析,将UPDATE操作拆分为两条INSERT操作(旧数据标记为删除、新数据标记为新增),将DELETE操作转换为INSERT操作(标记为删除);
  4. 过程数据存储:目的端表新增操作标识列、操作时间列、自增序列列,分别记录操作类型、同步时间、数据入库顺序,实现过程数据的时序化追溯。

2.2 实现逻辑

  1. 给目标端的表增加操作时间列和操作标识列(I 新增,D 删除)
  2. 开启同步前的全量数据使用drs直接同步。标识列为I,操作时间为数据的插入时间。自增列的值为0。
  3. 开启同步后的增量数据,自增列按照增量数据入库顺序依次增加。标识列为:
  4. 当源端是插入语句时将删除行的全部信息(该行包含所有列的值)同步到目标端标识列为I;
  5. 当源端是删除语句时将删除行的全部信息(该行包含所有列的值)同步到目标端标识列为D;
  6. 当源端是更新操作时,同步改行的全部信息(该行更新前所有列的值)同步到目标端,标识列为D。同时在将改行的全部信息(该行更新后所有列的值)同步到目标端,标识列为I。

2.3 方案架构

数据同步场景(本项目采用)
image.png

数据分发场景(扩展架构)
image.png

        在本项目的同步场景中,需在目标DMDRS中通过CVT完成数据转换(DML操作重写)后,再同步到目标数据库;若为数据分发场景,则需在源DMDRS中完成数据转换后,再分发给多个目标DMDRS并同步至对应数据库。

2.4 CVT数据转换核心特性

  1. 部署灵活性:可在源DMDRS服务或者目标DMDRS服务中启动CVT数据转换功能,适配不同业务场景需求;
  2. 实时处理能力:支持对增量数据进行实时的数据内容转换、数据拆分和函数处理,针对每一行记录进行逐行转换;
  3. 可编程扩展:提供自定义可编程的方式编写转换功能及转换逻辑,内置丰富的处理流程及处理函数,支持HASH、B树等复杂数据结构进行数据处理。

3. 环境信息和测试过程

3.1 环境信息

配置项 源端(2节点DSC集群) 目的端(单机环境)
数据库版本 1-2-94-21.11.11-150650-10038-ENT 1-2-94-21.11.11-150650-10038-ENT
数据库架构 DSC环境 单机环境
数据库端口 5236 5236
DRS版本 dmdrs_rev168340_x86_rh6_64_20240823 dmdrs_rev168340_x86_rh6_64_20240823
操作系统 CentOS 7.9 CentOS 7.9

3.2 源端和目标端的测试表结构的定义如下:

create table TEST.T1 (id INTEGER,name VARCHAR2(200));

3.3 目的端表结构改造

在目的端所有需同步的表中新增3个过程数据字段,SQL示例如下:

-- 新增操作标识列:I-新增,D-删除 ALTER TABLE TEST.T1 ADD COLUMN BIAOSHI VARCHAR(1) default ('I') ; -- 新增操作时间列:默认取数据插入时间 ALTER TABLE TEST.T1 ADD COLUMN OP_TIME TIMESTAMP(6) DEFAULT SYSDATE ; -- 新增自增序列列:记录数据入库顺序 ALTER TABLE TEST.T1 ADD COLUMN ZIZENG BIGINT AUTO_INCREMENT UNIQUE;

4. 核心配置与实操步骤

4.1 DRS工具安装

源端与目的端均需安装DRS,安装步骤如下:

# 切换至dmdba用户 su - dmdba # 执行DRS安装包 .dmdrs_rev*.bin -i Please select the installer's language (E/e:English C/c:Chinese)[E/e]:c -----------欢迎使用达梦数据复制软件安装工具----------- 输入[exit]可退出安装。 -----------安装目录----------- 指定安装目录[/home/dmdba/dmdrs5]: -----------安装组件----------- 请选择需要安装的组件 1.安装达梦数据融合管理平台 2.安装代理 请选择安装组件数字序号(使用空格间隔): 未选择组件 "确认?[Y/y(确认选择) or N/n(重新选择)]:y -----------许可证文件----------- 1.免费试用达梦数据复制软件(必须在试用期范围内使用,反复安装无效,使用时间为3个月) 2.使用许可证文件 指定许可证文件(1,2)[1]: 免费试用许可证文件限制信息如下: 有效日期:2024-08-28 授权顾客名称:DEVELOP USER 项目名称: 许可证编号:dm66n367 版本类型:试用版 授权数据库类型:DM6,DM7,DM8,Oracle,SQL Server,MySQL,DB2,PostgreSQL,Kafka 当前默认许可证文件支持数据库类型为[DM6,DM7,DM8,Oracle,SQL Server,MySQL,DB2,PostgreSQL,Kafka],是否继续安装?[Y/y or N/n]:y -----------配置----------- 依赖环境配置 -数据库动态库路径[/home/dmdba/dmdbms/bin]: -----------安装小结----------- 安装目录:[/home/dmdba/dmdrs5] 依赖环境配置-NEED_LIB_PATH:[/home/dmdba/dmdbms/bin] 所需磁盘空间/可用磁盘空间:[1,371 MB/27,992 MB] 确认安装?[Y/y or N/n]:y -----------安装中----------- server start ... server finished. default start ... default finished. doc start ... doc finished. 安装成功 -----------安装总结----------- 达梦数据复制软件V5安装完成 更多安装信息,请查看安装日志文件:/home/dmdba/dmdrs5/log/install.log

4.2 开启源端归档和逻辑日志

        为了保证源DMDRS服务运行中数据的一致性,源DMDRS服务需要读源数据库的归档和逻辑日志,因此源数据库需要开启归档和逻辑日志功能。
设置DM8数据库配置文件“dm.ini”中ARCH_INI参数为1,RLOG_APPEND_LOGIC的参数为1。

-- 开启逻辑日志 CALL SP_SET_PARA_VALUE(1,'RLOG_APPEND_LOGIC',1); -- 验证参数配置 SELECT PARA_NAME, PARA_VALUE FROM V$DM_INI WHERE PARA_NAME IN ('RLOG_APPEND_LOGIC');

4.3 DDL同步配置

辅助表方式(推荐)
在源数据库执行“ddl_sql_dm8.sql”脚本创建辅助表,脚本默认位置在DMDRS执行程序目录下的scripts子目录中(/home/dmdba/dmdrs5/bin/scripts/)

cd /home/dmdba/dmdrs5/bin/scripts/ disql SYSDBA/SYSDBA@127.0.0.1:5236

在disql中执行:

SQL> set define off; SQL> start ddl_sql_dm8.sql 检查是否有效: select owner, trigger_name from dba_triggers where owner = 'SYSDBA' and trigger_name like 'DRS_$%' and status = 'Y'; select owner, table_name from dba_tables where owner = 'SYSDBA' and table_name like 'DRS_$%' and status = 'VALID';

4.4 同步用户创建

源端与目的端均需创建同步用户并赋予权限:

CREATE USER "DMDRS" IDENTIFIED BY "Dmdrs@123"; GRANT "DBA","RESOURCE","PUBLIC","VTI","SOI","SVI" TO "DMDRS";

4.5 环境变量配置

DMDRS服务运行过程中,需要使用数据库的驱动文件进行数据库访问。
在终端的配置文件中添加设置环境变量命令。

vi ~/.bash_profile

添加以下内容:

export LD_LIBRARY_PATH=/home/dmdba/dmdbms/drivers/dpi:$LD_LIBRARY_PATH

执行source ~/.bash_profile使环境变量生效,通过echo $LD_LIBRARY_PATH验证配置结果。

4.6 DRS核心配置

4.6.1 源端drs.xml配置

源端drs.xml配置文件路径为/home/dmdba/dmdrs5/bin/drs.xml,完整配置如下:

<?xml version="1.0" encoding="GB18030"?> <drs> <base> <mgr_port>5345</mgr_port> <siteid>142</siteid> <lang>ch-utf8</lang> <mem_size>4</mem_size> </base> <cpt> <name>CPT</name> <login> <dbtype>DM8</dbtype> <server>DM8DSC</server> <user>SYSDBA</user> <pwd>SYSDBA</pwd> <port>5236</port> </login> <ddl_mask>TABLE:ALTER</ddl_mask> <char_code>GB18030</char_code> <log_buf_size>512</log_buf_size> <rac> <nodes>2</nodes> <rac_type>ASM</rac_type> <login> <server>127.0.0.1</server> <user>default</user> <pwd>default</pwd> <port>9351</port> </login> <dir_replace> <item>0#+DMARCH/ARCH0</item> <item>1#+DMARCH/ARCH1</item> </dir_replace> </rac> <send> <ip>192.168.49.131</ip> <port>5345</port> <target_name>EXEC</target_name> <map> <item>*.*==*.*</item> </map> </send> <load> <enable>1</enable> <threads>64</threads> </load> </cpt> </drs>

4.6.2 目的端drs.xml配置

目的端drs.xml配置文件路径为/home/dmdba/dmdrs5/bin/drs.xml,完整配置如下:

<?xml version="1.0" encoding="GB18030"?> <drs> <base> <mgr_port>5345</mgr_port> <siteid>141</siteid> <lang>ch-utf8</lang> </base> <exec> <name>EXEC</name> <login> <dbtype>DM8</dbtype> <server>192.168.49.131</server> <user>SYSDBA</user> <pwd>SYSDBA</pwd> <port>5236</port> </login> <char_code>GB18030</char_code> <threads>60</threads> <load_threads>60</load_threads> <seq_sync_mode>1</seq_sync_mode> <cvt_dir>/home/dmdba/dm/dmdrs5/bin/cvt_1</cvt_dir> <group> <item> <id>0</id> <merge_policy>0</merge_policy> <error_policy>1</error_policy> <desc> <table>*.*</table> </desc> <max_thrs>64</max_thrs> </item> </group> </exec> </drs>

4.6.3 CVT脚本配置

在源DMDRS配置文件的cpt标签或目标DMDRS配置文件的exec标签下增加cvt_dir参数,用来指定CVT脚本所在的目录,DMDRS会解析该目录下所有名称合法的文件。
同步源端过程数据基本配置如下:

TABLE "TEST"."*"
DECLARE
    old_cols     DRS.DML_COL_ARR;
    new_cols    DRS.DML_COL_ARR;    
    i			INT;
BEGIN
    IF op.#OP = 'UPDATE' THEN    
        old_cols := op.GET_OLDCOLS();  
        new_cols := op.GET_NEWCOLS();
		 op.#op := 'INSERT';
        op.set_cols(old_cols);
		op.ADD_COL('BIAOSHI', 'D');
	   DRS_SYS.PUT_LINE('NEW SQL is : '|| op.#SQL);
	    op.exec();	
	    op.#op :='INSERT';
	    op.SET_COLS(new_cols);
	    FOR i IN 1..old_cols.SIZE LOOP  
	        if op.HAS_NEWCOL(old_cols[i].NAME) = 0 then
                op.ADD_COL(old_cols[i].NAME, old_cols[i].VALUE);
            end if;
        end loop; 
        op.ADD_COL('BIAOSHI', 'I');
	 DRS_SYS.PUT_LINE('NEW SQL is : '|| op.#SQL);
	    op.exec();	    
	    return;
    ELSIF op.#OP ='INSERT' THEN
	     op.ADD_COL('BIAOSHI', 'I');
	 DRS_SYS.PUT_LINE('NEW SQL is : '|| op.#SQL);
         op.exec();
         return;
    ELSIF op.#OP ='DELETE'  THEN
        old_cols := op.GET_OLDCOLS();
        op.#OP := 'INSERT';	
        op.SET_COLS(old_cols);
		op.ADD_COL('BIAOSHI', 'D');
	 DRS_SYS.PUT_LINE('NEW SQL is : '|| op.#SQL);
        op.exec();
        return;
    ELSE
	     RETURN;
    END IF; 
END;

脚本逻辑说明:

  1. 对UPDATE操作拆分为两条INSERT操作,分别记录更新前后的数据状态;
  2. 对INSERT操作直接添加新增标识;
  3. 对DELETE操作转换为INSERT操作并添加删除标识;
  4. 补全UPDATE操作中未被修改的字段,确保数据完整性。

4.7 DRS服务启动

1.启动源端drs服务

cd /home/dmdba/dmdrs5/bin ./DrsService start

2.启动目标端drs服务

cd /home/dmdba/dmdrs5/bin ./DrsService start

目标DMDRS服务中的EXEC模块默认是自动启动,如需关闭EXEC模块,可输入stop命令。
3.启动源端控制台

./drcsl cpt.xml

4.连接源DMDRS服务

CSL> connect

5.设置lsn

CSL> alter cpt set lsn

6.装载TEST模式字典

CSL>alter cpt add dict  "sch.name='TEST'"

7.启动cpt

CSL> start

5. 测试验证

INSERT操作验证

在源端执行INSERT操作:

INSERT INTO TEST.T1 VALUES (1,'zhangsan');

在目的端执行查询:

SELECT ID, NAME, BIAOSHI, OP_TIME, ZIZENG FROM TEST.T1;

image.png

UPDATE操作验证

在源端执行UPDATE操作:

UPDATE TEST.T1 SET NAME = 'lisi' WHERE ID = 1;

在目的端执行查询:

SELECT ID, NAME, BIAOSHI, OP_TIME, ZIZENG FROM TEST.T1 WHERE ID = 1 ORDER BY ZIZENG;

预期结果:查询到2条数据,旧数据BIAOSHI为'D',新数据BIAOSHI为'I',ZIZENG按入库顺序递增。
image.png

5.2.3 DELETE操作验证

在源端执行DELETE操作:

DELETE FROM TEST.T1 WHERE ID = 1;

在目的端执行查询:

SELECT ID, NAME, BIAOSHI, OP_TIME, ZIZENG FROM TEST.T1 WHERE ID = 1 ORDER BY ZIZENG;

预期结果:新增1条数据,BIAOSHI为'D',保留删除前的完整数据。
image.png

6. 常见问题与故障排查

6.1 字符串截断报错

故障现象

使用DMDRS进行DM到DM装载数据时,目的端报错:字符串截断。

根因分析

源端与目的端表字段长度不匹配,或字符编码差异导致数据长度超出目的端字段限制。

解决方案

在目的端配置文件drs.xmlexec节点下添加以下参数:

<char_length_expand>4</char_length_expand>

参数说明:同步表列精度需要扩大的倍数,取值范围为0~10,默认值为0。配置后DRS会自动扩大目的端字段长度,避免字符串截断。

6.2 执行端内存不足报错

故障现象

配置Oracle到DM(或DM到DM)的DRS数据装载时,报错:执行端内存不足。

根因分析

DRS服务程序运行时申请的内存超过配置的最大内存上限,导致内存不足。

解决方案

在目的端drs.xmlbase节点下调整mem_size参数:

<mem_size>8</mem_size>

参数说明:服务程序运行时可以申请的最大内存,取值范围0~255(单位:GB),默认值为16。需根据服务器实际内存配置,避免参数值超过物理内存导致系统OOM。

6.3 动态库文件加载失败(DRS-3009)

故障现象

配置DM到DM的DRS数据同步,源端启动DRS服务时报错:DRS-3009 动态库文件加载失败 library_path: libz.so。

根因分析

DRS启动时无法找到依赖的libz.so动态库,不同DM版本的libz.so依赖库路径存在差异。

解决方案

  1. 首先通过find命令查找libz.so的实际路径:
[root@db1 ~]# find / -name libz.so /home/dmdba/dmdbms/bin/dependencies/libz.so
  1. 将找到的路径添加到LD_LIBRARY_PATH环境变量中:
vi ~/.bash_profile # 添加以下内容 export LD_LIBRARY_PATH=/home/dmdba/dmdbms/bin/dependencies:$LD_LIBRARY_PATH # 使环境变量生效 source ~/.bash_profile
  1. 重启DRS服务,验证启动是否正常。

6.4 DRS服务启动失败

故障现象

执行./DrsService start后服务无法启动,日志提示“无法加载数据库驱动”。

根因分析

LD_LIBRARY_PATH环境变量未正确配置,DRS无法找到DM8驱动文件。

解决方案

  1. 验证环境变量配置是否正确,驱动文件路径是否存在;
  2. 执行source ~/.bash_profile使环境变量生效;
  3. 确认驱动文件权限为dmdba所有。

6.5 数据同步失败可能是一下原因导致

  1. 源端未开启归档/逻辑日志,或未重启数据库生效;
  2. 源端DRS捕获模块未启动,或未装载模式字典;
  3. 源端与目的端网络不通,5345端口未开放;
  4. drs.xml中同步对象配置错误。

6.6 CVT脚本执行报错

故障现象

DRS日志提示“列BIAOSHI不存在”。

根因分析

目的端表未提前创建BIAOSHI列,或脚本中列名书写错误。

解决方案

  1. 验证目的端表结构是否新增过程数据字段;
  2. 检查CVT脚本中ADD_COL的列名与表结构是否一致,注意大小写敏感。
评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服