websocket模块集成skywalking

This commit is contained in:
wulin 2023-12-05 18:04:43 +08:00
parent d11db62de1
commit a7ad06fd1c
14 changed files with 293 additions and 50 deletions

View File

@ -75,6 +75,7 @@ security:
- /iot-websocket/websocket/customer
- /iot-websocket/websocket/tts/token
- /iot-websocket/websocket/init/sysTalkAnswer
- /iot-websocket/websocket/device/init
#application:
# cors:
# allowed-crigin-patterns:

View File

@ -1,5 +1,5 @@
server:
port: 8080
port: 8081
spring:
application:
name: qiuguo-iot-gateway

View File

@ -70,18 +70,8 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
<version>8.15.0</version>
</dependency>
<!--skywalking trace end-->
<dependency>
<groupId>cn.hutool</groupId>
@ -90,6 +80,13 @@
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.skywalking/apm-toolkit-trace -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
@ -102,6 +99,8 @@
<version>9.0.0</version>
</dependency>
</dependencies>
<!--skywalking trace end-->
<build>
<finalName>${project.artifactId}</finalName>

View File

@ -1,10 +1,12 @@
package com.qiuguo.iot.user.api.controller.device;
import cn.hutool.crypto.digest.MD5;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qiuguo.iot.base.constans.Log4Constans;
import com.qiuguo.iot.base.constans.RedisConstans;
import com.qiuguo.iot.base.enums.*;
import com.qiuguo.iot.base.utils.DateTimeUtils;
import com.qiuguo.iot.base.utils.StringUtils;
import com.qiuguo.iot.data.domain.BaseMessageResp;
import com.qiuguo.iot.data.domain.BaseSession;
@ -12,12 +14,15 @@ import com.qiuguo.iot.data.domain.IActionSendMessage;
import com.qiuguo.iot.data.domain.action.Action;
import com.qiuguo.iot.data.domain.box.BoxSession;
import com.qiuguo.iot.data.domain.user.UserSession;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
import com.qiuguo.iot.data.entity.device.DeviceUserBindEntity;
import com.qiuguo.iot.data.entity.device.DeviceUserTalkRecordEntity;
import com.qiuguo.iot.data.model.system.SystemTalkAnswerConfig;
import com.qiuguo.iot.data.request.device.DeviceInfoRequest;
import com.qiuguo.iot.data.request.device.DeviceUserBindRequest;
import com.qiuguo.iot.data.request.device.DeviceUserTalkRecordRequest;
import com.qiuguo.iot.data.request.qiuguo.cloud.UserTalkRequst;
import com.qiuguo.iot.data.resp.device.DeviceInitResp;
import com.qiuguo.iot.data.resp.device.DeviceTalkRecordResp;
import com.qiuguo.iot.data.resp.device.DeviceUserBindResp;
import com.qiuguo.iot.data.service.device.DeviceBatchService;
@ -42,12 +47,14 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.api.crud.entity.PagerResult;
import javax.annotation.Resource;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
@ -77,6 +84,92 @@ public class DeviceController {
@Resource
private SystemTalkAnswerConfigService systemTalkAnswerConfigService;
@Value("${device.key}")
private String key;
@Value("${device.checkTimeout}")
private Boolean checkTimeout;
@Autowired
private DeviceInfoService deviceInfoService;
@Resource
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
@Value("${device.timeout}")
private Long timeOut;//2分钟
@GetMapping("/init")
public Mono<DeviceInitResp> deviceInit(@RequestParam String wifiMac, @RequestParam String btMac,
@RequestParam Integer type, @RequestParam Long time,
@RequestParam String signature){
//deviceInfoService.
Long now = System.currentTimeMillis();
if(checkTimeout && now - time > timeOut){
//超时
log.info("请求过期");
BusinessException ex = new BusinessException("请求已失效");
return Mono.error(ex);
}
//设备类型是否匹配暂时不做限制
DeviceTypeEnum entryTypeEnum = DeviceTypeEnum.getEnumWithCode(type);
if(entryTypeEnum == null){
log.info("不支持的设备类型");
BusinessException ex = new BusinessException("不支持的设备类型");
return Mono.error(ex);
}
//验签
String wifiMd5 = MD5.create().digestHex(wifiMac).toUpperCase();
String btMd5 = MD5.create().digestHex(btMac).toUpperCase();
String typeMd5 = MD5.create().digestHex(type.toString()).toUpperCase();
String md5 = MD5.create().digestHex(wifiMd5 + btMd5 + typeMd5 + time + key).toUpperCase();
if (!md5.equals(signature)) {
//
//验签失败
log.info("验签失败:{}", md5);
BusinessException ex = new BusinessException("验签失败");
return Mono.error(ex);
}
DeviceInfoRequest request = new DeviceInfoRequest();
request.setWifiMac(wifiMac);
request.setBtMac(btMac);
Mono<DeviceInfoEntity> mono = deviceInfoService.selectDeviceInfoByRequest(request);
return mono.defaultIfEmpty(new DeviceInfoEntity()).flatMap(entity -> {
Mono mono1 = null;
if(entity.getId() == null){
entity.setWifiMac(wifiMac);
entity.setBtMac(btMac);
entity.setBatchId(1l);
entity.setName(entryTypeEnum.getName());
entity.setDeviceType(type);
entity.setKey( StringUtils.getRandomStr(10));//生成key
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyMMddHHmmss");
entity.setSn("QG" + entryTypeEnum.getSn() +df.format(DateTimeUtils.getNowLocalDateTime()) + StringUtils.getRandomStr(3));
mono1 = deviceInfoService.insertDeviceInfo(entity);
}else{
entity.setKey( StringUtils.getRandomStr(10));//重新生成Key
mono1 = deviceInfoService.updateDeviceInfoById(entity);
}
return mono1.map(m ->{
return entity;
});
}).map(o -> {
DeviceInfoEntity deviceInfoEntity = (DeviceInfoEntity)o;
ReactiveValueOperations<String, String> operations = reactiveStringRedisTemplate.opsForValue();
operations.set(RedisConstans.DEVICE_INFO + deviceInfoEntity.getSn()
, JSONObject.toJSONString(deviceInfoEntity)
, RedisConstans.ONE_HOUR).subscribe();
DeviceInitResp resp = new DeviceInitResp();
resp.setKey(deviceInfoEntity.getKey());
resp.setSn(deviceInfoEntity.getSn());
return resp;
});
}
@GetMapping("/init/sysTalkAnswer")
public Mono<String> sysTalkAnswer(@RequestParam Integer type) {

View File

@ -89,9 +89,9 @@ public class LogWebFilter implements WebFilter {
}).doFinally(signalType -> {
WebFluxSkyWalkingOperators.continueTracing(newEx, () -> {
long endTime = System.currentTimeMillis();
MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
//MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
log.info("api end time:{}, total time:{}", endTime, endTime - startTime);
MDC.remove(Log4Constans.PRINT_LOG_ID);
//MDC.remove(Log4Constans.PRINT_LOG_ID);
});
});

View File

@ -1,5 +1,5 @@
server:
port: 8080
port: 8082
spring:
application:
name: qiuguo-iot-box-user-api

View File

@ -18,12 +18,6 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -95,6 +89,12 @@
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.skywalking/apm-toolkit-trace -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>

View File

@ -85,7 +85,7 @@ public class DeviceController {
if (!md5.equals(signature)) {
//
//验签失败
log.info("验签失败");
log.info("验签失败:{}", md5);
BusinessException ex = new BusinessException("验签失败");
return Mono.error(ex);
}

View File

@ -72,7 +72,9 @@ public class LogMdcConfiguration {
if(coreSubscriber.currentContext().hasKey(Log4Constans.PRINT_LOG_ID)){
MDC.put(Log4Constans.PRINT_LOG_ID, coreSubscriber.currentContext().get(Log4Constans.PRINT_LOG_ID));
}
//log.info("sky-------->");
WebFluxSkyWalkingOperators.continueTracing(ctx, () ->{
//log.info("walking=======>");
coreSubscriber.onNext(t);
});
}

View File

@ -1,7 +1,10 @@
package com.qiuguo.iot.box.websocket.api.filter;
import com.qiuguo.iot.base.constans.Log4Constans;
import com.qiuguo.iot.base.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext;
import org.hswebframework.web.logger.ReactiveLogger;
import org.reactivestreams.Publisher;
import org.slf4j.MDC;
@ -37,16 +40,18 @@ import java.util.Arrays;
@Configuration
@Slf4j
public class LogWebFilter implements WebFilter {
public static String HEAD_IP = "customerIP";
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
long startTime = System.currentTimeMillis();
ServerHttpRequest request = exchange.getRequest();
String requestId = request.getId();
String requestId = request.getId();//WebFluxSkyWalkingTraceContext.segmentId(exchange);
log.info("获取到的traceId:{}", requestId);
//String pid = WebFluxSkyWalkingTraceContext.getCorrelation(exchange, Log4Constans.PRINT_LOG_ID).get();
//log.info("获取到的pid:{}", pid);
String ip = request.getRemoteAddress().getAddress().getHostAddress();
if(request.getHeaders().containsKey(Log4Constans.PRINT_LOG_ID)){
if(request.getHeaders().containsKey(Log4Constans.HEADER_TRACE_ID)){
//网关生成的tracId
requestId = request.getHeaders().get(Log4Constans.PRINT_LOG_ID).get(0);
requestId = request.getHeaders().get(Log4Constans.HEADER_TRACE_ID).get(0);
}
if(request.getHeaders().containsKey(Log4Constans.CUSTOMER_IP)){
@ -74,17 +79,20 @@ public class LogWebFilter implements WebFilter {
// if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)){
ex.request(getRequest(exchange, ip, requestId));
String tracId = requestId;
return chain.filter(ex.build())
ServerWebExchange newEx = ex.build();
return chain.filter(newEx)
.contextWrite(context -> {
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, tracId);
return contextTmp;
})
.doFinally(signalType -> {
long endTime = System.currentTimeMillis();
MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
log.info("api end time:{}, total time:{}", endTime, endTime - startTime);
MDC.remove(Log4Constans.PRINT_LOG_ID);
WebFluxSkyWalkingOperators.continueTracing(newEx, () -> {
long endTime = System.currentTimeMillis();
//MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
log.info("api end time:{}, total time:{}", endTime, endTime - startTime);
//MDC.remove(Log4Constans.PRINT_LOG_ID);
});
});
}
@ -95,18 +103,19 @@ public class LogWebFilter implements WebFilter {
public Flux<DataBuffer> getBody() {
Flux<DataBuffer> body = this.getDelegate().getBody();
return body.map(dataBuffer -> {
log.info("request:{}", dataBuffer.toString(StandardCharsets.UTF_8));
if (/*!request.getMethod().equals(HttpMethod.GET) && */!request.getMethod().equals(HttpMethod.DELETE)) {
log.info("request:{}", dataBuffer.toString(StandardCharsets.UTF_8));
}
return dataBuffer;
});
}
@Override
/* @Override
public HttpHeaders getHeaders(){
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
httpHeaders.set(HEAD_IP, customerIp);
httpHeaders.set(Log4Constans.PRINT_LOG_ID, requestId);
httpHeaders.set(Log4Constans.HEADER_TRACE_ID, requestId);
return httpHeaders;
}
}*/
};
return newRequest;
}

View File

@ -30,6 +30,8 @@ import com.qiuguo.iot.data.service.device.DeviceUserBindService;
import com.qiuguo.iot.data.domain.action.Actions;
import com.qiuguo.iot.third.service.NlpService;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.trace.Trace;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.slf4j.MDC;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.HttpHeaders;
@ -87,7 +89,7 @@ public class BoxWebSocketHandler implements WebSocketHandler {
log.info("设备{},请求数据已超时", sn);
return session.close();
}
String ip = headers.get(LogWebFilter.HEAD_IP).get(0);
String ip = headers.get(Log4Constans.CUSTOMER_IP).get(0);
if(ip.startsWith("192.") ||
ip.startsWith("10.") ||
ip.startsWith("172.") ||
@ -107,32 +109,41 @@ public class BoxWebSocketHandler implements WebSocketHandler {
boxSession.setUserId(userId);
boxSession.setRobotId(userId);
boxSession.setBaseLogId(headers.get(Log4Constans.PRINT_LOG_ID).get(0));
boxSession.setBaseLogId(headers.get(Log4Constans.HEADER_TRACE_ID).get(0));
boxSession.setLogId(boxSession.getBaseLogId());
log.info("登录成功SN:{}", sn);
Mono<Void> input = session.receive().map(webSocketMessage ->{
newMessage(webSocketMessage, boxSession).contextWrite(context -> {
log.info("新的消息");
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
newMessage(webSocketMessage, boxSession).contextWrite(context -> {
log.info("新的消息");
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
return contextTmp;
}).subscribe();
return contextTmp;
}).subscribe();
return webSocketMessage;
}).contextWrite(context -> {
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
return contextTmp;
}).then();
//校验
checkToken(boxSession, linkTime, signature, isBind).contextWrite(context -> {
checkToken(boxSession, linkTime, signature, isBind)
.subscribe();
Mono<Void> output = session.send(Flux.create(sink -> boxSession.setSink(sink))).contextWrite(context -> {
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
return contextTmp;
}).subscribe();
Mono<Void> output = session.send(Flux.create(sink -> boxSession.setSink(sink))).then();
}).then();
// Mono.zip() 会将多个 Mono 合并为一个新的 Mono任何一个 Mono 产生 error complete 都会导致合并后的 Mono
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
@ -144,6 +155,12 @@ public class BoxWebSocketHandler implements WebSocketHandler {
return contextTmp;
}).subscribe();
}).contextWrite(context -> {
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
return contextTmp;
}).then();
}

View File

@ -74,7 +74,7 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
log.info("用户{},请求数据已超时", userId);
return session.close();
}
String ip = headers.get(LogWebFilter.HEAD_IP).get(0);
String ip = headers.get(Log4Constans.CUSTOMER_IP).get(0);
UserSession userSession = new UserSession();
userSession.setUserId(userId);
userSession.setSession(session);
@ -82,7 +82,7 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
userSession.setRobotId(userId);
userSession.setSessionType(YesNo.YES.getCode());
userSession.setBaseLogId(headers.get(Log4Constans.PRINT_LOG_ID).get(0));
userSession.setBaseLogId(headers.get(Log4Constans.HEADER_TRACE_ID).get(0));
userSession.setLogId(userSession.getBaseLogId());
log.info("用户成功userId:{}", userId);
Mono<Void> input = session.receive().map(webSocketMessage ->{

View File

@ -24,6 +24,46 @@ spring:
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
r2dbc:
#注意下面修改 easyorm相关也需要修改
#url: r2dbc:postgresql://localhost:5432/postgres #postgresql请使用此配置
url: r2dbc:mysql://192.168.8.146:30416/qiuguo_iot?ssl=false&serverZoneId=Asia/Shanghai # mysql请使用此配置
#url: r2dbc:h2:file:///./data/h2db/jetlinks
username: root
password: 123456
redis:
# cluster:
# nodes:
# - 127.0.0.1:7001
# - 127.0.0.1:7002
host: 192.168.8.146
port: 32030
password: 123456
timeout: 5000
rabbitmq:
host: 192.168.8.146
port: 31043
username: admin
password: 123456
listener:
simple:
# 设置手动ack回复
acknowledge-mode: manual
retry:
# 开启重试机制
enabled: true
# 重试次数
max-attempts: 3
# 重试最大间隔时间
max-interval: 100000
# 重试初始间隔时间
initial-interval: 100
# 间隔时间因子
multiplier: 20
#设置消息发送回调
publisher-returns: true
publisher-confirm-type: simple
virtual-host: /iot
tianqiapi:
url: https://v0.yiketianqi.com/api?unescape=1&version=v91&appid=23293151&appsecret=Lj6ZMcqn&ext=life
qiuguo:

View File

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<contextName>${HOSTNAME}</contextName>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<timestamp key="startTime" datePattern="yyyyMMdd" timeReference="contextBirth"/>
<springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
<property name="LOG_PATH" value="logs"/>
<property name="LOG_FILE" value="${LOG_PATH}/application.log"/>
<property name="PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS}|${APP_NAME}|%clr(%5p)|%X{tid}|${PID:- }|%t|%-40.40logger{39}|%m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
<property name="SKY_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS}|${APP_NAME}|%5p|%X{tid}|${PID:- }|%t|%-40.40logger{39}|%m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
<appender name="APPLICATION" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/application.${HOSTNAME}.log</file>
<encoder>
<pattern>${PATTERN}</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/application.%d{yyyy-MM-dd}.${HOSTNAME}.log</fileNamePattern>
<!-- 超过30天的备份文件会被删除 -->
<maxHistory>30</maxHistory>
</rollingPolicy>
</appender>
<appender name="SQL" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/sql.${HOSTNAME}.log</file>
<encoder>
<pattern>${PATTERN}</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/sql.%d{yyyy-MM-dd}.${HOSTNAME}.log</fileNamePattern>
<!-- 超过30天的备份文件会被删除 -->
<maxHistory>3</maxHistory>
</rollingPolicy>
</appender>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
<Pattern>${PATTERN}</Pattern>
</layout>
</encoder>
</appender>
<!-- skywalking采集日志 -->
<appender name="SKYWALKING" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
<Pattern>${SKY_PATTERN}</Pattern>
</layout>
</encoder>
</appender>
<springProfile name="prod">
<root level="INFO">
<appender-ref ref="APPLICATION"/>
<appender-ref ref="SKYWALKING"/>
<appender-ref ref="CONSOLE"/>
</root>
</springProfile>
<springProfile name="test,staging,gray">
<root level="INFO">
<appender-ref ref="APPLICATION"/>
<appender-ref ref="SKYWALKING"/>
<appender-ref ref="CONSOLE"/>
</root>
</springProfile>
<springProfile name="dev,prod,test">
<root level="info">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="APPLICATION"/>
<appender-ref ref="SKYWALKING"/>
</root>
</springProfile>
</configuration>