Mono/Flux/Backpressure)"/>
springboot2 webfux 响应式编程(Mono/Flux/Backpressure)
What’s Spring WebFlux
让我们来看下官方英文怎么介绍的,有兴趣的也可以看一下spring官方的web-react。
The original web framework included in the Spring Framework, Spring Web MVC, was purpose-built for the Servlet API and Servlet containers. The reactive-stack web framework, Spring WebFlux, was added later in version 5.0. It is fully
non-blocking
, supports Reactive Streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.
Both web frameworks mirror the names of their source modules (spring-webmvc and spring-webflux) and co-exist side by side in the Spring Framework. Each module is optional. Applications can use one or the other module or, in some cases, both — for example, Spring MVC controllers with the reactive WebClient.
Spring MVC
是Spring Framework中包含的原始Web框架
,也是专为Servlet API和Servlet容器构建的Web框架。
响应式堆栈Web框架
是Spring WebFlux是在Spring5版才有的, 是一个完全是非阻塞
编程,支持Reactive Streams响应流,并且可以很好运行在Netty,Undertow和Servlet 3.1+容器等服务器上(言外之意,开发人员可以使用 WebFlux 创建高性能
的 Web 应用和客户端)。
这两个Web框架都反映了其源模块的名称(spring-webmvc和spring-webflux),并在Spring Framework
中并存
。 每个模块都是可选的。 应用程序可以使用一个或另一个模块,或者在某些情况下,两者都混合使用,例如在MVC的Controller中带有反应式WebClient。
接下来,我们一起搞清楚几个问题:
- 关于Reactive Streams、Srping Reactor 和 Spring WebFlux之间的关系
Reactive Streams 是
规范
,Srping Reactor实现
了 Reactive Streams。Spring WebFlux 是以 Reactor 为基础
,实现 Web 领域的反应式编程框架
。
- 反应式编程思想是什么?Backpressure背压又是什么?
反应式编程框架主要采用了观察者模式,而 Spring Reactor 的核心则是对观察者模式的一种延伸。观察者模式的架构中
被观察者(Observable)
和观察者(Subscriber)
处在不同的线程环境中时,由于者各自的工作量不一样,导致它们产生事件和处理事件的速度不一样,这时就出现了两种情况:
- Controller被观察者处理事件慢一些,Client/Browser观察者处理在等啊等。那么Client/Browser观察者就会等着被观察者Controller响应事件,例如排队卖烧饼,被观察者在卖,观察者在买,没问题吧,典型的。
- Controller被观察者处理事件的速度很快,而Client/Browser观察者处理很慢(客户端忙,服务器想响应给你,你都无法接收)。那就有点问题了,如果不作处理的话,事件会堆积起来,最终挤爆你的内存,导致程序崩溃,生产一堆烧饼等人买?
- 这时就出现了Backpressure(
背压
-----指在异步场景中,被观察者Controller发送事件速度远快于Client/Browser观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
)
- 既然Webflux不是基于Servlet,那么Spring Security等基于Servlet的组件可以用吗
如果打算将 Web Flux 运行在 Servlet 容器之上,必须是支持
Servlet 3.1 以上
,因为才有非阻塞NIO
的支持,虽然 Web Flux 的 API 在某些地方,确实提供了NIO的选项,若单纯只是试着将基于 Web MVC 的应用程序,改写为套用 Web Flux,并不是十分出色的方案。
=。=
例如,Spring Security 显然就不能用
了,毕竟是 Spring 基于 Servlet 的安全方案,开发者必须想办法改造为Spring Security Reactive
;而且,在储存方案上,也不是直接采用 Spring Data,而是建议 Spring Data Reactive 等,因为现在整个生态都不完善,所以暂时不会有很多东西出来,但是肯定会慢慢完善。
- 如何更好的理解Mono和Flux
- Mono / Flux 都是
Publisher(发布者)
,也就是被观察者(Observable)
。- Mono / Flux 都实现了
org.reactivestreams.Publisher<T>
接口,在 Web Flux,你的方法只需返回 Mono / Flux 即可。你的代码基本也只和 Mono 或 Flux 打交道。而 Web Flux 则会实现 Subscriber ,调用onNext()时将业务开发人员编写的 Mono/Flux 转换为 HTTP Response 返回给客户端。
Maven依赖
<!-- .springframework.boot/spring-boot-starter-webflux -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Entity
简单构造一个请求和响应的业务实体
@Data
public class User {private String id;private String name;private String mobile;private Integer age;
}
Controller
WebFlux复杂?一个controller搞定hello world。
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;import javax.management.relation.RelationNotFoundException;import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;import com.softdev.system.demo.entity.User;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;@RestController
@RequestMapping("/user")
public class UserController {private final Map<String, User> data = new ConcurrentHashMap<>();@ResponseStatus(value = HttpStatus.NOT_FOUND, reason = "Resource not found")@ExceptionHandler(RelationNotFoundException.class)public void notFound() {}@GetMapping("")public Flux<User> list() {try {Thread.sleep(5000);} catch (InterruptedException e) {}return Flux.fromIterable(this.data.values());}@GetMapping("/{id}")public Mono<User>getById(@PathVariable("id") final String id) {return Mono.justOrEmpty(this.data.get(id)).switchIfEmpty(Mono.error(new RelationNotFoundException("ResourceNotFoundException")));}@PostMapping("")public Mono<User> create(@RequestBody final User user) {this.data.put(user.getId(), user);return Mono.just(user);}@PutMapping("/{id}")public Mono<User> update(@PathVariable("id") final String id, @RequestBody final User user) {Objects.requireNonNull(user);user.setId(id);this.data.put(user.getId(), user);return Mono.just(user);}@DeleteMapping("/{id}")public Mono<User> delete(@PathVariable("id") final String id) {return Mono.justOrEmpty(this.data.remove(id));}
}
Postman并发压测
- 创建用户
curl -X POST http://localhost:9999/webflux/user
-d ‘{“id”:“10001”,“name”:“zhengkai.blog.csdn”,“mobile”:“18522222222”,“age”:18}’
2.创建Collection和Request
curl -X GET http://localhost:9999/webflux/user- 运行Runner
运行单个是单线程测试,运行多个可以进行多线程测试
单线程测试:5ms阻塞的响应
同时运行两个,双线程测试(=。=postman为何不直接提供多线程测试):
无阻塞模式。都是5ms响应两条线程
Reactive化改造?
目前官方reactive
化的资料,隐约feel到一个场景,就是用weblux+redis做热点数据的读取,应该超级无敌爽,比servlet+cache快。当然现在只是有很多的资料,用的人多了,晚点应该就有stater了:
/
How to control backpressure?
如果需要对backpressure背压进行控制,我们可以使用Reactor Operators进行控制,也就是 limitRate()
方法,very convince。
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {return tweetService.process(tweetsFlux.limitRate(10)).then();
}
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {return tweetService.retreiveAll().limitRate(10);
}
更多推荐
springboot2 webfux 响应式编程(Mono/Flux/Backpressure)
发布评论