diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java
index eff9bbf..2fa6196 100644
--- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java
+++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java
@@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
+import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import lombok.extern.slf4j.Slf4j;
diff --git a/iot-modules/iot-box-websocket-api/pom.xml b/iot-modules/iot-box-websocket-api/pom.xml
index 7e7c95b..d9ba428 100644
--- a/iot-modules/iot-box-websocket-api/pom.xml
+++ b/iot-modules/iot-box-websocket-api/pom.xml
@@ -81,13 +81,6 @@
compile
-
-
- com.github.ben-manes.caffeine
- caffeine
- ${caffeine.version}
-
-
com.alibaba.nls
nls-sdk-common
diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java
index 01da663..8c1009d 100644
--- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java
+++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java
@@ -10,6 +10,8 @@ import com.qiuguo.iot.box.websocket.api.domain.BaseSession;
import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.box.resp.BoxMessageResp;
import com.qiuguo.iot.box.websocket.api.domain.user.UserSession;
+import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration;
+import com.qiuguo.iot.box.websocket.api.filter.LogWebFilter;
import com.qiuguo.iot.box.websocket.api.service.BaseWebSocketService;
import com.qiuguo.iot.data.constants.YunxiRabbitConst;
import com.qiuguo.iot.data.entity.device.DeviceUserTalkRecordEntity;
@@ -29,6 +31,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
import javax.annotation.Resource;
import java.util.LinkedList;
@@ -84,15 +87,50 @@ public abstract class ActionCommand {
});
}
- public void sendMessage(BaseSession baseSession, String message, Integer type) {}
+ private Mono sendMessage(BaseSession baseSession, QueueMessage queue, StringBuilder sb, Integer type){
+ if(baseSession.getRequestId().equals(queue.getRequestId())){
- protected void setQueueMessage(BaseSession baseSession, Queue queue, Integer type){
- if(queue == null){
- queue = new LinkedList<>();
+ String message = "";
+ if(queue.getQueue().size() > 0){
+ message = queue.getQueue().poll();
+ message = baseWebSocketService.getSendStr(sb, message);
+ }else if(queue.getStatus() == YesNo.NO.getCode().intValue()){
+ if(sb.length() == 0){
+ //结束了
+ log.info("发送结束了,请求id:{}", queue.getRequestId());
+ return Mono.empty();
+ }
+ message = sb.toString();
+ message = baseWebSocketService.removeStringChars(message);
+ sb.setLength(0);
+ }else{
+ try{
+ Thread.sleep(50);
+ }catch(Exception e){
+ log.info("等信息信息休息异常{}", e);
+ }
+ }
+ if(StringUtils.isNotEmpty(message)){
+ return baseWebSocketService.normalSendMsg(baseSession, message, type, YesNo.NO.getCode()).flatMap(m -> {
+ return sendMessage(baseSession, queue, sb, type);
+ });
+ }
+ return sendMessage(baseSession, queue, sb, type);
}
- StringBuilder sb = new StringBuilder();
- String message = queue.poll();
- baseWebSocketService.sendMoreMsg(baseSession, sb, message, type);
+ return Mono.empty();
+ }
+
+ protected Mono setQueueMessage(BaseSession baseSession, QueueMessage queue, Integer type){
+ return Mono.defer(() -> {
+ MDC.put(LogMdcConfiguration.PRINT_LOG_ID, baseSession.getLogId());
+ StringBuilder sb = new StringBuilder();
+ return sendMessage(baseSession, queue, sb, type).flatMap(m -> {
+ MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
+ return Mono.empty();
+ });
+
+ });
+
}
protected Mono toQianWen(Action action, BaseSession baseSession, Integer type){
@@ -107,26 +145,33 @@ public abstract class ActionCommand {
tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString());
}
- Queue queue = new LinkedList();
+
+ QueueMessage queueMessage = new QueueMessage();
+ queueMessage.setRequestId(baseSession.getRequestId());
return qwenService.communication(tongYiCommunicationRest, new IQianWen() {
@Override
public void sendMessage(String message) {
//通知到客户端
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) {
- queue.add(message);
- setQueueMessage(baseSession, queue, type);
+ queueMessage.getQueue().add(message);
+ if(queueMessage.getStatus() == YesNo.YES.getCode().intValue()){
+ queueMessage.setStatus(2);
+ setQueueMessage(baseSession, queueMessage, type).subscribeOn(Schedulers.single()).subscribe();
+ }
return;
}
- log.info("已经有新的请求,不在推送到客户端SN:{} userId:{}", baseSession.getSn(), baseSession.getUserId());
+ log.info("已有新的请求,不推送客户端SN:{} userId:{}", baseSession.getSn(), baseSession.getUserId());
}
@Override
public void finish() {
log.info("千问最后调用finish");
+ queueMessage.setStatus(YesNo.NO.getCode());
MDC.remove(Log4Constans.PRINT_LOG_ID);
}
}).flatMap(data ->{
+ queueMessage.setStatus(YesNo.NO.getCode());
if(data.getCode() == 200){
log.info("千问正常结束");
//保存记录
diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/QueueMessage.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/QueueMessage.java
new file mode 100644
index 0000000..ba4d47f
--- /dev/null
+++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/QueueMessage.java
@@ -0,0 +1,24 @@
+package com.qiuguo.iot.box.websocket.api.command;
+
+import lombok.Data;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+@Data
+public class QueueMessage {
+ /**
+ * 请求id
+ */
+ Long requestId;
+ /**
+ * 要发送的消息队列
+ */
+ Queue queue = new ConcurrentLinkedQueue<>();
+
+ /**
+ * 状态 1 开始 2进行中 0 结束
+ */
+ int status = 1;
+}
diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/service/BaseWebSocketService.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/service/BaseWebSocketService.java
index b0c60df..116fdca 100644
--- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/service/BaseWebSocketService.java
+++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/service/BaseWebSocketService.java
@@ -101,7 +101,7 @@ public class BaseWebSocketService {
* @param message
* @return
*/
- protected String getSendStr(StringBuilder sb, String message){
+ public String getSendStr(StringBuilder sb, String message){
String old = sb.toString() + message;
int d = old.lastIndexOf(",");
int j = old.lastIndexOf("。");
@@ -112,6 +112,7 @@ public class BaseWebSocketService {
int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n);
if (m > 0) {
//清空
+ m++;
String msg = old.substring(0, m);
if(msg.replace(" ", "").length() > 0){
//纯空格的不推送
@@ -138,16 +139,6 @@ public class BaseWebSocketService {
return baseSession.getSession().close();
}
-
- public void sendMoreMsg(BaseSession baseSession,
- StringBuilder sb,
- String message, int type){
- message = getSendStr(sb, message);
- if(StringUtils.isNotEmpty(message)){
- normalSendMsg(baseSession, message, type, YesNo.NO.getCode());
- }
- }
-
public void sendMsg(BaseSession baseSession, String msg) {
log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg);
baseSession.getSink().next(baseSession.getSession().textMessage(msg));
@@ -201,16 +192,16 @@ public class BaseWebSocketService {
sendMsg(baseSession, resp);
}
- public void normalSendMsg(BaseSession baseSession, String message, Integer type, Integer finish){
+ public Mono normalSendMsg(BaseSession baseSession, String message, Integer type, Integer finish){
BoxMessageResp resp = new BoxMessageResp();
resp.setType(type);
resp.setText(message);
resp.getTts().setStatus(finish);
- sendMsg(baseSession, resp);
+ return sendMsgWithMono(baseSession, resp);
}
- private String removeStringChars(String text){
+ public String removeStringChars(String text){
text = text.replace("\n", "").replace("\t", "");
if(text.startsWith(",") ||
text.startsWith("。") ||
@@ -236,6 +227,50 @@ public class BaseWebSocketService {
return text;
}
+ public Mono sendMsgWithMono(BaseSession baseSession, String msg) {
+ return Mono.defer(() -> {
+ log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg);
+ baseSession.getSink().next(baseSession.getSession().textMessage(msg));
+ return Mono.just(true);
+ });
+
+ }
+
+ public Mono sendMsgWithMono(BaseSession baseSession, BaseMessageResp baseMessageResp) {
+ if(baseSession instanceof BoxSession){
+ log.info("果box聊天记录,同步到客户端");
+ BaseSession userSession = getUserSessionWithUserId(baseSession.getUserId());
+ if(userSession != null){
+ sendMsg(userSession, baseMessageResp);
+ }
+ if(suanfa){
+ String text = removeStringChars(baseMessageResp.getText());
+
+ if(text.length() > ONE_MAX_TEXT){
+ StringBuilder builder = new StringBuilder();
+ return sendAudioMessage(baseSession,
+ baseMessageResp,
+ builder,
+ text,
+ 0,
+ text.length() - 1,
+ baseSession.getRequestId()).flatMap(s -> {
+ return Mono.just(true);
+ });
+ }else{
+ BoxMessageResp boxMessageResp = new BoxMessageResp();
+ BeanUtils.copyProperties(baseMessageResp, boxMessageResp);
+ boxMessageResp.setText(text);
+ return sendAudioMessage(baseSession, boxMessageResp).flatMap(s -> {
+ return Mono.just(true);
+ });
+ }
+
+ }
+ }
+ return sendMsgWithMono(baseSession, JSONObject.toJSONString(baseMessageResp));
+ }
+
public void sendMsg(BaseSession baseSession, BaseMessageResp baseMessageResp) {
if(baseSession instanceof BoxSession){
log.info("果box聊天记录,同步到客户端");
@@ -326,7 +361,7 @@ public class BaseWebSocketService {
sendMsg(baseSession, JSONObject.toJSONString(boxMessageResp));
return Mono.just("");
}
- return audioService.getAudioUrl(boxMessageResp.getText() + "。").map(s ->{
+ return audioService.getAudioUrl(boxMessageResp.getText()).map(s ->{
boxMessageResp.setAudio(s);
sendMsg(baseSession, JSONObject.toJSONString(boxMessageResp));
diff --git a/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml b/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml
index 2d74340..be48026 100644
--- a/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml
+++ b/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml
@@ -46,7 +46,7 @@ qiuguo:
checktoken:
url: https://qiuguo-app.pre.qiuguojihua.com/pre-api/user/user/getUser
tts:
- #suanfa: true #nacos控制变化
+ suanfa: true #nacos控制变化
url: http://192.168.8.211:18000/run/predict #算法语音合成
lac:
#type: suanfa #nacos控制变化
diff --git a/iot-modules/pom.xml b/iot-modules/pom.xml
index 6ff50c5..360af0b 100644
--- a/iot-modules/pom.xml
+++ b/iot-modules/pom.xml
@@ -104,6 +104,13 @@
spring-webmvc
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+ ${caffeine.version}
+
+