Errors when using the Flink CDC connector tool; requesting support from Dameng, thanks

DM_915264 2024/12/11 428 1

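To improve efficiency, please provide the following information when asking a question; clearly described problems may be answered first.
[DM version]: Dameng 8, DM Database Server 64 V8
DB Version: 0x7000c
03134284172-20240321-222308-20093
[Operating system]: Kylin V10
[CPU]:
[Problem description]*:
The Dameng team provided a Flink CDC tool package in November.
While following section (6), the reference example (Stream API approach), I ran into the following problems.
1. StartupOptions does not support earliest()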

startupOptions(StartupOptions.earliest())

Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported startup mode: EARLIEST_OFFSET
    at com.ververica.cdc.connectors.base.config.JdbcSourceConfigFactory.startupOptions(JdbcSourceConfigFactory.java:206)
    at com.ververica.cdc.connectors.dm.source.DMSourceBuilder.startupOptions(DMSourceBuilder.java:198)
    at com.dameng.flinkcdc.dm.FlinkDMCDC2.main(FlinkDMCDC2.java:69)
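For readers trying to interpret the trace above: the message suggests that the builder validates the requested startup mode against a fixed set and rejects everything else. The following is only a minimal, self-contained sketch of that kind of guard. `StartupModeGuard`, its enum, and the accepted set (`INITIAL` and `LATEST_OFFSET`) are hypothetical stand-ins, not the connector's actual classes; the trace itself only confirms that `EARLIEST_OFFSET` is rejected.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for a startup-mode guard; NOT the real Flink CDC or DM connector code.
final class StartupModeGuard {

    // Illustrative enum; the names mirror the mode shown in the error message.
    enum StartupMode { INITIAL, EARLIEST_OFFSET, LATEST_OFFSET, SPECIFIC_OFFSETS, TIMESTAMP }

    // Assumption: only these two modes are accepted by the source builder.
    private static final Set<StartupMode> SUPPORTED =
            EnumSet.of(StartupMode.INITIAL, StartupMode.LATEST_OFFSET);

    // Returns the mode unchanged if supported, otherwise fails the same way the trace does.
    static StartupMode validate(StartupMode mode) {
        if (!SUPPORTED.contains(mode)) {
            throw new UnsupportedOperationException("Unsupported startup mode: " + mode);
        }
        return mode;
    }

    public static void main(String[] args) {
        System.out.println(validate(StartupMode.LATEST_OFFSET)); // prints LATEST_OFFSET
        try {
            validate(StartupMode.EARLIEST_OFFSET);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // prints Unsupported startup mode: EARLIEST_OFFSET
        }
    }
}
```

If the real guard works along these lines, the exception is thrown while the source is being configured, before any log mining starts. That would be consistent with the failure occurring inside `main` at the `startupOptions(...)` call.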

2. Log mining fails with an error; my guess is that the SQL is too long, so it is truncated either in the log or in the code

    /*
     * @(#)FlinkDMCDC.java, 2023-08-05 10:33:17 AM
     *
     * Copyright (c) 2000-2020, Dameng Database Co., Ltd.
     * All rights reserved.
     */
    package com.dameng.flinkcdc.dm;

    import com.dameng.flinkcdc.deserialize.CustomerDeserializationSchema;
    import com.ververica.cdc.connectors.base.options.StartupOptions;
    import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
    import com.ververica.cdc.connectors.dm.source.DMSourceBuilder;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Properties;

    /**
     * Created by wuxin on 2023-08-05 10:33:17 AM
     */
    public class FlinkDMCDC2 {

        public static void main(String[] args) throws Exception {
            // Connector properties passed through to the DM source.
            Properties properties = new Properties();
            properties.setProperty("database.tablename.case.insensitive", "false");
            properties.setProperty("log.mining.strategy", "offline_catalog");
            properties.setProperty("lob.enabled", "true");

            // Build the incremental source; host and password are masked.
            JdbcIncrementalSource<String> changeEventSource =
                    new DMSourceBuilder<String>()
                            .hostname("**")
                            .port(5236)
                            .databaseList("Dameng")
                            .schemaList("test")
                            .username("SYSDBA")
                            .password("**")
                            .startupOptions(StartupOptions.latest())
                            .dmProperties(properties)
                            .includeSchemaChanges(true)
                            .deserializer(new StringDebeziumDeserializationSchema())
                            .sliceSize(20)
                            .build();

            Configuration configuration = new Configuration();
            // The posted snippet never declares env; this line is an assumed reconstruction.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(configuration);

            env.fromSource(changeEventSource, WatermarkStrategy.noWatermarks(), "DmSource")
                    .setParallelism(1)
                    .print()
                    .setParallelism(1);

            env.execute();
        }
    }

Caused by: io.debezium.connector.dm.logminer.parser.DmlParserException: DML statement couldn't be parsed. Please open a Jira issue with the statement 'IS NULL AND "COL 45" IS NULL AND "COL 46" IS NULL AND "COL 47" IS NULL AND "COL 48" IS NULL AND "COL 49" IS NULL AND "COL 50" IS NULL AND "COL 51" = '0' AND "COL 52" = TIMESTAMP'2023-11-27 12:45:03' AND "COL 53" = TIMESTAMP'2023-11-27 14:18:56' AND "COL 54" IS NULL AND "COL 55" IS NULL AND "COL 56" IS NULL AND "COL 57" IS NULL AND "COL 58" IS NULL AND "COL 59" IS NULL AND "COL 60" IS NULL AND "COL 61" IS NULL AND "COL 62" = TIMESTAMP'2024-12-11 14:12:45.255';'.
    at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.parseDmlStatement(AbstractLogMinerEventProcessor.java:1196)
    at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.lambda$4(AbstractLogMinerEventProcessor.java:959)
    at io.debezium.connector.dm.logminer.processor.memory.MemoryLogMinerEventProcessor.addToTransaction(MemoryLogMinerEventProcessor.java:232)
    at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.handleDataEvent(AbstractLogMinerEventProcessor.java:955)
    at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:371)
    at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:310)
    at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:244)
    at io.debezium.connector.dm.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:221)
    ... 8 more
Caused by: io.debezium.connector.dm.logminer.parser.DmlParserException: Failed to parse insert DML: 'IS NULL AND "COL 45" IS NULL AND "COL 46" IS NULL AND "COL 47" IS NULL AND "COL 48" IS NULL AND "COL 49" IS NULL AND "COL 50" IS NULL AND "COL 51" = '0' AND "COL 52" = TIMESTAMP'2023-11-27 12:45:03' AND "COL 53" = TIMESTAMP'2023-11-27 14:18:56' AND "COL 54" IS NULL AND "COL 55" IS NULL AND "COL 56" IS NULL AND "COL 57" IS NULL AND "COL 58" IS NULL AND "COL 59" IS NULL AND "COL 60" IS NULL AND "COL 61" IS NULL AND "COL 62" = TIMESTAMP'2024-12-11 14:12:45.255';'
    at io.debezium.connector.dm.logminer.parser.LogMinerDmlStrParser.parseInsert(LogMinerDmlStrParser.java:103)
    at io.debezium.connector.dm.logminer.parser.LogMinerDmlStrParser.parse(LogMinerDmlStrParser.java:67)
    at io.debezium.connector.dm.logminer.processor.AbstractLogMinerEventProcessor.parseDmlStatement(AbstractLogMinerEventProcessor.java:1188)
    ... 15 more
Caused by: java.lang.NumberFormatException: For input string: "ULL AND "COL 46"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at com.ververica.cdc.connectors.dm.model.TableDict.parseColumnName(TableDict.java:121)
    at io.debezium.connector.dm.logminer.parser.LogMinerDmlStrParser.parseColumnListClause(LogMinerDmlStrParser.java:250)
    at io.debezium.connector.dm.logminer.parser.LogMinerDmlStrParser.parseInsert(LogMinerDmlStrParser.java:93)
    ... 17 more

Process finished with exit code 1
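The statement quoted in the trace above begins with `IS NULL AND "COL 45"`, which reads like the tail of a WHERE clause rather than the start of a statement, yet it was handed to `parseInsert`. To help narrow down where the truncation happens, a small check like the one below could be run against the mined redo SQL before it reaches the parser. This is only a diagnostic sketch: `DmlFragmentCheck` and `looksLikeCompleteDml` are hypothetical names, the table `"TEST"."T1"` is made up, and the assumption that a complete redo statement starts with `insert`, `update`, or `delete` has not been verified against the DM tool package.

```java
import java.util.Locale;

// Hypothetical diagnostic helper; NOT part of the DM connector or Debezium.
final class DmlFragmentCheck {

    // Assumption: a complete redo DML statement starts with one of these keywords.
    private static final String[] DML_PREFIXES = {"insert ", "update ", "delete "};

    // Returns true only if the text starts like a whole DML statement.
    static boolean looksLikeCompleteDml(String sqlRedo) {
        if (sqlRedo == null) {
            return false;
        }
        String head = sqlRedo.trim().toLowerCase(Locale.ROOT);
        for (String prefix : DML_PREFIXES) {
            if (head.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Shortened form of the fragment from the stack trace.
        String fragment = "IS NULL AND \"COL 45\" IS NULL AND \"COL 46\" IS NULL;";
        // Made-up complete statement for comparison.
        String full = "insert into \"TEST\".\"T1\"(\"COL 1\") values ('0');";

        System.out.println(looksLikeCompleteDml(fragment)); // prints false
        System.out.println(looksLikeCompleteDml(full));     // prints true
    }
}
```

Logging every row for which this check returns false, together with the row that precedes it, might show whether a long statement is being split across several rows in the mined log or cut off later in the connector code.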
Answers: 0
No answers yet