注册
Dm8_flink_cdc连接器使用
专栏/培训园地/ 文章详情 /

Dm8_flink_cdc连接器使用

苏童 2024/12/02 1201 1 10
摘要

概述

flink-dm-cdc通过jdbc驱动调用数据库中的DBMS_LOGMNR包来实现数据抽取。

Tips: $DM_HOME/jar/com.dameng.logmnr.jar 则是通过dmlogmnrclient.so dmdpi.so等库函数来实现,使用时需注意环境变量配置

版本声明:

数据库版本

仅支持DM8 2023年3季度及以后的版本,版本号不得小于8.1.3.77。

--8.1.3.77前的版本: SQL> select build_version from v$instance union all select id_code union all select * from v$version;2 3 行号 BUILD_VERSION ---------- ------------------------------------------ 1 1-3-12-2023.06.15-193278-20040-ENT 2 --03134284044-20230615-193278-20040 Pack11 3 DM Database Server 64 V8 4 DB Version: 0x7000c 5 03134284044-20230615-193278-20040 已用时间: 1.084(毫秒). 执行号:1401. SQL> set pagesize 100; SQL> desc V$LOGMNR_CONTENTS; 行号 NAME TYPE$ NULLABLE ---------- ---------------- ------------- -------- 1 SCN BIGINT Y 2 START_SCN BIGINT Y 3 COMMIT_SCN BIGINT Y 4 TIMESTAMP DATETIME(6) Y 5 START_TIMESTAMP DATETIME(6) Y 6 COMMIT_TIMESTAMP DATETIME(6) Y 7 XIDUSN BIGINT Y 8 XIDSLT BIGINT Y 9 XIDSQN BIGINT Y 10 XID BINARY(8) Y 11 PXIDUSN BIGINT Y 12 PXIDSLT BIGINT Y 13 PXIDSQN BIGINT Y 14 PXID BINARY(8) Y 15 TX_NAME VARCHAR(256) Y 16 OPERATION VARCHAR(32) Y 17 OPERATION_CODE INTEGER Y 18 ROLL_BACK INTEGER Y 19 SEG_OWNER VARCHAR(128) Y 20 SEG_NAME VARCHAR(256) Y 21 TABLE_NAME VARCHAR(128) Y 22 SEG_TYPE INTEGER Y 23 SEG_TYPE_NAME VARCHAR(32) Y 24 TABLE_SPACE VARCHAR(32) Y 25 ROW_ID VARCHAR(20) Y 26 USERNAME VARCHAR(128) Y 27 OS_USERNAME VARCHAR(4000) Y 28 MACHINE_NAME VARCHAR(4000) Y 29 AUDIT_SESSIONID BIGINT Y 30 SESSION# BIGINT Y 31 SERIAL# BIGINT Y 32 SESSION_INFO VARCHAR(4000) Y 33 THREAD# BIGINT Y 34 SEQUENCE# INTEGER Y 35 RBASQN INTEGER Y 36 RBABLK INTEGER Y 37 RBABYTE INTEGER Y 38 UBAFIL BIGINT Y 39 UBABLK BIGINT Y 40 UBAREC BIGINT Y 41 UBASQN BIGINT Y 42 ABS_FILE# INTEGER Y 43 REL_FILE# INTEGER Y 44 DATA_BLK# INTEGER Y 45 DATA_OBJ# INTEGER Y 46 DATA_OBJV# INTEGER Y 47 DATA_OBJD# INTEGER Y 48 SQL_REDO VARCHAR(4000) Y 49 SQL_UNDO VARCHAR(4000) Y 50 RS_ID VARCHAR(32) Y 51 SSN INTEGER Y 52 CSF INTEGER Y 53 INFO VARCHAR(32) Y 54 STATUS INTEGER Y 55 REDO_VALUE BIGINT Y 56 UNDO_VALUE BIGINT Y 57 SAFE_RESUME_SCN BIGINT Y 58 CSCN BIGINT Y 59 OBJECT_ID BINARY(16) Y 60 EDITION_NAME VARCHAR(30) Y 61 CLIENT_ID VARCHAR(64) Y 61 rows got 已用时间: 1.412(毫秒). 执行号:1403. --8.1.3.77后的版本: SQL> select build_version from v$instance union all select id_code union all select * from v$version;2 3 行号 BUILD_VERSION ---------- ------------------------------------------ 1 1-3-162-2024.09.20-243574-20108-ENT 2 --03134284194-20240920-243574-20108 Pack18 3 DM Database Server 64 V8 4 DB Version: 0x7000c 5 03134284194-20240920-243574-20108 6 Msg Version: 14 7 Gsu level(5) cnt: 0 7 rows got 已用时间: 4.092(毫秒). 执行号:1201. SQL> set pagesize 100; SQL> desc V$LOGMNR_CONTENTS; 行号 NAME TYPE$ NULLABLE ---------- ---------------- ------------- -------- 1 SCN BIGINT Y 2 START_SCN BIGINT Y 3 COMMIT_SCN BIGINT Y 4 TIMESTAMP DATETIME(6) Y 5 START_TIMESTAMP DATETIME(6) Y 6 COMMIT_TIMESTAMP DATETIME(6) Y 7 XIDUSN BIGINT Y 8 XIDSLT BIGINT Y 9 XIDSQN BIGINT Y 10 XID BINARY(8) Y 11 PXIDUSN BIGINT Y 12 PXIDSLT BIGINT Y 13 PXIDSQN BIGINT Y 14 PXID BINARY(8) Y 15 TX_NAME VARCHAR(256) Y 16 OPERATION VARCHAR(32) Y 17 OPERATION_CODE INTEGER Y 18 ROLL_BACK INTEGER Y 19 SEG_OWNER VARCHAR(128) Y 20 SEG_NAME VARCHAR(256) Y 21 TABLE_NAME VARCHAR(128) Y 22 SEG_TYPE INTEGER Y 23 SEG_TYPE_NAME VARCHAR(32) Y 24 TABLE_SPACE VARCHAR(32) Y 25 ROW_ID VARCHAR(20) Y 26 USERNAME VARCHAR(128) Y 27 OS_USERNAME VARCHAR(4000) Y 28 MACHINE_NAME VARCHAR(4000) Y 29 AUDIT_SESSIONID BIGINT Y 30 SESSION# BIGINT Y 31 SERIAL# BIGINT Y 32 SESSION_INFO VARCHAR(4000) Y 33 THREAD# BIGINT Y 34 SEQUENCE# INTEGER Y 35 RBASQN INTEGER Y 36 RBABLK INTEGER Y 37 RBABYTE INTEGER Y 38 UBAFIL BIGINT Y 39 UBABLK BIGINT Y 40 UBAREC BIGINT Y 41 UBASQN BIGINT Y 42 ABS_FILE# INTEGER Y 43 REL_FILE# INTEGER Y 44 DATA_BLK# INTEGER Y 45 DATA_OBJ# INTEGER Y 46 DATA_OBJV# INTEGER Y 47 DATA_OBJD# INTEGER Y 48 SQL_REDO VARCHAR(4000) Y 49 SQL_UNDO VARCHAR(4000) Y 50 RS_ID VARCHAR(32) Y 51 SSN INTEGER Y 52 CSF INTEGER Y 53 INFO VARCHAR(32) Y 54 STATUS INTEGER Y 55 REDO_VALUE BIGINT Y 56 UNDO_VALUE BIGINT Y 57 SAFE_RESUME_SCN BIGINT Y 58 CSCN BIGINT Y 59 OBJECT_ID BINARY(16) Y 60 EDITION_NAME VARCHAR(30) Y 61 CLIENT_ID VARCHAR(64) Y 62 SUBTAB_ID INTEGER Y 62 rows got 已用时间: 19.954(毫秒). 执行号:1202.

JDK版本

1.8.0

Flink版本

1.17.0 (1.13-1.17)

Flink CDC 版本

2.4.1

图片0.png

环境准备

flink运行环境

##确认java版本 [root@localhost ~]# java -version openjdk version "1.8.0_402" OpenJDK Runtime Environment Bisheng (build 1.8.0_402-b06) OpenJDK 64-Bit Server VM Bisheng (build 25.402-b06, mixed mode) ##下载,解压 tar xf flink-1.17.0-bin-scala_2.12.tgz ##修改flink配置 vim flink-1.17.0/conf/flink-conf.yaml ##设置以下参数 rest.address: 192.168.131.151 rest.bind-address: 192.168.131.151 taskmanager.numberOfTaskSlots: 10 parallelism.default: 6 ##启动 cd flink-1.17.0/bin/ ./start-cluster.sh ##访问overview http://192.168.131.151:8081/

数据库

DM8

架构说明

1.单机模式支持

2.主备模式的主库支持,但是使用时要注意主备切换,

3.主备模式的备库不支持

推荐替代方案

  • 如果需要日志挖掘功能,可以选择以下方法:

    • 逻辑备库:逻辑备库支持逻辑层面的同步,可直接用于日志挖掘。例如使用DMHS搭建的备库,但是DMHS可以直接将数据写入kafka
    • 快照备库:将物理备库临时转换为快照备库,执行日志挖掘后恢复同步。
    • 主库或归档日志:直接在主库或独立环境中分析归档日志(归档日志对生产环境影响最小)。

4.DCS不支持,V$LOGMNR_CONTENTS是存在于实例中的动态性能视图,多实例的归档分析汇总较为复杂

5.DPC

DMDPC 使用 DBMS_LOGMNR 时, DBMS_LOGMNR.ADD_LOGFILE 只能添加同一个节点的多个日志同时进行分析,不支持同时分析不同节点的日志。而单节点的归档中事务信息未必完整,所以也不建议使用。

6.MPP不支持 DBMS_LOGMNR 包。

必要的配置

1.开启归档

--切换到配置模式 SQL> alter database mount; --打开归档 SQL> alter database archivelog; --配置归档文件 SQL> alter database add archivelog 'type=local,dest=/home/dmdba/arch,file_size=64,space_limit=0'; --切换到打开模式 SQL> alter database open;

归档路径大小的限制:

2.开启逻辑附加日志

--开启附加日志 --sp_set_para_value(范围,参数名,参数值);SP_SET_PARA_VALUE (scope int, paraname varchar(256), value bigint) --SCOPE 参数为 0 表示修改内存中的动态配置参数值;参数为 1 表示修改内存和 INI 文件中的动态配置参数值;参数为 2 表示只在 INI 文件中修改配置参数,此时可修改静态配置参数和动态配置参数。 sp_set_para_value(2,'RLOG_APPEND_LOGIC',2); --RLOG_APPEND_LOGIC参数说明: --1:如果有主键列,记录UPDATE和DELETE操作时只包含主键列信息,若没有主键列则包含所有列信息; --2:不论是否有主键列,记录UPDATE和DELETE操作时都包含所有列的信息; --3:记录UPDATE时包含更新列的信息以及ROWID,记录DELETE时只有ROWID; --4:只生成事务以及DDL相关的逻辑日志

3.开启逻辑附加日志

全部表附加

sp_set_para_value(2,'RLOG_IGNORE_TABLE_SET',1);

表级别附加

--关闭全部表附加 sp_set_para_value(2,'RLOG_IGNORE_TABLE_SET',0); --创建表 CREATE TABLE "TEST"."MYSQL_TEST" ( "C1" INT, "C2" VARCHAR(10)) STORAGE(ON "MAIN", CLUSTERBTR) ADD LOGIC LOG ; --如果是已存在的表,单独为某张表开启附加日志: ALTER TABLE TEST.MYSQL_TEST2 ADD LOGIC LOG; --查询当前归档日志: SELECT NAME ,FIRST_TIME , NEXT_TIME , FIRST_CHANGE# , NEXT_CHANGE# FROM V$ARCHIVED_LOG; --快速生成添加日志SQL: SELECT 'DBMS_LOGMNR.ADD_LOGFILE(''' || NAME || ''');' ,FIRST_TIME , NEXT_TIME , FIRST_CHANGE# , NEXT_CHANGE# FROM V$ARCHIVED_LOG;

图片1.png

--添加需要挖掘的日志 DBMS_LOGMNR.ADD_LOGFILE('/home/dmdba/arch/ARCHIVE_LOCAL1_0xBE364BB_EP0_2024-11-27_09-44-06.log'); DBMS_LOGMNR.ADD_LOGFILE('/home/dmdba/arch/ARCHIVE_LOCAL1_0xBE364BB_EP0_2024-11-27_10-10-19.log'); DBMS_LOGMNR.ADD_LOGFILE('/home/dmdba/arch/ARCHIVE_LOCAL1_0xBE364BB_EP0_2024-11-27_10-57-24.log'); --验证是否添加成功 SELECT LOW_SCN,NEXT_SCN, LOW_TIME, HIGH_TIME, LOG_ID, FILENAME FROM V$LOGMNR_LOGS;

图片2.png

--开始挖掘 DBMS_LOGMNR.START_LOGMNR(); --Tips:停止挖掘的指令 DBMS_LOGMNR.END_LOGMNR(); --修改数据 insert into "TEST"."MYSQL_TEST"("C1", "C2") VALUES(6,'打断点'); insert into "TEST"."MYSQL_TEST"("C1", "C2") VALUES(7,'呃呃呃'); update "TEST"."MYSQL_TEST" set C2 = '烦烦烦' where C1 = 7; commit; --查看动态性能视图 V$LOGMNR_CONTENTS select SCN,START_TIMESTAMP,OPERATION,SESSION_INFO,SQL_REDO from V$LOGMNR_CONTENTS

图片3.png

验证是否是表级

--复制test表 insert into "TEST"."MYSQL_TEST2"("C1", "C2") VALUES(8,'嘎嘎嘎'); commit; --再次查询动态性能视图 V$LOGMNR_CONTENTS,并没有TEST2相关的数据 select SCN,START_TIMESTAMP,OPERATION,SESSION_INFO,SQL_REDO from V$LOGMNR_CONTENTS

图片4.png

MYSQL5.7

建库

create database TEST;

建表

CREATE TABLE MYSQL_TEST (
C1 int(11) DEFAULT NULL,
C2 varchar(10) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

用户授权(IP为启动job的服务器IP)

CREATE USER 'root'@'192.168.131.151' IDENTIFIED BY 'FJ7e_nG_VVk6P*eq2pGWYTHQ1mVr)r'; GRANT ALL PRIVILEGES ON *.* TO 'root'@'192.168.131.151'; flush privileges;

代码准备

/* * @(#)FlinkDMCDC.java, 2023年8月5日 上午10:33:17 * * Copyright (c) 2000-2020, 达梦数据库有限公司. * All rights reserved. */ package com.dameng.flinkcdc.dm; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; //注意这里的class名称,调用的时候会用到 public class DMFlinkSQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //配置检查点 env.enableCheckpointing(10 * 1000);//启用检查点,每分钟执行一次60 env.getCheckpointConfig().setCheckpointTimeout(3600*1000);//检查点超时时间 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);//检查点最小间隔 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//检查点并发数 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setStateBackend(new FsStateBackend("file:////opt/flink-1.17.0/log"));//检查点保存位置,可以写HDFS地址、文件地址、Flink内置RocksDB等。 //构建表执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //源表注册 tableEnv.executeSql("CREATE TABLE TEST_SOURCE ( \r\n" + " C1 INT not null, \r\n" + " C2 String, \r\n" + " PRIMARY KEY(C1) NOT ENFORCED \r\n" + " ) WITH ( \r\n" + " 'connector' = 'dm-cdc', \r\n" + " 'hostname' = '192.168.131.151', \r\n" + " 'port' = '5236', \r\n" + " 'username' = 'SYSDBA', \r\n" + " 'password' = 'SYSDBA', \r\n" + " 'database-name' = 'DAMENG', \r\n" + " 'schema-name' = 'TEST', \r\n" + " 'table-name' = 'MYSQL_TEST', \r\n" + " 'scan.startup.mode' = 'initial', \n" + " 'debezium.database.tablename.case.insensitive' = 'false', \n " + " 'debezium.lob.enabled' = 'true' \n" + " );" ); //目标表注册 tableEnv.executeSql("CREATE TABLE TEST_SINK ( \n" + " C1 INT not null, \r\n" + " C2 String, \r\n" + " PRIMARY KEY(C1) NOT ENFORCED \r\n" + " ) WITH ( \r\n" + " 'connector' = 'jdbc', \n " + " 'url' ='jdbc:mysql://192.168.131.128:3306/test', \n " + " 'username' = 'root', \n " + " 'password' = 'FJ7e_nG_VVk6P*eq2pGWYTHQ1mVr)r', \n " + " 'table-name' = 'MYSQL_TEST', \n " + " 'driver' = 'com.mysql.cj.jdbc.Driver', \n " + " 'scan.fetch-size' = '200' \n" + " )" ); //开启CDC日志捕获 tableEnv.executeSql("insert into TEST_SINK select * from TEST_SOURCE "); } }

运行测试

初始化任务

切换到flink/bin目录,执行如下指令,使用DMFlinkSQL启动任务

./flink run -c com.dameng.flinkcdc.dm.DMFlinkSQL ./testflinkcdc-2.4.1.jar

注意这里的Job ID,由于上述代码中选择了本地文件存储检查点,flink-1.17.0/log 路径下会生成于此ID同名的目录,任务中断后,此目录下会生成 chk- 开头的目录,该目录下会有一个文件 _metadata,用于存储同步信息。从断点恢复会用到。

图片5.png

断点恢复(savePoint)

cd /opt/flink-1.17.0/bin ./flink run -s file:////opt/flink-1.17.0/log/945ea0844f778d28b7a02c81eaf0ea45/chk-3 -c com.dameng.flinkcdc.dm.DMFlinkSQL ./testflinkcdc-2.4.1.jar

任务启动后查询源端目的地的数据,查看是否同步,如果同步任务有异常,会自动重启任务,可在Exception查看具体报错,例如
图片6.png

技术支持:dmtech@dameng.com

评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服