From 979f85f055a3962b8c99eb31d6420b707e710560 Mon Sep 17 00:00:00 2001 From: simon <861719797@qq.com> Date: Tue, 26 Sep 2023 13:23:08 +0800 Subject: [PATCH] =?UTF-8?q?MQ=E6=95=B4=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iot-common/iot-data/pom.xml | 4 ++++ .../iot/data/service/mq}/MqService.java | 2 +- .../user/api/controller/mq/MqController.java | 19 +++++++++++++------ .../iot/user/api/listener/YunxiListener.java | 3 ++- 4 files changed, 20 insertions(+), 8 deletions(-) rename iot-common/{iot-third/src/main/java/com/qiuguo/iot/third/service => iot-data/src/main/java/com/qiuguo/iot/data/service/mq}/MqService.java (98%) diff --git a/iot-common/iot-data/pom.xml b/iot-common/iot-data/pom.xml index 4282833..acddab6 100644 --- a/iot-common/iot-data/pom.xml +++ b/iot-common/iot-data/pom.xml @@ -50,6 +50,10 @@ 0.0.1-SNAPSHOT compile + + org.springframework.amqp + spring-rabbit + ${project.artifactId} diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/MqService.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java similarity index 98% rename from iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/MqService.java rename to iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java index beb294e..3202ca1 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/MqService.java +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/service/mq/MqService.java @@ -1,4 +1,4 @@ -package com.qiuguo.iot.third.service; +package com.qiuguo.iot.data.service.mq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java index abf0c04..82a6b81 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java @@ -2,15 +2,15 @@ package com.qiuguo.iot.user.api.controller.mq; import com.qiuguo.iot.data.constants.YunxiRabbitConst; import com.qiuguo.iot.data.entity.user.UserHomeEntity; -import com.qiuguo.iot.third.service.MqService; +import com.qiuguo.iot.data.service.mq.MqService; import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; +import javax.annotation.Resource; + /** * @author simon * @date 2023/9/25 @@ -20,16 +20,23 @@ import reactor.core.publisher.Mono; @Slf4j @RequestMapping("/mq") public class MqController { - @Autowired + @Resource private MqService mqService; @GetMapping("/sendMsg") - public Mono sendMsg() { + public Mono sendMsg() { UserHomeEntity userHomeEntity = new UserHomeEntity(); userHomeEntity.setUserId(1L); userHomeEntity.setHomeName("小米的家"); userHomeEntity.setUserId(System.currentTimeMillis()); - return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, userHomeEntity); + Mono booleanMono = mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, userHomeEntity); + return booleanMono.flatMap(res -> { + if (!res) { + return Mono.error(new RuntimeException("消息发送失败")); + } else { + return Mono.just(true); + } + }); } } diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/listener/YunxiListener.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/listener/YunxiListener.java index 79e34ab..ceb3c1b 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/listener/YunxiListener.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/listener/YunxiListener.java @@ -22,8 +22,9 @@ public class YunxiListener { public void processYunxiQueue(Channel channel, Message message) throws IOException { String messageContent = new String(message.getBody(), "UTF-8"); System.out.println("YunxiListener msg " + messageContent); + //TODO 消费者处理程序 - //手动消息确认 配置需开启 acknowledge-mode: manual + //处理完毕 手动消息确认 配置需开启 acknowledge-mode: manual channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }