From ce51558fed8c0dac73d95f0a5a180db98e060269 Mon Sep 17 00:00:00 2001 From: wulin Date: Fri, 27 Oct 2023 09:33:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0MQ=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .DS_Store | Bin 6148 -> 6148 bytes .../qiuguo/iot/data/service/mq/MqService.java | 28 ++++++++++++++++++ .../websocket/api/command/ActionCommand.java | 3 +- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/.DS_Store b/.DS_Store index 2ca9dd2696a8ca9ea709237640c29ddf20a82781..a3719534acca7d2dd377d6c7fd522763ae5723cb 100644 GIT binary patch delta 46 zcmZoMXffDe!OX}p*^)V0nv*kLK%%-@*V4>PN5R~vR!5=Q!pJ~J!P3Hb^Lpk`5daB% B3se9A delta 40 vcmZoMXffDe!93ZSDM4JKy4uu2N5ROs;=kp5O diff --git a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java index 08d52fd..42d17f3 100644 --- a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java @@ -49,6 +49,34 @@ public class MqService { }); } + /** + * 发送消息,失败后带重试 + * @param exchange + * @param routingKey + * @param message + * @param tryCount 重试次数 + * @return + */ + public Mono sendMessageWithConfirmation(String exchange, String routingKey, Object message, int tryCount) { + rabbitTemplate.convertAndSend(exchange, routingKey, message); + return Mono.defer(() -> { + int reCount = tryCount; + boolean result = confirmationResult.get(); + while(!result && reCount-- > 0){ + log.info("重新发送MQ,剩余次数{}", reCount); + rabbitTemplate.convertAndSend(exchange, routingKey, message); + try{ + Thread.sleep(50); + }catch (Exception e){ + log.info("发送MQ失败,重试异常{}", e); + } + result = confirmationResult.get(); + } + log.info("MQ消息发送:{}", result); + return Mono.just(result); + }); + } + public Mono sendMessage(String exchange, String routingKey, Object message) { // 只设置一次确认回调 rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> { diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java index 3a3bae8..358a1ee 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java @@ -177,7 +177,8 @@ public abstract class ActionCommand { //发送消息到MQ,通知U3D return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, - msg); + msg, + 2); }catch (Exception e){ log.info("通知U3D MQ异常{}", e);