From 037a57ea28fffedbecbc2dc6b4dfa582b0ce6459 Mon Sep 17 00:00:00 2001 From: wulin Date: Tue, 21 Nov 2023 01:45:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=98=9F=E5=B0=98=E4=BD=BF=E7=94=A8redis?= =?UTF-8?q?=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../qiuguo/iot/third/nlp/AliYunXingChen.java | 100 ++++++++++++------ .../third/service/TongYiXinChenService.java | 5 +- 2 files changed, 74 insertions(+), 31 deletions(-) diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunXingChen.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunXingChen.java index f0c877d..e42b94d 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunXingChen.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunXingChen.java @@ -6,17 +6,24 @@ import com.alibaba.dashscope.aigc.generation.models.QwenParam; import com.alibaba.dashscope.common.MessageManager; import com.alibaba.dashscope.common.ResultCallback; import com.alibaba.dashscope.common.Role; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import com.alibaba.xingchen.ApiClient; import com.alibaba.xingchen.api.ChatApiSub; import com.alibaba.xingchen.auth.HttpBearerAuth; import com.alibaba.xingchen.model.*; +import com.qiuguo.iot.base.constans.RedisConstans; +import com.qiuguo.iot.base.utils.StringUtils; import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse; import com.qiuguo.iot.third.service.IQianWen; import io.reactivex.Flowable; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import reactor.core.publisher.Mono; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Semaphore; @@ -39,6 +46,7 @@ public class AliYunXingChen { ChatReqParams chatReqParams; String userName; + String userId; public AliYunXingChen(String basePath, String key, String roleId, String userId, String userName) { api = new ChatApiSub(); ApiClient apiClient = new ApiClient(); @@ -51,6 +59,7 @@ public class AliYunXingChen { authorization.setBearerToken(key); api.setApiClient(apiClient); this.userName = userName; + this.userId = userId; chatReqParams = ChatReqParams.builder() .botProfile( CharacterKey.builder() @@ -74,39 +83,70 @@ public class AliYunXingChen { } public Mono sendMessage(String msg, IQianWen onNext, - QWenReplyResponse qwenReplyResponse){ + QWenReplyResponse qwenReplyResponse, + ReactiveStringRedisTemplate reactiveStringRedisTemplate){ log.info("调用通义星尘回答:{}", msg); - Message message = Message.builder().name(userName).role("user").content(msg).build(); - addMessage(message); - chatReqParams.setMessages(messages); - qwenReplyResponse.setCode(200); - return Mono.defer(() -> { - try { - Flowable response = api.streamOut(chatReqParams); - RecordMessage recordMessage = new RecordMessage(); - response.blockingForEach(m -> { - String v = m.getChoices().get(0).getMessages().get(0).getContent().replaceAll(recordMessage.getMsg(), ""); - onNext.sendMessage(v); - recordMessage.setMsg(m.getChoices().get(0).getMessages().get(0).getContent()); - if("stop".equals(m.getChoices().get(0).getStopReason())){ - // - - Message message1 = Message.builder() - .role(m.getChoices().get(0).getMessages().get(0).getRole()) - .content(m.getChoices().get(0).getMessages().get(0).getContent()) - .build(); - qwenReplyResponse.setResut(message1.getContent()); - addMessage(message1); - onNext.finish(); + return reactiveStringRedisTemplate.opsForValue().get(RedisConstans.TONGYI_TALK_CONTENT + userId) + .defaultIfEmpty("") + .map(s -> { + try{ + if(StringUtils.isNotEmpty(s)){ + log.info("缓存不为空"); + JSONArray jsonArray = JSON.parseArray(s); + messages.clear(); + int i = 0; + if(jsonArray.size() > 49){ + i = 1; + } + for(; i < jsonArray.size(); ++i){ + Message m = JSONObject.toJavaObject(jsonArray.getJSONObject(i), Message.class); + messages.add(m); + } + } + }catch (Exception e){ + log.info("聊天缓存转换异常{}", e); + } + finally { + Message message = Message.builder().name(userName).role("user").content(msg).build(); + messages.add(message); + return messages; } - }); - }catch (Exception e){ - log.info("调用星尘异常{}", e); - qwenReplyResponse.setCode(500); - } - return Mono.just(true); - }); + }).flatMap(msgs ->{ + chatReqParams.setMessages(messages); + qwenReplyResponse.setCode(200); + try { + Flowable response = api.streamOut(chatReqParams); + RecordMessage recordMessage = new RecordMessage(); + response.blockingForEach(m -> { + String v = m.getChoices().get(0).getMessages().get(0).getContent().replaceAll(recordMessage.getMsg(), ""); + onNext.sendMessage(v); + recordMessage.setMsg(m.getChoices().get(0).getMessages().get(0).getContent()); + if("stop".equals(m.getChoices().get(0).getStopReason())){ + // + + Message message1 = Message.builder() + .role(m.getChoices().get(0).getMessages().get(0).getRole()) + .content(m.getChoices().get(0).getMessages().get(0).getContent()) + .build(); + qwenReplyResponse.setResut(message1.getContent()); + addMessage(message1); + onNext.finish(); + reactiveStringRedisTemplate.opsForValue().set(RedisConstans.TONGYI_TALK_CONTENT + userId, JSONObject.toJSONString(messages), Duration.ofDays(1L)) + .map(b -> { + log.info("保存聊天缓存状态{}", b); + return b; + }).subscribe(); + + } + }); + return Mono.just(true); + }catch (Exception e){ + log.info("调用星尘异常{}", e); + qwenReplyResponse.setCode(500); + } + return Mono.just(false); + }); } private void addMessage(Message msg){ diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/TongYiXinChenService.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/TongYiXinChenService.java index 1074832..3c8753e 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/TongYiXinChenService.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/TongYiXinChenService.java @@ -32,6 +32,9 @@ import java.util.stream.Collectors; @Service public class TongYiXinChenService implements ITongYi{ + @Resource + ReactiveStringRedisTemplate reactiveStringRedisTemplate; + protected static ConcurrentHashMap qianwenGroup = new ConcurrentHashMap<>(); @Override @@ -47,7 +50,7 @@ public class TongYiXinChenService implements ITongYi{ } QWenReplyResponse qWenReplyResponse = new QWenReplyResponse(); - return aliXingChen.sendMessage(rest.getText(), onNext, qWenReplyResponse).flatMap(b -> { + return aliXingChen.sendMessage(rest.getText(), onNext, qWenReplyResponse, reactiveStringRedisTemplate).flatMap(b -> { return Mono.just(qWenReplyResponse); });