用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:
每个模块一个库,也就是需要 4 个库。
模 块名称 | 项目端口号 | 数据库端口号 |
---|---|---|
business-xa | 8084 | 5237 |
storage-xa | 8081 | 5238 |
order-xa | 8082 | 5239 |
account-xa | 8083 | 5240 |
-- 1. business-xa模块所在数据库 新建BIZ_LOG表
CREATE TABLE "SYSDBA"."BIZ_LOG"
(
"ID_" VARCHAR(64) NOT NULL,
"OP_DATETIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP,
CLUSTER PRIMARY KEY("ID_")) STORAGE(ON "MAIN", CLUSTERBTR) ;
-- 2. storage-xa模块所在数据库 新建STORAGE_TBL表并新增一条数据
CREATE TABLE "SYSDBA"."STORAGE_TBL"
(
"ID" INT NOT NULL,
"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL,
"COUNT" INT DEFAULT '0',
CLUSTER PRIMARY KEY("ID"),
CONSTRAINT "STORAGE_TBL_COMMODITY_CODE" UNIQUE("COMMODITY_CODE")) STORAGE(ON "MAIN", CLUSTERBTR) ;
insert into "SYSDBA"."STORAGE_TBL"("ID", "COMMODITY_CODE", "COUNT") VALUES(1,'C100000','10000');
-- 3. order-xa模块所在数据库 新建ORDER_TBL表
CREATE TABLE "SYSDBA"."ORDER_TBL"
(
"ID" BIGINT NOT NULL,
"USER_ID" VARCHAR(255) DEFAULT NULL,
"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL,
"COUNT" INT DEFAULT '0',
"MONEY" INT DEFAULT '0',
"CREATE_TIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP,
CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;
-- 4. account-xa模块所在数据库 新建ACCOUNT_TBL表并新增一条数据
CREATE TABLE "SYSDBA"."ACCOUNT_TBL"
(
"ID" INT NOT NULL,
"USER_ID" VARCHAR(255) DEFAULT NULL,
"MONEY" INT DEFAULT '0',
CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;
insert into "SYSDBA"."ACCOUNT_TBL"("ID", "USER_ID", "MONEY") VALUES(1, 'U100000', 600);
#DM
spring.datasource.url=jdbc:dm://127.0.0.1:5238/
spring.datasource.driver-class-name=dm.jdbc.driver.DmDriver
spring.datasource.username=SYSDBA
spring.datasource.password=SYSDBA
每个模块在不同的库,没法保证事务的一致性。所以打算采用Seata分布式事务框架。
资料包中seata-xa-original.zip为这块的代码包。
因 Seata 事务框架的 AT 模式还不支持 Dm 数据库,但支持 Oracle数据库,所以整合过程中需修改数据库添加对 Oracle 的支持。
项目中使用 dm 的 jdbc 版本看图。不建议使用的 jdbc 版本比这个版本低。
## 添加下面这两个属性 第一个是兼容oracle,第二个是屏蔽关键字
COMPATIBLE_MODE=(oracle)
KEY_WORDS=(context)
dm.svc.conf 文件的路径看如下官方文档说明:
所有数据库实例修都需要改。
COMPATIBLE_MODE = 2 ## Server compatible mode, 0:none, 1:SQL92, 2:Oracle, 3:MS SQL Server, 4:MySQL, 5:DM6, 6:Teradata
以上配置修改完以后,需要重启数据库。
介绍了 seata 事务的三个模块:TC(事务协调器)、TM(事务管理器)和RM(资源管理器),其中 TM 和 RM 是嵌⼊在业务应⽤中的,而 TC 则是⼀个独⽴服务。
最新版本下载地址:https://www.github.com/seata/seata/releases
Server 端存储模式(store.mode)现有 file、db、redis 三种(后续将引入raft,mongodb),file 模式无需改动,直接启动即可。
File 模式直连配置
主要关注 conf 文件夹下的 registry.conf 文件以及 file.conf 文件。
采用 File 直连模式 registry.conf 文件无需改动,需要在 file.conf 中添加事务分组。
registry
{
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}
config
{
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
file {
name = "file.conf"
}
}
在 server 属性中新增这段值 vgroup_mapping.seata-xa=“default”。即添加事务分组。需与 TM/RM 端配置的一致。
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = false
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThreadPrefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
## transaction log store, only used in server side
store {
## store mode: file、db
mode = "file"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
}
## server configuration, only used in server side
server {
recovery {
#schedule committing retry period in milliseconds
committingRetryPeriod = 1000
#schedule asyn committing retry period in milliseconds
asynCommittingRetryPeriod = 1000
#schedule rollbacking retry period in milliseconds
rollbackingRetryPeriod = 1000
#schedule timeout retry period in milliseconds
timeoutRetryPeriod = 1000
}
undo {
logSaveDays = 7
#schedule delete expired undo_log in milliseconds
logDeletePeriod = 86400000
}
#check auth
enableCheckAuth = true
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
maxCommitRetryTimeout = "-1"
maxRollbackRetryTimeout = "-1"
rollbackRetryTimeoutUnlockEnable = false
# 新增的这段,设置事务分组
vgroup_mapping.seata-xa="default"
}
## metrics configuration, only used in server side
metrics {
enabled = false
registryType = "compact"
# multi exporters use comma divided
exporterList = "prometheus"
exporterPrometheusPort = 9898
}
Seata 事务框架在 AT 模式下在 RM 端需要 UNDO_LOG 表 ,来记录每个RM的事务信息,主要包含数据修改前,后的相关信息,⽤于回滚处理,所以在所有数据库中分别执行。
CREATE TABLE "SYSDBA"."UNDO_LOG"
(
"ID" BIGINT NOT NULL,
"BRANCH_ID" BIGINT NOT NULL,
"XID" VARCHAR(100) NOT NULL,
"CONTEXT" VARCHAR(150) NOT NULL,
"ROLLBACK_INFO" BLOB NOT NULL,
"LOG_STATUS" INT NOT NULL,
"LOG_CREATED" DATETIME(6) NOT NULL,
"LOG_MODIFIED" DATETIME(6) NOT NULL,
NOT CLUSTER PRIMARY KEY("ID"),
CONSTRAINT "UX_UNDO_LOG" UNIQUE("XID", "BRANCH_ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;
RM(事务管理器)端整合 Seata 与 TM(事务管理器)端步骤类似,只不过不需要在⽅法添加@GlobalTransactional注解,针对我们⼯程 lagou_bussiness 是事务的发起者,所以是 TM 端,其它⼯程为 RM 端,所以我们只需要在 lagou_common_db 完成前 4 步即可。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--SCA -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>
<!--seata版本管理, ⽤于锁定⾼版本的seata -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
</dependencyManagement>
<!--seata依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<!--排除低版本seata依赖-->
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--添加⾼版本seata依赖-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.3.0</version>
</dependency>
在每个模块的resources目录下引入 Seata 事务 Client 客户端的 registry.conf 文件。又因为注册中心采用的直连模式,所以还需要引入 file.conf 。
这两个文件可以参考资料包。如下图:
新增每个模块的事务组,在每个项目的 application.properties 中添加以下信息:
spring.cloud.alibaba.seata.txServiceGroup=seata-xa
logging.level.io.seata=debug
logging.level.io.seata.core.rpc=warn
每个模块兼容 Oracle 数据库,所以需要修改数据源的 Url 连接。
修改每个模块的 url 连接。下面是一个例子,注意端口号:
spring.datasource.url=jdbc:oracle:thin:@localhost:5237 spring.datasource.driverClassName=dm.jdbc.driver.DmDriver
spring.datasource.username=SYSDBA
spring.datasource.password=SYSDBA
seata 事务框架的 AT模式需要操作数据源,所以我们把数据源对象代理给 seata 框架。
@Configuration
public class StorageDataSourceConfiguration {
/**
* 使⽤druid连接池
*
* @return
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}
/**
* 设置数据源代理-,完成分⽀事务注册/事务提交与回滚等操作
*
* @param druidDataSource
* @return
*/
@Primary //设置⾸选数据源对象
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,
scanBasePackages = "com.dameng")
public class StorageXAApplication {
public static void main(String[] args) {
SpringApplication.run(StorageXAApplication.class, args);
}
}
package com.dameng.rm.datasource.util;
import com.ali baba.drui d.util.JdbcUtils;
import com.ali baba.drui d.util.MySqlutils;
import com.alibaba.druid.util.PGUtils;
import io.seata.rm.BaseDataSourceResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import java.lang.reflect.Constructor;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
public class XAUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(XAUtils.class);
public static String getDbType(Stri ng jdbcUrl, String driverClassName) {
return JdbcUtils.getDbType(jdbcUrl, driverClassName);
}
public static XAConnection createXAConnection(Connection physicalConn, BaseDataSourceResource dataSourceResource) throws SQLException {
return createXAConnection(physicalConn, dataSourceResource.getDriver(), dataSourceResource.getDbType());
}
public static XAConnection createXAConnection(Connection physicalConn, Driver driver, String dbType) throws SQLException {
if (JdbcUtils.ORACLE.equals(dbType)) {
try {
// https://www.github.com/alibaba/druid/issues/3707
// before Druid issue fixed, just make ORACLE XA connection in my way.
// return OracleUtils.OracleXAConnection(physicalConn);
String physicalConnClassName =physicalConn.getClass().getName();
if ("oracle.jdbc.driver.T4CConnection".equals(physi calConnClassName)) {
return createOracleXAConnection(physicalConn, "oracle.jdbc.driver.T4CXAConnection");
}
else
{
return createOracleXAConnection(physicalConn, "oracle.jdbc.xa.client.OracleXAConnection");
} } catch (XAException xae) { throw new SQLException("create xaConnection error", xae);
} } i f (JdbcUtils.DM.equals(dbType)) { try {
// String physicalConnClassName =
physicalConn.getClass().getName(); // if
("dm.jdbc.driver.DmdbConnection".equals(physicalConnClassName)) { // return createDMXAConnection(physi calConn,
"dm.jdbc.driver.DmdbConnection"); // } else {
// return createDMXAConnection(physi calconn,
"dm.jdbc.driver.DmdbXAConnection"); // }
return createDMXAConnection(physi calConn, "dm.jdbc.driver.DmdbXAConnection");
} catch (XAException xae) { throw new SQLException("create xaConnection error", xae);
} }
if (JdbcUtils.MYSQL.equals(dbType) || JdbcUtils.MARIADB.equals(dbType)) {
return MySqlUtils.createXAConnection(driver, physi calConn);
}
if (JdbcUtils.POSTGRESQL.equals(dbType)) {
return PGUtils.c reateXAConnecti on
(
physi cal Conn
)
;
} throw new SQLException("xa not support dbType: " + dbType);
} private static XAConnection createOracleXAConnection(Connection physi calConnecti on, Stri ng xaConnecti onClassName) throws XAException, SQLException { try { Class xaConnecti onClass = Class.forName(xaConnectionClassName);
Constructor <XAConnection> constructor = xaConnecti onClass.getConstructor(Connection.class);
constructor.setAccessible(true);
return constructor.newInstance(physi calConnection);
} catch (
Exception
e) { LOGGER.wa rn("Failed to create Oracle XA Connection " + xaConnectionClassName + " on " + physicalConnection);
if (e instanceof XAException) { throw (XAException) e;
}
else
{ throw new SQLException(e);
} } } private stati c XAConnection createDMXAConnecti on
(
Connecti on physicalConnection, String xaConnectionClassName
)
throws XAException, SQLException { try { Class xaConnectionClass = Class.forName(xaConnectionClassName);
Constructor <XAConnection> constructor = xaConnectionClass.getConstructor(Connection.class);
constructor.setAccessible(true);
return constructor.newInstance(physicalConnection);
} catch (
Exception
e) { LOGGER.wa rn("Failed to create DM XA Connection " + xaConnectionClassName + " on " + physicalConnection);
if (e instanceof XAException) { throw (XAException) e;
}
else
{ throw new SQLException(e);
}
}
}
}
在启动类上使用 @ComponentScan 注解,使其项目启动加载时使用我们本机修改的类。
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,
scanBasePackages = "com.dameng")
@ComponentScan(excludeFilters = {
@ComponentScan.Filter(type =
FilterType.ASSIGNABLE_TYPE, classes = {
XAUtils.class})})
public class StorageXAApplication {
public static void main(String[] args) {
SpringApplication.run(StorageXAApplication.class, args);
}
}
Business 为 Seata 事务的 TM,所以在方法上添加 @GlobalTransactional 注解。
@GlobalTransactional(name = "sale", timeoutMills = 100000, rollbackFor = Exception.class)
public void execWork(String USER_ID, String Storage_Code, Integer orderCount) {
//记录本地事务
int update = jdbcTemplate.update("insert into BIZ_LOG(id_) values(?)", UUID.randomUUID().toString());
//扣减商品库存
String storageResult = storageFeignClient.consumeStorage(Storage_Code, orderCount);
if (FAIL.equals(storageResult)) {
throw new RuntimeException("商品报错回滚...");
}
//扣减订单库存
String orderResult = orderFeignClient.createOrder(USER_ID, Storage_Code, orderCount);
if (FAIL.equals(orderResult)) {
throw new RuntimeException("订单报错回滚...");
}
}
进入到 seata 的 bin 目录 seata\bin 下管理员执行 seata-server.bat 文件即可。
服务启动后默认端口为 8091。
注意:
启动 com.dameng.sample.BusinessXAApplication 服务。
启动 com.dameng.sample.StorageXAApplication 服务。
启动 com.dameng.sample.OrderXAApplication 服务。
启动 com.dameng.sample.AccountXAApplication 服务。
项目启动后,查看 seata 服务端日志,检查服务是否已经注册到 Seata 服务中。
测试成功
在浏览器中输入 http://localhost:8084/execWork?orderCount=1,查看库存,订单,金额是否正常。
测试回滚
修改 order 模块中 service 代码如图,使代码报异常。
资料包中 seata-xa-final.zip 为整合以后的包。
java.lang.IllegalArgumentException: endpoint format should like ip:port
at
io.seata.discovery.registry.FileRegistryServiceImpl.lookup(FileRegistryServiceIm pl.java:95) ~[seata-all-1.3.0.jar:1.3.0]
at
io.seata.core.rpc.netty.NettyClientChannelManager.getAvailServerList(NettyClient ChannelManager.java:217) ~[seata-all-1.3.0.jar:1.3.0]
at
io.seata.core.rpc.netty.NettyClientChannelManager.reconnect(NettyClientChannelMa nager.java:162) ~[seata-all-1.3.0.jar:1.3.0]
at
io.seata.core.rpc.netty.RmNettyRemotingClient.registerResource(RmNettyRemotingCl ient.java:181) [seata-all-1.3.0.jar:1.3.0]
at
io.seata.rm.AbstractResourceManager.registerResource(AbstractResourceManager.jav a:121) [seata-all-1.3.0.jar:1.3.0]
at
io.seata.rm.datasource.DataSourceManager.registerResource(DataSourceManager.java :146) [seata-all-1.3.0.jar:1.3.0]
解决办法:
解决办法:
-- 查询blob字段的值
select utl_raw.cast_to_varchar2(dbms_lob.substr(ROLLBACK_INFO)) from "SYSDBA"."UNDO_LOG";
文章
阅读量
获赞