diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java deleted file mode 100644 index 4205b87..0000000 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java +++ /dev/null @@ -1,189 +0,0 @@ -package com.qiuguo.iot.box.websocket.api.command; - -import cn.hutool.extra.spring.SpringUtil; -import com.qiuguo.iot.base.constans.Log4Constans; -import com.qiuguo.iot.base.enums.*; -import com.qiuguo.iot.base.utils.StringUtils; -import com.qiuguo.iot.box.websocket.api.domain.BaseSession; -import com.qiuguo.iot.box.websocket.api.domain.QueueMessage; -import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession; -import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration; -import com.qiuguo.iot.box.websocket.api.service.BaseWebSocketService; -import com.qiuguo.iot.data.constants.YunxiRabbitConst; -import com.qiuguo.iot.data.request.qwen.TongYiCommunicationRest; -import com.qiuguo.iot.data.service.mq.MqService; -import com.qiuguo.iot.third.nlp.action.Action; -import com.qiuguo.iot.third.nlp.action.Actions; -import com.qiuguo.iot.third.service.*; -import lombok.extern.slf4j.Slf4j; -import org.slf4j.MDC; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -import javax.annotation.Resource; - -@Slf4j -public abstract class ActionCommand { - - - @Resource - protected QWenService qwenService; - - @Resource - protected MqService mqService; - - @Resource - BaseWebSocketService baseWebSocketService; - - - - private static Mono process(Actions actions, int i, BaseSession baseSession){ - if(i >= 0){ - Action action = actions.getActions().get(i--); - - return process(actions, i, baseSession).flatMap(v -> { - IActionCommand actionCommand; - if(action.getSystemTalkAnswerConfigEntity() == null || - StringUtils.isEmpty(action.getSystemTalkAnswerConfigEntity().getBeanName())){ - actionCommand = SpringUtil.getBean("qianWenActionCommand"); - }else{ - actionCommand = SpringUtil.getBean(action.getSystemTalkAnswerConfigEntity().getBeanName()); - } - - return actionCommand.process(action, baseSession); - }); - } - return Mono.just(false); - } - public static Mono processAction(Actions actions, BaseSession baseSession) { - if(actions.getActions() == null || actions.getActions().size() == 0){ - //调用千问回答\ - log.info("调用千问{}", actions.getRecordText()); - IActionCommand actionCommand = SpringUtil.getBean("qianWenActionCommand"); - Action action = new Action(); - action.setAsk(actions.getRecordText()); - action.setAction(actions.getRecordText()); - return actionCommand.process(action, baseSession).flatMap(vo ->{ - return Mono.empty(); - }); - } - return process(actions, actions.getActions().size() - 1, baseSession).flatMap(vo ->{ - return Mono.empty(); - }); - } - - private Mono sendMessage(BaseSession baseSession, QueueMessage queue, StringBuilder sb, Integer type){ - if(baseSession.getRequestId().equals(queue.getRequestId())){ - - String message = ""; - if(queue.getQueue().size() > 0){ - message = queue.getQueue().poll(); - message = baseWebSocketService.getSendStr(sb, message, false); - }else if(queue.getStatus() == YesNo.NO.getCode().intValue()){ - if(sb.length() == 0){ - //结束了 - log.info("发送结束了,请求id:{}", queue.getRequestId()); - return Mono.empty(); - } - message = sb.toString(); - message = baseWebSocketService.removeStringChars(message); - sb.setLength(0); - }else{ - try{ - Thread.sleep(50); - }catch(Exception e){ - log.info("等信息信息休息异常{}", e); - } - } - if(StringUtils.isNotEmpty(message)){ - return baseWebSocketService.normalSendMsg(baseSession, message, type, YesNo.NO.getCode()).flatMap(m -> { - return sendMessage(baseSession, queue, sb, type); - }); - } - return sendMessage(baseSession, queue, sb, type); - } - return Mono.empty(); - } - - protected Mono setQueueMessage(BaseSession baseSession, QueueMessage queue, Integer type){ - return Mono.defer(() -> { - MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); - StringBuilder sb = new StringBuilder(); - return sendMessage(baseSession, queue, sb, type).flatMap(m -> { - MDC.remove(Log4Constans.PRINT_LOG_ID); - return Mono.empty(); - }); - - }); - - } - - protected Mono toQianWen(Action action, BaseSession baseSession, Integer type){ - log.info("调用千问{}", action.getAsk()); - TongYiCommunicationRest tongYiCommunicationRest = new TongYiCommunicationRest(); - tongYiCommunicationRest.setText(action.getAsk()); - tongYiCommunicationRest.setStatus("2"); - tongYiCommunicationRest.setRequestId(baseSession.getRequestId()); - if(baseSession instanceof BoxSession){ - tongYiCommunicationRest.setOnlyId(baseSession.getSn()); - }else{ - tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString()); - } - - - QueueMessage queueMessage = new QueueMessage(); - queueMessage.setRequestId(baseSession.getRequestId()); - return qwenService.communication(tongYiCommunicationRest, new IQianWen() { - @Override - public void sendMessage(String message) { - //通知到客户端 - MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); - if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) { - queueMessage.getQueue().add(message); - if(queueMessage.getStatus() == YesNo.YES.getCode().intValue()){ - queueMessage.setStatus(2); - setQueueMessage(baseSession, queueMessage, type).subscribeOn(Schedulers.single()).subscribe(); - } - return; - } - log.info("已有新的请求,不推送客户端SN:{} userId:{}", baseSession.getSn(), baseSession.getUserId()); - } - - @Override - public void finish() { - log.info("千问最后调用finish"); - queueMessage.setStatus(YesNo.NO.getCode()); - MDC.remove(Log4Constans.PRINT_LOG_ID); - } - }).flatMap(data ->{ - queueMessage.setStatus(YesNo.NO.getCode()); - if(data.getCode() == 200){ - log.info("千问正常结束"); - //保存记录 - return baseWebSocketService.saveTalkRecord(baseSession, action, data.getResut()).flatMap(i -> { - return Mono.empty(); - }); - }else{ - return baseWebSocketService.sendMessage(action, baseSession, "很抱歉,我无法回答您的问题,请换一个问题。", AskTypeEnum.NONE.getCode()).flatMap(b ->{ - return Mono.empty(); - }); - } - })/*.subscribeOn(Schedulers.boundedElastic()).subscribe()*/; - } - - protected Mono sendMq(String msg){ - log.info("通知U3DMQ:{}", msg); - try{ - //发送消息到MQ,通知U3D - return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, - YunxiRabbitConst.ROUTE_KEY_YUNXI, - msg, - 2); - - }catch (Exception e){ - log.info("通知U3D MQ异常{}", e); - } - //不调用empty,防止影响正常业务 - return Mono.just(false); - } -}