[optimization] 通义千问代码优化03

This commit is contained in:
W.Y 2023-10-11 13:35:43 +08:00
parent ba795bf4c4
commit 01ea8f6f96
5 changed files with 114 additions and 53 deletions

View File

@ -0,0 +1,21 @@
package com.qiuguo.iot.data.dto.queue;
import lombok.Data;
import java.util.List;
@Data
public class QueuePackagingDTO<T>{
//最大长度
private int maxLength;
//数据
private T data;
//状态,0:已完成,1.未发起,2.发起中
private int status;
//休眠时间
private int sleepTime;
}

View File

@ -6,6 +6,6 @@ import lombok.Data;
public class TongYiCommunicationRest {
private Integer isWhiteList;//是否为白名单
private String text;//消息
private String onlyId;//设备序列号 唯一
private String onlyId;//唯一编码
private String status; //状态,0:已完成,1.未发起,2.发起中
}

View File

@ -1,4 +1,4 @@
package com.qiuguo.iot.data.request.qwen;
package com.qiuguo.iot.data.resp.qg.algorithm;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -2,7 +2,7 @@ package com.qiuguo.iot.data.resp.qg.algorithm;
import lombok.Data;
@Data
public class QWenReplyRequest {
public class QWenReplyResponse {
private String resut;
private int code;

View File

@ -3,75 +3,126 @@ package com.qiuguo.iot.third.service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qiuguo.iot.base.constans.RedisConstans;
import com.qiuguo.iot.base.utils.StringUtils;
import com.qiuguo.iot.base.utils.WebClientUtils;
import com.qiuguo.iot.data.request.qwen.QGResponse;
import com.qiuguo.iot.data.dto.queue.QueuePackagingDTO;
import com.qiuguo.iot.data.resp.qg.algorithm.QGResponse;
import com.qiuguo.iot.data.request.qwen.TongYiCommunicationRest;
import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyRequest;
import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@RestController
@RequestMapping("/aaaa")
@Service
public class QWenService {
@Resource
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
@PostMapping("/aaa")
public Mono<QWenReplyRequest> communication(TongYiCommunicationRest rest) {
public Mono<QWenReplyResponse> communication(TongYiCommunicationRest rest) {
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
return operations.get(RedisConstans.TY_QUEUE_LIST + rest.getOnlyId()).defaultIfEmpty("").flatMap(res -> {
JSONArray objects = JSONArray.parseArray(res);
List<TongYiCommunicationRest> list = new LinkedList<>();
try {
if (!ObjectUtils.isEmpty(objects)) {
list = JSONObject.parseArray(res, TongYiCommunicationRest.class);
if (rest.getIsWhiteList() == 0) {
//是白名单,插入头部
log.info("收到白名单对应信息,接收参数为:{}", rest);
list.add(0, rest);
String queueKey = RedisConstans.TY_QUEUE_LIST;
return operations.get(queueKey).defaultIfEmpty("")
.flatMap(res -> {
List<TongYiCommunicationRest> list = new LinkedList<>();
try {
if (!StringUtils.isEmpty(res)) {
list = JSONObject.parseArray(res, TongYiCommunicationRest.class);
//判断是否是白名单,是白名单插入头部,不是白名单插入尾部
if (rest.getIsWhiteList() == 0) {
log.info("收到白名单对应信息,接收参数为: {}", rest);
list.add(0, rest);
}
list.add(rest);
}
// 设置新的队列数据到Redis
operations.set(queueKey, JSONArray.toJSONString(list)).subscribe();
// 使用Mono.defer包装redisConsumption调用确保异步执行
List<TongYiCommunicationRest> finalList = list;
return Mono.defer(() -> redisConsumption(queueKey, new QueuePackagingDTO<>()))
.flatMap(t -> {
log.info("当前消费的数据为: {}", t);
finalList.remove(t);
// 删除队列数据
operations.set(queueKey, JSONArray.toJSONString(finalList)).subscribe();
// 调用通义千问接口
HashMap<String, Object> map = new HashMap<>();
map.put("task_type", "qwen");
map.put("text", rest.getText());
return httpTY(map);
});
} catch (Exception e) {
log.error("调用异常:", e);
return Mono.just(new QWenReplyResponse());
} finally {
// 删除临时队列数据
reactiveStringRedisTemplate.delete(queueKey).subscribe();
}
}
//不是白名单插入尾部
list.add(rest);
operations.set(RedisConstans.TY_QUEUE_LIST + rest.getOnlyId(), JSONArray.toJSONString(list)).subscribe();
//通知Redis让Redis进行消费
HashMap<String, Object> map = new HashMap<>();
map.put("task_type", "qwen");
map.put("text", rest.getText());
return httpTY(map);
} catch (Exception e) {
log.error("调用异常:");
return Mono.just(new QWenReplyRequest());
} finally {
reactiveStringRedisTemplate.delete(RedisConstans.TY_QUEUE_LIST + rest.getOnlyId()).subscribe();
}
});
});
}
public static Mono<QWenReplyRequest> httpTY(HashMap<String, Object> map) {
/**
* 判断传入数据状态是否是已完成,如果是已完成则将其过滤
*/
/**
* 根据Redis中的key和实体类进行消费
* @param key:redis的key
* @param clazz:对应实体类的class
* @return :返回对应的实体类
*/
/**
* 1. 创建一个公共父类,用于记录当前的请求参数中的最大允许条数
* 2. 根据当前条数进行判定是否需要消费,如果不需要消费则进行等待
* 3. 如果需要消费,则进行消费,并且将当前的条数进行减一
*/
public Mono<QueuePackagingDTO> redisConsumption(String key, QueuePackagingDTO queue){
return reactiveStringRedisTemplate.opsForValue().get(key)
.flatMap(s -> {
List<QueuePackagingDTO> ts = JSONArray.parseArray(s, QueuePackagingDTO.class);
if (CollectionUtils.isEmpty(ts)) {
return Mono.just(new QueuePackagingDTO<>());
}
//获取ts中status为1的值并且判断当前为1的值是否到达了5个如果达到了就休眠五秒后在进行消费
List<QueuePackagingDTO> collect = ts.stream().filter(t -> t.getStatus() == 1).collect(Collectors.toList());
if (collect.size() >= queue.getMaxLength()) {
// 休眠一段时间后重试
return Mono.delay(Duration.ofMillis(queue.getSleepTime()))
.then(Mono.defer(() -> redisConsumption(key, queue)));
}
return Mono.just(collect.stream().findFirst().orElse(new QueuePackagingDTO<>()));
});
}
public static Mono<QWenReplyResponse> httpTY(HashMap<String, Object> map) {
return WebClientUtils.post("http://192.168.8.211:5010/qg_human/qwen", new JSONObject(map)).flatMap(jsonObject -> {
long time = new Date().getTime();
QGResponse qgResponse = jsonObject.toJavaObject(QGResponse.class);
QWenReplyRequest data = new QWenReplyRequest();
QWenReplyResponse data = new QWenReplyResponse();
log.info("发起TY请求,响应时间时间为:{}", new Date().getTime() - time);
if (qgResponse.getStatusCode() == 0) {
data = JSONObject.parseObject(qgResponse.getData().toString(), QWenReplyRequest.class);
data = JSONObject.parseObject(qgResponse.getData().toString(), QWenReplyResponse.class);
//成功
log.info("通知成功:" + qgResponse.getData());
data.setCode(200);
@ -81,19 +132,8 @@ public class QWenService {
log.info("通知失败:" + map);
data.setCode(500);
return Mono.just(data);
});
}
public static void main(String[] args) {
HashMap<String, Object> map = new HashMap<>();
map.put("task_type", "qwen");
map.put("text", "你好");
try {
httpTY(map);
} catch (Exception e) {
log.info("异常:{}", e);
}
}
}