注册
DMHS同步DM8至kafka
技术分享/ 文章详情 /

DMHS同步DM8至kafka

XGQ 2024/01/31 1424 0 0

1.部署规划
image.png
2.源端配置
2.1 安装DMHS
安装步骤省略
2.2 环境变量配置

export PATH
export DMHS_HOME=/home/dmdba/dmhs
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/dmdba/dmhs/bin
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/home/dmdba/dmdbms/bin"
export DM_HOME="/home/dmdba/dmdbms"
export PATH=$PATH:$DM_HOME/bin:$DMHS_HOME/bin

2.3开启归档与逻辑日志

ALTER DATABASE ARCHIVELOG;
ALTER DATABASE ADD ARCHIVELOG 'DEST=/home/dmdba/dmdbms/data/DAMENG/arch, TYPE=LOCAL, FILE_SIZE=1024, SPACE_LIMIT=51200';
ALTER DATABASE OPEN;

SQL> select value from v$parameter where name = 'RLOG_APPEND_LOGIC';
SQL> sp_set_para_value(1,'RLOG_APPEND_LOGIC',1);

2.4配置DMHS

<dmhs>
        <base>
                <lang>ch</lang>
                <mgr_port>5345</mgr_port>
                <ckpt_interval>60</ckpt_interval>
                <siteid>1</siteid>
                <version>2.0</version>
        </base>
        <cpt>
                <db_type>dm8</db_type>
                <db_server>192.168.18.55</db_server>
                <db_port>5236</db_port>
                <db_user>SYSDBA</db_user>
                <db_pwd>SYSDBA</db_pwd>
                <ddl_mask>0</ddl_mask>
                <char_code>PG_UTF8</char_code>
                <arch>
                        <clear_interval>600</clear_interval>
                        <clear_flag>0</clear_flag>
                </arch>
                <send>
                        <ip>192.168.18.56</ip>
                        <mgr_port>5445</mgr_port>
                        <data_port>5446</data_port>
                        <trigger>0</trigger>
                        <constraint>0</constraint>
                        <identity>1</identity>
                        <filter>
                                <enable>
                                  <item>XGQ.*</item>
                                </enable>
                        </filter>
                        <map>
                        </map>
                </send>
        </cpt>
</dmhs>

2.5 检查依赖

	linux-vdso.so.1 =>  (0x00007fff57168000)
	libc.so.6 => /lib64/libc.so.6 (0x00007f186fcb2000)
	libm.so.6 => /lib64/libm.so.6 (0x00007f186f9af000)
	librt.so.1 => /lib64/librt.so.1 (0x00007f186f7a7000)
	libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f186f58b000)
	libdl.so.2 => /lib64/libdl.so.2 (0x00007f186f386000)
	libdmhs_pub.so => ./libdmhs_pub.so (0x00007f186ece2000)
	libdmhs_net.so => ./libdmhs_net.so (0x00007f186ea7c000)
	libdmhs_ld_dm8.so => ./libdmhs_ld_dm8.so (0x00007f186e7f8000)
	libdmhs_ucvt.so => ./libdmhs_ucvt.so (0x00007f186e29b000)
	libdmoci.so => ./libdmoci.so (0x00007f186d52f000)
	/lib64/ld-linux-x86-64.so.2 (0x00005591c4e9d000)
	libdmhs_bool_parse.so => ./libdmhs_bool_parse.so (0x00007f186d31e000)
	libstdc++.so.6 => /lib64/libstdc++.so.6 (0x00007f186d016000)
	libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x00007f186cdff000)

2.6启动DMHS

MGR[WARN]: License will expire on 2023-12-16
MGR[INFO]: 成功加载配置文件,站点号:1, 管理端口:5345, 轮询间隔:3, 最大内存设置:64(GB)
MGR[INFO]: 正在初始化分析模块...
PUB[INFO]: set enable_directio = 0
MGR[INFO]: 正在加载DM8日志分析模块...
MGR[INFO]: 管理 服务正在监听管理端口:5345
CPT[INFO]: DM8_V4.3.20_D64
CPT[INFO]: 发送队列长度(send_lst)为:3
CPT[INFO]: DM8是否为UNICODE库: TRUE
CPT[INFO]: DM8 parameter LENGTH_IN_CHAR: 0
CPT[INFO]: DM8 parameter 'DIRECT_IO': 0
CPT[INFO]: DDL同步方式:DMHS辅助表和触发器方式同步DDL
CPT[INFO]: DM8 log format is RLOG_PKG, arch: /home/dmdba/dmdbms/data/DAMENG/arch/ARCHIVE_LOCAL1_0x706FD896_EP0_2023-10-09_02-03-58.log, db_magic: 1886378134 - 1886378134
CPT[INFO]: DM8归档目录: /home/dmdba/dmdbms/data/DAMENG/arch 归档文件大小:1024 M
CPT[INFO]: dm8 log version flag is : 2, old: 0
CPT[INFO]: [0]获取归档文件数:2, START_ARCH_LSN: 0, START_FILE: null, VER: 1

3.目的端配置
3.1安装DMHS与kafka
安装过程略
3.2 配置环境变量

export DMHS_HOME=/home/dmdba/dmhs
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/dmdba/dmhs/bin

export JAVA_HOME=/home/jdk1.8.0_341
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$DMHS_HOME/bin 


3.3 配置DMSH


```<?xml version="1.0" encoding="GB2312"?>
<dmhs>
    <base>
        <lang>en</lang>
        <mgr_port>5445</mgr_port>
        <ckpt_interval>60</ckpt_interval>
        <siteid>2</siteid>
        <version>2.0</version>
    </base>
    <exec>
        <char_code>PG_UTF8</char_code>
        <recv>
            <data_port>5446</data_port>
        </recv>
        <exec_thr>4</exec_thr>
        <exec_sql>1024</exec_sql>
        <exec_trx>5000</exec_trx>
        <exec_rows>1000</exec_rows>
        <is_kafka>1</is_kafka>
        <json_format>file</json_format>
        <max_packet_size>16</max_packet_size>
        <enable_ddl>1</enable_ddl>
        <exec_policy>2</exec_policy>
    </exec>
</dmhs>

3.4 dmhs_kafka.properties

bootstrap.servers=192.168.18.56:9092
kafka.topic.name=DMKAFKA
json.format.check=1
print.message.num=1000
dmhs.min.batch.size=100
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
partitioner.class=com.dameng.dmhs.dmga.service.impl.OnePartitioner
acks=-1
max.request.size=5024000
batch.size=1048576
linger.ms=3
buffer.memory=134217728
retries=3
compression.type=none
max.in.flight.requests.per.connection=1
send.buffer.bytes=1048576
metadata.max.age.ms=300000

3.4配置 start_kafka_dmhs.sh

in/dmhs_kafka.properties

3.5json_format.ini

        BATCH_COMMIT                    = 0
        NEED_CRLF                       = 0
        OPCMD_LEN                       = 8
        SET_NULL                        = 1
        SET_QUOTA                       = 0
        CHAR_REPLACE                    = (",\"),(\,\\)
        ADD_TABLE_TOPIC                 = 0
        NEW_VALUES                      = ALL
        OLD_VALUES                      = ALL
        SET_TIME_EPOCH                  = 0
        SET_JSON_VERION                 = 2.0
        OLD_LOB_FLAG                    = --
        LOB_PIECE                       = 0
        CLOB_FORMAT                     = CHAR
        BLOB_FORMAT                     = BASE64
        JSON_FORMAT                     = {
        "version":"2.0",
        "schema":{
                "column":#CONCAT[#NAME_TYPE]#CUT_NULL,
                "pk":#CONCAT[#PRIMARY_KEY]#CUT_NULL,
                "source":{
                        "dbType":"DM8",
                        "schema":"#SCHEMA",
                        "table":"#TABLE"
                        }
                },
        "payload":{
                "before":#CONCAT
                {
                        "data":{#OLD_VALUES}
                }#CUT_NULL,
                "after":#CONCAT
                {
                        "data":{#NEW_VALUES}
                }#CUT_NULL,
                "scn":"#ROWID",
                "timestamp":{
                                "eventTime":"#OP_TIME"
                        },
                "op":"#OP_TYPE",
                "ddl":#CONCAT{
                        "text":"#DDL_SQL"
                }#CUT_NULL
        }
}

3.6准备Kafka依赖文件


3.7启动DMHS服务

[INFO ] 2023-10-10 18:32:53,067 method:com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService.loadLog4j(ExecDMHSKafkaService.java:118)
加载log4j配置成功!
MGR[INFO]: DMHS start up, current version: V4.3.20-Build(2023.09.16-140201trunc)_64_2309 (The beta)(Enterprise Edition)
MGR[WARN]: License will expire on 2023-12-16
MGR[INFO]: load config file successful,site no:2, manager port :5445, poll interval:3, max mem size:64(GB)
PUB[WARN]: NLS_LANG user env cannot find, use default UTF default values:AMERICAN_AMERICA.AL32UTF8
MGR[INFO]: loading the execute module...
EXE[INFO]: EXEC_V4.3.20_64
REV[INFO]: exec server data receiving thread created successfully, listening data port : 5446
MGR[INFO]: manager listening  port:5445
MGR[INFO]: Site 1 registration for 192.168.18.55 host node, unique_sign: 1696842116539
MGR[INFO]: extract min LSN of site 1...
MGR[INFO]: site 1 min LSN is: 40392

4.数据同步

(
"id" BIGINT NOT NULL,
"instance_code" VARCHAR(160),
"config_id" BIGINT NOT NULL,
"content" CLOB,
"creator" VARCHAR(2040),
"gmt_created" TIMESTAMP(6),
"level" INT DEFAULT 1) STORAGE(ON "MAIN", CLUSTERBTR) ;

DMHS> start exec
execute success

DMHS>  copy 0 "sch.name='XGQ'" dict|create|insert
copy mask is : |CREATE|INSERT|TABLE|DICT|PARTITION|OBJID|REP
执行完成,请查看执行模块日志,检查数据装载是否成功

MGR[INFO]: 发送命令到站点192.168.18.56:5445成功
PUB[WARN]: 使用字符集:PG_UTF8
CPT[INFO]: 加载离线字典表数为:1
CPT[INFO]: DDL同步方式:DMHS辅助表和触发器方式同步DDL
CPT[INFO]: 正在初始化表信息...
CPT[INFO]: 装载用户表的字典个数:2
CPT[INFO]: 正在初始化列信息...
CPT[INFO]: 正在初始化主键列信息...
CPT[INFO]: 正在初始化分区表信息...
CPT[INFO]: 正在保存字典信息...
LD[INFO]: 装载DM8的实例名: DMSERVER
LD[INFO]: DM8 parameter LENGTH_IN_CHAR: 0
LD[INFO]: 正在装载表 XGQ.ag_instance_configuration_snapshot(1:2), 装载类型:ALL
LD[INFO]: 表 XGQ.ag_instance_configuration_snapshot 生成查询语句:select ROWID,"id","instance_code","config_id","content","creator","gmt_created","level" FROM "XGQ"."ag_instance_configuration_snapshot"
LD[INFO]: INSERT:.XGQ.ag_instance_configuration_snapshot
LD[INFO]: lock table:lock table "XGQ"."ag_instance_configuration_snapshot"  in share mode nowait
LD[INFO]: 设置表.XGQ.ag_instance_configuration_snapshot的起始LSN为: 0
LD[WARN]: 设置表.XGQ.ag_instance_configuration_snapshot的起始LSN为: 40996
LD[INFO]: 表 [XGQ.ag_instance_configuration_snapshot], 完成装载:[起始lsn: 40996, 共 0 行, 用时: 23.781(ms)]
LD[INFO]: 表 XGQ.ag_instance_configuration_snapshot 完成装载,共 0 行
LD[INFO]: 正在装载表 XGQ.dm_kafka(2:2), 装载类型:ALL
LD[INFO]: 表 XGQ.dm_kafka 生成查询语句:select ROWID,"id","instance_code","config_id","content","creator","gmt_created","level" FROM "XGQ"."dm_kafka"
LD[INFO]: INSERT:.XGQ.dm_kafka
LD[INFO]: lock table:lock table "XGQ"."dm_kafka"  in share mode nowait
LD[INFO]: 设置表.XGQ.dm_kafka的起始LSN为: 0
LD[WARN]: 设置表.XGQ.dm_kafka的起始LSN为: 41005
LD[INFO]: 表 [XGQ.dm_kafka], 完成装载:[起始lsn: 41005, 共 0 行, 用时: 7.328(ms)]
LD[INFO]: 表 XGQ.dm_kafka 完成装载,共 0 行
SND[INFO]: 分析模块1正在发送映射规则...
SND[INFO]: 分析模块1正在获取站点192.168.18.56:5446上的最小LSN...
SND[INFO]: 分析模块1成功获取LSN:40785 LFS:0
LD[INFO]: 成功装载,总共有 2 张表被装载!
SND[INFO]: 日志压缩线程退出,ID: 1
SND[INFO]: 分析模块正在确认执行端已经执行完所有提交了的事务...
SND[INFO]: 192.168.18.56:5446到站点1的发送线程已经退出
启动cpt模块 并插入数据
insert into xgq.dm_kafka values(xxxx)


目的端查看数据

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic DMKAFKA --from-beginning
评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服