springCloud — 高级篇(3)

本系列笔记涉及到的代码在GitHub上,地址:https://github.com/zsllsz/cloud

本文涉及知识点:

  • 分布式事务解决方案之Alibaba seata;

一、分布式事务问题

打个比方,我们在淘宝下单买一件商品,可能涉及到了三个微服务,分别是订单服务,库存服务,支付服务。这三个服务连接的是三个不同的数据库,但是下单的这一个过程对外是表现出一个整体。比如下单成功,然后扣库存也成功了,最后支付这一步失败了,那么库存系统和订单系统都应该回滚这一次操作。同一个数据库用事务就可以回滚了,不同数据库,那就要用分布式事务了。即每个数据库内部的数据一致性由本地事务来保证,全局数据一致性就由分布式事务来保证。


欢迎大家关注我的公众号 javawebkf,目前正在慢慢地将简书文章搬到公众号,以后简书和公众号文章将同步更新,且简书上的付费文章在公众号上将免费。


二、springCloud Alibaba Seata简介

1、是什么?
seata就是用来解决分布式事务的。官网地址:http://seata.io/zh-cn/

2、seata相关术语:
分布式事务处理过程的1个id + 3个组件 模型:1个id就是指全局唯一的事务id(transaction id);3个组件指的是:

  • Transaction Coordinator(TC):事务协调者,说白了就是你服务器上安装的seata。维护全局和分支事务的状态,驱动全局事务提交或回滚。

  • Transaction Manager(TM):事务管理者,说白了就是你加了@GlobalTransactional注解的那个方法。定义全局事务的范围,负责开启一个全局事务,并最终发起全局事务提交或者回滚的决议。

  • Resource Manager(RM):资源管理器,说白了就是本次涉及到的数据库。管理分支事务的资源,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

3、seata处理分布式事务的过程:

seata分布式事务处理过程
  • TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID;
  • XID在微服务调用链路的上下文中传播;
  • RM向TC注册分支事务,将其纳入XID对用的全局事务的管辖;
  • TM向TC发起针对XID的全局提交或回滚决议;
  • TC调度XID下管辖的全部分支事务完成提交或回滚请求。

简单地说,整个过程就是用一个XID关联起来的,比如下订单的过程是一个整体过程,需要用分布式事务,那么订单系统、库存系统和支付系统就会被同一个XID管着,表明它们是一个整体。每个系统就是一个RM,每个系统自己的事务由本地事务完成,每个系统的操作提交还是回滚都会告诉TM,TM再把结果告诉最终该提交还是回滚告诉TC去执行。

3、怎么玩?

  • 去哪儿下:https://github.com/seata/seata/tags
  • 怎么用:本地加@Transactional注解,全局加@GlobalTrasactional注解就完事了(先有个映像,编码实战部分再看具体用法)
  • 安装:官网下载后,解压
  • 在seata/config目录下,有一个nacos-config.txt,打开它,关注文件中的
service.vgroup_mapping.my_test_tx_group=default

my_test_tx_group就是组名,等下在file.conf和项目的application.yml中都要用到的。

  • 修改conf目录下的file.conf,主要改的是自定义事务组名称、事务日志存储模式改为db、数据库连接信息,如下:

这一段是修改事务组名称,即修改了service块的第一行,注意第一行要跟nacos-config.txt中的那一行对应,说白了就是将nacos-config.txt中的那一行拷贝过来去掉service.,等于号后面的值用引号引起来就可以了。还有就是default.grouplist后面的ip和端口,就是你seata启动的ip和端口。

service {
  #vgroup->rgroup
  vgroup_mapping.my_test_tx_group = "default"
  #only support single node
  default.grouplist = "192.168.0.106:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

store块,存储模式由file改为db。

store {
  ## store mode: file、db
  mode = "db"
  ……
}

db块中配置自己的数据库连接信息。

db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://192.168.0.106:3306/seata"
    user = "root"
    password = "zsl"
    min-conn = 1
    max-conn = 3
    global.table = "global_table"
    branch.table = "branch_table"
    lock-table = "lock_table"
    query-limit = 100
  }
  • 新建数据库seata;
  • 在seata里新建表,建表的sql在conf目录下,名为db_store.sql,在seata库执行就好了;
  • 修改conf目录下的registry.conf,指明注册中心为nacos,配置nacos的连接信息,如下:
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    serverAddr = "192.168.0.106:8848"
    namespace = ""
    cluster = "default"
  }
  • 启动nacos;
  • 初始化seata的nacos配置:进入seata/conf目录,执行:
sh nacos-config.sh 192.168.0.106

这个ip就是你nacos所在的服务器IP。

  • 启动seata-server,直接执行seata/bin目录执行:
sh seata-server.sh -p 8091 -m db

-p是端口,-m是存储模式,我们配置了db存储,所以这里用db。

最后日志打印出如下日志表明启动成功:

load RegistryProvider[Nacos] extension by class[io.seata.discovery.registry.nacos.NacosRegistryProvider]

三、实战之数据库的准备

创建三个微服务,调用链路为 下订单 ---> 扣库存 ---> 减余额。
1、创建数据库:

  • seata_order:存储订单信息的数据库
  • seata_storage:存储库存信息的数据库
  • seata_account:存储账户信息的数据库
    建库的sql如下:
create database seata_order;
create database seata_storage;
create database seata_account;

2、建立业务数据表:

  • seata_order库下新建t_order表:
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  `count` int(11) DEFAULT NULL COMMENT '数量',
  `money` decimal(11, 0) DEFAULT NULL COMMENT '金额',
  `status` int(1) DEFAULT NULL COMMENT '订单状态:  0:创建中 1:已完结',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '订单表' ROW_FORMAT = Dynamic;
  • seata_storage库下新建t_storage表:
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  `total` int(11) DEFAULT NULL COMMENT '总库存',
  `used` int(11) DEFAULT NULL COMMENT '已用库存',
  `residue` int(11) DEFAULT NULL COMMENT '剩余库存',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '库存' ROW_FORMAT = Dynamic;
INSERT INTO `t_storage` VALUES (1, 1, 100, 0, 100);
  • seata_account库下新建t_account表:
CREATE TABLE `t_account`  (
  `id` bigint(11) NOT NULL COMMENT 'id',
  `user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
  `total` decimal(10, 0) DEFAULT NULL COMMENT '总额度',
  `used` decimal(10, 0) DEFAULT NULL COMMENT '已用余额',
  `residue` decimal(10, 0) DEFAULT NULL COMMENT '剩余可用额度',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '账户表' ROW_FORMAT = Dynamic;
 
INSERT INTO `t_account` VALUES (1, 1, 1000, 0, 1000);

3、新建事务回滚日志表:
上面新建的三个数据库都需要新建各自的回滚日志表。在三个业务数据库中都执行seata-server-0.9.0/seata/conf/目录下的db_undo_log.sql即可。

四、实战之业务代码的编写

业务需求:下订单 ---> 减库存 ---> 扣余额 ---> 改订单状态。
1、新建订单模块seata-order-service2001:

  • pom.xml:主要是nacos、seata、openfeign和数据库连接那一套。
 

    com.alibaba.cloud
    spring-cloud-starter-alibaba-nacos-discovery




    com.alibaba.cloud
    spring-cloud-starter-alibaba-seata
    
        
            io.seata
            seata-all
        
    


    io.seata
    seata-all
    0.9.0




    org.springframework.cloud
    spring-cloud-starter-openfeign


    org.springframework.boot
    spring-boot-starter-web


    org.mybatis.spring.boot
    mybatis-spring-boot-starter



    com.alibaba
    druid-spring-boot-starter
    1.1.22


    mysql
    mysql-connector-java



    org.springframework.boot
    spring-boot-starter-jdbc


    org.springframework.boot
    spring-boot-starter-actuator


    org.projectlombok
    lombok
    true


    org.springframework.boot
    spring-boot-starter-test
    test

  • application.yml(主要是要注意tx-service-group的值要与nacos-config.txt和file.conf中的对应):
server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    alibaba:
      seata:
        # 自定义事务组名称需要与seata-server中的对应
        tx-service-group: my_test_tx_group
    nacos:
      discovery:
        server-addr: 192.168.0.106:8848
  datasource:
    # 当前数据源操作类型
    type: com.alibaba.druid.pool.DruidDataSource
    # mysql驱动类
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.0.106:3306/seata_order?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
    username: root
    password: zsl
feign:
  hystrix:
    enabled: false
logging:
  level:
    io:
      seata: info

mybatis:
  mapper-locations: classpath:mapper/*.xml
  • seata配置文件:将seata/conf下的file.conf和registry.conf拷贝到application.yml的同级目录下。
  • CommonResult.java:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult {
    
    private Integer code;
    private String message;
    private T data;
}
  • Order.java:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    
    private Long id;
    private Long userId;
    private Long productId;
    private Integer count;
    private BigDecimal money;
    private Integer status; // 0:创建中, 1:已完结

}
  • OrderDao.java:
@Mapper
public interface OrderDao {
    /** 创建订单 */
    public void create(Order order);
    /** 修改订单状态 */
    public void update(@Param("userId") Long userId, @Param("status") Integer status);
}
  • OrderMapper.xml:



  
   
      insert into t_order(user_id, product_id, count, money, status) 
      values(#{userId}, #{productId}, #{count}, #{money}, 0);
   
   
   
      update t_order set status = 1 where user_id = #{userId} and status = #{status};
   
  

  • OrderService.java:
public interface OrderService {
    /** 创建订单 */
    public void create(Order order);
}
  • OrderServiceImpl.java:
@Service
@Slf4j
public class OrderServiceImpl implements OrderService{
    
    @Autowired
    private OrderDao orderDao;
    @Autowired
    private StorageService storageService;
    @Autowired
    private AccountService accountService;

    @Override
    public void create(Order order) {
        log.info("================= 新建订单start ==============");
        orderDao.create(order);
        log.info("================= 新建订单end ==============");
        
        log.info("================= 订单微服务调用库存微服务扣减库存start ==============");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("================= 订单微服务调用库存微服务扣减库存end ==============");
        
        log.info("================= 订单微服务调用账户微服务做扣减余额start ==============");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("================= 订单微服务调用账户微服务做扣减余额end ==============");
        
        log.info("================= 修改订单状态start ==============");
        orderDao.update(order.getUserId(), 0);
        log.info("================= 修改订单状态end ==============");
        
        log.info("================= 下单完成✔ ==============");
        
    }
}
  • StorageService.java:
@FeignClient(value = "seata-storage-service")
public interface StorageService {

    /** 扣减库存 */
    @PostMapping("/storage/decrease")
    public CommonResult decrease(@RequestParam("productId") Long id, @RequestParam("count") Integer count);
}
  • AccountService.java:
@FeignClient(value = "seata-account-service")
public interface AccountService {

    /** 扣余额 */
    @PostMapping("/account/decrease")
    public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
  • OrderController.java:
@RestController
@RequestMapping("/order")
public class OrderController {
    
    @Autowired
    private OrderService orderService;
    
    @GetMapping("/create")
    public CommonResult create(Order order) {
        orderService.create(order);
        return new CommonResult<>(200, "订单创建成功", null);
    }
}
  • DataSourceProxyConfig.java:
package com.zhusl.springcloud.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;


/**
 * 使用seata对数据源进行代理
 * @author zhu
 *
 */
@Configuration
public class DataSourceProxyConfig {

    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }
    
    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }
    
    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception{
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

}
  • 主启动类:
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) // 取消数据源的自动创建,用自己配置的
@EnableDiscoveryClient
@EnableFeignClients
@MapperScan({"com.zhusl.springcloud.dao"})
public class App {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(App.class, args);
    }
}

准备完成,现在依次启动nacos、seata和2001这个项目,最后项目控制台打印出如下日志并且在nacos中有seata和2001这两个服务表明启动成功。
启动成功的日志:

2020-06-05 11:29:12.393  INFO 74764 --- [           main] com.zhusl.springcloud.App                : Started App in 7.465 seconds (JVM running for 7.931)
2020-06-05 11:29:12.676  INFO 74764 --- [imeoutChecker_1] i.s.c.r.netty.NettyClientChannelManager  : will connect to 192.168.2.43:8091
2020-06-05 11:29:12.676  INFO 74764 --- [imeoutChecker_1] i.s.core.rpc.netty.NettyPoolableFactory  : NettyPool create channel to transactionRole:TMROLE,address:192.168.2.43:8091,msg:< RegisterTMRequest{applicationId='seata-order-service', transactionServiceGroup='my_test_tx_group'} >
2020-06-05 11:29:12.751  INFO 74764 --- [imeoutChecker_1] i.s.core.rpc.netty.NettyPoolableFactory  : register success, cost 71 ms, version:0.9.0,role:TMROLE,channel:[id: 0x9fe21753, L:/192.168.2.36:65186 - R:/192.168.2.43:8091]

在配置启动过程中遇到了很多问题,大家可以去官网寻找解决方案。遇事不要慌,官网来帮忙。
https://github.com/seata/seata-samples

2、新建名为seata-storage-service2002的库存module:

  • pom.xml:和2001订单module的一模一样;
  • application.yml:端口改成2002,微服务名称改成seata-storage-service,连接的数据库改成seata_storage,其他的都和2001的一样;
  • 把file.conf和registry.conf复制粘贴到application.yml的同级目录下;
  • Storage.java:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Storage {

    private Long id;
    /** 产品id */
    private Long productId;
    /** 总库存 */
    private Integer total;
    /** 已用库存 */
    private Integer used;
    /** 剩余库存 */
    private Integer residue;
}
  • StorageDao.java:
@Mapper
public interface StorageDao {
    /** 扣减库存 */
    void decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
  • StorageMapper.xml:



   
      update t_storage set used = used + #{count}, residue = residue - #{count} where product_id = #{productId};
   

  • StorageServiceImpl.java:
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
    @Autowired
    private StorageDao storageDao;
    @Override
    public void decrease(Long productId, Integer count) {
        log.info("============== storageService 扣减库存 start =============");
        storageDao.decrease(productId, count);
        log.info("============== storageService 扣减库存 end =============");
    }
}
  • StorageController.java:
@RestController
@RequestMapping("/storage")
public class StorageController {

    @Autowired
    private StorageService storageService;
    
    @PostMapping("/decrease")
    public CommonResult decrease(Long productId, Integer count) {
        storageService.decrease(productId, count);
        return new CommonResult<>(200, "扣减库存成功!", null);
    }
}
  • 最后数据源的配置和主启动类和都和2001的一样,复制粘贴即可。

3、新建名为seata-account-service2003的账户module:

  • pom.xml:和2001的一模一样;
  • application.yml:端口改为2003,服务名改成seata-account-service,连接的数据库改成seata_account;
  • 复制粘贴file.conf和registry.conf到application.yml的同级目录;
  • Account.java:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Account {
    private Long id;
    /** 用户id */
    private Long userId;
    /** 总额度 */
    private BigDecimal total;
    /** 已用额度 */
    private BigDecimal used;
    /** 剩余额度 */
    private BigDecimal residue;
}
  • AccountDao.java:
@Mapper
public interface AccountDao {
    /** 扣减余额 */
    void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
  • AccountMapper.xml:



   
      update t_account set residue = residue - #{money},used = used + #{money} where user_id = #{userId};
   

  • AccountServiceImpl.java:
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountDao accountDao;
    @Override
    public void decrease(Long userId, BigDecimal money) {
        log.info("================ account-service 扣减余额 start ===============");
        accountDao.decrease(userId, money);
        log.info("================ account-service 扣减余额 end ===============");
    }
}
  • AccountController.java:
@RestController
@RequestMapping("/account")
public class AccountController {
    @Autowired
    private AccountService accountService;
    @PostMapping("/decrease")
    public CommonResult decrease(Long userId, BigDecimal money) {
        accountService.decrease(userId, money);
        return new CommonResult<>(200, "扣减余额成功!", null);
    }
}
  • 最后别忘记主启动类和数据源配置类。

4、测试:
3个module建完,先测试一下能否成功运行起来,先启动nacos,再启动seata,然后依次启动3个module。下面是3张表的初始情况:

数据库初始状态

现在模拟正常下单:
http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
访问之后,可能出现两种情况:

  • 返回成功信息,数据库成功的创建了一条订单,account和storage也成功的扣除了对应的数量。
  • openfeign报错,read timeout,成功创建了订单,但是account没有扣减。

如果出现第二种情况,那也充分说明了目前这三个操作没有在一个事务里。如果你想不报错,不想让openfeign超时,加上在application.yml中加上如下配置即可:

ribbon:
  ReadTimeout: 10000 #10秒应该就不会超时了
  ConnectTimeout: 10000

接下来我们在account的service里让线程睡11秒钟,虽然刚才openfeign设置了超时时间10秒,但是现在睡11秒,肯定还是会异常的。然后在OrderServiceImpl类上加上全局事务注解:

@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)

name随意,不冲突就好,rollbackFor表示什么情况下回滚,这里的意思是报异常了就回滚。

配置好之后,重新启动account和order这两个微服务,最后再次访问下订单的链接。就会发现报超时异常了,但是三个数据库的三张表都没有数据变化,即使全部都回滚了,这就表明分布式事务起作用了。

五、关于seata的其他说明

seata官网上说它支持AT、TCC、SAGA 和 XA 事务模式,我们用的是默认的AT模式。

1、AT模式如何做到对业务无侵入的?

  • AT模式的前提:基于支持ACID事务的关系型数据库,通过JDBC访问数据库的java应用;
  • 整体机制:两阶段提交协议。
  • 一阶段:seata会拦截业务sql,找到业务要更新的数据,在被更新之前,将其保存为before image;执行业务sql,更新业务数据;在业务数据更新之后,将其保存为after image,最后生成行锁。一阶段的操作都在一个数据库事务内完成,保证了一阶段操作的原子性。这就类似spring的aop思想,前置通知和后置通知。
  • 二阶段提交:如果顺利,二阶段就进行提交。因为一阶段已经执行过业务sql了,所以这里只需要将一阶段保存的before image、after image和行锁删除即可。
  • 二阶段回滚:如果出异常了需要回滚,通过一阶段的回滚日志进行反向补偿。首先会比较当前库中数据和after image是否一致,如果一致,那么就将数据库中的数据还原成before image;如果不一致,说明数据出现过脏写,需要人工处理。

版权声明:
作者:感冒的梵高
链接:https://www.techfm.club/p/714.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>