diff --git a/iot-common/iot-rabbit/pom.xml b/iot-common/iot-rabbit/pom.xml new file mode 100644 index 0000000..b2b3978 --- /dev/null +++ b/iot-common/iot-rabbit/pom.xml @@ -0,0 +1,67 @@ + + + 4.0.0 + + com.qiuguo.iot + iot-common + 0.0.1-SNAPSHOT + + + iot-rabbit + iot-rabbit + mq类 + + + 1.8 + + + + org.springframework.boot + spring-boot-starter-amqp + + + org.springframework.boot + spring-boot-starter-webflux + + + + org.springframework.boot + spring-boot-starter-test + test + + + io.projectreactor + reactor-test + test + + + org.springframework.amqp + spring-rabbit-test + test + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring.boot.maven.plugin.version} + + + true + + + + + repackage + + + + + + + + diff --git a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RabbitMqConfig.java b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RabbitMqConfig.java new file mode 100644 index 0000000..4088043 --- /dev/null +++ b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RabbitMqConfig.java @@ -0,0 +1,22 @@ +package com.qiuguo.iot.rabbit.config; + +import org.springframework.amqp.core.ExchangeBuilder; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitMqConfig { + @Bean + Exchange fanoutExchange(){ + return ExchangeBuilder.fanoutExchange("fanoutExchange").durable(true).build(); + } + + + @Bean + public RabbitAdmin rabbitAdmin(CachingConnectionFactory connectionFactory){ + return new RabbitAdmin(connectionFactory); + } +} diff --git a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MessageListenerContainerFactory.java b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MessageListenerContainerFactory.java new file mode 100644 index 0000000..472c48d --- /dev/null +++ b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MessageListenerContainerFactory.java @@ -0,0 +1,54 @@ +package com.qiuguo.iot.rabbit.service; + +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +public class MessageListenerContainerFactory { + @Resource + private CachingConnectionFactory connectionFactory; + @Resource + private RabbitAdmin rabbitAdmin; + @Resource + private Exchange fanoutExchange; + + public SimpleMessageListenerContainer create(String queueName) { + Queue queue = QueueBuilder.nonDurable(queueName).maxLength(10000).autoDelete().exclusive().build(); + rabbitAdmin.declareQueue(queue); + rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with("").noargs()); + + SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); + // 在容器中放入刚创建好的队列 + simpleMessageListenerContainer.setQueueNames(queue.getName()); + simpleMessageListenerContainer.setConnectionFactory(connectionFactory); + /* + //设置当前的消费者数量 + simpleMessageListenerContainer.setConcurrentConsumers(1); + simpleMessageListenerContainer.setMaxConcurrentConsumers(1); + //设置消息是否重回队列 + simpleMessageListenerContainer.setDefaultRequeueRejected(false); + //设置自动确认消息simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); + //设置暴露监听器通道 + simpleMessageListenerContainer.setExposeListenerChannel(true); + */ + + return simpleMessageListenerContainer; + } + public SimpleMessageListenerContainer create(){ + Queue queue = QueueBuilder.nonDurable().maxLength(10000).autoDelete().exclusive().build(); + rabbitAdmin.declareQueue(queue); + rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with("").noargs()); + + SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); + // 在容器中放入刚创建好的队列 + simpleMessageListenerContainer.setQueueNames(queue.getName()); + simpleMessageListenerContainer.setConnectionFactory(connectionFactory); + + return simpleMessageListenerContainer; + } +} diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java new file mode 100644 index 0000000..83de18f --- /dev/null +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java @@ -0,0 +1,51 @@ +package com.qiuguo.iot.user.api.controller.mq; + +import com.qiuguo.iot.rabbit.service.MessageListenerContainerFactory; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; + +import javax.annotation.Resource; + +/** + * @author simon + * @date 2023/9/25 + * @description + **/ +@RestController +@Slf4j +@RequestMapping("/mq") +public class MqController { + @Resource + private MessageListenerContainerFactory containerFactory; + + @RequestMapping(value = "/test") + @ResponseBody + public Flux test() { + //用自己写的工厂创建一个监听容器 + SimpleMessageListenerContainer container = containerFactory.create("iot.yunxi.queue"); + + return Flux.create(sink->{ + //容器中设置监听器用于接收到消息后使用sink发送给客户端 + container.setupMessageListener((ChannelAwareMessageListener)(Message message, Channel channel)->{ + if (sink.isCancelled()) { + container.stop(); + return; + } + String msg = new String(message.getBody()); + sink.next(msg); + }); + + //启动容器和停止容器 + sink.onRequest(r -> container.start()); + sink.onDispose(container::stop); + }); + + } +}