增加文本最后一个标识

This commit is contained in:
wulin 2023-10-17 19:44:52 +08:00
parent a067db6e3f
commit cc208363d6
6 changed files with 70 additions and 32 deletions

View File

@ -11,6 +11,7 @@ import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse;
import com.qiuguo.iot.third.service.IQianWen;
import io.reactivex.functions.Consumer;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -56,7 +57,7 @@ public class AliYunQianWen {
public Mono<Boolean> sendMessage(String msg,
Consumer<? super String> onNext,
IQianWen<? super String> onNext,
QWenReplyResponse qwenReplyResponse) {
if(!canAsk){
msgManager = new MessageManager(10);
@ -82,7 +83,7 @@ public class AliYunQianWen {
public void onEvent(GenerationResult message) {
try {
onNext.accept(message.getOutput().getChoices().get(0).getMessage().getContent());
onNext.sendMessage(message.getOutput().getChoices().get(0).getMessage().getContent());
if(lastGenerationResult != null) {
lastGenerationResult.getOutput().getChoices().get(0).getMessage().setContent(
lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent()
@ -106,11 +107,7 @@ public class AliYunQianWen {
if(lastGenerationResult != null){
qwenReplyResponse.setResut(lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent());
}
try {
onNext.accept("");
} catch (Exception e) {
log.info("千问最后调用结束时异常{}", e);
}
onNext.finish();
lastGenerationResult = null;
semaphore.release();
}

View File

@ -0,0 +1,7 @@
package com.qiuguo.iot.third.service;
public interface IQianWen<T> {
void sendMessage(T var);
void finish();
}

View File

@ -37,7 +37,7 @@ public class QWenService {
protected static ConcurrentHashMap<String, AliYunQianWen> qianwenGroup = new ConcurrentHashMap<>();
public Mono<QWenReplyResponse> communication(TongYiCommunicationRest rest, Consumer<? super String> onNext){
public Mono<QWenReplyResponse> communication(TongYiCommunicationRest rest, IQianWen<? super String> onNext){
AliYunQianWen aliQianWen = null;
if (!qianwenGroup.containsKey(rest.getOnlyId())) {
aliQianWen = new AliYunQianWen(SpringUtil.getProperty("Ali.qianwen"));

View File

@ -19,4 +19,9 @@ public class BaseMessageResp {
* 在线播放音乐信息
*/
protected MusicResp music;
/**
* 文本推送情况
*/
protected TTSResp tts = new TTSResp();
}

View File

@ -0,0 +1,11 @@
package com.qiuguo.iot.box.websocket.api.domain;
import lombok.Data;
@Data
public class TTSResp {
/**
* 1标识最后一个文本0标识播放中
*/
Integer status = 1;
}

View File

@ -146,32 +146,41 @@ public class BaseWebSocketProcess {
tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString());
}
StringBuilder sb = new StringBuilder();
return qwenService.communication(tongYiCommunicationRest, message ->{
//通知到客户端
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
if(tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())){
String old = sb.toString() + message;
int d = old.lastIndexOf("");
int j = old.lastIndexOf("");
int a = old.lastIndexOf("");
int b = old.lastIndexOf("\n");
int c = old.lastIndexOf("");
int n = old.lastIndexOf("\\n");
int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n);
if(m > 0){
//清空
sb.setLength(0);
sb.append(old.substring(m));
old = old.substring(0, m);
normalSendMsg(baseSession, old, type);
}else{
sb.append(message);
}
return qwenService.communication(tongYiCommunicationRest, new IQianWen<String>() {
@Override
public void sendMessage(String message) {
//通知到客户端
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) {
String old = sb.toString() + message;
int d = old.lastIndexOf("");
int j = old.lastIndexOf("");
int a = old.lastIndexOf("");
int b = old.lastIndexOf("\n");
int c = old.lastIndexOf("");
int n = old.lastIndexOf("\\n");
int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n);
if (m > 0) {
//清空
sb.setLength(0);
sb.append(old.substring(m));
old = old.substring(0, m);
normalSendMsg(baseSession, old, type, YesNo.NO.getCode());
} else {
sb.append(message);
}
return;
return;
}
log.info("已经有新的请求不在推送到客户端SN{} userId:{}", baseSession.getSn(), baseSession.getUserId());
MDC.remove(Log4Constans.PRINT_LOG_ID);
}
@Override
public void finish() {
log.info("千问最后调用finish");
normalSendMsg(baseSession, sb.toString(), type);
}
log.info("已经有新的请求不在推送到客户端SN{} userId:{}", baseSession.getSn(), baseSession.getUserId());
MDC.remove(Log4Constans.PRINT_LOG_ID);
}).flatMap(data ->{
if(data.getCode() == 200){
log.info("千问正常结束");
@ -686,6 +695,15 @@ public class BaseWebSocketProcess {
sendMsg(baseSession, msg);
}
protected void normalSendMsg(BaseSession baseSession, String message, Integer type, Integer finish){
BoxMessageResp resp = new BoxMessageResp();
resp.setType(type);
resp.setText(message);
resp.getTts().setStatus(finish);
String msg = JSONObject.toJSONString(resp);
sendMsg(baseSession, msg);
}
private void sendMsg(BaseSession baseSession, String msg) {
log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg);
baseSession.getSink().next(baseSession.getSession().textMessage(msg));