完成websocket 日志ID

This commit is contained in:
wulin 2023-10-17 15:30:29 +08:00
parent 88bdf71ceb
commit 5b67105114
7 changed files with 594 additions and 509 deletions

View File

@ -30,21 +30,20 @@ public class MqService {
if (ack) {
// 消息发送成功
confirmationResult.set(true);
log.info("MQ消息发送成功");
//log.info("MQ消息发送成功");
} else {
// 消息发送失败
confirmationResult.set(false);
log.info("MQ消息发送失败");
//log.info("MQ消息发送失败");
}
});
}
public Mono<Boolean> sendMessageWithConfirmation(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return Mono.defer(() -> {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
boolean result = confirmationResult.get();
log.info("MQ消息发送{}", result);
return Mono.just(result);
});
}

View File

@ -14,6 +14,7 @@ import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse;
import io.reactivex.functions.Consumer;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import java.util.concurrent.Semaphore;
@ -54,9 +55,9 @@ public class AliYunQianWen {
}
public void sendMessage(String msg,
Consumer<? super String> onNext,
QWenReplyResponse qwenReplyResponse) throws NoApiKeyException, InputRequiredException, InterruptedException {
public Mono<Boolean> sendMessage(String msg,
Consumer<? super String> onNext,
QWenReplyResponse qwenReplyResponse) {
if(!canAsk){
msgManager = new MessageManager(10);
}
@ -73,52 +74,63 @@ public class AliYunQianWen {
qwenParam.setMessages(msgManager.get());
Semaphore semaphore = new Semaphore(0);
gen.streamCall(qwenParam, new ResultCallback<GenerationResult>() {
@Override
public void onEvent(GenerationResult message) {
qwenReplyResponse.setCode(200);
return Mono.defer(() -> {
try {
gen.streamCall(qwenParam, new ResultCallback<GenerationResult>() {
@Override
public void onEvent(GenerationResult message) {
try {
onNext.accept(message.getOutput().getChoices().get(0).getMessage().getContent());
if(lastGenerationResult != null) {
lastGenerationResult.getOutput().getChoices().get(0).getMessage().setContent(
lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent()
+ message.getOutput().getChoices().get(0).getMessage().getContent()
);
try {
onNext.accept(message.getOutput().getChoices().get(0).getMessage().getContent());
if(lastGenerationResult != null) {
lastGenerationResult.getOutput().getChoices().get(0).getMessage().setContent(
lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent()
+ message.getOutput().getChoices().get(0).getMessage().getContent()
);
}else{
lastGenerationResult = message;
msgManager.add(lastGenerationResult);
canAsk = !canAsk;
}else{
lastGenerationResult = message;
msgManager.add(lastGenerationResult);
canAsk = !canAsk;
}
} catch (Exception e) {
log.info("千问回调异常{}", e);
qwenReplyResponse.setCode(500);
msgManager = new MessageManager(10);
}
}
} catch (Exception e) {
log.info("千问回调异常{}", e);
qwenReplyResponse.setCode(500);
msgManager = new MessageManager(10);
}
}
@Override
public void onComplete() {
if(lastGenerationResult != null){
qwenReplyResponse.setResut(lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent());
}
try {
onNext.accept("");
} catch (Exception e) {
log.info("千问最后调用结束时异常{}", e);
}
lastGenerationResult = null;
semaphore.release();
}
@Override
public void onComplete() {
if(lastGenerationResult != null){
qwenReplyResponse.setResut(lastGenerationResult.getOutput().getChoices().get(0).getMessage().getContent());
}
try {
onNext.accept("");
} catch (Exception e) {
log.info("千问最后调用结束时异常{}", e);
}
lastGenerationResult = null;
semaphore.release();
}
@Override
public void onError(Exception e) {
@Override
public void onError(Exception e) {
log.info("千问回调异常{}", e);
msgManager = new MessageManager(10);
qwenReplyResponse.setCode(500);
semaphore.release();
}
});
semaphore.acquire();
} catch (Exception e) {
log.info("调用千问异常{}", e);
msgManager = new MessageManager(10);
qwenReplyResponse.setCode(500);
semaphore.release();
}
return Mono.just(true);
});
semaphore.acquire();
}
}

View File

@ -63,7 +63,7 @@ public class LacNlpService implements INlp {
@Override
public Mono<Nlp> geSingletNlp(String value) {
return getSuanFaLac(value);
return getHubFaLac(value);
}
/**

View File

@ -45,19 +45,11 @@ public class QWenService {
} else {
aliQianWen = qianwenGroup.get(rest.getOnlyId());
}
final AliYunQianWen aliQianWen1 = aliQianWen;
return Mono.just(new QWenReplyResponse()).map(qWenReplyResponse -> {
try {
qWenReplyResponse.setCode(200);
aliQianWen1.sendMessage(rest.getText(), onNext, qWenReplyResponse);
} catch (Exception e) {
log.info("调用千问异常{}", e);
qWenReplyResponse.setCode(500);
}
return qWenReplyResponse;
QWenReplyResponse qWenReplyResponse = new QWenReplyResponse();
return aliQianWen.sendMessage(rest.getText(), onNext, qWenReplyResponse).flatMap(b -> {
return Mono.just(qWenReplyResponse);
});
}
public Mono<QWenReplyResponse> communication(TongYiCommunicationRest rest){

View File

@ -1,6 +1,7 @@
package com.qiuguo.iot.box.websocket.api.handler;
import com.alibaba.fastjson.JSONObject;
import com.qiuguo.iot.base.constans.Log4Constans;
import com.qiuguo.iot.base.enums.*;
import com.qiuguo.iot.base.utils.StringUtils;
import com.qiuguo.iot.box.websocket.api.domain.BaseMessageResp;
@ -38,15 +39,18 @@ import com.qiuguo.iot.third.service.*;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.slf4j.MDC;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import javax.annotation.Resource;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ -130,7 +134,7 @@ public class BaseWebSocketProcess {
}
}
private void toQianWen(Action action, BaseSession baseSession, Integer type){
private Mono<Void> toQianWen(Action action, BaseSession baseSession, Integer type){
baseSession.setRequestId(baseSession.getRequestId() + 1);
TongYiCommunicationRest tongYiCommunicationRest = new TongYiCommunicationRest();
tongYiCommunicationRest.setText(action.getAsk());
@ -142,24 +146,23 @@ public class BaseWebSocketProcess {
tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString());
}
StringBuilder sb = new StringBuilder();
qwenService.communication(tongYiCommunicationRest, message ->{
return qwenService.communication(tongYiCommunicationRest, message ->{
//通知到客户端
MDC.put(Log4Constans.PRINT_LOG_ID, baseSession.getLogId());
if(tongYiCommunicationRest.getRequestId().equals(baseSession.getRequestId())){
String old = sb.toString() + message;
int d = old.indexOf("");
int j = old.indexOf("");
int a = old.indexOf("");
int b = old.indexOf("\n");
int c = old.indexOf("");
int n = old.indexOf("\\n");
int d = old.lastIndexOf("");
int j = old.lastIndexOf("");
int a = old.lastIndexOf("");
int b = old.lastIndexOf("\n");
int c = old.lastIndexOf("");
int n = old.lastIndexOf("\\n");
int m = Math.max(Math.max(Math.max(Math.max(d, j), Math.max(a, b)), c), n);
if(m > 0){
//清空
sb.setLength(0);
sb.append(old.substring(m));
old = old.substring(0, m);
normalSendMsg(baseSession, old, type);
}else{
sb.append(message);
@ -168,8 +171,8 @@ public class BaseWebSocketProcess {
return;
}
log.info("已经有新的请求不在推送到客户端SN{} userId:{}", baseSession.getSn(), baseSession.getUserId());
}).map(data ->{
MDC.remove(Log4Constans.PRINT_LOG_ID);
}).flatMap(data ->{
if(data.getCode() == 200){
log.info("千问正常结束");
//保存记录
@ -180,15 +183,18 @@ public class BaseWebSocketProcess {
talkRecord.setAnswerValue(data.getResut());
talkRecord.setUserId(baseSession.getUserId());
talkRecord.setDeviceId(baseSession.getDeviceId());
deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).subscribe();
return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(b -> {
return Mono.empty();
});
}else{
sendMessage(action, baseSession, "很抱歉,我无法回答您的问题,请换一个问题。", AskTypeEnum.TTS.getCode());
return sendMessage(action, baseSession, "很抱歉,我无法回答您的问题,请换一个问题。", AskTypeEnum.TTS.getCode()).flatMap(b ->{
return Mono.empty();
});
}
return data;
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
})/*.subscribeOn(Schedulers.boundedElastic()).subscribe()*/;
}
private void toU3DMq(Action action, SystemTalkBindDeviceEntity systemTalkBindDeviceEntity, Long metaId){
private Mono<Boolean> toU3DMq(Action action, SystemTalkBindDeviceEntity systemTalkBindDeviceEntity, Long metaId){
U3dMsg u3dMsg = new U3dMsg();
u3dMsg.setMsgType(U3dMsgTypeEnum.IOT.getCode());
@ -196,10 +202,10 @@ public class BaseWebSocketProcess {
u3dMsg.setTime(System.currentTimeMillis());
u3dMsg.setScenceId(String.valueOf(action.getDeviceUserBindEntity().getScenceId()));
u3dMsg.setStatusId(String.valueOf(systemTalkBindDeviceEntity.getU3dStatusId()));
sendMq(JSONObject.toJSONString(u3dMsg));
return sendMq(JSONObject.toJSONString(u3dMsg));
}
private void toU3DMq(SystemTalkBindU3dEntity systemTalkBindU3dEntity, Long metaId){
private Mono<Boolean> toU3DMq(SystemTalkBindU3dEntity systemTalkBindU3dEntity, Long metaId){
U3dMsg u3dMsg = new U3dMsg();
u3dMsg.setMsgType(systemTalkBindU3dEntity.getU3dType());
@ -208,290 +214,318 @@ public class BaseWebSocketProcess {
u3dMsg.setExParam(systemTalkBindU3dEntity.getAnswerAction());
u3dMsg.setStatusId(String.valueOf(systemTalkBindU3dEntity.getU3dStatusId()));
u3dMsg.setTime(System.currentTimeMillis());
sendMq(JSONObject.toJSONString(u3dMsg));
return sendMq(JSONObject.toJSONString(u3dMsg));
}
private void sendMq(String msg){
log.info("通知U3DMQ{}", msg);
private Mono<Boolean> sendMq(String msg){
log.info("通知U3DMQ:{}", msg);
try{
//发送消息到MQ通知U3D
mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT,
return mqService.sendMessageWithConfirmation(YunxiRabbitConst.EXCHANGE_YUNXI_EVENT,
YunxiRabbitConst.ROUTE_KEY_YUNXI,
msg).subscribe();
msg);
}catch (Exception e){
log.info("通知U3D MQ异常{}", e);
}
//不调用empty防止影响正常业务
return Mono.just(false);
}
private Mono<Boolean> process(Actions actions, int i, BaseSession baseSession){
if(i < actions.getActions().size()){
Action action = actions.getActions().get(i++);
return process(actions, i, baseSession).flatMap(v -> {
if(action.getSystemTalkAnswerConfigEntity() == null){
log.info("调用千问{}", action.getAsk());
return toQianWen(action, baseSession, AskTypeEnum.TTS.getCode()).flatMap(vo ->{
//千问只调一次
return Mono.empty();
});
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.IOT.getCode())){
if(StringUtils.isNotEmpty(action.getName())){
if(action.getDeviceUserBindEntity() == null){
log.info("匹配时未找到对应的设备,模糊匹配{}", action.getName());
DeviceUserBindRequest deviceUserBindRequest = new DeviceUserBindRequest();
deviceUserBindRequest.setUserId(baseSession.getUserId());
deviceUserBindRequest.setPageSize(200);
deviceUserBindRequest.setBindName(action.getName());
//查询是否有相关设备
return deviceUserBindService.selectDeviceUserBindsByRequest(deviceUserBindRequest)
.defaultIfEmpty(new PagerResult<>(0, null))
.flatMap(binds ->{
if(binds.getTotal() == 0){
//返回告诉没有备
return sendMessage(action,
baseSession,
"未找到" + action.getName() + "设备,无法操做!",
action.getSystemTalkAnswerConfigEntity().getAnswerType());
}else if(binds.getTotal() > 1){
//返回告诉有多个设备请详细说明具体说明设备
//判断action是否有所有全部一切
if(YesNo.YES.getCode().equals(action.getIgnore())){
//忽略词控制所有设备
return controllerDevice(action, binds.getData(), 0, baseSession);
}else{
return sendMessage(action,
baseSession,
"您有多个" + action.getName() + "相同设备,请明确说明",
action.getSystemTalkAnswerConfigEntity().getAnswerType());
}
}else{
//查询是否有相关指令绑定
action.setDeviceUserBindEntity(binds.getData().get(0));
return controllerDevice(action, action.getDeviceUserBindEntity(), baseSession);
}
});
}else{
log.info("匹配时已找到对应的设备{}", action.getName());
return controllerDevice(action, action.getDeviceUserBindEntity(), baseSession).flatMap(b -> {
return Mono.just(false);
});
}
}else{
if(StringUtils.isEmpty(action.getName())){
return sendMessage(action, baseSession, "请说明确的设备名称", action.getSystemTalkAnswerConfigEntity().getAnswerType());
}else{
return sendMessage(action, baseSession, "未找到对应的设备", action.getSystemTalkAnswerConfigEntity().getAnswerType());
}
}
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.WEATHER.getCode())){
ThirdWeatherInfoRequest req = new ThirdWeatherInfoRequest();
//String city = "";
if(action.getLbs() != null && action.getLbs().size() > 0){
//根据地址查询天气
// city = ;
req.setCity(action.getLbs().get(action.getLbs().size() - 1).replace("", "")
.replace("", "").replace("", ""));
}else{
//使用IP查询天气
req.setIp("115.205.2.137");
}
if(action.getTime() == null){
action.setTime(new ActionTime());
action.getTime().setTime("今天");
}
return weatherService.tianqiApi(req).flatMap(t ->{
if(t.getData() == null){
return sendMessage(action, baseSession, "该城市不支持天气查询", action.getSystemTalkAnswerConfigEntity().getAnswerType());
//return Mono.empty();
}
TianqiapiItemResp item = null;
if(StringUtils.isNotEmpty(action.getTime().getDateTime())){
//匹配对应的日期
for (TianqiapiItemResp itemResp : t.getData())
{
if(action.getTime().getDateTime().equals(itemResp.getDate())){
item = itemResp;
break;
}
}
}else{
item = t.getData().get(0);
}
String msg = "";
if(item != null){
//返回给客户端播报内容
msg = t.getCity() + action.getTime().getTime() + "天气"
+ item.getNarrative().replace("每 km / h", "千米每小时")
+ ",空气质量" + item.getAir_level()
+ ",湿度" + item.getHumidity() + ",最低气温" + item.getTem2() + "°C";
if(this instanceof BoxWebSocketHandler){
WeatherResp weatherResp = new WeatherResp();
weatherResp.setWeatherLocal(t.getCity());
weatherResp.setWeatherTemperature(item.getTem1());
weatherResp.setWeatherIcon(item.getWea());
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText(msg);
resp.setWeather(weatherResp);
return sendMessage(action, baseSession, resp);
//return Mono.empty();
}
}else{
msg = action.getSystemTalkAnswerConfigEntity().getAnswerValueFaild();
log.info("执行指令失败");
}
return sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType());
//return Mono.empty();
});
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.MUSIC.getCode())){
String search = action.getAsk().replaceAll(action.getAction(), "").replaceAll("", "");
if(StringUtils.isNotEmpty(action.getPName())){
search = search.replaceAll(action.getPName(), "");
}
BoxMessageResp resp = new BoxMessageResp();
if(action.getSystemTalkAnswerConfigEntity().getPlayType().equals(PlayEnum.START.getCode())){
return musicService.searchMusic(search, 1).defaultIfEmpty(new ArrayList<>()).flatMap(resultSongs -> {
//BoxMessageResp resp = new BoxMessageResp();
MusicResp musicResp = new MusicResp();
if(resultSongs.size() > 0){
//
SongInfoResponse.ResultSong song = resultSongs.get(0);
musicResp.setPlay(PlayEnum.START.getCode());
musicResp.setName(song.getName());
musicResp.setUrl(song.getUrl());
musicResp.setSinger(song.getArtistName());
resp.setText("现在为您播放" + song.getName() +
(StringUtils.isNotEmpty(song.getArtistName()) ? ("来自" + song.getArtistName()) : ""));
}else{
musicResp.setPlay(PlayEnum.NONE.getCode());
resp.setText("未找到相关资源");
}
resp.setMusic(musicResp);
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
return sendMessage(action, baseSession, resp);
});
}else if(baseSession.getMusic() != null){
//做相应的动作
baseSession.getMusic().setPlay(action.getSystemTalkAnswerConfigEntity().getPlayType());
resp.setMusic(baseSession.getMusic());
resp.setText(action.getSystemTalkAnswerConfigEntity().getAnswerValue().replace("#name#", baseSession.getMusic().getName()));
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
//return sendMessage(action, baseSession, resp);
}else{
resp.setText("目前无播放资源,无法操作");
resp.setType(AskTypeEnum.TTS.getCode());
}
return sendMessage(action, baseSession, resp);
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TTS.getCode())){
if(!action.getAction().equals(action.getAsk())){
return toQianWen(action, baseSession, AskTypeEnum.TTS.getCode()).flatMap(vo ->{
//千问只调一次
return Mono.empty();
});
}else{
return sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode());
}
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.QIU_GUO.getCode())){
return sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode());
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TIME.getCode())){
DateTimeFormatter df = DateTimeFormatter.ofPattern(action.getSystemTalkAnswerConfigEntity().getAnswerValue());
if(action.getTime() == null){
action.setTime(new ActionTime());
action.getTime().setTime("今天");
}
if(this instanceof BoxWebSocketHandler){
DateTimeResp dateTimeResp = new DateTimeResp();
dateTimeResp.setYear(String.valueOf(action.getTime().getDetailTime().getYear()));
dateTimeResp.setMonth(String.valueOf(action.getTime().getDetailTime().getMonthValue()));
dateTimeResp.setDay(String.valueOf(action.getTime().getDetailTime().getDayOfMonth()));
dateTimeResp.setHour(String.valueOf(action.getTime().getDetailTime().getHour()));
dateTimeResp.setMinute(String.valueOf(action.getTime().getDetailTime().getMinute()));
dateTimeResp.setSecond(String.valueOf(action.getTime().getDetailTime().getSecond()));
dateTimeResp.setWeak(String.valueOf(action.getTime().getDetailTime().getDayOfWeek().getValue()));
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText(action.getTime().getDetailTime().format(df));
resp.setTime(dateTimeResp);
return sendMessage(action, baseSession, resp);
}else{
return sendMessage(action, baseSession, action.getTime().getDetailTime().format(df), AskTypeEnum.TIME.getCode());
}
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.U3D.getCode())){
SystemTalkBindU3dRequest request = new SystemTalkBindU3dRequest();
request.setSystemTalkId(action.getSystemTalkAnswerConfigEntity().getId());
request.setAskCommon(action.getActionCommand());
return systemTalkBindU3dService.selectSystemTalkBindU3dByRequest(request)
.defaultIfEmpty(new SystemTalkBindU3dEntity())
.flatMap(systemTalkBindU3d ->{
if(systemTalkBindU3d.getId() == null){
return sendMessage(action, baseSession, "暂时不支持该指令", AskTypeEnum.TTS.getCode());
//return systemTalkBindU3d;
}
//数字人的id直接用用户id来代替
if(U3dMsgTypeEnum.DANCE.getCode().equals(systemTalkBindU3d.getU3dType())){
//推送客户端跳舞
BoxMessageResp resp = new BoxMessageResp();
resp.setType(AskTypeEnum.U3D.getCode());
resp.setText("开始跳舞");
ActionResp actionResp = new ActionResp();
actionResp.setType(7);
resp.setAction(actionResp);
return sendMessage(action, baseSession, resp);
}else{
//推送MQ换装
toU3DMq(systemTalkBindU3d, baseSession.getUserId());
DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity();
talkRecord.setAskType(AskTypeEnum.U3D.getCode());
talkRecord.setAskValue(action.getAsk());
talkRecord.setAskKey(action.getAction());
talkRecord.setAnswerValue("正在" + action.getAction() + ",请稍候");
talkRecord.setUserId(baseSession.getUserId());
talkRecord.setDeviceId(baseSession.getDeviceId());
return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(integer -> {
return Mono.just(true);
});
}
});
}else{
return toQianWen(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerType()).flatMap(vo ->{
//千问只调一次
return Mono.empty();
});
}
});
}
return Mono.just(false);
}
protected void processAction(Actions actions, Long userId, BaseSession baseSession) {
protected Mono<Void> processAction(Actions actions, BaseSession baseSession) {
if(actions.getActions() == null || actions.getActions().size() == 0){
//调用千问回答\
log.info("调用千问{}", actions.getRecordText());
Action action = new Action();
action.setAsk(actions.getRecordText());
action.setAction(actions.getRecordText());
toQianWen(action, baseSession, AskTypeEnum.TTS.getCode());
return;
}
//boolean isToQianWen = false;
for (Action action : actions.getActions()
) {
log.info("匹配到自定义指令{}", action.getSystemTalkAnswerConfigEntity());
if(action.getSystemTalkAnswerConfigEntity() == null){
log.info("调用千问{}", action.getAsk());
toQianWen(action, baseSession, AskTypeEnum.TTS.getCode());
return;
//
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.IOT.getCode())){
if(StringUtils.isNotEmpty(action.getName())){
if(action.getDeviceUserBindEntity() == null){
log.info("匹配时未找到对应的设备,模糊匹配{}", action.getName());
DeviceUserBindRequest deviceUserBindRequest = new DeviceUserBindRequest();
deviceUserBindRequest.setUserId(userId);
deviceUserBindRequest.setPageSize(200);
deviceUserBindRequest.setBindName(action.getName());
//查询是否有相关设备
deviceUserBindService.selectDeviceUserBindsByRequest(deviceUserBindRequest)
.defaultIfEmpty(new PagerResult<>(0, null))
.map(binds ->{
if(binds.getTotal() == 0){
//返回告诉没有备
sendMessage(action,
baseSession,
"未找到" + action.getName() + "设备,无法操做!",
action.getSystemTalkAnswerConfigEntity().getAnswerType());
}else if(binds.getTotal() > 1){
//返回告诉有多个设备请详细说明具体说明设备
//判断action是否有所有全部一切
if(YesNo.YES.getCode().equals(action.getIgnore())){
//忽略词控制所有设备
for (DeviceUserBindEntity bindEntity :binds.getData()
) {
action.setDeviceUserBindEntity(bindEntity);
controllerDevice(action, action.getDeviceUserBindEntity(), baseSession);
}
}else{
sendMessage(action,
baseSession,
"您有多个" + action.getName() + "相同设备,请明确说明",
action.getSystemTalkAnswerConfigEntity().getAnswerType());
}
}else{
//查询是否有相关指令绑定
action.setDeviceUserBindEntity(binds.getData().get(0));
controllerDevice(action, action.getDeviceUserBindEntity(), baseSession);
}
return Mono.empty();
}).subscribe();
}else{
log.info("匹配时已找到对应的设备{}", action.getName());
controllerDevice(action, action.getDeviceUserBindEntity(), baseSession);
}
}else{
if(StringUtils.isEmpty(action.getName())){
sendMessage(action, baseSession, "请说明确的设备名称", action.getSystemTalkAnswerConfigEntity().getAnswerType());
}else{
sendMessage(action, baseSession, "未找到对应的设备", action.getSystemTalkAnswerConfigEntity().getAnswerType());
}
}
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.WEATHER.getCode())){
ThirdWeatherInfoRequest req = new ThirdWeatherInfoRequest();
//String city = "";
if(action.getLbs() != null && action.getLbs().size() > 0){
//根据地址查询天气
// city = ;
req.setCity(action.getLbs().get(action.getLbs().size() - 1).replace("", "")
.replace("", "").replace("", ""));
}else{
//使用IP查询天气
req.setIp("115.205.2.137");
}
if(action.getTime() == null){
action.setTime(new ActionTime());
action.getTime().setTime("今天");
}
weatherService.tianqiApi(req).map(t ->{
if(t.getData() == null){
sendMessage(action, baseSession, "该城市不支持天气查询", action.getSystemTalkAnswerConfigEntity().getAnswerType());
return t;
}
TianqiapiItemResp item = null;
if(StringUtils.isNotEmpty(action.getTime().getDateTime())){
//匹配对应的日期
for (TianqiapiItemResp itemResp : t.getData())
{
if(action.getTime().getDateTime().equals(itemResp.getDate())){
item = itemResp;
break;
}
}
}else{
item = t.getData().get(0);
}
String msg = "";
if(item != null){
//返回给客户端播报内容
msg = t.getCity() + action.getTime().getTime() + "天气"
+ item.getNarrative().replace("每 km / h", "千米每小时")
+ ",空气质量" + item.getAir_level()
+ ",湿度" + item.getHumidity() + ",最低气温" + item.getTem2() + "°C";
if(this instanceof BoxWebSocketHandler){
WeatherResp weatherResp = new WeatherResp();
weatherResp.setWeatherLocal(t.getCity());
weatherResp.setWeatherTemperature(item.getTem1());
weatherResp.setWeatherIcon(item.getWea());
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText(msg);
resp.setWeather(weatherResp);
sendMessage(action, baseSession, resp);
return t;
}
}else{
msg = action.getSystemTalkAnswerConfigEntity().getAnswerValueFaild();
log.info("执行指令失败");
}
sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType());
return t;
}).subscribe();
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.MUSIC.getCode())){
String search = action.getAsk().replaceAll(action.getAction(), "").replaceAll("", "");
if(StringUtils.isNotEmpty(action.getPName())){
search = search.replaceAll(action.getPName(), "");
}
BoxMessageResp resp = new BoxMessageResp();
if(action.getSystemTalkAnswerConfigEntity().getPlayType().equals(PlayEnum.START.getCode())){
musicService.searchMusic(search, 1).defaultIfEmpty(new ArrayList<>()).map(resultSongs -> {
//BoxMessageResp resp = new BoxMessageResp();
MusicResp musicResp = new MusicResp();
if(resultSongs.size() > 0){
//
SongInfoResponse.ResultSong song = resultSongs.get(0);
musicResp.setPlay(PlayEnum.START.getCode());
musicResp.setName(song.getName());
musicResp.setUrl(song.getUrl());
musicResp.setSinger(song.getArtistName());
resp.setText("现在为您播放" + song.getName() +
(StringUtils.isNotEmpty(song.getArtistName()) ? ("来自" + song.getArtistName()) : ""));
}else{
musicResp.setPlay(PlayEnum.NONE.getCode());
resp.setText("未找到相关资源");
}
resp.setMusic(musicResp);
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
sendMessage(action, baseSession, resp);
return resultSongs;
}).subscribe();
}else if(baseSession.getMusic() != null){
//做相应的动作
baseSession.getMusic().setPlay(action.getSystemTalkAnswerConfigEntity().getPlayType());
resp.setMusic(baseSession.getMusic());
resp.setText(action.getSystemTalkAnswerConfigEntity().getAnswerValue().replace("#name#", baseSession.getMusic().getName()));
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
sendMessage(action, baseSession, resp);
}else{
resp.setText("目前无播放资源,无法操作");
resp.setType(AskTypeEnum.TTS.getCode());
sendMessage(action, baseSession, resp);
}
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TTS.getCode())){
if(!action.getAction().equals(action.getAsk())){
toQianWen(action, baseSession, AskTypeEnum.TTS.getCode());
return;
}else{
sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode());
}
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.QIU_GUO.getCode())){
sendMessage(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerValue(), AskTypeEnum.TTS.getCode());
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.TIME.getCode())){
DateTimeFormatter df = DateTimeFormatter.ofPattern(action.getSystemTalkAnswerConfigEntity().getAnswerValue());
if(this instanceof BoxWebSocketHandler){
DateTimeResp dateTimeResp = new DateTimeResp();
dateTimeResp.setYear(String.valueOf(action.getTime().getDetailTime().getYear()));
dateTimeResp.setMonth(String.valueOf(action.getTime().getDetailTime().getMonthValue()));
dateTimeResp.setDay(String.valueOf(action.getTime().getDetailTime().getDayOfMonth()));
dateTimeResp.setHour(String.valueOf(action.getTime().getDetailTime().getHour()));
dateTimeResp.setMinute(String.valueOf(action.getTime().getDetailTime().getMinute()));
dateTimeResp.setSecond(String.valueOf(action.getTime().getDetailTime().getSecond()));
dateTimeResp.setWeak(String.valueOf(action.getTime().getDetailTime().getDayOfWeek().getValue()));
BoxMessageResp resp = new BoxMessageResp();
resp.setType(action.getSystemTalkAnswerConfigEntity().getAnswerType());
resp.setText(action.getTime().getDetailTime().format(df));
resp.setTime(dateTimeResp);
sendMessage(action, baseSession, resp);
}else{
sendMessage(action, baseSession, action.getTime().getDetailTime().format(df), AskTypeEnum.TIME.getCode());
}
}else if(action.getSystemTalkAnswerConfigEntity().getAnswerType().equals(AskTypeEnum.U3D.getCode())){
SystemTalkBindU3dRequest request = new SystemTalkBindU3dRequest();
request.setSystemTalkId(action.getSystemTalkAnswerConfigEntity().getId());
request.setAskCommon(action.getActionCommand());
systemTalkBindU3dService.selectSystemTalkBindU3dByRequest(request)
.defaultIfEmpty(new SystemTalkBindU3dEntity())
.map(systemTalkBindU3d ->{
if(systemTalkBindU3d.getId() == null){
sendMessage(action, baseSession, "暂时不支持该指令", AskTypeEnum.TTS.getCode());
return systemTalkBindU3d;
}
//数字人的id直接用用户id来代替
if(U3dMsgTypeEnum.DANCE.getCode().equals(systemTalkBindU3d.getU3dType())){
//推送客户端跳舞
BoxMessageResp resp = new BoxMessageResp();
resp.setType(AskTypeEnum.U3D.getCode());
resp.setText("开始跳舞");
ActionResp actionResp = new ActionResp();
actionResp.setType(7);
resp.setAction(actionResp);
sendMessage(action, baseSession, resp);
}else{
//推送MQ换装
toU3DMq(systemTalkBindU3d, baseSession.getUserId());
DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity();
talkRecord.setAskType(AskTypeEnum.U3D.getCode());
talkRecord.setAskValue(action.getAsk());
talkRecord.setAskKey(action.getAction());
talkRecord.setAnswerValue("正在" + action.getAction() + ",请稍候");
talkRecord.setUserId(baseSession.getUserId());
talkRecord.setDeviceId(baseSession.getDeviceId());
deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).subscribe();
}
return systemTalkBindU3d;
}).subscribe();
}else{
toQianWen(action, baseSession, action.getSystemTalkAnswerConfigEntity().getAnswerType());
return;
}
return toQianWen(action, baseSession, AskTypeEnum.TTS.getCode());
}
return process(actions, 0, baseSession).flatMap(vo ->{
return Mono.empty();
});
}
private void controllerDevice(Action action, DeviceUserBindEntity deviceUserBindEntity, BaseSession baseSession) {
private Mono<Boolean> controllerDevice(Action action,
List<DeviceUserBindEntity> deviceUserBindEntitys,
int i,
BaseSession baseSession){
if(i < deviceUserBindEntitys.size()){
DeviceUserBindEntity deviceUserBindEntity = deviceUserBindEntitys.get(i++);
action.setDeviceUserBindEntity(deviceUserBindEntity);
return controllerDevice(action, deviceUserBindEntitys, i, baseSession).flatMap(b -> {
return controllerDevice(action, deviceUserBindEntity, baseSession);
});
}
return Mono.just(false);
}
private Mono<Boolean> controllerDevice(Action action, DeviceUserBindEntity deviceUserBindEntity, BaseSession baseSession) {
SystemTalkBindDeviceRequest systemTalkBindDeviceRequest = new SystemTalkBindDeviceRequest();
systemTalkBindDeviceRequest.setCategoryCode(deviceUserBindEntity.getCategoryCode());
@ -499,12 +533,12 @@ public class BaseWebSocketProcess {
if(StringUtils.isNotEmpty(action.getActionCommand())){
systemTalkBindDeviceRequest.setAskCommon(action.getActionCommand());
}
systemTalkBindDeviceService.selectSystemTalkBindDeviceByRequest(systemTalkBindDeviceRequest)
return systemTalkBindDeviceService.selectSystemTalkBindDeviceByRequest(systemTalkBindDeviceRequest)
.defaultIfEmpty(new SystemTalkBindDeviceEntity())
.map(systemTalkBindDeviceEntity -> {
.flatMap(systemTalkBindDeviceEntity -> {
if(systemTalkBindDeviceEntity.getId() == null){
//通知不支持的指令
sendMessage(action, baseSession,
return sendMessage(action, baseSession,
deviceUserBindEntity.getBindName() + "不支持" + action.getAction() + "指令!",
action.getSystemTalkAnswerConfigEntity().getAnswerType());
@ -514,46 +548,46 @@ public class BaseWebSocketProcess {
query.setDeviceId(deviceUserBindEntity.getOtherDeviceId());
query.setValue(action.getStatus());
query.setUserHandlingDeviceId(systemTalkBindDeviceEntity.getUserHandlingId());
tuyaDeviceService.controlDevice(query).map(isOk ->{
return tuyaDeviceService.controlDevice(query).flatMap(isOk ->{
String msg = "";
if(isOk.getCode().equals(RespCodeEnum.SUCESS.getCode())){
if(action.getDeviceUserBindEntity().getU3dId() != null){
toU3DMq(action, systemTalkBindDeviceEntity, action.getDeviceUserBindEntity().getU3dId());
}
log.info("执行指令成功");
//通知打开灯成功
msg = systemTalkBindDeviceEntity.getAnswerValue().replaceAll("#name#", deviceUserBindEntity.getBindName());
if(StringUtils.isNotEmpty(action.getStatus())){
msg = msg.replace("#value#", action.getStatus());
}
return sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType()).flatMap(b -> {
if(deviceUserBindEntity.getU3dId() != null){
return toU3DMq(action, systemTalkBindDeviceEntity, deviceUserBindEntity.getU3dId()).flatMap(bo -> {
return Mono.just(bo);
});
}
return Mono.just(b);
});
log.info("执行指令");
}else{
//通知开灯失败;
msg = systemTalkBindDeviceEntity.getAnswerValueFaild().replaceAll("#name#", deviceUserBindEntity.getBindName());
log.info("执行指令失败");
return sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType());
}
sendMessage(action, baseSession, msg, action.getSystemTalkAnswerConfigEntity().getAnswerType());
return isOk;
}).subscribe();
});
}
return systemTalkBindDeviceEntity;
}).subscribe();
});
}
private void sendMessage(Action action, BaseSession baseSession, String message, Integer type){
private Mono<Boolean> sendMessage(Action action, BaseSession baseSession, String message, Integer type){
BaseMessageResp resp = new BaseMessageResp();
resp.setType(type);
resp.setText(message);
sendMessage(action, baseSession, resp);
return sendMessage(action, baseSession, resp);
}
private void sendMessage(Action action, BaseSession baseSession, BaseMessageResp resp){
private Mono<Boolean> sendMessage(Action action, BaseSession baseSession, BaseMessageResp resp){
DeviceUserTalkRecordEntity talkRecord = new DeviceUserTalkRecordEntity();
talkRecord.setAskType(resp.getType());
talkRecord.setAskValue(action.getAsk());
@ -610,7 +644,7 @@ public class BaseWebSocketProcess {
}
}
deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).map(i ->{
return deviceUserTalkRecordService.insertDeviceUserTalkRecord(talkRecord).flatMap(i ->{
String msg = JSONObject.toJSONString(resp);
if(this instanceof BoxWebSocketHandler){
log.info("果box聊天记录同步到客户端");
@ -621,8 +655,8 @@ public class BaseWebSocketProcess {
}
sendMsg(baseSession, msg);
return Mono.empty();
}).subscribe();//保存聊天记录
return Mono.just(true);
});//保存聊天记录
}
/**
@ -631,9 +665,9 @@ public class BaseWebSocketProcess {
* @param message
* @param type
*/
protected void closeSendMsg(BaseSession baseSession, String message, Integer type){
protected Mono<Void> closeSendMsg(BaseSession baseSession, String message, Integer type){
normalSendMsg(baseSession, message, type);
baseSession.getSession().close().subscribe();
return baseSession.getSession().close();
}
@ -654,6 +688,7 @@ public class BaseWebSocketProcess {
private void sendMsg(BaseSession baseSession, String msg) {
log.info("推到终端:{},SN:{},userId:{},消息内容:{}", baseSession.getSessionType(), baseSession.getSn(), baseSession.getUserId(), msg);
baseSession.getSink().next(baseSession.getSession().textMessage(msg));
}
public BoxSession getBoxSessionWithSn(String sn) {

View File

@ -27,6 +27,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.*;
import reactor.core.publisher.*;
import reactor.util.context.Context;
import javax.annotation.Resource;
import java.time.Duration;
@ -79,29 +80,21 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
log.info("登录成功SN:{}", sn);
Mono<Void> input = session.receive().map(webSocketMessage ->{
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
String text = webSocketMessage.getPayloadAsText();
log.info("设备端收到消息:{}", text);
BoxTalkMessage boxTalkMessage = JSONObject.parseObject(text, BoxTalkMessage.class);
BoxSession boxSession1 = getBoxSessionWithSn(boxTalkMessage.getSn());
if(!boxSession.equals(boxSession1)){
log.info("消息发送异常或者未验签就收到信息不是同一个链接。可能传错SN");
closeSendMsg(boxSession, "请等待验签结束或者SN可能错误", AskTypeEnum.TTS.getCode());
return Mono.empty();
}
nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).map(actions -> {
processAction(actions, userId, boxSession);
return Mono.empty();
}).subscribe();
newMessage(webSocketMessage, boxSession).contextWrite(context -> {
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage());
//MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
return Mono.empty();
return contextTmp;
}).subscribe();
return webSocketMessage;
}).then();
//校验
checkToken(boxSession, sn, linkTime, signature, userId);
checkToken(boxSession, sn, linkTime, signature, userId).contextWrite(context -> {
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
return contextTmp;
}).subscribe();
Mono<Void> output = session.send(Flux.create(sink -> boxSession.setSink(sink))).then();
@ -109,23 +102,11 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
return Mono.zip(input, output).doFinally(signalType -> {
BoxSession boxSession1 = getBoxSessionWithSn(boxSession.getSn());
if(boxSession == boxSession1){
boxGroup.remove(boxSession.getSn());//断链后及时移除
log.info("设备断开连接SN{}", boxSession.getSn());
//通知用户端设备绑定成功
sendNoticeToUser(boxSession.getUserId(), "设备离线,设备序列号:" + boxSession.getSn(), AskTypeEnum.BOX_OFF_LINE.getCode());
deviceInfoService.setOnLineStatus(sn, YesNo.NO.getCode()).subscribe();
UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel();
userDeviceInfoModel.setStatus(YesNo.NO.getCode());
userDeviceInfoModel.setUserId(userId);
userDeviceInfoModel.setSn(sn);
reactiveStringRedisTemplate.opsForValue().set(RedisConstans.USER_BOX_INFO + userId,
JSONObject.toJSONString(userDeviceInfoModel),
Duration.ofDays(RedisConstans.TEN_YEAR)).subscribe();
return;
}
log.info("被踢下线断开连接:{}", boxSession.getSn());
disconnect(boxSession).contextWrite(context -> {
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
return contextTmp;
}).subscribe();
}).then();
}
@ -137,26 +118,73 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
}
}
private void errorLogin(BaseSession boxSession, String sn){
//清除异常redis
reactiveStringRedisTemplate.opsForValue().delete(RedisConstans.DEVICE_INFO + sn).subscribe();//不需要时间
closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode());
private Mono<Void> newMessage(WebSocketMessage webSocketMessage, BoxSession boxSession){
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
String text = webSocketMessage.getPayloadAsText();
log.info("设备端收到消息:{}", text);
BoxTalkMessage boxTalkMessage = JSONObject.parseObject(text, BoxTalkMessage.class);
BoxSession boxSession1 = getBoxSessionWithSn(boxTalkMessage.getSn());
if(!boxSession.equals(boxSession1)){
log.info("消息发送异常或者未验签就收到信息不是同一个链接。可能传错SN");
return closeSendMsg(boxSession, "请等待验签结束或者SN可能错误", AskTypeEnum.TTS.getCode());
}
log.info("收到SN:{},消息:{}", boxTalkMessage.getSn(), boxTalkMessage.getMessage());
return nlpService.getActionWithLacSingle(boxSession.getUserId(), boxTalkMessage.getMessage()).defaultIfEmpty(new Actions()).flatMap(actions -> {
return processAction(actions, boxSession);
//return Mono.empty();
});
}
private void checkToken(BoxSession boxSession, String sn, Long linkTime, String signature, Long userId){
reactiveStringRedisTemplate.opsForValue().get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(s -> {
if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s) && s.length() < 1000){
private Mono<Void> disconnect(BoxSession boxSession){
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, boxSession.getLogId());
BoxSession boxSession1 = getBoxSessionWithSn(boxSession.getSn());
if(boxSession.equals(boxSession1)){
//断链后及时移除
boxGroup.remove(boxSession.getSn());
log.info("设备断开连接SN{}", boxSession.getSn());
//通知用户端设备绑定成功
sendNoticeToUser(boxSession.getUserId(), "设备离线,设备序列号:" + boxSession.getSn(), AskTypeEnum.BOX_OFF_LINE.getCode());
return deviceInfoService.setOnLineStatus(boxSession.getSn(), YesNo.NO.getCode()).flatMap(integer -> {
UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel();
userDeviceInfoModel.setStatus(YesNo.NO.getCode());
userDeviceInfoModel.setUserId(boxSession.getUserId());
userDeviceInfoModel.setSn(boxSession.getSn());
return reactiveStringRedisTemplate.opsForValue().set(RedisConstans.USER_BOX_INFO + boxSession.getUserId(),
JSONObject.toJSONString(userDeviceInfoModel),
Duration.ofDays(RedisConstans.TEN_YEAR)).flatMap(b -> {
return Mono.empty();
});
});
}
log.info("被踢下线断开连接:{}", boxSession.getSn());
return Mono.empty();
}
private Mono<Void> errorLogin(BaseSession boxSession, String sn){
//清除异常redis
reactiveStringRedisTemplate.opsForValue().delete(RedisConstans.DEVICE_INFO + sn).subscribe();
return closeSendMsg(boxSession, "异常,请重新登录", AskTypeEnum.TTS.getCode());
}
private Mono<Void> checkToken(BoxSession boxSession, String sn, Long linkTime, String signature, Long userId){
return reactiveStringRedisTemplate.opsForValue().get(RedisConstans.DEVICE_INFO + sn).defaultIfEmpty("").flatMap(s -> {
if(com.qiuguo.iot.base.utils.StringUtils.isNotBlank(s)){
try{
DeviceInfoEntity dv = JSONObject.parseObject(s, DeviceInfoEntity.class);
if(dv.getId() == null){
log.info("redis设备缓存异常清楚");
errorLogin(boxSession, sn);
return errorLogin(boxSession, sn).flatMap(v -> {
return Mono.empty();
});
}
return Mono.just(dv);
}catch (Exception e){
log.info("转换异常清除redis。下次连接成功{}", e);
errorLogin(boxSession, sn);
return errorLogin(boxSession, sn).flatMap(v -> {
return Mono.empty();
});
}
}
DeviceInfoRequest request = new DeviceInfoRequest();
@ -164,97 +192,104 @@ public class BoxWebSocketHandler extends BaseWebSocketProcess implements WebSock
return deviceInfoService.selectDeviceInfoByRequest(request).defaultIfEmpty(new DeviceInfoEntity()).map(dv -> {
if(dv.getId() != null){
String redis = JSONObject.toJSONString(dv);
reactiveStringRedisTemplate.opsForValue().set(RedisConstans.DEVICE_INFO + dv.getSn(), redis, Duration.ofHours(1)).subscribe();//直接提交订阅
}
return dv;
});
}).map(dv ->{
}).flatMap(dv ->{
String snMd5 = MD5.create().digestHex(sn).toUpperCase();
String wifiMd5 = MD5.create().digestHex(dv.getWifiMac()).toUpperCase();
String btMd5 = MD5.create().digestHex(dv.getBtMac()).toUpperCase();
String signalMd5 = MD5.create().digestHex(snMd5 + wifiMd5 + btMd5 + linkTime + dv.getKey()).toUpperCase();
if(!signalMd5.equals(signature)){
log.info("设备{},验签失败。正常签:{}", sn, signalMd5);
//session.send(session.textMessage(""));
if(boxSession != null){
closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode());
return closeSendMsg(boxSession, "验签失败", AskTypeEnum.TTS.getCode()).map(v ->{
return Mono.empty();
});
}
return Mono.just(dv);
}else{
log.info("设备{},验签成功", sn);
BoxSession oldBoxSession = getBoxSessionWithSn(sn);
if(oldBoxSession != null){
//
closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode());
closeSendMsg(oldBoxSession, "您在其他地方登录", AskTypeEnum.TTS.getCode()).map(v ->{
return dv;
});
}
boxSession.setDeviceId(dv.getId());
boxGroup.put(sn, boxSession);
bindBox(dv, userId);
return Mono.just(dv);
}
return Mono.empty();
}).subscribe();
}).flatMap(d -> {
DeviceInfoEntity dv = (DeviceInfoEntity)d;
boxSession.setDeviceId(dv.getId());
boxGroup.put(sn, boxSession);
return bindBox(dv, userId).flatMap(db ->{
return Mono.empty();
});
});
}
private void bindBox(DeviceInfoEntity dv, Long userId){
private Mono<DeviceUserBindEntity> bindBox(DeviceInfoEntity dv, Long userId){
log.info("开始绑定设备userId:{} SN{}", userId, dv);
DeviceUserBindRequest request = new DeviceUserBindRequest();
request.setUserId(userId);
request.setDeviceId(dv.getId());
//跟新在线状态
deviceInfoService.setOnLineStatus(dv.getId(), YesNo.YES.getCode()).subscribe();
return deviceInfoService.setOnLineStatus(dv.getId(), YesNo.YES.getCode()).flatMap(integer -> {
return deviceUserBindService.selectDeviceUserBindByRequest(request)
.defaultIfEmpty(new DeviceUserBindEntity())
.flatMap(entity ->{
if(entity.getId() == null){
entity.setUserId(userId);
entity.setDeviceId(dv.getId());
//设置为主设备
entity.setIsMain(YesNo.YES.getCode());
entity.setOtherDeviceId(dv.getSn());
entity.setCategoryCode(DeviceCodeEnum.BOX.getName());
entity.setBindName("果宝儿Box");
return deviceUserBindService.setNoMain(userId, DeviceTypeEnum.GUO_BOX.getCode()).defaultIfEmpty(0).flatMap(m ->{
log.info("解除历史isMain标注个数{}", m);
return deviceUserBindService.insertDeviceUserBind(entity).flatMap(l ->{
log.info("绑定成功SN{} userId:{}", dv, userId);
//下面所有的以前未主设备改成非主设备
//通知用户端设备绑定成功
sendNoticeToUser(userId, "设备绑定成功,设备序列号:" + dv.getSn(), AskTypeEnum.DEVICE_BIND.getCode());
UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel();
userDeviceInfoModel.setStatus(YesNo.YES.getCode());
userDeviceInfoModel.setUserId(userId);
userDeviceInfoModel.setSn(entity.getOtherDeviceId());
return reactiveStringRedisTemplate.opsForValue().set(RedisConstans.USER_BOX_INFO + userId,
JSONObject.toJSONString(userDeviceInfoModel),
Duration.ofDays(RedisConstans.TEN_YEAR)).flatMap(b -> {
return Mono.just(entity);
});
});
});
deviceUserBindService.selectDeviceUserBindByRequest(request)
.defaultIfEmpty(new DeviceUserBindEntity())
.map(entity ->{
if(entity.getId() == null){
entity.setUserId(userId);
entity.setDeviceId(dv.getId());
//设置为主设备
entity.setIsMain(YesNo.YES.getCode());
entity.setOtherDeviceId(dv.getSn());
entity.setCategoryCode(DeviceCodeEnum.BOX.getName());
entity.setBindName("果宝儿Box");
deviceUserBindService.setNoMain(userId, DeviceTypeEnum.GUO_BOX.getCode()).defaultIfEmpty(0).map(m ->{
log.info("解除历史isMain标注个数{}", m);
deviceUserBindService.insertDeviceUserBind(entity).map(l ->{
log.info("绑定成功SN{} userId:{}", dv, userId);
//下面所有的以前未主设备改成非主设备
//通知用户端设备绑定成功
sendNoticeToUser(userId, "设备绑定成功,设备序列号:" + dv.getSn(), AskTypeEnum.DEVICE_BIND.getCode());
UserDeviceInfoModel userDeviceInfoModel = new UserDeviceInfoModel();
userDeviceInfoModel.setStatus(YesNo.YES.getCode());
userDeviceInfoModel.setUserId(userId);
userDeviceInfoModel.setSn(entity.getOtherDeviceId());
reactiveStringRedisTemplate.opsForValue().set(RedisConstans.USER_BOX_INFO + userId,
JSONObject.toJSONString(userDeviceInfoModel),
Duration.ofDays(RedisConstans.TEN_YEAR)).subscribe();
return Mono.empty();
}).subscribe();
return Mono.empty();
}).subscribe();
}else{
if(entity.getIsBind().equals(YesNo.YES.getCode())){
//通知用户端设备绑定成功
sendNoticeToUser(userId, "设备联网成功,设备序列号:" + dv.getSn(), AskTypeEnum.BOX_ON_LINE.getCode());
}else{
//通知用户端设备绑定成功
BaseSession boxSession = getBoxSessionWithSn(dv.getSn());
if(boxSession != null){
closeSendMsg(boxSession, "设备已解绑无法继续使用", AskTypeEnum.DEVICE_UNBIND.getCode());
if(entity.getIsBind().equals(YesNo.YES.getCode())){
//通知用户端设备绑定成功
sendNoticeToUser(userId, "设备联网成功,设备序列号:" + dv.getSn(), AskTypeEnum.BOX_ON_LINE.getCode());
}else{
//通知用户端设备绑定成功
BaseSession boxSession = getBoxSessionWithSn(dv.getSn());
if(boxSession != null){
return closeSendMsg(boxSession, "设备已解绑无法继续使用", AskTypeEnum.DEVICE_UNBIND.getCode()).flatMap(
v -> {
return Mono.just(entity);
}
);
}
}
}
}
return Mono.empty();
}).subscribe();
}
return Mono.just(entity);
});
});
}
}

View File

@ -7,6 +7,7 @@ import com.qiuguo.iot.base.enums.DeviceTypeEnum;
import com.qiuguo.iot.base.enums.YesNo;
import com.qiuguo.iot.base.utils.WebClientUtils;
import com.qiuguo.iot.box.websocket.api.domain.BaseSession;
import com.qiuguo.iot.box.websocket.api.domain.box.BoxSession;
import com.qiuguo.iot.box.websocket.api.domain.user.UserTalkMessage;
import com.qiuguo.iot.box.websocket.api.filter.LogMdcConfiguration;
import com.qiuguo.iot.box.websocket.api.filter.LogWebFilter;
@ -22,9 +23,11 @@ import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import javax.annotation.Resource;
import java.util.HashMap;
@ -72,36 +75,20 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
userSession.setLogId(headers.get(LogMdcConfiguration.PRINT_LOG_ID).get(0));
log.info("用户成功userId:{}", userId);
Mono<Void> input = session.receive().map(webSocketMessage ->{
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
String text = webSocketMessage.getPayloadAsText();
log.info("收到用户消息:{}", text);
UserTalkMessage userTalkMessage = JSONObject.parseObject(text, UserTalkMessage.class);
BaseSession userSession1 = getUserSessionWithUserId(userTalkMessage.getUserId());
if(!userSession.equals(userSession1)){
log.info("消息发送异常或者未验签就收到信息不是同一个链接。可能传错用户ID");
closeSendMsg(userSession, "请等待验签结束或者用户ID可能错误", AskTypeEnum.TTS.getCode());
return Mono.empty();
}
nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage())
.defaultIfEmpty(new Actions()).map(actions -> {
//处理
if(actions.getActions() == null || actions.getActions().size() == 0){
//调用千问回答
log.info("未匹配到自定义命令,调用千问");
}else{
processAction(actions, userId, userSession);
}
newMessage(webSocketMessage, userSession).contextWrite(context -> {
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
return Mono.empty();
}).subscribe();
log.info("收到用户userId:{},消息:{}", userTalkMessage.getUserId(), userTalkMessage.getMessage());
//MDC.remove(LogMdcConfiguration.PRINT_LOG_ID);
return Mono.empty();
return contextTmp;
}).subscribe();
return webSocketMessage;
}).then();
checkToken(userSession, type, token, userId);
checkToken(userSession, type, token, userId).contextWrite(context -> {
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
return contextTmp;
}).subscribe();
Mono<Void> output = session.send(Flux.create(sink -> userSession.setSink(sink))).then();
@ -109,19 +96,47 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
// 也随之产生 error complete此时其它的 Mono 则会被执行取消操作
return Mono.zip(input, output).doFinally(signalType -> {
BaseSession userSession1 = getUserSessionWithUserId(userId);
if(userSession1 == userSession){
userGroup.remove(userSession.getUserId());//断链后及时移除
log.info("用户断开连接userId{}", userSession.getUserId());
}
disconnect(userSession).contextWrite(context -> {
Context contextTmp = context.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
return contextTmp;
}).subscribe();;
}).then();
}
private void checkToken(BaseSession userSession, String type, String token, Long userId){
private Mono<Void> newMessage(WebSocketMessage webSocketMessage, BaseSession userSession){
MDC.put(LogMdcConfiguration.PRINT_LOG_ID, userSession.getLogId());
String text = webSocketMessage.getPayloadAsText();
log.info("收到用户消息:{}", text);
UserTalkMessage userTalkMessage = JSONObject.parseObject(text, UserTalkMessage.class);
BaseSession userSession1 = getUserSessionWithUserId(userTalkMessage.getUserId());
if(!userSession.equals(userSession1)){
log.info("消息发送异常或者未验签就收到信息不是同一个链接。可能传错用户ID");
return closeSendMsg(userSession, "请等待验签结束或者用户ID可能错误", AskTypeEnum.TTS.getCode());
}
log.info("收到用户userId:{},消息:{}", userTalkMessage.getUserId(), userTalkMessage.getMessage());
return nlpService.getActionWithLacSingle(userSession.getUserId(), userTalkMessage.getMessage())
.defaultIfEmpty(new Actions()).flatMap(actions -> {
//处理
return processAction(actions, userSession);
});
}
private Mono<Void> disconnect(BaseSession userSession){
BaseSession userSession1 = getUserSessionWithUserId(userSession.getUserId());
if(userSession.equals(userSession1)){
userGroup.remove(userSession.getUserId());//断链后及时移除
log.info("用户断开连接userId{}", userSession.getUserId());
}
return Mono.empty();
}
private Mono<Void> checkToken(BaseSession userSession, String type, String token, Long userId){
Map<String, String> reqHead = new HashMap<>();
reqHead.put(apiType, type);
reqHead.put(apiToken, token);
WebClientUtils.get(checkTokenUrl, reqHead).defaultIfEmpty(new JSONObject()).map(jsonObject -> {
return WebClientUtils.get(checkTokenUrl, reqHead).defaultIfEmpty(new JSONObject()).flatMap(jsonObject -> {
log.info("验签获取的数据{}", jsonObject);
if(jsonObject.getInteger("code").equals(YesNo.YES.getCode())){
Long userId1 = jsonObject.getJSONObject("data").getLong("id");
@ -138,9 +153,9 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
request.setDeviceType(DeviceTypeEnum.GUO_BOX.getCode());
request.setIsMain(YesNo.YES.getCode());
deviceUserBindService.selectDeviceUserBindByRequest(request)
return deviceUserBindService.selectDeviceUserBindByRequest(request)
.defaultIfEmpty(new DeviceUserBindEntity())
.map(deviceUserBindEntity -> {
.flatMap(deviceUserBindEntity -> {
if(deviceUserBindEntity.getId() != null){
log.info("用户绑定信息为{}", deviceUserBindEntity);
@ -151,15 +166,12 @@ public class CustomerWebSocketHandler extends BaseWebSocketProcess implements We
normalSendMsg(userSession, "您暂未绑定果宝儿Box快去绑定吧", AskTypeEnum.TTS.getCode());
}
return Mono.empty();
}).subscribe();
return Mono.empty();
});
}
}
log.info("验签失败{}", userId);
closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode());
return Mono.empty();
}).subscribe();
return closeSendMsg(userSession, "非法登录", AskTypeEnum.TTS.getCode());
});
}