
flink-connector-dm-cdc usage bug

BelieF 2025/10/10

To speed things up, please provide the information below; clearly described questions get priority.
【DM version】: DM8
【Operating system】: Kylin V10
【Environment】: Flink 1.16.2 + DM8 + flink-connector-dm-cdc-2.4.1
【Problem description】*:
References:
1. Using the DM8 Flink CDC connector: https://eco.dameng.com/community/training/bb49aab895c5971f540faf822ffad9ef
2. DM8 Flink CDC connector download:
https://eco.dameng.com/community/post/202411191132168PD86B201G3HYB7NK5
3. Earlier flink-connector-dm-cdc question:
https://eco.dameng.com/community/question/fbd0eeb3fb60ecc9c62d0e7b6939e2a4
4. When reading a single table through Flink's DataStream API, any change to the table data triggers a NullPointerException. This reproduces reliably. What causes it, and how can it be resolved?

(error screenshot attached: image.png)

5. Test code

/*
 * @(#)FlinkDMCDC.java, 2023-08-05 10:33:17
 *
 * Copyright (c) 2000-2020, Dameng Database Co., Ltd.
 * All rights reserved.
 */
package com.jsl.dm;

import com.jsl.utils.DmDateConverter;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.dm.source.DMSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Created by wuxin on 2023-08-05 10:33:17
 */
public class FlinkDmCdc1 {
    public static void main(String[] args) throws Exception {
        // Connector/Debezium properties for DM LogMiner-based capture.
        Properties properties = new Properties();
        properties.setProperty("database.tablename.case.insensitive", "false");
        properties.setProperty("log.mining.strategy", "online_catalog");
        properties.setProperty("log.mining.continuous.mine", "true");
        properties.setProperty("lob.enabled", "true");
        properties.putAll(DmDateConverter.DEFAULT_PROPS);

        JdbcIncrementalSource<String> changeEventSource =
                new DMSourceBuilder<String>()
                        .hostname("10.1.3.77")
                        .port(5237)
                        .databaseList("CMS")
                        .tableList("CMS.BIGDATA_KEEP_ALIVE")
                        .schemaList("CMS")
                        .username("SMU")
                        .password("12345678")
                        .startupOptions(StartupOptions.initial())
                        .dmProperties(properties)
                        .includeSchemaChanges(true)
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .sliceSize(20)
                        .build();

        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.enableCheckpointing(10 * 60 * 1000);

        env.fromSource(changeEventSource, WatermarkStrategy.noWatermarks(), "DmSource")
                .setParallelism(1)
                .print()
                .setParallelism(1);

        env.execute();
    }
}
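Until the connector-side exception is resolved, one possible stopgap (an assumption on my part, not an official workaround, and it does not fix the underlying NullPointerException) is to configure a restart strategy so the job recovers automatically after the fetcher thread dies, e.g. in flink-conf.yaml:

```yaml
# Flink 1.16 restart-strategy keys (flink-conf.yaml).
# Only restarts the job after the fetcher fails; the NPE itself remains.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s
```

With StartupOptions.initial() this re-runs the snapshot-plus-stream startup from the last checkpoint, which is why enableCheckpointing is relevant here.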

6. Error log

2025-09-30 08:00:20,770 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: DmParallelSource -> Map -> Sink: Unnamed (1/1)#1 (801e954cff1b54acb9e93d7fc8044ff9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
	at io.debezium.connector.dm.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:271)
	at com.ververica.cdc.connectors.dm.source.reader.fetch.DMStreamFetchTask$RedoLogSplitReadTask.execute(DMStreamFetchTask.java:141)
	at com.ververica.cdc.connectors.dm.source.reader.fetch.DMStreamFetchTask.execute(DMStreamFetchTask.java:78)
	at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
	... 5 more
Caused by: java.lang.NullPointerException
	at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:349)
	at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:310)
	at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:244)
	at io.debezium.connector.dm.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:259)
	... 8 more
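For context on the last frames above: an NPE inside AbstractLogMinerEventProcessor.processRow usually means some lookup returned null for the captured row (for example, table metadata that was never cached) and was dereferenced without a check. The stand-alone sketch below only illustrates that defensive pattern; every name in it is invented for illustration and none of it comes from the connector's actual source.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a null-safe row processor; not connector code.
public class DefensiveRowProcessor {

    // Returns the cached schema for a table, or null if the table is unknown
    // (e.g. the cache was never populated for it).
    static String lookupSchema(Map<String, String> schemaCache, String tableId) {
        return schemaCache.get(tableId);
    }

    // Skips rows whose metadata cannot be resolved instead of
    // dereferencing a null schema and throwing an NPE.
    static String processRow(Map<String, String> schemaCache, String tableId) {
        String schema = lookupSchema(schemaCache, tableId);
        if (schema == null) {
            return "skipped: no schema cached for " + tableId;
        }
        return "processed " + tableId + " with schema " + schema;
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("CMS.BIGDATA_KEEP_ALIVE", "(ID NUMBER, TS TIMESTAMP)");
        System.out.println(processRow(cache, "CMS.BIGDATA_KEEP_ALIVE"));
        System.out.println(processRow(cache, "CMS.UNKNOWN_TABLE"));
    }
}
```

Whether the real null value at AbstractLogMinerEventProcessor.java:349 is table metadata or something else would need the connector maintainers (or the decompiled source) to confirm.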
Answers: 0 (no answers yet)