56 lines
5.8 KiB
Markdown
56 lines
5.8 KiB
Markdown
# 响应式编程
|
||
|
||
### webflux
|
||
#### Flux:实现发布者 Publisher,并返回 N 个元素。
|
||
##### 通过静态方法创建 Flux
|
||
Reactor 中静态创建 Flux 的方法常见的包括 just()、range()、interval() 以及各种以 from- 为前缀的方法组等。因为 Flux 可以代表 0 个数据,所以也有一些专门用于创建空序列的工具方法。
|
||
empty()、error() 和 never()这三个方法类创建一些特殊的数据序列。其中,如果你希望创建一个只包含结束消息的空序列,那么可以使用 empty() 方法。error() 方法可以创建一个只包含错误消息的序列。如果你不希望所创建的序列不发出任何类似的消息通知,也可以使用 never() 方法实现这一目标。当然,这几个方法都比较少用,通常只用于调试和测试。
|
||
* just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
|
||
* fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
|
||
* empty():创建一个不包含任何元素,只发布结束消息的序列。
|
||
* error(Throwable error):创建一个只包含错误消息的序列。
|
||
* never():创建一个不包含任何消息通知的序列。
|
||
* range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
|
||
* interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
|
||
* intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
|
||
|
||
##### 通过动态方法创建 Flux
|
||
动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 generate() 方法和 create() 方法。
|
||
generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件
|
||
SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。
|
||
## next() 方法只能最多被调用一次
|
||
create() 方法与 generate() 方法比较类似,但它使用的是一个 FluxSink 组件
|
||
FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素
|
||
* Flux.create(sink -> {
|
||
* for (int i = 0; i < 5; i++) {
|
||
* sink.next("javaedge" + i);
|
||
* }
|
||
* sink.complete();
|
||
* }).subscribe(System.out::println);
|
||
运行该程序,我们会在系统控制台上得到从“javaedge0”到“javaedge4”的 5 个数据
|
||
#### Mono:实现发布者 Publisher,并返回 0 或 1 个元素。
|
||
可认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。
|
||
justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素
|
||
另一方面,如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件
|
||
* just(),empty(),error()和 never()同Flux
|
||
* fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
|
||
* delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
|
||
* ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
|
||
* justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
|
||
#### 订阅响应式流
|
||
可通过 subscribe() 添加相应的订阅逻辑。调用 subscribe() 方法时可指定需要处理的消息通知类型。
|
||
|
||
Flux 和 Mono 提供了一批非常有用的 subscribe() 方法重载方法,大大简化订阅的开发例程。这些重载方法包括:
|
||
* subscribe();//订阅流的最简单方法,忽略所有消息通知
|
||
* subscribe(Consumer<T> dataConsumer);//对每个来自 onNext 通知的值调用 dataConsumer,但不处理 onError 和 onComplete 通知
|
||
* subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer);//在前一个重载方法的基础上添加对 onError 通知的处理
|
||
* subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer, Runnable completeConsumer);//在前一个重载方法的基础上添加对 onComplete 通知的处理
|
||
* subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer, Runnable completeConsumer, Consumer<Subscription> subscriptionConsumer);//这种重载方法允许通过请求足够数量的数据来控制订阅过程
|
||
* subscribe(Subscriber<T> subscriber);//订阅序列的最通用方式,可以为我们的 Subscriber 实现提供所需的任意行为
|
||
|
||
* [Official Apache Maven documentation](https://maven.apache.org/guides/index.html)
|
||
* [Spring Boot Maven Plugin Reference Guide](https://docs.spring.io/spring-boot/docs/2.7.15-SNAPSHOT/maven-plugin/reference/html/)
|
||
* [Create an OCI image](https://docs.spring.io/spring-boot/docs/2.7.15-SNAPSHOT/maven-plugin/reference/html/#build-image)
|
||
* [Cloud Bootstrap](https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/)
|
||
|