完成阿里云千问对接,千问增加流式推送。websocket增加互踢。日志拦截增加异常

This commit is contained in:
wulin 2023-10-11 21:11:20 +08:00
parent 60059ab54f
commit 6ef3d637be
13 changed files with 955 additions and 278 deletions

View File

@ -1,115 +0,0 @@
package com.qiuguo.iot.third.entity;
import com.alibaba.dashscope.aigc.generation.Generation;
import com.alibaba.dashscope.aigc.generation.GenerationResult;
import com.alibaba.dashscope.aigc.generation.models.QwenParam;
import com.alibaba.dashscope.common.Message;
import com.alibaba.dashscope.common.MessageManager;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.JsonUtils;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
@Data
@Slf4j
public class AliQianWenEntity {
/**
* 回话唯一标识
*/
String talkId;
/**
* 阿里千问配置参数
*/
QwenParam qwenParam;
/**
* 消息管理上下文默认最多10个
* 上下文传送过程中需要单独收费
*/
String qwKey;
MessageManager msgManager = new MessageManager(10);
Message userMsg;
GenerationResult lastGenerationResult = null;
public AliQianWenEntity(String qwKey){
this.qwKey = qwKey;
qwenParam = QwenParam.builder().model(Generation.Models.QWEN_PLUS)
.resultFormat(QwenParam.ResultFormat.MESSAGE)
.topP(0.8)
.apiKey(qwKey)
.enableSearch(true)
//.incrementalOutput(true) // get streaming output incrementally
.build();
/*Message systemMsg =
Message.builder().role(Role.SYSTEM.getValue()).content("你是智能助手机器人").build();
msgManager.add(systemMsg);*/
}
public void sendMessage(String msg, Consumer<? super GenerationResult> onNext) throws NoApiKeyException, InputRequiredException {
if(userMsg == null){
userMsg = Message
.builder()
.role(Role.USER.getValue())
.content(msg)
.build();
msgManager.add(userMsg);
}else{
//msgManager.add(lastGenerationResult);
qwenParam.setPrompt(msg);
}
Generation gen = new Generation();
qwenParam.setMessages(msgManager.get());
Semaphore semaphore = new Semaphore(0);
GenerationResult result = gen.call(qwenParam);
try {
onNext.accept(result);
} catch (Exception e) {
log.info("千问回调异常{}", e);
}
msgManager.add(result);
/*gen.streamCall(qwenParam, new ResultCallback<GenerationResult>() {
@Override
public void onEvent(GenerationResult message) {
try {
onNext.accept(message);
lastGenerationResult = message;
//msgManager.add(message);
} catch (Exception e) {
log.info("千问回调异常{}", e);
}
}
@Override
public void onComplete() {
semaphore.release();
}
@Override
public void onError(Exception e) {
log.info("调用千问异常{}", e);
semaphore.release();
}
});*/
}
}

View File

@ -0,0 +1,106 @@
package com.qiuguo.iot.third.nlp;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.dashscope.aigc.generation.Generation;
import com.alibaba.dashscope.aigc.generation.GenerationResult;
import com.alibaba.dashscope.aigc.generation.models.QwenParam;
import com.alibaba.dashscope.common.Message;
import com.alibaba.dashscope.common.MessageManager;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse;
import io.reactivex.functions.Consumer;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
@Data
@Slf4j
public class AliYunQianWen {
/**
* 回话唯一标识
*/
String talkId;
/**
* 阿里千问配置参数
*/
static final QwenParam qwenParam = QwenParam.builder().model(Generation.Models.QWEN_PLUS)
.resultFormat(QwenParam.ResultFormat.MESSAGE)
.topP(0.8)
.apiKey(SpringUtil.getProperty("Ali.qianwen"))
.enableSearch(true)
.incrementalOutput(true) // get streaming output incrementally
.build();;
/**
* 消息管理上下文默认最多10个
* 上下文传送过程中需要单独收费
*/
MessageManager msgManager = new MessageManager(10);
GenerationResult lastGenerationResult = null;
public void sendMessage(String msg,
Consumer<? super String> onNext,
QWenReplyResponse qwenReplyResponse) throws NoApiKeyException, InputRequiredException, InterruptedException {
Message userMsg = Message
.builder()
.role(Role.USER.getValue())
.content(msg)
.build();
msgManager.add(userMsg);
Generation gen = new Generation();
qwenParam.setMessages(msgManager.get());
Semaphore semaphore = new Semaphore(0);
gen.streamCall(qwenParam, new ResultCallback<GenerationResult>() {
@Override
public void onEvent(GenerationResult message) {
try {
onNext.accept(message.getOutput().getChoices().get(0).getMessage().getContent());
if(lastGenerationResult != null) {
lastGenerationResult.getOutput().getChoices().get(0).getMessage().setContent(
lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent()
+ message.getOutput().getChoices().get(0).getMessage().getContent()
);
}else{
lastGenerationResult = message;
}
} catch (Exception e) {
log.info("千问回调异常{}", e);
msgManager = new MessageManager(10);
}
}
@Override
public void onComplete() {
if(lastGenerationResult != null){
msgManager.add(lastGenerationResult);
qwenReplyResponse.setResut(lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent());
}
lastGenerationResult = null;
semaphore.release();
}
@Override
public void onError(Exception e) {
log.info("调用千问异常{}", e);
msgManager = new MessageManager(10);
semaphore.release();
}
});
semaphore.acquire();
}
}

View File

@ -1,9 +1,5 @@
package com.qiuguo.iot.third.service;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.dashscope.aigc.generation.GenerationResult;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qiuguo.iot.base.constans.RedisConstans;
@ -14,22 +10,19 @@ import com.qiuguo.iot.data.dto.queue.QueuePackagingDTO;
import com.qiuguo.iot.data.resp.qg.algorithm.QGResponse;
import com.qiuguo.iot.data.request.qwen.TongYiCommunicationRest;
import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse;
import com.qiuguo.iot.third.entity.AliQianWenEntity;
import com.qiuguo.iot.third.nlp.AliYunQianWen;
import io.reactivex.functions.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@ -40,21 +33,21 @@ public class QWenService {
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
protected static ConcurrentHashMap<String, AliQianWenEntity> qianwenGroup = new ConcurrentHashMap<>();
protected static ConcurrentHashMap<String, AliYunQianWen> qianwenGroup = new ConcurrentHashMap<>();
public Mono<QWenReplyResponse> communication(TongYiCommunicationRest rest, Consumer<? super GenerationResult> onNext){
AliQianWenEntity aliQianWen = null;
public Mono<QWenReplyResponse> communication(TongYiCommunicationRest rest, Consumer<? super String> onNext){
AliYunQianWen aliQianWen = null;
if (!qianwenGroup.containsKey(rest.getOnlyId())) {
aliQianWen = new AliQianWenEntity(SpringUtil.getProperty("Ali.qianwen"));
aliQianWen = new AliYunQianWen();
qianwenGroup.put(rest.getOnlyId(), aliQianWen);
} else {
aliQianWen = qianwenGroup.get(rest.getOnlyId());
}
final AliQianWenEntity aliQianWen1 = aliQianWen;
final AliYunQianWen aliQianWen1 = aliQianWen;
return Mono.just(new QWenReplyResponse()).map(qWenReplyResponse -> {
try {
aliQianWen1.sendMessage(rest.getText(), onNext);
aliQianWen1.sendMessage(rest.getText(), onNext, qWenReplyResponse);
qWenReplyResponse.setCode(200);
} catch (Exception e) {
log.info("调用千问异常{}", e);

View File

@ -8,6 +8,10 @@ import reactor.core.publisher.FluxSink;
@Data
public class BaseSession {
/**
* session类型0 box 1 用户
*/
protected Integer sessionType = 0;
/***
* 设备序列号
@ -38,5 +42,5 @@ public class BaseSession {
* 当前歌曲状态正常客户端同步
* 用户登录时同步
*/
MusicResp music;
protected MusicResp music;
}

View File

@ -36,7 +36,7 @@ import java.util.Arrays;
@Configuration
@Slf4j
public class LogWebFilter implements WebFilter {
String customerIp = "";
//String customerIp = "";
public static String HEAD_IP = "customerIP";
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
@ -45,7 +45,7 @@ public class LogWebFilter implements WebFilter {
String requestId = request.getId();
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
customerIp = request.getRemoteAddress().getAddress().getHostAddress();//.getHostName();
String customerIp = request.getRemoteAddress().getAddress().getHostAddress();//.getHostName();
String m = request.getMethod().toString();
log.info("api start time:{} ip:{} method:{} url:{} param:{} headers:{}",
@ -61,7 +61,7 @@ public class LogWebFilter implements WebFilter {
ex.response(getResponse(exchange, requestId));
// if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)){
ex.request(getRequest(exchange));
ex.request(getRequest(exchange, customerIp, requestId));
return chain.filter(ex.build())
.contextWrite(context -> {
@ -77,7 +77,7 @@ public class LogWebFilter implements WebFilter {
});
}
private ServerHttpRequest getRequest(ServerWebExchange exchange){
private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String requestId){
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequest newRequest = new ServerHttpRequestDecorator(request){
@Override
@ -93,6 +93,7 @@ public class LogWebFilter implements WebFilter {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
httpHeaders.set(HEAD_IP, customerIp);
httpHeaders.set(LogMdcConfiguration.PRINT_LOG_ID, requestId);
return httpHeaders;
}
};

View File

@ -8,8 +8,6 @@ import com.qiuguo.iot.base.utils.StringUtils;
import com.qiuguo.iot.box.websocket.api.domain.BaseSession;
import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.box.resp.BoxMessageResp;
import com.qiuguo.iot.box.websocket.api.domain.user.UserTalkMessage;
import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration;
import com.qiuguo.iot.data.entity.device.DeviceUserBindEntity;
import com.qiuguo.iot.data.entity.device.DeviceUserTalkRecordEntity;
import com.qiuguo.iot.data.entity.system.SystemTalkBindDeviceEntity;
@ -31,17 +29,12 @@ import com.qiuguo.iot.third.resp.SongInfoResponse;
import com.qiuguo.iot.third.service.*;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import javax.annotation.Resource;
import javax.swing.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ -92,23 +85,15 @@ public class BaseWebSocketProcess {
}
qwenService.communication(tongYiCommunicationRest, message ->{
//通知到客户端
BoxMessageResp resp = new BoxMessageResp();
resp.setType(0);
resp.setText(message.getOutput().getChoices().get(0).getMessage().getContent());
String msg = JSONObject.toJSONString(resp);
log.info("推流信息到终端");
baseSession.getSink().next(baseSession.getSession().textMessage(msg));
//sendMessage(action, baseSession, resp);
}).map(data ->{
normalSendMsg(baseSession, message, AskTypeEnum.TTS.getCode());
}).map(data ->{
if(data.getCode() == 200){
log.info("千问正常结束");
log.info("千问正常结束{}", data.getResut());
//保存记录
}else{
BoxMessageResp resp = new BoxMessageResp();
resp.setType(0);
resp.setText("我还在努力学习中,暂时无法理解");
sendMessage(action, baseSession, resp);
sendMessage(action, baseSession, "我还在努力学习中,暂时无法理解", AskTypeEnum.TTS.getCode());
}
@ -150,16 +135,18 @@ public class BaseWebSocketProcess {
.map(binds ->{
if(binds.getTotal() == 0){
//返回告诉没有备
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText("未找到" + action.getName() + "设备,无法操做!");
sendMessage(action, baseSession, resp);
sendMessage(action,
baseSession,
"未找到" + action.getName() + "设备,无法操做!",
action.getSystemTalkAnswerConfigEntity().getAnswerType());
}else if(binds.getTotal() > 1){
//返回告诉有多个设备请详细说明具体说明设备
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText("您有多个" + action.getName() + "相同设备,请明确说明");
sendMessage(action, baseSession, resp);
sendMessage(action,
baseSession,
"您有多个" + action.getName() + "相同设备,请明确说明",
action.getSystemTalkAnswerConfigEntity().getAnswerType());
}else{
//查询是否有相关指令绑定
action.setDeviceUserBindEntity(binds.getData().get(0));
@ -174,10 +161,8 @@ public class BaseWebSocketProcess {
}
}else{
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText("未找到对应的设备");
sendMessage(action, baseSession, resp);
sendMessage(action, baseSession, "未找到对应的设备", action.getSystemTalkAnswerConfigEntity().getAnswerType());
}
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.WEATHER.getCode())){
@ -225,10 +210,8 @@ public class BaseWebSocketProcess {
log.info("执行指令失败");
}
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText(msg);
sendMessage(action, baseSession, resp);
sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType());
return t;
}).subscribe();
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.MUSIC.getCode())){
@ -269,8 +252,8 @@ public class BaseWebSocketProcess {
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
sendMessage(action, baseSession, resp);
}else{
resp.setType(AskTypeEnum.TTS.getCode());
resp.setText("目前无播放资源,无法操作");
resp.setType(AskTypeEnum.TTS.getCode());
sendMessage(action, baseSession, resp);
}
@ -291,10 +274,9 @@ public class BaseWebSocketProcess {
.map(systemTalkBindDeviceEntity -> {
if(systemTalkBindDeviceEntity.getId() == null){
//通知不支持的指令
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText(deviceName + "不支持" + action.getAction() + "指令!");
sendMessage(action, baseSession, resp);
sendMessage(action, baseSession,
deviceName + "不支持" + action.getAction() + "指令!",
action.getSystemTalkAnswerConfigEntity().getAnswerType());
}else{
//调用涂鸦
@ -317,10 +299,8 @@ public class BaseWebSocketProcess {
msg = systemTalkBindDeviceEntity.getAnswerValueFaild().replaceAll("#name#", deviceName);
log.info("执行指令失败");
}
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText(msg);
sendMessage(action, baseSession, resp);
sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType());
return isOk;
}).subscribe();
@ -329,8 +309,14 @@ public class BaseWebSocketProcess {
}).subscribe();
}
private void sendMessage(Action action, BaseSession baseSession, BoxMessageResp resp){
private void sendMessage(Action action, BaseSession baseSession, String message, Integer type){
BoxMessageResp resp = new BoxMessageResp();
resp.setType(type);
resp.setText(message);
sendMessage(action, baseSession, resp);
}
private void sendMessage(Action action, BaseSession baseSession, BoxMessageResp resp){
DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity();
talkRecord.setAskType(resp.getType());
talkRecord.setAskValue(action.getAsk());
@ -389,19 +375,48 @@ public class BaseWebSocketProcess {
}
deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).map(i ->{
String msg = JSONObject.toJSONString(resp);
log.info("推送通知到端msg:{}", msg);
if(this instanceof BoxWebSocketHandler){
log.info("果box聊天记录同步到客户端");
log.info("果box聊天记录同步到客户端");
BaseSession userSession = getUserSessionWithUserId(baseSession.getUserId());
if(userSession != null){
userSession.getSink().next(baseSession.getSession().textMessage(msg));
sendMsg(userSession, msg);
}
}
baseSession.getSink().next(baseSession.getSession().textMessage(msg));
sendMsg(baseSession, msg);
return Mono.empty();
}).subscribe();//保存聊天记录
}
/**
* 发完消息后关闭链接
* @param baseSession
* @param message
* @param type
*/
protected void closeSendMsg(BaseSession baseSession, String message, Integer type){
normalSendMsg(baseSession, message, type);
baseSession.getSession().close().subscribe();
}
/**
* 普通发送消息
* @param baseSession
* @param message
* @param type
*/
protected void normalSendMsg(BaseSession baseSession, String message, Integer type){
BoxMessageResp resp = new BoxMessageResp();
resp.setType(type);
resp.setText(message);
String msg = JSONObject.toJSONString(resp);
sendMsg(baseSession, msg);
}
private void sendMsg(BaseSession baseSession, String msg) {
log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg);
baseSession.getSink().next(baseSession.getSession().textMessage(msg));
}
public BoxSession getBoxSessionWithSn(String sn) {

View File

@ -9,7 +9,6 @@ import com.qiuguo.iot.base.model.UserDeviceInfoModel;
import com.qiuguo.iot.box.websocket.api.domain.BaseSession;
import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.box.BoxTalkMessage;
import com.qiuguo.iot.box.websocket.api.domain.user.UserMessageResp;
import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration;
import com.qiuguo.iot.box.websocket.api.filter.LogWebFilter;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
@ -20,7 +19,6 @@ import com.qiuguo.iot.data.service.device.DeviceInfoService;
import com.qiuguo.iot.third.nlp.action.Actions;
import com.qiuguo.iot.third.service.NlpService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
@ -52,8 +50,6 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
@Override
public Mono<Void> handle(WebSocketSession session) {
// 在生产环境中需对url中的参数进行检验如token不符合要求的连接的直接关闭
@ -70,9 +66,6 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
String signature = headers.get("signature").get(0);
Long userId = Long.parseLong(headers.get("userId").get(0));
//校验
checkToken(sn, linkTime, signature, userId);
//
log.info("登录成功SN:{}", sn);
@ -100,53 +93,54 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
//MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
return Mono.empty();
}).then();
BoxSession boxSession = new BoxSession();
boxSession.setSn(sn);
boxSession.setCustomerIP(ip);
boxSession.setSession(session);
boxSession.setUserId(userId);
boxSession.setLogId(MDC.get(LogMdcConfiguration.PRINT_LOG_ID));
boxGroup.put(sn, boxSession);
boxSession.setLogId(headers.get(LogMdcConfiguration.PRINT_LOG_ID).get(0));
//校验
checkToken(boxSession, sn, linkTime, signature, userId);
Mono<Void> output = session.send(Flux.create(sink -> boxSession.setSink(sink))).then();
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono任何一个 Mono 产生 error complete 都会导致合并后的 Mono
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
return Mono.zip(input, output).doFinally(signalType -> {
// MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
boxGroup.remove(boxSession.getSn());//断链后及时移除
log.info("设备断开连接SN{}", boxSession.getSn());
//通知用户端设备绑定成功
sendNoticeToUser(boxSession.getUserId(), "设备离线,设备序列号:" + boxSession.getSn(), AskTypeEnum.BOX_OFF_LINE.getCode());
deviceInfoService.setOnLineStatus(sn, YesNo.NO.getCode()).subscribe();
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel();
userDeviceInfoModel.setStatus(YesNo.NO.getCode());
userDeviceInfoModel.setUserId(userId);
userDeviceInfoModel.setSn(sn);
operations.set(RedisConstans.USER_BOX_INFO + userId,
JSONObject.toJSONString(userDeviceInfoModel),
RedisConstans.TEN_YEAR).subscribe();
// MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
BoxSession boxSession1 = getBoxSessionWithSn(boxSession.getSn());
if(boxSession == boxSession1){
boxGroup.remove(boxSession.getSn());//断链后及时移除
log.info("设备断开连接SN{}", boxSession.getSn());
//通知用户端设备绑定成功
sendNoticeToUser(boxSession.getUserId(), "设备离线,设备序列号:" + boxSession.getSn(), AskTypeEnum.BOX_OFF_LINE.getCode());
deviceInfoService.setOnLineStatus(sn, YesNo.NO.getCode()).subscribe();
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel();
userDeviceInfoModel.setStatus(YesNo.NO.getCode());
userDeviceInfoModel.setUserId(userId);
userDeviceInfoModel.setSn(sn);
operations.set(RedisConstans.USER_BOX_INFO + userId,
JSONObject.toJSONString(userDeviceInfoModel),
RedisConstans.TEN_YEAR).subscribe();
return;
}
log.info("被踢下线断开连接:{}", boxSession.getSn());
}).then();
}
private void sendNoticeToUser(Long userId, String text, Integer type){
BaseSession userSession = getUserSessionWithUserId(userId);
if(userSession != null){
//log.info("推送用户通知设备绑定成功");
UserMessageResp userMsgResp = new UserMessageResp();
userMsgResp.setType(type);
userMsgResp.setText(text);
String msg = JSONObject.toJSONString(userMsgResp);
log.info("推送通知到用户端msg:{}", msg);
userSession.getSink().next(userSession.getSession().textMessage(msg));
normalSendMsg(userSession, text, type);
}
}
private void checkToken(String sn, Long linkTime, String signature, Long userId){
private void checkToken(BoxSession boxSession, String sn, Long linkTime, String signature, Long userId){
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
operations.get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(s -> {
if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s)){
@ -157,11 +151,7 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
log.info("转换异常清除redis。下次连接成功{}", e);
//清除异常redis
operations.set(RedisConstans.DEVICE_INFO + sn, "").subscribe();//不需要时间
BaseSession boxSession = getBoxSessionWithSn(sn);
if(boxSession != null){
boxSession.getSink().next(boxSession.getSession().textMessage("异常,请重新登录"));
boxSession.getSession().close().subscribe();
}
closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode());
}
}
DeviceInfoRequest request = new DeviceInfoRequest();
@ -178,27 +168,30 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
String wifiMd5 = MD5.create().digestHex(dv.getWifiMac()).toUpperCase();
String btMd5 = MD5.create().digestHex(dv.getBtMac()).toUpperCase();
String signalMd5 = MD5.create().digestHex(snMd5 + wifiMd5 + btMd5 + linkTime + dv.getKey()).toUpperCase();
BaseSession boxSession = getBoxSessionWithSn(sn);
if(!signalMd5.equals(signature)){
log.info("设备{},验签失败", sn);
log.info("设备{},验签失败。正常签:{}", sn, signalMd5);
//session.send(session.textMessage(""));
if(boxSession != null){
boxSession.getSink().next(boxSession.getSession().textMessage("验签失败"));
boxSession.getSession().close().subscribe();
closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode());
}
}else{
log.info("设备{},验签成功", sn);
if(boxSession != null){
boxSession.setDeviceId(dv.getId());
BoxSession oldBoxSession = getBoxSessionWithSn(sn);
if(oldBoxSession != null){
//
closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode());
}
boxSession.setDeviceId(dv.getId());
boxGroup.put(sn, boxSession);
bindBox(dv, userId);
}
return Mono.empty();
}).subscribe();
}
private void bindBox(DeviceInfoEntity dv, Long userId){
log.info("开始绑定设备userId:{} SN{}", userId, dv);
@ -250,13 +243,7 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
//通知用户端设备绑定成功
BaseSession boxSession = getBoxSessionWithSn(dv.getSn());
if(boxSession != null){
UserMessageResp userMsgResp = new UserMessageResp();
userMsgResp.setType(AskTypeEnum.DEVICE_UNBIND.getCode());
userMsgResp.setText("设备已解绑无法继续使用");
String msg = JSONObject.toJSONString(userMsgResp);
log.info("推送通知到设备端msg:{}", msg);
boxSession.getSink().next(boxSession.getSession().textMessage(msg));
boxSession.getSession().close().subscribe();
closeSendMsg(boxSession, "设备已解绑无法继续使用", AskTypeEnum.DEVICE_UNBIND.getCode());
}
}

View File

@ -2,6 +2,8 @@ package com.qiuguo.iot.box.websocket.api.handler;
import com.alibaba.fastjson.JSONObject;
import com.qiuguo.iot.base.annotation.WebSocketMapping;
import com.qiuguo.iot.base.enums.AskTypeEnum;
import com.qiuguo.iot.base.enums.DeviceTypeEnum;
import com.qiuguo.iot.base.enums.YesNo;
import com.qiuguo.iot.base.utils.WebClientUtils;
import com.qiuguo.iot.box.websocket.api.domain.BaseSession;
@ -14,7 +16,6 @@ import com.qiuguo.iot.data.service.device.DeviceUserBindService;
import com.qiuguo.iot.third.nlp.action.Actions;
import com.qiuguo.iot.third.service.NlpService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
@ -62,43 +63,7 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
return session.close();
}
String ip = headers.get(LogWebFilter.HEAD_IP).get(0);
Map<String, String> reqHead = new HashMap<>();
reqHead.put(apiType, type);
reqHead.put(apiToken, token);
WebClientUtils.get(checkTokenUrl, reqHead).defaultIfEmpty(new JSONObject()).map(jsonObject -> {
log.info("验签获取的数据{}", jsonObject);
if(jsonObject.getInteger("code").equals(YesNo.YES.getCode())){
Long userId1 = jsonObject.getJSONObject("data").getLong("id");
if(userId1.equals(userId)){
log.info("验签成功{}", userId);
DeviceUserBindRequest request = new DeviceUserBindRequest();
request.setUserId(userId);
request.setIsMain(YesNo.YES.getCode());
deviceUserBindService.selectDeviceUserBindByRequest(request)
.defaultIfEmpty(new DeviceUserBindEntity())
.map(deviceUserBindEntity -> {
if(deviceUserBindEntity.getId() != null){
log.info("用户绑定信息为{}", deviceUserBindEntity);
BaseSession userSession = getUserSessionWithUserId(userId);
if(userSession != null){
userSession.setDeviceId(deviceUserBindEntity.getDeviceId());
userSession.setSn(deviceUserBindEntity.getOtherDeviceId());
}
}
return Mono.empty();
}).subscribe();
return Mono.empty();
}
}
log.info("验签失败{}", userId);
BaseSession userSession = getUserSessionWithUserId(userId);
if(userSession != null){
userSession.getSink().next(session.textMessage("非法登录"));
}
session.close().subscribe();
return Mono.empty();
}).subscribe();
log.info("用户成功userId:{}", userId);
Mono<Void> input = session.receive().map(webSocketMessage ->{
//MDC.put(LogMdcConfiguration.PRINT_LOG_ID, getBoxSessionWithSn().getLogId());
@ -127,22 +92,70 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
userSession.setUserId(userId);
userSession.setSession(session);
userSession.setCustomerIP(ip);
userSession.setLogId(MDC.get(LogMdcConfiguration.PRINT_LOG_ID));
userGroup.put(userId, userSession);
userSession.setSessionType(YesNo.YES.getCode());
userSession.setLogId(headers.get(LogMdcConfiguration.PRINT_LOG_ID).get(0));
checkToken(userSession, type, token, userId);
Mono<Void> output = session.send(Flux.create(sink -> userSession.setSink(sink))).then();
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono任何一个 Mono 产生 error complete 都会导致合并后的 Mono
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
return Mono.zip(input, output).doFinally(signalType -> {
// MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
userGroup.remove(userSession.getUserId());//断链后及时移除
log.info("用户断开连接userId{}", userSession.getUserId());
// MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
BaseSession userSession1 = getUserSessionWithUserId(userId);
if(userSession1 == userSession){
userGroup.remove(userSession.getUserId());//断链后及时移除
log.info("用户断开连接userId{}", userSession.getUserId());
}
}).then();
}
private void checkToken(BaseSession userSession, String type, String token, Long userId){
Map<String, String> reqHead = new HashMap<>();
reqHead.put(apiType, type);
reqHead.put(apiToken, token);
WebClientUtils.get(checkTokenUrl, reqHead).defaultIfEmpty(new JSONObject()).map(jsonObject -> {
log.info("验签获取的数据{}", jsonObject);
if(jsonObject.getInteger("code").equals(YesNo.YES.getCode())){
Long userId1 = jsonObject.getJSONObject("data").getLong("id");
if(userId1.equals(userId)){
log.info("验签成功{}", userId);
BaseSession oldUserSession = getUserSessionWithUserId(userId);
if(oldUserSession != null){
closeSendMsg(oldUserSession, "您在其他地方登录", AskTypeEnum.TTS.getCode());
}
userGroup.put(userId, userSession);
DeviceUserBindRequest request = new DeviceUserBindRequest();
request.setUserId(userId);
request.setDeviceType(DeviceTypeEnum.GUO_BOX.getCode());
request.setIsMain(YesNo.YES.getCode());
deviceUserBindService.selectDeviceUserBindByRequest(request)
.defaultIfEmpty(new DeviceUserBindEntity())
.map(deviceUserBindEntity -> {
if(deviceUserBindEntity.getId() != null){
log.info("用户绑定信息为{}", deviceUserBindEntity);
userSession.setDeviceId(deviceUserBindEntity.getDeviceId());
userSession.setSn(deviceUserBindEntity.getOtherDeviceId());
}else{
normalSendMsg(userSession, "您暂未绑定果宝儿Box快去绑定吧", AskTypeEnum.TTS.getCode());
}
return Mono.empty();
}).subscribe();
return Mono.empty();
}
}
log.info("验签失败{}", userId);
closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode());
return Mono.empty();
}).subscribe();
}
}

View File

@ -0,0 +1,88 @@
package com.admin.service.impl;
import org.apache.commons.lang3.StringUtils;
import java.util.Date;
/**
* <p>
* 数据-区域Controller类
* </p>
*
* @author wulin
* @since 2023-10-11
*/
@RestController
@Slf4j
@RequestMapping("/SystemAddress")
public class SystemAddressController{
@Autowired
private SystemAddressService systemAddressService;
@PostMapping("/info")
public Mono<SystemAddressResp> selectSystemAddressByRequest(@RequestBody SystemAddressRequest request){
return systemAddressService.selectSystemAddressByRequest(request).map(d -> {return new SystemAddressResp(d);});
}
@PostMapping("/list")
public Mono<PagerResult<SystemAddressResp>> selectSystemAddresssByRequest(@RequestBody SystemAddressRequest request){
return systemAddressService.selectDeviceInfosByRequest(request).map(d -> {
PagerResult<SystemAddressResp> result = new PagerResult<>();
result.setPageIndex(d.getPageIndex());
result.setPageSize(d.getPageSize());
result.setTotal(d.getTotal());
List<SystemAddressResp> ds = d.getData().stream().map(new Function<SystemAddressEntity, SystemAddressResp>() {
@Override
public DeviceInfoResp apply(SystemAddressEntity entity) {
return new SystemAddressResp(entity);
}
}
).collect(Collectors.toList());
result.setData(ds);
return result;
});
}
@GetMapping("/id")
public Mono<SystemAddressResp> selectSystemAddressById(@RequestParam Long id){
return systemAddressService.selectSystemAddressById(id).map(d -> {return new SystemAddressResp(d);});
}
@PostMapping("/save")
public Mono<Integer> insertSystemAddress(@RequestBody SystemAddressEntity entity){
return systemAddressService.insertSystemAddress(entity);
}
@PostMapping("/update")
public Mono<Integer> updateSystemAddressById(@RequestBody SystemAddressEntity entity){
return systemAddressService.updateSystemAddressById(entity);
}
@PostMapping("/updateCover")
public Mono<Integer> updateCoverSystemAddressById(@RequestBody SystemAddressEntity entity){
return systemAddressService.updateCoverSystemAddressById(entity);
}
@PostMapping("/delete")
public Mono<Integer> deleteSystemAddressById(@RequestParam Long id){
return systemAddressService.deleteSystemAddressById(id);
}
}

View File

@ -0,0 +1,92 @@
package com.qiuguo.iot.data.entity;
import org.hswebframework.ezorm.rdb.mapping.annotation.Comment;
import org.hswebframework.web.crud.annotation.EnableEntityEvent;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import javax.persistence.Column;
import javax.persistence.Table;import lombok.Data;
import java.util.Date;
/**
* <p>
* </p>*数据-区域
* @author wulin
* @since 2023-10-11
*/
@Data
@Comment("数据-区域")
@Table(name = "system_address")
@EnableEntityEvent
public class SystemAddressEntity extends GenericEntity<Long> {
@Comment("ID")
@Column(name = "id", length = 11, nullable = false, unique = true)
private Long id;
@Comment("is_delete")
@Column(name = "is_delete")
private Integer isDelete;
@Comment("create_time")
@Column(name = "create_time")
private Date createTime;
@Comment("modify_time")
@Column(name = "modify_time")
private Date modifyTime;
@Comment("上级ID")
@Column(name = "parent_id", nullable = false)
private Long parentId;
@Comment("adcode")
@Column(name = "adcode")
private Integer adcode;
@Comment("上级 adcode")
@Column(name = "padcode", nullable = false)
private Integer padcode;
@Comment("首字母")
@Column(name = "first", length = 50)
private String first;
@Comment("区域简称")
@Column(name = "short_name", length = 100)
private String shortName;
@Comment("区域名称")
@Column(name = "name", length = 100)
private String name;
@Comment("adcode_link")
@Column(name = "adcode_link", length = 56)
private String adcodeLink;
@Comment("name_link")
@Column(name = "name_link", length = 255)
private String nameLink;
@Comment("区域层级")
@Column(name = "level")
private Integer level;
@Comment("区域拼音")
@Column(name = "pinyin", length = 100)
private String pinyin;
@Comment("区域邮编")
@Column(name = "postcode", length = 100)
private String postcode;
@Comment("使用状态")
@Column(name = "status", length = 1)
private Integer status;
@Comment("所在经度")
@Column(name = "lng", length = 100)
private String lng;
@Comment("所在纬度")
@Column(name = "lat", length = 100)
private String lat;
}

View File

@ -0,0 +1,111 @@
package com.qiuguo.iot.data.entity;
import lombok.Data;
import java.util.Date;
/**
* <p>
*数据-区域请求类
* @author wulin
* @since 2023-10-11
*/
@Data
public class SystemAddressRequest implements java.io.Serializable {
private int currPage = 1;
private int pageSize = 10;
private String sort;
private String order;
/**
*ID
*/
private Long id;
/**
*
*/
private Integer isDelete;
/**
*
*/
private Date createTime;
/**
*搜索开始
*/
private Date createTimeStart;
/**
*搜索结束
*/
private Date createTimeEnd;
/**
*
*/
private Date modifyTime;
/**
*搜索开始
*/
private Date modifyTimeStart;
/**
*搜索结束
*/
private Date modifyTimeEnd;
/**
*上级ID
*/
private Long parentId;
/**
*adcode
*/
private Integer adcode;
/**
*上级 adcode
*/
private Integer padcode;
/**
*首字母
*/
private String first;
/**
*区域简称
*/
private String shortName;
/**
*区域名称
*/
private String name;
/**
*
*/
private String adcodeLink;
/**
*
*/
private String nameLink;
/**
*区域层级
*/
private Integer level;
/**
*区域拼音
*/
private String pinyin;
/**
*区域邮编
*/
private String postcode;
/**
*使用状态
*/
private Integer status;
/**
*所在经度
*/
private String lng;
/**
*所在纬度
*/
private String lat;
}

View File

@ -0,0 +1,103 @@
package com.qiuguo.iot.data.entity;
import lombok.Data;
import java.util.Date;
/**
* <p>
* </p>*数据-区域返回类
* @author wulin
* @since 2023-10-11
*/
@Data
public class SystemAddressResp {
public SystemAddressResp(){
}
public SystemAddressResp(SystemAddressEntity entity){
id = entity.getId();
createTime = entity.getCreateTime();
modifyTime = entity.getModifyTime();
parentId = entity.getParentId();
adcode = entity.getAdcode();
padcode = entity.getPadcode();
first = entity.getFirst();
shortName = entity.getShortName();
name = entity.getName();
adcodeLink = entity.getAdcodeLink();
nameLink = entity.getNameLink();
level = entity.getLevel();
pinyin = entity.getPinyin();
postcode = entity.getPostcode();
status = entity.getStatus();
lng = entity.getLng();
lat = entity.getLat();
}
/**
*ID
*/
private Long id;
/**
*
*/
private Date createTime;
/**
*
*/
private Date modifyTime;
/**
*上级ID
*/
private Long parentId;
/**
*adcode
*/
private Integer adcode;
/**
*上级 adcode
*/
private Integer padcode;
/**
*首字母
*/
private String first;
/**
*区域简称
*/
private String shortName;
/**
*区域名称
*/
private String name;
/**
*
*/
private String adcodeLink;
/**
*
*/
private String nameLink;
/**
*区域层级
*/
private Integer level;
/**
*区域拼音
*/
private String pinyin;
/**
*区域邮编
*/
private String postcode;
/**
*使用状态
*/
private Integer status;
/**
*所在经度
*/
private String lng;
/**
*所在纬度
*/
private String lat;
}

View File

@ -0,0 +1,279 @@
package com.admin.service.impl;
import org.apache.commons.lang3.StringUtils;
import java.util.Date;
/**
* <p>
* 数据-区域服务类
* </p>
*
* @author wulin
* @since 2023-10-11
*/
@Service
@Slf4j
public class SystemAddressService extends GenericReactiveCrudService<SystemAddressEntity, Long> {
public Mono<SystemAddressEntity> selectSystemAddressByRequest(SystemAddressRequest request){
ReactiveQuery<SystemAddressEntity> reactiveQuery = createQuery();
reactiveQuery = reactiveQuery.and("is_delete", 0);
if(request.getId() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getId, request.getId());
}
if(request.getIsDelete() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getIsDelete, request.getIsDelete());
}
if(request.getCreateTime() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getCreateTime, request.getCreateTime());
}
if(request.getModifyTime() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getModifyTime, request.getModifyTime());
}
if(request.getParentId() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getParentId, request.getParentId());
}
if(request.getAdcode() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getAdcode, request.getAdcode());
}
if(request.getPadcode() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getPadcode, request.getPadcode());
}
if(StringUtils.isNotEmpty(request.getFirst())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getFirst, request.getFirst());
}
if(StringUtils.isNotEmpty(request.getShortName())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getShortName, request.getShortName());
}
if(StringUtils.isNotEmpty(request.getName())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getName, request.getName());
}
if(StringUtils.isNotEmpty(request.getAdcodeLink())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getAdcodeLink, request.getAdcodeLink());
}
if(StringUtils.isNotEmpty(request.getNameLink())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getNameLink, request.getNameLink());
}
if(request.getLevel() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getLevel, request.getLevel());
}
if(StringUtils.isNotEmpty(request.getPinyin())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getPinyin, request.getPinyin());
}
if(StringUtils.isNotEmpty(request.getPostcode())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getPostcode, request.getPostcode());
}
if(request.getStatus() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getStatus, request.getStatus());
}
if(StringUtils.isNotEmpty(request.getLng())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getLng, request.getLng());
}
if(StringUtils.isNotEmpty(request.getLat())){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getLat, request.getLat());
}
SortOrder sortOrder = null;
if(StringUtils.isNotEmpty(request.getOrder())){
if(StringUtils.isNotEmpty(request.getSort()) && request.getSort().compareTo("0") == 0){
sortOrder = SortOrder.desc(request.getOrder());
}else{
sortOrder = SortOrder.asc(request.getOrder());
}
reactiveQuery = reactiveQuery.orderBy(sortOrder);
}
return reactiveQuery.fetchOne();
}
public Mono<PagerResult<SystemAddressEntity>> selectSystemAddresssByRequest(SystemAddressRequest request){
ReactiveQuery<SystemAddressEntity> reactiveQuery = createQuery();
reactiveQuery = reactiveQuery.and("is_delete", 0);
if(request.getId() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getId, request.getId());
}
if(request.getIsDelete() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getIsDelete, request.getIsDelete());
}
if(request.getCreateTimeStart() != null){
reactiveQuery = reactiveQuery.gte(SystemAddressRequest::getCreateTime, request.getCreateTimeStart());
}
if(request.getCreateTimeEnd() != null){
reactiveQuery = reactiveQuery.lte(SystemAddressRequest::getCreateTime, request.getCreateTimeEnd());
}
if(request.getModifyTimeStart() != null){
reactiveQuery = reactiveQuery.gte(SystemAddressRequest::getModifyTime, request.getModifyTimeStart());
}
if(request.getModifyTimeEnd() != null){
reactiveQuery = reactiveQuery.lte(SystemAddressRequest::getModifyTime, request.getModifyTimeEnd());
}
if(request.getParentId() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getParentId, request.getParentId());
}
if(request.getAdcode() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getAdcode, request.getAdcode());
}
if(request.getPadcode() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getPadcode, request.getPadcode());
}
if(StringUtils.isNotEmpty(request.getFirst())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getFirst, request.getFirst());
}
if(StringUtils.isNotEmpty(request.getShortName())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getShortName, request.getShortName());
}
if(StringUtils.isNotEmpty(request.getName())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getName, request.getName());
}
if(StringUtils.isNotEmpty(request.getAdcodeLink())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getAdcodeLink, request.getAdcodeLink());
}
if(StringUtils.isNotEmpty(request.getNameLink())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getNameLink, request.getNameLink());
}
if(request.getLevel() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getLevel, request.getLevel());
}
if(StringUtils.isNotEmpty(request.getPinyin())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getPinyin, request.getPinyin());
}
if(StringUtils.isNotEmpty(request.getPostcode())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getPostcode, request.getPostcode());
}
if(request.getStatus() != null){
reactiveQuery = reactiveQuery.and(SystemAddressRequest::getStatus, request.getStatus());
}
if(StringUtils.isNotEmpty(request.getLng())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getLng, request.getLng());
}
if(StringUtils.isNotEmpty(request.getLat())){
reactiveQuery = reactiveQuery.$like$(SystemAddressRequest::getLat, request.getLat());
}
QueryParamEntity param = QueryParamEntity.of(reactiveQuery.getParam());
if(StringUtils.isNotEmpty(request.getOrder())){
Sort sort = new Sort();
sort.setName(request.getOrder());
if(StringUtils.isNotEmpty(request.getSort()) && request.getSort().compareTo("0") == 0){
sort.desc();
}else{
sort.asc();
}
param.setSorts(Arrays.asList(sort));
}
param.setPageIndex(request.getCurrPage());
param.setPageSize(request.getPageSize());
param.setPaging(true);
param.setFirstPageIndex(1);
return queryPager(param);
}
public Mono<SystemAddressEntity> selectSystemAddressById(Long id){
return createQuery()
.and("is_delete", 0)
.and("id", id)
.fetchOne();
}
public Mono<Integer> insertSystemAddress(SystemAddressEntity entity){
entity.setId(null);
entity.setCreateTime(null);
entity.setModifyTime(null);
return insert(entity);
}
public Mono<Integer> updateSystemAddressById(SystemAddressEntity entity){
ReactiveUpdate<SystemAddressEntity> update = createUpdate()
.set(SystemAddressEntity::getModifyTime, new Date());
if(entity.getIsDelete() != null){
update = update.set(SystemAddressEntity::getIsDelete, entity.getIsDelete());
}
if(entity.getParentId() != null){
update = update.set(SystemAddressEntity::getParentId, entity.getParentId());
}
if(entity.getAdcode() != null){
update = update.set(SystemAddressEntity::getAdcode, entity.getAdcode());
}
if(entity.getPadcode() != null){
update = update.set(SystemAddressEntity::getPadcode, entity.getPadcode());
}
if(StringUtils.isNotEmpty(entity.getFirst())){
update = update.set(SystemAddressEntity::getFirst, entity.getFirst());
}
if(StringUtils.isNotEmpty(entity.getShortName())){
update = update.set(SystemAddressEntity::getShortName, entity.getShortName());
}
if(StringUtils.isNotEmpty(entity.getName())){
update = update.set(SystemAddressEntity::getName, entity.getName());
}
if(StringUtils.isNotEmpty(entity.getAdcodeLink())){
update = update.set(SystemAddressEntity::getAdcodeLink, entity.getAdcodeLink());
}
if(StringUtils.isNotEmpty(entity.getNameLink())){
update = update.set(SystemAddressEntity::getNameLink, entity.getNameLink());
}
if(entity.getLevel() != null){
update = update.set(SystemAddressEntity::getLevel, entity.getLevel());
}
if(StringUtils.isNotEmpty(entity.getPinyin())){
update = update.set(SystemAddressEntity::getPinyin, entity.getPinyin());
}
if(StringUtils.isNotEmpty(entity.getPostcode())){
update = update.set(SystemAddressEntity::getPostcode, entity.getPostcode());
}
if(entity.getStatus() != null){
update = update.set(SystemAddressEntity::getStatus, entity.getStatus());
}
if(StringUtils.isNotEmpty(entity.getLng())){
update = update.set(SystemAddressEntity::getLng, entity.getLng());
}
if(StringUtils.isNotEmpty(entity.getLat())){
update = update.set(SystemAddressEntity::getLat, entity.getLat());
}
return update.where(SystemAddressEntity::getId, entity.getId()).and("is_delete", 0).execute();
}
public Mono<Integer> updateCoverSystemAddressById(SystemAddressEntity entity){
ReactiveUpdate<SystemAddressEntity> update = createUpdate()
.set(SystemAddressEntity::getModifyTime, new Date());
update = update.set(SystemAddressEntity::getIsDelete, entity.getIsDelete());
update = update.set(SystemAddressEntity::getParentId, entity.getParentId());
update = update.set(SystemAddressEntity::getAdcode, entity.getAdcode());
update = update.set(SystemAddressEntity::getPadcode, entity.getPadcode());
update = update.set(SystemAddressEntity::getFirst, entity.getFirst());
update = update.set(SystemAddressEntity::getShortName, entity.getShortName());
update = update.set(SystemAddressEntity::getName, entity.getName());
update = update.set(SystemAddressEntity::getAdcodeLink, entity.getAdcodeLink());
update = update.set(SystemAddressEntity::getNameLink, entity.getNameLink());
update = update.set(SystemAddressEntity::getLevel, entity.getLevel());
update = update.set(SystemAddressEntity::getPinyin, entity.getPinyin());
update = update.set(SystemAddressEntity::getPostcode, entity.getPostcode());
update = update.set(SystemAddressEntity::getStatus, entity.getStatus());
update = update.set(SystemAddressEntity::getLng, entity.getLng());
update = update.set(SystemAddressEntity::getLat, entity.getLat());
return update.where(SystemAddressEntity::getId, entity.getId()).and("is_delete", 0).execute();
}
public Mono<Integer> deleteSystemAddressById(Long id){
return createUpdate()
.set("is_delete", 1)
.set("modify_time", new Date())
.where("id", id)
.execute();
}
}