diff --git a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/constants/YunxiRabbitConst.java b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/constants/YunxiRabbitConst.java similarity index 91% rename from iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/constants/YunxiRabbitConst.java rename to iot-common/iot-data/src/main/java/com/qiuguo/iot/data/constants/YunxiRabbitConst.java index c53e4a5..d561e74 100644 --- a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/constants/YunxiRabbitConst.java +++ b/iot-common/iot-data/src/main/java/com/qiuguo/iot/data/constants/YunxiRabbitConst.java @@ -1,4 +1,4 @@ -package com.qiuguo.iot.rabbit.constants; +package com.qiuguo.iot.data.constants; /** * @author simon diff --git a/iot-common/iot-rabbit/pom.xml b/iot-common/iot-rabbit/pom.xml deleted file mode 100644 index 4f63e3c..0000000 --- a/iot-common/iot-rabbit/pom.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - 4.0.0 - - com.qiuguo.iot - iot-common - 0.0.1-SNAPSHOT - - - iot-rabbit - iot-rabbit - mq类 - - - 1.8 - - - - org.springframework.boot - spring-boot-starter-amqp - - - org.springframework.boot - spring-boot-starter-webflux - - - - - org.springframework.amqp - spring-rabbit-test - test - - - - - ${project.artifactId} - - - org.springframework.boot - spring-boot-maven-plugin - ${spring.boot.maven.plugin.version} - - - true - - - - - repackage - - - - - - - - diff --git a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RunXiRabbitConfig.java b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RunXiRabbitConfig.java deleted file mode 100644 index 11796f7..0000000 --- a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/config/RunXiRabbitConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.qiuguo.iot.rabbit.config; - -import com.qiuguo.iot.rabbit.constants.YunxiRabbitConst; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.Exchange; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.core.TopicExchange; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.HashMap; -import java.util.Map; - -@Configuration -public class RunXiRabbitConfig { - - /** - * 交换机 - * - * @return - */ - @Bean - public Exchange RunxiEventExchange() { - return new TopicExchange(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, - true, - false, - null); - } -} diff --git a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MqSendMsgService.java b/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MqSendMsgService.java deleted file mode 100644 index 9370d11..0000000 --- a/iot-common/iot-rabbit/src/main/java/com/qiuguo/iot/rabbit/service/MqSendMsgService.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.qiuguo.iot.rabbit.service; - -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.stereotype.Service; -import reactor.core.publisher.Mono; - -import javax.annotation.Resource; - -/** - * @author simon - * @date 2023/9/25 - * @description - **/ -@Service -public class MqSendMsgService { - @Resource - private RabbitTemplate rabbitTemplate; - public Mono sendRabbitMsg(String exchange, String routingKey, Object message) { - return Mono.create(sink -> { - rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> { - if (ack) { - // 消息发送成功 - sink.success(Boolean.TRUE); - } else { - // 消息发送失败 - sink.error(new RuntimeException("Message send failed: " + cause)); - } - }); - - rabbitTemplate.convertAndSend(exchange, routingKey, message); - }); - } -} diff --git a/iot-common/iot-third/pom.xml b/iot-common/iot-third/pom.xml index 3ecd44b..44e7320 100644 --- a/iot-common/iot-third/pom.xml +++ b/iot-common/iot-third/pom.xml @@ -20,15 +20,7 @@ tuya-spring-boot-starter 1.3.2 - - org.springframework.boot - spring-boot-starter-test - test - - - org.springframework.boot - spring-boot-starter-webflux - + com.qiuguo.iot iot-base @@ -42,10 +34,31 @@ compile - io.projectreactor - reactor-test + org.testng + testng + RELEASE test + + org.springframework.boot + spring-boot-test + test + + + junit + junit + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.springframework.amqp + spring-rabbit + + diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/MqService.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/MqService.java new file mode 100644 index 0000000..beb294e --- /dev/null +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/MqService.java @@ -0,0 +1,67 @@ +package com.qiuguo.iot.third.service; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author simon + * @date 2023/9/26 + * @description + **/ + +@Service +public class MqService { + private final RabbitTemplate rabbitTemplate; + private final AtomicBoolean confirmationResult = new AtomicBoolean(false); + + + @Autowired + public MqService(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + + // 设置 ConfirmCallback + rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> { + if (ack) { + // 消息发送成功 + confirmationResult.set(true); + System.out.println("消息发送成功"); + } else { + // 消息发送失败 + confirmationResult.set(false); + System.out.println("消息发送失败"); + } + }); + } + + public Mono sendMessageWithConfirmation(String exchange, String routingKey, Object message) { + rabbitTemplate.convertAndSend(exchange, routingKey, message); + + return Mono.defer(() -> { + boolean result = confirmationResult.get(); + return Mono.just(result); + }); + } + + public Mono sendMessage(String exchange, String routingKey, Object message) { + // 只设置一次确认回调 + rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> { + if (ack) { + // 消息发送成功 + System.out.println("Message sent successfully."); + } else { + // 消息发送失败 + System.err.println("Message send failed: " + cause); + } + }); + + return Mono.defer(() -> { + rabbitTemplate.convertAndSend(exchange, routingKey, message); + // 返回一个表示异步操作完成的 Mono + return Mono.empty(); + }); + } +} diff --git a/iot-common/iot-third/src/test/java/com/qiuguo/iot/third/IotThirdApplicationTests.java b/iot-common/iot-third/src/test/java/com/qiuguo/iot/third/IotThirdApplicationTests.java deleted file mode 100644 index 93a6c67..0000000 --- a/iot-common/iot-third/src/test/java/com/qiuguo/iot/third/IotThirdApplicationTests.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.qiuguo.iot.third; - -import com.qiuguo.iot.data.resp.third.ThirdIpInfoResp; -import com.qiuguo.iot.data.resp.third.ThirdRpcResp; -import com.qiuguo.iot.third.service.IpService; -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; -import reactor.core.publisher.Mono; - -import javax.annotation.Resource; -import java.util.concurrent.atomic.AtomicReference; - -@SpringBootTest() -class IotThirdApplicationTests { - -} diff --git a/iot-common/iot-third/src/test/java/com/qiuguo/iot/third/service/IpServiceTest.java b/iot-common/iot-third/src/test/java/com/qiuguo/iot/third/service/IpServiceTest.java index ee3284f..c522f8f 100644 --- a/iot-common/iot-third/src/test/java/com/qiuguo/iot/third/service/IpServiceTest.java +++ b/iot-common/iot-third/src/test/java/com/qiuguo/iot/third/service/IpServiceTest.java @@ -6,7 +6,6 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.BeanUtils; import org.springframework.boot.test.context.SpringBootTest; import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; import javax.annotation.Resource; diff --git a/iot-common/pom.xml b/iot-common/pom.xml index 3b161a0..1cc5a63 100644 --- a/iot-common/pom.xml +++ b/iot-common/pom.xml @@ -15,7 +15,6 @@ iot-base iot-data iot-third - iot-rabbit diff --git a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/controller/DemoController.java b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/controller/DemoController.java deleted file mode 100644 index 1cdef7b..0000000 --- a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/controller/DemoController.java +++ /dev/null @@ -1,114 +0,0 @@ -package com.qiuguo.iot.admin.http.api.controller; - -import com.qiuguo.iot.admin.http.api.req.DemoReq; -import com.qiuguo.iot.base.nlp.lac.LacNlp; -import com.qiuguo.iot.base.nlp.Nlp; -import com.qiuguo.iot.data.entity.device.DeviceInfoEntity; -import com.qiuguo.iot.data.service.device.DeviceInfoService; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.web.bind.annotation.*; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.IntStream; - -@RestController -@RequestMapping("/demo") -@Slf4j -@AllArgsConstructor - -public class DemoController { - private final Logger logger = LoggerFactory.getLogger(getClass()); - //@Autowired - private final DeviceInfoService deviceInfoService; - @GetMapping("/test") - public Mono test(){ - return Mono.just("测试用例"); - } - - @GetMapping("/delay") - public Mono delay(){ - Mono from = Mono.fromSupplier(()->{ - try{ - Thread.sleep(5000); - }catch (Exception e){ - log.info("休眠错误"); - } - return "休眠五秒后的时间:" + LocalDateTime.now(); - }); - log.info("现在时间:" + LocalDateTime.now()); - return from; - } - - @GetMapping("/flux") - public Flux testFlux(){ - return Flux.just("flux测试"); - } - - @GetMapping("/flux/delay") - public Flux testFluxDelay(){ - Flux flux = Flux.fromStream(IntStream.range(1, 6).mapToObj(i -> { - try{ - Thread.sleep(1000); - }catch (Exception e){ - log.info("休眠错误"); - } - return "休眠后的时间:" + LocalDateTime.now(); - })); - logger.info("现在时间:" + LocalDateTime.now()); - return flux; - } - - @PostMapping("/post")//请求数据示例:{"aaa":"bbb"} - public Mono postDemo(@RequestBody Mono> contextMono){ - Mono mono =contextMono.map(item->{ - for (Object o:item.values() - ) { - log.info("接受到的信息{}", o); - } - return "1234";//返回结果 - }); - - return mono; - } - @PostMapping("/post/demo")//请求数据示例:{"name":"bbb","id":123} - - public Mono postTest(@RequestBody Mono contextMono){ - Mono mono =contextMono.map(item->{ - log.info("接收到的对象:{}", item); - item.setId(456l); - item.setName("gaibian"); - return item;//返回结果 - }); - - return mono; - } - - @GetMapping("/device/{id}") - public Mono getDeviceById(@PathVariable Long id){ - return deviceInfoService.selectDeviceInfoById(id); - } - - @GetMapping("/nlp") - public Mono getNlp(@RequestParam String value){ - LacNlp lacNlp = new LacNlp(); - return lacNlp.geSingletNlp(value); - } - - @GetMapping("/nlps") - public Mono> getNlp(){ - List values = new ArrayList<>();// {"", ""}; - values.add("我的未来不是梦"); - values.add("AI是大势所趋"); - LacNlp lacNlp = new LacNlp(); - return lacNlp.getNlp(values); - } - -} diff --git a/iot-modules/iot-box-user-api/pom.xml b/iot-modules/iot-box-user-api/pom.xml index 40c6cf7..83754aa 100644 --- a/iot-modules/iot-box-user-api/pom.xml +++ b/iot-modules/iot-box-user-api/pom.xml @@ -28,6 +28,11 @@ iot-third 0.0.1-SNAPSHOT + + com.qiuguo.iot + iot-rabbit + 0.0.1-SNAPSHOT + org.springframework.boot diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/IotBoxUserApiApplication.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/IotBoxUserApiApplication.java index 5d1be16..23b8c98 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/IotBoxUserApiApplication.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/IotBoxUserApiApplication.java @@ -7,9 +7,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.context.annotation.EnableAspectJAutoProxy; -@SpringBootApplication(scanBasePackages = {"com.qiuguo.iot.user.api", "com.qiuguo.iot.data.service","com.qiuguo.iot.third.service"}) +@SpringBootApplication(scanBasePackages = {"com.qiuguo.iot.user.api", "com.qiuguo.iot.data.service","com.qiuguo.iot.third.service","com.qiuguo.iot.rabbit.service"}) @EnableEasyormRepository(value = "com.qiuguo.iot.data.entity.*") -@ConnectorScan(basePackages = "com.qiuguo.iot.third.service") +@ConnectorScan(basePackages = {"com.qiuguo.iot.third.service"}) @EnableAspectJAutoProxy @EnableDiscoveryClient public class IotBoxUserApiApplication { diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/config/MyMsgConverter.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/config/MyMsgConverter.java new file mode 100644 index 0000000..ec2ea13 --- /dev/null +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/config/MyMsgConverter.java @@ -0,0 +1,15 @@ +package com.qiuguo.iot.user.api.config; + +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MyMsgConverter { + + @Bean + public MessageConverter messageConverter(){ + return new Jackson2JsonMessageConverter(); + } +} diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java new file mode 100644 index 0000000..abf0c04 --- /dev/null +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/controller/mq/MqController.java @@ -0,0 +1,35 @@ +package com.qiuguo.iot.user.api.controller.mq; + +import com.qiuguo.iot.data.constants.YunxiRabbitConst; +import com.qiuguo.iot.data.entity.user.UserHomeEntity; +import com.qiuguo.iot.third.service.MqService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +/** + * @author simon + * @date 2023/9/25 + * @description + **/ +@RestController +@Slf4j +@RequestMapping("/mq") +public class MqController { + @Autowired + private MqService mqService; + + @GetMapping("/sendMsg") + public Mono sendMsg() { + UserHomeEntity userHomeEntity = new UserHomeEntity(); + userHomeEntity.setUserId(1L); + userHomeEntity.setHomeName("小米的家"); + userHomeEntity.setUserId(System.currentTimeMillis()); + + return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, userHomeEntity); + } +} diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/listener/YunxiListener.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/listener/YunxiListener.java new file mode 100644 index 0000000..79e34ab --- /dev/null +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/listener/YunxiListener.java @@ -0,0 +1,29 @@ +package com.qiuguo.iot.user.api.listener; + +import com.qiuguo.iot.data.constants.YunxiRabbitConst; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author simon + * @date 2023/9/26 + * @description + **/ +@Component +@Slf4j +public class YunxiListener { + @RabbitListener(queues = YunxiRabbitConst.QUEUE_YUNXI) + public void processYunxiQueue(Channel channel, Message message) throws IOException { + String messageContent = new String(message.getBody(), "UTF-8"); + System.out.println("YunxiListener msg " + messageContent); + + //手动消息确认 配置需开启 acknowledge-mode: manual + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } +} diff --git a/iot-modules/iot-box-user-api/src/main/resources/bootstrap-dev.yml b/iot-modules/iot-box-user-api/src/main/resources/bootstrap-dev.yml index 69b834f..cfa25f5 100644 --- a/iot-modules/iot-box-user-api/src/main/resources/bootstrap-dev.yml +++ b/iot-modules/iot-box-user-api/src/main/resources/bootstrap-dev.yml @@ -4,6 +4,25 @@ spring: port: 31043 username: admin password: 123456 + listener: + simple: + # retry: + # # 开启重试机制 + # enabled: true + # # 重试次数 + # max-attempts: 5 + # # 重试最大间隔时间 + # max-interval: 100000 + # # 重试初始间隔时间 + # initial-interval: 100 + # # 间隔时间因子 + # multiplier: 20 + # 设置手动ack回复 + acknowledge-mode: manual + #设置消息发送回调 + publisher-returns: true + publisher-confirm-type: simple + virtual-host: /iot cloud: # config: