This commit is contained in:
simon 2023-09-26 13:23:08 +08:00
parent d68d9e76c5
commit 979f85f055
4 changed files with 20 additions and 8 deletions

View File

@ -50,6 +50,10 @@
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>

View File

@ -1,4 +1,4 @@
package com.qiuguo.iot.third.service;
package com.qiuguo.iot.data.service.mq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -2,15 +2,15 @@ package com.qiuguo.iot.user.api.controller.mq;
import com.qiuguo.iot.data.constants.YunxiRabbitConst;
import com.qiuguo.iot.data.entity.user.UserHomeEntity;
import com.qiuguo.iot.third.service.MqService;
import com.qiuguo.iot.data.service.mq.MqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
/**
* @author simon
* @date 2023/9/25
@ -20,16 +20,23 @@ import reactor.core.publisher.Mono;
@Slf4j
@RequestMapping("/mq")
public class MqController {
@Autowired
@Resource
private MqService mqService;
@GetMapping("/sendMsg")
public Mono<Boolean> sendMsg() {
public Mono sendMsg() {
UserHomeEntity userHomeEntity = new UserHomeEntity();
userHomeEntity.setUserId(1L);
userHomeEntity.setHomeName("小米的家");
userHomeEntity.setUserId(System.currentTimeMillis());
return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, userHomeEntity);
Mono<Boolean> booleanMono = mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT, YunxiRabbitConst.ROUTE_KEY_YUNXI, userHomeEntity);
return booleanMono.flatMap(res -> {
if (!res) {
return Mono.error(new RuntimeException("消息发送失败"));
} else {
return Mono.just(true);
}
});
}
}

View File

@ -22,8 +22,9 @@ public class YunxiListener {
public void processYunxiQueue(Channel channel, Message message) throws IOException {
String messageContent = new String(message.getBody(), "UTF-8");
System.out.println("YunxiListener msg " + messageContent);
//TODO 消费者处理程序
//手动消息确认 配置需开启 acknowledge-mode: manual
//处理完毕 手动消息确认 配置需开启 acknowledge-mode: manual
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}