This commit is contained in:
simon 2023-09-26 11:51:41 +08:00
parent c3105b125f
commit d68d9e76c5
16 changed files with 197 additions and 266 deletions

View File

@ -1,4 +1,4 @@
package com.qiuguo.iot.rabbit.constants;
package com.qiuguo.iot.data.constants;
/**
* @author simon

View File

@ -1,58 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.qiuguo.iot</groupId>
<artifactId>iot-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>iot-rabbit</artifactId>
<name>iot-rabbit</name>
<description>mq类</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.maven.plugin.version}</version>
<configuration>
<!--跳过对项目中main方法的查找-->
<skip>true</skip>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,29 +0,0 @@
package com.qiuguo.iot.rabbit.config;
import com.qiuguo.iot.rabbit.constants.YunxiRabbitConst;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RunXiRabbitConfig {
/**
* 交换机
*
* @return
*/
@Bean
public Exchange RunxiEventExchange() {
return new TopicExchange(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT,
true,
false,
null);
}
}

View File

@ -1,33 +0,0 @@
package com.qiuguo.iot.rabbit.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
/**
* @author simon
* @date 2023/9/25
* @description
**/
@Service
public class MqSendMsgService {
@Resource
private RabbitTemplate rabbitTemplate;
public Mono<Boolean> sendRabbitMsg(String exchange, String routingKey, Object message) {
return Mono.create(sink -> {
rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
if (ack) {
// 消息发送成功
sink.success(Boolean.TRUE);
} else {
// 消息发送失败
sink.error(new RuntimeException("Message send failed: " + cause));
}
});
rabbitTemplate.convertAndSend(exchange, routingKey, message);
});
}
}

View File

@ -20,15 +20,7 @@
<artifactId>tuya-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>com.qiuguo.iot</groupId>
<artifactId>iot-base</artifactId>
@ -42,10 +34,31 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,67 @@
package com.qiuguo.iot.third.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author simon
* @date 2023/9/26
* @description
**/
@Service
public class MqService {
private final RabbitTemplate rabbitTemplate;
private final AtomicBoolean confirmationResult = new AtomicBoolean(false);
@Autowired
public MqService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
// 设置 ConfirmCallback
rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
if (ack) {
// 消息发送成功
confirmationResult.set(true);
System.out.println("消息发送成功");
} else {
// 消息发送失败
confirmationResult.set(false);
System.out.println("消息发送失败");
}
});
}
public Mono<Boolean> sendMessageWithConfirmation(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return Mono.defer(() -> {
boolean result = confirmationResult.get();
return Mono.just(result);
});
}
public Mono<Object> sendMessage(String exchange, String routingKey, Object message) {
// 只设置一次确认回调
rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
if (ack) {
// 消息发送成功
System.out.println("Message sent successfully.");
} else {
// 消息发送失败
System.err.println("Message send failed: " + cause);
}
});
return Mono.defer(() -> {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
// 返回一个表示异步操作完成的 Mono
return Mono.empty();
});
}
}

View File

@ -1,16 +0,0 @@
package com.qiuguo.iot.third;
import com.qiuguo.iot.data.resp.third.ThirdIpInfoResp;
import com.qiuguo.iot.data.resp.third.ThirdRpcResp;
import com.qiuguo.iot.third.service.IpService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicReference;
@SpringBootTest()
class IotThirdApplicationTests {
}

View File

@ -6,7 +6,6 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.BeanUtils;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import javax.annotation.Resource;

View File

@ -15,7 +15,6 @@
<module>iot-base</module>
<module>iot-data</module>
<module>iot-third</module>
<module>iot-rabbit</module>
</modules>
<dependencies>

View File

@ -1,114 +0,0 @@
package com.qiuguo.iot.admin.http.api.controller;
import com.qiuguo.iot.admin.http.api.req.DemoReq;
import com.qiuguo.iot.base.nlp.lac.LacNlp;
import com.qiuguo.iot.base.nlp.Nlp;
import com.qiuguo.iot.data.entity.device.DeviceInfoEntity;
import com.qiuguo.iot.data.service.device.DeviceInfoService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
@RestController
@RequestMapping("/demo")
@Slf4j
@AllArgsConstructor
public class DemoController {
private final Logger logger = LoggerFactory.getLogger(getClass());
//@Autowired
private final DeviceInfoService deviceInfoService;
@GetMapping("/test")
public Mono<String> test(){
return Mono.just("测试用例");
}
@GetMapping("/delay")
public Mono<String> delay(){
Mono<String> from = Mono.fromSupplier(()->{
try{
Thread.sleep(5000);
}catch (Exception e){
log.info("休眠错误");
}
return "休眠五秒后的时间:" + LocalDateTime.now();
});
log.info("现在时间:" + LocalDateTime.now());
return from;
}
@GetMapping("/flux")
public Flux<String> testFlux(){
return Flux.just("flux测试");
}
@GetMapping("/flux/delay")
public Flux<String> testFluxDelay(){
Flux<String> flux = Flux.fromStream(IntStream.range(1, 6).mapToObj(i -> {
try{
Thread.sleep(1000);
}catch (Exception e){
log.info("休眠错误");
}
return "休眠后的时间:" + LocalDateTime.now();
}));
logger.info("现在时间:" + LocalDateTime.now());
return flux;
}
@PostMapping("/post")//请求数据示例{"aaa":"bbb"}
public Mono<String> postDemo(@RequestBody Mono<Map<String, Object>> contextMono){
Mono<String> mono =contextMono.map(item->{
for (Object o:item.values()
) {
log.info("接受到的信息{}", o);
}
return "1234";//返回结果
});
return mono;
}
@PostMapping("/post/demo")//请求数据示例{"name":"bbb","id":123}
public Mono<DemoReq> postTest(@RequestBody Mono<DemoReq> contextMono){
Mono<DemoReq> mono =contextMono.map(item->{
log.info("接收到的对象:{}", item);
item.setId(456l);
item.setName("gaibian");
return item;//返回结果
});
return mono;
}
@GetMapping("/device/{id}")
public Mono<DeviceInfoEntity> getDeviceById(@PathVariable Long id){
return deviceInfoService.selectDeviceInfoById(id);
}
@GetMapping("/nlp")
public Mono<Nlp> getNlp(@RequestParam String value){
LacNlp lacNlp = new LacNlp();
return lacNlp.geSingletNlp(value);
}
@GetMapping("/nlps")
public Mono<List<Nlp>> getNlp(){
List<String> values = new ArrayList<>();// {"", ""};
values.add("我的未来不是梦");
values.add("AI是大势所趋");
LacNlp lacNlp = new LacNlp();
return lacNlp.getNlp(values);
}
}

View File

@ -28,6 +28,11 @@
<artifactId>iot-third</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qiuguo.iot</groupId>
<artifactId>iot-rabbit</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -7,9 +7,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
@SpringBootApplication(scanBasePackages = {"com.qiuguo.iot.user.api", "com.qiuguo.iot.data.service","com.qiuguo.iot.third.service"})
@SpringBootApplication(scanBasePackages = {"com.qiuguo.iot.user.api", "com.qiuguo.iot.data.service","com.qiuguo.iot.third.service","com.qiuguo.iot.rabbit.service"})
@EnableEasyormRepository(value = "com.qiuguo.iot.data.entity.*")
@ConnectorScan(basePackages = "com.qiuguo.iot.third.service")
@ConnectorScan(basePackages = {"com.qiuguo.iot.third.service"})
@EnableAspectJAutoProxy
@EnableDiscoveryClient
public class IotBoxUserApiApplication {

View File

@ -0,0 +1,15 @@
package com.qiuguo.iot.user.api.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyMsgConverter {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,35 @@
package com.qiuguo.iot.user.api.controller.mq;
import com.qiuguo.iot.data.constants.YunxiRabbitConst;
import com.qiuguo.iot.data.entity.user.UserHomeEntity;
import com.qiuguo.iot.third.service.MqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
* @author simon
* @date 2023/9/25
* @description
**/
@RestController
@Slf4j
@RequestMapping("/mq")
public class MqController {
@Autowired
private MqService mqService;
@GetMapping("/sendMsg")
public Mono<Boolean> sendMsg() {
UserHomeEntity userHomeEntity = new UserHomeEntity();
userHomeEntity.setUserId(1L);
userHomeEntity.setHomeName("小米的家");
userHomeEntity.setUserId(System.currentTimeMillis());
return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, userHomeEntity);
}
}

View File

@ -0,0 +1,29 @@
package com.qiuguo.iot.user.api.listener;
import com.qiuguo.iot.data.constants.YunxiRabbitConst;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author simon
* @date 2023/9/26
* @description
**/
@Component
@Slf4j
public class YunxiListener {
@RabbitListener(queues = YunxiRabbitConst.QUEUE_YUNXI)
public void processYunxiQueue(Channel channel, Message message) throws IOException {
String messageContent = new String(message.getBody(), "UTF-8");
System.out.println("YunxiListener msg " + messageContent);
//手动消息确认 配置需开启 acknowledge-mode: manual
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

View File

@ -4,6 +4,25 @@ spring:
port: 31043
username: admin
password: 123456
listener:
simple:
# retry:
# # 开启重试机制
# enabled: true
# # 重试次数
# max-attempts: 5
# # 重试最大间隔时间
# max-interval: 100000
# # 重试初始间隔时间
# initial-interval: 100
# # 间隔时间因子
# multiplier: 20
# 设置手动ack回复
acknowledge-mode: manual
#设置消息发送回调
publisher-returns: true
publisher-confirm-type: simple
virtual-host: /iot
cloud:
# config: