From 26acc62ae01a2919814ff84520bef88456116abd Mon Sep 17 00:00:00 2001 From: wulin Date: Thu, 3 Aug 2023 11:56:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=83=A8=E5=88=86=E7=A4=BA?= =?UTF-8?q?=E4=BE=8B=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 14 +++++++++ .../http/api/controller/DemoController.java | 31 +++++++++++++++++-- .../iot/admin/http/api/req/DemoReq.java | 9 ++++++ .../http/api/service/DemoDataService.java | 11 +++++-- .../src/main/resources/bootstrap.yml | 2 +- .../handler/BoxWebSocketHandler.java | 22 ++++--------- .../box/websocket/runner/FluxBatchRunner.java | 12 ++++++- 7 files changed, 77 insertions(+), 24 deletions(-) create mode 100644 iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/req/DemoReq.java diff --git a/README.md b/README.md index 1417388..ef14f6c 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,15 @@ ##### 通过静态方法创建 Flux Reactor 中静态创建 Flux 的方法常见的包括 just()、range()、interval() 以及各种以 from- 为前缀的方法组等。因为 Flux 可以代表 0 个数据,所以也有一些专门用于创建空序列的工具方法。 empty()、error() 和 never()这三个方法类创建一些特殊的数据序列。其中,如果你希望创建一个只包含结束消息的空序列,那么可以使用 empty() 方法。error() 方法可以创建一个只包含错误消息的序列。如果你不希望所创建的序列不发出任何类似的消息通知,也可以使用 never() 方法实现这一目标。当然,这几个方法都比较少用,通常只用于调试和测试。 +* just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。 +* fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。 +* empty():创建一个不包含任何元素,只发布结束消息的序列。 +* error(Throwable error):创建一个只包含错误消息的序列。 +* never():创建一个不包含任何消息通知的序列。 +* range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。 +* interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。 +* intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。 + ##### 通过动态方法创建 Flux 动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 generate() 方法和 create() 方法。 generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件 @@ -23,6 +32,11 @@ FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还 可认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。 justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素 另一方面,如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件 +* just(),empty(),error()和 never()同Flux +* fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。 +* delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。 +* ignoreElements(Publisher source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。 +* justOrEmpty(Optional data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。 #### 订阅响应式流 可通过 subscribe() 添加相应的订阅逻辑。调用 subscribe() 方法时可指定需要处理的消息通知类型。 diff --git a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/controller/DemoController.java b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/controller/DemoController.java index 34a8004..66d0b3f 100644 --- a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/controller/DemoController.java +++ b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/controller/DemoController.java @@ -1,13 +1,14 @@ package com.qiuguo.iot.admin.http.api.controller; +import com.qiuguo.iot.admin.http.api.req.DemoReq; import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; +import java.util.Map; import java.util.stream.IntStream; @RestController @@ -52,4 +53,28 @@ public class DemoController { return flux; } + @PostMapping("/post")//请求数据示例:{"aaa":"bbb"} + public Mono postDemo(@RequestBody Mono> contextMono){ + Mono mono =contextMono.map(item->{ + for (Object o:item.values() + ) { + log.info("接受到的信息{}", o); + } + return "1234";//返回结果 + }); + + return mono; + } + @PostMapping("/post/demo")//请求数据示例:{"name":"bbb","id":123} + public Mono postTest(@RequestBody Mono contextMono){ + Mono mono =contextMono.map(item->{ + log.info("接收到的对象:{}", item); + item.setId(456l); + item.setName("gaibian"); + return item;//返回结果 + }); + + return mono; + } + } diff --git a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/req/DemoReq.java b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/req/DemoReq.java new file mode 100644 index 0000000..7f767dc --- /dev/null +++ b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/req/DemoReq.java @@ -0,0 +1,9 @@ +package com.qiuguo.iot.admin.http.api.req; + +import lombok.Data; + +@Data +public class DemoReq { + private String name; + private Long id; +} diff --git a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/service/DemoDataService.java b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/service/DemoDataService.java index 934829d..e8fd303 100644 --- a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/service/DemoDataService.java +++ b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/service/DemoDataService.java @@ -24,9 +24,14 @@ public class DemoDataService extends GenericReactiveCrudService findDemoDataDetail(Long id){ - ReactiveValueOperations operations = reactiveRedisTemplate.opsForValue(); - Mono obj = operations.get("key"); - return Mono.just(new DemoDataEntity());/*Mono.zip(userService.findById(id.toString()), + /* + ReactiveValueOperations 是 String (或 value) 的操作视图, + 操作视图还有ReactiveHashOperations、ReactiveListOperations、ReactiveSetOperations 和 ReactiveZSetOperations + */ + ReactiveValueOperations operations = reactiveRedisTemplate.opsForValue(); + Mono obj = operations.get("key"); + return obj; + /*return Mono.just(new DemoDataEntity());/*Mono.zip(userService.findById(id.toString()), this.findById(id.toString()).defaultIfEmpty(new DemoDataEntity()), authenticationManager.getByUserId(id.toString()) .map(Authentication::getDimensions).defaultIfEmpty(Collections.emptyList()).map( diff --git a/iot-modules/iot-admin-http-api/src/main/resources/bootstrap.yml b/iot-modules/iot-admin-http-api/src/main/resources/bootstrap.yml index a828132..fd251a6 100644 --- a/iot-modules/iot-admin-http-api/src/main/resources/bootstrap.yml +++ b/iot-modules/iot-admin-http-api/src/main/resources/bootstrap.yml @@ -1,5 +1,5 @@ server: - port: 8090 + port: 8091 spring: profiles: # 环境配置 diff --git a/iot-modules/iot-box-websocket/src/main/java/com/qiuguo/iot/box/websocket/handler/BoxWebSocketHandler.java b/iot-modules/iot-box-websocket/src/main/java/com/qiuguo/iot/box/websocket/handler/BoxWebSocketHandler.java index 5d7545d..e6becb5 100644 --- a/iot-modules/iot-box-websocket/src/main/java/com/qiuguo/iot/box/websocket/handler/BoxWebSocketHandler.java +++ b/iot-modules/iot-box-websocket/src/main/java/com/qiuguo/iot/box/websocket/handler/BoxWebSocketHandler.java @@ -32,15 +32,15 @@ public class BoxWebSocketHandler implements WebSocketHandler { /** * 所有websocket连接管理容器 **/ - CountDownLatch countDownLatch = new CountDownLatch(5); + //CountDownLatch countDownLatch = new CountDownLatch(5); FluxBatchRunner batchRunner = new FluxBatchRunner<>(20, data -> { log.info("begin:: thread={} 大小:{}", Thread.currentThread().getName(), data.size()); for (MsgProtocol.Msg msg:data ) { for (UserInfo info:group.values() ) { - //info.sendData(info.getSession().textMessage("")); - info.sendData(msg); + info.sendData(info.getSession().textMessage("")); + //info.sendData(msg); } } //data.clear(); @@ -49,7 +49,7 @@ public class BoxWebSocketHandler implements WebSocketHandler { //countDownLatch.countDown(); //System.out.printf("begin:: thread=%s; data=%s", Thread.currentThread().getName()); - log.info("end:: thread={} 大小{}", Thread.currentThread().getName(), data.size()); + //log.info("end:: thread={} 大小{}", Thread.currentThread().getName(), data.size()); //System.out.printf("end:: thread=%s; size=%s%n", Thread.currentThread().getName(), data.size()); }); @Override @@ -86,22 +86,12 @@ public class BoxWebSocketHandler implements WebSocketHandler { if(reqMsg.getCmdId().getNumber() == MsgProtocol.Msg.CmdId.SEND_STATUS_REQ_VALUE){ MsgProtocol.Msg reqMsg1 = reqMsg.toBuilder().setCmdId(MsgProtocol.Msg.CmdId.SYNC_STATUS_RSP).build(); - for (UserInfo info:group.values() + /*for (UserInfo info:group.values() ) { //info.sendData(info.getSession().textMessage("")); info.sendData(reqMsg1); - } - //batchRunner.add(reqMsg1); - //IntStream.range(0, 100).forEach(batchRunner::add); - //group.values().forEach(batchRunner::add); - //batchRunner.doFinal(); - /*try { - countDownLatch.await(); - }catch (Exception e){ - }*/ - log.info("进来"); - //group.forEach(batchRunner::add); + batchRunner.add(reqMsg1); return Mono.empty();//session.textMessage(""); } diff --git a/iot-modules/iot-box-websocket/src/main/java/com/qiuguo/iot/box/websocket/runner/FluxBatchRunner.java b/iot-modules/iot-box-websocket/src/main/java/com/qiuguo/iot/box/websocket/runner/FluxBatchRunner.java index 37f7a21..0c265e3 100644 --- a/iot-modules/iot-box-websocket/src/main/java/com/qiuguo/iot/box/websocket/runner/FluxBatchRunner.java +++ b/iot-modules/iot-box-websocket/src/main/java/com/qiuguo/iot/box/websocket/runner/FluxBatchRunner.java @@ -1,5 +1,7 @@ package com.qiuguo.iot.box.websocket.runner; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -28,7 +30,11 @@ public class FluxBatchRunner { Objects.requireNonNull(consumer); this.batchSize = batchSize; this.consumer = consumer; - scheduler = Schedulers.parallel(); + //scheduler = Schedulers.parallel();//创建CPU内核数量一样多线程池; + //Schedulers.elastic() //无限制的弹性线程池,可以一直创建线程 + //Schedulers.boundedElastic() //有界的弹性线程池,它会回收闲置的线程,默认是60s;它对创建的线程数做了限制,默认值为CPU内核数x 10,达到上限后,最多可提交10万个任务; + //Schedulers.fromExecutorService() 根据我们自定义线程池进行引用; + scheduler = Schedulers.single();//可重用单个线程 records = new ArrayList<>(batchSize); } @@ -70,7 +76,11 @@ public class FluxBatchRunner { } private void subscribeToMono(Collection records) { + //subscribOn : 指定指定调度器,它会让整个事件处理流程进入切换线程处理;需要注意的是:不管怎么切换调度器,他都是单线程执行 Mono.just(records).subscribeOn(scheduler).subscribe(data -> consumer.accept(data)); + + //需要多线程执行必须使用parallel+runOn 如下 + //Flux.just(records).parallel().runOn(scheduler).subscribe(data ->consumer.accept(data)); } public static void main(String[] args) {