提交websocket相关代码

This commit is contained in:
wulin 2023-09-20 15:49:30 +08:00
parent 027506df44
commit 42191260f4
9 changed files with 1256 additions and 2702 deletions

View File

@ -108,13 +108,4 @@ public class DeviceController {
}
@GetMapping("/list")
public Mono<PagerResult<DeviceInfoEntity>> getList(@RequestParam Integer pageIndex, @RequestParam Integer pageSize, @RequestParam String name){
DeviceInfoRequest request = new DeviceInfoRequest();
request.setCurrPage(pageIndex);
request.setPageSize(pageSize);
request.setName(name);
return deviceInfoService.selectDeviceInfosByRequest(request);
}
}

View File

@ -0,0 +1,42 @@
package com.qiuguo.iot.box.websocket.api.controller;
import com.qiuguo.iot.box.websocket.api.domain.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.UserSession;
import com.qiuguo.iot.box.websocket.api.handler.BoxWebSocketHandler;
import com.qiuguo.iot.box.websocket.api.handler.CustomerWebSocketHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@Slf4j
@RequestMapping("/websocket/push")
public class WebsocketPushMessageController {
@Autowired
BoxWebSocketHandler boxWebSocketHandler;
@Autowired
CustomerWebSocketHandler customerWebSocketHandler;
@GetMapping("/message")
public Mono<String> pushMessage(@RequestParam String message, @RequestParam String id, @RequestParam Integer type) {
if(type == 0){
//设备推送
BoxSession boxSession = boxWebSocketHandler.getBoxSessionWithSn(id);
if(boxSession != null){
//boxSession.getSession().send(message)
}
return Mono.just("设备未上线");
}else{
//用户推送
UserSession userSession = customerWebSocketHandler.getUserSessionWithSn(id);
if(userSession != null){
}
return Mono.just("用户未上线");
}
//return Mono.just("推送完成");
}
}

View File

@ -0,0 +1,13 @@
package com.qiuguo.iot.box.websocket.api.domain;
import lombok.Data;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.FluxSink;
@Data
public class BoxSession {
String sn;
WebSocketSession session;
FluxSink<WebSocketMessage> sink;
}

View File

@ -0,0 +1,13 @@
package com.qiuguo.iot.box.websocket.api.domain;
import lombok.Data;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.FluxSink;
@Data
public class UserSession {
Long userId;
WebSocketSession session;
FluxSink<WebSocketMessage> sink;
}

View File

@ -7,6 +7,7 @@ import com.chen.tools.util.StringUtils;
import com.google.protobuf.InvalidProtocolBufferException;
import com.qiuguo.iot.base.annotation.WebSocketMapping;
import com.qiuguo.iot.base.constans.RedisConstans;
import com.qiuguo.iot.box.websocket.api.domain.BoxSession;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
import com.qiuguo.iot.data.request.device.DeviceInfoRequest;
import com.qiuguo.iot.data.service.device.DeviceInfoService;
@ -25,6 +26,8 @@ import org.springframework.web.reactive.socket.*;
import reactor.core.publisher.*;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
@Component
@WebSocketMapping("/box")
@ -41,6 +44,8 @@ public class BoxWebSocketHandler implements WebSocketHandler {
@Resource
private DeviceInfoService deviceInfoService;
public static ConcurrentHashMap<String, BoxSession> group = new ConcurrentHashMap<>();
@Override
public Mono<Void> handle(WebSocketSession session) {
// 在生产环境中需对url中的参数进行检验如token不符合要求的连接的直接关闭
@ -56,82 +61,60 @@ public class BoxWebSocketHandler implements WebSocketHandler {
String token = headers.get("signature").get(0);
ReactiveValueOperations<String, String> operations = reactiveRedisTemplate.opsForValue();
// Mono<DeviceInfoEntity> deviceInfoEntityMono = operations.get(RedisConstans.DEVICE_INFO + sn);
return operations.get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(d -> {
if(StringUtils.isNotBlank(d)){
DeviceInfoEntity dv = JSONObject.parseObject(d, DeviceInfoEntity.class);
return Mono.just(dv);
}else{
DeviceInfoRequest request = new DeviceInfoRequest();
request.setSn(sn);
Mono<DeviceInfoEntity> mono = deviceInfoService.selectDeviceInfoByRequest(request);
return mono.map(dv -> {
return operations.set(RedisConstans.DEVICE_INFO + dv.getSn(), JSONObject.toJSONString(dv)).map(b -> {
return Mono.just(dv);
});
});
}
return operations.get(RedisConstans.DEVICE_INFO + sn)
.defaultIfEmpty("")
.flatMap(d -> {
}).map(o -> {
DeviceInfoEntity dv = (DeviceInfoEntity)o;
String snMd5 = MD5.create().digestHex(sn).toUpperCase();
String wifiMd5 = MD5.create().digestHex(dv.getWifiMac()).toUpperCase();
String btMd5 = MD5.create().digestHex(dv.getBtMac()).toUpperCase();
String signalMd5 = MD5.create().digestHex(snMd5 + wifiMd5 + btMd5 + linkTime + dv.getKey()).toUpperCase();
if(!signalMd5.equals(token)){
log.info("设备{},验签失败", sn);
session.close();
return null;
}
return null;
});
if(StringUtils.isNotBlank(d)){
try{
DeviceInfoEntity dv = JSONObject.parseObject(d, DeviceInfoEntity.class);
return Mono.just(dv);
}catch (Exception e){
log.info("转换异常{}", e);
}
}
DeviceInfoRequest request = new DeviceInfoRequest();
request.setSn(sn);
return deviceInfoService.selectDeviceInfoByRequest(request).map(dv -> {
return operations.set(RedisConstans.DEVICE_INFO + dv.getSn(), JSONObject.toJSONString(dv)).map(b -> {
return dv;
});
}).block();
}) .map(o -> {
String snMd5 = MD5.create().digestHex(sn).toUpperCase();
String wifiMd5 = MD5.create().digestHex(o.getWifiMac()).toUpperCase();
String btMd5 = MD5.create().digestHex(o.getBtMac()).toUpperCase();
String signalMd5 = MD5.create().digestHex(snMd5 + wifiMd5 + btMd5 + linkTime + o.getKey()).toUpperCase();
if(!signalMd5.equals(token)){
log.info("设备{},验签失败", sn);
session.close();
return null;
}
log.info("登录成功SN:{}", sn);
Mono<Void> input = session.receive().map(webSocketMessage ->{
log.info("收到消息{}", webSocketMessage);
/*if (handshakeInfo.getUri().getQuery() == null) {
return session.close(CloseStatus.REQUIRED_EXTENSION);
} MultiMap<String> values = new MultiMap<String>();
UrlEncoded.decodeTo(handshakeInfo.getUri().getQuery(), values, "UTF-8");
Long userId = Long.valueOf(values.getString("userId"));
log.info("id:{}, userId:{}", session.getId(), userId);
Mono<Void> input = session.receive().map(webSocketMessage ->{
try {
//log.info("concatMap");
MsgProtocol.Msg reqMsg = getMsg(webSocketMessage.getPayload());
MsgProtocol.User user1 = MsgProtocol.User.parseFrom(reqMsg.getData());
Long id = user1.getUserId();
UserInfo userInfo1 = group.get(id);
userInfo1.setUser(user1);
if(reqMsg.getCmdId().getNumber() == MsgProtocol.Msg.CmdId.SEND_STATUS_REQ_VALUE){
MsgProtocol.Msg reqMsg1 = reqMsg.toBuilder().setCmdId(MsgProtocol.Msg.CmdId.SYNC_STATUS_RSP).build();
batchRunner.add(reqMsg1);
//batchRunner.doFinal();
//countDownLatch.await();
return Mono.empty();//session.textMessage("");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return Mono.empty();
}).then();
UserInfo finalUserInfo = userInfo;
Mono<Void> output = session.send(Flux.create(sink -> finalUserInfo.setSink(sink))).then();
return Mono.empty();
}).then();
BoxSession boxSession = new BoxSession();
boxSession.setSn(sn);
group.put(sn, boxSession);
Mono<Void> output = session.send(Flux.create(sink -> boxSession.setSink(sink))).then();
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono任何一个 Mono 产生 error complete 都会导致合并后的 Mono
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
//
return Mono.zip(input, output).doFinally(signalType -> {
group.remove(finalUserInfo.getUser().getUserId());//断链后及时移除
log.info("断开连接{}", finalUserInfo.getUser().getUserId());
}).then();*/
return Mono.zip(input, output).doFinally(signalType -> {
group.remove(boxSession.getSn());//断链后及时移除
log.info("断开连接{}", boxSession.getSn());
}).then().block();
});
}
public BoxSession getBoxSessionWithSn(String sn) {
return group.get(sn);
}
}

View File

@ -1,27 +1,90 @@
package com.qiuguo.iot.box.websocket.api.handler;
import cn.hutool.crypto.digest.MD5;
import cn.hutool.system.UserInfo;
import com.alibaba.fastjson.JSONObject;
import com.chen.tools.util.StringUtils;
import com.qiuguo.iot.base.annotation.WebSocketMapping;
import com.qiuguo.iot.base.constans.RedisConstans;
import com.qiuguo.iot.box.websocket.api.domain.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.UserSession;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
import com.qiuguo.iot.data.request.device.DeviceInfoRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;
@Component
@WebSocketMapping("/customer")
@Slf4j
public class CustomerWebSocketHandler implements WebSocketHandler {
@Value("${device.checkTimeout}")
private Boolean checkTimeout;
@Value("${device.timeout}")
private Long timeOut;//2分钟
public static ConcurrentHashMap<Long, UserSession> group = new ConcurrentHashMap<>();
@Resource
private ReactiveRedisTemplate reactiveRedisTemplate;
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()
.concatMap(mapper -> {
String msg = mapper.getPayloadAsText();
System.out.println("mapper: " + msg);
return Flux.just(msg);
}).map(value -> {
System.out.println("value: " + value);
return session.textMessage("Echo " + value);
HandshakeInfo handshakeInfo = session.getHandshakeInfo();
HttpHeaders headers = handshakeInfo.getHeaders();
String token = headers.get("token").get(0);
Long userId = Long.valueOf(headers.get("userId").get(0));
Long linkTime = Long.parseLong(headers.get("time").get(0));
if(checkTimeout && System.currentTimeMillis() - linkTime > timeOut){
//关闭连接
log.info("用户{},请求数据已超时", userId);
return session.close();
}
ReactiveValueOperations<String, String> operations = reactiveRedisTemplate.opsForValue();
return operations.get(RedisConstans.DEVICE_INFO + userId)
.defaultIfEmpty("")
.flatMap(d -> {
if(StringUtils.isNotBlank(d)){
log.info("用户非法登录token无效 userId:{}", userId);
return null;
}
log.info("用户登录成功userId:{}", userId);
Mono<Void> input = session.receive().map(webSocketMessage ->{
log.info("收到用户消息{}", webSocketMessage);
return Mono.empty();
}).then();
UserSession userSession = new UserSession();
userSession.setUserId(userId);
group.put(userId, userSession);
Mono<Void> output = session.send(Flux.create(sink -> userSession.setSink(sink))).then();
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono任何一个 Mono 产生 error complete 都会导致合并后的 Mono
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
return Mono.zip(input, output).doFinally(signalType -> {
group.remove(userSession.getUserId());//断链后及时移除
log.info("用户断开连接{}", userSession.getUserId());
}).then();
});
return session.send(output);
}
public UserSession getUserSessionWithSn(String userId) {
return group.get(userId);
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff