From 60059ab54f11fa5b7a7f0ffb221c0d8e33747aab Mon Sep 17 00:00:00 2001 From: wulin Date: Wed, 11 Oct 2023 17:05:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=85=A5=E9=98=BF=E9=87=8C=E4=BA=91?= =?UTF-8?q?=E5=8D=83=E9=97=AE=EF=BC=8C=E4=BC=98=E5=8C=96=E6=B0=94=E6=B8=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iot-common/iot-third/pom.xml | 7 ++ .../iot/third/entity/AliQianWenEntity.java | 115 ++++++++++++++++++ .../third/enums/ChinesePartSpeechEnum.java | 8 +- .../qiuguo/iot/third/service/QWenService.java | 36 +++++- .../http/api/filter/LogMdcConfiguration.java | 1 + .../user/api/filter/LogMdcConfiguration.java | 1 + iot-modules/iot-box-websocket-api/pom.xml | 1 + .../api/filter/LogMdcConfiguration.java | 1 + .../api/handler/BaseWebSocketProcess.java | 18 ++- .../src/main/resources/bootstrap-dev.yml | 4 +- 10 files changed, 183 insertions(+), 9 deletions(-) create mode 100644 iot-common/iot-third/src/main/java/com/qiuguo/iot/third/entity/AliQianWenEntity.java diff --git a/iot-common/iot-third/pom.xml b/iot-common/iot-third/pom.xml index a15e2be..3337a76 100644 --- a/iot-common/iot-third/pom.xml +++ b/iot-common/iot-third/pom.xml @@ -63,6 +63,13 @@ spring-data-redis + + + com.alibaba + dashscope-sdk-java + 2.6.3 + + diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/entity/AliQianWenEntity.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/entity/AliQianWenEntity.java new file mode 100644 index 0000000..f07643d --- /dev/null +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/entity/AliQianWenEntity.java @@ -0,0 +1,115 @@ +package com.qiuguo.iot.third.entity; + +import com.alibaba.dashscope.aigc.generation.Generation; +import com.alibaba.dashscope.aigc.generation.GenerationResult; +import com.alibaba.dashscope.aigc.generation.models.QwenParam; +import com.alibaba.dashscope.common.Message; +import com.alibaba.dashscope.common.MessageManager; +import com.alibaba.dashscope.common.ResultCallback; +import com.alibaba.dashscope.common.Role; +import com.alibaba.dashscope.exception.InputRequiredException; +import com.alibaba.dashscope.exception.NoApiKeyException; +import com.alibaba.dashscope.utils.JsonUtils; +import io.reactivex.Flowable; +import io.reactivex.functions.Consumer; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.Semaphore; + +@Data +@Slf4j +public class AliQianWenEntity { + /** + * 回话唯一标识 + */ + String talkId; + + /** + * 阿里千问配置参数 + */ + QwenParam qwenParam; + + /** + * 消息管理,上下文。默认最多10个 + * 上下文传送过程中需要单独收费 + */ + String qwKey; + + MessageManager msgManager = new MessageManager(10); + + Message userMsg; + + GenerationResult lastGenerationResult = null; + + public AliQianWenEntity(String qwKey){ + this.qwKey = qwKey; + qwenParam = QwenParam.builder().model(Generation.Models.QWEN_PLUS) + .resultFormat(QwenParam.ResultFormat.MESSAGE) + .topP(0.8) + .apiKey(qwKey) + .enableSearch(true) + //.incrementalOutput(true) // get streaming output incrementally + .build(); + /*Message systemMsg = + Message.builder().role(Role.SYSTEM.getValue()).content("你是智能助手机器人").build(); + + msgManager.add(systemMsg);*/ + } + + public void sendMessage(String msg, Consumer onNext) throws NoApiKeyException, InputRequiredException { + + if(userMsg == null){ + userMsg = Message + .builder() + .role(Role.USER.getValue()) + .content(msg) + .build(); + + msgManager.add(userMsg); + }else{ + //msgManager.add(lastGenerationResult); + qwenParam.setPrompt(msg); + } + + Generation gen = new Generation(); + + + qwenParam.setMessages(msgManager.get()); + Semaphore semaphore = new Semaphore(0); + GenerationResult result = gen.call(qwenParam); + try { + onNext.accept(result); + } catch (Exception e) { + log.info("千问回调异常{}", e); + } + msgManager.add(result); + /*gen.streamCall(qwenParam, new ResultCallback() { + @Override + public void onEvent(GenerationResult message) { + + try { + onNext.accept(message); + lastGenerationResult = message; + //msgManager.add(message); + } catch (Exception e) { + log.info("千问回调异常{}", e); + } + } + + @Override + public void onComplete() { + + semaphore.release(); + } + + @Override + public void onError(Exception e) { + log.info("调用千问异常{}", e); + semaphore.release(); + } + });*/ + + + } +} diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/enums/ChinesePartSpeechEnum.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/enums/ChinesePartSpeechEnum.java index 58e46b4..55c0c84 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/enums/ChinesePartSpeechEnum.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/enums/ChinesePartSpeechEnum.java @@ -176,7 +176,7 @@ public enum ChinesePartSpeechEnum implements IChinesePartSpeech{ //匹配到关键词就不能作为名词加入,未匹配到的,分割前的都加入名词 SystemTalkAnswerConfigEntity entity = getSystemTalkWithKey(key, keyGroup); if(entity != null){ - if(entity.getAnswerType().equals(AskTypeEnum.IOT.getCode())){ + if(!entity.getAnswerType().equals(AskTypeEnum.COMMAND_N.getCode())){ systemTalkAnswerConfigEntities.add(entity); actions.setA(1); //已记录的加进去 @@ -194,7 +194,7 @@ public enum ChinesePartSpeechEnum implements IChinesePartSpeech{ } } }else{ - log.info("IOT不支持的自定义指令"); + log.info("COMMAND_N自定义指令"); } }else{ @@ -255,11 +255,11 @@ public enum ChinesePartSpeechEnum implements IChinesePartSpeech{ //action.setName(name);//.add(name); SystemTalkAnswerConfigEntity entity = getSystemTalkWithKey(actions.getName(), keyGroup); if(entity != null){ - if(entity.getAnswerType().equals(AskTypeEnum.IOT.getCode())){ + if(!entity.getAnswerType().equals(AskTypeEnum.COMMAND_N.getCode())){ systemTalkAnswerConfigEntities.add(entity); actions.setA(1);//a = 1; }else{ - log.info("IOT不支持的自定义指令"); + log.info("COMMAND_N自定义指令"); } }else{ diff --git a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java index e74597f..97dea9a 100644 --- a/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java +++ b/iot-common/iot-third/src/main/java/com/qiuguo/iot/third/service/QWenService.java @@ -1,5 +1,9 @@ package com.qiuguo.iot.third.service; +import cn.hutool.extra.spring.SpringUtil; +import com.alibaba.dashscope.aigc.generation.GenerationResult; +import com.alibaba.dashscope.exception.InputRequiredException; +import com.alibaba.dashscope.exception.NoApiKeyException; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.qiuguo.iot.base.constans.RedisConstans; @@ -10,6 +14,8 @@ import com.qiuguo.iot.data.dto.queue.QueuePackagingDTO; import com.qiuguo.iot.data.resp.qg.algorithm.QGResponse; import com.qiuguo.iot.data.request.qwen.TongYiCommunicationRest; import com.qiuguo.iot.data.resp.qg.algorithm.QWenReplyResponse; +import com.qiuguo.iot.third.entity.AliQianWenEntity; +import io.reactivex.functions.Consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.core.ReactiveValueOperations; @@ -22,6 +28,7 @@ import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.time.Duration; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -33,7 +40,34 @@ public class QWenService { private ReactiveStringRedisTemplate reactiveStringRedisTemplate; - public Mono communication(TongYiCommunicationRest rest) { + protected static ConcurrentHashMap qianwenGroup = new ConcurrentHashMap<>(); + + + public Mono communication(TongYiCommunicationRest rest, Consumer onNext){ + AliQianWenEntity aliQianWen = null; + if (!qianwenGroup.containsKey(rest.getOnlyId())) { + aliQianWen = new AliQianWenEntity(SpringUtil.getProperty("Ali.qianwen")); + qianwenGroup.put(rest.getOnlyId(), aliQianWen); + } else { + aliQianWen = qianwenGroup.get(rest.getOnlyId()); + } + final AliQianWenEntity aliQianWen1 = aliQianWen; + return Mono.just(new QWenReplyResponse()).map(qWenReplyResponse -> { + try { + aliQianWen1.sendMessage(rest.getText(), onNext); + qWenReplyResponse.setCode(200); + } catch (Exception e) { + log.info("调用千问异常{}", e); + qWenReplyResponse.setCode(500); + //throw new RuntimeException(e); + + } + return qWenReplyResponse; + }); + } + public Mono communication(TongYiCommunicationRest rest){ + + ReactiveValueOperations operations = reactiveStringRedisTemplate.opsForValue(); String queueKey = RedisConstans.TY_QUEUE_LIST; diff --git a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/filter/LogMdcConfiguration.java b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/filter/LogMdcConfiguration.java index 9e59d36..2f45143 100644 --- a/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/filter/LogMdcConfiguration.java +++ b/iot-modules/iot-admin-http-api/src/main/java/com/qiuguo/iot/admin/http/api/filter/LogMdcConfiguration.java @@ -51,6 +51,7 @@ public class LogMdcConfiguration { @Override public void onError(Throwable throwable) { + log.info("异常{}", throwable); coreSubscriber.onError(throwable); MDC.remove(PRINT_LOG_ID); diff --git a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java index 43f4e70..c14235e 100644 --- a/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java +++ b/iot-modules/iot-box-user-api/src/main/java/com/qiuguo/iot/user/api/filter/LogMdcConfiguration.java @@ -51,6 +51,7 @@ public class LogMdcConfiguration { @Override public void onError(Throwable throwable) { + log.info("异常{}", throwable); coreSubscriber.onError(throwable); MDC.remove(PRINT_LOG_ID); diff --git a/iot-modules/iot-box-websocket-api/pom.xml b/iot-modules/iot-box-websocket-api/pom.xml index 9e84d2c..1fb4d40 100644 --- a/iot-modules/iot-box-websocket-api/pom.xml +++ b/iot-modules/iot-box-websocket-api/pom.xml @@ -75,6 +75,7 @@ compile + diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/filter/LogMdcConfiguration.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/filter/LogMdcConfiguration.java index 81833c9..6ed9439 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/filter/LogMdcConfiguration.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/filter/LogMdcConfiguration.java @@ -50,6 +50,7 @@ public class LogMdcConfiguration { @Override public void onError(Throwable throwable) { + log.info("异常{}", throwable); coreSubscriber.onError(throwable); MDC.remove(PRINT_LOG_ID); diff --git a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java index a5e0c20..c103eff 100644 --- a/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java +++ b/iot-modules/iot-box-websocket-api/src/main/java/com/qiuguo/iot/box/websocket/api/handler/BaseWebSocketProcess.java @@ -90,16 +90,28 @@ public class BaseWebSocketProcess { }else{ tongYiCommunicationRest.setOnlyId(baseSession.getUserId().toString()); } - qwenService.communication(tongYiCommunicationRest).map(data ->{ + qwenService.communication(tongYiCommunicationRest, message ->{ + //通知到客户端 BoxMessageResp resp = new BoxMessageResp(); resp.setType(0); + resp.setText(message.getOutput().getChoices().get(0).getMessage().getContent()); + String msg = JSONObject.toJSONString(resp); + log.info("推流信息到终端"); + baseSession.getSink().next(baseSession.getSession().textMessage(msg)); + //sendMessage(action, baseSession, resp); + }).map(data ->{ + if(data.getCode() == 200){ - resp.setText(data.getResut()); + log.info("千问正常结束"); + //保存记录 }else{ + BoxMessageResp resp = new BoxMessageResp(); + resp.setType(0); resp.setText("我还在努力学习中,暂时无法理解"); + sendMessage(action, baseSession, resp); } - sendMessage(action, baseSession, resp); + return data; }).subscribeOn(Schedulers.boundedElastic()).subscribe(); } diff --git a/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml b/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml index 9d2a665..ace3d5f 100644 --- a/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml +++ b/iot-modules/iot-box-websocket-api/src/main/resources/bootstrap-dev.yml @@ -41,4 +41,6 @@ qiuguo: checktoken: url: https://exper.qiuguojihua.com/data/api.auth.center/get lac: - url: http://192.168.8.175:8866/predict/lac \ No newline at end of file + url: http://192.168.8.175:8866/predict/lac +Ali: + qianwen: 'sk-8d64677afaf6404cb83ce1910b5b2558' \ No newline at end of file