SpringCloud - 分布式事务四(多数据源事务)
# 项目搭建
我搭建的是一个最基础的微服务框架,因为后面需要测试微服务下的分布式事务。这里我们先测试的是 多数据源 下的分布式事务。我们的测试方案是:
采购开始 -> 减库存,得到所需花费金额 -> 用户减去所需花费金额 -> 创建订单
# 项目结构如下:
|-- demo
|-- entity 实体对象(为了让其他服务拥有所有服务对象)
|-- order 订单 (pom导入了 entity )
|-- stock 库存 (pom导入了 entity )
|-- user 用户 (pom导入了 entity )
2
3
4
5
这里使用了 mybatis-plus 的相关技术,不懂得请自行百度,数据源配置如下:
spring:
main:
allow-bean-definition-overriding: true
autoconfigure:
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
datasource:
druid:
stat-view-servlet:
enabled: true
url-pattern: "/druid/*"
allow: 127.0.0.1
deny: 192.168.1.73
reset-enable: true
login-username: admin
login-password: admin@2020
web-stat-filter:
enabled: true
url-pattern: "/*"
exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
dynamic:
druid:
filters: stat,wall
initial-size: 10
min-idle: 10
maxActive: 200
maxWait: 10000
useUnfairLock: true
validation-query: 'select 1'
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
primary: user
datasource:
user:
url: jdbc:mysql://0.0.0.0:3306/user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
order:
url: jdbc:mysql://0.0.0.0:3306/order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
stock:
url: jdbc:mysql://0.0.0.0:3306/stock?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 表结构创建
所建的表都不在同一个库下,分为 order (表为 orders)、user、stock 三个库,各自的库建各自的表,每个库都要建一个相同的 undo_log 表,如下:
/*表: orders*/--------------
/*列信息*/-----------
Field Type Collation Null Key Default Extra Privileges Comment
------ -------- --------- ------ ------ ------- ------ ------------------------------- ---------
id int (NULL) NO PRI (NULL) select,insert,update,references
uid int (NULL) YES (NULL) select,insert,update,references
count int (NULL) YES (NULL) select,insert,update,references
time datetime (NULL) YES (NULL) select,insert,update,references
money int (NULL) YES (NULL) select,insert,update,references
2
3
4
5
6
7
8
9
/*表: stock*/--------------
/*列信息*/-----------
Field Type Collation Null Key Default Extra Privileges Comment
------ ------ --------- ------ ------ ------- ------ ------------------------------- ---------
id int (NULL) NO PRI (NULL) select,insert,update,references
num int (NULL) YES (NULL) select,insert,update,references
price int (NULL) YES (NULL) select,insert,update,references
2
3
4
5
6
7
/*表: account*/----------------
/*列信息*/-----------
Field Type Collation Null Key Default Extra Privileges Comment
------ ------ --------- ------ ------ ------- -------------- ------------------------------- ---------
id int (NULL) NO PRI (NULL) auto_increment select,insert,update,references
amount int (NULL) YES (NULL) select,insert,update,references
2
3
4
5
6
表数据如下:
stock: id = 1,num = 500,price = 5
account: id = 1,amount = 2000
2
undo_log 表结构 (opens new window)
# 编写过程
我把所有的接口和实现均写在 user 服务下进行测试,如下图:
# AccountServer
public interface AccountServer {
/**
* 从胡账户中指支出
**/
void debit(Integer uid, Integer money);
}
2
3
4
5
6
7
@Service
public class AccountServerImpl implements AccountServer {
@Autowired
private AccountMapper accountMapper;
@DS("user")
@Override
public void debit(Integer uid, Integer money) {
Account account = accountMapper.selectById(uid);
account.setAmount(account.getAmount() - money);
accountMapper.updateById(account);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# StockServer
public interface StockServer {
/**
* 扣除库存数量,返回金额
**/
Integer deduct(Integer stockId, Integer count);
}
2
3
4
5
6
@Service
public class StockServerImpl implements StockServer {
@Autowired
private StockMapper stockMapper;
// 库存这里需要乐观锁,但是这里我就不做了
@DS("stock")
@Override
public Integer deduct(Integer stockId, Integer count) {
Stock stock = stockMapper.selectById(stockId);
stock.setNum(stock.getNum() - count);
stockMapper.updateById(stock);
return stock.getPrice().intValue() * count.intValue();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# OrderServer
public interface OrderServer {
/**
* 创建订单
**/
void create(Integer uid, Integer stockId, Integer count,Integer money);
}
2
3
4
5
6
7
@Service
public class OrderServerImpl implements OrderServer {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountServer accountServer;
@DS("order")
@Override
public void create(Integer uid, Integer stockId, Integer count,Integer money) {
accountServer.debit(uid,money);
Order order = new Order();
order.setId(uid);
order.setTime(new Date());
order.setCount(count);
order.setMoney(money);
orderMapper.insert(order);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# BusinessService
public interface BusinessService {
/**
* 采购
**/
void purchase(Integer uid, Integer stockId, Integer count);
}
2
3
4
5
6
@Service
public class BusinessServiceImpl implements BusinessService {
@Autowired
private StockServer stockServer;
@Autowired
private OrderServer orderServer;
@Override
public void purchase(Integer uid, Integer stockId, Integer count) {
Integer money = stockServer.deduct(stockId, count);
orderServer.create(uid, stockId, count,money);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# UserApplication
@SpringBootApplication
public class UserApplication implements CommandLineRunner {
@Autowired
private BusinessService businessService;
public static void main(String[] args) {
SpringApplication.run(UserApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
/**
* 执行采购,用户id=1,库存id=1,采购数量count=3
**/
businessService.purchase(1,1,3);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 无 seata 事务处理测试
# 测试加 @Transactional 是否有效
@Transactional(rollbackFor = Exception.class)
@Override
public void purchase(Integer uid, Integer stockId, Integer count) {
Integer money = stockServer.deduct(stockId, count);
orderServer.create(uid, stockId, count,money);
}
2
3
4
5
6
测试结果:
java.sql.SQLSyntaxErrorException: Table 'user.stock' doesn't exist;
什么原因呢?其实就是 @Transactional 的事务传播策略默认为 Propagation.REQUIRED,如果当前没有事务,就新建一个事务,如果已经存在一个事务,就加入到这个事务中。也就是说不同之间的服务调用使用的是同一个库的事务,所以他就查同一个库下的这张表。
避免这种情况可以在子服务的方法上加 @Transactional (propagation = Propagation.REQUIRES_NEW),新建事务,如果当前存在事务,把当前事务挂起。意思就是,A 调 B 的过程中,A 方法用的是 A 库的事务,B 方法用的是 B 库的事务,相互独立不受影响。
代码修改如下:
// 库存这里需要乐观锁,但是这里我就不做了
@DS("stock")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Integer deduct(Integer stockId, Integer count) {
Stock stock = stockMapper.selectById(stockId);
stock.setNum(stock.getNum() - count);
stockMapper.updateById(stock);
return stock.getPrice().intValue() * count.intValue();
}
2
3
4
5
6
7
8
9
10
@DS("order")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void create(Integer uid, Integer stockId, Integer count,Integer money) {
accountServer.debit(uid,money);
Order order = new Order();
order.setUid(uid);
order.setTime(new Date());
order.setCount(count);
order.setMoney(money);
if(1==1){
throw new RuntimeException("下单异常");
}
orderMapper.insert(order);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@DS("user")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void debit(Integer uid, Integer money) {
Account account = accountMapper.selectById(uid);
account.setAmount(account.getAmount() - money);
accountMapper.updateById(account);
}
2
3
4
5
6
7
8
# 测试下单异常
@Service
public class OrderServerImpl implements OrderServer {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountServer accountServer;
@DS("order")
@Override
public void create(Integer uid, Integer stockId, Integer count,Integer money) {
accountServer.debit(uid,money);
Order order = new Order();
order.setUid(uid);
order.setTime(new Date());
order.setCount(count);
order.setMoney(money);
if(1==1){
throw new RuntimeException("下单异常");
}
orderMapper.insert(order);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
测试结果:
stock:id=1,num=497,price=5
account:id=1,amount=1985
order:
2
3
用户账户扣除,库存扣除,下单失败。
也就是说 @Transactional 处理不了分布式事务,只能处理同一个库的事务。
# 添加 Seata 分布式事务
# 添加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.3.0</version>
</dependency>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 修改配置
server:
port: 8082
spring:
main:
allow-bean-definition-overriding: true
autoconfigure:
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
datasource:
druid:
stat-view-servlet:
enabled: true
url-pattern: "/druid/*"
allow: 127.0.0.1
deny: 192.168.1.73
reset-enable: true
login-username: admin
login-password: admin@2020
web-stat-filter:
enabled: true
url-pattern: "/*"
exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
dynamic:
druid:
filters: stat,wall
initial-size: 30
min-idle: 20
maxActive: 200
maxWait: 10000
useUnfairLock: true
validation-query: 'select 1'
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
primary: user
# 启用严格模式
strict: true
# 开启分布式事务
seata: true
# 事务模式 为AT
seata-mode: AT
datasource:
user:
url: jdbc:mysql://10.240.30.100:3306/user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
# 建表脚本,启动时会运行
# schema: classpath:db/schema-account.sql
order:
url: jdbc:mysql://10.240.30.100:3306/order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
stock:
url: jdbc:mysql://10.240.30.100:3306/stock?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: dev
password: mysql@dev.2020
# 事务配置
seata:
enabled: true
# 启用自动代理数据源
enable-auto-data-source-proxy: false
# 随便起个名字,但最好与服务名称一致
application-id: ${spring.application.name}
# 此处的名称一定要与 service.vgroupMapping 下配置的参数保持一致
tx-service-group: my_test_tx_group
# 目的是从nacos 获取配置信息
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
username: nacos
password: nacos@root@2020
namespace:
group: SEATA_GROUP
# registry 目的是从nacos找 seata-server 服务
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
username: nacos
password: nacos@root@2020
namespace:
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
seata-server 端的 config 和 registry 是注册中心 和 配置中心。
client 配置的 registry 是从 naocs 所在的注册中心获取 seata-server 所在的集群或服务,用来连接 seata-serve,config 是从 naocs 所在 config,获取配置。
关于 nacos 和 seata-server 的配置请看 分布式事务 Seata (三) Seata 搭建 (opens new window)
# 添加全局事务
@Transactional
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public void purchase(Integer uid, Integer stockId, Integer count) {
Integer money = stockServer.deduct(stockId, count);
orderServer.create(uid, stockId, count,money);
}
2
3
4
5
6
7
# 测试
stock:id=1,num=500,price=5
account:id=1,amount=2000
order:
2
3
证明多数据源事务处理成功。