分装部分nlp
This commit is contained in:
parent
c18f94cf40
commit
a27062040a
@ -97,6 +97,9 @@ public class SystemTalkAnswerConfigService extends GenericReactiveCrudService<Sy
|
|||||||
if(request.getKeyOrder() != null){
|
if(request.getKeyOrder() != null){
|
||||||
reactiveQuery = reactiveQuery.and(SystemTalkAnswerConfigRequest::getKeyOrder, request.getKeyOrder());
|
reactiveQuery = reactiveQuery.and(SystemTalkAnswerConfigRequest::getKeyOrder, request.getKeyOrder());
|
||||||
}
|
}
|
||||||
|
if(request.getAnswerType() != null){
|
||||||
|
reactiveQuery = reactiveQuery.and(SystemTalkAnswerConfigRequest::getAnswerType, request.getAnswerType());
|
||||||
|
}
|
||||||
SortOrder sortOrder = null;
|
SortOrder sortOrder = null;
|
||||||
if(StringUtils.isNotEmpty(request.getOrder())){
|
if(StringUtils.isNotEmpty(request.getOrder())){
|
||||||
if(StringUtils.isNotEmpty(request.getSort()) && request.getSort().compareTo("0") == 0){
|
if(StringUtils.isNotEmpty(request.getSort()) && request.getSort().compareTo("0") == 0){
|
||||||
@ -156,6 +159,9 @@ public class SystemTalkAnswerConfigService extends GenericReactiveCrudService<Sy
|
|||||||
if(StringUtils.isNotEmpty(request.getAnswerBackImg())){
|
if(StringUtils.isNotEmpty(request.getAnswerBackImg())){
|
||||||
reactiveQuery = reactiveQuery.$like$(SystemTalkAnswerConfigRequest::getAnswerBackImg, request.getAnswerBackImg());
|
reactiveQuery = reactiveQuery.$like$(SystemTalkAnswerConfigRequest::getAnswerBackImg, request.getAnswerBackImg());
|
||||||
}
|
}
|
||||||
|
if(request.getAnswerType() != null){
|
||||||
|
reactiveQuery = reactiveQuery.and(SystemTalkAnswerConfigRequest::getAnswerType, request.getAnswerType());
|
||||||
|
}
|
||||||
if(request.getKeyOrder() != null){
|
if(request.getKeyOrder() != null){
|
||||||
reactiveQuery = reactiveQuery.and(SystemTalkAnswerConfigRequest::getKeyOrder, request.getKeyOrder());
|
reactiveQuery = reactiveQuery.and(SystemTalkAnswerConfigRequest::getKeyOrder, request.getKeyOrder());
|
||||||
}
|
}
|
||||||
@ -229,6 +235,9 @@ public class SystemTalkAnswerConfigService extends GenericReactiveCrudService<Sy
|
|||||||
if(entity.getKeyOrder() != null){
|
if(entity.getKeyOrder() != null){
|
||||||
update = update.set(SystemTalkAnswerConfigEntity::getKeyOrder, entity.getKeyOrder());
|
update = update.set(SystemTalkAnswerConfigEntity::getKeyOrder, entity.getKeyOrder());
|
||||||
}
|
}
|
||||||
|
if(entity.getAnswerType() != null){
|
||||||
|
update = update.set(SystemTalkAnswerConfigEntity::getAnswerType, entity.getAnswerType());
|
||||||
|
}
|
||||||
return update.where(SystemTalkAnswerConfigEntity::getId, entity.getId()).and("is_delete", 0).execute();
|
return update.where(SystemTalkAnswerConfigEntity::getId, entity.getId()).and("is_delete", 0).execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,6 +256,7 @@ public class SystemTalkAnswerConfigService extends GenericReactiveCrudService<Sy
|
|||||||
update = update.set(SystemTalkAnswerConfigEntity::getAnswerBackSound, entity.getAnswerBackSound());
|
update = update.set(SystemTalkAnswerConfigEntity::getAnswerBackSound, entity.getAnswerBackSound());
|
||||||
update = update.set(SystemTalkAnswerConfigEntity::getAnswerBackImg, entity.getAnswerBackImg());
|
update = update.set(SystemTalkAnswerConfigEntity::getAnswerBackImg, entity.getAnswerBackImg());
|
||||||
update = update.set(SystemTalkAnswerConfigEntity::getKeyOrder, entity.getKeyOrder());
|
update = update.set(SystemTalkAnswerConfigEntity::getKeyOrder, entity.getKeyOrder());
|
||||||
|
update = update.set(SystemTalkAnswerConfigEntity::getAnswerType, entity.getAnswerType());
|
||||||
return update.where(SystemTalkAnswerConfigEntity::getId, entity.getId()).and("is_delete", 0).execute();
|
return update.where(SystemTalkAnswerConfigEntity::getId, entity.getId()).and("is_delete", 0).execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -261,5 +271,7 @@ public class SystemTalkAnswerConfigService extends GenericReactiveCrudService<Sy
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public SystemTalkAnswerConfigEntity getSystemTalkWithKey(String key) {
|
||||||
|
return group.get(key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
package com.qiuguo.iot.base.nlp;
|
package com.qiuguo.iot.third.nlp;
|
||||||
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package com.qiuguo.iot.base.nlp;
|
package com.qiuguo.iot.third.nlp;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package com.qiuguo.iot.base.nlp;
|
package com.qiuguo.iot.third.nlp;
|
||||||
|
|
||||||
import com.qiuguo.iot.base.enums.ChinesePartSpeechEnum;
|
import com.qiuguo.iot.base.enums.ChinesePartSpeechEnum;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package com.qiuguo.iot.base.nlp.action;
|
package com.qiuguo.iot.third.nlp.action;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
@ -1,8 +1,9 @@
|
|||||||
package com.qiuguo.iot.base.nlp.action;
|
package com.qiuguo.iot.third.nlp.action;
|
||||||
|
|
||||||
import com.qiuguo.iot.base.enums.ChinesePartSpeechEnum;
|
import com.qiuguo.iot.base.enums.ChinesePartSpeechEnum;
|
||||||
import com.qiuguo.iot.base.nlp.Nlp;
|
import com.qiuguo.iot.base.utils.StringUtils;
|
||||||
import com.qiuguo.iot.base.nlp.NlpKey;
|
import com.qiuguo.iot.third.nlp.Nlp;
|
||||||
|
import com.qiuguo.iot.third.nlp.NlpKey;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -18,6 +19,7 @@ public class Actions {
|
|||||||
actions = new ArrayList<>();
|
actions = new ArrayList<>();
|
||||||
nlp.getKeys().sort(Comparator.comparing(NlpKey::getType)); //解析,按照type从小到大排序
|
nlp.getKeys().sort(Comparator.comparing(NlpKey::getType)); //解析,按照type从小到大排序
|
||||||
Action action = new Action();
|
Action action = new Action();
|
||||||
|
String name = "";
|
||||||
action.setName(new ArrayList<>());
|
action.setName(new ArrayList<>());
|
||||||
for (NlpKey key : nlp.getKeys()
|
for (NlpKey key : nlp.getKeys()
|
||||||
) {
|
) {
|
||||||
@ -25,10 +27,18 @@ public class Actions {
|
|||||||
if(key.getType().equals(ChinesePartSpeechEnum.v.getCode())){
|
if(key.getType().equals(ChinesePartSpeechEnum.v.getCode())){
|
||||||
action.setAction(key.getKey());
|
action.setAction(key.getKey());
|
||||||
}else if(key.getType().equals(ChinesePartSpeechEnum.n.getCode())){
|
}else if(key.getType().equals(ChinesePartSpeechEnum.n.getCode())){
|
||||||
action.getName().add(key.getKey());
|
name += key.getKey();
|
||||||
}else if(key.getType().equals(ChinesePartSpeechEnum.a.getCode())){
|
//action.getName().add(key.getKey());
|
||||||
action.setStatus(key.getKey());
|
}else if(key.getType().equals(ChinesePartSpeechEnum.c.getCode())){
|
||||||
|
//action.setStatus(key.getKey());
|
||||||
|
if(StringUtils.isNotEmpty(name)){
|
||||||
|
action.getName().add(name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if(StringUtils.isNotEmpty(name)){
|
||||||
|
action.getName().add(name);
|
||||||
|
}
|
||||||
|
actions.add(action);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package com.qiuguo.iot.base.nlp.lac;
|
package com.qiuguo.iot.third.nlp.lac;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
@ -1,16 +1,17 @@
|
|||||||
package com.qiuguo.iot.base.nlp.lac;
|
package com.qiuguo.iot.third.service;
|
||||||
|
|
||||||
import cn.hutool.extra.spring.SpringUtil;
|
import cn.hutool.extra.spring.SpringUtil;
|
||||||
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONArray;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.qiuguo.iot.base.nlp.INlp;
|
import com.qiuguo.iot.third.nlp.INlp;
|
||||||
import com.qiuguo.iot.base.nlp.Nlp;
|
import com.qiuguo.iot.third.nlp.Nlp;
|
||||||
import com.qiuguo.iot.base.nlp.NlpKey;
|
import com.qiuguo.iot.third.nlp.NlpKey;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
|
import com.qiuguo.iot.third.nlp.lac.LacRequest;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
@ -28,14 +29,16 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
|
|||||||
* @since 2023-09-06
|
* @since 2023-09-06
|
||||||
*/
|
*/
|
||||||
//LAC中的tag说明 v:动词 n:名称 PER/nr:人名 a:量词 w:标点符号 m:数字(中文、阿拉伯) TIME:时间词 vn:动名词组 xc:语气词 c:连接词
|
//LAC中的tag说明 v:动词 n:名称 PER/nr:人名 a:量词 w:标点符号 m:数字(中文、阿拉伯) TIME:时间词 vn:动名词组 xc:语气词 c:连接词
|
||||||
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class LacNlp implements INlp {
|
public class LacNlpService implements INlp {
|
||||||
private static WebClient webClient = WebClient.builder()
|
private static WebClient webClient = WebClient.builder()
|
||||||
.defaultHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON_VALUE).build();
|
.defaultHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON_VALUE).build();
|
||||||
|
@Value("${lac.url}")
|
||||||
|
private String url;
|
||||||
|
|
||||||
private Mono<JSONObject> getNlpFromLac(LacRequest request){
|
private Mono<JSONObject> getNlpFromLac(LacRequest request){
|
||||||
return webClient.post().uri(SpringUtil.getProperty("lac.url") + "/predict/lac").bodyValue(JSONObject.toJSON(request))
|
return webClient.post().uri(url + "/predict/lac").bodyValue(JSONObject.toJSON(request))
|
||||||
.retrieve()
|
.retrieve()
|
||||||
.bodyToMono(JSONObject.class).doOnNext(res -> {
|
.bodyToMono(JSONObject.class).doOnNext(res -> {
|
||||||
if (!Objects.equals(res.getInteger("status"), 0)) {
|
if (!Objects.equals(res.getInteger("status"), 0)) {
|
||||||
@ -0,0 +1,22 @@
|
|||||||
|
package com.qiuguo.iot.third.service;
|
||||||
|
|
||||||
|
import com.qiuguo.iot.third.nlp.INlp;
|
||||||
|
import com.qiuguo.iot.third.nlp.action.Actions;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class NlpService {
|
||||||
|
@Resource
|
||||||
|
private LacNlpService liguoNlpService;
|
||||||
|
|
||||||
|
public Mono<Actions> getActionWithLacSingle(String text){
|
||||||
|
return liguoNlpService.geSingletNlp(text).map(nlp -> {
|
||||||
|
return new Actions(nlp);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -57,6 +57,12 @@
|
|||||||
<artifactId>spring-boot-starter-test</artifactId>
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.qiuguo.iot</groupId>
|
||||||
|
<artifactId>iot-third</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@ -5,7 +5,7 @@ import org.springframework.boot.SpringApplication;
|
|||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
|
|
||||||
@SpringBootApplication(scanBasePackages = {"com.qiuguo.iot.box.websocket.api", "com.qiuguo.iot.data.service"})
|
@SpringBootApplication(scanBasePackages = {"com.qiuguo.iot.box.websocket.api", "com.qiuguo.iot.data.service", "com.qiuguo.iot.third.service"})
|
||||||
@EnableEasyormRepository(value = "com.qiuguo.iot.data.entity.*")
|
@EnableEasyormRepository(value = "com.qiuguo.iot.data.entity.*")
|
||||||
public class IotBoxWebsocketApplication {
|
public class IotBoxWebsocketApplication {
|
||||||
|
|
||||||
|
|||||||
@ -8,9 +8,11 @@ import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession;
|
|||||||
import com.qiuguo.iot.box.websocket.api.domain.box.BoxTalkMessage;
|
import com.qiuguo.iot.box.websocket.api.domain.box.BoxTalkMessage;
|
||||||
import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration;
|
import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration;
|
||||||
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
|
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
|
||||||
|
import com.qiuguo.iot.data.entity.system.SystemTalkAnswerConfigEntity;
|
||||||
import com.qiuguo.iot.data.request.device.DeviceInfoRequest;
|
import com.qiuguo.iot.data.request.device.DeviceInfoRequest;
|
||||||
import com.qiuguo.iot.data.service.device.DeviceInfoService;
|
import com.qiuguo.iot.data.service.device.DeviceInfoService;
|
||||||
import com.qiuguo.iot.data.service.system.SystemTalkAnswerConfigService;
|
import com.qiuguo.iot.data.service.system.SystemTalkAnswerConfigService;
|
||||||
|
import com.qiuguo.iot.third.service.NlpService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.slf4j.MDC;
|
import org.slf4j.MDC;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
@ -42,6 +44,9 @@ public class BoxWebSocketHandler implements WebSocketHandler {
|
|||||||
@Resource
|
@Resource
|
||||||
private SystemTalkAnswerConfigService systemTalkAnswerConfigService;
|
private SystemTalkAnswerConfigService systemTalkAnswerConfigService;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private NlpService nlpService;
|
||||||
|
|
||||||
public static ConcurrentHashMap<String, BoxSession> group = new ConcurrentHashMap<>();
|
public static ConcurrentHashMap<String, BoxSession> group = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -57,9 +62,57 @@ public class BoxWebSocketHandler implements WebSocketHandler {
|
|||||||
return session.close();
|
return session.close();
|
||||||
}
|
}
|
||||||
String token = headers.get("signature").get(0);
|
String token = headers.get("signature").get(0);
|
||||||
|
//校验token
|
||||||
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
|
checkToken(sn, linkTime, token);
|
||||||
//
|
//
|
||||||
|
|
||||||
|
log.info("登录成功SN:{}", sn);
|
||||||
|
Mono<Void> input = session.receive().map(webSocketMessage ->{
|
||||||
|
//MDC.put(LogMdcConfiguration.PRINT_LOG_ID, getBoxSessionWithSn().getLogId());
|
||||||
|
String text = webSocketMessage.getPayloadAsText();
|
||||||
|
log.info("设备端收到消息:{}", text);
|
||||||
|
BoxTalkMessage boxTalkMessage = JSONObject.parseObject(text, BoxTalkMessage.class);
|
||||||
|
nlpService.getActionWithLacSingle(boxTalkMessage.getMessage()).map(actions -> {
|
||||||
|
//处理
|
||||||
|
if(actions.getActions().size() > 0){
|
||||||
|
SystemTalkAnswerConfigEntity talkAnswerConfigEntity =
|
||||||
|
systemTalkAnswerConfigService.getSystemTalkWithKey(actions.getActions().get(0).getAction());
|
||||||
|
log.info("匹配到自定义指令{}", talkAnswerConfigEntity);
|
||||||
|
}else{
|
||||||
|
//调用千问回答
|
||||||
|
log.info("未匹配到自定义命令,调用千问");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Mono.empty();
|
||||||
|
}).thenEmpty(empty ->{
|
||||||
|
//调用千问回答
|
||||||
|
log.info("未匹配到自定义命令,调用千问");
|
||||||
|
}).subscribe();
|
||||||
|
|
||||||
|
log.info("收到SN:{},消息{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage());
|
||||||
|
//MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
||||||
|
return Mono.empty();
|
||||||
|
}).then();
|
||||||
|
BoxSession boxSession = new BoxSession();
|
||||||
|
boxSession.setSn(sn);
|
||||||
|
boxSession.setSession(session);
|
||||||
|
boxSession.setLogId(MDC.get(LogMdcConfiguration.PRINT_LOG_ID));
|
||||||
|
group.put(sn, boxSession);
|
||||||
|
Mono<Void> output = session.send(Flux.create(sink -> boxSession.setSink(sink))).then();
|
||||||
|
|
||||||
|
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono,任何一个 Mono 产生 error 或 complete 都会导致合并后的 Mono
|
||||||
|
// 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。
|
||||||
|
|
||||||
|
return Mono.zip(input, output).doFinally(signalType -> {
|
||||||
|
// MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||||
|
group.remove(boxSession.getSn());//断链后及时移除
|
||||||
|
log.info("设备断开连接SN:{}", boxSession.getSn());
|
||||||
|
// MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
||||||
|
}).then();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkToken(String sn, Long linkTime, String token){
|
||||||
|
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
|
||||||
operations.get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(s -> {
|
operations.get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(s -> {
|
||||||
if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s)){
|
if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s)){
|
||||||
try{
|
try{
|
||||||
@ -88,41 +141,12 @@ public class BoxWebSocketHandler implements WebSocketHandler {
|
|||||||
//session.send(session.textMessage(""));
|
//session.send(session.textMessage(""));
|
||||||
BoxSession boxSession = getBoxSessionWithSn(sn);
|
BoxSession boxSession = getBoxSessionWithSn(sn);
|
||||||
if(boxSession != null){
|
if(boxSession != null){
|
||||||
boxSession.getSink().next(session.textMessage("验签失败"));
|
boxSession.getSink().next(boxSession.getSession().textMessage("验签失败"));
|
||||||
}
|
}
|
||||||
session.close().subscribe();
|
boxSession.getSession().close().subscribe();
|
||||||
}
|
}
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}).subscribe();
|
}).subscribe();
|
||||||
log.info("登录成功SN:{}", sn);
|
|
||||||
Mono<Void> input = session.receive().map(webSocketMessage ->{
|
|
||||||
//MDC.put(LogMdcConfiguration.PRINT_LOG_ID, getBoxSessionWithSn().getLogId());
|
|
||||||
String text = webSocketMessage.getPayloadAsText();
|
|
||||||
log.info("设备端收到消息:{}", text);
|
|
||||||
BoxTalkMessage boxTalkMessage = JSONObject.parseObject(text, BoxTalkMessage.class);
|
|
||||||
log.info("收到SN:{},消息{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage());
|
|
||||||
//MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
|
||||||
return Mono.empty();
|
|
||||||
}).then();
|
|
||||||
BoxSession boxSession = new BoxSession();
|
|
||||||
boxSession.setSn(sn);
|
|
||||||
boxSession.setSession(session);
|
|
||||||
boxSession.setLogId(MDC.get(LogMdcConfiguration.PRINT_LOG_ID));
|
|
||||||
group.put(sn, boxSession);
|
|
||||||
Mono<Void> output = session.send(Flux.create(sink -> boxSession.setSink(sink))).then();
|
|
||||||
|
|
||||||
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono,任何一个 Mono 产生 error 或 complete 都会导致合并后的 Mono
|
|
||||||
// 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。
|
|
||||||
|
|
||||||
return Mono.zip(input, output).doFinally(signalType -> {
|
|
||||||
// MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
|
||||||
group.remove(boxSession.getSn());//断链后及时移除
|
|
||||||
log.info("设备断开连接SN:{}", boxSession.getSn());
|
|
||||||
// MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
|
||||||
}).then();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public BoxSession getBoxSessionWithSn(String sn) {
|
public BoxSession getBoxSessionWithSn(String sn) {
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import reactor.core.publisher.Flux;
|
|||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@ -40,6 +41,7 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
|
|||||||
public Mono<Void> handle(WebSocketSession session) {
|
public Mono<Void> handle(WebSocketSession session) {
|
||||||
HandshakeInfo handshakeInfo = session.getHandshakeInfo();
|
HandshakeInfo handshakeInfo = session.getHandshakeInfo();
|
||||||
HttpHeaders headers = handshakeInfo.getHeaders();
|
HttpHeaders headers = handshakeInfo.getHeaders();
|
||||||
|
//List<String> tokens = headers.get("token");
|
||||||
String token = headers.get("token").get(0);
|
String token = headers.get("token").get(0);
|
||||||
Long userId = Long.valueOf(headers.get("userId").get(0));
|
Long userId = Long.valueOf(headers.get("userId").get(0));
|
||||||
Long linkTime = Long.parseLong(headers.get("time").get(0));
|
Long linkTime = Long.parseLong(headers.get("time").get(0));
|
||||||
@ -50,7 +52,7 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
|
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
|
||||||
operations.get(RedisConstans.DEVICE_INFO + userId).defaultIfEmpty("").flatMap(s -> {
|
operations.get(RedisConstans.DEVICE_INFO + userId).defaultIfEmpty("ba2ef9fd8a70a6ac72c38aa6a46be4f6").flatMap(s -> {
|
||||||
if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s)){
|
if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s)){
|
||||||
if(!token.equals(s)){
|
if(!token.equals(s)){
|
||||||
log.info("验签失败{}", userId);
|
log.info("验签失败{}", userId);
|
||||||
@ -87,7 +89,7 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
|
|||||||
return Mono.zip(input, output).doFinally(signalType -> {
|
return Mono.zip(input, output).doFinally(signalType -> {
|
||||||
// MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
// MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||||
group.remove(userSession.getUserId());//断链后及时移除
|
group.remove(userSession.getUserId());//断链后及时移除
|
||||||
log.info("设备断开连接SN:{}", userSession.getUserId());
|
log.info("用户断开连接SN:{}", userSession.getUserId());
|
||||||
// MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
// MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
||||||
}).then();
|
}).then();
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user