注册
DMDRS到KAFKA单向同步
培训园地/ 文章详情 /

DMDRS到KAFKA单向同步

眰恦 2025/11/03 135 0 0

DMDRS到KAFKA单向同步

环境规划

数据库规划

image.png

DMDRS规划

image.png

端口规划

image.png

安装jdk

本次使用的是centos7虚拟机,该虚拟机自带jdk环境,若使用其他虚拟机不自带jdk环境,则需要自行安装并设置环境变量,通过java -version能够查询到jdk版本信息说明已经部署好jdk环境
image.png
如果服务器不自带jdk,则在https://www.oracle.com/java/technologies/downloads/#java8链接中下载需要的jdk安装包,随后进行解压,编辑环境变量即可

vim ~/.bashrc

export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_441
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

source ~/.bashrc

安装kafka

下载安装包

通过https://kafka.apache.org/downloads下载需要的kafka安装包,上传到服务器并解压即可,此次使用的安装包为 kafka_2.12-3.1.2.tgz。
解压命令:

tar -zxvf kafka_2.12-3.1.2.tgz

image.png
image.png

创建kafka和zookeeper日志存放路径

[root@localhost opt]# cd kafka_2.12-3.1.2/
[root@localhost kafka_2.12-3.1.2]# mkdir -p logs/server
[root@localhost kafka_2.12-3.1.2]# mkdir logs/zookeeper

修改service.properties文件

cd /opt/kafka_2.12-3.1.2/config
ls -ll

image.png
修改如下内容

broker.id=0
port=9092
host.name=192.168.153.146
log.dirs=/opt/kafka_2.12-3.1.2/logs/server
zookeeper.connect=192.168.153.146:2181

修改zookeeper.properties文件

dataDir=/opt/kafka_2.12-3.1.2/zookeeper
dataLogDir=/opt/kafka_2.12-3.1.2/logs/zookeeper
clientPort=2181
maxClientCnxns=100
tickTime=200
initLimit=10

创建启动服务脚本

[root@localhost bin]# cd /opt/kafka_2.12-3.1.2/bin
[root@localhost bin]# vi kafkaStart.sh 
添加下面的内容:
#!/bin/bash
nohup /opt/kafka_2.12-3.1.2/bin/zookeeper-server-start.sh /opt/kafka_2.12-3.1.2/config/zookeeper.properties 1>& /opt/kafka_2.12-3.1.2/logs/zookeeper/zookeeper.log &
sleep 3
nohup /opt/kafka_2.12-3.1.2/bin/kafka-server-start.sh /opt/kafka_2.12-3.1.2/config/server.properties 1>& /opt/kafka_2.12-3.1.2/logs/server/kafka.log &
[root@localhost bin]# ./kafkaStart.sh start

创建topic

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.153.146:9092 --create --replication-factor 1 --partitions 1 --topic test
Created topic test.

查询topic主题

[root@localhost bin]# ./kafka-topics.sh --list --bootstrap-server 192.168.153.146:9092
test

输入一些信息,然后启动消费者查询观察是否能够收到生产者发送的消息

[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.153.146:9092 --topic test
>test
>product
>id
>exit
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.153.146:9092 --topic test --from-beginning
test
product
id
exit

源端、目的端安装DMDRS

[dmdba@localhost opt]$ ./dmdrs_rev191718_x86_rh6_64_20250619.bin -i
Extract install files......... 

Please select the installer's language (E/e:English C/c:Chinese)[E/e]:c
-----------欢迎使用DMDRS 安装工具-----------
输入[exit]可退出安装。
-----------安装目录-----------
指定安装目录[/home/dmdba/dmdrs5]:
该路径不为空,是否继续安装?(Y/y or N/n)[N/n]:y
-----------安装组件-----------
请选择需要安装的组件
1.安装达梦数据融合管理平台
2.安装代理
请选择安装组件数字序号(使用空格间隔):1 2
选择的组件有:
1.安装达梦数据融合管理平台
2.安装代理
确认?[Y/y(确认选择) or N/n(重新选择)]:y
-----------许可证文件-----------
1.免费试用达梦数据复制软件(必须在试用期范围内使用,反复安装无效,使用时间为3个月)
2.使用许可证文件
指定许可证文件(1,2)[1]:2
请输入许可证文件路径:/opt/dmdrsCB01900614.key
许可证文件限制信息如下:
有效日期:2025-11-25
版本类型:试用版
许可证编号:CB01900614
授权顾客名称:达梦公司产品试用
项目名称:达梦公司产品试用
授权数据库类型:DM6、DM7、DM8、Oracle、SQL Server、MySQL、DB2、PostgreSQL
当前默认许可证文件支持数据库类型为[DM6、DM7、DM8、Oracle、SQL Server、MySQL、DB2、PostgreSQL],是否继续安装?[Y/y or N/n]:y
-----------配置-----------
依赖环境配置 -数据库动态库路径[/opt/dm6/bin]:/home/dmdba/dmdbms/bin
注意:代理配置IP不允许设置成127.0.0.1!
代理配置-代理IP(192.168.153.134):192.168.153.134
代理配置-代理端口[19345]:
达梦数据融合管理平台配置-管理平台端口[8080]:8085
注意:密码必须至少8个字符,并且满足以下条件中的任意两项:数字、大写字母、小写字母、特殊字符(如!@#$%^&*等)。
达梦数据融合管理平台配置-管理平台密码:
达梦数据融合管理平台配置-管理平台确认密码:
是否使用外置库(0:不使用  1:使用 )[0]:0
-----------安装小结-----------
安装目录:[/home/dmdba/dmdrs5]
依赖环境配置-NEED_LIB_PATH:[/home/dmdba/dmdbms/bin]
代理IP:[192.168.153.134]
代理端口:[19345]
达梦数据融合管理平台IP:[127.0.0.1]
达梦数据融合管理平台端口:[8085]
内置库信息:
数据库IP:[127.0.0.1]
数据库端口:[15236]
用户名:[SYSDBA]
密码:[******]
所需磁盘空间/可用磁盘空间:[2,699 MB/34,930 MB]
确认安装?[Y/y or N/n]:y
-----------安装中-----------
server start ...    server finished.
default start ...    default finished.
agent start ...    agent finished.
web start ...    web finished.
doc start ...    doc finished.
db start ...    db finished.
安装成功
-----------系统服务-----------
内置数据库服务设置
1.注册系统服务
2.不注册系统服务
启动方式(1,2)[2]:2
正在创建内置数据库服务....
达梦数据融合管理平台服务设置
1.注册系统服务
2.不注册系统服务
启动方式(1,2)[2]:2
正在创建达梦数据融合管理平台服务....
达梦数据融合管理平台代理服务设置
1.注册系统服务
2.不注册系统服务
启动方式(1,2)[2]:2
正在创建达梦数据融合管理平台代理服务....
正在启动服务[DmServiceDFDB]
正在启动服务[DfmWebService]
正在启动服务[DfmAgentService]
-----------安装总结-----------
达梦数据复制软件V5安装完成
地址:http://127.0.0.1:8085
用户名/密码:admin/******
更多安装信息,请查看安装日志文件:/home/dmdba/dmdrs5/log/install.log


源端数据库配置

源端实例配置

[dmdba@localhost bin]$ ./dminit path=/dmdata/data instance_name=KAFKA db_name=KAFKA page_size=16 extent_size=16 log_size=2048 port_num=5266 sysdba_pwd=DMwzy6870 sysauditor_pwd=DMwzy6870
initdb V8
db version: 0x7000d
license[/opt/dm6/bin/dm.key] version[1.00] error
file dm.key not found, use default license!
License will expire on 2026-04-30
Normal of FAST
Normal of DEFAULT
Normal of RECYCLE
Normal of KEEP
Normal of ROLL

 log file path: /dmdata/data/KAFKA/KAFKA01.log


 log file path: /dmdata/data/KAFKA/KAFKA02.log

write to dir [/dmdata/data/KAFKA].
create dm database success. 2025-10-11 11:15:10

注册服务

[root@localhost opt]# cd /home/dmdba/dmdbms/script/root/
[root@localhost root]# ./dm_service_installer.sh -t dmserver -dm_ini /dmdata/data/KAFKA/dm.ini -p KAFKA
Created symlink from /etc/systemd/system/multi-user.target.wants/DmServiceKAFKA.service to /usr/lib/systemd/system/DmServiceKAFKA.service.
创建服务(DmServiceKAFKA)完成

开启归档及逻辑日志

[dmdba@localhost ~]$ cd /dmdata/data/KAFKA/
[dmdba@localhost KAFKA]$ vi dm.ini
ARCH_INI       =1
RLOG_APPENG_LOGIC=1

[dmdba@localhost KAFKA]$ vi dmarch.ini
[ARCHIVE_LOCAL1]
ARCH_TYPE=LOCAL
ARCH_DEST=/dmdata/arch
ARCH_FILE SIZE=1024
ARCH_SPACE LIMIT=4096

启动数据库服务

image.png

创建表空间及测试用户

CREATE TABLESPACE KAFKA DATAFILE '/dmdata/data/KAFKA/KAFKA.DBF' SIZE 1024 AUTOEXTEND ON NEXT 2048;
CREATE USER KAFKA IDENTIFIED BY DMwzy6870 DEFAULT TABLESPACE KAFKA DEFAULT INDEX TABLESPACE KAFKA;
GRANT DBA TO KAFKA;

配置辅助表和DDL

./disql SYSDBA/DMwzy6870@localhost:5266
start /home/dmdba/dmdrs5/bin/scripts/ddl sql dm8.sql

检查表和触发器是否创建成功

select owner,trigger_name from dba_triggers where owner = 'SYSDBA' and trigger_name like 'DRS_$%' and status ='Y';

image.png

select owner,table_name from dba_tables where owner = 'SYSDBA' and table_name like 'DRS_$%' and status ='VALID';

image.png
如果存在上面的表和触发器,说明表和触发器创建成功

源端DRS配置

配置xml文件

[dmdba@localhost bin]$ vi cpt.xml
添加如下内容:
<?xml version="1.0" encoding="GB18030"?>
<drs>
    <base>
        <mgr_port>5347</mgr_port>
        <siteid>100</siteid>
    </base>
    <cpt>
        <name>cpt_dm8</name>
        <login>
            <dbtype>dm8</dbtype>
            <server>192.168.153.134</server>
            <user>SYSDBA</user>
            <pwd>DMwzy6870</pwd>
            <port>5266</port>
            <char_code>GB18030</char_code>
            <ddl_mask>OBJ:OP</ddl_mask>
        </login>
        <send>
            <ip>192.168.153.146</ip>
            <port>5347</port>
            <target_name>exec_kafka</target_name>
            <map>
                <item>KAFKA.*==KAFKA.*</item>
            </map>
        </send>
    </cpt>
</drs>

配置后台服务

[dmdba@localhost bin]$ cp service_template/TemplateService ./DrsServiceCPT
[dmdba@localhost bin]$ vi DrsServiceCPT 
修改如下内容:
#REPLACE INSTALL_HOME path
INSTALL_HOME=/home/dmdba/dmdrs5
#REPLACE program dir
PROG_DIR=/home/dmdba/dmdrs5/bin
#REPLACE program config path
#If drs server is BP node and wants to startup without drs.xml,please specify the running port in CONF_PATH,for example,CONF_PATH="-port 5345"
CONF_PATH=/home/dmdba/dmdrs5/bin/cpt.xml
#REPLACE need library path, LD_LIBRARY_PATH/LIBPATH
NEED_LIB_PATH=/home/dmdba/dmdbms/bin:/home/dmdba/dmdrs5/bin

#REPLACE program name, drsvr/dssvr/dvsvr
EXEC_PROG_NAME=drsvr
#REPLACE service type,  drs server/dss server/dvs server
SERVICE_TYPE_NAME=drs server


目的端配置

配置 json_format.ini 文件

[dmdba@localhost bin]$ vi json_format.ini
添加下面的内容:
#DMDRS kafka json format Configuration file

#this is comments

#common format control parameters

OP_TIME_FORMAT = (yyyy-mm-dd hh:mi:ss)

CUR_TIME_FORMAT = (yyyy-mm-dd hh:mi:ss)

NULL_FORMAT = "null"

SET_QUOTA = 0

SET_ROWID_COL = 0

NEED_CRLF = 1

CHAR_REPLACE =(",\"),(\,\\)

OLD_VALUES = ALL
NEW_VALUES = ALL
OLD_LOB_FLAG = "empty_lob()"
JSON_FORMAT_INS={ 
    "table":"#SCHEMA.#TABLE",
    "op_type":"#OP_TYPE",
    "op_ts":"#OP_TIME",
    "current_ts":"#TIME",
    "pos":"#POS",
    "primary_keys":[#PRIMARY_KEY],
    "after":{#NEW_VALUES}
} 
JSON_FORMAT_UPD={ 
    "table":"#SCHEMA.#TABLE",
    "op_type":"#OP_TYPE",
    "op_ts":"#OP_TIME",
    "current_ts":"#TIME",
    "pos":"#POS",
    "primary_keys":[#PRIMARY_KEY],
    "before":{#OLD_VALUE},
    "after":{#NEW_VALUE}
} 
JSON_FORMAT_DEL={ 
    "table":"#SCHEMA.#TABLE",
    "op_type":"#OP_TYPE",
    "op_ts":"#OP_TIME",
    "current_ts":"#TIME",
    "pos":"#POS",
    "primary_keys":[#PRIMARY_KEY],
    "before":{#OLD_VALUE}
}
JSON_FORMAT_DDL={
    "table":"#SCHEMA.#TABLE",
    "op_type":"#OP_TYPE",
    "op_ts":"#OP_TIME",
    "current_ts":"#TIME",
    "pos":"#POS",
    "after":
    ("DB_TYPE":"ORA",
    "OBJECT_TYPE":"#OP_TYPE",
    "OBJECT_NAME":"#SCHEMA.#TABLE",
    "DDL_TEXT":"#DDL_SQL")
}

配置producer.properties文件

[dmdba@localhost bin]$ vi producer.properties
添加下面的内容:
bootstrap.servers=192.168.153.146:9092

配置xml文件

[dmdba@localhost bin]$ vi exec.xml
添加下面的内容:
<?xml version="1.0" encoding="GB18030"?>
<drs>
    <base>
        <mgr_port>5347</mgr_port>     
        <siteid>2</siteid>
    </base>
    <exec>
        <name>exec_kafka</name>
        <login>
            <dbtype>kafka</dbtype>
            <producer_properties>/home/dmdba/dmdrs5/bin/producer.properties</producer_properties>
        </login>
            <group>
                <item>
                   <id>0</id>
                   <topic_name>test</topic_name>
                   <json_format_path>/home/dmdba/dmdrs5/bin/json_format.ini</json_format_path>
                </item>
            </group>
    </exec>
</drs>

配置后台服务

[dmdba@localhost bin]$ vi DrsServiceEXEC

#REPLACE INSTALL_HOME path
INSTALL_HOME=/home/dmdba/dmdrs5
#REPLACE program dir
PROG_DIR=/home/dmdba/dmdrs5/bin
#REPLACE program config path
#If drs server is BP node and wants to startup without drs.xml,please specify the running port in CONF_PATH,for example,CONF_PATH="-port 5345"
CONF_PATH=/home/dmdba/dmdrs5/bin/exec.xml
#REPLACE need library path, LD_LIBRARY_PATH/LIBPATH
NEED_LIB_PATH=/home/dmdba/dmdbms/bin:/home/dmdba/dmdrs5/bin

#REPLACE program name, drsvr/dssvr/dvsvr
EXEC_PROG_NAME=drsvr
#REPLACE service type,  drs server/dss server/dvs server
SERVICE_TYPE_NAME=drs server

启动服务,观察同步情况

源端启动服务

[dmdba@localhost bin]$ ./DrsServiceCPT start
./DrsServiceCPT: line 43: server: command not found
Starting DrsServiceCPT:                                    [ OK ]

目的端启动服务

[root@localhost bin]# ./DrsServiceEXEC start
./DrsServiceEXEC: line 43: server: command not found
Starting DrsServiceEXEC: Last login: Sat Oct 11 11:00:46 CST 2025 on pts/0
                                                           [ OK ]

源端创建测试表

CREATE TABLE TEST(ID INT ,NAME VARCHAR(50));
INSERT INTO TEST VALUES(1,'WANG');
INSERT INTO TEST VALUES(2,'ZHANG');
INSERT INTO TEST VALUES(3,'LI');
INSERT INTO TEST VALUES(4,'LIU');

源端启动同步

[dmdba@localhost bin]$ ./drcsl cpt.xml
CSL[INFO]:  CONSOLE TOOL DRS5: V5.2.3.3-Build(2025.06.17-191718_trunc_sp1)_64_2506
CSL[WARN]:  mem_size参数配置过大,超过系统剩余内存大小 mem_size: 16G, total_free_size: 1G
CSL> CONNECT
CSL[INFO]:  [INPUT CMD: CONNECT]
CSL> alter cpt_dm8 set lsn
CSL[INFO]:  [INPUT CMD: alter cpt_dm8 set lsn]
MGR[INFO]:  成功获取模块的起始SCN cpt name: cpt_dm8, LSN: 49933
命令执行成功 
CSL> start
CSL[INFO]:  [INPUT CMD: start]
MGR[INFO]:  模块正在启动 module: cpt_dm8 
MGR[INFO]:  CPT准备就绪 type: dm8 cpt
MGR[INFO]:  模块已经处于运行状态 module: cpt_dm8 
命令执行成功 
CSL> alter cpt_dm8 add table "sch.name='KAFKA'"
CSL[INFO]:  [INPUT CMD: alter cpt_dm8 add table "sch.name='KAFKA'"]
MGR[INFO]:  获取到装载掩码组合 mask: |CREATE|INSERT|INDEX|TABLE|CHECK|GROUP
命令执行成功 

查看目的端KAFKA结果

image.png
image.png
目的端KAFKA四条数据已经被加载成功

源端插入数据,验证目的端KAFKA情况

INSERT INTO TEST VALUES(5,'ZHAO');

image.png
目标端KAFKA更新成功
至此,DMDRS到KAFKA的单向同步搭建成功

评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服