dmhs版本:dmhs_V4.3.14
数据库版本:DM8
安装略
开启归档
alter database mount;
alter database archivelog;
alter database add archivelog 'DEST=/data/dmdata/dmarch, TYPE=LOCAL, FILE_SIZE=512, SPACE_LIMIT=102400';
alter database open;
开启归档逻辑日志
用于增量捕获数据同步,建议开启2
sp_set_para_value(1,'RLOG_APPEND_LOGIC',2)
配置DDL同步
导入脚本,在dmhs安装后目录下
disql SYSDBA/SYSDBA -C "set define off" \`/home/dmdba/dmhs/scripts/ddl_sql_dm8.sql
源端
./dmhs_V4.3.08_dm8_rev127399_FTarm_kylin4_64_veri_20230407_sp8.bin -i
目的端
可直接copy源端dmhs目录
同步配置需要配置如下配置文件,这里自己编写配置文件方式
源端:dmhs.hs 源端同步信息配置
目的端 dmhs.hs 目的端同步信息配置
dmhs_kafka.properties 连接kafka配置信息填写,
更多配置信息,可查看管理员手册和参考手册
dmhs/bin目录下
<?xml version="1.0" encoding="GB2312" standalone="no"?>
<dmhs>
<base>
<lang>ch</lang>
<mgr_port>5345</mgr_port>
<ckpt_interval>60</ckpt_interval>
<siteid>1</siteid>
<version>2.0</version>
</base>
<cpt>
<db_type>DM8</db_type>
<db_server>xx.45</db_server>
<db_user>SYSDBA</db_user>
<db_pwd>SYSDBA</db_pwd>
<db_port>5236</db_port>
<idle_time>10</idle_time>
<parse_thr>1</parse_thr>
<ddl_mask>OP:OBJ:REC</ddl_mask>
<char_code>PG_UTF8</char_code>
<arch>
<clear_interval>60</clear_interval>
<clear_flag>0</clear_flag>
</arch>
<send>
<ip>149.0.178.35</ip>
<mgr_port>5345</mgr_port>
<data_port>5346</data_port>
<net_turns>0</net_turns>
<trigger>1</trigger>
<constraint>1</constraint>
<identity>1</identity>
<filter>
<enable>
<!--<item>schema.tabname</item>-->
<item>test.test1</item>
<item>test.test2</item>
</enable>
<disable/>
</filter>
<map>
</map>
</send>
</cpt>
</dmhs>
dmhs/bin目录下
dmhs.hs
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<dmhs>
<base>
<lang>ch</lang>
<mgr_port>5345</mgr_port>
<chk_interval>3</chk_interval>
<ckpt_interval>60</ckpt_interval>
<siteid>2</siteid>
<version>2.0</version>
</base>
<exec>
<recv>
<data_port>5346</data_port>
</recv>
<db_name></db_name>
<exec_thr>1</exec_thr>
<case_sensitive>0</case_sensitive>
<exec_policy>0</exec_policy>
<toggle_case>0</toggle_case>
<ddl_mode>1</ddl_mode>
<enable_ddl>1</enable_ddl>
<commit_policy>1</commit_policy>
<enable_merge>0</enable_merge>
<enable_ddl>1</enable_ddl>
<is_kafka>1</is_kafka>
<save_dir>/home/dmdba/dmhs4314/log</save_dir>
<save_mask>json</save_mask>
<json_format>file</json_format>
<save_max_size>500</save_max_size>
</exec>
</dmhs>
Kafka配置文件dmhs_kafka.properties
发送数据到kafka需要配置kafka IP:PORT 以及主题名称,对于主题有两种方式配置
一种是所有表数据发送到一个topic,另一种是每个表发送到单独的topic中去,若kafka没有topic同步数据时会自动创建
dmhs.conf.path=/home/dmdba/dmhs4314/bin/dmhs.hs
# kafka broker list,such as ip1:port1,ip2:port2,...
#bootstrap.servers=149.0.164.102:9092
bootstrap.servers=149.0.164.102:9092
# kafka topic name若不通过映射表名自动生产topic,指定发送数据到某个topic 此处需配置
#kafka.topic.name=dmhs_dmtest
# whether to enable JSON format check
json.format.check=0
# How many messages print cost time
print.message.num=1000
# How many messages batch to get
dmhs.min.batch.size=20
# kafka serializer class
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka partitioner config
partitioner.class=com.dameng.dmhs.dmga.service.impl.OnePartitioner
# kafka request acks config
acks=0
max.request.size=5024000
#batch.size=1048576
#linger.ms=3
#buffer.memory=134217728
retries=3
#enable.idempotence=true
compression.type=none
max.in.flight.requests.per.connection=1
send.buffer.bytes=1048576
metadata.max.age.ms=100000
dmhs.send.policy=0
dmhs.send.kafka.mode=1
dmhs.sendTopic.parse.format=table
topic.map.conf.path=/home/dmdba/dmhs4314/bin/topicmap.properties#topic映射配置
映射topic配置
cat topicmap.properties
#表名=kafka主题名称,默认生成表名相同的topic
test1=testdm1
test2=testdm2
自定义json格式配置
默认的格式不太方便使用,可自行配置
dmhs/bin目录下
json_format.ini
#DMHS kafka json format Configuration file
#this is comments
#common format control Bparameters
BATCH_COMMIT = 0
NEED_CRLF = 1
OPCMD_LEN = 8
SET_NULL = 1
SET_QUOTA = 0
CHAR_REPLACE = (",\"),(\,\\),(0x0D,\n),(0x0A,\r),(0x0c,\f)
NEW_VALUES = ALL
SET_TIME_EPOCH = 0
LOB_PIECE = 0
CLOB_FORMAT = CHAR
BLOB_FORMAT = BASE64
ADD_TABLE_TOPIC = 1
json_format ={
"database": "#SCHEMA",
"table": "#TABLE",
"op_type": "#OP_TYPE",
"eventime": "#OP_TIME",
"data": #CONCAT{#NEW_VALUES}#CUT_EMPTY,
"old": #CONCAT{#OLD_VALUES}#CUT_EMPTY,
"ddl": #CONCAT{#DDL_SQL}#CUT_EMPTY
}
目的端启动脚本
dmdba@35:~/dmhs/bin$ cat start_dmhs_kafka.sh
export LANG=en_US.UTF-8
/home/dmdba/jdk1.8.0_301/bin/java -Djava.ext.dirs="/usr/local/kafka/libs:." com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService /home/dmdba/dmhs4314/bin/dmhs_kafka.properties
源端
注册服务后后台启动
root@45:/home/dmdba/dmhs/scripts/root# ./dmhs_service_installer.sh -t dmhs_server -d /home/dmdba/dmhs/bin -p Dmhs -x /home/dmdba/dmhs/bin/dmhs.hs
Created symlink /etc/systemd/system/multi-user.target.wants/DmhsServiceDmhs.service → /lib/systemd/system/DmhsServiceDmhs.service.
创建服务(DmhsServiceDmhs)完成
启动服务
dmdba@45:~/dmhs/bin$ ./DmhsServiceDmhs start
Starting DmhsServiceDmhs: [ OK ]
目的端
kafka目的端不需要注册服务
dmdba@178-35:~/dmhs4314/bin$ nohup ./start_dmhs_kafka.sh &
[1] 64519
nohup: 忽略输入并把输出追加到'nohup.out
dmdba@-45:~/dmhs/bin$ ./dmhs_console
DMHS console tool: V4.3.08-Build(2023.03.31-127399trunc)_64_2303_sp8
Copyright (c) 2020, DMHS. All rights reserved.
Type ? or "help" for help, type "quit" to quit console.
连接到DMHS:127.0.0.1:5345
执行成功
Dameng HS Server V4.3.08-Build(2023.03.31-127399trunc)_64_2303_sp8
DMHS> copy 0 "sch.name='ajgl' and tab.name in('test1','test2')" insert
copy mask is : |INSERT|TABLE|PARTITION|OBJID|REP
执行完成,请查看执行模块日志,检查数据装载是否成功
验证数据
当目的端日志出现一下信息表示发送成功
kafka查看自动生产topic testdm1 dmtestdm2
文章
阅读量
获赞