diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java index eff9bbf..2fa6196 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java @@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.core.ReactiveValueOperations; +import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Mono; import lombok.extern.slf4j.Slf4j; diff --git a/iot-modules/iot-box-websocket-api/pom.xml b/iot-modules/iot-box-websocket-api/pom.xml index 7e7c95b..d9ba428 100644 --- a/iot-modules/iot-box-websocket-api/pom.xml +++ b/iot-modules/iot-box-websocket-api/pom.xml @@ -81,13 +81,6 @@ compile - - - com.github.ben-manes.caffeine - caffeine - ${caffeine.version} - - com.alibaba.nls nls-sdk-common diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java index 01da663..8c1009d 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java @@ -10,6 +10,8 @@ import com.qiuguo.iot.box.websocket.api.domain.BaseSession; import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession; import com.qiuguo.iot.box.websocket.api.domain.box.resp.BoxMessageResp; import com.qiuguo.iot.box.websocket.api.domain.user.UserSession; +import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration; +import com.qiuguo.iot.box.websocket.api.filter.LogWebFilter; import com.qiuguo.iot.box.websocket.api.service.BaseWebSocketService; import com.qiuguo.iot.data.constants.YunxiRabbitConst; import com.qiuguo.iot.data.entity.device.DeviceUserTalkRecordEntity; @@ -29,6 +31,7 @@ import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import javax.annotation.Resource; import java.util.LinkedList; @@ -84,15 +87,50 @@ public abstract class ActionCommand { }); } - public void sendMessage(BaseSession baseSession, String message, Integer type) {} + private Mono sendMessage(BaseSession baseSession, QueueMessage queue, StringBuilder sb, Integer type){ + if(baseSession.getRequestId().equals(queue.getRequestId())){ - protected void setQueueMessage(BaseSession baseSession, Queue queue, Integer type){ - if(queue == null){ - queue = new LinkedList<>(); + String message = ""; + if(queue.getQueue().size() > 0){ + message = queue.getQueue().poll(); + message = baseWebSocketService.getSendStr(sb, message); + }else if(queue.getStatus() == YesNo.NO.getCode().intValue()){ + if(sb.length() == 0){ + //结束了 + log.info("发送结束了,请求id:{}", queue.getRequestId()); + return Mono.empty(); + } + message = sb.toString(); + message = baseWebSocketService.removeStringChars(message); + sb.setLength(0); + }else{ + try{ + Thread.sleep(50); + }catch(Exception e){ + log.info("等信息信息休息异常{}", e); + } + } + if(StringUtils.isNotEmpty(message)){ + return baseWebSocketService.normalSendMsg(baseSession, message, type, YesNo.NO.getCode()).flatMap(m -> { + return sendMessage(baseSession, queue, sb, type); + }); + } + return sendMessage(baseSession, queue, sb, type); } - StringBuilder sb = new StringBuilder(); - String message = queue.poll(); - baseWebSocketService.sendMoreMsg(baseSession, sb, message, type); + return Mono.empty(); + } + + protected Mono setQueueMessage(BaseSession baseSession, QueueMessage queue, Integer type){ + return Mono.defer(() -> { + MDC.put(LogMdcConfiguration.PRINT_LOG_ID, baseSession.getLogId()); + StringBuilder sb = new StringBuilder(); + return sendMessage(baseSession, queue, sb, type).flatMap(m -> { + MDC.remove(LogMdcConfiguration.PRINT_LOG_ID); + return Mono.empty(); + }); + + }); + } protected Mono toQianWen(Action action, BaseSession baseSession, Integer type){ @@ -107,26 +145,33 @@ public abstract class ActionCommand { tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString()); } - Queue queue = new LinkedList(); + + QueueMessage queueMessage = new QueueMessage(); + queueMessage.setRequestId(baseSession.getRequestId()); return qwenService.communication(tongYiCommunicationRest, new IQianWen() { @Override public void sendMessage(String message) { //通知到客户端 MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) { - queue.add(message); - setQueueMessage(baseSession, queue, type); + queueMessage.getQueue().add(message); + if(queueMessage.getStatus() == YesNo.YES.getCode().intValue()){ + queueMessage.setStatus(2); + setQueueMessage(baseSession, queueMessage, type).subscribeOn(Schedulers.single()).subscribe(); + } return; } - log.info("已经有新的请求,不在推送到客户端SN:{} userId:{}", baseSession.getSn(), baseSession.getUserId()); + log.info("已有新的请求,不推送客户端SN:{} userId:{}", baseSession.getSn(), baseSession.getUserId()); } @Override public void finish() { log.info("千问最后调用finish"); + queueMessage.setStatus(YesNo.NO.getCode()); MDC.remove(Log4Constans.PRINT_LOG_ID); } }).flatMap(data ->{ + queueMessage.setStatus(YesNo.NO.getCode()); if(data.getCode() == 200){ log.info("千问正常结束"); //保存记录 diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/QueueMessage.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/QueueMessage.java new file mode 100644 index 0000000..ba4d47f --- /dev/null +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/QueueMessage.java @@ -0,0 +1,24 @@ +package com.qiuguo.iot.box.websocket.api.command; + +import lombok.Data; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Data +public class QueueMessage { + /** + * 请求id + */ + Long requestId; + /** + * 要发送的消息队列 + */ + Queue queue = new ConcurrentLinkedQueue<>(); + + /** + * 状态 1 开始 2进行中 0 结束 + */ + int status = 1; +} diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/service/BaseWebSocketService.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/service/BaseWebSocketService.java index b0c60df..116fdca 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/service/BaseWebSocketService.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/service/BaseWebSocketService.java @@ -101,7 +101,7 @@ public class BaseWebSocketService { * @param message * @return */ - protected String getSendStr(StringBuilder sb, String message){ + public String getSendStr(StringBuilder sb, String message){ String old = sb.toString() + message; int d = old.lastIndexOf(","); int j = old.lastIndexOf("。"); @@ -112,6 +112,7 @@ public class BaseWebSocketService { int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n); if (m > 0) { //清空 + m++; String msg = old.substring(0, m); if(msg.replace(" ", "").length() > 0){ //纯空格的不推送 @@ -138,16 +139,6 @@ public class BaseWebSocketService { return baseSession.getSession().close(); } - - public void sendMoreMsg(BaseSession baseSession, - StringBuilder sb, - String message, int type){ - message = getSendStr(sb, message); - if(StringUtils.isNotEmpty(message)){ - normalSendMsg(baseSession, message, type, YesNo.NO.getCode()); - } - } - public void sendMsg(BaseSession baseSession, String msg) { log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg); baseSession.getSink().next(baseSession.getSession().textMessage(msg)); @@ -201,16 +192,16 @@ public class BaseWebSocketService { sendMsg(baseSession, resp); } - public void normalSendMsg(BaseSession baseSession, String message, Integer type, Integer finish){ + public Mono normalSendMsg(BaseSession baseSession, String message, Integer type, Integer finish){ BoxMessageResp resp = new BoxMessageResp(); resp.setType(type); resp.setText(message); resp.getTts().setStatus(finish); - sendMsg(baseSession, resp); + return sendMsgWithMono(baseSession, resp); } - private String removeStringChars(String text){ + public String removeStringChars(String text){ text = text.replace("\n", "").replace("\t", ""); if(text.startsWith(",") || text.startsWith("。") || @@ -236,6 +227,50 @@ public class BaseWebSocketService { return text; } + public Mono sendMsgWithMono(BaseSession baseSession, String msg) { + return Mono.defer(() -> { + log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg); + baseSession.getSink().next(baseSession.getSession().textMessage(msg)); + return Mono.just(true); + }); + + } + + public Mono sendMsgWithMono(BaseSession baseSession, BaseMessageResp baseMessageResp) { + if(baseSession instanceof BoxSession){ + log.info("果box聊天记录,同步到客户端"); + BaseSession userSession = getUserSessionWithUserId(baseSession.getUserId()); + if(userSession != null){ + sendMsg(userSession, baseMessageResp); + } + if(suanfa){ + String text = removeStringChars(baseMessageResp.getText()); + + if(text.length() > ONE_MAX_TEXT){ + StringBuilder builder = new StringBuilder(); + return sendAudioMessage(baseSession, + baseMessageResp, + builder, + text, + 0, + text.length() - 1, + baseSession.getRequestId()).flatMap(s -> { + return Mono.just(true); + }); + }else{ + BoxMessageResp boxMessageResp = new BoxMessageResp(); + BeanUtils.copyProperties(baseMessageResp, boxMessageResp); + boxMessageResp.setText(text); + return sendAudioMessage(baseSession, boxMessageResp).flatMap(s -> { + return Mono.just(true); + }); + } + + } + } + return sendMsgWithMono(baseSession, JSONObject.toJSONString(baseMessageResp)); + } + public void sendMsg(BaseSession baseSession, BaseMessageResp baseMessageResp) { if(baseSession instanceof BoxSession){ log.info("果box聊天记录,同步到客户端"); @@ -326,7 +361,7 @@ public class BaseWebSocketService { sendMsg(baseSession, JSONObject.toJSONString(boxMessageResp)); return Mono.just(""); } - return audioService.getAudioUrl(boxMessageResp.getText() + "。").map(s ->{ + return audioService.getAudioUrl(boxMessageResp.getText()).map(s ->{ boxMessageResp.setAudio(s); sendMsg(baseSession, JSONObject.toJSONString(boxMessageResp)); diff --git a/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml b/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml index 2d74340..be48026 100644 --- a/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml +++ b/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml @@ -46,7 +46,7 @@ qiuguo: checktoken: url: https://qiuguo-app.pre.qiuguojihua.com/pre-api/user/user/getUser tts: - #suanfa: true #nacos控制变化 + suanfa: true #nacos控制变化 url: http://192.168.8.211:18000/run/predict #算法语音合成 lac: #type: suanfa #nacos控制变化 diff --git a/iot-modules/pom.xml b/iot-modules/pom.xml index 6ff50c5..360af0b 100644 --- a/iot-modules/pom.xml +++ b/iot-modules/pom.xml @@ -104,6 +104,13 @@ spring-webmvc + + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + +