Compare commits
3 Commits
master
...
feature-sh
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b09d479140 | ||
|
|
5366d5b41e | ||
|
|
9eeeeea273 |
67
iot-common/iot-rabbit/pom.xml
Normal file
67
iot-common/iot-rabbit/pom.xml
Normal file
@ -0,0 +1,67 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.qiuguo.iot</groupId>
|
||||
<artifactId>iot-common</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>iot-rabbit</artifactId>
|
||||
<name>iot-rabbit</name>
|
||||
<description>mq类</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
<artifactId>spring-rabbit-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<finalName>${project.artifactId}</finalName>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<version>${spring.boot.maven.plugin.version}</version>
|
||||
<configuration>
|
||||
<!--跳过对项目中main方法的查找-->
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>repackage</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@ -0,0 +1,22 @@
|
||||
package com.qiuguo.iot.rabbit.config;
|
||||
|
||||
import org.springframework.amqp.core.ExchangeBuilder;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class RabbitMqConfig {
|
||||
@Bean
|
||||
Exchange fanoutExchange(){
|
||||
return ExchangeBuilder.fanoutExchange("fanoutExchange").durable(true).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin(CachingConnectionFactory connectionFactory){
|
||||
return new RabbitAdmin(connectionFactory);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,54 @@
|
||||
package com.qiuguo.iot.rabbit.service;
|
||||
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Component
|
||||
public class MessageListenerContainerFactory {
|
||||
@Resource
|
||||
private CachingConnectionFactory connectionFactory;
|
||||
@Resource
|
||||
private RabbitAdmin rabbitAdmin;
|
||||
@Resource
|
||||
private Exchange fanoutExchange;
|
||||
|
||||
public SimpleMessageListenerContainer create(String queueName) {
|
||||
Queue queue = QueueBuilder.nonDurable(queueName).maxLength(10000).autoDelete().exclusive().build();
|
||||
rabbitAdmin.declareQueue(queue);
|
||||
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with("").noargs());
|
||||
|
||||
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
|
||||
// 在容器中放入刚创建好的队列
|
||||
simpleMessageListenerContainer.setQueueNames(queue.getName());
|
||||
simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
|
||||
/*
|
||||
//设置当前的消费者数量
|
||||
simpleMessageListenerContainer.setConcurrentConsumers(1);
|
||||
simpleMessageListenerContainer.setMaxConcurrentConsumers(1);
|
||||
//设置消息是否重回队列
|
||||
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
|
||||
//设置自动确认消息simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
|
||||
//设置暴露监听器通道
|
||||
simpleMessageListenerContainer.setExposeListenerChannel(true);
|
||||
*/
|
||||
|
||||
return simpleMessageListenerContainer;
|
||||
}
|
||||
public SimpleMessageListenerContainer create(){
|
||||
Queue queue = QueueBuilder.nonDurable().maxLength(10000).autoDelete().exclusive().build();
|
||||
rabbitAdmin.declareQueue(queue);
|
||||
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with("").noargs());
|
||||
|
||||
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
|
||||
// 在容器中放入刚创建好的队列
|
||||
simpleMessageListenerContainer.setQueueNames(queue.getName());
|
||||
simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
|
||||
|
||||
return simpleMessageListenerContainer;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,51 @@
|
||||
package com.qiuguo.iot.user.api.controller.mq;
|
||||
|
||||
import com.qiuguo.iot.rabbit.service.MessageListenerContainerFactory;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* @author simon
|
||||
* @date 2023/9/25
|
||||
* @description
|
||||
**/
|
||||
@RestController
|
||||
@Slf4j
|
||||
@RequestMapping("/mq")
|
||||
public class MqController {
|
||||
@Resource
|
||||
private MessageListenerContainerFactory containerFactory;
|
||||
|
||||
@RequestMapping(value = "/test")
|
||||
@ResponseBody
|
||||
public Flux<String> test() {
|
||||
//用自己写的工厂创建一个监听容器
|
||||
SimpleMessageListenerContainer container = containerFactory.create("iot.yunxi.queue");
|
||||
|
||||
return Flux.create(sink->{
|
||||
//容器中设置监听器用于接收到消息后使用sink发送给客户端
|
||||
container.setupMessageListener((ChannelAwareMessageListener)(Message message, Channel channel)->{
|
||||
if (sink.isCancelled()) {
|
||||
container.stop();
|
||||
return;
|
||||
}
|
||||
String msg = new String(message.getBody());
|
||||
sink.next(msg);
|
||||
});
|
||||
|
||||
//启动容器和停止容器
|
||||
sink.onRequest(r -> container.start());
|
||||
sink.onDispose(container::stop);
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
@ -218,7 +218,7 @@ public class UserController {
|
||||
*/
|
||||
@PostMapping("/login/pwd")
|
||||
public Mono<JSONObject> loginByPwd(@RequestBody JSONObject jsonObject) {
|
||||
log.info("UserController[]loginByPwd[]jsonObject:{}", jsonObject);
|
||||
//log.info("UserController[]loginByPwd[]jsonObject:{}", jsonObject);
|
||||
return webClient.post().uri(baseUrl + pwdUrl).bodyValue(getMultiValueMap(jsonObject)).retrieve()
|
||||
.bodyToMono(JSONObject.class)
|
||||
.flatMap(res -> {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user