https://blog.csdn.net/ytp552200ytp/article/details/144337547
!!!本环境为最简配置,仅验证同步的可行性。
Apache Flink官网下载安装包。本次使用的版本是flink 1.20.2
上传至Linux服务器,解压
tar -zxvf flink-1.20.2-bin-scala_2.12.tgz
配置flink环境变量:
[dmdba@centos701 ~]$ cat ~/.bash_profile
。。。前面的省略
export FLINK_HOME=/home/dmdba/flink/flink-1.20.2
export PATH=$PATH:$FLINK_HOME/bin
配置java环境变量,使用dm安装包的java环境
[root@centos701 log]# cat /etc/profile
。。。前面的省略
export JAVA_HOME=/home/dmdba/dmdbms/jdk
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH
flink解压后目录如下:
其中bin目录存放执行码,conf目录下存放配置文件,lib目录下存放依赖的jar包,log目录下存放flink的日志。
修改配置文件conf/config.yaml,修改所有的bind-host、address、bind-address为0.0.0.0,具体原因见后续问题记录。
启动flink:
进入bin目录,执行./start-cluster.sh启动flink
若要关闭则用./stop-cluster.sh
启动成功后,查看端口监听,默认端口为8081,浏览器打开ip:8081即可
初始化实例,开启归档,开启逻辑附加日志。步骤省略。
ARCH_INI = 1
RLOG_APPEND_LOGIC = 2
创建测试表:
CREATE TABLE "SYSDBA"."MYSQL_TEST"
(
"A" INT NOT NULL,
"B" VARCHAR(10),
NOT CLUSTER PRIMARY KEY("A"));
检查归档中能否正常看到这个表相关的操作记录:
!!!这里如果查不到,会影响后续同步。
--拿最新的归档文件全路径
Select * from v$archived_log;
--添加归档文件并分析
DBMS_LOGMNR.ADD_LOGFILE('/home/dmdba/dmdbms/bin/DAMENG/arch/ARCHIVE_LOCAL_0x2A269CC8_EP0_2025-08-04_15-35-29.log');
DBMS_LOGMNR.START_LOGMNR(OPTIONS => 2130);
-- 查看表上的操作记录
SELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP, SEG_OWNER, TABLE_NAME
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME = 'MYSQL_TEST';
环境无特殊要求,如果是远程连接开放对应的权限即可,否则会报错无法打开jdbc连接。
建表:
CREATE TABLE MYSQL_TEST
(
A INT NOT NULL,
B VARCHAR(10),
PRIMARY KEY("A"));
上传flink-connector-dm-cdc-3.3.0.jar、mysql-connector-j-8.0.33.jar等jar包到flink的lib目录下。
下载链接:
https://eco.dameng.com/community/post/202411191132168PD86B201G3HYB7NK5
这里我是把“参考程序\testflinkcdc\target\flink-cdc-test\flink-cdc-test\lib”下面的所有jar包都拷贝进去了,然后又单独下载了flink-connector-jdbc-3.3.0-1.20.jar上传上去。
配置flinksql文件:
[root@centos701 flink]# cat sql_client_init.sql
SET sql-client.execution.result-mode=tableau;
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
SET 'pipeline.operator-chaining' = 'false';
SET 'log.level' = 'DEBUG';
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE TEST_SOURCE (
A INT NOT NULL,
B string,
PRIMARY KEY(A) NOT ENFORCED
) WITH (
'connector' = 'dm-cdc',
'hostname' = '192.168.13.205',
'port' = '5236',
'username' = 'SYSDBA',
'password' = 'Test123456',
'database-name' = 'Dameng',
'schema-name' = 'SYSDBA',
'table-name' = 'MYSQL_TEST',
'debezium.database.tablename.case.insensitive' = 'false',
'debezium.lob.enabled' = 'true' ,
'scan.startup.mode' = 'initial'
);
CREATE TABLE TEST_SINK (
A INT not null,
B String,
PRIMARY KEY(A) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' ='jdbc:mysql://192.168.13.89:3306/hql',
'username' = 'root',
'password' = '123456',
'table-name' = 'MYSQL_TEST',
'driver' = 'com.mysql.cj.jdbc.Driver',
'scan.fetch-size' = '200' ,
'scan.auto-commit' = 'true',
'sink.buffer-flush.interval' = '1s',
'sink.buffer-flush.max-rows' = '1'
) ;
执行同步:
进入bin目录
./sql-client.sh -i /home/dmdba/flink/sql_client_init.sql
insert into TEST_SINK select * from TEST_SOURCE;
测试同步:
Dm执行插入并提交:
Insert into mysql_test values (1,’haha’);
Commit;
Mysql查询:
Select * from mysql_test;
使用默认的conf/config.yaml配置文件,启动flink后,查看本地8081端口监听都在,并且防火墙都已经关闭,但是远程访问时无法打开web链接。
处理方法:把conf/config.yaml配置文件中所有的地址由localhost改成0.0.0.0
查看具体的日志,显示的是到mysql连接异常,开放远程连接的权限即可。
create user 'root'@'%' identified by '123456';
grant all privileges on *.* to 'root'@'%' with grant option;
flush privileges;
下载flink-connector-jdbc-3.3.0-1.20.jar放到flink的lib目录下。
文章
阅读量
获赞