互踢功能

This commit is contained in:
wulin 2023-10-17 17:58:25 +08:00
parent dd51f57dc8
commit a067db6e3f
3 changed files with 29 additions and 24 deletions

View File

@ -668,8 +668,8 @@ public class BaseWebSocketProcess {
*/ */
protected Mono<Void> closeSendMsg(BaseSession baseSession, String message, Integer type){ protected Mono<Void> closeSendMsg(BaseSession baseSession, String message, Integer type){
normalSendMsg(baseSession, message, type); normalSendMsg(baseSession, message, type);
log.info("主动关闭设备链接");
return baseSession.getSession().close(); return baseSession.getSession().close();
} }
/** /**

View File

@ -126,7 +126,9 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
BoxSession boxSession1 = getBoxSessionWithSn(boxTalkMessage.getSn()); BoxSession boxSession1 = getBoxSessionWithSn(boxTalkMessage.getSn());
if(!boxSession.equals(boxSession1)){ if(!boxSession.equals(boxSession1)){
log.info("消息发送异常或者未验签就收到信息不是同一个链接。可能传错SN"); log.info("消息发送异常或者未验签就收到信息不是同一个链接。可能传错SN");
return closeSendMsg(boxSession, "请等待验签结束或者SN可能错误", AskTypeEnum.TTS.getCode()); return closeSendMsg(boxSession, "请等待验签结束或者SN可能错误", AskTypeEnum.TTS.getCode()).flatMap(b -> {
return Mono.empty();
});
} }
log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage()); log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage());
return nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).flatMap(actions -> { return nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).flatMap(actions -> {
@ -138,7 +140,7 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
private Mono<Void> disconnect(BoxSession boxSession){ private Mono<Void> disconnect(BoxSession boxSession){
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId()); MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
BoxSession boxSession1 = getBoxSessionWithSn(boxSession.getSn()); BoxSession boxSession1 = getBoxSessionWithSn(boxSession.getSn());
if(boxSession.equals(boxSession1)){ if(boxSession == boxSession1){
//断链后及时移除 //断链后及时移除
boxGroup.remove(boxSession.getSn()); boxGroup.remove(boxSession.getSn());
log.info("设备断开连接SN{}", boxSession.getSn()); log.info("设备断开连接SN{}", boxSession.getSn());
@ -163,8 +165,10 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
private Mono<Void> errorLogin(BaseSession boxSession, String sn){ private Mono<Void> errorLogin(BaseSession boxSession, String sn){
//清除异常redis //清除异常redis
reactiveStringRedisTemplate.opsForValue().delete(RedisConstans.DEVICE_INFO + sn).subscribe(); return reactiveStringRedisTemplate.opsForValue().delete(RedisConstans.DEVICE_INFO + sn).flatMap(i -> {
return closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode()); return closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode());
});
} }
private Mono<Void> checkToken(BoxSession boxSession, String sn, Long linkTime, String signature, Long userId){ private Mono<Void> checkToken(BoxSession boxSession, String sn, Long linkTime, String signature, Long userId){
@ -205,27 +209,23 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
if(!signalMd5.equals(signature)){ if(!signalMd5.equals(signature)){
log.info("设备{},验签失败。正常签:{}", sn, signalMd5); log.info("设备{},验签失败。正常签:{}", sn, signalMd5);
if(boxSession != null){ if(boxSession != null){
return closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode()).map(v ->{ return closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode());
return Mono.empty();
});
} }
return Mono.just(dv); return Mono.just(dv);
}else{ }else{
log.info("设备{},验签成功", sn); log.info("设备{},验签成功", sn);
BoxSession oldBoxSession = getBoxSessionWithSn(sn);
if(oldBoxSession != null){
//
closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode()).map(v ->{
return dv;
});
}
return Mono.just(dv); return Mono.just(dv);
} }
}).flatMap(d -> { }).flatMap(d -> {
DeviceInfoEntity dv = (DeviceInfoEntity)d; DeviceInfoEntity dv = (DeviceInfoEntity)d;
boxSession.setDeviceId(dv.getId()); boxSession.setDeviceId(dv.getId());
boxGroup.put(sn, boxSession);
return bindBox(dv, userId).flatMap(db ->{ return bindBox(dv, userId).flatMap(db ->{
BoxSession oldBoxSession = getBoxSessionWithSn(sn);
boxGroup.put(sn, boxSession);
if(oldBoxSession != null){
return closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode());
}
return Mono.empty(); return Mono.empty();
}); });
}); });

View File

@ -112,7 +112,9 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
BaseSession userSession1 = getUserSessionWithUserId(userTalkMessage.getUserId()); BaseSession userSession1 = getUserSessionWithUserId(userTalkMessage.getUserId());
if(!userSession.equals(userSession1)){ if(!userSession.equals(userSession1)){
log.info("消息发送异常或者未验签就收到信息不是同一个链接。可能传错用户ID"); log.info("消息发送异常或者未验签就收到信息不是同一个链接。可能传错用户ID");
return closeSendMsg(userSession, "请等待验签结束或者用户ID可能错误", AskTypeEnum.TTS.getCode()); return closeSendMsg(userSession, "请等待验签结束或者用户ID可能错误", AskTypeEnum.TTS.getCode()).flatMap(b -> {
return Mono.empty();
});
} }
log.info("收到用户userId:{},消息:{}", userTalkMessage.getUserId(), userTalkMessage.getMessage()); log.info("收到用户userId:{},消息:{}", userTalkMessage.getUserId(), userTalkMessage.getMessage());
return nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage()) return nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage())
@ -125,7 +127,7 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
private Mono<Void> disconnect(BaseSession userSession){ private Mono<Void> disconnect(BaseSession userSession){
BaseSession userSession1 = getUserSessionWithUserId(userSession.getUserId()); BaseSession userSession1 = getUserSessionWithUserId(userSession.getUserId());
if(userSession.equals(userSession1)){ if(userSession == userSession1){
userGroup.remove(userSession.getUserId());//断链后及时移除 userGroup.remove(userSession.getUserId());//断链后及时移除
log.info("用户断开连接userId{}", userSession.getUserId()); log.info("用户断开连接userId{}", userSession.getUserId());
} }
@ -142,11 +144,7 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
Long userId1 = jsonObject.getJSONObject("data").getLong("id"); Long userId1 = jsonObject.getJSONObject("data").getLong("id");
if(userId1.equals(userId)){ if(userId1.equals(userId)){
log.info("验签成功{}", userId); log.info("验签成功{}", userId);
BaseSession oldUserSession = getUserSessionWithUserId(userId);
if(oldUserSession != null){
closeSendMsg(oldUserSession, "您在其他地方登录", AskTypeEnum.TTS.getCode());
}
userGroup.put(userId, userSession);
DeviceUserBindRequest request = new DeviceUserBindRequest(); DeviceUserBindRequest request = new DeviceUserBindRequest();
request.setUserId(userId); request.setUserId(userId);
@ -165,12 +163,19 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
}else{ }else{
normalSendMsg(userSession, "您暂未绑定果宝儿Box快去绑定吧", AskTypeEnum.TTS.getCode()); normalSendMsg(userSession, "您暂未绑定果宝儿Box快去绑定吧", AskTypeEnum.TTS.getCode());
} }
BaseSession oldUserSession = getUserSessionWithUserId(userId);
userGroup.put(userId, userSession);
if(oldUserSession != null){
return closeSendMsg(oldUserSession, "您在其他地方登录", AskTypeEnum.TTS.getCode());
}
return Mono.empty(); return Mono.empty();
}); });
} }
} }
log.info("验签失败{}", userId); log.info("验签失败{}", userId);
return closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode()); return closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode()).flatMap(b -> {
return Mono.empty();
});
}); });
} }