admin管理员组文章数量:1571764
Springcloud
一、介绍
Spring Cloud 是一系列框架的有序集合,可以快速构建分布式系统中的一些常见模式(例如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话,集群状态等等)
官网:https://spring.io/projects/spring-cloud
二、技术选型
Springcloud采用英国伦敦地铁车站命名,由英文字母A-Z依次迭代
官网:https://docs.spring.io/spring-cloud/docs/Hoxton.SR12/reference/html/
中文文档:https://www.bookstack/read/spring-cloud-docs/docs-index.md
Springcloud和Springboot版本选择
查看:https://start.spring.io/actuator/info
选择Springcloud Hoxton.SR12、Springboot 2.2.2.RELEASE(2.2.x ~ 2.3.x都行)、springcloud alibaba
2.1.1.RELEASE
微服务组件:
三、搭建Springcloud项目
创建Maven父工程:
设置字符编码:
配置注解激活生效:
选择java编译器1.8:
文件过滤(选做):
父工程用来管理依赖版本:
pom.xml:
<groupId>com.qingsongxyz</groupId>
<artifactId>SpringCloud</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 在父级工程或聚合工程中使用,做jar包的版本控制 -->
<packaging>pom</packaging>
<!-- 统一管理jar包 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mavenpiler.source>1.8</mavenpiler.source>
<mavenpiler.target>1.8</mavenpiler.target>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.16.18</lombok.version>
<mysql.version>8.0.28</mysql.version>
<druid.version>1.2.8</druid.version>
<mybatis-plus.version>3.4.3.4</mybatis-plus.version>
</properties>
<!-- 子模块继承之后,锁定版本,子模块不用写版本号 在此只是声明依赖版本 并不引入 -->
<dependencyManagement>
<dependencies>
<!-- spring boot版本 2.2.2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.2.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- spring cloud版本 Hoxton.SR1 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- spring cloud alibaba版本 2.1.1.RELEASE -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<!-- mybatis-plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<!-- log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<addResources>true</addResources>
</configuration>
</plugin>
</plugins>
</build>
dependencyManagement作用:
让所有在子项目中引用的依赖不用显示声明版本号,导入依赖时会沿着父子层次向上查找,直到找到一个拥
有dependencyManagement元素的父工程,使用其中指定的版本号
执行clean、install将父工程发布到Maven仓库方便子工程继承
四、RestTemplate使用
1.建表
create table payment
(
id bigint auto_increment comment '主键id' primary key,
serial varchar(50) not null comment '系列'
)Engine=InnoDB default charset=utf8;
2.热部署Devtools
修改项目代码后,自动编译重新运行
导入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
父工程添加插件:
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<addResources>true</addResources>
</configuration>
</plugin>
</plugins>
</build>
开启自动构建:
注册表快捷键Ctrl + Shift + Alt + /
:
自动启动make:
**注意:**最后重启idea
3.构建支付子模块
创建cloud-provider-payment8001 Springboot项目:
如果连接下载Springboot模版很慢,可以换成阿里云地址:
导入依赖:
<project xmlns="http://maven.apache/POM/4.0.0" xmlns:xsi="http://www.w3/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache/POM/4.0.0 https://maven.apache/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- 继承父项目 -->
<parent>
<groupId>com.qingsongxyz</groupId>
<artifactId>SpringCloud</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<!-- 可以不用写groupId和version -->
<artifactId>cloud-provider-payment8001</artifactId>
<name>cloud-provider-payment8001</name>
<description>cloud-provider-payment8001</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml:
server:
port: 8001
spring:
application:
name: cloud-payment-service # 微服务名称
datasource:
username: root
password: 1234
url: jdbc:mysql://localhost:3306/db_cloud?useUnicode=true&characterEncoding=UTF-8&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
druid:
aop-patterns: com.qingsongxyz.* # 配置Spring监控
filters: 'stat,wall'
stat-view-servlet:
enabled: true # 打开监控统计功能
login-username: admin
login-password: admin
reset-enable: true
web-stat-filter:
enabled: true # Web关联监控配置
filter:
stat:
enabled: true # 开启sql监控
wall:
enabled: true # 开启防火墙
db-type: mysql
config:
delete-allow: false
drop-table-allow: false
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
map-underscore-to-camel-case: false
实体类:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Payment {
private Long id;
private String serial;
}
结果封装类:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult {
private Integer code;
private String message;
private Object data;
public static CommonResult ok(){
return new CommonResult(200, null, null);
}
public static CommonResult ok(String message){
return new CommonResult(200, message, null);
}
public static CommonResult ok(String message, Object data){
return new CommonResult(200, message, data);
}
public static CommonResult failure(int code, String message){
return new CommonResult(code, message, null);
}
}
Mapper:
@Mapper
public interface PaymentMapper extends BaseMapper<Payment> {
}
Service:
public interface PaymentService extends IService<Payment> {
}
@Service
public class PaymentServiceImpl extends ServiceImpl<PaymentMapper, Payment> implements PaymentService {
}
Controller:
@RestController
@Slf4j
public class PaymentController {
@Autowired
private PaymentService paymentService;
//接受的参数需要添加@RequestBody注解,订单模块控制器标注@RestController,传递过来的是json格式字符串,需要解析为对应的Pojo类
@PostMapping("/payment")
public CommonResult create(@RequestBody Payment payment){
boolean success = paymentService.save(payment);
log.info("插入结果:" + payment);
if(success)
{
return CommonResult.ok("插入数据成功!", payment);
}
return CommonResult.failure(500, "插入数据失败!!!");
}
@GetMapping("/payment/get/{id}")
public CommonResult getPayment(@PathVariable("id") Long id){
Payment payment = paymentService.getById(id);
log.info("查询id:" + id);
if(!ObjectUtils.isEmpty(payment))
{
return CommonResult.ok("查询数据成功!", payment);
}
return CommonResult.failure(500, "查询数据失败!!!");
}
}
4.构建订单子模块
创建cloud-consumer-order8000 Springboot项目:
导入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache/POM/4.0.0" xmlns:xsi="http://www.w3/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache/POM/4.0.0 https://maven.apache/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- 继承父项目 -->
<parent>
<groupId>com.qingsongxyz</groupId>
<artifactId>SpringCloud</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>cloud-consumer-order8000</artifactId>
<name>cloud-consumer-order8000</name>
<description>cloud-consumer-order8000</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml:
server:
port: 8000
spring:
application:
name: cloud-order-service
实体类和结果封装类和支付模块相同
向容器中注入RestTemplate对象:
@Configuration
public class ApplicationContextConfig {
@Bean
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
Controller:
@RestController
@Slf4j
public class OrderController {
@Autowired
private RestTemplate restTemplate;
private static final String PAYMENT_URI = "http://localhost:8001";
@GetMapping("/consumer/payment")
public CommonResult create(Payment payment){
log.info("消费者插入支付信息:" + payment);
return restTemplate.postForObject(PAYMENT_URI + "/payment", payment, CommonResult.class);
}
@GetMapping("/consumer/payment/{id}")
public CommonResult get(@PathVariable("id") Long id){
log.info("消费者查询支付id:" + id);
return restTemplate.getForObject(PAYMENT_URI + "/payment/get/" + id, CommonResult.class);
}
@GetMapping("/consumer/payment/getForEntity")
public CommonResult create2(Payment payment){
log.info("消费者插入支付信息:" + payment);
ResponseEntity<CommonResult> entity = restTemplate.postForEntity(PAYMENT_URI + "/payment", payment, CommonResult.class);
if(entity.getStatusCode().is2xxSuccessful())
{
return entity.getBody();
}
return CommonResult.failure(500, "插入支付信息失败!!!");
}
@GetMapping("/consumer/payment/getForEntity/{id}")
public CommonResult get2(@PathVariable("id") Long id){
log.info("消费者查询支付id:" + id);
ResponseEntity<CommonResult> entity = restTemplate.getForEntity(PAYMENT_URI + "/payment/get/" + id, CommonResult.class);
if(entity.getStatusCode().is2xxSuccessful())
{
return entity.getBody();
}
return CommonResult.failure(500, "通过支付id查询失败!!!");
}
}
测试:
RestTemplate:
RestTemplate文档:https://docs.spring.io/spring-framework/docs/current/javadoc-api/
getForObject:发送get远程调用,获取响应体
postForObject:发送post远程调用,获取响应体
getForEntity:发送get远程调用,获取响应对象(包括响应头、响应码、响应体)
postForEntity:发送post远程调用,获取响应对象(包括响应头、响应码、响应体)
5.项目重构
将各个子模块中使用的公共实体类提取出来
新建子模块cloud-api-commons
项目结构:
导入公共实体类
导入公共依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
maven install打包到仓库:
修改支付、订单子模块:
删除公共实体类:
导入公共子模块依赖:
<!-- 导入实体类模块 -->
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
五、注册中心
1.Eureka
在传统的RPC远程调用框架中,管理每个服务与服务之间依赖关系比较复杂,管理比较复杂,需要使用服务
治理,管理服务之间的依赖关系,实现服务调用、负载均衡、容错、服务注册与发现
Eureka采用CS架构,Eureka Server作为服务注册功能的服务器,它是服务注册中心,而系统中的其他微服
务用Eureka Client连接到服务器并维持心跳连接,继而可以使用Eureka Server来监控系统中各个微服务是
否正常运行(任何RPC框架中都会有一个注册中心)
1)搭建单节点Eureka Server
创建子模块cloud-eureka-server7001
导入依赖:
<!-- Eureka Server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yaml:
server:
port: 7001
spring:
application:
name: cloud-eureka7001-server
eureka:
instance:
hostname: eureka7001 # eureka服务端实例
client:
register-with-eureka: false # 不向注册中心注册自己
fetch-registry: false # 表示自己是注册中心
service-url:
defaultZone: http://#{eureka.instance.hostname}:${server.port}/eureka # Eureka地址
启动类开启Eureka Server:
@SpringBootApplication
@EnableEurekaServer //开启Eureka客户端
public class CloudEurekaServer7001Application {
public static void main(String[] args) {
SpringApplication.run(CloudEurekaServer7001Application.class, args);
}
}
启动项目cloud-eureka-server7001,访问http://localhost:7001/
2)注册订单、支付子模块
两个子模块添加依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
两个子模块启动类开启EurekaClient:
@SpringBootApplication
@EnableEurekaServer //开启Eureka客户端
public class CloudEurekaServer7001Application {
public static void main(String[] args) {
SpringApplication.run(CloudEurekaServer7001Application.class, args);
}
}
@SpringBootApplication
@EnableEurekaClient //开启Eureka客户端
public class CloudProviderPayment8001Application {
public static void main(String[] args) {
SpringApplication.run(CloudProviderPayment8001Application.class, args);
}
}
两个子模块application.yaml添加配置:
# 订单模块
eureka:
instance:
instance-id: order8000 # 显示微服务名称
prefer-ip-address: true # 访问路径显示ip
client:
register-with-eureka: true # 表示将自己注册进eureka server
fetch-registry: true # 是否从 eureka server抓取已有的注册信息,单节点默认为true,集群必须设置为true才能配合ribbon使用负载均衡
service-url:
defaultZone: http://localhost:7001/eureka # Eureka Server地址
# 支付模块
eureka:
instance:
instance-id: payment8001 # 显示微服务名称
prefer-ip-address: true # 访问路径显示ip
client:
register-with-eureka: true # 表示将自己注册进eureka server
fetch-registry: true # 是否从 eureka server抓取已有的注册信息,单节点默认为true,集群必须设置为true才能配合ribbon使用负载均衡
service-url:
defaultZone: http://localhost:7001/eureka/ # Eureka Server地址
启动三个子模块,访问http://localhost:7001/
3)搭建Eureka集群
单节点Eureka可能发生单点故障,此时需要搭建集群保证高可用
多个Eureka节点之间相互注册,下面搭建一个两节点的集群
1.创建子模块cloud-eureka-server7002
2.导入Eureka Server依赖
3.启动类添加注解@EurekaServer
4.修改配置文件application.yaml:
server:
port: 7001
spring:
application:
name: cloud-eureka7001-server
eureka:
instance:
hostname: eureka7001 # eureka服务端实例
client:
register-with-eureka: false # 不向注册中心注册自己
fetch-registry: false # 表示自己是注册中心
service-url:
defaultZone: http://eureka7002:7002/eureka # 相互注册服务地址
server:
port: 7002
spring:
application:
name: cloud-eureka7002-server
eureka:
instance:
hostname: eureka7002 # eureka服务端实例
client:
register-with-eureka: false # 不向注册中心注册自己
fetch-registry: false # 表示自己是注册中心
service-url:
defaultZone: http://eureka7001:7001/eureka # 相互注册服务地址
5.修改hosts文件(C:\Windows\System32\drivers\etc\hosts),配置域名
127.0.0.1 eureka7001
127.0.0.1 eureka7002
启动Eureka集群,通过域名访问:
订单、支付子模块,加入Eureka集群:
修改配置文件application.yaml:
defaultZone: http://eureka7001:7001/eureka,http://eureka7002:7002/eureka #集群地址
4)搭建支付模块集群
支付模块(相当于服务提供者)可能发生单点故障,此时需要搭建集群保证高可用
新建子模块cloud-provider-payment8002,和cloud-provider-payment8001子模块相同
两个支付模块修改业务类:
@RestController
@Slf4j
public class PaymentController {
@Autowired
private PaymentService paymentService;
// 注入服务端口号
@Value("${server.port}")
private String serverPort;
// 两个支付方法 结果添加端口号 测试两个支付服务轮询响应
@PostMapping("/payment")
public CommonResult create(@RequestBody Payment payment){
boolean success = paymentService.save(payment);
log.info("插入结果:" + payment);
if(success)
{
return CommonResult.ok("插入数据成功! serverPort:" + serverPort, payment);
}
return CommonResult.failure(500, "插入数据失败!!!");
}
@GetMapping("/payment/get/{id}")
public CommonResult getPayment(@PathVariable("id") Long id){
Payment payment = paymentService.getById(id);
log.info("查询id:" + id);
if(!ObjectUtils.isEmpty(payment))
{
return CommonResult.ok("查询数据成功! serverPort:" + serverPort, payment);
}
return CommonResult.failure(500, "查询数据失败!!!");
}
}
订单模块修改地址:
@RestController
@Slf4j
public class OrderController {
@Autowired
private RestTemplate restTemplate;
//远程地址写死了 永远为8001端口支付模块响应
//private static final String PAYMENT_URI = "http://localhost:8001";
//配置远程地址为服务名称(Eureka注册的服务名称)
private static final String PAYMENT_URI = "http://CLOUD-PAYMENT-SERVICE";
@GetMapping("/consumer/payment")
public CommonResult create(Payment payment){
log.info("消费者插入支付信息:" + payment);
return restTemplate.postForObject(PAYMENT_URI + "/payment", payment, CommonResult.class);
}
@GetMapping("/consumer/payment/{id}")
public CommonResult get(@PathVariable("id") Long id){
log.info("消费者查询支付id:" + id);
return restTemplate.getForObject(PAYMENT_URI + "/payment/get/" + id, CommonResult.class);
}
}
修改RestTemplate配置:
@Configuration
public class ApplicationContextConfig {
@Bean
@LoadBalanced //添加负载均衡
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
启动Eureka集群、订单子模块、两个支付子模块,多次测试支付接口:
8001、8002支付服务轮询响应
5)服务发现
对于注册进Eureka里面的微服务,可以通过服务发现来获得服务信息
支付子模块控制器添加方法:
@Autowired
private DiscoveryClient discoveryClient;
@GetMapping("/payment/discovery")
public Object discovery()
{
//获取所有注册的微服务
List<String> services = discoveryClient.getServices();
for (String service : services) {
//获取每个微服务名称
log.info("微服务名称:" + service);
for (ServiceInstance instance : discoveryClient.getInstances(service)) {
//获取微服务每个实例的信息
log.info("服务详细信息:" + instance.getServiceId() + "\t" + instance.getHost() + "\t" + instance.getPort() + "\t" + instance.getUri());
}
}
return this.discoveryClient;
}
主启动类开启Discovery Client:
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient //开启服务发现
public class CloudProviderPayment8001Application {
public static void main(String[] args) {
SpringApplication.run(CloudProviderPayment8001Application.class, args);
}
}
开启Eureka集群、支付模块8001,访问http://localhost:8001/payment/discovery
6)自我保护模式
Eureka Server在一定时间(默认90s)内没有接受到某个微服务的心跳,Eureka将会注销该实例。但是有时候
微服务实例是健康的,由于网路分区故障导致和Eureka Server无法正常通信,此时就需要自我保护模式。
用于一组Eureka Client和Eureka Server之间存在网络分区场景下的保护。一旦进入保护模式,Eureka
Server将会保护其中注册的微服务信息,即使微服务不可用也不会删除
禁用自我保护模式,修改cloud-eureka-server7001子模块application.yaml:
eureka:
instance:
hostname: eureka7001
client:
register-with-eureka: false
fetch-registry: false
service-url:
defaultZone: http://eureka7002:7002/eureka
server:
enable-self-preservation: false # 关闭自我保护机制(接受不到微服务的心跳信息后直接删除)
eviction-interval-timer-in-ms: 2000 # 2s没有收到信息,将微服务实例删除
支付8001子模块(Eureka Client)配置:
eureka:
instance:
instance-id: payment8001
prefer-ip-address: true
lease-renewal-interval-in-seconds: 1 # Eureka客户端向服务器发送心跳的时间间隔(默认为30s)
lease-expiration-duration-in-seconds: 2 # Eureka服务器在收到最后一次心跳的等待时间(默认为90s),超时会删除服务实例
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
测试:
2.Consul
1)介绍
Consul是一套分布式服务发现和配置管理系统,包括服务治理、配置中心、服务总线等,由go语言开发
官网:https://www.consul.io/
下载Console:https://www.consul.io/downloads
中文文档:https://www.springcloud/spring-cloud-consul.html
下载解压后只有一个exe文件:
运行访问:
2)注册订单、支付子模块
创建子模块cloud-providerconsul-payment8006
导入依赖:
<!-- consul依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yaml:
server:
port: 8006
spring:
cloud:
consul:
host: localhost # consul服务器ip
port: 8500 # consul服务器端口号
discovery:
service-name: ${spring.application.name} # 服务名称
主启动类:
@SpringBootApplication
@EnableDiscoveryClient //开启服务发现
public class CloudProviderconsulPayment8006Application {
public static void main(String[] args) {
SpringApplication.run(CloudProviderconsulPayment8006Application.class, args);
}
}
业务类:
@RestController
@Slf4j
public class PaymentController {
@Value("${server.port}")
private String serverPort;
@GetMapping("/payment/consul")
public String paymentConsul(){
return "springCloud with consul: " + serverPort + "\t" + IdUtil.simpleUUID();
}
}
创建子模块cloud-consumer-consul-order8000
导入相同依赖
application.yaml配置相同
server:
port: 8000
spring:
cloud:
consul:
host: localhost
port: 8500
discovery:
service-name: ${spring.application.name}
主启动类添加@EnableDiscoveryClient注解
配置RestTemplate:
@Configuration
public class ApplicationContextConfig {
@Bean
@LoadBalanced //负载均衡
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
业务类:
@RestController
@Slf4j
public class OrderController {
@Autowired
private RestTemplate restTemplate;
//服务名称
private static final String PAYMENT_URI = "http://consul-provider-payment";
@GetMapping("/consumer/payment/consul")
public String create(){
return restTemplate.getForObject(PAYMENT_URI + "/payment/consul", String.class);
}
}
启动两个新的子模块测试:
3.Nacos
1)介绍
服务注册中心 + 配置中心(Nacos = Eureka + Config + Bus)
官网:https://nacos.io/zh-cn/
下载地址:https://github/alibaba/nacos/releases
2)安装
版本说明:https://github/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E
由于需要和springcloud alibaba版本兼容,在此下载nacos1.1.4版本,地址:https://github/alibaba/naco
s/releases/tag/1.1.4
解压后进入bin目录下,startup.cmd直接运行即可
浏览器访问8848端口,/nacos路径:
用户名密码均为nacos登录:
3)注册支付子模块
创建支付子模块cloud-alibaba-provider-payment9001
导入依赖:
<!-- nacos依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件application.yaml:
server:
port: 9001
spring:
application:
name: nacos-payment-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
management:
endpoints:
web:
exposure:
include: '*'
主启动类:
@SpringBootApplication
@EnableDiscoveryClient
public class CloudAlibabaProviderPayment9001Application {
public static void main(String[] args) {
SpringApplication.run(CloudAlibabaProviderPayment9001Application.class, args);
}
}
业务类:
@RestController
public class PaymentController {
@Value("${server.port}")
private String serverPort;
@GetMapping("/payment/nacos/{id}")
public String getPayment(@PathVariable("id") Long id){
return "nacos registry, serverPort:" + serverPort + "\t id:" + id;
}
}
运行测试:
相同操作创建支付子模块cloud-alibaba-provider-payment9002,用于测试负载均衡
4)注册订单子模块
创建支付子模块cloud-alibaba-consumer-order9000:
导入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件application.yaml:
server:
port: 9000
spring:
application:
name: nacos-order-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
management:
endpoints:
web:
exposure:
include: '*'
# 消费者将要访问的微服务名称
service-url:
nacos-user-server: http://nacos-payment-service
主启动类:
@SpringBootApplication
@EnableDiscoveryClient
public class CloudAlibabaConsumerOrder9000Application {
public static void main(String[] args) {
SpringApplication.run(CloudAlibabaConsumerOrder9000Application.class, args);
}
}
配置负载均衡:
@Configuration
public class ApplicationContextConfig {
@Bean
@LoadBalanced //负载均衡
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
业务类:
@RestController
@Slf4j
public class OrderController {
@Resource
private RestTemplate restTemplate;
@Value("${service-url.nacos-user-server}")
private String serverURL;
@GetMapping("/consumer/payment/nacos/{id}")
public String paymentInfo(@PathVariable("id") Long id){
return restTemplate.getForObject(serverURL + "/payment/nacos/" + id, String.class);
}
}
运行三个子模块,测试:
4.CAP
C:一致性(Consistency)
A:可用性(Available)
P:分区容错性(Partition tolerance)
CAP理论关注的是数据,一个分布式系统最多只能同时满足这三项中的两项
CA - 单点集群,满足可用性、一致性,不易扩展
CP - 满足一致性、分区容错性,性能不是特别高
AP - 满足可用性,分区容错性,对一致性要求低一些
Eureka和Consul对比
注册中心 | 开发语言 | CAP | 暴露接口 |
---|---|---|---|
Eureka | Java | AP | HTTP |
Consul | Go | CP | HTTP/DNS |
Nacos | Java | AP/CP | HTTP/DNS/UDP |
注意:
Nacos支持AP/CP模式切换,命令:
curl -X PUT 'Nacos IP:8848/nacos/v1/ns/operator/switches?entry=serverMode&value=CP'
六、服务调用
1.Ribbon
1)介绍
Ribbon是负载均衡和RestTemplate调用的结合
官网:https://github/Netflix/ribbon/wiki/Getting-Started
负载均衡(Load Balance):
将用户请求平摊的分配到多个服务中,从而达到系统的高可用,常见的负载均衡软件Nginx、LVS、硬件V5等
Ribbon和Nginx负载均衡区别:
-
Niginx是服务器负载均衡,客户端所有请求都会交给Nginx,由Nginx实现转发
-
Ribbon是本地进程内负载均衡,在调用微服务接口时,会在注册中心获取注册服务信息缓存在本地JVM中,然后在本地实现RPC远程调用
2)负载规则
订单子模块调用支付子模块实现负载均衡,导入了Eureka Clitent依赖,其中自动引入ribbon组件
负载规则和IRule接口相关,根据特定算法选取一个服务实例
IRule接口类图:
规则 | 描述 |
---|---|
RoundRobinRule | 轮询(默认) |
RandomRule | 随机 |
RetryRule | 先按照RoundRobinRule(轮询)策略获取服务,如果获取服务失败则在指定的时间内进行重试 |
WeightResponseTimeRule | 对RoundRobinRule的扩展,响应速度越快的实例选择权重越大 |
BestAvailableRule | 会过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发小的服务 |
AvailabilityFilteringRule | 先过滤掉故障实例,再选择并发较小的服务 |
ZoneAvoidanceRule | 默认规则,复合判断服务器所在区域的服务器的可用性进行选择 |
更改默认轮询规则:
定义规则配置类,不能放在**@ComponentScan所扫描的包及其子包下**,否则定义的配置类会被所有Ribbon
客户端共享,达不到特殊化定制目的
@Configuration
public class MyRule {
@Bean
public IRule myselfRule(){
return new RandomRule(); //更改为随机访问
}
}
修改主启动类:
@SpringBootApplication
@EnableEurekaClient
//替换默认的负载规则(轮询),MyRule类需要放在springboot启动类所在包的外部
@RibbonClient(name = "CLOUD-PAYMENT-SERVICE", configuration = MyRule.class)
public class CloudConsumerOrder80Application {
public static void main(String[] args) {
SpringApplication.run(CloudConsumerOrder80Application.class, args);
}
}
启动测试:
3)源码分析
轮询负载均衡算法本质:
请求数 % 服务器集群总数 = 实际调用服务器位置的下标(每次服务重启后从1开启计数)
//规则接口
public interface IRule {
//根据负载均衡算法选择响应的服务
Server choose(Object var1);
void setLoadBalancer(ILoadBalancer var1);
ILoadBalancer getLoadBalancer();
}
//轮询规则
public class RoundRobinRule extends AbstractLoadBalancerRule {
//原子整型类
private AtomicInteger nextServerCyclicCounter;
private static final boolean AVAILABLE_ONLY_SERVERS = true;
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
public RoundRobinRule() {
this.nextServerCyclicCounter = new AtomicInteger(0);
}
public RoundRobinRule(ILoadBalancer lb) {
this();
this.setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
} else {
Server server = null;
int count = 0;
while(true) {
if (server == null && count++ < 10) {
//获取健康能响应的服务
List<Server> reachableServers = lb.getReachableServers();
//获取所有的服务
List<Server> allServers = lb.getAllServers();
//记录健康能响应的服务数量
int upCount = reachableServers.size();
//记录所有的服务总数
int serverCount = allServers.size();
if (upCount != 0 && serverCount != 0) {
//获取响应服务的下标
int nextServerIndex = this.incrementAndGetModulo(serverCount);
server = (Server)allServers.get(nextServerIndex);
if (server == null) {
Thread.yield();
} else {
if (server.isAlive() && server.isReadyToServe()) {
return server;
}
server = null;
}
continue;
}
log.warn("No up servers available from load balancer: " + lb);
return null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: " + lb);
}
return server;
}
}
}
//modulo为服务总数量
private int incrementAndGetModulo(int modulo) {
int current;
int next;
//自旋锁 保证并发安全
do {
//依次递增
current = this.nextServerCyclicCounter.get();
next = (current + 1) % modulo;
} while(!this.nextServerCyclicCounter.compareAndSet(current, next));
return next;
}
......
}
4)使用自定义轮询算法
8000端口订单子模块中
自定义LoadBalancer接口:
public interface LoadBalancer {
//传入所有服务集合,返回选择的服务
ServiceInstance instance(List<ServiceInstance> serviceInstances);
}
实现类MyLoadBalancer:
@Component
@Slf4j
public class MyLoadBalancer implements LoadBalancer {
private AtomicInteger atomicInteger = new AtomicInteger(0);
public final int getAndIncrement(){
int current, next;
//自旋锁
do
{
current = atomicInteger.get();
//Integer.MAX_VALUE 2147483637
next = current >= 2147483637 ? 0 : current + 1;
} while (!this.atomicInteger.compareAndSet(current, next));
log.info("访问次数:" + next);
return next;
}
@Override
public ServiceInstance instance(List<ServiceInstance> serviceInstances) {
int index = getAndIncrement() % serviceInstances.size();
return serviceInstances.get(index);
}
}
添加控制器方法:
@Autowired
private RestTemplate restTemplate;
@Resource
private LoadBalancer balancer;
@Autowired
private DiscoveryClient discoveryClient;
@GetMapping("/consumer/payment/lb")
public String getPaymentLoadBalance()
{
//通过服务发现获取所有支付服务集合
List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE");
if(ObjectUtils.isEmpty(instances))
{
return null;
}
//调用自定义轮询算法,传入所有支付服务集合,选择出一个支付服务
ServiceInstance instance = balancer.instance(instances);
URI uri = instance.getUri();
log.info("轮询的uri:" + uri);
return restTemplate.getForObject(uri + "/payment/lb", String.class);
}
两个支付子模块控制器添加方法:
@GetMapping("/payment/lb")
public String getPaymentLoadBalance(){
return serverPort;
}
测试:
2.OpenFeign
1)介绍
OpenFeign是一个声明式的web服务客户端,简化服务客户端开发,只需创建一个接口并在接口上添加注解
官网:https://www.springcloud/spring-cloud-greenwich.html#_spring_cloud_openfeign
2)远程调用服务
创建新模块cloud-consumer-feign-order8000
导入依赖:
<!-- 导入OpenFeign依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yaml:
server:
port: 8000
eureka:
client:
register-with-eureka: false
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka
主启动类:
@SpringBootApplication
@EnableFeignClients //开启Feign客户端
public class CloudConsumerFeignOrder8000Application {
public static void main(String[] args) {
SpringApplication.run(CloudConsumerFeignOrder8000Application.class, args);
}
}
业务类:
@Component
@FeignClient(value = "CLOUD-PAYMENT-SERVICE") //配置支付微服务名称,调用接口方法访问支付服务相应方法
public interface PaymentService {
@GetMapping("/payment/get/{id}")
CommonResult getPayment(@PathVariable("id") Long id);
}
控制器:
@RestController
@Slf4j
public class OrderFeignController {
@Autowired
private PaymentService paymentService;
@GetMapping("/consumer/payment/{id}")
public CommonResult getPaymentById(@PathVariable("id") Long id)
{
log.info("feign id:" + id);
return paymentService.getPayment(id);
}
}
测试:
OpenFeign默认引入Ribbon有负载均衡功能
3)超时控制
8001支付子模块,添加控制器方法:
@Value("${server.port}")
private String serverPort;
@GetMapping("/payment/timeout")
public String timeout(){
//延时3s 模拟耗时任务
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return serverPort;
}
订单子模块添加Feign接口方法:
@Component
@FeignClient(value = "CLOUD-PAYMENT-SERVICE")
public interface PaymentService {
@GetMapping("/payment/get/{id}")
CommonResult getPayment(@PathVariable("id") Long id);
//调用支付服务耗时任务
@GetMapping("/payment/timeout")
String timeout();
}
添加控制器方法:
@RestController
@Slf4j
public class OrderFeignController {
@Autowired
private PaymentService paymentService;
@GetMapping("/consumer/payment/{id}")
public CommonResult getPaymentById(@PathVariable("id") Long id)
{
log.info("feign id:" + id);
return paymentService.getPayment(id);
}
//OpenFeign默认只等待服务1s 超时报错
@GetMapping("/consumer/payment/timeout")
public String timeout(){
return paymentService.timeout();
}
}
测试:
解决方法:
添加application.yaml超时配置
# 配置Feign和ribbon的超时时间都可以,但是如果两个同时配置Feign的优先级会更高
feign:
client:
config:
default:
ReadTimeout: 5000 # 设置建立连接使用时间
ConnectTimeout: 5000 # 设置建立连接后连接服务读取可用资源时间
ribbon:
ReadTimeout: 5000 # 设置建立连接使用时间
ConnectTimeout: 5000 # 设置建立连接后连接服务读取可用资源时间
测试:
4)配置OpenFeign日志
日志级别 | 说明 |
---|---|
NONE | 不显示日志(默认) |
BASIC | 只记录请求方法、URL、响应状态码、执行时间 |
HEADERS | 除了BASIC之外还包括请求头、响应头信息 |
FULL | 除了HEADERS之外还包括请求体、响应体及原数据 |
添加日志配置类:
@Configuration
public class FeignConfig {
@Bean
public Logger.Level feignLoggerLevel(){
return Logger.Level.FULL; //设置Feign日志级别 最详细
}
}
添加application.yaml配置:
logging:
level:
com.qingsongxyz.service.PaymentService: debug
测试打印日志信息:
七、服务降级
1.Hystrix
1)介绍
服务雪崩:
多个微服务之间调用,一个调用另外几个,依次调用,称为扇出,如果其中某个微服务调用时间过长或者不
可用,对于该服务的调用会占用越来越多的系统资源,进而引起系统崩溃
Hystrix是处理分布式系统延迟、容错的开源库,保证某一个微服务故障,不会导致整体服务失败,产生级联
故障,提高分布式系统的弹性。相当于一个断路器,当发生故障时向调用方返回一个符合预期、可处理的备
选响应(FallBack),而不是长时间等待或者抛出异常
官网:https://github/Netflix/Hystrix/wiki/How-To-Use
基本概念:
服务降级(FallBack):
当系统不可用时,向客户端返回一个友好提示,不让其一直等待
产生情况:
- 程序产生异常
- 超时
- 服务熔断触发服务降级
- 线程池/信号量打满
服务熔断(Break):达到最大访问后,直接拒绝访问,然后调用服务降级的方法并返回友好提示
服务限流(FlowLimit):秒杀高并发场景,请求排队,有序进行访问
2)搭建Hystrix服务
创建子模块cloud-provider-hystrix-payment8001
导入依赖:
<!-- hystrix依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yaml:
server:
port: 8001
spring:
application:
name: cloud-payment-hystrix-service
eureka:
instance:
instance-id: hystrixPayment8001
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
主启动类:
@SpringBootApplication
@EnableEurekaClient
public class CloudProviderHystrixPayment8001Application {
public static void main(String[] args) {
SpringApplication.run(CloudProviderHystrixPayment8001Application.class, args);
}
}
Service:
public interface PaymentService {
String paymentInfo_ok(Long id);
String paymentInfo_Timeout(Long id);
}
ServiceImpl:
@Service
public class PaymentServiceImpl extends ServiceImpl<PaymentMapper, Payment> implements PaymentService {
@Override
public String paymentInfo_ok(Long id) {
return "线程:" + Thread.currentThread().getName() + "paymentInfo_OK, id:" + id;
}
@Override
public String paymentInfo_Timeout(Long id) {
try {
//休眠3s 模拟耗时任务
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "线程:" + Thread.currentThread().getName() + "paymentInfo_timeout, id:" + id;
}
}
控制器:
@RestController
@Slf4j
public class PaymentController {
@Autowired
private PaymentService paymentService;
@Value("${server.port}")
private String serverPort;
@GetMapping("/payment/hystrix/ok/{id}")
public String paymentInfo_ok(@PathVariable("id") Long id) {
String result = paymentService.paymentInfo_ok(id);
log.info("result:" + result);
return result;
}
@GetMapping("/payment/hystrix/timeout/{id}")
public String paymentInfo_Timeout (@PathVariable("id") Long id){
String result = paymentService.paymentInfo_Timeout(id);
log.info("result:" + result);
return result;
}
}
启动测试:
高并发场景下出现卡顿:
使用JMemter进行压力测试
测试:
访问paymentInfo_ok方法出现卡顿
创建子模块cloud-consumer-feign-hystrix-order8000
导入相同依赖,加上OpenFeign
application.yaml:
server:
port: 8000
eureka:
client:
register-with-eureka: false
fetch-registry: tru
service-url:
defaultZone: http://localhost:7001/eureka
主启动类:
@SpringBootApplication
@EnableFeignClients
public class CloudConsumerFeignHystrixOrder8000Application {
public static void main(String[] args) {
SpringApplication.run(CloudConsumerFeignHystrixOrder8000Application.class, args);
}
}
Feign接口:
@Component
@FeignClient(value = "CLOUD-PAYMENT-HYSTRIX-SERVICE")
public interface PaymentService {
@GetMapping("/payment/hystrix/ok/{id}")
String paymentInfo_ok(@PathVariable("id") Long id);
@GetMapping("/payment/hystrix/timeout/{id}")
String paymentInfo_Timeout(@PathVariable("id") Long id);
}
控制器:
@RestController
@Slf4j
public class OrderController {
@Autowired
private PaymentService paymentService;
@Value("${server.port}")
private String serverPort;
@GetMapping("/consumer/payment/hystrix/ok/{id}")
public String paymentInfo_ok(@PathVariable("id") Long id){
String result = paymentService.paymentInfo_ok(id);
log.info("result:" + result);
return result;
}
@GetMapping("/consumer/payment/hystrix/timeout/{id}")
public String paymentInfo_Timeout(@PathVariable("id") Long id){
String result = paymentService.paymentInfo_Timeout(id);
log.info("result:" + result);
return result;
}
}
在压力测试下,访问订单服务接口远程调用支付服务:
3)支付模块配置服务降级
支付模块设置自身最大时长,超时后做服务降级
修改ServiceImpl:
@Service
public class PaymentServiceImpl extends ServiceImpl<PaymentMapper, Payment> implements PaymentService {
@Override
public String paymentInfo_ok(Long id) {
return "线程:" + Thread.currentThread().getName() + "paymentInfo_OK, id:" + id;
}
@Override
//异常或不可用时 进行服务降级 调用fallbackMethod
@HystrixCommand(fallbackMethod = "paymentInfo_TimeoutHandler", commandProperties = {
//正常访问时间在3s以内 超时 服务降级 调用fallbackMethod
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "3000")
})
public String paymentInfo_Timeout(Long id) {
//int i = 10 / 0; //抛出异常 进行服务降级
try {
//休眠5s 模拟耗时任务
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "线程:" + Thread.currentThread().getName() + "paymentInfo_timeout, id:" + id;
}
public String paymentInfo_TimeoutHandler(Long id){
return "线程:" + Thread.currentThread().getName() + "paymentInfo_timeout, id:" + id + "\t" + "系统繁忙,请稍后再试!!!";
}
}
主启动类添加@EnableHystrix注解:
@SpringBootApplication
@EnableEurekaClient
@EnableHystrix //开启Hystrix
public class CloudProviderHystrixPayment8001Application {
public static void main(String[] args) {
SpringApplication.run(CloudProviderHystrixPayment8001Application.class, args);
}
}
测试:
超时或者抛出异常都会进行服务降级
4)订单模块配置服务降级
修改application.yaml:
spring:
application:
name: cloud-order-hystrix-service
eureka:
instance:
instance-id: hystrixOrder8000
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
feign:
hystrix:
enabled: true # 支持hystrix
主启动类添加@EnableHystrix注解:
@SpringBootApplication
@EnableFeignClients
@EnableHystrix //开启Hystrix
public class CloudConsumerFeignHystrixOrder8000Application {
public static void main(String[] args) {
SpringApplication.run(CloudConsumerFeignHystrixOrder8000Application.class, args);
}
}
修改业务类:
@RestController
@Slf4j
public class OrderController {
@Autowired
private PaymentService paymentService;
@Value("${server.port}")
private String serverPort;
@GetMapping("/consumer/payment/hystrix/ok/{id}")
public String paymentInfo_ok(@PathVariable("id") Long id){
String result = paymentService.paymentInfo_ok(id);
log.info("result:" + result);
return result;
}
@GetMapping("/consumer/payment/hystrix/timeout/{id}")
//异常或不可用时 进行服务降级 调用fallbackMethod
@HystrixCommand(fallbackMethod = "paymentInfo_TimeoutHandler", commandProperties = {
//正常访问时间在1.5s以内 超时 服务降级 调用fallbackMethod
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1500")
})
public String paymentInfo_Timeout(@PathVariable("id") Long id){
//int i = 10 / 0; //抛出异常 进行服务降级
String result = paymentService.paymentInfo_Timeout(id);
log.info("result:" + result);
return result;
}
public String paymentInfo_TimeoutHandler(@PathVariable("id") Long id){
return "支付系统繁忙,请稍后再试!!!";
}
}
测试:
订单服务超时或者抛出异常导致服务降级
5)全局服务降级
每个方法都配置独有的FallBack方法,代码太臃肿,耦合度太高,需要配置全局FallBack方法
修改控制器:
没有指定fallbackMethod的方法服务降级调用全局FallBack方法
@RestController
@Slf4j
@DefaultProperties(defaultFallback = "globalHandler") //声明统一服务降级处理
public class OrderController {
@Autowired
private PaymentService paymentService;
@Value("${server.port}")
private String serverPort;
@GetMapping("/consumer/payment/hystrix/ok/{id}")
@HystrixCommand //需要添加服务降级注解
public String paymentInfo_ok(@PathVariable("id") Long id){
int i = 10 / 0; //测试全局服务降级
String result = paymentService.paymentInfo_ok(id);
log.info("result:" + result);
return result;
}
@GetMapping("/consumer/payment/hystrix/timeout/{id}")
//异常或不可用时 进行服务降级 调用fallbackMethod
@HystrixCommand(fallbackMethod = "paymentInfo_TimeoutHandler", commandProperties = {
//正常访问时间在1.5s以内 超时 服务降级 调用fallbackMethod
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1500")
})
public String paymentInfo_Timeout(@PathVariable("id") Long id){
//int i = 10 / 0; //抛出异常 进行服务降级
String result = paymentService.paymentInfo_Timeout(id);
log.info("result:" + result);
return result;
}
public String paymentInfo_TimeoutHandler(){
return "支付系统繁忙,请稍后再试!!!";
}
//全局FallBack处理
public String globalHandler(){
return "Global FallBack:请稍后再试!!!";
}
}
测试:
Feign接口配置服务降级,进行解耦,控制器中只有业务方法,没有FallBack方法
修改Feign接口:
@Component
//fallback 配置服务降级类 当服务宕机后 调用服务降级
@FeignClient(value = "CLOUD-PAYMENT-HYSTRIX-SERVICE", fallback = PaymentFallbackServiceImpl.class)
public interface PaymentService {
@GetMapping("/payment/hystrix/ok/{id}")
String paymentInfo_ok(@PathVariable("id") Long id);
@GetMapping("/payment/hystrix/timeout/{id}")
String paymentInfo_Timeout(@PathVariable("id") Long id);
}
接口实现类:
/**
* 给每一个方法配置fallback 进行解耦
*/
@Component
public class PaymentFallbackServiceImpl implements PaymentService {
@Override
public String paymentInfo_ok(Long id) {
return "paymentInfo_ok服务繁忙,请稍后再试!!!";
}
@Override
public String paymentInfo_Timeout(Long id) {
return "paymentInfo_Timeout服务繁忙,请稍后再试!!!";
}
}
@RestController
@Slf4j
public class OrderController {
@Autowired
private PaymentService paymentService;
@Value("${server.port}")
private String serverPort;
@GetMapping("/consumer/payment/hystrix/ok/{id}")
public String paymentInfo_ok(@PathVariable("id") Long id){
String result = paymentService.paymentInfo_ok(id);
log.info("result:" + result);
return result;
}
@GetMapping("/consumer/payment/hystrix/timeout/{id}")
public String paymentInfo_Timeout(@PathVariable("id") Long id){
String result = paymentService.paymentInfo_Timeout(id);
log.info("result:" + result);
return result;
}
}
测试:
当远程调用的服务宕机后,进行服务降级
6)配置服务熔断
熔断机制是一种链路保护机制,当某个微服务不可用或响应时间太长时,会进行服务降级,熔断该节点服务
的调用,返回提示信息,当检测到该节点微服务调用响应正常后,恢复调用链路
断路器理论:https://martinfowler/bliki/CircuitBreaker.html
修改cloud-provider-hystrix-payment8001业务类:
@Service
public class PaymentServiceImpl extends ServiceImpl<PaymentMapper, Payment> implements PaymentService {
@Override
public String paymentInfo_ok(Long id) {
return "线程:" + Thread.currentThread().getName() + "paymentInfo_OK, id:" + id;
}
@Override
//异常或不可用时 进行服务降级 调用fallbackMethod
@HystrixCommand(fallbackMethod = "paymentInfo_TimeoutHandler", commandProperties = {
//正常访问时间在3s以内 超时 服务降级 调用fallbackMethod
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "3000")
})
public String paymentInfo_Timeout(Long id) {
//int i = 10 / 0; //抛出异常 进行服务降级
try {
//休眠5s 模拟耗时任务
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "线程:" + Thread.currentThread().getName() + "paymentInfo_timeout, id:" + id;
}
public String paymentInfo_TimeoutHandler(Long id){
return "线程:" + Thread.currentThread().getName() + "paymentInfo_timeout, id:" + id + "\t" + "系统繁忙,请稍后再试!!!";
}
//配置断路器 10s内请求10次 如果有60%(6次)失败 就开启断路器
@HystrixCommand(fallbackMethod = "paymentCircuitBreaker_fallback", commandProperties = {
//是否开启断路器
@HystrixProperty(name="circuitBreaker.enabled", value = "true"),
//请求次数
@HystrixProperty(name="circuitBreaker.requestVolumeThreshold", value = "10"),
//时间窗口期
@HystrixProperty(name="circuitBreaker.sleepWindowInMilliseconds", value = "10000"),
//达到60%失败率跳闸
@HystrixProperty(name="circuitBreaker.errorThresholdPercentage", value = "60")
})
public String paymentCircuitBreaker(@PathVariable("id") Long id)
{
if(id < 0)
{
throw new RuntimeException("id不能小于0!!!");
}
String serialNumber = IdUtil.simpleUUID();
return Thread.currentThread().getName() + "\t" + "调用成功,流水号:" + serialNumber;
}
//服务熔断 导致调用服务降级方法
public String paymentCircuitBreaker_fallback(@PathVariable("id") Long id)
{
return "id为负数,请稍后再试!!!";
}
}
添加控制器方法:
@GetMapping("/payment/hystrix/circuit/{id}")
public String paymentCircuitBreaker (@PathVariable("id") Long id){
String result = paymentService.paymentCircuitBreaker(id);
log.info("result:" + result);
return result;
}
断路器相关参数可以参考HystrixCommandProperties
类
测试:
多次访问http://localhost:8001/payment/hystrix/circuit/-1 (传入负值),导致断路器开启
然后访问http://localhost:8001/payment/hystrix/circuit/1 (传入正值),一段时间内拒绝访问,慢慢恢复,直到
断路器半开,请求成功,最后关闭断路器
7)Hystrix工作流程
1.Hystrix有两种配置方式,注解配置和编程式配置,都会构造出命令对象,执行第二步
2.两个命令对象最后都会调用 toObservable()方法,执行第三步
3.判断缓存中是否有响应结果,如果有直接返回,否则执行第四步
4.判断断路器是否处于开启状态,如果是支持第八步,否则执行第五步
5.判断线程池是否打满状态并设置执行第七步,如果是支持第八步,否则执行第六步
6.调用construct()或者run()方法并执行第七步,如果运行抛出异常或超时则执行第八步,否则执行第九步
7.设置断路器状态
8.进行服务降级,调用FallBack方法,如果没有配置FallBack方法或者调用失败会产生错误,否则返回提示信息
9.返回正确调用结果
8)Hystrix图形化搭建
创建子模块cloud-consumer-hystrix-dashboard9001
导入依赖:
<!-- Hystrix仪表盘依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yaml:
server:
port: 9001
主启动类添加@EnableHystrixDashboard:
@SpringBootApplication
@EnableHystrixDashboard //开启Hystrix仪表板图形化界面
public class CloudConsumerHystrixDashboard9001Application {
public static void main(String[] args) {
SpringApplication.run(CloudConsumerHystrixDashboard9001Application.class, args);
}
}
启动运行,访问http://localhost:9001/hystrix:
监控cloud-provider-hystrix-payment8001模块:
被监控的服务需要导入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
启动测试:
解决方法:
修改主启动类,注入Bean
@SpringBootApplication
@EnableEurekaClient
@EnableHystrix
public class CloudProviderHystrixPayment8001Application {
public static void main(String[] args) {
SpringApplication.run(CloudProviderHystrixPayment8001Application.class, args);
}
//显示dashboard 需要注入这个Bean
@Bean
public ServletRegistrationBean getServlet(){
HystrixMetricsStreamServlet streamServlet = new HystrixMetricsStreamServlet();
ServletRegistrationBean registrationBean = new ServletRegistrationBean(streamServlet);
registrationBean.setLoadOnStartup(1);
registrationBean.addUrlMappings("/hystrix.stream");
registrationBean.setName("HystrixMetricsStreamServlet");
return registrationBean;
}
}
多次访问配置熔断方法:
注:
不同颜色数值对应表
颜色 | 描述 |
---|---|
绿色 | 成功数 |
蓝色 | 熔断数 |
青色 | 错误请求数 |
橙色 | 超时数 |
紫色 | 线程池拒绝数 |
红色 | 失败/异常数 |
灰色 | 最近10s错误百分比 |
(点的颜色 --> 健康程度)绿色 < 黄色 < 橙色 < 红色
2.Sentinel
1)介绍
alibaba提供界面进行细粒度配置服务降级、服务熔断、服务限流
官网:https://github/alibaba/Sentinel/wiki/%E4%BB%8B%E7%BB%8D
Sentinel 分为两个部分:
- 核心库(Java 客户端)不依赖任何框架/库,能够运行于所有 Java 运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持。
- 控制台(Dashboard)基于 Spring Boot 开发,打包后可以直接运行,不需要额外的 Tomcat 等应用容器。
2)安装
下载地址:https://github/alibaba/Sentinel/releases
由于需要兼容springcloud alibaba版本,后续使用Sentinel1.7.0
java -jar运行,浏览器访问8080端口:
用户名密码均为sentinel登录:
3)监测微服务
创建子模块cloud-alibaba-sentinel-service8401
导入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 持久化 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
<!-- sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件application.yml:
server:
port: 8401
spring:
application:
name: cloud-alibaba-sentinel-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
sentinel:
transport:
dashboard: localhost:8080 # 配置sentinel dashboard地址
port: 8719 # 默认端口为 8719 如果端口被占用,会自动从8719开启往后寻找未被占用的端口
management:
endpoints:
web:
exposure:
include: '*'
主启动类:
@EnableDiscoveryClient
@SpringBootApplication
public class CloudAlibabaSentinelService8401Application {
public static void main(String[] args) {
SpringApplication.run(CloudAlibabaSentinelService8401Application.class, args);
}
}
业务类:
@Slf4j
@RestController
public class TestController {
@GetMapping("/testA")
public String testA(){
log.info(Thread.currentThread().getName() + "执行testA...");
return "testA...";
}
@GetMapping("/testB")
public String testB(){
return "testB...";
}
}
启动微服务测试:
控制台未显示微服务,因为Sentinel是采用懒加载机制,需要访问一次微服务即可显示
4)流控规则
阈值类型:QPS(默认)、线程数
阈值数:同一时刻访问QPS\线程数的最大值
流控模式:
- 直接(默认):达到限流条件直接限流
- 关联:当关联的资源达到阈值就限流自己
- 链路:记录指定链路上的流量,当从入口资源访问指定资源的流量达到阈值时就进行限流
流控效果:快速失败(默认)、Warm Up、排队等待
QPS直接快速失败
Sentinel配置/testA流控:
同一时间只能有一个请求访问该路径,超过阈值直接快速失败,显示默认提示信息
访问/testA路径:
线程数直接快速失败
同一时间只能有一个线程访问该路径,超过阈值直接快速失败,显示默认提示信息
浏览器开启两个页签同时访问该路径:
QPS关联快速失败
当关联资源/testB QPS达到超过阈值1时,就会限流/testA(eg.支付微服务达到阈值时就限流下单微服务)
测试:
使用Postman测试/testB接口
添加为Collection运行,QPS超过阈值1
同时浏览器访问/testA路径被限流
QPS链路快速失败
当从/testA路径访问test资源超过阈值1时,就会限流/testA
修改业务类:
public interface TestService {
void test();
}
@Service
public class TestServiceImpl implements TestService {
@SentinelResource(value = "test") //标记资源名称为test
@Override
public void test() {
System.out.println("test");
}
}
@Slf4j
@RestController
public class TestController {
@Autowired
private TestService testService;
@GetMapping("/testA")
public String testA(){
testService.test(); //访问test资源
return "testA...";
}
@GetMapping("/testB")
public String testB(){
testService.test(); //访问test资源
return "testB...";
}
}
从Sentinel1.6.3 版本开始,Sentinel Web filter默认收敛所有URL的入口context,因此链路限流不生效。
1.7.0 版本(springcloud alibaba的2.1.1.RELEASE),官方在CommonFilter 引入了WEB_CONTEXT_UNIFY
参数,将其配置为 false 即可进行链路限流。
由于新版1.7.0版本中CommonFilter类中没有WEB_CONTEXT_UNIFY
参数,需要手动配置
修改配置文件application.yml:
server:
port: 8401
spring:
application:
name: cloud-alibaba-sentinel-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
sentinel:
transport:
dashboard: localhost:8080
port: 8719
filter:
enabled: false # 关闭Sentinel官方的CommonFilter实例化
management:
endpoints:
web:
exposure:
include: '*'
编写FilterConfig类,手动配置:
@Configuration
public class FilterConfig {
@Bean
public FilterRegistrationBean<CommonFilter> sentinelFilterRegistration() {
FilterRegistrationBean<CommonFilter> registrationBean = new FilterRegistrationBean<>();
registrationBean.setFilter(new CommonFilter());
registrationBean.addUrlPatterns("/*");
// 关闭入口资源聚合 开启链路限流
registrationBean.addInitParameter(CommonFilter.WEB_CONTEXT_UNIFY, "false");
registrationBean.setName("sentinelFilter");
registrationBean.setOrder(1);
return registrationBean;
}
}
编写异常处理类:
@RestControllerAdvice
public class FlowExceptionHandler {
@ExceptionHandler(FlowException.class)
public String handleFlowException(FlowException e){
return "Blocked by Sentinel (flow limiting): 链路限流...";
}
}
启动测试:
QPS直接Warm up
冷加载因子默认为3,在预热时长5s内阈值为6/3即2,在5s后阈值会逐渐上升为6
测试:
一开始频繁超过阈值进行限流,5s后阈值上升未被限流
QPS直接排队等待
当QPS超过阈值1时,进行排队等待,超时时间为20s
修改testA()方法:
@GetMapping("/testA")
public String testA(){
log.info(Thread.currentThread().getName());
return "testA...";
}
使用Postman进行测试:
每秒响应一个请求
5)降级规则
降级策略:
- RT:平均响应时间超出阈值 并且 在时间窗口期内通过请求数 ≥ 5时触发服务降级,RT最大4900ms(秒级)
- 异常比例:QPS ≥ 5 且 异常比例超过阈值触达降级
- 异常数:超过阈值触发降级(分钟级)
窗口期过后关闭降级
RT降级
当平均响应时间超过200ms并且在时间窗口1s内请求数 ≥ 5时触发服务降级
添加控制器方法:
/**
* 测试RT
* @return
*/
@GetMapping("/testC")
public String testC() throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
log.info("testC...");
return "testC...";
}
Jmeter测试:
异常比例降级
当QPS ≥ 5 且 1s内异常比例超过阈值0.2时进行降级
添加控制器方法:
/**
* 测试异常比例
* @return
*/
@GetMapping("/testD")
public String testD() {
int i = 10 / 0;
log.info("testD...");
return "testD...";
}
Jmeter测试:
关闭Jmeter后QPS没有达到5抛出异常
异常数降级
在时间窗口60s内异常数达到阈值5进行降级(分钟级),时间窗口必须 ≥ 60否则结束熔断状态后仍可能再进入
熔断
添加控制器方法:
/**
* 测试异常数
* @return
*/
@GetMapping("/testE")
public String testE() {
int i = 10 / 0;
log.info("testE...");
return "testE...";
}
测试:
6)热点规则
我们希望对热点(经常访问的数据)进行限制,Sentinel 利用 LRU 策略统计最近最常访问的热点参数,结合令
牌桶算法来进行参数级别的流控
在窗口时长1s内携带第一个参数访问资源testHotKey达到阈值1时进行降级
添加控制器方法:
/**
* 测试热点规则
* @return
*/
@GetMapping("/testHotKey")
//资源名为testHotKey fallback方法为deal_testHotKey
@SentinelResource(value = "testHotKey", blockHandler = "deal_testHotKey")
public String testHotKeyHotKey(@RequestParam(value = "p1", required = false) String p1,
@RequestParam(value = "p2", required = false) String p2) {
log.info("testHotKey...");
return "testHotKey...";
}
public String deal_testHotKey(String p1, String p2, BlockException exception){
return "deal_testHotKey...";
}
访问测试:
修改热点规则,配置参数例外项,当第一个参数值为5时阈值为10
访问测试:
7)系统规则
系统保护规则是从应用级别的入口流量进行控制,从单台机器的 load、CPU 使用率、平均 RT、入口 QPS
和并发线程数等几个维度监控应用指标,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
系统规则支持以下的模式:
- Load 自适应(仅对 Linux/Unix-like 机器生效):系统的 load1 作为启发指标,进行自适应系统保护。当系统 load1 超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才会触发系统保护(BBR 阶段)。系统容量由系统的
maxQps * minRt
估算得出。设定参考值一般是CPU cores * 2.5
。 - CPU usage(1.5.0+ 版本):当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0),比较灵敏。
- 平均 RT:当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。
- 并发线程数:当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。
- 入口 QPS:当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。
入口QPS测试:
8)自定义服务降级
使用@SentinelResource注解
@RestController
@Slf4j
public class RateLimitController {
@GetMapping("/resource")
//value指定资源名称 blockHandle指定处理方法
@SentinelResource(value = "resource", blockHandler = "handleGetResource")
public CommonResult getResource(){
return CommonResult.ok("按资源名称限流成功!", new Payment(1L, "水"));
}
public CommonResult handleGetResource(BlockException exception){
return CommonResult.failure(500,"按资源名称限流失败!!!");
}
}
按照URL配置流控规则,显示Sentinel默认提示
按照资源名称配置流控规则,显示自定义提示:
处理方法和资源存在同一个类中耦合较大,需要分开:
@RestController
@Slf4j
public class RateLimitController {
@GetMapping("/resource")
//value指定资源名称 blockHandle指定处理类中的方法 blockHandleClass指定处理类
@SentinelResource(value = "resource", blockHandler = "handleGetResource", blockHandlerClass = MyBlockHandler.class)
public CommonResult getResource(){
return CommonResult.ok("按资源名称限流成功!", new Payment(1L, "水"));
}
}
@Component
public class MyBlockHandler {
//处理方法必须为static静态的
public static CommonResult handleGetResource(BlockException exception){
return CommonResult.failure(500,"按资源名称限流失败!!!");
}
}
测试:
9)订单支付微服务配置Sentinel
Ribbon + Sentinel
创建支付子模块cloud-alibaba-provider-payment9003
导入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件application.yml:
server:
port: 9003
spring:
application:
name: nacos-payment-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
management:
endpoints:
web:
exposure:
include: '*'
主启动类:
@SpringBootApplication
@EnableDiscoveryClient
public class CloudAlibabaProviderPayment9003Application {
public static void main(String[] args) {
SpringApplication.run(CloudAlibabaProviderPayment9003Application.class, args);
}
}
业务类:
@RestController
public class PaymentController {
@Value("${server.port}")
private String serverPort;
public static HashMap<Long, Payment> hashMap = new HashMap<>();
//使用HashMap模拟数据库
static {
hashMap.put(1L, new Payment(1L, "sadw4531rfsf"));
hashMap.put(2L, new Payment(2L, "s23edaf1qtge"));
hashMap.put(3L, new Payment(3L, "ntee21rtrwqe"));
}
@GetMapping("/paymentSQL/{id}")
public CommonResult getPayment(@PathVariable("id") Long id){
Payment payment = hashMap.get(id);
return CommonResult.ok("from mysql, serverPort:" + serverPort, payment);
}
}
同样配置创建支付子模块cloud-alibaba-provider-payment9004,修改端口为9004即可,用于测试负载均衡
创建订单子模块cloud-alibaba-consumer-order84
导入依赖:
<!-- nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 持久化 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
<!-- sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件application.yml:
server:
port: 84
spring:
application:
name: nacos-order-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
sentinel:
transport:
dashboard: 127.0.0.1:8080
port: 8719
management:
endpoints:
web:
exposure:
include: '*'
# 消费者将要访问的微服务名称
service-url:
nacos-user-server: http://nacos-payment-service
主启动类:
@SpringBootApplication
@EnableDiscoveryClient
public class CloudAlibabaConsumerOrder84Application {
public static void main(String[] args) {
SpringApplication.run(CloudAlibabaConsumerOrder84Application.class, args);
}
}
RestTemplate配置类:
@Configuration
public class ApplicationContextConfig {
@Bean
@LoadBalanced //负载均衡
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
业务类:
@RestController
@Slf4j
public class CircuitBreakerController {
@Value("${service-url.nacos-user-server}")
private String serviceURL;
@Resource
private RestTemplate restTemplate;
@GetMapping("/consumer/fallback/{id}")
//fallback负责业务异常
//blockHandle负责限流处理
//exceptionsToIgnore忽略指定异常,不执行fallback
@SentinelResource(value = "fallback",
fallback = "handleFallback",
fallbackClass = {FallbackHandler.class},
blockHandler = "handleBlock",
blockHandlerClass = {BlockHandler.class},
exceptionsToIgnore = {IllegalArgumentException.class})
public CommonResult fallback(@PathVariable("id") Long id) {
CommonResult result = restTemplate.getForObject(serviceURL + "/paymentSQL/" + id, CommonResult.class);
if (id == 4) {
throw new IllegalArgumentException("id不存在,非法参数!!!");
} else if (result.getData() == null) {
throw new NullPointerException("id没有对应记录,空指针异常!!!");
}
return result;
}
}
FallbackHandler:
@Component
public class FallbackHandler {
/**
* 处理业务异常
* @param id 记录id
* @param e 异常
* @return CommonResult
*/
public static CommonResult handleFallback(@PathVariable("id") Long id, Throwable e){
Payment payment = new Payment(id, null);
return CommonResult.failure(500, "fallback,exception:" + e.getMessage(), payment);
}
}
@Component
public class BlockHandler {
/**
* sentinel限流处理
* @param id 记录id
* @param e 限流异常
* @return CommonResult
*/
public static CommonResult handleBlock(@PathVariable("id") Long id, BlockException e){
Payment payment = new Payment(id, null);
return CommonResult.failure(500, "sentinel限流", payment);
}
}
启动测试:
IllegalArgumentException异常被忽略不触发fallback,NullPointerException触发fallback
配置流控规则:
当fallback和blockhandler都触发时,执行blockhandler限流处理
Feign + Sentinel
修改订单子模块
配置文件application.yml添加Sentinel支持Feign:
feign:
sentinel:
enabled: true # 激活sentinel对feign的支持
主启动类:
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients //开启Feign
public class CloudAlibabaConsumerOrder84Application {
public static void main(String[] args) {
SpringApplication.run(CloudAlibabaConsumerOrder84Application.class, args);
}
}
配置Feign日志:
@Configuration
public class FeignConfig {
@Bean
public Logger.Level feignLoggerLevel(){
return Logger.Level.FULL; //设置Feign日志级别 最详细
}
}
Feign接口:
@Component
@FeignClient(value = "nacos-payment-service", fallback = PaymentServiceImpl.class)
public interface PaymentService {
@GetMapping("/paymentSQL/{id}")
CommonResult getPayment(@PathVariable("id") Long id);
}
fallback:
@Component
public class PaymentServiceImpl implements PaymentService {
@Override
public CommonResult getPayment(Long id) {
return CommonResult.failure(500, "服务不可用,请稍后重试!!!");
}
}
添加控制器方法:
@RestController
@Slf4j
public class CircuitBreakerController {
@Autowired
private PaymentService paymentService;
@GetMapping("/consumer/paymentSQL/{id}")
public CommonResult getPayment(@PathVariable("id") Long id) {
return paymentService.getPayment(id);
}
}
启动测试:
当支付微服务宕机后,执行fallback
10)规则持久化
将Sentinel配置规则写入nacos进行持久化
修改子模块cloud-alibaba-sentinel-service8401
导入依赖:
<!-- 持久化 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
配置文件application.yml:
server:
port: 8401
spring:
application:
name: cloud-alibaba-sentinel-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
sentinel:
transport:
dashboard: localhost:8080
port: 8719
datasource: # 从nacos读取sentinel配置
ds1:
nacos:
server-addr: 127.0.0.1:8848
dataId: ${spring.application.name}
groupId: DEFAULT_GROUP
data-type: json
rule-type: flow # 流控规则
ds2:
nacos:
server-addr: 127.0.0.1:8848
dataId: ${spring.application.name}
groupId: DEGRADE_GROUP
data-type: json
rule-type: degrade
filter:
enabled: false
management:
endpoints:
web:
exposure:
include: '*'
添加业务类方法:
@GetMapping("/url")
@SentinelResource(value = "url", blockHandler = "handleGetUrl", blockHandlerClass = {MyBlockHandler.class})
public CommonResult getUrl(){
return CommonResult.ok("按url限流成功!", new Payment(1L, "水"));
}
@GetMapping("/testD")
@SentinelResource(value = "testD", blockHandler = "handleTestD", blockHandlerClass = {MyBlockHandler.class})
public String testD() {
int i = 10 / 0;
log.info("testD...");
return "testD...";
}
Sentinel控制台处理类:
@Component
public class MyBlockHandler {
public static CommonResult handleGetUrl(BlockException exception){
return CommonResult.failure(500, "按url限流失败!!!");
}
public static String handleTestD(BlockException exception){
return "触发服务降级!!!";
}
}
nacos添加规则配置:
配置源码分析:
继承关系:
参数解释:
参数名 | 解释 |
---|---|
grade | 阈值类型,0表示线程数,1表示QPS |
strategy | 流控模式,0表示直接,1表示关联,2表示链路 |
controlBehavior | 流控效果,0表示快速失败,1表示Warm up,2表示排队等待 |
参数解释:
参数名 | 解释 |
---|---|
grade | 降级策略,0表示RT,1表示异常比例,2表示异常数 |
count | 对应不同降级策略的阈值 |
QPS超过阈值1直接快速失败
时间窗口期60s内抛出5次异常触发服务降级
访问/url路径:
八、Gateway网关
1.介绍
Gateway基于Netty、Servlet3.1、WebFlux异步响应式编程
基本概念:
Router(路由):由ID、目标URI构建网关,一系列的断言的过滤器组成,如果断言为true则匹配该路由
Predicate(断言):匹配HTTP请求中的所有内容(请求头、参数等),请求与断言相匹配进行路由
Filter(过滤器):指GatewayFilter,使用该过滤器在请求路由前后对请求进行修改
2.搭建网关
创建子模块cloud-gateway-gateway9527
导入依赖(不能导入web、actuator依赖):
<!-- gateway依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yaml:
server:
port: 9527
spring:
application:
name: cloud-gateway
cloud:
gateway:
# 配置两个路由 对应支付模块的控制器方法
routes:
- id: payment_routh # 路由id 唯一建议使用服务名
uri: http://localhost:8001 # 提供服务的路由地址
predicates: # 断言 路径匹配进行路由
- Path=/payment/get/**
- id: payment_routh2
uri: http://localhost:8001
predicates:
- Path=/payment/lb/**
eureka:
instance:
instance-id: cloud-gateway-service
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
主启动类:
@SpringBootApplication
@EnableEurekaClient
public class CloudGatewayGateway9527Application {
public static void main(String[] args) {
SpringApplication.run(CloudGatewayGateway9527Application.class, args);
}
}
启动测试:
还可以通过编程配置路由:
/**
* 代码配置路由映射
*/
@Configuration
public class GatewayConfig {
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder)
{
//RouteLocatorBuilder.Builder routes = builder.routes();
//RouteLocator routeLocator = routes.route("router1", r -> r.path("/guonei")
// .uri("http://news.baidu/guonei")).build();
//
//return routeLocator;
//简化 访问/guonei 转发到百度国内新闻
return builder.routes()
.route("router1", r -> r.path("/guonei")
.uri("http://news.baidu/guonei"))
.build();
}
}
测试:
3.配置动态路由
根据注册中心的服务信息,以微服务名称作为路径动态路由进行转发,进行负载均衡
修改application.yaml:
server:
port: 9527
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh
#uri: http://localhost:8001
uri: lb://CLOUD-PAYMENT-SERVICE # 匹配提供服务的微服务名称 lb协议 负载均衡
predicates:
- Path=/payment/get/**
- id: payment_routh2
#uri: http://localhost:8001
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
eureka:
instance:
instance-id: cloud-gateway-service
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
启动两个支付服务、网关、Eureka测试:
4.常用Predicate
启动网关模块,打印日志:
文档:https://docs.spring.io/spring-cloud-gateway/docs/2.2.9.RELEASE/reference/html/#gateway-request-predicates-factories
1.After
在配置时间(ZonedDateTime类型)之后才能进行请求
ZonedDateTime zdt = ZonedDateTime.of(LocalDate.of(2030,1,1), LocalTime.of(0, 0), ZoneId.of("CTT", ZoneId.SHORT_IDS));
System.out.println(zdt); //2030-01-01T00:00+08:00[Asia/Shanghai]
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh2
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
- After=2030-01-01T00:00+08:00[Asia/Shanghai] # 在这个时间之后才能访问
2.Before
在配置时间(ZonedDateTime类型)之前才能进行请求
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh2
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
- Before=2030-01-01T00:00+08:00[Asia/Shanghai] # 在这个时间之前才能访问
3.Between
在两个配置时间(ZonedDateTime类型)之前才能进行请求
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh2
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
- Between=2022-07-21T17:19:59.018+08:00[Asia/Shanghai], 2030-01-01T00:00+08:00[Asia/Shanghai] # 在这个时间段之间才能访问
4.Cookie
只有携带相应的Cookie才能请求
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh2
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
- Cookie=username,root # 带上cookie才能访问
不携带cookie访问:
携带cookie访问:
5.Header
只有携带相应请求头才能访问
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh2
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
- Header=X-Request-Id, \d+ # 带指定请求头才能访问
不携带header访问:
携带header访问:
6.Host
指定主机名才能访问
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh2
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
- Host=**.qingsongxyz # 指定主机名才能访问
7.Method
指定请求方法才能访问
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh2
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
- Method=GET # get请求才能访问
8.Path
匹配请求路径
9.Query
匹配请求参数,包含参数名和正则表达式
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由功能(服务名称进行路由)
routes:
- id: payment_routh2
uri: lb://CLOUD-PAYMENT-SERVICE
predicates:
- Path=/payment/lb/**
- Query=id, \d+ # 携带正整数id参数才能访问
5.过滤器
在请求路由前后对请求进行修改
GatewayFilter:https://docs.spring.io/spring-cloud-gateway/docs/2.2.9.RELEASE/reference/html/#gatewayfilter-factories
GlobalFilter:https://docs.spring.io/spring-cloud-gateway/docs/2.2.9.RELEASE/reference/html/#global-filters
自定义Filter:
@Component
@Slf4j
public class MyLogGateWayFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("--------------进入MyLogGateWayFilter,时间:" + LocalDateTime.now());
String username = exchange.getRequest().getQueryParams().getFirst("username");
if(ObjectUtils.isEmpty(username))
{
log.info("-------------非法用户-------------");
exchange.getResponse().setStatusCode(HttpStatus.NOT_ACCEPTABLE);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
@Override
public int getOrder() {
return 0;
}
}
所有请求进行路由后都需要经过Filter Chain
九、配置中心
1.Config
1)介绍
每个微服务都有自己的配置文件,多个配置文件需要统一管理、动态配置
将git远程仓库中的配置文件拉取到本地,Config Server从本地仓库获取,Config Client从Config Server获取
配置
2)搭建配置中心
config访问资源格式:
在Github或Gitee上新建远程仓库
仓库中新建三个配置文件,对应开发环境、生产环境、测试环境:
新建子模块cloud-config-center3344
导入依赖:
<!-- config配置依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yaml:
server:
port: 3344
spring:
application:
name: cloud-config-center
cloud:
config:
server:
git:
uri: https://gitee/pinus-thunbergii-xyz/springcloud-config.git # gitee仓库名称
skip-ssl-validation: true
force-pull: true
username: 用户名
password: 密码
label: master # 分支
eureka:
instance:
instance-id: configCenter3344
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
主启动类添加@EnableConfigServer:
@SpringBootApplication
@EnableConfigServer //开启配置服务
public class CloudConfigCenter3344Application {
public static void main(String[] args) {
SpringApplication.run(CloudConfigCenter3344Application.class, args);
}
}
配置本机host文件
127.0.0.1 config3344
启动测试:
访问路径 IP地址:端口号 / 分支 / 文件名
3)搭建配置客户端
新建子模块cloud-config-client-3355
导入依赖:
<!-- 配置客户端依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
bootstrap.yaml是系统级别配置(优先级更高),application.yaml是用户级别配置
Springcloud 会创建一个Bootstrap Context作为Application Context的父级上下文,先从外部源加载配置,默
认情况下不会被本地配置覆盖
配置客户端中需要使用bootstrap.yaml:
server:
port: 3355
spring:
application:
name: cloud-config-client
cloud:
# 从配置中心获取名为config-dev的配置文件
config: # 配置客户端
label: master # 分支
name: config # 配置文件名称
profile: dev # 文件后缀名称
uri: http://localhost:3344 # 配置中心地址
eureka:
instance:
instance-id: configClient3355
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
主启动类:
@SpringBootApplication
@EnableEurekaClient
public class CloudConfigClient3355Application {
public static void main(String[] args) {
SpringApplication.run(CloudConfigClient3355Application.class, args);
}
}
业务类:
@RestController
public class ConfigClientController {
@Value("${config.info}")
private String configInfo;
@Value("${server.port}")
private String serverPort;
@GetMapping("/config/info")
public String getConfigInfo(){
return "serverPort:" + serverPort + "\n configInfo:" + configInfo;
}
}
启动测试:
动态刷新配置问题:
修改git远程仓库配置文件内容,配置中心配置发生改变,但是配置客户端配置未改变(需要重启才能变更)
解决方法:
修改配置客户端
导入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
修改application.yaml,暴露监控端口:
management:
endpoints:
web:
exposure:
include: "*"
控制类添加刷新注解:
@RestController
@RefreshScope //刷新
public class ConfigClientController {
@Value("${config.info}")
private String configInfo;
@Value("${server.port}")
private String serverPort;
@GetMapping("/config/info")
public String getConfigInfo(){
return "serverPort:" + serverPort + "\n configInfo:" + configInfo;
}
}
启动测试:
在修改远程仓库的配置后,只需要发送post请求通知对应的客户端实现动态刷新(不需要重启):
2.Nacos
1)介绍
配置说明:https://nacos.io/zh-cn/docs/quick-start-spring-cloud.html
2)搭建配置中心
创建子模块cloud-alibaba-config-client3377
导入依赖:
<!-- nacos配置中心依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- nacos注册中心依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件bootstrap.yml,保证先从配置中心拉取配置:
# 配置文件名称 ${spring.application.name}-${spring.profile.active}.${spring.cloud.nacos.config.file-extension} 即 nacos-config-client-dev.yaml
server:
port: 3377
spring:
application:
name: nacos-config-client
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
config:
file-extension: yaml # 指定配置文件的格式为yaml
server-addr: 127.0.0.1:8848
配置文件application.yml:
spring:
profiles:
active: dev
nacos添加配置nacos-config-client-dev.yaml(后缀不能写成yml,无法识别):
主启动类:
@SpringBootApplication
@EnableDiscoveryClient
public class CloudAlibabaConfigClient3377Application {
public static void main(String[] args) {
SpringApplication.run(CloudAlibabaConfigClient3377Application.class, args);
}
}
业务类:
@RestController
@RefreshScope //动态刷新
public class ConfigController {
@Value("${config.info}")
private String configInfo;
@GetMapping("/config/info")
public String getConfigInfo(){
return configInfo;
}
}
启动测试:
nacos配置中心修改后自动刷新:
3)命名空间
命名空间由Namespace、DataId和Group组成
Namespace用于隔离环境(dev、test、prod)默认为public
Group用于划分不同的微服务默认为DEFAULT_GROUP
DataId区分不同名称的配置文件
nacos添加不同Group配置文件:
修改配置文件:
server:
port: 3377
spring:
application:
name: nacos-config-client
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
config:
file-extension: yaml
server-addr: 127.0.0.1:8848
#group: TEST_GROUP # 配置分组
group: DEV_GROUP
spring:
profiles:
active: info
nacos创建命名空间:
添加不同namespace的配置文件:
修改配置文件:
server:
port: 3377
spring:
application:
name: nacos-config-client
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
config:
file-extension: yaml
server-addr: 127.0.0.1:8848
group: DEV_GROUP # 配置文件分组
# group: TEST_GROUP
namespace: 5bc683ac-1e00-4e2d-81d3-6acda8f54b7e # 配置文件命名空间
spring:
profiles:
active: dev
十、消息总线Bus
1.介绍
在动态刷新中,当修改远程仓库配置时,配置中心会自动更新,但客户端不会自动变更,需要手动发送post
请求通知其刷新,如果存在大量配置客户端通知就会变得很麻烦
Bus管理和传播分布式系统的消息,支持Kafka和RabbitMQ两种中间件,用于广播状态更改,配合
Config可以实现真正的配置动态刷新
2.配置消息总线
修改配置中心
导入依赖:
<!-- 添加消息总线支持 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
bootstrap.yaml:
# RabbitMQ配置
rabbitmq:
host: ip地址
port: 5672
username: 用户名
password: 密码
# 暴露bus刷新配置端点
management:
endpoints:
web:
exposure:
include:
- 'bus-refresh'
修改配置客户端
导入依赖:
<!-- 添加消息总线支持 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
bootstrap.yaml:
# RabbitMQ配置
rabbitmq:
host: ip地址
port: 5672
username: 用户名
password: 密码
新建子模块cloud-config-client-3366(添加一个配置客户端测试)
导入依赖:
<!-- 添加消息总线支持 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
bootstrap.yaml:
server:
port: 3366
spring:
application:
name: cloud-config-client
cloud:
config: # 配置客户端
label: master # 分支
name: config # 配置文件名称
profile: dev # 文件后缀名称
uri: http://localhost:3344 # 配置中心地址
# RabbitMQ配置
rabbitmq:
host: ip地址
port: 5672
username: 用户名
password: 密码
eureka:
instance:
instance-id: configClient3366
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
management:
endpoints:
web:
exposure:
include: "*"
主启动类:
@SpringBootApplication
@EnableEurekaClient
public class CloudConfigClient3366Application {
public static void main(String[] args) {
SpringApplication.run(CloudConfigClient3366Application.class, args);
}
}
控制器:
@RestController
@RefreshScope //刷新
public class ConfigClientController {
@Value("${config.info}")
private String configInfo;
@Value("${server.port}")
private String serverPort;
@GetMapping("/config/info")
public String getConfigInfo(){
return "serverPort:" + serverPort + "\n configInfo:" + configInfo;
}
}
启动测试:
刷新3344端口,通知其他所有客户端刷新配置
原理分析:
原理图:
通过消息中间件通知所有配置客户端刷新配置,创建一个topic交换机,每一个配置客户端都有一个临时队列
,这些队列和交换机进行绑定Routing key都是#
,在发送post请求更新配置中心配置的同时发送刷新消息到
topic交换机,交换机再将消息投递到每个绑定的队列中,由各个配置客户端消费,相当于广播通知
3.定点刷新
有时候我们只想更新指定客户端的配置信息,进行灰度更新
此时需要在发送post请求指定客户端服务名称加上端口号:
curl -X POST http://localhost:3344/actuator/bus-refresh/微服务名称:端口号
测试:
只通知更新3355,3366没有更新
十一、消息驱动Stream
1.介绍
消息驱动(Stream),可以屏蔽底层消息中间件的差异,降低切换成本,统一编程模型
官网:https://spring.io/projects/spring-cloud-stream
结构图:
应用程序通过**inputs(消费者)和outputs(生产者)**来与Stream中的Binder对象交互,Binder对象负责与消息中
间件交互,消除中间件之间的差异
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T7Ei9luN-1661052839797)(D:\desktop\笔记\Springcloud.assets\SCSt-overview.png)]
常用注解:
注解 | 解释 |
---|---|
@Input | 标识为输入通道,用于接受消息 |
@Output | 标识为输出通道,用于发送消息 |
@StreamListener | 监听队列,用于消费消息 |
@EnableBinding | 将信道和交换机绑定 |
2.搭建生产者消费者模型
新建生产者子模块:cloud-stream-rabbitmq-provider8801
导入依赖:
<!-- 导入rabbit stream依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yaml:
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 绑定RabbitMQ信息
defaultRabbit: # 名称,用于整合
type: rabbit # 消息中间件类型
bindings: # 服务整合
output: # 发送通道
destination: studyExchange # 交换机名称
content-type: application/json # 消息类型
binder: defaultRabbit # 设置绑定消息服务
rabbitmq:
host: ip地址
port: 5672
username: 用户名
password: 密码
eureka:
instance:
instance-id: streamProvider8801
prefer-ip-address: true
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
业务类:
public interface IMessageProviderService {
String send();
}
//定义消息的推送管道
@Slf4j
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
//消息发送管道
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = IdUtil.simpleUUID();
output.send(MessageBuilder.withPayload(serial).build());
log.info("serial = {}", serial);
return serial;
}
}
@RestController
public class SendMessageController {
@Autowired
private IMessageProviderService messageProviderService;
@GetMapping("/sendMessage")
public String send(){
return messageProviderService.send();
}
}
新建消费者子模块:cloud-stream-rabbitmq-consumer8802
导入相同依赖
application.yaml:
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 绑定RabbitMQ信息
defaultRabbit: # 名称,用于整合
type: rabbit # 消息中间件类型
bindings: # 服务整合
input: # 接受通道
destination: studyExchange # 交换机名称
content-type: application/json # 消息类型
binder: defaultRabbit # 设置绑定消息服务
rabbitmq:
host: ip地址
port: 5672
username: 用户名
password: 密码
eureka:
instance:
instance-id: streamConsumer8802
prefer-ip-address: true
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka/
业务类:
@EnableBinding(Sink.class)
public class ReceiveMessageController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("接受消息:" + message.getPayload() + "\t port:" + serverPort);
}
}
启动测试:
生成的临时队列名称中包含随机分组名
3.分组消费
另外创建一个消费者子模块:cloud-stream-rabbitmq-consumer8803
依赖、业务类和之前的相同,application.yaml只需要修改端口号为8803即可
启动测试:
两个消费者消费了同一条消息,存在重复消费问题
在Stream中处于同一个group中的多个消费者是竞争关系,保证消息只会被其中一个服务消费
修改两个消费者配置文件,将它们放入同一组:
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
group: consumer #进行分组(同一分组对于同一消息只能消费一次)
测试:
4.持久化
指定分组名称的消费者队列默认会进行持久化
未指定分组时,队列为临时队列,一旦消费者宕机队列自动删除,消息也会随之丢失:
指定分组时,队列会被持久化,一旦消费者宕机队列自动删除,消息会被持久化,当消费者恢复后可以继续
消费之前错过的消息:
十二、链路追踪Sleuth
1.介绍
在微服务调用中,每一个请求都会形成一条复杂的分布式调用链路,链路中任何一环出错都会导致整个请求
的失败,此时需要Sleuth和Zipkin来进行链路追踪
2.搭建Zipkin
下载Zipkin:https://repo1.maven/maven2/io/zipkin/zipkin-server/
解压缩后是一个jar包:
运行java -jar zipkin-server-2.23.18-exec.jar
访问浏览器http://localhost:9411/zipkin/
链路图:
整条链路通过Trace Id唯一标识,Span标识发起的请求信息,每一个Span用Parent Id进行关联
3.订单、支付微服务继承Sleuth
修改模块cloud-consumer-order8000
添加依赖:
<!-- sleuth + zipkin -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
添加配置:
spring:
application:
name: cloud-order-service
# zipkin和sleuth
zipkin:
base-url: http://localhost:9411 # zipkin地址
sleuth:
sampler:
# 采样率介于0到1之间,1表示全部采样
probability: 1
添加控制器方法:
@GetMapping("/consumer/payment/zipkin")
public String getPaymentZipkin(){
String result = restTemplate.getForObject("http://localhost:8001" + "/payment/zipkin", String.class);
return result;
}
修改cloud-provider-payment8001模块
添加相同依赖,添加配置
添加控制器方法:
@GetMapping("/payment/zipkin")
public String getPaymentZipkin(){
return "zipkin...";
}
启动测试:
十三、分布式事务Seata
1.介绍
Seata(Simple Extensible Autonomous Transaction Architecture)用于分布式事务解决一次操作多个数据源的
数据一致性问题,Seata 提供了 AT、TCC、SAGA 和 XA 事务模式
术语表:
名称 | 解释 |
---|---|
XID(Transaction ID) | 全局唯一事务ID |
TC(Transaction Coordinator) | 事务协调者,维护全局和分支事务的状态,驱动全局事务提交或回滚 |
TM(Transaction Manager) | 事务管理器,开始全局事务、提交或回滚全局事务 |
RM(Resource Manager) | 资源管理器,控制分支(本地)事务,负责分支注册、状态汇报,接受TC的指令,驱动分支事务提交或回滚 |
事务模式表:
事务模式 | 解释 |
---|---|
AT(Auto Transaction) | 自动事务,只需编写业务SQL,seata进行二阶段提交和回滚操作 |
TCC(Try Confirm Cancle) | 补偿型事务,需要自定义资源检测和预留、提交、释放预留资源操作 |
SAGA | 长事务,适用于业务流程长的系统 |
XA(eXtended Architecture) | 分布式事务,事务粒度大,高并发下系统可用性低 |
基本流程:
- TM向TC申请开启一个全局事务,并生成一个全局唯一的XID
- RM向TC注册分支事务,将其纳入XID对应全局事务的管辖
- TM向TC进行提交或回滚
- TC调度XID管辖下的全部分支事务完成提交或回滚
2.安装
下载地址:https://github/seata/seata/releases
由于需要和springcloud alibaba兼容,下面选择seata0.9.0版本
解压后:
创建数据库seata和相关表:
Seata0.9.0版本Sql脚本/conf/db_store.sql
1.0.0以上版本查看github:https://github/seata/seata/blob/1.0.0/script/server/db/mysql.sql
-- the table to store GlobalSession data
drop table if exists `global_table`;
create table `global_table` (
`xid` varchar(128) not null,
`transaction_id` bigint,
`status` tinyint not null,
`application_id` varchar(32),
`transaction_service_group` varchar(32),
`transaction_name` varchar(128),
`timeout` int,
`begin_time` bigint,
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`xid`),
key `idx_gmt_modified_status` (`gmt_modified`, `status`),
key `idx_transaction_id` (`transaction_id`)
);
-- the table to store BranchSession data
drop table if exists `branch_table`;
create table `branch_table` (
`branch_id` bigint not null,
`xid` varchar(128) not null,
`transaction_id` bigint ,
`resource_group_id` varchar(32),
`resource_id` varchar(256) ,
`lock_key` varchar(128) ,
`branch_type` varchar(8) ,
`status` tinyint,
`client_id` varchar(64),
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`branch_id`),
key `idx_xid` (`xid`)
);
-- the table to store lock data
drop table if exists `lock_table`;
create table `lock_table` (
`row_key` varchar(128) not null,
`xid` varchar(96),
`transaction_id` long ,
`branch_id` long,
`resource_id` varchar(256) ,
`table_name` varchar(32) ,
`pk` varchar(36) ,
`gmt_create` datetime ,
`gmt_modified` datetime,
primary key(`row_key`)
);
修改配置文件file.conf:
如果使用mysql8版本需要删除Connector/J 5.1.30以免出错,下载Connector/J 8.0.30jar包复制到lib目录下
下载地址:https://mvnrepository/artifact/mysql/mysql-connector-java
修改配置文件registry.conf:
先启动nacos,然后启动seata-server,如果出现OOM,修改seata-server.bat:
3.搭建订单库存支付业务
创建三个微服务,进行下单,扣减库存,扣减账户,修改订单状态等一系列流程
1)创建数据库
每个数据需要额外添加一张undo_log
表用于回滚
Seata0.9.0版本Sql脚本/conf/db_undo_log.sql
1.0.0以上版本查看github:https://github/seata/seata/blob/1.0.0/script/client/at/db/mysql.sql
账户数据库seata_account:
CREATE DATABASE seata_account;
USE DATABASE seata_account;
-- ----------------------------
-- Table structure for t_account
-- ----------------------------
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE "t_account" (
"id" bigint NOT NULL AUTO_INCREMENT COMMENT '账户id',
"user_id" bigint NOT NULL COMMENT '用户id',
"total" decimal(10,2) NOT NULL COMMENT '总额度',
"used" decimal(10,2) NOT NULL COMMENT '已用余额',
"residue" decimal(10,2) NOT NULL COMMENT '剩余可用额度',
PRIMARY KEY ("id") USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='账户表';
-- ----------------------------
-- Records of t_account
-- ----------------------------
INSERT INTO `t_account` VALUES ('1', '1', '1000.00', '0.00', '1000.00');
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE "undo_log" (
"id" bigint NOT NULL AUTO_INCREMENT,
"branch_id" bigint NOT NULL,
"xid" varchar(100) NOT NULL,
"context" varchar(128) NOT NULL,
"rollback_info" longblob NOT NULL,
"log_status" int NOT NULL,
"log_created" datetime NOT NULL,
"log_modified" datetime NOT NULL,
"ext" varchar(100) DEFAULT NULL,
PRIMARY KEY ("id"),
UNIQUE KEY "ux_undo_log" ("xid","branch_id")
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
订单数据库seata_order:
-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE "t_order" (
"id" bigint NOT NULL AUTO_INCREMENT COMMENT '订单id',
"user_id" bigint NOT NULL COMMENT '用户id',
"product_id" bigint NOT NULL COMMENT '产品id',
"count" int NOT NULL COMMENT '数量',
"money" decimal(10,2) NOT NULL COMMENT '金额',
"status" tinyint NOT NULL DEFAULT '0' COMMENT '订单状态: 0:创建中 1:已完结',
PRIMARY KEY ("id") USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='订单表';
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE "undo_log" (
"id" bigint NOT NULL AUTO_INCREMENT,
"branch_id" bigint NOT NULL,
"xid" varchar(100) NOT NULL,
"context" varchar(128) NOT NULL,
"rollback_info" longblob NOT NULL,
"log_status" int NOT NULL,
"log_created" datetime NOT NULL,
"log_modified" datetime NOT NULL,
"ext" varchar(100) DEFAULT NULL,
PRIMARY KEY ("id"),
UNIQUE KEY "ux_undo_log" ("xid","branch_id")
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
库存数据库seata_storage:
-- ----------------------------
-- Table structure for t_storage
-- ----------------------------
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE "t_storage" (
"id" bigint NOT NULL AUTO_INCREMENT COMMENT '库存id',
"product_id" bigint NOT NULL COMMENT '产品id',
"total" int NOT NULL COMMENT '总库存',
"used" int NOT NULL COMMENT '已用库存',
"residue" int NOT NULL COMMENT '剩余库存',
PRIMARY KEY ("id") USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='库存';
-- ----------------------------
-- Records of t_storage
-- ----------------------------
INSERT INTO `t_storage` VALUES ('1', '1', '100', '0', '100');
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE "undo_log" (
"id" bigint NOT NULL AUTO_INCREMENT,
"branch_id" bigint NOT NULL,
"xid" varchar(100) NOT NULL,
"context" varchar(128) NOT NULL,
"rollback_info" longblob NOT NULL,
"log_status" int NOT NULL,
"log_created" datetime NOT NULL,
"log_modified" datetime NOT NULL,
"ext" varchar(100) DEFAULT NULL,
PRIMARY KEY ("id"),
UNIQUE KEY "ux_undo_log" ("xid","branch_id")
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
2)创建订单子模块
创建订单子模块seata-order-service2001
导入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 需要排除seata-all依赖,另外导入对应版本 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 需要导入seata server版本对应的依赖 -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
<version>3.0.0</version>
</dependency>
配置文件application.yml:
server:
port: 2001
spring:
application:
name: seata-order-service
cloud:
alibaba:
seata:
tx-service-group: qingsongxyz # 事务组名称需要和seata-server配置文件file.conf中的一致
nacos:
discovery:
server-addr: localhost:8848
datasource:
username: root
password: 1234
url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=UTF-8&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
druid:
aop-patterns: com.qingsongxyz.*
filters: 'stat,wall'
stat-view-servlet:
enabled: true
login-username: admin
login-password: admin
reset-enable: true
web-stat-filter:
enabled: true
filter:
stat:
enabled: true
wall:
enabled: true
db-type: mysql
config:
delete-allow: false
drop-table-allow: false
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
map-underscore-to-camel-case: true
mapper-locations: classpath*:/mapper/**/*.xml
feign:
hystrix:
enabled: false
client:
config:
default:
ReadTimeout: 3000 # 设置建立连接使用时间
ConnectTimeout: 3000 # 设置建立连接后连接服务读取可用资源时间
logging:
level:
io:
seata: info
在resources目录下添加file.conf和registry.conf文件:
service {
# 需要指定事务组名称和seata-server配置文件file.conf中的一致
vgroup_mapping.qingsongxyz = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#disable seata
disableGlobalTransaction = false
}
## transaction log store, only used in seata-server
store {
## store mode: file、db
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "druid"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false"
user = "root"
password = "1234"
}
}
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
SwaggerConfig配置类:
@Configuration
public class SwaggerConfig {
@Bean
public Docket createRestApi(){
return new Docket(DocumentationType.OAS_30)
.groupName("开发1组")
.select()
//.paths(PathSelectors.ant("/list/**")) //访问路径过滤
.apis(RequestHandlerSelectors.basePackage("com.qingsongxyz.controller")) //包过滤
.build()
.apiInfo(createApiInfo())
.enable(true);
}
@Bean
public ApiInfo createApiInfo() {
return new ApiInfo("qingsongxyz Swagger",
"qingsongxyz Api Documentation",
"3.0",
"http:xx",
new Contact("qingsongxyz", "http:xxx", "xxx@qq"),
"Apache 2.0",
"http://www.apache/licenses/LICENSE-2.0",
new ArrayList());
}
}
MybatisPlusConfig配置类:
@Configuration
public class MybatisPlusConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
//乐观锁
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
//分表
interceptor.addInnerInterceptor(new PaginationInnerInterceptor());
//阻止恶意的全表更新删除
interceptor.addInnerInterceptor(new BlockAttackInnerInterceptor());
return interceptor;
}
}
MybatisPlus自动填充字段:
@Slf4j
@Component
public class MyMetaObjectHandler implements MetaObjectHandler {
@Override
public void insertFill(MetaObject metaObject) {
log.info("start insert fill ....");
this.setFieldValByName("createTime", new Date(), metaObject);
this.setFieldValByName("updateTime", new Date(), metaObject);
this.setFieldValByName("deleted", 0, metaObject);
}
@Override
public void updateFill(MetaObject metaObject) {
log.info("start update fill ....");
this.setFieldValByName("updateTime", new Date(), metaObject);
}
}
FeignConfig配置类:
@Configuration
public class FeignConfig {
@Bean
public Logger.Level feignLoggerLevel(){
return Logger.Level.FULL; //设置Feign日志级别 最详细
}
}
实体类:
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_order")
@ApiModel(value = "订单实体类")
public class Order {
/**
* 订单id
*/
@ApiModelProperty("订单id")
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户id
*/
@ApiModelProperty("用户id")
private Long userId;
/**
* 产品id
*/
@ApiModelProperty("产品id")
private Long productId;
/**
* 产品数量
*/
@ApiModelProperty("产品数量")
private Integer count;
/**
* 总金额
*/
@ApiModelProperty("总金额")
private BigDecimal money;
/**
* 订单状态
*/
@ApiModelProperty("订单状态")
private Integer status;
}
OrderMapper:
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
OrderService:
public interface OrderService extends IService<Order> {
int createOrder(Order order);
}
AccountService:
@Component
@FeignClient(value = "seata-account-service")
public interface AccountService {
@PutMapping("/account/userId")
void decreaseAccount(@RequestParam(value = "userId") Long userId, @RequestParam(value = "money") BigDecimal money);
}
StorageService:
@Component
@FeignClient(value = "seata-storage-service")
public interface StorageService {
@PutMapping("/storage/productId")
void decreaseStorage(@RequestParam(value = "productId") Long productId, @RequestParam(value = "count") Integer count);
}
OrderServiceImpl:
@Service
@Slf4j
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
@Autowired
private AccountService accountService;
@Autowired
private OrderMapper orderMapper;
@Autowired
private StorageService storageService;
public int createOrder(Order order){
log.info("创建订单...");
//1.创建订单
int insert = orderMapper.insert(order);
log.info("扣减产品库存...");
//2.扣减产品库存
storageService.decreaseStorage(order.getProductId(), order.getCount());
log.info("扣减账户余额...");
//3.扣减账户余额
accountService.decreaseAccount(order.getUserId(), order.getMoney());
log.info("修改订单状态...");
//4.修改订单状态 已支付
order.setStatus(1);
orderMapper.updateById(order);
log.info("下单完成...");
return insert;
}
}
OrderController:
@RestController
@Slf4j
@Api(tags = "订单类测试")
public class OrderController {
@Autowired
private OrderService orderService;
@ApiOperation("测试创建订单")
@ApiImplicitParams({
@ApiImplicitParam(name = "userId", value = "用户id", required = true, paramType = "query", dataTypeClass = Long.class),
@ApiImplicitParam(name = "productId", value = "产品id", required = true, paramType = "query", dataTypeClass = Long.class),
@ApiImplicitParam(name = "count", value = "产品数量", required = true, paramType = "query", dataTypeClass = Integer.class),
@ApiImplicitParam(name = "money", value = "总金额", required = true, paramType = "query", dataTypeClass = BigDecimal.class)
})
@ApiResponses({
@ApiResponse(code = 400, message = "参数有误"),
@ApiResponse(code = 401, message = "没有认证"),
@ApiResponse(code = 403, message = "没有权限")
})
@PostMapping("/order/create")
public CommonResult create(Order order){
int create = orderService.createOrder(order);
if(create > 0)
{
Order o = orderService.getById(order.getId());
return CommonResult.ok("创建订单成功!", o);
}else {
return CommonResult.failure(500, "创建订单失败!!!");
}
}
}
seata是基于拦截数据源来实现的分布式事务,我们需要排除自动注入的DataSourceAutoConfiguration
,然
后自定义数据源配置信息
@Configuration
@EnableConfigurationProperties({MybatisPlusProperties.class})
public class DataSourceProxyConfig {
@Value("${mybatis-plus.mapper-locations}")
private String mapperLocations;
@Autowired
private MybatisPlusInterceptor mybatisPlusInterceptor;
@Autowired
private MyMetaObjectHandler myMetaObjectHandler;
private MybatisPlusProperties properties;
public DataSourceProxyConfig(MybatisPlusProperties properties) {
this.properties = properties;
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}
@Primary //@Primary标识必须配置在数据源上,否则本地事务失效
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Bean
public MybatisSqlSessionFactoryBean sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
MybatisConfiguration configuration = this.properties.getConfiguration();
if(configuration == null){
configuration = new MybatisConfiguration();
}
sqlSessionFactoryBean.setConfiguration(configuration);
// 向代理数据源添加插件
sqlSessionFactoryBean.setPlugins(mybatisPlusInterceptor);
// 设置全局配置,增加自动填充主键、插入更新字段
sqlSessionFactoryBean.setGlobalConfig(new GlobalConfig()
.setMetaObjectHandler(myMetaObjectHandler)
.setIdentifierGenerator(new DefaultIdentifierGenerator()));
return sqlSessionFactoryBean;
}
}
主启动类:
@EnableFeignClients
@EnableDiscoveryClient
@MapperScan("com.qingsongxyz.mapper")
@SpringBootApplication(exclude = {
DataSourceAutoConfiguration.class
})//配置Seata代理
public class SeataOrderService2001Application {
public static void main(String[] args) {
SpringApplication.run(SeataOrderService2001Application.class, args);
}
}
3)创建库存、账户子模块
创建库存子模块seata-storage-service2002
导入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 需要排除seata-all依赖,另外导入对应版本 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 需要导入seata server安装版本对应的依赖 -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件和订单子模块一致,只需修改端口号和微服务名称
server:
port: 2002
spring:
application:
name: seata-storage-service
...
在resources目录下添加file.conf和registry.conf文件内容和订单子模块一致
MybatisPlus配置类,自动填充字段和订单子模块一致
实体类:
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "t_storage")
public class Storage {
/**
* 库存id
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 产品id
*/
private Long productId;
/**
* 产品总库存
*/
private Integer total;
/**
* 产品出售数量
*/
private Integer used;
/**
* 产品剩余数量
*/
private Integer residue;
}
StorageMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis//DTD Mapper 3.0//EN"
"http://mybatis/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.qingsongxyz.mapper.StorageMapper">
<update id="decreaseStorage">
update t_storage set used = used + #{count}, residue = residue - #{count} where product_id = #{productId}
</update>
</mapper>
StorageMapper:
@Mapper
public interface StorageMapper extends BaseMapper<Storage> {
int decreaseStorage(@Param("productId") Long productId, @Param("count") Integer count);
}
StorageService:
public interface StorageService extends IService<Storage> {
Storage getStorageByProductId(Long productId);
void decreaseStorage(Long productId, Integer count);
}
StorageServiceImpl:
@Service
@Slf4j
public class StorageServiceImpl extends ServiceImpl<StorageMapper, Storage> implements StorageService {
@Autowired
private StorageMapper storageMapper;
@Override
public Storage getStorageByProductId(Long productId) {
return storageMapper.selectOne(new QueryWrapper<Storage>().eq("product_id", productId));
}
@Override
public void decreaseStorage(Long productId, Integer count) {
log.info("扣减产品库存开始...");
//判断产品是否存在
Storage s = getStorageByProductId(productId);
if(ObjectUtil.isEmpty(s))
{
throw new RuntimeException("产品id不存在...");
}
//2.判断产品库存是否充足
if(s.getResidue() < count)
{
throw new RuntimeException("产品库存不足...");
}
//3.扣减产品库存
storageMapper.decreaseStorage(productId, count);
log.info("扣减产品库存结束...");
}
}
StorageController:
@RestController
@Slf4j
public class StorageController {
@Autowired
private StorageService storageService;
@PutMapping("/storage/productId")
public void decreaseStorage(Long productId, Integer count) {
log.info("productId:{}, count:{}", productId, count);
storageService.decreaseStorage(productId, count);
}
}
主启动类注解、自定义数据源和订单子模块一致
创建库存子模块seata-account-service2003
导入依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 需要排除seata-all依赖,另外导入对应版本 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 需要导入seata server安装版本对应的依赖 -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qingsongxyz</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件和订单子模块一致,只需修改端口号和微服务名称
server:
port: 2003
spring:
application:
name: seata-account-service
...
在resources目录下添加file.conf和registry.conf文件内容和订单子模块一致
MybatisPlus配置类,自动填充字段和订单子模块一致
实体类:
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_account")
public class Account {
/**
* 账户id
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户id
*/
private Long userId;
/**
* 账户总金额
*/
private BigDecimal total;
/**
* 已用金额
*/
private BigDecimal used;
/**
* 剩余金额
*/
private BigDecimal residue;
}
AccountMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis//DTD Mapper 3.0//EN"
"http://mybatis/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.qingsongxyz.mapper.AccountMapper">
<update id="decreaseAccount">
update t_account set used = used + #{money}, residue = residue - #{money} where user_id = #{userId}
</update>
</mapper>
AccountMapper:
@Mapper
public interface AccountMapper extends BaseMapper<Account> {
int decreaseAccount(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
AccountService:
public interface AccountService extends IService<Account> {
Account getAccountByUserId(Long userId);
void decreaseAccount(Long userId, BigDecimal money);
}
AccountServiceImpl:
@Service
@Slf4j
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements AccountService {
@Autowired
private AccountMapper accountMapper;
@Override
public Account getAccountByUserId(Long userId) {
return accountMapper.selectOne(new QueryWrapper<Account>().eq("user_id", userId));
}
@Override
public void decreaseAccount(Long userId, BigDecimal money) {
log.info("扣减账户余额开始...");
//模拟超时 全局事务回滚
//try {
// TimeUnit.SECONDS.sleep(20);
//} catch (InterruptedException e) {
// e.printStackTrace();
//}
//1.判断账户是否存在
Account a = getAccountByUserId(userId);
if(ObjectUtil.isEmpty(a))
{
throw new RuntimeException("账户id不存在...");
}
//2.判断余额是否充足
if(a.getResidue().compareTo(money) == -1)
{
throw new RuntimeException("账户余额不足...");
}
//3.扣减账户余额
accountMapper.decreaseAccount(userId, money);
log.info("扣减账户余额结束...");
}
}
AccountController:
@RestController
@Slf4j
public class AccountController {
@Autowired
private AccountService accountService;
@PutMapping("/account/userId")
public void decreaseAccount(Long userId, BigDecimal money){
log.info("userId:{}, money:{}", userId, money);
accountService.decreaseAccount(userId, money);
}
}
主启动类注解、自定义数据源和订单子模块一致
4)业务测试
t_account初始数据:
t_order初始为空:
t_storage初始数据:
启动三个子模块
浏览器访问:http://localhost:2001/swagger-ui/#/%E8%AE%A2%E5%8D%95%E7%B1%BB%E6%B5%8B%E8%AF%95/createUsingPOST
t_account数据:
t_order数据:
t_storage数据:
当账户子模块扣减余额超时,造成支付订单后状态未修改成已支付:
public void decreaseAccount(Long userId, BigDecimal money) {
log.info("扣减账户余额开始...");
//模拟超时 全局事务回滚
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1.判断账户是否存在
Account a = getAccountByUserId(userId);
if(ObjectUtil.isEmpty(a))
{
throw new RuntimeException("账户id不存在...");
}
//2.判断余额是否充足
if(a.getResidue().compareTo(money) == -1)
{
throw new RuntimeException("账户余额不足...");
}
//3.扣减账户余额
accountMapper.decreaseAccount(userId, money);
log.info("扣减账户余额结束...");
}
t_account数据:
用户付款成功
t_order数据:
订单状态未修改成已支付
t_storage数据:
只要在一整套流程中出现超时,异常都可能导致数据错误,需要进行回滚
修改下单操作方法,添加seata分布式事务注解@GlobalTransactional:
@GlobalTransactional(name = "seata-create-order", rollbackFor = Exception.class)
public int createOrder(Order order){
log.info("创建订单...");
//1.创建订单
int insert = orderMapper.insert(order);
log.info("扣减产品库存...");
//2.扣减产品库存
storageService.decreaseStorage(order.getProductId(), order.getCount());
log.info("扣减账户余额...");
//3.扣减账户余额
accountService.decreaseAccount(order.getUserId(), order.getMoney());
log.info("修改订单状态...");
//4.修改订单状态 已支付
order.setStatus(1);
orderMapper.updateById(order);
log.info("下单完成...");
return insert;
}
测试:
t_order数据:
扣减账户余额超时下单数据被回滚
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Inherited
public @interface GlobalTransactional {
/**
* 全局事务超时时间 默认为60s
*/
int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;
/**
* 全局事务名称 唯一即可
*/
String name() default "";
/**
* 指定抛出的异常进行回滚
*/
Class<? extends Throwable>[] rollbackFor() default {};
/**
* 指定类名进行回滚
*/
String[] rollbackForClassName() default {};
/**
* 指定排除的异常不进行回滚
*/
Class<? extends Throwable>[] noRollbackFor() default {};
/**
* 指定排除的类名不进行回滚
*/
String[] noRollbackForClassName() default {};
}
4.原理分析
AT模式:
debug调试:
在扣减账户余额处打断点
seata数据库branch_table分支事务表:
注册三个分支事务xid相同属于同一个分布式事务
seata数据库global_table全局事务表:
seata数据库lock_table全局锁表:
seata_account数据库undo_log表:
{
"@class":"io.seata.rm.datasource.undo.BranchUndoLog",
"xid":"外网IP:8091:2114620759",
"branchId":2114620772,
"sqlUndoLogs":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType":"UPDATE", //扣减账户余额SQL
"tableName":"t_account",
"beforeImage":{ //执行业务SQL之前数据库镜像
"@class":"io.seata.rm.datasource.sql.struct.TableRecords",
"tableName":"t_account",
"rows":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Row",
"fields":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"id",
"keyType":"PrimaryKey",
"type":-5,
"value":[
"java.lang.Long",
1
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"used",
"keyType":"NULL",
"type":3,
"value":[
"java.math.BigDecimal",
10 //原先账户已用金额
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"residue",
"keyType":"NULL",
"type":3,
"value":[
"java.math.BigDecimal",
90 //原先账户剩余金额
]
}
]
]
}
]
]
},
"afterImage":{ //执行业务SQL之后数据库镜像
"@class":"io.seata.rm.datasource.sql.struct.TableRecords",
"tableName":"t_account",
"rows":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Row",
"fields":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"id",
"keyType":"PrimaryKey",
"type":-5,
"value":[
"java.lang.Long",
1
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"used",
"keyType":"NULL",
"type":3,
"value":[
"java.math.BigDecimal",
20 //扣减余额后账户已用金额
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"residue",
"keyType":"NULL",
"type":3,
"value":[
"java.math.BigDecimal",
80 //扣减余额后账户剩余金额
]
}
]
]
}
]
]
}
}
]
]
}
seata_order数据库undo_log表:
{
"@class":"io.seata.rm.datasource.undo.BranchUndoLog",
"xid":"外网IP:8091:2114620759",
"branchId":2114620762,
"sqlUndoLogs":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType":"INSERT", //创建订单SQL
"tableName":"t_order",
"beforeImage":{
"@class":"io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
"tableName":"t_order",
"rows":[
"java.util.ArrayList",
[
]
]
},
"afterImage":{ //执行业务SQL之前后数据库镜像
"@class":"io.seata.rm.datasource.sql.struct.TableRecords",
"tableName":"t_order",
"rows":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Row",
"fields":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"id",
"keyType":"PrimaryKey",
"type":-5,
"value":[
"java.lang.Long",
33 //订单id
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"user_id",
"keyType":"NULL",
"type":-5,
"value":[
"java.lang.Long",
1 //用户id
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"product_id",
"keyType":"NULL",
"type":-5,
"value":[
"java.lang.Long",
1 //产品id
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"count",
"keyType":"NULL",
"type":4,
"value":1 //产品数量
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"money",
"keyType":"NULL",
"type":3,
"value":[
"java.math.BigDecimal",
10 //订单总金额
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"status",
"keyType":"NULL",
"type":-6,
"value":0 //订单状态
}
]
]
}
]
]
}
}
]
]
}
seata_storage数据库undo_log表:
{
"@class":"io.seata.rm.datasource.undo.BranchUndoLog",
"xid":"外网IP:8091:2114622508",
"branchId":2114622515,
"sqlUndoLogs":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType":"UPDATE", //扣减库存SQL
"tableName":"t_storage",
"beforeImage":{ //执行业务SQL之前数据库镜像
"@class":"io.seata.rm.datasource.sql.struct.TableRecords",
"tableName":"t_storage",
"rows":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Row",
"fields":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"id",
"keyType":"PrimaryKey",
"type":-5,
"value":[
"java.lang.Long",
1
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"used",
"keyType":"NULL",
"type":4,
"value":2 //原先产品出售数量
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"residue",
"keyType":"NULL",
"type":4,
"value":98 //原先产品库存
}
]
]
}
]
]
},
"afterImage":{ //执行业务SQL之前数据库镜像
"@class":"io.seata.rm.datasource.sql.struct.TableRecords",
"tableName":"t_storage",
"rows":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Row",
"fields":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"id",
"keyType":"PrimaryKey",
"type":-5,
"value":[
"java.lang.Long",
1
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"used",
"keyType":"NULL",
"type":4,
"value":3 //扣减库存后产品出售数量
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"residue",
"keyType":"NULL",
"type":4,
"value":97 //扣减库存后产品库存
}
]
]
}
]
]
}
}
]
]
}
断点放行后会异步批量删除对应的系统表和undo_log表信息
总结:
两阶段提交协议(2pc)
一阶段:
- 解析 SQL 语义,找到业务 SQL 更新的数据,在更新前,将数据保存为一份镜像
before image
- 执行 SQL 更新数据,将更新后的数据保存为一份镜像
after image
- 将前后镜像数据和SQL一起组成一条回滚日志,插入到
undo_log
表中 - 向TC注册分支事务,申请全局锁,提交本地事务(SQL、undo_log、行锁),将结果上报TC
二阶段提交:
- 将TC的分支提交请求放入一个异步队列,返回提交成功结果给TC
- 队列中的分支请求将异步批量删除相应的undo_log记录
二阶段回滚:
- 先将当前数据库数据和
after image
进行比较,如果完全一致则说明没有脏写,可以回滚,否则需要转人工处理 - 通过
before image
生成SQL语句进行数据还原 - 删除中间数据镜像、行锁
版权声明:本文标题:Spring Cloud 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/xitong/1725735987a1039727.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论