diff --git a/iot-common/iot-rabbit/pom.xml b/iot-common/iot-rabbit/pom.xml new file mode 100644 index 0000000..4f63e3c --- /dev/null +++ b/iot-common/iot-rabbit/pom.xml @@ -0,0 +1,58 @@ + + + 4.0.0 + + com.qiuguo.iot + iot-common + 0.0.1-SNAPSHOT + + + iot-rabbit + iot-rabbit + mq类 + + + 1.8 + + + + org.springframework.boot + spring-boot-starter-amqp + + + org.springframework.boot + spring-boot-starter-webflux + + + + + org.springframework.amqp + spring-rabbit-test + test + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring.boot.maven.plugin.version} + + + true + + + + + repackage + + + + + + + + diff --git a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RunXiRabbitConfig.java b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RunXiRabbitConfig.java new file mode 100644 index 0000000..11796f7 --- /dev/null +++ b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RunXiRabbitConfig.java @@ -0,0 +1,29 @@ +package com.qiuguo.iot.rabbit.config; + +import com.qiuguo.iot.rabbit.constants.YunxiRabbitConst; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.Exchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class RunXiRabbitConfig { + + /** + * 交换机 + * + * @return + */ + @Bean + public Exchange RunxiEventExchange() { + return new TopicExchange(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, + true, + false, + null); + } +} diff --git a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/constants/YunxiRabbitConst.java b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/constants/YunxiRabbitConst.java new file mode 100644 index 0000000..c53e4a5 --- /dev/null +++ b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/constants/YunxiRabbitConst.java @@ -0,0 +1,23 @@ +package com.qiuguo.iot.rabbit.constants; + +/** + * @author simon + * @date 2023/9/25 + * @description + **/ +public class YunxiRabbitConst { + /** + * 云栖活动交换机 + */ + public static final String EXCHANGE_YUNXI_EVENT = "iot-yunxi-exchange"; + + /** + * 云栖活动队列 + */ + public static final String QUEUE_YUNXI = "iot.yunxi.queue"; + + /** + * 云栖活动发送的路由键 + */ + public static final String ROUTE_KEY_YUNXI = "iot.yunxi"; +} diff --git a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MqSendMsgService.java b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MqSendMsgService.java new file mode 100644 index 0000000..9370d11 --- /dev/null +++ b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MqSendMsgService.java @@ -0,0 +1,33 @@ +package com.qiuguo.iot.rabbit.service; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +import javax.annotation.Resource; + +/** + * @author simon + * @date 2023/9/25 + * @description + **/ +@Service +public class MqSendMsgService { + @Resource + private RabbitTemplate rabbitTemplate; + public Mono sendRabbitMsg(String exchange, String routingKey, Object message) { + return Mono.create(sink -> { + rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> { + if (ack) { + // 消息发送成功 + sink.success(Boolean.TRUE); + } else { + // 消息发送失败 + sink.error(new RuntimeException("Message send failed: " + cause)); + } + }); + + rabbitTemplate.convertAndSend(exchange, routingKey, message); + }); + } +} diff --git a/iot-common/pom.xml b/iot-common/pom.xml index 1cc5a63..3b161a0 100644 --- a/iot-common/pom.xml +++ b/iot-common/pom.xml @@ -15,6 +15,7 @@ iot-base iot-data iot-third + iot-rabbit