From cc208363d6a5e047b9cab33bda1dcb74597c8d1f Mon Sep 17 00:00:00 2001 From: wulin Date: Tue, 17 Oct 2023 19:44:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=96=87=E6=9C=AC=E6=9C=80?= =?UTF-8?q?=E5=90=8E=E4=B8=80=E4=B8=AA=E6=A0=87=E8=AF=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../qiuguo/iot/third/nlp/AliYunQianWen.java | 11 ++-- .../qiuguo/iot/third/service/IQianWen.java | 7 ++ .../qiuguo/iot/third/service/QWenService.java | 2 +- .../websocket/api/domain/BaseMessageResp.java | 5 ++ .../iot/box/websocket/api/domain/TTSResp.java | 11 ++++ .../api/handler/BaseWebSocketProcess.java | 66 ++++++++++++------- 6 files changed, 70 insertions(+), 32 deletions(-) create mode 100644 iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/IQianWen.java create mode 100644 iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/domain/TTSResp.java diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunQianWen.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunQianWen.java index 0d35635..8318004 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunQianWen.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunQianWen.java @@ -11,6 +11,7 @@ import com.alibaba.dashscope.common.Role; import com.alibaba.dashscope.exception.InputRequiredException; import com.alibaba.dashscope.exception.NoApiKeyException; import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse; +import com.qiuguo.iot.third.service.IQianWen; import io.reactivex.functions.Consumer; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -56,7 +57,7 @@ public class AliYunQianWen { public Mono sendMessage(String msg, - Consumer onNext, + IQianWen onNext, QWenReplyResponse qwenReplyResponse) { if(!canAsk){ msgManager = new MessageManager(10); @@ -82,7 +83,7 @@ public class AliYunQianWen { public void onEvent(GenerationResult message) { try { - onNext.accept(message.getOutput().getChoices().get(0).getMessage().getContent()); + onNext.sendMessage(message.getOutput().getChoices().get(0).getMessage().getContent()); if(lastGenerationResult != null) { lastGenerationResult.getOutput().getChoices().get(0).getMessage().setContent( lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent() @@ -106,11 +107,7 @@ public class AliYunQianWen { if(lastGenerationResult != null){ qwenReplyResponse.setResut(lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent()); } - try { - onNext.accept("。"); - } catch (Exception e) { - log.info("千问最后调用结束时异常{}", e); - } + onNext.finish(); lastGenerationResult = null; semaphore.release(); } diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/IQianWen.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/IQianWen.java new file mode 100644 index 0000000..e6fcf40 --- /dev/null +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/IQianWen.java @@ -0,0 +1,7 @@ +package com.qiuguo.iot.third.service; + +public interface IQianWen { + void sendMessage(T var); + + void finish(); +} diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java index c2f3f5e..c1f5a0a 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java @@ -37,7 +37,7 @@ public class QWenService { protected static ConcurrentHashMap qianwenGroup = new ConcurrentHashMap<>(); - public Mono communication(TongYiCommunicationRest rest, Consumer onNext){ + public Mono communication(TongYiCommunicationRest rest, IQianWen onNext){ AliYunQianWen aliQianWen = null; if (!qianwenGroup.containsKey(rest.getOnlyId())) { aliQianWen = new AliYunQianWen(SpringUtil.getProperty("Ali.qianwen")); diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/domain/BaseMessageResp.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/domain/BaseMessageResp.java index fd5a538..7703a68 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/domain/BaseMessageResp.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/domain/BaseMessageResp.java @@ -19,4 +19,9 @@ public class BaseMessageResp { * 在线播放音乐信息 */ protected MusicResp music; + + /** + * 文本推送情况 + */ + protected TTSResp tts = new TTSResp(); } diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/domain/TTSResp.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/domain/TTSResp.java new file mode 100644 index 0000000..6199b8b --- /dev/null +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/domain/TTSResp.java @@ -0,0 +1,11 @@ +package com.qiuguo.iot.box.websocket.api.domain; + +import lombok.Data; + +@Data +public class TTSResp { + /** + * 1标识最后一个文本,0标识播放中 + */ + Integer status = 1; +} diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java index 6d9440b..2617ab4 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java @@ -146,32 +146,41 @@ public class BaseWebSocketProcess { tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString()); } StringBuilder sb = new StringBuilder(); - return qwenService.communication(tongYiCommunicationRest, message ->{ - //通知到客户端 - MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); - if(tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())){ - String old = sb.toString() + message; - int d = old.lastIndexOf(","); - int j = old.lastIndexOf("。"); - int a = old.lastIndexOf(":"); - int b = old.lastIndexOf("\n"); - int c = old.lastIndexOf(";"); - int n = old.lastIndexOf("\\n"); - int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n); - if(m > 0){ - //清空 - sb.setLength(0); - sb.append(old.substring(m)); - old = old.substring(0, m); - normalSendMsg(baseSession, old, type); - }else{ - sb.append(message); - } + return qwenService.communication(tongYiCommunicationRest, new IQianWen() { + @Override + public void sendMessage(String message) { + //通知到客户端 + MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); + if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) { + String old = sb.toString() + message; + int d = old.lastIndexOf(","); + int j = old.lastIndexOf("。"); + int a = old.lastIndexOf(":"); + int b = old.lastIndexOf("\n"); + int c = old.lastIndexOf(";"); + int n = old.lastIndexOf("\\n"); + int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n); + if (m > 0) { + //清空 + sb.setLength(0); + sb.append(old.substring(m)); + old = old.substring(0, m); + normalSendMsg(baseSession, old, type, YesNo.NO.getCode()); + } else { + sb.append(message); + } - return; + return; + } + log.info("已经有新的请求,不在推送到客户端SN:{} userId:{}", baseSession.getSn(), baseSession.getUserId()); + MDC.remove(Log4Constans.PRINT_LOG_ID); + } + + @Override + public void finish() { + log.info("千问最后调用finish"); + normalSendMsg(baseSession, sb.toString(), type); } - log.info("已经有新的请求,不在推送到客户端SN:{} userId:{}", baseSession.getSn(), baseSession.getUserId()); - MDC.remove(Log4Constans.PRINT_LOG_ID); }).flatMap(data ->{ if(data.getCode() == 200){ log.info("千问正常结束"); @@ -686,6 +695,15 @@ public class BaseWebSocketProcess { sendMsg(baseSession, msg); } + protected void normalSendMsg(BaseSession baseSession, String message, Integer type, Integer finish){ + BoxMessageResp resp = new BoxMessageResp(); + resp.setType(type); + resp.setText(message); + resp.getTts().setStatus(finish); + String msg = JSONObject.toJSONString(resp); + sendMsg(baseSession, msg); + } + private void sendMsg(BaseSession baseSession, String msg) { log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg); baseSession.getSink().next(baseSession.getSession().textMessage(msg));