完成算法语音合成调用队列是调用

This commit is contained in:
wulin 2023-10-26 13:47:35 +08:00
parent 58608ab19b
commit e9cc91e87b
7 changed files with 139 additions and 34 deletions

View File

@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import lombok.extern.slf4j.Slf4j;

View File

@ -81,13 +81,6 @@
<scope>compile</scope>
</dependency>
<!-- nacos 动态配置用到,注意版本号 -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nls</groupId>
<artifactId>nls-sdk-common</artifactId>

View File

@ -10,6 +10,8 @@ import com.qiuguo.iot.box.websocket.api.domain.BaseSession;
import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.box.resp.BoxMessageResp;
import com.qiuguo.iot.box.websocket.api.domain.user.UserSession;
import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration;
import com.qiuguo.iot.box.websocket.api.filter.LogWebFilter;
import com.qiuguo.iot.box.websocket.api.service.BaseWebSocketService;
import com.qiuguo.iot.data.constants.YunxiRabbitConst;
import com.qiuguo.iot.data.entity.device.DeviceUserTalkRecordEntity;
@ -29,6 +31,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import javax.annotation.Resource;
import java.util.LinkedList;
@ -84,15 +87,50 @@ public abstract class ActionCommand {
});
}
public void sendMessage(BaseSession baseSession, String message, Integer type) {}
private Mono<Boolean> sendMessage(BaseSession baseSession, QueueMessage queue, StringBuilder sb, Integer type){
if(baseSession.getRequestId().equals(queue.getRequestId())){
protected void setQueueMessage(BaseSession baseSession, Queue<String> queue, Integer type){
if(queue == null){
queue = new LinkedList<>();
String message = "";
if(queue.getQueue().size() > 0){
message = queue.getQueue().poll();
message = baseWebSocketService.getSendStr(sb, message);
}else if(queue.getStatus() == YesNo.NO.getCode().intValue()){
if(sb.length() == 0){
//结束了
log.info("发送结束了请求id{}", queue.getRequestId());
return Mono.empty();
}
message = sb.toString();
message = baseWebSocketService.removeStringChars(message);
sb.setLength(0);
}else{
try{
Thread.sleep(50);
}catch(Exception e){
log.info("等信息信息休息异常{}", e);
}
}
if(StringUtils.isNotEmpty(message)){
return baseWebSocketService.normalSendMsg(baseSession, message, type, YesNo.NO.getCode()).flatMap(m -> {
return sendMessage(baseSession, queue, sb, type);
});
}
return sendMessage(baseSession, queue, sb, type);
}
StringBuilder sb = new StringBuilder();
String message = queue.poll();
baseWebSocketService.sendMoreMsg(baseSession, sb, message, type);
return Mono.empty();
}
protected Mono<Void> setQueueMessage(BaseSession baseSession, QueueMessage queue, Integer type){
return Mono.defer(() -> {
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, baseSession.getLogId());
StringBuilder sb = new StringBuilder();
return sendMessage(baseSession, queue, sb, type).flatMap(m -> {
MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
return Mono.empty();
});
});
}
protected Mono<Void> toQianWen(Action action, BaseSession baseSession, Integer type){
@ -107,26 +145,33 @@ public abstract class ActionCommand {
tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString());
}
Queue<String> queue = new LinkedList<String>();
QueueMessage queueMessage = new QueueMessage();
queueMessage.setRequestId(baseSession.getRequestId());
return qwenService.communication(tongYiCommunicationRest, new IQianWen<String>() {
@Override
public void sendMessage(String message) {
//通知到客户端
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) {
queue.add(message);
setQueueMessage(baseSession, queue, type);
queueMessage.getQueue().add(message);
if(queueMessage.getStatus() == YesNo.YES.getCode().intValue()){
queueMessage.setStatus(2);
setQueueMessage(baseSession, queueMessage, type).subscribeOn(Schedulers.single()).subscribe();
}
return;
}
log.info("已经有新的请求不在推送到客户端SN{} userId:{}", baseSession.getSn(), baseSession.getUserId());
log.info("有新的请求不推送客户端SN{} userId:{}", baseSession.getSn(), baseSession.getUserId());
}
@Override
public void finish() {
log.info("千问最后调用finish");
queueMessage.setStatus(YesNo.NO.getCode());
MDC.remove(Log4Constans.PRINT_LOG_ID);
}
}).flatMap(data ->{
queueMessage.setStatus(YesNo.NO.getCode());
if(data.getCode() == 200){
log.info("千问正常结束");
//保存记录

View File

@ -0,0 +1,24 @@
package com.qiuguo.iot.box.websocket.api.command;
import lombok.Data;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Data
public class QueueMessage {
/**
* 请求id
*/
Long requestId;
/**
* 要发送的消息队列
*/
Queue<String> queue = new ConcurrentLinkedQueue<>();
/**
* 状态 1 开始 2进行中 0 结束
*/
int status = 1;
}

View File

@ -101,7 +101,7 @@ public class BaseWebSocketService {
* @param message
* @return
*/
protected String getSendStr(StringBuilder sb, String message){
public String getSendStr(StringBuilder sb, String message){
String old = sb.toString() + message;
int d = old.lastIndexOf("");
int j = old.lastIndexOf("");
@ -112,6 +112,7 @@ public class BaseWebSocketService {
int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n);
if (m > 0) {
//清空
m++;
String msg = old.substring(0, m);
if(msg.replace(" ", "").length() > 0){
//纯空格的不推送
@ -138,16 +139,6 @@ public class BaseWebSocketService {
return baseSession.getSession().close();
}
public void sendMoreMsg(BaseSession baseSession,
StringBuilder sb,
String message, int type){
message = getSendStr(sb, message);
if(StringUtils.isNotEmpty(message)){
normalSendMsg(baseSession, message, type, YesNo.NO.getCode());
}
}
public void sendMsg(BaseSession baseSession, String msg) {
log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg);
baseSession.getSink().next(baseSession.getSession().textMessage(msg));
@ -201,16 +192,16 @@ public class BaseWebSocketService {
sendMsg(baseSession, resp);
}
public void normalSendMsg(BaseSession baseSession, String message, Integer type, Integer finish){
public Mono<Boolean> normalSendMsg(BaseSession baseSession, String message, Integer type, Integer finish){
BoxMessageResp resp = new BoxMessageResp();
resp.setType(type);
resp.setText(message);
resp.getTts().setStatus(finish);
sendMsg(baseSession, resp);
return sendMsgWithMono(baseSession, resp);
}
private String removeStringChars(String text){
public String removeStringChars(String text){
text = text.replace("\n", "").replace("\t", "");
if(text.startsWith("") ||
text.startsWith("") ||
@ -236,6 +227,50 @@ public class BaseWebSocketService {
return text;
}
public Mono<Boolean> sendMsgWithMono(BaseSession baseSession, String msg) {
return Mono.defer(() -> {
log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg);
baseSession.getSink().next(baseSession.getSession().textMessage(msg));
return Mono.just(true);
});
}
public Mono<Boolean> sendMsgWithMono(BaseSession baseSession, BaseMessageResp baseMessageResp) {
if(baseSession instanceof BoxSession){
log.info("果box聊天记录同步到客户端");
BaseSession userSession = getUserSessionWithUserId(baseSession.getUserId());
if(userSession != null){
sendMsg(userSession, baseMessageResp);
}
if(suanfa){
String text = removeStringChars(baseMessageResp.getText());
if(text.length() > ONE_MAX_TEXT){
StringBuilder builder = new StringBuilder();
return sendAudioMessage(baseSession,
baseMessageResp,
builder,
text,
0,
text.length() - 1,
baseSession.getRequestId()).flatMap(s -> {
return Mono.just(true);
});
}else{
BoxMessageResp boxMessageResp = new BoxMessageResp();
BeanUtils.copyProperties(baseMessageResp, boxMessageResp);
boxMessageResp.setText(text);
return sendAudioMessage(baseSession, boxMessageResp).flatMap(s -> {
return Mono.just(true);
});
}
}
}
return sendMsgWithMono(baseSession, JSONObject.toJSONString(baseMessageResp));
}
public void sendMsg(BaseSession baseSession, BaseMessageResp baseMessageResp) {
if(baseSession instanceof BoxSession){
log.info("果box聊天记录同步到客户端");
@ -326,7 +361,7 @@ public class BaseWebSocketService {
sendMsg(baseSession, JSONObject.toJSONString(boxMessageResp));
return Mono.just("");
}
return audioService.getAudioUrl(boxMessageResp.getText() + "").map(s ->{
return audioService.getAudioUrl(boxMessageResp.getText()).map(s ->{
boxMessageResp.setAudio(s);
sendMsg(baseSession, JSONObject.toJSONString(boxMessageResp));

View File

@ -46,7 +46,7 @@ qiuguo:
checktoken:
url: https://qiuguo-app.pre.qiuguojihua.com/pre-api/user/user/getUser
tts:
#suanfa: true #nacos控制变化
suanfa: true #nacos控制变化
url: http://192.168.8.211:18000/run/predict #算法语音合成
lac:
#type: suanfa #nacos控制变化

View File

@ -104,6 +104,13 @@
<artifactId>spring-webmvc</artifactId>
</dependency>
<!-- nacos 动态配置用到,注意版本号 -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
</dependencies>
</project>