Merge remote-tracking branch 'origin/feature-BOX一期' into feature-BOX一期

This commit is contained in:
zhangqy 2023-09-20 17:16:48 +08:00
commit ce8865e343
12 changed files with 1397 additions and 2711 deletions

View File

@ -5,22 +5,25 @@ import com.qiuguo.iot.base.enums.DeviceTypeEnum;
import com.qiuguo.iot.base.utils.StringUtils;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
import com.qiuguo.iot.data.request.device.DeviceInfoRequest;
import com.qiuguo.iot.data.resp.device.DeviceInfoResp;
import com.qiuguo.iot.data.service.device.DeviceBatchService;
import com.qiuguo.iot.data.service.device.DeviceInfoService;
import com.qiuguo.iot.user.api.resp.device.DeviceInitResp;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.mapping.ReactiveQuery;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.exception.BusinessException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
@RestController
@Slf4j
@ -30,7 +33,7 @@ public class DeviceController {
private String key;
@Value("${device.checkTimeout}")
private Boolean checkTimeout;
@Resource
@Autowired
private DeviceInfoService deviceInfoService;
@Resource
@ -43,6 +46,7 @@ public class DeviceController {
public Mono<DeviceInitResp> deviceInit(@RequestParam String wifiMac, @RequestParam String btMac,
@RequestParam Integer type, @RequestParam Long time,
@RequestParam String signature){
//deviceInfoService.
Long now = System.currentTimeMillis();
if(checkTimeout && now - time > timeOut){
//超时
@ -104,13 +108,4 @@ public class DeviceController {
}
@GetMapping("/list")
public Mono<PagerResult<DeviceInfoEntity>> getList(@RequestParam Integer pageIndex, @RequestParam Integer pageSize, @RequestParam String name){
DeviceInfoRequest request = new DeviceInfoRequest();
request.setCurrPage(pageIndex);
request.setPageSize(pageSize);
request.setName(name);
return deviceInfoService.selectDeviceInfosByRequest(request);
}
}

View File

@ -0,0 +1,42 @@
package com.qiuguo.iot.box.websocket.api.controller;
import com.qiuguo.iot.box.websocket.api.domain.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.UserSession;
import com.qiuguo.iot.box.websocket.api.handler.BoxWebSocketHandler;
import com.qiuguo.iot.box.websocket.api.handler.CustomerWebSocketHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@Slf4j
@RequestMapping("/websocket/push")
public class WebsocketPushMessageController {
@Autowired
BoxWebSocketHandler boxWebSocketHandler;
@Autowired
CustomerWebSocketHandler customerWebSocketHandler;
@GetMapping("/message")
public Mono<String> pushMessage(@RequestParam String message, @RequestParam String id, @RequestParam Integer type) {
if(type == 0){
//设备推送
BoxSession boxSession = boxWebSocketHandler.getBoxSessionWithSn(id);
if(boxSession != null){
//boxSession.getSession().send(message)
}
return Mono.just("设备未上线");
}else{
//用户推送
UserSession userSession = customerWebSocketHandler.getUserSessionWithSn(id);
if(userSession != null){
}
return Mono.just("用户未上线");
}
//return Mono.just("推送完成");
}
}

View File

@ -0,0 +1,13 @@
package com.qiuguo.iot.box.websocket.api.domain;
import lombok.Data;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.FluxSink;
@Data
public class BoxSession {
String sn;
WebSocketSession session;
FluxSink<WebSocketMessage> sink;
}

View File

@ -0,0 +1,13 @@
package com.qiuguo.iot.box.websocket.api.domain;
import lombok.Data;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.FluxSink;
@Data
public class UserSession {
Long userId;
WebSocketSession session;
FluxSink<WebSocketMessage> sink;
}

View File

@ -7,6 +7,7 @@ import com.chen.tools.util.StringUtils;
import com.google.protobuf.InvalidProtocolBufferException;
import com.qiuguo.iot.base.annotation.WebSocketMapping;
import com.qiuguo.iot.base.constans.RedisConstans;
import com.qiuguo.iot.box.websocket.api.domain.BoxSession;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
import com.qiuguo.iot.data.request.device.DeviceInfoRequest;
import com.qiuguo.iot.data.service.device.DeviceInfoService;
@ -25,6 +26,8 @@ import org.springframework.web.reactive.socket.*;
import reactor.core.publisher.*;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
@Component
@WebSocketMapping("/box")
@ -41,6 +44,8 @@ public class BoxWebSocketHandler implements WebSocketHandler {
@Resource
private DeviceInfoService deviceInfoService;
public static ConcurrentHashMap<String, BoxSession> group = new ConcurrentHashMap<>();
@Override
public Mono<Void> handle(WebSocketSession session) {
// 在生产环境中需对url中的参数进行检验如token不符合要求的连接的直接关闭
@ -56,82 +61,60 @@ public class BoxWebSocketHandler implements WebSocketHandler {
String token = headers.get("signature").get(0);
ReactiveValueOperations<String, String> operations = reactiveRedisTemplate.opsForValue();
// Mono<DeviceInfoEntity> deviceInfoEntityMono = operations.get(RedisConstans.DEVICE_INFO + sn);
return operations.get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(d -> {
if(StringUtils.isNotBlank(d)){
DeviceInfoEntity dv = JSONObject.parseObject(d, DeviceInfoEntity.class);
return Mono.just(dv);
}else{
DeviceInfoRequest request = new DeviceInfoRequest();
request.setSn(sn);
Mono<DeviceInfoEntity> mono = deviceInfoService.selectDeviceInfoByRequest(request);
return mono.map(dv -> {
return operations.set(RedisConstans.DEVICE_INFO + dv.getSn(), JSONObject.toJSONString(dv)).map(b -> {
return Mono.just(dv);
});
});
}
return operations.get(RedisConstans.DEVICE_INFO + sn)
.defaultIfEmpty("")
.flatMap(d -> {
}).map(o -> {
DeviceInfoEntity dv = (DeviceInfoEntity)o;
String snMd5 = MD5.create().digestHex(sn).toUpperCase();
String wifiMd5 = MD5.create().digestHex(dv.getWifiMac()).toUpperCase();
String btMd5 = MD5.create().digestHex(dv.getBtMac()).toUpperCase();
String signalMd5 = MD5.create().digestHex(snMd5 + wifiMd5 + btMd5 + linkTime + dv.getKey()).toUpperCase();
if(!signalMd5.equals(token)){
log.info("设备{},验签失败", sn);
session.close();
return null;
}
return null;
});
if(StringUtils.isNotBlank(d)){
try{
DeviceInfoEntity dv = JSONObject.parseObject(d, DeviceInfoEntity.class);
return Mono.just(dv);
}catch (Exception e){
log.info("转换异常{}", e);
}
}
DeviceInfoRequest request = new DeviceInfoRequest();
request.setSn(sn);
return deviceInfoService.selectDeviceInfoByRequest(request).map(dv -> {
return operations.set(RedisConstans.DEVICE_INFO + dv.getSn(), JSONObject.toJSONString(dv)).map(b -> {
return dv;
});
}).block();
}) .map(o -> {
String snMd5 = MD5.create().digestHex(sn).toUpperCase();
String wifiMd5 = MD5.create().digestHex(o.getWifiMac()).toUpperCase();
String btMd5 = MD5.create().digestHex(o.getBtMac()).toUpperCase();
String signalMd5 = MD5.create().digestHex(snMd5 + wifiMd5 + btMd5 + linkTime + o.getKey()).toUpperCase();
if(!signalMd5.equals(token)){
log.info("设备{},验签失败", sn);
session.close();
return null;
}
log.info("登录成功SN:{}", sn);
Mono<Void> input = session.receive().map(webSocketMessage ->{
log.info("收到消息{}", webSocketMessage);
/*if (handshakeInfo.getUri().getQuery() == null) {
return session.close(CloseStatus.REQUIRED_EXTENSION);
} MultiMap<String> values = new MultiMap<String>();
UrlEncoded.decodeTo(handshakeInfo.getUri().getQuery(), values, "UTF-8");
Long userId = Long.valueOf(values.getString("userId"));
log.info("id:{}, userId:{}", session.getId(), userId);
Mono<Void> input = session.receive().map(webSocketMessage ->{
try {
//log.info("concatMap");
MsgProtocol.Msg reqMsg = getMsg(webSocketMessage.getPayload());
MsgProtocol.User user1 = MsgProtocol.User.parseFrom(reqMsg.getData());
Long id = user1.getUserId();
UserInfo userInfo1 = group.get(id);
userInfo1.setUser(user1);
if(reqMsg.getCmdId().getNumber() == MsgProtocol.Msg.CmdId.SEND_STATUS_REQ_VALUE){
MsgProtocol.Msg reqMsg1 = reqMsg.toBuilder().setCmdId(MsgProtocol.Msg.CmdId.SYNC_STATUS_RSP).build();
batchRunner.add(reqMsg1);
//batchRunner.doFinal();
//countDownLatch.await();
return Mono.empty();//session.textMessage("");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return Mono.empty();
}).then();
UserInfo finalUserInfo = userInfo;
Mono<Void> output = session.send(Flux.create(sink -> finalUserInfo.setSink(sink))).then();
return Mono.empty();
}).then();
BoxSession boxSession = new BoxSession();
boxSession.setSn(sn);
group.put(sn, boxSession);
Mono<Void> output = session.send(Flux.create(sink -> boxSession.setSink(sink))).then();
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono任何一个 Mono 产生 error complete 都会导致合并后的 Mono
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
//
return Mono.zip(input, output).doFinally(signalType -> {
group.remove(finalUserInfo.getUser().getUserId());//断链后及时移除
log.info("断开连接{}", finalUserInfo.getUser().getUserId());
}).then();*/
return Mono.zip(input, output).doFinally(signalType -> {
group.remove(boxSession.getSn());//断链后及时移除
log.info("断开连接{}", boxSession.getSn());
}).then().block();
});
}
public BoxSession getBoxSessionWithSn(String sn) {
return group.get(sn);
}
}

View File

@ -1,27 +1,90 @@
package com.qiuguo.iot.box.websocket.api.handler;
import cn.hutool.crypto.digest.MD5;
import cn.hutool.system.UserInfo;
import com.alibaba.fastjson.JSONObject;
import com.chen.tools.util.StringUtils;
import com.qiuguo.iot.base.annotation.WebSocketMapping;
import com.qiuguo.iot.base.constans.RedisConstans;
import com.qiuguo.iot.box.websocket.api.domain.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.UserSession;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
import com.qiuguo.iot.data.request.device.DeviceInfoRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;
@Component
@WebSocketMapping("/customer")
@Slf4j
public class CustomerWebSocketHandler implements WebSocketHandler {
@Value("${device.checkTimeout}")
private Boolean checkTimeout;
@Value("${device.timeout}")
private Long timeOut;//2分钟
public static ConcurrentHashMap<Long, UserSession> group = new ConcurrentHashMap<>();
@Resource
private ReactiveRedisTemplate reactiveRedisTemplate;
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()
.concatMap(mapper -> {
String msg = mapper.getPayloadAsText();
System.out.println("mapper: " + msg);
return Flux.just(msg);
}).map(value -> {
System.out.println("value: " + value);
return session.textMessage("Echo " + value);
HandshakeInfo handshakeInfo = session.getHandshakeInfo();
HttpHeaders headers = handshakeInfo.getHeaders();
String token = headers.get("token").get(0);
Long userId = Long.valueOf(headers.get("userId").get(0));
Long linkTime = Long.parseLong(headers.get("time").get(0));
if(checkTimeout && System.currentTimeMillis() - linkTime > timeOut){
//关闭连接
log.info("用户{},请求数据已超时", userId);
return session.close();
}
ReactiveValueOperations<String, String> operations = reactiveRedisTemplate.opsForValue();
return operations.get(RedisConstans.DEVICE_INFO + userId)
.defaultIfEmpty("")
.flatMap(d -> {
if(StringUtils.isNotBlank(d)){
log.info("用户非法登录token无效 userId:{}", userId);
return null;
}
log.info("用户登录成功userId:{}", userId);
Mono<Void> input = session.receive().map(webSocketMessage ->{
log.info("收到用户消息{}", webSocketMessage);
return Mono.empty();
}).then();
UserSession userSession = new UserSession();
userSession.setUserId(userId);
group.put(userId, userSession);
Mono<Void> output = session.send(Flux.create(sink -> userSession.setSink(sink))).then();
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono任何一个 Mono 产生 error complete 都会导致合并后的 Mono
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
return Mono.zip(input, output).doFinally(signalType -> {
group.remove(userSession.getUserId());//断链后及时移除
log.info("用户断开连接{}", userSession.getUserId());
}).then();
});
return session.send(output);
}
public UserSession getUserSessionWithSn(String userId) {
return group.get(userId);
}
}

View File

@ -82,7 +82,7 @@ public class MysqlMain {
}
List<TablesBean> list = new ArrayList<>();
list.add(new TablesBean("user_home"));
list.add(new TablesBean("device_info"));
list.add(new TablesBean("user_room"));
List<TablesBean> list2 = new ArrayList<TablesBean>();
@ -103,8 +103,9 @@ public class MysqlMain {
}
/*for (int i = 0; i < list2.size(); i++) {
MysqlUtilTable2Contoller.printController(list2.get(i));
for (int i = 0; i < list2.size(); i++) {
//MysqlUtilTable2Contoller.printController(list2.get(i));
MysqlUtilTable2Contoller.printReactorController(list2.get(i));
}

View File

@ -3,6 +3,10 @@ package com.qiuguo.iot.customer.http.api.mysql;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
import com.qiuguo.iot.data.resp.device.DeviceInfoResp;
import org.hswebframework.web.api.crud.entity.PagerResult;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
@ -12,6 +16,7 @@ import java.util.Date;
import java.util.List;
public class MysqlUtilTable2Contoller {
public static String TAB = " ";
/**
* 打印entity的信息
*/
@ -19,6 +24,7 @@ public class MysqlUtilTable2Contoller {
boolean hasDate = false;
List<FieldBean> list = tableBean.getFieldList();
StringBuffer bf = new StringBuffer();
@ -112,4 +118,125 @@ public class MysqlUtilTable2Contoller {
} finally {
}
}
public static void printReactorController(TablesBean tableBean) {
String realName = MysqlMain.pre + tableBean.getSpaceName();
String fileName = MysqlMain.save_path + "/" + realName + "Controller.java";
List<FieldBean> list = tableBean.getFieldList();
try {
String content = "package com.admin.service.impl;\n\n\n\n" +
"import org.apache.commons.lang3.StringUtils;\n" +
"import java.util.Date;\n";
content += "/**\n";
content += "* <p>\n";
content += "* " + tableBean.getComment().replace("", "") + "Controller类\n";
content += "* </p>\n";
content += "*\n";
content += "* @author wulin\n";
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
content += "* @since " + format.format(new Date()) + "\n";
content += "*/\n";
content += "\n";
content += "@RestController\n@Slf4j\n@RequestMapping(\"/" + realName + "\")\n";
content += "public class " + tableBean.getSpaceName() + "Controller{\n";
content += "\n";
content += "\n";
content += TAB + "@Autowired\n";
content += TAB + "private " + realName + "Service " + MysqlUtil.getFirstToLower(realName) + "Service;\n";
content += TAB + "@PostMapping(\"/info\")\n";
content += TAB + "public Mono<" + realName + "Resp> select" + realName + "ByRequest(@RequestBody " + realName + "Request request){\n";
content += TAB + TAB + "return " + MysqlUtil.getFirstToLower(realName) + "Service.select"
+ realName + "ByRequest(request).map(d -> {return new " + realName + "Resp(d);});\n";
content += TAB + "}\n";
content += "\n";
content += "\n";
content += "\n";
content += TAB + "@PostMapping(\"/list\")\n";
content += TAB + "public Mono<PagerResult<" + realName + "Resp>> select" + realName + "sByRequest(@RequestBody " + realName + "Request request){\n";
content += TAB + TAB + "return " + MysqlUtil.getFirstToLower(realName) + "Service.selectDeviceInfosByRequest(request).map(d -> {\n" +
" PagerResult<" + realName + "Resp> result = new PagerResult<>();\n" +
" result.setPageIndex(d.getPageIndex());\n" +
" result.setPageSize(d.getPageSize());\n" +
" result.setTotal(d.getTotal());\n" +
" List<" + realName + "Resp> ds = d.getData().stream().map(new Function<" + realName + "Entity, " + realName + "Resp>() {\n" +
" @Override\n" +
" public DeviceInfoResp apply(" + realName + "Entity entity) {\n" +
" return new " + realName + "Resp(entity);\n" +
" }\n" +
" }\n" +
"\n" +
" ).collect(Collectors.toList());\n" +
" result.setData(ds);\n" +
" return result;\n" +
" });\n";
content += TAB + "}\n";
content += "\n";
content += "\n";
content += "\n";
content += TAB + "@GetMapping(\"/id\")\n";
content += TAB + "public Mono<" + realName + "Resp> select" + realName + "ById(@RequestParam Long id){\n";
content += TAB + TAB + "return " + MysqlUtil.getFirstToLower(realName) + "Service.select"
+ realName + "ById(id).map(d -> {return new " + realName + "Resp(d);});\n";
content += TAB + "}\n";
content += "\n";
content += "\n";
content += "\n";
content += TAB + "@PostMapping(\"/save\")\n";
content += TAB + "public Mono<Integer> insert" + realName + "(@RequestBody " + realName + "Entity entity){\n";
content += TAB + TAB + "return " + MysqlUtil.getFirstToLower(realName) + "Service.insert"
+ realName + "(entity);\n";
content += TAB + "}\n";
content += "\n";
content += "\n";
content += "\n";
content += TAB + "@PostMapping(\"/update\")\n";
content += TAB + "public Mono<Integer> update" + realName + "ById(@RequestBody " + realName + "Entity entity){\n";
content += TAB + TAB + "return " + MysqlUtil.getFirstToLower(realName) + "Service.update"
+ realName + "ById(entity);\n";
content += TAB + "}\n";
content += "\n";
content += "\n";
content += "\n";
content += TAB + "@PostMapping(\"/updateCover\")\n";
content += TAB + "public Mono<Integer> updateCover" + realName + "ById(@RequestBody " + realName + "Entity entity){\n";
content += TAB + TAB + "return " + MysqlUtil.getFirstToLower(realName) + "Service.updateCover"
+ realName + "ById(entity);\n";
content += TAB + "}\n";
content += "\n";
content += "\n";
content += "\n";
content += TAB + "@PostMapping(\"/delete\")\n";
content += TAB + "public Mono<Integer> delete" + tableBean.getSpaceName() + "ById(@RequestParam Long id){\n";
content += TAB + TAB + "return " + MysqlUtil.getFirstToLower(realName) + "Service.delete"
+ realName + "ById(id);\n";
content += TAB + "}\n";
content += "\n";
content += "\n";
content += "\n";
content += "}" + "\n";
FileOutputStream fos = new FileOutputStream(fileName);
Writer out = new OutputStreamWriter(fos, "UTF-8");
out.write(content);
out.close();
fos.close();
System.out.println("===" + realName + "Service.java" + "生成");
// 打开一个写文件器构造函数中的第二个参数true表示以追加形式写文件
// FileWriter writer = new FileWriter(fileName, false);
// writer.write(content);
// writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

View File

@ -129,7 +129,7 @@ public class MysqlUtilTable2Service {
content += "\n";
content += "\n";
content += TAB + "public Mono<Integer> insert" + realName + "(" + realName + "Entity entity){\n";
//content += TAB + TAB + "entity.setId(null);\n";让他报错抛异常
content += TAB + TAB + "entity.setId(null);\n";
content += TAB + TAB + "entity.setCreateTime(null);\n";
content += TAB + TAB + "entity.setModifyTime(null);\n";
content += TAB + TAB + "return insert(entity);\n";

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff