Merge branch '20231110_v1.0.1'
This commit is contained in:
commit
987a8bbc93
@ -30,18 +30,12 @@ public class WebClientUtils {
|
||||
|
||||
public static Mono<JSONObject> get(String url, Map<String, String> headers) {
|
||||
log.info("GET WebClient URL:{} headers:{}", url, headers);
|
||||
String logId = MDC.get(Log4Constans.PRINT_LOG_ID);
|
||||
if(headers == null || headers.size() == 0) {
|
||||
return webClient.get().uri(url).retrieve().bodyToMono(String.class).flatMap(s-> {
|
||||
log.info("[{}]GET WebClient Respon:{}", logId, s);
|
||||
return Mono.just(JSONObject.parseObject(s));
|
||||
}).contextWrite(ctx -> {
|
||||
if(StringUtils.isNotEmpty(logId)){
|
||||
Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, logId);
|
||||
|
||||
return contextTmp;
|
||||
}
|
||||
return ctx;
|
||||
log.info("GET WebClient Respon:{}", s);
|
||||
|
||||
return Mono.just(JSONObject.parseObject(s));
|
||||
});
|
||||
}else{
|
||||
return webClient.get().uri(url).headers(httpHeaders -> {
|
||||
@ -50,35 +44,20 @@ public class WebClientUtils {
|
||||
httpHeaders.set(key, headers.get(key));
|
||||
}
|
||||
}).retrieve().bodyToMono(String.class).flatMap(s-> {
|
||||
log.info("[{}]GET WebClient Respon:{}", logId, s);
|
||||
log.info("GET WebClient Respon:{}", s);
|
||||
return Mono.just(JSONObject.parseObject(s));
|
||||
}).contextWrite(ctx -> {
|
||||
if(StringUtils.isNotEmpty(logId)){
|
||||
Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, logId);
|
||||
|
||||
return contextTmp;
|
||||
}
|
||||
return ctx;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public static Mono<JSONObject> post(String url, JSONObject body, Map<String, String> headers) {
|
||||
log.info("POST WebClient URL:{} body:{} headers:{}", url, body, headers);
|
||||
String logId = MDC.get(Log4Constans.PRINT_LOG_ID);
|
||||
if(headers == null || headers.size() == 0) {
|
||||
return webClient.post().uri(url).bodyValue(body.toString()).retrieve().bodyToMono(String.class).flatMap(s->
|
||||
{
|
||||
|
||||
log.info("[{}]POST WebClient Respon:{}", logId, s);
|
||||
log.info("POST WebClient Respon:{}", s);
|
||||
return Mono.just(JSONObject.parseObject(s));
|
||||
}).contextWrite(ctx -> {
|
||||
if(StringUtils.isNotEmpty(logId)){
|
||||
Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, logId);
|
||||
|
||||
return contextTmp;
|
||||
}
|
||||
return ctx;
|
||||
});
|
||||
}else{
|
||||
return webClient.post().uri(url).bodyValue(body.toString()).headers(httpHeaders -> {
|
||||
@ -87,15 +66,9 @@ public class WebClientUtils {
|
||||
httpHeaders.set(key, headers.get(key));
|
||||
}
|
||||
}).retrieve().bodyToMono(String.class).flatMap(s-> {
|
||||
log.info("[{}]POST WebClient Respon:{}", logId, s);
|
||||
return Mono.just(JSONObject.parseObject(s));
|
||||
}).contextWrite(ctx -> {
|
||||
if(StringUtils.isNotEmpty(logId)){
|
||||
Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, logId);
|
||||
|
||||
return contextTmp;
|
||||
}
|
||||
return ctx;
|
||||
log.info("POST WebClient Respon:{}", s);
|
||||
return Mono.just(JSONObject.parseObject(s));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.MDC;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.context.Context;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@ -145,12 +146,13 @@ public abstract class ActionCommand {
|
||||
@Override
|
||||
public void sendMessage(String message) {
|
||||
//通知到客户端
|
||||
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
|
||||
if (tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())) {
|
||||
queueMessage.getQueue().add(message);
|
||||
if(queueMessage.getStatus() == YesNo.YES.getCode().intValue()){
|
||||
queueMessage.setStatus(2);
|
||||
setQueueMessage(action, baseSession, queueMessage, type, actionSendMessage).subscribeOn(Schedulers.single()).subscribe();
|
||||
setQueueMessage(action, baseSession, queueMessage, type, actionSendMessage)
|
||||
.subscribeOn(Schedulers.single())
|
||||
.subscribe();
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -163,7 +165,6 @@ public abstract class ActionCommand {
|
||||
public void finish() {
|
||||
log.info("千问最后调用finish");
|
||||
queueMessage.setStatus(YesNo.NO.getCode());
|
||||
MDC.remove(Log4Constans.PRINT_LOG_ID);
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -211,7 +212,11 @@ public abstract class ActionCommand {
|
||||
}
|
||||
return sendMessage(action, baseSession, "很抱歉,我无法回答您的问题,请换一个问题。", AskTypeEnum.NONE.getCode(), actionSendMessage);
|
||||
}
|
||||
});
|
||||
})/*.contextWrite(ctx -> {
|
||||
Context contextTmp = ctx.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
|
||||
|
||||
return contextTmp;
|
||||
})*/;
|
||||
}
|
||||
|
||||
protected Mono<Boolean> sendMq(String msg){
|
||||
|
||||
@ -27,10 +27,6 @@ public class LogMdcConfiguration {
|
||||
@PostConstruct
|
||||
public void contextOperatorHook() {
|
||||
Hooks.onEachOperator(PRINT_LOG_ID, Operators.lift((r, c) ->{
|
||||
Context ctx = c.currentContext();
|
||||
if(ctx.hasKey(PRINT_LOG_ID)){
|
||||
MDC.put(PRINT_LOG_ID, ctx.get(PRINT_LOG_ID));
|
||||
}
|
||||
return new MdcContextSubscriber(c);
|
||||
}));
|
||||
}
|
||||
@ -69,7 +65,9 @@ public class LogMdcConfiguration {
|
||||
|
||||
@Override
|
||||
public void onNext(T t) {
|
||||
|
||||
if(currentContext().hasKey(PRINT_LOG_ID)){
|
||||
MDC.put(PRINT_LOG_ID, currentContext().get(PRINT_LOG_ID));
|
||||
}
|
||||
coreSubscriber.onNext(t);
|
||||
}
|
||||
|
||||
|
||||
@ -9,6 +9,7 @@ import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
|
||||
@ -32,6 +33,8 @@ import java.nio.charset.StandardCharsets;
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class LogWebFilter implements WebFilter {
|
||||
|
||||
public static String HEAD_IP = "customerIP";
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
@ -55,7 +58,7 @@ public class LogWebFilter implements WebFilter {
|
||||
ex.response(getResponse(exchange, requestId));
|
||||
|
||||
if(!request.getMethod().equals(HttpMethod.GET) && !request.getMethod().equals(HttpMethod.DELETE)){
|
||||
ex.request(getRequest(exchange));
|
||||
ex.request(getRequest(exchange, ip, requestId));
|
||||
}
|
||||
return chain.filter(ex.build())
|
||||
.contextWrite(context -> {
|
||||
@ -71,17 +74,28 @@ public class LogWebFilter implements WebFilter {
|
||||
});
|
||||
}
|
||||
|
||||
private ServerHttpRequest getRequest(ServerWebExchange exchange){
|
||||
private ServerHttpRequest getRequest(ServerWebExchange exchange, String customerIp, String requestId){
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
return new ServerHttpRequestDecorator(request){
|
||||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
Flux<DataBuffer> body = this.getDelegate().getBody();
|
||||
return body.map(dataBuffer -> {
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
log.info("request:{}", dataBuffer.toString(StandardCharsets.UTF_8));
|
||||
MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
|
||||
return dataBuffer;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpHeaders getHeaders(){
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.putAll(super.getHeaders());
|
||||
httpHeaders.set(HEAD_IP, customerIp);
|
||||
httpHeaders.set(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
return httpHeaders;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -93,6 +107,7 @@ public class LogWebFilter implements WebFilter {
|
||||
if (body instanceof Flux) {
|
||||
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
|
||||
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||
DataBuffer joinBuffer = dataBufferFactory.join(dataBuffers);
|
||||
byte[] returnContent = new byte[joinBuffer.readableByteCount()];
|
||||
@ -105,6 +120,7 @@ public class LogWebFilter implements WebFilter {
|
||||
}else if(body instanceof Mono){
|
||||
Mono<DataBuffer> monoBody = Mono.from(body);
|
||||
return super.writeWith(monoBody.map(dataBuffer -> {
|
||||
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, requestId);
|
||||
String returnStr = dataBuffer.toString(StandardCharsets.UTF_8);
|
||||
log.info("mono response:{}", returnStr);
|
||||
return response.bufferFactory().wrap(returnStr.getBytes());
|
||||
|
||||
@ -26,10 +26,6 @@ public class LogMdcConfiguration {
|
||||
@PostConstruct
|
||||
public void contextOperatorHook() {
|
||||
Hooks.onEachOperator(PRINT_LOG_ID, Operators.lift((r, c) ->{
|
||||
Context ctx = c.currentContext();
|
||||
if(ctx.hasKey(PRINT_LOG_ID)){
|
||||
MDC.put(PRINT_LOG_ID, ctx.get(PRINT_LOG_ID));
|
||||
}
|
||||
return new MdcContextSubscriber(c);
|
||||
}));
|
||||
}
|
||||
@ -69,7 +65,9 @@ public class LogMdcConfiguration {
|
||||
|
||||
@Override
|
||||
public void onNext(T t) {
|
||||
|
||||
if(coreSubscriber.currentContext().hasKey(PRINT_LOG_ID)){
|
||||
MDC.put(PRINT_LOG_ID, coreSubscriber.currentContext().get(PRINT_LOG_ID));
|
||||
}
|
||||
coreSubscriber.onNext(t);
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user