diff --git a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java index 8e50d3f..500f8aa 100644 --- a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java @@ -30,21 +30,20 @@ public class MqService { if (ack) { // 消息发送成功 confirmationResult.set(true); - log.info("MQ消息发送成功"); + //log.info("MQ消息发送成功"); } else { // 消息发送失败 confirmationResult.set(false); - log.info("MQ消息发送失败"); + //log.info("MQ消息发送失败"); } }); } public Mono sendMessageWithConfirmation(String exchange, String routingKey, Object message) { - rabbitTemplate.convertAndSend(exchange, routingKey, message); - - return Mono.defer(() -> { + rabbitTemplate.convertAndSend(exchange, routingKey, message); boolean result = confirmationResult.get(); + log.info("MQ消息发送:{}", result); return Mono.just(result); }); } diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunQianWen.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunQianWen.java index 6788c46..0d35635 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunQianWen.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/nlp/AliYunQianWen.java @@ -14,6 +14,7 @@ import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse; import io.reactivex.functions.Consumer; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; import java.util.concurrent.Semaphore; @@ -54,9 +55,9 @@ public class AliYunQianWen { } - public void sendMessage(String msg, - Consumer onNext, - QWenReplyResponse qwenReplyResponse) throws NoApiKeyException, InputRequiredException, InterruptedException { + public Mono sendMessage(String msg, + Consumer onNext, + QWenReplyResponse qwenReplyResponse) { if(!canAsk){ msgManager = new MessageManager(10); } @@ -73,52 +74,63 @@ public class AliYunQianWen { qwenParam.setMessages(msgManager.get()); Semaphore semaphore = new Semaphore(0); - gen.streamCall(qwenParam, new ResultCallback() { - @Override - public void onEvent(GenerationResult message) { + qwenReplyResponse.setCode(200); + return Mono.defer(() -> { + try { + gen.streamCall(qwenParam, new ResultCallback() { + @Override + public void onEvent(GenerationResult message) { - try { - onNext.accept(message.getOutput().getChoices().get(0).getMessage().getContent()); - if(lastGenerationResult != null) { - lastGenerationResult.getOutput().getChoices().get(0).getMessage().setContent( - lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent() - + message.getOutput().getChoices().get(0).getMessage().getContent() - ); + try { + onNext.accept(message.getOutput().getChoices().get(0).getMessage().getContent()); + if(lastGenerationResult != null) { + lastGenerationResult.getOutput().getChoices().get(0).getMessage().setContent( + lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent() + + message.getOutput().getChoices().get(0).getMessage().getContent() + ); - }else{ - lastGenerationResult = message; - msgManager.add(lastGenerationResult); - canAsk = !canAsk; + }else{ + lastGenerationResult = message; + msgManager.add(lastGenerationResult); + canAsk = !canAsk; + } + } catch (Exception e) { + log.info("千问回调异常{}", e); + qwenReplyResponse.setCode(500); + msgManager = new MessageManager(10); + } } - } catch (Exception e) { - log.info("千问回调异常{}", e); - qwenReplyResponse.setCode(500); - msgManager = new MessageManager(10); - } - } - @Override - public void onComplete() { - if(lastGenerationResult != null){ - qwenReplyResponse.setResut(lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent()); - } - try { - onNext.accept(""); - } catch (Exception e) { - log.info("千问最后调用结束时异常{}", e); - } - lastGenerationResult = null; - semaphore.release(); - } + @Override + public void onComplete() { + if(lastGenerationResult != null){ + qwenReplyResponse.setResut(lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent()); + } + try { + onNext.accept("。"); + } catch (Exception e) { + log.info("千问最后调用结束时异常{}", e); + } + lastGenerationResult = null; + semaphore.release(); + } - @Override - public void onError(Exception e) { + @Override + public void onError(Exception e) { + log.info("千问回调异常{}", e); + msgManager = new MessageManager(10); + qwenReplyResponse.setCode(500); + semaphore.release(); + } + }); + semaphore.acquire(); + } catch (Exception e) { log.info("调用千问异常{}", e); msgManager = new MessageManager(10); qwenReplyResponse.setCode(500); - semaphore.release(); } + return Mono.just(true); }); - semaphore.acquire(); + } } diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/LacNlpService.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/LacNlpService.java index f935de8..71df5c8 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/LacNlpService.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/LacNlpService.java @@ -63,7 +63,7 @@ public class LacNlpService implements INlp { @Override public Mono geSingletNlp(String value) { - return getSuanFaLac(value); + return getHubFaLac(value); } /** diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java index 9b1bd52..c2f3f5e 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java @@ -45,19 +45,11 @@ public class QWenService { } else { aliQianWen = qianwenGroup.get(rest.getOnlyId()); } - final AliYunQianWen aliQianWen1 = aliQianWen; - return Mono.just(new QWenReplyResponse()).map(qWenReplyResponse -> { - try { - qWenReplyResponse.setCode(200); - aliQianWen1.sendMessage(rest.getText(), onNext, qWenReplyResponse); - - } catch (Exception e) { - log.info("调用千问异常{}", e); - qWenReplyResponse.setCode(500); - - } - return qWenReplyResponse; + QWenReplyResponse qWenReplyResponse = new QWenReplyResponse(); + return aliQianWen.sendMessage(rest.getText(), onNext, qWenReplyResponse).flatMap(b -> { + return Mono.just(qWenReplyResponse); }); + } public Mono communication(TongYiCommunicationRest rest){ diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java index 7ab3567..0c03dc6 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java @@ -1,6 +1,7 @@ package com.qiuguo.iot.box.websocket.api.handler; import com.alibaba.fastjson.JSONObject; +import com.qiuguo.iot.base.constans.Log4Constans; import com.qiuguo.iot.base.enums.*; import com.qiuguo.iot.base.utils.StringUtils; import com.qiuguo.iot.box.websocket.api.domain.BaseMessageResp; @@ -38,15 +39,18 @@ import com.qiuguo.iot.third.service.*; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.api.crud.entity.PagerResult; +import org.slf4j.MDC; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.reactive.socket.WebSocketSession; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import javax.annotation.Resource; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; @Slf4j @@ -130,7 +134,7 @@ public class BaseWebSocketProcess { } } - private void toQianWen(Action action, BaseSession baseSession, Integer type){ + private Mono toQianWen(Action action, BaseSession baseSession, Integer type){ baseSession.setRequestId(baseSession.getRequestId() + 1); TongYiCommunicationRest tongYiCommunicationRest = new TongYiCommunicationRest(); tongYiCommunicationRest.setText(action.getAsk()); @@ -142,24 +146,23 @@ public class BaseWebSocketProcess { tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString()); } StringBuilder sb = new StringBuilder(); - qwenService.communication(tongYiCommunicationRest, message ->{ + return qwenService.communication(tongYiCommunicationRest, message ->{ //通知到客户端 - + MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); if(tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())){ String old = sb.toString() + message; - int d = old.indexOf(","); - int j = old.indexOf("。"); - int a = old.indexOf(":"); - int b = old.indexOf("\n"); - int c = old.indexOf(";"); - int n = old.indexOf("\\n"); + int d = old.lastIndexOf(","); + int j = old.lastIndexOf("。"); + int a = old.lastIndexOf(":"); + int b = old.lastIndexOf("\n"); + int c = old.lastIndexOf(";"); + int n = old.lastIndexOf("\\n"); int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n); if(m > 0){ //清空 sb.setLength(0); sb.append(old.substring(m)); old = old.substring(0, m); - normalSendMsg(baseSession, old, type); }else{ sb.append(message); @@ -168,8 +171,8 @@ public class BaseWebSocketProcess { return; } log.info("已经有新的请求,不在推送到客户端SN:{} userId:{}", baseSession.getSn(), baseSession.getUserId()); - - }).map(data ->{ + MDC.remove(Log4Constans.PRINT_LOG_ID); + }).flatMap(data ->{ if(data.getCode() == 200){ log.info("千问正常结束"); //保存记录 @@ -180,15 +183,18 @@ public class BaseWebSocketProcess { talkRecord.setAnswerValue(data.getResut()); talkRecord.setUserId(baseSession.getUserId()); talkRecord.setDeviceId(baseSession.getDeviceId()); - deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).subscribe(); + return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(b -> { + return Mono.empty(); + }); }else{ - sendMessage(action, baseSession, "很抱歉,我无法回答您的问题,请换一个问题。", AskTypeEnum.TTS.getCode()); + return sendMessage(action, baseSession, "很抱歉,我无法回答您的问题,请换一个问题。", AskTypeEnum.TTS.getCode()).flatMap(b ->{ + return Mono.empty(); + }); } - return data; - }).subscribeOn(Schedulers.boundedElastic()).subscribe(); + })/*.subscribeOn(Schedulers.boundedElastic()).subscribe()*/; } - private void toU3DMq(Action action, SystemTalkBindDeviceEntity systemTalkBindDeviceEntity, Long metaId){ + private Mono toU3DMq(Action action, SystemTalkBindDeviceEntity systemTalkBindDeviceEntity, Long metaId){ U3dMsg u3dMsg = new U3dMsg(); u3dMsg.setMsgType(U3dMsgTypeEnum.IOT.getCode()); @@ -196,10 +202,10 @@ public class BaseWebSocketProcess { u3dMsg.setTime(System.currentTimeMillis()); u3dMsg.setScenceId(String.valueOf(action.getDeviceUserBindEntity().getScenceId())); u3dMsg.setStatusId(String.valueOf(systemTalkBindDeviceEntity.getU3dStatusId())); - sendMq(JSONObject.toJSONString(u3dMsg)); + return sendMq(JSONObject.toJSONString(u3dMsg)); } - private void toU3DMq(SystemTalkBindU3dEntity systemTalkBindU3dEntity, Long metaId){ + private Mono toU3DMq(SystemTalkBindU3dEntity systemTalkBindU3dEntity, Long metaId){ U3dMsg u3dMsg = new U3dMsg(); u3dMsg.setMsgType(systemTalkBindU3dEntity.getU3dType()); @@ -208,290 +214,318 @@ public class BaseWebSocketProcess { u3dMsg.setExParam(systemTalkBindU3dEntity.getAnswerAction()); u3dMsg.setStatusId(String.valueOf(systemTalkBindU3dEntity.getU3dStatusId())); u3dMsg.setTime(System.currentTimeMillis()); - sendMq(JSONObject.toJSONString(u3dMsg)); + return sendMq(JSONObject.toJSONString(u3dMsg)); } - private void sendMq(String msg){ - log.info("通知U3DMQ{}", msg); + private Mono sendMq(String msg){ + log.info("通知U3DMQ:{}", msg); try{ //发送消息到MQ,通知U3D - mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, + return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, - msg).subscribe(); + msg); }catch (Exception e){ log.info("通知U3D MQ异常{}", e); } + //不调用empty,防止影响正常业务 + return Mono.just(false); + } + + private Mono process(Actions actions, int i, BaseSession baseSession){ + if(i < actions.getActions().size()){ + Action action = actions.getActions().get(i++); + + return process(actions, i, baseSession).flatMap(v -> { + if(action.getSystemTalkAnswerConfigEntity() == null){ + log.info("调用千问{}", action.getAsk()); + + return toQianWen(action, baseSession, AskTypeEnum.TTS.getCode()).flatMap(vo ->{ + //千问只调一次 + return Mono.empty(); + }); + }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.IOT.getCode())){ + + if(StringUtils.isNotEmpty(action.getName())){ + + if(action.getDeviceUserBindEntity() == null){ + log.info("匹配时未找到对应的设备,模糊匹配{}", action.getName()); + DeviceUserBindRequest deviceUserBindRequest = new DeviceUserBindRequest(); + deviceUserBindRequest.setUserId(baseSession.getUserId()); + deviceUserBindRequest.setPageSize(200); + deviceUserBindRequest.setBindName(action.getName()); + + //查询是否有相关设备 + return deviceUserBindService.selectDeviceUserBindsByRequest(deviceUserBindRequest) + .defaultIfEmpty(new PagerResult<>(0, null)) + .flatMap(binds ->{ + if(binds.getTotal() == 0){ + //返回告诉没有备 + + return sendMessage(action, + baseSession, + "未找到" + action.getName() + "设备,无法操做!", + action.getSystemTalkAnswerConfigEntity().getAnswerType()); + }else if(binds.getTotal() > 1){ + //返回告诉有多个设备,请详细说明具体说明设备 + //判断action是否有所有、全部。一切 + if(YesNo.YES.getCode().equals(action.getIgnore())){ + //忽略词,控制所有设备 + return controllerDevice(action, binds.getData(), 0, baseSession); + }else{ + return sendMessage(action, + baseSession, + "您有多个" + action.getName() + "相同设备,请明确说明", + action.getSystemTalkAnswerConfigEntity().getAnswerType()); + } + + }else{ + //查询是否有相关指令绑定 + + action.setDeviceUserBindEntity(binds.getData().get(0)); + return controllerDevice(action, action.getDeviceUserBindEntity(), baseSession); + } + }); + }else{ + log.info("匹配时已找到对应的设备{}", action.getName()); + return controllerDevice(action, action.getDeviceUserBindEntity(), baseSession).flatMap(b -> { + return Mono.just(false); + }); + } + + }else{ + if(StringUtils.isEmpty(action.getName())){ + return sendMessage(action, baseSession, "请说明确的设备名称", action.getSystemTalkAnswerConfigEntity().getAnswerType()); + }else{ + return sendMessage(action, baseSession, "未找到对应的设备", action.getSystemTalkAnswerConfigEntity().getAnswerType()); + } + } + + }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.WEATHER.getCode())){ + ThirdWeatherInfoRequest req = new ThirdWeatherInfoRequest(); + //String city = ""; + if(action.getLbs() != null && action.getLbs().size() > 0){ + //根据地址查询天气 + // city = ; + req.setCity(action.getLbs().get(action.getLbs().size() - 1).replace("市", "") + .replace("区", "").replace("县", "")); + }else{ + //使用IP查询天气 + req.setIp("115.205.2.137"); + } + if(action.getTime() == null){ + action.setTime(new ActionTime()); + action.getTime().setTime("今天"); + } + return weatherService.tianqiApi(req).flatMap(t ->{ + if(t.getData() == null){ + return sendMessage(action, baseSession, "该城市不支持天气查询", action.getSystemTalkAnswerConfigEntity().getAnswerType()); + //return Mono.empty(); + } + + TianqiapiItemResp item = null; + + if(StringUtils.isNotEmpty(action.getTime().getDateTime())){ + //匹配对应的日期 + for (TianqiapiItemResp itemResp : t.getData()) + { + if(action.getTime().getDateTime().equals(itemResp.getDate())){ + item = itemResp; + break; + } + } + + }else{ + item = t.getData().get(0); + } + String msg = ""; + if(item != null){ + //返回给客户端播报内容 + msg = t.getCity() + action.getTime().getTime() + "天气" + + item.getNarrative().replace("每 km / h", "千米每小时") + + ",空气质量" + item.getAir_level() + + ",湿度" + item.getHumidity() + ",最低气温" + item.getTem2() + "°C"; + if(this instanceof BoxWebSocketHandler){ + WeatherResp weatherResp = new WeatherResp(); + weatherResp.setWeatherLocal(t.getCity()); + weatherResp.setWeatherTemperature(item.getTem1()); + weatherResp.setWeatherIcon(item.getWea()); + + BoxMessageResp resp = new BoxMessageResp(); + resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType()); + resp.setText(msg); + resp.setWeather(weatherResp); + return sendMessage(action, baseSession, resp); + //return Mono.empty(); + } + }else{ + msg = action.getSystemTalkAnswerConfigEntity().getAnswerValueFaild(); + + log.info("执行指令失败"); + } + return sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType()); + //return Mono.empty(); + }); + }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.MUSIC.getCode())){ + + String search = action.getAsk().replaceAll(action.getAction(), "").replaceAll("的", ""); + if(StringUtils.isNotEmpty(action.getPName())){ + search = search.replaceAll(action.getPName(), ""); + } + BoxMessageResp resp = new BoxMessageResp(); + if(action.getSystemTalkAnswerConfigEntity().getPlayType().equals(PlayEnum.START.getCode())){ + return musicService.searchMusic(search, 1).defaultIfEmpty(new ArrayList<>()).flatMap(resultSongs -> { + //BoxMessageResp resp = new BoxMessageResp(); + MusicResp musicResp = new MusicResp(); + if(resultSongs.size() > 0){ + // + SongInfoResponse.ResultSong song = resultSongs.get(0); + musicResp.setPlay(PlayEnum.START.getCode()); + musicResp.setName(song.getName()); + musicResp.setUrl(song.getUrl()); + musicResp.setSinger(song.getArtistName()); + resp.setText("现在为您播放" + song.getName() + + (StringUtils.isNotEmpty(song.getArtistName()) ? ("来自" + song.getArtistName()) : "")); + }else{ + musicResp.setPlay(PlayEnum.NONE.getCode()); + resp.setText("未找到相关资源"); + } + resp.setMusic(musicResp); + resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType()); + + return sendMessage(action, baseSession, resp); + }); + }else if(baseSession.getMusic() != null){ + //做相应的动作 + baseSession.getMusic().setPlay(action.getSystemTalkAnswerConfigEntity().getPlayType()); + resp.setMusic(baseSession.getMusic()); + resp.setText(action.getSystemTalkAnswerConfigEntity().getAnswerValue().replace("#name#", baseSession.getMusic().getName())); + resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType()); + //return sendMessage(action, baseSession, resp); + }else{ + resp.setText("目前无播放资源,无法操作"); + resp.setType(AskTypeEnum.TTS.getCode()); + + } + return sendMessage(action, baseSession, resp); + + }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TTS.getCode())){ + if(!action.getAction().equals(action.getAsk())){ + return toQianWen(action, baseSession, AskTypeEnum.TTS.getCode()).flatMap(vo ->{ + //千问只调一次 + return Mono.empty(); + }); + }else{ + return sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode()); + } + }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.QIU_GUO.getCode())){ + return sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode()); + }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TIME.getCode())){ + DateTimeFormatter df = DateTimeFormatter.ofPattern(action.getSystemTalkAnswerConfigEntity().getAnswerValue()); + if(action.getTime() == null){ + action.setTime(new ActionTime()); + action.getTime().setTime("今天"); + } + if(this instanceof BoxWebSocketHandler){ + DateTimeResp dateTimeResp = new DateTimeResp(); + dateTimeResp.setYear(String.valueOf(action.getTime().getDetailTime().getYear())); + dateTimeResp.setMonth(String.valueOf(action.getTime().getDetailTime().getMonthValue())); + dateTimeResp.setDay(String.valueOf(action.getTime().getDetailTime().getDayOfMonth())); + dateTimeResp.setHour(String.valueOf(action.getTime().getDetailTime().getHour())); + dateTimeResp.setMinute(String.valueOf(action.getTime().getDetailTime().getMinute())); + dateTimeResp.setSecond(String.valueOf(action.getTime().getDetailTime().getSecond())); + dateTimeResp.setWeak(String.valueOf(action.getTime().getDetailTime().getDayOfWeek().getValue())); + + BoxMessageResp resp = new BoxMessageResp(); + resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType()); + resp.setText(action.getTime().getDetailTime().format(df)); + resp.setTime(dateTimeResp); + return sendMessage(action, baseSession, resp); + + }else{ + return sendMessage(action, baseSession, action.getTime().getDetailTime().format(df), AskTypeEnum.TIME.getCode()); + } + + + + }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.U3D.getCode())){ + SystemTalkBindU3dRequest request = new SystemTalkBindU3dRequest(); + request.setSystemTalkId(action.getSystemTalkAnswerConfigEntity().getId()); + request.setAskCommon(action.getActionCommand()); + return systemTalkBindU3dService.selectSystemTalkBindU3dByRequest(request) + .defaultIfEmpty(new SystemTalkBindU3dEntity()) + .flatMap(systemTalkBindU3d ->{ + if(systemTalkBindU3d.getId() == null){ + return sendMessage(action, baseSession, "暂时不支持该指令", AskTypeEnum.TTS.getCode()); + //return systemTalkBindU3d; + } + //数字人的id直接用用户id来代替 + if(U3dMsgTypeEnum.DANCE.getCode().equals(systemTalkBindU3d.getU3dType())){ + //推送客户端跳舞 + BoxMessageResp resp = new BoxMessageResp(); + resp.setType(AskTypeEnum.U3D.getCode()); + resp.setText("开始跳舞"); + ActionResp actionResp = new ActionResp(); + actionResp.setType(7); + resp.setAction(actionResp); + return sendMessage(action, baseSession, resp); + }else{ + //推送MQ换装 + toU3DMq(systemTalkBindU3d, baseSession.getUserId()); + DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity(); + talkRecord.setAskType(AskTypeEnum.U3D.getCode()); + talkRecord.setAskValue(action.getAsk()); + talkRecord.setAskKey(action.getAction()); + talkRecord.setAnswerValue("正在" + action.getAction() + ",请稍候"); + talkRecord.setUserId(baseSession.getUserId()); + talkRecord.setDeviceId(baseSession.getDeviceId()); + return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(integer -> { + return Mono.just(true); + }); + } + + }); + }else{ + return toQianWen(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerType()).flatMap(vo ->{ + //千问只调一次 + return Mono.empty(); + }); + + } + }); + } + return Mono.just(false); } - protected void processAction(Actions actions, Long userId, BaseSession baseSession) { + protected Mono processAction(Actions actions, BaseSession baseSession) { if(actions.getActions() == null || actions.getActions().size() == 0){ //调用千问回答\ log.info("调用千问{}", actions.getRecordText()); Action action = new Action(); action.setAsk(actions.getRecordText()); action.setAction(actions.getRecordText()); - toQianWen(action, baseSession, AskTypeEnum.TTS.getCode()); - return; - } - //boolean isToQianWen = false; - for (Action action : actions.getActions() - ) { - log.info("匹配到自定义指令{}", action.getSystemTalkAnswerConfigEntity()); - if(action.getSystemTalkAnswerConfigEntity() == null){ - log.info("调用千问{}", action.getAsk()); - toQianWen(action, baseSession, AskTypeEnum.TTS.getCode()); - return; - // - }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.IOT.getCode())){ - - if(StringUtils.isNotEmpty(action.getName())){ - - if(action.getDeviceUserBindEntity() == null){ - log.info("匹配时未找到对应的设备,模糊匹配{}", action.getName()); - DeviceUserBindRequest deviceUserBindRequest = new DeviceUserBindRequest(); - deviceUserBindRequest.setUserId(userId); - deviceUserBindRequest.setPageSize(200); - deviceUserBindRequest.setBindName(action.getName()); - - //查询是否有相关设备 - deviceUserBindService.selectDeviceUserBindsByRequest(deviceUserBindRequest) - .defaultIfEmpty(new PagerResult<>(0, null)) - .map(binds ->{ - if(binds.getTotal() == 0){ - //返回告诉没有备 - - sendMessage(action, - baseSession, - "未找到" + action.getName() + "设备,无法操做!", - action.getSystemTalkAnswerConfigEntity().getAnswerType()); - }else if(binds.getTotal() > 1){ - //返回告诉有多个设备,请详细说明具体说明设备 - //判断action是否有所有、全部。一切 - if(YesNo.YES.getCode().equals(action.getIgnore())){ - //忽略词,控制所有设备 - for (DeviceUserBindEntity bindEntity :binds.getData() - ) { - action.setDeviceUserBindEntity(bindEntity); - controllerDevice(action, action.getDeviceUserBindEntity(), baseSession); - } - }else{ - sendMessage(action, - baseSession, - "您有多个" + action.getName() + "相同设备,请明确说明", - action.getSystemTalkAnswerConfigEntity().getAnswerType()); - } - - }else{ - //查询是否有相关指令绑定 - - action.setDeviceUserBindEntity(binds.getData().get(0)); - controllerDevice(action, action.getDeviceUserBindEntity(), baseSession); - - } - return Mono.empty(); - }).subscribe(); - }else{ - log.info("匹配时已找到对应的设备{}", action.getName()); - controllerDevice(action, action.getDeviceUserBindEntity(), baseSession); - } - - }else{ - if(StringUtils.isEmpty(action.getName())){ - sendMessage(action, baseSession, "请说明确的设备名称", action.getSystemTalkAnswerConfigEntity().getAnswerType()); - }else{ - sendMessage(action, baseSession, "未找到对应的设备", action.getSystemTalkAnswerConfigEntity().getAnswerType()); - } - - } - - }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.WEATHER.getCode())){ - ThirdWeatherInfoRequest req = new ThirdWeatherInfoRequest(); - //String city = ""; - if(action.getLbs() != null && action.getLbs().size() > 0){ - //根据地址查询天气 - // city = ; - req.setCity(action.getLbs().get(action.getLbs().size() - 1).replace("市", "") - .replace("区", "").replace("县", "")); - }else{ - //使用IP查询天气 - req.setIp("115.205.2.137"); - } - if(action.getTime() == null){ - action.setTime(new ActionTime()); - action.getTime().setTime("今天"); - } - weatherService.tianqiApi(req).map(t ->{ - if(t.getData() == null){ - sendMessage(action, baseSession, "该城市不支持天气查询", action.getSystemTalkAnswerConfigEntity().getAnswerType()); - return t; - } - - TianqiapiItemResp item = null; - - if(StringUtils.isNotEmpty(action.getTime().getDateTime())){ - //匹配对应的日期 - for (TianqiapiItemResp itemResp : t.getData()) - { - if(action.getTime().getDateTime().equals(itemResp.getDate())){ - item = itemResp; - break; - } - } - - }else{ - item = t.getData().get(0); - } - String msg = ""; - if(item != null){ - //返回给客户端播报内容 - msg = t.getCity() + action.getTime().getTime() + "天气" - + item.getNarrative().replace("每 km / h", "千米每小时") - + ",空气质量" + item.getAir_level() - + ",湿度" + item.getHumidity() + ",最低气温" + item.getTem2() + "°C"; - if(this instanceof BoxWebSocketHandler){ - WeatherResp weatherResp = new WeatherResp(); - weatherResp.setWeatherLocal(t.getCity()); - weatherResp.setWeatherTemperature(item.getTem1()); - weatherResp.setWeatherIcon(item.getWea()); - - BoxMessageResp resp = new BoxMessageResp(); - resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType()); - resp.setText(msg); - resp.setWeather(weatherResp); - sendMessage(action, baseSession, resp); - return t; - } - - }else{ - msg = action.getSystemTalkAnswerConfigEntity().getAnswerValueFaild(); - - log.info("执行指令失败"); - } - - sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType()); - return t; - }).subscribe(); - }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.MUSIC.getCode())){ - - String search = action.getAsk().replaceAll(action.getAction(), "").replaceAll("的", ""); - if(StringUtils.isNotEmpty(action.getPName())){ - search = search.replaceAll(action.getPName(), ""); - } - BoxMessageResp resp = new BoxMessageResp(); - if(action.getSystemTalkAnswerConfigEntity().getPlayType().equals(PlayEnum.START.getCode())){ - musicService.searchMusic(search, 1).defaultIfEmpty(new ArrayList<>()).map(resultSongs -> { - //BoxMessageResp resp = new BoxMessageResp(); - MusicResp musicResp = new MusicResp(); - if(resultSongs.size() > 0){ - // - SongInfoResponse.ResultSong song = resultSongs.get(0); - musicResp.setPlay(PlayEnum.START.getCode()); - musicResp.setName(song.getName()); - musicResp.setUrl(song.getUrl()); - musicResp.setSinger(song.getArtistName()); - resp.setText("现在为您播放" + song.getName() + - (StringUtils.isNotEmpty(song.getArtistName()) ? ("来自" + song.getArtistName()) : "")); - }else{ - musicResp.setPlay(PlayEnum.NONE.getCode()); - resp.setText("未找到相关资源"); - } - resp.setMusic(musicResp); - resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType()); - - sendMessage(action, baseSession, resp); - return resultSongs; - }).subscribe(); - }else if(baseSession.getMusic() != null){ - //做相应的动作 - baseSession.getMusic().setPlay(action.getSystemTalkAnswerConfigEntity().getPlayType()); - resp.setMusic(baseSession.getMusic()); - resp.setText(action.getSystemTalkAnswerConfigEntity().getAnswerValue().replace("#name#", baseSession.getMusic().getName())); - resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType()); - sendMessage(action, baseSession, resp); - }else{ - resp.setText("目前无播放资源,无法操作"); - resp.setType(AskTypeEnum.TTS.getCode()); - sendMessage(action, baseSession, resp); - } - - }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TTS.getCode())){ - if(!action.getAction().equals(action.getAsk())){ - toQianWen(action, baseSession, AskTypeEnum.TTS.getCode()); - return; - }else{ - sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode()); - } - }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.QIU_GUO.getCode())){ - sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode()); - }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TIME.getCode())){ - DateTimeFormatter df = DateTimeFormatter.ofPattern(action.getSystemTalkAnswerConfigEntity().getAnswerValue()); - if(this instanceof BoxWebSocketHandler){ - DateTimeResp dateTimeResp = new DateTimeResp(); - dateTimeResp.setYear(String.valueOf(action.getTime().getDetailTime().getYear())); - dateTimeResp.setMonth(String.valueOf(action.getTime().getDetailTime().getMonthValue())); - dateTimeResp.setDay(String.valueOf(action.getTime().getDetailTime().getDayOfMonth())); - dateTimeResp.setHour(String.valueOf(action.getTime().getDetailTime().getHour())); - dateTimeResp.setMinute(String.valueOf(action.getTime().getDetailTime().getMinute())); - dateTimeResp.setSecond(String.valueOf(action.getTime().getDetailTime().getSecond())); - dateTimeResp.setWeak(String.valueOf(action.getTime().getDetailTime().getDayOfWeek().getValue())); - - BoxMessageResp resp = new BoxMessageResp(); - resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType()); - resp.setText(action.getTime().getDetailTime().format(df)); - resp.setTime(dateTimeResp); - sendMessage(action, baseSession, resp); - - }else{ - sendMessage(action, baseSession, action.getTime().getDetailTime().format(df), AskTypeEnum.TIME.getCode()); - } - - - - }else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.U3D.getCode())){ - SystemTalkBindU3dRequest request = new SystemTalkBindU3dRequest(); - request.setSystemTalkId(action.getSystemTalkAnswerConfigEntity().getId()); - request.setAskCommon(action.getActionCommand()); - systemTalkBindU3dService.selectSystemTalkBindU3dByRequest(request) - .defaultIfEmpty(new SystemTalkBindU3dEntity()) - .map(systemTalkBindU3d ->{ - if(systemTalkBindU3d.getId() == null){ - sendMessage(action, baseSession, "暂时不支持该指令", AskTypeEnum.TTS.getCode()); - return systemTalkBindU3d; - } - //数字人的id直接用用户id来代替 - if(U3dMsgTypeEnum.DANCE.getCode().equals(systemTalkBindU3d.getU3dType())){ - //推送客户端跳舞 - BoxMessageResp resp = new BoxMessageResp(); - resp.setType(AskTypeEnum.U3D.getCode()); - resp.setText("开始跳舞"); - ActionResp actionResp = new ActionResp(); - actionResp.setType(7); - resp.setAction(actionResp); - sendMessage(action, baseSession, resp); - }else{ - //推送MQ换装 - toU3DMq(systemTalkBindU3d, baseSession.getUserId()); - DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity(); - talkRecord.setAskType(AskTypeEnum.U3D.getCode()); - talkRecord.setAskValue(action.getAsk()); - talkRecord.setAskKey(action.getAction()); - talkRecord.setAnswerValue("正在" + action.getAction() + ",请稍候"); - talkRecord.setUserId(baseSession.getUserId()); - talkRecord.setDeviceId(baseSession.getDeviceId()); - deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).subscribe(); - } - - return systemTalkBindU3d; - }).subscribe(); - }else{ - toQianWen(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerType()); - return; - } + return toQianWen(action, baseSession, AskTypeEnum.TTS.getCode()); + } + return process(actions, 0, baseSession).flatMap(vo ->{ + return Mono.empty(); + }); } - private void controllerDevice(Action action, DeviceUserBindEntity deviceUserBindEntity, BaseSession baseSession) { + private Mono controllerDevice(Action action, + List deviceUserBindEntitys, + int i, + BaseSession baseSession){ + if(i < deviceUserBindEntitys.size()){ + DeviceUserBindEntity deviceUserBindEntity = deviceUserBindEntitys.get(i++); + action.setDeviceUserBindEntity(deviceUserBindEntity); + return controllerDevice(action, deviceUserBindEntitys, i, baseSession).flatMap(b -> { + return controllerDevice(action, deviceUserBindEntity, baseSession); + }); + } + return Mono.just(false); + } + private Mono controllerDevice(Action action, DeviceUserBindEntity deviceUserBindEntity, BaseSession baseSession) { SystemTalkBindDeviceRequest systemTalkBindDeviceRequest = new SystemTalkBindDeviceRequest(); systemTalkBindDeviceRequest.setCategoryCode(deviceUserBindEntity.getCategoryCode()); @@ -499,12 +533,12 @@ public class BaseWebSocketProcess { if(StringUtils.isNotEmpty(action.getActionCommand())){ systemTalkBindDeviceRequest.setAskCommon(action.getActionCommand()); } - systemTalkBindDeviceService.selectSystemTalkBindDeviceByRequest(systemTalkBindDeviceRequest) + return systemTalkBindDeviceService.selectSystemTalkBindDeviceByRequest(systemTalkBindDeviceRequest) .defaultIfEmpty(new SystemTalkBindDeviceEntity()) - .map(systemTalkBindDeviceEntity -> { + .flatMap(systemTalkBindDeviceEntity -> { if(systemTalkBindDeviceEntity.getId() == null){ //通知不支持的指令 - sendMessage(action, baseSession, + return sendMessage(action, baseSession, deviceUserBindEntity.getBindName() + "不支持" + action.getAction() + "指令!", action.getSystemTalkAnswerConfigEntity().getAnswerType()); @@ -514,46 +548,46 @@ public class BaseWebSocketProcess { query.setDeviceId(deviceUserBindEntity.getOtherDeviceId()); query.setValue(action.getStatus()); query.setUserHandlingDeviceId(systemTalkBindDeviceEntity.getUserHandlingId()); - tuyaDeviceService.controlDevice(query).map(isOk ->{ + return tuyaDeviceService.controlDevice(query).flatMap(isOk ->{ String msg = ""; if(isOk.getCode().equals(RespCodeEnum.SUCESS.getCode())){ - if(action.getDeviceUserBindEntity().getU3dId() != null){ - toU3DMq(action, systemTalkBindDeviceEntity, action.getDeviceUserBindEntity().getU3dId()); - } + log.info("执行指令成功"); //通知打开灯成功 msg = systemTalkBindDeviceEntity.getAnswerValue().replaceAll("#name#", deviceUserBindEntity.getBindName()); if(StringUtils.isNotEmpty(action.getStatus())){ msg = msg.replace("#value#", action.getStatus()); } + return sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType()).flatMap(b -> { + if(deviceUserBindEntity.getU3dId() != null){ + return toU3DMq(action, systemTalkBindDeviceEntity, deviceUserBindEntity.getU3dId()).flatMap(bo -> { + return Mono.just(bo); + }); + } + return Mono.just(b); + }); - - log.info("执行指令"); }else{ //通知开灯失败; msg = systemTalkBindDeviceEntity.getAnswerValueFaild().replaceAll("#name#", deviceUserBindEntity.getBindName()); log.info("执行指令失败"); + return sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType()); } - - sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType()); - - return isOk; - }).subscribe(); + }); } - return systemTalkBindDeviceEntity; - }).subscribe(); + }); } - private void sendMessage(Action action, BaseSession baseSession, String message, Integer type){ + private Mono sendMessage(Action action, BaseSession baseSession, String message, Integer type){ BaseMessageResp resp = new BaseMessageResp(); resp.setType(type); resp.setText(message); - sendMessage(action, baseSession, resp); + return sendMessage(action, baseSession, resp); } - private void sendMessage(Action action, BaseSession baseSession, BaseMessageResp resp){ + private Mono sendMessage(Action action, BaseSession baseSession, BaseMessageResp resp){ DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity(); talkRecord.setAskType(resp.getType()); talkRecord.setAskValue(action.getAsk()); @@ -610,7 +644,7 @@ public class BaseWebSocketProcess { } } - deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).map(i ->{ + return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(i ->{ String msg = JSONObject.toJSONString(resp); if(this instanceof BoxWebSocketHandler){ log.info("果box聊天记录,同步到客户端"); @@ -621,8 +655,8 @@ public class BaseWebSocketProcess { } sendMsg(baseSession, msg); - return Mono.empty(); - }).subscribe();//保存聊天记录 + return Mono.just(true); + });//保存聊天记录 } /** @@ -631,9 +665,9 @@ public class BaseWebSocketProcess { * @param message * @param type */ - protected void closeSendMsg(BaseSession baseSession, String message, Integer type){ + protected Mono closeSendMsg(BaseSession baseSession, String message, Integer type){ normalSendMsg(baseSession, message, type); - baseSession.getSession().close().subscribe(); + return baseSession.getSession().close(); } @@ -654,6 +688,7 @@ public class BaseWebSocketProcess { private void sendMsg(BaseSession baseSession, String msg) { log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg); baseSession.getSink().next(baseSession.getSession().textMessage(msg)); + } public BoxSession getBoxSessionWithSn(String sn) { diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BoxWebSocketHandler.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BoxWebSocketHandler.java index 1a1effb..a570265 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BoxWebSocketHandler.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BoxWebSocketHandler.java @@ -27,6 +27,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Component; import org.springframework.web.reactive.socket.*; import reactor.core.publisher.*; +import reactor.util.context.Context; import javax.annotation.Resource; import java.time.Duration; @@ -79,29 +80,21 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock log.info("登录成功SN:{}", sn); Mono input = session.receive().map(webSocketMessage ->{ - MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId()); - String text = webSocketMessage.getPayloadAsText(); - log.info("设备端收到消息:{}", text); - BoxTalkMessage boxTalkMessage = JSONObject.parseObject(text, BoxTalkMessage.class); - BoxSession boxSession1 = getBoxSessionWithSn(boxTalkMessage.getSn()); - if(!boxSession.equals(boxSession1)){ - log.info("消息发送异常,或者未验签就收到信息不是同一个链接。可能传错SN"); - closeSendMsg(boxSession, "请等待验签结束或者SN可能错误", AskTypeEnum.TTS.getCode()); - return Mono.empty(); - } - nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).map(actions -> { - processAction(actions, userId, boxSession); - return Mono.empty(); - }).subscribe(); + newMessage(webSocketMessage, boxSession).contextWrite(context -> { + Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId()); - log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage()); - //MDC.remove(LogMdcConfiguration.PRINT_LOG_ID); - return Mono.empty(); + return contextTmp; + }).subscribe(); + return webSocketMessage; }).then(); //校验 - checkToken(boxSession, sn, linkTime, signature, userId); + checkToken(boxSession, sn, linkTime, signature, userId).contextWrite(context -> { + Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId()); + + return contextTmp; + }).subscribe(); Mono output = session.send(Flux.create(sink -> boxSession.setSink(sink))).then(); @@ -109,23 +102,11 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock // 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。 return Mono.zip(input, output).doFinally(signalType -> { - BoxSession boxSession1 = getBoxSessionWithSn(boxSession.getSn()); - if(boxSession == boxSession1){ - boxGroup.remove(boxSession.getSn());//断链后及时移除 - log.info("设备断开连接SN:{}", boxSession.getSn()); - //通知用户端设备绑定成功 - sendNoticeToUser(boxSession.getUserId(), "设备离线,设备序列号:" + boxSession.getSn(), AskTypeEnum.BOX_OFF_LINE.getCode()); - deviceInfoService.setOnLineStatus(sn, YesNo.NO.getCode()).subscribe(); - UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel(); - userDeviceInfoModel.setStatus(YesNo.NO.getCode()); - userDeviceInfoModel.setUserId(userId); - userDeviceInfoModel.setSn(sn); - reactiveStringRedisTemplate.opsForValue().set(RedisConstans.USER_BOX_INFO + userId, - JSONObject.toJSONString(userDeviceInfoModel), - Duration.ofDays(RedisConstans.TEN_YEAR)).subscribe(); - return; - } - log.info("被踢下线断开连接:{}", boxSession.getSn()); + disconnect(boxSession).contextWrite(context -> { + Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId()); + + return contextTmp; + }).subscribe(); }).then(); } @@ -137,26 +118,73 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock } } - private void errorLogin(BaseSession boxSession, String sn){ - //清除异常redis - reactiveStringRedisTemplate.opsForValue().delete(RedisConstans.DEVICE_INFO + sn).subscribe();//不需要时间 - closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode()); + private Mono newMessage(WebSocketMessage webSocketMessage, BoxSession boxSession){ + MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId()); + String text = webSocketMessage.getPayloadAsText(); + log.info("设备端收到消息:{}", text); + BoxTalkMessage boxTalkMessage = JSONObject.parseObject(text, BoxTalkMessage.class); + BoxSession boxSession1 = getBoxSessionWithSn(boxTalkMessage.getSn()); + if(!boxSession.equals(boxSession1)){ + log.info("消息发送异常,或者未验签就收到信息不是同一个链接。可能传错SN"); + return closeSendMsg(boxSession, "请等待验签结束或者SN可能错误", AskTypeEnum.TTS.getCode()); + } + log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage()); + return nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).flatMap(actions -> { + return processAction(actions, boxSession); + //return Mono.empty(); + }); } - private void checkToken(BoxSession boxSession, String sn, Long linkTime, String signature, Long userId){ - reactiveStringRedisTemplate.opsForValue().get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(s -> { - if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s) && s.length() < 1000){ + private Mono disconnect(BoxSession boxSession){ + MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId()); + BoxSession boxSession1 = getBoxSessionWithSn(boxSession.getSn()); + if(boxSession.equals(boxSession1)){ + //断链后及时移除 + boxGroup.remove(boxSession.getSn()); + log.info("设备断开连接SN:{}", boxSession.getSn()); + //通知用户端设备绑定成功 + sendNoticeToUser(boxSession.getUserId(), "设备离线,设备序列号:" + boxSession.getSn(), AskTypeEnum.BOX_OFF_LINE.getCode()); + return deviceInfoService.setOnLineStatus(boxSession.getSn(), YesNo.NO.getCode()).flatMap(integer -> { + UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel(); + userDeviceInfoModel.setStatus(YesNo.NO.getCode()); + userDeviceInfoModel.setUserId(boxSession.getUserId()); + userDeviceInfoModel.setSn(boxSession.getSn()); + return reactiveStringRedisTemplate.opsForValue().set(RedisConstans.USER_BOX_INFO + boxSession.getUserId(), + JSONObject.toJSONString(userDeviceInfoModel), + Duration.ofDays(RedisConstans.TEN_YEAR)).flatMap(b -> { + return Mono.empty(); + }); + }); + + } + log.info("被踢下线断开连接:{}", boxSession.getSn()); + return Mono.empty(); + } + + private Mono errorLogin(BaseSession boxSession, String sn){ + //清除异常redis + reactiveStringRedisTemplate.opsForValue().delete(RedisConstans.DEVICE_INFO + sn).subscribe(); + return closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode()); + } + + private Mono checkToken(BoxSession boxSession, String sn, Long linkTime, String signature, Long userId){ + return reactiveStringRedisTemplate.opsForValue().get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(s -> { + if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s)){ try{ DeviceInfoEntity dv = JSONObject.parseObject(s, DeviceInfoEntity.class); if(dv.getId() == null){ log.info("redis设备缓存异常,清楚"); - errorLogin(boxSession, sn); + return errorLogin(boxSession, sn).flatMap(v -> { + return Mono.empty(); + }); } return Mono.just(dv); }catch (Exception e){ log.info("转换异常,清除redis。下次连接成功{}", e); - errorLogin(boxSession, sn); + return errorLogin(boxSession, sn).flatMap(v -> { + return Mono.empty(); + }); } } DeviceInfoRequest request = new DeviceInfoRequest(); @@ -164,97 +192,104 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock return deviceInfoService.selectDeviceInfoByRequest(request).defaultIfEmpty(new DeviceInfoEntity()).map(dv -> { if(dv.getId() != null){ String redis = JSONObject.toJSONString(dv); - reactiveStringRedisTemplate.opsForValue().set(RedisConstans.DEVICE_INFO + dv.getSn(), redis, Duration.ofHours(1)).subscribe();//直接提交订阅 } return dv; }); - }).map(dv ->{ + }).flatMap(dv ->{ String snMd5 = MD5.create().digestHex(sn).toUpperCase(); String wifiMd5 = MD5.create().digestHex(dv.getWifiMac()).toUpperCase(); String btMd5 = MD5.create().digestHex(dv.getBtMac()).toUpperCase(); String signalMd5 = MD5.create().digestHex(snMd5 + wifiMd5 + btMd5 + linkTime + dv.getKey()).toUpperCase(); if(!signalMd5.equals(signature)){ log.info("设备{},验签失败。正常签:{}", sn, signalMd5); - //session.send(session.textMessage("")); - if(boxSession != null){ - closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode()); + return closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode()).map(v ->{ + return Mono.empty(); + }); } - + return Mono.just(dv); }else{ log.info("设备{},验签成功", sn); BoxSession oldBoxSession = getBoxSessionWithSn(sn); if(oldBoxSession != null){ // - closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode()); + closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode()).map(v ->{ + return dv; + }); } - boxSession.setDeviceId(dv.getId()); - boxGroup.put(sn, boxSession); - bindBox(dv, userId); + return Mono.just(dv); } - return Mono.empty(); - }).subscribe(); + }).flatMap(d -> { + DeviceInfoEntity dv = (DeviceInfoEntity)d; + boxSession.setDeviceId(dv.getId()); + boxGroup.put(sn, boxSession); + return bindBox(dv, userId).flatMap(db ->{ + return Mono.empty(); + }); + }); } - private void bindBox(DeviceInfoEntity dv, Long userId){ + private Mono bindBox(DeviceInfoEntity dv, Long userId){ log.info("开始绑定设备userId:{}, SN:{}", userId, dv); DeviceUserBindRequest request = new DeviceUserBindRequest(); request.setUserId(userId); request.setDeviceId(dv.getId()); //跟新在线状态 - deviceInfoService.setOnLineStatus(dv.getId(), YesNo.YES.getCode()).subscribe(); + return deviceInfoService.setOnLineStatus(dv.getId(), YesNo.YES.getCode()).flatMap(integer -> { + return deviceUserBindService.selectDeviceUserBindByRequest(request) + .defaultIfEmpty(new DeviceUserBindEntity()) + .flatMap(entity ->{ + if(entity.getId() == null){ + entity.setUserId(userId); + entity.setDeviceId(dv.getId()); + //设置为主设备 + entity.setIsMain(YesNo.YES.getCode()); + entity.setOtherDeviceId(dv.getSn()); + entity.setCategoryCode(DeviceCodeEnum.BOX.getName()); + entity.setBindName("果宝儿Box"); + return deviceUserBindService.setNoMain(userId, DeviceTypeEnum.GUO_BOX.getCode()).defaultIfEmpty(0).flatMap(m ->{ + log.info("解除历史isMain标注个数{}", m); + return deviceUserBindService.insertDeviceUserBind(entity).flatMap(l ->{ + log.info("绑定成功SN:{} userId:{}", dv, userId); + //下面所有的以前未主设备改成非主设备 + //通知用户端设备绑定成功 + sendNoticeToUser(userId, "设备绑定成功,设备序列号:" + dv.getSn(), AskTypeEnum.DEVICE_BIND.getCode()); + UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel(); + userDeviceInfoModel.setStatus(YesNo.YES.getCode()); + userDeviceInfoModel.setUserId(userId); + userDeviceInfoModel.setSn(entity.getOtherDeviceId()); + return reactiveStringRedisTemplate.opsForValue().set(RedisConstans.USER_BOX_INFO + userId, + JSONObject.toJSONString(userDeviceInfoModel), + Duration.ofDays(RedisConstans.TEN_YEAR)).flatMap(b -> { + return Mono.just(entity); + }); + }); + }); - - - deviceUserBindService.selectDeviceUserBindByRequest(request) - .defaultIfEmpty(new DeviceUserBindEntity()) - .map(entity ->{ - if(entity.getId() == null){ - entity.setUserId(userId); - entity.setDeviceId(dv.getId()); - //设置为主设备 - entity.setIsMain(YesNo.YES.getCode()); - entity.setOtherDeviceId(dv.getSn()); - entity.setCategoryCode(DeviceCodeEnum.BOX.getName()); - entity.setBindName("果宝儿Box"); - deviceUserBindService.setNoMain(userId, DeviceTypeEnum.GUO_BOX.getCode()).defaultIfEmpty(0).map(m ->{ - log.info("解除历史isMain标注个数{}", m); - deviceUserBindService.insertDeviceUserBind(entity).map(l ->{ - log.info("绑定成功SN:{} userId:{}", dv, userId); - //下面所有的以前未主设备改成非主设备 - //通知用户端设备绑定成功 - sendNoticeToUser(userId, "设备绑定成功,设备序列号:" + dv.getSn(), AskTypeEnum.DEVICE_BIND.getCode()); - UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel(); - userDeviceInfoModel.setStatus(YesNo.YES.getCode()); - userDeviceInfoModel.setUserId(userId); - userDeviceInfoModel.setSn(entity.getOtherDeviceId()); - reactiveStringRedisTemplate.opsForValue().set(RedisConstans.USER_BOX_INFO + userId, - JSONObject.toJSONString(userDeviceInfoModel), - Duration.ofDays(RedisConstans.TEN_YEAR)).subscribe(); - - return Mono.empty(); - }).subscribe(); - return Mono.empty(); - }).subscribe(); - - }else{ - if(entity.getIsBind().equals(YesNo.YES.getCode())){ - //通知用户端设备绑定成功 - sendNoticeToUser(userId, "设备联网成功,设备序列号:" + dv.getSn(), AskTypeEnum.BOX_ON_LINE.getCode()); }else{ - //通知用户端设备绑定成功 - BaseSession boxSession = getBoxSessionWithSn(dv.getSn()); - if(boxSession != null){ - closeSendMsg(boxSession, "设备已解绑无法继续使用", AskTypeEnum.DEVICE_UNBIND.getCode()); + if(entity.getIsBind().equals(YesNo.YES.getCode())){ + //通知用户端设备绑定成功 + sendNoticeToUser(userId, "设备联网成功,设备序列号:" + dv.getSn(), AskTypeEnum.BOX_ON_LINE.getCode()); + }else{ + //通知用户端设备绑定成功 + BaseSession boxSession = getBoxSessionWithSn(dv.getSn()); + if(boxSession != null){ + return closeSendMsg(boxSession, "设备已解绑无法继续使用", AskTypeEnum.DEVICE_UNBIND.getCode()).flatMap( + v -> { + return Mono.just(entity); + } + ); + } } - } - } - return Mono.empty(); - }).subscribe(); + } + return Mono.just(entity); + }); + }); + } } diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/CustomerWebSocketHandler.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/CustomerWebSocketHandler.java index 7df4cfc..328de52 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/CustomerWebSocketHandler.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/CustomerWebSocketHandler.java @@ -7,6 +7,7 @@ import com.qiuguo.iot.base.enums.DeviceTypeEnum; import com.qiuguo.iot.base.enums.YesNo; import com.qiuguo.iot.base.utils.WebClientUtils; import com.qiuguo.iot.box.websocket.api.domain.BaseSession; +import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession; import com.qiuguo.iot.box.websocket.api.domain.user.UserTalkMessage; import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration; import com.qiuguo.iot.box.websocket.api.filter.LogWebFilter; @@ -22,9 +23,11 @@ import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Component; import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import javax.annotation.Resource; import java.util.HashMap; @@ -72,36 +75,20 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We userSession.setLogId(headers.get(LogMdcConfiguration.PRINT_LOG_ID).get(0)); log.info("用户成功userId:{}", userId); Mono input = session.receive().map(webSocketMessage ->{ - MDC.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId()); - String text = webSocketMessage.getPayloadAsText(); - log.info("收到用户消息:{}", text); - UserTalkMessage userTalkMessage = JSONObject.parseObject(text, UserTalkMessage.class); - BaseSession userSession1 = getUserSessionWithUserId(userTalkMessage.getUserId()); - if(!userSession.equals(userSession1)){ - log.info("消息发送异常,或者未验签就收到信息不是同一个链接。可能传错用户ID"); - closeSendMsg(userSession, "请等待验签结束或者用户ID可能错误", AskTypeEnum.TTS.getCode()); - return Mono.empty(); - } - nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage()) - .defaultIfEmpty(new Actions()).map(actions -> { - //处理 - if(actions.getActions() == null || actions.getActions().size() == 0){ - //调用千问回答 - log.info("未匹配到自定义命令,调用千问"); - }else{ - processAction(actions, userId, userSession); - } + newMessage(webSocketMessage, userSession).contextWrite(context -> { + Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId()); - return Mono.empty(); - }).subscribe(); - - log.info("收到用户userId:{},消息:{}", userTalkMessage.getUserId(), userTalkMessage.getMessage()); - //MDC.remove(LogMdcConfiguration.PRINT_LOG_ID); - return Mono.empty(); + return contextTmp; + }).subscribe(); + return webSocketMessage; }).then(); - checkToken(userSession, type, token, userId); + checkToken(userSession, type, token, userId).contextWrite(context -> { + Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId()); + + return contextTmp; + }).subscribe(); Mono output = session.send(Flux.create(sink -> userSession.setSink(sink))).then(); @@ -109,19 +96,47 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We // 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。 return Mono.zip(input, output).doFinally(signalType -> { - BaseSession userSession1 = getUserSessionWithUserId(userId); - if(userSession1 == userSession){ - userGroup.remove(userSession.getUserId());//断链后及时移除 - log.info("用户断开连接userId:{}", userSession.getUserId()); - } + disconnect(userSession).contextWrite(context -> { + Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId()); + + return contextTmp; + }).subscribe();; }).then(); } - private void checkToken(BaseSession userSession, String type, String token, Long userId){ + private Mono newMessage(WebSocketMessage webSocketMessage, BaseSession userSession){ + MDC.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId()); + String text = webSocketMessage.getPayloadAsText(); + log.info("收到用户消息:{}", text); + UserTalkMessage userTalkMessage = JSONObject.parseObject(text, UserTalkMessage.class); + BaseSession userSession1 = getUserSessionWithUserId(userTalkMessage.getUserId()); + if(!userSession.equals(userSession1)){ + log.info("消息发送异常,或者未验签就收到信息不是同一个链接。可能传错用户ID"); + return closeSendMsg(userSession, "请等待验签结束或者用户ID可能错误", AskTypeEnum.TTS.getCode()); + } + log.info("收到用户userId:{},消息:{}", userTalkMessage.getUserId(), userTalkMessage.getMessage()); + return nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage()) + .defaultIfEmpty(new Actions()).flatMap(actions -> { + //处理 + return processAction(actions, userSession); + }); + + } + + private Mono disconnect(BaseSession userSession){ + BaseSession userSession1 = getUserSessionWithUserId(userSession.getUserId()); + if(userSession.equals(userSession1)){ + userGroup.remove(userSession.getUserId());//断链后及时移除 + log.info("用户断开连接userId:{}", userSession.getUserId()); + } + return Mono.empty(); + } + + private Mono checkToken(BaseSession userSession, String type, String token, Long userId){ Map reqHead = new HashMap<>(); reqHead.put(apiType, type); reqHead.put(apiToken, token); - WebClientUtils.get(checkTokenUrl, reqHead).defaultIfEmpty(new JSONObject()).map(jsonObject -> { + return WebClientUtils.get(checkTokenUrl, reqHead).defaultIfEmpty(new JSONObject()).flatMap(jsonObject -> { log.info("验签获取的数据{}", jsonObject); if(jsonObject.getInteger("code").equals(YesNo.YES.getCode())){ Long userId1 = jsonObject.getJSONObject("data").getLong("id"); @@ -138,9 +153,9 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We request.setDeviceType(DeviceTypeEnum.GUO_BOX.getCode()); request.setIsMain(YesNo.YES.getCode()); - deviceUserBindService.selectDeviceUserBindByRequest(request) + return deviceUserBindService.selectDeviceUserBindByRequest(request) .defaultIfEmpty(new DeviceUserBindEntity()) - .map(deviceUserBindEntity -> { + .flatMap(deviceUserBindEntity -> { if(deviceUserBindEntity.getId() != null){ log.info("用户绑定信息为{}", deviceUserBindEntity); @@ -151,15 +166,12 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We normalSendMsg(userSession, "您暂未绑定果宝儿Box,快去绑定吧", AskTypeEnum.TTS.getCode()); } return Mono.empty(); - }).subscribe(); - return Mono.empty(); + }); } - } log.info("验签失败{}", userId); - closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode()); - return Mono.empty(); - }).subscribe(); + return closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode()); + }); }