解决合并代码出现的问题

This commit is contained in:
wulin 2023-12-05 13:25:14 +08:00
parent 8c7af667d6
commit a985dace9c

View File

@ -1,189 +0,0 @@
package com.qiuguo.iot.box.websocket.api.command;
import cn.hutool.extra.spring.SpringUtil;
import com.qiuguo.iot.base.constans.Log4Constans;
import com.qiuguo.iot.base.enums.*;
import com.qiuguo.iot.base.utils.StringUtils;
import com.qiuguo.iot.box.websocket.api.domain.BaseSession;
import com.qiuguo.iot.box.websocket.api.domain.QueueMessage;
import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession;
import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration;
import com.qiuguo.iot.box.websocket.api.service.BaseWebSocketService;
import com.qiuguo.iot.data.constants.YunxiRabbitConst;
import com.qiuguo.iot.data.request.qwen.TongYiCommunicationRest;
import com.qiuguo.iot.data.service.mq.MqService;
import com.qiuguo.iot.third.nlp.action.Action;
import com.qiuguo.iot.third.nlp.action.Actions;
import com.qiuguo.iot.third.service.*;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import javax.annotation.Resource;
@Slf4j
public abstract class ActionCommand {
@Resource
protected QWenService qwenService;
@Resource
protected MqService mqService;
@Resource
BaseWebSocketService baseWebSocketService;
private static Mono<Boolean> process(Actions actions, int i, BaseSession baseSession){
if(i >= 0){
Action action = actions.getActions().get(i--);
return process(actions, i, baseSession).flatMap(v -> {
IActionCommand actionCommand;
if(action.getSystemTalkAnswerConfigEntity() == null ||
StringUtils.isEmpty(action.getSystemTalkAnswerConfigEntity().getBeanName())){
actionCommand = SpringUtil.getBean("qianWenActionCommand");
}else{
actionCommand = SpringUtil.getBean(action.getSystemTalkAnswerConfigEntity().getBeanName());
}
return actionCommand.process(action, baseSession);
});
}
return Mono.just(false);
}
public static Mono<Void> processAction(Actions actions, BaseSession baseSession) {
if(actions.getActions() == null || actions.getActions().size() == 0){
//调用千问回答\
log.info("调用千问{}", actions.getRecordText());
IActionCommand actionCommand = SpringUtil.getBean("qianWenActionCommand");
Action action = new Action();
action.setAsk(actions.getRecordText());
action.setAction(actions.getRecordText());
return actionCommand.process(action, baseSession).flatMap(vo ->{
return Mono.empty();
});
}
return process(actions, actions.getActions().size() - 1, baseSession).flatMap(vo ->{
return Mono.empty();
});
}
private Mono<Boolean> sendMessage(BaseSession baseSession, QueueMessage queue, StringBuilder sb, Integer type){
if(baseSession.getRequestId().equals(queue.getRequestId())){
String message = "";
if(queue.getQueue().size() > 0){
message = queue.getQueue().poll();
message = baseWebSocketService.getSendStr(sb, message, false);
}else if(queue.getStatus() == YesNo.NO.getCode().intValue()){
if(sb.length() == 0){
//结束了
log.info("发送结束了请求id{}", queue.getRequestId());
return Mono.empty();
}
message = sb.toString();
message = baseWebSocketService.removeStringChars(message);
sb.setLength(0);
}else{
try{
Thread.sleep(50);
}catch(Exception e){
log.info("等信息信息休息异常{}", e);
}
}
if(StringUtils.isNotEmpty(message)){
return baseWebSocketService.normalSendMsg(baseSession, message, type, YesNo.NO.getCode()).flatMap(m -> {
return sendMessage(baseSession, queue, sb, type);
});
}
return sendMessage(baseSession, queue, sb, type);
}
return Mono.empty();
}
protected Mono<Void> setQueueMessage(BaseSession baseSession, QueueMessage queue, Integer type){
return Mono.defer(() -> {
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
StringBuilder sb = new StringBuilder();
return sendMessage(baseSession, queue, sb, type).flatMap(m -> {
MDC.remove(Log4Constans.PRINT_LOG_ID);
return Mono.empty();
});
});
}
protected Mono<Void> toQianWen(Action action, BaseSession baseSession, Integer type){
log.info("调用千问{}", action.getAsk());
TongYiCommunicationRest tongYiCommunicationRest = new TongYiCommunicationRest();
tongYiCommunicationRest.setText(action.getAsk());
tongYiCommunicationRest.setStatus("2");
tongYiCommunicationRest.setRequestId(baseSession.getRequestId());
if(baseSession instanceof BoxSession){
tongYiCommunicationRest.setOnlyId(baseSession.getSn());
}else{
tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString());
}
QueueMessage queueMessage = new QueueMessage();
queueMessage.setRequestId(baseSession.getRequestId());
return qwenService.communication(tongYiCommunicationRest, new IQianWen<String>() {
@Override
public void sendMessage(String message) {
//通知到客户端
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) {
queueMessage.getQueue().add(message);
if(queueMessage.getStatus() == YesNo.YES.getCode().intValue()){
queueMessage.setStatus(2);
setQueueMessage(baseSession, queueMessage, type).subscribeOn(Schedulers.single()).subscribe();
}
return;
}
log.info("已有新的请求不推送客户端SN{} userId:{}", baseSession.getSn(), baseSession.getUserId());
}
@Override
public void finish() {
log.info("千问最后调用finish");
queueMessage.setStatus(YesNo.NO.getCode());
MDC.remove(Log4Constans.PRINT_LOG_ID);
}
}).flatMap(data ->{
queueMessage.setStatus(YesNo.NO.getCode());
if(data.getCode() == 200){
log.info("千问正常结束");
//保存记录
return baseWebSocketService.saveTalkRecord(baseSession, action, data.getResut()).flatMap(i -> {
return Mono.empty();
});
}else{
return baseWebSocketService.sendMessage(action, baseSession, "很抱歉,我无法回答您的问题,请换一个问题。", AskTypeEnum.NONE.getCode()).flatMap(b ->{
return Mono.empty();
});
}
})/*.subscribeOn(Schedulers.boundedElastic()).subscribe()*/;
}
protected Mono<Boolean> sendMq(String msg){
log.info("通知U3DMQ:{}", msg);
try{
//发送消息到MQ通知U3D
return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT,
YunxiRabbitConst.ROUTE_KEY_YUNXI,
msg,
2);
}catch (Exception e){
log.info("通知U3D MQ异常{}", e);
}
//不调用empty防止影响正常业务
return Mono.just(false);
}
}