qiuguo-iot/README.md
2023-08-03 11:56:48 +08:00

56 lines
5.8 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 响应式编程
### webflux
#### Flux实现发布者 Publisher并返回 N 个元素。
##### 通过静态方法创建 Flux
Reactor 中静态创建 Flux 的方法常见的包括 just()、range()、interval() 以及各种以 from- 为前缀的方法组等。因为 Flux 可以代表 0 个数据,所以也有一些专门用于创建空序列的工具方法。
empty()、error() 和 never()这三个方法类创建一些特殊的数据序列。其中,如果你希望创建一个只包含结束消息的空序列,那么可以使用 empty() 方法。error() 方法可以创建一个只包含错误消息的序列。如果你不希望所创建的序列不发出任何类似的消息通知,也可以使用 never() 方法实现这一目标。当然,这几个方法都比较少用,通常只用于调试和测试。
* just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
* fromArray()fromIterable()和 fromStream()可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
* empty():创建一个不包含任何元素,只发布结束消息的序列。
* error(Throwable error):创建一个只包含错误消息的序列。
* never():创建一个不包含任何消息通知的序列。
* range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
* interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
* intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
##### 通过动态方法创建 Flux
动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 generate() 方法和 create() 方法。
generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件
SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。
## next() 方法只能最多被调用一次
create() 方法与 generate() 方法比较类似,但它使用的是一个 FluxSink 组件
FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素
* Flux.create(sink -> {
* for (int i = 0; i < 5; i++) {
* sink.next("javaedge" + i);
* }
* sink.complete();
* }).subscribe(System.out::println);
运行该程序我们会在系统控制台上得到从javaedge0javaedge4 5 个数据
#### Mono实现发布者 Publisher并返回 0 或 1 个元素。
可认为它是 Flux 的一种特例所以很多创建 Flux 的方法同样适用针对静态创建 Mono 的场景前面给出的 just()、empty()、error() never() 等方法同样适用除了这些方法之外比较常用的还有 justOrEmpty() 等方法
justOrEmpty() 方法会先判断所传入的对象中是否包含值只有在传入对象不为空时Mono 序列才生成对应的元素
另一方面如果要想动态创建 Mono我们同样也可以通过 create() 方法并使用 MonoSink 组件
* just()empty()error() never()同Flux
* fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable() fromSupplier()分别从 CallableCompletionStageCompletableFutureRunnable Supplier 中创建 Mono
* delay(Duration duration) delayMillis(long duration)创建一个 Mono 序列在指定的延迟时间之后产生数字 0 作为唯一值
* ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
* justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时Mono 序列才产生对应的元素。
#### 订阅响应式流
可通过 subscribe() 添加相应的订阅逻辑。调用 subscribe() 方法时可指定需要处理的消息通知类型。
Flux 和 Mono 提供了一批非常有用的 subscribe() 方法重载方法,大大简化订阅的开发例程。这些重载方法包括:
* subscribe();//订阅流的最简单方法,忽略所有消息通知
* subscribe(Consumer<T> dataConsumer);//对每个来自 onNext 通知的值调用 dataConsumer但不处理 onError 和 onComplete 通知
* subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer);//在前一个重载方法的基础上添加对 onError 通知的处理
* subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer, Runnable completeConsumer);//在前一个重载方法的基础上添加对 onComplete 通知的处理
* subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer, Runnable completeConsumer, Consumer<Subscription> subscriptionConsumer);//这种重载方法允许通过请求足够数量的数据来控制订阅过程
* subscribe(Subscriber<T> subscriber);//订阅序列的最通用方式,可以为我们的 Subscriber 实现提供所需的任意行为
* [Official Apache Maven documentation](https://maven.apache.org/guides/index.html)
* [Spring Boot Maven Plugin Reference Guide](https://docs.spring.io/spring-boot/docs/2.7.15-SNAPSHOT/maven-plugin/reference/html/)
* [Create an OCI image](https://docs.spring.io/spring-boot/docs/2.7.15-SNAPSHOT/maven-plugin/reference/html/#build-image)
* [Cloud Bootstrap](https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/)