注册
dm-kafka同步
专栏/技术分享/ 文章详情 /

dm-kafka同步

打工人小赵 2025/10/11 59 0 0
摘要

1.1源端配置
1.1.1源端安装DRS
[dmdba@dw02 soft]$ ./dmdrs_rev200238_x86_rh6_64_20250922.bin -i
1.1.2开启归档及逻辑日志
##归档
主备及目标端均已开启归档,路径为/dbarch/dmarch

##开启逻辑附加日志
call sp_set_para_value(1,‘RLOG_APPEND_LOGIC’,1);
根据实际需求配置RLOG_APPEND_LOGIC的参数值,参数配置说明如下所示,以参数值设置为1为例。
1:如果有主键列,记录UPDATE和DELETE操作时只包含主键列信息,若没有主键列则包含所有列信息。
2:不论是否有主键列,记录UPDATE和DELETE操作时都包含所有列的信息。
3:记录UPDATE时包含更新列的信息以及ROWID,记录DELETE时只有ROWID。

#检查参数配置是否生效
select para_name, para_value, sess_value, file_value from V$DM_INI where para_name in ( ‘RLOG_APPEND_LOGIC’,‘ARCH_INI’);

1.1.3创建数据库同步用户DMDRS
CREATE TABLESPACE DMDRS DATAFILE ‘DMDRS.DBF’ SIZE 2048 AUTOEXTEND ON MAXSIZE 102400;
CREATE USER “DMDRS” IDENTIFIED BY “xxxx” DEFAULT TABLESPACE “DMDRS”;
GRANT “DBA” TO “DMDRS”;

1.1.4 源端目标端环境变量配置
##添加DRS及DPI
vi ~/.bash_profile
#添加

export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/home/dmdba/dm/dmdbms/bin:/home/dmdba/dm/dmdbms/drivers/dpi:/home/dmdba/dmdrs5/bin"
export DM_HOME="/home/dmdba/dm/dmdbms"
Export PATH=$PATH:$HOME/.local/bin:$HOME/bin:$DM_HOME/bin:/home/dmdba/dm/dmdbms/drivers/dpi:/home/dmdba/dmdrs5/bin

source ~/.bash_profile

1.1.5 配置DDL同步
在源数据库执行“ddl_sql_dm8.sql”脚本创建触发器和辅助表,脚本默认位置在DMDRS执行程序目录下的scripts子目录中。

SQL> set define off;
SQL> start ddl_sql_dm8.sql
检查创建的触发器和辅助表是否有效。

检查创建的触发器。

SQL> select owner, trigger_name from dba_triggers where owner = ‘SYSDBA’ and trigger_name like ‘DRS_$%’ and status = ‘Y’;
查询结果如下:

行号 OWNER TRIGGER_NAME


1 SYSDBA DRS_DDL_TRIGGER_AFTER 2 SYSDBA DRS_DDL_TRIGGER_BEFORE
3 SYSDBA DRS_DDL_TRIGGER_GRANT 4 SYSDBA DRS_DDL_TRIGGER_REVOKE
已用时间: 12.147(毫秒). 执行号:16.
如果存在以上查询结果,表示DMDRS触发器创建有效。

检查创建的辅助表。

SQL> select owner, table_name from dba_tables where owner = ‘SYSDBA’ and table_name like ‘DRS_$%’ and status = ‘VALID’;
查询结果如下:

行号 OWNER TABLE_NAME


1 SYSDBA DRS_DDL_COL 2 SYSDBA DRS_DDL_SQL
3 SYSDBA DRS_DDL_CONS 4 SYSDBA DRS_DDL_IDX
5 SYSDBA DRS_DDL_RENAME 6 SYSDBA DRS_DDL_SEQ
7 SYSDBA DRS_DDL_PART 8 SYSDBA DRS_DDL_COMMENT
9 SYSDBA DRS_$DDL_LOG
9 rows got
已用时间: 197.302(毫秒). 执行号:7.
如果存在以上查询结果,表示DMDRS辅助表创建成功。
1.1.6 配置dm_svc.conf
TIME_ZONE=(+8:00)
LANGUAGE=(cn)
DMDW=(192.168.182.135:5236,192.168.182.136:5236) #根据实际业务替换IP

[DMDW]
SWITCH_TIMES=(3)
SWITCH_INTERVAL=(100)
LOGIN_MODE=(1)

说明:
当源端数据库为DW主备集群,若源端CPT模块数据源只设置主库IP,当主备因故障发生切换时,源端cpt模块将无法访问数据库,配置服务名可避此问题。
当源端为单机时无需配置

1.1.7配置源端DRS配置文件
[dmdba@dw02 soft]$ cd /home/dmdba/dmdrs5/bin
[dmdba@dw02 bin]$ ls
1.1.8配置源端启动脚本
[dmdba@dw02 bin]$ cd /home/dmdba/dmdrs5/bin
[dmdba@dw02 bin]$ cp service_template/TemplateService ./DrsServiceCPT
修改DrsServiceCPT服务脚本相关配置
INSTALL_HOME=/home/dmdba/dmdrs5
PROG_DIR=/home/dmdba/dmdrs5/bin
CONF_PATH=/home/dmdba/dmdrs5/bin/cpt.xml
NEED_LIB_PATH=/cs/dmdbms/bin:/home/dmdba/dmdrs5/bin
EXEC_PROG_NAME=drsvr
SERVICE_TYPE_NAME=drs server
1.2目的端配置
1.2.1 安装JDK
Kafka需要java运行环境,需要安装JDK。
上传JDK安装包,并安装:
[root@dm3 ]# tar -zxvf jdk-8u60-linux-x64.tar.gz
[root@dm3]# mv jdk1.8.0_60/ /usr/local/src/java
修改环境变量:
[root@dm3 ~]# vim /etc/profile
添加:
export JAVA_HOME=/usr/local/src/java/
export PATH=JAVAHOME/bin:JAVA_HOME/bin:PATH
[root@dm3 ~]# source /etc/profile
查看java环境:
[root@dm3 ~]# java -version
1.2.2安装kafka
1、上传kafka安装包到home目录,并解压:
如需使用其他版本kafka,可通过http://kafka.apache.org/downloads下载
[root@dm3 data]#tar -zxvf kafka_2.12-3.1.2.tgz
2、修改server.properties文件
[root@dm3 data]# cd kafka_2.12-3.1.2/config/
[root@dm3 data]# ls

创建日志存放路径:
[root@dm3 data]# cd /data/kafka_2.12-3.1.2
[root@dm3 kafka_2.12-3.1.2]# mkdir -p logs/server
[root@dm3 kafka_2.12-3.1.2]# vim config/server.properties
修改如下内容:
broker.id=0
port=9092 #端口号
host.name=192.168.182.137 #单机可直接用localhost
log.dirs=/data/kafka_2.12-3.1.2/logs/server #日志存放路径可修改可不修改
zookeeper.connect=192.168.182.137:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
#因为本次使用的kafka版本中自带zookeeper,使用默认配置即可
3、修改zookeeper.properties文件
创建日志存放路径:
[root@dm3 kafka_2.12-3.1.2]# mkdir -p logs/zookeeper
[root@dm3 kafka_2.12-3.1.2]# vim config/zookeeper.properties
修改如下内容:
dataDir=/data/kafka_2.12-3.1.2/zookeeper数据目录 (可以修改可以不修改)
dataLogDir=/data/kafka_2.12-3.1.2/logs/zookeeper #zookeeper日志目录 (可以修改可以不修改)
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
4、启动zookeeper和kafka
[root@dm3 kafka_2.12-3.1.2]# cd /data/kafka_2.12-3.1.2/bin/
[root@dm3 bin]# vim kafkaStart.sh
添加:
#!/bin/bash
#启动zookeeper
nohup /data/kafka_2.12-3.1.2/bin/zookeeper-server-start.sh /data/kafka_2.12-3.1.2/config/zookeeper.properties 1>& /data/kafka_2.12-3.1.2/logs/zookeeper/zookeeper.log &
sleep 3 #默默等3秒后执行
#启动kafka
nohup /data/kafka_2.12-3.1.2/bin/kafka-server-start.sh /data/kafka_2.12-3.1.2/config/server.properties 1>& /data/kafka_2.12-3.1.2/logs/server/kafka.log &
授予脚本执行权限:
[root@dm3 bin]# chmod 755 kafkaStart.sh
执行启动脚本:
[root@dm3 bin]# ./kafkaStart.sh
5、创建topic
[root@dm3 bin]# ./kafka-topics.sh --bootstrap-server 192.168.182.137:9092 --create --replication-factor 1 --partitions 1 --topic test
创建成功后查询top主题
[root@dm3 bin]# ./kafka-topics.sh --list --bootstrap-server 192.168.182.137:9092

6、测试生成者和消费者
[root@dm3 bin]# ./kafka-console-producer.sh --broker-list 192.168.182.137:9092 --topic test
输入 一些信息,然后启动消费者查询是否可查看到生产者发送的信息

[root@dm3bin]#./kafka-console-consumer.sh --bootstrap-server 192.168.182.137:9092 --topic test --from-beginning

1.2.3目的端安装DRS
[dmdba@dm3 soft]$ ./dmdrs_rev200238_x86_rh6_64_20250922.bin -i
1.2.4目的端DRS配置json_format.ini文件
[dmdba@dm3 soft]$ cd /home/dmdba/dmdrs5/bin
[dmdba@dm3 bin]$ vim json_format.ini
#DMDRS kafka json format Configuration file
#this is comments
#common format control parameters
OP_TIME_FORMAT = (yyyy-mm-dd hh:mi:ss)
CUR_TIME_FORMAT = (yyyy-mm-dd hh:mi:ss)
NULL_FORMAT = “null”
SET_QUOTA = 0
SET_ROWID_COL = 0
NEED_CRLF = 1
CHAR_REPLACE =(","),(,\)
OLD_VALUES = ALL
NEW_VALUES = ALL
OLD_LOB_FLAG = “empty_lob()”
JSON_FORMAT_INS = {
“table”:"#SCHEMA.#TABLE",
“op_type”:"#OP_TYPE",
“op_ts”:"#OP_TIME",
“current_ts”:"#TIME",
“pos”:"#POS",
“primary_keys”:[#PRIMARY_KEY],
“after”:{#NEW_VALUES}
}
JSON_FORMAT_UPD = {
“table”:"#SCHEMA.#TABLE",
“op_type”:"#OP_TYPE",
“op_ts”:"#OP_TIME",
“current_ts”:"#TIME",
“pos”:"#POS",
“primary_keys”:[#PRIMARY_KEY],
“before”:{#OLD_VALUES},
“after”:{#NEW_VALUES}
}
JSON_FORMAT_DEL = {
“table”:"#SCHEMA.#TABLE",
“op_type”:"#OP_TYPE",
“op_ts”:"#OP_TIME",
“current_ts”:"#TIME",
“pos”:"#POS",
“primary_keys”:[#PRIMARY_KEY],
“before”:{#OLD_VALUES}
}
JSON_FORMAT_DDL = {
“table”:"#SCHEMA.#TABLE",
“op_type”:"#OP_TYPE",
“op_ts”:"#OP_TIME",
“current_ts”:"#TIME",
“pos”:"#POS",
“after”:
{“DB_TYPE”:“ORA”,
“OBJECT_TYPE”:"#OP_TYPE",
“OBJECT_NAME”:"#SCHEMA.#TABLE",
“DDL_TEXT”:"#DDL_SQL"}
}
1.2.5目的端DRS创建并修改producer.properties配置文件
[dmdba@dm3 bin]$ cd /home/dmdba/dmdrs5/bin
[dmdba@dm3 bin]$ vim producer.properties
bootstrap.servers=192.168.182.137:9092
当使用schema registry服务时,还需在producer.properties配置文件中配置schema.registry.url参数。
schema.registry.url=http://192.168.182.137:8089
1.2.6配置目的端DRS配置文件
[dmdba@dm3 bin]$ cd /home/dmdba/dmdrs5/bin
[dmdba@dm3 bin]$ vim exec.xml
<?xml version=“1.0” encoding=“GB18030”?>
<drs>
<base>
<mgr_port>5345</mgr_port>
<siteid>2</siteid>
<mem_size>1</mem_size>
</base>
<exec>
<name>exec_kafka</name>
<login>
<dbtype>kafka</dbtype>
<producer_properties>/home/dmdba/dmdrs5/bin/producer.properties</producer_properties>
</login>
<group>
<item>
<id>0</id>
<topic_name>OTHERS</topic_name>
<json_format_path>/home/dmdba/dmdrs5/bin/json_format.ini</json_format_path>
</item>
<item>
<id>1</id>
<topic_name>test</topic_name>
<json_format_path>/home/dmdba/dmdrs5/bin/json_format.ini</json_format_path>
</item>
</group>
</exec>
</drs>
1.2.7配置目的端启动脚本
[dmdba@dm3 bin]$ cd /home/dmdba/dmdrs5/bin
[dmdba@dm3 bin]$ cp service_template/TemplateService ./DrsServiceEXEC
修改DrsServiceEXEC服务脚本相关配置
INSTALL_HOME=/home/dmdba/dmdrs5
PROG_DIR=/home/dmdba/dmdrs5/bin
CONF_PATH=/home/dmdba/dmdrs5/bin/exec.xml
NEED_LIB_PATH=/home/dmdba/dmdbms/bin:/home/dmdba/dmdrs5/bin
EXEC_PROG_NAME=drsvr
SERVICE_TYPE_NAME=drs server

1.3前台方式启动源端和目的端服务,进行同步测试
1.3.1前台方式启动目的端服务
[dmdba@dm3 bin]$ cd /home/dmdba/dmdrs5/bin
[dmdba@dm3 bin]$ ./drsvr exec.xml

1.3.2前台方式启动源端服务
[dmdba@dw02 bin]cd/home/dmdba/dmdrs5/bin[dmdba@dw02bin]cd /home/dmdba/dmdrs5/bin [dmdba@dw02 bin] ./drsvr cpt_dm8.xml

1.3.3开启源端同步
[dmdba@dw02 bin]$ cd /home/dmdba/dmdrs5/bin
[dmdba@dw02 bin]$ ./drcsl cpt.xml
#连接DRS服务
CSL> connect
#设置数据库当前最新的日志LSN作为日志解析的起始位置
CSL> alter cpt_dm8 set lsn
#启动源DMDRS服务中的CPT模块
CSL> start
#将源数据库中DRS模式下所有的表添加同步,并在同步之前把源端表的全量数据装载到目标数据库。
CSL> alter cpt_dm8 add table “sch.name=‘DMDRS’”
1.3.4查看源端和目的端日志
1、源端日志
image.png

2、目的端日志
image.png
此时可通过日志看出同步数据正常

1.3.5到kafka端验证输出结果
此时消费者输出同步的表和数据,同步正常,无问题。
image.png

评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服