注册
达梦数据库支持Seata事务框架(具体操作)
技术分享/ 文章详情 /

达梦数据库支持Seata事务框架(具体操作)

DM_sms 2022/03/11 4377 6 4

一、用例说明

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

  1. 仓储服务:对给定的商品扣除仓储数量。
  2. 订单服务:根据采购需求创建订单。
  3. 帐户服务:从用户帐户中扣除余额。

1.1 项目的架构图

image.png

1.2 初始项目搭建

1.2.1 环境介绍

每个模块一个库,也就是需要 4 个库。

模 块名称 项目端口号 数据库端口号
business-xa 8084 5237
storage-xa 8081 5238
order-xa 8082 5239
account-xa 8083 5240

1.2.2 初始表数据

-- 1. business-xa模块所在数据库 新建BIZ_LOG表 
CREATE TABLE "SYSDBA"."BIZ_LOG" 
(
"ID_" VARCHAR(64) NOT NULL, 
"OP_DATETIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP, 
CLUSTER PRIMARY KEY("ID_")) STORAGE(ON "MAIN", CLUSTERBTR) ; 

-- 2. storage-xa模块所在数据库 新建STORAGE_TBL表并新增一条数据 
CREATE TABLE "SYSDBA"."STORAGE_TBL" 
(
"ID" INT NOT NULL, 
"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL, 
"COUNT" INT DEFAULT '0', 
CLUSTER PRIMARY KEY("ID"), 
CONSTRAINT "STORAGE_TBL_COMMODITY_CODE" UNIQUE("COMMODITY_CODE")) STORAGE(ON "MAIN", CLUSTERBTR) ; 
insert into "SYSDBA"."STORAGE_TBL"("ID", "COMMODITY_CODE", "COUNT") VALUES(1,'C100000','10000'); 

-- 3. order-xa模块所在数据库 新建ORDER_TBL表 
CREATE TABLE "SYSDBA"."ORDER_TBL" 
(
"ID" BIGINT NOT NULL, 
"USER_ID" VARCHAR(255) DEFAULT NULL, 
"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL, 
"COUNT" INT DEFAULT '0', 
"MONEY" INT DEFAULT '0', 
"CREATE_TIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP, 
CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ; 

-- 4. account-xa模块所在数据库 新建ACCOUNT_TBL表并新增一条数据 
CREATE TABLE "SYSDBA"."ACCOUNT_TBL" 
(
"ID" INT NOT NULL, 
"USER_ID" VARCHAR(255) DEFAULT NULL, 
"MONEY" INT DEFAULT '0', 
CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ; 
insert into "SYSDBA"."ACCOUNT_TBL"("ID", "USER_ID", "MONEY") VALUES(1, 'U100000', 600);

1.2.3 搭建项目

  1. 启动 seata-xa-original 项目;
  2. 配置项目中每个模块连接数据库的连接在 application.properties 文件中。
#DM 
spring.datasource.url=jdbc:dm://127.0.0.1:5238/ 
spring.datasource.driver-class-name=dm.jdbc.driver.DmDriver
spring.datasource.username=SYSDBA
spring.datasource.password=SYSDBA

1.2.4 存在的问题

每个模块在不同的库,没法保证事务的一致性。所以打算采用Seata分布式事务框架。
资料包中seata-xa-original.zip为这块的代码包。

二、DM 数据库支持 Seata 事务

2.1 流程分析

因 Seata 事务框架的 AT 模式还不支持 Dm 数据库,但支持 Oracle数据库,所以整合过程中需修改数据库添加对 Oracle 的支持。

image.png

2.2 修改 DM 数据库的配置

2.2.1 更新 jdbc 驱动

项目中使用 dm 的 jdbc 版本看图。不建议使用的 jdbc 版本比这个版本低。

image.png

2.2.2 修改 dm.svc.conf 配置文件

## 添加下面这两个属性 第一个是兼容oracle,第二个是屏蔽关键字 
COMPATIBLE_MODE=(oracle) 
KEY_WORDS=(context)

image.png

dm.svc.conf 文件的路径看如下官方文档说明:
image.png

2.2.3 修改 dm.ini 文件

所有数据库实例修都需要改。

COMPATIBLE_MODE = 2 ## Server compatible mode, 0:none, 1:SQL92, 2:Oracle, 3:MS SQL Server, 4:MySQL, 5:DM6, 6:Teradata

以上配置修改完以后,需要重启数据库。

2.3 搭建 TC 端

介绍了 seata 事务的三个模块:TC(事务协调器)、TM(事务管理器)和RM(资源管理器),其中 TM 和 RM 是嵌⼊在业务应⽤中的,而 TC 则是⼀个独⽴服务。

2.3.1 下载 Server 端

最新版本下载地址:https://www.github.com/seata/seata/releases

  1. 官网下载:1.3.0版本的下载地址:https://www.github.com/seata/seata/releases/tag/v1.3.0
  2. 在资料包种已经下载好了 seata-server-1.3.0.zip 解压即可。

image.png

2.3.2 配置 Server 端

Server 端存储模式(store.mode)现有 file、db、redis 三种(后续将引入raft,mongodb),file 模式无需改动,直接启动即可。

  • file 模式为单机模式,全局事务会话信息内存中读写并持久化本地文件bin目录下的root.data,性能较高;
  • db模式为高可用模式,全局事务会话信息通过db共享,相应性能差些;
  • redis模式Seata-Server 1.3及以上版本支持,性能较高,存在事务信息丢失风险,请提前配置合适当前场景的redis持久化配置。

File 模式直连配置
主要关注 conf 文件夹下的 registry.conf 文件以及 file.conf 文件。
采用 File 直连模式 registry.conf 文件无需改动,需要在 file.conf 中添加事务分组。

  • registry.conf
registry
{ 
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa 
  type = "file" 
  file { 
        name = "file.conf" 
        } 
}
config 
{ 
  # file、nacos 、apollo、zk、consul、etcd3 
  type = "file" 
  file { 
        name = "file.conf" 
        } 
}
  • file.conf

在 server 属性中新增这段值 vgroup_mapping.seata-xa=“default”。即添加事务分组。需与 TM/RM 端配置的一致。

transport { 
  # tcp udt unix-domain-socket 
  type = "TCP" 
  #NIO NATIVE 
  server = "NIO" 
  #enable heartbeat 
  heartbeat = true 
  # the client batch send request enable 
  enableClientBatchSendRequest = false 
  #thread factory for netty 
  threadFactory { 
    bossThreadPrefix = "NettyBoss" 
    workerThreadPrefix = "NettyServerNIOWorker" 
    serverExecutorThreadPrefix = "NettyServerBizHandler" 
    shareBossWorker = false 
    clientSelectorThreadPrefix = "NettyClientSelector" 
    clientSelectorThreadSize = 1 
    clientWorkerThreadPrefix = "NettyClientWorkerThread" 
    # netty boss thread size,will not be used for UDT 
    bossThreadSize = 1 
    #auto default pin or 8 
    workerThreadSize = "default" 
  }
  shutdown { 
    # when destroy server, wait seconds 
    wait = 3 
  }
  serialization = "seata" 
  compressor = "none" 
}

## transaction log store, only used in server side 
store { 
  ## store mode: file、db 
  mode = "file" 
  ## file store property 
  file { 
    ## store location dir 
    dir = "sessionStore" 
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions 
    maxBranchSessionSize = 16384 
    # globe session size , if exceeded throws exceptions 
    maxGlobalSessionSize = 512 
    # file buffer size , if exceeded allocate new buffer 
    fileWriteBufferCacheSize = 16384 
    # when recover batch read size 
    sessionReloadReadSize = 100 
    # async, sync 
    flushDiskMode = async 
  } 

}
## server configuration, only used in server side
server { 
  recovery { 
    #schedule committing retry period in milliseconds 
    committingRetryPeriod = 1000 
    #schedule asyn committing retry period in milliseconds 
    asynCommittingRetryPeriod = 1000 
    #schedule rollbacking retry period in milliseconds 
    rollbackingRetryPeriod = 1000 
    #schedule timeout retry period in milliseconds 
    timeoutRetryPeriod = 1000 
  }
  undo { 
    logSaveDays = 7 
    #schedule delete expired undo_log in milliseconds 
    logDeletePeriod = 86400000 
  }
  #check auth 
  enableCheckAuth = true 
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent 
  maxCommitRetryTimeout = "-1" 
  maxRollbackRetryTimeout = "-1" 
  rollbackRetryTimeoutUnlockEnable = false 
  # 新增的这段,设置事务分组 
  vgroup_mapping.seata-xa="default" 
}

## metrics configuration, only used in server side 
metrics { 
  enabled = false 
  registryType = "compact" 
  # multi exporters use comma divided 
  exporterList = "prometheus" 
  exporterPrometheusPort = 9898 
}

2.4 TM/RM 端整合 Seata

Seata 事务框架在 AT 模式下在 RM 端需要 UNDO_LOG 表 ,来记录每个RM的事务信息,主要包含数据修改前,后的相关信息,⽤于回滚处理,所以在所有数据库中分别执行。

CREATE TABLE "SYSDBA"."UNDO_LOG" 
(
"ID" BIGINT NOT NULL, 
"BRANCH_ID" BIGINT NOT NULL, 
"XID" VARCHAR(100) NOT NULL, 
"CONTEXT" VARCHAR(150) NOT NULL, 
"ROLLBACK_INFO" BLOB NOT NULL, 
"LOG_STATUS" INT NOT NULL, 
"LOG_CREATED" DATETIME(6) NOT NULL,
"LOG_MODIFIED" DATETIME(6) NOT NULL, 
NOT CLUSTER PRIMARY KEY("ID"),
CONSTRAINT "UX_UNDO_LOG" UNIQUE("XID", "BRANCH_ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;

RM(事务管理器)端整合 Seata 与 TM(事务管理器)端步骤类似,只不过不需要在⽅法添加@GlobalTransactional注解,针对我们⼯程 lagou_bussiness 是事务的发起者,所以是 TM 端,其它⼯程为 RM 端,所以我们只需要在 lagou_common_db 完成前 4 步即可。

2.4.1 引入 seata 依赖

  1. 修改父 pom.xml 文件,锁定 seata 的版本
<dependencyManagement> 
  <dependencies> 
    <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>Greenwich.RELEASE</version> 
      <type>pom</type> 
      <scope>import</scope>
    </dependency> 
    <!--SCA --> 
    <dependency> 
      <groupId>com.alibaba.cloud</groupId> 
      <artifactId>spring-cloud-alibaba-dependencies</artifactId> 
      <version>2.1.0.RELEASE</version> 
      <type>pom</type> 
      <scope>import</scope> 
    </dependency> 

    <dependency>
      <groupId>com.alibaba</groupId> 
      <artifactId>druid-spring-boot-starter</artifactId> 
      <version>1.1.22</version> 
    </dependency> 
    <!--seata版本管理, ⽤于锁定⾼版本的seata --> 
    <dependency> 
      <groupId>io.seata</groupId> 
      <artifactId>seata-all</artifactId> 
      <version>1.3.0</version> 
    </dependency> 

  </dependencies> 
</dependencyManagement>
  1. 修改每个模块的 pom.xml 文件引入 seata 的依赖
    因为原来的是旧版本,所以需要引入新的 seata 的版本依赖。

image.png

<!--seata依赖--> 
<dependency> 
  <groupId>com.alibaba.cloud</groupId> 
  <artifactId>spring-cloud-alibaba-seata</artifactId> 
  <!--排除低版本seata依赖--> 
  <exclusions> 
    <exclusion> 
      <groupId>io.seata</groupId> 
      <artifactId>seata-all</artifactId> 
    </exclusion> 
  </exclusions> 
</dependency> 
<!--添加⾼版本seata依赖--> 
<dependency> 
  <groupId>io.seata</groupId> 
  <artifactId>seata-all</artifactId> 
  <version>1.3.0</version> 
</dependency>

2.4.2 引入注册中心文件

每个模块的resources目录下引入 Seata 事务 Client 客户端的 registry.conf 文件。又因为注册中心采用的直连模式,所以还需要引入 file.conf 。
这两个文件可以参考资料包。如下图:
image.png

2.4.3 配置连接事务组

新增每个模块的事务组,在每个项目的 application.properties 中添加以下信息:

spring.cloud.alibaba.seata.txServiceGroup=seata-xa 
logging.level.io.seata=debug 
logging.level.io.seata.core.rpc=warn

image.png

2.4.4 修改数据源 url 兼容 Oracle

每个模块兼容 Oracle 数据库,所以需要修改数据源的 Url 连接。
修改每个模块的 url 连接。下面是一个例子,注意端口号:

spring.datasource.url=jdbc:oracle:thin:@localhost:5237 spring.datasource.driverClassName=dm.jdbc.driver.DmDriver 
spring.datasource.username=SYSDBA 
spring.datasource.password=SYSDBA

2.4.5 添加 seata 代理数据源

seata 事务框架的 AT模式需要操作数据源,所以我们把数据源对象代理给 seata 框架。

  1. 在每个模块** Application 启动类***同目录下新建数据源对象,
  2. 修改每个模块的 Application 类的扫描。
    下面是修改 Storage 模块的例子:
@Configuration 
public class StorageDataSourceConfiguration {
   /**
    * 使⽤druid连接池 
    *
    * @return 
    */ 
  @Bean 
  @ConfigurationProperties(prefix = "spring.datasource") 
  public DataSource druidDataSource() { 
    return new DruidDataSource(); 
}

  /**
   * 设置数据源代理-,完成分⽀事务注册/事务提交与回滚等操作 
   *
   * @param druidDataSource 
   * @return 
   */ 
  @Primary //设置⾸选数据源对象 
  @Bean("dataSource") 
  public DataSourceProxy dataSource(DataSource druidDataSource) { 
    return new DataSourceProxy(druidDataSource); 
  } 
}
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class, 
    scanBasePackages = "com.dameng") 
public class StorageXAApplication { 
    public static void main(String[] args) { 
      SpringApplication.run(StorageXAApplication.class, args); 
    } 
}

2.4.6 修改 seata 源码对达梦的兼容

  1. 在每个模块中配置新建目录 com.dameng.rm.datasource.util 以及 XAUtils 类。
  2. 修改每个模块的启动类,让其启动项目时替换掉源代码包中的 XAUtils 类。
package com.dameng.rm.datasource.util;

import com.ali baba.drui d.util.JdbcUtils;
import com.ali baba.drui d.util.MySqlutils;
import com.alibaba.druid.util.PGUtils;
import io.seata.rm.BaseDataSourceResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import java.lang.reflect.Constructor;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;

public class XAUtils { 

    private static final Logger LOGGER = LoggerFactory.getLogger(XAUtils.class);
    public static String getDbType(Stri ng jdbcUrl, String driverClassName) {
        return JdbcUtils.getDbType(jdbcUrl, driverClassName);
    } 

    public static XAConnection createXAConnection(Connection physicalConn, BaseDataSourceResource dataSourceResource) throws SQLException {
        return createXAConnection(physicalConn, dataSourceResource.getDriver(), dataSourceResource.getDbType());
        } 

    public static XAConnection createXAConnection(Connection physicalConn, Driver driver, String dbType) throws SQLException { 
        if (JdbcUtils.ORACLE.equals(dbType)) {
            try {
                // https://www.github.com/alibaba/druid/issues/3707
                // before Druid issue fixed, just make ORACLE XA connection in my way.
                // return OracleUtils.OracleXAConnection(physicalConn); 
                String physicalConnClassName =physicalConn.getClass().getName();
        if ("oracle.jdbc.driver.T4CConnection".equals(physi calConnClassName)) {
                return createOracleXAConnection(physicalConn, "oracle.jdbc.driver.T4CXAConnection");
                }
        else
                {
                return createOracleXAConnection(physicalConn, "oracle.jdbc.xa.client.OracleXAConnection");
                } } catch (XAException xae) { throw new SQLException("create xaConnection error", xae);
                } } i f (JdbcUtils.DM.equals(dbType)) { try {
                // String physicalConnClassName =
                physicalConn.getClass().getName();                                // if
                ("dm.jdbc.driver.DmdbConnection".equals(physicalConnClassName)) { // return createDMXAConnection(physi calConn,
                "dm.jdbc.driver.DmdbConnection");                                 // } else {
// return createDMXAConnection(physi calconn,
"dm.jdbc.driver.DmdbXAConnection"); // }
return createDMXAConnection(physi calConn, "dm.jdbc.driver.DmdbXAConnection");
} catch (XAException xae) { throw new SQLException("create xaConnection error", xae);
} }
if (JdbcUtils.MYSQL.equals(dbType) || JdbcUtils.MARIADB.equals(dbType)) {
        return MySqlUtils.createXAConnection(driver, physi calConn);
        }
        if (JdbcUtils.POSTGRESQL.equals(dbType)) {
                return PGUtils.c reateXAConnecti on
                (
                        physi cal Conn
                )
                ;
                } throw new SQLException("xa not support dbType: " + dbType);
                } private static XAConnection createOracleXAConnection(Connection physi calConnecti on, Stri ng xaConnecti onClassName) throws XAException, SQLException { try { Class xaConnecti onClass = Class.forName(xaConnectionClassName);
                Constructor                                                                                                                                                                               <XAConnection> constructor = xaConnecti onClass.getConstructor(Connection.class);
                constructor.setAccessible(true);
                return constructor.newInstance(physi calConnection);
                } catch (
        Exception
                e) { LOGGER.wa rn("Failed to create Oracle XA Connection " + xaConnectionClassName + " on " + physicalConnection);
                if (e instanceof XAException) { throw (XAException) e;
                        }
                else
                        { throw new SQLException(e);
                        } } } private stati c XAConnection createDMXAConnecti on
                        (
                                Connecti on physicalConnection, String xaConnectionClassName
                        )
                        throws XAException, SQLException { try { Class xaConnectionClass = Class.forName(xaConnectionClassName);
                        Constructor                                                      <XAConnection> constructor = xaConnectionClass.getConstructor(Connection.class);
                        constructor.setAccessible(true);
                        return constructor.newInstance(physicalConnection);
                        } catch (
                Exception
                        e) { LOGGER.wa rn("Failed to create DM XA Connection " + xaConnectionClassName + " on " + physicalConnection);
                        if (e instanceof XAException) { throw (XAException) e;
                                }
                        else
                                { throw new SQLException(e);
                                } 
                        }
            } 
}

在启动类上使用 @ComponentScan 注解,使其项目启动加载时使用我们本机修改的类。

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class, 
      scanBasePackages = "com.dameng") 
@ComponentScan(excludeFilters = { 
      @ComponentScan.Filter(type = 
             FilterType.ASSIGNABLE_TYPE, classes = { 
             XAUtils.class})}) 

public class StorageXAApplication { 
      public static void main(String[] args) { 
            SpringApplication.run(StorageXAApplication.class, args); 
      } 
}

2.4.7 添加注解@GlobalTransactional

Business 为 Seata 事务的 TM,所以在方法上添加 @GlobalTransactional 注解。

@GlobalTransactional(name = "sale", timeoutMills = 100000, rollbackFor = Exception.class) 
public void execWork(String USER_ID, String Storage_Code, Integer orderCount) { 
    //记录本地事务 
    int update = jdbcTemplate.update("insert into BIZ_LOG(id_) values(?)", UUID.randomUUID().toString()); 
    //扣减商品库存 
    String storageResult = storageFeignClient.consumeStorage(Storage_Code, orderCount); 
    if (FAIL.equals(storageResult)) { 
        throw new RuntimeException("商品报错回滚..."); 
    }
    //扣减订单库存 
    String orderResult = orderFeignClient.createOrder(USER_ID, Storage_Code, orderCount); 
    if (FAIL.equals(orderResult)) { 
        throw new RuntimeException("订单报错回滚..."); 
    } 
}

2.5 启动项目

2.5.1 启动 Seata 服务(TC 端)

进入到 seata 的 bin 目录 seata\bin 下管理员执行 seata-server.bat 文件即可。
服务启动后默认端口为 8091。

注意:

  1. 因为采用直连模式,会在 bin 目录下生成 sessionStore 文件,每次启动前建议删除。
  2. 有时候客户端会卡住,按一下回车键刷新下日志就好了。

image.png

2.5.2 启动 business-xa 服务(TM端)

启动 com.dameng.sample.BusinessXAApplication 服务。

2.5.3 启动 storage-xa 服务 (RM端)

启动 com.dameng.sample.StorageXAApplication 服务。

2.5.4 启动 order-xa 服务(RM端)

启动 com.dameng.sample.OrderXAApplication 服务。

2.5.5 启动 account-xa 服务(RM端)

启动 com.dameng.sample.AccountXAApplication 服务。

项目启动后,查看 seata 服务端日志,检查服务是否已经注册到 Seata 服务中。

image.png

2.5.6 测试

测试成功
在浏览器中输入 http://localhost:8084/execWork?orderCount=1,查看库存,订单,金额是否正常。

image.png

测试回滚
修改 order 模块中 service 代码如图,使代码报异常。
image.png

image.png

资料包中 seata-xa-final.zip 为整合以后的包。

三、问题整理

3.1 endpoint format should like ip:port

java.lang.IllegalArgumentException: endpoint format should like ip:port 
at 
io.seata.discovery.registry.FileRegistryServiceImpl.lookup(FileRegistryServiceIm pl.java:95) ~[seata-all-1.3.0.jar:1.3.0] 
at 
io.seata.core.rpc.netty.NettyClientChannelManager.getAvailServerList(NettyClient ChannelManager.java:217) ~[seata-all-1.3.0.jar:1.3.0] 
at
 io.seata.core.rpc.netty.NettyClientChannelManager.reconnect(NettyClientChannelMa nager.java:162) ~[seata-all-1.3.0.jar:1.3.0] 
at
io.seata.core.rpc.netty.RmNettyRemotingClient.registerResource(RmNettyRemotingCl ient.java:181) [seata-all-1.3.0.jar:1.3.0] 
at 
io.seata.rm.AbstractResourceManager.registerResource(AbstractResourceManager.jav a:121) [seata-all-1.3.0.jar:1.3.0] 
at 
io.seata.rm.datasource.DataSourceManager.registerResource(DataSourceManager.java :146) [seata-all-1.3.0.jar:1.3.0]

解决办法:

  1. seata(TC端)事务组与java模块(RM端)事务组不同需自己检查。
  2. seata(TC端)的file.conf文件的server属性 vgroupMapping 配置名有问题自行检查修改。

3.2 io.seata.core.exception.TmTransactionException:RPC timeout

image.png

解决办法:

  1. 即便已经按照顺序启动,seata也提示注册。因为电脑内存等原因实际情况还是没有注册上。重新启动一遍服务即可。
  2. seata配置有问题,seata控制台可能都没有注册上。检查seata的配置文件以及该服务的配置是否正确。

3.3 项目的坑点(重点项目无法运行)

  1. 项目是采用 jdbcTemplate 方式,对 sql 语句有严格的要求。sql 语句一旦有";"会导致回滚数据异常。
  2. 表用的是自增字段,sql语句就不要出现这个字段,会导致回滚数据异常。

四、资料包

image.png

-- 查询blob字段的值 
select utl_raw.cast_to_varchar2(dbms_lob.substr(ROLLBACK_INFO)) from "SYSDBA"."UNDO_LOG";
评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服