From 01ea8f6f96948d8a43d08fe9051aecf8fd6b172b Mon Sep 17 00:00:00 2001 From: "W.Y" <343887809@qq.com> Date: Wed, 11 Oct 2023 13:35:43 +0800 Subject: [PATCH] =?UTF-8?q?[optimization]=20=E9=80=9A=E4=B9=89=E5=8D=83?= =?UTF-8?q?=E9=97=AE=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=9603?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/data/dto/queue/QueuePackagingDTO.java | 21 +++ .../request/qwen/TongYiCommunicationRest.java | 2 +- .../qg/algorithm}/QGResponse.java | 2 +- ...plyRequest.java => QWenReplyResponse.java} | 2 +- .../qiuguo/iot/third/service/QWenService.java | 140 +++++++++++------- 5 files changed, 114 insertions(+), 53 deletions(-) create mode 100644 iot-common/iot-data/src/main/java/com/qiuguo/iot/data/dto/queue/QueuePackagingDTO.java rename iot-common/iot-data/src/main/java/com/qiuguo/iot/data/{request/qwen => resp/qg/algorithm}/QGResponse.java (88%) rename iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/{QWenReplyRequest.java => QWenReplyResponse.java} (79%) diff --git a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/dto/queue/QueuePackagingDTO.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/dto/queue/QueuePackagingDTO.java new file mode 100644 index 0000000..70a7f44 --- /dev/null +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/dto/queue/QueuePackagingDTO.java @@ -0,0 +1,21 @@ +package com.qiuguo.iot.data.dto.queue; + +import lombok.Data; + +import java.util.List; + +@Data +public class QueuePackagingDTO{ + + //最大长度 + private int maxLength; + + //数据 + private T data; + + //状态,0:已完成,1.未发起,2.发起中 + private int status; + + //休眠时间 + private int sleepTime; +} diff --git a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/request/qwen/TongYiCommunicationRest.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/request/qwen/TongYiCommunicationRest.java index 2e091a2..6c08ee1 100644 --- a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/request/qwen/TongYiCommunicationRest.java +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/request/qwen/TongYiCommunicationRest.java @@ -6,6 +6,6 @@ import lombok.Data; public class TongYiCommunicationRest { private Integer isWhiteList;//是否为白名单 private String text;//消息 - private String onlyId;//设备序列号 唯一 + private String onlyId;//唯一编码 private String status; //状态,0:已完成,1.未发起,2.发起中 } diff --git a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/request/qwen/QGResponse.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QGResponse.java similarity index 88% rename from iot-common/iot-data/src/main/java/com/qiuguo/iot/data/request/qwen/QGResponse.java rename to iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QGResponse.java index cc1e742..79fa298 100644 --- a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/request/qwen/QGResponse.java +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QGResponse.java @@ -1,4 +1,4 @@ -package com.qiuguo.iot.data.request.qwen; +package com.qiuguo.iot.data.resp.qg.algorithm; import lombok.Data; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QWenReplyRequest.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QWenReplyResponse.java similarity index 79% rename from iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QWenReplyRequest.java rename to iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QWenReplyResponse.java index 4ce0e45..eb1bbac 100644 --- a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QWenReplyRequest.java +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/resp/qg/algorithm/QWenReplyResponse.java @@ -2,7 +2,7 @@ package com.qiuguo.iot.data.resp.qg.algorithm; import lombok.Data; @Data -public class QWenReplyRequest { +public class QWenReplyResponse { private String resut; private int code; diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java index 6dcf736..e74597f 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java @@ -3,75 +3,126 @@ package com.qiuguo.iot.third.service; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.qiuguo.iot.base.constans.RedisConstans; +import com.qiuguo.iot.base.utils.StringUtils; import com.qiuguo.iot.base.utils.WebClientUtils; -import com.qiuguo.iot.data.request.qwen.QGResponse; +import com.qiuguo.iot.data.dto.queue.QueuePackagingDTO; +import com.qiuguo.iot.data.resp.qg.algorithm.QGResponse; import com.qiuguo.iot.data.request.qwen.TongYiCommunicationRest; -import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyRequest; +import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.core.ReactiveValueOperations; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import reactor.core.Disposable; import reactor.core.publisher.Mono; import javax.annotation.Resource; +import java.time.Duration; import java.util.*; import java.util.function.Function; +import java.util.stream.Collectors; @Slf4j -@RestController -@RequestMapping("/aaaa") +@Service public class QWenService { @Resource private ReactiveStringRedisTemplate reactiveStringRedisTemplate; - @PostMapping("/aaa") - public Mono communication(TongYiCommunicationRest rest) { + public Mono communication(TongYiCommunicationRest rest) { ReactiveValueOperations operations = reactiveStringRedisTemplate.opsForValue(); - return operations.get(RedisConstans.TY_QUEUE_LIST + rest.getOnlyId()).defaultIfEmpty("").flatMap(res -> { - JSONArray objects = JSONArray.parseArray(res); - List list = new LinkedList<>(); - try { - if (!ObjectUtils.isEmpty(objects)) { - list = JSONObject.parseArray(res, TongYiCommunicationRest.class); - if (rest.getIsWhiteList() == 0) { - //是白名单,插入头部 - log.info("收到白名单对应信息,接收参数为:{}", rest); - list.add(0, rest); + String queueKey = RedisConstans.TY_QUEUE_LIST; + + return operations.get(queueKey).defaultIfEmpty("") + .flatMap(res -> { + List list = new LinkedList<>(); + try { + if (!StringUtils.isEmpty(res)) { + list = JSONObject.parseArray(res, TongYiCommunicationRest.class); + //判断是否是白名单,是白名单插入头部,不是白名单插入尾部 + if (rest.getIsWhiteList() == 0) { + log.info("收到白名单对应信息,接收参数为: {}", rest); + list.add(0, rest); + } + list.add(rest); + } + + // 设置新的队列数据到Redis + operations.set(queueKey, JSONArray.toJSONString(list)).subscribe(); + + // 使用Mono.defer包装redisConsumption调用,确保异步执行 + List finalList = list; + return Mono.defer(() -> redisConsumption(queueKey, new QueuePackagingDTO<>())) + .flatMap(t -> { + log.info("当前消费的数据为: {}", t); + finalList.remove(t); + // 删除队列数据 + operations.set(queueKey, JSONArray.toJSONString(finalList)).subscribe(); + + // 调用通义千问接口 + HashMap map = new HashMap<>(); + map.put("task_type", "qwen"); + map.put("text", rest.getText()); + return httpTY(map); + }); + + } catch (Exception e) { + log.error("调用异常:", e); + return Mono.just(new QWenReplyResponse()); + } finally { + // 删除临时队列数据 + reactiveStringRedisTemplate.delete(queueKey).subscribe(); } - } - //不是白名单插入尾部 - list.add(rest); - operations.set(RedisConstans.TY_QUEUE_LIST + rest.getOnlyId(), JSONArray.toJSONString(list)).subscribe(); - //通知Redis让Redis进行消费 - HashMap map = new HashMap<>(); - map.put("task_type", "qwen"); - map.put("text", rest.getText()); - return httpTY(map); - } catch (Exception e) { - log.error("调用异常:"); - return Mono.just(new QWenReplyRequest()); - } finally { - reactiveStringRedisTemplate.delete(RedisConstans.TY_QUEUE_LIST + rest.getOnlyId()).subscribe(); - } - }); + }); } - public static Mono httpTY(HashMap map) { + /** + * 判断传入数据状态是否是已完成,如果是已完成则将其过滤 + */ + + /** + * 根据Redis中的key和实体类进行消费 + * @param key:redis的key + * @param clazz:对应实体类的class + * @return :返回对应的实体类 + */ + /** + * 1. 创建一个公共父类,用于记录当前的请求参数中的最大允许条数 + * 2. 根据当前条数进行判定是否需要消费,如果不需要消费则进行等待 + * 3. 如果需要消费,则进行消费,并且将当前的条数进行减一 + */ + public Mono redisConsumption(String key, QueuePackagingDTO queue){ + return reactiveStringRedisTemplate.opsForValue().get(key) + .flatMap(s -> { + List ts = JSONArray.parseArray(s, QueuePackagingDTO.class); + if (CollectionUtils.isEmpty(ts)) { + return Mono.just(new QueuePackagingDTO<>()); + } + //获取ts中status为1的值并且判断当前为1的值是否到达了5个如果达到了就休眠五秒后在进行消费 + List collect = ts.stream().filter(t -> t.getStatus() == 1).collect(Collectors.toList()); + if (collect.size() >= queue.getMaxLength()) { + // 休眠一段时间后重试 + return Mono.delay(Duration.ofMillis(queue.getSleepTime())) + .then(Mono.defer(() -> redisConsumption(key, queue))); + } + return Mono.just(collect.stream().findFirst().orElse(new QueuePackagingDTO<>())); + + }); + } + + public static Mono httpTY(HashMap map) { return WebClientUtils.post("http://192.168.8.211:5010/qg_human/qwen", new JSONObject(map)).flatMap(jsonObject -> { long time = new Date().getTime(); QGResponse qgResponse = jsonObject.toJavaObject(QGResponse.class); - QWenReplyRequest data = new QWenReplyRequest(); + QWenReplyResponse data = new QWenReplyResponse(); log.info("发起TY请求,响应时间时间为:{}", new Date().getTime() - time); if (qgResponse.getStatusCode() == 0) { - data = JSONObject.parseObject(qgResponse.getData().toString(), QWenReplyRequest.class); + data = JSONObject.parseObject(qgResponse.getData().toString(), QWenReplyResponse.class); //成功 log.info("通知成功:" + qgResponse.getData()); data.setCode(200); @@ -81,19 +132,8 @@ public class QWenService { log.info("通知失败:" + map); data.setCode(500); return Mono.just(data); + }); } - public static void main(String[] args) { - HashMap map = new HashMap<>(); - map.put("task_type", "qwen"); - map.put("text", "你好"); - try { - httpTY(map); - } catch (Exception e) { - log.info("异常:{}", e); - } - - } - }