注册
DMHS同步DM8到kafka
专栏/技术分享/ 文章详情 /

DMHS同步DM8到kafka

zero 2023/11/13 1960 0 0
摘要

一、环境信息

dmhs版本:dmhs_V4.3.14
数据库版本:DM8
image.png

1.1 数据库配置

安装略
开启归档

alter database mount;
alter database archivelog;
alter database add archivelog 'DEST=/data/dmdata/dmarch, TYPE=LOCAL, FILE_SIZE=512, SPACE_LIMIT=102400';
alter database open;

开启归档逻辑日志
用于增量捕获数据同步,建议开启2
image.png

sp_set_para_value(1,'RLOG_APPEND_LOGIC',2)

配置DDL同步
导入脚本,在dmhs安装后目录下

disql SYSDBA/SYSDBA -C "set define off" \`/home/dmdba/dmhs/scripts/ddl_sql_dm8.sql

二、安装dmhs软件

源端

./dmhs_V4.3.08_dm8_rev127399_FTarm_kylin4_64_veri_20230407_sp8.bin  -i

目的端
可直接copy源端dmhs目录

三、同步配置

同步配置需要配置如下配置文件,这里自己编写配置文件方式
源端:dmhs.hs 源端同步信息配置
目的端 dmhs.hs 目的端同步信息配置
dmhs_kafka.properties 连接kafka配置信息填写,
更多配置信息,可查看管理员手册和参考手册

3.1源端dmhs.hs配置

dmhs/bin目录下

<?xml version="1.0" encoding="GB2312" standalone="no"?>
<dmhs>
<base>
        <lang>ch</lang>
        <mgr_port>5345</mgr_port>
        <ckpt_interval>60</ckpt_interval>
        <siteid>1</siteid>
<version>2.0</version>
</base>
<cpt>
        <db_type>DM8</db_type>
        <db_server>xx.45</db_server>
        <db_user>SYSDBA</db_user>
        <db_pwd>SYSDBA</db_pwd>
        <db_port>5236</db_port>
        <idle_time>10</idle_time>
        <parse_thr>1</parse_thr>
        <ddl_mask>OP:OBJ:REC</ddl_mask>
        <char_code>PG_UTF8</char_code>
        <arch>
                <clear_interval>60</clear_interval>
                <clear_flag>0</clear_flag>
        </arch>
        <send>
                <ip>149.0.178.35</ip>
                <mgr_port>5345</mgr_port>
                <data_port>5346</data_port>
                <net_turns>0</net_turns>
                <trigger>1</trigger>
                <constraint>1</constraint>
                <identity>1</identity>
                <filter>
                        <enable>
                                <!--<item>schema.tabname</item>-->
                                <item>test.test1</item>
                                <item>test.test2</item>
                        </enable>
                        <disable/>
                </filter>
                <map>
                </map>
        </send>
</cpt>
</dmhs>

3.2 目的端配置文件

dmhs/bin目录下
dmhs.hs

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<dmhs>  
  <base>
        <lang>ch</lang>
        <mgr_port>5345</mgr_port>
        <chk_interval>3</chk_interval>
        <ckpt_interval>60</ckpt_interval>
        <siteid>2</siteid>
        <version>2.0</version>
    </base>
    <exec>
        <recv>
            <data_port>5346</data_port>
        </recv>
        <db_name></db_name>
        <exec_thr>1</exec_thr>
        <case_sensitive>0</case_sensitive>
        <exec_policy>0</exec_policy>
        <toggle_case>0</toggle_case>
        <ddl_mode>1</ddl_mode>
        <enable_ddl>1</enable_ddl>
        <commit_policy>1</commit_policy>
        <enable_merge>0</enable_merge>
        <enable_ddl>1</enable_ddl>
        <is_kafka>1</is_kafka>
        <save_dir>/home/dmdba/dmhs4314/log</save_dir>
        <save_mask>json</save_mask>
        <json_format>file</json_format>
        <save_max_size>500</save_max_size>
    </exec>
</dmhs>

Kafka配置文件dmhs_kafka.properties

发送数据到kafka需要配置kafka IP:PORT 以及主题名称,对于主题有两种方式配置
一种是所有表数据发送到一个topic,另一种是每个表发送到单独的topic中去,若kafka没有topic同步数据时会自动创建
dmhs.conf.path=/home/dmdba/dmhs4314/bin/dmhs.hs
# kafka broker list,such as ip1:port1,ip2:port2,...
#bootstrap.servers=149.0.164.102:9092
bootstrap.servers=149.0.164.102:9092
# kafka topic name若不通过映射表名自动生产topic,指定发送数据到某个topic 此处需配置
#kafka.topic.name=dmhs_dmtest
# whether to enable JSON format check
json.format.check=0
# How many messages print cost time
print.message.num=1000
# How many messages batch to get
dmhs.min.batch.size=20
# kafka serializer class
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka partitioner config
partitioner.class=com.dameng.dmhs.dmga.service.impl.OnePartitioner
# kafka request acks config
acks=0
max.request.size=5024000
#batch.size=1048576
#linger.ms=3
#buffer.memory=134217728
retries=3
#enable.idempotence=true
compression.type=none
max.in.flight.requests.per.connection=1
send.buffer.bytes=1048576
metadata.max.age.ms=100000
dmhs.send.policy=0
dmhs.send.kafka.mode=1
dmhs.sendTopic.parse.format=table
topic.map.conf.path=/home/dmdba/dmhs4314/bin/topicmap.properties#topic映射配置

映射topic配置
cat topicmap.properties
#表名=kafka主题名称,默认生成表名相同的topic
test1=testdm1
test2=testdm2
自定义json格式配置
默认的格式不太方便使用,可自行配置
dmhs/bin目录下
json_format.ini

#DMHS kafka json format Configuration file
#this is comments
#common format control Bparameters
BATCH_COMMIT                  = 0
NEED_CRLF                       = 1
OPCMD_LEN                      = 8
SET_NULL                        = 1
SET_QUOTA                       = 0
CHAR_REPLACE            = (",\"),(\,\\),(0x0D,\n),(0x0A,\r),(0x0c,\f)
NEW_VALUES                      = ALL
SET_TIME_EPOCH                  = 0
LOB_PIECE                         = 0
CLOB_FORMAT                     = CHAR
BLOB_FORMAT                     = BASE64
ADD_TABLE_TOPIC                 = 1   
json_format ={
"database": "#SCHEMA",
"table": "#TABLE",
"op_type": "#OP_TYPE",
"eventime": "#OP_TIME",
"data": #CONCAT{#NEW_VALUES}#CUT_EMPTY,
"old": #CONCAT{#OLD_VALUES}#CUT_EMPTY,
"ddl": #CONCAT{#DDL_SQL}#CUT_EMPTY
}

目的端启动脚本
dmdba@35:~/dmhs/bin$ cat start_dmhs_kafka.sh

export LANG=en_US.UTF-8
/home/dmdba/jdk1.8.0_301/bin/java -Djava.ext.dirs="/usr/local/kafka/libs:." com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService  /home/dmdba/dmhs4314/bin/dmhs_kafka.properties

三、启动软件

源端
注册服务后后台启动

root@45:/home/dmdba/dmhs/scripts/root# ./dmhs_service_installer.sh -t dmhs_server -d /home/dmdba/dmhs/bin -p Dmhs -x /home/dmdba/dmhs/bin/dmhs.hs
Created symlink /etc/systemd/system/multi-user.target.wants/DmhsServiceDmhs.service → /lib/systemd/system/DmhsServiceDmhs.service.
创建服务(DmhsServiceDmhs)完成

启动服务

dmdba@45:~/dmhs/bin$ ./DmhsServiceDmhs start 
Starting DmhsServiceDmhs: [ OK ]

目的端
kafka目的端不需要注册服务

dmdba@178-35:~/dmhs4314/bin$ nohup ./start_dmhs_kafka.sh & 
[1] 64519
nohup: 忽略输入并把输出追加到'nohup.out

四、同步数据验证

4.1 全量初始化

dmdba@-45:~/dmhs/bin$ ./dmhs_console 
DMHS console tool: V4.3.08-Build(2023.03.31-127399trunc)_64_2303_sp8
Copyright (c) 2020, DMHS. All rights reserved.
Type ? or "help" for help, type "quit" to quit console.

连接到DMHS:127.0.0.1:5345
执行成功
Dameng HS Server V4.3.08-Build(2023.03.31-127399trunc)_64_2303_sp8

DMHS> copy 0 "sch.name='ajgl' and tab.name in('test1','test2')" insert
copy mask is : |INSERT|TABLE|PARTITION|OBJID|REP
执行完成,请查看执行模块日志,检查数据装载是否成功

验证数据
当目的端日志出现一下信息表示发送成功
image.png

kafka查看自动生产topic testdm1 dmtestdm2
image.png

评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服