日志调用链从网关传入
This commit is contained in:
parent
33ceed7809
commit
cb4661b681
@ -1,5 +1,6 @@
|
||||
package com.qiuguo.iot.base.constans;
|
||||
|
||||
public class Log4Constans {
|
||||
public static String PRINT_LOG_ID = "logid";
|
||||
public static String PRINT_LOG_ID = "tid";
|
||||
public static String CUSTOMER_IP = "customerIp";
|
||||
}
|
||||
|
||||
@ -71,6 +71,18 @@
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.skywalking</groupId>
|
||||
<artifactId>apm-toolkit-trace</artifactId>
|
||||
<version>8.7.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.skywalking</groupId>
|
||||
<artifactId>apm-toolkit-logback-1.x</artifactId>
|
||||
<version>8.7.0</version>
|
||||
</dependency>
|
||||
<!--skywalking trace end-->
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
@ -1,20 +1,28 @@
|
||||
package com.qiuguo.iot.gateway.filter;
|
||||
|
||||
import com.qiuguo.iot.base.constans.Log4Constans;
|
||||
import com.qiuguo.iot.base.constans.RedisConstans;
|
||||
import com.qiuguo.iot.base.constans.UserAuthContains;
|
||||
import com.qiuguo.iot.gateway.config.properties.XssProperties;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.skywalking.apm.toolkit.trace.TraceContext;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
|
||||
import org.springframework.cloud.gateway.filter.GlobalFilter;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.server.RequestPath;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
@ -37,8 +45,12 @@ public class AuthFilter implements GlobalFilter, Ordered {
|
||||
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
String url = request.getPath().toString();
|
||||
String customerIp = request.getRemoteAddress().getAddress().getHostAddress();
|
||||
String tracId = TraceContext.traceId();
|
||||
ServerWebExchange.Builder ex = exchange.mutate();
|
||||
ex.request(getRequest(exchange, customerIp, tracId));
|
||||
if (xssProperties.getExcludeUrls().contains(url)) {
|
||||
return chain.filter(exchange);
|
||||
return chain.filter(ex.build());
|
||||
}
|
||||
|
||||
String api_token = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TOKEN);
|
||||
@ -46,7 +58,7 @@ public class AuthFilter implements GlobalFilter, Ordered {
|
||||
if (ObjectUtils.isEmpty(api_token) || ObjectUtils.isEmpty(api_type)) {
|
||||
return Mono.error(new RuntimeException("未登录"));
|
||||
}
|
||||
return chain.filter(exchange);
|
||||
return chain.filter(ex.build());
|
||||
// String key = RedisConstans.IOT_TOKEN.concat(api_token);
|
||||
// return reactiveRedisTemplate.getExpire(key).map(Duration::getSeconds).flatMap(ttl -> {
|
||||
// if (ttl == -1) {
|
||||
@ -61,6 +73,21 @@ public class AuthFilter implements GlobalFilter, Ordered {
|
||||
// }
|
||||
// });
|
||||
}
|
||||
|
||||
private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String tracId){
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
ServerHttpRequest newRequest = new ServerHttpRequestDecorator(request){
|
||||
@Override
|
||||
public HttpHeaders getHeaders(){
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.putAll(super.getHeaders());
|
||||
httpHeaders.set(Log4Constans.CUSTOMER_IP, customerIp);
|
||||
httpHeaders.set(Log4Constans.PRINT_LOG_ID, tracId);
|
||||
return httpHeaders;
|
||||
}
|
||||
};
|
||||
return newRequest;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
|
||||
@ -22,20 +22,19 @@ import javax.annotation.PreDestroy;
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class LogMdcConfiguration {
|
||||
public static String PRINT_LOG_ID = Log4Constans.PRINT_LOG_ID;
|
||||
@PostConstruct
|
||||
public void contextOperatorHook() {
|
||||
Hooks.onEachOperator(PRINT_LOG_ID, Operators.lift((r, c) ->{
|
||||
Hooks.onEachOperator(Log4Constans.PRINT_LOG_ID, Operators.lift((r, c) ->{
|
||||
Context ctx = c.currentContext();
|
||||
if(ctx.hasKey(PRINT_LOG_ID)){
|
||||
MDC.put(PRINT_LOG_ID, ctx.get(PRINT_LOG_ID));
|
||||
if(ctx.hasKey(Log4Constans.PRINT_LOG_ID)){
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, ctx.get(Log4Constans.PRINT_LOG_ID));
|
||||
}
|
||||
return new MdcContextSubscriber(c);
|
||||
}));
|
||||
}
|
||||
@PreDestroy
|
||||
public void cleanupHook() {
|
||||
Hooks.resetOnEachOperator(PRINT_LOG_ID);
|
||||
Hooks.resetOnEachOperator(Log4Constans.PRINT_LOG_ID);
|
||||
}
|
||||
|
||||
class MdcContextSubscriber<T> implements CoreSubscriber<T> {
|
||||
@ -47,14 +46,14 @@ public class LogMdcConfiguration {
|
||||
@Override
|
||||
public void onComplete() {
|
||||
coreSubscriber.onComplete();
|
||||
MDC.remove(PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
log.info("异常{}", throwable);
|
||||
coreSubscriber.onError(throwable);
|
||||
MDC.remove(PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.qiuguo.iot.admin.http.api.filter;
|
||||
|
||||
import com.qiuguo.iot.base.constans.Log4Constans;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.logging.log4j.ThreadContext;
|
||||
@ -38,9 +39,18 @@ public class LogWebFilter implements WebFilter {
|
||||
long startTime = System.currentTimeMillis();
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
String requestId = request.getId();
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
String ip = request.getRemoteAddress().getAddress().getHostAddress();
|
||||
if(request.getHeaders().containsKey(Log4Constans.PRINT_LOG_ID)){
|
||||
//网关生成的tracId
|
||||
requestId = request.getHeaders().get(Log4Constans.PRINT_LOG_ID).get(0);
|
||||
}
|
||||
|
||||
if(request.getHeaders().containsKey(Log4Constans.CUSTOMER_IP)){
|
||||
//网关透传过来的IP
|
||||
ip = request.getHeaders().get(Log4Constans.CUSTOMER_IP).get(0);
|
||||
}
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, requestId);
|
||||
|
||||
String ip = request.getRemoteAddress().getAddress().getHostAddress();//.getHostName();
|
||||
String m = request.getMethod().toString();
|
||||
|
||||
log.info("api start time:{} ip:{} method:{} url:{} param:{} headers:{}",
|
||||
@ -58,17 +68,18 @@ public class LogWebFilter implements WebFilter {
|
||||
if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)){
|
||||
ex.request(getRequest(exchange));
|
||||
}
|
||||
String tracId = requestId;
|
||||
return chain.filter(ex.build())
|
||||
.contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, tracId);
|
||||
|
||||
return contextTmp;
|
||||
})
|
||||
.doFinally(signalType -> {
|
||||
long endTime = System.currentTimeMillis();
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
|
||||
log.info("api end time:{}, total time:{}", endTime, endTime - startTime);
|
||||
MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -82,18 +82,6 @@
|
||||
<version>8.7.0</version>
|
||||
</dependency>
|
||||
<!--skywalking trace end-->
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-web</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-context</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>javax.servlet-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hutool</groupId>
|
||||
|
||||
@ -1,25 +0,0 @@
|
||||
package com.qiuguo.iot.user.api.controller.fliter;
|
||||
|
||||
import org.apache.skywalking.apm.toolkit.trace.TraceContext;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.filter.OncePerRequestFilter;
|
||||
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.io.IOException;
|
||||
|
||||
@Component
|
||||
public class TraceResponseFilter extends OncePerRequestFilter {
|
||||
@Override
|
||||
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
|
||||
try{
|
||||
//设置skywalking的traceId
|
||||
response.addHeader("traceId", TraceContext.traceId());
|
||||
}catch (Exception e){
|
||||
|
||||
}
|
||||
filterChain.doFilter(request, response);
|
||||
}
|
||||
}
|
||||
@ -22,20 +22,19 @@ import javax.annotation.PreDestroy;
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class LogMdcConfiguration {
|
||||
public static String PRINT_LOG_ID = Log4Constans.PRINT_LOG_ID;
|
||||
@PostConstruct
|
||||
public void contextOperatorHook() {
|
||||
Hooks.onEachOperator(PRINT_LOG_ID, Operators.lift((r, c) ->{
|
||||
Hooks.onEachOperator(Log4Constans.PRINT_LOG_ID, Operators.lift((r, c) ->{
|
||||
Context ctx = c.currentContext();
|
||||
if(ctx.hasKey(PRINT_LOG_ID)){
|
||||
MDC.put(PRINT_LOG_ID, ctx.get(PRINT_LOG_ID));
|
||||
if(ctx.hasKey(Log4Constans.PRINT_LOG_ID)){
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, ctx.get(Log4Constans.PRINT_LOG_ID));
|
||||
}
|
||||
return new MdcContextSubscriber(c);
|
||||
}));
|
||||
}
|
||||
@PreDestroy
|
||||
public void cleanupHook() {
|
||||
Hooks.resetOnEachOperator(PRINT_LOG_ID);
|
||||
Hooks.resetOnEachOperator(Log4Constans.PRINT_LOG_ID);
|
||||
}
|
||||
|
||||
class MdcContextSubscriber<T> implements CoreSubscriber<T> {
|
||||
@ -47,14 +46,14 @@ public class LogMdcConfiguration {
|
||||
@Override
|
||||
public void onComplete() {
|
||||
coreSubscriber.onComplete();
|
||||
MDC.remove(PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
log.info("异常{}", throwable);
|
||||
coreSubscriber.onError(throwable);
|
||||
MDC.remove(PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.qiuguo.iot.user.api.filter;
|
||||
|
||||
import com.qiuguo.iot.base.constans.Log4Constans;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.hswebframework.web.logger.ReactiveLogger;
|
||||
import org.reactivestreams.Publisher;
|
||||
@ -37,9 +38,20 @@ public class LogWebFilter implements WebFilter {
|
||||
long startTime = System.currentTimeMillis();
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
String requestId = request.getId();
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
String ip = request.getRemoteAddress().getAddress().getHostAddress();
|
||||
if(request.getHeaders().containsKey(Log4Constans.PRINT_LOG_ID)){
|
||||
//网关生成的tracId
|
||||
requestId = request.getHeaders().get(Log4Constans.PRINT_LOG_ID).get(0);
|
||||
}
|
||||
|
||||
String ip = request.getRemoteAddress().getAddress().getHostAddress();//.getHostName();
|
||||
if(request.getHeaders().containsKey(Log4Constans.CUSTOMER_IP)){
|
||||
//网关透传过来的IP
|
||||
ip = request.getHeaders().get(Log4Constans.CUSTOMER_IP).get(0);
|
||||
}
|
||||
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, requestId);
|
||||
|
||||
|
||||
String m = request.getMethod().toString();
|
||||
|
||||
log.info("api start time:{} ip:{} method:{} url:{} param:{} headers:{}",
|
||||
@ -57,17 +69,18 @@ public class LogWebFilter implements WebFilter {
|
||||
if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)){
|
||||
ex.request(getRequest(exchange));
|
||||
}
|
||||
String tracId = requestId;
|
||||
return chain.filter(ex.build())
|
||||
.contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, tracId);
|
||||
|
||||
return contextTmp;
|
||||
})
|
||||
.doFinally(signalType -> {
|
||||
long endTime = System.currentTimeMillis();
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
|
||||
log.info("api end time:{}, total time:{}", endTime, endTime - startTime);
|
||||
MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -21,20 +21,19 @@ import javax.annotation.PreDestroy;
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class LogMdcConfiguration {
|
||||
public static String PRINT_LOG_ID = Log4Constans.PRINT_LOG_ID;
|
||||
@PostConstruct
|
||||
public void contextOperatorHook() {
|
||||
Hooks.onEachOperator(PRINT_LOG_ID, Operators.lift((r, c) ->{
|
||||
Hooks.onEachOperator(Log4Constans.PRINT_LOG_ID, Operators.lift((r, c) ->{
|
||||
Context ctx = c.currentContext();
|
||||
if(ctx.hasKey(PRINT_LOG_ID)){
|
||||
MDC.put(PRINT_LOG_ID, ctx.get(PRINT_LOG_ID));
|
||||
if(ctx.hasKey(Log4Constans.PRINT_LOG_ID)){
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, ctx.get(Log4Constans.PRINT_LOG_ID));
|
||||
}
|
||||
return new MdcContextSubscriber(c);
|
||||
}));
|
||||
}
|
||||
@PreDestroy
|
||||
public void cleanupHook() {
|
||||
Hooks.resetOnEachOperator(PRINT_LOG_ID);
|
||||
Hooks.resetOnEachOperator(Log4Constans.PRINT_LOG_ID);
|
||||
}
|
||||
|
||||
class MdcContextSubscriber<T> implements CoreSubscriber<T> {
|
||||
@ -46,14 +45,14 @@ public class LogMdcConfiguration {
|
||||
@Override
|
||||
public void onComplete() {
|
||||
coreSubscriber.onComplete();
|
||||
MDC.remove(PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
log.info("异常{}", throwable);
|
||||
coreSubscriber.onError(throwable);
|
||||
MDC.remove(PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.qiuguo.iot.box.websocket.api.filter;
|
||||
|
||||
import com.qiuguo.iot.base.constans.Log4Constans;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.hswebframework.web.logger.ReactiveLogger;
|
||||
import org.reactivestreams.Publisher;
|
||||
@ -36,21 +37,32 @@ import java.util.Arrays;
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class LogWebFilter implements WebFilter {
|
||||
//String customerIp = "";
|
||||
public static String HEAD_IP = "customerIP";
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
String requestId = request.getId();
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
|
||||
String customerIp = request.getRemoteAddress().getAddress().getHostAddress();//.getHostName();
|
||||
String requestId = request.getId();
|
||||
String ip = request.getRemoteAddress().getAddress().getHostAddress();
|
||||
if(request.getHeaders().containsKey(Log4Constans.PRINT_LOG_ID)){
|
||||
//网关生成的tracId
|
||||
requestId = request.getHeaders().get(Log4Constans.PRINT_LOG_ID).get(0);
|
||||
}
|
||||
|
||||
if(request.getHeaders().containsKey(Log4Constans.CUSTOMER_IP)){
|
||||
//网关透传过来的IP
|
||||
ip = request.getHeaders().get(Log4Constans.CUSTOMER_IP).get(0);
|
||||
}
|
||||
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, requestId);
|
||||
|
||||
|
||||
String m = request.getMethod().toString();
|
||||
|
||||
log.info("api start time:{} ip:{} method:{} url:{} param:{} headers:{}",
|
||||
startTime,
|
||||
customerIp,
|
||||
ip,
|
||||
m,
|
||||
request.getPath(),
|
||||
request.getQueryParams(),
|
||||
@ -61,19 +73,19 @@ public class LogWebFilter implements WebFilter {
|
||||
ex.response(getResponse(exchange, requestId));
|
||||
|
||||
// if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)){
|
||||
ex.request(getRequest(exchange, customerIp, requestId));
|
||||
|
||||
ex.request(getRequest(exchange, ip, requestId));
|
||||
String tracId = requestId;
|
||||
return chain.filter(ex.build())
|
||||
.contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, tracId);
|
||||
|
||||
return contextTmp;
|
||||
})
|
||||
.doFinally(signalType -> {
|
||||
long endTime = System.currentTimeMillis();
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
|
||||
log.info("api end time:{}, total time:{}", endTime, endTime - startTime);
|
||||
MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
});
|
||||
}
|
||||
|
||||
@ -93,7 +105,7 @@ public class LogWebFilter implements WebFilter {
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.putAll(super.getHeaders());
|
||||
httpHeaders.set(HEAD_IP, customerIp);
|
||||
httpHeaders.set(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
httpHeaders.set(Log4Constans.PRINT_LOG_ID, requestId);
|
||||
return httpHeaders;
|
||||
}
|
||||
};
|
||||
|
||||
@ -3,6 +3,7 @@ package com.qiuguo.iot.box.websocket.api.handler;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.qiuguo.iot.base.annotation.WebSocketMapping;
|
||||
import com.qiuguo.iot.base.constans.HttpHeaderConstans;
|
||||
import com.qiuguo.iot.base.constans.Log4Constans;
|
||||
import com.qiuguo.iot.base.constans.RedisConstans;
|
||||
import com.qiuguo.iot.base.enums.*;
|
||||
import com.qiuguo.iot.base.model.UserDeviceInfoModel;
|
||||
@ -97,13 +98,13 @@ public class BoxWebSocketHandler implements WebSocketHandler {
|
||||
boxSession.setCustomerIP(ip);
|
||||
boxSession.setSession(session);
|
||||
boxSession.setUserId(userId);
|
||||
boxSession.setLogId(headers.get(LogMdcConfiguration.PRINT_LOG_ID).get(0));
|
||||
boxSession.setLogId(headers.get(Log4Constans.PRINT_LOG_ID).get(0));
|
||||
|
||||
log.info("登录成功SN:{}", sn);
|
||||
|
||||
Mono<Void> input = session.receive().map(webSocketMessage ->{
|
||||
newMessage(webSocketMessage, boxSession).contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
|
||||
|
||||
return contextTmp;
|
||||
}).subscribe();
|
||||
@ -113,7 +114,7 @@ public class BoxWebSocketHandler implements WebSocketHandler {
|
||||
|
||||
//校验
|
||||
checkToken(boxSession, linkTime, signature, isBind).contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
|
||||
|
||||
return contextTmp;
|
||||
}).subscribe();
|
||||
@ -125,7 +126,7 @@ public class BoxWebSocketHandler implements WebSocketHandler {
|
||||
|
||||
return Mono.zip(input, output).doFinally(signalType -> {
|
||||
disconnect(boxSession).contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
|
||||
|
||||
return contextTmp;
|
||||
}).subscribe();
|
||||
@ -141,7 +142,7 @@ public class BoxWebSocketHandler implements WebSocketHandler {
|
||||
}
|
||||
|
||||
private Mono<Void> newMessage(WebSocketMessage webSocketMessage, BoxSession boxSession){
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
|
||||
String text = webSocketMessage.getPayloadAsText();
|
||||
log.info("设备端收到消息:{}", text);
|
||||
BoxTalkMessage boxTalkMessage = JSONObject.parseObject(text, BoxTalkMessage.class);
|
||||
@ -158,7 +159,7 @@ public class BoxWebSocketHandler implements WebSocketHandler {
|
||||
}
|
||||
|
||||
private Mono<Void> disconnect(BoxSession boxSession){
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, boxSession.getLogId());
|
||||
BoxSession boxSession1 = baseWebSocketService.getBoxSessionWithSn(boxSession.getSn());
|
||||
if(boxSession == boxSession1){
|
||||
//断链后及时移除
|
||||
|
||||
@ -3,6 +3,7 @@ package com.qiuguo.iot.box.websocket.api.handler;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.qiuguo.iot.base.annotation.WebSocketMapping;
|
||||
import com.qiuguo.iot.base.constans.HttpHeaderConstans;
|
||||
import com.qiuguo.iot.base.constans.Log4Constans;
|
||||
import com.qiuguo.iot.base.enums.AskTypeEnum;
|
||||
import com.qiuguo.iot.base.enums.DeviceTypeEnum;
|
||||
import com.qiuguo.iot.base.enums.ResponeEnum;
|
||||
@ -79,11 +80,11 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
|
||||
userSession.setSession(session);
|
||||
userSession.setCustomerIP(ip);
|
||||
userSession.setSessionType(YesNo.YES.getCode());
|
||||
userSession.setLogId(headers.get(LogMdcConfiguration.PRINT_LOG_ID).get(0));
|
||||
userSession.setLogId(headers.get(Log4Constans.PRINT_LOG_ID).get(0));
|
||||
log.info("用户成功userId:{}", userId);
|
||||
Mono<Void> input = session.receive().map(webSocketMessage ->{
|
||||
newMessage(webSocketMessage, userSession).contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, userSession.getLogId());
|
||||
|
||||
return contextTmp;
|
||||
}).subscribe();
|
||||
@ -92,7 +93,7 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
|
||||
|
||||
|
||||
checkToken(userSession, type, token).contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, userSession.getLogId());
|
||||
|
||||
return contextTmp;
|
||||
}).subscribe();
|
||||
@ -104,7 +105,7 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
|
||||
|
||||
return Mono.zip(input, output).doFinally(signalType -> {
|
||||
disconnect(userSession).contextWrite(context -> {
|
||||
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
|
||||
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, userSession.getLogId());
|
||||
|
||||
return contextTmp;
|
||||
}).subscribe();;
|
||||
@ -112,7 +113,7 @@ public class CustomerWebSocketHandler implements WebSocketHandler {
|
||||
}
|
||||
|
||||
private Mono<Void> newMessage(WebSocketMessage webSocketMessage, UserSession userSession){
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, userSession.getLogId());
|
||||
String text = webSocketMessage.getPayloadAsText();
|
||||
log.info("收到用户消息:{}", text);
|
||||
UserTalkMessage userTalkMessage = JSONObject.parseObject(text, UserTalkMessage.class);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user