集成skywalking

This commit is contained in:
wulin 2023-11-10 09:35:33 +08:00
parent 654a78d93b
commit e60c8f3279
11 changed files with 111 additions and 54 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@ -1,6 +1,9 @@
package com.qiuguo.iot.base.constans; package com.qiuguo.iot.base.constans;
public class Log4Constans { public class Log4Constans {
public static String PRINT_LOG_ID = "tid"; public static String PRINT_LOG_ID = "traceId";
public static String HEADER_TRACE_ID = "x-trace-id";
public static String CUSTOMER_IP = "customerIp"; public static String CUSTOMER_IP = "customerIp";
} }

View File

@ -71,6 +71,18 @@
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.skywalking/apm-toolkit-trace -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>8.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-webflux</artifactId>
<version>8.15.0</version>
</dependency>
</dependencies> </dependencies>

View File

@ -8,7 +8,12 @@ import com.qiuguo.iot.gateway.config.properties.XssProperties;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.trace.Trace;
import org.apache.skywalking.apm.toolkit.trace.TraceContext;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.filter.GlobalFilter;
@ -43,16 +48,19 @@ public class AuthFilter implements GlobalFilter, Ordered {
@Override @Override
@Trace
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String traceId = WebFluxSkyWalkingOperators.continueTracing(exchange, TraceContext::traceId);
//Optional<String> ss = TraceContext.getCorrelation("traceId");
ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest request = exchange.getRequest();
String url = request.getPath().toString(); String url = request.getPath().toString();
String customerIp = request.getRemoteAddress().getAddress().getHostAddress(); String customerIp = request.getRemoteAddress().getAddress().getHostAddress();
String tracId = StringUtils.getUUID();
ServerWebExchange.Builder ex = exchange.mutate(); ServerWebExchange.Builder ex = exchange.mutate();
ex.request(getRequest(exchange, customerIp, tracId)); ex.request(getRequest(exchange, customerIp, traceId));
ServerWebExchange newEx = ex.build();
if (xssProperties.getExcludeUrls().contains(url)) { if (xssProperties.getExcludeUrls().contains(url)) {
return chain.filter(ex.build()); return chain.filter(newEx);
} }
String api_token = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TOKEN); String api_token = exchange.getRequest().getHeaders().getFirst(UserAuthContains.API_TOKEN);
@ -60,7 +68,9 @@ public class AuthFilter implements GlobalFilter, Ordered {
if (ObjectUtils.isEmpty(api_token) || ObjectUtils.isEmpty(api_type)) { if (ObjectUtils.isEmpty(api_token) || ObjectUtils.isEmpty(api_type)) {
return Mono.error(new RuntimeException("未登录")); return Mono.error(new RuntimeException("未登录"));
} }
return chain.filter(ex.build()); return chain.filter(newEx);
// String key = RedisConstans.IOT_TOKEN.concat(api_token); // String key = RedisConstans.IOT_TOKEN.concat(api_token);
// return reactiveRedisTemplate.getExpire(key).map(Duration::getSeconds).flatMap(ttl -> { // return reactiveRedisTemplate.getExpire(key).map(Duration::getSeconds).flatMap(ttl -> {
// if (ttl == -1) { // if (ttl == -1) {
@ -78,13 +88,15 @@ public class AuthFilter implements GlobalFilter, Ordered {
private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String tracId){ private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String tracId){
ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest request = exchange.getRequest();
exchange.getResponse().getHeaders().set(Log4Constans.HEADER_TRACE_ID, tracId);
ServerHttpRequest newRequest = new ServerHttpRequestDecorator(request){ ServerHttpRequest newRequest = new ServerHttpRequestDecorator(request){
@Override @Override
public HttpHeaders getHeaders(){ public HttpHeaders getHeaders(){
HttpHeaders httpHeaders = new HttpHeaders(); HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders()); httpHeaders.putAll(super.getHeaders());
httpHeaders.set(Log4Constans.CUSTOMER_IP, customerIp); httpHeaders.set(Log4Constans.CUSTOMER_IP, customerIp);
httpHeaders.set(Log4Constans.PRINT_LOG_ID, tracId); httpHeaders.set(Log4Constans.HEADER_TRACE_ID, tracId);
return httpHeaders; return httpHeaders;
} }
}; };

View File

@ -70,11 +70,16 @@
<artifactId>reactor-test</artifactId> <artifactId>reactor-test</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.skywalking</groupId> <groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId> <artifactId>apm-toolkit-logback-1.x</artifactId>
<version>8.7.0</version> <exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
<version>8.15.0</version>
</dependency> </dependency>
<!--skywalking trace end--> <!--skywalking trace end-->
@ -83,6 +88,19 @@
<artifactId>hutool-all</artifactId> <artifactId>hutool-all</artifactId>
<version>5.8.21</version> <version>5.8.21</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.skywalking/apm-toolkit-trace -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>8.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-webflux</artifactId>
<version>8.15.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -23,6 +23,10 @@ import com.qiuguo.iot.data.service.device.DeviceUserTalkRecordService;
import com.qiuguo.iot.third.service.TuyaDeviceConnector; import com.qiuguo.iot.third.service.TuyaDeviceConnector;
import com.qiuguo.iot.data.resp.device.DeviceInitResp; import com.qiuguo.iot.data.resp.device.DeviceInitResp;
import com.qiuguo.iot.user.api.rest.device.SetDeviceBindInfoRest; import com.qiuguo.iot.user.api.rest.device.SetDeviceBindInfoRest;
import org.apache.skywalking.apm.toolkit.trace.CarrierItemRef;
import org.apache.skywalking.apm.toolkit.trace.ContextCarrierRef;
import org.apache.skywalking.apm.toolkit.trace.Tracer;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@ -72,6 +76,7 @@ public class DeviceController {
public Mono<DeviceInitResp> deviceInit(@RequestParam String wifiMac, @RequestParam String btMac, public Mono<DeviceInitResp> deviceInit(@RequestParam String wifiMac, @RequestParam String btMac,
@RequestParam Integer type, @RequestParam Long time, @RequestParam Integer type, @RequestParam Long time,
@RequestParam String signature){ @RequestParam String signature){
//deviceInfoService. //deviceInfoService.
Long now = System.currentTimeMillis(); Long now = System.currentTimeMillis();
if(checkTimeout && now - time > timeOut){ if(checkTimeout && now - time > timeOut){
@ -95,7 +100,8 @@ public class DeviceController {
if (!md5.equals(signature)) { if (!md5.equals(signature)) {
// //
//验签失败 //验签失败
log.info("验签失败");
log.info("验签失败,{}", md5);
BusinessException ex = new BusinessException("验签失败"); BusinessException ex = new BusinessException("验签失败");
return Mono.error(ex); return Mono.error(ex);
} }
@ -103,9 +109,8 @@ public class DeviceController {
DeviceInfoRequest request = new DeviceInfoRequest(); DeviceInfoRequest request = new DeviceInfoRequest();
request.setWifiMac(wifiMac); request.setWifiMac(wifiMac);
request.setBtMac(btMac); request.setBtMac(btMac);
Mono<DeviceInfoEntity> mono = deviceInfoService.selectDeviceInfoByRequest(request);
return mono.defaultIfEmpty(new DeviceInfoEntity()).flatMap(entity -> { return deviceInfoService.selectDeviceInfoByRequest(request).defaultIfEmpty(new DeviceInfoEntity()).flatMap(entity -> {
Mono mono1 = null; Mono mono1 = null;
if(entity.getId() == null){ if(entity.getId() == null){
entity.setWifiMac(wifiMac); entity.setWifiMac(wifiMac);

View File

@ -2,6 +2,10 @@ package com.qiuguo.iot.user.api.filter;
import com.qiuguo.iot.base.constans.Log4Constans; import com.qiuguo.iot.base.constans.Log4Constans;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.trace.Trace;
import org.apache.skywalking.apm.toolkit.trace.TraceContext;
import org.apache.skywalking.apm.toolkit.trace.Tracer;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.slf4j.MDC; import org.slf4j.MDC;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -22,14 +26,18 @@ import javax.annotation.PreDestroy;
@Configuration @Configuration
@Slf4j @Slf4j
public class LogMdcConfiguration { public class LogMdcConfiguration {
public int hashcode;
@PostConstruct @PostConstruct
public void contextOperatorHook() { public void contextOperatorHook() {
Hooks.onEachOperator(Log4Constans.PRINT_LOG_ID, Operators.lift((r, c) ->{ Hooks.onEachOperator(Log4Constans.PRINT_LOG_ID, Operators.lift((r, c) ->{
Context ctx = c.currentContext(); Context ctx = c.currentContext();
if(ctx.hasKey(Log4Constans.PRINT_LOG_ID)){ if(ctx.hasKey(Log4Constans.PRINT_LOG_ID)){
MDC.put(Log4Constans.PRINT_LOG_ID, ctx.get(Log4Constans.PRINT_LOG_ID)); MDC.put(Log4Constans.PRINT_LOG_ID, ctx.get(Log4Constans.PRINT_LOG_ID));
Tracer.createLocalSpan(Log4Constans.PRINT_LOG_ID);
} }
return new MdcContextSubscriber(c); return new MdcContextSubscriber(c);
})); }));
} }
@PreDestroy @PreDestroy
@ -46,19 +54,28 @@ public class LogMdcConfiguration {
@Override @Override
public void onComplete() { public void onComplete() {
coreSubscriber.onComplete(); coreSubscriber.onComplete();
Tracer.stopSpan();
MDC.remove(Log4Constans.PRINT_LOG_ID); MDC.remove(Log4Constans.PRINT_LOG_ID);
} }
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
log.info("异常{}", throwable); int hcode = throwable.hashCode();
coreSubscriber.onError(throwable); if(hcode != hashcode){
log.info("异常{}", throwable);
hashcode = hcode;
}
MDC.remove(Log4Constans.PRINT_LOG_ID); MDC.remove(Log4Constans.PRINT_LOG_ID);
Tracer.stopSpan();
coreSubscriber.onError(throwable);
} }
@Override @Override
public void onSubscribe(Subscription subscription) { public void onSubscribe(Subscription subscription) {
coreSubscriber.onSubscribe(subscription); coreSubscriber.onSubscribe(subscription);
} }

View File

@ -2,14 +2,18 @@ package com.qiuguo.iot.user.api.filter;
import com.qiuguo.iot.base.constans.Log4Constans; import com.qiuguo.iot.base.constans.Log4Constans;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.trace.*;
import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
import org.hswebframework.web.logger.ReactiveLogger; import org.hswebframework.web.logger.ReactiveLogger;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.slf4j.MDC; import org.slf4j.MDC;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
@ -18,8 +22,10 @@ import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain; import org.springframework.web.server.WebFilterChain;
import org.springframework.web.servlet.ModelAndView;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context; import reactor.util.context.Context;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -35,15 +41,15 @@ import java.nio.charset.StandardCharsets;
public class LogWebFilter implements WebFilter { public class LogWebFilter implements WebFilter {
@Override @Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
//Tracer.stopSpan();
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest request = exchange.getRequest();
String requestId = request.getId(); String requestId = request.getId();
String ip = request.getRemoteAddress().getAddress().getHostAddress(); String ip = request.getRemoteAddress().getAddress().getHostAddress();
if(request.getHeaders().containsKey(Log4Constans.PRINT_LOG_ID)){ if(request.getHeaders().containsKey(Log4Constans.HEADER_TRACE_ID)){
//网关生成的tracId //网关生成的tracId
requestId = request.getHeaders().get(Log4Constans.PRINT_LOG_ID).get(0); requestId = request.getHeaders().get(Log4Constans.HEADER_TRACE_ID).get(0);
} }
if(request.getHeaders().containsKey(Log4Constans.CUSTOMER_IP)){ if(request.getHeaders().containsKey(Log4Constans.CUSTOMER_IP)){
//网关透传过来的IP //网关透传过来的IP
ip = request.getHeaders().get(Log4Constans.CUSTOMER_IP).get(0); ip = request.getHeaders().get(Log4Constans.CUSTOMER_IP).get(0);
@ -53,7 +59,6 @@ public class LogWebFilter implements WebFilter {
String m = request.getMethod().toString(); String m = request.getMethod().toString();
log.info("api start time:{} ip:{} method:{} url:{} param:{} headers:{}", log.info("api start time:{} ip:{} method:{} url:{} param:{} headers:{}",
startTime, startTime,
ip, ip,
@ -66,36 +71,45 @@ public class LogWebFilter implements WebFilter {
ex.response(getResponse(exchange, requestId)); ex.response(getResponse(exchange, requestId));
if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)){
ex.request(getRequest(exchange)); ex.request(getRequest(exchange));
}
String tracId = requestId; String tracId = requestId;
return chain.filter(ex.build()) ServerWebExchange newEx = ex.build();
return chain.filter(newEx)
.contextWrite(context -> { .contextWrite(context -> {
Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, tracId); Context contextTmp = context.put(Log4Constans.PRINT_LOG_ID, tracId);
return contextTmp; return contextTmp;
})
.doFinally(signalType -> { }).doFinally(signalType -> {
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
MDC.put(Log4Constans.PRINT_LOG_ID, tracId); MDC.put(Log4Constans.PRINT_LOG_ID, tracId);
log.info("api end time:{}, total time:{}", endTime, endTime - startTime); log.info("api end time:{}, total time:{}", endTime, endTime - startTime);
MDC.remove(Log4Constans.PRINT_LOG_ID); MDC.remove(Log4Constans.PRINT_LOG_ID);
}); });
} }
private ServerHttpRequest getRequest(ServerWebExchange exchange){ private ServerHttpRequest getRequest(ServerWebExchange exchange){
ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest request = exchange.getRequest();
return new ServerHttpRequestDecorator(request){ return new ServerHttpRequestDecorator(request){
@Override @Override
public Flux<DataBuffer> getBody() { public Flux<DataBuffer> getBody() {
Flux<DataBuffer> body = this.getDelegate().getBody(); Flux<DataBuffer> body = this.getDelegate().getBody();
return body.map(dataBuffer -> { return body.map(dataBuffer -> {
log.info("request:{}", dataBuffer.toString(StandardCharsets.UTF_8)); if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)) {
log.info("request:{}", dataBuffer.toString(StandardCharsets.UTF_8));
}
return dataBuffer; return dataBuffer;
}); });
} }
}; };
} }
private ServerHttpResponse getResponse(ServerWebExchange exchange, String requestId){ private ServerHttpResponse getResponse(ServerWebExchange exchange, String requestId){
@ -126,6 +140,7 @@ public class LogWebFilter implements WebFilter {
return super.writeWith(body); return super.writeWith(body);
} }
}; };
} }
} }

View File

@ -1,15 +0,0 @@
package com.qiuguo.iot.user.api.filter;
import ch.qos.logback.classic.pattern.MDCConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;
public class LogbackMDCPatternConverter extends MDCConverter {
@Override
public void start() {
super.start();
}
@Override
public String convert(ILoggingEvent iLoggingEvent) {
return super.convert(iLoggingEvent);
}
}

View File

@ -1,10 +0,0 @@
package com.qiuguo.iot.user.api.filter;
import ch.qos.logback.classic.PatternLayout;
public class TraceIdMDCPatternLogbackLayout extends PatternLayout {
static {
defaultConverterMap.put("X", LogbackMDCPatternConverter.class.getName());
defaultConverterMap.put("mdc", LogbackMDCPatternConverter.class.getName());
}
}

View File

@ -38,7 +38,7 @@
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder"> <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="com.qiuguo.iot.user.api.filter.TraceIdMDCPatternLogbackLayout"> <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
<Pattern>${PATTERN}</Pattern> <Pattern>${PATTERN}</Pattern>
</layout> </layout>
</encoder> </encoder>
@ -47,7 +47,7 @@
<!-- skywalking采集日志 --> <!-- skywalking采集日志 -->
<appender name="SKYWALKING" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"> <appender name="SKYWALKING" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder"> <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="com.qiuguo.iot.user.api.filter.TraceIdMDCPatternLogbackLayout"> <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
<Pattern>${SKY_PATTERN}</Pattern> <Pattern>${SKY_PATTERN}</Pattern>
</layout> </layout>
</encoder> </encoder>