flink-dm-cdc通过jdbc驱动调用数据库中的DBMS_LOGMNR包来实现数据抽取。
Tips: $DM_HOME/jar/com.dameng.logmnr.jar 则是通过dmlogmnr
client.so dmdpi.so等库函数来实现,使用时需注意环境变量配置
。
仅支持DM8 2023年3季度及以后的版本,版本号不得小于8.1.3.77。
--8.1.3.77前的版本:
SQL> select build_version from v$instance
union all select id_code
union all select * from v$version;2 3
行号 BUILD_VERSION
---------- ------------------------------------------
1 1-3-12-2023.06.15-193278-20040-ENT
2 --03134284044-20230615-193278-20040 Pack11
3 DM Database Server 64 V8
4 DB Version: 0x7000c
5 03134284044-20230615-193278-20040
已用时间: 1.084(毫秒). 执行号:1401.
SQL> set pagesize 100;
SQL> desc V$LOGMNR_CONTENTS;
行号 NAME TYPE$ NULLABLE
---------- ---------------- ------------- --------
1 SCN BIGINT Y
2 START_SCN BIGINT Y
3 COMMIT_SCN BIGINT Y
4 TIMESTAMP DATETIME(6) Y
5 START_TIMESTAMP DATETIME(6) Y
6 COMMIT_TIMESTAMP DATETIME(6) Y
7 XIDUSN BIGINT Y
8 XIDSLT BIGINT Y
9 XIDSQN BIGINT Y
10 XID BINARY(8) Y
11 PXIDUSN BIGINT Y
12 PXIDSLT BIGINT Y
13 PXIDSQN BIGINT Y
14 PXID BINARY(8) Y
15 TX_NAME VARCHAR(256) Y
16 OPERATION VARCHAR(32) Y
17 OPERATION_CODE INTEGER Y
18 ROLL_BACK INTEGER Y
19 SEG_OWNER VARCHAR(128) Y
20 SEG_NAME VARCHAR(256) Y
21 TABLE_NAME VARCHAR(128) Y
22 SEG_TYPE INTEGER Y
23 SEG_TYPE_NAME VARCHAR(32) Y
24 TABLE_SPACE VARCHAR(32) Y
25 ROW_ID VARCHAR(20) Y
26 USERNAME VARCHAR(128) Y
27 OS_USERNAME VARCHAR(4000) Y
28 MACHINE_NAME VARCHAR(4000) Y
29 AUDIT_SESSIONID BIGINT Y
30 SESSION# BIGINT Y
31 SERIAL# BIGINT Y
32 SESSION_INFO VARCHAR(4000) Y
33 THREAD# BIGINT Y
34 SEQUENCE# INTEGER Y
35 RBASQN INTEGER Y
36 RBABLK INTEGER Y
37 RBABYTE INTEGER Y
38 UBAFIL BIGINT Y
39 UBABLK BIGINT Y
40 UBAREC BIGINT Y
41 UBASQN BIGINT Y
42 ABS_FILE# INTEGER Y
43 REL_FILE# INTEGER Y
44 DATA_BLK# INTEGER Y
45 DATA_OBJ# INTEGER Y
46 DATA_OBJV# INTEGER Y
47 DATA_OBJD# INTEGER Y
48 SQL_REDO VARCHAR(4000) Y
49 SQL_UNDO VARCHAR(4000) Y
50 RS_ID VARCHAR(32) Y
51 SSN INTEGER Y
52 CSF INTEGER Y
53 INFO VARCHAR(32) Y
54 STATUS INTEGER Y
55 REDO_VALUE BIGINT Y
56 UNDO_VALUE BIGINT Y
57 SAFE_RESUME_SCN BIGINT Y
58 CSCN BIGINT Y
59 OBJECT_ID BINARY(16) Y
60 EDITION_NAME VARCHAR(30) Y
61 CLIENT_ID VARCHAR(64) Y
61 rows got
已用时间: 1.412(毫秒). 执行号:1403.
--8.1.3.77后的版本:
SQL> select build_version from v$instance
union all select id_code
union all select * from v$version;2 3
行号 BUILD_VERSION
---------- ------------------------------------------
1 1-3-162-2024.09.20-243574-20108-ENT
2 --03134284194-20240920-243574-20108 Pack18
3 DM Database Server 64 V8
4 DB Version: 0x7000c
5 03134284194-20240920-243574-20108
6 Msg Version: 14
7 Gsu level(5) cnt: 0
7 rows got
已用时间: 4.092(毫秒). 执行号:1201.
SQL> set pagesize 100;
SQL> desc V$LOGMNR_CONTENTS;
行号 NAME TYPE$ NULLABLE
---------- ---------------- ------------- --------
1 SCN BIGINT Y
2 START_SCN BIGINT Y
3 COMMIT_SCN BIGINT Y
4 TIMESTAMP DATETIME(6) Y
5 START_TIMESTAMP DATETIME(6) Y
6 COMMIT_TIMESTAMP DATETIME(6) Y
7 XIDUSN BIGINT Y
8 XIDSLT BIGINT Y
9 XIDSQN BIGINT Y
10 XID BINARY(8) Y
11 PXIDUSN BIGINT Y
12 PXIDSLT BIGINT Y
13 PXIDSQN BIGINT Y
14 PXID BINARY(8) Y
15 TX_NAME VARCHAR(256) Y
16 OPERATION VARCHAR(32) Y
17 OPERATION_CODE INTEGER Y
18 ROLL_BACK INTEGER Y
19 SEG_OWNER VARCHAR(128) Y
20 SEG_NAME VARCHAR(256) Y
21 TABLE_NAME VARCHAR(128) Y
22 SEG_TYPE INTEGER Y
23 SEG_TYPE_NAME VARCHAR(32) Y
24 TABLE_SPACE VARCHAR(32) Y
25 ROW_ID VARCHAR(20) Y
26 USERNAME VARCHAR(128) Y
27 OS_USERNAME VARCHAR(4000) Y
28 MACHINE_NAME VARCHAR(4000) Y
29 AUDIT_SESSIONID BIGINT Y
30 SESSION# BIGINT Y
31 SERIAL# BIGINT Y
32 SESSION_INFO VARCHAR(4000) Y
33 THREAD# BIGINT Y
34 SEQUENCE# INTEGER Y
35 RBASQN INTEGER Y
36 RBABLK INTEGER Y
37 RBABYTE INTEGER Y
38 UBAFIL BIGINT Y
39 UBABLK BIGINT Y
40 UBAREC BIGINT Y
41 UBASQN BIGINT Y
42 ABS_FILE# INTEGER Y
43 REL_FILE# INTEGER Y
44 DATA_BLK# INTEGER Y
45 DATA_OBJ# INTEGER Y
46 DATA_OBJV# INTEGER Y
47 DATA_OBJD# INTEGER Y
48 SQL_REDO VARCHAR(4000) Y
49 SQL_UNDO VARCHAR(4000) Y
50 RS_ID VARCHAR(32) Y
51 SSN INTEGER Y
52 CSF INTEGER Y
53 INFO VARCHAR(32) Y
54 STATUS INTEGER Y
55 REDO_VALUE BIGINT Y
56 UNDO_VALUE BIGINT Y
57 SAFE_RESUME_SCN BIGINT Y
58 CSCN BIGINT Y
59 OBJECT_ID BINARY(16) Y
60 EDITION_NAME VARCHAR(30) Y
61 CLIENT_ID VARCHAR(64) Y
62 SUBTAB_ID INTEGER Y
62 rows got
已用时间: 19.954(毫秒). 执行号:1202.
1.8.0
1.17.0 (1.13-1.17)
2.4.1
##确认java版本
[root@localhost ~]# java -version
openjdk version "1.8.0_402"
OpenJDK Runtime Environment Bisheng (build 1.8.0_402-b06)
OpenJDK 64-Bit Server VM Bisheng (build 25.402-b06, mixed mode)
##下载,解压
tar xf flink-1.17.0-bin-scala_2.12.tgz
##修改flink配置
vim flink-1.17.0/conf/flink-conf.yaml
##设置以下参数
rest.address: 192.168.131.151
rest.bind-address: 192.168.131.151
taskmanager.numberOfTaskSlots: 10
parallelism.default: 6
##启动
cd flink-1.17.0/bin/
./start-cluster.sh
##访问overview
http://192.168.131.151:8081/
1.单机模式支持
2.主备模式的主库支持,但是使用时要注意主备切换,
3.主备模式的备库不支持
推荐替代方案
如果需要日志挖掘功能,可以选择以下方法:
逻辑备库
:逻辑备库支持逻辑层面的同步,可直接用于日志挖掘。例如使用DMHS搭建的备库,但是DMHS可以直接将数据写入kafka
快照备库
:将物理备库临时转换为快照备库,执行日志挖掘后恢复同步。
主库或归档日志
:直接在主库或独立环境中分析归档日志(归档日志对生产环境影响最小)。
4.DCS不支持,V$LOGMNR_CONTENTS是存在于实例中的动态性能视图,多实例的归档分析汇总较为复杂
5.DPC
DMDPC 使用 DBMS_LOGMNR 时, DBMS_LOGMNR.ADD_LOGFILE 只能添加同一个节点的多个日志同时进行分析,不支持同时分析不同节点的日志。而单节点的归档中事务信息未必完整,所以也不建议使用。
6.MPP不支持 DBMS_LOGMNR 包。
1.开启归档
--切换到配置模式
SQL> alter database mount;
--打开归档
SQL> alter database archivelog;
--配置归档文件
SQL> alter database add archivelog 'type=local,dest=/home/dmdba/arch,file_size=64,space_limit=0';
--切换到打开模式
SQL> alter database open;
归档路径大小的限制:
2.开启逻辑附加日志
--开启附加日志
--sp_set_para_value(范围,参数名,参数值);SP_SET_PARA_VALUE (scope int, paraname varchar(256), value bigint)
--SCOPE 参数为 0 表示修改内存中的动态配置参数值;参数为 1 表示修改内存和 INI 文件中的动态配置参数值;参数为 2 表示只在 INI 文件中修改配置参数,此时可修改静态配置参数和动态配置参数。
sp_set_para_value(2,'RLOG_APPEND_LOGIC',2);
--RLOG_APPEND_LOGIC参数说明:
--1:如果有主键列,记录UPDATE和DELETE操作时只包含主键列信息,若没有主键列则包含所有列信息;
--2:不论是否有主键列,记录UPDATE和DELETE操作时都包含所有列的信息;
--3:记录UPDATE时包含更新列的信息以及ROWID,记录DELETE时只有ROWID;
--4:只生成事务以及DDL相关的逻辑日志
3.开启逻辑附加日志
全部表附加
sp_set_para_value(2,'RLOG_IGNORE_TABLE_SET',1);
表级别附加
--关闭全部表附加
sp_set_para_value(2,'RLOG_IGNORE_TABLE_SET',0);
--创建表
CREATE TABLE "TEST"."MYSQL_TEST"
(
"C1" INT,
"C2" VARCHAR(10))
STORAGE(ON "MAIN", CLUSTERBTR) ADD LOGIC LOG ;
--如果是已存在的表,单独为某张表开启附加日志:
ALTER TABLE TEST.MYSQL_TEST2 ADD LOGIC LOG;
--查询当前归档日志:
SELECT NAME ,FIRST_TIME , NEXT_TIME , FIRST_CHANGE# , NEXT_CHANGE# FROM V$ARCHIVED_LOG;
--快速生成添加日志SQL:
SELECT 'DBMS_LOGMNR.ADD_LOGFILE(''' || NAME || ''');'
,FIRST_TIME
, NEXT_TIME
, FIRST_CHANGE#
, NEXT_CHANGE# FROM V$ARCHIVED_LOG;
--添加需要挖掘的日志
DBMS_LOGMNR.ADD_LOGFILE('/home/dmdba/arch/ARCHIVE_LOCAL1_0xBE364BB_EP0_2024-11-27_09-44-06.log');
DBMS_LOGMNR.ADD_LOGFILE('/home/dmdba/arch/ARCHIVE_LOCAL1_0xBE364BB_EP0_2024-11-27_10-10-19.log');
DBMS_LOGMNR.ADD_LOGFILE('/home/dmdba/arch/ARCHIVE_LOCAL1_0xBE364BB_EP0_2024-11-27_10-57-24.log');
--验证是否添加成功
SELECT LOW_SCN,NEXT_SCN, LOW_TIME, HIGH_TIME, LOG_ID, FILENAME FROM V$LOGMNR_LOGS;
--开始挖掘
DBMS_LOGMNR.START_LOGMNR();
--Tips:停止挖掘的指令
DBMS_LOGMNR.END_LOGMNR();
--修改数据
insert into "TEST"."MYSQL_TEST"("C1", "C2") VALUES(6,'打断点');
insert into "TEST"."MYSQL_TEST"("C1", "C2") VALUES(7,'呃呃呃');
update "TEST"."MYSQL_TEST" set C2 = '烦烦烦' where C1 = 7;
commit;
--查看动态性能视图 V$LOGMNR_CONTENTS
select SCN,START_TIMESTAMP,OPERATION,SESSION_INFO,SQL_REDO from V$LOGMNR_CONTENTS
验证是否是表级
--复制test表
insert into "TEST"."MYSQL_TEST2"("C1", "C2") VALUES(8,'嘎嘎嘎');
commit;
--再次查询动态性能视图 V$LOGMNR_CONTENTS,并没有TEST2相关的数据
select SCN,START_TIMESTAMP,OPERATION,SESSION_INFO,SQL_REDO from V$LOGMNR_CONTENTS
建库
create database TEST;
建表
CREATE TABLE MYSQL_TEST
(
C1
int(11) DEFAULT NULL,
C2
varchar(10) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
用户授权(IP为启动job的服务器IP)
CREATE USER 'root'@'192.168.131.151' IDENTIFIED BY 'FJ7e_nG_VVk6P*eq2pGWYTHQ1mVr)r';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'192.168.131.151';
flush privileges;
/*
* @(#)FlinkDMCDC.java, 2023年8月5日 上午10:33:17
*
* Copyright (c) 2000-2020, 达梦数据库有限公司.
* All rights reserved.
*/
package com.dameng.flinkcdc.dm;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
//注意这里的class名称,调用的时候会用到
public class DMFlinkSQL
{
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//配置检查点
env.enableCheckpointing(10 * 1000);//启用检查点,每分钟执行一次60
env.getCheckpointConfig().setCheckpointTimeout(3600*1000);//检查点超时时间
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);//检查点最小间隔
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//检查点并发数
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:////opt/flink-1.17.0/log"));//检查点保存位置,可以写HDFS地址、文件地址、Flink内置RocksDB等。
//构建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//源表注册
tableEnv.executeSql("CREATE TABLE TEST_SOURCE ( \r\n" +
" C1 INT not null, \r\n" +
" C2 String, \r\n" +
" PRIMARY KEY(C1) NOT ENFORCED \r\n" +
" ) WITH ( \r\n" +
" 'connector' = 'dm-cdc', \r\n" +
" 'hostname' = '192.168.131.151', \r\n" +
" 'port' = '5236', \r\n" +
" 'username' = 'SYSDBA', \r\n" +
" 'password' = 'SYSDBA', \r\n" +
" 'database-name' = 'DAMENG', \r\n" +
" 'schema-name' = 'TEST', \r\n" +
" 'table-name' = 'MYSQL_TEST', \r\n"
+ " 'scan.startup.mode' = 'initial', \n"
+ " 'debezium.database.tablename.case.insensitive' = 'false', \n "
+ " 'debezium.lob.enabled' = 'true' \n"
+ " );" );
//目标表注册
tableEnv.executeSql("CREATE TABLE TEST_SINK ( \n" +
" C1 INT not null, \r\n" +
" C2 String, \r\n" +
" PRIMARY KEY(C1) NOT ENFORCED \r\n" +
" ) WITH ( \r\n"
+ " 'connector' = 'jdbc', \n "
+ " 'url' ='jdbc:mysql://192.168.131.128:3306/test', \n "
+ " 'username' = 'root', \n "
+ " 'password' = 'FJ7e_nG_VVk6P*eq2pGWYTHQ1mVr)r', \n "
+ " 'table-name' = 'MYSQL_TEST', \n "
+ " 'driver' = 'com.mysql.cj.jdbc.Driver', \n "
+ " 'scan.fetch-size' = '200' \n"
+ " )" );
//开启CDC日志捕获
tableEnv.executeSql("insert into TEST_SINK select * from TEST_SOURCE ");
}
}
切换到flink/bin目录,执行如下指令,使用DMFlinkSQL启动任务
./flink run -c com.dameng.flinkcdc.dm.DMFlinkSQL ./testflinkcdc-2.4.1.jar
注意这里的Job ID,由于上述代码中选择了本地文件存储检查点,flink-1.17.0/log 路径下会生成于此ID同名的目录,任务中断后,此目录下会生成 chk- 开头的目录,该目录下会有一个文件 _metadata,用于存储同步信息。从断点恢复会用到。
cd /opt/flink-1.17.0/bin ./flink run -s file:////opt/flink-1.17.0/log/945ea0844f778d28b7a02c81eaf0ea45/chk-3 -c com.dameng.flinkcdc.dm.DMFlinkSQL ./testflinkcdc-2.4.1.jar
任务启动后查询源端目的地的数据,查看是否同步,如果同步任务有异常,会自动重启任务,可在Exception查看具体报错,例如
技术支持:dmtech@dameng.com
文章
阅读量
获赞