注册
DM-flinkcdc测试
培训园地/ 文章详情 /

DM-flinkcdc测试

哈哈 2025/08/06 17 0 0

1参考链接

https://blog.csdn.net/ytp552200ytp/article/details/144337547
!!!本环境为最简配置,仅验证同步的可行性。

2安装flink

Apache Flink官网下载安装包。本次使用的版本是flink 1.20.2
image.png

上传至Linux服务器,解压
tar -zxvf flink-1.20.2-bin-scala_2.12.tgz

配置flink环境变量:

[dmdba@centos701 ~]$ cat ~/.bash_profile 
。。。前面的省略
export FLINK_HOME=/home/dmdba/flink/flink-1.20.2
export PATH=$PATH:$FLINK_HOME/bin

配置java环境变量,使用dm安装包的java环境

[root@centos701 log]# cat /etc/profile
。。。前面的省略
export JAVA_HOME=/home/dmdba/dmdbms/jdk
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH

3配置并启动flink

flink解压后目录如下:
image.png

其中bin目录存放执行码,conf目录下存放配置文件,lib目录下存放依赖的jar包,log目录下存放flink的日志。
修改配置文件conf/config.yaml,修改所有的bind-host、address、bind-address为0.0.0.0,具体原因见后续问题记录。

启动flink:
进入bin目录,执行./start-cluster.sh启动flink
若要关闭则用./stop-cluster.sh
image.png

启动成功后,查看端口监听,默认端口为8081,浏览器打开ip:8081即可
image.png

4dm-mysql同步

4.1准备DM环境

初始化实例,开启归档,开启逻辑附加日志。步骤省略。

ARCH_INI = 1
RLOG_APPEND_LOGIC = 2

创建测试表:

CREATE TABLE "SYSDBA"."MYSQL_TEST"
(
"A" INT NOT NULL,
"B" VARCHAR(10),
NOT CLUSTER PRIMARY KEY("A"));

检查归档中能否正常看到这个表相关的操作记录:
!!!这里如果查不到,会影响后续同步。

--拿最新的归档文件全路径
Select * from v$archived_log;

--添加归档文件并分析
DBMS_LOGMNR.ADD_LOGFILE('/home/dmdba/dmdbms/bin/DAMENG/arch/ARCHIVE_LOCAL_0x2A269CC8_EP0_2025-08-04_15-35-29.log');
DBMS_LOGMNR.START_LOGMNR(OPTIONS => 2130);

-- 查看表上的操作记录
SELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP, SEG_OWNER, TABLE_NAME 
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME = 'MYSQL_TEST';

4.2准备MySQL环境

环境无特殊要求,如果是远程连接开放对应的权限即可,否则会报错无法打开jdbc连接。
建表:

CREATE TABLE MYSQL_TEST
(
A INT NOT NULL,
B VARCHAR(10),
PRIMARY KEY("A"));

4.3配置dm-mysql的同步

上传flink-connector-dm-cdc-3.3.0.jar、mysql-connector-j-8.0.33.jar等jar包到flink的lib目录下。
下载链接:
https://eco.dameng.com/community/post/202411191132168PD86B201G3HYB7NK5

这里我是把“参考程序\testflinkcdc\target\flink-cdc-test\flink-cdc-test\lib”下面的所有jar包都拷贝进去了,然后又单独下载了flink-connector-jdbc-3.3.0-1.20.jar上传上去。

配置flinksql文件:

[root@centos701 flink]# cat sql_client_init.sql
SET sql-client.execution.result-mode=tableau;
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
SET 'pipeline.operator-chaining' = 'false';
SET 'log.level' = 'DEBUG';
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE TEST_SOURCE (     
        A INT NOT NULL,
        B string, 
        PRIMARY KEY(A) NOT ENFORCED   
         ) WITH (    
          'connector' = 'dm-cdc',    
          'hostname' = '192.168.13.205',    
          'port' = '5236',    
          'username' = 'SYSDBA',    
          'password' = 'Test123456',    
          'database-name' = 'Dameng',    
          'schema-name' = 'SYSDBA',    
          'table-name' = 'MYSQL_TEST',   
          'debezium.database.tablename.case.insensitive' =  'false',  
          'debezium.lob.enabled' = 'true'  ,
          'scan.startup.mode' = 'initial'
          );
          
CREATE TABLE TEST_SINK (   
         A INT not null,     
         B String,  
         PRIMARY KEY(A) NOT ENFORCED   
         ) WITH (   
          'connector' = 'jdbc',  
          'url' ='jdbc:mysql://192.168.13.89:3306/hql',  
          'username' = 'root',  
          'password' = '123456',  
          'table-name' = 'MYSQL_TEST',  
          'driver' = 'com.mysql.cj.jdbc.Driver',  
          'scan.fetch-size' = '200' ,
      'scan.auto-commit' = 'true',
      'sink.buffer-flush.interval' = '1s',
      'sink.buffer-flush.max-rows' = '1'
          ) ;

执行同步:

进入bin目录
./sql-client.sh -i /home/dmdba/flink/sql_client_init.sql

insert into TEST_SINK select * from TEST_SOURCE;

image.png

测试同步:

Dm执行插入并提交:
Insert into mysql_test values (1,’haha’);
Commit;

Mysql查询:
Select * from mysql_test;

5问题处理与记录

5.1Flink的web界面无法远程访问

使用默认的conf/config.yaml配置文件,启动flink后,查看本地8081端口监听都在,并且防火墙都已经关闭,但是远程访问时无法打开web链接。
处理方法:把conf/config.yaml配置文件中所有的地址由localhost改成0.0.0.0

5.2Flink报错无法打开jdbc连接

查看具体的日志,显示的是到mysql连接异常,开放远程连接的权限即可。
image.png

create user 'root'@'%' identified by '123456';
grant all privileges on *.* to 'root'@'%' with grant option;
flush privileges;

5.3Flink报错could not find any factory for identifier ‘jdbc’

image.png
下载flink-connector-jdbc-3.3.0-1.20.jar放到flink的lib目录下。

评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服