增加部分示例代码

This commit is contained in:
wulin 2023-08-03 11:56:48 +08:00
parent 02f59c18ae
commit 26acc62ae0
7 changed files with 77 additions and 24 deletions

View File

@ -5,6 +5,15 @@
##### 通过静态方法创建 Flux
Reactor 中静态创建 Flux 的方法常见的包括 just()、range()、interval() 以及各种以 from- 为前缀的方法组等。因为 Flux 可以代表 0 个数据,所以也有一些专门用于创建空序列的工具方法。
empty()、error() 和 never()这三个方法类创建一些特殊的数据序列。其中,如果你希望创建一个只包含结束消息的空序列,那么可以使用 empty() 方法。error() 方法可以创建一个只包含错误消息的序列。如果你不希望所创建的序列不发出任何类似的消息通知,也可以使用 never() 方法实现这一目标。当然,这几个方法都比较少用,通常只用于调试和测试。
* just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
* fromArray()fromIterable()和 fromStream()可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
* empty():创建一个不包含任何元素,只发布结束消息的序列。
* error(Throwable error):创建一个只包含错误消息的序列。
* never():创建一个不包含任何消息通知的序列。
* range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
* interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
* intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
##### 通过动态方法创建 Flux
动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 generate() 方法和 create() 方法。
generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件
@ -23,6 +32,11 @@ FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还
可认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。
justOrEmpty() 方法会先判断所传入的对象中是否包含值只有在传入对象不为空时Mono 序列才生成对应的元素
另一方面,如果要想动态创建 Mono我们同样也可以通过 create() 方法并使用 MonoSink 组件
* just()empty()error()和 never()同Flux
* fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
* delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
* ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
* justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时Mono 序列才产生对应的元素。
#### 订阅响应式流
可通过 subscribe() 添加相应的订阅逻辑。调用 subscribe() 方法时可指定需要处理的消息通知类型。

View File

@ -1,13 +1,14 @@
package com.qiuguo.iot.admin.http.api.controller;
import com.qiuguo.iot.admin.http.api.req.DemoReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.stream.IntStream;
@RestController
@ -52,4 +53,28 @@ public class DemoController {
return flux;
}
@PostMapping("/post")//请求数据示例{"aaa":"bbb"}
public Mono<String> postDemo(@RequestBody Mono<Map<String, Object>> contextMono){
Mono<String> mono =contextMono.map(item->{
for (Object o:item.values()
) {
log.info("接受到的信息{}", o);
}
return "1234";//返回结果
});
return mono;
}
@PostMapping("/post/demo")//请求数据示例{"name":"bbb","id":123}
public Mono<DemoReq> postTest(@RequestBody Mono<DemoReq> contextMono){
Mono<DemoReq> mono =contextMono.map(item->{
log.info("接收到的对象:{}", item);
item.setId(456l);
item.setName("gaibian");
return item;//返回结果
});
return mono;
}
}

View File

@ -0,0 +1,9 @@
package com.qiuguo.iot.admin.http.api.req;
import lombok.Data;
@Data
public class DemoReq {
private String name;
private Long id;
}

View File

@ -24,9 +24,14 @@ public class DemoDataService extends GenericReactiveCrudService<DemoDataEntity,
private final ReactiveUserService userService;
private final ReactiveAuthenticationManager authenticationManager;
public Mono<DemoDataEntity> findDemoDataDetail(Long id){
ReactiveValueOperations<String, Objects> operations = reactiveRedisTemplate.opsForValue();
Mono<Objects> obj = operations.get("key");
return Mono.just(new DemoDataEntity());/*Mono.zip(userService.findById(id.toString()),
/*
ReactiveValueOperations String value 的操作视图
操作视图还有ReactiveHashOperationsReactiveListOperationsReactiveSetOperations ReactiveZSetOperations
*/
ReactiveValueOperations<String, DemoDataEntity> operations = reactiveRedisTemplate.opsForValue();
Mono<DemoDataEntity> obj = operations.get("key");
return obj;
/*return Mono.just(new DemoDataEntity());/*Mono.zip(userService.findById(id.toString()),
this.findById(id.toString()).defaultIfEmpty(new DemoDataEntity()),
authenticationManager.getByUserId(id.toString())
.map(Authentication::getDimensions).defaultIfEmpty(Collections.emptyList()).map(

View File

@ -1,5 +1,5 @@
server:
port: 8090
port: 8091
spring:
profiles:
# 环境配置

View File

@ -32,15 +32,15 @@ public class BoxWebSocketHandler implements WebSocketHandler {
/**
* 所有websocket连接管理容器
**/
CountDownLatch countDownLatch = new CountDownLatch(5);
//CountDownLatch countDownLatch = new CountDownLatch(5);
FluxBatchRunner<MsgProtocol.Msg> batchRunner = new FluxBatchRunner<>(20, data -> {
log.info("begin:: thread={} 大小:{}", Thread.currentThread().getName(), data.size());
for (MsgProtocol.Msg msg:data
) {
for (UserInfo info:group.values()
) {
//info.sendData(info.getSession().textMessage(""));
info.sendData(msg);
info.sendData(info.getSession().textMessage(""));
//info.sendData(msg);
}
}
//data.clear();
@ -49,7 +49,7 @@ public class BoxWebSocketHandler implements WebSocketHandler {
//countDownLatch.countDown();
//System.out.printf("begin:: thread=%s; data=%s", Thread.currentThread().getName());
log.info("end:: thread={} 大小{}", Thread.currentThread().getName(), data.size());
//log.info("end:: thread={} 大小{}", Thread.currentThread().getName(), data.size());
//System.out.printf("end:: thread=%s; size=%s%n", Thread.currentThread().getName(), data.size());
});
@Override
@ -86,22 +86,12 @@ public class BoxWebSocketHandler implements WebSocketHandler {
if(reqMsg.getCmdId().getNumber() == MsgProtocol.Msg.CmdId.SEND_STATUS_REQ_VALUE){
MsgProtocol.Msg reqMsg1 = reqMsg.toBuilder().setCmdId(MsgProtocol.Msg.CmdId.SYNC_STATUS_RSP).build();
for (UserInfo info:group.values()
/*for (UserInfo info:group.values()
) {
//info.sendData(info.getSession().textMessage(""));
info.sendData(reqMsg1);
}
//batchRunner.add(reqMsg1);
//IntStream.range(0, 100).forEach(batchRunner::add);
//group.values().forEach(batchRunner::add);
//batchRunner.doFinal();
/*try {
countDownLatch.await();
}catch (Exception e){
}*/
log.info("进来");
//group.forEach(batchRunner::add);
batchRunner.add(reqMsg1);
return Mono.empty();//session.textMessage("");
}

View File

@ -1,5 +1,7 @@
package com.qiuguo.iot.box.websocket.runner;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -28,7 +30,11 @@ public class FluxBatchRunner<T> {
Objects.requireNonNull(consumer);
this.batchSize = batchSize;
this.consumer = consumer;
scheduler = Schedulers.parallel();
//scheduler = Schedulers.parallel();//创建CPU内核数量一样多线程池
//Schedulers.elastic() //无限制的弹性线程池,可以一直创建线程
//Schedulers.boundedElastic() //有界的弹性线程池,它会回收闲置的线程默认是60s;它对创建的线程数做了限制默认值为CPU内核数x 10达到上限后最多可提交10万个任务
//Schedulers.fromExecutorService() 根据我们自定义线程池进行引用
scheduler = Schedulers.single();//可重用单个线程
records = new ArrayList<>(batchSize);
}
@ -70,7 +76,11 @@ public class FluxBatchRunner<T> {
}
private void subscribeToMono(Collection<T> records) {
//subscribOn 指定指定调度器它会让整个事件处理流程进入切换线程处理;需要注意的是不管怎么切换调度器他都是单线程执行
Mono.just(records).subscribeOn(scheduler).subscribe(data -> consumer.accept(data));
//需要多线程执行必须使用parallel+runOn 如下
//Flux.just(records).parallel().runOn(scheduler).subscribe(data ->consumer.accept(data));
}
public static void main(String[] args) {