技术分享/ 文章详情 /

DM8 Flink CDC 连接器工具

达梦数据技术支持 2024/11/29 3678 22 49

功能描述

  DM8 Flink CDC 连接器是在 Flink 框架的基础上对 DM8 数据进行实时采集。底层使用 DM 提供的 DBMS_LOGMNR 包对归档日志进行挖掘,采用已有的解析工具对 SQL 进行解析,重构出 DDL 和 DML 操作,将结构化的数据下发给下游消费。
  DM8 Flink CDC 连接器已适配 Flink SQL,可以通过使用纯 SQL 的方式来开发和运行一个 Flink 数据同步任务,用户也可以通过编写代码的方式将 Table-api 转为 Stream-api,并对流式数据消息进行自定义处理。
  工具的具体配置与说明请参考用户手册。

环境要求

Jdk 版本:1.8.0
Flink 版本:1.17.0
Flink CDC 版本:2.4.1
Dm8 版本:dm8 版本号不能小于 8.1.3.77

下载地址

详细内容见 flink-cdc-dm2.4.1.241128

评论
后发表回复
键盘舞者
您好,我在ARM版本的数据库上测试这个包有一个问题 LogMinerEventRow 中获取到的eventType有等于空的情况,然后导致AbstractLogMinerEventProcessor processRow的方法中switch时报空指针导致数据不能处理 数据库版本: DM Database Server 64 V8 ,8.4 ,企业版 11:10:24.528[ERROR]Mining session stopped due to the java.lang.NullPointerException: Cannot invoke "io.debezium.connector.dm.logminer.events.EventType.ordinal()" because the return value of "io.debezium.connector.dm.logminer.events.LogMinerEventRow.getEventType()" is null 11:10:24.550[ERROR]Producer failure java.lang.NullPointerException: Cannot invoke "io.debezium.connector.dm.logminer.events.EventType.ordinal()" because the return value of "io.debezium.connector.dm.logminer.events.LogMinerEventRow.getEventType()" is null at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:349) ~[flink-connector-dm-cdc-2.4.1.jar:?] at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:310) ~[flink-connector-dm-cdc-2.4.1.jar:?] at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:244) ~[flink-connector-dm-cdc-2.4.1.jar:?] at io.debezium.connector.dm.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:221) ~[flink-connector-dm-cdc-2.4.1.jar:?] at com.ververica.cdc.connectors.dm.source.reader.fetch.DMStreamFetchTask$RedoLogSplitReadTask.execute(DMStreamFetchTask.java:141) ~[flink-connector-dm-cdc-2.4.1.jar:?] at com.ververica.cdc.connectors.dm.source.reader.fetch.DMStreamFetchTask.execute(DMStreamFetchTask.java:78) ~[flink-connector-dm-cdc-2.4.1.jar:?] at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-cdc-base-2.4.1.jar:2.4.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?] at java.lang.Thread.run(Thread.java:1583) [?:?] 11:10:24.557[INFO ]startScn=114150495, endScn=114150528 发生这个异常时,LogMinerEventRow initializeFromResultSet 方法中获取的tableName=null,operation= seq modify , redosql=14005
发布于 6天前
探月
Flink SQL> select * from dm_source_cdc; [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.connect.data.Schema 创建表dm_source_cdc成功,查询报错。是不是需要kafka相关的驱动? 如果在flink集群环境中运行,需上传flinkcdc所需依赖包(测试程序目录testflinkcdc\lib和testflinkcdc\target\flink-cdc-test\flink-cdc-test\lib下的jar) 但是,testflinkcdc\target\flink-cdc-test\flink-cdc-test\lib路径不存在,是不是缺少该路径下的jar包导致的?
发布于 7天前
探月
各位大佬,请教一下,支持达梦同步到kafka吗?另外,有没有人在生产环境稳定运行的?
发布于 2025/04/01 09:00
夏洪
源表注册2个表,目标表注册2个表,再开启2个CDC日志捕获 tableEnv.executeSql("insert into TEST_SINK select * from TEST_SOURCE "); tableEnv.executeSql("insert into TEST_SINK1 select * from TEST_SOURCE1 "); 第二任务报错
发布于 2025/03/17 01:28
夏洪
谢谢,单个表同步没有问题,请问源端有2个表或多个表要怎么处理??
发布于 2025/03/17 01:20
niceeee
请问使用flink sql client同步数据, 但是完全获取不到数据是怎么回事? 已经开启了归档模式而且RLOG_APPEND_LOGIC也设置成2了
发布于 2025/03/14 08:11
DM_001229
大佬,请问新版本还要多久发布呀?
发布于 2025/03/13 02:06
月半居士
运行起来会报这个错误: Caused by: com.alibaba.fastjson2.JSONException: offset 193, character ", line 1, column 193, fastjson-version 2.0.56 {"@type":"com.ververica.cdc.connectors.dm.model.DmDictionary","flushTableId":"1038","schemaDictMap":{"@type":"java.util.HashMap","150995944":"XXX"},"tableDictMap":{"@type":"java.util.HashMap","1024":[{"@type":"com.ververica.cdc.connectors.dm.model.TableDict","columnDict":["XXX","XXX","XXX","XXX","XXX","ENABLE","XXX","XXX","XXX","XXX","XXX","XXX],"lsn":46461639L,"objectId":"1024","schema":"DAM","tableName":"XXX"}]}} at com.alibaba.fastjson2.reader.ObjectReaderImplList.readObject(ObjectReaderImplList.java:647) ~[?:?] at com.alibaba.fastjson2.reader.ObjectReaderImplMapTyped.readObject(ObjectReaderImplMapTyped.java:408) ~[?:?] at com.alibaba.fastjson2.reader.ORG_1_3_DmDictionary.readObject(Unknown Source) ~[?:?] at com.alibaba.fastjson.JSON.parseObject(JSON.java:507) ~[?:?] at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.<init>(AbstractLogMinerEventProcessor.java:137) ~[flink-connector-dm-cdc-2.4.1.jar:?] at io.debezium.connector.dm.logminer.processor.memory.MemoryLogMinerEventProcessor.<init>(MemoryLogMinerEventProcessor.java:79) ~[flink-connector-dm-cdc-2.4.1.jar:?] at io.debezium.connector.dm.DMConnectorConfig$LogMiningBufferType$1.createProcessor(DMConnectorConfig.java:1260) ~[?:?] at io.debezium.connector.dm.logminer.LogMinerStreamingChangeEventSource.createProcessor(LogMinerStreamingChangeEventSource.java:259) ~[?:?] at io.debezium.connector.dm.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:191) ~[?:?] at com.ververica.cdc.connectors.dm.source.reader.fetch.DMStreamFetchTask$RedoLogSplitReadTask.execute(DMStreamFetchTask.java:141) ~[?:?] at com.ververica.cdc.connectors.dm.source.reader.fetch.DMStreamFetchTask.execute(DMStreamFetchTask.java:78) ~[?:?] at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[?:?] at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?] at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?] ... 1 more 简单排查了下,似乎是最开始转换DM_DICT为json的时候使用了SerializerFeature.WriteClassName,转换的isn long值后面带了L,后面又把转换为对象的时候报了这个错,不识别这个L。
发布于 2025/03/10 07:45
不是铅笔字
源代码可以开源吗?
发布于 2025/03/04 17:43
GS
大佬,用了你的flink-connector-dm-cdc的jar包可以正常抓取归档日志,并实现数据同步,但是如果数据表过长的话,达到100多个字段时,归档日志会分成两条数据进行存储,这样在parseUpdate的时候出错,请问应该如何对这种情况做兼容,让flink-connector-dm-cdc可以抓取完成的redo日志
发布于 2025/02/28 01:51
月半居士
请问下对flink1.20.1的支持版本啥时候可以发布呢?
发布于 2025/02/27 03:35
DM_121666
一个依赖包都不提供源代码?
发布于 2025/02/26 07:39
方斌
用datastream方式同步数据,发现跑不起来,报错。通过定位发现原因是:SELECT LOG_MODE FROM V$DATABASE这个语句执行报错,用了20231226和20250117这两个版本的数据库都不行,是bug吗?还是哪里配置不对?
发布于 2025/02/25 07:16
龙涛
启动FlinkDMCDC报错: java.lang.NoClassDefFoundError: org/apache/flink/api/java/typeutils/ResultTypeQueryable idea导入testflinkcdc后,仅删除了common-aop这个依赖。
发布于 2025/02/17 07:22
123
demo 运行时报 ClassNotFoundException: org.apache.flink.api.java.typeutils.ResultTypeQueryable 错误,请问缺少那个jar包
发布于 2025/02/17 00:48
DM_850040
目前有更新吗,有最新版本吗
发布于 2025/02/13 07:39
DM_850040
flinkcdc3.2.1版本支持吗
发布于 2025/02/13 03:41
斜落风雨
flink3.xx版本 支持 pipline的方式实现整库同步,咱这个支持整库同步吗。
发布于 2025/02/06 03:16
DM_026536
老师请教一下,配置了 'debezium.lob.enabled' = 'true' 这个用flink-sql也没正常同步text类型的字段啥原因呢。
发布于 2025/02/04 05:54
aol_aog
达梦数据CDC能支持debezium的connector吗?或者有没有开源的项目?
发布于 2025/01/13 05:10
DM_515915
能否使用DM8 Flink CDC实现达梦到达梦的数据实时同步
发布于 2024/12/27 02:03
DM_515915
请问这个版本的达梦可以用DM8 Flink CDC吗:1-3-140-2024.05.27-229704-20093-ENT 。 为什么dm8 版本号不能小于 8.1.3.77?
发布于 2024/12/27 01:58
迷思
根据示例程序只能捕获全量数据,无法捕获增量数据
发布于 2024/12/25 09:45
山山而川丶
记录一下
发布于 2024/12/17 15:01
DM_915264
如果数据库执行了truncate table 操作,程序直接报错并结束执行
发布于 2024/12/17 07:37
LeeWen
<dependency> <groupId>com.dameng.common</groupId> <artifactId>common-aop</artifactId> <version>5.0.0</version> </dependency> 包里面是不是少了这个依赖
发布于 2024/12/05 03:02

作者

文章

阅读量

获赞

扫一扫
联系客服