diff --git a/iot-gateway/pom.xml b/iot-gateway/pom.xml index 33552d4..72fd36c 100644 --- a/iot-gateway/pom.xml +++ b/iot-gateway/pom.xml @@ -75,13 +75,13 @@ org.apache.skywalking apm-toolkit-trace - 8.15.0 + 9.0.0 org.apache.skywalking apm-toolkit-webflux - 8.15.0 + 9.0.0 diff --git a/iot-gateway/src/main/java/com/qiuguo/iot/gateway/filter/AuthFilter.java b/iot-gateway/src/main/java/com/qiuguo/iot/gateway/filter/AuthFilter.java index bc43a93..8d3849f 100644 --- a/iot-gateway/src/main/java/com/qiuguo/iot/gateway/filter/AuthFilter.java +++ b/iot-gateway/src/main/java/com/qiuguo/iot/gateway/filter/AuthFilter.java @@ -14,6 +14,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.toolkit.trace.Trace; import org.apache.skywalking.apm.toolkit.trace.TraceContext; import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators; +import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; @@ -48,27 +49,24 @@ public class AuthFilter implements GlobalFilter, Ordered { @Override - @Trace public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { - String traceId = WebFluxSkyWalkingOperators.continueTracing(exchange, TraceContext::traceId); - //Optional ss = TraceContext.getCorrelation("traceId"); - ServerHttpRequest request = exchange.getRequest(); - String url = request.getPath().toString(); - String customerIp = request.getRemoteAddress().getAddress().getHostAddress(); - ServerWebExchange.Builder ex = exchange.mutate(); - ex.request(getRequest(exchange, customerIp, traceId)); - ServerWebExchange newEx = ex.build(); - if (xssProperties.getExcludeUrls().contains(url)) { - return chain.filter(newEx); - } - String api_token = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TOKEN); - String api_type = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TYPE); - if (ObjectUtils.isEmpty(api_token) || ObjectUtils.isEmpty(api_type)) { - return Mono.error(new RuntimeException("未登录")); - } - return chain.filter(newEx); + return WebFluxSkyWalkingOperators.continueTracing(exchange, () -> { + ServerHttpRequest request = exchange.getRequest(); + String url = request.getPath().toString(); + if (xssProperties.getExcludeUrls().contains(url)) { + return chain.filter(exchange); + } + + String api_token = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TOKEN); + String api_type = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TYPE); + if (ObjectUtils.isEmpty(api_token) || ObjectUtils.isEmpty(api_type)) { + return Mono.error(new RuntimeException("未登录")); + } + return chain.filter(exchange); + }); + // String key = RedisConstans.IOT_TOKEN.concat(api_token); @@ -86,22 +84,7 @@ public class AuthFilter implements GlobalFilter, Ordered { // }); } - private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String tracId){ - ServerHttpRequest request = exchange.getRequest(); - exchange.getResponse().getHeaders().set(Log4Constans.HEADER_TRACE_ID, tracId); - ServerHttpRequest newRequest = new ServerHttpRequestDecorator(request){ - @Override - public HttpHeaders getHeaders(){ - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.putAll(super.getHeaders()); - httpHeaders.set(Log4Constans.CUSTOMER_IP, customerIp); - httpHeaders.set(Log4Constans.HEADER_TRACE_ID, tracId); - return httpHeaders; - } - }; - return newRequest; - } @Override diff --git a/iot-gateway/src/main/java/com/qiuguo/iot/gateway/filter/GlobalCacheRequestFilter.java b/iot-gateway/src/main/java/com/qiuguo/iot/gateway/filter/GlobalCacheRequestFilter.java index 93132e8..8a014fa 100644 --- a/iot-gateway/src/main/java/com/qiuguo/iot/gateway/filter/GlobalCacheRequestFilter.java +++ b/iot-gateway/src/main/java/com/qiuguo/iot/gateway/filter/GlobalCacheRequestFilter.java @@ -1,14 +1,23 @@ package com.qiuguo.iot.gateway.filter; +import com.qiuguo.iot.base.constans.Log4Constans; import com.qiuguo.iot.gateway.util.WebFluxUtils; +import org.apache.skywalking.apm.toolkit.trace.Trace; +import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators; +import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.core.Ordered; +import org.springframework.http.HttpHeaders; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; +import java.util.ArrayList; + /** * 全局缓存获取body请求数据(解决流不能重复读取问题). * @@ -19,17 +28,46 @@ import reactor.core.publisher.Mono; public class GlobalCacheRequestFilter implements GlobalFilter, Ordered { @Override + @Trace public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 只缓存json类型请求 // if (!WebFluxUtils.isJsonRequest(exchange)) { // return chain.filter(exchange); // } - return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> { - if (serverHttpRequest == exchange.getRequest()) { - return chain.filter(exchange); - } - return chain.filter(exchange.mutate().request(serverHttpRequest).build()); + String traceId = WebFluxSkyWalkingTraceContext.traceId(exchange); + String customerIp = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress(); + + ServerWebExchange.Builder ex = exchange.mutate(); + ex.request(getRequest(exchange, customerIp, traceId)); + + ServerWebExchange newEx = ex.build(); + return WebFluxSkyWalkingOperators.continueTracing(newEx, () -> { + WebFluxSkyWalkingTraceContext.putCorrelation(newEx, Log4Constans.PRINT_LOG_ID, traceId); + return ServerWebExchangeUtils.cacheRequestBody(newEx, (serverHttpRequest) -> { + if (serverHttpRequest == newEx.getRequest()) { + return chain.filter(newEx); + } + return chain.filter(newEx.mutate().request(serverHttpRequest).build()); + }); }); + + } + + private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String tracId){ + ServerHttpRequest request = exchange.getRequest(); + + exchange.getResponse().getHeaders().set(Log4Constans.HEADER_TRACE_ID, tracId); + ServerHttpRequest newRequest = new ServerHttpRequestDecorator(request){ + @Override + public HttpHeaders getHeaders(){ + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.putAll(super.getHeaders()); + httpHeaders.set(Log4Constans.CUSTOMER_IP, customerIp); + httpHeaders.set(Log4Constans.HEADER_TRACE_ID, tracId); + return httpHeaders; + } + }; + return newRequest; } @Override diff --git a/iot-modules/iot-box-user-api/pom.xml b/iot-modules/iot-box-user-api/pom.xml index 8961665..f970f07 100644 --- a/iot-modules/iot-box-user-api/pom.xml +++ b/iot-modules/iot-box-user-api/pom.xml @@ -93,13 +93,13 @@ org.apache.skywalking apm-toolkit-trace - 8.15.0 + 9.0.0 org.apache.skywalking apm-toolkit-webflux - 8.15.0 + 9.0.0 diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java index 351b13e..7646dcf 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/device/DeviceController.java @@ -3,6 +3,7 @@ package com.qiuguo.iot.user.api.controller.device; import cn.hutool.crypto.digest.MD5; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.qiuguo.iot.base.constans.Log4Constans; import com.qiuguo.iot.base.constans.RedisConstans; import com.qiuguo.iot.base.enums.DeviceTypeEnum; import com.qiuguo.iot.base.enums.OrderByEnum; @@ -27,6 +28,7 @@ import org.apache.skywalking.apm.toolkit.trace.CarrierItemRef; import org.apache.skywalking.apm.toolkit.trace.ContextCarrierRef; import org.apache.skywalking.apm.toolkit.trace.Tracer; import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators; +import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; @@ -79,6 +81,8 @@ public class DeviceController { //deviceInfoService. Long now = System.currentTimeMillis(); + //String pid = WebFluxSkyWalkingTraceContext.getCorrelation(, Log4Constans.PRINT_LOG_ID).get(); + //log.info("获取到的pid:{}", pid); if(checkTimeout && now - time > timeOut){ //超时 log.info("请求过期"); diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java index 739f183..d46f8fe 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java @@ -34,7 +34,7 @@ public class LogMdcConfiguration { if(ctx.hasKey(Log4Constans.PRINT_LOG_ID)){ MDC.put(Log4Constans.PRINT_LOG_ID, ctx.get(Log4Constans.PRINT_LOG_ID)); - Tracer.createLocalSpan(Log4Constans.PRINT_LOG_ID); + //Tracer.createLocalSpan(Log4Constans.PRINT_LOG_ID); } return new MdcContextSubscriber(c); @@ -54,7 +54,7 @@ public class LogMdcConfiguration { @Override public void onComplete() { coreSubscriber.onComplete(); - Tracer.stopSpan(); + //Tracer.stopSpan(); MDC.remove(Log4Constans.PRINT_LOG_ID); } @@ -68,7 +68,7 @@ public class LogMdcConfiguration { } MDC.remove(Log4Constans.PRINT_LOG_ID); - Tracer.stopSpan(); + //Tracer.stopSpan(); coreSubscriber.onError(throwable); } diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogWebFilter.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogWebFilter.java index 5df9f06..38ed37b 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogWebFilter.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogWebFilter.java @@ -4,6 +4,7 @@ import com.qiuguo.iot.base.constans.Log4Constans; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.toolkit.trace.*; import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators; +import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext; import org.hswebframework.web.logger.ReactiveLogger; import org.reactivestreams.Publisher; import org.slf4j.MDC; @@ -42,9 +43,13 @@ public class LogWebFilter implements WebFilter { @Override public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { //Tracer.stopSpan(); + //Tracer.createEntrySpan(Log4Constans.PRINT_LOG_ID, null); long startTime = System.currentTimeMillis(); ServerHttpRequest request = exchange.getRequest(); - String requestId = request.getId(); + String requestId = WebFluxSkyWalkingTraceContext.traceId(exchange); + log.info("获取到的traceId:{}", requestId); + //String pid = WebFluxSkyWalkingTraceContext.getCorrelation(exchange, Log4Constans.PRINT_LOG_ID).get(); + //log.info("获取到的pid:{}", pid); String ip = request.getRemoteAddress().getAddress().getHostAddress(); if(request.getHeaders().containsKey(Log4Constans.HEADER_TRACE_ID)){ //网关生成的tracId @@ -85,6 +90,7 @@ public class LogWebFilter implements WebFilter { long endTime = System.currentTimeMillis(); MDC.put(Log4Constans.PRINT_LOG_ID, tracId); log.info("api end time:{}, total time:{}", endTime, endTime - startTime); + //Tracer.stopSpan(); MDC.remove(Log4Constans.PRINT_LOG_ID); }); @@ -108,6 +114,15 @@ public class LogWebFilter implements WebFilter { return dataBuffer; }); } + + /*@Override + public HttpHeaders getHeaders(){ + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.putAll(super.getHeaders()); + httpHeaders.set("sw8", "customerIp");//sklwaking传过来的 + //httpHeaders.set(Log4Constans.HEADER_TRACE_ID, tracId); + return httpHeaders; + }*/ }; }