diff --git a/.DS_Store b/.DS_Store index 2ca9dd2..a371953 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java index 08d52fd..42d17f3 100644 --- a/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java @@ -49,6 +49,34 @@ public class MqService { }); } + /** + * 发送消息,失败后带重试 + * @param exchange + * @param routingKey + * @param message + * @param tryCount 重试次数 + * @return + */ + public Mono sendMessageWithConfirmation(String exchange, String routingKey, Object message, int tryCount) { + rabbitTemplate.convertAndSend(exchange, routingKey, message); + return Mono.defer(() -> { + int reCount = tryCount; + boolean result = confirmationResult.get(); + while(!result && reCount-- > 0){ + log.info("重新发送MQ,剩余次数{}", reCount); + rabbitTemplate.convertAndSend(exchange, routingKey, message); + try{ + Thread.sleep(50); + }catch (Exception e){ + log.info("发送MQ失败,重试异常{}", e); + } + result = confirmationResult.get(); + } + log.info("MQ消息发送:{}", result); + return Mono.just(result); + }); + } + public Mono sendMessage(String exchange, String routingKey, Object message) { // 只设置一次确认回调 rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> { diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java index 3a3bae8..358a1ee 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/command/ActionCommand.java @@ -177,7 +177,8 @@ public abstract class ActionCommand { //发送消息到MQ,通知U3D return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, - msg); + msg, + 2); }catch (Exception e){ log.info("通知U3D MQ异常{}", e);