diff --git a/iot-common/iot-base/src/main/java/com/qiuguo/iot/base/utils/WebClientUtils.java b/iot-common/iot-base/src/main/java/com/qiuguo/iot/base/utils/WebClientUtils.java index 2023b11..a239930 100644 --- a/iot-common/iot-base/src/main/java/com/qiuguo/iot/base/utils/WebClientUtils.java +++ b/iot-common/iot-base/src/main/java/com/qiuguo/iot/base/utils/WebClientUtils.java @@ -30,18 +30,12 @@ public class WebClientUtils { public static Mono get(String url, Map headers) { log.info("GET WebClient URL:{} headers:{}", url, headers); - String logId = MDC.get(Log4Constans.PRINT_LOG_ID); if(headers == null || headers.size() == 0) { return webClient.get().uri(url).retrieve().bodyToMono(String.class).flatMap(s-> { - log.info("[{}]GET WebClient Respon:{}", logId, s); - return Mono.just(JSONObject.parseObject(s)); - }).contextWrite(ctx -> { - if(StringUtils.isNotEmpty(logId)){ - Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, logId); - return contextTmp; - } - return ctx; + log.info("GET WebClient Respon:{}", s); + + return Mono.just(JSONObject.parseObject(s)); }); }else{ return webClient.get().uri(url).headers(httpHeaders -> { @@ -50,35 +44,20 @@ public class WebClientUtils { httpHeaders.set(key, headers.get(key)); } }).retrieve().bodyToMono(String.class).flatMap(s-> { - log.info("[{}]GET WebClient Respon:{}", logId, s); + log.info("GET WebClient Respon:{}", s); return Mono.just(JSONObject.parseObject(s)); - }).contextWrite(ctx -> { - if(StringUtils.isNotEmpty(logId)){ - Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, logId); - - return contextTmp; - } - return ctx; }); } } public static Mono post(String url, JSONObject body, Map headers) { log.info("POST WebClient URL:{} body:{} headers:{}", url, body, headers); - String logId = MDC.get(Log4Constans.PRINT_LOG_ID); if(headers == null || headers.size() == 0) { return webClient.post().uri(url).bodyValue(body.toString()).retrieve().bodyToMono(String.class).flatMap(s-> { - log.info("[{}]POST WebClient Respon:{}", logId, s); + log.info("POST WebClient Respon:{}", s); return Mono.just(JSONObject.parseObject(s)); - }).contextWrite(ctx -> { - if(StringUtils.isNotEmpty(logId)){ - Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, logId); - - return contextTmp; - } - return ctx; }); }else{ return webClient.post().uri(url).bodyValue(body.toString()).headers(httpHeaders -> { @@ -87,15 +66,9 @@ public class WebClientUtils { httpHeaders.set(key, headers.get(key)); } }).retrieve().bodyToMono(String.class).flatMap(s-> { - log.info("[{}]POST WebClient Respon:{}", logId, s); - return Mono.just(JSONObject.parseObject(s)); - }).contextWrite(ctx -> { - if(StringUtils.isNotEmpty(logId)){ - Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, logId); - return contextTmp; - } - return ctx; + log.info("POST WebClient Respon:{}", s); + return Mono.just(JSONObject.parseObject(s)); }); } diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/command/ActionCommand.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/command/ActionCommand.java index 2a2f9cc..f2ed4c5 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/command/ActionCommand.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/command/ActionCommand.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.context.Context; import javax.annotation.Resource; @@ -145,12 +146,13 @@ public abstract class ActionCommand { @Override public void sendMessage(String message) { //通知到客户端 - MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) { queueMessage.getQueue().add(message); if(queueMessage.getStatus() == YesNo.YES.getCode().intValue()){ queueMessage.setStatus(2); - setQueueMessage(action, baseSession, queueMessage, type, actionSendMessage).subscribeOn(Schedulers.single()).subscribe(); + setQueueMessage(action, baseSession, queueMessage, type, actionSendMessage) + .subscribeOn(Schedulers.single()) + .subscribe(); } return; } @@ -163,7 +165,6 @@ public abstract class ActionCommand { public void finish() { log.info("千问最后调用finish"); queueMessage.setStatus(YesNo.NO.getCode()); - MDC.remove(Log4Constans.PRINT_LOG_ID); } }; } @@ -211,7 +212,11 @@ public abstract class ActionCommand { } return sendMessage(action, baseSession, "很抱歉,我无法回答您的问题,请换一个问题。", AskTypeEnum.NONE.getCode(), actionSendMessage); } - }); + })/*.contextWrite(ctx -> { + Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId()); + + return contextTmp; + })*/; } protected Mono sendMq(String msg){ diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java index df4d941..8b945a6 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java @@ -27,10 +27,6 @@ public class LogMdcConfiguration { @PostConstruct public void contextOperatorHook() { Hooks.onEachOperator(PRINT_LOG_ID, Operators.lift((r, c) ->{ - Context ctx = c.currentContext(); - if(ctx.hasKey(PRINT_LOG_ID)){ - MDC.put(PRINT_LOG_ID, ctx.get(PRINT_LOG_ID)); - } return new MdcContextSubscriber(c); })); } @@ -69,7 +65,9 @@ public class LogMdcConfiguration { @Override public void onNext(T t) { - + if(currentContext().hasKey(PRINT_LOG_ID)){ + MDC.put(PRINT_LOG_ID, currentContext().get(PRINT_LOG_ID)); + } coreSubscriber.onNext(t); } diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogWebFilter.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogWebFilter.java index 1892179..b8dc3ee 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogWebFilter.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogWebFilter.java @@ -9,6 +9,7 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; @@ -32,6 +33,8 @@ import java.nio.charset.StandardCharsets; @Configuration @Slf4j public class LogWebFilter implements WebFilter { + + public static String HEAD_IP = "customerIP"; @Override public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { long startTime = System.currentTimeMillis(); @@ -55,7 +58,7 @@ public class LogWebFilter implements WebFilter { ex.response(getResponse(exchange, requestId)); if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)){ - ex.request(getRequest(exchange)); + ex.request(getRequest(exchange, ip, requestId)); } return chain.filter(ex.build()) .contextWrite(context -> { @@ -71,17 +74,28 @@ public class LogWebFilter implements WebFilter { }); } - private ServerHttpRequest getRequest(ServerWebExchange exchange){ + private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String requestId){ ServerHttpRequest request = exchange.getRequest(); return new ServerHttpRequestDecorator(request){ @Override public Flux getBody() { Flux body = this.getDelegate().getBody(); return body.map(dataBuffer -> { + MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId); log.info("request:{}", dataBuffer.toString(StandardCharsets.UTF_8)); + MDC.remove(LogMdcConfiguration.PRINT_LOG_ID); return dataBuffer; }); } + + @Override + public HttpHeaders getHeaders(){ + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.putAll(super.getHeaders()); + httpHeaders.set(HEAD_IP, customerIp); + httpHeaders.set(LogMdcConfiguration.PRINT_LOG_ID, requestId); + return httpHeaders; + } }; } @@ -93,6 +107,7 @@ public class LogWebFilter implements WebFilter { if (body instanceof Flux) { Flux fluxBody = Flux.from(body); return super.writeWith(fluxBody.buffer().map(dataBuffers -> { + MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId); DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); DataBuffer joinBuffer = dataBufferFactory.join(dataBuffers); byte[] returnContent = new byte[joinBuffer.readableByteCount()]; @@ -105,6 +120,7 @@ public class LogWebFilter implements WebFilter { }else if(body instanceof Mono){ Mono monoBody = Mono.from(body); return super.writeWith(monoBody.map(dataBuffer -> { + MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId); String returnStr = dataBuffer.toString(StandardCharsets.UTF_8); log.info("mono response:{}", returnStr); return response.bufferFactory().wrap(returnStr.getBytes()); diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/filter/LogMdcConfiguration.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/filter/LogMdcConfiguration.java index 89154ab..396d1f4 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/filter/LogMdcConfiguration.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/filter/LogMdcConfiguration.java @@ -26,10 +26,6 @@ public class LogMdcConfiguration { @PostConstruct public void contextOperatorHook() { Hooks.onEachOperator(PRINT_LOG_ID, Operators.lift((r, c) ->{ - Context ctx = c.currentContext(); - if(ctx.hasKey(PRINT_LOG_ID)){ - MDC.put(PRINT_LOG_ID, ctx.get(PRINT_LOG_ID)); - } return new MdcContextSubscriber(c); })); } @@ -69,7 +65,9 @@ public class LogMdcConfiguration { @Override public void onNext(T t) { - + if(coreSubscriber.currentContext().hasKey(PRINT_LOG_ID)){ + MDC.put(PRINT_LOG_ID, coreSubscriber.currentContext().get(PRINT_LOG_ID)); + } coreSubscriber.onNext(t); }