更换skywalking版本为9.0

This commit is contained in:
wulin 2023-11-10 20:23:30 +08:00
parent e60c8f3279
commit 9016244294
7 changed files with 86 additions and 46 deletions

View File

@ -75,13 +75,13 @@
<dependency> <dependency>
<groupId>org.apache.skywalking</groupId> <groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId> <artifactId>apm-toolkit-trace</artifactId>
<version>8.15.0</version> <version>9.0.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.skywalking</groupId> <groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-webflux</artifactId> <artifactId>apm-toolkit-webflux</artifactId>
<version>8.15.0</version> <version>9.0.0</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -14,6 +14,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.trace.Trace; import org.apache.skywalking.apm.toolkit.trace.Trace;
import org.apache.skywalking.apm.toolkit.trace.TraceContext; import org.apache.skywalking.apm.toolkit.trace.TraceContext;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators; import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.filter.GlobalFilter;
@ -48,27 +49,24 @@ public class AuthFilter implements GlobalFilter, Ordered {
@Override @Override
@Trace
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String traceId = WebFluxSkyWalkingOperators.continueTracing(exchange, TraceContext::traceId);
//Optional<String> ss = TraceContext.getCorrelation("traceId");
ServerHttpRequest request = exchange.getRequest();
String url = request.getPath().toString();
String customerIp = request.getRemoteAddress().getAddress().getHostAddress();
ServerWebExchange.Builder ex = exchange.mutate();
ex.request(getRequest(exchange, customerIp, traceId));
ServerWebExchange newEx = ex.build();
if (xssProperties.getExcludeUrls().contains(url)) {
return chain.filter(newEx);
}
String api_token = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TOKEN); return WebFluxSkyWalkingOperators.continueTracing(exchange, () -> {
String api_type = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TYPE); ServerHttpRequest request = exchange.getRequest();
if (ObjectUtils.isEmpty(api_token) || ObjectUtils.isEmpty(api_type)) { String url = request.getPath().toString();
return Mono.error(new RuntimeException("未登录")); if (xssProperties.getExcludeUrls().contains(url)) {
} return chain.filter(exchange);
return chain.filter(newEx); }
String api_token = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TOKEN);
String api_type = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TYPE);
if (ObjectUtils.isEmpty(api_token) || ObjectUtils.isEmpty(api_type)) {
return Mono.error(new RuntimeException("未登录"));
}
return chain.filter(exchange);
});
// String key = RedisConstans.IOT_TOKEN.concat(api_token); // String key = RedisConstans.IOT_TOKEN.concat(api_token);
@ -86,22 +84,7 @@ public class AuthFilter implements GlobalFilter, Ordered {
// }); // });
} }
private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String tracId){
ServerHttpRequest request = exchange.getRequest();
exchange.getResponse().getHeaders().set(Log4Constans.HEADER_TRACE_ID, tracId);
ServerHttpRequest newRequest = new ServerHttpRequestDecorator(request){
@Override
public HttpHeaders getHeaders(){
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
httpHeaders.set(Log4Constans.CUSTOMER_IP, customerIp);
httpHeaders.set(Log4Constans.HEADER_TRACE_ID, tracId);
return httpHeaders;
}
};
return newRequest;
}
@Override @Override

View File

@ -1,14 +1,23 @@
package com.qiuguo.iot.gateway.filter; package com.qiuguo.iot.gateway.filter;
import com.qiuguo.iot.base.constans.Log4Constans;
import com.qiuguo.iot.gateway.util.WebFluxUtils; import com.qiuguo.iot.gateway.util.WebFluxUtils;
import org.apache.skywalking.apm.toolkit.trace.Trace;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext;
import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.ArrayList;
/** /**
* 全局缓存获取body请求数据解决流不能重复读取问题. * 全局缓存获取body请求数据解决流不能重复读取问题.
* *
@ -19,17 +28,46 @@ import reactor.core.publisher.Mono;
public class GlobalCacheRequestFilter implements GlobalFilter, Ordered { public class GlobalCacheRequestFilter implements GlobalFilter, Ordered {
@Override @Override
@Trace
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 只缓存json类型请求 // 只缓存json类型请求
// if (!WebFluxUtils.isJsonRequest(exchange)) { // if (!WebFluxUtils.isJsonRequest(exchange)) {
// return chain.filter(exchange); // return chain.filter(exchange);
// } // }
return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> { String traceId = WebFluxSkyWalkingTraceContext.traceId(exchange);
if (serverHttpRequest == exchange.getRequest()) { String customerIp = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress();
return chain.filter(exchange);
} ServerWebExchange.Builder ex = exchange.mutate();
return chain.filter(exchange.mutate().request(serverHttpRequest).build()); ex.request(getRequest(exchange, customerIp, traceId));
ServerWebExchange newEx = ex.build();
return WebFluxSkyWalkingOperators.continueTracing(newEx, () -> {
WebFluxSkyWalkingTraceContext.putCorrelation(newEx, Log4Constans.PRINT_LOG_ID, traceId);
return ServerWebExchangeUtils.cacheRequestBody(newEx, (serverHttpRequest) -> {
if (serverHttpRequest == newEx.getRequest()) {
return chain.filter(newEx);
}
return chain.filter(newEx.mutate().request(serverHttpRequest).build());
});
}); });
}
private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String tracId){
ServerHttpRequest request = exchange.getRequest();
exchange.getResponse().getHeaders().set(Log4Constans.HEADER_TRACE_ID, tracId);
ServerHttpRequest newRequest = new ServerHttpRequestDecorator(request){
@Override
public HttpHeaders getHeaders(){
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
httpHeaders.set(Log4Constans.CUSTOMER_IP, customerIp);
httpHeaders.set(Log4Constans.HEADER_TRACE_ID, tracId);
return httpHeaders;
}
};
return newRequest;
} }
@Override @Override

View File

@ -93,13 +93,13 @@
<dependency> <dependency>
<groupId>org.apache.skywalking</groupId> <groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId> <artifactId>apm-toolkit-trace</artifactId>
<version>8.15.0</version> <version>9.0.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.skywalking</groupId> <groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-webflux</artifactId> <artifactId>apm-toolkit-webflux</artifactId>
<version>8.15.0</version> <version>9.0.0</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -3,6 +3,7 @@ package com.qiuguo.iot.user.api.controller.device;
import cn.hutool.crypto.digest.MD5; import cn.hutool.crypto.digest.MD5;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.qiuguo.iot.base.constans.Log4Constans;
import com.qiuguo.iot.base.constans.RedisConstans; import com.qiuguo.iot.base.constans.RedisConstans;
import com.qiuguo.iot.base.enums.DeviceTypeEnum; import com.qiuguo.iot.base.enums.DeviceTypeEnum;
import com.qiuguo.iot.base.enums.OrderByEnum; import com.qiuguo.iot.base.enums.OrderByEnum;
@ -27,6 +28,7 @@ import org.apache.skywalking.apm.toolkit.trace.CarrierItemRef;
import org.apache.skywalking.apm.toolkit.trace.ContextCarrierRef; import org.apache.skywalking.apm.toolkit.trace.ContextCarrierRef;
import org.apache.skywalking.apm.toolkit.trace.Tracer; import org.apache.skywalking.apm.toolkit.trace.Tracer;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators; import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@ -79,6 +81,8 @@ public class DeviceController {
//deviceInfoService. //deviceInfoService.
Long now = System.currentTimeMillis(); Long now = System.currentTimeMillis();
//String pid = WebFluxSkyWalkingTraceContext.getCorrelation(, Log4Constans.PRINT_LOG_ID).get();
//log.info("获取到的pid:{}", pid);
if(checkTimeout && now - time > timeOut){ if(checkTimeout && now - time > timeOut){
//超时 //超时
log.info("请求过期"); log.info("请求过期");

View File

@ -34,7 +34,7 @@ public class LogMdcConfiguration {
if(ctx.hasKey(Log4Constans.PRINT_LOG_ID)){ if(ctx.hasKey(Log4Constans.PRINT_LOG_ID)){
MDC.put(Log4Constans.PRINT_LOG_ID, ctx.get(Log4Constans.PRINT_LOG_ID)); MDC.put(Log4Constans.PRINT_LOG_ID, ctx.get(Log4Constans.PRINT_LOG_ID));
Tracer.createLocalSpan(Log4Constans.PRINT_LOG_ID); //Tracer.createLocalSpan(Log4Constans.PRINT_LOG_ID);
} }
return new MdcContextSubscriber(c); return new MdcContextSubscriber(c);
@ -54,7 +54,7 @@ public class LogMdcConfiguration {
@Override @Override
public void onComplete() { public void onComplete() {
coreSubscriber.onComplete(); coreSubscriber.onComplete();
Tracer.stopSpan(); //Tracer.stopSpan();
MDC.remove(Log4Constans.PRINT_LOG_ID); MDC.remove(Log4Constans.PRINT_LOG_ID);
} }
@ -68,7 +68,7 @@ public class LogMdcConfiguration {
} }
MDC.remove(Log4Constans.PRINT_LOG_ID); MDC.remove(Log4Constans.PRINT_LOG_ID);
Tracer.stopSpan(); //Tracer.stopSpan();
coreSubscriber.onError(throwable); coreSubscriber.onError(throwable);
} }

View File

@ -4,6 +4,7 @@ import com.qiuguo.iot.base.constans.Log4Constans;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.trace.*; import org.apache.skywalking.apm.toolkit.trace.*;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators; import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingTraceContext;
import org.hswebframework.web.logger.ReactiveLogger; import org.hswebframework.web.logger.ReactiveLogger;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.slf4j.MDC; import org.slf4j.MDC;
@ -42,9 +43,13 @@ public class LogWebFilter implements WebFilter {
@Override @Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
//Tracer.stopSpan(); //Tracer.stopSpan();
//Tracer.createEntrySpan(Log4Constans.PRINT_LOG_ID, null);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest request = exchange.getRequest();
String requestId = request.getId(); String requestId = WebFluxSkyWalkingTraceContext.traceId(exchange);
log.info("获取到的traceId:{}", requestId);
//String pid = WebFluxSkyWalkingTraceContext.getCorrelation(exchange, Log4Constans.PRINT_LOG_ID).get();
//log.info("获取到的pid:{}", pid);
String ip = request.getRemoteAddress().getAddress().getHostAddress(); String ip = request.getRemoteAddress().getAddress().getHostAddress();
if(request.getHeaders().containsKey(Log4Constans.HEADER_TRACE_ID)){ if(request.getHeaders().containsKey(Log4Constans.HEADER_TRACE_ID)){
//网关生成的tracId //网关生成的tracId
@ -85,6 +90,7 @@ public class LogWebFilter implements WebFilter {
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
MDC.put(Log4Constans.PRINT_LOG_ID, tracId); MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
log.info("api end time:{}, total time:{}", endTime, endTime - startTime); log.info("api end time:{}, total time:{}", endTime, endTime - startTime);
//Tracer.stopSpan();
MDC.remove(Log4Constans.PRINT_LOG_ID); MDC.remove(Log4Constans.PRINT_LOG_ID);
}); });
@ -108,6 +114,15 @@ public class LogWebFilter implements WebFilter {
return dataBuffer; return dataBuffer;
}); });
} }
/*@Override
public HttpHeaders getHeaders(){
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
httpHeaders.set("sw8", "customerIp");//sklwaking传过来的
//httpHeaders.set(Log4Constans.HEADER_TRACE_ID, tracId);
return httpHeaders;
}*/
}; };
} }