Compare commits
3 Commits
master
...
feature-sh
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b09d479140 | ||
|
|
5366d5b41e | ||
|
|
9eeeeea273 |
67
iot-common/iot-rabbit/pom.xml
Normal file
67
iot-common/iot-rabbit/pom.xml
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>com.qiuguo.iot</groupId>
|
||||||
|
<artifactId>iot-common</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<artifactId>iot-rabbit</artifactId>
|
||||||
|
<name>iot-rabbit</name>
|
||||||
|
<description>mq类</description>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<java.version>1.8</java.version>
|
||||||
|
</properties>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.projectreactor</groupId>
|
||||||
|
<artifactId>reactor-test</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.amqp</groupId>
|
||||||
|
<artifactId>spring-rabbit-test</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<finalName>${project.artifactId}</finalName>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
|
<version>${spring.boot.maven.plugin.version}</version>
|
||||||
|
<configuration>
|
||||||
|
<!--跳过对项目中main方法的查找-->
|
||||||
|
<skip>true</skip>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<goals>
|
||||||
|
<goal>repackage</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
</project>
|
||||||
@ -0,0 +1,22 @@
|
|||||||
|
package com.qiuguo.iot.rabbit.config;
|
||||||
|
|
||||||
|
import org.springframework.amqp.core.ExchangeBuilder;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||||
|
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class RabbitMqConfig {
|
||||||
|
@Bean
|
||||||
|
Exchange fanoutExchange(){
|
||||||
|
return ExchangeBuilder.fanoutExchange("fanoutExchange").durable(true).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RabbitAdmin rabbitAdmin(CachingConnectionFactory connectionFactory){
|
||||||
|
return new RabbitAdmin(connectionFactory);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,54 @@
|
|||||||
|
package com.qiuguo.iot.rabbit.service;
|
||||||
|
|
||||||
|
import org.springframework.amqp.core.*;
|
||||||
|
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
|
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class MessageListenerContainerFactory {
|
||||||
|
@Resource
|
||||||
|
private CachingConnectionFactory connectionFactory;
|
||||||
|
@Resource
|
||||||
|
private RabbitAdmin rabbitAdmin;
|
||||||
|
@Resource
|
||||||
|
private Exchange fanoutExchange;
|
||||||
|
|
||||||
|
public SimpleMessageListenerContainer create(String queueName) {
|
||||||
|
Queue queue = QueueBuilder.nonDurable(queueName).maxLength(10000).autoDelete().exclusive().build();
|
||||||
|
rabbitAdmin.declareQueue(queue);
|
||||||
|
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with("").noargs());
|
||||||
|
|
||||||
|
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
|
||||||
|
// 在容器中放入刚创建好的队列
|
||||||
|
simpleMessageListenerContainer.setQueueNames(queue.getName());
|
||||||
|
simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
|
||||||
|
/*
|
||||||
|
//设置当前的消费者数量
|
||||||
|
simpleMessageListenerContainer.setConcurrentConsumers(1);
|
||||||
|
simpleMessageListenerContainer.setMaxConcurrentConsumers(1);
|
||||||
|
//设置消息是否重回队列
|
||||||
|
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
|
||||||
|
//设置自动确认消息simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
|
||||||
|
//设置暴露监听器通道
|
||||||
|
simpleMessageListenerContainer.setExposeListenerChannel(true);
|
||||||
|
*/
|
||||||
|
|
||||||
|
return simpleMessageListenerContainer;
|
||||||
|
}
|
||||||
|
public SimpleMessageListenerContainer create(){
|
||||||
|
Queue queue = QueueBuilder.nonDurable().maxLength(10000).autoDelete().exclusive().build();
|
||||||
|
rabbitAdmin.declareQueue(queue);
|
||||||
|
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with("").noargs());
|
||||||
|
|
||||||
|
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
|
||||||
|
// 在容器中放入刚创建好的队列
|
||||||
|
simpleMessageListenerContainer.setQueueNames(queue.getName());
|
||||||
|
simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
|
||||||
|
|
||||||
|
return simpleMessageListenerContainer;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,51 @@
|
|||||||
|
package com.qiuguo.iot.user.api.controller.mq;
|
||||||
|
|
||||||
|
import com.qiuguo.iot.rabbit.service.MessageListenerContainerFactory;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||||
|
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.ResponseBody;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author simon
|
||||||
|
* @date 2023/9/25
|
||||||
|
* @description
|
||||||
|
**/
|
||||||
|
@RestController
|
||||||
|
@Slf4j
|
||||||
|
@RequestMapping("/mq")
|
||||||
|
public class MqController {
|
||||||
|
@Resource
|
||||||
|
private MessageListenerContainerFactory containerFactory;
|
||||||
|
|
||||||
|
@RequestMapping(value = "/test")
|
||||||
|
@ResponseBody
|
||||||
|
public Flux<String> test() {
|
||||||
|
//用自己写的工厂创建一个监听容器
|
||||||
|
SimpleMessageListenerContainer container = containerFactory.create("iot.yunxi.queue");
|
||||||
|
|
||||||
|
return Flux.create(sink->{
|
||||||
|
//容器中设置监听器用于接收到消息后使用sink发送给客户端
|
||||||
|
container.setupMessageListener((ChannelAwareMessageListener)(Message message, Channel channel)->{
|
||||||
|
if (sink.isCancelled()) {
|
||||||
|
container.stop();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String msg = new String(message.getBody());
|
||||||
|
sink.next(msg);
|
||||||
|
});
|
||||||
|
|
||||||
|
//启动容器和停止容器
|
||||||
|
sink.onRequest(r -> container.start());
|
||||||
|
sink.onDispose(container::stop);
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -218,7 +218,7 @@ public class UserController {
|
|||||||
*/
|
*/
|
||||||
@PostMapping("/login/pwd")
|
@PostMapping("/login/pwd")
|
||||||
public Mono<JSONObject> loginByPwd(@RequestBody JSONObject jsonObject) {
|
public Mono<JSONObject> loginByPwd(@RequestBody JSONObject jsonObject) {
|
||||||
log.info("UserController[]loginByPwd[]jsonObject:{}", jsonObject);
|
//log.info("UserController[]loginByPwd[]jsonObject:{}", jsonObject);
|
||||||
return webClient.post().uri(baseUrl + pwdUrl).bodyValue(getMultiValueMap(jsonObject)).retrieve()
|
return webClient.post().uri(baseUrl + pwdUrl).bodyValue(getMultiValueMap(jsonObject)).retrieve()
|
||||||
.bodyToMono(JSONObject.class)
|
.bodyToMono(JSONObject.class)
|
||||||
.flatMap(res -> {
|
.flatMap(res -> {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user