[DM version]: dm8
[Operating system]: kylinV10
[Development environment]: flink1.16.2 + dm8 + flink-connector-dm-cdc-2.4.1
[Problem description]*:
Reference links:
1. Using the Dm8_flink_cdc connector: https://eco.dameng.com/community/training/bb49aab895c5971f540faf822ffad9ef
2. DM8 Flink CDC connector tool download:
https://eco.dameng.com/community/post/202411191132168PD86B201G3HYB7NK5
3. A question about flink-connector-dm-cdc:
https://eco.dameng.com/community/question/fbd0eeb3fb60ecc9c62d0e7b6939e2a4
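4. When I read a single table through Flink's DataStream API, any change to the table's data causes a NullPointerException. It reproduces reliably. What causes this, and how can it be fixed?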
5. Test code
/*
 * @(#)FlinkDMCDC.java, August 5, 2023, 10:33:17 AM
 *
 * Copyright (c) 2000-2020, Dameng Database Co., Ltd.
 * All rights reserved.
 */
package com.jsl.dm;

import com.jsl.utils.DmDateConverter;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.dm.source.DMSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Created by wuxin on August 5, 2023, 10:33:17 AM
 */
public class FlinkDmCdc1
{
    public static void main(String[] args) throws Exception {
        // Connector properties handed to the source through dmProperties(...)
        Properties properties = new Properties();
        properties.setProperty("database.tablename.case.insensitive", "false");
        properties.setProperty("log.mining.strategy", "online_catalog");
        properties.setProperty("log.mining.continuous.mine", "true");
        properties.setProperty("lob.enabled", "true");
        properties.putAll(DmDateConverter.DEFAULT_PROPS);

        // Source for the single table CMS.BIGDATA_KEEP_ALIVE, emitting change events as JSON strings
        JdbcIncrementalSource<String> changeEventSource =
                new DMSourceBuilder<String>()
                        .hostname("10.1.3.77")
                        .port(5237)
                        .databaseList("CMS")
                        .tableList("CMS.BIGDATA_KEEP_ALIVE")
                        .schemaList("CMS")
                        .username("SMU")
                        .password("12345678")
                        .startupOptions(StartupOptions.initial())
                        .dmProperties(properties)
                        .includeSchemaChanges(true)
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .sliceSize(20)
                        .build();

        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Checkpoint every 10 minutes
        env.enableCheckpointing(10 * 60 * 1000);

        // Read from the source and print every event, both with parallelism 1
        env.fromSource(changeEventSource, WatermarkStrategy.noWatermarks(), "DmSource")
                .setParallelism(1)
                .print()
                .setParallelism(1);
        env.execute();
    }
}
6. Error log
2025-09-30 08:00:20,770 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: DmParallelSource -> Map -> Sink: Unnamed (1/1)#1 (801e954cff1b54acb9e93d7fc8044ff9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at io.debezium.connector.dm.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:271)
at com.ververica.cdc.connectors.dm.source.reader.fetch.DMStreamFetchTask$RedoLogSplitReadTask.execute(DMStreamFetchTask.java:141)
at com.ververica.cdc.connectors.dm.source.reader.fetch.DMStreamFetchTask.execute(DMStreamFetchTask.java:78)
at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
... 5 more
Caused by: java.lang.NullPointerException
at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:349)
at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:310)
at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:244)
at io.debezium.connector.dm.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:259)
... 8 more
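
The trace above is nested four levels deep, and the innermost "Caused by" (the NullPointerException raised in AbstractLogMinerEventProcessor.processRow) is the part that matters for diagnosis. As a minimal sketch only, the helper below walks a cause chain down to its innermost exception. The class RootCauseFinder and its method rootCause are hypothetical names, not part of Flink or the DM connector, and the exceptions built in main are stand-ins that mimic the nesting in the log (IllegalStateException takes the place of Kafka Connect's ConnectException so that the example needs no extra dependencies).

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical helper for illustration; not part of Flink or the DM connector. */
final class RootCauseFinder {

    private RootCauseFinder() {}

    /** Follows getCause() until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        // Track visited throwables by identity so a cyclic cause chain cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in exceptions that reproduce the nesting shown in the log.
        Throwable npe = new NullPointerException();
        Throwable producer = new IllegalStateException(
                "An exception occurred in the change event producer.", npe);
        Throwable fetcher = new RuntimeException(
                "SplitFetcher thread 0 received unexpected exception while polling the records",
                producer);
        Throwable top = new RuntimeException(
                "One or more fetchers have encountered exception", fetcher);

        Throwable root = rootCause(top);
        System.out.println(root.getClass().getName());

        // The first frame of the root cause is where the failure was raised.
        StackTraceElement[] frames = root.getStackTrace();
        if (frames.length > 0) {
            System.out.println("  at " + frames[0]);
        }
    }
}
```

In the test program, a helper like this could be called from a try/catch around env.execute(). Whether the full cause chain is available there depends on how the job is submitted, so treat that as an assumption to verify.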

Has your problem been resolved? I've run into a similar issue.