联调完与U3D MQ通讯功能高,实现MQ控制物联网设备动作

This commit is contained in:
wulin 2023-10-12 20:55:42 +08:00
parent 083077b6da
commit 01094da6d3
4 changed files with 79 additions and 35 deletions

View File

@ -4,24 +4,13 @@ package com.qiuguo.iot.base.enums;
* 与U3D消息类型
* 作者吴林
* */
// 动作类型0文本播放 1音频播放 2 U3D动作 3物联网设备动作
public enum U3dMsgTypeEnum {
TTS(0, "文本播放"),
IOT(1, "IOT控制"),
ALARM_CLOCK(3, "闹钟"),
WEATHER(2, "天气"),
DANCE(0, "跳舞"),
CHANGE(1, "换装"),
MOUTH(3, "口型"),
IOT(100, "iot操作"),
U3D(4, "U3D动作"),
MUSIC(5, "音乐,声音"),
UPDATE(100, "固件升级"),
BOX_ON_LINE(101, "Box配网成功"),
BOX_OFF_LINE(102, "Box离线"),
DEVICE_UNBIND(103, "设备解绑"),
DEVICE_BIND(104, "设备绑定成功"),
COMMAND_N(200, "指令后必须跟的名称词"),
QIU_GUO(300, "秋果专有名词"),
IGNORE(400, "^^^"),//全部所有一切等忽略此
;
U3dMsgTypeEnum(Integer c, String n){
code = c;

View File

@ -19,7 +19,7 @@ public class YunxiRabbitConst {
/**
* 云栖活动发送的路由键
*/
public static final String ROUTE_KEY_YUNXI = "IOT";
public static final String ROUTE_KEY_YUNXI = "IOT.TO";
/**
* 订阅队列

View File

@ -0,0 +1,26 @@
package com.qiuguo.iot.box.websocket.api.config;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author simon
* @date 2023/9/26
* @description
**/
@Configuration
public class TopicRabbitListenerConfig {
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// 配置消息监听容器工厂
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3); // 并发消费者数量
factory.setMaxConcurrentConsumers(10); // 最大并发消费者数量
factory.setPrefetchCount(1); // 每个消费者的消息预取数量
return factory;
}
}

View File

@ -1,10 +1,7 @@
package com.qiuguo.iot.box.websocket.api.handler;
import com.alibaba.fastjson.JSONObject;
import com.qiuguo.iot.base.enums.AskTypeEnum;
import com.qiuguo.iot.base.enums.PlayEnum;
import com.qiuguo.iot.base.enums.RespCodeEnum;
import com.qiuguo.iot.base.enums.YesNo;
import com.qiuguo.iot.base.enums.*;
import com.qiuguo.iot.base.utils.StringUtils;
import com.qiuguo.iot.box.websocket.api.domain.BaseSession;
import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession;
@ -12,6 +9,7 @@ import com.qiuguo.iot.box.websocket.api.domain.box.resp.BoxMessageResp;
import com.qiuguo.iot.data.constants.YunxiRabbitConst;
import com.qiuguo.iot.data.entity.device.DeviceUserBindEntity;
import com.qiuguo.iot.data.entity.device.DeviceUserTalkRecordEntity;
import com.qiuguo.iot.data.entity.system.SystemTalkAnswerConfigEntity;
import com.qiuguo.iot.data.entity.system.SystemTalkBindDeviceEntity;
import com.qiuguo.iot.data.request.device.DeviceUserBindRequest;
import com.qiuguo.iot.data.request.qwen.TongYiCommunicationRest;
@ -90,7 +88,31 @@ public class BaseWebSocketProcess {
String messageContent = new String(message.getBody(), "UTF-8");
log.info("来自RabbitMQ的消息{}", messageContent);
U3dMsg u3dMsg = JSONObject.parseObject(messageContent, U3dMsg.class);
if(U3dMsgTypeEnum.IOT.getCode().equals(u3dMsg.getMsgType())){
//物联网操作
DeviceUserBindRequest deviceBindRequest = new DeviceUserBindRequest();
deviceBindRequest.setU3dId(u3dMsg.getMetaId());
deviceBindRequest.setScenceId(u3dMsg.getScenceId());
deviceUserBindService.selectDeviceUserBindByRequest(deviceBindRequest).map(db ->{
SystemTalkBindDeviceRequest systemTalkBindDeviceRequest = new SystemTalkBindDeviceRequest();
systemTalkBindDeviceRequest.setU3dStatusId(u3dMsg.getStatusId());
systemTalkBindDeviceRequest.setCategoryCode(db.getCategoryCode());
systemTalkBindDeviceService.selectSystemTalkBindDeviceByRequest(systemTalkBindDeviceRequest).map(sd -> {
TuyaQuery query = new TuyaQuery();
query.setDeviceId(db.getOtherDeviceId());
query.setValue(u3dMsg.getExtParam());
query.setUserHandlingDeviceId(sd.getUserHandlingId());
tuyaDeviceService.controlDevice(query).map(isOk ->{
log.info("U3D打开设备调用情况{}", isOk);
return isOk;
}).subscribe();
return sd;
}).subscribe();
return db;
}).subscribe();
}
//处理完毕 手动消息确认 配置需开启 acknowledge-mode: manual
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e){
@ -137,6 +159,23 @@ public class BaseWebSocketProcess {
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
}
private void toU3DMq(Action action, SystemTalkBindDeviceEntity systemTalkBindDeviceEntity){
log.info("通知U3DMQ");
try{
U3dMsg u3dMsg = new U3dMsg();
u3dMsg.setMsgType(U3dMsgTypeEnum.IOT.getCode());
u3dMsg.setMetaId(action.getDeviceUserBindEntity().getU3dId());
u3dMsg.setScenceId(action.getDeviceUserBindEntity().getScenceId());
u3dMsg.setStatusId(systemTalkBindDeviceEntity.getU3dStatusId());
//发送消息到MQ通知U3D
mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT,
YunxiRabbitConst.ROUTE_KEY_YUNXI,
JSONObject.toJSONString(u3dMsg)).subscribe();
}catch (Exception e){
log.info("通知U3D MQ异常{}", e);
}
}
protected void processAction(Actions actions, Long userId, BaseSession baseSession) {
if(actions.getActions() == null || actions.getActions().size() == 0){
//调用千问回答\
@ -325,6 +364,8 @@ public class BaseWebSocketProcess {
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.QIU_GUO.getCode())){
sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode());
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.U3D.getCode())){
//sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode());
}
}
}
@ -356,21 +397,7 @@ public class BaseWebSocketProcess {
String msg = "";
if(isOk.getCode().equals(RespCodeEnum.SUCESS.getCode())){
if(action.getDeviceUserBindEntity().getU3dId() != null){
log.info("通知U3DMQ");
try{
U3dMsg u3dMsg = new U3dMsg();
u3dMsg.setMsgType(100);
u3dMsg.setMetaId(action.getDeviceUserBindEntity().getU3dId());
u3dMsg.setScenceId(action.getDeviceUserBindEntity().getScenceId());
u3dMsg.setStatusId(1l);
//发送消息到MQ通知U3D
mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT,
YunxiRabbitConst.ROUTE_KEY_YUNXI,
JSONObject.toJSONString(u3dMsg)).subscribe();
}catch (Exception e){
log.info("通知U3D MQ异常{}", e);
}
toU3DMq(action, systemTalkBindDeviceEntity);
}
//通知打开灯成功
@ -396,6 +423,8 @@ public class BaseWebSocketProcess {
}).subscribe();
}
private void sendMessage(Action action, BaseSession baseSession, String message, Integer type){
BoxMessageResp resp = new BoxMessageResp();
resp.setType(type);