diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java index 469c2de..6d9440b 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java @@ -668,8 +668,8 @@ public class BaseWebSocketProcess { */ protected Mono closeSendMsg(BaseSession baseSession, String message, Integer type){ normalSendMsg(baseSession, message, type); + log.info("主动关闭设备链接"); return baseSession.getSession().close(); - } /** diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BoxWebSocketHandler.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BoxWebSocketHandler.java index a570265..8f73c93 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BoxWebSocketHandler.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BoxWebSocketHandler.java @@ -126,7 +126,9 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock BoxSession boxSession1 = getBoxSessionWithSn(boxTalkMessage.getSn()); if(!boxSession.equals(boxSession1)){ log.info("消息发送异常,或者未验签就收到信息不是同一个链接。可能传错SN"); - return closeSendMsg(boxSession, "请等待验签结束或者SN可能错误", AskTypeEnum.TTS.getCode()); + return closeSendMsg(boxSession, "请等待验签结束或者SN可能错误", AskTypeEnum.TTS.getCode()).flatMap(b -> { + return Mono.empty(); + }); } log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage()); return nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).flatMap(actions -> { @@ -138,7 +140,7 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock private Mono disconnect(BoxSession boxSession){ MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId()); BoxSession boxSession1 = getBoxSessionWithSn(boxSession.getSn()); - if(boxSession.equals(boxSession1)){ + if(boxSession == boxSession1){ //断链后及时移除 boxGroup.remove(boxSession.getSn()); log.info("设备断开连接SN:{}", boxSession.getSn()); @@ -163,8 +165,10 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock private Mono errorLogin(BaseSession boxSession, String sn){ //清除异常redis - reactiveStringRedisTemplate.opsForValue().delete(RedisConstans.DEVICE_INFO + sn).subscribe(); - return closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode()); + return reactiveStringRedisTemplate.opsForValue().delete(RedisConstans.DEVICE_INFO + sn).flatMap(i -> { + return closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode()); + }); + } private Mono checkToken(BoxSession boxSession, String sn, Long linkTime, String signature, Long userId){ @@ -205,27 +209,23 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock if(!signalMd5.equals(signature)){ log.info("设备{},验签失败。正常签:{}", sn, signalMd5); if(boxSession != null){ - return closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode()).map(v ->{ - return Mono.empty(); - }); + return closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode()); } return Mono.just(dv); }else{ log.info("设备{},验签成功", sn); - BoxSession oldBoxSession = getBoxSessionWithSn(sn); - if(oldBoxSession != null){ - // - closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode()).map(v ->{ - return dv; - }); - } return Mono.just(dv); } }).flatMap(d -> { DeviceInfoEntity dv = (DeviceInfoEntity)d; boxSession.setDeviceId(dv.getId()); - boxGroup.put(sn, boxSession); + return bindBox(dv, userId).flatMap(db ->{ + BoxSession oldBoxSession = getBoxSessionWithSn(sn); + boxGroup.put(sn, boxSession); + if(oldBoxSession != null){ + return closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode()); + } return Mono.empty(); }); }); diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/CustomerWebSocketHandler.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/CustomerWebSocketHandler.java index 328de52..6e983b1 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/CustomerWebSocketHandler.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/CustomerWebSocketHandler.java @@ -112,7 +112,9 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We BaseSession userSession1 = getUserSessionWithUserId(userTalkMessage.getUserId()); if(!userSession.equals(userSession1)){ log.info("消息发送异常,或者未验签就收到信息不是同一个链接。可能传错用户ID"); - return closeSendMsg(userSession, "请等待验签结束或者用户ID可能错误", AskTypeEnum.TTS.getCode()); + return closeSendMsg(userSession, "请等待验签结束或者用户ID可能错误", AskTypeEnum.TTS.getCode()).flatMap(b -> { + return Mono.empty(); + }); } log.info("收到用户userId:{},消息:{}", userTalkMessage.getUserId(), userTalkMessage.getMessage()); return nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage()) @@ -125,7 +127,7 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We private Mono disconnect(BaseSession userSession){ BaseSession userSession1 = getUserSessionWithUserId(userSession.getUserId()); - if(userSession.equals(userSession1)){ + if(userSession == userSession1){ userGroup.remove(userSession.getUserId());//断链后及时移除 log.info("用户断开连接userId:{}", userSession.getUserId()); } @@ -142,11 +144,7 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We Long userId1 = jsonObject.getJSONObject("data").getLong("id"); if(userId1.equals(userId)){ log.info("验签成功{}", userId); - BaseSession oldUserSession = getUserSessionWithUserId(userId); - if(oldUserSession != null){ - closeSendMsg(oldUserSession, "您在其他地方登录", AskTypeEnum.TTS.getCode()); - } - userGroup.put(userId, userSession); + DeviceUserBindRequest request = new DeviceUserBindRequest(); request.setUserId(userId); @@ -165,12 +163,19 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We }else{ normalSendMsg(userSession, "您暂未绑定果宝儿Box,快去绑定吧", AskTypeEnum.TTS.getCode()); } + BaseSession oldUserSession = getUserSessionWithUserId(userId); + userGroup.put(userId, userSession); + if(oldUserSession != null){ + return closeSendMsg(oldUserSession, "您在其他地方登录", AskTypeEnum.TTS.getCode()); + } return Mono.empty(); }); } } log.info("验签失败{}", userId); - return closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode()); + return closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode()).flatMap(b -> { + return Mono.empty(); + }); }); }