增加MQ重试
This commit is contained in:
parent
c784d5d35c
commit
ce51558fed
@ -49,6 +49,34 @@ public class MqService {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息,失败后带重试
|
||||
* @param exchange
|
||||
* @param routingKey
|
||||
* @param message
|
||||
* @param tryCount 重试次数
|
||||
* @return
|
||||
*/
|
||||
public Mono<Boolean> sendMessageWithConfirmation(String exchange, String routingKey, Object message, int tryCount) {
|
||||
rabbitTemplate.convertAndSend(exchange, routingKey, message);
|
||||
return Mono.defer(() -> {
|
||||
int reCount = tryCount;
|
||||
boolean result = confirmationResult.get();
|
||||
while(!result && reCount-- > 0){
|
||||
log.info("重新发送MQ,剩余次数{}", reCount);
|
||||
rabbitTemplate.convertAndSend(exchange, routingKey, message);
|
||||
try{
|
||||
Thread.sleep(50);
|
||||
}catch (Exception e){
|
||||
log.info("发送MQ失败,重试异常{}", e);
|
||||
}
|
||||
result = confirmationResult.get();
|
||||
}
|
||||
log.info("MQ消息发送:{}", result);
|
||||
return Mono.just(result);
|
||||
});
|
||||
}
|
||||
|
||||
public Mono<Object> sendMessage(String exchange, String routingKey, Object message) {
|
||||
// 只设置一次确认回调
|
||||
rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
|
||||
|
||||
@ -177,7 +177,8 @@ public abstract class ActionCommand {
|
||||
//发送消息到MQ,通知U3D
|
||||
return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT,
|
||||
YunxiRabbitConst.ROUTE_KEY_YUNXI,
|
||||
msg);
|
||||
msg,
|
||||
2);
|
||||
|
||||
}catch (Exception e){
|
||||
log.info("通知U3D MQ异常{}", e);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user