完成自定义长文本推流的实现

This commit is contained in:
wulin 2023-10-21 17:59:43 +08:00
parent e244641409
commit 2b546f0c05
6 changed files with 150 additions and 57 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@ -0,0 +1,4 @@
package com.qiuguo.iot.third.nlp;
public class LacWrapper {
}

View File

@ -45,7 +45,7 @@ public class BaseSession {
protected MusicResp music; protected MusicResp music;
/** /**
* 调用千问请求序号每次确定调用前++ * 问题Id每次++
*/ */
protected Long requestId = 0L; protected Long requestId = 0L;
} }

View File

@ -47,6 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -93,6 +94,8 @@ public class BaseWebSocketProcess {
@Value("${tts.suanfa}") @Value("${tts.suanfa}")
boolean ttsSuanfa; boolean ttsSuanfa;
private static int ONE_MAX_TEXT = 30;
@Autowired @Autowired
protected SystemTalkBindU3dService systemTalkBindU3dService; protected SystemTalkBindU3dService systemTalkBindU3dService;
@ -104,10 +107,41 @@ public class BaseWebSocketProcess {
protected static String apiType = "api-type"; protected static String apiType = "api-type";
protected static String apiToken = "api-token"; protected static String apiToken = "api-token";
private String getSendStr(StringBuilder sb, String message){
String old = sb.toString() + message;
int d = old.lastIndexOf("");
int j = old.lastIndexOf("");
int a = old.lastIndexOf("");
int b = old.lastIndexOf("\n");
int c = old.lastIndexOf("");
int n = old.lastIndexOf("\\n");
int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n);
if (m > 0) {
//清空
String msg = old.substring(0, m);
if(msg.replace(" ", "").length() > 0){
//纯空格的不推送
sb.setLength(0);
sb.append(old.substring(m));
return msg;
}
}
sb.append(message);
return null;
}
private void sendMoreMsg(BaseSession baseSession, StringBuilder sb, String message, int type){
message = getSendStr(sb, message);
if(StringUtils.isNotEmpty(message)){
normalSendMsg(baseSession, message, type, YesNo.NO.getCode());
}
}
private Mono<Void> toQianWen(Action action, BaseSession baseSession, Integer type){ private Mono<Void> toQianWen(Action action, BaseSession baseSession, Integer type){
baseSession.setRequestId(baseSession.getRequestId() + 1); //baseSession.setRequestId(baseSession.getRequestId() + 1);
TongYiCommunicationRest tongYiCommunicationRest = new TongYiCommunicationRest(); TongYiCommunicationRest tongYiCommunicationRest = new TongYiCommunicationRest();
tongYiCommunicationRest.setText(action.getAsk()); tongYiCommunicationRest.setText(action.getAsk());
tongYiCommunicationRest.setStatus("2"); tongYiCommunicationRest.setStatus("2");
@ -124,29 +158,8 @@ public class BaseWebSocketProcess {
//通知到客户端 //通知到客户端
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) { if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) {
String old = sb.toString() + message; //测试后决定是否需要
int d = old.lastIndexOf(""); sendMoreMsg(baseSession, sb, message, type);
int j = old.lastIndexOf("");
int a = old.lastIndexOf("");
int b = old.lastIndexOf("\n");
int c = old.lastIndexOf("");
int n = old.lastIndexOf("\\n");
int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n);
if (m > 0) {
//清空
String msg = old.substring(0, m);
if(msg.replace(" ", "").length() > 0){
//纯空格的不推送
sb.setLength(0);
sb.append(old.substring(m));
normalSendMsg(baseSession, msg, type, YesNo.NO.getCode());
return;
}
}
sb.append(message);
return; return;
} }
log.info("已经有新的请求不在推送到客户端SN{} userId:{}", baseSession.getSn(), baseSession.getUserId()); log.info("已经有新的请求不在推送到客户端SN{} userId:{}", baseSession.getSn(), baseSession.getUserId());
@ -167,14 +180,7 @@ public class BaseWebSocketProcess {
if(data.getCode() == 200){ if(data.getCode() == 200){
log.info("千问正常结束"); log.info("千问正常结束");
//保存记录 //保存记录
DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity(); return saveTalkRecord(baseSession, action, data.getResut()).flatMap(i -> {
talkRecord.setAskType(AskTypeEnum.TTS.getCode());
talkRecord.setAskValue(action.getAsk());
talkRecord.setAskKey(action.getAction());
talkRecord.setAnswerValue(data.getResut());
talkRecord.setUserId(baseSession.getUserId());
talkRecord.setDeviceId(baseSession.getDeviceId());
return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(b -> {
return Mono.empty(); return Mono.empty();
}); });
}else{ }else{
@ -185,6 +191,17 @@ public class BaseWebSocketProcess {
})/*.subscribeOn(Schedulers.boundedElastic()).subscribe()*/; })/*.subscribeOn(Schedulers.boundedElastic()).subscribe()*/;
} }
private Mono<Integer> saveTalkRecord(BaseSession baseSession, Action action, String text){
DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity();
talkRecord.setAskType(AskTypeEnum.TTS.getCode());
talkRecord.setAskValue(action.getAsk());
talkRecord.setAskKey(action.getAction());
talkRecord.setAnswerValue(text);
talkRecord.setUserId(baseSession.getUserId());
talkRecord.setDeviceId(baseSession.getDeviceId());
return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord);
}
private Mono<Boolean> toU3DMq(Action action, SystemTalkBindDeviceEntity systemTalkBindDeviceEntity, Long metaId){ private Mono<Boolean> toU3DMq(Action action, SystemTalkBindDeviceEntity systemTalkBindDeviceEntity, Long metaId){
U3dMsg u3dMsg = new U3dMsg(); U3dMsg u3dMsg = new U3dMsg();
@ -411,6 +428,7 @@ public class BaseWebSocketProcess {
return sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode()); return sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode());
} }
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.QIU_GUO.getCode())){ }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.QIU_GUO.getCode())){
return sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode()); return sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode());
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TIME.getCode())){ }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TIME.getCode())){
DateTimeFormatter df = DateTimeFormatter.ofPattern(action.getSystemTalkAnswerConfigEntity().getAnswerValue()); DateTimeFormatter df = DateTimeFormatter.ofPattern(action.getSystemTalkAnswerConfigEntity().getAnswerValue());
@ -639,13 +657,7 @@ public class BaseWebSocketProcess {
} }
} }
return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(i ->{ return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(i ->{
if(this instanceof BoxWebSocketHandler){
log.info("果box聊天记录同步到客户端");
BaseSession userSession = getUserSessionWithUserId(baseSession.getUserId());
if(userSession != null){
sendMsg(userSession, resp);
}
}
sendMsg(baseSession, resp); sendMsg(baseSession, resp);
return Mono.just(true); return Mono.just(true);
@ -687,28 +699,103 @@ public class BaseWebSocketProcess {
} }
private void sendMsg(BaseSession baseSession, BaseMessageResp baseMessageResp) { private void sendMsg(BaseSession baseSession, BaseMessageResp baseMessageResp) {
if(ttsSuanfa && this instanceof BoxWebSocketHandler){ if(this instanceof BoxWebSocketHandler){
String text = baseMessageResp.getText().replace("\n", "").replace("\t", ""); log.info("果box聊天记录同步到客户端");
if(text.startsWith("") || BaseSession userSession = getUserSessionWithUserId(baseSession.getUserId());
text.startsWith("") || if(userSession != null){
text.startsWith("") || sendMsg(userSession, baseMessageResp);
text.startsWith("") || }
text.startsWith("") || if(ttsSuanfa){
text.startsWith(" ")){ String text = baseMessageResp.getText().replace("\n", "").replace("\t", "");
//标点符号起始会导致合成的声音第一句话有杂音 if(text.startsWith("") ||
text = text.substring(1); text.startsWith("") ||
text.startsWith("") ||
text.startsWith("") ||
text.startsWith("") ||
text.startsWith(" ")){
//标点符号起始会导致合成的声音第一句话有杂音
text = text.substring(1);
}
if(text.length() > ONE_MAX_TEXT){
StringBuilder builder = new StringBuilder();
sendAudioMessage(baseSession,
baseMessageResp,
builder,
text,
0,
text.length() - 1,
baseSession.getRequestId()).subscribe();
}else{
BoxMessageResp boxMessageResp = new BoxMessageResp();
BeanUtils.copyProperties(baseMessageResp, boxMessageResp);
sendAudioMessage(baseSession, boxMessageResp).subscribe();
}
} }
audioService.getAudioUrl(text + "").map(s ->{
BoxMessageResp boxMessageResp = new BoxMessageResp();
BeanUtils.copyProperties(baseMessageResp, boxMessageResp);
boxMessageResp.setAudio(s);
sendMsg(baseSession, JSONObject.toJSONString(boxMessageResp));
return s;
}).subscribeOn(Schedulers.single()).subscribe();
}else{ }else{
sendMsg(baseSession, JSONObject.toJSONString(baseMessageResp)); sendMsg(baseSession, JSONObject.toJSONString(baseMessageResp));
} }
}
/**
* 分批按顺序发送超长ONE_MAX_TEXT文本音频
* @param baseSession
* @param baseMessageResp
* @param builder
* @param text
* @param n
* @param length
* @return
*/
private Mono<String> sendAudioMessage(BaseSession baseSession,
BaseMessageResp baseMessageResp,
StringBuilder builder,
String text,
int n,
int length,
Long requestId){
if(n < length && baseSession.getRequestId().equals(requestId)){
n += ONE_MAX_TEXT;
if(n > length){
n = length;
}
String message = text.substring(n - ONE_MAX_TEXT, n);
int status = 0;
if(n == length){
message += "";
status = 1;
builder.setLength(0);
}
message = getSendStr(builder, message);
int m = n;
if(StringUtils.isNotEmpty(message)){
BoxMessageResp boxMessageResp = new BoxMessageResp();
BeanUtils.copyProperties(baseMessageResp, boxMessageResp);
boxMessageResp.setText(message);
boxMessageResp.getTts().setStatus(status);
return sendAudioMessage(baseSession, boxMessageResp).flatMap(s -> {
return sendAudioMessage(baseSession, baseMessageResp, builder, text, m, length, requestId);
});
}else if(n < length){
return sendAudioMessage(baseSession, baseMessageResp, builder, text, m, length, requestId);
}
}
return Mono.just("");
}
/**
* 一次性发送文本
* @param baseSession
* @param boxMessageResp
* @return
*/
private Mono<String> sendAudioMessage(BaseSession baseSession, BoxMessageResp boxMessageResp){
return audioService.getAudioUrl(boxMessageResp.getText() + "").map(s ->{
boxMessageResp.setAudio(s);
sendMsg(baseSession, JSONObject.toJSONString(boxMessageResp));
return s;
});//.subscribeOn(Schedulers.single()).subscribe();
} }
private void sendMsg(BaseSession baseSession, String msg) { private void sendMsg(BaseSession baseSession, String msg) {

View File

@ -133,6 +133,7 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
} }
log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage()); log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage());
return nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).flatMap(actions -> { return nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).flatMap(actions -> {
boxSession.setRequestId(boxSession.getRequestId() + 1);
return processAction(actions, boxSession); return processAction(actions, boxSession);
//return Mono.empty(); //return Mono.empty();
}); });

View File

@ -121,6 +121,7 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
return nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage()) return nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage())
.defaultIfEmpty(new Actions()).flatMap(actions -> { .defaultIfEmpty(new Actions()).flatMap(actions -> {
//处理 //处理
userSession.setRequestId(userSession.getRequestId() + 1);
return processAction(actions, userSession); return processAction(actions, userSession);
}); });